您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

自定义转换器

此功能目前处于孵化状态,即确切的语义、配置选项等可能在未来的修订版中发生变化,具体取决于我们收到的反馈。在使用此扩展时,如果您遇到任何问题,请告知我们。

数据类型转换

Debezium 变更事件记录中的每个字段都代表源表或数据集合中的一个字段或列。当连接器将变更事件记录发送到 Kafka 时,它会将每个字段的数据类型转换为 Kafka Connect schema 类型。列值同样会转换为与目标字段的 schema 类型匹配。对于每个连接器,默认映射指定了连接器如何转换每种数据类型。这些默认映射在每个连接器的数据类型文档中有描述。

虽然默认映射通常足够,但对于某些应用程序,您可能希望应用替代映射。例如,如果默认映射以自 Unix 纪元以来的毫秒数格式导出列,但下游应用程序只能将列值作为格式化字符串进行消耗,那么您可能需要自定义映射。您可以通过开发和部署自定义转换器来定制数据类型映射。您可以配置自定义转换器以应用于某种类型的所有列,也可以将其范围缩小到仅应用于特定的表列。转换器函数会拦截任何匹配指定条件的列的数据类型转换请求,然后执行指定的转换。转换器会忽略不匹配指定条件的列。

自定义转换器是实现 Debezium 服务提供商接口 (SPI) 的 Java 类。您可以通过在连接器配置中设置 converters 属性来启用和配置自定义转换器。converters 属性指定了连接器可用的转换器,并且可以包含子属性以进一步修改转换行为。

在您启动连接器后,将在连接器配置中启用的转换器将被实例化并添加到注册表中。注册表将每个转换器与要处理的列或字段关联起来。每当 Debezium 处理新的变更事件时,它会调用配置的转换器来转换已注册的列或字段。

以下说明仅适用于 Debezium 关系型数据库源连接器。您无法使用此信息为 Debezium MongoDB 连接器或 Debezium JDBC 汇连接器创建自定义转换器。

实现自定义转换器

以下示例展示了一个实现 io.debezium.spi.converter.CustomConverter 接口的 Java 类的转换器实现。

public interface CustomConverter<S, F extends ConvertedField> {

    @FunctionalInterface
    interface Converter {  (1)
        Object convert(Object input);
    }

    public interface ConverterRegistration<S> { (2)
        void register(S fieldSchema, Converter converter); (3)
    }

    void configure(Properties props);

    void converterFor(F field, ConverterRegistration<S> registration); (4)
}
表 1. 实现 io.debezium.spi.converter.CustomConverter 接口的 Java 转换器类字段描述
Item 描述

1

将数据从一种类型转换为另一种类型。

2

注册转换器的回调。

3

为当前字段注册给定的 schema 和转换器。对于同一个字段,不应调用超过一次。

4

为特定字段的使用注册自定义值和 schema 转换器。

自定义转换器方法

CustomConverter 接口的实现必须包含以下方法:

configure()

将连接器配置中指定的属性传递给转换器实例。configure 方法在连接器初始化时运行。您可以将转换器与多个连接器一起使用,并根据连接器的属性设置修改其行为。
configure 方法接受以下参数:

props

包含要传递给转换器实例的属性。每个属性指定转换特定类型列值的格式。

converterFor()

注册转换器以处理数据源中的特定列或字段。Debezium 调用 converterFor() 方法来提示转换器进行 registration 调用以进行转换。converterFor 方法对每个列运行一次。
该方法接受以下参数:

field

一个对象,用于传递有关正在处理的字段或列的元数据。列元数据可以包括列名或字段名、表名或集合名、数据类型、大小等。

registration

类型为 io.debezium.spi.converter.CustomConverter.ConverterRegistration 的对象,它提供了目标 schema 定义和转换列数据的代码。当源列匹配转换器应处理的类型时,转换器将调用 registration 参数。调用 register 方法来为 schema 中的每个列定义转换器。Schema 使用 Kafka Connect SchemaBuilder API 来表示。将来,将添加一个独立的 schema 定义 API。

Debezium 自定义转换器示例

以下示例实现了一个简单的转换器,它执行以下操作:

  • 运行 configure 方法,根据连接器配置中指定的 schema.name 属性的值配置转换器。转换器配置特定于每个实例。

  • 运行 converterFor 方法,它将转换器注册为处理数据源中数据类型设置为 isbn 的源列中的值。

    • 根据为 schema.name 属性指定的值标识目标 STRING schema。

    • 将源列中的 ISBN 数据转换为 String 值。

示例 1. 一个简单的自定义转换器
public static class IsbnConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private String isbnSchemaName;

    @Override
    public void configure(Properties props) {
        isbnSchemaName = props.getProperty("schema.name");
    }

    @Override
    public void converterFor(RelationalColumn column,
            ConverterRegistration<SchemaBuilder> registration) {

        if ("isbn".equals(column.typeName())) {
            registration.register(SchemaBuilder.string().name(isbnSchemaName), x -> x.toString());
        }
    }
}

Debezium 和 Kafka Connect API 模块依赖

自定义转换器 Java 项目对 Debezium API 和 Kafka Connect API 库模块具有编译依赖。这些编译依赖必须包含在项目的 pom.xml 中,如下面的示例所示:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version> (1)
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>${version.kafka}</version> (2)
</dependency>
表 2. pom.xml 中编译依赖版本描述
Item 描述

1

${version.debezium} 表示 Debezium 连接器的版本。

2

${version.kafka} 表示您环境中的 Apache Kafka 版本。

配置和使用转换器

自定义转换器作用于源表中的特定列或列类型,以指定如何将源中的数据类型转换为 Kafka Connect schema 类型。要将自定义转换器与连接器一起使用,请将转换器 JAR 文件与连接器文件一起部署,然后配置连接器以使用该转换器。

自定义转换器旨在修改 Debezium 关系型数据库源连接器发出的消息。您无法配置 Debezium MongoDB 连接器或 Debezium JDBC 汇连接器来使用自定义转换器。

部署自定义转换器

先决条件
  • 您有一个自定义转换器 Java 程序。

过程
  • 要将自定义转换器与 Debezium 连接器一起使用,请将 Java 项目导出为 JAR 文件,然后将该文件复制到包含您要使用它的每个 Debezium 连接器的 JAR 文件的目录中。

    例如,在典型的部署中,Debezium 连接器文件存储在 Kafka Connect 目录(/kafka/connect)的子目录中,每个连接器 JAR 位于自己的子目录中(/kafka/connect/debezium-connector-db2/kafka/connect/debezium-connector-mysql 等)。要将转换器与连接器一起使用,请将转换器 JAR 文件添加到连接器的子目录中。

要将转换器与多个连接器一起使用,您必须将转换器 JAR 文件的副本放在每个连接器子目录中。

配置连接器以使用自定义转换器

要使连接器能够使用自定义转换器,请将指定转换器名称和类的属性添加到 Debezium 源连接器的配置中。您无法配置 Debezium JDBC 汇连接器来使用自定义转换器。如果转换器需要更多信息来定制特定数据类型的格式,您可以定义其他配置选项来提供这些信息。

先决条件
过程
  • 通过向 Debezium 源连接器配置添加以下必需属性来为连接器实例启用转换器:

    converters: <converterSymbolicName> (1)
    <converterSymbolicName>.type: <fullyQualifiedConverterClassName> (2)
    表 3. 启用转换器的连接器配置属性描述
    Item 描述

    1

    必需的 converters 属性枚举了要与连接器一起使用的转换器实例的符号名称的逗号分隔列表。此属性的值用作您为转换器指定的其他属性名称的前缀。

    2

    必需的 <converterSymbolicName>.type 属性指定了实现转换器的类名。

    例如,对于之前的 自定义转换器示例,您会将以下属性添加到连接器配置中:

    converters: isbn
    isbn.type: io.debezium.test.IsbnConverter
  • 要将其他属性与自定义转换器关联,请在属性名称前加上转换器的符号名称,后跟一个点(.)。符号名称是您为 converters 属性指定的值的标签。例如,要为前面的 isbn 转换器添加一个属性,以指定要传递给转换器代码中 configure 方法的 schema.name,请添加以下属性:

    isbn.schema.name: io.debezium.postgresql.type.Isbn