Kafka Transactions and Exactly-Once Semantics

How to build robust exactly-once processing in Apache Kafka with idempotent producers, transactions, and read_committed.

tl;dr

  • Idempotence (enable.idempotence=true) prevents duplicate writes, but it does not give you end-to-end exactly-once processing.

  • For read-process-write pipelines, you need Kafka transactions and offset commits in the same transaction (send_offsets_to_transaction).

  • Consumers must use isolation.level=read_committed to avoid reading data from aborted transactions.

  • Exactly-once adds overhead: batching more records per transaction improves throughput but increases end-to-end latency.

  • If you want fewer implementation details to manage, Kafka Streams is often the pragmatic choice.

Exactly-Once Semantics

In distributed systems, we must always assume that individual components can fail. In general, it is not possible to send a message exactly once across an unreliable network.

At-most-once delivery is easy to achieve. In Kafka, set acks=0 on the producer. The producer sends the message and does not wait for confirmation. If the message arrives, it arrives once. If it does not, there is no retry. So it is delivered at most once.

At-least-once delivery is also straightforward. Set acks=all on the producer. The producer sends a message and waits for the leader’s acknowledgment. Before acknowledging, the leader waits until all in-sync replicas have received the message. If the producer gets the acknowledgment, the record is durably replicated under reasonable assumptions.

Also set min.insync.replicas=2 (a topic or broker setting) so that a write is never acknowledged while only a single surviving broker has it. The catch: if the acknowledgment is lost, the producer retries, which can create duplicates. That is why this setup gives at-least-once, not exactly-once.

Exactly-once is harder. We cannot just "turn on" perfect delivery in distributed systems. Still, on the producer side, acks=all, min.insync.replicas=2, and enable.idempotence=true provide exactly-once write semantics for a running producer session.
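As a sketch, the producer side of that configuration might look like this, using confluent-kafka style property names (min.insync.replicas is a topic or broker setting, so it does not belong in the producer properties):

```python
# Producer settings for duplicate-free, in-order writes.
idempotent_producer_props = {
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',               # Wait until all in-sync replicas have the record.
    'enable.idempotence': True,  # Sequence numbers let the leader drop retried duplicates.
}
# On the topic or broker side, additionally set min.insync.replicas=2.
```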

How does this work? The producer attaches a sequence number to each record, starting at 0 and incrementing with every message. If the leader receives sequence 2 before sequence 1, it detects the gap and rejects 2 as out of sequence; the producer then retries 1 and 2 in order. Once records arrive in order, they are acknowledged after replication. And if the producer never receives the acknowledgment? It retries. Because the leader has already processed that sequence number, it discards the duplicate and simply acknowledges it again.

So idempotence gives you duplicate-free, in-order writes for one producer session, as long as the producer stays alive and does not hit delivery.timeout.ms.
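To make the mechanism concrete, here is a toy model of the leader's sequence check (an illustration only; real brokers track sequence numbers per producer id and partition, and the class and return values here are invented for the sketch):

```python
# Toy model of broker-side sequence checking for one producer and partition.
class Leader:
    def __init__(self):
        self.next_seq = 0  # Sequence number the leader expects next.
        self.log = []

    def append(self, seq, record):
        if seq < self.next_seq:
            return "ack"              # Retried duplicate: drop it, but acknowledge.
        if seq > self.next_seq:
            return "out_of_sequence"  # Gap: reject so the producer retries in order.
        self.log.append(record)       # In order: append and replicate.
        self.next_seq += 1
        return "ack"

leader = Leader()
assert leader.append(1, "b") == "out_of_sequence"  # Seq 1 arrives before seq 0.
assert leader.append(0, "a") == "ack"
assert leader.append(0, "a") == "ack"              # Duplicate retry is dropped.
assert leader.append(1, "b") == "ack"
assert leader.log == ["a", "b"]                    # Exactly one copy of each, in order.
```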

Is this already end-to-end exactly-once? Not yet.

Idempotence solves duplicate writes by one producer. For read-process-write pipelines, you also need transactions.

Transactions in Kafka

Producer idempotence alone cannot guarantee that each message is processed exactly once.

Imagine a service that reads messages from Kafka, processes them, and writes results to another topic.

One critical part is still missing: in Kafka, reading is also writing because offsets must be committed. To guarantee exactly-once processing, we need a way to atomically execute multiple writes. In short: we need transactions.

# Pseudo-code

# With idempotence, each send is written exactly once
# while the producer session stays alive.
producer.send(msg1)
producer.send(msg2)
# If the service crashes here, msg1 and msg2 were written, but msg3 was not,
producer.send(msg3)
consumer.commit_offset()
# and the offsets were not committed either,
# so msg1 and msg2 may be processed again after restart.

This is exactly what Kafka transactions give us: atomic multi-write behavior. No more, no less.

# Pseudo-code
producer.begin_transaction()
producer.send(msg1)
producer.send(msg2)
# If we crash here, the transaction is aborted automatically.
producer.send(msg3)
# What about offsets?
producer.commit_transaction()

The catch: consumers do not participate in transactions, so they cannot commit offsets inside a transaction on their own. The solution is to let the producer write offsets on the consumer’s behalf.

But wait - aren’t Kafka logs immutable? Yes. Records from later-aborted transactions are still written to the log. They are not deleted; they are skipped during reads. This only works if consumers are configured with isolation.level=read_committed, and that is not the default.

Always set isolation.level=read_committed on consumers. Even if you are not using transactions today, future you will thank you.

See also Essential Kafka Client Configurations for more important client settings.

Implementation

Let’s walk through an implementation in Python using the confluent-kafka client, which keeps the example brief. The same pattern applies to the Java client.

Configure clients

For a read-process-write loop, we need both a producer and a consumer:

from confluent_kafka import Consumer, Producer, TopicPartition

producer_props = {
    'bootstrap.servers': 'localhost:9092',
    # See dataflow.academy/en/knowledge-hub/essential-kafka-client-configurations for important settings.
    'partitioner': 'murmur2_random',  # Keep partitioning compatible with Java clients.
    'transactional.id': 'bank-transaction-transformer-1'  # Must be unique.
    # Recommendation: use a StatefulSet and derive transactional.id from hostname or pod identity.
}
producer = Producer(producer_props)
producer.init_transactions()  # Call once during startup.

In non-Java clients, use partitioner=murmur2_random to keep partitioning behavior compatible with Java clients. To use transactions, you must set transactional.id. This protects against zombie producers: when a new producer calls init_transactions(), any open transaction with the same transactional.id is aborted, and the old producer instance is fenced off.

transactional.id must be unique per producer instance and should stay stable across restarts. In Kubernetes, a StatefulSet is usually the safest option.
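A common way to do this (an illustrative sketch, not part of the original setup) is to derive transactional.id from the pod's hostname, which a StatefulSet keeps stable:

```python
import socket

# StatefulSet pods get stable hostnames (my-app-0, my-app-1, ...), so an id
# derived from the hostname is unique per instance and survives restarts.
transactional_id = f"bank-transaction-transformer-{socket.gethostname()}"
```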

consumer_props = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # Disable auto commits.
    'isolation.level': 'read_committed'  # Default is read_uncommitted.
}
consumer = Consumer(consumer_props)
consumer.subscribe(["bank-transactions"])

On the consumer side, use enable.auto.commit=False because offset commits should happen as part of the producer transaction. Also use isolation.level=read_committed so only committed transactional data is read.

Read-process-write loop

Now we can implement the main loop:

while True:
    message = consumer.poll(0.1)
    if message is None:
        continue
    if message.error():
        raise Exception(message.error())

    result_message = process_message(message)  # Business logic

    # NAIVE implementation. Not production-ready.
    producer.begin_transaction()
    producer.produce("target-topic", result_message)
    offsets_to_commit = [TopicPartition(message.topic(), message.partition(), message.offset() + 1)]
    producer.send_offsets_to_transaction(offsets_to_commit, consumer.consumer_group_metadata())
    producer.commit_transaction()

The key point is atomicity between producing output and committing offsets. Both operations must happen within the same transaction: begin transaction, write output, add offsets via send_offsets_to_transaction, commit transaction.

The committed offset must be the next record to read, so use message.offset() + 1.
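One thing the naive loop leaves out is error handling: if anything fails between begin_transaction and commit_transaction, the transaction should be aborted explicitly so that output and offsets are rolled back together. In pseudo-code, following the API used above:

```
# Pseudo-code
producer.begin_transaction()
try:
    producer.produce("target-topic", result_message)
    producer.send_offsets_to_transaction(offsets_to_commit,
                                         consumer.consumer_group_metadata())
    producer.commit_transaction()
except Exception:
    # Roll back output and offsets together; read_committed consumers
    # never see the partial write.
    producer.abort_transaction()
    raise
```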

Transaction overhead

What is the downside of this approach? Transaction overhead is roughly linear in the number of partitions a transaction touches, but almost constant in the number of records inside it. So batching more records per transaction lowers the overhead per record.

For example, if each transaction commit adds 10 ms of overhead and your processing logic takes 10 ms per message, the overhead is around 100%. Process 10 messages in one transaction, and the same 10 ms of commit overhead is spread over 100 ms of processing, roughly 10%. The trade-off is higher end-to-end latency, because read_committed consumers only see the data once the transaction commits.
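The arithmetic can be sketched as a small helper (the 10 ms figures are the illustrative numbers from above, not measurements):

```python
# Relative commit overhead when n messages share one transaction.
def overhead_pct(n_messages, commit_overhead_ms=10.0, processing_ms_per_msg=10.0):
    return 100 * commit_overhead_ms / (n_messages * processing_ms_per_msg)

print(overhead_pct(1))   # 100.0: commit overhead equals the processing time
print(overhead_pct(10))  # 10.0: ten messages per transaction cut it to 10%
```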

You also should not start transactions earlier than necessary: begin one only when the first message arrives. Let’s initialize a few global variables:

from time import monotonic
offsets_to_commit = {}  # {(topic, partition): offset}
last_commit = monotonic()  # Timestamp of last commit
commit_every_s = 1.0  # Commit interval
is_in_tx = False  # Whether a transaction is open

Now define a helper that commits every commit_every_s seconds:

def commit_transaction():
    global is_in_tx, last_commit, offsets_to_commit
    commit_list = []  # confluent-kafka expects a list of TopicPartition.
    for (topic, partition), offset in offsets_to_commit.items():
        commit_list.append(TopicPartition(topic, partition, offset))

    # Offset commit
    producer.send_offsets_to_transaction(commit_list, consumer.consumer_group_metadata())
    # Transaction commit
    producer.commit_transaction()
    # Reset state
    is_in_tx = False
    last_commit = monotonic()
    offsets_to_commit = {}

And a helper to begin transactions:

def begin_transaction():
    global is_in_tx, last_commit

    producer.begin_transaction()
    is_in_tx = True
    last_commit = monotonic()

We should also shut down cleanly. Just like in Java, register a shutdown hook:

import signal
import sys

def shutdown_hook(signum, frame):
    global consumer, producer, is_in_tx
    print("Shutting down...")
    # Clean shutdown
    if is_in_tx:
        commit_transaction()
    consumer.close()
    producer.flush()
    sys.exit(0)

signal.signal(signal.SIGTERM, shutdown_hook)
signal.signal(signal.SIGINT, shutdown_hook)  # Ctrl+C

Now the main loop:

while True:
    # Commit when it is time.
    if monotonic() - last_commit > commit_every_s and is_in_tx:
        commit_transaction()

    # Poll frequently to keep overhead low.
    message = consumer.poll(0.1)
    if message is None:
        continue
    if message.error():
        raise Exception(message.error())
    # Start a transaction with the first received message.
    if not is_in_tx:
        begin_transaction()

    result_message = process_message(message)
    producer.produce("target-topic", result_message)

    # Prepare offsets.
    topic_partition = (message.topic(), message.partition())
    offsets_to_commit[topic_partition] = message.offset() + 1  # Next offset to commit

That is the core implementation. As you can see, low-level consumers and producers make this fairly complex. In many real-world cases, Kafka Streams is the better option because it provides these abstractions out of the box. To enable transactional safety there, set processing.guarantee=exactly_once_v2.

The main downside of Kafka Streams is that it requires a more functional style of thinking and is available only on the JVM.

Database vs. Kafka transactions (ACID)

When I think of transactions, I usually think of ACID in relational databases. Kafka transactions are useful, but less powerful than classic ACID transactions. Let’s break that down:

  • Atomicity: Either all operations in a transaction succeed, or all fail. Kafka gives us this.

  • Consistency: Databases can guarantee consistency under explicit constraints. Kafka cannot provide the same strict consistency guarantees across a full data model.

  • Isolation: Databases provide isolation levels for concurrent transactions. In Kafka, only producers directly participate in transactions, so this is only partially comparable.

  • Durability: Once committed, data is durably stored. Kafka also gives us this.

In short: Kafka gives you strong A, partial I, and solid D. If you need full ACID behavior, you still need a relational database.

Conclusion

Idempotence is an important building block, but end-to-end exactly-once in read-process-write scenarios requires transactions and correctly configured consumers with read_committed. If you do not want to own this complexity yourself, Kafka Streams is often the pragmatic path.

About Anatoly Zelenin
Hi, I’m Anatoly! I love to spark that twinkle in people’s eyes. As an Apache Kafka expert and book author, I’ve been bringing IT to life for over a decade—with passion instead of boredom, with real experiences instead of endless slides.
