您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

出库事件路由器

出站模式是一种安全可靠地在多个(微)服务之间交换数据的方式。出站模式实现可避免服务内部状态(通常持久化在其数据库中)与需要相同数据的服务所消费的事件状态之间出现不一致。

要实现 Debezium 应用程序中的出站模式,请配置 Debezium 连接器以

  • 捕获 outbox 表中的更改

  • 应用 Debezium outbox 事件路由器单个消息转换 (SMT)

配置为应用 outbox SMT 的 Debezium 连接器应仅捕获 outbox 表中发生的更改。有关更多信息,请参阅 选择性应用转换的选项

仅当每个 outbox 表具有相同的结构时,连接器才能捕获多个 outbox 表中的更改。

请参阅 使用 Outbox 模式可靠地交换微服务数据,了解 outbox 模式的用途及其工作原理。

有关可运行的示例,请参阅 Debezium 示例存储库中的 outbox 模式演示。它包括一个有关如何配置 Debezium 连接器以运行 outbox 事件路由器 SMT 的示例。

Outbox 事件路由器 SMT 与 MongoDB 连接器不兼容。

MongoDB 用户可以运行 MongoDB outbox 事件路由器 SMT

示例 outbox 消息

要理解 Debezium outbox 事件路由器 SMT 的配置方式,请查看以下 Debezium outbox 消息示例

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

配置为应用 outbox 事件路由器 SMT 的 Debezium 连接器通过转换如下所示的 Debezium 原始消息来生成上述消息

# Kafka Message key: "406c07f3-26f0-4eea-a50c-109940064b8f"
# Kafka Message Headers: ""
# Kafka Message Timestamp: 1556890294484
{
  "before": null,
  "after": {
    "id": "406c07f3-26f0-4eea-a50c-109940064b8f",
    "aggregateid": "1",
    "aggregatetype": "Order",
    "payload": "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}",
    "timestamp": 1556890294344,
    "type": "OrderCreated"
  },
  "source": {
    "version": "3.3.0.Final",
    "connector": "postgresql",
    "name": "dbserver1-bare",
    "db": "orderdb",
    "ts_usec": 1556890294448870,
    "txId": 584,
    "lsn": 24064704,
    "schema": "inventory",
    "table": "outboxevent",
    "snapshot": false,
    "last_snapshot_record": null,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1556890294484,
  "ts_us": 1556890294484651,
  "ts_ns": 1556890294484651402
}

此 Debezium outbox 消息示例基于 默认 outbox 事件路由器配置,该配置假定 outbox 表结构和基于聚合的事件路由。要自定义行为,outbox 事件路由器 SMT 提供了许多 配置选项

基本的 outbox 表

要应用默认的 outbox 事件路由器 SMT 配置,假定您的 outbox 表具有以下列

Column        |          Type          | Modifiers
--------------+------------------------+-----------
id            | uuid                   | not null
aggregatetype | character varying(255) | not null
aggregateid   | character varying(255) | not null
type          | character varying(255) | not null
payload       | jsonb                  |
表 1. 预期的 outbox 表列说明
Column 效果

id

包含事件的唯一 ID。在出站消息中,此值为一个标头。例如,您可以使用此 ID 来删除重复消息。

要从不同的 outbox 表列获取事件的唯一 ID,请在连接器配置中设置 table.field.event.id SMT 选项。

aggregatetype

包含一个 SMT 将其附加到连接器向其发出 outbox 消息的主题名称的值。默认行为是此值替换 route.topic.replacement SMT 选项中的默认 ${routedByValue} 变量。

例如,在默认配置中,route.by.field SMT 选项设置为 aggregatetype,而 route.topic.replacement SMT 选项设置为 outbox.event.${routedByValue}。假设您的应用程序向 outbox 表添加了两个记录。在第一条记录中,aggregatetype 列的值是 customers。在第二条记录中,aggregatetype 列的值是 orders。连接器将第一条记录发出到 outbox.event.customers 主题。连接器将第二条记录发出到 outbox.event.orders 主题。

要从不同的 outbox 表列获取此值,请在连接器配置中设置 route.by.field SMT 选项。

aggregateid

包含事件键,它为有效负载提供一个 ID。SMT 将此值用作已发出出站消息中的键。这对于在 Kafka 分区中保持正确的顺序很重要。

要从不同的 outbox 表列获取事件键,请在连接器配置中设置 table.field.event.key SMT 选项

payload

outbox 更改事件的表示形式。默认结构是 JSON。默认情况下,Kafka 消息值仅由 payload 值组成。但是,如果 outbox 事件配置为包含其他字段,Kafka 消息值将包含一个封装 payload 和其他字段的信封,并且每个字段都单独表示。有关更多信息,请参阅 发出带有其他字段的消息

要从不同的 outbox 表列获取事件 payload,请在连接器配置中设置 table.field.event.payload SMT 选项。

其他自定义列

outbox 表中的任何其他列都可以 添加到 outbox 事件中,要么放在 payload 部分内,要么作为消息头。

一个例子可能是 eventType 列,它传达了一个用户定义的帮助分类或组织事件的值。

基本配置

要配置 Debezium 连接器以支持 outbox 模式,请配置 outbox.EventRouter SMT。要获得 SMT 的默认行为,请在不指定任何选项的情况下将其添加到连接器配置中,如下例所示

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
自定义配置

连接器可能会发出许多类型的事件消息(例如,心跳消息、墓碑消息或有关事务或模式更改的元数据消息)。要仅将转换应用于源自 outbox 表的事件,请 定义一个 SMT 谓词语句,仅选择性地将转换应用于这些事件

选择性应用转换的选项

除了 Debezium 连接器在数据库发生更改时发出的更改事件消息外,连接器还会发出其他类型的消息,包括心跳消息以及关于 schema 更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计用于处理的更改事件消息的结构不同,因此最好将连接器配置为选择性地应用 SMT,以便它仅处理预期的数据更改消息。您可以使用以下方法之一将连接器配置为选择性应用 SMT:

Payload 序列化格式

Outbox 事件路由器 SMT 支持任意 payload 格式。SMT 会在不修改的情况下传递它从 outbox 表读取的 payload 列值。SMT 将这些列值转换为 Kafka 消息字段的方式取决于您如何配置 SMT。常见的 payload 格式是 JSON 和 Avro。

将 JSON 用作 Payload 格式

Outbox 事件路由器 SMT 的默认序列化格式是 JSON。要使用此格式,源列的数据类型必须是 JSON(例如,PostgreSQL 中的 jsonb)。

将转义的 JSON 字符串展开为 JSON

当 Debezium outbox 消息将 payload 表示为 JSON 字符串时,生成的 Kafka 消息会像以下示例一样转义该字符串

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "{\"id\": 1, \"lineItems\": [{\"id\": 1, \"item\": \"Debezium in Action\", \"status\": \"ENTERED\", \"quantity\": 2, \"totalPrice\": 39.98}, {\"id\": 2, \"item\": \"Debezium for Dummies\", \"status\": \"ENTERED\", \"quantity\": 1, \"totalPrice\": 29.99}], \"orderDate\": \"2019-01-31T12:13:01\", \"customerId\": 123}"
}

outbox 事件路由器使您能够将消息内容展开为“实际”JSON,并从 JSON 文档推断出伴随的模式。生成的 Kafka 消息的格式如下所示

# Kafka Topic: outbox.event.order
# Kafka Message key: "1"
# Kafka Message Headers: "id=4d47e190-0402-4048-bc2c-89dd54343cdc"
# Kafka Message Timestamp: 1556890294484
{
  "id": 1, "lineItems": [{"id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98}, {"id": 2, "item": "Debezium for Dummies", "status": "ENTERED", "quantity": 1, "totalPrice": 29.99}], "orderDate": "2019-01-31T12:13:01", "customerId": 123
}

要启用 outbox 事件路由器转换的使用,请将 table.expand.json.payload 设置为 true,并使用 JsonConverter,如下例所示

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.expand.json.payload=true
value.converter=org.apache.kafka.connect.json.JsonConverter

将 Apache Avro 用作 Payload 格式

Apache Avro 是序列化数据的常用框架。使用 Avro 有利于消息格式治理,并确保 outbox 事件模式以向后兼容的方式演进。

源应用程序如何为出站消息有效负载生成 Avro 格式的内容超出了本文档的范围。一种可能性是利用 KafkaAvroSerializer 类来序列化 GenericRecord 实例。要确保 Kafka 消息值是精确的 Avro 二进制数据,请对连接器应用以下配置

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter

默认情况下,payload 列值(Avro 数据)是唯一的消息值。当数据以 Avro 格式存储时,列格式必须设置为二进制数据类型,例如 PostgreSQL 中的 bytea。SMT 的值转换器也必须设置为 BinaryDataConverter,以便它将 payload 列的二进制值原样传播到 Kafka 消息值中。

Debezium 连接器可以配置为发出心跳、事务元数据或模式更改事件(支持情况因连接器而异)。这些事件无法由 BinaryDataConverter 序列化,因此必须提供其他配置,以便转换器知道如何序列化这些事件。例如,以下配置说明了如何使用 Apache Kafka JsonConverter(不带模式)

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable=false

委托 Converter 实现由 delegate.converter.type 选项指定。如果转换器需要任何额外的配置选项,也可以指定它们,例如上面显示的禁用模式,使用 schemas.enable=false

以下示例说明了如何配置 SMT 以使用带 Apicurio Registry 的委托转换器将数据转换为 Avro 格式

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=io.apicurio.registry.utils.converter.AvroConverter
value.converter.delegate.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
value.converter.delegate.converter.apicurio.registry.auto-register=true
value.converter.delegate.converter.registry.find-latest=true

最后,以下示例说明了如何配置 SMT 以使用带 Confluent Schema Registry 的委托转换器将数据转换为 Avro 格式

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
value.converter=io.debezium.converters.BinaryDataConverter
value.converter.delegate.converter.type=io.confluent.connect.avro.AvroConverter
value.converter.delegate.converter.type.basic.auth.credentials.source=USER_INFO
value.converter.delegate.converter.type.basic.auth.user.info={CREDENTIALS}
value.converter.delegate.converter.type.schema.registry.url={URL}

在前面的配置示例中,由于 AvroConverter 配置为委托转换器,因此需要第三方库。有关如何将第三方库添加到类路径的信息超出了本文档的范围。

发出带有其他字段的消息

您的 outbox 表可能包含您希望添加到发出的 outbox 消息中的列值。例如,考虑一个 outbox 表,其中 aggregatetype 列具有 purchase-order 的值,另一个名为 eventType 的列,其可能值为 order-createdorder-shipped。可以使用 column:placement:alias 语法添加其他字段。

placement 的允许值为: - header - envelope - partition

要将 eventType 列值发出到 outbox 消息头,请像这样配置 SMT

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=eventType:header:type

结果将是 Kafka 消息上的一个头,其键为 type,值为 eventType 列的值。

要将 eventType 列值发出到 outbox 消息信封,请像这样配置 SMT

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=eventType:envelope:type

要控制出站消息在哪个分区上生成,请按以下方式配置 SMT

transforms=outbox,...
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
transforms.outbox.table.fields.additional.placement=partitionColumn:partition

请注意,对于 partition 放置,添加别名将无效。

配置选项

下表描述了可以为出站事件路由器 SMT 指定的选项。在表中,Group 列指示 Kafka 的配置选项分类。

表 2. 出站事件路由器 SMT 配置选项描述
Option Default (默认值) 描述

warn

确定 SMT 在 outbox 表上执行 UPDATE 操作时的行为。可能设置包括

  • warn - SMT 会记录一个警告并继续处理下一个 outbox 表记录。

  • error - SMT 会记录一个错误并继续处理下一个 outbox 表记录。

  • fatal - SMT 记录错误,并且连接器停止处理。

outbox 表中的所有更改都应为 INSERT 操作。也就是说,outbox 表充当队列;不允许更新 outbox 表中的记录。SMT 会自动过滤掉 outbox 表上的 DELETE 操作。

id

指定包含唯一事件 ID 的 outbox 表列。此 ID 将存储在发出的事件的头信息中,键为 id

aggregateid

指定包含事件键的 outbox 表列。当此列包含值时,SMT 使用该值作为发出的 outbox 消息的键。这对于在 Kafka 分区中维护正确的顺序非常重要。

默认情况下,发出的 outbox 消息中的时间戳是 Debezium 事件时间戳。要使用不同的时间戳在 outbox 消息中,请将此选项设置为包含您希望在发出的 outbox 消息中显示的时间戳的 outbox 表列。

payload

指定包含事件 payload 的 outbox 表列。

false

指定是否应进行字符串有效负载的 JSON 展开。如果没有找到内容或出现解析错误,则内容保持“原样”。

更多详情,请参阅 展开转义 JSON 部分。

ignore

当启用 JSON 展开属性 table.expand.json.payload 时,确定包含 null 值的 JSON payload 在 outbox 表中的行为。可能设置包括

  • ignore - 忽略 null 值。

  • optional_bytes - 保留 null 值,并将 null 视为 connect 的可选字节。

表、信封

指定一个或多个您想添加到 outbox 消息头或信封中的 outbox 表列。指定一个逗号分隔的对列表。在每一对中,指定列的名称以及您是否希望值在头信息或信封中。用冒号分隔对中的值,例如

id:header,my-field:envelope

要为列指定别名,请指定一个三元组,别名作为第三个值,例如

id:header,my-field:envelope:my-alias

第二个值是放置位置,它必须始终是 headerenvelope

true

表、信封

指定此转换是否在 Outbox payload 中找不到 table.fields.additional.placement 属性指定的字段时抛出错误。

表、模式

设置后,此值将用作 Kafka Connect Schema Javadoc 中描述的模式版本。

aggregatetype

路由器

指定 outbox 表中一列的名称。默认行为是将此列中的值作为连接器向其发出 outbox 消息的主题名称的一部分。示例在 预期 outbox 表的说明 中。

(?<routedByValue>.*)

路由器

指定一个正则表达式,outbox SMT 在 RegexRouter 中应用于 outbox 表记录。此正则表达式是 route.topic.replacement SMT 选项设置的一部分。

默认行为是 SMT 将 route.topic.replacement SMT 选项设置中的默认 ${routedByValue} 变量替换为 route.by.field outbox SMT 选项的设置。

outbox.event​.${routedByValue}

路由器

指定连接器发出 outbox 消息的主题名称。默认主题名称是 outbox.event. 加上 outbox 表记录中的 aggregatetype 列值。例如,如果 aggregatetype 值为 customers,则主题名称为 outbox.event.customers

要更改主题名称,您可以

  • route.by.field 选项设置为不同的列。

  • route.topic.regex 选项设置为不同的正则表达式。

false

路由器

指示空或 null 的有效负载是否会导致连接器发出墓碑事件。

tracingspancontext

跟踪

包含跟踪跨度上下文的字段名称。

debezium-read

跟踪

表示 Debezium 处理跨度的操作名称。

false

跟踪

如果为 true,则仅跟踪具有序列化上下文字段的事件。

分布式跟踪

出站事件路由 SMT 支持分布式跟踪。有关更多详细信息,请参阅 跟踪文档