Kafka Streams’ Queryable State (also called Interactive Queries) lets you read the state your stream processing builds, such as aggregation results, directly from the running application instances. The challenge is that each instance holds only part of that state, so you don’t always know which instance (host and port) has the data you need, especially in a distributed, dynamic environment.

Let’s see it in action. Imagine a simple Kafka Streams application that counts word occurrences in a topic named words.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class WordCountQueryableState {

    // Without Materialized.as(...), the DSL picks an internal store name that is
    // not meant for querying, so name the store explicitly.
    static final String STORE_NAME = "word-count-processor-state-store";

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-queryable-state");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Endpoint this instance advertises to its peers for interactive queries.
        // Kafka Streams only shares this host:port; serving requests on it is up to you.
        props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080");
        // RocksDB is already the default store for DSL aggregations. To tune it,
        // set StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG to a RocksDBConfigSetter.

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("words");

        KTable<String, Long> wordCounts = source
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word) // re-key by word (triggers a repartition)
            // This named store is the one we will query.
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()));

        // Also publish the running counts to an output topic.
        wordCounts.toStream().to("word-counts", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.cleanUp(); // Deletes local state: for development only
        streams.start();

        // Close gracefully on JVM shutdown.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        // Stores can only be queried once the instance reaches RUNNING.
        System.out.println("Waiting for streams to start...");
        while (streams.state() != KafkaStreams.State.RUNNING) {
            if (!streams.state().isRunningOrRebalancing()) {
                throw new IllegalStateException("Streams failed to start: " + streams.state());
            }
            TimeUnit.MILLISECONDS.sleep(100);
        }

        // Local query: returns null if "kafka" belongs to a partition another instance owns.
        // Routing to the right instance uses streams.queryMetadataForKey(...) plus your own RPC layer.
        ReadOnlyKeyValueStore<String, Long> localStore = streams.store(
            StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
        System.out.println("Local count for 'kafka': " + localStore.get("kafka"));
    }
}

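When you run multiple instances of this application, each instance keeps a local RocksDB store named word-count-processor-state-store, but it holds counts only for the partitions assigned to that instance. Because groupBy re-keys and repartitions the stream by word, each word’s count lives in exactly one partition, and therefore on exactly one active instance (plus any standby replicas). If instance A owns the partition for "kafka", only A can answer; querying B’s local store returns null. The challenge is knowing which instance to query.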

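This is where the metadata behind Kafka Streams’ interactive queries comes in. Each instance sets application.server (StreamsConfig.APPLICATION_SERVER_CONFIG) to the host:port at which it can be reached. Kafka Streams carries that endpoint through Kafka’s consumer group protocol: during every rebalance, each member sends its endpoint to the group leader, and the leader sends every member the full mapping of endpoints to assigned partitions. Combined with the topology, this tells each instance which host holds which part of which store. Every instance therefore keeps a local copy of the cluster-wide picture, which you can read with streams.metadataForAllStreamsClients(), streams.streamsMetadataForStore(storeName), or, for a single key, streams.queryMetadataForKey(storeName, key, keySerializer). (Before Kafka 3.0, the first two were named allMetadata() and allMetadataForStore().)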

streams.streamsMetadataForStore(storeName) returns a collection of StreamsMetadata objects, one per instance hosting part of the store, each exposing hostInfo() (host and port), stateStoreNames(), and topicPartitions(). That is what you iterate over when a query needs every instance. For a single key, picking an instance round-robin would usually hit one that does not have the key. Instead, call queryMetadataForKey(...): the returned KeyQueryMetadata names the activeHost() that owns the key’s partition, plus any standbyHosts(). You then send an HTTP request, or use any other RPC mechanism, to that host.
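For a query that needs every instance’s data, such as the top N words overall, the usual pattern is scatter-gather: contact each host returned by streams.streamsMetadataForStore(storeName), collect its local results, and merge them. Below is a stdlib-only sketch of the merge step. It assumes a hypothetical fetchAll function that performs the remote call (for example, against an endpoint backed by the remote instance’s store.all()); the class name FanOut and the host strings are illustrative.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Scatter-gather over all instances hosting a store (illustrative sketch).
final class FanOut {

    // `fetchAll` is a hypothetical RPC returning one instance's local entries.
    static List<Map.Entry<String, Long>> topN(List<String> hosts,
                                              Function<String, Map<String, Long>> fetchAll,
                                              int n) {
        Map<String, Long> merged = new HashMap<>();
        for (String host : hosts) {
            // Each word lives in one active partition, so hosts return disjoint keys.
            merged.putAll(fetchAll.apply(host));
        }
        // Sort by count, highest first, and keep the first n entries.
        return merged.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .limit(n)
                .collect(Collectors.toList());
    }
}
```

Merging with putAll relies on each word being counted in exactly one active partition. If you read from standbys as well (StoreQueryParameters.enableStaleStores()), you would need to deduplicate instead.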

Kafka Streams itself does not ship an HTTP server or a built-in REST endpoint for interactive queries. application.server only advertises an address to the other instances; running a server on that host and port, and turning incoming requests into local store reads, is up to you. If application.server is not set, other instances have no address to route queries to.
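Since you supply the endpoint yourself, here is a minimal sketch using the JDK’s built-in com.sun.net.httpserver.HttpServer. It assumes the /state/{storeName}/{key} path that this article uses as its own convention, and it takes a plain lookup function that stands in for the ReadOnlyKeyValueStore you would obtain from streams.store(...). The class name StateEndpoint is hypothetical.

```java
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;

import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical endpoint for GET /state/{storeName}/{key}.
// `lookup` stands in for a local store read such as localStore::get.
final class StateEndpoint {

    static HttpServer start(int port, String storeName, Function<String, Long> lookup) {
        try {
            // Listen on all interfaces; application.server advertises the routable address.
            HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
            String prefix = "/state/" + storeName + "/";
            server.createContext(prefix, exchange -> {
                if (!"GET".equals(exchange.getRequestMethod())) {
                    respond(exchange, 405, "method not allowed");
                    return;
                }
                // Everything after the prefix is the percent-encoded key.
                String rawKey = exchange.getRequestURI().getRawPath().substring(prefix.length());
                Long value = lookup.apply(URLDecoder.decode(rawKey, StandardCharsets.UTF_8));
                if (value == null) {
                    respond(exchange, 404, "not found");
                } else {
                    respond(exchange, 200, value.toString());
                }
            });
            server.start();
            return server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void respond(HttpExchange exchange, int status, String body) throws IOException {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        exchange.sendResponseHeaders(status, bytes.length);
        try (OutputStream out = exchange.getResponseBody()) {
            out.write(bytes);
        }
    }
}
```

Inside the Streams application you might call StateEndpoint.start(8080, "word-count-processor-state-store", localStore::get), with the same port you put in application.server. A production handler would also catch InvalidStateStoreException, which Kafka Streams throws while a store is migrating during a rebalance, and answer with a retryable status.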

Here’s how you might query across instances:

Suppose you have two instances running:

  • Instance 1: localhost:8080 (application.server=localhost:8080, its own HTTP server listening on port 8080, and its own state.dir)
  • Instance 2: localhost:8081 (application.server=localhost:8081, its own HTTP server listening on port 8081, and its own state.dir)

You would write a client that does something like this:

// Routing helper: it runs inside each Kafka Streams instance, because the
// key-to-host metadata comes from a running KafkaStreams object.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class QueryClient {

    private final KafkaStreams streams;
    private final String storeName;
    private final HostInfo thisHost; // must equal this instance's application.server
    private final HttpClient httpClient = HttpClient.newHttpClient();

    public QueryClient(KafkaStreams streams, String storeName, HostInfo thisHost) {
        this.streams = streams;
        this.storeName = storeName;
        this.thisHost = thisHost;
    }

    public Long queryCountFor(String word) throws IOException, InterruptedException {
        // Ask the local metadata cache which instance owns this key's partition.
        KeyQueryMetadata metadata =
            streams.queryMetadataForKey(storeName, word, Serdes.String().serializer());

        if (metadata == null || KeyQueryMetadata.NOT_AVAILABLE.equals(metadata)) {
            // No metadata yet (e.g. mid-rebalance): let the caller retry later.
            return null;
        }

        HostInfo owner = metadata.activeHost();
        if (owner.equals(thisHost)) {
            // The key is ours: read the local store directly.
            ReadOnlyKeyValueStore<String, Long> store = streams.store(
                StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
            return store.get(word);
        }

        // Otherwise forward to the owner's endpoint. /state/{store}/{key} is this
        // article's own convention, served by your server, not a Kafka Streams API.
        String url = String.format("http://%s:%d/state/%s/%s",
                                   owner.host(),
                                   owner.port(),
                                   storeName,
                                   URLEncoder.encode(word, StandardCharsets.UTF_8).replace("+", "%20"));

        HttpRequest request = HttpRequest.newBuilder()
                                  .uri(URI.create(url))
                                  .GET()
                                  .build();

        HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());

        if (response.statusCode() == 200) {
            return Long.parseLong(response.body().trim());
        } else if (response.statusCode() == 404) {
            return null; // the owner has no count for this word
        }
        throw new IOException("Error querying " + owner + ": HTTP " + response.statusCode());
    }
}

The key call in a client like this is streams.queryMetadataForKey(storeName, key, keySerializer). It serializes the key, computes its partition the same way the default partitioner does (or with a StreamPartitioner you pass in, if your topology partitions differently), and looks that partition up in the metadata this instance received during the last rebalance. Nothing is fetched from Kafka at query time: the mapping is a local cache, built from the application.server endpoints that every instance contributed when it joined the consumer group, and refreshed on each rebalance. While a rebalance is in progress the mapping can be briefly out of date, so be ready to retry if the target instance no longer owns the key.
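To make the lookup concrete, here is a simplified, stdlib-only model of what happens for a String key under default partitioning: serialize the key as UTF-8 (as Serdes.String() does), hash it with Kafka’s murmur2, make the hash non-negative, take it modulo the partition count, and look the partition up in a partition-to-host map. The class KeyRouting, the map contents, and the host names are hypothetical, and the hash is reimplemented here only for illustration; in a real application, call queryMetadataForKey instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Simplified model of key -> partition -> host routing for String keys.
// Reimplements Kafka's murmur2 for illustration; use queryMetadataForKey in real code.
final class KeyRouting {

    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                  + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1 to 3 bytes (intentional switch fall-through).
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(bytes) & 0x7fffffff) % numPartitions; // force positive, then modulo
    }

    // partitionOwners models the cached partition -> active host mapping and is
    // assumed to contain every partition 0..n-1 of the store's source topic.
    static String ownerOf(String key, Map<Integer, String> partitionOwners) {
        return partitionOwners.get(partitionFor(key, partitionOwners.size()));
    }
}
```

The point of the model is that routing is a pure, local computation: every instance with the same metadata computes the same owner for a key, which is why no central lookup service is needed.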

The most surprising thing about Kafka Streams Queryable State is that it doesn’t rely on an external service discovery mechanism like ZooKeeper or Consul. Instead, it cleverly repurposes Kafka’s own consumer group coordination to manage the metadata about which instances are running and which state stores they expose, making it feel almost magical how instances find each other.

The application.server value deserves care. It is the address other instances connect to, so it must be reachable from them: a routable hostname or internal IP. localhost only works when every instance runs on the same machine, and 0.0.0.0 is a bind address, not something peers can connect to. Your own server can still bind to 0.0.0.0 to listen on all interfaces while application.server advertises the routable name. Kafka Streams does not pick a port for you: the value must be a host:port pair, and the port must match the one your server actually listens on.
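A minimal sketch of per-instance configuration follows. To stay dependency-free it uses plain java.util.Properties with the literal keys "application.id", "bootstrap.servers", and "application.server" (the values of the corresponding StreamsConfig constants). The class name, host names, broker address, and the single validation rule are hypothetical simplifications.

```java
import java.util.Properties;

// Per-instance Kafka Streams settings; host names here are hypothetical.
final class InstanceConfig {

    static Properties forInstance(String advertisedHost, int port) {
        // 0.0.0.0 is a bind address; peers cannot use it to reach this instance.
        if ("0.0.0.0".equals(advertisedHost)) {
            throw new IllegalArgumentException("application.server needs a routable host, not 0.0.0.0");
        }
        Properties props = new Properties();
        props.put("application.id", "word-count-queryable-state"); // identical on every instance
        props.put("bootstrap.servers", "kafka-1:9092");              // hypothetical broker address
        // Unique per instance: the host:port peers use for interactive queries.
        props.put("application.server", advertisedHost + ":" + port);
        return props;
    }
}
```

Every instance shares the same application.id, which is what puts them in one consumer group, while each advertises its own application.server.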

The StreamsMetadata objects returned by streamsMetadataForStore() and the KeyQueryMetadata returned by queryMetadataForKey() encapsulate where a store, or a single key, lives. Both give you a HostInfo (host and port), via hostInfo() and activeHost() respectively, which is what you need to build the request URL. The REST path itself is whatever your server defines; this article uses /state/{storeName}/{key} as its convention for key lookups.

Because Kafka Streams includes no web server, the transport is entirely your choice: Jetty, Spring Boot, gRPC, or the JDK’s built-in HttpServer all work. Kafka Streams supplies the metadata and the read-only local store handles; everything on the wire, including serialization, security, and error handling, is up to you.

The next step in mastering Queryable State is understanding how to handle state store partitions and ensuring your queries are routed to the instance that actually owns the data for a given key, rather than just any instance that happens to be hosting the store.
