Outbox,就像我电子邮件客户端中的那个文件夹一样?不,不完全是,但有一些相似之处!

Outbox 这个术语描述了一种模式,它允许独立组件或服务执行读取您自己的写入语义,同时在组件或服务边界之间提供对这些写入的可靠、最终一致的视图。

您可以在我们的博客文章《使用 Outbox 模式实现可靠的微服务数据交换》中阅读有关 Outbox 模式及其在微服务中的应用的更多信息。

那么,Outbox 事件路由器到底是什么?

在 Debezium 版本 0.9.3.Final 中,我们引入了一个即用型单消息转换 (SMT),它构建在 Outbox 模式之上,用于通过 Debezium 和 Kafka 传播数据变更事件。有关如何使用此转换的详细信息,请参阅文档

Quarkus 赋能,飞速发展!

Quarkus 是一个面向 GraalVM 和 HotSpot 优化的 Kubernetes 原生 Java 框架,使用了业界最佳的 Java 技术和标准。Quarkus 旨在为开发人员提供统一的响应式和命令式编程模型,以应对广泛的应用架构。

那么,用通俗的语言来说,这一切到底意味着什么?

简而言之,Debezium 社区现在可以在基于 Quarkus 的应用程序中利用 Outbox 模式,通过一个即用型扩展,该扩展与 Debezium 连接器并行工作,以发出变更数据事件。Debezium Quarkus Outbox 扩展可以在 Quarkus 的 JVM 或原生镜像模式下使用。

如何获取?

目前,必须像下面所示那样手动将依赖项添加到 Quarkus 应用程序的 pom.xml 中。未来计划将此扩展提供到 Quarkus 扩展目录中,并通过 Quarkus 的 Maven 插件发布。

<dependency>
  <groupId>io.debezium.quarkus</groupId>
  <artifactId>debezium-quarkus-outbox</artifactId>
  <version>1.1.0.Alpha1</version>
</dependency>

在撰写此博文时,该扩展已发布为1.1.0.Alpha1版本。
可能已有更新的扩展版本,请参阅 发布 获取详细信息。

使用扩展

Debezium Outbox 扩展使用观察者模式来监控用户应用程序何时发出实现了 io.debezium.outbox.quarkus.ExportedEvent 接口的对象。这使得 Quarkus 应用程序的行为与扩展的行为完全解耦。

让我们通过一个简单的示例来了解一下,该示例中有一个服务负责存储新创建的订单,然后发出一个事件,该事件可用于通知其他感兴趣的服务订单已创建。

首先,我们将从实现 OrderCreatedEvent 开始,它是 ExportedEvent 的一个实现。此事件用于在 OrderService 保存 Order 时发出信号。

public class OrderCreatedEvent implements ExportedEvent<String, JsonNode> {
    private final long orderId;
    private final JsonNode payload;
    private final Instant created;

    public OrderCreatedEvent(Instant createdAt, Order order) {
        this.orderId = order.getId();
        this.payload = convertOrderToJsonNode(order);
        this.created = createdAt;
    }

    @Override
    public String getAggregateId() {
        return String.valueOf(orderId);
    }

    @Override
    public String getAggregateType() {
        return "Order";
    }

    @Override
    public JsonNode getPayload() {
        return payload;
    }

    @Override
    public String getType() {
        return "OrderCreated";
    }

    @Override
    public Instant getTimestamp() {
        return created;
    }
}

ExportedEvent 接口是定义 Quarkus 应用程序如何提供数据以持久化到 outbox 数据库表的契约。此契约公开了下面讨论的几个不同值。

聚合 ID

聚合 ID 在向 Kafka 发出消息时用作消息键,以保持消息顺序。在此示例中,OrderCreatedEvent 返回订单标识符。

ExportedEvent 接口是参数化的,参数参数列表的第一个参数允许应用程序指定聚合 ID 的返回数据类型。尽管此示例使用了 String,但返回的值可以是任何可持久化的对象类型。

聚合类型

聚合类型是一个基于字符串的值,用于附加到 Kafka 主题名称,并协助在 Outbox 事件路由器 SMT 中路由给定消息。在此示例中,我们使用 Order。当使用 SMT 的默认配置时,消息将在 outbox.event.Order 主题中找到。有关更多详细信息,请参阅 SMT 配置选项 中的 route.topic.replacement

Type

消息类型是一个字符串值,它在 Kafka 消息的信封中发出。在此示例中,消息信封中的值为 OrderCreated

时间戳

默认情况下,Outbox 事件路由器 SMT 在处理记录时使用当前时间戳发出 outbox 事件,但这并不总是适用于所有用例。此字段允许源应用程序指定一个 Instant,然后可以通过 SMT 配置选项 配置该 Instant,以用作 Kafka 消息的时间戳。

Payload

负载是消息内容或值,是 Kafka 主题的消费者所消费的内容。

ExportedEvent 接口是参数化的,参数参数列表的第二个参数允许应用程序指定负载的返回数据类型。尽管此示例使用 JsonNode 来存储 Order 的 JSON 表示,但负载可以是任何可持久化的对象类型。

如果 Quarkus 应用程序中存在多个 ExportedEvent 的实现,则它们必须使用相同的签名。如果需要不同的签名,则代码应拆分为不同的 Quarkus 应用程序,因为对于给定的 Quarkus 应用程序,所有 ExportedEvent 实现都将存储在同一个数据库 outbox 表中。我们目前正在研究替代方案,以便在将来的版本中放宽此限制,允许在同一应用程序中使用多种变体。

就其本身而言,这个 OrderCreatedEvent 并没有什么作用。

接下来,我们想实现一个应用程序组件,该组件负责将订单持久化到数据库,然后发出 OrderCreatedEvent 事件。下面的 OrderService 类使用 JPA 来持久化 Order 实体,然后使用 javax.enterprise.event.Event<T> 来通知 outbox 扩展。

@ApplicationScoped
public class OrderService {
    @Inject
    EntityManager entityManager;

    @Inject
    Event<ExportedEvent<String, JsonNode>> event;

    @Transactional
    public Order addOrder(Order order) {
        entityManager.persist(order);
        event.fire(new OrderCreatedEvent(Instant.now(), order));
        return order;
    }
}

在启动应用程序之前,必须在 application.properties 中指定某些配置设置。示例如下,其中我们指定了要连接的数据库以及持久化提供程序 Hibernate 的操作方式。

quarkus.datasource.driver=org.postgresql.Driver
quarkus.datasource.url=jdbc:postgresql://order-db:5432/orderdb?currentSchema=orders
quarkus.datasource.username=user
quarkus.datasource.password=password
quarkus.hibernate-orm.database.generation=update
quarkus.hibernate-orm.dialect=org.hibernate.dialect.PostgreSQLDialect
quarkus.hibernate-orm.log.sql=true

通过使用此配置启动应用程序,outbox 表 OutboxEvent 将在 order-db 数据库的 orders schema 中创建,其布局如下:

orderdb=# \d orders.outboxevent
                        Table "orders.outboxevent"
    Column     |            Type             | Collation | Nullable | Default
---------------+-----------------------------+-----------+----------+---------
 id            | uuid                        |           | not null |
 aggregatetype | character varying(255)      |           | not null |
 aggregateid   | character varying(255)      |           | not null |
 type          | character varying(255)      |           | not null |
 timestamp     | timestamp without time zone |           | not null |
 payload       | character varying(8000)     |           |          |
Indexes:
    "outboxevent_pkey" PRIMARY KEY, btree (id)

当使用 JsonNode 作为负载返回类型时,该扩展使用 JPA 属性转换器将其内容作为字符串存储在数据库中。

如果表或列名不符合您的命名约定,可以使用多个 构建时配置选项 进行自定义。例如,如果您希望表名为 outbox 而不是 outboxevent,请将以下行添加到 application.properties 文件中:

quarkus.debezium-outbox.table-name=outbox

如果您启用了 SQL 日志记录或检查 outbox 表的行数,您可能会发现不寻常的是,在保存订单后,记录会被插入到 outbox 表中,然后立即被删除。这是默认行为,因为 Debezium 不需要保留行即可拾取更改。

如果需要保留行,可以使用 运行时配置选项 进行配置。为了启用行保留,请将以下配置添加到 application.properties 文件中:

quarkus.debezium-outbox.remove-after-insert=false

设置连接器

到目前为止,我们已经介绍了如何在 Quarkus 应用程序中配置和使用该扩展来将事件保存到 outbox 数据库表中。最后一步是配置 Debezium 连接器来监视 outbox 并将这些记录发出到 Kafka。

我们将使用以下连接器配置:

{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "tasks.max": "1",
  "database.hostname": "order-db",
  "database.port": "5432",
  "database.user": "user",
  "database.password": "password",
  "database.dbname": "orderdb",
  "database.server.name": "dbserver1",
  "schema.whitelist" : "orders",
  "table.whitelist": "orders.outboxevent",
  "tombstones.on.delete": "false",
  "transforms": "outbox",
  "transforms.outbox.type" : "io.debezium.transforms.outbox.EventRouter",
  "transforms.outbox.route.topic.replacement": "${routedByValue}.events",
  "transforms.outbox.table.field.event.timestamp": "timestamp",
  "transforms.outbox.table.fields.additional.placement": "type:header:eventType"
}

绝大多数都是标准的 Debezium 连接器配置,但重要的是最后几行以 transforms 开头。这些是 Kafka Connect 用于配置和调用 Outbox 事件路由器 SMT 的配置选项。

此配置使用自定义的 route.topic.replacement 配置属性。此设置会将 OrderCreatedEvent 行从 outbox 路由到 Order.events 主题,而不是默认的 outbox.events.Order 主题。

此配置还指定了 field.event.timestamp 配置属性。此设置会将 Kafka 消息时间从 outbox 数据库表中的 timestamp 字段填充,而不是处理行时的当前时间戳。

有关如何配置 SMT 的详细信息,请参阅 Outbox 事件路由器配置选项

一旦连接器运行,Order.events 主题将填充来自 outbox 表的消息。以下 JSON 示例表示由 OrderService 保存的 Order

{
    "customerId" : "123",
    "orderDate" : "2019-01-31T12:13:01",
    "lineItems" : [
        {
            "item" : "Debezium in Action",
            "quantity" : 2,
            "totalPrice" : 39.98
        },
        {
            "item" : "Debezium for Dummies",
            "quantity" : 1,
            "totalPrice" : 29.99
        }
    ]
}

在检查 Order.events 主题时,发出的事件将如下所示:

{
  "key": "1",
  "headers": "id=cc74eac7-176b-44e7-8bda-413a5088ca66,eventType=OrderCreated"
}
"{\"id\":1,\"customerId\":123,\"orderDate\":\"2019-01-31T12:13:01\",\"lineItems\":[{\"id\":1,\"item\":\"Debezium in Action\",\"quantity\":2,\"totalPrice\":39.98,\"status\":\"ENTERED\"},{\"id\":2,\"item\":\"Debezium for Dummies\",\"quantity\":1,\"totalPrice\":29.99,\"status\":\"ENTERED\"}]}"

总结

设置和使用 Debezium Outbox 扩展非常简单易用。

我们在示例存储库中有一个完整的 示例,其中使用了这里描述的订单服务以及消费事件的货运服务。有关该扩展的更多详细信息,请参阅 Outbox Quarkus 扩展 文档。

未来计划

Debezium Outbox 扩展的当前实现效果相当不错,但我们也认识到仍有改进的空间。我们已经确定并计划在扩展的未来迭代中包含的一些内容是:

  • 事件负载的 Avro 序列化支持

  • 完整的 outbox 表列属性控制,例如定义、长度、精度、精度和小数位数以及转换器。

  • 使用用户提供的实体类进行完整的 outbox 表自定义。

  • 允许在单个应用程序中使用不同的 ExportedEvent 签名。

我们目前正在 DBZ-1711 中跟踪此扩展的所有未来更改。一如既往,我们欢迎任何和所有反馈,因此请随时在上述问题、Gitter 或邮件列表中告诉我们。

Chris Cranford

Chris 是 IBM 的一名软件工程师,之前在 Red Hat 工作,他致力于 Debezium 项目,并每天都在深入研究 Oracle 和 Change Data Capture 的各个方面。他此前曾从事 Hibernate(领先的开源 JPA 持久化框架)方面的工作,并且继续为 Quarkus 做贡献。Chris 居住在美国北卡罗来纳州。

   


关于 Debezium

Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。

参与进来

我们希望您觉得 Debezium 有趣且有用,并希望尝试一下。在 Twitter @debezium 上关注我们,在 Zulip 上与我们聊天,或加入我们的 邮件列表 与社区交流。所有代码都在 GitHub 上开源,因此请在本地构建代码,帮助我们改进现有连接器并添加更多连接器。如果您发现问题或有改进 Debezium 的想法,请告诉我们或 记录一个问题

版权所有 © Debezium 及其作者。保留所有权利。有关我们的商标详情,请访问我们的 商标政策商标列表。第三方商标属于其各自所有者,在此提及并不表示任何认可或关联。
×