事件记录更改

此单一消息转换 (SMT) 仅支持 SQL 数据库连接器。

Debezium 数据更改事件具有复杂的结构,提供丰富的信息。但是,在某些情况下,下游使用者在处理 Debezium 更改事件消息之前,需要有关由原始数据库更改引起字段级更改的附加信息。为了通过关于数据库操作如何修改源数据库字段的详细信息来增强事件消息,Debezium 提供了 ExtractChangedRecordState 单一消息转换 (SMT)。

事件更改转换是 Kafka Connect SMT

更改事件结构

Debezium 生成具有复杂结构的数据更改事件。每个事件由以下部分组成:

  • 元数据,包括但不限于以下类型:

    • 更改数据的操作类型。

    • 源信息,例如更改发生的数据库和表的名称。

    • 标识更改发生时间的时戳。

    • 可选的事务信息。

  • 更改前的行数据。

  • 更改后的行数据。

以下示例显示了典型 Debezium UPDATE 更改事件的结构的一部分:

{
	"op": "u",
	"source": {
		...
	},
	"ts_ms" : "...",
	"ts_us" : "...",
	"ts_ns" : "...",
	"before" : {
		"field1" : "oldvalue1",
		"field2" : "oldvalue2"
	},
	"after" : {
		"field1" : "newvalue1",
		"field2" : "newvalue2"
	}
}

有关更改事件结构的更多详细信息,请参阅 每个连接器的文档

前面示例中消息的复杂格式提供了有关源数据库中发生的更改的详细信息。但是,该格式可能不适合某些下游使用者。Sink 连接器或 Kafka 生态系统的其他部分可能期望消息显式标识已更改或未更改的字段的数据库操作。ExtractChangedRecordState SMT 将标头添加到更改事件消息中,以标识由数据库操作修改的字段以及未修改的字段。

行为

event changes SMT 从 Kafka 记录中的 Debezium UPDATE 更改事件中提取 beforeafter 字段。该转换会检查 beforeafter 事件状态结构,以识别由操作修改的字段以及未修改的字段。根据连接器配置,转换会生成修改后的事件消息,添加消息标头以列出已更改的字段、未更改的字段或两者。如果事件代表 INSERTDELETEREAD,则此单一消息转换会将配置的标头添加为空列表,因为没有更改或未更改的字段。

您可以为 Debezium 连接器或使用 Debezium 连接器发出的消息的 Sink 连接器配置 event changes SMT。如果您希望 Apache Kafka 保留原始 Debezium 更改事件的全部内容,请为 Sink 连接器配置 event changes SMT。将 SMT 应用于源连接器还是 Sink 连接器的决定取决于您的特定用例。

根据您的用例,您可以配置转换以执行以下一项或两项任务来修改原始消息:

  • 通过将 UPDATE 事件更改的字段列在用户配置的 header.changed.name 标头中来识别它们。

  • 通过将 UPDATE 事件未更改的字段列在用户配置的 header.unchanged.name 标头中来识别它们。

配置

您可以通过将 SMT 配置详细信息添加到连接器配置中,为 Kafka Connect 源或 Sink 连接器配置 Debezium event changes SMT。要获得默认行为(不添加任何标头),请将转换添加到连接器配置中,如下例所示:

transforms=changes,...
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState

与任何 Kafka Connect 连接器配置一样,您可以将 transforms= 设置为多个以逗号分隔的 SMT 别名,顺序是 Kafka Connect 应用 SMT 的顺序。

以下示例中的连接器配置为 event changes SMT 设置了多个选项:

transforms=changes,...
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState
transforms.changes.header.changed.name=Changed
transforms.changes.header.unchanged.name=Unchanged
header.changed.name

用于存储数据库操作所更改字段的逗号分隔列表的 Kafka 消息标头名称。

header.unchanged.name

用于存储数据库操作后保持不变的字段的逗号分隔列表的 Kafka 消息标头名称。

自定义配置

连接器可能发出多种类型的事件消息(心跳消息、墓碑消息或有关事务或架构更改的元数据消息)。要将转换应用于事件子集,您可以定义 SMT 谓词语句,选择性地将转换应用于特定事件。

选择性应用事件更改转换的选项

除了 Debezium 连接器在数据库发生更改时发出的更改事件消息之外,连接器还会发出其他类型的消息,包括心跳消息以及关于模式更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计用于处理的更改事件消息的结构不同,因此最好将连接器配置为选择性地应用 SMT,以便它只处理预期的数据更改消息。

有关如何选择性应用 SMT 的更多信息,请参阅 为转换配置 SMT 谓词

配置选项

下表描述了可用于配置 event changes SMT 的选项。

表 1. event changes SMT 配置选项说明
Option Default (默认值) 描述

用于存储数据库操作所更改字段的逗号分隔列表的 Kafka 消息标头名称。

用于存储数据库操作后保持不变的字段的逗号分隔列表的 Kafka 消息标头名称。