您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

MongoDB 新文档状态提取

Debezium MongoDB 连接器会发出数据更改消息,以表示 MongoDB 集合中发生的每个操作。这些事件消息的复杂结构忠实地表示了原始数据库事件的详细信息。但是,某些下游使用者可能无法处理原始格式的消息。例如,为了在数据集合中表示嵌套文档,连接器会发出包含嵌套字段的格式的事件消息。为了支持无法处理原始消息分层格式的接收器连接器或其他使用者,您可以使用 Debezium MongoDB 事件展平(ExtractNewDocumentState)单一消息转换(SMT)。SMT 简化了原始消息的结构,还可以以其他方式修改消息,以使数据更易于处理。

事件展平转换是一个 Kafka Connect SMT

本章中的信息仅描述了 Debezium MongoDB 连接器的事件展平单一消息转换(SMT)。有关可用于关系数据库的等效 SMT 的信息,请参阅New Record State Extraction SMT 的文档

更改事件结构

Debezium MongoDB 连接器生成具有复杂结构的更改事件。每个事件消息包含以下部分:

源元数据

包括但不限于以下字段:

  • 更改集合中数据的操作类型(创建/插入、更新或删除)。

  • 发生更改的数据库和集合的名称。

  • 标识更改发生时间的时戳。

  • 可选的事务信息。

文档数据
before 数据

当 Debezium 连接器的 capture.mode 设置为以下值之一时,在运行 MongoDB 6.0 及更高版本的环境中存在此字段:

  • change_streams_with_pre_image.

  • change_streams_update_full_with_pre_image.

    有关更多信息,请参阅MongoDB pre-image 支持

after 数据

表示当前操作后文档中存在的 JSON 字符串。事件消息中 after 字段的存在取决于事件类型和连接器配置。MongoDB insert 操作的 create 事件始终包含 after 字段,无论 capture.mode 设置如何。对于 update 事件,仅当 capture.mode 设置为以下值之一时,才存在 after 字段:

  • change_streams_update_full

  • change_streams_update_full_with_pre_image.

    更改事件消息中的 after 值不一定表示事件发生后文档的状态。该值不是动态计算的;相反,在连接器捕获更改事件后,它会查询集合以检索文档的当前值。

    例如,假设存在一个情况,其中多个操作 abc 在短时间内修改了同一个文档。当连接器处理更改 a 时,它会查询集合以获取完整的文档。在此期间,发生了更改 bc。当连接器收到对其为更改 a 查询完整文档的响应时,它可能会收到基于后续更改 bc 的文档版本。有关更多信息,请参阅capture.mode 属性的文档。

以下片段显示了连接器在 MongoDB insert 操作后发出的 create 更改事件的基本结构。

{
  "op": "c",
  "after": "{\"field1\":\"newvalue1\",\"field2\":\"newvalue1\"}",
  "source": { ... }
}

前面示例中 after 字段的复杂格式提供了有关源数据库中发生的更改的详细信息。但是,某些使用者无法处理包含嵌套值的消息。要将原始消息的复杂嵌套字段转换为更简单、更通用的结构,请使用 MongoDB 的事件展平 SMT。SMT 展平了消息中嵌套字段的结构,如下例所示:

{
  "field1" : "newvalue1",
  "field2" : "newvalue2"
}

有关 Debezium MongoDB 连接器生成的默认消息结构的信息,请参阅连接器文档

行为

MongoDB 的事件展平 SMT 从 Debezium MongoDB 连接器发出的 createupdate 更改事件消息中提取 after 字段。SMT 处理原始更改事件消息后,会生成一个简化的版本,其中仅包含 after 字段的内容。

根据您的用例,您可以将 ExtractNewDocumentState SMT 应用于 Debezium MongoDB 连接器,或者应用于使用 Debezium 连接器发出的消息的接收器连接器。如果您将 SMT 应用于 Debezium MongoDB 连接器,SMT 会在将消息发送到 Apache Kafka 之前修改连接器发出的消息。要确保 Kafka 保留完整 Debezium 更改事件消息的原始格式,请将 SMT 应用于接收器连接器。

当您使用事件展平 SMT 处理从 MongoDB 连接器发出的消息时,SMT 将原始消息中的记录结构转换为可以被典型接收器连接器使用的、具有正确类型定义的 Kafka Connect 记录。例如,SMT 将原始消息中表示 after 信息的 JSON 字符串转换为任何使用者都可以处理的模式结构。

可选地,您可以配置 MongoDB 的事件展平 SMT,以便在处理过程中以其他方式修改消息。有关更多信息,请参阅配置主题

配置

为使用 Debezium MongoDB 连接器发出的消息的接收器连接器配置 MongoDB 的事件展平(ExtractNewDocumentState)SMT。

基本配置

要获得 SMT 的默认行为,请在不指定任何选项的情况下将 SMT 添加到接收器连接器的配置中,如下例所示:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState

与任何 Kafka Connect 连接器配置一样,您可以将 transforms= 设置为多个、用逗号分隔的 SMT 别名。Kafka Connect 会按照您指定的顺序应用转换。

您可以为使用 MongoDB 事件展平 SMT 的连接器设置多个选项。以下示例显示了一个配置,该配置设置了 delete.tombstone.handling.modeadd.headers 选项:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.delete.tombstone.handling.mode=drop
transforms.unwrap.add.headers=op

有关前一示例中配置选项的更多信息,请参阅配置主题

自定义配置

连接器可能会发出多种类型的事件消息(例如,心跳消息、墓碑消息或有关事务的元数据消息)。要将转换应用于事件的子集,您可以定义SMT 谓词语句,以选择性地将转换应用于特定事件

数组编码

默认情况下,事件展平 SMT 将 MongoDB 数组转换为与 Apache Kafka Connect 或 Apache Avro 模式兼容的数组。虽然 MongoDB 数组可以包含多种类型的元素,但 Kafka 数组中的所有元素都必须是相同类型的。

为了确保 SMT 以满足您的环境需求的方式对数组进行编码,您可以指定 array.encoding 配置选项。以下示例显示了设置数组编码的配置:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.array.encoding=<array|document>

根据配置,SMT 使用以下编码方法之一处理源消息中数组的每个实例:

数组编码

如果 array.encoding 设置为 array(默认值),SMT 会使用 array 数据类型来编码原始消息中的数组。为了确保正确处理,数组实例中的所有元素都必须是同一类型。此选项是受限的,但它使下游客户端能够轻松处理数组。

文档编码

如果 array.encoding 设置为 document,SMT 会将源中的每个数组转换为 **struct of structs**,其方式类似于BSON 序列化。主 **struct** 包含名为 _0_1_2 等的字段,其中每个字段名代表原始数组中元素的索引。SMT 会使用它从源数组中为等效元素检索到的值来填充每个索引字段。索引名以_前缀,因为 Avro 编码禁止以数字字符开头的字段名。

以下示例显示了 Debezium MongoDB 连接器如何表示包含异构数据类型数组的数据库文档:

示例 1. 示例:包含多种数据类型的数组的文档编码
{
    "_id": 1,
    "a1": [
        {
            "a": 1,
            "b": "none"
        },
        {
            "a": "c",
            "d": "something"
        }
    ]
}

如果 array.encoding 设置为 document,SMT 会将上述文档转换为以下格式:

{
    "_id": 1,
    "a1": {
        "_0": {
            "a": 1,
            "b": "none"
        },
        "_1": {
            "a": "c",
            "d": "something"
        }
    }
}

document 编码选项使 SMT 能够处理由异构元素组成的任意数组。但是,在使用此选项之前,请务必验证接收器连接器和其他下游使用者是否能够处理包含多种数据类型的数组。

展平 MongoDB 事件消息中的嵌套结构

当数据库操作涉及嵌入式文档时,Debezium MongoDB 连接器会发出一个 Kafka 事件记录,其结构反映了原始文档的分层结构。也就是说,事件消息将嵌套文档表示为一组嵌套字段结构。在下游连接器无法处理包含嵌套结构的消息的环境中,您可以配置事件展平 SMT 来展平消息中的分层结构。扁平化的消息结构更适合表状存储。

要配置 SMT 以展平嵌套结构,请将 flatten.struct 配置选项设置为 true。在转换后的消息中,字段名是通过将父文档字段名与嵌套文档字段名连接起来来构建的,以与文档源保持一致。由 flatten.struct.delimiter 选项定义的定界符用于分隔名称的组成部分。struct.delimiter 的默认值是下划线字符(_)。

以下示例显示了指定 SMT 是否展平嵌套结构的配置:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.flatten.struct=<true|false>
transforms.unwrap.flatten.struct.delimiter=<string>

以下示例显示了 MongoDB 连接器发出的事件消息。该消息包含一个文档 a 的字段,该文档包含两个嵌套文档 bc 的字段:

{
    "_id": 1,
    "a": {
            "b": 1,
            "c": "none"
    },
    "d": 100
}

以下示例中的消息显示了 MongoDB 的 SMT 展平了前一条消息中的嵌套结构后的输出:

{
    "_id": 1,
    "a_b": 1,
    "a_c": "none",
    "d": 100
}

在结果消息中,原始消息中嵌套的 bc 字段被展平并重命名。重命名的字段是通过将父文档 a 的名称与嵌套文档的名称(a_ba_c)连接起来形成的。新字段名的组成部分由下划线字符分隔,这由 struct.delimiter 配置属性的设置定义。

MongoDB $unset 处理

在 MongoDB 中,$unset 操作符和 $rename 操作符都会从文档中删除字段。由于 MongoDB 集合是无模式的,在更新删除文档中的字段后,无法从更新后的文档中推断出缺失字段的名称。为了支持可能需要有关已删除字段信息的接收器连接器或其他使用者,Debezium 会发出包含 removedFields 元素的更新消息,其中列出了已删除字段的名称。

以下示例显示了导致字段 a 被删除的操作的更新消息的一部分:

"payload": {
  "op": "u",
  "ts_ms": "...",
  "ts_us" : "...",
  "ts_ns" : "...",
  "before": "{ ... }",
  "after": "{ ... }",
  "updateDescription": {
    "removedFields": ["a"],
    "updatedFields": null,
    "truncatedArrays": null
  }
}

在上例中,beforeafter 表示源文档在更新之前和之后的文档状态。只有当连接器的 capture.mode 设置如下时,这些字段才会出现在连接器发出的事件消息中:

before 字段

提供更改之前的文档状态。仅当 capture.mode 设置为以下值之一时,才存在此字段:

  • change_streams_with_pre_image

  • change_streams_update_full_with_pre_image.

after 字段

提供更改后的文档的完整状态。仅当 capture.mode 设置为以下值之一时,才存在此字段:

  • change_streams_update_full

  • change_streams_update_full_with_pre_image.

假设连接器配置为捕获完整文档,当 ExtractNewDocumentState SMT 收到 $unset 事件的 update 消息时,SMT 会通过将已删除字段表示为 null 值来重新编码消息,如下例所示:

{
    "id": 1,
    "a": null
}

对于未配置为捕获完整文档的连接器,当 SMT 收到 $unset 操作的更新事件时,它会产生以下输出消息:

{
   "a": null
}

确定原始数据库操作的类型

SMT 展平事件消息后,结果消息将不再指示生成事件的操作是 createupdate 还是初始快照 read 类型。通常,您可以通过配置连接器来暴露有关伴随删除的墓碑或重写事件的信息来识别 delete 操作。有关配置连接器以在事件消息中暴露有关墓碑和重写的信息的更多信息,请参阅delete.tombstone.handling.mode 属性。

要将数据库操作的类型报告在事件消息中,SMT 可以将 op 字段添加到以下元素之一:

  • 事件消息正文。

  • 消息头。

例如,要添加一个显示原始操作类型的头属性,请添加转换,然后将 add.headers 属性添加到连接器配置中,如下例所示:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.headers=op

根据上述配置,SMT 通过将 op 头添加到消息并为其分配一个字符串值来标识操作类型,从而报告事件类型。分配的字符串值基于原始MongoDB 更改事件消息中的 op 字段值。

添加元数据字段

MongoDB 的事件展平 SMT 可以将原始更改事件消息中的元数据字段添加到简化消息中。添加的元数据字段以双下划线("__")为前缀。将元数据添加到事件记录可以包含诸如更改事件发生的集合名称之类的内容,或者包含连接器特定的字段,例如副本集名称。目前,SMT 只能从以下更改事件子结构添加字段:sourcetransactionupdateDescription

有关 MongoDB 更改事件结构的信息,请参阅MongoDB 连接器文档

例如,您可以指定以下配置,将副本集名称(rs)和更改事件的集合名称添加到最终的展平事件记录中:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.fields=rs,collection

上述配置会在展平后的记录中添加以下内容:

{ "__rs" : "rs0", "__collection" : "my-collection", ... }

如果您希望 SMT 将元数据字段添加到 delete 事件,请将 delete.tombstone.handling.mode 选项的值设置为 rewrite

选择性应用转换的选项

除了 Debezium 连接器在数据库发生更改时发出的更改事件消息之外,连接器还会发出其他类型的消息,包括心跳消息以及关于模式更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计用于处理的更改事件消息的结构不同,因此最好将连接器配置为选择性地应用 SMT,以便它只处理预期的数据更改消息。

有关如何选择性应用 SMT 的更多信息,请参阅 为转换配置 SMT 谓词

配置选项

下表描述了 MongoDB 事件展平 SMT 的配置选项。

属性 Default (默认值) 描述

array

指定 SMT 在编码从原始事件消息中读取的数组时使用的格式。设置以下选项之一:

array

SMT 使用 array 数据类型将 MongoDB 数组编码为与 Apache Kafka Connect 或 Apache Avro 模式兼容的格式。如果您设置了此选项,请验证每个数组实例中的元素是否为同一类型。虽然 MongoDB 允许数组包含多种数据类型,但某些下游客户端无法处理数组。

document

SMT 将每个 MongoDB 数组转换为 **struct of structs**,其方式类似于BSON 序列化。主 **struct** 包含名称为 _0_1_2 等的字段。为了符合 Avro 命名标准,SMT 在每个索引字段的数字名称前加上下划线。每个数字字段名代表原始数组中元素的索引。SMT 使用从源文档中为指定的数组元素检索到的值来填充每个索引字段。

有关 array.coding 选项的更多信息,请参阅编码 MongoDB 事件消息中数组的选项

false

SMT 通过连接消息中嵌套属性的名称,并用可配置的定界符分隔,来展平原始事件消息中的结构(struct),从而形成简单的字段名。

_

flatten.struct 设置为 true 时,指定转换在连接来自输入记录的字段名以在输出记录中生成字段名时插入的定界符。

tombstone

Debezium 为每个 DELETE 操作生成一个更改事件记录。此设置确定 MongoDB 事件展平 SMT 如何处理流中的 DELETE 事件。设置以下选项之一:

drop

SMT 会从流中删除 DELETE 事件和 `TOMBSTONE` 记录。

tombstone (默认)

SMT 会将 TOMBSTONE 记录保留在流中。TOMBSTONE 记录仅包含以下值:"value": "null"

rewrite

SMT 会将更改事件记录保留在流中并进行以下更改:

  • 向记录添加一个 value 字段,其中包含原始记录的 before 字段中的键/值对。

  • 向记录的 value 添加 __deleted: true

  • 删除 TOMBSTONE 记录。

    此设置提供了另一种指示记录已被删除的方式。

rewrite-with-tombstone

SMT 的行为与选择 rewrite 选项时相同,不同之处在于它还会保留 TOMBSTONE 记录。

false

当设置为 truedelete.tombstone.handling.moderewrite 时,会将 id 字段从 key 复制并包含在 delete 事件的 payload 中,作为 _id

__ (双下划线)

设置此可选字符串以在标题前添加前缀。

无默认值

指定一个用逗号分隔的、无空格的元数据字段列表,您希望 SMT 将这些字段添加到简化消息的头中。当原始消息包含重复的字段名时,您可以通过提供结构名和字段名来标识要修改的特定字段,例如 source.ts_ms

可选地,您可以通过在列表中添加以下格式的条目来覆盖字段的原始名称并为其分配新名称:

<field_name>:<new_field_name>.

For example (例如:)

version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP

您指定的新名称值区分大小写。

当 SMT 将元数据字段添加到简化消息的头时,它会在每个元数据字段名前加上双下划线。对于结构规范,SMT 还会在结构名称和字段名之间插入一个下划线。

如果您指定了一个不在更改事件原始消息中的字段,SMT 不会将该字段添加到头中。

__ (双下划线)

指定一个可选字符串,用于前缀字段名。

无默认值

将此选项设置为一个逗号分隔的、无空格的元数据字段列表,以添加到简化 Kafka 消息的 value 元素中。当原始消息包含重复的字段名时,您可以通过提供结构名和字段名来标识要修改的特定字段,例如 source.ts_ms
可选地,您可以通过在列表中添加以下格式的条目来覆盖字段的原始名称并为其分配新名称:

<field_name>:<new_field_name>.

For example (例如:)

version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP

您指定的新名称值区分大小写。

当 SMT 将元数据字段添加到简化消息的 value 元素时,它会在每个元数据字段名前加上双下划线。对于结构规范,SMT 还会在结构名称和字段名之间插入一个下划线。

如果您指定了一个在原始更改事件消息中不存在的字段,SMT 仍会将指定的字段添加到修改后的消息的 value 元素中。

已知限制

  • 由于 MongoDB 是一个无模式数据库,为了在使用 Debezium 将更改流式传输到基于模式的关系数据库时确保一致的列定义,集合中的同名字段必须存储相同类型的数据。

  • 配置 SMT 以生成与接收器连接器兼容格式的消息。如果接收器连接器需要“展平”的消息结构,但它收到了一条消息,该消息将源 MongoDB 文档中的数组编码为 struct of structs,那么接收器连接器将无法处理该消息。