
Kafka: A basic tutorial

In this post we’re going to learn how to launch Kafka locally and write to and read from a topic using one of the Python drivers.

To make things easy for myself, I’ve created a Docker Compose template that launches 3 containers:

  • broker - our Kafka broker

  • zookeeper - used by Kafka to store cluster metadata and elect the controller

  • jupyter - notebooks for connecting to our Kafka broker

This template can be downloaded from the mneedham/basic-kafka-tutorial repository, and reads as follows:

version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    container_name: zookeeper-tutorial
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-enterprise-kafka
    hostname: broker
    container_name: broker-tutorial
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
    - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  jupyter:
    container_name: jupyter-tutorial
    image: jupyter/scipy-notebook:${JUPYTER_VERSION:-latest}
    volumes:
      - ./notebooks:/home/jovyan/
    ports:
      - "8888:8888"

The easiest way to use this template is to first clone the repository locally using the following command:

git clone git@github.com:mneedham/basic-kafka-tutorial.git && cd basic-kafka-tutorial

And then launch the Docker containers using the following command:

docker-compose up

This command prints a lot of log output, and we’ll need to keep a close eye on the first few lines so that we can grab the Jupyter Notebook token. The output we’re looking for looks like this:

jupyter-tutorial | [I 10:35:27.804 NotebookApp] The Jupyter Notebook is running at:
jupyter-tutorial | [I 10:35:27.804 NotebookApp] http://(4a031e4b5326 or 127.0.0.1):8888/?token=7907fef53948168308c829d48d9978b8f9c475b7c621c7d1
jupyter-tutorial | [I 10:35:27.804 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
jupyter-tutorial | [C 10:35:27.811 NotebookApp]
jupyter-tutorial |
jupyter-tutorial |     To access the notebook, open this file in a browser:
jupyter-tutorial |         file:///home/jovyan/.local/share/jupyter/runtime/nbserver-6-open.html
jupyter-tutorial |     Or copy and paste one of these URLs:
jupyter-tutorial |         http://(4a031e4b5326 or 127.0.0.1):8888/?token=7907fef53948168308c829d48d9978b8f9c475b7c621c7d1

We then need to navigate to http://localhost:8888?token=<token> in our web browser. Based on the output above, we’d navigate to http://localhost:8888?token=7907fef53948168308c829d48d9978b8f9c475b7c621c7d1.
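If copying the token by hand gets tedious, the URL can be built from the log line instead. This is only a minimal sketch: the helper name notebook_url and its regular expression are my own assumptions, not part of the repository, and it assumes the token is a hexadecimal string as in the output above.

```python
import re
from typing import Optional

# Hypothetical helper (not part of the tutorial repository).
# Assumes the token is a lowercase hexadecimal string, as in the log output above.
TOKEN_PATTERN = re.compile(r"[?&]token=([0-9a-f]+)")


def notebook_url(log_line: str, host: str = "localhost", port: int = 8888) -> Optional[str]:
    """Build a browser-friendly notebook URL from a Jupyter startup log line."""
    match = TOKEN_PATTERN.search(log_line)
    if match is None:
        # The line does not contain a token, so there is nothing to build.
        return None
    return f"http://{host}:{port}/?token={match.group(1)}"


line = (
    "jupyter-tutorial | [I 10:35:27.804 NotebookApp] "
    "http://(4a031e4b5326 or 127.0.0.1):8888/"
    "?token=7907fef53948168308c829d48d9978b8f9c475b7c621c7d1"
)
print(notebook_url(line))
```

The function returns None for lines without a token, so it can be applied to every line of the docker-compose output and the first non-None result used.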

Once we’ve done that we can open Kafka Tutorial.ipynb. Let’s go through that notebook.

Once we’ve installed the kafka-python library we write the following functions to create a Kafka Producer and publish a message to a topic:

from kafka import KafkaConsumer, KafkaProducer
import json
import uuid


def publish_message(producer_instance, topic_name, key, value):
    try:
        key_bytes = bytes(key, encoding='utf-8')
        value_bytes = bytes(value, encoding='utf-8')
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        producer_instance.flush()
        print('Message published successfully.')
    except Exception as ex:
        print('Exception in publishing message')
        print(str(ex))


def connect_kafka_producer(server):
    # Return a KafkaProducer, or None if the connection attempt fails
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=[server], api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    return _producer

Now we’ll create a producer:

server = 'broker:9093'
kafka_producer = connect_kafka_producer(server)
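The address broker:9093 works here because the notebook runs inside the Docker Compose network, where the broker advertises its PLAINTEXT listener. A script running on the host machine would instead use the OUTSIDE listener, which the template advertises as localhost:9092. The sketch below makes that choice explicit. The helper name bootstrap_server is an assumption of mine, and the two addresses are taken from KAFKA_ADVERTISED_LISTENERS in the template above.

```python
def bootstrap_server(inside_compose_network: bool) -> str:
    """Pick the broker address that matches where the client is running.

    Hypothetical helper: the addresses mirror KAFKA_ADVERTISED_LISTENERS
    in the Docker Compose template (PLAINTEXT and OUTSIDE listeners).
    """
    if inside_compose_network:
        # e.g. the Jupyter container, which can resolve the 'broker' hostname
        return "broker:9093"
    # e.g. a script on the host, which reaches the published port 9092
    return "localhost:9092"


print(bootstrap_server(inside_compose_network=True))
print(bootstrap_server(inside_compose_network=False))
```

Connecting to the wrong listener tends to fail after the initial connection, because the broker tells the client to reconnect at the advertised address for that listener.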

And publish a JSON message to the broker:

topic_name = "intro-to-kafka"
message = json.dumps({"name": "Mark"})
publish_message(kafka_producer, topic_name, str(uuid.uuid4()), message)
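Kafka itself only deals in bytes, which is why publish_message encodes the key and value before sending them. As an alternative sketch, the encoding can live in small serializer functions. The names serialize_key and serialize_value are my own. kafka-python's KafkaProducer accepts key_serializer and value_serializer callables, but the commented-out lines that use them are shown only for illustration, since they need a running broker.

```python
import json
import uuid


def serialize_key(key: str) -> bytes:
    """Encode a string key as UTF-8 bytes."""
    return key.encode("utf-8")


def serialize_value(value: dict) -> bytes:
    """Encode a dictionary as UTF-8 encoded JSON."""
    return json.dumps(value).encode("utf-8")


# With a reachable broker, these could be handed to the producer so that
# send() accepts plain Python objects (illustrative only, not run here):
#
#   producer = KafkaProducer(bootstrap_servers=[server],
#                            key_serializer=serialize_key,
#                            value_serializer=serialize_value)
#   producer.send(topic_name, key=str(uuid.uuid4()), value={"name": "Mark"})

key_bytes = serialize_key(str(uuid.uuid4()))
value_bytes = serialize_value({"name": "Mark"})
print(value_bytes)  # b'{"name": "Mark"}'
```

The design choice is mostly about where the conversion happens: with serializers on the producer, the calling code no longer has to remember to call json.dumps and bytes for every message.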

And now we’ll create a consumer to process all the messages on that topic:

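consumer = KafkaConsumer(topic_name,
                         # start from the oldest message when there is no committed offset
                         auto_offset_reset='earliest',
                         bootstrap_servers=[server],
                         api_version=(0, 10),
                         # turn the JSON bytes back into a Python object
                         value_deserializer=json.loads,
                         # stop iterating if no message arrives within 1 second
                         consumer_timeout_ms=1000)
for msg in consumer:
    print(msg.key.decode("utf-8"), msg.value)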

If we run this code we’ll see the following output:

716a46df-2b10-4270-b294-b04ced51bd73 {'name': 'Mark'}

And now we’re ready to go and do some more fun stuff with streams!
