Debezium 通知
概述
Debezium 通知提供了一种获取连接器状态信息的方法。通知可以发送到以下通道:
- SinkNotificationChannel
-
通过 Connect API 将通知发送到配置的主题。
- LogNotificationChannel
-
通知将被追加到日志中。
- JmxNotificationChannel
-
通知将作为 JMX bean 的属性公开。
- Custom
-
通知将发送到您实现的自定义通道。
Debezium 通知格式
通知消息包含以下信息:
| 属性 | 描述 | ||
|---|---|---|---|
id |
分配给通知的唯一标识符。对于增量快照通知,
|
||
aggregate_type |
与通知相关的聚合根的数据类型。在领域驱动设计中,导出的事件应始终引用一个聚合。 |
||
type |
提供 |
||
additional_data |
一个 Map<String,String>,包含有关通知的详细信息。有关示例,请参阅Debezium 关于增量快照进度的通知。 |
||
timestamp |
创建通知的时间。该值表示自 UNIX epoch 以来的毫秒数。 |
可用通知
Debezium 关于初始快照状态的通知
以下示例显示了一个典型的通知,该通知提供了初始快照的状态。
{
"id": "5563ae14-49f8-4579-9641-c1bbc2d76f99",
"aggregate_type": "Initial Snapshot",
"type": "COMPLETED", (1)
"additional_data" : {
"connector_name": "myConnector"
},
"timestamp": "1695817046353"
}
| Item | 描述 |
|---|---|
1 |
|
下表显示了在报告初始快照状态的通知中可能出现的不同有效负载示例:
| Status | Payload |
|---|---|
STARTED |
|
IN_PROGRESS |
|
TABLE_SCAN_COMPLETED |
在前面的示例中,
|
COMPLETED |
|
ABORTED |
|
SKIPPED |
|
Debezium 关于增量快照进度的通知
下表显示了在报告增量快照状态的通知中可能出现的不同有效负载示例:
| Status | Payload |
|---|---|
Start |
|
Paused |
|
Resumed |
|
Stopped |
|
Processing chunk |
|
Snapshot completed for a table |
在前面的示例中,
|
Completed |
|
启用 Debezium 通知
要启用 Debezium 发出通知,请通过设置 notification.enabled.channels 配置属性来指定通知通道列表。默认情况下,以下通知通道可用:
-
sink -
log -
jmx
|
要使用 |
访问 Debezium JMX 通知
要启用 Debezium 通过 JMX bean 公开事件,请完成以下配置步骤:
-
启用 JMX MBean 服务器以公开通知 bean。
-
将
jmx添加到连接器配置中的notification.enabled.channels属性。 -
将您首选的 JMX 客户端连接到 MBean 服务器。
通知通过名称为 debezium.<connector-type>.management.notifications.<server> 的 bean 的 Notifications 属性公开。
下图显示了一个报告增量快照开始的通知。
要丢弃通知,请在 bean 上调用 reset 操作。
通知也作为类型为 debezium.notification 的 JMX 通知公开。要使应用程序能够监听 MBean 发出的 JMX 通知,请订阅该应用程序到通知。
自定义通知通道
通知机制设计为可扩展的。您可以根据需要实现通道,以您在环境中效果最好的方式传递通知。添加通知通道涉及几个步骤:
配置自定义通知通道
自定义通知通道是实现 io.debezium.pipeline.notification.channels.NotificationChannel 服务提供商接口 (SPI) 的 Java 类。例如:
public interface NotificationChannel {
String name(); (1)
void init(CommonConnectorConfig config); (2)
void send(Notification notification); (3)
void close(); (4)
}
| Item | 描述 |
|---|---|
1 |
通道的名称。要启用 Debezium 使用该通道,请在连接器的 |
2 |
初始化通道所需的特定配置、变量或连接。 |
3 |
在通道上结束通知。Debezium 调用此方法来报告其状态。 |
4 |
关闭所有分配的资源。Debezium 在连接器停止时调用此方法。 |
Debezium 核心模块依赖
自定义通知通道 Java 项目具有对 Debezium 核心模块的编译依赖。您必须在项目的 pom.xml 文件中包含这些编译依赖项,如下例所示:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${version.debezium}</version> (1)
</dependency>
| Item | 描述 |
|---|---|
1 |
|
在 META-INF/services/io.debezium.pipeline.notification.channels.NotificationChannel 文件中声明您的实现。
部署 Debezium 自定义通知通道
-
您有一个自定义通知通道 Java 程序。
-
要将通知通道与 Debezium 连接器一起使用,请将 Java 项目导出为 JAR 文件,并将该文件复制到包含要使用的每个 Debezium 连接器的 JAR 文件的目录中。
例如,在典型的部署中,Debezium 连接器文件存储在 Kafka Connect 目录 (/kafka/connect) 的子目录中,每个连接器 JAR 都在自己的子目录中 (/kafka/connect/debezium-connector-db2,/kafka/connect/debezium-connector-mysql等)。要将信号通道与连接器一起使用,请将转换器 JAR 文件添加到连接器的子目录。
| 要将自定义通知通道与多个连接器一起使用,您必须将通知通道 JAR 文件的副本放在每个连接器子目录中。 |