今天,我很高兴地宣布 2.2 系列的第二个 alpha 版本 Debezium 2.2.0.Alpha2。此版本包含大量错误修复、改进、重大更改以及一些新功能,包括但不限于:新的 ExtractRecordChanges 单条消息转换,Quarkus 的 Debezium Outbox 扩展的 Reactive 实现,Apache RocketMQ 的 Debezium Storage 模块,以及更多内容。让我们花点时间深入了解这些新功能、改进和重大更改。

重大变更

我们通常会尽量避免任何破坏性更改,即使是在小版本发布中;但是,在某些情况下,破坏性更改是不可避免的。Debezium 2.2.0.Alpha2 包含三项破坏性更改:

Topic / Schema 命名更改

Debezium 之前通过使用下划线(_)替换非 ASCII 字符来清理 Topic 和 Schema 名称,以避免在使用 Schema 注册表时出现不支持的 Topic 或 Schema 名称。但是,如果这个非 ASCII 字符是两个相似的 Topic 或 Schema 名称之间唯一的区别(它们仅在大小写方面有所不同),这会导致其他问题。

为了以最兼容的方式解决此问题,Debezium 现在使用基于策略的方法来唯一地映射字符。作为此更改的副作用,sanitize.field.names 配置属性已被弃用,并被这个新的基于策略的方法所取代。

每个连接器支持两个配置属性来控制此行为:

schema.name.adjustment.mode

指定应如何调整 Schema 名称以兼容消息转换器。

field.name.adjustment.mode

指定应如何调整字段名称以兼容消息转换器。

这两个连接器配置属性支持三种模式:

none

不对 Schema 或字段名称进行调整,按原样传递。

avro

使用下划线(_)替换无法在 Avro 中使用的字符。

avro_unicode

使用基于 Unicode 的字符替换下划线(_)和无法在 Avro 中使用的字符。

这现在允许您根据表或集合的命名约定选择最合适的策略。

Oracle 连接器的 Source Info Block 更改

所有与插入、更新和删除相关的 Debezium 更改事件都包含一个 source 信息块在事件的 payload 中。对于 Oracle 连接器,这个块包含一个名为 ssn 的特殊字段,它代表这个更改的 SQL 序列号。

已发现一些边缘情况,其中从数据库为此字段获取的值可能超过 2,147,483,647 的最大值,或 INT32 数据类型的最大值。为了修复这个边缘情况,我们已将数据类型从 INT32 更改为 INT64,这将允许最大值为 9,223,372,036,854,775,807

此更改应该是完全无损的,但我们想提请注意,如果您有管道可能会将此值存储在接收系统,或者您正在使用 Schema 注册表,则需要注意。

Debezium Server 已移至新仓库

Debezium Server 是一个独立的、基于 Quarkus 的运行时,用于 Debezium 源连接器,能够与 EventHubs、PubSub、Pulsar、Redis 和 Kafka 等各种平台集成。在此版本中,我们将 Debezium Server 的相关代码移至其自己的 GitHub 仓库

此更改是为了支持构建 Debezium Server 以包含不属于主 Debezium 仓库的连接器,例如 Db2、Google Spanner、Cassandra 4 和 Vitess。因此,这意味着从这个版本开始,Debezium Server 默认将包含所有连接器(不包括 Cassandra 3)。

Cassandra 3 被排除的原因是类加载存在一些技术限制,导致与 Cassandra 4 发生冲突。我们已意识到此问题,并计划将来提供解决方案以包含 Cassandra 3。

新的 ExtractChangedRecordState SMT

我们从社区多次听到,希望有一种开箱即用的方法来确定 Debezium 更改事件中哪些值已更改。新的单消息转换 (SMT) ExtractChangedRecordState 旨在通过向事件添加元数据来标识哪些字段已更改或未更改,以满足此请求。

要开始使用此新转换,请将其配置为连接器配置的一部分:

transforms=changes
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState
transforms.changes.header.changed=ChangedFields
transforms.changes.header.unchanged=UnchangedFields

此转换可以配置为通过设置 header.changed 来显示已更改的字段,通过设置 header.unchanged 来显示未更改的字段,或者通过同时设置这两个属性(如上所示)来显示两者。转换将添加一个具有指定名称的新头,其值将包含一个字段名称集合,具体取决于您是配置了更改、非更改还是两者都配置。

使用 ExtractNewRecordState SMT 删除字段

ExtractNewRecordState 单消息转换在您需要以“扁平化”格式消费 Debezium 更改事件的情况下非常有用。此 SMT 在此版本中已得到更改,增加了从事件的 payload 和 key 中删除字段的功能。

此新功能为转换引入了三个新的配置属性:

drop.fields.header.name

用于列出源消息中要删除的字段名称的 Kafka 消息头名称。

drop.fields.from.key

指定是否也从 key 中删除字段,默认为 false

drop.fields.keep.schema.compatible

指定是否删除可选字段,默认为 true

使用 Avro 时,Schema 兼容性至关重要。这就是为什么我们选择默认强制执行 Schema 兼容性的原因。如果一个字段被配置为删除,但它不是可选的,除非禁用 Schema 兼容性,否则该字段不会从 key 或 payload 中移除。

这些新的配置选项允许以令人兴奋的方式来操作更改事件。例如,要仅发出已更改字段的事件,将 ExtractNewRecordState 与新的 ExtractChangedRecordState 转换配对可以使其极其简单和直接。仅发出更改列的示例配置如下所示:

transforms=changes,extract
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState
transforms.changes.header.unchanged=UnchangedFields
transforms.extract.type=io.debezium.transforms.ExtractNewRecordState
transforms.extract.drop.fields.header.name=UnchangedFields

上述配置将明确不包含事件 payload 值中的未更改字段。如果 key 中的字段未更改,它将不受影响,因为 drop.fields.from.key 的默认值仍为 false。最后,如果事件 payload 中的某个字段因未更改而被删除,但它不是可选的,那么它将继续包含在转换的输出事件中,以符合 Schema 兼容性。

响应式 Debezium Outbox Quarkus 扩展

Outbox 模式是许多微服务用来在微服务边界之间共享数据的一种方法。我们在 2020 年初的 Debezium 1.1 中引入了 Debezium Outbox Quarkus 扩展,它允许 Quarkus 用户轻松地使用 Debezium 来利用 Outbox 模式。

感谢 Ingmar Fjolla,Debezium 2.2.0.Alpha2 包含了一个新的基于响应式的 Debezium Outbox Quarkus 扩展实现。这个新实现基于 Vert.x 和 Hibernate Reactive,提供了使用 Debezium 的 Outbox 模式的完全异步解决方案。

此新扩展将在本季度末或第二季度初包含在 Quarkus Platform 版本中。但是,如果您今天就想开始使用它,可以使用以下坐标轻松将其直接添加到项目的配置中:

Maven 坐标
<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-quarkus-outbox-reactive</artifactId>
  <version>2.2.0.Alpha2</version>
</dependency>
Gradle 坐标
io.debezium:debezium-quarkus-outbox-reactive:2.2.0.Alpha2

新的 RocketMQ Schema History 存储

Debezium 的新存储 API 在过去一年中取得了巨大成功。我们最初从最初的文件和基于 Kafka 的偏移量和 Schema history 存储实现开始,但后来已扩展到支持在 Amazon S3 和 Redis 等其他平台上存储 Schema history。

此版本继续扩展这一点,为 RocketMQ 添加了一个新的 Schema History 存储实现。为了开始将您的 Schema History 存储到 RocketMQ,debezium-storage-rocketmq 依赖项必须首先在类路径中,并且可供连接器运行时访问。

一旦依赖项存在,仅剩的步骤将是配置 Schema History 连接器配置。以下示例显示了 RocketMQ Schema History 的基本用法:

schema.history.internal.rocketmq.topic=schema-history
schema.history.internal.rocketmq.name.srv.addr=172.17.15.2
schema.history.internal.rocketmq.acl.enabled=true
schema.history.internal.rocketmq.access.key=<rocketmq-access-key>
schema.history.internal.rocketmq.secret.key=<rocketmq-secret-key>
schema.history.internal.rocketmq.recovery.attempts=5
schema.history.internal.rocketmq.recovery.poll.interval.ms=1000
schema.history.internal.rocketmq.store.record.timeout.ms=2000
schema.history.internal.rocketmq.topic

指定 Schema History 将被存储的主题名称。

schema.history.internal.rocketmq.name.srv.addr

指定 RocketMQ 的服务发现服务名称服务器。

schema.history.internal.rocketmq.acl.enabled

指定是否启用了访问控制列表 (ACL),默认为 false

schema.history.internal.rocketmq.access.key

指定 RocketMQ 访问密钥,仅在启用 ACL 时需要。

schema.history.internal.rocketmq.secret.key

指定 RocketMQ 密钥,仅在启用 ACL 时需要。

schema.history.internal.rocketmq.recovery.attempts

指定在恢复完成之前,没有数据返回的连续尝试次数。

schema.history.internal.rocketmq.recovery.poll.interval.ms

指定每次轮询尝试以毫秒为单位的时间,以恢复历史记录。

schema.history.internal.rocketmq.store.record.timeout.ms

指定写入 RocketMQ 完成之前的时间(以毫秒为单位),否则超时。

其他修复

此版本中还有许多其他改进、错误修复和稳定性更改,其中一些值得注意的包括

  • 更好地控制 Debezium GTID 的使用 DBZ-2296

  • mysql bigint 的数据类型转换失败 DBZ-5798

  • ActivateTracingSpan 报告了错误的 timestamp DBZ-5827

  • 如果名称包含反斜杠 \,则无法指定列或表的包含列表 DBZ-5917

  • debezium-connector-cassandra 2.1.0.Alpha2 插件已无法“开箱即用” DBZ-5925

  • MongoDB 增量快照无法正常工作 DBZ-5973

  • 可为空的列在 DDL 事件中被标记为“optional: false” DBZ-6003

  • 升级到 Quarkus 2.16.0.Final DBZ-6005

  • Vitess: 处理当前 db shards 和持久化 shards 之间的 shard 列表差异 DBZ-6011

  • 在 Postgres 连接器发生错误时,偏移量未刷新到 connect offsets topic DBZ-6026

  • TIME 列的意外格式:8:00 DBZ-6029

  • Oracle 不支持 LOB 存储子句之后的压缩/日志记录子句 DBZ-6031

  • Debezium-server Pulsar 支持非默认租户和命名空间 DBZ-6033

  • Debezium 正在记录包含错误信息的完整消息 DBZ-6037

  • 提高从 Kafka 恢复内部模式历史的弹性 DBZ-6039

  • Vitess:支持将无符号 bigint MySQL 列类型映射到 long DBZ-6043

  • 增量快照将来自信号数据库的事件发送到 Kafka DBZ-6051

  • 升级 Kafka 至 3.3.2 DBZ-6054

  • 在日志语句中屏蔽密码 DBZ-6064

  • 加载自定义偏移存储时,由于 Class not found 错误而失败 DBZ-6075

  • 将 query.fetch.size 默认值提高到零以上的某个合理值 DBZ-6079

  • 如果数据库数量小于 maxTasks,SQL Server 任务将失败 DBZ-6084

  • 使用 LOB 支持时,对多行的 UPDATE 可能导致事件数据不一致 DBZ-6107

  • 在 CloudEvents 消息 ID 中公开 sequence 字段 DBZ-6089

  • 如果事务不包含与捕获表相关的事件,则减少跳过事务的详细程度 DBZ-6094

  • 升级 Kafka 客户端至 3.4.0 DBZ-6102

下一步是什么?

Debezium 2.2 的开发周期还处于非常早期的阶段,还有许多其他功能正在开发中,包括:

  • 可配置的信号通道,使用户不仅可以从数据库表或 Kafka 主题发送信号,还可以从其他方式发送,例如 HTTP 端点、文件系统等。

  • Debezium JDBC 接收器连接器,开箱即用地支持原生的 Debezium 更改事件,无需使用事件扁平化转换。

  • 以及大量的 Debezium UI 增强功能。

我们已经进入本季度中期,Debezium 2.2 即将进入 Beta 阶段。我们非常希望听到您对路线图、本次发布的更改、尚未完成的更改或我们未提及的任何内容的反馈或建议。如果有,请务必通过 邮件列表 或我们的 聊天 与我们联系。

另外,请关注我们的 2023 年第一期通讯,以及博客系列“Debezium for Oracle”的即将推出的最终篇,其中我将介绍 Oracle 连接器的性能、调试和常见问题解答。

下次再见……

Chris Cranford

Chris 是 IBM 的一名软件工程师,之前在 Red Hat 工作,他致力于 Debezium 项目,并每天都在深入研究 Oracle 和 Change Data Capture 的各个方面。他此前曾从事 Hibernate(领先的开源 JPA 持久化框架)方面的工作,并且继续为 Quarkus 做贡献。Chris 居住在美国北卡罗来纳州。

   


关于 Debezium

Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。

参与进来

我们希望您觉得 Debezium 有趣且有用,并希望尝试一下。在 Twitter @debezium 上关注我们,在 Zulip 上与我们聊天,或加入我们的 邮件列表 与社区交流。所有代码都在 GitHub 上开源,因此请在本地构建代码,帮助我们改进现有连接器并添加更多连接器。如果您发现问题或有改进 Debezium 的想法,请告诉我们或 记录一个问题

版权所有 © Debezium 及其作者。保留所有权利。有关我们的商标详情,请访问我们的 商标政策商标列表。第三方商标属于其各自所有者,在此提及并不表示任何认可或关联。
×