Debezium 事件反序列化
|
此功能目前处于孵化阶段,即确切的语义、配置选项等可能会在未来的修订版中根据我们收到的反馈进行更改。在使用这些 SerDes 时,如果您遇到任何问题,请告诉我们。 |
Debezium 生成的数据更改事件具有复杂的消息结构。此消息随后由配置的 Kafka Connect 转换器序列化,消费者有责任将其反序列化为逻辑消息。为此,Kafka 使用所谓的 SerDes。
Debezium 提供了 SerDes (io.debezium.serde.DebeziumSerdes) 来简化消费者(无论是 Kafka Streams 流管道还是普通 Kafka 消费者)的反序列化。
JSON SerDe
JSON SerDe 会反序列化 JSON 编码的更改事件,并将其转换为 Java 类。内部通过 Jackson Databind 实现。
消费者使用以下方式创建一个 serde 实例
final Serde<MyType> serde = DebeziumSerdes.payloadJson(MyType.class);
然后,消费者将收到逻辑 Java 类型 MyType,其字段将从 JSON 消息中初始化。这既适用于键也适用于值。也可以使用纯 Java 类型,例如 Integer,当键只包含一个 INT 字段时。
当 Kafka Connect 使用 JSON 转换器时,它通常提供两种操作模式——带模式或不带模式。如果使用模式,则消息如下所示
{
"schema": {...},
"payload": {
"op": "u",
"source": {
...
},
"ts_ms" : "...",
"ts_us" : "...",
"ts_ns" : "...",
"before" : {
"field1" : "oldvalue1",
"field2" : "oldvalue2"
},
"after" : {
"field1" : "newvalue1",
"field2" : "newvalue2"
}
}
}
而在不带模式的情况下,结构看起来更像这样
{
"op": "u",
"source": {
...
},
"ts_ms" : "...",
"ts_us" : "...",
"ts_ns" : "...",
"before" : {
"field1" : "oldvalue1",
"field2" : "oldvalue2"
},
"after" : {
"field1" : "newvalue1",
"field2" : "newvalue2"
}
}
反序列化器的行为由 from.field 配置选项驱动,并遵循以下规则
-
如果消息包含模式,则仅使用
payload -
如果键被反序列化,则将键字段映射到目标类
-
如果值被反序列化并且包含 Debezium 事件信封,那么
-
如果未设置
from.field,则将整个信封反序列化到目标类型 -
否则,将配置的字段内容反序列化并映射到目标类型,从而有效地展平消息
-
-
如果值被反序列化并且已经包含一个展平的消息(即,在使用 SMT 进行 事件展平 时),则将展平的记录映射到目标逻辑类型