您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

Debezium Cloud Spanner 连接器

Debezium 的 Cloud Spanner 连接器会消耗并输出 Spanner 更改流数据到 Kafka 主题。

Spanner 更改流会近乎实时地监视和流出 Spanner 数据库的数据更改——插入、更新、删除。Spanner 连接器抽象了查询 Spanner 更改流的细节。使用此连接器,您不必管理更改流分区生命周期,而这是您 直接使用 Spanner API 时所必需的。

该连接器目前不支持快照功能。Kafka 连接器第一次连接到 Spanner 数据库时,它将从提供的起始时间戳或当前时间戳(如果未提供时间戳)开始流式传输更改。

概述

要读取和处理数据库更改,连接器会从更改流中进行查询。

连接器为捕获的每个行级插入、更新和删除操作生成一个更改事件,并将每个表的更改事件记录发送到单独的 Kafka 主题。客户端应用程序读取与感兴趣的数据库表相对应的 Kafka 主题,并可以对从这些主题接收到的每个行级事件做出反应。

该连接器对故障具有容错能力。当连接器读取更改并生成事件时,它会记录已处理的每个 更改流分区的最后一个提交时间戳。如果连接器由于任何原因停止(包括通信故障、网络问题或崩溃),在重新启动时,连接器将从上次中断的地方继续流式传输记录。

连接器的工作原理

为了最佳地配置和运行 Debezium Spanner 连接器,了解连接器如何流式传输更改事件、确定 Kafka 主题名称以及使用元数据非常重要。

流式传输更改

Debezium Spanner 连接器花费所有时间从其订阅的更改流中流式传输更改。当表中发生更改时,Spanner 会在数据库中写入相应的更改流记录,与数据更改同步地在同一个事务中。为了扩展更改流的写入和读取,Spanner 会 along with the database data 分割和合并内部更改流存储。为了支持在数据库写入扩展时近乎实时地读取更改流记录,Spanner API 被设计成可以使用更改流分区来并发查询。请参阅 Spanner 更改流的 分区模型

连接器提供了一个抽象层,用于查询更改流的 Spanner API。有了这个连接器,您就不必管理更改流分区生命周期。连接器为您提供了一个数据更改记录流,因此您可以更专注于应用程序逻辑,而不是具体的 API 细节和动态更改流分区。

当订阅更改流时,连接器需要提供项目 ID、Spanner 实例 ID、Spanner 数据库 ID 以及更改流名称。用户还可以选择性地提供开始时间戳和结束时间戳。有关连接器配置属性的更详细列表,请参阅 本节

当连接器接收到更改时,它会将事件转换为 Debezium 的创建更新删除事件。连接器将这些更改事件以记录的形式转发给运行在同一进程中的 Kafka Connect 框架。Kafka Connect 进程以它们生成的相同顺序异步地将更改事件记录写入适当的 Kafka 主题。

Kafka Connect 会定期在另一个 Kafka 主题中记录最近的偏移量。偏移量指示 Debezium 与每个事件一起包含的源特定位置信息。对于 Spanner 连接器,最后一个已处理的更改流分区的提交时间戳就是偏移量。

当 Kafka Connect 正常关闭时,它会停止连接器,将所有事件记录刷新到 Kafka,并记录从每个连接器接收到的最后一个偏移量。当 Kafka Connect 重新启动时,它会读取每个连接器的最后一个记录的偏移量,并在其最后一个记录的偏移量处启动每个连接器。

Spanner 连接器还在流式传输期间将元数据信息记录在以下元数据主题中。不建议修改这些主题的内容或配置

  • _consumer_offsets:Kafka 自动创建的一个主题。存储 Kafka 连接器中创建的消费者的偏移量。

  • _kafka-connect-offsets:Kafka Connect 自动创建的一个主题。存储连接器偏移量。

  • _sync_topic_spanner_connector_connectorname:连接器自动创建的一个主题。存储有关更改流分区的信息。

  • _rebalancing_topic_spanner_connector_connectorname:连接器自动创建的一个主题。用于确定连接器任务的存活性。

  • _debezium-heartbeat.connectorname:用于处理 Spanner 更改流心跳的主题。

主题名称

Spanner 连接器将单个表的所有插入、更新和删除操作的事件写入一个 Kafka 主题。默认情况下,Kafka 主题名称为 topicPrefix.connectorName.tableName,其中

  • *topicPrefix* 是由 topic.prefix 连接器配置属性指定的。*topicPrefix* 是由 topic.prefix 连接器配置属性指定的。

  • connectorName 是用户指定的连接器名称。

  • *tableName* 是发生操作的数据库表的名称。

例如,假设 spanner 是配置中用于捕获 Spanner 更改流(跟踪数据库中四个表:table1table2table3table4 的更改)的逻辑名称。该连接器将流式传输记录到这四个 Kafka 主题:

  • spanner.table1

  • spanner.table2

  • spanner.table3

  • spanner.table4

数据更改事件

Debezium Spanner 连接器为每次行级别的 INSERTUPDATEDELETE 操作生成一个数据更改事件。每个事件都包含一个键和一个值。键和值是独立的文档。键和值的结构取决于更改的表。

Debezium 和 Kafka Connect 都围绕连续的事件消息流进行设计。但是,这些事件的结构可能会随时间而变化,这会给消费者带来处理困难。为了解决这个问题,每个事件都包含其内容的模式,或者,如果您使用的是模式注册表,则包含一个消费者可以用于从注册表中获取模式的模式 ID。这使得每个事件都自包含。键的模式永远不会改变。请注意,值的模式是自连接器启动以来更改流跟踪的表中所有列的融合。

以下骨架 JSON 文档显示了键和值文档的基本结构。但是,如何配置您选择在应用程序中使用的 Kafka Connect 转换器将决定键和值文档的表示。当您配置转换器以生成它们时,更改事件的键或更改事件的值中才会出现 schema 字段。同样,只有当您配置转换器以生成它们时,才会出现事件键和事件有效负载。如果您使用 JSON 转换器并配置它生成模式,则更改事件具有此结构

// Key
{
 "schema": { (1)
   ...
  },
 "payload": { (2)
   ...
 }
}

// Value
{
 "schema": { (3)
   ...
 },
 "payload": { (4)
   ...
 }
}
表 1. 更改事件基本内容概述
Item Field name (字段名) 描述

1

schema

第一个 schema 字段是事件键的一部分。它指定了一个 Kafka Connect 模式,该模式描述了事件键的 payload 部分的内容。换句话说,第一个 schema 字段描述了主键的结构。

2

payload

第一个 payload 字段是事件键的一部分。它的结构由前面的 schema 字段描述,它包含已更改行的键。

3

schema

第二个 schema 字段是事件值的一部分。它指定了一个 Kafka Connect 模式,该模式描述了事件值 payload 部分的内容。换句话说,第二个 schema 描述了已更改行的结构。通常,此模式包含嵌套模式。

4

payload

第二个 payload 字段是事件值的一部分。它的结构由上一个 schema 字段描述,并且包含已更改行的实际数据。

默认行为是连接器将更改事件记录流式传输到名称与事件源表相同的主题。

从 Kafka 0.10 开始,Kafka 可以选择记录事件键和值以及消息创建(由生产者记录)或由 Kafka 写入日志的*时间戳*。

更改事件键

对于给定的表,更改事件的键具有一个结构,该结构包含表在创建事件时主键中每个列的字段。所有键列都将被标记为非可选。

考虑 business 数据库中定义的 users 表以及该表的更改事件键的示例。

示例表
CREATE TABLE Users (
  id INT64 NOT NULL,
  username STRING(MAX) NOT NULL,
  password STRING(MAX) NOT NULL,
  email STRING(MAX) NOT NULL)
PRIMARY KEY (id);
示例更改事件键

对于 users 表,在其具有此定义时,每个更改事件都具有相同的键结构,在 JSON 中看起来像这样

{
  "schema": { (1)
    "type": "struct",
    "name": "Users.Key", (2)
    "optional": false, (3)
    "fields": [ (4)
      {
        "type": "int64",
        "optional": "false",
        "field": "false"
      }
    ]
  },
  "payload": { (5)
      "id": "1"
  },
}
表 2. 更改事件键描述
Item Field name (字段名) 描述

1

schema

键的模式部分指定了一个 Kafka Connect 模式,该模式描述了键的 payload 部分的内容。此模式描述了已更改表的

主键的结构。

2

Users.Key

定义键有效负载结构名称的模式。此模式描述了已更改的表的主键的结构。

3

optional

指示事件键的 payload 字段是否必须包含值。主键列始终是必需的。

4

fields (字段)

指定 payload 中预期的每个字段,包括每个字段的名称、类型以及是否可选。

5

payload

包含生成此更改事件的行的键。在此示例中,键包含一个名为 id 的字段,其值为 1

更改事件值

考虑用于显示更改事件键示例的相同样本表。

CREATE TABLE Users (
  id INT64 NOT NULL,
  username STRING(MAX) NOT NULL,
  password STRING(MAX) NOT NULL,
  email STRING(MAX) NOT NULL)
PRIMARY KEY (id);

create 事件

以下示例显示了连接器为在 Users 表中创建数据的操作生成的更改事件的值部分。如果 Spanner 列被标记为非可选,则所有插入行的变异都需要其值。Spanner 中的所有主键列都将被标记为非可选。请注意,即使 Spanner 中的非键列被标记为非可选,它在模式中也会显示为可选。只有主键列在模式中被标记为非可选。

{
    "schema": { (1)
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "Users.Value", (2)
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "Users.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "connector"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ms"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_us"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ns"
                    },
                    {
                        "type": "boolean",
                        "optional": true,
                        "default": false,
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "sequence"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "project_id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "instance_id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "database_id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "change_stream_name"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "table"
                    }
                    {
                        "type": "string",
                        "optional": true,
                        "field": "server_transaction_id"
                    }
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "low_watermark"
                    }
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "read_at_timestamp"
                    }
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "number_of_records_in_transaction"
                    }
                    {
                        "type": "string",
                        "optional": true,
                        "field": "transaction_tag"
                    }
                    {
                        "type": "boolean",
                        "optional": true,
                        "field": "system_transaction"
                    }
                    {
                        "type": "string",
                        "optional": true,
                        "field": "value_capture_type"
                    }
                    {
                        "type": "string",
                        "optional": true,
                        "field": "partition_token"
                    }
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "mod_number"
                    }
                    {
                        "type": "boolean",
                        "optional": true,
                        "field": "is_last_record_in_transaction_in_partition"
                    }
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "number_of_partitions_in_transaction"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.spanner.Source", (3)
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_us"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ns"
            }
        ],
        "optional": false,
        "name": "connector_name.Users.Envelope" (4)
    },
    "payload": { (5)
        "before": null, (6)
        "after": { (7)
            "id": 1,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": { (8)
            "version": "3.3.0.Final",
            "connector": "spanner",
            "name": "spanner_connector",
            "ts_ms": 1670955531785,
            "ts_us": 1670955531785000,
            "ts_ns": 1670955531785000000,
            "snapshot": "false",
            "db": "database",
            "sequence": "1",
            "project_id": "project",
            "instance_id": "instance",
            "database_id": "database",
            "change_stream_name": "change_stream",
            "table": "Users",
            "server_transaction_id": "transaction_id",
            "low_watermark": 1670955471635,
            "read_at_timestamp": 1670955531791,
            "number_records_in_transaction": 2,
            "transaction_tag": "",
            "system_transaction": false,
            "value_capture_type": "OLD_AND_NEW_VALUES",
            "partition_token": "partition_token",
            "mod_number": 0,
            "is_last_record_in_transaction_in_partition": true,
            "number_of_partitions_in_transaction": 1
        },
        "op": "c", (9)
        "ts_ms": 1559033904863, (10)
        "ts_us": 1559033904863769, (10)
        "ts_ns": 1559033904863769841 (10)
    }
}
表 3. create 事件值字段描述
Item Field name (字段名) 描述

1

schema

描述值有效负载结构的

值模式。更改事件的值模式对于连接器为特定表生成的每个更改事件都相同。

2

name (名称)

schema 部分,每个 name 字段指定了值有效负载中字段的模式。

3

name (名称)

io.debezium.connector.spanner.Source 是有效负载 source 字段的模式。此模式特定于 Spanner 连接器。连接器为它生成的所有事件使用此模式。

4

name (名称)

connector_name.Users.Envelope 是有效负载整体结构的模式,其中 connector_name 是连接器名称,customers 是表。

5

payload

的实际数据。这是更改事件提供的信息。

6

before

一个可选字段,指定事件发生前行的状态。当 op 字段是创建的 c 时(如本例所示),before 字段为 null,因为此更改事件针对的是新内容。

7

after

一个可选字段,指定事件发生后行的状态。在此示例中,after 字段包含新行idfirst_namelast_nameemail 列的值。

8

source (源)

描述事件源元数据的

必需字段。此字段包含可用于将此事件与其他事件进行比较的信息,关于事件的来源、事件发生的顺序以及事件是否属于同一事务。源元数据包括:

  • Debezium 版本

  • 连接器类型和名称

  • 包含新行的数据库和表。

  • 如果事件属于快照

  • 事务中数据更改事件的记录序号

  • 项目 ID

  • 实例 ID

  • 数据库 ID

  • 更改流名称

  • 事务 ID

  • 低水位标记,表示连接器已流出所有提交时间戳小于低水位标记时间戳的记录的时间

  • 更改在数据库中发生的提交时间戳

  • 连接器处理更改的时间戳

  • 原始事务中的记录数

  • 事务标签

  • 事务是否为系统事务

  • 值捕获类型

  • 用于查询此更改事件的原始分区令牌

  • 从 Spanner 收到的原始数据更改事件中的修改号

  • 数据更改事件是否为分区中事务的最后一条记录

  • 事务中的总更改流分区数

9

op (操作)

描述导致连接器生成事件的操作类型的

必需字符串。在此示例中,c 表示已创建行。有效值为:

  • c = create

  • u = update

  • d = delete

10

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 指示数据库中发生更改的时间。通过比较 payload.source.ts_ms 的值和 payload.ts_ms 的值,您可以确定源数据库更新和 Debezium 之间的延迟。

update 事件

示例 Users 表更新的更改事件的值具有与该表的创建事件相同的模式。同样,事件值有效负载也具有相同的结构。但是,在更新事件中,事件值有效负载包含不同的值。以下是连接器为 Users 表中的更新生成的事件中的更改事件值的示例

{
    "schema": { ... },
    "payload": {
        "before": { (1)
            "id": 1,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "after": { (2)
            "id": 1,
            "first_name": "Anne Marie",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": { (3)
            "version": "3.3.0.Final",
            "connector": "spanner",
            "name": "spanner_connector",
            "ts_ms": 1670955531785,
            "ts_us": 1670955531785000,
            "ts_ns": 1670955531785000000,
            "snapshot": "false",
            "db": "database",
            "sequence": "1",
            "project_id": "project",
            "instance_id": "instance",
            "database_id": "database",
            "change_stream_name": "change_stream",
            "table": "Users",
            "server_transaction_id": "transaction_id",
            "low_watermark": 1670955471635,
            "read_at_timestamp": 1670955531791,
            "number_records_in_transaction": 2,
            "transaction_tag": "",
            "system_transaction": false,
            "value_capture_type": "OLD_AND_NEW_VALUES",
            "partition_token": "partition_token",
            "mod_number": 0,
            "is_last_record_in_transaction_in_partition": true,
            "number_of_partitions_in_transaction": 1
        },
        "op": "u", (4)
        "ts_ms": 1465584025523,  (5)
        "ts_us": 1465584025523614,  (5)
        "ts_ns": 1465584025523614723  (5)
    }
}
表 4. update 事件值字段描述
Item Field name (字段名) 描述

1

before

一个可选字段,包含数据库提交之前行中所有列的所有值。

2

after

一个可选字段,指定事件发生后行的状态。在此示例中,first_name 的值现在是 Anne Marie

3

source (源)

一个强制性字段,描述事件的源元数据。source 字段结构与*create* 事件中的字段相同,但某些值不同。源元数据包括:

  • Debezium 版本

  • 连接器类型和名称

  • 包含新行的数据库(又名 keyspace)和表

  • 如果事件属于快照

  • 事务中数据更改事件的记录序号

  • 项目 ID

  • 实例 ID

  • 数据库 ID

  • 更改流名称

  • 事务 ID

  • 低水位标记,表示连接器已流出所有提交时间戳小于低水位标记时间戳的记录的时间

  • 更改在数据库中发生的提交时间戳

  • 连接器处理更改的时间戳

  • 原始事务中的记录数

  • 事务标签

  • 事务是否为系统事务

  • 值捕获类型

  • 用于查询此更改事件的原始分区令牌

  • 从 Spanner 收到的原始数据更改事件中的修改号

  • 数据更改事件是否为分区中事务的最后一条记录

  • 事务中的总更改流分区数

4

op (操作)

描述操作类型的

必需字符串。在更新事件值中,op 字段值为 u,表示此行因更新而更改。

5

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 指示数据库中发生更改的时间。通过比较 payload.source.ts_ms 的值和 payload.ts_ms 的值,您可以确定源数据库更新和 Debezium 之间的延迟。

delete 事件

delete 更改事件中的值具有与同一表的创建更新事件相同的 schema 部分。示例 Users 表的delete 事件中的 payload 部分看起来像这样

{
    "schema": { ... },
    "payload": {
        "before": { (1)
            "id": 1,
            "first_name": "Anne Marie",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "after": null, (2)
        "source": { (3)
            "version": "3.3.0.Final",
            "connector": "spanner",
            "name": "spanner_connector",
            "ts_ms": 1670955531785,
            "ts_us": 1670955531785000,
            "ts_ns": 1670955531785000000,
            "snapshot": "false",
            "db": "database",
            "sequence": "1",
            "project_id": "project",
            "instance_id": "instance",
            "database_id": "database",
            "change_stream_name": "change_stream",
            "table": "Users",
            "server_transaction_id": "transaction_id",
            "low_watermark": 1670955471635,
            "read_at_timestamp": 1670955531791,
            "number_records_in_transaction": 2,
            "transaction_tag": "",
            "system_transaction": false,
            "value_capture_type": "OLD_AND_NEW_VALUES",
            "partition_token": "partition_token",
            "mod_number": 0,
            "is_last_record_in_transaction_in_partition": true,
            "number_of_partitions_in_transaction": 1
        },
        "op": "d", (4)
        "ts_ms": 1465581902461, (5)
        "ts_us": 1465581902461425, (5)
        "ts_ns": 1465581902461425378 (5)
    }
}
表 5. delete 事件值字段描述
Item Field name (字段名) 描述

1

before

一个可选字段,指定事件发生前行的状态。在删除事件值中,before 字段包含在数据库提交删除行之前行中的值。

2

after

一个可选字段,指定事件发生后行的状态。在删除事件值中,after 字段为 null,表示该行不再存在。

3

source (源)

一个强制性字段,描述事件的源元数据。在*delete* 事件值中,source 字段结构与同一表的*create* 和*update* 事件相同。许多 source 字段值也相同。在*delete* 事件值中,ts_mslsn 字段值以及其他值可能已更改。但是,*delete* 事件值中的 source 字段提供相同的元数据:

  • Debezium 版本

  • 连接器类型和名称

  • 包含新行的数据库(又名 keyspace)和表

  • 如果事件属于快照

  • 事务中数据更改事件的记录序号

  • 项目 ID

  • 实例 ID

  • 数据库 ID

  • 更改流名称

  • 事务 ID

  • 低水位标记,表示连接器已流出所有提交时间戳小于低水位标记时间戳的记录的时间

  • 更改在数据库中发生的提交时间戳

  • 连接器处理更改的时间戳

  • 原始事务中的记录数

  • 事务标签

  • 事务是否为系统事务

  • 值捕获类型

  • 用于查询此更改事件的原始分区令牌

  • 从 Spanner 收到的原始数据更改事件中的修改号

  • 数据更改事件是否为分区中事务的最后一条记录

  • 事务中的总更改流分区数

4

op (操作)

描述操作类型的

必需字符串。op 字段值为 d,表示此行已被删除。

5

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 指示数据库中发生更改的时间。通过比较 payload.source.ts_ms 的值和 payload.ts_ms 的值,您可以确定源数据库更新和 Debezium 之间的延迟。

*delete* 更改事件记录为使用者提供了处理该行删除所需的信息。

Spanner 连接器事件设计用于与 Kafka 日志压缩配合使用。日志压缩允许删除一些较旧的消息,只要保留每个键的至少最新消息。这使 Kafka 可以回收存储空间,同时确保主题包含完整的数据集并可用于重新加载基于键的状态。请注意,如果启用了低水位标记,则不应启用压缩。

Tombstone events (墓碑事件)

当行被删除时,delete 事件值仍然可以与日志压缩配合使用,因为 Kafka 可以删除具有相同键的所有先前消息。但是,为了让 Kafka 删除所有具有相同键的消息,消息值必须为 null。为了实现这一点,连接器会向一个特殊的墓碑事件发送一个delete 事件,该事件具有相同的键但值为 null

数据类型映射

Spanner 连接器用结构化的事件表示行的更改,这些事件的结构与行所在的表相似。事件包含一个字段用于每个列的值。该值在事件中的表示方式取决于列的 Spanner 数据类型。本节将描述这些映射。

Spanner 类型

表 6. Spanner 数据类型的映射
Spanner 数据类型 字面类型 (模式类型)

BOOLEAN

BOOLEAN

INT64

INT64

ARRAY

ARRAY

BYTES

BYTES

STRING

STRING

FLOAT32

FLOAT

FLOAT64

DOUBLE

NUMERIC

STRING

TIMESTAMP

STRING

NUMERIC

STRING

设置 Spanner

核对清单
  • 确保提供项目 ID、Spanner 实例 ID、Spanner 数据库 ID 和更改流名称。有关如何创建更改流,请参阅 文档

  • 确保创建并配置一个具有适当凭据的 GCP 服务帐户。服务帐户密钥需要在连接器配置中提供。有关服务帐户的更多 信息

请参阅以下部分,了解如何配置 Debezium Spanner 连接器。

部署

安装 KafkaKafka Connect 后,部署 Debezium Spanner 连接器的剩余任务是下载连接器的插件存档,将 JAR 文件解压到您的 Kafka Connect 环境中,并将包含 JAR 文件的目录添加到 Kafka Connect 的 plugin.path。然后您需要重新启动 Kafka Connect 进程以加载新的 JAR 文件。

如果您使用不可变容器,请参阅 Debezium 的容器镜像,其中包含已安装并准备好运行的 Spanner 连接器的 Kafka 和 Kafka Connect。您也可以 在 Kubernetes 和 OpenShift 上运行 Debezium

quay.io 获取的 Debezium 容器映像未经严格测试或安全分析,仅供测试和评估目的使用。这些映像不适用于生产环境。为降低生产部署中的风险,请仅部署由受信任供应商积极维护并经过彻底漏洞分析的容器。

连接器配置示例

以下是一个 Spanner 连接器的配置示例,该连接器连接到项目 Project、实例 Instance、数据库 Database 中名为 changeStreamAll 的更改流。

您可以选择为部分 schema 和表生成事件。可选地,忽略、屏蔽或截断敏感、过大或不需要的列。

{
  "name": "spanner-connector",  (1)
  "config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector", (2)
    "gcp.spanner.change.stream": "changeStreamAll", (3)
    "gcp.spanner.project.id": "Project", (4)
    "gcp.spanner.instance.id": "Instance", (5)
    "gcp.spanner.database.id": "Database", (6)
    "gcp.spanner.credentials.json": <key.json>, (7)
    "tasks.max": 1 (8)
  }
}
1 连接器在 Kafka Connect 服务中注册时的名称。
2 此 Spanner 连接器类的名称。
3 更改流名称。
4 GCP 项目 ID。
5 Spanner 实例 ID。
6 Spanner 数据库 ID。
7 GCP 服务帐户密钥 JSON。
8 最大任务数。

有关可在这些配置中指定的完整 Spanner 连接器属性列表,请参阅

添加连接器配置

要开始运行 Spanner 连接器,请创建连接器配置并将配置添加到您的 Kafka Connect 集群。

先决条件
  • Spanner 更改流已创建并可用。

  • Spanner 连接器已安装。

过程
  1. 为 Spanner 连接器创建配置。

  2. 使用Kafka Connect REST API 将该连接器配置添加到您的 Kafka Connect 群集。

结果

当连接器启动时,它开始为行级操作生成数据更改事件,并将更改事件记录流式传输到 Kafka 主题。

监控

除了 Kafka 和 Kafka Connect 提供的 JMX 指标的内置支持外,Debezium Spanner 连接器还提供一种指标。

  • 流式传输指标提供有关连接器捕获更改和流式传输更改事件记录时连接器操作的信息。

Debezium 监控文档 提供了有关如何使用 JMX 公开这些指标的详细信息。

自定义 MBean 名称

Debezium 连接器通过连接器的 MBean 名称公开指标。这些指标特定于每个连接器实例,提供有关连接器快照、流式传输和模式历史记录进程行为的数据。

默认情况下,当您部署正确配置的连接器时,Debezium 会为不同的连接器指标生成唯一的 MBean 名称。要查看连接器进程的指标,请将您的可观察性堆栈配置为监视其 MBean。但这些默认 MBean 名称取决于连接器配置;配置更改可能导致 MBean 名称发生更改。在这种情况下,更改 MBean 名称会破坏连接器实例和 MBean 之间的链接,从而中断监控活动。您必须重新配置可观察性堆栈以使用新的 MBean 名称才能恢复监控。

为了防止因 MBean 名称更改而导致的监控中断,您可以配置自定义指标标签。通过将 custom.metric.tags 属性添加到连接器配置中来配置自定义指标。该属性接受键值对,其中每个键代表 MBean 对象名称的标签,相应的值代表该标签的值。例如:k1=v1,k2=v2。Debezium 将指定的标签附加到连接器的 MBean 名称。

配置连接器的 custom.metric.tags 属性后,您可以配置可观察性堆栈以检索与指定标签关联的指标。然后,可观察性堆栈使用指定的标签而不是可变的 MBean 名称来唯一标识连接器。之后,如果 Debezium 重新定义了其如何构造 MBean 名称,或者连接器配置中的 topic.prefix 发生更改,指标收集将不会中断,因为指标抓取任务使用指定的标签模式来标识连接器。

使用自定义标签的另一个好处是,您可以使用反映数据管道架构的标签,以便以适合您运营需求的方式组织指标。例如,您可以指定带有值的标签来声明连接器活动的类型、应用程序上下文或数据源,例如 db1-streaming-for-application-abc。如果您指定多个键值对,所有指定的对都将附加到连接器的 MBean 名称。

以下示例说明了标签如何修改默认 MBean 名称。

示例 1. 自定义标签如何修改连接器 MBean 名称

默认情况下,Spanner 连接器使用以下 MBean 名称进行流式传输指标

debezium.Cloud Spanner:type=connector-metrics,context=streaming,server=<topic.prefix>

如果您将 custom.metric.tags 的值设置为 database=salesdb-streaming,table=inventory,Debezium 将生成以下自定义 MBean 名称:

debezium.Cloud Spanner:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory

流式传输指标

MBeandebezium.Cloud Spanner:type=connector-metrics,context=streaming,server=<topic.prefix>

Attributes Type 描述

long

自连接器读取和处理最近事件以来经过的毫秒数。

long

自上次启动或重置以来,此连接器已看到的事件总数。

long

已被连接器配置的包含/排除列表过滤规则过滤的事件数量。

int

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的长度。

int

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的剩余容量。

boolean

表示连接器当前是否连接到数据库服务器的标志。

long

上次更改事件的时间戳与连接器处理它之间的时间差(以毫秒为单位)。这些值将包含数据库服务器和连接器运行所在机器之间时钟的任何差异。

long

已提交的处理过的事务数。

long

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的最大缓冲区(以字节为单位)。

long

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的当前缓冲区(以字节为单位)。

long

连接器任务的当前低水位标记。低水位标记描述了连接器保证已流出所有时间戳小于 T 的事件的时间 T。

long

连接器任务的当前低水位标记(以毫秒为单位)。低水位标记描述了连接器保证已流出所有时间戳小于 T 的事件的时间 T。

long

低水位标记落后于当前时间的毫秒延迟。低水位标记描述了连接器保证已流出所有时间戳小于 T 的事件的时间 T。

long

低水位标记落后于当前时间的延迟分布(以毫秒为单位)。variant 将包括 P50、P95、P99、平均值、最小值、最大值计算。

long

Spanner 提交时间戳到连接器读取延迟的分布。variant 将包括 P50、P95、P99、平均值、最小值、最大值计算。

long

Spanner 读取时间戳到连接器发出延迟的分布。variant 将包括 P50、P95、P99、平均值、最小值、最大值计算。

long

Spanner 提交时间戳到连接器发出延迟的分布。variant 将包括 P50、P95、P99、平均值、最小值、最大值计算。

long

Spanner 提交时间戳到 Kafka 发布时间戳延迟的分布。variant 将包括 P50、P95、P99、平均值、最小值、最大值计算。

long

连接器发出时间戳到 Kafka 发布时间戳延迟的分布。variant 将包括 P50、P95、P99、平均值、最小值、最大值计算。

long

总 Spanner 事件队列容量。此队列指示 StreamEventQueue 的总容量,这是一个 Spanner 特定的队列,用于存储从更改流查询接收到的元素。

long

剩余的 Spanner 事件队列容量。

long

总任务状态更改事件队列容量。此队列指示 TaskStateChangeEventQueue 的总容量,这是一个 Spanner 特定的队列,用于存储连接器中发生的事件。

long

剩余的任务状态更改事件队列容量。

long

当前任务检测到的分区总数。

long

当前任务发出的更改流查询总数。

long

当前任务检测到的活动更改流查询数。

连接器配置属性

Debezium Spanner 连接器有许多配置属性,可用于为您的应用程序实现正确的连接器行为。许多属性都有默认值。属性信息组织如下:

以下配置属性是必需的,除非有默认值可用。

表 7. 必需的连接器配置属性
属性 Default (默认值) 描述

无默认值

连接器的唯一名称。尝试使用相同的名称再次注册将失败。此属性是所有 Kafka Connect 连接器必需的。

无默认值

连接器的 Java 类名。对于 Spanner 连接器,始终使用 io.debezium.connector.spanner.SpannerConnector 的值。

1

应为此连接器创建的最大任务数。如果启用 offset.storage.per.task 模式,Spanner 连接器可以使用多个任务。

无默认值

GCP 项目 ID

无默认值

Spanner 实例 ID

无默认值

Spanner 数据库 ID

无默认值

Spanner 更改流

无默认值

GCP 服务帐户密钥 JSON 的文件路径。

无默认值

GCP 服务帐户密钥 JSON。如果未提供 gcp.spanner.credentials.path,则需要此项。

none

指定如何调整 schema 名称以兼容连接器使用的消息转换器。可能设置:

  • none 不进行任何调整。

  • avro 用下划线替换 Avro 类型名称中不能使用的字符。

  • avro_unicode 用对应的 Unicode(如 _uxxxx)替换 Avro 类型名称中不能使用的下划线或字符。注意:_ 像 Java 中的反斜杠一样是一个转义序列。

none

指定如何调整字段名称以兼容连接器使用的消息转换器。可能设置:

  • none 不进行任何调整。

  • avro 用下划线替换 Avro 类型名称中不能使用的字符。

  • avro_unicode 用对应的 Unicode(如 _uxxxx)替换 Avro 类型名称中不能使用的下划线或字符。注意:_ 像 Java 中的反斜杠一样是一个转义序列。

有关更多详细信息,请参阅 Avro 命名

以下*高级*配置属性的默认值在大多数情况下都适用,因此很少需要在连接器的配置中指定。

表 8. 高级连接器配置属性
属性 Default (默认值) 描述

false

是否为连接器启用了低水位标记。

1000 毫秒

低水位标记更新的间隔。

300000

Spanner 心跳间隔。

当前时间

连接器启动时间。

无限结束时间

连接器结束时间。

10000

Spanner 事件队列容量。如果在连接器运行时剩余流事件队列容量接近零,则增加此容量。

1000

任务状态更改事件队列容量。如果在连接器运行时剩余任务状态更改事件队列容量接近零,则增加此容量。

5

在抛出异常之前,更改流查询错过的心跳的最大数量。

false

是否启用了任务自动扩展。

sync_topic_spanner_connector<connectorname>

Sync 主题的名称。Sync 主题是一个内部连接器主题,用于存储任务之间的通信。

500 毫秒

Sync 主题的轮询持续时间。

5000 毫秒

Sync 主题请求的超时时间。

15000 毫秒

发布到 Sync 主题的超时时间。

5000 毫秒

Sync 主题提交偏移量的超时时间。

60000 毫秒

Sync 主题提交偏移量的间隔。

5 毫秒

将消息发布到 Sync 主题的间隔。

rebalancing_topic_spanner_connector<connectorname>

Rebalancing 主题的名称。Rebalancing 主题是一个内部连接器主题,用于确定任务的存活性。

5000

Rebalancing 主题的轮询持续时间。

5000

Rebalance 主题提交偏移量的超时时间。

60000 毫秒

Sync 主题提交偏移量的间隔。

1000 毫秒

任务在处理 rebalancing 事件之前等待的时间。

无默认值

定义用于通过添加提供上下文信息的元数据来定制 MBean 对象名称的标签。指定键值对的逗号分隔列表。每个键代表 MBean 对象名称的标签,对应的值代表该键的值,例如:
k1=v1,k2=v2

连接器会将指定的标签附加到基本 MBean 对象名称。标签可以帮助您组织和分类指标数据。您可以定义标签来识别特定的应用程序实例、环境、区域、版本等。有关更多信息,请参阅 定制 MBean 名称

-1

指定连接器在操作导致可重试错误(例如连接错误)后如何响应。
设置以下选项之一:

-1

无限制。连接器将自动重新启动,并重试操作,无论先前的失败次数如何。

0

禁用。连接器将立即失败,并且永远不会重试操作。需要用户干预才能重新启动连接器。

> 0

连接器将自动重新启动,直到达到指定的最大重试次数。下一次失败后,连接器将停止,需要用户干预才能重新启动它。

true

此属性指定 Debezium 是否将带有 __debezium.context. 前缀的上下文头添加到其发出的消息中。

这些头是 OpenLineage 集成所必需的,并提供元数据,使下游处理系统能够跟踪和识别更改事件的来源。

该属性添加了以下头:

__debezium.context.connectorLogicalName

Debezium 连接器的逻辑名称。

__debezium.context.taskId

连接器任务的唯一标识符。

__debezium.context.connectorName

Debezium 连接器的名称。

有关更完整的高级配置列表,请参阅 Github 代码

直通连接器配置属性

连接器还支持*直通*配置属性,这些属性在创建 Kafka 生产者和消费者时使用。

请务必参考 Kafka 文档以获取 Kafka 生产者和消费者的所有配置属性。Spanner 连接器确实使用了 新的消费者配置属性

出现问题时的行为

Debezium 是一个分布式系统,它捕获多个上游数据库的所有更改;它从不遗漏或丢失事件。当系统正常运行或被仔细管理时,Debezium 会为每个更改事件记录提供精确一次的传递。

如果发生故障,系统不会丢失任何事件。但是,在从故障恢复期间,它可能会重复某些更改事件。在这些异常情况下,Debezium 与 Kafka 一样,提供*至少一次*的更改事件传递。

本节的其余部分描述了 Debezium 如何处理各种类型的故障和问题。

配置和启动错误

在以下情况下,连接器在尝试启动时会失败,并在日志中报告错误/异常,然后停止运行:

  • 连接器的配置无效。

  • 连接器无法使用指定的连接参数成功连接到 Spanner。

在这些情况下,错误消息将包含有关问题的详细信息,以及可能的建议的解决方法。在您更正配置或解决 Spanner 问题后,重新启动连接器。

Spanner 变得不可用

当连接器运行时,Spanner 可能因任何原因而变得不可用。连接器将继续运行,并在 Spanner 再次可用时能够流式传输事件。

Kafka Connect 进程正常停止

假设 Kafka Connect 正在分布式模式下运行,并且一个 Kafka Connect 进程正常停止。在停止该进程之前,Kafka Connect 会将该进程的连接器任务迁移到该组中的另一个 Kafka Connect 进程。新的连接器任务将从先前任务停止的地方继续处理。在连接器任务正常停止并重新启动到新进程的过程中,会有短暂的处理延迟。

Kafka Connect 进程崩溃

如果 Kafka Connect 进程意外停止,它运行的所有连接器任务都会终止,而不会记录它们最近处理的偏移量。当 Kafka Connect 在分布式模式下运行时,Kafka Connect 会在其他进程上重新启动这些连接器任务。但是,Spanner 连接器会从早期进程记录的最后一个偏移量处恢复。这意味着新的替换任务可能会生成在崩溃前不久处理过的某些相同更改事件。重复事件的数量取决于偏移量刷新周期以及崩溃前的数据更改量。

由于在故障恢复期间可能存在事件重复的可能性,因此消费者应始终预料到一些重复事件。

在每个更改事件记录中,Debezium 连接器都会插入有关事件来源的源特定信息,例如原始分区令牌、提交时间戳、事务 ID、记录序号和修改号。消费者可以使用这些标识符进行去重。

Kafka 变得不可用

当连接器生成更改事件时,Kafka Connect 框架会使用 Kafka producer API 将这些事件记录在 Kafka 中。您可以在 Kafka Connect 配置中指定的频率周期性地,Kafka Connect 会记录出现在这些更改事件中的最新偏移量。如果 Kafka Broker 不可用,运行连接器的 Kafka Connect 进程将反复尝试重新连接到 Kafka Broker。换句话说,连接器任务将暂停,直到可以重新建立连接,届时连接器将从它们离开的地方恢复。

连接器停止一段时间

如果连接器被正常停止,数据库可以继续使用。当连接器重新启动时,它会从上次中断的地方继续流式传输更改。也就是说,它会生成连接器停止期间发生的所有数据库更改的更改事件记录。

配置得当的 Kafka 集群能够处理海量吞吐量。Kafka Connect 是按照 Kafka 最佳实践编写的,并且在有足够资源的情况下,Kafka Connect 连接器也可以处理大量数据库更改事件。因此,在停止一段时间后,当 Debezium 连接器重新启动时,它很可能会赶上连接器停止期间发生的数据库更改。这有多快发生取决于 Kafka 的能力和性能以及 Spanner 中数据的更改量。

请注意,当前连接器只能回溯 ~一个小时。Kafka 连接器读取 Kafka 连接器启动时间戳处的信息模式以检索模式信息。默认情况下,Spanner 无法读取版本保留期(默认为一小时)之前的读取时间戳的信息模式。如果您希望从一小时前开始连接器,则需要增加数据库的版本保留期。

局限性