Avro Producer and Consumer
Next, you will see how to serialize and deserialize messages using Avro schemas.
Avro Schema
Avro lets us serialize and deserialize messages against a schema. Here is our example Avro schema:
{
  "type": "record",
  "name": "Supplier",
  "namespace": "kafka.sandbox.avro",
  "version": "1",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "address",
      "type": "string"
    },
    {
      "name": "country",
      "type": "string"
    }
  ]
}
Create Topic
kafka-topics --create --bootstrap-server kafka1:9092 \
--replication-factor 3 \
--partitions 3 \
--topic client.suppliers
Produce
As you can see, the producer now uses the autogenerated class Supplier as the record value.
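KafkaProducer<String, Supplier> producer = new KafkaProducer<>(props);
for (int i = 0; i < messages; i++) {
    Supplier supplier = createNew();
    // Key each record by the supplier ID; the value is the Avro-generated object.
    ProducerRecord<String, Supplier> record = new ProducerRecord<>(
        topic,
        supplier.getId().toString(),
        supplier
    );
    // send() is asynchronous; the callback runs when the send completes or fails.
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            log.error("Error producing message: {}", supplier, exception);
            return;
        }
        log.info("Producing message: {}", supplier);
    });
}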
gradle kafka-avro-clients:run --args="produce client.suppliers 100"
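The `props` object passed to `KafkaProducer` in the snippet above is not shown in this section. As a minimal sketch, it could be built as follows, assuming the project uses Confluent's `KafkaAvroSerializer` together with a Schema Registry. The class name `AvroProducerConfig`, the helper `producerProps`, and the registry URL `http://schema-registry:8081` are hypothetical; only `kafka1:9092` comes from this page.

```java
import java.util.Properties;

// Hypothetical helper: not part of the project shown on this page.
final class AvroProducerConfig {

    // Builds producer settings for String keys and Avro-encoded values.
    static Properties producerProps(String bootstrapServers, String schemaRegistryUrl) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Keys are plain strings (the supplier ID).
        props.setProperty("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        // Assumption: values are serialized with Confluent's Avro serializer.
        props.setProperty("value.serializer",
            "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // Assumption: the serializer looks up schemas at this registry URL.
        props.setProperty("schema.registry.url", schemaRegistryUrl);
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("kafka1:9092", "http://schema-registry:8081");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Passing class names as strings keeps the sketch free of Kafka dependencies. In a real client you would more likely use the constants in `ProducerConfig`.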
Consume
The consumer works the same way: record values are deserialized into the Avro class Supplier.
// Wait up to 500 ms for new records, then log the key of each one.
ConsumerRecords<String, Supplier> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, Supplier> record : records) {
    log.info("Supplier ID: {}", record.key());
}
gradle kafka-avro-clients:run --args="consume client.suppliers"
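The consumer's configuration is likewise not shown above. A minimal sketch follows, assuming Confluent's `KafkaAvroDeserializer` and a Schema Registry. The class name `AvroConsumerConfig`, the helper `consumerProps`, the group ID `client.consumer`, and the registry URL are hypothetical. Setting `specific.avro.reader` to `true` asks the deserializer to return the generated class (here `Supplier`) instead of a `GenericRecord`.

```java
import java.util.Properties;

// Hypothetical helper: not part of the project shown on this page.
final class AvroConsumerConfig {

    // Builds consumer settings for String keys and Avro-decoded values.
    static Properties consumerProps(String bootstrapServers,
                                    String schemaRegistryUrl,
                                    String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Start from the oldest record when the group has no committed offset.
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumption: values were written with Confluent's Avro serializer.
        props.setProperty("value.deserializer",
            "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.setProperty("schema.registry.url", schemaRegistryUrl);
        // Return the generated specific class rather than GenericRecord.
        props.setProperty("specific.avro.reader", "true");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps(
            "kafka1:9092", "http://schema-registry:8081", "client.consumer");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Without `specific.avro.reader`, the deserializer would hand back generic records, and a consumer typed as `ConsumerRecord<String, Supplier>` would fail with a `ClassCastException` when the value is read.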