您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

解码逻辑解码消息内容

DecodeLogicalDecodingMessageContent SMT 将 PostgreSQL 逻辑解码消息的二进制内容转换为结构化格式。

当 Debezium PostgreSQL 连接器捕获逻辑解码消息时,它会向 Kafka 发送 message 事件记录。默认情况下,这些消息记录中的 content 字段包含编码的二进制数据。为了方便其他 Kafka 消费者处理 PostgreSQL 事件消息,您可以使用 DecodeLogicalDecodingMessageContent SMT 来解码原始消息的二进制内容,并将其转换为更易于使用和理解的格式。

您还可以将此 SMT 与其他 SMT 结合使用,例如 Debezium Outbox Event Router

示例

要使 Debezium PostgreSQL 连接器能够解码消息事件中的二进制内容,请将 DecodeLogicalDecodingMessageContent SMT 添加到连接器的 Kafka Connect 配置中,如下面的示例所示。

"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
...
"transforms": "decodeLogicalDecodingMessageContent",
"transforms.decodeLogicalDecodingMessageContent.type": "io.debezium.connector.postgresql.transforms.DecodeLogicalDecodingMessageContent",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
...

以下示例显示了转换应用之前和之后的事件记录的 keyvalue

示例 1. 应用 DecodeLogicalDecodingMessageContent SMT 的效果
处理前
SMT 处理记录之前的事件 key
{
	"prefix": "test-prefix"   (1)
}
SMT 处理记录之前的事件 value
{
	"op": "m",  (2)
	"ts_ms": 1723115240065,
	"source": {
		"version": "3.0.0-SNAPSHOT",
		"connector": "postgresql",
		"name": "connector-name",
		"ts_ms": 1723115239782,
		"snapshot": "false",
		"db": "source-db",
		"sequence": "[\"26997744\",\"26997904\"]",
		"ts_us": 1723115239782690,
		"ts_ns": 1723115239782690000,
		"schema": "",
		"table": "",
		"txId": 756,
		"lsn": 26997904,
		"xmin": null
	},
	"message": {     (3)
		"prefix": "test-prefix",
		"content": "eyJpZCI6IDEsICJpdGVtIjogIkRlYmV6aXVtIGluIEFjdGlvbiIsICJzdGF0dXMiOiAiRU5URVJFRCIsICJxdWFudGl0eSI6IDIsICJ0b3RhbFByaWNlIjogMzkuOTh9"
	}
}
处理后
SMT 处理记录之后的事件 key
null  (1)
SMT 处理记录之后的事件 value
{
	"op": "c",   (2)
	"ts_ms": 1723115415729,
	"source": {
		"version": "3.0.0-SNAPSHOT",
		"connector": "postgresql",
		"name": "connector-name",
		"ts_ms": 1723115415640,
		"snapshot": "false",
		"db": "source-db",
		"sequence": "[\"26717416\",\"26717576\"]",
		"ts_us": 1723115415640161,
		"ts_ns": 1723115415640161000,
		"schema": "",
		"table": "",
		"txId": 745,
		"lsn": 26717576,
		"xmin": null
	},
	"after": {     (3)
		"id": 1,
		"item": "Debezium in Action",
		"status": "ENTERED",
		"quantity": 2,
		"totalPrice": 39.98
	}
}

在前面的示例中,SMT 对原始事件记录应用了以下更改:

  1. 删除了包含原始逻辑解码消息中 prefix 字段的 key ("prefix": "test-prefix")。

  2. op 字段的值从 m (message) 转换为 c (create),从而有效地将事件类型从 message 更改为 INSERT

  3. 用包含解码后的逻辑解码消息内容的 after 字段替换 message 字段。

在 SMT 应用这些更改后,下游消费者或其他 SMT(如 Debezium Outbox Event Router)可以更轻松地处理该记录。

配置选项

下表列出了您可以在 DecodeLogicalDecodingMessageContent SMT 中使用的配置选项。

表 1. DecodeLogicalDecodingMessageContent SMT 配置选项

属性

Type

Default (默认值)

描述

boolean

false

指定解码过程如何处理源消息中值为 null 的字段。默认情况下,转换会删除值为 null 的字段。