
Confluent Kafka: DeprecationWarning: AvroProducer has been deprecated. Use AvroSerializer instead.

I’ve been creating a demo showing how to ingest Avro-encoded data from Apache Kafka into Apache Pinot and ran into a deprecation warning. In this blog post, I’ll show how to update code using the Confluent Kafka Python client to get rid of that warning.

I started by installing the following libraries:

pip install confluent-kafka avro urllib3 requests

My code to publish an Avro-encoded event to Kafka looked like this:

from confluent_kafka.avro import AvroProducer
from confluent_kafka import avro

schema_name = "telemetry.avsc"
producer_config = {
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081',
    'broker.address.family': 'v4'
}

value_schema = avro.load(schema_name)
producer = AvroProducer(producer_config, default_value_schema=value_schema)

event = {
 # all my fields
}
producer.produce(topic="telemetry", value=event)

When I ran this script, data did make its way into Kafka, but I also got the following warning on the AvroProducer line:

Output
DeprecationWarning: AvroProducer has been deprecated. Use AvroSerializer instead.

It took me a little while to figure out where AvroSerializer lived and how to use it, but I eventually ended up with the following code for creating the producer:

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import (
    SerializationContext,
    MessageField,
)

# The Schema Registry settings now live on their own client
schema_registry_conf = {'url': 'http://localhost:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Read the schema file as a JSON string and pass it to the serializer
with open("telemetry.avsc") as f:
    value_schema = f.read()
avro_serializer = AvroSerializer(schema_registry_client, value_schema)

# A plain Producer, so 'schema.registry.url' is no longer part of its config
producer_conf = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(producer_conf)

I ran this code and got the following error message:

Output
Traceback (most recent call last):
  File "/Users/markhneedham/projects/hugo-blog/blog/content/2023/07/25/new.py", line 5, in <module>
    from confluent_kafka.schema_registry.avro import AvroSerializer
  File "/Users/markhneedham/projects/hugo-blog/blog/content/2023/07/25/.venv/lib/python3.11/site-packages/confluent_kafka/schema_registry/avro.py", line 22, in <module>
    from fastavro import (parse_schema,
ModuleNotFoundError: No module named 'fastavro'

Let’s get fastavro installed:

pip install fastavro
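The traceback shows that confluent_kafka.schema_registry.avro imports fastavro as soon as it is loaded, and the earlier pip install did not bring it in. As a small sketch that is not part of the original script, a standard-library check along these lines could flag missing modules before the real imports run. The helper name missing_modules is hypothetical.

```python
from importlib.util import find_spec


def missing_modules(names):
    """Return the top-level module names that cannot be found in this environment."""
    # find_spec returns None for a top-level module that is not installed
    return [name for name in names if find_spec(name) is None]


# Modules that the AvroSerializer code path in this post relies on
required = ["confluent_kafka", "fastavro"]
missing = missing_modules(required)
if missing:
    print("Missing modules:", ", ".join(missing))
```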

When I ran the script again, the deprecation warning was gone, but I got the following error message instead:

Output
Traceback (most recent call last):
  File "/Users/markhneedham/projects/hugo-blog/blog/content/2023/07/25/new.py", line 47, in <module>
    producer.produce(topic="telemetry", value=event)
TypeError: a bytes-like object is required, not 'dict'

With AvroProducer, our messages were encoded inside the producer. The plain Producer only accepts a string or bytes as the value, so we need to encode the event explicitly using the Avro serialiser. That means updating this line:

producer.produce(topic="telemetry", value=event)

To read like this:

producer.produce(
    topic="telemetry",
    value=avro_serializer(event, SerializationContext("telemetry", MessageField.VALUE)),
)
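The value returned by avro_serializer(...) is plain bytes, which is why produce() now accepts it. In Confluent's wire format, those bytes start with a magic byte of 0 and a 4-byte big-endian schema ID, followed by the Avro-encoded body. The sketch below uses only the standard library to split that header off a payload. The function name split_confluent_header and the sample bytes are made up for illustration.

```python
import struct


def split_confluent_header(payload: bytes):
    """Split a Confluent-framed message into (schema_id, avro_body)."""
    if len(payload) < 5:
        raise ValueError("payload is too short to carry the 5-byte header")
    # '>bI' = big-endian: 1 signed byte (magic) + 4-byte unsigned int (schema ID)
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, payload[5:]


# Hand-built sample: magic byte 0, schema ID 42, then a stand-in Avro body
sample = struct.pack(">bI", 0, 42) + b"\x02"
schema_id, body = split_confluent_header(sample)
print(schema_id, body)
```

Applied to the bytes that avro_serializer returns, the first element should be the ID that the Schema Registry holds for the telemetry schema.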

Now if I run the script, messages make their way into Kafka and I don’t have any warnings or errors!
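One detail the snippets above leave implicit is that produce() is asynchronous and only queues the message. The scripts here worked without it, but a short-lived script is generally safer calling flush() before it exits, and a delivery callback is the usual way to confirm what happened. Here is a minimal sketch of that, assuming the producer and avro_serializer objects created earlier. The names publish and delivery_report are hypothetical, and the producer is passed in as an argument so the function can be exercised without a broker.

```python
from typing import Any, Callable


def delivery_report(err, msg):
    """Invoked once per message from poll() or flush() with the delivery result."""
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")


def publish(producer: Any, serialize: Callable[[dict], bytes], topic: str, event: dict) -> int:
    """Serialize one event, queue it, and wait for delivery.

    `producer` is assumed to behave like confluent_kafka.Producer, and
    `serialize` is any callable that turns the event dict into bytes.
    Returns the number of messages still queued after flush() (0 on success).
    """
    payload = serialize(event)
    producer.produce(topic=topic, value=payload, on_delivery=delivery_report)
    # Wait up to 10 seconds for outstanding messages and their callbacks
    return producer.flush(10)


# Possible usage with the objects from this post (needs Kafka and Schema Registry running):
# ctx = SerializationContext("telemetry", MessageField.VALUE)
# publish(producer, lambda e: avro_serializer(e, ctx), "telemetry", event)
```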

Note

If you want to see both code samples in full, I’ve included them in a GitHub Gist.
