解码逻辑解码消息内容
DecodeLogicalDecodingMessageContent SMT 将 PostgreSQL 逻辑解码消息的二进制内容转换为结构化格式。
当 Debezium PostgreSQL 连接器捕获逻辑解码消息时,它会将 消息事件记录 发送到 Kafka。默认情况下,这些消息记录中的 content 字段包含编码的二进制数据。为了方便其他 Kafka 消费者处理 PostgreSQL 事件消息,您可以使用 DecodeLogicalDecodingMessageContent SMT 来解码原始消息的二进制内容,并将其转换为更易于使用的格式。
您还可以将此 SMT 与其他 SMT 结合使用,例如 Debezium Outbox Event Router。
示例
要使 Debezium PostgreSQL 连接器能够解码消息事件中的二进制内容,请将 DecodeLogicalDecodingMessageContent SMT 添加到连接器的 Kafka Connect 配置中,如下例所示:
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
...
"transforms": "decodeLogicalDecodingMessageContent",
"transforms.decodeLogicalDecodingMessageContent.type": "io.debezium.connector.postgresql.transforms.DecodeLogicalDecodingMessageContent",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
...
下面的示例显示了在应用转换之前和之后事件记录的 key 和 value。
示例 1. 应用
DecodeLogicalDecodingMessageContent SMT 的效果处理前
- SMT 处理记录之前的事件键
-
{ "prefix": "test-prefix" (1) } - SMT 处理记录之前的事件值
-
{ "op": "m", (2) "ts_ms": 1723115240065, "source": { "version": "3.0.0-SNAPSHOT", "connector": "postgresql", "name": "connector-name", "ts_ms": 1723115239782, "snapshot": "false", "db": "source-db", "sequence": "[\"26997744\",\"26997904\"]", "ts_us": 1723115239782690, "ts_ns": 1723115239782690000, "schema": "", "table": "", "txId": 756, "lsn": 26997904, "xmin": null }, "message": { (3) "prefix": "test-prefix", "content": "eyJpZCI6IDEsICJpdGVtIjogIkRlYmV6aXVtIGluIEFjdGlvbiIsICJzdGF0dXMiOiAiRU5URVJFRCIsICJxdWFudGl0eSI6IDIsICJ0b3RhbFByaWNlIjogMzkuOTh9" } }
处理后
- SMT 处理记录之后的事件键
null (1)
- SMT 处理记录之后的事件值
-
{ "op": "c", (2) "ts_ms": 1723115415729, "source": { "version": "3.0.0-SNAPSHOT", "connector": "postgresql", "name": "connector-name", "ts_ms": 1723115415640, "snapshot": "false", "db": "source-db", "sequence": "[\"26717416\",\"26717576\"]", "ts_us": 1723115415640161, "ts_ns": 1723115415640161000, "schema": "", "table": "", "txId": 745, "lsn": 26717576, "xmin": null }, "after": { (3) "id": 1, "item": "Debezium in Action", "status": "ENTERED", "quantity": 2, "totalPrice": 39.98 } }
在前面的示例中,SMT 对原始事件记录进行了以下更改:
-
删除了原始逻辑解码消息中包含
prefix字段的键("prefix": "test-prefix")。 -
将
op字段的值从m(消息)转换为c(创建),从而有效地将事件类型从message更改为INSERT。 -
用包含逻辑解码消息解码内容的
after字段替换message字段。
在 SMT 应用这些更改后,下游消费者或其他 SMT(如 Debezium Outbox Event Router)可以更轻松地处理该记录。