出库 Quarkus 扩展

概述

此扩展受到 Outbox Event Router 单消息转换 (SMT) 的启发。正如博客文章 Reliable Microservices Data Exchange with the Outbox Pattern 中讨论的那样,微服务通常需要相互交换信息,而处理这种情况的一个好方法是使用 Outbox 模式并结合 Debezium 的 Outbox Event Router SMT。

下图显示了此模式的整体架构

Outbox Pattern

Outbox 扩展的目标是为 Quarkus 应用程序提供一个可重用、高度可配置的组件,该组件可以方便地使用 Outbox 模式并结合 Debezium 的 CDC 连接器管道,可靠地异步地将数据与其他任何数据使用者共享。

入门

为了开始使用 Debezium Outbox Quarkus 扩展,需要按以下方式将扩展添加为 Quarkus 应用程序的一部分

<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-quarkus-outbox</artifactId>
  <version>3.3.1.Final</version>
</dependency>

该扩展为应用程序提供了 io.debezium.outbox.quarkus.ExportedEvent 接口。应用程序类需要实现此接口,并且事件将使用 javax.enterprise.event.Event 类进行发出。

ExportedEvent 接口是参数化的,允许应用程序指定由事件发出的几个属性使用的 Java 类型。重要的是,对于给定的 Quarkus 应用程序,ExportedEvent 接口的 **所有** 实现都必须使用相同的参数类型,否则会发生构建失败,因为所有事件将使用相同的底层数据库表。

示例

以下示例说明了实现 ExportedEvent 接口以表示已创建的订单

OrderCreatedEvent.java
public class OrderCreatedEvent implements ExportedEvent<String, JsonNode> {

    private static final String TYPE = "Order";
    private static final String EVENT_TYPE = "OrderCreated";

    private final long orderId;
    private final JsonNode jsonNode;
    private final Instant timestamp;

    public OrderCreatedEvent(Instant createdAt, Order order) {
        this.orderId = order.getId();
        this.timestamp = createdAt;
        this.jsonNode = convertToJson(order);
    }

    @Override
    public String getAggregateId() {
        return String.valueOf(orderId);
    }

    @Override
    public String getAggregateType() {
        return TYPE;
    }

    @Override
    public JsonNode getPayload() {
        return jsonNode;
    }

    @Override
    public String getType() {
        return EVENT_TYPE;
    }

    @Override
    public Instant getTimestamp() {
        return timestamp;
    }

    @Override
    public Map<String, Object> getAdditionalFieldValues() {
        // no additional fields
        return Collections.emptyMap();
    }
}

以下示例说明了发出 OrderCreatedEventOrderService

OrderService.java
@ApplicationScoped
public class OrderService {

    @Inject
    OrderRepository orderRepository;

    @Inject
    Event<ExportedEvent<?, ?>> event;

    @Transactional
    public Order addOrder(Order order) {
        order = orderRepository.save(order);
        event.fire(new OrderCreatedEvent(Instant.now(), order));
        return order;
    }
}

当应用程序代码通过调用 Event#fire() 发出事件时,Outbox 扩展将被通知事件已发生,并将事件的内容保存在当前事务范围内的 outbox 事件表中。Debezium CDC 连接器与 Outbox Event Router 结合将监视此表,并负责使用 CDC 事件中继该数据。

要查看完整的端到端演示,Outbox 示例说明了两个 Quarkus 微服务应用程序如何使用 outbox 模式在下订单或取消订单时共享数据。

Reactive 变体

如果您的应用程序使用 reactive 数据源或 Hibernate Reactive,则必须使用略有不同的配置来将扩展添加到您的应用程序。

例如,使用以下配置导入扩展的 reactive 变体

<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-quarkus-outbox-reactive</artifactId>
  <version>3.3.1.Final</version>
</dependency>

reactive 扩展为应用程序提供了与非 reactive 变体相同的 io.debezium.outbox.quarkus.ExportedEvent 接口,但它还提供了 DebeziumOutboxHandler 类。将 DebeziumOutboxHandler bean 注入应用程序可提供一种快速将 ExportedEvent 持久化到 outbox 表的方法。以下示例显示了使用与前面示例中相同的 ExportedEvent 的调用

OrderService.java
@ApplicationScoped
public class OrderService {

    @Inject
    DebeziumOutboxHandler debeziumOutboxHandler;

    @Inject
    OrderRepository orderRepository;

    @Inject
    Event<ExportedEvent<?, ?>> event;

    @ReactiveTransactional
    public Uni<Order> addOrder(Order order) {
        return orderRepository.persistAndFlush(order)
        .call(debeziumOutboxHandler.persistToOutbox(new OrderCreatedEvent(Instant.now(), order)));

    }
}

persistToOutbox 方法返回一个 Uni;因此,必须由订阅者观察它才能接收结果。有关使用 Mutiny 库构建 reactive 应用程序的更多信息,请参阅 Quarkus Mutiny primer

附加字段映射

Debezium Outbox SMT 可以配置为读取附加字段,并将这些字段的值作为事件头或事件值的一部分发出。

为了将附加字段映射传递给 Quarkus Outbox 扩展保存,必须在 application.properties 中指定配置属性 quarkus.debezium-outbox.additional-fields。此配置属性是附加字段定义的逗号分隔列表,这些字段将被添加到 Outbox 实体映射中,并通过应用程序的 ExportedEvent 接口实现传递。

此逗号分隔列表中的每个条目必须遵循此格式

<field-name>:<field-java-type>[:<field-column-definition>[:<field-jpa-attribute-converter>]]

模式表明字段的名称和 java-type 是必需的,而列定义和 JPA 属性转换器是可选的。但是,请注意,如果您希望指定 JPA 属性转换器,则必须指定列定义。

以下示例演示了如何定义一个名为 customer_name 的附加字段,该字段在 Java 中表示为 String,并应存储在 outbox 表中作为 VARCHAR(100) 列。此示例还演示了一个 JPA 属性转换器定义,该转换器强制将字符串存储为大写。

application.properties
quarkus.debezium-outbox.additional-fields=customer_name:string:varchar(100):example.UpperCase

在应用程序的 .properties 文件中配置字段后,应用程序代码需要通过其导出的事件提供相应的。为了做到这一点,扩展 ExportedEvent 的应用程序类需要重写名为 getAdditionalFieldValues() 的方法,并返回一个包含附加字段名称和值的 Map

在下面的示例中,我们演示了如何指定 customer_name 字段,其值为 Acme Goods。使用上面示例部分的 OrderCreatedEvent,我们扩展了该事件

OrderCreatedEvent.java
public class OrderCreatedEvent implements ExportedEvent<String, JsonNode> {
    ...
    @Override
    public Map<String, Object> getAdditionalFieldValues() {
        return Collections.singletonMap("customer_name", "Acme Goods");
    }
}

附加字段映射允许为每个字段指定 JPA 属性转换器。

在此示例中,我们定义了 example.UpperCase,它将在插入之前将任何提供的字符串值转换为大写。JPA 属性转换器允许将这种行为与调用站点解耦,从而实现通用行为的重用。

通过在应用程序的 .properties 文件中进行配置,并更新 OrderCreateedEvent 以提供这些附加字段和值,Debezium Outbox SMT 现在可以访问这些附加字段值并将它们放入发出的事件中。

配置

可以通过在 Quarkus application.properties 文件中设置选项来配置 Outbox 扩展。该扩展开箱即用,具有默认配置,但此配置可能不适合所有情况。

构建时配置选项

配置属性

Type

Default (默认值)

quarkus.debezium-outbox.table-name

创建 outbox 表时使用的表名。

string

OutboxEvent

quarkus.debezium-outbox.id.name

事件 ID 列的列名。
例如,uuid

string

id

quarkus.debezium-outbox.id.column-definition

事件 ID 列的数据库特定列定义。
例如,uuid not null

string

UUID NOT NULL

string

aggregateid

quarkus.debezium-outbox.aggregate-id.column-definition

聚合 ID 的数据库特定列定义。
例如,varchar(50) not null

string

VARCHAR(255) NOT NULL

quarkus.debezium-outbox.aggregate-id.converter

事件键列的 JPA 属性转换器。
例如,com.company.TheAttributeConverter

string

quarkus.debezium-outbox.aggregate-type.name

事件聚合类型列的列名。

string

aggregatetype

quarkus.debezium-outbox.aggregate-type.column-definition

聚合类型的数据库特定列定义。
例如,varchar(15) not null

string

VARCHAR(255) NOT NULL

quarkus.debezium-outbox.aggregate-type.converter

事件聚合类型列的 JPA 属性转换器。
例如,com.company.TheAttributeConverter

string

quarkus.debezium-outbox.type.name

事件类型的列名。

string

type

quarkus.debezium-outbox.type.column-definition

事件类型的数据库特定列定义。
例如,varchar(50) not null

string

VARCHAR(255) NOT NULL

quarkus.debezium-outbox.type.converter

事件类型的 JPA 属性转换器。
例如,com.company.TheAttributeConverter

string

quarkus.debezium-outbox.timestamp.name

事件时间戳列的列名。

string

timestamp

quarkus.debezium-outbox.timestamp.column-definition

事件时间戳的数据库特定列定义。
例如,timestamp not null

string

TIMESTAMP NOT NULL

quarkus.debezium-outbox.timestamp.converter

事件时间戳列的 JPA 属性转换器。
例如,com.company.TheAttributeConverter

string

quarkus.debezium-outbox.payload.name

事件载荷列的列名。

string

payload

quarkus.debezium-outbox.payload.column-definition

事件载荷的数据库特定列定义。
例如,text not null

string

VARCHAR(8000)

quarkus.debezium-outbox.payload.converter

事件载荷列的 JPA 属性转换器。
例如,com.company.TheAttributeConverter

string

quarkus.debezium-outbox.payload.type

Hibernate 用户类型 实现的完全限定类名。
例如,io.company.types.JsonNodeBinaryType

string

quarkus.debezium-outbox.tracing-span.name

跟踪 span 上下文列的列名。

string

tracingspancontext

quarkus.debezium-outbox.tracingspancontext.column-definition

跟踪 span 上下文列的数据库特定列定义。
例如,text not null

string

VARCHAR(256)

quarkus.debezium-outbox.additional-fields

将在 outbox 表中持久化的附加字段映射的逗号分隔列表。

有关格式和用法的详细信息,请参阅 附加字段映射

string

构建时配置的默认值可以与 Outbox Event Router SMT 开箱即用。在使用默认值时,请确保 SMT 配置匹配。

运行时配置选项

配置属性

Type

Default (默认值)

quarkus.debezium-outbox.remove-after-insert

插入后是否删除 outbox 条目。

条目的删除不会影响 Debezium 连接器从发出 CDC 事件的能力。这用于防止表的基础存储随着时间的推移而增长。

boolean

true

分布式跟踪

该扩展支持分布式跟踪。有关更多详细信息,请参阅 跟踪文档