向 Debezium 连接器发送信号
概述
Debezium 信号机制提供了一种修改连接器行为或触发一次性操作(例如,启动表的 即时快照)的方法。要使用信号触发连接器执行指定的操作,您可以将连接器配置为使用以下一个或多个通道:
- SourceSignalChannel
-
您可以发出 SQL 命令,将信号消息添加到专用的信号数据集合中。信号数据集合在源数据库上创建,专门用于与 Debezium 通信。每个连接器实例都必须有其唯一的信号数据集合。
- KafkaSignalChannel
-
您将信号消息提交到可配置的 Kafka 主题。
- JmxSignalChannel
-
您通过 JMX
signal操作提交信号。 - FileSignalChannel
-
您可以使用文件发送信号。
- Custom
-
您将信号提交到您实现的 自定义通道。当 Debezium 检测到新 日志记录 或 即时阻塞快照记录 被添加到通道时,它会读取信号并启动请求的操作。
以下 Debezium 连接器可用于信号功能:
-
Db2
-
MariaDB (技术预览)
-
MongoDB
-
MySQL
-
Oracle
-
PostgreSQL
-
SQL Server
您可以通过设置 signal.enabled.channels 配置属性来指定要启用哪个通道。该属性列出了已启用的通道名称。默认情况下,Debezium 提供以下通道:source 和 kafka。source 通道默认启用,因为它对于增量快照信号是必需的。
启用源信号通道
默认情况下,Debezium 源信号通道已启用。
您必须为要使用信号的每个连接器显式配置信号。
-
在源数据库上,创建一个用于向连接器发送信号的信号数据集合表。有关信号数据集合所需结构的更多信息,请参阅 信号数据集合的结构。
-
对于实现原生更改数据捕获 (CDC) 机制的源数据库(如 Db2 或 SQL Server),请为信号表启用 CDC。
-
将信号数据集合的名称添加到 Debezium 连接器配置中。
在连接器配置中,添加属性signal.data.collection,并将其值设置为在步骤 1 中创建的信号数据集合的完全限定名。
例如:signal.data.collection = inventory.debezium_signals。
信号集合的完全限定名称的格式取决于连接器。
以下示例显示了每个连接器要使用的命名格式:
- Db2
-
<schemaName>.<tableName> - MariaDB (技术预览)
-
<databaseName>.<tableName> - MongoDB
-
<databaseName>.<collectionName> - MySQL
-
<databaseName>.<tableName> - Oracle
-
<databaseName>.<schemaName>.<tableName> - PostgreSQL
-
<schemaName>.<tableName> - SQL Server
-
<databaseName>.<schemaName>.<tableName>
有关设置signal.data.collection属性的更多信息,请参阅您连接器的配置属性表。
信号数据集合的结构
信号数据集合或信号表存储您发送给连接器以触发指定操作的信号。信号表的结构必须符合以下标准格式。
-
包含三个字段(列)。
-
字段按特定顺序排列,如 表 1 所示。
| Field (字段) | Type | 描述 |
|---|---|---|
|
|
一个任意的唯一字符串,用于标识信号实例。 |
|
|
指定要发送的信号的类型。 |
|
|
指定要传递给信号操作的 JSON 格式参数。 |
信号数据集合必须包含名为 id、type 和 data 的列。请勿在名称中包含引号。如果为这些列分配了备用名称,连接器将无法处理信号。 |
创建信号数据集合
您可以通过向源数据库提交标准的 SQL DDL 查询来创建信号表。
-
您拥有在源数据库上创建表的足够权限。
-
向源数据库提交 SQL 查询,以创建与 必需结构 一致的表,如下例所示:
CREATE TABLE <tableName> (id VARCHAR(<varcharValue>) PRIMARY KEY, type VARCHAR(<varcharValue>) NOT NULL, data VARCHAR(<varcharValue>) NULL);
|
为 |
以下示例显示了一个 CREATE TABLE 命令,该命令创建了一个三列的 debezium_signal 表:
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
启用 Kafka 信号通道
您可以通过将其添加到 signal.enabled.channels 配置属性来启用 Kafka 信号通道,然后将接收信号的主题名称添加到 signal.kafka.topic 属性。启用信号通道后,将创建一个 Kafka 消费者来消费发送到配置的信号主题的信号。
|
要使用 Kafka 信号为大多数连接器触发即时增量快照,您必须首先在连接器配置中 启用 |
消息格式
The key of the Kafka message must match the value of the topic.prefix connector configuration option. (Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。)
该值为一个 JSON 对象,包含 type 和 data 字段。
当信号类型设置为 execute-snapshot 时,data 字段必须包含下表中列出的字段:
| Field (字段) | Default (默认值) | Value (值) |
|---|---|---|
|
|
要运行的快照类型。目前 Debezium 支持 |
|
N/A |
一个逗号分隔的正则表达式数组,匹配要包含在快照中的数据集合的完全限定名称。 |
|
N/A |
An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. (一个可选数组,指定连接器用于确定要包含在快照中的记录子集的附加条件集。)
|
以下示例显示了一个典型的 execute-snapshot Kafka 消息:
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`
启用 JMX 信号通道
您可以通过将 jmx 添加到连接器配置中的 signal.enabled.channels 属性,然后 启用 JMX MBean Server 来公开信号 MBean,从而启用 JMX 信号。
发送 JMX 信号
-
使用您偏好的 JMX 客户端(例如,JConsole 或 JDK Mission Control)连接到 MBean 服务器。
-
搜索 MBean
debezium.<connector-type>.management.signals.<server>。该 MBean 公开了signal操作,该操作接受以下输入参数:- p0
-
信号的 ID。
- p1
-
信号的类型,例如
execute-snapshot。 - p2
-
一个 JSON 数据字段,包含有关指定信号类型的其他信息。
-
通过提供输入参数值来发送
execute-snapshot信号。
在 JSON 数据字段中,包含下表中列出的信息:表 3. 执行快照数据字段 Field (字段) Default (默认值) Value (值) typeincremental (增量)要运行的快照类型。目前 Debezium 支持
incremental和blocking类型。data-collections (数据集合)N/A
一个逗号分隔的正则表达式数组,匹配要包含在快照中的 表的完全限定名称。
additional-conditions (附加条件)N/A
An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. (一个可选数组,指定连接器用于确定要包含在快照中的记录子集的附加条件集。)
每个附加条件都是一个对象,指定要捕获即时快照的数据的过滤条件。您可以为每个附加条件设置以下属性:data-collection (数据集合)-
过滤器适用的数据集合的完全限定名称。您可以为每个数据集合应用不同的过滤器。
filter (过滤器)-
Specifies column values that must be present in a database record for the snapshot to include it, for example,
"color='blue'". (指定数据库记录中必须存在的列值,快照才能包含它,例如"color='blue'"。)
快照过程会根据filter值评估数据集合中的记录,并且只捕获包含匹配值的记录。
分配给filter属性的特定值取决于即时快照的类型:-
对于增量快照,您指定一个搜索条件片段,例如
"color='blue'",快照会将此片段附加到查询的条件子句中。 -
对于阻塞快照,您指定一个完整的
SELECT语句,例如您可能在snapshot.select.statement.overrides属性中设置的语句。
-
下图显示了如何使用 JConsole 发送信号的示例:
启用文件信号通道
您可以通过将 file 添加到连接器配置中的 signal.enabled.channels 属性来启用文件信号通道。启用信号通道后,您必须配置连接器从文件中读取信号。默认情况下,信号文件创建在连接器类路径的根目录中,文件名为 file-signals.txt。如果您想使用不同的文件,请在连接器配置中设置 signal.file 属性,并指定文件名和路径。该文件路径必须可供连接器环境访问。
消息格式
信号文件中的信号表示为 JSON 对象,这些对象由 id、type 和 data 字段组成。
id 字段是信号的唯一标识符,通常是 UUID 字符串。
当信号类型设置为 execute-snapshot 时,data 字段必须包含下表中列出的字段:
| Field (字段) | Default (默认值) | Value (值) |
|---|---|---|
|
|
要运行的快照类型。目前 Debezium 支持 |
|
N/A |
一个逗号分隔的正则表达式数组,匹配要包含在快照中的数据集合的完全限定名称。 |
|
N/A |
An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. (一个可选数组,指定连接器用于确定要包含在快照中的记录子集的附加条件集。)
|
以下示例显示了文件中的典型 execute-snapshot 消息:
{"id":"d139b9b7-7777-4547-917d-111111111111", "type":"execute-snapshot", "data":{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}}
自定义信号通道
信号机制被设计为可扩展的。您可以根据需要实现通道,以您在环境中认为最有效的方式向 Debezium 发送信号。
添加信号通道涉及几个步骤:
提供自定义信号通道
自定义信号通道是实现 io.debezium.pipeline.signal.channels.SignalChannelReader 服务提供商接口 (SPI) 的 Java 类。例如:
public interface SignalChannelReader {
String name(); (1)
void init(CommonConnectorConfig connectorConfig); (2)
List<SignalRecord> read(); (3)
void close(); (4)
}
| 1 | 读取器的名称。要使 Debezium 能够使用该通道,请在连接器的 signal.enabled.channels 属性中指定此名称。 |
| 2 | 初始化通道所需的特定配置、变量或连接。 |
| 3 | 从通道读取信号。SignalProcessor 类调用此方法来检索要处理的信号。 |
| 4 | 关闭所有分配的资源。连接器停止时,Debezium 会调用此方法。 |
Debezium 核心模块依赖
自定义信号通道 Java 项目对 Debezium 核心模块具有编译依赖。您必须在项目的 pom.xml 文件中包含这些编译依赖,如下例所示:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${version.debezium}</version> (1)
</dependency>
| 1 | ${version.debezium} 表示 Debezium 连接器的版本。 |
在 META-INF/services/io.debezium.pipeline.signal.channels.SignalChannelReader 文件中声明您的实现。
部署自定义信号通道
-
您有一个自定义信号通道 Java 程序。
-
要将自定义信号通道与 Debezium 连接器一起使用,请将 Java 项目导出为 JAR 文件,并将该文件复制到包含每个要使用它的 Debezium 连接器的 JAR 文件的目录中。
例如,在典型的部署中,Debezium 连接器文件存储在 Kafka Connect 目录(/kafka/connect)的子目录中,每个连接器 JAR 都在自己的子目录中(/kafka/connect/debezium-connector-db2、/kafka/connect/debezium-connector-mysql等)。
| 要将自定义信号通道与多个连接器一起使用,您必须将自定义信号通道 JAR 文件的副本放在每个连接器的子目录中。 |
信号操作
您可以使用信号来触发以下操作:
某些信号与所有连接器不兼容。
记录信号
您可以通过创建类型为 log 的信号表条目来请求连接器将条目添加到日志中。处理信号后,连接器会将指定的消息打印到日志中。可选地,您可以配置信号,以便生成的包含流坐标的消息。
| Column | Value (值) | 描述 |
|---|---|---|
id |
|
|
type |
|
信号的操作类型。 |
data |
|
|
即时增量快照信号
您可以通过创建类型为 execute-snapshot 的信号来请求连接器触发即时快照。处理信号后,连接器将运行请求的快照操作。
与连接器首次启动后运行的初始快照不同,即时快照发生在运行时,在连接器已经开始从数据库流式传输更改事件之后。您可以随时触发即时快照。
以下 Debezium 连接器可用于即时快照:
-
Db2
-
MariaDB (技术预览)
-
MongoDB
-
MySQL
-
Oracle
-
PostgreSQL
-
SQL Server
| Column | Value (值) |
|---|---|
id |
|
type |
|
data |
|
| Key | Value (值) |
|---|---|
test_connector |
|
有关即时快照的更多信息,请参阅连接器文档中的“快照”主题。
即时快照停止信号
您可以通过创建类型为 stop-snapshot 的信号表条目来请求连接器停止正在进行的即时快照。处理信号后,连接器将停止当前正在进行的快照操作。
以下 Debezium 连接器可用于停止即时快照:
-
Db2
-
MariaDB (技术预览)
-
MongoDB
-
MySQL
-
Oracle
-
PostgreSQL
-
SQL Server
| Column | Value (值) |
|---|---|
id |
|
type |
|
data |
|
您必须指定信号的 type。data-collections 字段是可选的。将 data-collections 字段留空,以请求连接器停止当前快照中的所有活动。如果您希望增量快照继续,但想排除特定集合的快照,请提供要排除的集合名称或正则表达式的逗号分隔列表。在连接器处理完信号后,增量快照将继续,但会排除您指定的集合中的数据。
增量快照
增量快照是一种特殊的即时快照。在增量快照中,连接器捕获您指定的表的基线状态,类似于初始快照。但是,与初始快照不同的是,增量快照会分块捕获表,而不是一次性全部捕获。连接器使用水位线方法来跟踪快照的进度。
通过分块捕获指定表的初始状态,而不是进行一次性的操作,增量快照相对于初始快照过程提供了以下优势:
-
在连接器捕获指定表的基线状态时,来自事务日志的近实时事件流式传输将继续进行,不会中断。
-
如果增量快照过程被中断,它可以从停止的点恢复。
-
您可以随时触发增量快照。
增量快照暂停信号
您可以通过创建类型为 pause-snapshot 的信号表条目来请求连接器暂停正在进行的增量快照。处理信号后,连接器将停止当前正在进行的快照操作。因此,无法指定数据集合,因为快照处理将在处理信号时暂停在当前位置。
以下 Debezium 连接器可用于暂停增量快照:
-
Db2
-
MariaDB (技术预览)
-
MongoDB
-
MySQL
-
Oracle
-
PostgreSQL
-
SQL Server
| Column | Value (值) |
|---|---|
id |
|
type |
|
您必须指定信号的 type。data 字段被忽略。
增量快照恢复信号
您可以通过创建类型为 resume-snapshot 的信号表条目来请求连接器恢复已暂停的增量快照。处理信号后,连接器将恢复之前暂停的快照操作。
以下 Debezium 连接器可用于恢复增量快照:
-
Db2
-
MariaDB (技术预览)
-
MongoDB
-
MySQL
-
Oracle
-
PostgreSQL
-
SQL Server
| Column | Value (值) |
|---|---|
id |
|
type |
|
您必须指定信号的 type。data 字段被忽略。
有关增量快照的更多信息,请参阅连接器文档中的“快照”主题。
阻塞快照信号
您可以通过创建类型为 execute-snapshot 且 data.type 值为 blocking 的信号来请求连接器触发即时阻塞快照。处理信号后,连接器将运行请求的快照操作。
与连接器首次启动后运行的初始快照不同,即时阻塞快照发生在运行时,在连接器停止从数据库流式传输更改事件之后。您可以随时触发即时阻塞快照。
以下 Debezium 连接器可用于阻塞快照:
-
Db2
-
MariaDB (技术预览)
-
MongoDB
-
MySQL
-
Oracle
-
PostgreSQL
-
SQL Server
| Column | Value (值) |
|---|---|
id |
|
type |
|
data |
|
| Key | Value (值) |
|---|---|
test_connector |
|
有关阻塞快照的更多信息,请参阅连接器文档中的“快照”主题。
定义自定义操作
自定义操作使您能够扩展 Debezium 信号框架,以触发默认实现中不可用的操作。您可以将自定义操作与多个连接器一起使用。
要定义自定义信号操作,您必须定义以下接口:
@FunctionalInterface
public interface SignalAction<P extends Partition> {
/**
* @param signalPayload the content of the signal
* @return true if the signal was processed
*/
boolean arrived(SignalPayload<P> signalPayload) throws InterruptedException;
}
io.debezium.pipeline.signal.actions.SignalAction 公开一个带有单个参数的方法,该参数代表通过信号通道发送的消息负载。
定义自定义信号操作后,使用以下 SPI 接口使自定义操作可供信号机制使用:io.debezium.pipeline.signal.actions.SignalActionProvider。
public interface SignalActionProvider {
/**
* Create a map of signal action where the key is the name of the action.
*
* @param dispatcher the event dispatcher instance
* @param connectorConfig the connector config
* @return a concrete action
*/
<P extends Partition> Map<String, SignalAction<P>> createActions(EventDispatcher<P, ? extends DataCollectionId> dispatcher, CommonConnectorConfig connectorConfig);
}
您的实现必须返回一个信号操作的映射。将映射键设置为操作的名称。该键用作信号的 type。
Debezium 核心模块依赖
自定义操作 Java 项目对 Debezium 核心模块具有编译依赖。在项目的 pom.xml 文件中包含以下编译依赖:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${version.debezium}</version>
</dependency>
在上一个示例中,占位符 ${version.debezium} 表示 Debezium 连接器的版本。在 pom.xml 文件的 <properties> 部分指定 version.debezium 属性的值。例如:
<properties>
<version.debezium>3.3.1.Final</version.debezium>
</properties>
在 META-INF/services/io.debezium.pipeline.signal.actions.SignalActionProvider 文件中声明您的提供商实现。
部署自定义操作
-
您有一个自定义操作 Java 程序。
-
要将自定义操作与 Debezium 连接器一起使用,请将 Java 项目导出为 JAR 文件,并将该文件复制到包含每个要使用它的 Debezium 连接器的 JAR 文件的目录中。
例如,在典型的部署中,Debezium 连接器文件存储在 Kafka Connect 目录(/kafka/connect)的子目录中,每个连接器 JAR 都在自己的子目录中(/kafka/connect/debezium-connector-db2、/kafka/connect/debezium-connector-mysql等)。
| 要将自定义操作与多个连接器一起使用,您必须将自定义信号通道 JAR 文件的副本放在每个连接器的子目录中。 |