使用 Debezium 设置变更数据捕获 (CDC) 流水线通常只是配置问题,无需编程。拥有自动化测试来验证您的 CDC 设置,确保一切配置正确并且您的 Debezium 连接器按预期设置,仍然是一个非常好的主意。
需要考虑配置的两个主要组件是:
-
源数据库:必须进行设置,以便 Debezium 可以连接到它并检索变更事件;具体细节取决于数据库,例如,对于 MySQL,binlog 必须处于“row”模式,对于 Postgres,必须安装支持的逻辑解码插件之一,等等。
-
Debezium 连接器:必须使用正确的数据库主机和凭据进行配置,可能使用 SSL,应用表和列过滤器,可能应用一个或多个单消息转换(SMTs)等。
这时,新加入的 Debezium 对集成测试的支持,通过 Testcontainers,便派上了用场。它允许使用 Linux 容器镜像来设置所有必需的组件(Apache Kafka、Kafka Connect 等),配置并部署 Debezium 连接器,然后针对生成的变更数据事件运行断言。
让我们来看看具体是如何实现的。
项目设置
假设您使用 Apache Maven 进行依赖管理,请将以下依赖项添加到您的 pom.xml 文件中,引入 Debezium Testcontainers 集成以及 Testcontainers Apache Kafka 模块
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-testing-testcontainers</artifactId>
<version>1.1.0.CR1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<scope>test</scope>
</dependency> 另外,也请添加适用于您数据库的 Testcontainers 依赖项,例如 Postgres
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency> 您可以在 GitHub 上的 debezium-examples 仓库中找到包含完整配置的示例项目。
初始化 Testcontainers
在声明了所有必需的依赖项之后,就可以编写 CDC 集成测试了。使用 Testcontainers,集成测试是通过 Linux 容器和 Docker 实现的。它提供了一个 Java API 来启动和管理测试所需的资源。我们可以利用它来启动 Apache Kafka、Kafka Connect 和 Postgres 数据库。
public class CdcTest {
private static Network network = Network.newNetwork(); (1)
private static KafkaContainer kafkaContainer = new KafkaContainer()
.withNetwork(network); (2)
public static PostgreSQLContainer<?> postgresContainer =
new PostgreSQLContainer<>("debezium/postgres:11")
.withNetwork(network)
.withNetworkAliases("postgres"); (3)
public static DebeziumContainer debeziumContainer =
new DebeziumContainer("1.1.0.CR1")
.withNetwork(network)
.withKafka(kafkaContainer)
.dependsOn(kafkaContainer); (4)
@BeforeClass
public static void startContainers() { (5)
Startables.deepStart(Stream.of(
kafkaContainer, postgresContainer, debeziumContainer))
.join();
}
} | 1 | 定义一个将由所有服务使用的 Docker 网络 |
| 2 | 为 Apache Kafka 设置容器 |
| 3 | 设置一个 Postgres 11 的容器(使用 Debezium 的 Postgres 容器镜像) |
| 4 | 设置一个带有 Debezium 的 Kafka Connect 容器 |
| 5 | 在 @BeforeClass 方法中启动所有三个容器 |
请注意,您需要安装 Docker 才能使用 Testcontainers。
测试实现
在就绪的基础设施到位后,我们就可以为 CDC 设置编写测试了。测试的总体流程如下:
-
配置 Debezium 连接器以连接 Postgres 数据库
-
执行一些 SQL 语句来更改一些数据
-
使用 Kafka 消费者从相应的 Kafka 主题中检索生成的变更数据事件
-
对这些事件运行一些断言
这是测试的框架:
@Test
public void canObtainChangeEventsFromPostgres() throws Exception {
try (Connection connection = getConnection(postgresContainer);
Statement statement = connection.createStatement();
KafkaConsumer<String, String> consumer =
getConsumer(kafkaContainer)) {
// TODO ...
}
} 数据库连接的凭据可以从通过 Testcontainers 启动的 Postgres 容器中获取,这很好地避免了任何冗余。
private Connection getConnection(PostgreSQLContainer<?> postgresContainer)
throws SQLException {
return DriverManager.getConnection(postgresContainer.getJdbcUrl(),
postgresContainer.getUsername(),
postgresContainer.getPassword());
} Kafka 消费者也是如此。
private KafkaConsumer<String, String> getConsumer(
KafkaContainer kafkaContainer) {
return new KafkaConsumer<>(
ImmutableMap.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaContainer.getBootstrapServers(),
ConsumerConfig.GROUP_ID_CONFIG,
"tc-" + UUID.randomUUID(),
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest"),
new StringDeserializer(),
new StringDeserializer());
} 现在,让我们来实现实际的测试逻辑。
statement.execute("create schema todo"); (1)
statement.execute("create table todo.Todo (" +
"id int8 not null, " +
"title varchar(255), " +
"primary key (id))");
statement.execute("alter table todo.Todo replica identity full");
statement.execute("insert into todo.Todo values (1, 'Learn CDC')");
statement.execute("insert into todo.Todo values (2, 'Learn Debezium')");
ConnectorConfiguration connector = ConnectorConfiguration
.forJdbcContainer(postgresContainer)
.with("database.server.name", "dbserver1");
debeziumContainer.registerConnector("my-connector",
connector); (2)
consumer.subscribe(Arrays.asList("dbserver1.todo.todo"));
List<ConsumerRecord<String, String>> changeEvents =
drain(consumer, 2); (3)
ConsumerRecord<String, String> changeEvent = changeEvents.get(0);
assertThat(JsonPath.<Integer> read(changeEvent.key(), "$.id"))
.isEqualTo(1);
assertThat(JsonPath.<String> read(changeEvent.value(), "$.op"))
.isEqualTo("r");
assertThat(JsonPath.<String> read(changeEvent.value(), "$.after.title"))
.isEqualTo("Learn CDC");
changeEvent = changeEvents.get(1);
assertThat(JsonPath.<Integer> read(changeEvent.key(), "$.id"))
.isEqualTo(2);
assertThat(JsonPath.<String> read(changeEvent.value(), "$.op"))
.isEqualTo("r");
assertThat(JsonPath.<String> read(changeEvent.value(), "$.after.title"))
.isEqualTo("Learn Debezium");
consumer.unsubscribe(); | 1 | 在 Postgres 数据库中创建一个表并插入两条记录 |
| 2 | 注册 Debezium Postgres 连接器的一个实例。 |
| 3 | 从 Kafka 的更改事件主题中读取两条记录并断言它们的属性 |
请注意,Debezium 的 Testcontainers 支持如何允许从数据库容器中填充连接器配置,从而无需显式提供数据库连接属性。只需要提供唯一的 database.server.name,当然,您也可以应用其他配置选项,如表或列过滤器、SMT 等。
为了简洁起见,省略了用于从 Kafka 主题读取指定数量记录的 drain() 方法的源代码。您可以在 GitHub 上的完整示例中 找到它。
基于 JsonPath 的断言对于断言预期数据变更事件的属性非常有用,当然,您也可以使用任何其他 JSON API 来完成此任务。如果使用 Apache Avro 而不是 JSON 作为序列化格式,则需要使用 Avro API。
总结
Testcontainers 及其支持使得编写 CDC 设置的自动化集成测试变得相当容易。
本文讨论的测试方法可以从多个方面进行扩展。例如,将连接器配置置于修订控制之下(以便您可以管理和跟踪任何配置更改)并使用这些配置文件驱动测试可能是可取的。您还可以更进一步,测试整个数据流管道。为此,您不仅需要部署 Debezium 连接器,还需要部署一个 Sink 连接器,例如用于您的数据仓库或搜索服务器。然后,您可以对这些 Sink 系统中的数据运行断言,确保您的数据管道端到端的正确性。
您对测试 CDC 设置和管道有什么看法?请在下面的评论中告诉我们!
关于 Debezium
Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。