Debezium Cloud Spanner 连接器
Debezium 的 Cloud Spanner 连接器会消耗并输出 Spanner 更改流数据到 Kafka 主题。
Spanner 更改流会近乎实时地监视和流出 Spanner 数据库的数据更改——插入、更新、删除。Spanner 连接器抽象了查询 Spanner 更改流的细节。使用此连接器,您不必管理更改流分区生命周期,而这是您 直接使用 Spanner API 时所必需的。
该连接器目前不支持快照功能。Kafka 连接器第一次连接到 Spanner 数据库时,它将从提供的起始时间戳或当前时间戳(如果未提供时间戳)开始流式传输更改。
概述
要读取和处理数据库更改,连接器会从更改流中进行查询。
连接器为捕获的每个行级插入、更新和删除操作生成一个更改事件,并将每个表的更改事件记录发送到单独的 Kafka 主题。客户端应用程序读取与感兴趣的数据库表相对应的 Kafka 主题,并可以对从这些主题接收到的每个行级事件做出反应。
该连接器对故障具有容错能力。当连接器读取更改并生成事件时,它会记录已处理的每个 更改流分区的最后一个提交时间戳。如果连接器由于任何原因停止(包括通信故障、网络问题或崩溃),在重新启动时,连接器将从上次中断的地方继续流式传输记录。
连接器的工作原理
为了最佳地配置和运行 Debezium Spanner 连接器,了解连接器如何流式传输更改事件、确定 Kafka 主题名称以及使用元数据非常重要。
流式传输更改
Debezium Spanner 连接器花费所有时间从其订阅的更改流中流式传输更改。当表中发生更改时,Spanner 会在数据库中写入相应的更改流记录,与数据更改同步地在同一个事务中。为了扩展更改流的写入和读取,Spanner 会 along with the database data 分割和合并内部更改流存储。为了支持在数据库写入扩展时近乎实时地读取更改流记录,Spanner API 被设计成可以使用更改流分区来并发查询。请参阅 Spanner 更改流的 分区模型。
连接器提供了一个抽象层,用于查询更改流的 Spanner API。有了这个连接器,您就不必管理更改流分区生命周期。连接器为您提供了一个数据更改记录流,因此您可以更专注于应用程序逻辑,而不是具体的 API 细节和动态更改流分区。
当订阅更改流时,连接器需要提供项目 ID、Spanner 实例 ID、Spanner 数据库 ID 以及更改流名称。用户还可以选择性地提供开始时间戳和结束时间戳。有关连接器配置属性的更详细列表,请参阅 本节。
当连接器接收到更改时,它会将事件转换为 Debezium 的创建、更新或删除事件。连接器将这些更改事件以记录的形式转发给运行在同一进程中的 Kafka Connect 框架。Kafka Connect 进程以它们生成的相同顺序异步地将更改事件记录写入适当的 Kafka 主题。
Kafka Connect 会定期在另一个 Kafka 主题中记录最近的偏移量。偏移量指示 Debezium 与每个事件一起包含的源特定位置信息。对于 Spanner 连接器,最后一个已处理的更改流分区的提交时间戳就是偏移量。
当 Kafka Connect 正常关闭时,它会停止连接器,将所有事件记录刷新到 Kafka,并记录从每个连接器接收到的最后一个偏移量。当 Kafka Connect 重新启动时,它会读取每个连接器的最后一个记录的偏移量,并在其最后一个记录的偏移量处启动每个连接器。
Spanner 连接器还在流式传输期间将元数据信息记录在以下元数据主题中。不建议修改这些主题的内容或配置
-
_consumer_offsets:Kafka 自动创建的一个主题。存储 Kafka 连接器中创建的消费者的偏移量。 -
_kafka-connect-offsets:Kafka Connect 自动创建的一个主题。存储连接器偏移量。 -
_sync_topic_spanner_connector_connectorname:连接器自动创建的一个主题。存储有关更改流分区的信息。 -
_rebalancing_topic_spanner_connector_connectorname:连接器自动创建的一个主题。用于确定连接器任务的存活性。 -
_debezium-heartbeat.connectorname:用于处理 Spanner 更改流心跳的主题。
主题名称
Spanner 连接器将单个表的所有插入、更新和删除操作的事件写入一个 Kafka 主题。默认情况下,Kafka 主题名称为 topicPrefix.connectorName.tableName,其中
-
*topicPrefix* 是由
topic.prefix连接器配置属性指定的。*topicPrefix* 是由topic.prefix连接器配置属性指定的。 -
connectorName 是用户指定的连接器名称。
-
*tableName* 是发生操作的数据库表的名称。
例如,假设 spanner 是配置中用于捕获 Spanner 更改流(跟踪数据库中四个表:table1、table2、table3 和 table4 的更改)的逻辑名称。该连接器将流式传输记录到这四个 Kafka 主题:
-
spanner.table1 -
spanner.table2 -
spanner.table3 -
spanner.table4
数据更改事件
Debezium Spanner 连接器为每次行级别的 INSERT、UPDATE 和 DELETE 操作生成一个数据更改事件。每个事件都包含一个键和一个值。键和值是独立的文档。键和值的结构取决于更改的表。
Debezium 和 Kafka Connect 都围绕连续的事件消息流进行设计。但是,这些事件的结构可能会随时间而变化,这会给消费者带来处理困难。为了解决这个问题,每个事件都包含其内容的模式,或者,如果您使用的是模式注册表,则包含一个消费者可以用于从注册表中获取模式的模式 ID。这使得每个事件都自包含。键的模式永远不会改变。请注意,值的模式是自连接器启动以来更改流跟踪的表中所有列的融合。
以下骨架 JSON 文档显示了键和值文档的基本结构。但是,如何配置您选择在应用程序中使用的 Kafka Connect 转换器将决定键和值文档的表示。当您配置转换器以生成它们时,更改事件的键或更改事件的值中才会出现 schema 字段。同样,只有当您配置转换器以生成它们时,才会出现事件键和事件有效负载。如果您使用 JSON 转换器并配置它生成模式,则更改事件具有此结构
// Key
{
"schema": { (1)
...
},
"payload": { (2)
...
}
}
// Value
{
"schema": { (3)
...
},
"payload": { (4)
...
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
第一个 |
2 |
|
第一个 |
3 |
|
第二个 |
4 |
|
第二个 |
默认行为是连接器将更改事件记录流式传输到名称与事件源表相同的主题。
|
从 Kafka 0.10 开始,Kafka 可以选择记录事件键和值以及消息创建(由生产者记录)或由 Kafka 写入日志的*时间戳*。 |
更改事件键
对于给定的表,更改事件的键具有一个结构,该结构包含表在创建事件时主键中每个列的字段。所有键列都将被标记为非可选。
考虑 business 数据库中定义的 users 表以及该表的更改事件键的示例。
CREATE TABLE Users (
id INT64 NOT NULL,
username STRING(MAX) NOT NULL,
password STRING(MAX) NOT NULL,
email STRING(MAX) NOT NULL)
PRIMARY KEY (id);
对于 users 表,在其具有此定义时,每个更改事件都具有相同的键结构,在 JSON 中看起来像这样
{
"schema": { (1)
"type": "struct",
"name": "Users.Key", (2)
"optional": false, (3)
"fields": [ (4)
{
"type": "int64",
"optional": "false",
"field": "false"
}
]
},
"payload": { (5)
"id": "1"
},
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
键的模式部分指定了一个 Kafka Connect 模式,该模式描述了键的 主键的结构。 |
2 |
|
定义键有效负载结构名称的模式。此模式描述了已更改的表的主键的结构。 |
3 |
|
指示事件键的 |
4 |
|
指定 |
5 |
|
包含生成此更改事件的行的键。在此示例中,键包含一个名为 |
更改事件值
考虑用于显示更改事件键示例的相同样本表。
CREATE TABLE Users (
id INT64 NOT NULL,
username STRING(MAX) NOT NULL,
password STRING(MAX) NOT NULL,
email STRING(MAX) NOT NULL)
PRIMARY KEY (id);
create 事件
以下示例显示了连接器为在 Users 表中创建数据的操作生成的更改事件的值部分。如果 Spanner 列被标记为非可选,则所有插入行的变异都需要其值。Spanner 中的所有主键列都将被标记为非可选。请注意,即使 Spanner 中的非键列被标记为非可选,它在模式中也会显示为可选。只有主键列在模式中被标记为非可选。
{
"schema": { (1)
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "first_name"
},
{
"type": "string",
"optional": true,
"field": "last_name"
},
{
"type": "string",
"optional": true,
"field": "email"
}
],
"optional": true,
"name": "Users.Value", (2)
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "Users.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "int64",
"optional": false,
"field": "ts_us"
},
{
"type": "int64",
"optional": false,
"field": "ts_ns"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": false,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "project_id"
},
{
"type": "string",
"optional": false,
"field": "instance_id"
},
{
"type": "string",
"optional": false,
"field": "database_id"
},
{
"type": "string",
"optional": false,
"field": "change_stream_name"
},
{
"type": "string",
"optional": true,
"field": "table"
}
{
"type": "string",
"optional": true,
"field": "server_transaction_id"
}
{
"type": "int64",
"optional": true,
"field": "low_watermark"
}
{
"type": "int64",
"optional": true,
"field": "read_at_timestamp"
}
{
"type": "int64",
"optional": true,
"field": "number_of_records_in_transaction"
}
{
"type": "string",
"optional": true,
"field": "transaction_tag"
}
{
"type": "boolean",
"optional": true,
"field": "system_transaction"
}
{
"type": "string",
"optional": true,
"field": "value_capture_type"
}
{
"type": "string",
"optional": true,
"field": "partition_token"
}
{
"type": "int32",
"optional": true,
"field": "mod_number"
}
{
"type": "boolean",
"optional": true,
"field": "is_last_record_in_transaction_in_partition"
}
{
"type": "int64",
"optional": true,
"field": "number_of_partitions_in_transaction"
}
],
"optional": false,
"name": "io.debezium.connector.spanner.Source", (3)
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "int64",
"optional": true,
"field": "ts_us"
},
{
"type": "int64",
"optional": true,
"field": "ts_ns"
}
],
"optional": false,
"name": "connector_name.Users.Envelope" (4)
},
"payload": { (5)
"before": null, (6)
"after": { (7)
"id": 1,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": { (8)
"version": "3.3.0.Final",
"connector": "spanner",
"name": "spanner_connector",
"ts_ms": 1670955531785,
"ts_us": 1670955531785000,
"ts_ns": 1670955531785000000,
"snapshot": "false",
"db": "database",
"sequence": "1",
"project_id": "project",
"instance_id": "instance",
"database_id": "database",
"change_stream_name": "change_stream",
"table": "Users",
"server_transaction_id": "transaction_id",
"low_watermark": 1670955471635,
"read_at_timestamp": 1670955531791,
"number_records_in_transaction": 2,
"transaction_tag": "",
"system_transaction": false,
"value_capture_type": "OLD_AND_NEW_VALUES",
"partition_token": "partition_token",
"mod_number": 0,
"is_last_record_in_transaction_in_partition": true,
"number_of_partitions_in_transaction": 1
},
"op": "c", (9)
"ts_ms": 1559033904863, (10)
"ts_us": 1559033904863769, (10)
"ts_ns": 1559033904863769841 (10)
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
描述值有效负载结构的 值模式。更改事件的值模式对于连接器为特定表生成的每个更改事件都相同。 |
2 |
|
在 |
3 |
|
|
4 |
|
|
5 |
|
值 的实际数据。这是更改事件提供的信息。 |
6 |
|
一个可选字段,指定事件发生前行的状态。当 |
7 |
|
一个可选字段,指定事件发生后行的状态。在此示例中, |
8 |
|
描述事件源元数据的 必需字段。此字段包含可用于将此事件与其他事件进行比较的信息,关于事件的来源、事件发生的顺序以及事件是否属于同一事务。源元数据包括:
|
9 |
|
描述导致连接器生成事件的操作类型的 必需字符串。在此示例中,
|
10 |
|
可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
update 事件
示例 Users 表更新的更改事件的值具有与该表的创建事件相同的模式。同样,事件值有效负载也具有相同的结构。但是,在更新事件中,事件值有效负载包含不同的值。以下是连接器为 Users 表中的更新生成的事件中的更改事件值的示例
{
"schema": { ... },
"payload": {
"before": { (1)
"id": 1,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": { (2)
"id": 1,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": { (3)
"version": "3.3.0.Final",
"connector": "spanner",
"name": "spanner_connector",
"ts_ms": 1670955531785,
"ts_us": 1670955531785000,
"ts_ns": 1670955531785000000,
"snapshot": "false",
"db": "database",
"sequence": "1",
"project_id": "project",
"instance_id": "instance",
"database_id": "database",
"change_stream_name": "change_stream",
"table": "Users",
"server_transaction_id": "transaction_id",
"low_watermark": 1670955471635,
"read_at_timestamp": 1670955531791,
"number_records_in_transaction": 2,
"transaction_tag": "",
"system_transaction": false,
"value_capture_type": "OLD_AND_NEW_VALUES",
"partition_token": "partition_token",
"mod_number": 0,
"is_last_record_in_transaction_in_partition": true,
"number_of_partitions_in_transaction": 1
},
"op": "u", (4)
"ts_ms": 1465584025523, (5)
"ts_us": 1465584025523614, (5)
"ts_ns": 1465584025523614723 (5)
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
一个可选字段,包含数据库提交之前行中所有列的所有值。 |
2 |
|
一个可选字段,指定事件发生后行的状态。在此示例中, |
3 |
|
一个强制性字段,描述事件的源元数据。
|
4 |
|
描述操作类型的 必需字符串。在更新事件值中, |
5 |
|
可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
delete 事件
delete 更改事件中的值具有与同一表的创建和更新事件相同的 schema 部分。示例 Users 表的delete 事件中的 payload 部分看起来像这样
{
"schema": { ... },
"payload": {
"before": { (1)
"id": 1,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": null, (2)
"source": { (3)
"version": "3.3.0.Final",
"connector": "spanner",
"name": "spanner_connector",
"ts_ms": 1670955531785,
"ts_us": 1670955531785000,
"ts_ns": 1670955531785000000,
"snapshot": "false",
"db": "database",
"sequence": "1",
"project_id": "project",
"instance_id": "instance",
"database_id": "database",
"change_stream_name": "change_stream",
"table": "Users",
"server_transaction_id": "transaction_id",
"low_watermark": 1670955471635,
"read_at_timestamp": 1670955531791,
"number_records_in_transaction": 2,
"transaction_tag": "",
"system_transaction": false,
"value_capture_type": "OLD_AND_NEW_VALUES",
"partition_token": "partition_token",
"mod_number": 0,
"is_last_record_in_transaction_in_partition": true,
"number_of_partitions_in_transaction": 1
},
"op": "d", (4)
"ts_ms": 1465581902461, (5)
"ts_us": 1465581902461425, (5)
"ts_ns": 1465581902461425378 (5)
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
一个可选字段,指定事件发生前行的状态。在删除事件值中, |
2 |
|
一个可选字段,指定事件发生后行的状态。在删除事件值中, |
3 |
|
一个强制性字段,描述事件的源元数据。在*delete* 事件值中,
|
4 |
|
描述操作类型的 必需字符串。 |
5 |
|
可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
*delete* 更改事件记录为使用者提供了处理该行删除所需的信息。
Spanner 连接器事件设计用于与 Kafka 日志压缩配合使用。日志压缩允许删除一些较旧的消息,只要保留每个键的至少最新消息。这使 Kafka 可以回收存储空间,同时确保主题包含完整的数据集并可用于重新加载基于键的状态。请注意,如果启用了低水位标记,则不应启用压缩。
当行被删除时,delete 事件值仍然可以与日志压缩配合使用,因为 Kafka 可以删除具有相同键的所有先前消息。但是,为了让 Kafka 删除所有具有相同键的消息,消息值必须为 null。为了实现这一点,连接器会向一个特殊的墓碑事件发送一个delete 事件,该事件具有相同的键但值为 null。
部署
安装 Kafka 和 Kafka Connect 后,部署 Debezium Spanner 连接器的剩余任务是下载连接器的插件存档,将 JAR 文件解压到您的 Kafka Connect 环境中,并将包含 JAR 文件的目录添加到 Kafka Connect 的 plugin.path。然后您需要重新启动 Kafka Connect 进程以加载新的 JAR 文件。
如果您使用不可变容器,请参阅 Debezium 的容器镜像,其中包含已安装并准备好运行的 Spanner 连接器的 Kafka 和 Kafka Connect。您也可以 在 Kubernetes 和 OpenShift 上运行 Debezium。
|
从 |
连接器配置示例
以下是一个 Spanner 连接器的配置示例,该连接器连接到项目 Project、实例 Instance、数据库 Database 中名为 changeStreamAll 的更改流。
您可以选择为部分 schema 和表生成事件。可选地,忽略、屏蔽或截断敏感、过大或不需要的列。
{
"name": "spanner-connector", (1)
"config": {
"connector.class": "io.debezium.connector.spanner.SpannerConnector", (2)
"gcp.spanner.change.stream": "changeStreamAll", (3)
"gcp.spanner.project.id": "Project", (4)
"gcp.spanner.instance.id": "Instance", (5)
"gcp.spanner.database.id": "Database", (6)
"gcp.spanner.credentials.json": <key.json>, (7)
"tasks.max": 1 (8)
}
}
| 1 | 连接器在 Kafka Connect 服务中注册时的名称。 |
| 2 | 此 Spanner 连接器类的名称。 |
| 3 | 更改流名称。 |
| 4 | GCP 项目 ID。 |
| 5 | Spanner 实例 ID。 |
| 6 | Spanner 数据库 ID。 |
| 7 | GCP 服务帐户密钥 JSON。 |
| 8 | 最大任务数。 |
添加连接器配置
要开始运行 Spanner 连接器,请创建连接器配置并将配置添加到您的 Kafka Connect 集群。
-
Spanner 更改流已创建并可用。
-
Spanner 连接器已安装。
-
为 Spanner 连接器创建配置。
-
使用Kafka Connect REST API 将该连接器配置添加到您的 Kafka Connect 群集。
当连接器启动时,它开始为行级操作生成数据更改事件,并将更改事件记录流式传输到 Kafka 主题。
监控
除了 Kafka 和 Kafka Connect 提供的 JMX 指标的内置支持外,Debezium Spanner 连接器还提供一种指标。
-
流式传输指标提供有关连接器捕获更改和流式传输更改事件记录时连接器操作的信息。
Debezium 监控文档 提供了有关如何使用 JMX 公开这些指标的详细信息。
自定义 MBean 名称
Debezium 连接器通过连接器的 MBean 名称公开指标。这些指标特定于每个连接器实例,提供有关连接器快照、流式传输和模式历史记录进程行为的数据。
默认情况下,当您部署正确配置的连接器时,Debezium 会为不同的连接器指标生成唯一的 MBean 名称。要查看连接器进程的指标,请将您的可观察性堆栈配置为监视其 MBean。但这些默认 MBean 名称取决于连接器配置;配置更改可能导致 MBean 名称发生更改。在这种情况下,更改 MBean 名称会破坏连接器实例和 MBean 之间的链接,从而中断监控活动。您必须重新配置可观察性堆栈以使用新的 MBean 名称才能恢复监控。
为了防止因 MBean 名称更改而导致的监控中断,您可以配置自定义指标标签。通过将 custom.metric.tags 属性添加到连接器配置中来配置自定义指标。该属性接受键值对,其中每个键代表 MBean 对象名称的标签,相应的值代表该标签的值。例如:k1=v1,k2=v2。Debezium 将指定的标签附加到连接器的 MBean 名称。
配置连接器的 custom.metric.tags 属性后,您可以配置可观察性堆栈以检索与指定标签关联的指标。然后,可观察性堆栈使用指定的标签而不是可变的 MBean 名称来唯一标识连接器。之后,如果 Debezium 重新定义了其如何构造 MBean 名称,或者连接器配置中的 topic.prefix 发生更改,指标收集将不会中断,因为指标抓取任务使用指定的标签模式来标识连接器。
使用自定义标签的另一个好处是,您可以使用反映数据管道架构的标签,以便以适合您运营需求的方式组织指标。例如,您可以指定带有值的标签来声明连接器活动的类型、应用程序上下文或数据源,例如 db1-streaming-for-application-abc。如果您指定多个键值对,所有指定的对都将附加到连接器的 MBean 名称。
以下示例说明了标签如何修改默认 MBean 名称。
默认情况下,Spanner 连接器使用以下 MBean 名称进行流式传输指标
debezium.Cloud Spanner:type=connector-metrics,context=streaming,server=<topic.prefix>
如果您将 custom.metric.tags 的值设置为 database=salesdb-streaming,table=inventory,Debezium 将生成以下自定义 MBean 名称:
debezium.Cloud Spanner:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory
流式传输指标
MBean 为 debezium.Cloud Spanner:type=connector-metrics,context=streaming,server=<topic.prefix>。
| Attributes | Type | 描述 |
|---|---|---|
|
自连接器读取和处理最近事件以来经过的毫秒数。 |
|
|
自上次启动或重置以来,此连接器已看到的事件总数。 |
|
|
已被连接器配置的包含/排除列表过滤规则过滤的事件数量。 |
|
|
用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的长度。 |
|
|
用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的剩余容量。 |
|
|
表示连接器当前是否连接到数据库服务器的标志。 |
|
|
上次更改事件的时间戳与连接器处理它之间的时间差(以毫秒为单位)。这些值将包含数据库服务器和连接器运行所在机器之间时钟的任何差异。 |
|
|
已提交的处理过的事务数。 |
|
|
用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的最大缓冲区(以字节为单位)。 |
|
|
用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的当前缓冲区(以字节为单位)。 |
|
|
连接器任务的当前低水位标记。低水位标记描述了连接器保证已流出所有时间戳小于 T 的事件的时间 T。 |
|
|
连接器任务的当前低水位标记(以毫秒为单位)。低水位标记描述了连接器保证已流出所有时间戳小于 T 的事件的时间 T。 |
|
|
低水位标记落后于当前时间的毫秒延迟。低水位标记描述了连接器保证已流出所有时间戳小于 T 的事件的时间 T。 |
|
|
低水位标记落后于当前时间的延迟分布(以毫秒为单位)。variant 将包括 P50、P95、P99、平均值、最小值、最大值计算。 |
|
|
Spanner 提交时间戳到连接器读取延迟的分布。variant 将包括 P50、P95、P99、平均值、最小值、最大值计算。 |
|
|
Spanner 读取时间戳到连接器发出延迟的分布。variant 将包括 P50、P95、P99、平均值、最小值、最大值计算。 |
|
|
Spanner 提交时间戳到连接器发出延迟的分布。variant 将包括 P50、P95、P99、平均值、最小值、最大值计算。 |
|
|
Spanner 提交时间戳到 Kafka 发布时间戳延迟的分布。variant 将包括 P50、P95、P99、平均值、最小值、最大值计算。 |
|
|
连接器发出时间戳到 Kafka 发布时间戳延迟的分布。variant 将包括 P50、P95、P99、平均值、最小值、最大值计算。 |
|
|
总 Spanner 事件队列容量。此队列指示 StreamEventQueue 的总容量,这是一个 Spanner 特定的队列,用于存储从更改流查询接收到的元素。 |
|
|
剩余的 Spanner 事件队列容量。 |
|
|
总任务状态更改事件队列容量。此队列指示 TaskStateChangeEventQueue 的总容量,这是一个 Spanner 特定的队列,用于存储连接器中发生的事件。 |
|
|
剩余的任务状态更改事件队列容量。 |
|
|
当前任务检测到的分区总数。 |
|
|
当前任务发出的更改流查询总数。 |
|
|
当前任务检测到的活动更改流查询数。 |
连接器配置属性
Debezium Spanner 连接器有许多配置属性,可用于为您的应用程序实现正确的连接器行为。许多属性都有默认值。属性信息组织如下:
以下配置属性是必需的,除非有默认值可用。
| 属性 | Default (默认值) | 描述 |
|---|---|---|
无默认值 |
连接器的唯一名称。尝试使用相同的名称再次注册将失败。此属性是所有 Kafka Connect 连接器必需的。 |
|
无默认值 |
连接器的 Java 类名。对于 Spanner 连接器,始终使用 |
|
|
应为此连接器创建的最大任务数。如果启用 offset.storage.per.task 模式,Spanner 连接器可以使用多个任务。 |
|
无默认值 |
GCP 项目 ID |
|
无默认值 |
Spanner 实例 ID |
|
无默认值 |
Spanner 数据库 ID |
|
无默认值 |
Spanner 更改流 |
|
无默认值 |
GCP 服务帐户密钥 JSON 的文件路径。 |
|
无默认值 |
GCP 服务帐户密钥 JSON。如果未提供 gcp.spanner.credentials.path,则需要此项。 |
|
none |
指定如何调整 schema 名称以兼容连接器使用的消息转换器。可能设置:
|
|
none |
指定如何调整字段名称以兼容连接器使用的消息转换器。可能设置:
有关更多详细信息,请参阅 Avro 命名。 |
以下*高级*配置属性的默认值在大多数情况下都适用,因此很少需要在连接器的配置中指定。
| 属性 | Default (默认值) | 描述 |
|---|---|---|
|
是否为连接器启用了低水位标记。 |
|
|
低水位标记更新的间隔。 |
|
|
Spanner 心跳间隔。 |
|
|
连接器启动时间。 |
|
|
连接器结束时间。 |
|
|
Spanner 事件队列容量。如果在连接器运行时剩余流事件队列容量接近零,则增加此容量。 |
|
|
任务状态更改事件队列容量。如果在连接器运行时剩余任务状态更改事件队列容量接近零,则增加此容量。 |
|
|
在抛出异常之前,更改流查询错过的心跳的最大数量。 |
|
|
是否启用了任务自动扩展。 |
|
|
Sync 主题的名称。Sync 主题是一个内部连接器主题,用于存储任务之间的通信。 |
|
|
Sync 主题的轮询持续时间。 |
|
|
Sync 主题请求的超时时间。 |
|
|
发布到 Sync 主题的超时时间。 |
|
|
Sync 主题提交偏移量的超时时间。 |
|
|
Sync 主题提交偏移量的间隔。 |
|
|
将消息发布到 Sync 主题的间隔。 |
|
|
Rebalancing 主题的名称。Rebalancing 主题是一个内部连接器主题,用于确定任务的存活性。 |
|
|
Rebalancing 主题的轮询持续时间。 |
|
|
Rebalance 主题提交偏移量的超时时间。 |
|
|
Sync 主题提交偏移量的间隔。 |
|
|
任务在处理 rebalancing 事件之前等待的时间。 |
|
|
定义用于通过添加提供上下文信息的元数据来定制 MBean 对象名称的标签。指定键值对的逗号分隔列表。每个键代表 MBean 对象名称的标签,对应的值代表该键的值,例如: 连接器会将指定的标签附加到基本 MBean 对象名称。标签可以帮助您组织和分类指标数据。您可以定义标签来识别特定的应用程序实例、环境、区域、版本等。有关更多信息,请参阅 定制 MBean 名称。 |
|
|
指定连接器在操作导致可重试错误(例如连接错误)后如何响应。
|
|
true |
此属性指定 Debezium 是否将带有 这些头是 OpenLineage 集成所必需的,并提供元数据,使下游处理系统能够跟踪和识别更改事件的来源。 该属性添加了以下头:
|
有关更完整的高级配置列表,请参阅 Github 代码。
连接器还支持*直通*配置属性,这些属性在创建 Kafka 生产者和消费者时使用。
出现问题时的行为
Debezium 是一个分布式系统,它捕获多个上游数据库的所有更改;它从不遗漏或丢失事件。当系统正常运行或被仔细管理时,Debezium 会为每个更改事件记录提供精确一次的传递。
如果发生故障,系统不会丢失任何事件。但是,在从故障恢复期间,它可能会重复某些更改事件。在这些异常情况下,Debezium 与 Kafka 一样,提供*至少一次*的更改事件传递。
本节的其余部分描述了 Debezium 如何处理各种类型的故障和问题。
配置和启动错误
在以下情况下,连接器在尝试启动时会失败,并在日志中报告错误/异常,然后停止运行:
-
连接器的配置无效。
-
连接器无法使用指定的连接参数成功连接到 Spanner。
在这些情况下,错误消息将包含有关问题的详细信息,以及可能的建议的解决方法。在您更正配置或解决 Spanner 问题后,重新启动连接器。
Kafka Connect 进程正常停止
假设 Kafka Connect 正在分布式模式下运行,并且一个 Kafka Connect 进程正常停止。在停止该进程之前,Kafka Connect 会将该进程的连接器任务迁移到该组中的另一个 Kafka Connect 进程。新的连接器任务将从先前任务停止的地方继续处理。在连接器任务正常停止并重新启动到新进程的过程中,会有短暂的处理延迟。
Kafka Connect 进程崩溃
如果 Kafka Connect 进程意外停止,它运行的所有连接器任务都会终止,而不会记录它们最近处理的偏移量。当 Kafka Connect 在分布式模式下运行时,Kafka Connect 会在其他进程上重新启动这些连接器任务。但是,Spanner 连接器会从早期进程记录的最后一个偏移量处恢复。这意味着新的替换任务可能会生成在崩溃前不久处理过的某些相同更改事件。重复事件的数量取决于偏移量刷新周期以及崩溃前的数据更改量。
由于在故障恢复期间可能存在事件重复的可能性,因此消费者应始终预料到一些重复事件。
在每个更改事件记录中,Debezium 连接器都会插入有关事件来源的源特定信息,例如原始分区令牌、提交时间戳、事务 ID、记录序号和修改号。消费者可以使用这些标识符进行去重。
Kafka 变得不可用
当连接器生成更改事件时,Kafka Connect 框架会使用 Kafka producer API 将这些事件记录在 Kafka 中。您可以在 Kafka Connect 配置中指定的频率周期性地,Kafka Connect 会记录出现在这些更改事件中的最新偏移量。如果 Kafka Broker 不可用,运行连接器的 Kafka Connect 进程将反复尝试重新连接到 Kafka Broker。换句话说,连接器任务将暂停,直到可以重新建立连接,届时连接器将从它们离开的地方恢复。
连接器停止一段时间
如果连接器被正常停止,数据库可以继续使用。当连接器重新启动时,它会从上次中断的地方继续流式传输更改。也就是说,它会生成连接器停止期间发生的所有数据库更改的更改事件记录。
配置得当的 Kafka 集群能够处理海量吞吐量。Kafka Connect 是按照 Kafka 最佳实践编写的,并且在有足够资源的情况下,Kafka Connect 连接器也可以处理大量数据库更改事件。因此,在停止一段时间后,当 Debezium 连接器重新启动时,它很可能会赶上连接器停止期间发生的数据库更改。这有多快发生取决于 Kafka 的能力和性能以及 Spanner 中数据的更改量。
请注意,当前连接器只能回溯 ~一个小时。Kafka 连接器读取 Kafka 连接器启动时间戳处的信息模式以检索模式信息。默认情况下,Spanner 无法读取版本保留期(默认为一小时)之前的读取时间戳的信息模式。如果您希望从一小时前开始连接器,则需要增加数据库的版本保留期。
局限性
-
该连接器不支持流式传输快照事件。
-
如果在连接器中启用了水位标记,则不能配置Debezium 主题路由转换。
-
此连接器目前不支持Cloud Spanner 的 PostgreSQL 接口。