出库事件路由器

出站模式是一种安全可靠地在多个(微)服务之间交换数据的方式。出站模式实现可避免服务内部状态(通常持久化在其数据库中)与需要相同数据的服务所消费的事件状态之间出现不一致。

要实现 Debezium 应用程序中的出站模式,请配置 Debezium 连接器以

  • 捕获出站表中的更改

  • 应用 Debezium 出站事件路由器单一消息转换 (SMT)

配置为应用出站 SMT 的 Debezium 连接器应仅捕获出站表中的更改。有关更多信息,请参阅 选择性应用转换的选项

仅当每个出站表具有相同的结构时,连接器才能捕获多个出站表中的更改。

请参阅 可靠的微服务数据交换与出站模式,了解出站模式的优势和工作原理。

有关可运行的示例,请参阅 Debezium 示例存储库中的 出站模式演示。它包含如何配置 Debezium 连接器以运行出站事件路由器 SMT 的示例。

出站事件路由器 SMT 与 MongoDB 连接器不兼容。

MongoDB 用户可以运行 MongoDB 出站事件路由器 SMT

出站消息示例

要了解 Debezium 出站事件路由器 SMT 的配置方式,请查看以下 Debezium 出站消息示例

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

配置为应用出站事件路由器 SMT 的 Debezium 连接器通过转换如下所示的 Debezium 原始消息来生成上述消息

# Kafka Message key: "406c07f3-26f0-4eea-a50c-109940064b8f"
# Kafka Message Headers: ""
# Kafka Message Timestamp: 1556890294484
{
  "before": null,
  "after": {
    "id": "406c07f3-26f0-4eea-a50c-109940064b8f",
    "aggregateid": "1",
    "aggregatetype": "Order",
    "payload": "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}",
    "timestamp": 1556890294344,
    "type": "OrderCreated"
  },
  "source": {
    "version": "3.3.1.Final",
    "connector": "postgresql",
    "name": "dbserver1-bare",
    "db": "orderdb",
    "ts_usec": 1556890294448870,
    "txId": 584,
    "lsn": 24064704,
    "schema": "inventory",
    "table": "outboxevent",
    "snapshot": false,
    "last_snapshot_record": null,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1556890294484,
  "ts_us": 1556890294484651,
  "ts_ns": 1556890294484651402
}

此 Debezium 出站消息示例基于 默认出站事件路由器配置,该配置假定出站表结构和基于聚合的事件路由。要自定义行为,出站事件路由器 SMT 提供了许多 配置选项

基本出站表

要应用默认的出站事件路由器 SMT 配置,您的出站表应具有以下列

Column        |          Type          | Modifiers
--------------+------------------------+-----------
id            | uuid                   | not null
aggregatetype | character varying(255) | not null
aggregateid   | character varying(255) | not null
type          | character varying(255) | not null
payload       | jsonb                  |
表 1. 预期出站表列的描述
Column 效果

id

包含事件的唯一 ID。在出站消息中,此值为一个标头。例如,您可以使用此 ID 来删除重复消息。

要从另一个出站表列获取事件的唯一 ID,请在连接器配置中设置 table.field.event.id SMT 选项。

aggregatetype

包含一个值,SMT 将该值附加到连接器发出出站消息的主题名称。默认行为是,此值替换 route.topic.replacement SMT 选项中的默认 ${routedByValue} 变量。

例如,在默认配置中,route.by.field SMT 选项设置为 aggregatetype,而 route.topic.replacement SMT 选项设置为 outbox.event.${routedByValue}。假设您的应用程序向出站表添加了两个记录。在第一条记录中,aggregatetype 列的值为 customers。在第二条记录中,aggregatetype 列的值为 orders。连接器将第一条记录发出到 outbox.event.customers 主题。连接器将第二条记录发出到 outbox.event.orders 主题。

要从另一个出站表列获取此值,请在连接器配置中设置 route.by.field SMT 选项。

aggregateid

包含事件键,它为有效负载提供一个 ID。SMT 将此值用作已发出出站消息中的键。这对于在 Kafka 分区中保持正确的顺序很重要。

要从另一个出站表列获取事件键,请在连接器配置中设置 table.field.event.key SMT 选项

payload

出站更改事件的表示。默认结构为 JSON。默认情况下,Kafka 消息值仅由 payload 值组成。但是,如果出站事件配置为包含附加字段,则 Kafka 消息值将包含一个封装有效负载和附加字段的信封,并且每个字段都单独表示。有关更多信息,请参阅 发出带有附加字段的消息

要从另一个出站表列获取事件有效负载,请在连接器配置中设置 table.field.event.payload SMT 选项。

其他自定义列

出站表中的任何其他列都可以 添加到出站事件中,无论是作为有效负载的一部分还是作为消息头。

一个例子可能是一个名为 eventType 的列,它传递一个用户定义的、有助于对事件进行分类或组织的值。

基本配置

要配置 Debezium 连接器以支持出站模式,请配置 outbox.EventRouter SMT。要获得 SMT 的默认行为,请在不指定任何选项的情况下将其添加到连接器配置中,如下面的示例所示

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
自定义配置

连接器可能会发出许多类型的事件消息(例如,心跳消息、墓碑消息或关于事务或架构更改的元数据消息)。要仅将转换应用于源自出站表的事件,请定义 一个 SMT 谓词语句,该语句仅选择性地将转换应用于这些事件

选择性应用转换的选项

除了 Debezium 连接器在数据库发生更改时发出的更改事件消息外,连接器还会发出其他类型的消息,包括心跳消息以及关于 schema 更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计用于处理的更改事件消息的结构不同,因此最好将连接器配置为选择性地应用 SMT,以便它仅处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为选择性应用 SMT:

有效负载序列化格式

出站事件路由器 SMT 支持任意有效负载格式。SMT 会在不修改的情况下传递它从出站表中读取的 payload 列值。SMT 如何将这些列值转换为 Kafka 消息字段取决于您如何配置 SMT。常见的有效负载格式包括 JSON 和 Avro。

使用 JSON 作为有效负载格式

出站事件路由器 SMT 的默认序列化格式是 JSON。要使用此格式,源列的数据类型必须是 JSON(例如,PostgreSQL 中的 jsonb)。

将转义的 JSON 字符串展开为 JSON

当 Debezium 出站消息将 payload 表示为 JSON 字符串时,生成的 Kafka 消息会像下面的示例一样对其进行转义

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

出站事件路由器使您能够将消息内容展开为“真实的” JSON,并从 JSON 文档推断出配套的模式。生成的 Kafka 消息格式如下

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "id": 1, "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123
}

要启用出站事件路由器转换的使用,请将 table.expand.json.payload 设置为 true,并使用 JsonConverter,如下面的示例所示

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.expand.json.payload=true
value.converter=org.apache.kafka.connect.json.JsonConverter

使用 Apache Avro 作为有效负载格式

Apache Avro 是一种常用的数据序列化框架。使用 Avro 可为消息格式治理和确保出站事件模式以向后兼容的方式演进带来好处。

源应用程序如何为出站消息有效负载生成 Avro 格式的内容超出了本文档的范围。一种可能性是利用 KafkaAvroSerializer 类来序列化 GenericRecord 实例。要确保 Kafka 消息值是精确的 Avro 二进制数据,请对连接器应用以下配置

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter

默认情况下,payload 列值(Avro 数据)是唯一的消息值。当数据以 Avro 格式存储时,列格式必须设置为二进制数据类型,例如 PostgreSQL 中的 bytea。SMT 的值转换器也必须设置为 BinaryDataConverter,以便它将 payload 列的二进制值按原样传递到 Kafka 消息值中。

Debezium 连接器可以配置为发出心跳、事务元数据或架构更改事件(支持因连接器而异)。这些事件无法由 BinaryDataConverter 序列化,因此必须提供其他配置,以便转换器知道如何序列化这些事件。例如,以下配置说明了使用 Apache Kafka JsonConverter 且不带模式

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable=false

委托 Converter 实现由 delegate.converter.type 选项指定。如果转换器需要任何额外的配置选项,也可以指定它们,例如上面显示的禁用模式,使用 schemas.enable=false

以下示例说明了如何配置 SMT 以使用带有 Apicurio Registry 的委托转换器将数据转换为 Avro 格式

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=io.apicurio.registry.utils.converter.AvroConverter
value.converter.delegate.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
value.converter.delegate.converter.apicurio.registry.auto-register=true
value.converter.delegate.converter.registry.find-latest=true

最后,以下示例说明了如何配置 SMT 以使用带有 Confluent Schema Registry 的委托转换器将数据转换为 Avro 格式

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=io.confluent.connect.avro.AvroConverter
value.converter.delegate.converter.type.basic.auth.credentials.source=USER_INFO
value.converter.delegate.converter.type.basic.auth.user.info={CREDENTIALS}
value.converter.delegate.converter.type.schema.registry.url={URL}

在前面的配置示例中,由于 AvroConverter 配置为委托转换器,因此需要第三方库。有关如何将第三方库添加到类路径的信息超出了本文档的范围。

发出带有附加字段的消息

您的出站表可能包含您希望添加到已发出出站消息中的值的列。例如,考虑一个出站表,其 aggregatetype 列的值为 purchase-order,另一个名为 eventType 的列,其可能的值为 order-createdorder-shipped。可以使用 column:placement:alias 语法添加附加字段。

placement 的允许值为: - header - envelope - partition

要将 eventType 列的值发出到出站消息头中,请按以下方式配置 SMT

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=eventType:header:type

结果将是 Kafka 消息上的一个头,其键为 type,值为 eventType 列的值。

要将 eventType 列的值发出到出站消息信封中,请按以下方式配置 SMT

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=eventType:envelope:type

要控制出站消息在哪个分区上生成,请按以下方式配置 SMT

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=partitionColumn:partition

请注意,对于 partition 放置,添加别名将无效。

配置选项

下表描述了可以为出站事件路由器 SMT 指定的选项。在表中,Group 列指示 Kafka 的配置选项分类。

表 2. 出站事件路由器 SMT 配置选项描述
Option Default (默认值) 描述

warn

确定 SMT 在出站表上发生 UPDATE 操作时的行为。可能设置如下

  • warn - SMT 记录警告并继续处理下一个出站表记录。

  • error - SMT 记录错误并继续处理下一个出站表记录。

  • fatal - SMT 记录错误,并且连接器停止处理。

出站表中的所有更改都应为 INSERT 操作。也就是说,出站表充当队列;不允许更新出站表中的记录。SMT 会自动过滤掉出站表上的 DELETE 操作。

id

指定包含唯一事件 ID 的出站表列。此 ID 将以 id 键存储在已发出事件的头中。

aggregateid

指定包含事件键的出站表列。当此列包含值时,SMT 将该值用作已发出出站消息中的键。这对于在 Kafka 分区中保持正确的顺序很重要。

默认情况下,已发出出站消息中的时间戳是 Debezium 事件时间戳。要使用不同的时间戳来生成出站消息,请将此选项设置为包含您希望在已发出出站消息中显示的时间戳的出站表列。

payload

指定包含事件有效负载的出站表列。

false

指定是否应进行字符串有效负载的 JSON 展开。如果没有找到内容或出现解析错误,则内容保持“原样”。

有关更多详细信息,请参阅 展开转义的 JSON 部分。

ignore

当启用 JSON 展开属性 table.expand.json.payload 时,确定包含出站表中 null 值的 JSON 有效负载的行为。可能设置如下

  • ignore - 忽略 null 值。

  • optional_bytes - 保留 null 值,并将 null 视为 connect 的可选字节。

表、信封

指定一个或多个出站表列,您希望将它们添加到出站消息头或信封中。指定一个逗号分隔的对列表。在每对中,指定列的名称以及您是否希望值位于头中或信封中。用冒号分隔对中的值,例如

id:header,my-field:envelope

要为列指定别名,请指定一个三元组,其中别名是第三个值,例如

id:header,my-field:envelope:my-alias

第二个值是放置位置,它必须始终是 headerenvelope

true

表、信封

指定当 table.fields.additional.placement 属性指定的字段在出站有效负载中找不到时,此转换是否会引发错误。

表、模式

设置后,此值将用作 Kafka Connect Schema Javadoc 中描述的模式版本。

aggregatetype

路由器

指定出站表中的列名。默认行为是,此列中的值将成为连接器发出出站消息的主题名称的一部分。示例在 预期出站表描述 中。

(?<routedByValue>.*)

路由器

指定一个正则表达式,出站 SMT 将该正则表达式应用于 RegexRouter 中的出站表记录。此正则表达式是 route.topic.replacement SMT 选项设置的一部分。

默认行为是,SMT 会将 route.topic.replacement SMT 选项设置中的默认 ${routedByValue} 变量替换为 route.by.field 出站 SMT 选项的设置。

outbox.event​.${routedByValue}

路由器

指定连接器发出出站消息的主题名称。默认主题名称是 outbox.event. 后面跟着出站表记录中的 aggregatetype 列值。例如,如果 aggregatetype 的值为 customers,则主题名称为 outbox.event.customers

要更改主题名称,您可以

  • route.by.field 选项设置为不同的列。

  • 将 route.topic.regex 选项设置为不同的正则表达式。

false

路由器

指示空或 null 的有效负载是否会导致连接器发出墓碑事件。

tracingspancontext

跟踪

包含跟踪跨度上下文的字段名称。

debezium-read

跟踪

表示 Debezium 处理跨度的操作名称。

false

跟踪

如果为 true,则仅跟踪具有序列化上下文字段的事件。

分布式跟踪

出站事件路由 SMT 支持分布式跟踪。有关更多详细信息,请参阅 跟踪文档