您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

基于内容的路由

默认情况下,Debezium 将其从表中读取的所有更改事件流式传输到一个静态主题。但是,在某些情况下,您可能希望根据事件内容将选定的事件重新路由到其他主题。基于其内容路由消息的过程在 基于内容的路由 消息传递模式中进行了描述。要在 Debezium 中应用此模式,您可以使用基于内容的路由 单个消息转换 (SMT) 来编写针对每个事件进行评估的表达式。根据事件的评估方式,SMT 会将事件消息路由到原始目标主题,或者将其重新路由到您在表达式中指定的主题。

虽然可以使用 Java 创建自定义 SMT 来编码路由逻辑,但使用自定义编码的 SMT 有其缺点。例如

  • 需要预先编译转换并将其部署到 Kafka Connect。

  • 每次更改都需要重新编译和重新部署代码,导致操作不够灵活。

基于内容的路由 SMT 支持与 JSR 223 (Java™ 平台的脚本) 集成的脚本语言。

Debezium 不自带任何 JSR 223 API 的实现。要使用表达式语言与 Debezium 结合使用,您必须下载该语言的 JSR 223 脚本引擎实现。例如,对于 Groovy 3,您可以从 https://groovy-lang.cn/ 下载其 JSR 223 实现。GraalVM JavaScript 的 JSR223 实现可在 https://github.com/graalvm/graaljs 获取。获取脚本引擎文件后,将其添加到 Debezium 连接器插件目录中,以及语言实现使用的任何其他 JAR 文件。

设置

出于安全原因,基于内容的路由 SMT 不包含在 Debezium 连接器存档中。而是将其包含在一个单独的工件 debezium-scripting-3.3.0.Final.tar.gz 中。

要将基于内容的路由 SMT 与 Debezium 连接器插件一起使用,您必须将 SMT 工件显式添加到 Kafka Connect 环境中。重要提示:在路由 SMT 出现在 Kafka Connect 实例后,任何允许向该实例添加连接器的用户都可以运行脚本表达式。为确保只有授权用户可以运行脚本表达式,请在添加路由 SMT 之前确保 Kafka Connect 实例及其配置界面得到安全保护。

在安装了 KafkaKafka Connect 以及一个或多个 Debezium 连接器后,安装 filter SMT 的剩余任务包括:

  1. 下载 脚本 SMT 存档

  2. 将归档文件的内容解压到您的 Kafka Connect 环境的 Debezium 插件目录中。

  3. 获取 JSR-223 脚本引擎实现,并将其内容添加到 Kafka Connect 环境的 Debezium 插件目录中。

  4. 重新启动您的 Kafka Connect 进程以加载新 JAR 文件。

Groovy 语言需要在类路径上提供以下库:

  • groovy

  • groovy-json (可选)

  • groovy-jsr223

JavaScript 语言需要在类路径上提供以下库:

  • graalvm.js

  • graalvm.js.scriptengine

示例:基本配置

要配置 Debezium 连接器以根据事件内容路由更改事件记录,您需要在连接器的 Kafka Connect 配置中配置 ContentBasedRouter SMT。

基于内容的路由 SMT 的配置要求您指定一个定义过滤条件的正则表达式。在配置中,您创建一个定义路由条件的正则表达式。该表达式定义了一个用于评估事件记录的模式。它还指定了匹配模式的事件将被路由到的目标主题的名称。您指定的模式可能指定一个事件类型,例如表插入、更新或删除操作。您还可以定义一个匹配特定列或行中值的模式。

例如,要将所有更新 (u) 记录重新路由到一个 updates 主题,您可以在连接器配置中添加以下配置:

...
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
transforms.route.topic.expression=value.op == 'u' ? 'updates' : null
...

前面的示例指定了使用 Groovy 表达式语言。

不匹配模式的记录将被路由到默认主题。

自定义配置

前面的示例显示了一个简单的 SMT 配置,旨在仅处理 DML 事件,这些事件包含一个 op 字段。连接器可能发出的其他类型的消息(心跳消息、墓碑消息或关于事务或模式更改的元数据消息)不包含此字段。为避免处理失败,您可以定义 一个 SMT 谓词语句,以选择性地将转换应用于特定事件

用于基于内容的路由表达式的变量

Debezium 将某些变量绑定到 SMT 的评估上下文中。当您创建用于指定控制路由目标的条件的表达式时,SMT 可以查找和解释这些变量的值来评估表达式中的条件。

下表列出了 Debezium 绑定到基于内容的路由 SMT 评估上下文的变量:

表 1. 基于内容的路由表达式变量
名称 描述 Type

key

消息的键。

org.apache.kafka.connect​.data​.Struct

value

消息的值。

org.apache.kafka.connect​.data​.Struct

keySchema

消息键的 schema。

org.apache.kafka.connect​.data​.Schema

valueSchema

消息值的 schema。

org.apache.kafka.connect​.data​.Schema

topic

目标主题的名称。

String

header

消息头的 Java Map。键是头名称。headers 变量公开了以下属性:

  • value (类型为 Object)

  • schema (类型为 org.apache.kafka​.connect​.data​.Schema)

java.util.Map​<String,​ io.debezium​.transforms​.scripting​.RecordHeader>

表达式可以调用其变量上的任意方法。表达式应解析为布尔值,该值决定 SMT 如何处理消息。当表达式中的路由条件评估为 true 时,消息将被保留。当路由条件评估为 false 时,消息将被删除。

表达式不应产生任何副作用。也就是说,它们不应修改它们传递的任何变量。

选择性应用转换的选项

除了 Debezium 连接器在数据库发生更改时发出的更改事件消息外,连接器还会发出其他类型的消息,包括心跳消息以及关于 schema 更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计用于处理的更改事件消息的结构不同,因此最好将连接器配置为选择性地应用 SMT,以便它仅处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为选择性应用 SMT:

特定语言的细节

您表达基于内容的路由条件的方式取决于您使用的脚本语言。例如,如 基本配置示例 所示,当您使用 Groovy 作为表达式语言时,以下表达式将所有更新 (u) 记录重新路由到 updates 主题,同时将其他记录路由到默认主题:

value.op == 'u' ? 'updates' : null

其他语言使用不同的方法来表达相同的条件。

Debezium MongoDB 连接器将 afterpatch 字段作为序列化的 JSON 文档而不是结构发出。
要将 ContentBasedRouting SMT 与 MongoDB 连接器一起使用,您必须先将 JSON 中的数组字段展开为单独的文档。
您可以通过应用 MongoDB ExtractNewDocumentState SMT 来实现此目的。

您也可以采取使用 JSON 解析器在表达式中生成每个数组项的单独输出文档的方法。
例如,如果您使用 Groovy 作为表达式语言,请将 groovy-json 工件添加到类路径,然后添加一个类似 (new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar' 的表达式。

Javascript

当您使用 JavaScript 作为表达式语言时,您可以调用 Struct#get() 方法来指定基于内容的路由条件,如下例所示:

value.get('op') == 'u' ? 'updates' : null
Javascript with Graal.js

当您使用 Graal.js 通过 JavaScript 创建基于内容的路由条件时,您可以使用与 Groovy 类似的方法。例如:

value.op == 'u' ? 'updates' : null
Go with TinyGo

当您使用 TinyGo 编译器通过 Go 创建基于内容的路由条件时,您可以利用完全类型化的 API 来惰性访问字段。例如:

var value = debezium.Get(proxyPtr, "value")
if !debezium.IsNull(value) {
    var op = debezium.GetString(debezium.Get(proxyPtr, "value.op"))
    if op == "u" {
        return debezium.SetString("updates")
    }
}
return debezium.SetNull()

配置选项

属性

Default (默认值)

描述

无默认值

一个可选的正则表达式,用于评估目标主题的名称,以确定是否应用条件逻辑。如果目标主题的名称与 topic.regex 中的值匹配,则转换会在将事件传递到主题之前应用条件逻辑。如果主题名称与 topic.regex 中的值不匹配,则 SMT 会在不修改事件的情况下将其传递到主题。

无默认值

编写表达式的语言。对于 JSR223,在值前面加上 jsr223.,例如 jsr223.groovyjsr223.graal.js。Debezium 支持通过 JSR 223 API ("Java™ 平台的脚本") 进行引导。对于基于 Go 的过滤器,请指定 wasm.chicorywasm.chicory-interpreter

无默认值

要为每个消息评估的表达式。必须计算为 String 值,其中非 null 结果会将消息重新路由到新主题,而 null 值会将消息路由到默认主题。对于 Go,此属性的值指定了 wasm 表达式可用于评估每个消息的文件系统位置。Go 函数必须评估为 String 值或 Null。

keep

指定转换如何处理 null (tombstone) 消息。您可以指定以下选项之一:

keep

(默认) 直通消息。

drop

完全删除消息。

evaluate

将条件逻辑应用于消息。