解码逻辑解码消息内容

DecodeLogicalDecodingMessageContent SMT 将 PostgreSQL 逻辑解码消息的二进制内容转换为结构化格式。

当 Debezium PostgreSQL 连接器捕获逻辑解码消息时,它会将 消息事件记录 发送到 Kafka。默认情况下,这些消息记录中的 content 字段包含编码的二进制数据。为了方便其他 Kafka 消费者处理 PostgreSQL 事件消息,您可以使用 DecodeLogicalDecodingMessageContent SMT 来解码原始消息的二进制内容,并将其转换为更易于使用的格式。

您还可以将此 SMT 与其他 SMT 结合使用,例如 Debezium Outbox Event Router

示例

要使 Debezium PostgreSQL 连接器能够解码消息事件中的二进制内容,请将 DecodeLogicalDecodingMessageContent SMT 添加到连接器的 Kafka Connect 配置中,如下例所示:

"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
...
"transforms": "decodeLogicalDecodingMessageContent",
"transforms.decodeLogicalDecodingMessageContent.type": "io.debezium.connector.postgresql.transforms.DecodeLogicalDecodingMessageContent",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
...

下面的示例显示了在应用转换之前和之后事件记录的 keyvalue

示例 1. 应用 DecodeLogicalDecodingMessageContent SMT 的效果
处理前
SMT 处理记录之前的事件键
{
	"prefix": "test-prefix"   (1)
}
SMT 处理记录之前的事件值
{
	"op": "m",  (2)
	"ts_ms": 1723115240065,
	"source": {
		"version": "3.0.0-SNAPSHOT",
		"connector": "postgresql",
		"name": "connector-name",
		"ts_ms": 1723115239782,
		"snapshot": "false",
		"db": "source-db",
		"sequence": "[\"26997744\",\"26997904\"]",
		"ts_us": 1723115239782690,
		"ts_ns": 1723115239782690000,
		"schema": "",
		"table": "",
		"txId": 756,
		"lsn": 26997904,
		"xmin": null
	},
	"message": {     (3)
		"prefix": "test-prefix",
		"content": "eyJpZCI6IDEsICJpdGVtIjogIkRlYmV6aXVtIGluIEFjdGlvbiIsICJzdGF0dXMiOiAiRU5URVJFRCIsICJxdWFudGl0eSI6IDIsICJ0b3RhbFByaWNlIjogMzkuOTh9"
	}
}
处理后
SMT 处理记录之后的事件键
null  (1)
SMT 处理记录之后的事件值
{
	"op": "c",   (2)
	"ts_ms": 1723115415729,
	"source": {
		"version": "3.0.0-SNAPSHOT",
		"connector": "postgresql",
		"name": "connector-name",
		"ts_ms": 1723115415640,
		"snapshot": "false",
		"db": "source-db",
		"sequence": "[\"26717416\",\"26717576\"]",
		"ts_us": 1723115415640161,
		"ts_ns": 1723115415640161000,
		"schema": "",
		"table": "",
		"txId": 745,
		"lsn": 26717576,
		"xmin": null
	},
	"after": {     (3)
		"id": 1,
		"item": "Debezium in Action",
		"status": "ENTERED",
		"quantity": 2,
		"totalPrice": 39.98
	}
}

在前面的示例中,SMT 对原始事件记录进行了以下更改:

  1. 删除了原始逻辑解码消息中包含 prefix 字段的键("prefix": "test-prefix")。

  2. op 字段的值从 m(消息)转换为 c(创建),从而有效地将事件类型从 message 更改为 INSERT

  3. 用包含逻辑解码消息解码内容的 after 字段替换 message 字段。

在 SMT 应用这些更改后,下游消费者或其他 SMT(如 Debezium Outbox Event Router)可以更轻松地处理该记录。

配置选项

下表列出了您可以在 DecodeLogicalDecodingMessageContent SMT 中使用的配置选项。

表 1. DecodeLogicalDecodingMessageContent SMT 配置选项

属性

Type

Default (默认值)

描述

boolean

false

指定解码过程如何处理源消息中值为 null 的字段。默认情况下,转换会删除值为 null 的字段。