我非常高兴地宣布 Debezium 1.2.0.Alpha1 的发布!
1.2 系列的第一个版本提供了许多有用的新功能。
-
Debezium 嵌入式引擎 API 对消息转换(SMTs)和转换器的支持。
-
一种用于使用脚本语言过滤变更事件的新 SMT。
-
SQL Server 连接器的自动重新连接。
-
使用一致哈希值的新列掩码模式。
总体而言,社区为此版本修复了 不少于 41 个问题。让我们在本文的其余部分仔细看看其中的一些。
嵌入式引擎改进
Debezium 的 嵌入式引擎 是在没有 Apache Kafka 和 Kafka Connect 的情况下处理变更事件的非常有用的工具。例如,它允许使用 Debezium 的 CDC 功能,并将变更事件流式传输到 Amazon Kinesis 或 Google Pub/Sub 等替代消息传递基础设施。
为了进一步改善使用此 API 的体验,现在它支持将变更事件序列化为不同的格式(DBZ-1807):JSON、Avro 和 CloudEvents。这使得开发人员无需自己处理记录序列化。例如,以下是如何使用 JSON 作为序列化格式:
Properties props = new Properties();
// don't include schema in message
props.setProperty("converter.schemas.enable", "false"); (1)
// further properties as needed...
DebeziumEngine<ChangeEvent<String>> engine = DebeziumEngine.create(Json.class) (2)
.using(props)
.notifying((records, committer) -> { (3)
for (ChangeEvent<String> r : records) {
System.out.println("Key = '" + key + "' value = '" + value + "'");
committer.markProcessed(r);
}
})
.build(); | 1 | 底层转换器的所有选项都可以使用。 |
| 2 | Json.class 是一个类型令牌,请求序列化为 JSON。 |
| 3 | records 是一个变更事件批次,表示为 JSON 字符串。 |
嵌入式引擎现在还支持使用 Kafka Connect SMT(DBZ-1930)。可以通过传递给引擎构建器的属性轻松配置它们。
Properties props = new Properties();
props.setProperty("transforms", "router");
props.setProperty("transforms.router.type", "org.apache.kafka.connect.transforms.RegexRouter");
props.setProperty("transforms.router.regex", "(.*)");
props.setProperty("transforms.router.replacement", "trf$1"); 这允许使用任何现有的 Kafka Connect SMT,例如 Kafka Connect 本身附带的 SMT,或者 Debezium 的 SMT,例如用于 主题路由、新记录状态提取 和 发件箱事件路由。
这些改进为即将推出的独立 Debezium 运行时奠定了基础,该运行时将基于嵌入式引擎,并将其功能作为即用型服务提供。
基于内容的事件过滤
此版本还为 Debezium 添加了另一个非常有用的转换:消息过滤器 SMT。将其应用于 Kafka Connect 数据流管道源端的 Debezium 连接器,允许根据字段值过滤掉特定的变更事件。
例如,您可以使用此功能过滤掉特定客户类型或产品类别的任何变更事件。过滤器以脚本表达式的形式给出,使用任何与 javax.scripting API(JSR 223)兼容的语言。请注意,Debezium 本身不提供任何此类脚本语言实现;相反,您可以从 Groovy、MVEL 或 graal.js(通过 GraalVM 的 JavaScript)等广泛的可用选项中进行选择,并将其添加到 Kafka Connect 插件路径中。
这是使用 Groovy 的示例:
...
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.after.customerType != 42
... value 是变更事件的值;您也可以引用事件的键,甚至相应的模式对象。Groovy 会自动解析 value.after.customerType 等属性路径,以便在类似映射的数据结构(如 Kafka Connect 的 Struct 类型)中进行查找。这允许非常简洁的过滤条件。
请注意,此 SMT 目前处于孵化状态,即其 API 和配置界面的详细信息仍可能发生变化。请尝试使用并分享您的体验。
其他功能
除了这些关键功能之外,1.2.0.Alpha1 版本还带来了一些其他新功能:
-
MongoDB 连接器的新指标
NumberOfDisconnects和NumberOfPrimaryElections(DBZ-1859)。 -
SQL Server 连接器支持连接丢失后的自动重连(DBZ-1882)。
-
新的列屏蔽模式“一致性哈希”(DBZ-1692):Debezium 允许屏蔽特定的列值,例如为了满足数据隐私和保护方面的顾虑。使用新的“一致性哈希”模式,现在不仅可以使用星号作为屏蔽字符,还可以根据屏蔽数据内容基于哈希值。引用原始问题报告者的话说,“这对于[匿名化]数据很有用,但在这种情况下,它在不同主题之间仍然需要是相关的。这是数据仓库中一个典型的要求,您希望匿名化敏感数据,但仍然需要保持数据的引用完整性。”
-
在主键更新时允许链接更新变更事件(DBZ-1531):大多数关系型 Debezium 连接器通过使用旧键的删除事件和使用更新键的后续插入事件来表示记录主键的更新;使用新的记录头
__debezium.newkey和__debezium.oldkey,消费者在处理 MySQL 和 Postgres 连接器的数据变更时现在可以链接这些变更事件。 -
Debezium 容器镜像已升级到 Apache Kafka 2.4.1(DBZ-1925)。
Bug 修复
此外,还修复了许多 Bug,例如:
请参阅 发布说明 以获取已解决问题的完整列表以及从早期 Debezium 版本升级的步骤。我们还将关键的错误修复向后移植到 1.1 分支,并将于明天发布 Debezium 1.1.1。
关于 Debezium
Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。