出库事件路由器
出站模式是一种安全可靠地在多个(微)服务之间交换数据的方式。出站模式实现可避免服务内部状态(通常持久化在其数据库中)与需要相同数据的服务所消费的事件状态之间出现不一致。
要实现 Debezium 应用程序中的出站模式,请配置 Debezium 连接器以
-
捕获出站表中的更改
-
应用 Debezium 出站事件路由器单一消息转换 (SMT)
配置为应用出站 SMT 的 Debezium 连接器应仅捕获出站表中的更改。有关更多信息,请参阅 选择性应用转换的选项。
仅当每个出站表具有相同的结构时,连接器才能捕获多个出站表中的更改。
请参阅 可靠的微服务数据交换与出站模式,了解出站模式的优势和工作原理。
有关可运行的示例,请参阅 Debezium 示例存储库中的 出站模式演示。它包含如何配置 Debezium 连接器以运行出站事件路由器 SMT 的示例。
|
出站事件路由器 SMT 与 MongoDB 连接器不兼容。 MongoDB 用户可以运行 MongoDB 出站事件路由器 SMT。 |
出站消息示例
要了解 Debezium 出站事件路由器 SMT 的配置方式,请查看以下 Debezium 出站消息示例
# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
"{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}
配置为应用出站事件路由器 SMT 的 Debezium 连接器通过转换如下所示的 Debezium 原始消息来生成上述消息
# Kafka Message key: "406c07f3-26f0-4eea-a50c-109940064b8f"
# Kafka Message Headers: ""
# Kafka Message Timestamp: 1556890294484
{
"before": null,
"after": {
"id": "406c07f3-26f0-4eea-a50c-109940064b8f",
"aggregateid": "1",
"aggregatetype": "Order",
"payload": "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}",
"timestamp": 1556890294344,
"type": "OrderCreated"
},
"source": {
"version": "3.3.1.Final",
"connector": "postgresql",
"name": "dbserver1-bare",
"db": "orderdb",
"ts_usec": 1556890294448870,
"txId": 584,
"lsn": 24064704,
"schema": "inventory",
"table": "outboxevent",
"snapshot": false,
"last_snapshot_record": null,
"xmin": null
},
"op": "c",
"ts_ms": 1556890294484,
"ts_us": 1556890294484651,
"ts_ns": 1556890294484651402
}
此 Debezium 出站消息示例基于 默认出站事件路由器配置,该配置假定出站表结构和基于聚合的事件路由。要自定义行为,出站事件路由器 SMT 提供了许多 配置选项。
基本出站表
要应用默认的出站事件路由器 SMT 配置,您的出站表应具有以下列
Column | Type | Modifiers
--------------+------------------------+-----------
id | uuid | not null
aggregatetype | character varying(255) | not null
aggregateid | character varying(255) | not null
type | character varying(255) | not null
payload | jsonb |
| Column | 效果 |
|---|---|
|
包含事件的唯一 ID。在出站消息中,此值为一个标头。例如,您可以使用此 ID 来删除重复消息。 |
包含一个值,SMT 将该值附加到连接器发出出站消息的主题名称。默认行为是,此值替换 |
|
|
包含事件键,它为有效负载提供一个 ID。SMT 将此值用作已发出出站消息中的键。这对于在 Kafka 分区中保持正确的顺序很重要。 |
|
出站更改事件的表示。默认结构为 JSON。默认情况下,Kafka 消息值仅由 |
其他自定义列 |
出站表中的任何其他列都可以 添加到出站事件中,无论是作为有效负载的一部分还是作为消息头。 |
基本配置
要配置 Debezium 连接器以支持出站模式,请配置 outbox.EventRouter SMT。要获得 SMT 的默认行为,请在不指定任何选项的情况下将其添加到连接器配置中,如下面的示例所示
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
连接器可能会发出许多类型的事件消息(例如,心跳消息、墓碑消息或关于事务或架构更改的元数据消息)。要仅将转换应用于源自出站表的事件,请定义 一个 SMT 谓词语句,该语句仅选择性地将转换应用于这些事件。
选择性应用转换的选项
除了 Debezium 连接器在数据库发生更改时发出的更改事件消息外,连接器还会发出其他类型的消息,包括心跳消息以及关于 schema 更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计用于处理的更改事件消息的结构不同,因此最好将连接器配置为选择性地应用 SMT,以便它仅处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为选择性应用 SMT:
-
为 SMT 使用
route.topic.regex配置选项。
有效负载序列化格式
出站事件路由器 SMT 支持任意有效负载格式。SMT 会在不修改的情况下传递它从出站表中读取的 payload 列值。SMT 如何将这些列值转换为 Kafka 消息字段取决于您如何配置 SMT。常见的有效负载格式包括 JSON 和 Avro。
使用 JSON 作为有效负载格式
出站事件路由器 SMT 的默认序列化格式是 JSON。要使用此格式,源列的数据类型必须是 JSON(例如,PostgreSQL 中的 jsonb)。
将转义的 JSON 字符串展开为 JSON
当 Debezium 出站消息将 payload 表示为 JSON 字符串时,生成的 Kafka 消息会像下面的示例一样对其进行转义
# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
"{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}
出站事件路由器使您能够将消息内容展开为“真实的” JSON,并从 JSON 文档推断出配套的模式。生成的 Kafka 消息格式如下
# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
"id": 1, "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123
}
要启用出站事件路由器转换的使用,请将 table.expand.json.payload 设置为 true,并使用 JsonConverter,如下面的示例所示
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.expand.json.payload=true
value.converter=org.apache.kafka.connect.json.JsonConverter
使用 Apache Avro 作为有效负载格式
Apache Avro 是一种常用的数据序列化框架。使用 Avro 可为消息格式治理和确保出站事件模式以向后兼容的方式演进带来好处。
源应用程序如何为出站消息有效负载生成 Avro 格式的内容超出了本文档的范围。一种可能性是利用 KafkaAvroSerializer 类来序列化 GenericRecord 实例。要确保 Kafka 消息值是精确的 Avro 二进制数据,请对连接器应用以下配置
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
默认情况下,payload 列值(Avro 数据)是唯一的消息值。当数据以 Avro 格式存储时,列格式必须设置为二进制数据类型,例如 PostgreSQL 中的 bytea。SMT 的值转换器也必须设置为 BinaryDataConverter,以便它将 payload 列的二进制值按原样传递到 Kafka 消息值中。
Debezium 连接器可以配置为发出心跳、事务元数据或架构更改事件(支持因连接器而异)。这些事件无法由 BinaryDataConverter 序列化,因此必须提供其他配置,以便转换器知道如何序列化这些事件。例如,以下配置说明了使用 Apache Kafka JsonConverter 且不带模式
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable=false
委托 Converter 实现由 delegate.converter.type 选项指定。如果转换器需要任何额外的配置选项,也可以指定它们,例如上面显示的禁用模式,使用 schemas.enable=false。
以下示例说明了如何配置 SMT 以使用带有 Apicurio Registry 的委托转换器将数据转换为 Avro 格式
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=io.apicurio.registry.utils.converter.AvroConverter
value.converter.delegate.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
value.converter.delegate.converter.apicurio.registry.auto-register=true
value.converter.delegate.converter.registry.find-latest=true
最后,以下示例说明了如何配置 SMT 以使用带有 Confluent Schema Registry 的委托转换器将数据转换为 Avro 格式
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=io.confluent.connect.avro.AvroConverter
value.converter.delegate.converter.type.basic.auth.credentials.source=USER_INFO
value.converter.delegate.converter.type.basic.auth.user.info={CREDENTIALS}
value.converter.delegate.converter.type.schema.registry.url={URL}
|
在前面的配置示例中,由于 |
发出带有附加字段的消息
您的出站表可能包含您希望添加到已发出出站消息中的值的列。例如,考虑一个出站表,其 aggregatetype 列的值为 purchase-order,另一个名为 eventType 的列,其可能的值为 order-created 和 order-shipped。可以使用 column:placement:alias 语法添加附加字段。
placement 的允许值为: - header - envelope - partition
要将 eventType 列的值发出到出站消息头中,请按以下方式配置 SMT
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=eventType:header:type
结果将是 Kafka 消息上的一个头,其键为 type,值为 eventType 列的值。
要将 eventType 列的值发出到出站消息信封中,请按以下方式配置 SMT
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=eventType:envelope:type
要控制出站消息在哪个分区上生成,请按以下方式配置 SMT
transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=partitionColumn:partition
请注意,对于 partition 放置,添加别名将无效。
配置选项
下表描述了可以为出站事件路由器 SMT 指定的选项。在表中,Group 列指示 Kafka 的配置选项分类。
| Option | Default (默认值) | 组 | 描述 |
|---|---|---|---|
|
表 |
确定 SMT 在出站表上发生
出站表中的所有更改都应为 |
|
|
表 |
指定包含唯一事件 ID 的出站表列。此 ID 将以 |
|
|
表 |
指定包含事件键的出站表列。当此列包含值时,SMT 将该值用作已发出出站消息中的键。这对于在 Kafka 分区中保持正确的顺序很重要。 |
|
表 |
默认情况下,已发出出站消息中的时间戳是 Debezium 事件时间戳。要使用不同的时间戳来生成出站消息,请将此选项设置为包含您希望在已发出出站消息中显示的时间戳的出站表列。 |
||
|
表 |
指定包含事件有效负载的出站表列。 |
|
|
表 |
指定是否应进行字符串有效负载的 JSON 展开。如果没有找到内容或出现解析错误,则内容保持“原样”。 |
|
|
表 |
当启用 JSON 展开属性
|
|
表、信封 |
指定一个或多个出站表列,您希望将它们添加到出站消息头或信封中。指定一个逗号分隔的对列表。在每对中,指定列的名称以及您是否希望值位于头中或信封中。用冒号分隔对中的值,例如
要为列指定别名,请指定一个三元组,其中别名是第三个值,例如
第二个值是放置位置,它必须始终是 配置示例在 在 Debezium 出站消息中发出附加字段 中。 |
||
|
表、信封 |
指定当 |
|
表、模式 |
设置后,此值将用作 Kafka Connect Schema Javadoc 中描述的模式版本。 |
||
|
路由器 |
指定出站表中的列名。默认行为是,此列中的值将成为连接器发出出站消息的主题名称的一部分。示例在 预期出站表描述 中。 |
|
|
路由器 |
指定一个正则表达式,出站 SMT 将该正则表达式应用于 RegexRouter 中的出站表记录。此正则表达式是 |
|
|
路由器 |
指定连接器发出出站消息的主题名称。默认主题名称是
|
|
|
路由器 |
指示空或 |
|
|
跟踪 |
包含跟踪跨度上下文的字段名称。 |
|
|
跟踪 |
表示 Debezium 处理跨度的操作名称。 |
|
|
跟踪 |
如果为 |
分布式跟踪
出站事件路由 SMT 支持分布式跟踪。有关更多详细信息,请参阅 跟踪文档。