Kafka Transaktionen und Exactly-Once-Semantik

Wie du mit Idempotenz, Transaktionen und read_committed in Apache Kafka eine robuste Exactly-Once-Verarbeitung aufbaust.

tl;dr

  • Idempotenz (enable.idempotence=true) verhindert Duplikate beim Schreiben - liefert aber noch keine End-to-End-Exactly-Once-Verarbeitung.

  • Für Read-Process-Write brauchst du Kafka-Transaktionen und das Offset-Commit in derselben Transaktion (send_offsets_to_transaction).

  • Consumer müssen isolation.level=read_committed setzen, damit sie keine abgebrochenen Transaktionen lesen.

  • Exactly-Once hat Overhead: Größere Transaktionen verbessern den Durchsatz, erhöhen aber die Commit-Latenz.

  • Nutze Kafka Streams, wenn du dich weniger um Implementierungsdetails kümmern willst.

Exactly-Once-Semantik

In einem verteilten System müssen wir immer davon ausgehen, dass einzelne Komponenten ausfallen. Es ist in verteilten Systemen im Allgemeinen nicht möglich, eine Nachricht exakt einmal zu senden.

Es ist relativ einfach, eine Nachricht höchstens einmal zuzustellen. In Kafka setzen wir dafür acks=0 im Producer. Der Producer sendet die Nachricht und wartet nicht auf eine Bestätigung. Ob die Nachricht ankommt oder nicht, ist in diesem Modus egal. Wenn sie ankommt, dann genau einmal. Wenn nicht, wird nicht erneut versucht. Also kommt sie höchstens einmal an.

At-least-once ist ebenfalls relativ einfach umzusetzen. Wir setzen acks=all im Producer. Der Producer sendet eine Nachricht und wartet auf die Bestätigung des Leaders. Bevor der Leader bestätigt, wartet er darauf, dass alle In-Sync Replicas die Nachricht erhalten haben. Wenn der Producer die Bestätigung erhält, weiß er, dass die Nachricht angekommen ist und unter vernünftigen Annahmen auch lesbar bleibt, selbst wenn der Leader ausfällt.

Wir setzen min.insync.replicas=2, um zu vermeiden, dass nur noch ein Broker bestätigt und der Producer trotzdem weiterschreibt. Schlägt das Acknowledgement fehl, versucht der Producer, die Nachricht erneut zu senden. Dadurch können Nachrichten mehrfach ankommen.

Exactly-once ist etwas anspruchsvoller. Wir können Nachrichten nicht einfach exakt einmal senden, da das Netzwerk jederzeit ausfallen kann. Dennoch erreichen wir auf Producer-Seite mit acks=all, min.insync.replicas=2 und enable.idempotence=true eine Exactly-Once-Semantik - solange derselbe Producer aktiv ist.

Wie funktioniert das genau? Der Producer sendet in jeder Nachricht eine Sequenznummer mit. Sie beginnt bei 0 und wird mit jeder Nachricht inkrementiert. Wenn der Leader die Sequenznummer 2 vor der Sequenznummer 1 erhält, erkennt er die Lücke und lehnt die 2 zunächst ab. Der Producer sendet dann sowohl 1 als auch 2 erneut. Wenn die Nachrichten in der richtigen Reihenfolge ankommen, werden sie nach der Replikation bestätigt. Was passiert, wenn der Producer das Ack nicht erhält? Er sendet die Nachricht erneut. Da der Leader dieselbe Sequenznummer bereits verarbeitet hat, verwirft er das Duplikat und bestätigt nur noch.

Mit Idempotenz stellen wir also sicher, dass eine Nachricht pro Producer-Session nicht doppelt geschrieben wird und die Reihenfolge erhalten bleibt. Aber nur, solange der Producer lebt und nicht in den Timeout delivery.timeout.ms läuft.

Reicht das schon für End-to-End-Exactly-Once? Noch nicht.

Idempotenz löst nur das Duplikatproblem beim Schreiben durch einen Producer. Für eine Read-Process-Write-Pipeline brauchst du zusätzlich Transaktionen.

Transaktionen in Kafka

Mit Producer-Idempotenz allein können wir nicht sicherstellen, dass eine Nachricht genau einmal verarbeitet wird.

Stellen wir uns einen Service vor, der Nachrichten aus Kafka liest, verarbeitet und anschließend in ein anderes Topic schreibt.

Hier fehlt ein wichtiger Teil: Jede Leseoperation in Kafka ist zugleich auch eine Schreiboperation, weil wir Offsets committen müssen. Um eine Exactly-Once-Verarbeitung sicherzustellen, brauchen wir eine Methode, mehrere Schreiboperationen atomar auszuführen. Kurz: Wir brauchen Transaktionen.

# Pseudo-Code

# Standardmäßig können wir sicherstellen, dass eine
# Nachricht genau einmal gesendet wird, solange der Producer am Leben ist.
producer.send(msg1)
producer.send(msg2)
# Aber wenn der Service hier crasht, wurden msg1 und msg2 versendet, aber msg3 nicht.
producer.send(msg3)
consumer.commit_offset()
# und die Offsets wurden ebenfalls nicht committet
# Das heißt, bei einem Neustart werden msg1 und msg2 erneut verarbeitet.

Und genau das können wir mit Transaktionen in Kafka sicherstellen:

Mehrere Schreiboperationen atomar auszuführen. Nicht mehr und nicht weniger. Hier ein Pseudo-Code-Beispiel:

# Pseudo-Code
producer.begin_transaction()
producer.send(msg1)
producer.send(msg2)
# Wenn wir hier crashen, wird die Transaktion automatisch abgebrochen
producer.send(msg3)
# Was machen wir mit den Offsets??
producer.commit_transaction()

Das Problem dabei: Consumer nehmen nicht an Transaktionen teil, das heißt, sie können auch keine Offsets innerhalb einer Transaktion committen. Die Lösung dafür ist, dass der Producer an Stelle des Consumers die Offsets schreibt.

Aber haben wir nicht gesagt, dass Nachrichten im Log immutable sind? Ja - und tatsächlich werden Nachrichten auch bei später abgebrochenen Transaktionen ins Log geschrieben. Sie werden nicht gelöscht, sondern beim Konsumieren nur übersprungen. Dafür muss allerdings isolation.level=read_committed gesetzt sein, und das ist nicht der Standard.

Setze IMMER isolation.level=read_committed im Consumer! Egal ob du heute schon Transaktionen verwendest oder nicht. Du wirst mir irgendwann dafür dankbar sein.

Siehe auch Apache Kafka Clients richtig konfigurieren für mehr wichtige Einstellungen.

Implementierung

Schauen wir uns eine Implementierung an. Wir nutzen hier Python mit der Confluent-Kafka-Python-Bibliothek, um den Code kompakt zu halten. In Java funktioniert das Prinzip genauso.

Clients konfigurieren

Für unsere Read-Process-Write-Loop benötigen wir einen Producer und einen Consumer:

from confluent_kafka import Consumer, Producer, TopicPartition
producer_props = {
    'bootstrap.servers': 'localhost:9092',
    # Siehe dataflow.academy/de/knowledge-hub/apache-kafka-clients-richtig-konfigurieren für wichtige Einstellungen.
    'partitioner': 'murmur2_random', # Kompatibilität mit Java sicherstellen.
    'transactional.id': 'bank-transaction-transformer-1' # Muss eindeutig sein!
    # Empfehlung: StatefulSet verwenden und transactional.id auf den Hostname setzen
}
producer = Producer(producer_props)
producer.init_transactions() # Muss nur einmal aufgerufen werden.

In nicht-Java-Clients setzen wir den partitioner auf murmur2_random, um eine konsistente Partitionierung gegenüber Java sicherzustellen. Für Transaktionen musst du eine transactional.id setzen. Damit werden Zombie-Producer verhindert: Sobald ein Producer init_transactions() aufruft, wird eine eventuell offene Transaktion mit derselben transactional.id abgebrochen.

Die transactional.id muss für jede Producer-Instanz eindeutig sein - und idealerweise über Neustarts hinweg stabil bleiben. In Kubernetes solltest du dafür ein StatefulSet verwenden und die transactional.id auf den Hostname setzen.

consumer_props = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False, # Auto-Commits deaktivieren
    'isolation.level': 'read_committed' # Standard ist read_uncommitted; dann lesen wir auch abgebrochene Transaktionen

}
consumer = Consumer(consumer_props)
consumer.subscribe(["bank-transactions"])

Auf Consumer-Seite setzen wir enable.auto.commit=False, weil der Offset-Commit als Teil der Producer-Transaktion erfolgen soll. Zusätzlich setzen wir isolation.level=read_committed, damit nur erfolgreich committete Transaktionen gelesen werden.

Read-Process-Write-Loop

Nun können wir unsere Hauptschleife implementieren, die Nachrichten konsumiert, verarbeitet und in die Ziel-Topics schreibt:

while True:
    message = consumer.poll(0.1)
    if message is None:
        continue
    if message.error():
        raise Exception(message.error())

    result_message = process_message(message) # Business-Logik

    # Naive Implementierung. Nicht für den Produktiveinsatz geeignet.
    producer.begin_transaction()
    producer.produce("target-topic", result_message)
    offsets_to_commit = [TopicPartition(message.topic(), message.partition(), message.offset() + 1)]
    producer.send_offsets_to_transaction(offsets_to_commit, consumer.consumer_group_metadata())
    producer.commit_transaction()

Wir konsumieren und verarbeiten die Nachricht zunächst ganz normal. Der entscheidende Punkt ist das atomare Produzieren zusammen mit dem Offset-Commit. Beides muss innerhalb derselben Transaktion passieren: Transaktion starten, Ergebnis schreiben, Offsets über send_offsets_to_transaction eintragen, Transaktion committen.

Der zu committende Offset ist immer die nächste zu lesende Nachricht, daher message.offset() + 1.

Transaktions-Overhead

Was ist das Problem mit diesem Ansatz? Der Overhead einer Transaktion ist in etwa linear zur Anzahl der Partitionen, aber nahezu konstant in Bezug auf die Anzahl der Nachrichten innerhalb der Transaktion. Das heißt: Je mehr Nachrichten wir in eine Transaktion packen, desto geringer wird der Overhead pro Nachricht.

Wenn wir z. B. annehmen, dass eine Transaktion im Durchschnitt 10 ms Overhead verursacht und unsere Verarbeitungslogik ebenfalls 10 ms benötigt, liegen wir bei rund 100 % Overhead. Packen wir stattdessen 10 Nachrichten in eine Transaktion und brauchen dafür insgesamt 100 ms, sinkt der Overhead auf etwa 10 %. Gleichzeitig steigt aber die End-to-End-Latenz, weil Consumer auf den Commit warten müssen.

Wir müssen Transaktionen außerdem nicht unnötig früh starten, sondern erst dann, wenn die erste Nachricht eingetroffen ist. Dafür initialisieren wir zunächst einige globale Variablen:

from time import monotonic
offsets_to_commit = {} # {(topic, partition): offset}
last_commit = monotonic() # Zeitpunkt des letzten Commits
commit_every_s = 1.0 # Wie oft soll die Transaktion committet werden?
is_in_tx = False # Flag: Läuft gerade eine Transaktion?

Und nun implementieren wir die Funktion, die wir alle commit_every_s Sekunden aufrufen, um die Transaktion zu committen:

def commit_transaction():
    global is_in_tx, last_commit, offsets_to_commit
    commit_list = [] # `offsets_to_commit` ist ein Dict, Confluent Python benötigt aber eine Liste von TopicPartitions
    for (topic, partition), offset in offsets_to_commit.items():
        commit_list.append(TopicPartition(topic, partition, offset))

    # Offset-Commit
    producer.send_offsets_to_transaction(commit_list, consumer.consumer_group_metadata())
    # Transaktions-Commit
    producer.commit_transaction()
    # Status zurücksetzen
    is_in_tx = False
    last_commit = monotonic()
    offsets_to_commit = {}

Außerdem noch eine Hilfsfunktion, um Transaktionen zu beginnen:

def begin_transaction():
    global is_in_tx, last_commit

    producer.begin_transaction()
    is_in_tx = True
    last_commit = monotonic()

Jetzt fehlt noch ein sauberes Beenden der Applikation. Dafür implementieren wir - wie in Java - einen Shutdown-Hook:

import signal
import sys

def shutdown_hook(signum, frame):
    global consumer, producer, is_in_tx
    print("Shutting down...")
    # Alles sauber beenden
    if is_in_tx:
        commit_transaction()
    consumer.close()
    producer.flush()
    sys.exit(0)

signal.signal(signal.SIGTERM, shutdown_hook)
signal.signal(signal.SIGINT, shutdown_hook) # Strg+C

Und nun können wir die Hauptschleife implementieren:

while True:
    # Committen, wenn es Zeit ist
    if monotonic() - last_commit > commit_every_s and is_in_tx:
        commit_transaction()

    # Polling passiert so häufig, dass der Overhead hier in der Regel kein Problem ist
    message = consumer.poll(0.1)
    if message is None:
        continue
    if message.error():
        raise Exception(message.error())
    # Wir starten eine neue Transaktion, sobald die erste Nachricht ankommt
    if not is_in_tx:
        begin_transaction()

    result_message = process_message(message)
    producer.produce("target-topic", result_message)

    # Offsets vorbereiten
    topic_partition = (message.topic(), message.partition())
    offsets_to_commit[topic_partition] = message.offset() + 1 # Den zu committenden Offset vorbereiten

Damit sind wir beim Kern der Implementierung angekommen. Du siehst: Mit Low-Level-Consumern und -Producern ist das nicht trivial. Deshalb empfehle ich in vielen Fällen Kafka Streams statt einer eigenen Transaktionslogik. Um dort transaktionelle Sicherheit zu aktivieren, setzt du processing.guarantee=exactly_once_v2.

Das einzige Problem bei Kafka Streams: Du musst etwas anders, also funktionaler, denken - und Kafka Streams gibt es leider nur für Java.

Datenbank- vs Kafka-Transaktionen (ACID)

Wenn ich an Transaktionen denke, denke ich primär an ACID-Transaktionen in relationalen Datenbanken. Kafka-Transaktionen sind (leider) deutlich weniger mächtig als klassische ACID-Transaktionen. Aber schauen wir uns das genauer an:

  • Atomicity: Entweder werden alle Operationen einer Transaktion erfolgreich ausgeführt, oder alle schlagen fehl. Datenbanken garantieren das seit Jahrzehnten - und auch mit Kafka-Transaktionen bekommen wir das A.

  • Consistency: Eine Datenbank kann zu jedem Zeitpunkt garantieren, dass der Datenbestand unter den gesetzten Constraints konsistent ist. Das können wir mit Kafka so nicht erreichen. Kafka und fast alle verteilten Systeme sind eventual consistent: Konsistenz wird irgendwann erreicht, aber nicht zu einem fest definierten Zeitpunkt.

  • Isolation: In einer Datenbank beeinflussen sich parallel laufende Transaktionen (je nach Isolationslevel) nicht gegenseitig. In Kafka nehmen nur Producer direkt an der Transaktion teil. Daher ist diese Form von Isolation nur eingeschränkt vergleichbar.

  • Durability: Eine Datenbank garantiert, dass die Daten nach einem Commit persistent gespeichert sind. Das bekommen wir in Kafka zum Glück auch hin.

Kurz zusammengefasst: In Kafka bekommen wir ein starkes A, ein eingeschränktes I und ein solides D. Wenn du vollständige ACID-Eigenschaften brauchst, kommst du an relationalen Datenbanken nicht vorbei.

Fazit

Idempotenz ist ein wichtiger Baustein. Für End-to-End-Exactly-Once brauchst du in Read-Process-Write-Szenarien zusätzlich Transaktionen und korrekt konfigurierte Consumer mit read_committed. Wenn du diese Komplexität nicht selbst tragen willst, ist Kafka Streams häufig der pragmatischere Weg.

Über Anatoly Zelenin
Hallo, ich bin Anatoly! Ich liebe es, bei Menschen das Funkeln in den Augen zu wecken. Als Apache Kafka Experte und Buchautor bringe ich seit über einem Jahrzehnt IT zum Leben - mit Leidenschaft statt Langeweile, mit Erlebnissen statt endlosen Folien.

Weiterlesen

article-image
Ausblick auf das Data Mesh: 4 Schritte für den Paradigmenwechsel

Das „Data Mesh“ ist in der IT-Szene ein echter Hype. Warum Unternehmen von einer dezentralisierten Datenarchitektur profitieren und wie Apache Kafka beim Etablieren dieser neuen Struktur hilft, erörtert dieser Artikel.

Mehr lesen
article-image
Warum Apache Kafka?

Jedes Unternehmen, das unzählige Datenströme verarbeitet, kann diese mithilfe von Apache Kafka optimieren. Die erste Lektion dieser vierteiligen Blog-Reihe zeigt dir, warum sich größere Organisationen dieser Software zuwenden sollten.

Mehr lesen