Avro 序列化
Debezium 连接器在 Kafka Connect 框架中工作,通过生成变更事件记录来捕获数据库中的每一行级更改。对于每个变更事件记录,Debezium 连接器会完成以下操作:
-
应用配置的转换。
-
使用配置的 Kafka Connect 转换器 将记录的键和值序列化为二进制形式。
-
将记录写入正确的 Kafka 主题。
您可以为每个单独的 Debezium 连接器实例指定转换器。Kafka Connect 提供了一个 JSON 转换器,它将记录的键和值序列化为 JSON 文档。默认行为是 JSON 转换器包含消息的 schema,这使得每条记录都非常冗长。 Debezium 教程展示了包含 payload 和 schema 的记录的样子。如果您希望记录使用 JSON 进行序列化,请考虑将以下连接器配置属性设置为 false:
-
key.converter.schemas.enable -
value.converter.schemas.enable
将这些属性设置为 false 会从每条记录中排除冗长的 schema 信息。
或者,您可以使用 Apache Avro 来序列化记录的键和值。Avro 二进制格式紧凑且高效。Avro schema 可以确保每条记录都具有正确的结构。Avro 的 schema 演进机制允许 schema 演进。这对于 Debezium 连接器至关重要,因为它们动态生成每条记录的 schema 以匹配已更改的数据库表的结构。随着时间的推移,写入同一 Kafka 主题的变更事件记录可能具有相同 schema 的不同版本。Avro 序列化使得变更事件记录的消费者更容易适应不断变化的记录 schema。
要使用 Apache Avro 序列化,您必须部署一个管理 Avro 消息 schema 及其版本的 schema registry。可用的选项包括 Apicurio API 和 Schema Registry 以及 Confluent Schema Registry。此处将对两者进行描述。
Apicurio API 和 Schema Registry
关于 Apicurio API 和 Schema Registry
开源项目 Apicurio Registry 提供了多个与 Avro 配合使用的组件:
-
一个 Avro 转换器,您可以在 Debezium 连接器配置中指定。此转换器将 Kafka Connect schema 映射到 Avro schema。然后,转换器使用 Avro schema 将记录的键和值序列化为 Avro 的紧凑二进制格式。
-
一个 API 和 schema registry,用于跟踪:
-
在 Kafka 主题中使用的 Avro schema。
-
Avro 转换器发送生成的 Avro schema 的位置。
由于 Avro schema 存储在此 registry 中,因此每条记录只需要包含一个微小的 *schema 标识符*。这使得每条记录更小。对于像 Kafka 这样的 I/O 密集型系统,这意味着生产者和消费者的总吞吐量更高。
-
-
用于 Kafka 生产者和消费者的 Avro *Serdes*(序列化器和反序列化器)。您编写用于消费变更事件记录的 Kafka 消费者应用程序可以使用 Avro Serdes 来反序列化变更事件记录。
-
Apicurio 的 *Confluent 兼容模式*,通过设置为
as-confluent: true来启用,它允许 Apicurio Registry 使用与 Confluent Schema Registry 相同的线上传输格式(包括魔术字节和 4 字节 schema ID)来序列化 Kafka 消息。这使得与 Confluent 客户端和工具完全互操作,使 Apicurio 成为一个即插即用式的替换方案,而无需修改生产者或消费者。
要将 Apicurio Registry 与 Debezium 一起使用,请将 Apicurio Registry 转换器及其依赖项添加到您用于运行 Debezium 连接器的 Kafka Connect 容器镜像中。
|
Apicurio Registry 项目还提供了一个 JSON 转换器。此转换器结合了更少冗余消息和人类可读 JSON 的优点。消息本身不包含 schema 信息,只包含 schema ID。 |
|
要使用 Apicurio Registry 提供的转换器,您需要提供 |
Apicurio Registry 部署概览
要部署一个使用 Avro 序列化的 Debezium 连接器,您必须完成三个主要任务:
-
部署一个 Apicurio API 和 Schema Registry 实例。
-
将 Avro 转换器从 安装包 安装到插件目录中。如果您使用的是 Debezium Connect 容器镜像,则无需安装该包。有关更多信息,请参阅 使用 Debezium 容器部署 Apicurio Registry。
-
通过设置以下配置属性,将 Debezium 连接器实例配置为使用 Avro 序列化:
key.converter=io.apicurio.registry.utils.converter.AvroConverter key.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2 key.converter.apicurio.registry.auto-register=true key.converter.apicurio.registry.find-latest=true value.converter=io.apicurio.registry.utils.converter.AvroConverter value.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2 value.converter.apicurio.registry.auto-register=true value.converter.apicurio.registry.find-latest=true schema.name.adjustment.mode=avro或者,您可以使用*Confluent 兼容模式*,如下所示:
key.converter=io.apicurio.registry.utils.converter.AvroConverter key.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2 key.converter.apicurio.registry.auto-register=true key.converter.apicurio.registry.find-latest=true key.converter.schemas.enable": "false" key.converter.apicurio.registry.headers.enabled": "false" key.converter.apicurio.registry.as-confluent": "true" key.converter.apicurio.use-id: "contentId" value.converter=io.apicurio.registry.utils.converter.AvroConverter value.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2 value.converter.apicurio.registry.auto-register=true value.converter.apicurio.registry.find-latest=true value.converter.schemas.enable": "false" value.converter.apicurio.registry.headers.enabled": "false" value.converter.apicurio.registry.as-confluent": "true" value.converter.apicurio.use-id: "contentId" schema.name.adjustment.mode=avro
当在 Confluent 兼容模式下使用 Apicurio Registry 时,参数的配置方式有所不同,以精确模仿 Confluent Schema Registry 在线上传输格式和客户端期望方面的工作方式。
-
converter.apicurio.registry.as-confluent: true:强制 Apicurio 使用与 Confluent 相同的结构(魔术字节 + 4 字节 schema ID)序列化 Kafka 消息。 -
converter.apicurio.use-id: true(或globalId):确保消息中仅写入一个数字 schema ID,这与 Confluent 消费者期望的一致。 -
converter.schemas.enable: false:防止将完整的 Avro schema 嵌入到 Kafka 消息中,这是 Confluent 反序列化器不期望的(它们通过 ID 从 registry 获取 schema)。 -
converter.apicurio.registry.headers.enabled: false:禁用 Confluent 客户端无法理解的 Apicurio 特定元数据头,保持消息格式整洁且兼容。
在内部,Kafka Connect 始终使用 JSON 键/值转换器来存储配置和偏移量。
使用 Debezium 容器部署 Apicurio Registry
在您的环境中,您可能希望使用提供的 Debezium 容器镜像来部署使用 Avro 序列化的 Debezium 连接器。请按照此处的步骤进行操作。在此过程中,您将在 Debezium Kafka Connect 容器镜像上启用 Apicurio 转换器,并配置 Debezium 连接器以使用 Avro 转换器。
-
您已安装 Docker 并有权创建和管理容器。
-
您已下载要与 Avro 序列化一起部署的 Debezium 连接器插件。
-
部署一个 Apicurio Registry 实例。
以下示例使用一个非生产环境的、内存中的 Apicurio Registry 实例:
docker run -it --rm --name apicurio \ -p 8080:8080 apicurio/apicurio-registry-mem:2.6.2.Final -
运行 Kafka Connect 的 Debezium 容器镜像,通过
ENABLE_APICURIO_CONVERTERS=true环境变量来配置它以提供 Avro 转换器:docker run -it --rm --name connect \ --link kafka:kafka \ --link mysql:mysql \ --link apicurio:apicurio \ -e ENABLE_APICURIO_CONVERTERS=true \ -e GROUP_ID=1 \ -e CONFIG_STORAGE_TOPIC=my_connect_configs \ -e OFFSET_STORAGE_TOPIC=my_connect_offsets \ -e KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \ -e VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \ -e CONNECT_KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \ -e CONNECT_KEY_CONVERTER_APICURIO.REGISTRY_URL=http://apicurio:8080/apis/registry/v2 \ -e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true \ -e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true \ -e CONNECT_VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \ -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio:8080/apis/registry/v2 \ -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true \ -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true \ -e CONNECT_SCHEMA_NAME_ADJUSTMENT_MODE=avro \ -p 8083:8083 quay.io/debezium/connect:3.3
Confluent Schema Registry
Confluent 提供了一个可选的 schema registry 实现。
Confluent Schema Registry 部署概览
有关安装独立 Confluent Schema Registry 的信息,请参阅 Confluent Platform 部署文档。
或者,您可以将独立 Confluent Schema Registry 安装为容器。
使用 Debezium 容器部署 Confluent Schema Registry
从 Debezium 2.0.0 开始,Debezium 容器不再包含 Confluent Schema Registry 支持。要为 Debezium 容器启用 Confluent Schema Registry,请将以下 Confluent Avro 转换器 JAR 文件安装到 Connect 插件目录中:
-
kafka-connect-avro-converter -
kafka-connect-avro-data -
kafka-avro-serializer -
kafka-schema-serializer -
kafka-schema-converter -
kafka-schema-registry-client -
common-config -
common-utils
您可以从 Confluent Maven 仓库 下载上述文件。
还有一些其他 JAR 文件需要放在 Connect 插件目录中:
-
avro -
commons-compress -
failureaccess -
guava -
minimal-json -
re2j -
slf4j-api -
snakeyaml -
swagger-annotations -
jackson-databind -
jackson-core -
jackson-annotations -
jackson-dataformat-csv -
logredactor -
logredactor-metrics
您可以从 Maven 仓库 下载上述文件。
配置略有不同。
-
在您的 Debezium 连接器配置中,指定以下属性:
key.converter=io.confluent.connect.avro.AvroConverter key.converter.schema.registry.url=https://:8081 value.converter=io.confluent.connect.avro.AvroConverter value.converter.schema.registry.url=https://:8081 -
部署一个 Confluent Schema Registry 实例。
docker run -it --rm --name schema-registry \ --link kafka \ -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092 -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \ -e SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081 \ -p 8181:8181 confluentinc/cp-schema-registry -
运行一个配置为使用 Avro 的 Kafka Connect 镜像。
docker run -it --rm --name connect \ --link kafka:kafka \ --link mysql:mysql \ --link schema-registry:schema-registry \ -e GROUP_ID=1 \ -e CONFIG_STORAGE_TOPIC=my_connect_configs \ -e OFFSET_STORAGE_TOPIC=my_connect_offsets \ -e KEY_CONVERTER=io.confluent.connect.avro.AvroConverter \ -e VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter \ -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \ -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \ -p 8083:8083 quay.io/debezium/connect:3.3 -
运行一个控制台消费者,该消费者从
db.myschema.mytable主题读取新的 Avro 消息并解码为 JSON。docker run -it --rm --name avro-consumer \ --link kafka:kafka \ --link mysql:mysql \ --link schema-registry:schema-registry \ quay.io/debezium/connect:3.3 \ /kafka/bin/kafka-console-consumer.sh \ --bootstrap-server kafka:9092 \ --property print.key=true \ --formatter io.confluent.kafka.formatter.AvroMessageFormatter \ --property schema.registry.url=http://schema-registry:8081 \ --topic db.myschema.mytable
命名
如 Avro 文档所述,名称必须遵循以下规则:
-
以
[A-Za-z_]开头 -
之后只包含
[A-Za-z0-9_]字符
Debezium 使用列名作为相应 Avro 字段的基础。如果列名不符合 Avro 命名规则,可能会导致序列化问题。每个 Debezium 连接器都提供了一个配置属性 field.name.adjustment.mode,如果您有不符合 Avro 命名规则的列,可以将其设置为 avro。将 field.name.adjustment.mode 设置为 avro 允许序列化不符合要求的字段,而无需实际修改 schema。
获取更多信息
这篇博文 来自 Debezium 博客,描述了序列化器、转换器和其他组件的概念,并讨论了使用 Avro 的优势。一些 Kafka Connect 转换器的细节自该博文发布以来已略有变化。
有关使用 Avro 作为 Debezium 变更数据事件消息格式的完整示例,请参阅 MySQL 和 Avro 消息格式。