Debezium 从 1.6 版本开始的一个主要改进是支持 增量快照。在这篇博文中,我们将解释此功能背后的动机,深入探讨实现细节,并进行演示。
为什么需要增量快照?
自 Debezium 问世以来,捕获表列表变更的支持一直是一个最大的痛点。作为用户,您创建一个新的连接器,其中包含要捕获的表列表(table.include.list 及相关选项);稍后,可能需要调整此配置,以便捕获最初不属于 CDC 的其他表。如果只需要从这些表中流式传输更改,那么问题就相当简单。但是,如果您还需要捕获表中现有的内容呢?
Debezium 在快照阶段传统上会捕获现有数据。此阶段在连接器首次启动时执行一次,其目标是捕获某个时间点的连续数据(将静态数据转换为动态数据)。这可能是一个相当长的操作,并且根据定义,它必须全部完成或根本不执行 - 类似于事务的语义。这意味着,如果由于连接器重启等原因导致快照未完成,则必须从头开始重新执行,并且所有已完成的工作都会被丢弃。此外,在快照拍摄期间,数据库中并行执行的任何数据修改都不会被流式传输,直到快照完成为止。对于非常大的快照,这可能导致数据库资源问题,因为事务日志必须一直可用,直到流式传输开始。
因此,我们需要解决三个问题:
-
如果必须流式传输现有数据,则几乎不可能将其他表添加到已捕获的表列表中。
-
一个不可中断或恢复的、运行时间长的连续快照进程。
-
在快照完成之前,更改数据流会被阻塞。
旧的解决方案
这个问题众所周知,随着时间的推移,我们开发了变通方法,并构思了可能的改进和新解决方案。作为一种变通方法,一般的建议是使用多个连接器。用户需要
-
停止连接器
-
创建一个新的连接器来拍摄新表的快照(使用
initial_only快照模式) -
完成后,停止新的连接器
-
重新配置并启动旧的连接器,将新捕获的表添加到列表中。
这种方法在一定程度上奏效,但非常笨拙,并且上述所有关于快照一致性的问题仍然存在。
下一步是社区通过 DBZ-175 向 Debezium 的 MySQL 连接器贡献代码。它基于同时存在多个二进制日志读取器的概念。一个读取器捕获最初配置的表,而另一个读取器则快照新表,然后捕获新表的更改。后者读取器将追赶前者,然后将它们协调并合并为一个。
代码运行良好,但由于该过程本身相当复杂且容易出现细微的错误,因此从未走出孵化阶段。最重要的是,这是一个巧妙的方法,但不幸的是,它不能移植到其他连接器。
基于水印的快照
2019 年底,Netflix 工程团队宣布他们开发了一个内部变更数据捕获框架。他们还提出了一个创新的解决方案,通过水印执行并发快照,该解决方案在 Andreas Andreakis 和 Ioannis Papapanagiotou 的论文 DBLog: A Watermark Based Change-Data-Capture Framework 中进行了描述。
这种方法背后的主要思想是,变更数据流与快照同时连续执行。该框架将低水印和高水印插入到事务日志中(通过写入源数据库),并在这两个点之间读取快照表的一部分。该框架会记录水印之间的数据库更改,并将其与快照值进行协调,如果快照期间的相同记录被快照和修改。
这意味着数据是分块快照的 - 连接器启动时没有冗长的过程,并且在发生崩溃或连接器被正常终止的情况下,快照可以从最后一个完成的块恢复。
根据 Netflix 的说法,该实现已为 MySQL 和 PostgreSQL 数据库提供。
信号表
在深入了解 Debezium 对基于水印的快照方法的实现之前,需要做一个小的绕道。
有时从外部控制 Debezium 很有用,以强制它执行某些请求的操作。假设需要重新快照一个已快照的表 - 所谓的即时快照。用户需要向 Debezium 发送命令以暂停当前操作并进行快照。为此,Debezium 定义了信号的概念,通过信号表发出。这是一个特殊表,用于用户和 Debezium 之间的通信。Debezium 捕获该表,当用户需要执行某个操作时,他们只需向信号表中写入一条记录(发送一个信号)。Debezium 将收到捕获的更改,然后执行所需的操作。
Debezium 中的增量快照
当我们了解到 DBLog 的快照方法时,我们决定该方法是通用的,并且也可以尝试在 Debezium 中采用它。另外,由于我们在不同的连接器之间共享大量代码(使用 Debezium 连接器框架),我们的目标是在 Debezium 核心组件中实现它,以便所有连接器都能一次性受益于此功能。该设计和实现由 DDD-3 Debezium 设计文档驱动。
Debezium 中的增量快照以即时快照的形式提供。用户不配置连接器来执行快照,而是使用信号机制发送快照信号,从而触发对一组表的快照。该信号称为 execute-snapshot,信号消息遵循以下格式:
{"data-collections": ["<table-id-1>", "<table-id-2>", "<table-id-3>", ...]} 当请求表快照时,Debezium 将执行以下操作:
-
获取表中的最大主键;这是快照终结点,其值存储在连接器偏移量中。
-
根据主键的总顺序和
incremental.snapshot.chunk.size配置选项规定的大小将表分成块。
查询某个块时,会构建一个动态 SQL 语句,选择比前一个块的最后一个主键(或第一个块的第一个主键)更大且小于或等于记录的最大主键的下一个 incremental.snapshot.chunk.size 条记录。
| 默认块大小为 1,024。您可以增加此值以提高效率(将执行更少的快照查询),但这应与增加的缓冲区内存消耗相权衡。建议在您自己的环境中进行一些实验,以确定最适合您情况的设置。 |
块的读取过程稍微复杂一些。
-
发送
snapshot-window-open信号。 -
执行块查询并将块内容读取到内存中。
-
发送
snapshot-window-close信号。
为什么需要这个?为什么只查询数据库不够?答案在下图。
图 1. 事务隔离
Debezium 并不是唯一访问数据库的进程。我们可以预期有多个进程并发访问数据库,可能会访问当前正在被快照的相同记录。如图所示,对数据的任何更改都根据提交顺序写入事务日志。由于无法精确确定块读取事务的时间以识别潜在冲突,因此会添加打开和关闭窗口事件来区分可能发生冲突的时间。Debezium 的任务是去重这些冲突。
为此,Debezium 将块生成的所有事件记录到缓冲区中。当收到 snapshot-window-open 信号时,将检查来自事务日志的所有事件是否属于要快照的表。如果是,则检查缓冲区是否包含主键。如果是,则从缓冲区中删除快照事件,因为这可能是一个冲突。并且由于无法正确排序快照和事务日志事件,因此只保留事务日志事件。当收到 snapshot-window-close 信号时,缓冲区中剩余的快照事件将被发送到下游。
下图展示了一个缓冲区的工作示例,以及交易日志事件在发送到下游之前是如何被过滤的。
图 2. 缓冲区运行中
记录 K2、K3 和 K4 已存在于数据库中。在快照窗口打开之前,记录 K1 被插入,K2 被更新,K3 被删除。这些事件在读取日志时被发送到下游。快照窗口打开,其查询将 K1、K2 和 K4 读取到缓冲区中。在窗口打开期间,K4 的删除操作会从事务日志中检索;缓冲区中 K4 的快照事件被删除,删除事件被发送到下游。K5 和 K6 被插入,这将从日志中检索,并发出相应的事件。根据具体的时间,缓冲区中可能还有它们的读取事件(在图中 K5 就是这种情况),这些事件将被丢弃。当快照窗口关闭时,缓冲区中剩余的 K1 和 K2 的快照事件将被发出。
连接器重启
现在我们已经证明,使用增量快照的概念,在连接器运行时,如果需要,相同的表可以被反复快照。我们已经展示了它的执行不会停止从事务日志中流式传输。最后一项是暂停和继续进程。
当增量快照运行时,增量快照上下文会添加到每个消息偏移量中。上下文由三条信息表示:
-
要快照的表列表,其中第一个是当前正在快照的表。
-
表的 maximum primary key。
-
发送到下游的最后一个增量快照事件的主键。
这三项信息足以在连接器重启后(无论是故意的还是在崩溃后)恢复快照。连接器启动后,负责快照的组件会从偏移量中读取数据。它会初始化其内部状态,并在最后一个已处理事件之后恢复快照。请注意,在连接器未运行时插入或更新的任何记录都将通过常规流读取进行处理,即它们不受正在进行的快照的影响。
这种方法确保了该过程的稳健性、对重启和崩溃的弹性,并最大限度地减少了重新传递的事件数量(仍然适用“至少一次”传递语义)。
局限性
与初始连续快照相比,增量快照有一些缺点:
-
被快照的表必须包含主键。
-
如果在快照过程中从表中删除了一个事件,那么可能会发生以下情况之一:
-
下游消费者收到
read事件和delete事件。 -
只收到
delete事件。
-
-
如果在快照过程中更新了表中的一个事件,那么可能会发生以下情况之一:
-
下游消费者收到
read事件和update事件。 -
收到
update事件和read事件(请注意顺序相反)。 -
只收到
update事件(如果更新发生在会发出read事件的块内,导致该read事件在去重过程中被丢弃)。
-
总的来说,不应将 read 事件理解为表中记录的初始状态,而应理解为记录在任意时间点的状态。与 Debezium 中的传统初始快照相比,消费者语义略有改变,虽然可以保证消费者在增量快照完成后收到了完整的数据集,但并非所有记录都会有 read(快照)事件,而可能是 update 事件。对于 delete 事件也是如此:消费者必须准备好接收尚未见过的记录的此类事件。
演示
在讨论了通用概念之后,让我们通过一个示例来更深入地探讨。我们将使用我们标准的 教程部署来演示即时增量快照。我们将使用 PostgreSQL 作为源数据库。为了进行演示,您需要多个终端窗口。
首先,我们将启动部署,创建信号表,并启动连接器。
# Terminal 1 - start the deployment
# Start the deployment
export DEBEZIUM_VERSION=1.7
docker-compose -f docker-compose-postgres.yaml up
# Terminal 2
# Create a signalling table
echo "CREATE TABLE inventory.dbz_signal (id varchar(64), type varchar(32), data varchar(2048))" | docker-compose -f docker-compose-postgres.yaml exec -T postgres env PGOPTIONS="--search_path=inventory" bash -c "psql -U $POSTGRES_USER postgres"
# Start Postgres connector, capture only customers table and enable signalling
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" https://:8083/connectors/ -d @- <<EOF
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include": "inventory",
"table.include.list": "inventory.customers,inventory.dbz_signal",
"signal.data.collection": "inventory.dbz_signal"
}
}
EOF 从日志中可以看到,根据 table.include.list 设置,只快照了一个表,即 customers。
connect_1 | 2021-09-24 13:38:21,781 INFO Postgres|dbserver1|snapshot Snapshotting contents of 1 tables while still in transaction [io.debezium.relational.RelationalSnapshotChangeEventSource]
下一步,我们将模拟数据库中的连续活动。
# Terminal 3
# Continuously consume messages from Debezium topic for customers table
docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.customers
# Terminal 4
# Modify records in the database via Postgres client
docker-compose -f docker-compose-postgres.yaml exec postgres env PGOPTIONS="--search_path=inventory" bash -c "i=0; while true; do psql -U $POSTGRES_USER postgres -c \"INSERT INTO customers VALUES(default,'name\$i','surname\$i','email\$i')\"; ((i++)); done" 主题 dbserver1.inventory.customers 接收连续的消息流。现在,连接器将被重新配置为也捕获 orders 表。
# Terminal 5
# Add orders table among the captured
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" https://:8083/connectors/inventory-connector/config -d @- <<EOF
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include": "inventory",
"table.include.list": "inventory.customers,inventory.dbz_signal,inventory.orders",
"signal.data.collection": "inventory.dbz_signal"
}
EOF 正如预期的那样,orders 表没有消息。
# Terminal 5
docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.orders 现在,让我们通过发送信号来启动增量即时快照。orders 表的快照消息被传递到 dbserver1.inventory.orders 主题。customers 表的消息不间断地传递。
# Terminal 5
# Send the signal
echo "INSERT INTO inventory.dbz_signal VALUES ('signal-1', 'execute-snapshot', '{\"data-collections\": [\"inventory.orders\"]}')" | docker-compose -f docker-compose-postgres.yaml exec -T postgres env PGOPTIONS="--search_path=inventory" bash -c "psql -U $POSTGRES_USER postgres"
# Check messages for orders table
docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.orders 如果您在快照运行时修改了 orders 表中的任何记录,它将被作为 read 事件或 update 事件发出,具体取决于确切的时间和事件顺序。
最后一步,我们将终止部署的系统并关闭所有终端。
# Shut down the cluster
docker-compose -f docker-compose-postgres.yaml down 总结
在这篇博客文章中,我们讨论了 DBLog 论文提出的增量快照概念的动机。我们回顾了过去用于实现所述功能的方法。然后,我们深入研究了 Debezium 中这种新颖快照方法的实现细节,最后尝试了现场使用。
我们希望您会发现增量快照很有用,并期待您的反馈、经验和用例。在未来的博客文章中,我们将讨论对只读数据库的增量快照支持(自 1.7 版本起由 Debezium MySQL 连接器支持),以及如何使用 Kafka 主题(而不是数据库表)作为信号机制来触发即时快照。
关于 Debezium
Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。