分区路由
默认情况下,当 Debezium 检测到数据集合中的更改时,它发出的更改事件会被发送到一个使用单个 Apache Kafka 分区的 topic。如 自定义 Debezium 自动创建的 topic 中所述,您可以自定义默认配置,将事件路由到多个分区,基于主键的哈希值。
但是,在某些情况下,您可能还希望 Debezium 将事件路由到特定的 topic 分区。分区路由 SMT 使您能够根据一个或多个指定载荷字段的值,将事件路由到特定的目标分区。为了计算目标分区,Debezium 使用指定字段值的哈希值。
示例:基本配置
您在 Debezium 连接器的 Kafka Connect 配置中配置分区路由转换。配置指定了以下参数:
partition.payload.fields
-
指定 SMT 用于计算目标分区的事件载荷中的字段。您可以使用点符号指定嵌套的载荷字段。
partition.topic.num
-
指定目标 topic 中的分区数量。
partition.hash.function
-
指定用于字段哈希的哈希函数,该哈希函数将确定目标分区的数量。
默认情况下,Debezium 将配置好的数据集合的所有更改事件记录路由到一个 Apache Kafka topic。Connectors 不会将事件记录定向到 topic 的特定分区。
要配置 Debezium 连接器将事件路由到特定分区,请在 Debezium 连接器的 Kafka Connect 配置中配置 PartitionRouting
SMT。
例如,您可能需要在连接器配置中添加以下配置:
...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...
topic.prefix=fulfillment
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.name
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=allTopic
predicates=allTopic
predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.allTopic.pattern=fulfillment.*
...
基于前面的配置,每当 SMT 收到一个目标 topic 名称以 fulfillment
前缀开头的消息时,它就会将消息重定向到特定的 topic 分区。
SMT 从消息载荷中 name
字段值的哈希值计算目标分区。通过指定 `allTopic` predicate,该配置选择性地应用 SMT。change
前缀是一个特殊关键字,它使 SMT 能够根据数据的 before
或 after
状态自动引用载荷中的元素。如果指定的字段在事件消息中不存在,SMT 将忽略它。如果没有任何字段存在于消息中,那么转换将完全忽略事件消息,并将消息的原始版本传递给默认目标 topic。SMT 配置中 topic.num
设置指定的分区数量必须与 Kafka Connect 配置中指定的分区数量相匹配。例如,在前面的配置示例中,Kafka Connect 属性 topic.creation.default.partitions
指定的值与 SMT 配置中的 topic.num
值相匹配。
给定这个 Products
表
id |
name (名称) |
description |
weight |
101 |
scooter |
Small 2-wheel scooter |
3.14 |
102 |
car battery |
12V car battery |
8.1 |
103 |
12-pack drill bits |
12-pack of drill bits with sizes ranging from #40 to #3 |
0.8 |
104 |
hammer |
12oz carpenter’s hammer |
0.75 |
105 |
hammer |
14oz carpenter’s hammer |
0.875 |
106 |
hammer |
16oz carpenter’s hammer |
1.0 |
107 |
rocks |
box of assorted rocks |
5.3 |
108 |
jacket |
water resistent black wind breaker |
0.1 |
109 |
spare tire |
24 inch spare tire |
22.2 |
根据配置,SMT 将字段名称为 hammer
的记录的更改事件路由到同一分区。也就是说,id
值为 104
、105
和 106
的项目被路由到同一分区。
示例:高级配置
假设您想将来自两个数据集合(t1,t2)的事件路由到同一个 topic(例如,my_topic),并且您想通过字段 f1 对来自数据集合 t1 的事件进行分区,通过字段 f2 对来自数据集合 t2 的事件进行分区。
您可以应用以下配置:
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.f1,change.f2
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=myTopic
predicates=myTopic
predicates.myTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.myTopic.pattern=my_topic
上述配置没有指定如何重新路由事件以将它们发送到特定的目标 topic。有关如何将事件发送到其默认目标 topic 之外的 topic 的信息,请参阅 Topic Routing SMT。
从 Debezium ComputePartition SMT 迁移
Debezium ComputePartition
SMT 已停用。下一节中的信息描述了如何从 ComputePartition
SMT 迁移到新的 PartitionRouting
SMT。
假设配置为所有 topic 设置了相同数量的分区,请将以下 ComputePartition
配置替换为 PartitionRouting
SMT。以下示例提供了两种配置的比较。
ComputePartition
配置...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...
topic.prefix=fulfillment
transforms=ComputePartition
transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition
transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser
transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:2,inventory.orders:2
...
将前面的 ComputePartition
替换为以下 PartitionRouting
配置:示例:替换先前 ComputePartition
配置的 PartitionRouting
配置
...
topic.creation.default.partitions=2
topic.creation.default.replication.factor=1
...
topic.prefix=fulfillment
transforms=PartitionRouting
transforms.PartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.PartitionRouting.partition.payload.fields=change.name,change.purchaser
transforms.PartitionRouting.partition.topic.num=2
transforms.PartitionRouting.predicate=allTopic
predicates=allTopic
predicates.allTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.allTopic.pattern=fulfillment.*
...
如果 SMT 将事件发送到具有不同分区数量的 topic,您必须为每个 topic 指定唯一的 partition.num.mappings
值。例如,在以下示例中,旧版 products
集合的 topic 配置为 3 个分区,orders
数据集合的 topic 配置为 2 个分区。
ComputePartition
配置...
topic.prefix=fulfillment
transforms=ComputePartition
transforms.ComputePartition.type=io.debezium.transforms.partitions.ComputePartition
transforms.ComputePartition.partition.data-collections.field.mappings=inventory.products:name,inventory.orders:purchaser
transforms.ComputePartition.partition.data-collections.partition.num.mappings=inventory.products:3,inventory.orders:2
...
将前面的 ComputePartition
配置替换为以下 PartitionRouting
配置:. 为不同 topic 设置唯一 partition.topic.num
值的 PartitionRouting
配置
...
topic.prefix=fulfillment
transforms=ProductsPartitionRouting,OrdersPartitionRouting
transforms.ProductsPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.ProductsPartitionRouting.partition.payload.fields=change.name
transforms.ProductsPartitionRouting.partition.topic.num=3
transforms.ProductsPartitionRouting.predicate=products
transforms.OrdersPartitionRouting.type=io.debezium.transforms.partitions.PartitionRouting
transforms.OrdersPartitionRouting.partition.payload.fields=change.purchaser
transforms.OrdersPartitionRouting.partition.topic.num=2
transforms.OrdersPartitionRouting.predicate=products
predicates=products,orders
predicates.products.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.products.pattern=fulfillment.inventory.products
predicates.orders.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.orders.pattern=fulfillment.inventory.orders
...
配置选项
下表列出了您可以为分区路由 SMT 设置的配置选项。
属性 |
Default (默认值) |
描述 |
指定 SMT 用于计算目标分区的事件载荷中的字段。如果希望 SMT 将字段从原始载荷添加到输出数据结构的特定级别,请使用点符号。要访问与数据集合相关的字段,您可以使用: |
||
此 SMT 作用的 topic 的分区数量。使用 |
||
|
计算字段哈希时使用的哈希函数,该哈希函数将确定目标分区的数量。可能的值包括: |