Kafka Connect MQTT Example

Set Up Broker

Start the MQTT server:

docker compose --profile mqtt up -d

In one terminal, subscribe to the MQTT topics:

mosquitto_sub -h mosquitto -t "house/+/brightness"

In another terminal, publish messages:

mosquitto_pub -h mosquitto -t "house/room/brightness" -m "800LM"
mosquitto_pub -h mosquitto -t "house/kitchen/brightness" -m "1000LM"
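
The subscription above uses the single-level wildcard "+", so it receives both published messages. The following is a minimal sketch of how MQTT topic filters match topic names. The function name topic_matches is hypothetical and not part of any MQTT library, and the sketch ignores the special handling of topics that start with "$".

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT `topic_filter` (illustrative only)."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below it.
            return True
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # "+" matches exactly one level; anything else must match literally.
            return False
    # Without "#", filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


for name in ("house/room/brightness", "house/room/lamp/brightness"):
    print(name, topic_matches("house/+/brightness", name))
```

Because "+" covers exactly one level, a deeper topic such as house/room/lamp/brightness would not be delivered to this subscription.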

Create Source Connector

Payload (kafka-connect/requests/create-connector-mqtt-source.json):

{
  "name": "mqtt-source",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://mosquitto:1883",
    "mqtt.topics": "house/+/brightness",
    "kafka.topic": "connect.brightness",
    "mqtt.qos": "2",
    "confluent.topic.bootstrap.servers": "kafka1:9092",
    "confluent.topic.replication.factor": "1"
  }
}

Create the connector by posting the payload to the Kafka Connect REST API:

http kafka-connect:8083/connectors < kafka-connect/requests/create-connector-mqtt-source.json
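
For environments without httpie, the same request can be sketched with the Python standard library. This is an illustration, not part of the example project: the function names build_create_request and create_connector are hypothetical, and the base URL assumes the kafka-connect:8083 host used above.

```python
import json
import urllib.request

# Assumption: same Kafka Connect host and port as in the httpie command.
CONNECT_URL = "http://kafka-connect:8083"

# The connector payload shown earlier in this example.
PAYLOAD = {
    "name": "mqtt-source",
    "config": {
        "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
        "tasks.max": "1",
        "mqtt.server.uri": "tcp://mosquitto:1883",
        "mqtt.topics": "house/+/brightness",
        "kafka.topic": "connect.brightness",
        "mqtt.qos": "2",
        "confluent.topic.bootstrap.servers": "kafka1:9092",
        "confluent.topic.replication.factor": "1",
    },
}


def build_create_request(payload: dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request with a JSON body."""
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(payload: dict, base_url: str = CONNECT_URL) -> dict:
    """Send the request and return the decoded JSON response from Kafka Connect."""
    with urllib.request.urlopen(build_create_request(payload, base_url)) as response:
        return json.loads(response.read().decode("utf-8"))
```

Calling create_connector(PAYLOAD) sends the request. Building the request separately from sending it makes it easy to inspect the URL, method, and body before anything reaches the cluster.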

In one terminal, consume from Kafka:

kafka-console-consumer --from-beginning --group connect.mqtt \
                       --topic connect.brightness \
                       --bootstrap-server kafka1:9092 \
                       --property print.key=true
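
If the consumer shows no records, the connector status may help narrow things down. Kafka Connect exposes it at GET /connectors/mqtt-source/status, for example via http kafka-connect:8083/connectors/mqtt-source/status. The sketch below interprets that response. The helper connector_is_running is hypothetical, and the sample dictionaries are hand-written illustrations of the response shape, not captured output.

```python
def connector_is_running(status: dict) -> bool:
    """Return True if the connector and all of its tasks report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    # A connector with no tasks is not moving any data, so treat it as not running.
    tasks_ok = bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)
    return connector_ok and tasks_ok


# Illustrative status documents (assumed shape, not real output).
healthy = {
    "name": "mqtt-source",
    "connector": {"state": "RUNNING"},
    "tasks": [{"id": 0, "state": "RUNNING"}],
}
failed_task = {
    "name": "mqtt-source",
    "connector": {"state": "RUNNING"},
    "tasks": [{"id": 0, "state": "FAILED"}],
}

print(connector_is_running(healthy))
print(connector_is_running(failed_task))
```

Checking the tasks as well as the connector matters because a connector can report RUNNING while its only task has failed.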

In another terminal, publish new messages to the MQTT broker:

mosquitto_pub -h mosquitto -t "house/room/brightness" -m "810LM"
mosquitto_pub -h mosquitto -t "house/kitchen/brightness" -m "1020LM"

Delete the connector:

http DELETE kafka-connect:8083/connectors/mqtt-source