Create new topics / pipes

当您使用Kafka Connect分布式模式时,您可能会发现一旦启动Kafka Connect,就会自动创建一些与Kafka Connect相关的内部主题。

$ kafka-topics.sh --bootstrap-server $HOSTNAME:9092 --list

connect_configs
connect_offsets
connect_statuses

这是由Kafka Connect自动为您完成的,使用了合理的、定制化的默认主题配置,以满足这些内部主题的需求。

当您启动一个Debezium连接器时,捕获事件的主题将由Kafka代理根据默认(可能经过定制)的代理配置创建,前提是代理配置中启用了auto.create.topics.enable = true

auto.create.topics.enable = true
default.replication.factor = 1
num.partitions = 1
compression.type = producer
log.cleanup.policy = delete
log.retention.ms = 604800000  ## 7 days

但是,在生产环境中,当您使用Debezium和Kafka时,通常会选择禁用Kafka的主题自动创建功能(设置auto.create.topics.enable = false),或者您希望连接器主题的配置与默认值不同。在这种情况下,您必须提前为Debezium的捕获数据源创建主题。
但是,有个好消息!从Kafka Connect版本2.6.0开始,由于KIP-158的实现,可以通过Kafka Connect实现可定制的主题创建,这使得这一过程自动化成为可能。

Kafka Connect

Kafka 2.6.0 起,Kafka Connect 已启用主题创建功能

topic.creation.enable = true

如果您不希望连接器自动创建主题,可以在 Kafka Connect 配置中将此值设置为 false(在 connect-distributed.properties 文件中,或者在使用 Debezium 的 Kafka Connect 容器镜像时通过环境变量 CONNECT_TOPIC_CREATION_ENABLE 设置)。

更新连接器配置

Kafka Connect 的主题创建按组工作。始终存在一个 default 组,当没有其他匹配主题的组被定义时,将使用该组。

每个组都可以指定一组主题配置属性,以及一个正则表达式列表,用于匹配应应用该配置的主题名称。

您可以指定所有 主题级别配置参数 来定制匹配的主题如何被创建。

让我们看看如何为 Kafka Connect 主题创建扩展此 PostgreSQL 配置

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": 1,
        "database.hostname": "postgres",
        "database.port": 5432,
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "postgres",
        "database.server.name": "dbserver1",
        "schema.include.list": "inventory"
    }
}

默认配置

所有不匹配其他 topic.creation 组的主题都将应用 default 组的配置。
默认情况下,我们希望 replication.factor = 3partitions = 10,主题是键压缩的,cleanup.policy = "compact",并且所有消息在磁盘上都将使用 LZ4 压缩,compression.type = "lz4"
所以我们为默认组配置如下

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": 1,
        "database.hostname": "postgres",
        "database.port": 5432,
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "postgres",
        "database.server.name": "dbserver1",
        "schema.include.list": "inventory",

        "topic.creation.default.replication.factor": 3,
        "topic.creation.default.partitions": 10,
        "topic.creation.default.cleanup.policy": "compact",
        "topic.creation.default.compression.type": "lz4"
    }
}

Productlog 配置

在数据库的 inventory schema 中,有一些表的表名以 product 开头。
默认情况下,Debezium 会将完全限定的表名捕获到同名主题中,例如,dbserver1inventory schema 中的 products 表将被捕获到 dbserver1.inventory.products 主题中。

我们希望所有发送到表名以 product 开头的表的主题的消息,其保留时间为 3 个月/90 天,cleanup.policy": "delete"retention.ms = 7776000000replication.factor = 1partitions = 20,并且仅使用生产者使用的压缩格式 compression.type": "producer"
您可以省略与集群默认值匹配的属性,但请注意,一旦您更改了 Kafka Broker 上的默认配置,生成的主题配置可能会有所不同!

首先,我们需要使用 topic.creation.groups 属性注册一个 productlog 组。
然后,我们可以定义哪些主题名称应该包含在该组中,并指定我们组的配置,就像我们对 default 组所做的那样。

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": 1,
        "database.hostname": "postgres",
        "database.port": 5432,
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "postgres",
        "database.server.name": "dbserver1",
        "schema.include.list": "inventory",
        "topic.creation.default.replication.factor": 3,
        "topic.creation.default.partitions": 10,
        "topic.creation.default.cleanup.policy": "compact",
        "topic.creation.default.compression.type": "lz4",

        "topic.creation.groups": "productlog",  (1)

        "topic.creation.productlog.include": "dbserver1\\.inventory\\.product.*",  (2)
        "topic.creation.productlog.replication.factor": 1,
        "topic.creation.productlog.partitions": 20,
        "topic.creation.productlog.cleanup.policy": "delete",
        "topic.creation.productlog.retention.ms": 7776000000,
        "topic.creation.productlog.compression.type": "producer"
    }
}
表 1. 用于自定义自动主题创建的连接器配置
Item 描述

1

topic.creation.groups 定义了一个逗号分隔的附加组名列表。这里我们只定义了我们的 productlog 组。

2

topic.creation.productlog.include 字段包含一个逗号分隔的正则表达式列表,用于匹配应应用 productlog 组配置的主题名称。productlog 组匹配所有以 dbserver1.inventory.product 开头的主题。

探索结果

现在当我们启动连接器并使用 kafka-topics.sh 查看主题是如何创建的时,我们可以看到一切都按定义的那样工作了。

## the `dbserver1.inventory.products` topic has the config from the `productlog` group:
$ kafka-topics.sh --bootstrap-server $HOSTNAME:9092 --describe --topic dbserver1.inventory.products

Topic: dbserver1.inventory.products     PartitionCount: 20      ReplicationFactor: 1
Configs: compression.type=producer,cleanup.policy=delete,retention.ms=7776000000,segment.bytes=1073741824

## the `dbserver1.inventory.orders` topic has the config from the `default` group:
$ kafka-topics.sh --bootstrap-server $HOSTNAME:9092 --describe --topic dbserver1.inventory.orders

Topic: dbserver1.inventory.orders       PartitionCount: 10       ReplicationFactor: 3
Configs: compression.type=lz4,cleanup.policy=compact,segment.bytes=1073741824,delete.retention.ms=2592000000

结论

在许多(尤其是在生产环境中)情况下,我们通常不希望在 Kafka Broker 端启用主题的自动创建,或者我们需要与默认主题配置不同的配置。
在 Kafka 2.6 之前,这只能通过手动预先创建主题或通过一些自定义设置过程(可能在部署期间)来实现。

自 Kafka 2.6 起,Kafka Connect 为连接器主题提供了内置的主题创建功能,本文展示了如何与 Debezium 一起使用它。

您可以在 GitHub 上的 Debezium 示例仓库中找到一个示例 这里

René Kerner

René 是 Red Hat 的一名软件工程师。在此之前,他曾在 trivago 担任软件架构师和工程师,并在 Codecentric 担任顾问。现在他是 Debezium 团队的一员。他住在德国门兴格拉德巴赫。

   


关于 Debezium

Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。

参与进来

我们希望您觉得 Debezium 有趣且有用,并希望尝试一下。在 Twitter @debezium 上关注我们,在 Zulip 上与我们聊天,或加入我们的 邮件列表 与社区交流。所有代码都在 GitHub 上开源,因此请在本地构建代码,帮助我们改进现有连接器并添加更多连接器。如果您发现问题或有改进 Debezium 的想法,请告诉我们或 记录一个问题

版权所有 © Debezium 及其作者。保留所有权利。有关我们的商标详情,请访问我们的 商标政策商标列表。第三方商标属于其各自所有者,在此提及并不表示任何认可或关联。
×