您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

向 Debezium 连接器发送信号

概述

Debezium 信号机制提供了一种修改连接器行为或触发一次性操作(例如,启动表的即席快照)的方法。要使用信号触发连接器执行指定的操作,您可以配置连接器使用以下一个或多个通道:

SourceSignalChannel

您可以发出 SQL 命令,将信号消息添加到专门的信号数据集合中。信号数据集合在源数据库上创建,专门用于与 Debezium 进行通信。每个连接器实例的信号数据集合必须是唯一的。

KafkaSignalChannel

您将信号消息提交到可配置的 Kafka 主题。

JmxSignalChannel

您通过 JMX signal 操作提交信号。

FileSignalChannel

您可以使用文件发送信号。

Custom

您将信号提交到您实现的自定义通道。当 Debezium 检测到新日志记录即席阻塞快照记录已添加到通道时,它会读取信号并启动请求的操作。

以下 Debezium 连接器可用于信号功能:

  • Db2

  • MariaDB (技术预览)

  • MongoDB

  • MySQL

  • Oracle

  • PostgreSQL

  • SQL Server

您可以通过设置 signal.enabled.channels 配置属性来指定启用哪个通道。该属性列出了已启用的通道名称。默认情况下,Debezium 提供以下通道:sourcekafkasource 通道默认启用,因为它对于增量快照信号是必需的。

错误处理

除了 source 通道外,Debezium 信号通道不实现重试策略。您发起信号后,请务必验证其是否成功完成。

您可以通过配置连接器发送通知,使其能够自动报告增量或阻塞快照的进度。

启用源信号通道

默认情况下,Debezium 源信号通道是启用的。

您必须为要使用的每个连接器显式配置信号。

过程
  1. 在源数据库上,创建一个用于将信号发送到连接器的信号数据集合表。有关信号数据集合所需结构的更多信息,请参阅信号数据集合的结构

  2. 对于实现原生 CDC(变更数据捕获)机制的源数据库(如 Db2 或 SQL Server),请为信号表启用 CDC。

  3. 将信号数据集合的名称添加到 Debezium 连接器配置中。
    在连接器配置中,添加属性 signal.data.collection,并将其值设置为您在步骤 1 中创建的信号数据集合的完全限定名。

    例如:signal.data.collection = inventory.debezium_signals

    信号集合的完全限定名的格式取决于连接器。
    以下示例显示了每个连接器要使用的命名格式:

完全限定的表名
Db2

<schemaName>.<tableName>

MariaDB (技术预览)

<databaseName>.<tableName>

MongoDB

<databaseName>.<collectionName>

MySQL

<databaseName>.<tableName>

Oracle

<databaseName>.<schemaName>.<tableName>

PostgreSQL

<schemaName>.<tableName>

SQL Server

<databaseName>.<schemaName>.<tableName>

有关设置 signal.data.collection 属性的更多信息,请参阅您的连接器的配置属性表。

信号数据集合的结构

信号数据集合(或信号表)存储您发送给连接器以触发指定操作的信号。信号表的结构必须符合以下标准格式。

  • 包含三个字段(列)。

  • 字段按特定顺序排列,如表 1 所示。

表 1. 信号数据集合的必需结构
Field (字段) Type 描述

id
(必需)

string

标识信号实例的任意唯一字符串。
您为提交到信号表的每个信号分配一个 id
通常,ID 是一个 UUID 字符串。
您可以使用信号实例进行日志记录、调试或去重。
当信号触发 Debezium 执行增量快照时,它会生成一个具有任意 id 字符串的信号消息。生成的消息中包含的 id 字符串与提交的信号中的 id 字符串无关。

type
(必需)

string

指定要发送的信号的类型。
您可以使用某些信号类型与任何可用的信号连接器,而其他信号类型仅可用于特定连接器。

data
(可选)

string

指定要传递给信号操作的 JSON 格式的参数。
每种信号类型都需要一组特定的数据。

信号数据集合必须包含名为 idtypedata 的列。名称中不要包含引号。如果您为这些列分配了备用名称,则连接器无法处理信号。

创建信号数据集合

您可以通过向源数据库提交标准的 SQL DDL 查询来创建信号表。

先决条件
  • 您拥有在源数据库上创建表的足够访问权限。

过程
  • 向源数据库提交 SQL 查询,以创建符合必需结构的表,如下例所示:

    CREATE TABLE <tableName> (id VARCHAR(<varcharValue>) PRIMARY KEY, type VARCHAR(<varcharValue>) NOT NULL, data VARCHAR(<varcharValue>) NULL);

您为 id 变量的 VARCHAR 参数分配的空间量必须足以容纳发送到信号表的信号的 ID 字符串的大小。
如果 ID 的大小超过可用空间,连接器将无法处理信号。

以下示例显示了一个创建了三列 debezium_signal 表的 CREATE TABLE 命令:

CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);

启用 Kafka 信号通道

您可以通过将 Kafka 信号通道添加到 signal.enabled.channels 配置属性来启用它,然后将接收信号的主题名称添加到 signal.kafka.topic 属性。启用信号通道后,将创建一个 Kafka 消费者来消耗发送到配置的信号主题的信号。

要使用 Kafka 信号触发大多数连接器的即席增量快照,您必须首先在连接器配置中启用 source 信号通道。源通道实现了一种水印机制,用于去重可能被增量快照捕获然后又在流恢复后再次捕获的事件。在使用信号通道触发已启用GTID 的只读 MySQL 数据库的增量快照时,不需要启用源通道。有关更多信息,请参阅MySQL 只读增量快照

消息格式

The key of the Kafka message must match the value of the topic.prefix connector configuration option. (Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。)

值为一个包含 typedata 字段的 JSON 对象。

当信号类型设置为 execute-snapshot 时,data 字段必须包含下表中列出的字段:

表 2. 执行快照数据字段
Field (字段) Default (默认值) Value (值)

type

incremental (增量)

要运行的快照类型。目前 Debezium 支持 incrementalblocking 类型。

data-collections (数据集合)

N/A

逗号分隔的正则表达式数组,匹配要包含在快照中的数据集合的完全限定名。
命名格式取决于数据库。

additional-conditions (附加条件)

N/A

An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. (一个可选数组,指定连接器用于确定要包含在快照中的记录子集的附加条件集。)
每个附加条件都是一个对象,指定用于过滤即席快照捕获数据的标准。您可以为每个附加条件设置以下属性:

data-collection (数据集合)

过滤器适用的数据集合的完全限定名。您可以对每个数据集合应用不同的过滤器。

filter (过滤器)

Specifies column values that must be present in a database record for the snapshot to include it, for example, "color='blue'". (指定数据库记录中必须存在的列值,快照才能包含它,例如 "color='blue'"。)
快照过程评估数据集合中的记录,并仅捕获包含匹配值的记录。

分配给 filter 属性的特定值取决于即席快照的类型:

  • 对于增量快照,您指定一个搜索条件片段,例如 "color='blue'",快照将其附加到查询的条件子句中。

  • 对于阻塞快照,您指定一个完整的 SELECT 语句,例如您可能在 snapshot.select.statement.overrides 属性中设置的语句。

以下示例显示了一个典型的 execute-snapshot Kafka 消息:

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

启用 JMX 信号通道

您可以通过将 jmx 添加到连接器配置的 signal.enabled.channels 属性,然后启用 JMX MBean 服务器以公开信号 MBean 来启用 JMX 信号。

发送 JMX 信号

过程
  1. 使用您首选的 JMX 客户端(例如 JConsole 或 JDK Mission Control)连接到 MBean 服务器。

  2. 搜索 MBean debezium.<connector-type>.management.signals.<server>。该 MBean 公开了 signal 操作,该操作接受以下输入参数:

    p0

    信号的 ID。

    p1

    信号的类型,例如 execute-snapshot

    p2

    一个 JSON 数据字段,其中包含有关指定信号类型的附加信息。

  3. 通过提供输入参数值来发送 execute-snapshot 信号。
    在 JSON 数据字段中,包含下表中列出的信息:

    表 3. 执行快照数据字段
    Field (字段) Default (默认值) Value (值)

    type

    incremental (增量)

    要运行的快照类型。目前 Debezium 支持 incrementalblocking 类型。

    data-collections (数据集合)

    N/A

    匹配要包含在快照中的表的完全限定名的逗号分隔的正则表达式数组。

    additional-conditions (附加条件)

    N/A

    An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. (一个可选数组,指定连接器用于确定要包含在快照中的记录子集的附加条件集。)
    每个附加条件都是一个对象,指定用于过滤即席快照捕获数据的标准。您可以为每个附加条件设置以下属性:

    data-collection (数据集合)

    过滤器适用的数据集合的完全限定名。您可以对每个数据集合应用不同的过滤器。

    filter (过滤器)

    Specifies column values that must be present in a database record for the snapshot to include it, for example, "color='blue'". (指定数据库记录中必须存在的列值,快照才能包含它,例如 "color='blue'"。)
    快照过程评估数据集合中的记录,并仅捕获包含匹配值的记录。

    分配给 filter 属性的特定值取决于即席快照的类型:

    • 对于增量快照,您指定一个搜索条件片段,例如 "color='blue'",快照将其附加到查询的条件子句中。

    • 对于阻塞快照,您指定一个完整的 SELECT 语句,例如您可能在 snapshot.select.statement.overrides 属性中设置的语句。

下图显示了如何使用 JConsole 发送信号的示例:

Using JConsole to send an `execute-snapshot` signal

启用文件信号通道

您可以通过将 file 添加到连接器配置的 signal.enabled.channels 属性来启用文件信号通道。启用信号通道后,您必须配置连接器从文件中读取信号。默认情况下,信号文件创建在连接器类路径的根目录中,文件名为 file-signals.txt。如果您想使用不同的文件,请在连接器配置中设置 signal.file 属性,并指定文件名和路径。文件路径必须对连接器环境可用。

消息格式

信号文件中的信号表示为 JSON 对象,由 idtypedata 字段组成。

id 字段是信号的唯一标识符,通常是 UUID 字符串。

当信号类型设置为 execute-snapshot 时,data 字段必须包含下表中列出的字段:

表 4. 执行快照数据字段
Field (字段) Default (默认值) Value (值)

type

incremental (增量)

要运行的快照类型。目前 Debezium 支持 incrementalblocking 类型。

data-collections (数据集合)

N/A

逗号分隔的正则表达式数组,匹配要包含在快照中的数据集合的完全限定名。
命名格式取决于数据库。

additional-conditions (附加条件)

N/A

An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. (一个可选数组,指定连接器用于确定要包含在快照中的记录子集的附加条件集。)
每个附加条件都是一个对象,指定用于过滤即席快照捕获数据的标准。您可以为每个附加条件设置以下属性:

data-collection (数据集合)

过滤器适用的数据集合的完全限定名。您可以对每个数据集合应用不同的过滤器。

filter (过滤器)

Specifies column values that must be present in a database record for the snapshot to include it, for example, "color='blue'". (指定数据库记录中必须存在的列值,快照才能包含它,例如 "color='blue'"。)
快照过程评估数据集合中的记录,并仅捕获包含匹配值的记录。

分配给 filter 属性的特定值取决于即席快照的类型:

  • 对于增量快照,您指定一个搜索条件片段,例如 "color='blue'",快照将其附加到查询的条件子句中。

  • 对于阻塞快照,您指定一个完整的 SELECT 语句,例如您可能在 snapshot.select.statement.overrides 属性中设置的语句。

以下示例显示了文件中典型的 execute-snapshot 消息:

{"id":"d139b9b7-7777-4547-917d-111111111111", "type":"execute-snapshot", "data":{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}}

自定义信号通道

信号机制设计为可扩展的。您可以根据需要实现通道,以您在环境中认为最有效的方式向 Debezium 发送信号。

添加信号通道涉及多个步骤:

提供自定义信号通道

自定义信号通道是实现 io.debezium.pipeline.signal.channels.SignalChannelReader 服务提供商接口(SPI)的 Java 类。例如:

public interface SignalChannelReader {

    String name(); (1)

    void init(CommonConnectorConfig connectorConfig); (2)

    List<SignalRecord> read(); (3)

    void close(); (4)
}
1 读取器的名称。要使 Debezium 能够使用该通道,请在连接器的 signal.enabled.channels 属性中指定此名称。
2 初始化通道所需的特定配置、变量或连接。
3 从通道读取信号。SignalProcessor 类调用此方法以检索要处理的信号。
4 关闭所有分配的资源。连接器停止时,Debezium 会调用此方法。

Debezium 核心模块依赖项

自定义信号通道 Java 项目具有对 Debezium 核心模块的编译依赖项。您必须在项目的 pom.xml 文件中包含这些编译依赖项,如下例所示:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-core</artifactId>
    <version>${version.debezium}</version> (1)
</dependency>
1 ${version.debezium} 表示 Debezium 连接器的版本。

META-INF/services/io.debezium.pipeline.signal.channels.SignalChannelReader 文件中声明您的实现。

部署自定义信号通道

先决条件
  • 您有一个自定义信号通道 Java 程序。

过程
  • 要将自定义信号通道与 Debezium 连接器一起使用,请将 Java 项目导出为 JAR 文件,并将该文件复制到包含您想使用的每个 Debezium 连接器的 JAR 文件的目录中。

    例如,在典型的部署中,Debezium 连接器文件存储在 Kafka Connect 目录(/kafka/connect)的子目录中,每个连接器 JAR 都在自己的子目录中(/kafka/connect/debezium-connector-db2/kafka/connect/debezium-connector-mysql 等)。

要将自定义信号通道与多个连接器一起使用,您必须将自定义信号通道 JAR 文件的副本放在每个连接器的子目录中。

配置连接器以使用自定义信号通道

将自定义信号通道的名称添加到 signal.enabled.channels 配置属性。

信号操作

您可以使用信号来触发以下操作:

某些信号与所有连接器不兼容。

记录信号

您可以通过创建具有 log 信号类型的信号表条目来请求连接器将条目添加到日志。处理信号后,连接器会将指定的消息打印到日志。可选地,您可以配置信号,以便生成的消息包含流坐标。

表 5. 用于添加日志消息的信号记录示例
Column Value (值) 描述

id

924e3ff8-2245-43ca-ba77-2af9af02fa07

type

log

信号的操作类型。

data

{"message": "Signal message at offset {}"}

message 参数指定要打印到日志的字符串。
如果在消息中添加了占位符 ({}),它将被替换为流坐标。

即席增量快照信号

您可以通过创建具有 execute-snapshot 信号类型的信号来请求连接器启动即席快照。处理信号后,连接器将运行请求的快照操作。

与连接器启动后运行的初始快照不同,即席快照在运行时发生,即连接器已开始从数据库流式传输更改事件之后。您可以随时启动即席快照。

以下 Debezium 连接器可用即席快照:

  • Db2

  • MariaDB (技术预览)

  • MongoDB

  • MySQL

  • Oracle

  • PostgreSQL

  • SQL Server

表 6. 即席快照信号记录示例
Column Value (值)

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

execute-snapshot

data

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
表 7. 即席快照信号消息示例
Key Value (值)

test_connector

{"type":"execute-snapshot","data": {"data-collections": ["public.MyFirstTable"], "type": "INCREMENTAL", "additional-conditions":[{"data-collection": "public.MyFirstTable", "filter":"color='blue' AND brand='MyBrand'"}]}}

有关即席快照的更多信息,请参阅您的连接器文档中的快照主题。

即席快照停止信号

您可以通过创建具有 stop-snapshot 信号类型的信号表条目来请求连接器停止正在进行的即席快照。处理信号后,连接器将停止当前正在进行的快照操作。

以下 Debezium 连接器可用停止即席快照:

  • Db2

  • MariaDB (技术预览)

  • MongoDB

  • MySQL

  • Oracle

  • PostgreSQL

  • SQL Server

表 8. 停止即席快照信号记录示例
Column Value (值)

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

stop-snapshot

data

{"type":"INCREMENTAL", "data-collections": ["public.MyFirstTable"]}

您必须指定信号的 typedata-collections 字段是可选的。将 data-collections 字段留空以请求连接器停止当前快照中的所有活动。如果您希望增量快照继续,但您想从快照中排除特定集合,请提供要排除的集合名称或正则表达式的逗号分隔列表。连接器处理信号后,增量快照将继续,但会排除您指定的集合中的数据。

增量快照

增量快照是一种特殊的即席快照。在增量快照中,连接器会捕获您指定的表的基线状态,类似于初始快照。但是,与初始快照不同,增量快照分块捕获表,而不是一次性完成。连接器使用水印方法来跟踪快照的进度。

通过分块捕获指定表的初始状态而不是一次性操作,增量快照比初始快照过程提供了以下优势:

  • 在连接器捕获指定表的基线状态的同时,来自事务日志的近实时事件流将不间断地继续。

  • 如果增量快照过程被中断,它可以从停止的地方恢复。

  • 您可以随时启动增量快照。

增量快照暂停信号

您可以通过创建具有 pause-snapshot 信号类型的信号表条目来请求连接器暂停正在进行的增量快照。处理信号后,连接器将停止当前正在进行的快照操作。因此,无法指定数据集合,因为快照处理将在处理信号时暂停。

以下 Debezium 连接器可用暂停增量快照:

  • Db2

  • MariaDB (技术预览)

  • MongoDB

  • MySQL

  • Oracle

  • PostgreSQL

  • SQL Server

表 9. 暂停增量快照信号记录示例
Column Value (值)

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

pause-snapshot

您必须指定信号的 typedata 字段被忽略。

增量快照恢复信号

您可以通过创建具有 resume-snapshot 信号类型的信号表条目来请求连接器恢复已暂停的增量快照。处理信号后,连接器将恢复先前暂停的快照操作。

以下 Debezium 连接器可用恢复增量快照:

  • Db2

  • MariaDB (技术预览)

  • MongoDB

  • MySQL

  • Oracle

  • PostgreSQL

  • SQL Server

表 10. 恢复增量快照信号记录示例
Column Value (值)

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

resume-snapshot

您必须指定信号的 typedata 字段被忽略。

有关增量快照的更多信息,请参阅您的连接器文档中的快照主题。

阻塞快照信号

您可以通过创建具有 execute-snapshot 信号类型和 data.type 值为 blocking 的信号来请求连接器启动即席阻塞快照。处理信号后,连接器将运行请求的快照操作。

与连接器启动后运行的初始快照不同,即席阻塞快照在运行时发生,即连接器已停止从数据库流式传输更改事件之后。您可以随时启动即席阻塞快照。

以下 Debezium 连接器可用阻塞快照:

  • Db2

  • MariaDB (技术预览)

  • MongoDB

  • MySQL

  • Oracle

  • PostgreSQL

  • SQL Server

表 11. 阻塞快照信号记录示例
Column Value (值)

id

d139b9b7-7777-4547-917d-e1775ea61d41

type

execute-snapshot

data

  {"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
表 12. 阻塞快照信号消息示例
Key Value (值)

test_connector

{"type":"execute-snapshot","data": {"type": "blocking"}

有关阻塞快照的更多信息,请参阅您的连接器文档中的快照主题。

定义自定义操作

自定义操作使您能够扩展 Debezium 信号框架以触发默认实现中不可用的操作。您可以将自定义操作与多个连接器一起使用。

要定义自定义信号操作,您必须定义以下接口:

@FunctionalInterface
public interface SignalAction<P extends Partition> {

    /**
     * @param signalPayload the content of the signal
     * @return true if the signal was processed
     */
    boolean arrived(SignalPayload<P> signalPayload) throws InterruptedException;
}

io.debezium.pipeline.signal.actions.SignalAction 公开一个方法,该方法接受一个参数,该参数代表通过信号通道发送的消息负载。

定义自定义信号操作后,使用以下 SPI 接口使自定义操作可供信号机制使用:io.debezium.pipeline.signal.actions.SignalActionProvider

public interface SignalActionProvider {

    /**
     * Create a map of signal action where the key is the name of the action.
     *
     * @param dispatcher the event dispatcher instance
     * @param connectorConfig the connector config
     * @return a concrete action
     */

    <P extends Partition> Map<String, SignalAction<P>> createActions(EventDispatcher<P, ? extends DataCollectionId> dispatcher, CommonConnectorConfig connectorConfig);
}

您的实现必须返回一个信号操作映射。将映射键设置为操作的名称。该键用作信号的type

Debezium 核心模块依赖项

自定义操作 Java 项目具有对 Debezium 核心模块的编译依赖项。请在项目的 pom.xml 文件中包含以下编译依赖项:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-core</artifactId>
    <version>${version.debezium}</version>
</dependency>

在前面的示例中,占位符 ${version.debezium} 代表 Debezium 连接器的版本。在 pom.xml 文件的 <properties> 部分指定 version.debezium 属性的值。例如:

<properties>
    <version.debezium>3.3.0.Final</version.debezium>
</properties>

META-INF/services/io.debezium.pipeline.signal.actions.SignalActionProvider 文件中声明您的提供程序实现。

部署自定义操作

先决条件
  • 您有一个自定义操作 Java 程序。

过程
  • 要将自定义操作与 Debezium 连接器一起使用,请将 Java 项目导出为 JAR 文件,并将该文件复制到包含您想使用的每个 Debezium 连接器的 JAR 文件的目录中。

    例如,在典型的部署中,Debezium 连接器文件存储在 Kafka Connect 目录(/kafka/connect)的子目录中,每个连接器 JAR 都在自己的子目录中(/kafka/connect/debezium-connector-db2/kafka/connect/debezium-connector-mysql 等)。

要将自定义操作与多个连接器一起使用,您必须将自定义信号通道 JAR 文件的副本放在每个连接器的子目录中。