Kafka Connect Database Example

In this example you will learn how to move data from a source database (MySQL) to a sink database (PostgreSQL) using Kafka Connect.

important

This example does not handle deletions. To propagate deletes you would need to emit tombstone events at the source and enable delete handling at the sink.

Populate Database

Run MySQL and PostgreSQL:

docker compose --profile sql up -d
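To confirm both database containers are up before continuing, you can list the running services for that profile (the exact output depends on your compose file):

docker compose --profile sql ps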

Populate it:

mysql --host=mysql --user=root --database=sandbox < kafka-connect/sql/customers.sql

That command should have created the table customers and inserted 200 records.

Now you can open Adminer or run:

mysql --host=mysql --user=root --database=sandbox -e "select * from customers"
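To double-check that all 200 rows made it in, a quick count query (using the same connection parameters as above) is:

mysql --host=mysql --user=root --database=sandbox -e "select count(*) from customers"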

Create Source Connector

Check the installed plugins:

http kafka-connect:8083/connector-plugins
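If you have jq installed, you can narrow the response down to just the plugin class names; the JDBC source and sink connectors should appear in the list:

http kafka-connect:8083/connector-plugins | jq '.[].class'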

Now call the Kafka Connect REST service to create a new source connector. The request payload is:

{ "name": "mysql-source", "config": { "tasks.max": "1", "table.whitelist": "customers", "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/sandbox", "connection.user": "root", "connection.password": "notasecret", "mode": "timestamp", "timestamp.column.name": "created", "topic.prefix": "connect.", "transforms": "createKey", "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey", "transforms.createKey.fields": "id" } }

Create the connector using the API:

http kafka-connect:8083/connectors < kafka-connect/requests/create-connector-mysql-source.json
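To verify the connector and its task started correctly, check the status endpoint; both the connector and its task should report RUNNING:

http kafka-connect:8083/connectors/mysql-source/status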

If you open AKHQ you should see a new topic: connect.customers.
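You can also peek at the topic from the command line. A minimal check, assuming the Kafka CLI tools are available and a broker is reachable at kafka:9092 (adjust the address to your setup):

kafka-console-consumer --bootstrap-server kafka:9092 --topic connect.customers --from-beginning --max-messages 5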

Create Sink Connector

Now create a sink connector that reads from connect.customers and writes to PostgreSQL. The request payload is:

{ "name": "postgres-sink", "config": { "tasks.max": "1", "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url": "jdbc:postgresql://postgres:5432/sandbox", "connection.user": "postgres", "connection.password": "notasecret", "delete.enabled": false, "pk.mode": "record_key", "pk.key": "id", "insert.mode": "upsert", "auto.create": true, "topics": "connect.customers", "transforms": "dropPrefix", "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.dropPrefix.regex": "connect\\.(.*)", "transforms.dropPrefix.replacement": "$1" } }

Create sink connector:

http kafka-connect:8083/connectors < kafka-connect/requests/create-connector-postgres-sink.json
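As with the source, confirm the sink connector and its task are healthy:

http kafka-connect:8083/connectors/postgres-sink/status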

This sink connector will create a customers table in PostgreSQL and upsert every record from the connect.customers topic.

Now you can open Adminer or run:

psql --host=postgres --user=postgres --dbname=sandbox -c "select * from customers"
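The row count in PostgreSQL should match the 200 records loaded into MySQL:

psql --host=postgres --user=postgres --dbname=sandbox -c "select count(*) from customers"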

List connectors:

http kafka-connect:8083/connectors
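On recent Kafka Connect versions the list endpoint also accepts an expand query parameter, so you can fetch the status of every connector in one call (httpie passes query parameters with ==):

http kafka-connect:8083/connectors expand==status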

Deleting Connectors

http DELETE kafka-connect:8083/connectors/postgres-sink

http DELETE kafka-connect:8083/connectors/mysql-source