MongoDB 新文档状态提取

Debezium MongoDB 连接器会发出数据更改消息,以表示 MongoDB 集合中发生的每个操作。这些事件消息的复杂结构忠实地表示了原始数据库事件的详细信息。但是,某些下游使用者可能无法处理其原始格式的消息。例如,为了在数据集合中表示嵌套文档,连接器会以包含嵌套字段的格式发出事件消息。为了支持无法处理原始消息层次结构的接收器连接器或其他使用者,您可以使用 Debezium MongoDB 事件展平(ExtractNewDocumentState)单消息转换 (SMT)。SMT 简化了原始消息的结构,还可以以其他方式修改消息,使数据更易于处理。

事件展平转换是一个 Kafka Connect SMT

本章中的信息仅描述了 Debezium MongoDB 连接器的事件展平单消息转换 (SMT)。有关适用于关系数据库的等效 SMT 的信息,请参阅 New Record State Extraction SMT 的文档

更改事件结构

Debezium MongoDB 连接器会生成具有复杂结构的更改事件。每个事件消息都包含以下部分:

源元数据

包括但不限于以下字段:

  • 更改集合中数据的操作类型(创建/插入、更新或删除)。

  • 发生更改的数据库和集合名称。

  • 标识更改发生时间的时戳。

  • 可选的事务信息。

文档数据
before 数据

当连接器的 capture.mode 设置为以下值之一时,在运行 MongoDB 6.0 及更高版本的环境中存在此字段:

  • change_streams_with_pre_image.

  • change_streams_update_full_with_pre_image.

    有关更多信息,请参阅 MongoDB pre-image 支持

after 数据

表示当前操作后文档中存在的 JSON 字符串。事件消息中 after 字段的存在取决于事件类型和连接器配置。MongoDB insert 操作的 create 事件始终包含 after 字段,无论 capture.mode 设置如何。对于 update 事件,仅当 capture.mode 设置为以下值之一时,才存在 after 字段:

  • change_streams_update_full

  • change_streams_update_full_with_pre_image.

    更改事件消息中的 after 值不一定代表事件刚发生后的文档状态。该值不是动态计算的;相反,在连接器捕获更改事件后,它会查询集合以检索文档的当前值。

    例如,想象一种情况,其中多个操作 a、b 和 c 在极短的时间内修改了一个文档。当连接器处理更改 a 时,它会查询集合以获取完整文档。与此同时,发生了更改 b 和 c。当连接器收到对其查询的更改 a 的完整文档的响应时,它可能会收到基于后续更改 b 或 c 的文档版本。有关更多信息,请参阅 capture.mode 属性的文档。

以下片段显示了连接器在 MongoDB insert 操作后发出的 create 更改事件的基本结构:

{
  "op": "c",
  "after": "{\"field1\":\"newvalue1\",\"field2\":\"newvalue1\"}",
  "source": { ... }
}

上例中 after 字段的复杂格式提供了有关源数据库中发生的更改的详细信息。但是,某些使用者无法处理包含嵌套值的消息。为了将原始消息的复杂嵌套字段转换为更简单、更通用的结构,请使用 MongoDB 的事件展平 SMT。SMT 展平消息中嵌套字段的结构,如下例所示:

{
  "field1" : "newvalue1",
  "field2" : "newvalue2"
}

有关 Debezium MongoDB 连接器生成的默认消息结构的更多信息,请参阅 连接器文档

行为

MongoDB 的事件展平 SMT 会从 Debezium MongoDB 连接器发出的 createupdate 更改事件消息中提取 after 字段。SMT 处理完原始更改事件消息后,会生成一个简化的版本,其中仅包含 after 字段的内容。

根据您的用例,您可以将 ExtractNewDocumentState SMT 应用于 Debezium MongoDB 连接器,或者应用于消耗 Debezium 连接器生成的那些消息的接收器连接器。如果您将 SMT 应用于 Debezium MongoDB 连接器,SMT 会在连接器发出的消息发送到 Apache Kafka 之前对其进行修改。为了确保 Kafka 以其原始格式保留完整的 Debezium 更改事件消息,请将 SMT 应用于接收器连接器。

当您使用事件展平 SMT 处理从 MongoDB 连接器发出的消息时,SMT 会将原始消息中的记录结构转换为可以被典型接收器连接器使用的、具有正确类型的 Kafka Connect 记录。例如,SMT 将原始消息中代表 after 信息的 JSON 字符串转换为任何使用者都可以处理的模式结构。

可选地,您可以配置 MongoDB 的事件展平 SMT,以便在处理过程中以其他方式修改消息。有关更多信息,请参阅 配置主题

配置

为消费 Debezium MongoDB 连接器发出的消息的接收器连接器配置 MongoDB 的事件展平(ExtractNewDocumentState)SMT。

基本配置

要获得 SMT 的默认行为,请在不指定任何选项的情况下将 SMT 添加到接收器连接器的配置中,如下例所示:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState

与任何 Kafka Connect 连接器配置一样,您可以将 transforms= 设置为多个、用逗号分隔的 SMT 别名。Kafka Connect 按列出的顺序应用您指定的转换。

您可以为使用 MongoDB 事件展平 SMT 的连接器设置多个选项。以下示例显示了为连接器设置 delete.tombstone.handling.modeadd.headers 选项的配置:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.delete.tombstone.handling.mode=drop
transforms.unwrap.add.headers=op

有关上例中配置选项的更多信息,请参阅 配置主题

自定义配置

连接器可能发出多种类型的事件消息(例如,心跳消息、墓碑消息或关于事务的元数据消息)。要将转换应用于事件的子集,您可以定义 SMT 谓词语句,以选择性地将转换应用于特定事件

数组编码

默认情况下,事件展平 SMT 将 MongoDB 数组转换为与 Apache Kafka Connect 或 Apache Avro 模式兼容的数组。虽然 MongoDB 数组可以包含多种类型的元素,但 Kafka 数组中的所有元素必须是同一类型。

为确保 SMT 以满足您环境需求的方式编码数组,您可以指定 array.encoding 配置选项。以下示例显示了设置数组编码的配置:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.array.encoding=<array|document>

根据配置,SMT 使用以下编码方法之一处理源消息中数组的每个实例:

数组编码

如果 array.encoding 设置为 array(默认值),SMT 会使用 array 数据类型对原始消息中的数组进行编码。为确保正确处理,数组实例中的所有元素都必须是同一类型。此选项是限制性的,但它使下游客户端能够轻松处理数组。

文档编码

如果 array.encoding 设置为 document,SMT 会将源中的每个数组转换为 **struct** 的 **structs**,方式类似于 BSON 序列化。主 **struct** 包含名为 _0_1_2 等的字段,其中每个字段名代表原始数组中元素的索引。SMT 使用它从源数组中检索到的等效元素的值来填充每个索引字段。索引名称前缀为下划线,因为 Avro 编码禁止以数字字符开头的字段名。

以下示例显示了 Debezium MongoDB 连接器如何表示包含具有异构数据类型的数组的数据库文档:

示例 1。示例:包含多种数据类型的数组的文档编码
{
    "_id": 1,
    "a1": [
        {
            "a": 1,
            "b": "none"
        },
        {
            "a": "c",
            "d": "something"
        }
    ]
}

如果 array.encoding 设置为 document,SMT 会将上述文档转换为以下格式:

{
    "_id": 1,
    "a1": {
        "_0": {
            "a": 1,
            "b": "none"
        },
        "_1": {
            "a": "c",
            "d": "something"
        }
    }
}

document 编码选项使 SMT 能够处理由异构元素组成的任意数组。但是,在使用此选项之前,请务必验证接收器连接器和其他下游使用者是否能够处理包含多种数据类型的数组。

展平 MongoDB 事件消息中的嵌套结构

当数据库操作涉及嵌入式文档时,Debezium MongoDB 连接器会发出一个 Kafka 事件记录,其结构反映了原始文档的层次结构。也就是说,事件消息将嵌套文档表示为一组嵌套的字段结构。在下游连接器无法处理包含嵌套结构的消息的环境中,您可以配置事件展平 SMT 来展平消息中的层次结构。扁平化的消息结构更适合表式存储。

要配置 SMT 以展平嵌套结构,请将 flatten.struct 配置选项设置为 true。在转换后的消息中,字段名构造得与文档源一致。SMT 通过将父文档字段名与嵌套文档字段名连接起来来重命名每个展平后的字段。由 flatten.struct.delimiter 选项定义的定界符会分隔名称的各个部分。struct.delimiter 的默认值为下划线字符 (_)。

以下示例显示了指定 SMT 是否展平嵌套结构的配置:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.flatten.struct=<true|false>
transforms.unwrap.flatten.struct.delimiter=<string>

以下示例显示了 MongoDB 连接器发出的事件消息。该消息包含字段 a,其中包含字段 bc 两个嵌套文档:

{
    "_id": 1,
    "a": {
            "b": 1,
            "c": "none"
    },
    "d": 100
}

以下示例中的消息显示了 MongoDB 的 SMT 展平前一条消息中的嵌套结构后的输出:

{
    "_id": 1,
    "a_b": 1,
    "a_c": "none",
    "d": 100
}

在生成的消息中,原始消息中嵌套的 bc 字段被展平并重命名。重命名的字段是通过将父文档 a 的名称与嵌套文档的名称(a_ba_c)连接起来形成的。新字段名的各个部分由下划线字符分隔,如下 struct.delimiter 配置属性的设置所定义。

MongoDB $unset 处理

在 MongoDB 中,$unset 操作符和 $rename 操作符都会从文档中删除字段。由于 MongoDB 集合是无模式的,因此在更新删除文档中的字段后,无法从更新后的文档推断出缺失字段的名称。为了支持可能需要有关已删除字段信息的接收器连接器或其他使用者,Debezium 会发出更新消息,其中包含一个 removedFields 元素,列出已删除字段的名称。

以下示例显示了导致字段 a 被删除的操作的更新消息的一部分:

"payload": {
  "op": "u",
  "ts_ms": "...",
  "ts_us" : "...",
  "ts_ns" : "...",
  "before": "{ ... }",
  "after": "{ ... }",
  "updateDescription": {
    "removedFields": ["a"],
    "updatedFields": null,
    "truncatedArrays": null
  }
}

在上例中,beforeafter 表示源文档在更新之前和之后的状态。这些字段仅在连接器的 capture.mode 设置如下时才会出现在连接器发出的事件消息中:

before 字段

提供更改前的文档状态。仅当 capture.mode 设置为以下值之一时,才存在此字段:

  • change_streams_with_pre_image

  • change_streams_update_full_with_pre_image.

after 字段

提供更改后的文档的完整状态。仅当 capture.mode 设置为以下值之一时,才存在此字段:

  • change_streams_update_full

  • change_streams_update_full_with_pre_image.

假设连接器配置为捕获完整文档,当 ExtractNewDocumentState SMT 收到 $unset 事件的 update 消息时,SMT 会通过将已删除的字段表示为 null 值来重新编码消息,如下例所示:

{
    "id": 1,
    "a": null
}

对于未配置为捕获完整文档的连接器,当 SMT 收到 $unset 操作的更新事件时,它会产生以下输出消息:

{
   "a": null
}

确定原始数据库操作的类型

SMT 展平事件消息后,生成的消息将不再指示生成事件的操作是 createupdate 还是初始快照 read 类型。通常,您可以识别 delete 操作,方法是配置连接器以公开有关伴随删除的墓碑或重写事件的信息。有关配置连接器以公开事件消息中墓碑和重写信息的信息,请参阅 delete.tombstone.handling.mode 属性。

要在事件消息中报告数据库操作的类型,SMT 可以将 op 字段添加到以下元素之一:

  • 事件消息正文。

  • 消息头。

例如,要添加一个显示原始操作类型的标头属性,请添加转换,然后将 add.headers 属性添加到连接器配置中,如下例所示:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.headers=op

基于前面的配置,SMT 通过将 op 标头添加到消息并为其分配字符串值来标识操作的类型,从而报告事件类型。分配的字符串值基于原始 MongoDB 更改事件消息中的 op 字段值。

添加元数据字段

MongoDB 的事件展平 SMT 可以将原始更改事件消息中的元数据字段添加到简化后的消息中。添加的元数据字段前缀为双下划线 ("__")。将元数据添加到事件记录中,可以包含诸如更改事件发生的集合名称,或者包含连接器特定的字段(如副本集名称)的内容。目前,SMT 只能添加来自以下更改事件子结构的字段:sourcetransactionupdateDescription

有关 MongoDB 更改事件结构的更多信息,请参阅 MongoDB 连接器文档

例如,您可以指定以下配置,将更改事件的副本集名称 (rs) 和集合名称添加到最终的展平事件记录中:

transforms=unwrap,...
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
transforms.unwrap.add.fields=rs,collection

前面的配置导致以下内容被添加到展平记录中:

{ "__rs" : "rs0", "__collection" : "my-collection", ... }

如果您希望 SMT 将元数据字段添加到 delete 事件,请将 delete.tombstone.handling.mode 选项的值设置为 rewrite

选择性应用转换的选项

除了 Debezium 连接器在数据库发生更改时发出的更改事件消息之外,连接器还会发出其他类型的消息,包括心跳消息以及关于模式更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计用于处理的更改事件消息的结构不同,因此最好将连接器配置为选择性地应用 SMT,以便它只处理预期的数据更改消息。

有关如何选择性地应用 SMT 的更多信息,请参阅 为转换配置 SMT 谓词

配置选项

下表描述了 MongoDB 事件展平 SMT 的配置选项。

属性 Default (默认值) 描述

array

指定 SMT 在编码从原始事件消息读取的数组时使用的格式。设置以下选项之一:

array

SMT 使用 array 数据类型将 MongoDB 数组编码为与 Apache Kafka Connect 或 Apache Avro 模式兼容的格式。如果设置此选项,请验证每个数组实例中的元素是否为同一类型。虽然 MongoDB 允许数组包含多种数据类型,但某些下游客户端无法处理数组。

document

SMT 将每个 MongoDB 数组转换为 **struct** 的 **structs**,方式类似于 BSON 序列化。主 **struct** 包含字段名 _0_1_2 等。为了符合 Avro 命名标准,SMT 会在每个索引字段的数字名称前加上下划线。每个数字字段名代表原始数组中元素的索引。SMT 使用它从源文档中检索到的值来填充每个索引字段,代表指定的数组元素。

有关 array.coding 选项的更多信息,请参阅 MongoDB 事件消息中数组编码的选项

false

SMT 通过将消息中嵌套属性的名称连接起来(由可配置的定界符分隔),来展平原始事件消息中的结构(struct),从而形成一个简单的字段名。

_

flatten.struct 设置为 true 时,指定转换在连接输入记录中的字段名以在输出记录中生成字段名时插入的定界符。

tombstone

Debezium 会为每个 DELETE 操作生成一个更改事件记录。此设置决定了 MongoDB 事件展平 SMT 如何处理流中的 DELETE 事件。设置以下选项之一:

drop

SMT 从流中删除 DELETE 事件和 TOMBSTONE 记录。

tombstone (默认)

SMT 会将 TOMBSTONE 记录保留在流中。TOMBSTONE 记录仅包含以下值:"value": "null"

rewrite

SMT 会将更改事件记录保留在流中并进行以下更改:

  • 向记录添加一个 value 字段,其中包含原始记录的 before 字段中的键/值对。

  • 向记录的 value 添加 __deleted: true

  • 删除 TOMBSTONE 记录。

    此设置提供了另一种指示记录已被删除的方式。

rewrite-with-tombstone

SMT 的行为与选择 rewrite 选项时相同,不同之处在于它还会保留 TOMBSTONE 记录。

false

当设置为 truedelete.tombstone.handling.moderewrite 时,会将 id 字段从键复制到有效载荷中,作为 _id,用于删除事件。

__ (双下划线)

设置此可选字符串以在标题前添加前缀。

无默认值

指定一个逗号分隔的列表(无空格),包含您希望 SMT 添加到简化消息头部的元数据字段。当原始消息包含重复的字段名时,您可以通过提供结构名称和字段名称来识别要修改的特定字段,例如 source.ts_ms

可选地,您可以通过在列表中添加以下格式的条目来覆盖字段的原始名称并为其分配新名称:

<field_name>:<new_field_name>.

For example (例如:)

version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP

您指定的新名称值区分大小写。

当 SMT 将元数据字段添加到简化消息的头部时,它会在每个元数据字段名之前加上双下划线。对于结构规范,SMT 还会在结构名称和字段名之间插入一个下划线。

如果您指定了一个不在更改事件原始消息中的字段,SMT 不会将该字段添加到头部。

__ (双下划线)

指定一个可选字符串,用于为字段名添加前缀。

无默认值

将此选项设置为一个逗号分隔的列表(无空格),其中包含要添加到简化 Kafka 消息的 value 元素中的元数据字段。当原始消息包含重复的字段名时,您可以通过提供结构名称和字段名称来识别要修改的特定字段,例如 source.ts_ms
可选地,您可以通过在列表中添加以下格式的条目来覆盖字段的原始名称并为其分配新名称:

<field_name>:<new_field_name>.

For example (例如:)

version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP

您指定的新名称值区分大小写。

当 SMT 将元数据字段添加到简化消息的 value 元素时,它会在每个元数据字段名之前加上双下划线。对于结构规范,SMT 还会在结构名称和字段名之间插入一个下划线。

如果您指定了一个在原始更改事件消息中不存在的字段,SMT 仍然会将指定的字段添加到修改后的消息的 value 元素中。

已知限制

  • 由于 MongoDB 是一个无模式数据库,为了在使用 Debezium 将更改流式传输到基于模式的数据关系数据库时确保一致的列定义,集合中的同名字段必须存储相同类型的数据。

  • 将 SMT 配置为生成与接收器连接器兼容格式的消息。如果接收器连接器需要“展平”消息结构,但它接收的消息将源 MongoDB 文档中的数组编码为 struct of structs,则接收器连接器无法处理该消息。