“如何添加一个新表来开始捕获其更改?”

这是我们社区中最常见的问题之一。

如果你问《银河系漫游指南》,万事万物的答案是42。遗憾的是,在现实世界中,事情并没有那么简单。真正的答案是:这取决于具体情况。

在这篇文章中,我不仅想通过讲解不同的场景来提供一个答案,还想解释其背后的原因。

引言

在探索所有可能性之前,我们首先需要定义一些 Debezium 的基本概念。

Debezium 如何知道要捕获哪些表?

Debezium 提供了一组配置属性来决定要捕获哪些数据库、Schema 或表。这些过滤器的应用方式取决于源数据库。

MySQL / MariaDB

在这些数据库中,"数据库"这个词基本上等同于 "Schema"。所以在这里你可以通过 `database.include.list` 和 `table.include.list` 进行过滤,但没有单独的 Schema 过滤器。

PostgreSQL, Oracle, SQL Server, Db2

一个连接器实例始终只捕获一个数据库。在该数据库中,你可以通过 `schema.include.list` 和 `table.include.list` 进行过滤。

Include 属性是

database.include.list

一个可选的、逗号分隔的正则表达式列表,用于匹配数据库名称。默认情况下,所有数据库都包含(在适用情况下)。

schema.include.list

一个可选的、逗号分隔的正则表达式列表,用于匹配 Schema 名称。默认情况下,所有非系统 Schema 都包含在内。

table.include.list

一个可选的、逗号分隔的正则表达式列表,用于匹配完全限定的表标识符。每个标识符的格式为 schemaName.tableName。默认情况下,所有非系统表都包含在内。

如果需要相反的逻辑,还有 exclude 对应的属性。

database.exclude.list

一个可选的、逗号分隔的要排除的数据库名称列表。

schema.exclude.list

一个可选的、逗号分隔的要排除的 Schema 名称列表。

table.exclude.list

一个可选的、逗号分隔的要排除的完全限定表标识符列表。

最终,这些属性的组合决定了 Debezium 将捕获哪些表的事件。请记住:你可以使用 `include` 属性或 `exclude` 属性,但绝不能同时使用。

Debezium 如何获取表 Schema 信息?

Debezium 为了开始从表中流式传输数据更改,需要了解其 Schema,以便能够提供有关表结构的信息。通常,Debezium 在流式传输过程中学习表 Schema,以便正确解释事件。具体机制取决于数据库连接器。

DDL 语句

大多数连接器,例如 MariaDB、MySQL 和 Oracle,会从事务日志中读取 DDL 语句,进行解析,并更新内存中每个表的 Schema 表示。

元数据消息

PostgreSQL 使用 逻辑解码 来发送关系(表)元数据消息。当 Schema 发生变化、复制恢复,或者在发送表的第一个更改事件之前,Debezium 都会接收到关系消息(R)。这些消息使 Debezium 能够知道更改属于哪个表,列的顺序、数据类型以及其他结构信息。

JDBC 查询

对于 SQL Server 和 Db2 等连接器,如果收到了一个表中还没有在内存 Schema 表示中的更改事件,Debezium 会直接查询数据库以检索表 Schema。

对于不依赖元数据消息的连接器,Schema 信息也会被持久化,以便能够承受连接器重启或故障。为此,有几个以 `schema.history.internal.` 为前缀的属性用于管理此区域的配置。特别值得关注的是属性

schema.history.internal.store.only.captured.tables.ddl

指定连接器是记录 Schema 中所有表的 Schema 结构,还是只记录指定要捕获的表的 Schema 结构。

还有一个相关的属性

schema.history.internal.store.only.captured.databases.ddl

指定连接器是记录所有数据库的 Schema 结构,还是只记录指定要捕获的数据库的 Schema 结构。

现在先记住它,稍后我们会详细介绍它的作用。

现在,让我们退一步:当连接器首次启动时会发生什么?

在发出表的事件之前,Debezium 需要知道表的结构。即使这些信息可以通过流式传输中的 DDL 更改获得,但当连接器首次启动时,DDL 很可能已经不在日志中了。这就是为什么 Debezium 要求在首次启动时捕获数据库 Schema。

另外,初始快照主要用于捕获数据库中存在的当前数据,因为 Debezium 首次启动时,事务日志中可能并非所有数据都可用。捕获 Schema 是必需的,以便 Debezium 了解表的结构。即使您不想捕获现有数据,也无法禁用 Schema 捕获。但是,您可以使用 `no_data` 快照模式来跳过捕获数据本身。

话虽如此,您应该开始理解为什么答案是“这取决于”。

让我们通过几个常见的场景来回顾一下。

场景 1

让我们从一个简单的场景开始:您已将连接器配置为捕获数据库中所有可用的表。这可以通过相应地设置 `*.include.list` 属性来实现,或者简单地将 include 和 exclude 配置都留空——在这种情况下,Debezium 默认捕获所有内容。

首先,在此场景中,`schema.history.internal.store.only.captured.tables.ddl` 属性没有影响,因为所有表 Schema 都被捕获了。

如果您在 Debezium 正在流式传输时创建了一个新表,它会捕获 DDL 并更新其内部 Schema 表示。当该表发生 DML 更改时,Debezium 会立即开始发出更改事件,因为表结构已经已知。

您需要检查数据库是否已配置为支持 CDC,以便新表能够自动启用 CDC。这取决于连接器的具体实现。例如,在 PostgreSQL 中,这意味着 `publication.autocreate.mode` 设置为 `all_tables`(默认值)或禁用(并且您已手动为其创建了所有表)。

场景 2

现在假设我们处于 `*.include.list` 参数已设置,使得 Debezium 只捕获数据库中可用表的一个子集的场景。如果您将 `schema.history.internal.store.only.captured.tables.ddl` 保持不变(默认为 `false`),Debezium 将在初始快照期间检索所有表的 Schema。

因此,如果您想开始捕获先前未捕获的表中的更改,您需要:

  1. 编辑 `table.include.list` 属性以指定您要捕获的表。

  2. Restart the connector. (重新启动连接器。)

  3. [可选] 发起一个 `incremental snapshot`,如果您想捕获新添加表中存在的现有数据。

如果将 `schema.history.internal.store.only.captured.tables.ddl` 设置为 `true`,则过程会稍微复杂一些。在这种情况下,由于 Debezium 没有关于您要添加的表的信息,因此您需要明确告知连接器检索它。

过程是:

  1. Stop the connector. (停止连接器。)

  2. 删除由 `schema.history.internal.kafka.topic` 属性指定的内部数据库 Schema 历史记录 Topic。

  3. 将 `snapshot.mode` 属性设置为 `recovery`(对于不支持 recovery 的连接器,如 PostgreSQL,可以使用 `no_data`)。

  4. 将表添加到捕获列表中。

  5. Restart the connector. (重新启动连接器。)

基本上,这会让 Debezium 进入一个仅用于 Schema 的“首次运行”模式。不会重新快照任何现有数据。

应谨慎使用 `recovery` 选项,因为它假设自连接器上次停止以来没有发生 Schema 更改,否则在空隙期间发生的某些事件可能会以不正确的 Schema 进行处理而损坏。

我们为 Oracle 实现了一个实验性功能,该功能可以在 DML 发生时动态注册表的 Schema。这意味着,如果 Debezium 检测到一个未捕获 Schema 的表的插入、更新或删除事件,并且该表符合 include 列表,它将尝试注册当前 Schema 并使用该 Schema 来处理该事件。

等等!MongoDB 呢?

由于其无模式的特性,MongoDB 不需要您像关系数据库那样预先定义 Schema。因此,如果您想添加一个新集合来捕获,您只需要:

  1. 确保配置的 `capture.scope` 设置为包含所需的集合。

  2. 编辑 `collection.include.list` 以添加新集合。

结论

我们已经了解了如何在不同场景下将新表添加到捕获列表中,并探讨了所需步骤背后的原因。我希望这能提高您对 Debezium 的理解,并帮助您更自信地管理它。

Debezium 团队正在不断寻求方法来改进项目及其可用性,本文是这项持续努力的一部分。

由于最简单的功能是不需要解释的功能,我们正在考虑添加一个开箱即用的信号操作来触发特定表的 Schema 刷新。这将使将新表添加到捕获列表的过程更加平滑和直接。

Fiore Mario Vitale

Mario 活跃于开源社区,为多个项目做出了贡献,并深度参与 Debezium (分布式变更数据捕获平台) 的开发。在他的职业生涯中,他积累了在受数据影响的事件驱动架构方面的丰富经验。在他的整个职业生涯中,Mario 主要专注于数据密集型软件和产品开发,这提高了他对开发者体验和数据驱动应用程序的敏感度。除了他的职业追求,Mario 在技术和个人兴趣的交汇处找到了自己的舒适区。他喜欢拍照,尤其擅长捕捉美好的瞬间。他对赛车运动和比赛也充满热情。不写代码的时候,您经常会发现他在户外骑山地自行车探索,以此来满足他对冒险的热情。

   


关于 Debezium

Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。

参与进来

我们希望您觉得 Debezium 有趣且有用,并希望尝试一下。在 Twitter @debezium 上关注我们,在 Zulip 上与我们聊天,或加入我们的 邮件列表 与社区交流。所有代码都在 GitHub 上开源,因此请在本地构建代码,帮助我们改进现有连接器并添加更多连接器。如果您发现问题或有改进 Debezium 的想法,请告诉我们或 记录一个问题

版权所有 © Debezium 及其作者。保留所有权利。有关我们的商标详情,请访问我们的 商标政策商标列表。第三方商标属于其各自所有者,在此提及并不表示任何认可或关联。
×