您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

TimescaleDB 集成

TimescaleDB 是一个开源数据库,旨在使 SQL 能够大规模处理时间序列数据。它基于 PostgreSQL 数据库,并作为其扩展来实现。

Debezium PostgreSQL 连接器可以捕获 TimescaleDB 的数据更改。标准的 PostgreSQL 连接器 从数据库读取原始数据。然后,您可以使用 io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb 转换来处理原始数据、执行逻辑路由以及添加相关元数据。

安装

  1. 按照 TimescaleDB 文档 中的说明安装 TimescaleDB。

  2. 按照 Debezium 安装指南 中的说明安装 Debezium PostgresSQL 连接器。

  3. 配置 TimescaleDB 并部署连接器。

工作原理

Debezium 可以捕获来自以下 TimescaleDB 函数的事件

  • Hypertables (超级表)

  • 持续聚合 (Continuous aggregates)

  • Compression (压缩)

这三个函数在内部是相互依赖的。每个函数都建立在 PostgreSQL 存储数据的表的功能之上。Debezium 对所有三个函数的支持程度不同。

SMT 需要访问 TimescaleDB 元数据。由于 SMT 无法在连接器级别访问数据库配置,因此您必须为转换显式定义配置元数据。

HyperTables

Hypertable 是一个用于存储时间序列数据的逻辑表。数据根据定义的时限列进行分块(分区)。TimescaleDB 在其内部模式下创建一个或多个物理表,每个表代表一个块。默认情况下,连接器会捕获每个块表的更改,并将更改流式传输到与每个块对应的单独主题。Timescaledb 转换会重新组合来自不同主题的数据,然后将重新组合后的数据流式传输到单个主题。

  • 转换可以访问 TimescaleDB 元数据以获取块/Hypertable 映射。

  • 转换会将捕获的事件从其特定于块的主题重新路由到一个逻辑主题,该主题的命名模式如下:<prefix>.<hypertable-schema-name>.<hypertable-name>

  • 转换会将以下标头添加到事件中

    __debezium_timescaledb_chunk_table

    存储事件数据的物理表的名称。

    __debezium_timescaledb_chunk_schema

    物理表所属的模式的名称。

示例:从 Hypertable 流式传输数据

以下示例显示了一个用于在 public 模式下创建 conditions hypertable 的 SQL 命令

CREATE TABLE conditions (time TIMESTAMPTZ NOT NULL, location TEXT NOT NULL, temperature DOUBLE PRECISION NULL, humidity DOUBLE PRECISION NULL);
SELECT create_hypertable('conditions', 'time');

Timescaledb SMT 将捕获的 hypertable 中的更改事件路由到一个名为 timescaledb.public.conditions 的主题。转换会使用您在配置中定义的标头来丰富事件消息。例如:

__debezium_timescaledb_chunk_table: _hyper_1_1_chunk
__debezium_timescaledb_chunk_schema: _timescaledb_internal

持续聚合 (Continuous aggregates)

持续聚合对存储在 hypertables 中的数据提供自动统计计算。聚合视图由其自己的 hypertable 支持,而该 hypertable 又由一组 PostgreSQL 表支持。聚合可以自动或手动重新计算。在重新计算聚合后,新值会存储在 hypertable 中,然后可以从中捕获和流式传输。来自聚合的数据会根据其存储的块流式传输到不同的主题。Timescaledb 转换会重新组合流式传输到不同主题的数据,并将其路由到单个主题。

  • 转换可以访问 TimescaleDB 元数据,以获取块与 hypertable 之间以及 hypertable 与聚合之间的映射。

  • 转换会将捕获的事件从其特定于块的主题重新路由到一个逻辑主题,该主题的命名模式为 <prefix>.<aggregate-schema-name>.<aggregate-name>

  • 转换会将以下标头添加到事件中

    __debezium_timescaledb_hypertable_table

    存储持续聚合的 hypertable 的名称。

    __debezium_timescaledb_hypertable_schema

    hypertable 所属的模式的名称。

    __debezium_timescaledb_chunk_table

    存储持续聚合的物理表的名称。

    __debezium_timescaledb_chunk_schema

    物理表所属的模式的名称。

示例:从持续聚合流式传输数据

以下示例显示了一个用于在 public 模式下创建 conditions_summary 持续聚合的 SQL 命令。

CREATE MATERIALIZED VIEW conditions_summary WITH (timescaledb.continuous) AS
  SELECT
    location,
    time_bucket(INTERVAL '1 hour', time) AS bucket,
    AVG(temperature),
    MAX(temperature),
    MIN(temperature)
  FROM conditions
  GROUP BY location, bucket;

TimescaleDB SMT 将捕获的聚合中的更改事件路由到一个名为 timescaledb.public.conditions_summary 的主题。转换会使用您在配置中定义的标头来丰富事件消息。例如:

_debezium_timescaledb_chunk_table: _hyper_2_2_chunk
__debezium_timescaledb_chunk_schema: _timescaledb_internal
__debezium_timescaledb_hypertable_table: _materialized_hypertable_2
__debezium_timescaledb_hypertable_schema: _timescaledb_internal

压缩 (Compression)

TimescaleDB SMT 不会对压缩函数应用任何特殊处理。压缩块会原封不动地转发到管道中的下一个下游作业进行进一步处理(如果需要)。通常,包含压缩块的消息会被丢弃,并且不会被管道中的后续作业处理。

TimescaleDB 配置

Debezium 使用复制槽来捕获 TimescaleDB 和 PostgreSQL 的更改。复制槽以多种消息格式存储数据。通常,最好将 Debezium 配置为使用 pgoutput 解码器(TimescaleDB 实例的默认解码器)从插槽读取。

要配置复制槽,请在 postgresql.conf 文件中指定以下内容:

# REPLICATION
wal_level = logical             (1)
1 指示服务器使用逻辑解码和预写日志。

要配置要复制的表,您必须创建一个发布,如下面的示例所示:

CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish = 'insert, update')

您可以全局创建发布(如上例所示),也可以为每个表创建单独的发布。由于 TimescaleDB 会根据需要自动创建表,因此强烈建议使用全局发布。

连接器配置

以配置 PostgreSQL 连接器相同的方式配置 TimescaleDB SMT。要使连接器能够正确处理来自 TimescaleDB 的事件,请在连接器配置中添加以下选项:

    "transforms": "timescaledb",
    "transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb",
    "transforms.timescaledb.database.hostname": "timescaledb",
    "transforms.timescaledb.database.port": "...",
    "transforms.timescaledb.database.user": "...",
    "transforms.timescaledb.database.password": "...",
    "transforms.timescaledb.database.dbname": "..."

连接器配置示例

以下示例显示了用于设置 PostgreSQL 连接器的配置,以便连接到具有逻辑名称 dbserver1、端口 5432、位于 192.168.99.100 的 TimescaleDB 服务器。通常,您会在 JSON 文件中配置 Debezium PostgreSQL 连接器,方法是设置连接器可用的配置属性。

您可以选择为数据库中的部分模式和表生成事件。可选地,您可以忽略、屏蔽或截断包含敏感数据、超出指定大小或不需要的列。

{
  "name": "timescaledb-connector",  (1)
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", (2)
    "database.hostname": "192.168.99.100", (3)
    "database.port": "5432", (4)
    "database.user": "postgres", (5)
    "database.password": "postgres", (6)
    "database.dbname" : "postgres", (7)
    "topic.prefix": "dbserver1", (8)
    "plugin.name": "pgoutput", (9)
    "schema.include.list": "_timescaledb_internal", (10)
    "transforms": "timescaledb", (11)
    "transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb", (12)
    "transforms.timescaledb.database.hostname": "timescaledb", (13)
    "transforms.timescaledb.database.port": "5432", (14)
    "transforms.timescaledb.database.user": "postgres", (15)
    "transforms.timescaledb.database.password": "postgres", (16)
    "transforms.timescaledb.database.dbname": "postgres" (17)
  }
}
1 连接器在 Kafka Connect 服务中注册时的名称。
2 此 PostgreSQL 连接器类的名称。
3 TimescaleDB 服务器的地址。
4 TimescaleDB 服务器的端口号。
5 TimescaleDB 用户的名称。
6 TimescaleDB 的密码。
7 要连接的 TimescaleDB 数据库的名称。
8 TimescaleDB 服务器或集群的主题前缀。此前缀构成一个命名空间,并用于连接器写入的所有 Kafka 主题的名称、Kafka Connect 模式名称以及 Avro 转换器使用时的相应 Avro 模式的命名空间。
9 指示使用 pgoutput 逻辑解码插件。
10 包含 TimescaleDB 物理表的模式列表。
11 启用 SMT 以处理 TimescaleDB 原始事件。
12 启用 SMT 以处理 TimescaleDB 原始事件。
13 提供 SMT 的 TimescaleDB 连接信息。值必须与项目 3 - 7 的值匹配。

配置选项

下表列出了您可以为 TimescaleDB 集成 SMT 设置的配置选项。

表 1. TimescaleDB 集成 SMT (TimescaleDB) 配置选项

属性

Default (默认值)

描述

无默认值

TimescaleDB 数据库服务器的 IP 地址或主机名。

5432

TimescaleDB 数据库服务器的整数端口号。

无默认值

用于连接到 TimescaleDB 数据库服务器的 TimescaleDB 数据库用户名。

无默认值

连接到 TimescaleDB 数据库服务器时使用的密码。

无默认值

要从中流式传输更改的 TimescaleDB 数据库的名称。

_timescaledb_internal

包含 TimescaleDB 原始(内部)数据表的模式名称的逗号分隔列表。SMT 仅处理源自列表中某个模式的更改。

timescaledb

TimescaleDB 事件被路由到的主题的命名空间(前缀)。SMT 将消息路由到名为 _<prefix>_._<schema>_._<hypertable|aggregate>_ 的主题。