Debezium MongoDB 连接器
Debezium 的 MongoDB 连接器会跟踪 MongoDB 副本集或 MongoDB 分片集群中的文档更改,将这些更改记录为 Kafka 主题中的事件。连接器会自动处理分片集群中分片的添加或删除、每个副本集的成员更改、每个副本集内的选举以及通信问题解决的等待。
有关此连接器兼容的 MongoDB 版本的信息,请参阅 Debezium 版本概述。
概述
MongoDB 的复制机制提供了冗余和高可用性,是生产环境中运行 MongoDB 的首选方式。MongoDB 连接器捕获副本集或分片集群中的更改。
MongoDB 副本集 由一组服务器组成,这些服务器都拥有相同数据的副本,并且复制确保客户端对副本集主节点上的文档所做的所有更改都能正确地应用于其他副本集服务器,称为从节点。MongoDB 复制的工作方式是主节点在其oplog(或操作日志)中记录更改,然后每个从节点按顺序读取主节点的 oplog 并将所有操作应用于自己的文档。当新服务器添加到副本集时,该服务器首先对主节点上的所有数据库和集合执行快照,然后读取主节点的 oplog 以应用自开始快照以来可能发生的所有更改。当新服务器赶上主节点的 oplog 尾部时,它将成为从节点(并能够处理查询)。
更改流
尽管 Debezium MongoDB 连接器不属于副本集的一部分,但它使用类似的复制机制来获取 oplog 数据。主要区别在于连接器不直接读取 oplog。相反,它将 oplog 数据的捕获和解码委托给 MongoDB 的更改流功能。通过更改流,MongoDB 服务器将集合中发生的更改公开为事件流。Debezium 连接器监视流,然后将更改向下游传递。第一次检测到副本集时,连接器会检查 oplog 以获取最后记录的事务,然后执行主节点数据库和集合的快照。连接器完成数据复制后,它会从之前读取的 oplog 位置开始创建一个更改流。
在 MongoDB 连接器处理更改时,它会定期记录事件在 oplog 流中的来源位置。当连接器停止时,它会记录其处理的最后一个 oplog 流位置,以便在重新启动后可以从该位置继续流式传输。换句话说,连接器可以停止、升级或维护,并在稍后重新启动,并且始终能确切地从上次中断的地方继续,而不会丢失任何事件。当然,MongoDB oplog 通常有最大大小限制,因此如果连接器长时间停止,在连接器有机会读取它们之前,oplog 中的操作可能会被清除。在这种情况下,重新启动后,连接器会检测到丢失的 oplog 操作,执行快照,然后继续流式传输更改。
MongoDB 连接器对于副本集成员和领导者的更改、分片集群中分片的添加或删除以及可能导致通信失败的网络问题也具有很高的容忍度。连接器始终使用副本集的主节点来流式传输更改,因此当副本集进行选举并出现不同的节点成为主节点时,连接器会立即停止流式传输更改,连接到新主节点,并使用新主节点开始流式传输更改。同样,如果连接器无法与副本集主节点通信,它会尝试重新连接(使用指数退避,以免使网络或副本集过载)。连接恢复后,连接器将继续从其捕获的最后一个事件流式传输更改。通过这种方式,连接器可以动态地适应副本集成员的变化,并自动处理通信中断。
读取首选项
您可以通过在 mongodb.connection.string 中设置 readPreference 参数来指定 MongoDB 连接的读取首选项。
MongoDB 连接器的工作原理
了解连接器支持的 MongoDB 拓扑概述有助于规划您的应用程序。
支持的 MongoDB 拓扑
MongoDB 连接器支持以下 MongoDB 拓扑
- MongoDB 副本集
-
Debezium MongoDB 连接器可以从单个 MongoDB 副本集捕获更改。生产副本集至少需要三个成员。
要将 MongoDB 连接器与副本集一起使用,您必须在连接器配置中将
mongodb.connection.string属性的值设置为副本集连接字符串。当连接器准备好开始从 MongoDB 更改流捕获更改时,它会启动一个连接任务。然后,连接任务使用指定的连接字符串与可用的副本集成员建立连接。
- MongoDB 分片集群
-
一个 MongoDB 分片集群包含:
-
一个或多个分片,每个分片都部署为一个副本集;
-
一个单独的副本集,作为集群的配置服务器
-
一个或多个路由器(也称为
mongos),客户端连接到这些路由器,并将请求路由到相应分片。要将 MongoDB 连接器与分片集群一起使用,请在连接器配置中将
mongodb.connection.string属性的值设置为分片集群连接字符串。
-
- MongoDB 单机服务器
-
MongoDB 连接器无法监视单机 MongoDB 服务器的更改,因为单机服务器没有 oplog。如果将单机服务器转换为单成员副本集,连接器将可以工作。
|
MongoDB 不建议在生产环境中使用单机服务器。有关更多信息,请参阅MongoDB 文档。 |
所需用户权限
要从 MongoDB 捕获数据,Debezium 会以 MongoDB 用户的身份连接到数据库。您为 Debezium 创建的 MongoDB 用户帐户需要特定的数据库权限才能从数据库读取。连接器用户需要以下权限:
-
从数据库读取。
-
运行
hello命令。
连接器用户可能还需要以下权限:
-
从
config.shards系统集合读取。
根据连接器 capture.scope 属性的值,连接器用户必须能够读取所有数据库,或者读取特定数据库。根据 capture.scope 设置,为用户分配以下权限之一:
capture.scope设置为deployment-
授予用户读取任何数据库的权限。
capture.scope设置为database-
授予用户读取连接器
capture.target属性指定的数据库的权限。 capture.scope设置为collection-
授予用户读取连接器
capture.target属性指定的集合的权限。
hello 命令的权限无论 capture.scope 设置如何,用户都需要运行 MongoDB hello 命令的权限。
config.shards 集合的权限根据您的 Debezium 环境,要使连接器能够执行偏移量合并,您必须授予连接器用户读取 config.shards 集合的显式权限。对于以下连接器环境,需要读取 config.shards 集合的权限:
-
从 Debezium 2.5 或更早版本升级的连接器。
-
配置为捕获分片 MongoDB 集群更改的连接器。
逻辑连接器名称
连接器配置属性 topic.prefix 用作 MongoDB 副本集或分片集群的逻辑名称。连接器在多种方式中使用逻辑名称:作为所有主题名称的前缀,以及在记录每个副本集的更改流位置时作为唯一标识符。
您应该为每个 MongoDB 连接器提供一个有意义地描述源 MongoDB 系统的唯一逻辑名称。我们建议逻辑名称以字母或下划线字符开头,其余字符为字母数字或下划线。
偏移量合并
Debezium MongoDB 连接器不再支持连接到分片 MongoDB 部署的 replica_set 连接。因此,由使用 replica_set 连接模式的连接器版本记录的偏移量与当前版本不兼容。
为最大程度地减少连接模式更改的影响,并防止连接器运行不必要的快照,在连接器升级后重新启动时,它会运行一个合并偏移量的过程。在此偏移量合并过程中,连接器将完成以下步骤来协调由早期连接器版本记录的偏移量:
执行快照
当 Debezium 任务开始使用副本集时,它会使用连接器的逻辑名称和副本集名称来查找一个偏移量,该偏移量描述了连接器先前停止读取更改的位置。如果找到偏移量并且该偏移量仍然存在于 oplog 中,则任务将立即继续流式传输更改,从记录的偏移量位置开始。
但是,如果找不到偏移量,或者 oplog 中不再包含该位置,则任务必须首先通过执行快照来获取副本集内容的当前状态。此过程从记录 oplog 的当前位置开始,并将其记录为偏移量(以及一个表示已开始快照的标志)。然后,任务继续复制每个集合,尽可能多地生成线程(最多达到 snapshot.max.threads 配置属性的值),以并行执行此工作。连接器为看到的每个文档记录一个单独的读取事件。每个读取事件包含对象的标识符、对象的完整状态以及关于找到该对象的 MongoDB 副本集的源信息。源信息还包括一个标志,该标志表示事件是在快照期间生成的。
此快照将继续进行,直到复制完所有与连接器的过滤器匹配的集合。如果在任务完成快照之前停止了连接器,则在重新启动时,连接器将再次开始快照。
|
尝试避免在连接器执行任何副本集快照时进行任务重新分配和重新配置。连接器会生成日志消息来报告快照的进度。为提供最大的控制,请为每个连接器运行一个单独的 Kafka Connect 集群。 |
| Setting (设置) | 描述 |
|---|---|
|
连接器在每次启动时执行 snapshot。snapshot 完成后,连接器开始流式传输后续数据库更改的事件记录。 |
|
在连接器启动后,它会执行初始数据库快照。 |
|
The connector performs a database snapshot. After the snapshot completes, the connector stops, and does not stream event records for subsequent database changes. (连接器执行数据库快照。快照完成后,连接器停止,并且不为后续数据库更改流式传输事件记录。) |
|
Deprecated, see |
|
连接器捕获所有相关表的结构,但它不会创建 |
|
After the connector starts, it performs a snapshot only if it detects one of the following circumstances (连接器启动后,仅当检测到以下任一情况时,它才会执行快照:)
|
|
Set the snapshot mode to |
|
|
有关更多信息,请参阅连接器配置属性表中的snapshot.mode。
临时快照
By default, a connector runs an initial snapshot operation only after it starts for the first time. Following this initial snapshot, under normal circumstances, the connector does not repeat the snapshot process. Any future change event data that the connector captures comes in through the streaming process only. (默认情况下,连接器仅在首次启动后运行初始快照操作。在初始快照之后,在正常情况下,连接器不会重复快照过程。连接器捕获的任何未来变更事件数据仅通过流式传输过程进入。)
然而,在某些情况下,连接器在初始快照期间获取的数据可能会过时、丢失或不完整。为了提供一种重新捕获集合数据的方法,Debezium 包含了一个执行临时快照的选项。在发生以下任何情况后,您可能需要执行临时快照:
-
修改了连接器配置以捕获不同的集合集。
-
Kafka topics are deleted and must be rebuilt. (Kafka 主题被删除且必须重建。)
-
Data corruption occurs due to a configuration error or some other problem. (由于配置错误或其他问题导致数据损坏。)
您可以重新运行先前已捕获快照的集合的快照,方法是启动所谓的临时快照。临时快照需要使用信号集合。您可以通过向 Debezium 信号集合发送信号请求来启动临时快照。
当您启动现有集合的临时快照时,连接器会将内容追加到该集合已存在的集合中。如果先前存在的集合被删除,如果启用了自动创建主题,Debezium 可以自动创建主题。
临时快照信号指定要包含在快照中的集合。快照可以捕获数据库的全部内容,或仅捕获数据库中部分集合。
您可以通过向信号集合发送 execute-snapshot 消息来指定要捕获的集合。将 execute-snapshot 信号的类型设置为 incremental 或 blocking,并提供要包含在快照中的集合名称,如以下表格所述:
| Field (字段) | Default (默认值) | Value (值) |
|---|---|---|
|
|
Specifies the type of snapshot that you want to run. (指定您要运行的快照类型。) |
|
N/A |
一个包含正则表达式的数组,这些正则表达式匹配要包含在快照中的集合的完全限定名称。 |
您可以通过向信号集合添加具有 execute-snapshot 信号类型的条目,或者向 Kafka 信号主题发送信号消息来启动临时增量快照。在连接器处理完消息后,它将开始快照操作。快照过程读取第一个和最后一个主键值,并使用这些值作为每个集合的开始和结束点。根据集合中的条目数量和配置的块大小,Debezium 将集合分成块,然后依次快照每个块。
有关更多信息,请参阅增量快照。
您可以通过向信号集合或信号主题添加具有 execute-snapshot 信号类型的条目来启动临时阻塞快照。在连接器处理完消息后,它将开始快照操作。连接器会暂时停止流式传输,然后初始化指定集合的快照,遵循初始快照过程中使用的相同过程。快照完成后,连接器将恢复流式传输。
有关更多信息,请参阅阻塞快照。
增量快照
To provide flexibility in managing snapshots, Debezium includes a supplementary snapshot mechanism, known as incremental snapshotting. Incremental snapshots rely on the Debezium mechanism for sending signals to a Debezium connector. Incremental snapshots are based on the DDD-3 design document. (为了在管理快照时提供灵活性,Debezium 提供了一个补充快照机制,称为增量快照。增量快照依赖于 Debezium 向 Debezium 连接器发送信号的机制。增量快照基于DDD-3 设计文档。)
在增量快照中,Debezium 不会一次性捕获数据库的全部状态,而是在一系列可配置的块中分阶段捕获每个集合。您可以指定要快照的集合以及每个块的大小。块大小决定了快照在每次从数据库获取操作期间收集的行数。增量快照的默认块大小为 1024 行。
随着增量快照的进行,Debezium 使用水印来跟踪其进度,并维护其捕获的每个集合行的记录。这种分阶段捕获数据的方法提供了以下优点,优于标准的初始快照过程:
-
You can run incremental snapshots in parallel with streamed data capture, instead of postponing streaming until the snapshot completes. The connector continues to capture near real-time events from the change log throughout the snapshot process, and neither operation blocks the other. (您可以与流式数据捕获并行运行增量快照,而不是将流式传输推迟到快照完成。在整个快照过程中,连接器会继续从变更日志中捕获近乎实时事件,并且任一操作都不会阻塞另一个操作。)
-
如果增量快照的进度被中断,您可以恢复它而不会丢失任何数据。在进程恢复后,快照将从停止点开始,而不是从头重新捕获集合。
-
您可以随时按需运行增量快照,并根据需要重复此过程以适应数据库更新。例如,在修改连接器配置以向其
collection.include.list属性添加集合后,您可能需要重新运行快照。
运行增量快照时,Debezium 按主键对每个集合进行排序,然后根据配置的块大小将集合分成块。然后,它逐块捕获每个集合行。对于其捕获的每一行,快照都会发出一个 READ 事件。该事件代表快照开始时该行的值。
随着快照的进行,其他进程很可能继续访问数据库,可能会修改集合记录。为了反映这些更改,INSERT、UPDATE 或 DELETE 操作会按常规提交到事务日志。同样,正在进行的 Debezium 流式传输过程继续检测这些更改事件,并将相应的更改事件记录发送到 Kafka。
在某些情况下,流式传输过程发出的 UPDATE 或 DELETE 事件可能会乱序接收。也就是说,流式传输过程可能会在快照捕获包含该行 READ 事件的块之前,发出修改集合行的事件。当快照最终发出该行的相应 READ 事件时,其值已经过时。为确保乱序的增量快照事件按正确的逻辑顺序处理,Debezium 采用了一种缓冲方案来解决冲突。只有在解决快照事件与流式事件之间的冲突后,Debezium 才会将事件记录发送到 Kafka。
为帮助解决乱序到达的 READ 事件与修改同一集合行的流式事件之间的冲突,Debezium 采用了一种称为快照窗口的机制。快照窗口标定了增量快照为指定集合块捕获数据的时间间隔。在特定块的快照窗口打开之前,Debezium 会遵循其常规行为,直接将事务日志中的事件向下游发送到目标 Kafka 主题。但是,从特定块的快照打开的那一刻直到它关闭,Debezium 会执行去重步骤来解决具有相同主键的事件之间的冲突。
For each data collection, the Debezium emits two types of events, and stores the records for them both in a single destination Kafka topic. The snapshot records that it captures directly from a table are emitted as READ operations. Meanwhile, as users continue to update records in the data collection, and the transaction log is updated to reflect each commit, Debezium emits UPDATE or DELETE operations for each change. (对于每个数据集合,Debezium 会发出两种类型的事件,并将两者的记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为 READ 操作发出。同时,随着用户继续更新数据集合中的记录,并且事务日志更新以反映每次提交,Debezium 会为每个更改发出 UPDATE 或 DELETE 操作。)
当快照窗口打开,Debezium 开始处理快照块时,它会将快照记录发送到内存缓冲区。在快照窗口期间,缓冲区中 READ 事件的主键与传入流式事件的主键进行比较。如果没有找到匹配项,则将流式事件记录直接发送到 Kafka。如果 Debezium 检测到匹配,它将丢弃缓冲的 READ 事件,并将流式记录写入目标主题,因为流式事件在逻辑上优先于静态快照事件。当块的快照窗口关闭后,缓冲区中仅包含没有相关事务日志事件的 READ 事件。Debezium 将这些剩余的 READ 事件发送到集合的 Kafka 主题。
The connector repeats the process for each snapshot chunk. (连接器对每个快照块重复此过程。)
|
要使 Debezium 能够执行增量快照,您必须授予连接器向信号集合写入的权限。 Write permission is unnecessary only for connectors that can be configured to perform read-only incrementals snapshots (MariaDB, MySQL, or PostgreSQL). (仅对于可以配置为执行只读增量快照的连接器(MariaDB、MySQL 或 PostgreSQL)来说,写入权限是可选的。) |
Currently, you can use either of the following methods to initiate an incremental snapshot (当前,您可以使用以下任一方法启动增量快照:)
|
增量快照要求每个表的主键稳定排序。由于 有关 MongoDB BSON 字符串类型的更多信息,请参阅MongoDB 文档)。 |
要将增量快照与分片 MongoDB 集群一起使用,您必须将 incremental.snapshot.chunk.size 设置为一个足够大的值,以补偿更改流管道的复杂性增加。
触发增量快照
要启动增量快照,您可以向源数据库上的信号集合发送临时快照信号。
您可以使用 MongoDB insert() 方法将信号提交到信号集合。
在 Debezium 检测到信号集合中的更改后,它会读取信号并运行请求的快照操作。
您提交的查询指定了要包含在快照中的集合,并且可以选择指定快照操作的类型。目前,快照操作的唯一有效选项是 incremental 和 blocking。
要指定要包含在快照中的集合,请提供一个 data-collections 数组,列出集合或用于匹配集合的正则表达式数组,例如:
{"data-collections": ["public.Collection1", "public.Collection2"]}
增量快照的 data-collections 数组没有默认值。如果 data-collections 数组为空,Debezium 会检测到不需要执行任何操作,并且不会执行快照。
|
如果要在快照中包含的集合名称在数据库、模式或表名称中包含点 ( |
-
-
A signaling data collection exists on the source database. (源数据库上存在信号数据集合。)
-
信号数据集合在
signal.data.collection属性中指定。
-
-
将快照信号文档插入信号集合
<signalDataCollection>.insert({"id" : _<idNumber>,"type" : <snapshotType>, "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": <snapshotType>, "additional-conditions" : [{"data-collections" : "<collectionName>", "filter" : "<additional-condition>"}] }});For example, (例如,)
db.debeziumSignal.insert({ (1) "type" : "execute-snapshot", (2) (3) "data" : { "data-collections" ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""], (4) "type": "incremental"} (5) "additional-conditions":[{"data-collection": "schema1.table1" ,"filter":"color=\'blue\'"}]}'); (6) });命令中的
id、type和data参数的值对应于信号集合的字段。The following table describes the parameters in the example (下表描述了示例中的参数:)
表 3. 用于向信号集合发送增量快照信号的 MongoDB insert() 命令中字段的描述 Item Value (值) 描述 1
db.debeziumSignal指定源数据库上的信号集合的完全限定名称。
2
null
_id参数指定一个任意字符串,该字符串被分配为信号请求的id标识符。
前面的示例中的 insert 方法省略了可选的_id参数。因为文档没有显式为参数分配值,所以 MongoDB 自动为文档分配的任意 ID 成为信号请求的id标识符。
使用此字符串标识日志消息到信号集合的条目。Debezium 不使用此标识符字符串。相反,在快照期间,Debezium 会生成自己的id字符串作为水印信号。3
execute-snapshotSpecifies
typeparameter specifies the operation that the signal is intended to trigger. (type参数指定信号旨在触发的操作。)4
data-collections (数据集合)信号的
data字段的必需组件,该信号指定一个集合名称数组或用于匹配要包含在快照中的集合名称的正则表达式。
数组列出了与集合的完全限定名称匹配的正则表达式,其格式与在signal.data.collection配置属性中指定连接器信号集合名称的格式相同。5
incremental (增量)An optional
typecomponent of thedatafield of a signal that specifies the type of snapshot operation to run. (信号的data字段的可选type组件,指定要运行的快照操作的类型。)
目前支持incremental和blocking类型。
If you do not specify a value, the connector runs an incremental snapshot. (如果您不指定值,连接器将运行增量快照。)6
additional-conditions (附加条件)An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. (一个可选数组,指定连接器用于确定要包含在快照中的记录子集的附加条件集。)
additional-conditions数组中的每个元素都是一个对象,其中包含以下键:data-collection:: 将要应用过滤器的数据库集合的完全限定名称。filter:: 指定数据集合记录中必须存在的列值才能包含在快照中,例如,"color='blue'"。
The following example, shows the JSON for an incremental snapshot event that is captured by a connector. (以下示例显示了连接器捕获的增量快照事件的 JSON。)
{
"before":null,
"after": {
"pk":"1",
"value":"New data"
},
"source": {
...
"snapshot":"incremental" (1)
},
"op":"r", (2)
"ts_ms":"1620393591654",
"ts_us":"1620393591654962",
"ts_ns":"1620393591654962147",
"transaction":null
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
Specifies the type of snapshot operation to run. (指定要运行的快照操作的类型。) |
2 |
|
Specifies the event type. (指定事件类型。) |
使用 Kafka 信号通道触发增量快照
You can send a message to the configured Kafka topic to request the connector to run an ad hoc incremental snapshot. (您可以向配置的 Kafka 主题发送消息,请求连接器运行即席增量快照。)
The key of the Kafka message must match the value of the topic.prefix connector configuration option. (Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。)
The value of the message is a JSON object with type and data fields. (消息的值是一个带有 type 和 data 字段的 JSON 对象。)
The signal type is execute-snapshot, and the data field must have the following fields (信号类型为 execute-snapshot,data 字段必须具有以下字段:)
| Field (字段) | Default (默认值) | Value (值) |
|---|---|---|
|
|
The type of the snapshot to be executed. Currently Debezium supports the |
|
N/A |
An array of comma-separated regular expressions that match the fully-qualified names of tables to include in the snapshot. (一个逗号分隔的正则表达式数组,匹配要包含在快照中的表的完全限定名称。) |
|
N/A |
An optional array of additional conditions that specifies criteria that the connector evaluates to designate a subset of records to include in a snapshot. (一个可选的附加条件数组,指定连接器评估以指定要包含在快照中的记录子集的标准。) |
execute-snapshot Kafka 消息Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
Debezium 使用 additional-conditions 字段来选择集合内容的一个子集。
Typically, when Debezium runs a snapshot, it runs a SQL query such as (通常,当 Debezium 运行快照时,它会运行一个 SQL 查询,例如:)
SELECT * FROM <tableName> ….
When the snapshot request includes an additional-conditions property, the data-collection and filter parameters of the property are appended to the SQL query, for example (当快照请求包含 additional-conditions 属性时,该属性的 data-collection 和 filter 参数将被附加到 SQL 查询中,例如:)
SELECT * FROM <data-collection> WHERE <filter> ….
例如,给定一个具有 id(主键)、color 和 brand 列的 products 集合,如果您希望快照仅包含 color='blue' 的内容,则在请求快照时,可以添加 additional-conditions 属性来过滤内容::leveloffset: +1
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.products" ,"filter":"color='blue'"}]}}`
您还可以使用 additional-conditions 属性传递基于多个列的条件。例如,使用与前一个示例相同的 products 集合,如果您希望快照仅包含 products 集合中 color='blue' 和 brand='MyBrand' 的内容,则可以发送以下请求::leveloffset: +1
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
停止增量快照
在某些情况下,可能需要停止增量快照。例如,您可能意识到快照配置不正确,或者您可能希望确保资源可用于其他数据库操作。您可以通过向源数据库上的集合发送信号来停止正在进行的快照。
您通过将停止快照信号文档插入信号集合来将停止快照信号提交到信号集合。您提交的停止快照信号将快照操作的 type 指定为 incremental,并且可以选择指定要从当前正在运行的快照中排除的集合。在 Debezium 检测到信号集合中的更改后,它会读取信号,并在进行中停止增量快照操作。
您还可以通过向Kafka 信号主题发送 JSON 消息来停止增量快照。
-
-
A signaling data collection exists on the source database. (源数据库上存在信号数据集合。)
-
信号数据集合在
signal.data.collection属性中指定。
-
-
将停止快照信号文档插入信号集合
<signalDataCollection>.insert({"id" : _<idNumber>,"type" : "stop-snapshot", "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": "incremental"}});For example, (例如,)
db.debeziumSignal.insert({ (1) "type" : "stop-snapshot", (2) (3) "data" : { "data-collections" ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""], (4) "type": "incremental"} (5) });信号命令中的
id、type和data参数的值对应于信号集合的字段。The following table describes the parameters in the example (下表描述了示例中的参数:)
表 5. 用于将停止增量快照文档发送到信号集合的 insert 命令中字段的描述 Item Value (值) 描述 1
db.debeziumSignal指定源数据库上的信号集合的完全限定名称。
2
null
前面的示例中的 insert 方法省略了可选的
_id参数。因为文档没有显式为参数分配值,所以 MongoDB 自动为文档分配的任意 ID 成为信号请求的id标识符。
使用此字符串标识日志消息到信号集合的条目。Debezium 不使用此标识符字符串。3
stop-snapshotThe
typeparameter specifies the operation that the signal is intended to trigger. (type参数指定信号旨在触发的操作。)4
data-collections (数据集合)信号的
data字段的可选组件,用于指定集合名称数组或用于匹配要从快照中删除的集合名称的正则表达式。
数组列出了与完全限定名称格式为database.collection的集合匹配的正则表达式。如果从
data字段中省略data-collections数组,则信号将停止正在进行的整个增量快照。5
incremental (增量)A required component of the
datafield of a signal that specifies the type of snapshot operation to be stopped. (信号的data字段的必需组件,指定要停止的快照操作的类型。)
Currently, the only valid option isincremental. (目前,唯一有效选项是incremental。)
If you do not specify atypevalue, the signal fails to stop the incremental snapshot. (如果您不指定type值,信号将无法停止增量快照。)
使用 Kafka 信号通道停止增量快照
You can send a signal message to the configured Kafka signaling topic to stop an ad hoc incremental snapshot. (您可以向配置的 Kafka 信号主题发送信号消息,以停止即席增量快照。)
The key of the Kafka message must match the value of the topic.prefix connector configuration option. (Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。)
The value of the message is a JSON object with type and data fields. (消息的值是一个带有 type 和 data 字段的 JSON 对象。)
The signal type is stop-snapshot, and the data field must have the following fields (信号类型为 stop-snapshot,data 字段必须具有以下字段:)
| Field (字段) | Default (默认值) | Value (值) |
|---|---|---|
|
|
The type of the snapshot to be executed. Currently Debezium supports only the |
|
N/A |
一个可选的逗号分隔的正则表达式数组,这些正则表达式匹配表(集合名称)的完全限定名称,或用于匹配要从快照中删除的集合名称的正则表达式数组。 |
The following example shows a typical stop-snapshot Kafka message (以下示例显示了一个典型的 stop-snapshot Kafka 消息:)
Key = `test_connector`
Value = `{"type":"stop-snapshot","data": {"data-collections": ["db1.table1", "db1.table2"], "type": "INCREMENTAL"}}`
Custom snapshotter SPI (自定义快照器 SPI)
For more advanced uses, you can fine-tune control of the snapshot by implementing one of the following interfaces (对于更高级的用途,您可以实现以下接口之一来微调快照的控制:)
io.debezium.snapshot.spi.Snapshotter-
Controls whether the connector takes a snapshot. (控制连接器是否执行快照。)
io.debezium.snapshot.spi.SnapshotQuery-
Controls how data is queried during a snapshot. (控制快照期间如何查询数据。)
io.debezium.snapshot.spi.SnapshotLock-
Controls whether the connector locks tables when taking a snapshot. (控制连接器在进行快照时是否锁定表。)
io.debezium.snapshot.spi.Snapshotter 接口。所有内置快照模式都实现了此接口。)/**
* {@link Snapshotter} is used to determine the following details about the snapshot process:
* <p>
* - Whether a snapshot occurs. <br>
* - Whether streaming continues during the snapshot. <br>
* - Whether the snapshot includes schema (if supported). <br>
* - Whether to snapshot data or schema following an error.
* <p>
* Although Debezium provides many default snapshot modes,
* to provide more advanced functionality, such as partial snapshots,
* you can customize implementation of the interface.
* For more information, see the documentation.
*
*
*
*/
@Incubating
public interface Snapshotter extends Configurable {
/**
* @return the name of the snapshotter.
*
*
*/
String name();
/**
* @param offsetExists is {@code true} when the connector has an offset context (i.e. restarted)
* @param snapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
*
* @return {@code true} if the snapshotter should take a data snapshot
*/
boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress);
/**
* @param offsetExists is {@code true} when the connector has an offset context (i.e. restarted)
* @param snapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
*
* @return {@code true} if the snapshotter should take a schema snapshot
*/
boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress);
/**
* @return {@code true} if the snapshotter should stream after taking a snapshot
*/
boolean shouldStream();
/**
* @return {@code true} whether the schema can be recovered if database schema history is corrupted.
*/
boolean shouldSnapshotOnSchemaError();
/**
* @return {@code true} whether the snapshot should be re-executed when there is a gap in data stream.
*/
boolean shouldSnapshotOnDataError();
/**
*
* @return {@code true} if streaming should resume from the start of the snapshot
* transaction, or {@code false} for when a connector resumes and takes a snapshot,
* streaming should resume from where streaming previously left off.
*/
default boolean shouldStreamEventsStartingFromSnapshot() {
return true;
}
/**
* Lifecycle hook called after the snapshot phase is successful.
*/
default void snapshotCompleted() {
// no operation
}
/**
* Lifecycle hook called after the snapshot phase is aborted.
*/
default void snapshotAborted() {
// no operation
}
}
io.debezium.snapshot.spi.SnapshotQuery 接口。所有内置快照查询模式都实现了此接口。)/**
* {@link SnapshotQuery} is used to determine the query used during a data snapshot
*
*
*/
public interface SnapshotQuery extends Configurable, Service {
/**
* @return the name of the snapshot lock.
*
*
*/
String name();
/**
* Generate a valid query string for the specified table, or an empty {@link Optional}
* to skip snapshotting this table (but that table will still be streamed from)
*
* @param tableId the table to generate a query for
* @param snapshotSelectColumns the columns to be used in the snapshot select based on the column
* include/exclude filters
* @return a valid query string, or none to skip snapshotting this table
*/
Optional<String> snapshotQuery(String tableId, List<String> snapshotSelectColumns);
}
io.debezium.snapshot.spi.SnapshotLock 接口。所有内置快照锁定模式都实现了此接口。)/**
* {@link SnapshotLock} is used to determine the table lock mode used during schema snapshot
*
*
*/
public interface SnapshotLock extends Configurable, Service {
/**
* @return the name of the snapshot lock.
*
*
*/
String name();
/**
* Returns a SQL statement for locking the given table during snapshotting, if required by the specific snapshotter
* implementation.
*/
Optional<String> tableLockingStatement(Duration lockTimeout, String tableId);
}
阻塞快照
To provide more flexibility in managing snapshots, Debezium includes a supplementary ad hoc snapshot mechanism, known as a blocking snapshot. Blocking snapshots rely on the Debezium mechanism for sending signals to a Debezium connector. (为了在管理快照时提供更大的灵活性,Debezium 提供了一个补充的即席快照机制,称为阻塞快照。阻塞快照依赖于 Debezium 向 Debezium 连接器发送信号的机制。)
A blocking snapshot behaves just like an initial snapshot, except that you can trigger it at run time. (阻塞快照的行为与初始快照完全相同,只是您可以随时在运行时触发它。)
You might want to run a blocking snapshot rather than use the standard initial snapshot process in the following situations (您可能希望在以下情况下运行阻塞快照,而不是使用标准的初始快照过程:)
-
您添加了一个新集合,并且希望在连接器运行时完成快照。
-
您添加了一个大型集合,并且希望快照的完成时间比增量快照可能用的时间短。
运行阻塞快照时,Debezium 会停止流式传输,然后初始化指定集合的快照,遵循初始快照过程中使用的相同过程。快照完成后,流式传输将恢复。
You can set the following properties in the data component of a signal (您可以在信号的 data 组件中设置以下属性:)
-
data-collections: 指定哪些集合必须进行快照。
-
data-collections: 指定您希望快照包含的集合。
此属性接受逗号分隔的正则表达式列表,这些列表匹配完全限定的集合名称。此属性的行为类似于table.include.list属性,该属性指定了在阻塞快照中要捕获的表。 -
additional-conditions: 您可以为不同的集合指定不同的过滤器。
-
data-collection属性是过滤器将应用的集合的完全限定名称,并且可能区分大小写或不区分大小写,具体取决于数据库。 -
filter属性将具有与snapshot.select.statement.overrides相同的绝对值,即需要区分大小写匹配的集合的完全限定名称。
-
For example (例如:)
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
A delay might exist between the time that you send the signal to trigger the snapshot, and the time when streaming stops and the snapshot starts. As a result of this delay, after the snapshot completes, the connector might emit some event records that duplicate records captured by the snapshot. (从您发送触发快照的信号到流式传输停止并开始快照之间可能存在延迟。由于此延迟,快照完成后,连接器可能会发出一些重复快照捕获记录的事件记录。)
流式传输更改
在副本集连接器的任务记录偏移量后,它会使用该偏移量来确定 oplog 中的位置,从该位置开始流式传输更改。然后,任务(根据配置)连接到副本集的主节点或连接到整个副本集的更改流,并从该位置开始流式传输更改。它处理所有创建、插入和删除操作,并将它们转换为 Debezium 更改事件。每个更改事件都包含操作在 oplog 中的位置,连接器会定期将其记录为最新的偏移量。偏移量记录的间隔由 offset.flush.interval.ms 控制,这是一个 Kafka Connect 工作进程配置属性。
当连接器正常停止时,最后一个处理的偏移量会被记录下来,以便在重新启动时,连接器能够从上次中断的地方继续。但是,如果连接器的任务意外终止,那么任务可能在它最后一次记录偏移量但在最后一次偏移量被记录之前就已经处理并生成了事件;重新启动时,连接器将从最后一个记录的偏移量开始,可能会生成一些与崩溃前不久生成的事件相同的事件。
|
当 Kafka 管道中的所有组件正常运行时,Kafka 消费者会收到每条消息恰好一次。然而,当出现问题时,Kafka 只能保证消费者收到每条消息至少一次。为避免意外结果,消费者必须能够处理重复的消息。 |
如前所述,连接器任务始终使用副本集的主节点从 oplog 流式传输更改,这确保了连接器尽可能看到最新的操作,并且可以比使用从节点捕获更改具有更低的延迟。当副本集选举新的主节点时,连接器会立即停止流式传输更改,连接到新的主节点,并从新主节点开始以相同的位置流式传输更改。同样,如果连接器在与副本集成员通信时遇到任何问题,它会尝试重新连接(通过使用指数退避,以免使副本集过载),一旦连接成功,它将继续从上次中断的地方流式传输更改。通过这种方式,连接器能够动态地适应副本集成员的变化并自动处理通信故障。
总而言之,MongoDB 连接器在大多数情况下会继续运行。通信问题可能会导致连接器等待问题解决。
预映像支持
在 MongoDB 6.0 及更高版本中,您可以配置更改流以发出文档的预映像状态,以填充 MongoDB 更改事件的 before 字段。要启用 MongoDB 中预映像的使用,您必须使用 db.createCollection()、create 或 collMod 为集合设置 changeStreamPreAndPostImages。要使 Debezium MongoDB 包含更改事件中的预映像,请将连接器的 capture.mode 设置为 *_with_pre_image 选项之一。
|
MongoDB 更改流事件大小限制
MongoDB 更改流事件的大小限制为 16MB。因此,使用预映像会增加超过此阈值的可能性,这可能导致失败。有关如何避免超出更改流限制的信息,请参阅MongoDB 文档。 |
主题名称
MongoDB 连接器将每个集合中所有插入、更新和删除操作的事件写入单个 Kafka 主题。Kafka 主题的名称始终采用 logicalName.databaseName.collectionName 的形式,其中 logicalName 是连接器通过 topic.prefix 配置属性指定的逻辑名称,databaseName 是操作发生的数据库名称,collectionName 是受影响文档所在的 MongoDB 集合的名称。
例如,假设有一个 MongoDB 副本集,其中有一个 inventory 数据库,包含四个集合:products、products_on_hand、customers 和 orders。如果监视此数据库的连接器被赋予逻辑名称 fulfillment,则连接器将在以下四个 Kafka 主题上生成事件:
-
fulfillment.inventory.products -
fulfillment.inventory.products_on_hand -
fulfillment.inventory.customers -
fulfillment.inventory.orders
请注意,主题名称不包含副本集名称或分片名称。因此,分片集合(其中每个分片包含集合文档的子集)的所有更改都会进入同一个 Kafka 主题。
您可以将 Kafka 设置为按需自动创建主题。如果未设置,则必须在启动连接器之前使用 Kafka 管理工具创建主题。
分区
MongoDB 连接器未明确确定如何为事件分区主题。相反,它允许 Kafka 根据事件键来确定如何分区主题。您可以通过在 Kafka Connect 工作进程配置中定义 Partitioner 实现的名称来更改 Kafka 的分区逻辑。
Kafka 仅对写入单个主题分区的事件保持总顺序。按键对事件进行分区意味着具有相同键的所有事件始终进入同一个分区。这确保了特定文档的所有事件始终按总顺序排列。
事务元数据
Debezium 可以生成表示事务元数据边界的事件,并丰富更改数据事件消息。
|
Debezium 接收事务元数据的限制
Debezium 仅为部署连接器后发生的事务注册和接收元数据。部署连接器之前发生的事务的元数据不可用。 |
对于每个事务 BEGIN 和 END,Debezium 会生成一个包含以下字段的事件:
status-
BEGIN或END id-
唯一事务标识符的字符串表示。
event_count(针对END事件)-
事务发出的事件总数。
data_collections(针对END事件)-
一个包含
data_collection和event_count对的数组,提供来自给定数据集合的更改所发出的事件数量。
以下示例显示了一个典型消息:
{
"status": "BEGIN",
"id": "1462833718356672513",
"event_count": null,
"data_collections": null
}
{
"status": "END",
"id": "1462833718356672513",
"event_count": 2,
"data_collections": [
{
"data_collection": "rs0.testDB.collectiona",
"event_count": 1
},
{
"data_collection": "rs0.testDB.collectionb",
"event_count": 1
}
]
}
除非通过 topic.transaction 选项进行覆盖,否则事务事件将写入名为 <topic.prefix>.transaction 的主题。
启用事务元数据后,数据消息 Envelope 将会添加一个 transaction 字段。此字段提供有关每个事件的信息,形式为字段的组合:
id-
唯一事务标识符的字符串表示。
total_order-
事件在事务生成的所有事件中的绝对位置。
data_collection_order-
事件在事务发出的所有事件中,每个数据集合的位置。
以下是消息外观的示例:
{
"after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}",
"source": {
...
},
"op": "c",
"ts_ms": "1580390884335",
"ts_us": "1580390884335486",
"ts_ns": "1580390884335486281",
"transaction": {
"id": "1462833718356672513",
"total_order": "1",
"data_collection_order": "1"
}
}
数据更改事件
Debezium MongoDB 连接器为每个文档级别的插入、更新或删除数据的操作生成数据更改事件。每个事件包含一个键和一个值。键和值的结构取决于发生更改的集合。
Debezium 和 Kafka Connect 的设计围绕着连续的事件消息流。但是,这些事件的结构可能会随时间变化,这对于使用者来说很难处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果您使用的是模式注册表,则包含一个模式 ID,使用者可以使用该 ID 从注册表中获取模式。这使得每个事件都是自包含的。
以下骨架 JSON 显示了更改事件的基本四个部分。但是,您如何配置应用程序中使用的 Kafka Connect 转换器决定了这四个部分在更改事件中的表示。schema 字段仅在配置转换器生成它时才存在于更改事件中。同样,只有配置转换器生成事件键和事件有效负载时,它们才会在更改事件中出现。如果您使用 JSON 转换器并配置它生成所有四个基本更改事件部分,则更改事件将具有此结构:
{
"schema": { (1)
...
},
"payload": { (2)
...
},
"schema": { (3)
...
},
"payload": { (4)
...
},
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
第一个 |
2 |
|
第一个 |
3 |
|
第二个 |
4 |
|
第二个 |
默认情况下,连接器将更改事件记录流式传输到与事件的源集合同名的主题。请参阅主题名称。
|
MongoDB 连接器确保所有 Kafka Connect Schema 名称都遵循 Avro Schema 名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称中的每个剩余字符以及数据库和集合名称中的每个字符都必须是拉丁字母、数字或下划线,即 a-z、A-Z、0-9 或 \_。如果存在无效字符,则会将其替换为下划线字符。 如果逻辑服务器名称、数据库名称或集合名称包含无效字符,并且区分名称的唯一字符被替换为下划线,则可能会导致意外冲突。 |
更改事件键
更改事件的键包含已更改文档键的 Schema 和已更改文档的实际键。对于给定集合,Schema 及其相应的 payload 都包含一个 id 字段。此字段的值是从MongoDB 扩展 JSON 序列化严格模式派生的字符串表示的文档标识符。
考虑一个逻辑名称为 fulfillment 的连接器,一个包含 inventory 数据库的副本集,以及一个包含如下文档的 customers 集合。
{
"_id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
}
捕获 customers 集合更改的每个更改事件都具有相同的事件键 Schema。只要 customers 集合具有之前的定义,捕获 customers 集合更改的每个更改事件都具有以下键结构。在 JSON 中,它看起来像这样:
{
"schema": { (1)
"type": "struct",
"name": "fulfillment.inventory.customers.Key", (2)
"optional": false, (3)
"fields": [ (4)
{
"field": "id",
"type": "string",
"optional": false
}
]
},
"payload": { (5)
"id": "1004"
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
键的模式部分指定了一个 Kafka Connect 模式,该模式描述了键的 主键的结构。 |
2 |
|
定义键 payload 结构的 Schema 名称。此 Schema 描述了已更改文档的键的结构。键 Schema 名称的格式为 connector-name.database-name.collection-name.
|
3 |
|
指示事件键的 |
4 |
|
指定 |
5 |
|
包含生成此更改事件的文档的键。在此示例中,键包含一个类型为 |
此示例使用具有整数标识符的文档,但任何有效的 MongoDB 文档标识符都可以工作,包括文档标识符。对于文档标识符,事件键的 payload.id 值是一个字符串,表示更新文档的原始 _id 字段,作为使用严格模式的 MongoDB 扩展 JSON 序列化。下表提供了不同类型的 _id 字段如何表示的示例。
| Type | MongoDB _id 值 |
键的 payload |
|---|---|---|
整数 |
1234 |
|
浮点数 |
12.34 |
|
String |
"1234" |
|
文档 |
|
|
ObjectId |
|
|
二进制 |
|
|
更改事件值
更改事件中的值比键要复杂一些。与键一样,值也包含 schema 部分和 payload 部分。schema 部分包含描述 payload 部分的 Envelope 结构的模式,包括其嵌套字段。对于创建、更新或删除数据的操作的更改事件,其值有效负载都具有信封结构。
考虑用于显示更改事件键的示例文档。
{
"_id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
}
对示例 customers 集合中的文档进行更新的更改事件的值部分,根据事件类型而定。
create 事件
以下示例显示了连接器为在 customers 集合中创建数据的操作生成的更改事件的value 部分。
{
"schema": { (1)
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Json", (2)
"version": 1,
"field": "after"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1,
"field": "patch"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "int64",
"optional": false,
"field": "ts_us"
},
{
"type": "int64",
"optional": false,
"field": "ts_ns"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": false,
"field": "rs"
},
{
"type": "string",
"optional": false,
"field": "collection"
},
{
"type": "int32",
"optional": false,
"field": "ord"
},
{
"type": "int64",
"optional": true,
"field": "h"
}
],
"optional": false,
"name": "io.debezium.connector.mongo.Source", (3)
"field": "source"
},
{
"type": "string",
"optional": true,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "int64",
"optional": true,
"field": "ts_us"
},
{
"type": "int64",
"optional": true,
"field": "ts_ns"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Envelope" (4)
},
"payload": { (5)
"after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}", (6)
"source": { (7)
"version": "3.3.0.Final",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"ts_ms": 1558965508000000,
"ts_ms": 1558965508000000000,
"snapshot": false,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 31,
"h": 1546547425148721999
},
"op": "c", (8)
"ts_ms": 1558965515240, (9)
"ts_us": 1558965515240142, (10)
"ts_ns": 1558965515240142879, (11)
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
描述值 payload 结构的 value Schema。更改事件的 value Schema 在连接器为特定集合生成的每个更改事件中都是相同的。 |
2 |
|
在 |
3 |
|
|
4 |
|
|
5 |
|
值 的实际数据。这是更改事件提供的信息。 事件的 JSON 表示可能比它们描述的文档大得多。这是因为 JSON 表示必须包括消息的 Schema 和 payload 部分。但是,通过使用Avro 转换器,您可以显着减小连接器流式传输到 Kafka 主题的消息的大小。 |
6 |
|
一个可选字段,指定事件发生后文档的状态。在此示例中, |
7 |
|
描述事件源元数据的 必需字段。此字段包含可用于将此事件与其他事件进行比较的信息,关于事件的来源、事件发生的顺序以及事件是否属于同一事务。源元数据包括:
|
8 |
|
描述导致连接器生成事件的操作类型的必需字符串。在此示例中,
|
9 |
|
可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
10 |
|
一个可选字段,显示连接器处理事件的时间(以微秒为单位)。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
9 |
|
一个可选字段,显示连接器处理事件的时间(以纳秒为单位)。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
update 事件
更改流捕获模式
示例 customers 集合中更新的更改事件的值具有与该集合的create 事件相同的 Schema。同样,事件 value 的 payload 结构也相同。但是,update 事件中的事件 value payload 包含不同的值。如果 capture.mode 选项设置为 change_streams_update_full,则 update 事件仅包含 after 值。如果 capture.mode 选项设置为 *_with_pre_image 选项之一,则提供 before 值。在这种情况下,有一个新的结构化字段 updateDescription 包含一些附加字段:
-
updatedFields是一个字符串字段,包含已更新文档字段及其值的 JSON 表示。 -
removedFields是从文档中删除的字段名称列表。 -
truncatedArrays是文档中被截断的数组列表。
这是连接器为 customers 集合中的更新生成的事件中的更改事件 value 的示例:
{
"schema": { ... },
"payload": {
"op": "u", (1)
"ts_ms": 1465491461815, (2)
"ts_us": 1465491461815698, (2)
"ts_ns": 1465491461815698142, (2)
"before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"unknown\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", (3)
"after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", (4)
"updateDescription": {
"removedFields": null,
"updatedFields": "{\"first_name\": \"Anne Marie\"}", (5)
"truncatedArrays": null
},
"source": { (6)
"version": "3.3.0.Final",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"ts_us": 1558965508000000,
"ts_ns": 1558965508000000000,
"snapshot": false,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 1,
"h": null,
"tord": null,
"stxnid": null,
"lsid":"{\"id\": {\"$binary\": \"FA7YEzXgQXSX9OxmzllH2w==\",\"$type\": \"04\"},\"uid\": {\"$binary\": \"47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=\",\"$type\": \"00\"}}",
"txnNumber":1
}
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
描述导致连接器生成事件的操作类型的必需字符串。在此示例中, |
2 |
|
可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
3 |
|
包含更改前实际 MongoDB 文档的 JSON 字符串表示。 如果捕获模式未设置为 |
4 |
|
包含实际 MongoDB 文档的 JSON 字符串表示。 |
5 |
|
包含文档已更新字段值的 JSON 字符串表示。在此示例中,更新将 |
6 |
|
描述事件源元数据的必需字段。此字段包含与同一集合的create 事件相同的信息,但值不同,因为此事件来自 oplog 中的不同位置。源元数据包括:
|
|
事件中的 如果您的应用程序依赖于渐进式变更演变,则应仅依赖 |
delete 事件
delete 更改事件中的值具有与同一集合的create 和update 事件相同的 schema 部分。delete 事件中的 payload 部分包含与同一集合的create 和update 事件不同的值。特别是,delete 事件不包含 after 值,也不包含 updateDescription 值。以下是 customers 集合中文档的delete 事件示例:
{
"schema": { ... },
"payload": {
"op": "d", (1)
"ts_ms": 1465495462115, (2)
"ts_us": 1465495462115748, (2)
"ts_ns": 1465495462115748263, (2)
"before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}",(3)
"source": { (4)
"version": "3.3.0.Final",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"ts_us": 1558965508000000,
"ts_ns": 1558965508000000000,
"snapshot": true,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 6,
"h": 1546547425148721999
}
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
描述操作类型的必需字符串。 |
2 |
|
可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
3 |
|
包含更改前实际 MongoDB 文档的 JSON 字符串表示。 如果捕获模式未设置为 |
4 |
|
描述事件源元数据的必需字段。此字段包含与同一集合的create 或update 事件相同的信息,但值不同,因为此事件来自 oplog 中的不同位置。源元数据包括:
|
MongoDB 连接器事件旨在与Kafka 日志压缩配合使用。日志压缩允许删除一些旧消息,只要保留每个键的至少最新消息。这使 Kafka 可以回收存储空间,同时确保主题包含完整的数据集并可用于重新加载基于键的状态。
设置 MongoDB
MongoDB 连接器使用 MongoDB 的更改流来捕获更改,因此连接器仅适用于 MongoDB 副本集或每个分片都是独立副本集的分片集群。有关设置副本集或分片集群的信息,请参阅 MongoDB 文档。另外,请务必了解如何与副本集一起启用访问控制和身份验证。
您还必须拥有一个具有读取 oplog 的 admin 数据库的适当角色的 MongoDB 用户。此外,在分片集群的配置服务器中,用户还必须能够读取 config 数据库,并具有 listDatabases 权限操作。当使用更改流(默认)时,用户还必须具有集群范围的权限操作 find 和 changeStream。
当您打算使用预映像并填充 before 字段时,您需要首先使用 db.createCollection()、create 或 collMod 为集合启用 changeStreamPreAndPostImages。
云中的 MongoDB
您可以将 Debezium for MongoDB 连接器与MongoDB Atlas 一起使用。请注意,MongoDB Atlas 仅支持通过 SSL 进行安全连接,即连接器选项+mongodb.ssl.enabled 必须设置为 true。
最佳 Oplog 配置
Debezium MongoDB 连接器读取更改流以获取副本集的 oplog 数据。由于 oplog 是一个固定大小的、有限的集合,如果它超过其配置的最大大小,它将开始覆盖其最旧的条目。如果连接器因任何原因停止,当它重新启动时,它会尝试从最后一个 oplog 流位置恢复。然而,如果最后一个流位置已从 oplog 中删除,则根据连接器的 snapshot.mode 属性中指定的值,连接器可能会因报告无效恢复令牌错误而无法启动。发生故障时,您必须创建一个新连接器才能使 Debezium 继续从数据库捕获记录。有关更多信息,请参阅如果 snapshot.mode 设置为 initial,则连接器在长时间停止后失败。
为确保 oplog 保留 Debezium 恢复流式传输所需的偏移量值,您可以使用以下任一方法:
-
增加 oplog 的大小。根据您的典型工作负载,将 oplog 大小设置为大于每小时 oplog 条目峰值数量的值。
-
增加 oplog 条目保留的最短小时数(MongoDB 4.4 及更高版本)。此设置是基于时间的,因此即使 oplog 达到其配置的最大大小,最后 n 小时内的条目也保证可用。虽然这通常是首选选项,但对于接近容量的高工作负载集群,请指定最大 oplog 大小。
为帮助防止与 oplog 条目丢失相关的故障,跟踪报告复制行为的指标并优化 oplog 大小以支持 Debezium 非常重要。特别是,您应该监视 Oplog GB/Hour 和 Replication Oplog Window 的值。如果 Debezium 离线的时间超过复制 oplog 窗口的值,并且主 oplog 的增长速度快于 Debezium 消耗条目的速度,则可能会导致连接器故障。
有关如何监视这些指标的信息,请参阅MongoDB 文档。
最好将最大 oplog 大小设置为基于 oplog 的预期每小时增长量(Oplog GB/Hour)乘以解决 Debezium 故障可能需要的时间。
即,
Oplog GB/Hour X 解决 Debezium 故障的平均反应时间
例如,如果 oplog 大小限制设置为 1GB,并且 oplog 每小时增长 3GB,则 oplog 条目每小时会被清除三次。如果 Debezium 在此期间失败,其最后一个 oplog 位置很可能会被删除。
如果 oplog 以 3GB/小时的速度增长,并且 Debezium 离线两小时,则应将 oplog 大小设置为 3GB/小时 X 2 小时,即 6GB。
部署
要部署 Debezium MongoDB 连接器,您需要安装 Debezium MongoDB 连接器存档,配置连接器,并通过将其配置添加到 Kafka Connect 来启动连接器。
-
已安装 Apache Kafka 和 Kafka Connect。
-
MongoDB 已安装并设置为与 Debezium 连接器协同工作。
-
下载连接器的插件存档。
-
将 JAR 文件提取到您的 Kafka Connect 环境中。
-
将包含 JAR 文件的目录添加到Kafka Connect 的
plugin.path。 -
重新启动您的 Kafka Connect 进程以加载新 JAR 文件。
如果您使用的是不可变容器,请参阅Debezium 的容器镜像,其中包含 Apache Kafka 和 Kafka Connect,并已预装 MongoDB 连接器。
|
从 |
Debezium 的教程将引导您使用这些镜像,这是一个学习 Debezium 的好方法。
MongoDB 连接器配置示例
以下是一个连接器实例的配置示例,该实例从端口 27017 上的 MongoDB 副本集 rs0(位于 192.168.99.100)捕获数据,并将其逻辑命名为 fullfillment。通常,您通过设置连接器可用的配置属性来在 JSON 文件中配置 Debezium MongoDB 连接器。
您可以选择为特定的 MongoDB 副本集或分片集群生成事件。也可以选择性地过滤掉不需要的集合。
{
"name": "inventory-connector", (1)
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector", (2)
"mongodb.connection.string": "mongodb://192.168.99.100:27017/?replicaSet=rs0", (3)
"topic.prefix": "fullfillment", (4)
"collection.include.list": "inventory[.]*" (5)
}
}
| 1 | 我们在将其注册到 Kafka Connect 服务时使用的连接器名称。 |
| 2 | MongoDB 连接器类的名称。 |
| 3 | 用于连接 MongoDB 副本集的连接字符串。 |
| 4 | MongoDB 副本集的逻辑名称,它构成事件的命名空间,并用于连接器写入的所有 Kafka 主题的名称、Kafka Connect Schema 名称以及使用 Avro 转换器时的相应 Avro Schema 的命名空间。 |
| 5 | 一个正则表达式列表,用于匹配要监视的所有集合的集合命名空间(例如,<dbName>.<collectionName>)。此项是可选的。 |
有关您可以为 Debezium MongoDB 连接器设置的完整配置属性列表,请参阅MongoDB 连接器配置属性。
您可以使用 POST 命令将此配置发送到正在运行的 Kafka Connect 服务。该服务会记录配置并启动一个连接器任务,该任务执行以下操作:
-
连接到 MongoDB 副本集或分片集群。
-
为每个副本集分配任务。
-
执行快照(如果需要)。
-
读取更改流。
-
将更改事件记录流式传输到 Kafka 主题。
添加连接器配置
要开始运行 Debezium MongoDB 连接器,请创建连接器配置,并将该配置添加到您的 Kafka Connect 集群。
-
Debezium MongoDB 连接器已安装。
-
创建 MongoDB 连接器的配置。
-
使用Kafka Connect REST API 将该连接器配置添加到您的 Kafka Connect 群集。
连接器启动后,它将执行以下操作:
-
读取副本集的更改流。
-
为每个插入、更新和删除的文档生成更改事件。
-
将更改事件记录流式传输到 Kafka 主题。
连接器属性
Debezium MongoDB 连接器有许多配置属性,您可以使用它们来实现适合您的应用程序的正确连接器行为。许多属性都有默认值。有关属性的信息组织如下:
以下配置属性是必需的,除非有默认值可用。
必需的 Debezium MongoDB 连接器配置属性
| 属性 | Default (默认值) | 描述 | ||||||
|---|---|---|---|---|---|---|---|---|
false |
将此属性设置为
|
|||||||
无默认值 |
连接器的唯一名称。尝试使用相同的名称再次注册将失败。(此属性是所有 Kafka Connect 连接器所必需的。) |
|||||||
无默认值 |
连接器的 Java 类名。对于 MongoDB 连接器,请始终使用 |
|||||||
无默认值 |
指定连接器用于连接 MongoDB 副本集的连接字符串。此属性替换了 MongoDB 连接器早期版本中提供的 |
|||||||
无默认值 |
一个唯一的名称,用于标识连接器和/或 Debezium 连接器正在监视的 MongoDB 副本集或分片集群。每个服务器最多应由一个 Debezium 连接器监视,因为此服务器名称将用作 MongoDB 副本集或集群发出的所有持久化 Kafka 主题的前缀。请仅使用字母数字字符、连字符、点和下划线来构成名称。此逻辑名称在所有其他连接器中都应该是唯一的,因为它用作接收来自此连接器记录的 Kafka 主题的命名前缀。
|
|||||||
DefaultMongoDbAuthProvider |
一个完整的 Java 类名,它是 io.debezium.connector.mongodb.connection.MongoDbAuthProvider 接口的实现。此类负责在 MongoDB 连接上设置凭据(在每次应用启动时调用)。默认行为根据每个属性的文档使用 |
|||||||
无默认值 |
在使用默认 |
|||||||
无默认值 |
在使用默认 |
|||||||
|
在使用默认 |
|||||||
|
连接器将使用 SSL 连接到 MongoDB 实例。 |
|||||||
|
启用 SSL 时,此设置控制在连接阶段是否禁用严格的主机名检查。如果设置为 |
|||||||
regex |
基于包含/排除的数据库和集合名称匹配事件的模式。将此属性设置为以下值之一:
|
|||||||
空字符串 |
一个可选的逗号分隔列表,包含正则表达式或字面量,用于匹配要监视的数据库名称。默认情况下,所有数据库都将被监视。 要匹配数据库名称,Debezium 会根据
如果您在此配置中包含此属性,请不要同时设置 |
|||||||
空字符串 |
一个可选的逗号分隔列表,包含正则表达式或字面量,用于匹配要从监视中排除的数据库名称。当设置了 要匹配数据库名称,Debezium 会根据
如果在此配置中包含此属性,请不要设置 |
|||||||
空字符串 |
一个可选的逗号分隔列表,包含正则表达式或字面量,用于匹配要监视的 MongoDB 集合的完全限定命名空间。默认情况下,连接器监视除 要匹配命名空间的名称,Debezium 会根据
如果在此配置中包含此属性,请不要同时设置 |
|||||||
空字符串 |
一个可选的逗号分隔列表,包含正则表达式或字面量,用于匹配要从监视中排除的 MongoDB 集合的完全限定命名空间。当设置了 要匹配命名空间的名称,Debezium 会根据
如果在此配置中包含此属性,请不要设置 |
|||||||
|
指定连接器用于从 MongoDB 服务器捕获
|
|||||||
指定 startAtOperationTime 更改流参数。该值应为 Bson 时间戳的长整型表示。 |
||||||||
|
指定连接器打开更改流的范围。将此属性设置为以下值之一:
|
|||||||
指定连接器监视更改的数据库。仅当 |
||||||||
空字符串 |
一个可选的逗号分隔列表,包含应从更改事件消息值中排除的字段的完全限定名称。字段的完全限定名称格式为 databaseName.collectionName.fieldName.nestedFieldName,其中 databaseName 和 collectionName 可以包含通配符(*),匹配任何字符。 |
|||||||
空字符串 |
一个可选的逗号分隔列表,包含应用于重命名更改事件消息值中字段的完全限定重命名。字段的完全限定重命名格式为 databaseName.collectionName.fieldName.nestedFieldName:newNestedFieldName,其中 databaseName 和 collectionName 可以包含通配符(*),匹配任何字符,冒号(:)用于确定字段的重命名映射。下一个字段重命名将应用于列表中前一个字段重命名的结果,因此在重命名同一路径下的多个字段时请牢记这一点。 |
|||||||
|
控制是否在*delete* 事件后跟一个墓碑事件。 |
|||||||
none |
指定如何调整 schema 名称以兼容连接器使用的消息转换器。可能设置:
|
|||||||
none |
指定如何调整字段名称以兼容连接器使用的消息转换器。可能设置:
有关更多详细信息,请参阅 Avro 命名。 |
以下高级配置属性具有可以满足大多数情况的良好默认值,因此很少需要在连接器的配置中指定。
Debezium MongoDB 连接器高级配置属性
| 属性 | Default (默认值) | 描述 | ||
|---|---|---|---|---|
|
指定当
要将此选项与 MongoDB 更改流集合一起使用,您必须将该集合配置为返回文档的预镜像和后镜像。操作的预镜像和后镜像仅在所需配置在操作发生之前已到位的情况下才可用。 将此属性设置为以下值之一:
|
|||
|
正整数值,指定在此连接器的每次迭代中处理的事件批的最大大小。默认为 2048。 |
|||
|
正整数值,指定阻塞队列可以保存的最大记录数。当 Debezium 读取从数据库流式传输的事件时,它会将事件放入阻塞队列,然后将其写入 Kafka。在连接器摄取消息的速度快于其写入 Kafka 的速度,或者 Kafka 不可用时,阻塞队列可以为从数据库读取更改事件提供反压。队列中保存的事件在连接器定期记录偏移量时会被忽略。始终将 |
|||
|
一个长整数值,指定阻塞队列的最大字节容量。默认情况下,阻塞队列没有容量限制。要指定队列可以消耗的字节数,请将此属性设置为一个正长整型值。 |
|||
|
正整数值,指定在发生异常并中止任务之前,尝试连接到副本集主节点的最多次数。默认为 16,这与 |
|||
无默认值 |
一个可选设置,指定密钥库文件的位置。密钥库文件可用于客户端和 MongoDB 服务器之间的双向身份验证。 |
|||
无默认值 |
密钥库文件的密码。仅当配置了 |
|||
无默认值 |
密钥库文件的类型。仅当配置了 |
|||
无默认值 |
用于服务器证书验证的信任库文件的位置。 |
|||
无默认值 |
信任库文件的密码。用于检查信任库的完整性并解锁信任库。仅当配置了 |
|||
无默认值 |
信任库文件的类型。仅当配置了 |
|||
v2 |
CDC 事件中 |
|||
|
控制心跳消息的发送频率。 将此参数设置为 |
|||
|
一个逗号分隔的操作类型列表,您希望连接器在流式传输期间跳过这些操作。您可以配置连接器以跳过以下类型的操作:
如果您不希望连接器跳过任何操作,请将值设置为 |
|||
无默认值 |
控制哪些集合项包含在快照中。此属性仅影响快照。以 databaseName.collectionName 的形式指定一个逗号分隔的集合名称列表。 对于您指定的每个集合,还应指定另一个配置属性: |
|||
无默认值 |
连接器在启动后等待多长时间(以毫秒为单位)进行快照。 |
|||
0 |
指定连接器在完成快照后延迟开始流式传输过程的时间(以毫秒为单位)。设置延迟间隔有助于防止连接器在快照完成后但流式传输过程开始之前立即发生故障时重新启动快照。设置一个高于 Kafka Connect 工作节点设置的 |
|||
|
指定在快照期间一次从每个集合读取的最大文档数。连接器将以该大小的多个批次读取集合内容。 |
|||
|
一个可选的、逗号分隔的正则表达式列表,用于匹配您希望包含在快照中的模式的完全限定名称(<databaseName>.<collectionName>)。指定的项必须在连接器的 为了匹配模式的名称,Debezium 会将您指定的正则表达式作为锚定正则表达式进行匹配。也就是说,指定的表达式会与模式的整个名称字符串进行匹配;它不会匹配可能存在于模式名称中的子字符串。 |
|||
|
正整数值,指定执行副本集中集合的初始同步所使用的最大线程数。默认为 1。 |
|||
initial (初始) |
指定连接器启动时执行快照的标准。将属性设置为以下值之一:
有关更多信息,请参阅自定义快照程序 SPI。 |
|||
false |
如果 |
|||
false |
如果 |
|||
false |
如果 |
|||
false |
如果 |
|||
false |
如果 |
|||
无默认值 |
如果 |
|||
|
设置为 有关更多详细信息,请参阅事务元数据。 |
|||
10000 (10 秒) |
在发生可重试错误后重启连接器的等待时间(以毫秒为单位)。 |
|||
|
连接器轮询新的、已删除的或更改的副本集的时间间隔。 |
|||
10000 (10 秒) |
驱动程序在中止新连接尝试之前等待的毫秒数。 |
|||
10000 (10 秒) |
集群监视器尝试访问每个服务器的频率。 |
|||
0 |
发送/接收套接字在超时之前可以花费的毫秒数。值为 |
|||
30000 (30 秒) |
驱动程序在选择服务器之前等待的毫秒数,之后将超时并引发错误。 |
|||
无默认值 |
在流式传输更改时,此设置将更改流事件的处理作为标准 MongoDB 聚合流管道的一部分应用。管道是由指令组成的 MongoDB 聚合管道,用于指示数据库筛选或转换数据。这可用于自定义连接器消耗的数据。此属性的值必须是 JSON 格式的允许的 聚合管道阶段 数组。请注意,这会附加在用于支持连接器的内部管道之后(例如,筛选操作类型、数据库名称、集合名称等)。 |
|||
internal_first |
用于构造有效 MongoDB 聚合流管道的顺序。将属性设置为以下值之一:
|
|||
fail |
用于处理超过指定 BSON 大小的文档的更改事件的策略。将属性设置为以下值之一:
|
|||
0 |
已存储文档的最大允许大小(以字节为单位),在此大小内会处理更改事件。这包括数据库操作之前和之后的大小,更具体地说,它限制了 MongoDB 更改事件的 fullDocument 和 fullDocumentBeforeChange 字段的大小。 |
|||
|
指定 oplog/更改流游标等待服务器生成结果的最大毫秒数,否则将导致执行超时异常。值为 |
|||
无默认值 |
用于将信号发送到连接器的数据集合的完全限定名称。使用以下格式指定集合名称: |
|||
source (源) |
为连接器启用的信号通道名称列表。默认情况下,以下通道可用:
|
|||
无默认值 |
为连接器启用的通知通道名称列表。默认情况下,以下通道可用:
|
|||
|
连接器在增量快照块期间获取并加载到内存中的最大文档数。增加块大小可提高效率,因为快照执行的快照查询次数更少但规模更大。但是,较大的块大小也需要更多内存来缓冲快照数据。将块大小调整为在您的环境中提供最佳性能的值。 |
|||
|
指定连接器在增量快照期间使用的水印机制,用于对可能被增量快照捕获,然后在流式传输恢复后重新捕获的事件进行去重。
|
|||
|
用于确定数据更改、模式更改、事务、心跳事件等的 Kafka 主题名称的 TopicNamingStrategy 类的名称,默认为 |
|||
|
指定主题名称的分隔符,默认为 |
|||
|
用于保存有界并发哈希映射中主题名称的大小。此缓存将有助于确定与给定数据集合对应的主题名称。 |
|||
|
控制连接器向心跳消息发送的心跳消息的主题名称。主题名称的模式如下: |
|||
|
控制连接器发送事务元数据消息的主题的名称。主题名称的模式为: |
|||
|
定义用于通过添加提供上下文信息的元数据来定制 MBean 对象名称的标签。指定键值对的逗号分隔列表。每个键代表 MBean 对象名称的标签,对应的值代表该键的值,例如: 连接器会将指定的标签附加到基本 MBean 对象名称。标签可以帮助您组织和分类指标数据。您可以定义标签来识别特定的应用程序实例、环境、区域、版本等。有关更多信息,请参阅 定制 MBean 名称。 |
|||
|
指定连接器在操作导致可重试错误(例如连接错误)后如何响应。
|
|||
|
指定连接器可以捕获的最大集合数量。超过此限制将触发 |
|||
|
指定当连接器捕获的集合数量超过
|
|||
true |
此属性指定 Debezium 是否将带有 这些头是 OpenLineage 集成所必需的,并提供元数据,使下游处理系统能够跟踪和识别更改事件的来源。 该属性添加了以下头:
|
用于配置 MongoDB 连接器如何与 Kafka 信号主题交互的透传属性
Debezium 提供了一组 signal.* 属性来控制连接器如何与 Kafka 信号主题交互。
下表描述了 Kafka signal 属性。
| 属性 | Default (默认值) | 描述 | ||
|---|---|---|---|---|
<topic.prefix>-signal |
连接器监视的用于临时信号的 Kafka 主题的名称。
|
|||
kafka-signal |
Kafka 消费者使用的组 ID 的名称。 |
|||
无默认值 |
连接器用于建立与 Kafka 集群的初始连接的主机和端口对列表。每个对都引用 Debezium Kafka Connect 进程使用的 Kafka 集群。 |
|||
|
一个整数值,指定连接器在轮询信号时等待的最大毫秒数。 |
用于配置 MongoDB 连接器接收器通知通道的透传属性
下表描述了可用于配置 Debezium sink notification 通道的属性。
| 属性 | Default (默认值) | 描述 |
|---|---|---|
无默认值 |
接收来自 Debezium 通知的主题名称。当您将 |
监控
除了 Kafka 和 Kafka Connect 的内置 JMX 指标支持外,Debezium MongoDB 连接器还有两种指标类型。
有关如何使用 JMX 公开这些指标的详细信息,请参阅 Debezium 监控文档。
自定义 MBean 名称
Debezium 连接器通过连接器的 MBean 名称公开指标。这些指标特定于每个连接器实例,提供有关连接器快照、流式传输和模式历史记录进程行为的数据。
默认情况下,当您部署正确配置的连接器时,Debezium 会为不同的连接器指标生成唯一的 MBean 名称。要查看连接器进程的指标,请将您的可观察性堆栈配置为监视其 MBean。但这些默认 MBean 名称取决于连接器配置;配置更改可能导致 MBean 名称发生更改。在这种情况下,更改 MBean 名称会破坏连接器实例和 MBean 之间的链接,从而中断监控活动。您必须重新配置可观察性堆栈以使用新的 MBean 名称才能恢复监控。
为了防止因 MBean 名称更改而导致的监控中断,您可以配置自定义指标标签。通过将 custom.metric.tags 属性添加到连接器配置中来配置自定义指标。该属性接受键值对,其中每个键代表 MBean 对象名称的标签,相应的值代表该标签的值。例如:k1=v1,k2=v2。Debezium 将指定的标签附加到连接器的 MBean 名称。
配置连接器的 custom.metric.tags 属性后,您可以配置可观察性堆栈以检索与指定标签关联的指标。然后,可观察性堆栈使用指定的标签而不是可变的 MBean 名称来唯一标识连接器。之后,如果 Debezium 重新定义了其如何构造 MBean 名称,或者连接器配置中的 topic.prefix 发生更改,指标收集将不会中断,因为指标抓取任务使用指定的标签模式来标识连接器。
使用自定义标签的另一个好处是,您可以使用反映数据管道架构的标签,以便以适合您运营需求的方式组织指标。例如,您可以指定带有值的标签来声明连接器活动的类型、应用程序上下文或数据源,例如 db1-streaming-for-application-abc。如果您指定多个键值对,所有指定的对都将附加到连接器的 MBean 名称。
以下示例说明了标签如何修改默认 MBean 名称。
默认情况下,MongoDB 连接器为流式传输指标使用以下 MBean 名称:
debezium.mongodb:type=connector-metrics,context=streaming,server=<topic.prefix>
如果您将 custom.metric.tags 的值设置为 database=salesdb-streaming,table=inventory,Debezium 将生成以下自定义 MBean 名称:
debezium.mongodb:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory
快照指标
MBean 是 debezium.mongodb:type=connector-metrics,context=snapshot,server=<topic.prefix>,task=<task.id>。
除非快照操作正在活动中,或者自上次连接器启动以来发生过快照,否则快照指标不会暴露。
下表列出了可用的快照指标。
| Attributes | Type | 描述 |
|---|---|---|
|
连接器已读取的最后一个快照事件。 |
|
|
自连接器读取和处理最近事件以来经过的毫秒数。 |
|
|
记录连接器在快照操作期间识别为错误的更改事件的数量。每次连接器在初始、增量或临时快照期间遇到无法处理的事件时,都会递增此指标。事件可能因格式错误、与模式不兼容或在转换期间遇到失败而无法处理。指标值在连接器任务的整个生命周期内保持不变。如果快照被中断,并且连接器任务重新启动,则指标计数将重置为 |
|
|
自上次启动或重置以来,此连接器已看到的事件总数。 |
|
|
已被连接器配置的包含/排除列表过滤规则过滤的事件数量。 |
|
|
由连接器捕获的表列表。 |
|
|
用于在快照程序和主 Kafka Connect 循环之间传递事件的队列的长度。 |
|
|
用于在快照程序和主 Kafka Connect 循环之间传递事件的队列的可用容量。 |
|
|
包含在快照中的表总数。 |
|
|
快照尚未复制的表数。 |
|
|
快照是否已启动。 |
|
|
快照是否已暂停。 |
|
|
快照是否被中止。 |
|
|
快照是否已完成。 |
|
|
快照是否被跳过。 |
|
|
快照到目前为止所花费的总秒数,即使未完成。也包括快照暂停的时间。 |
|
|
快照暂停的总秒数。如果快照被暂停了多次,则暂停时间将累加。 |
|
|
包含快照中每个表扫描的行数的映射。表在处理过程中被增量地添加到 Map 中。每扫描 10,000 行和完成一个表时更新。 |
|
|
队列的最大缓冲区(以字节为单位)。如果 |
|
|
队列中记录的当前字节卷。 |
Debezium MongoDB 连接器还提供以下自定义快照指标:
| Attribute | Type | 描述 |
|---|---|---|
|
|
数据库断开连接数。 |
流式传输指标
MBean 是 debezium.mongodb:type=connector-metrics,context=streaming,server=<topic.prefix>,task=<task.id>。
下表列出了可用的流式传输指标。
| Attributes | Type | 描述 |
|---|---|---|
|
连接器已读取的最后一个流式传输事件。 |
|
|
自连接器读取和处理最近事件以来经过的毫秒数。 |
|
|
记录连接器在流式传输期间识别为错误的更改事件的数量。每次连接器在流式传输会话期间遇到无法处理的事件时,都会递增此指标。事件可能因格式错误、与模式不兼容或在转换期间遇到失败而无法处理。指标值在连接器任务的整个生命周期内保持不变。连接器重启后,指标计数将重置为 |
|
|
自上次连接器启动或重置以来,源数据库报告的总数据更改事件数。代表 Debezium 需要处理的数据更改工作负载。 |
|
|
自上次启动或重置指标以来,连接器处理的总创建事件数。 |
|
|
自上次启动或重置指标以来,连接器处理的总更新事件数。 |
|
|
自上次启动或重置指标以来,连接器处理的总删除事件数。 |
|
|
已被连接器配置的包含/排除列表过滤规则过滤的事件数量。 |
|
|
由连接器捕获的表列表。 |
|
|
用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的长度。 |
|
|
用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的剩余容量。 |
|
|
表示连接器当前是否连接到数据库服务器的标志。 |
|
|
上次更改事件的时间戳与连接器处理它之间的时间差(以毫秒为单位)。这些值将包含数据库服务器和连接器运行所在机器之间时钟的任何差异。 |
|
|
已提交的处理过的事务数。 |
|
|
收到的最后一个事件的坐标。 |
|
|
已处理的最后一个事务的事务标识符。 |
|
|
队列的最大缓冲区(以字节为单位)。如果 |
|
|
队列中记录的当前字节卷。 |
Debezium MongoDB 连接器还提供以下自定义流式传输指标:
| Attribute | Type | 描述 |
|---|---|---|
|
|
数据库断开连接数。 |
|
|
主节点选举次数。 |
MongoDB 连接器常见问题
Debezium 是一个分布式系统,可以捕获多个上游数据库中的所有更改,并且永远不会遗漏或丢失事件。当系统正常运行并得到仔细管理时,Debezium 会提供对每个更改事件的精确一次传递。
如果发生故障,系统不会丢失任何事件。但是,在从故障中恢复时,它可能会重复某些更改事件。在这种情况下,Debezium 与 Kafka 一样,提供对更改事件的至少一次传递。
本节的其余部分描述了 Debezium 如何处理各种类型的故障和问题。
配置和启动错误
在以下情况下,连接器在尝试启动时会失败,在日志中报告错误或异常,并停止运行:
-
连接器的配置无效。
-
连接器无法使用指定的连接参数成功连接到 MongoDB。
发生故障后,连接器会尝试使用指数退避进行重新连接。您可以配置最大重连尝试次数。
在这些情况下,错误将提供有关问题的更多详细信息,以及可能的建议的解决方案。当配置已更正或 MongoDB 问题已解决后,可以重新启动连接器。
Debezium 运行时 MongoDB 不可用
一旦连接器正在运行,如果任何 MongoDB 副本集的主节点变得不可用或无法访问,连接器将反复尝试重新连接到主节点,使用指数退避以防止网络或服务器过载。如果主节点在可配置的连接尝试次数后仍然不可用,连接器将失败。
重新连接尝试由三个属性控制:
-
connect.backoff.initial.delay.ms- 第一次尝试重新连接之前的延迟,默认为 1 秒(1000 毫秒)。 -
connect.backoff.max.delay.ms- 尝试重新连接之前的最大延迟,默认为 120 秒(120,000 毫秒)。 -
connect.max.attempts- 在产生错误之前允许的最大尝试次数,默认为 16。
每次延迟都是前一次延迟的两倍,最高可达最大延迟。鉴于默认值,下表显示了每次失败连接尝试的延迟以及失败前总累积时间。
| 重连尝试次数 | 尝试之前的延迟(秒) | 尝试之前的总延迟(分钟和秒) |
|---|---|---|
1 |
1 |
00:01 |
2 |
2 |
00:03 |
3 |
4 |
00:07 |
4 |
8 |
00:15 |
5 |
16 |
00:31 |
6 |
32 |
01:03 |
7 |
64 |
02:07 |
8 |
120 |
04:07 |
9 |
120 |
06:07 |
10 |
120 |
08:07 |
11 |
120 |
10:07 |
12 |
120 |
12:07 |
13 |
120 |
14:07 |
14 |
120 |
16:07 |
15 |
120 |
18:07 |
16 |
120 |
20:07 |
连接器无法启动 - InvalidResumeToken 或 ChangeStreamHistoryLost
一个停止了很长时间的连接器无法启动,并报告以下异常:
Command failed with error 286 (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog
前面的异常表明与连接器的恢复令牌对应的条目已不再存在于 oplog 中。因为 oplog 不再包含连接器处理的最后一个偏移量,所以连接器无法恢复流式传输。
您可以使用以下任一选项从故障中恢复:
-
删除失败的连接器,并创建一个具有相同配置但名称不同的新连接器。
-
暂停连接器,然后删除偏移量,或更改偏移量主题。
为防止与丢失的恢复令牌相关的故障,请优化 oplog 的配置。
Kafka Connect 进程正常停止
如果 Kafka Connect 以分布式模式运行,并且某个 Kafka Connect 进程正常停止,那么在该进程关闭之前,Kafka Connect 会将其所有连接器任务迁移到该组中的另一个 Kafka Connect 进程,并且新的连接器任务将从之前任务中断的地方继续。在连接器任务正常停止并在新进程上重新启动期间会有一个短暂的处理延迟。
如果该组只包含一个进程,并且该进程正常停止,那么 Kafka Connect 将停止连接器,并记录每个副本集的最后一个偏移量。重新启动后,副本集任务将从它们中断的地方继续。
Kafka Connect 进程崩溃
如果 Kafka Connector 进程意外停止,那么它正在运行的任何连接器任务都将在未记录其最近处理的偏移量的情况下终止。当 Kafka Connect 以分布式模式运行时,它会在其他进程上重新启动这些连接器任务。但是,MongoDB 连接器将从早期进程记录的最后一个偏移量恢复,这意味着新的替换任务可能会生成与崩溃前刚刚处理过的更改事件相同的事件。重复事件的数量取决于偏移量刷新周期以及崩溃前不久的数据更改量。
|
由于在故障恢复期间可能出现事件重复,因此消费者应始终预料到某些事件可能会重复。Debezium 更改是幂等的,因此一系列事件总是会产生相同的结果。 Debezium 还会在每个更改事件消息中包含源特定事件来源信息,包括 MongoDB 事件的唯一事务标识符( |
Kafka 不可用
当连接器生成更改事件时,Kafka Connect 框架使用 Kafka producer API 将这些事件记录到 Kafka 中。Kafka Connect 还将定期记录这些更改事件中出现的最新偏移量,频率由您在 Kafka Connect worker 配置中指定。如果 Kafka 代理变得不可用,运行连接器的 Kafka Connect worker 进程将简单地反复尝试重新连接到 Kafka 代理。换句话说,连接器任务将简单地暂停,直到可以重新建立连接,届时连接器将从它们中断的地方继续。
如果 snapshot.mode 设置为 initial,连接器在长时间停止后将失败
如果连接器被正常停止,用户可能仍然在副本集成员上执行操作。连接器离线期间发生的更改将继续记录在 MongoDB 的 oplog 中。在大多数情况下,连接器重新启动后,它会读取 oplog 中的偏移量值,以确定它为每个副本集流式传输的最后一个操作,然后从该点继续流式传输更改。重新启动后,连接器停止时发生的数据库操作将像往常一样发出到 Kafka,并在一段时间后,连接器将追赶上数据库。连接器赶上数据库所需的时间取决于 Kafka 的功能和性能以及数据库中发生的更改量。
然而,如果连接器停止的时间足够长,则可能发生 MongoDB 在连接器不活动期间清除 oplog 的情况,导致连接器最后一个位置的信息丢失。连接器重新启动后,它无法恢复流式传输,因为 oplog 中不再包含标记连接器处理的最后一个操作的先前偏移量值。连接器也无法执行快照,因为它通常会在 snapshot.mode 属性设置为 initial 且没有偏移量值存在时执行。在这种情况下,会出现不匹配,因为 oplog 不包含先前偏移量的值,但该偏移量值存在于连接器的内部 Kafka 偏移量主题中。这将导致错误并使连接器失败。
要从故障中恢复,请删除失败的连接器,并创建一个具有相同配置但名称不同的新连接器。启动新连接器时,它将执行快照以摄取数据库的状态,然后恢复流式传输。