主题路由
每个包含数据更改事件的 Kafka 记录都有一个默认的目标主题。如果您需要,可以在记录到达 Kafka Connect 转换器之前,将记录重新路由到您指定的主题。为此,Debezium 提供了主题路由单消息转换 (SMT)。在 Debezium 连接器的 Kafka Connect 配置中配置此转换。配置选项使您可以指定以下内容:
-
用于标识要重新路由的记录的表达式
-
解析为目标主题的表达式
-
如何确保要重新路由到目标主题的记录之间的唯一键
您需要自己确保转换配置提供您想要的行为。Debezium 不会验证转换配置所产生的行为。
主题路由转换是一个 Kafka Connect SMT。
用例
默认行为是 Debezium 连接器将每个更改事件记录发送到一个主题,该主题的名称由数据库名称和更改发生的表名称组成。换句话说,一个主题接收一个物理表的记录。当您希望一个主题接收多个物理表的记录时,您必须配置 Debezium 连接器将记录重新路由到该主题。
逻辑表是将多个物理表的记录路由到一个主题的常见用例。在逻辑表中,有多个具有相同模式的物理表。例如,分片表具有相同的模式。一个逻辑表可能包含两个或多个分片表:db_shard1.my_table
和 db_shard2.my_table
。这些表位于不同的分片中,物理上是独立的,但它们共同构成了一个逻辑表。您可以将任何分片中的表上的更改事件记录重新路由到同一个主题。
当 Debezium PostgreSQL 连接器捕获分区表中的更改时,默认行为是为每个分区将更改事件记录路由到一个不同的主题。要将所有分区的记录发送到一个主题,请配置主题路由 SMT。由于分区表中的每个键都保证是唯一的,请配置 key.enforce.uniqueness=false
,这样 SMT 就不会添加一个键字段来确保唯一键。添加键字段是默认行为。
示例
要将多个物理表的更改事件记录路由到同一个主题,请在 Debezium 连接器的 Kafka Connect 配置中配置主题路由转换。主题路由 SMT 的配置要求您指定正则表达式,这些正则表达式确定:
-
要路由记录的表。这些表必须具有相同的模式。
-
目标主题名称。
以下示例中的连接器配置为主题路由 SMT 设置了几个选项:
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
topic.regex
-
指定一个转换应用于每个更改事件记录以确定它是否应路由到特定主题的正则表达式。
在示例中,正则表达式
(.*)customers_shard(.*)
匹配包含customers_shard
字符串的表名称的记录。这将重新路由以下名称的表的记录:myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3
topic.replacement
-
指定一个表示目标主题名称的正则表达式。转换会将每个匹配的记录路由到此表达式标识的主题。在此示例中,上述三个分片表的记录将被路由到
myserver.mydb.customers_all_shards
主题。 schema.name.adjustment.mode
-
指定如何调整从结果主题名称派生的消息键模式名称,以与连接器使用的消息转换器兼容。该值可以是
none
(默认) 或avro
。
要自定义配置,您可以定义 SMT 谓词语句,该语句指定您希望转换处理或不处理的表。如果您配置 SMT 以路由匹配正则表达式的表,但又不希望 SMT 重新路由其中一个匹配该表达式的表,那么谓词可能会很有用。
确保唯一键
Debezium 更改事件键使用构成表主键的表列。要将多个物理表的记录路由到一个主题,事件键必须在所有这些表中是唯一的。但是,每个物理表都有一个仅在该表内唯一的主键是可能的。例如,myserver.mydb.customers_shard1
表中的一行可能与 myserver.mydb.customers_shard2
表中的一行具有相同的键值。
为确保每个事件键在具有相同主题的更改事件记录的表之间是唯一的,主题路由转换会将一个字段插入到更改事件键中。默认情况下,插入字段的名称是 __dbz__physicalTableIdentifier
。插入字段的值是默认的目标主题名称。
如果需要,您可以配置主题路由转换以在键中插入不同的字段。为此,请指定 key.field.name
选项并将其设置为一个不与现有主键字段名称冲突的字段名称。例如:
transforms=Reroute
transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex=(.*)customers_shard(.*)
transforms.Reroute.topic.replacement=$1customers_all_shards
transforms.Reroute.key.field.name=shard_id
此示例将 shard_id
字段添加到路由记录的键结构中。
如果您想调整键的新字段的值,请同时配置这两个选项:
key.field.regex
-
指定一个转换应用于默认目标主题名称以捕获一个或多个字符组的正则表达式。
key.field.replacement
-
指定一个正则表达式,用于根据捕获的组确定插入键字段的值。
For example (例如:)
transforms.Reroute.key.field.regex=(.*)customers_shard(.*)
transforms.Reroute.key.field.replacement=$2
通过此配置,假设默认目标主题名称是:
myserver.mydb.customers_shard1
myserver.mydb.customers_shard2
myserver.mydb.customers_shard3
转换使用第二个捕获组(分片号)中的值作为键的新字段的值。在此示例中,插入的键字段的值将是 1
、2
或 3
。
如果您的表包含全局唯一键并且您不需要更改键结构,您可以将 key.enforce.uniqueness
选项设置为 false
。
...
transforms.Reroute.key.enforce.uniqueness=false
...
选择性应用主题路由转换的选项
除了 Debezium 连接器在数据库发生更改时发出的更改事件消息之外,连接器还会发出其他类型的消息,包括心跳消息以及关于模式更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计用于处理的更改事件消息的结构不同,因此最好将连接器配置为选择性地应用 SMT,以便它只处理预期的数据更改消息。
您可以使用以下方法之一来配置连接器以选择性地应用 SMT:
-
使用 SMT 的
topic.regex
配置选项。
配置选项
下表描述了主题路由 SMT 的配置选项。
Option | Default (默认值) | 描述 |
---|---|---|
指定一个转换应用于每个更改事件记录以确定它是否应路由到特定主题的正则表达式。 |
||
指定一个表示目标主题名称的正则表达式。转换会将每个匹配的记录路由到此表达式标识的主题。此表达式可以引用您为 |
||
|
指示是否将字段添加到记录的更改事件键中。添加键字段可确保每个事件键在具有相同主题的更改事件记录的表之间是唯一的。这有助于防止具有相同键但源自不同源表的记录之间的更改事件冲突。 |
|
|
要添加到更改事件键的字段名称。此字段的值标识原始表名。SMT 要添加此字段, |
|
指定一个转换应用于默认目标主题名称以捕获一个或多个字符组的正则表达式。SMT 要应用此表达式, |
||
指定一个正则表达式,用于根据 |
||
none |
指定如何调整从结果主题名称派生的消息键模式名称,以与连接器使用的消息转换器兼容,包括: |
|
|
用于保持 LRUCache 中最大条目的大小。缓存将保留逻辑表键和值的旧/新模式,还缓存派生的键和主题 regex 结果,以改进源记录转换。 |