Debezium Vitess 连接器
Debezium 的 Vitess 连接器会捕获 Vitess keyspace 中分片(shards)的行级更改。有关此连接器兼容的 Vitess 版本的信息,请参阅 Debezium 版本概述。
当连接器首次连接到 Vitess 集群时,它会对 keyspace 进行一致性快照。快照完成后,连接器会持续捕获提交到 Vitess keyspace 的行级更改,以插入、更新或删除数据库内容。连接器会生成数据更改事件记录,并将它们流式传输到 Kafka 主题。对于每个表,默认行为是连接器将所有生成的事件流式传输到一个单独的 Kafka 主题中。应用程序和服务随后可以从生成的主题中消费数据更改事件记录。
概述
Vitess 的 VStream 功能在 4.0 版本中引入。它是一种更改事件订阅服务,可提供与 Vitess 集群底层 MySQL 分片二进制日志等效的信息。用户可以订阅 keyspace 中的多个分片,这使其成为为下游 CDC 流程提供数据的便捷工具。
为了读取和处理数据库更改,Vitess 连接器会订阅 VTGate 的 VStream gRPC 服务。VTGate 是一个轻量级的无状态 gRPC 服务器,是 Vitess 集群设置的一部分。
连接器允许您灵活地选择订阅 MASTER 节点或 REPLICA 节点以获取更改事件。
连接器为捕获的每个行级插入、更新和删除操作生成一个更改事件,并将每个表的更改事件记录发送到单独的 Kafka 主题。客户端应用程序读取与感兴趣的数据库表相对应的 Kafka 主题,并可以对从这些主题接收到的每个行级事件做出反应。
Vitess 中的底层 MySQL 实现会根据某些可配置的时间段清除二进制日志。由于二进制日志的内容可能不完整,因此连接器需要另一种机制来确保捕获特定数据库的完整内容。因此,当连接器首次连接到数据库时,它会执行数据库的一致性快照。在连接器完成快照后,它会继续从快照完成的确切点开始流式传输更改。通过这种方式,连接器从所有数据的统一视图开始,并且不会遗漏在快照期间进行的任何更改。
连接器具有容错能力。当连接器读取更改并生成事件时,它会记录每个事件的 Vitess 全局事务 ID (VGTID) 位置。如果连接器因任何原因停止(包括通信故障、网络问题或崩溃),在连接器重新启动后,它将从最后一个已存储的更改事件条目继续从 VStream 读取。此行为不适用于快照。如果连接器在快照期间停止,重新启动后,连接器不会从上次中断的地方继续执行快照。稍后我们将讨论连接器在 出现问题时 的行为。
连接器的工作原理
为了最优地配置和运行 Debezium Vitess 连接器,理解连接器如何执行快照、流式传输更改事件、确定 Kafka 主题名称以及使用元数据将非常有帮助。
快照
通常,MySQL 服务器未配置为在二进制日志中保留数据库的完整历史记录。因此,连接器无法从二进制日志中读取数据库的整个历史记录。出于此原因,连接器首次启动时,它会执行数据库的初始一致性快照。您可以通过将 snapshot.mode 连接器配置属性设置为 initial 以外的值来更改此行为。此快照功能基于 7.0 版本中引入的 VStream Copy。
|
自动重试失败的快照将在将来的版本中提供。 |
流式传输更改
Vitess 连接器花费所有时间从其订阅的 VTGate 的 VStream gRPC 服务流式传输更改。客户端在某些位置(称为 VGTID)接收提交到底层 MySQL 服务器 binlog 的更改,这些位置被称为 VGTID。
Vitess 中的 VGTID 等同于 MySQL 中的 GTID,它描述了更改事件发生在 VStream 中的位置。通常,VGTID 包含多个分片 GTID,每个分片 GTID 是一个 (Keyspace, Shard, GTID) 的元组,用于描述给定分片的 GTID 位置。
订阅 VStream 服务时,连接器需要提供 VGTID 和 Tablet 类型(例如 MASTER、REPLICA)。VGTID 描述了 VStream 应开始发送更改事件的位置;Tablet 类型描述了每个分片中我们从中读取更改事件的底层 MySQL 实例(主节点或副本节点)。
首次连接 Vitess 集群时,连接器会从一个名为 VTCtld 的 Vitess 组件获取当前的 VGTID,并将当前的 VGTID 提供给 VStream。
Debezium Vitess 连接器充当 VStream 的 gRPC 客户端。当连接器接收到更改时,它会将事件转换为 Debezium 的创建、更新或删除事件,其中包括事件的 VGTID。Vitess 连接器将这些更改事件以记录的形式转发给运行在同一进程中的 Kafka Connect 框架。Kafka Connect 进程以生成顺序异步地将更改事件记录写入相应的 Kafka 主题。
Kafka Connect 定期将最近的偏移量记录在另一个 Kafka 主题中。偏移量指示 Debezium 随每个事件包含的特定于源的位置信息。对于 Vitess 连接器,在每个更改事件中记录的 VGTID 就是偏移量。
当 Kafka Connect 正常关闭时,它会停止连接器,将所有事件记录刷新到 Kafka,并记录从每个连接器接收的最后一个偏移量。当 Kafka Connect 重新启动时,它会读取每个连接器的最后一个记录的偏移量,并从最后一个记录的偏移量开始重新启动每个连接器。当连接器重新启动时,它会向 VStream 发送一个请求,以从该位置之后的第一个事件开始发送事件。
主题名称
Vitess 连接器将对单个表的所有插入、更新和删除操作的事件写入同一个 Kafka 主题。默认情况下,Kafka 主题名称为 topicPrefix.keyspaceName.tableName,其中
-
*topicPrefix* 是由
topic.prefix连接器配置属性指定的。*topicPrefix* 是由topic.prefix连接器配置属性指定的。 -
keyspaceName 是操作发生的 keyspace(也称为数据库)的名称。
-
*tableName* 是发生操作的数据库表的名称。
例如,假设 fulfillment 是一个连接器在具有 commerce keyspace 的 Vitess 安装中捕获更改的配置中的逻辑服务器名称,该 keyspace 包含四个表:products、products_on_hand、customers 和 orders。无论 keyspace 有多少分片,连接器都会将记录流式传输到这四个 Kafka 主题:
-
fulfillment.commerce.products -
fulfillment.commerce.products_on_hand -
fulfillment.commerce.customers -
fulfillment.commerce.orders
事务元数据
Debezium 可以生成表示事务边界的事件,并丰富数据更改事件消息。
|
Debezium 接收事务元数据的限制
Debezium 仅为部署连接器后发生的事务注册和接收元数据。部署连接器之前发生的事务的元数据不可用。 |
Debezium 为每个事务中的 BEGIN 和 END 分隔符生成事务边界事件。事务边界事件包含以下字段:
status-
BEGIN或END。 id-
唯一事务标识符的字符串表示。
ts_ms-
数据源处事务边界事件(
BEGIN或END事件)的时间。如果数据源未向 Debezium 提供事件时间,则该字段会表示 Debezium 处理事件的时间。注意:`ts_ms` 以毫秒为单位,但由于 MySQL 的限制,只有秒级精度,MySQL 只能提供秒级的二进制日志时间戳。 event_count(针对END事件)-
事务发出的事件总数。
data_collections(针对END事件)-
一对
data_collection和event_count元素的数组,指示连接器为源自数据集合的更改发出的事件数量。
{
"status": "BEGIN",
"id": "[{\"keyspace\":\"test_unsharded_keyspace\",\"shard\":\"0\",\"gtid\":\"MySQL56/e03ece6c-4c04-11ec-8e20-0242ac110004:1-37\"}]",
"ts_ms": 1486500577000,
"event_count": null,
"data_collections": null
}
{
"status": "END",
"id": "[{\"keyspace\":\"test_unsharded_keyspace\",\"shard\":\"0\",\"gtid\":\"MySQL56/e03ece6c-4c04-11ec-8e20-0242ac110004:1-37\"}]",
"ts_ms": 1486500577000,
"event_count": 1,
"data_collections": [
{
"data_collection": "test_unsharded_keyspace.my_seq",
"event_count": 1
}
]
}
除非通过 topic.transaction 选项覆盖,否则连接器会将事务事件发送到 <topic.prefix>.transaction 主题。
启用事务元数据后,数据消息 Envelope 将使用新的 transaction 字段进行丰富。该字段以字段的复合形式提供关于每个事件的信息:
-
id- 唯一事务标识符的字符串表示形式 -
total_order- 事件在事务生成的所有事件中的绝对顺序 -
data_collection_order- 事件在事务发出的所有事件中,按数据集合的顺序
以下是消息示例
{
"before": null,
"after": {
"pk": "2",
"aa": "1"
},
"source": {
...
},
"op": "c",
"ts_ms": 1637988245467,
"ts_us": 1637988245467841,
"ts_ns": 1637988245467841698,
"transaction": {
"id": "[{\"keyspace\":\"test_unsharded_keyspace\",\"shard\":\"0\",\"gtid\":\"MySQL56/e03ece6c-4c04-11ec-8e20-0242ac110004:1-68\"}]",
"total_order": 1,
"data_collection_order": 1
}
}
有序事务元数据
您可以配置 Debezium 以在数据更改事件记录中包含其他元数据。此类补充元数据可以帮助下游使用者在重新分区或其他干扰可能导致数据乱序消费时按正确的顺序处理消息。
要配置连接器以发出丰富的数据更改事件记录,请设置 transaction.metadata.factory 属性。当此属性设置为 VitessOrderedTransactionMetadataFactory 时,连接器会在消息 Envelope 中包含一个 transaction 字段。transaction 字段添加了提供事务发生顺序信息的元数据。以下字段会添加到每条消息中:
transaction_epoch-
一个非递减值,表示事务排序所属的 epoch。
transaction_rank-
一个 epoch 内非递减的值,表示事务的顺序。
还有一个与事件顺序相关的第三个字段:
total_order-
表示事件在事务生成的所有事件中的绝对位置。此字段默认包含在标准事务元数据中。
以下示例说明了如何使用这些字段来确定事件顺序。
假设 Debezium 发出了发生在同一分片且具有相同主键的两个事件的数据更改事件记录。如果发送这些事件的 Kafka 主题被重新分区,那么这两个事件的消费者顺序将不可靠。如果 Debezium 配置为提供丰富的事务元数据,则从主题消费的应用程序可以应用以下逻辑来确定哪个事件需要应用(较新的事件),哪个需要丢弃:
-
如果
transaction_epoch的值不相等,则返回具有较高transaction_epoch值的事件。否则,继续。 -
如果
transaction_rank的值不相等,则返回具有较高transaction_rank值的事件。否则,继续。 -
返回具有较大
total_order值的事件。
如果两个事件都没有更高的 total_order 值,那么这两个事件是同一事务的一部分。因为 total_order 字段表示事务内事件的顺序,所以值较大的事件是最新的事件。
以下示例显示了带有有序事务元数据的数据更改事件:
{
"before": null,
"after": {
"pk": "2",
"aa": "1"
},
"source": {
...
},
"op": "c",
"ts_ms": 1637988245467,
"ts_us": 1637988245467841,
"ts_ns": 1637988245467841698,
"transaction": {
"id": "[{\"keyspace\":\"test_unsharded_keyspace\",\"shard\":\"0\",\"gtid\":\"MySQL56/e03ece6c-4c04-11ec-8e20-0242ac110004:1-68\"}]",
"total_order": 1,
"data_collection_order": 1,
"transaction_rank": 68,
"transaction_epoch": 0
}
}
高效的事务元数据
如果启用连接器以提供事务元数据,它将生成显著更多的数据。连接器不仅会向事务主题发送额外的消息,而且发送到数据更改主题的消息也会更大,因为它们包含事务元数据块。增加的量是由于以下因素:
-
VGTID 存储两次,一次是
source.vgtid,另一次是transaction.id。在包含许多分片的 keyspace 中,这些 VGTID 可能非常大。 -
在分片环境中,VGTID 通常包含每个分片的 VGTID。在包含许多分片的 keyspace 中,VGTID 字段中的数据量可能很大。
-
连接器为每个事务边界事件发送事务主题消息。通常,包含许多分片的 keyspace 会生成大量的事务边界事件。
为了使 Vitess 连接器能够编码事务元数据而不显著增加生成数据的量,Debezium 提供了几种单消息转换 (SMT)。以下 SMT 旨在减少 Vitess 连接器发出的事件中的数据量:
以下示例显示了使用上述转换的 Vitess 连接器配置摘录:
}
[...]
"provide.transaction.metadata": "true",
"transaction.metadata.factory": "io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory",
"transforms": "filterTransactionTopicRecords,removeField,useLocalVgtid",
"transforms.filterTransactionTopicRecords.type": "io.debezium.connector.vitess.transforms.FilterTransactionTopicRecords",
"transforms.removeField.type": "io.debezium.connector.vitess.transforms.RemoveField",
"transforms.removeField.field_names": "transaction.id",
"transforms.useLocalVgtid.type": "io.debezium.connector.vitess.transforms.UseLocalVgtid",
[...]
}
数据更改事件
Debezium Vitess 连接器为每个行级 INSERT、UPDATE 和 DELETE 操作生成数据更改事件。每个事件包含一个键和一个值。键和值的结构取决于更改的表。
Debezium 和 Kafka Connect 的设计围绕着连续的事件消息流。但是,这些事件的结构可能会随时间变化,这对于使用者来说很难处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果您使用的是模式注册表,则包含一个模式 ID,使用者可以使用该 ID 从注册表中获取模式。这使得每个事件都是自包含的。
以下骨架 JSON 显示了更改事件的基本四个部分。但是,您如何配置应用程序中使用的 Kafka Connect 转换器决定了这四个部分在更改事件中的表示。schema 字段仅在配置转换器生成它时才存在于更改事件中。同样,只有配置转换器生成事件键和事件有效负载时,它们才会在更改事件中出现。如果您使用 JSON 转换器并配置它生成所有四个基本更改事件部分,则更改事件将具有此结构:
{
"schema": { (1)
...
},
"payload": { (2)
...
},
"schema": { (3)
...
},
"payload": { (4)
...
},
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
第一个 |
2 |
|
第一个 |
3 |
|
第二个 |
4 |
|
第二个 |
默认行为是将连接器流式传输更改事件记录到 与事件的原始表同名的主题。
|
从 Kafka 0.10 开始,Kafka 可以选择记录事件键和值以及消息创建(由生产者记录)或由 Kafka 写入日志的*时间戳*。 |
|
Vitess 连接器确保所有 Kafka Connect 模式名称都遵循 Avro 模式名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称的其余每个字符以及模式和表名称中的每个字符都必须是拉丁字母、数字或下划线,即 a-z、A-Z、0-9 或 _。如果存在无效字符,则会将其替换为下划线字符。 如果逻辑服务器名称、schema 名称或表名称包含无效字符,并且区分名称的唯一字符是无效的,因此被替换为下划线,则这可能导致意外冲突。 |
|
目前,连接器不允许使用 `@` 前缀命名列。例如, |
更改事件键
对于给定的表,更改事件的键具有一个结构,其中包含事件创建时表中主键的每个列的字段。
考虑在 commerce keyspace 中定义的 customers 表以及该表的更改事件键示例。
CREATE TABLE customers (
id INT NOT NULL,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL,
PRIMARY KEY(id)
);
如果 topic.prefix 连接器配置属性的值为 Vitess_server,则对于具有此定义的 customers 表的所有更改事件,都将具有相同的键结构,在 JSON 中如下所示:
{
"schema": { (1)
"type": "struct",
"name": "Vitess_server.commerce.customers.Key", (2)
"optional": false, (3)
"fields": [ (4)
{
"name": "id",
"index": "0",
"schema": {
"type": "INT32",
"optional": "false"
}
}
]
},
"payload": { (5)
"id": "1"
},
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
键的模式部分指定了一个 Kafka Connect 模式,该模式描述了键的 主键的结构。 |
2 |
|
定义键的 payload 结构的 schema 名称。此 schema 描述了已更改表的表的主键结构。键 schema 名称的格式为 connector-name.keyspace-name.table-name.
|
3 |
|
指示事件键的 |
4 |
|
指定 |
5 |
|
包含生成此更改事件的行的键。在此示例中,键包含一个名为 |
|
尽管 |
|
如果表没有主键,则更改事件的键为 null。没有主键约束的表中的行无法唯一标识。 |
更改事件值
更改事件中的值比键要复杂一些。与键一样,值也包含 schema 部分和 payload 部分。schema 部分包含描述 payload 部分的 Envelope 结构的模式,包括其嵌套字段。对于创建、更新或删除数据的操作的更改事件,其值有效负载都具有信封结构。
考虑用于显示更改事件键示例的相同样本表。
CREATE TABLE customers (
id INT NOT NULL,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL,
PRIMARY KEY(id)
);
为 UPDATE 和 DELETE 操作发出的事件包含表中所有列的先前值。
create 事件
以下示例显示了连接器为在 customers 表中创建数据的操作生成的更改事件的值部分:
{
"schema": { (1)
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "Vitess_server.commerce.customers.Value", (2)
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "Vitess_server.commerce.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "int64",
"optional": false,
"field": "ts_us"
},
{
"type": "int64",
"optional": false,
"field": "ts_ns"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "keyspace"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "string",
"optional": false,
"field": "shard"
},
{
"type": "int64",
"optional": true,
"field": "vgtid"
}
],
"optional": false,
"name": "io.debezium.connector.vitess.Source", (3)
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "int64",
"optional": true,
"field": "ts_us"
},
{
"type": "int64",
"optional": true,
"field": "ts_ns"
}
],
"optional": false,
"name": "Vitess_server.commerce.customers.Envelope" (4)
},
"payload": { (5)
"before": null, (6)
"after": { (7)
"id": 1,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": { (8)
"version": "3.3.1.Final",
"connector": "vitess",
"name": "my_sharded_connector",
"ts_ms": 1559033904000,
"ts_us": 1559033904000000,
"ts_ns": 1559033904000000000,
"snapshot": "false",
"db": "",
"sequence": null,
"keyspace": "commerce",
"table": "customers",
"shard": "-80",
"vgtid": "[{\"keyspace\":\"commerce\",\"shard\":\"80-\",\"gtid\":\"MariaDB/0-54610504-47\"},{\"keyspace\":\"commerce\",\"shard\":\"-80\",\"gtid\":\"MariaDB/0-1592148-45\"}]"
},
"op": "c", (9)
"ts_ms": 1559033904863, (10)
"ts_us": 1559033904863497, (10)
"ts_ns": 1559033904863497147 (10)
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
描述值有效负载结构的 值模式。更改事件的值模式对于连接器为特定表生成的每个更改事件都相同。 |
2 |
|
在 |
3 |
|
|
4 |
|
|
5 |
|
值 的实际数据。这是更改事件提供的信息。 事件的 JSON 表示可能比它们描述的行大得多。这是因为 JSON 表示必须包含消息的 schema 和 payload 部分。但是,通过使用 Avro 转换器,您可以显著减小连接器流式传输到 Kafka 主题的消息的大小。 |
6 |
|
一个可选字段,指定事件发生前行的状态。当 |
7 |
|
一个可选字段,指定事件发生后行的状态。在此示例中, |
8 |
|
描述事件源元数据的 必需字段。此字段包含可用于将此事件与其他事件进行比较的信息,关于事件的来源、事件发生的顺序以及事件是否属于同一事务。源元数据包括:
|
9 |
|
描述导致连接器生成事件的操作类型的 必需字符串。在此示例中,
|
10 |
|
可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
update 事件
对于样本 customers 表中的更新,更改事件的值具有与该表的创建事件相同的模式。同样,事件值
的有效负载也具有相同的结构。但是,在更新事件中,事件值有效负载包含不同的值。以下是一个更改事件值在连接器为 customers 表中的更新生成的事件中的示例:
{
"schema": { ... },
"payload": {
"before": { (1)
"id": 1,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": { (2)
"id": 1,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": { (3)
"version": "3.3.1.Final",
"connector": "vitess",
"name": "my_sharded_connector",
"ts_ms": 1559033904000,
"ts_us": 1559033904000000,
"ts_ns": 1559033904000000000,
"snapshot": "false",
"db": "",
"sequence": null,
"keyspace": "commerce",
"table": "customers",
"shard": "-80",
"vgtid": "[{\"keyspace\":\"commerce\",\"shard\":\"80-\",\"gtid\":\"MariaDB/0-54610504-47\"},{\"keyspace\":\"commerce\",\"shard\":\"-80\",\"gtid\":\"MariaDB/0-1592148-46\"}]"
},
"op": "u", (4)
"ts_ms": 1465584025523, (5)
"ts_us": 1465584025523763, (5)
"ts_ns": 1465584025523763547 (5)
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
一个可选字段,包含数据库提交之前行中所有列的所有值。 |
2 |
|
一个可选字段,指定事件发生后行的状态。在此示例中, |
3 |
|
一个强制性字段,描述事件的源元数据。
|
4 |
|
描述操作类型的 必需字符串。在更新事件值中, |
5 |
|
可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
|
更新行主键的列会更改行的键值。当键更改时,Debezium 会发出三个事件:一个带有行旧键的 |
主键更新
更改行
主键字段的 UPDATE 操作称为主键更改。对于主键更改,连接器会发出旧键的 DELETE 事件记录和新(更新)键的 CREATE 事件记录,而不是 UPDATE 事件记录。这些事件具有常规的结构和内容,此外,每个事件都有一个与主键更改相关的消息头:
-
DELETE事件记录具有__debezium.newkey作为消息头。此标头的值是更新行的新主键。
-
CREATE事件记录具有__debezium.oldkey作为消息头。此标头的值是更新行以前(旧)的主键。
delete 事件
删除更改事件中的值具有与
同一表的创建和更新事件相同的 schema 部分。对于样本 customers 表,删除事件中的 payload 部分如下所示:
{
"schema": { ... },
"payload": {
"before": { (1)
"id": 1,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": null, (2)
"source": { (3)
"version": "3.3.1.Final",
"connector": "vitess",
"name": "my_sharded_connector",
"ts_ms": 1559033904000,
"ts_us": 1559033904000000,
"ts_ns": 1559033904000000000,
"snapshot": "false",
"db": "",
"sequence": null,
"keyspace": "commerce",
"table": "customers",
"shard": "-80",
"vgtid": "[{\"keyspace\":\"commerce\",\"shard\":\"80-\",\"gtid\":\"MariaDB/0-54610504-47\"},{\"keyspace\":\"commerce\",\"shard\":\"-80\",\"gtid\":\"MariaDB/0-1592148-47\"}]"
},
"op": "d", (4)
"ts_ms": 1465581902461, (5)
"ts_us": 1465581902461324, (5)
"ts_ns": 1465581902461324871 (5)
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
一个可选字段,指定事件发生前行的状态。在删除事件值中, |
2 |
|
一个可选字段,指定事件发生后行的状态。在删除事件值中, |
3 |
|
一个强制性字段,描述事件的源元数据。在*delete* 事件值中,
|
4 |
|
描述操作类型的 必需字符串。 |
5 |
|
可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
*delete* 更改事件记录为使用者提供了处理该行删除所需的信息。
Vitess 连接器事件旨在与 Kafka 日志压缩配合使用。日志压缩允许删除一些旧消息,只要保留每个键的最新消息。这使 Kafka 可以回收存储空间,同时确保主题包含完整的数据集,并可用于重新加载基于键的状态。
当一行被删除时,delete 事件的值仍然与日志压缩配合使用,因为 Kafka 可以删除具有该相同键的所有早期消息。但是,为了让 Kafka 删除具有该相同键的所有消息,消息值必须为 null。为了实现这一点,Vitess 连接器会跟随一个delete 事件,该事件是一个特殊的墓碑事件,具有相同的键但值为 null。
数据类型映射
Vitess 连接器以结构类似于该行所在表的事件来表示行的更改。事件包含一个字段用于每个列值。该值在事件中如何表示取决于列的 Vitess 数据类型。本节将描述这些映射。
如果默认数据类型转换不满足您的需求,您可以创建自定义转换器以供连接器使用。
基本类型
下表描述了连接器如何将基本 Vitess 数据类型映射到事件字段中的字面量类型和语义类型。
-
*字面类型*描述了如何使用 Kafka Connect schema 类型:
INT8、INT16、INT32、INT64、FLOAT32、FLOAT64、BOOLEAN、STRING、BYTES、ARRAY、MAP和STRUCT来表示值。 -
语义类型使用 Kafka Connect 模式的名称描述了 Kafka Connect 模式如何捕获字段的*含义*。
| Vitess 数据类型 | 字面类型 (模式类型) | 语义类型 (模式名称) 和注释 |
|---|---|---|
|
|
n/a |
|
暂不支持 |
n/a |
|
暂不支持 |
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
的字符串表示。 |
|
|
的逗号分隔列表。 |
|
|
的逗号分隔列表。 |
|
|
io.debezium.time.Year |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
暂不支持 |
n/a |
时间类型
除了 TIMESTAMP 数据类型外,Vitess 时间类型取决于 time.precision.mode 连接器配置属性的值。
DATETIME 类型表示本地日期和时间,例如“2018-01-13 09:48:27”。如前面的示例所示,此类型不包含时区信息。此类型的列会根据列的精度,通过 UTC 转换为 epoch 毫秒或微秒。TIMESTAMP 类型表示没有时区信息的日期时间戳。写入数据时,MySQL 会将 TIMESTAMP 类型从服务器或会话的时区转换为 UTC 格式。读取值时,数据库会从 UTC 格式转换为服务器或会话的当前时区。例如:
-
DATETIME值为2018-06-20 06:37:03,转换为1529476623000。 -
TIMESTAMP值为2018-06-20 06:37:03,转换为2018-06-20T13:37:03Z。
此类列会根据服务器或会话的时区,转换为 UTC 中的等效 io.debezium.time.ZonedTimestamp。默认情况下,Debezium 会查询服务器以获取时区。如果失败,您必须通过在 JDBC 连接字符串中设置 connectionTimeZone 选项来显式指定时区。例如,如果数据库的时区(全局或通过 connectionTimeZone 选项为连接器配置的)为“America/Los_Angeles”,则 TIMESTAMP 值“2018-06-20 06:37:03”将由值为“2018-06-20T13:37:03Z”的 ZonedTimestamp 表示。
运行 Kafka Connect 和 Debezium 的 JVM 的时区不会影响这些转换。
有关影响时间值的属性的更多信息,请参阅 连接器配置属性。
- time.precision.mode=adaptive_time_microseconds(default)
-
Vitess 连接器确定字面量类型和语义类型,基于列的数据类型定义,以便事件准确表示数据库中的值。所有时间字段都以微秒为单位。只有范围在
00:00:00.000000到23:59:59.999999之间的正TIME字段值才能正确捕获。表 7. time.precision.mode=adaptive_time_microseconds时的映射Vitess 类型 文字类型 语义类型 DATEINT32io.debezium.time.Date
表示自 UNIX 纪元以来的天数。TIME[(M)]INT64io.debezium.time.MicroTime
以微秒为单位表示时间值,不包含时区信息。MySQL 允许M的范围为0-6。DATETIME, DATETIME(0), DATETIME(1), DATETIME(2), DATETIME(3)INT64io.debezium.time.Timestamp
表示自 UNIX 纪元以来的毫秒数,不包含时区信息。DATETIME(4), DATETIME(5), DATETIME(6)INT64io.debezium.time.MicroTimestamp
表示自 UNIX 纪元以来的微秒数,不包含时区信息。 - time.precision.mode=connect
-
Vitess 连接器使用 Kafka Connect 的定义逻辑类型。此方法不如默认方法精确,如果数据库列的小数秒精度值大于
3,事件可能会不精确。连接器可以处理范围从00:00:00.000到23:59:59.999的值。只有当您确定表中TIME值永远不会超出支持范围时,才应设置time.precision.mode=connect。connect设置预计将在 Debezium 的将来版本中移除。表 8. time.precision.mode=connect时的映射Vitess 数据类型 文字类型 语义类型 DATEINT32org.apache.kafka.connect.data.Date
表示自 UNIX 纪元以来的天数。TIME[(M)]INT64org.apache.kafka.connect.data.Time
表示自午夜以来的微秒时间值,不包含时区信息。DATETIME[(M)]INT64org.apache.kafka.connect.data.Timestamp
表示自 UNIX 纪元以来的毫秒数,不包含时区信息。 - time.precision.mode=isostring
-
Vitess 连接器将所有时间类型使用字符串表示。
表 9. time.precision.mode=connect时的映射Vitess 数据类型 文字类型 语义类型 DATESTRINGn/a
TIME[(M)]STRINGn/a
DATETIME[(M)]STRINGn/a
设置 Vitess
Debezium 在使用 Vitess 时不需要任何特定配置。按照 本地通过 Docker 安装指南或 Kubernetes 的 Vitess Operator 指南中的标准说明安装 Vitess。
-
确保 VTGate 主机及其 gRPC 端口(默认是 15991)可从安装了 Vitess 连接器的机器访问。
-
确保 VTCtld 主机及其 gRPC 端口(默认是 15999)可从安装了 Vitess 连接器的机器访问。
部署
在安装了 Kafka 和 Kafka Connect 后,部署 Debezium Vitess 连接器的剩余任务是下载连接器的插件存档,将 JAR 文件解压到您的 Kafka Connect 环境中,并将包含 JAR 文件的目录添加到 Kafka Connect 的 plugin.path。然后,您需要重新启动 Kafka Connect 进程以加载新的 JAR 文件。
如果您使用的是不可变容器,请参阅 Debezium 的容器镜像,其中已安装了适用于 Kafka 和 Kafka Connect 的 Vitess 连接器,并且已准备就绪。您还可以 在 Kubernetes 和 OpenShift 上运行 Debezium。
|
从 |
连接器配置示例
以下是一个 Vitess 连接器的配置示例,该连接器连接到位于 192.168.99.100 的 15991 端口上的 Vitess(VTGate 的 VStream)服务器,其逻辑名称为 fullfillment。它还连接到 192.168.99.101 的 15999 端口上的 VTCtld 服务器以获取初始 VGTID。通常,您会使用连接器可用的配置属性在 .json 文件中配置 Debezium Vitess 连接器。
您可以选择为部分 schema 和表生成事件。可选地,忽略、屏蔽或截断敏感、过大或不需要的列。
{
"name": "inventory-connector", (1)
"config": {
"connector.class": "io.debezium.connector.vitess.VitessConnector", (2)
"database.hostname": "192.168.99.100", (3)
"database.port": "15991", (4)
"database.user": "vitess", (5)
"database.password": "vitess_password", (6)
"vitess.keyspace": "commerce", (7)
"vitess.tablet.type": "MASTER", (8)
"vitess.vtctld.host": "192.168.99.101", (9)
"vitess.vtctld.port": "15999", (10)
"vitess.vtctld.user": "vitess", (11)
"vitess.vtctld.password": "vitess_password", (12)
"topic.prefix": "fullfillment", (13)
"tasks.max": 1 (14)
}
}
| 1 | 连接器在 Kafka Connect 服务中注册时的名称。 |
| 2 | 此 Vitess 连接器类的名称。 |
| 3 | Vitess(VTGate 的 VStream)服务器的地址。 |
| 4 | Vitess(VTGate 的 VStream)服务器的端口号。 |
| 5 | Vitess 数据库服务器(VTGate gRPC)的用户名。 |
| 6 | Vitess 数据库服务器(VTGate gRPC)的密码。 |
| 7 | keyspace(也称为数据库)的名称。由于未指定分片,它会从 keyspace 中的所有分片读取更改事件。 |
| 8 | 用于读取更改事件的 MySQL 实例类型(MASTER 或 REPLICA)。 |
| 9 | VTCtld 服务器的地址。 |
| 10 | VTCtld 服务器的端口。 |
| 11 | VTCtld 服务器(VTCtld gRPC)的用户名。 |
| 12 | VTCtld 数据库服务器(VTCtld gRPC)的密码。 |
| 13 | Vitess 集群的主题前缀,它形成一个命名空间,并用于连接器写入的所有 Kafka 主题的名称、Kafka Connect 模式名称以及使用 Avro 转换器时相应 Avro 模式的命名空间。 |
| 14 | 一次只有一个任务应运行。 |
请参阅 Vitess 连接器属性的完整列表,这些属性可以在这些配置中指定。
您可以将此配置通过 POST 命令发送到正在运行的 Kafka Connect 服务。该服务会记录配置并启动连接器任务,该任务连接到 Vitess 数据库并将更改事件记录流式传输到 Kafka 主题。
偏移量存储模式的连接器配置示例
当您拥有大型 Vitess 安装,需要多个连接器任务来处理更改日志时,您可以使用 offset-storage-per-task 功能来启动多个连接器任务,让每个任务处理 Vitess 分片的一个子集。每个任务将在 Kafka 的偏移量主题中以自己的分区空间持久化其偏移量(它们跟踪的分片的 vgtids)。
以下是一个连接到 Vitess(VTGate 的 VStream)服务器的 Vitess 连接器的相同示例,但增加了三个参数来调用 offset-storage-per-task 模式。
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.vitess.VitessConnector",
"database.hostname": "192.168.99.100",
"database.port": "15991",
"database.user": "vitess",
"database.password": "vitess_password",
"topic.prefix": "fullfillment",
"vitess.keyspace": "commerce",
"vitess.tablet.type": "MASTER",
"vitess.vtctld.host": "192.168.99.101",
"vitess.vtctld.port": "15999",
"vitess.vtctld.user": "vitess",
"vitess.vtctld.password": "vitess_password",
"vitess.offset.storage.per.task": true, (1)
"vitess.offset.storage.task.key.gen": 1, (2)
"vitess.prev.num.tasks": 1, (3)
"tasks.max": 2 (4)
}
}
| 1 | 指定我们要启用 offset-storage-per-task 功能。 |
| 2 | 指定当前任务并行度的生成号为 1。 |
| 3 | 指定前一个任务并行度的任务数为 1。 |
| 4 | 指定我们要为当前任务并行度启动两个任务。 |
任务到 Vitess 分片的分配基于简单的轮询算法。在此示例中,启动两个连接器任务,并假设我们有 4 个 Vitess 分片(-40、40-80、80-c0、c0-),task0 将处理分片(-40、80-c0),task1 将处理分片(40-80、c0-)。
我们需要三个配置参数的原因是确保每个连接器任务保存的偏移量不会相互冲突,并自动处理前一个任务并行度的偏移量迁移。为了确保我们在 Kafka 偏移量主题中的分区键不冲突,我们为每个连接器任务使用以下分区名称方案:taskId_numTasks_gen。因此,对于当前启动两个任务且生成号为 1 的示例,task0 将在 Kafka 偏移量主题的分区键中写入其偏移量:task0_2_1,task1 将使用分区键:(task1_2_1)。gen 配置参数用于区分来自不同生成的分区键(生成对应于任务并行度的每次更改)。
当任务并行度改变时(例如,您想启动 4 个连接器任务而不是 2 个来处理来自 Vitess 的更大流量),您将指定 tasks.max=4、vitess.offset.storage.task.key.gen=2、vitess.prev.num.tasks=2,此任务并行度生成的偏移量分区将为:task0_4_2、task1_4_2、task2_4_2、task3_4_2。连接器重新启动后,连接器将检测到当前 4 个分区键没有保存的先前偏移量,并将自动迁移从前一个生成键(task0_2_1 和 task1_2_1)保存的偏移量。对于当前 4 个 Vitess 分片(-40、40-80、80-c0、c0-)的示例,task0 将处理分片(-40),task1(40-80),task2(80-c0),task3(c0-)。来自前一个并行度生成(每个任务处理 2 个分片)的这 4 个分片的偏移量将自动迁移到当前生成,即使用 4 个任务(每个任务处理一个分片)。
请注意,在启用 offset-storage-per-task 功能之前,Kafka 偏移量主题中保存的偏移量的任务并行度生成号默认为 0,有一个特殊的偏移量查找过程。因此,如果您在没有启用 offset-storage-per-task 功能的情况下运行 Vitess 连接器一段时间,现在想启用此功能,请指定 vitess.offset.storage.task.key.gen=1、vitess.prev.num.tasks=1 以帮助偏移量自动迁移。
请注意,vitess.prev.num.tasks 需要与前一个任务并行度生成中启动的任务数相匹配。连接器任务的数量通常与您指定的 tasks.max 配置参数相同,但在tasks.max > Vitess 分片数这种罕见情况下,连接器只会启动_任务数 = Vitess 分片数。这种情况很可能在最初的配置中就存在错误。
请参阅 Vitess 连接器属性的完整列表,这些属性可以在这些配置中指定。
您可以将此配置通过 POST 命令发送到正在运行的 Kafka Connect 服务。该服务会记录配置并启动连接器任务,该任务连接到 Vitess 数据库并将更改事件记录流式传输到 Kafka 主题。
添加连接器配置
要开始运行 Vitess 连接器,请创建一个连接器配置并将其添加到您的 Kafka Connect 集群。
-
VTGate 主机及其 gRPC 端口(默认是 15991)可从安装了 Vitess 连接器的机器访问。
-
VTCtld 主机及其 gRPC 端口(默认是 15999)可从安装了 Vitess 连接器的机器访问。
-
Vitess 连接器已安装。
-
为 Vitess 连接器创建配置。
-
使用Kafka Connect REST API 将该连接器配置添加到您的 Kafka Connect 群集。
当连接器启动时,它开始为行级操作生成数据更改事件,并将更改事件记录流式传输到 Kafka 主题。
监控
Debezium Vitess 连接器仅提供一种类型的指标,除此之外 Kafka 和 Kafka Connect 还提供内置的 JMX 指标支持。
-
流式传输指标提供有关连接器运行时(当连接器正在捕获更改并流式传输更改事件记录时)的信息。
Debezium 监控文档 提供了有关如何使用 JMX 公开这些指标的详细信息。
自定义 MBean 名称
Debezium 连接器通过连接器的 MBean 名称公开指标。这些指标特定于每个连接器实例,提供有关连接器快照、流式传输和模式历史记录进程行为的数据。
默认情况下,当您部署正确配置的连接器时,Debezium 会为不同的连接器指标生成唯一的 MBean 名称。要查看连接器进程的指标,请将您的可观察性堆栈配置为监视其 MBean。但这些默认 MBean 名称取决于连接器配置;配置更改可能导致 MBean 名称发生更改。在这种情况下,更改 MBean 名称会破坏连接器实例和 MBean 之间的链接,从而中断监控活动。您必须重新配置可观察性堆栈以使用新的 MBean 名称才能恢复监控。
为了防止因 MBean 名称更改而导致的监控中断,您可以配置自定义指标标签。通过将 custom.metric.tags 属性添加到连接器配置中来配置自定义指标。该属性接受键值对,其中每个键代表 MBean 对象名称的标签,相应的值代表该标签的值。例如:k1=v1,k2=v2。Debezium 将指定的标签附加到连接器的 MBean 名称。
配置连接器的 custom.metric.tags 属性后,您可以配置可观察性堆栈以检索与指定标签关联的指标。然后,可观察性堆栈使用指定的标签而不是可变的 MBean 名称来唯一标识连接器。之后,如果 Debezium 重新定义了其如何构造 MBean 名称,或者连接器配置中的 topic.prefix 发生更改,指标收集将不会中断,因为指标抓取任务使用指定的标签模式来标识连接器。
使用自定义标签的另一个好处是,您可以使用反映数据管道架构的标签,以便以适合您运营需求的方式组织指标。例如,您可以指定带有值的标签来声明连接器活动的类型、应用程序上下文或数据源,例如 db1-streaming-for-application-abc。如果您指定多个键值对,所有指定的对都将附加到连接器的 MBean 名称。
以下示例说明了标签如何修改默认 MBean 名称。
默认情况下,Vitess 连接器为流式传输指标使用以下 MBean 名称:
debezium.vitess:type=connector-metrics,context=streaming,server=<topic.prefix>
如果您将 custom.metric.tags 的值设置为 database=salesdb-streaming,table=inventory,Debezium 将生成以下自定义 MBean 名称:
debezium.vitess:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory
流式传输指标
MBean 为 debezium.vitess:type=connector-metrics,context=streaming,server=<topic.prefix>。
| Attributes | Type | 描述 |
|---|---|---|
|
自连接器读取和处理最近事件以来经过的毫秒数。 |
|
|
自上次启动或重置以来,此连接器已看到的事件总数。 |
|
|
已被连接器配置的包含/排除列表过滤规则过滤的事件数量。 |
|
|
用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的长度。 |
|
|
用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的剩余容量。 |
|
|
表示连接器当前是否连接到数据库服务器的标志。 |
|
|
上次更改事件的时间戳与连接器处理它之间的时间差(以毫秒为单位)。这些值将包含数据库服务器和连接器运行所在机器之间时钟的任何差异。 |
|
|
已提交的处理过的事务数。 |
|
|
用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的最大缓冲区(以字节为单位)。 |
|
|
用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的当前缓冲区(以字节为单位)。 |
连接器配置属性
Debezium Vitess 连接器有许多配置属性,您可以使用它们来实现适合您应用程序的正确连接器行为。许多属性都有默认值。有关属性的信息组织如下:
以下配置属性是必需的,除非有默认值可用。
| 属性 | Default (默认值) | 描述 | ||
|---|---|---|---|---|
无默认值 |
连接器的唯一名称。尝试使用相同的名称再次注册将失败。此属性是所有 Kafka Connect 连接器必需的。 |
|||
无默认值 |
连接器的 Java 类名。对于 Vitess 连接器,始终使用 |
|||
|
应为此连接器创建的最大任务数。如果您启用了 offset.storage.per.task 模式,Vitess 连接器可以使用多个任务。 |
|||
无默认值 |
Vitess 数据库服务器(VTGate)的 IP 地址或主机名。 |
|||
|
Vitess 数据库服务器(VTGate)的整数端口号。 |
|||
要从中流式传输更改的 keyspace 名称。 |
||||
n/a |
要从中流式传输更改的可选分片名称。如果未配置,在未分片的 keyspace 中,连接器将从唯一的分片流式传输更改;在已分片的 keyspace 中,连接器将从 keyspace 中的所有分片流式传输更改。建议不要配置此项,以便从 keyspace 中的所有分片流式传输,因为它对重新分片操作有更好的支持。如果配置,例如 |
|||
|
用于流式传输分片的可选 GTID 位置。这必须与 |
|||
|
控制 Vitess 标志 stop_on_reshard。 |
|||
|
控制 Vitess 标志 + + |
|||
n/a |
Vitess 数据库服务器(VTGate)的可选用户名。如果未配置,则使用未经验证的 VTGate gRPC。 |
|||
n/a |
Vitess 数据库服务器(VTGate)的可选密码。如果未配置,则使用未经验证的 VTGate gRPC。 |
|||
|
从中流式传输更改的 Tablet(因此是 MySQL)的类型。 |
|||
无默认值 |
主题前缀,为 Debezium 正在捕获更改的特定 Vitess 数据库服务器或集群提供命名空间。数据库服务器逻辑名称中只能使用字母数字字符、连字符、点和下划线。该前缀应与其他连接器唯一,因为它被用作接收此连接器记录的所有 Kafka 主题的主题名称前缀。 +
|
|||
无默认值 |
可选的、逗号分隔的正则表达式列表,用于匹配要捕获其更改的表的完全限定表标识符。 |
|||
无默认值 |
可选的、逗号分隔的正则表达式列表,用于匹配您不想捕获其更改的表的完全限定表标识符。 |
|||
无默认值 |
可选的、逗号分隔的正则表达式列表,用于匹配应包含在更改事件记录值中的列的完全限定名称。列的完全限定名称格式为 keyspace.tableName.columnName。请勿同时设置 |
|||
无默认值 |
可选的、逗号分隔的正则表达式列表,用于匹配应从更改事件记录值中排除的列的完全限定名称。列的完全限定名称格式为 keyspace.tableName.columnName。请勿同时设置 |
|||
n/a |
一个可选的、逗号分隔的正则表达式列表,匹配基于字符的列的完全限定名称。如果您希望在列数据超过属性名称中由 length 指定的字符数时截断这些列中的数据,请设置此属性。将 列的完全限定名称遵循以下格式:databaseName.tableName.columnName。为了匹配列名,Debezium 将您指定的正则表达式作为锚定正则表达式匹配。也就是说,指定的表达式与列的整个名称字符串进行匹配;它不匹配可能存在于列名中的子字符串。 您可以在单个配置中指定多个具有不同长度的属性。 |
|||
|
控制是否在*delete* 事件后跟一个墓碑事件。 |
|||
|
指定是否启用 offset-storage-per-task 模式以启动多个连接器任务并将偏移量按任务分区持久化。 |
|||
|
当 vitess.offset.storage.per.task 启用时,指定任务并行度生成号。当您决定更改连接器任务并行度(启动更多或更少连接器任务)时,应增加生成号。 |
|||
|
当 vitess.offset.storage.per.task 启用时,指定前一个任务并行度生成中使用的连接器任务数。 |
|||
空字符串 |
一个分号分隔的表列表,其中包含匹配表列名的正则表达式。连接器将匹配列中的值映射到它发送到 Kafka 主题的更改事件记录中的键字段。当表没有主键时,或者当您想根据非主键字段对 Kafka 主题中的更改事件记录进行排序时,此功能很有用。 |
|||
none |
指定如何调整 schema 名称以兼容连接器使用的消息转换器。可能设置:
|
|||
none |
指定如何调整字段名称以兼容连接器使用的消息转换器。可能设置:
有关更多详细信息,请参阅 Avro 命名。 |
|||
initial (初始) |
指定连接器启动时执行快照的标准。将属性设置为以下值之一:
|
|||
|
您可以设置以下选项来确定 Debezium 如何表示时间、日期和时间戳值的精度:
|
|||
string |
指定 BIGINT UNSIGNED 列在更改事件中应如何表示。 |
以下*高级*配置属性的默认值在大多数情况下都适用,因此很少需要在连接器的配置中指定。
| 属性 | Default (默认值) | 描述 |
|---|---|---|
无默认值 |
枚举连接器可以使用*自定义转换器*的符号名称的逗号分隔列表。例如:
您必须设置 对于为连接器配置的每个转换器,您还必须添加一个
For example, (例如,) isbn.type: io.debezium.test.IsbnConverter 如果您想进一步控制已配置转换器的行为,您可以添加一个或多个配置参数来传递值给转换器。要将任何其他配置参数与转换器关联,请在参数名称前加上转换器的符号名称。例如: isbn.schema.name: io.debezium.vitess.type.Isbn |
|
|
指定连接器在处理事件时如何应对异常: |
|
|
正整数值,指定阻塞队列可以保存的最大记录数。当 Debezium 从数据库读取流式传输的事件时,它会将事件放入阻塞队列,然后再写入 Kafka。在连接器摄取消息的速度快于其写入 Kafka 的速度,或者 Kafka 不可用时,阻塞队列可以提供反压,以从数据库读取更改事件。队列中保存的事件在连接器定期记录偏移量时会被忽略。始终将 |
|
|
一个正整数值,指定连接器处理的每个事件批次的最大大小。 |
|
|
一个长整数值,指定阻塞队列的最大字节容量。默认情况下,阻塞队列没有容量限制。要指定队列可以消耗的字节数,请将此属性设置为一个正长整型值。 |
|
|
正整数值,指定连接器在开始处理一批事件之前等待新更改事件出现的时间(以毫秒为单位)。默认为 500 毫秒,即 0.5 秒。 |
|
|
一个逗号分隔的操作类型列表,您希望连接器在流式传输期间跳过这些操作。您可以配置连接器以跳过以下类型的操作:
由于 Debezium Vitess 连接器从不向数据更改主题发出 |
|
|
确定连接器是否生成具有事务边界的事件,以及是否用事务元数据丰富更改事件信封。如果您希望连接器执行此操作,请指定 |
|
|
确定连接器用于跟踪事务上下文并构建表示事务的数据结构和模式的类。 |
|
|
控制 VStream 的 gRPC 心跳信号之间的间隔。默认为 |
|
无默认值 |
指定一个逗号分隔的 gRPC 标头列表。默认为空。格式为: |
|
无默认值 |
指定允许在通道上接收的最大消息大小(以字节为单位)。 |
|
n/a |
一个可选的、逗号分隔的正则表达式列表,用于匹配列的完全限定名称,其原始类型和长度应作为参数添加到生成的更改事件记录中的相应字段 schema 中。这些 schema 参数
用于分别传播可变宽度类型的原始类型名称和长度。这有助于正确确定接收数据库中相应列的大小。列的完全限定名称形式如下: keyspaceName.tableName.columnName |
|
n/a |
一个可选的、逗号分隔的正则表达式列表,用于匹配列的数据库特定数据类型名称,其原始类型和长度应作为参数添加到生成的更改事件记录中的相应字段 schema 中。这些 schema 参数
用于分别传播可变宽度类型的原始类型名称和长度。这有助于正确确定接收数据库中相应列的大小。列的完全限定名称形式如下: keyspaceName.tableName.columnName 请参阅 Vitess 连接器如何映射数据类型以获取 Vitess 特定数据类型名称列表。 |
|
|
用于确定数据更改、schema 更改、事务、心跳事件等的*主题名称*的 TopicNamingStrategy 类的名称,默认为 |
|
无默认值 |
当 topic.naming.strategy 设置为 |
|
|
指定主题名称的分隔符,默认为 |
|
|
用于保存有界并发哈希映射中主题名称的大小。此缓存将有助于确定与给定数据集合对应的主题名称。 |
|
|
控制连接器发送事务元数据消息的主题的名称。主题名称的模式为: |
|
|
自定义指标标签将接受键值对来自定义 MBean 对象名称,该名称应附加到常规名称的末尾。每个键将代表 MBean 对象名称的标签,其对应的值将是该标签的值。例如: |
|
|
指定连接器在操作导致可重试错误(例如连接错误)后如何响应。
|
|
true |
此属性指定 Debezium 是否将带有 这些头是 OpenLineage 集成所必需的,并提供元数据,使下游处理系统能够跟踪和识别更改事件的来源。 该属性添加了以下头:
|
连接器还支持*直通*配置属性,这些属性在创建 Kafka 生产者和消费者时使用。
出现问题时的行为
Debezium 是一个分布式系统,它捕获多个上游数据库的所有更改;它从不遗漏或丢失事件。当系统正常运行或被仔细管理时,Debezium 会为每个更改事件记录提供精确一次的传递。
如果发生故障,系统不会丢失任何事件。但是,在从故障恢复期间,它可能会重复某些更改事件。在这些异常情况下,Debezium 与 Kafka 一样,提供*至少一次*的更改事件传递。
本节的其余部分描述了 Debezium 如何处理各种类型的故障和问题。
配置和启动错误
在以下情况下,连接器在尝试启动时会失败,并在日志中报告错误/异常,然后停止运行:
-
连接器的配置无效。
-
连接器无法使用指定的连接参数成功连接到 Vitess。
在这些情况下,错误消息会详细说明问题,可能还会提供建议的解决方案。在更正配置或解决 Vitess 问题后,重新启动连接器。
Vitess 变得不可用
当连接器运行时,它所连接的 Vitess 服务器(VTGate)可能因各种原因而变得不可用。如果发生这种情况,连接器会因错误而失败并停止。当服务器再次可用时,重新启动连接器。
Vitess 连接器以 Vitess VGTID 的形式外部存储最后一个已处理的偏移量。在连接器重新启动并连接到服务器实例后,连接器会与服务器通信,以从该特定偏移量继续流式传输。
无效的列名错误
此错误很少发生。如果您收到一条消息为 Illegal prefix '@' for column: x, from schema: y, table: z 的错误,而您的表中没有这样的列,这是 Vitess vstream bug,由列重命名或列类型更改引起。这是一个瞬时错误。在短暂退避后重启连接器,问题应该会自动解决。
Kafka Connect 进程正常停止
假设 Kafka Connect 正在分布式模式下运行,并且一个 Kafka Connect 进程正常停止。在停止该进程之前,Kafka Connect 会将该进程的连接器任务迁移到该组中的另一个 Kafka Connect 进程。新的连接器任务将从先前任务停止的地方继续处理。在连接器任务正常停止并重新启动到新进程的过程中,会有短暂的处理延迟。
Kafka Connect 进程崩溃
如果 Kafka Connect 进程意外停止,它正在运行的任何连接器任务都会在未记录其最近处理的偏移量的情况下终止。当 Kafka Connect 在分布式模式下运行时,Kafka Connect 会在其他进程上重新启动这些连接器任务。但是,Vitess 连接器将从早期进程记录的最后一个偏移量恢复。这意味着新的替换任务可能会生成一些在崩溃前刚刚处理过的相同更改事件。重复事件的数量取决于偏移量刷新周期以及崩溃前的数据更改量。
由于在从故障中恢复时可能会发生事件重复,因此使用者应始终预料到某些重复事件。Debezium 更改是幂等的,因此一系列事件始终会导致相同的状态。
在每个更改事件记录中,Debezium 连接器会插入有关事件来源的特定于源的信息,包括 Vitess 服务器的事件时间,以及将事务更改写入的二进制日志位置。消费者可以跟踪这些信息,特别是 VGTID,以确定事件是否是重复的。
Kafka 变得不可用
当连接器生成更改事件时,Kafka Connect 框架会使用 Kafka producer API 将这些事件记录在 Kafka 中。您可以在 Kafka Connect 配置中指定的频率周期性地,Kafka Connect 会记录出现在这些更改事件中的最新偏移量。如果 Kafka Broker 不可用,运行连接器的 Kafka Connect 进程将反复尝试重新连接到 Kafka Broker。换句话说,连接器任务将暂停,直到可以重新建立连接,届时连接器将从它们离开的地方恢复。
连接器停止一段时间
如果连接器被正常停止,数据库可以继续使用。任何更改都会记录在 Vitess 二进制日志中。当连接器重新启动时,它将从之前的位置继续流式传输更改。也就是说,它将为在连接器停止期间进行的数据库更改生成更改事件记录。
配置正确的 Kafka 集群能够处理海量吞吐量。Kafka Connect 是根据 Kafka 最佳实践编写的,并且在提供足够资源的情况下,Kafka Connect 连接器也可以处理大量的数据库更改事件。因此,在停止一段时间后,当 Debezium 连接器重新启动时,它很可能会赶上在停止期间所做的数据库更改。这需要多长时间取决于 Kafka 的能力和性能以及 Vitess 中数据的更改量。
连接器在完成快照前失败
如果快照未能完成,连接器不会自动重试快照。如果使用先前的偏移量重新启动连接器,连接器将跳过快照过程并立即开始流式传输更改事件。
因此,要从失败中恢复,请手动删除连接器偏移量并启动连接器。
早期 Vitess 版本的限制
-
由于 Vitess 的一个轻微填充问题(在 Vitess 9.0.0 中已修复),精度大于或等于 13 的 decimal 值会在数字前面产生额外的空格。例如,如果表定义中的列类型为
decimal(13,4),则值-1.2300变为"- 1.2300",值1.2300变为" 1.2300"。 -
不支持
JSON列类型。 -
VStream 8.0.0 不提供
ENUM列的允许值的额外元数据。因此,连接器不支持ENUM列类型。将发出索引号(基于 1)而不是枚举值。例如,如果ENUM定义为enum('S','M','L'),则会发出"3"作为值,而不是"L"。