您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

消息过滤

默认情况下,Debezium 会将接收到的每个数据更改事件都传递给 Kafka 代理。然而,在许多情况下,您可能只对生产者发出的事件子集感兴趣。为了使您能够仅处理与您相关的记录,Debezium 提供了 *filter* 单消息转换 (SMT)。

虽然可以使用 Java 创建自定义 SMT 来编码过滤逻辑,但使用自定义编码的 SMT 有其缺点。例如:

  • 需要预先编译转换并将其部署到 Kafka Connect。

  • 每次更改都需要重新编译和重新部署代码,导致操作不够灵活。

filter SMT 支持与 JSR 223(Java™ 平台的脚本)集成的脚本语言。目前正在孵化对使用 Go 编写 SMT 的支持(TinyGoWebAssembly)。

JSR 223

Debezium 不自带任何 JSR 223 API 的实现。要使用表达式语言与 Debezium 结合使用,您必须下载该语言的 JSR 223 脚本引擎实现。例如,对于 Groovy 3,您可以从 https://groovy-lang.cn/ 下载其 JSR 223 实现。GraalVM JavaScript 的 JSR223 实现可在 https://github.com/graalvm/graaljs 获取。获取脚本引擎文件后,将其添加到 Debezium 连接器插件目录中,以及语言实现使用的任何其他 JAR 文件。

Go

Debezium 提供了一个社区维护的辅助 PDK (Plugin Development Kit) 以方便与 Go 集成。

您可以通过输入以下命令来获取 Debezium SMT Go PDK:

go get github.com/debezium/debezium-smt-go-pdk

以下示例展示了如何使用 Go 来实现最小化的过滤逻辑:

package main

import (
	"github.com/debezium/debezium-smt-go-pdk"
)

//export process
func process(proxyPtr uint32) uint32 {
	...
}

func main() {}

在前面的示例中,`main` 函数是 Wasm 编译目标所必需的。`process` 函数是执行实际过滤逻辑的入口点。

要将过滤器代码编译为 Wasm,请使用较新版本的 TinyGo,如下例所示:

docker run --rm \
    -v ./:/src \
    -w /src tinygo/tinygo:0.34.0 bash \
    -c "tinygo build --no-debug -target=wasm-unknown -o /tmp/tmp.wasm myfilter.go && cat /tmp/tmp.wasm" > \
    myfilter.wasm

有关使用 Go 开发 Debezium 单消息转换的更多信息,请参阅 PDK 仓库:Debezium SMT Go PDK

设置

出于安全原因,filter SMT 未包含在 Debezium 连接器归档文件中。相反,它包含在一个单独的构件 `debezium-scripting-3.3.0.Final.tar.gz` 中。

要将基于内容的路由 SMT 与 Debezium 连接器插件一起使用,您必须将 SMT 构件显式添加到您的 Kafka Connect 环境中。重要提示:在 filter SMT 存在于 Kafka Connect 实例之后,任何被允许向该实例添加连接器的用户都可以运行脚本表达式。为确保脚本表达式只能由授权用户运行,请务必在添加 filter SMT 之前保护 Kafka Connect 实例及其配置界面。

在安装了 KafkaKafka Connect 以及一个或多个 Debezium 连接器后,安装 filter SMT 的剩余任务包括:

  1. 下载 scripting SMT archive

  2. 将归档文件的内容解压到您的 Kafka Connect 环境的 Debezium 插件目录中。

  3. 要么

    1. 获取一个 JSR-223 脚本引擎实现,并将其内容添加到您的 Kafka Connect 环境的 Debezium 插件目录中。

    2. 将编译后的 `.wasm` 文件放在磁盘上可访问的位置。

  4. 重启您的 Kafka Connect 进程以加载新配置。

Groovy 语言需要在类路径上提供以下库:

  • groovy

  • groovy-json (可选)

  • groovy-jsr223

JavaScript 语言需要在类路径上提供以下库:

  • graalvm.js

  • graalvm.js.scriptengine

示例:基本配置

您可以在 Debezium 连接器的 Kafka Connect 配置中配置 filter 转换。在配置中,您通过定义基于业务规则的过滤条件来指定您感兴趣的事件。当 filter SMT 处理事件流时,它会根据配置的过滤条件评估每个事件。只有满足过滤条件标准的事件才会被传递到代理。

要将 Debezium 连接器配置为过滤更改事件记录,请在 Debezium 连接器的 Kafka Connect 配置中配置 `Filter` SMT。filter SMT 的配置需要您指定一个定义过滤条件的正则表达式。

例如,您可能需要在连接器配置中添加以下配置:

...
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.op == 'u' && value.before.id == 2
...

前面的示例指定了使用 `Groovy` 表达式语言。正则表达式 `value.op == 'u' && value.before.id == 2` 会移除所有消息,除了那些代表更新 (u) 记录且 id 值等于 2 的消息。

使用 Go 时

...
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=wasm.chicory
transforms.filter.expression=file://myfilter.wasm
...
自定义配置

前面的示例显示了一个简单的 SMT 配置,用于处理仅包含 `op` 字段的 DML 事件。连接器可能发出的其他类型的消息(心跳消息、墓碑消息或有关 schema 更改和事务的元数据消息)不包含此字段。为避免处理失败,您可以定义 SMT 谓词语句,选择性地将转换应用于特定事件

用于过滤器表达式的变量

Debezium 将某些变量绑定到 filter SMT 的评估上下文中。当您创建表达式来指定过滤条件时,可以使用 Debezium 绑定到评估上下文的变量。通过绑定变量,Debezium 使 SMT 能够在评估表达式中的条件时查找并解释它们的值。

下表列出了 Debezium 绑定到 filter SMT 评估上下文的变量:

表 1. 过滤器表达式变量
名称 描述 Type

key

消息的键。

org.apache.kafka.connect​.data​.Struct

value

消息的值。

org.apache.kafka.connect.data​.Struct

keySchema

消息键的 schema。

org.apache.kafka.connect​.data​.Schema

valueSchema

消息值的 schema。

org.apache.kafka.connect​.data​.Schema

topic

目标主题的名称。

String

header

消息头的 Java 映射。键字段是头名称。`header` 变量公开以下属性:

  • value (类型为 Object)

  • schema (类型为 org.apache.kafka​.connect​.data​.Schema)

java.util.Map​<String, ​io.debezium.transforms​.scripting​.RecordHeader>

表达式可以调用其变量上的任意方法。表达式应解析为布尔值,该值决定 SMT 如何处理消息。当表达式中的过滤条件求值为 true 时,消息将被保留。当过滤条件求值为 false 时,消息将被删除。

表达式不应产生任何副作用。也就是说,它们不应修改它们传递的任何变量。

选择性应用转换的选项

除了 Debezium 连接器在数据库发生更改时发出的更改事件消息外,连接器还会发出其他类型的消息,包括心跳消息以及关于 schema 更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计用于处理的更改事件消息的结构不同,因此最好将连接器配置为选择性地应用 SMT,以便它仅处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为选择性应用 SMT:

特定语言的细节

您表达过滤条件的方式取决于您使用的脚本语言。

例如,如 基本配置示例 所示,当您使用 Groovy 作为表达式语言时,以下表达式会移除所有消息,除了那些 `id` 值设置为 2 的更新记录:

value.op == 'u' && value.before.id == 2

其他语言使用不同的方法来表达相同的条件。

Debezium MongoDB 连接器将 afterpatch 字段作为序列化的 JSON 文档而不是结构发出。
要将 filter SMT 与 MongoDB 连接器一起使用,您必须首先将 JSON 中的数组字段展开为单独的文档。
您可以通过应用 MongoDB ExtractNewDocumentState SMT 来实现此目的。

您也可以采取使用 JSON 解析器在表达式中生成每个数组项的单独输出文档的方法。
例如,如果您使用 Groovy 作为表达式语言,请将 groovy-json 工件添加到类路径,然后添加一个类似 (new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar' 的表达式。

Javascript

如果您使用 JavaScript 作为表达式语言,您可以调用 `Struct#get()` 方法来指定过滤条件,如下例所示:

value.get('op') == 'u' && value.get('before').get('id') == 2
Javascript with Graal.js

如果您使用 JavaScript 和 Graal.js 来定义过滤条件,您可以使用与使用 Groovy 类似的方法。例如:

value.op == 'u' && value.before.id == 2
Go with TinyGo

如果您使用 Go 和 TinyGo 编译器来定义过滤条件,您可以利用完全类型的 API 来延迟访问字段。例如:

var op = debezium.GetString(debezium.Get(proxyPtr, "value.op"))
var beforeId = debezium.GetInt8(debezium.Get(proxyPtr, "value.before.id"))

return debezium.SetBool(op != "d" || beforeId != 2)

配置选项

下表列出了您可以与 filter SMT 一起使用的配置选项。

表 2. filter SMT 配置选项

属性

Default (默认值)

描述

一个可选的正则表达式,用于评估事件的目标主题名称,以确定是否应用过滤逻辑。如果目标主题的名称与 `topic.regex` 中的值匹配,则转换会在将事件传递到主题之前应用过滤逻辑。如果主题的名称与 `topic.regex` 中的值不匹配,则 SMT 会未经修改地将事件传递到主题。

编写表达式的语言。对于 JSR223,必须以 `jsr223.` 开头,例如 `jsr223.groovy` 或 `jsr223.graal.js`。Debezium 支持通过 JSR 223 API(“Java™ 平台的脚本”)进行引导。对于基于 Go 的过滤器,它应该是 `wasm.chicory` 或 `wasm.chicory-interpreter`。

要为每条消息求值的表达式。必须计算为布尔值,其中结果为 true 会保留消息,结果为 false 会移除消息。

`wasm` 表达式可供评估每条消息的本地文件系统位置。Go 函数必须计算为布尔值,其中结果为 true 会保留消息,结果为 false 会移除消息。

keep

指定转换如何处理 null (tombstone) 消息。您可以指定以下选项之一:

keep

(默认) 直通消息。

drop

完全删除消息。

evaluate

将过滤条件应用于消息。