这是 Apache Pulsar PMC 成员兼 Comitter 翟嘉(Jia Zhai)的客座博文。
Debezium 是一个开源的变更数据捕获(CDC)项目。它基于 Apache Kafka Connect 构建,并支持多种数据库,如 MySQL、MongoDB、PostgreSQL、Oracle 和 SQL Server。 Apache Pulsar 包含一套基于 Pulsar IO 框架的 内置连接器,它与 Apache Kafka Connect 是对应的。
从 2.3.0 版本开始,Pulsar IO 开箱即用地支持 Debezium 源连接器,因此您可以利用 Debezium 将数据库中的变更流式传输到 Apache Pulsar。本教程将引导您完成 Debezium MySQL 连接器与 Pulsar IO 的设置。
教程步骤
本教程与 Debezium 教程 类似,唯一的区别在于事件流的存储从 Kafka 迁移到了 Pulsar。本教程主要包含六个步骤:
-
启动一个 MySQL 服务器;
-
启动独立的 Pulsar 服务;
-
在 Pulsar IO 中启动 Debezium 连接器。Pulsar IO 读取 MySQL 服务器中存在的数据库变更;
-
订阅 Pulsar Topic 以监控 MySQL 变更;
-
在 MySQL 服务器中进行变更,并验证变更是否立即记录在 Pulsar Topic 中;
-
清理。
步骤 1:启动一个 MySQL 服务器
启动一个包含示例数据库的 MySQL 服务器,Debezium 将从中捕获变更。打开一个新的终端,启动一个运行预先配置了名为 `inventory` 的数据库的 MySQL 数据库服务器的容器;
docker run --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9 将显示以下信息:
2019-03-25T14:12:41.178325Z 0 [Note] Event Scheduler: Loaded 0 events
2019-03-25T14:12:41.178670Z 0 [Note] mysqld: ready for connections.
Version: '5.7.25-log' socket: '/var/run/mysqld/mysqld.sock' port: 3306 MySQL Community Server (GPL) 步骤 2:启动独立的 Pulsar 服务
在本地以独立模式启动 Pulsar 服务。Pulsar 2.3.0 版本引入了在 Pulsar IO 中运行 Debezium 连接器的支持。下载 2.3.0 版本的 Pulsar 二进制文件 和 2.3.0 版本的 pulsar-io-kafka-connect-adaptor-2.3.0.nar。在 Pulsar 中,所有 Pulsar IO 连接器都被打包成单独的 NAR 文件。
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/apache-pulsar-2.3.0-bin.tar.gz
$ wget https://archive.apache.org/dist/pulsar/pulsar-2.3.0/connectors/pulsar-io-kafka-connect-adaptor-2.3.0.nar
$ tar zxf apache-pulsar-2.3.0-bin.tar.gz
$ cd apache-pulsar-2.3.0
$ mkdir connectors
$ cp ../pulsar-io-kafka-connect-adaptor-2.3.0.nar connectors
$ bin/pulsar standalone
步骤 3:在 Pulsar IO 中启动 Debezium MySQL 连接器
在另一个终端标签页中,以本地运行模式在 Pulsar IO 中启动 Debezium MySQL 连接器。“debezium-mysql-source-config.yaml” 文件包含所有配置,主要参数列在“configs”节点下。该 .yaml 文件包含“task.class”参数。配置文件还包括 MySQL 相关参数(如服务器、端口、用户、密码)以及用于“history”和“offset”存储的两个 Pulsar Topic 名称。
$ bin/pulsar-admin source localrun --sourceConfigFile debezium-mysql-source-config.yaml “debezium-mysql-source-config.yaml” 文件中的内容如下:
tenant: "test"
namespace: "test-namespace"
name: "debezium-kafka-source"
topicName: "kafka-connect-topic"
archive: "connectors/pulsar-io-kafka-connect-adaptor-2.3.0.nar"
parallelism: 1
configs:
## sourceTask
task.class: "io.debezium.connector.mysql.MySqlConnectorTask"
## config for mysql, docker image: debezium/example-mysql:0.8
database.hostname: "localhost"
database.port: "3306"
database.user: "debezium"
database.password: "dbz"
database.server.id: "184054"
database.server.name: "dbserver1"
database.whitelist: "inventory"
database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
database.history.pulsar.topic: "history-topic"
database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG
key.converter: "org.apache.kafka.connect.json.JsonConverter"
value.converter: "org.apache.kafka.connect.json.JsonConverter"
## PULSAR_SERVICE_URL_CONFIG
pulsar.service.url: "pulsar://127.0.0.1:6650"
## OFFSET_STORAGE_TOPIC_CONFIG
offset.storage.topic: "offset-topic" 上述 MySQL 服务器会自动创建表。因此,Debezium 连接器从一开始就读取 MySQL binlog 文件中的历史记录。在输出中,您会发现连接器已被触发并已处理 47 条记录。
有关如何管理连接器的更多信息,请参阅 Pulsar IO 文档。
Debezium 已捕获和读取的记录会自动发布到 Pulsar Topic。当您启动一个新的终端时,可以通过以下命令找到 Pulsar 中的当前 Topic:
$ bin/pulsar-admin topics list public/default
对于每个已发生变更的表,变更数据都存储在一个单独的 Pulsar Topic 中。除了数据库表相关的 Topic 外,还有两个名为“history-topic”和“offset-topic”的 Topic 用于存储历史和偏移量相关数据。
persistent://public/default/history-topic
persistent://public/default/offset-topic 步骤 4:订阅 Pulsar Topic 以监控 MySQL 变更
以 `persistent://public/default/dbserver1.inventory.products` Topic 为例。使用 CLI 命令消费此 Topic,并在“products”表发生变更时监控这些变更。
$ bin/pulsar-client consume -s "sub-products" public/default/dbserver1.inventory.products -n 0 输出如下:
…
22:17:41.201 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.inventory.products][sub-products] Subscribing to topic on cnx [id: 0xfe0b4feb, L:/127.0.0.1:55585 - R:localhost/127.0.0.1:6650]
22:17:41.223 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [public/default/dbserver1.inventory.products][sub-products] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0 当表变更存储在 `persistent://public/default/dbserver1.inventory.products` Pulsar Topic 中时,您也可以消费 offset topic 来监控偏移量(offset)的变化。
$ bin/pulsar-client consume -s "sub-offset" offset-topic -n 0 步骤 5:在 MySQL 服务器中进行变更,并验证变更是否立即记录在 Pulsar Topic 中
启动一个 MySQL CLI Docker 容器,您可以对 MySQL 服务器中的“products”表进行更改。
$docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"' 运行该命令后,将显示 MySQL CLI,您可以在“products”表中更改两个条目的名称。
mysql> use inventory;
mysql> show tables;
mysql> SELECT * FROM products ;
mysql> UPDATE products SET name='1111111111' WHERE id=101;
mysql> UPDATE products SET name='1111111111' WHERE id=107;
在您消费 products topic 的终端中,您会发现添加了两个变更。
在您消费 offset topic 的终端中,您会发现添加了两个偏移量。
在您本地运行连接器的终端中,您会发现又处理了两个记录。
步骤 6:清理
使用“Ctrl + C”关闭终端。使用“docker ps”和“docker kill”停止与 MySQL 相关的容器。
mysql> quit
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
84d66c2f591d debezium/example-mysql:0.8 "docker-entrypoint.s…" About an hour ago Up About an hour 0.0.0.0:3306->3306/tcp, 33060/tcp mysql
$ docker kill 84d66c2f591d 要删除 Pulsar 数据,请删除 Pulsar 二进制目录中的数据目录。
$ pwd
/Users/jia/ws/releases/apache-pulsar-2.3.0
$ rm -rf data 结论
Pulsar IO 框架允许运行 Debezium 连接器来进行变更数据捕获,将来自不同数据库的数据变更流式传输到 Apache Pulsar。在本教程中,您学习了如何捕获 MySQL 数据库中的数据变更并将其传播到 Pulsar。我们正在持续改进 Debezium 连接器与 Apache Pulsar 的配合支持,在 Pulsar 2.4.0 版本发布后,使用起来会更加方便。
关于 Debezium
Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。