TimescaleDB 集成

TimescaleDB 是一个开源数据库,旨在实现时间序列数据的 SQL 可扩展性。它基于 PostgreSQL 数据库,并作为其扩展实现。

Debezium PostgreSQL 连接器可以捕获 TimescaleDB 的数据更改。标准的 PostgreSQL 连接器会从数据库读取原始数据。然后,您可以使用 io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb 转换来处理原始数据、执行逻辑路由,并添加相关的元数据。

安装

  1. 按照 TimescaleDB 文档中的说明安装 TimescaleDB。

  2. 按照 Debezium 安装指南中的说明安装 Debezium PostgresSQL 连接器。

  3. 配置 TimescaleDB,并部署连接器。

工作原理

Debezium 可以捕获来自以下 TimescaleDB 函数的事件:

  • Hypertables (超级表)

  • 连续聚合

  • Compression (压缩)

这三个函数在内部是相互依赖的。每个函数都建立在 PostgreSQL 存储数据到表的功能之上。Debezium 对这三个函数都有不同程度的支持。

SMT 需要访问 TimescaleDB 元数据。由于 SMT 无法在连接器级别访问数据库配置,因此您必须为转换显式定义配置元数据。

HyperTables

HyperTable 是用于存储时间序列数据的逻辑表。数据根据定义的时限列进行分块(分区)。TimescaleDB 在其内部模式中创建一个或多个物理表,每个表代表一个块。默认情况下,连接器会捕获每个块表中的更改,并将更改流式传输到与每个块对应的单独主题中。Timescaledb 转换会重新组合来自各个主题的数据,然后将重新组合的数据流式传输到一个主题中。

  • 转换可以访问 TimescaleDB 元数据以获取块/HyperTable 映射。

  • 转换会将捕获的事件从其特定于块的主题重新路由到一个逻辑主题,该主题的命名模式如下:<prefix>.<hypertable-schema-name>.<hypertable-name>

  • 转换会将以下标头添加到事件中:

    __debezium_timescaledb_chunk_table

    存储事件数据的物理表的名称。

    __debezium_timescaledb_chunk_schema

    物理表所属模式的名称。

示例:从 HyperTable 流式传输数据

以下示例显示了在 public 模式中创建 conditions HyperTable 的 SQL 命令。

CREATE TABLE conditions (time TIMESTAMPTZ NOT NULL, location TEXT NOT NULL, temperature DOUBLE PRECISION NULL, humidity DOUBLE PRECISION NULL);
SELECT create_hypertable('conditions', 'time');

Timescaledb SMT 将在 HyperTable 中捕获的更改事件路由到一个名为 timescaledb.public.conditions 的主题。转换会使用您在配置中定义的标头丰富事件消息。例如:

__debezium_timescaledb_chunk_table: _hyper_1_1_chunk
__debezium_timescaledb_chunk_schema: _timescaledb_internal

连续聚合

连续聚合提供对存储在 HyperTable 中的数据的自动统计计算。聚合视图由其自身的 HyperTable 支持,而该 HyperTable 又由一组 PostgreSQL 表支持。可以自动或手动重新计算聚合。重新计算聚合后,新值将存储在 HyperTable 中,然后可以从中捕获并流式传输数据。来自聚合的数据将根据其存储的块流式传输到不同的主题。Timescaledb 转换会重新组合流式传输到不同主题的数据,并将其路由到一个主题。

  • 转换可以访问 TimescaleDB 元数据以获取块和 HyperTable 之间的映射,以及 HyperTable 和聚合之间的映射。

  • 转换会将捕获的事件从其特定于块的主题重新路由到一个逻辑主题,该主题的命名模式为 <prefix>.<aggregate-schema-name>.<aggregate-name>

  • 转换会将以下标头添加到事件中:

    __debezium_timescaledb_hypertable_table

    存储连续聚合的 HyperTable 的名称。

    __debezium_timescaledb_hypertable_schema

    HyperTable 所属模式的名称。

    __debezium_timescaledb_chunk_table

    存储连续聚合的物理表的名称。

    __debezium_timescaledb_chunk_schema

    物理表所属模式的名称。

示例:从连续聚合流式传输数据

以下示例显示了在 public 模式中创建连续聚合 conditions_summary 的 SQL 命令。

CREATE MATERIALIZED VIEW conditions_summary WITH (timescaledb.continuous) AS
  SELECT
    location,
    time_bucket(INTERVAL '1 hour', time) AS bucket,
    AVG(temperature),
    MAX(temperature),
    MIN(temperature)
  FROM conditions
  GROUP BY location, bucket;

TimescaleDB SMT 将在聚合中捕获的更改事件路由到一个名为 timescaledb.public.conditions_summary 的主题。转换会使用您在配置中定义的标头丰富事件消息。例如:

_debezium_timescaledb_chunk_table: _hyper_2_2_chunk
__debezium_timescaledb_chunk_schema: _timescaledb_internal
__debezium_timescaledb_hypertable_table: _materialized_hypertable_2
__debezium_timescaledb_hypertable_schema: _timescaledb_internal

压缩

TimescaleDB SMT 不会对压缩函数应用任何特殊处理。压缩的块会未经更改地转发到管道中的下一个下游作业进行进一步处理(如有必要)。通常,带有压缩块的消息将被丢弃,不会被管道中的后续作业处理。

TimescaleDB 配置

Debezium 使用复制槽从 TimescaleDB 和 PostgreSQL 捕获更改。复制槽会以多种消息格式存储数据。通常,最好将 Debezium 配置为使用 pgoutput 解码器(TimescaleDB 实例的默认解码器)从复制槽读取。

要配置复制槽,请在 postgresql.conf 文件中指定以下内容:

# REPLICATION
wal_level = logical             (1)
1 指示服务器使用逻辑解码和预写日志。

要配置要复制的表,您必须创建一个发布,如下面的示例所示:

CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish = 'insert, update')

您可以全局创建发布(如上例所示),也可以为每个表创建单独的发布。由于 TimescaleDB 会根据需要自动创建表,因此强烈建议使用全局发布。

连接器配置

以配置 PostgreSQL 连接器相同的方式配置 TimescaleDB SMT。要使连接器能够正确处理 TimescaleDB 的事件,请将以下选项添加到连接器配置中:

    "transforms": "timescaledb",
    "transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb",
    "transforms.timescaledb.database.hostname": "timescaledb",
    "transforms.timescaledb.database.port": "...",
    "transforms.timescaledb.database.user": "...",
    "transforms.timescaledb.database.password": "...",
    "transforms.timescaledb.database.dbname": "..."

连接器配置示例

以下示例显示了配置 PostgreSQL 连接器以连接到逻辑名称为 dbserver1、端口为 5432、IP 地址为 192.168.99.100 的 TimescaleDB 服务器的配置。通常,您会在 JSON 文件中配置 Debezium PostgreSQL 连接器,方法是设置连接器可用的配置属性。

您可以选择为数据库中的一部分模式和表生成事件。可选地,您可以忽略、屏蔽或截断包含敏感数据、超出指定大小或您不需要的列。

{
  "name": "timescaledb-connector",  (1)
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", (2)
    "database.hostname": "192.168.99.100", (3)
    "database.port": "5432", (4)
    "database.user": "postgres", (5)
    "database.password": "postgres", (6)
    "database.dbname" : "postgres", (7)
    "topic.prefix": "dbserver1", (8)
    "plugin.name": "pgoutput", (9)
    "schema.include.list": "_timescaledb_internal", (10)
    "transforms": "timescaledb", (11)
    "transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb", (12)
    "transforms.timescaledb.database.hostname": "timescaledb", (13)
    "transforms.timescaledb.database.port": "5432", (14)
    "transforms.timescaledb.database.user": "postgres", (15)
    "transforms.timescaledb.database.password": "postgres", (16)
    "transforms.timescaledb.database.dbname": "postgres" (17)
  }
}
1 连接器在 Kafka Connect 服务中注册时的名称。
2 此 PostgreSQL 连接器类的名称。
3 TimescaleDB 服务器的地址。
4 TimescaleDB 服务器的端口号。
5 TimescaleDB 用户的名称。
6 TimescaleDB 的密码。
7 要连接的 TimescaleDB 数据库的名称。
8 TimescaleDB 服务器或集群的主题前缀。此前缀构成一个命名空间,并用于连接器写入的所有 Kafka 主题的名称、Kafka Connect 模式名称以及相应 Avro 模式的命名空间(当使用 Avro 转换器时)。
9 指示使用 pgoutput 逻辑解码插件。
10 包含 TimescaleDB 物理表的​​所有模式的列表。
11 启用 SMT 以处理原始 TimescaleDB 事件。
12 启用 SMT 以处理原始 TimescaleDB 事件。
13 提供 SMT 的 TimescaleDB 连接信息。值必须与第 3-7 项的值匹配。

配置选项

下表列出了您可以为 TimescaleDB 集成 SMT 设置的配置选项。

表 1. TimescaleDB 集成 SMT (TimescaleDB) 配置选项

属性

Default (默认值)

描述

无默认值

TimescaleDB 数据库服务器的 IP 地址或主机名。

5432

TimescaleDB 数据库服务器的整数端口号。

无默认值

用于连接到 TimescaleDB 数据库服务器的 TimescaleDB 数据库用户的名称。

无默认值

连接到 TimescaleDB 数据库服务器时使用的密码。

无默认值

要从中流式传输更改的 TimescaleDB 数据库的名称。

_timescaledb_internal

包含 TimescaleDB 原始(内部)数据表的​​模式名称的逗号分隔列表。SMT 仅处理源自列表中某个模式的更改。

timescaledb

TimescaleDB 事件路由到的主题的命名空间(前缀)。SMT 将消息路由到命名为 _<prefix>_._<schema>_._<hypertable|aggregate>_ 的主题。