
Flink SQL: Exporting nested JSON to a Kafka topic

I’ve been playing around with Flink as part of a workshop that I’m doing at JFokus in a couple of weeks and I wanted to export some data from Flink to Apache Kafka in a nested format. In this blog post we’ll learn how to do that.

Setup

We’re going to be using the following Docker Compose config:

docker-compose.yml
version: "3"
services:
  zookeeper:
    image: zookeeper:latest
    container_name: zookeeper
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zookeeper:2888:3888;2181
  kafka:
    image: confluentinc/cp-kafka:7.1.0
    hostname: kafka
    container_name: kafka
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TOOLS_LOG4J_LOGLEVEL: ERROR
    depends_on:
      - zookeeper
    healthcheck: {test: nc -z localhost 9092, interval: 1s, start_period: 120s}
  sql-client:
    container_name: 'flink-sql-client'
    build:
      context: .
      dockerfile: sql-client/Dockerfile
    depends_on:
      - jobmanager
    environment:
      FLINK_JOBMANAGER_HOST: jobmanager
    volumes:
      - ./flink/settings/:/settings
  jobmanager:
    image: flink:1.16.0-scala_2.12-java11
    container_name: 'flink-jobmanager'
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 30
    volumes:
      - ./flink/settings/:/settings
      - ./flink/data/:/data
  taskmanager:
    image: flink:1.16.0-scala_2.12-java11
    container_name: 'flink-taskmanager'
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 30
    volumes:
      - ./flink/settings/:/settings
      - ./flink/data/:/data

The Flink config used here is adapted from Francesco Tisiot’s repository at github.com/aiven/sql-cli-for-apache-flink-docker/, so thanks Francesco!

Let’s get started:

docker-compose up

Data Generation

We’re going to generate data using the github.com/mneedham/livestream-data-generator repository, which simulates users joining and leaving live stream events. Once we’ve checked out that repository, we can install the dependencies:

pip install -r requirements.txt

And then run the data generator:

python loop.py \
  --timeout 1 \
  --users 1000 \
  --events 100 \
  --max-start-delay 0 \
  --min-event-length 60 \
  --max-event-length 180

We’ll see output similar to this:

Output
{"eventTime": "2023-01-23T10:15:24.089789", "eventId": "89653462-d58c-4751-974b-cc94d9fa9a11", "userId": "cf29d9e5-4f52-43cf-99c6-b6138ae612eb", "name": "Beverly Kelley", "lat": "51.5085", "lng": "-0.1257", "city": "London", "region": "England", "action": "Join"}
{"eventTime": "2023-01-23T10:15:24.048042", "eventId": "3bf77680-7664-44a2-b5eb-7281fb759999", "userId": "31ab115f-b7a4-48a6-a282-5e5a3c15ddf0", "name": "Jeffery Adams", "lat": "32.5530", "lng": "-92.0422", "city": "Monroe", "region": "Louisiana", "action": "Join"}
{"eventTime": "2023-01-23T10:15:24.033714", "eventId": "0283e8e6-14a7-4d8e-8aab-5d40d38eb52d", "userId": "0acad44c-2545-4c81-bd3f-33385d21160f", "name": "Alexander Fuller", "lat": "43.2501", "lng": "-79.8496", "city": "Hamilton", "region": "Ontario", "action": "Join"}
{"eventTime": "2023-01-23T10:15:23.979862", "eventId": "bf061b6c-b03d-43d7-9d04-f4f8c71a9ab0", "userId": "0a9f8527-a2b4-4c5f-b82b-2a3930182c62", "name": "Julie Grant", "lat": "35.6910", "lng": "139.7679", "city": "Tokyo", "region": "Tokyo", "action": "Join"}
{"eventTime": "2023-01-23T10:15:24.075753", "eventId": "868320eb-96b2-496f-af72-9cd83d9726c0", "userId": "fc20e945-f4ce-4c71-9a55-8f7253583c1c", "name": "Natalie Martinez", "lat": "39.9690", "lng": "-83.0114", "city": "Columbus", "region": "Ohio", "action": "Join"}

Ingesting data into Kafka

Let’s now ingest that data into Kafka, using the jq and kcat command line tools:

python loop.py \
  --timeout 1 \
  --users 1000 \
  --events 100 \
  --max-start-delay 0 \
  --min-event-length 60 \
  --max-event-length 180 |
jq -cr --arg sep 😊 '[.eventId, tostring] | join($sep)' |
kcat -P -b localhost:29092 -t events -K😊
Note

I’ve created a video showing how to ingest data using this technique on my YouTube channel, Learn Data with Mark.

We can also use kcat to check that the data has made its way into Kafka:

kcat -C -b localhost:29092 -t events -c3 | jq
Output
{
  "eventTime": "2023-01-24T11:13:05.213589",
  "eventId": "ebfec380-c3f1-4471-b30b-da822db57117",
  "userId": "983aa5c2-4f98-4507-937b-d60f41e1407e",
  "name": "Joanne Walters",
  "lat": "42.2399",
  "lng": "-83.1508",
  "city": "Dearborn",
  "region": "Michigan",
  "action": "Join"
}
{
  "eventTime": "2023-01-24T11:13:05.216398",
  "eventId": "9cc0a7f5-0638-4b27-a897-2a61ddab98ac",
  "userId": "1043a9d5-e722-4d86-91f6-8926046050d5",
  "name": "Michael Miller",
  "lat": "34.0498",
  "lng": "-117.4706",
  "city": "Fontana",
  "region": "California",
  "action": "Join"
}
{
  "eventTime": "2023-01-24T11:13:05.291973",
  "eventId": "2bb4a79b-f5ec-488e-84ee-72bf2fb8c293",
  "userId": "cd0e36ff-9171-41b5-91d2-bd32feaae17c",
  "name": "David Trujillo",
  "lat": "35.9139",
  "lng": "47.0239",
  "city": "Dīvāndarreh",
  "region": "Kordestān",
  "action": "Join"
}

So far, so good.

Now let’s say that we want to transform these events to have a nested structure like this:

{
    "event": {
        "time": "2023-01-24T11:13:05.213589",
        "id": "ebfec380-c3f1-4471-b30b-da822db57117"
    },
    "user": {
        "id": "983aa5c2-4f98-4507-937b-d60f41e1407e",
        "name": "Joanne Walters",
        "lat": "42.2399",
        "lng": "-83.1508",
        "city": "Dearborn",
        "region": "Michigan",
    },
    "action": "Join"
}

We’re going to do this using Flink, a popular stateful stream processor. We can interact with Flink via its SQL client:

docker exec -it flink-sql-client /opt/sql-client/sql-client.sh
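
By default the SQL client shows query results in an interactive viewer. If you’d rather have results printed inline, in the style of the snippets below, you can optionally change the result mode first:

SET 'sql-client.execution.result-mode' = 'tableau';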

Create a table on the events stream:

CREATE TABLE Events (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `eventTime` STRING,
  `eventId` STRING,
  `userId` STRING,
  `name` STRING,
  `lat` DOUBLE,
  `lng` DOUBLE,
  `city` STRING,
  `region` STRING,
  `action` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'eventsGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
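
If you want to sanity-check that the table has been registered with the expected schema, including the metadata columns, you can describe it:

DESCRIBE Events;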

We can query this table to view some of the events:

select eventTime, eventId, userId, name, lat, lng, city, region, action
FROM Events
LIMIT 5;
Table 1. Output

eventTime                   eventId                         userId                         name             lat       lng       city      region      action
2023-01-24T11:35:21.017729  6a8fbce6-aa20-46eb-a201-75807~  ff8663b0-4376-448a-a443-c14bb  Terri Simmons    -27.4679  153.0281  Brisbane  Queensland  Join
2023-01-24T11:35:21.037883  9d46a3ad-63f6-436e-9623-e2338~  63aa6903-9e37-4009-8571-bbf1c  Matthew Baldwin  35.691    139.7679  Tokyo     Tokyo       Join
2023-01-24T11:35:21.049036  dbb56840-130c-4356-9b36-ddec6~  3a748d48-7adb-45aa-8d9d-e0021  Leah Cook        35.0202   135.734   Kyoto     Kyoto       Join
2023-01-24T11:35:21.029667  6994ffa4-3c0d-44a6-9cc0-ab866~  3d322e87-f090-4484-8ea1-c7d15  Kendra Crawford  32.0809   34.7806   Tel Aviv  Tel Aviv    Join
2023-01-24T11:35:20.988061  3d443ffc-f1cd-4ee4-8675-ac99e~  a41f260f-5b57-4e69-9bb2-31715  Jeremy Jones     30.6667   104.0667  Chengdu   Sichuan     Join

We can use the map value constructor to massage this data into the nested structure. The following query does this:

SELECT map[
  'time', eventTime,
  'id', eventId
] AS event,
map [
  'id', userId,
  'name', name,
  'lat', lat,
  'lng', lng,
  'city', city,
  'region', region
] AS `user`,
action
FROM Events
LIMIT 1;

If we run this query, we’ll get the following exception:

Output
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Parameters must be of the same type

The problem is that the map constructor expects all of its values to be of the same type, and the lat and lng fields are doubles while everything else is a string. The easiest solution is to cast those values to strings, as shown below:

SELECT map[
  'time', eventTime,
  'id', eventId
] AS event,
map [
  'id', userId,
  'name', name,
  'lat', CAST(lat AS STRING),
  'lng', CAST(lng AS STRING),
  'city', city,
  'region', region
] AS `user`,
action
FROM Events
LIMIT 1;
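
Casting to strings is the quickest fix, but it does mean lat and lng lose their numeric type in the output. If you’d rather keep the original types, a ROW type is an alternative to a map, because each field of a ROW can have its own type. Here’s a rough sketch of that approach; the table, topic, and field names are my own choice and I haven’t tested this variant end to end, so depending on your Flink version you may need an explicit CAST to the target ROW type:

CREATE TABLE EventsNestedRow (
  `event` ROW<`time` STRING, `id` STRING>,
  `user` ROW<`id` STRING, `name` STRING, `lat` DOUBLE, `lng` DOUBLE, `city` STRING, `region` STRING>,
  `action` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'events-nested-row',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json'
);

INSERT INTO EventsNestedRow
SELECT
  ROW(eventTime, eventId),
  ROW(userId, name, lat, lng, city, region),
  action
FROM Events;

For the rest of this post, though, we’ll stick with the map approach.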

Now let’s create a table that exports its contents to another Kafka topic called events-nested. Note that the columns are declared in the same order as the SELECT clause we just wrote, because INSERT INTO maps columns by position:

CREATE TABLE EventsNested (
  `event` MAP<STRING,STRING>,
  `user` MAP<STRING,STRING>,
  `action` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'events-nested',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'testGroup',
  'value.format' = 'json'
);

And now let’s ingest the previous query into that table:

INSERT INTO EventsNested
SELECT map[
  'time', eventTime,
  'id', eventId
] AS event,
map [
  'id', userId,
  'name', name,
  'lat', CAST(lat AS STRING),
  'lng', CAST(lng AS STRING),
  'city', city,
  'region', region
] AS `user`,
action
FROM Events;
Output
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: f2e6f707a9c69b84f4d6e9ca7bc34fc6

We can then check that data is making its way into that topic using kcat:

kcat -C -b localhost:29092 -t events-nested -c1 | jq
Output
{
  "event": {
    "time": "2023-01-24T11:35:21.017729",
    "id": "6a8fbce6-aa20-46eb-a201-7580762c2a16"
  },
  "user": {
    "city": "Brisbane",
    "lng": "153.0281",
    "id": "ff8663b0-4376-448a-a443-c14bb0cee0cc",
    "region": "Queensland",
    "name": "Terri Simmons",
    "lat": "-27.4679"
  },
  "action": "Join"
}

Job done!
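
As an aside, because EventsNested is defined with the Kafka connector, it can also be read back from the SQL client. If you add a scan startup mode to its definition (for example 'scan.startup.mode' = 'earliest-offset', as we did for Events), a quick query confirms the round trip:

SELECT `event`['id'] AS eventId, `user`['name'] AS userName, action
FROM EventsNested
LIMIT 5;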

In Summary

The map structure is very helpful for creating nested structures, but it took me a little while to figure out how to use it. Hopefully this blog post will save you going through that same journey of exploration.
