教程
本教程演示了如何使用 Debezium 来监控 MySQL 数据库。当数据库中的数据发生变化时,您将看到由此产生的事件流。
在本教程中,您将启动 Debezium 服务,运行一个带有简单示例数据库的 MySQL 服务器,并使用 Debezium 监控数据库的变化。
-
Docker 已安装并正在运行。
本教程使用 Docker 和 Debezium 容器镜像来运行所需的服务。您应该使用最新版本的 Docker。有关更多信息,请参阅 Docker Engine 安装文档。
|
也可以使用 Podman 来运行此示例。有关更多信息,请参阅 Podman。 |
Debezium 简介
Debezium 是一个分布式平台,它将来自现有数据库的信息转换为事件流,使应用程序能够检测数据库中的行级更改并立即对其做出响应。
Debezium 构建在 Apache Kafka 之上,并提供了一组 Kafka Connect 兼容的连接器。每个连接器都适用于特定的数据库管理系统 (DBMS)。连接器通过检测 DBMS 中的数据更改并将其流式传输到 Kafka 主题来记录数据更改的历史。消费应用程序随后可以从 Kafka 主题中读取由此产生的事件记录。
通过利用 Kafka 可靠的流处理平台,Debezium 使应用程序能够正确且完整地消费数据库中发生的更改。即使您的应用程序意外停止或连接丢失,它也不会错过停机期间发生的事件。应用程序重新启动后,它将从中断的地方恢复读取主题。
接下来的教程将向您展示如何使用 Debezium MySQL 连接器 和一个简单的配置来部署和使用它。有关部署和使用 Debezium 连接器的更多信息,请参阅连接器文档。
启动服务
使用 Debezium 需要两个独立的服务:Kafka 和 Debezium 连接器服务。在本教程中,您将使用 Docker 和 Debezium 容器镜像 设置每个服务的单个实例。
要启动本教程所需的服务,您必须
使用 Docker 运行 Debezium 的注意事项
本教程使用 Docker 和 Debezium 容器镜像 来运行 Kafka、Debezium 和 MySQL 服务。在单独的容器中运行每个服务可以简化设置,让您能够看到 Debezium 的运行情况。
|
在生产环境中,您会运行多个服务实例来提供性能、可靠性、复制和容错能力。通常,您会部署这些服务到 OpenShift 或 Kubernetes 等管理多个 Docker 容器在多台主机和机器上运行的平台上,或者安装在专用硬件上。 |
您应该注意以下使用 Docker 运行 Debezium 的事项:
-
Kafka 的容器是临时的。
Kafka 通常会将数据本地存储在容器内,这需要您将宿主机上的目录挂载为卷。这样,当容器停止时,持久化数据仍然存在。但是,本教程跳过了此设置——当容器停止时,所有持久化数据都会丢失。这样,完成教程后清理工作就变得很简单。
有关存储持久化数据的更多信息,请参阅 容器镜像 的文档。
-
本教程要求您在不同的容器中运行每个服务。
为避免混淆,您将在单独的终端中以前台模式运行每个容器。这样,容器的所有输出都会显示在用于运行它的终端中。
Docker 还允许您以分离模式(使用
-d选项)运行容器,此时容器启动,docker命令会立即返回。但是,分离模式的容器不会在其终端中显示输出。要查看输出,您需要使用docker logs --follow --name <container-name>命令。有关更多信息,请参阅 Docker 文档。
启动 Kafka
|
Debezium 3.3.1.Final 已在多个版本的 Kafka Connect 上进行了测试。请参阅 Debezium 测试矩阵 以确定 Debezium 与 Kafka Connect 的兼容性。 |
-
打开一个新终端,并使用它在一个容器中启动 Kafka。
此命令使用
quay.io/debezium/kafka镜像的 3.3 版本运行一个新容器。$ docker run -it --rm -p 9092:9092 \ --name kafka --hostname kafka \ -e CLUSTER_ID=<YOUR_UNIQUE_CLUSTER_IDENTIFIER> \ -e NODE_ID=1 \ -e NODE_ROLE=combined \ -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093 \ -e KAFKA_LISTENERS=PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \ quay.io/debezium/kafka:3.3-it-
容器是交互式的,这意味着终端的标准输入和输出已连接到容器。
--rm-
容器停止时将被删除。
--name kafka-
容器的名称。
--hostname kafka-
标签确保将正确的 Kafka 监听器与容器配对。
-p 9092:9092-
将容器中的
9092端口映射到 Docker 主机上的同一端口,以便容器外的应用程序可以与 Kafka 通信。 -e CLUSTER_ID=<YOUR_UNIQUE_CLUSTER_IDENTIFIER>-
集群中的唯一标识符。集群中的所有节点必须具有相同的集群 ID。
-e NODE_ID=1-
Raft 协议中使用的节点标识符。在集群中必须是唯一的。
-e NODE_ROLE=combined-
集群中的节点角色。可以是控制器、代理或组合。
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093-
指定充当 Kafka 控制器仲裁的投票者节点。如果需要多个值,请使用
nodeId@host:port格式的逗号分隔列表,例如:1@kafka1:9093,2@kafka2:9093,3@kafka3:9093。 -e KAFKA_LISTENERS=PLAINTEXT://kafka:9092,CONTROLLER://kafka:9093-
定义 Kafka 监听器的内部端点和协议。
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092-
指定代理如何向外部客户端通告自己。
如果您使用 Podman,请运行以下命令:
$ podman run -it --rm --name kafka --pod dbz -e HOST_NAME=127.0.0.1 quay.io/debezium/kafka:3.3在本教程中,您将始终从 Docker 容器内部连接到 Kafka。这些容器中的任何一个都可以通过链接到
kafka容器来与之通信。如果您需要从 Docker 容器外部连接到 Kafka,则必须将-e选项设置为通过 Docker 主机通告 Kafka 地址(-e ADVERTISED_HOST_NAME=后跟 Docker 主机的 IP 地址或可解析的主机名)。 -
验证 Kafka 是否已启动。
您应该看到类似以下的输出:
... 2025-07-22T11:43:00,935 - INFO [main:AppInfoParser$AppInfo@125] - Kafka version: 4.0.0 2025-07-22T11:43:00,938 - INFO [main:AppInfoParser$AppInfo@126] - Kafka commitId: 985bc99521dd22bb 2025-07-22T11:43:00,939 - INFO [main:AppInfoParser$AppInfo@127] - Kafka startTimeMs: 1753184580923 2025-07-22T11:43:00,959 - INFO [main:Logging@66] - [KafkaRaftServer nodeId=1] Kafka Server started (1)1 Kafka 节点已成功启动并准备好接受客户端连接。当 Kafka 生成输出时,终端将继续显示其他输出。
启动 MySQL 数据库
此时,您已经启动了 Kafka,但仍然需要一个数据库服务器供 Debezium 捕获更改。在此过程中,您将启动一个预先配置了 inventory 数据库的 MySQL 服务器。
-
打开一个新终端,并使用它来启动一个运行 MySQL 数据库服务器的新容器,该服务器已预先配置了一个
inventory数据库。此命令使用
quay.io/debezium/example-mysql镜像的 3.3 版本运行一个新容器,该镜像基于 mysql:8.2 镜像。它还定义并填充了一个示例inventory数据库。$ docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:3.3-it-
容器是交互式的,这意味着终端的标准输入和输出已连接到容器。
--rm-
容器停止时将被删除。
--name mysql-
容器的名称。
-p 3306:3306-
将容器中的
3306端口(默认 MySQL 端口)映射到 Docker 主机上的同一端口,以便容器外的应用程序可以连接到数据库服务器。 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw-
创建了一个具有 Debezium MySQL 连接器所需最低权限的用户和密码。
|
如果您使用 Podman,请运行以下命令:
|
-
验证 MySQL 服务器是否已启动。
MySQL 服务器会启动和停止几次,因为配置正在被修改。您应该看到类似以下的输出:
... [System] [MY-010931] [Server] /usr/sbin/mysqld: ready for connections. Version: '8.0.27' socket: '/var/run/mysqld/mysqld.sock' port: 3306 MySQL Community Server - GPL. [System] [MY-011323] [Server] X Plugin ready for connections. Bind-address: '::' port: 33060, socket: /var/run/mysqld/mysqlx.sock
启动 MySQL 命令行客户端
启动 MySQL 后,您会启动一个 MySQL 命令行客户端,以便访问示例 inventory 数据库。
-
打开一个新终端,并使用它在一个容器中启动 MySQL 命令行客户端。
此命令使用 mysql:8.2 镜像运行一个新容器,并定义了一个 shell 命令来运行具有正确选项的 MySQL 命令行客户端。
$ docker run -it --rm --name mysqlterm --link mysql mysql:8.2 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -umysqluser -p"mysqlpw"'-it-
容器是交互式的,这意味着终端的标准输入和输出已连接到容器。
--rm-
容器停止时将被删除。
--name mysqlterm-
容器的名称。
--link mysql-
将容器链接到
mysql容器。
|
如果您使用 Podman,请运行以下命令:
|
-
验证 MySQL 命令行客户端是否已启动。
您应该看到类似以下的输出:
mysql: [Warning] Using a password on the command line interface can be insecure. Welcome to the MySQL monitor. Commands end with ; or \g. Your MySQL connection id is 9 Server version: 8.0.27 MySQL Community Server - GPL Copyright (c) 2000, 2021, Oracle and/or its affiliates. Oracle is a registered trademark of Oracle Corporation and/or its affiliates. Other names may be trademarks of their respective owners. Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. mysql> -
在
mysql>命令提示符下,切换到 inventory 数据库。mysql> use inventory; -
列出数据库中的表。
mysql> show tables; +---------------------+ | Tables_in_inventory | +---------------------+ | addresses | | customers | | geom | | orders | | products | | products_on_hand | +---------------------+ 6 rows in set (0.00 sec) -
使用 MySQL 命令行客户端探索数据库并查看数据库中预先加载的数据。
For example (例如:)
mysql> SELECT * FROM customers; +------+------------+-----------+-----------------------+ | id | first_name | last_name | email | +------+------------+-----------+-----------------------+ | 1001 | Sally | Thomas | sally.thomas@acme.com | | 1002 | George | Bailey | gbailey@foobar.com | | 1003 | Edward | Walker | ed@walker.com | | 1004 | Anne | Kretchmar | annek@noanswer.org | +------+------------+-----------+-----------------------+ 4 rows in set (0.00 sec)
启动 Kafka Connect
在启动 MySQL 并使用 MySQL 命令行客户端连接到 inventory 数据库后,您将启动 Kafka Connect 服务。此服务公开一个 REST API 来管理 Debezium MySQL 连接器。
-
打开一个新终端,并使用它在一个容器中启动 Kafka Connect 服务。
此命令使用
quay.io/debezium/connect镜像的 3.3 版本运行一个新容器。$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link kafka:kafka --link mysql:mysql quay.io/debezium/connect:3.3-it-
容器是交互式的,这意味着终端的标准输入和输出已连接到容器。
--rm-
容器停止时将被删除。
--name connect-
容器的名称。
-p 8083:8083-
将容器中的
8083端口映射到 Docker 主机上的同一端口。这使得容器外的应用程序可以使用 Kafka Connect 的 REST API 来设置和管理新的容器实例。 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses-
设置 Debezium 镜像所需的环保变量。
--link kafka:kafka --link mysql:mysql-
将容器链接到正在运行 Kafka 和 MySQL 服务器的容器。
|
如果您使用 Podman,请运行以下命令:
|
|
如果您提供了 如果这是个问题,请设置环保变量 |
-
验证 Kafka Connect 是否已启动并准备好接受连接。
您应该看到类似以下的输出:
... 2020-02-06 15:48:33,939 INFO || Kafka version: 3.0.0 [org.apache.kafka.common.utils.AppInfoParser] ... 2020-02-06 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset -1 [org.apache.kafka.connect.runtime.distributed.DistributedHerder] 2020-02-06 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks [org.apache.kafka.connect.runtime.distributed.DistributedHerder] -
使用 Kafka Connect REST API 检查 Kafka Connect 服务的状态。
Kafka Connect 公开了一个 REST API 来管理 Debezium 连接器。要与 Kafka Connect 服务通信,您可以使用
curl命令向 Docker 主机的 8083 端口(在启动 Kafka Connect 时将其映射到connect容器的 8083 端口)发送 API 请求。这些命令使用
localhost。如果您使用的是非原生 Docker 平台(如 Docker Toolbox),请将localhost替换为 Docker 主机的 IP 地址。-
打开一个新终端并检查 Kafka Connect 服务的状态。
$ curl -H "Accept:application/json" localhost:8083/ {"version":"4.1.0","commit":"cb8625948210849f"} (1)1 响应显示 Kafka Connect 版本 4.1.0 正在运行。 -
检查已注册到 Kafka Connect 的连接器列表。
$ curl -H "Accept:application/json" localhost:8083/connectors/ [] (1)1 目前没有连接器注册到 Kafka Connect。
-
部署 MySQL 连接器
启动 Debezium 和 MySQL 服务后,您就可以部署 Debezium MySQL 连接器,以便它可以开始监控示例 MySQL 数据库 (inventory)。
此时,您正在运行 Debezium 服务、一个带有示例 inventory 数据库的 MySQL 服务器,以及连接到该数据库的 MySQL 命令行客户端。要部署 MySQL 连接器,您必须:
-
注册连接器后,它将开始监控数据库服务器的
binlog,并为每一行更改生成更改事件。 -
在连接器启动时查看 Kafka Connect 日志输出来帮助您更好地理解它在开始监控
binlog之前必须完成的每个任务。
注册一个用于监控 inventory 数据库的连接器
通过注册 Debezium MySQL 连接器,连接器将开始监控 MySQL 数据库服务器的 binlog。binlog 记录了数据库的所有事务(例如行更改和模式更改)。当数据库中的某一行发生更改时,Debezium 会生成一个更改事件。
|
在生产环境中,您通常会使用 Kafka 工具手动创建必要的主题,包括指定副本数量,或者使用 Kafka Connect 机制来定制自动创建主题的设置。但是,在本教程中,Kafka 配置为自动创建主题,只有一个副本。 |
-
审查您将要注册的 Debezium MySQL 连接器的配置。
在注册连接器之前,您应该熟悉它的配置。在下一步中,您将注册以下连接器:
{ "name": "inventory-connector", (1) "config": { (2) "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", (3) "database.hostname": "mysql", (4) "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", (5) "topic.prefix": "dbserver1", (5) "database.include.list": "inventory", (6) "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", (7) "schema.history.internal.kafka.topic": "schema-changes.inventory" (7) } }1 连接器的名称。 2 连接器的配置。 3 一次只应有一个任务运行。因为 MySQL 连接器读取 MySQL 服务器的 binlog,使用单个连接器任务可确保正确的顺序和事件处理。Kafka Connect 服务使用连接器来启动一个或多个执行工作的任务,并自动将正在运行的任务分布到 Kafka Connect 服务集群中。如果任何服务停止或崩溃,这些任务将被重新分配给正在运行的服务。4 数据库主机,即运行 MySQL 服务器的 Docker 容器的名称 ( mysql)。Docker 会操纵容器内的网络堆栈,以便通过容器名称作为主机名,使用/etc/hosts解析每个链接的容器。如果 MySQL 运行在普通网络上,您将为此值指定 IP 地址或可解析的主机名。5 一个唯一的主题前缀。此名称将用作所有 Kafka 主题的前缀。 6 仅会检测 inventory数据库中的更改。7 连接器将使用此代理(与您正在发送事件的代理相同)和主题名称在 Kafka 中存储数据库模式的历史记录。重新启动后,连接器将恢复连接器开始读取的 binlog时间点存在的数据库模式。有关更多信息,请参阅 MySQL 连接器配置属性。
|
出于安全原因,您不应将密码或其他机密明文放入连接器配置中。相反,任何机密都应通过 KIP-297(“为 Connect 配置外部化机密”)中定义的机制进行外部化。 |
-
打开一个新终端,并使用
curl命令注册 Debezium MySQL 连接器。此命令使用 Kafka Connect 服务的 API,向
/connectors资源提交一个POST请求,并附带一个描述新连接器(名为inventory-connector)的 JSON 文档。此命令使用
localhost连接到 Docker 主机。如果您使用的是非原生 Docker 平台,请将localhost替换为 Docker 主机的 IP 地址。$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "topic.prefix": "dbserver1", "database.include.list": "inventory", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schemahistory.inventory" } }'Windows 用户可能需要转义双引号。例如:
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ \"name\": \"inventory-connector\", \"config\": { \"connector.class\": \"io.debezium.connector.mysql.MySqlConnector\", \"tasks.max\": \"1\", \"database.hostname\": \"mysql\", \"database.port\": \"3306\", \"database.user\": \"debezium\", \"database.password\": \"dbz\", \"database.server.id\": \"184054\", \"topic.prefix\": \"dbserver1\", \"database.include.list\": \"inventory\", \"schema.history.internal.kafka.bootstrap.servers\": \"kafka:9092\", \"schema.history.internal.kafka.topic\": \"schemahistory.inventory\" } }'否则,您可能会看到类似以下的错误:
{"error_code":500,"message":"Unexpected character ('n' (code 110)): was expecting double-quote to start field name\n at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 4]"}
|
如果您使用 Podman,请运行以下命令:
|
-
验证
inventory-connector是否包含在连接器列表中。$ curl -H "Accept:application/json" localhost:8083/connectors/ ["inventory-connector"] -
审查连接器的任务。
$ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector您应该看到类似以下的响应(为便于阅读而格式化):
HTTP/1.1 200 OK Date: Thu, 06 Feb 2020 22:12:03 GMT Content-Type: application/json Content-Length: 531 Server: Jetty(9.4.20.v20190813) { "name": "inventory-connector", ... "tasks": [ { "connector": "inventory-connector", (1) "task": 0 } ] }1 该连接器正在运行一个任务 (任务 0) 来执行其工作。连接器仅支持单个任务,因为 MySQL 在一个顺序的binlog中记录所有活动。这意味着连接器只需要一个读取器来获得所有事件的一致、有序的视图。
查看连接器启动
注册连接器时,它会在 Kafka Connect 容器中生成大量日志输出。通过审查这些输出,您可以更好地了解连接器从创建到开始读取 MySQL 服务器 binlog 的整个过程。
注册 inventory-connector 连接器后,您可以审查 Kafka Connect 容器 (connect) 中的日志输出来跟踪连接器的状态。
前几行显示了连接器 (inventory-connector) 的创建和启动。
...
2021-11-30 01:38:44,223 INFO || [Worker clientId=connect-1, groupId=1] Tasks [inventory-connector-0] configs updated [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-11-30 01:38:44,224 INFO || [Worker clientId=connect-1, groupId=1] Handling task config update by restarting tasks [] [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-11-30 01:38:44,224 INFO || [Worker clientId=connect-1, groupId=1] Rebalance started [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2021-11-30 01:38:44,224 INFO || [Worker clientId=connect-1, groupId=1] (Re-)joining group [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2021-11-30 01:38:44,227 INFO || [Worker clientId=connect-1, groupId=1] Successfully joined group with generation Generation{generationId=3, memberId='connect-1-7b087c69-8ac5-4c56-9e6b-ec5adabf27e8', protocol='sessioned'} [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2021-11-30 01:38:44,230 INFO || [Worker clientId=connect-1, groupId=1] Successfully synced group in generation Generation{generationId=3, memberId='connect-1-7b087c69-8ac5-4c56-9e6b-ec5adabf27e8', protocol='sessioned'} [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2021-11-30 01:38:44,231 INFO || [Worker clientId=connect-1, groupId=1] Joined group at generation 3 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-7b087c69-8ac5-4c56-9e6b-ec5adabf27e8', leaderUrl='http://172.17.0.7:8083/', offset=4, connectorIds=[inventory-connector], taskIds=[inventory-connector-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-11-30 01:38:44,232 INFO || [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset 4 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-11-30 01:38:44,232 INFO || [Worker clientId=connect-1, groupId=1] Starting task inventory-connector-0 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
...
再往下,您应该看到连接器类似以下的输出:
...
2021-11-30 01:38:44,406 INFO || Kafka version: 3.0.0 [org.apache.kafka.common.utils.AppInfoParser]
2021-11-30 01:38:44,406 INFO || Kafka commitId: 8cb0a5e9d3441962 [org.apache.kafka.common.utils.AppInfoParser]
2021-11-30 01:38:44,407 INFO || Kafka startTimeMs: 1638236324406 [org.apache.kafka.common.utils.AppInfoParser]
2021-11-30 01:38:44,437 INFO || Database schema history topic '(name=schemahistory.inventory, numPartitions=1, replicationFactor=1, replicasAssignments=null, configs={cleanup.policy=delete, retention.ms=9223372036854775807, retention.bytes=-1})' created [io.debezium.storage.kafka.history.KafkaSchemaHistory]
2021-11-30 01:38:44,497 INFO || App info kafka.admin.client for dbserver1-schemahistory unregistered [org.apache.kafka.common.utils.AppInfoParser]
2021-11-30 01:38:44,499 INFO || Metrics scheduler closed [org.apache.kafka.common.metrics.Metrics]
2021-11-30 01:38:44,499 INFO || Closing reporter org.apache.kafka.common.metrics.JmxReporter [org.apache.kafka.common.metrics.Metrics]
2021-11-30 01:38:44,499 INFO || Metrics reporters closed [org.apache.kafka.common.metrics.Metrics]
2021-11-30 01:38:44,499 INFO || Reconnecting after finishing schema recovery [io.debezium.connector.mysql.MySqlConnectorTask]
2021-11-30 01:38:44,524 INFO || Requested thread factory for connector MySqlConnector, id = dbserver1 named = change-event-source-coordinator [io.debezium.util.Threads]
2021-11-30 01:38:44,525 INFO || Creating thread debezium-mysqlconnector-dbserver1-change-event-source-coordinator [io.debezium.util.Threads]
2021-11-30 01:38:44,526 INFO || WorkerSourceTask{id=inventory-connector-0} Source task finished initialization and start [org.apache.kafka.connect.runtime.WorkerSourceTask]
2021-11-30 01:38:44,529 INFO MySQL|dbserver1|snapshot Metrics registered [io.debezium.pipeline.ChangeEventSourceCoordinator]
2021-11-30 01:38:44,529 INFO MySQL|dbserver1|snapshot Context created [io.debezium.pipeline.ChangeEventSourceCoordinator]
2021-11-30 01:38:44,534 INFO MySQL|dbserver1|snapshot No previous offset has been found [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,534 INFO MySQL|dbserver1|snapshot According to the connector configuration both schema and data will be snapshotted [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,534 INFO MySQL|dbserver1|snapshot Snapshot step 1 - Preparing [io.debezium.relational.RelationalSnapshotChangeEventSource]
...
Debezium 日志输出使用映射诊断上下文 (MDC) 在日志输出中提供特定于线程的信息,并使多线程 Kafka Connect 服务中的情况更容易理解。这包括连接器类型(上面日志消息中的 MySQL)、连接器的逻辑名称(上面的 dbserver1)以及连接器的活动(task、snapshot 和 binlog)。
在上面的日志输出中,前几行涉及连接器的 task 活动,并报告了一些簿记信息(在本例中,表示连接器在没有先前偏移量的情况下启动)。接下来的三行涉及连接器的 snapshot 活动,并报告正在使用 debezium MySQL 用户以及与该用户关联的 MySQL 权限启动快照。
|
如果连接器无法连接,或者它看不到任何表或 |
接下来,连接器报告了构成快照操作的步骤:
...
2021-11-30 01:38:44,534 INFO MySQL|dbserver1|snapshot Snapshot step 1 - Preparing [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,535 INFO MySQL|dbserver1|snapshot Snapshot step 2 - Determining captured tables [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,535 INFO MySQL|dbserver1|snapshot Read list of available databases [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,537 INFO MySQL|dbserver1|snapshot list of available databases is: [information_schema, inventory, mysql, performance_schema, sys] [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,537 INFO MySQL|dbserver1|snapshot Read list of available tables in each database [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,548 INFO MySQL|dbserver1|snapshot snapshot continuing with database(s): [inventory] [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,551 INFO MySQL|dbserver1|snapshot Snapshot step 3 - Locking captured tables [inventory.addresses, inventory.customers, inventory.geom, inventory.orders, inventory.products, inventory.products_on_hand] [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,552 INFO MySQL|dbserver1|snapshot Flush and obtain global read lock to prevent writes to database [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,557 INFO MySQL|dbserver1|snapshot Snapshot step 4 - Determining snapshot offset [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,560 INFO MySQL|dbserver1|snapshot Read binlog position of MySQL primary server [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,562 INFO MySQL|dbserver1|snapshot using binlog 'mysql-bin.000003' at position '156' and gtid '' [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,562 INFO MySQL|dbserver1|snapshot Snapshot step 5 - Reading structure of captured tables [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,562 INFO MySQL|dbserver1|snapshot All eligible tables schema should be captured, capturing: [inventory.addresses, inventory.customers, inventory.geom, inventory.orders, inventory.products, inventory.products_on_hand] [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:45,058 INFO MySQL|dbserver1|snapshot Reading structure of database 'inventory' [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:45,187 INFO MySQL|dbserver1|snapshot Snapshot step 6 - Persisting schema history [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,273 INFO MySQL|dbserver1|snapshot Releasing global read lock to enable MySQL writes [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:45,274 INFO MySQL|dbserver1|snapshot Writes to MySQL tables prevented for a total of 00:00:00.717 [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:45,274 INFO MySQL|dbserver1|snapshot Snapshot step 7 - Snapshotting data [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,275 INFO MySQL|dbserver1|snapshot Snapshotting contents of 6 tables while still in transaction [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,275 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.addresses' (1 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,276 INFO MySQL|dbserver1|snapshot For table 'inventory.addresses' using select statement: 'SELECT `id`, `customer_id`, `street`, `city`, `state`, `zip`, `type` FROM `inventory`.`addresses`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,295 INFO MySQL|dbserver1|snapshot Finished exporting 7 records for table 'inventory.addresses'; total duration '00:00:00.02' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,296 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.customers' (2 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,296 INFO MySQL|dbserver1|snapshot For table 'inventory.customers' using select statement: 'SELECT `id`, `first_name`, `last_name`, `email` FROM `inventory`.`customers`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,304 INFO MySQL|dbserver1|snapshot Finished exporting 4 records for table 'inventory.customers'; total duration '00:00:00.008' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,304 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.geom' (3 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,305 INFO MySQL|dbserver1|snapshot For table 'inventory.geom' using select statement: 'SELECT `id`, `g`, `h` FROM `inventory`.`geom`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,316 INFO MySQL|dbserver1|snapshot Finished exporting 3 records for table 'inventory.geom'; total duration '00:00:00.011' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,316 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.orders' (4 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,316 INFO MySQL|dbserver1|snapshot For table 'inventory.orders' using select statement: 'SELECT `order_number`, `order_date`, `purchaser`, `quantity`, `product_id` FROM `inventory`.`orders`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,325 INFO MySQL|dbserver1|snapshot Finished exporting 4 records for table 'inventory.orders'; total duration '00:00:00.008' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,325 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.products' (5 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,325 INFO MySQL|dbserver1|snapshot For table 'inventory.products' using select statement: 'SELECT `id`, `name`, `description`, `weight` FROM `inventory`.`products`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,343 INFO MySQL|dbserver1|snapshot Finished exporting 9 records for table 'inventory.products'; total duration '00:00:00.017' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,344 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.products_on_hand' (6 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,344 INFO MySQL|dbserver1|snapshot For table 'inventory.products_on_hand' using select statement: 'SELECT `product_id`, `quantity` FROM `inventory`.`products_on_hand`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,353 INFO MySQL|dbserver1|snapshot Finished exporting 9 records for table 'inventory.products_on_hand'; total duration '00:00:00.009' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,355 INFO MySQL|dbserver1|snapshot Snapshot - Final stage [io.debezium.pipeline.source.AbstractSnapshotChangeEventSource]
2021-11-30 01:38:45,356 INFO MySQL|dbserver1|snapshot Snapshot ended with SnapshotResult [status=COMPLETED, offset=MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000003, currentBinlogPosition=156, currentRowNumber=0, serverId=0, sourceTime=2021-11-30T01:38:45.352Z, threadId=-1, currentQuery=null, tableIds=[inventory.products_on_hand], databaseName=inventory], snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql-bin.000003, restartBinlogPosition=156, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]] [io.debezium.pipeline.ChangeEventSourceCoordinator]
...
这些步骤中的每一步都报告了连接器为执行一致性快照正在执行的操作。例如,第 6 步涉及反向工程正在捕获的表的 DDL create 语句,并在获取全局写锁的 1 秒后释放它,第 7 步读取每个表中的所有行,并报告找到的时间和行数。在本例中,连接器在不到 1 秒的时间内完成了其一致性快照。
|
您的数据库的快照过程会更长,但连接器输出了足够的日志消息,以便您可以跟踪它正在处理的内容,即使表包含大量行。并且,尽管在快照过程开始时使用了独占写锁,但对于大型数据库来说,它应该不会持续很长时间。这是因为锁在任何数据被复制之前就被释放了。有关更多信息,请参阅 MySQL 连接器文档。 |
接下来,Kafka Connect 会报告一些“错误”。但是,您可以安全地忽略这些警告:这些消息只是表示创建了新的 Kafka 主题,并且 Kafka 为每个主题分配了一个新领导者。
...
2021-11-30 01:38:45,555 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 3 : {dbserver1=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2021-11-30 01:38:45,691 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 9 : {dbserver1.inventory.addresses=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2021-11-30 01:38:45,813 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 13 : {dbserver1.inventory.customers=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2021-11-30 01:38:45,927 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 18 : {dbserver1.inventory.geom=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2021-11-30 01:38:46,043 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 22 : {dbserver1.inventory.orders=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2021-11-30 01:38:46,153 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 26 : {dbserver1.inventory.products=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
2021-11-30 01:38:46,269 WARN || [Producer clientId=connector-producer-inventory-connector-0] Error while fetching metadata with correlation id 31 : {dbserver1.inventory.products_on_hand=LEADER_NOT_AVAILABLE} [org.apache.kafka.clients.NetworkClient]
...
最后,日志输出显示连接器已从快照模式转换为持续读取 MySQL 服务器的 binlog。
...
2021-11-30 01:38:45,362 INFO MySQL|dbserver1|streaming Starting streaming [io.debezium.pipeline.ChangeEventSourceCoordinator]
...
Nov 30, 2021 1:38:45 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to mysql:3306 at mysql-bin.000003/156 (sid:184054, cid:13)
2021-11-30 01:38:45,392 INFO MySQL|dbserver1|binlog Connected to MySQL binlog at mysql:3306, starting at MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000003, currentBinlogPosition=156, currentRowNumber=0, serverId=0, sourceTime=2021-11-30T01:38:45.352Z, threadId=-1, currentQuery=null, tableIds=[inventory.products_on_hand], databaseName=inventory], snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql-bin.000003, restartBinlogPosition=156, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]] [io.debezium.connector.mysql.MySqlStreamingChangeEventSource]
2021-11-30 01:38:45,392 INFO MySQL|dbserver1|streaming Waiting for keepalive thread to start [io.debezium.connector.mysql.MySqlStreamingChangeEventSource]
2021-11-30 01:38:45,393 INFO MySQL|dbserver1|binlog Creating thread debezium-mysqlconnector-dbserver1-binlog-client [io.debezium.util.Threads]
...
查看变更事件
部署 Debezium MySQL 连接器后,它开始监控 inventory 数据库以获取数据更改事件。
当您查看连接器启动时,您看到事件被写入了以下主题,并且带有 dbserver1 前缀(连接器的名称):
dbserver1-
模式更改主题,所有 DDL 语句都写入此主题。
dbserver1.inventory.products-
捕获
inventory数据库中products表的更改事件。 dbserver1.inventory.products_on_hand-
捕获
inventory数据库中products_on_hand表的更改事件。 dbserver1.inventory.customers-
捕获
inventory数据库中customers表的更改事件。 dbserver1.inventory.orders-
捕获
inventory数据库中orders表的更改事件。
在本教程中,您将探索 dbserver1.inventory.customers 主题。在这个主题中,您将看到不同类型的更改事件,以了解连接器是如何捕获它们的。
查看 创建 事件
通过查看 dbserver1.inventory.customers 主题,您可以了解 MySQL 连接器如何捕获 inventory 数据库中的创建事件。在这种情况下,创建事件捕获了新客户被添加到数据库。
-
打开一个新终端,并使用它来启动
watch-topic工具,从主题的开头监视dbserver1.inventory.customers主题。watch-topic工具非常简单,功能有限。它不打算被应用程序用来消费事件。在这种情况下,您将改用 Kafka 消费者和适用的消费者库,它们提供完整的功能和灵活性。此命令使用
debezium/kafka镜像的 3.3 版本在一个新容器中运行watch-topic工具。$ docker run -it --rm --name watcher --link kafka:kafka quay.io/debezium/kafka:3.3 watch-topic -a -k dbserver1.inventory.customers-a-
监视自主题创建以来的所有事件。没有此选项,
watch-topic将仅显示您开始监视后记录的事件。 -k-
指定输出应包含事件的键。在这种情况下,这包含了行的主键。
如果您使用 Podman,请运行以下命令:
$ podman run -it --rm --name watcher --pod dbz quay.io/debezium/kafka:3.3 watch-topic -a -k dbserver1.inventory.customerswatch-topic工具返回customers表的事件记录。共有四个事件,每个表行一个事件。每个事件都以 JSON 格式化,因为这是您配置 Kafka Connect 服务的方式。每个事件有两个 JSON 文档:一个用于键,一个用于值。您应该看到类似以下的输出:
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.7:9092 Using KAFKA_BROKER=172.17.0.3:9092 Contents of topic dbserver1.inventory.customers: {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1001}} ...此工具将继续监视主题,因此只要该工具正在运行,任何新事件都会自动出现。
-
对于最后一个事件,请查看键的详细信息。
以下是最后一个事件键的详细信息(为便于阅读而格式化):
{ "schema":{ "type":"struct", "fields":[ { "type":"int32", "optional":false, "field":"id" } ], "optional":false, "name":"dbserver1.inventory.customers.Key" }, "payload":{ "id":1004 } }该事件有两个部分:一个
schema和一个payload。schema包含一个描述 payload 中内容的 Kafka Connect 模式。在本例中,payload 是一个名为dbserver1.inventory.customers.Key的struct,它是不可选的,并且有一个必需字段(类型为int32的id)。payload有一个名为id的字段,值为1004。通过查看事件的键,您可以看到此事件适用于
inventory.customers表中id主键列值为1004的那一行。 -
查看同一事件值的详细信息。
事件的值显示该行已被创建,并描述了其内容(在本例中,是插入行的
id、first_name、last_name和email)。以下是最后一个事件值的详细信息(为便于阅读而格式化):
{ "schema": { "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "dbserver1.inventory.customers.Value", "field": "before" }, { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" }, { "type": "string", "optional": false, "field": "last_name" }, { "type": "string", "optional": false, "field": "email" } ], "optional": true, "name": "dbserver1.inventory.customers.Value", "field": "after" }, { "type": "struct", "fields": [ { "type": "string", "optional": true, "field": "version" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "server_id" }, { "type": "int64", "optional": false, "field": "ts_sec" }, { "type": "string", "optional": true, "field": "gtid" }, { "type": "string", "optional": false, "field": "file" }, { "type": "int64", "optional": false, "field": "pos" }, { "type": "int32", "optional": false, "field": "row" }, { "type": "boolean", "optional": true, "field": "snapshot" }, { "type": "int64", "optional": true, "field": "thread" }, { "type": "string", "optional": true, "field": "db" }, { "type": "string", "optional": true, "field": "table" } ], "optional": false, "name": "io.debezium.connector.mysql.Source", "field": "source" }, { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" }, { "type": "int64", "optional": true, "field": "ts_us" }, { "type": "int64", "optional": true, "field": "ts_ns" } ], "optional": false, "name": "dbserver1.inventory.customers.Envelope", "version": 1 }, "payload": { "before": null, "after": { "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { "version": "3.3.1.Final", "name": "dbserver1", "server_id": 0, "ts_sec": 0, "gtid": null, "file": "mysql-bin.000003", "pos": 154, "row": 0, "snapshot": true, "thread": null, "db": "inventory", "table": "customers" }, "op": "r", "ts_ms": 1486500577691, "ts_us": 1486500577691547, "ts_ns": 1486500577691547930 } }事件的此部分内容要长得多,但与事件的键一样,它也有一个
schema和一个payload。schema包含一个名为dbserver1.inventory.customers.Envelope(版本 1)的 Kafka Connect 模式,它可以包含五个字段:op (操作)-
一个必需的字段,包含描述操作类型的字符串值。MySQL 连接器的值为
c(创建或插入)、u(更新)、d(删除)和r(读取,在快照的情况下)。 before-
一个可选字段,如果存在,包含事件发生之前行的状态。结构将由
dbserver1.inventory.customers.ValueKafka Connect 模式描述,该模式是dbserver1连接器用于inventory.customers表中所有行的模式。 after-
一个可选字段,如果存在,包含事件发生之后行的状态。结构由与
before中使用的dbserver1.inventory.customers.ValueKafka Connect 模式相同。 source (源)-
一个必需的字段,包含一个结构,描述事件的源元数据。对于 MySQL,其中包含几个字段:连接器名称、记录事件的
binlog文件的名称、事件在该binlog文件中的位置、事件中的行(如果有多个)、受影响数据库和表的名称、进行更改的 MySQL 线程 ID、此事件是否是快照的一部分,以及(如果可用)MySQL 服务器 ID 和以秒为单位的时间戳。 ts_ms-
一个可选字段,如果存在,包含连接器处理事件的时间(使用运行 Kafka Connect 任务的 JVM 中的系统时钟)。
事件的 JSON 表示比它们描述的行要长得多。这是因为 Kafka Connect 随每个事件键和值一起发送描述
payload的模式。随着时间的推移,此结构可能会发生变化。但是,在事件本身中包含键和值的模式使得消费应用程序更容易理解消息,尤其是随着它们随时间演变。Debezium MySQL 连接器根据数据库表的结构构建这些模式。如果您使用 DDL 语句来更改 MySQL 数据库中的表定义,连接器会读取这些 DDL 语句并更新其 Kafka Connect 模式。这是每个事件的结构与事件发生时其源表的结构完全匹配的唯一方法。但是,包含单个表所有事件的 Kafka 主题可能包含与表定义每个状态相对应的事件。
JSON 转换器在每条消息中都包含键和值模式,因此它确实会产生非常冗长的事件。或者,您可以使用Apache Avro 作为序列化格式,这会产生更小的事件消息。这是因为它会将每个 Kafka Connect 模式转换为 Avro 模式,并将 Avro 模式存储在单独的 Schema Registry 服务中。因此,当 Avro 转换器序列化事件消息时,它只放置模式的唯一标识符以及值的 Avro 编码二进制表示。结果,通过线路传输并在 Kafka 中存储的序列化消息比您在此处看到的小得多。实际上,Avro 转换器能够使用 Avro 模式演进技术来维护 Schema Registry 中每个模式的历史记录。
-
将事件的键和值模式与
inventory数据库的状态进行比较。在运行 MySQL 命令行客户端的终端中,运行以下语句:mysql> SELECT * FROM customers; +------+------------+-----------+-----------------------+ | id | first_name | last_name | email | +------+------------+-----------+-----------------------+ | 1001 | Sally | Thomas | sally.thomas@acme.com | | 1002 | George | Bailey | gbailey@foobar.com | | 1003 | Edward | Walker | ed@walker.com | | 1004 | Anne | Kretchmar | annek@noanswer.org | +------+------------+-----------+-----------------------+ 4 rows in set (0.00 sec)这表明您审查的事件记录与数据库中的记录匹配。
更新数据库并查看 更新 事件
现在您已经看到了 Debezium MySQL 连接器如何捕获 inventory 数据库中的创建事件,现在您将更改其中一条记录,并查看连接器如何捕获它。
通过完成此过程,您将学习如何查找有关数据库提交中发生的变化的详细信息,以及您如何比较更改事件以确定更改相对于其他更改发生的时间。
-
在运行 MySQL 命令行客户端的终端中,运行以下语句:
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004; Query OK, 1 row affected (0.05 sec) Rows matched: 1 Changed: 1 Warnings: 0 -
查看更新后的
customers表。mysql> SELECT * FROM customers; +------+------------+-----------+-----------------------+ | id | first_name | last_name | email | +------+------------+-----------+-----------------------+ | 1001 | Sally | Thomas | sally.thomas@acme.com | | 1002 | George | Bailey | gbailey@foobar.com | | 1003 | Edward | Walker | ed@walker.com | | 1004 | Anne Marie | Kretchmar | annek@noanswer.org | +------+------------+-----------+-----------------------+ 4 rows in set (0.00 sec) -
切换到运行
watch-topic的终端,查看新的第五个事件。通过更改
customers表中的一条记录,Debezium MySQL 连接器生成了一个新事件。您应该看到两个新的 JSON 文档:一个用于事件的键,一个用于新事件的值。以下是更新事件键的详细信息(为便于阅读而格式化):
{ "schema": { "type": "struct", "name": "dbserver1.inventory.customers.Key", "optional": false, "fields": [ { "field": "id", "type": "int32", "optional": false } ] }, "payload": { "id": 1004 } }这个键与之前事件的键相同。
这是新事件的值。
schema部分没有变化,因此只显示payload部分(为便于阅读而格式化):{ "schema": {...}, "payload": { "before": { (1) "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "after": { (2) "id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "source": { (3) "name": "3.3.1.Final", "name": "dbserver1", "server_id": 223344, "ts_sec": 1486501486, "gtid": null, "file": "mysql-bin.000003", "pos": 364, "row": 0, "snapshot": null, "thread": 3, "db": "inventory", "table": "customers" }, "op": "u", (4) "ts_ms": 1486501486308, (5) "ts_us": 1486501486308910, (5) "ts_ns": 1486501486308910814 (5) } }表 1. update事件值 payload 中字段的描述Item 描述 1
before字段显示了数据库提交之前该行存在的值。原始first_name值为Anne。2
after字段显示了更改事件之后该行的状态。first_name值现在是Anne Marie。3
source字段结构与之前有很多相同的值,只是ts_sec和pos字段已更改(在其他情况下,file可能也会更改)。4
op字段值现在是u,表示该行由于更新而更改。5
ts_ms、ts_us、ts_ns字段显示一个时间戳,表示 Debezium 处理此事件的时间。通过查看
payload部分,您可以了解到关于更新事件的几个重要信息:-
通过比较
before和after结构,您可以确定提交后受影响行中实际发生的变化。 -
通过审查
source结构,您可以找到有关 MySQL 对更改的记录的信息(提供可追溯性)。 -
通过将事件的
payload部分与其他主题中的事件(或不同主题中的事件)进行比较,您可以确定该事件是发生在另一个事件之前、之后,还是与另一个事件在同一个 MySQL 提交中。
-
删除数据库中的记录并查看 删除 事件
现在您已经看到了 Debezium MySQL 连接器如何捕获 inventory 数据库中的创建和更新事件,现在您将删除其中一条记录,并查看连接器如何捕获它。
通过完成此过程,您将学习如何查找有关删除事件的详细信息,以及 Kafka 如何使用日志压缩来减少删除事件的数量,同时仍然使消费者能够获取所有事件。
-
在运行 MySQL 命令行客户端的终端中,运行以下语句:
mysql> DELETE FROM customers WHERE id=1004; Query OK, 1 row affected (0.00 sec)如果上述命令因外键约束冲突而失败,那么您必须使用以下语句从addresses表中删除客户地址的引用:
mysql> DELETE FROM addresses WHERE customer_id=1004; -
切换到运行
watch-topic的终端,查看两个新事件。通过删除
customers表中的一行,Debezium MySQL 连接器生成了两个新事件。 -
查看第一个新事件的键和值。
以下是第一个新事件键的详细信息(为便于阅读而格式化):
{ "schema": { "type": "struct", "name": "dbserver1.inventory.customers.Key", "optional": false, "fields": [ { "field": "id", "type": "int32", "optional": false } ] }, "payload": { "id": 1004 } }此键与您之前查看的两个事件的键相同。
这是第一个新事件的值(为便于阅读而格式化):
{ "schema": {...}, "payload": { "before": { (1) "id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org" }, "after": null, (2) "source": { (3) "name": "3.3.1.Final", "name": "dbserver1", "server_id": 223344, "ts_sec": 1486501558, "gtid": null, "file": "mysql-bin.000003", "pos": 725, "row": 0, "snapshot": null, "thread": 3, "db": "inventory", "table": "customers" }, "op": "d", (4) "ts_ms": 1486501558315, (5) "ts_us": 1486501558315901, (5) "ts_ns": 1486501558315901687 (5) } }表 2. 事件值字段的描述 Item 描述 1
before字段现在包含数据库提交时已删除行的状态。2
after字段为null,因为该行已不存在。3
source字段结构与之前有很多相同的值,只是ts_sec和pos字段的值已更改。在某些情况下,file值也可能更改。4
op字段值现在是d,表示记录已被删除。5
ts_ms、ts_us和ts_ns字段显示时间戳,表示 Debezium 处理事件的时间。因此,此事件为消费者提供了处理行删除所需的信息。旧值也已提供,因为某些消费者可能需要它们来正确处理删除。
-
查看第二个新事件的键和值。
以下是第二个新事件的键(为便于阅读而格式化):
{ "schema": { "type": "struct", "name": "dbserver1.inventory.customers.Key" "optional": false, "fields": [ { "field": "id", "type": "int32", "optional": false } ] }, "payload": { "id": 1004 } }再次,此键与您之前查看的前三个事件的键完全相同。
以下是同一事件的值(为便于阅读而格式化):
{ "schema": null, "payload": null }如果 Kafka 设置为日志压缩,它将从主题中删除较早的消息,只要主题中有至少一条具有相同键的后续消息。最后一个事件称为墓碑事件,因为它有一个键和一个空值。这意味着 Kafka 将删除所有先前具有相同键的消息。即使先前消息被删除,墓碑事件也意味着消费者仍然可以从头开始读取主题而不会错过任何事件。
重启 Kafka Connect 服务
现在您已经看到了 Debezium MySQL 连接器如何捕获创建、更新和删除事件,现在您将看到即使在它未运行时它也能捕获更改事件。
Kafka Connect 服务自动管理其已注册连接器的任务。因此,如果它离线,当它重新启动时,它将启动任何未运行的任务。这意味着即使 Debezium 未运行,它仍然可以报告数据库中的更改。
在此过程中,您将停止 Kafka Connect,更改数据库中的一些数据,然后重新启动 Kafka Connect 以查看更改事件。
-
打开一个新终端并使用它来停止运行 Kafka Connect 服务的
connect容器。$ docker stop connectconnect容器已停止,Kafka Connect 服务已正常关闭。因为您使用了
--rm选项启动容器,所以在容器停止后 Docker 会将其删除。 -
当服务关闭时,切换到 MySQL 命令行客户端的终端,并添加几条记录:
mysql> INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com"); mysql> INSERT INTO customers VALUES (default, "Kenneth", "Anderson", "kander@acme.com");记录已添加到数据库。但是,因为 Kafka Connect 未运行,
watch-topic未记录任何更新。在生产系统中,您将拥有足够的代理来处理生产者和消费者,并为每个主题维护最少数量的同步副本。因此,如果足够的代理发生故障,导致没有足够的 ISR,Kafka 将变得不可用。在这种情况下,生产者(如 Debezium 连接器)和消费者将等待 Kafka 集群或网络恢复。这意味着,暂时,您的消费者可能看不到任何更改事件,因为数据库中的数据正在发生变化。这是因为没有生成更改事件。一旦 Kafka 集群重新启动或网络恢复,Debezium 将恢复生成更改事件,您的消费者将从中断的地方恢复消费事件。
-
打开一个新终端,并使用它在一个容器中重启 Kafka Connect 服务。
此命令使用与最初启动时相同的选项来启动 Kafka Connect。
$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link kafka:kafka --link mysql:mysql quay.io/debezium/connect:3.3Kafka Connect 服务启动,连接到 Kafka,读取前一个服务的配置,并启动已注册的连接器,这些连接器将从上次中断的地方恢复。
以下是此已重启服务的一些最后几行:
... 2021-11-30 01:49:07,938 INFO || Get all known binlogs from MySQL [io.debezium.connector.mysql.MySqlConnection] 2021-11-30 01:49:07,941 INFO || MySQL has the binlog file 'mysql-bin.000003' required by the connector [io.debezium.connector.mysql.MySqlConnectorTask] 2021-11-30 01:49:07,967 INFO || Requested thread factory for connector MySqlConnector, id = dbserver1 named = change-event-source-coordinator [io.debezium.util.Threads] 2021-11-30 01:49:07,968 INFO || Creating thread debezium-mysqlconnector-dbserver1-change-event-source-coordinator [io.debezium.util.Threads] 2021-11-30 01:49:07,968 INFO || WorkerSourceTask{id=inventory-connector-0} Source task finished initialization and start [org.apache.kafka.connect.runtime.WorkerSourceTask] 2021-11-30 01:49:07,971 INFO MySQL|dbserver1|snapshot Metrics registered [io.debezium.pipeline.ChangeEventSourceCoordinator] 2021-11-30 01:49:07,971 INFO MySQL|dbserver1|snapshot Context created [io.debezium.pipeline.ChangeEventSourceCoordinator] 2021-11-30 01:49:07,976 INFO MySQL|dbserver1|snapshot A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted. [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource] 2021-11-30 01:49:07,977 INFO MySQL|dbserver1|snapshot Snapshot ended with SnapshotResult [status=SKIPPED, offset=MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000003, currentBinlogPosition=156, currentRowNumber=0, serverId=0, sourceTime=null, threadId=-1, currentQuery=null, tableIds=[], databaseName=null], snapshotCompleted=false, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql-bin.000003, restartBinlogPosition=156, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]] [io.debezium.pipeline.ChangeEventSourceCoordinator] 2021-11-30 01:49:07,981 INFO MySQL|dbserver1|streaming Requested thread factory for connector MySqlConnector, id = dbserver1 named = binlog-client [io.debezium.util.Threads] 2021-11-30 01:49:07,983 INFO MySQL|dbserver1|streaming Starting streaming [io.debezium.pipeline.ChangeEventSourceCoordinator] ...这些行显示服务找到了在关闭之前由最后一个任务先前记录的偏移量,连接到 MySQL 数据库,从该位置开始读取
binlog,并生成自那时以来 MySQL 数据库中的任何更改的事件。 -
切换到运行
watch-topic的终端,查看您在 Kafka Connect 离线时创建的两条新记录的事件。{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1005}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1005,"first_name":"Sarah","last_name":"Thompson","email":"kitt@acme.com"},"source":{"version":"3.3.1.Final","name":"dbserver1","server_id":223344,"ts_sec":1490635153,"gtid":null,"file":"mysql-bin.000003","pos":1046,"row":0,"snapshot":null,"thread":3,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490635181455,"ts_us":1490635181455501,"ts_ns":1490635181455501571}} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1006}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1006,"first_name":"Kenneth","last_name":"Anderson","email":"kander@acme.com"},"source":{"version":"3.3.1.Final","name":"dbserver1","server_id":223344,"ts_sec":1490635160,"gtid":null,"file":"mysql-bin.000003","pos":1356,"row":0,"snapshot":null,"thread":3,"db":"inventory","table":"customers"},"op":"c","ts_ms":1490635181456,"ts_us":1490635181456101,"ts_ns":1490635181456101571}}这些事件是创建事件,与您之前看到的类似。正如您所见,Debezium 仍然会报告数据库中的所有更改,即使它未运行时(只要它在 MySQL 数据库清除其
binlog中错过的提交之前重新启动)。
清理
完成本教程后,您可以使用 Docker 来停止所有正在运行的容器。
-
停止每个容器。
$ docker stop mysqlterm watcher connect mysql kafkaDocker 会停止每个容器。因为您在启动它们时使用了
--rm选项,所以 Docker 也会删除它们。
|
如果您使用 Podman,请运行以下命令:
|
-
验证所有进程是否已停止并已删除。
$ docker ps -a如果任何进程仍在运行,请使用
docker stop <process-name>或docker stop <containerId>命令停止它们。
后续步骤
完成教程后,请考虑以下后续步骤:
-
进一步探索教程。
使用 MySQL 命令行客户端在数据库表中添加、修改和删除行,并查看对主题的影响。您可能需要为每个主题运行单独的
watch-topic命令。请记住,您不能删除被外键引用的行。 -
尝试使用 Postgres、MongoDB、SQL Server 和 Oracle 的 Debezium 连接器运行教程。
您可以使用本教程的 Docker Compose 版本,该版本位于 Debezium 示例存储库中。提供了 Docker Compose 文件,用于使用 MySQL、Postgres、MongoDB、SQL Server 和 Oracle 运行本教程。