Debezium MongoDB 连接器
概述
Debezium MongoDB 连接器从 Apache Kafka 主题捕获变更事件记录,然后将这些记录转换为 MongoDB 文档,并写入指定 MongoDB 接收数据库的集合中。对于需要高可伸缩性和快速数据检索的应用程序,将变更数据传播到基于集群的 MongoDB 环境(该环境使用分片和副本集等功能来优化读取操作)可以显著提高检索性能。该连接器只能处理源自 Debezium 关系数据库连接器的变更事件。
有关此连接器兼容的 MongoDB 版本的信息,请参阅 Debezium 版本概述。
架构及工作原理
使用 Debezium MongoDB 连接器将变更数据捕获 (CDC) 事件记录从 Kafka 主题流式传输到 MongoDB 接收数据库。该连接器订阅由 Debezium 关系数据库源连接器生成事件消息的 Kafka 主题。每个事件消息都以结构化格式描述数据库操作(插入、更新或删除),并捕获事件的详细信息。该连接器将传入的变更事件记录转换为 MongoDB 文档格式,然后将生成的文档写入目标 MongoDB 集合。
收到事件后,该连接器会解析事件载荷,并确定要将其发送到哪个 MongoDB 集合。根据事件载荷中指定的事件类型,该连接器然后在目标集合中执行以下操作之一:
| 载荷中的事件类型 | 生成的相应操作 |
|---|---|
INSERT |
创建文档 |
UPDATE |
修改指定标识符的文档。 |
DELETE |
移除指定标识符的文档。 |
该连接器使用 MongoDB Java 驱动程序与 MongoDB 数据库进行交互。
主题与 MongoDB 集合之间的映射来自连接器配置。文档键用作文档的唯一标识符,确保更新、插入和删除操作能够传播到正确的 MongoDB 文档和集合,并且操作按正确的顺序应用。
通过此将事件消息映射到 MongoDB 文档的过程,该连接器能够将关系数据库表中状态反映到 MongoDB 数据库的集合中。
限制
Debezium MongoDB 连接器具有以下限制:
- 仅支持关系数据库 / RDBMS 源连接器
-
MongoDB 连接器只能消耗源自以下关系数据库的 Debezium 连接器的变更事件:
-
MariaDB
-
MySQL
-
Oracle
-
PostgreSQL
-
SQL Server
该连接器无法处理来自任何其他 Debezium 连接器的变更事件消息,包括 Debezium MongoDB 源连接器。
-
- 模式演进
-
尽管连接器可以处理基本的模式更改,但高级模式演化场景可能需要手动干预或特定配置。由于 MongoDB 是无模式的,它处理模式演化的能力非常有限。
- 事务支持
-
连接器根据源系统中操作的提交顺序,按时间顺序处理单个变更事件。尽管 MongoDB 支持事务,但 Debezium MongoDB 连接器不提供跨多个 CDC 事件或单个接收任务内的多个文档的事务保证。
快速入门(使用 Kafka Connect)
部署 MongoDB 连接器的基本实例以进行测试。
以下组件在您的环境中可用且正在运行:
-
Kafka 集群
-
Kafka Connect
-
MongoDB 实例。
-
Debezium 关系数据库连接器
-
Debezium MongoDB 连接器
-
配置并启动一个 Debezium 源连接器,例如 Debezium PostgreSQL 连接器,将更改从关系数据库流式传输到 Kafka。
-
配置并启动 Debezium MongoDB 连接器,以消耗源连接器发送到 Kafka 的事件,并将它们发送到 MongoDB 接收数据库。
以下示例提供了 Debezium MongoDB 连接器的最小配置。请将示例中的占位符替换为您的实际环境值。
{ "name": "mongodb-sink-connector", "config": { "connector.class": "io.debezium.connector.mongodb.sink.MongoDbSinkConnector", "topics.regex": "server1\.inventory\..*", "mongodb.connection.string": "mongodb://:27017", "sink.database": "debezium" } }
配置
MongoDB 连接器接受各种配置选项,如下表所述。
| 属性 | Default (默认值) | 描述 |
|---|---|---|
|
无默认值 |
必须设置为 |
|
1 |
最大任务数。 |
|
无默认值 |
要从中消耗的 Kafka 主题列表。如果此值设置为 |
| 属性 | Default (默认值) | 描述 |
|---|---|---|
无默认值 |
接收器用于连接到 MongoDB 的 MongoDB 连接字符串(URI)。此 URI 遵循标准的 MongoDB 连接字符串格式。 示例: |
|
无默认值 |
目标 MongoDB 数据库的名称。 |
| 属性 | Default (默认值) | 描述 |
|---|---|---|
|
指定连接器用于从 Kafka 主题名称派生目标 MongoDB 集合名称的策略。 指定以下值之一:
|
|
|
用于从 Kafka 主题名称派生目标集合名称的模板。 |
|
|
指定连接器用于命名目标集合中列的策略。 指定以下值之一:
|
| 属性 | Default (默认值) | 描述 |
|---|---|---|
空字符串 |
一个可选的、逗号分隔的字段名列表,用于匹配要包含在变更事件值中的字段的完全限定名。字段的完全限定名形式为 |
|
空字符串 |
一个可选的、逗号分隔的字段名列表,用于匹配要从变更事件值中排除的字段的完全限定名。字段的完全限定名形式为 如果在此配置中包含此属性,请不要设置 |
|
2048 |
单次批次写入的最大记录数。 |
示例配置
以下示例展示了如何配置连接器,以从 dbserver1.inventory 数据库的三个特定主题读取变更事件,并将它们修改到名为 debezium 的 MongoDB 接收数据库的集合中。
{
"name": "mongodb-sink-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.sink.MongoDbSinkConnector",
"topics": "dbserver1.inventory.customers,dbserver1.inventory.orders,dbserver1.inventory.products",
"mongodb.connection.string": "mongodb://:27017",
"sink.database": "debezium"
}
}
关键字段映射
当连接器处理事件时,它会将数据映射到目标 MongoDB 文档中的特定字段。
-
默认情况下,来自 Debezium 变更事件的键(例如 Kafka 消息键)会被映射到 MongoDB 的
_id字段。 -
值被映射到 MongoDB 文档中。
-
更新和删除操作是根据关键字段映射解析的。
以下示例显示了 Kafka 主题中的事件键
{
"userId": 1,
"orderId": 1
}
根据映射逻辑,上述键被映射到 MongoDB 文档中的 _id 字段,如下例所示
{
"_id": {
"userId": 1,
"orderId": 1
}
}
将 CloudEvents 与 Debezium MongoDB 连接器配合使用
Debezium MongoDB 连接器可以消费序列化为 CloudEvents 的记录。Debezium 可以以 CloudEvents 格式发出变更事件,以便事件载荷被封装在标准化的信封中。
当您在源连接器上启用 CloudEvents 时,MongoDB 连接器会解析 CloudEvents 信封。
实际的 Debezium 事件载荷从数据部分提取。
然后,事件将按照标准的插入、更新或删除语义应用于目标 MongoDB 集合。
此过程使得 Debezium 能够与更广泛的事件驱动系统集成,同时仍将生成的事件持久化到 MongoDB 中。
| 属性 | Default (默认值) | 描述 |
|---|---|---|
|
通过将此模式与模式名称匹配来识别 CloudEvents 消息的正则表达式模式。 |