我非常高兴地宣布 Debezium 1.2.0.Alpha1 的发布!

1.2 系列的第一个版本提供了许多有用的新功能。

  • Debezium 嵌入式引擎 API 对消息转换(SMTs)和转换器的支持。

  • 一种用于使用脚本语言过滤变更事件的新 SMT。

  • SQL Server 连接器的自动重新连接。

  • 使用一致哈希值的新列掩码模式。

总体而言,社区为此版本修复了 不少于 41 个问题。让我们在本文的其余部分仔细看看其中的一些。

嵌入式引擎改进

Debezium 的 嵌入式引擎 是在没有 Apache Kafka 和 Kafka Connect 的情况下处理变更事件的非常有用的工具。例如,它允许使用 Debezium 的 CDC 功能,并将变更事件流式传输到 Amazon Kinesis 或 Google Pub/Sub 等替代消息传递基础设施。

为了进一步改善使用此 API 的体验,现在它支持将变更事件序列化为不同的格式(DBZ-1807):JSON、Avro 和 CloudEvents。这使得开发人员无需自己处理记录序列化。例如,以下是如何使用 JSON 作为序列化格式:

Properties props = new Properties();

// don't include schema in message
props.setProperty("converter.schemas.enable", "false"); (1)
// further properties as needed...

DebeziumEngine<ChangeEvent<String>> engine = DebeziumEngine.create(Json.class) (2)
    .using(props)
    .notifying((records, committer) -> { (3)
        for (ChangeEvent<String> r : records) {
            System.out.println("Key = '" + key + "' value = '" + value + "'");
            committer.markProcessed(r);
        }
    })
    .build();
1 底层转换器的所有选项都可以使用。
2 Json.class 是一个类型令牌,请求序列化为 JSON。
3 records 是一个变更事件批次,表示为 JSON 字符串。

嵌入式引擎现在还支持使用 Kafka Connect SMT(DBZ-1930)。可以通过传递给引擎构建器的属性轻松配置它们。

Properties props = new Properties();

props.setProperty("transforms", "router");
props.setProperty("transforms.router.type", "org.apache.kafka.connect.transforms.RegexRouter");
props.setProperty("transforms.router.regex", "(.*)");
props.setProperty("transforms.router.replacement", "trf$1");

这允许使用任何现有的 Kafka Connect SMT,例如 Kafka Connect 本身附带的 SMT,或者 Debezium 的 SMT,例如用于 主题路由新记录状态提取发件箱事件路由

这些改进为即将推出的独立 Debezium 运行时奠定了基础,该运行时将基于嵌入式引擎,并将其功能作为即用型服务提供。

基于内容的事件过滤

此版本还为 Debezium 添加了另一个非常有用的转换:消息过滤器 SMT。将其应用于 Kafka Connect 数据流管道源端的 Debezium 连接器,允许根据字段值过滤掉特定的变更事件。

例如,您可以使用此功能过滤掉特定客户类型或产品类别的任何变更事件。过滤器以脚本表达式的形式给出,使用任何与 javax.scripting API(JSR 223)兼容的语言。请注意,Debezium 本身不提供任何此类脚本语言实现;相反,您可以从 Groovy、MVEL 或 graal.js(通过 GraalVM 的 JavaScript)等广泛的可用选项中进行选择,并将其添加到 Kafka Connect 插件路径中。

这是使用 Groovy 的示例:

...
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.after.customerType != 42
...

value 是变更事件的值;您也可以引用事件的键,甚至相应的模式对象。Groovy 会自动解析 value.after.customerType 等属性路径,以便在类似映射的数据结构(如 Kafka Connect 的 Struct 类型)中进行查找。这允许非常简洁的过滤条件。

请注意,此 SMT 目前处于孵化状态,即其 API 和配置界面的详细信息仍可能发生变化。请尝试使用并分享您的体验。

其他功能

除了这些关键功能之外,1.2.0.Alpha1 版本还带来了一些其他新功能:

  • MongoDB 连接器的新指标 NumberOfDisconnectsNumberOfPrimaryElectionsDBZ-1859)。

  • SQL Server 连接器支持连接丢失后的自动重连(DBZ-1882)。

  • 新的列屏蔽模式“一致性哈希”(DBZ-1692):Debezium 允许屏蔽特定的列值,例如为了满足数据隐私和保护方面的顾虑。使用新的“一致性哈希”模式,现在不仅可以使用星号作为屏蔽字符,还可以根据屏蔽数据内容基于哈希值。引用原始问题报告者的话说,“这对于[匿名化]数据很有用,但在这种情况下,它在不同主题之间仍然需要是相关的。这是数据仓库中一个典型的要求,您希望匿名化敏感数据,但仍然需要保持数据的引用完整性。”

  • 在主键更新时允许链接更新变更事件(DBZ-1531):大多数关系型 Debezium 连接器通过使用旧键的删除事件和使用更新键的后续插入事件来表示记录主键的更新;使用新的记录头 __debezium.newkey__debezium.oldkey,消费者在处理 MySQL 和 Postgres 连接器的数据变更时现在可以链接这些变更事件。

  • Debezium 容器镜像已升级到 Apache Kafka 2.4.1(DBZ-1925)。

Bug 修复

此外,还修复了许多 Bug,例如:

  • Postgres 连接器空闲时 CPU 使用率高(DBZ-1960)。

  • 空的 wal2json 空变更事件可能导致 NPE(DBZ-1922)。

  • Cassandra 连接器:无法反序列化具有反向类型的列变异(DBZ-1967)。

  • Outbox Quarkus 扩展在 quarkus:dev 模式下抛出 NPE(DBZ-1966)。

  • binlog_row_image 的验证与 MySQL 5.5 不兼容(DBZ-1950)。

请参阅 发布说明 以获取已解决问题的完整列表以及从早期 Debezium 版本升级的步骤。我们还将关键的错误修复向后移植到 1.1 分支,并将于明天发布 Debezium 1.1.1。

Gunnar Morling

Gunnar 是一位软件工程师,内心是一个开源爱好者,目前在 Confluent 担任技术专家。此前,他曾帮助构建一个基于 Apache Flink 的实时流处理平台,并领导了 Debezium 项目,这是一个用于变更数据捕获的分布式平台。他是 Java Champion,并创立了多个开源项目,如 JfrUnit、kcctl 和 MapStruct。Gunnar 是一位热情的博主 (morling.dev),并曾在 QCon、Java One 和 Devoxx 等各种会议上发表演讲。他居住在德国汉堡。

   


关于 Debezium

Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。

参与进来

我们希望您觉得 Debezium 有趣且有用,并希望尝试一下。在 Twitter @debezium 上关注我们,在 Zulip 上与我们聊天,或加入我们的 邮件列表 与社区交流。所有代码都在 GitHub 上开源,因此请在本地构建代码,帮助我们改进现有连接器并添加更多连接器。如果您发现问题或有改进 Debezium 的想法,请告诉我们或 记录一个问题

版权所有 © Debezium 及其作者。保留所有权利。有关我们的商标详情,请访问我们的 商标政策商标列表。第三方商标属于其各自所有者,在此提及并不表示任何认可或关联。
×