Debezium Vitess 连接器

Debezium 的 Vitess 连接器会捕获 Vitess keyspace 中分片(shards)的行级更改。有关此连接器兼容的 Vitess 版本的信息,请参阅 Debezium 版本概述

当连接器首次连接到 Vitess 集群时,它会对 keyspace 进行一致性快照。快照完成后,连接器会持续捕获提交到 Vitess keyspace 的行级更改,以插入、更新或删除数据库内容。连接器会生成数据更改事件记录,并将它们流式传输到 Kafka 主题。对于每个表,默认行为是连接器将所有生成的事件流式传输到一个单独的 Kafka 主题中。应用程序和服务随后可以从生成的主题中消费数据更改事件记录。

概述

Vitess 的 VStream 功能在 4.0 版本中引入。它是一种更改事件订阅服务,可提供与 Vitess 集群底层 MySQL 分片二进制日志等效的信息。用户可以订阅 keyspace 中的多个分片,这使其成为为下游 CDC 流程提供数据的便捷工具。

为了读取和处理数据库更改,Vitess 连接器会订阅 VTGate 的 VStream gRPC 服务。VTGate 是一个轻量级的无状态 gRPC 服务器,是 Vitess 集群设置的一部分。

连接器允许您灵活地选择订阅 MASTER 节点或 REPLICA 节点以获取更改事件。

连接器为捕获的每个行级插入、更新和删除操作生成一个更改事件,并将每个表的更改事件记录发送到单独的 Kafka 主题。客户端应用程序读取与感兴趣的数据库表相对应的 Kafka 主题,并可以对从这些主题接收到的每个行级事件做出反应。

Vitess 中的底层 MySQL 实现会根据某些可配置的时间段清除二进制日志。由于二进制日志的内容可能不完整,因此连接器需要另一种机制来确保捕获特定数据库的完整内容。因此,当连接器首次连接到数据库时,它会执行数据库的一致性快照。在连接器完成快照后,它会继续从快照完成的确切点开始流式传输更改。通过这种方式,连接器从所有数据的统一视图开始,并且不会遗漏在快照期间进行的任何更改。

连接器具有容错能力。当连接器读取更改并生成事件时,它会记录每个事件的 Vitess 全局事务 ID (VGTID) 位置。如果连接器因任何原因停止(包括通信故障、网络问题或崩溃),在连接器重新启动后,它将从最后一个已存储的更改事件条目继续从 VStream 读取。此行为不适用于快照。如果连接器在快照期间停止,重新启动后,连接器不会从上次中断的地方继续执行快照。稍后我们将讨论连接器在 出现问题时 的行为。

连接器的工作原理

为了最优地配置和运行 Debezium Vitess 连接器,理解连接器如何执行快照、流式传输更改事件、确定 Kafka 主题名称以及使用元数据将非常有帮助。

快照

通常,MySQL 服务器未配置为在二进制日志中保留数据库的完整历史记录。因此,连接器无法从二进制日志中读取数据库的整个历史记录。出于此原因,连接器首次启动时,它会执行数据库的初始一致性快照。您可以通过将 snapshot.mode 连接器配置属性设置为 initial 以外的值来更改此行为。此快照功能基于 7.0 版本中引入的 VStream Copy

自动重试失败的快照将在将来的版本中提供。

流式传输更改

Vitess 连接器花费所有时间从其订阅的 VTGate 的 VStream gRPC 服务流式传输更改。客户端在某些位置(称为 VGTID)接收提交到底层 MySQL 服务器 binlog 的更改,这些位置被称为 VGTID。

Vitess 中的 VGTID 等同于 MySQL 中的 GTID,它描述了更改事件发生在 VStream 中的位置。通常,VGTID 包含多个分片 GTID,每个分片 GTID 是一个 (Keyspace, Shard, GTID) 的元组,用于描述给定分片的 GTID 位置。

订阅 VStream 服务时,连接器需要提供 VGTID 和 Tablet 类型(例如 MASTERREPLICA)。VGTID 描述了 VStream 应开始发送更改事件的位置;Tablet 类型描述了每个分片中我们从中读取更改事件的底层 MySQL 实例(主节点或副本节点)。

首次连接 Vitess 集群时,连接器会从一个名为 VTCtld 的 Vitess 组件获取当前的 VGTID,并将当前的 VGTID 提供给 VStream。

Debezium Vitess 连接器充当 VStream 的 gRPC 客户端。当连接器接收到更改时,它会将事件转换为 Debezium 的创建更新删除事件,其中包括事件的 VGTID。Vitess 连接器将这些更改事件以记录的形式转发给运行在同一进程中的 Kafka Connect 框架。Kafka Connect 进程以生成顺序异步地将更改事件记录写入相应的 Kafka 主题。

Kafka Connect 定期将最近的偏移量记录在另一个 Kafka 主题中。偏移量指示 Debezium 随每个事件包含的特定于源的位置信息。对于 Vitess 连接器,在每个更改事件中记录的 VGTID 就是偏移量。

当 Kafka Connect 正常关闭时,它会停止连接器,将所有事件记录刷新到 Kafka,并记录从每个连接器接收的最后一个偏移量。当 Kafka Connect 重新启动时,它会读取每个连接器的最后一个记录的偏移量,并从最后一个记录的偏移量开始重新启动每个连接器。当连接器重新启动时,它会向 VStream 发送一个请求,以从该位置之后的第一个事件开始发送事件。

主题名称

Vitess 连接器将对单个表的所有插入、更新和删除操作的事件写入同一个 Kafka 主题。默认情况下,Kafka 主题名称为 topicPrefix.keyspaceName.tableName,其中

  • *topicPrefix* 是由 topic.prefix 连接器配置属性指定的。*topicPrefix* 是由 topic.prefix 连接器配置属性指定的。

  • keyspaceName 是操作发生的 keyspace(也称为数据库)的名称。

  • *tableName* 是发生操作的数据库表的名称。

例如,假设 fulfillment 是一个连接器在具有 commerce keyspace 的 Vitess 安装中捕获更改的配置中的逻辑服务器名称,该 keyspace 包含四个表:productsproducts_on_handcustomersorders。无论 keyspace 有多少分片,连接器都会将记录流式传输到这四个 Kafka 主题:

  • fulfillment.commerce.products

  • fulfillment.commerce.products_on_hand

  • fulfillment.commerce.customers

  • fulfillment.commerce.orders

事务元数据

Debezium 可以生成表示事务边界的事件,并丰富数据更改事件消息。

Debezium 接收事务元数据的限制

Debezium 仅为部署连接器后发生的事务注册和接收元数据。部署连接器之前发生的事务的元数据不可用。

Debezium 为每个事务中的 BEGINEND 分隔符生成事务边界事件。事务边界事件包含以下字段:

status

BEGINEND

id

唯一事务标识符的字符串表示。

ts_ms

数据源处事务边界事件(BEGINEND 事件)的时间。如果数据源未向 Debezium 提供事件时间,则该字段会表示 Debezium 处理事件的时间。注意:`ts_ms` 以毫秒为单位,但由于 MySQL 的限制,只有秒级精度,MySQL 只能提供秒级的二进制日志时间戳。

event_count(针对 END 事件)

事务发出的事件总数。

data_collections(针对 END 事件)

一对 data_collectionevent_count 元素的数组,指示连接器为源自数据集合的更改发出的事件数量。

示例
{
  "status": "BEGIN",
  "id": "[{\"keyspace\":\"test_unsharded_keyspace\",\"shard\":\"0\",\"gtid\":\"MySQL56/e03ece6c-4c04-11ec-8e20-0242ac110004:1-37\"}]",
  "ts_ms": 1486500577000,
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "[{\"keyspace\":\"test_unsharded_keyspace\",\"shard\":\"0\",\"gtid\":\"MySQL56/e03ece6c-4c04-11ec-8e20-0242ac110004:1-37\"}]",
  "ts_ms": 1486500577000,
  "event_count": 1,
  "data_collections": [
    {
      "data_collection": "test_unsharded_keyspace.my_seq",
      "event_count": 1
    }
  ]
}

除非通过 topic.transaction 选项覆盖,否则连接器会将事务事件发送到 <topic.prefix>.transaction 主题。

更改数据事件丰富

启用事务元数据后,数据消息 Envelope 将使用新的 transaction 字段进行丰富。该字段以字段的复合形式提供关于每个事件的信息:

  • id - 唯一事务标识符的字符串表示形式

  • total_order - 事件在事务生成的所有事件中的绝对顺序

  • data_collection_order - 事件在事务发出的所有事件中,按数据集合的顺序

以下是消息示例

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": 1637988245467,
  "ts_us": 1637988245467841,
  "ts_ns": 1637988245467841698,
  "transaction": {
    "id": "[{\"keyspace\":\"test_unsharded_keyspace\",\"shard\":\"0\",\"gtid\":\"MySQL56/e03ece6c-4c04-11ec-8e20-0242ac110004:1-68\"}]",
    "total_order": 1,
    "data_collection_order": 1
  }
}

有序事务元数据

您可以配置 Debezium 以在数据更改事件记录中包含其他元数据。此类补充元数据可以帮助下游使用者在重新分区或其他干扰可能导致数据乱序消费时按正确的顺序处理消息。

更改数据丰富

要配置连接器以发出丰富的数据更改事件记录,请设置 transaction.metadata.factory 属性。当此属性设置为 VitessOrderedTransactionMetadataFactory 时,连接器会在消息 Envelope 中包含一个 transaction 字段。transaction 字段添加了提供事务发生顺序信息的元数据。以下字段会添加到每条消息中:

transaction_epoch

一个非递减值,表示事务排序所属的 epoch。

transaction_rank

一个 epoch 内非递减的值,表示事务的顺序。

还有一个与事件顺序相关的第三个字段:

total_order

表示事件在事务生成的所有事件中的绝对位置。此字段默认包含在标准事务元数据中。

以下示例说明了如何使用这些字段来确定事件顺序。

假设 Debezium 发出了发生在同一分片且具有相同主键的两个事件的数据更改事件记录。如果发送这些事件的 Kafka 主题被重新分区,那么这两个事件的消费者顺序将不可靠。如果 Debezium 配置为提供丰富的事务元数据,则从主题消费的应用程序可以应用以下逻辑来确定哪个事件需要应用(较新的事件),哪个需要丢弃:

  1. 如果 transaction_epoch 的值不相等,则返回具有较高 transaction_epoch 值的事件。否则,继续。

  2. 如果 transaction_rank 的值不相等,则返回具有较高 transaction_rank 值的事件。否则,继续。

  3. 返回具有较大 total_order 值的事件。

如果两个事件都没有更高的 total_order 值,那么这两个事件是同一事务的一部分。因为 total_order 字段表示事务内事件的顺序,所以值较大的事件是最新的事件。

以下示例显示了带有有序事务元数据的数据更改事件:

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": 1637988245467,
  "ts_us": 1637988245467841,
  "ts_ns": 1637988245467841698,
  "transaction": {
    "id": "[{\"keyspace\":\"test_unsharded_keyspace\",\"shard\":\"0\",\"gtid\":\"MySQL56/e03ece6c-4c04-11ec-8e20-0242ac110004:1-68\"}]",
    "total_order": 1,
    "data_collection_order": 1,
    "transaction_rank": 68,
    "transaction_epoch": 0
  }
}

高效的事务元数据

如果启用连接器以提供事务元数据,它将生成显著更多的数据。连接器不仅会向事务主题发送额外的消息,而且发送到数据更改主题的消息也会更大,因为它们包含事务元数据块。增加的量是由于以下因素:

  • VGTID 存储两次,一次是 source.vgtid,另一次是 transaction.id。在包含许多分片的 keyspace 中,这些 VGTID 可能非常大。

  • 在分片环境中,VGTID 通常包含每个分片的 VGTID。在包含许多分片的 keyspace 中,VGTID 字段中的数据量可能很大。

  • 连接器为每个事务边界事件发送事务主题消息。通常,包含许多分片的 keyspace 会生成大量的事务边界事件。

为了使 Vitess 连接器能够编码事务元数据而不显著增加生成数据的量,Debezium 提供了几种单消息转换 (SMT)。以下 SMT 旨在减少 Vitess 连接器发出的事件中的数据量:

以下示例显示了使用上述转换的 Vitess 连接器配置摘录:

}
 [...]
 "provide.transaction.metadata": "true",
 "transaction.metadata.factory": "io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory",
 "transforms": "filterTransactionTopicRecords,removeField,useLocalVgtid",
 "transforms.filterTransactionTopicRecords.type": "io.debezium.connector.vitess.transforms.FilterTransactionTopicRecords",
 "transforms.removeField.type": "io.debezium.connector.vitess.transforms.RemoveField",
 "transforms.removeField.field_names": "transaction.id",
 "transforms.useLocalVgtid.type": "io.debezium.connector.vitess.transforms.UseLocalVgtid",
 [...]
}

数据更改事件

Debezium Vitess 连接器为每个行级 INSERTUPDATEDELETE 操作生成数据更改事件。每个事件包含一个键和一个值。键和值的结构取决于更改的表。

Debezium 和 Kafka Connect 的设计围绕着连续的事件消息流。但是,这些事件的结构可能会随时间变化,这对于使用者来说很难处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果您使用的是模式注册表,则包含一个模式 ID,使用者可以使用该 ID 从注册表中获取模式。这使得每个事件都是自包含的。

以下骨架 JSON 显示了更改事件的基本四个部分。但是,您如何配置应用程序中使用的 Kafka Connect 转换器决定了这四个部分在更改事件中的表示。schema 字段仅在配置转换器生成它时才存在于更改事件中。同样,只有配置转换器生成事件键和事件有效负载时,它们才会在更改事件中出现。如果您使用 JSON 转换器并配置它生成所有四个基本更改事件部分,则更改事件将具有此结构:

{
 "schema": { (1)
   ...
  },
 "payload": { (2)
   ...
 },
 "schema": { (3)
   ...
 },
 "payload": { (4)
   ...
 },
}
表 1. 更改事件基本内容概述
Item Field name (字段名) 描述

1

schema

第一个 schema 字段是事件键的一部分。它指定一个 Kafka Connect 模式,该模式描述了事件键的 payload 部分中的内容。换句话说,第一个 schema 字段描述了已更改表的表的主键,或者如果没有主键则为第一个单列唯一键的结构。不支持多列唯一键。

通过设置 message.key.columns 连接器配置属性,可以覆盖表的 [主键]。在这种情况下,第一个 schema 字段描述了该属性标识的键的结构。

2

payload

第一个 payload 字段是事件键的一部分。它的结构由前面的 schema 字段描述,它包含已更改行的键。

3

schema

第二个 schema 字段是事件值的一部分。它指定了一个 Kafka Connect 模式,该模式描述了事件值 payload 部分的内容。换句话说,第二个 schema 描述了已更改行的结构。通常,此模式包含嵌套模式。

4

payload

第二个 payload 字段是事件值的一部分。它的结构由上一个 schema 字段描述,并且包含已更改行的实际数据。

默认行为是将连接器流式传输更改事件记录到 与事件的原始表同名的主题

从 Kafka 0.10 开始,Kafka 可以选择记录事件键和值以及消息创建(由生产者记录)或由 Kafka 写入日志的*时间戳*。

Vitess 连接器确保所有 Kafka Connect 模式名称都遵循 Avro 模式名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称的其余每个字符以及模式和表名称中的每个字符都必须是拉丁字母、数字或下划线,即 a-z、A-Z、0-9 或 _。如果存在无效字符,则会将其替换为下划线字符。

如果逻辑服务器名称、schema 名称或表名称包含无效字符,并且区分名称的唯一字符是无效的,因此被替换为下划线,则这可能导致意外冲突。

目前,连接器不允许使用 `@` 前缀命名列。例如,age 是一个有效的列名,而 @age 不是。原因是 Vitess vstreamer 有一个 bug,它会发送带有匿名化列名的事件(例如,列名 age 被匿名化为 @1)。没有简单的方法可以区分带有 `@` 前缀的合法列名和 Vitess bug。请在此处 查看更多讨论

更改事件键

对于给定的表,更改事件的键具有一个结构,其中包含事件创建时表中主键的每个列的字段。

考虑在 commerce keyspace 中定义的 customers 表以及该表的更改事件键示例。

示例表
CREATE TABLE customers (
  id INT NOT NULL,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL,
  PRIMARY KEY(id)
);
示例更改事件键

如果 topic.prefix 连接器配置属性的值为 Vitess_server,则对于具有此定义的 customers 表的所有更改事件,都将具有相同的键结构,在 JSON 中如下所示:

{
  "schema": { (1)
    "type": "struct",
    "name": "Vitess_server.commerce.customers.Key", (2)
    "optional": false, (3)
    "fields": [ (4)
          {
              "name": "id",
              "index": "0",
              "schema": {
                  "type": "INT32",
                  "optional": "false"
              }
          }
      ]
  },
  "payload": { (5)
      "id": "1"
  },
}
表 2. 更改事件键描述
Item Field name (字段名) 描述

1

schema

键的模式部分指定了一个 Kafka Connect 模式,该模式描述了键的 payload 部分的内容。此模式描述了已更改表的

主键的结构。

2

Vitess_server.commerce.customers.Key

定义键的 payload 结构的 schema 名称。此 schema 描述了已更改表的表的主键结构。键 schema 名称的格式为 connector-name.keyspace-name.table-name.Key。在此示例中:

  • Vitess_server 是生成此事件的连接器的名称。

  • commerce 是包含已更改表的 keyspace。

  • customers 是已更新的表。

3

optional

指示事件键的 payload 字段是否必须包含值。在此示例中,需要一个值在键的有效负载中。当表没有主键时,键的有效负载字段中的值是可选的。

4

fields (字段)

指定 payload 中预期的每个字段,包括每个字段的名称、索引和 schema。

5

payload

包含生成此更改事件的行的键。在此示例中,键包含一个名为 id 的字段,其值为 1

尽管 column.exclude.listcolumn.include.list 连接器配置属性允许您仅捕获表列的子集,但主键或唯一键中的所有列始终包含在事件的键中。

如果表没有主键,则更改事件的键为 null。没有主键约束的表中的行无法唯一标识。

更改事件值

更改事件中的值比键要复杂一些。与键一样,值也包含 schema 部分和 payload 部分。schema 部分包含描述 payload 部分的 Envelope 结构的模式,包括其嵌套字段。对于创建、更新或删除数据的操作的更改事件,其值有效负载都具有信封结构。

考虑用于显示更改事件键示例的相同样本表。

CREATE TABLE customers (
  id INT NOT NULL,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL,
  PRIMARY KEY(id)
);

UPDATEDELETE 操作发出的事件包含表中所有列的先前值。

create 事件

以下示例显示了连接器为在 customers 表中创建数据的操作生成的更改事件的值部分:

{
    "schema": { (1)
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "Vitess_server.commerce.customers.Value", (2)
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "Vitess_server.commerce.customers.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "connector"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ms"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_us"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ns"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "name": "io.debezium.data.Enum",
                        "version": 1,
                        "parameters": {
                            "allowed": "true,last,false,incremental"
                        },
                        "default": "false",
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "sequence"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "keyspace"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "table"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "shard"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "vgtid"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.vitess.Source", (3)
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_us"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ns"
            }
        ],
        "optional": false,
        "name": "Vitess_server.commerce.customers.Envelope" (4)
    },
    "payload": { (5)
        "before": null, (6)
        "after": { (7)
            "id": 1,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": { (8)
            "version": "3.3.1.Final",
            "connector": "vitess",
            "name": "my_sharded_connector",
            "ts_ms": 1559033904000,
            "ts_us": 1559033904000000,
            "ts_ns": 1559033904000000000,
            "snapshot": "false",
            "db": "",
            "sequence": null,
            "keyspace": "commerce",
            "table": "customers",
            "shard": "-80",
            "vgtid": "[{\"keyspace\":\"commerce\",\"shard\":\"80-\",\"gtid\":\"MariaDB/0-54610504-47\"},{\"keyspace\":\"commerce\",\"shard\":\"-80\",\"gtid\":\"MariaDB/0-1592148-45\"}]"
        },
        "op": "c", (9)
        "ts_ms": 1559033904863, (10)
        "ts_us": 1559033904863497, (10)
        "ts_ns": 1559033904863497147 (10)
    }
}
表 3. create 事件值字段描述
Item Field name (字段名) 描述

1

schema

描述值有效负载结构的

值模式。更改事件的值模式对于连接器为特定表生成的每个更改事件都相同。

2

name (名称)

schema 部分,每个 name 字段指定了值有效负载中字段的模式。

Vitess_server.commerce.customers.Valuebeforeafter 字段的 payload 的 schema。此 schema 特定于 customers 表。

beforeafter 字段的 schema 名称的格式为 logicalName.keyspaceName.tableName.Value,这确保了 schema 名称在数据库中是唯一的。这意味着在使用 Avro 转换器时,每个逻辑源中每个表的 Avro schema 都有其自己的演进和历史记录。

3

name (名称)

io.debezium.connector.vitess.Sourcesource 字段的 payload 的 schema。此 schema 特定于 Vitess 连接器。连接器将其用于生成的所有事件。

4

name (名称)

Vitess_server.commerce.customers.Envelope 是 payload 的整体结构的 schema,其中 Vitess_server 是连接器名称,commerce 是 keyspace,customers 是表。

5

payload

的实际数据。这是更改事件提供的信息。



事件的 JSON 表示可能比它们描述的行大得多。这是因为 JSON 表示必须包含消息的 schema 和 payload 部分。但是,通过使用 Avro 转换器,您可以显著减小连接器流式传输到 Kafka 主题的消息的大小。

6

before

一个可选字段,指定事件发生前行的状态。当 op 字段是创建的 c 时(如本例所示),before 字段为 null,因为此更改事件针对的是新内容。

7

after

一个可选字段,指定事件发生后行的状态。在此示例中,after 字段包含新行idfirst_namelast_nameemail 列的值。

8

source (源)

描述事件源元数据的

必需字段。此字段包含可用于将此事件与其他事件进行比较的信息,关于事件的来源、事件发生的顺序以及事件是否属于同一事务。源元数据包括:

  • Debezium 版本

  • 连接器类型和名称

  • 包含新行的数据库(也称为 keyspace)、表和分片。

  • 如果事件是快照的一部分(始终为 false)。

  • 操作的 VGTID。

  • 数据库中发生更改的时间戳

9

op (操作)

描述导致连接器生成事件的操作类型的

必需字符串。在此示例中,c 表示已创建行。有效值为:

  • c = create

  • u = update

  • d = delete

10

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 表示更改在数据库中发生的时间。通过比较 payload.source.ts_ms 的值与 payload.ts_ms 的值,您可以确定源数据库更新与 Debezium 之间的延迟。注意:`ts_ms` 以毫秒为单位,但由于 MySQL 的限制,只有秒级精度,MySQL 只能提供秒级的二进制日志时间戳。

update 事件

对于样本 customers 表中的更新,更改事件的值具有与该表的创建事件相同的模式。同样,事件值

的有效负载也具有相同的结构。但是,在更新事件中,事件值有效负载包含不同的值。以下是一个更改事件值在连接器为 customers 表中的更新生成的事件中的示例:

{
    "schema": { ... },
    "payload": {
        "before": { (1)
            "id": 1,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "after": { (2)
            "id": 1,
            "first_name": "Anne Marie",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": { (3)
            "version": "3.3.1.Final",
            "connector": "vitess",
            "name": "my_sharded_connector",
            "ts_ms": 1559033904000,
            "ts_us": 1559033904000000,
            "ts_ns": 1559033904000000000,
            "snapshot": "false",
            "db": "",
            "sequence": null,
            "keyspace": "commerce",
            "table": "customers",
            "shard": "-80",
            "vgtid": "[{\"keyspace\":\"commerce\",\"shard\":\"80-\",\"gtid\":\"MariaDB/0-54610504-47\"},{\"keyspace\":\"commerce\",\"shard\":\"-80\",\"gtid\":\"MariaDB/0-1592148-46\"}]"
        },
        "op": "u", (4)
        "ts_ms": 1465584025523,  (5)
        "ts_us": 1465584025523763,  (5)
        "ts_ns": 1465584025523763547  (5)
    }
}
表 4. update 事件值字段描述
Item Field name (字段名) 描述

1

before

一个可选字段,包含数据库提交之前行中所有列的所有值。

2

after

一个可选字段,指定事件发生后行的状态。在此示例中,first_name 的值现在是 Anne Marie

3

source (源)

一个强制性字段,描述事件的源元数据。source 字段结构与*create* 事件中的字段相同,但某些值不同。源元数据包括:

  • Debezium 版本

  • 连接器类型和名称

  • 包含新行的数据库(也称为 keyspace)、表和分片。

  • 如果事件是快照的一部分(始终为 false)。

  • 操作的 VGTID。

  • 数据库中发生更改的时间戳

4

op (操作)

描述操作类型的

必需字符串。在更新事件值中,op 字段值为 u,表示此行因更新而更改。

5

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 表示更改在数据库中发生的时间。通过比较 payload.source.ts_ms 的值与 payload.ts_ms 的值,您可以确定源数据库更新与 Debezium 之间的延迟。注意:`ts_ms` 以毫秒为单位,但由于 MySQL 的限制,只有秒级精度,MySQL 只能提供秒级的二进制日志时间戳。

更新行主键的列会更改行的键值。当键更改时,Debezium 会发出三个事件:一个带有行旧键的 DELETE 事件和一个墓碑事件,然后是一个带有行新键的事件。详细信息将在下一节中介绍。

主键更新

更改行

主键字段的 UPDATE 操作称为主键更改。对于主键更改,连接器会发出旧键的 DELETE 事件记录和新(更新)键的 CREATE 事件记录,而不是 UPDATE 事件记录。这些事件具有常规的结构和内容,此外,每个事件都有一个与主键更改相关的消息头:

  • DELETE 事件记录具有 __debezium.newkey 作为消息头。此标头的值是更新行的

    新主键。

  • CREATE 事件记录具有 __debezium.oldkey 作为消息头。此标头的值是更新行以前(旧)的主键。

delete 事件

删除更改事件中的值具有与

同一表的创建更新事件相同的 schema 部分。对于样本 customers 表,删除事件中的 payload 部分如下所示:

{
    "schema": { ... },
    "payload": {
        "before": { (1)
            "id": 1,
            "first_name": "Anne Marie",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "after": null, (2)
        "source": { (3)
            "version": "3.3.1.Final",
            "connector": "vitess",
            "name": "my_sharded_connector",
            "ts_ms": 1559033904000,
            "ts_us": 1559033904000000,
            "ts_ns": 1559033904000000000,
            "snapshot": "false",
            "db": "",
            "sequence": null,
            "keyspace": "commerce",
            "table": "customers",
            "shard": "-80",
            "vgtid": "[{\"keyspace\":\"commerce\",\"shard\":\"80-\",\"gtid\":\"MariaDB/0-54610504-47\"},{\"keyspace\":\"commerce\",\"shard\":\"-80\",\"gtid\":\"MariaDB/0-1592148-47\"}]"
        },
        "op": "d", (4)
        "ts_ms": 1465581902461, (5)
        "ts_us": 1465581902461324, (5)
        "ts_ns": 1465581902461324871 (5)
    }
}
表 5. delete 事件值字段描述
Item Field name (字段名) 描述

1

before

一个可选字段,指定事件发生前行的状态。在删除事件值中,before 字段包含在数据库提交删除行之前行中的值。

2

after

一个可选字段,指定事件发生后行的状态。在删除事件值中,after 字段为 null,表示该行不再存在。

3

source (源)

一个强制性字段,描述事件的源元数据。在*delete* 事件值中,source 字段结构与同一表的*create* 和*update* 事件相同。许多 source 字段值也相同。在*delete* 事件值中,ts_mslsn 字段值以及其他值可能已更改。但是,*delete* 事件值中的 source 字段提供相同的元数据:

  • Debezium 版本

  • 连接器类型和名称

  • 包含新行的数据库(也称为 keyspace)、表和分片。

  • 如果事件是快照的一部分(始终为 false)。

  • 操作的 VGTID。

  • 数据库中发生更改的时间戳

4

op (操作)

描述操作类型的

必需字符串。op 字段值为 d,表示此行已被删除。

5

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 表示更改在数据库中发生的时间。通过比较 payload.source.ts_ms 的值与 payload.ts_ms 的值,您可以确定源数据库更新与 Debezium 之间的延迟。注意:`ts_ms` 以毫秒为单位,但由于 MySQL 的限制,只有秒级精度,MySQL 只能提供秒级的二进制日志时间戳。

*delete* 更改事件记录为使用者提供了处理该行删除所需的信息。

Vitess 连接器事件旨在与 Kafka 日志压缩配合使用。日志压缩允许删除一些旧消息,只要保留每个键的最新消息。这使 Kafka 可以回收存储空间,同时确保主题包含完整的数据集,并可用于重新加载基于键的状态。

Tombstone events (墓碑事件)

当一行被删除时,delete 事件的值仍然与日志压缩配合使用,因为 Kafka 可以删除具有该相同键的所有早期消息。但是,为了让 Kafka 删除具有该相同键的所有消息,消息值必须为 null。为了实现这一点,Vitess 连接器会跟随一个delete 事件,该事件是一个特殊的墓碑事件,具有相同的键但值为 null

数据类型映射

Vitess 连接器以结构类似于该行所在表的事件来表示行的更改。事件包含一个字段用于每个列值。该值在事件中如何表示取决于列的 Vitess 数据类型。本节将描述这些映射。

如果默认数据类型转换不满足您的需求,您可以创建自定义转换器以供连接器使用。

基本类型

下表描述了连接器如何将基本 Vitess 数据类型映射到事件字段中的字面量类型语义类型

  • *字面类型*描述了如何使用 Kafka Connect schema 类型:INT8INT16INT32INT64FLOAT32FLOAT64BOOLEANSTRINGBYTESARRAYMAPSTRUCT 来表示值。

  • 语义类型使用 Kafka Connect 模式的名称描述了 Kafka Connect 模式如何捕获字段的*含义*。

表 6. Vitess 基本数据类型映射
Vitess 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

BOOLEAN, BOOL

INT16

n/a

BIT(1)

暂不支持

n/a

BIT(>1)

暂不支持

n/a

TINYINT

INT16

n/a

SMALLINT[(M)]

INT16

n/a

MEDIUMINT[(M)]

INT32

n/a

INT, INTEGER[(M)]

INT32

n/a

BIGINT[(M)]

INT64

n/a

REAL[(M,D)]

FLOAT64

n/a

FLOAT[(M,D)]

FLOAT64

n/a

DOUBLE[(M,D)]

FLOAT64

n/a

CHAR[(M)]

STRING

n/a

VARCHAR[(M)]

STRING

n/a

BINARY[(M)]

BYTES

n/a

VARBINARY[(M)]

BYTES

n/a

TINYBLOB

BYTES

n/a

TINYTEXT

STRING

n/a

BLOB

BYTES

n/a

TEXT

STRING

n/a

MEDIUMBLOB

BYTES

n/a

MEDIUMTEXT

STRING

n/a

LONGBLOB

BYTES

n/a

LONGTEXT

STRING

n/a

JSON

STRING

io.debezium.data.Json
包含 JSON 文档、数组或标量

的字符串表示。

ENUM

STRING

io.debezium.data.Enum
allowed 模式参数包含允许值

的逗号分隔列表。

SET

STRING

io.debezium.data.EnumSet
allowed 模式参数包含允许值

的逗号分隔列表。

YEAR[(2|4)]

INT32

io.debezium.time.Year

TIMESTAMP[(M)]

STRING

n/a
格式为 yyyy-MM-dd HH:mm:ss.SSS,具有基于 UTC 的微秒精度。MySQL 允许 M 的范围为 0-6

NUMERIC[(M[,D])]

STRING

n/a

DECIMAL[(M[,D])]

STRING

n/a

GEOMETRY,
LINESTRING,
POLYGON,
MULTIPOINT,
MULTILINESTRING,
MULTIPOLYGON,
GEOMETRYCOLLECTION

暂不支持

n/a

时间类型

除了 TIMESTAMP 数据类型外,Vitess 时间类型取决于 time.precision.mode 连接器配置属性的值。

不带时区的时间值

DATETIME 类型表示本地日期和时间,例如“2018-01-13 09:48:27”。如前面的示例所示,此类型不包含时区信息。此类型的列会根据列的精度,通过 UTC 转换为 epoch 毫秒或微秒。TIMESTAMP 类型表示没有时区信息的日期时间戳。写入数据时,MySQL 会将 TIMESTAMP 类型从服务器或会话的时区转换为 UTC 格式。读取值时,数据库会从 UTC 格式转换为服务器或会话的当前时区。例如:

  • DATETIME 值为 2018-06-20 06:37:03,转换为 1529476623000

  • TIMESTAMP 值为 2018-06-20 06:37:03,转换为 2018-06-20T13:37:03Z

此类列会根据服务器或会话的时区,转换为 UTC 中的等效 io.debezium.time.ZonedTimestamp。默认情况下,Debezium 会查询服务器以获取时区。如果失败,您必须通过在 JDBC 连接字符串中设置 connectionTimeZone 选项来显式指定时区。例如,如果数据库的时区(全局或通过 connectionTimeZone 选项为连接器配置的)为“America/Los_Angeles”,则 TIMESTAMP 值“2018-06-20 06:37:03”将由值为“2018-06-20T13:37:03Z”的 ZonedTimestamp 表示。

运行 Kafka Connect 和 Debezium 的 JVM 的时区不会影响这些转换。

有关影响时间值的属性的更多信息,请参阅 连接器配置属性

time.precision.mode=adaptive_time_microseconds(default)

Vitess 连接器确定字面量类型和语义类型,基于列的数据类型定义,以便事件准确表示数据库中的值。所有时间字段都以微秒为单位。只有范围在 00:00:00.00000023:59:59.999999 之间的正 TIME 字段值才能正确捕获。

表 7. time.precision.mode=adaptive_time_microseconds 时的映射
Vitess 类型 文字类型 语义类型

DATE

INT32

io.debezium.time.Date
表示自 UNIX 纪元以来的天数。

TIME[(M)]

INT64

io.debezium.time.MicroTime
以微秒为单位表示时间值,不包含时区信息。MySQL 允许 M 的范围为 0-6

DATETIME, DATETIME(0), DATETIME(1), DATETIME(2), DATETIME(3)

INT64

io.debezium.time.Timestamp
表示自 UNIX 纪元以来的毫秒数,不包含时区信息。

DATETIME(4), DATETIME(5), DATETIME(6)

INT64

io.debezium.time.MicroTimestamp
表示自 UNIX 纪元以来的微秒数,不包含时区信息。

time.precision.mode=connect

Vitess 连接器使用 Kafka Connect 的定义逻辑类型。此方法不如默认方法精确,如果数据库列的小数秒精度值大于 3,事件可能会不精确。连接器可以处理范围从 00:00:00.00023:59:59.999 的值。只有当您确定表中 TIME 值永远不会超出支持范围时,才应设置 time.precision.mode=connectconnect 设置预计将在 Debezium 的将来版本中移除。

表 8. time.precision.mode=connect 时的映射
Vitess 数据类型 文字类型 语义类型

DATE

INT32

org.apache.kafka.connect.data.Date
表示自 UNIX 纪元以来的天数。

TIME[(M)]

INT64

org.apache.kafka.connect.data.Time
表示自午夜以来的微秒时间值,不包含时区信息。

DATETIME[(M)]

INT64

org.apache.kafka.connect.data.Timestamp
表示自 UNIX 纪元以来的毫秒数,不包含时区信息。

time.precision.mode=isostring

Vitess 连接器将所有时间类型使用字符串表示。

表 9. time.precision.mode=connect 时的映射
Vitess 数据类型 文字类型 语义类型

DATE

STRING

n/a

TIME[(M)]

STRING

n/a

DATETIME[(M)]

STRING

n/a

设置 Vitess

Debezium 在使用 Vitess 时不需要任何特定配置。按照 本地通过 Docker 安装指南或 Kubernetes 的 Vitess Operator 指南中的标准说明安装 Vitess。

核对清单
  • 确保 VTGate 主机及其 gRPC 端口(默认是 15991)可从安装了 Vitess 连接器的机器访问。

  • 确保 VTCtld 主机及其 gRPC 端口(默认是 15999)可从安装了 Vitess 连接器的机器访问。

gRPC 身份验证

由于 Vitess 连接器从 VTGate VStream gRPC 服务器读取更改事件,因此它不需要直接连接到 MySQL 实例。因此,不需要特殊的数据库用户和权限。目前,Vitess 连接器仅支持对 VTGate gRPC 服务器进行未经验证的访问。

部署

在安装了 KafkaKafka Connect 后,部署 Debezium Vitess 连接器的剩余任务是下载连接器的插件存档,将 JAR 文件解压到您的 Kafka Connect 环境中,并将包含 JAR 文件的目录添加到 Kafka Connect 的 plugin.path。然后,您需要重新启动 Kafka Connect 进程以加载新的 JAR 文件。

如果您使用的是不可变容器,请参阅 Debezium 的容器镜像,其中已安装了适用于 Kafka 和 Kafka Connect 的 Vitess 连接器,并且已准备就绪。您还可以 在 Kubernetes 和 OpenShift 上运行 Debezium

quay.io 获取的 Debezium 容器映像未经严格测试或安全分析,仅供测试和评估目的使用。这些映像不适用于生产环境。为降低生产部署中的风险,请仅部署由受信任供应商积极维护并经过彻底漏洞分析的容器。

连接器配置示例

以下是一个 Vitess 连接器的配置示例,该连接器连接到位于 192.168.99.100 的 15991 端口上的 Vitess(VTGate 的 VStream)服务器,其逻辑名称为 fullfillment。它还连接到 192.168.99.101 的 15999 端口上的 VTCtld 服务器以获取初始 VGTID。通常,您会使用连接器可用的配置属性在 .json 文件中配置 Debezium Vitess 连接器。

您可以选择为部分 schema 和表生成事件。可选地,忽略、屏蔽或截断敏感、过大或不需要的列。

{
  "name": "inventory-connector",  (1)
  "config": {
    "connector.class": "io.debezium.connector.vitess.VitessConnector", (2)
    "database.hostname": "192.168.99.100", (3)
    "database.port": "15991", (4)
    "database.user": "vitess", (5)
    "database.password": "vitess_password", (6)
    "vitess.keyspace": "commerce", (7)
    "vitess.tablet.type": "MASTER", (8)
    "vitess.vtctld.host": "192.168.99.101", (9)
    "vitess.vtctld.port": "15999", (10)
    "vitess.vtctld.user": "vitess", (11)
    "vitess.vtctld.password": "vitess_password", (12)
    "topic.prefix": "fullfillment", (13)
    "tasks.max": 1 (14)
  }
}
1 连接器在 Kafka Connect 服务中注册时的名称。
2 此 Vitess 连接器类的名称。
3 Vitess(VTGate 的 VStream)服务器的地址。
4 Vitess(VTGate 的 VStream)服务器的端口号。
5 Vitess 数据库服务器(VTGate gRPC)的用户名。
6 Vitess 数据库服务器(VTGate gRPC)的密码。
7 keyspace(也称为数据库)的名称。由于未指定分片,它会从 keyspace 中的所有分片读取更改事件。
8 用于读取更改事件的 MySQL 实例类型(MASTER 或 REPLICA)。
9 VTCtld 服务器的地址。
10 VTCtld 服务器的端口。
11 VTCtld 服务器(VTCtld gRPC)的用户名。
12 VTCtld 数据库服务器(VTCtld gRPC)的密码。
13 Vitess 集群的主题前缀,它形成一个命名空间,并用于连接器写入的所有 Kafka 主题的名称、Kafka Connect 模式名称以及使用 Avro 转换器时相应 Avro 模式的命名空间。
14 一次只有一个任务应运行。

请参阅 Vitess 连接器属性的完整列表,这些属性可以在这些配置中指定。

您可以将此配置通过 POST 命令发送到正在运行的 Kafka Connect 服务。该服务会记录配置并启动连接器任务,该任务连接到 Vitess 数据库并将更改事件记录流式传输到 Kafka 主题。

偏移量存储模式的连接器配置示例

当您拥有大型 Vitess 安装,需要多个连接器任务来处理更改日志时,您可以使用 offset-storage-per-task 功能来启动多个连接器任务,让每个任务处理 Vitess 分片的一个子集。每个任务将在 Kafka 的偏移量主题中以自己的分区空间持久化其偏移量(它们跟踪的分片的 vgtids)。

以下是一个连接到 Vitess(VTGate 的 VStream)服务器的 Vitess 连接器的相同示例,但增加了三个参数来调用 offset-storage-per-task 模式。

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.vitess.VitessConnector",
    "database.hostname": "192.168.99.100",
    "database.port": "15991",
    "database.user": "vitess",
    "database.password": "vitess_password",
    "topic.prefix": "fullfillment",
    "vitess.keyspace": "commerce",
    "vitess.tablet.type": "MASTER",
    "vitess.vtctld.host": "192.168.99.101",
    "vitess.vtctld.port": "15999",
    "vitess.vtctld.user": "vitess",
    "vitess.vtctld.password": "vitess_password",
    "vitess.offset.storage.per.task": true, (1)
    "vitess.offset.storage.task.key.gen": 1, (2)
    "vitess.prev.num.tasks": 1, (3)
    "tasks.max": 2 (4)
  }
}
1 指定我们要启用 offset-storage-per-task 功能。
2 指定当前任务并行度的生成号为 1。
3 指定前一个任务并行度的任务数为 1。
4 指定我们要为当前任务并行度启动两个任务。

任务到 Vitess 分片的分配基于简单的轮询算法。在此示例中,启动两个连接器任务,并假设我们有 4 个 Vitess 分片(-40、40-80、80-c0、c0-),task0 将处理分片(-40、80-c0),task1 将处理分片(40-80、c0-)。

我们需要三个配置参数的原因是确保每个连接器任务保存的偏移量不会相互冲突,并自动处理前一个任务并行度的偏移量迁移。为了确保我们在 Kafka 偏移量主题中的分区键不冲突,我们为每个连接器任务使用以下分区名称方案:taskId_numTasks_gen。因此,对于当前启动两个任务且生成号为 1 的示例,task0 将在 Kafka 偏移量主题的分区键中写入其偏移量:task0_2_1,task1 将使用分区键:(task1_2_1)。gen 配置参数用于区分来自不同生成的分区键(生成对应于任务并行度的每次更改)。

当任务并行度改变时(例如,您想启动 4 个连接器任务而不是 2 个来处理来自 Vitess 的更大流量),您将指定 tasks.max=4、vitess.offset.storage.task.key.gen=2、vitess.prev.num.tasks=2,此任务并行度生成的偏移量分区将为:task0_4_2、task1_4_2、task2_4_2、task3_4_2。连接器重新启动后,连接器将检测到当前 4 个分区键没有保存的先前偏移量,并将自动迁移从前一个生成键(task0_2_1 和 task1_2_1)保存的偏移量。对于当前 4 个 Vitess 分片(-40、40-80、80-c0、c0-)的示例,task0 将处理分片(-40),task1(40-80),task2(80-c0),task3(c0-)。来自前一个并行度生成(每个任务处理 2 个分片)的这 4 个分片的偏移量将自动迁移到当前生成,即使用 4 个任务(每个任务处理一个分片)。

请注意,在启用 offset-storage-per-task 功能之前,Kafka 偏移量主题中保存的偏移量的任务并行度生成号默认为 0,有一个特殊的偏移量查找过程。因此,如果您在没有启用 offset-storage-per-task 功能的情况下运行 Vitess 连接器一段时间,现在想启用此功能,请指定 vitess.offset.storage.task.key.gen=1、vitess.prev.num.tasks=1 以帮助偏移量自动迁移。

请注意,vitess.prev.num.tasks 需要与前一个任务并行度生成中启动的任务数相匹配。连接器任务的数量通常与您指定的 tasks.max 配置参数相同,但在tasks.max > Vitess 分片数这种罕见情况下,连接器只会启动_任务数 = Vitess 分片数。这种情况很可能在最初的配置中就存在错误。

请参阅 Vitess 连接器属性的完整列表,这些属性可以在这些配置中指定。

您可以将此配置通过 POST 命令发送到正在运行的 Kafka Connect 服务。该服务会记录配置并启动连接器任务,该任务连接到 Vitess 数据库并将更改事件记录流式传输到 Kafka 主题。

添加连接器配置

要开始运行 Vitess 连接器,请创建一个连接器配置并将其添加到您的 Kafka Connect 集群。

先决条件
  • VTGate 主机及其 gRPC 端口(默认是 15991)可从安装了 Vitess 连接器的机器访问。

  • VTCtld 主机及其 gRPC 端口(默认是 15999)可从安装了 Vitess 连接器的机器访问。

  • Vitess 连接器已安装。

过程
  1. 为 Vitess 连接器创建配置。

  2. 使用Kafka Connect REST API 将该连接器配置添加到您的 Kafka Connect 群集。

结果

当连接器启动时,它开始为行级操作生成数据更改事件,并将更改事件记录流式传输到 Kafka 主题。

监控

Debezium Vitess 连接器仅提供一种类型的指标,除此之外 Kafka 和 Kafka Connect 还提供内置的 JMX 指标支持。

  • 流式传输指标提供有关连接器运行时(当连接器正在捕获更改并流式传输更改事件记录时)的信息。

Debezium 监控文档 提供了有关如何使用 JMX 公开这些指标的详细信息。

自定义 MBean 名称

Debezium 连接器通过连接器的 MBean 名称公开指标。这些指标特定于每个连接器实例,提供有关连接器快照、流式传输和模式历史记录进程行为的数据。

默认情况下,当您部署正确配置的连接器时,Debezium 会为不同的连接器指标生成唯一的 MBean 名称。要查看连接器进程的指标,请将您的可观察性堆栈配置为监视其 MBean。但这些默认 MBean 名称取决于连接器配置;配置更改可能导致 MBean 名称发生更改。在这种情况下,更改 MBean 名称会破坏连接器实例和 MBean 之间的链接,从而中断监控活动。您必须重新配置可观察性堆栈以使用新的 MBean 名称才能恢复监控。

为了防止因 MBean 名称更改而导致的监控中断,您可以配置自定义指标标签。通过将 custom.metric.tags 属性添加到连接器配置中来配置自定义指标。该属性接受键值对,其中每个键代表 MBean 对象名称的标签,相应的值代表该标签的值。例如:k1=v1,k2=v2。Debezium 将指定的标签附加到连接器的 MBean 名称。

配置连接器的 custom.metric.tags 属性后,您可以配置可观察性堆栈以检索与指定标签关联的指标。然后,可观察性堆栈使用指定的标签而不是可变的 MBean 名称来唯一标识连接器。之后,如果 Debezium 重新定义了其如何构造 MBean 名称,或者连接器配置中的 topic.prefix 发生更改,指标收集将不会中断,因为指标抓取任务使用指定的标签模式来标识连接器。

使用自定义标签的另一个好处是,您可以使用反映数据管道架构的标签,以便以适合您运营需求的方式组织指标。例如,您可以指定带有值的标签来声明连接器活动的类型、应用程序上下文或数据源,例如 db1-streaming-for-application-abc。如果您指定多个键值对,所有指定的对都将附加到连接器的 MBean 名称。

以下示例说明了标签如何修改默认 MBean 名称。

示例 1. 自定义标签如何修改连接器 MBean 名称

默认情况下,Vitess 连接器为流式传输指标使用以下 MBean 名称:

debezium.vitess:type=connector-metrics,context=streaming,server=<topic.prefix>

如果您将 custom.metric.tags 的值设置为 database=salesdb-streaming,table=inventory,Debezium 将生成以下自定义 MBean 名称:

debezium.vitess:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory

流式传输指标

MBeandebezium.vitess:type=connector-metrics,context=streaming,server=<topic.prefix>

Attributes Type 描述

long

自连接器读取和处理最近事件以来经过的毫秒数。

long

自上次启动或重置以来,此连接器已看到的事件总数。

long

已被连接器配置的包含/排除列表过滤规则过滤的事件数量。

int

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的长度。

int

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的剩余容量。

boolean

表示连接器当前是否连接到数据库服务器的标志。

long

上次更改事件的时间戳与连接器处理它之间的时间差(以毫秒为单位)。这些值将包含数据库服务器和连接器运行所在机器之间时钟的任何差异。

long

已提交的处理过的事务数。

long

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的最大缓冲区(以字节为单位)。

long

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的当前缓冲区(以字节为单位)。

连接器配置属性

Debezium Vitess 连接器有许多配置属性,您可以使用它们来实现适合您应用程序的正确连接器行为。许多属性都有默认值。有关属性的信息组织如下:

以下配置属性是必需的,除非有默认值可用。

表 10. 必需的连接器配置属性
属性 Default (默认值) 描述

无默认值

连接器的唯一名称。尝试使用相同的名称再次注册将失败。此属性是所有 Kafka Connect 连接器必需的。

无默认值

连接器的 Java 类名。对于 Vitess 连接器,始终使用 io.debezium.connector.vitess.VitessConnector 的值。

1

应为此连接器创建的最大任务数。如果您启用了 offset.storage.per.task 模式,Vitess 连接器可以使用多个任务。

无默认值

Vitess 数据库服务器(VTGate)的 IP 地址或主机名。

15991

Vitess 数据库服务器(VTGate)的整数端口号。

要从中流式传输更改的 keyspace 名称。

n/a

要从中流式传输更改的可选分片名称。如果未配置,在未分片的 keyspace 中,连接器将从唯一的分片流式传输更改;在已分片的 keyspace 中,连接器将从 keyspace 中的所有分片流式传输更改。建议不要配置此项,以便从 keyspace 中的所有分片流式传输,因为它对重新分片操作有更好的支持。如果配置,例如 -80,连接器将从 -80 分片流式传输更改。

current

用于流式传输分片的可选 GTID 位置。这必须与 vitess.shard 一起设置。如果未配置,连接器将从给定分片的最新位置流式传输更改。

false

控制 Vitess 标志 stop_on_reshard。

true - 流将在重新分片操作后停止。

false - 流将在重新分片操作后自动迁移到新分片。

如果设置为 true,您还应考虑在配置中设置 vitess.gtid

false

控制 Vitess 标志 StreamKeyspaceHeartbeats

+ true - 流将从 _vt.heartbeat 表接收事件(也必须为 tablet 启用)。

+ false - 流将不会从 _vt.heartbeat 表接收事件。
如果设置为 true,您可能还想将 <keyspace>.heartbeat 添加到表包含列表中,以便输出事件。

n/a

Vitess 数据库服务器(VTGate)的可选用户名。如果未配置,则使用未经验证的 VTGate gRPC。

n/a

Vitess 数据库服务器(VTGate)的可选密码。如果未配置,则使用未经验证的 VTGate gRPC。

MASTER

从中流式传输更改的 Tablet(因此是 MySQL)的类型。

MASTER 表示从主 MySQL 实例流式传输。

REPLICA 表示从副本 MySQL 实例流式传输。

RDONLY 表示从只读副本 MySQL 实例流式传输。

无默认值

主题前缀,为 Debezium 正在捕获更改的特定 Vitess 数据库服务器或集群提供命名空间。数据库服务器逻辑名称中只能使用字母数字字符、连字符、点和下划线。该前缀应与其他连接器唯一,因为它被用作接收此连接器记录的所有 Kafka 主题的主题名称前缀。

+

不要更改此属性的值。如果您更改名称值,在重新启动后,连接器将不会继续将事件发出到原始主题,而是将后续事件发出到名称基于新值的名称。

无默认值

可选的、逗号分隔的正则表达式列表,用于匹配要捕获其更改的表的完全限定表标识符。table.include.list 中未包含的任何表都不会捕获其更改。每个标识符的格式为 keyspace.tableName。默认情况下,连接器捕获正在捕获其更改的每个模式中的每个非系统表的更改。请勿同时设置 table.exclude.list 属性。

无默认值

可选的、逗号分隔的正则表达式列表,用于匹配您想捕获其更改的表的完全限定表标识符。table.exclude.list 中未包含的任何表都会捕获其更改。每个标识符的格式为 keyspace.tableName。请勿同时设置 table.include.list 属性。

无默认值

可选的、逗号分隔的正则表达式列表,用于匹配应包含在更改事件记录值中的列的完全限定名称。列的完全限定名称格式为 keyspace.tableName.columnName。请勿同时设置 column.exclude.list 属性。

无默认值

可选的、逗号分隔的正则表达式列表,用于匹配应从更改事件记录值中排除的列的完全限定名称。列的完全限定名称格式为 keyspace.tableName.columnName。请勿同时设置 column.include.list 属性。

n/a

一个可选的、逗号分隔的正则表达式列表,匹配基于字符的列的完全限定名称。如果您希望在列数据超过属性名称中由 length 指定的字符数时截断这些列中的数据,请设置此属性。将 length 设置为正整数值,例如 column.truncate.to.20.chars

列的完全限定名称遵循以下格式:databaseName.tableName.columnName。为了匹配列名,Debezium 将您指定的正则表达式作为锚定正则表达式匹配。也就是说,指定的表达式与列的整个名称字符串进行匹配;它不匹配可能存在于列名中的子字符串。

您可以在单个配置中指定多个具有不同长度的属性。

true

控制是否在*delete* 事件后跟一个墓碑事件。

true - 删除操作表示为*delete* 事件和随后的墓碑事件。

false - 只发出*delete* 事件。

在源记录被删除后,发出墓碑事件(默认行为)允许 Kafka 完全删除与已删除行的键相关的任何事件,前提是日志压缩已为该主题启用。

false

指定是否启用 offset-storage-per-task 模式以启动多个连接器任务并将偏移量按任务分区持久化。

true - 启用 offset-storage-per-task 模式。

false - 不使用 offset-storage-per-task 模式。

如果您启用了 offset-storage-per-task 模式,还需要指定 vitess.offset.storage.task.key.gen 和 vitess.prev.num.tasks 参数。

-1

当 vitess.offset.storage.per.task 启用时,指定任务并行度生成号。当您决定更改连接器任务并行度(启动更多或更少连接器任务)时,应增加生成号。

-1

当 vitess.offset.storage.per.task 启用时,指定前一个任务并行度生成中使用的连接器任务数。

空字符串

一个分号分隔的表列表,其中包含匹配表列名的正则表达式。连接器将匹配列中的值映射到它发送到 Kafka 主题的更改事件记录中的键字段。当表没有主键时,或者当您想根据非主键字段对 Kafka 主题中的更改事件记录进行排序时,此功能很有用。

用分号分隔条目。在完全限定的表名和其正则表达式之间插入冒号。格式为:

keyspace-name.table-name:_regexp_;…​

For example, (例如,)

keyspaceA.table_a:regex_1;keyspaceA.table_b:regex_2;keyspaceA.table_c:regex_3

如果 table_a 有一个 id 列,并且 regex_1^i(匹配任何以 i 开头的列),则连接器将 table_aid 列中的值映射到连接器发送到 Kafka 的更改事件中的键字段。

none

指定如何调整 schema 名称以兼容连接器使用的消息转换器。可能设置:

  • none 不进行任何调整。

  • avro 用下划线替换 Avro 类型名称中不能使用的字符。

  • avro_unicode 用对应的 Unicode(如 _uxxxx)替换 Avro 类型名称中不能使用的下划线或字符。注意:_ 像 Java 中的反斜杠一样是一个转义序列。

none

指定如何调整字段名称以兼容连接器使用的消息转换器。可能设置:

  • none 不进行任何调整。

  • avro 用下划线替换 Avro 类型名称中不能使用的字符。

  • avro_unicode 用对应的 Unicode(如 _uxxxx)替换 Avro 类型名称中不能使用的下划线或字符。注意:_ 像 Java 中的反斜杠一样是一个转义序列。

有关更多详细信息,请参阅 Avro 命名

initial (初始)

指定连接器启动时执行快照的标准。将属性设置为以下值之一:

initial (初始)

当连接器启动时,如果它在其偏移量主题中检测不到值,它将执行数据库的快照。

never (从不)

当连接器启动时,它会跳过快照过程,并立即开始流式传输数据库记录到二进制日志的操作的更改事件。

adaptive_time_microseconds

您可以设置以下选项来确定 Debezium 如何表示时间、日期和时间戳值的精度:

adaptive_time_microseconds

(默认) 值以毫秒、微秒或纳秒的精度表示,具体取决于数据库列类型,但 TIME 类型字段除外,它们始终以微秒为单位捕获。

connect

时间值和时间戳值始终以 Kafka Connect 的默认格式表示,用于 Time、Date 和 Timestamp,这些格式无论数据库列的精度如何,都使用毫秒精度。

isostring

时间类型表示为字符串。对于某些无法以数字方式表示的时间值(例如 0000-00-00),它会发送值的原始字符串表示。

string

指定 BIGINT UNSIGNED 列在更改事件中应如何表示。
将属性设置为以下值之一:

string::
使用 Java 的 string 表示值。

long::
使用 Java 的 long 表示值,这可能精度不足,但消费者使用起来会容易得多。

precise::
表示为精确(Java 的 'BigDecimal')值。这很精确但难以使用。

以下*高级*配置属性的默认值在大多数情况下都适用,因此很少需要在连接器的配置中指定。

表 11. 高级连接器配置属性
属性 Default (默认值) 描述

无默认值

枚举连接器可以使用*自定义转换器*的符号名称的逗号分隔列表。例如:

isbn

您必须设置 converters 属性才能启用连接器使用自定义转换器。

对于为连接器配置的每个转换器,您还必须添加一个 .type 属性,该属性指定实现转换器接口的类的完全限定名称。.type 属性使用以下格式:

<converterSymbolicName>.type

For example, (例如,)

isbn.type: io.debezium.test.IsbnConverter

如果您想进一步控制已配置转换器的行为,您可以添加一个或多个配置参数来传递值给转换器。要将任何其他配置参数与转换器关联,请在参数名称前加上转换器的符号名称。例如:

isbn.schema.name: io.debezium.vitess.type.Isbn

fail

指定连接器在处理事件时如何应对异常:

fail 传播异常,指示有问题事件的偏移量,并导致连接器停止。

warn 记录有问题事件的偏移量,跳过该事件,并继续处理。

skip 跳过有问题事件并继续处理。

20240

正整数值,指定阻塞队列可以保存的最大记录数。当 Debezium 从数据库读取流式传输的事件时,它会将事件放入阻塞队列,然后再写入 Kafka。在连接器摄取消息的速度快于其写入 Kafka 的速度,或者 Kafka 不可用时,阻塞队列可以提供反压,以从数据库读取更改事件。队列中保存的事件在连接器定期记录偏移量时会被忽略。始终将 max.queue.size 的值设置得大于 max.batch.size 的值。

2048

一个正整数值,指定连接器处理的每个事件批次的最大大小。

0

一个长整数值,指定阻塞队列的最大字节容量。默认情况下,阻塞队列没有容量限制。要指定队列可以消耗的字节数,请将此属性设置为一个正长整型值。
如果还设置了 max.queue.size,则当队列大小达到任一属性指定的限制时,写入队列将被阻塞。例如,如果设置了 max.queue.size=1000max.queue.size.in.bytes=5000,则在队列包含 1000 条记录后,或在队列中的记录量达到 5000 字节后,写入队列将被阻塞。

500

正整数值,指定连接器在开始处理一批事件之前等待新更改事件出现的时间(以毫秒为单位)。默认为 500 毫秒,即 0.5 秒。

t

一个逗号分隔的操作类型列表,您希望连接器在流式传输期间跳过这些操作。您可以配置连接器以跳过以下类型的操作:

  • c (insert/create)

  • u (update)

  • d (delete)

  • t (truncate)

由于 Debezium Vitess 连接器从不向数据更改主题发出 truncate 事件,因此设置默认的 t 选项的效果与设置为 none 相同。也就是说,连接器会流式传输所有 insertupdatedelete 操作。

false

确定连接器是否生成具有事务边界的事件,以及是否用事务元数据丰富更改事件信封。如果您希望连接器执行此操作,请指定 true。有关详细信息,请参阅 事务元数据

io.debezium.pipeline.txmetadata.DefaultTransactionMetadataFactory

确定连接器用于跟踪事务上下文并构建表示事务的数据结构和模式的类。io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory 提供额外的事务元数据,可以帮助消费者解释两个事件的正确顺序,无论它们被消费的顺序如何。有关更多信息,请参阅 有序事务元数据

Long.MAX_VALUE

控制 VStream 的 gRPC 心跳信号之间的间隔。默认为 Long.MAX_VALUE(禁用)。

无默认值

指定一个逗号分隔的 gRPC 标头列表。默认为空。格式为:

key1:value1,key2:value2,…​

For example, (例如,)

x-envoy-upstream-rq-timeout-ms:0,x-envoy-max-retries:2

无默认值

指定允许在通道上接收的最大消息大小(以字节为单位)。

默认值为 4MiB。

n/a

一个可选的、逗号分隔的正则表达式列表,用于匹配列的完全限定名称,其原始类型和长度应作为参数添加到生成的更改事件记录中的相应字段 schema 中。这些 schema 参数

__debezium.source.column.type

用于分别传播可变宽度类型的原始类型名称和长度。这有助于正确确定接收数据库中相应列的大小。列的完全限定名称形式如下:

keyspaceName.tableName.columnName

n/a

一个可选的、逗号分隔的正则表达式列表,用于匹配列的数据库特定数据类型名称,其原始类型和长度应作为参数添加到生成的更改事件记录中的相应字段 schema 中。这些 schema 参数

__debezium.source.column.type

用于分别传播可变宽度类型的原始类型名称和长度。这有助于正确确定接收数据库中相应列的大小。列的完全限定名称形式如下:

keyspaceName.tableName.columnName

请参阅 Vitess 连接器如何映射数据类型以获取 Vitess 特定数据类型名称列表。

io.debezium.schema.SchemaTopicNamingStrategy

用于确定数据更改、schema 更改、事务、心跳事件等的*主题名称*的 TopicNamingStrategy 类的名称,默认为 SchemaTopicNamingStrategy

无默认值

topic.naming.strategy 设置为 io.debezium.connector.vitess.TableTopicNamingStrategy 时,指定连接器用于创建数据更改主题名称的前缀。连接器使用此属性的值,而不是指定的 topic.prefix

.

指定主题名称的分隔符,默认为 .

10000

用于保存有界并发哈希映射中主题名称的大小。此缓存将有助于确定与给定数据集合对应的主题名称。

transaction

控制连接器发送事务元数据消息的主题的名称。主题名称的模式为:

topic.prefix.topic.transaction

例如,如果主题前缀是 fulfillment,则默认主题名称是 fulfillment.transaction

无默认值

自定义指标标签将接受键值对来自定义 MBean 对象名称,该名称应附加到常规名称的末尾。每个键将代表 MBean 对象名称的标签,其对应的值将是该标签的值。例如:k1=v1,k2=v2

-1

指定连接器在操作导致可重试错误(例如连接错误)后如何响应。
设置以下选项之一:

-1

无限制。连接器将自动重新启动,并重试操作,无论先前的失败次数如何。

0

禁用。连接器将立即失败,并且永远不会重试操作。需要用户干预才能重新启动连接器。

> 0

连接器将自动重新启动,直到达到指定的最大重试次数。下一次失败后,连接器将停止,需要用户干预才能重新启动它。

true

此属性指定 Debezium 是否将带有 __debezium.context. 前缀的上下文头添加到其发出的消息中。

这些头是 OpenLineage 集成所必需的,并提供元数据,使下游处理系统能够跟踪和识别更改事件的来源。

该属性添加了以下头:

__debezium.context.connectorLogicalName

Debezium 连接器的逻辑名称。

__debezium.context.taskId

连接器任务的唯一标识符。

__debezium.context.connectorName

Debezium 连接器的名称。

直通连接器配置属性

连接器还支持*直通*配置属性,这些属性在创建 Kafka 生产者和消费者时使用。

请务必查阅 Kafka 文档以了解 Kafka 生产者和消费者的所有配置属性。Vitess 连接器确实使用了 新的消费者配置属性

出现问题时的行为

Debezium 是一个分布式系统,它捕获多个上游数据库的所有更改;它从不遗漏或丢失事件。当系统正常运行或被仔细管理时,Debezium 会为每个更改事件记录提供精确一次的传递。

如果发生故障,系统不会丢失任何事件。但是,在从故障恢复期间,它可能会重复某些更改事件。在这些异常情况下,Debezium 与 Kafka 一样,提供*至少一次*的更改事件传递。

本节的其余部分描述了 Debezium 如何处理各种类型的故障和问题。

配置和启动错误

在以下情况下,连接器在尝试启动时会失败,并在日志中报告错误/异常,然后停止运行:

  • 连接器的配置无效。

  • 连接器无法使用指定的连接参数成功连接到 Vitess。

在这些情况下,错误消息会详细说明问题,可能还会提供建议的解决方案。在更正配置或解决 Vitess 问题后,重新启动连接器。

Vitess 变得不可用

当连接器运行时,它所连接的 Vitess 服务器(VTGate)可能因各种原因而变得不可用。如果发生这种情况,连接器会因错误而失败并停止。当服务器再次可用时,重新启动连接器。

Vitess 连接器以 Vitess VGTID 的形式外部存储最后一个已处理的偏移量。在连接器重新启动并连接到服务器实例后,连接器会与服务器通信,以从该特定偏移量继续流式传输。

无效的列名错误

此错误很少发生。如果您收到一条消息为 Illegal prefix '@' for column: x, from schema: y, table: z 的错误,而您的表中没有这样的列,这是 Vitess vstream bug,由列重命名或列类型更改引起。这是一个瞬时错误。在短暂退避后重启连接器,问题应该会自动解决。

Kafka Connect 进程正常停止

假设 Kafka Connect 正在分布式模式下运行,并且一个 Kafka Connect 进程正常停止。在停止该进程之前,Kafka Connect 会将该进程的连接器任务迁移到该组中的另一个 Kafka Connect 进程。新的连接器任务将从先前任务停止的地方继续处理。在连接器任务正常停止并重新启动到新进程的过程中,会有短暂的处理延迟。

Kafka Connect 进程崩溃

如果 Kafka Connect 进程意外停止,它正在运行的任何连接器任务都会在未记录其最近处理的偏移量的情况下终止。当 Kafka Connect 在分布式模式下运行时,Kafka Connect 会在其他进程上重新启动这些连接器任务。但是,Vitess 连接器将从早期进程记录的最后一个偏移量恢复。这意味着新的替换任务可能会生成一些在崩溃前刚刚处理过的相同更改事件。重复事件的数量取决于偏移量刷新周期以及崩溃前的数据更改量。

由于在从故障中恢复时可能会发生事件重复,因此使用者应始终预料到某些重复事件。Debezium 更改是幂等的,因此一系列事件始终会导致相同的状态。

在每个更改事件记录中,Debezium 连接器会插入有关事件来源的特定于源的信息,包括 Vitess 服务器的事件时间,以及将事务更改写入的二进制日志位置。消费者可以跟踪这些信息,特别是 VGTID,以确定事件是否是重复的。

Kafka 变得不可用

当连接器生成更改事件时,Kafka Connect 框架会使用 Kafka producer API 将这些事件记录在 Kafka 中。您可以在 Kafka Connect 配置中指定的频率周期性地,Kafka Connect 会记录出现在这些更改事件中的最新偏移量。如果 Kafka Broker 不可用,运行连接器的 Kafka Connect 进程将反复尝试重新连接到 Kafka Broker。换句话说,连接器任务将暂停,直到可以重新建立连接,届时连接器将从它们离开的地方恢复。

连接器停止一段时间

如果连接器被正常停止,数据库可以继续使用。任何更改都会记录在 Vitess 二进制日志中。当连接器重新启动时,它将从之前的位置继续流式传输更改。也就是说,它将为在连接器停止期间进行的数据库更改生成更改事件记录。

配置正确的 Kafka 集群能够处理海量吞吐量。Kafka Connect 是根据 Kafka 最佳实践编写的,并且在提供足够资源的情况下,Kafka Connect 连接器也可以处理大量的数据库更改事件。因此,在停止一段时间后,当 Debezium 连接器重新启动时,它很可能会赶上在停止期间所做的数据库更改。这需要多长时间取决于 Kafka 的能力和性能以及 Vitess 中数据的更改量。

连接器在完成快照前失败

如果快照未能完成,连接器不会自动重试快照。如果使用先前的偏移量重新启动连接器,连接器将跳过快照过程并立即开始流式传输更改事件。

因此,要从失败中恢复,请手动删除连接器偏移量并启动连接器。

早期 Vitess 版本的限制

Vitess 8.0.0
  • 由于 Vitess 的一个轻微填充问题(在 Vitess 9.0.0 中已修复),精度大于或等于 13 的 decimal 值会在数字前面产生额外的空格。例如,如果表定义中的列类型为 decimal(13,4),则值 -1.2300 变为 "- 1.2300",值 1.2300 变为 " 1.2300"

  • 不支持 JSON 列类型。

  • VStream 8.0.0 不提供 ENUM 列的允许值的额外元数据。因此,连接器不支持 ENUM 列类型。将发出索引号(基于 1)而不是枚举值。例如,如果 ENUM 定义为 enum('S','M','L'),则会发出 "3" 作为值,而不是 "L"