Outbox,就像我电子邮件客户端中的那个文件夹一样?不,不完全是,但有一些相似之处!
Outbox 这个术语描述了一种模式,它允许独立组件或服务执行读取您自己的写入语义,同时在组件或服务边界之间提供对这些写入的可靠、最终一致的视图。
您可以在我们的博客文章《使用 Outbox 模式实现可靠的微服务数据交换》中阅读有关 Outbox 模式及其在微服务中的应用的更多信息。
那么,Outbox 事件路由器到底是什么?
在 Debezium 版本 0.9.3.Final 中,我们引入了一个即用型单消息转换 (SMT),它构建在 Outbox 模式之上,用于通过 Debezium 和 Kafka 传播数据变更事件。有关如何使用此转换的详细信息,请参阅文档。
Quarkus 赋能,飞速发展!
Quarkus 是一个面向 GraalVM 和 HotSpot 优化的 Kubernetes 原生 Java 框架,使用了业界最佳的 Java 技术和标准。Quarkus 旨在为开发人员提供统一的响应式和命令式编程模型,以应对广泛的应用架构。
那么,用通俗的语言来说,这一切到底意味着什么?
简而言之,Debezium 社区现在可以在基于 Quarkus 的应用程序中利用 Outbox 模式,通过一个即用型扩展,该扩展与 Debezium 连接器并行工作,以发出变更数据事件。Debezium Quarkus Outbox 扩展可以在 Quarkus 的 JVM 或原生镜像模式下使用。
如何获取?
目前,必须像下面所示那样手动将依赖项添加到 Quarkus 应用程序的 pom.xml 中。未来计划将此扩展提供到 Quarkus 扩展目录中,并通过 Quarkus 的 Maven 插件发布。
<dependency>
<groupId>io.debezium.quarkus</groupId>
<artifactId>debezium-quarkus-outbox</artifactId>
<version>1.1.0.Alpha1</version>
</dependency> | 在撰写此博文时,该扩展已发布为1.1.0.Alpha1版本。 |
使用扩展
Debezium Outbox 扩展使用观察者模式来监控用户应用程序何时发出实现了 io.debezium.outbox.quarkus.ExportedEvent 接口的对象。这使得 Quarkus 应用程序的行为与扩展的行为完全解耦。
让我们通过一个简单的示例来了解一下,该示例中有一个服务负责存储新创建的订单,然后发出一个事件,该事件可用于通知其他感兴趣的服务订单已创建。
首先,我们将从实现 OrderCreatedEvent 开始,它是 ExportedEvent 的一个实现。此事件用于在 OrderService 保存 Order 时发出信号。
public class OrderCreatedEvent implements ExportedEvent<String, JsonNode> {
private final long orderId;
private final JsonNode payload;
private final Instant created;
public OrderCreatedEvent(Instant createdAt, Order order) {
this.orderId = order.getId();
this.payload = convertOrderToJsonNode(order);
this.created = createdAt;
}
@Override
public String getAggregateId() {
return String.valueOf(orderId);
}
@Override
public String getAggregateType() {
return "Order";
}
@Override
public JsonNode getPayload() {
return payload;
}
@Override
public String getType() {
return "OrderCreated";
}
@Override
public Instant getTimestamp() {
return created;
}
} ExportedEvent 接口是定义 Quarkus 应用程序如何提供数据以持久化到 outbox 数据库表的契约。此契约公开了下面讨论的几个不同值。
聚合 ID
聚合 ID 在向 Kafka 发出消息时用作消息键,以保持消息顺序。在此示例中,OrderCreatedEvent 返回订单标识符。
|
|
聚合类型
聚合类型是一个基于字符串的值,用于附加到 Kafka 主题名称,并协助在 Outbox 事件路由器 SMT 中路由给定消息。在此示例中,我们使用 Order。当使用 SMT 的默认配置时,消息将在 outbox.event.Order 主题中找到。有关更多详细信息,请参阅 SMT 配置选项 中的 route.topic.replacement。
Type
消息类型是一个字符串值,它在 Kafka 消息的信封中发出。在此示例中,消息信封中的值为 OrderCreated。
时间戳
默认情况下,Outbox 事件路由器 SMT 在处理记录时使用当前时间戳发出 outbox 事件,但这并不总是适用于所有用例。此字段允许源应用程序指定一个 Instant,然后可以通过 SMT 配置选项 配置该 Instant,以用作 Kafka 消息的时间戳。
Payload
负载是消息内容或值,是 Kafka 主题的消费者所消费的内容。
|
|
| 如果 Quarkus 应用程序中存在多个 |
就其本身而言,这个 OrderCreatedEvent 并没有什么作用。
接下来,我们想实现一个应用程序组件,该组件负责将订单持久化到数据库,然后发出 OrderCreatedEvent 事件。下面的 OrderService 类使用 JPA 来持久化 Order 实体,然后使用 javax.enterprise.event.Event<T> 来通知 outbox 扩展。
@ApplicationScoped
public class OrderService {
@Inject
EntityManager entityManager;
@Inject
Event<ExportedEvent<String, JsonNode>> event;
@Transactional
public Order addOrder(Order order) {
entityManager.persist(order);
event.fire(new OrderCreatedEvent(Instant.now(), order));
return order;
}
} 在启动应用程序之前,必须在 application.properties 中指定某些配置设置。示例如下,其中我们指定了要连接的数据库以及持久化提供程序 Hibernate 的操作方式。
quarkus.datasource.driver=org.postgresql.Driver
quarkus.datasource.url=jdbc:postgresql://order-db:5432/orderdb?currentSchema=orders
quarkus.datasource.username=user
quarkus.datasource.password=password
quarkus.hibernate-orm.database.generation=update
quarkus.hibernate-orm.dialect=org.hibernate.dialect.PostgreSQLDialect
quarkus.hibernate-orm.log.sql=true 通过使用此配置启动应用程序,outbox 表 OutboxEvent 将在 order-db 数据库的 orders schema 中创建,其布局如下:
orderdb=# \d orders.outboxevent
Table "orders.outboxevent"
Column | Type | Collation | Nullable | Default
---------------+-----------------------------+-----------+----------+---------
id | uuid | | not null |
aggregatetype | character varying(255) | | not null |
aggregateid | character varying(255) | | not null |
type | character varying(255) | | not null |
timestamp | timestamp without time zone | | not null |
payload | character varying(8000) | | |
Indexes:
"outboxevent_pkey" PRIMARY KEY, btree (id) | 当使用 |
如果表或列名不符合您的命名约定,可以使用多个 构建时配置选项 进行自定义。例如,如果您希望表名为 outbox 而不是 outboxevent,请将以下行添加到 application.properties 文件中:
quarkus.debezium-outbox.table-name=outbox 如果您启用了 SQL 日志记录或检查 outbox 表的行数,您可能会发现不寻常的是,在保存订单后,记录会被插入到 outbox 表中,然后立即被删除。这是默认行为,因为 Debezium 不需要保留行即可拾取更改。
如果需要保留行,可以使用 运行时配置选项 进行配置。为了启用行保留,请将以下配置添加到 application.properties 文件中:
quarkus.debezium-outbox.remove-after-insert=false 设置连接器
到目前为止,我们已经介绍了如何在 Quarkus 应用程序中配置和使用该扩展来将事件保存到 outbox 数据库表中。最后一步是配置 Debezium 连接器来监视 outbox 并将这些记录发出到 Kafka。
我们将使用以下连接器配置:
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "order-db",
"database.port": "5432",
"database.user": "user",
"database.password": "password",
"database.dbname": "orderdb",
"database.server.name": "dbserver1",
"schema.whitelist" : "orders",
"table.whitelist": "orders.outboxevent",
"tombstones.on.delete": "false",
"transforms": "outbox",
"transforms.outbox.type" : "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.topic.replacement": "${routedByValue}.events",
"transforms.outbox.table.field.event.timestamp": "timestamp",
"transforms.outbox.table.fields.additional.placement": "type:header:eventType"
} 绝大多数都是标准的 Debezium 连接器配置,但重要的是最后几行以 transforms 开头。这些是 Kafka Connect 用于配置和调用 Outbox 事件路由器 SMT 的配置选项。
| 此配置使用自定义的 此配置还指定了 有关如何配置 SMT 的详细信息,请参阅 Outbox 事件路由器配置选项。 |
一旦连接器运行,Order.events 主题将填充来自 outbox 表的消息。以下 JSON 示例表示由 OrderService 保存的 Order。
{
"customerId" : "123",
"orderDate" : "2019-01-31T12:13:01",
"lineItems" : [
{
"item" : "Debezium in Action",
"quantity" : 2,
"totalPrice" : 39.98
},
{
"item" : "Debezium for Dummies",
"quantity" : 1,
"totalPrice" : 29.99
}
]
} 在检查 Order.events 主题时,发出的事件将如下所示:
{
"key": "1",
"headers": "id=cc74eac7-176b-44e7-8bda-413a5088ca66,eventType=OrderCreated"
}
"{\"id\":1,\"customerId\":123,\"orderDate\":\"2019-01-31T12:13:01\",\"lineItems\":[{\"id\":1,\"item\":\"Debezium in Action\",\"quantity\":2,\"totalPrice\":39.98,\"status\":\"ENTERED\"},{\"id\":2,\"item\":\"Debezium for Dummies\",\"quantity\":1,\"totalPrice\":29.99,\"status\":\"ENTERED\"}]}" 总结
设置和使用 Debezium Outbox 扩展非常简单易用。
我们在示例存储库中有一个完整的 示例,其中使用了这里描述的订单服务以及消费事件的货运服务。有关该扩展的更多详细信息,请参阅 Outbox Quarkus 扩展 文档。
未来计划
Debezium Outbox 扩展的当前实现效果相当不错,但我们也认识到仍有改进的空间。我们已经确定并计划在扩展的未来迭代中包含的一些内容是:
-
事件负载的 Avro 序列化支持
-
完整的 outbox 表列属性控制,例如定义、长度、精度、精度和小数位数以及转换器。
-
使用用户提供的实体类进行完整的 outbox 表自定义。
-
允许在单个应用程序中使用不同的
ExportedEvent签名。
我们目前正在 DBZ-1711 中跟踪此扩展的所有未来更改。一如既往,我们欢迎任何和所有反馈,因此请随时在上述问题、Gitter 或邮件列表中告诉我们。
关于 Debezium
Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。