自定义转换器
|
此功能目前处于孵化状态,即确切的语义、配置选项等可能在未来的修订版中发生变化,具体取决于我们收到的反馈。在使用此扩展时,如果您遇到任何问题,请告知我们。 |
数据类型转换
Debezium 变更事件记录中的每个字段都代表源表或数据集合中的一个字段或列。当连接器将变更事件记录发出到 Kafka 时,它会将每个字段的数据类型转换为 Kafka Connect 的 Schema 类型。列值同样会被转换为匹配目标字段的 Schema 类型。对于每个连接器,都有一个默认映射,指定连接器如何转换每种数据类型。这些默认映射在每个连接器的 数据类型文档 中进行了描述。
虽然默认映射通常已经足够,但对于某些应用程序,您可能希望应用一个替代映射。例如,如果默认映射以自 Unix 纪元以来的毫秒数格式导出列,但您的下游应用程序只能将列值作为格式化字符串进行处理,那么您就需要自定义映射。您可以通过开发和部署自定义转换器来定制数据类型映射。您可以将自定义转换器配置为作用于所有特定类型的列,也可以将其范围缩小到仅适用于特定的表列。转换器函数会拦截任何匹配指定条件的列的数据类型转换请求,然后执行指定的转换。对于不匹配指定条件的列,转换器会忽略它们。
自定义转换器是实现 Debezium 服务提供商接口 (SPI) 的 Java 类。您可以通过在连接器配置中设置 converters 属性来启用和配置自定义转换器。converters 属性指定了可供连接器使用的转换器,并且可以包含子属性,以进一步修改转换行为。
启动连接器后,连接器配置中启用的转换器将被实例化并添加到注册表中。注册表将每个转换器与其要处理的列或字段关联起来。每当 Debezium 处理新的变更事件时,它会调用已配置的转换器来转换已为其注册的列或字段。
|
以下说明仅适用于 Debezium 关系数据库源连接器。您不能使用此信息为 Debezium MongoDB 连接器或 Debezium JDBC 接收连接器创建自定义转换器。 |
实现自定义转换器
以下示例展示了一个实现 io.debezium.spi.converter.CustomConverter 接口的 Java 类的转换器实现。
public interface CustomConverter<S, F extends ConvertedField> {
@FunctionalInterface
interface Converter { (1)
Object convert(Object input);
}
public interface ConverterRegistration<S> { (2)
void register(S fieldSchema, Converter converter); (3)
}
void configure(Properties props);
void converterFor(F field, ConverterRegistration<S> registration); (4)
}
| Item | 描述 |
|---|---|
1 |
将数据从一种类型转换为另一种类型。 |
2 |
注册转换器的回调。 |
3 |
为当前字段注册给定的 Schema 和转换器。对于同一个字段,不应调用超过一次。 |
4 |
注册自定义值和 Schema 转换器以与特定字段一起使用。 |
自定义转换器方法
CustomConverter 接口的实现必须包含以下方法:
configure()-
将连接器配置中指定的属性传递给转换器实例。
configure方法在连接器初始化时运行。您可以使用转换器处理多个连接器,并根据连接器的属性设置修改其行为。configure方法接受以下参数:props-
包含要传递给转换器实例的属性。每个属性指定转换特定类型列的值的格式。
converterFor()-
注册转换器以处理数据源中的特定列或字段。Debezium 调用
converterFor()方法来提示转换器进行转换注册。converterFor方法对每个列运行一次。
该方法接受以下参数:field-
一个对象,用于传递有关正在处理的字段或列的元数据。列元数据可以包括列名或字段名、表名或集合名、数据类型、大小等。
registration-
一个类型为
io.debezium.spi.converter.CustomConverter.ConverterRegistration的对象,它提供了目标 Schema 定义和转换列数据的代码。当源列匹配转换器应处理的类型时,转换器会调用registration参数。通过调用register方法为 Schema 中的每个列定义转换器。Schema 使用 Kafka Connect 的SchemaBuilderAPI 表示。未来将添加独立的 Schema 定义 API。
Debezium 自定义转换器示例
以下示例实现了一个简单的转换器,它执行以下操作:
-
运行
configure方法,该方法根据连接器配置中指定的schema.name属性值来配置转换器。转换器配置特定于每个实例。 -
运行
converterFor方法,该方法将转换器注册为处理数据源列中数据类型设置为isbn的值。-
根据为
schema.name属性指定的值标识目标STRINGSchema。 -
将源列中的 ISBN 数据转换为
String值。
-
public static class IsbnConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
private String isbnSchemaName;
@Override
public void configure(Properties props) {
isbnSchemaName = props.getProperty("schema.name");
}
@Override
public void converterFor(RelationalColumn column,
ConverterRegistration<SchemaBuilder> registration) {
if ("isbn".equals(column.typeName())) {
registration.register(SchemaBuilder.string().name(isbnSchemaName), x -> x.toString());
}
}
}
Debezium 和 Kafka Connect API 模块依赖
自定义转换器 Java 项目在编译时依赖于 Debezium API 和 Kafka Connect API 库模块。这些编译依赖项必须包含在项目的 pom.xml 文件中,如下例所示:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${version.debezium}</version> (1)
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${version.kafka}</version> (2)
</dependency>
| Item | 描述 |
|---|---|
1 |
|
2 |
|
配置和使用转换器
自定义转换器作用于源表中的特定列或列类型,以指定如何将源中的数据类型转换为 Kafka Connect Schema 类型。要将自定义转换器与连接器一起使用,请将转换器 JAR 文件与连接器文件一起部署,然后配置连接器以使用该转换器。
|
自定义转换器旨在修改由 Debezium 关系数据库源连接器发出的消息。您不能配置 Debezium MongoDB 连接器或 Debezium JDBC 接收连接器来使用自定义转换器。 |
部署自定义转换器
-
您有一个自定义转换器 Java 程序。
-
要将自定义转换器与 Debezium 连接器一起使用,请将 Java 项目导出为 JAR 文件,并将该文件复制到包含您想使用它的每个 Debezium 连接器的 JAR 文件的目录中。
例如,在典型的部署中,Debezium 连接器文件存储在 Kafka Connect 目录 (/kafka/connect) 的子目录中,每个连接器 JAR 位于自己的子目录中 (/kafka/connect/debezium-connector-db2,/kafka/connect/debezium-connector-mysql等)。要将转换器与连接器一起使用,请将转换器 JAR 文件添加到连接器的子目录中。
| 要将转换器与多个连接器一起使用,您必须将转换器 JAR 文件的副本放置在每个连接器子目录中。 |
配置连接器以使用自定义转换器
要使连接器能够使用自定义转换器,请在 Debezium 源连接器的配置中添加属性来指定转换器的名称和类。您不能配置 Debezium JDBC 接收连接器来使用自定义转换器。如果转换器需要额外信息来定制特定数据类型的格式,您可以定义其他配置选项来提供这些信息。
-
通过在连接器配置中添加以下必需属性,为连接器实例启用转换器:
converters: <converterSymbolicName> (1) <converterSymbolicName>.type: <fullyQualifiedConverterClassName> (2)
表 3. 用于启用转换器的连接器配置属性描述 Item 描述 1
必需的
converters属性列出了要与连接器一起使用的转换器实例的符号名称,以逗号分隔。此属性的值将作为您为转换器指定的其他属性名称的前缀。2
必需的
<converterSymbolicName>.type属性指定实现该转换器的类的名称。例如,对于前面 的自定义转换器示例,您会在连接器配置中添加以下属性:
converters: isbn isbn.type: io.debezium.test.IsbnConverter
-
要将其他属性与自定义转换器关联,请在属性名称前加上转换器的符号名称,后跟一个点 (
.)。符号名称是您为converters属性指定的值标签。例如,要为前面的isbn转换器添加一个属性,以指定要传递给转换器代码中configure方法的schema.name,请添加以下属性:isbn.schema.name: io.debezium.postgresql.type.Isbn