自定义 Kafka Connect 自动创建主题

Kafka 提供了两种自动创建主题的机制。您可以启用 Kafka Broker 的自动主题创建,并且从 Kafka 2.6.0 开始,您还可以启用 Kafka Connect 来创建主题。Kafka Broker 使用 auto.create.topics.enable 属性来控制自动主题创建。在 Kafka Connect 中,topic.creation.enable 属性指定是否允许 Kafka Connect 创建主题。在这两种情况下,属性的默认设置都启用了自动主题创建。

当启用自动主题创建时,如果 Debezium 源连接器发出一个表的变更事件记录,而该表尚未存在目标主题,则在事件记录被摄入 Kafka 时,将在运行时创建该主题。

Broker 自动主题创建与 Kafka Connect 自动主题创建之间的区别

Broker 创建的主题仅限于共享单个默认配置。Broker 无法为不同的主题或主题集应用唯一的配置。相比之下,Kafka Connect 在创建主题时可以应用多种配置中的任何一种,根据 Debezium 连接器配置中指定的设置副本因子、分区数和其他特定主题的设置。连接器配置定义了一组主题创建组,并将一组主题配置属性与每个组关联起来。

Broker 配置和 Kafka Connect 配置是相互独立的。无论您是在 Broker 端禁用主题创建,Kafka Connect 都可以创建主题。如果您同时启用了 Broker 端和 Kafka Connect 中的自动主题创建,Connect 配置将优先,并且只有当 Kafka Connect 配置中的所有设置都不适用时,Broker 才会创建主题。

禁用 Kafka Broker 的自动主题创建

默认情况下,Kafka Broker 配置会启用 Broker 在主题不存在时在运行时创建主题。Broker 创建的主题无法使用自定义属性进行配置。如果您使用的是 2.6.0 之前的 Kafka 版本,并且希望使用特定配置创建主题,则必须在 Broker 端禁用自动主题创建,然后显式创建主题,无论是手动创建还是通过自定义部署流程。

过程
  • 在 Broker 配置中,将 auto.create.topics.enable 的值设置为 false

设置 Kafka Connect

Kafka Connect 中的自动主题创建由 topic.creation.enable 属性控制。该属性的默认值为 true,启用自动主题创建,如下例所示。

topic.creation.enable = true

topic.creation.enable 属性的设置适用于 Connect 集群中的所有 worker。

Kafka Connect 自动主题创建要求您定义 Kafka Connect 在创建主题时应用的配置属性。您可以在 Debezium 连接器配置中通过定义主题组,然后指定要应用于每个组的属性来指定主题配置属性。连接器配置定义了一个默认主题创建组,以及可选的一个或多个自定义主题创建组。自定义主题创建组使用主题名称模式列表来指定适用于该组设置的主题。

有关 Kafka Connect 如何将主题匹配到主题创建组的详细信息,请参阅 主题创建组。有关配置属性如何分配给组的更多信息,请参阅 主题创建组配置属性

默认情况下,Kafka Connect 创建的主题名称遵循 server.schema.table 的模式,例如 dbserver.myschema.inventory

如果您不想允许 Kafka Connect 进行自动主题创建,请在 Kafka Connect 配置(connect-distributed.properties 文件或使用 Debezium 的 Kafka Connect 容器镜像时通过环境变量 CONNECT_TOPIC_CREATION_ENABLE)中将 topic.creation.enable 的值设置为 false

Kafka Connect 自动主题创建要求至少为 default 主题创建组设置 replication.factorpartitions 属性。允许组从 Kafka Broker 的默认值中获取必需属性的值。

配置

为了让 Kafka Connect 自动创建主题,它需要来自源连接器关于创建主题时要应用的配置属性的信息。您在每个 Debezium 连接器的配置中定义控制主题创建的属性。当 Kafka Connect 创建连接器发出的事件记录的主题时,生成的这些主题将从适用的组获取其配置。此配置仅适用于该连接器发出的事件记录。

主题创建组

一组主题属性与一个主题创建组相关联。最低限度,您必须定义一个 default 主题创建组并指定其配置属性。除此之外,您还可以选择性地定义一个或多个自定义主题创建组并为每个组指定唯一的属性。

当您创建自定义主题创建组时,您基于主题名称模式定义每个组的成员主题。您可以指定描述要包含或排除在每个组中的主题的命名模式。includeexclude 属性包含逗号分隔的正则表达式列表,这些列表定义了主题名称模式。例如,如果您希望一个组包含所有以字符串 dbserver1.inventory 开头的主题,请将其 topic.creation.inventory.include 属性的值设置为 dbserver1\\.inventory\\.*

如果您为自定义主题组同时指定了 includeexclude 属性,则排除规则将优先,并覆盖包含规则。

主题创建组配置属性

default 主题创建组和每个自定义组都关联着一组唯一的配置属性。您可以配置一个组以包含任何 Kafka 主题级别配置属性。例如,您可以为主题组指定 旧主题段的清理策略保留时间,或 主题压缩类型。您必须至少定义一组属性来描述要创建的主题的配置。

如果没有注册自定义组,或者任何已注册组的 include 模式不匹配要创建的任何主题的名称,那么 Kafka Connect 将使用 default 组的配置来创建主题。

有关通用主题配置的注意事项,请参阅 Debezium 安装指南中的 配置 Debezium 主题

默认组配置

在使用 Kafka Connect 自动主题创建之前,您必须创建一个默认主题创建组并为其定义配置。默认主题创建组的配置将应用于名称不匹配自定义主题创建组的 include 列表模式的所有主题。

过程
  • 要为 topic.creation.default 组定义属性,请将它们添加到连接器配置 JSON 中,如下例所示。

    {
        ...
    
        "topic.creation.default.replication.factor": 3,  (1)
        "topic.creation.default.partitions": 10,  (2)
        "topic.creation.default.cleanup.policy": "compact",  (3)
        "topic.creation.default.compression.type": "lz4"  (4)
    
         ...
    }

    您可以在 default 组的配置中包含任何 Kafka 主题级别配置属性

表 1. default 主题创建组的连接器配置
Item 描述

1

topic.creation.default.replication.factor 定义了由默认组创建的主题的副本因子。
replication.factor 对于 default 组是必需的,但对于自定义组是可选的。如果未设置,自定义组将回退到默认组的值。使用 -1 来使用 Kafka Broker 的默认值。

2

topic.creation.default.partitions 定义了由默认组创建的主题的分区数。
partitions 对于 default 组是必需的,但对于自定义组是可选的。如果未设置,自定义组将回退到默认组的值。使用 -1 来使用 Kafka Broker 的默认值。

3

topic.creation.default.cleanup.policy 映射到 主题级别配置参数cleanup.policy 属性,并定义了日志保留策略。

4

topic.creation.default.compression.type 映射到 主题级别配置参数compression.type 属性,并定义了消息在硬盘上的压缩方式。

自定义组仅在必需的 replication.factorpartitions 属性上回退到 default 组的设置。如果自定义主题组的配置留有其他属性未定义,则不应用 default 组中指定的这些值。

自定义组配置

您可以定义多个自定义主题组,每个组都有自己的配置。

过程
  • 要定义一个自定义主题组,请将 topic.creation.<group_name>.include 属性添加到连接器 JSON 中,并在组名之后列出自定义组的属性。

    以下示例展示了 inventoryapplicationlogs 自定义主题创建组的示例配置。

    {
        ...
    
        (1)
        "topic.creation.inventory.include": "dbserver1\\.inventory\\.*",  (2)
        "topic.creation.inventory.partitions": 20,
        "topic.creation.inventory.cleanup.policy": "compact",
        "topic.creation.inventory.delete.retention.ms": 7776000000,
    
        (3)
        "topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*",  (4)
        "topic.creation.applicationlogs.exclude": "dbserver1\\.logs\\.applog-old-.*",  (5)
        "topic.creation.applicationlogs.replication.factor": 1,
        "topic.creation.applicationlogs.partitions": 20,
        "topic.creation.applicationlogs.cleanup.policy": "delete",
        "topic.creation.applicationlogs.retention.ms": 7776000000,
        "topic.creation.applicationlogs.compression.type": "lz4",
    
         ...
    }
表 2. inventoryapplicationlogs 自定义主题创建组的连接器配置
Item 描述

1

定义 inventory 组的配置。
replication.factorpartitions 属性对于自定义组是可选的。如果未设置值,自定义组将回退到 default 组设置的值。将值设置为 -1 以使用 Kafka Broker 设置的值。

2

topic.creation.inventory.include 定义了一个正则表达式,用于匹配所有以 dbserver1.inventory. 开头的主题。为 inventory 组定义的配置仅应用于名称与指定正则表达式匹配的主题。

3

定义 applicationlogs 组的配置。
replication.factorpartitions 属性对于自定义组是可选的。如果未设置值,自定义组将回退到 default 组设置的值。将值设置为 -1 以使用 Kafka Broker 设置的值。

4

topic.creation.applicationlogs.include 定义了一个正则表达式,用于匹配所有以 dbserver1.logs.applog- 开头的主题。为 applicationlogs 组定义的配置仅应用于名称与指定正则表达式匹配的主题。由于此组还定义了 exclude 属性,因此与 include 正则表达式匹配的主题可能会被该 exclude 属性进一步限制。

5

topic.creation.applicationlogs.exclude 定义了一个正则表达式,用于匹配所有以 dbserver1.logs.applog-old- 开头的主题。为 applicationlogs 组定义的配置仅应用于名称匹配给定正则表达式的主题。由于此组还定义了 include 属性,因此 applicationlogs 组的配置仅应用于名称与指定的 include 正则表达式匹配与指定的 exclude 正则表达式匹配的主题。

注册自定义组

在指定了任何自定义主题创建组的配置后,请注册这些组。

过程
  • 通过将 topic.creation.groups 属性添加到连接器 JSON 中,并指定一个逗号分隔的组列表来注册自定义组。

    以下示例注册了自定义主题创建组 inventoryapplicationlogs

    {
        ...
    
        "topic.creation.groups": "inventory,applicationlogs",
    
        ...
    }
完成的配置

以下示例显示了一个完整的配置,其中包含 default 主题组的配置,以及 inventoryapplicationlogs 自定义主题创建组的配置。

示例:默认主题创建组和两个自定义组的配置
{
    ...

    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 10,
    "topic.creation.default.cleanup.policy": "compact",
    "topic.creation.default.compression.type": "lz4",
    "topic.creation.groups": "inventory,applicationlogs",
    "topic.creation.inventory.include": "dbserver1\\.inventory\\.*",
    "topic.creation.inventory.partitions": 20,
    "topic.creation.inventory.cleanup.policy": "compact",
    "topic.creation.inventory.delete.retention.ms": 7776000000,
    "topic.creation.applicationlogs.include": "dbserver1\\.logs\\.applog-.*",
    "topic.creation.applicationlogs.exclude": "dbserver1\\.logs\\.applog-old-.*",
    "topic.creation.applicationlogs.replication.factor": 1,
    "topic.creation.applicationlogs.partitions": 20,
    "topic.creation.applicationlogs.cleanup.policy": "delete",
    "topic.creation.applicationlogs.retention.ms": 7776000000,
    "topic.creation.applicationlogs.compression.type": "lz4"
}

附加资源

有关主题自动创建的更多信息,您可以查阅以下资源: