Debezium Quarkus 扩展
以下文档将探讨您的 Quarkus 应用程序如何集成 Debezium。
简介
支持的连接器
支持以下连接器:
-
用于 Postgres 连接器的
quarkus-debezium-postgres扩展。 -
用于 Mongodb 连接器的
quarkus-debezium-mongodb扩展。
先决条件
-
已安装 JDK 21+ 并正确配置了
JAVA_HOME。 -
Apache maven 3.9.9
-
Quarkus 版本 3.25.0
-
Docker 或 Podman
-
如果想构建原生可执行文件,可选择安装并正确配置 Mandrel 或 GraalVM。
工作示例
在 debezium-example 仓库 中提供了演示该扩展功能的完整示例。它可以作为设置和测试您自己的 Quarkus 应用程序的参考。
引导一个 Quarkus 项目
如果您已经有一个 Quarkus 应用程序,可以跳过此部分。
创建新 Quarkus 项目最简单的方法是打开终端并运行以下命令:
mvn io.quarkus.platform:quarkus-maven-plugin:3.25.0:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=getting-started
它将在 ./getting-started 中生成以下内容:
-
Quarkus 的样板代码
-
应用程序配置
将扩展添加到项目中
在项目中,将 Debezium 扩展依赖添加到您的应用程序的 pom.xml 文件中。
<dependencies>
<!-- [...] -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>quarkus-debezium-postgres</artifactId>
<version>${version.debezium}</version>
</dependency>
<!-- [...] -->
</dependencies>
配置 Debezium 连接器
Debezium 扩展附带一个 连接器,用于监听数据源的 CDC 事件,但最低限度的配置是必需的。
该扩展使用 Quarkus (SmallRye Config API) 提供与配置相关的所有机制。Debezium 配置使用 quarkus.debezium.* 前缀,如果我们使用位于 src/main/resources/application.properties 的应用程序配置文件,那么最低限度设置的结果如下:
# Debezium CDC configuration
quarkus.debezium.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
quarkus.debezium.name=native
quarkus.debezium.topic.prefix=native
quarkus.debezium.plugin.name=pgoutput
quarkus.debezium.snapshot.mode=initial
# datasource configuration
quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=<your username>
quarkus.datasource.password=<your password>
quarkus.datasource.jdbc.url=jdbc:postgresql://:5432/hibernate_orm_test
quarkus.datasource.jdbc.max-size=16
可用的配置参数可在 Debezium 文档 中找到。此外,您必须指定 Debezium 运行时所需的 数据源配置参数。
捕获 Debezium 的事件
从之前的最低配置继续,您的 Quarkus 应用程序可以直接接收 CDC 事件载荷。
import io.debezium.runtime.CapturingEvent;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.kafka.connect.source.SourceRecord;
import io.debezium.runtime.Capturing;
@ApplicationScoped
public class ProductHandler {
@Capturing
public void capture(CapturingEvent<SourceRecord> record) {
// process your events
}
}
CapturingEvent<T> 包含与数据库操作类型相关的信息。
@Capturing
public void capture(CapturingEvent<SourceRecord> record) {
switch (record) {
case Create<SourceRecord> event -> {}
case Delete<SourceRecord> event -> {}
case Message<SourceRecord> event -> {}
case Read<SourceRecord> event -> {}
case Truncate<SourceRecord> event -> {}
case Update<SourceRecord> event -> {}
}
}
过滤变更数据捕获事件
可以通过 destination 过滤事件。
@Capturing(destination = "native.inventory.products")
public void capture(CapturingEvent<SourceRecord> record) {
// process your event
}
默认情况下,Debezium 连接器的 destination 由配置中定义的 prefix 名称加上数据库名称和发生更改的表名组成。在某些情况下,destination 会使用 SMT 重新定义。
通过 Jackson 进行反序列化
Quarkus 内置了基于 Jackson 的 JSON 序列化和反序列化支持。有一个现有的 ObjectMapperDeserializer,可以用于通过 Jackson 反序列化所有数据对象。
需要继承相应的反序列化器类。因此,我们需要创建一个扩展自 ObjectMapperDeserializer 的 ProductDeserializer。
public class ProductDeserializer extends ObjectMapperDeserializer<Product> {
public ProductDeserializer() {
super(Product.class);
}
}
最后,配置您的捕获通道,以便为特定目标使用 Jackson 反序列化器。
quarkus.debezium.capturing.products.destination=native.inventory.products
quarkus.debezium.capturing.products.deserializer=com.acme.product.jackson.ProductDeserializer
并在代码中使用它,添加分配给反序列化器的目标。
import io.debezium.runtime.CapturingEvent;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.kafka.connect.source.SourceRecord;
import io.debezium.runtime.Capturing;
@ApplicationScoped
public class ProductHandler {
@Capturing(destination = "native.inventory.products")
public void capture(CapturingEvent<Product> record) {
// process your events
}
}
或者只获取反序列化后的对象,不带 CapturingEvent<T>。
|
请记住,在这种情况下,您将没有关于数据库操作的信息。 |
import io.debezium.runtime.CapturingEvent;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.kafka.connect.source.SourceRecord;
import io.debezium.runtime.Capturing;
@ApplicationScoped
public class ProductHandler {
@Capturing(destination = "native.inventory.products")
public void capture(Product product) {
// process your events
}
}
生命周期事件
可以获取与 Debezium 监听生命周期事件状态相关的信息。
import io.debezium.runtime.events.*;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
@ApplicationScoped
public class LifecycleListener {
public void started(@Observes ConnectorStartedEvent event) {
// your logic
}
public void stopped(@Observes ConnectorStoppedEvent connectorStoppedEvent) {
// your logic
}
public void tasksStarted(@Observes TasksStartedEvent tasksStartedEvent) {
// your logic
}
public void tasksStopped(@Observes TasksStoppedEvent tasksStoppedEvent) {
// your logic
}
public void pollingStarted(@Observes PollingStartedEvent pollingStartedEvent) {
// your logic
}
public void pollingStopped(@Observes PollingStoppedEvent pollingStoppedEvent) {
// your logic
}
public void completed(@Observes DebeziumCompletionEvent debeziumCompletionEvent) {
// your logic
}
}
以下事件可用:
-
ConnectorStartedEvent:当 Debezium 启动连接器时触发。 -
ConnectorStoppedEvent:当 Debezium 停止连接器时触发。 -
TasksStartedEvent:当连接器任务启动时触发。 -
TasksStoppedEvent:当连接器任务停止时触发。 -
PollingStartedEvent:当 Debezium 引擎开始轮询连接器更改时触发。 -
PollingStoppedEvent:当 Debezium 引擎停止轮询连接器更改时触发。 -
DebeziumCompletionEvent:在 Debezium 引擎完成关闭后触发。它包含关于先前执行是否成功、失败原因和错误的所有信息。
心跳事件
可以在您的 Quarkus 应用程序中监听心跳事件。
import io.debezium.runtime.events.DebeziumHeartbeat;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
@ApplicationScoped
public class HeartbeatListener {
public void heartbeat(@Observes DebeziumHeartbeat heartbeat) {
//
}
}
DebeziumHeartbeat 包含以下相关信息:
-
连接器
-
Debezium 状态
-
分区
-
偏移量
通知事件
Debezium 通知 提供关于细粒度状态(snapshot 和 streaming)的事件,始终作为 Jakarta 事件提供。
import io.quarkus.debezium.notification.SnapshotEvent;
import io.quarkus.debezium.notification.DebeziumNotification;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
@ApplicationScoped
public class NotificationListener {
public void snapshot(@Observes SnapshotEvent event) {
//
}
public void notification(@Observes DebeziumNotification event) {
//
}
}
以下事件可用:
-
DebeziumNotification -
SnapshotStarted -
SnapshotInProgres -
SnapshotTableScanCompleted -
SnapshotAborted -
SnapshotSkipped -
SnapshotCompleted -
SnapshotPaused -
SnapshotResumed
数据类型转换器
可以使用 @CustomConverter 注解在扩展中定义 Debezium 自定义转换器,并实例化一个定义类型转换的 ConverterDefinition。
import io.debezium.relational.CustomConverterRegistry.ConverterDefinition;
import io.debezium.runtime.CustomConverter;
import io.debezium.spi.converter.ConvertedField;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.kafka.connect.data.SchemaBuilder;
@ApplicationScoped
public class StringConverter {
@CustomConverter
public ConverterDefinition<SchemaBuilder> bind(ConvertedField field) {
return new ConverterDefinition<>(SchemaBuilder.string(), String::valueOf);
}
}
这种类型的转换应用于 cdc 事件中的所有字段。要仅将转换应用于部分字段,可以通过 FieldFilterStrategy 来丰富 CustomConverter,该策略仅过滤感兴趣的字段。
@CustomConverter(filter = CustomFieldFilterStrategy.class)
public ConverterDefinition<SchemaBuilder> filteredBind(ConvertedField field) {
return new ConverterDefinition<>(SchemaBuilder.string(), String::valueOf);
}
@ApplicationScoped
public static class CustomFieldFilterStrategy implements FieldFilterStrategy {
@Override
public boolean filter(ConvertedField field) {
// your logic
return false;
}
}
后置处理器
后置处理器 在事件流中比 SMT 更早地应用轻量级的、每条消息的更改,这使得它们能够修改 Debezium 上下文中的消息。这使得它们比转换器更有效。可以通过两种方式定义后置处理器:作为配置参数或使用 @PostProcessing 注解。
对于配置,官方文档概述了可用的参数,例如 Reselect 后置处理器。
quarkus.debezium.post.processors=reselector
quarkus.debezium.post.processors.reselector.type=io.debezium.processors.reselect.ReselectColumnsPostProcessor
quarkus.debezium.post.processors.reselector.reselect.unavailable.values=true
quarkus.debezium.post.processors.reselector.reselect.null.values=true
quarkus.debezium.post.processors.reselector.reselect.use.event.key=false
quarkus.debezium.post.processors.reselector.reselect.error.handling.mode=WARN
对于代码,扩展中提供了 @PostProcessing 注解,该注解可以访问 key 和 Struct。
import io.debezium.runtime.PostProcessing;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.kafka.connect.data.Struct;
@ApplicationScoped
public class PostProcessorHandler {
@PostProcessing
public void processing(Object key, Struct struct) {
// apply your logic
}
}
DevService 支持
Quarkus 在开发和测试模式下使用 Dev Services 自动配置未配置的服务。当包含一个未配置的扩展时,Quarkus 会启动必要的服务(通过 Testcontainers)并将其连接到您的应用程序。对于 Debezium,需要 进行设置,而 Quarkus 的默认镜像不支持。该扩展已配备了一个配置了变更数据捕获镜像的开发服务,但该支持是实验性的,如果出现错误或问题,您可以使用以下属性禁用它:
quarkus.datasource.devservices.enabled=false
或者使用官方 Debezium 镜像覆盖。
quarkus.datasource.devservices.image-name=quay.io/debezium/postgres:15