消息过滤
默认情况下,Debezium 会将收到的所有数据变更事件都发送到 Kafka broker。但在许多情况下,您可能只对生产者发出的事件的一个子集感兴趣。为了使您能够仅处理与您相关的记录,Debezium 提供了 filter 单消息转换 (SMT)。
虽然可以使用 Java 创建自定义 SMT 来编码过滤逻辑,但使用自定义编码 SMT 有其缺点。例如:
-
需要预先编译转换并将其部署到 Kafka Connect。
-
每次更改都需要重新编译和重新部署代码,导致操作不够灵活。
filter SMT 支持与 JSR 223 (Scripting for the Java™ Platform) 集成的脚本语言。目前,使用 Go 编写 SMT 的支持处于孵化阶段(TinyGo 和 WebAssembly)。
JSR 223
Debezium 不自带任何 JSR 223 API 的实现。要使用表达式语言与 Debezium 结合使用,您必须下载该语言的 JSR 223 脚本引擎实现。例如,对于 Groovy 3,您可以从 https://groovy-lang.cn/ 下载其 JSR 223 实现。GraalVM JavaScript 的 JSR223 实现可在 https://github.com/graalvm/graaljs 获取。获取脚本引擎文件后,将其添加到 Debezium 连接器插件目录中,以及语言实现使用的任何其他 JAR 文件。
Go
Debezium 提供了一个社区维护的辅助 PDK (Plugin Development Kit),以方便与 Go 结合使用。
您可以通过输入以下命令来获取 Debezium SMT Go PDK:
go get github.com/debezium/debezium-smt-go-pdk
以下示例展示了您可能如何使用 Go 实现最小的过滤逻辑:
package main
import (
"github.com/debezium/debezium-smt-go-pdk"
)
//export process
func process(proxyPtr uint32) uint32 {
...
}
func main() {}
在前面的示例中,main 函数是 Wasm 编译目标所必需的。process 函数是执行实际过滤逻辑的入口点。
要将过滤器代码编译为 Wasm,请使用最新版本的 TinyGo,如下例所示:
docker run --rm \
-v ./:/src \
-w /src tinygo/tinygo:0.34.0 bash \
-c "tinygo build --no-debug -target=wasm-unknown -o /tmp/tmp.wasm myfilter.go && cat /tmp/tmp.wasm" > \
myfilter.wasm
有关使用 Go 开发 Debezium 单消息转换的更多信息,请参阅 PDK 存储库:Debezium SMT Go PDK。
设置
出于安全原因,filter SMT 不包含在 Debezium 连接器归档文件中。而是提供在一个单独的工件中,名为 debezium-scripting-3.3.1.Final.tar.gz。
要将基于内容的路由 SMT 与 Debezium 连接器插件一起使用,您必须将 SMT 工件显式添加到您的 Kafka Connect 环境中。重要提示:一旦 filter SMT 在 Kafka Connect 实例中可用,任何被允许向实例添加连接器的用户都可以运行脚本表达式。为确保只有授权用户才能运行脚本表达式,请务必在添加 filter SMT 之前保护 Kafka Connect 实例及其配置界面。
在安装了 Kafka、Kafka Connect 以及一个或多个 Debezium 连接器后,安装 filter SMT 的剩余任务包括:
-
下载 脚本 SMT 归档文件。
-
将归档文件的内容解压到您的 Kafka Connect 环境的 Debezium 插件目录中。
-
以下任一操作:
-
获取一个 JSR-223 脚本引擎实现,并将其内容添加到您的 Kafka Connect 环境的 Debezium 插件目录中。
-
使编译后的
.wasm文件在磁盘上可用。
-
-
重新启动您的 Kafka Connect 进程以加载新配置。
Groovy 语言需要在类路径上提供以下库:
-
groovy -
groovy-json(可选) -
groovy-jsr223
JavaScript 语言需要在类路径上提供以下库:
-
graalvm.js -
graalvm.js.scriptengine
示例:基本配置
您在 Debezium 连接器的 Kafka Connect 配置中配置 filter 转换。在配置中,您通过定义基于业务规则的过滤条件来指定您感兴趣的事件。当 filter SMT 处理事件流时,它会针对配置的过滤条件评估每个事件。只有满足过滤条件标准的事件才会被传递给 broker。
要配置 Debezium 连接器以过滤更改事件记录,请在 Debezium 连接器的 Kafka Connect 配置中配置 Filter SMT。filter SMT 的配置要求您指定一个定义过滤条件的正则表达式。
例如,您可能需要在连接器配置中添加以下配置:
...
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.op == 'u' && value.before.id == 2
...
前面的示例指定了使用 Groovy 表达式语言。正则表达式 value.op == 'u' && value.before.id == 2 删除了除表示 ID 为 2 的更新 (u) 记录之外的所有消息。
使用 Go 时
...
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=wasm.chicory
transforms.filter.expression=file://myfilter.wasm
...
前面的示例展示了一个简单的 SMT 配置,用于处理仅包含 op 字段的 DML 事件。连接器可能发出的其他类型的消息(心跳消息、tombstone 消息或关于 schema 更改和事务的元数据消息)不包含此字段。为避免处理失败,您可以定义 SMT predicate 语句,以选择性地将转换应用于特定事件。
可在过滤器表达式中使用的变量
Debezium 将某些变量绑定到 filter SMT 的评估上下文中。当您创建表达式来指定过滤条件时,可以使用 Debezium 绑定到评估上下文的变量。通过绑定变量,Debezium 使 SMT 能够在评估表达式中的条件时查找并解释其值。
下表列出了 Debezium 绑定到 filter SMT 评估上下文的变量:
| 名称 | 描述 | Type |
|---|---|---|
|
消息的键。 |
|
|
消息的值。 |
|
|
消息键的 schema。 |
|
|
消息值的 schema。 |
|
|
目标主题的名称。 |
String |
|
消息头的 Java Map。键字段是头名称。
|
|
表达式可以调用其变量上的任意方法。表达式应解析为布尔值,该值决定 SMT 如何处理消息。当表达式中的过滤条件评估为 true 时,消息被保留。当过滤条件评估为 false 时,消息被删除。
表达式不应产生任何副作用。也就是说,它们不应修改它们传递的任何变量。
选择性应用转换的选项
除了 Debezium 连接器在数据库发生更改时发出的更改事件消息外,连接器还会发出其他类型的消息,包括心跳消息以及关于 schema 更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计用于处理的更改事件消息的结构不同,因此最好将连接器配置为选择性地应用 SMT,以便它仅处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为选择性应用 SMT:
-
使用 SMT 的 topic.regex 配置选项。
特定语言的细节
您表达过滤条件的方式取决于您使用的脚本语言。
例如,如 基本配置示例 所示,当您使用 Groovy 作为表达式语言时,以下表达式会删除除 ID 为 2 的更新记录之外的所有消息:
value.op == 'u' && value.before.id == 2
其他语言使用不同的方法来表达相同的条件。
|
Debezium MongoDB 连接器将 您也可以采取使用 JSON 解析器在表达式中生成每个数组项的单独输出文档的方法。 |
如果您使用 JavaScript 作为表达式语言,您可以调用 Struct#get() 方法来指定过滤条件,如下例所示:
value.get('op') == 'u' && value.get('before').get('id') == 2
如果您使用 JavaScript 和 Graal.js 来定义过滤条件,您可以使用与 Groovy 类似的方法。例如:
value.op == 'u' && value.before.id == 2
如果您使用 Go 和 TinyGo 编译器来定义过滤条件,您可以利用完全类型的 API 来惰性访问字段。例如:
var op = debezium.GetString(debezium.Get(proxyPtr, "value.op"))
var beforeId = debezium.GetInt8(debezium.Get(proxyPtr, "value.before.id"))
return debezium.SetBool(op != "d" || beforeId != 2)
配置选项
下表列出了可与 filter SMT 一起使用的配置选项。
属性 |
Default (默认值) |
描述 |
一个可选的正则表达式,用于评估事件的目标主题名称,以确定是否应用过滤逻辑。如果目标主题的名称与 |
||
编写表达式的语言。对于 JSR223,必须以 |
||
要为每个消息评估的表达式。必须评估为布尔值,其中 |
||
|
||
|
指定转换如何处理
|