当我开始研究 Debezium 时,我脑海中浮现出两个问题:能否构建 Debezium 的原生版本?我能否在不依赖额外基础设施的情况下,直接在我的微服务中接收变更数据捕获 (CDC) 事件?

这促使我们开发了一个新的 Debezium 流:我很高兴地宣布 **Debezium Extensions for Quarkus** 的第一个版本!

引言

数据变更捕获 (CDC) 已成为**现代微服务架构**中的关键模式,在某些情况下,它是演进它们的最佳方式之一。然而,采用 CDC 通常会带来基础设施开销,尤其是在快速变化的开发环境中。

Quarkus 的 Debezium 扩展旨在改变这一点,它允许开发人员**直接在他们的 Quarkus 应用程序内部**像处理其他任何内容一样,消费 CDC 事件。只需添加扩展,配置数据源,然后在更改发生时监听它们。

支持的连接器

目前,**Postgres** 连接器是唯一受支持的连接器。

何时应该使用它?

  • 当基础设施是业务关注点时

  • 当低内存占用很重要时,这要归功于 Quarkus 原生支持

  • 轻量级用例,例如概念验证、开发环境或单体到微服务迁移

  • 组织正在初步评估 CDC 时

它不是什么?

  • 不是 Debezium Server 或 Kafka Connect 的替代品,它们保证了生产级别的 CDC

  • 开箱即用的接收器。您必须管理更改事件到达应用程序后会发生什么

  • 将 CDC 逻辑耦合到应用程序中可能会降低灵活性并导致技术债务

开始吧:为 Postgres 构建一个 CDC 微服务

在本节中,我们将介绍创建一个 Debezium 监听器,该监听器将在 Quarkus 应用程序中捕获更改!

前提条件

  • 已安装 JDK 21+ 并配置了 JAVA_HOME

  • Apache maven 3.9.9

  • Quarkus 版本 3.25.0

  • Docker 或 Podman

  • 如果想构建原生可执行文件,可以选择安装并配置 Mandrel 或 GraalVM

设置 Quarkus 项目

如果您已经有一个 Quarkus 应用程序,则可以跳过此步骤。否则,创建新 Quarkus 项目最简单的方法是打开终端并运行以下命令

mvn io.quarkus.platform:quarkus-maven-plugin:3.25.0:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=debezium-starter

它会在 ./debezium-starter 目录中生成一个带有 Quarkus 样板代码的 Maven 项目。

向项目中添加扩展

在项目中,将 Debezium 扩展依赖项添加到应用程序的 pom.xmldependencies 部分。在此演练中,我们将添加特定于 Postgres 的连接器实现,但未来会有其他数据库供应商的特定实现。

        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>quarkus-debezium-postgres</artifactId>
            <version>${version.debezium}</version>
        </dependency>

不要忘记在 pom.xml 中设置 version.debezium 属性,以便 Maven 知道要使用哪个 Debezium 扩展版本。

配置扩展

Debezium 扩展附带一个**连接器**(在本例中是 Postgres),它监听数据源的 CDC 事件,但最少的配置是强制性的。连接器可用的配置参数与Debezium 文档中使用的参数相同,但带有 quarkus.debezium.* 前缀。

以下是在使用 Postgres 时处理扩展所必需的最小配置。应将其添加到 src/main/resources/application.properties

# Debezium CDC configuration
quarkus.debezium.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
quarkus.debezium.name=native
quarkus.debezium.topic.prefix=dbz
quarkus.debezium.plugin.name=pgoutput
quarkus.debezium.snapshot.mode=initial

在本指南中,我们将使用DevService 支持和包含 CDC 设置的官方 Debezium 映像。这避免了配置数据源到目标数据库的需要。

示例数据

确实,为开发目的,可以在 Docker 映像中添加一些 SQL 脚本,因此在 src/main/resources/init.sql 中创建一个新文件,包含以下内容

CREATE SCHEMA inventory;

CREATE TABLE inventory.products (
    id INT PRIMARY KEY,
    name VARCHAR(100),
    description TEXT,
    weight BIGINT
);

INSERT INTO inventory.products (id, name, description, weight) VALUES
(1, 'Laptop', 'High-performance ultrabook', 1250),
(2, 'Smartphone', 'Latest model with AMOLED display', 180),
(3, 'Coffee Mug', 'Ceramic mug with lid', 350);

在一个新的开发配置文件中,例如 src/main/resources/application-dev.properties,您可以配置 Quarkus dev-services 使用它

quarkus.datasource.devservices.init-script-path=init.sql

从 Debezium 接收更改数据捕获事件

让我们开始创建一个 Quarkus 应用程序 Bean,它监听名为 InventoryListener 的更改事件。

在以下类中,我们使用 @Capturing 注解了一个方法。此注解在未指定目标的情况下使用,因此该方法会针对 Debezium 生成的每个事件被调用。当您想在单个更改事件处理程序方法中以编程方式确定路由或处理逻辑时,这很有用。

package org.acme;

import io.debezium.runtime.Capturing;
import io.debezium.runtime.CapturingEvent;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class InventoryListener {

    private final Logger logger = LoggerFactory.getLogger(InventoryListener.class);

    @Capturing()
    public void products(CapturingEvent<SourceRecord> event) {
        logger.info("capturing event for destination {} with data {}", event.destination(), event.record());
    }
}

如果您在 @Capturing 注解中指定了 destination 值,Debezium 将仅将事件路由到匹配该目标的该方法处理程序。在下面的示例中,如果事件的目标主题是 dbz.inventory.products,则该方法才会被调用。

    @Capturing(destination = "dbz.inventory.products")
    public void orders(CapturingEvent<SourceRecord> event) {
        logger.info("capturing products for destination {} with data {}", event.destination(), event.record());
    }

您还可以将更改事件映射到给定的目标对象类型。例如,以下 ObjectMapperSerializer 实现使用 Jackson 将更改事件转换为 Product 对象类型。

package org.acme;

import io.quarkus.debezium.engine.deserializer.ObjectMapperDeserializer;

public class ProductDeserializer extends ObjectMapperDeserializer<Product> {
    public ProductDeserializer() {
        super(Product.class);
    }
}
public record Product(Integer id, String name, String  description, BigDecimal weight) { }

反序列化器的映射必须以这种方式在 application.properties 中定义

# Deserializer mapping for Product
quarkus.debezium.capturing.product.destination=dbz.inventory.products
quarkus.debezium.capturing.product.deserializer=org.acme.ProductDeserializer

反序列化器映射可以与 @Capturing 注解结合使用。如下图所示,该方法将被调用,使用新创建的映射对象实例,而不是传统上传递给捕获方法处理程序的 SourceRecord。

    @Capturing(destination = "dbz.inventory.products")
    public void orders(CapturingEvent<Product> event) {
        logger.info("capturing products for destination {} with data {}", event.destination(), event.record());
    }

当您需要使用域类型与外部服务通信,甚至如果您想将 outbox 模式实现为 sidecar 时,此模式都会很有益。

运行应用程序

因此,如果我们以开发模式运行应用程序

mvn quarkus:dev

您将看到从示例表中捕获的快照事件。

您可以在数据库中插入/更新一行,并在日志中查看它

  1. 获取 容器 ID

    container_id=$(docker ps --filter "ancestor=quay.io/debezium/postgres:15" --format "{{.ID}}") && echo $container_id
  2. 将产品名称从 Laptop 更新为 Notebook

    docker exec "$container_id" bash -c "PGPASSWORD=quarkus psql -U quarkus quarkus -c \"UPDATE inventory.products SET name = 'Notebook' where id = 1\""

构建原生应用程序

如果您的环境配置正确,您可以使用以下命令生成原生应用程序

mvn clean install -Dnative

但不要忘记在 application.propertiesapplication-prod.properties 中添加您的数据源配置。下面显示了一个示例

quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=native
quarkus.datasource.password=native
quarkus.datasource.jdbc.url=jdbc:postgresql://:5432/native
quarkus.datasource.jdbc.max-size=16

深入了解

本快速指南演示了如何在 Quarkus 中使用 Debezium 轻松构建数据流应用程序。

有关功能的详尽列表和配置选项,请查看 Debezium 扩展参考文档

您可以在 Debezium 示例存储库中找到一个工作示例。

Giovanni Panice

Debezium 核心团队成员,在不同领域拥有构建弹性、分布式系统的经验。他专注于 Debezium 的 Server 和 Engine,包括原生构建和 Quarkus 集成。在工作之余,他共同组织了 Ticino Software Craft Meetup,以推广知识共享和持续学习。

     


关于 Debezium

Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。

参与进来

我们希望您觉得 Debezium 有趣且有用,并希望尝试一下。在 Twitter @debezium 上关注我们,在 Zulip 上与我们聊天,或加入我们的 邮件列表 与社区交流。所有代码都在 GitHub 上开源,因此请在本地构建代码,帮助我们改进现有连接器并添加更多连接器。如果您发现问题或有改进 Debezium 的想法,请告诉我们或 记录一个问题

版权所有 © Debezium 及其作者。保留所有权利。有关我们的商标详情,请访问我们的 商标政策商标列表。第三方商标属于其各自所有者,在此提及并不表示任何认可或关联。
×