Producing and Consuming Natives

Now we are going to develop consumers and producer with java.

Create All the Topics

for topic in "client.string" "client.integer" "client.long" "client.float" "client.double" "client.boolean" ; do \
kafka-topics --create --bootstrap-server kafka1:9092 \
             --replication-factor 3 \
             --partitions 3 \
             --topic $topic ; done

Produce

This code example shows you how to produce a message. The configuration ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG define what type are we using.

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getSerializer());
KafkaProducer<String, V> producer = new KafkaProducer<>(props);

for (int i = 0; i < messages; i++) {
    V value = createValue();
    ProducerRecord<String, V> record = new ProducerRecord<>(
        topic,
        value
    );
    producer.send(
        record,
        (metadata, exception) -> log.info("Producing message: {}", value)
    );
}

Then, produce for each type:

for type in "string" "integer" "long" "float" "double" "boolean" ; do \
  gradle kafka-native-clients:run --args="produce client.$type $type 100" ; done

Consume

It's going to be the same for the consumer, but in this case you have to use the deserializer: ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG.

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getDeserializer());
KafkaConsumer<String, V> consumer = new KafkaConsumer<>(props);

ConsumerRecords<String, V> records = consumer.poll(Duration.ofMillis(500));

for (ConsumerRecord<String, V> record : records) {
    log.info("Supplier ID: {}", record.value());
}

Consume a topic, for example client.string:

gradle kafka-native-clients:run --args="consume client.string string"