· debezium til

Debezium: Capture changes from MySQL

I’ve been working on a Real-Time Analytics workshop that I’m going to be presenting at the ODSC Europe conference in June 2023 and I wanted to have Debezium publish records from a MySQL database without including the schema.

I’m using the debezium/connect:2.3 Docker image to run Debezium locally and I have a MySQL database running with the hostname mysql on port 3306. Below is the way that I configured this:

curl -X PUT \
  -H  "Content-Type:application/json" \
  http://localhost:8083/connectors/mysql/config \
    -d '{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": 3306,
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.name": "mysql",
    "database.server.id": "223344",
    "database.allowPublicKeyRetrieval": true,
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "mysql-history",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "mysql-schema-history",
    "database.include.list": "pizzashop",
    "time.precision.mode": "connect",
    "topic.prefix": "mysql",
    "include.schema.changes": false
}'

My database pizzashop contains the tables products and users, which will map to the following Kafka topics:

  • productsmysql.pizzashop.products

  • usersmysql.pizzashop.users

We can use kcat to see the messages published to these topics:

kcat -C -b localhost:29092 -t mysql.pizzashop.users -c1 -u | jq
Output
{
  "before": null,
  "after": {
    "id": 1,
    "first_name": "Kismat",
    "last_name": "Shroff",
    "email": "drishyamallick@hotmail.com",
    "residence": "575, Edwin Circle, Bahraich-255519",
    "lat": "TYu5Xg==",
    "lon": "Ac6oUak=",
    "created_at": "2023-05-31T08:44:19Z",
    "updated_at": 1685522659000
  },
  "source": {
    "version": "2.3.0.Beta1",
    "connector": "mysql",
    "name": "mysql",
    "ts_ms": 1685524951000,
    "snapshot": "first_in_data_collection",
    "db": "pizzashop",
    "sequence": null,
    "table": "users",
    "server_id": 0,
    "gtid": null,
    "file": "binlog.000002",
    "pos": 156,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "r",
  "ts_ms": 1685524951843,
  "transaction": null
}

We can then go to our MySQL table and update one of the products:

update products
set name = 'Super Awesome Moroccan Pasta Pizza - Veg'
where id =  1;

And then if we query the Kafka topic again, we’ll see this output:

Output
{
  "before": {
    "id": 1,
    "name": "Moroccan Spice Pasta Pizza - Veg",
    "description": "A pizza loaded with a spicy combination of Harissa sauce and delicious pasta.",
    "category": "veg pizzas",
    "price": 335,
    "image": "https://www.dominos.co.in//files/items/MoroccanSpicePPVG_N.jpg",
    "created_at": "2023-05-31T08:44:19Z",
    "updated_at": 1685522659000
  },
  "after": {
    "id": 1,
    "name": "Super Awesome Moroccan Pasta Pizza - Veg",
    "description": "A pizza loaded with a spicy combination of Harissa sauce and delicious pasta.",
    "category": "veg pizzas",
    "price": 335,
    "image": "https://www.dominos.co.in//files/items/MoroccanSpicePPVG_N.jpg",
    "created_at": "2023-05-31T08:44:19Z",
    "updated_at": 1685527662000
  },
  "source": {
    "version": "2.3.0.Beta1",
    "connector": "mysql",
    "name": "mysql",
    "ts_ms": 1685527662000,
    "snapshot": "false",
    "db": "pizzashop",
    "sequence": null,
    "table": "products",
    "server_id": 1,
    "gtid": null,
    "file": "binlog.000002",
    "pos": 414,
    "row": 0,
    "thread": 4714,
    "query": null
  },
  "op": "u",
  "ts_ms": 1685527691344,
  "transaction": null
}

Let’s now delete one record:

DELETE FROM pizzashop.products WHERE id =1;

And back to Kafka:

Output
{
  "before": {
    "id": 1,
    "name": "Super Awesome Moroccan Pasta Pizza - Veg",
    "description": "A pizza loaded with a spicy combination of Harissa sauce and delicious pasta.",
    "category": "veg pizzas",
    "price": 335,
    "image": "https://www.dominos.co.in//files/items/MoroccanSpicePPVG_N.jpg",
    "created_at": "2023-05-31T08:44:19Z",
    "updated_at": 1685527662000
  },
  "after": null,
  "source": {
    "version": "2.3.0.Beta1",
    "connector": "mysql",
    "name": "mysql",
    "ts_ms": 1685527882000,
    "snapshot": "false",
    "db": "pizzashop",
    "sequence": null,
    "table": "products",
    "server_id": 1,
    "gtid": null,
    "file": "binlog.000002",
    "pos": 1143,
    "row": 0,
    "thread": 4868,
    "query": null
  },
  "op": "d",
  "ts_ms": 1685527882035,
  "transaction": null
}
  • LinkedIn
  • Tumblr
  • Reddit
  • Google+
  • Pinterest
  • Pocket