OpenLineage 集成
Debezium 提供与 OpenLineage 的内置集成,可自动跟踪变更数据捕获 (CDC) 操作的数据血缘。OpenLineage 集成可为您提供对数据管道中使用的流程和转换的全面可见性。
关于数据血缘和 OpenLineage
数据血缘跟踪数据在不同系统、转换和进程中的流动。此信息可让您了解数据的来源、去向以及数据管道中存在的依赖关系。数据血缘的见解对于以下活动至关重要
-
数据治理和合规性
-
进行更改时的影响分析
-
调试数据质量问题
-
理解数据依赖关系
OpenLineage 是一个数据血缘的开放标准,它提供了一种统一的方式来跨多个数据系统收集和跟踪血缘元数据。该规范定义了描述数据集、作业和运行的通用模型,从而简化了跨异构数据基础设施构建全面血缘图的过程。
部署类型
Debezium OpenLineage 集成可用于以下部署类型
- Kafka Connect
-
您将 Debezium 连接器作为 Kafka Connect 插件运行。
- Debezium 服务器
-
您将 Debezium 作为独立服务器模式运行。
部署类型之间提供了相同的 OpenLineage 事件模型和功能。但需要不同的进程来配置集成和安装依赖项。
Debezium 如何与 OpenLineage 集成
为了与 OpenLineage 集成,Debezium 将其生命周期中的事件映射到 OpenLineage 数据模型中的工件。
Debezium 连接器被映射到一个 OpenLineage **作业**,该作业包含以下元素
- 名称
-
作业的名称继承自 Debezium 的
topic.prefix,并与任务 ID 结合(例如,inventory.0)。 - 命名空间
-
继承自
openlineage.integration.job.namespace(如果已指定);否则,默认为topic.prefix的值。 - Debezium 连接器版本
-
正在运行并生成血缘事件的 Debezium 连接器的版本。
- 完整的连接器配置
-
所有连接器配置属性,可实现数据管道的完全可重现性和调试。
- 作业元数据
-
描述、标签和所有者。
以下数据集映射是可能的
- 输入数据集
-
代表 Debezium 配置为从中捕获更改的数据库表。OpenLineage 集成会根据连接器配置自动创建输入数据集。集成在创建数据集映射时应用以下原则
-
连接器监控的每个表都成为一个输入数据集。
-
每个数据集都捕获相应源表的架构信息,包括每个列的名称和数据类型。
-
源表中的 DDL 更改会在数据集中动态反映。
-
- 输出数据集
-
代表 CDC 事件写入的 Kafka 主题。对于 Kafka Connect 部署,当您应用 OpenLineage 单消息转换 (SMT) 时,会创建输出数据集。对于 Debezium Server 部署,输出数据集会自动捕获。
输出数据集映射的创建遵循以下原则
-
连接器生成的每个 Kafka 主题都成为一个输出数据集。
-
输出数据集捕获完整的 CDC 事件结构,包括元数据字段。
-
数据集的名称基于连接器的 topic prefix 配置。
-
对于 Sink 连接器,数据流与源连接器相反。
- 输入数据集
-
代表 Sink 连接器从中读取的 Kafka 主题。这些主题通常包含来自 Debezium 源连接器的 CDC 事件。在定义输入数据集时,应用以下原则
-
Sink 连接器消耗的每个 Kafka 主题代表一个输入数据集。
-
输入数据集指定 Kafka 主题的架构和元数据。
-
命名空间格式遵循
kafka://bootstrap-server:port,其中 bootstrap server 通过openlineage.integration.dataset.kafka.bootstrap.servers属性指定。
-
- 输出数据集
-
代表 Sink 连接器写入数据的目标数据库或集合。
在定义输出数据集映射时,应用以下原则
-
每个目标数据库表或集合代表一个输出数据集。
-
输出数据集指定目标数据库的架构信息。
-
命名空间格式取决于目标数据库系统。有关更多信息,请参阅数据集命名空间格式化。
-
以下 Debezium Sink 连接器支持 Kafka Connect 中的 OpenLineage 集成
- MongoDB Sink 连接器
-
将 CDC 事件写入 MongoDB 集合。
- JDBC Sink 连接器
-
将 CDC 事件写入关系数据库表。
| Debezium Server 目前仅支持与 Kafka Sink 的 OpenLineage 集成。MongoDB Sink 和 JDBC Sink 连接器仅在 Kafka Connect 部署中受支持。 |
当您将 Debezium 与 OpenLineage 集成时,连接器会发出事件来报告状态更改。连接器在以下状态更改后发出 OpenLineage 运行事件
- START
-
报告连接器初始化。
- RUNNING
-
在正常流式操作期间和处理单个表期间定期发出。这些定期事件确保了长时间运行的流式 CDC 操作的连续血缘跟踪。
- COMPLETE
-
报告连接器已正常关闭。
- FAIL
-
报告连接器遇到错误。
所需依赖
OpenLineage 集成需要几个 JAR 文件,这些文件捆绑在 debezium-openlineage-core-libs 存档中。
Kafka Connect
在 Kafka Connect 中将 Debezium 与 OpenLineage 结合使用之前,请完成以下步骤以获取所需的依赖项
-
下载 OpenLineage 核心存档。
-
将存档的内容提取到 Kafka Connect 环境中的 Debezium 插件目录中。
Debezium 服务器
在将 Debezium Server 与 OpenLineage 结合使用之前,请完成以下步骤以获取所需的依赖项
-
下载 OpenLineage 核心存档。
-
提取存档的内容。
-
将所有 JAR 文件复制到 Debezium Server 安装中的
/debezium/lib目录。
配置集成
要启用集成,您必须配置 Debezium 连接器和 OpenLineage 客户端。Kafka Connect 和 Debezium Server 部署之间的配置方法有所不同。
Kafka Connect 配置
要使 Debezium 在 Kafka Connect 中与 OpenLineage 集成,请将属性添加到连接器配置中,如以下示例所示
# Enable OpenLineage integration
openlineage.integration.enabled=true
# Path to OpenLineage configuration file
openlineage.integration.config.file.path=/path/to/openlineage.yml
# Job metadata (optional but recommended)
openlineage.integration.job.namespace=myNamespace
openlineage.integration.job.description=CDC connector for products database
openlineage.integration.job.tags=env=prod,team=data-engineering
openlineage.integration.job.owners=Alice Smith=maintainer,Bob Johnson=Data Engineer
Debezium Server 配置
要使 Debezium Server 与 OpenLineage 集成,请将 OpenLineage 属性添加到 application.properties 文件中,如以下示例所示。OpenLineage 属性使用 debezium.source. 前缀
# Enable OpenLineage integration
debezium.source.openlineage.integration.enabled=true
# Path to OpenLineage configuration file
debezium.source.openlineage.integration.config.file.path=config/openlineage.yml
# Job metadata (optional but recommended)
debezium.source.openlineage.integration.job.description=CDC connector for products database
debezium.source.openlineage.integration.job.tags=env=prod,team=data-engineering
debezium.source.openlineage.integration.job.owners=Alice Smith=maintainer,Bob Johnson=Data Engineer
配置 OpenLineage 客户端
创建一个 openlineage.yml 文件来配置 OpenLineage 客户端。openlineage.yml 配置文件在 Kafka Connect 和 Debezium Server 部署中都使用。请使用以下示例作为指南
transport:
type: http
url: http://your-openlineage-server:5000
endpoint: /api/v1/lineage
auth:
type: api_key
api_key: your-api-key
# Alternative: Console transport for testing
# transport:
# type: console
有关详细的 OpenLineage 客户端配置选项,请参考OpenLineage 客户端文档。
Debezium OpenLineage 配置属性
下表列出了两种部署类型的 OpenLineage 配置属性。
对于 Debezium Server,请在所有属性名称前添加 debezium.source. 前缀(例如,debezium.source.openlineage.integration.enabled)。 |
| 属性 (Kafka Connect) | 描述 | 必需 | Default (默认值) |
|---|---|---|---|
|
启用和禁用 OpenLineage 集成。 |
是 |
|
|
OpenLineage YAML 配置文件路径。 |
是 |
无默认值 |
|
作业使用的命名空间。 |
否 |
|
|
人类可读的作业描述 |
否 |
无默认值 |
|
逗号分隔的键值标签列表。 |
否 |
无默认值 |
|
逗号分隔的姓名-角色所有者条目列表。 |
否 |
无默认值 |
|
用于检索 Kafka 主题元数据的 Kafka bootstrap 服务器。对于源连接器,如果未指定值,则使用 对于 Sink 连接器,您必须为此属性指定一个值。 |
是(适用于 Sink 连接器) |
|
将标签指定为逗号分隔的键值对列表,如以下示例所示
openlineage.integration.job.tags=environment=production,team=data-platform,criticality=high
将所有者指定为逗号分隔的姓名-角色对列表,如以下示例所示
openlineage.integration.job.owners=John Doe=maintainer,Jane Smith=Data Engineer,Team Lead=owner
输出数据集血缘
Debezium 可以捕获输出数据集血缘 (Kafka 主题) 来跟踪 CDC 事件的目标。Kafka Connect 和 Debezium Server 之间的配置方法有所不同。
Kafka Connect 输出数据集血缘
要捕获 Kafka Connect 中的输出数据集血缘,请配置 Debezium 使用 OpenLineage 单消息转换 (SMT)
# Add OpenLineage transform
transforms=openlineage
transforms.openlineage.type=io.debezium.transforms.openlineage.OpenLineage
# Required: Configure schema history with Kafka bootstrap servers
schema.history.internal.kafka.bootstrap.servers=your-kafka:9092
SMT 捕获 Debezium 写入 Kafka 主题的更改事件的详细架构信息。转换捕获的架构数据包括以下项
-
事件结构(before、after、source、transaction 元数据)
-
字段类型和嵌套结构
-
主题名称和命名空间
完整配置示例
以下示例显示了在 Kafka Connect 和 Debezium Server 中启用 OpenLineage 集成的完整配置。
Kafka Connect 完整配置示例
以下示例显示了启用 PostgreSQL 连接器以在 Kafka Connect 中与 OpenLineage 集成的完整配置
{
"name": "inventory-connector-postgres",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "postgres",
"topic.prefix": "inventory",
"snapshot.mode": "initial",
"slot.name": "inventory",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory",
"openlineage.integration.enabled": "true",
"openlineage.integration.config.file.path": "/kafka/openlineage.yml",
"openlineage.integration.job.description": "CDC connector for inventory database",
"openlineage.integration.job.tags": "env=production,team=data-platform,database=postgresql",
"openlineage.integration.job.owners": "Data Team=maintainer,Alice Johnson=Data Engineer",
"transforms": "openlineage",
"transforms.openlineage.type": "io.debezium.transforms.openlineage.OpenLineage"
}
}
Debezium Server 完整配置示例
以下示例显示了一个完整的 application.properties 配置,用于在具有 Kafka Sink 的 Debezium Server 中启用 PostgreSQL 连接器以与 OpenLineage 集成
# Sink configuration (Kafka)
debezium.sink.type=kafka
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.bootstrap.servers=kafka:9092
# Source connector configuration
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage=org.apache.kafka.connect.storage.MemoryOffsetBackingStore
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.topic.prefix=tutorial
debezium.source.schema.include.list=inventory
# OpenLineage integration
debezium.source.openlineage.integration.enabled=true
debezium.source.openlineage.integration.config.file.path=config/openlineage.yml
debezium.source.openlineage.integration.job.description=CDC connector for products database
debezium.source.openlineage.integration.job.tags=env=prod,team=cdc
debezium.source.openlineage.integration.job.owners=Mario=maintainer,John Doe=Data scientist
# Logging configuration (optional)
quarkus.log.console.json=false
MongoDB Sink 连接器配置示例
以下示例显示了启用 MongoDB Sink 连接器以在 Kafka Connect 中与 OpenLineage 集成的完整配置
{
"name": "mongodb-sink",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbSinkConnector",
"tasks.max": "1",
"mongodb.connection.string": "mongodb://admin:admin@mongodb:27017",
"topics": "inventory.inventory.products",
"sink.database": "inventory2",
"openlineage.integration.enabled": "true",
"openlineage.integration.config.file.path": "/kafka/openlineage.yml",
"openlineage.integration.job.description": "Sink connector for MongoDB",
"openlineage.integration.job.tags": "env=prod,team=cdc",
"openlineage.integration.job.owners": "Mario=maintainer,John Doe=Data scientist",
"openlineage.integration.dataset.kafka.bootstrap.servers": "kafka:9092"
}
}
对于 Sink 连接器,需要 openlineage.integration.dataset.kafka.bootstrap.servers 属性来从 Kafka 主题检索输入数据集元数据。与源连接器不同,Sink 连接器无法通过 Kafka Connect 框架直接访问 Kafka 主题元数据,必须显式连接以检索架构信息。 |
JDBC Sink 连接器配置示例
以下示例显示了启用 JDBC Sink 连接器以在 Kafka Connect 中与 OpenLineage 集成的完整配置
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://postgres:5432/inventory",
"connection.username": "postgres",
"connection.password": "postgres",
"topics": "inventory.inventory.customers",
"insert.mode": "upsert",
"primary.key.mode": "record_key",
"openlineage.integration.enabled": "true",
"openlineage.integration.config.file.path": "/kafka/openlineage.yml",
"openlineage.integration.job.description": "Sink connector for JDBC",
"openlineage.integration.job.tags": "env=prod,team=data-engineering",
"openlineage.integration.job.owners": "Data Team=maintainer,Alice Johnson=Data Engineer",
"openlineage.integration.dataset.kafka.bootstrap.servers": "kafka:9092"
}
}
数据集命名空间格式化
Debezium 根据 OpenLineage 数据集命名规范格式化数据集命名空间。
输入数据集命名空间
输入数据集命名空间标识源数据库,并遵循每种数据库系统的特定格式。
-
命名空间:
postgres://hostname:port -
名称:
schema.table -
架构:源表的列名和类型
-
命名空间:
kafka://kafka-broker:9092 -
名称:
inventory.inventory.products -
架构:来自源连接器的 CDC 事件结构
确切的命名空间格式取决于您的数据库系统,并遵循 OpenLineage 的数据集命名规范。
监控和故障排除
要验证 OpenLineage 集成是否正常工作,请完成以下步骤
-
检查连接器日志中与 OpenLineage 相关的消息。
-
如果配置了 HTTP 传输,请验证事件是否出现在您的 OpenLineage 后端。
-
为了测试目的,您可以配置控制台传输,直接在日志中查看事件,如以下示例所示
transport: type: console
- 集成不工作
-
-
验证
openlineage.integration.enabled是否设置为true。 -
检查连接器配置中指定的 OpenLineage 配置文件路径是否正确,并且 Debezium 可以访问目标文件。
-
确保 OpenLineage 配置文件中的 YAML 有效。
-
验证类路径中是否存在所有必需的 JAR 依赖项。
-
- 缺少输出数据集
-
-
验证您是否已配置连接器使用 OpenLineage 转换。
-
检查您是否在连接器配置中设置了
schema.history.internal.kafka.bootstrap.servers属性。
-
- 连接问题
-
-
验证在 OpenLineage 客户端配置中是否指定了正确的服务器 URL 和身份验证信息。
-
检查 Debezium 和 OpenLineage 服务器之间的网络连接。
-
- 依赖项问题
-
-
确保所有必需的 JAR 文件都存在并且版本兼容。
-
检查与现有依赖项的类路径冲突。
-
- Sink 连接器缺少输入数据集
-
-
验证是否已配置
openlineage.integration.dataset.kafka.bootstrap.servers属性。 -
验证连接器是否可以访问 Kafka bootstrap 服务器。
-
验证
topics配置中指定的 Kafka 主题是否存在,并且连接器可以访问它们。
-
当连接器失败时,请在 OpenLineage FAIL 事件中检查以下项
-
错误消息
-
堆栈跟踪
-
用于调试的连接器配置