您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

出库 Quarkus 扩展

概述

此扩展的灵感来自于 Outbox Event Router 单个消息转换 (SMT)。正如博客文章 Reliable Microservices Data Exchange with the Outbox Pattern 中所讨论的,微服务通常需要彼此交换信息,而处理此问题的一个绝佳方法是使用 Outbox 模式并结合 Debezium 的 Outbox Event Router SMT。

下图展示了此模式的整体架构

Outbox Pattern

Outbox 扩展的目标是为 Quarkus 应用程序提供一个可重用、高度可配置的组件,该组件有助于使用 Outbox 模式并结合 Debezium 的 CDC 连接器管道,以可靠且异步的方式与任何数据使用者共享数据。

入门

为了开始使用 Debezium Outbox Quarkus 扩展,需要将其添加为 Quarkus 应用程序的一部分,如下所示

<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-quarkus-outbox</artifactId>
  <version>3.3.0.Final</version>
</dependency>

该扩展为应用程序提供了 io.debezium.outbox.quarkus.ExportedEvent 接口。应用程序类需要实现此接口,并通过 javax.enterprise.event.Event 类发出事件。

ExportedEvent 接口是参数化的,允许应用程序指定事件发布的几个属性所使用的 Java 类型。对于给定的 Quarkus 应用程序,重要的是,所有 ExportedEvent 接口的实现都必须使用相同的参数类型,否则将抛出构建失败,因为所有事件都将使用相同的底层数据库表。

示例

以下示例说明了 ExportedEvent 接口的一个实现,该实现代表一个已创建的订单

OrderCreatedEvent.java
public class OrderCreatedEvent implements ExportedEvent<String, JsonNode> {

    private static final String TYPE = "Order";
    private static final String EVENT_TYPE = "OrderCreated";

    private final long orderId;
    private final JsonNode jsonNode;
    private final Instant timestamp;

    public OrderCreatedEvent(Instant createdAt, Order order) {
        this.orderId = order.getId();
        this.timestamp = createdAt;
        this.jsonNode = convertToJson(order);
    }

    @Override
    public String getAggregateId() {
        return String.valueOf(orderId);
    }

    @Override
    public String getAggregateType() {
        return TYPE;
    }

    @Override
    public JsonNode getPayload() {
        return jsonNode;
    }

    @Override
    public String getType() {
        return EVENT_TYPE;
    }

    @Override
    public Instant getTimestamp() {
        return timestamp;
    }

    @Override
    public Map<String, Object> getAdditionalFieldValues() {
        // no additional fields
        return Collections.emptyMap();
    }
}

以下示例说明了一个发出 OrderCreatedEventOrderService

OrderService.java
@ApplicationScoped
public class OrderService {

    @Inject
    OrderRepository orderRepository;

    @Inject
    Event<ExportedEvent<?, ?>> event;

    @Transactional
    public Order addOrder(Order order) {
        order = orderRepository.save(order);
        event.fire(new OrderCreatedEvent(Instant.now(), order));
        return order;
    }
}

当应用程序代码通过调用 Event#fire() 来触发事件时,Outbox 扩展将被通知事件发生,并将事件的内容持久化到当前事务范围内的 outbox 事件表中。Debezium CDC 连接器将与 Outbox Event Router 结合使用,监控此表,并负责使用 CDC 事件中继该数据。

要查看完整的端到端演示,Outbox 示例说明了两个 Quarkus 微服务应用程序,它们在放置或取消订单时使用 outbox 模式在它们之间共享数据。

响应式变体

如果您的应用程序使用响应式数据源或 Hibernate Reactive,则必须使用略有不同的配置来将扩展添加到您的应用程序。

例如,使用以下配置导入扩展的响应式变体

<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-quarkus-outbox-reactive</artifactId>
  <version>3.3.0.Final</version>
</dependency>

响应式扩展提供了与非响应式变体相同的 io.debezium.outbox.quarkus.ExportedEvent 接口,但它还提供了 DebeziumOutboxHandler 类。将 DebeziumOutboxHandler bean 注入应用程序可提供一种快速将 ExportedEvent 持久化到 outbox 表的方法。以下示例显示了一个使用与先前示例中相同的 ExportedEvent 的调用

OrderService.java
@ApplicationScoped
public class OrderService {

    @Inject
    DebeziumOutboxHandler debeziumOutboxHandler;

    @Inject
    OrderRepository orderRepository;

    @Inject
    Event<ExportedEvent<?, ?>> event;

    @ReactiveTransactional
    public Uni<Order> addOrder(Order order) {
        return orderRepository.persistAndFlush(order)
        .call(debeziumOutboxHandler.persistToOutbox(new OrderCreatedEvent(Instant.now(), order)));

    }
}

persistToOutbox 方法返回一个 Uni;因此,它必须由订阅者观察才能接收结果。有关使用 Mutiny 库构建响应式应用程序的更多信息,请参阅 Quarkus Mutiny 入门指南

附加字段映射

Debezium Outbox SMT 可以配置为读取附加字段,并将这些字段值作为事件头或事件值的一部分发出。

为了传递要由 Quarkus Outbox 扩展保存的附加字段映射,必须在 application.properties 中指定配置属性 quarkus.debezium-outbox.additional-fields。此配置属性是附加字段定义的逗号分隔列表,这些定义将被添加到 Outbox 实体映射中,并通过应用程序的 ExportedEvent 接口实现传递。

此逗号分隔列表中的每个条目必须遵循此格式

<field-name>:<field-java-type>[:<field-column-definition>[:<field-jpa-attribute-converter>]]

该模式表明字段的名称和 Java 类型是必需的,而列定义和 JPA 属性转换器是可选的。但是,请注意,如果您希望指定 JPA 属性转换器,则必须指定列定义。

以下示例展示了如何定义一个名为 customer_name 的附加字段,该字段在 Java 中表示为 String,并应在 outbox 表中存储为 VARCHAR(100) 列。此示例还展示了一个 JPA 属性转换器定义,该转换器强制将字符串存储为大写。

application.properties
quarkus.debezium-outbox.additional-fields=customer_name:string:varchar(100):example.UpperCase

一旦在应用程序的 .properties 文件中配置了字段,应用程序代码就需要通过其导出的事件提供相应的。为此,扩展 ExportedEvent 的应用程序类需要重写名为 getAdditionalFieldValues() 的方法,并返回一个包含附加字段名称和值的 Map

在以下示例中,我们展示了如何指定 customer_name 字段,值为 Acme Goods。使用我们上面示例部分中的 OrderCreatedEvent,我们扩展了该事件

OrderCreatedEvent.java
public class OrderCreatedEvent implements ExportedEvent<String, JsonNode> {
    ...
    @Override
    public Map<String, Object> getAdditionalFieldValues() {
        return Collections.singletonMap("customer_name", "Acme Goods");
    }
}

附加字段映射确实允许为每个字段指定 JPA 属性转换器。

在此示例中,我们定义了 example.UpperCase,它将在插入之前将任何提供的字符串值转换为大写。JPA 属性转换器允许将此类行为与调用站点解耦,从而允许重用通用行为。

通过应用程序 .properties 文件中的配置以及更新 OrderCreateedEvent 来提供这些附加字段和值,Debezium Outbox SMT 现在可以访问这些附加字段值并将它们放入发出的事件中。

配置

Outbox 扩展可以通过在 Quarkus application.properties 文件中设置选项来配置。该扩展开箱即用,具有默认配置,但这可能并非在所有情况下都理想。

构建时配置选项

配置属性

Type

Default (默认值)

quarkus.debezium-outbox.table-name

创建 outbox 表时要使用的表名。

string

OutboxEvent

quarkus.debezium-outbox.id.name

事件 ID 列的列名。
例如,uuid

string

id

quarkus.debezium-outbox.id.column-definition

事件 ID 列的数据库特定列定义。
例如,uuid not null

string

UUID NOT NULL

string

aggregateid

quarkus.debezium-outbox.aggregate-id.column-definition

聚合 ID 的数据库特定列定义。
例如,varchar(50) not null

string

VARCHAR(255) NOT NULL

quarkus.debezium-outbox.aggregate-id.converter

事件键列的 JPA 属性转换器。
例如,com.company.TheAttributeConverter

string

quarkus.debezium-outbox.aggregate-type.name

事件聚合类型列的列名。

string

aggregatetype

quarkus.debezium-outbox.aggregate-type.column-definition

聚合类型的数据库特定列定义。
例如,varchar(15) not null

string

VARCHAR(255) NOT NULL

quarkus.debezium-outbox.aggregate-type.converter

事件聚合类型列的 JPA 属性转换器。
例如,com.company.TheAttributeConverter

string

quarkus.debezium-outbox.type.name

事件类型列的列名。

string

type

quarkus.debezium-outbox.type.column-definition

事件类型的数据库特定列定义。
例如,varchar(50) not null

string

VARCHAR(255) NOT NULL

quarkus.debezium-outbox.type.converter

事件类型列的 JPA 属性转换器。
例如,com.company.TheAttributeConverter

string

quarkus.debezium-outbox.timestamp.name

事件时间戳列的列名。

string

timestamp

quarkus.debezium-outbox.timestamp.column-definition

事件时间戳的数据库特定列定义。
例如,timestamp not null

string

TIMESTAMP NOT NULL

quarkus.debezium-outbox.timestamp.converter

事件时间戳列的 JPA 属性转换器。
例如,com.company.TheAttributeConverter

string

quarkus.debezium-outbox.payload.name

事件载荷列的列名。

string

payload

quarkus.debezium-outbox.payload.column-definition

事件载荷的数据库特定列定义。
例如,text not null

string

VARCHAR(8000)

quarkus.debezium-outbox.payload.converter

事件载荷列的 JPA 属性转换器。
例如,com.company.TheAttributeConverter

string

quarkus.debezium-outbox.payload.type

Hibernate 用户类型实现的完全限定类名。
例如,io.company.types.JsonNodeBinaryType

string

quarkus.debezium-outbox.tracing-span.name

跟踪 span 上下文列的列名。

string

tracingspancontext

quarkus.debezium-outbox.tracingspancontext.column-definition

跟踪 span 上下文列的数据库特定列定义。
例如,text not null

string

VARCHAR(256)

quarkus.debezium-outbox.additional-fields

将在 outbox 表中持久化的附加字段映射的逗号分隔列表。

有关格式和用法的详细信息,请参阅 附加字段映射

string

构建时配置将开箱即用,与 Outbox Event Router SMT 配合良好。当不使用默认值时,请确保 SMT 配置匹配。

运行时配置选项

配置属性

Type

Default (默认值)

quarkus.debezium-outbox.remove-after-insert

插入后是否删除 outbox 条目。

条目的删除不会影响 Debezium 连接器发出 CDC 事件的能力。这用作一种方式,以防止表的基础存储随时间增长。

boolean

true

分布式跟踪

该扩展支持分布式跟踪。有关更多详细信息,请参阅 跟踪文档