Kafka Streams
Kafka Streams is a client library for building applications and microservices that process data stored in Kafka in real time. It offers a streamlined way to define streaming data pipelines without running a separate processing cluster.
- Kafka Streams examples
- More Kafka Streams examples
- Kafka Streams architecture
In this example we are going to create a data processing pipeline (topology) that counts suppliers by country (see the Kafka Clients - Avro Producer and Consumer section). Kafka Streams will consume from the client.suppliers topic, process the data, and publish the results to the streams.results topic.
// builder used to assemble the topology
StreamsBuilder builder = new StreamsBuilder();
// read from the suppliers topic (its name is passed in as "source")
KStream<String, Supplier> suppliers = builder.stream(source);
// aggregate the new supplier counts by country
KTable<String, Long> aggregated = suppliers
// map the country as key
.map((key, value) -> new KeyValue<>(value.getCountry().toString(), value))
.groupByKey()
// aggregate and materialize the store
.count(Materialized.as("SupplierCountByCountry"));
// write the results to a topic
aggregated.toStream()
// print results
.peek((key, value) -> log.info("Country = {}, Total supplier counts = {}", key, value))
// publish results
.to(sink, Produced.with(Serdes.String(), Serdes.Long()));
// build the topology
Topology topology = builder.build();
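The topology above still needs Streams configuration and a KafkaStreams instance before it can run. A minimal sketch follows; the application id, bootstrap server, and default serde settings are assumptions (not taken from the source), and the Avro value serde in particular would come from the Avro clients setup referenced earlier.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

// minimal Streams configuration (values below are assumptions for illustration)
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams");            // also prefixes internal topics/state stores
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// the value serde for the Avro Supplier records would be a SpecificAvroSerde,
// configured with the schema registry URL (assumption, see the Avro clients section)

// wire the topology to the config and start processing
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();

// close cleanly (flush state stores, commit offsets) on JVM shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
```

Starting the application creates the SupplierCountByCountry state store and the internal repartition topic needed by the map/groupByKey step.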
Run the topology:
gradle kafka-streams:run --args="streams client.suppliers streams.results"
Print results (in another terminal):
kafka-console-consumer --from-beginning --group kafka-streams.consumer \
--topic streams.results \
--bootstrap-server kafka1:9092 \
--property print.key=true \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
Send new suppliers (in another terminal):
gradle kafka-avro-clients:run --args="produce client.suppliers 100"