Kafka Connect Database Example
⚠️ This example does not support deletion, for that you have to implement tombstone events at the source and sink.
Populate the databases:
sql-populate --url "jdbc:mysql://localhost:3306/sandbox" --user "root" --password "notasecret" 100
Create the connectors using the API:
cd kafka-connect
http :8083/connectors < requests/create-connector-mysql-source.json
http :8083/connectors < requests/create-connector-mongo-sink.json
http :8083/connectors < requests/create-connector-postgres-sink.json
cd ..
For deleting the connectors:
http DELETE :8083/connectors/postgres-sink
http DELETE :8083/connectors/mongo-sink
http DELETE :8083/connectors/mysql-source
Requests
requests/create-connector-mysql-source.json
{
"name": "mysql-source",
"config": {
"tasks.max": "1",
"table.whitelist": "customers",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://mysql:3306/sandbox",
"connection.user": "root",
"connection.password": "notasecret",
"mode": "timestamp",
"timestamp.column.name": "created",
"topic.prefix": "kafka-connect.",
"transforms": "createKey",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "id"
}
}
requests/create-connector-mongo-sink.json
{
"name": "mongo-sink",
"config": {
"tasks.max": "1",
"topics": "kafka-connect.customers",
"collection": "customers",
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"connection.uri": "mongodb://root:notasecret@mongo:27017",
"database": "sandbox"
}
}
requests/create-connector-postgres-sink.json
{
"name": "postgres-sink",
"config": {
"tasks.max": "1",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://postgres:5432/sandbox",
"connection.user": "postgres",
"connection.password": "notasecret",
"delete.enabled": false,
"pk.mode": "record_key",
"pk.key": "id",
"insert.mode": "upsert",
"auto.create": true,
"topics": "kafka-connect.customers",
"transforms": "dropPrefix",
"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex": "kafka-connect\\.(.*)",
"transforms.dropPrefix.replacement": "$1"
}
}