JSON Producer and Consumer
This example shows you how to use json and schema registry for producing and consuming.
Other Links
POJO
The first step is to create the structure:
package kafka.sandbox.cli;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Builder;
import lombok.Data;
@Builder
@Data
public class User {
@JsonProperty
public String id;
@JsonProperty
public String firstName;
@JsonProperty
public String lastName;
@JsonProperty
public String address;
@JsonProperty
public int age;
}
Confluent java library uses jackson annotations.
Configurations
It is possible to produce with or without Schema Registry. It'll depend on the configurations.
Producer:
if (useSchemaRegistry) {
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSchemaSerializer.class);
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
} else {
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSerializer.class);
}
Consumer:
if (useSchemaRegistry) {
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonSchemaDeserializer.class);
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
} else {
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaJsonDeserializer.class);
}
Setup
Create a topic to produce json messages without Schema Registry:
kafka-topics --create --bootstrap-server kafka1:9092 \
--replication-factor 3 \
--partitions 3 \
--topic client.users
Create a topic to produce json messages with Schema Registry:
kafka-topics --create --bootstrap-server kafka1:9092 \
--replication-factor 3 \
--partitions 3 \
--topic client.schema.users
Produce/Consume without Schema Registry
gradle kafka-json-clients:run --args="produce client.users 100"
gradle kafka-json-clients:run --args="consume client.users"
Produce/Consume with Schema Registry
gradle kafka-json-clients:run --args="produce -s client.schema.users 100"
gradle kafka-json-clients:run --args="consume -s client.schema.users"