您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

Debezium Oracle 连接器

目录

概述

Debezium 的 Oracle 连接器会捕获和记录 Oracle 服务器数据库中发生的行级更改,包括在连接器运行时添加的表。您可以将连接器配置为为特定模式和表的子集发出更改事件,或者忽略、屏蔽或截断特定列中的值。

有关此连接器兼容的 Oracle 数据库版本的信息,请参阅 Debezium 版本概述

Debezium 可以通过使用原生的 LogMiner 数据库包、XStream APIOpenLogReplicator 来摄取 Oracle 的更改事件。

Oracle 连接器的工作原理

要优化配置和运行 Debezium Oracle 连接器,了解连接器的工作原理会很有帮助。

适配器模式

Debezium Oracle 连接器支持多种适配器来从 Oracle 数据库事务日志中捕获更改。您可以通过设置 database.connection.adapter 配置属性来将连接器配置为使用特定的适配器。

连接器支持以下适配器

LogMiner

默认情况下,Debezium Oracle 连接器使用原生的 Oracle LogMiner API 来读取和流式传输数据库事务日志中的更改。您可以将 Debezium 配置为在以下两种独立模式之一中使用 LogMiner:

未提交更改模式

在未提交更改模式下,Debezium Oracle 连接器会接收已提交和未提交事务的更改事件的连续流。由于 Oracle 会交错多个事务的操作,因此连接器必须缓冲事件,直到检测到相应的提交或回滚。

由于这是连接器的默认模式,因此分配足够的内存给连接器以处理所有捕获表的最大事务和最高并发级别非常重要。

未提交更改模式通过将大部分处理委托给连接器来最大程度地减少对源数据库的负载。Oracle LogMiner 仅负责从磁盘读取重做和归档日志,并流式传输原始更改。

通过将 database.connection.adapter 属性设置为 logminer(默认值)来启用此模式。

已提交更改模式

在已提交更改模式下,Debezium Oracle 连接器指示 Oracle LogMiner 仅流式传输已提交的更改。由于连接器仅接收已提交的事件,因此它可以立即将它们转发到目标主题,而无需维护内部事务缓冲区。

此模式需要更少的连接器内存,但会将更大的处理和内存负担转移到源数据库。对于大型事务,这很容易超出数据库可用的 PGA 内存,可能导致 PGA_AGGREGATE_LIMIT 错误。

仅当您能保证所有事务都适合数据库的 PGA 内存空间时,才推荐使用此模式。否则,请使用未提交更改模式。

通过将 database.connection.adapter 属性设置为 logminer_unbuffered 来启用此模式。

此功能当前处于 **孵化** 状态,在将来的版本中可能会发生变化。

OpenLogReplicator

您可以将 Debezium Oracle 连接器配置为使用 OpenLogReplicator,这是一个开源应用程序,可直接从重做和归档日志中读取 Oracle 更改,对数据库的影响最小。

通过将 database.connection.adapter 属性设置为 olr 来启用 OpenLogReplicator。有关更多信息,请参阅 OpenLogReplicator 支持 部分。

Oracle XStream

Debezium Oracle 连接器还可以使用 Oracle XStream,这是 Oracle GoldenGate 的商业组件。

通过将 database.connection.adapter 属性设置为 xstream 来启用 Oracle XStream。有关更多详细信息,请参阅 XStream 部分。

快照

通常,Oracle 服务器上的重做日志配置为不保留数据库的完整历史记录。因此,Debezium Oracle 连接器无法从日志中检索数据库的完整历史记录。为了使连接器能够建立数据库当前状态的基线,连接器首次启动时,它会执行数据库的初始一致性快照

如果完成初始快照所需的时间超过了数据库设置的 UNDO_RETENTION 时间(默认十五分钟),则可能发生 ORA-01555 异常。有关此错误的更多信息,以及有关您可以采取的恢复步骤,请参阅 常见问题解答

在表的快照过程中,Oracle 可能会抛出 ORA-01466 异常。当用户修改表结构或添加、更改、删除正在快照的表的索引或相关对象时,会发生此情况。如果发生这种情况,连接器将停止,并且需要从头开始进行初始快照。

为了解决这个问题,您可以将 snapshot.database.errors.max.retries 属性设置为大于 0 的值,以便可以重新启动特定表的快照。虽然整个快照不会从头开始重试,但有问题的那张表将从头开始重新读取,并且该表的主题将包含重复的快照事件。

Oracle 连接器执行初始快照的默认工作流

以下工作流列出了 Debezium 创建快照的步骤。这些步骤描述了当 snapshot.mode 配置属性设置为其默认值 initial 时的快照过程。您可以通过更改 snapshot.mode 属性的值来定制连接器创建快照的方式。如果您配置了不同的快照模式,连接器将使用此工作流的修改版本来完成快照。

当快照模式设置为默认值时,连接器将完成以下任务来创建快照:

  1. Establish a connection to the database. (建立数据库连接。)

  2. 确定要捕获的表。默认情况下,连接器捕获所有表,但那些 模式将其排除从捕获中 的表除外。快照完成后,连接器将继续流式传输指定表的 ist。如果您希望连接器仅从特定表中捕获数据,则可以通过设置 table.include.listtable.exclude.list 等属性来指示连接器仅捕获一部分表或表元素的数据。

  3. 在每个捕获的表上获取 ROW SHARE MODE 锁,以防止在创建快照期间发生结构性更改。Debezium 仅持有锁很短时间。

  4. 从服务器的重做日志中读取当前的系统更改号 (SCN) 位置。

  5. 捕获所有数据库表的结构,或所有指定用于捕获的表的结构。连接器在其内部数据库模式历史主题中持久化模式信息。模式历史记录提供了有关在发生更改事件时生效的结构的 ist。

    默认情况下,连接器捕获处于捕获模式的数据库中每个表的模式,包括未配置为捕获的表。如果表未配置为捕获,初始快照仅捕获其结构;不捕获任何表数据。有关为什么快照会为未包含在初始快照中的表持久化模式信息,请参阅 理解为什么初始快照会捕获所有表的模式历史记录

  6. 释放步骤 3 中获取的锁。其他数据库客户端现在可以写入任何先前锁定的表。

  7. 在步骤 4 中读取的 SCN 位置,连接器会扫描指定用于捕获的表(SELECT * FROM …​ AS OF SCN 123)。在扫描期间,连接器会完成以下任务:

    1. Confirms that the table was created before the snapshot began. If the table was created after the snapshot began, the connector skips the table. After the snapshot is complete, and the connector transitions to streaming, it emits change events for any tables that were created after the snapshot began. (确认表在快照开始之前已创建。如果表在快照开始之后创建,连接器将跳过该表。快照完成后,连接器过渡到流式传输,它会为快照开始后创建的任何表发出变更事件。)

    2. 为从表中捕获的每一行生成一个 read 事件。所有 read 事件都包含相同的 SCN 位置,即步骤 4 中获取的 SCN 位置。

    3. Emits each read event to the Kafka topic for the source table. (将每个 read 事件发送到源表的 Kafka 主题。)

    4. Releases data table locks, if applicable. (释放数据表锁(如果适用)。)

  8. Record the successful completion of the snapshot in the connector offsets. (在连接器偏移量中记录快照的成功完成。)

The resulting initial snapshot captures the current state of each row in the captured tables. From this baseline state, the connector captures subsequent changes as they occur. (生成的初始快照捕获了捕获表中每一行的当前状态。从这个基线状态开始,连接器会捕获后续发生的更改。)

快照过程开始后,如果由于连接器故障、重新平衡或其他原因导致进程中断,则在连接器重启后,该过程将重新启动。在连接器完成初始快照后,它将继续从步骤 3 中读取的位置流式传输,这样就不会错过任何更新。如果连接器因任何原因再次停止,则在重启后,它将从之前停止的位置继续流式传输更改。

表 1. snapshot.mode 连接器配置属性的设置
Setting (设置) 描述

always (始终)

在每次连接器启动时执行快照。快照完成后,连接器将开始流式传输后续数据库更改的事件记录。

initial (初始)

连接器执行数据库快照,如 执行初始快照的默认工作流 中所述。快照完成后,连接器将开始流式传输后续数据库更改的事件记录。

initial_only (仅初始)

连接器执行数据库快照,并在流式传输任何更改事件记录之前停止,不允许捕获任何后续更改事件。

schema_only (仅模式)

Deprecated, see no_data. (已弃用,请参阅 no_data。)

no_data (无数据)

连接器捕获所有相关表的 ist,执行默认快照工作流中描述的所有步骤,但它不会创建 READ 事件来表示连接器启动时的 ist(步骤 6)。

schema_only_recovery (仅模式恢复)

Deprecated, see recovery. (已弃用,请参阅 recovery。)

recovery (恢复)

Set this option to restore a database schema history topic that is lost or corrupted. After a restart, the connector runs a snapshot that rebuilds the topic from the source tables. You can also set the property to periodically prune a database schema history topic that experiences unexpected growth. (将此选项设置为恢复已丢失或损坏的数据库模式历史主题。重新启动后,连接器将运行一个快照,从源表重建该主题。您还可以将此属性设置为定期修剪经历意外增长的数据库模式历史主题。)

WARNING: Do not use this mode to perform a snapshot if schema changes were committed to the database after the last connector shutdown. (警告:如果在上次连接器关闭后模式更改已提交到数据库,请不要使用此模式执行快照。)

when_needed (需要时)

After the connector starts, it performs a snapshot only if it detects one of the following circumstances (连接器启动后,仅当检测到以下任一情况时,它才会执行快照:)

  • It cannot detect any topic offsets. (无法检测到任何主题偏移量。)

  • A previously recorded offset specifies a log position that is not available on the server. (先前记录的偏移量指定了一个服务器上不可用的日志位置。)

configuration_based (基于配置)

Set the snapshot mode to configuration_based to control snapshot behavior through the set of connector properties that have the prefix 'snapshot.mode.configuration.based'. (将快照模式设置为 configuration_based,通过具有前缀“snapshot.mode.configuration.based”的连接器属性集来控制快照行为。)

custom (自定义)

custom snapshot 模式允许您注入自己对 io.debezium.spi.snapshot.Snapshotter 接口的实现。将 snapshot.mode.custom.name 配置属性设置为您的实现 `name()` 方法提供的名称。该名称在 Kafka Connect 群集的类路径中指定。如果您使用 Debezium DebeziumEngine,则名称包含在连接器 JAR 文件中。有关更多信息,请参阅 自定义 snapshot 程序 SPI

有关更多信息,请参阅连接器配置属性表中的 snapshot.mode

了解为什么初始快照会捕获所有表的模式历史记录

The initial snapshot that a connector runs captures two types of information (连接器运行的初始快照捕获两种类型的信息:)

Table data (表数据)

有关连接器 table.include.list 属性中命名的表的 INSERTUPDATEDELETE 操作的 ist。

Schema data (模式数据)

DDL statements that describe the structural changes that are applied to tables. Schema data is persisted to both the internal schema history topic, and to the connector’s schema change topic, if one is configured. (描述应用于表的结构化更改的 DDL 语句。模式数据将持久化到内部模式历史主题,以及连接器的模式变更主题(如果已配置)。)

After you run an initial snapshot, you might notice that the snapshot captures schema information for tables that are not designated for capture. By default, initial snapshots are designed to capture schema information for every table that is present in the database, not only from tables that are designated for capture. Connectors require that the table’s schema is present in the schema history topic before they can capture a table. By enabling the initial snapshot to capture schema data for tables that are not part of the original capture set, Debezium prepares the connector to readily capture event data from these tables should that later become necessary. If the initial snapshot does not capture a table’s schema, you must add the schema to the history topic before the connector can capture data from the table. (运行初始快照后,您可能会注意到快照捕获了未指定捕获的表的模式信息。默认情况下,初始快照旨在捕获数据库中存在的所有表的模式信息,而不仅仅是指定捕获的表。连接器要求表的模式存在于模式历史主题中,然后才能捕获表。通过允许初始快照捕获非原始捕获集一部分的表的模式数据,Debezium 会为连接器做好准备,以便在将来需要时能够轻松地从这些表中捕获事件数据。如果初始快照未捕获表的模式,您必须在连接器能够从表中捕获数据之前将模式添加到历史主题。)

In some cases, you might want to limit schema capture in the initial snapshot. This can be useful when you want to reduce the time required to complete a snapshot. Or when Debezium connects to the database instance through a user account that has access to multiple logical databases, but you want the connector to capture changes only from tables in a specific logic database. (在某些情况下,您可能希望在初始快照中限制模式捕获。当您想减少完成快照所需的时间时,这可能很有用。或者,当 Debezium 通过具有访问多个逻辑数据库的用户帐户连接到数据库实例时,而您只想让连接器捕获特定逻辑数据库中表的更改。)

从初始快照未捕获的表中捕获数据(无模式更改)

In some cases, you might want the connector to capture data from a table whose schema was not captured by the initial snapshot. Depending on the connector configuration, the initial snapshot might capture the table schema only for specific tables in the database. If the table schema is not present in the history topic, the connector fails to capture the table, and reports a missing schema error. (在某些情况下,您可能希望连接器捕获模式未被初始快照捕获的表中的数据。根据连接器配置,初始快照可能仅捕获数据库中特定表的模式。如果表模式不存在于历史主题中,连接器将无法捕获该表,并报告模式丢失错误。)

You might still be able to capture data from the table, but you must perform additional steps to add the table schema. (您仍然可以从该表中捕获数据,但必须执行额外步骤来添加表模式。)

先决条件
  • You want to capture data from a table with a schema that the connector did not capture during the initial snapshot. (您想从一个连接器在初始快照期间未捕获其模式的表中捕获数据。)

  • 事务日志中该表的所有条目都使用相同的模式。有关从新表捕获具有模式更改的数据的 ist,请参阅 从初始快照未捕获的表中捕获数据(模式更改)

过程
  1. Stop the connector. (停止连接器。)

  2. 删除由 schema.history.internal.kafka.topic 属性 指定的内部数据库模式历史主题。

  3. 在连接器配置中

    1. snapshot.mode 设置为 recovery

    2. (可选) 将 schema.history.internal.store.only.captured.tables.ddl 的值设置为 false,以确保将来连接器可以轻松捕获当前未指定用于捕获的表的 ist。连接器只能从其模式信息存在于历史记录主题中的表中捕获数据。

    3. 将要捕获的表添加到 table.include.list

  4. Restart the connector. The snapshot recovery process rebuilds the schema history based on the current structure of the tables. (重新启动连接器。快照恢复过程将根据表的当前结构重建模式历史记录。)

  5. (可选) 快照完成后,对新添加的表发起增量快照。增量快照首先流式传输新表的 ist,然后继续从重做和归档日志中读取先前配置的表的 ist,包括连接器离线时发生的 ist。

  6. (Optional) Reset the snapshot.mode back to no_data to prevent the connector from initiating recovery after a future restart. ((可选)将 snapshot.mode 重置回 no_data,以防止连接器在将来的重启后启动恢复。)

从初始快照未捕获的表中捕获数据(模式更改)

If a schema change is applied to a table, records that are committed before the schema change have different structures than those that were committed after the change. When Debezium captures data from a table, it reads the schema history to ensure that it applies the correct schema to each event. If the schema is not present in the schema history topic, the connector is unable to capture the table, and an error results. (如果模式更改应用于某个表,则在模式更改之前提交的记录与更改之后提交的记录具有不同的结构。当 Debezium 从表中捕获数据时,它会读取模式历史记录以确保它将正确的模式应用于每个事件。如果模式不存在于模式历史主题中,连接器将无法捕获该表,并导致错误。)

If you want to capture data from a table that was not captured by the initial snapshot, and the schema of the table was modified, you must add the schema to the history topic, if it is not already available. You can add the schema by running a new schema snapshot, or by running an initial snapshot for the table. (如果您想从初始快照未捕获的表中捕获数据,并且该表的模式已修改,则必须将模式添加到历史主题(如果尚未提供)。您可以通过运行新的模式快照或为该表运行初始快照来添加模式。)

先决条件
  • You want to capture data from a table with a schema that the connector did not capture during the initial snapshot. (您想从一个连接器在初始快照期间未捕获其模式的表中捕获数据。)

  • A schema change was applied to the table so that the records to be captured do not have a uniform structure. (已对表应用了模式更改,以便要捕获的记录没有统一的结构。)

过程
Initial snapshot captured the schema for all tables (store.only.captured.tables.ddl was set to false) (初始快照捕获了所有表的模式(store.only.captured.tables.ddl 设置为 false))
  1. 编辑 table.include.list 属性以指定要捕获的表。

  2. Restart the connector. (重新启动连接器。)

  3. 发起增量快照以捕获新表的 ist。

Initial snapshot did not capture the schema for all tables (store.only.captured.tables.ddl was set to true) (初始快照未捕获所有表的模式(store.only.captured.tables.ddl 设置为 true))

If the initial snapshot did not save the schema of the table that you want to capture, complete one of the following procedures (如果初始快照未保存您要捕获的表的模式,请完成以下任一程序:)

Procedure 1: Schema snapshot, followed by incremental snapshot (过程 1:模式快照,然后是增量快照)

In this procedure, the connector first performs a schema snapshot. You can then initiate an incremental snapshot to enable the connector to synchronize data. (在此过程中,连接器首先执行模式快照。然后,您可以启动增量快照以使连接器能够同步数据。)

  1. Stop the connector. (停止连接器。)

  2. 删除由 schema.history.internal.kafka.topic 属性 指定的内部数据库模式历史主题。

  3. Clear the offsets in the configured Kafka Connect offset.storage.topic. For more information about how to remove offsets, see the Debezium community FAQ. (清除配置的 Kafka Connect offset.storage.topic 中的偏移量。有关如何删除偏移量的更多信息,请参阅Debezium 社区 FAQ。)

    Removing offsets should be performed only by advanced users who have experience in manipulating internal Kafka Connect data. This operation is potentially destructive, and should be performed only as a last resort. (仅应由有经验处理内部 Kafka Connect 数据的用户执行移除偏移量的操作。此操作可能具有破坏性,应仅作为最后的手段执行。)

  4. Set values for properties in the connector configuration as described in the following steps (按以下步骤为连接器配置中的属性设置值:)

    1. snapshot.mode 属性的值设置为 no_data

    2. 编辑 table.include.list 以添加要捕获的表。

  5. Restart the connector. (重新启动连接器。)

  6. Wait for Debezium to capture the schema of the new and existing tables. Data changes that occurred any tables after the connector stopped are not captured. (等待 Debezium 捕获新表和现有表的模式。连接器停止后在任何表上发生的数据更改都不会被捕获。)

  7. 为确保不丢失任何 ist,请发起增量快照

Procedure 2: Initial snapshot, followed by optional incremental snapshot (过程 2:初始快照,然后是可选的增量快照)

In this procedure the connector performs a full initial snapshot of the database. As with any initial snapshot, in a database with many large tables, running an initial snapshot can be a time-consuming operation. After the snapshot completes, you can optionally trigger an incremental snapshot to capture any changes that occur while the connector is off-line. (在此过程中,连接器将执行数据库的完整初始快照。与任何初始快照一样,在具有许多大表的数据库中,运行初始快照可能是一项耗时的操作。快照完成后,您可以选择触发增量快照以捕获连接器离线期间发生的任何更改。)

  1. Stop the connector. (停止连接器。)

  2. 删除由 schema.history.internal.kafka.topic 属性 指定的内部数据库模式历史主题。

  3. Clear the offsets in the configured Kafka Connect offset.storage.topic. For more information about how to remove offsets, see the Debezium community FAQ. (清除配置的 Kafka Connect offset.storage.topic 中的偏移量。有关如何删除偏移量的更多信息,请参阅Debezium 社区 FAQ。)

    Removing offsets should be performed only by advanced users who have experience in manipulating internal Kafka Connect data. This operation is potentially destructive, and should be performed only as a last resort. (仅应由有经验处理内部 Kafka Connect 数据的用户执行移除偏移量的操作。此操作可能具有破坏性,应仅作为最后的手段执行。)

  4. 编辑 table.include.list 以添加要捕获的表。

  5. Set values for properties in the connector configuration as described in the following steps (按以下步骤为连接器配置中的属性设置值:)

    1. snapshot.mode 属性的值设置为 initial

    2. (可选) 将 schema.history.internal.store.only.captured.tables.ddl 设置为 false

  6. Restart the connector. The connector takes a full database snapshot. After the snapshot completes, the connector transitions to streaming. (重新启动连接器。连接器将进行完整的数据库快照。快照完成后,连接器将过渡到流式传输。)

  7. (可选) 为了捕获连接器离线期间发生的任何 ist,请发起增量快照

临时快照

By default, a connector runs an initial snapshot operation only after it starts for the first time. Following this initial snapshot, under normal circumstances, the connector does not repeat the snapshot process. Any future change event data that the connector captures comes in through the streaming process only. (默认情况下,连接器仅在首次启动后运行初始快照操作。在初始快照之后,在正常情况下,连接器不会重复快照过程。连接器捕获的任何未来变更事件数据仅通过流式传输过程进入。)

However, in some situations the data that the connector obtained during the initial snapshot might become stale, lost, or incomplete. To provide a mechanism for recapturing table data, Debezium includes an option to perform ad hoc snapshots. You might want to perform an ad hoc snapshot after any of the following changes occur in your Debezium environment (但是,在某些情况下,连接器在初始快照期间获取的数据可能会过时、丢失或不完整。为了提供重新捕获表数据的机制,Debezium 提供了一个执行即席快照的选项。您可能希望在 Debezium 环境中发生以下任何更改后执行即席快照:)

  • The connector configuration is modified to capture a different set of tables. (修改了连接器配置以捕获不同的表集。)

  • Kafka topics are deleted and must be rebuilt. (Kafka 主题被删除且必须重建。)

  • Data corruption occurs due to a configuration error or some other problem. (由于配置错误或其他问题导致数据损坏。)

You can re-run a snapshot for a table for which you previously captured a snapshot by initiating a so-called ad hoc snapshot. Ad hoc snapshots require the use of signaling tables. You initiate an ad hoc snapshot by sending a signal request to the Debezium signaling table. (您可以通过启动所谓的即席快照来重新运行之前捕获了快照的表的快照。即席快照需要使用信号表。通过向 Debezium 信号表发送信号请求来启动即席快照。)

When you initiate an ad hoc snapshot of an existing table, the connector appends content to the topic that already exists for the table. If a previously existing topic was removed, Debezium can create a topic automatically if automatic topic creation is enabled. (当您启动现有表的即席快照时,连接器会将内容追加到该表已存在的主题中。如果之前存在的主题已被删除,并且启用了自动主题创建,则 Debezium 可以自动创建主题。)

Ad hoc snapshot signals specify the tables to include in the snapshot. The snapshot can capture the entire contents of the database, or capture only a subset of the tables in the database. Also, the snapshot can capture a subset of the contents of the table(s) in the database. (即席快照信号指定要包含在快照中的表。快照可以捕获数据库的全部内容,或仅捕获数据库中表的部分内容。此外,快照还可以捕获数据库中表的部分内容。)

You specify the tables to capture by sending an execute-snapshot message to the signaling table. Set the type of the execute-snapshot signal to incremental or blocking, and provide the names of the tables to include in the snapshot, as described in the following table (您可以通过向信号表发送 execute-snapshot 消息来指定要捕获的表。将 execute-snapshot 信号的类型设置为 incrementalblocking,并提供要在快照中包含的表名称,如下表所示:)

表 2. execute-snapshot 信号记录的示例
Field (字段) Default (默认值) Value (值)

type

incremental (增量)

Specifies the type of snapshot that you want to run. (指定您要运行的快照类型。)
Currently, you can request incremental or blocking snapshots. (目前,您可以请求 incrementalblocking 快照。)

data-collections (数据集合)

N/A

An array that contains regular expressions matching the fully-qualified names of the tables to include in the snapshot. (一个包含正则表达式的数组,匹配要包含在快照中的表的完全限定名称。)
对于 Oracle 连接器,请使用以下格式指定表的完全限定名:database.schema.table

additional-conditions (附加条件)

N/A

An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. (一个可选数组,指定连接器用于确定要包含在快照中的记录子集的附加条件集。)
Each additional condition is an object that specifies the criteria for filtering the data that an ad hoc snapshot captures. You can set the following parameters for each additional condition (每个附加条件是一个对象,指定即席快照捕获的数据的过滤标准。您可以为每个附加条件设置以下参数:)

data-collection (数据集合)

The fully-qualified name of the table that the filter applies to. You can apply different filters to each table. (过滤器适用的表的完全限定名称。您可以为每个表应用不同的过滤器。)

filter (过滤器)

Specifies column values that must be present in a database record for the snapshot to include it, for example, "color='blue'". (指定数据库记录中必须存在的列值,快照才能包含它,例如 "color='blue'"。)

The values that you assign to the filter parameter are the same types of values that you might specify in the WHERE clause of SELECT statements when you set the snapshot.select.statement.overrides property for a blocking snapshot. (您赋给 filter 参数的值与您在为阻塞快照设置 snapshot.select.statement.overrides 属性时可能在 SELECT 语句的 WHERE 子句中指定的类型的值相同。)

surrogate-key (代理键)

N/A

An optional string that specifies the column name that the connector uses as the primary key of a table during the snapshot process. (一个可选字符串,指定连接器在快照过程中用作表主键的列名。)

Triggering an ad hoc incremental snapshot (触发即席增量快照)

通过将具有 execute-snapshot 信号类型的条目添加到信号表中,或向 Kafka 信号主题发送信号消息,来启动临时增量快照。连接器处理该消息后,将开始快照操作。快照过程读取第一个和最后一个主键值,并使用这些值作为每个表的起始和结束点。根据表中条目的数量和配置的块大小,Debezium 将表分成块,然后依次快照每个块。

有关更多信息,请参阅 增量快照

Triggering an ad hoc blocking snapshot (触发即席阻塞快照)

You initiate an ad hoc blocking snapshot by adding an entry with the execute-snapshot signal type to the signaling table or signaling topic. After the connector processes the message, it begins the snapshot operation. The connector temporarily stops streaming, and then initiates a snapshot of the specified table, following the same process that it uses during an initial snapshot. After the snapshot completes, the connector resumes streaming. (您可以通过向信号表或信号主题添加具有 execute-snapshot 信号类型的条目来启动即席阻塞快照。在连接器处理消息后,它将开始快照操作。连接器将暂时停止流式传输,然后启动指定表的快照,遵循其在初始快照期间使用的相同过程。快照完成后,连接器将恢复流式传输。)

有关更多信息,请参阅 阻塞快照

增量快照

To provide flexibility in managing snapshots, Debezium includes a supplementary snapshot mechanism, known as incremental snapshotting. Incremental snapshots rely on the Debezium mechanism for sending signals to a Debezium connector. Incremental snapshots are based on the DDD-3 design document. (为了在管理快照时提供灵活性,Debezium 提供了一个补充快照机制,称为增量快照。增量快照依赖于 Debezium 向 Debezium 连接器发送信号的机制。增量快照基于DDD-3 设计文档。)

在增量快照中,Debezium 不像初始快照那样一次性捕获数据库的全部状态,而是分阶段、一系列可配置的块来捕获每个表。您可以指定要快照的表以及每个块的大小。块大小决定了快照在每次从数据库提取 ist 时收集的行数。增量快照的默认块大小为 1024 行。

As an incremental snapshot proceeds, Debezium uses watermarks to track its progress, maintaining a record of each table row that it captures. This phased approach to capturing data provides the following advantages over the standard initial snapshot process (随着增量快照的进行,Debezium 使用水位标记来跟踪其进度,并维护捕获的每个表行的记录。与标准的初始快照过程相比,这种分阶段的数据捕获方法提供了以下优势:)

  • You can run incremental snapshots in parallel with streamed data capture, instead of postponing streaming until the snapshot completes. The connector continues to capture near real-time events from the change log throughout the snapshot process, and neither operation blocks the other. (您可以与流式数据捕获并行运行增量快照,而不是将流式传输推迟到快照完成。在整个快照过程中,连接器会继续从变更日志中捕获近乎实时事件,并且任一操作都不会阻塞另一个操作。)

  • If the progress of an incremental snapshot is interrupted, you can resume it without losing any data. After the process resumes, the snapshot begins at the point where it stopped, rather than recapturing the table from the beginning. (如果增量快照的进度被中断,您可以恢复它而不会丢失任何数据。在过程恢复后,快照将从停止的点开始,而不是从头重新捕获表。)

  • 您可以按需随时运行增量快照,并根据需要重复该过程以适应数据库更新。例如,在修改连接器配置以将表添加到其 table.include.list 属性后,您可能会重新运行快照。

Incremental snapshot process (增量快照过程)

运行增量快照时,Debezium 会按主键对每个表进行排序,然后根据配置的块大小将表分成块。逐块工作,然后捕获块中的每个表行。对于它捕获的每一行,快照都会发出一个 READ 事件。该事件表示块快照开始时的行值。

As a snapshot proceeds, it’s likely that other processes continue to access the database, potentially modifying table records. To reflect such changes, INSERT, UPDATE, or DELETE operations are committed to the transaction log as per usual. Similarly, the ongoing Debezium streaming process continues to detect these change events and emits corresponding change event records to Kafka. (随着快照的进行,其他进程很可能会继续访问数据库,可能修改表记录。为了反映这些更改,INSERTUPDATEDELETE 操作照常提交到事务日志。同样,正在进行的 Debezium 流式传输过程会继续检测这些变更事件,并将相应的变更事件记录发送到 Kafka。)

How Debezium resolves collisions among records with the same primary key (Debezium 如何解决具有相同主键的记录之间的冲突)

In some cases, the UPDATE or DELETE events that the streaming process emits are received out of sequence. That is, the streaming process might emit an event that modifies a table row before the snapshot captures the chunk that contains the READ event for that row. When the snapshot eventually emits the corresponding READ event for the row, its value is already superseded. To ensure that incremental snapshot events that arrive out of sequence are processed in the correct logical order, Debezium employs a buffering scheme for resolving collisions. Only after collisions between the snapshot events and the streamed events are resolved does Debezium emit an event record to Kafka. (在某些情况下,流式传输过程发出的 UPDATEDELETE 事件可能会乱序接收。也就是说,流式传输过程可能会在快照捕获包含该行 READ 事件的块之前发出修改表行的事件。当快照最终发出行的相应 READ 事件时,其值已过时。为了确保乱序到达的增量快照事件按正确的逻辑顺序处理,Debezium 采用缓冲方案来解决冲突。仅当快照事件与流式事件之间的冲突得到解决后,Debezium 才会向 Kafka 发送事件记录。)

Snapshot window (快照窗口)

To assist in resolving collisions between late-arriving READ events and streamed events that modify the same table row, Debezium employs a so-called snapshot window. The snapshot window demarcates the interval during which an incremental snapshot captures data for a specified table chunk. Before the snapshot window for a chunk opens, Debezium follows its usual behavior and emits events from the transaction log directly downstream to the target Kafka topic. But from the moment that the snapshot for a particular chunk opens, until it closes, Debezium performs a de-duplication step to resolve collisions between events that have the same primary key. (为了帮助解决延迟到达的 READ 事件与修改同一表行的流式事件之间的冲突,Debezium 采用所谓的快照窗口。快照窗口划定了增量快照捕获指定表块数据的间隔。在某个块的快照窗口打开之前,Debezium 会遵循其常规行为,将事务日志中的事件直接向下游发送到目标 Kafka 主题。但在某个块的快照打开到关闭的整个过程中,Debezium 会执行去重步骤以解决具有相同主键的事件之间的冲突。)

For each data collection, the Debezium emits two types of events, and stores the records for them both in a single destination Kafka topic. The snapshot records that it captures directly from a table are emitted as READ operations. Meanwhile, as users continue to update records in the data collection, and the transaction log is updated to reflect each commit, Debezium emits UPDATE or DELETE operations for each change. (对于每个数据集合,Debezium 会发出两种类型的事件,并将两者的记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为 READ 操作发出。同时,随着用户继续更新数据集合中的记录,并且事务日志更新以反映每次提交,Debezium 会为每个更改发出 UPDATEDELETE 操作。)

As the snapshot window opens, and Debezium begins processing a snapshot chunk, it delivers snapshot records to a memory buffer. During the snapshot windows, the primary keys of the READ events in the buffer are compared to the primary keys of the incoming streamed events. If no match is found, the streamed event record is sent directly to Kafka. If Debezium detects a match, it discards the buffered READ event, and writes the streamed record to the destination topic, because the streamed event logically supersede the static snapshot event. After the snapshot window for the chunk closes, the buffer contains only READ events for which no related transaction log events exist. Debezium emits these remaining READ events to the table’s Kafka topic. (随着快照窗口的打开,Debezium 开始处理快照块,它会将快照记录传递到内存缓冲区。在快照窗口期间,缓冲区中 READ 事件的主键与传入的流式事件的主键进行比较。如果未找到匹配项,则将流式事件记录直接发送到 Kafka。如果 Debezium 检测到匹配,它将丢弃已缓冲的 READ 事件,并将流式记录写入目标主题,因为流式事件在逻辑上取代了静态快照事件。当块的快照窗口关闭后,缓冲区将仅包含没有相关事务日志事件的 READ 事件。Debezium 将这些剩余的 READ 事件发送到表的 Kafka 主题。)

The connector repeats the process for each snapshot chunk. (连接器对每个快照块重复此过程。)

To enable Debezium to perform incremental snapshots, you must grant the connector permission to write to the signaling table. (要使 Debezium 能够执行增量快照,您必须授予连接器写入信号表的权限。)

Write permission is unnecessary only for connectors that can be configured to perform read-only incrementals snapshots (MariaDB, MySQL, or PostgreSQL). (仅对于可以配置为执行只读增量快照的连接器(MariaDBMySQLPostgreSQL)来说,写入权限是可选的。)

Currently, you can use either of the following methods to initiate an incremental snapshot (当前,您可以使用以下任一方法启动增量快照:)

Debezium Oracle 连接器不支持在增量快照运行时进行模式更改。

触发增量快照

To initiate an incremental snapshot, you can send an ad hoc snapshot signal to the signaling table on the source database. You submit snapshot signals as SQL INSERT queries. (要启动增量快照,您可以将即席快照信号发送到源数据库上的信号表。您将快照信号作为 SQL INSERT 查询提交。)

After Debezium detects the change in the signaling table, it reads the signal, and runs the requested snapshot operation. (在 Debezium 检测到信号表中的更改后,它会读取信号并运行请求的快照操作。)

The query that you submit specifies the tables to include in the snapshot, and, optionally, specifies the type of snapshot operation. Debezium currently supports the incremental and blocking snapshot types. (您提交的查询指定了要包含在快照中的表,并可选地指定了快照操作的类型。Debezium 目前支持 incrementalblocking 快照类型。)

To specify the tables to include in the snapshot, provide a data-collections array that lists the tables, or an array of regular expressions used to match tables, for example, (要指定要包含在快照中的表,请提供一个列出表的 data-collections 数组,或者一个用于匹配表的正则表达式数组,例如:)

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

The data-collections array for an incremental snapshot signal has no default value. If the data-collections array is empty, Debezium interprets the empty array to mean that no action is required, and it does not perform a snapshot. (增量快照信号的 data-collections 数组没有默认值。如果 data-collections 数组为空,Debezium 会将空数组解释为无需执行任何操作,并且不会执行快照。)

If the name of a table that you want to include in a snapshot contains a dot (.), a space, or some other non-alphanumeric character, you must escape the table name in double quotes. (如果要包含在快照中的表名称包含点 (.)、空格或其他非字母数字字符,则必须用双引号转义表名称。)
例如,要包含存在于 public 模式、db1 数据库中,并且名称为 My.Table 的表,请使用以下格式:"db1.public.\"My.Table\""

先决条件
Using a source signaling channel to trigger an incremental snapshot (使用源信号通道触发增量快照)
  1. Send a SQL query to add the ad hoc incremental snapshot request to the signaling table (发送 SQL 查询将即席增量快照请求添加到信号表)

    INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');

    For example, (例如,)

    INSERT INTO db1.myschema.debezium_signal (id, type, data) (1)
    values ('ad-hoc-1',   (2)
        'execute-snapshot',  (3)
        '{"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], (4)
        "type":"incremental", (5)
        "additional-conditions":[{"data-collection": "db1.schema1.table1" ,"filter":"color=\'blue\'"}]}'); (6)

    The values of the id,type, and data parameters in the command correspond to the fields of the signaling table. (命令中的 idtypedata 参数的值对应于信号表的字段。)
    The following table describes the parameters in the example (下表描述了示例中的参数:)

    表 3. 用于将增量 snapshot 信号发送到信号表的 SQL 命令中字段的描述
    Item Value (值) 描述

    1

    database.schema.debezium_signal

    Specifies the fully-qualified name of the signaling table on the source database. (指定源数据库上信号表的完全限定名称。)

    2

    ad-hoc-1

    The id parameter specifies an arbitrary string that is assigned as the id identifier for the signal request. (id 参数指定一个任意字符串,该字符串被分配为信号请求的 id 标识符。)
    Use this string to identify logging messages to entries in the signaling table. Debezium does not use this string. Rather, during the snapshot, Debezium generates its own id string as a watermarking signal. (使用此字符串将日志消息标识到信号表中的条目。Debezium 不使用此字符串。而是在快照期间,Debezium 生成自己的 id 字符串作为水位标记信号。)

    3

    execute-snapshot

    The type parameter specifies the operation that the signal is intended to trigger. (type 参数指定信号旨在触发的操作。)

    4

    data-collections (数据集合)

    A required component of the data field of a signal that specifies an array of table names or regular expressions to match table names to include in the snapshot. (信号的 data 字段的必需组件,该字段指定一个表名称数组或匹配要包含在快照中的表名称的正则表达式。)
    数组列出了使用 database.schema.table 格式的正则表达式,以匹配数据集合的完全限定名。此格式与您用于指定连接器 信号表 名称的格式相同。

    5

    incremental (增量)

    An optional type component of the data field of a signal that specifies the type of snapshot operation to run. (信号的 data 字段的可选 type 组件,指定要运行的快照操作的类型。)
    Valid values are incremental and blocking. (有效值为 incrementalblocking。)
    If you do not specify a value, the connector defaults to performing an incremental snapshot. (如果您不指定值,连接器将默认执行增量快照。)

    6

    additional-conditions (附加条件)

    An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. (一个可选数组,指定连接器用于确定要包含在快照中的记录子集的附加条件集。)
    Each additional condition is an object with data-collection and filter properties. You can specify different filters for each data collection. (每个附加条件是一个具有 data-collectionfilter 属性的对象。您可以为每个数据集合指定不同的过滤器。)
    * data-collection 属性是过滤器应用的 ist 集合的完全限定名。有关 additional-conditions 参数的 ist,请参阅 使用 additional-conditions 运行临时增量快照

使用 additional-conditions 运行临时增量快照

If you want a snapshot to include only a subset of the content in a table, you can modify the signal request by appending an additional-conditions parameter to the snapshot signal. (如果您希望快照仅包含表内容的子集,则可以通过将 additional-conditions 参数附加到快照信号来修改信号请求。)

The SQL query for a typical snapshot takes the following form (典型快照的 SQL 查询形式如下:)

SELECT * FROM <tableName> ....

By adding an additional-conditions parameter, you append a WHERE condition to the SQL query, as in the following example (通过添加 additional-conditions 参数,您可以将 WHERE 条件附加到 SQL 查询,如下例所示:)

SELECT * FROM <data-collection> WHERE <filter> ....

The following example shows a SQL query to send an ad hoc incremental snapshot request with an additional condition to the signaling table (以下示例显示了一个 SQL 查询,用于向信号表发送带有附加条件的即席增量快照请求:)

INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');

For example, suppose you have a products table that contains the following columns (例如,假设您有一个 products 表,其中包含以下列:)

  • id (primary key)

  • color (颜色)

  • quantity (数量)

If you want an incremental snapshot of the products table to include only the data items where color=blue, you can use the following SQL statement to trigger the snapshot (如果您希望 products 表的增量快照仅包含 color=blue 的数据项,您可以使用以下 SQL 语句来触发快照:)

INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue"}]}');

The additional-conditions parameter also enables you to pass conditions that are based on more than one column. For example, using the products table from the previous example, you can submit a query that triggers an incremental snapshot that includes the data of only those items for which color=blue and quantity>10 (additional-conditions 参数还允许您传递基于多个列的条件。例如,使用前面示例中的 products 表,您可以提交一个查询来触发增量快照,其中仅包含 color=bluequantity>10 的项目数据:)

INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue AND quantity>10"}]}');

The following example, shows the JSON for an incremental snapshot event that is captured by a connector. (以下示例显示了连接器捕获的增量快照事件的 JSON。)

Example 1. Incremental snapshot event message (示例 1. 增量快照事件消息)
{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" (1)
    },
    "op":"r", (2)
    "ts_ms":"1620393591654",
    "ts_us":"1620393591654547",
    "ts_ns":"1620393591654547920",
    "transaction":null
}
表 4. 增量 snapshot 事件消息中字段的描述
Item Field name (字段名) 描述

1

snapshot (快照)

Specifies the type of snapshot operation to run. (指定要运行的快照操作的类型。)
Currently, the only valid options are blocking and incremental. (目前,唯一有效选项是 blockingincremental。)
Specifying a type value in the SQL query that you submit to the signaling table is optional. (在您提交给信号表的 SQL 查询中指定 type 值是可选的。)
If you do not specify a value, the connector runs an incremental snapshot. (如果您不指定值,连接器将运行增量快照。)

2

op (操作)

Specifies the event type. (指定事件类型。)
The value for snapshot events is r, signifying a READ operation. (快照事件的值为 r,表示 READ 操作。)

使用 Kafka 信号通道触发增量快照

You can send a message to the configured Kafka topic to request the connector to run an ad hoc incremental snapshot. (您可以向配置的 Kafka 主题发送消息,请求连接器运行即席增量快照。)

The key of the Kafka message must match the value of the topic.prefix connector configuration option. (Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。)

The value of the message is a JSON object with type and data fields. (消息的值是一个带有 typedata 字段的 JSON 对象。)

The signal type is execute-snapshot, and the data field must have the following fields (信号类型为 execute-snapshotdata 字段必须具有以下字段:)

表 5. 执行 snapshot 数据字段
Field (字段) Default (默认值) Value (值)

type

incremental (增量)

The type of the snapshot to be executed. Currently Debezium supports the incremental and blocking types. (要执行的快照类型。目前 Debezium 支持 incrementalblocking 类型。)
See the next section for more details. (有关更多详细信息,请参阅下一节。)

data-collections (数据集合)

N/A

An array of comma-separated regular expressions that match the fully-qualified names of tables to include in the snapshot. (一个逗号分隔的正则表达式数组,匹配要包含在快照中的表的完全限定名称。)
使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。

additional-conditions (附加条件)

N/A

An optional array of additional conditions that specifies criteria that the connector evaluates to designate a subset of records to include in a snapshot. (一个可选的附加条件数组,指定连接器评估以指定要包含在快照中的记录子集的标准。)
Each additional condition is an object that specifies the criteria for filtering the data that an ad hoc snapshot captures. You can set the following parameters for each additional condition: data-collection:: The fully-qualified name of the table that the filter applies to. You can apply different filters to each table. filter:: Specifies column values that must be present in a database record for the snapshot to include it, for example, "color='blue'". (每个附加条件是一个对象,指定即席快照捕获的数据的过滤标准。您可以为每个附加条件设置以下参数:data-collection:: 过滤器适用的表的完全限定名称。您可以为每个表应用不同的过滤器。filter:: 指定数据库记录中必须存在的列值,快照才能包含它,例如 "color='blue'"。)

The values that you assign to the filter parameter are the same types of values that you might specify in the WHERE clause of SELECT statements when you set the snapshot.select.statement.overrides property for a blocking snapshot. (您赋给 filter 参数的值与您在为阻塞快照设置 snapshot.select.statement.overrides 属性时可能在 SELECT 语句的 WHERE 子句中指定的类型的值相同。)

Example 2. An execute-snapshot Kafka message (示例 2. 一个 execute-snapshot Kafka 消息)
Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
Ad hoc incremental snapshots with additional-conditions (带有附加条件的即席增量快照)

Debezium uses the additional-conditions field to select a subset of a table’s content. (Debezium 使用 additional-conditions 字段来选择表内容的子集。)

Typically, when Debezium runs a snapshot, it runs a SQL query such as (通常,当 Debezium 运行快照时,它会运行一个 SQL 查询,例如:)

SELECT * FROM <tableName> …​.

When the snapshot request includes an additional-conditions property, the data-collection and filter parameters of the property are appended to the SQL query, for example (当快照请求包含 additional-conditions 属性时,该属性的 data-collectionfilter 参数将被附加到 SQL 查询中,例如:)

SELECT * FROM <data-collection> WHERE <filter> …​.

For example, given a products table with the columns id (primary key), color, and brand, if you want a snapshot to include only content for which color='blue', when you request the snapshot, you could add the additional-conditions property to filter the content: :leveloffset: +1 (例如,给定一个具有 id(主键)、colorbrand 列的 products 表,如果您希望快照仅包含 color='blue' 的内容,在请求快照时,您可以添加 additional-conditions 属性来过滤内容::leveloffset: +1)

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue'"}]}}`

You can also use the additional-conditions property to pass conditions based on multiple columns. For example, using the same products table as in the previous example, if you want a snapshot to include only the content from the products table for which color='blue', and brand='MyBrand', you could send the following request: :leveloffset: +1 (您还可以使用 additional-conditions 属性来传递基于多个列的条件。例如,使用前面示例中的相同 products 表,如果您希望快照仅包含 products 表中 color='blue'brand='MyBrand' 的内容,您可以发送以下请求::leveloffset: +1)

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`

停止增量快照

In some situations, it might be necessary to stop an incremental snapshot. For example, you might realize that snapshot was not configured correctly, or maybe you want to ensure that resources are available for other database operations. You can stop a snapshot that is already running by sending a signal to the signaling table on the source database. (在某些情况下,可能需要停止增量快照。例如,您可能发现快照配置不正确,或者您想确保资源可用于其他数据库操作。您可以通过向源数据库上的信号表发送信号来停止正在运行的快照。)

You submit a stop snapshot signal to the signaling table by sending it in a SQL INSERT query. The stop-snapshot signal specifies the type of the snapshot operation as incremental, and optionally specifies the tables that you want to omit from the currently running snapshot. After Debezium detects the change in the signaling table, it reads the signal, and stops the incremental snapshot operation if it’s in progress. (您通过在 SQL INSERT 查询中发送来将停止快照信号提交到信号表。stop-snapshot 信号将快照操作的 type 指定为 incremental,并可选地指定您希望从当前正在运行的快照中排除的表。在 Debezium 检测到信号表中的更改后,它会读取信号,并在增量快照操作进行中时停止它。)

Additional resources (附加资源)

您还可以通过向 Kafka 信号主题发送 JSON 消息来停止增量快照。

先决条件
Using a source signaling channel to stop an incremental snapshot (使用源信号通道停止增量快照)
  1. Send a SQL query to stop the ad hoc incremental snapshot to the signaling table (发送 SQL 查询以停止添加到信号表的即席增量快照)

    INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"incremental"}');

    For example, (例如,)

    INSERT INTO db1.myschema.debezium_signal (id, type, data) (1)
    values ('ad-hoc-1',   (2)
        'stop-snapshot',  (3)
        '{"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], (4)
        "type":"incremental"}'); (5)

    The values of the id, type, and data parameters in the signal command correspond to the fields of the signaling table. (信号命令中的 idtypedata 参数的值对应于信号表的字段。)
    The following table describes the parameters in the example (下表描述了示例中的参数:)

    表 6. 用于将停止增量 snapshot 信号发送到信号表的 SQL 命令中字段的描述
    Item Value (值) 描述

    1

    database.schema.debezium_signal

    Specifies the fully-qualified name of the signaling table on the source database. (指定源数据库上信号表的完全限定名称。)

    2

    ad-hoc-1

    The id parameter specifies an arbitrary string that is assigned as the id identifier for the signal request. (id 参数指定一个任意字符串,该字符串被分配为信号请求的 id 标识符。)
    Use this string to identify logging messages to entries in the signaling table. Debezium does not use this string. (使用此字符串将日志消息标识到信号表中的条目。Debezium 不使用此字符串。)

    3

    stop-snapshot

    Specifies type parameter specifies the operation that the signal is intended to trigger. (type 参数指定信号旨在触发的操作。)

    4

    data-collections (数据集合)

    An optional component of the data field of a signal that specifies an array of table names or regular expressions to match table names to remove from the snapshot. (信号的 data 字段的可选组件,指定一个表名称数组或匹配要从快照中删除的表名称的正则表达式。)
    数组列出了匹配表名称的正则表达式,格式为 database.schema.table

    If you omit this component from the data field, the signal stops the entire incremental snapshot that is in progress. (如果您从 data 字段中省略此组件,则信号将停止正在进行的整个增量快照。)

    5

    incremental (增量)

    A required component of the data field of a signal that specifies the type of snapshot operation to be stopped. (信号的 data 字段的必需组件,指定要停止的快照操作的类型。)
    Currently, the only valid option is incremental. (目前,唯一有效选项是 incremental。)
    If you do not specify a type value, the signal fails to stop the incremental snapshot. (如果您不指定 type 值,信号将无法停止增量快照。)

使用 Kafka 信号通道停止增量快照

You can send a signal message to the configured Kafka signaling topic to stop an ad hoc incremental snapshot. (您可以向配置的 Kafka 信号主题发送信号消息,以停止即席增量快照。)

The key of the Kafka message must match the value of the topic.prefix connector configuration option. (Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。)

The value of the message is a JSON object with type and data fields. (消息的值是一个带有 typedata 字段的 JSON 对象。)

The signal type is stop-snapshot, and the data field must have the following fields (信号类型为 stop-snapshotdata 字段必须具有以下字段:)

表 7. 执行 snapshot 数据字段
Field (字段) Default (默认值) Value (值)

type

incremental (增量)

The type of the snapshot to be executed. Currently Debezium supports only the incremental type. (要执行的快照类型。目前 Debezium 仅支持 incremental 类型。)
See the next section for more details. (有关更多详细信息,请参阅下一节。)

data-collections (数据集合)

N/A

An optional array of comma-separated regular expressions that match the fully-qualified names of the tables an array of table names or regular expressions to match table names to remove from the snapshot. (一个可选的逗号分隔的正则表达式数组,匹配要从快照中删除的表的表名称或匹配表名称的正则表达式数组。)
使用 database.schema.table 格式指定表名。

The following example shows a typical stop-snapshot Kafka message (以下示例显示了一个典型的 stop-snapshot Kafka 消息:)

Key = `test_connector`

Value = `{"type":"stop-snapshot","data": {"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], "type": "INCREMENTAL"}}`

Custom snapshotter SPI (自定义快照器 SPI)

For more advanced uses, you can fine-tune control of the snapshot by implementing one of the following interfaces (对于更高级的用途,您可以实现以下接口之一来微调快照的控制:)

io.debezium.snapshot.spi.Snapshotter

Controls whether the connector takes a snapshot. (控制连接器是否执行快照。)

io.debezium.snapshot.spi.SnapshotQuery

Controls how data is queried during a snapshot. (控制快照期间如何查询数据。)

io.debezium.snapshot.spi.SnapshotLock

Controls whether the connector locks tables when taking a snapshot. (控制连接器在进行快照时是否锁定表。)

io.debezium.snapshot.spi.Snapshotter interface. All built-in snapshot modes implement this interface. (io.debezium.snapshot.spi.Snapshotter 接口。所有内置快照模式都实现了此接口。)
/**
 * {@link Snapshotter} is used to determine the following details about the snapshot process:
 * <p>
 * - Whether a snapshot occurs. <br>
 * - Whether streaming continues during the snapshot. <br>
 * - Whether the snapshot includes schema (if supported). <br>
 * - Whether to snapshot data or schema following an error.
 * <p>
 * Although Debezium provides many default snapshot modes,
 * to provide more advanced functionality, such as partial snapshots,
 * you can customize implementation of the interface.
 * For more information, see the documentation.
 *
 *
 *
 */
@Incubating
public interface Snapshotter extends Configurable {

    /**
     * @return the name of the snapshotter.
     *
     *
     */
    String name();

    /**
     * @param offsetExists is {@code true} when the connector has an offset context (i.e. restarted)
     * @param snapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
     *
     * @return {@code true} if the snapshotter should take a data snapshot
     */
    boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress);

    /**
     * @param offsetExists is {@code true} when the connector has an offset context (i.e. restarted)
     * @param snapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
     *
     * @return {@code true} if the snapshotter should take a schema snapshot
     */
    boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress);

    /**
     * @return {@code true} if the snapshotter should stream after taking a snapshot
     */
    boolean shouldStream();

    /**
     * @return {@code true} whether the schema can be recovered if database schema history is corrupted.
     */
    boolean shouldSnapshotOnSchemaError();

    /**
     * @return {@code true} whether the snapshot should be re-executed when there is a gap in data stream.
     */
    boolean shouldSnapshotOnDataError();

    /**
     *
     * @return {@code true} if streaming should resume from the start of the snapshot
     * transaction, or {@code false} for when a connector resumes and takes a snapshot,
     * streaming should resume from where streaming previously left off.
     */
    default boolean shouldStreamEventsStartingFromSnapshot() {
        return true;
    }

    /**
     * Lifecycle hook called after the snapshot phase is successful.
     */
    default void snapshotCompleted() {
        // no operation
    }

    /**
     * Lifecycle hook called after the snapshot phase is aborted.
     */
    default void snapshotAborted() {
        // no operation
    }
}
io.debezium.snapshot.spi.SnapshotQuery interface. All built-in snapshot query modes implement this interface. (io.debezium.snapshot.spi.SnapshotQuery 接口。所有内置快照查询模式都实现了此接口。)
/**
 * {@link SnapshotQuery} is used to determine the query used during a data snapshot
 *
 *
 */
public interface SnapshotQuery extends Configurable, Service {

    /**
     * @return the name of the snapshot lock.
     *
     *
     */
    String name();

    /**
     * Generate a valid query string for the specified table, or an empty {@link Optional}
     * to skip snapshotting this table (but that table will still be streamed from)
     *
     * @param tableId the table to generate a query for
     * @param snapshotSelectColumns the columns to be used in the snapshot select based on the column
     *                              include/exclude filters
     * @return a valid query string, or none to skip snapshotting this table
     */
    Optional<String> snapshotQuery(String tableId, List<String> snapshotSelectColumns);

}
io.debezium.snapshot.spi.SnapshotLock interface. All built-in snapshot lock modes implement this interface. (io.debezium.snapshot.spi.SnapshotLock 接口。所有内置快照锁定模式都实现了此接口。)
/**
 * {@link SnapshotLock} is used to determine the table lock mode used during schema snapshot
 *
 *
 */
public interface SnapshotLock extends Configurable, Service {

    /**
     * @return the name of the snapshot lock.
     *
     *
     */
    String name();

    /**
     * Returns a SQL statement for locking the given table during snapshotting, if required by the specific snapshotter
     * implementation.
     */
    Optional<String> tableLockingStatement(Duration lockTimeout, String tableId);

}

阻塞快照

To provide more flexibility in managing snapshots, Debezium includes a supplementary ad hoc snapshot mechanism, known as a blocking snapshot. Blocking snapshots rely on the Debezium mechanism for sending signals to a Debezium connector. (为了在管理快照时提供更大的灵活性,Debezium 提供了一个补充的即席快照机制,称为阻塞快照。阻塞快照依赖于 Debezium 向 Debezium 连接器发送信号的机制。)

A blocking snapshot behaves just like an initial snapshot, except that you can trigger it at run time. (阻塞快照的行为与初始快照完全相同,只是您可以随时在运行时触发它。)

You might want to run a blocking snapshot rather than use the standard initial snapshot process in the following situations (您可能希望在以下情况下运行阻塞快照,而不是使用标准的初始快照过程:)

  • You add a new table and you want to complete the snapshot while the connector is running. (添加了一个新表,并且您希望在连接器运行时完成快照。)

  • You add a large table, and you want the snapshot to complete in less time than is possible with an incremental snapshot. (添加了一个大表,并且您希望快照的完成时间比增量快照更快。)

Blocking snapshot process (阻塞快照过程)

When you run a blocking snapshot, Debezium stops streaming, and then initiates a snapshot of the specified table, following the same process that it uses during an initial snapshot. After the snapshot completes, the streaming is resumed. (运行阻塞快照时,Debezium 会停止流式传输,然后启动指定表的快照,遵循其在初始快照期间使用的相同过程。快照完成后,流式传输将恢复。)

Configure snapshot (配置快照)

You can set the following properties in the data component of a signal (您可以在信号的 data 组件中设置以下属性:)

  • data-collections: to specify which tables must be snapshot. (data-collections:用于指定必须进行快照的表。)

  • data-collections: Specifies the tables that you want the snapshot to include. (data-collections:指定您希望快照包含的表。)
    This property accepts a comma-separated list of regular expressions that match fully-qualified table names. The behavior of the property is similar to the behavior of the table.include.list property, which specifies the tables to capture in a blocking snapshot. (此属性接受逗号分隔的正则表达式列表,这些表达式匹配完全限定的表名。此属性的行为类似于 table.include.list 属性的行为,后者指定要在阻塞快照中捕获的表。)

  • additional-conditions: You can specify different filters for different table. (additional-conditions:您可以为不同的表指定不同的过滤器。)

    • The data-collection property is the fully-qualified name of the table for which the filter will be applied, and can be case-sensitive or case-insensitive depending on the database. (data-collection 属性是过滤器将应用的表的完全限定名称,并且根据数据库的不同,可能区分大小写或不区分大小写。)

    • The filter property will have the same value used in the snapshot.select.statement.overrides, the fully-qualified name of the table that should match by case. (filter 属性将具有与 snapshot.select.statement.overrides 中使用的值相同的值,即应区分大小写匹配的表的完全限定名称。)

For example (例如:)

  {"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
Possible duplicates (可能重复)

A delay might exist between the time that you send the signal to trigger the snapshot, and the time when streaming stops and the snapshot starts. As a result of this delay, after the snapshot completes, the connector might emit some event records that duplicate records captured by the snapshot. (从您发送触发快照的信号到流式传输停止并开始快照之间可能存在延迟。由于此延迟,快照完成后,连接器可能会发出一些重复快照捕获记录的事件记录。)

主题名称

默认情况下,Oracle 连接器会将表中发生的 ist、UPDATEDELETE 操作的所有更改事件写入一个特定于该表的 Apache Kafka 主题。连接器使用以下约定来命名更改事件主题:

topicPrefix.schemaName.tableName

以下列表提供了默认名称组件的定义:

topicPrefix

topic.prefix 连接器配置属性指定的 ist 前缀。

schemaName (模式名称)

操作发生的模式(schema)的名称。

tableName

操作发生的表的名称。

例如,如果 fulfillment 是服务器名称,inventory 是模式名称,并且数据库包含名为 orderscustomersproducts 的表,则 Debezium Oracle 连接器会为数据库中的每个表发出以下 Kafka 主题的 ist:

fulfillment.inventory.orders
fulfillment.inventory.customers
fulfillment.inventory.products

连接器对标记其内部数据库模式历史主题、模式更改主题事务元数据主题也应用了类似的命名约定。

如果默认主题名称不符合您的要求,您可以配置自定义主题名称。要配置自定义主题名称,请在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 自定义主题命名的更多信息,请参阅 主题路由

模式历史主题

当数据库客户端查询数据库时,客户端使用数据库的当前模式。但是,数据库模式可以随时更改,这意味着连接器必须能够识别在记录每次插入、更新或删除操作时模式是什么。此外,连接器不一定能将当前模式应用于每个事件。如果事件相对较旧,它可能是在应用当前模式之前记录的。

为了确保模式更改后事件的正确处理,Oracle 不仅会在重做日志中包含影响数据的 ist,还会包含应用于数据库的 DDL 语句。当连接器在重做日志中遇到这些 DDL 语句时,它会解析它们并更新每个表模式的内存表示。连接器使用此模式表示来标识每个插入、更新或删除操作时表的 ist,并生成相应的更改事件。在单独的数据库模式历史 Kafka 主题中,连接器会记录所有 DDL 语句以及每个 DDL 语句出现在重做日志中的位置。

当连接器在崩溃或正常停止后重新启动时,它会从特定位置开始读取重做日志,即从特定时间点开始。连接器通过读取数据库模式历史 Kafka 主题并解析重做日志中连接器开始位置之前的 ddl 语句来重建该时间点存在的表结构。

此数据库模式历史主题仅供连接器内部使用。可选地,连接器还可以将模式更改事件发送到另一个供消费者应用程序使用的主题

Additional resources (附加资源)

模式更改主题

您可以将 Debezium Oracle 连接器配置为生成描述应用于数据库表中结构性更改的模式更改事件。连接器会将模式更改事件写入名为 <serverName> 的 Kafka 主题,其中 serverName 是在 topic.prefix 配置属性中指定的 ist。

每当连接器从新表流式传输数据或表结构发生更改时,Debezium 都会向模式更改主题发出新消息。

在表结构更改后,您必须遵循(模式演进过程)。

连接器发送到模式更改主题的消息包含一个 payload,并且可以选择性地包含更改事件消息的模式。

模式变更事件的模式具有以下元素:

name (名称)

模式变更事件消息的名称。

type

事件消息类型的类型。

version (版本)

模式的版本。版本是一个整数,每次模式更改时都会递增。

fields (字段)

变更事件消息中包含的字段。

示例:Oracle 连接器模式更改主题的模式

以下示例以 JSON 格式显示了典型的模式。

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "databaseName"
      }
    ],
    "optional": false,
    "name": "io.debezium.connector.oracle.SchemaChangeKey",
    "version": 1
  },
  "payload": {
    "databaseName": "inventory"
  }
}

模式变更事件消息的 payload 包含以下元素:

ddl

提供导致模式更改的 SQL CREATEALTERDROP 语句。

databaseName (数据库名称)

应用于语句的数据库名称。databaseName 的值用作消息键。

tableChanges (表变更)

模式更改后整个表模式的结构化表示。tableChanges 字段包含一个数组,其中包含表每个列的条目。由于结构化表示以 JSON 或 Avro 格式呈现数据,因此消费者无需先通过 DDL 解析器处理消息即可轻松读取消息。

默认情况下,连接器使用 ALL_TABLES 数据库视图来标识要存储在模式历史主题中的表名。在该视图中,连接器只能访问其连接数据库的用户帐户可用的表的 ist。

您可以修改设置,使模式历史主题存储不同的表子集。使用以下方法之一来更改主题存储的表集:

当连接器配置为捕获表时,它会将该表模式更改的历史记录不仅存储在模式更改主题中,还存储在内部数据库模式历史记录主题中。内部数据库模式历史记录主题仅供连接器使用,不打算由消费应用程序直接使用。确保需要模式更改通知的应用程序仅从模式更改主题中获取该信息。

切勿分区数据库模式历史主题。为了使数据库模式历史主题正常工作,它必须维护连接器发送到它的事件记录的一致的全局顺序。

为了确保主题不会在分区之间分割,请使用以下任一方法设置主题的分区计数:

  • 如果您手动创建数据库模式历史主题,请指定分区计数为 1

  • 如果您使用 Apache Kafka 代理自动创建数据库模式历史主题,则会创建该主题,请将 Kafka num.partitions 配置选项的值设置为 1

模式更改主题的消息格式处于孵化状态,可能会在不通知的情况下进行更改。

示例:发送到 Oracle 连接器模式更改主题的消息

以下示例以 JSON 格式显示了典型的模式变更消息。该消息包含表模式的逻辑表示。

{
  "schema": {
  ...
  },
  "payload": {
    "source": {
      "version": "3.3.0.Final",
      "connector": "oracle",
      "name": "server1",
      "ts_ms": 1588252618953,
      "ts_us": 1588252618953000,
      "ts_ns": 1588252618953000000,
      "snapshot": "true",
      "db": "ORCLPDB1",
      "schema": "DEBEZIUM",
      "table": "CUSTOMERS",
      "txId" : null,
      "scn" : "1513734",
      "commit_scn": "1513754",
      "lcr_position" : null,
      "rs_id": "001234.00012345.0124",
      "ssn": 1,
      "redo_thread": 1,
      "user_name": "user",
      "row_id": "AAASgjAAMAAAACnAAA",
      "commit_ts_ms": 1588252619012,
      "start_scn": "1513734",
      "start_ts_ms": 1588252618953
    },
    "ts_ms": 1588252618953, (1)
    "ts_us": 1588252618953987, (1)
    "ts_ns": 1588252618953987512, (1)
    "databaseName": "ORCLPDB1", (2)
    "schemaName": "DEBEZIUM", //
    "ddl": "CREATE TABLE \"DEBEZIUM\".\"CUSTOMERS\" \n   (    \"ID\" NUMBER(9,0) NOT NULL ENABLE, \n    \"FIRST_NAME\" VARCHAR2(255), \n    \"LAST_NAME" VARCHAR2(255), \n    \"EMAIL\" VARCHAR2(255), \n     PRIMARY KEY (\"ID\") ENABLE, \n     SUPPLEMENTAL LOG DATA (ALL) COLUMNS\n   ) SEGMENT CREATION IMMEDIATE \n  PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 \n NOCOMPRESS LOGGING\n  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645\n  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1\n  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)\n  TABLESPACE \"USERS\" ", (3)
    "tableChanges": [ (4)
      {
        "type": "CREATE", (5)
        "id": "\"ORCLPDB1\".\"DEBEZIUM\".\"CUSTOMERS\"", (6)
        "table": { (7)
          "defaultCharsetName": null,
          "primaryKeyColumnNames": [ (8)
            "ID"
          ],
          "columns": [ (9)
            {
              "name": "ID",
              "jdbcType": 2,
              "nativeType": null,
              "typeName": "NUMBER",
              "typeExpression": "NUMBER",
              "charsetName": null,
              "length": 9,
              "scale": 0,
              "position": 1,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "FIRST_NAME",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "VARCHAR2",
              "typeExpression": "VARCHAR2",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 2,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "LAST_NAME",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "VARCHAR2",
              "typeExpression": "VARCHAR2",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 3,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "EMAIL",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "VARCHAR2",
              "typeExpression": "VARCHAR2",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 4,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            }
          ],
          "attributes": [ (10)
            {
              "customAttribute": "attributeValue"
            }
          ]
        }
      }
    ]
  }
}
表 8. 发送到模式更改主题的消息中字段的描述
Item Field name (字段名) 描述

1

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在 source 对象中,ts_ms 指示数据库中发生更改的时间。通过比较 payload.source.ts_ms 的值与 payload.ts_ms 的值,您可以确定源数据库更新与 Debezium 之间的延迟。

2

databaseName (数据库名称)
schemaName (模式名称)

标识包含更改的 ist 和模式。

3

ddl

此字段包含导致模式更改的 DDL。

4

tableChanges (表变更)

包含 DDL 命令生成的模式变更的一个或多个项目的数组。

5

type

描述更改的类型。type 可以设置为以下值之一:

CREATE (创建)

表已创建。

ALTER (修改)

表已修改。

DROP (删除)

表已删除。

6

id

已创建、修改或删除的表的完整标识符。在表重命名的情况下,此标识符是 <old>,<new> 表名称的串联。

7

table (表)

表示应用更改后表的元数据。

8

primaryKeyColumnNames (主键列名)

组成表主键的列的列表。

9

columns (列)

已更改表中每个列的元数据。

10

attributes (属性)

每个表变更的自定义属性元数据。

在连接器发送到模式更改主题的消息中,消息键是包含模式更改的 ist 的名称。在以下示例中,payload 字段包含 databaseName 键:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "databaseName"
      }
    ],
    "optional": false,
    "name": "io.debezium.connector.oracle.SchemaChangeKey",
    "version": 1
  },
  "payload": {
    "databaseName": "ORCLPDB1"
  }
}

事务元数据

Debezium 可以生成表示事务元数据边界的事件,并丰富数据更改事件消息。

Debezium 接收事务元数据的限制

Debezium 仅为部署连接器后发生的事务注册和接收元数据。部署连接器之前发生的事务的元数据不可用。

数据库事务由 BEGINEND 关键字括起来的语句块表示。Debezium 为每个事务中的 BEGINEND 分隔符生成事务边界事件。事务边界事件包含以下字段:

status

BEGINEND

id

唯一事务标识符的字符串表示。

ts_ms

数据源中事务边界事件(BEGINEND 事件)的时间。如果数据源未向 Debezium 提供事件时间,则该字段将代表 Debezium 处理事件的时间。

event_count(针对 END 事件)

事务发出的事件总数。

data_collections(针对 END 事件)

一对 data_collectionevent_count 元素的数组,指示连接器为源自数据集合的更改发出的事件数量。

以下示例显示了一个典型的事务边界消息:

示例:Oracle 连接器事务边界事件
{
  "status": "BEGIN",
  "id": "5.6.641",
  "ts_ms": 1486500577125,
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "5.6.641",
  "ts_ms": 1486500577691,
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "ORCLPDB1.DEBEZIUM.CUSTOMER",
      "event_count": 1
    },
    {
      "data_collection": "ORCLPDB1.DEBEZIUM.ORDER",
      "event_count": 1
    }
  ]
}

除非通过 topic.transaction 选项覆盖,否则连接器会将事务事件发送到 <topic.prefix>.transaction 主题。

数据更改事件的丰富

启用事务元数据后,数据消息 Envelope 将会添加一个 transaction 字段。此字段提供有关每个事件的信息,形式为字段的组合:

id

唯一事务标识符的字符串表示。

total_order

事件在事务生成的所有事件中的绝对位置。

data_collection_order

事件在事务发出的所有事件中,每个数据集合的位置。

以下示例显示了一个典型的事务事件消息:

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "ts_us": "1580390884335741",
  "ts_ns": "1580390884335741963",
  "transaction": {
    "id": "5.6.641",
    "total_order": "1",
    "data_collection_order": "1"
  }
}

LogMiner 挖掘策略

Oracle 重做日志中的条目不存储用户提交以进行 DML 更改的原始 SQL 语句。相反,重做条目包含一组更改向量和一组对象标识符,这些标识符代表与这些向量相关的表空间、表和列。换句话说,重做日志条目不包含受 DML 更改影响的模式、表或列的名称。

Debezium Oracle 连接器使用 log.mining.strategy 配置属性来控制 Oracle LogMiner 如何处理更改向量中对象标识符的查找。在某些情况下,一种日志挖掘策略在处理模式更改方面可能比另一种更可靠。但是,在选择日志挖掘策略之前,考虑它可能对性能和开销产生的影响很重要。

将数据字典写入重做日志

redo_log_catalog 挖掘策略指示数据库在每次重做日志切换后立即将数据字典的副本刷新到重做日志。这是跟踪与数据更改交织在一起的模式更改的最可靠策略,因为 Oracle LogMiner 可以通过一系列更改向量在起始和结束数据字典状态之间进行插值。

但是,redo_log_catalog 模式也是最昂贵的,因为它需要几个关键步骤才能正常工作。首先,此模式要求在每次日志切换后将数据字典刷新到重做日志。每次切换后刷新日志会迅速消耗归档日志中的宝贵空间,并且归档日志的高量可能超过数据库管理员准备的数量。如果您打算使用此模式,请与您的数据库管理员协调,以确保数据库配置正确。

如果将连接器配置为使用 redo_log_catalog 模式,请不要使用多个 Debezium Oracle 连接器从同一逻辑数据库捕获 ist。

直接使用在线目录

默认策略模式 online_catalog 的工作方式与 redo_log_catalog 模式不同。当策略设置为 online_catalog 时,数据库永远不会将数据字典刷新到重做日志。相反,Oracle LogMiner 始终使用最新的数据字典状态来执行比较。通过始终使用当前目录并消除刷新到重做日志,此策略需要更少的开销,并且运行效率更高。但是,这些好处被无法解析交织的模式更改和数据更改所抵消。因此,此策略有时可能导致事件失败。

如果 LogMiner 在模式更改后无法可靠地重建 SQL,请检查重做日志以获取证据。查找名称类似于 OBJ# 123456(数字是表的 ist)的表的引用,或名称类似于 COL1COL2 的列的引用。当您将连接器配置为使用 online_catalog 策略时,请采取措施确保表模式及其索引保持静态且无更改。如果 Debezium 连接器配置为使用 online_catalog 模式,并且您必须应用模式更改,请执行以下步骤:

  1. 等待连接器捕获所有现有 ist(DML)。

  2. 执行模式(DDL)更改,然后等待连接器捕获更改。

  3. 恢复对表的数据更改(DML)。

遵循此过程有助于确保 Oracle LogMiner 能够安全地重建所有 ist 的 SQL。

混合方法

您可以通过将 log.mining.strategy 配置属性的值设置为 hybrid 来启用此策略。此策略的目标是提供 redo_log_catalog 策略的可靠性以及 online_catalog 策略的性能和低开销,而不会产生任一策略的缺点。

hybrid 策略通过主要在 online_catalog 模式下运行来工作,这意味着 Debezium Oracle 连接器首先将事件重建委托给 Oracle LogMiner。如果 Oracle LogMiner 成功重建了 SQL,Debezium 将正常处理该事件,就好像它被配置为使用 online_catalog 策略一样。如果连接器检测到 Oracle LogMiner 无法重建 SQL,则连接器会尝试直接使用该表的模式历史记录来重建 SQL。只有当 Oracle LogMiner 和连接器都无法重建 SQL 时,连接器才会报告失败。

如果 lob.enabled 属性设置为 true,则不能使用 hybrid 挖掘策略。如果您需要流式传输 CLOBBLOBXML 数据,则只能使用 online_catalogredo_log_catalog 策略。

使用下游挖掘数据库

您可以使用 Debezium Oracle 连接器从 Oracle Database 挖掘数据库捕获 ist。日志挖掘操作会消耗系统资源并增加数据库负载,这可能导致性能下降。将连接器配置为在辅助挖掘数据库上执行更改数据捕获处理,可以减轻主数据库的负载。

此功能处于孵化状态。这意味着确切的语义、配置选项和其他详细信息可能会在将来基于反馈进行修改。请告诉我们您的具体要求,或者在使用此功能时遇到任何问题。

先决条件
  • Oracle 下游挖掘数据库已配置并与主实例同步。

  • 下游挖掘数据库可以访问主数据库实例生成的外部字典文件。

  • 连接器配置包括以下主要属性:

    log.mining.strategy

    此属性设置为 dictionary_from_file

    internal.log.mining.read.only

    此属性设置为 true 以启用从下游挖掘数据库捕获 ist,而不是从主实例捕获。

    log.mining.read.only.hostname

    指定要从中读取和捕获事务日志的下游挖掘数据库的主机名。由于连接器使用此下游挖掘实例来捕获 ist,因此在 Debezium 发出更改事件之前,必须将 ist 从主实例复制到下游实例。

    log.mining.path.dictionary

    指定 Oracle LogMiner 可以访问主 Oracle 实例导出的数据字典文件的下游挖掘数据库服务器上的绝对文件路径。下游数据库上的 LogMiner 进程必须能够访问指定路径。

工作原理

当 Debezium Oracle 连接器配置为从下游挖掘数据库捕获 ist 时,连接器将按以下方式运行:

  1. 主 Oracle 实例生成一个包含数据库模式和元数据信息的外部字典文件。

  2. 此字典文件可在指定路径访问下游挖掘数据库。

  3. 连接器连接到下游挖掘数据库,并使用具有外部字典文件的 LogMiner 读取事务日志。

  4. 从下游挖掘数据库捕获 ist,而不会影响主数据库性能。

重要注意事项

  • 确保网络和文件访问设置允许挖掘数据库访问主数据库上的字典文件和重做日志。

  • 使主实例上的字典文件保持最新,以确保它能正确反映模式更改。

  • 为了防止模式更改检测出现问题,请使挖掘数据库上的字典文件与主数据库上的文件保持同步。

查询模式

Debezium Oracle 连接器默认与 Oracle LogMiner 集成。此集成需要一套专门的步骤,包括生成复杂的 JDBC SQL 查询,将事务日志中记录的 ist 作为更改事件进行摄取。JDBC SQL 查询使用的 V$LOGMNR_CONTENTS 视图没有任何索引来提高查询性能,因此存在不同的查询模式,这些模式控制如何生成 SQL 查询以提高查询执行效率。

可以配置 log.mining.query.filter.mode 连接器属性为以下之一,以影响 JDBC SQL 查询的生成方式:

none

(默认) 此模式创建一个仅基于数据库级别的不同操作类型(如插入、更新或删除)进行筛选的 JDBC 查询。在基于模式、表或用户名包含/排除列表筛选 ist 时,此操作在连接器内的处理循环中完成。

此模式通常适用于从更改不饱和的小表数据库捕获 ist。生成的查询非常简单,主要侧重于以低数据库开销尽可能快地读取。

in

此模式创建一个 JDBC 查询,该查询不仅在数据库级别筛选操作类型,还筛选模式、表和用户名包含/排除列表。查询谓词使用基于包含/排除列表配置属性中指定的值的 SQL in 子句生成。

此模式通常适用于从更改饱和的大表数据库捕获大量 ist。生成的查询比 none 模式复杂得多,侧重于减少网络开销并在数据库级别执行尽可能多的筛选。

最后,不要将正则表达式作为模式和表包含/排除配置属性的一部分。使用正则表达式将导致连接器不匹配这些配置属性的 ist,从而导致 ist 丢失。

regex

此模式创建一个 JDBC 查询,该查询不仅在数据库级别筛选操作类型,还筛选模式、表和用户名包含/排除列表。但是,与 in 模式不同,此模式使用 Oracle REGEXP_LIKE 运算符生成 SQL 查询,具体取决于指定包含还是排除的值,使用合取或析取。

此模式通常适用于捕获可以使用少量正则表达式标识的可变数量的表。生成的查询比任何其他模式都复杂得多,侧重于减少网络开销并在数据库级别执行尽可能多的筛选。

事件缓冲

Oracle 按发生的顺序将所有 ist 写入重做日志,包括后来被回滚丢弃的 ist。因此,来自不同事务的并发 ist 会交织在一起。当连接器首次读取 ist 流时,由于它无法立即确定哪些 ist 已提交或已回滚,因此它会暂时将更改事件存储在内部缓冲区中。提交更改后,连接器会将缓冲区中的更改事件写入 Kafka。连接器会丢弃被回滚丢弃的更改事件。

您可以通过设置 log.mining.buffer.type 属性来配置连接器使用的缓冲机制。

默认缓冲区类型使用 memory 配置。在默认的 memory 设置下,连接器使用 JVM 进程的堆内存来分配和管理缓冲的事件记录。如果您使用 memory 缓冲区设置,请确保分配给 Java 进程的内存量能够容纳您环境中的长时间运行和大型事务。

Infinispan

Debezium Oracle 连接器还可以配置为使用 Infinispan 作为其缓存提供程序,支持本地嵌入式模式或远程服务器集群的缓存存储。为了使用 Infinispan,log.mining.buffer.type 必须使用 infinispan_embeddedinfinispan_remote 进行配置。

为了在 Infinispan 缓存配置方面提供灵活性,连接器在使用 Infinispan 缓冲事件数据时,需要提供一系列缓存配置属性。请参阅 log.mining.buffer.infinispan.cache 命名空间中的 配置属性。这些配置属性的内容取决于连接器是与远程 Infinispan 集群集成还是使用嵌入式引擎。

例如,以下显示了使用 Infinispan 嵌入式模式时事务缓存属性的嵌入式配置示例:

<local-cache name="transactions">
  <persistence passivation="false">
    <file-store read-only="false" preload="true" shared="false">
      <data path="./data"/>
      <index path="./index"/>
    </file-store>
  </persistence>
</local-cache>

深入查看配置,缓存被配置为持久化的。所有缓存都应如此配置,以避免在事务进行中时连接器重新启动时丢失事务事件。此外,缓存所在的位置由 path 属性定义,并且应为所有可能的运行时环境可访问的共享位置。

Infinispan 缓冲区实现使用了多个具有不同名称的缓存配置。应该为 transactionseventsprocessed-transactionsschema-changes 定义缓存。每个配置都可以根据您的性能需求进行调整,或者除了缓存名称外,还可以与其他配置相同。

将 XML 配置作为 JSON 连接器属性值提供时,必须省略换行符或将其替换为 \n 字符。

另一个示例,以下显示了使用 Infinispan 集群配置的相同缓存:

<distributed-cache name="transactions" statistics="true">
  <encoding media-type="application/x-protostream" />
  <persistence passivation="false">
   <file-store read-only="false" preload="true" shared="false">
     <data path="./data"/>
     <index path="./index"/>
   </file-store>
  </persistence>
</distributed-cache>

与前面的示例中的嵌入式本地缓存配置一样,此配置也定义为持久化的。所有缓存都应如此配置,以避免在事务进行中时连接器重新启动时丢失事务事件。

但是,有几点值得注意。首先,缓存被定义为分布式缓存而不是本地缓存。其次,缓存被定义为使用 application/x-protostream 编码,这是所有 Debezium 缓存所必需的。最后,在文件存储定义中不需要 path 属性,因为 Infinispan 集群会自动处理此问题。

Infinispan 缓冲区类型被认为是孵化状态;缓存格式可能在版本之间发生变化,并且可能需要重新快照。迁移说明将指示是否需要这样做。

此外,在移除使用 Infinispan 缓冲区的 Debezium Oracle 连接器时,持久化的缓存文件不会自动从磁盘中删除。如果新的连接器部署将使用相同的缓冲区位置,则应在部署新连接器之前手动删除文件。

Infinispan Hotrod 客户端集成

Debezium Oracle 连接器利用 Hotrod 客户端与 Infinispan 集群进行通信。任何以 log.mining.buffer.infinispan.client. 为前缀的连接器属性都将使用 infinispan.client. 命名空间直接传递给 Hotrod 客户端,从而可以完全自定义客户端如何与集群交互。

在使用此 Infinspan 模式时,至少必须提供一项必需的配置属性:

log.mining.buffer.infinispan.client.hotrod.server_list

指定 Infinispan 服务器主机名和端口组合的列表,格式为 <hostname>:<port>

SCN 差距检测

当 Debezium Oracle 连接器配置为使用 LogMiner 时,它通过使用基于系统更改号 (SCN) 的起始和结束范围来收集 Oracle 的 ist。连接器会自动管理此范围,根据连接器是否能够近乎实时地流式传输 ist,或者由于数据库中的大型事务或批量事务的数量而必须处理 ist 的积压,来增加或减少范围。

在某些情况下,Oracle 数据库会以异常高的量推进 SCN,而不是以恒定的速率增加 SCN 值。SCN 值的这种跳跃可能是由于特定集成与数据库的交互方式,或者由于热备份等事件。

Debezium Oracle 连接器依赖于以下配置属性来检测 SCN 差距并调整挖掘范围。

log.mining.scn.gap.detection.gap.size.min

指定最小差距大小。

log.mining.scn.gap.detection.time.interval.max.ms

指定最大时间间隔。

连接器首先比较当前 SCN 与当前挖掘范围中最高 SCN 之间的 ist 数量差异。如果当前 SCN 值与最高 SCN 值之间的差大于最小差距大小,则连接器可能检测到了 SCN 差距。为了确认是否存在差距,连接器接下来比较当前 SCN 的时间戳和前一个挖掘范围结束时的 SCN 的时间戳。如果时间戳之间的差小于最大时间间隔,则 SCN 差距的存在得到确认。

当发生 SCN 差距时,Debezium 连接器会自动使用当前 SCN 作为当前挖掘会话范围的结束点。这使得连接器能够快速赶上实时事件,而无需挖掘介于两者之间的较小范围(因为 SCN 值意外大幅增加,返回的 ist 为空)。当连接器响应 SCN 差距执行上述步骤时,它会忽略 log.mining.batch.size.max 属性指定的值。在连接器完成挖掘会话并赶上实时事件后,它将恢复强制执行最大日志挖掘批处理大小。

SCN 差距检测仅在连接器正在运行并处理近乎实时事件时发生大型 SCN 增量时可用。

低变更频率偏移管理

Debezium Oracle 连接器会在连接器偏移量中跟踪系统更改号,以便在连接器重新启动时,它可以从中断处继续。这些偏移量是每个发出的更改事件的一部分;但是,当数据库 ist 频率较低时(每隔几小时或几天),偏移量可能会过时,如果系统更改号不再存在于事务日志中,则会导致连接器无法成功重启。

对于使用非 CDB 模式连接到 Oracle 的连接器,您可以启用 heartbeat.interval.ms 以强制连接器定期发出心跳事件,从而使偏移量保持同步。

对于使用 CDB 模式连接到 Oracle 的连接器,保持同步更加复杂。不仅必须设置 heartbeat.interval.ms,还需要设置 heartbeat.action.query。必须同时指定这两个属性,因为在 CDB 模式下,连接器专门跟踪 PDB 内部的 ist。需要一个辅助机制来触发可插入数据库内的 ist。定期,心跳操作查询会导致连接器在可插入数据库中插入新表行或更新现有行。Debezium 检测表 ist 并为其发出更改事件,确保偏移量保持同步,即使在处理 ist 频率较低的可插入数据库中也是如此。

为了使连接器能够使用 heartbeat.action.query 处理非连接器用户帐户拥有的表,您必须授予连接器用户在这些表上运行必要的 INSERTUPDATE 查询的权限。

数据更改事件

Oracle 连接器发出的每个数据更改事件都有一个键和一个值。键和值结构取决于更改事件源自的表。有关 Debezium 如何构造主题名称的信息,请参阅 主题名称

Debezium Oracle 连接器确保所有 Kafka Connect *模式名称* 都是有效的 Avro 模式名称。这意味着逻辑服务器名称必须以字母字符或下划线([a-z,A-Z,_])开头,逻辑服务器名称的其余字符以及模式和表名称中的所有字符必须是字母数字字符或下划线([a-z,A-Z,0-9,\_])。连接器会自动用下划线字符替换无效字符。

当多个逻辑服务器名称、模式名称或表名称之间的唯一区分字符无效,并且这些字符被替换为下划线时,可能会出现意外的命名冲突。

Debezium 和 Kafka Connect 是围绕*连续的事件消息流*设计的。但是,这些事件的结构可能会随时间而变化,这对于主题使用者来说可能很难处理。为了便于处理可变事件结构,Kafka Connect 中的每个事件都是自包含的。每个消息键和值都有两部分:*模式*和*有效负载*。模式描述了有效负载的结构,而有效负载包含实际数据。

连接器可以捕获由用户启动或应用程序级别的操作引起的 DDL 和 DML 更改。它还可以捕获源自 SYSSYSTEM 帐户的 DML 更改。

更改事件键

对于每个已更改的表,更改事件键的结构是,在创建事件时,存在一个字段对应表的主键(或唯一键约束)中的每个列。

例如,在 inventory 数据库模式中定义的 customers 表可能具有以下更改事件键:

CREATE TABLE customers (
  id NUMBER(9) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY,
  first_name VARCHAR2(255) NOT NULL,
  last_name VARCHAR2(255) NOT NULL,
  email VARCHAR2(255) NOT NULL UNIQUE
);

如果 <topic.prefix>.transaction 配置属性的值设置为 server1,则数据库中 customers 表中发生的每个更改事件的 JSON 表示形式具有以下键结构:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "ID"
            }
        ],
        "optional": false,
        "name": "server1.INVENTORY.CUSTOMERS.Key"
    },
    "payload": {
        "ID": 1004
    }
}

键的 schema 部分包含一个 Kafka Connect 模式,该模式描述了键部分的 ist。在前一个示例中,payload 值不是可选的,其结构由名为 server1.DEBEZIUM.CUSTOMERS.Key 的模式定义,并且有一个类型为 int32 的必需字段 id。键的 payload 字段的值表明它确实是一个结构(在 JSON 中就是一个对象),其中包含一个 id 字段,其值为 1004

因此,您可以将此键解释为描述数据库模式 inventory.customers(来自名为 server1 的连接器的 ist)中 id 主键列值为 1004 的行。

更改事件值

更改事件消息中值的结构反映了消息中更改事件键的结构,并同时包含*模式*部分和*有效负载*部分。

更改事件值有效负载

更改事件值有效负载中的*信封*结构包含以下字段:

op (操作)

一个必需字段,其中包含描述操作类型的字符串值。Oracle 连接器更改事件值有效负载中的 op 字段包含以下值之一:c(创建或插入)、u(更新)、d(删除)或 r(读取,表示快照)。

before

一个可选字段,如果存在,则描述事件发生*之前*行的状态。其结构由 server1.INVENTORY.CUSTOMERS.Value Kafka Connect 模式描述,该模式是 server1 连接器用于 inventory.customers 表中的所有行的。

after

一个可选字段,如果存在,则包含更改发生*之后*行的状态。其结构由用于 before 字段的 server1.INVENTORY.CUSTOMERS.Value Kafka Connect 模式描述。

source (源)

一个必需字段,其中包含描述事件源元数据的结构。对于 Oracle 连接器,该结构包括以下字段:

  • Debezium 版本。

  • 连接器名称。

  • 事件是否属于正在进行的快照。

  • 事务 ID(快照不包含)。

  • 与数据库在事务提交时分配的 SCN(系统更改号)相关的以下值:

scn

数据库用于跟踪事务的唯一标识符。

start_scn

事务开始的 SCN。

start_ts_ms

事务开始的时间。

commit_ts_ms

事务提交的时间。

  • 表示连接器处理事件的时间戳,基于运行 Kafka Connect 任务的 JVM 的系统时钟。对于快照,时间戳表示快照发生的时间。
    连接器在以下字段中以不同的精度报告时间戳值:

ts_ms

以毫秒为单位提供时间戳。

ts_us

以微秒为单位提供时间戳。

ts_ns

以纳秒为单位提供时间戳。

  • 执行更改的用户名

  • 与行关联的 ROWID

    commit_scn 字段是可选的,它描述了更改事件参与其中的事务提交的 SCN。仅在使用 LogMiner 连接适配器时才存在此字段。

    仅在使用 LogMiner 连接适配器时才填充 user_name 字段。

ts_ms

一个可选字段,如果存在,则包含连接器处理事件的时间(基于运行 Kafka Connect 任务的 JVM 中的系统时钟)。

更改事件值的模式

事件消息值的*模式*部分包含一个模式,该模式描述了有效负载的信封结构及其内部的嵌套字段。

create 事件

以下示例显示了在更改事件键示例中描述的 customers 表的*create* 事件值的 ist:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "ID"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "FIRST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "LAST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "EMAIL"
                    }
                ],
                "optional": true,
                "name": "server1.DEBEZIUM.CUSTOMERS.Value",
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "ID"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "FIRST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "LAST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "EMAIL"
                    }
                ],
                "optional": true,
                "name": "server1.DEBEZIUM.CUSTOMERS.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": true,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ts_ms"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ts_us"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ts_ns"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "txId"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "scn"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "commit_scn"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "rs_id"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ssn"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "redo_thread"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "user_name"
                    },
                    {
                        "type": "boolean",
                        "optional": true,
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "row_id"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "commit_ts_ms"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "start_scn"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "start_ts_ms"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.oracle.Source",
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_us"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ns"
            }
        ],
        "optional": false,
        "name": "server1.DEBEZIUM.CUSTOMERS.Envelope"
    },
    "payload": {
        "before": null,
        "after": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "annek@noanswer.org"
        },
        "source": {
            "version": "3.3.0.Final",
            "name": "server1",
            "ts_ms": 1520085154000,
            "ts_us": 1520085154000000,
            "ts_ns": 1520085154000000000,
            "txId": "6.28.807",
            "scn": "2122185",
            "commit_scn": "2122185",
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user",
            "snapshot": false,
            "row_id": "AAASgjAAMAAAACnAAA",
            "commit_ts_ms": 1520085154000,
            "start_scn": "2122185",
            "start_ts_ms": 1520085154000
        },
        "op": "c",
        "ts_ms": 1532592105975,
        "ts_us": 1532592105975741,
        "ts_ns": 1532592105975741582
    }
}

在前面的示例中,请注意事件如何定义以下模式:

  • 信封(server1.DEBEZIUM.CUSTOMERS.Envelope)。

  • source 结构(io.debezium.connector.oracle.Source,它特定于 Oracle 连接器并在所有事件中重用)。

  • beforeafter 字段的表特定模式。

beforeafter 字段的模式名称形式为 <logicalName>.<schemaName>.<tableName>.Value,因此与所有其他表的模式完全独立。因此,当您使用Avro 转换器时,每个逻辑源中的表的 Avro 模式都有其自己的演进和历史记录。

此事件*值*的 payload 部分提供了有关事件的信息。它描述了一个行被创建(op=c),并显示 after 字段值包含插入到行的 IDFIRST_NAMELAST_NAMEEMAIL 列中的值。

默认情况下,事件的 JSON 表示形式比它们描述的行要大得多。较大的尺寸是由于 JSON 表示形式同时包含消息的模式和有效负载部分。您可以使用Avro 转换器来减小连接器写入 Kafka 主题的消息的大小。

update 事件

以下示例显示了一个update 更改事件,该事件是从与前面create 事件相同的表中捕获的。

{
    "schema": { ... },
    "payload": {
        "before": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "annek@noanswer.org"
        },
        "after": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "anne@example.com"
        },
        "source": {
            "version": "3.3.0.Final",
            "name": "server1",
            "ts_ms": 1520085811000,
            "ts_us": 1520085811000000,
            "ts_ns": 1520085811000000000,
            "txId": "6.9.809",
            "scn": "2125544",
            "commit_scn": "2125544",
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user",
            "snapshot": false,
            "row_id": "AAASgjAAMAAAACnAAA",
            "commit_ts_ms": 152008581100,
            "start_scn": "2125544",
            "start_ts_ms": 152008581100
        },
        "op": "u",
        "ts_ms": 1532592713485,
        "ts_us": 1532592713485152,
        "ts_ns": 1532592713485152954,
    }
}

有效负载的结构与create(insert)事件的有效负载相同,但以下值不同:

  • op 字段的值为 u,表示此行因更新而更改。

  • before 字段显示了行在update 数据库提交之前的先前状态。

  • after 字段显示了行的更新状态,其中 EMAIL 值现在设置为 anne@example.com

  • source 字段的结构包括与之前相同的字段,但值不同,因为连接器是从重做日志中的不同位置捕获的事件。

  • ts_ms 字段显示了表示 Debezium 处理事件时间的 ist。

payload 部分揭示了其他几个有用的信息。例如,通过比较 beforeafter 结构,我们可以确定行是如何因提交而更改的。source 结构提供了有关 Oracle 对此更改的记录的信息,提供了可追溯性。它还让我们了解此事件与其他主题中的事件以及其他主题中的事件相比何时发生。它是否发生在另一个事件之前、之后,或作为同一提交的一部分?

当行的主键/唯一键列被更新时,行的键值会发生变化。因此,Debezium 在此类更新后会发出三个事件:

  • 一个 DELETE 事件。

  • 一个墓碑事件,其中包含行的旧键。

  • 一个 INSERT 事件,提供行的新键。

delete 事件

以下示例显示了与前面createupdate 事件示例相同的表中*delete* 事件。delete 事件的 schema 部分与那些事件的 schema 部分相同。

{
    "schema": { ... },
    "payload": {
        "before": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "anne@example.com"
        },
        "after": null,
        "source": {
            "version": "3.3.0.Final",
            "name": "server1",
            "ts_ms": 1520085153000,
            "ts_us": 1520085153000000,
            "ts_ns": 1520085153000000000,
            "txId": "6.28.807",
            "scn": "2122184",
            "commit_scn": "2122184",
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user",
            "snapshot": false,
            "row_id": "AAASgjAAMAAAACnAAA",
            "commit_ts_ms": 1520085153000,
            "start_scn": "2122184",
            "start_ts_ms": 1520085153000
        },
        "op": "d",
        "ts_ms": 1532592105960,
        "ts_us": 1532592105960854,
        "ts_ns": 1532592105960854693
    }
}

createupdate 事件的有效负载相比,事件的 payload 部分显示了几个不同之处:

  • op 字段的值为 d,表示该行已被删除。

  • before 字段显示了被数据库提交删除的行的先前状态。

  • after 字段的值为 null,表示该行已不存在。

  • source 字段的结构包括createupdate 事件中存在的许多键,但 ts_msscntxId 字段中的值不同。

  • ts_ms 显示了表示 Debezium 处理此事件时间的 ist。

delete 事件为消费者提供了处理此行删除所需的 ist。

Oracle 连接器的事件设计用于与Kafka 日志压缩配合使用,这允许删除一些旧消息,只要保留每个键的至少最新消息。这使得 Kafka 可以回收存储空间,同时确保主题包含完整的数据集并可用于重新加载基于键的状态。

当行被删除时,前面示例中显示的*delete* 事件值仍可与日志压缩配合使用,因为 Kafka 可以删除使用相同键的所有早期消息。消息值必须设置为 null 以指示 Kafka 删除*所有共享相同键的消息*。为了实现这一点,默认情况下,Debezium 的 Oracle 连接器总是在*delete* 事件之后发送一个特殊的*墓碑*事件,该事件具有相同的键但值为 null。您可以通过设置连接器属性 tombstones.on.delete 来更改默认行为。

truncate 事件

截断更改事件表示表已被截断。在这种情况下,消息键为 null,消息值如下所示:

{
    "schema": { ... },
    "payload": {
        "before": null,
        "after": null,
        "source": { (1)
            "version": "3.3.0.Final",
            "connector": "oracle",
            "name": "oracle_server",
            "ts_ms": 1638974535000,
            "ts_us": 1638974535000000,
            "ts_ns": 1638974535000000000,
            "snapshot": "false",
            "db": "ORCLPDB1",
            "sequence": null,
            "schema": "DEBEZIUM",
            "table": "TEST_TABLE",
            "txId": "02000a0037030000",
            "scn": "13234397",
            "commit_scn": "13271102",
            "lcr_position": null,
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user",
            "commit_ts_ms": 1638974538971,
            "start_scn": "13234396",
            "start_ts_ms": 1638974527000
        },
        "op": "t", (2)
        "ts_ms": 1638974558961, (3)
        "ts_us": 1638974558961987, (3)
        "ts_ns": 1638974558961987251, (3)
        "transaction": null
    }
}
表 9. *truncate* 事件值字段描述
Item Field name (字段名) 描述

1

source (源)

描述事件源元数据的

必需字段。在截断事件值中,source 字段结构与

同一表的创建更新删除事件相同,提供了此元数据:

  • Debezium 版本

  • 连接器类型和名称

  • 包含新行的数据库和表。

  • 模式名称。

  • 如果事件是快照的一部分(*truncate* 事件始终为 false

  • 执行操作的事务 ID。

  • 操作的 SCN

  • 数据库中发生更改的时间戳

  • 执行更改的用户名

2

op (操作)

描述操作类型的

必需字符串。op 字段值为 t,表示此表被截断。

3

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 指示数据库中发生更改的时间。通过比较 payload.source.ts_ms 的值和 payload.ts_ms 的值,您可以确定源数据库更新和 Debezium 之间的延迟。

如果单个 TRUNCATE 操作影响多个表,连接器会为每个被截断的表发出一个截断更改事件记录。

截断事件表示对整个表进行的更改,并且没有消息键。因此,对于具有多个分区的

主题,对于与表相关的更改事件(创建更新等)或截断事件,没有排序保证。例如,如果使用者从多个分区读取表的事件,它可能会在从另一个分区接收到删除表中所有数据的截断事件后,从一个分区接收到表的更新事件。仅保证使用单个分区的主题的排序。

如果您不希望连接器捕获truncate 事件,请使用 skipped.operations 选项将其过滤掉。

数据类型映射

当 Debezium Oracle 连接器检测到表行值更改时,它会发出表示该更改的更改事件。每个更改事件记录的结构都与原始表相同,事件记录包含一个字段对应每个列的值。表列的数据类型决定了连接器如何表示更改事件字段中的列值,如以下各节的表中所示。

对于表中的每个列,Debezium 将源数据类型映射到相应的事件字段中的*字面类型*,并在某些情况下映射到*语义类型*。

字面类型

使用以下 Kafka Connect 模式类型之一描述值如何被字面表示:INT8INT16INT32INT64FLOAT32FLOAT64BOOLEANSTRINGBYTESARRAYMAPSTRUCT

语义类型

通过使用字段的 Kafka Connect 模式名称,描述 Kafka Connect 模式如何捕获字段的*含义*。

如果默认数据类型转换不满足您的需求,您可以创建自定义转换器以供连接器使用。

对于某些 Oracle 大型对象(CLOB、NCLOB 和 BLOB)和数字数据类型,您可以通过更改默认配置属性设置来操作连接器执行类型映射的方式。有关 Debezium 属性如何控制这些数据类型的映射的 ist,请参阅 二进制和字符 LOB 类型数字类型

支持更多数据类型计划在后续版本中提供。请为任何可能缺失的特定类型提交一个JIRA 问题

不支持的数据类型

Debezium Oracle 连接器不支持 Oracle 中使用的所有数据类型。连接器无法处理包含以下类型数据的列:

  • BFILE(二进制文件)

  • BOOLEAN。有关更多信息,请参阅 布尔类型

  • UPDATE 和 DELETE 语句 FROM 子句中的直接联接

  • 数据使用场景领域

  • 基于 JavaScript 的存储过程

  • LONG

  • LONG RAW

  • ROWID(XStream 不支持)

  • 表值构造函数(TVC)

  • UROWID(通用 ROWID)

  • VECTOR

如果连接器尝试捕获包含不支持数据类型的列的事件,它将跳过该事件,然后继续处理后续事件。连接器继续处理配置用于捕获的其他列和表,但不支持的列中的数据将丢失。

字符类型

下表描述了连接器如何映射基本字符类型。

表 10. Oracle 基本字符类型的映射
Oracle 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

CHAR[(M)]

STRING

n/a

NCHAR[(M)]

STRING

n/a

NVARCHAR2[(M)]

STRING

n/a

VARCHAR[(M)]

STRING

n/a

VARCHAR2[(M)]

STRING

n/a

二进制和字符 LOB 类型

下表描述了连接器如何映射二进制和字符大型对象(LOB)数据类型。

表 11. Oracle 二进制和字符 LOB 类型的映射
Oracle 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

BFILE

n/a

此数据类型不受支持

BLOB

BYTES

根据连接器配置中 binary.handling.mode 属性的设置,连接器将此类型的 LOB 值映射到以下语义类型之一:

  • 原始字节(默认)

  • base64 编码的字符串

  • base64-url-safe 编码的字符串

  • 十六进制编码的字符串

CLOB

STRING

n/a

LONG

n/a

此数据类型不受支持。

LONG RAW

n/a

此数据类型不受支持。

NCLOB

STRING

n/a

RAW

n/a

根据连接器配置中 binary.handling.mode 属性的设置,连接器将此类型的 LOB 值映射到以下语义类型之一:

  • 原始字节(默认)

  • base64 编码的字符串

  • base64-url-safe 编码的字符串

  • 十六进制编码的字符串

Oracle 仅在 SQL 语句中显式设置或更改 CLOBNCLOBBLOB 数据类型时才提供列值。因此,更改事件永远不包含未更改的 CLOBNCLOBBLOB 列的值。相反,它们包含连接器属性 unavailable.value.placeholder 定义的占位符。

如果更新了 CLOBNCLOBBLOB 列的值,则新值将放入相应更新更改事件的 after 元素中。before 元素包含不可用的值占位符。

数字类型

下表描述了 Debezium Oracle 连接器如何映射数字类型。

您可以通过更改连接器的 decimal.handling.mode 配置属性的值来修改连接器映射 Oracle DECIMALNUMBERNUMERICREAL 数据类型的方式。当属性设置为默认值 precise 时,连接器将这些 Oracle 数据类型映射到 Kafka Connect org.apache.kafka.connect.data.Decimal 逻辑类型,如表中所示。当属性值设置为 doublestring 时,连接器会为某些 Oracle 数据类型使用备用映射。有关更多信息,请参阅下表中*语义类型和注释*列。

表 12. Oracle 数字数据类型的映射
Oracle 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

BINARY_FLOAT

FLOAT32

n/a

BINARY_DOUBLE

FLOAT64

n/a

DECIMAL[(P, S)]

BYTES / INT8 / INT16 / INT32 / INT64

org.apache.kafka.connect.data.Decimal(如果使用 BYTES

等同于 NUMBER 处理(请注意 S 默认为 0 用于 DECIMAL)。

decimal.handling.mode 属性设置为 double 时,连接器将 DECIMAL 值表示为模式类型为 FLOAT64 的 Java double 值。

decimal.handling.mode 属性设置为 string 时,连接器将 DECIMAL 值表示为格式化的字符串表示形式,模式类型为 STRING

DOUBLE PRECISION

STRUCT

io.debezium.data.VariableScaleDecimal

包含一个具有两个字段的结构:类型为 INT32scale 字段,包含传输值的精度;类型为 BYTESvalue 字段,包含原始值(未缩放形式)。

FLOAT[(P)]

STRUCT

io.debezium.data.VariableScaleDecimal

包含一个具有两个字段的结构:类型为 INT32scale 字段,包含传输值的精度;类型为 BYTESvalue 字段,包含原始值(未缩放形式)。

INTEGER, INT

BYTES

org.apache.kafka.connect.data.Decimal

INTEGER 在 Oracle 中映射到 NUMBER(38,0),因此可以存储大于任何 INT 类型的值。

NUMBER[(P[, *])]

STRUCT

io.debezium.data.VariableScaleDecimal

包含一个具有两个字段的结构:类型为 INT32scale 字段,包含传输值的精度;类型为 BYTESvalue 字段,包含原始值(未缩放形式)。

decimal.handling.mode 属性设置为 double 时,连接器将 NUMBER 值表示为模式类型为 FLOAT64 的 Java double 值。

decimal.handling.mode 属性设置为 string 时,连接器将 NUMBER 值表示为格式化的字符串表示形式,模式类型为 STRING

NUMBER(P, S <= 0)

INT8 / INT16 / INT32 / INT64

精度为 0 的 NUMBER 列表示整数。负精度表示 Oracle 中的舍入,例如,精度为 -2 会四舍五入到百位。

根据精度和精度,将选择以下匹配的 Kafka Connect 整数类型之一:

  • P - S < 3, INT8

  • P - S < 5, INT16

  • P - S < 10, INT32

  • P - S < 19, INT64

  • P - S >= 19, BYTESorg.apache.kafka.connect.data.Decimal

decimal.handling.mode 属性设置为 double 时,连接器将 NUMBER 值表示为模式类型为 FLOAT64 的 Java double 值。

decimal.handling.mode 属性设置为 string 时,连接器将 NUMBER 值表示为格式化的字符串表示形式,模式类型为 STRING

NUMBER(P, S > 0)

BYTES

org.apache.kafka.connect.data.Decimal

NUMERIC[(P, S)]

BYTES / INT8 / INT16 / INT32 / INT64

org.apache.kafka.connect.data.Decimal(如果使用 BYTES

等同于 NUMBER 处理(请注意 S 默认为 0 用于 NUMERIC)。

decimal.handling.mode 属性设置为 double 时,连接器将 NUMERIC 值表示为模式类型为 FLOAT64 的 Java double 值。

decimal.handling.mode 属性设置为 string 时,连接器将 NUMERIC 值表示为格式化的字符串表示形式,模式类型为 STRING

SMALLINT

BYTES

org.apache.kafka.connect.data.Decimal

SMALLINT 在 Oracle 中映射到 NUMBER(38,0),因此可以存储大于任何 INT 类型的值。

REAL

STRUCT

io.debezium.data.VariableScaleDecimal

包含一个具有两个字段的结构:类型为 INT32scale 字段,包含传输值的精度;类型为 BYTESvalue 字段,包含原始值(未缩放形式)。

decimal.handling.mode 属性设置为 double 时,连接器将 REAL 值表示为模式类型为 FLOAT64 的 Java double 值。

decimal.handling.mode 属性设置为 string 时,连接器将 REAL 值表示为格式化的字符串表示形式,模式类型为 STRING

如上所述,Oracle 允许 NUMBER 类型具有负精度。在转换为 Avro 格式时,当数字表示为 Decimal 时,这可能会导致问题。Decimal 类型包含精度信息,但Avro 规范只允许正值的精度。根据使用的模式注册表,这可能会导致 Avro 序列化失败。为了避免此问题,您可以使用 NumberToZeroScaleConverter,它会将具有负精度的足够高的数字(P - S >= 19)转换为零精度的 Decimal 类型。可以如下配置:

converters=zero_scale
zero_scale.type=io.debezium.connector.oracle.converters.NumberToZeroScaleConverter
zero_scale.decimal.mode=precise

默认情况下,数字被转换为 Decimal 类型(zero_scale.decimal.mode=precise),但为了完整起见,仍然支持其他两种支持的类型(doublestring)。

布尔类型

从 Oracle 23 开始,数据库为 BOOLEAN 数据类型提供了原生支持。但是,Debezium 连接器 for Oracle 不支持此类型。

早期版本的 Oracle 不包含对布尔类型的原生支持。在早期版本中,一些用户采用其他数据类型并加上特定语义来模拟逻辑 BOOLEAN 数据类型的概念。

为了使您能够将源列转换为布尔数据类型,Debezium 提供了一个 NumberOneToBooleanConverter 自定义转换器,您可以通过以下方式之一使用它:

  • 将所有 NUMBER(1) 列映射到 BOOLEAN 类型。

  • 使用逗号分隔的正则表达式列表枚举一组列。
    要使用此类型的转换,您必须在 converters 配置属性中设置 selector 参数,如下例所示:

    converters=boolean
    boolean.type=io.debezium.connector.oracle.converters.NumberOneToBooleanConverter
    boolean.selector=.*MYTABLE.FLAG,.*.IS_ARCHIVED

时间类型

除了 Oracle INTERVALTIMESTAMP WITH TIME ZONETIMESTAMP WITH LOCAL TIME ZONE 数据类型之外,连接器如何转换时间类型取决于 time.precision.mode 配置属性的值。

time.precision.mode 配置属性设置为 adaptive(默认值)时,连接器会根据列的数据类型定义来确定时间类型的字面类型和语义类型,以便事件*精确*表示数据库中的值。

Oracle 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

DATE

INT64

io.debezium.time.Timestamp

表示自 UNIX 纪元以来的毫秒数,不包含时区信息。

INTERVAL DAY[(M)] TO SECOND

FLOAT64

io.debezium.time.MicroDuration

使用 365.25 / 12.0 公式(平均每月天数)计算的时间间隔的微秒数。

io.debezium.time.Interval(当 interval.handling.mode 设置为 string 时)

间隔值的字符串表示形式,遵循 P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S 的模式,例如,P1Y2M3DT4H5M6.78S

INTERVAL YEAR[(M)] TO MONTH

FLOAT64

io.debezium.time.MicroDuration

使用 365.25 / 12.0 公式(平均每月天数)计算的时间间隔的微秒数。

io.debezium.time.Interval(当 interval.handling.mode 设置为 string 时)

间隔值的字符串表示形式,遵循 P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S 的模式,例如,P1Y2M3DT4H5M6.78S

TIMESTAMP(0 - 3)

INT64

io.debezium.time.Timestamp

表示自 UNIX 纪元以来的毫秒数,不包含时区信息。

TIMESTAMP, TIMESTAMP(4 - 6)

INT64

io.debezium.time.MicroTimestamp

表示自 UNIX 纪元以来的微秒数,不包含时区信息。

TIMESTAMP(7 - 9)

INT64

io.debezium.time.NanoTimestamp

表示自 UNIX 纪元以来的纳秒数,不包含时区信息。

带时区的时间戳

STRING

io.debezium.time.ZonedTimestamp

带时区信息的时间戳的字符串表示。

带本地时区的时间戳

STRING

io.debezium.time.ZonedTimestamp

UTC 时间戳的字符串表示。

time.precision.mode 配置属性设置为 connect 时,连接器将使用预定义的 Kafka Connect 逻辑类型。当消费者仅了解内置的 Kafka Connect 逻辑类型且无法处理可变精度的时间值时,这可能很有用。由于 Oracle 支持的精度级别超过了 Kafka Connect 中逻辑类型支持的精度级别,因此,如果将 time.precision.mode 设置为 connect,当数据库列的小数秒精度值大于 3 时,将导致精度损失

Oracle 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

DATE

INT32

org.apache.kafka.connect.data.Date

表示自 UNIX 纪元以来的天数。

INTERVAL DAY[(M)] TO SECOND

FLOAT64

io.debezium.time.MicroDuration

使用 365.25 / 12.0 公式(平均每月天数)计算的时间间隔的微秒数。

io.debezium.time.Interval(当 interval.handling.mode 设置为 string 时)

间隔值的字符串表示形式,遵循 P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S 的模式,例如,P1Y2M3DT4H5M6.78S

INTERVAL YEAR[(M)] TO MONTH

FLOAT64

io.debezium.time.MicroDuration

使用 365.25 / 12.0 公式(平均每月天数)计算的时间间隔的微秒数。

io.debezium.time.Interval(当 interval.handling.mode 设置为 string 时)

间隔值的字符串表示形式,遵循 P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S 的模式,例如,P1Y2M3DT4H5M6.78S

TIMESTAMP(0 - 3)

INT64

org.apache.kafka.connect.data.Timestamp

表示自 UNIX 纪元以来的毫秒数,不包含时区信息。

TIMESTAMP(4 - 6)

INT64

org.apache.kafka.connect.data.Timestamp

表示自 UNIX 纪元以来的毫秒数,不包含时区信息。

TIMESTAMP(7 - 9)

INT64

org.apache.kafka.connect.data.Timestamp

表示自 UNIX 纪元以来的毫秒数,不包含时区信息。

带时区的时间戳

STRING

io.debezium.time.ZonedTimestamp

带时区信息的时间戳的字符串表示。

带本地时区的时间戳

STRING

io.debezium.time.ZonedTimestamp

UTC 时间戳的字符串表示。

ROWID 类型

下表描述了连接器如何映射 ROWID(行地址)数据类型。

表 13. Oracle ROWID 数据类型映射
Oracle 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

ROWID

STRING

使用 Oracle XStream 时不支持此数据类型。

UROWID

n/a

此数据类型不受支持.

矢量类型

Debezium Oracle 连接器不支持 Oracle VECTOR 数据类型。

XML 类型

XMLTYPE 的支持目前处于孵化状态。根据我们收到的反馈,确切的语义、配置选项等在未来的修订版中可能会发生变化。如果您在使用这些数据类型时遇到任何问题,请告知我们。

下表描述了连接器如何映射 XMLTYPE 数据类型。

表 14. Oracle XMLTYPE 数据类型映射
Oracle 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

XMLTYPE

STRING

io.debezium.data.Xml

用户定义类型

Oracle 允许您定义自定义数据类型,以便在内置数据类型不满足您的要求时提供灵活性。有几种用户定义类型,如对象类型、REF 数据类型、Varray 和嵌套表。目前,您不能将 Debezium Oracle 连接器与这些用户定义类型中的任何一个一起使用。

Oracle 提供的类型

Oracle 提供基于 SQL 的接口,您可以在内置或 ANSI 支持的类型不足时使用这些接口来定义新类型。Oracle 提供了几种常用数据类型,可用于各种目的,例如AnySpatial类型。目前,您不能将 Debezium Oracle 连接器与这些数据类型中的任何一个一起使用。

默认值

如果数据库模式中为列指定了默认值,Oracle 连接器将尝试将此值传播到相应 Kafka 记录字段的模式。支持大多数常见数据类型,包括

  • 字符类型 (CHAR, NCHAR, VARCHAR, VARCHAR2, NVARCHAR, NVARCHAR2)

  • 数值类型 (INTEGER, NUMERIC, 等)

  • 时间类型 (DATE, TIMESTAMP, INTERVAL, 等)

如果时间类型使用函数调用(如 TO_TIMESTAMPTO_DATE)来表示默认值,连接器将通过执行额外的数据库调用来解析默认值以评估函数。例如,如果 DATE 列的默认值定义为 TO_DATE('2021-01-02', 'YYYY-MM-DD'),则该列的默认值将是自 UNIX 纪元以来该日期的天数,在此情况下为 18629

如果时间类型使用 SYSDATE 常量来表示默认值,连接器将根据列定义为 NOT NULL 还是 NULL 来解析该值。如果列可空,则不会设置默认值;但是,如果列不可空,则默认值将解析为 0(对于 DATETIMESTAMP(n) 数据类型)或 1970-01-01T00:00:00Z(对于 TIMESTAMP WITH TIME ZONETIMESTAMP WITH LOCAL TIME ZONE 数据类型)。默认值类型将是数值类型,除非该列是 TIMESTAMP WITH TIME ZONETIMESTAMP WITH LOCAL TIME ZONE,在这种情况下,它将被视为字符串发出。

自定义转换器

默认情况下,Debezium Oracle 连接器提供了一些特定于 Oracle 数据类型的 CustomConverter 实现。这些自定义转换器根据连接器配置为特定数据类型提供替代映射。要将 CustomConverter 添加到连接器,请按照自定义转换器文档中的说明进行操作。

Debezium Oracle 连接器提供以下自定义转换器

NUMBER(1) 转布尔值

从版本 23 开始,Oracle 数据库提供了 BOOLEAN 逻辑数据类型。在早期版本中,数据库通过使用 NUMBER(1) 数据类型模拟 BOOLEAN 类型,其中 0 表示 false,1 表示 true。

默认情况下,当 Debezium 发出使用 NUMBER(1) 数据类型的源列的更改事件时,它会将数据转换为 INT8 字面量类型。如果 NUMBER(1) 数据类型的默认映射不满足您的需求,您可以将连接器配置为在使用这些列时发出它们时使用逻辑 BOOL 类型,如以下示例所示。

示例:NumberOneToBooleanConverter 配置
converters=number-to-boolean
number-to-boolean.type=io.debezium.connector.oracle.converters.NumberOneToBooleanConverter
number-to-boolean.selector=.*.MY_TABLE.DATA

在上例中,selector 属性是可选的。selector 属性指定一个正则表达式,该正则表达式标识转换器应用的表或列。如果省略 selector 属性,则当 Debezium 发出事件时,所有具有 NUMBER(1) 数据类型的列都将被转换为使用逻辑 BOOL 类型的字段。

NUMBER 转零比例

Oracle 支持使用负比例创建 NUMBER 列,即 NUMBER(-2)。并非所有系统都能处理负比例值,因此这些值可能导致管道出现处理问题。例如,由于 Apache Avro 不支持这些值,如果 Debezium 将事件转换为 Avro 格式,可能会出现问题。同样,不支持这些值的下游消费者也可能遇到错误。

配置示例
converters=number-zero-scale
number-zero-scale.type=io.debezium.connector.oracle.converters.NumberToZeroScaleConverter
number-zero-scale.decimal.mode=precise

在上例中,decimal.mode 属性指定连接器如何发出十进制值。此属性是可选的。如果省略 decimal.mode 属性,转换器将默认为使用 PRECISE 十进制处理模式。

RAW 转字符串

尽管 Oracle 不推荐使用某些数据类型(如 RAW),但旧系统可能仍会使用这些类型。默认情况下,Debezium 将 RAW 列类型作为逻辑 BYTES 发出,这是一种允许存储二进制或文本数据类型的类型。

在某些情况下,RAW 列可能以字节序列的形式存储字符数据。为了方便消费者使用,您可以将 Debezium 配置为使用 RawToStringConverterRawToStringConverter 提供了一种轻松定位这些 RAW 列并以字符串而非字节的形式发出值的方法。以下示例显示了如何将 RawToStringConverter 添加到连接器配置中。

示例:RawToStringConverter 配置
converters=raw-to-string
raw-to-string.type=io.debezium.connector.oracle.converters.RawToStringConverter
raw-to-string.selector=.*.MY_TABLE.DATA

在上例中,selector 属性允许您定义一个正则表达式,该正则表达式指定转换器处理的表或列。如果省略 selector 属性,转换器会将所有 RAW 列类型映射到逻辑 STRING 字段类型。

设置 Oracle

以下步骤是设置 Oracle 以便与 Debezium Oracle 连接器配合使用所必需的。这些步骤假定使用了多租户配置,其中包含一个容器数据库和至少一个可插拔数据库。如果您不打算使用多租户配置,则可能需要调整以下步骤。

有关使用 Vagrant 在虚拟机中设置 Oracle 的信息,请参阅Debezium Vagrant Box for Oracle database GitHub 存储库。

与 Oracle 安装类型的兼容性

Oracle 数据库可以作为独立实例安装,也可以使用 Oracle Real Application Cluster (RAC) 进行安装。Debezium Oracle 连接器与这两种类型的安装兼容。

捕获更改事件时排除的模式

当 Debezium Oracle 连接器捕获表时,它会自动排除以下模式下的表

  • appqossys

  • audsys

  • ctxsys

  • dvsys

  • dbsfwuser

  • dbsnmp

  • qsmadmin_internal

  • lbacsys

  • mdsys

  • ojvmsys

  • olapsys

  • orddata

  • ordsys

  • outln

  • sys

  • system

  • vecsys (Oracle 23+)

  • wmsys

  • xdb

要使连接器能够捕获表的更改,该表必须使用未在上述列表中命名的模式。

捕获更改事件时排除的表

当 Debezium Oracle 连接器捕获表时,它会自动排除符合以下规则的表

  • 匹配模式 CMP[3|4]$[0-9]+ 的压缩顾问表。

  • 匹配模式 SYS_IOT_OVER_% 的索引组织表。

  • 匹配模式 MDRT_%MDRS_%MDXT_% 的空间表。

  • 嵌套表

要使连接器能够捕获名称与上述任何规则匹配的表,您必须重命名该表。

准备数据库

Oracle LogMiner 所需的配置
ORACLE_SID=ORACLCDB dbz_oracle sqlplus /nolog

CONNECT sys/top_secret AS SYSDBA
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- Should now "Database log mode: Archive Mode"
archive log list

exit;

Oracle AWS RDS 不允许您执行上述命令,也不允许您以 sysdba 身份登录。AWS 提供以下命令来配置 LogMiner。在执行这些命令之前,请确保您的 Oracle AWS RDS 实例已启用备份。

要确认 Oracle 已启用备份,请先执行以下命令。LOG_MODE 应显示 ARCHIVELOG。如果不是,您可能需要重启 Oracle AWS RDS 实例。

Oracle AWS RDS LogMiner 所需的配置
SQL> SELECT LOG_MODE FROM V$DATABASE;

LOG_MODE
------------
ARCHIVELOG

一旦 LOG_MODE 设置为 ARCHIVELOG,请执行命令以完成 LogMiner 配置。第一个命令将数据库设置为归档日志,第二个命令添加补充日志记录。

Oracle AWS RDS LogMiner 所需的配置
exec rdsadmin.rdsadmin_util.set_configuration('archivelog retention hours',24);

exec rdsadmin.rdsadmin_util.alter_supplemental_logging('ADD');

要使 Debezium 能够捕获已更改数据库行的之前状态,您还必须为捕获的表或整个数据库启用补充日志记录。以下示例说明了如何为 inventory.customers 单个表的所有列配置补充日志记录。

ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

启用所有表列的补充日志记录会增加 Oracle 重做日志的卷。为防止日志大小过度增长,请选择性地应用上述配置。

必须在数据库级别启用最小补充日志记录,并且可以按如下方式进行配置。

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

重做日志大小

根据数据库配置,重做日志的大小和数量可能不足以实现可接受的性能。在设置 Debezium Oracle 连接器之前,请确保重做日志的容量足以支持数据库。

数据库的重做日志容量必须足以存储其数据字典。通常,数据字典的大小随着数据库中表和列数量的增加而增加。如果重做日志容量不足,数据库和 Debezium 连接器都可能遇到性能问题。

请咨询您的数据库管理员,以评估数据库是否需要增加日志容量。

归档日志目标

Oracle 数据库管理员最多可以配置 31 个不同的归档日志目标。管理员可以为每个目标设置参数,将其指定用于特定用途,例如,用于物理备用数据库的日志传输,或用于外部存储以允许扩展日志保留。Oracle 在 V$ARCHIVE_DEST_STATUS 视图中报告有关归档日志目标的信息。

Debezium Oracle 连接器仅使用状态为 VALID 且类型为 LOCAL 的目标。如果您的 Oracle 环境包含多个满足这些条件的可用目标,请咨询您的 Oracle 管理员以确定 Debezium 应使用哪个归档日志目标。

过程
  • 要指定 Debezium 要使用的归档日志目标,请在连接器配置中设置 archive.destination.name 属性。

    例如,假设一个数据库配置了两个归档目标路径,/path/one/path/two,并且 V$ARCHIVE_DEST_STATUS 表将这些路径与在 DEST_NAME 列中指定的目標名称相关联。如果两个目标都满足 Debezium 的条件 — 即,如果它们的 statusVALIDtypeLOCAL — 那么要将连接器配置为使用数据库写入 /path/two 的归档日志,请将 archive.destination.name 的值设置为 V$ARCHIVE_DEST_STATUS 表中与 /path/two 关联的 DEST_NAME 列中的值。例如,如果 DEST_NAME 对于 /path/twoLOG_ARCHIVE_DEST_3,您将按如下方式配置 Debezium:

{
  "archive.destination.name": "LOG_ARCHIVE_DEST_3"
}

不要将 archive.destination.name 的值设置为数据库用于归档日志的路径。将属性设置为 V$ARCHIVE_DEST_STATUS 表中满足您归档日志保留策略的行的 DEST_NAME 列中的归档日志目标名称。

如果您的 Oracle 环境包含多个满足这些条件的可用目标,而您未能指定首选目标,则 Debezium Oracle 连接器将随机选择目标路径。由于为每个目标配置的保留策略可能不同,这可能导致错误,如果连接器选择了一个已被删除请求日志数据的路径。

在 Debezium Oracle 连接器与备用数据库使用与主实例不同的归档目标名称的环境中使用时,archive.destination.name 属性可以包含一个逗号分隔的目标名称列表。值应按优先级排序,因为 Debezium 连接器将使用第一个 LOCALVALID 的目标。这对于在故障转移场景中备用数据库成为新的主数据库时最小化所需的重新配置非常有用。

例如,如果主数据库的目标是 LOG_ARCHIVE_DEST_2,备用数据库的目标是 LOG_ARCHIVE_DEST_5,则将 archive.destination.name 属性设置为 LOG_ARCHIVE_DEST_2,LOG_ARCHIVE_DEST_5。这将优先使用目标 #2,它将在主数据库上使用,目标 #5 将在备用数据库上使用。

如果主实例和备用实例的 CDC 目标相同,则使用单个值就足够了。事实上,只有当任一实例有一个以上同时是 LOCALVALID 的归档目标时,才需要此属性。

创建用于连接器的用户帐户

要使 Debezium Oracle 连接器能够捕获更改事件,它必须以具有特定权限的 Oracle LogMiner 用户身份运行。

通常,当您为连接器创建 Oracle 帐户时,您会授予该帐户访问所有数据库表的权限,允许连接器检测所有表的更改。然而,在某些环境中,安全策略可能不允许您授予如此广泛的访问权限。

以下示例显示了在多租户数据库模型中创建连接器 Oracle 用户帐户的 SQL。示例中的授权设置允许 Debezium 用户访问数据库中的所有用户表。

为了遵守安全策略,您可以修改 SELECT ANY TABLEFLASHBACK ANY TABLE 授权,以便连接器只能访问您打算捕获的表。

不要修改其他授权,例如 SELECT ANY TRANSACTION 授权,或提供对动态性能视图(V$)访问的 SELECT ON V_$ 授权集。连接器正常运行需要这些授权。

为防止数据丢失,如果您限制了 SELECTFLASHBACK 授权的范围,请确保修改后的范围与连接器的 include 配置中的设置兼容。您为帐户设置的权限必须允许从连接器要捕获的所有表中读取。

示例 3. 创建连接器的 LogMiner 用户
sqlplus sys/top_secret@//:1521/ORCLCDB as sysdba
  CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//:1521/ORCLPDB1 as sysdba
  CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//:1521/ORCLCDB as sysdba

  CREATE USER c##dbzuser IDENTIFIED BY dbz
    DEFAULT TABLESPACE logminer_tbs
    QUOTA UNLIMITED ON logminer_tbs
    CONTAINER=ALL;

  GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL; (1)
  GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL; (2)
  GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL; (3)
  GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL; (4)
  GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL; (5)
  GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL; (6)
  GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL; (7)
  GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL; (8)
  GRANT LOGMINING TO c##dbzuser CONTAINER=ALL; (9)

  GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL; (10)
  GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL; (11)
  GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL; (12)

  GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL; (13)
  GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL; (14)

  GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL; (15)
  GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL; (16)
  GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL; (17)
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL; (18)
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL; (19)
  GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL; (20)
  GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL; (21)
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL; (22)
  GRANT SELECT ON V_$TRANSACTION TO c##dbzuser CONTAINER=ALL; (23)

  GRANT SELECT ON V_$MYSTAT TO c##dbzuser CONTAINER=ALL; (24)
  GRANT SELECT ON V_$STATNAME TO c##dbzuser CONTAINER=ALL; (25)

  exit;
表 15. 权限/授权说明
Item 角色名称 描述

1

CREATE SESSION

使连接器能够连接到 Oracle。

2

SET CONTAINER

使连接器能够在可插拔数据库之间切换。仅当启用了容器数据库 (CDB) 支持时,才需要此项。

3

SELECT ON V_$DATABASE

使连接器能够读取 V$DATABASE 表。

4

FLASHBACK ANY TABLE

使连接器能够执行 Flashback 查询,这是连接器执行数据初始快照的方式。或者,您可以为特定表授予 FLASHBACK 权限,而不是为所有表授予 FLASHBACK 权限。

5

SELECT ANY TABLE

使连接器能够读取任何表。或者,您可以为特定表授予 SELECT 权限,而不是为所有表授予 SELECT 权限。

6

SELECT_CATALOG_ROLE

使连接器能够读取数据字典,这是 Oracle LogMiner 会话所必需的。

7

EXECUTE_CATALOG_ROLE

使连接器能够将数据字典写入 Oracle 重做日志,这是跟踪模式更改所必需的。

8

SELECT ANY TRANSACTION

使快照过程能够针对任何事务执行 Flashback 快照查询,以便连接器可以从 LogMiner 读取过去的更改。当授予 FLASHBACK ANY TABLE 时,也应授予此权限。对于 Oracle 12c 及更高版本,此授权是可选的。

在这些较新版本中,连接器通过 EXECUTE_CATALOG_ROLELOGMINING 授权获取所需权限。

9

LOGMINING

此角色是在较新版本的 Oracle 中添加的,用于授予对 Oracle LogMiner 及其包的完全访问权限。在没有此角色的较旧版本的 Oracle 中,您可以忽略此授权。

10

CREATE TABLE

使连接器能够在其默认表空间中创建其冲刷表。冲刷表允许连接器显式控制将 LGWR 内部缓冲区冲刷到磁盘。

11

LOCK ANY TABLE

使连接器能够在模式快照期间锁定表。如果通过配置显式禁用快照锁定,则可以安全地忽略此授权。

12

CREATE SEQUENCE

使连接器能够在其默认表空间中创建序列。

13

EXECUTE ON DBMS_LOGMNR

使连接器能够运行 DBMS_LOGMNR 包中的方法。这是与 Oracle LogMiner 交互所必需的。在较新版本的 Oracle 中,这通过 LOGMINING 角色授予,但在较旧版本中,必须显式授予。

14

EXECUTE ON DBMS_LOGMNR_D

使连接器能够运行 DBMS_LOGMNR_D 包中的方法。这是与 Oracle LogMiner 交互所必需的。在较新版本的 Oracle 中,这通过 LOGMINING 角色授予,但在较旧版本中,必须显式授予。

15 到 25

SELECT ON V_$…​.

使连接器能够读取这些表。连接器必须能够读取有关 Oracle 重做和归档日志以及当前事务状态的信息,以便准备 Oracle LogMiner 会话。没有这些授权,连接器将无法运行。

备用数据库

备用数据库提供主实例的同步副本。在主数据库发生故障的情况下,备用数据库可提供持续可用性和灾难恢复。Oracle 使用物理和逻辑备用数据库。

物理备用

物理备用数据库是主生产数据库的精确、逐块复制副本,其系统更改号 (SCN) 值与主数据库相同。Debezium Oracle 连接器无法直接从物理备用数据库捕获更改事件,因为物理备用数据库不接受外部连接。只有在备用数据库转换为主数据库后,连接器才能从物理备用数据库捕获事件。然后,连接器像连接任何主数据库一样连接到前备用数据库。

逻辑备用

逻辑备用数据库包含与主数据库相同的数据,但数据可能以不同的物理方式存储。逻辑备用数据库中的 SCN 偏移量与主数据库中的偏移量不同。您可以配置 Debezium Oracle 连接器以从逻辑备用数据库捕获更改

故障转移数据库

设置故障转移数据库时,最佳实践通常是使用物理备用数据库而不是逻辑备用数据库。物理备用数据库与主数据库保持比逻辑备用数据库更一致的状态。物理备用数据库包含主数据的精确副本,备用数据库的系统更改号 (SCN) 值与主数据库相同。在 Debezium 环境中,在数据库故障转移到物理备用数据库后,一致的 SCN 值可确保连接器找到最后处理的 SCN 值。

物理备用数据库处于只读模式,并运行托管恢复以保持同步。当数据库处于备用模式时,它不接受来自客户端的外部 JDBC 连接,外部应用程序也无法访问它。

在发生故障事件后,为了让 Debezium 连接到前物理备用数据库,DBA 必须执行一些操作以启用故障转移到备用数据库,并将其提升为主数据库。以下列表标识了一些关键操作。

  • 取消备用数据库的托管恢复。

  • 完成活动恢复过程。

  • 将备用数据库转换为主角色。

  • 将新主数据库打开以供客户端读写操作。

在原物理备用数据库可供正常使用后,您可以将 Debezium Oracle 连接器配置为连接到它。要使连接器能够从新主数据库捕获,请编辑连接器配置中的数据库主机名,将原始主数据库的主机名替换为新主数据库的主机名。

配置 Debezium Oracle 连接器以从逻辑备用数据库捕获事件

当 Debezium Oracle 连接器连接到主数据库时,它使用内部冲刷表来管理 Oracle Log Writer Buffer (LGWR) 进程的冲刷周期。冲刷过程要求连接器访问数据库的用户帐户具有创建和写入此冲刷表的权限。但是,逻辑备用数据库通常只允许只读访问,这会阻止连接器写入数据库。您可以修改连接器配置以启用连接器从逻辑备用数据库捕获事件,或者 DBA 可以创建一个新的可写表空间,连接器可以在其中存储冲刷表。

与逻辑备用数据库一起使用连接器的能力处于孵化状态,并且可能会在没有通知的情况下发生变化。有一个开放的Jira 问题需要调查支持从物理备用数据库捕获更改。

过程
  • 要使 Debezium 能够从 Oracle 只读逻辑备用数据库捕获事件,请将以下属性添加到连接器配置中,以禁用冲刷表的创建和管理。

    internal.log.mining.read.only=true

    上述设置阻止数据库创建和更新 LOG_MINING_FLUSH 表。您可以使用 internal.log.mining.read.only 属性与 Oracle Standalone 数据库或 Oracle RAC 安装。

扩展最大字符串大小

max_string_size 数据库参数控制 Oracle 数据库,进而控制 Debezium Oracle 连接器如何解释 VARCHAR2NVARCHAR2RAW 字段的值。默认值 STANDARD 表示这些数据类型的长度与 Oracle 12c 之前的版本(VARCHAR2NVARCHAR2 为 4000 字节,RAW 为 2000 字节)保持一致。当配置为 EXTENDED 时,这些列现在允许存储最多 32767 字节的数据。

虽然数据库管理员可以将 max_string_sizeSTANDARD 更改为 EXTENDED,但反向操作不允许。一旦数据库更新为 EXTENDED 字符串支持,就无法撤销。

对于 Debezium Oracle 连接器,当数据库参数 max_string_sizeEXTENDED 时,应将 lob.enabled 连接器配置选项设置为 true,以捕获对字符串长度超过 4000 字节的 VARCHAR2NVARCHAR2 字段或超过 2000 字节的 RAW 字段所做的更改。

当设置为 EXTENDED 时,Oracle 在数据传输过程中隐式转换字符串数据,当字符串数据的字节长度超过旧的最大值时。这种隐式转换意味着 Oracle 内部将字符串数据视为 CLOB,因此您将受益于将字段视为外部世界的常规字符串,但同时也存在数据库层存储的所有陷阱和注意事项。

由于 Oracle 在内部将这些字符串视为 CLOB,因此重做日志还反映了 Debezium Oracle 连接器需要知道并应挖掘的几种独特的操作类型。由于这些操作类型非常类似于 CLOB 操作,因此必须以与任何其他 LOB 类型相同的方式挖掘重做日志条目,这就是为什么 lob.enabled 必须设置为 true 以捕获重做日志中的更改,而无论字符串数据的字节长度如何。

当 Oracle 配置为使用 EXTENDED 字符串大小时,LogMiner 有时在重建扩展字符串字段的 SQL 时无法转义单引号(')。如果扩展字符串字段的字节长度不超过旧的最大长度,则可能发生此问题。结果,这些字段中的值可能会被截断,导致无效的 SQL 语句,Oracle 连接器无法解析。

为了帮助解决其中一些问题,您可以将连接器配置为放宽单引号检测,方法是将以下属性设置为 true

internal.log.mining.sql.relaxed.quote.detection

请注意,此内部设置的使用目前不受支持。
有关更多信息,请参阅DBZ-8034

部署

要部署 Debezium Oracle 连接器,您需要安装 Debezium Oracle 连接器存档,配置连接器,并通过将其配置添加到 Kafka Connect 来启动连接器。

先决条件
过程
  1. 下载 Debezium Oracle 连接器插件存档

  2. 将文件解压到您的 Kafka Connect 环境。

  3. 从 Maven Central 下载Oracle 的 JDBC 驱动程序,并将下载的驱动程序文件解压到包含 Debezium Oracle 连接器 JAR 文件的目录。

    如果您将 Debezium Oracle 连接器与 Oracle XStream 结合使用,请在 Oracle Instant Client 包中获取 JDBC 驱动程序。有关更多信息,请参阅获取 Oracle JDBC 驱动程序和 XStream API 文件
  4. 从 Maven Central 下载Oracle 的 XDB 库,并将下载的文件解压到包含 Debezium Oracle 连接器 JAR 文件的目录。

  5. 将包含 JAR 文件的目录添加到Kafka Connect 的 plugin.path

  6. 重新启动您的 Kafka Connect 进程以加载新 JAR 文件。

Debezium Oracle 连接器配置

通常,您通过提交一个指定连接器配置属性的 JSON 请求来注册 Debezium Oracle 连接器。以下示例显示了一个 JSON 请求,用于向端口 1521 处的逻辑名称为 server1 的实例注册 Debezium Oracle 连接器。

您可以选择为数据库中的模式和表的子集生成事件。可选地,您可以忽略、掩盖或截断包含敏感数据、大于指定大小或您不需要的列。

示例:Debezium Oracle 连接器配置
{
    "name": "inventory-connector",  (1)
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",  (2)
        "database.hostname" : "<ORACLE_IP_ADDRESS>",  (3)
        "database.port" : "1521",  (4)
        "database.user" : "c##dbzuser",  (5)
        "database.password" : "dbz",   (6)
        "database.dbname" : "ORCLCDB",  (7)
        "topic.prefix" : "server1",  (8)
        "tasks.max" : "1",  (9)
        "database.pdb.name" : "ORCLPDB1",  (10)
        "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092", (11)
        "schema.history.internal.kafka.topic": "schema-changes.inventory"  (12)
    }
}
1 您在向 Kafka Connect 服务注册连接器时分配给连接器的名称。
2 此 Oracle 连接器类的名称。
3 Oracle 数据库实例的地址。
4 Oracle 数据库实例的端口号。
5 Oracle 用户名,如创建连接器用户中所述。
6 创建连接器用户中所述,连接到 Oracle 数据库服务器时使用的密码。
7 要从中捕获更改的数据库名称。
8 标识并为连接器捕获更改的 Oracle 数据库服务器提供命名空间的 Topic 前缀。
9 为此连接器创建的最大任务数。
10 连接器从中捕获更改的 Oracle 可插拔数据库的名称。仅在容器数据库 (CDB) 安装中使用。
11 此连接器用于读取和恢复 DDL 语句到数据库模式历史主题的 Kafka 代理列表。
12 连接器读取和恢复 DDL 语句的数据库模式历史主题的名称。此主题仅供内部使用,使用者不应使用。

在上例中,database.hostnamedatabase.port 属性用于定义到数据库主机的连接。然而,在更复杂的 Oracle 部署或使用 TNS (Transparent Network Substrate) 名称的部署中,您可以使用另一种方法,即指定 JDBC URL。

以下 JSON 示例显示了与前一个示例相同的配置,但它使用 JDBC URL 连接到数据库。

示例:使用 JDBC URL 连接到数据库的 Debezium Oracle 连接器配置
{
    "name": "inventory-connector",
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",
        "tasks.max" : "1",
        "topic.prefix" : "server1",
        "database.user" : "c##dbzuser",
        "database.password" : "dbz",
        "database.url": "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(LOAD_BALANCE=OFF)(FAILOVER=ON)(ADDRESS=(PROTOCOL=TCP)(HOST=<oracle ip 1>)(PORT=1521))(ADDRESS=(PROTOCOL=TCP)(HOST=<oracle ip 2>)(PORT=1521)))(CONNECT_DATA=SERVICE_NAME=)(SERVER=DEDICATED)))",
        "database.dbname" : "ORCLCDB",
        "database.pdb.name" : "ORCLPDB1",
        "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory"
    }
}

有关您可以为 Debezium Oracle 连接器设置的配置属性的完整列表,请参阅Oracle 连接器属性

您可以将此配置与 POST 命令发送到正在运行的 Kafka Connect 服务。该服务记录配置并启动一个连接器任务,该任务执行以下操作:

  • 连接到 Oracle 数据库。

  • 读取重做日志。

  • 将更改事件记录到 Kafka 主题。

添加连接器配置

要开始运行 Debezium Oracle 连接器,请创建连接器配置,并将配置添加到您的 Kafka Connect 集群。

先决条件
过程
  1. 为 Oracle 连接器创建配置

  2. 使用Kafka Connect REST API 将该连接器配置添加到您的 Kafka Connect 群集。

结果

连接器启动后,它将执行 Oracle 数据库的一致性快照,该数据库已配置为连接器捕获更改。然后,连接器开始为行级操作生成数据更改事件,并将更改事件记录流式传输到 Kafka Topic。

可插拔与非可插拔数据库

Oracle Database 支持以下部署类型:

容器数据库 (CDB)

一个可以包含多个可插拔数据库 (PDB) 的数据库。数据库客户端以连接到每个 PDB 的方式连接,就像连接到标准非 CDB 数据库一样。

非容器数据库 (non-CDB)

一个标准的 Oracle 数据库,不支持创建可插拔数据库。

示例:CDB 部署的 Debezium 连接器配置
{
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",
        "tasks.max" : "1",
        "topic.prefix" : "server1",
        "database.hostname" : "<oracle ip>",
        "database.port" : "1521",
        "database.user" : "c##dbzuser",
        "database.password" : "dbz",
        "database.dbname" : "ORCLCDB",
        "database.pdb.name" : "ORCLPDB1",
        "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory"
    }
}

当您为 Oracle CDB 配置 Debezium Oracle 连接器时,必须为 database.pdb.name 属性指定一个值,该属性命名了您希望连接器从中捕获更改的 PDB。对于非 CDB 安装,请不要指定 database.pdb.name 属性。

示例:非 CDB 部署的 Debezium Oracle 连接器配置
{
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",
        "tasks.max" : "1",
        "topic.prefix" : "server1",
        "database.hostname" : "<oracle ip>",
        "database.port" : "1521",
        "database.user" : "c##dbzuser",
        "database.password" : "dbz",
        "database.dbname" : "ORCLCDB",
        "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory"
    }
}

使用 mTLS 安全连接

使用 mTLS (mutual TLS) 身份验证连接到 Oracle 时,这涉及到连接器和数据库服务都证明自己的身份。Debezium for Oracle 连接器依赖于 Oracle JDBC 驱动程序的内置功能来支持 mTLS 身份验证。

您可以使用以下任一方法在 Debezium 和 Oracle 之间建立 mTLS 连接:

Java 密钥/信任库

先决条件
  • 验证连接器是否可以访问配置的密钥库和信任库文件。

  • 验证数据库 TNS (Transparent Network Substrate) 监听器是否支持 TCPS 安全连接。

过程
  • 按以下示例所示配置 Debezium Oracle 连接器。

    示例:Debezium Oracle 连接器 TLS 配置
    {
      "database.url": "jdbc:oracle:thin@(DESCRIPTION=(ADDRESS=(PROTOCOL=tcps)(HOST=<host>)(PORT=<port>))(CONNECT_DATA(SERVICE_NAME=<service>)))",
      "driver.javax.net.ssl.keyStore": "<path-to-jks-keystore>",
      "driver.javax.net.ssl.keyStorePassword": "<keystore-password>",
      "driver.javax.net.ssl.keyStoreType": "JKS",
      "driver.javax.net.ssl.trustStore": "<path-to-jks-truststore>",
      "driver.javax.net.ssl.trustStorePassword": "<truststore-password>",
      "driver.javax.net.ssl.trustStoreType": "JKS"
    }

    Debezium 要使用 TLS 加密与 Oracle 通信,需要与服务器建立 TCPS 连接。要建立 TCPS 连接,您必须将连接器配置为使用 database.url 属性,而不是 database.host 属性。与 database.host 属性不同,database.url 属性允许您定义一个显式要求使用 TCPS 协议的 Oracle TNS (Transparent Network Substrate) 连接字符串。

Oracle Wallet

先决条件
  • 与您的数据库管理员确认 Oracle Wallet 已在 Oracle 数据库服务器上配置。

  • 在 Oracle JDBC 驱动程序存档中找到 oraclepki.jar

过程
  • oraclepki.jar 安装在包含 Debezium Oracle 连接器 JAR 文件的位置。这是您安装 Oracle JDBC 驱动程序的位置。

  • 在连接器配置中,将 database.url 属性设置为连接数据库的方式,而不是 database.hostname 属性。database.url 属性定义了一个与 Oracle Wallet 交互所需的基于 TNS 的连接字符串。TNS 连接字符串的示例,请参阅此列表后的 mTLS 配置示例。

  • 将 Oracle JDBC 驱动程序属性 oracle.net.wallet_location 设置为显式设置 Oracle JDBC 驱动程序使用的 Oracle Wallet 配置。

mTLS 配置示例
{
  "database.url": "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCPS)(PORT=xxxx)(HOST=xx.xx.xx.xx))(CONNECT_DATA=(SID=xxxx)))",
  "driver.oracle.net.wallet_location": "(SOURCE=(METHOD=file)(METHOD_DATA=(DIRECTORY=/opt/kafka/external-configuration/oracle_wallet/)))"
}

请确保在 database.url 中设置了正确的主机、端口和服务标识符 (sid)。此外,请确保 driver.oracle.net.wallet_location 中的目录是可读的。

连接器属性

Debezium Oracle 连接器有许多配置属性,您可以用于实现适合您应用程序的连接器行为。许多属性都有默认值。有关属性的信息组织如下:

必需的 Debezium Oracle 连接器配置属性

以下配置属性是必需的,除非有默认值可用。

属性

Default (默认值)

描述

无默认值

连接器的唯一名称。尝试使用相同的名称再次注册将失败。(此属性是所有 Kafka Connect 连接器所必需的。)

无默认值

连接器的 Java 类名。对于 Oracle 连接器,始终使用 io.debezium.connector.oracle.OracleConnector 作为值。

无默认值

列出连接器可以使用的自定义转换器的符号名称的逗号分隔列表。
例如,boolean
需要此属性才能使连接器能够使用自定义转换器。

对于为连接器配置的每个转换器,您还必须添加一个 .type 属性,该属性指定实现转换器接口的类的完全限定名称。.type 属性使用以下格式:

<converterSymbolicName>.type

For example, (例如,)

boolean.type: io.debezium.connector.oracle.converters.NumberOneToBooleanConverter

如果要进一步控制已配置转换器的行为,可以添加一个或多个配置参数将值传递给转换器。要将任何其他配置参数与转换器关联,请在参数名称前加上转换器的符号名称。

例如,要定义一个 selector 参数来指定 boolean 转换器处理的列子集,请添加以下属性:

boolean.selector: .*MYTABLE.FLAG,.*.IS_ARCHIVED

1

为此连接器创建的最大任务数。Oracle 连接器始终使用单个任务,因此不使用此值,所以默认值始终是可接受的。

无默认值

Oracle 数据库服务器的 IP 地址或主机名。

无默认值

Oracle 数据库服务器的整数端口号。

无默认值

连接器用于连接到 Oracle 数据库服务器的 Oracle 用户帐户的名称。

无默认值

连接到 Oracle 数据库服务器时使用的密码。

无默认值

要连接的数据库的名称。在容器数据库环境中,请指定根容器数据库 (CDB) 的名称,而不是包含的可插拔数据库 (PDB) 的名称。

无默认值

指定原始数据库 JDBC URL。使用此属性灵活定义该数据库连接。有效值包括原始 TNS 名称和 RAC 连接字符串。

无默认值

要连接的 Oracle 可插拔数据库的名称。仅在容器数据库 (CDB) 安装中使用此属性。

无默认值

Topic 前缀,为连接器从中捕获更改的 Oracle 数据库服务器提供命名空间。设置的值将用作连接器发出的所有 Kafka Topic 名称的前缀。请指定一个在您的 Debezium 环境中所有连接器之间唯一的 Topic 前缀。有效字符包括:字母数字字符、连字符、点和下划线。

请勿更改此属性的值。如果您更改名称值,重新启动后,连接器将不会继续将事件发送到原始主题,而是会将后续事件发送到名称基于新值的那些主题。连接器也无法恢复其数据库模式历史主题。

logminer

连接器在流式传输数据库更改时使用的适配器实现。您可以设置以下值:

logminer (默认)

连接器使用本地 Oracle LogMiner API,并具有连接器级别的缓冲。

logminer_unbuffered

连接器使用本地 Oracle LogMiner API,并具有数据库级别的缓冲。

olr

连接器使用 OpenLogReplicator。

xstream

连接器使用 Oracle XStream API。

initial (初始)

指定连接器在捕获表快照时使用的模式。您可以设置以下值:

always (始终)

快照包括捕获表的结构和数据。每次启动连接器时,请指定此值以使用捕获表中数据的完整表示形式填充 Topic。

initial (初始)

快照包括捕获表的结构和数据。如果快照成功完成,下次启动连接器时将不再执行快照。

initial_only (仅初始)

快照包括捕获表的结构和数据。连接器执行初始快照然后停止,而不处理任何后续更改。

schema_only (仅模式)

已弃用,请参阅 no_data

no_data (无数据)

快照仅包含捕获表的结构。如果您希望连接器仅捕获快照后的更改数据,请指定此值。

schema_only_recovery (仅模式恢复)

Deprecated, see recovery. (已弃用,请参阅 recovery。)

recovery (恢复)

这是一个连接器已捕获更改的恢复设置。当您重新启动连接器时,此设置可实现损坏或丢失的数据库模式历史记录 Topic 的恢复。您可能需要定期设置它来“清理”一个意外增长的数据库模式历史记录 Topic。数据库模式历史记录 Topic 需要无限保留。请注意,此模式只有在保证从连接器关闭之前的时间点和拍摄快照的时间点之间没有发生任何模式更改时才是安全的。

快照完成后,连接器将继续从数据库的重做日志中读取更改事件,除非 snapshot.mode 配置为 initial_only

when_needed (需要时)

After the connector starts, it performs a snapshot only if it detects one of the following circumstances (连接器启动后,仅当检测到以下任一情况时,它才会执行快照:)

  • It cannot detect any topic offsets. (无法检测到任何主题偏移量。)

  • A previously recorded offset specifies a log position that is not available on the server. (先前记录的偏移量指定了一个服务器上不可用的日志位置。)

configuration_based (基于配置)

使用此选项,您可以通过一组以 'snapshot.mode.configuration.based' 前缀开头的连接器属性来控制快照行为。

custom (自定义)

连接器根据 snapshot.mode.custom.name 属性指定的实现执行快照,该属性定义了 io.debezium.spi.snapshot.Snapshotter 接口的自定义实现。

有关更多信息,请参阅snapshot.mode 选项表

false

如果 snapshot.mode 设置为 configuration_based,请设置此属性以指定连接器在执行快照时是否包含表数据。

false

如果 snapshot.mode 设置为 configuration_based,请设置此属性以指定连接器在执行快照时是否包含表模式。

false

如果 snapshot.mode 设置为 configuration_based,请设置此属性以指定快照完成后连接器是否开始流式传输更改事件。

false

如果 snapshot.mode 设置为 configuration_based,请设置此属性以指定在模式历史记录主题不可用时,连接器是否在快照中包含表模式。

false

如果 snapshot.mode «设置» «为» configuration_based,「此» «属性» «指定» «是否» «在» «事务» «日志» «中» «未» «找到» «最后» «提交» «的» «偏移量» «时»,«连接器» «尝试» «快照» «表» «数据»。
将 «值» «设置» «为» true «以» «指示» «连接器» «执行» «新» «快照»。

无默认值

如果 snapshot.mode 设置为 custom,请使用此设置指定 'io.debezium.spi.snapshot.Snapshotter' 接口中定义的 name() 方法提供的自定义实现名称。连接器重启后,Debezium 会调用指定的自定义实现来确定是否执行快照。有关更多信息,请参阅自定义快照程序 SPI

shared

控制连接器持有表锁的时间。表锁可防止在连接器执行快照期间发生某些类型的更改操作。您可以设置以下值:

shared

允许并发访问表,但阻止任何会话获得独占表锁。连接器在捕获表模式时会获取 ROW SHARE 级别的锁。

none

«防止» «连接器» «在» «快照» «期间» «获取» «任何» «表» «锁»。仅当 «创建» «快照» «期间» «可能» «不会» «发生» «模式» «更改» «时» «才» «使用» «此» «设置»。

custom (自定义)

连接器根据 snapshot.locking.mode.custom.name 属性指定的实现执行快照,该属性是 io.debezium.spi.snapshot.SnapshotLock 接口的自定义实现。

无默认值

snapshot.locking.mode 设置为 custom 时,使用此设置指定由“io.debezium.spi.snapshot.SnapshotLock”接口定义的 name() 方法提供的自定义实现的名称。有关更多信息,请参阅 自定义快照器 SPI

select_all

指定连接器在执行快照时如何查询数据。
设置以下选项之一:

select_all

连接器«默认» «执行» «一个» select all «查询»,「并» «可选» «地» «根据» «列» «包含» «和» «排除» «列表» «配置» «调整» «所选» «的» «列»。

custom (自定义)

连接器根据 snapshot.query.mode.custom.name 属性指定的实现执行快照查询,该属性定义了 io.debezium.spi.snapshot.SnapshotQuery 接口的自定义实现。

此设置允许您比使用 snapshot.select.statement.overrides 属性更灵活地管理快照内容。

无默认值

snapshot.query.mode 设置为 custom 时,使用此设置指定由 'io.debezium.spi.snapshot.SnapshotQuery' 接口定义的自定义实现。有关更多信息,请参阅自定义快照程序 SPI

连接器 table.include.list 属性中指定的所有表。

一个可选的、逗号分隔的正则表达式列表,匹配要包含在快照中的表的完全限定名称 (<databaseName>.<schemaName>.<tableName>)。

在多租户容器数据库 (CDB) 环境中,正则表达式必须包含可插拔数据库 (PDB) 名称,格式为 <pdbName>.<schemaName>.<tableName>

为了匹配表名,Debezium 会将您指定的正则表达式应用为锚定正则表达式。也就是说,指定的表达式会与表的整个名称字符串进行匹配;它不会匹配表中可能存在的子字符串。
在使用 LogMiner 实现的环境中,只能使用 POSIX 正则表达式。

快照只能包含连接器 table.include.list 属性中命名的表。

此属性仅在连接器的 snapshot.mode 属性设置为 never 以外的值时生效。
此属性不影响增量快照的行为。

无默认值

指定要在快照中包含的表行。如果您希望快照仅包含表中一部分行,请使用此属性。此属性仅影响快照。它不适用于连接器从日志中读取的事件。

此属性包含一个逗号分隔的完全限定表名列表,格式为 <schemaName>.<tableName>。例如:

"snapshot.select.statement.overrides": "inventory.products,customers.orders"

对于列表中的每个表,添加一个额外的配置属性,该属性指定连接器在拍摄快照时在该表上运行的 SELECT 语句。指定的 SELECT 语句决定了要在快照中包含的表行的子集。使用以下格式指定此 SELECT 语句属性的名称:

snapshot.select.statement.overrides.<schemaName>.<tableName>

例如,snapshot.select.statement.overrides.customers.orders

示例

从包含软删除列 delete_flagcustomers.orders 表中,如果要快照仅包含未软删除的记录,请添加以下属性:

"snapshot.select.statement.overrides": "customer.orders",
"snapshot.select.statement.overrides.customer.orders": "SELECT * FROM customers.orders WHERE delete_flag = 0 ORDER BY id DESC"

在生成的快照中,连接器仅包含 delete_flag = 0 的记录。

无默认值

一个可选的、逗号分隔的正则表达式列表,匹配您捕获更改的模式的名称。在使用 LogMiner 实现的环境中,只能使用 POSIX 正则表达式。未包含在 schema.include.list 中的任何模式名称都将被排除在捕获更改之外。默认情况下,所有非系统模式都会捕获其更改。

为了匹配模式的名称,Debezium 会将您指定的正则表达式作为锚定正则表达式进行匹配。也就是说,指定的表达式会与模式的整个名称字符串进行匹配;它不会匹配可能存在于模式名称中的子字符串。
如果您在此配置中包含此属性,则不要同时设置 schema.exclude.list 属性。

false

布尔值,指定连接器是否应解析和发布元数据对象上的表和列注释。启用此选项将对内存使用产生影响。逻辑模式对象的数量和大小在很大程度上决定了 Debezium 连接器消耗多少内存,并且向每个对象添加潜在的大字符串数据可能会非常昂贵。

无默认值

一个可选的、逗号分隔的正则表达式列表,匹配您想捕获更改的模式的名称。在使用 LogMiner 实现的环境中,只能使用 POSIX 正则表达式。任何名称未包含在 schema.exclude.list 中的模式,除了系统模式外,都会捕获其更改。
如果您在此配置中包含此属性,请不要设置 `schema.include.list` 属性。

为了匹配模式的名称,Debezium 会将您指定的正则表达式作为锚定正则表达式进行匹配。也就是说,指定的表达式会与模式的整个名称字符串进行匹配;它不会匹配可能存在于模式名称中的子字符串。
如果您在此配置中包含此属性,请不要设置 `schema.include.list` 属性。

无默认值

一个可选的逗号分隔的正则表达式列表,匹配要捕获的表的完全限定表标识符。如果您使用 LogMiner 实现,请仅将 POSIX 正则表达式用于此属性。设置此属性时,连接器仅捕获指定表的更改。每个表标识符使用以下格式:

<schema_name>.<table_name>

默认情况下,连接器会监视每个捕获数据库中的每个非系统表。

为了匹配表名,Debezium 会将您指定的正则表达式应用为锚定正则表达式。也就是说,指定的表达式会与表的整个名称字符串进行匹配;它不会匹配表中可能存在的子字符串。
如果您在配置中包含此属性,请不要同时设置 table.exclude.list 属性。

无默认值

一个可选的逗号分隔的正则表达式列表,匹配要排除监视的表的完全限定表标识符。如果您使用 LogMiner 实现,请仅将 POSIX 正则表达式用于此属性。连接器捕获列表中未指定的任何表的更改事件。使用以下格式指定每个表的标识符:

<schemaName>.<tableName>.

为了匹配表名,Debezium 会将您指定的正则表达式应用为锚定正则表达式。也就是说,指定的表达式会与表的整个名称字符串进行匹配;它不会匹配表中可能存在的子字符串。
如果您在配置中包含此属性,请不要同时设置 table.include.list 属性。

无默认值

一个可选的、逗号分隔的正则表达式列表,匹配您希望包含在更改事件消息值中的列的完全限定名称。在使用 LogMiner 实现的环境中,只能使用 POSIX 正则表达式。列的完全限定名称使用以下格式:

<Schema_name>.<table_name>.<column_name>

主键列始终包含在事件的键中,即使您不使用此属性显式包含其值。

要匹配列名,Debezium 会将您指定的正则表达式作为锚定正则表达式进行匹配。也就是说,指定的表达式与列的整个名称字符串进行匹配,而不是匹配可能存在于列名中的子字符串。
如果您在此配置中包含此属性,则不要同时设置 column.exclude.list 属性。

无默认值

一个可选的、逗号分隔的正则表达式列表,匹配您希望从更改事件消息值中排除的列的完全限定名称。在使用 LogMiner 实现的环境中,只能使用 POSIX 正则表达式。完全限定的列名使用以下格式:

<schema_name>.<table_name>.<column_name>

主键列始终包含在事件的键中,即使您使用此属性显式排除其值。

要匹配列名,Debezium 会将您指定的正则表达式作为锚定正则表达式进行匹配。也就是说,指定的表达式与列的整个名称字符串进行匹配,而不是匹配可能存在于列名中的子字符串。
如果您在此配置中包含此属性,请不要设置 column.include.list 属性。

false

指定当包含的列没有更改时是否跳过发布消息。这实际上会过滤掉没有包含列更改的消息(根据 column.include.listcolumn.exclude.list 属性)。

n/a

一个可选的、逗号分隔的正则表达式列表,匹配基于字符的列的完全限定名称。列的完全限定名称格式为 <schemaName>.<tableName>.<columnName>
要匹配列名,Debezium 会将您指定的正则表达式作为锚定正则表达式进行匹配。也就是说,指定的表达式与列的整个名称字符串进行匹配;表达式不匹配可能存在于列名中的子字符串。
在生成的更改事件记录中,指定列的值将被替换为假名。

假名由应用指定的hashAlgorithmsalt 产生的哈希值组成。根据使用的哈希函数,可以维护引用完整性,同时用假名替换列值。支持的哈希函数在 Java Cryptography Architecture 标准算法名称文档的MessageDigest 部分中进行了描述。

在下面的示例中,CzQMA0cB5K 是随机选择的 salt。

column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName

如果需要,假名会自动截断为列的长度。连接器配置可以包含多个指定不同哈希算法和 salt 的属性。

取决于使用的hashAlgorithm、选择的salt 和实际数据集,生成的数据集可能无法完全隐藏。

应使用哈希策略版本 2 来确保在不同位置或系统上对值进行哈希处理时的保真度。

bytes

指定二进制 (blob) 列如何在更改事件中表示,包括:bytes 将二进制数据表示为字节数组(默认),base64 将二进制数据表示为 base64 编码的字符串,base64-url-safe 将二进制数据表示为 base64-url-safe 编码的字符串,hex 将二进制数据表示为十六进制编码(base16)的字符串。

none

指定如何调整 schema 名称以兼容连接器使用的消息转换器。可能设置:

  • none 不进行任何调整。

  • avro 用下划线替换 Avro 类型名称中不能使用的字符。

  • avro_unicode 用对应的 Unicode(如 _uxxxx)替换 Avro 类型名称中不能使用的下划线或字符。注意:_ 像 Java 中的反斜杠一样是一个转义序列。

none

指定如何调整字段名称以兼容连接器使用的消息转换器。可能设置:

  • none 不进行任何调整。

  • avro 用下划线替换 Avro 类型名称中不能使用的字符。

  • avro_unicode 用对应的 Unicode(如 _uxxxx)替换 Avro 类型名称中不能使用的下划线或字符。注意:_ 像 Java 中的反斜杠一样是一个转义序列。

有关更多详细信息,请参阅 Avro 命名

precise

指定连接器如何处理 NUMBERDECIMALNUMERIC 列的浮点值。您可以选择以下选项之一:

precise (默认)

使用 java.math.BigDecimal 值精确表示,这些值在更改事件中以二进制形式表示。

double

使用 double 值表示。使用 double 值更简单,但可能导致精度损失。

string

将值编码为格式化的字符串。使用 string 选项更容易消费,但会导致实际类型的语义信息丢失。有关更多信息,请参阅数值类型

numeric

指定连接器如何处理 interval 列的值。

numeric 使用大约微秒数表示间隔。

string 使用字符串模式表示 P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S 精确表示间隔。例如:P1Y2M3DT4H5M6.78S

fail

指定当处理事件时发生异常时连接器的反应。您可以选择以下选项之一:

fail

传播异常(指示有问题事件的偏移量),导致连接器停止。

warn

跳过有问题事件。然后记录有问题事件的偏移量。

skip

跳过有问题事件。

2048

一个正整数值,指定在此连接器每次迭代中处理的事件批次的最大大小。

8192

正整数值,指定阻塞队列可以容纳的最大记录数。当 Debezium 读取从数据库流式传输的事件时,它会将事件放入阻塞队列,然后将其写入 Kafka。当连接器比 Kafka 写入速度快地摄取消息,或者 Kafka 不可用时,阻塞队列可以为从数据库读取事件提供背压。队列中保存的事件在连接器定期记录偏移量时将被忽略。始终将 max.queue.size 的值设置得大于 max.batch.size 的值。

0 (禁用)

一个长整数值,指定阻塞队列的最大字节容量。默认情况下,阻塞队列没有容量限制。要指定队列可以消耗的字节数,请将此属性设置为一个正长整型值。
如果还设置了 max.queue.size,当队列的大小达到任一属性指定的限制时,写入队列将被阻塞。例如,如果设置 max.queue.size=1000max.queue.size.in.bytes=5000,则在队列包含 1000 条记录后,或在队列中的记录量达到 5000 字节后,写入队列将被阻塞。

500 (0.5 秒)

正整数值,指定连接器在每次迭代中等待新更改事件出现的时间(以毫秒为单位)。

true

一个布尔值,指定连接器是否将数据库模式中的更改发布到与主题前缀同名的 Kafka 主题。连接器将每个模式更改记录下来,键包含数据库名称,值为描述模式更新的 JSON 结构。此记录模式更改的机制独立于连接器对数据库模式历史记录更改的内部记录。

true

控制是否在删除事件后跟一个墓碑事件。可能的值如下:

true

对于每次删除操作,连接器会发出一个删除事件和一个后续的墓碑事件。

false

对于每次删除操作,连接器仅发出一个删除事件。

在源记录被删除后,墓碑事件(默认行为)使 Kafka 能够完全删除 Topic 中具有已删除行键的所有事件,这些 Topic 已启用日志压缩

无默认值

«表达式» 列表,用于指定连接器用来为 «发布» 到指定表 «Kafka» «主题» 的更改事件 «记录» «形成» «自定义» «消息» «键» 的 «列»。

默认情况下,Debezium 使用表的 «主键» «列» 作为它发出的 «记录» 的 «消息» «键»。 «代替» 默认值,或为 «缺少» «主键» 的表指定 «键»,您可以根据一个或多个 «列» «配置» «自定义» «消息» «键»。
要为表建立自定义消息键,请列出该表,然后列出用作消息键的列。每个列表条目采用以下格式:

<fullyQualifiedTableName>:<keyColumn>,<keyColumn>

要基于多个列名设置表键,请在列名之间插入逗号。
每个完全限定的表名都是一个正则表达式,格式如下:

<schemaName>.<tableName>

该属性可以包含多个表的条目。使用分号分隔列表中的表条目。
以下示例设置了 inventory.customerspurchase.orders 表的消息键:

inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4

对于表 inventory.customer,列 pk1pk2 被指定为消息键。对于任何模式中的 purchaseorders 表,列 pk3pk4 用作消息键。
用于创建自定义消息键的列数没有限制。但是,最好使用唯一键所需的最小数量。

无默认值

一个可选的、逗号分隔的正则表达式列表,匹配基于字符的列的完全限定名称。如果您希望连接器掩盖一组列的值,请设置此属性(例如,如果它们包含敏感数据)。将 length 设置为正整数,以将指定列中的数据替换为属性名称中由 length 指定的星号(*)字符的数量。将 length 设置为 0(零)以将指定列中的数据替换为空字符串。

完全限定的列名遵循以下格式:<schemaName>.<tableName>.<columnName>。要匹配列名,Debezium 会将您指定的正则表达式作为锚定正则表达式进行匹配。也就是说,指定的表达式与列的整个名称字符串进行匹配;表达式不匹配可能存在于列名称中的子字符串。

您可以在单个配置中指定多个具有不同长度的属性。

无默认值

一个可选的逗号分隔的正则表达式列表,用于通过用星号 (*) 替换字符来屏蔽更改事件消息中的列名。
在属性名称中指定要替换的字符数,例如,column.mask.with.8.chars
将长度指定为正整数或零。然后,为要应用屏蔽的每个基于字符的列名添加正则表达式到列表中。
使用以下格式指定完全限定的列名:<schemaName>.<tableName>.<columnName>

连接器配置可以包含多个指定不同长度的属性。

无默认值

一个可选的、逗号分隔的正则表达式列表,匹配您希望连接器为其发出表示列元数据的额外参数的列的完全限定名称。设置此属性时,连接器会将以下字段添加到事件记录的模式中:

  • __debezium.source.column.type

  • __debezium.source.column.length

  • __debezium.source.column.scale

这些参数分别传播列的原始类型名称和长度(对于可变宽度类型)。
启用连接器发出此额外数据有助于正确确定接收器数据库中特定数字或基于字符的列的大小。

列的完全限定名称遵循以下格式之一:<tableName>.<columnName>,或 <schemaName>.<tableName>.<columnName>
为了匹配列名,Debezium 将您指定的正则表达式作为锚定正则表达式匹配。也就是说,指定的表达式与列的整个名称字符串进行匹配;它不匹配可能存在于列名中的子字符串。

无默认值

一个可选的、逗号分隔的 «正则表达式» 列表,用于指定数据库中 «列» 定义的 «数据类型» 的«完全限定»名称。设置此属性时,对于具有匹配 «数据类型» 的 «列»,连接器会发出包含以下额外字段的 «模式» 的事件记录:

  • __debezium.source.column.type

  • __debezium.source.column.length

  • __debezium.source.column.scale

这些参数分别传播列的原始类型名称和长度(对于可变宽度类型)。
启用连接器发出此额外数据有助于正确确定接收器数据库中特定数字或基于字符的列的大小。

列的完全限定名称遵循以下格式之一:<tableName>.<typeName>,或 <schemaName>.<tableName>.<typeName>
为了匹配 «数据类型» 的名称,Debezium 将您指定的 «正则表达式» 应用为*锚定的* «正则表达式»。也就是说,指定的表达式会针对 «数据类型» 的整个名称字符串进行匹配;该 «表达式» «不会» 匹配可能存在于 «类型» 名称中的子字符串。

Oracle 特定数据类型名称的列表,请参阅Oracle 数据类型映射

0

指定连接器向心跳 Topic 发送消息的频率(以毫秒为单位)。
使用此属性确定连接器是否继续从源数据库接收更改事件。
在没有更改事件在捕获的表中发生很长时间的情况下,设置此属性也很有用。
在这种情况下,尽管连接器继续读取重做日志,但它不发出任何更改事件消息,因此 Kafka Topic 中的偏移量保持不变。由于连接器不冲刷它从数据库读取的最新系统更改号 (SCN),数据库可能会比必要保留重做日志文件。如果连接器重新启动,延长保留期可能会导致连接器重复发送某些更改事件。
默认值 0 会阻止连接器发送任何心跳消息。

无默认值

指定连接器在发送心跳消息时在源数据库上执行的查询。

For example (例如:)

INSERT INTO test_heartbeat_table (text) VALUES ('test_heartbeat')

连接器在发出心跳消息后运行查询。

设置此属性并创建心跳表以接收心跳消息,以解决 Debezium 在与高流量数据库位于同一主机上的低流量数据库上无法同步偏移量的情况。在连接器将记录插入已配置的表后,它就能够接收来自低流量数据库的更改并确认数据库中的 SCN 更改,从而使偏移量与代理同步。

无默认值

指定连接器启动后等待多长时间(以毫秒为单位)才进行快照。
使用此属性可防止在集群中启动多个连接器时发生快照中断,这可能会导致连接器重新平衡。

0

指定连接器在完成快照后延迟开始流式传输过程的时间(以毫秒为单位)。设置延迟间隔有助于防止连接器在快照完成后但流式传输过程开始之前立即发生故障时重新启动快照。设置一个高于 Kafka Connect 工作节点设置的 offset.flush.interval.ms 属性值的延迟值。

10000

指定在进行快照时应从每个表中一次读取的最大行数。连接器以指定大小的多个批次读取表内容。

10000

指定给定查询的每次数据库往返将获取的行数。使用 0 的值将使用 JDBC 驱动程序的默认获取大小。

false

如果您希望 Debezium 生成包含事务边界的事件,并将数据事件的信封丰富为事务元数据,请将属性设置为 true

有关更多详细信息,请参阅事务元数据

null

指定您希望 Debezium Oracle 连接器从中捕获事务日志的下游只读挖掘数据库的主机名。

online_catalog

指定挖掘策略,该策略控制 Oracle LogMiner 如何构建和使用给定的数据字典来解析表和列 ID 到名称。
设置以下选项之一:

redo_log_catalog

将数据字典写入联机重做日志,导致随着时间的推移生成更多归档日志。这还允许跟踪对捕获表的 DDL 更改,因此如果模式更改频繁,这是理想的选择。

online_catalog

使用数据库的当前数据字典解析对象 ID,并且不向联机重做日志写入任何额外信息。这允许 LogMiner 更快地挖掘,但代价是无法跟踪 DDL 更改。如果捕获的表模式更改不频繁或从不更改,这是理想的选择。

hybrid

使用数据库的当前数据字典和 Debezium 内存模式模型相结合,无缝解析表和列名称。此模式的性能与 online_catalog LogMiner 策略相当,同时具有 redo_log_catalog 策略的模式跟踪弹性,而不会产生 redo_log_catalog 策略的归档日志生成开销和性能成本。

dictionary_from_file

连接器使用位于 Oracle 目录对象中的文件的字典。有关更多信息,请参阅从下游挖掘数据库捕获更改

none

指定挖掘查询模式,该模式控制 Oracle LogMiner 查询的构建方式。
设置以下选项之一:

none

查询的生成不进行任何模式、表或用户名的过滤。

in

查询使用标准的 SQL IN 子句过滤数据库端的模式、表和用户名。模式、表和用户名配置的包含/排除列表不应指定任何正则表达式,因为查询是直接使用这些值构建的。

regex

查询使用 Oracle 的 REGEXP_LIKE 运算符在数据库端过滤模式和表名,并使用 SQL IN 子句过滤用户名。模式和表配置的包含/排除列表可以安全地指定正则表达式。

memory

缓冲区类型控制连接器如何管理事务数据的缓冲。

memory - 使用 JVM 进程的堆缓冲所有事务数据。如果您不期望连接器处理大量长期或大型事务,请选择此选项。激活此选项后,缓冲区状态不会在重新启动之间持久化。重新启动后,从当前偏移量的 SCN 值重新创建缓冲区。

infinispan_embedded - 此选项使用嵌入式 Infinispan 缓存来缓冲事务数据并将其持久化到磁盘。

infinispan_remote - 此选项使用远程 Infinispan 集群来缓冲事务数据并将其持久化到磁盘。

0

事务在事务缓冲区中最多可以拥有的事件数。事件计数超过此阈值的事务将不会发出并且会被放弃。默认行为是没有事务事件阈值。

无默认值

Infinispan 全局配置的 XML 配置。有关更多信息,请参阅Infinispan 事件缓冲

无默认值

Infinispan 事务缓存的 XML 配置。有关更多信息,请参阅Infinispan 事件缓冲

无默认值

Infinispan 事件缓存的 XML 配置。有关更多信息,请参阅Infinispan 事件缓冲

无默认值

Infinispan 已处理事务缓存的 XML 配置。有关更多信息,请参阅Infinispan 事件缓冲

无默认值

Infinispan 模式更改缓存的 XML 配置。

false

指定在连接器以正常、预期的停止方式停止后是否删除缓冲区状态。

此设置仅影响持久化跨重启状态的缓冲区实现,例如 infinispan
默认行为是缓冲区状态始终在重启之间保留。

仅在测试或开发环境中设置为 true

0

LogMiner 会话可以活动的最长时间(以毫秒为单位),之后将使用新的会话。

对于低流量系统,当同一个会话长时间运行时,LogMiner 会话可能会消耗过多的 PGA 内存。默认行为是仅在检测到日志切换时才使用新的 LogMiner 会话。通过将此值设置为大于 0 的值,可以指定 LogMiner 会话可以活动的最长时间,之后会停止并重新启动以释放和重新分配 PGA 内存。

false

指定在日志切换或挖掘会话达到最大生命周期阈值时,JDBC 连接是否会关闭并重新打开。

默认情况下,JDBC 连接不会在日志切换或最大会话生命周期之间关闭。
如果遇到 LogMiner 导致的 Oracle SGA 过度增长,则应启用此项。

1000

连接器尝试从重做/归档日志中读取的最小 SCN 间隔大小。

100000

连接器在从重做/归档日志读取时使用的最大 SCN 间隔大小。

20000

连接器在从重做/归档日志读取时使用的间隔的增加/减少量。

20000

连接器在从重做/归档日志读取数据时使用的起始 SCN 间隔大小。这还可以作为调整批次大小的度量 - 当当前 SCN 与批次的开始/结束 SCN 之间的差值大于此值时,批次大小会增加/减少。

0

连接器在从重做/归档日志读取数据后,开始再次读取数据之前的最小睡眠时间。值为毫秒。

3000

连接器在从重做/归档日志读取数据后,开始再次读取数据之前的最大睡眠时间。值为毫秒。

1000

连接器在从重做/归档日志读取数据后,开始再次读取数据之前的起始睡眠时间。值为毫秒。

200

在读取 LogMiner 数据时,连接器用于调整最佳睡眠时间的增量(向上或向下)。值为毫秒。

0

从 SYSDATE 开始,要挖掘的归档日志的小时数。使用默认设置 (0) 时,连接器会挖掘所有归档日志。

false

控制连接器是仅从归档日志挖掘更改,还是从在线重做日志和归档日志的组合中挖掘更改(默认)。

重做日志使用循环缓冲区,可以随时进行归档。在在线重做日志频繁归档的环境中,这可能导致 LogMiner 会话失败。与重做日志相比,归档日志保证可靠。将此选项设置为 true 可强制连接器仅挖掘归档日志。将连接器设置为仅挖掘归档日志后,操作提交与连接器发出关联的更改事件之间可能出现的延迟会增加。延迟的程度取决于数据库配置为归档在线重做日志的频率。

10000

连接器在轮询以确定起始系统更改号是否在归档日志中的间隔(毫秒)。如果未启用 log.mining.archive.log.only.mode,则不使用此设置。

0

指定在重做日志切换之间保留长事务的毫秒数的正整数值。设置为 0 时,事务会一直保留,直到检测到提交或回滚。

默认情况下,LogMiner 适配器维护所有正在运行的事务的内存缓冲区。由于事务的所有 DML 操作都会在检测到提交或回滚之前进行缓冲,因此应避免长事务,以免溢出该缓冲区。任何超出此配置值的事务都将被完全丢弃,连接器也不会为该事务中的操作发出任何消息。

无默认值

指定使用 LogMiner 挖掘归档日志时要使用的已配置 Oracle 归档目标。

默认行为会自动选择第一个有效的本地已配置目标。但是,可以通过提供目标名称来使用特定目标,例如 LOG_ARCHIVE_DEST_5

有关在主实例和备用实例使用不同名称的环境中指定归档目标的信息,请参阅归档日志目标

无默认值

从 LogMiner 查询中包含的数据库用户列表。如果您希望捕获进程包含指定用户的更改,则设置此属性可能很有用。

无默认值

从 LogMiner 查询中排除的数据库用户列表。如果您希望捕获进程始终排除特定用户所做的更改,则设置此属性可能很有用。

无默认值

从 LogMiner 查询中包含的 JDBC 连接客户端 ID 列表。如果您希望捕获进程仅包含来自特定 JDBC 连接客户端的更改,则设置此属性可能很有用。

无默认值

从 LogMiner 查询中排除的 JDBC 连接客户端 ID 列表。如果您希望捕获进程始终排除特定 JDBC 连接客户端所做的更改,则设置此属性可能很有用。

1000000

指定连接器将其与当前和先前 SCN 值之间的差值进行比较以确定是否存在 SCN 间隙的值。如果 SCN 值之间的差值大于指定值,并且时间差小于 log.mining.scn.gap.detection.time.interval.max.ms,则会检测到 SCN 间隙,并且连接器将使用大于配置的最大批次的挖掘窗口。

20000

以毫秒为单位指定一个值,连接器将其与当前和先前 SCN 时间戳之间的差值进行比较以确定是否存在 SCN 间隙。如果时间戳之间的差值小于指定值,并且 SCN 增量大于 log.mining.scn.gap.detection.gap.size.min,则会检测到 SCN 间隙,并且连接器将使用大于配置的最大批次的挖掘窗口。

LOG_MINING_FLUSH

指定刷新 Oracle LogWriter Buffer (LGWR) 到重做日志的刷新表的名称。可以使用 <schemaName>.<tableName><tableName> 格式指定此名称。通常,多个连接器可以使用同一个刷新表。但是,如果连接器遇到表锁争用错误,请使用此属性为每个连接器部署指定一个专用表。

false

指定重做日志构造的 SQL 语句是否包含在 source.redo_sql 字段中。使用 XStream 或 OpenLogReplicator 适配器时,将忽略此配置。

false

控制是否在更改事件中发出大对象(CLOB 或 BLOB)列值。

默认情况下,更改事件包含大对象列,但这些列不包含值。处理和管理大对象列类型和有效负载会产生一定的开销。要捕获大对象值并将其序列化到更改事件中,请将此选项设置为 true

__debezium_unavailable_value

指定连接器提供的常量,以指示原始值未更改且未由数据库提供。

无默认值

Oracle Real Application Clusters (RAC) 节点主机名或地址的逗号分隔列表。此字段对于启用与 Oracle RAC 部署的兼容性是必需的。

通过以下方法之一指定 RAC 节点列表

  • database.port 指定一个值,并为 rac.nodes 列表中的每个地址使用指定的端口值。例如

    database.port=1521
    rac.nodes=192.168.1.100,192.168.1.101
  • database.port 指定一个值,并覆盖列表中一个或多个条目的默认端口。列表可以包括使用默认 database.port 值的条目,以及定义自己唯一端口值的条目。例如

    database.port=1521
    rac.nodes=192.168.1.100,192.168.1.101:1522

如果您通过 database.url 属性为数据库提供原始 JDBC URL,而不是为 database.port 定义值,则每个 RAC 节点条目都必须显式指定端口值。

t

一个逗号分隔的操作类型列表,您希望连接器在流式传输期间跳过这些操作。您可以配置连接器以跳过以下类型的操作:

  • c (insert/create)

  • u (update)

  • d (delete)

  • t (truncate)

默认情况下,仅跳过 TRUNCATE 操作。

无默认值

用于将信号发送到连接器的集合的完全限定名称。当您将此属性与 Oracle 可插拔数据库 (PDB) 结合使用时,请将其值设置为根数据库的名称。
使用以下格式指定集合名称
<databaseName>.<schemaName>.<tableName>

source (源)

为连接器启用的信号通道名称列表。默认情况下,以下通道可用:

无默认值

为连接器启用的通知通道名称列表。默认情况下,以下通道可用:

1024

在增量快照块期间,连接器获取并加载到内存中的最大行数。增加块大小可以提高效率,因为快照运行的快照查询次数较少但规模更大。但是,较大的块大小也需要更多内存来缓冲快照数据。将块大小调整为能为您的环境提供最佳性能的值。

insert_insert

指定连接器在增量快照期间使用的水印机制,用于对可能被增量快照捕获,然后在流式传输恢复后重新捕获的事件进行去重。
您可以指定以下选项之一:

insert_insert

当您发送信号以启动增量快照时,对于 Debezium 在快照期间读取的每个块,它都会在信号数据集合中写入一个条目,以记录打开快照窗口的信号。快照完成后,Debezium 会插入第二个条目,记录关闭窗口的信号。

insert_delete

当您发送信号以启动增量快照时,对于 Debezium 读取的每个块,它都会在信号数据集合中写入一个条目,以记录打开快照窗口的信号。快照完成后,此条目将被删除。不会为关闭快照窗口的信号创建条目。设置此选项可防止信号数据集合快速增长。

io.debezium.schema.SchemaTopicNamingStrategy

用于确定数据更改、schema 更改、事务、心跳事件等的*主题名称*的 TopicNamingStrategy 类的名称,默认为 SchemaTopicNamingStrategy

.

指定主题名称的分隔符,默认为 .

10000

用于保存有界并发哈希映射中主题名称的大小。此缓存将有助于确定与给定数据集合对应的主题名称。

__debezium-heartbeat

控制连接器向心跳消息发送的心跳消息的主题名称。主题名称的模式如下:

topic.heartbeat.prefix.topic.prefix

例如,如果主题前缀是 fulfillment,则默认主题名称为 __debezium-heartbeat.fulfillment

transaction

控制连接器发送事务元数据消息的主题的名称。主题名称的模式为:

topic.prefix.topic.transaction

例如,如果主题前缀是 fulfillment,则默认主题名称是 fulfillment.transaction

1

指定连接器在执行初始快照时使用的线程数。要启用并行初始快照,请将属性设置为大于 1 的值。在并行初始快照中,连接器会并发处理多个表。

当您启用并行初始快照时,执行每个表快照的线程可能需要不同的时间来完成工作。如果一个表的快照比其他表的快照花费的时间显著长,则已完成工作的线程将处于空闲状态。在某些环境中,网络设备(如负载均衡器或防火墙)会终止长时间空闲的连接。快照完成后,连接器将无法关闭连接,从而导致异常和快照不完整,即使在连接器成功传输了所有快照数据的情况下也是如此。

如果您遇到此问题,请将 snapshot.max.threads 的值恢复为 1,然后重试快照。

0

指定在发生数据库错误时尝试快照表的重试次数。此配置属性目前仅重试与 ORA-01466 异常相关的失败。默认情况下,不会执行额外的重试。

无默认值

定义用于通过添加提供上下文信息的元数据来定制 MBean 对象名称的标签。指定键值对的逗号分隔列表。每个键代表 MBean 对象名称的标签,对应的值代表该键的值,例如:
k1=v1,k2=v2

连接器会将指定的标签附加到基本 MBean 对象名称。标签可以帮助您组织和分类指标数据。您可以定义标签来识别特定的应用程序实例、环境、区域、版本等。有关更多信息,请参阅 定制 MBean 名称

-1

指定连接器在操作导致可重试错误(例如连接错误)后如何响应。
设置以下选项之一:

-1

无限制。连接器将自动重新启动,并重试操作,无论先前的失败次数如何。

0

禁用。连接器将立即失败,并且永远不会重试操作。需要用户干预才能重新启动连接器。

> 0

连接器将自动重新启动,直到达到指定的最大重试次数。下一次失败后,连接器将停止,需要用户干预才能重新启动它。

600000

等待查询执行的时间(以毫秒为单位)。默认为 600 秒 (600,000 毫秒);零表示没有限制。

0

指定连接器可以捕获的最大表数。超过此限制将触发 guardrail.collections.limit.action 指定的操作。将此属性设置为 0 可防止连接器触发保护措施。

warn

指定如果连接器捕获的表数超过 guardrail.collections.max 属性中指定的数量时触发的操作。将属性设置为以下值之一:

fail

连接器失败并报告异常。

warn

连接器记录警告。

true

此属性指定 Debezium 是否将带有 __debezium.context. 前缀的上下文头添加到其发出的消息中。

这些头是 OpenLineage 集成所必需的,并提供元数据,使下游处理系统能够跟踪和识别更改事件的来源。

该属性添加了以下头:

__debezium.context.connectorLogicalName

Debezium 连接器的逻辑名称。

__debezium.context.taskId

连接器任务的唯一标识符。

__debezium.context.connectorName

Debezium 连接器的名称。

Debezium Oracle 连接器数据库模式历史配置属性

Debezium 提供了一组 schema.history.internal.* 属性来控制连接器如何与模式历史记录主题进行交互。

下表描述了用于配置 Debezium 连接器的 schema.history.internal 属性。

表 16. 连接器数据库模式历史配置属性
属性 Default (默认值) 描述

无默认值

连接器存储数据库模式历史的 Kafka 主题的完整名称。

无默认值

连接器用于建立与 Kafka 集群的初始连接的主机/端口对列表。此连接用于检索连接器先前存储的数据库模式历史,以及写入从源数据库读取的每个 DDL 语句。每个对都应指向 Kafka Connect 进程使用的相同 Kafka 集群。

100

一个整数值,指定连接器在启动/恢复期间轮询持久数据时应等待的最大毫秒数。默认为 100 毫秒。

3000

一个整数值,指定连接器在使用 Kafka 管理客户端获取集群信息时应等待的最大毫秒数。

30000

一个整数值,指定连接器在使用 Kafka 管理客户端创建 Kafka 历史主题时应等待的最大毫秒数。

100

连接器在连接器恢复因错误而失败之前尝试读取持久历史数据的最大次数。在收到无数据后的最长等待时间为 recovery.attempts × recovery.poll.interval.ms

false

一个布尔值,指定连接器是应忽略格式错误或未知的数据库语句,还是停止处理以便人工修复问题。安全默认值为 false。跳过操作应谨慎使用,因为它可能导致在处理 binlog 时丢失或损坏数据。

false

一个布尔值,指定连接器是记录模式或数据库中所有表的模式结构,还是仅记录指定用于捕获的表的模式结构。
指定以下值之一:

false (默认)

在数据库快照期间,连接器会记录所有非系统表的模式数据,包括未指定用于捕获的表。最好保留默认设置。如果您稍后决定捕获未最初指定用于捕获的表的更改,连接器可以轻松开始从这些表中捕获数据,因为它们的模式结构已存储在模式历史记录主题中。Debezium 需要表的模式历史记录,以便它能够识别更改事件发生时存在的结构。

true

在数据库快照期间,连接器仅记录 Debezium 捕获更改事件的表的表模式。如果您更改默认值,并且稍后配置连接器以捕获数据库中的其他表的数据,则连接器将缺少捕获表中更改事件所需的模式信息。

false

一个布尔值,指定连接器是记录实例中所有逻辑数据库的模式结构。
指定以下值之一:

true

连接器仅记录 Debezium 捕获更改事件的逻辑数据库和模式中的表的模式结构。

false

连接器记录所有逻辑数据库的模式结构。

直通式 Oracle 连接器配置属性

该连接器支持直通式属性,使 Debezium 能够为微调 Apache Kafka 生产者和消费者行为指定自定义配置选项。有关 Kafka 生产者和消费者配置属性的完整范围,请参阅 Kafka 文档

用于配置生产者和消费者客户端如何与模式历史主题交互的直通式属性

Debezium 依赖 Apache Kafka 生产者将模式更改写入数据库模式历史记录主题。同样,它依赖 Kafka 消费者在连接器启动时从数据库模式历史记录主题读取。您可以通过为一组以 schema.history.internal.producer.*schema.history.internal.consumer.* 前缀开头的传递配置属性赋值来定义 Kafka 生产者和消费者客户端的配置。传递的生产者和消费者数据库模式历史记录属性控制一系列行为,例如这些客户端如何与 Kafka 代理建立安全连接,如下例所示:

schema.history.internal.producer.security.protocol=SSL
schema.history.internal.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
schema.history.internal.producer.ssl.keystore.password=test1234
schema.history.internal.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
schema.history.internal.producer.ssl.truststore.password=test1234
schema.history.internal.producer.ssl.key.password=test1234

schema.history.internal.consumer.security.protocol=SSL
schema.history.internal.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
schema.history.internal.consumer.ssl.keystore.password=test1234
schema.history.internal.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
schema.history.internal.consumer.ssl.truststore.password=test1234
schema.history.internal.consumer.ssl.key.password=test1234

Debezium 会从属性名称中剥离前缀,然后再将属性传递给 Kafka 客户端。

有关 Kafka 生产者配置属性Kafka 消费者配置属性 的更多信息,请参阅 Apache Kafka 文档。

用于配置 Oracle 连接器如何与 Kafka 信号主题交互的直通式属性

Debezium 提供了一组 signal.* 属性来控制连接器如何与 Kafka 信号主题交互。

下表描述了 Kafka signal 属性。

表 17. Kafka 信号配置属性
属性 Default (默认值) 描述

<topic.prefix>-signal

连接器监视的用于临时信号的 Kafka 主题的名称。

如果自动主题创建被禁用,则必须手动创建所需的信号主题。信号主题对于保留信号顺序是必需的。信号主题必须只有一个分区。

kafka-signal

Kafka 消费者使用的组 ID 的名称。

无默认值

连接器用于建立与 Kafka 集群的初始连接的主机和端口对列表。每个对都引用 Debezium Kafka Connect 进程使用的 Kafka 集群。

100

一个整数值,指定连接器在轮询信号时等待的最大毫秒数。

用于配置信号通道的 Kafka 消费者客户端的直通式属性

Debezium 为信号 Kafka 消费者的传递配置提供了支持。传递信号属性以 signal.consumer.* 前缀开头。例如,连接器会将 signal.consumer.security.protocol=SSL 等属性传递给 Kafka 消费者。

Debezium 在将前缀从属性名称中剥离,然后再将属性传递给 Kafka 信号消费者。

用于配置 Oracle 连接器接收器通知通道的直通式属性

下表描述了可用于配置 Debezium sink notification 通道的属性。

表 18. 接收器通知配置属性
属性 Default (默认值) 描述

无默认值

接收来自 Debezium 通知的主题名称。当您将 notification.enabled.channels 属性配置为包含 sink 作为已启用通知通道之一时,此属性是必需的。

Debezium 连接器直通式数据库驱动程序配置属性

Debezium 连接器支持数据库驱动程序的传递配置。传递数据库属性以 driver.* 前缀开头。例如,连接器会将 driver.foobar=false 等属性传递给 JDBC URL。

Debezium 在将前缀从属性名称中剥离,然后再将属性传递给数据库驱动程序。

监控

除了 Apache Kafka 和 Kafka Connect 内置的 JMX 指标支持外,Debezium Oracle 连接器还提供三种指标类型。

有关如何通过 JMX 公开这些指标的详细信息,请参阅监控文档

自定义 MBean 名称

Debezium 连接器通过连接器的 MBean 名称公开指标。这些指标特定于每个连接器实例,提供有关连接器快照、流式传输和模式历史记录进程行为的数据。

默认情况下,当您部署正确配置的连接器时,Debezium 会为不同的连接器指标生成唯一的 MBean 名称。要查看连接器进程的指标,请将您的可观察性堆栈配置为监视其 MBean。但这些默认 MBean 名称取决于连接器配置;配置更改可能导致 MBean 名称发生更改。在这种情况下,更改 MBean 名称会破坏连接器实例和 MBean 之间的链接,从而中断监控活动。您必须重新配置可观察性堆栈以使用新的 MBean 名称才能恢复监控。

为了防止因 MBean 名称更改而导致的监控中断,您可以配置自定义指标标签。通过将 custom.metric.tags 属性添加到连接器配置中来配置自定义指标。该属性接受键值对,其中每个键代表 MBean 对象名称的标签,相应的值代表该标签的值。例如:k1=v1,k2=v2。Debezium 将指定的标签附加到连接器的 MBean 名称。

配置连接器的 custom.metric.tags 属性后,您可以配置可观察性堆栈以检索与指定标签关联的指标。然后,可观察性堆栈使用指定的标签而不是可变的 MBean 名称来唯一标识连接器。之后,如果 Debezium 重新定义了其如何构造 MBean 名称,或者连接器配置中的 topic.prefix 发生更改,指标收集将不会中断,因为指标抓取任务使用指定的标签模式来标识连接器。

使用自定义标签的另一个好处是,您可以使用反映数据管道架构的标签,以便以适合您运营需求的方式组织指标。例如,您可以指定带有值的标签来声明连接器活动的类型、应用程序上下文或数据源,例如 db1-streaming-for-application-abc。如果您指定多个键值对,所有指定的对都将附加到连接器的 MBean 名称。

以下示例说明了标签如何修改默认 MBean 名称。

示例 4. 自定义标签如何修改连接器 MBean 名称

默认情况下,Oracle 连接器将以下 MBean 名称用于流式传输指标

debezium.oracle:type=connector-metrics,context=streaming,server=<topic.prefix>

如果您将 custom.metric.tags 的值设置为 database=salesdb-streaming,table=inventory,Debezium 将生成以下自定义 MBean 名称:

debezium.oracle:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory

快照指标

MBeandebezium.oracle:type=connector-metrics,context=snapshot,server=<topic.prefix>

除非快照操作正在活动中,或者自上次连接器启动以来发生过快照,否则快照指标不会暴露。

下表列出了可用的快照指标。

Attributes Type 描述

string

连接器已读取的最后一个快照事件。

long

自连接器读取和处理最近事件以来经过的毫秒数。

long

记录连接器在快照操作期间识别为错误的更改事件的数量。每次连接器在初始、增量或临时快照期间遇到无法处理的事件时,都会递增此指标。事件可能因格式错误、与模式不兼容或在转换期间遇到失败而无法处理。指标值在连接器任务的整个生命周期内保持不变。如果快照被中断,并且连接器任务重新启动,则指标计数将重置为 0

long

自上次启动或重置以来,此连接器已看到的事件总数。

long

已被连接器配置的包含/排除列表过滤规则过滤的事件数量。

string[]

由连接器捕获的表列表。

int

用于在快照程序和主 Kafka Connect 循环之间传递事件的队列的长度。

int

用于在快照程序和主 Kafka Connect 循环之间传递事件的队列的可用容量。

int

包含在快照中的表总数。

int

快照尚未复制的表数。

boolean

快照是否已启动。

boolean

快照是否已暂停。

boolean

快照是否被中止。

boolean

快照是否已完成。

boolean

快照是否被跳过。

long

快照到目前为止所花费的总秒数,即使未完成。也包括快照暂停的时间。

long

快照暂停的总秒数。如果快照被暂停了多次,则暂停时间将累加。

Map<String, Long>

包含快照中每个表扫描的行数的映射。表在处理过程中被增量地添加到 Map 中。每扫描 10,000 行和完成一个表时更新。

long

队列的最大缓冲区(以字节为单位)。如果 max.queue.size.in.bytes 设置为正长值,则此指标可用。

long

队列中记录的当前字节卷。

当执行增量快照时,连接器还提供以下附加快照指标:

Attributes Type 描述

string

当前快照块的标识符。

string

定义当前块的主键集的下限。

string

定义当前块的主键集的上限。

string

当前快照表的组成键集(主键)的下限。

string

当前快照表的组成键集(主键)的上限。

流式传输指标

MBeandebezium.oracle:type=connector-metrics,context=streaming,server=<topic.prefix>

下表列出了可用的流式传输指标。

Attributes Type 描述

string

连接器已读取的最后一个流式传输事件。

long

自连接器读取和处理最近事件以来经过的毫秒数。

long

记录连接器在流式传输期间识别为错误的更改事件的数量。每次连接器在流式传输会话期间遇到无法处理的事件时,都会递增此指标。事件可能因格式错误、与模式不兼容或在转换期间遇到失败而无法处理。指标值在连接器任务的整个生命周期内保持不变。连接器重启后,指标计数将重置为 0

long

自上次连接器启动或重置以来,源数据库报告的总数据更改事件数。代表 Debezium 需要处理的数据更改工作负载。

long

自上次启动或重置指标以来,连接器处理的总创建事件数。

long

自上次启动或重置指标以来,连接器处理的总更新事件数。

long

自上次启动或重置指标以来,连接器处理的总删除事件数。

long

已被连接器配置的包含/排除列表过滤规则过滤的事件数量。

string[]

由连接器捕获的表列表。

int

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的长度。

int

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的剩余容量。

boolean

表示连接器当前是否连接到数据库服务器的标志。

long

上次更改事件的时间戳与连接器处理它之间的时间差(以毫秒为单位)。这些值将包含数据库服务器和连接器运行所在机器之间时钟的任何差异。

long

已提交的处理过的事务数。

Map<String, String>

收到的最后一个事件的坐标。

string

已处理的最后一个事务的事务标识符。

long

队列的最大缓冲区(以字节为单位)。如果 max.queue.size.in.bytes 设置为正长值,则此指标可用。

long

队列中记录的当前字节卷。

Debezium Oracle 连接器还提供以下附加的流式传输指标

表 19. 附加流式传输指标的描述
Attributes Type 描述

BigInteger

已处理的最新系统更改号。

BigInteger

事务缓冲区中最旧的系统更改号。

long

最旧的系统更改号的年龄(以毫秒为单位)。如果缓冲区为空,则值为 0

BigInteger

事务缓冲区中最后提交的系统更改号。

BigInteger

当前写入连接器偏移量的系统更改号。

BigInteger

当前用作 LogMiner 会话下边界的系统更改号。

BigInteger

当前用作 LogMiner 会话上边界的系统更改号。

BigInteger

当前用作 LogMiner 提取查询下边界的系统更改号。

BigInteger

当前用作 LogMiner 提取查询上边界的系统更改号。

string[]

数据库当前在线重做日志文件的数组。

string[]

当前挖掘会话所涉及的所有归档和在线重做日志的数组。

long

为任何 LogMiner 会话指定的最小日志数。

long

为任何 LogMiner 会话指定的最多日志数。

string[]

数据库中每个在线重做日志当前状态的数组,格式为 filename|status

int

过去一天数据库执行的日志切换次数。

long

上次 LogMiner 会话查询中观察到的 DML 操作数。

long

处理单个 LogMiner 会话查询时观察到的最大 DML 操作数。

long

观察到的总 DML 操作数。

long

执行的 LogMiner 会话查询(又名批次)的总数。

long

上次 LogMiner 会话查询提取的持续时间(以毫秒为单位)。

long

任何 LogMiner 会话查询提取的最大持续时间(以毫秒为单位)。

long

处理最后一个 LogMiner 查询批次结果的持续时间(以毫秒为单位)。

long

解析 DML 事件 SQL 语句所花费的时间(以毫秒为单位)。

long

启动最后一个 LogMiner 会话的持续时间(以毫秒为单位)。

long

启动 LogMiner 会话的最长持续时间(以毫秒为单位)。

long

连接器用于启动 LogMiner 会话的总持续时间(以毫秒为单位)。

long

处理单个 LogMiner 会话结果所花费的最小持续时间(以毫秒为单位)。

long

处理单个 LogMiner 会话结果所花费的最大持续时间(以毫秒为单位)。

long

处理 LogMiner 会话结果的总持续时间(以毫秒为单位)。

long

JDBC 驱动程序从日志挖掘视图获取下一行以进行处理所花费的总时间(以毫秒为单位)。

long

跨所有会话从日志挖掘视图处理的总行数。

int

每次数据库往返,日志挖掘查询获取的条目数。

long

在从日志挖掘视图获取另一批结果之前,连接器休眠的毫秒数。

long

从日志挖掘视图处理的每秒最大行数。

long

从日志挖掘中处理的平均每秒行数。

long

最后批次从日志挖掘视图处理的平均每秒行数。

long

在提交或回滚之前,连接器的内存缓冲区保留事务的毫秒数,之后将被丢弃。有关更多信息,请参阅log.mining.transaction.retention.ms

long

连接器在 LogMiner 查询之间等待的毫秒数。

long

事务缓冲区中的事件数。

long

事务缓冲区中当前活动事务的数量。

long

事务缓冲区中已提交的事务数量。

long

由于大小超过 log.mining.buffer.transaction.events.threshold 而被丢弃的事务数量。

long

事务缓冲区中已回滚的事务数量。

long

已提交事务中回滚的事件数量,在大多数用例中,这表示存在约束冲突。

long

事务缓冲区中每秒已提交事务的平均数量。

long

连接器处理的更改数量。

long

更改发生在事务日志中与添加到事务缓冲区的时间之间的毫秒时间差。

long

更改发生在事务日志中与添加到事务缓冲区的时间之间的最大毫秒时间差。

long

更改发生在事务日志中与添加到事务缓冲区的时间之间的最小毫秒时间差。

string[]

由于年龄过大而从事务缓冲区中移除的最近被放弃的事务标识符数组。有关详细信息,请参阅log.mining.transaction.retention.ms

long

abandoned transactions 列表中当前条目数。

string[]

事务缓冲区中已挖掘并回滚的最近事务标识符的数组。

long

上次事务缓冲区提交操作的持续时间(以毫秒为单位)。

long

最长的事务缓冲区提交操作的持续时间(以毫秒为单位)。

int

检测到的错误数。

int

检测到的警告数。

int

检查系统更改号是否前进但未更改的次数。高值可能表示正在进行长事务,并且该事务阻止连接器将最近处理的系统更改号刷新到连接器的偏移量。在最佳条件下,该值应接近或等于 0

int

已检测到但 DDL 解析器无法解析的 DDL 记录数。此值应始终为 0;但是,当允许跳过不可解析的 DDL 时,此指标可用于确定是否已将任何警告写入连接器日志。

long

当前挖掘会话的用户全局区域 (UGA) 内存占用(以字节为单位)。

long

所有挖掘会话中当前挖掘会话的用户全局区域 (UGA) 内存占用(以字节为单位)。

long

当前挖掘会话的进程全局区域 (PGA) 内存占用(以字节为单位)。

long

所有挖掘会话中当前挖掘会话的进程全局区域 (PGA) 内存占用(以字节为单位)。

模式历史指标

MBeandebezium.oracle:type=connector-metrics,context=schema-history,server=<topic.prefix>

下表列出了可用的模式历史记录指标。

Attributes Type 描述

string

STOPPEDRECOVERING(从存储中恢复历史记录)或 RUNNING 中的一个,描述数据库模式历史记录的状态。

long

恢复开始的时间(以 epoch 秒为单位)。

long

恢复阶段读取的更改数。

long

在恢复和运行时应用的总模式更改数。

long

自从从历史存储中恢复最后一个更改以来经过的毫秒数。

long

自应用最后一个更改以来经过的毫秒数。

string

从历史存储中恢复的最后一个更改的字符串表示。

string

已应用的最后一个更改的字符串表示。

代理模式演进

Oracle 连接器通过解析重做日志中的 DDL 来自动跟踪和应用表模式更改。如果 DDL 解析器遇到不兼容的语句,在需要时,连接器会提供另一种应用模式更改的方法。

默认情况下,连接器在遇到无法解析的 DDL 语句时会停止。您可以使用 Debezium 的信号来触发此类 DDL 语句的数据库模式更新。

模式更新操作的类型为 schema-changes。此操作会更新信号参数中列出的所有表的模式。消息不包含模式的更新。相反,它包含完整的、新的模式结构。

表 20. 操作参数
名称 描述

database

Oracle 数据库的名称。

schema

应用更改的模式的名称。

changes

包含请求的模式更新的数组。

changes.type

模式更改的类型,通常为 ALTER

changes.id

表的完全限定名称

changes.table

表的完全限定名称

changes.table.defaultCharsetName

如果表使用的字符集与数据库默认值不同,则为字符集名称

changes.table.primaryKeyColumnNames

包含组成主键的列名称的数组

changes.table.columns

包含列元数据的数组

…​columns.name

列的名称

…​columns.jdbcType

列的 JDBC 类型,如JDBC API 中定义

…​columns.typeName

列类型的名称

…​columns.typeExpression

完整的列类型定义

…​columns.charsetName

列字符集(如果与默认值不同)

…​columns.length

列的长度/大小约束

…​columns.scale

数字列的标度

…​columns.position

表中列的位置,从 1 开始

…​columns.optional

布尔值 true 表示列值不是强制性的

…​columns.autoIncremented

布尔值 true 表示列值由序列自动计算

…​columns.generated

布尔值 true 表示列值是自动计算的

插入 schema-changes 信号后,必须使用修改后的配置重新启动连接器,该配置包括将 schema.history.internal.skip.unparseable.ddl 选项指定为 true。在连接器提交 SCN 超过 DDL 更改后,为了防止不可解析的 DDL 语句被意外跳过,请将连接器配置恢复到其之前的状态。

表 21. 日志记录记录示例
Column Value (值)

id

924e3ff8-2245-43ca-ba77-2af9af02fa07

type

schema-changes

data

{
   "database":"ORCLPDB1",
   "schema":"DEBEZIUM",
   "changes":[
      {
         "type":"ALTER",
         "id":"\"ORCLPDB1\".\"DEBEZIUM\".\"CUSTOMER\"",
         "table":{
            "defaultCharsetName":null,
            "primaryKeyColumnNames":[
               "ID",
               "NAME"
            ],
            "columns":[
               {
                  "name":"ID",
                  "jdbcType":2,
                  "typeName":"NUMBER",
                  "typeExpression":"NUMBER",
                  "charsetName":null,
                  "length":9,
                  "scale":0,
                  "position":1,
                  "optional":false,
                  "autoIncremented":false,
                  "generated":false
               },
               {
                  "name":"NAME",
                  "jdbcType":12,
                  "typeName":"VARCHAR2",
                  "typeExpression":"VARCHAR2",
                  "charsetName":null,
                  "length":1000,
                  "position":2,
                  "optional":true,
                  "autoIncremented":false,
                  "generated":false
               },
               {
                  "name":"SCORE",
                  "jdbcType":2,
                  "typeName":"NUMBER",
                  "typeExpression":"NUMBER",
                  "charsetName":null,
                  "length":6,
                  "scale":2,
                  "position":3,
                  "optional":true,
                  "autoIncremented":false,
                  "generated":false
               },
               {
                  "name":"REGISTERED",
                  "jdbcType":93,
                  "typeName":"TIMESTAMP(6)",
                  "typeExpression":"TIMESTAMP(6)",
                  "charsetName":null,
                  "length":6,
                  "position":4,
                  "optional":true,
                  "autoIncremented":false,
                  "generated":false
               }
            ]
         }
      }
   ]
}

OpenLogReplicator 支持

OpenLogReplicator 摄取适配器目前处于孵化状态,即确切的语义、配置选项等可能会在将来的版本中根据我们收到的反馈而更改。如果您遇到任何问题,请告知我们。

Debezium Oracle 连接器默认使用本机 Oracle LogMiner 摄取更改。但是,可以将连接器切换为使用 OpenLogReplicator,这是一个开源且免费的第三方应用程序,它以对数据库影响很小的方式直接从重做日志和归档日志读取 Oracle 更改。要将连接器配置为使用 OpenLogReplicator,您必须应用特定的数据库和连接器配置,这些配置与您与 LogMiner 使用的配置不同。

先决条件
  • 下载并编译适用于您数据库环境的 OpenLogReplicator。

  • OpenLogReplicator 必须安装,并且能够直接访问归档和重做日志文件。如果可以通过某个共享文件系统访问归档和重做日志,则不一定需要在物理数据库服务器上安装。

OpenLogReplicator 如何工作

当 Debezium Oracle 连接器流式传输更改时,OpenLogReplicator 取代了 Oracle LogMiner 和 Oracle XStream 的角色。它负责在重做日志和归档日志发生时捕获更改,并将这些更改批处理成逻辑事务。Debezium Oracle 连接器通过连接到 OpenLogReplicator 提供的网络端点并摄取批处理的事务来成为 OpenLogReplicator 的消费者。

在 OpenLogReplicator 适配器摄取更改后,Debezium Oracle 连接器会将事件转换为数据更改事件,就像其他任何适配器一样。

从网络拓扑的角度来看,Debezium Oracle 连接器依赖于与 Oracle 数据库和 OpenLogReplicator 的网络连接。同样,OpenLogReplicator 需要与 Oracle 数据库的网络连接,以及直接访问原始重做日志和归档日志。

准备数据库

Oracle 数据库必须配置为生成特定文件,称为归档日志。归档日志是完整在线重做日志的已保存副本。在可以归档重做日志文件之前,必须将数据库置于 ARCHIVELOG 模式。要将数据库置于 ARCHIVELOG 模式,您必须设置特定的配置属性来指定保存归档日志文件的目标。

以下示例显示了将数据库置于归档日志模式的命令

示例 5. OpenLogReplicator 所需的配置
ORACLE_SID=ORCLCDB dbz_oracle sqlplus /nolog

CONNECT sys/top_secret AS SYSDBA
alter system set db_recovery_file_dest_size = 5G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- Should show "Database log mode: Archive Mode"
archive log list

exit;

为了让 Debezium 生成显示表行 beforeafter 状态的更改事件,必须在数据库上激活补充日志记录。补充日志记录会将列数据添加到重做日志中,以识别在修改表时受影响的行。

您可以使用不同的方法配置补充日志记录。为了支持 Debezium,您至少必须启用最小的数据库级补充日志记录。最小补充日志记录写入创建更改事件所需的最少信息量。以下示例显示了您可能用于启用最小补充日志记录的命令。

示例 6. 配置数据库级补充日志记录
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA

如果 Debezium for Oracle 的补充日志记录已为其他目的配置,它可以与更高程度的数据库级补充日志记录一起工作。但如果您不需要更高保真度的日志记录来支持其他应用程序,您可以将数据库级日志记录减少到最低级别。

对于每个捕获的表,您必须显式配置更高保真度的补充日志记录,称为 (ALL) COLUMNS(ALL) COLUMNS 日志记录级别可确保 Oracle 捕获列的状态,而不管列在将重做条目写入重做日志时是否已更改。启用更高的日志记录级别可使 Debezium for Oracle 生成能够提供行准确的之前之后状态的更改事件。

以下示例显示了为每个捕获的表启用补充日志记录的命令。

示例 7. 配置捕获表补充日志记录
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS

每当将新表添加到 Debezium Oracle 的连接器配置中时,您都必须为每个表配置补充日志记录。如果为捕获配置的表未正确配置补充日志记录,则在连接器开始流式传输后,它会返回警告消息。

创建连接器用户

Debezium Oracle 连接器需要一个设置了特定权限的用户帐户,以便连接器可以捕获更改事件。此用户帐户将由 Debezium 和 OpenLogReplicator 使用。以下示例显示了在多租户 Oracle 环境中部署 Debezium 和 OpenLogReplicator 的可能用户帐户配置。

示例 8. 创建 OpenLogReplicator 用户
# Create Log Miner Tablespace and User
sqlplus sys/top_secret@//:1521/ORCLCDB as sysdba <<- EOF
  CREATE TABLESPACE LOGMINER_TBS DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;
EOF

sqlplus sys/top_secret@//:1521/ORCLPDB1 as sysdba <<- EOF
  CREATE TABLESPACE LOGMINER_TBS DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;
EOF

sqlplus sys/top_secret@//:1521/ORCLCDB as sysdba <<- EOF
  CREATE USER c##dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS CONTAINER=ALL;

  -- Debezium specific permissions
  GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
  GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT ON V_\$DATABASE TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT ANY DICTIONARY TO c##dbzuser CONTAINER=ALL;
  GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;
  -- These can be reduced from ANY TABLE to your captured tables depending on your security model
  GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;
  GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;

  -- OpenLogReplicator specific permissions
  ALTER SESSION SET CONTAINER = ORCLPDB1;
  GRANT SELECT, FLASHBACK ON SYS.CCOL$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.CDEF$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.COL$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.DEFERRED_STG$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.ECOL$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.LOB$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.LOBCOMPPART$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.LOBFRAG$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.OBJ$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.TAB$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.TABCOMPART$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.TABPART$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.TABSUBPART$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.TS$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.USER$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON XDB.XDB$TTSET TO c##dbzuser;

  exit;
EOF

配置 OpenLogReplicator 适配器

默认情况下,Debezium 使用 Oracle LogMiner 从 Oracle 摄取更改事件。您可以修改默认设置,将连接器配置为使用 OpenLogReplicator 适配器而不是 LogMiner。

在下面的示例中,将以下属性添加到连接器配置中,以使连接器能够使用 OpenLogReplicator 适配器

  • database.connection.adapter

  • openlogreplicator.source

  • openlogreplicator.host

  • openlogreplicator.port

{
    "name": "inventory-connector",
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",
        "tasks.max" : "1",
        "topic.prefix" : "server1",
        "database.hostname" : "<oracle ip>",
        "database.port" : "1521",
        "database.user" : "c##dbzuser",
        "database.password" : "dbz",
        "database.dbname" : "ORCLCDB",
        "database.pdb.name" : "ORCLPDB1",
        "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory",
        "database.connection.adapter": "olr",
        "openlogreplicator.source": "ORACLE",
        "openlogreplicator.host": "<ip address or hostname of OpenLogReplicator>",
        "openlogreplicator.port": "<port OpenLogReplicator is listening on>"
    }
}

构建 OpenLogReplicator

OpenLogReplicator 不分发二进制文件,因此您必须从代码构建第三方工具。OpenLogReplicator 用 C++ 编写;但是,作者提供了一个容器映像来为各种操作系统(包括 RHEL、Fedora、CentOS、Debian 等)构建源代码。有关如何使用容器构建工具的信息,请参阅此OpenLogReplicator GitHub 存储库

获取 OpenLogReplicator 的 Oracle JDBC 驱动程序

当您将 Debezium Oracle 连接器与 OpenLogReplicator 结合使用时,您必须获取 Oracle JDBC 驱动程序才能连接到 Oracle 数据库。有关更多信息,请参阅获取 Oracle JDBC 驱动程序

OpenLogReplicator 配置

OpenLogReplicator 是一个第三方独立进程,负责读取 Oracle 重做和归档日志。与其他 Debezium Oracle 适配器不同,这意味着有两个可配置的组件必须独立配置。

您可以使用名为 scripts/OpenLogReplicator.json 的 JSON 文件来配置 OpenLogReplicator。有关此文件必需格式的更多信息,请参阅OpenLogReplicator 文档

示例 9. OpenLogReplicator 配置
{
  "version": "1.6.0",
  "source": [{
    "alias": "SOURCE",
    "name": "ORACLE", (1)
    "reader": {
      "type": "online",
      "path-mapping": ["/opt/olr/recovery_area", "/opt/olr/recovery_area/ORCLCDB/archivelog"], (2)
      "user": "c##dbzuser", (3)
      "password": "dbz", (4)
      "server": "//<ip>:<port>/ORCLPDB1" (5)
    },
    "format": { (6)
      "type": "json",
      "column": 2,
      "db": 3,
      "interval-dts": 9,
      "interval-ytm": 4,
      "message": 2,
      "rid": 1,
      "schema": 7,
      "scn-all": 1,
      "timestamp-all": 1
    }
  }],
  "target": [{
    "alias": "DEBEZIUM",
    "source": "SOURCE",
    "writer": {
      "type": "network", (7)
      "uri": "<host>:<port>" (8)
    }
  }]
}
1 这应该与 openlogreplicator.source 连接器配置匹配。
2 文件路径对列表 [before1,after1,befor2,after2,…​],如果日志文件路径与 beforeX 前缀之一匹配,则用 afterX 路径替换该前缀。当 OpenLogReplicator 在与源数据库不同的主机上运行时,并且重做和归档日志的路径在数据库和 OpenLogReplicator 进程之间有所不同时,这很有用。
3 这应该与 database.user 连接器属性匹配。
4 这应该与 database.password 连接器属性匹配。
5 这应该指向数据库主机、端口和 Oracle SID。
6 这指定了 Debezium Oracle 连接器摄取的有效负载格式。请按指定使用这些值,因为这些是我们除了默认值之外唯一需要的格式选项。
7 这必须指定 network,因为 Debezium Oracle 连接器通过网络连接与 OpenLogReplicator 通信。
8 这指定了 OpenLogReplicator 监听连接的网络绑定主机和端口。Debezium Oracle 连接器应该可以访问它。

OpenLogReplicator 连接器属性

使用 OpenLogReplicator 时,以下配置属性是必需的。

属性

Default (默认值)

描述

无默认值

OpenLogReplicator JSON 配置中已配置的 source.name 元素的逻辑名称。

无默认值

OpenLogReplicator 网络服务的 W. 主机名或 IP 地址。

无默认值

OpenLogReplicator 网络服务使用的端口号。

OpenLogReplicator ROWID 支持

OpenLogReplicator 不在其更改事件有效负载中提供 ROWID 伪列的详细信息。因此,在使用 OpenLogReplicator 适配器的环境中,Debezium for Oracle 连接器在源信息块中始终显示空的 row_id 值。

OpenLogReplicator XML 支持

要使用 OpenLogReplicator 的 Debezium Oracle 连接器捕获 XML 列,OpenLogReplicator 应为 1.5.0 或更高版本。

XStream 支持

Debezium Oracle 连接器默认使用本机 Oracle LogMiner 从 Oracle 摄取更改。可以将连接器切换为使用 Oracle XStream。要将连接器配置为使用 Oracle XStream,您必须应用特定的数据库和连接器配置,这些配置与您与 LogMiner 使用的配置不同。

先决条件
  • 要使用 XStream API,您必须拥有 GoldenGate 产品的许可证。无需安装 GoldenGate。

准备数据库

Oracle XStream 所需的配置
ORACLE_SID=ORCLCDB dbz_oracle sqlplus /nolog

CONNECT sys/top_secret AS SYSDBA
alter system set db_recovery_file_dest_size = 5G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
alter system set enable_goldengate_replication=true;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- Should show "Database log mode: Archive Mode"
archive log list

exit;

此外,必须为捕获的表或数据库启用补充日志记录,以便数据更改能够捕获已更改数据库行的之前状态。以下示例说明如何在特定表上配置此设置,这是最小化 Oracle 重做日志中捕获的信息量的理想选择。

ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

为连接器创建 XStream 用户

Debezium Oracle 连接器要求设置用户帐户,并具有特定权限,以便连接器可以捕获更改事件。以下示例提供有关如何在多租户数据库模型中创建用户配置的信息。

创建 XStream 管理员用户

sqlplus sys/top_secret@//:1521/ORCLCDB as sysdba
  CREATE TABLESPACE xstream_adm_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/xstream_adm_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//:1521/ORCLPDB1 as sysdba
  CREATE TABLESPACE xstream_adm_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/xstream_adm_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//:1521/ORCLCDB as sysdba
  CREATE USER c##dbzadmin IDENTIFIED BY dbz
    DEFAULT TABLESPACE xstream_adm_tbs
    QUOTA UNLIMITED ON xstream_adm_tbs
    CONTAINER=ALL;

  GRANT CREATE SESSION, SET CONTAINER TO c##dbzadmin CONTAINER=ALL;

  BEGIN
     DBMS_XSTREAM_AUTH.GRANT_ADMIN_PRIVILEGE(
        grantee                 => 'c##dbzadmin',
        privilege_type          => 'CAPTURE',
        grant_select_privileges => TRUE,
        container               => 'ALL'
     );
  END;
  /

  exit;

创建连接器的 XStream 用户

sqlplus sys/top_secret@//:1521/ORCLCDB as sysdba
  CREATE TABLESPACE xstream_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/xstream_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//:1521/ORCLPDB1 as sysdba
  CREATE TABLESPACE xstream_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/xstream_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//:1521/ORCLCDB as sysdba
  CREATE USER c##dbzuser IDENTIFIED BY dbz
    DEFAULT TABLESPACE xstream_tbs
    QUOTA UNLIMITED ON xstream_tbs
    CONTAINER=ALL;

  GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
  GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
  GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
  GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
  exit;

创建 XStream 出站服务器

创建XStream 出站服务器。(如果具有适当的权限,连接器将来可能会自动执行此操作,请参阅DBZ-721

创建 XStream 出站服务器
sqlplus c##dbzadmin/dbz@//:1521/ORCLCDB
DECLARE
  tables  DBMS_UTILITY.UNCL_ARRAY;
  schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
    tables(1)  := NULL;
    schemas(1) := 'debezium';
  DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
    server_name     =>  'dbzxout',
    table_names     =>  tables,
    schema_names    =>  schemas);
END;
/
exit;

当您设置 XStream 出站服务器以从可插拔数据库捕获更改时,请将可插拔数据库名称指定为 source_container_name 参数的值。

配置 XStream 用户帐户以连接到 XStream 出站服务器
sqlplus sys/top_secret@//:1521/ORCLCDB as sysdba
BEGIN
  DBMS_XSTREAM_ADM.ALTER_OUTBOUND(
    server_name  => 'dbzxout',
    connect_user => 'c##dbzuser');
END;
/
exit;

单个 XStream 出站服务器不能被多个 Debezium Oracle 连接器共享。每个连接器都需要配置一个唯一的 XStream 出站连接器。

配置 XStream 适配器

默认情况下,Debezium 使用 Oracle LogMiner 从 Oracle 摄取更改事件。您可以调整连接器配置以启用连接器使用 Oracle XStream 适配器。

以下配置示例添加了 database.connection.adapterdatabase.out.server.name 属性,以使连接器能够使用 XStream API 实现。

{
    "name": "inventory-connector",
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",
        "tasks.max" : "1",
        "topic.prefix" : "server1",
        "database.hostname" : "<oracle ip>",
        "database.port" : "1521",
        "database.user" : "c##dbzuser",
        "database.password" : "dbz",
        "database.dbname" : "ORCLCDB",
        "database.pdb.name" : "ORCLPDB1",
        "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory",
        "database.connection.adapter": "xstream",
        "database.out.server.name" : "dbzxout"
    }
}

获取 Oracle JDBC 驱动程序和 XStream API 文件

Debezium Oracle 连接器需要 Oracle JDBC 驱动程序 (ojdbc11.jar) 来连接到 Oracle 数据库。如果连接器使用 XStream 访问数据库,您还必须拥有 XStream API (xstreams.jar)。许可要求禁止 Debezium 在 Oracle 连接器存档中包含这些文件。但是,这些必需文件可从 Oracle Instant Client 免费下载。以下步骤说明如何下载 Oracle Instant Client 并提取必需文件。

过程
  1. 从浏览器下载适用于您操作系统的Oracle Instant Client 包

  2. 提取存档,然后打开 instantclient_<version> 目录。

    For example (例如:)

    instantclient_21_1/
    ├── adrci
    ├── BASIC_LITE_LICENSE
    ├── BASIC_LITE_README
    ├── genezi
    ├── libclntshcore.so -> libclntshcore.so.21.1
    ├── libclntshcore.so.12.1 -> libclntshcore.so.21.1
    
    ...
    
    ├── ojdbc11.jar
    ├── ucp.jar
    ├── uidrvci
    └── xstreams.jar
  3. 复制 ojdbc11.jarxstreams.jar 文件,并将它们添加到 <kafka_home>/libs 目录,例如 kafka/libs

  4. 创建环境变量 LD_LIBRARY_PATH,并将其值设置为 Instant Client 目录的路径,例如

    LD_LIBRARY_PATH=/path/to/instant_client/

XStream 连接器属性

除非有默认值,否则使用 XStream 时需要以下配置属性。

属性

Default (默认值)

描述

无默认值

在数据库中配置的 XStream 出站服务器的名称。

XStream 和 DBMS_LOB

Oracle 提供了一个名为 DBMS_LOB 的数据库包,该包包含一组用于操作 BLOB、CLOB 和 NCLOB 列的程序。这些程序大多数操作 LOB 列的整体,但有一个程序 WRITEAPPEND 能够操作 LOB 数据缓冲区的一个子集。

使用 XStream 时,WRITEAPPEND 会为每次调用程序发出一个逻辑更改记录 (LCR) 事件。这些 LCR 事件不会像使用 Oracle LogMiner 适配器时那样合并为单个更改事件。因此,主题的消费者可能会收到带有部分列值的事件。此差异行为已在DBZ-4741 中捕获,将在未来的版本中解决。

常见问题解答

是否支持 Oracle 11g?

不支持 Oracle 11g;但是,我们确实以最佳努力为基础致力于与 Oracle 11g 向后兼容。我们依靠社区在传达与 Oracle 11g 的兼容性问题以及在确定回归时提供错误修复。

Oracle LogMiner 不是已弃用吗?

否,Oracle 仅在 Oracle 12c 中弃用了 Oracle LogMiner 的连续挖掘选项,并在 Oracle 19c 开始删除了该选项。Debezium Oracle 连接器不依赖此选项来运行,因此可以安全地用于新版本的 Oracle,而不会产生任何影响。

如何更改偏移量中的位置?

Debezium Oracle 连接器在偏移量中维护两个关键值:一个名为 scn 的字段,另一个名为 commit_scnscn 字段是一个字符串,表示连接器在捕获更改时使用的低水位标记起始位置。

  1. 查找包含连接器偏移量的主题名称。这是根据 offset.storage.topic 配置属性设置的值配置的。

  2. 查找连接器的最后一个偏移量、存储它的键以及用于存储偏移量的分区。这可以使用 Kafka Broker 安装提供的 kafkacat 实用脚本来完成。例如,可能看起来像这样

    kafkacat -b localhost -C -t my_connect_offsets -f 'Partition(%p) %k %s\n'
    Partition(11) ["inventory-connector",{"server":"server1"}] {"scn":"324567897", "commit_scn":"324567897: 0x2832343233323:1"}

    inventory-connector 的键是 ["inventory-connector",{"server":"server1"}],分区是 11,最后一个偏移量是跟在键后面的内容。

  3. 要回溯到之前的偏移量,应停止连接器并执行以下命令

    echo '["inventory-connector",{"server":"server1"}]|{"scn":"3245675000","commit_scn":"324567500"}' | \
    kafkacat -P -b localhost -t my_connect_offsets -K \| -p 11

    这会将给定的键和偏移量值写入 my_connect_offsets 主题的分区 11。在此示例中,我们将连接器倒回 SCN 3245675000 而不是 324567897

如果连接器找不到具有给定偏移量 SCN 的日志怎么办?

Debezium 连接器在连接器偏移量中维护低水位标记 SCN 和高水位标记 SCN。低水位标记 SCN 代表起始位置,并且必须存在于可用的在线重做日志或归档日志中,连接器才能成功启动。当连接器报告找不到此偏移量 SCN 时,这表示仍然可用的日志不包含该 SCN,因此连接器无法从其停下的地方挖掘更改。

发生这种情况时,有两种选择。第一种是删除连接器的历史记录主题和偏移量,然后重新启动连接器,进行新的快照。这将保证任何主题使用者都不会发生数据丢失。第二种是手动操作偏移量,将 SCN 高于一个在重做日志或归档日志中可用的位置。这将导致旧 SCN 值和新提供 SCN 值之间的更改丢失且未写入主题。不推荐这样做。

各种挖掘策略之间有什么区别?

Debezium Oracle 连接器为 log.mining.strategy 提供了三个选项。

默认是 online_catalog,它指示连接器不将数据字典写入重做日志。相反,Oracle LogMiner 将始终使用包含表结构当前状态的在线数据字典。这也意味着,如果表的结构发生更改并且不再与在线数据字典匹配,Oracle LogMiner 将无法解析表或列名(如果表的结构已更改)。如果正在捕获的表经常发生模式更改,则不应使用此挖掘策略选项。所有数据更改都必须与模式更改同步,以便已从表的日志中捕获所有更改,然后停止连接器,应用模式更改,并重新启动连接器并恢复对表的已更改数据。此选项需要的 Oracle 数据库内存更少,并且 Oracle LogMiner 会话通常启动得更快,因为数据字典不需要由 LogMiner 进程加载或预填充。

第二个选项 redo_log_catalog 会在每次检测到日志切换时将 Oracle 数据字典写入重做日志。此数据字典对于 Oracle LogMiner 在解析重做日志和归档日志时有效跟踪模式更改至关重要。此选项会生成比平常更多的归档日志,但允许实时操作正在捕获的表,而不会影响捕获数据更改。此选项通常需要更多的 Oracle 数据库内存,并且在每次日志切换后,Oracle LogMiner 会话和进程启动会稍长一些。

最后一个选项 hybrid 结合了上述两种策略的优点,同时避免了它们的缺点。此策略利用 online_catalog 的性能和 redo_log_catalog 的模式跟踪的弹性,同时还避免了开销和性能成本(例如,高于正常水平的归档日志生成)。此模式使用回退模式,即如果 LogMiner 无法为数据库更改重建 SQL,Debezium 连接器将依赖连接器维护的内存模式模型来实时重建 SQL。目标是此模式最终将过渡到未来的默认模式,很可能是唯一的操作模式。

Hybrid 挖掘策略在 LogMiner 中是否存在任何限制?

是的,log.mining.strategy 的 Hybrid 模式仍然是一个正在进行中的策略,因此尚未支持所有数据类型。目前,此模式无法重建包含对 CLOBNCLOBBLOBXMLJSON 数据类型的操作的 SQL 语句。简而言之,如果您启用了 lob.enabled 并将其值设置为 true,您将无法使用 Hybrid 策略,并且连接器将无法启动,因为这种组合不受支持。

为什么连接器在 AWS 上停止捕获更改?

由于 AWS Gateway Load Balancer 的固定空闲超时时间为 350 秒,完成时间超过 350 秒的 JDBC 调用可能会无限期挂起。

在调用 Oracle LogMiner API 的时间超过 350 秒的情况下,可能会触发超时,导致 AWS Gateway Load Balancer 挂起。例如,当处理大量数据的 LogMiner 会话与 Oracle 的定期检查点任务并发运行时,可能会发生此类超时。

为防止 AWS Gateway Load Balancer 上发生超时,请从 Kafka Connect 或 Debezium Server 环境启用来自这些环境的保持连接数据包,方法是在托管连接器的环境中以 root 用户或超级用户的身份执行以下任务

  1. 从终端运行以下命令

    sysctl -w net.ipv4.tcp_keepalive_time=60
  2. 编辑 /etc/sysctl.conf 并将以下变量的值设置为如下所示

    net.ipv4.tcp_keepalive_time=60
  3. 重新配置 Debezium for Oracle 连接器,使其使用 database.url 属性而不是 database.hostname,并添加 (ENABLE=broken) Oracle 连接字符串描述符,如下例所示

    database.url=jdbc:oracle:thin:username/password!@(DESCRIPTION=(ENABLE=broken)(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(Host=hostname)(Port=port)))(CONNECT_DATA=(SERVICE_NAME=serviceName)))

前面的步骤配置 TCP 网络堆栈以每 60 秒发送一次保持连接数据包。因此,当 JDBC 调用 LogMiner API 的时间超过 350 秒时,AWS Gateway Load Balancer 不会超时,从而使连接器能够继续从数据库的事务日志中读取更改。

ORA-01555 的原因以及如何处理?

Debezium Oracle 连接器在执行初始快照阶段时使用闪回查询。闪回查询是一种特殊类型的查询,它依赖于闪回区域(由数据库的 UNDO_RETENTION 数据库参数维护)来根据给定时间(在本例中为给定 SCN)的表内容返回查询结果。默认情况下,Oracle 通常仅维护约 15 分钟的撤消或闪回区域,除非您的数据库管理员已增加或减少了此值。对于捕获大型表的配置,执行初始快照可能需要超过 15 分钟或您配置的 UNDO_RETENTION,这最终会导致此异常。

ORA-01555: snapshot too old: rollback segment number 12345 with name "_SYSSMU11_1234567890$" too small

处理此异常的第一种方法是与您的数据库管理员合作,看看他们是否可以暂时增加 UNDO_RETENTION 数据库参数。这不需要重新启动 Oracle 数据库,因此可以联机完成,而不会影响数据库的可用性。但是,更改此参数仍可能导致上述异常或“快照太旧”异常,如果表空间没有足够的空间来存储必要的撤消数据。

处理此异常的第二种方法是根本不依赖初始快照,将 snapshot.mode 设置为 no_data,然后改为依赖增量快照。增量快照不依赖于闪回查询,因此不受 ORA-01555 异常的影响。

ORA-04036 的原因以及如何处理?

当数据库更改不频繁时,Debezium Oracle 连接器可能会报告 ORA-04036 异常。启动 Oracle LogMiner 会话并重复使用它,直到检测到日志切换。之所以重复使用会话,是因为它提供了最佳的 Oracle LogMiner 性能利用率,但如果发生长时间的挖掘会话,这可能导致 PGA 内存使用过量,最终导致此类异常。

ORA-04036: PGA memory used by the instance exceeds PGA_AGGREGATE_LIMIT

可以通过指定 Oracle 重做日志切换的频率或允许 Debezium Oracle 连接器重用挖掘会话的时间来避免此异常。Debezium Oracle 连接器提供了一个配置选项 log.mining.session.max.ms,该选项控制当前 Oracle LogMiner 会话可以重用多久,然后再关闭并启动新会话。这使得数据库资源可以保持在数据库允许的 PGA 内存限制内。

ORA-01882 的原因以及如何处理?

当连接到 Oracle 数据库时,Debezium Oracle 连接器可能会报告以下异常

ORA-01882: timezone region not found

当 JDBC 驱动程序无法正确解析时区信息时,会发生这种情况。为了解决此驱动程序相关问题,需要告知驱动程序不要使用区域解析时区详细信息。可以通过使用 driver.oracle.jdbc.timezoneAsRegion=false 指定驱动程序直通属性来完成。

ORA-25191 的原因以及如何处理?

Debezium Oracle 连接器会自动忽略索引组织表 (IOT),因为 Oracle LogMiner 不支持它们。但是,如果抛出 ORA-25191 异常,这可能是由于此类映射的独特边界情况,并且可能需要额外的规则来自动排除它们。ORA-25191 异常的示例可能看起来像这样

ORA-25191: cannot reference overflow table of an index-organized table

如果抛出 ORA-25191 异常,请提交 Jira 问题,提供有关表及其映射、与其他父表的关联等详细信息。作为一种变通方法,可以调整 include/exclude 配置选项以阻止连接器访问这些表。

如何解决 SAX 功能 external-general-entities not supported

Debezium 2.4 引入了对 Oracle XMLTYPE 列类型的支持。要使用此功能,需要 Oracle xdbxmlparserv2 依赖项。

Oracle 的 xmlparserv2 依赖项实现了基于 SAX 的解析器,如果运行时找到并使用此实现而不是类路径上的其他实现,则会发生此错误。为了具体影响使用哪个 SAX 实现,通常需要使用特定参数启动 JVM。

当提供以下 JVM 参数时,Oracle 连接器将在此错误下成功启动。

-Djavax.xml.parsers.SAXParserFactory=com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl
ORA-00600 的原因以及如何处理?

ORA-00600 是一个通用内部错误。它表示相关进程遇到了低级别意外情况 - 这通常意味着您遇到了一个错误。有关更多信息,请参阅此Oracle 博客

ORA-00600 附带多个参数,其中第一个参数是最重要的。根据第一个参数,有一个假设的解决方案需要应用

ORA-00600: internal error code, arguments: [flg0], ...

避免此问题的解决方案是在数据库级别启用补充日志记录。

ORA-00600: internal error code, arguments: [kdli_iread], ...

可以通过将 Oracle 数据库升级到 19.25 并应用补丁 32209850 来避免此异常。

Debezium 2.7 在 decimal.handling.mode 中引入了重大更改,是否有方法可以使用旧行为?

是的,可以通过将连接器配置属性 legacy.decimal.handling.strategy 设置为 true 来启用旧行为。将其设置为 true 后,连接器将不再将所有零精度数视为 VariableScaleDecimal 类型,并会根据列的大小继续将其作为 INT8、INT16、INT32 和 INT64 发出。