Kafka’s "exactly-once" semantics aren’t really about guaranteeing that a message is delivered precisely one time to a consumer. They are about atomicity. A transaction that spans multiple Kafka operations either fully succeeds or fully fails, so consumers reading with isolation.level=read_committed never see a partial result. Such a transaction might produce to several topics or partitions, or it might produce output records and commit the consumer offsets of the input records that produced them. This transactional capability is key to building robust, fault-tolerant stream processing applications.

Let’s see how this plays out with transactional producers. Imagine you have a service that reads data, transforms it, and then writes the transformed data to a new Kafka topic. Without transactions, writing the output and committing the consumer offset are two independent steps. If the service crashes after writing the transformed data but before committing the input offset, it reprocesses those input records on restart and writes duplicate output. If it commits the offset first and crashes before writing, that input is never processed and its output is lost.
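A toy simulation makes the duplicate case concrete. This is plain Python with no Kafka involved: the lists stand in for the input and output topics, an integer stands in for the committed consumer offset, and the function `run` is hypothetical. Crashing after the output write but before the offset commit makes the restarted run process the same record a second time.

```python
# Toy model, not the Kafka API: lists play the role of topics and an int
# plays the role of the committed consumer offset.

def run(input_records, output, committed, crash_after_produce_at=None):
    """Process input from the committed offset, committing after each record.

    If crash_after_produce_at is set, simulate a crash right after producing
    the output for that offset but before committing it. Returns the last
    committed offset.
    """
    offset = committed
    while offset < len(input_records):
        output.append(input_records[offset].upper())  # transform + produce
        if offset == crash_after_produce_at:
            return committed  # crash: the offset commit never happens
        offset += 1
        committed = offset  # offset commit, separate from the produce
    return committed


inputs = ["a", "b", "c"]
out = []
committed = run(inputs, out, 0, crash_after_produce_at=1)  # crash while handling "b"
committed = run(inputs, out, committed)  # restart from the last committed offset
print(out)  # ['A', 'B', 'B', 'C']
```

Because the produce and the offset commit are separate steps here, "B" is written twice. A Kafka transaction closes exactly this gap by committing both together.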

Here’s a simplified kafka-python producer that sends two messages in a single transaction:

from kafka import KafkaProducer
from kafka.errors import KafkaError
import uuid

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
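    enable_idempotence=True,  # Required for transactions: per-partition sequence numbers let the broker drop retried duplicates
    transactional_id='my-transactional-producer-id',  # Must stay the same across restarts of this producer
    acks='all',  # Transactions require acknowledgement from all in-sync replicas
    retries=5,
    # No serializers are configured: keys and values are encoded to bytes in send() below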
)

# Initialize the transaction
producer.init_transactions()

try:
    # Begin a new transaction
    producer.begin_transaction()

    # Produce messages within the transaction
    message_key = str(uuid.uuid4())
    message_value = "Transaction Payload 1"
    print(f"Producing key: {message_key}, value: {message_value}")
    producer.send('transactional-topic', key=message_key.encode('utf-8'), value=message_value.encode('utf-8'))

    message_key_2 = str(uuid.uuid4())
    message_value_2 = "Transaction Payload 2"
    print(f"Producing key: {message_key_2}, value: {message_value_2}")
    producer.send('transactional-topic', key=message_key_2.encode('utf-8'), value=message_value_2.encode('utf-8'))

    # In a consume-transform-produce flow, you would also commit the input consumer's
    # offsets inside this same transaction with producer.send_offsets_to_transaction(),
    # so that the output records and the offset commit succeed or fail together

    # Commit the transaction
    producer.commit_transaction()
    print("Transaction committed successfully.")

except KafkaError as e:
    print(f"An error occurred: {e}")
    # Abort the transaction on error
    producer.abort_transaction()
    print("Transaction aborted.")
finally:
    producer.close()

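The core idea behind Kafka’s transactional capabilities revolves around a transaction coordinator, a broker-side component that tracks each transaction’s state in an internal, replicated topic called __transaction_state. When init_transactions() is called, the producer registers its transactional_id with the coordinator and receives a producer ID and an epoch. The transactional_id is how Kafka recognizes the same logical producer across restarts. Each new initialization with that ID bumps the epoch. Before the new instance can start, the coordinator also finishes any transaction a previous instance left open.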
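Here is a toy model of that registration step, assuming a single coordinator and ignoring persistence and networking. The class and method names are hypothetical. Only the behavior mirrors the coordinator: each transactional_id gets a stable producer ID, and every re-initialization bumps the epoch.

```python
from dataclasses import dataclass, field
from itertools import count


@dataclass
class ToyCoordinator:
    """Toy model of producer-id assignment; not the real broker implementation."""

    # Source of new producer ids (hypothetical allocation scheme).
    next_pid: count = field(default_factory=count)
    # transactional_id -> [producer_id, epoch]
    registry: dict = field(default_factory=dict)

    def init_transactions(self, transactional_id: str) -> tuple:
        entry = self.registry.get(transactional_id)
        if entry is None:
            # First time this id is seen: allocate a fresh producer id at epoch 0.
            entry = [next(self.next_pid), 0]
            self.registry[transactional_id] = entry
        else:
            # Same id again (e.g. after a restart): same producer id, higher epoch.
            entry[1] += 1
        return tuple(entry)


coord = ToyCoordinator()
print(coord.init_transactions("my-transactional-producer-id"))  # (0, 0)
print(coord.init_transactions("another-id"))                     # (1, 0)
print(coord.init_transactions("my-transactional-producer-id"))  # (0, 1)
```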

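Calling begin_transaction() does not contact the coordinator. It only marks a transaction as started on the producer side. There is no separate per-transaction ID: the producer ID and epoch were already assigned by init_transactions(), and they stay the same for every transaction this producer instance runs. The first time the producer sends to a partition within a transaction, it asks the coordinator to add that partition to the transaction. That way the coordinator always knows which partitions will need an outcome marker. Every record sent in the transaction carries the producer ID and epoch.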
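The partition-registration flow can be modeled roughly as follows. This is a toy sketch, not kafka-python code. The classes are hypothetical, and the coordinator request is a direct method call, whereas real clients batch these registrations into network requests. The sketch shows that beginning a transaction changes nothing on the coordinator, and that each partition is registered only once per transaction.

```python
from collections import defaultdict


class ToyTxnState:
    """Coordinator-side record of one transactional_id's transaction (toy model)."""

    def __init__(self):
        self.state = "Empty"
        self.partitions = set()

    def add_partitions(self, partitions):
        # Stands in for the coordinator handling an "add partitions to transaction" request.
        self.partitions |= set(partitions)
        self.state = "Ongoing"


class ToyTxnProducer:
    """Client-side view: begin is local; the first send to a partition registers it."""

    def __init__(self, producer_id, epoch, txn_state):
        self.producer_id, self.epoch = producer_id, epoch
        self.txn_state = txn_state
        self.logs = defaultdict(list)  # (topic, partition) -> appended records
        self.in_transaction = False
        self.registered = set()

    def begin_transaction(self):
        # No request to the coordinator: only local state changes.
        self.in_transaction = True
        self.registered = set()

    def send(self, topic, partition, value):
        if not self.in_transaction:
            raise RuntimeError("send() called outside a transaction")
        tp = (topic, partition)
        if tp not in self.registered:
            self.txn_state.add_partitions([tp])
            self.registered.add(tp)
        # Every record carries the producer id and epoch assigned at init time.
        self.logs[tp].append((self.producer_id, self.epoch, value))


state = ToyTxnState()
producer = ToyTxnProducer(producer_id=0, epoch=1, txn_state=state)
producer.begin_transaction()
print(state.state)  # Empty: begin_transaction() did not reach the coordinator
producer.send("transactional-topic", 0, "Transaction Payload 1")
producer.send("transactional-topic", 1, "Transaction Payload 2")
producer.send("transactional-topic", 0, "Transaction Payload 3")
print(state.state, sorted(state.partitions))
# Ongoing [('transactional-topic', 0), ('transactional-topic', 1)]
```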

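The magic happens with commit_transaction() and abort_transaction(). When you commit, the producer first flushes any unsent records and then asks the coordinator to end the transaction. The coordinator then performs three steps:

  1. Records "prepare commit" in the transaction log: Once this entry is replicated, the commit is decided and will be completed even if the producer dies.
  2. Writes a commit marker to every partition in the transaction: The marker makes the transaction’s records visible to consumers reading with isolation.level=read_committed.
  3. Records "complete commit" in the transaction log: The transaction is finished.

An abort follows the same steps with abort markers, and read_committed consumers skip the aborted records.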
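How markers control visibility can be sketched with a toy model of a single partition log. This is plain Python, not the Kafka API, and the tuple layout is hypothetical. A read_committed consumer only returns records below the last stable offset, which is the first offset that still belongs to an open transaction. It also drops records whose transaction was aborted.

```python
def read_committed(log):
    """Values a read_committed consumer would receive from one partition (toy model).

    log entries are ("data", producer_id, value) or ("marker", producer_id, outcome),
    where outcome is "COMMIT" or "ABORT" and closes that producer's current transaction.
    """
    open_first = {}  # producer_id -> offset of the first record in its open transaction
    pending = {}     # producer_id -> offsets of records in its open transaction
    outcome_at = {}  # data offset -> outcome of the transaction it belongs to
    for offset, (kind, pid, payload) in enumerate(log):
        if kind == "data":
            open_first.setdefault(pid, offset)
            pending.setdefault(pid, []).append(offset)
        else:
            for data_offset in pending.pop(pid, []):
                outcome_at[data_offset] = payload
            open_first.pop(pid, None)
    # Last stable offset: nothing at or beyond the first still-open record is returned.
    lso = min(open_first.values(), default=len(log))
    return [log[o][2] for o in range(lso)
            if log[o][0] == "data" and outcome_at[o] == "COMMIT"]


partition = [
    ("data", 7, "p1"),
    ("data", 9, "x1"),        # producer 9 opens a transaction
    ("data", 7, "p2"),
    ("marker", 7, "COMMIT"),
    ("data", 9, "x2"),
]
print(read_committed(partition))  # ['p1']: producer 9's open transaction blocks p2
partition.append(("marker", 9, "ABORT"))
print(read_committed(partition))  # ['p1', 'p2']: x1 and x2 were aborted
```

Note that "p2" is already committed in the first call but still not returned. An open transaction earlier in the log holds back everything after it, which is why long-running transactions delay read_committed consumers.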

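If the producer crashes before committing, the transaction remains open. When the producer restarts with the same transactional_id and calls init_transactions() again, the coordinator detects the incomplete transaction and aborts it. It writes abort markers so that none of that transaction’s messages are ever delivered to read_committed consumers. If the producer never comes back, the coordinator aborts the transaction on its own once it has been open longer than the producer’s transaction timeout (transaction.timeout.ms, 60 seconds by default in the Java client).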
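Both cleanup paths can be sketched with a toy coordinator. This is not broker code: the class, its dictionary layout, and the explicit `now_ms` clock parameter are hypothetical. The first path aborts on re-initialization and the second aborts on timeout. In both cases the coordinator writes an abort marker to every partition the transaction touched.

```python
class ToyCoordinator:
    """Toy model (not broker code) of how abandoned transactions get aborted."""

    def __init__(self, timeout_ms):
        self.timeout_ms = timeout_ms
        self.txns = {}     # transactional_id -> {"epoch", "state", "partitions", "start_ms"}
        self.markers = []  # (partition, transactional_id, "ABORT") markers written so far

    def init_transactions(self, tid):
        txn = self.txns.setdefault(
            tid, {"epoch": -1, "state": "Empty", "partitions": set(), "start_ms": 0})
        if txn["state"] == "Ongoing":
            self._abort(tid)  # a previous instance died mid-transaction
        txn["epoch"] += 1
        return txn["epoch"]

    def add_partition(self, tid, partition, now_ms):
        txn = self.txns[tid]
        if txn["state"] != "Ongoing":
            txn["state"], txn["start_ms"] = "Ongoing", now_ms
        txn["partitions"].add(partition)

    def expire(self, now_ms):
        # Abort transactions open longer than the timeout, even if no producer returns.
        for tid, txn in self.txns.items():
            if txn["state"] == "Ongoing" and now_ms - txn["start_ms"] > self.timeout_ms:
                self._abort(tid)

    def _abort(self, tid):
        txn = self.txns[tid]
        for partition in sorted(txn["partitions"]):
            self.markers.append((partition, tid, "ABORT"))
        txn["state"], txn["partitions"] = "CompleteAbort", set()


coord = ToyCoordinator(timeout_ms=60_000)
coord.init_transactions("a")
coord.add_partition("a", "transactional-topic-0", now_ms=0)
# Producer "a" crashes; a restarted instance calls init_transactions() again.
print(coord.init_transactions("a"))  # 1
coord.init_transactions("b")
coord.add_partition("b", "transactional-topic-1", now_ms=1_000)
coord.expire(now_ms=70_000)          # "b" has been open for 69 s > 60 s
print(coord.markers)
# [('transactional-topic-0', 'a', 'ABORT'), ('transactional-topic-1', 'b', 'ABORT')]
```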

Conversely, suppose commit_transaction() is called and the producer crashes after the coordinator has recorded the commit decision but before all the markers are written. The coordinator still completes the commit, because that decision is stored durably in the replicated transaction log. Transactions also build on the idempotent producer. With enable_idempotence=True, the producer attaches a sequence number to each batch it sends to a partition, so the broker can discard duplicates created by retries. Meanwhile, acks='all' ensures each write is replicated to all in-sync replicas before it is acknowledged. On top of that per-partition guarantee, the transactional API adds atomicity across multiple partitions, and across output records and consumer offsets.
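The sequence-number check can be sketched as a toy partition leader. This is a simplification: the class is hypothetical, and the real broker tracks the producer epoch as well and remembers the last few batches per producer rather than a single number. A retried batch with an already-seen sequence number is acknowledged without being appended again.

```python
class ToyPartitionLeader:
    """Toy model of duplicate detection for an idempotent producer on one partition."""

    def __init__(self):
        self.log = []
        self.last_seq = {}  # producer_id -> sequence number of the last appended batch

    def append(self, producer_id, seq, value):
        last = self.last_seq.get(producer_id, -1)
        if seq <= last:
            # A retry of a batch that was already written: acknowledge it, append nothing.
            return "DUPLICATE"
        if seq != last + 1:
            # A gap means an earlier batch never arrived; refuse to skip past it.
            return "OUT_OF_ORDER_SEQUENCE"
        self.log.append(value)
        self.last_seq[producer_id] = seq
        return "OK"


leader = ToyPartitionLeader()
print(leader.append(0, 0, "Transaction Payload 1"))  # OK
print(leader.append(0, 0, "Transaction Payload 1"))  # DUPLICATE (retry after a lost ack)
print(leader.append(0, 1, "Transaction Payload 2"))  # OK
print(leader.append(0, 3, "Transaction Payload 4"))  # OUT_OF_ORDER_SEQUENCE
print(leader.log)  # ['Transaction Payload 1', 'Transaction Payload 2']
```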

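The key component enabling this is the transactional_id. It’s a persistent identifier that Kafka uses to track the state of a producer’s transactions. If a producer restarts, it uses the same transactional_id to connect to the transaction coordinator. The coordinator does not resume the old transaction. If a commit was already decided, it completes it. Otherwise, it aborts it. In either case it bumps the epoch. An older instance may still be running with the same transactional_id, for example a "zombie" that stalled during a long garbage-collection pause. That instance is now fenced: its requests carry the old epoch and are rejected. This lets you build applications that recover from failures and continue processing without losing or duplicating committed results.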
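Fencing can be illustrated with a toy coordinator that compares epochs. This is not broker code. The class, its methods, and the "PRODUCER_FENCED" string are stand-ins for the real request handling and error. Once a second instance initializes with the same transactional_id, the first instance can no longer finish a transaction.

```python
class ToyCoordinator:
    """Toy model (not broker code) of epoch-based fencing for one transactional_id."""

    def __init__(self):
        self.epochs = {}  # transactional_id -> current epoch

    def init_transactions(self, tid):
        # Each (re)initialization bumps the epoch, invalidating older instances.
        self.epochs[tid] = self.epochs.get(tid, -1) + 1
        return self.epochs[tid]

    def end_transaction(self, tid, epoch, commit):
        if epoch != self.epochs[tid]:
            return "PRODUCER_FENCED"  # request from a zombie with a stale epoch
        return "COMMITTED" if commit else "ABORTED"


coord = ToyCoordinator()
old_epoch = coord.init_transactions("my-transactional-producer-id")  # instance A
new_epoch = coord.init_transactions("my-transactional-producer-id")  # instance B replaces A
print(coord.end_transaction("my-transactional-producer-id", old_epoch, commit=True))  # PRODUCER_FENCED
print(coord.end_transaction("my-transactional-producer-id", new_epoch, commit=True))  # COMMITTED
```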

What most people don’t realize is how the transaction coordinator handles broker failures. Each transactional_id maps to one partition of the __transaction_state topic. The coordinator for that ID is whichever broker currently leads that partition. If that broker fails, Kafka elects a new leader for the partition from its in-sync replicas, and the new leader becomes the coordinator, rebuilding the transaction state from the replicated log. The producer then finds the new coordinator and retries its request, and the transaction continues or is properly finalized. This resilience is built into the Kafka protocol itself.
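Here is a minimal sketch of the lookup. It assumes the broker default of 50 partitions for __transaction_state (the transaction.state.log.num.partitions setting) and uses crc32 as a stand-in hash, whereas Kafka itself hashes the ID with Java’s String.hashCode(). The cluster layout and broker IDs are hypothetical. The mapping from ID to partition never changes, so a failover only changes which broker answers for it.

```python
import zlib

# Broker default for transaction.state.log.num.partitions.
NUM_TXN_STATE_PARTITIONS = 50


def txn_state_partition(transactional_id):
    # Assumption: crc32 stands in for the Java String.hashCode() Kafka actually uses.
    return zlib.crc32(transactional_id.encode("utf-8")) % NUM_TXN_STATE_PARTITIONS


def coordinator_for(transactional_id, leaders):
    """leaders maps each __transaction_state partition to the broker leading it."""
    return leaders[txn_state_partition(transactional_id)]


tid = "my-transactional-producer-id"
partition = txn_state_partition(tid)
# Hypothetical 3-broker cluster (broker ids 1-3) with leadership spread round-robin.
leaders = {p: 1 + p % 3 for p in range(NUM_TXN_STATE_PARTITIONS)}
before = coordinator_for(tid, leaders)

# The leader fails; another in-sync replica (chosen arbitrarily here) takes over.
leaders[partition] = next(b for b in (1, 2, 3) if b != before)
after = coordinator_for(tid, leaders)
print(partition == txn_state_partition(tid), before != after)  # True True
```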

The next step is to explore how consumers can read these transactional messages correctly, typically by configuring their isolation.level to read_committed.
