The most surprising thing about Kafka Streams session windows is that they don’t actually group events by "session" as you might intuitively think; they group events by gaps.

Let’s say you’re tracking user activity on a website. Each event carries a user_id, a timestamp, and an action. You want to group each user’s actions into "sessions," where a session is a period of activity that ends once the user has been inactive for a while.

Here’s a simplified Kafka Streams application that does this:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;

import java.time.Duration;
import java.util.Properties;

public class SessionWindowExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "session-window-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CONFIG, Serdes.String().getClass());
        // Record cache in front of the state stores, shared by this instance's stream threads.
        // 10 MB is also the default; it is set explicitly here to make the knob visible.
        props.put(StreamsConfig.STATE_CACHE_MAX_BYTES_CONFIG, 1024 * 1024 * 10L); // 10MB

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream("user-activity-topic");

        source
            // Assuming the value is "userId:action"; keep only the action
            // (falls back to the whole value if it contains no ':')
            .mapValues(value -> value.substring(value.indexOf(':') + 1))
            .groupByKey() // Group by user ID (which is the key in this example)
            // 30-minute inactivity gap; ofInactivityGapWithNoGrace replaces the deprecated SessionWindows.with(...)
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
            .count(Materialized.as("session-counts")); // Count events within each session

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.cleanUp(); // Development only: deletes this instance's local state directory
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

In this example, the 30-minute inactivity gap passed to SessionWindows tells Kafka Streams to start a new session for a key whenever more than 30 minutes pass between that key’s consecutive events. Events whose timestamps are at most 30 minutes apart are chained into the same session. These are event timestamps, not arrival times: a session is built from when events happened, even if they reach the topic out of order.

Let’s trace some events:

  1. User A, 10:00:00, "login"
  2. User A, 10:05:00, "view_page"
    • Five minutes after the first event, well within the 30-minute gap. Both events belong to the same session.
  3. User A, 10:10:00, "add_to_cart"
    • Five minutes after the previous event. Same session.
  4. User A, 10:45:00, "logout"
    • This event’s timestamp is 35 minutes after the previous one (10:10:00), which exceeds the 30-minute inactivity gap. A new session therefore begins with this event.
  5. User A, 10:46:00, "view_ads"
    • One minute after the previous event, so it joins the new session that started at 10:45:00.
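The trace above can be reproduced with a small, self-contained Java model. This is a simplified sketch, not Kafka Streams’ implementation: it handles a single key, assumes the events are already in timestamp order, and the names SessionTrace and sessionize are hypothetical. It also shows that each session spans from its first to its last event timestamp.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of session windowing for one key (not Kafka's code). */
final class SessionTrace {

    /** A session spans from its first to its last event timestamp, inclusive. */
    record Session(LocalTime start, LocalTime end, long count) {
        Duration duration() {
            return Duration.between(start, end);
        }
    }

    /** Groups time-ordered timestamps; a gap larger than inactivityGap starts a new session. */
    static List<Session> sessionize(List<LocalTime> orderedTimestamps, Duration inactivityGap) {
        List<Session> sessions = new ArrayList<>();
        LocalTime start = null;
        LocalTime end = null;
        long count = 0;
        for (LocalTime ts : orderedTimestamps) {
            if (start != null && Duration.between(end, ts).compareTo(inactivityGap) > 0) {
                // Gap exceeded: close the current session before starting a new one.
                sessions.add(new Session(start, end, count));
                start = null;
            }
            if (start == null) {
                start = ts;
                count = 0;
            }
            end = ts; // the session's end moves to the latest event
            count++;
        }
        if (start != null) {
            sessions.add(new Session(start, end, count));
        }
        return sessions;
    }

    public static void main(String[] args) {
        List<LocalTime> userA = List.of(
            LocalTime.parse("10:00"), LocalTime.parse("10:05"), LocalTime.parse("10:10"),
            LocalTime.parse("10:45"), LocalTime.parse("10:46"));
        for (Session s : sessionize(userA, Duration.ofMinutes(30))) {
            System.out.println(s + " duration=" + s.duration());
        }
    }
}
```

For the five events above, this yields two sessions: 10:00 to 10:10 with three events, and 10:45 to 10:46 with two.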

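The key insight is that the gap defines the maximum allowed distance between consecutive events in the same session. When that distance is exceeded, the current session ends and the next event starts a new one. A session’s window runs from the timestamp of its first event to the timestamp of its last event (10:00:00 to 10:10:00 for the first session above), so its length is determined by the events, not by the inactivity gap; the gap only decides where one session stops and the next begins. Sessions are also not fixed once created: an out-of-order event that falls within the gap of two existing sessions merges them into one, provided it is not dropped as too late.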

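The Materialized.as("session-counts") call names the state store that backs the aggregation. The store is keyed by user and session window, so it holds every retained session for each user (start time, end time, and event count), not just the latest one. When a new event arrives, Kafka Streams looks up that user’s sessions that lie within the inactivity gap of the event’s timestamp. If there are none, it creates a new session that starts and ends at that timestamp. If there is one, it extends that session’s start or end time and updates the count. If the event bridges two or more sessions, they are merged into a single session and the superseded ones are removed from the store.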
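The lookup-and-merge behavior described above can be modeled in plain Java. SessionMergeModel is a hypothetical, simplified stand-in for one user’s session store, not a Kafka class: it ignores grace periods, retention, and serialization, and it stores timestamps as plain numbers (minutes after 10:00 in the usage below). An event matches every session that ends no earlier than ts - gap and starts no later than ts + gap.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

/** Hypothetical in-memory model of one key's session store (not Kafka's implementation). */
final class SessionMergeModel {

    /** Bounds are event timestamps in the same unit as the gap; count is the aggregate. */
    record Session(long start, long end, long count) {}

    private final long gap;
    private final List<Session> sessions = new ArrayList<>();

    SessionMergeModel(long gap) {
        this.gap = gap;
    }

    /** Adds one event, merging every existing session that lies within the gap of ts. */
    void add(long ts) {
        long start = ts;
        long end = ts;
        long count = 1;
        Iterator<Session> it = sessions.iterator();
        while (it.hasNext()) {
            Session s = it.next();
            // The event touches s if ts lies within [s.start - gap, s.end + gap].
            if (s.end() >= ts - gap && s.start() <= ts + gap) {
                start = Math.min(start, s.start());
                end = Math.max(end, s.end());
                count += s.count();
                it.remove(); // the old session is superseded by the merged one
            }
        }
        sessions.add(new Session(start, end, count));
        sessions.sort(Comparator.comparingLong(Session::start));
    }

    List<Session> sessions() {
        return List.copyOf(sessions);
    }
}
```

Adding the trace’s events at minutes 0, 5, 10, 45, and 46 yields two sessions. A late event at minute 28 (10:28:00) is within 30 minutes of both, so the model merges them into one session from 0 to 46 with a count of 6.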

The cleanUp() call before start() is a development convenience. It deletes this instance’s local state directory, so local state files from earlier runs are not reused. It does not reset the application, though: the store is rebuilt from its changelog topic, and the committed input offsets are kept. To reprocess from scratch, you would also run Kafka’s application reset tool (kafka-streams-application-reset). In production, avoid calling cleanUp() unconditionally, because it forces a full restore from the changelog on every restart.

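STATE_CACHE_MAX_BYTES_CONFIG (statestore.cache.max.bytes, which replaced the older cache.max.bytes.buffering) sizes the record cache that sits in front of the state stores and is shared across the instance’s stream threads. 10 MB is also the default, so this line only makes the value explicit. The cache does not hold the session state itself; with the default persistent stores, that lives in RocksDB on local disk, backed by a changelog topic. Instead, the cache buffers updates and collapses repeated updates to the same session into one, which reduces writes to the store and changelog and the number of intermediate results forwarded downstream. Session windows can produce many small updates, especially when many users are active at once, so a larger cache (together with commit.interval.ms, which controls how often it is flushed) means fewer intermediate results downstream. Setting the cache size to 0 disables caching, and every update is forwarded.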
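To see why cache size affects downstream traffic rather than memory safety, here is a deliberately simplified model of a write-back cache. RecordCacheModel is hypothetical: it bounds entries rather than bytes and flushes only when flush() is called (standing in for a commit), whereas Kafka’s real cache is byte-bounded and shared across stream threads.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, heavily simplified write-back cache (not Kafka's ThreadCache). */
final class RecordCacheModel {
    private final int maxEntries; // stand-in for the byte budget; 0 disables caching
    private final Map<String, Long> dirty = new LinkedHashMap<>();
    private final List<String> emitted = new ArrayList<>();

    RecordCacheModel(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    void put(String sessionKey, long count) {
        if (maxEntries == 0) {
            emitted.add(sessionKey + "=" + count); // no cache: forward every update
            return;
        }
        dirty.remove(sessionKey); // re-insert so the key becomes the most recent entry
        dirty.put(sessionKey, count);
        if (dirty.size() > maxEntries) {
            // Over budget: evict the least recently written entry and forward it.
            String eldest = dirty.keySet().iterator().next();
            emitted.add(eldest + "=" + dirty.remove(eldest));
        }
    }

    /** Models a commit: forward the latest value of every dirty entry. */
    void flush() {
        dirty.forEach((k, v) -> emitted.add(k + "=" + v));
        dirty.clear();
    }

    List<String> emitted() {
        return List.copyOf(emitted);
    }
}
```

With caching, three count updates to the same session produce a single downstream record at flush time; with the cache disabled (size 0), all three are forwarded. Neither case changes how much session state exists, only how many intermediate updates leave the processor.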

The next logical step after aggregating events within sessions is often to analyze the duration of these sessions or to perform actions based on session completion.
