Kafka Connect MQTT Example
Set Up Broker
Start the MQTT server:
docker compose --profile mqtt up -d
In one terminal, subscribe to the MQTT topics:
mosquitto_sub -h mosquitto -t "house/+/brightness"
In another terminal, publish messages:
mosquitto_pub -h mosquitto -t "house/room/brightness" -m "800LM"
mosquitto_pub -h mosquitto -t "house/kitchen/brightness" -m "1000LM"
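The `+` in the subscription above is MQTT's single-level wildcard: it matches exactly one topic level, so `house/+/brightness` receives both published messages. As a rough illustration of that matching rule (not how Mosquitto implements it), here is a minimal Python sketch. `topic_matches` is a hypothetical helper, and it deliberately ignores the special handling of topics that start with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic name matches a topic filter (simplified)."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last filter level,
            # where it matches the parent level and everything below it.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            # A literal level must match exactly; "+" matches any single level.
            return False

    # Without "#", the filter and the topic must have the same depth.
    return len(filter_levels) == len(topic_levels)
```

Under this sketch, both topics published above match `house/+/brightness`, while a deeper topic such as `house/room/lamp/brightness` does not.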
Create Source Connector
Payload:
{
"name": "mqtt-source",
"config": {
"connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max": "1",
"mqtt.server.uri": "tcp://mosquitto:1883",
"mqtt.topics": "house/+/brightness",
"kafka.topic": "connect.brightness",
"mqtt.qos": "2",
"confluent.topic.bootstrap.servers": "kafka1:9092",
"confluent.topic.replication.factor": "1"
}
}
Create the connector using the Kafka Connect REST API:
http kafka-connect:8083/connectors < kafka-connect/requests/create-connector-mqtt-source.json
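For readers who want to script this step instead of using the `http` command, the same request can be sketched with Python's standard library. This is only an illustration: `create_connector_request` and `send` are hypothetical helper names, and the base URL assumes the worker is reachable as `http://kafka-connect:8083`, as in the command above.

```python
import json
import urllib.request


def create_connector_request(base_url: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) a POST request to the /connectors endpoint."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request: urllib.request.Request) -> dict:
    """Send the request and decode the JSON response body.

    urlopen raises urllib.error.HTTPError on 4xx/5xx responses.
    """
    with urllib.request.urlopen(request) as response:
        return json.load(response)


# Usage (needs the Kafka Connect worker from this example to be running):
# with open("kafka-connect/requests/create-connector-mqtt-source.json") as f:
#     print(send(create_connector_request("http://kafka-connect:8083", json.load(f))))
```

Keeping the request construction separate from sending makes it possible to inspect the URL, method, and body without a running worker.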
In one terminal, consume from Kafka:
kafka-console-consumer --from-beginning --group connect.mqtt \
--topic connect.brightness \
--bootstrap-server kafka1:9092 \
--property print.key=true
In another terminal, publish new messages to the MQTT broker:
mosquitto_pub -h mosquitto -t "house/room/brightness" -m "810LM"
mosquitto_pub -h mosquitto -t "house/kitchen/brightness" -m "1020LM"
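With `print.key=true`, the console consumer prints each record's key and value on one line, separated by a tab by default. Assuming the connector writes the MQTT topic as the record key and the worker's converters render the payload as plain text (neither is shown in this example, so treat both as assumptions), a consumed line could be split as in the sketch below. `Reading` and `parse_record` are hypothetical names.

```python
from typing import NamedTuple


class Reading(NamedTuple):
    room: str
    value: str


def parse_record(line: str, separator: str = "\t") -> Reading:
    """Split one console-consumer line into the room name and the raw payload.

    Assumes the key looks like "house/<room>/brightness".
    """
    key, _, value = line.rstrip("\n").partition(separator)
    levels = key.split("/")
    if len(levels) != 3 or levels[0] != "house" or levels[2] != "brightness":
        raise ValueError(f"unexpected key: {key!r}")
    return Reading(room=levels[1], value=value)
```

If the actual output differs (for example, because the worker uses a different value converter), the split on the separator still applies, but the value would need further decoding.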
Delete the connector:
http DELETE kafka-connect:8083/connectors/mqtt-source
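The delete call can be scripted the same way. The sketch below uses only Python's standard library; `delete_connector_request` and `send` are hypothetical helper names, and the base URL again assumes `http://kafka-connect:8083`.

```python
import urllib.request


def delete_connector_request(base_url: str, name: str) -> urllib.request.Request:
    """Build (but do not send) a DELETE request for a single connector."""
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}",
        method="DELETE",
    )


def send(request: urllib.request.Request) -> int:
    """Send the request and return the HTTP status code.

    urlopen raises urllib.error.HTTPError on 4xx/5xx responses,
    for example when the connector does not exist.
    """
    with urllib.request.urlopen(request) as response:
        return response.status


# Usage (needs the Kafka Connect worker from this example to be running):
# send(delete_connector_request("http://kafka-connect:8083", "mqtt-source"))
```

A successful delete is expected to return an empty response with a 2xx status, so the sketch returns the status code rather than parsing a body.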