Change Data Capture (CDC) 在各种场景中得到广泛应用,例如微服务通信、遗留系统现代化和缓存失效。此模式的核心思想是检测和跟踪数据源(例如数据库)中的更改,并近乎实时地将它们传播到其他系统。Debezium 是一个 CDC 平台,为大多数数据源提供了广泛的连接器。除了捕获更改外,它还通过直观的 UI 提供转换功能,用于定义 Debezium 实例。
引言
使用 Debezium 的一种方法是将其嵌入到 Web 应用程序中,并使用连接器监听数据源中的更改。特别有趣的是,通过一些调整,可以利用 Quarkus 和 GraalVM 构建具有超快启动时间和最小内存占用的 CDC 系统。我们将从一个 PostgreSQL 数据库摄取数据到一个具有 Debezium 的原生 Quarkus 应用程序,该应用程序应用一个简单的 转换。
为什么
我想说,为什么不呢?从另一个角度来看,CDC 模式在分布式系统中运行,必须遵循可伸缩性、容错性和高可用性等原则。优化资源使用不仅可以降低成本,还有助于减少应用程序扩展时的碳排放。
免责声明
此处建议的调整仅作为 **概念验证**,以演示 Debezium 的功能。它们不用于生产环境,而是用于展示其潜力。Debezium 及其周围的第三方库并非为所有用例完全原生设计。相反,此处介绍的调整侧重于一个特定的 **场景**。
副作用
本指南还可以帮助您了解如何将 Java 应用程序原生化。但是,最可靠的方法是仅使用 Quarkus 提供的依赖项。
关于 GraalVM 的一些考虑
GraalVM 使用 **预编译 (Ahead-of-Time, AOT)** 编译将 Java 应用程序编译成本机二进制文件。此过程涉及分析整个应用程序,以识别所有可访问的代码,包括类、方法和依赖项。然而,一个广泛使用的 Java 特性 **——反射——** 会引入动态行为,这些行为有时无法在构建时解决,经常导致 **运行时失败**,例如 `ClassNotFoundException`。此外,许多库依赖于 **动态代理生成**,这在 AOT 编译中不受支持。这时 Quarkus 就派上用场了,它提供了一套原生就绪的扩展和库来克服这些限制。
构建基本应用程序
好了,让我们开始构建应用程序吧!所有建议的代码都可以在 GitHub 上找到。
前提条件
-
已安装并正确配置 GraalVM (sdkman: 23.0.2-graal)
-
Apache Maven 3.9.9
-
可正常工作的容器运行时(Docker 或 Podman)
-
可正常工作的 C 开发环境
您可以使用此链接设置您的 工作环境。
我们将首先更新 `pom.xml`,添加必要的 Quarkus 和 Debezium 依赖项。此外,还需要 Quarkus Maven 插件来构建应用程序。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.mosfet</groupId>
<artifactId>debezium-native</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.release>23</maven.compiler.release>
<quarkus.platform.version>3.19.2</quarkus.platform.version>
<version.debezium>3.0.8.Final</version.debezium>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.quarkus.platform</groupId>
<artifactId>quarkus-bom</artifactId>
<version>${quarkus.platform.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${version.debezium}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>${quarkus.platform.group-id}</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
<version>${quarkus.platform.version}</version>
<extensions>true</extensions>
<executions>
<execution>
<goals>
<goal>build</goal>
<goal>generate-code</goal>
<goal>generate-code-tests</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project> 正如您所见,我们添加了 `debezium-connector-postgres` 来跟踪 PostgreSQL 中的表。
为了仪器化我们的应用程序,我们需要在注入配置的同时避免 GraalVM 无法很好处理的陷阱(牢记我们最初的考虑)。相反,我们将遵循 Quarkus 的方法,从 `application.properties` 读取配置。
为了实现这一点,我们可以使用 `@ConfigMapping` 和 `@StaticInitSafe` 创建一个接口,以确保与 GraalVM 的兼容性。
@StaticInitSafe
@ConfigMapping(prefix = "debezium")
public interface DebeziumConfiguration {
Map<String, String> configuration();
} 该前缀用于识别所有以 `debezium` 开头的属性,例如这些
# Debezium CDC configuration
debezium.configuration.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.configuration.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
debezium.configuration.name=native
debezium.configuration.database.hostname=localhost
debezium.configuration.database.port=5432
debezium.configuration.database.user=postgresuser
debezium.configuration.database.password=postgrespw
debezium.configuration.database.dbname=postgresuser
debezium.configuration.topic.prefix=dbserver1
debezium.configuration.table.include.list=inventory.products
debezium.configuration.plugin.name=pgoutput
debezium.configuration.snapshot.mode=never 因此,现在我们可以创建我们的侦听器 `DatabaseChangeEventListener`,它会跟踪 `inventory` 数据库中 `products` 表上发生的事件
@ApplicationScoped
@Startup
public class DatabaseChangeEventListener {
private static final Logger LOG = LoggerFactory.getLogger(DatabaseChangeEventListener.class);
private DebeziumEngine<?> engine;
@Inject
private ExecutorService executor;
@Inject
private DebeziumConfiguration debeziumConfiguration;
@PostConstruct
public void startEmbeddedEngine() {
LOG.info("Launching Debezium embedded engine");
final Configuration config = Configuration.empty()
.withSystemProperties(Function.identity())
.edit()
.with(Configuration.from(debeziumConfiguration.configuration()))
.build();
this.engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
.using(config.asProperties())
.notifying((events, committer) -> {
for (RecordChangeEvent<SourceRecord> record : events) {
handleDbChangeEvent(record.record());
committer.markProcessed(record);
}
committer.markBatchFinished();
})
.build();
executor.execute(engine);
}
@PreDestroy
public void shutdownEngine() throws Exception {
LOG.info("Stopping Debezium embedded engine");
engine.close();
executor.shutdown();
}
private void handleDbChangeEvent(SourceRecord record) {
LOG.info("DB change event {}", record);
if (record.topic().equals("dbserver1.inventory.products")) {
Integer productId = ((Struct) record.key()).getInt32("id");
Struct payload = (Struct) record.value();
Operation op = Operation.forCode(payload.getString("op"));
Long txId = ((Struct) payload.get("source")).getInt64("txId");
LOG.info("received event with productId: {} op: {} txId: {}", productId, op, txId);
}
}
} 我们可以使用以下命令启动我们的应用程序
mvn clean compile quarkus:dev 显然,在启动应用程序之前,您应该根据 `application.properties` 启动一个 PostgreSQL 实例。为简单起见,您可以使用此 docker compose 配置
services:
postgres:
image: quay.io/debezium/example-postgres:3.0
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgresuser
- POSTGRES_PASSWORD=postgrespw 现在您应该有一个在 JVM 中运行的、具有 Debezium 功能的应用程序。
原生化
好的,现在让我们深入了解 GraalVM 的功能。您可以定义一个触发构建的 Maven profile `native`
<profiles>
<profile>
<id>native</id>
<activation>
<property>
<name>native</name>
</property>
</activation>
<properties>
<quarkus.native.enabled>true</quarkus.native.enabled>
<quarkus.package.jar.enabled>false</quarkus.package.jar.enabled>
</properties>
</profile>
</profiles> 我们可以尝试以原生方式构建
mvn clean install -Dnative 正如我所说的,您可以尝试……但您会遇到类似的错误
Fatal error: com.oracle.graal.pointsto.util.AnalysisError$ParsingError: Error encountered while parsing org.glassfish.jersey.server.wadl.WadlFeature.configure(WadlFeature.java:49)
Parsing context:
at root method.(Unknown Source) 好吧,我从未提及过 Glassfish 和 Jersey(`org.glassfish.jersey.server`),那么为什么会出现这种情况?
我们应该分析依赖树来理解应用程序中包含了什么。所以我们可以执行
mvn dependency:tree 并看到类似这样的内容
[INFO] ---------------------< io.mosfet:debezium-native >----------------------
[INFO] Building debezium-native 1.0-SNAPSHOT
[INFO] from pom.xml
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- dependency:3.7.0:tree (default-cli) @ debezium-native ---
[INFO] io.mosfet:debezium-native:jar:1.0-SNAPSHOT
[INFO] +- io.quarkus:quarkus-arc:jar:3.19.2:compile
[INFO] | +- io.quarkus.arc:arc:jar:3.19.2:compile
[INFO] | | +- jakarta.enterprise:jakarta.enterprise.cdi-api:jar:4.1.0:compile
[INFO] | | | +- jakarta.enterprise:jakarta.enterprise.lang-model:jar:4.1.0:compile
[INFO] | | | +- jakarta.el:jakarta.el-api:jar:5.0.1:compile
[INFO] | | | \- jakarta.interceptor:jakarta.interceptor-api:jar:2.2.0:compile
[INFO] | | +- jakarta.annotation:jakarta.annotation-api:jar:3.0.0:compile
[INFO] | | +- jakarta.transaction:jakarta.transaction-api:jar:2.0.1:compile
[INFO] | | +- io.smallrye.reactive:mutiny:jar:2.8.0:compile
[INFO] | | | +- io.smallrye.common:smallrye-common-annotation:jar:2.10.0:compile
[INFO] | | | \- org.jctools:jctools-core:jar:4.0.5:compile
[INFO] | | \- org.jboss.logging:jboss-logging:jar:3.6.1.Final:compile
[INFO] | +- io.quarkus:quarkus-core:jar:3.19.2:compile
[INFO] | | +- jakarta.inject:jakarta.inject-api:jar:2.0.1:compile
[INFO] | | +- io.smallrye.common:smallrye-common-os:jar:2.10.0:compile
[INFO] | | +- io.quarkus:quarkus-ide-launcher:jar:3.19.2:compile
[INFO] | | +- io.quarkus:quarkus-development-mode-spi:jar:3.19.2:compile
[INFO] | | +- io.smallrye.config:smallrye-config:jar:3.11.4:compile
[INFO] | | | \- io.smallrye.config:smallrye-config-core:jar:3.11.4:compile
[INFO] | | | +- org.eclipse.microprofile.config:microprofile-config-api:jar:3.1:compile
[INFO] | | | +- io.smallrye.common:smallrye-common-classloader:jar:2.10.0:compile
[INFO] | | | \- io.smallrye.config:smallrye-config-common:jar:3.11.4:compile
[INFO] | | +- org.jboss.logmanager:jboss-logmanager:jar:3.1.1.Final:compile
[INFO] | | | +- io.smallrye.common:smallrye-common-constraint:jar:2.10.0:compile
[INFO] | | | +- io.smallrye.common:smallrye-common-cpu:jar:2.10.0:compile
[INFO] | | | +- io.smallrye.common:smallrye-common-expression:jar:2.10.0:compile
[INFO] | | | +- io.smallrye.common:smallrye-common-net:jar:2.10.0:compile
[INFO] | | | +- io.smallrye.common:smallrye-common-ref:jar:2.10.0:compile
[INFO] | | | +- jakarta.json:jakarta.json-api:jar:2.1.3:compile
[INFO] | | | \- org.eclipse.parsson:parsson:jar:1.1.7:compile
[INFO] | | +- org.jboss.logging:jboss-logging-annotations:jar:3.0.4.Final:compile
[INFO] | | +- org.jboss.threads:jboss-threads:jar:3.8.0.Final:compile
[INFO] | | | \- io.smallrye.common:smallrye-common-function:jar:2.10.0:compile
[INFO] | | +- org.jboss.slf4j:slf4j-jboss-logmanager:jar:2.0.0.Final:compile
[INFO] | | +- org.wildfly.common:wildfly-common:jar:2.0.1:compile
[INFO] | | +- io.quarkus:quarkus-bootstrap-runner:jar:3.19.2:compile
[INFO] | | | +- io.quarkus:quarkus-classloader-commons:jar:3.19.2:compile
[INFO] | | | +- io.smallrye.common:smallrye-common-io:jar:2.10.0:compile
[INFO] | | | \- io.github.crac:org-crac:jar:0.1.3:compile
[INFO] | | \- io.quarkus:quarkus-fs-util:jar:0.0.10:compile
[INFO] | \- org.eclipse.microprofile.context-propagation:microprofile-context-propagation-api:jar:1.3:compile
[INFO] +- io.debezium:debezium-embedded:jar:3.0.8.Final:compile
[INFO] | +- io.debezium:debezium-core:jar:3.0.8.Final:compile
[INFO] | | +- com.fasterxml.jackson.core:jackson-core:jar:2.18.2:compile
[INFO] | | +- com.fasterxml.jackson.core:jackson-databind:jar:2.18.2:compile
[INFO] | | \- com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.18.2:compile
[INFO] | +- org.slf4j:slf4j-api:jar:2.0.6:compile
[INFO] | +- org.apache.kafka:connect-api:jar:3.9.0:compile
[INFO] | | +- org.apache.kafka:kafka-clients:jar:3.9.0:compile
[INFO] | | | +- com.github.luben:zstd-jni:jar:1.5.6-4:runtime
[INFO] | | | +- org.lz4:lz4-java:jar:1.8.0:runtime
[INFO] | | | \- org.xerial.snappy:snappy-java:jar:1.1.10.5:runtime
[INFO] | | \- javax.ws.rs:javax.ws.rs-api:jar:2.1.1:runtime
[INFO] | +- org.apache.kafka:connect-runtime:jar:3.9.0:compile
[INFO] | | +- org.apache.kafka:connect-transforms:jar:3.9.0:compile
[INFO] | | +- ch.qos.reload4j:reload4j:jar:1.2.25:runtime
[INFO] | | +- org.bitbucket.b_c:jose4j:jar:0.9.6:runtime
[INFO] | | +- com.fasterxml.jackson.core:jackson-annotations:jar:2.18.2:compile
[INFO] | | +- com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:jar:2.18.2:runtime
[INFO] | | | +- com.fasterxml.jackson.jaxrs:jackson-jaxrs-base:jar:2.18.2:runtime
[INFO] | | | \- com.fasterxml.jackson.module:jackson-module-jaxb-annotations:jar:2.18.2:runtime
[INFO] | | | \- jakarta.activation:jakarta.activation-api:jar:2.1.3:runtime
[INFO] | | +- org.glassfish.jersey.containers:jersey-container-servlet:jar:2.39.1:runtime
[INFO] | | | +- org.glassfish.jersey.containers:jersey-container-servlet-core:jar:2.39.1:runtime
[INFO] | | | | \- org.glassfish.hk2.external:jakarta.inject:jar:2.6.1:runtime
[INFO] | | | +- org.glassfish.jersey.core:jersey-common:jar:2.39.1:runtime
[INFO] | | | | \- org.glassfish.hk2:osgi-resource-locator:jar:1.0.3:runtime
[INFO] | | | +- org.glassfish.jersey.core:jersey-server:jar:2.39.1:runtime
[INFO] | | | | +- org.glassfish.jersey.core:jersey-client:jar:2.39.1:runtime
[INFO] | | | | \- jakarta.validation:jakarta.validation-api:jar:3.0.2:runtime
[INFO] | | | \- jakarta.ws.rs:jakarta.ws.rs-api:jar:3.1.0:runtime
[INFO] | | +- org.glassfish.jersey.inject:jersey-hk2:jar:2.39.1:runtime
[INFO] | | | +- org.glassfish.hk2:hk2-locator:jar:2.6.1:runtime
[INFO] | | | | +- org.glassfish.hk2.external:aopalliance-repackaged:jar:2.6.1:runtime
[INFO] | | | | +- org.glassfish.hk2:hk2-api:jar:2.6.1:runtime
[INFO] | | | | \- org.glassfish.hk2:hk2-utils:jar:2.6.1:runtime
[INFO] | | | \- org.javassist:javassist:jar:3.29.0-GA:runtime
[INFO] | | +- javax.xml.bind:jaxb-api:jar:2.3.1:runtime
[INFO] | | | \- javax.activation:javax.activation-api:jar:1.2.0:runtime
[INFO] | | +- javax.activation:activation:jar:1.1.1:runtime
[INFO] | | +- org.eclipse.jetty:jetty-server:jar:9.4.56.v20240826:runtime
[INFO] | | | +- javax.servlet:javax.servlet-api:jar:3.1.0:runtime
[INFO] | | | +- org.eclipse.jetty:jetty-http:jar:9.4.56.v20240826:runtime
[INFO] | | | \- org.eclipse.jetty:jetty-io:jar:9.4.56.v20240826:runtime
[INFO] | | +- org.eclipse.jetty:jetty-servlet:jar:9.4.56.v20240826:runtime
[INFO] | | | +- org.eclipse.jetty:jetty-security:jar:9.4.56.v20240826:runtime
[INFO] | | | \- org.eclipse.jetty:jetty-util-ajax:jar:9.4.56.v20240826:runtime
[INFO] | | +- org.eclipse.jetty:jetty-servlets:jar:9.4.56.v20240826:runtime
[INFO] | | | +- org.eclipse.jetty:jetty-continuation:jar:9.4.56.v20240826:runtime
[INFO] | | | \- org.eclipse.jetty:jetty-util:jar:9.4.56.v20240826:runtime
[INFO] | | +- org.eclipse.jetty:jetty-client:jar:9.4.56.v20240826:runtime
[INFO] | | +- org.reflections:reflections:jar:0.10.2:runtime
[INFO] | | | \- com.google.code.findbugs:jsr305:jar:3.0.2:runtime
[INFO] | | +- org.apache.maven:maven-artifact:jar:3.9.9:runtime
[INFO] | | | \- org.codehaus.plexus:plexus-utils:jar:3.5.1:runtime
[INFO] | | \- io.swagger.core.v3:swagger-annotations:jar:2.2.8:runtime
[INFO] | +- org.apache.kafka:connect-json:jar:3.9.0:compile
[INFO] | | +- com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.18.2:compile
[INFO] | | \- com.fasterxml.jackson.module:jackson-module-afterburner:jar:2.18.2:compile
[INFO] | \- org.apache.kafka:connect-file:jar:3.9.0:compile
[INFO] +- io.debezium:debezium-connector-postgres:jar:3.0.8.Final:compile
[INFO] | +- org.postgresql:postgresql:jar:42.7.5:compile
[INFO] | \- com.google.protobuf:protobuf-java:jar:3.25.5:compile
[INFO] \- io.debezium:debezium-api:jar:3.0.8.Final:compile 如果您查看 `kafka:connect-runtime`,您会发现 `Jetty`、`Jersey` 以及许多 GraalVM 无法很好处理的其他组件。
为什么 `kafka:connect-runtime` 包含所有这些依赖项?嗯,我假设这个库是为运行 Kafka Connect 节点设计的,但在我们的特定情况下 **——Quarkus 应用程序中的 Debezium——** 这是不必要的(最有可能的是,Debezium 只使用某些 Kafka 类进行数据转换)。
为了最小化原生构建的问题,我建议排除所有未使用的依赖项。此外,如果我们深入挖掘依赖树,还会发现其他潜在的麻烦制造者,如 `Jackson` 和 `PostgreSQL` 驱动程序,它们可能会引起进一步的复杂性。我建议也排除它们,并用 Quarkus 推荐的替代品替换它们。
这是清理后的 `pom.xml` 依赖项部分
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${version.debezium}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml</groupId>
<artifactId>jackson-module-afterburner</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<version>${version.debezium}</version>
<exclusions>
<exclusion>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${version.debezium}</version>
</dependency>
<!-- kafka runtime & connect-json (used by debezium engine) excluded
of unnecessary or harmful for native build dependencies -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>org.bitbucket.b_c</groupId>
<artifactId>jose4j</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-json-provider</artifactId>
</exclusion>
<exclusion>
<groupId>org.glassfish.jersey.containers</groupId>
<artifactId>jersey-container-servlet</artifactId>
</exclusion>
<exclusion>
<groupId>org.glassfish.jersey.inject</groupId>
<artifactId>jersey-hk2</artifactId>
</exclusion>
<exclusion>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlets</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.maven</groupId>
<artifactId>maven-artifact</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<exclusions>
<exclusion>
<groupId>com.fasterxml</groupId>
<artifactId>jackson-module-afterburner</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- add alternatives for jackson & postgres native friendly -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jdbc-postgresql</artifactId>
</dependency>
<!-- in some cases kafka use jetty-util and connect-transforms -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>9.4.56.v20240826</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-transforms</artifactId>
</dependency>
</dependencies> 构建将成功(很可能……)!现在您可以运行原生应用程序(假设您使用的是 Gnu/Linux 或 MacOS)
./target/debezium-native-1.0-SNAPSHOT-runner 但现在情况更糟了。您会遇到运行时错误,例如
2025-03-11 20:08:34,408 ERROR [io.qua.run.Application] (main) Failed to start application: java.lang.RuntimeException: Failed to start quarkus
at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
at io.quarkus.runtime.Application.start(Application.java:101)
at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:119)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
at io.quarkus.runner.GeneratedMain.main(Unknown Source)
Caused by: io.debezium.DebeziumException: No implementation of Debezium engine builder was found Debezium 使用一些反射魔法来仪器化引擎。要修复它,我们应该通过在 `application.properties` 中添加以下内容来仪器化应用程序,以包括 `META-INF/services/`
# Solve the ConvertingEngineBuilderFactory.class in Debezium Engine
quarkus.native.resources.includes=META-INF/services/* 并通过添加以下类来在应用程序中注册 `ConvertingEngineBuilderFactory.class`
@RegisterForReflection(targets = { ConvertingEngineBuilderFactory.class })
public class ReflectingConfig { } 如果您仍然在跟着我,那么现在又会出现构建时错误,类似
Fatal error: com.oracle.graal.pointsto.constraints.UnsupportedFeatureException: Detected an instance of Random/SplittableRandom class in the image heap. Instances created during image generation have cached seed values and don't behave as expected. If these objects should not be stored in the image heap, you can use
'--trace-object-instantiation=java.util.Random' `Random` 和 `SplittableRandom` 用于提供随机值,并且通常期望在每次运行时都能获得一个新的种子。使用原生镜像可能会导致缓存值打破预期。此 `Random` 定义以静态方式存在于 Kafka 库中。要修复此类问题,我们应该仪器化应用程序,使其在运行时接受 `Random`,如下所示
# Kafka-clients use Random (SaslClientAuthenticator line 63)
quarkus.native.additional-build-args=--initialize-at-run-time=org.apache.kafka.common.security.authenticator.SaslClientAuthenticator 构建应该会成功,但同样,您会遇到运行时错误。现在过程与应用于 `ConvertingEngineBuilderFactory.class` 的过程相同:仪器化 GraalVM 以查找通过反射注入的类。为了您的理智,这里有一个针对此用例的 `ReflectingConfig`,其中包含所有类
@RegisterForReflection(targets = { ConvertingEngineBuilderFactory.class,
SaslClientAuthenticator.class,
JsonConverter.class,
PostgresConnector.class,
PostgresSourceInfoStructMaker.class,
DefaultTransactionMetadataFactory.class,
SchemaTopicNamingStrategy.class,
OffsetCommitPolicy.class,
PostgresConnectorTask.class,
SinkNotificationChannel.class,
LogNotificationChannel.class,
JmxNotificationChannel.class,
SnapshotLock.class,
NoLockingSupport.class,
NoSnapshotLock.class,
SharedSnapshotLock.class,
SelectAllSnapshotQuery.class,
AlwaysSnapshotter.class,
InitialSnapshotter.class,
InitialOnlySnapshotter.class,
NoDataSnapshotter.class,
RecoverySnapshotter.class,
WhenNeededSnapshotter.class,
NeverSnapshotter.class,
SchemaOnlySnapshotter.class,
SchemaOnlyRecoverySnapshotter.class,
ConfigurationBasedSnapshotter.class,
SourceSignalChannel.class,
KafkaSignalChannel.class,
FileSignalChannel.class,
JmxSignalChannel.class,
InProcessSignalChannel.class,
StandardActionProvider.class
})
public class ReflectingConfig { } 如果您遵循了说明,那么应用程序现在应该可以工作了。我还没有足够的时间来评估性能,但 **内存消耗从 150MB 下降到 50MB**。
添加转换
一个常见的用例是在事件摄取过程中应用转换。如果我们添加以下配置
# Transformation
debezium.configuration.transforms.t0.add.fields=op,table
debezium.configuration.transforms.t0.add.headers=db,table
debezium.configuration.transforms.t0.negate=false
debezium.configuration.transforms.t0.predicate=p2
debezium.configuration.transforms.t0.type=io.debezium.transforms.ExtractNewRecordState
debezium.configuration.transforms=t0
debezium.configuration.predicates.p2.pattern=inventory.inventory.products
debezium.configuration.predicates.p2.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
debezium.configuration.predicates=p2 应用程序将失败,出现类似以下错误
2025-03-14 10:35:26,159 ERROR [io.qua.run.Application] (main) Failed to start application: java.lang.RuntimeException: Failed to start quarkus
at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
at io.quarkus.runtime.Application.start(Application.java:101)
at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:119)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
at io.quarkus.runner.GeneratedMain.main(Unknown Source)
Caused by: io.debezium.DebeziumException: Error while instantiating predicate 'p2' 或
2025-03-14 10:43:13,230 ERROR [io.qua.run.Application] (main) Failed to start application: java.lang.RuntimeException: Failed to start quarkus
at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
at io.quarkus.runtime.Application.start(Application.java:101)
at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:119)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
at io.quarkus.runner.GeneratedMain.main(Unknown Source)
Caused by: io.debezium.DebeziumException: Error while instantiating transformation 't0' 实际上,应用程序无法加载转换中定义的类,例如 `TopicNameMatches` 和 `ExtractNewRecordState`。有必要通过将它们添加到 `ReflectingConfig` 来仪器化应用程序。
考虑因素
这 **不是**(我再说一遍,**不是**)为生产使用做好的准备。在两个关键领域仍有大量工作要做
-
**将 Debezium 与 Kafka 库解耦**:Kafka 和 Debezium 应该只共享一种标准化的方法来移动和转换数据。用于创建 Kafka Connect 节点的 Kafka 运行时不应该是 Debezium 不必要导入的依赖项。
-
**解决 Debezium 中的反射问题**:所有使用 `ServiceLoader` 管理的类都应按照 Quarkus 和 GraalVM 的指南进行注册。此外,额外的配置(如转换)会注入类,应该 考虑原生构建。
解决这些挑战可能会为 Quarkus 扩展铺平道路,该扩展将完全认证 Debezium 的原生兼容性。
未来场景
极大地减少 Debezium 的内存占用的能力尤其令人感兴趣,特别是在考虑将 Debezium Server 作为原生镜像运行的场景时。
据我所知(**虽然不 100% 确定**),为了确保可伸缩性和保持事件顺序,源(如数据库表)和 Debezium 引擎之间通常存在一对一的关联。这种下降可以为 Debezium Server 成为 CDC 的最佳选择打开许多场景。
关于 Debezium
Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。