使用 Testcontainers 进行集成测试
概述
在设置 Debezium 的变更数据捕获管道时,最好也要有一些自动化测试,以确保:
-
源数据库已设置好,以便可以从中流式传输更改。
-
您的连接器已正确配置。
Debezium 为 Testcontainers 提供的扩展旨在简化此类测试,通过 Linux 容器运行所有必需的基础设施(Apache Kafka、Kafka Connect 等),并使其易于 Java 测试访问。
它尽可能地应用合理的默认值(例如,连接器的数据库凭据可以从配置的数据库容器中获取),让您可以专注于测试的基本逻辑。
入门
要使用 Debezium 的 Testcontainers 集成,请将以下依赖项添加到您的项目中:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-testing-testcontainers</artifactId>
<version>3.3.0.Final</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<scope>test</scope>
</dependency>
<!-- Add the TC dependency matching your database -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>
根据您的测试策略,您可能还需要数据库的 JDBC 驱动程序以及 Apache Kafka 的客户端,以便您可以插入一些测试数据并在 Kafka 中断言相应的变更事件。
测试设置
在为 Debezium 连接器配置编写集成测试时,您还需要设置 Apache Kafka 和一个数据库,该数据库应该是变更事件的源。可以使用现有的 Testcontainers 对 Apache Kafka 和 数据库 的支持。
结合 Debezium 的 DebeziumContainer 类,典型的设置将如下所示:
public class DebeziumContainerTest {
private static Network network = Network.newNetwork(); (1)
private static KafkaContainer kafkaContainer = new KafkaContainer()
.withNetwork(network); (2)
public static PostgreSQLContainer<?> postgresContainer =
new PostgreSQLContainer<>(
DockerImageName.parse("quay.io/debezium/postgres:15")
.asCompatibleSubstituteFor("postgres"))
.withNetwork(network)
.withNetworkAliases("postgres"); (3)
public static DebeziumContainer debeziumContainer =
new DebeziumContainer("quay.io/debezium/connect:3.3.0.Final")
.withNetwork(network)
.withKafka(kafkaContainer)
.dependsOn(kafkaContainer); (4)
@BeforeClass
public static void startContainers() { (5)
Startables.deepStart(Stream.of(
kafkaContainer, postgresContainer, debeziumContainer))
.join();
}
}
| 1 | 定义一个将由所有服务使用的 Docker 网络 |
| 2 | 为 Apache Kafka 设置容器 |
| 3 | 设置 Postgres 15 的容器(使用 Debezium 的 Postgres 容器镜像) |
| 4 | 设置带有 Debezium 3.3.0.Final 的 Kafka Connect 容器 |
| 5 | 启动所有三个容器 |
测试实现
声明了所有必需的容器后,您现在可以注册 Debezium Postgres 连接器的实例,将一些测试数据插入 Postgres,并使用 Apache Kafka 客户端从相应的 Kafka 主题读取预期的变更事件记录。
@Test
public void canRegisterPostgreSqlConnector() throws Exception {
try (Connection connection = getConnection(postgresContainer);
Statement statement = connection.createStatement();
KafkaConsumer<String, String> consumer = getConsumer(
kafkaContainer)) {
statement.execute("create schema todo"); (1)
statement.execute("create table todo.Todo (id int8 not null, " +
"title varchar(255), primary key (id))");
statement.execute("alter table todo.Todo replica identity full");
statement.execute("insert into todo.Todo values (1, " +
"'Learn CDC')");
statement.execute("insert into todo.Todo values (2, " +
"'Learn Debezium')");
ConnectorConfiguration connector = ConnectorConfiguration
.forJdbcContainer(postgresContainer)
.with("topic.prefix", "dbserver1");
debeziumContainer.registerConnector("my-connector",
connector); (2)
consumer.subscribe(Arrays.asList("dbserver1.todo.todo"));
List<ConsumerRecord<String, String>> changeEvents =
drain(consumer, 2); (3)
assertThat(JsonPath.<Integer> read(changeEvents.get(0).key(),
"$.id")).isEqualTo(1);
assertThat(JsonPath.<String> read(changeEvents.get(0).value(),
"$.op")).isEqualTo("r");
assertThat(JsonPath.<String> read(changeEvents.get(0).value(),
"$.after.title")).isEqualTo("Learn CDC");
assertThat(JsonPath.<Integer> read(changeEvents.get(1).key(),
"$.id")).isEqualTo(2);
assertThat(JsonPath.<String> read(changeEvents.get(1).value(),
"$.op")).isEqualTo("r");
assertThat(JsonPath.<String> read(changeEvents.get(1).value(),
"$.after.title")).isEqualTo("Learn Debezium");
consumer.unsubscribe();
}
}
// Helper methods below
private Connection getConnection(
PostgreSQLContainer<?> postgresContainer)
throws SQLException {
return DriverManager.getConnection(postgresContainer.getJdbcUrl(),
postgresContainer.getUsername(),
postgresContainer.getPassword());
}
private KafkaConsumer<String, String> getConsumer(
KafkaContainer kafkaContainer) {
return new KafkaConsumer<>(
Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaContainer.getBootstrapServers(),
ConsumerConfig.GROUP_ID_CONFIG,
"tc-" + UUID.randomUUID(),
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest"),
new StringDeserializer(),
new StringDeserializer());
}
private List<ConsumerRecord<String, String>> drain(
KafkaConsumer<String, String> consumer,
int expectedRecordCount) {
List<ConsumerRecord<String, String>> allRecords = new ArrayList<>();
Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> {
consumer.poll(Duration.ofMillis(50))
.iterator()
.forEachRemaining(allRecords::add);
return allRecords.size() == expectedRecordCount;
});
return allRecords;
}
| 1 | 在 Postgres 数据库中创建一个表并插入两条记录 |
| 2 | 注册 Debezium Postgres 连接器的实例;连接器类型以及数据库主机、数据库名称、用户等属性都源自数据库容器。 |
| 3 | 从 Kafka 的更改事件主题中读取两条记录并断言它们的属性 |