Kafka Connect Database Example

⚠️ This example does not support deletion, for that you have to implement tombstone events at the source and sink.

Populate the databases:

sql-populate --url "jdbc:mysql://localhost:3306/sandbox" --user "root" --password "notasecret" 100

Create the connectors using the API:

cd kafka-connect
http :8083/connectors < requests/create-connector-mysql-source.json
http :8083/connectors < requests/create-connector-mongo-sink.json
http :8083/connectors < requests/create-connector-postgres-sink.json
cd ..

For deleting the connectors:

http DELETE :8083/connectors/postgres-sink
http DELETE :8083/connectors/mongo-sink
http DELETE :8083/connectors/mysql-source

Requests

requests/create-connector-mysql-source.json

{
  "name": "mysql-source",
  "config": {
    "tasks.max": "1",
    "table.whitelist": "customers",
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://mysql:3306/sandbox",
    "connection.user": "root",
    "connection.password": "notasecret",
    "mode": "timestamp",
    "timestamp.column.name": "created",
    "topic.prefix": "kafka-connect.",
    "transforms": "createKey",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "id"
  }
}

requests/create-connector-mongo-sink.json

{
  "name": "mongo-sink",
  "config": {
    "tasks.max": "1",
    "topics": "kafka-connect.customers",
    "collection": "customers",
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "connection.uri": "mongodb://root:notasecret@mongo:27017",
    "database": "sandbox"
  }
}

requests/create-connector-postgres-sink.json

{
  "name": "postgres-sink",
  "config": {
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/sandbox",
    "connection.user": "postgres",
    "connection.password": "notasecret",
    "delete.enabled": false,
    "pk.mode": "record_key",
    "pk.key": "id",
    "insert.mode": "upsert",
    "auto.create": true,
    "topics": "kafka-connect.customers",
    "transforms": "dropPrefix",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.regex": "kafka-connect\\.(.*)",
    "transforms.dropPrefix.replacement": "$1"
  }
}