在这篇博文中,我们将创建一个简单的流式数据管道,以连续捕获 MySQL 数据库中的更改,并近乎实时地将其复制到 PostgreSQL 数据库。我们将展示如何不编写任何代码,而是通过使用和配置 Kafka Connect、Debezium MySQL 源连接器、Confluent JDBC sink 连接器以及一些单消息转换(SMT)来完成此操作。
通过 Kafka 复制数据的方法本身就非常有用,但当我们可以将近乎实时的数据更改流与其他流、连接器和流处理应用程序结合使用时,它的优势会更加明显。最近的 Confluent 博客系列展示了一个类似的流式数据管道,但使用了不同的连接器和 SMT。Kafka Connect 的优点在于您可以混合搭配连接器来移动不同系统之间的数据。
我们还将演示 Debezium 0.6.0 (Debezium 0.6.0) 发布的一个新功能:用于CDC 事件展平的单消息转换。
拓扑
此场景的一般拓扑结构如下图所示
图 1:一般拓扑
为了稍微简化设置,我们将只使用一个 Kafka Connect 实例,其中包含所有连接器。也就是说,该实例将同时充当事件生产者和事件消费者。
图 2:简化拓扑
配置
我们将使用这个 compose 来快速部署演示。部署由以下 Docker 镜像组成:
-
一个包含更改的、增强的 Kafka Connect / Debezium 镜像。
-
PostgreSQL JDBC 驱动程序,放置在
/kafka/libs目录中。 -
Kafka Connect JDBC 连接器(由 Confluent 开发),放置在
/kafka/connect/kafka-connect-jdbc目录中。
-
-
我们 教程 中使用的预填充的 MySQL 数据库。
-
空的 PostgreSQL 数据库。
Debezium MySQL 连接器专门设计用于捕获数据库更改,并提供有关这些事件的尽可能多的信息,而不仅仅是每一行的最新状态。与此同时,Confluent JDBC Sink 连接器被设计为简单地将每个消息转换为数据库插入/更新操作,这取决于消息的结构。因此,这两个连接器在消息的结构上存在差异,并且它们还使用不同的主题命名约定和表示已删除记录的行为。
在使用并非为协同工作而设计的连接器时,这种结构和行为上的不匹配很常见。但这是我们可以轻松处理的问题,我们在接下来的几节中将对此进行讨论。
事件格式
Debezium 以一种复杂的格式发出事件,其中包含有关捕获的数据更改的所有信息:操作类型、源元数据、连接器处理该事件的时间戳,以及更改发生前后的行的状态。Debezium 将这种结构称为*“信封”*。
{
"op": "u",
"source": {
...
},
"ts_ms" : "...",
"before" : {
"field1" : "oldvalue1",
"field2" : "oldvalue2"
},
"after" : {
"field1" : "newvalue1",
"field2" : "newvalue2"
}
} 许多其他 Kafka Connect 源连接器无法像 Debezium 那样了解有关更改的如此多的信息,而是使用一种更简单的模型,其中每个消息直接表示行的最终状态。这也是许多接收连接器期望的格式,Confluent JDBC Sink 连接器也不例外。
{
"field1" : "newvalue1",
"field2" : "newvalue2"
} 虽然我们认为 Debezium CDC 连接器提供尽可能多的细节是一件很棒的事情,但我们也使其易于您将 Debezium 的*“信封”*格式转换为许多其他连接器所期望的*“行”*格式。Debezium 以单个消息转换(SMT) 的形式在这两种格式之间提供了桥梁。ExtractNewRecordState 转换会自动提取新的行记录,从而有效地*“展平”*了复杂的记录,使其成为其他连接器可用的简单格式。
您可以在源连接器上使用此 SMT,在消息写入 Kafka*之前*对其进行转换;或者,您可以选择将源连接器更丰富的*“信封”*形式的消息存储在 Kafka 中,然后在接收连接器上使用此 SMT,在从 Kafka 读取消息*之后*并将其传递给接收连接器之前进行转换。这两种选项都可行,具体取决于您是否发现消息的信封形式对其他用途有用。
在我们的示例中,我们使用以下配置属性将 SMT 应用于接收连接器:
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", 删除记录
当 Debezium 连接器检测到行被删除时,它会创建两个事件消息:一个*删除*事件和一个*墓碑*消息。*删除*消息的信封在其 before 字段中包含已删除行的状态,而 after 字段为 null。*墓碑*消息包含与*删除*消息相同的键,但整个消息值为 null,Kafka 的日志压缩会利用此信息来知道它可以删除具有相同键的任何先前消息。许多接收连接器,包括 Confluent 的 JDBC Sink 连接器,都不期望这些消息,并且在看到任何一种消息时都会失败。ExtractNewRecordState SMT 默认会过滤掉*删除*和*墓碑*记录,但如果您正在使用 SMT 并希望保留其中一种或两种消息,则可以更改此行为。
主题命名
最后但并非最不重要的是,主题命名也存在差异。Debezium 对其管理的每个表的表示目标主题使用完全限定的命名。命名遵循模式 <logical-name>.<database-name>.<table-name>。Kafka Connect JDBC 连接器使用简单名称 <table-name>。
在更复杂的场景中,用户可能会部署 Kafka Streams 框架,以在源路由和目标路由之间建立复杂的路由。在我们的示例中,我们将使用标准的 RegexRouter SMT,它会将 Debezium 创建的记录路由到根据 JDBC 连接器模式命名的主题。同样,我们可以在源连接器或接收连接器中使用此 SMT,但在这个示例中,我们将它用在源连接器中,这样我们就可以选择记录将写入的 Kafka 主题的名称。
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3" 示例
尝试一下,让我们试试我们的例子!
首先,我们需要部署所有组件。
export DEBEZIUM_VERSION=0.6
docker-compose up 所有组件启动后,我们将注册将数据写入 PostgreSQL 数据库的 JDBC Sink 连接器。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" https://:8083/connectors/ -d @jdbc-sink.json 使用此注册请求:
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "customers",
"connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
"transforms": "unwrap", (1)
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",(1)
"auto.create": "true", (2)
"insert.mode": "upsert", (3)
"pk.fields": "id", (4)
"pk.mode": "record_value" (4)
}
} 该请求配置了以下选项:
-
将 Debezium 的复杂格式解包为简单格式。
-
自动创建目标表。
-
如果行不存在则插入,否则更新。
-
识别存储在 Kafka 记录值字段中的主键。
然后必须设置源连接器。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" https://:8083/connectors/ -d @source.json 使用此注册请求:
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1", (1)
"database.whitelist": "inventory", (2)
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"transforms": "route", (3)
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", (3)
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)", (3)
"transforms.route.replacement": "$3" (3)
}
} 该请求配置了以下选项:
-
数据库的逻辑名称。
-
我们要监控的数据库。
-
一个 SMT,它定义了一个匹配主题名称
<logical-name>.<database-name>.<table-name>的正则表达式,并提取其第三部分作为最终主题名称。
让我们检查数据库是否已同步。customers 表的所有行都应在源数据库(MySQL)和目标数据库(Postgres)中找到。
docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
last_name | id | first_name | email
-----------+------+------------+-----------------------
Thomas | 1001 | Sally | sally.thomas@acme.com
Bailey | 1002 | George | gbailey@foobar.com
Walker | 1003 | Edward | ed@walker.com
Kretchmar | 1004 | Anne | annek@noanswer.org 在连接器仍在运行时,我们可以向 MySQL 数据库添加新行,然后检查它是否已复制到 PostgreSQL 数据库。
docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
mysql> insert into customers values(default, 'John', 'Doe', 'john.doe@example.com');
Query OK, 1 row affected (0.02 sec)
docker-compose exec -postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
last_name | id | first_name | email
-----------+------+------------+-----------------------
...
Doe | 1005 | John | john.doe@example.com
(5 rows) 总结
我们设置了一个简单的流式数据管道,以近乎实时的方式将数据从 MySQL 数据库复制到 PostgreSQL 数据库。我们通过使用 Kafka Connect、Debezium MySQL 源连接器、Confluent JDBC Sink 连接器以及一些 SMT 来实现这一点——这一切都不需要编写任何代码。而且因为它是一个流式系统,它将继续捕获对 MySQL 数据库所做的所有更改,并以近乎实时的方式进行复制。
下一步是什么?
在未来的博文中,我们将使用 Elasticsearch 作为事件目标来重现相同的场景。
关于 Debezium
Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。