Introduction
Kafka Sandbox helps you deploy a Kafka sandbox locally. It aims to be a simple way to get started with Kafka and to support you on your learning path. It bundles a wide variety of tools from the Kafka ecosystem, a simple way to run them all, and tips that make Kafka easier to use. It does not include security, since it is not a production system.
Interesting Links
Install Dependencies
- docker - version 20 or higher
- docker compose - version 2 or higher
- java - version 11 or higher
- httpie - rest client
- jq - json parser
Other Useful Utilities
- curl - command line http client
- lazydocker - docker text user interface
- kaskade - kafka text user interface
Quick Start
Clone the repo:
git clone https://github.com/sauljabin/kafka-sandbox.git
cd kafka-sandbox
Create a docker network:
docker network create kafka-sandbox_network
Run the kafka cluster:
cd kafka-cluster
docker compose up -d
cd ..
Run AKHQ:
cd kafka-akhq
docker compose up -d
cd ..
Open AKHQ at http://localhost:8080/
Kafka CLI Tools
A collection of tools for interacting with the Kafka cluster through the terminal.
- kafkacat
- confluent community tools
- project location: kafka-cli
⚠️ Run these commands inside the root folder.
Create an alias for kafka-cli:
alias kafka-cli='docker run --rm -it --network kafka-sandbox_network kafka-cli:latest '
To permanently add the alias to your shell (~/.bashrc or ~/.zshrc file):
echo "alias kafka-cli='docker run --rm -it --network kafka-sandbox_network kafka-cli:latest '" >> ~/.zshrc
Create the docker image:
cd kafka-cli
docker build -t kafka-cli:latest .
kafka-cli
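The image bundles kafkacat and the Confluent community command line tools. As a quick sanity check that does not require a running cluster, you can print the version of the bundled Kafka tools:
kafka-cli kafka-topics --version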
SQL Database
Create MySQL and PostgreSQL instances, each with a database.
- mysql
- postgres
- adminer
- project location: sql-database
- postgres port: 5432
- mysql port: 3306
- adminer port: 9090 (open it in the web browser)
Run MySQL, PostgreSQL, and Adminer:
cd sql-database
docker compose up -d
cd ..
Docker Compose
services:
mysql:
image: mysql:8
environment:
MYSQL_DATABASE: sandbox
MYSQL_ROOT_PASSWORD: notasecret
ports:
- "3306:3306"
restart: on-failure
volumes:
- mysql_data:/var/lib/mysql
postgres:
image: postgres:13
environment:
POSTGRES_DB: sandbox
POSTGRES_PASSWORD: notasecret
ports:
- "5432:5432"
restart: on-failure
volumes:
- postgres_data:/var/lib/postgresql/data
adminer:
image: adminer:4
ports:
- "9090:8080"
restart: on-failure
volumes:
mysql_data:
postgres_data:
networks:
default:
external: true
name: kafka-sandbox_network
SQL Populate Database
This tool populates either a MySQL or PostgreSQL database with random customers. It is an ancillary project that helps set up different scenarios.
- project location: sql-populate
⚠️ Run these commands inside the root folder.
Create an alias for sql-populate:
alias sql-populate="$PWD/sql-populate/build/install/sql-populate/bin/sql-populate "
To permanently add the alias to your shell (~/.bashrc or ~/.zshrc file):
echo "alias sql-populate='$PWD/sql-populate/build/install/sql-populate/bin/sql-populate '" >> ~/.zshrc
Install the app:
./gradlew sql-populate:install
sql-populate
Examples:
sql-populate --url "jdbc:mysql://localhost:3306/sandbox" --user "root" --password "notasecret" 100
sql-populate --url "jdbc:postgresql://localhost:5432/sandbox" --user "postgres" --password "notasecret" 100
NoSQL Database
Create a MongoDB instance and a database.
- mongo
- mongo express
- project location: nosql-database
- mongo port: 27017
- mongo express port: 7070 (open it in the web browser)
Run MongoDB and Mongo Express:
cd nosql-database
docker compose up -d
cd ..
Docker Compose
services:
mongo:
image: mongo:5
environment:
MONGO_INITDB_ROOT_PASSWORD: notasecret
MONGO_INITDB_ROOT_USERNAME: root
ports:
- "27017:27017"
restart: on-failure
volumes:
- mongo_data:/data/db
healthcheck:
test: echo 'db.runCommand("ping").ok' | mongo mongo:27017/test --quiet
interval: 10s
timeout: 10s
retries: 5
start_period: 40s
mongo-express:
image: mongo-express:latest
environment:
ME_CONFIG_MONGODB_ADMINPASSWORD: notasecret
ME_CONFIG_MONGODB_ADMINUSERNAME: root
ME_CONFIG_MONGODB_PORT: "27017"
ME_CONFIG_MONGODB_SERVER: mongo
ME_CONFIG_MONGODB_ENABLE_ADMIN: "true"
ports:
- "7070:8081"
restart: on-failure
depends_on:
mongo:
condition: service_healthy
volumes:
mongo_data:
networks:
default:
external: true
name: kafka-sandbox_network
NoSQL Populate Database
This tool populates MongoDB with random customers. It is an ancillary project that helps set up different scenarios.
- project location: nosql-populate
⚠️ Run these commands inside the root folder.
Create an alias for nosql-populate:
alias nosql-populate="$PWD/nosql-populate/build/install/nosql-populate/bin/nosql-populate "
To permanently add the alias to your shell (~/.bashrc or ~/.zshrc file):
echo "alias nosql-populate='$PWD/nosql-populate/build/install/nosql-populate/bin/nosql-populate '" >> ~/.zshrc
Install the app:
./gradlew nosql-populate:install
nosql-populate
Example:
nosql-populate --url "mongodb://root:notasecret@localhost:27017" -d "sandbox" 100
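Similarly, you can count the inserted documents with the Mongo shell inside the container. This is a sketch: the container name assumes Docker Compose's default naming for the nosql-database project, and the customers collection name is assumed from the tool's purpose; adjust both if your setup differs:
docker exec nosql-database-mongo-1 mongo -u root -p notasecret --authenticationDatabase admin --quiet --eval "db.getSiblingDB('sandbox').customers.countDocuments()"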
MQTT CLI Tools
A collection of tools for interacting with an MQTT broker.
⚠️ Run these commands inside the root folder.
Create an alias for mqtt-cli:
alias mqtt-cli='docker run --rm -it --network kafka-sandbox_network hivemq/mqtt-cli:latest '
To permanently add the alias to your shell (~/.bashrc or ~/.zshrc file):
echo "alias mqtt-cli='docker run --rm -it --network kafka-sandbox_network hivemq/mqtt-cli:latest '" >> ~/.zshrc
Test the cli:
mqtt-cli
MQTT Broker
Eclipse Mosquitto is an open source (EPL/EDL licensed) message broker that implements the MQTT protocol versions 5.0, 3.1.1 and 3.1. Mosquitto is lightweight and is suitable for use on all devices from low power single board computers to full servers.
- mosquitto
- project location: mqtt-broker
- mosquitto port: 1883
Run Mosquitto:
cd mqtt-broker
docker compose up -d
cd ..
After a few seconds:
mqtt-cli test -h mosquitto
Docker Compose
services:
mosquitto:
image: eclipse-mosquitto:2
ports:
- "1883:1883"
restart: on-failure
volumes:
- ./mosquitto.conf:/mosquitto/config/mosquitto.conf
- mosquitto_data:/mosquitto/data
volumes:
mosquitto_data:
networks:
default:
external: true
name: kafka-sandbox_network
Kafka Cluster
A three-node Kafka cluster.
- kafka
- kafka settings
- project location: kafka-cluster
- kafka version: cp 7.5.2
- kafka ports: 19092, 29092, 39092
- kafka jmx ports: 19999, 29999, 39999
Run kafka cluster:
cd kafka-cluster
docker compose up -d
cd ..
Create a topic:
kafka-cli kafka-topics --create --bootstrap-server kafka1:9092 \
--replication-factor 3 \
--partitions 3 \
--topic kafka-cluster.test
kafka-cli kafka-topics --bootstrap-server kafka1:9092 --list
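You can also describe the topic to confirm its partition and replica assignment:
kafka-cli kafka-topics --bootstrap-server kafka1:9092 --describe --topic kafka-cluster.test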
Produce a message:
kafka-cli kafka-console-producer --broker-list kafka1:9092 --topic kafka-cluster.test
Consume messages:
kafka-cli kafka-console-consumer --from-beginning --group kafka-cluster.test \
--topic kafka-cluster.test \
--bootstrap-server kafka1:9092
⚠️ The JMX ports were opened to monitor kafka using jconsole.
Run jconsole:
jconsole localhost:19999
Docker Compose
services:
kafka1:
image: confluentinc/cp-kafka:${VERSION}
environment:
KAFKA_BROKER_ID: "1"
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:9092,EXTERNAL://localhost:19092
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENERS: CONTROLLER://0.0.0.0:9093,INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:19092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_JMX_PORT: "19999"
CLUSTER_ID: "arN_n1N_QHqihZJPxv7URA"
KAFKA_PROCESS_ROLES: "broker,controller"
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
ports:
- "19092:19092"
- "19999:19999"
restart: on-failure
volumes:
- kafka1_data:/var/lib/kafka/data
healthcheck:
test: kafka-topics --bootstrap-server localhost:9092 --list > /dev/null 2>&1
interval: 10s
timeout: 10s
retries: 5
start_period: 10s
kafka2:
image: confluentinc/cp-kafka:${VERSION}
environment:
KAFKA_BROKER_ID: "2"
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:9092,EXTERNAL://localhost:29092
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENERS: CONTROLLER://0.0.0.0:9093,INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_JMX_PORT: "29999"
CLUSTER_ID: "arN_n1N_QHqihZJPxv7URA"
KAFKA_PROCESS_ROLES: "broker,controller"
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
ports:
- "29092:29092"
- "29999:29999"
restart: on-failure
volumes:
- kafka2_data:/var/lib/kafka/data
healthcheck:
test: kafka-topics --bootstrap-server localhost:9092 --list > /dev/null 2>&1
interval: 10s
timeout: 10s
retries: 5
start_period: 10s
kafka3:
image: confluentinc/cp-kafka:${VERSION}
environment:
KAFKA_BROKER_ID: "3"
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:9092,EXTERNAL://localhost:39092
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENERS: CONTROLLER://0.0.0.0:9093,INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:39092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_JMX_PORT: "39999"
CLUSTER_ID: "arN_n1N_QHqihZJPxv7URA"
KAFKA_PROCESS_ROLES: "broker,controller"
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
ports:
- "39092:39092"
- "39999:39999"
restart: on-failure
volumes:
- kafka3_data:/var/lib/kafka/data
healthcheck:
test: kafka-topics --bootstrap-server localhost:9092 --list > /dev/null 2>&1
interval: 10s
timeout: 10s
retries: 5
start_period: 10s
volumes:
kafka1_data:
kafka2_data:
kafka3_data:
networks:
default:
external: true
name: kafka-sandbox_network
Kafka AKHQ
A UI for managing the Kafka cluster.
- akhq
- akhq settings
- project location: kafka-akhq
- akhq port: 8080 (open it in the web browser)
Run AKHQ:
cd kafka-akhq
docker compose up -d
cd ..
Docker Compose
services:
akhq:
image: tchiotludo/akhq:latest
ports:
- "8080:8080"
restart: on-failure
healthcheck:
test: curl http://localhost:8080
interval: 30s
timeout: 30s
retries: 5
start_period: 30s
environment:
AKHQ_CONFIGURATION: |
akhq:
connections:
docker-kafka-server:
properties:
bootstrap.servers: kafka1:9092,kafka2:9092,kafka3:9092
schema-registry:
url: http://schema-registry:8081
connect:
- name: kafka-connect
url: http://kafka-connect:8083
networks:
default:
external: true
name: kafka-sandbox_network
Kafka Schema Registry
It provides a RESTful interface for storing and retrieving your Avro, JSON Schema, and Protobuf schemas.
- schema registry
- schema registry settings
- project location: kafka-schema-registry
- schema registry port: 8081
Run Schema Registry:
cd kafka-schema-registry
docker compose up -d
cd ..
After a few seconds:
http :8081/config
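As an additional smoke test, you can register a trivial Avro schema under a throwaway subject and then list all subjects (the subject name below is arbitrary and used only for this test):
http :8081/subjects/kafka-sandbox.smoke-test-value/versions Content-Type:application/vnd.schemaregistry.v1+json schema='"string"'
http :8081/subjects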
Docker Compose
services:
schema-registry:
image: confluentinc/cp-schema-registry:${VERSION}
environment:
SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS: GET,POST,PUT,OPTIONS
SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN: "*"
SCHEMA_REGISTRY_DEBUG: "true"
SCHEMA_REGISTRY_HOST_NAME: localhost
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092
SCHEMA_REGISTRY_KAFKASTORE_TOPIC: kafka-schema-registry.schemas
ports:
- "8081:8081"
restart: on-failure
healthcheck:
test: curl http://localhost:8081
interval: 30s
timeout: 30s
retries: 5
start_period: 30s
networks:
default:
external: true
name: kafka-sandbox_network
Kafka Connect
It makes it simple to quickly define connectors that move large data sets into and out of Kafka.
- connect
- connect settings
- connect api reference
- jdbc connector plugin
- mongo connector plugin
- project location: kafka-connect
- plugins location: kafka-connect/plugins
- requests location: kafka-connect/requests
- connect port: 8083
Run Kafka Connect:
cd kafka-connect
docker compose up -d
cd ..
After a few seconds:
http :8083/connector-plugins
Docker Compose
services:
kafka-connect:
image: confluentinc/cp-kafka-connect:${VERSION}
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092
CONNECT_REST_ADVERTISED_HOST_NAME: localhost
CONNECT_GROUP_ID: kafka-connect-sandbox
CONNECT_PLUGIN_PATH: /usr/local/share/kafka/plugins
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_CONFIG_STORAGE_TOPIC: kafka-connect.config
CONNECT_OFFSET_STORAGE_TOPIC: kafka-connect.offsets
CONNECT_STATUS_STORAGE_TOPIC: kafka-connect.status
ports:
- "8083:8083"
restart: on-failure
volumes:
- ./plugins:/usr/local/share/kafka/plugins
healthcheck:
test: curl http://localhost:8083
interval: 30s
timeout: 30s
retries: 5
start_period: 30s
networks:
default:
external: true
name: kafka-sandbox_network
Kafka Connect Database Example
⚠️ This example does not support deletions; for that, you have to implement tombstone events at the source and sink.
Populate the databases:
sql-populate --url "jdbc:mysql://localhost:3306/sandbox" --user "root" --password "notasecret" 100
Create the connectors using the API:
cd kafka-connect
http :8083/connectors < requests/create-connector-mysql-source.json
http :8083/connectors < requests/create-connector-mongo-sink.json
http :8083/connectors < requests/create-connector-postgres-sink.json
cd ..
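To check that the connectors were created and are running:
http :8083/connectors
http :8083/connectors/mysql-source/status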
For deleting the connectors:
http DELETE :8083/connectors/postgres-sink
http DELETE :8083/connectors/mongo-sink
http DELETE :8083/connectors/mysql-source
Requests
requests/create-connector-mysql-source.json
{
"name": "mysql-source",
"config": {
"tasks.max": "1",
"table.whitelist": "customers",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://mysql:3306/sandbox",
"connection.user": "root",
"connection.password": "notasecret",
"mode": "timestamp",
"timestamp.column.name": "created",
"topic.prefix": "kafka-connect.",
"transforms": "createKey",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "id"
}
}
requests/create-connector-mongo-sink.json
{
"name": "mongo-sink",
"config": {
"tasks.max": "1",
"topics": "kafka-connect.customers",
"collection": "customers",
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"connection.uri": "mongodb://root:notasecret@mongo:27017",
"database": "sandbox"
}
}
requests/create-connector-postgres-sink.json
{
"name": "postgres-sink",
"config": {
"tasks.max": "1",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://postgres:5432/sandbox",
"connection.user": "postgres",
"connection.password": "notasecret",
"delete.enabled": false,
"pk.mode": "record_key",
"pk.key": "id",
"insert.mode": "upsert",
"auto.create": true,
"topics": "kafka-connect.customers",
"transforms": "dropPrefix",
"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex": "kafka-connect\\.(.*)",
"transforms.dropPrefix.replacement": "$1"
}
}
Kafka Connect MQTT Example
Subscribe to topics (for debugging purposes):
mqtt-cli sub -h mosquitto -t 'house/+/brightness'
Create a connector using the API:
cd kafka-connect
http :8083/connectors < requests/create-connector-mqtt-source.json
cd ..
Publish messages:
mqtt-cli pub -h mosquitto -t 'house/room/brightness' -m '800LM'
mqtt-cli pub -h mosquitto -t 'house/kitchen/brightness' -m '1000LM'
Consuming the data:
kafka-cli kafka-console-consumer --from-beginning --group kafka-connect.brightness_consumer \
--topic kafka-connect.brightness \
--bootstrap-server kafka1:9092 \
--property print.key=true
For deleting the connector:
http DELETE :8083/connectors/mqtt-source
Requests
requests/create-connector-mqtt-source.json
{
"name": "mqtt-source",
"config": {
"connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max": "1",
"mqtt.server.uri": "tcp://mosquitto:1883",
"mqtt.topics":"house/+/brightness",
"kafka.topic":"kafka-connect.brightness",
"mqtt.qos": "2",
"confluent.topic.bootstrap.servers": "kafka1:9092",
"confluent.topic.replication.factor": "1"
}
}
Kafka ksqlDB
ksqlDB is a database that's purpose-built for stream processing applications.
⚠️ ksqlDB is not a SQL database; it provides an extra layer for implementing KStreams, KTables, and connectors through a SQL-based language (KSQL).
- ksqldb
- ksqldb settings
- ksqldb test runner
- project location: kafka-ksqldb
- ksqldb port: 8088
Create an alias for ksqldb-cli:
⚠️ Run alias commands inside the root folder.
alias ksqldb-cli="docker run --rm -it --network kafka-sandbox_network --workdir /ksqldb -v $PWD/kafka-ksqldb/tests:/ksqldb/tests -v $PWD/kafka-ksqldb/statements:/ksqldb/statements -v $PWD/kafka-ksqldb-extensions/extensions:/ksqldb/extensions kafka-cli:latest "
To permanently add the alias to your shell (~/.bashrc or ~/.zshrc file):
echo "alias ksqldb-cli='docker run --rm -it --network kafka-sandbox_network --workdir /ksqldb -v $PWD/kafka-ksqldb/tests:/ksqldb/tests -v $PWD/kafka-ksqldb/statements:/ksqldb/statements -v $PWD/kafka-ksqldb-extensions/extensions:/ksqldb/extensions kafka-cli:latest '" >> ~/.zshrc
Run ksqlDB:
cd kafka-ksqldb
docker compose up -d
cd ..
After a few seconds:
http :8088/info
One line shell interaction:
ksqldb-cli ksql -e "SHOW STREAMS;" http://ksqldb:8088
Interactive ksqlDB shell:
ksqldb-cli ksql http://ksqldb:8088
SHOW STREAMS;
Docker Compose
services:
ksqldb:
image: confluentinc/cp-ksqldb-server:${VERSION}
environment:
KSQL_KSQL_SERVICE_ID: kafka-ksqldb.
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
KSQL_KSQL_CONNECT_URL: http://kafka-connect:8083
KSQL_KSQL_EXTENSION_DIR: /ksqldb/extensions
ports:
- "8088:8088"
restart: on-failure
volumes:
- ../kafka-ksqldb-extensions/extensions:/ksqldb/extensions
healthcheck:
test: curl http://localhost:8088
interval: 30s
timeout: 30s
retries: 5
start_period: 30s
networks:
default:
external: true
name: kafka-sandbox_network
ksqlDB Streams Example
- statements location: kafka-ksqldb/statements
- test location: kafka-ksqldb/tests
Test runner:
ksqldb-cli ksql-test-runner -e extensions/ \
-s statements/create-orders.ksql \
-i tests/orders-input.json \
-o tests/orders-output.json | grep '>>>'
Execute statement files, create orders stream:
ksqldb-cli ksql -f statements/create-orders.ksql http://ksqldb:8088
Insert orders:
ksqldb-cli ksql -f statements/insert-orders.ksql http://ksqldb:8088
List of streams:
http :8088/ksql ksql="list streams;" | jq '.[].streams[] | [{name: .name, topic: .topic}]'
Show content:
ksqldb-cli ksql -e "PRINT 'kafka-ksqldb.order_sizes' FROM BEGINNING;" http://ksqldb:8088
Deleting all orders:
ksqldb-cli ksql -e "DROP STREAM ORDERSIZES DELETE TOPIC; DROP STREAM ORDERS DELETE TOPIC;" http://ksqldb:8088
Statements
statements/create-orders.ksql
CREATE STREAM orders (orderId INT KEY, orderUnits INT, totalAmount DOUBLE)
WITH (kafka_topic='kafka-ksqldb.orders', key_format='json', value_format='json', partitions=10, replicas=3);
CREATE STREAM orderSizes
WITH (kafka_topic='kafka-ksqldb.order_sizes', key_format='json', value_format='json', partitions=10, replicas=3)
AS SELECT
orderId,
orderUnits,
totalAmount,
CASE WHEN orderUnits < 2 THEN 'small' WHEN orderUnits < 4 THEN 'medium' ELSE 'large' END AS orderSize,
taxes(totalAmount) AS tax
FROM orders EMIT CHANGES;
statements/insert-orders.ksql
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000000, 2, 100.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000001, 4, 200.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000002, 6, 300.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000003, 3, 150.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000004, 1, 50.0);
Tests
tests/orders-input.json
{
"inputs": [
{"topic": "kafka-ksqldb.orders", "timestamp": 0, "key": 1000000, "value": {"orderUnits": 2, "totalAmount": 100.0}},
{"topic": "kafka-ksqldb.orders", "timestamp": 0, "key": 1000001, "value": {"orderUnits": 4, "totalAmount": 200.0}},
{"topic": "kafka-ksqldb.orders", "timestamp": 0, "key": 1000002, "value": {"orderUnits": 6, "totalAmount": 300.0}},
{"topic": "kafka-ksqldb.orders", "timestamp": 0, "key": 1000003, "value": {"orderUnits": 3, "totalAmount": 150.0}},
{"topic": "kafka-ksqldb.orders", "timestamp": 0, "key": 1000004, "value": {"orderUnits": 1, "totalAmount": 50.0}}
]
}
tests/orders-output.json
{
"outputs": [
{"topic": "kafka-ksqldb.order_sizes", "timestamp": 0, "key": 1000000, "value": {"ORDERUNITS": 2, "TOTALAMOUNT": 100.0, "ORDERSIZE": "medium", "TAX": 12.0}},
{"topic": "kafka-ksqldb.order_sizes", "timestamp": 0, "key": 1000001, "value": {"ORDERUNITS": 4, "TOTALAMOUNT": 200.0, "ORDERSIZE": "large" , "TAX": 24.0}},
{"topic": "kafka-ksqldb.order_sizes", "timestamp": 0, "key": 1000002, "value": {"ORDERUNITS": 6, "TOTALAMOUNT": 300.0, "ORDERSIZE": "large" , "TAX": 36.0}},
{"topic": "kafka-ksqldb.order_sizes", "timestamp": 0, "key": 1000003, "value": {"ORDERUNITS": 3, "TOTALAMOUNT": 150.0, "ORDERSIZE": "medium", "TAX": 18.0}},
{"topic": "kafka-ksqldb.order_sizes", "timestamp": 0, "key": 1000004, "value": {"ORDERUNITS": 1, "TOTALAMOUNT": 50.0, "ORDERSIZE": "small" , "TAX": 6.0}}
]
}
ksqlDB Extensions
ksqlDB extensions are pieces of logic for transforming or aggregating events that ksqlDB can't currently express.
- ksqldb extensions (udf, udtf, udaf)
- project location: kafka-ksqldb-extensions
- extensions location: kafka-ksqldb-extensions/extensions
Check the Kafka ksqlDB section.
To create the extension jar, you can use the following command (for development purposes):
./gradlew kafka-ksqldb-extensions:shadowJar
⚠️ Java 11 is needed here.
Custom UDF
package kafka.sandbox.ksqldb;
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;
@UdfDescription(
name = "taxes",
author = "kafka sandbox",
version = "1.0.0",
description = "A custom taxes formula for orders."
)
public class TaxesUdf {
public static final double TAXES = .12;
@Udf(description = "Calculate taxes.")
public double taxes(@UdfParameter double amount) {
return amount * TAXES;
}
}
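Once the jar is in the extensions directory and ksqlDB has been restarted, you can confirm that the function was loaded:
ksqldb-cli ksql -e "DESCRIBE FUNCTION TAXES;" http://ksqldb:8088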
Kafka Clients
Java examples for producing and consuming messages from Kafka using the Java Kafka client library.
Avro Producer and Consumer
These examples produce and consume messages from the suppliers topic (kafka-clients.suppliers). The producer example generates random suppliers.
- kafka producer and consumer example
- kafka consumer settings
- kafka producer settings
- avro project location: kafka-avro
- project location: kafka-clients
⚠️ Run these commands inside the root folder.
Create an alias for kafka-clients:
alias kafka-clients="$PWD/kafka-clients/build/install/kafka-clients/bin/kafka-clients "
To permanently add the alias to your shell (~/.bashrc or ~/.zshrc file):
echo "alias kafka-clients='$PWD/kafka-clients/build/install/kafka-clients/bin/kafka-clients '" >> ~/.zshrc
Create a topic:
kafka-cli kafka-topics --create --bootstrap-server kafka1:9092 \
--replication-factor 3 \
--partitions 3 \
--topic kafka-clients.suppliers
Install the app:
./gradlew kafka-clients:install
kafka-clients
Run clients:
kafka-clients producer 100
kafka-clients consumer
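Since the producer writes Avro records, you can inspect them with the Avro console consumer. This assumes the kafka-cli image bundles kafka-avro-console-consumer as part of the Confluent community tools; otherwise use the plain console consumer and expect binary output:
kafka-cli kafka-avro-console-consumer --bootstrap-server kafka1:9092 \
--topic kafka-clients.suppliers \
--from-beginning \
--property schema.registry.url=http://schema-registry:8081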
To build the Avro schema project, you can use the following command (for development purposes):
./gradlew kafka-avro:build
Avro Schema
suppliers-v1.avsc
{
"type": "record",
"name": "Supplier",
"namespace": "kafka.sandbox.avro",
"version": "1",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "address",
"type": "string"
},
{
"name": "country",
"type": "string"
}
]
}
Spring Boot
Spring Boot + Spring Kafka producer and consumer examples.
- confluent spring kafka examples
- spring kafka settings
- project location: kafka-spring-boot
- spring port: 8585
⚠️ Run these commands inside the root folder.
Run spring boot:
./gradlew kafka-spring-boot:bootRun
In another terminal:
http :8585/actuator/health
http :8585/produce messages==10
Streams
Kafka Streams is a client library for processing streaming data. It offers a streamlined way to build applications and microservices that need to process data in real time.
Check the Kafka Clients - Avro Producer and Consumer section.
- kafka streams
- kafka streams examples
- more kafka streams examples here.
- project location: kafka-streams
⚠️ Run these commands inside the root folder.
Create an alias for kafka-streams:
alias kafka-streams="$PWD/kafka-streams/build/install/kafka-streams/bin/kafka-streams "
To permanently add the alias to your shell (~/.bashrc or ~/.zshrc file):
echo "alias kafka-streams='$PWD/kafka-streams/build/install/kafka-streams/bin/kafka-streams '" >> ~/.zshrc
Install the app:
./gradlew kafka-streams:install
kafka-streams
Run streams:
kafka-streams streams
Print results:
kafka-cli kafka-console-consumer --from-beginning --group kafka-streams.consumer \
--topic kafka-streams.supplier_counts_by_country \
--bootstrap-server kafka1:9092 \
--property print.key=true \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
Interactive Queries
Interactive Queries let you access the state of your Kafka Streams application from outside the application; the Kafka Streams API makes your application queryable.
This example uses gRPC to query the Kafka Streams server.
Query the total supplier count by a given country:
kafka-streams count <country>
Example:
kafka-streams count Ecuador
Output:
Country: Ecuador, Total Suppliers: 4
Note that you have to run the stream first. Check the Kafka Streams section.
Check the stream topology:
package kafka.sandbox.cli;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import kafka.sandbox.avro.Supplier;
import kafka.sandbox.grpc.CounterService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import picocli.CommandLine;
import picocli.CommandLine.Command;
@Slf4j
@Command(name = "streams", description = "Creates and start kafka streams")
public class Streams implements Callable<Integer> {
public static final String TOPIC_FROM = "kafka-clients.suppliers";
public static final String TOPIC_TO = "kafka-streams.supplier_counts_by_country";
private final Properties props;
public Streams(Properties props) {
this.props = props;
}
@Override
public Integer call() throws Exception {
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
StreamsBuilder builder = new StreamsBuilder();
// read from suppliers topic
KStream<String, Supplier> suppliers = builder.stream(TOPIC_FROM);
// aggregate the new supplier counts by country
KTable<String, Long> aggregated = suppliers
// map the country as key
.map((key, value) -> new KeyValue<>(value.getCountry(), value))
.groupByKey()
// aggregate and materialize the store
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("SupplierCountByCountry"));
// print results
aggregated
.toStream()
.foreach((key, value) -> log.info("Country = {}, Total supplier counts = {}", key, value));
// write the results to a topic
aggregated.toStream().to(TOPIC_TO, Produced.with(stringSerde, longSerde));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.cleanUp();
// GRPC Server
Server server = Grpc.newServerBuilderForPort(5050, InsecureServerCredentials.create())
.addService(new CounterService(streams))
.build();
// attach shutdown handler to catch control-c and creating a latch
CountDownLatch latch = new CountDownLatch(1);
Runtime
.getRuntime()
.addShutdownHook(
new Thread("consumer-shutdown-hook") {
@Override
public void run() {
server.shutdownNow();
streams.close();
latch.countDown();
}
});
streams.start();
server.start();
latch.await();
return CommandLine.ExitCode.OK;
}
}
Check the gRPC server:
package kafka.sandbox.grpc;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import io.grpc.stub.StreamObserver;
import kafka.sandbox.proto.CounterServiceGrpc.CounterServiceImplBase;
import kafka.sandbox.proto.CountReply;
import kafka.sandbox.proto.CountRequest;
public class CounterService extends CounterServiceImplBase {
private KafkaStreams streams;
public CounterService(KafkaStreams streams) {
this.streams = streams;
}
@Override
public void getCountByCountry(CountRequest request, StreamObserver<CountReply> responseObserver) {
ReadOnlyKeyValueStore<String, Long> keyValueStore = streams.store(
StoreQueryParameters.fromNameAndType("SupplierCountByCountry", QueryableStoreTypes.keyValueStore()));
String country = request.getName();
Long total = keyValueStore.get(country);
String value = String.format("Country: %s, Total Suppliers: %s", country, total != null ? total : 0);
CountReply reply = CountReply.newBuilder().setMessage(value).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
Check the gRPC client:
package kafka.sandbox.cli;
import java.util.concurrent.Callable;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import kafka.sandbox.proto.CountReply;
import kafka.sandbox.proto.CountRequest;
import kafka.sandbox.proto.CounterServiceGrpc;
import kafka.sandbox.proto.CounterServiceGrpc.CounterServiceBlockingStub;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Parameters;
@Slf4j
@Command(name = "count", description = "Print the total supplier count by a given country")
public class Count implements Callable<Integer> {
@Parameters(index = "0", description = "Country (default: ${DEFAULT-VALUE})", defaultValue = "Ecuador")
private String country;
@Override
public Integer call() throws Exception {
ManagedChannel channel = Grpc.newChannelBuilder("localhost:5050", InsecureChannelCredentials.create())
.build();
CounterServiceBlockingStub blockingStub = CounterServiceGrpc.newBlockingStub(channel);
CountReply countByCountry = blockingStub.getCountByCountry(CountRequest.newBuilder().setName(country).build());
log.info(countByCountry.getMessage());
return CommandLine.ExitCode.OK;
}
}
Kafka Performance Tools
Performance tuning involves two important metrics:
- Latency measures how long it takes to process one event.
- Throughput measures how many events arrive within a specific amount of time.
Run help:
kafka-cli kafka-producer-perf-test --help
kafka-cli kafka-consumer-perf-test --help
Create a topic:
kafka-cli kafka-topics --create --bootstrap-server kafka1:9092 \
--replication-factor 3 \
--partitions 3 \
--topic kafka-cluster.performance-test
Performance Tests
Test producer:
kafka-cli kafka-producer-perf-test --topic kafka-cluster.performance-test \
--throughput -1 \
--num-records 3000000 \
--record-size 1024 \
--producer-props acks=all bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
- Throughput in MB/sec.
- Latency in milliseconds.
Test consumer:
kafka-cli kafka-consumer-perf-test --topic kafka-cluster.performance-test \
--broker-list kafka1:9092,kafka2:9092,kafka3:9092 \
--messages 3000000
- start.time, end.time: shows test start and end time.
- data.consumed.in.MB: shows the size of all messages consumed.
- MB.sec: shows how much data was transferred in megabytes per second (throughput on size).
- data.consumed.in.nMsg: shows the count of the total messages consumed during this test.
- nMsg.sec: shows how many messages were consumed per second (throughput on the count of messages).
Test end to end latency:
kafka-cli kafka-run-class kafka.tools.EndToEndLatency kafka1:9092,kafka2:9092,kafka3:9092 kafka-cluster.performance-test 10000 1 1024
- This class records the average end-to-end latency for a single message to travel through Kafka.
- Latency in milliseconds.
Kafka REST Proxy
The Kafka REST Proxy provides a RESTful interface to a Kafka cluster.
⚠️ Use this only when you really need a REST interface, since it is usually more complex than using conventional Kafka clients.
- kafka rest
- kafka rest settings
- kafka rest api reference
- project location: kafka-rest
- requests location: kafka-rest/requests
- kafka rest port: 8082
Run Kafka REST Proxy:
cd kafka-rest
docker compose up -d
cd ..
After a few seconds:
http :8082/brokers
Create topics:
cd kafka-rest
http :8082/topics/kafka-rest.test Content-Type:application/vnd.kafka.json.v2+json records:='[{ "key": "test", "value": "test" }]'
http :8082/topics/kafka-rest.users Content-Type:application/vnd.kafka.avro.v2+json < requests/produce-avro-message.json
cd ..
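You can also consume through the proxy by creating a consumer instance, subscribing it to a topic, and fetching records. The group and instance names below are arbitrary, and the first fetch may come back empty, so call the records endpoint twice if needed:
http POST :8082/consumers/kafka-rest.group Content-Type:application/vnd.kafka.v2+json name=rest-consumer format=json auto.offset.reset=earliest
http POST :8082/consumers/kafka-rest.group/instances/rest-consumer/subscription Content-Type:application/vnd.kafka.v2+json topics:='["kafka-rest.test"]'
http :8082/consumers/kafka-rest.group/instances/rest-consumer/records Accept:application/vnd.kafka.json.v2+json
http DELETE :8082/consumers/kafka-rest.group/instances/rest-consumer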
Docker Compose
services:
kafka-rest:
image: confluentinc/cp-kafka-rest:${VERSION}
environment:
KAFKA_REST_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092
KAFKA_REST_SCHEMA_REGISTRY_URL: http://schema-registry:8081
ports:
- "8082:8082"
restart: on-failure
healthcheck:
test: curl http://localhost:8082
interval: 30s
timeout: 30s
retries: 5
start_period: 30s
networks:
default:
external: true
name: kafka-sandbox_network
Requests
requests/produce-avro-message.json
{
"value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}",
"records": [
{
"value": {
"name": "John Doe"
}
}
]
}
Kafka MQTT Proxy
MQTT Proxy enables MQTT clients to use the MQTT 3.1.1 protocol to publish data directly to Apache Kafka.
⚠️ This does not turn Kafka into an MQTT broker; it aims to provide a simple way to publish and persist IoT data to Kafka.
- kafka mqtt
- kafka mqtt settings
- project location: kafka-mqtt
- kafka mqtt tcp port: 1884
Run Kafka MQTT Proxy:
cd kafka-mqtt
docker compose up -d
cd ..
Publish a message:
mqtt-cli pub -h kafka-mqtt -p 1884 -t 'house/room/temperature' -m '20C'
Consuming the data:
kafka-cli kafka-console-consumer --from-beginning --group kafka-mqtt.consumer \
--topic kafka-mqtt.temperature \
--bootstrap-server kafka1:9092 \
--property print.key=true
Docker Compose
services:
kafka-mqtt:
image: confluentinc/cp-kafka-mqtt:${VERSION}
environment:
KAFKA_MQTT_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092
KAFKA_MQTT_TOPIC_REGEX_LIST: kafka-mqtt.temperature:.*temperature
KAFKA_MQTT_LISTENERS: 0.0.0.0:1884
ports:
- "1884:1884"
restart: on-failure
networks:
default:
external: true
name: kafka-sandbox_network
UI Ports Table
Service | Port |
---|---|
AKHQ | 8080 |
Adminer | 9090 |
Mongo Express | 7070 |
Component Ports Table
Service | Host | Port |
---|---|---|
MySQL | mysql | 3306 |
MySQL | localhost | 3306 |
PostgreSQL | postgres | 5432 |
PostgreSQL | localhost | 5432 |
MongoDB | mongo | 27017 |
MongoDB | localhost | 27017 |
Mosquitto | mosquitto | 1883 |
Mosquitto | localhost | 1883 |
Kafka 1 | kafka1 | 9092 |
Kafka 1 | localhost | 19092 |
Kafka 2 | kafka2 | 9092 |
Kafka 2 | localhost | 29092 |
Kafka 3 | kafka3 | 9092 |
Kafka 3 | localhost | 39092 |
Kafka 1 JMX | kafka1 | 19999 |
Kafka 1 JMX | localhost | 19999 |
Kafka 2 JMX | kafka2 | 29999 |
Kafka 2 JMX | localhost | 29999 |
Kafka 3 JMX | kafka3 | 39999 |
Kafka 3 JMX | localhost | 39999 |
Schema Registry | schema-registry | 8081 |
Schema Registry | localhost | 8081 |
Kafka REST | kafka-rest | 8082 |
Kafka REST | localhost | 8082 |
Kafka Connect | kafka-connect | 8083 |
Kafka Connect | localhost | 8083 |
Kafka MQTT | kafka-mqtt | 1884 |
Kafka MQTT | localhost | 1884 |
ksqlDB | ksqldb | 8088 |
ksqlDB | localhost | 8088 |
Kafka Clients Spring Boot | localhost | 8585 |
Kafka Streams gRPC Server | localhost | 5050 |
About This Book
This book is powered by mdBook.
GitHub Repository.
Developing Commands
You must install Rust first.
Install mdbook:
cargo install mdbook
Run local server:
mdbook serve --open
Build statics:
mdbook build
Using Docker
Create docker image:
docker build -t sauljabin/kafka-sandbox-book:latest -f docker/Dockerfile .
Run the book (open it in the web browser):
docker run --name kafka-sandbox-book -d -p 80:80 sauljabin/kafka-sandbox-book:latest