Protobuf Producer and Consumer
Next, you will see how to serialize and deserialize Kafka messages using Protobuf, both with and without Schema Registry.
Protobuf Schema
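The following proto3 schema defines the Invoice message and the types it references. The Java classes generated from it (in the kafka.sandbox.proto package) are what the producer serializes and the consumer deserializes.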
syntax = "proto3";
import "google/protobuf/timestamp.proto";
option java_multiple_files = true;
option java_package = "kafka.sandbox.proto";
enum InvoiceStatus {
PAID = 0;
PENDING = 1;
}
message Invoice {
string id = 1;
google.protobuf.Timestamp created_at = 3;
Customer customer = 4;
repeated Product products = 5;
InvoiceStatus status = 6;
}
message Product {
string id = 1;
string name = 2;
string code = 3;
double price = 4;
}
message Customer {
string id = 1;
string firstName = 2;
string lastName = 3;
Address address = 4;
}
message Address {
string city = 1;
string zipCode = 2;
string street = 3;
}
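To make the role of the field numbers concrete, here is a minimal sketch of how a single string field such as `string id = 1` is laid out on the wire. It is hand-rolled for illustration only; the class and method names (`ProtoWireSketch`, `encodeStringField`) are hypothetical, and in practice the generated classes do this work for you. The point is that the field number, not the field name, is what gets written, which is why a gap in the numbering (as in Invoice, which has no field 2) is harmless.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the project: encodes one protobuf string field by hand.
final class ProtoWireSketch {
    // Wire type 2 means "length-delimited" (strings, bytes, nested messages).
    private static final int WIRE_TYPE_LEN = 2;

    // Writes a non-negative int as a base-128 varint, least significant group first.
    static void writeVarint(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80); // set the continuation bit
            value >>>= 7;
        }
        out.write(value);
    }

    // Layout: key = (fieldNumber << 3) | wireType, then the byte length, then UTF-8 bytes.
    static byte[] encodeStringField(int fieldNumber, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(out, (fieldNumber << 3) | WIRE_TYPE_LEN);
        writeVarint(out, utf8.length);
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] encoded = encodeStringField(1, "a1");
        StringBuilder hex = new StringBuilder();
        for (byte b : encoded) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim()); // prints "0a 02 61 31"
    }
}
```

Here `0a` is the key for field 1 with wire type 2, `02` is the length, and `61 31` is the text "a1".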
Configurations
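You can produce and consume with or without Schema Registry. The choice is made entirely in the client configuration: it determines which serializer or deserializer class is used for the record value.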
Producer:
if (useSchemaRegistry) {
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
} else {
// ProtobufSerializer is a custom class
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtobufSerializer.class);
}
Consumer:
if (useSchemaRegistry) {
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
// pass the confluent protobuf deserializer
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
// pass the generated protobuf class to deserialize into (otherwise a DynamicMessage is returned)
props.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, Invoice.class);
} else {
// ProtobufDeserializer is a custom class
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProtobufDeserializer.class);
// here we pass a configuration to the custom deserializer
props.put(ProtobufDeserializer.PROTOBUF_PARSER, Invoice.parser());
}
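The two branches also differ in what ends up in the record value. With Schema Registry, the Confluent serializer prefixes the protobuf payload with a small header: a magic byte `0`, the 4-byte big-endian schema ID, and a list of message indexes that identifies which message type in the .proto file is used. The following is a minimal sketch of that framing, assuming the first message type in the schema (Invoice here), for which the index list is a single `0` byte. The class and method names are hypothetical; the real work is done by KafkaProtobufSerializer and KafkaProtobufDeserializer.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of the Schema Registry framing; not the Confluent implementation.
final class ConfluentFramingSketch {
    static final byte MAGIC_BYTE = 0x0;

    // Prepends magic byte, schema ID, and message indexes to an already-serialized message.
    static byte[] frame(int schemaId, byte[] protobufPayload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + 1 + protobufPayload.length);
        buf.put(MAGIC_BYTE);
        buf.putInt(schemaId);   // ByteBuffer is big-endian by default
        buf.put((byte) 0);      // message indexes: single 0 = first message type in the schema
        buf.put(protobufPayload);
        return buf.array();
    }

    // Reads the schema ID back from a framed record value.
    static int schemaIdOf(byte[] recordValue) {
        ByteBuffer buf = ByteBuffer.wrap(recordValue);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("value is not Schema Registry framed");
        }
        return buf.getInt();
    }

    public static void main(String[] args) {
        byte[] payload = {0x0a, 0x02, 0x61, 0x31}; // some serialized protobuf bytes
        byte[] framed = frame(7, payload);         // 7 is an arbitrary example schema ID
        System.out.println(framed.length);         // prints 10
        System.out.println(schemaIdOf(framed));    // prints 7
    }
}
```

Without Schema Registry, the custom serializer presumably writes only the raw protobuf bytes, so the two formats are not interchangeable. That is one reason to keep a separate topic for each mode.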
Setup
Create a topic to produce protobuf messages without Schema Registry:
kafka-topics --create --bootstrap-server kafka1:9092 \
--replication-factor 3 \
--partitions 3 \
--topic client.invoices
Create a topic to produce protobuf messages with Schema Registry:
kafka-topics --create --bootstrap-server kafka1:9092 \
--replication-factor 3 \
--partitions 3 \
--topic client.schema.invoices
Produce/Consume without Schema Registry
gradle kafka-protobuf-clients:run --args="produce client.invoices 100"
gradle kafka-protobuf-clients:run --args="consume client.invoices"
Produce/Consume with Schema Registry
gradle kafka-protobuf-clients:run --args="produce -s client.schema.invoices 100"
gradle kafka-protobuf-clients:run --args="consume -s client.schema.invoices"