您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

Avro 序列化

Debezium 连接器在 Kafka Connect 框架中工作,通过生成变更事件记录来捕获数据库中的每个行级更改。对于每个变更事件记录,Debezium 连接器会完成以下操作:

  1. 应用配置的转换。

  2. 使用配置的 Kafka Connect 转换器将记录的键和值序列化为二进制格式。

  3. 将记录写入正确的 Kafka 主题。

您可以为每个单独的 Debezium 连接器实例指定转换器。Kafka Connect 提供了一个 JSON 转换器,可将记录的键和值序列化为 JSON 文档。默认行为是 JSON 转换器包含消息的模式,这使得每个记录都非常冗长。 Debezium 教程 展示了同时包含有效负载和模式时的记录外观。如果您希望记录使用 JSON 进行序列化,请考虑将以下连接器配置属性设置为 false

  • key.converter.schemas.enable

  • value.converter.schemas.enable

将这些属性设置为 false 会从每个记录中排除冗长的模式信息。

或者,您可以使用 Apache Avro 进行记录的键和值的序列化。Avro 二进制格式紧凑高效。Avro 模式可确保每条记录具有正确的结构。Avro 的模式演进机制允许模式演进。这对于 Debezium 连接器至关重要,因为它们动态生成每条记录的模式以匹配已更改数据库表的结构。随着时间的推移,写入同一 Kafka 主题的更改事件记录可能具有相同模式的不同版本。Avro 序列化使得更改事件记录的消费者更容易适应不断变化的记录模式。

要使用 Apache Avro 序列化,您必须部署一个模式注册表来管理 Avro 消息模式及其版本。可用的选项包括 Apicurio API 和 Schema Registry 以及 Confluent Schema Registry。此处将介绍两者。

Apicurio API 和 Schema Registry

关于 Apicurio API 和 Schema Registry

开源项目 Apicurio Registry 提供了与 Avro 配合使用的几个组件:

  • 一个可以在 Debezium 连接器配置中指定的 Avro 转换器。此转换器将 Kafka Connect 模式映射到 Avro 模式。然后,转换器使用 Avro 模式将记录的键和值序列化为 Avro 的紧凑二进制格式。

  • 一个跟踪以下内容的 API 和模式注册表:

    • Kafka 主题中使用的 Avro 模式。

    • Avro 转换器发送生成的 Avro 模式的位置。

    由于 Avro 模式存储在此注册表中,因此每条记录只需要包含一个非常小的模式标识符。这使得每条记录的体积更小。对于像 Kafka 这样的 I/O 密集型系统,这意味着生产者和消费者的总吞吐量更高。

  • 适用于 Kafka 生产者和消费者的 Avro Serdes(序列化器和反序列化器)。您编写的用于消费更改事件记录的 Kafka 消费者应用程序可以使用 Avro Serdes 来反序列化更改事件记录。

  • Apicurio 的Confluent 兼容模式(通过设置 as-confluent: true 启用)允许 Apicurio Registry 使用与 Confluent Schema Registry 相同的线上传输格式(包括魔术字节和 4 字节模式 ID)来序列化 Kafka 消息。这实现了与 Confluent 客户端和工具的完全互操作性,使 Apicurio 成为一个即插即用式替代品,而无需更改生产者或消费者。

要将 Apicurio Registry 与 Debezium 一起使用,请将 Apicurio Registry 转换器及其依赖项添加到您用于运行 Debezium 连接器的 Kafka Connect 容器镜像中。

Apicurio Registry 项目还提供了一个 JSON 转换器。此转换器结合了消息量少和人类可读的 JSON 的优点。消息本身不包含模式信息,只包含一个模式 ID。

要使用 Apicurio Registry 提供的转换器,您需要提供 apicurio.registry.url

Apicurio Registry 部署概述

要部署使用 Avro 序列化的 Debezium 连接器,您必须完成三个主要任务:

  1. 部署一个 Apicurio API 和 Schema Registry 实例。

  2. 将 Avro 转换器从 安装包安装到插件目录中。如果您正在使用 Debezium Connect 容器镜像,则无需安装该包。有关更多信息,请参阅 使用 Debezium 容器部署 Apicurio Registry

  3. 通过设置以下配置属性,将 Debezium 连接器实例配置为使用 Avro 序列化:

    key.converter=io.apicurio.registry.utils.converter.AvroConverter
    key.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
    key.converter.apicurio.registry.auto-register=true
    key.converter.apicurio.registry.find-latest=true
    value.converter=io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
    value.converter.apicurio.registry.auto-register=true
    value.converter.apicurio.registry.find-latest=true
    schema.name.adjustment.mode=avro

    或者,您可以使用Confluent 兼容模式,如下所示:

    key.converter=io.apicurio.registry.utils.converter.AvroConverter
    key.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
    key.converter.apicurio.registry.auto-register=true
    key.converter.apicurio.registry.find-latest=true
    key.converter.schemas.enable": "false"
    key.converter.apicurio.registry.headers.enabled": "false"
    key.converter.apicurio.registry.as-confluent": "true"
    key.converter.apicurio.use-id: "contentId"
    value.converter=io.apicurio.registry.utils.converter.AvroConverter
    value.converter.apicurio.registry.url=http://apicurio:8080/apis/registry/v2
    value.converter.apicurio.registry.auto-register=true
    value.converter.apicurio.registry.find-latest=true
    value.converter.schemas.enable": "false"
    value.converter.apicurio.registry.headers.enabled": "false"
    value.converter.apicurio.registry.as-confluent": "true"
    value.converter.apicurio.use-id: "contentId"
    schema.name.adjustment.mode=avro

当使用 Confluent 兼容模式下的 Apicurio Registry 时,参数的配置方式略有不同,以便精确模仿 Confluent Schema Registry 的工作方式,包括线上传输格式和客户端期望。

  • converter.apicurio.registry.as-confluent: true:强制 Apicurio 使用与 Confluent 相同的结构(魔术字节 + 4 字节模式 ID)序列化 Kafka 消息。

  • converter.apicurio.use-id: true(或 globalId):确保消息中只写入一个数字模式 ID,这与 Confluent 消费者期望的一致。

  • converter.schemas.enable: false:防止在 Kafka 消息中嵌入完整的 Avro 模式,这是 Confluent 反序列化器不期望的(它们通过 ID 从注册表中获取模式)。

  • converter.apicurio.registry.headers.enabled: false:禁用 Confluent 客户端无法理解的 Apicurio 特定元数据头,保持消息格式整洁且兼容。

在内部,Kafka Connect 始终使用 JSON 键/值转换器来存储配置和偏移量。

使用 Debezium 容器部署 Apicurio Registry

在您的环境中,您可能希望使用提供的 Debezium 容器镜像来部署使用 Avro 序列化的 Debezium 连接器。请按照此处的过程进行操作。在此过程中,您将在 Debezium Kafka Connect 容器镜像上启用 Apicurio 转换器,并将 Debezium 连接器配置为使用 Avro 转换器。

先决条件
  • 您已安装 Docker 并拥有创建和管理容器的足够权限。

  • 您已下载要与 Avro 序列化一起部署的 Debezium 连接器插件。

过程
  1. 部署一个 Apicurio Registry 实例。

    以下示例使用了一个非生产环境的、内存中的 Apicurio Registry 实例:

    docker run -it --rm --name apicurio \
        -p 8080:8080 apicurio/apicurio-registry-mem:2.6.2.Final
  2. 运行 Kafka Connect 的 Debezium 容器镜像,通过设置环境变量 ENABLE_APICURIO_CONVERTERS=true 来配置它以提供 Avro 转换器,从而启用 Apicurio。

    docker run -it --rm --name connect \
        --link kafka:kafka \
        --link mysql:mysql \
        --link apicurio:apicurio \
        -e ENABLE_APICURIO_CONVERTERS=true \
        -e GROUP_ID=1 \
        -e CONFIG_STORAGE_TOPIC=my_connect_configs \
        -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
        -e KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
        -e VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
        -e CONNECT_KEY_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
        -e CONNECT_KEY_CONVERTER_APICURIO.REGISTRY_URL=http://apicurio:8080/apis/registry/v2 \
        -e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true \
        -e CONNECT_KEY_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true \
        -e CONNECT_VALUE_CONVERTER=io.apicurio.registry.utils.converter.AvroConverter \
        -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_URL=http://apicurio:8080/apis/registry/v2 \
        -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_AUTO-REGISTER=true \
        -e CONNECT_VALUE_CONVERTER_APICURIO_REGISTRY_FIND-LATEST=true \
        -e CONNECT_SCHEMA_NAME_ADJUSTMENT_MODE=avro \
        -p 8083:8083 quay.io/debezium/connect:3.3

Confluent Schema Registry

Confluent 提供了一个替代的 模式注册表实现。

Confluent Schema Registry 部署概述

有关安装独立 Confluent Schema Registry 的信息,请参阅 Confluent Platform 部署文档

作为替代方案,您可以将独立 Confluent Schema Registry 安装为容器。

使用 Debezium 容器部署 Confluent Schema Registry

从 Debezium 2.0.0 开始,Debezium 容器中不再包含 Confluent Schema Registry 支持。要为 Debezium 容器启用 Confluent Schema Registry,请将以下 Confluent Avro 转换器 JAR 文件安装到 Connect 插件目录中:

  • kafka-connect-avro-converter

  • kafka-connect-avro-data

  • kafka-avro-serializer

  • kafka-schema-serializer

  • kafka-schema-converter

  • kafka-schema-registry-client

  • common-config

  • common-utils

您可以从 Confluent Maven 仓库下载上述文件。

还有一些其他 JAR 文件需要位于 Connect 插件目录中:

  • avro

  • commons-compress

  • failureaccess

  • guava

  • minimal-json

  • re2j

  • slf4j-api

  • snakeyaml

  • swagger-annotations

  • jackson-databind

  • jackson-core

  • jackson-annotations

  • jackson-dataformat-csv

  • logredactor

  • logredactor-metrics

您可以从 Maven 仓库下载上述文件。

配置略有不同。

  1. 在您的 Debezium 连接器配置中,指定以下属性:

    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=https://:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=https://:8081
  2. 部署一个 Confluent Schema Registry 实例。

    docker run -it --rm --name schema-registry \
        --link kafka \
        -e SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
        -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
        -e SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081 \
        -p 8181:8181 confluentinc/cp-schema-registry
  3. 运行一个配置为使用 Avro 的 Kafka Connect 镜像。

    docker run -it --rm --name connect \
        --link kafka:kafka \
        --link mysql:mysql \
        --link schema-registry:schema-registry \
        -e GROUP_ID=1 \
        -e CONFIG_STORAGE_TOPIC=my_connect_configs \
        -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
        -e KEY_CONVERTER=io.confluent.connect.avro.AvroConverter \
        -e VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter \
        -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
        -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
        -p 8083:8083 quay.io/debezium/connect:3.3
  4. 运行一个控制台消费者,该消费者从 db.myschema.mytable 主题读取新的 Avro 消息并将其解码为 JSON。

    docker run -it --rm --name avro-consumer \
        --link kafka:kafka \
        --link mysql:mysql \
        --link schema-registry:schema-registry \
        quay.io/debezium/connect:3.3 \
        /kafka/bin/kafka-console-consumer.sh \
          --bootstrap-server kafka:9092 \
          --property print.key=true \
          --formatter io.confluent.kafka.formatter.AvroMessageFormatter \
          --property schema.registry.url=http://schema-registry:8081 \
          --topic db.myschema.mytable

命名

如 Avro 文档所述,名称必须遵守以下规则:

  • [A-Za-z_] 开头。

  • 随后只包含 [A-Za-z0-9_] 字符。

Debezium 使用列名作为相应 Avro 字段的基础。如果列名不符合 Avro 命名规则,这可能导致序列化过程中出现问题。每个 Debezium 连接器都提供了一个配置属性 field.name.adjustment.mode,如果您的列不符合 Avro 命名规则,您可以将其设置为 avro。将 field.name.adjustment.mode 设置为 avro 可以在不实际修改模式的情况下序列化不符合要求的字段。

获取更多信息

这篇文章来自 Debezium 博客,介绍了序列化器、转换器和其他组件的概念,并讨论了使用 Avro 的优势。一些 Kafka Connect 转换器的详细信息自发布以来已略有更改。

有关使用 Avro 作为 Debezium 更改数据事件消息格式的完整示例,请参阅 MySQL 和 Avro 消息格式