Kafka Connect Database Example

In this example you will learn how to move data from a source (MySQL) to a sink (PostgreSQL).

Important

This example does not support deletion. To support it, you would have to implement tombstone events at both the source and the sink.

Populate Database

Run MySQL and PostgreSQL:

docker compose --profile sql up -d

Populate MySQL:

mysql --host=mysql --user=root --database=sandbox < kafka-connect/sql/customers.sql

That command should have created the customers table and inserted 200 records.

Now you can open Adminer or run:

mysql --host=mysql --user=root --database=sandbox -e "select * from customers"

Create Source Connector

Check the installed plugins:

http kafka-connect:8083/connector-plugins
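
To confirm that both JDBC connector classes used in this example are installed, the plugin list can also be checked from a script. The following is a minimal Python sketch, not part of the original example. It assumes the worker is reachable at kafka-connect:8083 and relies on the class field that each entry of the /connector-plugins response carries. The helper names missing_plugins and fetch_plugins are hypothetical, and the sample response is made up.

```python
import json
import urllib.request

# Connector classes referenced by the payloads in this example.
REQUIRED = {
    "io.confluent.connect.jdbc.JdbcSourceConnector",
    "io.confluent.connect.jdbc.JdbcSinkConnector",
}


def missing_plugins(plugins, required=REQUIRED):
    """Return the required connector classes absent from a /connector-plugins response."""
    installed = {plugin["class"] for plugin in plugins}
    return sorted(set(required) - installed)


def fetch_plugins(base_url="http://kafka-connect:8083"):
    """GET /connector-plugins (assumes the worker is reachable at base_url)."""
    with urllib.request.urlopen(f"{base_url}/connector-plugins") as response:
        return json.load(response)


if __name__ == "__main__":
    # Made-up sample so the sketch runs without a live worker;
    # inside the sandbox, use fetch_plugins() instead.
    sample = [
        {"class": "io.confluent.connect.jdbc.JdbcSourceConnector", "type": "source"},
    ]
    print(missing_plugins(sample))
```

An empty list means both connector classes are available.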

Next, call the Kafka Connect REST service to create a new source connector. This is the request payload:

{
  "name": "mysql-source",
  "config": {
    "tasks.max": "1",
    "table.whitelist": "customers",
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://mysql:3306/sandbox",
    "connection.user": "root",
    "connection.password": "notasecret",
    "mode": "timestamp",
    "timestamp.column.name": "created",
    "topic.prefix": "connect.",
    "transforms": "createKey",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "id"
  }
}
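
The mode and timestamp.column.name settings mean the connector only picks up rows whose created value is newer than the last one it has already read. As a rough illustration of that idea, here is a simplified Python model using an in-memory SQLite table. It is not the connector's actual implementation; the table layout, the poll helper, and the sample rows are invented for the sketch.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, created TEXT)")
conn.executemany(
    "INSERT INTO customers VALUES (?, ?, ?)",
    [(1, "ann", "2024-01-01 10:00:00"), (2, "bob", "2024-01-01 10:05:00")],
)


def poll(connection, last_seen):
    """Fetch rows newer than last_seen, oldest first, and return them with the new offset."""
    rows = connection.execute(
        "SELECT id, name, created FROM customers WHERE created > ? ORDER BY created ASC",
        (last_seen,),
    ).fetchall()
    new_offset = rows[-1][2] if rows else last_seen
    return rows, new_offset


offset = ""  # nothing read yet; sorts before every timestamp string
first, offset = poll(conn, offset)   # both existing rows
second, offset = poll(conn, offset)  # nothing new since the last poll

conn.execute("INSERT INTO customers VALUES (3, 'cy', '2024-01-01 10:10:00')")
third, offset = poll(conn, offset)   # only the newly inserted row

print(len(first), len(second), len(third))  # 2 0 1
```

Because only the created column is tracked, a deleted row never shows up in a poll, which is consistent with the note about deletion at the top.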

Create the connector using the API:

http kafka-connect:8083/connectors < kafka-connect/requests/create-connector-mysql-source.json

If you open AKHQ you should see a new topic: connect.customers.
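
If the topic does not appear, the status endpoint of the Kafka Connect REST API (/connectors/mysql-source/status) shows whether the connector and its tasks are running. The sketch below is one possible way to read that response in Python. It assumes the worker address used elsewhere in this example; fetch_status and is_healthy are hypothetical helper names, and the sample response is made up.

```python
import json
import urllib.request


def fetch_status(name, base_url="http://kafka-connect:8083"):
    """GET /connectors/<name>/status (assumes the worker is reachable at base_url)."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)


def is_healthy(status):
    """True when the connector and every listed task report the RUNNING state."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


if __name__ == "__main__":
    # Made-up sample so the sketch runs without a live worker;
    # inside the sandbox, use fetch_status("mysql-source") instead.
    sample = {
        "name": "mysql-source",
        "connector": {"state": "RUNNING", "worker_id": "kafka-connect:8083"},
        "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "kafka-connect:8083"}],
        "type": "source",
    }
    print(is_healthy(sample))
```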

Create Sink Connector

Payload:

{
  "name": "postgres-sink",
  "config": {
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/sandbox",
    "connection.user": "postgres",
    "connection.password": "notasecret",
    "delete.enabled": false,
    "pk.mode": "record_key",
    "pk.fields": "id",
    "insert.mode": "upsert",
    "auto.create": true,
    "topics": "connect.customers",
    "transforms": "dropPrefix",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.regex": "connect\\.(.*)",
    "transforms.dropPrefix.replacement": "$1"
  }
}
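
The dropPrefix transform is what makes the sink write to a table named customers instead of connect.customers. RegexRouter renames a topic only when the whole topic name matches the regex, and then applies the replacement. The Python sketch below models that behaviour for illustration; it is not the transform's own code, the route function is a hypothetical name, and Python writes the capture group as \1 where the connector config uses $1.

```python
import re


def route(topic, regex=r"connect\.(.*)", replacement=r"\1"):
    """Rename the topic when the whole name matches regex; otherwise leave it unchanged."""
    if re.fullmatch(regex, topic):
        return re.sub(regex, replacement, topic, count=1)
    return topic


print(route("connect.customers"))  # customers
print(route("orders"))             # orders (no match, left as-is)
```

Note that the doubled backslash in the JSON payload is JSON escaping; the regex the transform receives is connect\.(.*).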

Create the sink connector:

http kafka-connect:8083/connectors < kafka-connect/requests/create-connector-postgres-sink.json

This sink connector creates a customers table in PostgreSQL and inserts all the records.
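
With insert.mode set to upsert and the record key used as primary key, a record whose id already exists updates that row instead of adding a duplicate. The following Python sketch illustrates the effect with an in-memory SQLite table and an INSERT ... ON CONFLICT statement. It is an illustration under assumptions, not the SQL the connector actually issues; the two-column table and the sample records are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT)")

# Insert a new row, or overwrite the existing row that has the same primary key.
UPSERT = (
    "INSERT INTO customers (id, name) VALUES (?, ?) "
    "ON CONFLICT (id) DO UPDATE SET name = excluded.name"
)

# The third record reuses key 1, so it replaces the first one.
for record_key, name in [(1, "ann"), (2, "bob"), (1, "ann smith")]:
    conn.execute(UPSERT, (record_key, name))

rows = conn.execute("SELECT id, name FROM customers ORDER BY id").fetchall()
print(rows)  # [(1, 'ann smith'), (2, 'bob')]
```

This is also why replaying the topic does not produce duplicate rows in the sink table.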

Now you can open Adminer or run:

psql --host=postgres --user=postgres --dbname=sandbox -c "select * from customers"

List the connectors:

http kafka-connect:8083/connectors

Deleting Connectors

http DELETE kafka-connect:8083/connectors/postgres-sink
http DELETE kafka-connect:8083/connectors/mysql-source