您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

新记录状态提取

Debezium 连接器会发出数据更改消息,以表示它们从源数据库捕获的每个操作。连接器发送到 Apache Kafka 的消息具有复杂的结构,能够忠实地表示原始数据库事件的详细信息。

尽管这种复杂的邮件格式准确地详细说明了系统中的更改信息,但该格式可能不适合某些下游使用者。Sink 连接器或 Kafka 生态系统的其他部分可能需要格式化的消息,以便字段名称和值以简化的、展平的结构呈现。

为了简化 Debezium 连接器生成的事件记录的格式,您可以使用 Debezium 事件展平单消息转换 (SMT)。配置转换以支持需要 Kafka 记录格式比连接器生成的默认格式更简单的使用者。根据您的具体用例,您可以将 SMT 应用于 Debezium 连接器,或应用于消耗 Debezium 连接器生成的消息的 Sink 连接器。要使 Apache Kafka 能够保留 Debezium 更改事件消息的原始格式,请为 Sink 连接器配置 SMT。

事件展平转换是一个 Kafka Connect SMT

本章中的信息描述了适用于 Debezium 基于 SQL 的数据库连接器的事件展平单消息转换 (SMT)。有关适用于 Debezium MongoDB 连接器的等效 SMT 的信息,请参阅 MongoDB 新文档状态提取

更改事件结构

Debezium 生成具有复杂结构的数据更改事件。每个事件包含三部分

  • 元数据,包括但不限于

    • 更改数据的操作类型。

    • 源信息,例如更改发生的数据库和表的名称。

    • 标识更改发生时间的时戳。

    • 可选的事务信息。

  • 更改前的行数据

  • 更改后的行数据

以下示例显示了一个 UPDATE 更改事件消息结构的_部分

{
	"op": "u",
	"source": {
		...
	},
	"ts_ms" : "...",
	"ts_us" : "...",
	"ts_ns" : "...",
	"before" : {
		"field1" : "oldvalue1",
		"field2" : "oldvalue2"
	},
	"after" : {
		"field1" : "newvalue1",
		"field2" : "newvalue2"
	}
}

有关连接器的更改事件结构的更多信息,请参阅 连接器文档

在事件展平 SMT 处理完上一个示例中的消息后,它会简化消息格式,生成以下示例中的消息

{
	"field1" : "newvalue1",
	"field2" : "newvalue2"
}

行为

事件展平 SMT 从 Kafka 记录中的 Debezium 更改事件中提取 after 字段。SMT 用其 after 字段替换原始更改事件,以创建简单的 Kafka 记录。

您可以为 Debezium 连接器或消耗 Debezium 连接器发出的消息的 Sink 连接器配置事件展平 SMT。为 Sink 连接器配置事件展平的优点是存储在 Apache Kafka 中的记录包含完整的 Debezium 更改事件。将 SMT 应用于源连接器还是 Sink 连接器的决定取决于您的具体用例。

您可以配置转换以执行以下任一操作

  • 将更改事件中的元数据添加到简化的 Kafka 记录中。默认行为是 SMT 不添加元数据。

  • 在流中保留包含 DELETE 操作更改事件的 Kafka 记录。默认行为是 SMT 会丢弃 DELETE 操作更改事件的 Kafka 记录,因为大多数使用者尚无法处理它们。

数据库 DELETE 操作会导致 Debezium 生成两条 Kafka 记录

  • 包含 "op": "d",before 行数据以及其他一些字段的记录。

  • 具有与已删除行相同键且值为 null 的墓碑记录。此记录是 Apache Kafka 的标记。它指示 日志压缩 可以删除所有具有此键的记录。

您可以配置事件展平 SMT 以执行以下任一操作,而不是丢弃包含 before 行数据的记录

  • 将记录保留在流中,并将其编辑为仅包含 "value": "null" 字段。

  • 将记录保留在流中,并将其编辑为包含 value 字段,该字段包含 before 字段中的键/值对,并添加了一个 "__deleted": "true" 条目。

同样,您可以配置事件展平 SMT 以在流中保留墓碑记录,而不是丢弃它。

配置

通过将 SMT 配置详细信息添加到连接器的配置中,在 Kafka Connect 源或 Sink 连接器中配置 Debezium 事件展平 SMT。例如,要获得转换的默认行为,请将其添加到连接器配置中,而无需指定任何选项,如下面的示例所示

transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState

与任何 Kafka Connect 连接器配置一样,您可以将 transforms= 设置为多个以逗号分隔的 SMT 别名,顺序是 Kafka Connect 应用 SMT 的顺序。

以下 .properties 示例设置了多个事件展平 SMT 选项

transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.delete.tombstone.handling.mode=rewrite
transforms.unwrap.add.fields=table,lsn
delete.tombstone.handling.mode=rewrite

对于 DELETE 操作,删除墓碑,并通过展平更改事件中的 value 字段来编辑 Kafka 记录。value 字段直接包含 before 字段中的键/值对。SMT 添加 __deleted 并将其设置为 true,例如

"value": {
  "pk": 2,
  "cola": null,
  "__deleted": "true"
}
add.fields=table,lsn

tablelsn 字段的更改事件元数据添加到简化的 Kafka 记录中。

自定义配置

连接器可能会发出多种类型的事件消息(心跳消息、墓碑消息或关于事务或模式更改的元数据消息)。要将转换应用于事件的子集,您可以定义 SMT 谓词语句,以选择性地仅将转换应用于特定事件

添加元数据

您可以配置事件展平 SMT,将原始更改事件元数据添加到简化的 Kafka 记录中。例如,您可能希望简化的记录的头或值包含以下任一内容

  • 导致更改的操作类型

  • 被更改的数据库或表的名称

  • 连接器特定的字段,例如 Postgres LSN 字段

有关可用内容的更多信息,请参阅 每个连接器的文档

要将元数据添加到简化的 Kafka 记录的头中,请指定 add.headers 选项。要将元数据添加到简化的 Kafka 记录的值中,请指定 add.fields 选项。这些选项中的每一个都接受一个逗号分隔的字段名称列表,不包含空格。当存在重复的字段名时,要添加其中一个字段的元数据,请同时指定结构和字段。例如

transforms=unwrap,...
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.fields=op,table,lsn,source.ts_ms
transforms.unwrap.add.headers=db
transforms.unwrap.delete.tombstone.handling.mode=rewrite

使用该配置,简化的 Kafka 记录将包含类似以下内容的内容

{
 ...
	"__op" : "c",
	"__table": "MY_TABLE",
	"__lsn": "123456789",
	"__source_ts_ms" : "123456789",
 ...
}

此外,简化的 Kafka 记录将有一个 __db 头。

在简化的 Kafka 记录中,SMT 会在元数据字段名前加上双下划线。当您指定一个结构时,SMT 也会在结构名和字段名之间插入一个下划线。

要将元数据添加到 DELETE 操作的简化 Kafka 记录中,您还必须配置 delete.tombstone.handling.mode=rewrite

选择性应用事件展平转换的选项

除了 Debezium 连接器在数据库发生更改时发出的更改事件消息之外,连接器还会发出其他类型的消息,包括心跳消息以及关于模式更改和事务的元数据消息。由于这些其他消息的结构与 SMT 设计用于处理的更改事件消息的结构不同,因此最好将连接器配置为选择性地应用 SMT,以便它只处理预期的数据更改消息。

有关如何选择性应用 SMT 的更多信息,请参阅 为转换配置 SMT 谓词

配置选项

下表描述了您可以用于配置事件展平 SMT 的选项。

表 1. 事件展平 SMT 配置选项说明
Option Default (默认值) 描述

tombstone

Debezium 为每个 DELETE 操作生成一个更改事件记录。此设置决定了事件展平 SMT 如何处理流中的 DELETE 事件。

设置以下选项之一:

drop

SMT 会从流中移除 DELETE 事件和 TOMBSTONE

tombstone (默认)

SMT 会将 TOMBSTONE 记录保留在流中。TOMBSTONE 记录仅包含以下值:"value": "null"

rewrite

SMT 会将更改事件记录保留在流中并进行以下更改:

  • 向记录添加一个 value 字段,其中包含原始记录的 before 字段中的键/值对。

  • 向记录的 value 添加 __deleted: true

  • 删除 TOMBSTONE 记录。

    此设置提供了另一种指示记录已被删除的方式。

rewrite-with-tombstone

SMT 的行为与选择 rewrite 选项时相同,不同之处在于它还会保留 TOMBSTONE 记录。

delete-to-tombstone

SMT 会从流中移除 TOMBSTONE 记录,并将 DELETE 记录转换为 TOMBSTONE 记录。

要使用行数据确定路由记录的主题,请将此选项设置为 after 字段属性。SMT 将记录路由到名称与指定 after 字段属性的值匹配的主题。对于 DELETE 操作,请将此选项设置为 before 字段属性。

例如,配置 route.by.field=destination 会将记录路由到名称为 after.destination 值的主题。默认行为是 Debezium 连接器将每个更改事件记录发送到由数据库名称和发生更改的表名称组成的名称的主题。

如果您在 Sink 连接器上配置事件展平 SMT,则设置此选项可能很有用,因为目标主题名称决定了将被简化更改事件记录更新的数据库表名称。如果主题名称不符合您的用例,您可以配置 route.by.field 来重新路由事件。

__ (双下划线)

将此可选字符串设置为要添加为字段前缀。

将此选项设置为一个逗号分隔的列表,不带空格,包含要添加到简化的 Kafka 记录值中的元数据字段。当存在重复的字段名时,要添加其中一个字段的元数据,请同时指定结构和字段,例如 source.ts_ms

可选地,您可以通过 <field name>:<new field name> 来覆盖字段名,例如:新的字段名如 version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP。请注意,new field name 是区分大小写的。

当 SMT 将元数据字段添加到简化记录的值时,它会在每个元数据字段名前加上双下划线。对于结构规范,SMT 也会在结构名和字段名之间插入一个下划线。

如果您指定了一个不在更改事件记录中的字段,SMT 仍会将该字段添加到记录的值中。

__ (双下划线)

设置此可选字符串以在标题前添加前缀。

将此选项设置为一个逗号分隔的列表,不带空格,包含要添加到简化的 Kafka 记录头中的元数据字段。当存在重复的字段名时,要添加其中一个字段的元数据,请同时指定结构和字段,例如 source.ts_ms

可选地,您可以通过 <field name>:<new field name> 来覆盖字段名,例如:新的字段名如 version:VERSION, connector:CONNECTOR, source.ts_ms:EVENT_TIMESTAMP。请注意,new field name 是区分大小写的。

当 SMT 将元数据字段添加到简化记录的头中时,它会在每个元数据字段名前加上双下划线。对于结构规范,SMT 也会在结构名和字段名之间插入一个下划线。

如果您指定了一个不在更改事件记录中的字段,SMT 不会将该字段添加到头中。

用于列出源消息中要从输出消息中删除的字段名的 Kafka 消息头名称。

false

指定是否希望 SMT 从事件的键中删除 drop.fields.header.name 中列出的字段。

true

指定是否希望 SMT 从 drop.fields.header.name 配置属性中包含的非可选字段中删除字段。

默认情况下,SMT 只删除标记为 optional 的字段。

true

指定是否希望 SMT 用源定义的默认值替换记录的 null 值。