消息过滤
默认情况下,Debezium 会将接收到的每个数据更改事件都传递给 Kafka 代理。然而,在许多情况下,您可能只对生产者发出的事件子集感兴趣。为了使您能够仅处理与您相关的记录,Debezium 提供了 *filter* 单消息转换 (SMT)。
虽然可以使用 Java 创建自定义 SMT 来编码过滤逻辑,但使用自定义编码的 SMT 有其缺点。例如:
-
需要预先编译转换并将其部署到 Kafka Connect。
-
每次更改都需要重新编译和重新部署代码,导致操作不够灵活。
filter SMT 支持与 JSR 223(Java™ 平台的脚本)集成的脚本语言。目前正在孵化对使用 Go 编写 SMT 的支持(TinyGo 和 WebAssembly)。
JSR 223
Debezium 不自带任何 JSR 223 API 的实现。要使用表达式语言与 Debezium 结合使用,您必须下载该语言的 JSR 223 脚本引擎实现。例如,对于 Groovy 3,您可以从 https://groovy-lang.cn/ 下载其 JSR 223 实现。GraalVM JavaScript 的 JSR223 实现可在 https://github.com/graalvm/graaljs 获取。获取脚本引擎文件后,将其添加到 Debezium 连接器插件目录中,以及语言实现使用的任何其他 JAR 文件。
Go
Debezium 提供了一个社区维护的辅助 PDK (Plugin Development Kit) 以方便与 Go 集成。
您可以通过输入以下命令来获取 Debezium SMT Go PDK:
go get github.com/debezium/debezium-smt-go-pdk
以下示例展示了如何使用 Go 来实现最小化的过滤逻辑:
package main
import (
"github.com/debezium/debezium-smt-go-pdk"
)
//export process
func process(proxyPtr uint32) uint32 {
...
}
func main() {}
在前面的示例中,`main` 函数是 Wasm 编译目标所必需的。`process` 函数是执行实际过滤逻辑的入口点。
要将过滤器代码编译为 Wasm,请使用较新版本的 TinyGo,如下例所示:
docker run --rm \
-v ./:/src \
-w /src tinygo/tinygo:0.34.0 bash \
-c "tinygo build --no-debug -target=wasm-unknown -o /tmp/tmp.wasm myfilter.go && cat /tmp/tmp.wasm" > \
myfilter.wasm
有关使用 Go 开发 Debezium 单消息转换的更多信息,请参阅 PDK 仓库:Debezium SMT Go PDK。
设置
出于安全原因,filter SMT 未包含在 Debezium 连接器归档文件中。相反,它包含在一个单独的构件 `debezium-scripting-3.3.0.Final.tar.gz` 中。
要将基于内容的路由 SMT 与 Debezium 连接器插件一起使用,您必须将 SMT 构件显式添加到您的 Kafka Connect 环境中。重要提示:在 filter SMT 存在于 Kafka Connect 实例之后,任何被允许向该实例添加连接器的用户都可以运行脚本表达式。为确保脚本表达式只能由授权用户运行,请务必在添加 filter SMT 之前保护 Kafka Connect 实例及其配置界面。
在安装了 Kafka、Kafka Connect 以及一个或多个 Debezium 连接器后,安装 filter SMT 的剩余任务包括:
-
将归档文件的内容解压到您的 Kafka Connect 环境的 Debezium 插件目录中。
-
要么
-
获取一个 JSR-223 脚本引擎实现,并将其内容添加到您的 Kafka Connect 环境的 Debezium 插件目录中。
-
将编译后的 `.wasm` 文件放在磁盘上可访问的位置。
-
-
重启您的 Kafka Connect 进程以加载新配置。
Groovy 语言需要在类路径上提供以下库:
-
groovy -
groovy-json(可选) -
groovy-jsr223
JavaScript 语言需要在类路径上提供以下库:
-
graalvm.js -
graalvm.js.scriptengine
示例:基本配置
您可以在 Debezium 连接器的 Kafka Connect 配置中配置 filter 转换。在配置中,您通过定义基于业务规则的过滤条件来指定您感兴趣的事件。当 filter SMT 处理事件流时,它会根据配置的过滤条件评估每个事件。只有满足过滤条件标准的事件才会被传递到代理。
要将 Debezium 连接器配置为过滤更改事件记录,请在 Debezium 连接器的 Kafka Connect 配置中配置 `Filter` SMT。filter SMT 的配置需要您指定一个定义过滤条件的正则表达式。
例如,您可能需要在连接器配置中添加以下配置:
...
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.op == 'u' && value.before.id == 2
...
前面的示例指定了使用 `Groovy` 表达式语言。正则表达式 `value.op == 'u' && value.before.id == 2` 会移除所有消息,除了那些代表更新 (u) 记录且 id 值等于 2 的消息。
使用 Go 时
...
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=wasm.chicory
transforms.filter.expression=file://myfilter.wasm
...
前面的示例显示了一个简单的 SMT 配置,用于处理仅包含 `op` 字段的 DML 事件。连接器可能发出的其他类型的消息(心跳消息、墓碑消息或有关 schema 更改和事务的元数据消息)不包含此字段。为避免处理失败,您可以定义 SMT 谓词语句,选择性地将转换应用于特定事件。
用于过滤器表达式的变量
Debezium 将某些变量绑定到 filter SMT 的评估上下文中。当您创建表达式来指定过滤条件时,可以使用 Debezium 绑定到评估上下文的变量。通过绑定变量,Debezium 使 SMT 能够在评估表达式中的条件时查找并解释它们的值。
下表列出了 Debezium 绑定到 filter SMT 评估上下文的变量:
| 名称 | 描述 | Type |
|---|---|---|
|
消息的键。 |
|
|
消息的值。 |
|
|
消息键的 schema。 |
|
|
消息值的 schema。 |
|
|
目标主题的名称。 |
String |
|
消息头的 Java 映射。键字段是头名称。`header` 变量公开以下属性:
|
|
表达式可以调用其变量上的任意方法。表达式应解析为布尔值,该值决定 SMT 如何处理消息。当表达式中的过滤条件求值为 true 时,消息将被保留。当过滤条件求值为 false 时,消息将被删除。
表达式不应产生任何副作用。也就是说,它们不应修改它们传递的任何变量。
选择性应用转换的选项
除了 Debezium 连接器在数据库发生更改时发出的更改事件消息外,连接器还会发出其他类型的消息,包括心跳消息以及关于 schema 更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计用于处理的更改事件消息的结构不同,因此最好将连接器配置为选择性地应用 SMT,以便它仅处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为选择性应用 SMT:
-
使用 SMT 的 `topic.regex` 配置选项。
特定语言的细节
您表达过滤条件的方式取决于您使用的脚本语言。
例如,如 基本配置示例 所示,当您使用 Groovy 作为表达式语言时,以下表达式会移除所有消息,除了那些 `id` 值设置为 2 的更新记录:
value.op == 'u' && value.before.id == 2
其他语言使用不同的方法来表达相同的条件。
|
Debezium MongoDB 连接器将 您也可以采取使用 JSON 解析器在表达式中生成每个数组项的单独输出文档的方法。 |
如果您使用 JavaScript 作为表达式语言,您可以调用 `Struct#get()` 方法来指定过滤条件,如下例所示:
value.get('op') == 'u' && value.get('before').get('id') == 2
如果您使用 JavaScript 和 Graal.js 来定义过滤条件,您可以使用与使用 Groovy 类似的方法。例如:
value.op == 'u' && value.before.id == 2
如果您使用 Go 和 TinyGo 编译器来定义过滤条件,您可以利用完全类型的 API 来延迟访问字段。例如:
var op = debezium.GetString(debezium.Get(proxyPtr, "value.op"))
var beforeId = debezium.GetInt8(debezium.Get(proxyPtr, "value.before.id"))
return debezium.SetBool(op != "d" || beforeId != 2)
配置选项
下表列出了您可以与 filter SMT 一起使用的配置选项。
属性 |
Default (默认值) |
描述 |
一个可选的正则表达式,用于评估事件的目标主题名称,以确定是否应用过滤逻辑。如果目标主题的名称与 `topic.regex` 中的值匹配,则转换会在将事件传递到主题之前应用过滤逻辑。如果主题的名称与 `topic.regex` 中的值不匹配,则 SMT 会未经修改地将事件传递到主题。 |
||
编写表达式的语言。对于 JSR223,必须以 `jsr223.` 开头,例如 `jsr223.groovy` 或 `jsr223.graal.js`。Debezium 支持通过 JSR 223 API(“Java™ 平台的脚本”)进行引导。对于基于 Go 的过滤器,它应该是 `wasm.chicory` 或 `wasm.chicory-interpreter`。 |
||
要为每条消息求值的表达式。必须计算为布尔值,其中结果为 |
||
`wasm` 表达式可供评估每条消息的本地文件系统位置。Go 函数必须计算为布尔值,其中结果为 |
||
|
指定转换如何处理
|