您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

使用 Testcontainers 进行集成测试

此功能目前处于孵化状态,即其确切的语义、配置选项、API 等可能会在未来的版本中根据收到的反馈进行更改。如果您在使用此扩展时遇到任何问题,请告知我们。

概述

在设置 Debezium 的变更数据捕获管道时,最好也要有一些自动化测试,以确保:

  • 源数据库已设置好,以便可以从中流式传输更改。

  • 您的连接器已正确配置。

Debezium 为 Testcontainers 提供的扩展旨在简化此类测试,通过 Linux 容器运行所有必需的基础设施(Apache Kafka、Kafka Connect 等),并使其易于 Java 测试访问。

它尽可能地应用合理的默认值(例如,连接器的数据库凭据可以从配置的数据库容器中获取),让您可以专注于测试的基本逻辑。

入门

要使用 Debezium 的 Testcontainers 集成,请将以下依赖项添加到您的项目中:

<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-testing-testcontainers</artifactId>
  <version>3.3.0.Final</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>kafka</artifactId>
  <scope>test</scope>
</dependency>

<!-- Add the TC dependency matching your database -->
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>postgresql</artifactId>
  <scope>test</scope>
</dependency>

根据您的测试策略,您可能还需要数据库的 JDBC 驱动程序以及 Apache Kafka 的客户端,以便您可以插入一些测试数据并在 Kafka 中断言相应的变更事件。

测试设置

在为 Debezium 连接器配置编写集成测试时,您还需要设置 Apache Kafka 和一个数据库,该数据库应该是变更事件的源。可以使用现有的 Testcontainers 对 Apache Kafka数据库 的支持。

结合 Debezium 的 DebeziumContainer 类,典型的设置将如下所示:

public class DebeziumContainerTest {

    private static Network network = Network.newNetwork(); (1)

    private static KafkaContainer kafkaContainer = new KafkaContainer()
            .withNetwork(network); (2)

    public static PostgreSQLContainer<?> postgresContainer =
            new PostgreSQLContainer<>(
                    DockerImageName.parse("quay.io/debezium/postgres:15")
                        .asCompatibleSubstituteFor("postgres"))
                .withNetwork(network)
                .withNetworkAliases("postgres"); (3)

    public static DebeziumContainer debeziumContainer =
            new DebeziumContainer("quay.io/debezium/connect:3.3.0.Final")
                .withNetwork(network)
                .withKafka(kafkaContainer)
                .dependsOn(kafkaContainer); (4)

    @BeforeClass
    public static void startContainers() { (5)
        Startables.deepStart(Stream.of(
                kafkaContainer, postgresContainer, debeziumContainer))
                .join();
    }
}
1 定义一个将由所有服务使用的 Docker 网络
2 为 Apache Kafka 设置容器
3 设置 Postgres 15 的容器(使用 Debezium 的 Postgres 容器镜像)
4 设置带有 Debezium 3.3.0.Final 的 Kafka Connect 容器
5 启动所有三个容器

测试实现

声明了所有必需的容器后,您现在可以注册 Debezium Postgres 连接器的实例,将一些测试数据插入 Postgres,并使用 Apache Kafka 客户端从相应的 Kafka 主题读取预期的变更事件记录。

@Test
public void canRegisterPostgreSqlConnector() throws Exception {
    try (Connection connection = getConnection(postgresContainer);
            Statement statement = connection.createStatement();
            KafkaConsumer<String, String> consumer = getConsumer(
                    kafkaContainer)) {

        statement.execute("create schema todo"); (1)
        statement.execute("create table todo.Todo (id int8 not null, " +
                "title varchar(255), primary key (id))");
        statement.execute("alter table todo.Todo replica identity full");
        statement.execute("insert into todo.Todo values (1, " +
                "'Learn CDC')");
        statement.execute("insert into todo.Todo values (2, " +
                "'Learn Debezium')");

        ConnectorConfiguration connector = ConnectorConfiguration
                .forJdbcContainer(postgresContainer)
                .with("topic.prefix", "dbserver1");

        debeziumContainer.registerConnector("my-connector",
                connector); (2)

        consumer.subscribe(Arrays.asList("dbserver1.todo.todo"));

        List<ConsumerRecord<String, String>> changeEvents =
                drain(consumer, 2); (3)

        assertThat(JsonPath.<Integer> read(changeEvents.get(0).key(),
                "$.id")).isEqualTo(1);
        assertThat(JsonPath.<String> read(changeEvents.get(0).value(),
                "$.op")).isEqualTo("r");
        assertThat(JsonPath.<String> read(changeEvents.get(0).value(),
                "$.after.title")).isEqualTo("Learn CDC");

        assertThat(JsonPath.<Integer> read(changeEvents.get(1).key(),
                "$.id")).isEqualTo(2);
        assertThat(JsonPath.<String> read(changeEvents.get(1).value(),
                "$.op")).isEqualTo("r");
        assertThat(JsonPath.<String> read(changeEvents.get(1).value(),
                "$.after.title")).isEqualTo("Learn Debezium");

        consumer.unsubscribe();
    }
}

// Helper methods below

private Connection getConnection(
        PostgreSQLContainer<?> postgresContainer)
                throws SQLException {

    return DriverManager.getConnection(postgresContainer.getJdbcUrl(),
            postgresContainer.getUsername(),
            postgresContainer.getPassword());
}

private KafkaConsumer<String, String> getConsumer(
            KafkaContainer kafkaContainer) {

    return new KafkaConsumer<>(
            Map.of(
                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                            kafkaContainer.getBootstrapServers(),
                    ConsumerConfig.GROUP_ID_CONFIG,
                            "tc-" + UUID.randomUUID(),
                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                            "earliest"),
            new StringDeserializer(),
            new StringDeserializer());
}

private List<ConsumerRecord<String, String>> drain(
        KafkaConsumer<String, String> consumer,
        int expectedRecordCount) {

    List<ConsumerRecord<String, String>> allRecords = new ArrayList<>();

    Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> {
        consumer.poll(Duration.ofMillis(50))
                .iterator()
                .forEachRemaining(allRecords::add);

        return allRecords.size() == expectedRecordCount;
    });

    return allRecords;
}
1 在 Postgres 数据库中创建一个表并插入两条记录
2 注册 Debezium Postgres 连接器的实例;连接器类型以及数据库主机、数据库名称、用户等属性都源自数据库容器。
3 从 Kafka 的更改事件主题中读取两条记录并断言它们的属性