您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

事件记录更改

此单条消息转换 (SMT) 仅支持 SQL 数据库连接器。

Debezium 数据更改事件具有复杂的结构,提供了丰富的信息。然而,在某些情况下,下游消费者在处理 Debezium 更改事件消息之前,需要有关因原始数据库更改而产生的字段级更改的额外信息。为了用有关数据库操作如何修改源数据库中字段的详细信息来增强事件消息,Debezium 提供了 ExtractChangedRecordState 单条消息转换 (SMT)。

事件更改转换是 Kafka Connect SMT

更改事件结构

Debezium 生成具有复杂结构的数据更改事件。每个事件包含以下部分

  • 元数据,包括但不限于以下类型

    • 更改数据的操作类型。

    • 源信息,例如更改发生的数据库和表的名称。

    • 标识更改发生时间的时戳。

    • 可选的事务信息。

  • 更改前的行数据。

  • 更改后的行数据。

以下示例显示了一个典型的 Debezium UPDATE 更改事件的部分结构

{
	"op": "u",
	"source": {
		...
	},
	"ts_ms" : "...",
	"ts_us" : "...",
	"ts_ns" : "...",
	"before" : {
		"field1" : "oldvalue1",
		"field2" : "oldvalue2"
	},
	"after" : {
		"field1" : "newvalue1",
		"field2" : "newvalue2"
	}
}

每个连接器的文档 中提供了有关更改事件结构的更多详细信息。

前面示例中消息的复杂格式提供了有关源数据库中发生的更改的详细信息。但是,该格式可能不适用于某些下游消费者。Sink 连接器或 Kafka 生态系统的其他部分可能期望消息明确标识数据库操作更改或未更改的字段。ExtractChangedRecordState SMT 向更改事件消息添加标头,以标识数据库操作修改的字段以及未更改的字段。

行为

事件更改 SMT 从 Kafka 记录中的 Debezium UPDATE 更改事件中提取 beforeafter 字段。该转换会检查 beforeafter 事件状态结构,以识别操作修改的字段以及未更改的字段。根据连接器配置,该转换会生成修改后的事件消息,添加消息标头以列出已更改的字段、未更改的字段或两者。如果事件代表 INSERTDELETEREAD,则此单条消息转换会将配置的标头添加为空列表,因为没有已更改或未更改的字段。

您可以为 Debezium 连接器或消耗 Debezium 连接器发出的消息的 sink 连接器配置事件更改 SMT。如果您希望 Apache Kafka 保留整个原始 Debezium 更改事件,请为 sink 连接器配置事件更改 SMT。将 SMT 应用于源连接器还是 sink 连接器取决于您的具体用例。

根据您的用例,您可以配置转换以通过执行以下一项或两项任务来修改原始消息

  • 通过在用户配置的 header.changed.name 标头中列出它们来标识由 UPDATE 事件更改的字段。

  • 通过在用户配置的 header.unchanged.name 标头中列出它们来标识未被 UPDATE 事件更改的字段。

配置

通过将 SMT 配置详细信息添加到连接器的配置中,您可以为 Kafka Connect 源或 sink 连接器配置 Debezium 事件更改 SMT。要获得默认行为(不添加任何标头),请将转换添加到连接器配置中,如下例所示

transforms=changes,...
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState

与任何 Kafka Connect 连接器配置一样,您可以将 transforms= 设置为多个以逗号分隔的 SMT 别名,顺序是 Kafka Connect 应用 SMT 的顺序。

以下示例中的连接器配置为事件更改 SMT 设置了多个选项

transforms=changes,...
transforms.changes.type=io.debezium.transforms.ExtractChangedRecordState
transforms.changes.header.changed.name=Changed
transforms.changes.header.unchanged.name=Unchanged
header.changed.name

用于存储由数据库操作更改的字段的逗号分隔列表的 Kafka 消息标头名称。

header.unchanged.name

用于存储数据库操作后未更改的字段的逗号分隔列表的 Kafka 消息标头名称。

自定义配置

连接器可能会发出多种类型的事件消息(心跳消息、墓碑消息、或有关事务或模式更改的元数据消息)。要将转换应用于事件子集,您可以定义 SMT 谓词语句,该语句选择性地仅将转换应用于特定事件

选择性应用事件更改转换的选项

除了 Debezium 连接器在数据库发生更改时发出的更改事件消息之外,连接器还会发出其他类型的消息,包括心跳消息以及关于模式更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计用于处理的更改事件消息的结构不同,因此最好将连接器配置为选择性地应用 SMT,以便它只处理预期的数据更改消息。

有关如何选择性应用 SMT 的更多信息,请参阅 为转换配置 SMT 谓词

配置选项

下表描述了用于配置事件更改 SMT 的选项。

表 1. 事件更改 SMT 配置选项说明
Option Default (默认值) 描述

用于存储由数据库操作更改的字段的逗号分隔列表的 Kafka 消息标头名称。

用于存储数据库操作后未更改的字段的逗号分隔列表的 Kafka 消息标头名称。