Debezium: Change Data Capture for Apache Kafka

In this post, we'll show you how to reliably import data from various databases into Kafka in near real-time using Debezium. Debezium is a Kafka Connect plugin that connects to the internal log of databases to capture changes and write them to Kafka.

In our Apache Kafka Lunch and Learn sessions, participants from our Kafka courses can learn about additional interesting topics, even after the training itself has finished. This blog post is an edited transcript of the video. The practical exercises are exclusively available to participants. If you’d like to try them out as well, get in touch with us.

In today’s data-driven world, the ability to move data seamlessly between different systems is invaluable. One of the key elements in modern data architectures is Apache Kafka, a powerful tool for processing streaming data. But how do you integrate data from a variety of databases reliably and in near real-time into Kafka? This is where Debezium comes into play, an open-source data streaming tool that solves exactly this problem.

Debezium enables us to capture changes in our databases and route these changes as an event stream to Kafka. This process, known as Change Data Capture (CDC), is crucial for creating reactive applications that need to respond to data changes in near real time.

Kafka Connect Ecosystem
Figure 1. We recommend our training participants be cautious with the JDBC Source Connector and avoid using the FileStream connectors altogether

Whilst there are a variety of tools available for moving data to Kafka, we’d like to make a particular note about the JDBC Source Connector. In our training sessions, we consistently point out that this connector should be used with caution. The reason lies partly in its simple querying approach, along the lines of SELECT * FROM TABLE WHERE id/timestamp > ?, which makes it hard to capture UPDATEs reliably and impossible to detect DELETEs, as the sketch below illustrates.
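
To make this concrete, here is a rough sketch of the kind of polling query the JDBC Source Connector runs in timestamp+incrementing mode; the table and column names are made up for illustration, and the exact SQL depends on the dialect and configuration:

-- simplified polling query in timestamp+incrementing mode
SELECT * FROM customers
WHERE last_modified > ?
   OR (last_modified = ? AND id > ?)
ORDER BY last_modified, id ASC;

-- an UPDATE is only picked up if last_modified is actually maintained,
-- and a deleted row never appears in the result set again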

Change Data Capture

Debezium Architecture
Figure 2. Debezium connects directly to the database’s commit log

Moreover, our database administrators aren’t particularly pleased about this additional load on the database, but there’s a better solution: Nearly every database has an internal log where every change is written. Some databases call it the transaction log, some the commit log, and some the Write Ahead Log.

Databases use this log to provide higher reliability or to replicate data between replicas. Using Debezium, we can attach to this log and stream every change to Kafka as it happens, whether it is an INSERT, an UPDATE, or even a DELETE.

Debezium is simply a Kafka Connect plugin and supports numerous database systems such as PostgreSQL, MySQL, Oracle, Microsoft SQL Server, and even MongoDB. Since Debezium only reads from the transaction log (at least for most databases) and doesn’t issue queries against the tables, its overhead is usually negligible.

Configuring PostgreSQL

PostgreSQL Configuration
Figure 3. Debezium comes at the cost of having to adjust several settings in the database

The downside is that some preparation is required. Let’s look at this together using PostgreSQL as an example. This is already prepared in the lab. First, we need to increase the log level for the Write Ahead Log (wal_level). The default is replica, which is sufficient for replicating data to read-only replicas. Debezium, however, needs a bit more information, so we raise the level to logical. Note that changing wal_level requires a restart of Postgres.
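
For a self-managed PostgreSQL instance, a minimal sketch of this change looks as follows (alternatively, set wal_level = logical directly in postgresql.conf):

-- raise the WAL level; the change only takes effect after a server restart
ALTER SYSTEM SET wal_level = logical;

-- after the restart, verify the setting
SHOW wal_level;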

Increasing the wal_level also works with AWS and other providers. Check the Debezium documentation for details.

Next, we need to give our database user the rights to replicate data. For this, we simply add the REPLICATION role to the user.
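
A minimal sketch, using the database user from the connector configuration shown below (the name user has to be quoted because it is a reserved word in PostgreSQL):

-- give the connector's database user the REPLICATION attribute
ALTER ROLE "user" WITH REPLICATION;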

Of course, Debezium doesn’t circumvent the database’s security mechanisms and give access to all data in all tables. Instead, it creates a so-called publication that collects the changes for exactly the tables we want to capture. Debezium handles this automatically, though, so we don’t need to create it ourselves.
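
If you want to see what Debezium has set up, you can inspect the publication from psql; with publication.autocreate.mode set to filtered, Debezium creates a publication (named dbz_publication unless overridden via publication.name) that covers only the captured tables:

-- list publications and the tables they contain
SELECT * FROM pg_publication;
SELECT * FROM pg_publication_tables;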

Configuring the Connector

Next, we configure the connector. We need to set the database credentials and define which tables should or shouldn’t be captured. Finally, we set the topic prefix, as Debezium creates topics following the pattern prefix.schema.table. This naming can be customized as well. Here’s an example of a simple configuration:

{
  "name": "debezium",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "user",
    "database.password": "password",
    "database.dbname" : "user",
    "plugin.name": "pgoutput",
    "publication.autocreate.mode": "filtered",
    "topic.prefix": "dbz",
    "schema.include.list": "public"
  }
}

This connector writes all data from all tables in the public schema of the user database on localhost to Kafka. The Kafka topics are named following this pattern: dbz.public.[table-name].

After we’ve created the connector, Debezium starts replicating the data. The problem is that the Write Ahead Log doesn’t keep its entries forever. Therefore, on the first start, Debezium performs an initial snapshot of all data in all requested tables. Only then does the Change Data Capture part begin. Debezium even guarantees that the transition from snapshot to log-based capture is consistent.
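
The snapshot behavior can be tuned in the connector configuration; a sketch of the relevant property, which goes into the config block shown above (initial is the default, and other modes such as never or initial_only exist, depending on the connector version):

"snapshot.mode": "initial"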

Debezium’s Message Format

{
  "schema": {
    
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1004,
      "first_name": "Foo",
      "last_name": "Bar",
      "email": "foo@example.com"
    },
    "source": {
      "version": "2.3.0.Final",
      "name": "dbserver1",
      "server_id": 0,
      "ts_sec": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "snapshot": true,
      "thread": null,
      "db": "inventory",
      "table": "customers"
    },
    "op": "r",
    "ts_ms": 1486500577691
  }
}

But what does the data that Debezium writes look like? Debezium doesn’t just write the change itself but quite a bit more information. The schema of the data (schema) is also stored in each message.

We can use a Schema Registry to avoid sending this schema with every message. More on this in a later module. The actual payload (payload) consists of some metadata about the data source (the source block), the operation (op) that triggered this message (a create c, an update u, a delete d, or a read r; reads only occur during snapshots), and the timestamp (ts_ms). You can find more information in the Debezium Tutorial.
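
As a sketch, the embedded schema can already be switched off through the connector’s converter settings when using the plain JSON converter (a Schema Registry with, for example, the Avro converter is the more robust solution); these properties go into the connector’s config block:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"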

The most important fields are before, i.e., the state of the entry before the operation, and after, the state of the entry after the operation. Often, though, we’re not interested in most of the surrounding metadata, so we can use the Event Flattening SMT (Single Message Transform) to strip it away. Debezium then writes only the after payload to Kafka.
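
A sketch of the corresponding settings in the connector’s config block (the transform alias unwrap is just an arbitrary name):

"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"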

About Anatoly Zelenin
Hi, I’m Anatoly! I love to spark that twinkle in people’s eyes. As an Apache Kafka expert and book author, I’ve been bringing IT to life for over a decade—with passion instead of boredom, with real experiences instead of endless slides.
