Protobuf Producer and Consumer

Next, you will see how to serialize and deserialize Kafka messages using Protobuf.

Protobuf Schema

The messages are defined by the following schema, which describes an invoice together with its customer, products, and status.

    syntax = "proto3";

    import "google/protobuf/timestamp.proto";

    option java_multiple_files = true;
    option java_package = "kafka.sandbox.proto";

    enum InvoiceStatus {
      PAID = 0;
      PENDING = 1;
    }

    message Invoice {
      string id = 1;
      google.protobuf.Timestamp created_at = 3;
      Customer customer = 4;
      repeated Product products = 5;
      InvoiceStatus status = 6;
    }

    message Product {
      string id = 1;
      string name = 2;
      string code = 3;
      double price = 4;
    }

    message Customer {
      string id = 1;
      string firstName = 2;
      string lastName = 3;
      Address address = 4;
    }

    message Address {
      string city = 1;
      string zipCode = 2;
      string street = 3;
    }
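To make "serialize" more concrete, here is a minimal sketch of how a single string field from the schema above ends up as bytes on the wire. It uses only the JDK and hand-encodes the field the way the Protobuf wire format describes it (a tag, a length, then the UTF-8 bytes). The class `WireFormatSketch` and its methods are hypothetical and exist only for illustration; in the actual project the classes generated from the schema do this work.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the project: hand-encodes one string field
// following the Protobuf wire format.
class WireFormatSketch {

    // Wire type 2 marks a length-delimited value (strings, bytes, nested messages).
    private static final int WIRE_TYPE_LENGTH_DELIMITED = 2;

    // Writes an unsigned value as a base-128 varint: 7 bits per byte,
    // least significant group first, high bit set on every byte except the last.
    static void writeVarint(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Encodes a string field as: tag, byte length, UTF-8 bytes.
    static byte[] encodeStringField(int fieldNumber, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(out, (fieldNumber << 3) | WIRE_TYPE_LENGTH_DELIMITED);
        writeVarint(out, utf8.length);
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // Product.name is field number 2 in the schema.
        byte[] encoded = encodeStringField(2, "pen");
        StringBuilder hex = new StringBuilder();
        for (byte b : encoded) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim()); // prints "12 03 70 65 6e"
    }
}
```

The field name never appears in the output, only the field number. This is why both producer and consumer need the same schema, and why field numbers should stay stable once messages have been written.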

Configurations

You can produce and consume with or without Schema Registry. Which mode is used depends on the serializer and deserializer configured on the clients.

Producer:

    if (useSchemaRegistry) {
        props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
    } else {
        // ProtobufSerializer is a custom class
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtobufSerializer.class);
    }

Consumer:

    if (useSchemaRegistry) {
        props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
        // pass the confluent protobuf deserializer
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
        // you have to pass the protobuf class
        props.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, Invoice.class);
    } else {
        // ProtobufDeserializer is a custom class
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProtobufDeserializer.class);
        // here we pass a configuration to the custom deserializer
        props.put(ProtobufDeserializer.PROTOBUF_PARSER, Invoice.parser());
    }
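One practical consequence of the two configurations: the bytes written to the topic are not interchangeable. Confluent's Schema Registry serializers prefix each value with a magic byte (`0`) and a 4-byte big-endian schema ID; for Protobuf, a list of message indexes follows before the payload. A serializer that writes only the raw Protobuf bytes, as the custom one is assumed to do, adds no such prefix. The sketch below, using only the JDK, shows how that prefix could be inspected. `ConfluentHeaderSketch` and `schemaIdOf` are hypothetical names and not part of the project.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of the project: reads the framing that
// Confluent's Schema Registry serializers put in front of each value.
class ConfluentHeaderSketch {

    static final byte MAGIC_BYTE = 0x0;

    // Returns the schema ID stored in bytes 1..4 (big-endian), or -1 when the
    // value is too short or does not start with the magic byte.
    static int schemaIdOf(byte[] recordValue) {
        if (recordValue == null || recordValue.length < 5 || recordValue[0] != MAGIC_BYTE) {
            return -1;
        }
        // ByteBuffer reads big-endian by default, matching the header layout.
        return ByteBuffer.wrap(recordValue, 1, 4).getInt();
    }

    public static void main(String[] args) {
        // Made-up bytes: magic byte, schema ID 7, then the rest of the value.
        byte[] framed = {0, 0, 0, 0, 7, 0, 0x0A, 0x01, 0x31};
        // Made-up bytes: a raw Protobuf field with no framing in front.
        byte[] raw = {0x0A, 0x03, 'i', 'n', 'v'};

        System.out.println(schemaIdOf(framed)); // prints 7
        System.out.println(schemaIdOf(raw));    // prints -1
    }
}
```

This is why the setup below uses two separate topics: a consumer configured with `KafkaProtobufDeserializer` expects the prefix, while one that parses raw Protobuf bytes would misread it as message content.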

Setup

Create a topic to produce protobuf messages without Schema Registry:

    kafka-topics --create --bootstrap-server kafka1:9092 \
                 --replication-factor 3 \
                 --partitions 3 \
                 --topic client.invoices

Create a topic to produce protobuf messages with Schema Registry:

    kafka-topics --create --bootstrap-server kafka1:9092 \
                 --replication-factor 3 \
                 --partitions 3 \
                 --topic client.schema.invoices

Produce/Consume without Schema Registry

    gradle kafka-protobuf-clients:run --args="produce client.invoices 100"
    gradle kafka-protobuf-clients:run --args="consume client.invoices"

Produce/Consume with Schema Registry

    gradle kafka-protobuf-clients:run --args="produce -s client.schema.invoices 100"
    gradle kafka-protobuf-clients:run --args="consume -s client.schema.invoices"