Debezium 通知

概述

Debezium 通知提供了一种获取连接器状态信息的方法。通知可以发送到以下通道:

SinkNotificationChannel

通过 Connect API 将通知发送到配置的主题。

LogNotificationChannel

通知将被追加到日志中。

JmxNotificationChannel

通知将作为 JMX bean 的属性公开。

Custom

通知将发送到您实现的自定义通道

Debezium 通知格式

通知消息包含以下信息:

属性 描述

id

分配给通知的唯一标识符。对于增量快照通知,id 与发送的 execute-snapshot 信号相同。

由于代码限制,当多个增量快照正在进行时,id 与原始信号不匹配。在这种情况下,最后一个发送信号的 id 将用于所有通知。

aggregate_type

与通知相关的聚合根的数据类型。在领域驱动设计中,导出的事件应始终引用一个聚合。

type

提供 aggregate_type 字段中指定的事件的状态信息。

additional_data

一个 Map<String,String>,包含有关通知的详细信息。有关示例,请参阅Debezium 关于增量快照进度的通知

timestamp

创建通知的时间。该值表示自 UNIX epoch 以来的毫秒数。

可用通知

Debezium 通知传递有关初始快照增量快照进度的信息。有关特定连接器使用通知的信息,请参阅该连接器的文档。

Debezium 关于初始快照状态的通知

以下示例显示了一个典型的通知,该通知提供了初始快照的状态。

{
    "id": "5563ae14-49f8-4579-9641-c1bbc2d76f99",
    "aggregate_type": "Initial Snapshot",
    "type": "COMPLETED", (1)
    "additional_data" : {
        "connector_name": "myConnector"
    },
    "timestamp": "1695817046353"
}
Item 描述

1

type 字段可以包含以下值之一:

  • COMPLETED

  • ABORTED

  • SKIPPED

下表显示了在报告初始快照状态的通知中可能出现的不同有效负载示例:

Status Payload

STARTED

  {
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Initial Snapshot",
      "type":"STARTED",
      "additional_data":{
         "connector_name":"my-connector"
      },
      "timestamp": "1695817046353"
}

IN_PROGRESS

{
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Initial Snapshot",
   "type":"IN_PROGRESS",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collections":"table1, table2",
      "current_collection_in_progress":"table1"
   },
   "timestamp": "1695817046353"
}

data_collection 字段目前不适用于 MongoDB 连接器

TABLE_SCAN_COMPLETED

{
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Initial Snapshot",
   "type":"TABLE_SCAN_COMPLETED",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collection":"table1, table2",
      "scanned_collection":"table1",
      "total_rows_scanned":"100",
      "status":"SUCCEEDED"
   },
   "timestamp": "1695817046353"
}

在前面的示例中,additional_data.status 字段可以包含以下值之一:

SQL_EXCEPTION

在执行快照时发生了 SQL 异常。

SUCCEEDED

快照成功完成。

total_rows_scanneddata_collection 字段目前不适用于 MongoDB 连接器

COMPLETED

  {
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Initial Snapshot",
      "type":"COMPLETED",
      "additional_data":{
         "connector_name":"my-connector"
      },
      "timestamp": "1695817046353"
}

ABORTED

  {
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Initial Snapshot",
      "type":"ABORTED",
      "additional_data":{
         "connector_name":"my-connector"
      },
      "timestamp": "1695817046353"
}

SKIPPED

  {
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Initial Snapshot",
      "type":"SKIPPED",
      "additional_data":{
         "connector_name":"my-connector"
      },
      "timestamp": "1695817046353"
}

Debezium 关于增量快照进度的通知

下表显示了在报告增量快照状态的通知中可能出现的不同有效负载示例:

Status Payload

Start

  {
      "id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
      "aggregate_type":"Incremental Snapshot",
      "type":"STARTED",
      "additional_data":{
         "connector_name":"my-connector",
         "data_collections":"table1, table2"
      },
      "timestamp": "1695817046353"
}

Paused

{
      "id":"068d07a5-d16b-4c4a-b95f-8ad061a69d51",
      "aggregate_type":"Incremental Snapshot",
      "type":"PAUSED",
      "additional_data":{
         "connector_name":"my-connector",
         "data_collections":"table1, table2"
      },
      "timestamp": "1695817046353"
}

Resumed

 {
   "id":"a9468204-769d-430f-96d2-b0933d4839f3",
   "aggregate_type":"Incremental Snapshot",
   "type":"RESUMED",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collections":"table1, table2"
   },
   "timestamp": "1695817046353"
}

Stopped

{
   "id":"83fb3d6c-190b-4e40-96eb-f8f427bf482c",
   "aggregate_type":"Incremental Snapshot",
   "type":"ABORTED",
   "additional_data":{
      "connector_name":"my-connector"
   },
   "timestamp": "1695817046353"
}

Processing chunk

{
   "id":"d02047d6-377f-4a21-a4e9-cb6e817cf744",
   "aggregate_type":"Incremental Snapshot",
   "type":"IN_PROGRESS",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collections":"table1, table2",
      "current_collection_in_progress":"table1",
      "maximum_key":"100",
      "last_processed_key":"50"
   },
   "timestamp": "1695817046353"
}

Snapshot completed for a table

{
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Incremental Snapshot",
   "type":"TABLE_SCAN_COMPLETED",
   "additional_data":{
      "connector_name":"my-connector",
      "data_collection":"table1, table2",
      "scanned_collection":"table1",
      "total_rows_scanned":"100",
      "status":"SUCCEEDED"
   },
   "timestamp": "1695817046353"
}

在前面的示例中,additional_data.status 字段可以包含以下值之一:

EMPTY

表不包含任何值。

NO_PRIMARY_KEY

无法完成快照;表没有主键。

SKIPPED

Cannot complete a snapshots for this type of table. Refer to the logs for details.

SQL_EXCEPTION

在执行快照时发生了 SQL 异常。

SUCCEEDED

快照成功完成。

UNKNOWN_SCHEMA

找不到表的模式。请检查日志以获取已知表的列表。

Completed

{
   "id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
   "aggregate_type":"Incremental Snapshot",
   "type":"COMPLETED",
   "additional_data":{
      "connector_name":"my-connector"
   },
   "timestamp": "1695817046353"
}

启用 Debezium 通知

要启用 Debezium 发出通知,请通过设置 notification.enabled.channels 配置属性来指定通知通道列表。默认情况下,以下通知通道可用:

  • sink

  • log

  • jmx

要使用 sink 通知通道,您还必须将 notification.sink.topic.name 配置属性设置为您希望 Debezium 发送通知的主题名称。

访问 Debezium JMX 通知

要启用 Debezium 通过 JMX bean 公开事件,请完成以下配置步骤:

  1. 启用 JMX MBean 服务器以公开通知 bean。

  2. jmx 添加到连接器配置中的 notification.enabled.channels 属性。

  3. 将您首选的 JMX 客户端连接到 MBean 服务器。

通知通过名称为 debezium.<connector-type>.management.notifications.<server> 的 bean 的 Notifications 属性公开。

下图显示了一个报告增量快照开始的通知。

Fields in the JMX `Notifications` attribute

要丢弃通知,请在 bean 上调用 reset 操作。

通知也作为类型为 debezium.notification 的 JMX 通知公开。要使应用程序能够监听 MBean 发出的 JMX 通知,请订阅该应用程序到通知

自定义通知通道

通知机制设计为可扩展的。您可以根据需要实现通道,以您在环境中效果最好的方式传递通知。添加通知通道涉及几个步骤:

配置自定义通知通道

自定义通知通道是实现 io.debezium.pipeline.notification.channels.NotificationChannel 服务提供商接口 (SPI) 的 Java 类。例如:

public interface NotificationChannel {

    String name(); (1)

    void init(CommonConnectorConfig config); (2)

    void send(Notification notification); (3)

    void close(); (4)
}
Item 描述

1

通道的名称。要启用 Debezium 使用该通道,请在连接器的 notification.enabled.channels 属性中指定此名称。

2

初始化通道所需的特定配置、变量或连接。

3

在通道上结束通知。Debezium 调用此方法来报告其状态。

4

关闭所有分配的资源。Debezium 在连接器停止时调用此方法。

Debezium 核心模块依赖

自定义通知通道 Java 项目具有对 Debezium 核心模块的编译依赖。您必须在项目的 pom.xml 文件中包含这些编译依赖项,如下例所示:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-core</artifactId>
    <version>${version.debezium}</version> (1)
</dependency>
Item 描述

1

${version.debezium} 表示 Debezium 连接器的版本。

META-INF/services/io.debezium.pipeline.notification.channels.NotificationChannel 文件中声明您的实现。

部署 Debezium 自定义通知通道

先决条件
  • 您有一个自定义通知通道 Java 程序。

过程
  • 要将通知通道与 Debezium 连接器一起使用,请将 Java 项目导出为 JAR 文件,并将该文件复制到包含要使用的每个 Debezium 连接器的 JAR 文件的目录中。

    例如,在典型的部署中,Debezium 连接器文件存储在 Kafka Connect 目录 (/kafka/connect) 的子目录中,每个连接器 JAR 都在自己的子目录中 (/kafka/connect/debezium-connector-db2, /kafka/connect/debezium-connector-mysql 等)。要将信号通道与连接器一起使用,请将转换器 JAR 文件添加到连接器的子目录。

要将自定义通知通道与多个连接器一起使用,您必须将通知通道 JAR 文件的副本放在每个连接器子目录中。

配置连接器以使用自定义通知通道

在连接器配置中,将自定义通知通道的名称添加到 notification.enabled.channels 属性。