OpenLineage 集成

Debezium 提供与 OpenLineage 的内置集成,可自动跟踪变更数据捕获 (CDC) 操作的数据沿袭。OpenLineage 集成为您提供了对数据管道中使用的这些数据流和转换的全面可见性。

关于数据沿袭和 OpenLineage

数据沿袭跟踪数据在各种系统、转换和进程中的流动。这些信息可让您了解数据的来源、其移动方式以及数据管道中存在的依赖关系。数据沿袭的洞察对于以下活动至关重要:

  • 数据治理和合规性

  • 进行更改时的影响分析

  • 调试数据质量问题

  • 理解数据依赖关系

OpenLineage 是一个开放标准的数据沿袭,它提供了一种统一的方式来跨多个数据系统收集和跟踪沿袭元数据。该规范定义了一个通用模型来描述数据集、作业和运行,从而简化了在异构数据基础设施中构建全面的沿袭图的过程。

有关更多信息,请参阅 OpenLineage 网站文档

Debezium 如何与 OpenLineage 集成

为了与 OpenLineage 集成,Debezium 将其生命周期中的事件映射到 OpenLineage 数据模型中的工件。

OpenLineage 作业映射

Debezium 连接器被映射到一个 OpenLineage **作业**,该作业包含以下元素

名称

作业的名称继承自 Debezium 的 topic.prefix.<taskId>

命名空间

如果指定了 openlineage.integration.job.namespace,则从该属性继承;否则,默认为 topic.prefix 的值。

Debezium 连接器版本
完整的连接器配置
作业元数据

描述、标签和所有者。

源连接器的数据集映射

以下数据集映射是可能的

输入数据集

表示 Debezium 配置为捕获更改的数据库表。OpenLineage 集成会根据连接器配置自动创建输入数据集。在创建数据集映射时,集成会遵循以下原则:

  • 连接器监视的每个表都将成为一个输入数据集。

  • 每个数据集都捕获相应源表的架构信息,包括每列的名称和数据类型。

  • 源表中的 DDL 更改会在数据集中动态反映出来。

输出数据集

表示在应用 OpenLineage 单条消息转换 (SMT) 后产生的 Kafka 主题。输出数据集映射的创建遵循以下原则:

  • 连接器生成的每个 Kafka 主题都成为一个输出数据集。

  • 输出数据集捕获完整的 CDC 事件结构,包括元数据字段。

  • 数据集的名称基于连接器的 topic prefix 配置。

接收连接器的数据集映射

接收连接器尚不支持。

运行事件

当您将 Debezium 与 OpenLineage 集成时,连接器会发出事件来报告状态更改。连接器会在以下状态更改后发出 OpenLineage 运行事件:

START

报告连接器初始化。

RUNNING

在正常流式操作期间和处理单个表期间定期发出。这些定期事件确保了长时间运行的流式 CDC 操作的连续血缘跟踪。

COMPLETE

报告连接器已正常关闭。

FAIL

报告连接器遇到错误。

所需依赖项

OpenLineage 集成需要多个 JAR 文件,它们捆绑在 debezium-openlineage-core 存档中。

在使用 Debezium 和 OpenLineage 之前,请完成以下步骤以获取所需的依赖项:

  1. 下载 OpenLineage 核心存档

  2. 将存档的内容提取到 Kafka Connect 环境中的 Debezium 插件目录。

配置集成

要启用集成,您必须配置 Debezium 连接器和 OpenLineage 客户端。

使用基本的 OpenLineage 配置来配置连接器

要启用 Debezium 与 OpenLineage 的集成,请在连接器配置中添加属性,如下例所示:

# Enable OpenLineage integration
openlineage.integration.enabled=true

# Path to OpenLineage configuration file
openlineage.integration.config.file.path=/path/to/openlineage.yml

# Job metadata (optional but recommended)
openlineage.integration.job.namespace=myNamespace
openlineage.integration.job.description=CDC connector for products database
openlineage.integration.job.tags=env=prod,team=data-engineering
openlineage.integration.job.owners=Alice Smith=maintainer,Bob Johnson=Data Engineer

配置 OpenLineage 客户端

创建一个 openlineage.yml 文件来配置 OpenLineage 客户端。请参考以下示例:

transport:
  type: http
  url: http://your-openlineage-server:5000
  endpoint: /api/v1/lineage
  auth:
    type: api_key
    api_key: your-api-key

# Alternative: Console transport for testing
# transport:
#   type: console

有关详细的 OpenLineage 客户端配置选项,请参阅 OpenLineage 客户端文档

Debezium OpenLineage 配置属性

属性 描述 必需 Default (默认值)

openlineage.integration.enabled

启用和禁用 OpenLineage 集成。

false

openlineage.integration.config.file.path

OpenLineage YAML 配置文件路径。

无默认值

openlineage.integration.job.namespace

作业使用的命名空间。

来自 topic.prefix 的值

topic.prefix 的值

openlineage.integration.job.description

人类可读的作业描述

无默认值

openlineage.integration.job.tags

逗号分隔的键值标签列表。

无默认值

openlineage.integration.job.owners

逗号分隔的姓名-角色所有者条目列表。

无默认值

示例:标签列表格式

将标签指定为逗号分隔的键值对列表,如下例所示:

openlineage.integration.job.tags=environment=production,team=data-platform,criticality=high
示例:所有者列表格式

将所有者指定为逗号分隔的姓名-角色对列表,如下例所示:

openlineage.integration.job.owners=John Doe=maintainer,Jane Smith=Data Engineer,Team Lead=owner

源连接器输出数据集沿袭

要捕获输出数据集沿袭(Kafka 主题),请将 Debezium 配置为使用 OpenLineage 单条消息转换 (SMT)

# Add OpenLineage transform
transforms=openlineage
transforms.openlineage.type=io.debezium.transforms.openlineage.OpenLineage

# Required: Configure schema history with Kafka bootstrap servers
schema.history.internal.kafka.bootstrap.servers=your-kafka:9092

SMT 捕获 Debezium 写入 Kafka 主题的更改事件的详细架构信息。转换捕获的架构数据包括以下项:

  • 事件结构(之前、之后、源、事务元数据)

  • 字段类型和嵌套结构

  • 主题名称和命名空间

示例:启用 OpenLineage 集成的完整连接器配置

以下示例显示了一个可能的完整配置,用于启用 PostgreSQL 连接器与 OpenLineage 集成:

# Connector basics
name=products-cdc-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=localhost
database.port=5432
database.user=debezium
database.password=debezium
database.dbname=inventory
topic.prefix=inventory

# Snapshot configuration
snapshot.mode=initial
slot.drop.on.stop=false

# OpenLineage integration
openlineage.integration.enabled=true
openlineage.integration.config.file.path=/opt/debezium/config/openlineage.yml
openlineage.integration.job.description=CDC connector for inventory database
openlineage.integration.job.tags=env=production,team=data-platform,database=postgresql
openlineage.integration.job.owners=Data Team=maintainer,Alice Johnson=Data Engineer

# For output lineage (optional)
transforms=openlineage
transforms.openlineage.type=io.debezium.transforms.openlineage.OpenLineage
schema.history.internal.kafka.bootstrap.servers=kafka:9092

# Standard Kafka Connect settings
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

沿袭事件

该集成会产生几种类型的 OpenLineage 事件:

运行事件
START

报告连接器初始化。

RUNNING

报告连接器正在正常运行并正在处理表。

COMPLETE

报告连接器已正常关闭。

FAIL

报告连接器遇到错误。

数据集信息

输入数据集代表源数据库表。命名空间遵循 OpenLineage 数据集命名规范

以下示例显示了 PostgreSQL 数据库中表的命名:

  • 命名空间:postgres://hostname:port

  • 名称:schema.table

  • 架构:来自源表的列名和类型

确切的命名空间格式取决于您的数据库系统,并遵循 OpenLineage 关于数据集命名的规范。

输出数据集代表应用 OpenLineage 转换后产生的 Kafka 主题。

输出数据集包含有关 Kafka 主题的以下信息:

命名空间

kafka://bootstrap-server:port

名称

topic-prefix.schema.table

架构

完整的 CDC 事件结构,包括元数据字段

监控和故障排除

验证集成

您可以执行几项任务来验证集成是否按预期工作。

过程
  1. 检查连接器日志中是否有引用 OpenLineage 的消息。

  2. 在您的 OpenLineage 后端中验证事件。这仅适用于您使用 HTTP 传输的情况。

  3. 请使用控制台传输进行测试,如下例所示:

    transport:
      type: console
常见问题
集成未正常工作
  • 验证 openlineage.integration.enabled 是否设置为 true

  • 检查连接器配置中指定的 OpenLineage 配置文件路径是否正确,并且 Debezium 是否可以访问目标文件。

  • 确保 OpenLineage 配置文件中的 YAML 有效。

  • 验证类路径中是否存在所有必需的 JAR 依赖项。

缺少输出数据集
  • 验证您是否已配置连接器以使用 OpenLineage 转换。

  • 检查您是否在连接器配置中设置了 schema.history.internal.kafka.bootstrap.servers 属性。

连接问题
  • 验证您在 OpenLineage 客户端配置中是否指定了正确的服务器 URL 和身份验证信息。

  • 检查 Debezium 和 OpenLineage 服务器之间的网络连接。

依赖项问题
  • 确保所有必需的 JAR 文件都存在且版本兼容。

  • 检查与现有依赖项的类路径冲突。

错误事件

当连接器失败时,请在 OpenLineage FAIL 事件中检查以下各项:

  • 错误消息

  • 堆栈跟踪

  • 用于调试的连接器配置