Protobuf Producer and Consumer

Next, you will see how to serialize and deserialize Kafka messages using Protobuf, both with and without Schema Registry.

Protobuf Schema

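The messages in this section use the following schema. It describes an invoice together with its customer, the customer's address, a list of products, and a status.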

syntax = "proto3";
import "google/protobuf/timestamp.proto";

option java_multiple_files = true;
option java_package = "kafka.sandbox.proto";

enum InvoiceStatus {
  PAID = 0;
  PENDING = 1;
}

message Invoice {
  string id = 1;
  google.protobuf.Timestamp created_at = 3;
  Customer customer = 4;
  repeated Product products = 5;
  InvoiceStatus status = 6;
}

message Product {
  string id = 1;
  string name = 2;
  string code = 3;
  double price = 4;
}

message Customer {
  string id = 1;
  string firstName = 2;
  string lastName = 3;
  Address address = 4;
}

message Address {
  string city = 1;
  string zipCode = 2;
  string street = 3;
}
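
To make the role of the field numbers in the schema above more concrete, here is a minimal sketch that hand-encodes the Address message using only the Java standard library. It is not how the project serializes messages (the generated classes do that); the class name AddressWireSketch and its methods are made up for illustration. It assumes the standard Protobuf wire format, where each string field is written as a tag ((field number << 3) | wire type 2), a length, and the UTF-8 bytes, and where proto3 skips empty strings.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Illustrative only: a hand-rolled encoder for the Address message.
// Real code should use the classes generated by protoc.
final class AddressWireSketch {
    // Wire type 2 = length-delimited (strings, bytes, nested messages).
    private static final int WIRE_TYPE_LEN = 2;

    // Writes a non-negative int as a base-128 varint, low 7 bits first.
    static void writeVarint(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80); // set the continuation bit
            value >>>= 7;
        }
        out.write(value);
    }

    // Writes one string field: tag, byte length, then the UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, int fieldNumber, String value) {
        if (value.isEmpty()) {
            return; // proto3 does not write fields that hold their default value
        }
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        writeVarint(out, (fieldNumber << 3) | WIRE_TYPE_LEN);
        writeVarint(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Field numbers match the Address message: city = 1, zipCode = 2, street = 3.
    static byte[] encodeAddress(String city, String zipCode, String street) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, 1, city);
        writeString(out, 2, zipCode);
        writeString(out, 3, street);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] bytes = encodeAddress("Quito", "", "Av. 1");
        StringBuilder hex = new StringBuilder();
        for (byte b : bytes) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim());
    }
}
```

Only the field numbers reach the wire, not the field names. This is why renaming a field is a compatible change, while reusing or changing a field number is not.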

Configurations

Messages can be produced and consumed with or without Schema Registry. Which path is used depends on the serializer and deserializer configured on the clients.

Producer:

if (useSchemaRegistry) {
    props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
} else {
    // ProtobufSerializer is a custom class
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtobufSerializer.class);
}

Consumer:

if (useSchemaRegistry) {
    props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
    // pass the confluent protobuf deserializer
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
    // set the generated class to return; without it the deserializer yields a DynamicMessage
    props.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, Invoice.class);
} else {
    // ProtobufDeserializer is a custom class
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProtobufDeserializer.class);
    // here we pass a configuration to the custom deserializer
    props.put(ProtobufDeserializer.PROTOBUF_PARSER, Invoice.parser());
}
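
The two configurations above put different bytes on the topic, so a consumer set up for one mode will generally not read records written in the other. As a rough sketch, assuming Confluent's documented wire format, the Schema Registry serializers prepend a magic byte (0) and a 4-byte big-endian schema ID to the payload. For Protobuf, message-index bytes follow the ID, but the sketch does not parse them. A serializer that writes plain Protobuf bytes adds no such header. The class ConfluentHeader and its methods are hypothetical helpers, not part of any library.

```java
import java.nio.ByteBuffer;

// Illustrative only: builds and inspects the 5-byte header that
// Schema Registry aware serializers are documented to prepend.
final class ConfluentHeader {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    // Prepends the header to a payload. Real serializers also register or
    // look up the schema; this only mimics the byte layout.
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(HEADER_SIZE + payload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer is big-endian by default
                .put(payload)
                .array();
    }

    // Heuristic check: long enough for a header and starts with the magic byte.
    static boolean looksFramed(byte[] record) {
        return record != null && record.length >= HEADER_SIZE && record[0] == MAGIC_BYTE;
    }

    // Reads the schema ID stored in bytes 1..4 of a framed record.
    static int schemaId(byte[] record) {
        if (!looksFramed(record)) {
            throw new IllegalArgumentException("record does not start with a Schema Registry header");
        }
        return ByteBuffer.wrap(record, 1, 4).getInt();
    }

    public static void main(String[] args) {
        byte[] plain = {0x0A, 0x01, 0x41};  // a raw Protobuf field, no header
        byte[] framed = frame(42, plain);   // 42 is an arbitrary example ID
        System.out.println(looksFramed(plain));
        System.out.println(looksFramed(framed) + " " + schemaId(framed));
    }
}
```

A check like looksFramed can help when debugging a topic that contains records from both kinds of producers, although it is only a heuristic and not a substitute for keeping the two kinds of records on separate topics.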

Setup

Create a topic to produce protobuf messages without Schema Registry:

kafka-topics --create --bootstrap-server kafka1:9092 \
             --replication-factor 3 \
             --partitions 3 \
             --topic client.invoices

Create a topic to produce protobuf messages with Schema Registry:

kafka-topics --create --bootstrap-server kafka1:9092 \
             --replication-factor 3 \
             --partitions 3 \
             --topic client.schema.invoices

Produce/Consume without Schema Registry

gradle kafka-protobuf-clients:run --args="produce client.invoices 100"
gradle kafka-protobuf-clients:run --args="consume client.invoices"

Produce/Consume with Schema Registry

gradle kafka-protobuf-clients:run --args="produce -s client.schema.invoices 100"
gradle kafka-protobuf-clients:run --args="consume -s client.schema.invoices"