Introduction

Kafka Sandbox helps you deploy a local Kafka sandbox. It is meant to be a simple way to get started with Kafka and to support your learning path. It bundles a wide variety of tools from the Kafka ecosystem, a simple way to run them all, and a set of tips that make Kafka easier to use. It does not include security, since it is not a production system.

Install Dependencies

Other Useful Utilities

  • curl - command-line HTTP client
  • lazydocker - Docker text user interface
  • kaskade - Kafka text user interface

Quick Start

Clone the repo:

git clone https://github.com/sauljabin/kafka-sandbox.git
cd kafka-sandbox

Create a docker network:

docker network create kafka-sandbox_network
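
To confirm the network exists (optional sanity check):

docker network ls --filter name=kafka-sandbox_network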

Run the kafka cluster:

cd kafka-cluster
docker compose up -d
cd ..

Run AKHQ:

cd kafka-akhq
docker compose up -d
cd ..

Open AKHQ at http://localhost:8080/
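
If the page does not load, you can check that the containers attached to the sandbox network are up (optional check, using plain docker commands):

docker ps --filter network=kafka-sandbox_network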

Kafka CLI Tools

A collection of tools for interacting with the Kafka cluster from the terminal.

⚠️ Run these commands inside the root folder.

Create an alias for kafka-cli:

alias kafka-cli='docker run --rm -it --network kafka-sandbox_network kafka-cli:latest '

To permanently add the alias to your shell (~/.bashrc or ~/.zshrc file):

echo "alias kafka-cli='docker run --rm -it --network kafka-sandbox_network kafka-cli:latest '" >> ~/.zshrc

Create the docker image:

cd kafka-cli
docker build -t kafka-cli:latest .
kafka-cli
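
Once the image is built, a quick sanity check (this assumes the kafka-cluster from the Quick Start is already running):

kafka-cli kafka-topics --version
kafka-cli kafka-topics --bootstrap-server kafka1:9092 --list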

SQL Database

Create a MySQL and a PostgreSQL instance, each with a database.

Run MySQL, PostgreSQL, and Adminer:

cd sql-database
docker compose up -d
cd ..
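
As an alternative to Adminer, you can connect from the terminal using the clients shipped in the same database images (a minimal sketch, assuming the credentials from the compose file below):

docker run --rm -it --network kafka-sandbox_network mysql:8 mysql -h mysql -uroot -pnotasecret sandbox
docker run --rm -it --network kafka-sandbox_network postgres:13 psql postgresql://postgres:notasecret@postgres:5432/sandbox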

Docker Compose

services:
  mysql:
    image: mysql:8
    environment:
      MYSQL_DATABASE: sandbox
      MYSQL_ROOT_PASSWORD: notasecret
    ports:
      - "3306:3306"
    restart: on-failure
    volumes:
      - mysql_data:/var/lib/mysql

  postgres:
    image: postgres:13
    environment:
      POSTGRES_DB: sandbox
      POSTGRES_PASSWORD: notasecret
    ports:
      - "5432:5432"
    restart: on-failure
    volumes:
      - postgres_data:/var/lib/postgresql/data

  adminer:
    image: adminer:4
    ports:
      - "9090:8080"
    restart: on-failure

volumes:
  mysql_data:
  postgres_data:

networks:
  default:
    external: true
    name: kafka-sandbox_network

SQL Populate Database

This tool populates either a MySQL or a PostgreSQL database with random customers. It is an ancillary project that helps to set up different scenarios.

⚠️ Run these commands inside the root folder.

Create an alias for sql-populate:

alias sql-populate="$PWD/sql-populate/build/install/sql-populate/bin/sql-populate "

To permanently add the alias to your shell (~/.bashrc or ~/.zshrc file):

echo "alias sql-populate='$PWD/sql-populate/build/install/sql-populate/bin/sql-populate '" >> ~/.zshrc

Install the app:

./gradlew sql-populate:install
sql-populate

Examples:

sql-populate --url "jdbc:mysql://localhost:3306/sandbox" --user "root" --password "notasecret" 100
sql-populate --url "jdbc:postgresql://localhost:5432/sandbox" --user "postgres" --password "notasecret" 100
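
To verify the inserted rows, for example in MySQL (this assumes the tool writes to a customers table, the same table used by the Kafka Connect example later):

docker run --rm -it --network kafka-sandbox_network mysql:8 mysql -h mysql -uroot -pnotasecret -e 'SELECT COUNT(*) FROM sandbox.customers;'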

NoSQL Database

Create a MongoDB instance and a database.

Run MongoDB and Mongo Express:

cd nosql-database
docker compose up -d
cd ..

Docker Compose

services:
  mongo:
    image: mongo:5
    environment:
      MONGO_INITDB_ROOT_PASSWORD: notasecret
      MONGO_INITDB_ROOT_USERNAME: root
    ports:
      - "27017:27017"
    restart: on-failure
    volumes:
      - mongo_data:/data/db
    healthcheck:
      test: echo 'db.runCommand("ping").ok' | mongo mongo:27017/test --quiet
      interval: 10s
      timeout: 10s
      retries: 5
      start_period: 40s

  mongo-express:
    image: mongo-express:latest
    environment:
      ME_CONFIG_MONGODB_ADMINPASSWORD: notasecret
      ME_CONFIG_MONGODB_ADMINUSERNAME: root
      ME_CONFIG_MONGODB_PORT: "27017"
      ME_CONFIG_MONGODB_SERVER: mongo
      ME_CONFIG_MONGODB_ENABLE_ADMIN: "true"
    ports:
      - "7070:8081"
    restart: on-failure
    depends_on:
      mongo:
        condition: service_healthy

volumes:
  mongo_data:

networks:
  default:
    external: true
    name: kafka-sandbox_network

NoSQL Populate Database

This tool populates MongoDB with random customers. It is an ancillary project that helps to set up different scenarios.

⚠️ Run these commands inside the root folder.

Create an alias for nosql-populate:

alias nosql-populate="$PWD/nosql-populate/build/install/nosql-populate/bin/nosql-populate "

To permanently add the alias to your shell (~/.bashrc or ~/.zshrc file):

echo "alias nosql-populate='$PWD/nosql-populate/build/install/nosql-populate/bin/nosql-populate '" >> ~/.zshrc

Install the app:

./gradlew nosql-populate:install
nosql-populate

Example:

nosql-populate --url "mongodb://root:notasecret@localhost:27017" -d "sandbox" 100
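
To verify the inserted documents with the mongo shell shipped in the same image (this assumes a customers collection, mirroring the SQL populate tool):

docker run --rm -it --network kafka-sandbox_network mongo:5 mongo "mongodb://root:notasecret@mongo:27017/sandbox?authSource=admin" --quiet --eval 'db.customers.countDocuments()'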

MQTT CLI Tools

A collection of tools for interacting with an MQTT broker.

⚠️ Run these commands inside the root folder.

Create an alias for mqtt-cli:

alias mqtt-cli='docker run --rm -it --network kafka-sandbox_network hivemq/mqtt-cli:latest '

To permanently add the alias to your shell (~/.bashrc or ~/.zshrc file):

echo "alias mqtt-cli='docker run --rm -it --network kafka-sandbox_network hivemq/mqtt-cli:latest '" >> ~/.zshrc

Test the cli:

mqtt-cli

MQTT Broker

Eclipse Mosquitto is an open source (EPL/EDL licensed) message broker that implements the MQTT protocol versions 5.0, 3.1.1 and 3.1. Mosquitto is lightweight and is suitable for use on all devices from low power single board computers to full servers.

Run Mosquitto:

cd mqtt-broker
docker compose up -d
cd ..

After a few seconds:

mqtt-cli test -h mosquitto

Docker Compose

services:
  mosquitto:
    image: eclipse-mosquitto:2
    ports:
      - "1883:1883"
    restart: on-failure
    volumes:
      - ./mosquitto.conf:/mosquitto/config/mosquitto.conf
      - mosquitto_data:/mosquitto/data

volumes:
  mosquitto_data:

networks:
  default:
    external: true
    name: kafka-sandbox_network

Kafka Cluster

A three-node Kafka cluster running in KRaft mode (no ZooKeeper).

Run the Kafka cluster:

cd kafka-cluster
docker compose up -d
cd ..

Create a topic:

kafka-cli kafka-topics --create --bootstrap-server kafka1:9092 \
                       --replication-factor 3 \
                       --partitions 3 \
                       --topic kafka-cluster.test
kafka-cli kafka-topics --bootstrap-server kafka1:9092 --list

Produce a message:

kafka-cli kafka-console-producer --broker-list kafka1:9092 --topic kafka-cluster.test

Consume messages:

kafka-cli kafka-console-consumer --from-beginning --group kafka-cluster.test \
                                 --topic kafka-cluster.test  \
                                 --bootstrap-server kafka1:9092
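
To inspect how the topic partitions were spread across the three brokers and check the consumer group offsets (optional):

kafka-cli kafka-topics --describe --topic kafka-cluster.test --bootstrap-server kafka1:9092
kafka-cli kafka-consumer-groups --describe --group kafka-cluster.test --bootstrap-server kafka1:9092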

⚠️ The JMX ports are exposed so you can monitor Kafka using jconsole.

Run jconsole:

jconsole localhost:19999

Docker Compose

services:
  kafka1:
    image: confluentinc/cp-kafka:${VERSION}
    environment:
      KAFKA_BROKER_ID: "1"
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:9092,EXTERNAL://localhost:19092
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: CONTROLLER://0.0.0.0:9093,INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:19092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_JMX_PORT: "19999"
      CLUSTER_ID: "arN_n1N_QHqihZJPxv7URA"
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
    ports:
      - "19092:19092"
      - "19999:19999"
    restart: on-failure
    volumes:
      - kafka1_data:/var/lib/kafka/data
    healthcheck:
      test: kafka-topics --bootstrap-server localhost:9092 --list > /dev/null 2>&1
      interval: 10s
      timeout: 10s
      retries: 5
      start_period: 10s

  kafka2:
    image: confluentinc/cp-kafka:${VERSION}
    environment:
      KAFKA_BROKER_ID: "2"
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:9092,EXTERNAL://localhost:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: CONTROLLER://0.0.0.0:9093,INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_JMX_PORT: "29999"
      CLUSTER_ID: "arN_n1N_QHqihZJPxv7URA"
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
    ports:
      - "29092:29092"
      - "29999:29999"
    restart: on-failure
    volumes:
      - kafka2_data:/var/lib/kafka/data
    healthcheck:
      test: kafka-topics --bootstrap-server localhost:9092 --list > /dev/null 2>&1
      interval: 10s
      timeout: 10s
      retries: 5
      start_period: 10s

  kafka3:
    image: confluentinc/cp-kafka:${VERSION}
    environment:
      KAFKA_BROKER_ID: "3"
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:9092,EXTERNAL://localhost:39092
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: CONTROLLER://0.0.0.0:9093,INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:39092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_JMX_PORT: "39999"
      CLUSTER_ID: "arN_n1N_QHqihZJPxv7URA"
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka1:9093,2@kafka2:9093,3@kafka3:9093"
    ports:
      - "39092:39092"
      - "39999:39999"
    restart: on-failure
    volumes:
      - kafka3_data:/var/lib/kafka/data
    healthcheck:
      test: kafka-topics --bootstrap-server localhost:9092 --list > /dev/null 2>&1
      interval: 10s
      timeout: 10s
      retries: 5
      start_period: 10s

volumes:
  kafka1_data:
  kafka2_data:
  kafka3_data:

networks:
  default:
    external: true
    name: kafka-sandbox_network

Kafka AKHQ

A web UI for managing the Kafka cluster.

Run AKHQ:

cd kafka-akhq
docker compose up -d
cd ..

Docker Compose

services:
  akhq:
    image: tchiotludo/akhq:latest
    ports:
      - "8080:8080"
    restart: on-failure
    healthcheck:
      test: curl http://localhost:8080
      interval: 30s
      timeout: 30s
      retries: 5
      start_period: 30s
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            docker-kafka-server:
              properties:
                bootstrap.servers: kafka1:9092,kafka2:9092,kafka3:9092
              schema-registry:
                url: http://schema-registry:8081
              connect:
                - name: kafka-connect
                  url: http://kafka-connect:8083

networks:
  default:
    external: true
    name: kafka-sandbox_network

Kafka Schema Registry

It provides a RESTful interface for storing and retrieving your Avro, JSON Schema, and Protobuf schemas.

Run Schema Registry:

cd kafka-schema-registry
docker compose up -d
cd ..

After a few seconds:

http :8081/config
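
You can also list the registered subjects, or register a schema by hand (the subject name below is just an example):

http :8081/subjects
http :8081/subjects/kafka-sandbox.test-value/versions Content-Type:application/vnd.schemaregistry.v1+json schema='"string"'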

Docker Compose

services:
  schema-registry:
    image: confluentinc/cp-schema-registry:${VERSION}
    environment:
      SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_METHODS: GET,POST,PUT,OPTIONS
      SCHEMA_REGISTRY_ACCESS_CONTROL_ALLOW_ORIGIN: "*"
      SCHEMA_REGISTRY_DEBUG: "true"
      SCHEMA_REGISTRY_HOST_NAME: localhost
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: kafka-schema-registry.schemas
    ports:
      - "8081:8081"
    restart: on-failure
    healthcheck:
      test: curl http://localhost:8081
      interval: 30s
      timeout: 30s
      retries: 5
      start_period: 30s

networks:
  default:
    external: true
    name: kafka-sandbox_network

Kafka Connect

It makes it simple to quickly define connectors that move large data sets into and out of Kafka.

Run Kafka Connect:

cd kafka-connect
docker compose up -d
cd ..

After a few seconds:

http :8083/connector-plugins
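
You can also list the connectors that are currently deployed (the list is empty at this point):

http :8083/connectors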

Docker Compose

services:
  kafka-connect:
    image: confluentinc/cp-kafka-connect:${VERSION}
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092
      CONNECT_REST_ADVERTISED_HOST_NAME: localhost
      CONNECT_GROUP_ID: kafka-connect-sandbox
      CONNECT_PLUGIN_PATH: /usr/local/share/kafka/plugins
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_CONFIG_STORAGE_TOPIC: kafka-connect.config
      CONNECT_OFFSET_STORAGE_TOPIC: kafka-connect.offsets
      CONNECT_STATUS_STORAGE_TOPIC: kafka-connect.status
    ports:
      - "8083:8083"
    restart: on-failure
    volumes:
      - ./plugins:/usr/local/share/kafka/plugins
    healthcheck:
      test: curl http://localhost:8083
      interval: 30s
      timeout: 30s
      retries: 5
      start_period: 30s

networks:
  default:
    external: true
    name: kafka-sandbox_network

Kafka Connect Database Example

⚠️ This example does not support deletions; for that, you would have to implement tombstone events at both the source and the sink.

Populate the databases:

sql-populate --url "jdbc:mysql://localhost:3306/sandbox" --user "root" --password "notasecret" 100

Create the connectors using the API:

cd kafka-connect
http :8083/connectors < requests/create-connector-mysql-source.json
http :8083/connectors < requests/create-connector-mongo-sink.json
http :8083/connectors < requests/create-connector-postgres-sink.json
cd ..
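
To check that each connector and its task are running (optional):

http :8083/connectors/mysql-source/status
http :8083/connectors/mongo-sink/status
http :8083/connectors/postgres-sink/status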

For deleting the connectors:

http DELETE :8083/connectors/postgres-sink
http DELETE :8083/connectors/mongo-sink
http DELETE :8083/connectors/mysql-source

Requests

requests/create-connector-mysql-source.json

{
  "name": "mysql-source",
  "config": {
    "tasks.max": "1",
    "table.whitelist": "customers",
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://mysql:3306/sandbox",
    "connection.user": "root",
    "connection.password": "notasecret",
    "mode": "timestamp",
    "timestamp.column.name": "created",
    "topic.prefix": "kafka-connect.",
    "transforms": "createKey",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "id"
  }
}

requests/create-connector-mongo-sink.json

{
  "name": "mongo-sink",
  "config": {
    "tasks.max": "1",
    "topics": "kafka-connect.customers",
    "collection": "customers",
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "connection.uri": "mongodb://root:notasecret@mongo:27017",
    "database": "sandbox"
  }
}

requests/create-connector-postgres-sink.json

{
  "name": "postgres-sink",
  "config": {
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/sandbox",
    "connection.user": "postgres",
    "connection.password": "notasecret",
    "delete.enabled": false,
    "pk.mode": "record_key",
    "pk.key": "id",
    "insert.mode": "upsert",
    "auto.create": true,
    "topics": "kafka-connect.customers",
    "transforms": "dropPrefix",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.regex": "kafka-connect\\.(.*)",
    "transforms.dropPrefix.replacement": "$1"
  }
}

Kafka Connect MQTT Example

Subscribe to topics (for debugging purposes):

mqtt-cli sub -h mosquitto -t 'house/+/brightness'

Create a connector using the API:

cd kafka-connect
http :8083/connectors < requests/create-connector-mqtt-source.json
cd ..

Publish messages:

mqtt-cli pub -h mosquitto -t 'house/room/brightness' -m '800LM'
mqtt-cli pub -h mosquitto -t 'house/kitchen/brightness' -m '1000LM'

Consume the data:

kafka-cli kafka-console-consumer --from-beginning --group kafka-connect.brightness_consumer \
                                 --topic kafka-connect.brightness  \
                                 --bootstrap-server kafka1:9092 \
                                 --property print.key=true

For deleting the connector:

http DELETE :8083/connectors/mqtt-source

Requests

requests/create-connector-mqtt-source.json

{
  "name": "mqtt-source",
  "config": {
      "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
      "tasks.max": "1",
      "mqtt.server.uri": "tcp://mosquitto:1883",
      "mqtt.topics":"house/+/brightness",
      "kafka.topic":"kafka-connect.brightness",
      "mqtt.qos": "2",
      "confluent.topic.bootstrap.servers": "kafka1:9092",
      "confluent.topic.replication.factor": "1"
  }
}

Kafka ksqlDB

ksqlDB is a database that's purpose-built for stream processing applications.

⚠️ ksqlDB is not a SQL database; it provides an extra layer for implementing KStreams, KTables, and connectors through a SQL-based language (KSQL).

Create an alias for ksqldb-cli:

⚠️ Run alias commands inside the root folder.

alias ksqldb-cli="docker run --rm -it --network kafka-sandbox_network --workdir /ksqldb -v $PWD/kafka-ksqldb/tests:/ksqldb/tests -v $PWD/kafka-ksqldb/statements:/ksqldb/statements -v $PWD/kafka-ksqldb-extensions/extensions:/ksqldb/extensions kafka-cli:latest "

To permanently add the alias to your shell (~/.bashrc or ~/.zshrc file):

echo "alias ksqldb-cli='docker run --rm -it --network kafka-sandbox_network --workdir /ksqldb -v $PWD/kafka-ksqldb/tests:/ksqldb/tests -v $PWD/kafka-ksqldb/statements:/ksqldb/statements -v $PWD/kafka-ksqldb-extensions/extensions:/ksqldb/extensions kafka-cli:latest '" >> ~/.zshrc

Run ksqlDB:

cd kafka-ksqldb
docker compose up -d
cd ..

After a few seconds:

http :8088/info

One-line shell interaction:

ksqldb-cli ksql -e "SHOW STREAMS;" http://ksqldb:8088

Interactive ksqlDB shell:

ksqldb-cli ksql http://ksqldb:8088
SHOW STREAMS;

Docker Compose

services:
  ksqldb:
    image: confluentinc/cp-ksqldb-server:${VERSION}
    environment:
      KSQL_KSQL_SERVICE_ID: kafka-ksqldb.
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_KSQL_CONNECT_URL: http://kafka-connect:8083
      KSQL_KSQL_EXTENSION_DIR: /ksqldb/extensions
    ports:
      - "8088:8088"
    restart: on-failure
    volumes:
      - ../kafka-ksqldb-extensions/extensions:/ksqldb/extensions
    healthcheck:
      test: curl http://localhost:8088
      interval: 30s
      timeout: 30s
      retries: 5
      start_period: 30s

networks:
  default:
    external: true
    name: kafka-sandbox_network

ksqlDB Streams Example

Test runner:

ksqldb-cli ksql-test-runner -e extensions/ \
                            -s statements/create-orders.ksql \
                            -i tests/orders-input.json \
                            -o tests/orders-output.json | grep '>>>'

Execute the statement files; create the orders stream:

ksqldb-cli ksql -f statements/create-orders.ksql http://ksqldb:8088

Insert orders:

ksqldb-cli ksql -f statements/insert-orders.ksql http://ksqldb:8088

List of streams:

http :8088/ksql ksql="list streams;" | jq '.[].streams[] | [{name: .name, topic: .topic}]'

Show content:

ksqldb-cli ksql -e "PRINT 'kafka-ksqldb.order_sizes' FROM BEGINNING;" http://ksqldb:8088

Delete all orders (drop the streams and their topics):

ksqldb-cli ksql -e "DROP STREAM ORDERSIZES DELETE TOPIC; DROP STREAM ORDERS DELETE TOPIC;" http://ksqldb:8088

Statements

statements/create-orders.ksql

CREATE STREAM orders (orderId INT KEY, orderUnits INT, totalAmount DOUBLE)
  WITH (kafka_topic='kafka-ksqldb.orders', key_format='json', value_format='json', partitions=10, replicas=3);

CREATE STREAM orderSizes 
  WITH (kafka_topic='kafka-ksqldb.order_sizes', key_format='json', value_format='json', partitions=10, replicas=3)
  AS SELECT
    orderId,
    orderUnits,
    totalAmount,
    CASE WHEN orderUnits < 2 THEN 'small' WHEN orderUnits < 4 THEN 'medium' ELSE 'large' END AS orderSize,
    taxes(totalAmount) AS tax
  FROM orders EMIT CHANGES;

statements/insert-orders.ksql

INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000000, 2, 100.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000001, 4, 200.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000002, 6, 300.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000003, 3, 150.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000004, 1, 50.0);

Tests

tests/orders-input.json

{
  "inputs": [
    {"topic": "kafka-ksqldb.orders", "timestamp": 0, "key": 1000000, "value": {"orderUnits": 2, "totalAmount": 100.0}},
    {"topic": "kafka-ksqldb.orders", "timestamp": 0, "key": 1000001, "value": {"orderUnits": 4, "totalAmount": 200.0}},
    {"topic": "kafka-ksqldb.orders", "timestamp": 0, "key": 1000002, "value": {"orderUnits": 6, "totalAmount": 300.0}},
    {"topic": "kafka-ksqldb.orders", "timestamp": 0, "key": 1000003, "value": {"orderUnits": 3, "totalAmount": 150.0}},
    {"topic": "kafka-ksqldb.orders", "timestamp": 0, "key": 1000004, "value": {"orderUnits": 1, "totalAmount": 50.0}}
  ]
}

tests/orders-output.json

{
  "outputs": [
    {"topic": "kafka-ksqldb.order_sizes", "timestamp": 0, "key": 1000000, "value": {"ORDERUNITS": 2, "TOTALAMOUNT": 100.0, "ORDERSIZE": "medium", "TAX": 12.0}},
    {"topic": "kafka-ksqldb.order_sizes", "timestamp": 0, "key": 1000001, "value": {"ORDERUNITS": 4, "TOTALAMOUNT": 200.0, "ORDERSIZE": "large" , "TAX": 24.0}},
    {"topic": "kafka-ksqldb.order_sizes", "timestamp": 0, "key": 1000002, "value": {"ORDERUNITS": 6, "TOTALAMOUNT": 300.0, "ORDERSIZE": "large" , "TAX": 36.0}},
    {"topic": "kafka-ksqldb.order_sizes", "timestamp": 0, "key": 1000003, "value": {"ORDERUNITS": 3, "TOTALAMOUNT": 150.0, "ORDERSIZE": "medium", "TAX": 18.0}},
    {"topic": "kafka-ksqldb.order_sizes", "timestamp": 0, "key": 1000004, "value": {"ORDERUNITS": 1, "TOTALAMOUNT": 50.0, "ORDERSIZE": "small" , "TAX": 6.0}}
  ]
}

ksqlDB Extensions

ksqlDB extensions are pieces of logic for transforming or aggregating events that ksqlDB can't currently express.

Check the Kafka ksqlDB section.

To build the extension jar, you can use the following command (development purposes):

./gradlew kafka-ksqldb-extensions:shadowJar

⚠️ Java 11 is needed here.

Custom UDF

package kafka.sandbox.ksqldb;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;

@UdfDescription(
    name = "taxes",
    author = "kafka sandbox",
    version = "1.0.0",
    description = "A custom taxes formula for orders."
)
public class TaxesUdf {

    public static final double TAXES = .12;

    @Udf(description = "Calculate taxes.")
    public double taxes(@UdfParameter double amount) {
        return amount * TAXES;
    }
}

Kafka Clients

Java examples for producing and consuming messages from Kafka using the Java Kafka client library.

Avro Producer and Consumer

These examples produce to and consume from the suppliers topic. The producer example generates random suppliers.

⚠️ Run these commands inside the root folder.

Create an alias for kafka-clients:

alias kafka-clients="$PWD/kafka-clients/build/install/kafka-clients/bin/kafka-clients "

To permanently add the alias to your shell (~/.bashrc or ~/.zshrc file):

echo "alias kafka-clients='$PWD/kafka-clients/build/install/kafka-clients/bin/kafka-clients '" >> ~/.zshrc

Create a topic:

kafka-cli kafka-topics --create --bootstrap-server kafka1:9092 \
                       --replication-factor 3 \
                       --partitions 3 \
                       --topic kafka-clients.suppliers

Install the app:

./gradlew kafka-clients:install
kafka-clients

Run clients:

kafka-clients producer 100
kafka-clients consumer
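
To verify the consumer's progress and lag, list the consumer groups and describe the one used by the consumer (replace <group-id> with the name printed by the first command):

kafka-cli kafka-consumer-groups --bootstrap-server kafka1:9092 --list
kafka-cli kafka-consumer-groups --bootstrap-server kafka1:9092 --describe --group <group-id>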

To build the Avro schemas, you can use the following command (development purposes):

./gradlew kafka-avro:build

Avro Schema

suppliers-v1.avsc

{
  "type": "record",
  "name": "Supplier",
  "namespace": "kafka.sandbox.avro",
  "version": "1",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "address",
      "type": "string"
    },
    {
      "name": "country",
      "type": "string"
    }
  ]
}

Spring Boot

Spring Boot + Spring Kafka producer and consumer examples.

⚠️ Run these commands inside the root folder.

Run spring boot:

./gradlew kafka-spring-boot:bootRun

In another terminal:

http :8585/actuator/health
http :8585/produce messages==10

Streams

Kafka Streams is a client library providing organizations with a particularly efficient framework for processing streaming data. It offers a streamlined method for creating applications and microservices that must process data in real-time to be effective.

Check the Kafka Clients - Avro Producer and Consumer section.

⚠️ Run these commands inside the root folder.

Create an alias for kafka-streams:

alias kafka-streams="$PWD/kafka-streams/build/install/kafka-streams/bin/kafka-streams "

To permanently add the alias to your shell (~/.bashrc or ~/.zshrc file):

echo "alias kafka-streams='$PWD/kafka-streams/build/install/kafka-streams/bin/kafka-streams '" >> ~/.zshrc

Install the app:

./gradlew kafka-streams:install
kafka-streams

Run streams:

kafka-streams streams

Print results:

kafka-cli kafka-console-consumer --from-beginning --group kafka-streams.consumer \
                                 --topic kafka-streams.supplier_counts_by_country  \
                                 --bootstrap-server kafka1:9092 \
                                 --property print.key=true \
                                 --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

Interactive Queries

Interactive Queries allow you to leverage the state of your application from outside your application. The Kafka Streams API enables your applications to be queryable.

This example uses gRPC to send queries to the Kafka Streams server.

Query the total supplier count by a given country:

kafka-streams count <country>

Example:

kafka-streams count Ecuador

Output:

Country: Ecuador, Total Suppliers: 4

Note that you have to run the streams application first. Check the Kafka Streams section.

Check the stream topology:

package kafka.sandbox.cli;

import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import kafka.sandbox.avro.Supplier;
import kafka.sandbox.grpc.CounterService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import picocli.CommandLine;
import picocli.CommandLine.Command;

@Slf4j
@Command(name = "streams", description = "Creates and start kafka streams")
public class Streams implements Callable<Integer> {

    public static final String TOPIC_FROM = "kafka-clients.suppliers";
    public static final String TOPIC_TO = "kafka-streams.supplier_counts_by_country";
    private final Properties props;

    public Streams(Properties props) {
        this.props = props;
    }

    @Override
    public Integer call() throws Exception {
        Serde<String> stringSerde = Serdes.String();
        Serde<Long> longSerde = Serdes.Long();

        StreamsBuilder builder = new StreamsBuilder();

        // read from suppliers topic
        KStream<String, Supplier> suppliers = builder.stream(TOPIC_FROM);

        // aggregate the new supplier counts by country
        KTable<String, Long> aggregated = suppliers
                // map the country as key
                .map((key, value) -> new KeyValue<>(value.getCountry(), value))
                .groupByKey()
                // aggregate and materialize the store
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("SupplierCountByCountry"));

        // print results
        aggregated
                .toStream()
                .foreach((key, value) -> log.info("Country = {}, Total supplier counts = {}", key, value));

        // write the results to a topic
        aggregated.toStream().to(TOPIC_TO, Produced.with(stringSerde, longSerde));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.cleanUp();

        // GRPC Server
        Server server = Grpc.newServerBuilderForPort(5050, InsecureServerCredentials.create())
                .addService(new CounterService(streams))
                .build();

        // attach shutdown handler to catch control-c and creating a latch
        CountDownLatch latch = new CountDownLatch(1);
        Runtime
                .getRuntime()
                .addShutdownHook(
                        new Thread("consumer-shutdown-hook") {
                            @Override
                            public void run() {
                                server.shutdownNow();
                                streams.close();
                                latch.countDown();
                            }
                        });

        streams.start();
        server.start();
        latch.await();

        return CommandLine.ExitCode.OK;
    }
}

Check the gRPC server:

package kafka.sandbox.grpc;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import io.grpc.stub.StreamObserver;
import kafka.sandbox.proto.CounterServiceGrpc.CounterServiceImplBase;
import kafka.sandbox.proto.CountReply;
import kafka.sandbox.proto.CountRequest;

public class CounterService extends CounterServiceImplBase {

    private KafkaStreams streams;

    public CounterService(KafkaStreams streams) {
        this.streams = streams;
    }

    @Override
    public void getCountByCountry(CountRequest request, StreamObserver<CountReply> responseObserver) {
        ReadOnlyKeyValueStore<String, Long> keyValueStore = streams.store(
                StoreQueryParameters.fromNameAndType("SupplierCountByCountry", QueryableStoreTypes.keyValueStore()));

        String country = request.getName();
        Long total = keyValueStore.get(country);
        String value = String.format("Country: %s, Total Suppliers: %s", country, total != null ? total : 0);
        CountReply reply = CountReply.newBuilder().setMessage(value).build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }
}

Check the gRPC client:

package kafka.sandbox.cli;

import java.util.concurrent.Callable;

import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import kafka.sandbox.proto.CountReply;
import kafka.sandbox.proto.CountRequest;
import kafka.sandbox.proto.CounterServiceGrpc;
import kafka.sandbox.proto.CounterServiceGrpc.CounterServiceBlockingStub;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Parameters;

@Slf4j
@Command(name = "count", description = "Print the total supplier count by a given country")
public class Count implements Callable<Integer> {

    @Parameters(index = "0", description = "Country (default: ${DEFAULT-VALUE})", defaultValue = "Ecuador")
    private String country;

    @Override
    public Integer call() throws Exception {
        ManagedChannel channel = Grpc.newChannelBuilder("localhost:5050", InsecureChannelCredentials.create())
                .build();

        CounterServiceBlockingStub blockingStub = CounterServiceGrpc.newBlockingStub(channel);
        CountReply countByCountry = blockingStub.getCountByCountry(CountRequest.newBuilder().setName(country).build());
        log.info(countByCountry.getMessage());

        return CommandLine.ExitCode.OK;
    }

}

Kafka Performance Tools

Performance tuning involves two important metrics:

  • Latency measures how long it takes to process one event.
  • Throughput measures how many events arrive within a specific amount of time.

Run help:

kafka-cli kafka-producer-perf-test --help
kafka-cli kafka-consumer-perf-test --help

Create a topic:

kafka-cli kafka-topics --create --bootstrap-server kafka1:9092 \
                       --replication-factor 3 \
                       --partitions 3 \
                       --topic kafka-cluster.performance-test

Performance Tests

Test producer:

kafka-cli kafka-producer-perf-test --topic kafka-cluster.performance-test \
                                   --throughput -1 \
                                   --num-records 3000000 \
                                   --record-size 1024 \
                                   --producer-props acks=all bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
  • Throughput in MB/sec.
  • Latency in milliseconds.

Test consumer:

kafka-cli kafka-consumer-perf-test --topic kafka-cluster.performance-test \
                                   --broker-list kafka1:9092,kafka2:9092,kafka3:9092 \
                                   --messages 3000000
  • start.time, end.time: the test start and end time.
  • data.consumed.in.MB: the total size of all messages consumed.
  • MB.sec: how much data was transferred per second, in megabytes (throughput by size).
  • data.consumed.in.nMsg: the total number of messages consumed during the test.
  • nMsg.sec: how many messages were consumed per second (throughput by message count).

Test end to end latency:

kafka-cli kafka-run-class kafka.tools.EndToEndLatency kafka1:9092,kafka2:9092,kafka3:9092 kafka-cluster.performance-test 10000 1 1024
  • This class records the average end-to-end latency for a single message to travel through Kafka.
  • Latency in milliseconds.

Kafka REST Proxy

The Kafka REST Proxy provides a RESTful interface to a Kafka cluster.

⚠️ Use this only when you really need a REST interface, since it is usually more complex than using conventional Kafka clients.

Run Kafka REST Proxy:

cd kafka-rest
docker compose up -d
cd ..

After a few seconds:

http :8082/brokers

Produce messages (the topics are created automatically):

cd kafka-rest
http :8082/topics/kafka-rest.test Content-Type:application/vnd.kafka.json.v2+json records:='[{ "key": "test", "value": "test" }]'
http :8082/topics/kafka-rest.users Content-Type:application/vnd.kafka.avro.v2+json < requests/produce-avro-message.json
cd ..
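
The proxy also exposes read endpoints, for example to list the topics and inspect the one just created (a couple of optional checks):

http :8082/topics
http :8082/topics/kafka-rest.test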

Docker Compose

services:
  kafka-rest:
    image: confluentinc/cp-kafka-rest:${VERSION}
    environment:
      KAFKA_REST_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092
      KAFKA_REST_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    ports:
      - "8082:8082"
    restart: on-failure
    healthcheck:
      test: curl http://localhost:8082
      interval: 30s
      timeout: 30s
      retries: 5
      start_period: 30s

networks:
  default:
    external: true
    name: kafka-sandbox_network

Requests

requests/produce-avro-message.json

{
  "value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}",
  "records": [
    {
      "value": {
        "name": "John Doe"
      }
    }
  ]
}

Kafka MQTT Proxy

MQTT Proxy enables MQTT clients to use the MQTT 3.1.1 protocol to publish data directly to Apache Kafka.

⚠️ This does not turn Kafka into an MQTT broker; it aims to provide a simple way to publish and persist IoT data to Kafka.

Run Kafka MQTT Proxy:

cd kafka-mqtt
docker compose up -d
cd ..

Publish a message:

mqtt-cli pub -h kafka-mqtt -p 1884 -t 'house/room/temperature' -m '20C'

Consume the data:

kafka-cli kafka-console-consumer --from-beginning --group kafka-mqtt.consumer \
                                 --topic kafka-mqtt.temperature  \
                                 --bootstrap-server kafka1:9092 \
                                 --property print.key=true

Docker Compose

services:
  kafka-mqtt:
    image: confluentinc/cp-kafka-mqtt:${VERSION}
    environment:
      KAFKA_MQTT_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092
      KAFKA_MQTT_TOPIC_REGEX_LIST: kafka-mqtt.temperature:.*temperature
      KAFKA_MQTT_LISTENERS: 0.0.0.0:1884
    ports:
      - "1884:1884"
    restart: on-failure

networks:
  default:
    external: true
    name: kafka-sandbox_network

UI Ports Table

Service        Host       Port
AKHQ           localhost  8080
Adminer        localhost  9090
Mongo Express  localhost  7070

Component Ports Table

Service                    Host             Port
MySQL                      mysql            3306
MySQL                      localhost        3306
PostgreSQL                 postgres         5432
PostgreSQL                 localhost        5432
MongoDB                    mongo            27017
MongoDB                    localhost        27017
Mosquitto                  mosquitto        1883
Mosquitto                  localhost        1883
Kafka 1                    kafka1           9092
Kafka 1                    localhost        19092
Kafka 2                    kafka2           9092
Kafka 2                    localhost        29092
Kafka 3                    kafka3           9092
Kafka 3                    localhost        39092
Kafka 1 JMX                kafka1           19999
Kafka 1 JMX                localhost        19999
Kafka 2 JMX                kafka2           29999
Kafka 2 JMX                localhost        29999
Kafka 3 JMX                kafka3           39999
Kafka 3 JMX                localhost        39999
Schema Registry            schema-registry  8081
Schema Registry            localhost        8081
Kafka REST                 kafka-rest       8082
Kafka REST                 localhost        8082
Kafka Connect              kafka-connect    8083
Kafka Connect              localhost        8083
Kafka MQTT                 kafka-mqtt       1884
Kafka MQTT                 localhost        1884
ksqlDB                     ksqldb           8088
ksqlDB                     localhost        8088
Kafka Clients Spring Boot  localhost        8585
Kafka Streams gRPC Server  localhost        5050

About This Book

This book is powered by mdBook.

GitHub Repository.

Developing Commands

You must install Rust first.

Install mdbook:

cargo install mdbook

Run local server:

mdbook serve --open

Build statics:

mdbook build

Using Docker

Create the docker image:

docker build -t sauljabin/kafka-sandbox-book:latest -f docker/Dockerfile .

Run the book (then open http://localhost in the web browser):

docker run --name kafka-sandbox-book -d -p 80:80 sauljabin/kafka-sandbox-book:latest