
Flink SQL: Could not execute SQL statement. Reason: java.io.IOException: Corrupt Debezium JSON message

As part of a JFokus workshop that I’m working on, I wanted to create a Flink table on top of a Kafka stream that I’d populated from MySQL using Debezium. In this blog post I’ll show how to do this and explain an error that I encountered along the way.

To start, we have a products table in MySQL that’s publishing events to Apache Kafka. We can see the fields in this event by running the following command:

kcat -C -b localhost:29092 -t mysql.pizzashop.products -c1 | jq 'keys'
[
  "payload",
  "schema"
]

Let’s first have a look at the payload:

kcat -C -b localhost:29092 -t mysql.pizzashop.products -c1 | jq '.payload'
Output
{
  "before": null,
  "after": {
    "id": 1,
    "name": "Moroccan Spice Pasta Pizza - Veg",
    "description": "A pizza loaded with a spicy combination of Harissa sauce and delicious pasta.",
    "category": "veg pizzas",
    "price": 335,
    "image": "https://www.dominos.co.in//files/items/MoroccanSpicePPVG_N.jpg",
    "created_at": "2023-01-24T12:53:48Z",
    "updated_at": 1674564828000
  },
  "source": {
    "version": "1.8.1.Final",
    "connector": "mysql",
    "name": "mysql",
    "ts_ms": 1674565167817,
    "snapshot": "true",
    "db": "pizzashop",
    "sequence": null,
    "table": "products",
    "server_id": 0,
    "gtid": null,
    "file": "binlog.000002",
    "pos": 156,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "r",
  "ts_ms": 1674565167827,
  "transaction": null
}

before is null because this event has op set to r, a read taken during Debezium’s initial snapshot, so there is no earlier row state to report. If we’d updated this record, we’d see a before entry containing the same fields that appear under the after property.
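To make the relationship between op, before, and after a bit more concrete, here is a minimal Python sketch. The describe_change helper and the update event are hypothetical and only there for illustration; the snapshot event is a trimmed copy of the payload above.

```python
import json

# Operation codes used in the Debezium change event envelope.
OP_NAMES = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def describe_change(payload):
    """Summarise a Debezium payload using its op, before and after fields."""
    op = OP_NAMES.get(payload["op"], payload["op"])
    before, after = payload.get("before"), payload.get("after")
    if before is None and after is not None:
        # Snapshot reads and inserts carry only the new row state.
        return f"{op}: new row state with id={after['id']}"
    if before is not None and after is not None:
        # Updates carry both states, so we can list the fields that differ.
        changed = sorted(k for k in after if before.get(k) != after[k])
        return f"{op}: id={after['id']} changed {changed}"
    if before is not None:
        # Deletes carry only the old row state.
        return f"{op}: removed row id={before['id']}"
    return f"{op}: no row state"


# Trimmed copy of the snapshot event shown above.
snapshot = json.loads('{"before": null, "after": {"id": 1, "price": 335}, "op": "r"}')
# Hypothetical update event, invented for illustration only.
update = {"before": {"id": 1, "price": 335}, "after": {"id": 1, "price": 349}, "op": "u"}

print(describe_change(snapshot))
print(describe_change(update))
```

The first call reports a snapshot read with only a new row state, while the second lists price as the changed field.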

And now, we’ll zoom in on the schema:

kcat -C -b localhost:29092 -t mysql.pizzashop.products -c1 | jq -c '.schema'
Output
{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"string","optional":true,"field":"category"},{"type":"double","optional":true,"field":"price"},{"type":"string","optional":true,"field":"image"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"created_at"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"default":0,"field":"updated_at"}],"optional":true,"name":"mysql.pizzashop.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"string","optional":true,"field":"category"},{"type":"double","optional":true,"field":"price"},{"type":"string","optional":true,"field":"image"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"created_at"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"default":0,"field":"updated_at"}],"optional":true,"name":"mysql.pizzashop.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql.pizzashop.products.Envelope"}

There’s a lot there, so let’s get a list of the top-level fields in the schema:

kcat -C -b localhost:29092 -t mysql.pizzashop.products -c1 | jq -c '.schema .fields[] .field'
Output
"before"
"after"
"source"
"op"
"ts_ms"
"transaction"

Let’s select the schema for the before field:

kcat -C -b localhost:29092 -t mysql.pizzashop.products -c1 | jq '.schema .fields[] | select(.field == "before")'
Output
{
  "type": "struct",
  "fields": [
    {
      "type": "int64",
      "optional": false,
      "field": "id"
    },
    {
      "type": "string",
      "optional": true,
      "field": "name"
    },
    {
      "type": "string",
      "optional": true,
      "field": "description"
    },
    {
      "type": "string",
      "optional": true,
      "field": "category"
    },
    {
      "type": "double",
      "optional": true,
      "field": "price"
    },
    {
      "type": "string",
      "optional": true,
      "field": "image"
    },
    {
      "type": "string",
      "optional": true,
      "name": "io.debezium.time.ZonedTimestamp",
      "version": 1,
      "field": "created_at"
    },
    {
      "type": "int64",
      "optional": true,
      "name": "org.apache.kafka.connect.data.Timestamp",
      "version": 1,
      "default": 0,
      "field": "updated_at"
    }
  ],
  "optional": true,
  "name": "mysql.pizzashop.products.Value",
  "field": "before"
}
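As far as I understand, Flink’s debezium-json format matches table columns by name to the fields inside before and after, so this schema is a handy starting point for column definitions. The sketch below derives candidate columns from a trimmed copy of the schema. The CONNECT_TO_FLINK mapping and the suggest_columns helper are my own assumptions for illustration, not part of Flink or Debezium.

```python
# Assumed mapping from the type names in the Kafka Connect JSON schema
# to Flink SQL types. Only the types seen in this post are covered.
CONNECT_TO_FLINK = {
    "int32": "INT",
    "int64": "BIGINT",
    "double": "DOUBLE",
    "boolean": "BOOLEAN",
    "string": "STRING",
}


def suggest_columns(struct_schema):
    """Return one candidate Flink column definition per schema field."""
    columns = []
    for field in struct_schema["fields"]:
        # Assumption: fall back to STRING for any type we have not mapped.
        flink_type = CONNECT_TO_FLINK.get(field["type"], "STRING")
        columns.append(f"`{field['field']}` {flink_type}")
    return columns


# Trimmed copy of the schema for the before field shown above.
before_schema = {
    "type": "struct",
    "fields": [
        {"type": "int64", "optional": False, "field": "id"},
        {"type": "string", "optional": True, "field": "name"},
        {"type": "double", "optional": True, "field": "price"},
        {"type": "string", "optional": True,
         "name": "io.debezium.time.ZonedTimestamp", "version": 1,
         "field": "created_at"},
    ],
}

print(",\n".join(suggest_columns(before_schema)))
```

The column names come out exactly as Debezium spells them (created_at rather than createdAt), which matters if columns are matched by name.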

Next, we’re going to launch the Flink CLI and create a Products table with the mysql.pizzashop.products topic as its source:

CREATE TABLE Products (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `id` STRING,
  `name` STRING,
  `description` STRING,
  `category` STRING,
  `price` DOUBLE,
  `image` STRING,
  `created_at` STRING  -- column names must match the field names inside before/after
) WITH (
  'connector' = 'kafka',
  'topic' = 'mysql.pizzashop.products',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);
Output
[INFO] Execute statement succeed.

Now let’s try to query the table:

SELECT *
FROM Products;

This results in the following error:

Output
[ERROR] Could not execute SQL statement. Reason:
java.io.IOException: Corrupt Debezium JSON message '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"string","optional":true,"field":"category"},{"type":"double","optional":true,"field":"price"},{"type":"string","optional":true,"field":"image"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"created_at"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"default":0,"field":"updated_at"}],"optional":true,"name":"mysql.pizzashop.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"string","optional":true,"field":"category"},{"type":"double","optional":true,"field":"price"},{"type":"string","optional":true,"field":"image"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"created_at"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"default":0,"field":"updated_at"}],"optional":true,"name":"mysql.pizzashop.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql.pizzashop.products.Envelope"},"payload":{"before":null,"after":{"id":1,"name":"Moroccan Spice Pasta Pizza - Veg","description":"A pizza loaded with a spicy combination of Harissa sauce and delicious pasta.","category":"veg pizzas","price":335.0,"image":"https://www.dominos.co.in//files/items/MoroccanSpicePPVG_N.jpg","created_at":"2023-01-24T12:53:48Z","updated_at":1674564828000},"source":{"version":"1.8.1.Final","connector":"mysql","name":"mysql","ts_ms":1674565167817,"snapshot":"true","db":"pizzashop","sequence":null,"table":"products","server_id":0,"gtid":null,"file":"binlog.000002","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1674565167827,"transaction":null}}'.

A bit of Googling led me to a Stack Overflow question, but my issue didn’t match the ones discussed in that thread. The message itself isn’t corrupt: our events wrap the change data in schema and payload fields, and we need to tell Flink about that with an extra table property, as described in the Flink documentation:

In order to interpret such messages, you need to add the option 'debezium-json.schema-include' = 'true' into above DDL WITH clause (false by default). Usually, this is not recommended to include schema because this makes the messages very verbose and reduces parsing performance.
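To see why the option matters, here is a simplified Python model of the behaviour. It is not Flink’s actual implementation: the extract_envelope function and its schema_include parameter are hypothetical stand-ins that I’m using to mimic the effect of the option.

```python
import json

# Fields a parser needs from the Debezium envelope to build a change row.
ENVELOPE_KEYS = {"before", "after", "op"}


def extract_envelope(raw, schema_include):
    """Return the Debezium envelope, unwrapping payload when asked to."""
    message = json.loads(raw)
    # With the schema included, the envelope sits one level down in payload.
    envelope = message["payload"] if schema_include else message
    missing = ENVELOPE_KEYS - envelope.keys()
    if missing:
        raise ValueError(f"not a Debezium envelope, missing {sorted(missing)}")
    return envelope


# Trimmed stand-in for the message on the mysql.pizzashop.products topic.
raw = json.dumps({
    "schema": {"type": "struct", "name": "mysql.pizzashop.products.Envelope"},
    "payload": {"before": None, "after": {"id": 1}, "op": "r"},
})

print(extract_envelope(raw, schema_include=True)["op"])

try:
    # Reading the top level finds only schema and payload, so parsing fails.
    extract_envelope(raw, schema_include=False)
except ValueError as error:
    print(error)
```

Without unwrapping, the parser sees only the schema and payload keys at the top level, which is roughly the situation behind the "Corrupt Debezium JSON message" error above.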

Let’s fix our table by dropping it and recreating it with 'debezium-json.schema-include' = 'true' added to the WITH clause of the CREATE TABLE statement:

DROP TABLE Products;

CREATE TABLE Products (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `id` STRING,
  `name` STRING,
  `description` STRING,
  `category` STRING,
  `price` DOUBLE,
  `image` STRING,
  `created_at` STRING  -- column names must match the field names inside before/after
) WITH (
  'connector' = 'kafka',
  'topic' = 'mysql.pizzashop.products',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json',
  'debezium-json.schema-include' = 'true'
);

If we query the table again, this time it works!
