Debezium PostgreSQL 连接器

目录

Debezium PostgreSQL 连接器捕获 PostgreSQL 数据库模式的行级更改。有关与连接器兼容的 PostgreSQL 版本的信息,请参阅 Debezium 版本概述

首次连接到 PostgreSQL 服务器或集群时,连接器会获取所有模式的一致快照。快照完成后,连接器会持续捕获已提交到 PostgreSQL 数据库的插入、更新和删除数据库内容的行级更改。连接器生成数据更改事件记录,并将它们流式传输到 Kafka 主题。对于每个表,默认行为是连接器将所有生成的事件流式传输到该表的单独 Kafka 主题。应用程序和服务从该主题消费数据更改事件记录。

概述

PostgreSQL 的 逻辑解码 功能在 9.4 版本中引入。它是一种机制,允许通过 输出插件 以用户友好的方式提取事务日志中提交的更改并处理这些更改。输出插件使客户端能够消费更改。

PostgreSQL 连接器包含两个主要部分,它们协同工作以读取和处理数据库更改:

  • 一个逻辑解码输出插件。您可能需要安装所选的输出插件。在运行 PostgreSQL 服务器之前,必须配置一个使用您所选输出插件的复制槽。该插件可以是以下之一:

    • decoderbufs 基于 Protobuf,由 Debezium 社区维护。

    • pgoutput 是 PostgreSQL 10+ 中的标准逻辑解码输出插件。它由 PostgreSQL 社区维护,并由 PostgreSQL 本身用于 逻辑复制。此插件始终存在,因此无需安装其他库。Debezium 连接器直接将原始复制事件流解释为更改事件。

  • Java 代码(实际的 Kafka Connect 连接器),用于读取所选逻辑解码输出插件生成的更改。它使用 PostgreSQL 的 流复制协议,通过 PostgreSQL JDBC 驱动程序

连接器为捕获的每个行级插入、更新和删除操作生成一个更改事件,并将每个表的更改事件记录发送到单独的 Kafka 主题。客户端应用程序读取与感兴趣的数据库表对应的 Kafka 主题,并可以对从这些主题接收到的每个行级事件做出反应。

PostgreSQL 通常会在一段时间后清除写入预写日志 (WAL) 段。这意味着连接器无法通过仅读取 WAL 来获取所有更改的完整历史记录。因此,当 PostgreSQL 连接器首次连接到特定的 PostgreSQL 数据库时,它首先对每个数据库模式执行一致性快照。在连接器完成快照后,它将继续从快照创建的确切点流式传输更改。这样,连接器就可以从所有数据的 consistent view 开始,并且不会遗漏在快照拍摄期间进行的任何更改。

连接器对故障具有容忍性。当连接器读取更改并生成事件时,它会记录每个事件的 WAL 位置。如果连接器因任何原因停止(包括通信故障、网络问题或崩溃),则在重新启动时,连接器将从上次中断的地方继续读取 WAL。这包括快照。如果连接器在快照过程中停止,则连接器在重新启动时将开始一个新的快照。

连接器依赖并反映 PostgreSQL 逻辑解码功能,该功能具有以下限制:

  • 逻辑解码不支持 DDL 更改。这意味着连接器无法将 DDL 更改事件报告给消费者。

  • 由于逻辑解码复制槽在提交时(而不是提交后)发布更改,因此可能会发生不良副作用。在两个主要场景中,客户端可能会观察到不一致的状态。首先,在主服务器死亡之前复制未完成时发布未提交的更改。其次,由于正在复制,会发布无法读取的更改(即读后写一致性)。例如,DebeziumEngine 消费者收到已创建但无法通过事务读取的行的通知。

此外,pgoutput 逻辑解码输出插件不捕获生成列的值,导致连接器输出中缺少这些列的数据。

出现问题时的行为描述了连接器在出现问题时如何响应。

Debezium 目前仅支持 UTF-8 字符编码的数据库。使用单字节字符编码,无法正确处理包含扩展 ASCII 字符的字符串。

连接器工作原理

要最优地配置和运行 Debezium PostgreSQL 连接器,了解连接器如何执行快照、流式传输更改事件、确定 Kafka 主题名称以及使用元数据非常重要。

安全性

要使用 Debezium 连接器从 PostgreSQL 数据库流式传输更改,连接器必须在数据库中拥有特定的权限。虽然授予必要权限的一种方法是向用户提供 superuser 权限,但这可能会使您的 PostgreSQL 数据暴露于未经授权的访问。与其向 Debezium 用户授予过多权限,不如创建一个专用的 Debezium 复制用户,并授予其特定权限。

有关配置 Debezium PostgreSQL 用户权限的更多信息,请参阅 设置权限。有关 PostgreSQL 逻辑复制安全性的更多信息,请参阅 PostgreSQL 文档

快照

大多数 PostgreSQL 服务器配置为不保留 WAL 段中的数据库完整历史记录。这意味着 PostgreSQL 连接器仅通过读取 WAL 无法看到数据库的整个历史记录。因此,连接器首次启动时,它会对数据库的每个模式执行初始一致性快照

初始快照的默认工作流程行为

以下步骤描述了连接器在初始快照期间执行的默认步骤。您可以通过将 snapshot.mode 连接器配置属性设置为 initial 以外的值来更改此行为。

  1. 启动一个使用 snapshot.isolation.mode 属性指定的隔离级别的事务。指定的模式决定了此事务中后续的读取是否针对数据的单一一致版本。根据模式,其他客户端对数据进行的更改(导致后续的 INSERTUPDATEDELETE 操作)可能会在此事务中可见。

  2. 读取服务器事务日志中的当前位置。

  3. 扫描数据库表和模式,为每行生成一个 READ 事件,并将该事件写入相应的表特定 Kafka 主题。

  4. 提交事务。

  5. 在连接器偏移量中记录快照的成功完成。

如果连接器在开始步骤 1 但在完成步骤 5 之前失败、重新平衡或停止,则在重新启动时,连接器将开始一个新的快照。在连接器完成初始快照后,PostgreSQL 连接器将继续从步骤 2 中读取的位置进行流式传输。这确保连接器不会遗漏任何更新。如果连接器因任何原因再次停止,则在重新启动后,它将从之前中断的地方继续流式传输更改。

表 1. snapshot.mode 连接器配置属性的选项
Option 描述

always (始终)

连接器总是在启动时执行快照。快照完成后,连接器将从上述序列的第 3 步继续流式传输更改。此模式在以下情况下很有用:

  • 已知某些 WAL 段已被删除且不再可用。

  • 集群故障后,如果提升了新的主节点。always 快照模式确保连接器不会遗漏在新主节点提升后但在连接器在主节点上重启之前所做的任何更改。

initial(默认)

当不存在 Kafka 偏移量主题时,连接器执行数据库快照。数据库快照完成后,会写入 Kafka 偏移量主题。如果 Kafka 偏移量主题中先前存储了 LSN,连接器将从该位置继续流式传输更改。

initial_only (仅初始)

连接器执行数据库快照,并在流式传输任何更改事件记录之前停止。如果连接器已启动但未在停止前完成快照,则连接器将重新启动快照过程,并在快照完成后停止。

no_data (无数据)

连接器从不执行快照。当连接器配置为这样时,它启动后会执行以下操作:

如果 Kafka 偏移量主题中先前存储了 LSN,连接器将从该位置继续流式传输更改。如果没有存储 LSN,连接器将从 PostgreSQL 逻辑复制槽在服务器上创建的时间点开始流式传输更改。仅当您知道所有感兴趣的数据仍反映在 WAL 中时,才使用此快照模式。

never (从不)

已弃用,请参阅 no_data

when_needed (需要时)

After the connector starts, it performs a snapshot only if it detects one of the following circumstances (连接器启动后,仅当检测到以下任一情况时,它才会执行快照:)

  • It cannot detect any topic offsets. (无法检测到任何主题偏移量。)

  • A previously recorded offset specifies a log position that is not available on the server. (先前记录的偏移量指定了一个服务器上不可用的日志位置。)

configuration_based (基于配置)

Set the snapshot mode to configuration_based to control snapshot behavior through the set of connector properties that have the prefix 'snapshot.mode.configuration.based'. (将快照模式设置为 configuration_based,通过具有前缀“snapshot.mode.configuration.based”的连接器属性集来控制快照行为。)

custom (自定义)

The custom snapshot mode lets you inject your own implementation of the io.debezium.spi.snapshot.Snapshotter interface. Set the snapshot.mode.custom.name configuration property to the name provided by the name() method of your implementation. The name is specified on the classpath of your Kafka Connect cluster. If you use the DebeziumEngine, the name is included in the connector JAR file. For more information, see custom snapshotter SPI. (custom 快照模式允许您注入 io.debezium.spi.snapshot.Snapshotter 接口的自定义实现。将 snapshot.mode.custom.name 配置属性设置为实现类的 name() 方法提供的名称。该名称在 Kafka Connect 集群的类路径中指定。如果您使用 DebeziumEngine,则名称包含在连接器 JAR 文件中。有关更多信息,请参阅custom snapshotter SPI。)

临时快照

By default, a connector runs an initial snapshot operation only after it starts for the first time. Following this initial snapshot, under normal circumstances, the connector does not repeat the snapshot process. Any future change event data that the connector captures comes in through the streaming process only. (默认情况下,连接器仅在首次启动后运行初始快照操作。在初始快照之后,在正常情况下,连接器不会重复快照过程。连接器捕获的任何未来变更事件数据仅通过流式传输过程进入。)

However, in some situations the data that the connector obtained during the initial snapshot might become stale, lost, or incomplete. To provide a mechanism for recapturing table data, Debezium includes an option to perform ad hoc snapshots. You might want to perform an ad hoc snapshot after any of the following changes occur in your Debezium environment (但是,在某些情况下,连接器在初始快照期间获取的数据可能会过时、丢失或不完整。为了提供重新捕获表数据的机制,Debezium 提供了一个执行即席快照的选项。您可能希望在 Debezium 环境中发生以下任何更改后执行即席快照:)

  • The connector configuration is modified to capture a different set of tables. (修改了连接器配置以捕获不同的表集。)

  • Kafka topics are deleted and must be rebuilt. (Kafka 主题被删除且必须重建。)

  • Data corruption occurs due to a configuration error or some other problem. (由于配置错误或其他问题导致数据损坏。)

You can re-run a snapshot for a table for which you previously captured a snapshot by initiating a so-called ad hoc snapshot. Ad hoc snapshots require the use of signaling tables. You initiate an ad hoc snapshot by sending a signal request to the Debezium signaling table. (您可以通过启动所谓的即席快照来重新运行之前捕获了快照的表的快照。即席快照需要使用信号表。通过向 Debezium 信号表发送信号请求来启动即席快照。)

When you initiate an ad hoc snapshot of an existing table, the connector appends content to the topic that already exists for the table. If a previously existing topic was removed, Debezium can create a topic automatically if automatic topic creation is enabled. (当您启动现有表的即席快照时,连接器会将内容追加到该表已存在的主题中。如果之前存在的主题已被删除,并且启用了自动主题创建,则 Debezium 可以自动创建主题。)

Ad hoc snapshot signals specify the tables to include in the snapshot. The snapshot can capture the entire contents of the database, or capture only a subset of the tables in the database. Also, the snapshot can capture a subset of the contents of the table(s) in the database. (即席快照信号指定要包含在快照中的表。快照可以捕获数据库的全部内容,或仅捕获数据库中表的部分内容。此外,快照还可以捕获数据库中表的部分内容。)

You specify the tables to capture by sending an execute-snapshot message to the signaling table. Set the type of the execute-snapshot signal to incremental or blocking, and provide the names of the tables to include in the snapshot, as described in the following table (您可以通过向信号表发送 execute-snapshot 消息来指定要捕获的表。将 execute-snapshot 信号的类型设置为 incrementalblocking,并提供要在快照中包含的表名称,如下表所示:)

表 2. execute-snapshot 信号记录的示例
Field (字段) Default (默认值) Value (值)

type

incremental (增量)

Specifies the type of snapshot that you want to run. (指定您要运行的快照类型。)
Currently, you can request incremental or blocking snapshots. (目前,您可以请求 incrementalblocking 快照。)

data-collections (数据集合)

N/A

An array that contains regular expressions matching the fully-qualified names of the tables to include in the snapshot. (一个包含正则表达式的数组,匹配要包含在快照中的表的完全限定名称。)
对于 PostgreSQL 连接器,请使用以下格式指定表的完全限定名称:schema.table

additional-conditions (附加条件)

N/A

An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. (一个可选数组,指定连接器用于确定要包含在快照中的记录子集的附加条件集。)
Each additional condition is an object that specifies the criteria for filtering the data that an ad hoc snapshot captures. You can set the following parameters for each additional condition (每个附加条件是一个对象,指定即席快照捕获的数据的过滤标准。您可以为每个附加条件设置以下参数:)

data-collection (数据集合)

The fully-qualified name of the table that the filter applies to. You can apply different filters to each table. (过滤器适用的表的完全限定名称。您可以为每个表应用不同的过滤器。)

filter (过滤器)

Specifies column values that must be present in a database record for the snapshot to include it, for example, "color='blue'". (指定数据库记录中必须存在的列值,快照才能包含它,例如 "color='blue'"。)

The values that you assign to the filter parameter are the same types of values that you might specify in the WHERE clause of SELECT statements when you set the snapshot.select.statement.overrides property for a blocking snapshot. (您赋给 filter 参数的值与您在为阻塞快照设置 snapshot.select.statement.overrides 属性时可能在 SELECT 语句的 WHERE 子句中指定的类型的值相同。)

surrogate-key (代理键)

N/A

An optional string that specifies the column name that the connector uses as the primary key of a table during the snapshot process. (一个可选字符串,指定连接器在快照过程中用作表主键的列名。)

Triggering an ad hoc incremental snapshot (触发即席增量快照)

您可以通过向信号表添加一个具有 execute-snapshot 信号类型的条目,或 向 Kafka 信号主题发送信号消息 来启动临时增量快照。在连接器处理消息后,它将开始快照操作。快照过程读取第一个和最后一个主键值,并将这些值用作每个表的开始和结束点。根据表中的条目数和配置的块大小,Debezium 将表分成块,然后逐个快照每个块。

有关更多信息,请参阅 增量快照

Triggering an ad hoc blocking snapshot (触发即席阻塞快照)

You initiate an ad hoc blocking snapshot by adding an entry with the execute-snapshot signal type to the signaling table or signaling topic. After the connector processes the message, it begins the snapshot operation. The connector temporarily stops streaming, and then initiates a snapshot of the specified table, following the same process that it uses during an initial snapshot. After the snapshot completes, the connector resumes streaming. (您可以通过向信号表或信号主题添加具有 execute-snapshot 信号类型的条目来启动即席阻塞快照。在连接器处理消息后,它将开始快照操作。连接器将暂时停止流式传输,然后启动指定表的快照,遵循其在初始快照期间使用的相同过程。快照完成后,连接器将恢复流式传输。)

有关更多信息,请参阅 阻塞快照

增量快照

To provide flexibility in managing snapshots, Debezium includes a supplementary snapshot mechanism, known as incremental snapshotting. Incremental snapshots rely on the Debezium mechanism for sending signals to a Debezium connector. Incremental snapshots are based on the DDD-3 design document. (为了在管理快照时提供灵活性,Debezium 提供了一个补充快照机制,称为增量快照。增量快照依赖于 Debezium 向 Debezium 连接器发送信号的机制。增量快照基于DDD-3 设计文档。)

在增量快照中,Debezium 不是一次性捕获数据库的完整状态(如初始快照),而是分阶段、分一系列可配置的块来捕获每个表。您可以指定要快照的表以及每个块的大小。块大小决定了快照在每次从数据库获取操作期间收集的行数。增量快照的默认块大小为 1024 行。

As an incremental snapshot proceeds, Debezium uses watermarks to track its progress, maintaining a record of each table row that it captures. This phased approach to capturing data provides the following advantages over the standard initial snapshot process (随着增量快照的进行,Debezium 使用水位标记来跟踪其进度,并维护捕获的每个表行的记录。与标准的初始快照过程相比,这种分阶段的数据捕获方法提供了以下优势:)

  • You can run incremental snapshots in parallel with streamed data capture, instead of postponing streaming until the snapshot completes. The connector continues to capture near real-time events from the change log throughout the snapshot process, and neither operation blocks the other. (您可以与流式数据捕获并行运行增量快照,而不是将流式传输推迟到快照完成。在整个快照过程中,连接器会继续从变更日志中捕获近乎实时事件,并且任一操作都不会阻塞另一个操作。)

  • If the progress of an incremental snapshot is interrupted, you can resume it without losing any data. After the process resumes, the snapshot begins at the point where it stopped, rather than recapturing the table from the beginning. (如果增量快照的进度被中断,您可以恢复它而不会丢失任何数据。在过程恢复后,快照将从停止的点开始,而不是从头重新捕获表。)

  • 您可以随时按需运行增量快照,并根据需要重复该过程以适应数据库更新。例如,在修改连接器配置以将表添加到其 table.include.list 属性后,您可能会重新运行快照。

Incremental snapshot process (增量快照过程)

当您运行增量快照时,Debezium 会按主键对每个表进行排序,然后根据配置的块大小将表拆分成块。逐块工作,然后捕获每个块中的表行。对于捕获的每一行,快照都会发出一个 READ 事件。该事件代表快照开始时行的值。

As a snapshot proceeds, it’s likely that other processes continue to access the database, potentially modifying table records. To reflect such changes, INSERT, UPDATE, or DELETE operations are committed to the transaction log as per usual. Similarly, the ongoing Debezium streaming process continues to detect these change events and emits corresponding change event records to Kafka. (随着快照的进行,其他进程很可能会继续访问数据库,可能修改表记录。为了反映这些更改,INSERTUPDATEDELETE 操作照常提交到事务日志。同样,正在进行的 Debezium 流式传输过程会继续检测这些变更事件,并将相应的变更事件记录发送到 Kafka。)

How Debezium resolves collisions among records with the same primary key (Debezium 如何解决具有相同主键的记录之间的冲突)

In some cases, the UPDATE or DELETE events that the streaming process emits are received out of sequence. That is, the streaming process might emit an event that modifies a table row before the snapshot captures the chunk that contains the READ event for that row. When the snapshot eventually emits the corresponding READ event for the row, its value is already superseded. To ensure that incremental snapshot events that arrive out of sequence are processed in the correct logical order, Debezium employs a buffering scheme for resolving collisions. Only after collisions between the snapshot events and the streamed events are resolved does Debezium emit an event record to Kafka. (在某些情况下,流式传输过程发出的 UPDATEDELETE 事件可能会乱序接收。也就是说,流式传输过程可能会在快照捕获包含该行 READ 事件的块之前发出修改表行的事件。当快照最终发出行的相应 READ 事件时,其值已过时。为了确保乱序到达的增量快照事件按正确的逻辑顺序处理,Debezium 采用缓冲方案来解决冲突。仅当快照事件与流式事件之间的冲突得到解决后,Debezium 才会向 Kafka 发送事件记录。)

Snapshot window (快照窗口)

To assist in resolving collisions between late-arriving READ events and streamed events that modify the same table row, Debezium employs a so-called snapshot window. The snapshot window demarcates the interval during which an incremental snapshot captures data for a specified table chunk. Before the snapshot window for a chunk opens, Debezium follows its usual behavior and emits events from the transaction log directly downstream to the target Kafka topic. But from the moment that the snapshot for a particular chunk opens, until it closes, Debezium performs a de-duplication step to resolve collisions between events that have the same primary key. (为了帮助解决延迟到达的 READ 事件与修改同一表行的流式事件之间的冲突,Debezium 采用所谓的快照窗口。快照窗口划定了增量快照捕获指定表块数据的间隔。在某个块的快照窗口打开之前,Debezium 会遵循其常规行为,将事务日志中的事件直接向下游发送到目标 Kafka 主题。但在某个块的快照打开到关闭的整个过程中,Debezium 会执行去重步骤以解决具有相同主键的事件之间的冲突。)

For each data collection, the Debezium emits two types of events, and stores the records for them both in a single destination Kafka topic. The snapshot records that it captures directly from a table are emitted as READ operations. Meanwhile, as users continue to update records in the data collection, and the transaction log is updated to reflect each commit, Debezium emits UPDATE or DELETE operations for each change. (对于每个数据集合,Debezium 会发出两种类型的事件,并将两者的记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为 READ 操作发出。同时,随着用户继续更新数据集合中的记录,并且事务日志更新以反映每次提交,Debezium 会为每个更改发出 UPDATEDELETE 操作。)

As the snapshot window opens, and Debezium begins processing a snapshot chunk, it delivers snapshot records to a memory buffer. During the snapshot windows, the primary keys of the READ events in the buffer are compared to the primary keys of the incoming streamed events. If no match is found, the streamed event record is sent directly to Kafka. If Debezium detects a match, it discards the buffered READ event, and writes the streamed record to the destination topic, because the streamed event logically supersede the static snapshot event. After the snapshot window for the chunk closes, the buffer contains only READ events for which no related transaction log events exist. Debezium emits these remaining READ events to the table’s Kafka topic. (随着快照窗口的打开,Debezium 开始处理快照块,它会将快照记录传递到内存缓冲区。在快照窗口期间,缓冲区中 READ 事件的主键与传入的流式事件的主键进行比较。如果未找到匹配项,则将流式事件记录直接发送到 Kafka。如果 Debezium 检测到匹配,它将丢弃已缓冲的 READ 事件,并将流式记录写入目标主题,因为流式事件在逻辑上取代了静态快照事件。当块的快照窗口关闭后,缓冲区将仅包含没有相关事务日志事件的 READ 事件。Debezium 将这些剩余的 READ 事件发送到表的 Kafka 主题。)

The connector repeats the process for each snapshot chunk. (连接器对每个快照块重复此过程。)

To enable Debezium to perform incremental snapshots, you must grant the connector permission to write to the signaling table. (要使 Debezium 能够执行增量快照,您必须授予连接器写入信号表的权限。)

Write permission is unnecessary only for connectors that can be configured to perform read-only incrementals snapshots (MariaDB, MySQL, or PostgreSQL). (仅对于可以配置为执行只读增量快照的连接器(MariaDBMySQLPostgreSQL)来说,写入权限是可选的。)

Currently, you can use either of the following methods to initiate an incremental snapshot (当前,您可以使用以下任一方法启动增量快照:)

Debezium PostgreSQL 连接器不支持在增量快照运行时进行模式更改。如果模式更改发生在增量快照开始之前之后发送了信号,则 passthrough 配置选项 database.autosave 设置为 conservative 以正确处理模式更改。

触发增量快照

To initiate an incremental snapshot, you can send an ad hoc snapshot signal to the signaling table on the source database. You submit snapshot signals as SQL INSERT queries. (要启动增量快照,您可以将即席快照信号发送到源数据库上的信号表。您将快照信号作为 SQL INSERT 查询提交。)

After Debezium detects the change in the signaling table, it reads the signal, and runs the requested snapshot operation. (在 Debezium 检测到信号表中的更改后,它会读取信号并运行请求的快照操作。)

The query that you submit specifies the tables to include in the snapshot, and, optionally, specifies the type of snapshot operation. Debezium currently supports the incremental and blocking snapshot types. (您提交的查询指定了要包含在快照中的表,并可选地指定了快照操作的类型。Debezium 目前支持 incrementalblocking 快照类型。)

To specify the tables to include in the snapshot, provide a data-collections array that lists the tables, or an array of regular expressions used to match tables, for example, (要指定要包含在快照中的表,请提供一个列出表的 data-collections 数组,或者一个用于匹配表的正则表达式数组,例如:)

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

The data-collections array for an incremental snapshot signal has no default value. If the data-collections array is empty, Debezium interprets the empty array to mean that no action is required, and it does not perform a snapshot. (增量快照信号的 data-collections 数组没有默认值。如果 data-collections 数组为空,Debezium 会将空数组解释为无需执行任何操作,并且不会执行快照。)

If the name of a table that you want to include in a snapshot contains a dot (.), a space, or some other non-alphanumeric character, you must escape the table name in double quotes. (如果要包含在快照中的表名称包含点 (.)、空格或其他非字母数字字符,则必须用双引号转义表名称。)
例如,要包含存在于 public 模式中并且名为 My.Table 的表,请使用以下格式:"public.\"My.Table\""

先决条件
Using a source signaling channel to trigger an incremental snapshot (使用源信号通道触发增量快照)
  1. Send a SQL query to add the ad hoc incremental snapshot request to the signaling table (发送 SQL 查询将即席增量快照请求添加到信号表)

    INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');

    For example, (例如,)

    INSERT INTO myschema.debezium_signal (id, type, data) (1)
    values ('ad-hoc-1',   (2)
        'execute-snapshot',  (3)
        '{"data-collections": ["schema1.table1", "schema1.table2"], (4)
        "type":"incremental", (5)
        "additional-conditions":[{"data-collection": "schema1.table1" ,"filter":"color=\'blue\'"}]}'); (6)

    The values of the id,type, and data parameters in the command correspond to the fields of the signaling table. (命令中的 idtypedata 参数的值对应于信号表的字段。)
    The following table describes the parameters in the example (下表描述了示例中的参数:)

    表 3. 用于将增量 snapshot 信号发送到信号表的 SQL 命令中字段的描述
    Item Value (值) 描述

    1

    schema.debezium_signal

    Specifies the fully-qualified name of the signaling table on the source database. (指定源数据库上信号表的完全限定名称。)

    2

    ad-hoc-1

    The id parameter specifies an arbitrary string that is assigned as the id identifier for the signal request. (id 参数指定一个任意字符串,该字符串被分配为信号请求的 id 标识符。)
    Use this string to identify logging messages to entries in the signaling table. Debezium does not use this string. Rather, during the snapshot, Debezium generates its own id string as a watermarking signal. (使用此字符串将日志消息标识到信号表中的条目。Debezium 不使用此字符串。而是在快照期间,Debezium 生成自己的 id 字符串作为水位标记信号。)

    3

    execute-snapshot

    The type parameter specifies the operation that the signal is intended to trigger. (type 参数指定信号旨在触发的操作。)

    4

    data-collections (数据集合)

    A required component of the data field of a signal that specifies an array of table names or regular expressions to match table names to include in the snapshot. (信号的 data 字段的必需组件,该字段指定一个表名称数组或匹配要包含在快照中的表名称的正则表达式。)
    该数组列出了使用 schema.table 格式的正则表达式,以匹配数据集合的完全限定名。此格式与您用来指定连接器 信号表名称的格式相同。

    5

    incremental (增量)

    An optional type component of the data field of a signal that specifies the type of snapshot operation to run. (信号的 data 字段的可选 type 组件,指定要运行的快照操作的类型。)
    Valid values are incremental and blocking. (有效值为 incrementalblocking。)
    If you do not specify a value, the connector defaults to performing an incremental snapshot. (如果您不指定值,连接器将默认执行增量快照。)

    6

    additional-conditions (附加条件)

    An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. (一个可选数组,指定连接器用于确定要包含在快照中的记录子集的附加条件集。)
    Each additional condition is an object with data-collection and filter properties. You can specify different filters for each data collection. (每个附加条件是一个具有 data-collectionfilter 属性的对象。您可以为每个数据集合指定不同的过滤器。)
    * data-collection 属性是过滤器应用的查找集合的完全限定名称。有关 additional-conditions 参数的更多信息,请参阅 使用 additional-conditions 运行临时增量快照

使用 additional-conditions 运行临时增量快照

If you want a snapshot to include only a subset of the content in a table, you can modify the signal request by appending an additional-conditions parameter to the snapshot signal. (如果您希望快照仅包含表内容的子集,则可以通过将 additional-conditions 参数附加到快照信号来修改信号请求。)

The SQL query for a typical snapshot takes the following form (典型快照的 SQL 查询形式如下:)

SELECT * FROM <tableName> ....

By adding an additional-conditions parameter, you append a WHERE condition to the SQL query, as in the following example (通过添加 additional-conditions 参数,您可以将 WHERE 条件附加到 SQL 查询,如下例所示:)

SELECT * FROM <data-collection> WHERE <filter> ....

The following example shows a SQL query to send an ad hoc incremental snapshot request with an additional condition to the signaling table (以下示例显示了一个 SQL 查询,用于向信号表发送带有附加条件的即席增量快照请求:)

INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');

For example, suppose you have a products table that contains the following columns (例如,假设您有一个 products 表,其中包含以下列:)

  • id (primary key)

  • color (颜色)

  • quantity (数量)

If you want an incremental snapshot of the products table to include only the data items where color=blue, you can use the following SQL statement to trigger the snapshot (如果您希望 products 表的增量快照仅包含 color=blue 的数据项,您可以使用以下 SQL 语句来触发快照:)

INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "schema1.products", "filter": "color=blue"}]}');

The additional-conditions parameter also enables you to pass conditions that are based on more than one column. For example, using the products table from the previous example, you can submit a query that triggers an incremental snapshot that includes the data of only those items for which color=blue and quantity>10 (additional-conditions 参数还允许您传递基于多个列的条件。例如,使用前面示例中的 products 表,您可以提交一个查询来触发增量快照,其中仅包含 color=bluequantity>10 的项目数据:)

INSERT INTO myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "schema1.products", "filter": "color=blue AND quantity>10"}]}');

The following example, shows the JSON for an incremental snapshot event that is captured by a connector. (以下示例显示了连接器捕获的增量快照事件的 JSON。)

Example 1. Incremental snapshot event message (示例 1. 增量快照事件消息)
{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" (1)
    },
    "op":"r", (2)
    "ts_ms":"1620393591654",
    "ts_us":"1620393591654547",
    "ts_ns":"1620393591654547920",
    "transaction":null
}
表 4. 增量 snapshot 事件消息中字段的描述
Item Field name (字段名) 描述

1

snapshot (快照)

Specifies the type of snapshot operation to run. (指定要运行的快照操作的类型。)
Currently, the only valid options are blocking and incremental. (目前,唯一有效选项是 blockingincremental。)
Specifying a type value in the SQL query that you submit to the signaling table is optional. (在您提交给信号表的 SQL 查询中指定 type 值是可选的。)
If you do not specify a value, the connector runs an incremental snapshot. (如果您不指定值,连接器将运行增量快照。)

2

op (操作)

Specifies the event type. (指定事件类型。)
The value for snapshot events is r, signifying a READ operation. (快照事件的值为 r,表示 READ 操作。)

使用 Kafka 信号通道触发增量快照

You can send a message to the configured Kafka topic to request the connector to run an ad hoc incremental snapshot. (您可以向配置的 Kafka 主题发送消息,请求连接器运行即席增量快照。)

The key of the Kafka message must match the value of the topic.prefix connector configuration option. (Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。)

The value of the message is a JSON object with type and data fields. (消息的值是一个带有 typedata 字段的 JSON 对象。)

The signal type is execute-snapshot, and the data field must have the following fields (信号类型为 execute-snapshotdata 字段必须具有以下字段:)

表 5. 执行 snapshot 数据字段
Field (字段) Default (默认值) Value (值)

type

incremental (增量)

The type of the snapshot to be executed. Currently Debezium supports the incremental and blocking types. (要执行的快照类型。目前 Debezium 支持 incrementalblocking 类型。)
See the next section for more details. (有关更多详细信息,请参阅下一节。)

data-collections (数据集合)

N/A

An array of comma-separated regular expressions that match the fully-qualified names of tables to include in the snapshot. (一个逗号分隔的正则表达式数组,匹配要包含在快照中的表的完全限定名称。)
使用与 signal.data.collection 配置选项要求的格式相同的格式指定名称。

additional-conditions (附加条件)

N/A

An optional array of additional conditions that specifies criteria that the connector evaluates to designate a subset of records to include in a snapshot. (一个可选的附加条件数组,指定连接器评估以指定要包含在快照中的记录子集的标准。)
Each additional condition is an object that specifies the criteria for filtering the data that an ad hoc snapshot captures. You can set the following parameters for each additional condition: data-collection:: The fully-qualified name of the table that the filter applies to. You can apply different filters to each table. filter:: Specifies column values that must be present in a database record for the snapshot to include it, for example, "color='blue'". (每个附加条件是一个对象,指定即席快照捕获的数据的过滤标准。您可以为每个附加条件设置以下参数:data-collection:: 过滤器适用的表的完全限定名称。您可以为每个表应用不同的过滤器。filter:: 指定数据库记录中必须存在的列值,快照才能包含它,例如 "color='blue'"。)

The values that you assign to the filter parameter are the same types of values that you might specify in the WHERE clause of SELECT statements when you set the snapshot.select.statement.overrides property for a blocking snapshot. (您赋给 filter 参数的值与您在为阻塞快照设置 snapshot.select.statement.overrides 属性时可能在 SELECT 语句的 WHERE 子句中指定的类型的值相同。)

Example 2. An execute-snapshot Kafka message (示例 2. 一个 execute-snapshot Kafka 消息)
Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
Ad hoc incremental snapshots with additional-conditions (带有附加条件的即席增量快照)

Debezium uses the additional-conditions field to select a subset of a table’s content. (Debezium 使用 additional-conditions 字段来选择表内容的子集。)

Typically, when Debezium runs a snapshot, it runs a SQL query such as (通常,当 Debezium 运行快照时,它会运行一个 SQL 查询,例如:)

SELECT * FROM <tableName> …​.

When the snapshot request includes an additional-conditions property, the data-collection and filter parameters of the property are appended to the SQL query, for example (当快照请求包含 additional-conditions 属性时,该属性的 data-collectionfilter 参数将被附加到 SQL 查询中,例如:)

SELECT * FROM <data-collection> WHERE <filter> …​.

For example, given a products table with the columns id (primary key), color, and brand, if you want a snapshot to include only content for which color='blue', when you request the snapshot, you could add the additional-conditions property to filter the content: :leveloffset: +1 (例如,给定一个具有 id(主键)、colorbrand 列的 products 表,如果您希望快照仅包含 color='blue' 的内容,在请求快照时,您可以添加 additional-conditions 属性来过滤内容::leveloffset: +1)

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "schema1.products" ,"filter":"color='blue'"}]}}`

You can also use the additional-conditions property to pass conditions based on multiple columns. For example, using the same products table as in the previous example, if you want a snapshot to include only the content from the products table for which color='blue', and brand='MyBrand', you could send the following request: :leveloffset: +1 (您还可以使用 additional-conditions 属性来传递基于多个列的条件。例如,使用前面示例中的相同 products 表,如果您希望快照仅包含 products 表中 color='blue'brand='MyBrand' 的内容,您可以发送以下请求::leveloffset: +1)

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`

停止增量快照

In some situations, it might be necessary to stop an incremental snapshot. For example, you might realize that snapshot was not configured correctly, or maybe you want to ensure that resources are available for other database operations. You can stop a snapshot that is already running by sending a signal to the signaling table on the source database. (在某些情况下,可能需要停止增量快照。例如,您可能发现快照配置不正确,或者您想确保资源可用于其他数据库操作。您可以通过向源数据库上的信号表发送信号来停止正在运行的快照。)

You submit a stop snapshot signal to the signaling table by sending it in a SQL INSERT query. The stop-snapshot signal specifies the type of the snapshot operation as incremental, and optionally specifies the tables that you want to omit from the currently running snapshot. After Debezium detects the change in the signaling table, it reads the signal, and stops the incremental snapshot operation if it’s in progress. (您通过在 SQL INSERT 查询中发送来将停止快照信号提交到信号表。stop-snapshot 信号将快照操作的 type 指定为 incremental,并可选地指定您希望从当前正在运行的快照中排除的表。在 Debezium 检测到信号表中的更改后,它会读取信号,并在增量快照操作进行中时停止它。)

Additional resources (附加资源)

您还可以通过将 JSON 消息发送到 Kafka 信号主题来停止增量快照。

先决条件
Using a source signaling channel to stop an incremental snapshot (使用源信号通道停止增量快照)
  1. Send a SQL query to stop the ad hoc incremental snapshot to the signaling table (发送 SQL 查询以停止添加到信号表的即席增量快照)

    INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"incremental"}');

    For example, (例如,)

    INSERT INTO myschema.debezium_signal (id, type, data) (1)
    values ('ad-hoc-1',   (2)
        'stop-snapshot',  (3)
        '{"data-collections": ["schema1.table1", "schema1.table2"], (4)
        "type":"incremental"}'); (5)

    The values of the id, type, and data parameters in the signal command correspond to the fields of the signaling table. (信号命令中的 idtypedata 参数的值对应于信号表的字段。)
    The following table describes the parameters in the example (下表描述了示例中的参数:)

    表 6. 用于将停止增量 snapshot 信号发送到信号表的 SQL 命令中字段的描述
    Item Value (值) 描述

    1

    schema.debezium_signal

    Specifies the fully-qualified name of the signaling table on the source database. (指定源数据库上信号表的完全限定名称。)

    2

    ad-hoc-1

    The id parameter specifies an arbitrary string that is assigned as the id identifier for the signal request. (id 参数指定一个任意字符串,该字符串被分配为信号请求的 id 标识符。)
    Use this string to identify logging messages to entries in the signaling table. Debezium does not use this string. (使用此字符串将日志消息标识到信号表中的条目。Debezium 不使用此字符串。)

    3

    stop-snapshot

    Specifies type parameter specifies the operation that the signal is intended to trigger. (type 参数指定信号旨在触发的操作。)

    4

    data-collections (数据集合)

    An optional component of the data field of a signal that specifies an array of table names or regular expressions to match table names to remove from the snapshot. (信号的 data 字段的可选组件,指定一个表名称数组或匹配要从快照中删除的表名称的正则表达式。)
    该数组列出了匹配表完全限定名(格式为 schema.table)的正则表达式。

    If you omit this component from the data field, the signal stops the entire incremental snapshot that is in progress. (如果您从 data 字段中省略此组件,则信号将停止正在进行的整个增量快照。)

    5

    incremental (增量)

    A required component of the data field of a signal that specifies the type of snapshot operation to be stopped. (信号的 data 字段的必需组件,指定要停止的快照操作的类型。)
    Currently, the only valid option is incremental. (目前,唯一有效选项是 incremental。)
    If you do not specify a type value, the signal fails to stop the incremental snapshot. (如果您不指定 type 值,信号将无法停止增量快照。)

使用 Kafka 信号通道停止增量快照

You can send a signal message to the configured Kafka signaling topic to stop an ad hoc incremental snapshot. (您可以向配置的 Kafka 信号主题发送信号消息,以停止即席增量快照。)

The key of the Kafka message must match the value of the topic.prefix connector configuration option. (Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。)

The value of the message is a JSON object with type and data fields. (消息的值是一个带有 typedata 字段的 JSON 对象。)

The signal type is stop-snapshot, and the data field must have the following fields (信号类型为 stop-snapshotdata 字段必须具有以下字段:)

表 7. 执行 snapshot 数据字段
Field (字段) Default (默认值) Value (值)

type

incremental (增量)

The type of the snapshot to be executed. Currently Debezium supports only the incremental type. (要执行的快照类型。目前 Debezium 仅支持 incremental 类型。)
See the next section for more details. (有关更多详细信息,请参阅下一节。)

data-collections (数据集合)

N/A

An optional array of comma-separated regular expressions that match the fully-qualified names of the tables an array of table names or regular expressions to match table names to remove from the snapshot. (一个可选的逗号分隔的正则表达式数组,匹配要从快照中删除的表的表名称或匹配表名称的正则表达式数组。)
使用 schema.table 格式指定表名。

The following example shows a typical stop-snapshot Kafka message (以下示例显示了一个典型的 stop-snapshot Kafka 消息:)

Key = `test_connector`

Value = `{"type":"stop-snapshot","data": {"data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

只读增量快照

您可以配置具有只读数据库连接的 PostgreSQL 连接器,以在不需要信号数据集合表的情况下运行增量快照。要使用只读访问权限运行增量快照,连接器会将当前正在进行的事务用作高水位线和低水位线。通过将写入预写日志事件或心跳事件的事务 ID 与低水位线和高水位线进行比较来更新块窗口的状态。

要切换到只读实现,请将 read.only 属性的值设置为 true

先决条件
  • PostgreSQL 13 或更高版本。

Ad hoc read-only incremental snapshots (即席只读增量快照)

当 PostgreSQL 连接是只读的时,您可以使用任何可用的信号通道,而无需使用 source 通道。

Custom snapshotter SPI (自定义快照器 SPI)

For more advanced uses, you can fine-tune control of the snapshot by implementing one of the following interfaces (对于更高级的用途,您可以实现以下接口之一来微调快照的控制:)

io.debezium.snapshot.spi.Snapshotter

Controls whether the connector takes a snapshot. (控制连接器是否执行快照。)

io.debezium.snapshot.spi.SnapshotQuery

Controls how data is queried during a snapshot. (控制快照期间如何查询数据。)

io.debezium.snapshot.spi.SnapshotLock

Controls whether the connector locks tables when taking a snapshot. (控制连接器在进行快照时是否锁定表。)

io.debezium.snapshot.spi.Snapshotter interface. All built-in snapshot modes implement this interface. (io.debezium.snapshot.spi.Snapshotter 接口。所有内置快照模式都实现了此接口。)
/**
 * {@link Snapshotter} is used to determine the following details about the snapshot process:
 * <p>
 * - Whether a snapshot occurs. <br>
 * - Whether streaming continues during the snapshot. <br>
 * - Whether the snapshot includes schema (if supported). <br>
 * - Whether to snapshot data or schema following an error.
 * <p>
 * Although Debezium provides many default snapshot modes,
 * to provide more advanced functionality, such as partial snapshots,
 * you can customize implementation of the interface.
 * For more information, see the documentation.
 *
 *
 *
 */
@Incubating
public interface Snapshotter extends Configurable {

    /**
     * @return the name of the snapshotter.
     *
     *
     */
    String name();

    /**
     * @param offsetExists is {@code true} when the connector has an offset context (i.e. restarted)
     * @param snapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
     *
     * @return {@code true} if the snapshotter should take a data snapshot
     */
    boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress);

    /**
     * @param offsetExists is {@code true} when the connector has an offset context (i.e. restarted)
     * @param snapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
     *
     * @return {@code true} if the snapshotter should take a schema snapshot
     */
    boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress);

    /**
     * @return {@code true} if the snapshotter should stream after taking a snapshot
     */
    boolean shouldStream();

    /**
     * @return {@code true} whether the schema can be recovered if database schema history is corrupted.
     */
    boolean shouldSnapshotOnSchemaError();

    /**
     * @return {@code true} whether the snapshot should be re-executed when there is a gap in data stream.
     */
    boolean shouldSnapshotOnDataError();

    /**
     *
     * @return {@code true} if streaming should resume from the start of the snapshot
     * transaction, or {@code false} for when a connector resumes and takes a snapshot,
     * streaming should resume from where streaming previously left off.
     */
    default boolean shouldStreamEventsStartingFromSnapshot() {
        return true;
    }

    /**
     * Lifecycle hook called after the snapshot phase is successful.
     */
    default void snapshotCompleted() {
        // no operation
    }

    /**
     * Lifecycle hook called after the snapshot phase is aborted.
     */
    default void snapshotAborted() {
        // no operation
    }
}
io.debezium.snapshot.spi.SnapshotQuery interface. All built-in snapshot query modes implement this interface. (io.debezium.snapshot.spi.SnapshotQuery 接口。所有内置快照查询模式都实现了此接口。)
/**
 * {@link SnapshotQuery} is used to determine the query used during a data snapshot
 *
 *
 */
public interface SnapshotQuery extends Configurable, Service {

    /**
     * @return the name of the snapshot lock.
     *
     *
     */
    String name();

    /**
     * Generate a valid query string for the specified table, or an empty {@link Optional}
     * to skip snapshotting this table (but that table will still be streamed from)
     *
     * @param tableId the table to generate a query for
     * @param snapshotSelectColumns the columns to be used in the snapshot select based on the column
     *                              include/exclude filters
     * @return a valid query string, or none to skip snapshotting this table
     */
    Optional<String> snapshotQuery(String tableId, List<String> snapshotSelectColumns);

}
io.debezium.snapshot.spi.SnapshotLock interface. All built-in snapshot lock modes implement this interface. (io.debezium.snapshot.spi.SnapshotLock 接口。所有内置快照锁定模式都实现了此接口。)
/**
 * {@link SnapshotLock} is used to determine the table lock mode used during schema snapshot
 *
 *
 */
public interface SnapshotLock extends Configurable, Service {

    /**
     * @return the name of the snapshot lock.
     *
     *
     */
    String name();

    /**
     * Returns a SQL statement for locking the given table during snapshotting, if required by the specific snapshotter
     * implementation.
     */
    Optional<String> tableLockingStatement(Duration lockTimeout, String tableId);

}

阻塞快照

To provide more flexibility in managing snapshots, Debezium includes a supplementary ad hoc snapshot mechanism, known as a blocking snapshot. Blocking snapshots rely on the Debezium mechanism for sending signals to a Debezium connector. (为了在管理快照时提供更大的灵活性,Debezium 提供了一个补充的即席快照机制,称为阻塞快照。阻塞快照依赖于 Debezium 向 Debezium 连接器发送信号的机制。)

A blocking snapshot behaves just like an initial snapshot, except that you can trigger it at run time. (阻塞快照的行为与初始快照完全相同,只是您可以随时在运行时触发它。)

You might want to run a blocking snapshot rather than use the standard initial snapshot process in the following situations (您可能希望在以下情况下运行阻塞快照,而不是使用标准的初始快照过程:)

  • You add a new table and you want to complete the snapshot while the connector is running. (添加了一个新表,并且您希望在连接器运行时完成快照。)

  • You add a large table, and you want the snapshot to complete in less time than is possible with an incremental snapshot. (添加了一个大表,并且您希望快照的完成时间比增量快照更快。)

Blocking snapshot process (阻塞快照过程)

When you run a blocking snapshot, Debezium stops streaming, and then initiates a snapshot of the specified table, following the same process that it uses during an initial snapshot. After the snapshot completes, the streaming is resumed. (运行阻塞快照时,Debezium 会停止流式传输,然后启动指定表的快照,遵循其在初始快照期间使用的相同过程。快照完成后,流式传输将恢复。)

Configure snapshot (配置快照)

You can set the following properties in the data component of a signal (您可以在信号的 data 组件中设置以下属性:)

  • data-collections: to specify which tables must be snapshot. (data-collections:用于指定必须进行快照的表。)

  • data-collections: Specifies the tables that you want the snapshot to include. (data-collections:指定您希望快照包含的表。)
    This property accepts a comma-separated list of regular expressions that match fully-qualified table names. The behavior of the property is similar to the behavior of the table.include.list property, which specifies the tables to capture in a blocking snapshot. (此属性接受逗号分隔的正则表达式列表,这些表达式匹配完全限定的表名。此属性的行为类似于 table.include.list 属性的行为,后者指定要在阻塞快照中捕获的表。)

  • additional-conditions: You can specify different filters for different table. (additional-conditions:您可以为不同的表指定不同的过滤器。)

    • The data-collection property is the fully-qualified name of the table for which the filter will be applied, and can be case-sensitive or case-insensitive depending on the database. (data-collection 属性是过滤器将应用的表的完全限定名称,并且根据数据库的不同,可能区分大小写或不区分大小写。)

    • The filter property will have the same value used in the snapshot.select.statement.overrides, the fully-qualified name of the table that should match by case. (filter 属性将具有与 snapshot.select.statement.overrides 中使用的值相同的值,即应区分大小写匹配的表的完全限定名称。)

For example (例如:)

  {"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
Possible duplicates (可能重复)

A delay might exist between the time that you send the signal to trigger the snapshot, and the time when streaming stops and the snapshot starts. As a result of this delay, after the snapshot completes, the connector might emit some event records that duplicate records captured by the snapshot. (从您发送触发快照的信号到流式传输停止并开始快照之间可能存在延迟。由于此延迟,快照完成后,连接器可能会发出一些重复快照捕获记录的事件记录。)

流式传输更改

PostgreSQL 连接器通常花费大部分时间从其连接的 PostgreSQL 服务器流式传输更改。此机制依赖于 PostgreSQL 的复制协议。该协议使客户端能够接收服务器在其事务日志中特定位置(称为日志序列号 (LSN))提交的更改。

每当服务器提交事务时,一个单独的服务器进程会调用逻辑解码插件的回调函数。该函数处理事务的更改,将它们转换为特定格式(对于 Debezium 插件是 Protobuf 或 JSON),并将它们写入输出流,然后客户端可以消费该输出流。

Debezium PostgreSQL 连接器充当 PostgreSQL 客户端。当连接器收到更改时,它会将事件转换为 Debezium 的创建更新删除事件,其中包含事件的 LSN。PostgreSQL 连接器将这些更改事件作为记录转发给运行在同一进程中的 Kafka Connect 框架。Kafka Connect 进程以生成的顺序异步地将更改事件记录写入相应的 Kafka 主题。

Kafka Connect 会定期在另一个 Kafka 主题中记录最新的偏移量。偏移量表示 Debezium 随每个事件包含的特定于源的位置信息。对于 PostgreSQL 连接器,每个更改事件中记录的 LSN 就是偏移量。

当 Kafka Connect 正常关闭时,它会停止连接器,将所有事件记录刷新到 Kafka,并记录从每个连接器收到的最后一个偏移量。当 Kafka Connect 重新启动时,它会读取每个连接器的最后一个记录偏移量,并从其最后一个记录偏移量开始启动每个连接器。连接器重新启动时,它会向 PostgreSQL 服务器发送请求,以从该位置之后的位置开始发送事件。

PostgreSQL 连接器在逻辑解码插件发送的事件中检索模式信息。但是,连接器无法检索有关哪些列组成主键的信息。连接器从 JDBC 元数据(侧通道)获取此信息。如果表的复制键定义发生更改(通过添加、删除或重命名主键列),则会有一个很短的时间段,此时 JDBC 的主键信息与逻辑解码插件生成的更改事件不同步。在此短暂期间,可能会创建具有不一致键结构的邮件。为防止此不一致,请按以下方式更新主键结构:

  1. 将数据库或应用程序置于只读模式。

  2. 让 Debezium 处理所有剩余的事件。

  3. 停止 Debezium。

  4. 更新相关表中的主键定义。

  5. 将数据库或应用程序置于读/写模式。

  6. 重新启动 Debezium。

PostgreSQL 10+ 逻辑解码支持 (pgoutput)

从 PostgreSQL 10+ 开始,有一个称为 pgoutput 的逻辑复制流模式,由 PostgreSQL 本身支持。这意味着 Debezium PostgreSQL 连接器可以在不需要额外插件的情况下消费该复制流。这对于不支持或不允许安装插件的环境特别有价值。

有关更多信息,请参阅 设置 PostgreSQL

主题名称

默认情况下,PostgreSQL 连接器将表中发生的所有 INSERTUPDATEDELETE 操作的更改事件写入一个特定于该表的 Apache Kafka 主题。连接器使用以下约定来命名更改事件主题:

topicPrefix.schemaName.tableName

以下列表提供了默认名称组件的定义:

topicPrefix

topic.prefix 配置属性指定的主题前缀

schemaName (模式名称)

更改事件发生的数据库模式的名称。

tableName

更改事件发生的数据库表的名称。

例如,假设 fulfillment 是一个连接器配置中的逻辑服务器名称,该连接器捕获 PostgreSQL 安装中的更改,该安装具有 postgres 数据库和 inventory 模式,其中包含四个表:productsproducts_on_handcustomersorders。连接器会将记录流式传输到这四个 Kafka 主题:

  • fulfillment.inventory.products

  • fulfillment.inventory.products_on_hand

  • fulfillment.inventory.customers

  • fulfillment.inventory.orders

现在假设这些表不属于特定模式,而是创建在默认的 public PostgreSQL 模式中。Kafka 主题的名称将是:

  • fulfillment.public.products

  • fulfillment.public.products_on_hand

  • fulfillment.public.customers

  • fulfillment.public.orders

连接器应用类似的命名约定来标记其事务元数据主题

如果默认主题名称不符合您的要求,您可以配置自定义主题名称。要配置自定义主题名称,请在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 自定义主题命名的更多信息,请参阅 主题路由

事务元数据

Debezium 可以生成表示事务边界的事件,并丰富数据更改事件消息。

Debezium 接收事务元数据的限制

Debezium 仅为部署连接器后发生的事务注册和接收元数据。部署连接器之前发生的事务的元数据不可用。

对于每个事务 BEGINEND,Debezium 会生成一个包含以下字段的事件:

status

BEGINEND

id

唯一事务标识符的字符串表示形式,由 Postgres 事务 ID 本身和给定操作的 LSN 组成,中间用冒号分隔,即格式为 txID:LSN

ts_ms

数据源中事务边界事件(BEGINEND 事件)的时间。如果数据源未向 Debezium 提供事件时间,则该字段将代表 Debezium 处理事件的时间。

event_count(针对 END 事件)

事务发出的事件总数。

data_collections(针对 END 事件)

一对 data_collectionevent_count 元素的数组,指示连接器为源自数据集合的更改发出的事件数量。

示例
{
  "status": "BEGIN",
  "id": "571:53195829",
  "ts_ms": 1486500577125,
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "571:53195832",
  "ts_ms": 1486500577691,
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "s1.a",
      "event_count": 1
    },
    {
      "data_collection": "s2.a",
      "event_count": 1
    }
  ]
}

除非通过 topic.transaction 选项覆盖,否则事务事件将写入名为 <topic.prefix>.transaction 的主题。

更改数据事件丰富

启用事务元数据后,数据消息 Envelope 将使用新的 transaction 字段进行丰富。该字段以字段的复合形式提供关于每个事件的信息:

id

唯一事务标识符的字符串表示。

total_order

事件在事务生成的所有事件中的绝对位置。

data_collection_order

事件在事务发出的所有事件中,每个数据集合的位置。

以下是消息示例

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
   ...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "ts_us": "1580390884335451",
  "ts_ns": "1580390884335451325",
  "transaction": {
    "id": "571:53195832",
    "total_order": "1",
    "data_collection_order": "1"
  }
}

数据更改事件

Debezium PostgreSQL 连接器为每个行级 INSERTUPDATEDELETE 操作生成一个数据更改事件。每个事件包含一个键和一个值。键和值的结构取决于已更改的表。

Debezium 和 Kafka Connect 的设计围绕着连续的事件消息流。但是,这些事件的结构可能会随时间变化,这对于使用者来说很难处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果您使用的是模式注册表,则包含一个模式 ID,使用者可以使用该 ID 从注册表中获取模式。这使得每个事件都是自包含的。

以下骨架 JSON 显示了更改事件的基本四个部分。但是,您如何配置应用程序中使用的 Kafka Connect 转换器决定了这四个部分在更改事件中的表示。schema 字段仅在配置转换器生成它时才存在于更改事件中。同样,只有配置转换器生成事件键和事件有效负载时,它们才会在更改事件中出现。如果您使用 JSON 转换器并配置它生成所有四个基本更改事件部分,则更改事件将具有此结构:

{
 "schema": { (1)
   ...
  },
 "payload": { (2)
   ...
 },
 "schema": { (3)
   ...
 },
 "payload": { (4)
   ...
 },
}
表 8. 更改事件基本内容概述
Item Field name (字段名) 描述

1

schema

第一个 schema 字段是事件键的一部分。它指定了一个 Kafka Connect 模式,该模式描述了事件键的 payload 部分的内容。换句话说,第一个 schema 字段描述了已更改表的

主键(或唯一键,如果表没有主键)的结构。



可以通过设置 message.key.columns 连接器配置属性来覆盖表的复制键。在这种情况下,第一个模式字段描述了该属性标识的键的结构。

2

payload

第一个 payload 字段是事件键的一部分。它的结构由前面的 schema 字段描述,它包含已更改行的键。

3

schema

第二个 schema 字段是事件值的一部分。它指定了一个 Kafka Connect 模式,该模式描述了事件值 payload 部分的内容。换句话说,第二个 schema 描述了已更改行的结构。通常,此模式包含嵌套模式。

4

payload

第二个 payload 字段是事件值的一部分。它的结构由上一个 schema 字段描述,并且包含已更改行的实际数据。

默认行为是连接器将更改事件记录流式传输到名称与事件的源表相同的主题

从 Kafka 0.10 开始,Kafka 可以选择记录事件键和值以及消息创建(由生产者记录)或由 Kafka 写入日志的*时间戳*。

PostgreSQL 连接器确保所有 Kafka Connect 模式名称都符合 Avro 模式名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称的每个剩余字符以及模式和表名称的每个字符都必须是拉丁字母、数字或下划线,即 a-z、A-Z、0-9 或 \_。如果存在无效字符,它将被替换为下划线字符。

如果逻辑服务器名称、schema 名称或表名称包含无效字符,并且区分名称的唯一字符是无效的,因此被替换为下划线,则这可能导致意外冲突。

更改事件键

对于给定的表,更改事件的键具有一个结构,其中包含事件创建时表中主键中每个列的一个字段。或者,如果表的 REPLICA IDENTITY 设置为 FULLUSING INDEX,则有一个字段对应每个唯一键约束。

考虑 public 数据库模式中定义的 customers 表以及该表更改事件键的示例。

示例表
CREATE TABLE customers (
  id SERIAL,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL,
  PRIMARY KEY(id)
);
示例更改事件键

如果 topic.prefix 连接器配置属性的值为 PostgreSQL_server,则 customers 表在具有此定义时,该表的所有更改事件都具有相同的键结构,在 JSON 中如下所示:

{
  "schema": { (1)
    "type": "struct",
    "name": "PostgreSQL_server.public.customers.Key", (2)
    "optional": false, (3)
    "fields": [ (4)
          {
              "name": "id",
              "index": "0",
              "schema": {
                  "type": "INT32",
                  "optional": "false"
              }
          }
      ]
  },
  "payload": { (5)
      "id": "1"
  },
}
表 9. 更改事件键描述
Item Field name (字段名) 描述

1

schema

键的模式部分指定了一个 Kafka Connect 模式,该模式描述了键的 payload 部分的内容。此模式描述了已更改表的

主键的结构。

2

PostgreSQL_server.inventory.customers.Key

定义键的有效负载结构的模式的名称。此模式描述了已更改表的

主键的结构。键模式名称的格式为 connector-name.database-name.table-name.Key。在此示例中:


  • PostgreSQL_server 是生成此事件的连接器的名称。

  • inventory 是包含已更改表的数据库。

  • customers 是已更新的表。

3

optional

指示事件键的 payload 字段是否必须包含值。在此示例中,需要一个值在键的有效负载中。当表没有主键时,键的有效负载字段中的值是可选的。

4

fields (字段)

指定 payload 中预期的每个字段,包括每个字段的名称、索引和 schema。

5

payload

包含生成此更改事件的行的键。在此示例中,键包含一个名为 id 的字段,其值为 1

尽管 column.exclude.listcolumn.include.list 连接器配置属性允许您仅捕获表列的子集,但主键或唯一键中的所有列始终包含在事件的键中。

如果表没有主键或唯一键,则更改事件的键为 null。没有主键或唯一键约束的表中的行无法唯一标识。

更改事件值

更改事件中的值比键要复杂一些。与键一样,值也包含 schema 部分和 payload 部分。schema 部分包含描述 payload 部分的 Envelope 结构的模式,包括其嵌套字段。对于创建、更新或删除数据的操作的更改事件,其值有效负载都具有信封结构。

考虑用于显示更改事件键示例的相同样本表。

CREATE TABLE customers (
  id SERIAL,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL,
  PRIMARY KEY(id)
);

对于此表更改的更改事件的值部分根据 REPLICA IDENTITY 设置和事件的操作而异。

副本标识

REPLICA IDENTITY 是 PostgreSQL 特定的表级设置,它确定逻辑解码插件可用于 UPDATEDELETE 事件的信息量。更具体地说,REPLICA IDENTITY 的设置控制在发生 UPDATEDELETE 事件时,对于表列的先前值(如果有)可用的信息。

REPLICA IDENTITY 有 4 个可能的值:

  • DEFAULT - 默认行为是,如果表有主键,则 UPDATEDELETE 事件包含表主键列的先前值。对于 UPDATE 事件,仅包含已更改值的那些主键列。

    如果表没有主键,连接器不会为该表发出 UPDATEDELETE 事件。对于没有主键的表,连接器仅发出创建事件。通常,没有主键的表用于将消息追加到表末尾,这意味着 UPDATEDELETE 事件没有用。

  • NOTHING - 发出的 UPDATEDELETE 操作的事件不包含有关任何表列先前值的信息。

  • FULL - 发出的 UPDATEDELETE 操作的事件包含表中所有列的先前值。

  • INDEX index-name - 发出的 UPDATEDELETE 操作的事件包含指定索引中的列的先前值。UPDATE 事件还包含已更新值的索引列。

创建事件

以下示例显示了连接器为在 customers 表中创建数据的操作生成的更改事件的值部分:

{
    "schema": { (1)
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "PostgreSQL_server.inventory.customers.Value", (2)
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "first_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "last_name"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "email"
                    }
                ],
                "optional": true,
                "name": "PostgreSQL_server.inventory.customers.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "connector"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ms"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_us"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ns"
                    },
                    {
                        "type": "boolean",
                        "optional": true,
                        "default": false,
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "schema"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "table"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "txId"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "lsn"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "xmin"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.postgresql.Source", (3)
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_us"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ns"
            }
        ],
        "optional": false,
        "name": "PostgreSQL_server.inventory.customers.Envelope" (4)
    },
    "payload": { (5)
        "before": null, (6)
        "after": { (7)
            "id": 1,
            "first_name": "Anne",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": { (8)
            "version": "3.3.1.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "ts_us": 1559033904863123,
            "ts_ns": 1559033904863123000,
            "snapshot": true,
            "db": "postgres",
            "sequence": "[\"24023119\",\"24023128\"]",
            "schema": "public",
            "table": "customers",
            "txId": 555,
            "lsn": 24023128,
            "xmin": null
        },
        "op": "c", (9)
        "ts_ms": 1559033904863, (10)
        "ts_us": 1559033904863841, (10)
        "ts_ns": 1559033904863841257 (10)
    }
}
表 10. 创建 事件值字段描述
Item Field name (字段名) 描述

1

schema

描述值有效负载结构的

值模式。更改事件的值模式对于连接器为特定表生成的每个更改事件都相同。

2

name (名称)

schema 部分,每个 name 字段指定了值有效负载中字段的模式。

PostgreSQL_server.inventory.customers.Value 是 payload 的 beforeafter 字段的模式。此模式特定于 customers 表。

beforeafter 字段的模式名称格式为 logicalName.tableName.Value,这确保了模式名称在数据库中是唯一的。这意味着在使用Avro 转换器时,每个逻辑源中每个表的最终 Avro 模式都有其自己的演变和历史记录。

3

name (名称)

io.debezium.connector.postgresql.Source 是 payload 的 source 字段的模式。此模式特定于 PostgreSQL 连接器。连接器将其用于它生成的所有事件。

4

name (名称)

PostgreSQL_server.inventory.customers.Envelope 是 payload 的整体结构的模式,其中 PostgreSQL_server 是连接器名称,inventory 是数据库,customers 是表。

5

payload

的实际数据。这是更改事件提供的信息。



事件的 JSON 表示可能比它们描述的行大得多。这是因为 JSON 表示必须包含消息的模式和有效负载部分。但是,通过使用Avro 转换器,您可以显著减小连接器流式传输到 Kafka 主题的消息的大小。

6

before

一个可选字段,指定事件发生前行的状态。当 op 字段是创建的 c 时(如本例所示),before 字段为 null,因为此更改事件针对的是新内容。

此字段是否可用取决于每个表的 REPLICA IDENTITY 设置。

7

after

一个可选字段,指定事件发生后行的状态。在此示例中,after 字段包含新行idfirst_namelast_nameemail 列的值。

8

source (源)

描述事件源元数据的

必需字段。此字段包含可用于将此事件与其他事件进行比较的信息,关于事件的来源、事件发生的顺序以及事件是否属于同一事务。源元数据包括:

  • Debezium 版本

  • 连接器类型和名称

  • 包含新行的数据库和表。

  • 字符串化的 JSON 数组,包含其他偏移量信息。第一个值始终是最后提交的 LSN,第二个值始终是当前 LSN。任一值可能为 null

  • 模式名称。

  • 如果事件属于快照

  • 执行操作的事务 ID。

  • 数据库日志中操作的偏移量

  • 数据库中发生更改的时间戳

9

op (操作)

描述导致连接器生成事件的操作类型的

必需字符串。在此示例中,c 表示已创建行。有效值为:

  • c = create

  • u = update

  • d = delete

  • r = read(仅适用于快照)

  • t = truncate(截断)

  • m = message(消息)

10

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 指示数据库中发生更改的时间。通过比较 payload.source.ts_ms 的值和 payload.ts_ms 的值,您可以确定源数据库更新和 Debezium 之间的延迟。

更新事件

对于样本 customers 表中的更新,更改事件的值具有与该表的创建事件相同的模式。同样,事件值

的有效负载也具有相同的结构。但是,在更新事件中,事件值有效负载包含不同的值。以下是一个更改事件值在连接器为 customers 表中的更新生成的事件中的示例:

{
    "schema": { ... },
    "payload": {
        "before": { (1)
            "id": 1
        },
        "after": { (2)
            "id": 1,
            "first_name": "Anne Marie",
            "last_name": "Kretchmar",
            "email": "annek@noanswer.org"
        },
        "source": { (3)
            "version": "3.3.1.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "ts_us": 1559033904863769,
            "ts_ns": 1559033904863769000,
            "snapshot": false,
            "db": "postgres",
            "schema": "public",
            "table": "customers",
            "txId": 556,
            "lsn": 24023128,
            "xmin": null
        },
        "op": "u", (4)
        "ts_ms": 1465584025523,  (5)
        "ts_us": 1465584025523514,  (5)
        "ts_ns": 1465584025523514964,  (5)
    }
}
表 11. 更新 事件值字段描述
Item Field name (字段名) 描述

1

before

一个可选字段,包含数据库提交之前的行中的值。在此示例中,仅存在主键列 id,因为该表的 REPLICA IDENTITY 设置默认为 DEFAULT

要使更新事件包含行中所有列的先前值,您需要通过运行 ALTER TABLE customers REPLICA IDENTITY FULL 来更改 customers 表。

2

after

一个可选字段,指定事件发生后行的状态。在此示例中,first_name 的值现在是 Anne Marie

3

source (源)

一个强制性字段,描述事件的源元数据。source 字段结构与*create* 事件中的字段相同,但某些值不同。源元数据包括:

  • Debezium 版本

  • 连接器类型和名称

  • 包含新行的数据库和表。

  • 模式名称。

  • 如果事件是快照的一部分(更新事件始终为 false

  • 执行操作的事务 ID。

  • 数据库日志中操作的偏移量

  • 数据库中发生更改的时间戳

4

op (操作)

描述操作类型的

必需字符串。在更新事件值中,op 字段值为 u,表示此行因更新而更改。

5

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 指示数据库中发生更改的时间。通过比较 payload.source.ts_ms 的值和 payload.ts_ms 的值,您可以确定源数据库更新和 Debezium 之间的延迟。

更新行主键的列会更改行键的值。当键更改时,Debezium 会输出三个事件:一个带有旧键的 DELETE 事件和一个墓碑事件,然后是一个带有新键的事件。详细信息将在下一节中介绍。

主键更新

更改行主键字段的 UPDATE 操作称为主键更改。对于主键更改,连接器不会发送 UPDATE 事件记录,而是发送一个带有旧键的 DELETE 事件记录和一个带有新(已更新)键的 CREATE 事件记录。这些事件具有常规的结构和内容,此外,每个事件都有一个与主键更改相关的消息头。

  • DELETE 事件记录具有 __debezium.newkey 作为消息头。此标头的值是更新行的

    新主键。

  • CREATE 事件记录具有 __debezium.oldkey 作为消息头。此标头的值是更新行以前(旧)的主键。

删除事件

删除更改事件中的值具有与

同一表的创建更新事件相同的 schema 部分。对于样本 customers 表,删除事件中的 payload 部分如下所示:

{
    "schema": { ... },
    "payload": {
        "before": { (1)
            "id": 1
        },
        "after": null, (2)
        "source": { (3)
            "version": "3.3.1.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "ts_us": 1559033904863852,
            "ts_ns": 1559033904863852000,
            "snapshot": false,
            "db": "postgres",
            "schema": "public",
            "table": "customers",
            "txId": 556,
            "lsn": 46523128,
            "xmin": null
        },
        "op": "d", (4)
        "ts_ms": 1465581902461, (5)
        "ts_us": 1465581902461496, (5)
        "ts_ns": 1465581902461496187, (5)
    }
}
表 12. 删除 事件值字段描述
Item Field name (字段名) 描述

1

before

一个可选字段,指定事件发生前行的状态。在删除事件值中,before 字段包含在数据库提交删除行之前行中的值。

在此示例中,before 字段仅包含主键列,因为该表的 REPLICA IDENTITY 设置为 DEFAULT

2

after

一个可选字段,指定事件发生后行的状态。在删除事件值中,after 字段为 null,表示该行不再存在。

3

source (源)

一个强制性字段,描述事件的源元数据。在*delete* 事件值中,source 字段结构与同一表的*create* 和*update* 事件相同。许多 source 字段值也相同。在*delete* 事件值中,ts_mslsn 字段值以及其他值可能已更改。但是,*delete* 事件值中的 source 字段提供相同的元数据:

  • Debezium 版本

  • 连接器类型和名称

  • 包含已删除行的数据库和表

  • 模式名称。

  • 如果事件是快照的一部分(删除事件始终为 false

  • 执行操作的事务 ID。

  • 数据库日志中操作的偏移量

  • 数据库中发生更改的时间戳

4

op (操作)

描述操作类型的

必需字符串。op 字段值为 d,表示此行已被删除。

5

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 指示数据库中发生更改的时间。通过比较 payload.source.ts_ms 的值和 payload.ts_ms 的值,您可以确定源数据库更新和 Debezium 之间的延迟。

*delete* 更改事件记录为使用者提供了处理该行删除所需的信息。

为了使消费者能够处理为没有主键的表生成的删除事件,请将表的 REPLICA IDENTITY 设置为 FULL。当表没有主键且表的 REPLICA IDENTITY 设置为 DEFAULTNOTHING 时,删除事件没有 before 字段。

PostgreSQL 连接器事件设计用于与Kafka 日志压缩配合使用。日志压缩允许移除一些较旧的消息,只要保留每个键至少最新的消息。这使 Kafka 能够回收存储空间,同时确保主题包含完整的数据集并可用于重新加载基于键的状态。

Tombstone events (墓碑事件)

当行被删除时,删除事件值仍然可以与日志压缩一起使用,因为 Kafka 可以移除具有相同键的所有早期消息。但是,为了让 Kafka 移除具有相同键的所有消息,消息值必须为 null。为了实现这一点,PostgreSQL 连接器在删除事件之后会跟一个特殊的墓碑事件,该事件具有相同的键但值为 null

截断事件

截断更改事件表示表已被截断。在这种情况下,消息键为 null,消息值如下所示:

{
    "schema": { ... },
    "payload": {
        "source": { (1)
            "version": "3.3.1.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "ts_us": 1559033904863112,
            "ts_ns": 1559033904863112000,
            "snapshot": false,
            "db": "postgres",
            "schema": "public",
            "table": "customers",
            "txId": 556,
            "lsn": 46523128,
            "xmin": null
        },
        "op": "t", (2)
        "ts_ms": 1559033904961, (3)
        "ts_us": 1559033904961654, (3)
        "ts_ns": 1559033904961654789 (3)
    }
}
表 13. 截断事件值字段描述
Item Field name (字段名) 描述

1

source (源)

描述事件源元数据的

必需字段。在截断事件值中,source 字段结构与

同一表的创建更新删除事件相同,提供了此元数据:

  • Debezium 版本

  • 连接器类型和名称

  • 包含新行的数据库和表。

  • 模式名称。

  • 如果事件是快照的一部分(删除事件始终为 false

  • 执行操作的事务 ID。

  • 数据库日志中操作的偏移量

  • 数据库中发生更改的时间戳

2

op (操作)

描述操作类型的

必需字符串。op 字段值为 t,表示此表被截断。

3

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 指示数据库中发生更改的时间。通过比较 payload.source.ts_ms 的值和 payload.ts_ms 的值,您可以确定源数据库更新和 Debezium 之间的延迟。

如果单个 TRUNCATE 操作影响多个表,连接器会为每个被截断的表发出一个截断更改事件记录。

截断事件表示对整个表进行的更改,并且没有消息键。因此,对于具有多个分区的

主题,对于与表相关的更改事件(创建更新等)或截断事件,没有排序保证。例如,如果使用者从多个分区读取表的事件,它可能会在从另一个分区接收到删除表中所有数据的截断事件后,从一个分区接收到表的更新事件。仅保证使用单个分区的主题的排序。

如果您不希望连接器捕获截断事件,请使用 skipped.operations 选项将其过滤掉。

消息事件

此事件类型仅通过 Postgres 14+ 上的 pgoutput 插件支持(Postgres 文档)。

消息事件表示一个通用逻辑解码消息已直接插入 WAL,通常使用 pg_logical_emit_message 函数。消息键是具有单个字段 prefixStruct,包含插入消息时指定的名称。对于事务性消息,消息值如下所示:

{
    "schema": { ... },
    "payload": {
        "source": { (1)
            "version": "3.3.1.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "ts_us": 1559033904863879,
            "ts_ns": 1559033904863879000,
            "snapshot": false,
            "db": "postgres",
            "schema": "",
            "table": "",
            "txId": 556,
            "lsn": 46523128,
            "xmin": null
        },
        "op": "m", (2)
        "ts_ms": 1559033904961, (3)
        "ts_us": 1559033904961621, (3)
        "ts_ns": 1559033904961621379, (3)
        "message": { (4)
            "prefix": "foo",
            "content": "Ymfy"
        }
    }
}

与其他事件类型不同,非事务性消息将不具有任何关联的 BEGINEND 事务事件。对于非事务性消息,消息值如下所示:

{
    "schema": { ... },
    "payload": {
        "source": { (1)
            "version": "3.3.1.Final",
            "connector": "postgresql",
            "name": "PostgreSQL_server",
            "ts_ms": 1559033904863,
            "ts_us": 1559033904863762,
            "ts_ns": 1559033904863762000,
            "snapshot": false,
            "db": "postgres",
            "schema": "",
            "table": "",
            "lsn": 46523128,
            "xmin": null
        },
        "op": "m", (2)
        "ts_ms": 1559033904961, (3)
        "ts_us": 1559033904961741, (3)
        "ts_ns": 1559033904961741698, (3)
        "message": { (4)
            "prefix": "foo",
            "content": "Ymfy"
    }
}
表 14. 消息事件值字段描述
Item Field name (字段名) 描述

1

source (源)

描述事件源元数据的必填字段。在消息事件值中,source 字段结构将不包含任何消息事件的 tableschema 信息,并且只有在消息事件是事务性的时才会有 txId

  • Debezium 版本

  • 连接器类型和名称

  • 数据库名称

  • 模式名称(对于消息事件始终为 ""

  • 表名称(对于消息事件始终为 ""

  • 如果事件是快照的一部分(消息事件始终为 false

  • 执行操作的事务 ID(非事务性消息事件为 null

  • 数据库日志中操作的偏移量

  • 事务性消息:消息插入 WAL 的时间戳

  • 非事务性消息:连接器遇到消息的时间戳

2

op (操作)

描述操作类型的必填字符串。op 字段值为 m,表示这是一个消息事件。

3

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

对于事务性消息事件,source 对象中的 ts_ms 属性表示事务性消息事件在数据库中发生更改的时间。通过比较 payload.source.ts_mspayload.ts_ms 的值,可以确定源数据库更新和 Debezium 之间的延迟。

对于非事务性消息事件,source 对象中的 ts_ms 表示连接器遇到消息事件的时间,而 payload.ts_ms 表示连接器处理事件的时间。此差异是由于 Postgres 的通用逻辑消息格式中不存在提交时间戳,并且非事务性逻辑消息前面没有 BEGIN 事件(其中包含时间戳信息)。

4

message(消息)

包含消息元数据的字段

  • Prefix(前缀)(文本)

  • Content(内容)(字节数组,根据二进制处理模式设置进行编码)

数据类型映射

PostgreSQL 连接器使用与行所在的表相似的结构来表示行的更改。事件包含每个列值的字段。该值如何在事件中表示取决于列的 PostgreSQL 数据类型。以下各节描述了连接器如何将 PostgreSQL 数据类型映射到事件字段中的字面类型语义类型

  • *字面类型*描述了如何使用 Kafka Connect schema 类型:INT8INT16INT32INT64FLOAT32FLOAT64BOOLEANSTRINGBYTESARRAYMAPSTRUCT 来表示值。

  • 语义类型使用 Kafka Connect 模式的名称描述了 Kafka Connect 模式如何捕获字段的*含义*。

如果默认数据类型转换不满足您的需求,您可以创建自定义转换器

基本类型

下表描述了连接器如何映射基本类型。

表 15. PostgreSQL 基本数据类型映射
PostgreSQL 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

BOOLEAN

BOOLEAN

n/a

BIT(1)

BOOLEAN

n/a

BIT( > 1)

BYTES

io.debezium.data.Bits

length 模式参数包含一个表示比特数的整数。生成的 byte[] 以小端序包含比特,其大小足以容纳指定的比特数。例如,numBytes = n/8 + (n % 8 == 0 ? 0 : 1),其中 n 是比特数。

BIT VARYING[(M)]

BYTES

io.debezium.data.Bits

length 模式参数包含一个表示比特数的整数(如果没有为列指定长度,则为 2^31 - 1)。生成的 byte[] 以小端序包含比特,其大小基于内容。指定的尺寸 (M) 存储在 io.debezium.data.Bits 类型的 length 参数中。

SMALLINT, SMALLSERIAL

INT16

n/a

INTEGER, SERIAL

INT32

n/a

BIGINT, BIGSERIAL, OID

INT64

n/a

REAL

FLOAT32

n/a

DOUBLE PRECISION

FLOAT64

n/a

CHAR[(M)]

STRING

n/a

VARCHAR[(M)]

STRING

n/a

CHARACTER[(M)]

STRING

n/a

BPCHAR[(M)]

STRING

n/a

CHARACTER VARYING[(M)]

STRING

n/a

TIMESTAMPTZ, TIMESTAMP WITH TIME ZONE

STRING

io.debezium.time.ZonedTimestamp

包含时区信息的带时区时间戳的字符串表示形式,其中时区为 GMT。

TIMETZ, TIME WITH TIME ZONE

STRING

io.debezium.time.ZonedTime

包含时区信息的时间值的字符串表示形式,其中时区为 GMT。

INTERVAL [P]

INT64

io.debezium.time.MicroDuration
(默认)

使用 365.25 / 12.0 公式计算的平均每月天数的时间间隔的近似微秒数。

INTERVAL [P]

STRING

io.debezium.time.Interval
(当 interval.handling.mode 设置为 string 时)

间隔值的字符串表示形式,遵循 P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S 的模式,例如,P1Y2M3DT4H5M6.78S

BYTEA

BYTESSTRING

n/a

原始字节(默认)、base64 编码字符串、base64-url 安全编码字符串或十六进制编码字符串,具体取决于连接器的 二进制处理模式设置。

Debezium 仅支持值配置为 hex 的 Postgres bytea_output。有关 PostgreSQL 二进制数据类型的更多信息,请参阅 PostgreSQL 文档

JSON, JSONB

STRING

io.debezium.data.Json

包含 JSON 文档、数组或标量值的字符串表示形式。

XML

STRING

io.debezium.data.Xml

包含 XML 文档的字符串表示形式。

UUID

STRING

io.debezium.data.Uuid

包含 PostgreSQL UUID 值的字符串表示形式。

POINT

STRUCT

io.debezium.data.geometry.Point

包含一个具有两个 FLOAT64 字段 (x,y) 的结构。每个字段代表一个几何点的坐标。

LTREE

STRING

io.debezium.data.Ltree

包含 PostgreSQL LTREE 值的字符串表示形式。

CITEXT

STRING

n/a

INET

STRING

n/a

INT4RANGE

STRING

n/a

整数范围。

INT8RANGE

STRING

n/a

bigint 范围。

NUMRANGE

STRING

n/a

numeric 范围。

TSRANGE

STRING

n/a

包含不带时区的 timestamp 范围的字符串表示形式。

TSTZRANGE

STRING

n/a

包含带本地系统时区的 timestamp 范围的字符串表示形式。

DATERANGE

STRING

n/a

包含日期范围的字符串表示形式。它始终具有排他的上限。

ENUM

STRING

io.debezium.data.Enum

包含 PostgreSQL ENUM 值的字符串表示形式。允许值的集合在 allowed 模式参数中维护。

时间类型

除了 PostgreSQL 的 TIMESTAMPTZTIMETZ 数据类型(包含时区信息)之外,时间类型的映射方式取决于 time.precision.mode 连接器配置属性的值。以下各节描述了这些映射:

time.precision.mode=adaptive

time.precision.mode 属性设置为 adaptive(默认值)时,连接器根据列的数据类型定义确定字面类型和语义类型。这确保事件精确表示数据库中的值。

表 16. time.precision.modeadaptive 时的映射
PostgreSQL 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

DATE

INT32

io.debezium.time.Date

表示自 epoch 以来的天数。

TIME(1), TIME(2), TIME(3)

INT32

io.debezium.time.Time

表示午夜后的毫秒数,不包含时区信息。

TIME(4)TIME(5)TIME(6)

INT64

io.debezium.time.MicroTime

表示午夜后的微秒数,不包含时区信息。

TIMESTAMP(1), TIMESTAMP(2), TIMESTAMP(3)

INT64

io.debezium.time.Timestamp

表示自 epoch 以来的毫秒数,不包含时区信息。

TIMESTAMP(4), TIMESTAMP(5), TIMESTAMP(6), TIMESTAMP

INT64

io.debezium.time.MicroTimestamp

表示自 epoch 以来的微秒数,不包含时区信息。

time.precision.mode=adaptive_time_microseconds

time.precision.mode 配置属性设置为 adaptive_time_microseconds 时,连接器根据列的数据类型定义确定时间类型的字面类型和语义类型。这确保事件精确表示数据库中的值,除了所有 TIME 字段都捕获为微秒。

表 17. time.precision.modeadaptive_time_microseconds 时的映射
PostgreSQL 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

DATE

INT32

io.debezium.time.Date

表示自 epoch 以来的天数。

TIME([P])

INT64

io.debezium.time.MicroTime

表示微秒为单位的时间值,不包含时区信息。PostgreSQL 允许精度 P 的范围是 0-6,以存储到微秒精度。

TIMESTAMP(1), TIMESTAMP(2), TIMESTAMP(3)

INT64

io.debezium.time.Timestamp

表示自纪元以来的毫秒数,不包含时区信息。

TIMESTAMP(4), TIMESTAMP(5), TIMESTAMP(6), TIMESTAMP

INT64

io.debezium.time.MicroTimestamp

表示自纪元以来的微秒数,不包含时区信息。

time.precision.mode=connect

time.precision.mode 配置属性设置为 connect 时,连接器使用 Kafka Connect 逻辑类型。当消费者只能处理内置的 Kafka Connect 逻辑类型而无法处理可变精度的时间值时,这可能很有用。但是,由于 PostgreSQL 支持微秒精度,当数据库列的小数秒精度值大于 3 时,具有 connect 时间精度模式的连接器生成的事件将导致精度损失

表 18. time.precision.modeconnect 时的映射
PostgreSQL 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

DATE

INT32

org.apache.kafka.connect.data.Date

表示自 epoch 以来的天数。

TIME([P])

INT64

org.apache.kafka.connect.data.Time

表示自午夜以来的毫秒数,不包含时区信息。PostgreSQL 允许 P 的范围是 0-6,以存储到微秒精度,但当 P 大于 3 时,此模式会导致精度损失。

TIMESTAMP([P])

INT64

org.apache.kafka.connect.data.Timestamp

表示自 epoch 以来的毫秒数,不包含时区信息。PostgreSQL 允许 P 的范围是 0-6,以存储到微秒精度,但当 P 大于 3 时,此模式会导致精度损失。

time.precision.mode=isostring

time.precision.mode 属性设置为 isostring,以将连接器配置为将时间类型的值映射为 ISO-8601 格式的字符串(UTC 时间)。应用此设置后,连接器使用语义类型 io.debezium.time.IsoTimestampio.debezium.time.IsoTimeio.debezium.time.IsoDate 来映射时间戳、日期时间、日期和时间值。

表 19. time.precision.modeisostring 时的映射
PostgreSQL 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

DATE

STRING

io.debezium.time.IsoDate

根据 ISO 8601 标准,以 UTC 格式表示日期值,例如 2017-09-15Z

TIME([P])

STRING

io.debezium.time.IsoTime

根据 ISO 8601 标准,以 UTC 格式表示时间值,例如 04:05:11.789Z

TIMESTAMP([P])

STRING

io.debezium.time.IsoTimestamp

根据 ISO 8601 标准,以 UTC 格式表示时间戳值,例如 2019-07-09T02:28:57.123456Z

time.precision.mode=microseconds

time.precision.mode 属性设置为 microseconds,以将连接器配置为以微秒精度指定时间类型的值。应用此设置后,连接器使用语义类型 io.debezium.time.MicroTimeio.debezium.time.MicroTimestamp 来映射时间戳、日期时间(datetime)和时间值。

表 20. time.precision.modemicroseconds 时的映射
PostgreSQL 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

DATE

INT32

io.debezium.time.Date

表示自 epoch 以来的天数。

TIME([P])

INT64

io.debezium.time.MicroTime

表示微秒为单位的时间值,不包含时区信息。在 PostgreSQL 中,精度 p 参数指定时间值秒部分的小数位数。精度范围可以从 0(无小数秒)到 6(微秒精度)。

TIMESTAMP([P])

INT64

io.debezium.time.MicroTimestamp

表示自 epoch 以来的微秒数,不包含时区信息。在 PostgreSQL 中,精度 p 参数指定时间值秒部分的小数位数。精度范围可以从 0(无小数秒)到 6(微秒精度)。

time.precision.mode=nanoseconds

time.precision.mode 属性设置为 nanoseconds,以将连接器配置为以纳秒精度指定时间类型的值。应用此设置后,连接器使用语义类型 io.debezium.time.NanoTimeio.debezium.time.NanoTimestamp(存储纳秒精度值)来映射时间戳、日期时间(datetime)和时间值。

表 21. time.precision.modenanoseconds 时的映射
PostgreSQL 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

DATE

INT32

io.debezium.time.Date

表示自 epoch 以来的天数。

TIME([P])

INT64

io.debezium.time.NanoTime

表示纳秒为单位的时间值,不包含时区信息。在 PostgreSQL 中,精度 p 参数指定时间值秒部分的小数位数。精度范围可以从 0(无小数秒)到 6(微秒精度)。

TIMESTAMP([P])

INT64

io.debezium.time.NanoTimestamp

表示自 epoch 以来的纳秒数,不包含时区信息。在 PostgreSQL 中,精度 p 参数指定时间值秒部分的小数位数。精度范围可以从 0(无小数秒)到 6(微秒精度)。

TIMESTAMP 类型

TIMESTAMP 类型表示不带时区信息的时间戳。此类列根据 UTC 转换为等效的 Kafka Connect 值。例如,当 time.precision.mode 未设置为 connect 时,“2018-06-20 15:13:16.945104” 的 TIMESTAMP 值由 io.debezium.time.MicroTimestamp 表示,其值为“1529507596945104”。

运行 Kafka Connect 和 Debezium 的 JVM 的时区不影响此转换。

PostgreSQL 支持在 TIMESTAMP 列中使用 +/-infinite 值。当为正无穷大时,这些特殊值转换为时间戳,值为 9223372036825200000;当为负无穷大时,值为 -9223372036832400000。此行为模仿了 PostgreSQL JDBC 驱动程序的标准行为。有关参考,请参阅 org.postgresql.PGStatement 接口。

Decimal 类型

PostgreSQL 连接器配置属性 decimal.handling.mode 的设置决定了连接器如何映射 decimal 类型。

decimal.handling.mode 属性设置为 precise 时,连接器将为所有 DECIMALNUMERICMONEY 列使用 Kafka Connect org.apache.kafka.connect.data.Decimal 逻辑类型。这是默认模式。

表 22. decimal.handling.modeprecise 时的映射
PostgreSQL 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

NUMERIC[(M[,D])]

BYTES

org.apache.kafka.connect.data.Decimal

scale 模式参数包含一个整数,表示小数点移动了多少位。

DECIMAL[(M[,D])]

BYTES

org.apache.kafka.connect.data.Decimal

scale 模式参数包含一个整数,表示小数点移动了多少位。

MONEY[(M[,D])]

BYTES

org.apache.kafka.connect.data.Decimal

scale 模式参数包含一个整数,表示小数点移动了多少位。scale 模式参数由 money.fraction.digits 连接器配置属性确定。

对此规则有一个例外。当 NUMERICDECIMAL 类型在没有精度约束的情况下使用时,来自数据库的值对于每个值都有不同的(可变)精度。在这种情况下,连接器使用 io.debezium.data.VariableScaleDecimal,它同时包含 transferred value 的值和精度。

表 23. 没有精度约束时 DECIMALNUMERIC 类型的映射
PostgreSQL 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

NUMERIC

STRUCT

io.debezium.data.VariableScaleDecimal

包含一个具有两个字段的结构:类型为 INT32scale 字段,包含传输值的精度;类型为 BYTESvalue 字段,包含原始值(未缩放形式)。

DECIMAL

STRUCT

io.debezium.data.VariableScaleDecimal

包含一个具有两个字段的结构:类型为 INT32scale 字段,包含传输值的精度;类型为 BYTESvalue 字段,包含原始值(未缩放形式)。

decimal.handling.mode 属性设置为 double 时,连接器将所有 DECIMALNUMERICMONEY 值表示为 Java double 值,并按以下表所示进行编码。

表 24. decimal.handling.modedouble 时的映射
PostgreSQL 数据类型 字面类型 (模式类型) 语义类型(模式名称)。

NUMERIC[(M[,D])]

FLOAT64

DECIMAL[(M[,D])]

FLOAT64

MONEY[(M[,D])]

FLOAT64

decimal.handling.mode 配置属性的最后一个可能设置为 string。在这种情况下,连接器将 DECIMALNUMERICMONEY 值表示为它们的格式化字符串表示形式,并按以下表所示进行编码。

表 25. decimal.handling.modestring 时的映射
PostgreSQL 数据类型 字面类型 (模式类型) 语义类型(模式名称)。

NUMERIC[(M[,D])]

STRING

DECIMAL[(M[,D])]

STRING

MONEY[(M[,D])]

STRING

PostgreSQL 支持 NaN(非数字)作为存储在 DECIMAL/NUMERIC 值中的特殊值,当 decimal.handling.mode 设置为 stringdouble 时。在这种情况下,连接器将 NaN 编码为 Double.NaN 或字符串常量 NAN

HSTORE 类型

PostgreSQL 连接器配置属性 hstore.handling.mode 的设置决定了连接器如何映射 HSTORE 值。

hstore.handling.mode 属性设置为 json(默认值)时,连接器将 HSTORE 值表示为 JSON 值的字符串表示形式,并按以下表所示进行编码。当 hstore.handling.mode 属性设置为 map 时,连接器使用 MAP 模式类型处理 HSTORE 值。

表 26. HSTORE 数据类型映射
PostgreSQL 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

HSTORE

STRING

io.debezium.data.Json

示例:使用 JSON 转换器时的输出表示形式是 {"key" : "val"}

HSTORE

MAP

n/a

示例:使用 JSON 转换器时的输出表示形式是 {"key" : "val"}

域类型

PostgreSQL 支持基于其他底层类型的用户定义类型。当使用此类列类型时,Debezium 会根据完整的类型层次结构公开列的表示。

捕获使用 PostgreSQL 域类型的列中的更改需要特别考虑。当列定义为包含扩展自默认数据库类型的域类型,并且该域类型定义了自定义长度或精度时,生成的模式将继承该定义的长度或精度。

当列定义为包含扩展自另一个定义了自定义长度或精度的域类型的域类型时,生成的模式将继承定义的长度或精度,因为 PostgreSQL 驱动程序的列元数据中不包含该信息。

网络地址类型

PostgreSQL 具有可以存储 IPv4、IPv6 和 MAC 地址的数据类型。最好使用这些类型而不是纯文本类型来存储网络地址。网络地址类型提供输入错误检查以及专门的运算符和函数。

表 27. 网络地址类型映射
PostgreSQL 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

INET

STRING

n/a

IPv4 和 IPv6 网络

CIDR

STRING

n/a

IPv4 和 IPv6 主机和网络

MACADDR

STRING

n/a

MAC 地址

MACADDR8

STRING

n/a

EUI-64 格式的 MAC 地址

PostGIS 类型

PostgreSQL 连接器支持所有 PostGIS 数据类型

表 28. PostGIS 数据类型映射
PostGIS 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

GEOMETRY
(平面)

STRUCT

io.debezium.data.geometry.Geometry

包含一个具有两个字段的结构:

  • srid (INT32) - 空间参考系统标识符,用于定义结构中存储的几何对象类型。

  • wkb (BYTES) - 使用 Well-Known-Binary 格式编码的几何对象的二进制表示。

有关格式详细信息,请参阅 Open Geospatial Consortium Simple Features Access 规范

GEOGRAPHY
(球面)

STRUCT

io.debezium.data.geometry.Geography

包含一个具有两个字段的结构:

  • srid (INT32) - 空间参考系统标识符,用于定义结构中存储的地理对象类型。

  • wkb (BYTES) - 使用 Well-Known-Binary 格式编码的几何对象的二进制表示。

有关格式详细信息,请参阅 Open Geospatial Consortium Simple Features Access 规范

pgvector 类型

PostgreSQL 连接器支持所有 pgvector 扩展数据类型

表 29. PostgreSQL pgvector 数据类型映射
pgvector 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

VECTOR

ARRAY (FLOAT64)

io.debezium.data.DoubleVector

HALFVEC

ARRAY (FLOAT32)

io.debezium.data.FloatVector

SPARSEVEC

STRUCT

io.debezium.data.SparseVector

包含一个具有以下字段的结构:

dimensions(维度)(INT16)

稀疏向量的总长度。

vector(MAP (INT16, FLOAT64))

表示稀疏向量的映射。
每个映射值包含以下元素:

  • 向量元素的索引号(从 1 开始)。

  • 向量元素的值。

TOAST 值

PostgreSQL 对页面大小有限制。这意味着大于 8 KB 的值需要使用 TOAST 存储来存储。这会影响来自数据库的复制消息。使用 TOAST 机制存储且未更改的值不包含在消息中,除非它们是表复制标识的一部分。Debezium 没有安全的方法可以从数据库中带外直接读取丢失的值,因为这可能会导致竞态条件。因此,Debezium 遵循以下规则来处理 TOAST 值:

  • 具有 REPLICA IDENTITY FULL 的表 - TOAST 列值与任何其他列一样,包含在更改事件的 beforeafter 字段中。

  • 具有 REPLICA IDENTITY DEFAULT 的表 - 当从数据库接收 UPDATE 事件时,未更改的 TOAST 列值(如果不是复制标识的一部分)将不包含在事件中。类似地,当接收 DELETE 事件时,before 字段中不包含任何 TOAST 列(如果有)。由于 Debezium 无法安全地提供列值,因此连接器返回一个占位符值,该值由连接器配置属性 unavailable.value.placeholder 定义。

默认值

如果在数据库模式中为列指定了默认值,PostgreSQL 连接器将在可能的情况下尝试将此值传播到 Kafka 模式。支持大多数常见数据类型,包括:

  • BOOLEAN

  • 数字类型(INTFLOATNUMERIC 等)

  • 文本类型(CHARVARCHARTEXT 等)

  • 时间类型(DATETIMEINTERVALTIMESTAMPTIMESTAMPTZ

  • JSONJSONBXML

  • UUID

请注意,对于时间类型,默认值的解析由 PostgreSQL 库提供;因此,PostgreSQL 通常支持的任何字符串表示形式也应该由连接器支持。

如果默认值是由函数生成的,而不是直接内联指定的,则连接器将改为导出给定数据类型的 0 的等效值。这些值包括:

  • BOOLEANFALSE

  • 数字类型的 0(具有适当的精度)

  • 文本/XML 类型的空字符串

  • JSON 类型的 {}

  • DATETIMESTAMPTIMESTAMPTZ 类型的 1970-01-01

  • TIME00:00

  • INTERVALEPOCH

  • UUID00000000-0000-0000-0000-000000000000

当前,此支持仅限于显式使用函数。例如,支持带括号的 CURRENT_TIMESTAMP(6),但不支持 CURRENT_TIMESTAMP

默认值传播的支持主要允许在使用 PostgreSQL 连接器与强制执行模式版本之间兼容性的模式注册表时进行安全的模式演进。由于这个主要问题以及不同插件的刷新行为,Kafka 模式中存在的默认值不能保证始终与数据库模式中的默认值同步。

  • 默认值可能会“迟到”出现在 Kafka 模式中,具体取决于某个插件何时/如何触发内存中模式的刷新。如果默认值在刷新之间更改了多次,则可能永远不会出现/跳过默认值。

  • 默认值可能“提前”出现在 Kafka 模式中,如果连接器在有待处理记录时触发了模式刷新。这是因为列元数据是在刷新时从数据库读取的,而不是存在于复制消息中。如果连接器落后并且发生刷新,或者在连接器停止一段时间而源数据库继续写入更新时,在连接器启动时可能会发生这种情况。

此行为可能出乎意料,但仍然是安全的。仅受影响的是模式定义,而消息中存在的实际值将与写入源数据库的值保持一致。

自定义转换器

默认情况下,Debezium 不会复制自定义数据类型的列(例如,使用 SQL CREATE TYPE 语句创建的复合类型)的数据。要复制具有自定义数据类型的列,请遵循创建自定义转换器的说明,但有几个重要注意事项:

  • 将连接器配置中的 include.unknown.datatypes 属性设置为 true。默认的 false 设置会导致自定义转换器始终返回 null 值。

  • 传递给转换器的值类型取决于为复制槽配置的逻辑解码输出插件。

    • decoderbufs 传递列数据的字节数组(byte[])表示。

    • pgoutput 传递列数据的字符串表示。

设置 PostgreSQL

在使用 PostgreSQL 连接器监视提交到 PostgreSQL 服务器的更改之前,请决定您打算使用哪个逻辑解码插件。如果您计划使用原生的 pgoutput 逻辑复制流支持,则必须将逻辑解码插件安装到 PostgreSQL 服务器中。之后,启用复制槽,并配置一个具有足够权限执行复制的用户。

如果您的数据库由 Heroku Postgres 等服务托管,您可能无法安装插件。如果这样,并且您使用的是 PostgreSQL 10+,您可以使用 pgoutput 解码器支持来捕获数据库中的更改。如果这不是一个选项,您将无法将 Debezium 与您的数据库一起使用。

云端 PostgreSQL

Amazon RDS 上的 PostgreSQL

可以捕获在 Amazon RDS 中运行的 PostgreSQL 数据库中的更改。为此:

  • 将实例参数 rds.logical_replication 设置为 1

  • 通过以数据库 RDS 主用户身份运行查询 SHOW wal_level 来验证 wal_level 参数是否设置为 logical。在多区域复制设置中可能不是这样。您无法手动设置此选项。当 rds.logical_replication 参数设置为 1 时,它会被自动更改。如果更改上述设置后 wal_level 未设置为 logical,可能是因为在参数组更改后需要重新启动实例。重新启动在您的维护窗口中发生,或者您可以手动启动重新启动。

  • 将 Debezium plugin.name 参数设置为 pgoutput

  • 从具有 rds_replication 角色的 AWS 账户发起逻辑复制。该角色授予管理逻辑槽和使用逻辑槽流式传输数据的权限。默认情况下,只有 AWS 上的主用户账户在 Amazon RDS 上拥有 rds_replication 角色。要允许主账户以外的用户账户发起逻辑复制,您必须授予该账户 rds_replication 角色。例如,grant rds_replication to <my_user>。您必须拥有 superuser 访问权限才能将 rds_replication 角色授予用户。要允许主账户以外的账户创建初始快照,您必须对要捕获的表授予 SELECT 权限。有关 PostgreSQL 逻辑复制安全性的更多信息,请参阅 PostgreSQL 文档

Azure 上的 PostgreSQL

可以使用 Debezium 与 Azure Database for PostgreSQL 配合使用,它支持 Debezium 支持的 pgoutput 逻辑解码 插件。

将 Azure 复制支持设置为 logical。您可以使用 Azure CLIAzure Portal 进行配置。例如,要使用 Azure CLI,以下是您需要执行的 az postgres server 命令:

az postgres server configuration set --resource-group mygroup --server-name myserver --name azure.replication_support --value logical

az postgres server restart --resource-group mygroup --name myserver

Cloud SQL for PostgreSQL

要将 Debezium 与 Cloud SQL for PostgreSQL 配合使用,您必须将数据库配置为使用 pgoutput 逻辑解码插件。

以下各节概述了为将 Cloud SQL for PostgreSQL 数据库用于 Debezium 而必须完成的任务。

设置 cloudsql.logical_decoding 标志

在 Cloud SQL 中,您可以通过将 cloudsql.logical_decoding 标志设置为 on 来启用逻辑解码。设置标志后,它会自动将 wal_level 配置参数调整为 logical

您可以使用 Google Cloud 控制台或 gcloud 命令行工具来修改 cloudsql.logical_decoding 标志。

有关如何更改 Cloud SQL 中标志值的详细说明,请参阅 Google Cloud SQL 文档

要验证设置值是否反映了您的更改,请运行以下查询:

SHOW wal_level;
创建复制用户

要使用逻辑解码功能,您必须创建一个具有 REPLICATION 属性的 PostgreSQL 用户,或将此属性授予现有用户。

要创建具有 REPLICATION 属性的用户:

postgres 用户或 cloudsqlsuperuser 用户组的成员身份登录,并运行以下命令:

CREATE USER replication_user WITH REPLICATION IN ROLE cloudsqlsuperuser LOGIN PASSWORD 'secret';
要为现有用户设置 REPLICATION 属性:

postgres 用户或 cloudsqlsuperuser 用户组的成员身份登录,并运行以下命令:

ALTER USER existing_user WITH REPLICATION;
指定 Debezium plugin.name

在 Debezium 连接器配置中,将 plugin.name 属性的值设置为 pgoutput,如下例所示:

{
      ..
      "plugin.name": "pgoutput",
      ..
      ..
}

CrunchyBridge 上的 PostgreSQL

可以使用 Debezium 与 CrunchyBridge 配合使用;逻辑复制已启用。pgoutput 插件可用。您需要创建一个复制用户并提供正确的权限。

在使用 pgoutput 插件时,建议您将 filtered 配置为 publication.autocreate.mode。如果您使用 all_tables(这是 publication.autocreate.mode 的默认值),并且找不到发布,连接器会尝试使用 CREATE PUBLICATION <publication_name> FOR ALL TABLES; 来创建它,但这会因为权限不足而失败。

安装逻辑解码输出插件

有关设置和测试逻辑解码插件的更详细说明,请参阅 PostgreSQL 的逻辑解码输出插件安装

从 PostgreSQL 9.4 开始,读取写入预写日志的唯一方法是安装逻辑解码输出插件。插件用 C 语言编写,编译后安装在运行 PostgreSQL 服务器的机器上。插件使用多个 PostgreSQL 特定 API,如 PostgreSQL 文档所述。

PostgreSQL 连接器与 Debezium 支持的逻辑解码插件之一配合使用,以接收 Protobuf 格式或 pgoutput 格式的数据库更改事件。pgoutput 插件随 PostgreSQL 数据库一同提供。有关通过 decoderbufs 插件使用 Protobuf 的更多详细信息,请参阅插件 文档,其中讨论了其要求、限制以及如何编译。

为简单起见,Debezium 还提供了一个基于上游 PostgreSQL 服务器映像的容器映像,在其之上编译并安装了插件。您可以使用此映像作为安装所需详细步骤的示例。

Debezium 逻辑解码插件仅在 Linux 机器上安装和测试过。对于 Windows 和其他操作系统,可能需要不同的安装步骤。

插件差异

插件行为并非在所有情况下都完全相同。已识别出这些差异

  • 虽然所有插件在流式传输过程中检测到模式更改时都会从数据库刷新模式元数据,但 pgoutput 插件在触发此类刷新方面更为“积极”。例如,对列的默认值进行的更改将触发 pgoutput 的刷新,而其他插件在另一个更改触发刷新(例如,添加新列)之前不会注意到此更改。这是由于 pgoutput 的行为,而不是 Debezium 本身。

所有最新的差异都跟踪在一个测试套件的 Java 类中。

配置 PostgreSQL 服务器

如果您使用的 逻辑解码插件 不是 pgoutput,在安装它之后,请按以下方式配置 PostgreSQL 服务器:

  1. 要使插件在启动时加载,请将以下内容添加到 postgresql.conf 文件:

    # MODULES
    shared_preload_libraries = 'decoderbufs' (1)
    1 指示服务器在启动时加载 decoderbufs 逻辑解码插件(插件的名称在 Protobuf make 文件中设置)。
  2. 要配置复制槽,无论使用何种解码器,请在 postgresql.conf 文件中指定以下内容:

    # REPLICATION
    wal_level = logical             (1)
    1 指示服务器使用逻辑解码和预写日志。

根据您的需求,在使用 Debezium 时,您可能需要设置其他 PostgreSQL 流式复制参数。例如,max_wal_sendersmax_replication_slots 用于增加可以同时访问发送服务器的连接器数量,wal_keep_size 用于限制复制槽可以保留的最大 WAL 大小。有关配置流式复制的更多信息,请参阅 PostgreSQL 文档

Debezium 使用 PostgreSQL 的逻辑解码,它使用复制槽。复制槽保证即使在 Debezium 停机期间也能保留 Debezium 所需的所有 WAL 段。因此,密切监控复制槽以避免过多的磁盘消耗和其他可能发生的情况(例如,如果复制槽闲置太久而导致目录膨胀)非常重要。有关更多信息,请参阅 PostgreSQL 流式复制文档

连接到 Postgres 主节点(即,不是读取副本)并且 Postgres 版本为 17 或更高版本时,Debezium 将创建一个启用了 故障转移 的复制槽。这意味着在发生故障时,Debezium 可以继续从提升为主节点的副本读取更改,而不会丢失任何事件。这需要同步槽的状态从主服务器到副本服务器;有关详细信息,请参阅 Postgres 文档。

如果您使用的是 synchronous_commit 设置,而不是 on,则建议将 wal_writer_delay 设置为 10 毫秒等值,以实现低延迟的更改事件。否则,将应用默认值,这会增加约 200 毫秒的延迟。

设置权限

设置一个 PostgreSQL 服务器来运行 Debezium 连接器需要一个可以执行复制的数据库用户。复制只能由具有适当权限的用户执行,并且仅限于配置的主机数量。

虽然默认情况下,超级用户具有必要的 REPLICATIONLOGIN 角色,但如 安全 中所述,最好不要为 Debezium 复制用户提供提升的权限。相反,创建一个具有最低必需权限的 Debezium 用户。

先决条件
  • PostgreSQL 管理权限。

过程
  1. 要为用户提供复制权限,请定义一个至少具有 REPLICATIONLOGIN 权限的 PostgreSQL 角色,然后将该角色授予用户。例如:

    CREATE ROLE <name> REPLICATION LOGIN;

设置权限以允许 Debezium 在使用 pgoutput 时创建 PostgreSQL 发布

如果您使用 pgoutput 作为逻辑解码插件,Debezium 必须以具有特定权限的用户身份在数据库中运行。

Debezium 从为表创建的发布中流式传输 PostgreSQL 源表的更改事件。发布包含从一个或多个表中生成的更改事件的过滤集。每个发布中的数据根据发布规范进行过滤。该规范可以由 PostgreSQL 数据库管理员或 Debezium 连接器创建。为了允许 Debezium PostgreSQL 连接器创建发布并指定要复制到其中的数据,连接器必须在数据库中以特定权限运行。

有几种确定发布如何创建的选项。通常,最好在设置连接器之前手动创建要捕获的表的发布。但是,您可以配置您的环境,以便 Debezium 可以自动创建发布,并指定添加到其中的数据。

Debezium 使用包含列表和排除列表属性来指定数据如何插入到发布中。有关允许 Debezium 创建发布的选项的更多信息,请参阅 publication.autocreate.mode

为了让 Debezium 创建 PostgreSQL 发布,它必须以具有以下权限的用户身份运行:

  • 数据库中的复制权限,用于将表添加到发布。

  • 数据库上的 CREATE 权限,用于添加发布。

  • 表上的 SELECT 权限,用于复制初始表数据。表所有者自动拥有该表的 SELECT 权限。

要将表添加到发布,用户必须是表的拥有者。但由于源表已存在,您需要一种机制来与原始拥有者共享所有权。要启用共享所有权,请创建一个 PostgreSQL 复制组,然后将现有表的所有者和复制用户添加到该组。

过程
  1. 创建一个复制组。

    CREATE ROLE <replication_group>;
  2. 将表的原始所有者添加到组。

    GRANT REPLICATION_GROUP TO <original_owner>;
  3. 将 Debezium 复制用户添加到组。

    GRANT REPLICATION_GROUP TO <replication_user>;
  4. 将表的拥有权转移给 <replication_group>

    ALTER TABLE <table_name> OWNER TO REPLICATION_GROUP;

为了让 Debezium 指定捕获配置,publication.autocreate.mode 的值必须设置为 filtered

配置 PostgreSQL 以允许与 Debezium 连接器主机进行复制

为了让 Debezium 能够复制 PostgreSQL 数据,您必须配置数据库以允许与运行 PostgreSQL 连接器的主机进行复制。要指定允许复制到数据库的客户端,请向基于主机的 PostgreSQL 身份验证文件 pg_hba.conf 添加条目。有关 pg_hba.conf 文件的更多信息,请参阅 PostgreSQL 文档。

过程
  • 将条目添加到 pg_hba.conf 文件,以指定允许复制到数据库主机的 Debezium 连接器主机。例如:

    pg_hba.conf 文件示例
    local   replication     <youruser>                          trust   (1)
    host    replication     <youruser>  127.0.0.1/32            trust   (2)
    host    replication     <youruser>  ::1/128                 trust   (3)
    表 30. pg_hba.conf 设置说明
    Item 描述

    1

    指示服务器允许本地用户 <youruser> 进行复制,即在服务器机器上。

    2

    指示服务器允许 localhost 上的用户 <youruser> 使用 IPV4 接收复制更改。

    3

    指示服务器允许 localhost 上的用户 <youruser> 使用 IPV6 接收复制更改。

有关网络掩码的更多信息,请参阅 PostgreSQL 文档

支持的 PostgreSQL 拓扑

PostgreSQL 连接器可以与独立的 PostgreSQL 服务器或 PostgreSQL 服务器集群一起使用。

PostgreSQL 15 或更早版本的集群

当您在运行 PostgreSQL 15 或更早版本的环境中部署 Debezium 时,您只能在集群的主服务器上配置逻辑复制槽。您不能在集群的副本服务器上配置逻辑复制。

因此,Debezium PostgreSQL 连接器只能连接和通信主服务器。如果主服务器发生故障,连接器将停止。要从故障中恢复,您必须修复集群,然后要么将原始主服务器提升为 primary,要么将另一个 PostgreSQL 服务器提升为 primary。有关更多信息,请参阅 故障后从新的 pg15 主服务器捕获数据

PostgreSQL 16 或更高版本的集群

当您使用 PostgreSQL 16 或更高版本集群部署 Debezium 时,您可以在副本服务器上设置逻辑复制槽。此功能使 Debezium 能够从主服务器以外的服务器捕获更改事件。但是,请注意,Debezium 连接到副本服务器的延迟通常高于连接到主服务器的延迟。

另外,请记住,PostgreSQL 副本服务器上的复制槽不会自动与主服务器上的相应槽同步。为了在 PostgreSQL 16 集群中发生故障后进行恢复,您应该定期执行手动同步,将备用服务器上的复制槽位置推进到与主服务器上的位置匹配。

Debezium 与 PostgreSQL 17 或更高版本集群

当您使用 PostgreSQL 17 或更高版本部署 Debezium 时,您可以在主服务器上设置逻辑复制槽,并为这些槽启用故障转移。PostgreSQL 可以自动将故障转移槽的状态传播到一个或多个副本服务器。在启用自动复制的环境中,如果发生故障,可用副本将自动提升为主节点。Debezium 可以继续从新主节点摄取更改,而无需任何配置更改,从而确保连接器不会错过任何事件。

WAL 磁盘空间消耗

在某些情况下,WAL 文件占用的 PostgreSQL 磁盘空间可能会急剧增加或超出常规比例。这种情况有多种可能的原因:

  • 连接器已接收数据的 LSN 在服务器的 pg_replication_slots 视图的 confirmed_flush_lsn 列中可用。早于此 LSN 的数据不再可用,数据库负责收回磁盘空间。

    同样在 pg_replication_slots 视图中,restart_lsn 列包含连接器可能需要的 LSN 最早的 WAL。如果 confirmed_flush_lsn 的值定期增加而 restart_lsn 的值滞后,则数据库需要收回空间。

    数据库通常以批块的方式收回磁盘空间。这是预期行为,用户无需采取任何措施。

  • 正在跟踪的数据库中有许多更新,但只有极少数更新与连接器正在捕获更改的表/模式相关。可以通过定期心跳事件轻松解决这种情况。设置 heartbeat.interval.ms 连接器配置属性。

    为了让连接器检测和处理来自心跳表事件,您必须将该表添加到 publication.name 属性指定的 PostgreSQL 发布中。如果此发布早于您的 Debezium 部署,连接器将使用已定义的发布。如果发布尚未配置为自动复制数据库中的所有表的更改,您必须明确将心跳表添加到发布中,例如:

    ALTER PUBLICATION <publicationName> ADD TABLE <heartbeatTableName>;

  • PostgreSQL 实例包含多个数据库,其中一个数据库是高流量数据库。Debezium 捕获另一个低流量数据库的更改,与之相比。然后 Debezium 无法确认 LSN,因为复制槽是按数据库工作的,并且 Debezium 没有被调用。由于 WAL 被所有数据库共享,因此使用的量会不断增长,直到 Debezium 正在捕获更改的数据库发出事件。为克服此问题,需要:

    • 通过 heartbeat.interval.ms 连接器配置属性启用定期心跳记录生成。

    • 定期从 Debezium 正在捕获更改的数据库发出更改事件。

    然后,一个单独的进程将定期更新该表,通过插入新行或重复更新同一行。PostgreSQL 然后调用 Debezium,它会确认最新的 LSN 并允许数据库收回 WAL 空间。此任务可以通过 heartbeat.action.query 连接器配置属性来自动化。

对于 AWS RDS with PostgreSQL 的用户,在空闲环境中可能会发生类似于高流量/低流量场景的情况。AWS RDS 会频繁(5 分钟)地将其自身系统表的写入对客户端隐藏。同样,定期发出事件可以解决此问题。

为同一数据库服务器设置多个连接器

Debezium 使用复制槽从数据库流式传输更改。这些复制槽以 LSN(日志序列号)的形式维护当前位置,该 LSN 是指向 Debezium 连接器正在使用的 WAL 中某个位置的指针。这有助于 PostgreSQL 在 Debezium 处理之前保留 WAL。单个复制槽只能由单个使用者或进程存在,因为不同的使用者可能有不同的状态,并且可能需要来自不同位置的数据。

由于一个复制槽只能由一个连接器使用,因此为每个 Debezium 连接器创建唯一的复制槽至关重要。尽管当连接器不活动时,Postgres 可能会允许其他连接器使用复制槽,但这可能很危险,因为它可能导致数据丢失,因为一个槽只会发出每个更改一次[更多信息]。

除了复制槽之外,在使用 pgoutput 插件时,Debezium 还使用发布来流式传输事件。与复制槽类似,发布是在数据库级别定义的,并且是为一组表定义的。因此,您需要为每个连接器一个唯一的发布,除非连接器处理相同的表集。有关允许 Debezium 创建发布的选项的更多信息,请参阅 publication.autocreate.mode

请参阅 slot.namepublication.name,了解如何为每个连接器设置唯一的复制槽名称和发布名称。

升级 PostgreSQL

当您升级 Debezium 使用的 PostgreSQL 数据库时,您必须采取特定步骤来防止数据丢失并确保 Debezium 继续运行。一般来说,Debezium 对网络故障和其他中断造成的干扰具有弹性。例如,当连接器监视的数据库服务器停止或崩溃时,在连接器重新建立与 PostgreSQL 服务器的通信后,它会继续从日志序列号 (LSN) 偏移量记录的最后一个位置读取。连接器从 Kafka Connect 偏移量主题检索有关最后一个记录偏移量的信息,并查询配置的 PostgreSQL 复制槽以获取具有相同值的日志序列号 (LSN)。

为了让连接器启动并捕获 PostgreSQL 数据库的更改事件,必须存在一个复制槽。然而,作为 PostgreSQL 升级过程的一部分,复制槽会被删除,并且在升级完成后不会恢复原始槽。因此,当连接器重新启动并请求复制槽的最后一个已知偏移量时,PostgreSQL 无法返回信息。

您可以创建一个新的复制槽,但您需要做的不只是创建一个新槽来防止数据丢失。新的复制槽只能提供自创建槽后发生的更改的 LSN;它不能提供升级前发生的事件的偏移量。当连接器重新启动时,它首先从 Kafka 偏移量主题请求最后一个已知偏移量。然后,它会发送一个请求到复制槽,要求返回从偏移量主题检索到的偏移量信息。但是,新的复制槽无法提供连接器在预期位置恢复流式传输所需的信息。然后,连接器会跳过日志中的任何现有更改事件,只从日志中的最新位置恢复流式传输。这可能导致静默数据丢失:连接器不会为跳过的事件发出任何记录,也不会提供任何信息表明事件已被跳过。

有关如何执行 PostgreSQL 数据库升级以使 Debezium 能够继续捕获事件同时最大限度地降低数据丢失风险的指南,请参阅以下过程。

过程
  1. 暂时停止写入数据库的应用程序,或将其置于只读模式。

  2. 备份数据库。

  3. 暂时禁用对数据库的写访问。

  4. 验证在阻止写操作之前在数据库中发生的任何更改是否已保存到预写日志 (WAL) 中,并且 WAL LSN 是否反映在复制槽上。

  5. 为连接器提供足够的时间来捕获写入复制槽的所有事件记录。
    此步骤确保在停机前发生的所有更改事件都得到处理,并已保存到 Kafka。

  6. 通过检查已刷新 LSN 的值,验证连接器是否已完成从复制槽读取所有数据库更改。

  7. 通过停止 Kafka Connect,优雅地关闭连接器。
    Kafka Connect 将停止连接器,将所有事件记录刷新到 Kafka,并记录从每个连接器接收到的最后一个偏移量。

    作为停止整个 Kafka Connect 群集的替代方法,您可以删除连接器来停止它。请勿删除偏移量主题,因为它可能被其他 Kafka 连接器共享。之后,在您恢复对数据库的写访问并准备好重新启动连接器时,您必须重新创建连接器。

  8. 作为 PostgreSQL 管理员,请删除主数据库服务器上的复制槽。请勿使用 slot.drop.on.stop 属性来删除复制槽。此属性仅用于测试。

  9. 停止数据库。

  10. 使用批准的 PostgreSQL 升级过程,例如 pg_upgradepg_dumppg_restore,执行升级。

  11. (可选) 使用标准的 Kafka 工具从偏移量存储主题中删除连接器偏移量。
    有关如何删除连接器偏移量的示例,请参阅 Debezium 社区 FAQ 中的 如何删除连接器的已提交偏移量

  12. 重启数据库。

  13. 作为 PostgreSQL 管理员,在数据库上创建一个 Debezium 逻辑复制槽。您必须在启用数据库写入之前创建槽。否则,Debezium 无法捕获更改,导致数据丢失。

  14. 验证定义 Debezium 要捕获的表的发布在升级后是否仍然存在。如果发布不存在,请以 PostgreSQL 管理员身份连接到数据库以创建新发布。

  15. 如果在上一步中必须创建新发布,请更新 Debezium 连接器配置,将新发布的名称添加到 publication.name 属性。

  16. 在连接器配置中,重命名连接器。

  17. 在连接器配置中,将 slot.name 设置为 Debezium 复制槽的名称。

  18. 验证新的复制槽是否可用。

  19. 恢复对数据库的写访问,并重新启动写入数据库的任何应用程序。

  20. 在连接器配置中,将 snapshot.mode 属性设置为 never,然后重新启动连接器。

    如果您无法验证 Debezium 是否已读取所有数据库更改(步骤 6),您可以将连接器配置为执行新快照,方法是设置 snapshot.mode=initial。如果需要,您可以通过检查升级前立即拍摄的数据库备份的内容来确认连接器是否读取了复制槽中的所有更改。

Additional resources (附加资源)

部署

要部署 Debezium PostgreSQL 连接器,您需要安装 Debezium PostgreSQL 连接器存档,配置连接器,然后通过将配置添加到 Kafka Connect 来启动连接器。

先决条件
过程
  1. 下载 Debezium PostgreSQL 连接器插件存档

  2. 将文件解压到您的 Kafka Connect 环境。

  3. 将包含 JAR 文件的目录添加到Kafka Connect 的 plugin.path

  4. 重新启动您的 Kafka Connect 进程以加载新 JAR 文件。

如果您使用不可变容器,请参阅 Debezium 的容器镜像,其中已安装 Kafka、PostgreSQL 和带 PostgreSQL 连接器的 Kafka Connect,并已准备好运行。您还可以在 Kubernetes 和 OpenShift 上运行 Debezium

quay.io 获取的 Debezium 容器映像未经严格测试或安全分析,仅供测试和评估目的使用。这些映像不适用于生产环境。为降低生产部署中的风险,请仅部署由受信任供应商积极维护并经过彻底漏洞分析的容器。

连接器配置示例

以下是一个 PostgreSQL 连接器的配置示例,该连接器连接到端口 5432、IP 地址 192.168.99.100 上的 PostgreSQL 服务器,其逻辑名称为 fulfillment。通常,您通过设置连接器可用的配置属性来配置 Debezium PostgreSQL 连接器,将其存储在 JSON 文件中。

您可以选择为数据库中的一部分模式和表生成事件。此外,您还可以选择忽略、屏蔽或截断包含敏感数据、尺寸大于指定大小或您不需要的列。

{
  "name": "fulfillment-connector",  (1)
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", (2)
    "database.hostname": "192.168.99.100", (3)
    "database.port": "5432", (4)
    "database.user": "postgres", (5)
    "database.password": "postgres", (6)
    "database.dbname" : "postgres", (7)
    "topic.prefix": "fulfillment", (8)
    "table.include.list": "public.inventory" (9)

  }
}
1 连接器在 Kafka Connect 服务中注册时的名称。
2 此 PostgreSQL 连接器类的名称。
3 PostgreSQL 数据库服务器的 IP 地址。
4 PostgreSQL 数据库服务器的端口号。
5 用于连接 PostgreSQL 数据库服务器的 PostgreSQL 用户名。必需的权限
6 用于连接 PostgreSQL 数据库服务器的 PostgreSQL 用户的密码。必需的权限
7 要连接的 PostgreSQL 数据库的名称
8 PostgreSQL 服务器/集群的主题前缀,它构成了命名空间,并用于连接器写入的所有 Kafka 主题的名称、Kafka Connect 模式名称以及使用 Avro 转换器时的相应 Avro 模式的命名空间。
9 此连接器将监视的此服务器承载的所有表的列表。这是可选的,还有其他用于包含或排除要监视的模式和表的属性。

请参阅 PostgreSQL 连接器属性的完整列表,这些属性可以在这些配置中指定。

您可以使用 POST 命令将此配置发送到正在运行的 Kafka Connect 服务。该服务会记录配置并启动一个连接器任务,该任务执行以下操作:

  • 连接到 PostgreSQL 数据库。

  • 读取事务日志。

  • 将更改事件记录流式传输到 Kafka 主题。

添加连接器配置

要运行 Debezium PostgreSQL 连接器,请创建连接器配置,并将该配置添加到您的 Kafka Connect 集群。

先决条件
过程
  1. 创建 PostgreSQL 连接器的配置。

  2. 使用Kafka Connect REST API 将该连接器配置添加到您的 Kafka Connect 群集。

结果

连接器启动后,它将对连接器配置的 PostgreSQL 服务器数据库执行一致性快照。然后,连接器开始为行级操作生成数据更改事件,并将更改事件记录流式传输到 Kafka 主题。

连接器属性

Debezium PostgreSQL 连接器具有许多配置属性,您可以使用这些属性来实现适合您应用程序的正确连接器行为。许多属性都有默认值。有关属性的信息组织如下:

必需的 Debezium PostgreSQL 连接器配置属性

以下配置属性是必需的,除非有默认值可用。

表 31. 必需的连接器配置属性
属性 Default (默认值) 描述

无默认值

连接器的唯一名称。尝试使用相同的名称再次注册将失败。此属性是所有 Kafka Connect 连接器必需的。

无默认值

连接器的 Java 类名。对于 PostgreSQL 连接器,请始终使用 io.debezium.connector.postgresql.PostgresConnector 的值。

1

应为该连接器创建的最大任务数。PostgreSQL 连接器始终使用单个任务,因此不使用此值,所以默认值始终是可以接受的。

decoderbufs

安装在 PostgreSQL 服务器上的 PostgreSQL 逻辑解码插件的名称。

支持的值为 decoderbufspgoutput

debezium

已为从特定插件为特定数据库/模式流式传输更改而创建的 PostgreSQL 逻辑解码槽的名称。服务器使用此槽将事件流式传输到您正在配置的 Debezium 连接器。

槽名称必须符合 PostgreSQL 复制槽命名规则,该规则规定:“每个复制槽都有一个名称,该名称可以包含小写字母、数字和下划线字符。”

false

当连接器以正常、预期的方式停止时,是否删除逻辑复制槽。默认行为是复制槽在连接器停止时仍为连接器配置。当连接器重新启动时,拥有相同的复制槽使连接器能够从上次停止的地方继续处理。

仅在测试或开发环境中设置为 true。删除槽允许数据库丢弃 WAL 段。当连接器重新启动时,它将执行新的快照,或者它可以从 Kafka Connect 偏移量主题中的持久偏移量继续。

false

指定连接器是否创建故障转移槽。如果省略此设置,或者主服务器运行 PostgreSQL 16 或更早版本,则连接器不会创建故障转移槽。

PostgreSQL 使用 synchronized_standby_slots 参数来配置主服务器和备用服务器之间的复制槽同步。在此主服务器上设置此参数以指定它在备用服务器上同步的物理复制槽。

dbz_publication

使用 pgoutput 流式传输更改时创建的 PostgreSQL 发布的名称。

如果发布不存在,则在启动时创建此发布,并且它包含所有表。然后 Debezium 应用其自己的包含/排除列表过滤(如果已配置),以将发布限制为感兴趣的特定表的更改事件。连接器用户必须拥有创建此发布的超级用户权限,因此通常最好在第一次启动连接器之前创建发布。

如果发布已存在(无论是包含所有表还是配置了表子集),Debezium 将使用定义的发布。

无默认值

PostgreSQL 数据库服务器的 IP 地址或主机名。

5432

PostgreSQL 数据库服务器的整数端口号。

无默认值

用于连接 PostgreSQL 数据库服务器的 PostgreSQL 数据库用户名。

无默认值

连接 PostgreSQL 数据库服务器时使用的密码。

无默认值

要从中流式传输更改的 PostgreSQL 数据库的名称。

无默认值

主题前缀,为 Debezium 正在捕获更改的特定 PostgreSQL 数据库服务器或集群提供命名空间。此前缀应在所有其他连接器中是唯一的,因为它被用作从此连接器接收记录的所有 Kafka 主题的名称前缀。数据库服务器逻辑名称中只能使用字母数字字符、连字符、点和下划线。

不要更改此属性的值。如果您更改名称值,在重新启动后,连接器将不会继续将事件发出到原始主题,而是将后续事件发出到名称基于新值的名称。

无默认值

一个可选的、逗号分隔的正则表达式列表,匹配您想要捕获其更改的模式的名称。不在 schema.include.list 中的任何模式名称都将被排除,不捕获其更改。默认情况下,捕获所有非系统模式的更改。

为了匹配模式名称,Debezium 将您指定的正则表达式作为锚定正则表达式应用。也就是说,指定的表达式将与模式的整个标识符进行匹配;它不会匹配可能存在于模式名称中的子字符串。
如果您在此配置中包含此属性,则不要同时设置 schema.exclude.list 属性。

无默认值

一个可选的、逗号分隔的正则表达式列表,匹配您想捕获更改的模式名称。名称未包含在 schema.exclude.list 中的任何模式都会捕获更改,系统模式除外。

为了匹配模式名称,Debezium 将您指定的正则表达式作为锚定正则表达式应用。也就是说,指定的表达式将与模式的整个标识符进行匹配;它不会匹配可能存在于模式名称中的子字符串。
如果您在此配置中包含此属性,请不要设置 schema.include.list 属性。

无默认值

一个可选的、逗号分隔的正则表达式列表,匹配您想要捕获其更改的表的完全限定表标识符。当设置此属性时,连接器仅从指定的表中捕获更改。每个标识符的格式为 schemaName.tableName。默认情况下,连接器捕获每个模式中每个正在捕获更改的非系统表中的更改。

为了匹配表名称,Debezium 将您指定的正则表达式作为锚定正则表达式应用。也就是说,指定的表达式将与表的整个标识符进行匹配;它不会匹配可能存在于表名称中的子字符串。
如果您在配置中包含此属性,请不要同时设置 table.exclude.list 属性。

无默认值

一个可选的、逗号分隔的正则表达式列表,匹配您不想要捕获其更改的表的完全限定表标识符。每个标识符的格式为 schemaName.tableName。当设置此属性时,连接器将捕获您未指定的每个表的更改。

为了匹配表名称,Debezium 将您指定的正则表达式作为锚定正则表达式应用。也就是说,指定的表达式将与表的整个标识符进行匹配;它不会匹配可能存在于表名称中的子字符串。
如果在此配置中包含此属性,请不要设置 table.include.list 属性。

无默认值

一个可选的、逗号分隔的正则表达式列表,匹配应包含在更改事件记录值中的列的完全限定名称。列的完全限定名称的格式为 schemaName.tableName.columnName

为了匹配列名,Debezium 将您指定的正则表达式作为锚定正则表达式应用。也就是说,该表达式用于匹配列的整个名称字符串;它不会匹配可能存在于列名称中的子字符串。
如果您在此配置中包含此属性,则不要同时设置 column.exclude.list 属性。

无默认值

一个可选的、逗号分隔的正则表达式列表,匹配应从更改事件记录值中排除的列的完全限定名称。列的完全限定名称的格式为 schemaName.tableName.columnName

为了匹配列名,Debezium 将您指定的正则表达式作为锚定正则表达式应用。也就是说,该表达式用于匹配列的整个名称字符串;它不会匹配可能存在于列名称中的子字符串。
如果您在此配置中包含此属性,请不要设置 column.include.list 属性。

false

指定当包含的列没有更改时是否跳过发布消息。这实际上会过滤掉没有包含列更改的消息(根据 column.include.listcolumn.exclude.list 属性)。

此属性仅在表的 REPLICA IDENTITY 设置为 FULL 时应用。

adaptive

时间、日期和时间戳可以用不同类型的精度来表示:

adaptive 使用 «毫秒»、«微秒» 或 «纳秒» 精度值(基于数据库 «列» 的 «类型»)精确地捕获数据库中的 «时间» 和 «时间戳» 值。

adaptive_time_microseconds 能够精确捕获数据库中的日期、datetime 和时间戳值,具体精度(毫秒、微秒或纳秒)取决于数据库列的类型。一个例外是 TIME 类型字段,它们始终以微秒为单位捕获。

connect 始终使用 Kafka Connect 内置的 TimeDateTimestamp 表示法来表示时间和时间戳值,无论数据库列的精度如何,它们都使用毫秒精度。有关更多信息,请参阅 时间值

precise

指定连接器如何处理 DECIMALNUMERIC 列的值。

precise 使用 java.math.BigDecimal 在更改事件中以二进制形式表示值。

double 使用 double 值表示值,这可能会导致精度损失,但使用起来更方便。

string 将值编码为格式化的字符串,易于消费,但实际类型的语义信息会丢失。有关更多信息,请参阅 Decimal 类型

json

指定连接器如何处理 hstore 列的值。

map 使用 MAP 表示值。

json 使用 json string 表示值。此设置将值编码为格式化的字符串,例如 {"key" : "val"}。有关更多信息,请参阅 PostgreSQL HSTORE 类型

numeric

指定连接器如何处理 interval 列的值。

numeric 使用大约微秒数表示间隔。

string 使用字符串模式表示法 P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S 精确表示间隔。例如:P1Y2M3DT4H5M6.78S。有关更多信息,请参阅 PostgreSQL 基本类型

prefer

是否使用加密连接到 PostgreSQL 服务器。选项包括:

disable 使用未加密的连接。

allow 尝试先使用未加密的连接,如果失败,则使用安全的(加密的)连接。

prefer 尝试先使用安全的(加密的)连接,如果失败,则使用未加密的连接。

require 使用安全的(加密的)连接,如果无法建立连接则失败。

verify-ca 的行为类似于 require,但它还会根据配置的证书颁发机构 (CA) 证书验证服务器 TLS 证书,如果找不到匹配的有效 CA 证书则失败。

verify-full 的行为类似于 verify-ca,但它还会验证服务器证书是否与连接器正在尝试连接的主机匹配。有关更多信息,请参阅 PostgreSQL 文档

无默认值

包含客户端 SSL 证书的文件的路径。有关更多信息,请参阅 PostgreSQL 文档

无默认值

包含客户端 SSL 私钥的文件的路径。有关更多信息,请参阅 PostgreSQL 文档

无默认值

用于访问 database.sslkey 指定文件中的客户端私钥的密码。有关更多信息,请参阅 PostgreSQL 文档

无默认值

用于验证服务器的根证书文件的路径。有关更多信息,请参阅 PostgreSQL 文档

无默认值

用于创建 SSL 套接字类的名称。在开发环境中使用 org.postgresql.ssl.NonValidatingFactory 来禁用 SSL 验证。

true

启用 TCP keep-alive 探测以验证数据库连接是否仍然有效。有关更多信息,请参阅 PostgreSQL 文档

true

控制是否在*delete* 事件后跟一个墓碑事件。

true - 删除操作表示为*delete* 事件和随后的墓碑事件。

false - 只发出*delete* 事件。

在源记录被删除后,发出墓碑事件(默认行为)允许 Kafka 完全删除与已删除行的键相关的任何事件,前提是日志压缩已为该主题启用。

n/a

一个可选的、逗号分隔的正则表达式列表,匹配基于字符的列的完全限定名称。如果您希望在列数据超过属性名称中由 length 指定的字符数时截断这些列中的数据,请设置此属性。将 length 设置为正整数值,例如 column.truncate.to.20.chars

完全限定的列名遵循以下格式:<schemaName>.<tableName>.<columnName>。要匹配列名,Debezium 会将您指定的正则表达式作为锚定正则表达式进行匹配。也就是说,指定的表达式与列的整个名称字符串进行匹配;表达式不匹配可能存在于列名称中的子字符串。

您可以在单个配置中指定多个具有不同长度的属性。

n/a

一个可选的、逗号分隔的正则表达式列表,匹配基于字符的列的完全限定名称。如果您希望连接器掩盖一组列的值,请设置此属性(例如,如果它们包含敏感数据)。将 length 设置为正整数,以将指定列中的数据替换为属性名称中由 length 指定的星号(*)字符的数量。将 length 设置为 0(零)以将指定列中的数据替换为空字符串。

«列» 的 «完全限定» 名称遵循以下格式:schemaName.tableName.columnName。为了匹配 «列» 的名称,Debezium 将您指定的 «正则表达式» 应用为*锚定的* «正则表达式»。也就是说,指定的表达式会针对 «列» 的整个名称字符串进行匹配;该 «表达式» «不会» 匹配可能存在于 «列» 名称中的子字符串。

您可以在单个配置中指定多个具有不同长度的属性。

n/a

一个可选的、逗号分隔的正则表达式列表,匹配字符型列的完全限定名称。列的完全限定名称的格式为 <schemaName>.<tableName>.<columnName>
为了匹配列名,Debezium 将您指定的正则表达式作为锚定正则表达式匹配。也就是说,指定的表达式与列的整个名称字符串进行匹配;它不匹配可能存在于列名中的子字符串。在生成的更改事件记录中,指定列的值将被替换为假名。

假名由应用指定的hashAlgorithmsalt 产生的哈希值组成。根据使用的哈希函数,可以维护引用完整性,同时用假名替换列值。支持的哈希函数在 Java Cryptography Architecture 标准算法名称文档的MessageDigest 部分中进行了描述。

在下面的示例中,CzQMA0cB5K 是随机选择的 salt。

column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName

如果需要,假名会自动截断为列的长度。连接器配置可以包含多个指定不同哈希算法和 salt 的属性。

取决于使用的hashAlgorithm、选择的salt 和实际数据集,生成的数据集可能无法完全隐藏。

应使用哈希策略版本 2 来确保在不同位置或系统上对值进行哈希处理时的保真度。

n/a

一个可选的、逗号分隔的正则表达式列表,匹配您希望连接器为其发出表示列元数据的额外参数的列的完全限定名称。设置此属性时,连接器会将以下字段添加到事件记录的模式中:

  • __debezium.source.column.type

  • __debezium.source.column.length

  • __debezium.source.column.scale

这些参数分别传播列的原始类型名称和长度(对于可变宽度类型)。
启用连接器发出此额外数据有助于正确确定接收器数据库中特定数字或基于字符的列的大小。

«列» 的 «完全限定» 名称遵循以下格式之一:databaseName.tableName.columnName,或 databaseName.schemaName.tableName.columnName
为了匹配列名,Debezium 将您指定的正则表达式作为锚定正则表达式匹配。也就是说,指定的表达式与列的整个名称字符串进行匹配;它不匹配可能存在于列名中的子字符串。

n/a

一个可选的、逗号分隔的 «正则表达式» 列表,用于指定数据库中 «列» 定义的 «数据类型» 的«完全限定»名称。设置此属性时,对于具有匹配 «数据类型» 的 «列»,连接器会发出包含以下额外字段的 «模式» 的事件记录:

  • __debezium.source.column.type

  • __debezium.source.column.length

  • __debezium.source.column.scale

这些参数分别传播列的原始类型名称和长度(对于可变宽度类型)。
启用连接器发出此额外数据有助于正确确定接收器数据库中特定数字或基于字符的列的大小。

«列» 的 «完全限定» 名称遵循以下格式之一:databaseName.tableName.typeName,或 databaseName.schemaName.tableName.typeName
为了匹配 «数据类型» 的名称,Debezium 将您指定的 «正则表达式» 应用为*锚定的* «正则表达式»。也就是说,指定的表达式会针对 «数据类型» 的整个名称字符串进行匹配;该 «表达式» «不会» 匹配可能存在于 «类型» 名称中的子字符串。

PostgreSQL 特定的数据类型名称列表,请参阅 PostgreSQL 数据类型映射

空字符串

«表达式» 列表,用于指定连接器用来为 «发布» 到指定表 «Kafka» «主题» 的更改事件 «记录» «形成» «自定义» «消息» «键» 的 «列»。

默认情况下,Debezium 使用表的 «主键» «列» 作为它发出的 «记录» 的 «消息» «键»。 «代替» 默认值,或为 «缺少» «主键» 的表指定 «键»,您可以根据一个或多个 «列» «配置» «自定义» «消息» «键»。

要为表建立自定义消息键,请列出该表,然后列出用作消息键的列。每个列表条目采用以下格式:

<fully-qualified_tableName>:<keyColumn>,<keyColumn>

要基于多个列名设置表键,请在列名之间插入逗号。

每个完全限定的表名都是一个正则表达式,格式如下:

<schemaName>.<tableName>

该属性可以包含多个表的条目。使用分号分隔列表中的表条目。

以下示例设置了 inventory.customerspurchase.orders 表的消息键:

inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4

在示例中,列 pk1pk2 被指定为 inventory.customer 表的消息键。对于任何模式中的 purchaseorders 表,列 pk3pk4 作为消息键。

用于创建自定义消息键的列数没有限制。但是,最好使用唯一键所需的最小数量。

如果为此属性指定的表达式匹配不是表主键一部分的列,请将表的 REPLICA IDENTITY 设置为 FULL。如果您将 REPLICA IDENTITY 设置为其他值(例如 DEFAULT),在删除操作后,连接器将无法生成具有预期 null 值的 tombstone 事件。

all_tables

指定连接器是否以及如何创建发布。此设置仅在连接器使用pgoutput 插件流式传输更改时适用。

要创建发布,连接器必须通过具有特定权限的数据库帐户访问 PostgreSQL。有关更多信息,请参阅 设置权限以允许 Debezium 创建 PostgreSQL 发布

指定以下值之一:

all_tables

如果发布已存在,连接器将使用它。
如果发布不存在,连接器将通过运行以下 SQL 命令为连接器捕获更改的数据库中的所有表创建发布:

CREATE PUBLICATION <publication_name> FOR ALL TABLES;

disabled

连接器不会尝试创建发布。数据库管理员或配置为执行复制的用户必须在运行连接器之前创建发布。如果连接器找不到发布,它将抛出异常并停止。

filtered

如果发布不存在,连接器将通过运行以下格式的 SQL 命令创建一个发布:

CREATE PUBLICATION <publication_name> FOR TABLE <tbl1, tbl2, tbl3>
生成的发布将包括与当前过滤器配置匹配的表,如 schema.include.listschema.exclude.listtable.include.listtable.exclude.list 连接器配置属性所指定的那样。
如果发布已存在,连接器将通过运行以下格式的 SQL 命令来更新匹配当前过滤器配置的表的发布:

ALTER PUBLICATION <publication_name> SET TABLE <tbl1, tbl2, tbl3>.

no_tables

如果发布已存在,连接器将使用它。如果发布不存在,连接器将通过运行以下格式的 SQL 命令创建一个不指定任何表的发布:

CREATE PUBLICATION <publication_name>;

如果希望连接器仅捕获逻辑解码消息,而不捕获任何其他更改事件(例如,由任何表的 INSERTUPDATEDELETE 操作引起的事件),请选择 no_tables 选项。

如果选择此选项,为防止连接器发出和处理 READ 事件,您可以指定不希望捕获更改的模式或表的名称,例如,使用 "table.exclude.list": "public.*""schema.exclude.list": "public"

空字符串

设置此属性以根据表名将特定的副本标识设置应用于连接器捕获的表的子集。此属性设置的副本标识值将覆盖数据库中设置的副本标识值。

该属性接受逗号分隔的键值对列表。每个键都是一个匹配完全限定表名的正则表达式;相应的值指定一个副本标识类型。例如:

<fqTableNameA>:<replicaIdentity1>,<fqTableNameB>:<replicaIdentity2>,<fqTableNameC>:<replicaIdentity3>

使用以下格式指定完全限定表名:
SchemaName.TableName

将副本标识设置为以下值之一:

DEFAULT

记录更改事件之前为主键列设置的值(如果存在)。这是非系统表的默认设置。

INDEX indexName

记录更改事件之前为指定索引的所有列设置的值。该索引必须是唯一的、非部分的、非可延迟的,并且只能包含标记为 NOT NULL 的列。如果指定的索引被删除,结果行为与将值设置为 NOTHING 相同。

FULL

记录更改事件之前行中所有列的值。

NOTHING

不记录更改事件之前行状态的任何信息。这是系统表的默认值。

示例
schema1.*:FULL,schema2.table2:NOTHING,schema2.table3:INDEX idx_name

replica.identity.autoset.values 属性仅适用于连接器捕获的表。其他表将被忽略,即使它们匹配指定的表达式。使用以下连接器属性来指定要捕获的表:

bytes

指定二进制(bytea)列在更改事件中如何表示。指定以下值之一:

bytes

将二进制数据表示为字节数组。

base64

将二进制数据表示为 base64 编码的字符串。

base64-url-safe

将二进制数据表示为 base64-url-safe 编码的字符串。

hex

将二进制数据表示为十六进制编码(base16)字符串。

none

指定如何调整模式名称以与连接器使用的消息转换器兼容。设置以下值之一:

none

不进行任何调整。

avro

将 Avro 类型名称中不能使用的字符替换为下划线。

avro_unicode

将下划线或 Avro 类型名称中不能使用的字符替换为相应的 Unicode 字符,例如 _uxxxx

在前面的示例中,下划线字符(_)表示一个转义序列,等同于 Java 中的反斜杠。

none

指定如何调整字段名称以与连接器使用的消息转换器兼容。指定以下值之一:

none

不进行任何调整。

avro

将 Avro 类型名称中不能使用的字符替换为下划线。

avro_unicode

将下划线或 Avro 类型名称中不能使用的字符替换为相应的 Unicode 字符,例如 _uxxxx

在前面的示例中,下划线字符(_)表示一个转义序列,等同于 Java 中的反斜杠。

有关更多信息,请参阅 Avro 命名

2

指定在将 Postgres money 类型转换为 java.math.BigDecimal 时使用的小数位数,它在更改事件中表示值。仅当 decimal.handling.mode 设置为 precise 时适用。

无默认值

一个可选的、逗号分隔的正则表达式列表,匹配您希望连接器捕获的逻辑解码消息前缀的名称。默认情况下,连接器捕获所有逻辑解码消息。当设置此属性时,连接器仅捕获具有属性指定的名称的消息前缀的逻辑解码消息。所有其他逻辑解码消息将被排除。

为了匹配消息名称,Debezium 将您指定的正则表达式作为锚定正则表达式应用。也就是说,指定的表达式将与消息前缀的整个字符串进行匹配;该表达式不会匹配可能存在于前缀中的子字符串。

如果在此配置中包含此属性,请不要同时设置 message.prefix.exclude.list 属性。

有关消息事件的结构和排序语义的信息,请参阅 消息事件

无默认值

一个可选的、逗号分隔的正则表达式列表,匹配您不希望连接器捕获的逻辑解码消息前缀的名称。当设置此属性时,连接器不会捕获使用指定前缀的逻辑解码消息。所有其他消息都将被捕获。
要排除所有逻辑解码消息,请将此属性的值设置为 .*

为了匹配消息名称,Debezium 将您指定的正则表达式作为锚定正则表达式应用。也就是说,指定的表达式将与消息前缀的整个字符串进行匹配;该表达式不会匹配可能存在于前缀中的子字符串。

如果在此配置中包含此属性,请不要同时设置 message.prefix.include.list 属性。

有关消息事件的结构和排序语义的信息,请参阅 消息事件

高级 Debezium PostgreSQL 连接器配置属性

以下*高级*配置属性的默认值在大多数情况下都适用,因此很少需要在连接器的配置中指定。

表 32. 高级连接器配置属性
属性 Default (默认值) 描述

无默认值

枚举连接器可以使用*自定义转换器*的符号名称的逗号分隔列表。例如:

isbn

您必须设置 converters 属性才能启用连接器使用自定义转换器。

对于为连接器配置的每个转换器,您还必须添加一个 .type 属性,该属性指定实现转换器接口的类的完全限定名称。.type 属性使用以下格式:

<converterSymbolicName>.type

For example, (例如,)

isbn.type: io.debezium.test.IsbnConverter

如果您想进一步控制已配置转换器的行为,您可以添加一个或多个配置参数将值传递给转换器。要将任何额外的配置参数与转换器关联,请在参数名称前加上转换器的 «符号名称»。
For example, (例如,)

isbn.schema.name: io.debezium.postgresql.type.Isbn

serializable

指定连接器在执行初始快照或临时阻塞快照时读取数据时应用的事务隔离级别和锁定类型(如果存在)。

每个隔离级别在优化并发性和性能与最大化数据一致性和准确性之间取得不同的平衡。使用更严格隔离级别的快照可获得更高质量、更一致的数据,但代价是由于更长的锁定时间和更少的并发事务而导致性能下降。限制性较低的隔离级别可以提高效率,但会牺牲数据的一致性。有关 PostgreSQL 中事务隔离级别的更多信息,请参阅 PostgreSQL 文档

指定以下隔离级别之一:

serializable

默认且最严格的隔离级别。此选项可防止序列化异常,并提供最高程度的数据完整性。

为了确保捕获表的数据库一致性,快照在事务中运行,该事务使用可重复读取隔离级别,阻止对表的并发 DDL 更改,并锁定数据库以创建索引。设置此选项后,用户或管理员无法执行某些操作,例如创建表索引,直到快照完成。表键的整个范围在快照完成之前保持锁定。此选项匹配在引入此属性之前连接器中可用的快照行为。

repeatable_read

阻止其他事务在快照期间更新表行。快照捕获的新记录可能会出现两次;首先,作为初始快照的一部分,然后再次作为流式传输阶段的一部分。但是,此一致性级别对于数据库镜像来说是可容忍的。确保正在扫描的表之间的一致性,并阻止对所选表的 DDL,以及整个数据库的并发索引创建。允许序列化异常。

read_committed

在 PostgreSQL 中,Read Uncommitted 和 Read Committed 隔离模式的行为没有区别。因此,对于此属性,read_committed 选项实际上提供了最低级别的隔离。设置此选项会牺牲初始和临时阻塞快照的一些一致性,但会提高快照期间其他用户的数据库性能。

通常,此事务一致性级别适用于数据库镜像。其他事务在快照期间无法更新表行。但是,当记录在初始快照期间添加,并且连接器在流式传输阶段开始后再次捕获该记录时,可能会出现轻微的数据不一致。

read_uncommitted

名义上,此选项提供了最低级别的隔离。然而,如 read-committed 选项的说明中所述,对于 Debezium PostgreSQL 连接器,此选项提供与 read_committed 选项相同的隔离级别。

initial (初始)

指定连接器启动时执行快照的 «标准»。

always (始终)

The connector performs a snapshot every time that it starts. The snapshot includes the structure and data of the captured tables. Specify this value to populate topics with a complete representation of the data from the captured tables every time that the connector starts. After the snapshot completes, the connector begins to stream event records for subsequent database changes. (连接器每次启动时都会执行快照。快照包括捕获表的结构和数据。每次连接器启动时,通过指定此值来用捕获表数据的完整表示填充主题。快照完成后,连接器将开始流式传输后续数据库更改的事件记录。)

initial (初始)

仅当未记录逻辑服务器名称的偏移量时,连接器才执行快照。

initial_only (仅初始)

连接器执行初始快照然后停止,不处理任何后续更改。

no_data (无数据)

连接器从不执行快照。当连接器配置为这样时,它启动后会执行以下操作:

如果 Kafka 偏移量主题中存在先前存储的 LSN,则连接器将从该位置继续流式传输更改。如果未存储 LSN,则连接器将从服务器上创建 PostgreSQL 逻辑复制槽的时间点开始流式传输更改。仅当您知道所有感兴趣的数据仍反映在 WAL 中时,才使用此快照模式。

never (从不)

已弃用,请参阅 no_data

when_needed (需要时)

After the connector starts, it performs a snapshot only if it detects one of the following circumstances (连接器启动后,仅当检测到以下任一情况时,它才会执行快照:)

  • It cannot detect any topic offsets. (无法检测到任何主题偏移量。)

  • A previously recorded offset specifies a log position that is not available on the server. (先前记录的偏移量指定了一个服务器上不可用的日志位置。)

configuration_based (基于配置)

使用此选项,您可以通过一组以 'snapshot.mode.configuration.based' 前缀开头的连接器属性来控制快照行为。

custom (自定义)

连接器根据 snapshot.mode.custom.name 属性指定的实现执行快照,该属性定义了 io.debezium.spi.snapshot.Snapshotter 接口的自定义实现。

有关更多信息,请参阅 snapshot.mode 选项表

false

如果 snapshot.mode 设置为 configuration_based,请设置此属性以指定连接器在执行快照时是否包含表数据。

false

如果 snapshot.mode 设置为 configuration_based,请设置此属性以指定连接器在执行快照时是否包含表模式。

false

如果 snapshot.mode 设置为 configuration_based,请设置此属性以指定快照完成后连接器是否开始流式传输更改事件。

false

如果 snapshot.mode 设置为 configuration_based,请设置此属性以指定在模式历史记录主题不可用时,连接器是否在快照中包含表模式。

false

如果 snapshot.mode «设置» «为» configuration_based,「此» «属性» «指定» «是否» «在» «事务» «日志» «中» «未» «找到» «最后» «提交» «的» «偏移量» «时»,«连接器» «尝试» «快照» «表» «数据»。
将 «值» «设置» «为» true «以» «指示» «连接器» «执行» «新» «快照»。

无默认值

snapshot.mode 设置为 custom 时,使用此设置来指定 'io.debezium.spi.snapshot.Snapshotter' 接口定义的 name() 方法中提供的自定义实现的名称。有关更多信息,请参阅 自定义快照器 SPI

none

指定连接器在执行模式快照时如何锁定表。
设置以下选项之一:

shared

连接器在数据库模式和其他元数据被读取的快照的初始阶段持有表锁,以阻止对表的独占访问。初始阶段后,快照不再需要表锁。

none

连接器完全避免锁定。

如果在快照期间可能发生模式更改,请不要使用此模式。

custom (自定义)

连接器根据 snapshot.locking.mode.custom.name 属性指定的实现执行快照,该属性是 io.debezium.spi.snapshot.SnapshotLock 接口的自定义实现。

无默认值

snapshot.locking.mode 设置为 custom 时,使用此设置来指定 'io.debezium.spi.snapshot.SnapshotLock' 接口定义的 name() 方法中提供的自定义实现的名称。有关更多信息,请参阅 自定义快照器 SPI

select_all

指定连接器在执行快照时如何查询数据。
设置以下选项之一:

select_all

连接器«默认» «执行» «一个» select all «查询»,「并» «可选» «地» «根据» «列» «包含» «和» «排除» «列表» «配置» «调整» «所选» «的» «列»。

custom (自定义)

连接器根据 snapshot.query.mode.custom.name 属性指定的实现执行快照查询,该属性定义了 io.debezium.spi.snapshot.SnapshotQuery 接口的自定义实现。

与使用 snapshot.select.statement.overrides 属性相比,此设置使您能够更灵活地管理快照内容。

无默认值

snapshot.query.mode 设置为 custom 时,使用此设置来指定 'io.debezium.spi.snapshot.SnapshotQuery' 接口定义的 name() 方法中提供的自定义实现的名称。有关更多信息,请参阅 自定义快照器 SPI

table.include.list 中指定的所有表

一个可选的、逗号分隔的正则表达式列表,匹配要包含在快照中的表的完全限定名称(<schemaName>.<tableName>)。指定的项必须在连接器的 table.include.list 属性中命名。仅当连接器的 snapshot.mode 属性设置为 never 以外的值时,此属性才生效。
此属性不影响增量快照的行为。

为了匹配表名,Debezium 会将您指定的正则表达式应用为锚定正则表达式。也就是说,指定的表达式会与表的整个名称字符串进行匹配;它不会匹配表中可能存在的子字符串。

10000

一个正整数值,指定在执行快照时获取表锁的最大等待时间(以毫秒为单位)。如果在该时间间隔内连接器无法获取表锁,则快照将失败。连接器如何执行快照提供了详细信息。

无默认值

指定要在快照中包含的表行。如果您希望快照仅包含表中一部分行,请使用此属性。此属性仅影响快照。它不适用于连接器从日志中读取的事件。

此属性包含一个逗号分隔的完全限定表名列表,格式为 <schemaName>.<tableName>。例如:

"snapshot.select.statement.overrides": "inventory.products,customers.orders"

对于列表中的每个表,添加一个额外的配置属性,该属性指定连接器在拍摄快照时在该表上运行的 SELECT 语句。指定的 SELECT 语句决定了要在快照中包含的表行的子集。使用以下格式指定此 SELECT 语句属性的名称:

snapshot.select.statement.overrides.<schemaName>.<tableName>。例如,snapshot.select.statement.overrides.customers.orders

示例

从包含软删除列 delete_flagcustomers.orders 表中,如果要快照仅包含未软删除的记录,请添加以下属性:

"snapshot.select.statement.overrides": "customer.orders",
"snapshot.select.statement.overrides.customer.orders": "SELECT * FROM customers.orders WHERE delete_flag = 0 ORDER BY id DESC"

在生成的快照中,连接器仅包含 delete_flag = 0 的记录。

fail

指定连接器在处理事件时如何应对异常:

fail 传播异常,指示有问题事件的偏移量,并导致连接器停止。

warn 记录有问题事件的偏移量,跳过该事件,并继续处理。

skip 跳过有问题事件并继续处理。

2048

一个正整数值,指定连接器处理的每个事件批次的最大大小。

8192

一个正整数值,指定阻塞队列可以容纳的最大记录数。当 Debezium 读取从数据库流式传输的事件时,它会将事件放入阻塞队列,然后将其写入 Kafka。当连接器摄取消息的速度快于写入 Kafka 的速度,或者 Kafka 不可用时,阻塞队列可以为从数据库读取更改事件提供反压。存储在队列中的事件在连接器定期记录偏移量时将被忽略。始终将 max.queue.size 的值设置得大于 max.batch.size 的值。

0

一个长整数值,指定阻塞队列的最大字节容量。默认情况下,阻塞队列没有容量限制。要指定队列可以消耗的字节数,请将此属性设置为一个正长整型值。
如果还设置了 max.queue.size,则当队列大小达到任一属性指定的限制时,将阻塞写入队列。例如,如果您设置了 max.queue.size=1000max.queue.size.in.bytes=5000,则在队列包含 1000 条记录或队列中记录的卷达到 5000 字节后,将阻塞写入队列。

500

一个正整数值,指定连接器在开始处理一批事件之前等待新更改事件出现的毫秒数。默认为 500 毫秒。

false

指定连接器遇到数据类型未知的字段时的行为。默认行为是连接器将字段从更改事件中省略并记录警告。

如果希望更改事件包含字段的不透明二进制表示,请将此属性设置为 true。这允许使用者解码该字段。您可以通过设置 binary handling mode 属性来控制确切的表示。

include.unknown.datatypes 设置为 true 时,使用者存在向后兼容性问题的风险。不仅数据库特定的二进制表示在版本之间可能会更改,而且如果 Debezium 最终支持该数据类型,则该数据类型将以逻辑类型下游发送,这需要使用者进行调整。通常,当遇到不受支持的数据类型时,请创建一个功能请求,以便添加支持。

无默认值

一个分号分隔的 SQL 语句列表,连接器在建立与数据库的 JDBC 连接时执行这些语句。要将分号用作字符而不是分隔符,请指定两个连续的分号 ;;

连接器可以自行酌情建立 JDBC 连接。因此,此属性仅用于会话参数的配置,而不是用于执行 DML 语句。

连接器在创建读取事务日志的连接时不会执行这些语句。

10000

向服务器发送复制连接状态更新的频率,以毫秒为单位。
此属性还控制检查数据库状态以检测死连接的频率,以防数据库被关闭。

0

控制连接器向 Kafka 主题发送心跳消息的频率。默认行为是连接器不发送心跳消息。

心跳消息对于监视连接器是否正在接收数据库的更改事件很有用。心跳消息可能有助于减少连接器重新启动时需要重新发送的更改事件的数量。要发送心跳消息,请将此属性设置为正整数,表示心跳消息之间的毫秒数。

当正在跟踪的数据库中有大量更新,但只有极少数更新与连接器正在捕获更改的表和模式相关时,需要心跳消息。在这种情况下,连接器像往常一样从数据库事务日志读取,但很少将更改记录发出到 Kafka。这意味着没有偏移更新被提交到 Kafka,连接器也没有机会将最新检索到的 LSN 发送回数据库。数据库保留包含连接器已处理事件的 WAL 文件。发送心跳消息使连接器能够将最新检索到的 LSN 发送回数据库,从而允许数据库回收不再需要的 WAL 文件所占用的磁盘空间。

无默认值

指定连接器在发送心跳消息时在源数据库上执行的查询。

这对于解决WAL 磁盘空间消耗中所述的情况很有用,其中捕获同一主机上的低流量数据库与高流量数据库的更改会阻止 Debezium 处理 WAL 记录,从而无法向数据库确认 WAL 位置。要解决此情况,请在低流量数据库中创建一个心跳表,并将此属性设置为一个向该表插入记录的语句,例如

INSERT INTO test_heartbeat_table (text) VALUES ('test_heartbeat')

这允许连接器接收来自低流量数据库的更改并确认其 LSN,从而防止数据库主机上的 WAL 无限增长。

columns_diff

指定触发表内存中模式刷新的条件。

columns_diff 是最安全模式。它始终确保内存中的模式与数据库表中的模式保持同步。

columns_diff_exclude_unchanged_toast 指示连接器在内存中的模式与从传入消息派生的模式存在差异时刷新内存中的模式缓存,除非未更改的 TOASTable 数据完全解释了差异。

如果存在频繁更新但 TOASTed 数据很少成为更新一部分的表,此设置可以显著提高连接器性能。但是,如果从表中删除了 TOASTable 列,则内存中的模式可能会过时。

无默认值

连接器启动时执行快照之前应等待的毫秒数间隔。如果要启动集群中的多个连接器,此属性有助于避免快照中断,这可能会导致连接器重新平衡。

0

指定连接器在完成快照后延迟开始流式传输过程的时间(以毫秒为单位)。设置延迟间隔有助于防止连接器在快照完成后但流式传输过程开始之前立即发生故障时重新启动快照。设置一个高于 Kafka Connect 工作节点设置的 offset.flush.interval.ms 属性值的延迟值。

10240

在快照期间,连接器以行批次的形式读取表内容。此属性指定批次中的最大行数。

无默认值

用于传递给已配置的逻辑解码插件的参数,以分号分隔。例如,add-tables=public.table,public.table2;include-lsn=true

6

如果连接到复制槽失败,这是连续连接的最大尝试次数。

10000 (10 秒)

当连接器无法连接到复制槽时,重试尝试之间的等待毫秒数。

__debezium_unavailable_value

指定连接器提供的常量,以指示原始值是数据库未提供的 toasted 值。如果 unavailable.value.placeholder 的设置以 hex: 前缀开头,则期望字符串的其余部分代表十六进制编码的字节。有关更多信息,请参阅toasted 值

false

确定连接器是否生成带有事务边界的事件,并使用事务元数据丰富更改事件信封。如果要连接器执行此操作,请指定 true。有关更多信息,请参阅事务元数据

false

指定连接器如何捕获和发出它从分区表捕获的更改事件。此设置仅在 publication.autocreate.mode 属性设置为 all_tablesfiltered 并且 Debezium 为捕获的表创建发布时适用。

设置以下选项之一:

true

连接器将所有分区的更改事件发出到以基础表名称命名的主题。
当连接器创建发布时,它会提交一个 CREATE PUBLICATION 语句,其中 publish_via_partition_root 参数设置为 true。结果,发布忽略更改源所在的分区,只记录源表的名称。

false

连接器将每个源分区的更改事件发出到一个反映分区名称的主题。
当连接器创建发布时,CREATE PUBLICATION 语句会省略 publish_via_partition_root 参数,以便发布始终使用源分区的名称来发布更改事件。

true

确定连接器是否应在源 PostgreSQL 数据库中提交已处理记录的 LSN,以便可以删除 WAL 日志。如果您不希望连接器提交已处理记录的 LSN,请指定 false

如果此属性的值设置为 false,Debezium 将不会确认 LSN。未能确认 LSN 可能导致 WAL 日志不受控制地增长,从而对存储容量造成压力,并可能导致性能下降,甚至数据丢失。为了维持正常服务,如果您将此属性设置为 false,则必须配置其他机制来提交 LSN。

10000 (10 秒)

在发生可重试错误后重启连接器的等待时间(以毫秒为单位)。

t

一个逗号分隔的操作类型列表,您希望连接器在流式传输期间跳过这些操作。您可以配置连接器以跳过以下类型的操作:

  • c (insert/create)

  • u (update)

  • d (delete)

  • t (truncate)

如果不想让连接器跳过任何操作,请将值设置为 none

无默认值

用于向连接器发送信号的数据集合的完全限定名称。
使用以下格式指定集合名称
<schemaName>.<tableName>

source (源)

为连接器启用的信号通道名称列表。默认情况下,以下通道可用:

无默认值

为连接器启用的通知通道名称列表。默认情况下,以下通道可用:

1024

在增量快照块期间,连接器获取并加载到内存中的最大行数。增加块大小可以提高效率,因为快照运行的快照查询次数较少但规模更大。但是,较大的块大小也需要更多内存来缓冲快照数据。将块大小调整为能为您的环境提供最佳性能的值。

insert_insert

指定连接器在增量快照期间使用的水印机制,用于对可能被增量快照捕获,然后在流式传输恢复后重新捕获的事件进行去重。
您可以指定以下选项之一:

insert_insert

当您发送信号以启动增量快照时,对于 Debezium 在快照期间读取的每个块,它都会在信号数据集合中写入一个条目,以记录打开快照窗口的信号。快照完成后,Debezium 会插入第二个条目来记录窗口的关闭。

insert_delete

当您发送信号以启动增量快照时,对于 Debezium 读取的每个块,它都会在信号数据集合中写入一个条目,以记录打开快照窗口的信号。快照完成后,此条目将被删除。不会为关闭快照窗口的信号创建条目。设置此选项可防止信号数据集合快速增长。

false

指定连接器是否将水印写入信号数据集合以跟踪增量快照的进度。将值设置为 true 可使具有数据库只读连接的连接器使用不需要写入信号数据集合的增量快照水印策略。

0

以毫秒为单位,XMIN 将从复制槽中读取的频率。XMIN 值提供了新复制槽可以开始的下界。默认值 0 禁用跟踪 XMIN 跟踪。

io.debezium.schema.SchemaTopicNamingStrategy

用于确定数据更改、schema 更改、事务、心跳事件等的*主题名称*的 TopicNamingStrategy 类的名称,默认为 SchemaTopicNamingStrategy

.

指定主题名称的分隔符,默认为 .

10000

用于保存有界并发哈希映射中主题名称的大小。此缓存将有助于确定与给定数据集合对应的主题名称。

__debezium-heartbeat

控制连接器向心跳消息发送的心跳消息的主题名称。主题名称的模式如下:

topic.heartbeat.prefix.topic.prefix

例如,如果主题前缀是 fulfillment,则默认主题名称为 __debezium-heartbeat.fulfillment

transaction

控制连接器发送事务元数据消息的主题的名称。主题名称的模式为:

topic.prefix.topic.transaction

例如,如果主题前缀是 fulfillment,则默认主题名称是 fulfillment.transaction

1

指定连接器在执行初始快照时使用的线程数。要启用并行初始快照,请将属性设置为大于 1 的值。在并行初始快照中,连接器会并发处理多个表。

当您启用并行初始快照时,执行每个表快照的线程可能需要不同的时间来完成工作。如果一个表的快照比其他表的快照花费的时间显著长,则已完成工作的线程将处于空闲状态。在某些环境中,网络设备(如负载均衡器或防火墙)会终止长时间空闲的连接。快照完成后,连接器将无法关闭连接,从而导致异常和快照不完整,即使在连接器成功传输了所有快照数据的情况下也是如此。

如果您遇到此问题,请将 snapshot.max.threads 的值恢复为 1,然后重试快照。

无默认值

定义用于通过添加提供上下文信息的元数据来定制 MBean 对象名称的标签。指定键值对的逗号分隔列表。每个键代表 MBean 对象名称的标签,对应的值代表该键的值,例如:
k1=v1,k2=v2

连接器会将指定的标签附加到基本 MBean 对象名称。标签可以帮助您组织和分类指标数据。您可以定义标签来识别特定的应用程序实例、环境、区域、版本等。有关更多信息,请参阅 定制 MBean 名称

-1

指定连接器在操作导致可重试错误(例如连接错误)后如何响应。
设置以下选项之一:

-1

无限制。连接器将自动重新启动,并重试操作,无论先前的失败次数如何。

0

禁用。连接器将立即失败,并且永远不会重试操作。需要用户干预才能重新启动连接器。

> 0

连接器将自动重新启动,直到达到指定的最大重试次数。下一次失败后,连接器将停止,需要用户干预才能重新启动它。

600000 (10 分钟)

指定连接器等待查询完成的时间(以毫秒为单位)。将值设置为 0(零)以移除超时限制。

0

指定连接器可以捕获的最大表数。超过此限制将触发 guardrail.collections.limit.action 指定的操作。将此属性设置为 0 可防止连接器触发保护措施。

warn

指定如果连接器捕获的表数超过 guardrail.collections.max 属性中指定的数量时触发的操作。将属性设置为以下值之一:

fail

连接器失败并报告异常。

warn

连接器记录警告。

true

此属性指定 Debezium 是否将带有 __debezium.context. 前缀的上下文头添加到其发出的消息中。

这些头是 OpenLineage 集成所必需的,并提供元数据,使下游处理系统能够跟踪和识别更改事件的来源。

该属性添加了以下头:

__debezium.context.connectorLogicalName

Debezium 连接器的逻辑名称。

__debezium.context.taskId

连接器任务的唯一标识符。

__debezium.context.connectorName

Debezium 连接器的名称。

PostgreSQL 连接器直通配置属性

该连接器支持直通式属性,使 Debezium 能够为微调 Apache Kafka 生产者和消费者行为指定自定义配置选项。有关 Kafka 生产者和消费者配置属性的完整范围,请参阅 Kafka 文档

用于配置 PostgreSQL 连接器如何与 Kafka 信号主题交互的直通属性

Debezium 提供了一组 signal.* 属性来控制连接器如何与 Kafka 信号主题交互。

下表描述了 Kafka signal 属性。

表 33. Kafka 信号配置属性
属性 Default (默认值) 描述

<topic.prefix>-signal

连接器监视的用于临时信号的 Kafka 主题的名称。

如果自动主题创建被禁用,则必须手动创建所需的信号主题。信号主题对于保留信号顺序是必需的。信号主题必须只有一个分区。

kafka-signal

Kafka 消费者使用的组 ID 的名称。

无默认值

连接器用于建立与 Kafka 集群的初始连接的主机和端口对列表。每个对都引用 Debezium Kafka Connect 进程使用的 Kafka 集群。

100

一个整数值,指定连接器在轮询信号时等待的最大毫秒数。

用于配置信号通道的 Kafka 消费者客户端的直通属性

Debezium 为信号 Kafka 消费者的传递配置提供了支持。传递信号属性以 signal.consumer.* 前缀开头。例如,连接器会将 signal.consumer.security.protocol=SSL 等属性传递给 Kafka 消费者。

Debezium 在将前缀从属性名称中剥离,然后再将属性传递给 Kafka 信号消费者。

用于配置 PostgreSQL 连接器接收通知通道的直通属性

下表描述了可用于配置 Debezium sink notification 通道的属性。

表 34. 接收通知配置属性
属性 Default (默认值) 描述

无默认值

接收来自 Debezium 的通知的主题名称。当您将 notification.enabled.channels 属性配置为包含 sink 作为启用的通知通道之一时,此属性是必需的。

Debezium 连接器直通数据库驱动程序配置属性

Debezium 连接器支持数据库驱动程序的传递配置。传递数据库属性以 driver.* 前缀开头。例如,连接器会将 driver.foobar=false 等属性传递给 JDBC URL。

Debezium 在将前缀从属性名称中剥离,然后再将属性传递给数据库驱动程序。

监控

Debezium PostgreSQL 连接器除了 Kafka 和 Kafka Connect 提供的内置 JMX 指标支持之外,还提供两种类型的指标。

  • 快照指标 提供有关连接器在执行快照时的操作信息。

  • 流式指标 提供有关连接器在捕获更改和流式传输更改事件记录时的操作信息。

Debezium 监控文档 提供了有关如何使用 JMX 公开这些指标的详细信息。

自定义 MBean 名称

Debezium 连接器通过连接器的 MBean 名称公开指标。这些指标特定于每个连接器实例,提供有关连接器快照、流式传输和模式历史记录进程行为的数据。

默认情况下,当您部署正确配置的连接器时,Debezium 会为不同的连接器指标生成唯一的 MBean 名称。要查看连接器进程的指标,请将您的可观察性堆栈配置为监视其 MBean。但这些默认 MBean 名称取决于连接器配置;配置更改可能导致 MBean 名称发生更改。在这种情况下,更改 MBean 名称会破坏连接器实例和 MBean 之间的链接,从而中断监控活动。您必须重新配置可观察性堆栈以使用新的 MBean 名称才能恢复监控。

为了防止因 MBean 名称更改而导致的监控中断,您可以配置自定义指标标签。通过将 custom.metric.tags 属性添加到连接器配置中来配置自定义指标。该属性接受键值对,其中每个键代表 MBean 对象名称的标签,相应的值代表该标签的值。例如:k1=v1,k2=v2。Debezium 将指定的标签附加到连接器的 MBean 名称。

配置连接器的 custom.metric.tags 属性后,您可以配置可观察性堆栈以检索与指定标签关联的指标。然后,可观察性堆栈使用指定的标签而不是可变的 MBean 名称来唯一标识连接器。之后,如果 Debezium 重新定义了其如何构造 MBean 名称,或者连接器配置中的 topic.prefix 发生更改,指标收集将不会中断,因为指标抓取任务使用指定的标签模式来标识连接器。

使用自定义标签的另一个好处是,您可以使用反映数据管道架构的标签,以便以适合您运营需求的方式组织指标。例如,您可以指定带有值的标签来声明连接器活动的类型、应用程序上下文或数据源,例如 db1-streaming-for-application-abc。如果您指定多个键值对,所有指定的对都将附加到连接器的 MBean 名称。

以下示例说明了标签如何修改默认 MBean 名称。

示例 3. 自定义标签如何修改连接器 MBean 名称

默认情况下,PostgreSQL 连接器对流式指标使用以下 MBean 名称

debezium.postgresql:type=connector-metrics,context=streaming,server=<topic.prefix>

如果您将 custom.metric.tags 的值设置为 database=salesdb-streaming,table=inventory,Debezium 将生成以下自定义 MBean 名称:

debezium.postgresql:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory

快照指标

MBeandebezium.postgres:type=connector-metrics,context=snapshot,server=<topic.prefix>

除非快照操作正在活动中,或者自上次连接器启动以来发生过快照,否则快照指标不会暴露。

下表列出了可用的快照指标。

Attributes Type 描述

string

连接器已读取的最后一个快照事件。

long

自连接器读取和处理最近事件以来经过的毫秒数。

long

记录连接器在快照操作期间识别为错误的更改事件的数量。每次连接器在初始、增量或临时快照期间遇到无法处理的事件时,都会递增此指标。事件可能因格式错误、与模式不兼容或在转换期间遇到失败而无法处理。指标值在连接器任务的整个生命周期内保持不变。如果快照被中断,并且连接器任务重新启动,则指标计数将重置为 0

long

自上次启动或重置以来,此连接器已看到的事件总数。

long

已被连接器配置的包含/排除列表过滤规则过滤的事件数量。

string[]

由连接器捕获的表列表。

int

用于在快照程序和主 Kafka Connect 循环之间传递事件的队列的长度。

int

用于在快照程序和主 Kafka Connect 循环之间传递事件的队列的可用容量。

int

包含在快照中的表总数。

int

快照尚未复制的表数。

boolean

快照是否已启动。

boolean

快照是否已暂停。

boolean

快照是否被中止。

boolean

快照是否已完成。

boolean

快照是否被跳过。

long

快照到目前为止所花费的总秒数,即使未完成。也包括快照暂停的时间。

long

快照暂停的总秒数。如果快照被暂停了多次,则暂停时间将累加。

Map<String, Long>

包含快照中每个表扫描的行数的映射。表在处理过程中被增量地添加到 Map 中。每扫描 10,000 行和完成一个表时更新。

long

队列的最大缓冲区(字节)。如果 max.queue.size.in.bytes 设置为正长整型值,则此指标可用。

long

队列中记录的当前字节卷。

当执行增量快照时,连接器还提供以下附加快照指标:

Attributes Type 描述

string

当前快照块的标识符。

string

定义当前块的主键集的下限。

string

定义当前块的主键集的上限。

string

当前快照表的组成键集(主键)的下限。

string

当前快照表的组成键集(主键)的上限。

流式指标

MBeandebezium.postgres:type=connector-metrics,context=streaming,server=<topic.prefix>

下表列出了可用的流式传输指标。

Attributes Type 描述

string

连接器已读取的最后一个流式传输事件。

long

自连接器读取和处理最近事件以来经过的毫秒数。

long

记录连接器在流式传输期间识别为错误的更改事件的数量。每次连接器在流式传输会话期间遇到无法处理的事件时,都会递增此指标。事件可能因格式错误、与模式不兼容或在转换期间遇到失败而无法处理。指标值在连接器任务的整个生命周期内保持不变。连接器重启后,指标计数将重置为 0

long

自上次连接器启动或重置以来,源数据库报告的总数据更改事件数。代表 Debezium 需要处理的数据更改工作负载。

long

自上次启动或重置指标以来,连接器处理的总创建事件数。

long

自上次启动或重置指标以来,连接器处理的总更新事件数。

long

自上次启动或重置指标以来,连接器处理的总删除事件数。

long

已被连接器配置的包含/排除列表过滤规则过滤的事件数量。

string[]

由连接器捕获的表列表。

int

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的长度。

int

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的剩余容量。

boolean

表示连接器当前是否连接到数据库服务器的标志。

long

上次更改事件的时间戳与连接器处理它之间的时间差(以毫秒为单位)。这些值将包含数据库服务器和连接器运行所在机器之间时钟的任何差异。

long

已提交的处理过的事务数。

Map<String, String>

收到的最后一个事件的坐标。

string

已处理的最后一个事务的事务标识符。

long

队列的最大缓冲区(字节)。如果 max.queue.size.in.bytes 设置为正长整型值,则此指标可用。

long

队列中记录的当前字节卷。

出现问题时的行为

Debezium 是一个分布式系统,它捕获多个上游数据库的所有更改;它从不遗漏或丢失事件。当系统正常运行或被仔细管理时,Debezium 会为每个更改事件记录提供精确一次的传递。

如果发生故障,系统不会丢失任何事件。但是,在从故障中恢复时,连接器可能会发出一些重复的更改事件。在这些异常情况下,Debezium 像 Kafka 一样提供至少一次的更改事件传递。

本节的其余部分描述了 Debezium 如何处理各种类型的故障和问题。

配置和启动错误

在以下情况下,连接器在尝试启动时会失败,并在日志中报告错误/异常,然后停止运行:

  • 连接器的配置无效。

  • 连接器无法使用指定的连接参数成功连接到 PostgreSQL。

  • 连接器正在从 PostgreSQL WAL 中先前记录的位置(使用 LSN)重新启动,而 PostgreSQL 不再具有该历史记录。

在这些情况下,错误消息包含有关问题的详细信息,以及可能的建议的解决方法。在更正配置或解决 PostgreSQL 问题后,重新启动连接器。

PostgreSQL 变得不可用

当连接器运行时,它连接的 PostgreSQL 服务器可能出于任何原因而变得不可用。如果发生这种情况,连接器会报错并停止。当服务器再次可用时,重新启动连接器。

PostgreSQL 连接器将最后一个已处理的偏移量外部存储为 PostgreSQL LSN 的形式。在连接器重新启动并连接到服务器实例后,连接器会与服务器通信,以从该特定偏移量继续流式传输。只要 Debezium 复制槽保持完好,此偏移量就可用。切勿在主服务器上删除复制槽,否则将丢失数据。有关槽已被删除的故障情况的信息,请参阅下一节。

集群故障

PostgreSQL 15 或更早版本

在 PostgreSQL 15 或更早版本的集群中,您只能在主服务器上创建逻辑复制槽。因此,在 PostgreSQL 15 环境中,Debezium PostgreSQL 连接器只能从集群中活动的服务器捕获事件。在 PostgreSQL 15 集群中,主节点上的复制槽不会传播到副本服务器。如果主服务器发生故障,您必须提升一个备用节点为主。

PostgreSQL 16 或更高版本

当您将 Debezium 与 PostgreSQL 16 或更高版本一起使用时,您可以在副本上创建逻辑复制槽,但您必须手动同步副本上的复制槽与主服务器上的相应槽。副本槽的同步不是自动的。

PostgreSQL 17 或更高版本

当您将 Debezium 与 PostgreSQL 17 或更高版本一起使用时,您可以为自动故障转移配置主服务器上的复制槽,以便 Debezium 不会错过任何更改事件。当复制槽配置为故障转移时,PostgreSQL 会自动将复制槽从主服务器同步到副本,从而使 Debezium 在副本被提升并成为新主服务器后能够继续从该槽读取。

一些托管的 PostgreSQL 服务(例如 AWS RDS 和 GCP CloudSQL)使用磁盘复制来实现到备用服务器的复制。因此,这些服务会自动复制复制槽,以便在故障转移后可用。

新的主服务器必须安装逻辑解码插件,并且有一个复制槽已配置为该插件和您要捕获更改的数据库使用。只有这样,您才能将连接器指向新服务器并重新启动连接器。

从 PostgreSQL 17 集群故障中恢复

运行 PostgreSQL 17 或更高版本的环境支持使用故障转移复制槽。如果在 PostgreSQL 17 或更高版本的集群中发生故障,并且备用服务器配置了故障转移复制槽,请完成以下步骤以使 Debezium 能够恢复捕获。

  1. 暂停 Debezium,直到您可以验证您拥有一个完好的复制槽,并且没有丢失数据。

  2. 在允许应用程序写入*新*主服务器之前,重新创建 Debezium 复制槽。如果您在重新创建复制槽之前允许应用程序写入新主服务器,您的应用程序可能会错过更改事件。

  3. Restart the connector. (重新启动连接器。)

  4. 验证 Debezium 是否可以从复制槽读取 LSN,以获取原始主服务器故障之前发生的任何更改。
    例如,恢复故障主服务器故障前不久的备份,并确定槽中记录的最后一个位置。虽然检索备份数据在管理上可能很困难,但检查备份提供了一种可靠的方法来确定 Debezium 是否已消耗所有更改。

在 PostgreSQL 15 或更早版本集群故障后从新主服务器捕获数据

在 PostgreSQL 15 或更早版本集群中的主服务器发生故障后,您可能决定将 Debezium 配置为从前备用服务器之一捕获数据,而不是从原始主服务器捕获数据。要使 Debezium 能够从前备用服务器捕获数据,请完成以下过程。

过程
  1. 修复导致集群发生故障的条件。

  2. 在连接器停止运行时,更新连接器配置中的属性值以反映新服务器的详细信息。例如,验证配置是否包含以下属性的正确值

  3. 通过完成以下任务,将新主服务器配置为与 Debezium 一起工作

  4. 将备用 PostgreSQL 节点提升为primary

  5. Restart the connector. (重新启动连接器。)

  6. 将快照模式设置为always,并在新主服务器上执行快照以捕获服务器上的数据初始状态并确保不丢失任何数据。

Kafka Connect 进程正常停止

假设 Kafka Connect 正在分布式模式下运行,并且一个 Kafka Connect 进程正常停止。在停止该进程之前,Kafka Connect 会将该进程的连接器任务迁移到该组中的另一个 Kafka Connect 进程。新的连接器任务将从先前任务停止的地方继续处理。在连接器任务正常停止并重新启动到新进程的过程中,会有短暂的处理延迟。

Kafka Connect 进程崩溃

如果 Kafka 连接器进程意外停止,它正在运行的任何连接器任务都会终止,而不会记录它们最近处理的偏移量。当 Kafka Connect 在分布式模式下运行时,Kafka Connect 会在其他进程上重新启动这些连接器任务。但是,PostgreSQL 连接器会从先前进程*记录*的最后一个偏移量恢复。这意味着新的替换任务可能会生成一些在崩溃前刚刚处理过的相同更改事件。重复事件的数量取决于偏移量刷新周期以及崩溃前的数据更改量。

由于在从故障中恢复时可能会发生事件重复,因此使用者应始终预料到某些重复事件。Debezium 更改是幂等的,因此一系列事件始终会导致相同的状态。

在每个更改事件记录中,Debezium 连接器都会插入特定于源的信息,说明事件的来源,包括 PostgreSQL 服务器事件发生的时间、服务器事务的 ID 以及事务更改被写入的写入预写日志中的位置。消费者可以跟踪这些信息,特别是 LSN,以确定事件是否重复。

Kafka 变得不可用

当连接器生成更改事件时,Kafka Connect 框架会使用 Kafka producer API 将这些事件记录在 Kafka 中。您可以在 Kafka Connect 配置中指定的频率周期性地,Kafka Connect 会记录出现在这些更改事件中的最新偏移量。如果 Kafka Broker 不可用,运行连接器的 Kafka Connect 进程将反复尝试重新连接到 Kafka Broker。换句话说,连接器任务将暂停,直到可以重新建立连接,届时连接器将从它们离开的地方恢复。

连接器停止一段时间

如果连接器被正常停止,数据库可以继续使用。任何更改都会记录在 PostgreSQL WAL 中。当连接器重新启动时,它将从之前的位置继续流式传输更改。也就是说,它会为在连接器停止期间进行的所有数据库更改生成更改事件记录。

正确配置的 Kafka 集群能够处理海量吞吐量。Kafka Connect 是根据 Kafka 的最佳实践编写的,并且在有足够资源的情况下,Kafka Connect 连接器也可以处理非常大量的数据库更改事件。因此,在停止一段时间后,当 Debezium 连接器重新启动时,它很可能会赶上在它停止期间进行的数据库更改。这需要多长时间取决于 Kafka 的能力和性能以及 PostgreSQL 中数据的更改量。