
Quix Streams: Consuming and Producing JSON messages

I’ve been meaning to take Quix Streams for a spin for a while and got the chance while building a recent demo. Quix Streams is a library for building streaming applications on time-series data, but I wanted to use it to do some basic consuming and producing of JSON messages. That’s what we’re going to do in this blog post.

We’re going to use Redpanda to store our messages. We’ll launch a Redpanda instance using the following Docker Compose file:

version: '3.7'
services:
  redpanda:
    container_name: "redpanda-quix"
    image: docker.redpanda.com/vectorized/redpanda:v22.2.2
    command:
      - redpanda start
      - --smp 1
      - --overprovisioned
      - --node-id 0
      - --kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      - --advertise-kafka-addr PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
      - --pandaproxy-addr 0.0.0.0:8082
      - --advertise-pandaproxy-addr localhost:8082
    ports:
      - 9092:9092

Run docker compose up to start the cluster, and then we'll create two topics:

rpk topic create \
  -c cleanup.policy=compact \
  -r 1 -p 5 \
  events big-events
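The cleanup.policy=compact setting means Redpanda will eventually retain only the latest value for each message key. A minimal sketch of what compaction converges towards:

```python
# sketch of log compaction: later records with the same key replace earlier ones
log = [("abc", 1), ("def", 2), ("abc", 3)]

compacted = {}
for key, value in log:
    compacted[key] = value  # the most recent value per key wins

print(compacted)  # {'abc': 3, 'def': 2}
```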

events is where we'll write our generated data, and big-events will be used later in the blog post. Our data generator, datagen.py, is shown below:

import datetime
import uuid
import random
import json

while True:
    ts = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    id = str(uuid.uuid4())
    count = random.randint(0, 1000)
    # truncating the UUID gives a small, repeating key space
    print(json.dumps({"tsString": ts, "uuid": id[:3], "count": count}))

We can then call it to generate some data and ingest it into Redpanda using the kcat tool:

python datagen.py 2>/dev/null |
jq -cr --arg sep 😊 '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -K😊
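The jq expression builds one key😊value line per event: .uuid becomes the key, tostring serialises the whole object back to a JSON string, and kcat's -K😊 flag splits each line on the separator. A rough Python equivalent of that transformation (to_kcat_line is a hypothetical helper, not part of the pipeline):

```python
import json

def to_kcat_line(event: dict, sep: str = "😊") -> str:
    # mirrors the jq expression: [.uuid, tostring] | join($sep)
    return sep.join([event["uuid"], json.dumps(event, separators=(",", ":"))])

print(to_kcat_line({"tsString": "2023-05-01T10:00:00.000000Z", "uuid": "4fa", "count": 42}))
```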

Let’s check the messages have been ingested using the rpk tool:

rpk topic consume events --brokers localhost:9092 |
jq -Cc '.value | fromjson' |
head -n5
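rpk wraps each record in a JSON envelope (topic, key, value, and so on) in which the message payload sits in value as a string, which is why the jq filter applies fromjson. A sketch, assuming a simplified envelope:

```python
import json

# simplified version of the envelope that rpk topic consume prints per record
envelope = '{"topic": "events", "key": "4fa", "value": "{\\"count\\": 42}"}'

record = json.loads(envelope)
payload = json.loads(record["value"])  # the equivalent of jq's fromjson
print(payload)  # {'count': 42}
```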

So far, so good. Next, let’s install Quix Streams:

pip install quixstreams

And now we’re going to create a little application, filter_events.py, that consumes data from the events topic and writes any events with a count bigger than 500 to the big-events topic:

import quixstreams as qx
from quixstreams import StreamConsumer, EventData
import json

client = qx.KafkaStreamingClient('127.0.0.1:9092')

topic_consumer = client.get_topic_consumer("events") (1)

def on_event_data_received_handler(stream: StreamConsumer, data: EventData):
    with data:
        payload = json.loads(data.value)

        if payload["count"] > 500: (2)
            with (producer := client.get_raw_topic_producer("big-events")): (3)
                message = qx.RawMessage(json.dumps(payload, indent=2).encode('utf-8'))
                message.key = payload["uuid"].encode('utf-8')
                producer.publish(message) (4)

def on_stream_received_handler(stream_received: StreamConsumer):
    stream_received.events.on_data_received = on_event_data_received_handler

print("Listening to streams. Press CTRL-C to exit.")

topic_consumer.on_stream_received = on_stream_received_handler

qx.App.run()

1 Create consumer for events topic
2 Only keep events that have a count bigger than 500
3 Create producer for big-events topic
4 Publish to big-events
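The filtering rule itself is just the count comparison; pulled out into a standalone function (is_big_event is a hypothetical helper, not part of the Quix Streams API), it's easy to check in isolation:

```python
import json

def is_big_event(raw_value, threshold: int = 500) -> bool:
    # same check as in on_event_data_received_handler, minus the Kafka plumbing
    payload = json.loads(raw_value)
    return payload["count"] > threshold

print(is_big_event('{"uuid": "4fa", "count": 750}'))  # True
print(is_big_event('{"uuid": "9bc", "count": 100}'))  # False
```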

Let’s run this file:

python filter_events.py

We can then check if any events have made it into the big-events topic:

rpk topic consume big-events --brokers localhost:9092 |
jq -Cc '.value | fromjson' |
head -n5

Job done!
