您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

Debezium Quarkus 扩展

简介

Quarkus 的 Debezium 扩展Debezium Runtime 集成到 Quarkus 应用程序中,使开发人员能够直接在轻量级、云原生应用程序中从支持的数据库消费 变更数据捕获 (CDC) 事件。

何时使用扩展

  • Quarkus 中的 CDC:将 Debezium 无缝嵌入 Quarkus 应用,非常适合微服务用例。

  • 无 Kafka 选项:当只需要简单的流式处理或进程内处理时,无需完整的 Kafka 基础设施。

  • 轻量且快速:利用 Quarkus 原生的快速启动时间和低内存占用。

  • 开发者友好:通过 Quarkus 原生配置和依赖管理简化设置。

特性

该扩展支持成熟的 Debezium 功能以及更多!以下是新增或重审的特性列表。

  • Debezium 捕获监听器

  • Debezium 自定义反序列化器

  • Debezium 生命周期事件

  • Debezium 心跳事件

  • Debezium 自定义数据类型转换器

  • Debezium 通知事件

  • Debezium 后置处理器

支持的连接器

支持以下连接器:

  • 用于 Postgres 连接器的 quarkus-debezium-postgres 扩展。

  • 用于 Mongodb 连接器的 quarkus-debezium-mongodb 扩展。

先决条件

  • 已安装 JDK 21+ 并正确配置了 JAVA_HOME

  • Apache maven 3.9.9

  • Quarkus 版本 3.25.0

  • Docker 或 Podman

  • 如果想构建原生可执行文件,可选择安装并正确配置 Mandrel 或 GraalVM。

工作示例

debezium-example 仓库 中提供了演示该扩展功能的完整示例。它可以作为设置和测试您自己的 Quarkus 应用程序的参考。

引导一个 Quarkus 项目

如果您已经有一个 Quarkus 应用程序,可以跳过此部分。

创建新 Quarkus 项目最简单的方法是打开终端并运行以下命令:

mvn io.quarkus.platform:quarkus-maven-plugin:3.25.0:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=getting-started

它将在 ./getting-started 中生成以下内容:

  • Quarkus 的样板代码

  • 应用程序配置

将扩展添加到项目中

在项目中,将 Debezium 扩展依赖添加到您的应用程序的 pom.xml 文件中。

    <dependencies>
        <!-- [...] -->
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>quarkus-debezium-postgres</artifactId>
            <version>${version.debezium}</version>
        </dependency>
        <!-- [...] -->
    </dependencies>

配置 Debezium 连接器

Debezium 扩展附带一个 连接器,用于监听数据源的 CDC 事件,但最低限度的配置是必需的。

该扩展使用 Quarkus (SmallRye Config API) 提供与配置相关的所有机制。Debezium 配置使用 quarkus.debezium.* 前缀,如果我们使用位于 src/main/resources/application.properties 的应用程序配置文件,那么最低限度设置的结果如下:

# Debezium CDC configuration
quarkus.debezium.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
quarkus.debezium.name=native
quarkus.debezium.topic.prefix=native
quarkus.debezium.plugin.name=pgoutput
quarkus.debezium.snapshot.mode=initial

# datasource configuration
quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=<your username>
quarkus.datasource.password=<your password>
quarkus.datasource.jdbc.url=jdbc:postgresql://:5432/hibernate_orm_test
quarkus.datasource.jdbc.max-size=16

可用的配置参数可在 Debezium 文档 中找到。此外,您必须指定 Debezium 运行时所需的 数据源配置参数

捕获 Debezium 的事件

从之前的最低配置继续,您的 Quarkus 应用程序可以直接接收 CDC 事件载荷。

import io.debezium.runtime.CapturingEvent;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.kafka.connect.source.SourceRecord;

import io.debezium.runtime.Capturing;

@ApplicationScoped
public class ProductHandler {


    @Capturing
    public void capture(CapturingEvent<SourceRecord> record) {
        // process your events
    }

}

CapturingEvent<T> 包含与数据库操作类型相关的信息。

    @Capturing
    public void capture(CapturingEvent<SourceRecord> record) {
        switch (record) {
            case Create<SourceRecord> event -> {}
            case Delete<SourceRecord> event -> {}
            case Message<SourceRecord> event -> {}
            case Read<SourceRecord> event -> {}
            case Truncate<SourceRecord> event -> {}
            case Update<SourceRecord> event -> {}
        }
    }

过滤变更数据捕获事件

可以通过 destination 过滤事件。

    @Capturing(destination = "native.inventory.products")
    public void capture(CapturingEvent<SourceRecord> record) {
        // process your event
    }

默认情况下,Debezium 连接器的 destination 由配置中定义的 prefix 名称加上数据库名称和发生更改的表名组成。在某些情况下,destination 会使用 SMT 重新定义。

通过 Jackson 进行反序列化

Quarkus 内置了基于 Jackson 的 JSON 序列化和反序列化支持。有一个现有的 ObjectMapperDeserializer,可以用于通过 Jackson 反序列化所有数据对象。

需要继承相应的反序列化器类。因此,我们需要创建一个扩展自 ObjectMapperDeserializerProductDeserializer

public class ProductDeserializer extends ObjectMapperDeserializer<Product> {
    public ProductDeserializer() {
        super(Product.class);
    }
}

最后,配置您的捕获通道,以便为特定目标使用 Jackson 反序列化器。

quarkus.debezium.capturing.products.destination=native.inventory.products
quarkus.debezium.capturing.products.deserializer=com.acme.product.jackson.ProductDeserializer

并在代码中使用它,添加分配给反序列化器的目标。

import io.debezium.runtime.CapturingEvent;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.kafka.connect.source.SourceRecord;

import io.debezium.runtime.Capturing;

@ApplicationScoped
public class ProductHandler {


    @Capturing(destination = "native.inventory.products")
    public void capture(CapturingEvent<Product> record) {
        // process your events
    }

}

或者只获取反序列化后的对象,不带 CapturingEvent<T>

请记住,在这种情况下,您将没有关于数据库操作的信息。

import io.debezium.runtime.CapturingEvent;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.kafka.connect.source.SourceRecord;

import io.debezium.runtime.Capturing;

@ApplicationScoped
public class ProductHandler {


    @Capturing(destination = "native.inventory.products")
    public void capture(Product product) {
        // process your events
    }

}

生命周期事件

可以获取与 Debezium 监听生命周期事件状态相关的信息。

import io.debezium.runtime.events.*;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;

@ApplicationScoped
public class LifecycleListener {

    public void started(@Observes ConnectorStartedEvent event) {
        // your logic
    }

    public void stopped(@Observes ConnectorStoppedEvent connectorStoppedEvent) {
        // your logic
    }
    public void tasksStarted(@Observes TasksStartedEvent tasksStartedEvent) {
        // your logic
    }
    public void tasksStopped(@Observes TasksStoppedEvent tasksStoppedEvent) {
        // your logic
    }
    public void pollingStarted(@Observes PollingStartedEvent pollingStartedEvent) {
        // your logic
    }
    public void pollingStopped(@Observes PollingStoppedEvent pollingStoppedEvent) {
        // your logic
    }
    public void completed(@Observes DebeziumCompletionEvent debeziumCompletionEvent) {
        // your logic
    }

}

以下事件可用:

  • ConnectorStartedEvent:当 Debezium 启动连接器时触发。

  • ConnectorStoppedEvent:当 Debezium 停止连接器时触发。

  • TasksStartedEvent:当连接器任务启动时触发。

  • TasksStoppedEvent:当连接器任务停止时触发。

  • PollingStartedEvent:当 Debezium 引擎开始轮询连接器更改时触发。

  • PollingStoppedEvent:当 Debezium 引擎停止轮询连接器更改时触发。

  • DebeziumCompletionEvent:在 Debezium 引擎完成关闭后触发。它包含关于先前执行是否成功、失败原因和错误的所有信息。

心跳事件

可以在您的 Quarkus 应用程序中监听心跳事件。

import io.debezium.runtime.events.DebeziumHeartbeat;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;

@ApplicationScoped
public class HeartbeatListener {

    public void heartbeat(@Observes DebeziumHeartbeat heartbeat) {
        //
    }
}

DebeziumHeartbeat 包含以下相关信息:

  • 连接器

  • Debezium 状态

  • 分区

  • 偏移量

通知事件

Debezium 通知 提供关于细粒度状态(snapshotstreaming)的事件,始终作为 Jakarta 事件提供。

import io.quarkus.debezium.notification.SnapshotEvent;
import io.quarkus.debezium.notification.DebeziumNotification;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;

@ApplicationScoped
public class NotificationListener {

    public void snapshot(@Observes SnapshotEvent event) {
        //
    }

    public void notification(@Observes DebeziumNotification event) {
        //
    }
}

以下事件可用:

  • DebeziumNotification

  • SnapshotStarted

  • SnapshotInProgres

  • SnapshotTableScanCompleted

  • SnapshotAborted

  • SnapshotSkipped

  • SnapshotCompleted

  • SnapshotPaused

  • SnapshotResumed

数据类型转换器

可以使用 @CustomConverter 注解在扩展中定义 Debezium 自定义转换器,并实例化一个定义类型转换的 ConverterDefinition

import io.debezium.relational.CustomConverterRegistry.ConverterDefinition;
import io.debezium.runtime.CustomConverter;
import io.debezium.spi.converter.ConvertedField;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.kafka.connect.data.SchemaBuilder;

@ApplicationScoped
public class StringConverter {

    @CustomConverter
    public ConverterDefinition<SchemaBuilder> bind(ConvertedField field) {
        return new ConverterDefinition<>(SchemaBuilder.string(), String::valueOf);
    }
}

这种类型的转换应用于 cdc 事件中的所有字段。要仅将转换应用于部分字段,可以通过 FieldFilterStrategy 来丰富 CustomConverter,该策略仅过滤感兴趣的字段。

    @CustomConverter(filter = CustomFieldFilterStrategy.class)
    public ConverterDefinition<SchemaBuilder> filteredBind(ConvertedField field) {
        return new ConverterDefinition<>(SchemaBuilder.string(), String::valueOf);
    }

    @ApplicationScoped
    public static class CustomFieldFilterStrategy implements FieldFilterStrategy {

        @Override
        public boolean filter(ConvertedField field) {
            // your logic
            return false;
        }

    }

后置处理器

后置处理器 在事件流中比 SMT 更早地应用轻量级的、每条消息的更改,这使得它们能够修改 Debezium 上下文中的消息。这使得它们比转换器更有效。可以通过两种方式定义后置处理器:作为配置参数或使用 @PostProcessing 注解。

对于配置,官方文档概述了可用的参数,例如 Reselect 后置处理器。

quarkus.debezium.post.processors=reselector
quarkus.debezium.post.processors.reselector.type=io.debezium.processors.reselect.ReselectColumnsPostProcessor
quarkus.debezium.post.processors.reselector.reselect.unavailable.values=true
quarkus.debezium.post.processors.reselector.reselect.null.values=true
quarkus.debezium.post.processors.reselector.reselect.use.event.key=false
quarkus.debezium.post.processors.reselector.reselect.error.handling.mode=WARN

对于代码,扩展中提供了 @PostProcessing 注解,该注解可以访问 keyStruct

import io.debezium.runtime.PostProcessing;
import jakarta.enterprise.context.ApplicationScoped;
import org.apache.kafka.connect.data.Struct;

@ApplicationScoped
public class PostProcessorHandler {

    @PostProcessing
    public void processing(Object key, Struct struct) {
        // apply your logic
    }
}

DevService 支持

Quarkus 在开发和测试模式下使用 Dev Services 自动配置未配置的服务。当包含一个未配置的扩展时,Quarkus 会启动必要的服务(通过 Testcontainers)并将其连接到您的应用程序。对于 Debezium,需要 进行设置,而 Quarkus 的默认镜像不支持。该扩展已配备了一个配置了变更数据捕获镜像的开发服务,但该支持是实验性的,如果出现错误或问题,您可以使用以下属性禁用它:

quarkus.datasource.devservices.enabled=false

或者使用官方 Debezium 镜像覆盖。

quarkus.datasource.devservices.image-name=quay.io/debezium/postgres:15