当您使用Kafka Connect分布式模式时,您可能会发现一旦启动Kafka Connect,就会自动创建一些与Kafka Connect相关的内部主题。
$ kafka-topics.sh --bootstrap-server $HOSTNAME:9092 --list
connect_configs
connect_offsets
connect_statuses 这是由Kafka Connect自动为您完成的,使用了合理的、定制化的默认主题配置,以满足这些内部主题的需求。
当您启动一个Debezium连接器时,捕获事件的主题将由Kafka代理根据默认(可能经过定制)的代理配置创建,前提是代理配置中启用了auto.create.topics.enable = true。
auto.create.topics.enable = true
default.replication.factor = 1
num.partitions = 1
compression.type = producer
log.cleanup.policy = delete
log.retention.ms = 604800000 ## 7 days 但是,在生产环境中,当您使用Debezium和Kafka时,通常会选择禁用Kafka的主题自动创建功能(设置auto.create.topics.enable = false),或者您希望连接器主题的配置与默认值不同。在这种情况下,您必须提前为Debezium的捕获数据源创建主题。
但是,有个好消息!从Kafka Connect版本2.6.0开始,由于KIP-158的实现,可以通过Kafka Connect实现可定制的主题创建,这使得这一过程自动化成为可能。
Kafka Connect
Kafka 2.6.0 起,Kafka Connect 已启用主题创建功能
topic.creation.enable = true 如果您不希望连接器自动创建主题,可以在 Kafka Connect 配置中将此值设置为 false(在 connect-distributed.properties 文件中,或者在使用 Debezium 的 Kafka Connect 容器镜像时通过环境变量 CONNECT_TOPIC_CREATION_ENABLE 设置)。
更新连接器配置
Kafka Connect 的主题创建按组工作。始终存在一个 default 组,当没有其他匹配主题的组被定义时,将使用该组。
每个组都可以指定一组主题配置属性,以及一个正则表达式列表,用于匹配应应用该配置的主题名称。
您可以指定所有 主题级别配置参数 来定制匹配的主题如何被创建。
让我们看看如何为 Kafka Connect 主题创建扩展此 PostgreSQL 配置
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": 1,
"database.hostname": "postgres",
"database.port": 5432,
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include.list": "inventory"
}
} 默认配置
所有不匹配其他 topic.creation 组的主题都将应用 default 组的配置。
默认情况下,我们希望 replication.factor = 3,partitions = 10,主题是键压缩的,cleanup.policy = "compact",并且所有消息在磁盘上都将使用 LZ4 压缩,compression.type = "lz4"。
所以我们为默认组配置如下
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": 1,
"database.hostname": "postgres",
"database.port": 5432,
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include.list": "inventory",
"topic.creation.default.replication.factor": 3,
"topic.creation.default.partitions": 10,
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.compression.type": "lz4"
}
} Productlog 配置
在数据库的 inventory schema 中,有一些表的表名以 product 开头。
默认情况下,Debezium 会将完全限定的表名捕获到同名主题中,例如,dbserver1 的 inventory schema 中的 products 表将被捕获到 dbserver1.inventory.products 主题中。
我们希望所有发送到表名以 product 开头的表的主题的消息,其保留时间为 3 个月/90 天,cleanup.policy": "delete" 和 retention.ms = 7776000000,replication.factor = 1,partitions = 20,并且仅使用生产者使用的压缩格式 compression.type": "producer"。
您可以省略与集群默认值匹配的属性,但请注意,一旦您更改了 Kafka Broker 上的默认配置,生成的主题配置可能会有所不同!
首先,我们需要使用 topic.creation.groups 属性注册一个 productlog 组。
然后,我们可以定义哪些主题名称应该包含在该组中,并指定我们组的配置,就像我们对 default 组所做的那样。
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": 1,
"database.hostname": "postgres",
"database.port": 5432,
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"schema.include.list": "inventory",
"topic.creation.default.replication.factor": 3,
"topic.creation.default.partitions": 10,
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.compression.type": "lz4",
"topic.creation.groups": "productlog", (1)
"topic.creation.productlog.include": "dbserver1\\.inventory\\.product.*", (2)
"topic.creation.productlog.replication.factor": 1,
"topic.creation.productlog.partitions": 20,
"topic.creation.productlog.cleanup.policy": "delete",
"topic.creation.productlog.retention.ms": 7776000000,
"topic.creation.productlog.compression.type": "producer"
}
} | Item | 描述 |
|---|---|
1 |
|
2 |
|
探索结果
现在当我们启动连接器并使用 kafka-topics.sh 查看主题是如何创建的时,我们可以看到一切都按定义的那样工作了。
## the `dbserver1.inventory.products` topic has the config from the `productlog` group:
$ kafka-topics.sh --bootstrap-server $HOSTNAME:9092 --describe --topic dbserver1.inventory.products
Topic: dbserver1.inventory.products PartitionCount: 20 ReplicationFactor: 1
Configs: compression.type=producer,cleanup.policy=delete,retention.ms=7776000000,segment.bytes=1073741824
## the `dbserver1.inventory.orders` topic has the config from the `default` group:
$ kafka-topics.sh --bootstrap-server $HOSTNAME:9092 --describe --topic dbserver1.inventory.orders
Topic: dbserver1.inventory.orders PartitionCount: 10 ReplicationFactor: 3
Configs: compression.type=lz4,cleanup.policy=compact,segment.bytes=1073741824,delete.retention.ms=2592000000 结论
在许多(尤其是在生产环境中)情况下,我们通常不希望在 Kafka Broker 端启用主题的自动创建,或者我们需要与默认主题配置不同的配置。
在 Kafka 2.6 之前,这只能通过手动预先创建主题或通过一些自定义设置过程(可能在部署期间)来实现。
自 Kafka 2.6 起,Kafka Connect 为连接器主题提供了内置的主题创建功能,本文展示了如何与 Debezium 一起使用它。
您可以在 GitHub 上的 Debezium 示例仓库中找到一个示例 这里。
关于 Debezium
Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。