Avro Producer and Consumer

Next, you will see how to serialize and deserialize messages using Avro schemas.

Avro Schema

Avro serializes and deserializes messages according to a schema. Here is our example Avro schema:

{
  "type": "record",
  "name": "Supplier",
  "namespace": "kafka.sandbox.avro",
  "version": "1",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "address",
      "type": "string"
    },
    {
      "name": "country",
      "type": "string"
    }
  ]
}

Create Topic

kafka-topics --create --bootstrap-server kafka1:9092 \
             --replication-factor 3 \
             --partitions 3 \
             --topic client.suppliers

Produce

The producer now uses Supplier, the class autogenerated from the Avro schema, as the message value type.

KafkaProducer<String, Supplier> producer = new KafkaProducer<>(props);

for (int i = 0; i < messages; i++) {
    Supplier supplier = createNew();
    ProducerRecord<String, Supplier> record = new ProducerRecord<>(
        topic,
        supplier.getId().toString(),
        supplier
    );
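    producer.send(
        record,
        (metadata, exception) -> {
            // The callback runs once the send has either succeeded or failed
            if (exception != null) {
                log.error("Failed to produce message: {}", supplier, exception);
            } else {
                log.info("Produced message: {}", supplier);
            }
        }
    );
}

Run the producer:

gradle kafka-avro-clients:run --args="produce client.suppliers 100"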
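
The props object passed to KafkaProducer is not shown above. A minimal sketch of what it might contain, assuming the Confluent Avro serializer and a Schema Registry are in use. The registry URL http://schema-registry:8081 and the class name ProducerConfigSketch are assumptions for illustration, not taken from this project:

```java
import java.util.Properties;

// Hypothetical helper: builds producer settings with plain string keys,
// so it compiles without the Kafka client on the classpath.
class ProducerConfigSketch {

    static Properties producerProps(String bootstrapServers, String schemaRegistryUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Keys are plain strings (the supplier id)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Values are Avro records, serialized through the Schema Registry (assumed setup)
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", schemaRegistryUrl);
        return props;
    }

    public static void main(String[] args) {
        // The registry URL below is an assumed value
        Properties props = producerProps("kafka1:9092", "http://schema-registry:8081");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The returned Properties could then be passed to new KafkaProducer<>(props) as in the snippet above.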

Consume

The consumer works the same way: it uses the Avro-generated class Supplier as the message value type.

ConsumerRecords<String, Supplier> records = consumer.poll(Duration.ofMillis(500));

for (ConsumerRecord<String, Supplier> record : records) {
    log.info("Supplier ID: {}", record.key());
}

Run the consumer:

gradle kafka-avro-clients:run --args="consume client.suppliers"
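
The consumer configuration is not shown either. A sketch of settings that would let poll() return Supplier objects, again assuming the Confluent Avro deserializer and a Schema Registry. The group id, the registry URL, and the class name ConsumerConfigSketch are hypothetical:

```java
import java.util.Properties;

// Hypothetical helper: builds consumer settings with plain string keys,
// so it compiles without the Kafka client on the classpath.
class ConsumerConfigSketch {

    static Properties consumerProps(String bootstrapServers, String schemaRegistryUrl, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", schemaRegistryUrl);
        // Ask the deserializer for the generated class (Supplier) rather than a GenericRecord
        props.put("specific.avro.reader", "true");
        return props;
    }

    public static void main(String[] args) {
        // Group id and registry URL below are assumed values
        Properties props = consumerProps(
            "kafka1:9092", "http://schema-registry:8081", "client.suppliers.consumer");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Without specific.avro.reader set to true, the Confluent deserializer returns generic records, and treating them as Supplier would fail at runtime.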