Quix Streams: Process a certain number of Kafka messages

In a recent demo, I wanted to use Quix Streams to process a specified number of messages from a Kafka topic, write a message to another stream, and then exit the Quix app. This is an unusual use of Quix Streams, so it took me a while to figure out how to do it.

Let’s assume we have a Kafka broker running. We’ll create a couple of topics using the rpk tool:

rpk topic create events -p1
rpk topic create massaged-events -p1

Now, let’s install Quix Streams and Click, a library for processing command line arguments:

pip install quixstreams click

We’ll start by creating a version of the application that processes all messages. Create a file called massage.py and add the following imports:

import quixstreams as qx
from quixstreams import StreamConsumer, EventData, CommitMode
import json
import click

Next, let’s create a 'main' function that will bootstrap the app:

@click.command()
def run_app():
    global client, topic_consumer, producer

    client = qx.KafkaStreamingClient('127.0.0.1:9092') (1)

    topic_consumer = client.get_topic_consumer( (2)
        topic="events",
        auto_offset_reset=qx.AutoOffsetReset.Earliest,
        consumer_group="events-consumer",
        commit_settings=CommitMode.Manual
    )
    producer = client.get_raw_topic_producer("massaged-events") (3)

    print("Listening to streams. Press CTRL-C to exit.")

    topic_consumer.on_stream_received = on_stream_received_handler
    topic_consumer.subscribe()

    qx.App.run(before_shutdown=before_shutdown)
1 Create Kafka client
2 Create consumer for the events topic
3 Create producer for the massaged-events topic

We then need to add the following functions to process each message and handle the shutdown of the app:

def on_event_data_received_handler(stream: StreamConsumer, data: EventData):
    with data:
        payload = json.loads(data.value)
        payload["count"] *= 2 (1)

        message = qx.RawMessage(json.dumps(payload, indent=2).encode('utf-8')) (2)
        message.key = str(payload["id"]).encode('utf-8')
        producer.publish(message) (3)

        topic_consumer.commit()


def on_stream_received_handler(stream_received: StreamConsumer):
    stream_received.events.on_data_received = on_event_data_received_handler


def before_shutdown():
    print('before shutdown')
    topic_consumer.dispose()
    producer.dispose()
1 Multiply the count property by 2
2 Create a new message
3 Publish the message to the massaged-events topic

Finally, let’s call the 'main' function:

if __name__ == "__main__":
    run_app()

If we run this script, it will process any messages published to the events topic and write a new message to massaged-events with the count property doubled.
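
We can start the app like this:

python massage.py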

Let’s ingest a message into Kafka to see if it works:

echo '{"id": 1, "count": 4}' |
jq -cr --arg sep ø '[.id, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø
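
If you’d rather produce that test message from Python, we can reuse the same raw producer pattern from massage.py. This is only a sketch based on the calls we’ve already used above, assuming the same local broker:

import quixstreams as qx
import json

client = qx.KafkaStreamingClient('127.0.0.1:9092')
producer = client.get_raw_topic_producer("events")  # this time we produce to the events topic

payload = {"id": 1, "count": 4}
message = qx.RawMessage(json.dumps(payload).encode('utf-8'))
message.key = str(payload["id"]).encode('utf-8')  # use the id as the message key, like the kcat pipeline does
producer.publish(message)
producer.dispose()  # dispose the producer, which should flush the message, before exiting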

And now we’ll check the contents of the events and massaged-events topics:

kcat -C -b localhost:9092 -t events -e | jq -c
Output
{"id":1,"count":4}
kcat -C -b localhost:9092 -t massaged-events -e | jq -c
Output
{"id":1,"count":8}

So far, so good. If we write any more messages to the events topic, they will automatically be processed as well.

But now we want to update our script so that we can specify how many messages to consume before stopping. If we then run the script again, it will continue from where we left off because our topic consumer was created with a consumer group that’s keeping track of the last read offset. We can see that offset by running the following command:

rpk group describe events-consumer
Output
GROUP        events-consumer
COORDINATOR  0
STATE        Empty
BALANCER
MEMBERS      0
TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   MEMBER-ID  CLIENT-ID  HOST
events  0          1               1               0
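
If you’d rather look up that committed offset from Python instead of rpk, a minimal sketch using the confluent-kafka client (an assumption, as it isn’t used anywhere else in this post) could look something like this:

from confluent_kafka import Consumer, TopicPartition

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "events-consumer",  # the consumer group used by massage.py
})

# Query the offset committed by the events-consumer group for partition 0 of the events topic
committed = consumer.committed([TopicPartition("events", 0)])
print(committed[0].offset)

consumer.close()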

Let’s update our imports:

from quixstreams import StreamConsumer, EventData, CancellationTokenSource, CommitMode
import threading

And now we’ll update the run_app function to look like this:

@click.command()
@click.option('--number-events', default=1)
def run_app(number_events):
    global client, topic_consumer, producer
    global events_to_consume, events_consumed, thread_lock, cancellation_thread

    client = qx.KafkaStreamingClient('127.0.0.1:9092')

    topic_consumer = client.get_topic_consumer(
        topic="events",
        auto_offset_reset=qx.AutoOffsetReset.Earliest,
        consumer_group="events-consumer",
        commit_settings=CommitMode.Manual
    )
    producer = client.get_raw_topic_producer("massaged-events")

    thread_lock = threading.Lock()
    cts = CancellationTokenSource() (1)
    cancellation_thread = threading.Thread(target=lambda: cts.cancel()) (2)

    events_to_consume = number_events
    events_consumed = 0

    print("Listening to streams. Press CTRL-C to exit.")

    topic_consumer.on_stream_received = on_stream_received_handler
    topic_consumer.subscribe()

    qx.App.run(cts.token, before_shutdown=before_shutdown)
    if cancellation_thread.is_alive(): (3)
        cancellation_thread.join()
1 Cancellation token used to stop message processing
2 Cancellation thread which will trigger the cancellation token
3 Join the cancellation thread to the main thread before exiting

And the on_event_data_received_handler needs to be updated to keep track of the messages consumed:

def on_event_data_received_handler(stream: StreamConsumer, data: EventData):
    global events_consumed
    with data:
        payload = json.loads(data.value)
        payload["count"] *= 2

        message = qx.RawMessage(json.dumps(payload, indent=2).encode('utf-8'))
        message.key = str(payload["id"]).encode('utf-8')
        producer.publish(message)

        topic_consumer.commit()

        with thread_lock:
            events_consumed += 1 (1)

        if events_consumed >= events_to_consume: (2)
            if not cancellation_thread.is_alive():
                cancellation_thread.start() (3)
                print("Cancellation token triggered")
            return
1 Increment the number of messages consumed
2 Check whether we’ve consumed the requested number of messages
3 Trigger the cancellation thread, which will cancel the token

We can then call our Python script like this to process one event:

python massage.py --number-events 1

Let’s now add another message to Kafka:

echo '{"id": 42, "count": 9000}' |
jq -cr --arg sep ø '[.id, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø

We’ll see the following output from massage.py:

Output
Listening to streams. Press CTRL-C to exit.
Cancellation token triggered
before shutdown

And if we look at the massaged-events topic, it now has the following message:

Output
{"id":42,"count":18000}

You can find the full code in this GitHub Gist.
