您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

OpenLineage 集成

关于数据血缘和 OpenLineage

数据血缘跟踪数据在不同系统、转换和进程中的流动。此信息可让您了解数据的来源、去向以及数据管道中存在的依赖关系。数据血缘的见解对于以下活动至关重要

  • 数据治理和合规性

  • 进行更改时的影响分析

  • 调试数据质量问题

  • 理解数据依赖关系

OpenLineage 是一个数据血缘的开放标准,它提供了一种统一的方式来跨多个数据系统收集和跟踪血缘元数据。该规范定义了描述数据集、作业和运行的通用模型,从而简化了跨异构数据基础设施构建全面血缘图的过程。

有关更多信息,请参阅 OpenLineage 网站文档

部署类型

Debezium OpenLineage 集成可用于以下部署类型

Kafka Connect

您将 Debezium 连接器作为 Kafka Connect 插件运行。

Debezium 服务器

您将 Debezium 作为独立服务器模式运行。

部署类型之间提供了相同的 OpenLineage 事件模型和功能。但需要不同的进程来配置集成和安装依赖项。

Debezium 如何与 OpenLineage 集成

为了与 OpenLineage 集成,Debezium 将其生命周期中的事件映射到 OpenLineage 数据模型中的工件。

OpenLineage 作业映射

Debezium 连接器被映射到一个 OpenLineage **作业**,该作业包含以下元素

名称

作业的名称继承自 Debezium 的 topic.prefix,并与任务 ID 结合(例如,inventory.0)。

命名空间

继承自 openlineage.integration.job.namespace(如果已指定);否则,默认为 topic.prefix 的值。

Debezium 连接器版本

正在运行并生成血缘事件的 Debezium 连接器的版本。

完整的连接器配置

所有连接器配置属性,可实现数据管道的完全可重现性和调试。

作业元数据

描述、标签和所有者。

源连接器的数据集映射

以下数据集映射是可能的

输入数据集

代表 Debezium 配置为从中捕获更改的数据库表。OpenLineage 集成会根据连接器配置自动创建输入数据集。集成在创建数据集映射时应用以下原则

  • 连接器监控的每个表都成为一个输入数据集。

  • 每个数据集都捕获相应源表的架构信息,包括每个列的名称和数据类型。

  • 源表中的 DDL 更改会在数据集中动态反映。

输出数据集

代表 CDC 事件写入的 Kafka 主题。对于 Kafka Connect 部署,当您应用 OpenLineage 单消息转换 (SMT) 时,会创建输出数据集。对于 Debezium Server 部署,输出数据集会自动捕获。

输出数据集映射的创建遵循以下原则

  • 连接器生成的每个 Kafka 主题都成为一个输出数据集。

  • 输出数据集捕获完整的 CDC 事件结构,包括元数据字段。

  • 数据集的名称基于连接器的 topic prefix 配置。

Sink 连接器的数据集映射

对于 Sink 连接器,数据流与源连接器相反。

输入数据集

代表 Sink 连接器从中读取的 Kafka 主题。这些主题通常包含来自 Debezium 源连接器的 CDC 事件。在定义输入数据集时,应用以下原则

  • Sink 连接器消耗的每个 Kafka 主题代表一个输入数据集。

  • 输入数据集指定 Kafka 主题的架构和元数据。

  • 命名空间格式遵循 kafka://bootstrap-server:port,其中 bootstrap server 通过 openlineage.integration.dataset.kafka.bootstrap.servers 属性指定。

输出数据集

代表 Sink 连接器写入数据的目标数据库或集合。

在定义输出数据集映射时,应用以下原则

  • 每个目标数据库表或集合代表一个输出数据集。

  • 输出数据集指定目标数据库的架构信息。

  • 命名空间格式取决于目标数据库系统。有关更多信息,请参阅数据集命名空间格式化

以下 Debezium Sink 连接器支持 Kafka Connect 中的 OpenLineage 集成

MongoDB Sink 连接器

将 CDC 事件写入 MongoDB 集合。

JDBC Sink 连接器

将 CDC 事件写入关系数据库表。

Debezium Server 目前仅支持与 Kafka Sink 的 OpenLineage 集成。MongoDB Sink 和 JDBC Sink 连接器仅在 Kafka Connect 部署中受支持。
运行事件

当您将 Debezium 与 OpenLineage 集成时,连接器会发出事件来报告状态更改。连接器在以下状态更改后发出 OpenLineage 运行事件

START

报告连接器初始化。

RUNNING

在正常流式操作期间和处理单个表期间定期发出。这些定期事件确保了长时间运行的流式 CDC 操作的连续血缘跟踪。

COMPLETE

报告连接器已正常关闭。

FAIL

报告连接器遇到错误。

所需依赖

OpenLineage 集成需要几个 JAR 文件,这些文件捆绑在 debezium-openlineage-core-libs 存档中。

Kafka Connect

在 Kafka Connect 中将 Debezium 与 OpenLineage 结合使用之前,请完成以下步骤以获取所需的依赖项

  1. 下载 OpenLineage 核心存档

  2. 将存档的内容提取到 Kafka Connect 环境中的 Debezium 插件目录中。

Debezium 服务器

在将 Debezium Server 与 OpenLineage 结合使用之前,请完成以下步骤以获取所需的依赖项

  1. 下载 OpenLineage 核心存档

  2. 提取存档的内容。

  3. 将所有 JAR 文件复制到 Debezium Server 安装中的 /debezium/lib 目录。

配置集成

要启用集成,您必须配置 Debezium 连接器和 OpenLineage 客户端。Kafka Connect 和 Debezium Server 部署之间的配置方法有所不同。

Kafka Connect 配置

要使 Debezium 在 Kafka Connect 中与 OpenLineage 集成,请将属性添加到连接器配置中,如以下示例所示

# Enable OpenLineage integration
openlineage.integration.enabled=true

# Path to OpenLineage configuration file
openlineage.integration.config.file.path=/path/to/openlineage.yml

# Job metadata (optional but recommended)
openlineage.integration.job.namespace=myNamespace
openlineage.integration.job.description=CDC connector for products database
openlineage.integration.job.tags=env=prod,team=data-engineering
openlineage.integration.job.owners=Alice Smith=maintainer,Bob Johnson=Data Engineer

Debezium Server 配置

要使 Debezium Server 与 OpenLineage 集成,请将 OpenLineage 属性添加到 application.properties 文件中,如以下示例所示。OpenLineage 属性使用 debezium.source. 前缀

# Enable OpenLineage integration
debezium.source.openlineage.integration.enabled=true

# Path to OpenLineage configuration file
debezium.source.openlineage.integration.config.file.path=config/openlineage.yml

# Job metadata (optional but recommended)
debezium.source.openlineage.integration.job.description=CDC connector for products database
debezium.source.openlineage.integration.job.tags=env=prod,team=data-engineering
debezium.source.openlineage.integration.job.owners=Alice Smith=maintainer,Bob Johnson=Data Engineer

配置 OpenLineage 客户端

创建一个 openlineage.yml 文件来配置 OpenLineage 客户端。openlineage.yml 配置文件在 Kafka Connect 和 Debezium Server 部署中都使用。请使用以下示例作为指南

transport:
  type: http
  url: http://your-openlineage-server:5000
  endpoint: /api/v1/lineage
  auth:
    type: api_key
    api_key: your-api-key

# Alternative: Console transport for testing
# transport:
#   type: console

有关详细的 OpenLineage 客户端配置选项,请参考OpenLineage 客户端文档

Debezium OpenLineage 配置属性

下表列出了两种部署类型的 OpenLineage 配置属性。

对于 Debezium Server,请在所有属性名称前添加 debezium.source. 前缀(例如,debezium.source.openlineage.integration.enabled)。
属性 (Kafka Connect) 描述 必需 Default (默认值)

openlineage.integration.enabled

启用和禁用 OpenLineage 集成。

false

openlineage.integration.config.file.path

OpenLineage YAML 配置文件路径。

无默认值

openlineage.integration.job.namespace

作业使用的命名空间。

topic.prefix 的值

openlineage.integration.job.description

人类可读的作业描述

无默认值

openlineage.integration.job.tags

逗号分隔的键值标签列表。

无默认值

openlineage.integration.job.owners

逗号分隔的姓名-角色所有者条目列表。

无默认值

openlineage.integration.dataset.kafka.bootstrap.servers (仅限源连接器)

用于检索 Kafka 主题元数据的 Kafka bootstrap 服务器。对于源连接器,如果未指定值,则使用 schema.history.internal.kafka.bootstrap.servers 的值。

对于 Sink 连接器,您必须为此属性指定一个值。

是(适用于 Sink 连接器)

schema.history.internal.kafka.bootstrap.servers 的值 (仅限源连接器)

示例:标签列表格式

将标签指定为逗号分隔的键值对列表,如以下示例所示

openlineage.integration.job.tags=environment=production,team=data-platform,criticality=high
示例:所有者列表格式

将所有者指定为逗号分隔的姓名-角色对列表,如以下示例所示

openlineage.integration.job.owners=John Doe=maintainer,Jane Smith=Data Engineer,Team Lead=owner

输出数据集血缘

Debezium 可以捕获输出数据集血缘 (Kafka 主题) 来跟踪 CDC 事件的目标。Kafka Connect 和 Debezium Server 之间的配置方法有所不同。

Kafka Connect 输出数据集血缘

要捕获 Kafka Connect 中的输出数据集血缘,请配置 Debezium 使用 OpenLineage 单消息转换 (SMT)

# Add OpenLineage transform
transforms=openlineage
transforms.openlineage.type=io.debezium.transforms.openlineage.OpenLineage

# Required: Configure schema history with Kafka bootstrap servers
schema.history.internal.kafka.bootstrap.servers=your-kafka:9092

SMT 捕获 Debezium 写入 Kafka 主题的更改事件的详细架构信息。转换捕获的架构数据包括以下项

  • 事件结构(before、after、source、transaction 元数据)

  • 字段类型和嵌套结构

  • 主题名称和命名空间

Debezium Server 输出数据集血缘

对于 Debezium Server 部署,当 OpenLineage 集成启用时,输出数据集血缘会自动捕获。无需额外的配置或转换,因为 Debezium Server 完全控制输出记录。

完整配置示例

以下示例显示了在 Kafka Connect 和 Debezium Server 中启用 OpenLineage 集成的完整配置。

Kafka Connect 完整配置示例

以下示例显示了启用 PostgreSQL 连接器以在 Kafka Connect 中与 OpenLineage 集成的完整配置

{
  "name": "inventory-connector-postgres",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "topic.prefix": "inventory",
    "snapshot.mode": "initial",
    "slot.name": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "openlineage.integration.enabled": "true",
    "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
    "openlineage.integration.job.description": "CDC connector for inventory database",
    "openlineage.integration.job.tags": "env=production,team=data-platform,database=postgresql",
    "openlineage.integration.job.owners": "Data Team=maintainer,Alice Johnson=Data Engineer",
    "transforms": "openlineage",
    "transforms.openlineage.type": "io.debezium.transforms.openlineage.OpenLineage"
  }
}

Debezium Server 完整配置示例

以下示例显示了一个完整的 application.properties 配置,用于在具有 Kafka Sink 的 Debezium Server 中启用 PostgreSQL 连接器以与 OpenLineage 集成

# Sink configuration (Kafka)
debezium.sink.type=kafka
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.bootstrap.servers=kafka:9092

# Source connector configuration
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.topic.prefix=tutorial
debezium.source.schema.include.list=inventory

# OpenLineage integration
debezium.source.openlineage.integration.enabled=true
debezium.source.openlineage.integration.config.file.path=config/openlineage.yml
debezium.source.openlineage.integration.job.description=CDC connector for products database
debezium.source.openlineage.integration.job.tags=env=prod,team=cdc
debezium.source.openlineage.integration.job.owners=Mario=maintainer,John Doe=Data scientist

# Logging configuration (optional)
quarkus.log.console.json=false

MongoDB Sink 连接器配置示例

以下示例显示了启用 MongoDB Sink 连接器以在 Kafka Connect 中与 OpenLineage 集成的完整配置

{
  "name": "mongodb-sink",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbSinkConnector",
    "tasks.max": "1",
    "mongodb.connection.string": "mongodb://admin:admin@mongodb:27017",
    "topics": "inventory.inventory.products",
    "sink.database": "inventory2",
    "openlineage.integration.enabled": "true",
    "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
    "openlineage.integration.job.description": "Sink connector for MongoDB",
    "openlineage.integration.job.tags": "env=prod,team=cdc",
    "openlineage.integration.job.owners": "Mario=maintainer,John Doe=Data scientist",
    "openlineage.integration.dataset.kafka.bootstrap.servers": "kafka:9092"
  }
}
对于 Sink 连接器,需要 openlineage.integration.dataset.kafka.bootstrap.servers 属性来从 Kafka 主题检索输入数据集元数据。与源连接器不同,Sink 连接器无法通过 Kafka Connect 框架直接访问 Kafka 主题元数据,必须显式连接以检索架构信息。

JDBC Sink 连接器配置示例

以下示例显示了启用 JDBC Sink 连接器以在 Kafka Connect 中与 OpenLineage 集成的完整配置

{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://postgres:5432/inventory",
    "connection.username": "postgres",
    "connection.password": "postgres",
    "topics": "inventory.inventory.customers",
    "insert.mode": "upsert",
    "primary.key.mode": "record_key",
    "openlineage.integration.enabled": "true",
    "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
    "openlineage.integration.job.description": "Sink connector for JDBC",
    "openlineage.integration.job.tags": "env=prod,team=data-engineering",
    "openlineage.integration.job.owners": "Data Team=maintainer,Alice Johnson=Data Engineer",
    "openlineage.integration.dataset.kafka.bootstrap.servers": "kafka:9092"
  }
}

数据集命名空间格式化

Debezium 根据 OpenLineage 数据集命名规范格式化数据集命名空间。

输入数据集命名空间

输入数据集命名空间标识源数据库,并遵循每种数据库系统的特定格式。

示例:PostgreSQL 输入数据集(用于源连接器)
  • 命名空间: postgres://hostname:port

  • 名称: schema.table

  • 架构:源表的列名和类型

示例:Kafka 输入数据集(用于 Sink 连接器)
  • 命名空间: kafka://kafka-broker:9092

  • 名称: inventory.inventory.products

  • 架构:来自源连接器的 CDC 事件结构

确切的命名空间格式取决于您的数据库系统,并遵循 OpenLineage 的数据集命名规范。

源连接器的输出数据集命名空间

输出数据集命名空间标识写入 CDC 事件的 Kafka 主题。

示例:Kafka 输出数据集(用于源连接器)
  • 命名空间: kafka://bootstrap-server:port

  • 名称: topic-prefix.schema.table

  • 架构:完整的 CDC 事件结构,包括元数据字段

Sink 连接器的输出数据集命名空间

输出数据集命名空间标识 Sink 连接器写入数据的目标数据库。

示例:MongoDB 输出数据集
  • 命名空间: mongodb://mongodb-host:27017

  • 名称: database.collection

  • 架构:目标集合架构

示例:JDBC 输出数据集(PostgreSQL)
  • 命名空间: postgres://postgres-host:5432

  • 名称: schema.table

  • 架构:目标表架构

监控和故障排除

验证集成

要验证 OpenLineage 集成是否正常工作,请完成以下步骤

过程
  1. 检查连接器日志中与 OpenLineage 相关的消息。

  2. 如果配置了 HTTP 传输,请验证事件是否出现在您的 OpenLineage 后端。

  3. 为了测试目的,您可以配置控制台传输,直接在日志中查看事件,如以下示例所示

    transport:
      type: console
常见问题
集成不工作
  • 验证 openlineage.integration.enabled 是否设置为 true

  • 检查连接器配置中指定的 OpenLineage 配置文件路径是否正确,并且 Debezium 可以访问目标文件。

  • 确保 OpenLineage 配置文件中的 YAML 有效。

  • 验证类路径中是否存在所有必需的 JAR 依赖项。

缺少输出数据集
  • 验证您是否已配置连接器使用 OpenLineage 转换。

  • 检查您是否在连接器配置中设置了 schema.history.internal.kafka.bootstrap.servers 属性。

连接问题
  • 验证在 OpenLineage 客户端配置中是否指定了正确的服务器 URL 和身份验证信息。

  • 检查 Debezium 和 OpenLineage 服务器之间的网络连接。

依赖项问题
  • 确保所有必需的 JAR 文件都存在并且版本兼容。

  • 检查与现有依赖项的类路径冲突。

Sink 连接器缺少输入数据集
  • 验证是否已配置 openlineage.integration.dataset.kafka.bootstrap.servers 属性。

  • 验证连接器是否可以访问 Kafka bootstrap 服务器。

  • 验证 topics 配置中指定的 Kafka 主题是否存在,并且连接器可以访问它们。

错误事件

当连接器失败时,请在 OpenLineage FAIL 事件中检查以下项

  • 错误消息

  • 堆栈跟踪

  • 用于调试的连接器配置