在复制用于分析的业务数据方面,变更数据捕获 (CDC) 是黄金标准。它提供了可伸缩性、近乎实时的性能,并捕获所有数据修改,确保您的分析数据集始终保持最新。Debezium 是该领域的领先工具,它可以连接到各种数据库,并以 JSON 和 Avro 等多种格式导出 CDC 事件,从而轻松与不同系统集成。
虽然 Debezium 本身是一个基于 Java 的项目,但数据工程领域越来越依赖 Python。这篇博文演示了如何利用 Debezium 在 Python 环境中使用 pydbzengine。我们将探讨如何使用这些技术来构建一个健壮且可扩展的 CDC 解决方案。
Python 驱动的 CDC 流水线
使用 Debezium 捕获变更数据并使用 DLT 加载到 DuckDB
这篇博文演示了如何使用 Debezium 从 PostgreSQL 数据库捕获变更数据,并使用数据加载工具 (DLT) 将其加载到 DuckDB 数据库中。所有这些都通过使用 pydbzengine 的 Python 数据处理流水线实现。
我们将逐步介绍代码,解释关键组件及其协同工作的方式。
了解组件
-
Debezium: 一个强大的开源变更数据捕获平台。它监控数据库事务日志并生成变更事件流,指示插入、更新和删除。
-
pydbzengine: 一个 Python 库,提供了一种方便的方式来与 Debezium 嵌入式引擎进行交互。 它简化了在 Python 应用程序中配置和运行 Debezium 的过程。
-
DLT: 一个多功能的数据加载工具,可简化将数据提取并加载到各种目标的过程。在此示例中,我们使用 DLT 将 Debezium 的变更事件加载到 DuckDB。
-
DuckDB: 一个可嵌入的分析数据库,提供高效的数据处理功能。它是本地开发和测试的绝佳选择。
-
Testcontainers: 一个用于启动临时服务实例(如数据库)以进行测试的库。我们在示例中将其用于管理 PostgreSQL 数据库。
代码解析
提供的代码演示了一个完整的 Python CDC 流水线,从使用 Debezium 捕获更改到使用 DLT 将其加载到 DuckDB。让我们分解关键部分
1. 设置环境
让我们首先设置必要环境,包括定义文件路径、清理之前的运行以及定义一个 `DbPostgresql` 辅助类,该类使用 Testcontainers 来管理 PostgreSQL 数据库,充当我们的操作数据源。
import os
from pathlib import Path
import dlt
import duckdb
from testcontainers.core.config import testcontainers_config
from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.postgres import PostgresContainer
from pydbzengine import DebeziumJsonEngine, Properties
from pydbzengine.debeziumdlt import DltChangeHandler
from pydbzengine.helper import Utils
# set global variables
CURRENT_DIR = Path(__file__).parent
DUCKDB_FILE = CURRENT_DIR.joinpath("dbz_cdc_events_example.duckdb")
OFFSET_FILE = CURRENT_DIR.joinpath('postgresql-offsets.dat')
# cleanup
if OFFSET_FILE.exists():
os.remove(OFFSET_FILE)
if DUCKDB_FILE.exists():
os.remove(DUCKDB_FILE)
def wait_for_postgresql_to_start(self) -> None:
wait_for_logs(self, ".*database system is ready to accept connections.*")
wait_for_logs(self, ".*PostgreSQL init process complete.*")
class DbPostgresql:
POSTGRES_USER = "postgres"
POSTGRES_PASSWORD = "postgres"
POSTGRES_DBNAME = "postgres"
POSTGRES_IMAGE = "debezium/example-postgres:3.0.0.Final"
POSTGRES_HOST = "localhost"
POSTGRES_PORT_DEFAULT = 5432
CONTAINER: PostgresContainer = (PostgresContainer(image=POSTGRES_IMAGE,
port=POSTGRES_PORT_DEFAULT,
username=POSTGRES_USER,
password=POSTGRES_PASSWORD,
dbname=POSTGRES_DBNAME,
)
.with_exposed_ports(POSTGRES_PORT_DEFAULT)
)
PostgresContainer._connect = wait_for_postgresql_to_start
def start(self):
testcontainers_config.ryuk_disabled = True
print("Starting Postgresql Db...")
self.CONTAINER.start()
def stop(self):
print("Stopping Postgresql Db...")
self.CONTAINER.stop()
def __exit__(self, exc_type, exc_value, traceback):
self.stop() 2. 配置 Debezium
我们通过创建 Java `Properties` 对象来配置 Debezium。此对象包含 Debezium 引擎的配置设置。它包括数据库连接详细信息、连接器类 (PostgresConnector)、偏移量存储和转换。`transforms` 属性用于解包 Debezium 消息,从而简化下游处理。
def debezium_engine_props(sourcedb: DbPostgresql):
props = Properties()
props.setProperty("name", "engine")
props.setProperty("snapshot.mode", "initial_only")
props.setProperty("database.hostname", sourcedb.CONTAINER.get_container_host_ip())
props.setProperty("database.port",
sourcedb.CONTAINER.get_exposed_port(sourcedb.POSTGRES_PORT_DEFAULT))
props.setProperty("database.user", sourcedb.POSTGRES_USER)
props.setProperty("database.password", sourcedb.POSTGRES_PASSWORD)
props.setProperty("database.dbname", sourcedb.POSTGRES_DBNAME)
props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
props.setProperty("offset.storage.file.filename", OFFSET_FILE.as_posix())
props.setProperty("max.batch.size", "5")
props.setProperty("poll.interval.ms", "10000")
props.setProperty("converter.schemas.enable", "false")
props.setProperty("offset.flush.interval.ms", "1000")
props.setProperty("database.server.name", "testc")
props.setProperty("database.server.id", "1234")
props.setProperty("topic.prefix", "testc")
props.setProperty("schema.whitelist", "inventory")
props.setProperty("database.whitelist", "inventory")
props.setProperty("table.whitelist", "inventory.*")
props.setProperty("replica.identity.autoset.values", "inventory.*:FULL")
# // debezium unwrap message
props.setProperty("transforms", "unwrap")
props.setProperty("transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState")
props.setProperty("transforms.unwrap.add.fields", "op,table,source.ts_ms,sourcedb,ts_ms")
props.setProperty("transforms.unwrap.delete.handling.mode", "rewrite")
# props.setProperty("debezium.transforms.unwrap.drop.tombstones", "true")
return props 3. Change Handler 实现
`DltChangeHandler` 类(由 pydbzengine 库提供)充当 Debezium 和 DLT 之间的桥梁。它接收来自 Debezium 的变更数据捕获事件,然后利用 DLT 流水线将此数据高效地加载到您选择的目标(如 DuckDB)中。本质上,它是连接 Debezium 的实时变更数据流和 DLT 的数据加载功能的组件。您可以在 pydbzengine 存储库中找到完整实现。
同样,只需实现 `handleJsonBatch` 方法,即可实现自定义的消费逻辑。这允许您实现自定义处理逻辑并将数据消费到各种目标或服务。
from pydbzengine import BasePythonChangeHandler, ChangeEvent
class MyXYZChangeHandler(BasePythonChangeHandler):
def handleJsonBatch(self, records: List[ChangeEvent]):
# Process your data here!
for record in records:
# ... your processing logic ...
# Example: send data to another service, database, etc. 4. 运行 Debezium 引擎和 DLT 流水线
`main` 函数协调整个过程。它启动 PostgreSQL 容器,使用配置的属性和 `DltChangeHandler` 创建 Debezium 引擎,然后启动引擎。
def main():
# Start the PostgreSQL container that will serve as the replication source.
sourcedb = DbPostgresql()
sourcedb.start()
# Get Debezium engine configuration properties
props = debezium_engine_props(sourcedb=sourcedb)
# Create a dlt pipeline to consume the change events into DuckDB.
dlt_pipeline = dlt.pipeline(
pipeline_name="dbz_cdc_events_example",
destination="duckdb",
dataset_name="dbz_data"
)
handler = DltChangeHandler(dlt_pipeline=dlt_pipeline)
engine = DebeziumJsonEngine(properties=props, handler=handler)
# Run the Debezium engine asynchronously with a timeout. This allows the example
# to run for a limited time and then terminate automatically.
Utils.run_engine_async(engine=engine, timeout_sec=60)
# engine.run() # This would be used for synchronous execution (without timeout)
if __name__ == "__main__":
main() 5. 查询 DuckDB 数据库,结果
Debezium 引擎运行指定时间(示例中为 60 秒)后,我们可以连接到目标 (DuckDB) 数据库并显示加载的数据。
con = duckdb.connect(DUCKDB_FILE.as_posix())
result = con.sql("SHOW ALL TABLES").fetchall()
for r in result:
database, schema, table = r[:3] # Extract database, schema, and table names.
if schema == "dbz_data": # Only show data from the schema where Debezium loaded the data.
print(f"Data in table {table}:")
con.sql(f"select * from {database}.{schema}.{table} limit 5").show() # Display table data 已消费数据
┌────────────────────┬────────────────────────┬────────┬───────────────────────────────┬──────────────────────────────────────────────┐
│ load_id │ schema_name │ status │ inserted_at │ schema_version_hash │
│ varchar │ varchar │ int64 │ timestamp with time zone │ varchar │
├────────────────────┼────────────────────────┼────────┼───────────────────────────────┼──────────────────────────────────────────────┤
│ 1738405897.413279 │ debezium_source_events │ 0 │ 2025-02-01 11:31:38.086127+01 │ Q5UNIOd7gJ6ljH5qfKKcO7yWwPvNESKW+mVXJmx9geg= │
│ 1738405898.176148 │ debezium_source_events │ 0 │ 2025-02-01 11:31:39.381275+01 │ OyUXGP6PvFQuUTPnPdvESnsEqpFAxivJoP+l0G6l4+M= │
│ 1738405899.4865642 │ debezium_source_events │ 0 │ 2025-02-01 11:31:39.704015+01 │ jqZNcnJXF/33Va2kRWgKOZF4RnZSVgYxMDhFep8+Jg8= │
│ 1738405899.775917 │ debezium_source_events │ 0 │ 2025-02-01 11:31:39.952311+01 │ jqZNcnJXF/33Va2kRWgKOZF4RnZSVgYxMDhFep8+Jg8= │
│ 1738405900.0213661 │ debezium_source_events │ 0 │ 2025-02-01 11:31:40.223125+01 │ uMZY5n2NGPecXvVQIePLEg2nZQcAlkoWAXDLALKjWuQ= │
└────────────────────┴────────────────────────┴────────┴───────────────────────────────┴──────────────────────────────────────────────┘
Data in table _dlt_pipeline_state:
┌─────────┬────────────────┬──────────────────────┬──────────────────────┬──────────────────────┬────────────────────────────────────────────┬───────────────────┬────────────────┐
│ version │ engine_version │ pipeline_name │ state │ created_at │ version_hash │ _dlt_load_id │ _dlt_id │
│ int64 │ int64 │ varchar │ varchar │ timestamp with tim… │ varchar │ varchar │ varchar │
├─────────┼────────────────┼──────────────────────┼──────────────────────┼──────────────────────┼────────────────────────────────────────────┼───────────────────┼────────────────┤
│ 1 │ 4 │ dbz_cdc_events_exa… │ eNp1j0FLw0AQhf/LXg… │ 2025-02-01 11:31:3… │ ZvlGi9hyfXjD2b0imkL9ZA7x3S1/YkmQK4QbA+Jw… │ 1738405897.413279 │ hNbs3TIc3vRHvA │
└─────────┴────────────────┴──────────────────────┴──────────────────────┴──────────────────────┴────────────────────────────────────────────┴───────────────────┴────────────────┘
Data in table _dlt_version:
┌─────────┬────────────────┬──────────────────────┬──────────────────────┬──────────────────────┬─────────────────────────────────────────────────────────────────────────────────┐
│ version │ engine_version │ inserted_at │ schema_name │ version_hash │ schema │
│ int64 │ int64 │ timestamp with tim… │ varchar │ varchar │ varchar │
├─────────┼────────────────┼──────────────────────┼──────────────────────┼──────────────────────┼─────────────────────────────────────────────────────────────────────────────────┤
│ 2 │ 11 │ 2025-02-01 11:31:3… │ debezium_source_ev… │ Q5UNIOd7gJ6ljH5qfK… │ {"version":2,"version_hash":"Q5UNIOd7gJ6ljH5qfKKcO7yWwPvNESKW+mVXJmx9geg=","e… │
│ 4 │ 11 │ 2025-02-01 11:31:3… │ debezium_source_ev… │ OyUXGP6PvFQuUTPnPd… │ {"version":4,"version_hash":"OyUXGP6PvFQuUTPnPdvESnsEqpFAxivJoP+l0G6l4+M=","e… │
│ 6 │ 11 │ 2025-02-01 11:31:3… │ debezium_source_ev… │ jqZNcnJXF/33Va2kRW… │ {"version":6,"version_hash":"jqZNcnJXF/33Va2kRWgKOZF4RnZSVgYxMDhFep8+Jg8=","e… │
│ 8 │ 11 │ 2025-02-01 11:31:4… │ debezium_source_ev… │ uMZY5n2NGPecXvVQIe… │ {"version":8,"version_hash":"uMZY5n2NGPecXvVQIePLEg2nZQcAlkoWAXDLALKjWuQ=","e… │
└─────────┴────────────────┴──────────────────────┴──────────────────────┴──────────────────────┴─────────────────────────────────────────────────────────────────────────────────┘
Data in table testc_inventory_customers:
┌───────┬────────────┬───────────┬───────────────────────┬─────────┬─────────┬───────────┬───────────────┬───────────────┬───────────────────┬────────────────┐
│ id │ first_name │ last_name │ email │ deleted │ op │ table │ source_ts_ms │ ts_ms │ _dlt_load_id │ _dlt_id │
│ int64 │ varchar │ varchar │ varchar │ varchar │ varchar │ varchar │ int64 │ int64 │ varchar │ varchar │
├───────┼────────────┼───────────┼───────────────────────┼─────────┼─────────┼───────────┼───────────────┼───────────────┼───────────────────┼────────────────┤
│ 1001 │ Sally │ Thomas │ sally.thomas@acme.com │ false │ r │ customers │ 1738405883186 │ 1738405896858 │ 1738405897.413279 │ KcWKrYODYJ859w │
│ 1002 │ George │ Bailey │ gbailey@foobar.com │ false │ r │ customers │ 1738405883186 │ 1738405896862 │ 1738405897.413279 │ JU6dR1S27Xt3QA │
│ 1003 │ Edward │ Walker │ ed@walker.com │ false │ r │ customers │ 1738405883186 │ 1738405896862 │ 1738405897.413279 │ 02kMVvIX2/aGGg │
│ 1004 │ Anne │ Kretchmar │ annek@noanswer.org │ false │ r │ customers │ 1738405883186 │ 1738405896862 │ 1738405897.413279 │ TI7jpxl9FD2kRQ │
└───────┴────────────┴───────────┴───────────────────────┴─────────┴─────────┴───────────┴───────────────┴───────────────┴───────────────────┴────────────────┘
Data in table testc_inventory_geom:
┌───────┬──────────────────────────────────────────────────────────────────────┬─────────┬─────────┬─────────┬───────────────┬───────────────┬───────────────────┬────────────────┐
│ id │ g__wkb │ deleted │ op │ table │ source_ts_ms │ ts_ms │ _dlt_load_id │ _dlt_id │
│ int64 │ varchar │ varchar │ varchar │ varchar │ int64 │ int64 │ varchar │ varchar │
├───────┼──────────────────────────────────────────────────────────────────────┼─────────┼─────────┼─────────┼───────────────┼───────────────┼───────────────────┼────────────────┤
│ 1 │ AQEAAAAAAAAAAADwPwAAAAAAAPA/ │ false │ r │ geom │ 1738405883186 │ 1738405896872 │ 1738405897.413279 │ 17snqevSVWL0xA │
│ 2 │ AQIAAAACAAAAAAAAAAAAAEAAAAAAAADwPwAAAAAAABhAAAAAAAAAGEA= │ false │ r │ geom │ 1738405883186 │ 1738405896872 │ 1738405898.176148 │ W4kfG5n5jYhy3w │
│ 3 │ AQMAAAABAAAABQAAAAAAAAAAAAAAAAAAAAAAFEAAAAAAAAAAQAAAAAAAABRAAAAAAA… │ false │ r │ geom │ 1738405883186 │ 1738405896872 │ 1738405898.176148 │ 40HrbnruXZaB/g │
└───────┴──────────────────────────────────────────────────────────────────────┴─────────┴─────────┴─────────┴───────────────┴───────────────┴───────────────────┴────────────────┘
Data in table testc_inventory_orders:
┌───────┬────────────┬───────────┬──────────┬────────────┬─────────┬─────────┬─────────┬───────────────┬───────────────┬────────────────────┬────────────────┐
│ id │ order_date │ purchaser │ quantity │ product_id │ deleted │ op │ table │ source_ts_ms │ ts_ms │ _dlt_load_id │ _dlt_id │
│ int64 │ int64 │ int64 │ int64 │ int64 │ varchar │ varchar │ varchar │ int64 │ int64 │ varchar │ varchar │
├───────┼────────────┼───────────┼──────────┼────────────┼─────────┼─────────┼─────────┼───────────────┼───────────────┼────────────────────┼────────────────┤
│ 10001 │ 16816 │ 1001 │ 1 │ 102 │ false │ r │ orders │ 1738405883186 │ 1738405896876 │ 1738405898.176148 │ X7ejebZDxmm+hw │
│ 10002 │ 16817 │ 1002 │ 2 │ 105 │ false │ r │ orders │ 1738405883186 │ 1738405896876 │ 1738405898.176148 │ 6LU0Fe9UVE3XFQ │
│ 10003 │ 16850 │ 1002 │ 2 │ 106 │ false │ r │ orders │ 1738405883186 │ 1738405896876 │ 1738405898.176148 │ 0OIBPdMzqjLh0w │
│ 10004 │ 16852 │ 1003 │ 1 │ 107 │ false │ r │ orders │ 1738405883186 │ 1738405896876 │ 1738405899.4865642 │ CcY6FKlHLQ6mPg │
└───────┴────────────┴───────────┴──────────┴────────────┴─────────┴─────────┴─────────┴───────────────┴───────────────┴────────────────────┴────────────────┘
Data in table testc_inventory_products:
┌───────┬────────────────────┬──────────────────────────────────────┬────────┬─────────┬─────────┬──────────┬───────────────┬───────────────┬────────────────────┬────────────────┐
│ id │ name │ description │ weight │ deleted │ op │ table │ source_ts_ms │ ts_ms │ _dlt_load_id │ _dlt_id │
│ int64 │ varchar │ varchar │ double │ varchar │ varchar │ varchar │ int64 │ int64 │ varchar │ varchar │
├───────┼────────────────────┼──────────────────────────────────────┼────────┼─────────┼─────────┼──────────┼───────────────┼───────────────┼────────────────────┼────────────────┤
│ 101 │ scooter │ Small 2-wheel scooter │ 3.14 │ false │ r │ products │ 1738405883186 │ 1738405896879 │ 1738405899.4865642 │ aOf6efrtt48+1Q │
│ 102 │ car battery │ 12V car battery │ 8.1 │ false │ r │ products │ 1738405883186 │ 1738405896880 │ 1738405899.4865642 │ kUuPhtKUAsTUaA │
│ 103 │ 12-pack drill bits │ 12-pack of drill bits with sizes r… │ 0.8 │ false │ r │ products │ 1738405883186 │ 1738405896880 │ 1738405899.4865642 │ evSpPy68nldtbg │
│ 104 │ hammer │ 12oz carpenter's hammer │ 0.75 │ false │ r │ products │ 1738405883186 │ 1738405896880 │ 1738405899.4865642 │ lCpa9yyHSm8xqA │
│ 105 │ hammer │ 14oz carpenter's hammer │ 0.875 │ false │ r │ products │ 1738405883186 │ 1738405896880 │ 1738405899.775917 │ VXcDU/tw/zT2fw │
└───────┴────────────────────┴──────────────────────────────────────┴────────┴─────────┴─────────┴──────────┴───────────────┴───────────────┴────────────────────┴────────────────┘
Data in table testc_inventory_products_on_hand:
┌────────────┬──────────┬─────────┬─────────┬──────────────────┬───────────────┬───────────────┬────────────────────┬────────────────┐
│ product_id │ quantity │ deleted │ op │ table │ source_ts_ms │ ts_ms │ _dlt_load_id │ _dlt_id │
│ int64 │ int64 │ varchar │ varchar │ varchar │ int64 │ int64 │ varchar │ varchar │
├────────────┼──────────┼─────────┼─────────┼──────────────────┼───────────────┼───────────────┼────────────────────┼────────────────┤
│ 101 │ 3 │ false │ r │ products_on_hand │ 1738405883186 │ 1738405896883 │ 1738405900.0213661 │ 90r3+XR7PH7y6g │
│ 102 │ 8 │ false │ r │ products_on_hand │ 1738405883186 │ 1738405896883 │ 1738405900.0213661 │ 5F+LUMVYO3I2wQ │
│ 103 │ 18 │ false │ r │ products_on_hand │ 1738405883186 │ 1738405896883 │ 1738405900.0213661 │ SguX65iX7ffyJg │
│ 104 │ 4 │ false │ r │ products_on_hand │ 1738405883186 │ 1738405896883 │ 1738405900.0213661 │ Vj/N2j0bN3ipzw │
│ 105 │ 5 │ false │ r │ products_on_hand │ 1738405883186 │ 1738405896883 │ 1738405900.0213661 │ z31M4RIQPpq3BA │
└────────────┴──────────┴─────────┴─────────┴──────────────────┴───────────────┴───────────────┴────────────────────┴────────────────┘ 自己测试
您可以通过 dlt_consuming 查看完整示例并开始尝试。
要运行此示例,您需要安装 Docker Desktop 和必需的 Python 库。您可以使用以下命令安装依赖项:
pip install pydbzengine[dev]
python dlt_consuming.py 关键要点
此示例演示了一种强大而简单的方法,可以使用 Debezium 和 DLT 从数据库捕获变更数据并将其加载到数据仓库中。这些工具的组合为 CDC 场景提供了一种强大而简单的解决方案,支持实时数据同步和分析。
Python 和 pydbzengine 的使用使其易于将 Debezium 集成到现有的 Python 工作流中。
`DltChangeHandler` 提供了清晰的关注点分离,负责与 DLT 的集成和数据加载过程。
总结与贡献
基于 Debezium,pydbzengine 使用 Python 使得设置低延迟数据摄取流水线变得非常简单。该项目完全开源,采用 Apache 2.0 许可证。pydbzengine 仍然是一个年轻的项目,还有需要改进的地方。请随时测试它、提供反馈、打开功能请求或发送拉取请求。
祝数据工程愉快……
关于 Debezium
Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。