Debezium MySQL 连接器

目录

MySQL 具有二进制日志(binlog),该日志会按顺序记录已提交到数据库的所有操作。这包括表架构的更改以及表数据的更改。MySQL 使用 binlog 进行复制和恢复。

Debezium MySQL 连接器读取 binlog,为行级 INSERTUPDATEDELETE 操作生成更改事件,并将更改事件发送到 Kafka 主题。客户端应用程序读取这些 Kafka 主题。

由于 MySQL 通常配置为在指定时间后清除 binlog,因此 MySQL 连接器会对每个数据库执行初始的一致性快照。MySQL 连接器从快照创建的时间点开始读取 binlog。

有关与此连接器兼容的 MySQL 数据库版本的信息,请参阅 Debezium 版本概览

连接器的工作原理

了解连接器支持的 MySQL 拓扑概述有助于规划您的应用程序。要优化配置和运行 Debezium MySQL 连接器,了解连接器如何跟踪表结构、公开模式更改、执行快照以及确定 Kafka 主题名称会很有帮助。

支持的 MySQL 拓扑

Debezium MySQL 连接器支持以下 MySQL 拓扑:

Standalone (独立)

当使用单个 MySQL 服务器时,该服务器必须启用 binlog,以便 Debezium MySQL 连接器可以监视该服务器。
这通常是可以接受的,因为二进制日志也可以用作增量 备份
在这种情况下,MySQL 连接器始终连接并跟随此独立的 MySQL 服务器实例。

Primary and replica (主服务器和副本服务器)

Debezium MySQL 连接器可以跟随主服务器之一,或副本之一(前提是该副本已启用 binlog),但连接器仅检测该服务器可见的集群中的更改。通常,这不会有问题,除了多主拓扑。

连接器记录其在服务器的 binlog 中的位置,该位置在集群中的每个服务器上都不同。因此,连接器必须只跟随一个 MySQL 服务器实例。如果该服务器发生故障,则必须在连接器继续之前重启或恢复该服务器。

High available clusters (高可用集群)

您可以使用各种 解决方案来提供 MySQL 的冗余。部署冗余副本主机可以显著提高容忍度和几乎立即从问题和故障中恢复的能力。由于大多数高可用性 MySQL 集群都使用 GTID,因此副本能够跟踪在任何主服务器上发生的所有更改。

Multi-primary (多主)

网络数据库 (NDB) 集群复制使用一个或多个 MySQL 副本节点,每个节点从多个主服务器进行复制。集群复制提供了一种聚合多个 MySQL 集群复制的强大方法。此拓扑要求使用 GTID。
Debezium MySQL 连接器可以使用这些多主 MySQL 副本作为源,并且可以故障转移到不同的多主 MySQL 副本,只要新副本赶上了旧副本。也就是说,新副本拥有在第一个副本上看到的所有事务。即使连接器仅使用部分数据库和/或表,这也可以工作,因为在尝试重新连接到新的多主 MySQL 副本并查找 binlog 中的正确位置时,可以配置连接器以包含或排除特定的 GTID 源。

Hosted (托管)

Debezium MySQL 连接器可以使用托管数据库选项,例如 Amazon RDS 和 Amazon Aurora。

由于这些托管选项不允许使用全局读锁,因此连接器在创建一致性快照时使用表级锁。

模式历史记录主题

当数据库客户端查询数据库时,客户端使用数据库的当前模式。但是,数据库模式可以随时更改,这意味着连接器必须能够识别在记录每次插入、更新或删除操作时模式是什么。此外,连接器不一定能将当前模式应用于每个事件。如果事件相对较旧,它可能是在应用当前模式之前记录的。

为确保正确处理模式更改后的事件,MySQL 会在事务日志中包含影响数据的行级更改以及应用于数据库的 DDL 语句。当连接器在 binlog 中遇到这些 DDL 语句时,它会解析它们并更新每个表模式的内存中表示。连接器使用此模式表示来标识每个插入、更新或删除操作发生时表的结构,并生成相应的更改事件。在单独的数据库模式历史记录 Kafka 主题中,连接器会记录所有 DDL 语句以及每个 DDL 语句出现的 binlog 位置。

当连接器在崩溃或正常停止后重新启动时,它从特定位置(即特定时间点)开始读取 binlog。连接器通过读取数据库模式历史 Kafka 主题并解析到连接器正在开始的 binlog 位置为止的所有 DDL 语句来重建在该时间点存在的表结构。

此数据库模式历史记录主题仅供连接器内部使用。可选地,连接器还可以将模式更改事件发送到另一个供使用者应用程序使用的主题

当 MySQL 连接器捕获已应用 gh-ostpt-online-schema-change 等模式更改工具的表的更改时,会在迁移过程中创建辅助表。您必须配置连接器以捕获这些辅助表中发生的更改。如果使用者不需要连接器为辅助表生成的记录,请配置单个消息转换 (SMT) 以从连接器发出的消息中删除这些记录。

Additional resources (附加资源)

模式更改主题

您可以配置 Debezium MySQL 连接器以生成描述应用于数据库中表的模式更改的模式更改事件。连接器将模式更改事件写入名为 <topicPrefix> 的 Kafka 主题,其中 topicPrefixtopic.prefix 连接器配置属性中指定的命名空间。连接器发送到模式更改主题的消息包含一个有效负载,并且可以选择性地包含模式更改消息的模式。

模式变更事件的模式具有以下元素:

name (名称)

模式变更事件消息的名称。

type

事件消息类型的类型。

version (版本)

模式的版本。版本是一个整数,每次模式更改时都会递增。

fields (字段)

变更事件消息中包含的字段。

示例:MySQL 连接器模式更改主题的模式

以下示例以 JSON 格式显示了典型的模式。

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "databaseName"
      }
    ],
    "optional": false,
    "name": "io.debezium.connector.mysql.SchemaChangeKey",
    "version": 1
  },
  "payload": {
    "databaseName": "inventory"
  }
}

模式变更事件消息的 payload 包含以下元素:

ddl

提供导致模式更改的 SQL CREATEALTERDROP 语句。

databaseName (数据库名称)

DDL 语句应用的数据库名称。databaseName 的值作为消息键。

pos (位置)

语句出现在 binlog 中的位置。

tableChanges (表变更)

模式更改后整个表模式的结构化表示。tableChanges 字段包含一个数组,其中包含表每个列的条目。由于结构化表示以 JSON 或 Avro 格式呈现数据,因此消费者无需先通过 DDL 解析器处理消息即可轻松读取消息。

对于处于捕获模式的表,连接器不仅在模式变更主题中存储模式变更的历史记录,还在内部数据库模式历史主题中存储。内部数据库模式历史主题仅供连接器使用,不供消耗应用程序直接使用。确保需要模式变更通知的应用程序仅从模式变更主题中消耗该信息。

切勿分区数据库模式历史主题。为了使数据库模式历史主题正常工作,它必须维护连接器发送到它的事件记录的一致的全局顺序。

为了确保主题不会在分区之间分割,请使用以下任一方法设置主题的分区计数:

  • 如果您手动创建数据库模式历史主题,请指定分区计数为 1

  • 如果您使用 Apache Kafka 代理自动创建数据库模式历史主题,则会创建该主题,请将 Kafka num.partitions 配置选项的值设置为 1

连接器发出的模式变更主题消息的格式仍处于孵化状态,并可能随时更改,恕不另行通知。

示例:发送到 MySQL 连接器模式更改主题的消息

以下示例以 JSON 格式显示了典型的模式变更消息。该消息包含表模式的逻辑表示。

{
  "schema": { },
  "payload": {
      "source": {  (1)
        "version": "3.3.1.Final",
        "connector": "mysql",
        "name": "mysql",
        "ts_ms": 1651535750218, (2)
        "ts_us": 1651535750218000, (2)
        "ts_ns": 1651535750218000000, (2)
        "snapshot": "false",
        "db": "inventory",
        "sequence": null,
        "table": "customers",
        "server_id": 223344,
        "gtid": null,
        "file": "mysql-bin.000003",
        "pos": 570,
        "row": 0,
        "thread": null,
        "query": null
      },
      "databaseName": "inventory", (3)
      "schemaName": null,
      "ddl": "ALTER TABLE customers ADD middle_name varchar(255) AFTER first_name", (4)
      "tableChanges": [  (5)
        {
          "type": "ALTER", (6)
          "id": "\"inventory\".\"customers\"", (7)
          "table": {    (8)
            "defaultCharsetName": "utf8mb4",
            "primaryKeyColumnNames": [  (9)
              "id"
            ],
            "columns": [  (10)
              {
                "name": "id",
                "jdbcType": 4,
                "nativeType": null,
                "typeName": "INT",
                "typeExpression": "INT",
                "charsetName": null,
                "length": null,
                "scale": null,
                "position": 1,
                "optional": false,
                "autoIncremented": true,
                "generated": true
              },
              {
                "name": "first_name",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "utf8mb4",
                "length": 255,
                "scale": null,
                "position": 2,
                "optional": false,
                "autoIncremented": false,
                "generated": false
              },
              {
                "name": "middle_name",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "utf8mb4",
                "length": 255,
                "scale": null,
                "position": 3,
                "optional": true,
                "autoIncremented": false,
                "generated": false
              },
              {
                "name": "last_name",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "utf8mb4",
                "length": 255,
                "scale": null,
                "position": 4,
                "optional": false,
                "autoIncremented": false,
                "generated": false
              },
              {
                "name": "email",
                "jdbcType": 12,
                "nativeType": null,
                "typeName": "VARCHAR",
                "typeExpression": "VARCHAR",
                "charsetName": "utf8mb4",
                "length": 255,
                "scale": null,
                "position": 5,
                "optional": false,
                "autoIncremented": false,
                "generated": false
            }
          ],
          "attributes": [ (11)
            {
              "customAttribute": "attributeValue"
            }
          ]
        }
      }
    ]
  }
}
Table 1. Descriptions of fields in messages emitted to the schema change topic (表 1. 发送到模式变更主题的消息中字段的说明)
Item Field name (字段名) 描述

1

source (源)

source 字段的结构与连接器写入表特定主题的标准数据变更事件完全相同。此字段有助于关联不同主题上的事件。

2

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在 source 对象中,ts_ms 指示数据库中发生更改的时间。通过比较 payload.source.ts_ms 的值与 payload.ts_ms 的值,您可以确定源数据库更新与 Debezium 之间的延迟。

3

databaseName (数据库名称)
schemaName (模式名称)

标识包含更改的数据库和模式。databaseName 字段的值用作记录的消息键。

4

ddl

此字段包含负责模式更改的 DDL。ddl 字段可以包含多个 DDL 语句。每个语句都应用于 databaseName 字段中的数据库。多个 DDL 语句按应用于数据库的顺序出现。

客户端可以提交应用于多个数据库的多个 DDL 语句。如果 MySQL 原子地应用它们,连接器会按顺序获取 DDL 语句,按数据库分组,并为每个组创建一个模式更改事件。如果 MySQL 单独应用它们,连接器会为每个语句创建一个单独的模式更改事件。

5

tableChanges (表变更)

包含 DDL 命令生成的模式变更的一个或多个项目的数组。

6

type

描述更改的类型。值是以下之一:

CREATE (创建)

表已创建。

ALTER (修改)

表已修改。

DROP (删除)

表已删除。

7

id

已创建、修改或删除的表的完整标识符。在表重命名的情况下,此标识符是 <old>,<new> 表名称的串联。

8

table (表)

表示应用更改后表的元数据。

9

primaryKeyColumnNames (主键列名)

组成表主键的列的列表。

10

columns (列)

已更改表中每个列的元数据。

11

attributes (属性)

每个表变更的自定义属性元数据。

有关更多信息,请参阅 模式历史记录主题

快照

当 Debezium MySQL 连接器首次启动时,它会执行数据库的初始一致性快照。此快照使连接器能够建立数据库当前状态的基线。

Debezium 在运行快照时可以使用不同的模式。快照模式由 snapshot.mode 配置属性确定。该属性的默认值为 initial。您可以更改 snapshot.mode 属性的值来定制连接器创建快照的方式。

连接器在执行快照时会完成一系列任务。具体步骤会因快照模式和数据库的表锁定策略而异。Debezium MySQL 连接器在执行使用全局读锁表级锁的初始快照时会完成不同的步骤。

使用全局读锁的初始快照

您可以更改 snapshot.mode 属性的值来定制连接器创建快照的方式。如果配置了不同的快照模式,连接器将使用此工作流的修改版本来完成快照。有关不允许全局读锁的环境中的快照过程的信息,请参阅表级锁的快照工作流

Debezium MySQL 连接器使用全局读锁执行初始快照的默认工作流

下表显示了 Debezium 为创建具有全局读锁的快照而遵循的工作流程中的步骤。

Step (步骤) Action (操作)

1

Establish a connection to the database. (建立数据库连接。)

2

确定要捕获的表。默认情况下,连接器会捕获所有非系统表的数据。快照完成后,连接器将继续流式传输指定表的数据。如果您希望连接器仅捕获特定表的数据,则可以通过设置 table.include.listtable.exclude.list 等属性来指示连接器仅捕获特定表或表元素的数据

3

Obtain a global read lock on the tables to be captured to block writes by other database clients. (对要捕获的表获取全局读锁,以阻止其他数据库客户端写入。)

The snapshot itself does not prevent other clients from applying DDL that might interfere with the connector’s attempt to read the binlog position and table schemas. The connector retains the global read lock while it reads the binlog position, and releases the lock as described in a later step. (快照本身并不能阻止其他客户端应用可能干扰连接器读取 binlog 位置和表模式的 DDL。连接器在读取 binlog 位置期间保留全局读锁,并在后续步骤中按描述释放锁。)

4 :leveloffset: +1

使用具有可重复读语义的事务启动,以确保事务中所有后续读取都针对一致性快照进行。

+

使用这些隔离语义可能会减慢快照的进度。如果快照完成时间过长,请考虑使用不同的隔离配置,或跳过初始快照并改用增量快照

5

Read the current binlog position. (读取当前的 binlog 位置。)

6

Capture the structure of all tables in the database, or all tables that are designated for capture. The connector persists schema information in its internal database schema history topic, including all necessary DROP…​ and CREATE…​ DDL statements. (捕获数据库中所有表或指定捕获的表的所有表的结构。连接器在其内部数据库模式历史主题中持久化模式信息,包括所有必需的 DROP…​CREATE…​ DDL 语句。)
The schema history provides information about the structure that is in effect when a change event occurs. (模式历史提供了有关在发生变更事件时生效的结构的信息。)

By default, the connector captures the schema of every table in the database, including tables that are not configured for capture. If tables are not configured for capture, the initial snapshot captures only their structure; it does not capture any table data. (默认情况下,连接器捕获数据库中每个表的模式,包括未配置为捕获的表。如果表未配置为捕获,初始快照仅捕获它们的结构;它不捕获任何表数据。)

For more information about why snapshots persist schema information for tables that you did not include in the initial snapshot, see Understanding why initial snapshots capture the schema for all tables. (有关为什么快照会为未包含在初始快照中的表保留模式信息的信息,请参阅了解为什么初始快照会捕获所有表的模式历史记录。)

7

Release the global read lock obtained in Step 3. Other database clients can now write to the database. (释放步骤 3 中获取的全局读锁。其他数据库客户端现在可以写入数据库。)

8

At the binlog position that the connector read in Step 5, the connector begins to scan the tables that are designated for capture. During the scan, the connector completes the following tasks (在连接器在步骤 5 中读取的 binlog 位置,连接器开始扫描指定捕获的表。在扫描过程中,连接器完成以下任务:)

  1. Confirms that the table was created before the snapshot began. If the table was created after the snapshot began, the connector skips the table. After the snapshot is complete, and the connector transitions to streaming, it emits change events for any tables that were created after the snapshot began. (确认表在快照开始之前已创建。如果表在快照开始之后创建,连接器将跳过该表。快照完成后,连接器过渡到流式传输,它会为快照开始后创建的任何表发出变更事件。)

  2. Produces a read event for each row that is captured from a table. All read events contain the same binlog position, which is the position that was obtained in step 5. (为从表中捕获的每一行生成一个 read 事件。所有 read 事件都包含相同的 binlog 位置,该位置是在步骤 5 中获取的位置。)

  3. Emits each read event to the Kafka topic for the source table. (将每个 read 事件发送到源表的 Kafka 主题。)

  4. Releases data table locks, if applicable. (释放数据表锁(如果适用)。)

9

Commit the transaction. (提交事务。)

10

Record the successful completion of the snapshot in the connector offsets. (在连接器偏移量中记录快照的成功完成。)

The resulting initial snapshot captures the current state of each row in the captured tables. From this baseline state, the connector captures subsequent changes as they occur. (生成的初始快照捕获了捕获表中每一行的当前状态。从这个基线状态开始,连接器会捕获后续发生的更改。)

After the snapshot process begins, if the process is interrupted due to connector failure, rebalancing, or other reasons, the process restarts after the connector restarts. (在快照过程开始后,如果由于连接器故障、重新平衡或其他原因导致过程中断,该过程将在连接器重新启动后重新启动。)

After the connector completes the initial snapshot, it continues streaming from the position that it read in Step 5 so that it does not miss any updates. (在连接器完成初始快照后,它会从步骤 5 中读取的位置继续流式传输,以确保不会错过任何更新。)

If the connector stops again for any reason, after it restarts, it resumes streaming changes from where it previously left off. (如果连接器因任何原因再次停止,则在重新启动后,它将从之前停止的地方继续流式传输更改。)

连接器重新启动后,如果日志已被修剪,则连接器在日志中的位置可能不再可用。然后,连接器会失败并返回一个错误,指示需要新的快照。要将连接器配置为在此情况下自动启动快照,请将 snapshot.mode 属性的值设置为 when_needed。有关排除 Debezium MySQL 连接器故障的更多技巧,请参阅出现问题时的行为

使用表级锁的初始快照

在某些数据库环境中,管理员不允许全局读锁。如果 Debezium MySQL 连接器检测到不允许全局读锁,则连接器在执行快照时将使用表级锁。为了让连接器执行使用表级锁的快照,Debezium 连接器用于连接 MySQL 的数据库帐户必须具有 LOCK TABLES 权限。

Debezium MySQL 连接器使用表级锁执行初始快照的默认工作流

下表显示了 Debezium 为创建带表级读锁的快照所遵循的工作流中的步骤。有关不允许全局读锁的环境中的快照过程的信息,请参阅全局读锁的快照工作流

Step (步骤) Action (操作)

1

Establish a connection to the database. (建立数据库连接。)

2

确定要捕获的表。默认情况下,连接器捕获所有非系统表。要让连接器捕获表或表元素的子集,您可以设置许多 includeexclude 属性来过滤数据,例如,table.include.listtable.exclude.list

3

Obtain table-level locks. (获取表级锁。)

4 :leveloffset: +1

使用具有可重复读语义的事务启动,以确保事务中所有后续读取都针对一致性快照进行。

5

Read the current binlog position. (读取当前的 binlog 位置。)

6

Read the schema of the databases and tables for which the connector is configured to capture changes. The connector persists schema information in its internal database schema history topic, including all necessary DROP…​ and CREATE…​ DDL statements. (读取连接器配置为捕获更改的数据库和表的模式。连接器在其内部数据库模式历史主题中持久化模式信息,包括所有必需的 DROP…​CREATE…​ DDL 语句。)
The schema history provides information about the structure that is in effect when a change event occurs. (模式历史提供了有关在发生变更事件时生效的结构的信息。)

By default, the connector captures the schema of every table in the database, including tables that are not configured for capture. If tables are not configured for capture, the initial snapshot captures only their structure; it does not capture any table data. (默认情况下,连接器捕获数据库中每个表的模式,包括未配置为捕获的表。如果表未配置为捕获,初始快照仅捕获它们的结构;它不捕获任何表数据。)

For more information about why snapshots persist schema information for tables that you did not include in the initial snapshot, see Understanding why initial snapshots capture the schema for all tables. (有关为什么快照会为未包含在初始快照中的表保留模式信息的信息,请参阅了解为什么初始快照会捕获所有表的模式历史记录。)

7

At the binlog position that the connector read in Step 5, the connector begins to scan the tables that are designated for capture. During the scan, the connector completes the following tasks (在连接器在步骤 5 中读取的 binlog 位置,连接器开始扫描指定捕获的表。在扫描过程中,连接器完成以下任务:)

  1. Confirms that the table was created before the snapshot began. If the table was created after the snapshot began, the connector skips the table. After the snapshot is complete, and the connector transitions to streaming, it emits change events for any tables that were created after the snapshot began. (确认表在快照开始之前已创建。如果表在快照开始之后创建,连接器将跳过该表。快照完成后,连接器过渡到流式传输,它会为快照开始后创建的任何表发出变更事件。)

  2. Produces a read event for each row that is captured from a table. All read events contain the same binlog position, which is the position that was obtained in step 5. (为从表中捕获的每一行生成一个 read 事件。所有 read 事件都包含相同的 binlog 位置,该位置是在步骤 5 中获取的位置。)

  3. Emits each read event to the Kafka topic for the source table. (将每个 read 事件发送到源表的 Kafka 主题。)

  4. Releases data table locks, if applicable. (释放数据表锁(如果适用)。)

8

Commit the transaction. (提交事务。)

9

Release the table-level locks. Other database clients can now write to any previously locked tables. (释放表级锁。其他数据库客户端现在可以写入任何先前锁定的表。)

10

Record the successful completion of the snapshot in the connector offsets. (在连接器偏移量中记录快照的成功完成。)

Table 2. Settings for snapshot.mode connector configuration property (表 2. snapshot.mode 连接器配置属性的设置)
Setting (设置) 描述

always (始终)

The connector performs a snapshot every time that it starts. The snapshot includes the structure and data of the captured tables. Specify this value to populate topics with a complete representation of the data from the captured tables every time that the connector starts. After the snapshot completes, the connector begins to stream event records for subsequent database changes. (连接器每次启动时都会执行快照。快照包括捕获表的结构和数据。每次连接器启动时,通过指定此值来用捕获表数据的完整表示填充主题。快照完成后,连接器将开始流式传输后续数据库更改的事件记录。)

initial (初始)

连接器执行数据库快照,如初始快照的默认工作流中所述。快照完成后,连接器开始流式传输后续数据库更改的事件记录。

initial_only (仅初始)

The connector performs a database snapshot. After the snapshot completes, the connector stops, and does not stream event records for subsequent database changes. (连接器执行数据库快照。快照完成后,连接器停止,并且不为后续数据库更改流式传输事件记录。)

schema_only (仅模式)

Deprecated, see no_data. (已弃用,请参阅 no_data。)

no_data (无数据)

连接器捕获所有相关表的结构,执行初始快照的默认工作流中描述的所有步骤,只是它不会创建 READ 事件来表示连接器启动时的数据集(步骤 7.2)。

never (从不)

When the connector starts, rather than performing a snapshot, it immediately begins to stream event records for subsequent database changes. This option is under consideration for future deprecation, in favor of the no_data option. (当连接器启动时,它不会执行快照,而是立即开始流式传输后续数据库更改的事件记录。此选项正考虑将来弃用,而采用 no_data 选项。)

schema_only_recovery (仅模式恢复)

Deprecated, see recovery. (已弃用,请参阅 recovery。)

recovery (恢复)

Set this option to restore a database schema history topic that is lost or corrupted. After a restart, the connector runs a snapshot that rebuilds the topic from the source tables. You can also set the property to periodically prune a database schema history topic that experiences unexpected growth. (将此选项设置为恢复已丢失或损坏的数据库模式历史主题。重新启动后,连接器将运行一个快照,从源表重建该主题。您还可以将此属性设置为定期修剪经历意外增长的数据库模式历史主题。)

WARNING: Do not use this mode to perform a snapshot if schema changes were committed to the database after the last connector shutdown. (警告:如果在上次连接器关闭后模式更改已提交到数据库,请不要使用此模式执行快照。)

when_needed (需要时)

After the connector starts, it performs a snapshot only if it detects one of the following circumstances (连接器启动后,仅当检测到以下任一情况时,它才会执行快照:)

  • It cannot detect any topic offsets. (无法检测到任何主题偏移量。)

  • A previously recorded offset specifies a log position that is not available on the server. (先前记录的偏移量指定了一个服务器上不可用的日志位置。)

configuration_based (基于配置)

Set the snapshot mode to configuration_based to control snapshot behavior through the set of connector properties that have the prefix 'snapshot.mode.configuration.based'. (将快照模式设置为 configuration_based,通过具有前缀“snapshot.mode.configuration.based”的连接器属性集来控制快照行为。)

custom (自定义)

The custom snapshot mode lets you inject your own implementation of the io.debezium.spi.snapshot.Snapshotter interface. Set the snapshot.mode.custom.name configuration property to the name provided by the name() method of your implementation. The name is specified on the classpath of your Kafka Connect cluster. If you use the DebeziumEngine, the name is included in the connector JAR file. For more information, see custom snapshotter SPI. (custom 快照模式允许您注入 io.debezium.spi.snapshot.Snapshotter 接口的自定义实现。将 snapshot.mode.custom.name 配置属性设置为实现类的 name() 方法提供的名称。该名称在 Kafka Connect 集群的类路径中指定。如果您使用 DebeziumEngine,则名称包含在连接器 JAR 文件中。有关更多信息,请参阅custom snapshotter SPI。)

有关更多信息,请参阅连接器配置属性表中的 snapshot.mode

了解为什么初始快照会捕获所有表的模式历史记录

The initial snapshot that a connector runs captures two types of information (连接器运行的初始快照捕获两种类型的信息:)

Table data (表数据)

有关在连接器的 table.include.list 属性中命名的表上的 INSERTUPDATEDELETE 操作的信息。

Schema data (模式数据)

DDL statements that describe the structural changes that are applied to tables. Schema data is persisted to both the internal schema history topic, and to the connector’s schema change topic, if one is configured. (描述应用于表的结构化更改的 DDL 语句。模式数据将持久化到内部模式历史主题,以及连接器的模式变更主题(如果已配置)。)

After you run an initial snapshot, you might notice that the snapshot captures schema information for tables that are not designated for capture. By default, initial snapshots are designed to capture schema information for every table that is present in the database, not only from tables that are designated for capture. Connectors require that the table’s schema is present in the schema history topic before they can capture a table. By enabling the initial snapshot to capture schema data for tables that are not part of the original capture set, Debezium prepares the connector to readily capture event data from these tables should that later become necessary. If the initial snapshot does not capture a table’s schema, you must add the schema to the history topic before the connector can capture data from the table. (运行初始快照后,您可能会注意到快照捕获了未指定捕获的表的模式信息。默认情况下,初始快照旨在捕获数据库中存在的所有表的模式信息,而不仅仅是指定捕获的表。连接器要求表的模式存在于模式历史主题中,然后才能捕获表。通过允许初始快照捕获非原始捕获集一部分的表的模式数据,Debezium 会为连接器做好准备,以便在将来需要时能够轻松地从这些表中捕获事件数据。如果初始快照未捕获表的模式,您必须在连接器能够从表中捕获数据之前将模式添加到历史主题。)

In some cases, you might want to limit schema capture in the initial snapshot. This can be useful when you want to reduce the time required to complete a snapshot. Or when Debezium connects to the database instance through a user account that has access to multiple logical databases, but you want the connector to capture changes only from tables in a specific logic database. (在某些情况下,您可能希望在初始快照中限制模式捕获。当您想减少完成快照所需的时间时,这可能很有用。或者,当 Debezium 通过具有访问多个逻辑数据库的用户帐户连接到数据库实例时,而您只想让连接器捕获特定逻辑数据库中表的更改。)

从未在初始快照中捕获的表捕获数据(无模式更改)

In some cases, you might want the connector to capture data from a table whose schema was not captured by the initial snapshot. Depending on the connector configuration, the initial snapshot might capture the table schema only for specific tables in the database. If the table schema is not present in the history topic, the connector fails to capture the table, and reports a missing schema error. (在某些情况下,您可能希望连接器捕获模式未被初始快照捕获的表中的数据。根据连接器配置,初始快照可能仅捕获数据库中特定表的模式。如果表模式不存在于历史主题中,连接器将无法捕获该表,并报告模式丢失错误。)

You might still be able to capture data from the table, but you must perform additional steps to add the table schema. (您仍然可以从该表中捕获数据,但必须执行额外步骤来添加表模式。)

先决条件
  • You want to capture data from a table with a schema that the connector did not capture during the initial snapshot. (您想从一个连接器在初始快照期间未捕获其模式的表中捕获数据。)

  • 在事务日志中,该表的所有条目都使用相同的模式。有关从新更改了结构的表捕获数据的信息,请参阅从具有模式更改的新表中捕获数据

过程
  1. Stop the connector. (停止连接器。)

  2. 删除由 schema.history.internal.kafka.topic 属性指定的内部数据库模式历史记录主题。

  3. Apply the following changes to the connector configuration (对连接器配置应用以下更改:)

    1. snapshot.mode 设置为 recovery

    2. schema.history.internal.store.only.captured.tables.ddl 的值设置为 false

    3. Add the tables that you want the connector to capture to table.include.list. This guarantees that in the future, the connector can reconstruct the schema history for all tables. (将您希望连接器捕获的表添加到 table.include.list。这可以确保将来连接器能够重建所有表的模式历史记录。)

  4. Restart the connector. The snapshot recovery process rebuilds the schema history based on the current structure of the tables. (重新启动连接器。快照恢复过程将根据表的当前结构重建模式历史记录。)

  5. (可选) 快照完成后,启动增量快照以捕获新添加表的现有数据以及连接器离线期间发生的其他表的更改。

  6. (Optional) Reset the snapshot.mode back to no_data to prevent the connector from initiating recovery after a future restart. ((可选)将 snapshot.mode 重置回 no_data,以防止连接器在将来的重启后启动恢复。)

从未在初始快照中捕获的表捕获数据(模式更改)

If a schema change is applied to a table, records that are committed before the schema change have different structures than those that were committed after the change. When Debezium captures data from a table, it reads the schema history to ensure that it applies the correct schema to each event. If the schema is not present in the schema history topic, the connector is unable to capture the table, and an error results. (如果模式更改应用于某个表,则在模式更改之前提交的记录与更改之后提交的记录具有不同的结构。当 Debezium 从表中捕获数据时,它会读取模式历史记录以确保它将正确的模式应用于每个事件。如果模式不存在于模式历史主题中,连接器将无法捕获该表,并导致错误。)

If you want to capture data from a table that was not captured by the initial snapshot, and the schema of the table was modified, you must add the schema to the history topic, if it is not already available. You can add the schema by running a new schema snapshot, or by running an initial snapshot for the table. (如果您想从初始快照未捕获的表中捕获数据,并且该表的模式已修改,则必须将模式添加到历史主题(如果尚未提供)。您可以通过运行新的模式快照或为该表运行初始快照来添加模式。)

先决条件
  • You want to capture data from a table with a schema that the connector did not capture during the initial snapshot. (您想从一个连接器在初始快照期间未捕获其模式的表中捕获数据。)

  • A schema change was applied to the table so that the records to be captured do not have a uniform structure. (已对表应用了模式更改,以便要捕获的记录没有统一的结构。)

过程
Initial snapshot captured the schema for all tables (store.only.captured.tables.ddl was set to false) (初始快照捕获了所有表的模式(store.only.captured.tables.ddl 设置为 false))
  1. 编辑 table.include.list 属性以指定要捕获的表。

  2. Restart the connector. (重新启动连接器。)

  3. 启动增量快照以捕获新添加表的现有数据

Initial snapshot did not capture the schema for all tables (store.only.captured.tables.ddl was set to true) (初始快照未捕获所有表的模式(store.only.captured.tables.ddl 设置为 true))

If the initial snapshot did not save the schema of the table that you want to capture, complete one of the following procedures (如果初始快照未保存您要捕获的表的模式,请完成以下任一程序:)

Procedure 1: Schema snapshot, followed by incremental snapshot (过程 1:模式快照,然后是增量快照)

In this procedure, the connector first performs a schema snapshot. You can then initiate an incremental snapshot to enable the connector to synchronize data. (在此过程中,连接器首先执行模式快照。然后,您可以启动增量快照以使连接器能够同步数据。)

  1. Stop the connector. (停止连接器。)

  2. 删除由 schema.history.internal.kafka.topic 属性指定的内部数据库模式历史记录主题。

  3. Clear the offsets in the configured Kafka Connect offset.storage.topic. For more information about how to remove offsets, see the Debezium community FAQ. (清除配置的 Kafka Connect offset.storage.topic 中的偏移量。有关如何删除偏移量的更多信息,请参阅Debezium 社区 FAQ。)

    Removing offsets should be performed only by advanced users who have experience in manipulating internal Kafka Connect data. This operation is potentially destructive, and should be performed only as a last resort. (仅应由有经验处理内部 Kafka Connect 数据的用户执行移除偏移量的操作。此操作可能具有破坏性,应仅作为最后的手段执行。)

  4. Set values for properties in the connector configuration as described in the following steps (按以下步骤为连接器配置中的属性设置值:)

    1. snapshot.mode 属性的值设置为 no_data

    2. 编辑 table.include.list 以添加要捕获的表。

  5. Restart the connector. (重新启动连接器。)

  6. Wait for Debezium to capture the schema of the new and existing tables. Data changes that occurred any tables after the connector stopped are not captured. (等待 Debezium 捕获新表和现有表的模式。连接器停止后在任何表上发生的数据更改都不会被捕获。)

  7. 为确保不丢失任何数据,请启动增量快照

Procedure 2: Initial snapshot, followed by optional incremental snapshot (过程 2:初始快照,然后是可选的增量快照)

In this procedure the connector performs a full initial snapshot of the database. As with any initial snapshot, in a database with many large tables, running an initial snapshot can be a time-consuming operation. After the snapshot completes, you can optionally trigger an incremental snapshot to capture any changes that occur while the connector is off-line. (在此过程中,连接器将执行数据库的完整初始快照。与任何初始快照一样,在具有许多大表的数据库中,运行初始快照可能是一项耗时的操作。快照完成后,您可以选择触发增量快照以捕获连接器离线期间发生的任何更改。)

  1. Stop the connector. (停止连接器。)

  2. 删除由 schema.history.internal.kafka.topic 属性指定的内部数据库模式历史记录主题。

  3. Clear the offsets in the configured Kafka Connect offset.storage.topic. For more information about how to remove offsets, see the Debezium community FAQ. (清除配置的 Kafka Connect offset.storage.topic 中的偏移量。有关如何删除偏移量的更多信息,请参阅Debezium 社区 FAQ。)

    Removing offsets should be performed only by advanced users who have experience in manipulating internal Kafka Connect data. This operation is potentially destructive, and should be performed only as a last resort. (仅应由有经验处理内部 Kafka Connect 数据的用户执行移除偏移量的操作。此操作可能具有破坏性,应仅作为最后的手段执行。)

  4. 编辑 table.include.list 以添加要捕获的表。

  5. Set values for properties in the connector configuration as described in the following steps (按以下步骤为连接器配置中的属性设置值:)

    1. snapshot.mode 属性的值设置为 initial

    2. (可选) 将 schema.history.internal.store.only.captured.tables.ddl 设置为 false

  6. Restart the connector. The connector takes a full database snapshot. After the snapshot completes, the connector transitions to streaming. (重新启动连接器。连接器将进行完整的数据库快照。快照完成后,连接器将过渡到流式传输。)

  7. (可选) 要捕获连接器离线期间发生的任何数据,请启动增量快照

临时快照

By default, a connector runs an initial snapshot operation only after it starts for the first time. Following this initial snapshot, under normal circumstances, the connector does not repeat the snapshot process. Any future change event data that the connector captures comes in through the streaming process only. (默认情况下,连接器仅在首次启动后运行初始快照操作。在初始快照之后,在正常情况下,连接器不会重复快照过程。连接器捕获的任何未来变更事件数据仅通过流式传输过程进入。)

However, in some situations the data that the connector obtained during the initial snapshot might become stale, lost, or incomplete. To provide a mechanism for recapturing table data, Debezium includes an option to perform ad hoc snapshots. You might want to perform an ad hoc snapshot after any of the following changes occur in your Debezium environment (但是,在某些情况下,连接器在初始快照期间获取的数据可能会过时、丢失或不完整。为了提供重新捕获表数据的机制,Debezium 提供了一个执行即席快照的选项。您可能希望在 Debezium 环境中发生以下任何更改后执行即席快照:)

  • The connector configuration is modified to capture a different set of tables. (修改了连接器配置以捕获不同的表集。)

  • Kafka topics are deleted and must be rebuilt. (Kafka 主题被删除且必须重建。)

  • Data corruption occurs due to a configuration error or some other problem. (由于配置错误或其他问题导致数据损坏。)

You can re-run a snapshot for a table for which you previously captured a snapshot by initiating a so-called ad hoc snapshot. Ad hoc snapshots require the use of signaling tables. You initiate an ad hoc snapshot by sending a signal request to the Debezium signaling table. (您可以通过启动所谓的即席快照来重新运行之前捕获了快照的表的快照。即席快照需要使用信号表。通过向 Debezium 信号表发送信号请求来启动即席快照。)

When you initiate an ad hoc snapshot of an existing table, the connector appends content to the topic that already exists for the table. If a previously existing topic was removed, Debezium can create a topic automatically if automatic topic creation is enabled. (当您启动现有表的即席快照时,连接器会将内容追加到该表已存在的主题中。如果之前存在的主题已被删除,并且启用了自动主题创建,则 Debezium 可以自动创建主题。)

Ad hoc snapshot signals specify the tables to include in the snapshot. The snapshot can capture the entire contents of the database, or capture only a subset of the tables in the database. Also, the snapshot can capture a subset of the contents of the table(s) in the database. (即席快照信号指定要包含在快照中的表。快照可以捕获数据库的全部内容,或仅捕获数据库中表的部分内容。此外,快照还可以捕获数据库中表的部分内容。)

You specify the tables to capture by sending an execute-snapshot message to the signaling table. Set the type of the execute-snapshot signal to incremental or blocking, and provide the names of the tables to include in the snapshot, as described in the following table (您可以通过向信号表发送 execute-snapshot 消息来指定要捕获的表。将 execute-snapshot 信号的类型设置为 incrementalblocking,并提供要在快照中包含的表名称,如下表所示:)

Table 3. Example of an ad hoc execute-snapshot signal record (表 3. 即席 execute-snapshot 信号记录示例)
Field (字段) Default (默认值) Value (值)

type

incremental (增量)

Specifies the type of snapshot that you want to run. (指定您要运行的快照类型。)
Currently, you can request incremental or blocking snapshots. (目前,您可以请求 incrementalblocking 快照。)

data-collections (数据集合)

N/A

An array that contains regular expressions matching the fully-qualified names of the tables to include in the snapshot. (一个包含正则表达式的数组,匹配要包含在快照中的表的完全限定名称。)
对于 MySQL 连接器,请使用以下格式指定表的完全限定名称:database.table

additional-conditions (附加条件)

N/A

An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. (一个可选数组,指定连接器用于确定要包含在快照中的记录子集的附加条件集。)
Each additional condition is an object that specifies the criteria for filtering the data that an ad hoc snapshot captures. You can set the following parameters for each additional condition (每个附加条件是一个对象,指定即席快照捕获的数据的过滤标准。您可以为每个附加条件设置以下参数:)

data-collection (数据集合)

The fully-qualified name of the table that the filter applies to. You can apply different filters to each table. (过滤器适用的表的完全限定名称。您可以为每个表应用不同的过滤器。)

filter (过滤器)

Specifies column values that must be present in a database record for the snapshot to include it, for example, "color='blue'". (指定数据库记录中必须存在的列值,快照才能包含它,例如 "color='blue'"。)

The values that you assign to the filter parameter are the same types of values that you might specify in the WHERE clause of SELECT statements when you set the snapshot.select.statement.overrides property for a blocking snapshot. (您赋给 filter 参数的值与您在为阻塞快照设置 snapshot.select.statement.overrides 属性时可能在 SELECT 语句的 WHERE 子句中指定的类型的值相同。)

surrogate-key (代理键)

N/A

An optional string that specifies the column name that the connector uses as the primary key of a table during the snapshot process. (一个可选字符串,指定连接器在快照过程中用作表主键的列名。)

Triggering an ad hoc incremental snapshot (触发即席增量快照)

您可以通过向信号表添加具有 execute-snapshot 信号类型的条目,或向 Kafka 信号主题发送信号消息来启动临时增量快照。连接器处理消息后,它将开始快照操作。快照过程会读取第一个和最后一个主键值,并使用这些值作为每个表的开始和结束点。根据表中的条目数和配置的块大小,Debezium 将表划分为块,然后依次逐个快照每个块

有关更多信息,请参阅增量快照

Triggering an ad hoc blocking snapshot (触发即席阻塞快照)

You initiate an ad hoc blocking snapshot by adding an entry with the execute-snapshot signal type to the signaling table or signaling topic. After the connector processes the message, it begins the snapshot operation. The connector temporarily stops streaming, and then initiates a snapshot of the specified table, following the same process that it uses during an initial snapshot. After the snapshot completes, the connector resumes streaming. (您可以通过向信号表或信号主题添加具有 execute-snapshot 信号类型的条目来启动即席阻塞快照。在连接器处理消息后,它将开始快照操作。连接器将暂时停止流式传输,然后启动指定表的快照,遵循其在初始快照期间使用的相同过程。快照完成后,连接器将恢复流式传输。)

有关更多信息,请参阅阻塞快照

增量快照

To provide flexibility in managing snapshots, Debezium includes a supplementary snapshot mechanism, known as incremental snapshotting. Incremental snapshots rely on the Debezium mechanism for sending signals to a Debezium connector. Incremental snapshots are based on the DDD-3 design document. (为了在管理快照时提供灵活性,Debezium 提供了一个补充快照机制,称为增量快照。增量快照依赖于 Debezium 向 Debezium 连接器发送信号的机制。增量快照基于DDD-3 设计文档。)

在增量快照中,Debezium 不是像初始快照那样一次捕获数据库的完整状态,而是分阶段、通过一系列可配置的块来捕获每个表。您可以指定快照要捕获的表以及每个块的大小。块大小决定了快照在每次数据库读取操作期间收集的行数。增量快照的默认块大小为 1024 行。

As an incremental snapshot proceeds, Debezium uses watermarks to track its progress, maintaining a record of each table row that it captures. This phased approach to capturing data provides the following advantages over the standard initial snapshot process (随着增量快照的进行,Debezium 使用水位标记来跟踪其进度,并维护捕获的每个表行的记录。与标准的初始快照过程相比,这种分阶段的数据捕获方法提供了以下优势:)

  • You can run incremental snapshots in parallel with streamed data capture, instead of postponing streaming until the snapshot completes. The connector continues to capture near real-time events from the change log throughout the snapshot process, and neither operation blocks the other. (您可以与流式数据捕获并行运行增量快照,而不是将流式传输推迟到快照完成。在整个快照过程中,连接器会继续从变更日志中捕获近乎实时事件,并且任一操作都不会阻塞另一个操作。)

  • If the progress of an incremental snapshot is interrupted, you can resume it without losing any data. After the process resumes, the snapshot begins at the point where it stopped, rather than recapturing the table from the beginning. (如果增量快照的进度被中断,您可以恢复它而不会丢失任何数据。在过程恢复后,快照将从停止的点开始,而不是从头重新捕获表。)

  • 您可以随时按需运行增量快照,并根据需要重复该过程以适应数据库更新。例如,在修改连接器配置以向其 table.include.list 属性添加表后,您可能会重新运行快照。

Incremental snapshot process (增量快照过程)

当您运行增量快照时,Debezium 会按主键对每个表进行排序,然后根据配置的块大小将表分成块。然后,它按块处理,捕获每个表的行。对于它捕获的每一行,快照都会发出一个 READ 事件。该事件表示快照开始时行的值。

As a snapshot proceeds, it’s likely that other processes continue to access the database, potentially modifying table records. To reflect such changes, INSERT, UPDATE, or DELETE operations are committed to the transaction log as per usual. Similarly, the ongoing Debezium streaming process continues to detect these change events and emits corresponding change event records to Kafka. (随着快照的进行,其他进程很可能会继续访问数据库,可能修改表记录。为了反映这些更改,INSERTUPDATEDELETE 操作照常提交到事务日志。同样,正在进行的 Debezium 流式传输过程会继续检测这些变更事件,并将相应的变更事件记录发送到 Kafka。)

How Debezium resolves collisions among records with the same primary key (Debezium 如何解决具有相同主键的记录之间的冲突)

In some cases, the UPDATE or DELETE events that the streaming process emits are received out of sequence. That is, the streaming process might emit an event that modifies a table row before the snapshot captures the chunk that contains the READ event for that row. When the snapshot eventually emits the corresponding READ event for the row, its value is already superseded. To ensure that incremental snapshot events that arrive out of sequence are processed in the correct logical order, Debezium employs a buffering scheme for resolving collisions. Only after collisions between the snapshot events and the streamed events are resolved does Debezium emit an event record to Kafka. (在某些情况下,流式传输过程发出的 UPDATEDELETE 事件可能会乱序接收。也就是说,流式传输过程可能会在快照捕获包含该行 READ 事件的块之前发出修改表行的事件。当快照最终发出行的相应 READ 事件时,其值已过时。为了确保乱序到达的增量快照事件按正确的逻辑顺序处理,Debezium 采用缓冲方案来解决冲突。仅当快照事件与流式事件之间的冲突得到解决后,Debezium 才会向 Kafka 发送事件记录。)

Snapshot window (快照窗口)

To assist in resolving collisions between late-arriving READ events and streamed events that modify the same table row, Debezium employs a so-called snapshot window. The snapshot window demarcates the interval during which an incremental snapshot captures data for a specified table chunk. Before the snapshot window for a chunk opens, Debezium follows its usual behavior and emits events from the transaction log directly downstream to the target Kafka topic. But from the moment that the snapshot for a particular chunk opens, until it closes, Debezium performs a de-duplication step to resolve collisions between events that have the same primary key. (为了帮助解决延迟到达的 READ 事件与修改同一表行的流式事件之间的冲突,Debezium 采用所谓的快照窗口。快照窗口划定了增量快照捕获指定表块数据的间隔。在某个块的快照窗口打开之前,Debezium 会遵循其常规行为,将事务日志中的事件直接向下游发送到目标 Kafka 主题。但在某个块的快照打开到关闭的整个过程中,Debezium 会执行去重步骤以解决具有相同主键的事件之间的冲突。)

For each data collection, the Debezium emits two types of events, and stores the records for them both in a single destination Kafka topic. The snapshot records that it captures directly from a table are emitted as READ operations. Meanwhile, as users continue to update records in the data collection, and the transaction log is updated to reflect each commit, Debezium emits UPDATE or DELETE operations for each change. (对于每个数据集合,Debezium 会发出两种类型的事件,并将两者的记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为 READ 操作发出。同时,随着用户继续更新数据集合中的记录,并且事务日志更新以反映每次提交,Debezium 会为每个更改发出 UPDATEDELETE 操作。)

As the snapshot window opens, and Debezium begins processing a snapshot chunk, it delivers snapshot records to a memory buffer. During the snapshot windows, the primary keys of the READ events in the buffer are compared to the primary keys of the incoming streamed events. If no match is found, the streamed event record is sent directly to Kafka. If Debezium detects a match, it discards the buffered READ event, and writes the streamed record to the destination topic, because the streamed event logically supersede the static snapshot event. After the snapshot window for the chunk closes, the buffer contains only READ events for which no related transaction log events exist. Debezium emits these remaining READ events to the table’s Kafka topic. (随着快照窗口的打开,Debezium 开始处理快照块,它会将快照记录传递到内存缓冲区。在快照窗口期间,缓冲区中 READ 事件的主键与传入的流式事件的主键进行比较。如果未找到匹配项,则将流式事件记录直接发送到 Kafka。如果 Debezium 检测到匹配,它将丢弃已缓冲的 READ 事件,并将流式记录写入目标主题,因为流式事件在逻辑上取代了静态快照事件。当块的快照窗口关闭后,缓冲区将仅包含没有相关事务日志事件的 READ 事件。Debezium 将这些剩余的 READ 事件发送到表的 Kafka 主题。)

The connector repeats the process for each snapshot chunk. (连接器对每个快照块重复此过程。)

To enable Debezium to perform incremental snapshots, you must grant the connector permission to write to the signaling table. (要使 Debezium 能够执行增量快照,您必须授予连接器写入信号表的权限。)

Write permission is unnecessary only for connectors that can be configured to perform read-only incrementals snapshots (MariaDB, MySQL, or PostgreSQL). (仅对于可以配置为执行只读增量快照的连接器(MariaDBMySQLPostgreSQL)来说,写入权限是可选的。)

Currently, you can use either of the following methods to initiate an incremental snapshot (当前,您可以使用以下任一方法启动增量快照:)

触发增量快照

To initiate an incremental snapshot, you can send an ad hoc snapshot signal to the signaling table on the source database. You submit snapshot signals as SQL INSERT queries. (要启动增量快照,您可以将即席快照信号发送到源数据库上的信号表。您将快照信号作为 SQL INSERT 查询提交。)

After Debezium detects the change in the signaling table, it reads the signal, and runs the requested snapshot operation. (在 Debezium 检测到信号表中的更改后,它会读取信号并运行请求的快照操作。)

The query that you submit specifies the tables to include in the snapshot, and, optionally, specifies the type of snapshot operation. Debezium currently supports the incremental and blocking snapshot types. (您提交的查询指定了要包含在快照中的表,并可选地指定了快照操作的类型。Debezium 目前支持 incrementalblocking 快照类型。)

To specify the tables to include in the snapshot, provide a data-collections array that lists the tables, or an array of regular expressions used to match tables, for example, (要指定要包含在快照中的表,请提供一个列出表的 data-collections 数组,或者一个用于匹配表的正则表达式数组,例如:)

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

The data-collections array for an incremental snapshot signal has no default value. If the data-collections array is empty, Debezium interprets the empty array to mean that no action is required, and it does not perform a snapshot. (增量快照信号的 data-collections 数组没有默认值。如果 data-collections 数组为空,Debezium 会将空数组解释为无需执行任何操作,并且不会执行快照。)

If the name of a table that you want to include in a snapshot contains a dot (.), a space, or some other non-alphanumeric character, you must escape the table name in double quotes. (如果要包含在快照中的表名称包含点 (.)、空格或其他非字母数字字符,则必须用双引号转义表名称。)
For example, to include a table that exists in the db1 database, and that has the name My.Table, use the following format: "db1.\"My.Table\"". (例如,要包含存在于 db1 数据库中且名称为 My.Table 的表,请使用以下格式:"db1.\"My.Table\""。)

先决条件
  • Signaling is enabled (已启用信号).

    • A signaling data collection exists on the source database. (源数据库上存在信号数据集合。)

    • 信号数据收集在 signal.data.collection 属性中指定。

Using a source signaling channel to trigger an incremental snapshot (使用源信号通道触发增量快照)
  1. Send a SQL query to add the ad hoc incremental snapshot request to the signaling table (发送 SQL 查询将即席增量快照请求添加到信号表)

    INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');

    For example, (例如,)

    INSERT INTO db1.debezium_signal (id, type, data) (1)
    values ('ad-hoc-1',   (2)
        'execute-snapshot',  (3)
        '{"data-collections": ["db1.table1", "db1.table2"], (4)
        "type":"incremental", (5)
        "additional-conditions":[{"data-collection": "db1.table1" ,"filter":"color=\'blue\'"}]}'); (6)

    The values of the id,type, and data parameters in the command correspond to the fields of the signaling table. (命令中的 idtypedata 参数的值对应于信号表的字段。)
    The following table describes the parameters in the example (下表描述了示例中的参数:)

    Table 4. Descriptions of fields in a SQL command for sending an incremental snapshot signal to the signaling table (表 4. 发送到信号表的增量快照信号的 SQL 命令中字段的说明)
    Item Value (值) 描述

    1

    database.debezium_signal

    Specifies the fully-qualified name of the signaling table on the source database. (指定源数据库上信号表的完全限定名称。)

    2

    ad-hoc-1

    The id parameter specifies an arbitrary string that is assigned as the id identifier for the signal request. (id 参数指定一个任意字符串,该字符串被分配为信号请求的 id 标识符。)
    Use this string to identify logging messages to entries in the signaling table. Debezium does not use this string. Rather, during the snapshot, Debezium generates its own id string as a watermarking signal. (使用此字符串将日志消息标识到信号表中的条目。Debezium 不使用此字符串。而是在快照期间,Debezium 生成自己的 id 字符串作为水位标记信号。)

    3

    execute-snapshot

    The type parameter specifies the operation that the signal is intended to trigger. (type 参数指定信号旨在触发的操作。)

    4

    data-collections (数据集合)

    A required component of the data field of a signal that specifies an array of table names or regular expressions to match table names to include in the snapshot. (信号的 data 字段的必需组件,该字段指定一个表名称数组或匹配要包含在快照中的表名称的正则表达式。)
    The array lists regular expressions that use the format database.table to match the fully-qualified names of the tables. This format is the same as the one that you use to specify the name of the connector’s signaling table. (该数组列出了使用 database.table 格式匹配表完全限定名称的正则表达式。此格式与您用于指定连接器信号表的名称的格式相同。)

    5

    incremental (增量)

    An optional type component of the data field of a signal that specifies the type of snapshot operation to run. (信号的 data 字段的可选 type 组件,指定要运行的快照操作的类型。)
    Valid values are incremental and blocking. (有效值为 incrementalblocking。)
    If you do not specify a value, the connector defaults to performing an incremental snapshot. (如果您不指定值,连接器将默认执行增量快照。)

    6

    additional-conditions (附加条件)

    An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. (一个可选数组,指定连接器用于确定要包含在快照中的记录子集的附加条件集。)
    Each additional condition is an object with data-collection and filter properties. You can specify different filters for each data collection. (每个附加条件是一个具有 data-collectionfilter 属性的对象。您可以为每个数据集合指定不同的过滤器。)
    * data-collection 属性是过滤器适用的数据集合的完全限定名称。有关 additional-conditions 参数的更多信息,请参阅使用 additional-conditions 运行临时增量快照

使用 additional-conditions 运行临时增量快照

If you want a snapshot to include only a subset of the content in a table, you can modify the signal request by appending an additional-conditions parameter to the snapshot signal. (如果您希望快照仅包含表内容的子集,则可以通过将 additional-conditions 参数附加到快照信号来修改信号请求。)

The SQL query for a typical snapshot takes the following form (典型快照的 SQL 查询形式如下:)

SELECT * FROM <tableName> ....

By adding an additional-conditions parameter, you append a WHERE condition to the SQL query, as in the following example (通过添加 additional-conditions 参数,您可以将 WHERE 条件附加到 SQL 查询,如下例所示:)

SELECT * FROM <data-collection> WHERE <filter> ....

The following example shows a SQL query to send an ad hoc incremental snapshot request with an additional condition to the signaling table (以下示例显示了一个 SQL 查询,用于向信号表发送带有附加条件的即席增量快照请求:)

INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');

For example, suppose you have a products table that contains the following columns (例如,假设您有一个 products 表,其中包含以下列:)

  • id (primary key)

  • color (颜色)

  • quantity (数量)

If you want an incremental snapshot of the products table to include only the data items where color=blue, you can use the following SQL statement to trigger the snapshot (如果您希望 products 表的增量快照仅包含 color=blue 的数据项,您可以使用以下 SQL 语句来触发快照:)

INSERT INTO db1.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.products", "filter": "color=blue"}]}');

The additional-conditions parameter also enables you to pass conditions that are based on more than one column. For example, using the products table from the previous example, you can submit a query that triggers an incremental snapshot that includes the data of only those items for which color=blue and quantity>10 (additional-conditions 参数还允许您传递基于多个列的条件。例如,使用前面示例中的 products 表,您可以提交一个查询来触发增量快照,其中仅包含 color=bluequantity>10 的项目数据:)

INSERT INTO db1.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.products", "filter": "color=blue AND quantity>10"}]}');

The following example, shows the JSON for an incremental snapshot event that is captured by a connector. (以下示例显示了连接器捕获的增量快照事件的 JSON。)

Example 1. Incremental snapshot event message (示例 1. 增量快照事件消息)
{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" (1)
    },
    "op":"r", (2)
    "ts_ms":"1620393591654",
    "ts_us":"1620393591654547",
    "ts_ns":"1620393591654547920",
    "transaction":null
}
Table 5. Description of fields in an incremental snapshot event message (表 5. 增量快照事件消息中字段的说明)
Item Field name (字段名) 描述

1

snapshot (快照)

Specifies the type of snapshot operation to run. (指定要运行的快照操作的类型。)
Currently, the only valid options are blocking and incremental. (目前,唯一有效选项是 blockingincremental。)
Specifying a type value in the SQL query that you submit to the signaling table is optional. (在您提交给信号表的 SQL 查询中指定 type 值是可选的。)
If you do not specify a value, the connector runs an incremental snapshot. (如果您不指定值,连接器将运行增量快照。)

2

op (操作)

Specifies the event type. (指定事件类型。)
The value for snapshot events is r, signifying a READ operation. (快照事件的值为 r,表示 READ 操作。)

使用 Kafka 信号通道触发增量快照

You can send a message to the configured Kafka topic to request the connector to run an ad hoc incremental snapshot. (您可以向配置的 Kafka 主题发送消息,请求连接器运行即席增量快照。)

The key of the Kafka message must match the value of the topic.prefix connector configuration option. (Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。)

The value of the message is a JSON object with type and data fields. (消息的值是一个带有 typedata 字段的 JSON 对象。)

The signal type is execute-snapshot, and the data field must have the following fields (信号类型为 execute-snapshotdata 字段必须具有以下字段:)

Table 6. Execute snapshot data fields (表 6. 执行快照数据字段)
Field (字段) Default (默认值) Value (值)

type

incremental (增量)

The type of the snapshot to be executed. Currently Debezium supports the incremental and blocking types. (要执行的快照类型。目前 Debezium 支持 incrementalblocking 类型。)
See the next section for more details. (有关更多详细信息,请参阅下一节。)

data-collections (数据集合)

N/A

An array of comma-separated regular expressions that match the fully-qualified names of tables to include in the snapshot. (一个逗号分隔的正则表达式数组,匹配要包含在快照中的表的完全限定名称。)
使用与 signal.data.collection 配置选项相同的格式指定名称。

additional-conditions (附加条件)

N/A

An optional array of additional conditions that specifies criteria that the connector evaluates to designate a subset of records to include in a snapshot. (一个可选的附加条件数组,指定连接器评估以指定要包含在快照中的记录子集的标准。)
Each additional condition is an object that specifies the criteria for filtering the data that an ad hoc snapshot captures. You can set the following parameters for each additional condition: data-collection:: The fully-qualified name of the table that the filter applies to. You can apply different filters to each table. filter:: Specifies column values that must be present in a database record for the snapshot to include it, for example, "color='blue'". (每个附加条件是一个对象,指定即席快照捕获的数据的过滤标准。您可以为每个附加条件设置以下参数:data-collection:: 过滤器适用的表的完全限定名称。您可以为每个表应用不同的过滤器。filter:: 指定数据库记录中必须存在的列值,快照才能包含它,例如 "color='blue'"。)

The values that you assign to the filter parameter are the same types of values that you might specify in the WHERE clause of SELECT statements when you set the snapshot.select.statement.overrides property for a blocking snapshot. (您赋给 filter 参数的值与您在为阻塞快照设置 snapshot.select.statement.overrides 属性时可能在 SELECT 语句的 WHERE 子句中指定的类型的值相同。)

Example 2. An execute-snapshot Kafka message (示例 2. 一个 execute-snapshot Kafka 消息)
Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
Ad hoc incremental snapshots with additional-conditions (带有附加条件的即席增量快照)

Debezium uses the additional-conditions field to select a subset of a table’s content. (Debezium 使用 additional-conditions 字段来选择表内容的子集。)

Typically, when Debezium runs a snapshot, it runs a SQL query such as (通常,当 Debezium 运行快照时,它会运行一个 SQL 查询,例如:)

SELECT * FROM <tableName> …​.

When the snapshot request includes an additional-conditions property, the data-collection and filter parameters of the property are appended to the SQL query, for example (当快照请求包含 additional-conditions 属性时,该属性的 data-collectionfilter 参数将被附加到 SQL 查询中,例如:)

SELECT * FROM <data-collection> WHERE <filter> …​.

For example, given a products table with the columns id (primary key), color, and brand, if you want a snapshot to include only content for which color='blue', when you request the snapshot, you could add the additional-conditions property to filter the content: :leveloffset: +1 (例如,给定一个具有 id(主键)、colorbrand 列的 products 表,如果您希望快照仅包含 color='blue' 的内容,在请求快照时,您可以添加 additional-conditions 属性来过滤内容::leveloffset: +1)

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.products" ,"filter":"color='blue'"}]}}`

You can also use the additional-conditions property to pass conditions based on multiple columns. For example, using the same products table as in the previous example, if you want a snapshot to include only the content from the products table for which color='blue', and brand='MyBrand', you could send the following request: :leveloffset: +1 (您还可以使用 additional-conditions 属性来传递基于多个列的条件。例如,使用前面示例中的相同 products 表,如果您希望快照仅包含 products 表中 color='blue'brand='MyBrand' 的内容,您可以发送以下请求::leveloffset: +1)

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`

停止增量快照

In some situations, it might be necessary to stop an incremental snapshot. For example, you might realize that snapshot was not configured correctly, or maybe you want to ensure that resources are available for other database operations. You can stop a snapshot that is already running by sending a signal to the signaling table on the source database. (在某些情况下,可能需要停止增量快照。例如,您可能发现快照配置不正确,或者您想确保资源可用于其他数据库操作。您可以通过向源数据库上的信号表发送信号来停止正在运行的快照。)

You submit a stop snapshot signal to the signaling table by sending it in a SQL INSERT query. The stop-snapshot signal specifies the type of the snapshot operation as incremental, and optionally specifies the tables that you want to omit from the currently running snapshot. After Debezium detects the change in the signaling table, it reads the signal, and stops the incremental snapshot operation if it’s in progress. (您通过在 SQL INSERT 查询中发送来将停止快照信号提交到信号表。stop-snapshot 信号将快照操作的 type 指定为 incremental,并可选地指定您希望从当前正在运行的快照中排除的表。在 Debezium 检测到信号表中的更改后,它会读取信号,并在增量快照操作进行中时停止它。)

Additional resources (附加资源)

您还可以通过向Kafka 信号主题发送 JSON 消息来停止增量快照。

先决条件
  • Signaling is enabled (已启用信号).

    • A signaling data collection exists on the source database. (源数据库上存在信号数据集合。)

    • 信号数据收集在 signal.data.collection 属性中指定。

Using a source signaling channel to stop an incremental snapshot (使用源信号通道停止增量快照)
  1. Send a SQL query to stop the ad hoc incremental snapshot to the signaling table (发送 SQL 查询以停止添加到信号表的即席增量快照)

    INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"incremental"}');

    For example, (例如,)

    INSERT INTO db1.debezium_signal (id, type, data) (1)
    values ('ad-hoc-1',   (2)
        'stop-snapshot',  (3)
        '{"data-collections": ["db1.table1", "db1.table2"], (4)
        "type":"incremental"}'); (5)

    The values of the id, type, and data parameters in the signal command correspond to the fields of the signaling table. (信号命令中的 idtypedata 参数的值对应于信号表的字段。)
    The following table describes the parameters in the example (下表描述了示例中的参数:)

    Table 7. Descriptions of fields in a SQL command for sending a stop incremental snapshot signal to the signaling table (表 7. 发送到信号表的停止增量快照信号的 SQL 命令中字段的说明)
    Item Value (值) 描述

    1

    database.debezium_signal

    Specifies the fully-qualified name of the signaling table on the source database. (指定源数据库上信号表的完全限定名称。)

    2

    ad-hoc-1

    The id parameter specifies an arbitrary string that is assigned as the id identifier for the signal request. (id 参数指定一个任意字符串,该字符串被分配为信号请求的 id 标识符。)
    Use this string to identify logging messages to entries in the signaling table. Debezium does not use this string. (使用此字符串将日志消息标识到信号表中的条目。Debezium 不使用此字符串。)

    3

    stop-snapshot

    Specifies type parameter specifies the operation that the signal is intended to trigger. (type 参数指定信号旨在触发的操作。)

    4

    data-collections (数据集合)

    An optional component of the data field of a signal that specifies an array of table names or regular expressions to match table names to remove from the snapshot. (信号的 data 字段的可选组件,指定一个表名称数组或匹配要从快照中删除的表名称的正则表达式。)
    The array lists regular expressions which match tables by their fully-qualified names in the format database.table (该数组列出了以 database.table 格式匹配表完全限定名称的正则表达式)

    If you omit this component from the data field, the signal stops the entire incremental snapshot that is in progress. (如果您从 data 字段中省略此组件,则信号将停止正在进行的整个增量快照。)

    5

    incremental (增量)

    A required component of the data field of a signal that specifies the type of snapshot operation to be stopped. (信号的 data 字段的必需组件,指定要停止的快照操作的类型。)
    Currently, the only valid option is incremental. (目前,唯一有效选项是 incremental。)
    If you do not specify a type value, the signal fails to stop the incremental snapshot. (如果您不指定 type 值,信号将无法停止增量快照。)

使用 Kafka 信号通道停止增量快照

You can send a signal message to the configured Kafka signaling topic to stop an ad hoc incremental snapshot. (您可以向配置的 Kafka 信号主题发送信号消息,以停止即席增量快照。)

The key of the Kafka message must match the value of the topic.prefix connector configuration option. (Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。)

The value of the message is a JSON object with type and data fields. (消息的值是一个带有 typedata 字段的 JSON 对象。)

The signal type is stop-snapshot, and the data field must have the following fields (信号类型为 stop-snapshotdata 字段必须具有以下字段:)

Table 8. Execute snapshot data fields (表 8. 执行快照数据字段)
Field (字段) Default (默认值) Value (值)

type

incremental (增量)

The type of the snapshot to be executed. Currently Debezium supports only the incremental type. (要执行的快照类型。目前 Debezium 仅支持 incremental 类型。)
See the next section for more details. (有关更多详细信息,请参阅下一节。)

data-collections (数据集合)

N/A

An optional array of comma-separated regular expressions that match the fully-qualified names of the tables an array of table names or regular expressions to match table names to remove from the snapshot. (一个可选的逗号分隔的正则表达式数组,匹配要从快照中删除的表的表名称或匹配表名称的正则表达式数组。)
Specify table names by using the format database.table. (通过使用 database.table 格式指定表名称。)

The following example shows a typical stop-snapshot Kafka message (以下示例显示了一个典型的 stop-snapshot Kafka 消息:)

Key = `test_connector`

Value = `{"type":"stop-snapshot","data": {"data-collections": ["db1.table1", "db1.table2"], "type": "INCREMENTAL"}}`

只读增量快照

Debezium MySQL 连接器允许使用只读连接到数据库来运行增量快照。要使用只读访问权限运行增量快照,连接器会使用已执行的全局事务 ID (GTID) 集作为高低水位标记。通过将二进制日志 (binlog) 事件的 GTID 或服务器心跳与高低水位标记进行比较来更新块窗口的状态。

要切换到只读实现,请将 read.only 属性的值设置为 true

先决条件
  • 启用 MySQL GTID.

  • If the connector reads from a multi-threaded replica (that is, a replica for which the value of replica_parallel_workers is greater than 0) you must set one of the following options (如果连接器从多线程副本读取(即 replica_parallel_workers 的值为 0 以上的副本),则必须设置以下选项之一:)

    • replica_preserve_commit_order=ON

    • slave_preserve_commit_order=ON

Ad hoc read-only incremental snapshots (即席只读增量快照)

当 MySQL 连接是只读的时,您可以使用任何可用的信号通道,而无需使用 source 通道。

快照事件的操作类型

MySQL 连接器将快照事件作为 READ 操作 ("op" : "r") 发出。如果您希望连接器将快照事件作为 CREATE (c) 事件发出,请配置 Debezium ReadToInsertEvent 单个消息转换 (SMT) 来修改事件类型。

以下示例显示了如何配置 SMT

示例:使用 ReadToInsertEvent SMT 更改快照事件的类型
transforms=snapshotasinsert,...
transforms.snapshotasinsert.type=io.debezium.connector.mysql.transforms.ReadToInsertEvent

Custom snapshotter SPI (自定义快照器 SPI)

For more advanced uses, you can fine-tune control of the snapshot by implementing one of the following interfaces (对于更高级的用途,您可以实现以下接口之一来微调快照的控制:)

io.debezium.snapshot.spi.Snapshotter

Controls whether the connector takes a snapshot. (控制连接器是否执行快照。)

io.debezium.snapshot.spi.SnapshotQuery

Controls how data is queried during a snapshot. (控制快照期间如何查询数据。)

io.debezium.snapshot.spi.SnapshotLock

Controls whether the connector locks tables when taking a snapshot. (控制连接器在进行快照时是否锁定表。)

io.debezium.snapshot.spi.Snapshotter interface. All built-in snapshot modes implement this interface. (io.debezium.snapshot.spi.Snapshotter 接口。所有内置快照模式都实现了此接口。)
/**
 * {@link Snapshotter} is used to determine the following details about the snapshot process:
 * <p>
 * - Whether a snapshot occurs. <br>
 * - Whether streaming continues during the snapshot. <br>
 * - Whether the snapshot includes schema (if supported). <br>
 * - Whether to snapshot data or schema following an error.
 * <p>
 * Although Debezium provides many default snapshot modes,
 * to provide more advanced functionality, such as partial snapshots,
 * you can customize implementation of the interface.
 * For more information, see the documentation.
 *
 *
 *
 */
@Incubating
public interface Snapshotter extends Configurable {

    /**
     * @return the name of the snapshotter.
     *
     *
     */
    String name();

    /**
     * @param offsetExists is {@code true} when the connector has an offset context (i.e. restarted)
     * @param snapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
     *
     * @return {@code true} if the snapshotter should take a data snapshot
     */
    boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress);

    /**
     * @param offsetExists is {@code true} when the connector has an offset context (i.e. restarted)
     * @param snapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
     *
     * @return {@code true} if the snapshotter should take a schema snapshot
     */
    boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress);

    /**
     * @return {@code true} if the snapshotter should stream after taking a snapshot
     */
    boolean shouldStream();

    /**
     * @return {@code true} whether the schema can be recovered if database schema history is corrupted.
     */
    boolean shouldSnapshotOnSchemaError();

    /**
     * @return {@code true} whether the snapshot should be re-executed when there is a gap in data stream.
     */
    boolean shouldSnapshotOnDataError();

    /**
     *
     * @return {@code true} if streaming should resume from the start of the snapshot
     * transaction, or {@code false} for when a connector resumes and takes a snapshot,
     * streaming should resume from where streaming previously left off.
     */
    default boolean shouldStreamEventsStartingFromSnapshot() {
        return true;
    }

    /**
     * Lifecycle hook called after the snapshot phase is successful.
     */
    default void snapshotCompleted() {
        // no operation
    }

    /**
     * Lifecycle hook called after the snapshot phase is aborted.
     */
    default void snapshotAborted() {
        // no operation
    }
}
io.debezium.snapshot.spi.SnapshotQuery interface. All built-in snapshot query modes implement this interface. (io.debezium.snapshot.spi.SnapshotQuery 接口。所有内置快照查询模式都实现了此接口。)
/**
 * {@link SnapshotQuery} is used to determine the query used during a data snapshot
 *
 *
 */
public interface SnapshotQuery extends Configurable, Service {

    /**
     * @return the name of the snapshot lock.
     *
     *
     */
    String name();

    /**
     * Generate a valid query string for the specified table, or an empty {@link Optional}
     * to skip snapshotting this table (but that table will still be streamed from)
     *
     * @param tableId the table to generate a query for
     * @param snapshotSelectColumns the columns to be used in the snapshot select based on the column
     *                              include/exclude filters
     * @return a valid query string, or none to skip snapshotting this table
     */
    Optional<String> snapshotQuery(String tableId, List<String> snapshotSelectColumns);

}
io.debezium.snapshot.spi.SnapshotLock interface. All built-in snapshot lock modes implement this interface. (io.debezium.snapshot.spi.SnapshotLock 接口。所有内置快照锁定模式都实现了此接口。)
/**
 * {@link SnapshotLock} is used to determine the table lock mode used during schema snapshot
 *
 *
 */
public interface SnapshotLock extends Configurable, Service {

    /**
     * @return the name of the snapshot lock.
     *
     *
     */
    String name();

    /**
     * Returns a SQL statement for locking the given table during snapshotting, if required by the specific snapshotter
     * implementation.
     */
    Optional<String> tableLockingStatement(Duration lockTimeout, String tableId);

}

阻塞快照

To provide more flexibility in managing snapshots, Debezium includes a supplementary ad hoc snapshot mechanism, known as a blocking snapshot. Blocking snapshots rely on the Debezium mechanism for sending signals to a Debezium connector. (为了在管理快照时提供更大的灵活性,Debezium 提供了一个补充的即席快照机制,称为阻塞快照。阻塞快照依赖于 Debezium 向 Debezium 连接器发送信号的机制。)

A blocking snapshot behaves just like an initial snapshot, except that you can trigger it at run time. (阻塞快照的行为与初始快照完全相同,只是您可以随时在运行时触发它。)

You might want to run a blocking snapshot rather than use the standard initial snapshot process in the following situations (您可能希望在以下情况下运行阻塞快照,而不是使用标准的初始快照过程:)

  • You add a new table and you want to complete the snapshot while the connector is running. (添加了一个新表,并且您希望在连接器运行时完成快照。)

  • You add a large table, and you want the snapshot to complete in less time than is possible with an incremental snapshot. (添加了一个大表,并且您希望快照的完成时间比增量快照更快。)

Blocking snapshot process (阻塞快照过程)

When you run a blocking snapshot, Debezium stops streaming, and then initiates a snapshot of the specified table, following the same process that it uses during an initial snapshot. After the snapshot completes, the streaming is resumed. (运行阻塞快照时,Debezium 会停止流式传输,然后启动指定表的快照,遵循其在初始快照期间使用的相同过程。快照完成后,流式传输将恢复。)

Configure snapshot (配置快照)

You can set the following properties in the data component of a signal (您可以在信号的 data 组件中设置以下属性:)

  • data-collections: to specify which tables must be snapshot. (data-collections:用于指定必须进行快照的表。)

  • data-collections: Specifies the tables that you want the snapshot to include. (data-collections:指定您希望快照包含的表。)
    This property accepts a comma-separated list of regular expressions that match fully-qualified table names. The behavior of the property is similar to the behavior of the table.include.list property, which specifies the tables to capture in a blocking snapshot. (此属性接受逗号分隔的正则表达式列表,这些表达式匹配完全限定的表名。此属性的行为类似于 table.include.list 属性的行为,后者指定要在阻塞快照中捕获的表。)

  • additional-conditions: You can specify different filters for different table. (additional-conditions:您可以为不同的表指定不同的过滤器。)

    • The data-collection property is the fully-qualified name of the table for which the filter will be applied, and can be case-sensitive or case-insensitive depending on the database. (data-collection 属性是过滤器将应用的表的完全限定名称,并且根据数据库的不同,可能区分大小写或不区分大小写。)

    • The filter property will have the same value used in the snapshot.select.statement.overrides, the fully-qualified name of the table that should match by case. (filter 属性将具有与 snapshot.select.statement.overrides 中使用的值相同的值,即应区分大小写匹配的表的完全限定名称。)

For example (例如:)

  {"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
Possible duplicates (可能重复)

A delay might exist between the time that you send the signal to trigger the snapshot, and the time when streaming stops and the snapshot starts. As a result of this delay, after the snapshot completes, the connector might emit some event records that duplicate records captured by the snapshot. (从您发送触发快照的信号到流式传输停止并开始快照之间可能存在延迟。由于此延迟,快照完成后,连接器可能会发出一些重复快照捕获记录的事件记录。)

主题名称

默认情况下,MySQL 连接器会将表中所有 INSERTUPDATEDELETE 操作的更改事件写入一个 Apache Kafka 主题,该主题特定于该表。

连接器使用以下约定来命名变更事件主题:

topicPrefix.databaseName.tableName

假设 fulfillment 是主题前缀,inventory 是数据库名称,并且数据库包含名为 orderscustomersproducts 的表。Debezium MySQL 连接器会将事件发送到三个 Kafka 主题,每个主题对应数据库中的一个表:

fulfillment.inventory.orders
fulfillment.inventory.customers
fulfillment.inventory.products

以下列表提供了默认名称组件的定义:

topicPrefix

topic.prefix 连接器配置属性指定的主题前缀

schemaName (模式名称)

操作发生的模式(schema)的名称。

tableName

操作发生的表的名称。

连接器应用类似的命名约定来标记其内部数据库模式历史记录主题、模式更改主题事务元数据主题

如果默认主题名称不符合您的要求,您可以配置自定义主题名称。要配置自定义主题名称,请在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 自定义主题命名的更多信息,请参阅 主题路由

事务元数据

Debezium 可以生成表示事务边界的事件,并丰富数据更改事件消息。

Debezium 接收事务元数据的限制

Debezium 仅为部署连接器后发生的事务注册和接收元数据。部署连接器之前发生的事务的元数据不可用。

Debezium 为每个事务中的 BEGINEND 分隔符生成事务边界事件。事务边界事件包含以下字段:

status

BEGINEND

id

唯一事务标识符的字符串表示。

ts_ms

数据源中事务边界事件(BEGINEND 事件)的时间。如果数据源未向 Debezium 提供事件时间,则该字段将代表 Debezium 处理事件的时间。

event_count(针对 END 事件)

事务发出的事件总数。

data_collections(针对 END 事件)

一对 data_collectionevent_count 元素的数组,指示连接器为源自数据集合的更改发出的事件数量。

示例
{
  "status": "BEGIN",
  "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10",
  "ts_ms": 1486500577125,
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10",
  "ts_ms": 1486500577691,
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "s1.a",
      "event_count": 1
    },
    {
      "data_collection": "s2.a",
      "event_count": 1
    }
  ]
}

除非通过 topic.transaction 选项覆盖,否则连接器会将事务事件发送到 <topic.prefix>.transaction 主题。

更改数据事件丰富

启用事务元数据后,数据消息 Envelope 将使用新的 transaction 字段进行丰富。该字段以字段的复合形式提供关于每个事件的信息:

id

唯一事务标识符的字符串表示。

total_order

事件在事务生成的所有事件中的绝对位置。

data_collection_order

事件在事务发出的所有事件中,每个数据集合的位置。

以下是消息示例

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "ts_us": "1580390884335472",
  "ts_ns": "1580390884335472987",
  "transaction": {
    "id": "0e4d5dcd-a33b-11ea-80f1-02010a22a99e:10",
    "total_order": "1",
    "data_collection_order": "1"
  }
}

数据更改事件

Debezium MySQL 连接器为每个行级 INSERTUPDATEDELETE 操作生成一个数据更改事件。每个事件包含一个键和一个值。键和值的结构取决于更改的表。

Debezium 和 Kafka Connect 的设计围绕着连续的事件消息流。但是,这些事件的结构可能会随时间变化,这对于使用者来说很难处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果您使用的是模式注册表,则包含一个模式 ID,使用者可以使用该 ID 从注册表中获取模式。这使得每个事件都是自包含的。

以下骨架 JSON 显示了更改事件的基本四个部分。但是,您如何配置应用程序中使用的 Kafka Connect 转换器决定了这四个部分在更改事件中的表示。schema 字段仅在配置转换器生成它时才存在于更改事件中。同样,只有配置转换器生成事件键和事件有效负载时,它们才会在更改事件中出现。如果您使用 JSON 转换器并配置它生成所有四个基本更改事件部分,则更改事件将具有此结构:

{
 "schema": { (1)
   ...
  },
 "payload": { (2)
   ...
 },
 "schema": { (3)
   ...
 },
 "payload": { (4)
   ...
 },
}
表 9. 更改事件基本内容概述
Item Field name (字段名) 描述

1

schema

第一个 schema 字段是事件键的一部分。它指定了一个 Kafka Connect 模式,该模式描述了事件键的 payload 部分的内容。换句话说,第一个 schema 字段描述了已更改表的

主键(或唯一键,如果表没有主键)的结构。



可以通过设置 message.key.columns 连接器配置属性来覆盖表的主键。在这种情况下,第一个模式字段描述了该属性标识的键的结构。

2

payload

第一个 payload 字段是事件键的一部分。它的结构由上一个 schema 字段描述,并且包含已更改行的键。

3

schema

第二个 schema 字段是事件值的一部分。它指定了一个 Kafka Connect 模式,该模式描述了事件值 payload 部分的内容。换句话说,第二个 schema 描述了已更改行的结构。通常,此模式包含嵌套模式。

4

payload

第二个 payload 字段是事件值的一部分。它的结构由上一个 schema 字段描述,并且包含已更改行的实际数据。

默认情况下,连接器将更改事件记录流式传输到名称与事件的源表相同的主题。请参阅主题名称

MySQL 连接器确保所有 Kafka Connect 模式名称都符合Avro 模式名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称中的每个剩余字符以及数据库和表名称中的每个字符都必须是拉丁字母、数字或下划线,即 a-z、A-Z、0-9 或 _。如果存在无效字符,则会将其替换为下划线字符。

如果逻辑服务器名称、数据库名称或表名称包含无效字符,并且区分名称的唯一字符是无效的并因此被替换为下划线,这可能会导致意外冲突。

更改事件键

更改事件的键包含已更改表的键的模式和已更改行的实际键。模式和相应的有效负载都包含已更改表的

PRIMARY KEY(或唯一约束)在连接器创建事件时的每个列的字段。

考虑以下 customers 表,后面是该表更改事件键的示例。

CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;

捕获对 customers 表所做的更改的每个更改事件都具有相同的事件键模式。只要 customers 表具有之前的定义,捕获对 customers 表所做的更改的每个更改事件都具有以下键结构。在 JSON 中,它看起来像这样:

{
 "schema": { (1)
    "type": "struct",
    "name": "mysql-server-1.inventory.customers.Key", (2)
    "optional": false, (3)
    "fields": [ (4)
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
 "payload": { (5)
    "id": 1001
  }
}
表 10. 更改事件键描述
Item Field name (字段名) 描述

1

schema

键的模式部分指定了一个 Kafka Connect 模式,该模式描述了键的 payload 部分的内容。此模式描述了已更改表的

主键的结构。

2

mysql-server-1.inventory.customers.Key

定义键的有效负载结构的模式的名称。此模式描述了已更改表的

主键的结构。键模式名称的格式为 connector-name.database-name.table-name.Key。在此示例中:


  • mysql-server-1 是生成此事件的连接器的名称。

  • inventory 是包含已更改表的数据库。

  • customers 是已更新的表。

3

optional

指示事件键的 payload 字段是否必须包含值。在此示例中,需要一个值在键的有效负载中。当表没有主键时,键的有效负载字段中的值是可选的。

4

fields (字段)

指定 payload 中预期的每个字段,包括每个字段的名称、类型以及是否必需。

5

payload

包含为此更改事件生成的行的键。在此示例中,键包含一个 id 字段,其值为 1001

更改事件值

更改事件中的值比键要复杂一些。与键一样,值也包含 schema 部分和 payload 部分。schema 部分包含描述 payload 部分的 Envelope 结构的模式,包括其嵌套字段。对于创建、更新或删除数据的操作的更改事件,其值有效负载都具有信封结构。

考虑用于显示更改事件键示例的相同样本表。

CREATE TABLE customers (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;

此表更改的更改事件的值部分描述如下:

create 事件

以下示例显示了连接器为在 customers 表中创建数据的操作生成的更改事件的值部分:

{
  "schema": { (1)
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.customers.Value", (2)
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "mysql-server-1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_us"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ns"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source", (3)
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_us"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ns"
      }
    ],
    "optional": false,
    "name": "mysql-server-1.inventory.customers.Envelope" (4)
  },
  "payload": { (5)
    "op": "c", (6)
    "ts_ms": 1465491411815, (7)
    "ts_us": 1465491411815437, (7)
    "ts_ns": 1465491411815437158, (7)
    "before": null, (8)
    "after": { (9)
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": { (10)
      "version": "3.3.1.Final",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 0,
      "ts_us": 0,
      "ts_ns": 0,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 0,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 154,
      "row": 0,
      "thread": 7,
      "query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"
    }
  }
}
表 11. 创建事件值字段描述
Item Field name (字段名) 描述

1

schema

描述值有效负载结构的

值模式。更改事件的值模式对于连接器为特定表生成的每个更改事件都相同。

2

name (名称)

schema 部分,每个 name 字段指定了值有效负载中字段的模式。

mysql-server-1.inventory.customers.Value 是有效负载的 beforeafter 字段的模式。此模式特定于 customers 表。

beforeafter 字段的模式名称格式为 logicalName.tableName.Value,这确保了模式名称在数据库中是唯一的。这意味着在使用Avro 转换器时,每个逻辑源中每个表的最终 Avro 模式都有其自己的演变和历史记录。

3

name (名称)

io.debezium.connector.mysql.Source 是有效负载的 source 字段的模式。此模式特定于 MySQL 连接器。连接器将其用于它生成的所有事件。

4

name (名称)

mysql-server-1.inventory.customers.Envelope 是有效负载的整体结构的模式,其中 mysql-server-1 是连接器名称,inventory 是数据库,customers 是表。

5

payload

的实际数据。这是更改事件提供的信息。



事件的 JSON 表示可能比它们描述的行大得多。这是因为 JSON 表示必须包含消息的模式和有效负载部分。但是,通过使用Avro 转换器,您可以显著减小连接器流式传输到 Kafka 主题的消息的大小。

6

op (操作)

描述导致连接器生成事件的操作类型的

必需字符串。在此示例中,c 表示已创建行。有效值为:

  • c = create

  • u = update

  • d = delete

  • r = read(仅适用于快照)

7

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 指示数据库中发生更改的时间。通过比较 payload.source.ts_ms 的值和 payload.ts_ms 的值,您可以确定源数据库更新和 Debezium 之间的延迟。

8

before

一个可选字段,指定事件发生前行的状态。当 op 字段是创建的 c 时(如本例所示),before 字段为 null,因为此更改事件针对的是新内容。

9

after

一个可选字段,指定事件发生后行的状态。在此示例中,after 字段包含新行idfirst_namelast_nameemail 列的值。

10

source (源)

描述事件源元数据的

必需字段。此字段包含可用于将此事件与其他事件进行比较的信息,关于事件的来源、事件发生的顺序以及事件是否属于同一事务。源元数据包括:

  • Debezium 版本

  • 连接器名称

  • 记录事件的 binlog 名称

  • binlog 位置

  • 事件中的行

  • 如果事件属于快照

  • 包含新行的

    数据库和表名称

  • 创建事件的 MySQL 线程的 ID(仅限非快照)

  • MySQL 服务器 ID(如果可用)

  • 数据库中发生更改的时间戳

update 事件

对于样本 customers 表中的更新,更改事件的值具有与该表的创建事件相同的模式。同样,事件值

的有效负载也具有相同的结构。但是,在更新事件中,事件值有效负载包含不同的值。以下是一个更改事件值在连接器为 customers 表中的更新生成的事件中的示例:

{
  "schema": { ... },
  "payload": {
    "before": { (1)
      "id": 1004,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": { (2)
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": { (3)
      "version": "3.3.1.Final",
      "name": "mysql-server-1",
      "connector": "mysql",
      "ts_ms": 1465581029100,
      "ts_us": 1465581029100000,
      "ts_ns": 1465581029100000000,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 484,
      "row": 0,
      "thread": 7,
      "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
    },
    "op": "u", (4)
    "ts_ms": 1465581029523, (5)
    "ts_us": 1465581029523758, (6)
    "ts_ns": 1465581029523758914 (7)
  }
}
表 12. 更新事件值字段描述
Item Field name (字段名) 描述

1

before

一个可选字段,指定事件发生前行的状态。在更新事件值中,before 字段包含每个表列的字段以及数据库提交前该列中的值。在此示例中,first_name 的值为 Anne.

2

after

一个可选字段,指定事件发生后行的状态。您可以比较 beforeafter 结构来确定此行的更新内容。在此示例中,first_name 的值现在是 Anne Marie

3

source (源)

描述事件源元数据的

必需字段。source 字段结构与创建事件中的字段相同,但某些值不同,例如,样本更新事件来自 binlog 中的不同位置。源元数据包括:

  • Debezium 版本

  • 连接器名称

  • 记录事件的 binlog 名称

  • binlog 位置

  • 事件中的行

  • 如果事件属于快照

  • 包含已更新行的

    数据库和表名称

  • 创建事件的 MySQL 线程的 ID(仅限非快照)

  • MySQL 服务器 ID(如果可用)

  • 数据库中发生更改的时间戳

4

op (操作)

描述操作类型的

必需字符串。在更新事件值中,op 字段值为 u,表示此行因更新而更改。

5

ts_ms

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 指示数据库中发生更改的时间。通过比较 payload.source.ts_ms 的值和 payload.ts_ms 的值,您可以确定源数据库更新和 Debezium 之间的延迟。

6

ts_us

一个可选字段,显示连接器处理事件的时间(以微秒为单位)。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

7

ts_ns

一个可选字段,显示连接器处理事件的时间(以纳秒为单位)。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

更新行主键/唯一键的列会更改该行键的值。当键更改时,Debezium 会发出三个事件:一个带有该行旧键的 DELETE 事件和一个墓碑事件,然后是一个带有该行新键的事件。详细信息将在下一节介绍。

主键更新

更改行

主键字段的 UPDATE 操作称为主键更改。对于主键更改,连接器会发出旧键的 DELETE 事件记录和新(更新)键的 CREATE 事件记录,而不是 UPDATE 事件记录。这些事件具有常规的结构和内容,此外,每个事件都有一个与主键更改相关的消息头:

  • DELETE 事件记录具有 __debezium.newkey 作为消息头。此标头的值是更新行的

    新主键。

  • CREATE 事件记录具有 __debezium.oldkey 作为消息头。此标头的值是更新行以前(旧)的主键。

delete 事件

删除更改事件中的值具有与

同一表的创建更新事件相同的 schema 部分。对于样本 customers 表,删除事件中的 payload 部分如下所示:

{
  "schema": { ... },
  "payload": {
    "before": { (1)
      "id": 1004,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": null, (2)
    "source": { (3)
      "version": "3.3.1.Final",
      "connector": "mysql",
      "name": "mysql-server-1",
      "ts_ms": 1465581902300,
      "ts_us": 1465581902300000,
      "ts_ns": 1465581902300000000,
      "snapshot": false,
      "db": "inventory",
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 805,
      "row": 0,
      "thread": 7,
      "query": "DELETE FROM customers WHERE id=1004"
    },
    "op": "d", (4)
    "ts_ms": 1465581902461, (5)
    "ts_us": 1465581902461842, (6)
    "ts_ns": 1465581902461842579 (7)
  }
}
表 13. 删除事件值字段描述
Item Field name (字段名) 描述

1

before

一个可选字段,指定事件发生前行的状态。在删除事件值中,before 字段包含在数据库提交删除行之前行中的值。

2

after

一个可选字段,指定事件发生后行的状态。在删除事件值中,after 字段为 null,表示该行不再存在。

3

source (源)

描述事件源元数据的

必需字段。在删除事件值中,source 字段结构与

同一表的创建更新事件相同。许多 source 字段值也相同。在删除事件值中,ts_mspos 字段值以及其他值可能会发生变化。但删除事件值中的 source 字段提供了相同的元数据。

  • Debezium 版本

  • 连接器名称

  • 记录事件的 binlog 名称

  • binlog 位置

  • 事件中的行

  • 如果事件属于快照

  • 包含已更新行的

    数据库和表名称

  • 创建事件的 MySQL 线程的 ID(仅限非快照)

  • MySQL 服务器 ID(如果可用)

  • 数据库中发生更改的时间戳

4

op (操作)

描述操作类型的

必需字符串。op 字段值为 d,表示此行已被删除。

5

ts_ms

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 指示数据库中发生更改的时间。通过比较 payload.source.ts_ms 的值和 payload.ts_ms 的值,您可以确定源数据库更新和 Debezium 之间的延迟。

6

ts_us

一个可选字段,显示连接器处理事件的时间(以微秒为单位)。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

7

ts_ns

一个可选字段,显示连接器处理事件的时间(以纳秒为单位)。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

删除更改事件记录为使用者提供了处理此行删除所需的信息。旧值包含在内,因为某些使用者可能需要它们来正确处理删除。

MySQL 连接器事件旨在与Kafka 日志压缩配合使用。日志压缩允许删除一些较旧的消息,只要保留每个键的至少最新消息。这允许 Kafka 重新获得存储空间,同时确保主题包含完整的数据集并可用于重新加载基于键的状态。

墓碑事件

当行被删除时,delete 事件值仍然与日志压缩配合使用,因为 Kafka 可以删除具有相同键的所有早期消息。但是,为了让 Kafka 删除具有相同键的所有消息,消息值必须为 null。为了实现这一点,在 Debezium MySQL 连接器发出delete 事件后,连接器会发出一个特殊的墓碑事件,该事件具有相同的键但值为 null

truncate 事件

截断更改事件指示表已被截断。截断事件的消息键为 null。消息值类似于以下示例:

{
    "schema": { ... },
    "payload": {
        "source": { (1)
            "version": "3.3.1.Final",
            "name": "mysql-server-1",
            "connector": "mysql",
            "name": "mysql-server-1",
            "ts_ms": 1465581029100,
            "ts_us": 1465581029100000,
            "ts_ns": 1465581029100000000,
            "snapshot": false,
            "db": "inventory",
            "table": "customers",
            "server_id": 223344,
            "gtid": null,
            "file": "mysql-bin.000003",
            "pos": 484,
            "row": 0,
            "thread": 7,
            "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
        },
        "op": "t", (2)
        "ts_ms": 1465581029523, (3)
        "ts_us": 1465581029523468, (4)
        "ts_ns": 1465581029523468471 (5)
    }
}
表 14. 截断事件值字段描述
Item Field name (字段名) 描述

1

source (源)

描述事件源元数据的

必需字段。在截断事件值中,source 字段结构与

同一表的创建更新删除事件相同,提供了此元数据:

  • Debezium 版本

  • 连接器类型和名称

  • 记录事件的 binlog 名称

  • binlog 位置

  • 事件中的行

  • 如果事件属于快照

  • 数据库和表名称

  • 截断事件的 MySQL 线程 ID(仅限非快照)

  • MySQL 服务器 ID(如果可用)

  • 数据库中发生更改的时间戳

2

op (操作)

描述操作类型的

必需字符串。op 字段值为 t,表示此表被截断。

3

ts_ms

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 指示数据库中发生更改的时间。通过比较 payload.source.ts_ms 的值和 payload.ts_ms 的值,您可以确定源数据库更新和 Debezium 之间的延迟。

4

ts_us

一个可选字段,显示连接器处理事件的时间(以微秒为单位)。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

5

ts_ns

一个可选字段,显示连接器处理事件的时间(以纳秒为单位)。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

如果单个 TRUNCATE 语句应用于多个表,连接器将为每个截断的表发出一个截断更改事件记录。

截断事件表示对整个表进行的更改,并且没有消息键。因此,对于具有多个分区的

主题,对于与表相关的更改事件(创建更新等)或截断事件,没有排序保证。例如,如果使用者从多个分区读取表的事件,它可能会在从另一个分区接收到删除表中所有数据的截断事件后,从一个分区接收到表的更新事件。仅保证使用单个分区的主题的排序。

如果您不希望连接器捕获truncate 事件,请使用 skipped.operations 选项将其过滤掉

数据类型映射

Debezium MySQL 连接器用类似于其所在表的事件来表示行的更改。事件包含一个字段以表示每个列值。该列的 MySQL 数据类型决定了 Debezium 在事件中表示该值的方式。

存储字符串的列在 MySQL 中定义了字符集和排序规则。MySQL 连接器在读取 binlog 事件中列值的二进制表示时使用列的字符集。

连接器可以将 MySQL 数据类型映射到字面量语义类型。

  • 文字类型:使用 Kafka Connect 模式类型表示值的

    方式。

  • 语义类型:Kafka Connect 模式如何捕获字段的含义(模式名称)。

如果默认数据类型转换不满足您的需求,您可以创建自定义转换器以供连接器使用。

基本类型

下表显示了连接器如何映射基本 MySQL 数据类型。

表 15. 基本类型映射描述
MySQL 类型 文字类型 语义类型

BOOLEAN, BOOL

BOOLEAN

n/a

BIT(1)

BOOLEAN

n/a

BIT(>1)

BYTES

io.debezium.data.Bits

length 模式参数包含一个表示位数的整数。byte[] 包含小端格式的位,并且大小足以容纳指定的位数。例如,其中 n 是位数:
numBytes = n/8 + (n%8== 0 ? 0 : 1)

TINYINT

INT16

n/a

SMALLINT[(M)]

INT16

n/a

MEDIUMINT[(M)]

INT32

n/a

INT, INTEGER[(M)]

INT32

n/a

BIGINT[(M)]

INT64

n/a

REAL[(M,D)]

FLOAT32

n/a

FLOAT[(P)]

FLOAT32FLOAT64

精度仅用于确定存储大小。精度 P 从 0 到 23 导致 4 字节单精度 FLOAT32 列。精度 P 从 24 到 53 导致 8 字节双精度 FLOAT64 列。

DOUBLE[(M,D)]

FLOAT64

n/a

CHAR(M)]

STRING

n/a

VARCHAR(M)]

STRING

n/a

BINARY(M)]

BYTESSTRING

n/a

原始字节(默认)、base64 编码的字符串、base64-url 安全编码的字符串或十六进制编码的字符串,具体取决于 binary.handling.mode 连接器配置属性的设置。

VARBINARY(M)]

BYTESSTRING

n/a

原始字节(默认)、base64 编码的字符串、base64-url 安全编码的字符串或十六进制编码的字符串,具体取决于 binary.handling.mode 连接器配置属性的设置。

TINYBLOB

BYTESSTRING

n/a

原始字节(默认)、base64 编码的字符串、base64-url 安全编码的字符串或十六进制编码的字符串,具体取决于 binary.handling.mode 连接器配置属性的设置。

TINYTEXT

STRING

n/a

BLOB

BYTESSTRING

n/a

原始字节(默认)、base64 编码的字符串、base64-url 安全编码的字符串或十六进制编码的字符串,具体取决于 binary.handling.mode 连接器配置属性的设置。

仅支持大小 up to 2GB 的值。建议使用声明检查模式(claim check pattern)外部化大型列值。

TEXT

STRING

n/a

仅支持大小 up to 2GB 的值。建议使用声明检查模式(claim check pattern)外部化大型列值。

MEDIUMBLOB

BYTESSTRING

n/a

原始字节(默认)、base64 编码的字符串、base64-url 安全编码的字符串或十六进制编码的字符串,具体取决于 binary.handling.mode 连接器配置属性的设置。

MEDIUMTEXT

STRING

n/a

LONGBLOB

BYTESSTRING

n/a

原始字节(默认)、base64 编码的字符串、base64-url 安全编码的字符串或十六进制编码的字符串,具体取决于 binary.handling.mode 连接器配置属性的设置。

仅支持大小 up to 2GB 的值。建议使用声明检查模式(claim check pattern)外部化大型列值。

LONGTEXT

STRING

n/a

仅支持大小 up to 2GB 的值。建议使用声明检查模式(claim check pattern)外部化大型列值。

JSON

STRING

io.debezium.data.Json

包含 JSON 文档、数组或标量

的字符串表示。

ENUM

STRING

io.debezium.data.Enum

allowed 模式参数包含允许值

的逗号分隔列表。

SET

STRING

io.debezium.data.EnumSet

allowed 模式参数包含允许值

的逗号分隔列表。

YEAR[(2|4)]

INT32

io.debezium.time.Year

TIMESTAMP[(M)]

STRING

io.debezium.time.ZonedTimestamp

采用 ISO 8601 格式,精确到微秒。MySQL 允许 M 的范围是 0-6

.

时间类型

TIMESTAMP 数据类型外,MySQL 时间类型取决于 time.precision.mode 连接器配置属性的值。对于默认值指定为 CURRENT_TIMESTAMPNOWTIMESTAMP 列,Kafka Connect 模式中使用 1970-01-01 00:00:00 作为默认值。

MySQL 允许 DATEDATETIMETIMESTAMP 列的零值,因为零值有时比 null 值更受欢迎。当列定义允许 null 值时,MySQL 连接器会将零值表示为 null 值,否则表示为纪元日

不带时区的时间值

DATETIME 类型表示本地日期和时间,例如“2018-01-13 09:48:27”。如您所见,没有时区信息。此类列通过使用 UTC转换为纪元毫秒或微秒,具体取决于列的精度。TIMESTAMP 类型表示没有时区信息的时间戳。MySQL 在写入时将其从服务器(或会话)的当前时区转换为 UTC,在读取值时从 UTC 转换为服务器(或会话)的当前时区。例如:

  • DATETIME 值为 2018-06-20 06:37:03,转换为 1529476623000

  • TIMESTAMP 值为 2018-06-20 06:37:03,转换为 2018-06-20T13:37:03Z

这些列被转换为等效的 io.debezium.time.ZonedTimestamp(基于服务器(或会话)的当前时区)在 UTC 中。默认情况下,将从服务器查询时区。

运行 Kafka Connect 和 Debezium 的 JVM 的时区设置不会影响这些转换。

有关与时间值相关的属性的更多详细信息,请参阅MySQL 连接器配置属性的文档。

time.precision.mode=adaptive_time_microseconds(default)

MySQL 连接器根据列的数据类型定义确定字面量类型和语义类型,以便事件准确表示数据库中的值。所有时间字段都以微秒为单位。只有正的 TIME 字段值在 00:00:00.00000023:59:59.999999 的范围内才能被正确捕获。

表 16. time.precision.mode=adaptive_time_microseconds 时的映射
MySQL 类型 文字类型 语义类型

DATE

INT32

io.debezium.time.Date
表示自 epoch 以来的天数。

TIME[(M)]

INT64

io.debezium.time.MicroTime
以微秒为单位表示时间值,不包含时区信息。MySQL 允许 M 的范围为 0-6

DATETIME, DATETIME(0), DATETIME(1), DATETIME(2), DATETIME(3)

INT64

io.debezium.time.Timestamp
表示自 epoch 以来的毫秒数,不包含时区信息。

DATETIME(4), DATETIME(5), DATETIME(6)

INT64

io.debezium.time.MicroTimestamp
表示自 epoch 以来的微秒数,不包含时区信息。

time.precision.mode=connect

MySQL 连接器使用定义的 Kafka Connect 逻辑类型。此方法比默认方法精度低,并且如果数据库列的小数秒精度值大于 3,则事件的精度可能会降低。只能处理 00:00:00.00023:59:59.999 范围内的值。仅当您可以确保表中的 TIME 值永远不会超过支持的范围时,才应设置 time.precision.mode=connectconnect 设置预计将在 Debezium 的未来版本中移除。

表 17. time.precision.mode=connect 时的映射
MySQL 类型 文字类型 语义类型

DATE

INT32

org.apache.kafka.connect.data.Date
表示自 epoch 以来的天数。

TIME[(M)]

INT64

org.apache.kafka.connect.data.Time
表示自午夜以来的微秒时间值,不包含时区信息。

DATETIME[(M)]

INT64

org.apache.kafka.connect.data.Timestamp
表示自 epoch 以来的毫秒数,不包含时区信息。

Decimal 类型

Debezium 连接器根据 decimal.handling.mode 连接器配置属性的设置来处理 decimal。

decimal.handling.mode=precise
表 18. decimal.handling.mode=precise 时的映射
MySQL 类型 文字类型 语义类型

NUMERIC[(M[,D])]

BYTES

org.apache.kafka.connect.data.Decimal
scale 模式参数包含一个表示小数点移动了多少位的整数。

DECIMAL[(M[,D])]

BYTES

org.apache.kafka.connect.data.Decimal
scale 模式参数包含一个表示小数点移动了多少位的整数。

decimal.handling.mode=double
表 19. decimal.handling.mode=double 时的映射
MySQL 类型 文字类型 语义类型

NUMERIC[(M[,D])]

FLOAT64

n/a

DECIMAL[(M[,D])]

FLOAT64

n/a

decimal.handling.mode=string
表 20. decimal.handling.mode=string 时的映射
MySQL 类型 文字类型 语义类型

NUMERIC[(M[,D])]

STRING

n/a

DECIMAL[(M[,D])]

STRING

n/a

布尔值

MySQL 以特定方式内部处理 BOOLEAN 值。BOOLEAN 列在内部映射到 TINYINT(1) 数据类型。当流式传输期间创建表时,它会使用正确的 BOOLEAN 映射,因为 Debezium 会接收原始 DDL。在快照期间,Debezium 会执行 SHOW CREATE TABLE 来获取表定义,这些定义会为 BOOLEANTINYINT(1) 列返回 TINYINT(1)。然后,Debezium 无法获得原始类型映射,因此会映射到 TINYINT(1)

为了使您能够将源列转换为布尔数据类型,Debezium 提供了一个 TinyIntOneToBooleanConverter 自定义转换器,您可以通过以下方式之一使用它:

  • 将所有 TINYINT(1)TINYINT(1) UNSIGNED 列映射到 BOOLEAN 类型。

  • 使用逗号分隔的正则表达式列表枚举一组列。
    要使用这种类型的转换,您必须设置 converters 配置属性和 selector 参数,如下例所示

    converters=boolean
    boolean.type=io.debezium.connector.binlog.converters.TinyIntOneToBooleanConverter
    boolean.selector=db1.table1.*, db1.table2.column1
  • 注意:在某些情况下,数据库在快照执行 SHOW CREATE TABLE 时可能不会显示 tinyint unsigned 的长度,这意味着此转换器不起作用。新的选项 length.checker 可以解决此问题,默认值为 true。禁用 length.checker 并将需要转换为 selected 属性的列指定为,而不是基于类型转换所有列,如下例所示:

    converters=boolean
    boolean.type=io.debezium.connector.binlog.converters.TinyIntOneToBooleanConverter
    boolean.length.checker=false
    boolean.selector=db1.table1.*, db1.table2.column1

空间类型

目前,Debezium MySQL 连接器支持以下空间数据类型。

表 21. 空间类型映射描述
MySQL 类型 文字类型 语义类型

GEOMETRY,
LINESTRING,
POLYGON,
MULTIPOINT,
MULTILINESTRING,
MULTIPOLYGON,
GEOMETRYCOLLECTION

STRUCT

io.debezium.data.geometry.Geometry
包含一个具有两个字段的结构:

  • srid (INT32:空间参考系统 ID,定义了存储在结构中的几何对象的类型。

  • wkb (BYTES):以 Well-Known-Binary (wkb) 格式编码的几何对象的二进制表示。有关更多详细信息,请参阅开放地理空间联盟

向量类型

目前,Debezium MySQL 连接器支持以下向量数据类型。

表 22. 向量类型映射说明
MySQL 类型 文字类型 语义类型

VECTOR

ARRAY (FLOAT32)

io.debezium.data.FloatVector

自定义转换器

默认情况下,Debezium MySQL 连接器提供了几种 CustomConverter 实现,用于 MySQL 数据类型。这些自定义转换器根据连接器配置提供特定数据类型的替代映射。要将 CustomConverter 添加到连接器,请按照自定义转换器文档中的说明进行操作。

TINYINT(1) 转布尔值

默认情况下,在连接器快照期间,Debezium MySQL 连接器从 JDBC 驱动程序获取列类型,该驱动程序将 TINYINT(1) 类型分配给 BOOLEAN 列。然后,Debezium 使用这些 JDBC 列类型来定义快照事件的模式。在连接器从快照过渡到流式传输阶段后,默认映射产生的更改事件模式可能导致 BOOLEAN 列的映射不一致。为了帮助确保 MySQL 一致地发出 BOOLEAN 列,您可以应用自定义 TinyIntOneToBooleanConverter,如下面的配置示例所示。

示例:TinyIntOneToBooleanConverter 配置
converters=tinyint-one-to-boolean
tinyint-one-to-boolean.type=io.debezium.connector.binlog.converters.TinyIntOneToBooleanConverter
tinyint-one-to-boolean.selector=.*.MY_TABLE.DATA
tinyint-one-to-boolean.length.checker=false

在上例中,selectorlength.checker 属性是可选的。默认情况下,转换器会检查 TINYINT 数据类型是否符合长度 1。如果将 length.checker 设置为 false,则转换器不会显式确认 TINYINT 数据类型是否符合长度 1selector 根据提供的正则表达式指定要转换的表或列。如果省略 selector 属性,则转换器会将所有 TINYINT 列映射到逻辑 BOOL 字段类型。如果您不配置 selector 选项,并且希望将 TINYINT 列映射到 TINYINT(1),请省略 length.checker 属性,或将其值设置为 true

JDBC 接收器数据类型

如果您将 Debezium JDBC Sink 连接器与 Debezium MySQL Source 连接器集成,MySQL 连接器在快照和流式传输阶段发出列属性的方式不同。为了让 JDBC Sink 连接器一致地消耗来自快照和流式传输阶段的更改,您必须将 JdbcSinkDataTypesConverter 转换器作为 MySQL Source 连接器配置的一部分包含,如下面的示例所示。

示例:JdbcSinkDataTypesConverter 配置
converters=jdbc-sink
jdbc-sink.type=io.debezium.connector.binlog.converters.JdbcSinkDataTypesConverter
jdbc-sink.selector.boolean=.*.MY_TABLE.BOOL_COL
jdbc-sink.selector.real=.*.MY_TABLE.REAL_COL
jdbc-sink.selector.string=.*.MY_TABLE.STRING_COL
jdbc-sink.treat.real.as.double=true

在上例中,selector.*treat.real.as.double 配置属性是可选的。

selector.* 属性指定了逗号分隔的正则表达式列表,这些列表指定了转换器应用于哪些表和列。默认情况下,转换器对所有表中的所有布尔、实数和基于字符串的列数据类型应用以下规则:

  • BOOLEAN 数据类型始终作为 INT16 逻辑类型发出,其中 1 表示 true0 表示 false

  • REAL 数据类型始终作为 FLOAT64 逻辑类型发出。

  • 基于字符串的列始终包含 __debezium.source.column.character_set 模式参数,其中包含列的字符集。

对于每种数据类型,您可以配置一个选择器规则来覆盖默认范围,并仅将选择器应用于特定表和列。例如,要设置布尔转换器的范围,请将以下规则添加到连接器配置中,如上例所示:converters.jdbc-sink.selector.boolean=.*.MY_TABLE.BOOL_COL

设置 MySQL

在安装和运行 Debezium 连接器之前,需要执行一些 MySQL 设置任务。

创建用户

Debezium MySQL 连接器需要一个 MySQL 用户帐户。此 MySQL 用户必须在 Debezium MySQL 连接器捕获更改的所有数据库上具有适当的权限。

先决条件
  • 一个 MySQL 服务器。

  • SQL 命令的基本知识。

过程
  1. 创建 MySQL 用户

    mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
  2. 授予用户所需的权限

    mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';

    有关所需权限的说明,请参阅用户权限说明

    如果使用 Amazon RDS 或 Amazon Aurora 等不允许全局读锁的托管选项,则会使用表级锁来创建一致性快照。在这种情况下,您还需要将 LOCK TABLES 权限授予您创建的用户。有关更多详细信息,请参阅快照
  3. 完成用户权限设置

    mysql> FLUSH PRIVILEGES;
    表 23. 用户权限说明
    Keyword 描述

    SELECT

    允许连接器从数据库中的表中选择行。仅在执行快照时使用。

    RELOAD

    允许连接器使用 FLUSH 语句来清除或重新加载内部缓存、刷新表或获取锁。仅在执行快照时使用。

    SHOW DATABASES

    允许连接器通过发出 SHOW DATABASE 语句来查看数据库名称。仅在执行快照时使用。

    REPLICATION SLAVE

    使连接器能够连接到并读取 MySQL 服务器 binlog。

    REPLICATION CLIENT

    允许连接器使用以下语句:

    • SHOW MASTER STATUS

    • SHOW SLAVE STATUS

    • SHOW BINARY LOGS

    连接器始终需要此权限。

    ON

    标识权限适用的数据库。

    TO 'user'

    指定要授予权限的用户。

    IDENTIFIED BY 'password'

    指定用户的 MySQL 密码。

启用 binlog

您必须为 MySQL 复制启用二进制日志。二进制日志以允许副本传播这些更改的方式记录事务更新。

先决条件
  • 一个 MySQL 服务器。

  • 适当的 MySQL 用户权限。

过程
  1. 检查 log-bin 选项是否已启用

    // for MySQL 5.x
    mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
    FROM information_schema.global_variables WHERE variable_name='log_bin';
    // for MySQL 8.x
    mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
    FROM performance_schema.global_variables WHERE variable_name='log_bin';
  1. 如果 binlog 为 OFF,请将下表中的属性添加到 MySQL 服务器的配置文件中

    server-id         = 223344 # Querying variable is called server_id, e.g. SELECT variable_value FROM information_schema.global_variables WHERE variable_name='server_id';
    log_bin                     = mysql-bin
    binlog_format               = ROW
    binlog_row_image            = FULL
    binlog_expire_logs_seconds  = 864000
  2. 再次检查 binlog 状态以确认您的更改

    // for MySQL 5.x
    mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
    FROM information_schema.global_variables WHERE variable_name='log_bin';
    // for MySQL 8.x
    mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
    FROM performance_schema.global_variables WHERE variable_name='log_bin';
  1. 如果您在 Amazon RDS 上运行 MySQL,则必须为数据库实例启用自动备份才能进行二进制日志记录。即使您应用了先前步骤中描述的设置,如果数据库实例未配置为执行自动备份,binlog 仍将禁用。

    表 24. MySQL binlog 配置属性说明
    属性 描述

    server-id

    server-id 的值对于 MySQL 集群中的每个服务器和复制客户端都必须是唯一的。

    log_bin

    log_bin 的值是 binlog 文件序列的基本名称。

    binlog_format

    binlog-format 必须设置为 ROWrow

    binlog_row_image

    binlog_row_image 必须设置为 FULLfull

    binlog_expire_logs_seconds

    binlog_expire_logs_seconds 对应于已弃用的系统变量 expire_logs_days。这是自动删除 binlog 文件的秒数。默认值为 2592000,等于 30 天。设置值以匹配您环境的需求。有关更多信息,请参阅MySQL 删除 binlog 文件

启用 GTID

全局事务标识符 (GTID) 唯一标识集群内服务器上发生的事务。虽然 Debezium MySQL 连接器不需要 GTID,但使用 GTID 可以简化复制并让您更轻松地确认主服务器和副本服务器是否一致。

GTID 在 MySQL 5.6.5 及更高版本中可用。有关更多详细信息,请参阅MySQL 文档

先决条件
  • 一个 MySQL 服务器。

  • SQL 命令的基本知识。

  • 访问 MySQL 配置文件。

过程
  1. 启用 gtid_mode

    mysql> gtid_mode=ON
  2. 启用 enforce_gtid_consistency

    mysql> enforce_gtid_consistency=ON
  3. 确认更改

    mysql> show global variables like '%GTID%';
    Result
    +--------------------------+-------+
    | Variable_name            | Value |
    +--------------------------+-------+
    | enforce_gtid_consistency | ON    |
    | gtid_mode                | ON    |
    +--------------------------+-------+
    表 25. GTID 选项说明
    Option 描述

    gtid_mode

    布尔值,指定 MySQL 服务器的 GTID 模式是否已启用。

    • ON = 启用

    • OFF = 禁用

    enforce_gtid_consistency

    布尔值,指定服务器是否通过允许执行可以以事务安全方式记录的语句来强制 GTID 一致性。使用 GTID 时必需。

    • ON = 启用

    • OFF = 禁用

配置会话超时

当大型数据库进行初始一致性快照时,已建立的连接可能会在表读取期间超时。您可以通过在 MySQL 配置文件中配置 interactive_timeoutwait_timeout 来防止此行为。

先决条件
  • 一个 MySQL 服务器。

  • SQL 命令的基本知识。

  • 访问 MySQL 配置文件。

过程
  1. 配置 interactive_timeout

    mysql> interactive_timeout=<duration-in-seconds>
  2. 配置 wait_timeout

    mysql> wait_timeout=<duration-in-seconds>
    表 26. MySQL 会话超时选项说明
    Option 描述

    interactive_timeout

    服务器在关闭交互式连接之前等待其上活动的时间(秒)。:leveloffset: +1

    有关更多信息,请参阅MySQL 文档

    wait_timeout

    服务器在非交互式连接关闭前等待活动的时间(以秒为单位)。有关更多信息,请参阅MySQL 文档

启用查询日志事件

您可能希望查看每个 binlog 事件的原始 SQL 语句。在 MySQL 配置文件中启用 binlog_rows_query_log_events 选项可让您做到这一点。

此选项在 MySQL 5.6 及更高版本中可用。

先决条件
  • 一个 MySQL 服务器。

  • SQL 命令的基本知识。

  • 访问 MySQL 配置文件。

过程
  • 在 MySQL 中启用 binlog_rows_query_log_events

    mysql> binlog_rows_query_log_events=ON

    binlog_rows_query_log_events 设置为一个值,该值启用/禁用在 binlog 条目中包含原始 SQL 语句的支持。

    • ON = 启用

    • OFF = 禁用

验证 binlog 行值选项

验证数据库中 binlog_row_value_options 变量的设置。为了使连接器能够消费 UPDATE 事件,此变量必须设置为 PARTIAL_JSON 以外的值。

先决条件
  • 一个 MySQL 服务器。

  • SQL 命令的基本知识。

  • 访问 MySQL 配置文件。

过程
  1. 检查当前变量值

    mysql> show global variables where variable_name = 'binlog_row_value_options';
    Result
    +--------------------------+-------+
    | Variable_name            | Value |
    +--------------------------+-------+
    | binlog_row_value_options |       |
    +--------------------------+-------+
  2. 如果变量的值设置为 PARTIAL_JSON,请运行以下命令取消设置:

    mysql> set @@global.binlog_row_value_options="" ;

部署

要部署 Debezium MySQL 连接器,请安装 Debezium MySQL 连接器存档,配置连接器,并通过将配置添加到 Kafka Connect 来启动连接器。

先决条件
过程
  1. 下载 Debezium MySQL 连接器插件

  2. 将文件解压到您的 Kafka Connect 环境。

  3. 将包含 JAR 文件的目录添加到Kafka Connect 的 plugin.path

  4. 配置连接器将配置添加到您的 Kafka Connect 集群

  5. 重新启动您的 Kafka Connect 进程以加载新 JAR 文件。

如果您使用的是不可变容器,请参阅Debezium 的容器映像,其中已安装 Apache Kafka MySQL 和 Kafka Connect(已安装 MySQL 连接器)并可运行。

quay.io 获取的 Debezium 容器映像未经严格测试或安全分析,仅供测试和评估目的使用。这些映像不适用于生产环境。为降低生产部署中的风险,请仅部署由受信任供应商积极维护并经过彻底漏洞分析的容器。

MySQL 连接器配置示例

以下是连接器实例的配置示例,该实例捕获来自 192.168.99.100 上端口 3306 的 MySQL 服务器的数据,我们将其逻辑命名为 fullfillment。通常,您会在 JSON 文件中配置 Debezium MySQL 连接器,方法是设置可用于该连接器的配置属性。

您可以选择为数据库中的模式和表的子集生成事件。可选地,您可以忽略、掩盖或截断包含敏感数据、大于指定大小或您不需要的列。

{
    "name": "inventory-connector", (1)
    "config": {
        "connector.class": "io.debezium.connector.{context}.{connector-name}Connector", (2)
        "database.hostname": "192.168.99.100", (3)
        "database.port": "3306", (4)
        "database.user": "debezium-user", (5)
        "database.password": "debezium-user-pw", (6)
        "database.server.id": "184054", (7)
        "topic.prefix": "fullfillment", (8)
        "database.include.list": "inventory", (9)
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", (10)
        "schema.history.internal.kafka.topic": "schemahistory.fullfillment", (11)
        "include.schema.changes": "true" (12)
    }
}
1 连接器在注册到 Kafka Connect 服务时的名称。
2 连接器的类名。
3 MySQL 服务器地址。
4 MySQL 服务器端口号。
5 具有适当权限的 MySQL 用户。
6 MySQL 用户的密码。
7 连接器的唯一 ID。
8 MySQL 服务器或集群的主题前缀。
9 指定服务器托管的数据库列表。
10 连接器用于将 DDL 语句写入和恢复到数据库模式历史记录主题的 Kafka Broker 列表。
11 数据库模式历史记录主题的名称。此主题仅供内部使用,不应被使用者使用。
12 指定连接器是否为 DDL 更改生成事件并将其发送到与模式前缀同名的模式更改主题以供使用者使用

的标志。

有关您可以为 Debezium MySQL 连接器设置的配置属性的完整列表,请参阅MySQL 连接器配置属性

您可以使用 POST 命令将此配置发送到正在运行的 Kafka Connect 服务。该服务会记录配置并启动一个连接器任务,该任务执行以下操作:

  • 连接到 MySQL 数据库。

  • 读取处于捕获模式的表的更改数据表。

  • 将更改事件记录流式传输到 Kafka 主题。

添加连接器配置

要开始运行 MySQL 连接器,请配置连接器配置,并将配置添加到您的 Kafka Connect 集群。

先决条件
过程
  1. 为 MySQL 连接器创建配置。

  2. 使用Kafka Connect REST API 将该连接器配置添加到您的 Kafka Connect 群集。

结果

连接器启动后,它会执行配置的 MySQL 数据库的一致性快照。然后,连接器开始为行级操作生成数据更改事件,并将更改事件记录流式传输到 Kafka 主题。

连接器属性

Debezium MySQL 连接器有许多配置属性,您可以使用它们来实现适用于您应用程序的正确连接器行为。许多属性都有默认值。

MySQL 连接器配置属性信息组织如下:

必需的 Debezium MySQL 连接器配置属性

以下配置属性是必需的,除非有默认值可用。

bigint.unsigned.handling.mode
Default value

long

描述

指定连接器如何在更改事件中表示 BIGINT UNSIGNED 列。

设置以下选项之一:

long

使用 Java long 数据类型来表示 BIGINT UNSIGNED 列值。尽管 long 类型没有最大的精度,但在大多数消费者中很容易实现。在大多数环境中,这是首选设置。

precise

使用 java.math.BigDecimal 数据类型来表示值。连接器使用 Kafka Connect org.apache.kafka.connect.data.Decimal 数据类型以二进制编码格式表示值。如果连接器通常处理大于 2^63 的值,请设置此选项。long 数据类型无法传达如此大的值。

binary.handling.mode
Default value

bytes

描述

指定连接器如何在更改事件中表示二进制列(例如 blobbinaryvarbinary)的值。

设置以下选项之一:

bytes

将二进制数据表示为字节数组。

base64

将二进制数据表示为 base64 编码的字符串。

base64-url-safe

将二进制数据表示为 base64-url 安全编码的字符串。

hex

将二进制数据表示为十六进制编码(base16)的字符串。

column.exclude.list
Default value

空字符串

描述

一个可选的、逗号分隔的正则表达式列表,匹配要从更改事件记录值中排除的列的完全限定名称。源记录中的其他列照常捕获。列的完全限定名称格式为 databaseName.tableName.columnName

为了匹配列名,Debezium 将您指定的正则表达式作为锚定正则表达式匹配。也就是说,指定的表达式与列的整个名称字符串进行匹配;它不匹配可能存在于列名中的子字符串。如果您在此配置中包含此属性,请不要同时设置 column.include.list 属性。

column.include.list
Default value

空字符串

描述

一个可选的、逗号分隔的正则表达式列表,匹配要包含在更改事件记录值中的列的完全限定名称。其他列将从事件记录中省略。列的完全限定名称格式为 databaseName.tableName.columnName

为了匹配列名,Debezium 将您指定的正则表达式作为锚定正则表达式匹配。也就是说,指定的表达式与列的整个名称字符串进行匹配;它不匹配可能存在于列名中的子字符串。
如果您在此配置中包含此属性,请不要设置 column.exclude.list 属性。

column.mask.hash.v2.hashAlgorithm.with.salt.salt
Default value

无默认值

描述

一个可选的、逗号分隔的正则表达式列表,匹配基于字符的列的完全限定名称。列的完全限定名称格式为 <databaseName>.<tableName>.<columnName>

为了匹配列名,Debezium 将您指定的正则表达式作为锚定正则表达式匹配。也就是说,指定的表达式与列的整个名称字符串进行匹配;它不匹配可能存在于列名中的子字符串。在生成的更改事件记录中,指定列的值将被替换为假名。

假名由应用指定的hashAlgorithmsalt 产生的哈希值组成。根据使用的哈希函数,可以维护引用完整性,同时用假名替换列值。支持的哈希函数在 Java Cryptography Architecture 标准算法名称文档的MessageDigest 部分中进行了描述。

在下面的示例中,CzQMA0cB5K 是随机选择的 salt。

column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName

如果需要,假名会自动截断为列的长度。连接器配置可以包含多个指定不同哈希算法和 salt 的属性。

取决于使用的hashAlgorithm、选择的salt 和实际数据集,生成的数据集可能无法完全隐藏。

哈希策略版本 2 确保在不同位置或系统中哈希的值的保真度。

column.mask.with.length.chars
Default value

无默认值

描述

一个可选的、逗号分隔的正则表达式列表,匹配基于字符的列的完全限定名称。如果您希望连接器掩盖一组列的值,请设置此属性(例如,如果它们包含敏感数据)。将 length 设置为正整数,以将指定列中的数据替换为属性名称中由 length 指定的星号(*)字符的数量。将 length 设置为 0(零)以将指定列中的数据替换为空字符串。

列的完全限定名称遵循以下格式:databaseName.tableName.columnName。为了匹配列名,Debezium 将您指定的正则表达式作为锚定正则表达式匹配。也就是说,指定的表达式与列的整个名称字符串进行匹配;它不匹配可能存在于列名中的子字符串。

您可以在单个配置中指定多个具有不同长度的属性。

column.propagate.source.type
Default value

无默认值

描述

一个可选的、逗号分隔的正则表达式列表,匹配您希望连接器为其发出表示列元数据的额外参数的列的完全限定名称。设置此属性时,连接器会将以下字段添加到事件记录的模式中:

  • __debezium.source.column.type

  • __debezium.source.column.length

  • __debezium.source.column.scale

    这些参数分别传播列的原始类型名称和长度(对于可变宽度类型)。

    启用连接器发出此额外数据有助于正确确定接收器数据库中特定数字或基于字符的列的大小。

    列的完全限定名称遵循以下格式之一:databaseName.tableName.columnName,或 databaseName.schemaName.tableName.columnName

    为了匹配列名,Debezium 将您指定的正则表达式作为锚定正则表达式匹配。也就是说,指定的表达式与列的整个名称字符串进行匹配;它不匹配可能存在于列名中的子字符串。

column.truncate.to.length.chars
Default value

无默认值

描述

一个可选的、逗号分隔的正则表达式列表,匹配基于字符的列的完全限定名称。如果您希望在列数据超过属性名称中由 length 指定的字符数时截断这些列中的数据,请设置此属性。将 length 设置为正整数值,例如 column.truncate.to.20.chars

列的完全限定名称遵循以下格式:databaseName.tableName.columnName。为了匹配列名,Debezium 将您指定的正则表达式作为锚定正则表达式匹配。也就是说,指定的表达式与列的整个名称字符串进行匹配;它不匹配可能存在于列名中的子字符串。

您可以在单个配置中指定多个具有不同长度的属性。

connect.timeout.ms
Default value

30000(30 秒)

描述

一个正整数值,指定连接器在连接请求超时之前等待连接到 MySQL 数据库服务器的最大时间(以毫秒为单位)。

connector.class
Default value

无默认值

描述

连接器的 Java 类名。对于 MySQL 连接器,始终指定 io.debezium.connector.mysql.MySqlConnector

database.exclude.list
Default value

空字符串

描述

一个可选的、逗号分隔的正则表达式列表,匹配您不希望连接器从中捕获更改的数据库名称。连接器捕获未在 database.exclude.list 中命名的任何数据库中的更改。

为了匹配数据库名称,Debezium 将您指定的正则表达式作为锚定正则表达式匹配。也就是说,指定的表达式与数据库的整个名称字符串进行匹配;它不匹配可能存在于数据库名中的子字符串。

如果您在此配置中包含此属性,请不要同时设置 database.include.list 属性。

database.hostname
Default value

无默认值

描述

MySQL 数据库服务器的 IP 地址或主机名。

database.include.list
Default value

空字符串

描述

一个可选的、逗号分隔的正则表达式列表,匹配连接器从中捕获更改的数据库名称。名称不在 database.include.list 中的任何数据库的更改都不会被捕获。默认情况下,连接器捕获所有数据库中的更改。

为了匹配数据库名称,Debezium 将您指定的正则表达式作为锚定正则表达式匹配。也就是说,指定的表达式与数据库的整个名称字符串进行匹配;它不匹配可能存在于数据库名中的子字符串。

如果您在此配置中包含此属性,请不要同时设置 database.exclude.list 属性。

database.jdbc.driver
Default value

com.mysql.cj.jdbc.Driver

描述

指定连接器使用的驱动程序类名。

如果配置的驱动程序不是随连接器打包的驱动程序,请设置此属性。

database.password
Default value

无默认值

描述

连接器用于连接 MySQL 数据库服务器的 MySQL 用户的密码。

database.port
Default value

3306

描述

MySQL 数据库服务器的整数端口号。

database.protocol
Default value

jdbc:mysql

描述

指定驱动程序连接字符串用于连接数据库的 JDBC 协议。

database.server.id
Default value

无默认值

描述

此数据库客户端的数字 ID。指定的 ID 在 MySQL 集群中所有当前运行的数据库进程中必须是唯一的。要启用从 binlog 读取,连接器使用此唯一 ID 作为另一个服务器加入 MySQL 数据库集群。

database.user
Default value

无默认值

描述

连接器用于连接 MySQL 数据库服务器的 MySQL 用户名。

decimal.handling.mode
Default value

precise

描述

指定连接器如何在更改事件中处理 DECIMALNUMERIC 列的值。

设置以下选项之一:

precise

使用

二进制形式的 java.math.BigDecimal 值来精确表示值。

double

使用 double 数据类型来表示值。此选项可能会导致精度损失,但大多数消费者更容易使用。

string

将值编码为格式化字符串。此选项易于消费,但可能会导致真实类型的语义信息丢失。

event.deserialization.failure.handling.mode 已弃用
Default value

fail

描述

指定在反序列化 binlog 事件期间发生异常后连接器的反应。

此选项已弃用。

请改用 event.processing.failure.handling.mode 属性。

此选项接受以下选项:

fail

传播异常,指示有问题的事件及其 binlog 偏移量,并导致连接器停止。

warn

记录有问题的事件及其 binlog 偏移量,然后跳过该事件。

ignore

忽略有问题的事件,并且不记录任何内容。

field.name.adjustment.mode
Default value

无默认值

描述

指定字段名称应如何调整以与连接器使用的消息转换器兼容。

设置以下选项之一:

none

无调整。

avro

将 Avro 名称中无效的字符替换为下划线字符。

avro_unicode

将下划线字符或不能在 Avro 名称中使用的字符替换为相应的 Unicode,例如 _uxxxx

下划线字符(_)表示转义序列,类似于 Java 中的反斜杠。

有关更多信息,请参阅:Avro 命名

gtid.source.excludes
Default value

无默认值

描述

用逗号分隔的正则表达式列表,匹配连接器用于在 MySQL 服务器上查找 binlog 位置的 GTID 集合中的源域 ID。设置此属性时,连接器仅使用源 UUID 不匹配任何指定 exclude 模式的 GTID 范围。

为了匹配 GTID 的值,Debezium 将您指定的正则表达式作为锚定正则表达式匹配。也就是说,指定的表达式与 GTID 的域标识符进行匹配。

如果设置此属性,请不要同时设置 gtid.source.includes 属性。

gtid.source.includes
Default value

无默认值

描述

用逗号分隔的正则表达式列表,匹配连接器用于在 MySQL 服务器上查找 binlog 位置的 GTID 集合中的源域 ID。设置此属性时,连接器仅使用源 UUID 匹配任何指定 include 模式的 GTID 范围。

为了匹配 GTID 的值,Debezium 将您指定的正则表达式作为锚定正则表达式匹配。也就是说,指定的表达式与 GTID 的域标识符进行匹配。

如果设置此属性,请不要同时设置 gtid.source.excludes 属性。

include.query
Default value

false

描述

一个布尔值,指定连接器发出的更改事件是否包含生成更改的 SQL 查询。

将此属性设置为 true 可能会暴露您通过其他设置显式排除或掩盖的表或字段的信息。

要启用此属性,数据库属性 binlog_annotate_row_events 必须设置为 ON

设置此属性对快照过程生成的事件没有影响。快照事件不包含原始 SQL 查询。

有关配置数据库以返回每个日志事件的原始 SQL 语句的更多信息,请参阅启用查询日志事件

include.schema.changes
Default value

true

描述

一个布尔值,指定连接器是否将数据库模式中的更改发布到与主题前缀同名的 Kafka 主题。连接器将每个模式更改记录下来,键包含数据库名称,值为描述模式更新的 JSON 结构。此记录模式更改的机制独立于连接器对数据库模式历史记录更改的内部记录。

include.schema.comments
Default value

false

描述

一个布尔值,指定连接器是否解析和发布元数据对象上的表和列注释。

当您将此选项设置为 true 时,连接器包含的模式注释会为每个模式对象添加大量字符串数据。增加逻辑模式对象的数量和大小会增加连接器使用的内存量。
inconsistent.schema.handling.mode
Default value

fail

描述

指定连接器如何响应引用数据库中不存在的表的 binlog 事件。也就是说,内部表示与数据库不一致。

设置以下选项之一:

fail

连接器会抛出异常,报告有问题的事件及其 binlog 偏移量。然后连接器会停止。

warn

连接器记录有问题的事件及其 binlog 偏移量,然后跳过该事件。

skip

连接器跳过有问题的事件,并且不在日志中报告它。

message.key.columns
Default value

无默认值

描述

一组表达式,用于指定连接器用于为发布到指定表 Kafka 主题的更改事件记录形成自定义消息键的列。默认情况下,Debezium 使用表的

主键列作为其发出的记录的消息键。要代替默认值,或为缺少主键的表指定键,您可以配置基于一个或多个列的自定义消息键。

要为表建立自定义消息键,请列出该表,然后列出用作消息键的列。每个列表条目采用以下格式:

<fully-qualified_tableName>:<keyColumn>,<keyColumn>

要基于多个列名设置表键,请在列名之间插入逗号。

每个完全限定的表名都是一个正则表达式,格式如下:

<databaseName>.<tableName>

该属性可以包含多个表的条目。使用分号分隔列表中的表条目。

以下示例设置了 inventory.customerspurchase.orders 表的消息键:

inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4

对于 inventory.customer 表,将 pk1pk2 列指定为消息键。对于任何数据库中的 purchaseorders 表,将 pk3pk4 列用作消息键。

用于创建自定义消息键的列数没有限制。但是,最好使用唯一键所需的最小数量。

name (名称)
Default value

无默认值

描述

连接器的唯一名称。如果尝试使用相同的名称注册多个连接器,注册将失败。所有 Kafka Connect 连接器都要求提供此属性。

schema.name.adjustment.mode
Default value

无默认值

描述

指定连接器如何调整模式名称以与连接器使用的消息转换器兼容。

设置以下选项之一:

none

无调整。

avro

将 Avro 名称中无效的字符替换为下划线字符。

avro_unicode

将下划线字符或不能用于 Avro 名称的字符替换为相应的 Unicode 字符,例如 _uxxxx

_ 是一个转义序列,类似于 Java 中的反斜杠。
skip.messages.without.change
Default value

false

描述

指定连接器在未检测到包含列更改时是否为记录发出消息。如果列列在 column.include.list 中,或者未列在 column.exclude.list 中,则认为这些列是包含的。将值设置为 true 可防止连接器在包含的列中没有更改时捕获记录。

table.exclude.list
Default value

空字符串

描述

一个可选的、逗号分隔的正则表达式列表,匹配完全限定的表标识符,这些标识符来自您不希望连接器捕获更改的表。连接器捕获 table.exclude.list 中未包含的任何表中的更改。每个标识符的格式为 databaseName.tableName

为了匹配列名,Debezium 会将您指定的正则表达式应用为锚定正则表达式。也就是说,指定的表达式会与表的整个名称字符串进行匹配;它不会匹配表中可能存在的子字符串。

如果设置了此属性,则不要同时设置 table.include.list 属性。

table.include.list
Default value

空字符串

描述

一个可选的、逗号分隔的正则表达式列表,匹配您希望捕获更改的表的完全限定表标识符。连接器不捕获 table.include.list 中未包含的任何表中的更改。每个标识符的格式为 databaseName.tableName。默认情况下,连接器捕获它配置为捕获更改的每个数据库中所有非系统表中的更改。

为了匹配表名,Debezium 会将您指定的正则表达式应用为锚定正则表达式。也就是说,指定的表达式会与表的整个名称字符串进行匹配;它不会匹配表中可能存在的子字符串。

如果设置了此属性,则不要同时设置 table.exclude.list 属性。

tasks.max
Default value

1

描述

要为该连接器创建的最大任务数。由于 MySQL 连接器始终使用单个任务,因此更改默认值无效。

time.precision.mode
Default value

adaptive_time_microseconds

描述

指定连接器用于表示时间、日期和时间戳值的精度类型。

设置以下选项之一:

adaptive_time_microseconds

连接器使用数据库中存在的毫秒、微秒或纳秒精度值(取决于数据库列的类型)精确捕获日期、datetime 和 timestamp 值,但 TIME 类型字段除外,它始终以微秒捕获。

adaptive

(已弃用) 连接器使用数据库中存在的毫秒、微秒或纳秒精度值(取决于列的数据类型)精确捕获时间和时间戳值。

connect

连接器始终使用 Kafka Connect 内置的时间、日期和时间戳表示形式来表示时间和时间戳值,无论数据库列的精度如何,这些表示形式都使用毫秒精度。

tombstones.on.delete
Default value

true

描述

指定 delete 事件之后是否跟随一个 tombstone 事件。源记录被删除后,连接器可以发出一个 tombstone 事件(默认行为),以便在主题启用了日志压缩时,Kafka 可以完全删除与已删除行的键相关的所有事件。

设置以下选项之一:

true

连接器通过发出 delete 事件和随后的 tombstone 事件来表示删除操作。

false

连接器仅发出 delete 事件。

topic.prefix
Default value

无默认值

描述

一个字符串,指定 Debezium 捕获更改的 MySQL 数据库服务器或集群的命名空间。由于主题前缀用于命名此连接器发出的所有事件的 Kafka 主题,因此主题前缀在所有连接器中都是唯一的非常重要。值必须仅包含字母数字字符、连字符、点和下划线。

设置此属性后,请勿更改其值。如果更改值,在连接器重新启动后,连接器将不会继续将事件发出到原始主题,而是将后续事件发出到基于新值命名的主题。连接器也无法恢复其数据库模式历史主题。

高级 Debezium MySQL 连接器配置属性

以下列表描述了高级 MySQL 连接器配置属性。这些属性的默认值很少需要更改。因此,您无需在连接器配置中指定它们。

binlog.buffer.size
Default value

0

描述

binlog 读取器使用的前瞻缓冲区的大小。默认设置 0 会禁用缓冲。

在特定条件下,MySQL binlog 可能包含由 ROLLBACK 语句完成的未提交数据。典型示例是使用保存点或在单个事务中混合临时表和常规表更改。

当检测到事务开始时,Debezium 会尝试滚动 binlog 位置,并查找 COMMITROLLBACK,以便确定是否应流式传输事务中的更改。事务缓冲区的大小定义了 Debezium 在查找事务边界时可以缓冲的最大事务更改数。如果事务的大小大于缓冲区,则 Debezium 在流式传输时必须回溯并重新读取未包含在缓冲区中的事件。

此功能处于孵化阶段。鼓励提供反馈。预计此功能不完全成熟。
connect.keep.alive
Default value

true

描述

一个布尔值,指定是否应使用单独的线程来确保与 MySQL 服务器或集群的连接保持活动状态。

converters
Default value

无默认值

描述

列出连接器可以使用自定义转换器实例的符号名称,用逗号分隔。例如,boolean

需要此属性才能使连接器能够使用自定义转换器。

对于为连接器配置的每个转换器,您还必须添加一个 .type 属性,该属性指定实现转换器接口的类的完全限定名称。.type 属性使用以下格式:

<converterSymbolicName>.type

For example, (例如,)

boolean.type: io.debezium.connector.binlog.converters.TinyIntOneToBooleanConverter

如果您想进一步控制已配置转换器的行为,可以添加一个或多个配置参数来将值传递给转换器。要将这些附加配置参数与转换器关联,请在参数名称前加上转换器的符号名称。

例如,要定义一个 selector 参数来指定 boolean 转换器处理的列子集,请添加以下属性:

boolean.selector=db1.table1.*, db1.table2.column1
custom.metric.tags
Default value

无默认值

描述

定义用于通过添加提供上下文信息的元数据来定制 MBean 对象名称的标签。指定键值对的逗号分隔列表。每个键代表 MBean 对象名称的标签,相应的值代表该键的值,例如 k1=v1,k2=v2

连接器将指定的标签附加到基础 MBean 对象名称。标签可以帮助您组织和分类指标数据。您可以定义标签来标识特定的应用程序实例、环境、区域、版本等。

有关更多信息,请参阅自定义 MBean 名称

database.initial.statements
Default value

无默认值

描述

一个分号分隔的 SQL 语句列表,当建立与数据库的 JDBC 连接(不是读取事务日志的连接)时执行。要将分号指定为 SQL 语句中的字符而不是分隔符,请使用两个分号 (;;)。

连接器可能会自行决定建立 JDBC 连接,因此此属性仅用于配置会话参数。它不用于执行 DML 语句。

database.query.timeout.ms
Default value

600000 (10 分钟)

描述

指定连接器等待查询完成的时间(以毫秒为单位)。

将值设置为 0(零)可删除超时限制。

database.ssl.keystore
Default value

无默认值

描述

一个可选设置,指定密钥存储文件的位置。密钥存储文件可用于客户端与 MySQL 服务器之间的双向身份验证。

database.ssl.keystore.password
Default value

无默认值

描述

密钥存储文件的密码。仅当配置了 database.ssl.keystore 时才指定密码。

database.ssl.mode
Default value

preferred

描述

指定连接器是否使用加密连接。

以下是可用设置:

disabled

指定使用未加密的连接。

preferred

如果服务器支持安全连接,连接器将建立加密连接。如果服务器不支持安全连接,连接器将回退到使用未加密连接。

required

连接器建立加密连接。如果无法建立加密连接,连接器将失败。

verify_ca

连接器行为与设置 required 选项时相同,但它还会根据配置的证书颁发机构 (CA) 证书验证服务器 TLS 证书。如果服务器 TLS 证书与任何有效的 CA 证书都不匹配,连接器将失败。

verify_identity

连接器的行为与设置 verify_ca 选项时相同,但它还会验证服务器证书是否与远程连接的主机匹配。

database.ssl.truststore
Default value

无默认值

描述

用于服务器证书验证的信任库文件的位置。

database.ssl.truststore.password
Default value

无默认值

描述

信任库文件的密码。用于检查信任库的完整性并解锁信任库。

enable.time.adjuster
Default value

true

描述

一个布尔值,指示连接器是否将 2 位年份表示转换为 4 位年份。当转换完全委托给数据库时,将值设置为 false

MySQL 用户可以使用 2 位或 4 位数字插入年份值。2 位值映射到 1970 - 2069 年范围内的年份。默认情况下,连接器会执行转换。

errors.max.retries
Default value

-1

描述

指定连接器在操作导致可重试错误(例如连接错误)后如何响应。

设置以下选项之一:

-1

无限制。连接器将自动重新启动,并重试操作,无论先前的失败次数如何。

0

禁用。连接器将立即失败,并且永远不会重试操作。需要用户干预才能重新启动连接器。

>0

连接器将自动重新启动,直到达到指定的最大重试次数。下一次失败后,连接器将停止,需要用户干预才能重新启动它。

event.converting.failure.handling.mode
Default value

warn

描述

指定连接器由于列数据类型与 Debezium 内部模式指定的类型不匹配而无法转换表记录时如何响应。设置以下选项之一:

fail

一个异常报告转换失败,因为字段的数据类型与模式类型不匹配,并指出可能需要在 schema _only_recovery 模式下重新启动连接器以实现成功转换。

warn

连接器将 null 值写入事件字段,用于转换失败的列,并将一条消息写入警告日志。

skip

连接器将 null 值写入事件字段,用于转换失败的列,并将一条消息写入调试日志。

event.processing.failure.handling.mode
Default value

fail

描述

指定连接器在处理事件时发生的故障(例如,遇到损坏的事件)如何处理。以下是可用设置:

fail

连接器将引发一个报告有问题的事件及其位置的异常。然后连接器停止。

warn

连接器不引发异常。相反,它会记录有问题的事件及其位置,然后跳过该事件。

ignore

连接器忽略有问题的事件,并且不生成日志条目。

heartbeat.action.query
Default value

无默认值

描述

指定连接器在发送心跳消息时在源数据库上执行的查询。

例如,以下查询定期捕获源数据库中已执行 GTID 集合的状态。

INSERT INTO gtid_history_table (select @gtid_executed)

heartbeat.interval.ms
Default value

0

描述

指定连接器将心跳消息发送到 Kafka 主题的频率。默认情况下,连接器不发送心跳消息。

心跳消息对于监视连接器是否正在接收数据库的更改事件很有用。心跳消息可能有助于减少连接器重新启动时需要重新发送的更改事件的数量。要发送心跳消息,请将此属性设置为正整数,表示心跳消息之间的毫秒数。

incremental.snapshot.allow.schema.changes
Default value

false

描述

指定连接器是否允许在增量快照期间进行模式更改。当值设置为 true 时,连接器会在增量快照期间检测到模式更改,并重新选择当前块以避免锁定 DDL。

不支持主键的更改。在增量快照期间更改主键可能会导致不正确的结果。进一步的限制是,如果模式更改仅影响列的默认值,则直到从 binlog 流处理 DDL 时才会检测到该更改。这不会影响快照事件的值,但这些快照事件的模式可能包含过时的默认值。

incremental.snapshot.chunk.size
Default value

1024

描述

连接器在检索增量快照块时获取并加载到内存中的最大行数。增加块大小可提高效率,因为快照运行的快照查询数量更少,但每次查询的规模更大。但是,更大的块大小也需要更多内存来缓冲快照数据。将块大小调整为在您的环境中提供最佳性能的值。

incremental.snapshot.watermarking.strategy
Default value

insert_insert

描述

指定连接器在增量快照期间使用的水印机制,用于对可能被增量快照捕获,然后在流式传输恢复后重新捕获的事件进行去重。

您可以指定以下选项之一:

insert_insert (默认)

当您发送信号以启动增量快照时,对于 Debezium 在快照期间读取的每个块,它都会在信号数据集合中写入一个条目,以记录打开快照窗口的信号。快照完成后,Debezium 会插入第二个条目,记录关闭窗口的信号。

insert_delete

当您发送信号以启动增量快照时,对于 Debezium 读取的每个块,它都会在信号数据集合中写入一个条目,以记录打开快照窗口的信号。快照完成后,此条目将被删除。不会为关闭快照窗口的信号创建条目。设置此选项可防止信号数据集合快速增长。

max.batch.size
Default value

2048

描述

正整数值,指定在此连接器的每次迭代中处理的事件批次的最大大小。

max.queue.size
Default value

8192

描述

一个正整数值,指定阻塞队列可以容纳的最大记录数。当 Debezium 读取从数据库流式传输的事件时,它会将事件放入阻塞队列,然后再将它们写入 Kafka。在连接器以比写入 Kafka 更快的速度摄取消息或 Kafka 不可用时,阻塞队列可以为从数据库读取更改事件提供反压。在连接器定期记录偏移量时,会忽略队列中保存的事件。始终将 max.queue.size 设置为大于 max.batch.size 的值。

max.queue.size.in.bytes
Default value

0

描述

一个长整数值,指定阻塞队列的最大卷(以字节为单位)。默认情况下,不为阻塞队列指定卷限制。要指定队列可以消耗的字节数,请将此属性设置为正长值。如果还设置了 max.queue.size,则当队列中的记录数达到任一属性指定的限制时,写入队列将被阻止。例如,如果设置 max.queue.size=1000max.queue.size.in.bytes=5000,则当队列包含 1000 条记录,或当队列中的记录卷达到 5000 字节时,写入队列将被阻止。

min.row.count.to.stream.results
Default value

1000

描述

在快照期间,连接器会查询连接器配置为捕获更改的所有表。连接器使用每个查询结果生成一个读取事件,该事件包含该表中所有行的数据。此属性决定了 MySQL 连接器是将表的结果集放入内存(这很快,但需要大量内存)还是流式传输结果(这可能较慢,但适用于非常大的表)。此属性的设置指定了表必须包含的最小行数,然后连接器才会流式传输结果。

要跳过所有表大小检查并始终流式传输所有结果,请将此属性设置为 0

notification.enabled.channels
Default value

无默认值

描述

为连接器启用的通知通道名称列表。默认情况下,以下通道可用:

  • sink

  • log

  • jmx

您还可以选择实现自定义通知通道

poll.interval.ms
Default value

500 (0.5 秒)

描述

正整数值,指定在处理事件批次之前,连接器等待新更改事件出现的时间(以毫秒为单位)。

provide.transaction.metadata
Default value

false

描述

确定连接器是否生成带有事务边界的事件,并使用事务元数据丰富更改事件信封。如果您希望连接器执行此操作,请指定 true。有关更多信息,请参阅事务元数据

read.only
Default value

false

描述

指定连接器是否将水印写入信号数据集合以跟踪增量快照的进度。将值设置为 true 可使具有数据库只读连接的连接器使用不需要写入信号数据集合的增量快照水印策略。

signal.data.collection
Default value

无默认值

描述

用于将信号发送到连接器的数据集合的完全限定名称。使用以下格式指定集合名称:

<databaseName>.<tableName>

signal.enabled.channels
Default value

无默认值

描述

为连接器启用的信号通道名称列表。默认情况下,以下通道可用:

  • source (源)

  • kafka

  • file

  • jmx

您还可以选择实现自定义信号通道

skipped.operations
Default value

t

描述

要让连接器在流式传输期间跳过的操作类型的逗号分隔列表。

设置以下选项之一以指定要跳过的操作:

c

插入/创建操作。

u

更新操作。

d

删除操作。

t

截断操作。

none

连接器不跳过任何操作。

snapshot.delay.ms
Default value

无默认值

描述

连接器启动时执行快照之前应等待的毫秒数间隔。如果要启动集群中的多个连接器,此属性有助于避免快照中断,这可能会导致连接器重新平衡。

snapshot.fetch.size
Default value

未设置

描述

默认情况下,在快照期间,连接器会分批读取表内容。设置此属性可指定批次中的最大行数。

为保持连接器性能,最好保留此属性的未设置默认值。此默认配置使 MySQL 能够一次一行地将结果集流式传输到 Debezium。相比之下,如果您设置了此属性,可能会导致性能问题,因为 Debezium 会尝试一次性将整个结果集提取到内存中

snapshot.include.collection.list
Default value

table.include.list 中指定的所有表。

描述

一个可选的、用逗号分隔的正则表达式列表,匹配要包含在快照中的表的完全限定名称(<databaseName>.<tableName>)。指定的项必须在连接器的 table.include.list 属性中命名

此属性仅在连接器的 snapshot.mode 属性设置为除 never 以外的值时生效。此属性不影响增量快照的行为。

为了匹配表名,Debezium 会将您指定的正则表达式应用为锚定正则表达式。也就是说,指定的表达式会与表的整个名称字符串进行匹配;它不会匹配表中可能存在的子字符串。

snapshot.lock.timeout.ms
Default value

10000

描述

正整数,指定执行快照时获取表锁的最大等待时间(以毫秒为单位)。如果连接器在此时间间隔内无法获取表锁,则快照将失败。

有关更多信息,请参阅描述 MySQL 连接器如何执行数据库快照的文档。

snapshot.locking.mode
Default value

minimal

描述

指定连接器在执行快照期间阻止任何数据库更新的全局 MySQL 读锁的持有时间

以下是可用设置:

minimal

连接器仅在读取数据库模式和其他元数据的快照的初始阶段持有全局读锁。在快照的下一阶段,连接器在选择所有行后释放锁。为了以一致的方式执行 SELECT 操作,连接器使用 REPEATABLE READ 事务。尽管全局读锁的释放允许其他 MySQL 客户端更新数据库,但 REPEATABLE READ 隔离确保了快照的一致性,因为连接器在事务持续期间继续读取相同的数据。

extended

在快照期间阻塞所有写操作。如果客户端提交与 MySQL 中的 REPEATABLE READ 隔离级别不兼容的并发操作,请使用此设置。

none

阻止连接器在快照期间获取任何表锁。尽管此选项可与所有快照模式一起使用,但仅当快照运行时没有发生模式更改才能安全使用。使用 MyISAM 引擎定义的表始终获取表锁。因此,即使设置了此选项,这些表也会被锁定。此行为与 InnoDB 引擎定义的表不同,后者获取行级锁。

custom (自定义)

连接器根据 snapshot.locking.mode.custom.name 属性指定的实现执行快照,该属性是 io.debezium.spi.snapshot.SnapshotLock 接口的自定义实现。

snapshot.locking.mode.custom.name
Default value

无默认值

描述

snapshot.locking.mode 设置为 custom 时,使用此设置指定由 'io.debezium.spi.snapshot.SnapshotLock' 接口定义的 name() 方法提供的自定义实现的名称。

有关更多信息,请参阅自定义快照程序 SPI

snapshot.max.threads
Default value

1

描述

指定连接器在执行初始快照时使用的线程数。要启用并行初始快照,请将属性设置为大于 1 的值。在并行初始快照中,连接器会并发处理多个表。

并行初始快照是一项孵化中的功能。
snapshot.mode
Default value

initial (初始)

描述

指定连接器启动时运行快照的条件。

以下是可用设置:

always (始终)

连接器在每次启动时都运行快照。快照包括捕获表的结构和数据。使用此值可在每次连接器启动时,用捕获表中数据的完整表示来填充主题。

initial (初始)

连接器仅在未为逻辑服务器名称记录偏移量,或检测到早期快照失败时运行快照。快照完成后,连接器开始流式传输后续数据库更改的事件记录。

initial_only (仅初始)

连接器仅在未为逻辑服务器名称记录偏移量时运行快照。快照完成后,连接器将停止。它不会转换为流式传输以从 binlog 读取更改事件。

schema_only (仅模式)

Deprecated, see no_data. (已弃用,请参阅 no_data。)

no_data (无数据)

连接器运行一个仅捕获模式而不捕获任何表数据的快照。如果您不需要主题包含数据的一致快照,但希望捕获上次连接器重新启动后应用的任何模式更改,请使用此选项。

schema_only_recovery (仅模式恢复)

Deprecated, see recovery. (已弃用,请参阅 recovery。)

recovery (恢复)

Set this option to restore a database schema history topic that is lost or corrupted. After a restart, the connector runs a snapshot that rebuilds the topic from the source tables. You can also set the property to periodically prune a database schema history topic that experiences unexpected growth. (将此选项设置为恢复已丢失或损坏的数据库模式历史主题。重新启动后,连接器将运行一个快照,从源表重建该主题。您还可以将此属性设置为定期修剪经历意外增长的数据库模式历史主题。)

如果自上次连接器关闭以来已将模式更改提交到数据库,请不要在此模式下执行快照。

never (从不)

When the connector starts, rather than performing a snapshot, it immediately begins to stream event records for subsequent database changes. This option is under consideration for future deprecation, in favor of the no_data option. (当连接器启动时,它不会执行快照,而是立即开始流式传输后续数据库更改的事件记录。此选项正考虑将来弃用,而采用 no_data 选项。)

when_needed (需要时)

After the connector starts, it performs a snapshot only if it detects one of the following circumstances (连接器启动后,仅当检测到以下任一情况时,它才会执行快照:)

  • It cannot detect any topic offsets. (无法检测到任何主题偏移量。)

  • 先前记录的偏移量指定了服务器上不可用的 binlog 位置或 GTID。

configuration_based (基于配置)

使用此选项,您可以通过一组以 'snapshot.mode.configuration.based' 前缀开头的连接器属性来控制快照行为。

custom (自定义)

连接器根据 snapshot.mode.custom.name 属性指定的实现执行快照,该属性定义了 io.debezium.spi.snapshot.Snapshotter 接口的自定义实现。

snapshot.mode.configuration.based.snapshot.data
Default value

false

描述

如果 snapshot.mode 设置为 configuration_based,请设置此属性以指定连接器在执行快照时是否包含表数据。

snapshot.mode.configuration.based.snapshot.on.data.error
Default value

false

描述

如果 snapshot.mode 设置为 configuration_based,请设置此属性以指定在事务日志中不再提供数据的情况下,连接器是否在快照中包含表数据。

snapshot.mode.configuration.based.snapshot.on.schema.error
Default value

false

描述

如果 snapshot.mode 设置为 configuration_based,请设置此属性以指定在模式历史记录主题不可用时,连接器是否在快照中包含表模式。

snapshot.mode.configuration.based.snapshot.schema
Default value

false

描述

如果 snapshot.mode 设置为 configuration_based,请设置此属性以指定连接器在执行快照时是否包含表模式。

snapshot.mode.configuration.based.start.stream
Default value

false

描述

如果 snapshot.mode 设置为 configuration_based,请设置此属性以指定快照完成后连接器是否开始流式传输更改事件。

snapshot.mode.custom.name
Default value

无默认值

描述

如果 snapshot.mode 设置为 custom,请使用此设置指定 'io.debezium.spi.snapshot.Snapshotter' 接口中定义的 name() 方法提供的自定义实现名称。连接器重启后,Debezium 会调用指定的自定义实现来确定是否执行快照。有关更多信息,请参阅自定义快照程序 SPI

snapshot.query.mode
Default value

select_all

描述

指定连接器在执行快照时如何查询数据。

设置以下选项之一:

select_all (默认)

连接器使用 select all 查询从捕获的表中检索行,可以选择根据列的 includeexclude 列表配置进行调整。

custom (自定义)

连接器根据 snapshot.query.mode.custom.name 属性指定的实现执行快照查询,该属性定义了 io.debezium.spi.snapshot.SnapshotQuery 接口的自定义实现。

与使用 snapshot.select.statement.overrides 属性相比,此设置使您能够更灵活地管理快照内容。

snapshot.query.mode.custom.name
Default value

无默认值

描述

snapshot.query.mode 设置为 custom 时,使用此设置指定由 'io.debezium.spi.snapshot.SnapshotQuery' 接口定义的 name() 方法提供的自定义实现的名称。有关更多信息,请参阅自定义快照程序 SPI

snapshot.select.statement.overrides
Default value

无默认值

描述

指定要在快照中包含的表行。如果您希望快照仅包含表中一部分行,请使用此属性。此属性仅影响快照。它不适用于连接器从日志中读取的事件。

该属性包含一个完全限定的表名称的逗号分隔列表,格式为 <databaseName>.<tableName>。例如:

"snapshot.select.statement.overrides": "inventory.products,customers.orders"

对于列表中的每个表,添加一个额外的配置属性,该属性指定连接器在拍摄快照时在该表上运行的 SELECT 语句。指定的 SELECT 语句决定了要在快照中包含的表行的子集。使用以下格式指定此 SELECT 语句属性的名称:

snapshot.select.statement.overrides.<databaseName>.<tableName>

For example, (例如,)

snapshot.select.statement.overrides.customers.orders

从包含软删除列 delete_flagcustomers.orders 表中,如果要快照仅包含未软删除的记录,请添加以下属性:

"snapshot.select.statement.overrides": "customer.orders",
"snapshot.select.statement.overrides.customer.orders": "SELECT * FROM [customers].[orders] WHERE delete_flag = 0 ORDER BY id DESC"

在生成的快照中,连接器仅包含 delete_flag = 0 的记录。

snapshot.tables.order.by.row.count
Default value

disabled

描述

指定连接器在执行初始快照时处理表的顺序。

设置以下选项之一:

descending

连接器按行数顺序对表进行快照,从最多到最少。

ascending

连接器按行数顺序对表进行快照,从最少到最多。

disabled

连接器在执行初始快照时会忽略行数。

source.struct.version
Default value

v2

描述

Debezium 事件中 source 块的模式版本。Debezium 0.10 对 source 块的结构进行了一些破坏性更改,以统一所有连接器暴露的结构。

通过将此选项设置为 v1,可以生成早期版本中使用的结构。但是,不建议使用此设置,并且计划在未来的 Debezium 版本中删除它。

streaming.delay.ms
Default value

0

描述

指定连接器在完成快照后延迟开始流式传输过程的时间(以毫秒为单位)。设置延迟间隔有助于防止连接器在快照完成后但流式传输过程开始之前立即发生故障时重新启动快照。设置一个高于 Kafka Connect 工作节点设置的 offset.flush.interval.ms 属性值的延迟值。

table.ignore.builtin
Default value

true

描述

一个布尔值,指定是否应忽略内置系统表。这适用于表包含和排除列表。默认情况下,对系统表值进行的更改不被捕获,Debezium 不为系统表更改生成事件。

topic.cache.size
Default value

10000

描述

指定可以在内存中的有界并发哈希映射中存储的主题名称数量。连接器使用缓存来帮助确定与数据集合对应的主题名称。

topic.delimiter
Default value

. (句点)

描述

指定连接器在主题名称的组件之间插入的分隔符。

topic.heartbeat.prefix
Default value

__debezium-heartbeat

描述

指定连接器将心跳消息发送到的主题的名称。主题名称采用以下格式:

topic.heartbeat.prefix.topic.prefix

例如,当您设置此属性的默认值,并且主题前缀为 fulfillment 时,主题名称为 __debezium-heartbeat.fulfillment

topic.naming.strategy
Default value

io.debezium.schema.DefaultTopicNamingStrategy

描述

连接器使用的 TopicNamingStrategy 类的名称。指定的策略决定了连接器如何命名存储数据更改、模式更改、事务、心跳等事件记录的主题。

topic.transaction
Default value

transaction

描述

指定连接器将事务元数据消息发送到的主题的名称。主题名称采用以下模式:

topic.prefix.topic.transaction

例如,如果主题前缀是 fulfillment,则默认主题名称是 fulfillment.transaction

use.nongraceful.disconnect
Default value

false

描述

一个布尔值,指定二进制日志客户端的 keepalive 线程是否将 SO_LINGER 套接字选项设置为 0 以立即关闭陈旧的 TCP 连接。如果连接器在 SSLSocketImpl.close 中遇到死锁,则将值设置为 true。有关更多信息,请参阅mysql-binlog-connector-java GitHub 存储库中的问题 133

guardrail.collections.max
Default value

0

描述

指定连接器可以捕获的最大表数。超过此限制将触发 guardrail.collections.limit.action 指定的操作。将此属性设置为 0 可防止连接器触发保护措施。

guardrail.collections.limit.action
Default value

warn

描述

指定如果连接器捕获的表数超过 guardrail.collections.max 属性中指定的数量时触发的操作。将属性设置为以下值之一:

fail

连接器失败并报告异常。

warn

连接器记录警告。

Debezium 连接器数据库模式历史记录配置属性

Debezium 提供了一组 schema.history.internal.* 属性来控制连接器如何与模式历史记录主题进行交互。

下表描述了用于配置 Debezium 连接器的 schema.history.internal 属性。

表 27. 连接器数据库模式历史记录配置属性
属性 Default (默认值) 描述

无默认值

连接器存储数据库模式历史的 Kafka 主题的完整名称。

无默认值

连接器用于建立与 Kafka 集群的初始连接的主机/端口对列表。此连接用于检索连接器先前存储的数据库模式历史,以及写入从源数据库读取的每个 DDL 语句。每个对都应指向 Kafka Connect 进程使用的相同 Kafka 集群。

100

一个整数值,指定连接器在启动/恢复期间轮询持久数据时应等待的最大毫秒数。默认为 100 毫秒。

3000

一个整数值,指定连接器在使用 Kafka 管理客户端获取集群信息时应等待的最大毫秒数。

30000

一个整数值,指定连接器在使用 Kafka 管理客户端创建 Kafka 历史主题时应等待的最大毫秒数。

100

连接器在连接器恢复因错误而失败之前尝试读取持久历史数据的最大次数。在收到无数据后的最长等待时间为 recovery.attempts × recovery.poll.interval.ms

false

一个布尔值,指定连接器是应忽略格式错误或未知的数据库语句,还是停止处理以便人工修复问题。安全默认值为 false。跳过操作应谨慎使用,因为它可能导致在处理 binlog 时丢失或损坏数据。

false

一个布尔值,指定连接器是记录模式或数据库中所有表的模式结构,还是仅记录指定用于捕获的表的模式结构。
指定以下值之一:

false (默认)

在数据库快照期间,连接器会记录所有非系统表的模式数据,包括未指定用于捕获的表。最好保留默认设置。如果您稍后决定捕获未最初指定用于捕获的表的更改,连接器可以轻松开始从这些表中捕获数据,因为它们的模式结构已存储在模式历史记录主题中。Debezium 需要表的模式历史记录,以便它能够识别更改事件发生时存在的结构。

true

在数据库快照期间,连接器仅记录 Debezium 捕获更改事件的表的表模式。如果您更改默认值,并且稍后配置连接器以捕获数据库中的其他表的数据,则连接器将缺少捕获表中更改事件所需的模式信息。

false

一个布尔值,指定连接器是记录实例中所有逻辑数据库的模式结构。
指定以下值之一:

true

连接器仅记录 Debezium 捕获更改事件的逻辑数据库和模式中的表的模式结构。

false

连接器记录所有逻辑数据库的模式结构。

传递式 MySQL 连接器配置属性

您可以在连接器配置中设置传递属性来定制 Apache Kafka 生产者和消费者的行为。有关 Kafka 生产者和消费者配置属性的完整范围的信息,请参阅Kafka 文档

用于配置生产者和消费者客户端与模式历史记录主题交互的传递式属性

Debezium 依赖 Apache Kafka 生产者将模式更改写入数据库模式历史记录主题。同样,它依赖 Kafka 消费者在连接器启动时从数据库模式历史记录主题读取。您可以通过为一组以 schema.history.internal.producer.*schema.history.internal.consumer.* 前缀开头的传递配置属性赋值来定义 Kafka 生产者和消费者客户端的配置。传递的生产者和消费者数据库模式历史记录属性控制一系列行为,例如这些客户端如何与 Kafka 代理建立安全连接,如下例所示:

schema.history.internal.producer.security.protocol=SSL
schema.history.internal.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
schema.history.internal.producer.ssl.keystore.password=test1234
schema.history.internal.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
schema.history.internal.producer.ssl.truststore.password=test1234
schema.history.internal.producer.ssl.key.password=test1234

schema.history.internal.consumer.security.protocol=SSL
schema.history.internal.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
schema.history.internal.consumer.ssl.keystore.password=test1234
schema.history.internal.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
schema.history.internal.consumer.ssl.truststore.password=test1234
schema.history.internal.consumer.ssl.key.password=test1234

Debezium 会从属性名称中剥离前缀,然后再将属性传递给 Kafka 客户端。

有关 Kafka 生产者配置属性Kafka 消费者配置属性 的更多信息,请参阅 Apache Kafka 文档。

用于配置 MySQL 连接器与 Kafka 信号主题交互的传递式属性

Debezium 提供了一组 signal.* 属性来控制连接器如何与 Kafka 信号主题交互。

下表描述了 Kafka signal 属性。

表 28. Kafka 信号配置属性
属性 Default (默认值) 描述

<topic.prefix>-signal

连接器监视的用于临时信号的 Kafka 主题的名称。

如果自动主题创建被禁用,则必须手动创建所需的信号主题。信号主题对于保留信号顺序是必需的。信号主题必须只有一个分区。

kafka-signal

Kafka 消费者使用的组 ID 的名称。

无默认值

连接器用于建立与 Kafka 集群的初始连接的主机和端口对列表。每个对都引用 Debezium Kafka Connect 进程使用的 Kafka 集群。

100

一个整数值,指定连接器在轮询信号时等待的最大毫秒数。

用于配置信号通道的 Kafka 消费者客户端的传递式属性

Debezium 为信号 Kafka 消费者的传递配置提供了支持。传递信号属性以 signal.consumer.* 前缀开头。例如,连接器会将 signal.consumer.security.protocol=SSL 等属性传递给 Kafka 消费者。

Debezium 在将前缀从属性名称中剥离,然后再将属性传递给 Kafka 信号消费者。

用于配置 MySQL 连接器接收器通知通道的传递式属性

下表描述了可用于配置 Debezium sink notification 通道的属性。

表 29. 接收器通知配置属性
属性 Default (默认值) 描述

无默认值

接收来自 Debezium 通知的主题名称。当您将 notification.enabled.channels 属性配置为包含 sink 作为启用的通知通道之一时,此属性是必需的。

Debezium 连接器传递式数据库驱动程序配置属性

Debezium 连接器支持数据库驱动程序的传递配置。传递数据库属性以 driver.* 前缀开头。例如,连接器会将 driver.foobar=false 等属性传递给 JDBC URL。

Debezium 在将前缀从属性名称中剥离,然后再将属性传递给数据库驱动程序。

监控

Debezium MySQL 连接器提供三种类型的指标,这些指标是 Kafka 和 Kafka Connect 提供的内置 JMX 指标支持的补充。

Debezium 监控文档 提供了有关如何使用 JMX 公开这些指标的详细信息。

自定义 MBean 名称

Debezium 连接器通过连接器的 MBean 名称公开指标。这些指标特定于每个连接器实例,提供有关连接器快照、流式传输和模式历史记录进程行为的数据。

默认情况下,当您部署正确配置的连接器时,Debezium 会为不同的连接器指标生成唯一的 MBean 名称。要查看连接器进程的指标,请将您的可观察性堆栈配置为监视其 MBean。但这些默认 MBean 名称取决于连接器配置;配置更改可能导致 MBean 名称发生更改。在这种情况下,更改 MBean 名称会破坏连接器实例和 MBean 之间的链接,从而中断监控活动。您必须重新配置可观察性堆栈以使用新的 MBean 名称才能恢复监控。

为了防止因 MBean 名称更改而导致的监控中断,您可以配置自定义指标标签。通过将 custom.metric.tags 属性添加到连接器配置中来配置自定义指标。该属性接受键值对,其中每个键代表 MBean 对象名称的标签,相应的值代表该标签的值。例如:k1=v1,k2=v2。Debezium 将指定的标签附加到连接器的 MBean 名称。

配置连接器的 custom.metric.tags 属性后,您可以配置可观察性堆栈以检索与指定标签关联的指标。然后,可观察性堆栈使用指定的标签而不是可变的 MBean 名称来唯一标识连接器。之后,如果 Debezium 重新定义了其如何构造 MBean 名称,或者连接器配置中的 topic.prefix 发生更改,指标收集将不会中断,因为指标抓取任务使用指定的标签模式来标识连接器。

使用自定义标签的另一个好处是,您可以使用反映数据管道架构的标签,以便以适合您运营需求的方式组织指标。例如,您可以指定带有值的标签来声明连接器活动的类型、应用程序上下文或数据源,例如 db1-streaming-for-application-abc。如果您指定多个键值对,所有指定的对都将附加到连接器的 MBean 名称。

以下示例说明了标签如何修改默认 MBean 名称。

示例 3. 自定义标签如何修改连接器 MBean 名称

默认情况下,MySQL 连接器使用以下 MBean 名称用于流式传输指标:

debezium.mysql:type=connector-metrics,context=streaming,server=<topic.prefix>

如果您将 custom.metric.tags 的值设置为 database=salesdb-streaming,table=inventory,Debezium 将生成以下自定义 MBean 名称:

debezium.mysql:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory

快照指标

MBeandebezium.mysql:type=connector-metrics,context=snapshot,server=<topic.prefix>

除非快照操作正在活动中,或者自上次连接器启动以来发生过快照,否则快照指标不会暴露。

下表列出了可用的快照指标。

Attributes Type 描述

string

连接器已读取的最后一个快照事件。

long

自连接器读取和处理最近事件以来经过的毫秒数。

long

记录连接器在快照操作期间识别为错误的更改事件的数量。每次连接器在初始、增量或临时快照期间遇到无法处理的事件时,都会递增此指标。事件可能因格式错误、与模式不兼容或在转换期间遇到失败而无法处理。指标值在连接器任务的整个生命周期内保持不变。如果快照被中断,并且连接器任务重新启动,则指标计数将重置为 0

long

自上次启动或重置以来,此连接器已看到的事件总数。

long

已被连接器配置的包含/排除列表过滤规则过滤的事件数量。

string[]

由连接器捕获的表列表。

int

用于在快照程序和主 Kafka Connect 循环之间传递事件的队列的长度。

int

用于在快照程序和主 Kafka Connect 循环之间传递事件的队列的可用容量。

int

包含在快照中的表总数。

int

快照尚未复制的表数。

boolean

快照是否已启动。

boolean

快照是否已暂停。

boolean

快照是否被中止。

boolean

快照是否已完成。

boolean

快照是否被跳过。

long

快照到目前为止所花费的总秒数,即使未完成。也包括快照暂停的时间。

long

快照暂停的总秒数。如果快照被暂停了多次,则暂停时间将累加。

Map<String, Long>

包含快照中每个表扫描的行数的映射。表在处理过程中被增量地添加到 Map 中。每扫描 10,000 行和完成一个表时更新。

long

队列的最大缓冲区(以字节为单位)。如果 max.queue.size.in.bytes 设置为正长值,则此指标可用。

long

队列中记录的当前字节卷。

当执行增量快照时,连接器还提供以下附加快照指标:

Attributes Type 描述

string

当前快照块的标识符。

string

定义当前块的主键集的下限。

string

定义当前块的主键集的上限。

string

当前快照表的组成键集(主键)的下限。

string

当前快照表的组成键集(主键)的上限。

Debezium MySQL 连接器还提供 HoldingGlobalLock 自定义快照指标。此指标设置为布尔值,指示连接器当前是否持有全局写锁或表写锁。

流式传输指标

Debezium MySQL 连接器提供三种类型的指标,这些指标是 Kafka 和 Kafka Connect 提供的内置 JMX 指标支持的补充。

Debezium 监控文档 提供了有关如何使用 JMX 公开这些指标的详细信息。

MBeandebezium.mysql:type=connector-metrics,context=streaming,server=<topic.prefix>

下表列出了可用的流式传输指标。

Attributes Type 描述

string

连接器已读取的最后一个流式传输事件。

long

自连接器读取和处理最近事件以来经过的毫秒数。

long

记录连接器在流式传输期间识别为错误的更改事件的数量。每次连接器在流式传输会话期间遇到无法处理的事件时,都会递增此指标。事件可能因格式错误、与模式不兼容或在转换期间遇到失败而无法处理。指标值在连接器任务的整个生命周期内保持不变。连接器重启后,指标计数将重置为 0

long

自上次连接器启动或重置以来,源数据库报告的总数据更改事件数。代表 Debezium 需要处理的数据更改工作负载。

long

自上次启动或重置指标以来,连接器处理的总创建事件数。

long

自上次启动或重置指标以来,连接器处理的总更新事件数。

long

自上次启动或重置指标以来,连接器处理的总删除事件数。

long

已被连接器配置的包含/排除列表过滤规则过滤的事件数量。

string[]

由连接器捕获的表列表。

int

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的长度。

int

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的剩余容量。

boolean

表示连接器当前是否连接到数据库服务器的标志。

long

上次更改事件的时间戳与连接器处理它之间的时间差(以毫秒为单位)。这些值将包含数据库服务器和连接器运行所在机器之间时钟的任何差异。

long

已提交的处理过的事务数。

Map<String, String>

收到的最后一个事件的坐标。

string

已处理的最后一个事务的事务标识符。

long

队列的最大缓冲区(以字节为单位)。如果 max.queue.size.in.bytes 设置为正长值,则此指标可用。

long

队列中记录的当前字节卷。

Debezium MySQL 连接器还提供以下其他流式传输指标:

表 30. 其他 MySQL 流式传输指标说明
Attribute Type 描述

BinlogFilename

string

连接器最近读取的 binlog 文件名。

BinlogPosition

long

连接器已读取的 binlog 中的最后一个位置(以字节为单位)。

IsGtidModeEnabled

boolean

表示连接器当前是否正在跟踪 MySQL 服务器 GTID 的标志。

GtidSet

string

连接器在读取 binlog 时处理的最后一个 GTID 集合的字符串表示。

NumberOfSkippedEvents

long

MySQL 连接器跳过的事件数量。通常,事件因 MySQL binlog 中的格式错误或无法解析的事件而被跳过。

NumberOfDisconnects

long

MySQL 连接器断开连接的数量。

NumberOfRolledBackTransactions

long

已回滚且未流式传输的处理过的事务数。

NumberOfNotWellFormedTransactions

long

不符合 BEGIN + COMMIT/ROLLBACK 预期协议的事务数。在正常情况下,此值应为 0

NumberOfLargeTransactions

long

未包含在前瞻缓冲区中的事务数。为获得最佳性能,此值应远小于 NumberOfCommittedTransactionsNumberOfRolledBackTransactions

模式历史记录指标

MBeandebezium.mysql:type=connector-metrics,context=schema-history,server=<topic.prefix>

下表列出了可用的模式历史记录指标。

Attributes Type 描述

string

STOPPEDRECOVERING(从存储中恢复历史记录)或 RUNNING 中的一个,描述数据库模式历史记录的状态。

long

恢复开始的时间(以 epoch 秒为单位)。

long

恢复阶段读取的更改数。

long

在恢复和运行时应用的总模式更改数。

long

自从从历史存储中恢复最后一个更改以来经过的毫秒数。

long

自应用最后一个更改以来经过的毫秒数。

string

从历史存储中恢复的最后一个更改的字符串表示。

string

已应用的最后一个更改的字符串表示。

出现问题时的行为

Debezium 是一个分布式系统,它捕获多个上游数据库的所有更改;它从不遗漏或丢失事件。当系统正常运行或被仔细管理时,Debezium 会为每个更改事件记录提供精确一次的传递。

如果发生故障,系统不会丢失任何事件。但是,在 Debezium 从故障中恢复期间,它可能会重复某些更改事件。在这些异常情况下,Debezium 与 Kafka 一样,提供至少一次更改事件的传递。

本节的其余部分描述了 Debezium 如何处理各种类型的故障和问题。

配置和启动错误

在以下情况下,连接器在尝试启动时会失败,在日志中报告错误或异常,并停止运行:

  • 连接器的配置无效。

  • 连接器无法使用指定的连接参数成功连接到 MySQL 服务器。

  • 连接器正在尝试在 MySQL 不再拥有历史记录的 binlog 位置重新启动。

在这些情况下,错误消息包含有关问题的详细信息,以及可能的建议的解决方法。在更正配置或解决 MySQL 问题后,重启连接器。

MySQL 不可用

如果您的 MySQL 服务器变得不可用,Debezium MySQL 连接器将失败并出现错误,并且连接器会停止。当服务器再次可用时,重启连接器

但是,如果您连接到高可用性 MySQL 集群,则可以立即重启连接器。它将连接到集群中的另一个 MySQL 服务器,查找服务器 binlog 中代表最后一个事务的位置,并从该特定位置开始读取新服务器的 binlog。

Kafka Connect 正常停止

当 Kafka Connect 正常停止时,Debezium MySQL 连接器任务停止并在新的 Kafka Connect 进程上重启需要短暂延迟。

Kafka Connect 进程崩溃

如果 Kafka Connect 崩溃,该进程停止,并且任何 Debezium MySQL 连接器任务都会在其最近处理的偏移量未被记录的情况下终止。在分布式模式下,Kafka Connect 会在其他进程上重启连接器任务。但是,MySQL 连接器将从早期进程记录的最后一个偏移量恢复。因此,替换任务可能会重新生成崩溃前已处理的一些事件,从而导致重复事件。

每个更改事件消息都包含特定于源的信息,您可以使用这些信息来识别重复事件,例如:

  • 事件来源

  • MySQL 服务器的事件时间

  • binlog 文件名和位置 :leveloffset: +1

  • GTID(如果使用)

Kafka 不可用

Kafka Connect 框架使用 Kafka producer API 将 Debezium 更改事件记录在 Kafka 中。如果 Kafka 代理程序变得不可用,Debezium MySQL 连接器将暂停直到重新建立连接,然后连接器将从上次中断的地方恢复

MySQL 删除 binlog 文件

如果 Debezium MySQL 连接器停止时间过长,MySQL 服务器会删除较旧的 binlog 文件,并且连接器的最后位置可能会丢失。当连接器重新启动时,MySQL 服务器不再拥有起始点,并且连接器会执行另一个初始快照。如果禁用了快照,连接器将因错误而失败

有关 MySQL 连接器如何执行初始快照的详细信息,请参阅快照