分区路由

默认情况下,当 Debezium 检测到数据集合中的更改时,它发出的更改事件会被发送到一个使用单个 Apache Kafka 分区的 topic。如 自定义 Debezium 自动创建的 topic 中所述,您可以自定义默认配置,将事件路由到多个分区,基于主键的哈希值。

但是,在某些情况下,您可能还希望 Debezium 将事件路由到特定的 topic 分区。分区路由 SMT 使您能够根据一个或多个指定载荷字段的值,将事件路由到特定的目标分区。为了计算目标分区,Debezium 使用指定字段值的哈希值。

示例:基本配置

您在 Debezium 连接器的 Kafka Connect 配置中配置分区路由转换。配置指定了以下参数:

partition.payload.fields

指定 SMT 用于计算目标分区的事件载荷中的字段。您可以使用点符号指定嵌套的载荷字段。

partition.topic.num

指定目标 topic 中的分区数量。

partition.hash.function

指定用于字段哈希的哈希函数,该哈希函数将确定目标分区的数量。

默认情况下,Debezium 将配置好的数据集合的所有更改事件记录路由到一个 Apache Kafka topic。Connectors 不会将事件记录定向到 topic 的特定分区。

要配置 Debezium 连接器将事件路由到特定分区,请在 Debezium 连接器的 Kafka Connect 配置中配置 PartitionRouting SMT。

例如,您可能需要在连接器配置中添加以下配置:

...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...

topic.prefix=fulfillment
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.name
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=allTopic
predicates=allTopic
predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.allTopic.pattern=fulfillment.*
...

基于前面的配置,每当 SMT 收到一个目标 topic 名称以 fulfillment 前缀开头的消息时,它就会将消息重定向到特定的 topic 分区。

SMT 从消息载荷中 name 字段值的哈希值计算目标分区。通过指定 `allTopic` predicate,该配置选择性地应用 SMT。change 前缀是一个特殊关键字,它使 SMT 能够根据数据的 beforeafter 状态自动引用载荷中的元素。如果指定的字段在事件消息中不存在,SMT 将忽略它。如果没有任何字段存在于消息中,那么转换将完全忽略事件消息,并将消息的原始版本传递给默认目标 topic。SMT 配置中 topic.num 设置指定的分区数量必须与 Kafka Connect 配置中指定的分区数量相匹配。例如,在前面的配置示例中,Kafka Connect 属性 topic.creation.default.partitions 指定的值与 SMT 配置中的 topic.num 值相匹配。

给定这个 Products

表 1. Products 表

id

name (名称)

description

weight

101

scooter

Small 2-wheel scooter

3.14

102

car battery

12V car battery

8.1

103

12-pack drill bits

12-pack of drill bits with sizes ranging from #40 to #3

0.8

104

hammer

12oz carpenter’s hammer

0.75

105

hammer

14oz carpenter’s hammer

0.875

106

hammer

16oz carpenter’s hammer

1.0

107

rocks

box of assorted rocks

5.3

108

jacket

water resistent black wind breaker

0.1

109

spare tire

24 inch spare tire

22.2

根据配置,SMT 将字段名称为 hammer 的记录的更改事件路由到同一分区。也就是说,id 值为 104105106 的项目被路由到同一分区。

示例:高级配置

假设您想将来自两个数据集合(t1,t2)的事件路由到同一个 topic(例如,my_topic),并且您想通过字段 f1 对来自数据集合 t1 的事件进行分区,通过字段 f2 对来自数据集合 t2 的事件进行分区。

您可以应用以下配置:

transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.f1,change.f2
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=myTopic

predicates=myTopic
predicates.myTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.myTopic.pattern=my_topic

上述配置没有指定如何重新路由事件以将它们发送到特定的目标 topic。有关如何将事件发送到其默认目标 topic 之外的 topic 的信息,请参阅 Topic Routing SMT

从 Debezium ComputePartition SMT 迁移

Debezium ComputePartition SMT 已停用。下一节中的信息描述了如何从 ComputePartition SMT 迁移到新的 PartitionRouting SMT。

假设配置为所有 topic 设置了相同数量的分区,请将以下 ComputePartition 配置替换为 PartitionRouting SMT。以下示例提供了两种配置的比较。

示例:旧版 ComputePartition 配置
...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...
topic.prefix=fulfillment
transforms=ComputePartition
transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition
transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser
transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:2,inventory.orders:2
...

将前面的 ComputePartition 替换为以下 PartitionRouting 配置:示例:替换先前 ComputePartition 配置的 PartitionRouting 配置

...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...

topic.prefix=fulfillment
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.name,change.purchaser
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=allTopic
predicates=allTopic
predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.allTopic.pattern=fulfillment.*
...

如果 SMT 将事件发送到具有不同分区数量的 topic,您必须为每个 topic 指定唯一的 partition.num.mappings 值。例如,在以下示例中,旧版 products 集合的 topic 配置为 3 个分区,orders 数据集合的 topic 配置为 2 个分区。

示例:为不同 topic 设置唯一分区值的旧版 ComputePartition 配置
...
topic.prefix=fulfillment
transforms=ComputePartition
transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition
transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser
transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:3,inventory.orders:2
...

将前面的 ComputePartition 配置替换为以下 PartitionRouting 配置:. 为不同 topic 设置唯一 partition.topic.num 值的 PartitionRouting 配置

...
topic.prefix=fulfillment

transforms=ProductsPartitionRouting,OrdersPartitionRouting
transforms.ProductsPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.ProductsPartitionRouting.partition.payload.fields=change.name
transforms.ProductsPartitionRouting.partition.topic.num=3
transforms.ProductsPartitionRouting.predicate=products

transforms.OrdersPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.OrdersPartitionRouting.partition.payload.fields=change.purchaser
transforms.OrdersPartitionRouting.partition.topic.num=2
transforms.OrdersPartitionRouting.predicate=products

predicates=products,orders
predicates.products.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.products.pattern=fulfillment.inventory.products
predicates.orders.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.orders.pattern=fulfillment.inventory.orders
...

配置选项

下表列出了您可以为分区路由 SMT 设置的配置选项。

表 2. 分区路由 SMT (PartitionRouting) 配置选项

属性

Default (默认值)

描述

指定 SMT 用于计算目标分区的事件载荷中的字段。如果希望 SMT 将字段从原始载荷添加到输出数据结构的特定级别,请使用点符号。要访问与数据集合相关的字段,您可以使用:afterbeforechange。'change' 字段是一个特殊字段,它会导致 SMT 根据操作类型自动填充 'after' 或 'before' 元素的内容。如果记录中不存在指定的字段,SMT 将跳过它。例如:after.name,source.table,change.name

此 SMT 作用的 topic 的分区数量。使用 TopicNameMatches predicate 按 topic 过滤记录。

java

计算字段哈希时使用的哈希函数,该哈希函数将确定目标分区的数量。可能的值包括:

java - 标准 Java Object::hashCode 函数

murmur - 最新版本的 MurmurHash 函数,MurmurHash3

此配置是可选的。如果未指定或使用了无效值,将使用默认值。