Interactive Queries

Interactive Queries let you read the state of your application from outside the application. The Kafka Streams API makes your application's state stores queryable.

This example uses gRPC to send queries to the Kafka Streams server.

Query Results

With Kafka Streams running (see Kafka Streams), you can query a specific country from the terminal, for example:

gradle -q kafka-streams:run --args="count Ecuador"

This is possible because the Kafka Streams state stores are accessible at runtime, so the server can look up a key in the store and return the result as a response.

Check the gRPC server:

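// Get a read-only view of the state store by its name.
ReadOnlyKeyValueStore<String, Long> keyValueStore = streams.store(
        StoreQueryParameters.fromNameAndType("SupplierCountByCountry", QueryableStoreTypes.keyValueStore()));

// Look up the requested country; a missing key returns null, so report 0.
String country = request.getName();
Long total = keyValueStore.get(country);
String value = String.format("Country: %s, Total Suppliers: %s", country, total != null ? total : 0);
CountReply reply = CountReply.newBuilder().setMessage(value).build();
responseObserver.onNext(reply);
// Complete the unary call so a blocking client stops waiting.
responseObserver.onCompleted();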
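
The lookup-and-format step in the handler can be tried on its own, without Kafka or gRPC. The following is only a minimal sketch: a plain Map stands in for the ReadOnlyKeyValueStore, and the class name CountLookupSketch, the helper formatCount, and the sample count for Ecuador are all hypothetical, invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class CountLookupSketch {

    // Hypothetical helper that mirrors the handler's lookup-and-format step.
    // The Map is only a stand-in for ReadOnlyKeyValueStore<String, Long>.
    static String formatCount(Map<String, Long> store, String country) {
        Long total = store.get(country);               // null when the key is absent
        long safeTotal = (total != null) ? total : 0L; // report 0 for unknown countries
        return String.format("Country: %s, Total Suppliers: %s", country, safeTotal);
    }

    public static void main(String[] args) {
        Map<String, Long> store = new HashMap<>();
        store.put("Ecuador", 3L);                      // sample data, invented for this sketch

        System.out.println(formatCount(store, "Ecuador"));
        System.out.println(formatCount(store, "Peru"));
    }
}
```

As in the handler, a country that has no entry in the store is reported with a total of 0 rather than null.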

Check the gRPC client:

// Open a plaintext channel to the gRPC server (no TLS).
ManagedChannel channel = Grpc.newChannelBuilder("localhost:5050", InsecureChannelCredentials.create())
                .build();

// Call the service synchronously and print the reply.
CounterServiceBlockingStub blockingStub = CounterServiceGrpc.newBlockingStub(channel);
CountReply countByCountry = blockingStub.getCountByCountry(CountRequest.newBuilder().setName(country).build());
System.out.println(countByCountry.getMessage());
// Release the channel once the call is done.
channel.shutdown();