JSON Producer and Consumer

This example shows how to produce and consume JSON messages, with or without Schema Registry.

POJO

The first step is to define the message structure as a POJO:

package kafka.sandbox.cli;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Builder;
import lombok.Data;

@Builder
@Data
public class User {
    @JsonProperty
    public String id;

    @JsonProperty
    public String firstName;

    @JsonProperty
    public String lastName;

    @JsonProperty
    public String address;

    @JsonProperty
    public int age;
}

The Confluent Java library uses Jackson annotations to map the class to JSON.

Configurations

You can produce and consume with or without Schema Registry. The choice depends on which serializer and deserializer you configure.

Producer:

if (useSchemaRegistry) {
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSchemaSerializer.class);
    props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
} else {
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSerializer.class);
}

Consumer:

if (useSchemaRegistry) {
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonSchemaDeserializer.class);
    props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
} else {
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonDeserializer.class);
}
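
For reference, the two snippets above can be put together as a minimal sketch that builds complete client properties. It uses plain string keys and class names instead of the constants, so it compiles without the Kafka libraries on the classpath. The class name JsonClientProps, the Schema Registry URL, the consumer group id and the String key (de)serializers are assumptions made for illustration; they are not taken from the project.

```java
import java.util.Properties;

// Hypothetical helper: builds producer/consumer properties for both modes.
class JsonClientProps {
    // Assumed values, adjust them to your environment.
    static final String BOOTSTRAP_SERVERS = "kafka1:9092";
    static final String SCHEMA_REGISTRY_URL = "http://schema-registry:8081";

    static Properties producer(boolean useSchemaRegistry) {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        // Assumption: message keys are plain strings.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        if (useSchemaRegistry) {
            // The JSON Schema serializer needs the registry URL.
            props.put("value.serializer", "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer");
            props.put("schema.registry.url", SCHEMA_REGISTRY_URL);
        } else {
            // Plain JSON serializer, no registry involved.
            props.put("value.serializer", "io.confluent.kafka.serializers.KafkaJsonSerializer");
        }
        return props;
    }

    static Properties consumer(boolean useSchemaRegistry) {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("group.id", "client.users.consumer"); // hypothetical group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        if (useSchemaRegistry) {
            props.put("value.deserializer", "io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer");
            props.put("schema.registry.url", SCHEMA_REGISTRY_URL);
        } else {
            props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaJsonDeserializer");
        }
        return props;
    }

    public static void main(String[] args) {
        // Print both variants to compare the resulting settings.
        System.out.println(producer(true));
        System.out.println(consumer(false));
    }
}
```

The string keys correspond to the constants used above: "value.serializer" is ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "value.deserializer" is ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, and "schema.registry.url" is AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG.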

Setup

Create a topic for JSON messages produced without Schema Registry:

kafka-topics --create --bootstrap-server kafka1:9092 \
             --replication-factor 3 \
             --partitions 3 \
             --topic client.users

Create a topic for JSON messages produced with Schema Registry:

kafka-topics --create --bootstrap-server kafka1:9092 \
             --replication-factor 3 \
             --partitions 3 \
             --topic client.schema.users

Produce/Consume without Schema Registry

gradle kafka-json-clients:run --args="produce client.users 100"
gradle kafka-json-clients:run --args="consume client.users"

Produce/Consume with Schema Registry

gradle kafka-json-clients:run --args="produce -s client.schema.users 100"
gradle kafka-json-clients:run --args="consume -s client.schema.users"