Debezium Cloud Spanner 连接器

Debezium 的 Cloud Spanner 连接器会消费并输出 Spanner 变更流数据到 Kafka 主题。

Spanner 变更流 会近实时地监视并流出 Spanner 数据库的数据变更——插入、更新、删除。Spanner 连接器抽象了查询 Spanner 变更流的细节。使用此连接器,您无需管理变更流分区生命周期,而这是您 直接使用 Spanner API 时必需的。

目前连接器不支持快照功能。Kafka 连接器第一次连接到 Spanner 数据库时,它会从提供的或当前时间戳开始流式传输变更。

概述

为了读取和处理数据库变更,连接器会从变更流进行查询。

连接器为捕获的每个行级插入、更新和删除操作生成一个更改事件,并将每个表的更改事件记录发送到单独的 Kafka 主题。客户端应用程序读取与感兴趣的数据库表相对应的 Kafka 主题,并可以对从这些主题接收到的每个行级事件做出反应。

该连接器具有容错能力。当连接器读取变更并生成事件时,它会记录每个 变更流分区 已处理的最后提交时间戳。如果连接器因任何原因停止(包括通信故障、网络问题或崩溃),在重新启动后,连接器将从上次停止的地方继续流式传输记录。

连接器的工作原理

为了优化配置和运行 Debezium Spanner 连接器,了解连接器如何流式传输变更事件、确定 Kafka 主题名称以及使用元数据会很有帮助。

流式传输变更

Debezium Spanner 连接器花费所有时间从其订阅的变更流流式传输变更。当表发生变更时,Spanner 会在数据库中写入相应的变更流记录,这与数据变更在同一事务中同步进行。为了扩展变更流的写入和读取,Spanner 会 along with the database data 将内部变更流存储进行分割和合并。为了支持在数据库写入扩展时近实时地读取变更流记录,Spanner API 被设计成可以使用变更流分区并发地查询。有关 Spanner 变更流的 分区模型,请参阅。

连接器提供了一个抽象层,用于查询变更流的 Spanner API。有了这个连接器,您就不必管理变更流分区生命周期。该连接器为您提供了数据变更记录的流,因此您可以更专注于应用程序逻辑,而不是具体的 API 细节和动态的变更流分区。

订阅变更流时,连接器需要提供项目 ID、Spanner 实例 ID、Spanner 数据库 ID 以及变更流名称。可选地,用户还可以提供开始时间戳和结束时间戳。有关连接器配置属性的更详细列表,请参阅 本节

当连接器接收到变更时,它会将事件转换为 Debezium 的 *create*、*update* 或 *delete* 事件。连接器会将这些变更事件以记录的形式转发给运行在同一进程中的 Kafka Connect 框架。Kafka Connect 进程以生成的相同顺序将变更事件记录异步写入到相应的 Kafka 主题。

Kafka Connect 会定期在另一个 Kafka 主题中记录最新的 *offset*。offset 指示 Debezium 在每个事件中包含的特定于源的位置信息。对于 Spanner 连接器,变更流分区的最后处理提交时间戳就是 offset。

当 Kafka Connect 正常关闭时,它会停止连接器,将所有事件记录刷新到 Kafka,并记录从每个连接器接收到的最后 offset。当 Kafka Connect 重新启动时,它会读取每个连接器最后记录的 offset,并从最后记录的 offset 处启动每个连接器。

Spanner 连接器在流式传输期间还在以下元数据主题中记录元数据信息。不建议修改这些主题的内容或配置。

  • _consumer_offsets:Kafka 自动创建的主题。存储 Kafka 连接器中创建的消费者的偏移量。

  • _kafka-connect-offsets:Kafka Connect 自动创建的主题。存储连接器偏移量。

  • _sync_topic_spanner_connector_connectorname:连接器自动创建的主题。存储有关变更流分区的信息。

  • _rebalancing_topic_spanner_connector_connectorname:连接器自动创建的主题。用于确定连接器任务的活动状态。

  • _debezium-heartbeat.connectorname:用于处理 Spanner 变更流心跳的主题。

主题名称

Spanner 连接器将单个表上所有插入、更新和删除操作的事件写入单个 Kafka 主题。默认情况下,Kafka 主题名称为 *topicPrefix*.connectorName.tableName,其中:

  • *topicPrefix* 是由 topic.prefix 连接器配置属性指定的。*topicPrefix* 是由 topic.prefix 连接器配置属性指定的。

  • connectorName 是用户指定的连接器名称。

  • *tableName* 是发生操作的数据库表的名称。

例如,假设 `spanner` 是配置中用于捕获 Spanner 变更流的逻辑名称,该变更流跟踪包含四个表:`table1`、`table2`、`table3` 和 `table4` 的数据库的变更。该连接器会将记录流式传输到以下四个 Kafka 主题:

  • spanner.table1

  • spanner.table2

  • spanner.table3

  • spanner.table4

数据变更事件

Debezium Spanner 连接器为每个行级别的 `INSERT`、`UPDATE` 和 `DELETE` 操作生成一个数据变更事件。每个事件都包含一个键和一个值。键和值是独立的文档。键和值的结构取决于被更改的表。

Debezium 和 Kafka Connect 都围绕着*持续的事件消息流*而设计。然而,这些事件的结构可能会随时间而变化,这会给消费者带来处理上的困难。为了解决这个问题,每个事件都包含其内容的 schema,或者,如果您正在使用 schema registry,则包含一个消费者可用于从注册表中获取 schema 的 schema ID。这使得每个事件都是自包含的。键的 schema 永远不会改变。请注意,值的 schema 是自连接器启动以来变更流跟踪的表中所有列的集合。

以下骨架 JSON 文档显示了键和值文档的基本结构。但是,您如何在应用程序中配置您选择的 Kafka Connect 转换器决定了键和值文档的表示方式。仅当您配置转换器生成 schema 时,`schema` 字段才会出现在变更事件的键或值中。同样,仅当您配置转换器生成时,事件键和事件负载才会出现。如果您使用 JSON 转换器并配置它生成 schema,则变更事件具有此结构:

// Key
{
 "schema": { (1)
   ...
  },
 "payload": { (2)
   ...
 }
}

// Value
{
 "schema": { (3)
   ...
 },
 "payload": { (4)
   ...
 }
}
表 1. 更改事件基本内容概述
Item Field name (字段名) 描述

1

schema

第一个 `schema` 字段是事件键的一部分。它指定了一个 Kafka Connect schema,该 schema 描述了事件键的 `payload` 部分的内容。换句话说,第一个 `schema` 字段描述了主键的结构。

2

payload

第一个 payload 字段是事件键的一部分。它的结构由前面的 schema 字段描述,它包含已更改行的键。

3

schema

第二个 schema 字段是事件值的一部分。它指定了一个 Kafka Connect 模式,该模式描述了事件值 payload 部分的内容。换句话说,第二个 schema 描述了已更改行的结构。通常,此模式包含嵌套模式。

4

payload

第二个 payload 字段是事件值的一部分。它的结构由上一个 schema 字段描述,并且包含已更改行的实际数据。

默认行为是连接器将变更事件记录流式传输到*与事件的源表同名的主题*。此处

从 Kafka 0.10 开始,Kafka 可以选择记录事件键和值以及消息创建(由生产者记录)或由 Kafka 写入日志的*时间戳*。

变更事件键

对于给定的表,变更事件的键的结构包含一个字段,对应于事件创建时表中主键的每个列。所有键列都将被标记为非可选。

考虑 `business` 数据库中定义的 `users` 表以及该表的变更事件键的示例。

示例表
CREATE TABLE Users (
  id INT64 NOT NULL,
  username STRING(MAX) NOT NULL,
  password STRING(MAX) NOT NULL,
  email STRING(MAX) NOT NULL)
PRIMARY KEY (id);
示例更改事件键

在定义如下的 `users` 表的所有变更事件中,键的结构是相同的,在 JSON 中如下所示:

{
  "schema": { (1)
    "type": "struct",
    "name": "Users.Key", (2)
    "optional": false, (3)
    "fields": [ (4)
      {
        "type": "int64",
        "optional": "false",
        "field": "false"
      }
    ]
  },
  "payload": { (5)
      "id": "1"
  },
}
表 2. 更改事件键描述
Item Field name (字段名) 描述

1

schema

键的模式部分指定了一个 Kafka Connect 模式,该模式描述了键的 payload 部分的内容。此模式描述了已更改表的

主键的结构。

2

Users.Key

定义键的 payload 结构所使用的 schema 的名称。此 schema 描述了已更改表的主键结构。

3

optional

指示事件键的 `payload` 字段是否必须包含一个值。主键列始终是必需的。

4

fields (字段)

指定 `payload` 中预期的每个字段,包括每个字段的名称、类型以及是否可选。

5

payload

包含生成此更改事件的行的键。在此示例中,键包含一个名为 id 的字段,其值为 1

变更事件值

考虑用于显示更改事件键示例的相同样本表。

CREATE TABLE Users (
  id INT64 NOT NULL,
  username STRING(MAX) NOT NULL,
  password STRING(MAX) NOT NULL,
  email STRING(MAX) NOT NULL)
PRIMARY KEY (id);

*create* 事件

下面的示例显示了连接器为 `Users` 表中的创建操作生成的变更事件的值部分。如果 Spanner 列被标记为非可选,则所有插入行的变异都需要其值。Spanner 中的所有主键列都将被标记为非可选。请注意,即使 Spanner 中的非键列被标记为非可选,在 schema 中它仍将被显示为可选。只有主键列在 schema 中被标记为非可选。

{
    "schema": { (1)
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "Users.Value", (2)
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "Users.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "connector"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ms"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_us"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ns"
                    },
                    {
                        "type": "boolean",
                        "optional": true,
                        "default": false,
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "sequence"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "project_id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "instance_id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "database_id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "change_stream_name"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "table"
                    }
                    {
                        "type": "string",
                        "optional": true,
                        "field": "server_transaction_id"
                    }
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "low_watermark"
                    }
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "read_at_timestamp"
                    }
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "number_of_records_in_transaction"
                    }
                    {
                        "type": "string",
                        "optional": true,
                        "field": "transaction_tag"
                    }
                    {
                        "type": "boolean",
                        "optional": true,
                        "field": "system_transaction"
                    }
                    {
                        "type": "string",
                        "optional": true,
                        "field": "value_capture_type"
                    }
                    {
                        "type": "string",
                        "optional": true,
                        "field": "partition_token"
                    }
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "mod_number"
                    }
                    {
                        "type": "boolean",
                        "optional": true,
                        "field": "is_last_record_in_transaction_in_partition"
                    }
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "number_of_partitions_in_transaction"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.spanner.Source", (3)
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_us"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ns"
            }
        ],
        "optional": false,
        "name": "connector_name.Users.Envelope" (4)
    },
    "payload": { (5)
        "before": null, (6)
        "after": { (7)
            "id": 1,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": { (8)
            "version": "3.3.1.Final",
            "connector": "spanner",
            "name": "spanner_connector",
            "ts_ms": 1670955531785,
            "ts_us": 1670955531785000,
            "ts_ns": 1670955531785000000,
            "snapshot": "false",
            "db": "database",
            "sequence": "1",
            "project_id": "project",
            "instance_id": "instance",
            "database_id": "database",
            "change_stream_name": "change_stream",
            "table": "Users",
            "server_transaction_id": "transaction_id",
            "low_watermark": 1670955471635,
            "read_at_timestamp": 1670955531791,
            "number_records_in_transaction": 2,
            "transaction_tag": "",
            "system_transaction": false,
            "value_capture_type": "OLD_AND_NEW_VALUES",
            "partition_token": "partition_token",
            "mod_number": 0,
            "is_last_record_in_transaction_in_partition": true,
            "number_of_partitions_in_transaction": 1
        },
        "op": "c", (9)
        "ts_ms": 1559033904863, (10)
        "ts_us": 1559033904863769, (10)
        "ts_ns": 1559033904863769841 (10)
    }
}
表 3. create 事件值字段描述
Item Field name (字段名) 描述

1

schema

描述值有效负载结构的

值模式。更改事件的值模式对于连接器为特定表生成的每个更改事件都相同。

2

name (名称)

schema 部分,每个 name 字段指定了值有效负载中字段的模式。

3

name (名称)

io.debezium.connector.spanner.Source 是 payload 中 `source` 字段的 schema。此 schema 特定于 Spanner 连接器。连接器将其用于它生成的所有事件。

4

name (名称)

connector_name.Users.Envelope 是 payload 整体结构的 schema,其中 `connector_name` 是连接器名称,`customers` 是表。

5

payload

的实际数据。这是更改事件提供的信息。

6

before

一个可选字段,指定事件发生前行的状态。当 op 字段是创建的 c 时(如本例所示),before 字段为 null,因为此更改事件针对的是新内容。

7

after

一个可选字段,指定事件发生后行的状态。在此示例中,after 字段包含新行idfirst_namelast_nameemail 列的值。

8

source (源)

描述事件源元数据的

必需字段。此字段包含可用于将此事件与其他事件进行比较的信息,关于事件的来源、事件发生的顺序以及事件是否属于同一事务。源元数据包括:

  • Debezium 版本

  • 连接器类型和名称

  • 包含新行的数据库和表。

  • 如果事件属于快照

  • 事务中数据变更事件的记录序列号。

  • 项目 ID。

  • 实例 ID。

  • 数据库 ID。

  • 变更流名称。

  • 事务 ID。

  • 低水位标记,表示连接器已流出所有 commit timestamp 小于 low watermark timestamp 的记录的时间。

  • 数据库中变更发生时的提交时间戳。

  • 连接器处理变更的时间戳。

  • 原始事务中的记录数。

  • 事务标签。

  • 事务是否为系统事务。

  • 值捕获类型。

  • 用于查询此变更事件的原始分区 token。

  • 从 Spanner 收到的原始数据变更事件中的修改号。

  • 数据变更事件是否是分区中事务的最后一条记录。

  • 事务中变更流分区的总数。

9

op (操作)

描述导致连接器生成事件的操作类型的

必需字符串。在此示例中,c 表示已创建行。有效值为:

  • c = create

  • u = update

  • d = delete

10

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 指示数据库中发生更改的时间。通过比较 payload.source.ts_ms 的值和 payload.ts_ms 的值,您可以确定源数据库更新和 Debezium 之间的延迟。

*update* 事件

用于示例 `Users` 表中更新的变更事件的值与该表的 *create* 事件具有相同的 schema。同样,事件值 payload 具有相同的结构。但是,*update* 事件中的事件值 payload 包含不同的值。下面是一个连接器为 `Users` 表中的更新生成的事件中的变更事件值示例:

{
    "schema": { ... },
    "payload": {
        "before": { (1)
            "id": 1,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "after": { (2)
            "id": 1,
            "first_name": "Anne Marie",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": { (3)
            "version": "3.3.1.Final",
            "connector": "spanner",
            "name": "spanner_connector",
            "ts_ms": 1670955531785,
            "ts_us": 1670955531785000,
            "ts_ns": 1670955531785000000,
            "snapshot": "false",
            "db": "database",
            "sequence": "1",
            "project_id": "project",
            "instance_id": "instance",
            "database_id": "database",
            "change_stream_name": "change_stream",
            "table": "Users",
            "server_transaction_id": "transaction_id",
            "low_watermark": 1670955471635,
            "read_at_timestamp": 1670955531791,
            "number_records_in_transaction": 2,
            "transaction_tag": "",
            "system_transaction": false,
            "value_capture_type": "OLD_AND_NEW_VALUES",
            "partition_token": "partition_token",
            "mod_number": 0,
            "is_last_record_in_transaction_in_partition": true,
            "number_of_partitions_in_transaction": 1
        },
        "op": "u", (4)
        "ts_ms": 1465584025523,  (5)
        "ts_us": 1465584025523614,  (5)
        "ts_ns": 1465584025523614723  (5)
    }
}
表 4. update 事件值字段描述
Item Field name (字段名) 描述

1

before

一个可选字段,包含数据库提交之前行中所有列的所有值。

2

after

一个可选字段,指定事件发生后行的状态。在此示例中,first_name 的值现在是 Anne Marie

3

source (源)

一个强制性字段,描述事件的源元数据。source 字段结构与*create* 事件中的字段相同,但某些值不同。源元数据包括:

  • Debezium 版本

  • 连接器类型和名称

  • 包含新行的数据库(又名 keyspace)和表。

  • 如果事件属于快照

  • 事务中数据变更事件的记录序列号。

  • 项目 ID。

  • 实例 ID。

  • 数据库 ID。

  • 变更流名称。

  • 事务 ID。

  • 低水位标记,表示连接器已流出所有 commit timestamp 小于 low watermark timestamp 的记录的时间。

  • 数据库中变更发生时的提交时间戳。

  • 连接器处理变更的时间戳。

  • 原始事务中的记录数。

  • 事务标签。

  • 事务是否为系统事务。

  • 值捕获类型。

  • 用于查询此变更事件的原始分区 token。

  • 从 Spanner 收到的原始数据变更事件中的修改号。

  • 数据变更事件是否是分区中事务的最后一条记录。

  • 事务中变更流分区的总数。

4

op (操作)

描述操作类型的

必需字符串。在更新事件值中,op 字段值为 u,表示此行因更新而更改。

5

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 指示数据库中发生更改的时间。通过比较 payload.source.ts_ms 的值和 payload.ts_ms 的值,您可以确定源数据库更新和 Debezium 之间的延迟。

*delete* 事件

在 *delete* 变更事件中的值与同一表的 *create* 和 *update* 事件具有相同的 `schema` 部分。对于示例 `Users` 表,*delete* 事件中的 `payload` 部分如下所示:

{
    "schema": { ... },
    "payload": {
        "before": { (1)
            "id": 1,
            "first_name": "Anne Marie",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "after": null, (2)
        "source": { (3)
            "version": "3.3.1.Final",
            "connector": "spanner",
            "name": "spanner_connector",
            "ts_ms": 1670955531785,
            "ts_us": 1670955531785000,
            "ts_ns": 1670955531785000000,
            "snapshot": "false",
            "db": "database",
            "sequence": "1",
            "project_id": "project",
            "instance_id": "instance",
            "database_id": "database",
            "change_stream_name": "change_stream",
            "table": "Users",
            "server_transaction_id": "transaction_id",
            "low_watermark": 1670955471635,
            "read_at_timestamp": 1670955531791,
            "number_records_in_transaction": 2,
            "transaction_tag": "",
            "system_transaction": false,
            "value_capture_type": "OLD_AND_NEW_VALUES",
            "partition_token": "partition_token",
            "mod_number": 0,
            "is_last_record_in_transaction_in_partition": true,
            "number_of_partitions_in_transaction": 1
        },
        "op": "d", (4)
        "ts_ms": 1465581902461, (5)
        "ts_us": 1465581902461425, (5)
        "ts_ns": 1465581902461425378 (5)
    }
}
表 5. delete 事件值字段描述
Item Field name (字段名) 描述

1

before

一个可选字段,指定事件发生前行的状态。在删除事件值中,before 字段包含在数据库提交删除行之前行中的值。

2

after

一个可选字段,指定事件发生后行的状态。在删除事件值中,after 字段为 null,表示该行不再存在。

3

source (源)

一个强制性字段,描述事件的源元数据。在*delete* 事件值中,source 字段结构与同一表的*create* 和*update* 事件相同。许多 source 字段值也相同。在*delete* 事件值中,ts_mslsn 字段值以及其他值可能已更改。但是,*delete* 事件值中的 source 字段提供相同的元数据:

  • Debezium 版本

  • 连接器类型和名称

  • 包含新行的数据库(又名 keyspace)和表。

  • 如果事件属于快照

  • 事务中数据变更事件的记录序列号。

  • 项目 ID。

  • 实例 ID。

  • 数据库 ID。

  • 变更流名称。

  • 事务 ID。

  • 低水位标记,表示连接器已流出所有 commit timestamp 小于 low watermark timestamp 的记录的时间。

  • 数据库中变更发生时的提交时间戳。

  • 连接器处理变更的时间戳。

  • 原始事务中的记录数。

  • 事务标签。

  • 事务是否为系统事务。

  • 值捕获类型。

  • 用于查询此变更事件的原始分区 token。

  • 从 Spanner 收到的原始数据变更事件中的修改号。

  • 数据变更事件是否是分区中事务的最后一条记录。

  • 事务中变更流分区的总数。

4

op (操作)

描述操作类型的

必需字符串。op 字段值为 d,表示此行已被删除。

5

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 指示数据库中发生更改的时间。通过比较 payload.source.ts_ms 的值和 payload.ts_ms 的值,您可以确定源数据库更新和 Debezium 之间的延迟。

*delete* 更改事件记录为使用者提供了处理该行删除所需的信息。

Spanner 连接器事件被设计为与 Kafka 日志压缩 一起工作。日志压缩允许删除一些较旧的消息,只要保留每个键的最新消息。这使得 Kafka 可以回收存储空间,同时确保主题包含完整的数据集,并可用于重新加载基于键的状态。请注意,如果启用了低水位标记,则不应启用压缩。

Tombstone events (墓碑事件)

当一行被删除时,*delete* 事件值仍然可以与日志压缩一起工作,因为 Kafka 可以删除所有具有相同键的先前消息。但是,要让 Kafka 删除所有具有相同键的消息,消息值必须为 `null`。为了实现这一点,连接器会在 *delete* 事件之后发送一个特殊的 *tombstone* 事件,该事件具有相同的键但值为 `null`。

数据类型映射

Spanner 连接器使用结构与行所在的表相似的事件来表示行的变更。事件包含一个字段用于每个列的值。该值在事件中的表示方式取决于该列的 Spanner 数据类型。本节描述了这些映射。

Spanner 类型

表 6. Spanner 数据类型的映射
Spanner 数据类型 字面类型 (模式类型)

BOOLEAN

BOOLEAN

INT64

INT64

ARRAY

ARRAY

BYTES

BYTES

STRING

STRING

FLOAT32

FLOAT

FLOAT64

DOUBLE

NUMERIC

STRING

TIMESTAMP

STRING

NUMERIC

STRING

设置 Spanner

核对清单
  • 请务必提供项目 ID、Spanner 实例 ID、Spanner 数据库 ID 和变更流名称。有关如何创建变更流,请参阅 文档

  • 请务必创建并配置一个具有适当凭据的 GCP 服务账户。服务账户密钥需要在连接器配置中提供。有关服务账户的更多信息,请参阅此处。

有关如何配置 Debezium Spanner 连接器的信息,请参阅以下部分。

部署

安装 KafkaKafka Connect 后,部署 Debezium Spanner 连接器的剩余任务是下载连接器的插件存档,将 JAR 文件解压到您的 Kafka Connect 环境中,并将包含 JAR 文件的目录添加到 Kafka Connect 的 `plugin.path`。然后您需要重启 Kafka Connect 进程以识别新的 JAR 文件。

如果您使用不可变容器,请参阅 Debezium 的容器镜像,其中已安装了 Kafka 和 Kafka Connect 以及 Spanner 连接器,并且已准备好运行。您也可以 在 Kubernetes 和 OpenShift 上运行 Debezium

quay.io 获取的 Debezium 容器映像未经严格测试或安全分析,仅供测试和评估目的使用。这些映像不适用于生产环境。为降低生产部署中的风险,请仅部署由受信任供应商积极维护并经过彻底漏洞分析的容器。

连接器配置示例

以下是一个 Spanner 连接器的配置示例,该连接器连接到项目 `Project`、实例 `Instance` 和数据库 `Database` 中的名为 `changeStreamAll` 的变更流。

您可以选择为部分 schema 和表生成事件。可选地,忽略、屏蔽或截断敏感、过大或不需要的列。

{
  "name": "spanner-connector",  (1)
  "config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector", (2)
    "gcp.spanner.change.stream": "changeStreamAll", (3)
    "gcp.spanner.project.id": "Project", (4)
    "gcp.spanner.instance.id": "Instance", (5)
    "gcp.spanner.database.id": "Database", (6)
    "gcp.spanner.credentials.json": <key.json>, (7)
    "tasks.max": 1 (8)
  }
}
1 连接器在 Kafka Connect 服务中注册时的名称。
2 此 Spanner 连接器类的名称。
3 变更流名称。
4 GCP 项目 ID。
5 Spanner 实例 ID。
6 Spanner 数据库 ID。
7 GCP 服务账户密钥 JSON。
8 最大任务数。

请参阅 Spanner 连接器属性的完整列表,这些属性可以在这些配置中指定。

添加连接器配置

要开始运行 Spanner 连接器,请创建连接器配置并将其添加到您的 Kafka Connect 集群。

先决条件
  • Spanner 变更流已创建并可用。

  • Spanner 连接器已安装。

过程
  1. 为 Spanner 连接器创建配置。

  2. 使用Kafka Connect REST API 将该连接器配置添加到您的 Kafka Connect 群集。

结果

当连接器启动时,它开始为行级操作生成数据更改事件,并将更改事件记录流式传输到 Kafka 主题。

监控

除了 Kafka 和 Kafka Connect 提供的 JMX 指标内置支持外,Debezium Spanner 连接器还提供一种类型的指标。

  • 流式传输指标 提供有关连接器在捕获变更和流式传输变更事件记录时的操作信息。

Debezium 监控文档 提供了有关如何使用 JMX 公开这些指标的详细信息。

自定义 MBean 名称

Debezium 连接器通过连接器的 MBean 名称公开指标。这些指标特定于每个连接器实例,提供有关连接器快照、流式传输和模式历史记录进程行为的数据。

默认情况下,当您部署正确配置的连接器时,Debezium 会为不同的连接器指标生成唯一的 MBean 名称。要查看连接器进程的指标,请将您的可观察性堆栈配置为监视其 MBean。但这些默认 MBean 名称取决于连接器配置;配置更改可能导致 MBean 名称发生更改。在这种情况下,更改 MBean 名称会破坏连接器实例和 MBean 之间的链接,从而中断监控活动。您必须重新配置可观察性堆栈以使用新的 MBean 名称才能恢复监控。

为了防止因 MBean 名称更改而导致的监控中断,您可以配置自定义指标标签。通过将 custom.metric.tags 属性添加到连接器配置中来配置自定义指标。该属性接受键值对,其中每个键代表 MBean 对象名称的标签,相应的值代表该标签的值。例如:k1=v1,k2=v2。Debezium 将指定的标签附加到连接器的 MBean 名称。

配置连接器的 custom.metric.tags 属性后,您可以配置可观察性堆栈以检索与指定标签关联的指标。然后,可观察性堆栈使用指定的标签而不是可变的 MBean 名称来唯一标识连接器。之后,如果 Debezium 重新定义了其如何构造 MBean 名称,或者连接器配置中的 topic.prefix 发生更改,指标收集将不会中断,因为指标抓取任务使用指定的标签模式来标识连接器。

使用自定义标签的另一个好处是,您可以使用反映数据管道架构的标签,以便以适合您运营需求的方式组织指标。例如,您可以指定带有值的标签来声明连接器活动的类型、应用程序上下文或数据源,例如 db1-streaming-for-application-abc。如果您指定多个键值对,所有指定的对都将附加到连接器的 MBean 名称。

以下示例说明了标签如何修改默认 MBean 名称。

示例 1. 自定义标签如何修改连接器 MBean 名称

默认情况下,Spanner 连接器对流式传输指标使用以下 MBean 名称:

debezium.Cloud Spanner:type=connector-metrics,context=streaming,server=<topic.prefix>

如果您将 custom.metric.tags 的值设置为 database=salesdb-streaming,table=inventory,Debezium 将生成以下自定义 MBean 名称:

debezium.Cloud Spanner:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory

流式传输指标

MBeandebezium.Cloud Spanner:type=connector-metrics,context=streaming,server=<topic.prefix>

Attributes Type 描述

long

自连接器读取和处理最近事件以来经过的毫秒数。

long

自上次启动或重置以来,此连接器已看到的事件总数。

long

已被连接器配置的包含/排除列表过滤规则过滤的事件数量。

int

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的长度。

int

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的剩余容量。

boolean

表示连接器当前是否连接到数据库服务器的标志。

long

上次更改事件的时间戳与连接器处理它之间的时间差(以毫秒为单位)。这些值将包含数据库服务器和连接器运行所在机器之间时钟的任何差异。

long

已提交的处理过的事务数。

long

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的最大缓冲区(以字节为单位)。

long

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的当前缓冲区(以字节为单位)。

long

连接器任务的当前低水位标记。低水位标记描述了时间 T,在该时间 T,连接器保证已流出所有时间戳小于 T 的事件。

long

连接器任务的当前低水位标记(以毫秒为单位)。低水位标记描述了时间 T,在该时间 T,连接器保证已流出所有时间戳小于 T 的事件。

long

低水位标记落后于当前时间的毫秒数。低水位标记描述了时间 T,在该时间 T,连接器保证已流出所有时间戳小于 T 的事件。

long

低水位标记落后于当前时间的毫秒数延迟的分布。variant 将包括 P50、P95、P99、平均值、最小值、最大值计算。

long

Spanner 提交时间戳到连接器读取延迟的分布。variant 将包括 P50、P95、P99、平均值、最小值、最大值计算。

long

Spanner 读取时间戳到连接器发出延迟的分布。variant 将包括 P50、P95、P99、平均值、最小值、最大值计算。

long

Spanner 提交时间戳到连接器发出延迟的分布。variant 将包括 P50、P95、P99、平均值、最小值、最大值计算。

long

Spanner 提交时间戳到 Kafka 发布时间戳延迟的分布。variant 将包括 P50、P95、P99、平均值、最小值、最大值计算。

long

连接器发出时间戳到 Kafka 发布时间戳延迟的分布。variant 将包括 P50、P95、P99、平均值、最小值、最大值计算。

long

总 Spanner 事件队列容量。此队列表示 StreamEventQueue 的总容量,这是一个 Spanner 特定的队列,用于存储从变更流查询接收到的元素。

long

剩余 Spanner 事件队列容量。

long

总任务状态变更事件队列容量。此队列表示 TaskStateChangeEventQueue 的总容量,这是一个 Spanner 特定的队列,用于存储连接器中发生的事件。

long

剩余任务状态变更事件队列容量。

long

当前任务检测到的分区总数。

long

当前任务发出的变更流查询总数。

long

当前任务检测到的活动变更流查询数量。

连接器配置属性

Debezium Spanner 连接器有许多配置属性,您可以使用它们来实现适合您应用程序的正确连接器行为。许多属性都有默认值。属性信息组织如下:

以下配置属性是必需的,除非有默认值可用。

表 7. 必需的连接器配置属性
属性 Default (默认值) 描述

无默认值

连接器的唯一名称。尝试使用相同的名称再次注册将失败。此属性是所有 Kafka Connect 连接器必需的。

无默认值

连接器的 Java 类名。对于 Spanner 连接器,始终使用 io.debezium.connector.spanner.SpannerConnector 的值。

1

应为此连接器创建的最大任务数。如果您启用了 offset.storage.per.task 模式,Spanner 连接器可以使用多个任务。

无默认值

GCP 项目 ID。

无默认值

Spanner 实例 ID。

无默认值

Spanner 数据库 ID。

无默认值

Spanner 变更流。

无默认值

GCP 服务账户密钥 JSON 的文件路径。

无默认值

GCP 服务账户密钥 JSON。如果未提供 gcp.spanner.credentials.path,则必需。

none

指定如何调整 schema 名称以兼容连接器使用的消息转换器。可能设置:

  • none 不进行任何调整。

  • avro 用下划线替换 Avro 类型名称中不能使用的字符。

  • avro_unicode 用对应的 Unicode(如 _uxxxx)替换 Avro 类型名称中不能使用的下划线或字符。注意:_ 像 Java 中的反斜杠一样是一个转义序列。

none

指定如何调整字段名称以兼容连接器使用的消息转换器。可能设置:

  • none 不进行任何调整。

  • avro 用下划线替换 Avro 类型名称中不能使用的字符。

  • avro_unicode 用对应的 Unicode(如 _uxxxx)替换 Avro 类型名称中不能使用的下划线或字符。注意:_ 像 Java 中的反斜杠一样是一个转义序列。

有关更多详细信息,请参阅 Avro 命名

以下*高级*配置属性的默认值在大多数情况下都适用,因此很少需要在连接器的配置中指定。

表 8. 高级连接器配置属性
属性 Default (默认值) 描述

false

是否为连接器启用了低水位标记。

1000 毫秒。

更新低水位标记的间隔。

300000

Spanner 心跳间隔。

当前时间。

连接器开始时间。

无限结束时间。

连接器结束时间。

10000

Spanner 事件队列容量。如果在连接器运行时剩余流事件队列容量接近零,则增加此容量。

1000

任务状态变更事件队列容量。如果在连接器运行时剩余任务状态变更事件队列容量接近零,则增加此容量。

5

在抛出异常之前,变更流查询的最大遗漏心跳数。

false

是否启用了任务自动扩展。

sync_topic_spanner_connector<connectorname>

Sync 主题的名称。Sync 主题是用于存储任务之间通信的内部连接器主题。

500 毫秒。

Sync 主题的轮询时长。

5000 毫秒。

Sync 主题请求的超时时间。

15000 毫秒。

发布到 Sync 主题的超时时间。

5000 毫秒。

Sync 主题提交偏移量的超时时间。

60000 毫秒。

Sync 主题提交偏移量的间隔。

5 毫秒。

发布到 Sync 主题的消息间隔。

rebalancing_topic_spanner_connector<connectorname>

Rebalancing 主题的名称。Rebalancing 主题是用于确定任务活动状态的内部连接器主题。

5000

Rebalancing 主题的轮询时长。

5000

Rebalance 主题提交偏移量的超时时间。

60000 毫秒。

Sync 主题提交偏移量的间隔。

1000 毫秒。

任务在处理 Rebalancing 事件之前等待的时间。

无默认值

定义用于通过添加提供上下文信息的元数据来定制 MBean 对象名称的标签。指定键值对的逗号分隔列表。每个键代表 MBean 对象名称的标签,对应的值代表该键的值,例如:
k1=v1,k2=v2

连接器会将指定的标签附加到基本 MBean 对象名称。标签可以帮助您组织和分类指标数据。您可以定义标签来识别特定的应用程序实例、环境、区域、版本等。有关更多信息,请参阅 定制 MBean 名称

-1

指定连接器在操作导致可重试错误(例如连接错误)后如何响应。
设置以下选项之一:

-1

无限制。连接器将自动重新启动,并重试操作,无论先前的失败次数如何。

0

禁用。连接器将立即失败,并且永远不会重试操作。需要用户干预才能重新启动连接器。

> 0

连接器将自动重新启动,直到达到指定的最大重试次数。下一次失败后,连接器将停止,需要用户干预才能重新启动它。

true

此属性指定 Debezium 是否将带有 __debezium.context. 前缀的上下文头添加到其发出的消息中。

这些头是 OpenLineage 集成所必需的,并提供元数据,使下游处理系统能够跟踪和识别更改事件的来源。

该属性添加了以下头:

__debezium.context.connectorLogicalName

Debezium 连接器的逻辑名称。

__debezium.context.taskId

连接器任务的唯一标识符。

__debezium.context.connectorName

Debezium 连接器的名称。

有关高级配置的更完整列表,请参阅 Github 代码

直通连接器配置属性

连接器还支持*直通*配置属性,这些属性在创建 Kafka 生产者和消费者时使用。

请务必参考 Kafka 文档,了解所有 Kafka producer 和 consumer 的配置属性。Spanner 连接器确实使用了 新的 consumer 配置属性

出现问题时的行为

Debezium 是一个分布式系统,它捕获多个上游数据库的所有更改;它从不遗漏或丢失事件。当系统正常运行或被仔细管理时,Debezium 会为每个更改事件记录提供精确一次的传递。

如果发生故障,系统不会丢失任何事件。但是,在从故障恢复期间,它可能会重复某些更改事件。在这些异常情况下,Debezium 与 Kafka 一样,提供*至少一次*的更改事件传递。

本节的其余部分描述了 Debezium 如何处理各种类型的故障和问题。

配置和启动错误

在以下情况下,连接器在尝试启动时会失败,并在日志中报告错误/异常,然后停止运行:

  • 连接器的配置无效。

  • 连接器无法使用指定的连接参数成功连接到 Spanner。

在这些情况下,错误消息将包含问题的详细信息,甚至可能提供建议的解决方案。在纠正配置或解决 Spanner 问题后,请重新启动连接器。

Spanner 不可用

当连接器运行时,Spanner 可能由于各种原因而变得不可用。一旦 Spanner 再次可用,连接器将继续运行并能够流式传输事件。

Kafka Connect 进程正常停止

假设 Kafka Connect 正在分布式模式下运行,并且一个 Kafka Connect 进程正常停止。在停止该进程之前,Kafka Connect 会将该进程的连接器任务迁移到该组中的另一个 Kafka Connect 进程。新的连接器任务将从先前任务停止的地方继续处理。在连接器任务正常停止并重新启动到新进程的过程中,会有短暂的处理延迟。

Kafka Connect 进程崩溃

如果 Kafka Connector 进程意外停止,它运行的任何连接器任务都会在未记录其最新处理的偏移量的情况下终止。当 Kafka Connect 在分布式模式下运行时,Kafka Connect 会在其他进程上重新启动这些连接器任务。然而,Spanner 连接器将从之前进程*记录*的最后一个偏移量恢复。这意味着新的替换任务可能会生成一些在崩溃前不久已处理过的相同变更事件。重复事件的数量取决于偏移量刷新周期以及崩溃前的数据变更量。

由于故障恢复期间可能会发生事件重复,因此消费者应始终预计会有一些重复事件。

在每个变更事件记录中,Debezium 连接器会插入关于事件来源的特定于源的信息,例如原始分区 token、提交时间戳、事务 ID、记录序列号和修改号。消费者可以使用这些标识符进行去重。

Kafka 不可用

当连接器生成更改事件时,Kafka Connect 框架会使用 Kafka producer API 将这些事件记录在 Kafka 中。您可以在 Kafka Connect 配置中指定的频率周期性地,Kafka Connect 会记录出现在这些更改事件中的最新偏移量。如果 Kafka Broker 不可用,运行连接器的 Kafka Connect 进程将反复尝试重新连接到 Kafka Broker。换句话说,连接器任务将暂停,直到可以重新建立连接,届时连接器将从它们离开的地方恢复。

连接器停止一段时间

如果连接器被正常停止,数据库可以继续使用。当连接器重新启动时,它会从上次中断的地方继续流式传输变更。也就是说,它会为在连接器停止期间发生的所有数据库变更生成变更事件记录。

一个配置良好的 Kafka 集群能够处理巨大的吞吐量。Kafka Connect 是根据 Kafka 的最佳实践编写的,并且在有足够资源的情况下,Kafka Connect 连接器也可以处理大量数据库变更事件。因此,在停止一段时间后,当 Debezium 连接器重新启动时,它很有可能会赶上在它停止期间发生的数据库变更。这需要多长时间取决于 Kafka 的能力和性能以及 Spanner 数据所做的变更量。

请注意,当前的连接器只能回溯大约一个小时。Kafka 连接器在 Kafka 连接器开始时间戳读取信息 schema 以检索 schema 信息。默认情况下,Spanner 无法读取早于版本保留期(默认为一小时)的读取时间戳。如果您想从过去一小时之前的时间点开始连接器,您需要增加数据库的版本保留期。

限制