Debezium MongoDB 连接器
- 概述
- MongoDB 连接器的工作原理
- 支持的 MongoDB 拓扑
- 所需的权限
- 逻辑连接器名称 (Logical connector name)
- 偏移量合并 (Offset consolidation)
- 执行快照 (Performing a snapshot)
- Ad hoc snapshots (即席快照)
- Incremental snapshots (增量快照)
- Custom snapshotter SPI (自定义快照器 SPI)
- Blocking snapshots (阻塞快照)
- 流式传输更改
- 预映像支持 (Pre-image support)
- Topic names (主题名称)
- 分区 (Partitions)
- 事务元数据
- Data change events (数据变更事件)
- 设置 MongoDB
- Deployment (部署)
- 监控
- MongoDB 连接器常见问题
Debezium 的 MongoDB 连接器会跟踪 MongoDB 副本集或 MongoDB 分片集群的文档更改,并将这些更改记录为 Kafka 主题中的事件。连接器会自动处理分片集群中分片的添加或删除、每个副本集成员身份的变化、副本集内部的选举以及通信问题的解决。
有关此连接器兼容的 MongoDB 版本的信息,请参阅 Debezium 版本概述。
概述 (Overview)
MongoDB 的复制机制提供了冗余和高可用性,并且是在生产环境中运行 MongoDB 的首选方式。MongoDB 连接器会捕获副本集或分片集群中的更改。
MongoDB 副本集 由一组拥有相同数据副本的服务器组成,复制确保客户端对副本集主节点上文档进行的任何更改都能正确地应用到副本集上的其他服务器,称为从节点。MongoDB 复制的工作方式是,主节点在其oplog(或操作日志)中记录更改,然后每个从节点读取主节点的 oplog 并按顺序将所有操作应用到自己的文档中。当向副本集添加新服务器时,该服务器首先执行主节点上所有数据库和集合的快照,然后读取主节点的 oplog 来应用自开始快照以来可能发生的任何更改。这个新服务器在赶上主节点的 oplog 尾部时,成为一个从节点(并能够处理查询)。
变更流 (Change streams)
尽管 Debezium MongoDB 连接器不属于副本集,但它使用类似的复制机制来获取 oplog 数据。主要区别在于连接器不直接读取 oplog。相反,它将 oplog 数据的捕获和解码委托给 MongoDB 的变更流功能。通过变更流,MongoDB 服务器将集合中发生的更改暴露为一个事件流。Debezium 连接器会监视此流,然后将更改向下游传递。连接器首次检测到副本集时,它会检查 oplog 以获取最后记录的事务,然后执行主节点的数据库和集合的快照。连接器完成数据复制后,它会创建一个从之前读取的 oplog 位置开始的变更流。
在 MongoDB 连接器处理更改时,它会定期记录事件在 oplog 流中起源的位置。当连接器停止时,它会记录它处理的最后一个 oplog 流位置,以便在重新启动后可以从该位置继续流式传输。换句话说,连接器可以停止、升级或维护,并在稍后重新启动,并且始终能在不丢失任何事件的情况下准确地恢复到之前的位置。当然,MongoDB oplog 通常有最大容量限制,因此如果连接器停止时间过长,在连接器有机会读取它们之前,oplog 中的操作可能会被清除。在这种情况下,重新启动后,连接器会检测到丢失的 oplog 操作,执行快照,然后继续流式传输更改。
MongoDB 连接器对副本集成员身份和领导者的变化、分片集群中分片的添加或删除以及可能导致通信失败的网络问题也具有很强的容忍度。连接器始终使用副本集的主节点来流式传输更改,因此当副本集进行选举并且另一个节点成为主节点时,连接器会立即停止流式传输更改,连接到新的主节点,并使用新的主节点开始流式传输更改。类似地,如果连接器无法与副本集主节点通信,它会尝试重新连接(使用指数退避,以免使网络或副本集过载)。重新建立连接后,连接器将继续从它捕获的最后一个事件继续流式传输更改。通过这种方式,连接器可以动态地适应副本集成员的变化,并自动处理通信中断。
读取偏好 (Read Preference)
您可以通过在 mongodb.connection.string 中设置 readPreference 参数来指定 MongoDB 连接的读取偏好。
MongoDB 连接器的工作原理 (How the MongoDB connector works)
概述连接器支持的 MongoDB 拓扑对于规划您的应用程序很有用。
支持的 MongoDB 拓扑 (Supported MongoDB topologies)
MongoDB 连接器支持以下 MongoDB 拓扑
- MongoDB 副本集 (MongoDB replica set)
-
Debezium MongoDB 连接器可以从单个 MongoDB 副本集捕获更改。生产环境的副本集至少需要三个成员。
要使用副本集的 MongoDB 连接器,必须在连接器配置中将
mongodb.connection.string属性的值设置为副本集连接字符串。当连接器准备开始从 MongoDB 变更流捕获更改时,它会启动一个连接任务。然后,该连接任务使用指定的连接字符串与可用的副本集成员建立连接。
- MongoDB 分片集群 (MongoDB sharded cluster)
-
一个 MongoDB 分片集群由以下部分组成:
-
一个或多个分片,每个分片部署为一个副本集;
-
一个单独的副本集,充当集群的配置服务器
-
一个或多个路由器(也称为
mongos),客户端连接到这些路由器,并且路由器将请求路由到适当的分片要使用分片集群的 MongoDB 连接器,请在连接器配置中将
mongodb.connection.string属性的值设置为分片集群连接字符串。
-
- MongoDB 独立服务器 (MongoDB standalone server)
-
MongoDB 连接器无法监视独立 MongoDB 服务器的更改,因为独立服务器没有 oplog。如果将独立服务器转换为具有一个成员的副本集,连接器将可以工作。
|
MongoDB 不建议在生产环境中使用独立服务器。有关更多信息,请参阅MongoDB 文档。 |
所需的权限 (Required user permissions)
为了捕获 MongoDB 中的数据,Debezium 会作为 MongoDB 用户连接到数据库。为 Debezium 创建的 MongoDB 用户帐户需要特定的数据库权限才能从数据库读取。连接器用户需要以下权限:
-
从数据库读取。
-
运行
hello命令。
连接器用户可能还需要以下权限:
-
从
config.shards系统集合读取。
连接器用户必须能够从所有数据库读取,或者读取特定数据库,具体取决于连接器的 capture.scope 属性的值。根据 capture.scope 设置,为用户分配以下权限之一:
capture.scope设置为deployment-
授予用户读取任何数据库的权限。
capture.scope设置为database-
授予用户读取连接器的
capture.target属性指定的数据库的权限。 capture.scope设置为collection-
授予用户读取连接器的
capture.target属性指定的集合的权限。
hello 命令的权限 (Permission to use the MongoDB hello command)无论 capture.scope 设置如何,用户都需要运行 MongoDB hello 命令的权限。
config.shards 集合的权限 (Permission to read the config.shards collection)根据您的 Debezium 环境,为了使连接器能够执行偏移量合并,您必须授予连接器用户读取 config.shards 集合的显式权限。在以下连接器环境中需要读取 config.shards 集合的权限:
-
从 Debezium 2.5 或更早版本升级的连接器。
-
配置为从分片 MongoDB 集群捕获更改的连接器。
逻辑连接器名称 (Logical connector name)
连接器配置属性 topic.prefix 用作 MongoDB 副本集或分片集群的逻辑名称。连接器在多种方式中使用逻辑名称:作为所有主题名称的前缀,以及在记录每个副本集的变更流位置时的唯一标识符。
您应该为每个 MongoDB 连接器提供一个唯一的逻辑名称,该名称能够有意义地描述源 MongoDB 系统。我们建议逻辑名称以字母或下划线字符开头,其余字符为字母数字或下划线。
偏移量合并 (Offset consolidation)
Debezium MongoDB 连接器不再支持连接到分片 MongoDB 部署的 replica_set 连接。因此,由使用 replica_set 连接模式的连接器版本记录的偏移量与当前版本不兼容。
为了最小化连接模式更改的影响,并防止连接器运行不必要的快照,当连接器在升级后重新启动时,它会运行一个合并偏移量的过程。在此偏移量合并过程中,连接器将完成以下步骤来协调由早期连接器版本记录的偏移量:
执行快照 (Performing a snapshot)
当 Debezium 任务开始使用副本集时,它会使用连接器的逻辑名称和副本集名称来查找描述连接器之前停止读取更改的位置的偏移量。如果找到偏移量并且该偏移量仍然存在于 oplog 中,则任务将立即继续流式传输更改,从记录的偏移量位置开始。
但是,如果没有找到偏移量,或者 oplog 中不再包含该位置,则任务必须首先通过执行快照来获取副本集内容的当前状态。此过程以记录当前 oplog 位置并将其记录为偏移量(连同表示已启动快照的标志)开始。然后,任务将复制每个集合,生成尽可能多的线程(最多为 snapshot.max.threads 配置属性的值),以便并行执行此工作。连接器为看到的每个文档记录一个单独的读取事件。每个读取事件包含对象的标识符、对象的完整状态以及关于对象所在 MongoDB 副本集的源信息。源信息还包括一个标志,该标志表示事件是在快照期间生成的。
此快照将继续,直到复制了所有匹配连接器过滤器的集合。如果在任务快照完成之前停止了连接器,则在重新启动时,连接器将重新开始快照。
|
尝试避免在连接器执行任何副本集快照时进行任务重新分配和重新配置。连接器会生成日志消息来报告快照的进度。为了提供最大的控制,请为每个连接器运行一个单独的 Kafka Connect 集群。 |
| Setting (设置) | 描述 |
|---|---|
|
连接器在每次启动时执行 snapshot。snapshot 完成后,连接器开始流式传输后续数据库更改的事件记录。 |
|
在连接器启动后,它将执行初始数据库快照。 |
|
The connector performs a database snapshot. After the snapshot completes, the connector stops, and does not stream event records for subsequent database changes. (连接器执行数据库快照。快照完成后,连接器停止,并且不为后续数据库更改流式传输事件记录。) |
|
Deprecated, see |
|
连接器捕获所有相关表的结构,但它不会创建 |
|
After the connector starts, it performs a snapshot only if it detects one of the following circumstances (连接器启动后,仅当检测到以下任一情况时,它才会执行快照:)
|
|
Set the snapshot mode to |
|
|
有关更多信息,请参阅连接器配置属性表中的 snapshot.mode。
临时快照 (Ad hoc snapshots)
By default, a connector runs an initial snapshot operation only after it starts for the first time. Following this initial snapshot, under normal circumstances, the connector does not repeat the snapshot process. Any future change event data that the connector captures comes in through the streaming process only. (默认情况下,连接器仅在首次启动后运行初始快照操作。在初始快照之后,在正常情况下,连接器不会重复快照过程。连接器捕获的任何未来变更事件数据仅通过流式传输过程进入。)
但是,在某些情况下,连接器在初始快照期间获取的数据可能会过时、丢失或不完整。为了提供一种重新捕获集合数据的方法,Debezium 包含了一个执行临时快照的选项。您可能希望在 Debezium 环境中发生以下任何更改后执行临时快照:
-
修改了连接器配置以捕获不同的集合集。
-
Kafka topics are deleted and must be rebuilt. (Kafka 主题被删除且必须重建。)
-
Data corruption occurs due to a configuration error or some other problem. (由于配置错误或其他问题导致数据损坏。)
您可以通过启动所谓的临时快照来重新运行先前已捕获快照的集合的快照。临时快照需要使用信号集合。您可以通过向 Debezium 信号集合发送信号请求来启动临时快照。
当您启动现有集合的临时快照时,连接器会将内容追加到该集合已有的主题中。如果先前存在的主题被删除,如果自动创建主题已启用,Debezium 可以自动创建主题。
临时快照信号指定要包含在快照中的集合。快照可以捕获数据库的全部内容,或者只捕获数据库中的一部分集合。
您可以通过向信号集合发送一个 execute-snapshot 消息来指定要捕获的集合。将 execute-snapshot 信号的类型设置为 incremental 或 blocking,并提供要包含在快照中的集合名称,如下表所述:
| Field (字段) | Default (默认值) | Value (值) |
|---|---|---|
|
|
Specifies the type of snapshot that you want to run. (指定您要运行的快照类型。) |
|
N/A |
一个包含正则表达式的数组,匹配要包含在快照中的集合的完全限定名称。 |
您可以通过向信号集合添加具有 execute-snapshot 信号类型的条目,或向 Kafka 信号主题发送信号消息来启动临时增量快照。在连接器处理消息后,它将开始快照操作。快照过程读取第一个和最后一个主键值,并将这些值作为每个集合的起始和结束点。根据集合中的条目数和配置的块大小,Debezium 将集合划分为块,然后依次对每个块进行快照。
有关更多信息,请参阅增量快照。
您可以通过向信号集合或信号主题添加具有 execute-snapshot 信号类型的条目来启动临时阻塞快照。在连接器处理消息后,它将开始快照操作。连接器会暂时停止流式传输,然后启动指定集合的快照,遵循其在初始快照期间使用的相同过程。快照完成后,连接器将恢复流式传输。
有关更多信息,请参阅阻塞快照。
增量快照 (Incremental snapshots)
To provide flexibility in managing snapshots, Debezium includes a supplementary snapshot mechanism, known as incremental snapshotting. Incremental snapshots rely on the Debezium mechanism for sending signals to a Debezium connector. Incremental snapshots are based on the DDD-3 design document. (为了在管理快照时提供灵活性,Debezium 提供了一个补充快照机制,称为增量快照。增量快照依赖于 Debezium 向 Debezium 连接器发送信号的机制。增量快照基于DDD-3 设计文档。)
在增量快照中,Debezium 不是一次性捕获数据库的全部状态(如初始快照),而是分阶段、分一系列可配置的块来捕获每个集合。您可以指定要快照的集合以及每个块的大小。块大小决定了快照在每次数据库提取操作期间收集的行数。增量快照的默认块大小为 1024 行。
随着增量快照的进行,Debezium 使用水印来跟踪其进度,维护每个捕获的集合行的记录。这种分阶段捕获数据的方法与标准的初始快照过程相比具有以下优点:
-
You can run incremental snapshots in parallel with streamed data capture, instead of postponing streaming until the snapshot completes. The connector continues to capture near real-time events from the change log throughout the snapshot process, and neither operation blocks the other. (您可以与流式数据捕获并行运行增量快照,而不是将流式传输推迟到快照完成。在整个快照过程中,连接器会继续从变更日志中捕获近乎实时事件,并且任一操作都不会阻塞另一个操作。)
-
如果增量快照的进度被中断,您可以恢复它而不会丢失任何数据。进程恢复后,快照将从停止点开始,而不是从头重新捕获集合。
-
您可以随时按需运行增量快照,并根据需要重复该过程以适应数据库更新。例如,在修改连接器配置以在
collection.include.list属性中添加集合后,您可能需要重新运行快照。
运行增量快照时,Debezium 会按主键对每个集合进行排序,然后根据配置的块大小将集合分成块。然后,它逐块地捕获每个集合中的行。对于捕获的每一行,快照都会发出一个 READ 事件。该事件代表块开始时行的值。
在快照进行时,很可能其他进程会继续访问数据库,从而可能修改集合记录。为反映此类更改,INSERT、UPDATE 或 DELETE 操作会像往常一样提交到事务日志。同样,正在进行的 Debezium 流式传输过程会继续检测这些更改事件,并将相应的更改事件记录发送到 Kafka。
在某些情况下,流式传输过程发出的 UPDATE 或 DELETE 事件可能会乱序接收。也就是说,在快照捕获包含该行读取事件的块之前,流式传输过程可能会发出修改集合行的事件。当快照最终为该行发出相应的 READ 事件时,其值已经被覆盖。为了确保乱序的增量快照事件按正确的逻辑顺序处理,Debezium 采用了一种缓冲方案来解决冲突。仅当解决了快照事件与流式事件之间的冲突后,Debezium 才会将事件记录发送到 Kafka。
为了帮助解决乱序到达的 READ 事件与修改同一集合行的流式事件之间的冲突,Debezium 采用了一种称为快照窗口的机制。快照窗口界定了增量快照捕获指定集合块数据的间隔。在特定块的快照窗口打开之前,Debezium 会遵循其常规行为,直接将来自事务日志的事件向下游发送到目标 Kafka 主题。但从特定块的快照打开的那一刻起,直到它关闭,Debezium 会执行去重步骤来解决具有相同主键的事件之间的冲突。
For each data collection, the Debezium emits two types of events, and stores the records for them both in a single destination Kafka topic. The snapshot records that it captures directly from a table are emitted as READ operations. Meanwhile, as users continue to update records in the data collection, and the transaction log is updated to reflect each commit, Debezium emits UPDATE or DELETE operations for each change. (对于每个数据集合,Debezium 会发出两种类型的事件,并将两者的记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为 READ 操作发出。同时,随着用户继续更新数据集合中的记录,并且事务日志更新以反映每次提交,Debezium 会为每个更改发出 UPDATE 或 DELETE 操作。)
当快照窗口打开,并且 Debezium 开始处理快照块时,它会将快照记录发送到一个内存缓冲区。在快照窗口期间,缓冲区中 READ 事件的主键会与传入的流式事件的主键进行比较。如果没有找到匹配项,则将流式事件记录直接发送到 Kafka。如果 Debezium 检测到匹配项,它会丢弃缓冲区中的 READ 事件,并将流式记录写入目标主题,因为流式事件在逻辑上会覆盖静态快照事件。在块的快照窗口关闭后,缓冲区中将只包含没有相关事务日志事件的 READ 事件。Debezium 将这些剩余的 READ 事件发送到集合的 Kafka 主题。
The connector repeats the process for each snapshot chunk. (连接器对每个快照块重复此过程。)
|
为了让 Debezium 能够执行增量快照,您必须授予连接器写入信号集合的权限。 Write permission is unnecessary only for connectors that can be configured to perform read-only incrementals snapshots (MariaDB, MySQL, or PostgreSQL). (仅对于可以配置为执行只读增量快照的连接器(MariaDB、MySQL 或 PostgreSQL)来说,写入权限是可选的。) |
Currently, you can use either of the following methods to initiate an incremental snapshot (当前,您可以使用以下任一方法启动增量快照:)
|
增量快照要求每个表的“主键”是稳定排序的。由于 有关 MongoDB 中 BSON 字符串类型的更多信息,请参阅MongoDB 文档)。 |
要将增量快照用于分片 MongoDB 集群,必须将 incremental.snapshot.chunk.size 设置为一个足够大的值,以补偿变更流管道日益增加的复杂性。
触发增量快照 (Triggering an incremental snapshot)
要启动增量快照,您可以向源数据库上的信号集合发送一个临时快照信号。
您可以使用 MongoDB insert() 方法向信号集合提交信号。
在 Debezium 检测到信号集合中的更改后,它会读取信号,并运行请求的快照操作。
您提交的查询指定了要包含在快照中的集合,并且可以选择指定快照操作的类型。目前,快照操作的唯一有效选项是 incremental 和 blocking。
要指定要包含在快照中的集合,请提供一个 data-collections 数组,其中列出了集合或用于匹配集合的正则表达式数组,例如:
{"data-collections": ["public.Collection1", "public.Collection2"]}
增量快照的 data-collections 数组没有默认值。如果 data-collections 数组为空,Debezium 会检测到不需要执行任何操作,也不会执行快照。
|
如果要在快照中包含的集合名称在数据库、模式或表的名称中包含点 ( |
-
-
A signaling data collection exists on the source database. (源数据库上存在信号数据集合。)
-
信号数据集合在
signal.data.collection属性中指定。
-
-
将快照信号文档插入信号集合 (Insert a snapshot signal document into the signaling collection)
<signalDataCollection>.insert({"id" : _<idNumber>,"type" : <snapshotType>, "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": <snapshotType>, "additional-conditions" : [{"data-collections" : "<collectionName>", "filter" : "<additional-condition>"}] }});For example, (例如,)
db.debeziumSignal.insert({ (1) "type" : "execute-snapshot", (2) (3) "data" : { "data-collections" ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""], (4) "type": "incremental"} (5) "additional-conditions":[{"data-collection": "schema1.table1" ,"filter":"color=\'blue\'"}]}'); (6) });命令中的
id、type和data参数值对应于信号集合的字段。The following table describes the parameters in the example (下表描述了示例中的参数:)
表 3. 用于向信号集合发送增量快照信号的 MongoDB insert() 命令字段说明 Item Value (值) 描述 1
db.debeziumSignal指定源数据库上信号集合的完全限定名称。
2
null
_id参数指定一个任意字符串,该字符串被分配为信号请求的id标识符。
前面示例中的 insert 方法省略了可选的_id参数的使用。因为文档未显式分配该参数的值,所以 MongoDB 自动分配给文档的任意 ID 成为信号请求的id标识符。
使用此字符串来识别日志消息与信号集合中的条目。Debezium 不使用此标识符字符串。相反,在快照期间,Debezium 会生成自己的id字符串作为水印信号。3
execute-snapshotSpecifies
typeparameter specifies the operation that the signal is intended to trigger. (type参数指定信号旨在触发的操作。)4
data-collections (数据集合)data字段的必需组件,指定一个集合名称或匹配集合名称的正则表达式的数组,以包含在快照中。
数组列出了匹配集合名称的正则表达式,使用与在signal.data.collection配置属性中指定连接器信号集合名称相同的格式。5
incremental (增量)An optional
typecomponent of thedatafield of a signal that specifies the type of snapshot operation to run. (信号的data字段的可选type组件,指定要运行的快照操作的类型。)
目前支持incremental和blocking类型。
If you do not specify a value, the connector runs an incremental snapshot. (如果您不指定值,连接器将运行增量快照。)6
additional-conditions (附加条件)An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. (一个可选数组,指定连接器用于确定要包含在快照中的记录子集的附加条件集。)
additional-conditions数组中的每个元素都是一个对象,包含以下键:data-collection:: 要应用过滤器的目标集合的完全限定名称。filter:: 指定了在数据集合记录中必须存在的列值才能包含在快照中,例如,"color='blue'"。
The following example, shows the JSON for an incremental snapshot event that is captured by a connector. (以下示例显示了连接器捕获的增量快照事件的 JSON。)
{
"before":null,
"after": {
"pk":"1",
"value":"New data"
},
"source": {
...
"snapshot":"incremental" (1)
},
"op":"r", (2)
"ts_ms":"1620393591654",
"ts_us":"1620393591654962",
"ts_ns":"1620393591654962147",
"transaction":null
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
Specifies the type of snapshot operation to run. (指定要运行的快照操作的类型。) |
2 |
|
Specifies the event type. (指定事件类型。) |
使用 Kafka 信号通道触发增量快照 (Using the Kafka signaling channel to trigger an incremental snapshot)
You can send a message to the configured Kafka topic to request the connector to run an ad hoc incremental snapshot. (您可以向配置的 Kafka 主题发送消息,请求连接器运行即席增量快照。)
The key of the Kafka message must match the value of the topic.prefix connector configuration option. (Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。)
The value of the message is a JSON object with type and data fields. (消息的值是一个带有 type 和 data 字段的 JSON 对象。)
The signal type is execute-snapshot, and the data field must have the following fields (信号类型为 execute-snapshot,data 字段必须具有以下字段:)
| Field (字段) | Default (默认值) | Value (值) |
|---|---|---|
|
|
The type of the snapshot to be executed. Currently Debezium supports the |
|
N/A |
An array of comma-separated regular expressions that match the fully-qualified names of tables to include in the snapshot. (一个逗号分隔的正则表达式数组,匹配要包含在快照中的表的完全限定名称。) |
|
N/A |
An optional array of additional conditions that specifies criteria that the connector evaluates to designate a subset of records to include in a snapshot. (一个可选的附加条件数组,指定连接器评估以指定要包含在快照中的记录子集的标准。) |
execute-snapshot Kafka 消息 (Example 1. An execute-snapshot Kafka message)Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
Debezium 使用 additional-conditions 字段来选择集合内容的子集。
Typically, when Debezium runs a snapshot, it runs a SQL query such as (通常,当 Debezium 运行快照时,它会运行一个 SQL 查询,例如:)
SELECT * FROM <tableName> ….
When the snapshot request includes an additional-conditions property, the data-collection and filter parameters of the property are appended to the SQL query, for example (当快照请求包含 additional-conditions 属性时,该属性的 data-collection 和 filter 参数将被附加到 SQL 查询中,例如:)
SELECT * FROM <data-collection> WHERE <filter> ….
例如,给定一个具有 id(主键)、color 和 brand 列的 products 集合,如果您希望快照仅包含 color='blue' 的内容,则在请求快照时,可以添加 additional-conditions 属性来过滤内容::leveloffset: +1
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.products" ,"filter":"color='blue'"}]}}`
您还可以使用 additional-conditions 属性传递基于多个列的条件。例如,使用与上一个示例相同的 products 集合,如果您希望快照仅包含 products 集合中 color='blue' 和 brand='MyBrand' 的内容,您可以发送以下请求::leveloffset: +1
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
停止增量快照 (Stopping an incremental snapshot)
在某些情况下,可能需要停止增量快照。例如,您可能意识到快照配置不正确,或者您可能希望确保有资源可用于其他数据库操作。您可以通过向源数据库上的集合发送信号来停止正在进行的快照。
您可以通过将停止快照信号文档插入信号集合来向信号集合提交停止快照信号。您提交的停止快照信号将快照操作的 type 指定为 incremental,并可选地指定要从当前正在运行的快照中排除的集合。在 Debezium 检测到信号集合中的更改后,它会读取信号,并在快照进行中时停止增量快照操作。
您也可以通过向Kafka 信号主题发送 JSON 消息来停止增量快照。
-
-
A signaling data collection exists on the source database. (源数据库上存在信号数据集合。)
-
信号数据集合在
signal.data.collection属性中指定。
-
-
将停止快照信号文档插入信号集合 (Insert a stop snapshot signal document into the signaling collection)
<signalDataCollection>.insert({"id" : _<idNumber>,"type" : "stop-snapshot", "data" : {"data-collections" ["<collectionName>", "<collectionName>"],"type": "incremental"}});For example, (例如,)
db.debeziumSignal.insert({ (1) "type" : "stop-snapshot", (2) (3) "data" : { "data-collections" ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""], (4) "type": "incremental"} (5) });信号命令中的
id、type和data参数值对应于信号集合的字段。The following table describes the parameters in the example (下表描述了示例中的参数:)
表 5. 用于向信号集合发送停止增量快照文档的 insert 命令字段说明 Item Value (值) 描述 1
db.debeziumSignal指定源数据库上信号集合的完全限定名称。
2
null
前面示例中的 insert 方法省略了可选的
_id参数的使用。因为文档未显式分配该参数的值,所以 MongoDB 自动分配给文档的任意 ID 成为信号请求的id标识符。
使用此字符串来识别日志消息与信号集合中的条目。Debezium 不使用此标识符字符串。3
stop-snapshotThe
typeparameter specifies the operation that the signal is intended to trigger. (type参数指定信号旨在触发的操作。)4
data-collections (数据集合)data字段的可选组件,指定一个集合名称或匹配要从快照中删除的集合名称的正则表达式的数组。
数组列出了匹配集合名称的正则表达式,格式为database.collection。如果您在
data字段中省略了data-collections数组,则信号将停止正在进行的所有增量快照。5
incremental (增量)A required component of the
datafield of a signal that specifies the type of snapshot operation to be stopped. (信号的data字段的必需组件,指定要停止的快照操作的类型。)
Currently, the only valid option isincremental. (目前,唯一有效选项是incremental。)
If you do not specify atypevalue, the signal fails to stop the incremental snapshot. (如果您不指定type值,信号将无法停止增量快照。)
使用 Kafka 信号通道停止增量快照 (Using the Kafka signaling channel to stop an incremental snapshot)
You can send a signal message to the configured Kafka signaling topic to stop an ad hoc incremental snapshot. (您可以向配置的 Kafka 信号主题发送信号消息,以停止即席增量快照。)
The key of the Kafka message must match the value of the topic.prefix connector configuration option. (Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。)
The value of the message is a JSON object with type and data fields. (消息的值是一个带有 type 和 data 字段的 JSON 对象。)
The signal type is stop-snapshot, and the data field must have the following fields (信号类型为 stop-snapshot,data 字段必须具有以下字段:)
| Field (字段) | Default (默认值) | Value (值) |
|---|---|---|
|
|
The type of the snapshot to be executed. Currently Debezium supports only the |
|
N/A |
一个可选的逗号分隔的正则表达式数组,匹配要从快照中删除的表的完全限定名称,或者匹配要从快照中删除的集合名称的正则表达式数组。 |
The following example shows a typical stop-snapshot Kafka message (以下示例显示了一个典型的 stop-snapshot Kafka 消息:)
Key = `test_connector`
Value = `{"type":"stop-snapshot","data": {"data-collections": ["db1.table1", "db1.table2"], "type": "INCREMENTAL"}}`
Custom snapshotter SPI (自定义快照器 SPI)
For more advanced uses, you can fine-tune control of the snapshot by implementing one of the following interfaces (对于更高级的用途,您可以实现以下接口之一来微调快照的控制:)
io.debezium.snapshot.spi.Snapshotter-
Controls whether the connector takes a snapshot. (控制连接器是否执行快照。)
io.debezium.snapshot.spi.SnapshotQuery-
Controls how data is queried during a snapshot. (控制快照期间如何查询数据。)
io.debezium.snapshot.spi.SnapshotLock-
Controls whether the connector locks tables when taking a snapshot. (控制连接器在进行快照时是否锁定表。)
io.debezium.snapshot.spi.Snapshotter 接口。所有内置快照模式都实现了此接口。)/**
* {@link Snapshotter} is used to determine the following details about the snapshot process:
* <p>
* - Whether a snapshot occurs. <br>
* - Whether streaming continues during the snapshot. <br>
* - Whether the snapshot includes schema (if supported). <br>
* - Whether to snapshot data or schema following an error.
* <p>
* Although Debezium provides many default snapshot modes,
* to provide more advanced functionality, such as partial snapshots,
* you can customize implementation of the interface.
* For more information, see the documentation.
*
*
*
*/
@Incubating
public interface Snapshotter extends Configurable {
/**
* @return the name of the snapshotter.
*
*
*/
String name();
/**
* @param offsetExists is {@code true} when the connector has an offset context (i.e. restarted)
* @param snapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
*
* @return {@code true} if the snapshotter should take a data snapshot
*/
boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress);
/**
* @param offsetExists is {@code true} when the connector has an offset context (i.e. restarted)
* @param snapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
*
* @return {@code true} if the snapshotter should take a schema snapshot
*/
boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress);
/**
* @return {@code true} if the snapshotter should stream after taking a snapshot
*/
boolean shouldStream();
/**
* @return {@code true} whether the schema can be recovered if database schema history is corrupted.
*/
boolean shouldSnapshotOnSchemaError();
/**
* @return {@code true} whether the snapshot should be re-executed when there is a gap in data stream.
*/
boolean shouldSnapshotOnDataError();
/**
*
* @return {@code true} if streaming should resume from the start of the snapshot
* transaction, or {@code false} for when a connector resumes and takes a snapshot,
* streaming should resume from where streaming previously left off.
*/
default boolean shouldStreamEventsStartingFromSnapshot() {
return true;
}
/**
* Lifecycle hook called after the snapshot phase is successful.
*/
default void snapshotCompleted() {
// no operation
}
/**
* Lifecycle hook called after the snapshot phase is aborted.
*/
default void snapshotAborted() {
// no operation
}
}
io.debezium.snapshot.spi.SnapshotQuery 接口。所有内置快照查询模式都实现了此接口。)/**
* {@link SnapshotQuery} is used to determine the query used during a data snapshot
*
*
*/
public interface SnapshotQuery extends Configurable, Service {
/**
* @return the name of the snapshot lock.
*
*
*/
String name();
/**
* Generate a valid query string for the specified table, or an empty {@link Optional}
* to skip snapshotting this table (but that table will still be streamed from)
*
* @param tableId the table to generate a query for
* @param snapshotSelectColumns the columns to be used in the snapshot select based on the column
* include/exclude filters
* @return a valid query string, or none to skip snapshotting this table
*/
Optional<String> snapshotQuery(String tableId, List<String> snapshotSelectColumns);
}
io.debezium.snapshot.spi.SnapshotLock 接口。所有内置快照锁定模式都实现了此接口。)/**
* {@link SnapshotLock} is used to determine the table lock mode used during schema snapshot
*
*
*/
public interface SnapshotLock extends Configurable, Service {
/**
* @return the name of the snapshot lock.
*
*
*/
String name();
/**
* Returns a SQL statement for locking the given table during snapshotting, if required by the specific snapshotter
* implementation.
*/
Optional<String> tableLockingStatement(Duration lockTimeout, String tableId);
}
阻塞快照 (Blocking snapshots)
To provide more flexibility in managing snapshots, Debezium includes a supplementary ad hoc snapshot mechanism, known as a blocking snapshot. Blocking snapshots rely on the Debezium mechanism for sending signals to a Debezium connector. (为了在管理快照时提供更大的灵活性,Debezium 提供了一个补充的即席快照机制,称为阻塞快照。阻塞快照依赖于 Debezium 向 Debezium 连接器发送信号的机制。)
A blocking snapshot behaves just like an initial snapshot, except that you can trigger it at run time. (阻塞快照的行为与初始快照完全相同,只是您可以随时在运行时触发它。)
You might want to run a blocking snapshot rather than use the standard initial snapshot process in the following situations (您可能希望在以下情况下运行阻塞快照,而不是使用标准的初始快照过程:)
-
您添加了一个新集合,并且希望在连接器运行时完成快照。
-
您添加了一个大型集合,并且希望快照完成的时间比增量快照可能完成的时间要短。
运行阻塞快照时,Debezium 会停止流式传输,然后启动指定集合的快照,遵循其在初始快照期间使用的相同过程。快照完成后,将恢复流式传输。
You can set the following properties in the data component of a signal (您可以在信号的 data 组件中设置以下属性:)
-
data-collections: 用于指定必须进行快照的集合。
-
data-collections: 指定您希望快照包含的集合。
此属性接受一个逗号分隔的正则表达式列表,这些表达式匹配完全限定的集合名称。此属性的行为类似于table.include.list属性,该属性指定在阻塞快照中要捕获的表。 -
additional-conditions: 您可以为不同的集合指定不同的过滤器。
-
data-collection属性是应用过滤器的集合的完全限定名称,它可以是区分大小写或不区分大小写的,具体取决于数据库。 -
filter属性的值将与snapshot.select.statement.overrides中使用的值相同,即应该区分大小写匹配的集合的完全限定名称。
-
For example (例如:)
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
A delay might exist between the time that you send the signal to trigger the snapshot, and the time when streaming stops and the snapshot starts. As a result of this delay, after the snapshot completes, the connector might emit some event records that duplicate records captured by the snapshot. (从您发送触发快照的信号到流式传输停止并开始快照之间可能存在延迟。由于此延迟,快照完成后,连接器可能会发出一些重复快照捕获记录的事件记录。)
流式传输更改 (Streaming changes)
在副本集的连接器任务记录偏移量后,它会使用该偏移量来确定 oplog 中应该开始流式传输更改的位置。然后,任务(根据配置)连接到副本集的主节点或连接到整个副本集的变更流,并从该位置开始流式传输更改。它处理所有创建、插入和删除操作,并将它们转换为 Debezium 更改事件。每个更改事件都包含操作在 oplog 中的位置,并且连接器会定期将其记录为最新的偏移量。偏移量记录的间隔由 offset.flush.interval.ms 控制,这是 Kafka Connect 工作者的配置属性。
当连接器正常停止时,会记录最后处理的偏移量,以便在重新启动后,连接器将从它离开的地方继续。但是,如果连接器任务意外终止,那么任务可能在其最后一次记录偏移量但尚未记录完整偏移量之前已经处理并生成了事件;重新启动后,连接器将从最后记录的偏移量开始,可能重新生成一些在崩溃前已经生成过的事件。
|
当 Kafka 管道中的所有组件正常运行时,Kafka 消费者会仅一次收到每个消息。但是,当出现问题时,Kafka 只能保证消费者至少一次收到每个消息。为了避免意外结果,消费者必须能够处理重复的消息。 |
如前所述,连接器任务始终使用副本集的主节点从 oplog 流式传输更改,确保连接器尽可能看到最新的操作,并能够比使用从节点捕获更改具有更低的延迟。当副本集选举出新的主节点时,连接器会立即停止流式传输更改,连接到新的主节点,并从新的主节点以相同的位置开始流式传输更改。同样,如果连接器在与副本集成员通信时遇到任何问题,它会尝试重新连接(使用指数退避,以免使副本集过载),一旦连接成功,它将从上次中断的地方继续流式传输更改。通过这种方式,连接器能够动态地适应副本集成员的变化,并自动处理通信故障。
总而言之,MongoDB 连接器在大多数情况下会继续运行。通信问题可能会导致连接器等待,直到问题得到解决。
预映像支持 (Pre-image support)
在 MongoDB 6.0 及更高版本中,您可以配置变更流以发出文档的预映像状态,从而为 MongoDB 更改事件填充 before 字段。要启用预映像在 MongoDB 中的使用,您必须使用 db.createCollection()、create 或 collMod 为集合启用 changeStreamPreAndPostImages。要让 Debezium MongoDB 在更改事件中包含预映像,请将连接器的 capture.mode 设置为 *_with_pre_image 选项之一。
|
MongoDB 变更流事件的大小限制 (Size limits on MongoDB change stream events)
MongoDB 变更流事件的大小限制为 16MB。因此,使用预映像增加了超过此阈值的可能性,这可能导致失败。有关如何避免超出变更流限制的信息,请参阅MongoDB 文档。 |
主题名称 (Topic names)
MongoDB 连接器会将每个集合中文档的插入、更新和删除操作的事件写入单个 Kafka 主题。Kafka 主题的名称始终采用 logicalName.databaseName.collectionName 的形式,其中 logicalName 是连接器的逻辑名称(由 topic.prefix 配置属性指定),databaseName 是操作发生的数据库名称,collectionName 是受影响文档所在的 MongoDB 集合的名称。
例如,考虑一个 MongoDB 副本集,其中有一个 inventory 数据库,包含四个集合:products、products_on_hand、customers 和 orders。如果监视此数据库的连接器被赋予逻辑名称 fulfillment,那么连接器将产生以下四个 Kafka 主题的事件:
-
fulfillment.inventory.products -
fulfillment.inventory.products_on_hand -
fulfillment.inventory.customers -
fulfillment.inventory.orders
请注意,主题名称不包含副本集名称或分片名称。因此,分片集合(其中每个分片包含集合文档的子集)的所有更改都将转到同一个 Kafka 主题。
您可以设置 Kafka 以自动创建所需的主题。如果未这样做,则必须使用 Kafka 管理工具在启动连接器之前创建主题。
分区 (Partitions)
MongoDB 连接器不显式确定如何对事件进行分区。相反,它允许 Kafka 根据事件键来确定如何分区主题。您可以通过在 Kafka Connect 工作者配置中定义 Partitioner 实现的名称来更改 Kafka 的分区逻辑。
Kafka 仅为写入单个主题分区的事件维护总顺序。通过键对事件进行分区意味着具有相同键的所有事件始终转到同一个分区。这确保了特定文档的所有事件始终具有总顺序。
事务元数据 (Transaction Metadata)
Debezium 可以生成代表事务元数据边界的事件,并丰富更改数据事件消息。
|
Debezium 接收事务元数据的限制
Debezium 仅为部署连接器后发生的事务注册和接收元数据。部署连接器之前发生的事务的元数据不可用。 |
对于每个事务 BEGIN 和 END,Debezium 会生成一个包含以下字段的事件:
status-
BEGIN或END id-
唯一事务标识符的字符串表示。
event_count(针对END事件)-
事务发出的事件总数。
data_collections(针对END事件)-
一个由
data_collection和event_count对组成的数组,提供来自给定数据集合的更改发出的事件数量。
以下示例显示了一个典型的消息:
{
"status": "BEGIN",
"id": "1462833718356672513",
"event_count": null,
"data_collections": null
}
{
"status": "END",
"id": "1462833718356672513",
"event_count": 2,
"data_collections": [
{
"data_collection": "rs0.testDB.collectiona",
"event_count": 1
},
{
"data_collection": "rs0.testDB.collectionb",
"event_count": 1
}
]
}
除非通过 topic.transaction 选项覆盖,否则事务事件将写入名为 <topic.prefix>.transaction 的主题。
启用事务元数据后,数据消息 Envelope 将会添加一个 transaction 字段。此字段提供有关每个事件的信息,形式为字段的组合:
id-
唯一事务标识符的字符串表示。
total_order-
事件在事务生成的所有事件中的绝对位置。
data_collection_order-
事件在事务发出的所有事件中,每个数据集合的位置。
以下是消息外观的示例:
{
"after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}",
"source": {
...
},
"op": "c",
"ts_ms": "1580390884335",
"ts_us": "1580390884335486",
"ts_ns": "1580390884335486281",
"transaction": {
"id": "1462833718356672513",
"total_order": "1",
"data_collection_order": "1"
}
}
数据更改事件 (Data change events)
Debezium MongoDB 连接器为插入、更新或删除数据的每个文档级操作生成一个数据更改事件。每个事件包含一个键和一个值。键和值的结构取决于更改的集合。
Debezium 和 Kafka Connect 的设计围绕着连续的事件消息流。但是,这些事件的结构可能会随时间变化,这对于使用者来说很难处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果您使用的是模式注册表,则包含一个模式 ID,使用者可以使用该 ID 从注册表中获取模式。这使得每个事件都是自包含的。
以下骨架 JSON 显示了更改事件的基本四个部分。但是,您如何配置应用程序中使用的 Kafka Connect 转换器决定了这四个部分在更改事件中的表示。schema 字段仅在配置转换器生成它时才存在于更改事件中。同样,只有配置转换器生成事件键和事件有效负载时,它们才会在更改事件中出现。如果您使用 JSON 转换器并配置它生成所有四个基本更改事件部分,则更改事件将具有此结构:
{
"schema": { (1)
...
},
"payload": { (2)
...
},
"schema": { (3)
...
},
"payload": { (4)
...
},
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
第一个 |
2 |
|
第一个 |
3 |
|
第二个 |
4 |
|
第二个 |
默认情况下,连接器将更改事件记录流式传输到与事件的源集合同名的主题。请参阅主题名称。
|
MongoDB 连接器确保所有 Kafka Connect schema 名称都遵循Avro schema 名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头(即 a-z、A-Z 或 _)。逻辑服务器名称中的每个剩余字符以及数据库和集合名称中的每个字符都必须是拉丁字母、数字或下划线(即 a-z、A-Z、0-9 或 _)。如果存在无效字符,它将被下划线字符替换。 如果逻辑服务器名称、数据库名称或集合名称包含无效字符,并且唯一区分名称的字符无效并因此被替换为下划线,这可能会导致意外的冲突。 |
更改事件键 (Change event keys)
更改事件的键包含已更改文档的键的 schema 和已更改文档的实际键。对于给定的集合,schema 和其对应的 payload 都包含一个 id 字段。此字段的值是文档的标识符,表示为从MongoDB 扩展 JSON 序列化严格模式派生的字符串。
考虑一个逻辑名称为 fulfillment、包含 inventory 数据库的副本集以及包含如下文档的 customers 集合的连接器。
{
"_id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
}
捕获 customers 集合更改的每个更改事件都有相同的事件键 schema。只要 customers 集合具有之前的定义,捕获 customers 集合更改的每个更改事件都具有以下键结构。在 JSON 中,它看起来像这样:
{
"schema": { (1)
"type": "struct",
"name": "fulfillment.inventory.customers.Key", (2)
"optional": false, (3)
"fields": [ (4)
{
"field": "id",
"type": "string",
"optional": false
}
]
},
"payload": { (5)
"id": "1004"
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
键的模式部分指定了一个 Kafka Connect 模式,该模式描述了键的 主键的结构。 |
2 |
|
定义键 payload 结构的 schema 名称。此 schema 描述了已更改文档的键的结构。键 schema 名称的格式为 connector-name.database-name.collection-name.
|
3 |
|
指示事件键的 |
4 |
|
指定 |
5 |
|
包含为生成此更改事件的文档的键。在此示例中,键包含一个类型为 |
此示例使用具有整数标识符的文档,但任何有效的 MongoDB 文档标识符都以相同方式工作,包括文档标识符。对于文档标识符,事件键的 payload.id 值是一个字符串,表示已更新文档的原始 _id 字段,作为使用严格模式的 MongoDB 扩展 JSON 序列化。下表提供了不同类型的 _id 字段如何表示的示例。
| Type | MongoDB _id 值 (MongoDB _id Value) |
键的 payload (Key’s payload) |
|---|---|---|
整数 (Integer) |
1234 |
|
浮点数 (Float) |
12.34 |
|
String |
"1234" |
|
文档 (Document) |
|
|
ObjectId |
|
|
二进制 (Binary) |
|
|
更改事件值 (Change event values)
更改事件中的值比键要复杂一些。与键一样,值也包含 schema 部分和 payload 部分。schema 部分包含描述 payload 部分的 Envelope 结构的模式,包括其嵌套字段。对于创建、更新或删除数据的操作的更改事件,其值有效负载都具有信封结构。
考虑用于显示更改事件键的示例的相同样本文档
{
"_id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
}
示例 customers 集合中更改的更改事件的值部分根据事件类型有所不同
创建事件 (create events)
以下示例显示了连接器为在 customers 集合中创建数据操作生成的更改事件的值部分:
{
"schema": { (1)
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Json", (2)
"version": 1,
"field": "after"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Json",
"version": 1,
"field": "patch"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "int64",
"optional": false,
"field": "ts_us"
},
{
"type": "int64",
"optional": false,
"field": "ts_ns"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": false,
"field": "rs"
},
{
"type": "string",
"optional": false,
"field": "collection"
},
{
"type": "int32",
"optional": false,
"field": "ord"
},
{
"type": "int64",
"optional": true,
"field": "h"
}
],
"optional": false,
"name": "io.debezium.connector.mongo.Source", (3)
"field": "source"
},
{
"type": "string",
"optional": true,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "int64",
"optional": true,
"field": "ts_us"
},
{
"type": "int64",
"optional": true,
"field": "ts_ns"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Envelope" (4)
},
"payload": { (5)
"after": "{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"annek@noanswer.org\"}", (6)
"source": { (7)
"version": "3.3.1.Final",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"ts_ms": 1558965508000000,
"ts_ms": 1558965508000000000,
"snapshot": false,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 31,
"h": 1546547425148721999
},
"op": "c", (8)
"ts_ms": 1558965515240, (9)
"ts_us": 1558965515240142, (10)
"ts_ns": 1558965515240142879, (11)
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
值 schema,它描述了值 payload 的结构。更改事件的值 schema 在连接器为特定集合生成的每个更改事件中都是相同的。 |
2 |
|
在 |
3 |
|
|
4 |
|
|
5 |
|
值 的实际数据。这是更改事件提供的信息。 事件的 JSON 表示可能比它们描述的文档大得多。这是因为 JSON 表示必须包含消息的 schema 和 payload 部分。但是,通过使用Avro 转换器,您可以显著减小连接器流式传输到 Kafka 主题的消息的大小。 |
6 |
|
一个可选字段,指定事件发生后的文档状态。在此示例中, |
7 |
|
描述事件源元数据的 必需字段。此字段包含可用于将此事件与其他事件进行比较的信息,关于事件的来源、事件发生的顺序以及事件是否属于同一事务。源元数据包括:
|
8 |
|
强制性的字符串,描述导致连接器生成事件的操作类型。在此示例中,
|
9 |
|
可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
10 |
|
一个可选字段,显示连接器处理事件的时间(以微秒为单位)。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
9 |
|
一个可选字段,显示连接器处理事件的时间(以纳秒为单位)。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
更新事件 (update events)
变更流捕获模式 (Change streams capture mode)
对于示例 customers 集合中的更新,更改事件的值具有与该集合的创建事件相同的 schema。同样,事件值的 payload 结构也相同。然而,在更新事件中,事件值 payload 包含不同的值。如果 capture.mode 选项设置为 change_streams_update_full,则更新事件包含 after 值。如果 capture.mode 选项设置为 *_with_pre_image 选项之一,则会提供 before 值。在这种情况下,会有一个新的结构化字段 updateDescription 包含一些附加字段:
-
updatedFields是一个字符串字段,包含更新文档字段及其值的 JSON 表示。 -
removedFields是从文档中移除的字段名称列表。 -
truncatedArrays是文档中被截断的数组列表。
以下是连接器为 customers 集合中的更新生成的事件的更改事件值示例:
{
"schema": { ... },
"payload": {
"op": "u", (1)
"ts_ms": 1465491461815, (2)
"ts_us": 1465491461815698, (2)
"ts_ns": 1465491461815698142, (2)
"before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"unknown\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", (3)
"after":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}", (4)
"updateDescription": {
"removedFields": null,
"updatedFields": "{\"first_name\": \"Anne Marie\"}", (5)
"truncatedArrays": null
},
"source": { (6)
"version": "3.3.1.Final",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"ts_us": 1558965508000000,
"ts_ns": 1558965508000000000,
"snapshot": false,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 1,
"h": null,
"tord": null,
"stxnid": null,
"lsid":"{\"id\": {\"$binary\": \"FA7YEzXgQXSX9OxmzllH2w==\",\"$type\": \"04\"},\"uid\": {\"$binary\": \"47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=\",\"$type\": \"00\"}}",
"txnNumber":1
}
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
强制性的字符串,描述导致连接器生成事件的操作类型。在此示例中, |
2 |
|
可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
3 |
|
包含更改前实际 MongoDB 文档的 JSON 字符串表示。 如果捕获模式未设置为 |
4 |
|
包含实际 MongoDB 文档的 JSON 字符串表示。 |
5 |
|
包含更新文档字段值的 JSON 字符串表示。在此示例中,更新将 |
6 |
|
描述事件源元数据的强制性字段。此字段包含与同一集合的创建事件相同的信息,但由于此事件来自 oplog 中的不同位置,因此值不同。源元数据包括:
|
|
事件中的 如果您的应用程序依赖于渐进式更改演变,则应仅依赖 |
删除事件 (delete events)
删除更改事件中的值与同一集合的创建和更新事件具有相同的 schema 部分。删除事件中的 payload 部分包含与同一集合的创建和更新事件不同的值。特别是,删除事件既不包含 after 值也不包含 updateDescription 值。以下是 customers 集合中文档的删除事件示例:
{
"schema": { ... },
"payload": {
"op": "d", (1)
"ts_ms": 1465495462115, (2)
"ts_us": 1465495462115748, (2)
"ts_ns": 1465495462115748263, (2)
"before":"{\"_id\": {\"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"annek@noanswer.org\"}",(3)
"source": { (4)
"version": "3.3.1.Final",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"ts_us": 1558965508000000,
"ts_ns": 1558965508000000000,
"snapshot": true,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 6,
"h": 1546547425148721999
}
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
强制性的字符串,描述操作类型。 |
2 |
|
可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
3 |
|
包含更改前实际 MongoDB 文档的 JSON 字符串表示。 如果捕获模式未设置为 |
4 |
|
描述事件源元数据的强制性字段。此字段包含与同一集合的创建或更新事件相同的信息,但由于此事件来自 oplog 中的不同位置,因此值不同。源元数据包括:
|
MongoDB 连接器事件设计用于与Kafka 日志压缩配合使用。日志压缩可以删除一些较旧的消息,只要保留每个键的最新消息。这使 Kafka 可以回收存储空间,同时确保主题包含完整的数据集并且可用于重新加载基于键的状态。
设置 MongoDB (Setting up MongoDB)
MongoDB 连接器使用 MongoDB 的变更流来捕获更改,因此连接器仅适用于 MongoDB 副本集或分片集群,其中每个分片都是一个单独的副本集。有关设置副本集或分片集群的信息,请参阅 MongoDB 文档。另外,请务必了解如何在副本集上启用访问控制和身份验证。
您还必须有一个具有适当角色的 MongoDB 用户,该用户可以读取 oplog 的 admin 数据库。此外,用户还必须能够读取分片集群配置服务器中的 config 数据库,并且必须具有 listDatabases 权限。当使用变更流(默认)时,用户还必须具有集群范围的权限操作 find 和 changeStream。
当您打算使用预映像并填充 before 字段时,您需要先使用 db.createCollection()、create 或 collMod 为集合启用 changeStreamPreAndPostImages。
云上的 MongoDB (MongoDB in the Cloud)
您可以将 Debezium MongoDB 连接器与MongoDB Atlas 一起使用。请注意,MongoDB Atlas 仅支持通过 SSL 进行安全连接,即连接器的 +mongodb.ssl.enabled 选项必须设置为 true。
最优 Oplog 配置 (Optimal Oplog Config)
Debezium MongoDB 连接器读取变更流以获取副本集的 oplog 数据。由于 oplog 是一个固定大小的有上限集合,如果它超过其配置的最大大小,它就会开始覆盖最早的条目。如果连接器因任何原因停止,当它重新启动时,它会尝试从最后一个 oplog 流位置恢复。但是,如果最后一个流位置已从 oplog 中删除,则根据连接器 snapshot.mode 属性中指定的值,连接器可能会启动失败,并报告无效的恢复令牌错误。如果发生故障,您必须创建一个新的连接器才能让 Debezium 继续捕获数据库中的记录。有关更多信息,请参阅如果 `snapshot.mode` 设置为 `initial`,连接器在停止很长时间后启动失败。
为了确保 oplog 保留 Debezium 恢复流式传输所需的偏移量值,您可以使用以下任一方法:
-
增加 oplog 的大小。根据您的典型工作负载,将 oplog 大小设置为大于每小时 oplog 条目峰值数量的值。
-
增加 oplog 条目保留的最小小时数(MongoDB 4.4 及更高版本)。此设置是基于时间的,因此即使 oplog 达到其配置的最大大小,最后n小时内的条目也能保证可用。虽然这通常是首选选项,但对于接近容量的高工作负载集群,请指定最大 oplog 大小。
为了帮助防止与缺失 oplog 条目相关的失败,跟踪报告复制行为的指标并优化 oplog 大小以支持 Debezium 很重要。特别是,您应该监视 Oplog GB/Hour 和 Replication Oplog Window 的值。如果 Debezium 离线时间超过复制 oplog 窗口的值,并且主 oplog 的增长速度快于 Debezium 消耗条目的速度,则可能会导致连接器失败。
有关如何监视这些指标的信息,请参阅MongoDB 文档。
最好将最大 oplog 大小设置为基于 oplog 的预期每小时增长量(Oplog GB/Hour)乘以解决 Debezium 故障可能需要的时间。
即:
Oplog GB/Hour X 平均 Debezium 故障响应时间
例如,如果 oplog 大小限制设置为 1GB,并且 oplog 每小时增长 3GB,则 oplog 条目每小时清除三次。如果 Debezium 在此期间发生故障,其最后 oplog 位置很可能会被删除。
如果 oplog 以 3GB/小时的速度增长,并且 Debezium 离线两个小时,那么您会将 oplog 大小设置为 3GB/小时 X 2 小时,即 6GB。
部署 (Deployment)
要部署 Debezium MongoDB 连接器,您需要安装 Debezium MongoDB 连接器存档,配置连接器,并通过将其配置添加到 Kafka Connect 来启动连接器。
-
已安装 Apache Kafka 和 Kafka Connect。
-
MongoDB 已安装并设置为与 Debezium 连接器配合使用。
-
下载连接器的插件存档,
-
将 JAR 文件提取到您的 Kafka Connect 环境中。
-
将包含 JAR 文件的目录添加到Kafka Connect 的
plugin.path。 -
重新启动您的 Kafka Connect 进程以加载新 JAR 文件。
如果您正在使用不可变容器,请参阅Debezium 的容器镜像(适用于 Apache Kafka 和 Kafka Connect),其中已预装 MongoDB 连接器并准备好运行。
|
从 |
Debezium 的 教程 将引导您完成这些镜像的使用,这是了解 Debezium 的绝佳途径。
MongoDB 连接器配置示例
以下是一个连接器实例的配置示例,该实例用于捕获 192.168.99.100 的 27017 端口上名为 rs0 的 MongoDB 副本集的数据,我们逻辑上将其命名为 fullfillment。通常,您会在 JSON 文件中通过设置连接器可用的配置属性来配置 Debezium MongoDB 连接器。
您可以选择为特定的 MongoDB 副本集或分片集群生成事件。可选地,您可以过滤掉不需要的集合。
{
"name": "inventory-connector", (1)
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector", (2)
"mongodb.connection.string": "mongodb://192.168.99.100:27017/?replicaSet=rs0", (3)
"topic.prefix": "fullfillment", (4)
"collection.include.list": "inventory[.]*" (5)
}
}
| 1 | 我们在将其注册到 Kafka Connect 服务时使用的连接器名称。 |
| 2 | MongoDB 连接器类的名称。 |
| 3 | 用于连接 MongoDB 副本集的连接字符串。 |
| 4 | MongoDB 副本集的逻辑名称,它构成了生成事件的命名空间,并用于连接器写入的所有 Kafka 主题的名称、Kafka Connect 的模式名称以及使用 Avro 转换器时的相应 Avro 模式的命名空间。 |
| 5 | 一系列正则表达式,用于匹配要监视的所有集合的集合命名空间(例如,<dbName>.<collectionName>)。这是可选的。 |
有关 Debezium MongoDB 连接器可设置的完整配置属性列表,请参阅 MongoDB 连接器配置属性。
您可以使用 POST 命令将此配置发送到正在运行的 Kafka Connect 服务。该服务会记录配置并启动一个连接器任务,该任务执行以下操作:
-
连接到 MongoDB 副本集或分片集群。
-
为每个副本集分配任务。
-
在必要时执行快照。
-
读取变更流。
-
将更改事件记录流式传输到 Kafka 主题。
添加连接器配置
要开始运行 Debezium MongoDB 连接器,请创建连接器配置,并将配置添加到您的 Kafka Connect 集群。
-
Debezium MongoDB 连接器已安装。
-
为 MongoDB 连接器创建配置。
-
使用Kafka Connect REST API 将该连接器配置添加到您的 Kafka Connect 群集。
连接器启动后,它将完成以下操作
-
读取副本集的变更流。
-
为每个插入、更新和删除的文档生成变更事件。
-
将更改事件记录流式传输到 Kafka 主题。
连接器属性
Debezium MongoDB 连接器具有许多配置属性,您可以使用它们来实现连接器为您的应用程序实现正确的行为。许多属性都有默认值。有关属性的信息组织如下
以下配置属性是必需的,除非有默认值可用。
必需的 Debezium MongoDB 连接器配置属性
| 属性 | Default (默认值) | 描述 | ||||||
|---|---|---|---|---|---|---|---|---|
false |
将此属性设置为
|
|||||||
无默认值 |
连接器的唯一名称。尝试使用相同的名称再次注册将失败。(此属性是所有 Kafka Connect 连接器所必需的。) |
|||||||
无默认值 |
连接器的 Java 类名。对于 MongoDB 连接器,始终使用 |
|||||||
无默认值 |
指定连接器用于连接 MongoDB 副本集的 连接字符串。此属性取代了 MongoDB 连接器早期版本中可用的 |
|||||||
无默认值 |
用于标识连接器和/或此连接器监视的 MongoDB 副本集或分片集群的唯一名称。每个服务器最多应由一个 Debezium 连接器监视,因为此服务器名称会为所有源自 MongoDB 副本集或集群的持久化 Kafka 主题添加前缀。仅使用字母数字字符、连字符、点和下划线来构成名称。逻辑名称在所有其他连接器中应是唯一的,因为该名称用作接收此连接器记录的 Kafka 主题的名称前缀。
|
|||||||
DefaultMongoDbAuthProvider |
实现 io.debezium.connector.mongodb.connection.MongoDbAuthProvider 接口的完整 Java 类名。此类负责在 MongoDB 连接上设置凭据(在每次应用启动时调用)。默认行为根据其文档使用 |
|||||||
无默认值 |
使用默认 |
|||||||
无默认值 |
使用默认 |
|||||||
|
使用默认 |
|||||||
|
连接器将使用 SSL 连接到 MongoDB 实例。 |
|||||||
|
启用 SSL 时,此设置控制在连接阶段是否禁用严格的主机名检查。如果设置为 |
|||||||
regex |
用于根据包含/排除的数据库和集合名称匹配事件的模式。将属性设置为以下值之一
|
|||||||
空字符串 |
一个可选的逗号分隔列表,包含匹配要监视的数据库名称的正则表达式或字面量。默认情况下,监视所有数据库。 为了匹配数据库的名称,Debezium 根据
如果您在此配置中包含此属性,请不要同时设置 |
|||||||
空字符串 |
一个可选的逗号分隔列表,包含匹配要从监视中排除的数据库名称的正则表达式或字面量。当设置了 为了匹配数据库的名称,Debezium 根据
如果您在此配置中包含此属性,请不要设置 |
|||||||
空字符串 |
一个可选的逗号分隔列表,包含匹配要监视的 MongoDB 集合的完全限定命名空间的正则表达式或字面量。默认情况下,连接器监视除 为了匹配命名空间的名称,Debezium 根据
如果您在此配置中包含此属性,请不要同时设置 |
|||||||
空字符串 |
一个可选的逗号分隔列表,包含匹配要从监视中排除的 MongoDB 集合的完全限定命名空间的正则表达式或字面量。当设置了 为了匹配命名空间的名称,Debezium 根据
如果您在此配置中包含此属性,请不要设置 |
|||||||
|
指定连接器用于从 MongoDB 服务器捕获
|
|||||||
指定 startAtOperationTime 变更流参数。该值应为 Bson 时间戳的长整型表示。 |
||||||||
|
指定连接器打开的变更流的范围。将此属性设置为以下值之一
|
|||||||
指定连接器监视更改的数据库。仅当 |
||||||||
空字符串 |
一个可选的逗号分隔列表,包含应从变更事件消息值中排除的字段的完全限定名称。字段的完全限定名称格式为 databaseName.collectionName.fieldName.nestedFieldName,其中 databaseName 和 collectionName 可以包含通配符(*),它匹配任何字符。 |
|||||||
空字符串 |
一个可选的逗号分隔列表,包含应用于重命名字段的完全限定字段重命名。字段的完全限定重命名格式为 databaseName.collectionName.fieldName.nestedFieldName:newNestedFieldName,其中 databaseName 和 collectionName 可以包含通配符(*),它匹配任何字符,冒号(:)字符用于确定字段的重命名映射。下一个字段重命名将应用于列表中前一个字段重命名的结果,因此在重命名同一路径下的多个字段时请牢记这一点。 |
|||||||
|
控制是否在*delete* 事件后跟一个墓碑事件。 |
|||||||
none |
指定如何调整 schema 名称以兼容连接器使用的消息转换器。可能设置:
|
|||||||
none |
指定如何调整字段名称以兼容连接器使用的消息转换器。可能设置:
有关更多详细信息,请参阅 Avro 命名。 |
以下高级配置属性具有可以满足大多数情况的良好默认值,因此很少需要在连接器的配置中指定。
Debezium MongoDB 连接器高级配置属性
| 属性 | Default (默认值) | 描述 | ||
|---|---|---|---|---|
|
指定当
要将此选项与 MongoDB 变更流集合一起使用,您必须将集合配置为返回文档的预映像和后映像。仅当在操作发生之前已到位所需配置,才能获得操作的预映像和后映像。 将此属性设置为以下值之一
|
|||
|
正整数值,指定在连接器的每次迭代中应处理的事件批次的最大大小。默认为 2048。 |
|||
|
正整数值,指定阻塞队列可以容纳的最大记录数。当 Debezium 从数据库读取流式事件时,它会将事件放入阻塞队列,然后再写入 Kafka。在连接器摄取消息的速度快于其写入 Kafka 的速度,或 Kafka 不可用时,阻塞队列可以为读取变更事件提供反压。队列中保存的事件在连接器定期记录偏移量时会被忽略。始终将 |
|||
|
一个长整数值,指定阻塞队列的最大字节容量。默认情况下,阻塞队列没有容量限制。要指定队列可以消耗的字节数,请将此属性设置为一个正长整型值。 |
|||
|
正整数值,指定在发生异常并中止任务之前,连接到副本集主节点的失败连接尝试的最大次数。默认为 16,结合 |
|||
无默认值 |
一个可选设置,用于指定密钥库文件的位置。密钥库文件可用于客户端和 MongoDB 服务器之间的双向身份验证。 |
|||
无默认值 |
密钥库文件的密码。仅当配置了 |
|||
无默认值 |
密钥库文件的类型。仅当配置了 |
|||
无默认值 |
用于服务器证书验证的信任库文件的位置。 |
|||
无默认值 |
信任库文件的密码。用于检查信任库的完整性并解锁信任库。仅当配置了 |
|||
无默认值 |
信任库文件的类型。仅当配置了 |
|||
v2 |
CDC 事件中 |
|||
|
控制心跳消息的发送频率。 将此参数设置为 |
|||
|
一个逗号分隔的操作类型列表,您希望连接器在流式传输期间跳过这些操作。您可以配置连接器以跳过以下类型的操作:
如果您不希望连接器跳过任何操作,请将值设置为 |
|||
无默认值 |
控制在快照中包含哪些集合项。此属性仅影响快照。指定格式为 databaseName.collectionName 的集合名称的逗号分隔列表。 对于您指定的每个集合,还要指定另一个配置属性: |
|||
无默认值 |
连接器在启动后等待多长时间(以毫秒为单位)以执行快照; |
|||
0 |
指定连接器在完成快照后延迟开始流式传输过程的时间(以毫秒为单位)。设置延迟间隔有助于防止连接器在快照完成后但流式传输过程开始之前立即发生故障时重新启动快照。设置一个高于 Kafka Connect 工作节点设置的 |
|||
|
指定在执行快照时从每个集合中一次读取的最大文档数。连接器将以该大小的多个批次读取集合内容。 |
|||
|
一个可选的逗号分隔列表,包含匹配您要在快照中包含的模式的完全限定名称( 为了匹配模式的名称,Debezium 会将您指定的正则表达式作为锚定正则表达式进行匹配。也就是说,指定的表达式会与模式的整个名称字符串进行匹配;它不会匹配可能存在于模式名称中的子字符串。 |
|||
|
正整数值,指定用于执行副本集集合初始同步的最大线程数。默认为 1。 |
|||
initial (初始) |
指定连接器启动时执行快照的标准。将属性设置为以下值之一:
有关更多信息,请参阅自定义快照程序 SPI。 |
|||
false |
如果 |
|||
false |
如果 |
|||
false |
如果 |
|||
false |
如果 |
|||
false |
如果 |
|||
无默认值 |
如果 |
|||
|
设置为 有关更多详细信息,请参阅 事务元数据。 |
|||
10000 (10 秒) |
在发生可重试错误后重启连接器的等待时间(以毫秒为单位)。 |
|||
|
连接器轮询新的、已删除的或已更改的副本集的间隔。 |
|||
10000 (10 秒) |
驱动程序在新的连接尝试被中止之前等待的毫秒数。 |
|||
10000 (10 秒) |
集群监视器尝试访问每个服务器的频率。 |
|||
0 |
在发生超时之前,套接字上的发送/接收操作可以执行的毫秒数。值为 |
|||
30000 (30 秒) |
驱动程序在超时并引发错误之前等待选择服务器的毫秒数。 |
|||
无默认值 |
在流式传输更改时,此设置将处理更改流事件,作为标准 MongoDB 聚合流管道的一部分。管道是由数据库指令组成的 MongoDB 聚合管道,用于过滤或转换数据。这可用于自定义连接器消耗的数据。此属性的值必须是一个 JSON 格式的允许的 聚合管道阶段 数组。请注意,这会附加到用于支持连接器的内部管道之后(例如,过滤操作类型、数据库名称、集合名称等)。 |
|||
internal_first |
用于构建有效 MongoDB 聚合流管道的顺序。将属性设置为以下值之一
|
|||
fail |
用于处理超出指定 BSON 大小的文档的更改事件的策略。将属性设置为以下值之一
|
|||
0 |
已存储文档的最大允许大小(以字节为单位),以便处理更改事件。这包括数据库操作之前和之后的大小,更具体地说,它限制了 MongoDB 更改事件的 fullDocument 和 fullDocumentBeforeChange 字段的大小。 |
|||
|
指定 oplog/变更流游标等待服务器生成结果的最大毫秒数,然后才会导致执行超时异常。值为 |
|||
无默认值 |
用于将信号发送到连接器的数据集合的完全限定名称。使用以下格式指定集合名称: |
|||
source (源) |
为连接器启用的信号通道名称列表。默认情况下,以下通道可用:
|
|||
无默认值 |
为连接器启用的通知通道名称列表。默认情况下,以下通道可用:
|
|||
|
在增量快照块期间,连接器获取并加载到内存中的最大文档数。增加块大小可以提高效率,因为快照执行的较少的大块快照查询。但是,较大的块大小也需要更多内存来缓冲快照数据。将块大小调整为可在您的环境中提供最佳性能的值。 |
|||
|
指定连接器在增量快照期间使用的水印机制,用于对可能被增量快照捕获,然后在流式传输恢复后重新捕获的事件进行去重。
|
|||
|
用于确定数据更改、模式更改、事务、心跳事件等的 TopicNamingStrategy 类的名称,默认为 |
|||
|
指定主题名称的分隔符,默认为 |
|||
|
用于保存有界并发哈希映射中主题名称的大小。此缓存将有助于确定与给定数据集合对应的主题名称。 |
|||
|
控制连接器向心跳消息发送的心跳消息的主题名称。主题名称的模式如下: |
|||
|
控制连接器发送事务元数据消息的主题的名称。主题名称的模式为: |
|||
|
定义用于通过添加提供上下文信息的元数据来定制 MBean 对象名称的标签。指定键值对的逗号分隔列表。每个键代表 MBean 对象名称的标签,对应的值代表该键的值,例如: 连接器会将指定的标签附加到基本 MBean 对象名称。标签可以帮助您组织和分类指标数据。您可以定义标签来识别特定的应用程序实例、环境、区域、版本等。有关更多信息,请参阅 定制 MBean 名称。 |
|||
|
指定连接器在操作导致可重试错误(例如连接错误)后如何响应。
|
|||
|
指定连接器可以捕获的最大集合数。超过此限制将触发 |
|||
|
指定当连接器捕获的集合数量超过
|
|||
true |
此属性指定 Debezium 是否将带有 这些头是 OpenLineage 集成所必需的,并提供元数据,使下游处理系统能够跟踪和识别更改事件的来源。 该属性添加了以下头:
|
用于配置 MongoDB 连接器如何与 Kafka 信号主题交互的直通属性
Debezium 提供了一组 signal.* 属性来控制连接器如何与 Kafka 信号主题交互。
下表描述了 Kafka signal 属性。
| 属性 | Default (默认值) | 描述 | ||
|---|---|---|---|---|
<topic.prefix>-signal |
连接器监视的用于临时信号的 Kafka 主题的名称。
|
|||
kafka-signal |
Kafka 消费者使用的组 ID 的名称。 |
|||
无默认值 |
连接器用于建立与 Kafka 集群的初始连接的主机和端口对列表。每个对都引用 Debezium Kafka Connect 进程使用的 Kafka 集群。 |
|||
|
一个整数值,指定连接器在轮询信号时等待的最大毫秒数。 |
用于配置 MongoDB 连接器接收通知通道的直通属性
下表描述了可用于配置 Debezium sink notification 通道的属性。
| 属性 | Default (默认值) | 描述 |
|---|---|---|
无默认值 |
接收 Debezium 通知的主题名称。当您将 |
监控
除了 Kafka 和 Kafka Connect 内置的 JMX 指标支持外,Debezium MongoDB 连接器还提供两种类型的指标。
有关如何通过 JMX 公开这些指标的详细信息,请参阅 Debezium 监控文档。
自定义 MBean 名称
Debezium 连接器通过连接器的 MBean 名称公开指标。这些指标特定于每个连接器实例,提供有关连接器快照、流式传输和模式历史记录进程行为的数据。
默认情况下,当您部署正确配置的连接器时,Debezium 会为不同的连接器指标生成唯一的 MBean 名称。要查看连接器进程的指标,请将您的可观察性堆栈配置为监视其 MBean。但这些默认 MBean 名称取决于连接器配置;配置更改可能导致 MBean 名称发生更改。在这种情况下,更改 MBean 名称会破坏连接器实例和 MBean 之间的链接,从而中断监控活动。您必须重新配置可观察性堆栈以使用新的 MBean 名称才能恢复监控。
为了防止因 MBean 名称更改而导致的监控中断,您可以配置自定义指标标签。通过将 custom.metric.tags 属性添加到连接器配置中来配置自定义指标。该属性接受键值对,其中每个键代表 MBean 对象名称的标签,相应的值代表该标签的值。例如:k1=v1,k2=v2。Debezium 将指定的标签附加到连接器的 MBean 名称。
配置连接器的 custom.metric.tags 属性后,您可以配置可观察性堆栈以检索与指定标签关联的指标。然后,可观察性堆栈使用指定的标签而不是可变的 MBean 名称来唯一标识连接器。之后,如果 Debezium 重新定义了其如何构造 MBean 名称,或者连接器配置中的 topic.prefix 发生更改,指标收集将不会中断,因为指标抓取任务使用指定的标签模式来标识连接器。
使用自定义标签的另一个好处是,您可以使用反映数据管道架构的标签,以便以适合您运营需求的方式组织指标。例如,您可以指定带有值的标签来声明连接器活动的类型、应用程序上下文或数据源,例如 db1-streaming-for-application-abc。如果您指定多个键值对,所有指定的对都将附加到连接器的 MBean 名称。
以下示例说明了标签如何修改默认 MBean 名称。
默认情况下,MongoDB 连接器使用以下 MBean 名称进行流式传输指标
debezium.mongodb:type=connector-metrics,context=streaming,server=<topic.prefix>
如果您将 custom.metric.tags 的值设置为 database=salesdb-streaming,table=inventory,Debezium 将生成以下自定义 MBean 名称:
debezium.mongodb:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory
快照指标
MBean 是 debezium.mongodb:type=connector-metrics,context=snapshot,server=<topic.prefix>,task=<task.id>。
除非快照操作正在活动中,或者自上次连接器启动以来发生过快照,否则快照指标不会暴露。
下表列出了可用的快照指标。
| Attributes | Type | 描述 |
|---|---|---|
|
连接器已读取的最后一个快照事件。 |
|
|
自连接器读取和处理最近事件以来经过的毫秒数。 |
|
|
记录连接器在快照操作期间识别为错误的更改事件的数量。每次连接器在初始、增量或临时快照期间遇到无法处理的事件时,都会递增此指标。事件可能因格式错误、与模式不兼容或在转换期间遇到失败而无法处理。指标值在连接器任务的整个生命周期内保持不变。如果快照被中断,并且连接器任务重新启动,则指标计数将重置为 |
|
|
自上次启动或重置以来,此连接器已看到的事件总数。 |
|
|
已被连接器配置的包含/排除列表过滤规则过滤的事件数量。 |
|
|
由连接器捕获的表列表。 |
|
|
用于在快照程序和主 Kafka Connect 循环之间传递事件的队列的长度。 |
|
|
用于在快照程序和主 Kafka Connect 循环之间传递事件的队列的可用容量。 |
|
|
包含在快照中的表总数。 |
|
|
快照尚未复制的表数。 |
|
|
快照是否已启动。 |
|
|
快照是否已暂停。 |
|
|
快照是否被中止。 |
|
|
快照是否已完成。 |
|
|
快照是否被跳过。 |
|
|
快照到目前为止所花费的总秒数,即使未完成。也包括快照暂停的时间。 |
|
|
快照暂停的总秒数。如果快照被暂停了多次,则暂停时间将累加。 |
|
|
包含快照中每个表扫描的行数的映射。表在处理过程中被增量地添加到 Map 中。每扫描 10,000 行和完成一个表时更新。 |
|
|
队列的最大缓冲区(字节)。如果 |
|
|
队列中记录的当前字节卷。 |
Debezium MongoDB 连接器还提供以下自定义快照指标
| Attribute | Type | 描述 |
|---|---|---|
|
|
数据库断开连接的数量。 |
流式传输指标
MBean 是 debezium.mongodb:type=connector-metrics,context=streaming,server=<topic.prefix>,task=<task.id>。
下表列出了可用的流式传输指标。
| Attributes | Type | 描述 |
|---|---|---|
|
连接器已读取的最后一个流式传输事件。 |
|
|
自连接器读取和处理最近事件以来经过的毫秒数。 |
|
|
记录连接器在流式传输期间识别为错误的更改事件的数量。每次连接器在流式传输会话期间遇到无法处理的事件时,都会递增此指标。事件可能因格式错误、与模式不兼容或在转换期间遇到失败而无法处理。指标值在连接器任务的整个生命周期内保持不变。连接器重启后,指标计数将重置为 |
|
|
自上次连接器启动或重置以来,源数据库报告的总数据更改事件数。代表 Debezium 需要处理的数据更改工作负载。 |
|
|
自上次启动或重置指标以来,连接器处理的总创建事件数。 |
|
|
自上次启动或重置指标以来,连接器处理的总更新事件数。 |
|
|
自上次启动或重置指标以来,连接器处理的总删除事件数。 |
|
|
已被连接器配置的包含/排除列表过滤规则过滤的事件数量。 |
|
|
由连接器捕获的表列表。 |
|
|
用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的长度。 |
|
|
用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的剩余容量。 |
|
|
表示连接器当前是否连接到数据库服务器的标志。 |
|
|
上次更改事件的时间戳与连接器处理它之间的时间差(以毫秒为单位)。这些值将包含数据库服务器和连接器运行所在机器之间时钟的任何差异。 |
|
|
已提交的处理过的事务数。 |
|
|
收到的最后一个事件的坐标。 |
|
|
已处理的最后一个事务的事务标识符。 |
|
|
队列的最大缓冲区(字节)。如果 |
|
|
队列中记录的当前字节卷。 |
Debezium MongoDB 连接器还提供以下自定义流式传输指标
| Attribute | Type | 描述 |
|---|---|---|
|
|
数据库断开连接的数量。 |
|
|
主节点选举的数量。 |
MongoDB 连接器常见问题
Debezium 是一个分布式系统,可捕获多个上游数据库中的所有更改,并且永远不会遗漏或丢失任何事件。当系统正常运行并得到妥善管理时,Debezium 可为每个变更事件提供精确一次的传递。
如果发生故障,系统不会丢失任何事件。但是,在从故障中恢复的过程中,它可能会重复某些变更事件。在这种情况下,Debezium 和 Kafka 一样,为变更事件提供至少一次的传递。
本节的其余部分描述了 Debezium 如何处理各种类型的故障和问题。
配置和启动错误
在以下情况下,连接器在尝试启动时会失败,在日志中报告错误或异常,并停止运行:
-
连接器的配置无效。
-
连接器无法使用指定的连接参数成功连接到 MongoDB。
发生故障后,连接器将尝试使用指数退避机制重新连接。您可以配置最大重连尝试次数。
在这些情况下,错误将包含有关问题的更多详细信息,并可能提供建议的解决方法。当配置已更正或 MongoDB 问题已解决后,可以重新启动连接器。
Debezium 运行时 MongoDB 不可用
一旦连接器运行,如果任何 MongoDB 副本集的主节点变得不可用或无法访问,连接器将反复尝试重新连接到主节点,使用指数退避机制以防止网络或服务器过载。如果主节点在可配置的连接尝试次数后仍不可用,连接器将失败。
尝试重新连接由三个属性控制
-
connect.backoff.initial.delay.ms- 第一次尝试重新连接之前的延迟,默认为 1 秒(1000 毫秒)。 -
connect.backoff.max.delay.ms- 尝试重新连接之前的最大延迟,默认为 120 秒(120,000 毫秒)。 -
connect.max.attempts- 在产生错误之前的最大尝试次数,默认为 16。
每次延迟是前一次延迟的两倍,直到达到最大延迟。考虑到默认值,下表显示了每次连接失败尝试的延迟以及失败前的累计总时间。
| 重连尝试次数 | 尝试前的延迟(秒) | 尝试前的总延迟(分钟和秒) |
|---|---|---|
1 |
1 |
00:01 |
2 |
2 |
00:03 |
3 |
4 |
00:07 |
4 |
8 |
00:15 |
5 |
16 |
00:31 |
6 |
32 |
01:03 |
7 |
64 |
02:07 |
8 |
120 |
04:07 |
9 |
120 |
06:07 |
10 |
120 |
08:07 |
11 |
120 |
10:07 |
12 |
120 |
12:07 |
13 |
120 |
14:07 |
14 |
120 |
16:07 |
15 |
120 |
18:07 |
16 |
120 |
20:07 |
连接器无法启动 - InvalidResumeToken 或 ChangeStreamHistoryLost
停止很长时间后停止的连接器无法启动,并报告以下异常
Command failed with error 286 (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog
前面的异常表明对应于连接器恢复令牌的条目已不再存在于 oplog 中。由于 oplog 不再包含连接器处理的最后一个偏移量,因此连接器无法恢复流式传输。
您可以使用以下任一选项从故障中恢复
-
删除失败的连接器,然后创建一个具有相同配置但名称不同的新连接器。
-
暂停连接器,然后删除偏移量,或更改偏移量主题。
为防止与丢失的恢复令牌相关的故障,请优化 oplog 配置。
Kafka Connect 进程正常停止
如果 Kafka Connect 在分布式模式下运行,并且 Kafka Connect 进程正常停止,那么在进程关闭之前,Kafka Connect 会将该进程的所有连接器任务迁移到该组中的另一个 Kafka Connect 进程,新连接器任务将从之前任务中断的地方继续。在连接器任务正常停止并在新进程上重新启动期间会有短暂的处理延迟。
如果组中只有一个进程,并且该进程正常停止,那么 Kafka Connect 将停止连接器并记录每个副本集的最后一个偏移量。重新启动后,副本集任务将从它们停止的地方继续。
Kafka Connect 进程崩溃
如果 Kafka Connector 进程意外停止,那么它正在运行的任何连接器任务将在未记录其最近处理的偏移量的情况下终止。当 Kafka Connect 在分布式模式下运行时,它将在其他进程上重新启动这些连接器任务。但是,MongoDB 连接器将从早期进程记录的最后一个偏移量恢复,这意味着新的替换任务可能会生成一些在崩溃前刚刚处理过的相同变更事件。重复事件的数量取决于偏移量刷新周期和崩溃前的数据更改量。
|
由于在从故障中恢复期间可能会重复某些事件,因此消费者应始终预期可能会有重复事件。Debezium 更改是幂等的,因此一系列事件始终会导致相同的状态。 Debezium 还在每个变更事件消息中包含有关事件来源的特定于源的信息,包括 MongoDB 事件的唯一事务标识符( |
Kafka 不可用
当连接器生成变更事件时,Kafka Connect 框架使用 Kafka producer API 将这些事件记录到 Kafka 中。Kafka Connect 还将定期记录这些变更事件中出现的最后一个偏移量,频率由您在 Kafka Connect 工作进程配置中指定。如果 Kafka brokers 不可用,运行连接器的 Kafka Connect 工作进程将简单地反复尝试重新连接到 Kafka brokers。换句话说,连接器任务将暂停,直到可以重新建立连接,此时连接器将从中断的地方继续。
如果 snapshot.mode 设置为 initial,并且连接器长时间停止后,连接器将失败
如果连接器被正常停止,用户可以继续在副本集成员上执行操作。连接器离线期间发生的更改将继续记录在 MongoDB 的 oplog 中。在大多数情况下,连接器重新启动后,它会读取 oplog 中的偏移量值,以确定它为每个副本集流式传输的最后一个操作,然后从该点恢复流式传输。重新启动后,连接器离线期间发生的数据库操作将像往常一样发送到 Kafka,并在一段时间后,连接器将赶上数据库。连接器赶上数据库所需的时间取决于 Kafka 的功能和性能以及数据库中发生的更改量。
但是,如果连接器停止的时间过长,可能会发生 MongoDB 在连接器不活动期间清除 oplog 的情况,导致丢失有关连接器最后一个位置的信息。连接器重新启动后,由于 oplog 中不再包含标记连接器处理的最后一个操作的先前偏移量值,因此连接器无法恢复流式传输。连接器也无法执行快照,尽管通常当 snapshot.mode 属性设置为 initial 且没有偏移量值时,它会这样做。在这种情况下,会出现不匹配,因为 oplog 不包含先前偏移量的值,但该偏移量值存在于连接器的内部 Kafka 偏移量主题中。这将导致错误,连接器将失败。
要从故障中恢复,请删除失败的连接器,然后创建一个具有相同配置但名称不同的新连接器。启动新连接器时,它将执行快照以摄取数据库的状态,然后恢复流式传输。