Debezium 通知
概述
Debezium 通知提供了一种获取连接器状态信息的方法。通知可以发送到以下渠道:
- SinkNotificationChannel
-
通过 Connect API 将通知发送到配置好的主题。
- LogNotificationChannel
-
通知将被追加到日志中。
- JmxNotificationChannel
-
通知作为 JMX bean 的属性公开。
- Custom
-
通知将被发送到您实现的自定义渠道。
Debezium 通知格式
通知消息包含以下信息:
| 属性 | 描述 | ||
|---|---|---|---|
id |
分配给通知的唯一标识符。对于增量快照通知,
|
||
aggregate_type |
与通知相关的聚合根的数据类型。在领域驱动设计中,导出的事件应始终引用一个聚合。 |
||
type |
提供 |
||
additional_data |
一个 Map<String,String>,其中包含有关通知的详细信息。有关示例,请参阅Debezium 关于增量快照进度的通知。 |
||
timestamp |
创建通知的时间。该值表示自 UNIX 纪元以来的毫秒数。 |
可用的通知
Debezium 关于初始快照状态的通知
以下示例显示了一个典型的通知,它提供了初始快照的状态。
{
"id": "5563ae14-49f8-4579-9641-c1bbc2d76f99",
"aggregate_type": "Initial Snapshot",
"type": "COMPLETED", (1)
"additional_data" : {
"connector_name": "myConnector"
},
"timestamp": "1695817046353"
}
| Item | 描述 |
|---|---|
1 |
|
下表显示了在报告初始快照状态的通知中可能出现的不同载荷的示例:
| 状态 | Payload |
|---|---|
STARTED |
|
IN_PROGRESS |
字段 |
TABLE_SCAN_COMPLETED |
在前面的示例中,
字段 |
COMPLETED |
|
ABORTED |
|
SKIPPED |
|
Debezium 关于增量快照进度的通知
下表显示了在报告增量快照状态的通知中可能出现的不同载荷的示例:
| 状态 | Payload |
|---|---|
开始 |
|
暂停 |
|
恢复 |
|
停止 |
|
正在处理数据块 |
|
表快照已完成 |
在前面的示例中,
|
已完成 |
|
启用 Debezium 通知
要启用 Debezium 发送通知,请通过设置 notification.enabled.channels 配置属性来指定通知渠道列表。默认情况下,以下通知渠道可用:
-
sink -
log -
jmx
|
要使用 |
访问 Debezium JMX 通知
要启用 Debezium 通过 JMX bean 公开事件,请完成以下配置步骤:
-
启用 JMX MBean 服务器以公开通知 bean。
-
在连接器的
notification.enabled.channels属性中添加jmx。 -
将您首选的 JMX 客户端连接到 MBean 服务器。
通知通过名称为 debezium.<connector-type>.management.notifications.<server> 的 bean 的 Notifications 属性公开。
下图显示了一个报告增量快照启动的通知。
要丢弃通知,请调用 bean 上的 reset 操作。
通知也作为类型为 debezium.notification 的 JMX 通知公开。要使应用程序能够监听 MBean 发出的 JMX 通知,请将应用程序订阅到通知。
自定义通知渠道
通知机制设计为可扩展的。您可以根据需要实现渠道,以最适合您环境的方式发送通知。添加通知渠道涉及几个步骤:
配置自定义通知渠道
自定义通知渠道是实现 io.debezium.pipeline.notification.channels.NotificationChannel 服务提供商接口 (SPI) 的 Java 类。例如:
public interface NotificationChannel {
String name(); (1)
void init(CommonConnectorConfig config); (2)
void send(Notification notification); (3)
void close(); (4)
}
| Item | 描述 |
|---|---|
1 |
渠道的名称。要使 Debezium 使用该渠道,请在连接器的 |
2 |
初始化通道所需的特定配置、变量或连接。 |
3 |
在渠道上发送通知。Debezium 调用此方法来报告其状态。 |
4 |
关闭所有分配的资源。Debezium 在连接器停止时调用此方法。 |
Debezium 核心模块依赖项
自定义通知渠道 Java 项目对 Debezium 核心模块具有编译依赖项。您必须在项目的 pom.xml 文件中包含这些编译依赖项,如下例所示:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${version.debezium}</version> (1)
</dependency>
| Item | 描述 |
|---|---|
1 |
|
在 META-INF/services/io.debezium.pipeline.notification.channels.NotificationChannel 文件中声明您的实现。
部署自定义 Debezium 通知渠道
-
您有一个自定义通知渠道 Java 程序。
-
要将通知渠道与 Debezium 连接器一起使用,请将 Java 项目导出为 JAR 文件,然后将该文件复制到包含要使用它的每个 Debezium 连接器的 JAR 文件的目录中。
例如,在典型的部署中,Debezium 连接器文件存储在 Kafka Connect 目录(/kafka/connect)的子目录中,每个连接器 JAR 都在自己的子目录中(/kafka/connect/debezium-connector-db2、/kafka/connect/debezium-connector-mysql等)。要将信号通道与连接器一起使用,请将转换器 JAR 文件添加到连接器的子目录中。
| 要将自定义通知渠道与多个连接器一起使用,您必须将通知渠道 JAR 文件的副本放在每个连接器子目录中。 |