MongoDB 出库事件路由器

此 SMT 仅供 Debezium MongoDB 连接器使用。有关关系数据库的 Outbox 事件路由器 SMT 的信息,请参阅 Outbox 事件路由器

出站模式是一种安全可靠地在多个(微)服务之间交换数据的方式。出站模式实现可避免服务内部状态(通常持久化在其数据库中)与需要相同数据的服务所消费的事件状态之间出现不一致。

要实现 Debezium 应用程序中的出站模式,请配置 Debezium 连接器以

  • 捕获 Outbox 集合中的更改

  • 应用 Debezium MongoDB Outbox 事件路由器单条消息转换 (SMT)

已配置为应用 MongoDB Outbox SMT 的 Debezium 连接器应仅捕获 Outbox 集合中发生的更改。有关更多信息,请参阅 选择性应用转换的选项

仅当每个 Outbox 集合具有相同的结构时,连接器才能捕获多个 Outbox 集合中的更改。

要使用此 SMT,对实际业务集合的操作以及向 Outbox 集合的插入操作必须作为多文档事务的一部分完成,这自 MongoDB 4.0 起就一直得到支持,以防止业务集合与 Outbox 集合之间出现潜在的数据不一致。对于未来的更新,为了在没有多文档事务的情况下启用 ACID 事务中的更新现有数据和插入 Outbox 事件,我们已计划支持其他配置,以便将 Outbox 事件存储为现有集合的子文档形式,而不是独立的 Outbox 集合。

有关 Outbox 模式的更多信息,请参阅 通过 Outbox 模式实现可靠的微服务数据交换

示例 Outbox 消息

要理解如何配置 Debezium MongoDB Outbox 事件路由器 SMT,请考虑以下 Debezium Outbox 消息示例

# Kafka Topic: outbox.event.order
# Kafka Message key: "b2730779e1f596e275826f08"
# Kafka Message Headers: "id=596e275826f08b2730779e1f"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

已配置为应用 MongoDB Outbox 事件路由器 SMT 的 Debezium 连接器会通过转换原始 Debezium 更改事件消息来生成上述消息,如下例所示

# Kafka Message key: { "id": "{\"$oid\": \"596e275826f08b2730779e1f\"}" }
# Kafka Message Headers: ""
# Kafka Message Timestamp: 1556890294484
{
  "patch": null,
  "after": "{\"_id\": {\"$oid\": \"596e275826f08b2730779e1f\"}, \"aggregateid\": {\"$oid\": \"b2730779e1f596e275826f08\"}, \"aggregatetype\": \"Order\", \"type\": \"OrderCreated\", \"payload\": {\"_id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}}",
  "source": {
    "version": "3.3.1.Final",
    "connector": "mongodb",
    "name": "fulfillment",
    "ts_ms": 1558965508000,
    "ts_us": 1558965508000000,
    "ts_ns": 1558965508000000000,
    "snapshot": false,
    "db": "inventory",
    "rs": "rs0",
    "collection": "customers",
    "ord": 31,
    "h": 1546547425148721999
  },
  "op": "c",
  "ts_ms": 1556890294484,
  "ts_us": 1556890294484452,
  "ts_ns": 1556890294484452697,
}

此 Debezium Outbox 消息示例基于 默认 Outbox 事件路由器配置,该配置假定 Outbox 集合结构和基于聚合体的事件路由。要自定义行为,Outbox 事件路由器 SMT 提供了许多 配置选项

基本的 Outbox 集合

要应用默认的 MongoDB Outbox 事件路由器 SMT 配置,您的 Outbox 集合应具有以下字段

{
  "_id": "objectId",
  "aggregatetype": "string",
  "aggregateid": "objectId",
  "type": "string",
  "payload": "object"
}
表 1. 预期 Outbox 集合字段的描述
Field (字段) 效果

id

包含事件的唯一 ID。在出站消息中,此值为一个标头。例如,您可以使用此 ID 来删除重复消息。

要从不同的 Outbox 集合字段获取事件的唯一 ID,请在连接器配置中设置 collection.field.event.id SMT 选项。

aggregatetype

包含一个值,SMT 将此值附加到连接器发出 Outbox 消息的主题名称中。默认行为是此值替换 route.topic.replacement SMT 选项中的默认 ${routedByValue} 变量。

例如,在默认配置中,route.by.field SMT 选项设置为 aggregatetype,而 route.topic.replacement SMT 选项设置为 outbox.event.${routedByValue}。假设您的应用程序向 Outbox 集合添加了两个文档。在第一个文档中,aggregatetype 字段的值为 customers。在第二个文档中,aggregatetype 字段的值为 orders。连接器将第一个文档发出到 outbox.event.customers 主题。连接器将第二个文档发出到 outbox.event.orders 主题。

要从不同的 Outbox 集合字段获取此值,请在连接器配置中设置 route.by.field SMT 选项。

aggregateid

包含事件键,它为有效负载提供一个 ID。SMT 将此值用作已发出出站消息中的键。这对于在 Kafka 分区中保持正确的顺序很重要。

要从不同的 Outbox 集合字段获取事件键,请在连接器配置中设置 collection.field.event.key SMT 选项。

payload

Outbox 更改事件的表示形式。默认结构是 JSON。默认情况下,Kafka 消息值仅包含 payload 值。但是,如果 Outbox 事件配置为包含附加字段,则 Kafka 消息值将包含一个封装了载荷和附加字段的信封,并且每个字段单独表示。有关更多信息,请参阅 发出包含附加字段的消息

要从不同的 Outbox 集合字段获取事件载荷,请在连接器配置中设置 collection.field.event.payload SMT 选项。

附加的自定义字段

Outbox 集合中的任何附加字段都可以 添加到 Outbox 事件 中,可放置在载荷部分内或作为消息头。

一个例子可能是 eventType 字段,它传达了一个用户定义的帮助分类或组织事件的值。

基本配置

要配置 Debezium MongoDB 连接器以支持 Outbox 模式,请配置 outbox.MongoEventRouter SMT。要获得 SMT 的默认行为,请在不指定任何选项的情况下将其添加到连接器配置中,如下例所示

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
自定义配置

连接器可能会发出许多类型的事件消息(例如,心跳消息、墓碑消息或事务的元数据消息)。要仅将转换应用于源自 Outbox 集合的事件,请定义 一个 SMT 谓词语句,该语句选择性地将转换应用于这些事件

选择性应用转换的选项

除了 Debezium 连接器在数据库发生更改时发出的更改事件消息外,连接器还会发出其他类型的消息,包括心跳消息以及关于 schema 更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计用于处理的更改事件消息的结构不同,因此最好将连接器配置为选择性地应用 SMT,以便它仅处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为选择性应用 SMT:

使用 Avro 作为载荷格式

MongoDB Outbox 事件路由器 SMT 支持任意载荷格式。Outbox 集合中的 payload 字段值将透明地传递。处理 JSON 的替代方法是使用 Avro。这对于消息格式治理以及确保 Outbox 事件模式以向后兼容的方式演进可能很有益。

源应用程序如何为出站消息有效负载生成 Avro 格式的内容超出了本文档的范围。一种可能性是利用 KafkaAvroSerializer 类来序列化 GenericRecord 实例。要确保 Kafka 消息值是精确的 Avro 二进制数据,请对连接器应用以下配置

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
value.converter=io.debezium.converters.ByteArrayConverter

默认情况下,payload 字段值(Avro 数据)是唯一的消息值。将 ByteArrayConverter 配置为值转换器会将 payload 字段值按原样传播到 Kafka 消息值中。

请注意,这与建议用于其他 SMT 的 BinaryDataConverter 不同。这是因为 MongoDB 在内部存储字节数组的方法不同。

Debezium 连接器可以配置为发出心跳、事务元数据或模式更改事件(支持因连接器而异)。这些事件无法被 ByteArrayConverter 序列化,因此必须提供额外的配置,以便转换器知道如何序列化这些事件。例如,以下配置说明了如何使用 Apache Kafka 的 JsonConverter,不带模式

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
value.converter=io.debezium.converters.ByteArrayConverter
value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable=false

委托 Converter 实现由 delegate.converter.type 选项指定。如果转换器需要任何额外的配置选项,也可以指定它们,例如上面显示的禁用模式,使用 schemas.enable=false

发出包含附加字段的消息

您的 Outbox 集合可能包含您希望添加到发出 Outbox 消息中的字段。例如,考虑一个 Outbox 集合,其 aggregatetype 字段的值为 purchase-order,还有一个 eventType 字段,其可能的值为 order-createdorder-shipped。可以使用 field:placement:alias 语法添加附加字段。

placement 的允许值为: - header - envelope - partition

要将 eventType 字段的值发出到 Outbox 消息头中,请按以下方式配置 SMT

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.collection.fields.additional.placement=eventType:header:type

结果将是 Kafka 消息上的一个头,其键为 type,值为 eventType 字段的值。

要将 eventType 字段的值发出到 Outbox 消息信封中,请按以下方式配置 SMT

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.collection.fields.additional.placement=eventType:envelope:type

要控制出站消息在哪个分区上生成,请按以下方式配置 SMT

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.collection.fields.additional.placement=partitionField:partition

请注意,对于 partition 放置,添加别名将无效。

将转义的 JSON 字符串展开为 JSON

默认情况下,Debezium Outbox 消息的 payload 表示为字符串。当字符串的原始来源是 JSON 格式时,生成的 Kafka 消息使用转义序列表示字符串,如下例所示

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=596e275826f08b2730779e1f"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

您可以将 Outbox 事件路由器配置为展开消息内容,将转义的 JSON 转换回其原始的、未转义的 JSON 格式。在转换后的字符串中,伴随的模式是从原始 JSON 文档推断出来的。以下示例显示了生成的 Kafka 消息中展开的 JSON

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=596e275826f08b2730779e1f"
# Kafka Message Timestamp: 1556890294484
{
  "id": "da8d6de63b7745ff8f4457db", "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123
}

要在此转换中启用字符串转换,请将 collection.expand.json.payload 的值设置为 true,并将 StringConverter 用作如下示例

transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
transforms.outbox.collection.expand.json.payload=true
value.converter=org.apache.kafka.connect.storage.StringConverter

配置选项

下表描述了可以为出站事件路由器 SMT 指定的选项。在表中,Group 列指示 Kafka 的配置选项分类。

表 2. 出站事件路由器 SMT 配置选项描述
Option Default (默认值) 描述

warn

集合

确定 SMT 在 Outbox 集合上发生更新操作时的行为。可能的设置是

  • warn - SMT 记录一个警告并继续处理下一个 Outbox 集合文档。

  • error - SMT 记录一个错误并继续处理下一个 Outbox 集合文档。

  • fatal - SMT 记录错误,并且连接器停止处理。

Outbox 集合中的所有更改都应为插入或删除操作。也就是说,Outbox 集合充当队列;不允许更新 Outbox 集合中的文档。SMT 会自动过滤掉 Outbox 集合上的删除操作(用于删除已处理的 Outbox 事件)。

_id

集合

指定包含唯一事件 ID 的 Outbox 集合字段。此 ID 将以 id 键的形式存储在发出的事件的头中。

aggregateid

集合

指定包含事件键的 Outbox 集合字段。当此字段包含值时,SMT 将该值用作发出的 Outbox 消息的键。这对于在 Kafka 分区中保持正确的顺序很重要。

集合

默认情况下,发出的 Outbox 消息中的时间戳是 Debezium 事件时间戳。要使用不同的时间戳在 Outbox 消息中,请将此选项设置为包含您希望出现在发出的 Outbox 消息中的时间戳的 Outbox 集合字段。

payload

集合

指定包含事件载荷的 Outbox 集合字段。

false

集合

指定是否应进行字符串有效负载的 JSON 展开。如果没有找到内容或出现解析错误,则内容保持“原样”。

有关更多详细信息,请参阅 展开转义的 JSON 部分。

集合、信封

指定一个或多个您希望添加到 Outbox 消息头或信封中的 Outbox 集合字段。指定一对逗号分隔的列表。在每对中,指定字段的名称以及您是否希望该值出现在头中或信封中。用冒号分隔对中的值,例如

id:header,my-field:envelope

要为字段指定别名,请指定一个三元组,其中别名是第三个值,例如

id:header,my-field:envelope:my-alias

第二个值是放置位置,它必须始终是 headerenvelope

集合、模式

设置后,此值将用作 Kafka Connect Schema Javadoc 中描述的模式版本。

aggregatetype

路由器

指定 Outbox 集合中的字段名称。默认情况下,此字段中指定的值将成为连接器将 Outbox 消息发出的主题名称的一部分。有关示例,请参阅 预期 Outbox 集合的描述

(?<routedByValue>.*)

路由器

指定一个正则表达式,Outbox SMT 在 RegexRouter 中将其应用于 Outbox 集合文档。此正则表达式是 route.topic.replacement SMT 选项设置的一部分。

当此属性设置为默认的 Router 值时,SMT 会将 route.topic.replacement 属性中设置的默认 ${routedByValue} 变量替换为 route.by.field 属性指定的值。

outbox.event​.${routedByValue}

路由器

指定连接器发出 Outbox 消息的主题名称。默认主题名称以字符串 outbox.event. 为前缀,后跟分配给 Outbox 集合文档的 aggregatetype 字段的值。

例如,如果 aggregatetype 的值为 customers,则连接器会将 Outbox 消息发出到 outbox.event.customers 主题。

要更改目标主题的默认名称,请执行以下操作之一

false

路由器

指示空或 null 的有效负载是否会导致连接器发出墓碑事件。

tracingspancontext

跟踪

包含跟踪跨度上下文的字段名称。

debezium-read

跟踪

表示 Debezium 处理跨度的操作名称。

false

跟踪

如果为 true,则仅跟踪具有序列化上下文字段的事件。

分布式跟踪

出站事件路由 SMT 支持分布式跟踪。有关更多详细信息,请参阅 跟踪文档