您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

存储 Debezium 连接器的状态

概述

Debezium 连接器需要持久化存储来在重启之间保留其状态。所有连接器都需要一种机制来提供偏移量的持久化存储。此外,像 Db2、MySQL、Oracle 和 SQL Server 这样的连接器还需要额外的存储来存储其所谓的内部模式历史记录,该记录会记录数据库中表模式的更改。

通过以下机制之一,在 Kafka Connect 运行时中自动为部署提供偏移量存储:

Kafka 偏移量存储

为 Kafka Connect 分布式提供存储。

文件偏移量存储

为 Kafka Connect 单机版提供存储。

如果您在 Debezium EngineDebezium Server 中运行连接器,则必须显式配置偏移量存储。对于与基于模式的数据库一起工作的连接器,您可以通过设置连接器属性来配置内部模式历史记录存储。

Kafka

Debezium 可以使用 Kafka 来存储其状态,包括源偏移量和模式历史记录。连接器实现 KafkaOffsetBackingStore 以将偏移量存储在 Kafka 主题中(例如,connect-offsets)。这些偏移量可确保在连接器重启后,它能够从正确的位置继续读取。连接器将其模式历史记录存储在单独的压缩主题中,例如 schema-changes.inventory

偏移量存储

属性 Default (默认值) 描述

无默认值

必须设置为 org.apache.kafka.connect.storage.KafkaOffsetBackingStore

无默认值

指定连接器存储其偏移量的 Kafka 主题。为了确保该主题保留最新的偏移量信息,您必须为该主题启用日志压缩。

25

指定偏移量存储主题的分区数。确保此设置的值与 Kafka 集群的分区策略保持一致。

3

设置偏移量存储主题的复制因子。跨多个代理复制数据可提高容错能力。

内部模式历史记录存储

属性 Default (默认值) 描述

无默认值

必须设置为 io.debezium.storage.kafka.history.KafkaSchemaHistory

无默认值

存储数据库模式历史记录的主题名称。

无默认值

连接器用于与 Kafka 集群建立初始连接以检索其数据库模式历史记录的主机和端口对列表。此值必须与 Kafka Connect 进程连接 Kafka 集群的连接设置匹配。

100

指定在恢复期间,连接器在轮询持久化数据请求之间等待的时间(以毫秒为单位)。

100

指定连接器允许从 Kafka 检索模式历史记录数据失败的连续尝试次数。当尝试次数超过此值时,恢复尝试将停止。连接器在无法检索数据后等待的最长时间为 recovery.attempts x recovery.poll.interval.ms

3

指定在 Kafka AdminClient 提交请求获取集群信息后,请求超时之前连接器等待响应的时间(以毫秒为单位)。

30

指定在 Kafka AdminClient 提交请求创建 Kafka 历史记录主题后,请求超时之前连接器等待响应的时间(以毫秒为单位)。

无默认值

用于配置生产者客户端如何与模式历史记录主题交互的传递属性前缀。

无默认值

用于配置消费者客户端如何与模式历史记录主题交互的传递属性前缀。

文件

可以将连接器的位置(偏移量)持久化到本地磁盘文件。这些偏移量可确保在连接器重启后,Debezium 能够从上次读取的位置继续读取。文件存储提供了一种简单、快速的偏移量存储机制,非常适合单节点应用程序或测试场景。

偏移量存储

属性 Default (默认值) 描述

无默认值

必须设置为 org.apache.kafka.connect.storage.FileOffsetBackingStore

无默认值

Debezium 存储源连接器偏移量的文件的路径。

6000 毫秒

指定尝试将当前偏移量状态刷新到配置的偏移量文件之间的时间(以毫秒为单位)。

内部模式历史记录存储

属性 Default (默认值) 描述

无默认值

必须设置为 io.debezium.storage.file.history.FileSchemaHistory

无默认值

Debezium 记录数据库模式历史记录的文件的路径。

内存

MemoryOffsetBackingStore 是一个易失性的内存存储,Debezium Embedded 使用它来跟踪源偏移量。将偏移量存储在内存中仅在应用程序运行时保留偏移量状态。如果连接器关闭或崩溃,偏移量记录将丢失。内存存储非常适合测试或短期任务,但它不适用于需要持久化偏移量的生产环境。

偏移量存储

属性 Default (默认值) 描述

无默认值

必须设置为 org.apache.kafka.connect.storage.MemoryOffsetBackingStore

内部模式历史记录存储

属性 Default (默认值) 描述

无默认值

必须设置为 io.debezium.relational.history.MemorySchemaHistory

JDBC

该存储使用任意关系数据库来存储偏移量数据。您必须提供数据库的 JDBC 驱动程序。Debezium 可以将数据存储在它从中捕获事件的同一个源数据库中,或者您可以将其配置为使用不同的数据库。

Debezium 提供了预配置的 DML 和 DDL 语句。您可以使用这些默认语句,也可以用自己的语句覆盖默认语句,以提供与数据库方言的兼容性或为特定用例进行自定义。

偏移量存储

属性 Default (默认值) 描述

无默认值

必须设置为 io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore

无默认值

用于连接数据库的 JDBC 驱动程序连接字符串。

无默认值

(可选)Debezium 连接到存储偏移量数据的数据库时使用的用户名。

无默认值

(可选)由 offset.storage.jdbc.connection.user 指定的用户的密码。

3 秒

(可选)指定连接器在连接偏移量存储数据库失败后重试连接的时间(以毫秒为单位)。

5

(可选)指定 Debezium 在连接失败后重试连接到偏移量存储数据库的最大次数。

debezium_offset_storage

Debezium 存储偏移量的表的名称。

用于创建偏移量表的 DDL 语句。

Debezium 用于从表中读取偏移量值的 DML 语句。

Debezium 用于将偏移量写入表的 DML 语句。

Debezium 用于从表中删除偏移量的 DML 语句。

3.2 版本之前的已弃用配置

属性 Default (默认值) 描述

无默认值

必须设置为 io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore

无默认值

用于连接数据库的 JDBC 驱动程序连接字符串。

无默认值

(可选)Debezium 连接到存储偏移量数据的数据库时使用的用户名。

无默认值

(可选)由 offset.storage.jdbc.user 指定的用户的密码。

3 秒

(可选)指定连接器在连接偏移量存储数据库失败后重试连接的时间(以毫秒为单位)。

5

(可选)指定 Debezium 在连接失败后重试连接到偏移量存储数据库的最大次数。

debezium_offset_storage

Debezium 存储偏移量的表的名称。

用于创建偏移量表的 DDL 语句。

从表中读取存储的偏移量的 DML 语句。

将偏移量写入表的 DML 语句。

从表中删除偏移量的 DML 语句。

偏移量表默认值

CREATE TABLE %s (
id VARCHAR(36)      NOT NULL,
offset_key          VARCHAR(1255),
offset_val          VARCHAR(1255),
record_insert_ts    TIMESTAMP NOT NULL,
record_insert_seq   INTEGER NOT NULL)
SELECT id, offset_key, offset_val FROM %s ORDER BY record_insert_ts, record_insert_seq
INSERT INTO %s(id, offset_key, offset_val, record_insert_ts, record_insert_seq)
    VALUES ( ?, ?, ?, ?, ? )
DELETE FROM %s

内部模式历史记录存储

属性 Default (默认值) 描述

无默认值

必须设置为 io.debezium.storage.jdbc.history.JdbcSchemaHistory

无默认值

用于连接数据库的 JDBC 驱动程序连接字符串。

无默认值

(可选)Debezium 连接到存储模式历史记录数据的数据库时使用的用户名。

无默认值

(可选)由 schema.history.internal.jdbc.connection.user 指定的用户的密码。

3 秒

(可选)指定连接器在尝试连接到内部模式历史记录数据库失败后重试连接的时间(以毫秒为单位)。

5

(可选)指定 Debezium 在连接失败后重试连接到内部模式历史记录数据库的最大次数。

debezium_database_history

Debezium 存储内部模式历史记录的表的名称。

用于创建存储内部模式历史记录的表的 DDL 语句。

用于从内部模式历史记录表中读取模式更改的 SELECT 语句。

用于检查内部模式历史记录存储表是否存在状态的 SELECT 语句。

用于记录内部模式历史记录表更改的 INSERT 语句。

3.2 版本之前的已弃用配置

属性 Default (默认值) 描述

无默认值

必须设置为 io.debezium.storage.jdbc.history.JdbcSchemaHistory

无默认值

用于连接数据库的 JDBC 驱动程序连接字符串。

无默认值

(可选)Debezium 连接到存储内部模式历史记录数据的数据库时使用的用户名。

无默认值

(可选)由 schema.history.internal.jdbc.user 指定的用户的密码。

3 秒

(可选)指定连接器在尝试连接到内部模式历史记录数据库失败后重试连接的时间(以毫秒为单位)。

5

(可选)指定 Debezium 在连接失败后重试连接到内部模式历史记录数据库的最大次数。

debezium_database_history

Debezium 存储内部模式历史记录的表的名称。

用于创建内部模式历史记录存储表的 DDL 语句。

用于从内部模式历史记录表中读取模式更改的 SELECT 语句。

用于检查内部模式历史记录存储表是否存在状态的 SELECT 语句。

用于记录内部模式历史记录表更改的 INSERT 语句。

历史记录表默认值

CREATE TABLE %s (
    id VARCHAR(36) NOT NULL,
    history_data VARCHAR(65000),
    history_data_seq INTEGER,
    record_insert_ts TIMESTAMP NOT NULL,
    record_insert_seq INTEGER NOT NULL
)
SELECT id, history_data FROM %s
    ORDER BY record_insert_ts, record_insert_seq, id, history_data_seq
SELECT * FROM %s LIMIT 1
INSERT INTO %s(id, history_data, history_data_seq, record_insert_ts, record_insert_seq) VALUES ( ?, ?, ?, ?, ? )

Redis

Debezium 可以使用 Jedis 客户端 将数据存储在 Redis 缓存中。

Debezium 可以使用单个 Redis 实例,也可以使用 Redis Cluster 模式

单实例模式

连接到单个 Redis 服务器实例。

集群模式

连接到 Redis Cluster 以实现高可用性和水平扩展。

要启用 Redis Cluster 模式,请将 redis.cluster.enabled 属性设置为 true,并在 redis.address 属性中提供逗号分隔的主机:端口地址。

偏移量存储

属性 Default (默认值) 描述

无默认值

必须设置为 io.debezium.storage.redis.offset.RedisOffsetBackingStore

metadata:debezium:offsets

Debezium 用于存储偏移量的 Redis 键。

无默认值

Debezium 连接 Redis 以存储偏移量数据的 URL。

无默认值

Debezium 连接 Redis 以存储偏移量数据的用户账户。

无默认值

Debezium 连接 Redis 以存储偏移量数据的用户账户的密码。

0

Debezium 用于访问 Redis 以存储偏移量数据的数据库索引(0-15)。

false

指定 Debezium 在与 Redis 通信以存储偏移量数据时是否使用 SSL。

false

指定 Debezium 在与 Redis 通信以存储偏移量数据时是否启用了主机名验证。

无默认值

用于 SSL/TLS 连接到 Redis 进行偏移量存储的信任库文件的路径。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

无默认值

用于 SSL/TLS 连接到 Redis 进行偏移量存储的信任库文件的密码。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

JKS

用于 SSL/TLS 连接到 Redis 进行偏移量存储的信任库文件的类型。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

无默认值

用于 SSL/TLS 连接到 Redis 进行偏移量存储的密钥库文件的路径。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

无默认值

用于 SSL/TLS 连接到 Redis 进行偏移量存储的密钥库文件的密码。如果设置,Redis 连接将使用此属性而不是其他配置或系统属性。

JKS

用于 SSL/TLS 连接到 Redis 进行偏移量存储的密钥库文件的类型。

2000

指定 Debezium 在连接超时之前等待与 Redis 建立连接的时间(以毫秒为单位)。

2000

指定 Debezium 允许与 Redis 交换偏移量数据的间隔(以毫秒为单位),之后套接字将超时。如果在指定的间隔内未传输数据包,Debezium 将关闭套接字。

300

指定 Debezium 在首次尝试连接 Redis 失败后等待重试连接的时间(以毫秒为单位)。

10000

指定 Debezium 在连接 Redis 失败后重试连接的最大等待时间(以毫秒为单位)。

10

指定 Debezium 在连接尝试失败后重试连接到 Redis 的最大次数。

false

在配置为使用副本分片的 Redis 环境中,指定 Debezium 是否等待 Redis 确认其已将数据写入副本。

1000

指定 Debezium 等待 Redis 将数据写入副本分片确认的时间(以毫秒为单位),之后请求将超时。

false

指定 Debezium 是否重试失败的请求以确认数据是否已写入副本分片。

1000

指定 Debezium 在失败后等待多长时间后,将重新提交请求到 Redis 以确认数据是否已写入副本分片。

false

如果您将 Debezium Server 配置为将偏移量存储在 Redis 中,请设置此属性以指定是否使用 Redis Cluster 模式。将值设置为 true 以配置 Debezium 使用 JedisCluster 客户端 将偏移量数据路由到 Redis 节点。

内部模式历史记录存储

属性 Default (默认值) 描述

无默认值

必须设置为 io.debezium.storage.redis.history.RedisSchemaHistory

metadata:debezium:schema_history

Debezium 用于存储模式历史记录数据的 Redis 键。

无默认值

Debezium 连接 Redis 以存储模式历史记录数据的 URL。

无默认值

Debezium 连接 Redis 以存储模式历史记录数据的用户账户。

无默认值

Debezium 连接 Redis 以存储模式历史记录数据的用户账户的密码。

0

Debezium 用于访问 Redis 以存储模式历史记录数据的数据库索引(0-15)。

false

指定 Debezium 在与 Redis 通信以存储模式历史记录数据时是否使用 SSL。

false

指定 Debezium 在与 Redis 通信以存储模式历史记录数据时是否启用了主机名验证。

无默认值

用于 SSL/TLS 连接到 Redis 以存储模式历史记录数据的信任库文件的路径。

无默认值

用于 SSL/TLS 连接到 Redis 以存储模式历史记录数据的信任库文件的密码。

JKS

用于 SSL/TLS 连接到 Redis 以存储模式历史记录数据的信任库文件的类型。

无默认值

用于 SSL/TLS 连接到 Redis 以存储模式历史记录数据的密钥库文件的路径。

无默认值

用于 SSL/TLS 连接到 Redis 以存储模式历史记录数据的密钥库文件的密码。

JKS

用于 SSL/TLS 连接到 Redis 以存储模式历史记录数据的密钥库文件的类型。

2000

指定 Debezium 在连接超时之前等待与 Redis 建立连接的时间(以毫秒为单位)。

2000

指定 Debezium 允许与 Redis 交换模式历史记录数据的间隔(以毫秒为单位)。如果在指定的间隔内未传输数据包,Debezium 将关闭套接字。

300

指定 Debezium 在首次尝试连接 Redis 失败后等待重试连接的时间(以毫秒为单位)。

10000

指定 Debezium 在连接 Redis 失败后重试连接的最大等待时间(以毫秒为单位)。

10

指定 Debezium 在连接尝试失败后重试连接到 Redis 的最大次数。

false

在配置为使用副本分片的 Redis 环境中,指定 Debezium 是否等待 Redis 确认其已将数据写入副本。

1000

指定 Debezium 等待 Redis 将数据写入副本分片确认的时间(以毫秒为单位),之后请求将超时。

false

指定 Debezium 是否重试失败的请求以确认数据是否已写入副本分片。

1000

指定 Debezium 在失败后等待多长时间后,将重新提交请求到 Redis 以确认数据是否已写入副本分片。

false

如果您将 Debezium Server 配置为将模式历史记录存储在 Redis 中,请设置此属性以指定是否使用 Redis Cluster 模式。将值设置为 true 以配置 Debezium 使用 JedisCluster 客户端将历史记录数据路由到 Redis 节点。

Redis 配置示例

单实例模式

# Offset storage configuration
offset.storage=io.debezium.storage.redis.offset.RedisOffsetBackingStore
offset.storage.redis.address=localhost:6379
offset.storage.redis.password=password
offset.storage.redis.cluster.enabled=false

# Schema history storage configuration
schema.history.internal=io.debezium.storage.redis.history.RedisSchemaHistory
schema.history.internal.storage.redis.address=localhost:6379
schema.history.internal.storage.redis.password=password
schema.history.internal.storage.redis.cluster.enabled=false

集群模式

# Offset storage configuration
offset.storage=io.debezium.storage.redis.offset.RedisOffsetBackingStore
offset.storage.redis.address=redis-node-1:7001,redis-node-2:7002,redis-node-3:7003
offset.storage.redis.password=password
offset.storage.redis.cluster.enabled=true

# Schema history storage configuration
schema.history.internal=io.debezium.storage.redis.history.RedisSchemaHistory
schema.history.internal.storage.redis.address=redis-node-1:7001,redis-node-2:7002,redis-node-3:7003
schema.history.internal.storage.redis.password=password
schema.history.internal.storage.redis.cluster.enabled=true
如果您将 Debezium 配置为使用 Redis Cluster 模式,请确保您的 Redis Cluster 已正确配置并可访问。Debezium Server 实例必须能够与集群节点通信。

Amazon S3

Debezium 可以使用 Amazon S3 对象存储服务。通常,您会在使用 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 部署 Debezium 时使用 S3 存储。

内部模式历史记录存储

属性 Default (默认值) 描述

无默认值

必须设置为 io.debezium.storage.s3.history.S3SchemaHistory

无默认值

(可选)Debezium 用于向 S3 进行身份验证的静态访问密钥的标识符。

无默认值

(可选)Debezium 用于向 S3 进行身份验证的 Amazon Web Services (AWS) 密钥。

无默认值

(可选)指定托管 S3 存储桶的区域名称。

无默认值

指定存储模式历史记录的 S3 存储桶的名称。

无默认值

指定存储模式历史记录的存储桶中的对象名称。

无默认值

(可选)指定 Debezium 用于访问 S3 服务的自定义 URL。
按以下格式提供 URL:http://<server>:<port>;

Azure Blob 存储

Debezium 可以使用 Azure Blob 存储服务来存储数据。通常,您会在 Apache Kafka in Azure HDInsight 服务中部署 Debezium 时使用 Azure Blob 存储。

内部模式历史记录存储

属性 Default (默认值) 描述

无默认值

必须设置为 io.debezium.storage.azure.blob.history.AzureBlobSchemaHistory

无默认值

指定 Azure Blob 存储连接字符串。

无默认值

Debezium 用于连接到 Azure 的账户名称。

无默认值

Debezium 存储数据的 Azure 容器的名称。

无默认值

Debezium 存储数据的 Blob 的名称。

RocketMQ

Debezium 可以使用 RocketMqSchemaHistory 类将数据库模式更改存储和检索到 Apache RocketMQ 中。

内部模式历史记录存储

属性 Default (默认值) 描述

无默认值

必须设置为 io.debezium.storage.rocketmq.history.RocketMqSchemaHistory

无默认值

Debezium 存储数据库模式历史记录的 RocketMQ 主题名称。

无默认值

指定 Apache RocketMQ NameServer 发现服务所在的主机和端口。

false

指定是否在 RocketMQ 中启用访问控制列表。

无默认值

指定 RocketMQ 访问密钥。如果将 schema.history.internal.rocketmq.acl.enabled 设置为 true,则此字段必须包含一个值。

无默认值

指定 RocketMQ 密钥。如果将 schema.history.internal.rocketmq.acl.enabled 设置为 true,则此字段必须包含一个值。

无默认值

指定 RocketMQ 在恢复完成前返回无数据的连续尝试次数。

无默认值

指定 Debezium 在每次轮询尝试以恢复历史记录后等待的时间(以毫秒为单位)。

无默认值

指定 Debezium 在操作超时之前等待写入 Rocket MQ 完成的时间(以毫秒为单位)。