今天,我们怀着极大的喜悦宣布 Debezium 2.2.0.Final 的可用性!
许多人可能已经注意到,此发布周期比我们传统的三个月要长一些。虽然我们通常更喜欢保持我们一贯的发布周期,但这次调整给我们提供了一个独特的机会,发布包含大量新功能和错误修复的 Debezium 2.2,同时还对几个核心组件进行了重大升级。
目录
重大变更
在深入了解新增和变更内容之前,让我们花点时间讨论一下 Debezium 2.2 版本中的几项重大变更。
ZonedTimestamp 截断
在 DBZ-5996 中报告了一个边缘情况,如果一个时间列使用了 ZonedTimestamp,并且该列的值包含 0 微秒或纳秒,那么该值将不会被发出为 2023-01-19T12:30:00.123000Z,而是以截断的方式发出为 2023-01-19T12:30:00.123Z。这可能导致事件管道中使用的转换器出现其他问题,因为该列的输出格式可能不一致。
为了解决这个边缘情况,ZonedTimestamp 的实现现在将使用源数据库列的长度/精度来填充该列值的基于小数的秒数。以上面 TIMESTAMP(6) MySQL 列类型为例,发出的值现在将正确地反映为 2023-01-19T12:30:00.123000Z。
虽然这个行为变化可能对大多数用户影响很小,但我们想提请注意,以防您在管道中通过其他方式处理过此边缘情况。如果您这样做过,您应该可以依赖 Debezium 来一致地发出值,即使小数秒为 0。
Topic 和 Schema 命名变更
Debezium 之前会通过使用下划线 (_) 来替换非 ASCII 字符,从而对 Topic 和 Schema 名称进行清理,以避免在使用 Schema 注册表时出现不支持的 Topic 或 Schema 名称。然而,如果此非 ASCII 字符是两个相似 Topic 或 Schema 名称之间唯一的区别,而其他方面仅因大小写而异,则会导致其他问题。
为了以最兼容的方式解决此问题,Debezium 现在使用基于策略的方法来唯一地映射字符。作为此变更的附带效果,sanitize.field.names 配置属性已被弃用,并被此新的基于策略的方法取代。
每个连接器支持两个配置属性来控制此行为。
schema.name.adjustment.mode-
指定如何调整 Schema 名称以兼容消息转换器。
field.name.adjustment.mode-
指定如何调整字段名称以兼容消息转换器。
这两个连接器配置属性支持三种模式:
none-
不对 Schema 或字段名称进行任何调整,直接传递。
avro-
用下划线 (
_) 替换不能在 Avro 中使用的字符。 avro_unicode-
用基于 Unicode 的字符替换下划线 (
_) 和不能在 Avro 中使用的字符。
现在,您可以根据表或集合的命名约定选择最合适的策略。
Oracle 源信息块变更
所有与插入、更新和删除相关的 Debezium 变更事件都在事件的 payload 中包含一个 source 信息块。对于 Oracle 连接器,此块包含一个名为 ssn 的特殊字段,表示此变更的 SQL 序列号。
已发现存在一些极端情况,从数据库中为此字段获取的值可能会超过 2,147,483,647 的最大值,或者 INT32 数据类型的最大值。为解决此极端情况,我们将数据类型从 INT32 更改为 INT64,允许的最大值为 9,223,372,036,854,775,807。
此变更应该是完全无损的,但我们希望引起您的注意,以防您的管道可能正在将此值存储在接收端系统或您正在使用 Schema 注册表。
Debezium Server 已迁移到新仓库
Debezium Server 是一个基于 Quarkus 的独立运行时,用于 Debezium 源连接器,支持与 EventHubs、PubSub、Pulsar、Redis 和 Kafka 等各种平台集成。在此版本中,我们将 Debezium Server 相关的代码迁移到了其自身的 GitHub 仓库。
之所以需要此更改,是为了支持构建 Debezium Server 以包含不属于 Debezium 主仓库的连接器,例如 Db2、Google Spanner、Cassandra 4 和 Vitess。因此,这意味着从这个版本开始,Debezium Server 现在默认包含所有连接器(不包括 Cassandra 3)。
| Cassandra 3 被排除在外,原因是类加载方面的一些技术限制导致与 Cassandra 4 发生冲突。我们已知晓此问题,并计划将来提供包含 Cassandra 3 的解决方案。 |
停止向 docker.io 发布容器镜像
Debezium 计划于 2023 年 6 月停止向 docker.io 发布容器镜像。有些人可能知道 Docker 最近关于缩减其免费组织计划的政策变更,而 Debezium 等许多开源项目都使用了该计划。
虽然 Docker 撤回了其决定,但这确实引发了一个关于未来是否会再次发生这种情况的问题。Debezium 长期以来一直同时向 docker.io 和 quay.io 发布容器制品,我们计划在即将到来的季度中,在 Debezium 2.3 的 **预览** 版本中继续这样做。
然而,自 2023 年 6 月底发布 **Debezium 2.3.0.Final** 起,Debezium 将停止向 docker.io 发布容器制品,今后将仅向 quay.io 发布容器镜像。
有什么新内容?
Debezium 2.2 包含大量新功能,其中最值得注意的是:
-
容器
-
核心
-
连接器
-
JDBC
-
Oracle
-
Spanner
-
-
Debezium 服务器
-
出库 Quarkus 扩展
-
Storage API
Jolokia 支持
Jolokia 是一个 JMX-HTTP 桥,提供了使用 JSR-160 收集指标的替代方案。它是一种基于代理的方法,通过引入批量请求和细粒度安全策略等独特功能来改进传统的 JMX。
在 Debezium 2.2 中,debezium/connect 镜像现在捆绑了 Jolokia,但此代理默认未启用。为了启用 Jolokia 支持,必须以 ENABLE_JOLOKIA 设置为 true 的方式启动容器。默认情况下,Jolokia 在启用时将绑定到端口 8778。
如果需要不同的端口,则需要以不同的方式启用 Jolokia。例如,要使用端口 9779 启用 Jolokia,请不要设置 ENABLE_JOLOKIA,而是按如下方式配置 KAFKA_OPTS 环境变量。
-e KAFKA_OPTS="-javaagent:$(ls "$KAFKA_HOME"/libs/jolokia-jvm-*.jar)=port=9779,host=*" 通过指定上述环境变量,Jolokia 的 JMX-HTTP 桥将在容器的端口 9779 上可用。
| 启动容器时,请不要忘记将 Jolokia 端口添加到容器的暴露端口列表中。 |
连接器启动时重试数据库连接
在 Debezium 的先前版本中,连接器启动阶段采用的是快速失败策略。简单来说,这意味着如果我们无法连接、进行身份验证或执行连接器所需的任何启动阶段步骤,连接器将进入 FAILED 状态。
用户的一个特定问题区域是,如果连接器正常启动,运行一段时间,然后最终遇到某些致命错误。如果错误与数据库的可用性有关,并且在连接器启动阶段数据库仍然不可用,连接器通常会正常重启。但是,如果问题与数据库的可用性有关,并且在连接器启动阶段数据库仍然不可用,情况则不同。在这种情况下,连接器将快速失败,并进入 FAILED 状态,需要手动干预。
快速失败方法多年来一直很好地服务于 Debezium,但在资源可能无预警地出现和消失的世界中,很明显需要进行更改来提高 Debezium 的可靠性和弹性。虽然 Kafka Connect 的重试/回退框架在这方面有所帮助,但这并不能解决由于当前代码编写方式而导致的启动资源不可用的问题。
Debezium 2.2 改变了这种状况,略微改变了我们与 Kafka Connect 源连接器 API 的集成方式。不再在启动生命周期中访问可能不可用的资源,我们将该访问移到了连接器生命周期的稍后阶段。实际上,Debezium 启动代码被惰性执行,访问可能不可用的资源,这使我们能够即使在启动代码中也能利用 Kafka Connect 的重试/回退框架。简而言之,如果在连接器启动期间数据库仍然不可用,并且启用了 Kafka Connect 重试,连接器将继续重试/回退。只有当达到最大重试次数或发生不可重试错误时,连接器任务才会进入 FAILED 状态。
我们希望这能为 Debezium 体验带来更高的可靠性和弹性,改善在不断变化的环境中错误的处理方式,并为管理连接器生命周期提供坚实的基础。
ExtractNewRecordState 单消息转换
我们从社区多次听到,能够以开箱即用的方式确定 Debezium 变更事件中哪些值已更改,这将是非常棒的。新的单消息转换 (SMT) ExtractChangedRecordState 旨在通过向事件添加元数据来标识哪些字段已更改或未更改,从而满足此请求。
要开始使用此新转换,请将其配置为连接器配置的一部分。
transforms=changes
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState
transforms.changes.header.changed=ChangedFields
transforms.changes.header.unchanged=UnchangedFields 此转换可以配置为通过设置 header.changed 来显示哪些字段已更改,通过设置 header.unchanged 来显示哪些字段未更改,或者通过同时设置这两个属性(如上所示)来同时显示两者。转换将添加一个具有指定名称的新头,其值将包含一个字段名称集合,具体取决于您是配置了更改、非更改还是两者都配置。
使用 ExtractNewRecordState 单消息转换删除字段
ExtractNewRecordState 单消息转换在需要以“展平”格式消费 Debezium 变更事件的情况下非常有用。此版本对 SMT 进行了更改,增加了从事件的 payload 和消息键中删除字段的功能。
此新功能为该转换引入了三个新的配置属性:
drop.fields.header.name-
用于列出要删除的源消息中的字段名称的 Kafka 消息头名称。
drop.fields.from.key-
指定是否也从键中删除字段,默认为
false。 drop.fields.keep.schema.compatible-
指定是否删除仅可选的字段,默认为
true。
| 使用 Avro 时,Schema 兼容性至关重要。这就是我们选择默认强制执行 Schema 兼容性的原因。如果某个字段被配置为删除,但它不是可选的,那么除非禁用 Schema 兼容性,否则该字段不会从键或 payload 中删除。 |
这些新配置选项允许以一些令人兴奋的方式来操作变更事件。例如,要仅发出已更改字段的事件,将 ExtractNewRecordState 与新的 ExtractChangedRecordState 转换配对可以非常简单直接。仅发出已更改列的示例配置如下所示:
transforms=changes,extract
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState
transforms.changes.header.unchanged=UnchangedFields
transforms.extract.type=io.debezium.transforms.ExtractNewRecordState
transforms.extract.drop.fields.header.name=UnchangedFields 上述配置将明确不包含事件 payload 值中的未更改字段。如果键中的某个字段未更改,则不会受到影响,因为 drop.fields.from.key 保持其默认值 false。最后,如果事件 payload 中的某个字段由于未更改而要被删除,但它不是可选的,为了符合 Schema 兼容性,它将继续包含在转换的输出事件中。
并行快照
Debezium 的关系数据库初始快照过程一直是单线程的。此限制主要源于确保跨多个事务的数据一致性的复杂性。
从 Debezium 2.2 开始,我们正在添加一种新的、最初可选的方式来利用多线程执行连接器的一致数据库快照。此实现使用这些多线程并行执行表级快照。
要利用此新功能,请在连接器配置中指定 snapshot.max.threads,当此属性的值大于 1 时,将使用并行快照。
snapshot.max.threads=4 在上面的示例中,如果连接器需要快照超过 4 张表,则最多会有 4 张表并行快照。当一个线程完成处理一张表后,它将从队列中获取一张新表进行快照,然后继续进行,直到所有表都完成快照。
| 此功能被认为是 **孵化中** 的,但我们强烈建议新连接器部署尝试此功能。我们欢迎您提供任何关于如何改进此功能的反馈。 |
使用代理键的增量快照
Debezium 的增量快照功能取得了巨大成功。它提供了一种高效的方式来执行可恢复的一致快照,这对于快照包含大量数据至关重要。
然而,增量快照在使用前有特定的要求。其中一项要求是所有被快照的表都必须使用主键。您可能会问,为什么表没有主键,我们今天不在这里讨论;但 suffice to say,这种情况比您想象的更常见。
在 Debezium 2.2 中,只要有一个列是唯一的,并且可以被视为用于增量快照目的的“代理键”,就可以对无键表执行增量快照。
| MongoDB 不支持代理键功能;仅支持关系型连接器。 |
为了在增量快照信号中提供代理键列数据,信号的 payload 必须包含新的代理键属性 surrogate-key。
{
"data-collections": [ "public.mytab" ],
"surrogate-key": "customer_ref"
} 在上面的示例中,将为表 public.mytab 启动增量快照,并且增量快照将使用 customer_ref 列作为生成快照窗口的主键。
| 代理键不能使用多个列定义,只能使用 **单个** 列。 |
然而,代理键功能不仅适用于没有主键的表。在使用具有主键的表时,此功能还有一系列优点:
-
一个明显的优点是当表的主键由多个列组成时。查询为每个主键列生成一个析取谓词,其性能高度依赖于环境。将列数减少到一列通常可以普遍提高性能。
-
另一个优点是当代理键基于数字数据类型,而主键列基于字符类型数据类型时。关系型数据库通常比字符比较更有效地执行谓词评估。在这种情况下,通过调整查询以使用数字数据类型,可以提高查询性能。
Quarkus 3 支持
Quarkus 是一个 Kubernetes 原生 Java 栈,它结合了最好的 Java 库来创建快速、低占用的应用程序。Debezium Server 运行时基于 Quarkus,Debezium UI 的一部分也是如此。此外,Debezium Outbox 扩展也基于 Quarkus 平台。
升级到 Quarkus 3 带来了许多改进,包括使用最新的稳定版 Java 库,以及从 Java EE 迁移到 Jakarta EE。如果您不熟悉这次迁移,以前大多数 Java EE 平台类都包含在 javax.* 包中。在过去一两年中,越来越多的应用程序开始从 JavaEE 或 J2EE 迁移到 Jakarta EE,Quarkus 3 标志着这个过渡时代。总的来说,唯一真正的变化是以前位于 javax.* 的类现在位于 jakarta.*。
如果您的应用程序使用了 Debezium Quarkus Outbox 扩展,请注意,为了使用 Quarkus 的 Debezium 2.2,您需要迁移到 Quarkus 3。这也意味着,如果您想利用 Outbox 扩展来处理 Reactive 数据源,您也必须使用 Quarkus 3。
最后,如果您正在开发或维护 Debezium Server 的 sink 适配器,您也需要调整使用新的 Jakarta EE 注释而不是旧的 Java EE 注释。
JDBC Sink 连接器
Debezium 2.2 版本开启了 Debezium 的新时代,Debezium 长期以来一直专注于提供一套用于关系型和非关系型数据库的源连接器。此版本改变了这种格局,引入了一个新的 JDBC sink 连接器实现。
Debezium JDBC sink 连接器与其他供应商的实现有很大不同,它能够摄取 Debezium 连接器发出的变更事件,而无需进行事件展平。这有可能减少您管道中的处理占用空间,简化管道的配置,并允许 Debezium 的 JDBC sink 连接器利用许多 Debezium 源连接器的功能,如列类型传播等等。
开始使用 Debezium JDBC sink 连接器非常简单,让我们看一个例子。
假设我们有一个名为 orders 的 Kafka 主题,其中包含未经 ExtractNewRecordState 转换从 MySQL 创建的 Debezium 变更事件。将这些变更事件摄取到 PostgreSQL 数据库的简单配置可能如下所示:
{
"name": "mysql-to-postgres-pipeline",
"config": {
"connector_class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"topics": "orders",
"connection.url": "jdbc://postgresql://<host>:<port>/<database>",
"connection.user": "<username>",
"connection.password": "<password>",
"insert.mode": "upsert",
"delete.enabled": "true",
"primary.key.mode": "record_key",
"schema.evolution": "basic"
}
} 在此示例中,我们指定了一系列 connection.* 属性,用于定义访问目标 PostgreSQL 数据库的连接字符串和凭据。此外,记录将在写入目标数据库时使用 **UPSERT** 语义,如果记录不存在则插入,如果存在则更新。我们还启用了 Schema 演进,并指定表的键列应从事件的主键派生。
JDBC sink 连接器目前支持以下关系型数据库:
-
Db2
-
MySQL
-
Oracle
-
PostgreSQL
-
SQL Server
我们确实打算将来添加其他方言,如果您希望看到某种方言,请通过我们的邮件列表、聊天或提交 Jira 增强请求与我们联系。
从 Oracle 逻辑备用实例中提取变更
Debezium 的 Oracle 连接器通常管理所谓的 **刷新表**,这是一个内部表,用于管理 Oracle Log Writer Buffer (LGWR) 进程使用的刷新周期。此刷新过程要求连接器使用的用户帐户具有创建此表并向其写入数据的权限。逻辑备用数据库通常对数据操作有更严格的规则,甚至可能是只读的,因此,写入数据库是不受欢迎的,甚至是不允许的。
为了支持 Oracle 只读逻辑备用数据库,我们引入了一个标志来禁用此 **刷新表** 的创建和管理。此功能可用于 Oracle Standalone 和 Oracle RAC 安装,目前被视为孵化中,意味着它将来可能会发生变化。
为了启用 Oracle 只读逻辑备用支持,添加以下连接器选项:
internal.log.mining.read.only=true 在未来的版本中,我们计划增加对 Oracle 只读物理备用数据库的支持。
Google Spanner PostgreSQL 方言支持
Google 的 Cloud Spanner 平台支持 PostgreSQL 接口,该接口将 Google Spanner 平台的规模化和可靠性与 PostgreSQL 的熟悉性和可移植性结合在一起。当使用此 PostgreSQL 接口操作 Google Spanner 时,列和表的元数据与使用标准 GoogleSQL 方言时不同。
此版本扩展了 Debezium Spanner 连接器的支持,不仅支持 GoogleSQL 方言,还支持使用 Spanner PostgreSQL 方言功能的用户。这意味着无论您的 Spanner 环境依赖于哪种方言,您都可以使用 Debezium Spanner 连接器无缝地捕获 Spanner 的变更事件。
Infinispan sink 适配器
Infinispan 是一个内存中分布式数据存储,提供灵活的部署选项和强大的功能来存储、管理和处理数据。Infinispan 基于键值存储的概念,允许存储任何数据类型。为了将 Debezium Server 与 Infinispan 集成,必须修改 Debezium Server application.properties 文件以包含以下条目:
debezium.sink.type=infinispan
debezium.sink.infinispan.server.host=<hostname>
debezium.sink.infinispan.server.port=<port>
debezium.sink.infinispan.cache=<cache-name>
debezium.sink.infinispan.user=<user>
debezium.sink.infinispan.password=<password> 上述配置指定要使用的 sink 类型为 infinispan,这启用了 Infinispan 模块的使用。以下是上面显示的每个属性的描述:
debezium.sink.infinispan.server.host-
指定 Infinispan 集群中某个服务器的主机名。此配置选项也可以提供逗号分隔的主机名列表,例如
hostname1,hostname2。 debezium.sink.infinispan.server.port-
指定 Infinispan 集群的端口。默认为
11222。 debezium.sink.infinispan.cache-
指定 Infinispan 缓存的名称,用于写入变更事件。
| Infinispan sink 要求缓存必须提前手动创建。这使得能够创建具有任何所需变量配置的缓存,以满足您的需求。 |
debezium.sink.infinispan.user-
一个可选配置,用于指定用于身份验证的用户名,如果需要身份验证。
debezium.sink.infinispan.password-
一个可选配置,用于指定用于身份验证的用户的密码,如果需要身份验证。
有关使用 Debezium Server 和 Infinispan 的更多信息,请参阅 文档。
RabbitMQ sink 适配器
Debezium 2.2 在 Debezium Server 产品组合中引入了一个新的 sink 适配器,允许 Debezium 用户将变更事件发送到 RabbitMQ。以下配置展示了如何轻松配置的简单示例:
debezium.sink.type=rabbitmq
# Connection details
debezium.sink.rabbitmq.connection.host=<hostname>
debezium.sink.rabbitmq.connection.port=<port>
# The routing key specifies an override of where events are published
debezium.sink.rabbitmq.routingKey=<routing-key>
# The default is 30 seconds, specified in milliseconds
debezium.sink.rabbitmq.ackTimeout=30000 debezium.sink.rabbitmq.connection.* 属性是必需的,而后面的 routingKey 和 ackTimeout 属性是可选的,或具有默认值,对于大多数用例应该足够了。
RocketMQ sink 适配器
Apache RocketMQ 是一个云原生消息、事件和流式实时数据处理平台,涵盖云-边缘-设备协作场景。为了将 Debezium Server 与 RocketMQ 集成,必须修改 Debezium Server application.properties 文件以包含以下条目:
debezium.sink.type=rocketmq
debezium.sink.rocketmq.producer.name.srv.addr=<hostname>:<port>
debezium.sink.rocketmq.producer.group=debezuim-group
debezium.sink.rocketmq.producer.max.message.size=4194304
debezium.sink.rocketmq.producer.send.msg.timeout=3000
debezium.sink.rocketmq.producer.acl.enabled=false
debezium.sink.rocketmq.producer.access.key=<access-key>
debezium.sink.rocketmq.producer.secret.key=<secret-key> 上述配置指定要使用的 sink 类型为 rocketmq,这启用了 RocketMQ 模块的使用。以下是上面显示的每个属性的描述:
debezium.sink.rocketmq.producer.name.srv.addr-
指定 Apache RocketMQ 可用的主机和端口。
debezium.sink.rocketmq.producer.group-
指定与 Apache RocketMQ producer 组关联的名称。
debezium.sink.rocketmq.producer.max.message.size-
(可选)指定消息的最大字节数。默认为
4193404(4MB)。 debezium.sink.rocketmq.producer.send.msg.timeout-
(可选)指定发送消息的超时时间(以毫秒为单位)。默认为
3000(3 秒)。 debezium.sink.rocketmq.producer.acl.enabled-
(可选)控制是否启用访问控制列表。默认为
false。 debezium.sink.rocketmq.producer.access.key-
(可选)用于连接到 Apache RocketMQ 集群的访问密钥。
debezium.sink.rocketmq.producer.secret.key-
(可选)用于连接到 Apache RocketMQ 集群的访问密钥。
有关使用 Debezium Server 和 RocketMQ 的更多信息,请参阅 文档。
Pulsar 异步事件交付
在以前版本的 Debezium Server Pulsar sink 中,适配器利用 send() 方法同步发送消息。虽然这对于发送一次性消息有效,但由于该方法按顺序等待发送操作的确认,这可能会引入连接器延迟。由于 Debezium Server sink 适配器接收的是要发送的事件集合,同步的性质性能不佳。
从 Debezium 2.2 开始,Pulsar sink 现在将使用 sendAsync() 异步将事件批次发送到 Pulsar,从而显著提高整体吞吐量。虽然批次中的每个事件都是异步发送的,但适配器将在当前批次完全确认后才继续处理下一个批次。
Reactive Quarkus Outbox 扩展
Outbox 模式 是许多微服务用于在微服务边界之间共享数据的一种方法。我们在 2020 年初的 Debezium 1.1 中引入了 Debezium Outbox Quarkus Extension,它使 Quarkus 用户能够轻松地使用 Debezium 利用 outbox 模式。
感谢 Ingmar Fjolla,Debezium 2.2 包含了一个新的基于 Reactive 的 Debezium Outbox Quarkus Extension 实现。这个新实现基于 Vert.x 和 Hibernate Reactive,使用 Debezium 提供了完全异步的 outbox 模式解决方案。
这个新扩展包含在本月晚些时候发布的 Quarkus 3 平台中。但是,如果您今天想开始使用它,您可以轻松地将其直接放入项目的配置中,使用以下坐标:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-quarkus-outbox-reactive</artifactId>
<version>2.2.0.Final</version>
</dependency> io.debezium:debezium-quarkus-outbox-reactive:2.2.0.Final Amazon S3 存储桶支持
Debezium 提供了一个 Storage API 框架,使连接器能够将偏移量和 Schema 历史状态存储在各种持久化数据存储中。此外,该框架还使贡献者能够通过添加新的存储实现来轻松扩展 API。目前,Storage API 框架支持本地文件系统、Kafka Topic 或 Redis 数据存储。
在 Debezium 2.2 中,我们很高兴地将 Amazon S3 存储桶添加到了该框架中,允许将 Schema 历史持久化到 S3 存储桶。使用 S3 的示例连接器配置可能如下所示:
...
schema.history.internal=io.debezium.storage.s3.history
schema.history.internal.s3.access.key.id=aa
schema.history.internal.s3.secret.access.key=bb
schema.history.internal.s3.region.name=aws-global
schema.history.internal.s3.bucket.name=debezium
schema.history.internal.s3.object.name=db-history.log
schema.history.internal.s3.endpoint=http://<server>:<port> schema.history.internal.s3.access.key.id-
指定访问 S3 所需的访问密钥。
schema.history.internal.s3.secret.access.key-
指定访问 S3 所需的秘密访问密钥。
schema.history.internal.s3.region.name-
指定 S3 存储桶所在的区域。
schema.history.internal.s3.bucket.name-
指定要将 Schema 历史持久化的 S3 存储桶的名称。
schema.history.internal.s3.object.name-
指定要在存储桶中将 Schema 历史持久化的对象名称。
schema.history.internal.s3.endpoint-
指定 S3 端点,格式为
http://<server>:<port>;。
RocketMQ 存储支持
Debezium 的新存储 API 在过去一年中取得了巨大成功。最初,我们从原始的文件和基于 Kafka 的偏移量和 Schema 历史存储实现开始,但后来扩展到支持将 Schema 历史存储在其他平台,如 Amazon S3 和 Redis。
此版本继续扩展这一点,为 Rocket MQ 添加了新的 Schema 历史存储实现。要开始将 Schema 历史存储到 Rocket MQ,debezium-storage-rocketmq 依赖项必须首先在类路径上,并且可以被连接器运行时访问。
一旦依赖项存在,剩下的唯一一步就是配置 Schema 历史连接器配置。以下示例展示了 Rocket MQ Schema 历史的基本用法:
schema.history.internal.rocketmq.topic=schema-history
schema.history.internal.rocketmq.name.srv.addr=172.17.15.2
schema.history.internal.rocketmq.acl.enabled=true
schema.history.internal.rocketmq.access.key=<rocketmq-access-key>
schema.history.internal.rocketmq.secret.key=<rocketmq-secret-key>
schema.history.internal.rocketmq.recovery.attempts=5
schema.history.internal.rocketmq.recovery.poll.interval.ms=1000
schema.history.internal.rocketmq.store.record.timeout.ms=2000 schema.history.internal.rocketmq.topic-
指定将存储 Schema 历史的主题名称。
schema.history.internal.rocketmq.name.srv.addr-
指定 Rocket MQ 的服务发现服务名称服务器。
schema.history.internal.rocketmq.acl.enabled-
指定是否启用访问控制列表 (ACL),默认为
false。 schema.history.internal.rocketmq.access.key-
指定 Rocket MQ 访问密钥,仅在启用 ACL 时需要。
schema.history.internal.rocketmq.secret.key-
指定 Rocket MQ 密钥,仅在启用 ACL 时需要。
schema.history.internal.rocketmq.recovery.attempts-
指定在恢复完成之前没有数据返回的连续尝试次数。
schema.history.internal.rocketmq.recovery.poll.interval.ms-
指定每次轮询尝试以毫秒为单位的时间,以恢复历史记录。
schema.history.internal.rocketmq.store.record.timeout.ms-
指定写入 Rocket MQ 的超时时间(以毫秒为单位)。
其他修复和改进
在 Debezium 2.2 的开发过程中,进行了许多错误修复、稳定性改进和优化。总共有 228 个问题 在此版本中得到修复。
非常感谢社区所有为本版本做出贡献的贡献者:Akshansh Jain、Đỗ Ngọc Sơn、Anatolii Popov、Gabor Andras Anil Dasari、Animesh Kumar、Anisha Mohanty、Bob Roldan、Bobby Tiernay、Byron Ruth、Chris Cranford、Erdinç Taşkın、Eugene Abramchuk、Gabor Andras、Govinda Sakhare、Gunnar Morling、Harvey Yue、Henrik Schnell、Henry Cai、Hossein Torabi、Indra Shukla、Ingmar Fjolla、Ismail Simsek、Jacob Barrieault、Jacob Gminder、Jakub Cechacek、Jakub Zalas、Jeremy Ford、Jiri Pechanec、Jochen Schalanda、Liz Chatman、Lokesh Sanapalli、Luca Scannapieco、Mario Fiore Vitale、Mark Bereznitsky、Mark Lambert、Martin Medek、Mehmet Firat Komurcu、My Lang Pangzi、Nir Levy、Olivier Boudet、Ondrej Babec、Pengwei Dou、Plugaru Tudor、RJ Nowling、Rajendra Dangwal、Robert Roldan、Ronak Jain、Russell Mora、Sergei Morozov、Stefan Miklosovic、Subodh Kant Chaturvedi、Sun Xiao Jian、Thomas Thornton、Théophile Helleboid、Tim Loes、Vojtech Juranek、Vojtěch Juránek、Xinbin Huang、Yang Wu、Yohei Yoshimuta、ming luo、tony joseph、yohei yoshimuta 和 蔡灿材!
下一步是什么?
我们几周前开始预先规划 Debezium 2.3,并且随着 2.2 的发布,我们的重点将放在下一个次要版本上。由于 Debezium 2.2 的发布周期比正常情况长一些,因此 2.3 的发布周期将缩短,因为我们希望恢复到我们的季度末发布节奏。为了实现这一目标,我们选择将以下功能作为下一个次要版本进行重点关注:
- 可配置的信号通道
-
此更改的目的是提供一种方式,可以通过各种来源(包括文件系统、Kafka 主题、数据库表等)向连接器发送信号。
- 仅一次交付语义
-
Debezium 目前仅保证至少一次交付语义,这意味着在连接器发生不安全关机或故障的情况下,变更事件可能会被写入主题多次。Kafka 以及 Kafka Connect 现在支持仅一次交付,我们希望将此功能作为 Debezium 的一部分进行探索。目标是至少将此功能添加到一次连接器作为概念验证,并根据反馈将其扩展到所有连接器。
- Debezium Server 的 Kubernetes Operator
-
Debezium Server 在最近几个月获得了相当大的关注,包括新的 sink 适配器以及社区的普遍使用。我们希望将 Kubernetes 的强大功能带给 Debezium Server,引入一个您可以部署的 Operator,以管理 Debezium Server 部署的整个生命周期。
- 使用 OpenLogReplicator 从 Oracle 进行摄取
-
Debezium Oracle 连接器提供了使用 XStream 或 LogMiner 进行更改摄取的功能。我们希望使用 OpenLogReplicator 构建一个概念验证,这是一个能够直接从文件系统读取 Oracle redo 和 archive 日志的原生应用程序。我们不打算用这种新方法替换现有的任何适配器,而是扩展连接器的功能,提供开销可能更小的替代数据摄取方式。
- Debezium UI 增强功能
-
我们相信 Debezium UI 还有很大的潜力,因此此版本将通过添加新的功能来改进整体用户体验,例如启动/停止临时快照、编辑连接器部署以及显示关键连接器指标。
下次再见!
关于 Debezium
Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。