MongoDB 出库事件路由器
|
此 SMT 仅供 Debezium MongoDB 连接器使用。有关关系数据库的 Outbox 事件路由器 SMT 的信息,请参阅 Outbox 事件路由器。 |
出站模式是一种安全可靠地在多个(微)服务之间交换数据的方式。出站模式实现可避免服务内部状态(通常持久化在其数据库中)与需要相同数据的服务所消费的事件状态之间出现不一致。
要实现 Debezium 应用程序中的出站模式,请配置 Debezium 连接器以
-
捕获 Outbox 集合中的更改
-
应用 Debezium MongoDB Outbox 事件路由器单条消息转换 (SMT)
已配置为应用 MongoDB Outbox SMT 的 Debezium 连接器应仅捕获 Outbox 集合中发生的更改。有关更多信息,请参阅 选择性应用转换的选项。
仅当每个 Outbox 集合具有相同的结构时,连接器才能捕获多个 Outbox 集合中的更改。
|
要使用此 SMT,对实际业务集合的操作以及向 Outbox 集合的插入操作必须作为多文档事务的一部分完成,这自 MongoDB 4.0 起就一直得到支持,以防止业务集合与 Outbox 集合之间出现潜在的数据不一致。对于未来的更新,为了在没有多文档事务的情况下启用 ACID 事务中的更新现有数据和插入 Outbox 事件,我们已计划支持其他配置,以便将 Outbox 事件存储为现有集合的子文档形式,而不是独立的 Outbox 集合。 |
有关 Outbox 模式的更多信息,请参阅 通过 Outbox 模式实现可靠的微服务数据交换。
示例 Outbox 消息
要理解如何配置 Debezium MongoDB Outbox 事件路由器 SMT,请考虑以下 Debezium Outbox 消息示例
# Kafka Topic: outbox.event.order
# Kafka Message key: "b2730779e1f596e275826f08"
# Kafka Message Headers: "id=596e275826f08b2730779e1f"
# Kafka Message Timestamp: 1556890294484
{
"{\"id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}
已配置为应用 MongoDB Outbox 事件路由器 SMT 的 Debezium 连接器会通过转换原始 Debezium 更改事件消息来生成上述消息,如下例所示
# Kafka Message key: { "id": "{\"$oid\": \"596e275826f08b2730779e1f\"}" }
# Kafka Message Headers: ""
# Kafka Message Timestamp: 1556890294484
{
"patch": null,
"after": "{\"_id\": {\"$oid\": \"596e275826f08b2730779e1f\"}, \"aggregateid\": {\"$oid\": \"b2730779e1f596e275826f08\"}, \"aggregatetype\": \"Order\", \"type\": \"OrderCreated\", \"payload\": {\"_id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}}",
"source": {
"version": "3.3.1.Final",
"connector": "mongodb",
"name": "fulfillment",
"ts_ms": 1558965508000,
"ts_us": 1558965508000000,
"ts_ns": 1558965508000000000,
"snapshot": false,
"db": "inventory",
"rs": "rs0",
"collection": "customers",
"ord": 31,
"h": 1546547425148721999
},
"op": "c",
"ts_ms": 1556890294484,
"ts_us": 1556890294484452,
"ts_ns": 1556890294484452697,
}
此 Debezium Outbox 消息示例基于 默认 Outbox 事件路由器配置,该配置假定 Outbox 集合结构和基于聚合体的事件路由。要自定义行为,Outbox 事件路由器 SMT 提供了许多 配置选项。
基本的 Outbox 集合
要应用默认的 MongoDB Outbox 事件路由器 SMT 配置,您的 Outbox 集合应具有以下字段
{
"_id": "objectId",
"aggregatetype": "string",
"aggregateid": "objectId",
"type": "string",
"payload": "object"
}
| Field (字段) | 效果 |
|---|---|
|
包含事件的唯一 ID。在出站消息中,此值为一个标头。例如,您可以使用此 ID 来删除重复消息。 |
包含一个值,SMT 将此值附加到连接器发出 Outbox 消息的主题名称中。默认行为是此值替换 |
|
|
包含事件键,它为有效负载提供一个 ID。SMT 将此值用作已发出出站消息中的键。这对于在 Kafka 分区中保持正确的顺序很重要。 |
|
Outbox 更改事件的表示形式。默认结构是 JSON。默认情况下,Kafka 消息值仅包含 |
附加的自定义字段 |
Outbox 集合中的任何附加字段都可以 添加到 Outbox 事件 中,可放置在载荷部分内或作为消息头。 |
基本配置
要配置 Debezium MongoDB 连接器以支持 Outbox 模式,请配置 outbox.MongoEventRouter SMT。要获得 SMT 的默认行为,请在不指定任何选项的情况下将其添加到连接器配置中,如下例所示
transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
连接器可能会发出许多类型的事件消息(例如,心跳消息、墓碑消息或事务的元数据消息)。要仅将转换应用于源自 Outbox 集合的事件,请定义 一个 SMT 谓词语句,该语句选择性地将转换应用于这些事件。
选择性应用转换的选项
除了 Debezium 连接器在数据库发生更改时发出的更改事件消息外,连接器还会发出其他类型的消息,包括心跳消息以及关于 schema 更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计用于处理的更改事件消息的结构不同,因此最好将连接器配置为选择性地应用 SMT,以便它仅处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为选择性应用 SMT:
-
使用 SMT 的
route.topic.regex配置选项。
使用 Avro 作为载荷格式
MongoDB Outbox 事件路由器 SMT 支持任意载荷格式。Outbox 集合中的 payload 字段值将透明地传递。处理 JSON 的替代方法是使用 Avro。这对于消息格式治理以及确保 Outbox 事件模式以向后兼容的方式演进可能很有益。
源应用程序如何为出站消息有效负载生成 Avro 格式的内容超出了本文档的范围。一种可能性是利用 KafkaAvroSerializer 类来序列化 GenericRecord 实例。要确保 Kafka 消息值是精确的 Avro 二进制数据,请对连接器应用以下配置
transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
value.converter=io.debezium.converters.ByteArrayConverter
默认情况下,payload 字段值(Avro 数据)是唯一的消息值。将 ByteArrayConverter 配置为值转换器会将 payload 字段值按原样传播到 Kafka 消息值中。
请注意,这与建议用于其他 SMT 的 BinaryDataConverter 不同。这是因为 MongoDB 在内部存储字节数组的方法不同。
Debezium 连接器可以配置为发出心跳、事务元数据或模式更改事件(支持因连接器而异)。这些事件无法被 ByteArrayConverter 序列化,因此必须提供额外的配置,以便转换器知道如何序列化这些事件。例如,以下配置说明了如何使用 Apache Kafka 的 JsonConverter,不带模式
transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
value.converter=io.debezium.converters.ByteArrayConverter
value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable=false
委托 Converter 实现由 delegate.converter.type 选项指定。如果转换器需要任何额外的配置选项,也可以指定它们,例如上面显示的禁用模式,使用 schemas.enable=false。
发出包含附加字段的消息
您的 Outbox 集合可能包含您希望添加到发出 Outbox 消息中的字段。例如,考虑一个 Outbox 集合,其 aggregatetype 字段的值为 purchase-order,还有一个 eventType 字段,其可能的值为 order-created 和 order-shipped。可以使用 field:placement:alias 语法添加附加字段。
placement 的允许值为: - header - envelope - partition
要将 eventType 字段的值发出到 Outbox 消息头中,请按以下方式配置 SMT
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.collection.fields.additional.placement=eventType:header:type
结果将是 Kafka 消息上的一个头,其键为 type,值为 eventType 字段的值。
要将 eventType 字段的值发出到 Outbox 消息信封中,请按以下方式配置 SMT
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.collection.fields.additional.placement=eventType:envelope:type
要控制出站消息在哪个分区上生成,请按以下方式配置 SMT
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.collection.fields.additional.placement=partitionField:partition
请注意,对于 partition 放置,添加别名将无效。
将转义的 JSON 字符串展开为 JSON
默认情况下,Debezium Outbox 消息的 payload 表示为字符串。当字符串的原始来源是 JSON 格式时,生成的 Kafka 消息使用转义序列表示字符串,如下例所示
# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=596e275826f08b2730779e1f"
# Kafka Message Timestamp: 1556890294484
{
"{\"id\": {\"$oid\": \"da8d6de63b7745ff8f4457db\"}, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}
您可以将 Outbox 事件路由器配置为展开消息内容,将转义的 JSON 转换回其原始的、未转义的 JSON 格式。在转换后的字符串中,伴随的模式是从原始 JSON 文档推断出来的。以下示例显示了生成的 Kafka 消息中展开的 JSON
# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=596e275826f08b2730779e1f"
# Kafka Message Timestamp: 1556890294484
{
"id": "da8d6de63b7745ff8f4457db", "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123
}
要在此转换中启用字符串转换,请将 collection.expand.json.payload 的值设置为 true,并将 StringConverter 用作如下示例
transforms=outbox,...
transforms.outbox.type=io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter
transforms.outbox.collection.expand.json.payload=true
value.converter=org.apache.kafka.connect.storage.StringConverter
配置选项
下表描述了可以为出站事件路由器 SMT 指定的选项。在表中,Group 列指示 Kafka 的配置选项分类。
| Option | Default (默认值) | 组 | 描述 |
|---|---|---|---|
|
集合 |
确定 SMT 在 Outbox 集合上发生更新操作时的行为。可能的设置是
Outbox 集合中的所有更改都应为插入或删除操作。也就是说,Outbox 集合充当队列;不允许更新 Outbox 集合中的文档。SMT 会自动过滤掉 Outbox 集合上的删除操作(用于删除已处理的 Outbox 事件)。 |
|
|
集合 |
指定包含唯一事件 ID 的 Outbox 集合字段。此 ID 将以 |
|
|
集合 |
指定包含事件键的 Outbox 集合字段。当此字段包含值时,SMT 将该值用作发出的 Outbox 消息的键。这对于在 Kafka 分区中保持正确的顺序很重要。 |
|
集合 |
默认情况下,发出的 Outbox 消息中的时间戳是 Debezium 事件时间戳。要使用不同的时间戳在 Outbox 消息中,请将此选项设置为包含您希望出现在发出的 Outbox 消息中的时间戳的 Outbox 集合字段。 |
||
|
集合 |
指定包含事件载荷的 Outbox 集合字段。 |
|
|
集合 |
指定是否应进行字符串有效负载的 JSON 展开。如果没有找到内容或出现解析错误,则内容保持“原样”。 |
|
集合、信封 |
指定一个或多个您希望添加到 Outbox 消息头或信封中的 Outbox 集合字段。指定一对逗号分隔的列表。在每对中,指定字段的名称以及您是否希望该值出现在头中或信封中。用冒号分隔对中的值,例如
要为字段指定别名,请指定一个三元组,其中别名是第三个值,例如
第二个值是放置位置,它必须始终是 配置示例在 在 Debezium Outbox 消息中发出附加字段 中。 |
||
集合、模式 |
设置后,此值将用作 Kafka Connect Schema Javadoc 中描述的模式版本。 |
||
|
路由器 |
指定 Outbox 集合中的字段名称。默认情况下,此字段中指定的值将成为连接器将 Outbox 消息发出的主题名称的一部分。有关示例,请参阅 预期 Outbox 集合的描述。 |
|
|
路由器 |
指定一个正则表达式,Outbox SMT 在 RegexRouter 中将其应用于 Outbox 集合文档。此正则表达式是 |
|
|
路由器 |
指定连接器发出 Outbox 消息的主题名称。默认主题名称以字符串
|
|
|
路由器 |
指示空或 |
|
|
跟踪 |
包含跟踪跨度上下文的字段名称。 |
|
|
跟踪 |
表示 Debezium 处理跨度的操作名称。 |
|
|
跟踪 |
如果为 |
分布式跟踪
出站事件路由 SMT 支持分布式跟踪。有关更多详细信息,请参阅 跟踪文档。