TimescaleDB 集成
TimescaleDB 是一个开源数据库,旨在使 SQL 能够大规模处理时间序列数据。它基于 PostgreSQL 数据库,并作为其扩展来实现。
Debezium PostgreSQL 连接器可以捕获 TimescaleDB 的数据更改。标准的 PostgreSQL 连接器 从数据库读取原始数据。然后,您可以使用 io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb 转换来处理原始数据、执行逻辑路由以及添加相关元数据。
安装
-
按照 TimescaleDB 文档 中的说明安装 TimescaleDB。
-
按照 Debezium 安装指南 中的说明安装 Debezium PostgresSQL 连接器。
-
配置 TimescaleDB 并部署连接器。
工作原理
Debezium 可以捕获来自以下 TimescaleDB 函数的事件
-
Hypertables (超级表)
-
持续聚合 (Continuous aggregates)
-
Compression (压缩)
这三个函数在内部是相互依赖的。每个函数都建立在 PostgreSQL 存储数据的表的功能之上。Debezium 对所有三个函数的支持程度不同。
SMT 需要访问 TimescaleDB 元数据。由于 SMT 无法在连接器级别访问数据库配置,因此您必须为转换显式定义配置元数据。
HyperTables
Hypertable 是一个用于存储时间序列数据的逻辑表。数据根据定义的时限列进行分块(分区)。TimescaleDB 在其内部模式下创建一个或多个物理表,每个表代表一个块。默认情况下,连接器会捕获每个块表的更改,并将更改流式传输到与每个块对应的单独主题。Timescaledb 转换会重新组合来自不同主题的数据,然后将重新组合后的数据流式传输到单个主题。
-
转换可以访问 TimescaleDB 元数据以获取块/Hypertable 映射。
-
转换会将捕获的事件从其特定于块的主题重新路由到一个逻辑主题,该主题的命名模式如下:
<prefix>.<hypertable-schema-name>.<hypertable-name> -
转换会将以下标头添加到事件中
__debezium_timescaledb_chunk_table-
存储事件数据的物理表的名称。
__debezium_timescaledb_chunk_schema-
物理表所属的模式的名称。
以下示例显示了一个用于在 public 模式下创建 conditions hypertable 的 SQL 命令
CREATE TABLE conditions (time TIMESTAMPTZ NOT NULL, location TEXT NOT NULL, temperature DOUBLE PRECISION NULL, humidity DOUBLE PRECISION NULL);
SELECT create_hypertable('conditions', 'time');
Timescaledb SMT 将捕获的 hypertable 中的更改事件路由到一个名为 timescaledb.public.conditions 的主题。转换会使用您在配置中定义的标头来丰富事件消息。例如:
__debezium_timescaledb_chunk_table: _hyper_1_1_chunk
__debezium_timescaledb_chunk_schema: _timescaledb_internal
持续聚合 (Continuous aggregates)
持续聚合对存储在 hypertables 中的数据提供自动统计计算。聚合视图由其自己的 hypertable 支持,而该 hypertable 又由一组 PostgreSQL 表支持。聚合可以自动或手动重新计算。在重新计算聚合后,新值会存储在 hypertable 中,然后可以从中捕获和流式传输。来自聚合的数据会根据其存储的块流式传输到不同的主题。Timescaledb 转换会重新组合流式传输到不同主题的数据,并将其路由到单个主题。
-
转换可以访问 TimescaleDB 元数据,以获取块与 hypertable 之间以及 hypertable 与聚合之间的映射。
-
转换会将捕获的事件从其特定于块的主题重新路由到一个逻辑主题,该主题的命名模式为
<prefix>.<aggregate-schema-name>.<aggregate-name>。 -
转换会将以下标头添加到事件中
__debezium_timescaledb_hypertable_table-
存储持续聚合的 hypertable 的名称。
__debezium_timescaledb_hypertable_schema-
hypertable 所属的模式的名称。
__debezium_timescaledb_chunk_table-
存储持续聚合的物理表的名称。
__debezium_timescaledb_chunk_schema-
物理表所属的模式的名称。
以下示例显示了一个用于在 public 模式下创建 conditions_summary 持续聚合的 SQL 命令。
CREATE MATERIALIZED VIEW conditions_summary WITH (timescaledb.continuous) AS
SELECT
location,
time_bucket(INTERVAL '1 hour', time) AS bucket,
AVG(temperature),
MAX(temperature),
MIN(temperature)
FROM conditions
GROUP BY location, bucket;
TimescaleDB SMT 将捕获的聚合中的更改事件路由到一个名为 timescaledb.public.conditions_summary 的主题。转换会使用您在配置中定义的标头来丰富事件消息。例如:
_debezium_timescaledb_chunk_table: _hyper_2_2_chunk
__debezium_timescaledb_chunk_schema: _timescaledb_internal
__debezium_timescaledb_hypertable_table: _materialized_hypertable_2
__debezium_timescaledb_hypertable_schema: _timescaledb_internal
TimescaleDB 配置
Debezium 使用复制槽来捕获 TimescaleDB 和 PostgreSQL 的更改。复制槽以多种消息格式存储数据。通常,最好将 Debezium 配置为使用 pgoutput 解码器(TimescaleDB 实例的默认解码器)从插槽读取。
要配置复制槽,请在 postgresql.conf 文件中指定以下内容:
# REPLICATION
wal_level = logical (1)
| 1 | 指示服务器使用逻辑解码和预写日志。 |
要配置要复制的表,您必须创建一个发布,如下面的示例所示:
CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish = 'insert, update')
您可以全局创建发布(如上例所示),也可以为每个表创建单独的发布。由于 TimescaleDB 会根据需要自动创建表,因此强烈建议使用全局发布。
连接器配置
以配置 PostgreSQL 连接器相同的方式配置 TimescaleDB SMT。要使连接器能够正确处理来自 TimescaleDB 的事件,请在连接器配置中添加以下选项:
"transforms": "timescaledb",
"transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb",
"transforms.timescaledb.database.hostname": "timescaledb",
"transforms.timescaledb.database.port": "...",
"transforms.timescaledb.database.user": "...",
"transforms.timescaledb.database.password": "...",
"transforms.timescaledb.database.dbname": "..."
连接器配置示例
以下示例显示了用于设置 PostgreSQL 连接器的配置,以便连接到具有逻辑名称 dbserver1、端口 5432、位于 192.168.99.100 的 TimescaleDB 服务器。通常,您会在 JSON 文件中配置 Debezium PostgreSQL 连接器,方法是设置连接器可用的配置属性。
您可以选择为数据库中的部分模式和表生成事件。可选地,您可以忽略、屏蔽或截断包含敏感数据、超出指定大小或不需要的列。
{
"name": "timescaledb-connector", (1)
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector", (2)
"database.hostname": "192.168.99.100", (3)
"database.port": "5432", (4)
"database.user": "postgres", (5)
"database.password": "postgres", (6)
"database.dbname" : "postgres", (7)
"topic.prefix": "dbserver1", (8)
"plugin.name": "pgoutput", (9)
"schema.include.list": "_timescaledb_internal", (10)
"transforms": "timescaledb", (11)
"transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb", (12)
"transforms.timescaledb.database.hostname": "timescaledb", (13)
"transforms.timescaledb.database.port": "5432", (14)
"transforms.timescaledb.database.user": "postgres", (15)
"transforms.timescaledb.database.password": "postgres", (16)
"transforms.timescaledb.database.dbname": "postgres" (17)
}
}
| 1 | 连接器在 Kafka Connect 服务中注册时的名称。 |
| 2 | 此 PostgreSQL 连接器类的名称。 |
| 3 | TimescaleDB 服务器的地址。 |
| 4 | TimescaleDB 服务器的端口号。 |
| 5 | TimescaleDB 用户的名称。 |
| 6 | TimescaleDB 的密码。 |
| 7 | 要连接的 TimescaleDB 数据库的名称。 |
| 8 | TimescaleDB 服务器或集群的主题前缀。此前缀构成一个命名空间,并用于连接器写入的所有 Kafka 主题的名称、Kafka Connect 模式名称以及 Avro 转换器使用时的相应 Avro 模式的命名空间。 |
| 9 | 指示使用 pgoutput 逻辑解码插件。 |
| 10 | 包含 TimescaleDB 物理表的模式列表。 |
| 11 | 启用 SMT 以处理 TimescaleDB 原始事件。 |
| 12 | 启用 SMT 以处理 TimescaleDB 原始事件。 |
| 13 | 提供 SMT 的 TimescaleDB 连接信息。值必须与项目 3 - 7 的值匹配。 |
配置选项
下表列出了您可以为 TimescaleDB 集成 SMT 设置的配置选项。
属性 |
Default (默认值) |
描述 |
无默认值 |
TimescaleDB 数据库服务器的 IP 地址或主机名。 |
|
|
TimescaleDB 数据库服务器的整数端口号。 |
|
无默认值 |
用于连接到 TimescaleDB 数据库服务器的 TimescaleDB 数据库用户名。 |
|
无默认值 |
连接到 TimescaleDB 数据库服务器时使用的密码。 |
|
无默认值 |
要从中流式传输更改的 TimescaleDB 数据库的名称。 |
|
|
包含 TimescaleDB 原始(内部)数据表的模式名称的逗号分隔列表。SMT 仅处理源自列表中某个模式的更改。 |
|
|
TimescaleDB 事件被路由到的主题的命名空间(前缀)。SMT 将消息路由到名为 |