您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

Debezium MongoDB 汇聚连接器

概述

Debezium MongoDB 汇聚连接器从 Apache Kafka 主题捕获变更事件记录,然后将这些记录转换为 MongoDB 文档,并将其写入指定的 MongoDB 汇聚数据库中的集合。对于需要高扩展性和快速数据检索的应用程序,将变更数据传播到基于集群的 MongoDB 环境(该环境利用分片和副本集等功能来优化读取操作)可以显著提高检索性能。此连接器只能处理源自 Debezium 关系数据库连接器的变更事件。

有关此连接器兼容的 MongoDB 版本的信息,请参阅 Debezium 版本概述

架构及工作原理

使用 Debezium MongoDB 汇聚连接器,可以将变更数据捕获 (CDC) 事件记录从 Kafka 主题流式传输到 MongoDB 汇聚数据库。该连接器订阅由 Debezium 关系数据库源连接器生成的事件消息填充的 Kafka 主题。每条事件消息都以结构化格式描述数据库操作(插入、更新或删除),捕获事件的详细信息。该连接器将传入的变更事件记录转换为 MongoDB 文档格式,然后将生成的文档写入目标 MongoDB 集合。

收到事件后,连接器会解析事件负载,并确定要将事件发送到哪个 MongoDB 集合。根据事件负载中指定的事件类型,连接器会在目标集合中执行以下操作之一:

负载中的事件类型 生成的操作

INSERT

创建文档

UPDATE

修改指定标识符的文档。

DELETE

删除指定标识符的文档。

连接器使用 MongoDB Java 驱动程序与 MongoDB 数据库进行交互。

主题与 MongoDB 集合之间的映射来自连接器配置。文档键用作文档的唯一标识符,确保更新、插入和删除操作能够传播到正确的 MongoDB 文档和集合,并且操作能够按正确的顺序应用。

通过将事件消息映射到 MongoDB 文档的这个过程,连接器能够将关系数据库中表的状态镜像到 MongoDB 数据库中的集合。

限制

Debezium MongoDB 汇聚连接器存在以下限制:

仅支持关系数据库/RDBMS 源连接器

MongoDB 汇聚连接器只能消耗源自以下关系数据库的 Debezium 连接器的变更事件:

  • MariaDB

  • MySQL

  • Oracle

  • PostgreSQL

  • SQL Server

该连接器无法处理任何其他 Debezium 连接器的变更事件消息,包括 Debezium MongoDB 源连接器。

模式演进

尽管该连接器可以处理基本的模式更改,但高级的模式演进场景可能需要手动干预或特定配置。由于 MongoDB 是无模式的,它处理模式演进的能力有限。

事务支持

连接器按照操作在源系统中提交的顺序,以时间顺序处理单个变更事件。虽然 MongoDB 支持事务,但 Debezium MongoDB 连接器不提供跨多个 CDC 事件或单个汇聚任务内的多个文档的事务保证。

快速入门 (使用 Kafka Connect)

部署一个基本的 MongoDB 汇聚连接器实例以供测试。

先决条件

以下组件可在您的环境中运行:

  • Kafka 集群

  • Kafka Connect

  • MongoDB 实例。

  • Debezium 关系数据库连接器

  • Debezium MongoDB 连接器

过程
  1. 配置并启动 Debezium 源连接器,例如 Debezium PostgreSQL 连接器,以将来自关系数据库的更改流式传输到 Kafka。

  2. 配置并启动 Debezium MongoDB 汇聚连接器,以消耗源连接器发出的事件到 Kafka,并将它们发送到 MongoDB 汇聚数据库。

    以下示例提供了 Debezium MongoDB 汇聚连接器的最小配置。请将示例中的占位符替换为您的环境中的实际值。

    {
      "name": "mongodb-sink-connector",
      "config": {
        "connector.class": "io.debezium.connector.mongodb.sink.MongoDbSinkConnector",
        "topics.regex": "server1\.inventory\..*",
        "mongodb.connection.string": "mongodb://:27017",
        "sink.database": "debezium"
      }
    }

配置

MongoDB 汇聚连接器接受多种配置选项,如下表所述。

表 1. 必需的 Kafka Connect 汇聚连接器配置属性
属性 Default (默认值) 描述

connector.class

无默认值

必须设置为 io.debezium.connector.mongodb.sink.MongoDbSinkConnector

tasks.max

1

任务的最大数量。

topicstopics.regex

无默认值

要从中消耗的 Kafka 主题列表。如果将此值设置为 topics.regex,则连接器将消耗与正则表达式匹配的所有主题。

表 2. 必需的 MongoDB 连接属性
属性 Default (默认值) 描述

无默认值

汇聚用于连接 MongoDB 的 MongoDB 连接字符串 (URI)。此 URI 遵循标准的 MongoDB 连接字符串格式。

例如:mongodb://:27017/?replicaSet=my-replica-set

无默认值

目标 MongoDB 数据库的名称。

表 3. 汇聚行为配置
属性 Default (默认值) 描述

io.debezium.sink.naming.DefaultCollectionNamingStrategy

指定连接器用于从 Kafka 主题名称派生目标 MongoDB 集合名称的策略。

指定以下值之一:

io.debezium.sink.naming.DefaultCollectionNamingStrategy

连接器直接从主题名称中获取表名,并将源主题名称中的点 (.) 替换为下划线。

自定义实现

您可以提供自己的 CollectionNameStrategy 实现。

${topic}

用于从 Kafka 主题名称派生目标集合名称的模板。

io.debezium.sink.naming.DefaultColumnNamingStrategy

指定连接器用于命名目标集合中列的策略。

指定以下值之一:

io.debezium.sink.naming.DefaultColumnNamingStrategy

使用原始字段名作为列名。

自定义实现

指定自定义 CollectionNameStrategy 实现。

表 4. 通用汇聚选项
属性 Default (默认值) 描述

空字符串

一个可选的逗号分隔的字段名列表,匹配要从变更事件值中包含的字段的完全限定名。字段的完全限定名形式为 fieldNametopicName:fieldName
如果在此配置中包含此属性,请不要设置 field.exclude.list 属性。

空字符串

一个可选的逗号分隔的字段名列表,匹配要从变更事件值中排除的字段的完全限定名。字段的完全限定名形式为 fieldNametopicName:fieldName

如果在此配置中包含此属性,请不要设置 field.include.list 属性。

2048

在单个批次中写入的最大记录数。

示例配置

以下示例展示了如何配置连接器,从 dbserver1.inventory 数据库的三个特定主题读取变更事件,以修改名为 debezium 的 MongoDB 汇聚数据库中的集合。

{
    "name": "mongodb-sink-connector",
    "config": {
        "connector.class": "io.debezium.connector.mongodb.sink.MongoDbSinkConnector",
        "topics": "dbserver1.inventory.customers,dbserver1.inventory.orders,dbserver1.inventory.products",
        "mongodb.connection.string": "mongodb://:27017",
        "sink.database": "debezium"
    }
}

监控

此版本的连接器不公开任何指标。

关键字段映射

当连接器处理事件时,它会将数据映射到目标 MongoDB 文档中的特定字段。

  • 来自 Debezium 变更事件的键(例如 Kafka 消息键)默认映射到 MongoDB _id 字段。

  • 值被映射到 MongoDB 文档中。

  • 更新和删除操作是根据关键字段映射来解析的。

以下示例显示了 Kafka 主题中的一个事件键:

{
    "userId": 1,
    "orderId": 1
}

根据映射逻辑,上述键被映射到 MongoDB 文档中的 _id 字段,如下例所示:

{
    "_id": {
        "userId": 1,
        "orderId": 1
    }
}

将 CloudEvents 与 Debezium MongoDB 汇聚连接器结合使用

Debezium MongoDB 汇聚连接器可以消耗序列化为 CloudEvents 的记录。Debezium 可以以 CloudEvents 格式发出变更事件,以便事件负载封装在标准化的信封中。

当您在源连接器上启用 CloudEvents 时,MongoDB 汇聚连接器会解析 CloudEvents 信封。

实际的 Debezium 事件负载将从数据部分提取。

然后,事件将按照标准的插入、更新或删除语义应用于目标 MongoDB 集合。

这个过程使得 Debezium 能够与更广泛的事件驱动系统集成,同时仍将生成的事件持久化到 MongoDB 中。

表 5. CloudEvents 汇聚选项
属性 Default (默认值) 描述

.*CloudEvents\.Envelope$

通过将模式名称与此模式匹配来标识 CloudEvents 消息的正则表达式模式。