您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

Debezium Vitess 连接器

Debezium 的 Vitess 连接器可捕获 Vitess keyspace 分片中的行级更改。有关此连接器兼容的 Vitess 版本的信息,请参阅 Debezium 版本概述

当连接器首次连接到 Vitess 集群时,它将对 keyspace 进行一致性快照。快照完成后,连接器会持续捕获提交到 Vitess keyspace 的行级更改,以插入、更新或删除数据库内容。连接器会生成数据更改事件记录,并将它们流式传输到 Kafka 主题。对于每个表,默认行为是连接器将所有生成的事件流式传输到该表的独立 Kafka 主题。应用程序和服务然后可以从生成的主题中消费数据更改事件记录。

概述

Vitess 的 VStream 功能在 4.0 版本中引入。它是一个更改事件订阅服务,可提供与 Vitess 集群底层 MySQL 分片中的 MySQL 二进制日志等效的信息。用户可以订阅 keyspace 中的多个分片,使其成为为下游 CDC 流程提供数据的便捷工具。

为了读取和处理数据库更改,Vitess 连接器订阅了 VTGate 的 VStream gRPC 服务。VTGate 是一个轻量级的无状态 gRPC 服务器,是 Vitess 集群设置的一部分。

该连接器允许您灵活地选择订阅 MASTER 节点或 REPLICA 节点以获取更改事件。

连接器为捕获的每个行级插入、更新和删除操作生成一个更改事件,并将每个表的更改事件记录发送到单独的 Kafka 主题。客户端应用程序读取与感兴趣的数据库表相对应的 Kafka 主题,并可以对从这些主题接收到的每个行级事件做出反应。

Vitess 中的底层 MySQL 实现会根据某个可配置的时间段清除二进制日志。由于二进制日志的内容可能不完整,因此连接器需要另一种机制来确保它捕获特定数据库的完整内容。因此,当连接器首次连接到数据库时,它会执行数据库的一致性快照。在连接器完成快照后,它会从快照创建的确切点继续流式传输更改。这样,连接器就可以从所有数据的Consistent view 开始,而不会遗漏在快照期间发生的任何更改。

该连接器容忍故障。当连接器读取更改并生成事件时,它会记录每个事件的 Vitess 全局事务 ID (VGTID) 位置。如果连接器因任何原因停止(包括通信故障、网络问题或崩溃),在连接器重新启动后,它将从最后一个存储的更改事件条目继续从 VStream 读取。此行为不适用于快照。如果在快照期间连接器停止,重新启动后,连接器不会从上次中断的地方继续进行快照。我们稍后将讨论连接器 在出现问题时 的行为。

连接器的工作原理

为了最佳地配置和运行 Debezium Vitess 连接器,了解连接器如何执行快照、流式传输更改事件、确定 Kafka 主题名称以及使用元数据非常有用。

快照

通常,MySQL 服务器不会配置为在二进制日志中保留数据库的完整历史记录。因此,连接器无法从二进制日志中读取数据库的整个历史记录。出于这个原因,当连接器第一次启动时,它会执行数据库的初始一致性快照。您可以通过将 snapshot.mode 连接器配置属性设置为除 initial 以外的值来更改此行为。此快照功能基于 7.0 版本中引入的 VStream Copy

未来版本预计将提供对失败快照的自动重试。

流式传输更改

Vitess 连接器花费所有时间从其订阅的 VTGate 的 VStream gRPC 服务流式传输更改。客户端从 VStream 接收更改,这些更改会以 VGTID 形式提交到底层 MySQL 服务器的二进制日志中的特定位置。

Vitess 中的 VGTID 等同于 MySQL 中的 GTID,它描述了更改事件发生的 VStream 中的位置。通常,VGTID 包含多个分片 GTID,每个分片 GTID 是一个 (Keyspace, Shard, GTID) 的元组,用于描述给定分片的 GTID 位置。

订阅 VStream 服务时,连接器需要提供 VGTID 和 Tablet 类型(例如,MASTERREPLICA)。VGTID 描述了 VStream 应开始发送更改事件的位置;Tablet 类型描述了我们从每个分片中的哪个底层 MySQL 实例(主服务器或副本)读取更改事件。

当连接器第一次连接到 Vitess 集群时,它会从一个称为 VTCtld 的 Vitess 组件获取当前的 VGTID,并将当前的 VGTID 提供给 VStream。

Debezium Vitess 连接器充当 VStream 的 gRPC 客户端。当连接器接收到更改时,它会将事件转换为 Debezium 的*创建*、*更新*或*删除*事件,其中包含事件的 VGTID。Vitess 连接器将这些更改事件以记录的形式转发给运行在同一进程中的 Kafka Connect 框架。Kafka Connect 进程以与生成时相同的顺序异步地将更改事件记录写入适当的 Kafka 主题。

Kafka Connect 会定期在另一个 Kafka 主题中记录最近的*偏移量*。偏移量指示 Debezium 随每个事件包含的特定于源的位置信息。对于 Vitess 连接器,每个更改事件中记录的 VGTID 就是偏移量。

当 Kafka Connect 正常关闭时,它会停止连接器,将所有事件记录刷新到 Kafka,并记录从每个连接器接收到的最后一个偏移量。当 Kafka Connect 重新启动时,它会读取每个连接器记录的最后一个偏移量,并从最后一个记录的偏移量开始启动每个连接器。当连接器重新启动时,它会向 VStream 发送一个请求,以从该位置之后的事件开始发送事件。

主题名称

Vitess 连接器将单个表上所有插入、更新和删除操作的事件写入单个 Kafka 主题。默认情况下,Kafka 主题名称为 *topicPrefix*.*keyspaceName*.*tableName*,其中

  • *topicPrefix* 是由 topic.prefix 连接器配置属性指定的。*topicPrefix* 是由 topic.prefix 连接器配置属性指定的。

  • *keyspaceName* 是操作发生的 keyspace(也称为数据库)的名称。

  • *tableName* 是发生操作的数据库表的名称。

例如,假设 fulfillment 是一个连接器配置中的逻辑服务器名称,该连接器捕获 Vitess 安装中的更改,该安装具有一个 commerce keyspace,其中包含四个表:productsproducts_on_handcustomersorders。无论 keyspace 有多少分片,连接器都会将记录流式传输到以下四个 Kafka 主题:

  • fulfillment.commerce.products

  • fulfillment.commerce.products_on_hand

  • fulfillment.commerce.customers

  • fulfillment.commerce.orders

事务元数据

Debezium 可以生成表示事务边界的事件,并丰富数据更改事件消息。

Debezium 接收事务元数据的限制

Debezium 仅为部署连接器后发生的事务注册和接收元数据。部署连接器之前发生的事务的元数据不可用。

Debezium 为每个事务中的 BEGINEND 分隔符生成事务边界事件。事务边界事件包含以下字段:

status

BEGINEND

id

唯一事务标识符的字符串表示。

ts_ms

源处事务边界事件(BEGINEND 事件)的时间。如果源未向 Debezium 提供事件时间,则该字段将表示 Debezium 处理事件的时间。注意:`ts_ms` 的单位是毫秒,但由于 MySQL 的限制,它只有秒级精度,MySQL 只能以秒为单位提供二进制日志时间戳。

event_count(针对 END 事件)

事务发出的事件总数。

data_collections(针对 END 事件)

一对 data_collectionevent_count 元素的数组,指示连接器为源自数据集合的更改发出的事件数量。

示例
{
  "status": "BEGIN",
  "id": "[{\"keyspace\":\"test_unsharded_keyspace\",\"shard\":\"0\",\"gtid\":\"MySQL56/e03ece6c-4c04-11ec-8e20-0242ac110004:1-37\"}]",
  "ts_ms": 1486500577000,
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "[{\"keyspace\":\"test_unsharded_keyspace\",\"shard\":\"0\",\"gtid\":\"MySQL56/e03ece6c-4c04-11ec-8e20-0242ac110004:1-37\"}]",
  "ts_ms": 1486500577000,
  "event_count": 1,
  "data_collections": [
    {
      "data_collection": "test_unsharded_keyspace.my_seq",
      "event_count": 1
    }
  ]
}

除非通过 topic.transaction 选项进行覆盖,否则连接器会将事务事件发出到 <topic.prefix>.transaction 主题。

更改数据事件丰富

启用事务元数据后,数据消息 Envelope 将使用新的 transaction 字段进行丰富。该字段以字段的复合形式提供关于每个事件的信息:

  • id - 唯一事务标识符的字符串表示形式

  • total_order - 事件在事务生成的所有事件中的绝对顺序

  • data_collection_order - 事件在事务发出的所有事件中按数据集合的顺序

以下是消息示例

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": 1637988245467,
  "ts_us": 1637988245467841,
  "ts_ns": 1637988245467841698,
  "transaction": {
    "id": "[{\"keyspace\":\"test_unsharded_keyspace\",\"shard\":\"0\",\"gtid\":\"MySQL56/e03ece6c-4c04-11ec-8e20-0242ac110004:1-68\"}]",
    "total_order": 1,
    "data_collection_order": 1
  }
}

有序事务元数据

您可以配置 Debezium 以在数据更改事件记录中包含其他元数据。这些补充元数据可以帮助下游使用者正确处理消息顺序,而无需重新分区或其他可能导致数据被乱序消费的干扰。

数据更改增强

要将连接器配置为发出增强的数据更改事件记录,请设置 transaction.metadata.factory 属性。当此属性设置为 VitessOrderedTransactionMetadataFactory 时,连接器会在消息 Envelope 中包含一个 transaction 字段。transaction 字段添加了提供有关事务发生顺序信息的元数据。以下字段会添加到每条消息中:

transaction_epoch

一个非递减的值,表示事务排名所属的 epoch。

transaction_rank

一个 epoch 内的非递减值,表示事务的顺序。

还有一个字段与事件顺序有关:

total_order

表示事件在事务生成的所有事件中的绝对位置。此字段默认包含在标准事务元数据中。

以下示例说明了如何使用这些字段来确定事件顺序。

假设 Debezium 发出发生在同一分片且具有相同主键的两个事件的数据更改事件记录。如果发送到这些事件的 Kafka 主题被重新分区,那么这两个事件的消费者顺序将不可信。如果 Debezium 配置为提供增强的事务元数据,则消费该主题的应用程序可以应用以下逻辑来确定要应用(新事件)和要丢弃的两个事件中的哪个:

  1. 如果 transaction_epoch 的值不相等,则返回具有较高 transaction_epoch 值的事件。否则,继续。

  2. 如果 transaction_rank 的值不相等,则返回具有较高 transaction_rank 值的事件。否则,继续。

  3. 返回具有较大 total_order 值的事件。

如果两个事件都没有更高的 total_order 值,则这两个事件属于同一事务。由于 total_order 字段表示事务内的事件顺序,因此值更大的事件是最近的事件。

以下示例显示了一个带有有序事务元数据的数据更改事件:

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": 1637988245467,
  "ts_us": 1637988245467841,
  "ts_ns": 1637988245467841698,
  "transaction": {
    "id": "[{\"keyspace\":\"test_unsharded_keyspace\",\"shard\":\"0\",\"gtid\":\"MySQL56/e03ece6c-4c04-11ec-8e20-0242ac110004:1-68\"}]",
    "total_order": 1,
    "data_collection_order": 1,
    "transaction_rank": 68,
    "transaction_epoch": 0
  }
}

高效的事务元数据

如果您启用连接器以提供事务元数据,它将生成更多数据。连接器不仅会向事务主题发送额外的消息,而且发送到数据更改主题的消息也会更大,因为它们包含一个事务元数据块。增加的卷是由于以下因素造成的:

  • VGTID 存储两次,一次作为 source.vgtid,然后再次作为 transaction.id。在包含许多分片的 keyspace 中,这些 VGTID 可能相当大。

  • 在分片环境中,VGTID 通常包含每个分片的 VGTID。在包含许多分片的 keyspace 中,VGTID 字段中的数据量可能很大。

  • 连接器为每个事务边界事件发送事务主题消息。通常,包含许多分片的 keyspace 会生成大量事务边界事件。

为了使 Vitess 连接器能够在不显著增加生成数据量的情况下编码事务元数据,Debezium 提供了几种单消息转换 (SMT)。以下 SMT 旨在减少 Vitess 连接器发出的事件中的数据量:

以下示例显示了一个使用上述转换的 Vitess 连接器配置的摘录:

}
 [...]
 "provide.transaction.metadata": "true",
 "transaction.metadata.factory": "io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory",
 "transforms": "filterTransactionTopicRecords,removeField,useLocalVgtid",
 "transforms.filterTransactionTopicRecords.type": "io.debezium.connector.vitess.transforms.FilterTransactionTopicRecords",
 "transforms.removeField.type": "io.debezium.connector.vitess.transforms.RemoveField",
 "transforms.removeField.field_names": "transaction.id",
 "transforms.useLocalVgtid.type": "io.debezium.connector.vitess.transforms.UseLocalVgtid",
 [...]
}

数据更改事件

Debezium Vitess 连接器为每个行级 INSERTUPDATEDELETE 操作生成一个数据更改事件。每个事件包含一个键和一个值。键和值的结构取决于已更改的表。

Debezium 和 Kafka Connect 的设计围绕着连续的事件消息流。但是,这些事件的结构可能会随时间变化,这对于使用者来说很难处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果您使用的是模式注册表,则包含一个模式 ID,使用者可以使用该 ID 从注册表中获取模式。这使得每个事件都是自包含的。

以下骨架 JSON 显示了更改事件的基本四个部分。但是,您如何配置应用程序中使用的 Kafka Connect 转换器决定了这四个部分在更改事件中的表示。schema 字段仅在配置转换器生成它时才存在于更改事件中。同样,只有配置转换器生成事件键和事件有效负载时,它们才会在更改事件中出现。如果您使用 JSON 转换器并配置它生成所有四个基本更改事件部分,则更改事件将具有此结构:

{
 "schema": { (1)
   ...
  },
 "payload": { (2)
   ...
 },
 "schema": { (3)
   ...
 },
 "payload": { (4)
   ...
 },
}
表 1. 更改事件基本内容概述
Item Field name (字段名) 描述

1

schema

第一个 schema 字段是事件键的一部分。它指定了一个 Kafka Connect schema,该 schema 描述了事件键的 payload 部分的内容。换句话说,第一个 schema 字段描述了已更改表的表主键或第一个单列唯一键的结构。不支持多列唯一键。

可以通过设置 message.key.columns 连接器配置属性来覆盖表的。主键。在这种情况下,第一个 schema 字段描述了该属性标识的键的结构。

2

payload

第一个 payload 字段是事件键的一部分。它的结构由前面的 schema 字段描述,它包含已更改行的键。

3

schema

第二个 schema 字段是事件值的一部分。它指定了一个 Kafka Connect 模式,该模式描述了事件值 payload 部分的内容。换句话说,第二个 schema 描述了已更改行的结构。通常,此模式包含嵌套模式。

4

payload

第二个 payload 字段是事件值的一部分。它的结构由上一个 schema 字段描述,并且包含已更改行的实际数据。

默认行为是连接器将更改事件记录流式传输到名称与事件的原始表相同的主题。

从 Kafka 0.10 开始,Kafka 可以选择记录事件键和值以及消息创建(由生产者记录)或由 Kafka 写入日志的*时间戳*。

Vitess 连接器确保所有 Kafka Connect schema 名称都遵循 Avro schema 名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头(即 a-z、A-Z 或 \_)。逻辑服务器名称中的每个剩余字符以及 schema 和表名称中的每个字符都必须是拉丁字母、数字或下划线(即 a-z、A-Z、0-9 或 \_)。如果存在无效字符,它将被替换为下划线字符。

如果逻辑服务器名称、schema 名称或表名称包含无效字符,并且区分名称的唯一字符是无效的,因此被替换为下划线,则这可能导致意外冲突。

连接器目前不允许使用 @ 前缀命名列。例如,age 是一个有效的列名,而 @age 不是。原因是 Vitess vstreamer 有一个 bug,它会发送带有匿名列名的事件(例如,列名 age 被匿名化为 @1)。没有简单的方法可以区分带有 @ 前缀的合法列名和 Vitess bug。有关更多讨论,请参见 此处

更改事件键

对于给定的表,更改事件的键具有包含该事件创建时表中每个主键列的字段的结构。

考虑在 commerce keyspace 中定义的 customers 表以及该表更改事件键的示例。

示例表
CREATE TABLE customers (
  id INT NOT NULL,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL,
  PRIMARY KEY(id)
);
示例更改事件键

如果 topic.prefix 连接器配置属性的值为 Vitess_server,那么对于 customers 表的每一次更改事件,只要它具有此定义,都将具有相同的键结构,其 JSON 格式如下:

{
  "schema": { (1)
    "type": "struct",
    "name": "Vitess_server.commerce.customers.Key", (2)
    "optional": false, (3)
    "fields": [ (4)
          {
              "name": "id",
              "index": "0",
              "schema": {
                  "type": "INT32",
                  "optional": "false"
              }
          }
      ]
  },
  "payload": { (5)
      "id": "1"
  },
}
表 2. 更改事件键描述
Item Field name (字段名) 描述

1

schema

键的模式部分指定了一个 Kafka Connect 模式,该模式描述了键的 payload 部分的内容。此模式描述了已更改表的

主键的结构。

2

Vitess_server.commerce.customers.Key

定义键 payload 结构的 schema 的名称。此 schema 描述了已更改表的表主键的结构。键 schema 名称的格式为 connector-name.keyspace-name.table-name.Key。在此示例中:

  • Vitess_server 是生成此事件的连接器的名称。

  • commerce 是包含已更改表的 keyspace。

  • customers 是已更新的表。

3

optional

指示事件键的 payload 字段是否必须包含值。在此示例中,需要一个值在键的有效负载中。当表没有主键时,键的有效负载字段中的值是可选的。

4

fields (字段)

指定 payload 中预期的每个字段,包括每个字段的名称、索引和 schema。

5

payload

包含生成此更改事件的行的键。在此示例中,键包含一个名为 id 的字段,其值为 1

尽管 column.exclude.listcolumn.include.list 连接器配置属性允许您仅捕获表列的子集,但主键或唯一键中的所有列始终包含在事件的键中。

如果表没有主键,则更改事件的键为 null。没有主键约束的表中的行无法唯一标识。

更改事件值

更改事件中的值比键要复杂一些。与键一样,值也包含 schema 部分和 payload 部分。schema 部分包含描述 payload 部分的 Envelope 结构的模式,包括其嵌套字段。对于创建、更新或删除数据的操作的更改事件,其值有效负载都具有信封结构。

考虑用于显示更改事件键示例的相同样本表。

CREATE TABLE customers (
  id INT NOT NULL,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL,
  PRIMARY KEY(id)
);

UPDATEDELETE 操作发出的事件包含表中所有列的先前值。

create 事件

以下示例显示了连接器为在 customers 表中创建数据的操作生成的更改事件的值部分:

{
    "schema": { (1)
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "Vitess_server.commerce.customers.Value", (2)
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "Vitess_server.commerce.customers.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "connector"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ms"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_us"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ns"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "name": "io.debezium.data.Enum",
                        "version": 1,
                        "parameters": {
                            "allowed": "true,last,false,incremental"
                        },
                        "default": "false",
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "sequence"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "keyspace"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "table"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "shard"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "vgtid"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.vitess.Source", (3)
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_us"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ns"
            }
        ],
        "optional": false,
        "name": "Vitess_server.commerce.customers.Envelope" (4)
    },
    "payload": { (5)
        "before": null, (6)
        "after": { (7)
            "id": 1,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": { (8)
            "version": "3.3.0.Final",
            "connector": "vitess",
            "name": "my_sharded_connector",
            "ts_ms": 1559033904000,
            "ts_us": 1559033904000000,
            "ts_ns": 1559033904000000000,
            "snapshot": "false",
            "db": "",
            "sequence": null,
            "keyspace": "commerce",
            "table": "customers",
            "shard": "-80",
            "vgtid": "[{\"keyspace\":\"commerce\",\"shard\":\"80-\",\"gtid\":\"MariaDB/0-54610504-47\"},{\"keyspace\":\"commerce\",\"shard\":\"-80\",\"gtid\":\"MariaDB/0-1592148-45\"}]"
        },
        "op": "c", (9)
        "ts_ms": 1559033904863, (10)
        "ts_us": 1559033904863497, (10)
        "ts_ns": 1559033904863497147 (10)
    }
}
表 3. create 事件值字段描述
Item Field name (字段名) 描述

1

schema

描述值有效负载结构的

值模式。更改事件的值模式对于连接器为特定表生成的每个更改事件都相同。

2

name (名称)

schema 部分,每个 name 字段指定了值有效负载中字段的模式。

Vitess_server.commerce.customers.Value 是 payload 的 beforeafter 字段的 schema。此 schema 特定于 customers 表。

beforeafter 字段的 schema 名称的格式为 logicalName.keyspaceName.tableName.Value,这确保了 schema 名称在数据库中是唯一的。这意味着在使用 Avro converter 时,每个逻辑源中每个表的 Avro schema 都有其自己的演变和历史记录。

3

name (名称)

io.debezium.connector.vitess.Source 是 payload 的 source 字段的 schema。此 schema 特定于 Vitess 连接器。连接器将它用于它生成的所有事件。

4

name (名称)

Vitess_server.commerce.customers.Envelope 是 payload 的整体结构的 schema,其中 Vitess_server 是连接器名称,commerce 是 keyspace,customers 是表。

5

payload

的实际数据。这是更改事件提供的信息。



看起来事件的 JSON 表示比它们描述的行大得多。这是因为 JSON 表示必须包含消息的 schema 和 payload 部分。但是,通过使用 Avro converter,您可以显著减小连接器流式传输到 Kafka 主题的消息的大小。

6

before

一个可选字段,指定事件发生前行的状态。当 op 字段是创建的 c 时(如本例所示),before 字段为 null,因为此更改事件针对的是新内容。

7

after

一个可选字段,指定事件发生后行的状态。在此示例中,after 字段包含新行idfirst_namelast_nameemail 列的值。

8

source (源)

描述事件源元数据的

必需字段。此字段包含可用于将此事件与其他事件进行比较的信息,关于事件的来源、事件发生的顺序以及事件是否属于同一事务。源元数据包括:

  • Debezium 版本

  • 连接器类型和名称

  • 包含新行的数据库(也称为 keyspace)、表和分片

  • 如果事件是快照的一部分(始终为 false

  • 操作的 VGTID

  • 数据库中发生更改的时间戳

9

op (操作)

描述导致连接器生成事件的操作类型的

必需字符串。在此示例中,c 表示已创建行。有效值为:

  • c = create

  • u = update

  • d = delete

10

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 指示更改在数据库中发生的时间。通过比较 payload.source.ts_mspayload.ts_ms 的值,您可以确定源数据库更新与 Debezium 之间的延迟。注意:`ts_ms` 的单位是毫秒,但由于 MySQL 的限制,它只有秒级精度,MySQL 只能以秒为单位提供二进制日志时间戳。

update 事件

对于样本 customers 表中的更新,更改事件的值具有与该表的创建事件相同的模式。同样,事件值

的有效负载也具有相同的结构。但是,在更新事件中,事件值有效负载包含不同的值。以下是一个更改事件值在连接器为 customers 表中的更新生成的事件中的示例:

{
    "schema": { ... },
    "payload": {
        "before": { (1)
            "id": 1,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "after": { (2)
            "id": 1,
            "first_name": "Anne Marie",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": { (3)
            "version": "3.3.0.Final",
            "connector": "vitess",
            "name": "my_sharded_connector",
            "ts_ms": 1559033904000,
            "ts_us": 1559033904000000,
            "ts_ns": 1559033904000000000,
            "snapshot": "false",
            "db": "",
            "sequence": null,
            "keyspace": "commerce",
            "table": "customers",
            "shard": "-80",
            "vgtid": "[{\"keyspace\":\"commerce\",\"shard\":\"80-\",\"gtid\":\"MariaDB/0-54610504-47\"},{\"keyspace\":\"commerce\",\"shard\":\"-80\",\"gtid\":\"MariaDB/0-1592148-46\"}]"
        },
        "op": "u", (4)
        "ts_ms": 1465584025523,  (5)
        "ts_us": 1465584025523763,  (5)
        "ts_ns": 1465584025523763547  (5)
    }
}
表 4. update 事件值字段描述
Item Field name (字段名) 描述

1

before

一个可选字段,包含数据库提交之前行中所有列的所有值。

2

after

一个可选字段,指定事件发生后行的状态。在此示例中,first_name 的值现在是 Anne Marie

3

source (源)

一个强制性字段,描述事件的源元数据。source 字段结构与*create* 事件中的字段相同,但某些值不同。源元数据包括:

  • Debezium 版本

  • 连接器类型和名称

  • 包含新行的数据库(也称为 keyspace)、表和分片

  • 如果事件是快照的一部分(始终为 false

  • 操作的 VGTID

  • 数据库中发生更改的时间戳

4

op (操作)

描述操作类型的

必需字符串。在更新事件值中,op 字段值为 u,表示此行因更新而更改。

5

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 指示更改在数据库中发生的时间。通过比较 payload.source.ts_mspayload.ts_ms 的值,您可以确定源数据库更新与 Debezium 之间的延迟。注意:`ts_ms` 的单位是毫秒,但由于 MySQL 的限制,它只有秒级精度,MySQL 只能以秒为单位提供二进制日志时间戳。

更新行的主键的列会更改该行的键的值。当键更改时,Debezium 会输出*三个*事件:一个具有旧行键的 DELETE 事件和一个墓碑事件,然后是一个具有新行键的事件。详细信息将在下一节中介绍。

主键更新

更改行

主键字段的 UPDATE 操作称为主键更改。对于主键更改,连接器会发出旧键的 DELETE 事件记录和新(更新)键的 CREATE 事件记录,而不是 UPDATE 事件记录。这些事件具有常规的结构和内容,此外,每个事件都有一个与主键更改相关的消息头:

  • DELETE 事件记录具有 __debezium.newkey 作为消息头。此标头的值是更新行的

    新主键。

  • CREATE 事件记录具有 __debezium.oldkey 作为消息头。此标头的值是更新行以前(旧)的主键。

delete 事件

删除更改事件中的值具有与

同一表的创建更新事件相同的 schema 部分。对于样本 customers 表,删除事件中的 payload 部分如下所示:

{
    "schema": { ... },
    "payload": {
        "before": { (1)
            "id": 1,
            "first_name": "Anne Marie",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "after": null, (2)
        "source": { (3)
            "version": "3.3.0.Final",
            "connector": "vitess",
            "name": "my_sharded_connector",
            "ts_ms": 1559033904000,
            "ts_us": 1559033904000000,
            "ts_ns": 1559033904000000000,
            "snapshot": "false",
            "db": "",
            "sequence": null,
            "keyspace": "commerce",
            "table": "customers",
            "shard": "-80",
            "vgtid": "[{\"keyspace\":\"commerce\",\"shard\":\"80-\",\"gtid\":\"MariaDB/0-54610504-47\"},{\"keyspace\":\"commerce\",\"shard\":\"-80\",\"gtid\":\"MariaDB/0-1592148-47\"}]"
        },
        "op": "d", (4)
        "ts_ms": 1465581902461, (5)
        "ts_us": 1465581902461324, (5)
        "ts_ns": 1465581902461324871 (5)
    }
}
表 5. delete 事件值字段描述
Item Field name (字段名) 描述

1

before

一个可选字段,指定事件发生前行的状态。在删除事件值中,before 字段包含在数据库提交删除行之前行中的值。

2

after

一个可选字段,指定事件发生后行的状态。在删除事件值中,after 字段为 null,表示该行不再存在。

3

source (源)

一个强制性字段,描述事件的源元数据。在*delete* 事件值中,source 字段结构与同一表的*create* 和*update* 事件相同。许多 source 字段值也相同。在*delete* 事件值中,ts_mslsn 字段值以及其他值可能已更改。但是,*delete* 事件值中的 source 字段提供相同的元数据:

  • Debezium 版本

  • 连接器类型和名称

  • 包含新行的数据库(也称为 keyspace)、表和分片

  • 如果事件是快照的一部分(始终为 false

  • 操作的 VGTID

  • 数据库中发生更改的时间戳

4

op (操作)

描述操作类型的

必需字符串。op 字段值为 d,表示此行已被删除。

5

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 指示更改在数据库中发生的时间。通过比较 payload.source.ts_mspayload.ts_ms 的值,您可以确定源数据库更新与 Debezium 之间的延迟。注意:`ts_ms` 的单位是毫秒,但由于 MySQL 的限制,它只有秒级精度,MySQL 只能以秒为单位提供二进制日志时间戳。

*delete* 更改事件记录为使用者提供了处理该行删除所需的信息。

Vitess 连接器事件旨在与Kafka 日志压缩配合使用。日志压缩允许删除一些旧消息,只要保留每个键的至少最新消息。这允许 Kafka 重新获得存储空间,同时确保主题包含完整的数据集并可用于重新加载基于键的状态。

Tombstone events (墓碑事件)

当行被删除时,*delete* 事件仍然与日志压缩配合使用,因为 Kafka 可以删除具有该相同键的所有早期消息。但是,为了让 Kafka 删除所有具有相同键的消息,消息值必须为 null。为了实现这一点,Vitess 连接器在*delete* 事件后跟一个特殊*墓碑*事件,该事件具有相同的键但值为 null

数据类型映射

Vitess 连接器使用与行所属的表类似的方式来表示行的更改。事件包含一个字段,用于表示每个列的值。该值在事件中的表示方式取决于列的 Vitess 数据类型。本节介绍这些映射。

如果默认数据类型转换不满足您的需求,您可以创建自定义转换器以供连接器使用。

基本类型

下表描述了连接器如何将基本的 Vitess 数据类型映射到事件字段中的*字面类型*和*语义类型*。

  • *字面类型*描述了如何使用 Kafka Connect schema 类型:INT8INT16INT32INT64FLOAT32FLOAT64BOOLEANSTRINGBYTESARRAYMAPSTRUCT 来表示值。

  • 语义类型使用 Kafka Connect 模式的名称描述了 Kafka Connect 模式如何捕获字段的*含义*。

表 6. Vitess 基本数据类型映射
Vitess 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

BOOLEAN, BOOL

INT16

n/a

BIT(1)

尚未支持

n/a

BIT(>1)

尚未支持

n/a

TINYINT

INT16

n/a

SMALLINT[(M)]

INT16

n/a

MEDIUMINT[(M)]

INT32

n/a

INT, INTEGER[(M)]

INT32

n/a

BIGINT[(M)]

INT64

n/a

REAL[(M,D)]

FLOAT64

n/a

FLOAT[(M,D)]

FLOAT64

n/a

DOUBLE[(M,D)]

FLOAT64

n/a

CHAR[(M)]

STRING

n/a

VARCHAR[(M)]

STRING

n/a

BINARY[(M)]

BYTES

n/a

VARBINARY[(M)]

BYTES

n/a

TINYBLOB

BYTES

n/a

TINYTEXT

STRING

n/a

BLOB

BYTES

n/a

TEXT

STRING

n/a

MEDIUMBLOB

BYTES

n/a

MEDIUMTEXT

STRING

n/a

LONGBLOB

BYTES

n/a

LONGTEXT

STRING

n/a

JSON

STRING

io.debezium.data.Json
包含 JSON 文档、数组或标量

的字符串表示。

ENUM

STRING

io.debezium.data.Enum
allowed 模式参数包含允许值

的逗号分隔列表。

SET

STRING

io.debezium.data.EnumSet
allowed 模式参数包含允许值

的逗号分隔列表。

YEAR[(2|4)]

INT32

io.debezium.time.Year

TIMESTAMP[(M)]

STRING

n/a
格式为 yyyy-MM-dd HH:mm:ss.SSS,精度为微秒(基于 UTC)。MySQL 允许 M 的范围为 0-6

NUMERIC[(M[,D])]

STRING

n/a

DECIMAL[(M[,D])]

STRING

n/a

GEOMETRY,
LINESTRING,
POLYGON,
MULTIPOINT,
MULTILINESTRING,
MULTIPOLYGON,
GEOMETRYCOLLECTION

尚未支持

n/a

时间类型

除了 TIMESTAMP 数据类型之外,Vitess 时间类型取决于 time.precision.mode 连接器配置属性的值。

不带时区的时间值

DATETIME 类型表示本地日期和时间,例如“2018-01-13 09:48:27”。正如您在前一个示例中看到的,此类型不包含时区信息。此类型的列将使用 UTC 转换为以毫秒或微秒为单位的 epoch,具体取决于列的精度。TIMESTAMP 类型表示不带时区信息的日期时间戳。写入数据时,MySQL 将 TIMESTAMP 类型从服务器或会话的时区转换为 UTC 格式。读取值时,数据库会从 UTC 格式转换为服务器或会话的当前时区。例如:

  • DATETIME 值为 2018-06-20 06:37:03,转换为 1529476623000

  • TIMESTAMP 值为 2018-06-20 06:37:03,转换为 2018-06-20T13:37:03Z

这些列将转换为等效的 io.debezium.time.ZonedTimestamp(以 UTC 为单位),基于服务器或会话的时区。默认情况下,Debezium 会查询服务器以获取时区。如果失败,您必须通过在 JDBC 连接字符串中设置 connectionTimeZone 选项来显式指定时区。例如,如果数据库的时区(全局或由 connectionTimeZone 选项为连接器配置)是“America/Los_Angeles”,则 TIMESTAMP 值“2018-06-20 06:37:03”将由具有值“2018-06-20T13:37:03Z”的 ZonedTimestamp 表示。

运行 Kafka Connect 和 Debezium 的 JVM 的时区不会影响这些转换。

有关影响时间值的属性的更多信息,请参阅 连接器配置属性

time.precision.mode=adaptive_time_microseconds(default)

Vitess 连接器根据列的数据类型定义确定字面类型和语义类型,以便事件表示数据库中的确切值。所有时间字段的单位都是微秒。只有在 00:00:00.00000023:59:59.999999 范围内的正 TIME 字段值才能正确捕获。

表 7. time.precision.mode=adaptive_time_microseconds 时的映射
Vitess 类型 文字类型 语义类型

DATE

INT32

io.debezium.time.Date
表示自 UNIX epoch 以来的天数。

TIME[(M)]

INT64

io.debezium.time.MicroTime
以微秒为单位表示时间值,不包含时区信息。MySQL 允许 M 的范围为 0-6

DATETIME, DATETIME(0), DATETIME(1), DATETIME(2), DATETIME(3)

INT64

io.debezium.time.Timestamp
表示自 UNIX epoch 以来的毫秒数,不包含时区信息。

DATETIME(4), DATETIME(5), DATETIME(6)

INT64

io.debezium.time.MicroTimestamp
表示自 UNIX epoch 以来的微秒数,不包含时区信息。

time.precision.mode=connect

Vitess 连接器使用定义的 Kafka Connect 逻辑类型。这种方法不如默认方法精确,并且如果数据库列具有大于 3 的*小数秒精度*值,则事件可能不太精确。连接器可以处理范围从 00:00:00.00023:59:59.999 的值。仅当您确定表中的 TIME 值从不错过支持范围时,才设置 time.precision.mode=connectconnect 设置预计将在 Debezium 的未来版本中删除。

表 8. time.precision.mode=connect 时的映射
Vitess 数据类型 文字类型 语义类型

DATE

INT32

org.apache.kafka.connect.data.Date
表示自 UNIX epoch 以来的天数。

TIME[(M)]

INT64

org.apache.kafka.connect.data.Time
表示自午夜以来的微秒时间值,不包含时区信息。

DATETIME[(M)]

INT64

org.apache.kafka.connect.data.Timestamp
表示自 UNIX epoch 以来的毫秒数,不包含时区信息。

time.precision.mode=isostring

Vitess 连接器对所有时间类型使用字符串表示。

表 9. time.precision.mode=connect 时的映射
Vitess 数据类型 文字类型 语义类型

DATE

STRING

n/a

TIME[(M)]

STRING

n/a

DATETIME[(M)]

STRING

n/a

设置 Vitess

Debezium 不需要任何特定的 Vitess 配置。按照 通过 Docker 本地安装 指南或 Kubernetes 的 Vitess Operator 指南的标准说明安装 Vitess。

核对清单
  • 确保 VTGate 主机及其 gRPC 端口(默认是 15991)可以从安装 Vitess 连接器的机器访问。

  • 确保 VTCtld 主机及其 gRPC 端口(默认是 15999)可以从安装 Vitess 连接器的机器访问。

gRPC 身份验证

由于 Vitess 连接器从 VTGate VStream gRPC 服务器读取更改事件,因此它不需要直接连接到 MySQL 实例。因此,不需要特殊的数据库用户和权限。目前,Vitess 连接器仅支持对 VTGate gRPC 服务器的未经验证访问。

部署

安装 KafkaKafka Connect 后,部署 Debezium Vitess 连接器的剩余任务是下载连接器插件存档,将 JAR 文件提取到您的 Kafka Connect 环境中,并将包含 JAR 文件的目录添加到Kafka Connect 的 plugin.path。然后,您需要重新启动 Kafka Connect 进程以加载新的 JAR 文件。

如果您使用的是不可变容器,请参阅 Debezium 的容器镜像,其中已安装并准备好运行带有 Vitess 连接器的 Kafka 和 Kafka Connect。您还可以 在 Kubernetes 和 OpenShift 上运行 Debezium

quay.io 获取的 Debezium 容器映像未经严格测试或安全分析,仅供测试和评估目的使用。这些映像不适用于生产环境。为降低生产部署中的风险,请仅部署由受信任供应商积极维护并经过彻底漏洞分析的容器。

连接器配置示例

以下是一个 Vitess 连接器的配置示例,该连接器连接到 192.168.99.100 上的 Vitess(VTGate 的 VStream)服务器(端口 15991),其逻辑名称为 fullfillment。它还连接到 192.168.99.101 上的 VTCtld 服务器(端口 15999)以获取初始 VGTID。通常,您会使用连接器可用的配置属性在 .json 文件中配置 Debezium Vitess 连接器。

您可以选择为部分 schema 和表生成事件。可选地,忽略、屏蔽或截断敏感、过大或不需要的列。

{
  "name": "inventory-connector",  (1)
  "config": {
    "connector.class": "io.debezium.connector.vitess.VitessConnector", (2)
    "database.hostname": "192.168.99.100", (3)
    "database.port": "15991", (4)
    "database.user": "vitess", (5)
    "database.password": "vitess_password", (6)
    "vitess.keyspace": "commerce", (7)
    "vitess.tablet.type": "MASTER", (8)
    "vitess.vtctld.host": "192.168.99.101", (9)
    "vitess.vtctld.port": "15999", (10)
    "vitess.vtctld.user": "vitess", (11)
    "vitess.vtctld.password": "vitess_password", (12)
    "topic.prefix": "fullfillment", (13)
    "tasks.max": 1 (14)
  }
}
1 连接器在 Kafka Connect 服务中注册时的名称。
2 此 Vitess 连接器类的名称。
3 Vitess(VTGate 的 VStream)服务器的地址。
4 Vitess(VTGate 的 VStream)服务器的端口号。
5 Vitess 数据库服务器(VTGate)的用户名。
6 Vitess 数据库服务器(VTGate)的密码。
7 keyspace(也称为数据库)的名称。由于未指定分片,因此它从 keyspace 中的所有分片读取更改事件。
8 要从中读取更改事件的 MySQL 实例类型(MASTER 或 REPLICA)。
9 VTCtld 服务器的地址。
10 VTCtld 服务器的端口。
11 VTCtld 服务器(VTCtld gRPC)的用户名。
12 VTCtld 数据库服务器(VTCtld gRPC)的密码。
13 Vitess 集群的主题前缀,它构成一个命名空间,并用于连接器写入的所有 Kafka 主题的名称、Kafka Connect schema 名称以及使用 Avro converter 时相应 Avro schema 的命名空间。
14 一次只能运行一个任务。

请参阅 Vitess 连接器属性的完整列表,这些属性可以在这些配置中指定。

您可以通过向正在运行的 Kafka Connect 服务发送 POST 命令来发送此配置。该服务将记录配置并启动连接器任务,该任务连接到 Vitess 数据库并将更改事件记录流式传输到 Kafka 主题。

连接器配置示例(用于 offset-storage-per-task 模式)

当您有一个大型 Vitess 安装,需要多个连接器任务来处理更改日志时,您可以使用 offset-storage-per-task 功能来启动多个连接器任务,并让每个任务处理 Vitess 分片的子集。每个任务将在 Kafka 的偏移量主题中以自己的分区空间持久化其偏移量(其正在跟踪的分片的 vgtids)。

以下是连接到 Vitess(VTGate 的 VStream)服务器的 Vitess 连接器的相同示例,但增加了三个参数以调用 offset-storage-per-task 模式:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.vitess.VitessConnector",
    "database.hostname": "192.168.99.100",
    "database.port": "15991",
    "database.user": "vitess",
    "database.password": "vitess_password",
    "topic.prefix": "fullfillment",
    "vitess.keyspace": "commerce",
    "vitess.tablet.type": "MASTER",
    "vitess.vtctld.host": "192.168.99.101",
    "vitess.vtctld.port": "15999",
    "vitess.vtctld.user": "vitess",
    "vitess.vtctld.password": "vitess_password",
    "vitess.offset.storage.per.task": true, (1)
    "vitess.offset.storage.task.key.gen": 1, (2)
    "vitess.prev.num.tasks": 1, (3)
    "tasks.max": 2 (4)
  }
}
1 指定我们要启用 offset-storage-per-task 功能
2 指定当前任务并行度的生成号为 1
3 指定前一个任务并行度的任务数是 1
4 指定我们要为当前任务并行度启动两个任务

任务到 Vitess 分片的分配基于简单的轮询算法。在此示例中,启动两个连接器任务并假设我们有 4 个 Vitess 分片(-40, 40-80, 80-c0, c0-),task0 将处理分片(-40, 80-c0),task1 将处理分片(40-80, c0-)。

我们需要三个配置参数的原因是确保每个连接器任务保存的偏移量不会相互冲突,并自动处理前一个任务并行度的偏移量迁移。为了确保我们在 Kafka 偏移量主题的分区键上不发生冲突,我们为每个连接器任务使用此分区名称方案:taskId_numTasks_gen。因此,在当前启动两个任务且生成号为 1 的示例中,task0 将在 Kafka 偏移量主题的分区键中写入其偏移量:task0_2_1,task1 将使用分区键:(task1_2_1)。gen 参数用于区分来自不同生成(生成对应于任务并行度的每次更改)的分区键。

当任务并行度更改时(例如,您想启动 4 个连接器任务而不是 2 个来处理来自 Vitess 的更大流量),您将指定 tasks.max=4,vitess.offset.storage.task.key.gen=2,vitess.prev.num.tasks=2,则此任务并行度生成的偏移量分区将是:task0_4_2,task1_4_2,task2_4_2,task3_4_2。一旦连接器重新启动,连接器将检测到当前 4 个分区键没有先前保存的偏移量,它将调用自动偏移量迁移,从前一个生成键中保存的偏移量(使用 2 个任务,每个任务处理 2 个分片)迁移。对于当前 4 个 Vitess 分片(-40, 40-80, 80-c0, c0-)的示例,task0 将处理分片(-40),task1(40-80),task2(80-c0),task3(c0-)。这些 4 个分片在先前并行度下的偏移量(使用 2 个任务,每个任务处理 2 个分片)将被自动迁移到当前并行度下使用 4 个任务(每个任务处理一个分片)。

请注意,在启用 offset-storage-per-task 功能之前,Kafka 偏移量主题中保存的偏移量的任务并行度生成号默认为 0,有一个特殊的偏移量查找用于偏移量迁移。因此,如果您已经运行 Vitess 连接器一段时间而未启用 offset-storage-per-task 功能,现在想启用此功能,请指定 vitess.offset.storage.task.key.gen=1,vitess.prev.num.tasks=1 以帮助自动迁移偏移量。

请注意,vitess.prev.num.tasks 需要与前一个任务并行度生成中实际启动的任务数相匹配。连接器任务的数量通常与您指定的 tasks.max 配置参数相同,但在罕见情况下 tasks.max > Vitess 分片数时,连接器将只启动_task_number = _Vitess_shard_number。这种情况很可能一开始就是配置错误的。

请参阅 Vitess 连接器属性的完整列表,这些属性可以在这些配置中指定。

您可以通过向正在运行的 Kafka Connect 服务发送 POST 命令来发送此配置。该服务将记录配置并启动连接器任务,该任务连接到 Vitess 数据库并将更改事件记录流式传输到 Kafka 主题。

添加连接器配置

要开始运行 Vitess 连接器,请创建一个连接器配置并将其添加到您的 Kafka Connect 集群。

先决条件
  • VTGate 主机及其 gRPC 端口(默认是 15991)可以从安装 Vitess 连接器的机器访问。

  • VTCtld 主机及其 gRPC 端口(默认是 15999)可以从安装 Vitess 连接器的机器访问。

  • Vitess 连接器已安装。

过程
  1. 为 Vitess 连接器创建一个配置。

  2. 使用Kafka Connect REST API 将该连接器配置添加到您的 Kafka Connect 群集。

结果

当连接器启动时,它开始为行级操作生成数据更改事件,并将更改事件记录流式传输到 Kafka 主题。

监控

Debezium Vitess 连接器提供的指标类型只有一种,此外 Kafka 和 Kafka Connect 还提供内置的 JMX 指标支持。

  • 流式传输指标提供有关连接器在捕获更改并流式传输更改事件记录时操作的信息。

Debezium 监控文档 提供了有关如何使用 JMX 公开这些指标的详细信息。

自定义 MBean 名称

Debezium 连接器通过连接器的 MBean 名称公开指标。这些指标特定于每个连接器实例,提供有关连接器快照、流式传输和模式历史记录进程行为的数据。

默认情况下,当您部署正确配置的连接器时,Debezium 会为不同的连接器指标生成唯一的 MBean 名称。要查看连接器进程的指标,请将您的可观察性堆栈配置为监视其 MBean。但这些默认 MBean 名称取决于连接器配置;配置更改可能导致 MBean 名称发生更改。在这种情况下,更改 MBean 名称会破坏连接器实例和 MBean 之间的链接,从而中断监控活动。您必须重新配置可观察性堆栈以使用新的 MBean 名称才能恢复监控。

为了防止因 MBean 名称更改而导致的监控中断,您可以配置自定义指标标签。通过将 custom.metric.tags 属性添加到连接器配置中来配置自定义指标。该属性接受键值对,其中每个键代表 MBean 对象名称的标签,相应的值代表该标签的值。例如:k1=v1,k2=v2。Debezium 将指定的标签附加到连接器的 MBean 名称。

配置连接器的 custom.metric.tags 属性后,您可以配置可观察性堆栈以检索与指定标签关联的指标。然后,可观察性堆栈使用指定的标签而不是可变的 MBean 名称来唯一标识连接器。之后,如果 Debezium 重新定义了其如何构造 MBean 名称,或者连接器配置中的 topic.prefix 发生更改,指标收集将不会中断,因为指标抓取任务使用指定的标签模式来标识连接器。

使用自定义标签的另一个好处是,您可以使用反映数据管道架构的标签,以便以适合您运营需求的方式组织指标。例如,您可以指定带有值的标签来声明连接器活动的类型、应用程序上下文或数据源,例如 db1-streaming-for-application-abc。如果您指定多个键值对,所有指定的对都将附加到连接器的 MBean 名称。

以下示例说明了标签如何修改默认 MBean 名称。

示例 1. 自定义标签如何修改连接器 MBean 名称

默认情况下,Vitess 连接器为流式传输指标使用以下 MBean 名称:

debezium.vitess:type=connector-metrics,context=streaming,server=<topic.prefix>

如果您将 custom.metric.tags 的值设置为 database=salesdb-streaming,table=inventory,Debezium 将生成以下自定义 MBean 名称:

debezium.vitess:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory

流式传输指标

MBeandebezium.vitess:type=connector-metrics,context=streaming,server=<topic.prefix>

Attributes Type 描述

long

自连接器读取和处理最近事件以来经过的毫秒数。

long

自上次启动或重置以来,此连接器已看到的事件总数。

long

已被连接器配置的包含/排除列表过滤规则过滤的事件数量。

int

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的长度。

int

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的剩余容量。

boolean

表示连接器当前是否连接到数据库服务器的标志。

long

上次更改事件的时间戳与连接器处理它之间的时间差(以毫秒为单位)。这些值将包含数据库服务器和连接器运行所在机器之间时钟的任何差异。

long

已提交的处理过的事务数。

long

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的最大缓冲区(以字节为单位)。

long

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的当前缓冲区(以字节为单位)。

连接器配置属性

Debezium Vitess 连接器有许多配置属性,可用于实现连接器适合您应用程序的行为。许多属性都有默认值。属性信息组织如下:

以下配置属性是必需的,除非有默认值可用。

表 10. 必需的连接器配置属性
属性 Default (默认值) 描述

无默认值

连接器的唯一名称。尝试使用相同的名称再次注册将失败。此属性是所有 Kafka Connect 连接器必需的。

无默认值

连接器的 Java 类名称。对于 Vitess 连接器,始终使用 io.debezium.connector.vitess.VitessConnector 的值。

1

为该连接器创建的最大任务数。如果启用了 offset.storage.per.task 模式,Vitess 连接器可以使用多个任务。

无默认值

Vitess 数据库服务器(VTGate)的 IP 地址或主机名。

15991

Vitess 数据库服务器(VTGate)的整数端口号。

要从中流式传输更改的 keyspace 名称。

n/a

一个可选的要从中流式传输更改的分片名称。如果未配置,对于未分片的 keyspace,连接器将从唯一的分片流式传输更改;对于已分片的 keyspace,连接器将从 keyspace 中的所有分片流式传输更改。我们建议不配置它,以便从 keyspace 中的所有分片流式传输,因为它对重新分片操作有更好的支持。如果配置了,例如 -80,连接器将从 -80 分片流式传输更改。

current

一个可选的分片 GTID 位置,用于流式传输。这必须与 vitess.shard 一起设置。如果未配置,连接器将从给定分片的最新位置流式传输更改。

false

控制 Vitess 标志 stop_on_reshard。

true - 重分片操作后流将停止。

false - 重分片操作后流将自动迁移到新分片。

如果设置为 true,您还应该在配置中设置 vitess.gtid

false

控制 Vitess 标志 StreamKeyspaceHeartbeats

+ true - 流将从 _vt.heartbeat 表接收事件(也必须为 tablets 启用)。

+ false - 流将不会从 _vt.heartbeat 表接收事件。
如果设置为 true,您可能还想在表包含列表中添加 <keyspace>.heartbeat 以发出事件。

n/a

Vitess 数据库服务器(VTGate)的可选用户名。如果未配置,则使用未经身份验证的 VTGate gRPC。

n/a

Vitess 数据库服务器(VTGate)的可选密码。如果未配置,则使用未经身份验证的 VTGate gRPC。

MASTER

要从中流式传输更改的 Tablet(因此是 MySQL)的类型

MASTER 表示从主 MySQL 实例流式传输

REPLICA 表示从副本从属 MySQL 实例流式传输

RDONLY 表示从只读从属 MySQL 实例流式传输。

无默认值

主题前缀,为 Debezium 捕获更改的特定 Vitess 数据库服务器或集群提供命名空间。只能在数据库服务器逻辑名称中使用字母数字字符、连字符、点和下划线。前缀应在所有其他连接器中唯一,因为它用作接收来自此连接器的记录的所有 Kafka 主题的名称前缀。

+

不要更改此属性的值。如果您更改名称值,在重新启动后,连接器将不会继续将事件发出到原始主题,而是将后续事件发出到名称基于新值的名称。

无默认值

一个可选的、逗号分隔的正则表达式列表,匹配您想要捕获其更改的表的完全限定表标识符。不在 table.include.list 中的任何表都不会捕获其更改。每个标识符的格式为 keyspace.tableName。默认情况下,连接器捕获正在捕获其更改的每个 schema 中的每个非系统表的更改。请勿同时设置 table.exclude.list 属性。

无默认值

一个可选的、逗号分隔的正则表达式列表,匹配您*不*想捕获其更改的表的完全限定表标识符。不在 table.exclude.list 中的任何表都会捕获其更改。每个标识符的格式为 keyspace.tableName。请勿同时设置 table.include.list 属性。

无默认值

一个可选的、逗号分隔的正则表达式列表,匹配应包含在更改事件记录值中的列的完全限定名称。列的完全限定名称格式为 keyspace.tableName.columnName。请勿同时设置 column.exclude.list 属性。

无默认值

一个可选的、逗号分隔的正则表达式列表,匹配应从更改事件记录值中排除的列的完全限定名称。列的完全限定名称格式为 keyspace.tableName.columnName。请勿同时设置 column.include.list 属性。

n/a

一个可选的、逗号分隔的正则表达式列表,匹配基于字符的列的完全限定名称。如果您希望在列数据超过属性名称中由 length 指定的字符数时截断这些列中的数据,请设置此属性。将 length 设置为正整数值,例如 column.truncate.to.20.chars

列的完全限定名称遵循以下格式:databaseName.tableName.columnName。为了匹配列名,Debezium 将您指定的正则表达式作为锚定正则表达式匹配。也就是说,指定的表达式与列的整个名称字符串进行匹配;它不匹配可能存在于列名中的子字符串。

您可以在单个配置中指定多个具有不同长度的属性。

true

控制是否在*delete* 事件后跟一个墓碑事件。

true - 删除操作表示为*delete* 事件和随后的墓碑事件。

false - 只发出*delete* 事件。

在源记录被删除后,发出墓碑事件(默认行为)允许 Kafka 完全删除与已删除行的键相关的任何事件,前提是日志压缩已为该主题启用。

false

指定是否启用 offset-storage-per-task 模式,该模式允许启动多个连接器任务并将偏移量按任务分区持久化。

true - 启用 offset-storage-per-task 模式。

false - 不使用 offset-storage-per-task 模式。

如果启用 offset-storage-per-task 模式,您还需要指定 vitess.offset.storage.task.key.gen 和 vitess.prev.num.tasks 参数。

-1

指定 vitess.offset.storage.per.task 启用时的任务并行度生成号。当您决定更改连接器任务并行度(增加或减少任务数量)时,应增加生成号。

-1

指定 vitess.offset.storage.per.task 启用时,先前任务并行度生成中使用的连接器任务数。

空字符串

一个分号分隔的表及其匹配表列名称的正则表达式列表。连接器将匹配列中的值映射到发送到 Kafka 主题的更改事件记录中的键字段。当表没有主键,或者您想根据非主键字段对 Kafka 主题中的更改事件记录进行排序时,这很有用。

用分号分隔条目。在完全限定的表名及其正则表达式之间插入冒号。格式为:

keyspace-name.table-name:_regexp_;…​

For example, (例如,)

keyspaceA.table_a:regex_1;keyspaceA.table_b:regex_2;keyspaceA.table_c:regex_3

如果 table_a 有一个 id 列,并且 regex_1^i(匹配任何以 i 开头的列),则连接器会将 table_aid 列中的值映射到连接器发送到 Kafka 的更改事件中的键字段。

none

指定如何调整 schema 名称以兼容连接器使用的消息转换器。可能设置:

  • none 不进行任何调整。

  • avro 用下划线替换 Avro 类型名称中不能使用的字符。

  • avro_unicode 用对应的 Unicode(如 _uxxxx)替换 Avro 类型名称中不能使用的下划线或字符。注意:_ 像 Java 中的反斜杠一样是一个转义序列。

none

指定如何调整字段名称以兼容连接器使用的消息转换器。可能设置:

  • none 不进行任何调整。

  • avro 用下划线替换 Avro 类型名称中不能使用的字符。

  • avro_unicode 用对应的 Unicode(如 _uxxxx)替换 Avro 类型名称中不能使用的下划线或字符。注意:_ 像 Java 中的反斜杠一样是一个转义序列。

有关更多详细信息,请参阅 Avro 命名

initial (初始)

指定连接器启动时执行快照的标准。将属性设置为以下值之一:

initial (初始)

当连接器启动时,如果它在其偏移量主题中未检测到值,它将执行数据库快照。

never (从不)

当连接器启动时,它会跳过快照过程,并立即开始流式传输数据库记录到二进制日志的操作的更改事件。

adaptive_time_microseconds

您可以设置以下选项来确定 Debezium 如何表示时间、日期和时间戳值的精度:

adaptive_time_microseconds

(默认) 值以毫秒、微秒或纳秒的精度表示,具体取决于数据库列类型,但 TIME 类型字段始终以微秒为单位捕获。

connect

时间和时间戳值始终以 Kafka Connect 的默认格式表示(Time、Date 和 Timestamp),无论数据库列的精度如何,都使用毫秒精度。

isostring

时间类型表示为字符串。对于某些无法用数字表示的时间值(例如,0000-00-00),会发送该值的原始字符串表示。

string

指定如何表示 BIGINT UNSIGNED 列的更改事件。
将属性设置为以下值之一:

string::
使用 Java 的 string 表示值

long::
使用 Java 的 long 表示值,这可能精度不足,但对于消费者来说要容易得多。

precise::
将值表示为精确的(Java 的 'BigDecimal')值。这很精确但难以使用。

以下*高级*配置属性的默认值在大多数情况下都适用,因此很少需要在连接器的配置中指定。

表 11. 高级连接器配置属性
属性 Default (默认值) 描述

无默认值

枚举连接器可以使用*自定义转换器*的符号名称的逗号分隔列表。例如:

isbn

您必须设置 converters 属性才能启用连接器使用自定义转换器。

对于为连接器配置的每个转换器,您还必须添加一个 .type 属性,该属性指定实现转换器接口的类的完全限定名称。.type 属性使用以下格式:

<converterSymbolicName>.type

For example, (例如,)

isbn.type: io.debezium.test.IsbnConverter

如果您想进一步控制已配置转换器的行为,您可以添加一个或多个配置参数来传递值给转换器。要将任何其他配置参数与转换器关联,请在参数名称前加上转换器的符号名称。例如:

isbn.schema.name: io.debezium.vitess.type.Isbn

fail

指定连接器在处理事件时如何应对异常:

fail 传播异常,指示有问题事件的偏移量,并导致连接器停止。

warn 记录有问题事件的偏移量,跳过该事件,并继续处理。

skip 跳过有问题事件并继续处理。

20240

一个正整数值,指定阻塞队列可以容纳的最大记录数。当 Debezium 读取从数据库流式的事件时,它会将事件放入阻塞队列,然后再将它们写入 Kafka。在连接器比写入 Kafka 的速度快,或者 Kafka 不可用时,阻塞队列可以为从数据库读取更改事件提供反压。在连接器定期记录偏移量时,队列中保存的事件将被忽略。始终将 max.queue.size 的值设置得大于 max.batch.size 的值。

2048

一个正整数值,指定连接器处理的每个事件批次的最大大小。

0

一个长整数值,指定阻塞队列的最大字节容量。默认情况下,阻塞队列没有容量限制。要指定队列可以消耗的字节数,请将此属性设置为一个正长整型值。
如果也设置了 max.queue.size,则当队列中的记录数达到任一属性指定的限制时,将阻塞向队列的写入。例如,如果您设置了 max.queue.size=1000max.queue.size.in.bytes=5000,则当队列包含 1000 条记录或队列中的记录容量达到 5000 字节后,将阻塞向队列的写入。

500

一个正整数值,指定在连接器开始处理事件批次之前等待新更改事件出现的时间(以毫秒为单位)。默认为 500 毫秒,或 0.5 秒。

t

一个逗号分隔的操作类型列表,您希望连接器在流式传输期间跳过这些操作。您可以配置连接器以跳过以下类型的操作:

  • c (insert/create)

  • u (update)

  • d (delete)

  • t (truncate)

由于 Debezium Vitess 连接器从不将 truncate 事件发出到数据更改主题,因此设置默认的 t 选项与设置属性为 none 效果相同。也就是说,连接器会流式传输所有 insertupdatedelete 操作。

false

确定连接器是否生成带有事务边界的事件,并使用事务元数据丰富更改事件信封。如果您希望连接器执行此操作,请指定 true。有关详细信息,请参阅 事务元数据

io.debezium.pipeline.txmetadata.DefaultTransactionMetadataFactory

确定连接器用于跟踪事务上下文并构建表示事务的数据结构和 schema 的类。io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory 提供额外的事务元数据,可以帮助消费者解释两个事件的正确顺序,而不管它们被消费的顺序如何。有关更多信息,请参阅 有序事务元数据

Long.MAX_VALUE

控制 VStream 的 gPRC 心跳之间的间隔。默认为 Long.MAX_VALUE(禁用)。

无默认值

指定一个逗号分隔的 gRPC 头部列表。默认为空。格式为:

key1:value1,key2:value2,…​

For example, (例如,)

x-envoy-upstream-rq-timeout-ms:0,x-envoy-max-retries:2

无默认值

指定通道上允许接收的最大消息大小(以字节为单位)。

默认是 4MiB

n/a

一个可选的、逗号分隔的正则表达式列表,匹配列的完全限定名称,这些列的原始类型和长度应作为参数添加到发出的更改事件记录中的相应字段 schema 中。这些 schema 参数

__debezium.source.column.type

用于分别传播可变宽度类型的原始类型名称和长度。这对于正确调整接收数据库中的相应列很有用。列的完全限定名称格式如下:

keyspaceName.tableName.columnName

n/a

一个可选的、逗号分隔的正则表达式列表,匹配列的数据库特定数据类型名称,这些列的原始类型和长度应作为参数添加到发出的更改事件记录中的相应字段 schema 中。这些 schema 参数

__debezium.source.column.type

用于分别传播可变宽度类型的原始类型名称和长度。这对于正确调整接收数据库中的相应列很有用。列的完全限定名称格式如下:

keyspaceName.tableName.columnName

请参阅 Vitess 连接器如何映射数据类型 以获取 Vitess 特定数据类型名称列表。

io.debezium.schema.SchemaTopicNamingStrategy

用于确定数据更改、schema 更改、事务、心跳事件等的*主题名称*的 TopicNamingStrategy 类的名称,默认为 SchemaTopicNamingStrategy

无默认值

指定当 topic.naming.strategy 设置为 io.debezium.connector.vitess.TableTopicNamingStrategy 时,连接器用于创建数据更改主题名称的前缀。连接器使用此属性的值而不是指定的 topic.prefix

.

指定主题名称的分隔符,默认为 .

10000

用于保存有界并发哈希映射中主题名称的大小。此缓存将有助于确定与给定数据集合对应的主题名称。

transaction

控制连接器发送事务元数据消息的主题的名称。主题名称的模式为:

topic.prefix.topic.transaction

例如,如果主题前缀是 fulfillment,则默认主题名称是 fulfillment.transaction

无默认值

自定义指标标签将接受键值对来自定义 MBean 对象名称,该名称应附加到常规名称的末尾。每个键将代表 MBean 对象名称的标签,其对应的值将是该标签的值。例如:k1=v1,k2=v2

-1

指定连接器在操作导致可重试错误(例如连接错误)后如何响应。
设置以下选项之一:

-1

无限制。连接器将自动重新启动,并重试操作,无论先前的失败次数如何。

0

禁用。连接器将立即失败,并且永远不会重试操作。需要用户干预才能重新启动连接器。

> 0

连接器将自动重新启动,直到达到指定的最大重试次数。下一次失败后,连接器将停止,需要用户干预才能重新启动它。

true

此属性指定 Debezium 是否将带有 __debezium.context. 前缀的上下文头添加到其发出的消息中。

这些头是 OpenLineage 集成所必需的,并提供元数据,使下游处理系统能够跟踪和识别更改事件的来源。

该属性添加了以下头:

__debezium.context.connectorLogicalName

Debezium 连接器的逻辑名称。

__debezium.context.taskId

连接器任务的唯一标识符。

__debezium.context.connectorName

Debezium 连接器的名称。

直通连接器配置属性

连接器还支持*直通*配置属性,这些属性在创建 Kafka 生产者和消费者时使用。

请务必查阅Kafka 文档以获取 Kafka 生产者和消费者的所有配置属性。Vitess 连接器确实使用了新消费者配置属性

出现问题时的行为

Debezium 是一个分布式系统,它捕获多个上游数据库的所有更改;它从不遗漏或丢失事件。当系统正常运行或被仔细管理时,Debezium 会为每个更改事件记录提供精确一次的传递。

如果发生故障,系统不会丢失任何事件。但是,在从故障恢复期间,它可能会重复某些更改事件。在这些异常情况下,Debezium 与 Kafka 一样,提供*至少一次*的更改事件传递。

本节的其余部分描述了 Debezium 如何处理各种类型的故障和问题。

配置和启动错误

在以下情况下,连接器在尝试启动时会失败,并在日志中报告错误/异常,然后停止运行:

  • 连接器的配置无效。

  • 连接器无法使用指定的连接参数成功连接到 Vitess。

在这些情况下,错误消息将包含问题的详细信息,并且可能包含建议的解决方法。在更正配置或解决 Vitess 问题后,请重新启动连接器。

Vitess 变得不可用

当连接器运行时,它连接到的 Vitses 服务器(VTGate)可能由于各种原因而变得不可用。如果发生这种情况,连接器将以错误失败并停止。当服务器再次可用时,请重新启动连接器。

Vitess 连接器以 Vitess VGTID 的形式在外部存储最后一个已处理的偏移量。连接器重新启动并连接到服务器实例后,连接器会与服务器通信,以从该特定偏移量继续流式传输。

无效的列名错误

此错误非常罕见。如果您收到一条错误消息,内容为 Illegal prefix '@' for column: x, from schema: y, table: z,并且您的表中不存在此列,那么这是 Vitess vstream 的一个bug,它是由列重命名或列类型更改引起的。这是一个瞬时错误。您可以在短暂退避后重新启动连接器,问题应该会自动解决。

Kafka Connect 进程正常停止

假设 Kafka Connect 正在分布式模式下运行,并且一个 Kafka Connect 进程正常停止。在停止该进程之前,Kafka Connect 会将该进程的连接器任务迁移到该组中的另一个 Kafka Connect 进程。新的连接器任务将从先前任务停止的地方继续处理。在连接器任务正常停止并重新启动到新进程的过程中,会有短暂的处理延迟。

Kafka Connect 进程崩溃

如果 Kafka 连接器进程意外停止,它正在运行的任何连接器任务都将终止,而不会记录它们最近已处理的偏移量。当 Kafka Connect 在分布式模式下运行时,Kafka Connect 会在其他进程上重新启动这些连接器任务。但是,Vitess 连接器将从前一个进程记录的最后一个偏移量恢复。这意味着新的替换任务可能会生成一些在崩溃前刚刚被处理过的相同更改事件。重复事件的数量取决于偏移量刷新周期以及崩溃前的数据更改量。

由于在从故障中恢复时可能会发生事件重复,因此使用者应始终预料到某些重复事件。Debezium 更改是幂等的,因此一系列事件始终会导致相同的状态。

在每个更改事件记录中,Debezium 连接器都会插入特定于源的信息,说明事件的来源,包括 Vitess 服务器生成事件的时间,以及事务更改写入的 binlog 中的位置。使用者可以跟踪这些信息,特别是 VGTID,以确定事件是否为重复事件。

Kafka 不可用

当连接器生成更改事件时,Kafka Connect 框架会使用 Kafka producer API 将这些事件记录在 Kafka 中。您可以在 Kafka Connect 配置中指定的频率周期性地,Kafka Connect 会记录出现在这些更改事件中的最新偏移量。如果 Kafka Broker 不可用,运行连接器的 Kafka Connect 进程将反复尝试重新连接到 Kafka Broker。换句话说,连接器任务将暂停,直到可以重新建立连接,届时连接器将从它们离开的地方恢复。

连接器停止一段时间

如果连接器被正常停止,数据库可以继续使用。任何更改都会记录在 Vitess binlog 中。当连接器重新启动时,它将从停止的地方恢复流式传输更改。也就是说,它将为在连接器停止期间发生的所有数据库更改生成更改事件记录。

配置得当的 Kafka 集群能够处理海量吞吐量。Kafka Connect 是根据 Kafka 最佳实践编写的,并且在有足够资源的情况下,Kafka Connect 连接器也可以处理大量数据库更改事件。因此,在停止一段时间后,当 Debezium 连接器重新启动时,它很可能能够赶上在它停止期间发生的数据库更改。这有多快发生取决于 Kafka 的能力和性能以及 Vitess 中数据更改的量。

连接器在完成快照之前失败

如果快照未能完成,连接器不会自动重试快照。如果使用之前的偏移量重新启动连接器,连接器将跳过快照过程,并立即开始流式传输更改事件。

因此,要从失败中恢复,请手动删除连接器偏移量并启动连接器。

早期 Vitess 版本的限制

Vitess 8.0.0
  • 由于 Vitess 的一个轻微填充问题(已在 Vitess 9.0.0 中修复),精度大于或等于 13 的 decimal 值会在数字前面产生额外的空格。例如,如果表定义中的列类型是 decimal(13,4),则值 -1.2300 会变成 "- 1.2300",值 1.2300 会变成 " 1.2300"

  • 不支持 JSON 列类型。

  • VStream 8.0.0 不提供 ENUM 列的允许值的附加元数据。因此,连接器不支持 ENUM 列类型。将发出索引号(从 1 开始)而不是枚举值。例如,如果 ENUM 定义是 enum('S','M','L'),则将发出 "3" 作为值,而不是 "L"