选择性地应用转换

当您为连接器配置单个消息转换 (SMT) 时,您可以为该转换定义一个谓词。该谓词指定如何有条件地将转换应用于连接器处理的消息子集。您可以将谓词分配给您为源连接器(如 Debezium)或接收器连接器配置的转换。

SMT 谓词

Debezium 提供了一些单个消息转换 (SMT),您可以使用它们在 Kafka Connect 将记录保存到 Kafka 主题之前修改事件记录。默认情况下,当您为 Debezium 连接器配置这些 SMT 之一时,Kafka Connect 会将该转换应用于连接器发出的每条记录。但是,在某些情况下,您可能希望有选择性地应用转换,以便它只修改具有共同特征的更改事件消息子集。

例如,对于 Debezium 连接器,您可能只想在来自特定表或包含特定标头键的事件消息上运行转换。在运行 Apache Kafka 2.6 或更高版本的环境中,您可以向转换附加一个谓词语句,以指示 Kafka Connect 仅将 SMT 应用于某些记录。在谓词中,您指定一个条件,Kafka Connect 使用该条件来评估它处理的每条消息。当 Debezium 连接器发出更改事件消息时,Kafka Connect 会将消息与配置的谓词条件进行比较。如果条件对事件消息为 true,Kafka Connect 会应用转换,然后将消息写入 Kafka 主题。不满足条件的的消息将保持不变地发送到 Kafka。

对于您为接收器连接器 SMT 定义的谓词,情况也类似。连接器从 Kafka 主题读取消息,Kafka Connect 会将消息与谓词条件进行比较。如果消息与条件匹配,Kafka Connect 会应用转换,然后将消息传递给接收器连接器。

定义谓词后,您可以重用它并将其应用于多个转换。谓词还包括一个 negate 选项,您可以使用它来反转谓词,以便谓词条件仅应用于匹配谓词语句中定义的条件的记录。您可以使用 negate 选项将谓词与其他基于条件反转的转换配对。

谓词元素

谓词包含以下元素

  • predicates 前缀

  • 别名 (例如,isOutboxTable)

  • 类型 (例如,org.apache.kafka.connect.transforms.predicates.TopicNameMatches)。Kafka Connect 提供了一组默认谓词类型,您可以通过定义自己的自定义谓词来补充。

  • 条件语句和任何其他配置属性,具体取决于谓词的类型 (例如,正则表达式命名模式)

默认谓词类型

以下谓词类型默认可用

HasHeaderKey

指定事件消息中要让 Kafka Connect 评估的标头中的键名。对于包含具有指定名称的标头键的任何记录,该谓词都评估为 true。

RecordIsTombstone

匹配 Kafka 墓碑记录。对于值为 null 的任何记录,该谓词都评估为 true。将此谓词与过滤器 SMT 结合使用以删除墓碑记录。此谓词没有配置参数。

Kafka 中的墓碑是具有 0 字节、null 载荷的键的记录。当 Debezium 连接器处理源数据库中的删除操作时,连接器会为该删除操作发出两个更改事件

  • 一个删除操作 ("op" : "d") 事件,提供数据库记录的先前值。

  • 一个具有相同键但值为 null 的墓碑事件。

    墓碑代表行的删除标记。当为 Kafka 启用 日志压缩时,在压缩过程中,Kafka 会删除所有与墓碑具有相同键的事件。日志压缩会定期发生,压缩间隔由主题的 delete.retention.ms 设置控制。

    尽管有可能 配置 Debezium 以便它不发出墓碑事件,但最好允许 Debezium 发出墓碑事件以保持日志压缩期间的预期行为。抑制墓碑会阻止 Kafka 在日志压缩期间删除已删除键的记录。如果您的环境包含无法处理墓碑的接收器连接器,您可以将接收器连接器配置为使用带有 RecordIsTombstone 谓词的 SMT 来过滤掉墓碑记录。

TopicNameMatches

一个正则表达式,用于指定您希望 Kafka Connect 匹配的主题名称。对于主题名称与指定正则表达式匹配的连接器记录,该谓词为 true。使用此谓词根据源表的名称将 SMT 应用于记录。

定义 SMT 谓词

配置 Kafka Connect 谓词与配置转换类似。您指定一个谓词别名,将该别名与一个转换关联,然后定义谓词的类型和配置。

先决条件
  • Debezium 环境运行 Apache Kafka 2.6 或更高版本。

  • SMT 是为 Debezium 连接器配置的。

过程
  1. 在 Debezium 连接器配置中,为 predicates 参数指定一个谓词别名,例如 IsOutboxTable

  2. 通过将谓词别名附加到连接器配置中的转换别名,将谓词别名与您想要有条件应用它的转换关联起来

    transforms.<TRANSFORM_ALIAS>.predicate=<PREDICATE_ALIAS>

    For example (例如:)

    transforms.outbox.predicate=IsOutboxTable
  3. 通过指定其类型并为配置参数提供值来配置谓词。

    1. 对于类型,指定以下默认类型之一,这些类型在 Kafka Connect 中可用

      • HasHeaderKey

      • RecordIsTombstone

      • TopicNameMatches

        For example (例如:)

        predicates.IsOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
    2. 对于 TopicNameMatch 或 HasHeaderKey 谓词,为要匹配的主题或标头名称指定一个正则表达式。

      For example (例如:)

      predicates.IsOutboxTable.pattern=outbox.event.*
  4. 如果要否定条件,请将 negate 关键字附加到转换别名并将其设置为 true

    For example (例如:)

    transforms.outbox.negate=true

    前面的属性会反转谓词匹配的记录集,以便 Kafka Connect 将转换应用于任何不满足谓词中指定的条件的记录。

示例:outbox 事件路由器转换的 TopicNameMatch 谓词

以下示例显示了一个 Debezium 连接器配置,该配置仅将 outbox 事件路由器转换应用于 Debezium 发送到 Kafka outbox.event.order 主题的消息。

因为 TopicNameMatch 谓词仅对来自 outbox 表 (outbox.event.*) 的消息求值为true,所以该转换不会应用于源自数据库中其他表的那些消息。

transforms=outbox
transforms.outbox.predicate=IsOutboxTable
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
predicates=IsOutboxTable
predicates.IsOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsOutboxTable.pattern=outbox.event.*

忽略墓碑事件

您可以控制 Debezium 是否发出墓碑事件,以及 Kafka 保留它们多长时间。根据您的数据管道,您可能希望为连接器设置 tombstones.on.delete 属性,以便 Debezium 不发出墓碑事件。

是否启用 Debezium 发出墓碑事件取决于您的环境中主题的消耗方式以及接收器消费者的特性。一些接收器连接器依赖墓碑事件来从下游数据存储中删除记录。在接收器连接器依赖墓碑记录来指示何时删除下游数据存储中的记录的情况下,请将 Debezium 配置为发出它们。

当您将 Debezium 配置为生成墓碑时,还需要进一步的配置以确保接收器连接器能够收到墓碑事件。必须为主题设置保留策略,以便连接器有时间读取事件消息,然后再被 Kafka 在日志压缩过程中删除。主题在压缩前保留墓碑的时间长度由主题的 delete.retention.ms 属性控制。

默认情况下,连接器的 tombstones.on.delete 属性设置为 true,以便连接器在每个删除事件后生成一个墓碑。如果您将该属性设置为 false 以防止 Debezium 将墓碑记录保存到 Kafka 主题,则墓碑记录的缺失可能会导致意外后果。Kafka 在日志压缩期间依赖墓碑来删除与已删除键相关的记录。

如果您需要支持无法处理值为 null 的记录的接收器连接器或下游 Kafka 消费者,而不是阻止 Debezium 发出墓碑事件,请考虑为连接器配置一个 SMT,该 SMT 使用带有 RecordIsTombstone 谓词类型的谓词,在消费者读取它们之前删除墓碑消息。

过程
  • 要防止 Debezium 为已删除的数据库记录发出墓碑事件,请将连接器选项 tombstones.on.delete 设置为 false

    For example (例如:)

    “tombstones.on.delete”: “false”