我很荣幸地宣布,不仅是 Debezium 2.2 系列的第一个版本,也是 Debezium 在 2023 年的第一个版本,2.2.0.Alpha!
Debezium 2.2.0.Alpha1 版本包含一些重大更改、许多错误修复以及一些值得注意的改进和功能,包括但不限于:
-
[重大更改] -
ZonedTimestamp值将不再截断小数秒。 -
[新功能] - 支持从 Oracle 逻辑备用数据库摄取更改
-
[新功能] - 使用 Debezium Storage API 支持 Amazon S3 存储桶
-
[新功能] - 支持在连接器启动期间重试数据库连接
-
[新功能] - Debezium Server sink 连接器支持 Apache RocketMQ 和 Infinispan
让我们花一些时间来更详细地了解其中的一些功能!
重大更改
在 DBZ-5996 中报告了一个边缘情况,如果一个时间列使用了 ZonedTimestamp,并且该列的值包含 0 微秒或纳秒,那么该值将不会被发出为 2023-01-19T12:30:00.123000Z,而是以截断的方式发出为 2023-01-19T12:30:00.123Z。这可能导致事件管道中使用的转换器出现其他问题,因为该列的输出格式可能不一致。
为了解决这个边缘情况,ZonedTimestamp 的实现现在将使用源数据库列的长度/精度来填充该列值的基于小数的秒数。以上面 TIMESTAMP(6) MySQL 列类型为例,发出的值现在将正确地反映为 2023-01-19T12:30:00.123000Z。
虽然这个行为变化可能对大多数用户影响很小,但我们想提请注意,以防您在管道中通过其他方式处理过此边缘情况。如果您这样做过,您应该可以依赖 Debezium 来一致地发出值,即使小数秒为 0。
从 Oracle 逻辑备库摄取变更
Debezium 的 Oracle 连接器通常会管理一个所谓的“*flush table*”(刷新表),这是一个内部表,用于管理 Oracle 日志写入器(LGWR)进程使用的刷新周期。此刷新过程要求连接器使用的用户账户拥有创建和写入此表的权限。逻辑备库数据库通常对数据操作有更严格的规则,甚至可能是只读的,因此,写入数据库是不受欢迎的,甚至是不可允许的。
为了支持 Oracle 只读逻辑备库数据库,我们引入了一个标志来禁用此“*flush table*”的创建和管理。此功能可与 Oracle 单机版和 Oracle RAC 安装一起使用,目前被视为实验性功能,这意味着它将来可能会发生变化。
为了启用 Oracle 只读逻辑备库支持,请添加以下连接器选项:
internal.log.mining.read.only=true 在未来的版本中,我们计划添加对 Oracle 只读物理备库数据库的支持。
| 此配置选项以 |
使用 Amazon S3 存储桶和 Storage API
Debezium 提供了一个 Storage API 框架,该框架允许连接器将偏移量和模式历史状态存储在各种持久化数据存储中。此外,该框架还允许贡献者通过添加新的存储实现来轻松扩展 API。目前,Storage API 框架支持本地文件系统、Kafka Topic 或 Redis 数据存储。
随着 Debezium 2.2 的发布,我们很高兴地将 Amazon S3 存储桶添加到该框架中,从而允许将模式历史持久化到 S3 存储桶。一个使用 S3 的示例连接器配置可能如下所示:
...
schema.history.internal=io.debezium.storage.s3.history
schema.history.internal.s3.access.key.id=aa
schema.history.internal.s3.secret.access.key=bb
schema.history.internal.s3.region.name=aws-global
schema.history.internal.s3.bucket.name=debezium
schema.history.internal.s3.object.name=db-history.log
schema.history.internal.s3.endpoint=http://<server>:<port> schema.history.internal.s3.access.key.id-
指定了向 S3 进行身份验证所需的访问密钥。
schema.history.internal.s3.secret.access.key-
指定了向 S3 进行身份验证所需的秘密访问密钥。
schema.history.internal.s3.region.name-
指定了 S3 存储桶所在的区域。
schema.history.internal.s3.bucket.name-
指定了要将模式历史持久化到的 S3 存储桶的名称。
schema.history.internal.s3.object.name-
指定了要将模式历史持久化到的存储桶中的对象名称。
schema.history.internal.s3.endpoint-
指定 S3 端点的格式为
http://<server>:<port>;。
启动时重试数据库连接
在 Debezium 的先前版本中,连接器启动阶段采用了快速失败策略。简单来说,这意味着如果我们无法连接、进行身份验证或执行连接器所需的任何启动阶段步骤,连接器将进入 FAILED 状态。
用户的一个特定问题区域是,如果连接器正常启动、运行一段时间,然后最终遇到一些致命错误。如果错误与连接器启动生命周期中未访问的资源有关,连接器通常会正常重启。然而,如果问题与数据库的可用性有关,并且在连接器启动阶段数据库仍然不可用,情况就会不同。在这种情况下,连接器将快速失败,并进入 FAILED 状态,需要手动干预。
快速失败的方法多年来一直很好地服务于 Debezium,但在资源可能毫无征兆地出现又消失的世界里,显然需要进行更改来提高 Debezium 的可靠性和弹性。虽然 Kafka Connect 的重试/退避框架在这方面有所帮助,但它并不能解决当前代码编写方式下启动资源不可用的问题。
Debezium 2.2 改变了这种格局,略微改变了我们与 Kafka Connect 源连接器 API 的集成方式。我们没有在启动生命周期中访问可能不可用的资源,而是将其访问移到了连接器生命周期的稍后阶段。实际上,Debezium 的启动代码是惰性执行的,它访问可能不可用的资源,这使我们能够利用 Kafka Connect 的重试/退避框架,即使在我们的启动代码中。简而言之,如果在连接器启动期间数据库仍然不可用,并且启用了 Kafka Connect 重试,连接器将继续重试/退避。只有当达到最大重试次数或发生不可重试错误时,连接器任务才会进入 FAILED 状态。
我们希望这能为 Debezium 带来更高的可靠性和弹性,改进在不断变化的环境中处理错误的方式,并为管理连接器生命周期提供坚实的基础。
Debezium Server 中的 RocketMQ 和 Infinispan 支持
Debezium Server 是一个基于 Quarkus 的框架,允许从命令行执行 Debezium 连接器,无需 Kafka 或 Kafka Connect,从而可以将 Debezium 变更事件传递到任何目标框架。通过 Debezium 2.2,Debezium Server 中添加了两个新的接收器连接器,用于支持将变更事件发送到 Apache RocketMQ 和 Infinispan。
RocketMQ
Apache RocketMQ 是一个云原生消息、事件和流式实时数据处理平台,涵盖云-边缘-设备协作场景。为了将 Debezium Server 与 RocketMQ 集成,必须修改 Debezium Server application.properties 文件,添加以下条目:
debezium.sink.type=rocketmq
debezium.sink.rocketmq.producer.name.srv.addr=<hostname>:<port>
debezium.sink.rocketmq.producer.group=debezuim-group
debezium.sink.rocketmq.producer.max.message.size=4194304
debezium.sink.rocketmq.producer.send.msg.timeout=3000
debezium.sink.rocketmq.producer.acl.enabled=false
debezium.sink.rocketmq.producer.access.key=<access-key>
debezium.sink.rocketmq.producer.secret.key=<secret-key> 上面的配置指定要使用的接收器类型为 rocketmq,这启用了 RocketMQ 模块的使用。以下是对上面显示的所有属性的描述:
debezium.sink.rocketmq.producer.name.srv.addr-
指定 Apache RocketMQ 可用的主机和端口。
debezium.sink.rocketmq.producer.group-
指定与 Apache RocketMQ 生产者组关联的名称。
debezium.sink.rocketmq.producer.max.message.size-
(可选) 指定消息的最大字节数。默认为
4193404(4MB)。 debezium.sink.rocketmq.producer.send.msg.timeout-
(可选) 指定发送消息的超时时间(以毫秒为单位)。默认为
3000(3秒)。 debezium.sink.rocketmq.producer.acl.enabled-
(可选) 控制是否启用访问控制列表。默认为
false。 debezium.sink.rocketmq.producer.access.key-
(可选) 用于连接 Apache RocketMQ 集群的访问密钥。
debezium.sink.rocketmq.producer.secret.key-
(可选) 用于连接 Apache RocketMQ 集群的访问密钥。
有关使用 Debezium Server 和 RocketMQ 的更多信息,请参阅文档。
Infinispan
Infinispan 是一个内存式、分布式数据存储,提供灵活的部署选项和强大的功能来存储、管理和处理数据。Infinispan 基于键值存储的概念,允许存储任何数据类型。为了将 Debezium Server 与 Infinispan 集成,必须修改 Debezium Server application.properties 文件,添加以下条目:
debezium.sink.type=infinispan
debezium.sink.infinispan.server.host=<hostname>
debezium.sink.infinispan.server.port=<port>
debezium.sink.infinispan.cache=<cache-name>
debezium.sink.infinispan.user=<user>
debezium.sink.infinispan.password=<password> 上面的配置指定要使用的接收器类型为 infinispan,这启用了 Infinispan 模块的使用。以下是对上面显示的所有属性的描述:
debezium.sink.infinispan.server.host-
指定 Infinispan 集群中某个服务器的主机名。此配置选项也可以提供逗号分隔的主机名列表,例如
hostname1,hostname2。 debezium.sink.infinispan.server.port-
指定 Infinispan 集群的端口。默认为
11222。 debezium.sink.infinispan.cache-
指定要将变更事件写入的 Infinispan 缓存的名称。
| Infinispan 接收器要求提前手动创建缓存。这使得能够创建具有任何变量配置的缓存,以满足您的需求。 |
debezium.sink.infinispan.user-
一个可选配置,用于指定用户以进行身份验证,如果需要身份验证。
debezium.sink.infinispan.password-
一个可选配置,用于指定正在验证的用户的密码,如果需要身份验证。
有关使用 Debezium Server 和 Infinispan 的更多信息,请参阅文档。
其他修复
此版本中修复了许多错误并进行了稳定性改进,其中一些值得注意的包括:
-
从 MySQL 连接器中移除指定驱动程序类的选项 DBZ-4663
-
Debezium 与 Apicurio 和自定义信任库不兼容 DBZ-5282
-
连接器视图详细信息屏幕上的显示/隐藏密码不起作用 DBZ-5322
-
Oracle 无法撤销更改 DBZ-5907
-
Postgresql 重启时数据丢失 DBZ-5915
-
为 Debezium Server 添加 Connect Headers 支持 DBZ-5926
-
Oracle 多线程导致数据丢失 DBZ-5945
-
Spanner 连接器缺少 JSR-310 依赖项 DBZ-5959
-
截断与 ExtractNewRecordState 不兼容的记录 DBZ-5966
-
计算出的分区不能为负数 DBZ-5967
-
针对 snapshot.select.statement.overrides 表的表大小日志消息不正确 DBZ-5985
-
在 exclude.tables 配置时,执行快照信号时出现 NPE,因为提供了错误的表名 DBZ-5988
-
Postgresql 连接器解析 money 类型边界值时出现问题 DBZ-5991
-
MySQL 数据库模式中无法解析的 DDL 语句的日志语句包含占位符 DBZ-5993
-
Postgresql 连接器将 money 类型中的 null 解析为 0 DBZ-6001
-
Postgres LSN 检查应遵循 event.processing.failure.handling.mode DBZ-6012
总共有 42 个问题在此版本中得到修复。非常感谢社区所有为本次发布做出贡献的贡献者:Akshansh Jain、Gabor、Anil Dasari、Animesh Kumar、Anisha Mohanty、Bob Roldan、Chris Cranford、Erdinç Taşkın、Govinda Sakhare、Harvey Yue、Hossein Torabi、Indra Shukla、Jakub Zalas、Jeremy Ford、Jiri Pechanec、Jochen Schalanda、Luca Scannapieco、Mario Fiore Vitale、Mark Lambert、Rajendra Dangwal、Sun Xiao Jian、Vojtech Juranek、Yohei Yoshimuta,以及 yohei yoshimuta!
下一步是什么?
Debezium 2.2 的发布之路才刚刚开始,这个初始版本涵盖了我们在最近的 2023 年 路线图更新中概述的大部分功能。但是,仍有许多功能仍在积极开发中,包括:
-
可配置的信号通道,允许用户不仅从数据库表或 Kafka 主题发送信号,还可以从其他方式发送,例如 HTTP 端点、文件系统等。
-
Debezium JDBC 接收器连接器,支持开箱即用的原生 Debezium 变更事件,无需使用 Event Flattening 转换。
-
一个名为
ExtractChangedRecordState的新单一消息转换,它支持向发出的事件添加描述源事件已更改或未更改字段的头部。 -
以及 Debezium UI 的大量增强功能。
随着我们继续开发 Debezium 2.2 并修复 Debezium 2.1 的错误,我们希望听到您的反馈或建议,无论它们是关于我们的路线图、此版本的更改,还是您希望看到但我们尚未提及的内容。请务必在 邮件列表 或我们的 聊天 上与我们联系。如果您只是想来打个招呼,我们也欢迎。
下次再见!
关于 Debezium
Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。