消息过滤

默认情况下,Debezium 会将收到的所有数据变更事件都发送到 Kafka broker。但在许多情况下,您可能只对生产者发出的事件的一个子集感兴趣。为了使您能够仅处理与您相关的记录,Debezium 提供了 filter 单消息转换 (SMT)。

虽然可以使用 Java 创建自定义 SMT 来编码过滤逻辑,但使用自定义编码 SMT 有其缺点。例如:

  • 需要预先编译转换并将其部署到 Kafka Connect。

  • 每次更改都需要重新编译和重新部署代码,导致操作不够灵活。

filter SMT 支持与 JSR 223 (Scripting for the Java™ Platform) 集成的脚本语言。目前,使用 Go 编写 SMT 的支持处于孵化阶段(TinyGoWebAssembly)。

JSR 223

Debezium 不自带任何 JSR 223 API 的实现。要使用表达式语言与 Debezium 结合使用,您必须下载该语言的 JSR 223 脚本引擎实现。例如,对于 Groovy 3,您可以从 https://groovy-lang.cn/ 下载其 JSR 223 实现。GraalVM JavaScript 的 JSR223 实现可在 https://github.com/graalvm/graaljs 获取。获取脚本引擎文件后,将其添加到 Debezium 连接器插件目录中,以及语言实现使用的任何其他 JAR 文件。

Go

Debezium 提供了一个社区维护的辅助 PDK (Plugin Development Kit),以方便与 Go 结合使用。

您可以通过输入以下命令来获取 Debezium SMT Go PDK:

go get github.com/debezium/debezium-smt-go-pdk

以下示例展示了您可能如何使用 Go 实现最小的过滤逻辑:

package main

import (
	"github.com/debezium/debezium-smt-go-pdk"
)

//export process
func process(proxyPtr uint32) uint32 {
	...
}

func main() {}

在前面的示例中,main 函数是 Wasm 编译目标所必需的。process 函数是执行实际过滤逻辑的入口点。

要将过滤器代码编译为 Wasm,请使用最新版本的 TinyGo,如下例所示:

docker run --rm \
    -v ./:/src \
    -w /src tinygo/tinygo:0.34.0 bash \
    -c "tinygo build --no-debug -target=wasm-unknown -o /tmp/tmp.wasm myfilter.go && cat /tmp/tmp.wasm" > \
    myfilter.wasm

有关使用 Go 开发 Debezium 单消息转换的更多信息,请参阅 PDK 存储库:Debezium SMT Go PDK

设置

出于安全原因,filter SMT 不包含在 Debezium 连接器归档文件中。而是提供在一个单独的工件中,名为 debezium-scripting-3.3.1.Final.tar.gz

要将基于内容的路由 SMT 与 Debezium 连接器插件一起使用,您必须将 SMT 工件显式添加到您的 Kafka Connect 环境中。重要提示:一旦 filter SMT 在 Kafka Connect 实例中可用,任何被允许向实例添加连接器的用户都可以运行脚本表达式。为确保只有授权用户才能运行脚本表达式,请务必在添加 filter SMT 之前保护 Kafka Connect 实例及其配置界面。

在安装了 KafkaKafka Connect 以及一个或多个 Debezium 连接器后,安装 filter SMT 的剩余任务包括:

  1. 下载 脚本 SMT 归档文件

  2. 将归档文件的内容解压到您的 Kafka Connect 环境的 Debezium 插件目录中。

  3. 以下任一操作:

    1. 获取一个 JSR-223 脚本引擎实现,并将其内容添加到您的 Kafka Connect 环境的 Debezium 插件目录中。

    2. 使编译后的 .wasm 文件在磁盘上可用。

  4. 重新启动您的 Kafka Connect 进程以加载新配置。

Groovy 语言需要在类路径上提供以下库:

  • groovy

  • groovy-json (可选)

  • groovy-jsr223

JavaScript 语言需要在类路径上提供以下库:

  • graalvm.js

  • graalvm.js.scriptengine

示例:基本配置

您在 Debezium 连接器的 Kafka Connect 配置中配置 filter 转换。在配置中,您通过定义基于业务规则的过滤条件来指定您感兴趣的事件。当 filter SMT 处理事件流时,它会针对配置的过滤条件评估每个事件。只有满足过滤条件标准的事件才会被传递给 broker。

要配置 Debezium 连接器以过滤更改事件记录,请在 Debezium 连接器的 Kafka Connect 配置中配置 Filter SMT。filter SMT 的配置要求您指定一个定义过滤条件的正则表达式。

例如,您可能需要在连接器配置中添加以下配置:

...
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.op == 'u' && value.before.id == 2
...

前面的示例指定了使用 Groovy 表达式语言。正则表达式 value.op == 'u' && value.before.id == 2 删除了除表示 ID 为 2 的更新 (u) 记录之外的所有消息。

使用 Go 时

...
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=wasm.chicory
transforms.filter.expression=file://myfilter.wasm
...
自定义配置

前面的示例展示了一个简单的 SMT 配置,用于处理仅包含 op 字段的 DML 事件。连接器可能发出的其他类型的消息(心跳消息、tombstone 消息或关于 schema 更改和事务的元数据消息)不包含此字段。为避免处理失败,您可以定义 SMT predicate 语句,以选择性地将转换应用于特定事件

可在过滤器表达式中使用的变量

Debezium 将某些变量绑定到 filter SMT 的评估上下文中。当您创建表达式来指定过滤条件时,可以使用 Debezium 绑定到评估上下文的变量。通过绑定变量,Debezium 使 SMT 能够在评估表达式中的条件时查找并解释其值。

下表列出了 Debezium 绑定到 filter SMT 评估上下文的变量:

表 1. 过滤器表达式变量
名称 描述 Type

key

消息的键。

org.apache.kafka.connect​.data​.Struct

value

消息的值。

org.apache.kafka.connect.data​.Struct

keySchema

消息键的 schema。

org.apache.kafka.connect​.data​.Schema

valueSchema

消息值的 schema。

org.apache.kafka.connect​.data​.Schema

topic

目标主题的名称。

String

header

消息头的 Java Map。键字段是头名称。header 变量公开以下属性:

  • value (类型为 Object)

  • schema (类型为 org.apache.kafka​.connect​.data​.Schema)

java.util.Map​<String, ​io.debezium.transforms​.scripting​.RecordHeader>

表达式可以调用其变量上的任意方法。表达式应解析为布尔值,该值决定 SMT 如何处理消息。当表达式中的过滤条件评估为 true 时,消息被保留。当过滤条件评估为 false 时,消息被删除。

表达式不应产生任何副作用。也就是说,它们不应修改它们传递的任何变量。

选择性应用转换的选项

除了 Debezium 连接器在数据库发生更改时发出的更改事件消息外,连接器还会发出其他类型的消息,包括心跳消息以及关于 schema 更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计用于处理的更改事件消息的结构不同,因此最好将连接器配置为选择性地应用 SMT,以便它仅处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为选择性应用 SMT:

特定语言的细节

您表达过滤条件的方式取决于您使用的脚本语言。

例如,如 基本配置示例 所示,当您使用 Groovy 作为表达式语言时,以下表达式会删除除 ID 为 2 的更新记录之外的所有消息:

value.op == 'u' && value.before.id == 2

其他语言使用不同的方法来表达相同的条件。

Debezium MongoDB 连接器将 afterpatch 字段作为序列化的 JSON 文档而不是结构发出。
要将 filter SMT 与 MongoDB 连接器一起使用,您必须首先将 JSON 中的数组字段展开为单独的文档。
您可以通过应用 MongoDB ExtractNewDocumentState SMT 来实现此目的。

您也可以采取使用 JSON 解析器在表达式中生成每个数组项的单独输出文档的方法。
例如,如果您使用 Groovy 作为表达式语言,请将 groovy-json 工件添加到类路径,然后添加一个类似 (new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar' 的表达式。

Javascript

如果您使用 JavaScript 作为表达式语言,您可以调用 Struct#get() 方法来指定过滤条件,如下例所示:

value.get('op') == 'u' && value.get('before').get('id') == 2
Javascript with Graal.js

如果您使用 JavaScript 和 Graal.js 来定义过滤条件,您可以使用与 Groovy 类似的方法。例如:

value.op == 'u' && value.before.id == 2
Go with TinyGo

如果您使用 Go 和 TinyGo 编译器来定义过滤条件,您可以利用完全类型的 API 来惰性访问字段。例如:

var op = debezium.GetString(debezium.Get(proxyPtr, "value.op"))
var beforeId = debezium.GetInt8(debezium.Get(proxyPtr, "value.before.id"))

return debezium.SetBool(op != "d" || beforeId != 2)

配置选项

下表列出了可与 filter SMT 一起使用的配置选项。

表 2. filter SMT 配置选项

属性

Default (默认值)

描述

一个可选的正则表达式,用于评估事件的目标主题名称,以确定是否应用过滤逻辑。如果目标主题的名称与 topic.regex 中的值匹配,则转换在将事件传递到主题之前应用过滤逻辑。如果主题名称与 topic.regex 中的值不匹配,则 SMT 会将事件 unmodified 地传递到主题。

编写表达式的语言。对于 JSR223,必须以 jsr223. 开头,例如 jsr223.groovyjsr223.graal.js。Debezium 支持通过 JSR 223 API ("Scripting for the Java ™ Platform") 进行引导。对于基于 Go 的过滤器,它应该是 wasm.chicorywasm.chicory-interpreter

要为每个消息评估的表达式。必须评估为布尔值,其中 true 的结果保留消息,false 的结果删除消息。

wasm 表达式可供评估每个消息的目录位置。Go 函数必须评估为布尔值,其中 true 的结果保留消息,false 的结果删除消息。

keep

指定转换如何处理 null (tombstone) 消息。您可以指定以下选项之一:

keep

(默认) 直通消息。

drop

完全删除消息。

evaluate

将过滤条件应用于消息。