TimescaleDB 集成
TimescaleDB 是一个开源数据库,旨在实现时间序列数据的 SQL 可扩展性。它基于 PostgreSQL 数据库,并作为其扩展实现。
Debezium PostgreSQL 连接器可以捕获 TimescaleDB 的数据更改。标准的 PostgreSQL 连接器会从数据库读取原始数据。然后,您可以使用 io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb 转换来处理原始数据、执行逻辑路由,并添加相关的元数据。
安装
-
按照 TimescaleDB 文档中的说明安装 TimescaleDB。
-
按照 Debezium 安装指南中的说明安装 Debezium PostgresSQL 连接器。
-
配置 TimescaleDB,并部署连接器。
工作原理
Debezium 可以捕获来自以下 TimescaleDB 函数的事件:
-
Hypertables (超级表)
-
连续聚合
-
Compression (压缩)
这三个函数在内部是相互依赖的。每个函数都建立在 PostgreSQL 存储数据到表的功能之上。Debezium 对这三个函数都有不同程度的支持。
SMT 需要访问 TimescaleDB 元数据。由于 SMT 无法在连接器级别访问数据库配置,因此您必须为转换显式定义配置元数据。
HyperTables
HyperTable 是用于存储时间序列数据的逻辑表。数据根据定义的时限列进行分块(分区)。TimescaleDB 在其内部模式中创建一个或多个物理表,每个表代表一个块。默认情况下,连接器会捕获每个块表中的更改,并将更改流式传输到与每个块对应的单独主题中。Timescaledb 转换会重新组合来自各个主题的数据,然后将重新组合的数据流式传输到一个主题中。
-
转换可以访问 TimescaleDB 元数据以获取块/HyperTable 映射。
-
转换会将捕获的事件从其特定于块的主题重新路由到一个逻辑主题,该主题的命名模式如下:
<prefix>.<hypertable-schema-name>.<hypertable-name> -
转换会将以下标头添加到事件中:
__debezium_timescaledb_chunk_table-
存储事件数据的物理表的名称。
__debezium_timescaledb_chunk_schema-
物理表所属模式的名称。
以下示例显示了在 public 模式中创建 conditions HyperTable 的 SQL 命令。
CREATE TABLE conditions (time TIMESTAMPTZ NOT NULL, location TEXT NOT NULL, temperature DOUBLE PRECISION NULL, humidity DOUBLE PRECISION NULL);
SELECT create_hypertable('conditions', 'time');
Timescaledb SMT 将在 HyperTable 中捕获的更改事件路由到一个名为 timescaledb.public.conditions 的主题。转换会使用您在配置中定义的标头丰富事件消息。例如:
__debezium_timescaledb_chunk_table: _hyper_1_1_chunk
__debezium_timescaledb_chunk_schema: _timescaledb_internal
连续聚合
连续聚合提供对存储在 HyperTable 中的数据的自动统计计算。聚合视图由其自身的 HyperTable 支持,而该 HyperTable 又由一组 PostgreSQL 表支持。可以自动或手动重新计算聚合。重新计算聚合后,新值将存储在 HyperTable 中,然后可以从中捕获并流式传输数据。来自聚合的数据将根据其存储的块流式传输到不同的主题。Timescaledb 转换会重新组合流式传输到不同主题的数据,并将其路由到一个主题。
-
转换可以访问 TimescaleDB 元数据以获取块和 HyperTable 之间的映射,以及 HyperTable 和聚合之间的映射。
-
转换会将捕获的事件从其特定于块的主题重新路由到一个逻辑主题,该主题的命名模式为
<prefix>.<aggregate-schema-name>.<aggregate-name>。 -
转换会将以下标头添加到事件中:
__debezium_timescaledb_hypertable_table-
存储连续聚合的 HyperTable 的名称。
__debezium_timescaledb_hypertable_schema-
HyperTable 所属模式的名称。
__debezium_timescaledb_chunk_table-
存储连续聚合的物理表的名称。
__debezium_timescaledb_chunk_schema-
物理表所属模式的名称。
以下示例显示了在 public 模式中创建连续聚合 conditions_summary 的 SQL 命令。
CREATE MATERIALIZED VIEW conditions_summary WITH (timescaledb.continuous) AS
SELECT
location,
time_bucket(INTERVAL '1 hour', time) AS bucket,
AVG(temperature),
MAX(temperature),
MIN(temperature)
FROM conditions
GROUP BY location, bucket;
TimescaleDB SMT 将在聚合中捕获的更改事件路由到一个名为 timescaledb.public.conditions_summary 的主题。转换会使用您在配置中定义的标头丰富事件消息。例如:
_debezium_timescaledb_chunk_table: _hyper_2_2_chunk
__debezium_timescaledb_chunk_schema: _timescaledb_internal
__debezium_timescaledb_hypertable_table: _materialized_hypertable_2
__debezium_timescaledb_hypertable_schema: _timescaledb_internal
TimescaleDB 配置
Debezium 使用复制槽从 TimescaleDB 和 PostgreSQL 捕获更改。复制槽会以多种消息格式存储数据。通常,最好将 Debezium 配置为使用 pgoutput 解码器(TimescaleDB 实例的默认解码器)从复制槽读取。
要配置复制槽,请在 postgresql.conf 文件中指定以下内容:
# REPLICATION
wal_level = logical (1)
| 1 | 指示服务器使用逻辑解码和预写日志。 |
要配置要复制的表,您必须创建一个发布,如下面的示例所示:
CREATE PUBLICATION dbz_publication FOR ALL TABLES WITH (publish = 'insert, update')
您可以全局创建发布(如上例所示),也可以为每个表创建单独的发布。由于 TimescaleDB 会根据需要自动创建表,因此强烈建议使用全局发布。
连接器配置
以配置 PostgreSQL 连接器相同的方式配置 TimescaleDB SMT。要使连接器能够正确处理 TimescaleDB 的事件,请将以下选项添加到连接器配置中:
"transforms": "timescaledb",
"transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb",
"transforms.timescaledb.database.hostname": "timescaledb",
"transforms.timescaledb.database.port": "...",
"transforms.timescaledb.database.user": "...",
"transforms.timescaledb.database.password": "...",
"transforms.timescaledb.database.dbname": "..."
连接器配置示例
以下示例显示了配置 PostgreSQL 连接器以连接到逻辑名称为 dbserver1、端口为 5432、IP 地址为 192.168.99.100 的 TimescaleDB 服务器的配置。通常,您会在 JSON 文件中配置 Debezium PostgreSQL 连接器,方法是设置连接器可用的配置属性。
您可以选择为数据库中的一部分模式和表生成事件。可选地,您可以忽略、屏蔽或截断包含敏感数据、超出指定大小或您不需要的列。
{
"name": "timescaledb-connector", (1)
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector", (2)
"database.hostname": "192.168.99.100", (3)
"database.port": "5432", (4)
"database.user": "postgres", (5)
"database.password": "postgres", (6)
"database.dbname" : "postgres", (7)
"topic.prefix": "dbserver1", (8)
"plugin.name": "pgoutput", (9)
"schema.include.list": "_timescaledb_internal", (10)
"transforms": "timescaledb", (11)
"transforms.timescaledb.type": "io.debezium.connector.postgresql.transforms.timescaledb.TimescaleDb", (12)
"transforms.timescaledb.database.hostname": "timescaledb", (13)
"transforms.timescaledb.database.port": "5432", (14)
"transforms.timescaledb.database.user": "postgres", (15)
"transforms.timescaledb.database.password": "postgres", (16)
"transforms.timescaledb.database.dbname": "postgres" (17)
}
}
| 1 | 连接器在 Kafka Connect 服务中注册时的名称。 |
| 2 | 此 PostgreSQL 连接器类的名称。 |
| 3 | TimescaleDB 服务器的地址。 |
| 4 | TimescaleDB 服务器的端口号。 |
| 5 | TimescaleDB 用户的名称。 |
| 6 | TimescaleDB 的密码。 |
| 7 | 要连接的 TimescaleDB 数据库的名称。 |
| 8 | TimescaleDB 服务器或集群的主题前缀。此前缀构成一个命名空间,并用于连接器写入的所有 Kafka 主题的名称、Kafka Connect 模式名称以及相应 Avro 模式的命名空间(当使用 Avro 转换器时)。 |
| 9 | 指示使用 pgoutput 逻辑解码插件。 |
| 10 | 包含 TimescaleDB 物理表的所有模式的列表。 |
| 11 | 启用 SMT 以处理原始 TimescaleDB 事件。 |
| 12 | 启用 SMT 以处理原始 TimescaleDB 事件。 |
| 13 | 提供 SMT 的 TimescaleDB 连接信息。值必须与第 3-7 项的值匹配。 |
配置选项
下表列出了您可以为 TimescaleDB 集成 SMT 设置的配置选项。
属性 |
Default (默认值) |
描述 |
无默认值 |
TimescaleDB 数据库服务器的 IP 地址或主机名。 |
|
|
TimescaleDB 数据库服务器的整数端口号。 |
|
无默认值 |
用于连接到 TimescaleDB 数据库服务器的 TimescaleDB 数据库用户的名称。 |
|
无默认值 |
连接到 TimescaleDB 数据库服务器时使用的密码。 |
|
无默认值 |
要从中流式传输更改的 TimescaleDB 数据库的名称。 |
|
|
包含 TimescaleDB 原始(内部)数据表的模式名称的逗号分隔列表。SMT 仅处理源自列表中某个模式的更改。 |
|
|
TimescaleDB 事件路由到的主题的命名空间(前缀)。SMT 将消息路由到命名为 |