向 Debezium 连接器发送信号

概述

Debezium 信号机制提供了一种修改连接器行为或触发一次性操作(例如,启动表的 即时快照)的方法。要使用信号触发连接器执行指定的操作,您可以将连接器配置为使用以下一个或多个通道:

SourceSignalChannel

您可以发出 SQL 命令,将信号消息添加到专用的信号数据集合中。信号数据集合在源数据库上创建,专门用于与 Debezium 通信。每个连接器实例都必须有其唯一的信号数据集合。

KafkaSignalChannel

您将信号消息提交到可配置的 Kafka 主题。

JmxSignalChannel

您通过 JMX signal 操作提交信号。

FileSignalChannel

您可以使用文件发送信号。

Custom

您将信号提交到您实现的 自定义通道。当 Debezium 检测到新 日志记录即时阻塞快照记录 被添加到通道时,它会读取信号并启动请求的操作。

以下 Debezium 连接器可用于信号功能:

  • Db2

  • MariaDB (技术预览)

  • MongoDB

  • MySQL

  • Oracle

  • PostgreSQL

  • SQL Server

您可以通过设置 signal.enabled.channels 配置属性来指定要启用哪个通道。该属性列出了已启用的通道名称。默认情况下,Debezium 提供以下通道:sourcekafkasource 通道默认启用,因为它对于增量快照信号是必需的。

错误处理

除了 source 通道外,Debezium 信号通道不实现重试策略。在触发信号后,请务必验证其是否成功完成。

您可以通过配置连接器发送 通知,使其能够自动报告增量或阻塞快照的进度。

启用源信号通道

默认情况下,Debezium 源信号通道已启用。

您必须为要使用信号的每个连接器显式配置信号。

过程
  1. 在源数据库上,创建一个用于向连接器发送信号的信号数据集合表。有关信号数据集合所需结构的更多信息,请参阅 信号数据集合的结构

  2. 对于实现原生更改数据捕获 (CDC) 机制的源数据库(如 Db2 或 SQL Server),请为信号表启用 CDC。

  3. 将信号数据集合的名称添加到 Debezium 连接器配置中。
    在连接器配置中,添加属性 signal.data.collection,并将其值设置为在步骤 1 中创建的信号数据集合的完全限定名。

    例如:signal.data.collection = inventory.debezium_signals

    信号集合的完全限定名称的格式取决于连接器。
    以下示例显示了每个连接器要使用的命名格式:

完全限定表名
Db2

<schemaName>.<tableName>

MariaDB (技术预览)

<databaseName>.<tableName>

MongoDB

<databaseName>.<collectionName>

MySQL

<databaseName>.<tableName>

Oracle

<databaseName>.<schemaName>.<tableName>

PostgreSQL

<schemaName>.<tableName>

SQL Server

<databaseName>.<schemaName>.<tableName>

有关设置 signal.data.collection 属性的更多信息,请参阅您连接器的配置属性表。

信号数据集合的结构

信号数据集合或信号表存储您发送给连接器以触发指定操作的信号。信号表的结构必须符合以下标准格式。

  • 包含三个字段(列)。

  • 字段按特定顺序排列,如 表 1 所示。

表 1. 信号数据集合的必需结构
Field (字段) Type 描述

id
(必需)

string

一个任意的唯一字符串,用于标识信号实例。
您为提交到信号表的每个信号分配一个 id
通常,ID 是一个 UUID 字符串。
您可以使用信号实例进行日志记录、调试或去重。
当信号触发 Debezium 执行增量快照时,它会生成一个带有任意 id 字符串的信号消息。生成的消息中包含的 id 字符串与提交的信号中的 id 字符串无关。

type
(必需)

string

指定要发送的信号的类型。
您可以将某些信号类型用于支持信号的任何连接器,而其他信号类型仅适用于特定连接器。

data
(可选)

string

指定要传递给信号操作的 JSON 格式参数。
每种信号类型都需要一组特定的数据。

信号数据集合必须包含名为 idtypedata 的列。请勿在名称中包含引号。如果为这些列分配了备用名称,连接器将无法处理信号。

创建信号数据集合

您可以通过向源数据库提交标准的 SQL DDL 查询来创建信号表。

先决条件
  • 您拥有在源数据库上创建表的足够权限。

过程
  • 向源数据库提交 SQL 查询,以创建与 必需结构 一致的表,如下例所示:

    CREATE TABLE <tableName> (id VARCHAR(<varcharValue>) PRIMARY KEY, type VARCHAR(<varcharValue>) NOT NULL, data VARCHAR(<varcharValue>) NULL);

id 变量的 VARCHAR 参数分配的空间量必须足以容纳发送到信号表的信号 ID 字符串的大小。
如果 ID 的大小超过可用空间,连接器将无法处理信号。

以下示例显示了一个 CREATE TABLE 命令,该命令创建了一个三列的 debezium_signal 表:

CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);

启用 Kafka 信号通道

您可以通过将其添加到 signal.enabled.channels 配置属性来启用 Kafka 信号通道,然后将接收信号的主题名称添加到 signal.kafka.topic 属性。启用信号通道后,将创建一个 Kafka 消费者来消费发送到配置的信号主题的信号。

要使用 Kafka 信号为大多数连接器触发即时增量快照,您必须首先在连接器配置中 启用 source 信号通道。源通道实现了一个水位线机制,用于去重可能被增量快照捕获,然后在流式传输恢复后再次捕获的事件。当使用信号通道触发具有 已启用 GTID 的只读 MySQL 数据库的增量快照时,不需要启用源通道。有关更多信息,请参阅 MySQL 只读增量快照

消息格式

The key of the Kafka message must match the value of the topic.prefix connector configuration option. (Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。)

该值为一个 JSON 对象,包含 typedata 字段。

当信号类型设置为 execute-snapshot 时,data 字段必须包含下表中列出的字段:

表 2. 执行快照数据字段
Field (字段) Default (默认值) Value (值)

type

incremental (增量)

要运行的快照类型。目前 Debezium 支持 incrementalblocking 类型。

data-collections (数据集合)

N/A

一个逗号分隔的正则表达式数组,匹配要包含在快照中的数据集合的完全限定名称。
数据集合的 命名格式 取决于数据库。

additional-conditions (附加条件)

N/A

An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. (一个可选数组,指定连接器用于确定要包含在快照中的记录子集的附加条件集。)
每个附加条件都是一个对象,指定要捕获即时快照的数据的过滤条件。您可以为每个附加条件设置以下属性:

data-collection (数据集合)

过滤器适用的数据集合的完全限定名称。您可以为每个数据集合应用不同的过滤器。

filter (过滤器)

Specifies column values that must be present in a database record for the snapshot to include it, for example, "color='blue'". (指定数据库记录中必须存在的列值,快照才能包含它,例如 "color='blue'"。)
快照过程会根据 filter 值评估数据集合中的记录,并且只捕获包含匹配值的记录。

分配给 filter 属性的特定值取决于即时快照的类型:

  • 对于增量快照,您指定一个搜索条件片段,例如 "color='blue'",快照会将此片段附加到查询的条件子句中。

  • 对于阻塞快照,您指定一个完整的 SELECT 语句,例如您可能在 snapshot.select.statement.overrides 属性中设置的语句。

以下示例显示了一个典型的 execute-snapshot Kafka 消息:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

启用 JMX 信号通道

您可以通过将 jmx 添加到连接器配置中的 signal.enabled.channels 属性,然后 启用 JMX MBean Server 来公开信号 MBean,从而启用 JMX 信号。

发送 JMX 信号

过程
  1. 使用您偏好的 JMX 客户端(例如,JConsole 或 JDK Mission Control)连接到 MBean 服务器。

  2. 搜索 MBean debezium.<connector-type>.management.signals.<server>。该 MBean 公开了 signal 操作,该操作接受以下输入参数:

    p0

    信号的 ID。

    p1

    信号的类型,例如 execute-snapshot

    p2

    一个 JSON 数据字段,包含有关指定信号类型的其他信息。

  3. 通过提供输入参数值来发送 execute-snapshot 信号。
    在 JSON 数据字段中,包含下表中列出的信息:

    表 3. 执行快照数据字段
    Field (字段) Default (默认值) Value (值)

    type

    incremental (增量)

    要运行的快照类型。目前 Debezium 支持 incrementalblocking 类型。

    data-collections (数据集合)

    N/A

    一个逗号分隔的正则表达式数组,匹配要包含在快照中的 表的完全限定名称

    additional-conditions (附加条件)

    N/A

    An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. (一个可选数组,指定连接器用于确定要包含在快照中的记录子集的附加条件集。)
    每个附加条件都是一个对象,指定要捕获即时快照的数据的过滤条件。您可以为每个附加条件设置以下属性:

    data-collection (数据集合)

    过滤器适用的数据集合的完全限定名称。您可以为每个数据集合应用不同的过滤器。

    filter (过滤器)

    Specifies column values that must be present in a database record for the snapshot to include it, for example, "color='blue'". (指定数据库记录中必须存在的列值,快照才能包含它,例如 "color='blue'"。)
    快照过程会根据 filter 值评估数据集合中的记录,并且只捕获包含匹配值的记录。

    分配给 filter 属性的特定值取决于即时快照的类型:

    • 对于增量快照,您指定一个搜索条件片段,例如 "color='blue'",快照会将此片段附加到查询的条件子句中。

    • 对于阻塞快照,您指定一个完整的 SELECT 语句,例如您可能在 snapshot.select.statement.overrides 属性中设置的语句。

下图显示了如何使用 JConsole 发送信号的示例:

Using JConsole to send an `execute-snapshot` signal

启用文件信号通道

您可以通过将 file 添加到连接器配置中的 signal.enabled.channels 属性来启用文件信号通道。启用信号通道后,您必须配置连接器从文件中读取信号。默认情况下,信号文件创建在连接器类路径的根目录中,文件名为 file-signals.txt。如果您想使用不同的文件,请在连接器配置中设置 signal.file 属性,并指定文件名和路径。该文件路径必须可供连接器环境访问。

消息格式

信号文件中的信号表示为 JSON 对象,这些对象由 idtypedata 字段组成。

id 字段是信号的唯一标识符,通常是 UUID 字符串。

当信号类型设置为 execute-snapshot 时,data 字段必须包含下表中列出的字段:

表 4. 执行快照数据字段
Field (字段) Default (默认值) Value (值)

type

incremental (增量)

要运行的快照类型。目前 Debezium 支持 incrementalblocking 类型。

data-collections (数据集合)

N/A

一个逗号分隔的正则表达式数组,匹配要包含在快照中的数据集合的完全限定名称。
数据集合的 命名格式 取决于数据库。

additional-conditions (附加条件)

N/A

An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. (一个可选数组,指定连接器用于确定要包含在快照中的记录子集的附加条件集。)
每个附加条件都是一个对象,指定要捕获即时快照的数据的过滤条件。您可以为每个附加条件设置以下属性:

data-collection (数据集合)

过滤器适用的数据集合的完全限定名称。您可以为每个数据集合应用不同的过滤器。

filter (过滤器)

Specifies column values that must be present in a database record for the snapshot to include it, for example, "color='blue'". (指定数据库记录中必须存在的列值,快照才能包含它,例如 "color='blue'"。)
快照过程会根据 filter 值评估数据集合中的记录,并且只捕获包含匹配值的记录。

分配给 filter 属性的特定值取决于即时快照的类型:

  • 对于增量快照,您指定一个搜索条件片段,例如 "color='blue'",快照会将此片段附加到查询的条件子句中。

  • 对于阻塞快照,您指定一个完整的 SELECT 语句,例如您可能在 snapshot.select.statement.overrides 属性中设置的语句。

以下示例显示了文件中的典型 execute-snapshot 消息:

{"id":"d139b9b7-7777-4547-917d-111111111111", "type":"execute-snapshot", "data":{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}}

自定义信号通道

信号机制被设计为可扩展的。您可以根据需要实现通道,以您在环境中认为最有效的方式向 Debezium 发送信号。

添加信号通道涉及几个步骤:

提供自定义信号通道

自定义信号通道是实现 io.debezium.pipeline.signal.channels.SignalChannelReader 服务提供商接口 (SPI) 的 Java 类。例如:

public interface SignalChannelReader {

    String name(); (1)

    void init(CommonConnectorConfig connectorConfig); (2)

    List<SignalRecord> read(); (3)

    void close(); (4)
}
1 读取器的名称。要使 Debezium 能够使用该通道,请在连接器的 signal.enabled.channels 属性中指定此名称。
2 初始化通道所需的特定配置、变量或连接。
3 从通道读取信号。SignalProcessor 类调用此方法来检索要处理的信号。
4 关闭所有分配的资源。连接器停止时,Debezium 会调用此方法。

Debezium 核心模块依赖

自定义信号通道 Java 项目对 Debezium 核心模块具有编译依赖。您必须在项目的 pom.xml 文件中包含这些编译依赖,如下例所示:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-core</artifactId>
    <version>${version.debezium}</version> (1)
</dependency>
1 ${version.debezium} 表示 Debezium 连接器的版本。

META-INF/services/io.debezium.pipeline.signal.channels.SignalChannelReader 文件中声明您的实现。

部署自定义信号通道

先决条件
  • 您有一个自定义信号通道 Java 程序。

过程
  • 要将自定义信号通道与 Debezium 连接器一起使用,请将 Java 项目导出为 JAR 文件,并将该文件复制到包含每个要使用它的 Debezium 连接器的 JAR 文件的目录中。

    例如,在典型的部署中,Debezium 连接器文件存储在 Kafka Connect 目录(/kafka/connect)的子目录中,每个连接器 JAR 都在自己的子目录中(/kafka/connect/debezium-connector-db2/kafka/connect/debezium-connector-mysql 等)。

要将自定义信号通道与多个连接器一起使用,您必须将自定义信号通道 JAR 文件的副本放在每个连接器的子目录中。

配置连接器使用自定义信号通道

将自定义信号通道的名称添加到 signal.enabled.channels 配置属性。

信号操作

您可以使用信号来触发以下操作:

某些信号与所有连接器不兼容。

记录信号

您可以通过创建类型为 log 的信号表条目来请求连接器将条目添加到日志中。处理信号后,连接器会将指定的消息打印到日志中。可选地,您可以配置信号,以便生成的包含流坐标的消息。

表 5. 用于添加日志消息的信号记录示例
Column Value (值) 描述

id

924e3ff8-2245-43ca-ba77-2af9af02fa07

type

log

信号的操作类型。

data

{"message": "Signal message at offset {}"}

message 参数指定要打印到日志的字符串。
如果您在消息中添加占位符 ({}),它将被流坐标替换。

即时增量快照信号

您可以通过创建类型为 execute-snapshot 的信号来请求连接器触发即时快照。处理信号后,连接器将运行请求的快照操作。

与连接器首次启动后运行的初始快照不同,即时快照发生在运行时,在连接器已经开始从数据库流式传输更改事件之后。您可以随时触发即时快照。

以下 Debezium 连接器可用于即时快照:

  • Db2

  • MariaDB (技术预览)

  • MongoDB

  • MySQL

  • Oracle

  • PostgreSQL

  • SQL Server

表 6. 即时快照信号记录示例
Column Value (值)

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

execute-snapshot

data

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
表 7. 即时快照信号消息示例
Key Value (值)

test_connector

{"type":"execute-snapshot","data": {"data-collections": ["public.MyFirstTable"], "type": "INCREMENTAL", "additional-conditions":[{"data-collection": "public.MyFirstTable", "filter":"color='blue' AND brand='MyBrand'"}]}}

有关即时快照的更多信息,请参阅连接器文档中的“快照”主题。

即时快照停止信号

您可以通过创建类型为 stop-snapshot 的信号表条目来请求连接器停止正在进行的即时快照。处理信号后,连接器将停止当前正在进行的快照操作。

以下 Debezium 连接器可用于停止即时快照:

  • Db2

  • MariaDB (技术预览)

  • MongoDB

  • MySQL

  • Oracle

  • PostgreSQL

  • SQL Server

表 8. 停止即时快照信号记录示例
Column Value (值)

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

stop-snapshot

data

{"type":"INCREMENTAL", "data-collections": ["public.MyFirstTable"]}

您必须指定信号的 typedata-collections 字段是可选的。将 data-collections 字段留空,以请求连接器停止当前快照中的所有活动。如果您希望增量快照继续,但想排除特定集合的快照,请提供要排除的集合名称或正则表达式的逗号分隔列表。在连接器处理完信号后,增量快照将继续,但会排除您指定的集合中的数据。

增量快照

增量快照是一种特殊的即时快照。在增量快照中,连接器捕获您指定的表的基线状态,类似于初始快照。但是,与初始快照不同的是,增量快照会分块捕获表,而不是一次性全部捕获。连接器使用水位线方法来跟踪快照的进度。

通过分块捕获指定表的初始状态,而不是进行一次性的操作,增量快照相对于初始快照过程提供了以下优势:

  • 在连接器捕获指定表的基线状态时,来自事务日志的近实时事件流式传输将继续进行,不会中断。

  • 如果增量快照过程被中断,它可以从停止的点恢复。

  • 您可以随时触发增量快照。

增量快照暂停信号

您可以通过创建类型为 pause-snapshot 的信号表条目来请求连接器暂停正在进行的增量快照。处理信号后,连接器将停止当前正在进行的快照操作。因此,无法指定数据集合,因为快照处理将在处理信号时暂停在当前位置。

以下 Debezium 连接器可用于暂停增量快照:

  • Db2

  • MariaDB (技术预览)

  • MongoDB

  • MySQL

  • Oracle

  • PostgreSQL

  • SQL Server

表 9. 增量快照暂停信号记录示例
Column Value (值)

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

pause-snapshot

您必须指定信号的 typedata 字段被忽略。

增量快照恢复信号

您可以通过创建类型为 resume-snapshot 的信号表条目来请求连接器恢复已暂停的增量快照。处理信号后,连接器将恢复之前暂停的快照操作。

以下 Debezium 连接器可用于恢复增量快照:

  • Db2

  • MariaDB (技术预览)

  • MongoDB

  • MySQL

  • Oracle

  • PostgreSQL

  • SQL Server

表 10. 增量快照恢复信号记录示例
Column Value (值)

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

resume-snapshot

您必须指定信号的 typedata 字段被忽略。

有关增量快照的更多信息,请参阅连接器文档中的“快照”主题。

阻塞快照信号

您可以通过创建类型为 execute-snapshotdata.type 值为 blocking 的信号来请求连接器触发即时阻塞快照。处理信号后,连接器将运行请求的快照操作。

与连接器首次启动后运行的初始快照不同,即时阻塞快照发生在运行时,在连接器停止从数据库流式传输更改事件之后。您可以随时触发即时阻塞快照。

以下 Debezium 连接器可用于阻塞快照:

  • Db2

  • MariaDB (技术预览)

  • MongoDB

  • MySQL

  • Oracle

  • PostgreSQL

  • SQL Server

表 11. 阻塞快照信号记录示例
Column Value (值)

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

execute-snapshot

data

  {"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
表 12. 阻塞快照信号消息示例
Key Value (值)

test_connector

{"type":"execute-snapshot","data": {"type": "blocking"}

有关阻塞快照的更多信息,请参阅连接器文档中的“快照”主题。

定义自定义操作

自定义操作使您能够扩展 Debezium 信号框架,以触发默认实现中不可用的操作。您可以将自定义操作与多个连接器一起使用。

要定义自定义信号操作,您必须定义以下接口:

@FunctionalInterface
public interface SignalAction<P extends Partition> {

    /**
     * @param signalPayload the content of the signal
     * @return true if the signal was processed
     */
    boolean arrived(SignalPayload<P> signalPayload) throws InterruptedException;
}

io.debezium.pipeline.signal.actions.SignalAction 公开一个带有单个参数的方法,该参数代表通过信号通道发送的消息负载。

定义自定义信号操作后,使用以下 SPI 接口使自定义操作可供信号机制使用:io.debezium.pipeline.signal.actions.SignalActionProvider

public interface SignalActionProvider {

    /**
     * Create a map of signal action where the key is the name of the action.
     *
     * @param dispatcher the event dispatcher instance
     * @param connectorConfig the connector config
     * @return a concrete action
     */

    <P extends Partition> Map<String, SignalAction<P>> createActions(EventDispatcher<P, ? extends DataCollectionId> dispatcher, CommonConnectorConfig connectorConfig);
}

您的实现必须返回一个信号操作的映射。将映射键设置为操作的名称。该键用作信号的 type

Debezium 核心模块依赖

自定义操作 Java 项目对 Debezium 核心模块具有编译依赖。在项目的 pom.xml 文件中包含以下编译依赖:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-core</artifactId>
    <version>${version.debezium}</version>
</dependency>

在上一个示例中,占位符 ${version.debezium} 表示 Debezium 连接器的版本。在 pom.xml 文件的 <properties> 部分指定 version.debezium 属性的值。例如:

<properties>
    <version.debezium>3.3.1.Final</version.debezium>
</properties>

META-INF/services/io.debezium.pipeline.signal.actions.SignalActionProvider 文件中声明您的提供商实现。

部署自定义操作

先决条件
  • 您有一个自定义操作 Java 程序。

过程
  • 要将自定义操作与 Debezium 连接器一起使用,请将 Java 项目导出为 JAR 文件,并将该文件复制到包含每个要使用它的 Debezium 连接器的 JAR 文件的目录中。

    例如,在典型的部署中,Debezium 连接器文件存储在 Kafka Connect 目录(/kafka/connect)的子目录中,每个连接器 JAR 都在自己的子目录中(/kafka/connect/debezium-connector-db2/kafka/connect/debezium-connector-mysql 等)。

要将自定义操作与多个连接器一起使用,您必须将自定义信号通道 JAR 文件的副本放在每个连接器的子目录中。