出库事件路由器
出站模式是一种安全可靠地在多个(微)服务之间交换数据的方式。出站模式实现可避免服务内部状态(通常持久化在其数据库中)与需要相同数据的服务所消费的事件状态之间出现不一致。
要实现 Debezium 应用程序中的出站模式,请配置 Debezium 连接器以
-
捕获 outbox 表中的更改
-
应用 Debezium outbox 事件路由器单个消息转换 (SMT)
配置为应用 outbox SMT 的 Debezium 连接器应仅捕获 outbox 表中发生的更改。有关更多信息,请参阅 选择性应用转换的选项。
仅当每个 outbox 表具有相同的结构时,连接器才能捕获多个 outbox 表中的更改。
请参阅 使用 Outbox 模式可靠地交换微服务数据,了解 outbox 模式的用途及其工作原理。
有关可运行的示例,请参阅 Debezium 示例存储库中的 outbox 模式演示。它包括一个有关如何配置 Debezium 连接器以运行 outbox 事件路由器 SMT 的示例。
|
Outbox 事件路由器 SMT 与 MongoDB 连接器不兼容。 MongoDB 用户可以运行 MongoDB outbox 事件路由器 SMT。 |
示例 outbox 消息
要理解 Debezium outbox 事件路由器 SMT 的配置方式,请查看以下 Debezium outbox 消息示例
# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
"{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}
配置为应用 outbox 事件路由器 SMT 的 Debezium 连接器通过转换如下所示的 Debezium 原始消息来生成上述消息
# Kafka Message key: "406c07f3-26f0-4eea-a50c-109940064b8f"
# Kafka Message Headers: ""
# Kafka Message Timestamp: 1556890294484
{
"before": null,
"after": {
"id": "406c07f3-26f0-4eea-a50c-109940064b8f",
"aggregateid": "1",
"aggregatetype": "Order",
"payload": "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}",
"timestamp": 1556890294344,
"type": "OrderCreated"
},
"source": {
"version": "3.3.0.Final",
"connector": "postgresql",
"name": "dbserver1-bare",
"db": "orderdb",
"ts_usec": 1556890294448870,
"txId": 584,
"lsn": 24064704,
"schema": "inventory",
"table": "outboxevent",
"snapshot": false,
"last_snapshot_record": null,
"xmin": null
},
"op": "c",
"ts_ms": 1556890294484,
"ts_us": 1556890294484651,
"ts_ns": 1556890294484651402
}
此 Debezium outbox 消息示例基于 默认 outbox 事件路由器配置,该配置假定 outbox 表结构和基于聚合的事件路由。要自定义行为,outbox 事件路由器 SMT 提供了许多 配置选项。
基本的 outbox 表
要应用默认的 outbox 事件路由器 SMT 配置,假定您的 outbox 表具有以下列
Column | Type | Modifiers
--------------+------------------------+-----------
id | uuid | not null
aggregatetype | character varying(255) | not null
aggregateid | character varying(255) | not null
type | character varying(255) | not null
payload | jsonb |
| Column | 效果 |
|---|---|
|
包含事件的唯一 ID。在出站消息中,此值为一个标头。例如,您可以使用此 ID 来删除重复消息。 |
包含一个 SMT 将其附加到连接器向其发出 outbox 消息的主题名称的值。默认行为是此值替换 |
|
|
包含事件键,它为有效负载提供一个 ID。SMT 将此值用作已发出出站消息中的键。这对于在 Kafka 分区中保持正确的顺序很重要。 |
|
outbox 更改事件的表示形式。默认结构是 JSON。默认情况下,Kafka 消息值仅由 |
其他自定义列 |
outbox 表中的任何其他列都可以 添加到 outbox 事件中,要么放在 payload 部分内,要么作为消息头。 |
基本配置
要配置 Debezium 连接器以支持 outbox 模式,请配置 outbox.EventRouter SMT。要获得 SMT 的默认行为,请在不指定任何选项的情况下将其添加到连接器配置中,如下例所示
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
连接器可能会发出许多类型的事件消息(例如,心跳消息、墓碑消息或有关事务或模式更改的元数据消息)。要仅将转换应用于源自 outbox 表的事件,请 定义一个 SMT 谓词语句,仅选择性地将转换应用于这些事件。
选择性应用转换的选项
除了 Debezium 连接器在数据库发生更改时发出的更改事件消息外,连接器还会发出其他类型的消息,包括心跳消息以及关于 schema 更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计用于处理的更改事件消息的结构不同,因此最好将连接器配置为选择性地应用 SMT,以便它仅处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为选择性应用 SMT:
-
在 SMT 中使用
route.topic.regex配置选项。
Payload 序列化格式
Outbox 事件路由器 SMT 支持任意 payload 格式。SMT 会在不修改的情况下传递它从 outbox 表读取的 payload 列值。SMT 将这些列值转换为 Kafka 消息字段的方式取决于您如何配置 SMT。常见的 payload 格式是 JSON 和 Avro。
将 JSON 用作 Payload 格式
Outbox 事件路由器 SMT 的默认序列化格式是 JSON。要使用此格式,源列的数据类型必须是 JSON(例如,PostgreSQL 中的 jsonb)。
将转义的 JSON 字符串展开为 JSON
当 Debezium outbox 消息将 payload 表示为 JSON 字符串时,生成的 Kafka 消息会像以下示例一样转义该字符串
# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
"{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}
outbox 事件路由器使您能够将消息内容展开为“实际”JSON,并从 JSON 文档推断出伴随的模式。生成的 Kafka 消息的格式如下所示
# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
"id": 1, "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123
}
要启用 outbox 事件路由器转换的使用,请将 table.expand.json.payload 设置为 true,并使用 JsonConverter,如下例所示
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.expand.json.payload=true
value.converter=org.apache.kafka.connect.json.JsonConverter
将 Apache Avro 用作 Payload 格式
Apache Avro 是序列化数据的常用框架。使用 Avro 有利于消息格式治理,并确保 outbox 事件模式以向后兼容的方式演进。
源应用程序如何为出站消息有效负载生成 Avro 格式的内容超出了本文档的范围。一种可能性是利用 KafkaAvroSerializer 类来序列化 GenericRecord 实例。要确保 Kafka 消息值是精确的 Avro 二进制数据,请对连接器应用以下配置
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
默认情况下,payload 列值(Avro 数据)是唯一的消息值。当数据以 Avro 格式存储时,列格式必须设置为二进制数据类型,例如 PostgreSQL 中的 bytea。SMT 的值转换器也必须设置为 BinaryDataConverter,以便它将 payload 列的二进制值原样传播到 Kafka 消息值中。
Debezium 连接器可以配置为发出心跳、事务元数据或模式更改事件(支持情况因连接器而异)。这些事件无法由 BinaryDataConverter 序列化,因此必须提供其他配置,以便转换器知道如何序列化这些事件。例如,以下配置说明了如何使用 Apache Kafka JsonConverter(不带模式)
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable=false
委托 Converter 实现由 delegate.converter.type 选项指定。如果转换器需要任何额外的配置选项,也可以指定它们,例如上面显示的禁用模式,使用 schemas.enable=false。
以下示例说明了如何配置 SMT 以使用带 Apicurio Registry 的委托转换器将数据转换为 Avro 格式
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=io.apicurio.registry.utils.converter.AvroConverter
value.converter.delegate.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
value.converter.delegate.converter.apicurio.registry.auto-register=true
value.converter.delegate.converter.registry.find-latest=true
最后,以下示例说明了如何配置 SMT 以使用带 Confluent Schema Registry 的委托转换器将数据转换为 Avro 格式
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=io.confluent.connect.avro.AvroConverter
value.converter.delegate.converter.type.basic.auth.credentials.source=USER_INFO
value.converter.delegate.converter.type.basic.auth.user.info={CREDENTIALS}
value.converter.delegate.converter.type.schema.registry.url={URL}
|
在前面的配置示例中,由于 |
发出带有其他字段的消息
您的 outbox 表可能包含您希望添加到发出的 outbox 消息中的列值。例如,考虑一个 outbox 表,其中 aggregatetype 列具有 purchase-order 的值,另一个名为 eventType 的列,其可能值为 order-created 和 order-shipped。可以使用 column:placement:alias 语法添加其他字段。
placement 的允许值为: - header - envelope - partition
要将 eventType 列值发出到 outbox 消息头,请像这样配置 SMT
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=eventType:header:type
结果将是 Kafka 消息上的一个头,其键为 type,值为 eventType 列的值。
要将 eventType 列值发出到 outbox 消息信封,请像这样配置 SMT
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=eventType:envelope:type
要控制出站消息在哪个分区上生成,请按以下方式配置 SMT
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=partitionColumn:partition
请注意,对于 partition 放置,添加别名将无效。
配置选项
下表描述了可以为出站事件路由器 SMT 指定的选项。在表中,Group 列指示 Kafka 的配置选项分类。
| Option | Default (默认值) | 组 | 描述 |
|---|---|---|---|
|
表 |
确定 SMT 在 outbox 表上执行
outbox 表中的所有更改都应为 |
|
|
表 |
指定包含唯一事件 ID 的 outbox 表列。此 ID 将存储在发出的事件的头信息中,键为 |
|
|
表 |
指定包含事件键的 outbox 表列。当此列包含值时,SMT 使用该值作为发出的 outbox 消息的键。这对于在 Kafka 分区中维护正确的顺序非常重要。 |
|
表 |
默认情况下,发出的 outbox 消息中的时间戳是 Debezium 事件时间戳。要使用不同的时间戳在 outbox 消息中,请将此选项设置为包含您希望在发出的 outbox 消息中显示的时间戳的 outbox 表列。 |
||
|
表 |
指定包含事件 payload 的 outbox 表列。 |
|
|
表 |
指定是否应进行字符串有效负载的 JSON 展开。如果没有找到内容或出现解析错误,则内容保持“原样”。 |
|
|
表 |
当启用 JSON 展开属性
|
|
表、信封 |
指定一个或多个您想添加到 outbox 消息头或信封中的 outbox 表列。指定一个逗号分隔的对列表。在每一对中,指定列的名称以及您是否希望值在头信息或信封中。用冒号分隔对中的值,例如
要为列指定别名,请指定一个三元组,别名作为第三个值,例如
第二个值是放置位置,它必须始终是 配置示例在 在 Debezium outbox 消息中发出其他字段 中。 |
||
|
表、信封 |
指定此转换是否在 Outbox payload 中找不到 |
|
表、模式 |
设置后,此值将用作 Kafka Connect Schema Javadoc 中描述的模式版本。 |
||
|
路由器 |
指定 outbox 表中一列的名称。默认行为是将此列中的值作为连接器向其发出 outbox 消息的主题名称的一部分。示例在 预期 outbox 表的说明 中。 |
|
|
路由器 |
指定一个正则表达式,outbox SMT 在 RegexRouter 中应用于 outbox 表记录。此正则表达式是 |
|
|
路由器 |
指定连接器发出 outbox 消息的主题名称。默认主题名称是
|
|
|
路由器 |
指示空或 |
|
|
跟踪 |
包含跟踪跨度上下文的字段名称。 |
|
|
跟踪 |
表示 Debezium 处理跨度的操作名称。 |
|
|
跟踪 |
如果为 |
分布式跟踪
出站事件路由 SMT 支持分布式跟踪。有关更多详细信息,请参阅 跟踪文档。