自定义 Kafka Connect 自动创建主题
Kafka 提供了两种自动创建主题的机制。您可以启用 Kafka Broker 的自动主题创建,并且从 Kafka 2.6.0 开始,您还可以启用 Kafka Connect 来创建主题。Kafka Broker 使用 auto.create.topics.enable 属性来控制自动主题创建。在 Kafka Connect 中,topic.creation.enable 属性指定是否允许 Kafka Connect 创建主题。在这两种情况下,属性的默认设置都启用了自动主题创建。
当启用自动主题创建时,如果 Debezium 源连接器发出一个表的变更事件记录,而该表尚未存在目标主题,则在事件记录被摄入 Kafka 时,将在运行时创建该主题。
Broker 创建的主题仅限于共享单个默认配置。Broker 无法为不同的主题或主题集应用唯一的配置。相比之下,Kafka Connect 在创建主题时可以应用多种配置中的任何一种,根据 Debezium 连接器配置中指定的设置副本因子、分区数和其他特定主题的设置。连接器配置定义了一组主题创建组,并将一组主题配置属性与每个组关联起来。
Broker 配置和 Kafka Connect 配置是相互独立的。无论您是在 Broker 端禁用主题创建,Kafka Connect 都可以创建主题。如果您同时启用了 Broker 端和 Kafka Connect 中的自动主题创建,Connect 配置将优先,并且只有当 Kafka Connect 配置中的所有设置都不适用时,Broker 才会创建主题。
禁用 Kafka Broker 的自动主题创建
默认情况下,Kafka Broker 配置会启用 Broker 在主题不存在时在运行时创建主题。Broker 创建的主题无法使用自定义属性进行配置。如果您使用的是 2.6.0 之前的 Kafka 版本,并且希望使用特定配置创建主题,则必须在 Broker 端禁用自动主题创建,然后显式创建主题,无论是手动创建还是通过自定义部署流程。
-
在 Broker 配置中,将
auto.create.topics.enable的值设置为false。
设置 Kafka Connect
Kafka Connect 中的自动主题创建由 topic.creation.enable 属性控制。该属性的默认值为 true,启用自动主题创建,如下例所示。
topic.creation.enable = true
topic.creation.enable 属性的设置适用于 Connect 集群中的所有 worker。
Kafka Connect 自动主题创建要求您定义 Kafka Connect 在创建主题时应用的配置属性。您可以在 Debezium 连接器配置中通过定义主题组,然后指定要应用于每个组的属性来指定主题配置属性。连接器配置定义了一个默认主题创建组,以及可选的一个或多个自定义主题创建组。自定义主题创建组使用主题名称模式列表来指定适用于该组设置的主题。
默认情况下,Kafka Connect 创建的主题名称遵循 server.schema.table 的模式,例如 dbserver.myschema.inventory。
|
如果您不想允许 Kafka Connect 进行自动主题创建,请在 Kafka Connect 配置(connect-distributed.properties 文件或使用 Debezium 的 Kafka Connect 容器镜像时通过环境变量 CONNECT_TOPIC_CREATION_ENABLE)中将 |
|
Kafka Connect 自动主题创建要求至少为 |
配置
为了让 Kafka Connect 自动创建主题,它需要来自源连接器关于创建主题时要应用的配置属性的信息。您在每个 Debezium 连接器的配置中定义控制主题创建的属性。当 Kafka Connect 创建连接器发出的事件记录的主题时,生成的这些主题将从适用的组获取其配置。此配置仅适用于该连接器发出的事件记录。
主题创建组
一组主题属性与一个主题创建组相关联。最低限度,您必须定义一个 default 主题创建组并指定其配置属性。除此之外,您还可以选择性地定义一个或多个自定义主题创建组并为每个组指定唯一的属性。
当您创建自定义主题创建组时,您基于主题名称模式定义每个组的成员主题。您可以指定描述要包含或排除在每个组中的主题的命名模式。include 和 exclude 属性包含逗号分隔的正则表达式列表,这些列表定义了主题名称模式。例如,如果您希望一个组包含所有以字符串 dbserver1.inventory 开头的主题,请将其 topic.creation.inventory.include 属性的值设置为 dbserver1\\.inventory\\.*。
|
如果您为自定义主题组同时指定了 |
主题创建组配置属性
default 主题创建组和每个自定义组都关联着一组唯一的配置属性。您可以配置一个组以包含任何 Kafka 主题级别配置属性。例如,您可以为主题组指定 旧主题段的清理策略、保留时间,或 主题压缩类型。您必须至少定义一组属性来描述要创建的主题的配置。
如果没有注册自定义组,或者任何已注册组的 include 模式不匹配要创建的任何主题的名称,那么 Kafka Connect 将使用 default 组的配置来创建主题。
有关通用主题配置的注意事项,请参阅 Debezium 安装指南中的 配置 Debezium 主题。
默认组配置
在使用 Kafka Connect 自动主题创建之前,您必须创建一个默认主题创建组并为其定义配置。默认主题创建组的配置将应用于名称不匹配自定义主题创建组的 include 列表模式的所有主题。
-
要为
topic.creation.default组定义属性,请将它们添加到连接器配置 JSON 中,如下例所示。{ ... "topic.creation.default.replication.factor": 3, (1) "topic.creation.default.partitions": 10, (2) "topic.creation.default.cleanup.policy": "compact", (3) "topic.creation.default.compression.type": "lz4" (4) ... }您可以在
default组的配置中包含任何 Kafka 主题级别配置属性。
| Item | 描述 |
|---|---|
1 |
|
2 |
|
3 |
|
4 |
|
|
自定义组仅在必需的 |
自定义组配置
您可以定义多个自定义主题组,每个组都有自己的配置。
-
要定义一个自定义主题组,请将
topic.creation.<group_name>.include属性添加到连接器 JSON 中,并在组名之后列出自定义组的属性。以下示例展示了
inventory和applicationlogs自定义主题创建组的示例配置。{ ... (1) "topic.creation.inventory.include": "dbserver1\\.inventory\\.*", (2) "topic.creation.inventory.partitions": 20, "topic.creation.inventory.cleanup.policy": "compact", "topic.creation.inventory.delete.retention.ms": 7776000000, (3) "topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*", (4) "topic.creation.applicationlogs.exclude": "dbserver1\\.logs\\.applog-old-.*", (5) "topic.creation.applicationlogs.replication.factor": 1, "topic.creation.applicationlogs.partitions": 20, "topic.creation.applicationlogs.cleanup.policy": "delete", "topic.creation.applicationlogs.retention.ms": 7776000000, "topic.creation.applicationlogs.compression.type": "lz4", ... }
| Item | 描述 |
|---|---|
1 |
定义 |
2 |
|
3 |
定义 |
4 |
|
5 |
|
注册自定义组
在指定了任何自定义主题创建组的配置后,请注册这些组。
-
通过将
topic.creation.groups属性添加到连接器 JSON 中,并指定一个逗号分隔的组列表来注册自定义组。以下示例注册了自定义主题创建组
inventory和applicationlogs。{ ... "topic.creation.groups": "inventory,applicationlogs", ... }
以下示例显示了一个完整的配置,其中包含 default 主题组的配置,以及 inventory 和 applicationlogs 自定义主题创建组的配置。
{
...
"topic.creation.default.replication.factor": 3,
"topic.creation.default.partitions": 10,
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.compression.type": "lz4",
"topic.creation.groups": "inventory,applicationlogs",
"topic.creation.inventory.include": "dbserver1\\.inventory\\.*",
"topic.creation.inventory.partitions": 20,
"topic.creation.inventory.cleanup.policy": "compact",
"topic.creation.inventory.delete.retention.ms": 7776000000,
"topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*",
"topic.creation.applicationlogs.exclude": "dbserver1\\.logs\\.applog-old-.*",
"topic.creation.applicationlogs.replication.factor": 1,
"topic.creation.applicationlogs.partitions": 20,
"topic.creation.applicationlogs.cleanup.policy": "delete",
"topic.creation.applicationlogs.retention.ms": 7776000000,
"topic.creation.applicationlogs.compression.type": "lz4"
}
附加资源
有关主题自动创建的更多信息,您可以查阅以下资源:
-
Debezium 博客:自动创建 Debezium 变更数据主题
-
关于向 Kafka Connect 添加主题自动创建的 Kafka 改进提案:KIP-158 Kafka Connect 应允许源连接器为新主题设置特定主题的设置