在 Debezium 的 聊天 或 邮件列表 中,时不时会有人询问如何确保 Debezium 生成的记录被精确一次(exactly-once)投递。到目前为止,Debezium 只保证至少一次(at-least-once)投递。这意味着 Debezium 保证每一条变更都会被投递,不会丢失或跳过任何变更事件。然而,在出现故障、重启或数据库连接中断的情况下,同一个事件可能会被投递多次。典型的场景是事件被投递两次——一次在故障/重启之前,另一次在之后。精确一次投递(或语义)提供了一个更强的保证——每一条消息都会被投递,并且不会有重复,每条消息只会被投递一次。到目前为止,我们的回答是,如果用户需要精确一次投递,他们需要自己实现去重系统。但是,随着 Kafka Connect 对精确一次投递的支持,我们似乎可以开箱即用地为 Debezium 连接器提供精确一次投递,只需稍作配置更改。
Kafka Connect 精确一次语义
Kafka 本身早已提供了事务支持,这是实现精确一次投递的基石(自 Kafka 0.11 起),也提供了精确一次投递本身。缺失的是 Kafka Connect 的精确一次投递支持。这在 Kafka 3.3.0 中得到了改变,它增加了对源连接器(source connectors)的精确一次投递支持,请参阅 KIP-618 和相关的 Jira 问题。从高层来看,源连接器生成并写入未提交事务中的事件对消费者是不可见的,直到事务提交(包括对应这些事件的偏移量提交)才会变得可见。源连接器本身不创建事务,而是由 Kafka Connect 处理。连接器只在需要时定义事务边界,在许多情况下甚至不需要这样做。
Kafka Connect 配置
目前,精确一次语义(EOS)仅在 Kafka Connect 的分布式模式下受支持。用户需要做的就是为所有 Kafka Connect 工作节点启用精确一次投递。工作节点可以通过设置 exactly.once.source.support=enabled 来启用精确一次支持。如果您想在不关闭集群的情况下进行滚动更新,您可以先将所有工作节点更新到 exactly.once.source.support=preparing,然后逐渐将工作节点切换到 exactly.once.source.support=enabled。
需要精确一次投递的源连接器需要以类似的方式进行配置,设置 exactly.once.support。您可以将其设置为 requested 或 required。
-
required- Kafka Connect 会检查源连接器是否通过实现SourceConnector::exactlyOnceSupport方法明确支持精确一次语义。如果连接器未实现此方法,连接器的启动将失败。 -
requested- 跳过检查源连接器是否明确支持精确一次语义。在这种情况下,需要仔细检查连接器的文档或源代码,以确定它是否能提供 EOS。
为确保 EOS 正常工作,当应用程序需要精确一次投递时,最好使用 exactly.once.support=required,而 exactly.once.support=requested 应仅用于测试目的。
另一个相关的源连接器配置选项是 transaction.boundary。它决定了事务的范围。可能的值为:
-
poll- 在单个SourceTask::poll方法调用中返回的所有事件都将包装在单个事务中。 -
interval- 事件将根据指定的时间间隔分块到事务中,该时间间隔可以通过transaction.boundary.interval.ms选项进行配置。 -
connector- 事务边界的定义委托给指定的源连接器。
默认值为 poll 选项。
总而言之,如果您想启用 EOS 并且您的源连接器支持它,您需要在所有工作节点上配置 exactly.once.source.support=enabled,并更新连接器配置为 exactly.once.support=required。
精确一次投递与 Debezium 连接器
Debezium 分为两个阶段:初始快照阶段和流式传输阶段。我们有一个硬性要求,即初始快照必须成功完成,否则必须重新执行。如果在快照阶段发生任何故障,在下次启动或重启连接器后,快照将再次执行,因此会导致重复。在下次启动时重新执行整个快照是有意义的,因为数据可能在下次启动或重启期间发生变化。由于快照应该反映拍摄快照时的确切数据,因此在发生故障时,我们必须从头开始。在初始快照阶段可能有避免重复事件的方法,但目前我们只关注流式传输阶段。
另一方面,在流式传输数据的过程中,情况非常简单。我们将从数据库获取的事件存储在 Debezium 内部队列中,并在每次 Kafka Connect poll() 方法调用时,清空队列并更新 Kafka 偏移量。因此,默认的事务边界(包装 poll() 方法)非常适合 Debezium,Debezium 不需要定义任何自定义事务边界。
看来 Debezium 连接器可以开箱即用地与 Kafka Connect 的精确一次投递配合使用,而无需任何进一步的修改。但是,我们必须开发更健壮的测试来测试 EOS,并首先测试所有连接器。对于我们至少在一定程度上测试过的连接器,我们将实现 SourceConnector::exactlyOnceSupport 方法。在此期间,如果您想在自己的测试或暂存环境中测试 EOS,可以使用 exactly.once.support=requested。
数据库连接中断时精确一次投递的简单测试
让我们展示一个简单的 EOS 测试。我们将尝试在 Debezium 连接器运行时中断其与数据库的连接,同时数据持续写入数据库。在这种情况下,Debezium 将抛出可重试异常,Kafka Connect 会重启连接器,我们将关心连接器是否真的从上次停止的点开始,并且不会发送任何事件两次。
场景可能如下:
-
启动 Kafka Connect
-
部署 Debezium 连接器
-
启动一个自定义加载器,该加载器将持续将数据插入被捕获的数据库表中
-
中断 Debezium 与数据库的连接
-
让 Kafka Connect 重启 Debezium 连接器
-
停止加载器
-
检查相关 Kafka 主题中是否有重复项。
以下是针对 Postgres 数据库和 Debezium Postgres 连接器的具体示例。
让我们创建一个简单的测试表,其中包含 ID、一个整数值(可以与 ID 相同),以及一个时间戳,如果我们想检查记录的创建时间。
DROP TABLE IF EXISTS public.eos_test; -- remove the table first if exists
CREATE TABLE public.eos_test(id SERIAL NOT NULL PRIMARY KEY, val SERIAL, t TIMESTAMP DEFAULT now()); 不要忘记在 Kafka Connect 中更新 exactly.once.source.support=enabled。此外,Kafka Connect 必须在分布式模式下运行,因此您需要修改 config/connect-distributed.properties 并使用这些属性运行工作节点。一旦 Kafka Connect 运行起来,您就可以启动 Debezium Postgres 连接器。
{
"name": "eos",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"topic.prefix": "eos",
"table.include.list": "public.eos_test",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"exactly.once.support": "requested"
}
} 为了简化主题的读取,我们在此处禁用了将 schema 添加到记录中,并添加了 Debezium SMT 来提取新记录状态,以便只有新值被存储到 Kafka 主题中。
现在,您可以使用此 Python 脚本等加载数据到 Postgres 测试表中。
#!/usr/bin/env python3
"""
Loads data into Postgres EOS test table.
"""
import psycopg2
DROP_TABLE = "DROP TABLE IF EXISTS public.eos_test;"
CREATE_TABLE = "CREATE TABLE public.eos_test(id SERIAL NOT NULL PRIMARY KEY," \
"val SERIAL, t TIMESTAMP DEFAULT now());"
INSERT_INTO_TABLE = "INSERT INTO public.eos_test(val) VALUES (%s)"
try:
connection = psycopg2.connect(
user="postgres",
password="postgres",
host="127.0.0.1",
port="5432",
database="postgres")
cursor = connection.cursor()
# cursor.execute(DROP_TABLE)
# cursor.execute(CREATE_TABLE)
# connection.commit()
for i in range(1, 50000):
cursor.execute(INSERT_INTO_TABLE, (i,))
connection.commit()
print(f"wrote {i}")
finally:
if connection:
cursor.close()
connection.close() 它将向测试表中加载 50,000 条记录。这应该足够长的时间来检查数据是否被 Debezium 连接器捕获,然后切换到另一个窗口并中断 Debezium 与数据库的连接。根据您的测试环境的速度,您可能可以在一次运行中多次中断数据库连接。您可以根据需要增加/减少加载的事件数量。
一旦我们的加载器脚本将数据加载到数据库中,就中断 Debezium 与数据库的连接,例如,连接到 Postgres 数据库并运行以下命令:
SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND datname = 'postgres' AND query like 'START_REPLICATION SLOT %'; 这将终止 Debezium 与数据库的连接,随后导致 Postgres 驱动程序抛出 PSQLException 和 Kafka Connect RetriableException。由于该异常是可重试的,Kafka Connect 将自动重启连接器,并继续从测试表中捕获数据。您可以使用 kafka-console-consumer 工具来检查所有数据是否已到达 Kafka 主题。
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --property print.key=true --topic eos.public.eos_test 一旦您确定所有记录都在 Kafka 主题中,您就可以运行检查脚本来查找重复事件。该脚本(也用 Python 编写)可能如下所示:
#!/usr/bin/env python3
"""
Check Kafka topic for records with duplicate key/value.
"""
import json
import kafka
UNIQUE_COLUMN = "id"
def deserialize(rec):
"""
Deserialize JSON payload and extract value number.
Ignore transaction boundaries markers.
"""
try:
value = json.loads(rec.decode("utf-8"))
if UNIQUE_COLUMN in value:
return value[UNIQUE_COLUMN]
print(f"WARN: no value in record {rec}")
except:
print(f"WARN: cannot deserialize record {rec}")
return None
def check_topic():
"""
Check Kafka topic for duplicates and prin statistics, including skipped
records.
"""
consumer = kafka.KafkaConsumer(
"eos.public.eos_test",
bootstrap_servers=["localhost:9092"],
auto_offset_reset="earliest",
consumer_timeout_ms=1000,
enable_auto_commit=True,
group_id="eos-group",
value_deserializer=deserialize)
vals = []
items = 0
duplicates = 0
skipped = 0
for msg in consumer:
val = msg.value
if not val:
print(f"WARN: skipping None value, items: {items}")
skipped = skipped + 1
continue
items = items + 1
if val in vals:
print(f"Duplicate value: {val}")
duplicates = duplicates + 1
else:
vals.append(val)
print(
f"Found {duplicates} duplicates in {items} items (unique values: " \
f"{len(vals)}, skipped values: {skipped})")
check_topic() 输出的尾部可能如下所示:
....
WARN: skipping None value, items: 46297
WARN: skipping None value, items: 48345
WARN: cannot deserialize record b'\x00\x00\x00\x00\x00\x00'
WARN: skipping None value, items: 49999
Found 0 duplicates in 49999 items (unique values: 49999, skipped values: 54) 在这种情况下,没有重复的记录,所以一切看起来都很好。唯一的问题是那些被跳过的事件。这些是事务边界标记。Python Kafka 客户端不知何故无法处理它们并反序列化失败,所以我们跳过了它们。Java 客户端应该能够识别这些记录并正常处理它们。
总结和后续步骤
在这篇博客文章中,我们展示了如何为 Kafka Connect 源连接器配置精确一次语义,以及如何将其与 Debezium Postgres 连接器一起使用。到目前为止,似乎没有发现问题,至少 Debezium Postgres 连接器可以很好地与精确一次语义配合使用。
然而,找不到问题并不意味着没有问题。因此,下一步,我们希望开发一个更严格的测试框架来测试数据一致性和精确一次投递。我们想使用著名的 Jepsen 框架来编写测试。如果我们成功编写了测试,我们将在后续的博客文章中分享结果。在此期间,我们鼓励您也在您的环境和部署中测试精确一次投递,以增加发现任何潜在错误的几率。如果您进行了任何此类测试,我们将非常感谢您与我们分享结果,包括发现错误的负面结果以及一切通过的正面结果。
关于 Debezium
Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。