Debezium 事件反序列化
|
此功能目前处于孵化状态,即确切的语义、配置选项等可能会根据我们收到的反馈在未来的版本中发生变化。在使用这些 SerDes 时,如果您遇到任何问题,请告知我们。 |
Debezium 以复杂的结构化消息的形式生成数据变更事件。此消息随后会被配置的 Kafka Connect 转换器序列化,并且由消费者负责将其反序列化为逻辑消息。为此,Kafka 使用所谓的 SerDes。
Debezium 提供了 SerDes (io.debezium.serde.DebeziumSerdes) 来简化消费者(无论是 Kafka Streams 管道还是普通 Kafka 消费者)的反序列化。
JSON SerDe
JSON SerDe 会反序列化 JSON 编码的变更事件,并将其转换为 Java 类。这在内部通过 Jackson Databind 实现。
消费者使用以下方式创建一个 serde 实例
final Serde<MyType> serde = DebeziumSerdes.payloadJson(MyType.class);
消费者随后将接收到的逻辑 Java 类型 MyType 的字段将从 JSON 消息中初始化。这同样适用于键和值。也可以使用普通的 Java 类型,例如 Integer,当键只包含一个 INT 字段时。
当 Kafka Connect 使用 JSON 转换器时,它通常提供两种操作模式——带模式或不带模式。如果使用了模式,则消息如下所示:
{
"schema": {...},
"payload": {
"op": "u",
"source": {
...
},
"ts_ms" : "...",
"ts_us" : "...",
"ts_ns" : "...",
"before" : {
"field1" : "oldvalue1",
"field2" : "oldvalue2"
},
"after" : {
"field1" : "newvalue1",
"field2" : "newvalue2"
}
}
}
而在不带模式的情况下,结构则更像这样:
{
"op": "u",
"source": {
...
},
"ts_ms" : "...",
"ts_us" : "...",
"ts_ns" : "...",
"before" : {
"field1" : "oldvalue1",
"field2" : "oldvalue2"
},
"after" : {
"field1" : "newvalue1",
"field2" : "newvalue2"
}
}
反序列化器的行为由 from.field 配置选项驱动,并遵循以下规则:
-
如果消息包含模式,则仅使用
payload -
如果正在反序列化键,则将键字段映射到目标类
-
如果正在反序列化值并且该值包含 Debezium 事件信封,则:
-
如果未设置
from.field,则将整个信封反序列化为目标类型 -
否则,仅反序列化并映射到配置字段的内容到目标类型,从而有效地扁平化消息
-
-
如果正在反序列化值并且该值已包含一个扁平化的消息(例如,在使用 SMT 进行 事件扁平化 时),则将扁平化的记录映射到目标逻辑类型。