这是 Apache Pulsar PMC 成员兼 Comitter 翟嘉(Jia Zhai)的客座博文。

Debezium 是一个开源的变更数据捕获(CDC)项目。它基于 Apache Kafka Connect 构建,并支持多种数据库,如 MySQL、MongoDB、PostgreSQL、Oracle 和 SQL Server。 Apache Pulsar 包含一套基于 Pulsar IO 框架的 内置连接器,它与 Apache Kafka Connect 是对应的。

从 2.3.0 版本开始,Pulsar IO 开箱即用地支持 Debezium 源连接器,因此您可以利用 Debezium 将数据库中的变更流式传输到 Apache Pulsar。本教程将引导您完成 Debezium MySQL 连接器与 Pulsar IO 的设置。

教程步骤

本教程与 Debezium 教程 类似,唯一的区别在于事件流的存储从 Kafka 迁移到了 Pulsar。本教程主要包含六个步骤:

  1. 启动一个 MySQL 服务器;

  2. 启动独立的 Pulsar 服务;

  3. 在 Pulsar IO 中启动 Debezium 连接器。Pulsar IO 读取 MySQL 服务器中存在的数据库变更;

  4. 订阅 Pulsar Topic 以监控 MySQL 变更;

  5. 在 MySQL 服务器中进行变更,并验证变更是否立即记录在 Pulsar Topic 中;

  6. 清理。

步骤 1:启动一个 MySQL 服务器

启动一个包含示例数据库的 MySQL 服务器,Debezium 将从中捕获变更。打开一个新的终端,启动一个运行预先配置了名为 `inventory` 的数据库的 MySQL 数据库服务器的容器;

docker run --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9

将显示以下信息:

2019-03-25T14:12:41.178325Z 0 [Note] Event Scheduler: Loaded 0 events
2019-03-25T14:12:41.178670Z 0 [Note] mysqld: ready for connections.
Version: '5.7.25-log'  socket: '/var/run/mysqld/mysqld.sock'  port: 3306  MySQL Community Server (GPL)

步骤 2:启动独立的 Pulsar 服务

在本地以独立模式启动 Pulsar 服务。Pulsar 2.3.0 版本引入了在 Pulsar IO 中运行 Debezium 连接器的支持。下载 2.3.0 版本的 Pulsar 二进制文件2.3.0 版本的 pulsar-io-kafka-connect-adaptor-2.3.0.nar。在 Pulsar 中,所有 Pulsar IO 连接器都被打包成单独的 NAR 文件。

$ wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/apache-pulsar-2.3.0-bin.tar.gz
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/connectors/pulsar-io-kafka-connect-adaptor-2.3.0.nar
$ tar zxf apache-pulsar-2.3.0-bin.tar.gz
$ cd apache-pulsar-2.3.0
$ mkdir connectors
$ cp ../pulsar-io-kafka-connect-adaptor-2.3.0.nar connectors
$ bin/pulsar standalone
start pulsar standalone]

步骤 3:在 Pulsar IO 中启动 Debezium MySQL 连接器

在另一个终端标签页中,以本地运行模式在 Pulsar IO 中启动 Debezium MySQL 连接器。“debezium-mysql-source-config.yaml” 文件包含所有配置,主要参数列在“configs”节点下。该 .yaml 文件包含“task.class”参数。配置文件还包括 MySQL 相关参数(如服务器、端口、用户、密码)以及用于“history”和“offset”存储的两个 Pulsar Topic 名称。

$ bin/pulsar-admin source localrun  --sourceConfigFile debezium-mysql-source-config.yaml

“debezium-mysql-source-config.yaml” 文件中的内容如下:

tenant: "test"
namespace: "test-namespace"
name: "debezium-kafka-source"
topicName: "kafka-connect-topic"
archive: "connectors/pulsar-io-kafka-connect-adaptor-2.3.0.nar"

parallelism: 1

configs:
  ## sourceTask
  task.class: "io.debezium.connector.mysql.MySqlConnectorTask"

  ## config for mysql, docker image: debezium/example-mysql:0.8
  database.hostname: "localhost"
  database.port: "3306"
  database.user: "debezium"
  database.password: "dbz"
  database.server.id: "184054"
  database.server.name: "dbserver1"
  database.whitelist: "inventory"

  database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
  database.history.pulsar.topic: "history-topic"
  database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
  ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG
  key.converter: "org.apache.kafka.connect.json.JsonConverter"
  value.converter: "org.apache.kafka.connect.json.JsonConverter"
  ## PULSAR_SERVICE_URL_CONFIG
  pulsar.service.url: "pulsar://127.0.0.1:6650"
  ## OFFSET_STORAGE_TOPIC_CONFIG
  offset.storage.topic: "offset-topic"

上述 MySQL 服务器会自动创建表。因此,Debezium 连接器从一开始就读取 MySQL binlog 文件中的历史记录。在输出中,您会发现连接器已被触发并已处理 47 条记录。

connector start process records

有关如何管理连接器的更多信息,请参阅 Pulsar IO 文档

Debezium 已捕获和读取的记录会自动发布到 Pulsar Topic。当您启动一个新的终端时,可以通过以下命令找到 Pulsar 中的当前 Topic:

$ bin/pulsar-admin topics list public/default
list Pulsar topics

对于每个已发生变更的表,变更数据都存储在一个单独的 Pulsar Topic 中。除了数据库表相关的 Topic 外,还有两个名为“history-topic”和“offset-topic”的 Topic 用于存储历史和偏移量相关数据。

persistent://public/default/history-topic
persistent://public/default/offset-topic

步骤 4:订阅 Pulsar Topic 以监控 MySQL 变更

以 `persistent://public/default/dbserver1.inventory.products` Topic 为例。使用 CLI 命令消费此 Topic,并在“products”表发生变更时监控这些变更。

 $ bin/pulsar-client consume -s "sub-products" public/default/dbserver1.inventory.products -n 0

输出如下:

…
22:17:41.201 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.inventory.products][sub-products] Subscribing to topic on cnx [id: 0xfe0b4feb, L:/127.0.0.1:55585 - R:localhost/127.0.0.1:6650]
22:17:41.223 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.inventory.products][sub-products] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0

当表变更存储在 `persistent://public/default/dbserver1.inventory.products` Pulsar Topic 中时,您也可以消费 offset topic 来监控偏移量(offset)的变化。

$ bin/pulsar-client consume -s "sub-offset" offset-topic -n 0

步骤 5:在 MySQL 服务器中进行变更,并验证变更是否立即记录在 Pulsar Topic 中

启动一个 MySQL CLI Docker 容器,您可以对 MySQL 服务器中的“products”表进行更改。

$docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

运行该命令后,将显示 MySQL CLI,您可以在“products”表中更改两个条目的名称。

mysql> use inventory;
mysql> show tables;
mysql> SELECT * FROM  products ;
mysql> UPDATE products SET name='1111111111' WHERE id=101;
mysql> UPDATE products SET name='1111111111' WHERE id=107;
mysql updates

在您消费 products topic 的终端中,您会发现添加了两个变更。

table topic stores mysql updates

在您消费 offset topic 的终端中,您会发现添加了两个偏移量。

offset topic get updated

在您本地运行连接器的终端中,您会发现又处理了两个记录。

table topic get more records

步骤 6:清理

使用“Ctrl + C”关闭终端。使用“docker ps”和“docker kill”停止与 MySQL 相关的容器。

mysql> quit

$ docker ps
CONTAINER ID        IMAGE                        COMMAND                  CREATED             STATUS              PORTS                               NAMES
84d66c2f591d        debezium/example-mysql:0.8   "docker-entrypoint.s…"   About an hour ago   Up About an hour    0.0.0.0:3306->3306/tcp, 33060/tcp   mysql

$ docker kill 84d66c2f591d

要删除 Pulsar 数据,请删除 Pulsar 二进制目录中的数据目录。

$ pwd
/Users/jia/ws/releases/apache-pulsar-2.3.0

$ rm -rf data

结论

Pulsar IO 框架允许运行 Debezium 连接器来进行变更数据捕获,将来自不同数据库的数据变更流式传输到 Apache Pulsar。在本教程中,您学习了如何捕获 MySQL 数据库中的数据变更并将其传播到 Pulsar。我们正在持续改进 Debezium 连接器与 Apache Pulsar 的配合支持,在 Pulsar 2.4.0 版本发布后,使用起来会更加方便。

Jia Zhai

Jia 是 StreamNative 的核心软件工程师,同时也是 Apache BookKeeper 和 Apache Pulsar 的 PMC 成员,并持续为这两个项目做出贡献。他居住在中国北京。

   


关于 Debezium

Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。

参与进来

我们希望您觉得 Debezium 有趣且有用,并希望尝试一下。在 Twitter @debezium 上关注我们,在 Zulip 上与我们聊天,或加入我们的 邮件列表 与社区交流。所有代码都在 GitHub 上开源,因此请在本地构建代码,帮助我们改进现有连接器并添加更多连接器。如果您发现问题或有改进 Debezium 的想法,请告诉我们或 记录一个问题

版权所有 © Debezium 及其作者。保留所有权利。有关我们的商标详情,请访问我们的 商标政策商标列表。第三方商标属于其各自所有者,在此提及并不表示任何认可或关联。
×