Debezium: Change Data Capture für Apache Kafka

In diesem Post zeigen wir euch, wie ihr mithilfe von Debezium Daten aus verschiedenen Datenbanken zuverlässig und nahezu in Echtzeit nach Kafka importieren könnt. Debezium ist ein Kafka Connect-Plugin, das sich an das interne Log jeder Datenbank anschließen kann, um Änderungen zu erfassen und in Kafka zu schreiben.

Debezium: Change Data Capture für Apache Kafka

In unserer Apache Kafka Mittagsakademie können Teilnehmerinnen und Teilnehmer unserer Kafka Kurse weitere interessante Themen kennenlernen, auch wenn die Schulung eigentlich schon vorbei ist. Dieser Blogpost ist ein bearbeitetes Transkript des Videos. Die praktischen Übungen stehen exklusiv den Teilnehmenden zur Verfügung. Wenn ihr euch auch darin ausprobieren möchtet, dann schreibt uns gerne.

In der heutigen datengetriebenen Welt ist die Fähigkeit, Daten nahtlos zwischen verschiedenen Systemen zu bewegen, von unschätzbarem Wert. Eines der Schlüsselelemente in modernen Datenarchitekturen ist Apache Kafka, ein leistungsstarkes Tool für die Verarbeitung von Streaming-Daten. Doch wie integriert man Daten aus einer Vielzahl von Datenbanken zuverlässig und in nahezu Echtzeit in Kafka? Hier kommt Debezium ins Spiel, ein Open-Source-Datenstreaming-Tool, das genau dieses Problem löst.

Debezium ermöglicht es uns, Änderungen in unseren Datenbanken zu erfassen und diese Änderungen als Event-Stream nach Kafka zu leiten. Dieser Prozess, bekannt als Change Data Capture (CDC), ist entscheidend für die Erstellung reaktiver Anwendungen, die in Echtzeit auf Datenänderungen reagieren müssen.

Kafka Connect Ecosystem
Figure 1. Wir empfehlen unseren Schulungsteilnehmern vorsichtig mit dem JDBC Source Connector zu sein und die FileStream-Connectoren gar nicht zu benutzen

Während es eine Vielfalt von Werkzeugen gibt um Daten nach Kafka zu befördern, möchten wir eine besondere Anmerkung zum JDBC Source Connector machen. In unseren Schulungen weisen wir stets darauf hin, dass wir mit diesem speziellen Connector vorsichtig sein sollten. Der Grund dafür liegt unter anderem in seiner simplen Abfrage-Methodik, wie zum Beispiel SELECT * FROM TABLE WHERE id/timestamp > ?, die uns vor Herausforderungen stellt, wenn es darum geht, UPDATEs effektiv zu erfassen und DELETEs zu bemerken.

Change Data Capture

Debezium Architecture
Figure 2. Debezium greift direkt auf das Commit Log der Datenbank zu

Außerdem freuen sich unsere Datenbank-Admins nicht unbedingt über diese Zusatzlast auf der Datenbank, aber es gibt eine bessere Lösung: Nahezu jede Datenbank hat intern ein Log, in das jede Änderung geschrieben wird. Manche Datenbanken nennen es das Transaktions-Log manche das Commit-Log oder manche Write Ahead Log.

Datenbanken nutzen dieses Log, um für höhere Zuverlässigkeit zu sorgen oder auch um Daten zwischen Replicas zu replizieren. Mithilfe von Debezium können wir uns an dieses Log anschließen und Daten aus der Datenbank dann schreiben, wenn sie sich verändern, also bei einem INSERT einem UPDATE und gar einem DELETE.

Dabei ist Debezium lediglich ein Kafka Connect Plugin und unterstützt zahlreiche Datenbanksysteme wie PostgreSQL, MySQL, Oracle, Microsoft SQL Server oder gar MongoDB. Da Debezium nur aus dem Transaktions-Log liest (zumindest bei den meisten Datenbanken) und keine Queries abfeuert, ist der Overhead Debeziums meistens vernachlässigbar.

PostgreSQL konfigurieren

PostgreSQL Configuration
Figure 3. Debezium hat den Preis, dass wir in der Datenbank noch einige Konfigurationen setzen müssen.

Der Nachteil ist aber, dass ein paar Vorbereitungen zu tun sind. Lasst uns das gemeinsam anhand des Beispiels PostgreSQL anschauen. Im Lab ist das schon vorbereitet. Als Erstes müssen wir das Log Level für den Write Ahead Log (wal_level) erhöhen. Der Default ist replica, welches ausreicht, um Daten auf Read-Only Replicas zu replizieren. Aber wir benötigen für Debezium ein paar mehr Daten und erhöhen das Level auf logical. Danach müssen wir aber Postgres neu starten.

Das wal_level zu erhöhen geht auch bei AWS und Co prüft dazu am besten die Debezium Doku.

Als Nächstes müssen wir unserem Datenbanknutzer die Rechte geben, Daten zu replizieren. Dafür fügen wir dem Nutzer einfach die Rolle REPLICATION hinzu.

Natürlich umschifft Debezium dabei nicht alle Sicherheitsmechanismen und gibt Zugriff auf alle Daten in allen Tabellen, sondern es erstellt eine sogenannte Publication, in die die Daten für die benötigten Tabellen gesammelt sind. Das wird aber automatisch von Debezium erledigt, und wir müssen es nicht manuell tun.

Connector konfigurieren

Als Nächstes konfigurieren wir uns den Connector. Wir müssen die Datenbank Zugangsdaten setzen und definieren, welche Tabellen verbunden beziehungsweise nicht verbunden werden sollen. Zu guter Letzt setzen wir das Topic Präfix, denn Debezium erstellt uns Topics nach dem Muster prefix.schema.table. Das können wir aber auch ändern. Hier ist ein Beispiel für eine einfache Konfiguration:

{
  "name": "debezium",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "user",
    "database.password": "password",
    "database.dbname" : "user",
    "plugin.name": "pgoutput",
    "publication.autocreate.mode": "filtered",
    "topic.prefix": "dbz",
    "schema.include.list": "public"
  }
}

Dieser Connector schreibt alle Daten aus allen Tabellen des public-Schemas der user-Datenbank auf localhost nach Kafka. Dabei werden die Kafka-Topics nach dem folgenden Muster benannt: dbz.public.[table-name].

Nachdem wir also den Connector erstellt haben, fängt Debezium an, die Daten zu replizieren. Das Problem ist aber, dass im Write Ahead Log nicht die Daten für alle Ewigkeit aufbewahrt werden. Deshalb macht Debezium beim ersten Start einen initialen Snapshot aller Daten in allen angefragten Tabellen. Und erst danach fängt der Change Data Capture-Teil an. Debezium garantiert uns sogar, dass dies konsistent passiert.

Debeziums Nachrichtenformat

{
  "schema": {
    
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1004,
      "first_name": "Foo",
      "last_name": "Bar",
      "email": "foo@example.com"
    },
    "source": {
      "version": "2.3.0.Final",
      "name": "dbserver1",
      "server_id": 0,
      "ts_sec": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "snapshot": true,
      "thread": null,
      "db": "inventory",
      "table": "customers"
    },
    "op": "r",
    "ts_ms": 1486500577691
  }
}

Wie sehen aber die Daten aus, die Debezium schreibt? Denn Debezium schreibt nicht nur die Veränderung, sondern ein paar mehr Informationen. Das Schema (schema) der Daten wird auch in jeder Nachricht gespeichert.

Wir können eine Schema Registry benutzen, um diese Daten nicht jedes Mal mitzuschicken. Dazu aber etwas mehr in einem späteren Modul. Der eigentliche Payload (payload) besteht aus einigen Metadaten zur Datenquelle, dem source Block, der Operation(op), die diese Nachricht ausgelöst hat. Also war das ein Create (c), Update (u), Delete (d) oder ein Read (r)? Reads passieren aber nur beim Snapshot und natürlich den Timestamp (ts_ms). Mehr Infos findet ihr im Debezium Tutorial.

Die wichtigsten Felder sind aber before, also der Zustand des Eintrags vor der Operation und after, der Zustand des Eintrags nach der Operation. Aber oft interessieren uns die meisten Daten hier gar nicht, dafür können wir auch das Event Flattening SMT benutzen, um die ganzen Zusatzdaten nicht mitzuschreiben. Dann wird Debezium lediglich den after Payload nach Kafka schreiben.

Über Anatoly Zelenin
Hallo, ich bin Anatoly! Ich liebe es, bei Menschen das Funkeln in den Augen zu wecken. Als Apache Kafka Experte und Buchautor bringe ich seit über einem Jahrzehnt IT zum Leben - mit Leidenschaft statt Langeweile, mit Erlebnissen statt endlosen Folien.

Weiterlesen

article-image
Praxisguide: Streame Daten von Kafka nach OpenSearch in Kubernetes

In diesem Artikel zeige ich, wie du Daten aus einem Kafka Topic automatisch nach OpenSearch senden kannst. Wir nutzen dafür Kafka Connect und Strimzi in Kubernetes.

Mehr lesen
article-image
Schema Management in Kafka

In diesem Post erfährst du, wie explizite Schemas, dir dabei helfen, ein mögliches Chaos in Kafka zu vermeiden und wie die Schema Registries dabei unterstützen.

Mehr lesen