您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

Debezium 引擎

Debezium 连接器通常是通过将其部署到 Kafka Connect 服务来运行的,并配置一个或多个连接器来监视上游数据库,并为它们在上游数据库中看到的所有更改生成数据更改事件。这些数据更改事件被写入 Kafka,然后可以被许多不同的应用程序独立地消费。Kafka Connect 提供了出色的容错性和可伸缩性,因为它作为一个分布式服务运行,并确保所有已注册和配置的连接器始终在运行。例如,即使 Kafka Connect 集群中的一个节点出现故障,剩余的 Kafka Connect 节点也会重新启动之前在该节点上运行的任何连接器,从而最大限度地减少停机时间并消除管理活动。

并非所有应用程序都需要这种级别的容错和可靠性,它们可能也不想依赖于外部的 Kafka 代理集群和 Kafka Connect 服务。相反,一些应用程序更愿意 **嵌入** Debezium 连接器到应用程序空间中。它们仍然想要相同的数据更改事件,但更希望连接器直接将它们发送给应用程序,而不是将它们持久化到 Kafka 中。

这个 debezium-api 模块定义了一个小的 API,允许应用程序使用 Debezium 引擎轻松配置和运行 Debezium 连接器。

从 2.6.0 版本开始,Debezium 提供了 DebeziumEngine 接口的两个实现。较旧的 EmbeddedEngine 实现运行一个只使用一个任务的连接器。连接器按顺序发出所有记录。

EmbeddedEngine 是 Debezium 3.1.0.Final 及更早版本的默认实现。从 Debezium 3.2.0.Alpha1 版本开始,默认实现是 AsyncEmbeddedEngine,并且 EmbeddedEngine 实现不再可用。

从 2.6.0 版本开始,提供了一个新的 AsyncEmbeddedEngine 实现。此实现也只运行一个连接器,但它可以处理多线程中的记录,并运行多个任务(如果连接器支持)。(目前只有 SQL Server 和 MongoDB 的连接器支持在单个连接器内运行多个任务)。由于这两个引擎都实现了相同的接口并共享相同的 API,因此下面的代码示例对两个引擎都有效。两个实现都支持相同的配置选项。

然而,新的 AsyncEmbeddedEngine 提供了一些新的配置选项,用于设置和微调并行处理。有关这些新配置选项的信息,请参阅 异步引擎属性。要了解有关开发 AsyncEmbeddedEngine 的动机及其实现细节的更多信息,请参阅 异步嵌入式引擎设计文档

依赖项

要使用 Debezium 引擎模块,请将 debezium-api 模块添加到应用程序的依赖项中。debezium-embedded 模块中有一个开箱即用的此 API 实现,也应该添加到依赖项中。对于 Maven,这需要将以下内容添加到应用程序的 POM 中

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${version.debezium}</version>
</dependency>

其中 ${version.debezium} 是您使用的 Debezium 版本,或者是一个包含 Debezium 版本字符串值的 Maven 属性。

同样,为应用程序将使用的每个 Debezium 连接器添加依赖项。例如,以下内容可以添加到应用程序的 Maven POM 文件中,以便应用程序可以使用 MySQL 连接器

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${version.debezium}</version>
</dependency>

或者对于 MongoDB 连接器

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mongodb</artifactId>
    <version>${version.debezium}</version>
</dependency>

本文档的其余部分将描述如何在应用程序中嵌入 MySQL 连接器。其他连接器的使用方式类似,只是连接器特定的配置、主题和事件有所不同。

打包你的项目

Debezium 使用 SPI 通过 ServiceLoader 来加载实现。实现可以基于连接器类型,也可以是自定义实现。

某些接口有多个实现。例如,io.debezium.snapshot.spi.SnapshotLock 在核心中有一个默认实现,以及每个连接器的特定实现。为确保 Debezium 能够找到所需的实现,您必须显式配置构建工具以合并 META-INF/services 文件。

例如,如果您使用的是 Maven shade 插件,请添加 ServicesResourceTransformer 转换器,如下例所示

...
<configuration>
 <transformers>
    ...
    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
    ...
 </transformers>
...
</configuration>

或者,如果您使用 Maven Assembly 插件,您可以使用 metaInf-services 容器描述符处理器

代码中

您的应用程序需要为要运行的每个连接器实例设置一个嵌入式引擎。io.debezium.engine.DebeziumEngine<R> 类充当任何 Debezium 连接器的易于使用的包装器,并完全管理连接器的生命周期。您可以使用其 builder API 创建 DebeziumEngine 实例,提供以下内容

  • 您希望接收消息的格式,例如 JSON、Avro 或 Kafka Connect 的 SourceRecord(请参阅 输出消息格式

  • 配置属性(可能从属性文件加载),这些属性定义了引擎和连接器的环境

  • 将为连接器生成的每个数据更改事件调用的方法

以下是一个配置和运行嵌入式 MySQL 连接器 的代码示例

// Define the configuration for the Debezium Engine with MySQL connector...
final Properties props = new Properties();
props.setProperty("name", "engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "/path/to/storage/offsets.dat");
props.setProperty("offset.flush.interval.ms", "60000");
/* begin connector properties */
props.setProperty("database.hostname", "localhost");
props.setProperty("database.port", "3306");
props.setProperty("database.user", "mysqluser");
props.setProperty("database.password", "mysqlpw");
props.setProperty("database.server.id", "85744");
props.setProperty("topic.prefix", "my-app-connector");
props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
props.setProperty("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat");

// Create the engine with this configuration ...
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
        .using(props)
        .notifying(record -> {
            System.out.println(record);
        }).build()
    ) {
    // Run the engine asynchronously ...
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.execute(engine);

    // Do something else or wait for a signal or an event
}
// Engine is stopped when the main code is finished

让我们更详细地研究一下这段代码,从我们在这里重复的前几行开始

// Define the configuration for the Debezium Engine with MySQL connector...
final Properties props = new Properties();
props.setProperty("name", "engine");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", "/path/to/storage/offsets.dat");
props.setProperty("offset.flush.interval.ms", "60000");

这会创建一个新的标准 Properties 对象,用于设置引擎所需的几个字段,而无需考虑使用哪个连接器。第一个是引擎的名称,它将用于连接器生成的源记录和内部状态中,因此请在您的应用程序中使用有意义的名称。connector.class 字段定义了扩展 Kafka Connect 的 org.apache.kafka.connect.source.SourceConnector 抽象类的类名;在本例中,我们指定了 Debezium 的 MySqlConnector 类。

当 Kafka Connect 连接器运行时,它会读取源中的信息,并定期记录“偏移量”,这些偏移量定义了它已处理的信息量。如果连接器被重新启动,它将使用最后记录的偏移量来知道从源信息的哪个位置恢复读取。由于连接器不知道也不关心如何存储偏移量,因此引擎需要提供一种存储和恢复这些偏移量的方法。我们配置的接下来的几个字段指定了我们的引擎应该使用 FileOffsetBackingStore 类,将偏移量存储在本地文件系统上的 /path/to/storage/offset.dat 文件中(文件名可以任意,存储位置也可以任意)。此外,虽然连接器在其生成的每个源记录中都记录了偏移量,但引擎会定期将偏移量刷新到后端存储(在本例中,每分钟一次)。这些字段可以根据您的应用程序需求进行定制。

接下来的几行定义了特定于连接器的字段(记录在每个连接器的文档中),在本例中是 MySqlConnector 连接器

    /* begin connector properties */
    props.setProperty("database.hostname", "localhost");
    props.setProperty("database.port", "3306");
    props.setProperty("database.user", "mysqluser");
    props.setProperty("database.password", "mysqlpw");
    props.setProperty("database.server.id", "85744");
    props.setProperty("topic.prefix", "my-app-connector");
    props.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
    props.setProperty("schema.history.internal.file.filename", "/path/to/storage/schemahistory.dat");

在这里,我们设置了 MySQL 数据库服务器运行的主机名和端口号,并定义了用于连接 MySQL 数据库的用户名和密码。请注意,对于 MySQL,用户名和密码应该对应一个已被授予以下 MySQL 权限的 MySQL 数据库用户

  • SELECT

  • RELOAD

  • SHOW DATABASES

  • REPLICATION SLAVE

  • REPLICATION CLIENT

前三个权限是在读取数据库的连续快照时必需的。最后两个权限允许数据库读取服务器的 binlog,该 binlog 通常用于 MySQL 复制。

配置还包括一个 server.id 的数字标识符。由于 MySQL 的 binlog 是 MySQL 复制机制的一部分,因此为了读取 binlog,MySqlConnector 实例必须加入 MySQL 服务器组,这意味着此服务器 ID 必须 在构成 MySQL 服务器组的所有进程中是唯一的,并且是介于 1 和 232-1 之间的任何整数。在我们的代码中,我们将其设置为一个相当大但有些随机的值,我们只为我们的应用程序使用。

配置还指定了一个 MySQL 服务器的逻辑名称。连接器会将此逻辑名称包含在其生成的每个源记录的主题字段中,从而使您的应用程序能够辨别这些记录的来源。我们的示例使用的是“products”这个服务器名,可能是因为数据库包含产品信息。当然,您可以将其命名为对您的应用程序有意义的任何名称。

MySqlConnector 类运行时,它会读取 MySQL 服务器的 binlog,其中包含服务器托管的数据库中发生的所有数据更改和 Schema 更改。由于所有数据更改都根据更改记录时的所有者表的 Schema 进行结构化,因此连接器需要跟踪所有 Schema 更改,以便能够正确地解码更改事件。连接器会记录 Schema 信息,以便在连接器重新启动并从最后记录的偏移量处恢复读取时,它确切地知道在那个偏移量处数据库 Schema 的样子。连接器如何记录数据库 Schema 历史记录是在我们配置的最后两个字段中定义的,即我们的连接器应该使用 FileSchemaHistory 类,将数据库 Schema 历史记录更改存储在本地文件系统上的 /path/to/storage/schemahistory.dat 文件中(同样,这个文件可以任意命名,存储在任何地方)。

最后,使用 build() 方法构建不可变的配置。(顺便说一句,我们不是通过编程方式构建它,而是可以通过使用 Configuration.read(…​) 方法之一从属性文件中 **读取** 配置。)

现在我们有了配置,就可以创建我们的引擎了。以下是相关的代码行

// Create the engine with this configuration ...
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
        .using(props)
        .notifying(record -> {
            System.out.println(record);
        })
        .build()) {
}

所有更改事件都将传递给给定的处理程序方法,该方法必须匹配 java.util.function.Consumer<R> 函数式接口的签名,其中 <R> 必须与调用 create() 时指定的格式类型匹配。请注意,您的应用程序的处理程序函数不应抛出任何异常;如果抛出异常,引擎将记录抛出的任何异常,并继续处理下一个源记录,但您的应用程序将没有机会再次处理导致异常的特定源记录,这意味着您的应用程序可能与数据库不一致。

此时,我们已经有了一个已配置并准备运行的 DebeziumEngine 对象,但它什么也没做。DebeziumEngine 被设计为由 ExecutorExecutorService 异步执行

// Run the engine asynchronously ...
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);

// Do something else or wait for a signal or an event

您的应用程序可以通过调用其 close() 方法来安全且优雅地停止引擎

// At some later time ...
engine.close();

或者,由于引擎支持 Closeable 接口,因此在离开 try 块时会自动调用。

引擎的连接器将停止从源系统读取信息,将所有剩余的更改事件转发给您的处理程序函数,并将最新的偏移量刷新到偏移量存储。只有在所有这些操作完成后,引擎的 run() 方法才会返回。如果您的应用程序需要等待引擎完全停止后再退出,您可以使用 ExcecutorServiceshutdownawaitTermination 方法来完成

try {
    executor.shutdown();
    while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
        logger.info("Waiting another 5 seconds for the embedded engine to shut down");
    }
}
catch ( InterruptedException e ) {
    Thread.currentThread().interrupt();
}

或者,您可以在创建 DebeziumEngine 时注册 CompletionCallback,以便在引擎终止时收到通知。

请记住,当 JVM 关闭时,它只会等待非守护线程。因此,当您在守护线程上运行引擎时,如果您的应用程序退出,请确保等待引擎进程完成。

您的应用程序应始终正确停止引擎,以确保优雅且完整的关闭,并且每个源记录都只被发送给应用程序一次。例如,不要依赖于关闭 ExecutorService,因为这会中断正在运行的线程。虽然 DebeziumEngine 在其线程被中断时确实会终止,但引擎可能不会干净地终止,当您的应用程序重新启动时,它可能会看到一些在关闭前刚刚处理过的相同源记录。

如前所述,DebeziumEngine 接口有两种实现。这两种实现使用相同的 API,并且前面的代码示例对两个版本都有效。唯一的例外是 DebeziumEngine 实例的创建。如引言中所述,默认情况下使用 AsyncEmbeddedEngine 实现。因此,方法 DebeziumEngine.create(Json.class) 在内部会使用 AsyncEmbeddedEngine 实例。

输出消息格式

DebeziumEngine#create() 可以接受多个不同的参数,这些参数会影响消息由消费者接收的格式。允许的值包括:

  • Connect.class - 输出值是封装 Kafka Connect 的 SourceRecord 的更改事件

  • Json.class - 输出值是编码为 JSON 字符串的键值对

  • JsonByteArray.class - 输出值是格式化为 JSON 并编码为 UTF-8 字节数组的键值对

  • Avro.class - 输出值是编码为 Avro 序列化记录的键值对(有关更多详细信息,请参阅 Avro 序列化

  • CloudEvents.class - 输出值是编码为 Cloud Events 消息的键值对

在调用 DebeziumEngine#create() 时也可以指定标头格式。允许的值包括:

  • Json.class - 标头值被编码为 JSON 字符串

  • JsonByteArray.class - 标头值被格式化为 JSON 并编码为 UTF-8 字节数组

内部,引擎将数据转换委托给 Kafka Connect 或 Apicurio 转换器实现,并使用最适合执行转换的算法。转换器可以通过引擎属性进行参数化以修改其行为。

JSON 输出格式的示例是

final Properties props = new Properties();
...
props.setProperty("converter.schemas.enable", "false"); // don't include schema in message
...
final DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
    .using(props)
    .notifying((records, committer) -> {

        for (ChangeEvent<String, String> r : records) {
            System.out.println("Key = '" + r.key() + "' value = '" + r.value() + "'");
            committer.markProcessed(r);
        }
...

其中 ChangeEvent 数据类型是键/值对。

消息转换

在消息传递给处理程序之前,可以将其通过 Kafka Connect 简单消息转换 (SMT) 的管道。每个 SMT 可以将消息原样传递、修改它或过滤掉它。链条使用属性 transforms 进行配置。该属性包含要应用的转换的逻辑名称的逗号分隔列表。属性 transforms.<logical_name>.type 然后定义每个转换的实现类的名称,transforms.<logical_name>.* 是传递给转换的配置选项。

配置示例是

final Properties props = new Properties();
...
props.setProperty("transforms", "filter, router");                                               // (1)
props.setProperty("transforms.router.type", "org.apache.kafka.connect.transforms.RegexRouter");  // (2)
props.setProperty("transforms.router.regex", "(.*)");                                            // (3)
props.setProperty("transforms.router.replacement", "trf$1");                                     // (3)
props.setProperty("transforms.filter.type", "io.debezium.embedded.ExampleFilterTransform");      // (4)
  1. 定义了两个转换 - filterrouter

  2. router 转换的实现是 org.apache.kafka.connect.transforms.RegexRouter

  3. router 转换有两个配置选项 - regexreplacement

  4. filter 转换的实现是 io.debezium.embedded.ExampleFilterTransform

消息转换谓词

可以将谓词应用于转换,使转换成为可选的。

配置示例是

final Properties props = new Properties();
...
props.setProperty("transforms", "filter");                                                 // (1)
props.setProperty("predicates", "headerExists");                                           // (2)
props.setProperty("predicates.headerExists.type", "org.apache.kafka.connect.transforms.predicates.HasHeaderKey"); //(3)
props.setProperty("predicates.headerExists.name", "header.name");                          // (4)
props.setProperty("transforms.filter.type", "io.debezium.embedded.ExampleFilterTransform");// (5)
props.setProperty("transforms.filter.predicate", "headerExists");                          // (6)
props.setProperty("transforms.filter.negate", "true");                                     // (7)
  1. 定义了一个转换 - filter

  2. 定义了一个谓词 - headerExists

  3. headerExists 谓词的实现是 org.apache.kafka.connect.transforms.predicates.HasHeaderKey

  4. headerExists 谓词有一个配置选项 - name

  5. filter 转换的实现是 io.debezium.embedded.ExampleFilterTransform

  6. filter 转换需要谓词 headerExists

  7. filter 转换期望谓词的值被取反,从而使谓词决定标头是否存在

高级记录消费

对于某些用例,例如尝试批量写入记录或对异步 API 进行写入时,上面描述的函数式接口可能会很棘手。在这些情况下,使用 io.debezium.engine.DebeziumEngine.ChangeConsumer<R>. 接口可能会更容易。

该接口有一个签名如下的单个函数

/**
  * Handles a batch of records, calling the {@link RecordCommitter#markProcessed(Object)}
  * for each record and {@link RecordCommitter#markBatchFinished()} when this batch is finished.
  * @param records the records to be processed
  * @param committer the committer that indicates to the system that we are finished
  */
 void handleBatch(List<R> records, RecordCommitter<R> committer) throws InterruptedException;

如 Javadoc 中所述,RecordCommitter 对象应为每个记录调用一次,并在每个批次完成后调用一次。RecordCommitter 接口是线程安全的,这使得记录的处理非常灵活。

您可以选择覆盖已处理记录的偏移量。这是通过首先调用 RecordCommitter#buildOffsets() 来构建新的 Offsets 对象,使用 Offsets#set(String key, Object value) 更新偏移量,然后调用 RecordCommitter#markProcessed(SourceRecord record, Offsets sourceOffsets),并附带更新后的 Offsets 来完成的。

要使用 ChangeConsumer API,您必须将该接口的实现传递给 notifying API,如下所示

class MyChangeConsumer implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {
  public void handleBatch(List<RecordChangeEvent<SourceRecord>> records, RecordCommitter<RecordChangeEvent<SourceRecord>> committer) throws InterruptedException {
    ...
  }
}
// Create the engine with this configuration ...
DebeziumEngine<RecordChangeEvent<SourceRecord>> engine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
        .using(props)
        .notifying(new MyChangeConsumer())
        .build();

如果使用 JSON 格式(其他格式也一样),代码将如下所示

class JsonChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<String, String>> {
  public void handleBatch(List<ChangeEvent<String, String>> records,
    RecordCommitter<ChangeEvent<String, String>> committer) throws InterruptedException {
    ...
  }
}
// Create the engine with this configuration ...
DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
        .using(props)
        .notifying(new JsonChangeConsumer())
        .build();

引擎属性

除非有默认值,否则以下配置属性是 **必需的**(为了文本格式化,Java 类的包名被替换为 <…​>)。

属性

Default (默认值)

描述

name (名称)

连接器实例的唯一名称。

connector.class

连接器的 Java 类名,例如 MySQL 连接器的 <…​>.MySqlConnector

offset.storage

<…​>.FileOffsetBackingStore

负责持久化连接器偏移量的 Java 类名。它必须实现 <…​>.OffsetBackingStore 接口。

offset.storage.file.filename

""

存储偏移量的文件的路径。当 offset.storage 设置为 <…​>.FileOffsetBackingStore 时需要。

offset.storage.topic

""

存储偏移量的 Kafka 主题名称。当 offset.storage 设置为 <…​>.KafkaOffsetBackingStore 时需要。

offset.storage.partitions

""

创建偏移量存储主题时使用的分区数。当 offset.storage 设置为 <…​>.KafkaOffsetBackingStore 时需要。

offset.storage.replication.factor

""

创建偏移量存储主题时使用的复制因子。当 offset.storage 设置为 <…​>.KafkaOffsetBackingStore 时需要。

offset.commit.policy

<…​>.PeriodicCommitOffsetPolicy

提交策略的 Java 类名。它定义了偏移量提交应该何时触发,基于处理的事件数量和自上次提交以来经过的时间。此类必须实现 <…​>.OffsetCommitPolicy 接口。默认是基于时间间隔的周期性提交策略。

offset.flush.interval.ms

60000

尝试提交偏移量的间隔。默认为 1 分钟。

offset.flush.timeout.ms

5000

在取消进程并恢复偏移量数据以在未来尝试中提交之前,等待记录刷新并将分区偏移量数据提交到偏移量存储的最长毫秒数。默认为 5 秒。

errors.max.retries

-1

在连接错误时重试的最大次数,之后失败(-1 = 无限制,0 = 禁用,> 0 = 重试次数)。

errors.retry.delay.initial.ms

300

遇到连接错误时重试的初始延迟(毫秒)。此值会在每次重试时翻倍,但不会超过 errors.retry.delay.max.ms

errors.retry.delay.max.ms

10000

遇到连接错误时重试之间的最大延迟(毫秒)。

异步引擎属性

属性

Default (默认值)

描述

record.processing.threads

按需分配线程,基于工作负载和可用 CPU 核心数。

可用于处理更改事件记录的线程数。如果未指定值(默认值),则引擎使用 Java 的 ThreadPoolExecutor 根据当前工作负载动态调整线程数。最大线程数是给定机器上的 CPU 核心数。如果指定了值,则引擎使用 Java 的 固定线程池 方法创建一个具有指定线程数的线程池。要使用给定机器上的所有可用核心,请将占位符值设置为 AVAILABLE_CORES

record.processing.shutdown.timeout.ms

1000

在调用任务关闭后等待已提交记录处理的最长毫秒数。

record.processing.order

ORDERED

确定记录的生成方式。

ORDERED

记录按顺序处理;也就是说,它们按照从数据库获取的顺序生成。

UNORDERED

记录按非顺序处理;也就是说,它们可能以与源数据库不同的顺序生成。

UNORDERED 选项的非顺序处理可以提高吞吐量,因为记录在任何 SMT 处理和消息序列化完成后立即生成,而无需等待其他记录。当向引擎提供了 ChangeConsumer 方法时,此选项无效。

record.processing.with.serial.consumer

false

指定是否从提供的 Consumer 创建默认的 ChangeConsumer,从而实现串行 Consumer 处理。如果您在创建引擎的 API 时指定了 ChangeConsumer 接口,则此选项无效。

task.management.timeout.ms

180,000 (3 分钟)

引擎等待任务的生命周期管理操作(启动和停止)完成的毫秒数。

数据库 Schema 历史记录属性

一些连接器还需要额外的属性集来配置数据库 Schema 历史记录。

  • MySQL

  • SQL Server

  • Oracle

  • Db2

如果没有正确配置数据库 Schema 历史记录,连接器将拒绝启动。默认配置需要 Kafka 集群可用。对于其他部署,提供了一个基于文件的数据库 Schema 历史记录存储实现。

属性 Default (默认值) 描述

schema.history.internal

<…​>.KafkaSchemaHistory

负责持久化数据库 Schema 历史记录的 Java 类名。
它必须实现 <…​>.SchemaHistory 接口。

schema.history.internal.file.filename

""

存储数据库 Schema 历史记录的文件的路径。
schema.history.internal 设置为 <…​>.FileSchemaHistory 时需要。

schema.history.internal.kafka.topic

""

存储数据库 Schema 历史记录的 Kafka 主题。
schema.history.internal 设置为 <…​>.KafkaSchemaHistory 时需要。

schema.history.internal.kafka.bootstrap.servers

""

要连接的 Kafka 集群服务器的初始列表。集群提供用于存储数据库 Schema 历史记录的主题。
schema.history.internal 设置为 <…​>.KafkaSchemaHistory 时需要。

处理失败

当引擎执行时,它的连接器会在每个源记录中主动记录源偏移量,并且引擎会定期将这些偏移量刷新到持久存储。当应用程序和引擎正常关闭或崩溃时,当它们被重新启动时,引擎及其连接器将 **从最后记录的偏移量** 处恢复读取源信息。

那么,当您的应用程序在嵌入式引擎运行时出现故障会发生什么?最终结果是,应用程序在重新启动后可能会收到一些在崩溃前已经处理过的源记录。收到重复记录的数量取决于引擎多久将偏移量刷新到其存储(通过 offset.flush.interval.ms 属性)以及特定连接器一次返回多少个源记录。最好的情况是,偏移量每次都刷新(例如,offset.flush.interval.ms 设置为 0),但即使那样,嵌入式引擎在从连接器接收到每个源记录批次后仍然只会刷新偏移量。

例如,MySQL 连接器使用 max.batch.size 来指定批次中可以出现的最大源记录数。即使 offset.flush.interval.ms 设置为 0,当应用程序在崩溃后重新启动时,它可能会看到多达 **n** 个重复项,其中 **n** 是批次的大小。如果 offset.flush.interval.ms 属性设置得更高,那么应用程序可能会看到多达 n * m 个重复项,其中 **n** 是批次的最大大小,**m** 是在单个偏移量刷新间隔内可能累积的批次数。(显然,可以配置嵌入式连接器不使用批处理并始终刷新偏移量,从而导致应用程序永远不会收到任何重复的源记录。然而,这会大大增加连接器的开销并降低吞吐量。)

底线是,在使用嵌入式连接器时,应用程序在正常操作期间(包括正常关闭后的重启)会收到每个源记录一次,但需要能够容忍在崩溃或不正确关闭后的重启后立即收到重复事件。如果应用程序需要更严格的“仅一次”行为,那么它们应该使用完整的 Debezium 平台,该平台可以提供“仅一次”保证(即使在崩溃和重启之后)。