Debezium Vitess 连接器
Debezium 的 Vitess 连接器可捕获 Vitess keyspace 分片中的行级更改。有关此连接器兼容的 Vitess 版本的信息,请参阅 Debezium 版本概述。
当连接器首次连接到 Vitess 集群时,它将对 keyspace 进行一致性快照。快照完成后,连接器会持续捕获提交到 Vitess keyspace 的行级更改,以插入、更新或删除数据库内容。连接器会生成数据更改事件记录,并将它们流式传输到 Kafka 主题。对于每个表,默认行为是连接器将所有生成的事件流式传输到该表的独立 Kafka 主题。应用程序和服务然后可以从生成的主题中消费数据更改事件记录。
概述
Vitess 的 VStream 功能在 4.0 版本中引入。它是一个更改事件订阅服务,可提供与 Vitess 集群底层 MySQL 分片中的 MySQL 二进制日志等效的信息。用户可以订阅 keyspace 中的多个分片,使其成为为下游 CDC 流程提供数据的便捷工具。
为了读取和处理数据库更改,Vitess 连接器订阅了 VTGate 的 VStream gRPC 服务。VTGate 是一个轻量级的无状态 gRPC 服务器,是 Vitess 集群设置的一部分。
该连接器允许您灵活地选择订阅 MASTER 节点或 REPLICA 节点以获取更改事件。
连接器为捕获的每个行级插入、更新和删除操作生成一个更改事件,并将每个表的更改事件记录发送到单独的 Kafka 主题。客户端应用程序读取与感兴趣的数据库表相对应的 Kafka 主题,并可以对从这些主题接收到的每个行级事件做出反应。
Vitess 中的底层 MySQL 实现会根据某个可配置的时间段清除二进制日志。由于二进制日志的内容可能不完整,因此连接器需要另一种机制来确保它捕获特定数据库的完整内容。因此,当连接器首次连接到数据库时,它会执行数据库的一致性快照。在连接器完成快照后,它会从快照创建的确切点继续流式传输更改。这样,连接器就可以从所有数据的Consistent view 开始,而不会遗漏在快照期间发生的任何更改。
该连接器容忍故障。当连接器读取更改并生成事件时,它会记录每个事件的 Vitess 全局事务 ID (VGTID) 位置。如果连接器因任何原因停止(包括通信故障、网络问题或崩溃),在连接器重新启动后,它将从最后一个存储的更改事件条目继续从 VStream 读取。此行为不适用于快照。如果在快照期间连接器停止,重新启动后,连接器不会从上次中断的地方继续进行快照。我们稍后将讨论连接器 在出现问题时 的行为。
连接器的工作原理
为了最佳地配置和运行 Debezium Vitess 连接器,了解连接器如何执行快照、流式传输更改事件、确定 Kafka 主题名称以及使用元数据非常有用。
快照
通常,MySQL 服务器不会配置为在二进制日志中保留数据库的完整历史记录。因此,连接器无法从二进制日志中读取数据库的整个历史记录。出于这个原因,当连接器第一次启动时,它会执行数据库的初始一致性快照。您可以通过将 snapshot.mode 连接器配置属性设置为除 initial 以外的值来更改此行为。此快照功能基于 7.0 版本中引入的 VStream Copy。
|
未来版本预计将提供对失败快照的自动重试。 |
流式传输更改
Vitess 连接器花费所有时间从其订阅的 VTGate 的 VStream gRPC 服务流式传输更改。客户端从 VStream 接收更改,这些更改会以 VGTID 形式提交到底层 MySQL 服务器的二进制日志中的特定位置。
Vitess 中的 VGTID 等同于 MySQL 中的 GTID,它描述了更改事件发生的 VStream 中的位置。通常,VGTID 包含多个分片 GTID,每个分片 GTID 是一个 (Keyspace, Shard, GTID) 的元组,用于描述给定分片的 GTID 位置。
订阅 VStream 服务时,连接器需要提供 VGTID 和 Tablet 类型(例如,MASTER、REPLICA)。VGTID 描述了 VStream 应开始发送更改事件的位置;Tablet 类型描述了我们从每个分片中的哪个底层 MySQL 实例(主服务器或副本)读取更改事件。
当连接器第一次连接到 Vitess 集群时,它会从一个称为 VTCtld 的 Vitess 组件获取当前的 VGTID,并将当前的 VGTID 提供给 VStream。
Debezium Vitess 连接器充当 VStream 的 gRPC 客户端。当连接器接收到更改时,它会将事件转换为 Debezium 的*创建*、*更新*或*删除*事件,其中包含事件的 VGTID。Vitess 连接器将这些更改事件以记录的形式转发给运行在同一进程中的 Kafka Connect 框架。Kafka Connect 进程以与生成时相同的顺序异步地将更改事件记录写入适当的 Kafka 主题。
Kafka Connect 会定期在另一个 Kafka 主题中记录最近的*偏移量*。偏移量指示 Debezium 随每个事件包含的特定于源的位置信息。对于 Vitess 连接器,每个更改事件中记录的 VGTID 就是偏移量。
当 Kafka Connect 正常关闭时,它会停止连接器,将所有事件记录刷新到 Kafka,并记录从每个连接器接收到的最后一个偏移量。当 Kafka Connect 重新启动时,它会读取每个连接器记录的最后一个偏移量,并从最后一个记录的偏移量开始启动每个连接器。当连接器重新启动时,它会向 VStream 发送一个请求,以从该位置之后的事件开始发送事件。
主题名称
Vitess 连接器将单个表上所有插入、更新和删除操作的事件写入单个 Kafka 主题。默认情况下,Kafka 主题名称为 *topicPrefix*.*keyspaceName*.*tableName*,其中
-
*topicPrefix* 是由
topic.prefix连接器配置属性指定的。*topicPrefix* 是由topic.prefix连接器配置属性指定的。 -
*keyspaceName* 是操作发生的 keyspace(也称为数据库)的名称。
-
*tableName* 是发生操作的数据库表的名称。
例如,假设 fulfillment 是一个连接器配置中的逻辑服务器名称,该连接器捕获 Vitess 安装中的更改,该安装具有一个 commerce keyspace,其中包含四个表:products、products_on_hand、customers 和 orders。无论 keyspace 有多少分片,连接器都会将记录流式传输到以下四个 Kafka 主题:
-
fulfillment.commerce.products -
fulfillment.commerce.products_on_hand -
fulfillment.commerce.customers -
fulfillment.commerce.orders
事务元数据
Debezium 可以生成表示事务边界的事件,并丰富数据更改事件消息。
|
Debezium 接收事务元数据的限制
Debezium 仅为部署连接器后发生的事务注册和接收元数据。部署连接器之前发生的事务的元数据不可用。 |
Debezium 为每个事务中的 BEGIN 和 END 分隔符生成事务边界事件。事务边界事件包含以下字段:
status-
BEGIN或END。 id-
唯一事务标识符的字符串表示。
ts_ms-
源处事务边界事件(
BEGIN或END事件)的时间。如果源未向 Debezium 提供事件时间,则该字段将表示 Debezium 处理事件的时间。注意:`ts_ms` 的单位是毫秒,但由于 MySQL 的限制,它只有秒级精度,MySQL 只能以秒为单位提供二进制日志时间戳。 event_count(针对END事件)-
事务发出的事件总数。
data_collections(针对END事件)-
一对
data_collection和event_count元素的数组,指示连接器为源自数据集合的更改发出的事件数量。
{
"status": "BEGIN",
"id": "[{\"keyspace\":\"test_unsharded_keyspace\",\"shard\":\"0\",\"gtid\":\"MySQL56/e03ece6c-4c04-11ec-8e20-0242ac110004:1-37\"}]",
"ts_ms": 1486500577000,
"event_count": null,
"data_collections": null
}
{
"status": "END",
"id": "[{\"keyspace\":\"test_unsharded_keyspace\",\"shard\":\"0\",\"gtid\":\"MySQL56/e03ece6c-4c04-11ec-8e20-0242ac110004:1-37\"}]",
"ts_ms": 1486500577000,
"event_count": 1,
"data_collections": [
{
"data_collection": "test_unsharded_keyspace.my_seq",
"event_count": 1
}
]
}
除非通过 topic.transaction 选项进行覆盖,否则连接器会将事务事件发出到 <topic.prefix>.transaction 主题。
启用事务元数据后,数据消息 Envelope 将使用新的 transaction 字段进行丰富。该字段以字段的复合形式提供关于每个事件的信息:
-
id- 唯一事务标识符的字符串表示形式 -
total_order- 事件在事务生成的所有事件中的绝对顺序 -
data_collection_order- 事件在事务发出的所有事件中按数据集合的顺序
以下是消息示例
{
"before": null,
"after": {
"pk": "2",
"aa": "1"
},
"source": {
...
},
"op": "c",
"ts_ms": 1637988245467,
"ts_us": 1637988245467841,
"ts_ns": 1637988245467841698,
"transaction": {
"id": "[{\"keyspace\":\"test_unsharded_keyspace\",\"shard\":\"0\",\"gtid\":\"MySQL56/e03ece6c-4c04-11ec-8e20-0242ac110004:1-68\"}]",
"total_order": 1,
"data_collection_order": 1
}
}
有序事务元数据
您可以配置 Debezium 以在数据更改事件记录中包含其他元数据。这些补充元数据可以帮助下游使用者正确处理消息顺序,而无需重新分区或其他可能导致数据被乱序消费的干扰。
要将连接器配置为发出增强的数据更改事件记录,请设置 transaction.metadata.factory 属性。当此属性设置为 VitessOrderedTransactionMetadataFactory 时,连接器会在消息 Envelope 中包含一个 transaction 字段。transaction 字段添加了提供有关事务发生顺序信息的元数据。以下字段会添加到每条消息中:
transaction_epoch-
一个非递减的值,表示事务排名所属的 epoch。
transaction_rank-
一个 epoch 内的非递减值,表示事务的顺序。
还有一个字段与事件顺序有关:
total_order-
表示事件在事务生成的所有事件中的绝对位置。此字段默认包含在标准事务元数据中。
以下示例说明了如何使用这些字段来确定事件顺序。
假设 Debezium 发出发生在同一分片且具有相同主键的两个事件的数据更改事件记录。如果发送到这些事件的 Kafka 主题被重新分区,那么这两个事件的消费者顺序将不可信。如果 Debezium 配置为提供增强的事务元数据,则消费该主题的应用程序可以应用以下逻辑来确定要应用(新事件)和要丢弃的两个事件中的哪个:
-
如果
transaction_epoch的值不相等,则返回具有较高transaction_epoch值的事件。否则,继续。 -
如果
transaction_rank的值不相等,则返回具有较高transaction_rank值的事件。否则,继续。 -
返回具有较大
total_order值的事件。
如果两个事件都没有更高的 total_order 值,则这两个事件属于同一事务。由于 total_order 字段表示事务内的事件顺序,因此值更大的事件是最近的事件。
以下示例显示了一个带有有序事务元数据的数据更改事件:
{
"before": null,
"after": {
"pk": "2",
"aa": "1"
},
"source": {
...
},
"op": "c",
"ts_ms": 1637988245467,
"ts_us": 1637988245467841,
"ts_ns": 1637988245467841698,
"transaction": {
"id": "[{\"keyspace\":\"test_unsharded_keyspace\",\"shard\":\"0\",\"gtid\":\"MySQL56/e03ece6c-4c04-11ec-8e20-0242ac110004:1-68\"}]",
"total_order": 1,
"data_collection_order": 1,
"transaction_rank": 68,
"transaction_epoch": 0
}
}
高效的事务元数据
如果您启用连接器以提供事务元数据,它将生成更多数据。连接器不仅会向事务主题发送额外的消息,而且发送到数据更改主题的消息也会更大,因为它们包含一个事务元数据块。增加的卷是由于以下因素造成的:
-
VGTID 存储两次,一次作为
source.vgtid,然后再次作为transaction.id。在包含许多分片的 keyspace 中,这些 VGTID 可能相当大。 -
在分片环境中,VGTID 通常包含每个分片的 VGTID。在包含许多分片的 keyspace 中,VGTID 字段中的数据量可能很大。
-
连接器为每个事务边界事件发送事务主题消息。通常,包含许多分片的 keyspace 会生成大量事务边界事件。
为了使 Vitess 连接器能够在不显著增加生成数据量的情况下编码事务元数据,Debezium 提供了几种单消息转换 (SMT)。以下 SMT 旨在减少 Vitess 连接器发出的事件中的数据量:
以下示例显示了一个使用上述转换的 Vitess 连接器配置的摘录:
}
[...]
"provide.transaction.metadata": "true",
"transaction.metadata.factory": "io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionMetadataFactory",
"transforms": "filterTransactionTopicRecords,removeField,useLocalVgtid",
"transforms.filterTransactionTopicRecords.type": "io.debezium.connector.vitess.transforms.FilterTransactionTopicRecords",
"transforms.removeField.type": "io.debezium.connector.vitess.transforms.RemoveField",
"transforms.removeField.field_names": "transaction.id",
"transforms.useLocalVgtid.type": "io.debezium.connector.vitess.transforms.UseLocalVgtid",
[...]
}
数据更改事件
Debezium Vitess 连接器为每个行级 INSERT、UPDATE 和 DELETE 操作生成一个数据更改事件。每个事件包含一个键和一个值。键和值的结构取决于已更改的表。
Debezium 和 Kafka Connect 的设计围绕着连续的事件消息流。但是,这些事件的结构可能会随时间变化,这对于使用者来说很难处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果您使用的是模式注册表,则包含一个模式 ID,使用者可以使用该 ID 从注册表中获取模式。这使得每个事件都是自包含的。
以下骨架 JSON 显示了更改事件的基本四个部分。但是,您如何配置应用程序中使用的 Kafka Connect 转换器决定了这四个部分在更改事件中的表示。schema 字段仅在配置转换器生成它时才存在于更改事件中。同样,只有配置转换器生成事件键和事件有效负载时,它们才会在更改事件中出现。如果您使用 JSON 转换器并配置它生成所有四个基本更改事件部分,则更改事件将具有此结构:
{
"schema": { (1)
...
},
"payload": { (2)
...
},
"schema": { (3)
...
},
"payload": { (4)
...
},
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
第一个 |
2 |
|
第一个 |
3 |
|
第二个 |
4 |
|
第二个 |
默认行为是连接器将更改事件记录流式传输到名称与事件的原始表相同的主题。
|
从 Kafka 0.10 开始,Kafka 可以选择记录事件键和值以及消息创建(由生产者记录)或由 Kafka 写入日志的*时间戳*。 |
|
Vitess 连接器确保所有 Kafka Connect schema 名称都遵循 Avro schema 名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头(即 a-z、A-Z 或 \_)。逻辑服务器名称中的每个剩余字符以及 schema 和表名称中的每个字符都必须是拉丁字母、数字或下划线(即 a-z、A-Z、0-9 或 \_)。如果存在无效字符,它将被替换为下划线字符。 如果逻辑服务器名称、schema 名称或表名称包含无效字符,并且区分名称的唯一字符是无效的,因此被替换为下划线,则这可能导致意外冲突。 |
|
连接器目前不允许使用 |
更改事件键
对于给定的表,更改事件的键具有包含该事件创建时表中每个主键列的字段的结构。
考虑在 commerce keyspace 中定义的 customers 表以及该表更改事件键的示例。
CREATE TABLE customers (
id INT NOT NULL,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL,
PRIMARY KEY(id)
);
如果 topic.prefix 连接器配置属性的值为 Vitess_server,那么对于 customers 表的每一次更改事件,只要它具有此定义,都将具有相同的键结构,其 JSON 格式如下:
{
"schema": { (1)
"type": "struct",
"name": "Vitess_server.commerce.customers.Key", (2)
"optional": false, (3)
"fields": [ (4)
{
"name": "id",
"index": "0",
"schema": {
"type": "INT32",
"optional": "false"
}
}
]
},
"payload": { (5)
"id": "1"
},
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
键的模式部分指定了一个 Kafka Connect 模式,该模式描述了键的 主键的结构。 |
2 |
|
定义键 payload 结构的 schema 的名称。此 schema 描述了已更改表的表主键的结构。键 schema 名称的格式为 connector-name.keyspace-name.table-name.
|
3 |
|
指示事件键的 |
4 |
|
指定 |
5 |
|
包含生成此更改事件的行的键。在此示例中,键包含一个名为 |
|
尽管 |
|
如果表没有主键,则更改事件的键为 null。没有主键约束的表中的行无法唯一标识。 |
更改事件值
更改事件中的值比键要复杂一些。与键一样,值也包含 schema 部分和 payload 部分。schema 部分包含描述 payload 部分的 Envelope 结构的模式,包括其嵌套字段。对于创建、更新或删除数据的操作的更改事件,其值有效负载都具有信封结构。
考虑用于显示更改事件键示例的相同样本表。
CREATE TABLE customers (
id INT NOT NULL,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL,
PRIMARY KEY(id)
);
为 UPDATE 和 DELETE 操作发出的事件包含表中所有列的先前值。
create 事件
以下示例显示了连接器为在 customers 表中创建数据的操作生成的更改事件的值部分:
{
"schema": { (1)
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "Vitess_server.commerce.customers.Value", (2)
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "Vitess_server.commerce.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "int64",
"optional": false,
"field": "ts_us"
},
{
"type": "int64",
"optional": false,
"field": "ts_ns"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "keyspace"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "string",
"optional": false,
"field": "shard"
},
{
"type": "int64",
"optional": true,
"field": "vgtid"
}
],
"optional": false,
"name": "io.debezium.connector.vitess.Source", (3)
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "int64",
"optional": true,
"field": "ts_us"
},
{
"type": "int64",
"optional": true,
"field": "ts_ns"
}
],
"optional": false,
"name": "Vitess_server.commerce.customers.Envelope" (4)
},
"payload": { (5)
"before": null, (6)
"after": { (7)
"id": 1,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": { (8)
"version": "3.3.0.Final",
"connector": "vitess",
"name": "my_sharded_connector",
"ts_ms": 1559033904000,
"ts_us": 1559033904000000,
"ts_ns": 1559033904000000000,
"snapshot": "false",
"db": "",
"sequence": null,
"keyspace": "commerce",
"table": "customers",
"shard": "-80",
"vgtid": "[{\"keyspace\":\"commerce\",\"shard\":\"80-\",\"gtid\":\"MariaDB/0-54610504-47\"},{\"keyspace\":\"commerce\",\"shard\":\"-80\",\"gtid\":\"MariaDB/0-1592148-45\"}]"
},
"op": "c", (9)
"ts_ms": 1559033904863, (10)
"ts_us": 1559033904863497, (10)
"ts_ns": 1559033904863497147 (10)
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
描述值有效负载结构的 值模式。更改事件的值模式对于连接器为特定表生成的每个更改事件都相同。 |
2 |
|
在 |
3 |
|
|
4 |
|
|
5 |
|
值 的实际数据。这是更改事件提供的信息。 看起来事件的 JSON 表示比它们描述的行大得多。这是因为 JSON 表示必须包含消息的 schema 和 payload 部分。但是,通过使用 Avro converter,您可以显著减小连接器流式传输到 Kafka 主题的消息的大小。 |
6 |
|
一个可选字段,指定事件发生前行的状态。当 |
7 |
|
一个可选字段,指定事件发生后行的状态。在此示例中, |
8 |
|
描述事件源元数据的 必需字段。此字段包含可用于将此事件与其他事件进行比较的信息,关于事件的来源、事件发生的顺序以及事件是否属于同一事务。源元数据包括:
|
9 |
|
描述导致连接器生成事件的操作类型的 必需字符串。在此示例中,
|
10 |
|
可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
update 事件
对于样本 customers 表中的更新,更改事件的值具有与该表的创建事件相同的模式。同样,事件值
的有效负载也具有相同的结构。但是,在更新事件中,事件值有效负载包含不同的值。以下是一个更改事件值在连接器为 customers 表中的更新生成的事件中的示例:
{
"schema": { ... },
"payload": {
"before": { (1)
"id": 1,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": { (2)
"id": 1,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": { (3)
"version": "3.3.0.Final",
"connector": "vitess",
"name": "my_sharded_connector",
"ts_ms": 1559033904000,
"ts_us": 1559033904000000,
"ts_ns": 1559033904000000000,
"snapshot": "false",
"db": "",
"sequence": null,
"keyspace": "commerce",
"table": "customers",
"shard": "-80",
"vgtid": "[{\"keyspace\":\"commerce\",\"shard\":\"80-\",\"gtid\":\"MariaDB/0-54610504-47\"},{\"keyspace\":\"commerce\",\"shard\":\"-80\",\"gtid\":\"MariaDB/0-1592148-46\"}]"
},
"op": "u", (4)
"ts_ms": 1465584025523, (5)
"ts_us": 1465584025523763, (5)
"ts_ns": 1465584025523763547 (5)
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
一个可选字段,包含数据库提交之前行中所有列的所有值。 |
2 |
|
一个可选字段,指定事件发生后行的状态。在此示例中, |
3 |
|
一个强制性字段,描述事件的源元数据。
|
4 |
|
描述操作类型的 必需字符串。在更新事件值中, |
5 |
|
可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
|
更新行的主键的列会更改该行的键的值。当键更改时,Debezium 会输出*三个*事件:一个具有旧行键的 |
主键更新
更改行
主键字段的 UPDATE 操作称为主键更改。对于主键更改,连接器会发出旧键的 DELETE 事件记录和新(更新)键的 CREATE 事件记录,而不是 UPDATE 事件记录。这些事件具有常规的结构和内容,此外,每个事件都有一个与主键更改相关的消息头:
-
DELETE事件记录具有__debezium.newkey作为消息头。此标头的值是更新行的新主键。
-
CREATE事件记录具有__debezium.oldkey作为消息头。此标头的值是更新行以前(旧)的主键。
delete 事件
删除更改事件中的值具有与
同一表的创建和更新事件相同的 schema 部分。对于样本 customers 表,删除事件中的 payload 部分如下所示:
{
"schema": { ... },
"payload": {
"before": { (1)
"id": 1,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": null, (2)
"source": { (3)
"version": "3.3.0.Final",
"connector": "vitess",
"name": "my_sharded_connector",
"ts_ms": 1559033904000,
"ts_us": 1559033904000000,
"ts_ns": 1559033904000000000,
"snapshot": "false",
"db": "",
"sequence": null,
"keyspace": "commerce",
"table": "customers",
"shard": "-80",
"vgtid": "[{\"keyspace\":\"commerce\",\"shard\":\"80-\",\"gtid\":\"MariaDB/0-54610504-47\"},{\"keyspace\":\"commerce\",\"shard\":\"-80\",\"gtid\":\"MariaDB/0-1592148-47\"}]"
},
"op": "d", (4)
"ts_ms": 1465581902461, (5)
"ts_us": 1465581902461324, (5)
"ts_ns": 1465581902461324871 (5)
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
一个可选字段,指定事件发生前行的状态。在删除事件值中, |
2 |
|
一个可选字段,指定事件发生后行的状态。在删除事件值中, |
3 |
|
一个强制性字段,描述事件的源元数据。在*delete* 事件值中,
|
4 |
|
描述操作类型的 必需字符串。 |
5 |
|
可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
*delete* 更改事件记录为使用者提供了处理该行删除所需的信息。
Vitess 连接器事件旨在与Kafka 日志压缩配合使用。日志压缩允许删除一些旧消息,只要保留每个键的至少最新消息。这允许 Kafka 重新获得存储空间,同时确保主题包含完整的数据集并可用于重新加载基于键的状态。
当行被删除时,*delete* 事件仍然与日志压缩配合使用,因为 Kafka 可以删除具有该相同键的所有早期消息。但是,为了让 Kafka 删除所有具有相同键的消息,消息值必须为 null。为了实现这一点,Vitess 连接器在*delete* 事件后跟一个特殊*墓碑*事件,该事件具有相同的键但值为 null。
数据类型映射
Vitess 连接器使用与行所属的表类似的方式来表示行的更改。事件包含一个字段,用于表示每个列的值。该值在事件中的表示方式取决于列的 Vitess 数据类型。本节介绍这些映射。
如果默认数据类型转换不满足您的需求,您可以创建自定义转换器以供连接器使用。
基本类型
下表描述了连接器如何将基本的 Vitess 数据类型映射到事件字段中的*字面类型*和*语义类型*。
-
*字面类型*描述了如何使用 Kafka Connect schema 类型:
INT8、INT16、INT32、INT64、FLOAT32、FLOAT64、BOOLEAN、STRING、BYTES、ARRAY、MAP和STRUCT来表示值。 -
语义类型使用 Kafka Connect 模式的名称描述了 Kafka Connect 模式如何捕获字段的*含义*。
| Vitess 数据类型 | 字面类型 (模式类型) | 语义类型 (模式名称) 和注释 |
|---|---|---|
|
|
n/a |
|
尚未支持 |
n/a |
|
尚未支持 |
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
的字符串表示。 |
|
|
的逗号分隔列表。 |
|
|
的逗号分隔列表。 |
|
|
io.debezium.time.Year |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
尚未支持 |
n/a |
时间类型
除了 TIMESTAMP 数据类型之外,Vitess 时间类型取决于 time.precision.mode 连接器配置属性的值。
DATETIME 类型表示本地日期和时间,例如“2018-01-13 09:48:27”。正如您在前一个示例中看到的,此类型不包含时区信息。此类型的列将使用 UTC 转换为以毫秒或微秒为单位的 epoch,具体取决于列的精度。TIMESTAMP 类型表示不带时区信息的日期时间戳。写入数据时,MySQL 将 TIMESTAMP 类型从服务器或会话的时区转换为 UTC 格式。读取值时,数据库会从 UTC 格式转换为服务器或会话的当前时区。例如:
-
DATETIME值为2018-06-20 06:37:03,转换为1529476623000。 -
TIMESTAMP值为2018-06-20 06:37:03,转换为2018-06-20T13:37:03Z。
这些列将转换为等效的 io.debezium.time.ZonedTimestamp(以 UTC 为单位),基于服务器或会话的时区。默认情况下,Debezium 会查询服务器以获取时区。如果失败,您必须通过在 JDBC 连接字符串中设置 connectionTimeZone 选项来显式指定时区。例如,如果数据库的时区(全局或由 connectionTimeZone 选项为连接器配置)是“America/Los_Angeles”,则 TIMESTAMP 值“2018-06-20 06:37:03”将由具有值“2018-06-20T13:37:03Z”的 ZonedTimestamp 表示。
运行 Kafka Connect 和 Debezium 的 JVM 的时区不会影响这些转换。
有关影响时间值的属性的更多信息,请参阅 连接器配置属性。
- time.precision.mode=adaptive_time_microseconds(default)
-
Vitess 连接器根据列的数据类型定义确定字面类型和语义类型,以便事件表示数据库中的确切值。所有时间字段的单位都是微秒。只有在
00:00:00.000000到23:59:59.999999范围内的正TIME字段值才能正确捕获。表 7. time.precision.mode=adaptive_time_microseconds时的映射Vitess 类型 文字类型 语义类型 DATEINT32io.debezium.time.Date
表示自 UNIX epoch 以来的天数。TIME[(M)]INT64io.debezium.time.MicroTime
以微秒为单位表示时间值,不包含时区信息。MySQL 允许M的范围为0-6。DATETIME, DATETIME(0), DATETIME(1), DATETIME(2), DATETIME(3)INT64io.debezium.time.Timestamp
表示自 UNIX epoch 以来的毫秒数,不包含时区信息。DATETIME(4), DATETIME(5), DATETIME(6)INT64io.debezium.time.MicroTimestamp
表示自 UNIX epoch 以来的微秒数,不包含时区信息。 - time.precision.mode=connect
-
Vitess 连接器使用定义的 Kafka Connect 逻辑类型。这种方法不如默认方法精确,并且如果数据库列具有大于
3的*小数秒精度*值,则事件可能不太精确。连接器可以处理范围从00:00:00.000到23:59:59.999的值。仅当您确定表中的TIME值从不错过支持范围时,才设置time.precision.mode=connect。connect设置预计将在 Debezium 的未来版本中删除。表 8. time.precision.mode=connect时的映射Vitess 数据类型 文字类型 语义类型 DATEINT32org.apache.kafka.connect.data.Date
表示自 UNIX epoch 以来的天数。TIME[(M)]INT64org.apache.kafka.connect.data.Time
表示自午夜以来的微秒时间值,不包含时区信息。DATETIME[(M)]INT64org.apache.kafka.connect.data.Timestamp
表示自 UNIX epoch 以来的毫秒数,不包含时区信息。 - time.precision.mode=isostring
-
Vitess 连接器对所有时间类型使用字符串表示。
表 9. time.precision.mode=connect时的映射Vitess 数据类型 文字类型 语义类型 DATESTRINGn/a
TIME[(M)]STRINGn/a
DATETIME[(M)]STRINGn/a
设置 Vitess
Debezium 不需要任何特定的 Vitess 配置。按照 通过 Docker 本地安装 指南或 Kubernetes 的 Vitess Operator 指南的标准说明安装 Vitess。
-
确保 VTGate 主机及其 gRPC 端口(默认是 15991)可以从安装 Vitess 连接器的机器访问。
-
确保 VTCtld 主机及其 gRPC 端口(默认是 15999)可以从安装 Vitess 连接器的机器访问。
部署
安装 Kafka 和 Kafka Connect 后,部署 Debezium Vitess 连接器的剩余任务是下载连接器插件存档,将 JAR 文件提取到您的 Kafka Connect 环境中,并将包含 JAR 文件的目录添加到Kafka Connect 的 plugin.path。然后,您需要重新启动 Kafka Connect 进程以加载新的 JAR 文件。
如果您使用的是不可变容器,请参阅 Debezium 的容器镜像,其中已安装并准备好运行带有 Vitess 连接器的 Kafka 和 Kafka Connect。您还可以 在 Kubernetes 和 OpenShift 上运行 Debezium。
|
从 |
连接器配置示例
以下是一个 Vitess 连接器的配置示例,该连接器连接到 192.168.99.100 上的 Vitess(VTGate 的 VStream)服务器(端口 15991),其逻辑名称为 fullfillment。它还连接到 192.168.99.101 上的 VTCtld 服务器(端口 15999)以获取初始 VGTID。通常,您会使用连接器可用的配置属性在 .json 文件中配置 Debezium Vitess 连接器。
您可以选择为部分 schema 和表生成事件。可选地,忽略、屏蔽或截断敏感、过大或不需要的列。
{
"name": "inventory-connector", (1)
"config": {
"connector.class": "io.debezium.connector.vitess.VitessConnector", (2)
"database.hostname": "192.168.99.100", (3)
"database.port": "15991", (4)
"database.user": "vitess", (5)
"database.password": "vitess_password", (6)
"vitess.keyspace": "commerce", (7)
"vitess.tablet.type": "MASTER", (8)
"vitess.vtctld.host": "192.168.99.101", (9)
"vitess.vtctld.port": "15999", (10)
"vitess.vtctld.user": "vitess", (11)
"vitess.vtctld.password": "vitess_password", (12)
"topic.prefix": "fullfillment", (13)
"tasks.max": 1 (14)
}
}
| 1 | 连接器在 Kafka Connect 服务中注册时的名称。 |
| 2 | 此 Vitess 连接器类的名称。 |
| 3 | Vitess(VTGate 的 VStream)服务器的地址。 |
| 4 | Vitess(VTGate 的 VStream)服务器的端口号。 |
| 5 | Vitess 数据库服务器(VTGate)的用户名。 |
| 6 | Vitess 数据库服务器(VTGate)的密码。 |
| 7 | keyspace(也称为数据库)的名称。由于未指定分片,因此它从 keyspace 中的所有分片读取更改事件。 |
| 8 | 要从中读取更改事件的 MySQL 实例类型(MASTER 或 REPLICA)。 |
| 9 | VTCtld 服务器的地址。 |
| 10 | VTCtld 服务器的端口。 |
| 11 | VTCtld 服务器(VTCtld gRPC)的用户名。 |
| 12 | VTCtld 数据库服务器(VTCtld gRPC)的密码。 |
| 13 | Vitess 集群的主题前缀,它构成一个命名空间,并用于连接器写入的所有 Kafka 主题的名称、Kafka Connect schema 名称以及使用 Avro converter 时相应 Avro schema 的命名空间。 |
| 14 | 一次只能运行一个任务。 |
请参阅 Vitess 连接器属性的完整列表,这些属性可以在这些配置中指定。
您可以通过向正在运行的 Kafka Connect 服务发送 POST 命令来发送此配置。该服务将记录配置并启动连接器任务,该任务连接到 Vitess 数据库并将更改事件记录流式传输到 Kafka 主题。
连接器配置示例(用于 offset-storage-per-task 模式)
当您有一个大型 Vitess 安装,需要多个连接器任务来处理更改日志时,您可以使用 offset-storage-per-task 功能来启动多个连接器任务,并让每个任务处理 Vitess 分片的子集。每个任务将在 Kafka 的偏移量主题中以自己的分区空间持久化其偏移量(其正在跟踪的分片的 vgtids)。
以下是连接到 Vitess(VTGate 的 VStream)服务器的 Vitess 连接器的相同示例,但增加了三个参数以调用 offset-storage-per-task 模式:
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.vitess.VitessConnector",
"database.hostname": "192.168.99.100",
"database.port": "15991",
"database.user": "vitess",
"database.password": "vitess_password",
"topic.prefix": "fullfillment",
"vitess.keyspace": "commerce",
"vitess.tablet.type": "MASTER",
"vitess.vtctld.host": "192.168.99.101",
"vitess.vtctld.port": "15999",
"vitess.vtctld.user": "vitess",
"vitess.vtctld.password": "vitess_password",
"vitess.offset.storage.per.task": true, (1)
"vitess.offset.storage.task.key.gen": 1, (2)
"vitess.prev.num.tasks": 1, (3)
"tasks.max": 2 (4)
}
}
| 1 | 指定我们要启用 offset-storage-per-task 功能 |
| 2 | 指定当前任务并行度的生成号为 1 |
| 3 | 指定前一个任务并行度的任务数是 1 |
| 4 | 指定我们要为当前任务并行度启动两个任务 |
任务到 Vitess 分片的分配基于简单的轮询算法。在此示例中,启动两个连接器任务并假设我们有 4 个 Vitess 分片(-40, 40-80, 80-c0, c0-),task0 将处理分片(-40, 80-c0),task1 将处理分片(40-80, c0-)。
我们需要三个配置参数的原因是确保每个连接器任务保存的偏移量不会相互冲突,并自动处理前一个任务并行度的偏移量迁移。为了确保我们在 Kafka 偏移量主题的分区键上不发生冲突,我们为每个连接器任务使用此分区名称方案:taskId_numTasks_gen。因此,在当前启动两个任务且生成号为 1 的示例中,task0 将在 Kafka 偏移量主题的分区键中写入其偏移量:task0_2_1,task1 将使用分区键:(task1_2_1)。gen 参数用于区分来自不同生成(生成对应于任务并行度的每次更改)的分区键。
当任务并行度更改时(例如,您想启动 4 个连接器任务而不是 2 个来处理来自 Vitess 的更大流量),您将指定 tasks.max=4,vitess.offset.storage.task.key.gen=2,vitess.prev.num.tasks=2,则此任务并行度生成的偏移量分区将是:task0_4_2,task1_4_2,task2_4_2,task3_4_2。一旦连接器重新启动,连接器将检测到当前 4 个分区键没有先前保存的偏移量,它将调用自动偏移量迁移,从前一个生成键中保存的偏移量(使用 2 个任务,每个任务处理 2 个分片)迁移。对于当前 4 个 Vitess 分片(-40, 40-80, 80-c0, c0-)的示例,task0 将处理分片(-40),task1(40-80),task2(80-c0),task3(c0-)。这些 4 个分片在先前并行度下的偏移量(使用 2 个任务,每个任务处理 2 个分片)将被自动迁移到当前并行度下使用 4 个任务(每个任务处理一个分片)。
请注意,在启用 offset-storage-per-task 功能之前,Kafka 偏移量主题中保存的偏移量的任务并行度生成号默认为 0,有一个特殊的偏移量查找用于偏移量迁移。因此,如果您已经运行 Vitess 连接器一段时间而未启用 offset-storage-per-task 功能,现在想启用此功能,请指定 vitess.offset.storage.task.key.gen=1,vitess.prev.num.tasks=1 以帮助自动迁移偏移量。
请注意,vitess.prev.num.tasks 需要与前一个任务并行度生成中实际启动的任务数相匹配。连接器任务的数量通常与您指定的 tasks.max 配置参数相同,但在罕见情况下 tasks.max > Vitess 分片数时,连接器将只启动_task_number = _Vitess_shard_number。这种情况很可能一开始就是配置错误的。
请参阅 Vitess 连接器属性的完整列表,这些属性可以在这些配置中指定。
您可以通过向正在运行的 Kafka Connect 服务发送 POST 命令来发送此配置。该服务将记录配置并启动连接器任务,该任务连接到 Vitess 数据库并将更改事件记录流式传输到 Kafka 主题。
添加连接器配置
要开始运行 Vitess 连接器,请创建一个连接器配置并将其添加到您的 Kafka Connect 集群。
-
VTGate 主机及其 gRPC 端口(默认是 15991)可以从安装 Vitess 连接器的机器访问。
-
VTCtld 主机及其 gRPC 端口(默认是 15999)可以从安装 Vitess 连接器的机器访问。
-
Vitess 连接器已安装。
-
为 Vitess 连接器创建一个配置。
-
使用Kafka Connect REST API 将该连接器配置添加到您的 Kafka Connect 群集。
当连接器启动时,它开始为行级操作生成数据更改事件,并将更改事件记录流式传输到 Kafka 主题。
监控
Debezium Vitess 连接器提供的指标类型只有一种,此外 Kafka 和 Kafka Connect 还提供内置的 JMX 指标支持。
-
流式传输指标提供有关连接器在捕获更改并流式传输更改事件记录时操作的信息。
Debezium 监控文档 提供了有关如何使用 JMX 公开这些指标的详细信息。
自定义 MBean 名称
Debezium 连接器通过连接器的 MBean 名称公开指标。这些指标特定于每个连接器实例,提供有关连接器快照、流式传输和模式历史记录进程行为的数据。
默认情况下,当您部署正确配置的连接器时,Debezium 会为不同的连接器指标生成唯一的 MBean 名称。要查看连接器进程的指标,请将您的可观察性堆栈配置为监视其 MBean。但这些默认 MBean 名称取决于连接器配置;配置更改可能导致 MBean 名称发生更改。在这种情况下,更改 MBean 名称会破坏连接器实例和 MBean 之间的链接,从而中断监控活动。您必须重新配置可观察性堆栈以使用新的 MBean 名称才能恢复监控。
为了防止因 MBean 名称更改而导致的监控中断,您可以配置自定义指标标签。通过将 custom.metric.tags 属性添加到连接器配置中来配置自定义指标。该属性接受键值对,其中每个键代表 MBean 对象名称的标签,相应的值代表该标签的值。例如:k1=v1,k2=v2。Debezium 将指定的标签附加到连接器的 MBean 名称。
配置连接器的 custom.metric.tags 属性后,您可以配置可观察性堆栈以检索与指定标签关联的指标。然后,可观察性堆栈使用指定的标签而不是可变的 MBean 名称来唯一标识连接器。之后,如果 Debezium 重新定义了其如何构造 MBean 名称,或者连接器配置中的 topic.prefix 发生更改,指标收集将不会中断,因为指标抓取任务使用指定的标签模式来标识连接器。
使用自定义标签的另一个好处是,您可以使用反映数据管道架构的标签,以便以适合您运营需求的方式组织指标。例如,您可以指定带有值的标签来声明连接器活动的类型、应用程序上下文或数据源,例如 db1-streaming-for-application-abc。如果您指定多个键值对,所有指定的对都将附加到连接器的 MBean 名称。
以下示例说明了标签如何修改默认 MBean 名称。
默认情况下,Vitess 连接器为流式传输指标使用以下 MBean 名称:
debezium.vitess:type=connector-metrics,context=streaming,server=<topic.prefix>
如果您将 custom.metric.tags 的值设置为 database=salesdb-streaming,table=inventory,Debezium 将生成以下自定义 MBean 名称:
debezium.vitess:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory
流式传输指标
MBean 是 debezium.vitess:type=connector-metrics,context=streaming,server=<topic.prefix>。
| Attributes | Type | 描述 |
|---|---|---|
|
自连接器读取和处理最近事件以来经过的毫秒数。 |
|
|
自上次启动或重置以来,此连接器已看到的事件总数。 |
|
|
已被连接器配置的包含/排除列表过滤规则过滤的事件数量。 |
|
|
用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的长度。 |
|
|
用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的剩余容量。 |
|
|
表示连接器当前是否连接到数据库服务器的标志。 |
|
|
上次更改事件的时间戳与连接器处理它之间的时间差(以毫秒为单位)。这些值将包含数据库服务器和连接器运行所在机器之间时钟的任何差异。 |
|
|
已提交的处理过的事务数。 |
|
|
用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的最大缓冲区(以字节为单位)。 |
|
|
用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的当前缓冲区(以字节为单位)。 |
连接器配置属性
Debezium Vitess 连接器有许多配置属性,可用于实现连接器适合您应用程序的行为。许多属性都有默认值。属性信息组织如下:
以下配置属性是必需的,除非有默认值可用。
| 属性 | Default (默认值) | 描述 | ||
|---|---|---|---|---|
无默认值 |
连接器的唯一名称。尝试使用相同的名称再次注册将失败。此属性是所有 Kafka Connect 连接器必需的。 |
|||
无默认值 |
连接器的 Java 类名称。对于 Vitess 连接器,始终使用 |
|||
|
为该连接器创建的最大任务数。如果启用了 offset.storage.per.task 模式,Vitess 连接器可以使用多个任务。 |
|||
无默认值 |
Vitess 数据库服务器(VTGate)的 IP 地址或主机名。 |
|||
|
Vitess 数据库服务器(VTGate)的整数端口号。 |
|||
要从中流式传输更改的 keyspace 名称。 |
||||
n/a |
一个可选的要从中流式传输更改的分片名称。如果未配置,对于未分片的 keyspace,连接器将从唯一的分片流式传输更改;对于已分片的 keyspace,连接器将从 keyspace 中的所有分片流式传输更改。我们建议不配置它,以便从 keyspace 中的所有分片流式传输,因为它对重新分片操作有更好的支持。如果配置了,例如 |
|||
|
一个可选的分片 GTID 位置,用于流式传输。这必须与 |
|||
|
控制 Vitess 标志 stop_on_reshard。 |
|||
|
控制 Vitess 标志 + + |
|||
n/a |
Vitess 数据库服务器(VTGate)的可选用户名。如果未配置,则使用未经身份验证的 VTGate gRPC。 |
|||
n/a |
Vitess 数据库服务器(VTGate)的可选密码。如果未配置,则使用未经身份验证的 VTGate gRPC。 |
|||
|
要从中流式传输更改的 Tablet(因此是 MySQL)的类型 |
|||
无默认值 |
主题前缀,为 Debezium 捕获更改的特定 Vitess 数据库服务器或集群提供命名空间。只能在数据库服务器逻辑名称中使用字母数字字符、连字符、点和下划线。前缀应在所有其他连接器中唯一,因为它用作接收来自此连接器的记录的所有 Kafka 主题的名称前缀。 +
|
|||
无默认值 |
一个可选的、逗号分隔的正则表达式列表,匹配您想要捕获其更改的表的完全限定表标识符。不在 |
|||
无默认值 |
一个可选的、逗号分隔的正则表达式列表,匹配您*不*想捕获其更改的表的完全限定表标识符。不在 |
|||
无默认值 |
一个可选的、逗号分隔的正则表达式列表,匹配应包含在更改事件记录值中的列的完全限定名称。列的完全限定名称格式为 keyspace.tableName.columnName。请勿同时设置 |
|||
无默认值 |
一个可选的、逗号分隔的正则表达式列表,匹配应从更改事件记录值中排除的列的完全限定名称。列的完全限定名称格式为 keyspace.tableName.columnName。请勿同时设置 |
|||
n/a |
一个可选的、逗号分隔的正则表达式列表,匹配基于字符的列的完全限定名称。如果您希望在列数据超过属性名称中由 length 指定的字符数时截断这些列中的数据,请设置此属性。将 列的完全限定名称遵循以下格式:databaseName.tableName.columnName。为了匹配列名,Debezium 将您指定的正则表达式作为锚定正则表达式匹配。也就是说,指定的表达式与列的整个名称字符串进行匹配;它不匹配可能存在于列名中的子字符串。 您可以在单个配置中指定多个具有不同长度的属性。 |
|||
|
控制是否在*delete* 事件后跟一个墓碑事件。 |
|||
|
指定是否启用 offset-storage-per-task 模式,该模式允许启动多个连接器任务并将偏移量按任务分区持久化。 |
|||
|
指定 vitess.offset.storage.per.task 启用时的任务并行度生成号。当您决定更改连接器任务并行度(增加或减少任务数量)时,应增加生成号。 |
|||
|
指定 vitess.offset.storage.per.task 启用时,先前任务并行度生成中使用的连接器任务数。 |
|||
空字符串 |
一个分号分隔的表及其匹配表列名称的正则表达式列表。连接器将匹配列中的值映射到发送到 Kafka 主题的更改事件记录中的键字段。当表没有主键,或者您想根据非主键字段对 Kafka 主题中的更改事件记录进行排序时,这很有用。 |
|||
none |
指定如何调整 schema 名称以兼容连接器使用的消息转换器。可能设置:
|
|||
none |
指定如何调整字段名称以兼容连接器使用的消息转换器。可能设置:
有关更多详细信息,请参阅 Avro 命名。 |
|||
initial (初始) |
指定连接器启动时执行快照的标准。将属性设置为以下值之一:
|
|||
|
您可以设置以下选项来确定 Debezium 如何表示时间、日期和时间戳值的精度:
|
|||
string |
指定如何表示 BIGINT UNSIGNED 列的更改事件。 |
以下*高级*配置属性的默认值在大多数情况下都适用,因此很少需要在连接器的配置中指定。
| 属性 | Default (默认值) | 描述 |
|---|---|---|
无默认值 |
枚举连接器可以使用*自定义转换器*的符号名称的逗号分隔列表。例如:
您必须设置 对于为连接器配置的每个转换器,您还必须添加一个
For example, (例如,) isbn.type: io.debezium.test.IsbnConverter 如果您想进一步控制已配置转换器的行为,您可以添加一个或多个配置参数来传递值给转换器。要将任何其他配置参数与转换器关联,请在参数名称前加上转换器的符号名称。例如: isbn.schema.name: io.debezium.vitess.type.Isbn |
|
|
指定连接器在处理事件时如何应对异常: |
|
|
一个正整数值,指定阻塞队列可以容纳的最大记录数。当 Debezium 读取从数据库流式的事件时,它会将事件放入阻塞队列,然后再将它们写入 Kafka。在连接器比写入 Kafka 的速度快,或者 Kafka 不可用时,阻塞队列可以为从数据库读取更改事件提供反压。在连接器定期记录偏移量时,队列中保存的事件将被忽略。始终将 |
|
|
一个正整数值,指定连接器处理的每个事件批次的最大大小。 |
|
|
一个长整数值,指定阻塞队列的最大字节容量。默认情况下,阻塞队列没有容量限制。要指定队列可以消耗的字节数,请将此属性设置为一个正长整型值。 |
|
|
一个正整数值,指定在连接器开始处理事件批次之前等待新更改事件出现的时间(以毫秒为单位)。默认为 500 毫秒,或 0.5 秒。 |
|
|
一个逗号分隔的操作类型列表,您希望连接器在流式传输期间跳过这些操作。您可以配置连接器以跳过以下类型的操作:
由于 Debezium Vitess 连接器从不将 |
|
|
确定连接器是否生成带有事务边界的事件,并使用事务元数据丰富更改事件信封。如果您希望连接器执行此操作,请指定 |
|
|
确定连接器用于跟踪事务上下文并构建表示事务的数据结构和 schema 的类。 |
|
|
控制 VStream 的 gPRC 心跳之间的间隔。默认为 |
|
无默认值 |
指定一个逗号分隔的 gRPC 头部列表。默认为空。格式为: |
|
无默认值 |
指定通道上允许接收的最大消息大小(以字节为单位)。 |
|
n/a |
一个可选的、逗号分隔的正则表达式列表,匹配列的完全限定名称,这些列的原始类型和长度应作为参数添加到发出的更改事件记录中的相应字段 schema 中。这些 schema 参数
用于分别传播可变宽度类型的原始类型名称和长度。这对于正确调整接收数据库中的相应列很有用。列的完全限定名称格式如下: keyspaceName.tableName.columnName |
|
n/a |
一个可选的、逗号分隔的正则表达式列表,匹配列的数据库特定数据类型名称,这些列的原始类型和长度应作为参数添加到发出的更改事件记录中的相应字段 schema 中。这些 schema 参数
用于分别传播可变宽度类型的原始类型名称和长度。这对于正确调整接收数据库中的相应列很有用。列的完全限定名称格式如下: keyspaceName.tableName.columnName 请参阅 Vitess 连接器如何映射数据类型 以获取 Vitess 特定数据类型名称列表。 |
|
|
用于确定数据更改、schema 更改、事务、心跳事件等的*主题名称*的 TopicNamingStrategy 类的名称,默认为 |
|
无默认值 |
指定当 topic.naming.strategy 设置为 |
|
|
指定主题名称的分隔符,默认为 |
|
|
用于保存有界并发哈希映射中主题名称的大小。此缓存将有助于确定与给定数据集合对应的主题名称。 |
|
|
控制连接器发送事务元数据消息的主题的名称。主题名称的模式为: |
|
|
自定义指标标签将接受键值对来自定义 MBean 对象名称,该名称应附加到常规名称的末尾。每个键将代表 MBean 对象名称的标签,其对应的值将是该标签的值。例如: |
|
|
指定连接器在操作导致可重试错误(例如连接错误)后如何响应。
|
|
true |
此属性指定 Debezium 是否将带有 这些头是 OpenLineage 集成所必需的,并提供元数据,使下游处理系统能够跟踪和识别更改事件的来源。 该属性添加了以下头:
|
连接器还支持*直通*配置属性,这些属性在创建 Kafka 生产者和消费者时使用。
出现问题时的行为
Debezium 是一个分布式系统,它捕获多个上游数据库的所有更改;它从不遗漏或丢失事件。当系统正常运行或被仔细管理时,Debezium 会为每个更改事件记录提供精确一次的传递。
如果发生故障,系统不会丢失任何事件。但是,在从故障恢复期间,它可能会重复某些更改事件。在这些异常情况下,Debezium 与 Kafka 一样,提供*至少一次*的更改事件传递。
本节的其余部分描述了 Debezium 如何处理各种类型的故障和问题。
配置和启动错误
在以下情况下,连接器在尝试启动时会失败,并在日志中报告错误/异常,然后停止运行:
-
连接器的配置无效。
-
连接器无法使用指定的连接参数成功连接到 Vitess。
在这些情况下,错误消息将包含问题的详细信息,并且可能包含建议的解决方法。在更正配置或解决 Vitess 问题后,请重新启动连接器。
Vitess 变得不可用
当连接器运行时,它连接到的 Vitses 服务器(VTGate)可能由于各种原因而变得不可用。如果发生这种情况,连接器将以错误失败并停止。当服务器再次可用时,请重新启动连接器。
Vitess 连接器以 Vitess VGTID 的形式在外部存储最后一个已处理的偏移量。连接器重新启动并连接到服务器实例后,连接器会与服务器通信,以从该特定偏移量继续流式传输。
无效的列名错误
此错误非常罕见。如果您收到一条错误消息,内容为 Illegal prefix '@' for column: x, from schema: y, table: z,并且您的表中不存在此列,那么这是 Vitess vstream 的一个bug,它是由列重命名或列类型更改引起的。这是一个瞬时错误。您可以在短暂退避后重新启动连接器,问题应该会自动解决。
Kafka Connect 进程正常停止
假设 Kafka Connect 正在分布式模式下运行,并且一个 Kafka Connect 进程正常停止。在停止该进程之前,Kafka Connect 会将该进程的连接器任务迁移到该组中的另一个 Kafka Connect 进程。新的连接器任务将从先前任务停止的地方继续处理。在连接器任务正常停止并重新启动到新进程的过程中,会有短暂的处理延迟。
Kafka Connect 进程崩溃
如果 Kafka 连接器进程意外停止,它正在运行的任何连接器任务都将终止,而不会记录它们最近已处理的偏移量。当 Kafka Connect 在分布式模式下运行时,Kafka Connect 会在其他进程上重新启动这些连接器任务。但是,Vitess 连接器将从前一个进程记录的最后一个偏移量恢复。这意味着新的替换任务可能会生成一些在崩溃前刚刚被处理过的相同更改事件。重复事件的数量取决于偏移量刷新周期以及崩溃前的数据更改量。
由于在从故障中恢复时可能会发生事件重复,因此使用者应始终预料到某些重复事件。Debezium 更改是幂等的,因此一系列事件始终会导致相同的状态。
在每个更改事件记录中,Debezium 连接器都会插入特定于源的信息,说明事件的来源,包括 Vitess 服务器生成事件的时间,以及事务更改写入的 binlog 中的位置。使用者可以跟踪这些信息,特别是 VGTID,以确定事件是否为重复事件。
Kafka 不可用
当连接器生成更改事件时,Kafka Connect 框架会使用 Kafka producer API 将这些事件记录在 Kafka 中。您可以在 Kafka Connect 配置中指定的频率周期性地,Kafka Connect 会记录出现在这些更改事件中的最新偏移量。如果 Kafka Broker 不可用,运行连接器的 Kafka Connect 进程将反复尝试重新连接到 Kafka Broker。换句话说,连接器任务将暂停,直到可以重新建立连接,届时连接器将从它们离开的地方恢复。
连接器停止一段时间
如果连接器被正常停止,数据库可以继续使用。任何更改都会记录在 Vitess binlog 中。当连接器重新启动时,它将从停止的地方恢复流式传输更改。也就是说,它将为在连接器停止期间发生的所有数据库更改生成更改事件记录。
配置得当的 Kafka 集群能够处理海量吞吐量。Kafka Connect 是根据 Kafka 最佳实践编写的,并且在有足够资源的情况下,Kafka Connect 连接器也可以处理大量数据库更改事件。因此,在停止一段时间后,当 Debezium 连接器重新启动时,它很可能能够赶上在它停止期间发生的数据库更改。这有多快发生取决于 Kafka 的能力和性能以及 Vitess 中数据更改的量。
连接器在完成快照之前失败
如果快照未能完成,连接器不会自动重试快照。如果使用之前的偏移量重新启动连接器,连接器将跳过快照过程,并立即开始流式传输更改事件。
因此,要从失败中恢复,请手动删除连接器偏移量并启动连接器。
早期 Vitess 版本的限制
-
由于 Vitess 的一个轻微填充问题(已在 Vitess 9.0.0 中修复),精度大于或等于 13 的 decimal 值会在数字前面产生额外的空格。例如,如果表定义中的列类型是
decimal(13,4),则值-1.2300会变成"- 1.2300",值1.2300会变成" 1.2300"。 -
不支持
JSON列类型。 -
VStream 8.0.0 不提供
ENUM列的允许值的附加元数据。因此,连接器不支持ENUM列类型。将发出索引号(从 1 开始)而不是枚举值。例如,如果ENUM定义是enum('S','M','L'),则将发出"3"作为值,而不是"L"。