Interactive Queries

Interactive Queries allow you to leverage the state of your application from outside your application. The Kafka Streams API enables your applications to be queryable.

This example is using gRPC to request queries to the kafka stream server.

Query the total supplier count by a given country:

kafka-streams count <country>

Example:

kafka-streams count Ecuador

Output:

Country: Ecuador, Total Suppliers: 4

Take into account that you have to run the stream first. Check the Kafka Streams section.

Check the stream topology:

package kafka.sandbox.cli;

import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import kafka.sandbox.avro.Supplier;
import kafka.sandbox.grpc.CounterService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.Server;
import picocli.CommandLine;
import picocli.CommandLine.Command;

@Slf4j
@Command(name = "streams", description = "Creates and start kafka streams")
public class Streams implements Callable<Integer> {

    public static final String TOPIC_FROM = "kafka-clients.suppliers";
    public static final String TOPIC_TO = "kafka-streams.supplier_counts_by_country";
    private final Properties props;

    public Streams(Properties props) {
        this.props = props;
    }

    @Override
    public Integer call() throws Exception {
        Serde<String> stringSerde = Serdes.String();
        Serde<Long> longSerde = Serdes.Long();

        StreamsBuilder builder = new StreamsBuilder();

        // read from suppliers topic
        KStream<String, Supplier> suppliers = builder.stream(TOPIC_FROM);

        // aggregate the new supplier counts by country
        KTable<String, Long> aggregated = suppliers
                // map the country as key
                .map((key, value) -> new KeyValue<>(value.getCountry(), value))
                .groupByKey()
                // aggregate and materialize the store
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("SupplierCountByCountry"));

        // print results
        aggregated
                .toStream()
                .foreach((key, value) -> log.info("Country = {}, Total supplier counts = {}", key, value));

        // write the results to a topic
        aggregated.toStream().to(TOPIC_TO, Produced.with(stringSerde, longSerde));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.cleanUp();

        // GRPC Server
        Server server = Grpc.newServerBuilderForPort(5050, InsecureServerCredentials.create())
                .addService(new CounterService(streams))
                .build();

        // attach shutdown handler to catch control-c and creating a latch
        CountDownLatch latch = new CountDownLatch(1);
        Runtime
                .getRuntime()
                .addShutdownHook(
                        new Thread("consumer-shutdown-hook") {
                            @Override
                            public void run() {
                                server.shutdownNow();
                                streams.close();
                                latch.countDown();
                            }
                        });

        streams.start();
        server.start();
        latch.await();

        return CommandLine.ExitCode.OK;
    }
}

Check the gRPC server:

package kafka.sandbox.grpc;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import io.grpc.stub.StreamObserver;
import kafka.sandbox.proto.CounterServiceGrpc.CounterServiceImplBase;
import kafka.sandbox.proto.CountReply;
import kafka.sandbox.proto.CountRequest;

public class CounterService extends CounterServiceImplBase {

    private KafkaStreams streams;

    public CounterService(KafkaStreams streams) {
        this.streams = streams;
    }

    @Override
    public void getCountByCountry(CountRequest request, StreamObserver<CountReply> responseObserver) {
        ReadOnlyKeyValueStore<String, Long> keyValueStore = streams.store(
                StoreQueryParameters.fromNameAndType("SupplierCountByCountry", QueryableStoreTypes.keyValueStore()));

        String country = request.getName();
        Long total = keyValueStore.get(country);
        String value = String.format("Country: %s, Total Suppliers: %s", country, total != null ? total : 0);
        CountReply reply = CountReply.newBuilder().setMessage(value).build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }
}

Check the gRPC client:

package kafka.sandbox.cli;

import java.util.concurrent.Callable;

import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import kafka.sandbox.proto.CountReply;
import kafka.sandbox.proto.CountRequest;
import kafka.sandbox.proto.CounterServiceGrpc;
import kafka.sandbox.proto.CounterServiceGrpc.CounterServiceBlockingStub;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Parameters;

@Slf4j
@Command(name = "count", description = "Print the total supplier count by a given country")
public class Count implements Callable<Integer> {

    @Parameters(index = "0", description = "Country (default: ${DEFAULT-VALUE})", defaultValue = "Ecuador")
    private String country;

    @Override
    public Integer call() throws Exception {
        ManagedChannel channel = Grpc.newChannelBuilder("localhost:5050", InsecureChannelCredentials.create())
                .build();

        CounterServiceBlockingStub blockingStub = CounterServiceGrpc.newBlockingStub(channel);
        CountReply countByCountry = blockingStub.getCountByCountry(CountRequest.newBuilder().setName(country).build());
        log.info(countByCountry.getMessage());

        return CommandLine.ExitCode.OK;
    }

}