选择性应用转换
当您为连接器配置单个消息转换 (SMT) 时,可以为转换定义谓词。谓词指定如何有条件地将转换应用于连接器处理的消息子集。您可以将谓词分配给为源连接器(如 Debezium)或接收器连接器配置的转换。
SMT 谓词
Debezium 提供了一些单个消息转换 (SMT),您可以在 Kafka Connect 将事件记录保存到 Kafka 主题之前使用它们来修改事件记录。默认情况下,当您为 Debezium 连接器配置这些 SMT 之一时,Kafka Connect 会将该转换应用于连接器发出的每条记录。但是,有时您可能希望选择性地应用转换,使其仅修改具有共同特征的变化事件消息的子集。
例如,对于 Debezium 连接器,您可能只想在来自特定表或包含特定标头键的事件消息上运行转换。在运行 Apache Kafka 2.6 或更高版本的环境中,您可以向转换追加一个谓词语句,指示 Kafka Connect 仅将 SMT 应用于某些记录。在谓词中,您指定一个 Kafka Connect 用于评估其处理的每条消息的条件。当 Debezium 连接器发出一个变化事件消息时,Kafka Connect 会将消息与配置的谓词条件进行比较。如果该条件对事件消息为 true,Kafka Connect 会应用转换,然后将消息写入 Kafka 主题。不匹配条件的邮件将保持不变。
对于为接收器连接器 SMT 定义的谓词,情况类似。连接器从 Kafka 主题读取消息,Kafka Connect 会根据谓词条件评估消息。如果消息与条件匹配,Kafka Connect 将应用转换,然后将消息传递给接收器连接器。
定义谓词后,您可以重用它并将其应用于多个转换。谓词还包含一个 negate 选项,您可以使用它来反转谓词,以便仅将谓词条件应用于不匹配谓词语句中定义的条件的记录。您可以使用 negate 选项将谓词与其他基于条件取反的转换配对。
谓词包含以下元素:
-
predicates前缀 -
别名(例如,
isOutboxTable) -
类型(例如,
org.apache.kafka.connect.transforms.predicates.TopicNameMatches)。Kafka Connect 提供了一组默认谓词类型,您可以通过定义自己的自定义谓词来补充。 -
条件语句和任何额外的配置属性,取决于谓词的类型(例如,正则表达式命名模式)
以下谓词类型默认可用:
- HasHeaderKey
-
指定事件消息中您希望 Kafka Connect 评估的标头中的键名。对于包含具有指定名称的标头键的任何记录,谓词评估为 true。
- RecordIsTombstone
-
匹配 Kafka 墓碑记录。对于值为
null的任何记录,谓词评估为true。将此谓词与过滤器 SMT 结合使用以删除墓碑记录。此谓词没有配置参数。Kafka 中的墓碑是键为 0 字节、值为空的记录。当 Debezium 连接器处理源数据库中的删除操作时,连接器会为该删除操作发出两个更改事件:
-
一个删除操作(
"op" : "d")事件,提供数据库记录的先前值。 -
一个墓碑事件,具有相同的键,但值为
null。墓碑代表行的删除标记。当为 Kafka 启用 日志压缩时,在压缩过程中,Kafka 会删除与墓碑具有相同键的所有事件。日志压缩会定期发生,压缩间隔由主题的
delete.retention.ms设置控制。虽然可以 配置 Debezium 以不发出墓碑事件,但最好允许 Debezium 发出墓碑以保持日志压缩期间的预期行为。抑制墓碑会阻止 Kafka 在日志压缩期间删除已删除键的记录。如果您的环境包含无法处理墓碑的接收器连接器,您可以将接收器连接器配置为使用带有
RecordIsTombstone谓词的 SMT 来过滤掉墓碑记录。
-
- TopicNameMatches
-
一个正则表达式,指定您希望 Kafka Connect 匹配的主题名称。对于主题名称与指定正则表达式匹配的连接器记录,谓词为 true。使用此谓词根据源表的名称将 SMT 应用于记录。
定义 SMT 谓词
配置 Kafka Connect 谓词类似于配置转换。您指定一个谓词别名,将该别名与转换关联,然后定义谓词的类型和配置。
-
Debezium 环境运行 Apache Kafka 2.6 或更高版本。
-
SMT 为 Debezium 连接器配置。
-
在 Debezium 连接器配置中,为
predicates参数指定一个谓词别名,例如IsOutboxTable。 -
通过在连接器配置中将谓词别名附加到转换别名,将谓词别名与您想有条件应用的转换关联起来。
transforms.<TRANSFORM_ALIAS>.predicate=<PREDICATE_ALIAS>
For example (例如:)
transforms.outbox.predicate=IsOutboxTable -
通过指定其类型并为配置参数提供值来配置谓词。
-
对于类型,指定以下默认类型之一,这些类型在 Kafka Connect 中可用:
-
HasHeaderKey
-
RecordIsTombstone
-
TopicNameMatches
For example (例如:)
predicates.IsOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
-
-
对于 TopicNameMatch 或
HasHeaderKey谓词,为要匹配的主题或标头名称指定一个正则表达式。For example (例如:)
predicates.IsOutboxTable.pattern=outbox.event.*
-
-
如果您想反转条件,请将
negate关键字附加到转换别名并将其设置为true。For example (例如:)
transforms.outbox.negate=true前面的属性会反转谓词匹配的记录集,以便 Kafka Connect 将转换应用于不满足谓词中指定的条件的任何记录。
以下示例显示了一个 Debezium 连接器配置,该配置仅将 outbox 事件路由转换应用于 Debezium 发送到 Kafka outbox.event.order 主题的消息。
由于 TopicNameMatch 谓词仅对来自 outbox 表(outbox.event.*)的消息求值为 *true*,因此该转换不会应用于源自数据库中其他表的任何消息。
transforms=outbox
transforms.outbox.predicate=IsOutboxTable
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
predicates=IsOutboxTable
predicates.IsOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsOutboxTable.pattern=outbox.event.*
忽略墓碑事件
您可以控制 Debezium 是否发出墓碑事件以及 Kafka 将它们保留多长时间。根据您的数据管道,您可能希望为连接器设置 tombstones.on.delete 属性,以便 Debezium 不发出墓碑事件。
是否启用 Debezium 发出墓碑取决于您的环境中主题的消费方式以及接收器消费者的特征。一些接收器连接器依赖墓碑事件来从下游数据存储中删除记录。在接收器连接器依赖墓碑记录来指示何时在下游数据存储中删除记录的情况下,请将 Debezium 配置为发出它们。
当您将 Debezium 配置为生成墓碑时,需要进一步的配置以确保接收器连接器接收墓碑事件。必须为主题设置保留策略,以便连接器有时间在 Kafka 在日志压缩期间删除它们之前读取事件消息。主题在压缩之前保留墓碑的时间长度由主题的 delete.retention.ms 属性控制。
默认情况下,连接器的 tombstones.on.delete 属性设置为 true,因此连接器在每次删除事件后都会生成一个墓碑。如果您将该属性设置为 false 以阻止 Debezium 将墓碑记录保存到 Kafka 主题,则缺少墓碑记录可能会导致意外后果。Kafka 在日志压缩期间依赖墓碑来删除与已删除键相关的记录。
如果您需要支持无法处理值为 null 的记录的接收器连接器或下游 Kafka 消费者,而不是阻止 Debezium 发出墓碑,请考虑为连接器配置一个 SMT,该 SMT 使用 RecordIsTombstone 谓词类型来在消费者读取它们之前删除墓碑消息。
-
要阻止 Debezium 为已删除的数据库记录发出墓碑事件,请将连接器选项
tombstones.on.delete设置为false。For example (例如:)
“tombstones.on.delete”: “false”