Introduction

Kafka Sandbox helps you to deploy a kafka sandbox locally. It intends to be a simple way to get started with kafka and help you on your learning path. It provides you with a wide variety of tools from the kafka ecosystem and a simple way to run them all. It also includes a set of tools and tips to make it easier for you to use kafka. This is not intended to replace official courses.

note

This repository was set up for development and learning purposes. It does not include security since it is not a production system.

Dependencies

Quick Start

Clone the repo:

git clone https://github.com/sauljabin/kafka-sandbox.git
cd kafka-sandbox

Run the kafka cluster:

docker compose up -d

Check running services:

docker compose ps

Open AKHQ (a web UI for kafka) at http://localhost:8080/.

Sandbox Environment

For opening the sandbox environment just run:

docker compose exec cli bash

important

You will use this environment throughout the sandbox. Remember to open it before executing a command different to docker compose ....

What is Kafka?

Create a Topic

kafka-topics --create --bootstrap-server kafka1:9092 \
             --replication-factor 3 \
             --partitions 3 \
             --topic sandbox.test

List Topics

kafka-topics --list --bootstrap-server kafka1:9092

At this point you must understand that the most important concept for you is the partition. You will be tempted to focus on the topic concept, but you should focus on the partition. This is due to the very nature of a distributed system.

Consuming and Producing

Produce Messages

kafka-console-producer --bootstrap-server kafka1:9092 \
                       --topic sandbox.test

Consume Messages

kafka-console-consumer --from-beginning \
                       --bootstrap-server kafka1:9092 \
                       --group sandbox.test \
                       --topic sandbox.test

It is key to know that kafka stores binary data, it does not care if the internal serialization of the data represents a character string, image, number, or even if the data is encrypted or plain.

All the serialization/deserialization process occurs on the client side, when producing or consuming.

Producing and Consuming Natives

Now we are going to develop consumers and producer with java.

Create All the Topics

for topic in "client.string" "client.integer" "client.long" "client.float" "client.double" "client.boolean" ; do \
kafka-topics --create --bootstrap-server kafka1:9092 \
             --replication-factor 3 \
             --partitions 3 \
             --topic $topic ; done

Produce

This code example shows you how to produce a message. The configuration ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG define what type are we using.

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getSerializer());
KafkaProducer<String, V> producer = new KafkaProducer<>(props);

for (int i = 0; i < messages; i++) {
    V value = createValue();
    ProducerRecord<String, V> record = new ProducerRecord<>(
        topic,
        value
    );
    producer.send(
        record,
        (metadata, exception) -> log.info("Producing message: {}", value)
    );
}

Then, produce for each type:

for type in "string" "integer" "long" "float" "double" "boolean" ; do \
  gradle kafka-native-clients:run --args="produce client.$type $type 100" ; done

Consume

It's going to be the same for the consumer, but in this case you have to use the deserializer: ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG.

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getDeserializer());
KafkaConsumer<String, V> consumer = new KafkaConsumer<>(props);

ConsumerRecords<String, V> records = consumer.poll(Duration.ofMillis(500));

for (ConsumerRecord<String, V> record : records) {
    log.info("Supplier ID: {}", record.value());
}

Consume a topic, for example client.string:

gradle kafka-native-clients:run --args="consume client.string string"

Message Schemas

In this sandbox Schema Registry is already running, we are going to use it in some examples. Schema Registry will allow you to create applications that share the same data structure when they communicate with each other.

This service could be key for your productions services, especially in a microservices ecosystems. Although, you will find that evolving a schema could be a problem, particularly when in your firsts stages of a new services.

It is important to know that Schema Registry is not a plugin or a kafka extension. It is a separated services running in its own machine.

So, the concept Schema Evolution and Compatibility will become essential for you. Check this out: Schema Evolution and Compatibility, there you will find the compatibility strategies for multiple use cases.

Additionally, here you will find a lot of information about Schema Registry: Getting Started with Schemas and Schema Registries.

What is Kafka Connect?

It is worth mentioning that just like Schema Registry, Kafka Connect is an external and independent service to Kafka.

Kafka Connect will be very useful when you want to have constant flow of data from a data store (source) to another (sink).

Kafka Connect works with plugin, you can find several connector on Confluent Hub. In this sandbox we have installed 3 plugins:

Plugins above can be found at:

ls kafka-connect/plugins

Kafka Connect Database Example

In this example you are going to learn how to move data from a source (mysql), to a synk (postgres).

important

This example does not support deletion, for that you have to implement tombstone events at the source and sink.

Populate Database

Run MySQL and PostgreSQL:

docker compose --profile sql up -d

Populate it:

mysql --host=mysql --user=root --database=sandbox < kafka-connect/sql/customers.sql

That command should have created the table customers and inserted 200 records.

Now you can open Adminer or run:

mysql --host=mysql --user=root --database=sandbox -e "select * from customers"

Create Source Connector

Check the installed plugins:

http kafka-connect:8083/connector-plugins

Now you have to hit the kafka connect rest service to create a new source, next you have the rest payload:

{
  "name": "mysql-source",
  "config": {
    "tasks.max": "1",
    "table.whitelist": "customers",
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://mysql:3306/sandbox",
    "connection.user": "root",
    "connection.password": "notasecret",
    "mode": "timestamp",
    "timestamp.column.name": "created",
    "topic.prefix": "connect.",
    "transforms": "createKey",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "id"
  }
}

Create the connector using the API:

http kafka-connect:8083/connectors < kafka-connect/requests/create-connector-mysql-source.json

If you open AKHQ you should see a new topic: connect.customers.

Create Sink Connector

Payload:

{
  "name": "postgres-sink",
  "config": {
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/sandbox",
    "connection.user": "postgres",
    "connection.password": "notasecret",
    "delete.enabled": false,
    "pk.mode": "record_key",
    "pk.key": "id",
    "insert.mode": "upsert",
    "auto.create": true,
    "topics": "connect.customers",
    "transforms": "dropPrefix",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.regex": "connect\\.(.*)",
    "transforms.dropPrefix.replacement": "$1"
  }
}

Create sink connector:

http kafka-connect:8083/connectors < kafka-connect/requests/create-connector-postgres-sink.json

This sink connector is going to create a table customers on postgres and insert all records.

Now you can open Adminer or run:

psql --host=postgres --user=postgres --dbname=sandbox -c "select * from customers"

List connector:

http kafka-connect:8083/connectors

Deleting Connectors

http DELETE kafka-connect:8083/connectors/postgres-sink
http DELETE kafka-connect:8083/connectors/mysql-source

Kafka Connect MQTT Example

Setup Broker

Start MQTT server:

docker compose --profile mqtt up -d

In one terminal, subscribe to mqtt topics:

mosquitto_sub -h mosquitto -t "house/+/brightness"

In another terminal, publish messages:

mosquitto_pub -h mosquitto -t "house/room/brightness" -m "800LM"
mosquitto_pub -h mosquitto -t "house/kitchen/brightness" -m "1000LM"

Create Source Connector

Payload:

{
  "name": "mqtt-source",
  "config": {
      "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
      "tasks.max": "1",
      "mqtt.server.uri": "tcp://mosquitto:1883",
      "mqtt.topics":"house/+/brightness",
      "kafka.topic":"connect.brightness",
      "mqtt.qos": "2",
      "confluent.topic.bootstrap.servers": "kafka1:9092",
      "confluent.topic.replication.factor": "1"
  }
}

Create a connector using the API:

http kafka-connect:8083/connectors < kafka-connect/requests/create-connector-mqtt-source.json

In one terminal, consume from kafka:

kafka-console-consumer --from-beginning --group connect.mqtt \
                       --topic connect.brightness  \
                       --bootstrap-server kafka1:9092 \
                       --property print.key=true

In another terminal, publish new messages to the MQTT broker:

mosquitto_pub -h mosquitto -t "house/room/brightness" -m "810LM"
mosquitto_pub -h mosquitto -t "house/kitchen/brightness" -m "1020LM"

Deleting the connector:

http DELETE kafka-connect:8083/connectors/mqtt-source

Performance Tools

Performance tuning involves two important metrics:

  • Latency measures how long it takes to process one event.
  • Throughput measures how many events arrive within a specific amount of time.

Run help:

kafka-producer-perf-test --help
kafka-consumer-perf-test --help

Create a topic:

kafka-topics --create \
             --bootstrap-server kafka1:9092 \
             --replication-factor 3 \
             --partitions 3 \
             --topic sandbox.performance

Performance Tests

Test producer (confluent doc):

kafka-producer-perf-test --topic sandbox.performance \
                         --throughput -1 \
                         --num-records 3000000 \
                         --record-size 1024 \
                         --producer-props acks=all bootstrap.servers=kafka1:9092
  • Throughput in MB/sec.
  • Latency in milliseconds.

Test consumer (confluent doc):

kafka-consumer-perf-test --topic sandbox.performance \
                         --bootstrap-server kafka1:9092 \
                         --messages 3000000
  • start.time, end.time: shows test start and end time.
  • data.consumed.in.MB: shows the size of all messages consumed.
  • MB.sec: shows how much data transferred in megabytes per second (Throughput on size).
  • data.consumed.in.nMsg: shows the count of the total messages consumed during this test.
  • nMsg.sec: shows how many messages were consumed in a second (Throughput on the count of messages).

Test end to end latency (confluent doc):

kafka-e2e-latency kafka1:9092 sandbox.performance 10000 all 1024

Following are the required arguments:

  • broker_list: The location of the bootstrap broker for both the producer and the consumer.
  • topic: The topic name used by both the producer and the consumer to send/receive messages.
  • num_messages: The number of messages to send
  • producer_acks: The producer setting for acks.
  • message_size_bytes: size of each message in bytes.

Kafka Proxies

The kafka proxies are tools that simplify the communication between a component and kafka through a non-kafka protocol.

Confluent provides a couple of proxies. It is important to mention that these tools are used when the use case is simple enough. Generally when you need a lot of control over clients, using proxies is not recommended.

Kafka REST Proxy

The Kafka REST Proxy provides a RESTful interface to a Kafka cluster. Use this when you really need a rest interface since it is usually more complex than using conventional kafka clients. You can check the API reference here.

Setup

Run Kafka REST Proxy:

docker compose --profile proxies up -d

Check the cluster information:

http kafka-rest:8082/v3/clusters

Create Topic

Payload:

{
  "topic_name": "proxy.rest",
  "partitions_count": 3,
  "replication_factor": 3
}

Hit rest proxy:

http kafka-rest:8082/v3/clusters/${CLUSTER_ID}/topics < kafka-rest/requests/create-topic.json

List topics:

http kafka-rest:8082/v3/clusters/${CLUSTER_ID}/topics | jq -r '.data[].topic_name'

Produce

Payload:

{
  "key": {
    "type": "STRING",
    "data": "ce59e9a6-aafd-44f4-bdc6-f285adfcc836"
  },
  "value": {
    "type": "AVRO",
    "schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}",
    "data": {
      "name": "John Doe"
    }
  }
}

Send payload:

http kafka-rest:8082/v3/clusters/${CLUSTER_ID}/topics/proxy.rest/records < kafka-rest/requests/produce-avro-message.json

Consume

kafka-avro-console-consumer --bootstrap-server kafka1:9092 \
        --topic proxy.rest \
        --from-beginning \
        --property schema.registry.url=http://schema-registry:8081

Kafka MQTT Proxy

MQTT Proxy enables MQTT clients to use the MQTT protocol to publish data directly to Apache Kafka. This does not convert kafka into a MQTT broker, this aims to provide a simple way to publish/persist IoT data to Kafka.

Setup

Run Kafka MQTT Proxy:

docker compose --profile proxies up -d

Publish Messages

Create topic:

kafka-topics --create \
             --bootstrap-server kafka1:9092 \
             --replication-factor 3 \
             --partitions 3 \
             --topic proxy.mqtt

Publish using mqtt proxy:

mosquitto_pub -h kafka-mqtt -p 1884 -t "house/room/temperature" -m "20C"

Check the data:

kafka-console-consumer --from-beginning \
                       --bootstrap-server kafka1:9092 \
                       --group proxy.mqtt \
                       --topic proxy.mqtt  \
                       --property print.key=true

JSON Producer and Consumer

This example shows you how to use json and schema registry for producing and consuming.

POJO

The first step is to create the structure:

package kafka.sandbox.cli;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Builder;
import lombok.Data;

@Builder
@Data
public class User {
    @JsonProperty
    public String id;

    @JsonProperty
    public String firstName;

    @JsonProperty
    public String lastName;

    @JsonProperty
    public String address;

    @JsonProperty
    public int age;
}

Confluent java library uses jackson annotations.

Configurations

It is possible to produce with or without Schema Registry. It'll depend on the configurations.

Producer:

if (useSchemaRegistry) {
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSchemaSerializer.class);
    props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
} else {
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSerializer.class);
}

Consumer:

if (useSchemaRegistry) {
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonSchemaDeserializer.class);
    props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
} else {
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonDeserializer.class);
}

Setup

Create a topic to produce json messages without Schema Registry:

kafka-topics --create --bootstrap-server kafka1:9092 \
             --replication-factor 3 \
             --partitions 3 \
             --topic client.users

Create a topic to produce json messages with Schema Registry:

kafka-topics --create --bootstrap-server kafka1:9092 \
             --replication-factor 3 \
             --partitions 3 \
             --topic client.schema.users

Produce/Consume without Schema Registry

gradle kafka-json-clients:run --args="produce client.users 100"
gradle kafka-json-clients:run --args="consume client.users"

Produce/Consume with Schema Registry

gradle kafka-json-clients:run --args="produce -s client.schema.users 100"
gradle kafka-json-clients:run --args="consume -s client.schema.users"

Avro Producer and Consumer

Next, you will see how to serialize/deserialize messages using avro schemas.

Avro Schema

Avro allows us to serialize/deserialize messages. Here it's our example avro schema:

{
  "type": "record",
  "name": "Supplier",
  "namespace": "kafka.sandbox.avro",
  "version": "1",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "address",
      "type": "string"
    },
    {
      "name": "country",
      "type": "string"
    }
  ]
}

Create Topic

kafka-topics --create --bootstrap-server kafka1:9092 \
             --replication-factor 3 \
             --partitions 3 \
             --topic client.suppliers

Produce

As you can see now we are using the autogenerated class Supplier.

KafkaProducer<String, Supplier> producer = new KafkaProducer<>(props);

for (int i = 0; i < messages; i++) {
    Supplier supplier = createNew();
    ProducerRecord<String, Supplier> record = new ProducerRecord<>(
        topic,
        supplier.getId().toString(),
        supplier
    );
    producer.send(
        record,
        (metadata, exception) -> log.info("Producing message: {}", supplier)
    );
}
gradle kafka-avro-clients:run --args="produce client.suppliers 100"

Consume

It is the same for the consumer, we are using the avro class Supplier.

ConsumerRecords<String, Supplier> records = consumer.poll(Duration.ofMillis(500));

for (ConsumerRecord<String, Supplier> record : records) {
    log.info("Supplier ID: {}", record.key());
}
gradle kafka-avro-clients:run --args="consume client.suppliers"

Avro Union

These example show you how to use Unions.

Avro Schema

In this schema we create a field metric that can be a TimerMetric or CounterMetric.

{
  "type": "record",
  "name": "Metric",
  "namespace": "kafka.sandbox.avro",
  "version": "1",
  "fields": [
    {
      "name": "metricId",
      "type": "string"
    },
    {
      "name": "metricType",
      "type": {
        "type": "enum",
        "name": "MetricType",
        "symbols": [
          "TIMER",
          "COUNTER"
        ],
        "default": "TIMER"
      }
    },
    {
      "name": "metric",
      "type": [
        {
          "type": "record",
          "name": "TimerMetric",
          "namespace": "kafka.sandbox.avro",
          "version": "1",
          "fields": [
            {
              "name": "avg",
              "type": "double"
            }
          ]
        },
        {
          "type": "record",
          "name": "CounterMetric",
          "namespace": "kafka.sandbox.avro",
          "version": "1",
          "fields": [
            {
              "name": "count",
              "type": "long"
            }
          ]
        }
      ]
    }
  ]
}

The Metric java class will define an object (for the metric field) instead of a specific type (TimerMetric or CounterMetric).

private java.lang.Object metric;

Setup

Create a topic:

kafka-topics --create --bootstrap-server kafka1:9092 \
             --replication-factor 3 \
             --partitions 3 \
             --topic client.metrics

Produce

gradle kafka-avro-union-clients:run --args="produce client.metrics 100"

Consume

gradle kafka-avro-union-clients:run --args="consume client.metrics"

Protobuf Producer and Consumer

Next, you will see how to serialize/deserialize messages using protobuf.

Protobuf Schema

Protobuf allows us to serialize/deserialize messages.

syntax = "proto3";
import "google/protobuf/timestamp.proto";

option java_multiple_files = true;
option java_package = "kafka.sandbox.proto";

enum InvoiceStatus {
  PAID = 0;
  PENDING = 1;
}

message Invoice {
  string id = 1;
  google.protobuf.Timestamp created_at = 3;
  Customer customer = 4;
  repeated Product products = 5;
  InvoiceStatus status = 6;
}

message Product {
  string id = 1;
  string name = 2;
  string code = 3;
  double price = 4;
}

message Customer {
  string id = 1;
  string firstName = 2;
  string lastName = 3;
  Address address = 4;
}

message Address {
  string city = 1;
  string zipCode = 2;
  string street = 3;
}

Configurations

It is possible to produce with or without Schema Registry. It'll depend on the configurations.

Producer:

if (useSchemaRegistry) {
    props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
} else {
    // ProtobufSerializer is a custom class
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtobufSerializer.class);
}

Consumer:

if (useSchemaRegistry) {
    props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
    // pass the confluent protobuf deserializer
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
    // you have to pass the protobuf class
    props.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, Invoice.class);
} else {
    // ProtobufDeserializer is a custom class
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProtobufDeserializer.class);
    // here we pass a configuration to the custom deserializer
    props.put(ProtobufDeserializer.PROTOBUF_PARSER, Invoice.parser());
}

Setup

Create a topic to produce protobuf messages without Schema Registry:

kafka-topics --create --bootstrap-server kafka1:9092 \
             --replication-factor 3 \
             --partitions 3 \
             --topic client.invoices

Create a topic to produce protobuf messages with Schema Registry:

kafka-topics --create --bootstrap-server kafka1:9092 \
             --replication-factor 3 \
             --partitions 3 \
             --topic client.schema.invoices

Produce/Consume without Schema Registry

gradle kafka-protobuf-clients:run --args="produce client.invoices 100"
gradle kafka-protobuf-clients:run --args="consume client.invoices"

Produce/Consume with Schema Registry

gradle kafka-protobuf-clients:run --args="produce -s client.schema.invoices 100"
gradle kafka-protobuf-clients:run --args="consume -s client.schema.invoices"

Protobuf Oneof

More at https://protobuf.dev/reference/java/java-generated/#oneof-fields.

Protobuf Schema

Here you can see that the Measurement has an oneof field:

syntax = "proto3";
import "google/protobuf/timestamp.proto";

option java_multiple_files = true;
option java_package = "kafka.sandbox.proto";

enum SensorStatus {
  UP = 0;
  ERROR = 1;
}

message Sensor {
  string id = 1;
  SensorStatus status = 2;
}

message Environment {
  double temperature = 1;
  double humidity = 2;
}

message Speed {
  int32 wheel_rpm = 1;
  double speed = 2;
}

message Measurement {
  oneof value {
    Environment environment = 1;
    Speed speed = 2;
  }
  google.protobuf.Timestamp created_at = 3;
  Sensor sensor = 4;
}

Setup

Create a topic:

kafka-topics --create --bootstrap-server kafka1:9092 \
             --replication-factor 3 \
             --partitions 3 \
             --topic client.measurements

Produce

gradle kafka-protobuf-oneof-clients:run --args="produce client.measurements 100"

Consume

You can verify which value is inside (speed or environment) with record.value().getValueCase().

gradle kafka-protobuf-oneof-clients:run --args="consume client.measurements"

Spring Boot

Spring Boot + Spring Kafka producer and consumer examples.

Setup

Run spring boot:

gradle kafka-spring-boot:bootRun

Produce

Spring has the class KafkaTemplate that allows you to produce messages.

@Value("${spring.kafka.topic}")
private String topic;

@Autowired
private KafkaTemplate<String, Customer> kafkaTemplate;

public void sendCustomer(Customer customer) {
    log.info("Producing message: {}", customer);
    kafkaTemplate.send(topic, customer.getId().toString(), customer);
}

In another terminal:

http :8585/produce messages==10

Consume

You can use the KafkaListener annotation.

@KafkaListener(topics = { "${spring.kafka.topic}" })
public void consume(ConsumerRecord<String, Customer> record) {
    log.info("Customer ID: {}", record.value());
}

Message Delivery Guarantees

Message Delivery Guarantees is an important concept in the kafka ecosystem.

There are three types of message sharing:

  1. At most once: Messages are delivered once, and if there is a system failure, messages may be lost and are not redelivered.
  2. At least once: This means messages are delivered one or more times. If there is a system failure, messages are never lost, but they may be delivered more than once.
  3. Exactly once: This is the preferred behavior in that each message is delivered once and only once. Messages are never lost or read twice even if some part of the system fails.

More at Message Delivery Guarantees.

Setup

Produce some messages:

gradle kafka-json-clients:run --args="produce client.users 100"

At most once in consumers

Offsets are committed as soon as the message is received. If the processing goes wrong, the message will be lost (it won’t be read again).

private void atMostOnce() {
    while (true) {
        ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(500));

        // offsets are committed as soon as the message is received
        consumer.commitSync();

        for (ConsumerRecord<String, User> record : records) {
            try {
                businessLogic(record);
            } catch (Exception e) {
                // the exception is ignored and the message is lost
                log.warn("There was an error but it was ignored because this is at-most-once");
            }
        }
    }
}

Consume:

gradle kafka-delivery-guarantees-clients:run --args="consume client.users -s at-most-once"

At least once in consumers

Offsets are committed after the message is processed. If the processing goes wrong, the message will be read again. This can result in duplicate processing of messages. Make sure your processing is idempotent.

More about idempotency in system at Idempotency in Event-driven systems.

private void atLeastOnce() {
    while (true) {
        ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(500));

        for (ConsumerRecord<String, User> record : records) {
            try {
                businessLogic(record);

                // offsets after the message is processed
                consumer.commitSync(Map.of(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)));
            } catch (Exception e) {
                log.error("There was an error processing a message: ", e);

                // implement recovery and restart (kubernetes), dead-letter queue, etc

                // throw the exception up
                throw e;
            }
        }

    }
}

Consume:

gradle kafka-delivery-guarantees-clients:run --args="consume client.users -s at-least-once"

Exactly once in consumers

It's important to say that kafka support exactly once semantic through transaction. Keep in mind that transactions in Kafka were developed specifically for stream processing applications. And therefore they were built to work with the consume-process-produce pattern that forms the basis of stream processing applications.

Kafka does not support exactly once when:

  • Publish/subscribe pattern (common producer and consumer)
  • Reading from a Kafka topic and writing to a database
  • Reading data from a database, writing to Kafka, and from there writing to another database
  • Copying data from one Kafka cluster to another
  • Any other side effects while stream processing (calling a REST API, writing to a file, sending an email, etc)

tip

If you want to build a stream processing application (sum, max, min, count, map, filter, etc) it is highly recommended to use kafka stream.

warning

Is highly recommended to read the chapter 8 of Kafka: The Definitive Guide, 2nd Edition.

note

Microservices often need to update the database and publish a message to Kafka within a single atomic transaction, so either both will happen or neither will... Kafka transactions will not do this. A common solution to this common problem is known as the outbox pattern. The microservice only publishes the message to a Kafka topic (the outbox), and a separate message relay service reads the event from Kafka and updates the database. Because... Kafka won’t guarantee an exactly-once update to the database, it is important to make sure the update is idempotent.

Kafka: The Definitive Guide, 2nd Edition

More about the outbox pattern at: Reliable Microservices Data Exchange With the Outbox Pattern.

Kafka Streams

Kafka Streams is a client library providing organizations with a particularly efficient framework for processing streaming data. It offers a streamlined method for creating applications and microservices that must process data in real-time to be effective.

In this example we are going to create a data processing pipeline (topology), which is going to count the suppliers by country (check the Kafka Clients - Avro Producer and Consumer section). So, kafka stream is going to consume form the client.suppliers topic, then is going to process the data and finally is going to publish the results in the topic streams.results.

// read from suppliers topic
KStream<String, Supplier> suppliers = builder.stream(source);

// aggregate the new supplier counts by country
KTable<String, Long> aggregated = suppliers
        // map the country as key
        .map((key, value) -> new KeyValue<>(value.getCountry().toString(), value))
        .groupByKey()
        // aggregate and materialize the store
        .count(Materialized.as("SupplierCountByCountry"));

// write the results to a topic
aggregated.toStream()
        // print results
        .peek((key, value) -> log.info("Country = {}, Total supplier counts = {}", key, value))
        // publish results
        .to(sink, Produced.with(Serdes.String(), Serdes.Long()));

// build the topology
Topology topology = builder.build();

Run Topology

gradle kafka-streams:run --args="streams client.suppliers streams.results"

Print results (in another terminal):

kafka-console-consumer --from-beginning --group kafka-streams.consumer \
                       --topic streams.results  \
                       --bootstrap-server kafka1:9092 \
                       --property print.key=true \
                       --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

Send new suppliers (in another terminal):

gradle kafka-avro-clients:run --args="produce client.suppliers 100"

Interactive Queries

Interactive Queries allow you to leverage the state of your application from outside your application. The Kafka Streams API enables your applications to be queryable.

This example is using gRPC to request queries to the kafka stream server.

Query Results

Having kafka streams running (see Kafka Streams), you can query form the terminal a specific country, example:

gradle -q kafka-streams:run --args="count Ecuador"

This is possible because we have access to the kafka streams stores in runtime. This way we can query over the store a return and response.

Check the gRPC server:

ReadOnlyKeyValueStore<String, Long> keyValueStore = streams.store(
        StoreQueryParameters.fromNameAndType("SupplierCountByCountry", QueryableStoreTypes.keyValueStore()));

String country = request.getName();
Long total = keyValueStore.get(country);
String value = String.format("Country: %s, Total Suppliers: %s", country, total != null ? total : 0);
CountReply reply = CountReply.newBuilder().setMessage(value).build();
responseObserver.onNext(reply);

Check the gRPC client:

ManagedChannel channel = Grpc.newChannelBuilder("localhost:5050", InsecureChannelCredentials.create())
                .build();

CounterServiceBlockingStub blockingStub = CounterServiceGrpc.newBlockingStub(channel);
CountReply countByCountry = blockingStub.getCountByCountry(CountRequest.newBuilder().setName(country).build());
System.out.println(countByCountry.getMessage());

What is ksqlDB?

ksqlDB is a database that's purpose-built for stream processing applications. ksqlDB it is not a SQL database, it provides an extra layer for implementing kstream, ktable and connectors through a language (ksql) based on sql.

Run ksqlDB

Check if it's up:

http ksqldb:8088/info

One line shell interaction:

ksql -e "SHOW STREAMS;" http://ksqldb:8088

Interactive ksqlDB shell:

ksql http://ksqldb:8088

Then enter SHOW STREAMS;.

ksqlDB Extensions

ksqlDB extensions are pieces of logic for transforming or aggregating events that ksqlDB can't currently express.

Check the Kafka ksqlDB section.

For creating the jar extension, you can use the following command (development purposes):

gradle kafka-ksqldb-extensions:shadowJar

Custom UDF

package kafka.sandbox.ksqldb;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;

@UdfDescription(
    name = "taxes",
    author = "kafka sandbox",
    version = "1.0.0",
    description = "A custom taxes formula for orders."
)
public class TaxesUdf {

    public static final double TAXES = .12;

    @Udf(description = "Calculate taxes.")
    public double taxes(@UdfParameter double amount) {
        return amount * TAXES;
    }
}

ksqlDB Queries

Create a Stream

Create orders stream:

ksql -f kafka-ksqldb/ksql/create-orders.ksql http://ksqldb:8088

The previous command executed this:

CREATE STREAM orders (orderId INT KEY, orderUnits INT, totalAmount DOUBLE)
  WITH (kafka_topic='ksqldb.orders', key_format='json', value_format='json', partitions=10, replicas=3);

CREATE STREAM orderSizes 
  WITH (kafka_topic='ksqldb.order_sizes', key_format='json', value_format='json', partitions=10, replicas=3)
  AS SELECT
    orderId,
    orderUnits,
    totalAmount,
    CASE WHEN orderUnits < 2 THEN 'small' WHEN orderUnits < 4 THEN 'medium' ELSE 'large' END AS orderSize,
    taxes(totalAmount) AS tax
  FROM orders EMIT CHANGES;

List of streams:

ksql -e "SHOW STREAMS;" http://ksqldb:8088

Insert

As any other SQL interpreter, ksqlDB will use the command INSERT to populate a table.

INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000000, 2, 100.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000001, 4, 200.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000002, 6, 300.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000003, 3, 150.0);
INSERT INTO orders (orderId, orderUnits, totalAmount) VALUES (1000004, 1, 50.0);

Insert orders:

ksql -f kafka-ksqldb/ksql/insert-orders.ksql http://ksqldb:8088

Show

ksql -e "PRINT 'ksqldb.order_sizes' FROM BEGINNING;" http://ksqldb:8088

Drop

ksql -e "DROP STREAM ORDERSIZES;" http://ksqldb:8088
ksql -e "DROP STREAM ORDERS;" http://ksqldb:8088

ksqlDB Tests

One interesting feature that ksqlDB has is the test runner, it allows you to test a query before deploying it.

Run a Test

ksql-test-runner -e kafka-ksqldb-extensions/extensions/ \
        -s kafka-ksqldb/ksql/create-orders.ksql \
        -i kafka-ksqldb/tests/orders-input.json \
        -o kafka-ksqldb/tests/orders-output.json | grep ">>>"

Tests

A test has 2 parts, inputs and outputs, the ksqldb test runner will compare then to define if the test passes or fails.

orders-input.json

{
  "inputs": [
    {"topic": "ksqldb.orders", "timestamp": 0, "key": 1000000, "value": {"orderUnits": 2, "totalAmount": 100.0}},
    {"topic": "ksqldb.orders", "timestamp": 0, "key": 1000001, "value": {"orderUnits": 4, "totalAmount": 200.0}},
    {"topic": "ksqldb.orders", "timestamp": 0, "key": 1000002, "value": {"orderUnits": 6, "totalAmount": 300.0}},
    {"topic": "ksqldb.orders", "timestamp": 0, "key": 1000003, "value": {"orderUnits": 3, "totalAmount": 150.0}},
    {"topic": "ksqldb.orders", "timestamp": 0, "key": 1000004, "value": {"orderUnits": 1, "totalAmount": 50.0}}
  ]
}

orders-output.json

{
  "outputs": [
    {"topic": "ksqldb.order_sizes", "timestamp": 0, "key": 1000000, "value": {"ORDERUNITS": 2, "TOTALAMOUNT": 100.0, "ORDERSIZE": "medium", "TAX": 12.0}},
    {"topic": "ksqldb.order_sizes", "timestamp": 0, "key": 1000001, "value": {"ORDERUNITS": 4, "TOTALAMOUNT": 200.0, "ORDERSIZE": "large" , "TAX": 24.0}},
    {"topic": "ksqldb.order_sizes", "timestamp": 0, "key": 1000002, "value": {"ORDERUNITS": 6, "TOTALAMOUNT": 300.0, "ORDERSIZE": "large" , "TAX": 36.0}},
    {"topic": "ksqldb.order_sizes", "timestamp": 0, "key": 1000003, "value": {"ORDERUNITS": 3, "TOTALAMOUNT": 150.0, "ORDERSIZE": "medium", "TAX": 18.0}},
    {"topic": "ksqldb.order_sizes", "timestamp": 0, "key": 1000004, "value": {"ORDERUNITS": 1, "TOTALAMOUNT": 50.0, "ORDERSIZE": "small" , "TAX": 6.0}}
  ]
}

Cleanup

Shutting down all services:

docker compose --profile proxies --profile sql --profile mqtt down

tip

If you want to remove the data pass -v at the end of the previous command.

Ports

ServicePort
AKHQ Kafka UI8080
Adminer SQL UI9090
MySQL3306
PostgreSQL5432
Mosquitto1883
Kafka 119092
Kafka 229092
Kafka 339092
Schema Registry8081
Kafka REST8082
Kafka Connect8083
Kafka MQTT1884
ksqlDB8088
Kafka Clients Spring Boot8585
Kafka Streams gRPC Server5050

About this Book

You can access this mdbook at: