导出 CloudEvents
CloudEvents 是一项关于如何以通用方式描述事件数据的规范。其目标是在服务、平台和系统之间提供互操作性。Debezium 使您能够配置 Db2、Informix、MongoDB、MySQL、Oracle、PostgreSQL 或 SQL Server 连接器,以发出符合 CloudEvents 规范的更改事件记录。
|
对 CloudEvents 的支持尚处于孵化阶段。这意味着确切的语义、配置选项和其他细节可能会在未来的修订版中根据反馈进行更改。在使用此功能时,请告知我们您的具体需求或您遇到的任何问题。 |
CloudEvents 规范定义了
-
一组标准化的事件属性
-
定义自定义属性的规则
-
将事件格式映射到 JSON 或 Apache Avro 等序列化表示形式的编码规则
-
适用于 Apache Kafka、HTTP 或 AMQP 等传输层的协议绑定
要配置 Debezium 连接器以发出符合 CloudEvents 规范的更改事件记录,Debezium 提供了 io.debezium.converters.CloudEventsConverter,这是一个 Kafka Connect 消息转换器。
目前,只能使用结构化映射模式。CloudEvents 更改事件信封可以是 JSON 或 Avro,并且您可以为每个信封类型使用 JSON 或 Avro 作为 data 格式。预计未来的 Debezium 版本将支持二进制映射模式。
有关使用 Avro 的信息,请参阅
示例事件格式
以下示例显示了 PostgreSQL 连接器发出的 CloudEvents 更改事件记录的样式。在此示例中,PostgreSQL 连接器已配置为使用 JSON 作为 CloudEvents 格式信封,并将其用作 data 格式。
{
"id" : "name:test_server;lsn:29274832;txId:565", (1)
"source" : "/debezium/postgresql/test_server", (2)
"specversion" : "1.0", (3)
"type" : "io.debezium.connector.postgresql.DataChangeEvent", (4)
"time" : "2020-01-13T13:55:39.738Z", (5)
"datacontenttype" : "application/json", (6)
"iodebeziumop" : "r", (7)
"iodebeziumversion" : "3.3.1.Final", (8)
"iodebeziumconnector" : "postgresql",
"iodebeziumname" : "test_server",
"iodebeziumtsms" : "1578923739738",
"iodebeziumsnapshot" : "true",
"iodebeziumdb" : "postgres",
"iodebeziumschema" : "s1",
"iodebeziumtable" : "a",
"iodebeziumlsn" : "29274832",
"iodebeziumxmin" : null,
"iodebeziumtxid": "565", (9)
"iodebeziumtxtotalorder": "1",
"iodebeziumtxdatacollectionorder": "1",
"data" : { (10)
"before" : null,
"after" : {
"pk" : 1,
"name" : "Bob"
}
}
}
| Item | 描述 |
|---|---|
1 |
连接器根据更改事件的内容为更改事件生成的唯一 ID。 |
2 |
事件的源,即数据库的逻辑名称,如连接器配置中的 |
3 |
CloudEvents 规范版本。 |
4 |
生成更改事件的连接器类型。此字段的格式为 |
5 |
源数据库中更改的时间。 |
6 |
描述 |
7 |
操作标识符。可能的值为 |
8 |
从 Debezium 更改事件中已知的 |
9 |
当在连接器中启用时,从 Debezium 更改事件中已知的每个 |
10 |
实际的数据更改。根据操作和连接器,数据可能包含 |
以下示例还显示了 PostgreSQL 连接器发出的 CloudEvents 更改事件记录的样式。在此示例中,PostgreSQL 连接器再次配置为使用 JSON 作为 CloudEvents 格式信封,但这次连接器配置为使用 Avro 作为 data 格式。
{
"id" : "name:test_server;lsn:33227720;txId:578",
"source" : "/debezium/postgresql/test_server",
"specversion" : "1.0",
"type" : "io.debezium.connector.postgresql.DataChangeEvent",
"time" : "2020-01-13T14:04:18.597Z",
"datacontenttype" : "application/avro", (1)
"dataschema" : "http://my-registry/schemas/ids/1", (2)
"iodebeziumop" : "r",
"iodebeziumversion" : "3.3.1.Final",
"iodebeziumconnector" : "postgresql",
"iodebeziumname" : "test_server",
"iodebeziumtsms" : "1578924258597",
"iodebeziumsnapshot" : "true",
"iodebeziumdb" : "postgres",
"iodebeziumschema" : "s1",
"iodebeziumtable" : "a",
"iodebeziumtxId" : "578",
"iodebeziumlsn" : "33227720",
"iodebeziumxmin" : null,
"iodebeziumtxid": "578",
"iodebeziumtxtotalorder": "1",
"iodebeziumtxdatacollectionorder": "1",
"data" : "AAAAAAEAAgICAg==" (3)
}
| Item | 描述 |
|---|---|
1 |
指示 |
2 |
Avro 数据所遵循的模式的 URI。 |
3 |
|
也可以使用 Avro 作为信封和 data 属性。
示例配置
在 Debezium 连接器配置中配置 io.debezium.converters.CloudEventsConverter。以下示例显示了如何配置 CloudEvents 转换器以发出具有以下特征的更改事件记录:
-
使用 JSON 作为信封。
-
使用
http://my-registry/schemas/ids/1的模式注册表将data属性序列化为二进制 Avro 数据。
...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type" : "json", (1)
"value.converter.data.serializer.type" : "avro",
"value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1"
...
| Item | 描述 |
|---|---|
1 |
指定 |
CloudEvents 转换器会转换 Kafka 记录值。在相同的连接器配置中,您可以指定 key.converter 来操作记录键。例如,您可以指定 StringConverter、LongConverter、JsonConverter 或 AvroConverter。
元数据源和部分 CloudEvents 字段的配置
默认情况下,metadata.source 属性由五个部分组成,如下例所示:
"value,id:generate,type:generate,traceparent:header,dataSchemaName:generate"
第一部分指定用于检索记录元数据的源;允许的值为 value 和 header。接下来的部分指定转换器如何填充以下 CloudEvents 字段和数据模式名称的值:
-
id -
type -
dataSchemaName(模式在模式注册表中注册的名称)
转换器可以使用以下任一方法填充每个字段:
generate-
转换器为字段生成一个值。
header-
转换器从消息头获取字段的值。
traceparent CloudEvents 字段的值只能从头信息中获取。 |
获取记录元数据
要构建 CloudEvent,转换器需要源、操作和事务元数据。通常,转换器可以从记录的值中检索元数据。但在某些情况下,在转换器接收记录之前,记录可能已经被处理,导致其值中缺少元数据,例如,在记录被 Outbox Event Router SMT 处理之后。为了保留所需的元数据,您可以使用以下方法将元数据传递到记录头中。
-
实现一种机制,在记录到达转换器之前将其元数据记录到记录的头中,例如,使用
HeaderFromSMT。 -
将转换器的
metadata.source属性值设置为header。
以下示例显示了使用 Outbox Event Router SMT 和 HeaderFrom SMT 的连接器的配置:
...
"tombstones.on.delete": false,
"transforms": "addMetadataHeaders,outbox",
"transforms.addMetadataHeaders.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.addMetadataHeaders.fields": "source,op,transaction",
"transforms.addMetadataHeaders.headers": "source,op,transaction",
"transforms.addMetadataHeaders.operation": "copy",
"transforms.addMetadataHeaders.predicate": "isHeartbeat",
"transforms.addMetadataHeaders.negate": true,
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.expand.json.payload": true,
"transforms.outbox.table.fields.additional.placement": "type:header",
"predicates": "isHeartbeat",
"predicates.isHeartbeat.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHeartbeat.pattern": "__debezium-heartbeat.*",
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.metadata.source": "header",
"header.converter": "org.apache.kafka.connect.json.JsonConverter",
"header.converter.schemas.enable": true
...
要使用 HeaderFrom 转换,可能需要过滤 tombstone 和 heartbeat 消息。 |
metadata.source 属性的 header 值是一个全局设置。因此,即使您省略了属性值的一部分(例如 id 和 type 源),转换器也会为省略的部分生成 header 值。
获取 CloudEvent 元数据
默认情况下,CloudEvents 转换器会自动为 CloudEvent 的 id 和 type 字段生成值,并为其 data 字段生成模式名称。仅当 opentelemetry.tracing.attributes.enable 设置为 true 时,traceparent CloudEvents 字段才会包含在消息中。您可以通过更改默认值并在适当的头中指定字段值来定制转换器填充这些字段的方式。例如:
"value.converter.metadata.source": "value,id:header,type:header,traceparent:header,dataSchemaName:header"
在上述配置生效的情况下,您可以配置上游函数以添加 id、type、traceparent 和 dataSchemaName 头,并带有您想要传递给 CloudEvents 转换器的值。
如果您只想为 id 头提供值,请使用:
"value.converter.metadata.source": "value,id:header,type:generate,traceparent:header,dataSchemaName:generate"
要配置转换器以从头信息获取 id、type、traceparent 和 dataSchemaName 元数据,请使用以下简短语法:
"value.converter.metadata.source": "header"
要使转换器能够从头信息检索数据模式名称,您必须将 schema.data.name.source.header.enable 设置为 true。
配置选项
当您配置 Debezium 连接器以使用 CloudEvent 转换器时,您可以指定以下选项:
Option |
Default (默认值) |
描述 |
|
用于 CloudEvents 信封结构的编码类型。值可以是 |
|
|
用于 |
|
N/A |
在使用 JSON 时要传递到底层转换器的任何配置选项。 |
|
N/A |
在使用 Avro 时要传递到底层转换器的任何配置选项。 |
|
none |
指定模式名称应如何调整以与连接器使用的消息转换器兼容。值可以是 |
|
none |
指定模式在模式注册表中注册的 CloudEvents 模式名称。当 |
|
|
指定转换器是否可以从头信息中检索 CloudEvents |
|
|
指定转换器在生成云事件时是否包含 OpenTelemetry 跟踪属性。值可以是 |
|
|
指定转换器在生成云事件时是否包含扩展属性。值可以是 |
|
|
一个逗号分隔的列表,指定转换器从中检索元数据值(源、操作、事务)的源,用于
有关配置示例,请参阅 元数据源和部分 CloudEvents 字段的配置。 |