让我们谈谈 TOAST。吐司?不,TOAST!
那是什么?TOAST (The Oversized-Attribute Storage Technique,超大属性存储技术) 是 Postgres 中的一种机制,它将大型列值存储在多个物理行中,从而绕过了 8 KB 的页面大小限制。
通常,TOAST 存储对用户是透明的,所以您真的不必关心它。但是有一个例外:如果表行已更改,则使用 TOAST 机制存储的任何*未更改*值都不会包含在 Debezium 从数据库收到的消息中,除非它们是表复制身份的一部分。因此,这种未更改的 TOAST 列值将不会包含在发送到 Apache Kafka 的 Debezium 数据变更事件中。在本文中,我们将讨论处理这种情况的不同策略。
当在从数据库接收到的逻辑复制消息中遇到未更改的 TOAST 列值时,Debezium Postgres 连接器将使用可配置的占位符来表示该值。默认情况下,它是字面量 __debezium_unavailable_value,但可以使用 toasted.value.placeholder 连接器属性覆盖该值。
让我们以以下 Postgres 表定义为例
CREATE TABLE customers (
id SERIAL NOT NULL PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE,
biography TEXT
); 在此,biography TEXT 列是可 TOAST 的列,因为其值可能超过页面大小限制。因此,在发出类似 update inventory.customers set first_name = 'Dana' where id = 1004; 的更新时,您可能会在 Apache Kafka 中收到如下所示的数据更改事件(假设该表具有默认的复制标识)。
{
"before": null,
"after": {
"id": 1004,
"first_name": "Dana",
"last_name": "Kretchmar",
"email": "annek@noanswer.org",
"biography": "__debezium_unavailable_value"
},
"source": {
"version": "0.10.0.Final",
"connector": "postgresql",
"name": "dbserver1",
"ts_ms": 1570448151151,
"snapshot": "false",
"db": "sourcedb",
"schema": "inventory",
"table": "customers",
"txId": 627,
"lsn": 34650016,
"xmin": null
},
"op": "u",
"ts_ms": 1570448151611
} 请注意,biography 字段(其值未随 UPDATE 更改)如何具有特殊的 __debezium_unavailable_value 标记值。现在,如果更改事件使用者接收到该占位符值,就会出现如何应对的问题。
一种方法,也是 **从消费者的角度来看最简单的方法**,就是从一开始就避免这种情况。这可以通过为相关 Postgres 表使用 FULL 的“复制标识”来实现。或者,复制标识可以基于包含可 TOAST 列的索引。
排除未更改的值
如果更改源表的复制标识不是一个选项,那么对于更新接收端数据存储(例如数据库、缓存或搜索索引)的消费者来说,一种方法是忽略更改事件中具有占位符值的任何字段。
这意味着,任何带有占位符值的列都必须从在接收端数据存储上执行的更新语句中省略。例如,就 SQL 数据库而言,必须构建并执行一个不包含带有占位符值的列的特定 UPDATE 语句。Hibernate ORM 的用户可能会想起“动态更新”功能,该功能工作方式类似。不过,一些数据存储和连接器可能只支持完全更新,在这种情况下,此策略不可行。
触发器
“忽略”方法的一个有趣变体是使用接收端数据库中的触发器:注册的列可以接收标记值,它们可以“否决”此类更改,而是保留先前存储的值。以下显示了一个 Postgres 中此类触发器的示例
CREATE OR REPLACE FUNCTION ignore_unchanged_biography()
RETURNS TRIGGER AS
$BODY$
BEGIN
IF NEW."biography" = '__debezium_unavailable_value'
THEN
NEW."biography" = OLD."biography";
END IF;
RETURN NEW;
END;
$BODY$ LANGUAGE PLPGSQL;
CREATE TRIGGER customer_biography_trigger
BEFORE UPDATE OF "biography"
ON customers
FOR EACH ROW
EXECUTE PROCEDURE ignore_unchanged_biography(); 如果 biography 列的值被设置为 __debezium_unavailable_value 标记值,这将保留该列的旧值。
有状态流处理
处理未更改的 TOAST 列值的另一种方法是有状态流处理应用程序。
该应用程序可以持久化 TOAST 列的最新值(从快照、插入事件或包含可 TOAST 列的更新中获取),并将其值放回带有标记值的更改事件中。
Debezium 确保一个特定记录的所有更改事件始终进入同一个分区,因此它们将以与创建时完全相同的顺序进行处理。这确保了在接收带有标记值的更改事件时,状态存储中提供了最新值。
使用 Kafka Streams 及其状态存储 API 可以非常方便地构建此类服务。基于 Quarkus 及其用于构建 Kafka Streams 应用程序 的扩展,可以在 JVM 上或通过 GraalVM 本地运行,解决方案可能如下所示
@ApplicationScoped
public class TopologyProducer {
private static final Logger LOG = LoggerFactory.getLogger(TopologyProducer.class);
static final String BIOGRAPHY_STORE = "biography-store";
@ConfigProperty(name = "pgtoast.customers.topic")
String customersTopic;
@ConfigProperty(name = "pgtoast.customers.enriched.topic")
String customersEnrichedTopic;
@Produces
public Topology buildTopology() {
StreamsBuilder builder = new StreamsBuilder();
StoreBuilder<KeyValueStore<JsonObject, String>> biographyStore = (1)
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(BIOGRAPHY_STORE),
new JsonObjectSerde(),
new Serdes.StringSerde()
);
builder.addStateStore(biographyStore);
builder.<JsonObject, JsonObject>stream(customersTopic) (2)
.transformValues(ToastColumnValueProvider::new, BIOGRAPHY_STORE)
.to(customersEnrichedTopic);
return builder.build();
}
class ToastColumnValueProvider implements
ValueTransformerWithKey<JsonObject, JsonObject, JsonObject> {
private KeyValueStore<JsonObject, String> biographyStore;
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
biographyStore = (KeyValueStore<JsonObject, String>) context.getStateStore(
TopologyProducer.BIOGRAPHY_STORE);
}
@Override
public JsonObject transform(JsonObject key, JsonObject value) {
JsonObject payload = value.getJsonObject("payload");
JsonObject newRowState = payload.getJsonObject("after");
String biography = newRowState.getString("biography");
if (isUnavailableValueMarker(biography)) { (3)
String currentValue = biographyStore.get(key); (4)
if (currentValue == null) {
LOG.warn("No biography value found for key '{}'", key);
}
else {
value = Json.createObjectBuilder(value) (5)
.add(
"payload",
Json.createObjectBuilder(payload)
.add(
"after",
Json.createObjectBuilder(newRowState).add(
"biography",
currentValue
)
)
)
.build();
}
}
else { (6)
biographyStore.put(key, biography);
}
return value;
}
private boolean isUnavailableValueMarker(String value) {
return "__debezium_unavailable_value".contentEquals(value);
}
@Override
public void close() {
}
}
} | 1 | 为存储每个客户 ID 的最新 biography 值设置状态存储 |
| 2 | 实际的流处理管道:对于 customers 主题上的每条消息,应用替换 TOAST 列标记值的逻辑,并将转换后的消息写入输出主题 |
| 3 | 检查传入消息中的 biography 值是否为标记值 |
| 4 | 如果是,则从状态存储中获取该客户当前的 biography 值 |
| 5 | 用从状态存储中获取的实际值替换标记值 |
| 6 | 如果传入消息具有实际的 biography 值,则将其放入状态存储 |
现在,如果某个使用者订阅了“已丰富”的主题,它将看到任何客户更改事件,其中包含从状态存储中物化的任何未更改 TOAST 列的实际值。Debezium 连接器最初发出特殊标记值的事实,在这一点上是完全透明的。
| 主键更改 当记录的主键被更新时,Debezium 将创建两个更改事件:一个使用旧键的“删除”事件和一个使用新键的“插入”事件。在处理第二个事件时,流处理应用程序将无法查找之前存储的 解决此问题的一种方法是将原始键值公开为插入事件的消息头。此需求已在 DBZ-1531 中跟踪;如果您想贡献并实现此功能,请告知我们。 |
何时使用哪种方法?
我们已经讨论了处理 Debezium 数据更改事件中未更改 TOAST 列值的各种选项。那么,在每种情况下应该使用哪种选项?
将复制标识更改为 FULL 是迄今为止最简单的方法:对源表进行一次配置即可从一开始就避免该问题。但它并不是最高效的解决方案,有些 DBA 可能不愿应用此设置。
当使用更改事件更新某种接收端数据存储时,在发出更新时简单地省略带有特殊标记值的字段,乍一看可能很有吸引力。但这种技术有一些缺点:并非所有数据存储和相应的连接器都支持部分更新。相反,在接收端数据存储中,基于传入数据,可能只有完全更新记录的选项。即使存在该选项,也可能不是最优的。例如,对于 SQL 数据库,可能会执行仅包含可用值的语句。但这与高效使用预编译语句和批量处理相冲突:由于同一表上的两次更新之间数据的“形状”可能会发生变化,因此无法重用相同的预编译语句,性能可能会受到影响。
基于触发器的实现方式不易出现这些问题:对表的任何更新都将具有相同数量的列,因此使用者(例如接收端连接器)可以重用相同的预编译语句,并将多个记录批量执行一次。需要注意的一点是与此方法相关的组织成本:必须为每个受影响的列安装触发器,并在表结构发生变化时保持同步。这必须在每个接收端数据存储中单独完成,而且并非所有存储都支持触发器。但是,如果可能,触发器可以是一个很好的解决方案。
最后,流处理使 TOAST 可用列以及它们在更新事件中缺失的值对消费者而言完全透明。丰富逻辑在一个地方实现,更改事件流的所有使用者从中受益,而无需在它们各自实现单独的解决方案。此外,如果消费者本身是无状态的,并且没有办法物化此类列的最后一个值,例如通过 WebSockets 或 GraphQL 订阅将更改事件流式传输到浏览器,那么这是唯一可行的解决方案。需要付出的代价是维护和运行单独服务的开销。
顺便说一句,此类流处理应用程序也可以作为 Debezium 平台的一部分,提供一个可配置的、即用型组件。这不仅对 Postgres 有用,而且在考虑其他 Debezium 连接器时也很有用。例如,在 Cassandra 的情况下,更改事件将只包含更新的字段;可以通过支持 MySQL 的“非完整”binlog 模式来设想类似的模式。在这两种情况下,都可以使用有状态流处理服务,基于从本地状态存储检索到的先前行状态和传入的“补丁”式更改事件来填充完整的数据更改事件。如果您认为这对 Debezium 是一个有用的补充,请告诉我们。
一如既往,没有万能的解决方案:您应该根据您的具体情况和要求选择一个解决方案。作为起点,您可以在 Debezium 的 示例存储库 中找到触发器和 Kafka Streams 方法的基本实现。
您会更喜欢哪种方法?或者您是否有更进一步的替代方案?请在下面的评论中告诉我们。
非常感谢 Dave Cramer 和 Jiri Pechanec 在处理这篇文章和配套示例代码时提供的反馈!
关于 Debezium
Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。