基于内容的路由
默认情况下,Debezium 将其从表中读取的所有更改事件流式传输到一个静态主题。但是,在某些情况下,您可能希望根据事件内容将选定的事件路由到其他主题。基于其内容路由消息的过程在 内容路由 消息模式中有描述。要在 Debezium 中应用此模式,您可以使用基于内容的路由 单消息转换 (SMT) 来编写为每个事件求值的表达式。根据事件的求值方式,SMT 会将事件消息路由到原始目标主题,或将其路由到您在表达式中指定的主题。
虽然可以使用 Java 创建自定义 SMT 来编码路由逻辑,但使用自定义编码的 SMT 有其缺点。例如:
-
需要预先编译转换并将其部署到 Kafka Connect。
-
每次更改都需要重新编译和重新部署代码,导致操作不够灵活。
基于内容的路由 SMT 支持与 JSR 223 (Java™ 平台脚本) 集成的脚本语言。
Debezium 不自带任何 JSR 223 API 的实现。要使用表达式语言与 Debezium 结合使用,您必须下载该语言的 JSR 223 脚本引擎实现。例如,对于 Groovy 3,您可以从 https://groovy-lang.cn/ 下载其 JSR 223 实现。GraalVM JavaScript 的 JSR223 实现可在 https://github.com/graalvm/graaljs 获取。获取脚本引擎文件后,将其添加到 Debezium 连接器插件目录中,以及语言实现使用的任何其他 JAR 文件。
设置
出于安全原因,基于内容的路由 SMT 未包含在 Debezium 连接器存档中。而是将其提供在一个单独的工件 debezium-scripting-3.3.1.Final.tar.gz 中。
要将基于内容的路由 SMT 与 Debezium 连接器插件一起使用,您必须将 SMT 工件显式添加到您的 Kafka Connect 环境。重要提示:路由 SMT 存在于 Kafka Connect 实例中后,任何允许向该实例添加连接器的用户都可以运行脚本表达式。为确保脚本表达式只能由授权用户运行,请确保在添加路由 SMT 之前保护 Kafka Connect 实例及其配置界面。
在安装了 Kafka、Kafka Connect 以及一个或多个 Debezium 连接器后,安装 filter SMT 的剩余任务包括:
-
下载 脚本 SMT 存档
-
将归档文件的内容解压到您的 Kafka Connect 环境的 Debezium 插件目录中。
-
获取 JSR-223 脚本引擎实现,并将其内容添加到您 Kafka Connect 环境的 Debezium 插件目录中。
-
重新启动您的 Kafka Connect 进程以加载新 JAR 文件。
Groovy 语言需要在类路径上提供以下库:
-
groovy -
groovy-json(可选) -
groovy-jsr223
JavaScript 语言需要在类路径上提供以下库:
-
graalvm.js -
graalvm.js.scriptengine
示例:基本配置
要配置 Debezium 连接器以根据事件内容路由更改事件记录,您需要在连接器的 Kafka Connect 配置中配置 ContentBasedRouter SMT。
基于内容的路由 SMT 的配置要求您指定一个定义过滤条件的正则表达式。在配置中,您创建一个定义路由条件的正则表达式。该表达式定义了一个用于评估事件记录的模式。它还指定了目标主题的名称,以将匹配模式的事件路由到该主题。您指定的模式可能指定事件类型,例如表插入、更新或删除操作。您还可以定义一个匹配特定列或行中值的模式。
例如,要将所有更新 (u) 记录路由到 updates 主题,您可能需要在连接器配置中添加以下配置:
...
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
transforms.route.topic.expression=value.op == 'u' ? 'updates' : null
...
前面的示例指定使用了 Groovy 表达式语言。
不匹配模式的记录将被路由到默认主题。
前面的示例显示了一个简单的 SMT 配置,该配置旨在仅处理 DML 事件(包含 op 字段)。连接器可能发出的其他类型的消息(心跳消息、墓碑消息或关于事务或模式更改的元数据消息)不包含此字段。为避免处理失败,您可以定义 SMT 谓词语句,以选择性地将转换应用于特定事件。
用于基于内容的路由表达式的变量
Debezium 将某些变量绑定到 SMT 的求值上下文中。当您创建表达式来指定控制路由目标的条件时,SMT 可以查找并解释这些变量的值来评估表达式中的条件。
下表列出了 Debezium 绑定到基于内容的路由 SMT 的求值上下文的变量。
| 名称 | 描述 | Type |
|---|---|---|
|
消息的键。 |
|
|
消息的值。 |
|
|
消息键的 schema。 |
|
|
消息值的 schema。 |
|
|
目标主题的名称。 |
String |
|
消息头的 Java Map。键字段是头名称。
|
|
表达式可以调用其变量上的任意方法。表达式应解析为布尔值,该值决定 SMT 如何处理消息。当表达式中的路由条件求值为 true 时,消息将被保留。当路由条件求值为 false 时,消息将被删除。
表达式不应产生任何副作用。也就是说,它们不应修改它们传递的任何变量。
选择性应用转换的选项
除了 Debezium 连接器在数据库发生更改时发出的更改事件消息外,连接器还会发出其他类型的消息,包括心跳消息以及关于 schema 更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计用于处理的更改事件消息的结构不同,因此最好将连接器配置为选择性地应用 SMT,以便它仅处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为选择性应用 SMT:
-
使用 SMT 的
topic.regex配置选项。
特定语言的细节
您表达基于内容路由条件的方式取决于您使用的脚本语言。例如,如 基本配置示例 所示,当您使用 Groovy 作为表达式语言时,以下表达式会将所有更新 (u) 记录路由到 updates 主题,而将其他记录路由到默认主题:
value.op == 'u' ? 'updates' : null
其他语言使用不同的方法来表达相同的条件。
|
Debezium MongoDB 连接器将 您也可以采取使用 JSON 解析器在表达式中生成每个数组项的单独输出文档的方法。 |
当您使用 JavaScript 作为表达式语言时,您可以调用 Struct#get() 方法来指定基于内容的路由条件,如下例所示:
value.get('op') == 'u' ? 'updates' : null
当您使用 JavaScript 和 Graal.js 创建基于内容的路由条件时,您可以使用与 Groovy 类似的方法。例如:
value.op == 'u' ? 'updates' : null
当您使用 TinyGo 编译器使用 Go 创建基于内容的路由条件时,您可以利用完全类型的 API 来延迟访问字段。例如:
var value = debezium.Get(proxyPtr, "value")
if !debezium.IsNull(value) {
var op = debezium.GetString(debezium.Get(proxyPtr, "value.op"))
if op == "u" {
return debezium.SetString("updates")
}
}
return debezium.SetNull()
配置选项
属性 |
Default (默认值) |
描述 |
无默认值 |
一个可选的正则表达式,用于评估事件的目标主题名称,以确定是否应用条件逻辑。如果目标主题的名称与 |
|
无默认值 |
编写表达式的语言。对于 JSR223,在值前面加上 |
|
无默认值 |
要为每个消息求值的表达式。必须解析为 |
|
|
指定转换如何处理
|