Kafka Connect Database Example
In this example you are going to learn how to move data from a source (mysql), to a synk (postgres).
important
This example does not support deletion, for that you have to implement tombstone events at the source and sink.
Populate Database
Run MySQL and PostgreSQL:
docker compose --profile sql up -d
Populate it:
mysql --host=mysql --user=root --database=sandbox < kafka-connect/sql/customers.sql
That command should have created the table customers
and inserted 200 records.
Now you can open Adminer or run:
mysql --host=mysql --user=root --database=sandbox -e "select * from customers"
Create Source Connector
Check the installed plugins:
http kafka-connect:8083/connector-plugins
Now you have to hit the kafka connect rest service to create a new source, next you have the rest payload:
{
"name": "mysql-source",
"config": {
"tasks.max": "1",
"table.whitelist": "customers",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://mysql:3306/sandbox",
"connection.user": "root",
"connection.password": "notasecret",
"mode": "timestamp",
"timestamp.column.name": "created",
"topic.prefix": "connect.",
"transforms": "createKey",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "id"
}
}
Create the connector using the API:
http kafka-connect:8083/connectors < kafka-connect/requests/create-connector-mysql-source.json
If you open AKHQ you should see a new topic: connect.customers
.
Create Sink Connector
Payload:
{
"name": "postgres-sink",
"config": {
"tasks.max": "1",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://postgres:5432/sandbox",
"connection.user": "postgres",
"connection.password": "notasecret",
"delete.enabled": false,
"pk.mode": "record_key",
"pk.key": "id",
"insert.mode": "upsert",
"auto.create": true,
"topics": "connect.customers",
"transforms": "dropPrefix",
"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex": "connect\\.(.*)",
"transforms.dropPrefix.replacement": "$1"
}
}
Create sink connector:
http kafka-connect:8083/connectors < kafka-connect/requests/create-connector-postgres-sink.json
This sink connector is going to create a table customers
on postgres and insert all records.
Now you can open Adminer or run:
psql --host=postgres --user=postgres --dbname=sandbox -c "select * from customers"
List connector:
http kafka-connect:8083/connectors
Deleting Connectors
http DELETE kafka-connect:8083/connectors/postgres-sink
http DELETE kafka-connect:8083/connectors/mysql-source