您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

分区路由

默认情况下,当 Debezium 检测到数据集合中的更改时,它发出的更改事件会被发送到一个使用单个 Apache Kafka 分区的 topic。如自定义 Debezium 自动创建的 topic中所述,您可以自定义默认配置,以根据主键的哈希值将事件路由到多个分区。

但是,在某些情况下,您可能还希望 Debezium 将事件路由到特定的 topic 分区。分区路由 SMT 使您能够根据一个或多个指定负载字段的值将事件路由到特定的目标分区。要计算目标分区,Debezium 使用指定字段值的哈希值。

示例:基本配置

您在 Debezium 连接器的 Kafka Connect 配置中配置分区路由转换。配置指定了以下参数:

partition.payload.fields

指定 SMT 用于计算目标分区的事件负载中的字段。您可以使用点表示法指定嵌套的负载字段。

partition.topic.num

指定目标 topic 的分区数。

partition.hash.function

指定用于字段哈希的哈希函数,这将确定目标分区的数量。

默认情况下,Debezium 将已配置数据集合的所有更改事件记录路由到一个 Apache Kafka topic。Connectors 不会将事件记录定向到 topic 中的特定分区。

要配置 Debezium 连接器将事件路由到特定分区,请在 Debezium 连接器的 Kafka Connect 配置中配置 PartitionRouting SMT。

例如,您可能需要在连接器配置中添加以下配置:

...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...

topic.prefix=fulfillment
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.name
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=allTopic
predicates=allTopic
predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.allTopic.pattern=fulfillment.*
...

根据前面的配置,每当 SMT 收到一个 bound 到名称以 fulfillment 前缀开头的 topic 的消息时,它会将消息重定向到特定的 topic 分区。

SMT 从消息负载中 name 字段的值的哈希计算目标分区。通过指定 `allTopic` predicate,该配置选择性地应用 SMT。change 前缀是一个特殊关键字,它使 SMT 能够根据数据的 beforeafter 状态自动引用负载中的元素。如果指定字段在事件消息中不存在,SMT 将忽略它。如果消息中不存在任何字段,则转换将完全忽略事件消息,并将原始版本的消息传递给默认目标 topic。SMT 配置中由 topic.num 设置指定的分区数必须与 Kafka Connect 配置中指定的分区数匹配。例如,在前面的配置示例中,Kafka Connect 属性 topic.creation.default.partitions 指定的值与 SMT 配置中的 topic.num 值匹配。

给定这个 Products

表 1. Products 表

id

name (名称)

description

weight

101

scooter

Small 2-wheel scooter

3.14

102

car battery

12V car battery

8.1

103

12-pack drill bits

12-pack of drill bits with sizes ranging from #40 to #3

0.8

104

hammer

12oz carpenter’s hammer

0.75

105

hammer

14oz carpenter’s hammer

0.875

106

hammer

16oz carpenter’s hammer

1.0

107

rocks

box of assorted rocks

5.3

108

jacket

water resistent black wind breaker

0.1

109

spare tire

24 inch spare tire

22.2

根据配置,SMT 将 hammer 字段名称的记录的更改事件路由到同一个分区。也就是说,id 值为 104105106 的项被路由到同一个分区。

示例:高级配置

假设您想将两个数据集合(t1,t2)的事件路由到同一个 topic(例如,my_topic),并且您想使用字段 f1 对来自数据集合 t1 的事件进行分区,并使用字段 f2 对来自数据集合 t2 的事件进行分区。

您可以应用以下配置:

transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.f1,change.f2
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=myTopic

predicates=myTopic
predicates.myTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.myTopic.pattern=my_topic

前面的配置没有指定如何重新路由事件以将它们发送到特定的目标 topic。有关如何将事件发送到其默认目标 topic 以外的 topic 的信息,请参阅Topic Routing SMT

从 Debezium ComputePartition SMT 迁移

Debezium ComputePartition SMT 已停用。以下部分中的信息描述了如何从 ComputePartition SMT 迁移到新的 PartitionRouting SMT。

假设配置为所有 topic 设置相同数量的分区,请将以下 ComputePartition 配置替换为 PartitionRouting SMT。以下示例提供了两种配置的比较。

示例:旧版 ComputePartition 配置
...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...
topic.prefix=fulfillment
transforms=ComputePartition
transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition
transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser
transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:2,inventory.orders:2
...

用以下 PartitionRouting 配置替换前面的 ComputePartition。示例:替换早期 ComputePartition 配置的 PartitionRouting 配置

...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...

topic.prefix=fulfillment
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.name,change.purchaser
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=allTopic
predicates=allTopic
predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.allTopic.pattern=fulfillment.*
...

如果 SMT 发出的事件被发送到分区数不同的 topic,您必须为每个 topic 指定唯一的 partition.num.mappings 值。例如,在以下示例中,旧版 products 集合的 topic 配置为 3 个分区,orders 数据集合的 topic 配置为 2 个分区。

示例:设置不同 topic 的唯一分区值的旧版 ComputePartition 配置
...
topic.prefix=fulfillment
transforms=ComputePartition
transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition
transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser
transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:3,inventory.orders:2
...

用以下 PartitionRouting 配置替换前面的 ComputePartition 配置:.为不同 topic 设置唯一 partition.topic.num 值的 PartitionRouting 配置

...
topic.prefix=fulfillment

transforms=ProductsPartitionRouting,OrdersPartitionRouting
transforms.ProductsPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.ProductsPartitionRouting.partition.payload.fields=change.name
transforms.ProductsPartitionRouting.partition.topic.num=3
transforms.ProductsPartitionRouting.predicate=products

transforms.OrdersPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.OrdersPartitionRouting.partition.payload.fields=change.purchaser
transforms.OrdersPartitionRouting.partition.topic.num=2
transforms.OrdersPartitionRouting.predicate=products

predicates=products,orders
predicates.products.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.products.pattern=fulfillment.inventory.products
predicates.orders.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.orders.pattern=fulfillment.inventory.orders
...

配置选项

下表列出了您可以为分区路由 SMT 设置的配置选项。

表 2. 分区路由 SMT (PartitionRouting) 配置选项

属性

Default (默认值)

描述

指定 SMT 用于计算目标分区的事件负载中的字段。如果希望 SMT 将原始负载中的字段添加到输出数据结构的特定级别,请使用点表示法。要访问与数据集合相关的字段,您可以使用:afterbeforechange。'change' 字段是一个特殊字段,它会导致 SMT 根据操作类型自动填充 'after' 或 'before' 元素中的内容。如果记录中不存在指定的字段,SMT 将跳过它。例如:after.name,source.table,change.name

此 SMT 所作用的 topic 的分区数。使用 TopicNameMatches predicate 按 topic 过滤记录。

java

用于计算字段哈希的哈希函数,该哈希将确定目标分区的数量。可能的值为:

java - 标准 Java Object::hashCode 函数

murmur - 最新版本的 MurmurHash 函数,MurmurHash3

此配置是可选的。如果未指定或使用了无效值,将使用默认值。