Kafka’s brilliance lies less in its durability than in how it makes future systems predictable by keeping the record of past events immutable.
Let’s see this in action. Imagine a simple producer sending user events to Kafka:

    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;

    public class UserEventProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // acks: valid values are "0", "1", and "all". "all" (the default since Kafka 3.0)
            // makes the leader wait for every in-sync replica, and idempotence requires it.
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            // Retries: resend on transient network issues (must be > 0 when idempotence is on)
            props.put(ProducerConfig.RETRIES_CONFIG, "3");
            // Enable idempotence so that retries cannot write duplicates to a partition
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                for (int i = 0; i < 100; i++) {
                    String key = "user-" + i;
                    String value = "{\"userId\": " + i + ", \"event\": \"login\", \"timestamp\": " + System.currentTimeMillis() + "}";
                    ProducerRecord<String, String> record = new ProducerRecord<>("user-events", key, value);
                    // Asynchronous send with callback
                    producer.send(record, (metadata, exception) -> {
                        if (exception == null) {
                            System.out.printf("Sent record: topic=%s, partition=%d, offset=%d%n",
                                    metadata.topic(), metadata.partition(), metadata.offset());
                        } else {
                            System.err.println("Error sending record: " + exception.getMessage());
                            exception.printStackTrace();
                        }
                    });
                }
                // Flush to ensure all buffered records are sent
                producer.flush();
                System.out.println("All records sent.");
            }
        }
    }

And here’s a consumer that processes these events:

    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class UserEventConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-event-processor");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // Auto commit is dangerous in production. Use manual commits.
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            // Set isolation level to read_committed for transactional consumers
            // props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

            try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("user-events"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    if (!records.isEmpty()) {
                        for (ConsumerRecord<String, String> record : records) {
                            System.out.printf("Received record: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
                            // Process the record...
                        }
                        // Commit offsets manually after processing
                        consumer.commitSync();
                        System.out.println("Committed offsets.");
                    }
                }
            }
        }
    }

This setup addresses a core problem: reliably moving data between independent services without tight coupling. Kafka acts as a central, fault-tolerant log. Producers write events, and consumers read them at their own pace. The "future systems" part comes in because the data is preserved in Kafka for as long as the topic’s retention settings allow (the broker default is seven days, so long-lived history has to be configured deliberately). If you need to build a new service that analyzes user logins from six months ago, and retention covers that period, you can simply spin up a new consumer with its own group ID that reads from the beginning of the user-events topic. The past is a resource, not a historical artifact.
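As a rough sketch of what "spin up a new consumer" involves, the helper below builds settings for a consumer group that has never committed offsets, so that it starts from the oldest retained record. The class name ReplayConsumerConfig and the group ID passed in are hypothetical, and the config keys are written as plain strings so the snippet compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

/** Builds settings for a new consumer group that replays a topic from its oldest retained record. */
final class ReplayConsumerConfig {

    private ReplayConsumerConfig() {}

    /**
     * @param bootstrapServers broker address list, e.g. "localhost:9092"
     * @param newGroupId       a group ID that has no committed offsets yet (hypothetical, caller's choice)
     */
    static Properties build(String bootstrapServers, String newGroupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", newGroupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Only consulted when the group has no committed offset: begin at the earliest retained record.
        props.put("auto.offset.reset", "earliest");
        // Keep manual commits, matching the consumer shown earlier.
        props.put("enable.auto.commit", "false");
        return props;
    }
}
```

Passing the result of ReplayConsumerConfig.build("localhost:9092", "login-analytics-v1") to new KafkaConsumer<>(...) and subscribing to user-events would then read the topic from the start. Reusing an existing group ID would not, because that group resumes from its committed offsets.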
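The acks setting accepts only 0, 1, and all (also written -1); a numeric replica count such as 2 is rejected as an invalid configuration. With acks=all, the partition leader waits until every in-sync replica has the write before acknowledging it, and the topic’s min.insync.replicas setting determines how few replicas that set may shrink to. This buys durability at the cost of some latency. retries=3 lets the producer resend after transient network glitches or temporary broker unavailability instead of failing on the first error. Crucially, enable.idempotence=true prevents duplicate messages from being written to a partition, even if the producer retries a send operation. It requires acks=all, retries > 0, and max.in.flight.requests.per.connection <= 5, and the producer throws a ConfigException if any of these is explicitly set to a conflicting value. This is the foundation for exactly-once processing semantics, although idempotence on its own only deduplicates retries within a single producer session.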
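To make the deduplication idea concrete, here is a deliberately simplified model of one partition. It assumes, as the real protocol does, that each producer has an ID and stamps its records with increasing sequence numbers. Everything else (the class name IdempotentPartitionModel, one record per sequence number, no batching or epochs) is an illustrative simplification and not the broker’s actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of a partition that drops retried writes using (producerId, sequence). */
final class IdempotentPartitionModel {
    private final List<String> log = new ArrayList<>();
    private final Map<Long, Integer> lastSequenceByProducer = new HashMap<>();

    /** Returns true if the record was appended, false if it was recognized as a duplicate retry. */
    boolean append(long producerId, int sequence, String value) {
        int last = lastSequenceByProducer.getOrDefault(producerId, -1);
        if (sequence <= last) {
            // Already written by an earlier attempt: acknowledge without appending again.
            return false;
        }
        if (sequence != last + 1) {
            // A gap means an earlier record is missing, so the write is refused.
            throw new IllegalStateException(
                    "Out-of-order sequence " + sequence + ", expected " + (last + 1));
        }
        log.add(value);
        lastSequenceByProducer.put(producerId, sequence);
        return true;
    }

    /** Returns a copy of the records appended so far. */
    List<String> records() {
        return new ArrayList<>(log);
    }
}
```

In this model a producer that sends sequence 0, loses the acknowledgement, and resends sequence 0 leaves exactly one copy in the log, which is the property the retry settings rely on.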
For the consumer, disabling enable.auto.commit is paramount. With auto-commit enabled, the client periodically commits the offsets of the records that poll() has returned, whether or not your code has finished with them. If processing is handed to another thread, or a failure is swallowed before the next poll, an offset can be committed for a message that was never handled, and that message is skipped after a restart. Manual commits (consumer.commitSync()) happen after processing, so a committed offset means all preceding messages have been successfully handled. The trade-off is at-least-once delivery: a crash after processing but before the commit causes the batch to be delivered again, so processing should tolerate duplicates.
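The effect of committing only after processing can be seen in a small in-memory model, with no broker involved. The class name CommitAfterProcessingModel and the crashBeforeCommit flag are hypothetical. The one detail taken from Kafka itself is that a committed offset denotes the next record to read, i.e. the last processed offset plus one.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory model of commit-after-processing on a single partition. */
final class CommitAfterProcessingModel {
    private final List<String> partitionLog;
    private final List<String> processed = new ArrayList<>();
    private long committedOffset = 0; // next offset to read after a restart

    CommitAfterProcessingModel(List<String> partitionLog) {
        this.partitionLog = new ArrayList<>(partitionLog);
    }

    /**
     * Reads up to maxRecords starting at the committed offset and processes them.
     * When crashBeforeCommit is true the commit is skipped, modelling a crash
     * that happens after processing but before commitSync().
     */
    void pollAndProcess(int maxRecords, boolean crashBeforeCommit) {
        long end = Math.min(partitionLog.size(), committedOffset + maxRecords);
        for (long offset = committedOffset; offset < end; offset++) {
            processed.add(partitionLog.get((int) offset));
        }
        if (!crashBeforeCommit) {
            committedOffset = end; // last processed offset + 1
        }
    }

    long committedOffset() {
        return committedOffset;
    }

    /** Returns every record handed to processing, including repeats. */
    List<String> processed() {
        return new ArrayList<>(processed);
    }
}
```

Running one successful batch, one batch that "crashes" before committing, and then a restart shows the second batch being processed twice and never skipped. That is the at-least-once behaviour manual commits provide.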
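A critical, often overlooked, aspect of robust Kafka consumers is how they handle rebalancing. When a consumer group experiences a rebalance (e.g., a consumer starts or stops), partitions are reassigned among its members. With eager assignors such as the range assignor, every consumer in the group gives up its partitions and pauses until the new assignment is complete, while cooperative assignors revoke only the partitions that actually move. Before a partition is handed off, the consumer should finish any in-flight processing and commit its offsets, typically from the onPartitionsRevoked callback of a ConsumerRebalanceListener. If you are performing complex, multi-step processing, this commit point becomes vital. If your processing involves external database writes, note that Kafka transactions do not extend to the database: they make writes to Kafka topics and consumer offset commits atomic, and isolation.level=read_committed on the consumer only hides messages from open or aborted transactions. To prevent partial updates from corrupting your downstream state, make the database writes idempotent, or store the consumed offsets in the same database transaction as the results so that either both are recorded or neither is.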
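One common way to keep external writes safe when records are redelivered after a rebalance or crash is to store the last applied offset alongside the results and skip anything at or below it. The sketch below uses in-memory maps as a stand-in for a database table. The class name OffsetTrackingSink and the login-count example are hypothetical, and a real implementation would perform both writes inside a single database transaction.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for a database that stores results and the last applied offset together. */
final class OffsetTrackingSink {
    private final Map<Integer, Long> lastAppliedOffset = new HashMap<>(); // partition -> offset
    private final Map<String, Integer> loginCounts = new HashMap<>();     // userId -> logins

    /** Applies the record once; returns false when the offset was already applied (a redelivery). */
    synchronized boolean apply(int partition, long offset, String userId) {
        long last = lastAppliedOffset.getOrDefault(partition, -1L);
        if (offset <= last) {
            return false;
        }
        // In a real database these two writes would share one transaction.
        loginCounts.merge(userId, 1, Integer::sum);
        lastAppliedOffset.put(partition, offset);
        return true;
    }

    synchronized int loginCount(String userId) {
        return loginCounts.getOrDefault(userId, 0);
    }

    /** Offset to resume from after a restart or reassignment (0 if nothing was applied yet). */
    synchronized long nextOffset(int partition) {
        return lastAppliedOffset.getOrDefault(partition, -1L) + 1;
    }
}
```

With this shape, a consumer that is assigned a partition could call KafkaConsumer.seek with the value of nextOffset for that partition, so the database rather than the committed Kafka offset decides where processing resumes.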
The next step in building a truly production-ready Kafka system is to understand how to monitor its health and performance, looking at metrics like producer throughput, consumer lag, and broker resource utilization.