您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

导出 CloudEvents

CloudEvents 是一项用于以通用方式描述事件数据的规范。其目标是提供跨服务、平台和系统的互操作性。Debezium 使您能够配置 Db2、Informix、MongoDB、MySQL、Oracle、PostgreSQL 或 SQL Server 连接器,以发出符合 CloudEvents 规范的变更事件记录。

对 CloudEvents 的支持尚处于孵化阶段。这意味着确切的语义、配置选项和其他细节可能会在未来的版本中根据反馈进行更改。在使用此功能时,请告诉我们您的具体要求或您遇到的任何问题。

CloudEvents 规范定义了

  • 一组标准化的事件属性

  • 定义自定义属性的规则

  • 将事件格式映射到 JSON 或 Apache Avro 等序列化表示形式的编码规则

  • 用于 Apache Kafka、HTTP 或 AMQP 等传输层的协议绑定

要配置 Debezium 连接器以发出符合 CloudEvents 规范的变更事件记录,Debezium 提供了 io.debezium.converters.CloudEventsConverter,这是一个 Kafka Connect 消息转换器。

目前,只能使用结构化映射模式。CloudEvents 变更事件信封可以是 JSON 或 Avro,并且您可以将 JSON 或 Avro 作为每种信封类型的 data 格式。预计未来的 Debezium 版本将支持二进制映射模式。

有关使用 Avro 的信息,请参阅

事件格式示例

以下示例显示了 PostgreSQL 连接器发出的 CloudEvents 变更事件记录的外观。在此示例中,PostgreSQL 连接器配置为使用 JSON 作为 CloudEvents 格式信封以及 data 格式。

{
  "id" : "name:test_server;lsn:29274832;txId:565",  (1)
  "source" : "/debezium/postgresql/test_server",    (2)
  "specversion" : "1.0",                            (3)
  "type" : "io.debezium.connector.postgresql.DataChangeEvent", (4)
  "time" : "2020-01-13T13:55:39.738Z",             (5)
  "datacontenttype" : "application/json",          (6)
  "iodebeziumop" : "r",                            (7)
  "iodebeziumversion" : "3.3.0.Final",      (8)
  "iodebeziumconnector" : "postgresql",
  "iodebeziumname" : "test_server",
  "iodebeziumtsms" : "1578923739738",
  "iodebeziumsnapshot" : "true",
  "iodebeziumdb" : "postgres",
  "iodebeziumschema" : "s1",
  "iodebeziumtable" : "a",
  "iodebeziumlsn" : "29274832",
  "iodebeziumxmin" : null,
  "iodebeziumtxid": "565",                       (9)
  "iodebeziumtxtotalorder": "1",
  "iodebeziumtxdatacollectionorder": "1",
  "data" : {                                       (10)
    "before" : null,
    "after" : {
      "pk" : 1,
      "name" : "Bob"
    }
  }
}
表 1. CloudEvents 变更事件记录中字段的描述
Item 描述

1

连接器根据变更事件的内容为变更事件生成的唯一 ID。

2

事件的来源,即数据库的逻辑名称,由连接器配置中的 topic.prefix 属性指定。

3

CloudEvents 规范版本。

4

生成变更事件的连接器类型。此字段的格式为 io.debezium.connector.CONNECTOR_TYPE.DataChangeEvent
CONNECTOR_TYPE 的有效值为 db2informixmongodbmysqloraclepostgresqlsqlserver

5

源数据库中发生变更的时间。

6

描述 data 属性的内容类型。可能的值包括 json(如本示例所示)或 avro

7

操作标识符。可能的值包括 r(读取)、c(创建)、u(更新)或 d(删除)。

8

从 Debezium 变更事件中已知的 source 属性都通过使用 iodebezium 前缀作为属性名映射到 CloudEvents 扩展属性。

9

当在连接器中启用时,从 Debezium 变更事件中已知的每个 transaction 属性都通过使用 iodebeziumtx 前缀作为属性名映射到 CloudEvents 扩展属性。

10

实际的数据变更。根据操作和连接器,数据可能包含 beforeafterpatch 字段。

以下示例还显示了 PostgreSQL 连接器发出的 CloudEvents 变更事件记录的外观。在此示例中,PostgreSQL 连接器再次配置为使用 JSON 作为 CloudEvents 格式信封,但这次连接器配置为使用 Avro 作为 data 格式。

{
  "id" : "name:test_server;lsn:33227720;txId:578",
  "source" : "/debezium/postgresql/test_server",
  "specversion" : "1.0",
  "type" : "io.debezium.connector.postgresql.DataChangeEvent",
  "time" : "2020-01-13T14:04:18.597Z",
  "datacontenttype" : "application/avro",          (1)
  "dataschema" : "http://my-registry/schemas/ids/1", (2)
  "iodebeziumop" : "r",
  "iodebeziumversion" : "3.3.0.Final",
  "iodebeziumconnector" : "postgresql",
  "iodebeziumname" : "test_server",
  "iodebeziumtsms" : "1578924258597",
  "iodebeziumsnapshot" : "true",
  "iodebeziumdb" : "postgres",
  "iodebeziumschema" : "s1",
  "iodebeziumtable" : "a",
  "iodebeziumtxId" : "578",
  "iodebeziumlsn" : "33227720",
  "iodebeziumxmin" : null,
  "iodebeziumtxid": "578",
  "iodebeziumtxtotalorder": "1",
  "iodebeziumtxdatacollectionorder": "1",
  "data" : "AAAAAAEAAgICAg=="                    (3)
}
表 2. 使用 Avro 格式化数据的连接器的 CloudEvents 事件记录字段描述
Item 描述

1

指示 data 属性包含 Avro 二进制数据。

2

Avro 数据所遵循的模式的 URI。

3

data 属性包含 base64 编码的 Avro 二进制数据。

也可以将 Avro 用于信封以及 data 属性。

配置示例

在 Debezium 连接器配置中配置 io.debezium.converters.CloudEventsConverter。以下示例展示了如何配置 CloudEvents 转换器以发出具有以下特征的变更事件记录

  • 使用 JSON 作为信封。

  • 使用 http://my-registry/schemas/ids/1 的模式注册表将 data 属性序列化为二进制 Avro 数据。

...
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.serializer.type" : "json",        (1)
"value.converter.data.serializer.type" : "avro",
"value.converter.avro.schema.registry.url": "http://my-registry/schemas/ids/1"
...
表 3. CloudEvents 转换器配置字段描述
Item 描述

1

指定 serializer.type 是可选的,因为 json 是默认值。

CloudEvents 转换器转换 Kafka 记录值。在相同的连接器配置中,如果想操作记录键,可以指定 key.converter。例如,可以指定 StringConverterLongConverterJsonConverterAvroConverter

元数据来源和部分 CloudEvents 字段的配置

默认情况下,metadata.source 属性包含五个部分,如下例所示

"value,id:generate,type:generate,traceparent:header,dataSchemaName:generate"

第一部分指定用于检索记录元数据的来源;允许的值为 valueheader。接下来的部分指定转换器如何填充以下 CloudEvents 字段和数据模式名称的值

  • id

  • type

  • traceparent

  • dataSchemaName(模式在模式注册表中注册的名称)

转换器可以使用以下方法之一来填充每个字段

generate

转换器为字段生成一个值。

header

转换器从消息头获取字段的值。

traceparent CloudEvent 字段的值只能从头信息中检索。

获取记录元数据

要构造 CloudEvent,转换器需要源、操作和事务元数据。通常,转换器可以从记录的值中检索元数据。但在某些情况下,在转换器收到记录之前,记录可能已经过处理,导致其值中不存在元数据,例如,记录经过 Outbox Event Router SMT 处理后。为了保留所需的元数据,可以使用以下方法将元数据传递到记录头中。

过程
  1. 实现一种机制,在记录到达转换器之前将其元数据记录到记录头中,例如,使用 HeaderFrom SMT。

  2. 将转换器的 metadata.source 属性值设置为 header

以下示例显示了一个使用 Outbox Event Router SMT 和 HeaderFrom SMT 的连接器的配置

...
"tombstones.on.delete": false,
"transforms": "addMetadataHeaders,outbox",
"transforms.addMetadataHeaders.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
"transforms.addMetadataHeaders.fields": "source,op,transaction",
"transforms.addMetadataHeaders.headers": "source,op,transaction",
"transforms.addMetadataHeaders.operation": "copy",
"transforms.addMetadataHeaders.predicate": "isHeartbeat",
"transforms.addMetadataHeaders.negate": true,
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.expand.json.payload": true,
"transforms.outbox.table.fields.additional.placement": "type:header",
"predicates": "isHeartbeat",
"predicates.isHeartbeat.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHeartbeat.pattern": "__debezium-heartbeat.*",
"value.converter": "io.debezium.converters.CloudEventsConverter",
"value.converter.metadata.source": "header",
"header.converter": "org.apache.kafka.connect.json.JsonConverter",
"header.converter.schemas.enable": true
...
要使用 HeaderFrom 转换,可能需要过滤掉墓碑消息和心跳消息。

metadata.source 属性的 header 值是全局设置。因此,即使您省略了属性值的部分内容(例如 idtype 来源),转换器也会为省略的部分生成 header 值。

获取 CloudEvent 元数据

默认情况下,CloudEvents 转换器会自动为 CloudEvent 的 idtype 字段生成值,并为其 data 字段生成模式名称。只有当 opentelemetry.tracing.attributes.enable 设置为 true 时,traceparent CloudEvent 字段才会包含在消息中。您可以自定义转换器填充这些字段的方式,方法是更改默认值并指定相应头中的字段值。例如

"value.converter.metadata.source": "value,id:header,type:header,traceparent:header,dataSchemaName:header"

在上述配置生效后,您可以配置上游函数来添加 idtypetraceparentdataSchemaName 头,其中包含您想要传递给 CloudEvents 转换器的值。

如果您只想为 id 头提供值,请使用

"value.converter.metadata.source": "value,id:header,type:generate,traceparent:header,dataSchemaName:generate"

要配置转换器以从头信息中获取 idtypetraceparentdataSchemaName 元数据,请使用以下简短语法

"value.converter.metadata.source": "header"

要使转换器能够从头信息中检索数据模式名称,您必须将 schema.data.name.source.header.enable 设置为 true

配置选项

当您配置 Debezium 连接器以使用 CloudEvent 转换器时,可以指定以下选项

表 4. CloudEvents 转换器配置选项描述

Option

Default (默认值)

描述

json

用于 CloudEvents 信封结构的编码类型。该值可以是 jsonavro

json

用于 data 属性的编码类型。该值可以是 jsonavro

N/A

在使用 JSON 时要传递到底层转换器的任何配置选项。json. 前缀将被删除。

N/A

在使用 Avro 时要传递到底层转换器的任何配置选项。avro. 前缀将被删除。例如,对于 Avro data,您将指定 avro.schema.registry.url 选项。

none

指定应如何调整模式名称以与连接器使用的消息转换器兼容。该值可以是 noneavro

none

指定模式在模式注册表中注册的 CloudEvents 模式名称。当 serializer.typejson 时(此时记录的值是无模式的),此设置将被忽略。如果未指定此属性,则使用默认算法生成模式名称:${serverName}.${databaseName}.CloudEvents.Envelope

false

指定转换器是否可以从头信息中检索 CloudEvents data 字段的模式名称。模式名称从 metadata.source 属性中指定的 dataSchemaName 参数获取。

false

指定转换器在生成 CloudEvent 时是否包含 OpenTelemetry 跟踪属性。该值可以是 truefalse

true

指定转换器在生成 CloudEvent 时是否包含扩展属性。该值可以是 truefalse

value,id:generate,type:generate,traceparent:header,dataSchemaName:generate

一个逗号分隔的列表,指定转换器从中检索元数据值(源、操作、事务)的来源,用于 idtypetraceparent CloudEvents 字段,以及用于 dataSchemaName 参数,该参数指定模式在模式注册表中注册的名称。列表中的第一个元素是指定元数据来源的全局设置。元数据的来源可以是 valueheader。全局设置后面是一组对。每对中的第一个元素指定 CloudEvent 字段(idtypetraceparent)的名称,或数据模式(dataSchemaName)的名称。对中的第二个元素指定转换器如何填充字段的值。有效值为 generateheader。用冒号分隔对中的值,例如

value,id:header,type:generate,traceparent:header,dataSchemaName:header

有关配置示例,请参阅 元数据来源和部分 CloudEvents 字段的配置