Debezium Connector for Oracle

目录

概述

Debezium 的 Oracle 连接器捕获并记录 Oracle 服务器数据库中发生的行级更改,包括连接器运行时添加的表。您可以配置连接器为特定模式和表的子集发出更改事件,或者忽略、屏蔽或截断特定列中的值。

有关此连接器兼容的 Oracle 数据库版本的信息,请参阅 Debezium 版本概述

Debezium 可以通过使用本地 LogMiner 数据库包、XStream APIOpenLogReplicator 来摄取 Oracle 的更改事件。

Oracle 连接器的工作原理

要最佳地配置和运行 Debezium Oracle 连接器,了解连接器的工作原理会很有帮助。

适配器模式

Debezium Oracle 连接器支持多种适配器来从 Oracle 数据库事务日志中捕获更改。您可以通过设置 database.connection.adapter 配置属性来配置连接器使用特定的适配器。

连接器支持以下适配器

LogMiner

默认情况下,Debezium Oracle 连接器使用本地 Oracle LogMiner API 来读取和流式传输数据库事务日志中的更改。您可以配置 Debezium 使用以下两种不同的 LogMiner 模式之一:

未提交更改模式

在未提交更改模式下,Debezium Oracle 连接器会接收已提交和进行中的未提交事务的更改事件的连续流。由于 Oracle 会交错来自多个事务的操作,因此连接器必须缓冲事件,直到检测到相应的提交或回滚。

由于这是连接器的默认模式,因此分配足够的内存给连接器以处理所有捕获的表的最大事务和最高并发级别非常重要。

未提交更改模式通过将大部分处理委托给连接器来最小化源数据库的负载。Oracle LogMiner 仅负责从磁盘读取重做日志和归档日志,并流式传输原始更改。

通过将 database.connection.adapter 属性设置为 logminer(默认值)来启用此模式。

已提交更改模式

在已提交更改模式下,Debezium Oracle 连接器指示 Oracle LogMiner 仅流式传输已提交的更改。由于连接器仅接收已提交的事件,因此它可以立即将它们转发到目标主题,而无需维护内部事务缓冲区。

此模式需要显著减少连接器的内存,但会将更大的处理和内存负担转移到源数据库。对于大型事务,这很容易超出数据库的可用 PGA 内存,可能导致 PGA_AGGREGATE_LIMIT 错误。

仅当您可以保证所有事务都适合数据库的 PGA 内存空间时,才推荐使用此模式。否则,请使用未提交更改模式。

通过将 database.connection.adapter 属性设置为 logminer_unbuffered 来启用此模式。

此功能目前处于 **孵化** 阶段,未来版本可能会发生变化。

OpenLogReplicator

您可以配置 Debezium Oracle 连接器使用 OpenLogReplicator,这是一个开源应用程序,可以直接从重做日志和归档日志中读取 Oracle 更改,对数据库的影响极小。

通过将 database.connection.adapter 属性设置为 olr 来启用 OpenLogReplicator。有关更多信息,请参阅 OpenLogReplicator 支持 部分。

Oracle XStream

Debezium Oracle 连接器还可以使用 Oracle XStream,这是 Oracle GoldenGate 的商业组件。

通过将 database.connection.adapter 属性设置为 xstream 来启用 Oracle XStream。有关更多详细信息,请参阅 XStream 部分。

快照

通常,Oracle 服务器上的重做日志配置为不保留数据库的完整历史记录。因此,Debezium Oracle 连接器无法从日志中检索数据库的整个历史记录。为了使连接器能够为数据库的当前状态建立基线,连接器首次启动时,会执行初始的一致性快照数据库。

如果完成初始快照所需的时间超过了为数据库设置的 UNDO_RETENTION 时间(默认为 15 分钟),则可能发生 ORA-01555 异常。有关错误的更多信息,以及您可以采取的恢复步骤,请参阅 常见问题解答

在表的快照过程中,Oracle 有可能引发 ORA-01466 异常。当用户修改表架构或添加、更改或删除与正在快照的表相关的索引或对象时,就会发生这种情况。如果发生这种情况,连接器将停止,并且需要从头开始进行初始快照。

要解决此问题,您可以将 snapshot.database.errors.max.retries 属性配置为大于 0 的值,以便特定表的快照将重新启动。虽然整个快照不会从头开始重试,但有问题的特定表将从头开始重新读取,并且该表的 topic 将包含重复的快照事件。

Oracle 连接器执行初始快照的默认工作流

以下工作流列出了 Debezium 创建快照的步骤。这些步骤描述了当 snapshot.mode 配置属性设置为其默认值(即 initial)时的快照过程。您可以通过更改 snapshot.mode 属性的值来定制连接器创建快照的方式。如果您配置了不同的快照模式,连接器将通过使用此工作流的修改版本来完成快照。

当快照模式设置为默认值时,连接器将完成以下任务以创建快照:

  1. Establish a connection to the database. (建立数据库连接。)

  2. 确定要捕获的表。默认情况下,连接器捕获所有表,但排除那些 模式被排除捕获的表。快照完成后,连接器将继续流式传输指定表的数据。如果您希望连接器仅捕获特定表的数据,可以通过设置 table.include.listtable.exclude.list 等属性来指定仅捕获表或表元素的子集的数据。

  3. 在每个捕获的表上获取 ROW SHARE MODE 锁,以防止在创建快照期间发生结构性更改。Debezium 仅在短时间内持有锁。

  4. 从服务器的重做日志中读取当前系统更改号 (SCN) 位置。

  5. 捕获所有数据库表的结构,或所有被指定用于捕获的表的结构。连接器将其模式信息持久化到其内部数据库模式历史记录主题中。模式历史记录提供了在发生更改事件时生效的结构信息。

    默认情况下,连接器捕获处于捕获模式的数据库中每个表的模式,包括未配置为捕获的表。如果表未配置为捕获,初始快照仅捕获其结构;不捕获任何表数据。有关为什么快照会为未包含在初始快照中的表持久化模式信息,请参阅 理解为什么初始快照会捕获所有表的模式历史记录

  6. 释放步骤 3 中获取的锁。其他数据库客户端现在可以写入任何以前锁定的表。

  7. 在步骤 4 中读取的 SCN 位置,连接器扫描指定用于捕获的表(SELECT * FROM …​ AS OF SCN 123)。在扫描过程中,连接器将完成以下任务:

    1. Confirms that the table was created before the snapshot began. If the table was created after the snapshot began, the connector skips the table. After the snapshot is complete, and the connector transitions to streaming, it emits change events for any tables that were created after the snapshot began. (确认表在快照开始之前已创建。如果表在快照开始之后创建,连接器将跳过该表。快照完成后,连接器过渡到流式传输,它会为快照开始后创建的任何表发出变更事件。)

    2. 为从表中捕获的每一行生成一个 read 事件。所有 read 事件都包含相同的 SCN 位置,即步骤 4 中获取的 SCN 位置。

    3. Emits each read event to the Kafka topic for the source table. (将每个 read 事件发送到源表的 Kafka 主题。)

    4. Releases data table locks, if applicable. (释放数据表锁(如果适用)。)

  8. Record the successful completion of the snapshot in the connector offsets. (在连接器偏移量中记录快照的成功完成。)

The resulting initial snapshot captures the current state of each row in the captured tables. From this baseline state, the connector captures subsequent changes as they occur. (生成的初始快照捕获了捕获表中每一行的当前状态。从这个基线状态开始,连接器会捕获后续发生的更改。)

快照过程开始后,如果由于连接器故障、重新平衡或其他原因导致过程中断,则在连接器重新启动后,该过程将重新启动。在连接器完成初始快照后,它将从步骤 3 中读取的位置继续流式传输,这样就不会错过任何更新。如果连接器因任何原因再次停止,在重新启动后,它将从之前停止的地方继续流式传输更改。

表 1. snapshot.mode 连接器配置属性的设置
Setting (设置) 描述

always (始终)

在每次连接器启动时执行快照。快照完成后,连接器将开始流式传输后续数据库更改的事件记录。

initial (初始)

连接器执行数据库快照,如 执行初始快照的默认工作流 中所述。快照完成后,连接器将开始流式传输后续数据库更改的事件记录。

initial_only (仅初始)

连接器执行数据库快照,并在流式传输任何更改事件记录之前停止,不允许捕获任何后续更改事件。

schema_only (仅模式)

Deprecated, see no_data. (已弃用,请参阅 no_data。)

no_data (无数据)

连接器捕获所有相关表的结构,执行 默认快照工作流 中描述的所有步骤,但它不会创建 READ 事件来表示连接器启动时的数据集(步骤 6)。

schema_only_recovery (仅模式恢复)

Deprecated, see recovery. (已弃用,请参阅 recovery。)

recovery (恢复)

Set this option to restore a database schema history topic that is lost or corrupted. After a restart, the connector runs a snapshot that rebuilds the topic from the source tables. You can also set the property to periodically prune a database schema history topic that experiences unexpected growth. (将此选项设置为恢复已丢失或损坏的数据库模式历史主题。重新启动后,连接器将运行一个快照,从源表重建该主题。您还可以将此属性设置为定期修剪经历意外增长的数据库模式历史主题。)

WARNING: Do not use this mode to perform a snapshot if schema changes were committed to the database after the last connector shutdown. (警告:如果在上次连接器关闭后模式更改已提交到数据库,请不要使用此模式执行快照。)

when_needed (需要时)

After the connector starts, it performs a snapshot only if it detects one of the following circumstances (连接器启动后,仅当检测到以下任一情况时,它才会执行快照:)

  • It cannot detect any topic offsets. (无法检测到任何主题偏移量。)

  • A previously recorded offset specifies a log position that is not available on the server. (先前记录的偏移量指定了一个服务器上不可用的日志位置。)

configuration_based (基于配置)

Set the snapshot mode to configuration_based to control snapshot behavior through the set of connector properties that have the prefix 'snapshot.mode.configuration.based'. (将快照模式设置为 configuration_based,通过具有前缀“snapshot.mode.configuration.based”的连接器属性集来控制快照行为。)

custom (自定义)

custom snapshot 模式允许您注入自己对 io.debezium.spi.snapshot.Snapshotter 接口的实现。将 snapshot.mode.custom.name 配置属性设置为您的实现 `name()` 方法提供的名称。该名称在 Kafka Connect 群集的类路径中指定。如果您使用 Debezium DebeziumEngine,则名称包含在连接器 JAR 文件中。有关更多信息,请参阅 自定义 snapshot 程序 SPI

有关更多信息,请参阅连接器配置属性表中的 snapshot.mode

了解为什么初始快照会捕获所有表的模式历史记录

The initial snapshot that a connector runs captures two types of information (连接器运行的初始快照捕获两种类型的信息:)

Table data (表数据)

有关连接器 table.include.list 属性中命名的表中 INSERTUPDATEDELETE 操作的信息。

Schema data (模式数据)

DDL statements that describe the structural changes that are applied to tables. Schema data is persisted to both the internal schema history topic, and to the connector’s schema change topic, if one is configured. (描述应用于表的结构化更改的 DDL 语句。模式数据将持久化到内部模式历史主题,以及连接器的模式变更主题(如果已配置)。)

After you run an initial snapshot, you might notice that the snapshot captures schema information for tables that are not designated for capture. By default, initial snapshots are designed to capture schema information for every table that is present in the database, not only from tables that are designated for capture. Connectors require that the table’s schema is present in the schema history topic before they can capture a table. By enabling the initial snapshot to capture schema data for tables that are not part of the original capture set, Debezium prepares the connector to readily capture event data from these tables should that later become necessary. If the initial snapshot does not capture a table’s schema, you must add the schema to the history topic before the connector can capture data from the table. (运行初始快照后,您可能会注意到快照捕获了未指定捕获的表的模式信息。默认情况下,初始快照旨在捕获数据库中存在的所有表的模式信息,而不仅仅是指定捕获的表。连接器要求表的模式存在于模式历史主题中,然后才能捕获表。通过允许初始快照捕获非原始捕获集一部分的表的模式数据,Debezium 会为连接器做好准备,以便在将来需要时能够轻松地从这些表中捕获事件数据。如果初始快照未捕获表的模式,您必须在连接器能够从表中捕获数据之前将模式添加到历史主题。)

In some cases, you might want to limit schema capture in the initial snapshot. This can be useful when you want to reduce the time required to complete a snapshot. Or when Debezium connects to the database instance through a user account that has access to multiple logical databases, but you want the connector to capture changes only from tables in a specific logic database. (在某些情况下,您可能希望在初始快照中限制模式捕获。当您想减少完成快照所需的时间时,这可能很有用。或者,当 Debezium 通过具有访问多个逻辑数据库的用户帐户连接到数据库实例时,而您只想让连接器捕获特定逻辑数据库中表的更改。)

从初始快照未捕获的表中捕获数据(无模式更改)

In some cases, you might want the connector to capture data from a table whose schema was not captured by the initial snapshot. Depending on the connector configuration, the initial snapshot might capture the table schema only for specific tables in the database. If the table schema is not present in the history topic, the connector fails to capture the table, and reports a missing schema error. (在某些情况下,您可能希望连接器捕获模式未被初始快照捕获的表中的数据。根据连接器配置,初始快照可能仅捕获数据库中特定表的模式。如果表模式不存在于历史主题中,连接器将无法捕获该表,并报告模式丢失错误。)

You might still be able to capture data from the table, but you must perform additional steps to add the table schema. (您仍然可以从该表中捕获数据,但必须执行额外步骤来添加表模式。)

先决条件
  • You want to capture data from a table with a schema that the connector did not capture during the initial snapshot. (您想从一个连接器在初始快照期间未捕获其模式的表中捕获数据。)

  • 事务日志中表的每个条目都使用相同的模式。有关从新表捕获数据(已发生结构更改)的信息,请参阅 从具有模式更改的新表中捕获数据

过程
  1. Stop the connector. (停止连接器。)

  2. 删除由 schema.history.internal.kafka.topic 属性指定的内部数据库模式历史记录主题。

  3. 在连接器配置中

    1. snapshot.mode 设置为 recovery

    2. (可选)将 schema.history.internal.store.only.captured.tables.ddl 的值设置为 false,以确保将来连接器可以轻松捕获当前未指定捕获的表的数据。连接器只能从表中捕获数据,前提是表的模式历史记录存在于历史记录主题中。

    3. 将要捕获的表添加到 table.include.list

  4. Restart the connector. The snapshot recovery process rebuilds the schema history based on the current structure of the tables. (重新启动连接器。快照恢复过程将根据表的当前结构重建模式历史记录。)

  5. (可选)快照完成后,对新添加的表发起 增量快照。增量快照首先流式传输新表的历史数据,然后恢复从重做日志和归档日志中读取先前配置的表的更改,包括连接器离线期间发生的更改。

  6. (Optional) Reset the snapshot.mode back to no_data to prevent the connector from initiating recovery after a future restart. ((可选)将 snapshot.mode 重置回 no_data,以防止连接器在将来的重启后启动恢复。)

从初始快照未捕获的表中捕获数据(模式更改)

If a schema change is applied to a table, records that are committed before the schema change have different structures than those that were committed after the change. When Debezium captures data from a table, it reads the schema history to ensure that it applies the correct schema to each event. If the schema is not present in the schema history topic, the connector is unable to capture the table, and an error results. (如果模式更改应用于某个表,则在模式更改之前提交的记录与更改之后提交的记录具有不同的结构。当 Debezium 从表中捕获数据时,它会读取模式历史记录以确保它将正确的模式应用于每个事件。如果模式不存在于模式历史主题中,连接器将无法捕获该表,并导致错误。)

If you want to capture data from a table that was not captured by the initial snapshot, and the schema of the table was modified, you must add the schema to the history topic, if it is not already available. You can add the schema by running a new schema snapshot, or by running an initial snapshot for the table. (如果您想从初始快照未捕获的表中捕获数据,并且该表的模式已修改,则必须将模式添加到历史主题(如果尚未提供)。您可以通过运行新的模式快照或为该表运行初始快照来添加模式。)

先决条件
  • You want to capture data from a table with a schema that the connector did not capture during the initial snapshot. (您想从一个连接器在初始快照期间未捕获其模式的表中捕获数据。)

  • A schema change was applied to the table so that the records to be captured do not have a uniform structure. (已对表应用了模式更改,以便要捕获的记录没有统一的结构。)

过程
Initial snapshot captured the schema for all tables (store.only.captured.tables.ddl was set to false) (初始快照捕获了所有表的模式(store.only.captured.tables.ddl 设置为 false))
  1. 编辑 table.include.list 属性以指定要捕获的表。

  2. Restart the connector. (重新启动连接器。)

  3. 发起 增量快照 以捕获新添加表中的现有数据。

Initial snapshot did not capture the schema for all tables (store.only.captured.tables.ddl was set to true) (初始快照未捕获所有表的模式(store.only.captured.tables.ddl 设置为 true))

If the initial snapshot did not save the schema of the table that you want to capture, complete one of the following procedures (如果初始快照未保存您要捕获的表的模式,请完成以下任一程序:)

Procedure 1: Schema snapshot, followed by incremental snapshot (过程 1:模式快照,然后是增量快照)

In this procedure, the connector first performs a schema snapshot. You can then initiate an incremental snapshot to enable the connector to synchronize data. (在此过程中,连接器首先执行模式快照。然后,您可以启动增量快照以使连接器能够同步数据。)

  1. Stop the connector. (停止连接器。)

  2. 删除由 schema.history.internal.kafka.topic 属性指定的内部数据库模式历史记录主题。

  3. Clear the offsets in the configured Kafka Connect offset.storage.topic. For more information about how to remove offsets, see the Debezium community FAQ. (清除配置的 Kafka Connect offset.storage.topic 中的偏移量。有关如何删除偏移量的更多信息,请参阅Debezium 社区 FAQ。)

    Removing offsets should be performed only by advanced users who have experience in manipulating internal Kafka Connect data. This operation is potentially destructive, and should be performed only as a last resort. (仅应由有经验处理内部 Kafka Connect 数据的用户执行移除偏移量的操作。此操作可能具有破坏性,应仅作为最后的手段执行。)

  4. Set values for properties in the connector configuration as described in the following steps (按以下步骤为连接器配置中的属性设置值:)

    1. snapshot.mode 属性的值设置为 no_data

    2. 编辑 table.include.list 以添加要捕获的表。

  5. Restart the connector. (重新启动连接器。)

  6. Wait for Debezium to capture the schema of the new and existing tables. Data changes that occurred any tables after the connector stopped are not captured. (等待 Debezium 捕获新表和现有表的模式。连接器停止后在任何表上发生的数据更改都不会被捕获。)

  7. 为确保不丢失任何数据,请发起 增量快照

Procedure 2: Initial snapshot, followed by optional incremental snapshot (过程 2:初始快照,然后是可选的增量快照)

In this procedure the connector performs a full initial snapshot of the database. As with any initial snapshot, in a database with many large tables, running an initial snapshot can be a time-consuming operation. After the snapshot completes, you can optionally trigger an incremental snapshot to capture any changes that occur while the connector is off-line. (在此过程中,连接器将执行数据库的完整初始快照。与任何初始快照一样,在具有许多大表的数据库中,运行初始快照可能是一项耗时的操作。快照完成后,您可以选择触发增量快照以捕获连接器离线期间发生的任何更改。)

  1. Stop the connector. (停止连接器。)

  2. 删除由 schema.history.internal.kafka.topic 属性指定的内部数据库模式历史记录主题。

  3. Clear the offsets in the configured Kafka Connect offset.storage.topic. For more information about how to remove offsets, see the Debezium community FAQ. (清除配置的 Kafka Connect offset.storage.topic 中的偏移量。有关如何删除偏移量的更多信息,请参阅Debezium 社区 FAQ。)

    Removing offsets should be performed only by advanced users who have experience in manipulating internal Kafka Connect data. This operation is potentially destructive, and should be performed only as a last resort. (仅应由有经验处理内部 Kafka Connect 数据的用户执行移除偏移量的操作。此操作可能具有破坏性,应仅作为最后的手段执行。)

  4. 编辑 table.include.list 以添加要捕获的表。

  5. Set values for properties in the connector configuration as described in the following steps (按以下步骤为连接器配置中的属性设置值:)

    1. snapshot.mode 属性的值设置为 initial

    2. (可选)将 schema.history.internal.store.only.captured.tables.ddl 设置为 false

  6. Restart the connector. The connector takes a full database snapshot. After the snapshot completes, the connector transitions to streaming. (重新启动连接器。连接器将进行完整的数据库快照。快照完成后,连接器将过渡到流式传输。)

  7. (可选)要捕获连接器离线期间发生的任何数据更改,请发起 增量快照

临时快照

By default, a connector runs an initial snapshot operation only after it starts for the first time. Following this initial snapshot, under normal circumstances, the connector does not repeat the snapshot process. Any future change event data that the connector captures comes in through the streaming process only. (默认情况下,连接器仅在首次启动后运行初始快照操作。在初始快照之后,在正常情况下,连接器不会重复快照过程。连接器捕获的任何未来变更事件数据仅通过流式传输过程进入。)

However, in some situations the data that the connector obtained during the initial snapshot might become stale, lost, or incomplete. To provide a mechanism for recapturing table data, Debezium includes an option to perform ad hoc snapshots. You might want to perform an ad hoc snapshot after any of the following changes occur in your Debezium environment (但是,在某些情况下,连接器在初始快照期间获取的数据可能会过时、丢失或不完整。为了提供重新捕获表数据的机制,Debezium 提供了一个执行即席快照的选项。您可能希望在 Debezium 环境中发生以下任何更改后执行即席快照:)

  • The connector configuration is modified to capture a different set of tables. (修改了连接器配置以捕获不同的表集。)

  • Kafka topics are deleted and must be rebuilt. (Kafka 主题被删除且必须重建。)

  • Data corruption occurs due to a configuration error or some other problem. (由于配置错误或其他问题导致数据损坏。)

You can re-run a snapshot for a table for which you previously captured a snapshot by initiating a so-called ad hoc snapshot. Ad hoc snapshots require the use of signaling tables. You initiate an ad hoc snapshot by sending a signal request to the Debezium signaling table. (您可以通过启动所谓的即席快照来重新运行之前捕获了快照的表的快照。即席快照需要使用信号表。通过向 Debezium 信号表发送信号请求来启动即席快照。)

When you initiate an ad hoc snapshot of an existing table, the connector appends content to the topic that already exists for the table. If a previously existing topic was removed, Debezium can create a topic automatically if automatic topic creation is enabled. (当您启动现有表的即席快照时,连接器会将内容追加到该表已存在的主题中。如果之前存在的主题已被删除,并且启用了自动主题创建,则 Debezium 可以自动创建主题。)

Ad hoc snapshot signals specify the tables to include in the snapshot. The snapshot can capture the entire contents of the database, or capture only a subset of the tables in the database. Also, the snapshot can capture a subset of the contents of the table(s) in the database. (即席快照信号指定要包含在快照中的表。快照可以捕获数据库的全部内容,或仅捕获数据库中表的部分内容。此外,快照还可以捕获数据库中表的部分内容。)

You specify the tables to capture by sending an execute-snapshot message to the signaling table. Set the type of the execute-snapshot signal to incremental or blocking, and provide the names of the tables to include in the snapshot, as described in the following table (您可以通过向信号表发送 execute-snapshot 消息来指定要捕获的表。将 execute-snapshot 信号的类型设置为 incrementalblocking,并提供要在快照中包含的表名称,如下表所示:)

表 2. execute-snapshot 信号记录的示例
Field (字段) Default (默认值) Value (值)

type

incremental (增量)

Specifies the type of snapshot that you want to run. (指定您要运行的快照类型。)
Currently, you can request incremental or blocking snapshots. (目前,您可以请求 incrementalblocking 快照。)

data-collections (数据集合)

N/A

An array that contains regular expressions matching the fully-qualified names of the tables to include in the snapshot. (一个包含正则表达式的数组,匹配要包含在快照中的表的完全限定名称。)
对于 Oracle 连接器,请使用以下格式指定表的完全限定名:database.schema.table

additional-conditions (附加条件)

N/A

An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. (一个可选数组,指定连接器用于确定要包含在快照中的记录子集的附加条件集。)
Each additional condition is an object that specifies the criteria for filtering the data that an ad hoc snapshot captures. You can set the following parameters for each additional condition (每个附加条件是一个对象,指定即席快照捕获的数据的过滤标准。您可以为每个附加条件设置以下参数:)

data-collection (数据集合)

The fully-qualified name of the table that the filter applies to. You can apply different filters to each table. (过滤器适用的表的完全限定名称。您可以为每个表应用不同的过滤器。)

filter (过滤器)

Specifies column values that must be present in a database record for the snapshot to include it, for example, "color='blue'". (指定数据库记录中必须存在的列值,快照才能包含它,例如 "color='blue'"。)

The values that you assign to the filter parameter are the same types of values that you might specify in the WHERE clause of SELECT statements when you set the snapshot.select.statement.overrides property for a blocking snapshot. (您赋给 filter 参数的值与您在为阻塞快照设置 snapshot.select.statement.overrides 属性时可能在 SELECT 语句的 WHERE 子句中指定的类型的值相同。)

surrogate-key (代理键)

N/A

An optional string that specifies the column name that the connector uses as the primary key of a table during the snapshot process. (一个可选字符串,指定连接器在快照过程中用作表主键的列名。)

Triggering an ad hoc incremental snapshot (触发即席增量快照)

通过向信号表添加具有 execute-snapshot 信号类型的条目,或 将信号消息发送到 Kafka 信号主题 来发起临时增量快照。连接器处理消息后,将开始快照操作。快照过程读取第一个和最后一个主键值,并将这些值作为每个表的起始和结束点。根据表中的条目数和配置的块大小,Debezium 将表分成块,然后逐个快照每个块。

有关更多信息,请参阅 增量快照

Triggering an ad hoc blocking snapshot (触发即席阻塞快照)

You initiate an ad hoc blocking snapshot by adding an entry with the execute-snapshot signal type to the signaling table or signaling topic. After the connector processes the message, it begins the snapshot operation. The connector temporarily stops streaming, and then initiates a snapshot of the specified table, following the same process that it uses during an initial snapshot. After the snapshot completes, the connector resumes streaming. (您可以通过向信号表或信号主题添加具有 execute-snapshot 信号类型的条目来启动即席阻塞快照。在连接器处理消息后,它将开始快照操作。连接器将暂时停止流式传输,然后启动指定表的快照,遵循其在初始快照期间使用的相同过程。快照完成后,连接器将恢复流式传输。)

有关更多信息,请参阅 阻塞快照

增量快照

To provide flexibility in managing snapshots, Debezium includes a supplementary snapshot mechanism, known as incremental snapshotting. Incremental snapshots rely on the Debezium mechanism for sending signals to a Debezium connector. Incremental snapshots are based on the DDD-3 design document. (为了在管理快照时提供灵活性,Debezium 提供了一个补充快照机制,称为增量快照。增量快照依赖于 Debezium 向 Debezium 连接器发送信号的机制。增量快照基于DDD-3 设计文档。)

在增量快照中,Debezium 不像初始快照那样一次性捕获数据库的完整状态,而是分阶段、分可配置的块来捕获每个表。您可以指定要快照捕获的表以及 每个块的大小。块大小决定了快照在数据库上每次提取操作期间收集的行数。增量快照的默认块大小为 1024 行。

As an incremental snapshot proceeds, Debezium uses watermarks to track its progress, maintaining a record of each table row that it captures. This phased approach to capturing data provides the following advantages over the standard initial snapshot process (随着增量快照的进行,Debezium 使用水位标记来跟踪其进度,并维护捕获的每个表行的记录。与标准的初始快照过程相比,这种分阶段的数据捕获方法提供了以下优势:)

  • You can run incremental snapshots in parallel with streamed data capture, instead of postponing streaming until the snapshot completes. The connector continues to capture near real-time events from the change log throughout the snapshot process, and neither operation blocks the other. (您可以与流式数据捕获并行运行增量快照,而不是将流式传输推迟到快照完成。在整个快照过程中,连接器会继续从变更日志中捕获近乎实时事件,并且任一操作都不会阻塞另一个操作。)

  • If the progress of an incremental snapshot is interrupted, you can resume it without losing any data. After the process resumes, the snapshot begins at the point where it stopped, rather than recapturing the table from the beginning. (如果增量快照的进度被中断,您可以恢复它而不会丢失任何数据。在过程恢复后,快照将从停止的点开始,而不是从头重新捕获表。)

  • 您可以随时按需运行增量快照,并根据需要重复该过程以适应数据库更新。例如,在修改连接器配置以在 table.include.list 属性中添加表后,您可能需要重新运行快照。

Incremental snapshot process (增量快照过程)

运行增量快照时,Debezium 会按主键对每个表进行排序,然后根据 配置的块大小 将表分成块。逐块工作,它捕获块中的每一行。对于捕获的每一行,快照都会发出一个 READ 事件。该事件表示快照开始时行的值。

As a snapshot proceeds, it’s likely that other processes continue to access the database, potentially modifying table records. To reflect such changes, INSERT, UPDATE, or DELETE operations are committed to the transaction log as per usual. Similarly, the ongoing Debezium streaming process continues to detect these change events and emits corresponding change event records to Kafka. (随着快照的进行,其他进程很可能会继续访问数据库,可能修改表记录。为了反映这些更改,INSERTUPDATEDELETE 操作照常提交到事务日志。同样,正在进行的 Debezium 流式传输过程会继续检测这些变更事件,并将相应的变更事件记录发送到 Kafka。)

How Debezium resolves collisions among records with the same primary key (Debezium 如何解决具有相同主键的记录之间的冲突)

In some cases, the UPDATE or DELETE events that the streaming process emits are received out of sequence. That is, the streaming process might emit an event that modifies a table row before the snapshot captures the chunk that contains the READ event for that row. When the snapshot eventually emits the corresponding READ event for the row, its value is already superseded. To ensure that incremental snapshot events that arrive out of sequence are processed in the correct logical order, Debezium employs a buffering scheme for resolving collisions. Only after collisions between the snapshot events and the streamed events are resolved does Debezium emit an event record to Kafka. (在某些情况下,流式传输过程发出的 UPDATEDELETE 事件可能会乱序接收。也就是说,流式传输过程可能会在快照捕获包含该行 READ 事件的块之前发出修改表行的事件。当快照最终发出行的相应 READ 事件时,其值已过时。为了确保乱序到达的增量快照事件按正确的逻辑顺序处理,Debezium 采用缓冲方案来解决冲突。仅当快照事件与流式事件之间的冲突得到解决后,Debezium 才会向 Kafka 发送事件记录。)

Snapshot window (快照窗口)

To assist in resolving collisions between late-arriving READ events and streamed events that modify the same table row, Debezium employs a so-called snapshot window. The snapshot window demarcates the interval during which an incremental snapshot captures data for a specified table chunk. Before the snapshot window for a chunk opens, Debezium follows its usual behavior and emits events from the transaction log directly downstream to the target Kafka topic. But from the moment that the snapshot for a particular chunk opens, until it closes, Debezium performs a de-duplication step to resolve collisions between events that have the same primary key. (为了帮助解决延迟到达的 READ 事件与修改同一表行的流式事件之间的冲突,Debezium 采用所谓的快照窗口。快照窗口划定了增量快照捕获指定表块数据的间隔。在某个块的快照窗口打开之前,Debezium 会遵循其常规行为,将事务日志中的事件直接向下游发送到目标 Kafka 主题。但在某个块的快照打开到关闭的整个过程中,Debezium 会执行去重步骤以解决具有相同主键的事件之间的冲突。)

For each data collection, the Debezium emits two types of events, and stores the records for them both in a single destination Kafka topic. The snapshot records that it captures directly from a table are emitted as READ operations. Meanwhile, as users continue to update records in the data collection, and the transaction log is updated to reflect each commit, Debezium emits UPDATE or DELETE operations for each change. (对于每个数据集合,Debezium 会发出两种类型的事件,并将两者的记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为 READ 操作发出。同时,随着用户继续更新数据集合中的记录,并且事务日志更新以反映每次提交,Debezium 会为每个更改发出 UPDATEDELETE 操作。)

As the snapshot window opens, and Debezium begins processing a snapshot chunk, it delivers snapshot records to a memory buffer. During the snapshot windows, the primary keys of the READ events in the buffer are compared to the primary keys of the incoming streamed events. If no match is found, the streamed event record is sent directly to Kafka. If Debezium detects a match, it discards the buffered READ event, and writes the streamed record to the destination topic, because the streamed event logically supersede the static snapshot event. After the snapshot window for the chunk closes, the buffer contains only READ events for which no related transaction log events exist. Debezium emits these remaining READ events to the table’s Kafka topic. (随着快照窗口的打开,Debezium 开始处理快照块,它会将快照记录传递到内存缓冲区。在快照窗口期间,缓冲区中 READ 事件的主键与传入的流式事件的主键进行比较。如果未找到匹配项,则将流式事件记录直接发送到 Kafka。如果 Debezium 检测到匹配,它将丢弃已缓冲的 READ 事件,并将流式记录写入目标主题,因为流式事件在逻辑上取代了静态快照事件。当块的快照窗口关闭后,缓冲区将仅包含没有相关事务日志事件的 READ 事件。Debezium 将这些剩余的 READ 事件发送到表的 Kafka 主题。)

The connector repeats the process for each snapshot chunk. (连接器对每个快照块重复此过程。)

To enable Debezium to perform incremental snapshots, you must grant the connector permission to write to the signaling table. (要使 Debezium 能够执行增量快照,您必须授予连接器写入信号表的权限。)

Write permission is unnecessary only for connectors that can be configured to perform read-only incrementals snapshots (MariaDB, MySQL, or PostgreSQL). (仅对于可以配置为执行只读增量快照的连接器(MariaDBMySQLPostgreSQL)来说,写入权限是可选的。)

Currently, you can use either of the following methods to initiate an incremental snapshot (当前,您可以使用以下任一方法启动增量快照:)

Debezium Oracle 连接器不支持在增量快照运行时进行模式更改。

触发增量快照

To initiate an incremental snapshot, you can send an ad hoc snapshot signal to the signaling table on the source database. You submit snapshot signals as SQL INSERT queries. (要启动增量快照,您可以将即席快照信号发送到源数据库上的信号表。您将快照信号作为 SQL INSERT 查询提交。)

After Debezium detects the change in the signaling table, it reads the signal, and runs the requested snapshot operation. (在 Debezium 检测到信号表中的更改后,它会读取信号并运行请求的快照操作。)

The query that you submit specifies the tables to include in the snapshot, and, optionally, specifies the type of snapshot operation. Debezium currently supports the incremental and blocking snapshot types. (您提交的查询指定了要包含在快照中的表,并可选地指定了快照操作的类型。Debezium 目前支持 incrementalblocking 快照类型。)

To specify the tables to include in the snapshot, provide a data-collections array that lists the tables, or an array of regular expressions used to match tables, for example, (要指定要包含在快照中的表,请提供一个列出表的 data-collections 数组,或者一个用于匹配表的正则表达式数组,例如:)

{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}

The data-collections array for an incremental snapshot signal has no default value. If the data-collections array is empty, Debezium interprets the empty array to mean that no action is required, and it does not perform a snapshot. (增量快照信号的 data-collections 数组没有默认值。如果 data-collections 数组为空,Debezium 会将空数组解释为无需执行任何操作,并且不会执行快照。)

If the name of a table that you want to include in a snapshot contains a dot (.), a space, or some other non-alphanumeric character, you must escape the table name in double quotes. (如果要包含在快照中的表名称包含点 (.)、空格或其他非字母数字字符,则必须用双引号转义表名称。)
例如,要包含存在于 public 模式、db1 数据库中,并且名称为 My.Table 的表,请使用以下格式:"db1.public.\"My.Table\""

先决条件
Using a source signaling channel to trigger an incremental snapshot (使用源信号通道触发增量快照)
  1. Send a SQL query to add the ad hoc incremental snapshot request to the signaling table (发送 SQL 查询将即席增量快照请求添加到信号表)

    INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');

    For example, (例如,)

    INSERT INTO db1.myschema.debezium_signal (id, type, data) (1)
    values ('ad-hoc-1',   (2)
        'execute-snapshot',  (3)
        '{"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], (4)
        "type":"incremental", (5)
        "additional-conditions":[{"data-collection": "db1.schema1.table1" ,"filter":"color=\'blue\'"}]}'); (6)

    The values of the id,type, and data parameters in the command correspond to the fields of the signaling table. (命令中的 idtypedata 参数的值对应于信号表的字段。)
    The following table describes the parameters in the example (下表描述了示例中的参数:)

    表 3. 用于将增量 snapshot 信号发送到信号表的 SQL 命令中字段的描述
    Item Value (值) 描述

    1

    database.schema.debezium_signal

    Specifies the fully-qualified name of the signaling table on the source database. (指定源数据库上信号表的完全限定名称。)

    2

    ad-hoc-1

    The id parameter specifies an arbitrary string that is assigned as the id identifier for the signal request. (id 参数指定一个任意字符串,该字符串被分配为信号请求的 id 标识符。)
    Use this string to identify logging messages to entries in the signaling table. Debezium does not use this string. Rather, during the snapshot, Debezium generates its own id string as a watermarking signal. (使用此字符串将日志消息标识到信号表中的条目。Debezium 不使用此字符串。而是在快照期间,Debezium 生成自己的 id 字符串作为水位标记信号。)

    3

    execute-snapshot

    The type parameter specifies the operation that the signal is intended to trigger. (type 参数指定信号旨在触发的操作。)

    4

    data-collections (数据集合)

    A required component of the data field of a signal that specifies an array of table names or regular expressions to match table names to include in the snapshot. (信号的 data 字段的必需组件,该字段指定一个表名称数组或匹配要包含在快照中的表名称的正则表达式。)
    数组列出了使用 database.schema.table 格式的正则表达式,以匹配数据集合的完全限定名。此格式与您用于指定连接器 信号表 名称的格式相同。

    5

    incremental (增量)

    An optional type component of the data field of a signal that specifies the type of snapshot operation to run. (信号的 data 字段的可选 type 组件,指定要运行的快照操作的类型。)
    Valid values are incremental and blocking. (有效值为 incrementalblocking。)
    If you do not specify a value, the connector defaults to performing an incremental snapshot. (如果您不指定值,连接器将默认执行增量快照。)

    6

    additional-conditions (附加条件)

    An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. (一个可选数组,指定连接器用于确定要包含在快照中的记录子集的附加条件集。)
    Each additional condition is an object with data-collection and filter properties. You can specify different filters for each data collection. (每个附加条件是一个具有 data-collectionfilter 属性的对象。您可以为每个数据集合指定不同的过滤器。)
    * data-collection 属性是要应用过滤器的 数据集合的完全限定名。有关 additional-conditions 参数的更多信息,请参阅 使用 additional-conditions 运行临时增量快照

使用 additional-conditions 运行临时增量快照

If you want a snapshot to include only a subset of the content in a table, you can modify the signal request by appending an additional-conditions parameter to the snapshot signal. (如果您希望快照仅包含表内容的子集,则可以通过将 additional-conditions 参数附加到快照信号来修改信号请求。)

The SQL query for a typical snapshot takes the following form (典型快照的 SQL 查询形式如下:)

SELECT * FROM <tableName> ....

By adding an additional-conditions parameter, you append a WHERE condition to the SQL query, as in the following example (通过添加 additional-conditions 参数,您可以将 WHERE 条件附加到 SQL 查询,如下例所示:)

SELECT * FROM <data-collection> WHERE <filter> ....

The following example shows a SQL query to send an ad hoc incremental snapshot request with an additional condition to the signaling table (以下示例显示了一个 SQL 查询,用于向信号表发送带有附加条件的即席增量快照请求:)

INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');

For example, suppose you have a products table that contains the following columns (例如,假设您有一个 products 表,其中包含以下列:)

  • id (primary key)

  • color (颜色)

  • quantity (数量)

If you want an incremental snapshot of the products table to include only the data items where color=blue, you can use the following SQL statement to trigger the snapshot (如果您希望 products 表的增量快照仅包含 color=blue 的数据项,您可以使用以下 SQL 语句来触发快照:)

INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue"}]}');

The additional-conditions parameter also enables you to pass conditions that are based on more than one column. For example, using the products table from the previous example, you can submit a query that triggers an incremental snapshot that includes the data of only those items for which color=blue and quantity>10 (additional-conditions 参数还允许您传递基于多个列的条件。例如,使用前面示例中的 products 表,您可以提交一个查询来触发增量快照,其中仅包含 color=bluequantity>10 的项目数据:)

INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue AND quantity>10"}]}');

The following example, shows the JSON for an incremental snapshot event that is captured by a connector. (以下示例显示了连接器捕获的增量快照事件的 JSON。)

Example 1. Incremental snapshot event message (示例 1. 增量快照事件消息)
{
    "before":null,
    "after": {
        "pk":"1",
        "value":"New data"
    },
    "source": {
        ...
        "snapshot":"incremental" (1)
    },
    "op":"r", (2)
    "ts_ms":"1620393591654",
    "ts_us":"1620393591654547",
    "ts_ns":"1620393591654547920",
    "transaction":null
}
表 4. 增量 snapshot 事件消息中字段的描述
Item Field name (字段名) 描述

1

snapshot (快照)

Specifies the type of snapshot operation to run. (指定要运行的快照操作的类型。)
Currently, the only valid options are blocking and incremental. (目前,唯一有效选项是 blockingincremental。)
Specifying a type value in the SQL query that you submit to the signaling table is optional. (在您提交给信号表的 SQL 查询中指定 type 值是可选的。)
If you do not specify a value, the connector runs an incremental snapshot. (如果您不指定值,连接器将运行增量快照。)

2

op (操作)

Specifies the event type. (指定事件类型。)
The value for snapshot events is r, signifying a READ operation. (快照事件的值为 r,表示 READ 操作。)

使用 Kafka 信号通道触发增量快照

You can send a message to the configured Kafka topic to request the connector to run an ad hoc incremental snapshot. (您可以向配置的 Kafka 主题发送消息,请求连接器运行即席增量快照。)

The key of the Kafka message must match the value of the topic.prefix connector configuration option. (Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。)

The value of the message is a JSON object with type and data fields. (消息的值是一个带有 typedata 字段的 JSON 对象。)

The signal type is execute-snapshot, and the data field must have the following fields (信号类型为 execute-snapshotdata 字段必须具有以下字段:)

表 5. 执行 snapshot 数据字段
Field (字段) Default (默认值) Value (值)

type

incremental (增量)

The type of the snapshot to be executed. Currently Debezium supports the incremental and blocking types. (要执行的快照类型。目前 Debezium 支持 incrementalblocking 类型。)
See the next section for more details. (有关更多详细信息,请参阅下一节。)

data-collections (数据集合)

N/A

An array of comma-separated regular expressions that match the fully-qualified names of tables to include in the snapshot. (一个逗号分隔的正则表达式数组,匹配要包含在快照中的表的完全限定名称。)
使用与 signal.data.collection 配置选项相同的格式指定名称。

additional-conditions (附加条件)

N/A

An optional array of additional conditions that specifies criteria that the connector evaluates to designate a subset of records to include in a snapshot. (一个可选的附加条件数组,指定连接器评估以指定要包含在快照中的记录子集的标准。)
Each additional condition is an object that specifies the criteria for filtering the data that an ad hoc snapshot captures. You can set the following parameters for each additional condition: data-collection:: The fully-qualified name of the table that the filter applies to. You can apply different filters to each table. filter:: Specifies column values that must be present in a database record for the snapshot to include it, for example, "color='blue'". (每个附加条件是一个对象,指定即席快照捕获的数据的过滤标准。您可以为每个附加条件设置以下参数:data-collection:: 过滤器适用的表的完全限定名称。您可以为每个表应用不同的过滤器。filter:: 指定数据库记录中必须存在的列值,快照才能包含它,例如 "color='blue'"。)

The values that you assign to the filter parameter are the same types of values that you might specify in the WHERE clause of SELECT statements when you set the snapshot.select.statement.overrides property for a blocking snapshot. (您赋给 filter 参数的值与您在为阻塞快照设置 snapshot.select.statement.overrides 属性时可能在 SELECT 语句的 WHERE 子句中指定的类型的值相同。)

Example 2. An execute-snapshot Kafka message (示例 2. 一个 execute-snapshot Kafka 消息)
Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
Ad hoc incremental snapshots with additional-conditions (带有附加条件的即席增量快照)

Debezium uses the additional-conditions field to select a subset of a table’s content. (Debezium 使用 additional-conditions 字段来选择表内容的子集。)

Typically, when Debezium runs a snapshot, it runs a SQL query such as (通常,当 Debezium 运行快照时,它会运行一个 SQL 查询,例如:)

SELECT * FROM <tableName> …​.

When the snapshot request includes an additional-conditions property, the data-collection and filter parameters of the property are appended to the SQL query, for example (当快照请求包含 additional-conditions 属性时,该属性的 data-collectionfilter 参数将被附加到 SQL 查询中,例如:)

SELECT * FROM <data-collection> WHERE <filter> …​.

For example, given a products table with the columns id (primary key), color, and brand, if you want a snapshot to include only content for which color='blue', when you request the snapshot, you could add the additional-conditions property to filter the content: :leveloffset: +1 (例如,给定一个具有 id(主键)、colorbrand 列的 products 表,如果您希望快照仅包含 color='blue' 的内容,在请求快照时,您可以添加 additional-conditions 属性来过滤内容::leveloffset: +1)

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue'"}]}}`

You can also use the additional-conditions property to pass conditions based on multiple columns. For example, using the same products table as in the previous example, if you want a snapshot to include only the content from the products table for which color='blue', and brand='MyBrand', you could send the following request: :leveloffset: +1 (您还可以使用 additional-conditions 属性来传递基于多个列的条件。例如,使用前面示例中的相同 products 表,如果您希望快照仅包含 products 表中 color='blue'brand='MyBrand' 的内容,您可以发送以下请求::leveloffset: +1)

Key = `test_connector`

Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`

停止增量快照

In some situations, it might be necessary to stop an incremental snapshot. For example, you might realize that snapshot was not configured correctly, or maybe you want to ensure that resources are available for other database operations. You can stop a snapshot that is already running by sending a signal to the signaling table on the source database. (在某些情况下,可能需要停止增量快照。例如,您可能发现快照配置不正确,或者您想确保资源可用于其他数据库操作。您可以通过向源数据库上的信号表发送信号来停止正在运行的快照。)

You submit a stop snapshot signal to the signaling table by sending it in a SQL INSERT query. The stop-snapshot signal specifies the type of the snapshot operation as incremental, and optionally specifies the tables that you want to omit from the currently running snapshot. After Debezium detects the change in the signaling table, it reads the signal, and stops the incremental snapshot operation if it’s in progress. (您通过在 SQL INSERT 查询中发送来将停止快照信号提交到信号表。stop-snapshot 信号将快照操作的 type 指定为 incremental,并可选地指定您希望从当前正在运行的快照中排除的表。在 Debezium 检测到信号表中的更改后,它会读取信号,并在增量快照操作进行中时停止它。)

Additional resources (附加资源)

您还可以通过向 Kafka 信号主题 发送 JSON 消息来停止增量快照。

先决条件
Using a source signaling channel to stop an incremental snapshot (使用源信号通道停止增量快照)
  1. Send a SQL query to stop the ad hoc incremental snapshot to the signaling table (发送 SQL 查询以停止添加到信号表的即席增量快照)

    INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"incremental"}');

    For example, (例如,)

    INSERT INTO db1.myschema.debezium_signal (id, type, data) (1)
    values ('ad-hoc-1',   (2)
        'stop-snapshot',  (3)
        '{"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], (4)
        "type":"incremental"}'); (5)

    The values of the id, type, and data parameters in the signal command correspond to the fields of the signaling table. (信号命令中的 idtypedata 参数的值对应于信号表的字段。)
    The following table describes the parameters in the example (下表描述了示例中的参数:)

    表 6. 用于将停止增量 snapshot 信号发送到信号表的 SQL 命令中字段的描述
    Item Value (值) 描述

    1

    database.schema.debezium_signal

    Specifies the fully-qualified name of the signaling table on the source database. (指定源数据库上信号表的完全限定名称。)

    2

    ad-hoc-1

    The id parameter specifies an arbitrary string that is assigned as the id identifier for the signal request. (id 参数指定一个任意字符串,该字符串被分配为信号请求的 id 标识符。)
    Use this string to identify logging messages to entries in the signaling table. Debezium does not use this string. (使用此字符串将日志消息标识到信号表中的条目。Debezium 不使用此字符串。)

    3

    stop-snapshot

    Specifies type parameter specifies the operation that the signal is intended to trigger. (type 参数指定信号旨在触发的操作。)

    4

    data-collections (数据集合)

    An optional component of the data field of a signal that specifies an array of table names or regular expressions to match table names to remove from the snapshot. (信号的 data 字段的可选组件,指定一个表名称数组或匹配要从快照中删除的表名称的正则表达式。)
    数组列出了匹配表名称的正则表达式,格式为 database.schema.table

    If you omit this component from the data field, the signal stops the entire incremental snapshot that is in progress. (如果您从 data 字段中省略此组件,则信号将停止正在进行的整个增量快照。)

    5

    incremental (增量)

    A required component of the data field of a signal that specifies the type of snapshot operation to be stopped. (信号的 data 字段的必需组件,指定要停止的快照操作的类型。)
    Currently, the only valid option is incremental. (目前,唯一有效选项是 incremental。)
    If you do not specify a type value, the signal fails to stop the incremental snapshot. (如果您不指定 type 值,信号将无法停止增量快照。)

使用 Kafka 信号通道停止增量快照

You can send a signal message to the configured Kafka signaling topic to stop an ad hoc incremental snapshot. (您可以向配置的 Kafka 信号主题发送信号消息,以停止即席增量快照。)

The key of the Kafka message must match the value of the topic.prefix connector configuration option. (Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。)

The value of the message is a JSON object with type and data fields. (消息的值是一个带有 typedata 字段的 JSON 对象。)

The signal type is stop-snapshot, and the data field must have the following fields (信号类型为 stop-snapshotdata 字段必须具有以下字段:)

表 7. 执行 snapshot 数据字段
Field (字段) Default (默认值) Value (值)

type

incremental (增量)

The type of the snapshot to be executed. Currently Debezium supports only the incremental type. (要执行的快照类型。目前 Debezium 仅支持 incremental 类型。)
See the next section for more details. (有关更多详细信息,请参阅下一节。)

data-collections (数据集合)

N/A

An optional array of comma-separated regular expressions that match the fully-qualified names of the tables an array of table names or regular expressions to match table names to remove from the snapshot. (一个可选的逗号分隔的正则表达式数组,匹配要从快照中删除的表的表名称或匹配表名称的正则表达式数组。)
使用 database.schema.table 格式指定表名。

The following example shows a typical stop-snapshot Kafka message (以下示例显示了一个典型的 stop-snapshot Kafka 消息:)

Key = `test_connector`

Value = `{"type":"stop-snapshot","data": {"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], "type": "INCREMENTAL"}}`

Custom snapshotter SPI (自定义快照器 SPI)

For more advanced uses, you can fine-tune control of the snapshot by implementing one of the following interfaces (对于更高级的用途,您可以实现以下接口之一来微调快照的控制:)

io.debezium.snapshot.spi.Snapshotter

Controls whether the connector takes a snapshot. (控制连接器是否执行快照。)

io.debezium.snapshot.spi.SnapshotQuery

Controls how data is queried during a snapshot. (控制快照期间如何查询数据。)

io.debezium.snapshot.spi.SnapshotLock

Controls whether the connector locks tables when taking a snapshot. (控制连接器在进行快照时是否锁定表。)

io.debezium.snapshot.spi.Snapshotter interface. All built-in snapshot modes implement this interface. (io.debezium.snapshot.spi.Snapshotter 接口。所有内置快照模式都实现了此接口。)
/**
 * {@link Snapshotter} is used to determine the following details about the snapshot process:
 * <p>
 * - Whether a snapshot occurs. <br>
 * - Whether streaming continues during the snapshot. <br>
 * - Whether the snapshot includes schema (if supported). <br>
 * - Whether to snapshot data or schema following an error.
 * <p>
 * Although Debezium provides many default snapshot modes,
 * to provide more advanced functionality, such as partial snapshots,
 * you can customize implementation of the interface.
 * For more information, see the documentation.
 *
 *
 *
 */
@Incubating
public interface Snapshotter extends Configurable {

    /**
     * @return the name of the snapshotter.
     *
     *
     */
    String name();

    /**
     * @param offsetExists is {@code true} when the connector has an offset context (i.e. restarted)
     * @param snapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
     *
     * @return {@code true} if the snapshotter should take a data snapshot
     */
    boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress);

    /**
     * @param offsetExists is {@code true} when the connector has an offset context (i.e. restarted)
     * @param snapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
     *
     * @return {@code true} if the snapshotter should take a schema snapshot
     */
    boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress);

    /**
     * @return {@code true} if the snapshotter should stream after taking a snapshot
     */
    boolean shouldStream();

    /**
     * @return {@code true} whether the schema can be recovered if database schema history is corrupted.
     */
    boolean shouldSnapshotOnSchemaError();

    /**
     * @return {@code true} whether the snapshot should be re-executed when there is a gap in data stream.
     */
    boolean shouldSnapshotOnDataError();

    /**
     *
     * @return {@code true} if streaming should resume from the start of the snapshot
     * transaction, or {@code false} for when a connector resumes and takes a snapshot,
     * streaming should resume from where streaming previously left off.
     */
    default boolean shouldStreamEventsStartingFromSnapshot() {
        return true;
    }

    /**
     * Lifecycle hook called after the snapshot phase is successful.
     */
    default void snapshotCompleted() {
        // no operation
    }

    /**
     * Lifecycle hook called after the snapshot phase is aborted.
     */
    default void snapshotAborted() {
        // no operation
    }
}
io.debezium.snapshot.spi.SnapshotQuery interface. All built-in snapshot query modes implement this interface. (io.debezium.snapshot.spi.SnapshotQuery 接口。所有内置快照查询模式都实现了此接口。)
/**
 * {@link SnapshotQuery} is used to determine the query used during a data snapshot
 *
 *
 */
public interface SnapshotQuery extends Configurable, Service {

    /**
     * @return the name of the snapshot lock.
     *
     *
     */
    String name();

    /**
     * Generate a valid query string for the specified table, or an empty {@link Optional}
     * to skip snapshotting this table (but that table will still be streamed from)
     *
     * @param tableId the table to generate a query for
     * @param snapshotSelectColumns the columns to be used in the snapshot select based on the column
     *                              include/exclude filters
     * @return a valid query string, or none to skip snapshotting this table
     */
    Optional<String> snapshotQuery(String tableId, List<String> snapshotSelectColumns);

}
io.debezium.snapshot.spi.SnapshotLock interface. All built-in snapshot lock modes implement this interface. (io.debezium.snapshot.spi.SnapshotLock 接口。所有内置快照锁定模式都实现了此接口。)
/**
 * {@link SnapshotLock} is used to determine the table lock mode used during schema snapshot
 *
 *
 */
public interface SnapshotLock extends Configurable, Service {

    /**
     * @return the name of the snapshot lock.
     *
     *
     */
    String name();

    /**
     * Returns a SQL statement for locking the given table during snapshotting, if required by the specific snapshotter
     * implementation.
     */
    Optional<String> tableLockingStatement(Duration lockTimeout, String tableId);

}

阻塞快照

To provide more flexibility in managing snapshots, Debezium includes a supplementary ad hoc snapshot mechanism, known as a blocking snapshot. Blocking snapshots rely on the Debezium mechanism for sending signals to a Debezium connector. (为了在管理快照时提供更大的灵活性,Debezium 提供了一个补充的即席快照机制,称为阻塞快照。阻塞快照依赖于 Debezium 向 Debezium 连接器发送信号的机制。)

A blocking snapshot behaves just like an initial snapshot, except that you can trigger it at run time. (阻塞快照的行为与初始快照完全相同,只是您可以随时在运行时触发它。)

You might want to run a blocking snapshot rather than use the standard initial snapshot process in the following situations (您可能希望在以下情况下运行阻塞快照,而不是使用标准的初始快照过程:)

  • You add a new table and you want to complete the snapshot while the connector is running. (添加了一个新表,并且您希望在连接器运行时完成快照。)

  • You add a large table, and you want the snapshot to complete in less time than is possible with an incremental snapshot. (添加了一个大表,并且您希望快照的完成时间比增量快照更快。)

Blocking snapshot process (阻塞快照过程)

When you run a blocking snapshot, Debezium stops streaming, and then initiates a snapshot of the specified table, following the same process that it uses during an initial snapshot. After the snapshot completes, the streaming is resumed. (运行阻塞快照时,Debezium 会停止流式传输,然后启动指定表的快照,遵循其在初始快照期间使用的相同过程。快照完成后,流式传输将恢复。)

Configure snapshot (配置快照)

You can set the following properties in the data component of a signal (您可以在信号的 data 组件中设置以下属性:)

  • data-collections: to specify which tables must be snapshot. (data-collections:用于指定必须进行快照的表。)

  • data-collections: Specifies the tables that you want the snapshot to include. (data-collections:指定您希望快照包含的表。)
    This property accepts a comma-separated list of regular expressions that match fully-qualified table names. The behavior of the property is similar to the behavior of the table.include.list property, which specifies the tables to capture in a blocking snapshot. (此属性接受逗号分隔的正则表达式列表,这些表达式匹配完全限定的表名。此属性的行为类似于 table.include.list 属性的行为,后者指定要在阻塞快照中捕获的表。)

  • additional-conditions: You can specify different filters for different table. (additional-conditions:您可以为不同的表指定不同的过滤器。)

    • The data-collection property is the fully-qualified name of the table for which the filter will be applied, and can be case-sensitive or case-insensitive depending on the database. (data-collection 属性是过滤器将应用的表的完全限定名称,并且根据数据库的不同,可能区分大小写或不区分大小写。)

    • The filter property will have the same value used in the snapshot.select.statement.overrides, the fully-qualified name of the table that should match by case. (filter 属性将具有与 snapshot.select.statement.overrides 中使用的值相同的值,即应区分大小写匹配的表的完全限定名称。)

For example (例如:)

  {"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
Possible duplicates (可能重复)

A delay might exist between the time that you send the signal to trigger the snapshot, and the time when streaming stops and the snapshot starts. As a result of this delay, after the snapshot completes, the connector might emit some event records that duplicate records captured by the snapshot. (从您发送触发快照的信号到流式传输停止并开始快照之间可能存在延迟。由于此延迟,快照完成后,连接器可能会发出一些重复快照捕获记录的事件记录。)

主题名称

默认情况下,Oracle 连接器将表中发生的 INSERTUPDATEDELETE 操作的所有更改事件写入一个特定于该表的 Apache Kafka 主题。连接器使用以下约定来命名更改事件主题:

topicPrefix.schemaName.tableName

以下列表提供了默认名称组件的定义:

topicPrefix

topic.prefix 连接器配置属性指定的主题前缀

schemaName (模式名称)

操作发生的模式(schema)的名称。

tableName

操作发生的表的名称。

例如,如果 fulfillment 是服务器名称,inventory 是模式名称,并且数据库包含名为 orderscustomersproducts 的表,则 Debezium Oracle 连接器会将事件分别发送到以下 Kafka 主题,每个表一个:

fulfillment.inventory.orders
fulfillment.inventory.customers
fulfillment.inventory.products

连接器应用类似的命名约定来标记其内部数据库模式历史记录主题、模式更改主题事务元数据主题

如果默认主题名称不符合您的要求,您可以配置自定义主题名称。要配置自定义主题名称,请在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 自定义主题命名的更多信息,请参阅 主题路由

模式历史记录主题

当数据库客户端查询数据库时,客户端使用数据库的当前模式。但是,数据库模式可以随时更改,这意味着连接器必须能够识别在记录每次插入、更新或删除操作时模式是什么。此外,连接器不一定能将当前模式应用于每个事件。如果事件相对较旧,它可能是在应用当前模式之前记录的。

为了确保对模式更改后发生的事件的正确处理,Oracle 在重做日志中不仅包含影响数据的行级更改,还包含应用于数据库的 DDL 语句。当连接器在重做日志中遇到这些 DDL 语句时,它会解析它们并更新每个表模式的内存表示。连接器使用此模式表示来识别每个插入、更新或删除操作发生时的表结构,并生成相应的更改事件。在单独的数据库模式历史记录 Kafka 主题中,连接器记录所有 DDL 语句以及每个 DDL 语句出现在重做日志中的位置。

当连接器在崩溃或正常停止后重新启动时,它将从特定位置(即特定时间点)开始读取重做日志。连接器通过读取数据库模式历史记录 Kafka 主题并解析重做日志中连接器正在开始的位置之前的所有 DDL 语句来重建此时间点存在的表结构。

此数据库模式历史记录主题仅供内部连接器使用。可选地,连接器还可以 将模式更改事件发送到另一个供使用者应用程序使用的主题

Additional resources (附加资源)

模式更改主题

您可以配置 Debezium Oracle 连接器以生成描述应用于数据库表中结构更改的模式更改事件。连接器将模式更改事件写入名为 <serverName> 的 Kafka 主题,其中 serverName 是在 topic.prefix 配置属性中指定的命名空间。

每当连接器从新表流式传输数据或表结构发生更改时,Debezium 都会向模式更改主题发出新消息。

在表结构发生更改后,您必须遵循(模式演变过程)。

连接器发送到模式更改主题的消息包含一个 payload,并且可以选择性地包含更改事件消息的模式。

模式变更事件的模式具有以下元素:

name (名称)

模式变更事件消息的名称。

type

事件消息类型的类型。

version (版本)

模式的版本。版本是一个整数,每次模式更改时都会递增。

fields (字段)

变更事件消息中包含的字段。

示例:Oracle 连接器模式更改主题的模式

以下示例以 JSON 格式显示了典型的模式。

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "databaseName"
      }
    ],
    "optional": false,
    "name": "io.debezium.connector.oracle.SchemaChangeKey",
    "version": 1
  },
  "payload": {
    "databaseName": "inventory"
  }
}

模式变更事件消息的 payload 包含以下元素:

ddl

提供导致模式更改的 SQL CREATEALTERDROP 语句。

databaseName (数据库名称)

应用于语句的数据库名称。databaseName 的值用作消息键。

tableChanges (表变更)

模式更改后整个表模式的结构化表示。tableChanges 字段包含一个数组,其中包含表每个列的条目。由于结构化表示以 JSON 或 Avro 格式呈现数据,因此消费者无需先通过 DDL 解析器处理消息即可轻松读取消息。

默认情况下,连接器使用 ALL_TABLES 数据库视图来识别要存储在模式历史记录主题中的表名。在此视图中,连接器只能访问通过其连接到数据库的用户账户可用的表的。数据。

您可以修改设置,使模式历史记录主题存储不同的表子集。使用以下方法之一来更改主题存储的表集:

当连接器配置为捕获表时,它会将该表模式更改的历史记录不仅存储在模式更改主题中,还存储在内部数据库模式历史记录主题中。内部数据库模式历史记录主题仅供连接器使用,不打算由消费应用程序直接使用。确保需要模式更改通知的应用程序仅从模式更改主题中获取该信息。

切勿分区数据库模式历史主题。为了使数据库模式历史主题正常工作,它必须维护连接器发送到它的事件记录的一致的全局顺序。

为了确保主题不会在分区之间分割,请使用以下任一方法设置主题的分区计数:

  • 如果您手动创建数据库模式历史主题,请指定分区计数为 1

  • 如果您使用 Apache Kafka 代理自动创建数据库模式历史主题,则会创建该主题,请将 Kafka num.partitions 配置选项的值设置为 1

模式更改主题消息格式处于孵化状态,可能会在不通知的情况下更改。

示例:发送到 Oracle 连接器模式更改主题的消息

以下示例以 JSON 格式显示了典型的模式变更消息。该消息包含表模式的逻辑表示。

{
  "schema": {
  ...
  },
  "payload": {
    "source": {
      "version": "3.3.1.Final",
      "connector": "oracle",
      "name": "server1",
      "ts_ms": 1588252618953,
      "ts_us": 1588252618953000,
      "ts_ns": 1588252618953000000,
      "snapshot": "true",
      "db": "ORCLPDB1",
      "schema": "DEBEZIUM",
      "table": "CUSTOMERS",
      "txId" : null,
      "scn" : "1513734",
      "commit_scn": "1513754",
      "lcr_position" : null,
      "rs_id": "001234.00012345.0124",
      "ssn": 1,
      "redo_thread": 1,
      "user_name": "user",
      "row_id": "AAASgjAAMAAAACnAAA",
      "commit_ts_ms": 1588252619012,
      "start_scn": "1513734",
      "start_ts_ms": 1588252618953
    },
    "ts_ms": 1588252618953, (1)
    "ts_us": 1588252618953987, (1)
    "ts_ns": 1588252618953987512, (1)
    "databaseName": "ORCLPDB1", (2)
    "schemaName": "DEBEZIUM", //
    "ddl": "CREATE TABLE \"DEBEZIUM\".\"CUSTOMERS\" \n   (    \"ID\" NUMBER(9,0) NOT NULL ENABLE, \n    \"FIRST_NAME\" VARCHAR2(255), \n    \"LAST_NAME" VARCHAR2(255), \n    \"EMAIL\" VARCHAR2(255), \n     PRIMARY KEY (\"ID\") ENABLE, \n     SUPPLEMENTAL LOG DATA (ALL) COLUMNS\n   ) SEGMENT CREATION IMMEDIATE \n  PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 \n NOCOMPRESS LOGGING\n  STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645\n  PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1\n  BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT)\n  TABLESPACE \"USERS\" ", (3)
    "tableChanges": [ (4)
      {
        "type": "CREATE", (5)
        "id": "\"ORCLPDB1\".\"DEBEZIUM\".\"CUSTOMERS\"", (6)
        "table": { (7)
          "defaultCharsetName": null,
          "primaryKeyColumnNames": [ (8)
            "ID"
          ],
          "columns": [ (9)
            {
              "name": "ID",
              "jdbcType": 2,
              "nativeType": null,
              "typeName": "NUMBER",
              "typeExpression": "NUMBER",
              "charsetName": null,
              "length": 9,
              "scale": 0,
              "position": 1,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "FIRST_NAME",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "VARCHAR2",
              "typeExpression": "VARCHAR2",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 2,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "LAST_NAME",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "VARCHAR2",
              "typeExpression": "VARCHAR2",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 3,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            },
            {
              "name": "EMAIL",
              "jdbcType": 12,
              "nativeType": null,
              "typeName": "VARCHAR2",
              "typeExpression": "VARCHAR2",
              "charsetName": null,
              "length": 255,
              "scale": null,
              "position": 4,
              "optional": false,
              "autoIncremented": false,
              "generated": false
            }
          ],
          "attributes": [ (10)
            {
              "customAttribute": "attributeValue"
            }
          ]
        }
      }
    ]
  }
}
表 8. 发送到模式更改主题的消息中字段的描述
Item Field name (字段名) 描述

1

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

在 source 对象中,ts_ms 指示数据库中发生更改的时间。通过比较 payload.source.ts_ms 的值与 payload.ts_ms 的值,您可以确定源数据库更新与 Debezium 之间的延迟。

2

databaseName (数据库名称)
schemaName (模式名称)

标识包含更改的数据库和模式。

3

ddl

此字段包含导致模式更改的 DDL。

4

tableChanges (表变更)

包含 DDL 命令生成的模式变更的一个或多个项目的数组。

5

type

描述了更改的类型。type 可以设置为以下值之一:

CREATE (创建)

表已创建。

ALTER (修改)

表已修改。

DROP (删除)

表已删除。

6

id

已创建、修改或删除的表的完整标识符。在表重命名的情况下,此标识符是 <old>,<new> 表名称的串联。

7

table (表)

表示应用更改后表的元数据。

8

primaryKeyColumnNames (主键列名)

组成表主键的列的列表。

9

columns (列)

已更改表中每个列的元数据。

10

attributes (属性)

每个表变更的自定义属性元数据。

在连接器发送到模式更改主题的消息中,消息键是包含模式更改的数据库的名称。在下面的示例中,payload 字段包含 databaseName 键:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "databaseName"
      }
    ],
    "optional": false,
    "name": "io.debezium.connector.oracle.SchemaChangeKey",
    "version": 1
  },
  "payload": {
    "databaseName": "ORCLPDB1"
  }
}

事务元数据

Debezium 可以生成代表事务元数据边界的事件,并丰富数据更改事件消息。

Debezium 接收事务元数据的限制

Debezium 仅为部署连接器后发生的事务注册和接收元数据。部署连接器之前发生的事务的元数据不可用。

数据库事务由 BEGINEND 关键字括起来的语句块表示。Debezium 为每个事务中的 BEGINEND 分隔符生成事务边界事件。事务边界事件包含以下字段:

status

BEGINEND

id

唯一事务标识符的字符串表示。

ts_ms

数据源中事务边界事件(BEGINEND 事件)的时间。如果数据源未向 Debezium 提供事件时间,则该字段将代表 Debezium 处理事件的时间。

event_count(针对 END 事件)

事务发出的事件总数。

data_collections(针对 END 事件)

一对 data_collectionevent_count 元素的数组,指示连接器为源自数据集合的更改发出的事件数量。

以下示例显示了一个典型的事务边界消息:

示例:Oracle 连接器事务边界事件
{
  "status": "BEGIN",
  "id": "5.6.641",
  "ts_ms": 1486500577125,
  "event_count": null,
  "data_collections": null
}

{
  "status": "END",
  "id": "5.6.641",
  "ts_ms": 1486500577691,
  "event_count": 2,
  "data_collections": [
    {
      "data_collection": "ORCLPDB1.DEBEZIUM.CUSTOMER",
      "event_count": 1
    },
    {
      "data_collection": "ORCLPDB1.DEBEZIUM.ORDER",
      "event_count": 1
    }
  ]
}

除非通过 topic.transaction 选项覆盖,否则连接器会将事务事件发送到 <topic.prefix>.transaction 主题。

数据更改事件的丰富

启用事务元数据后,数据消息 Envelope 将会添加一个 transaction 字段。此字段提供有关每个事件的信息,形式为字段的组合:

id

唯一事务标识符的字符串表示。

total_order

事件在事务生成的所有事件中的绝对位置。

data_collection_order

事件在事务发出的所有事件中,每个数据集合的位置。

以下示例显示了一个典型的事务事件消息:

{
  "before": null,
  "after": {
    "pk": "2",
    "aa": "1"
  },
  "source": {
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "ts_us": "1580390884335741",
  "ts_ns": "1580390884335741963",
  "transaction": {
    "id": "5.6.641",
    "total_order": "1",
    "data_collection_order": "1"
  }
}

LogMiner 挖掘策略

Oracle 重做日志中的条目不存储用户提交以进行 DML 更改的原始 SQL 语句。相反,重做条目包含一组更改向量和一组对象标识符,这些标识符代表与这些向量相关的表空间、表和列。换句话说,重做日志条目不包含受 DML 更改影响的模式、表或列的名称。

Debezium Oracle 连接器使用 log.mining.strategy 配置属性来控制 Oracle LogMiner 如何处理更改向量中对象标识符的查找。在某些情况下,一种日志挖掘策略可能比另一种在模式更改方面更可靠。但是,在选择日志挖掘策略之前,重要的是要考虑它可能对性能和开销产生的影响。

将数据字典写入重做日志

redo_log_catalog 挖掘策略指示数据库在每次重做日志切换后立即将数据字典的副本刷新到重做日志。这是跟踪与数据更改交织的模式更改的最可靠策略,因为 Oracle LogMiner 有一种方法可以在一系列更改向量之间内插起始和结束数据字典状态。

然而,redo_log_catalog 模式也是最昂贵的,因为它需要几个关键步骤才能正常工作。首先,此模式要求在每次日志切换后将数据字典刷新到重做日志。每次切换后刷新日志会迅速消耗归档日志中的宝贵空间,并且大量的归档日志可能超过数据库管理员准备的数量。如果您打算使用此模式,请与您的数据库管理员协调,以确保数据库配置正确。

如果您将连接器配置为使用 redo_log_catalog 模式,请不要使用多个 Debezium Oracle 连接器从同一逻辑数据库捕获更改。

直接使用在线目录

默认策略模式 online_catalog 的工作方式与 redo_log_catalog 模式不同。当策略设置为 online_catalog 时,数据库永远不会将数据字典刷新到重做日志。相反,Oracle LogMiner 始终使用最新的数据字典状态来执行比较。通过始终使用当前字典并消除刷新到重做日志,此策略需要的开销更少,并且运行效率更高。然而,这些好处被无法解析交织的模式更改和数据更改所抵消。因此,此策略有时可能导致事件失败。

如果 LogMiner 在模式更改后无法可靠地重建 SQL,请检查重做日志以获取证据。查找引用表名(例如 OBJ# 123456,其中数字是表的 对象标识符)或列名(例如 COL1COL2)的条目。当您将连接器配置为使用 online_catalog 策略时,请采取措施确保表模式及其索引保持静态且没有更改。如果 Debezium 连接器配置为使用 online_catalog 模式,并且您必须应用模式更改,请执行以下步骤:

  1. 等待连接器捕获所有现有数据更改 (DML)。

  2. 执行模式 (DDL) 更改,然后等待连接器捕获更改。

  3. 恢复对表的 数据更改 (DML)

遵循此过程有助于确保 Oracle LogMiner 可以安全地重建所有数据更改的 SQL。

混合方法

您可以通过将 log.mining.strategy 配置属性的值设置为 hybrid 来启用此策略。此策略的目标是提供 redo_log_catalog 策略的可靠性与 online_catalog 策略的性能和低开销,而不会产生任一策略的缺点。

hybrid 策略主要在 online_catalog 模式下运行,这意味着 Debezium Oracle 连接器首先将事件重建委托给 Oracle LogMiner。如果 Oracle LogMiner 成功重建了 SQL,Debezium 将正常处理该事件,就像它配置为使用 online_catalog 策略一样。如果连接器检测到 Oracle LogMiner 无法重建 SQL,则连接器将尝试直接通过使用该表的模式历史记录来重建 SQL。只有当 Oracle LogMiner 和连接器都无法重建 SQL 时,连接器才会报告失败。

如果 lob.enabled 属性设置为 true,则不能使用 hybrid 挖掘策略。如果您需要流式传输 CLOBBLOBXML 数据,则只能使用 online_catalogredo_log_catalog 策略。

查询模式

Debezium Oracle 连接器默认与 Oracle LogMiner 集成。此集成需要一组专门的步骤,包括生成复杂的 JDBC SQL 查询,以将记录在事务日志中的更改作为更改事件摄取。JDBC SQL 查询使用的 V$LOGMNR_CONTENTS 视图没有任何索引来提高查询的性能,因此有不同的查询模式可以用来控制 SQL 查询的生成方式,以提高查询的执行效率。

可以将 log.mining.query.filter.mode 连接器属性配置为以下之一,以影响 JDBC SQL 查询的生成方式:

none

(默认)此模式创建一个仅基于数据库级别的不同操作类型(如插入、更新或删除)进行过滤的 JDBC 查询。当基于模式、表或用户名包含/排除列表过滤数据时,这是在连接器内的处理循环中完成的。

当从一个更改不多的数据库捕获少量表时,此模式通常很有用。生成的查询非常简单,主要侧重于以低数据库开销尽可能快地读取。

in

此模式创建一个 JDBC 查询,该查询不仅在数据库级别过滤操作类型,还过滤模式、表和用户名包含/排除列表。查询的谓词是使用基于包含/排除列表配置属性中指定值的 SQL in 子句生成的。

当从一个更改繁多的数据库捕获大量表时,此模式通常很有用。生成的查询比 none 模式复杂得多,并且侧重于减少网络开销,并在数据库级别尽可能多地执行过滤。

最后,不要将正则表达式作为模式和表包含/排除配置属性的一部分。使用正则表达式会导致连接器无法匹配这些配置属性的更改,从而导致更改丢失。

regex

此模式创建一个 JDBC 查询,该查询不仅在数据库级别过滤操作类型,还过滤模式、表和用户名包含/排除列表。但是,与 in 模式不同,此模式使用 Oracle REGEXP_LIKE 运算符生成 SQL 查询,根据是否指定包含或排除值,使用合取或析取。

当捕获可以使用少量正则表达式识别的可变数量的表时,此模式通常很有用。生成的查询比任何其他模式都复杂得多,并且侧重于减少网络开销,并在数据库级别尽可能多地执行过滤。

事件缓冲

Oracle 以发生的顺序将所有更改写入重做日志,包括稍后被回滚丢弃的更改。因此,来自不同事务的并发更改是交织在一起的。当连接器首次读取更改流时,由于它无法立即确定哪些更改已提交或已回滚,因此它会暂时将更改事件存储在内部缓冲区中。在更改提交后,连接器将缓冲区中的更改事件写入 Kafka。连接器会丢弃被回滚丢弃的更改事件。

您可以通过设置 log.mining.buffer.type 属性来配置连接器使用的缓冲机制。

默认缓冲区类型使用 memory 配置。在默认的 memory 设置下,连接器使用 JVM 进程的堆内存来分配和管理缓冲的事件记录。如果您使用 memory 缓冲区设置,请确保分配给 Java 进程的内存量能够适应您环境中长期运行的大型事务。

Infinispan

Debezium Oracle 连接器还可以配置为使用 Infinispan 作为其缓存提供程序,支持本地嵌入式模式或远程服务器集群的缓存存储。要使用 Infinispan,必须使用 infinispan_embeddedinfinispan_remote 来配置 log.mining.buffer.type

为了在 Infinispan 缓存配置上提供灵活性,当使用 Infinispan 缓冲事件数据时,连接器期望提供一系列缓存配置属性。请参阅 log.mining.buffer.infinispan.cache 命名空间中的 配置属性。这些配置属性的内容取决于连接器是与远程 Infinispan 集群集成还是使用嵌入式引擎。

例如,以下说明了在使用 Infinispan 嵌入式模式时,事务缓存属性的嵌入式配置外观:

<local-cache name="transactions">
  <persistence passivation="false">
    <file-store read-only="false" preload="true" shared="false">
      <data path="./data"/>
      <index path="./index"/>
    </file-store>
  </persistence>
</local-cache>

深入查看配置,缓存被配置为持久化的。所有缓存都应如此配置,以避免在事务进行中连接器重新启动时丢失事务事件。此外,缓存的存储位置由 path 属性定义,并且这应该是所有可能运行时环境都可以访问的共享位置。

Infinispan 缓冲区实现使用多个不同名称的缓存配置。应该为 transactionseventsprocessed-transactionsschema-changes 定义缓存。每个配置都可以根据您的性能需求进行调整,或者除了缓存名称外,都可以相同。

当将 XML 配置作为 JSON 连接器属性值提供时,必须省略换行符或用 \n 字符替换。

另一个示例,以下说明了使用 Infinispan 集群配置的相同缓存:

<distributed-cache name="transactions" statistics="true">
  <encoding media-type="application/x-protostream" />
  <persistence passivation="false">
   <file-store read-only="false" preload="true" shared="false">
     <data path="./data"/>
     <index path="./index"/>
   </file-store>
  </persistence>
</distributed-cache>

与前面的示例中的嵌入式本地缓存配置一样,此配置也定义为持久化的。所有缓存都应如此配置,以避免在事务进行中连接器重新启动时丢失事务事件。

然而,有几点值得注意的差异。首先,缓存被定义为分布式缓存而不是本地缓存。其次,缓存被定义为使用 application/x-protostream 编码,这是所有 Debezium 缓存所必需的。最后,在文件存储定义上不需要 path 属性,因为 Infinispan 集群将自动处理此问题。

Infinispan 缓冲区类型被认为是孵化状态;缓存格式可能在版本之间发生变化,并可能需要重新快照。迁移说明将表明是否需要。

此外,当移除使用 Infinispan 缓冲区的 Debezium Oracle 连接器时,持久化的缓存文件不会从磁盘自动删除。如果同一缓冲区位置将由新的连接器部署使用,则应在部署新连接器之前手动删除文件。

Infinispan Hotrod 客户端集成

Debezium Oracle 连接器使用 Hotrod 客户端与 Infinispan 集群进行通信。任何以 log.mining.buffer.infinispan.client. 作为前缀的连接器属性都将直接传递到 Hotrod 客户端,使用 infinispan.client. 命名空间,从而允许完全自定义客户端如何与集群交互。

在使用此 Infinspan 模式时,至少需要一个必需的配置属性:

log.mining.buffer.infinispan.client.hotrod.server_list

指定 Infinispan 服务器主机名和端口组合的列表,使用 <hostname>:<port> 格式。

SCN 间隙检测

当 Debezium Oracle 连接器配置为使用 LogMiner 时,它通过使用基于系统更改号 (SCN) 的起始和结束范围来收集 Oracle 的更改事件。连接器自动管理此范围,根据连接器是能够近乎实时地流式传输更改,还是由于数据库中的大事务或批量事务而必须处理积压的更改,来增加或减小该范围。

在某些情况下,Oracle 数据库会跳过 SCN,而不是以恒定速率增加 SCN 值。这种 SCN 值的跳跃可能由于特定集成与数据库的交互方式,或者由于热备份等事件而发生。

Debezium Oracle 连接器依赖以下配置属性来检测 SCN 间隙并调整挖掘范围。

log.mining.scn.gap.detection.gap.size.min

指定最小间隙大小。

log.mining.scn.gap.detection.time.interval.max.ms

指定最大时间间隔。

连接器首先比较当前 SCN 和当前挖掘范围中的最高 SCN 之间的更改数。如果当前 SCN 值与前一个挖掘范围结束处的 SCN 值之间的差值大于最小间隙大小,则连接器可能已检测到 SCN 间隙。要确认是否存在间隙,连接器接下来比较当前 SCN 的时间戳和前一个挖掘范围结束处的 SCN 的时间戳。如果时间戳之间的差值小于最大时间间隔,则 SCN 间隙的存在得到确认。

当发生 SCN 间隙时,Debezium 连接器会自动使用当前 SCN 作为当前挖掘会话范围的结束点。这允许连接器快速赶上实时事件,而无需挖掘中间的较小范围,因为 SCN 值被意外的大数增加而没有返回任何更改。当连接器响应 SCN 间隙执行上述步骤时,它会忽略 log.mining.batch.size.max 属性指定的值。在连接器完成挖掘会话并赶上实时事件后,它将恢复对最大日志挖掘批处理大小的强制执行。

SCN 间隙检测仅在连接器运行时并且正在处理近乎实时事件时,当发生大的 SCN 增量时才可用。

低更改频率偏移量管理

Debezium Oracle 连接器在连接器偏移量中跟踪系统更改号,以便在连接器重新启动时,它可以从上次停止的地方开始。这些偏移量是每个发出的更改事件的一部分;但是,当数据库更改频率低时(每几个小时或几天一次),偏移量可能会过时,并在系统更改号不再存在于事务日志中时阻止连接器成功重启。

对于使用非 CDB 模式连接到 Oracle 的连接器,您可以启用 heartbeat.interval.ms 来强制连接器定期发出心跳事件,以使偏移量保持同步。

对于使用 CDB 模式连接到 Oracle 的连接器,保持同步更加复杂。您不仅必须设置 heartbeat.interval.ms,还需要设置 heartbeat.action.query。必须同时指定这两个属性,因为在 CDB 模式下,连接器专门跟踪 PDB 内部的更改。需要一个补充机制来触发来自可插拔数据库内部的更改事件。定期,心跳操作查询会导致连接器在可插拔数据库中插入新表行或更新现有行。Debezium 检测表更改并为其发出更改事件,确保偏移量保持同步,即使在处理更改不频繁的可插拔数据库中也是如此。

为了使连接器能够使用 heartbeat.action.query 与不属于 连接器用户账户 的表,您必须授予连接器用户对这些表运行必要的 INSERTUPDATE 查询的权限。

数据更改事件

Oracle 连接器发出的每个数据更改事件都有一个键和一个值。键和值的结构取决于更改事件的来源表。有关 Debezium 如何构造主题名称的信息,请参阅 主题名称

Debezium Oracle 连接器确保所有 Kafka Connect 模式名称都是 有效的 Avro 模式名称。这意味着逻辑服务器名称必须以字母字符或下划线([a-z,A-Z,_])开头,并且逻辑服务器名称的其余字符以及模式和表名称中的所有字符必须是字母数字字符或下划线([a-z,A-Z,0-9,\_])。连接器会自动用下划线字符替换无效字符。

当多个逻辑服务器名称、模式名称或表名称之间唯一的区分字符是无效字符,并且这些字符被替换为下划线时,可能会导致意外的命名冲突。

Debezium 和 Kafka Connect 的设计围绕着连续的事件消息流。然而,这些事件的结构可能会随着时间的推移而变化,这对于主题消费者来说可能难以处理。为了方便处理可变事件结构,Kafka Connect 中的每个事件都是自包含的。每个消息键和值都有两部分:模式payload。模式描述了 payload 的结构,而 payload 包含实际数据。

连接器可以捕获用户启动或应用程序级别的操作导致的 DDL 和 DML 更改。它还可以捕获源自 SYSSYSTEM 账户的 DML 更改。

更改事件键

对于每个已更改的表,更改事件键的结构使得在创建事件时,对于表的主键(或唯一键约束)中的每个列都存在一个字段。

例如,定义在 inventory 数据库模式中的 customers 表可能具有以下更改事件键:

CREATE TABLE customers (
  id NUMBER(9) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY,
  first_name VARCHAR2(255) NOT NULL,
  last_name VARCHAR2(255) NOT NULL,
  email VARCHAR2(255) NOT NULL UNIQUE
);

如果 <topic.prefix>.transaction 配置属性的值设置为 server1,则数据库中 customers 表中发生的每个更改事件的 JSON 表示形式具有以下键结构:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "ID"
            }
        ],
        "optional": false,
        "name": "server1.INVENTORY.CUSTOMERS.Key"
    },
    "payload": {
        "ID": 1004
    }
}

键的 schema 部分包含一个 Kafka Connect 模式,该模式描述了键部分的结构。在前例中,payload 值不是可选的,其结构由一个名为 server1.DEBEZIUM.CUSTOMERS.Key 的模式定义,并且有一个名为 id 的必需字段,类型为 int32。键的 payload 字段的值表明它确实是一个结构(在 JSON 中只是一个对象),其中包含一个 id 字段,其值为 1004

因此,您可以将此键解释为描述数据库(连接器名为 server1 的输出)中的 inventory.customers 表的行,该行的 id 主键列的值为 1004

更改事件值

更改事件消息中值的结构与消息中更改事件键的结构相对应,并包含模式部分和payload部分。

更改事件值 payload

更改事件值 payload 部分中的信封结构包含以下字段:

op (操作)

一个必需字段,包含一个描述操作类型的字符串值。Oracle 连接器更改事件值 payload 中的 op 字段包含以下值之一:c(创建或插入)、u(更新)、d(删除)或 r(读取,表示快照)。

before

一个可选字段,如果存在,则描述事件发生之前行的状态。结构由 server1.INVENTORY.CUSTOMERS.Value Kafka Connect 模式描述,该模式由 server1 连接器用于 inventory.customers 表的所有行。

after

一个可选字段,如果存在,则包含行更改后的状态。结构由用于 before 字段的相同 server1.INVENTORY.CUSTOMERS.Value Kafka Connect 模式描述。

source (源)

一个必需字段,包含一个描述事件源元数据的结构。对于 Oracle 连接器,该结构包括以下字段:

  • Debezium 版本。

  • 连接器名称。

  • 事件是否正在进行的快照的一部分。

  • 事务 ID(快照不包含)。

  • 与数据库提交更改时分配的 SCN(系统更改号)相关的以下值:

scn

数据库用于跟踪事务的唯一标识符。

start_scn

事务开始的 SCN。

start_ts_ms

事务开始的时间。

commit_ts_ms

事务提交的时间。

  • 表示连接器处理事件的时间戳,基于运行 Kafka Connect 任务的 JVM 的系统时钟。对于快照,时间戳表示快照发生的时间。
    连接器在以下字段中报告的时间戳具有不同的精度:

ts_ms

以毫秒为单位提供时间戳。

ts_us

以微秒为单位提供时间戳。

ts_ns

以纳秒为单位提供时间戳。

  • 进行更改的用户名

  • 与行关联的 ROWID

    commit_scn 字段是可选的,它描述了更改事件参与的事务提交的 SCN。仅在使用 LogMiner 连接适配器时才存在此字段。

    仅在使用 LogMiner 连接适配器时才会填充 user_name 字段。

ts_ms

一个可选字段,如果存在,则包含连接器处理事件的时间(基于运行 Kafka Connect 任务的 JVM 的系统时钟)。

更改事件值的模式

事件消息值的模式部分包含一个描述 payload 的信封结构以及其中的嵌套字段。

create 事件

以下示例显示了在更改事件键示例中描述的 customers 表的create 事件值的示例:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "ID"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "FIRST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "LAST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "EMAIL"
                    }
                ],
                "optional": true,
                "name": "server1.DEBEZIUM.CUSTOMERS.Value",
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "ID"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "FIRST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "LAST_NAME"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "EMAIL"
                    }
                ],
                "optional": true,
                "name": "server1.DEBEZIUM.CUSTOMERS.Value",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": true,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ts_ms"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ts_us"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ts_ns"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "txId"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "scn"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "commit_scn"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "rs_id"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "ssn"
                    },
                    {
                        "type": "int32",
                        "optional": true,
                        "field": "redo_thread"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "user_name"
                    },
                    {
                        "type": "boolean",
                        "optional": true,
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "row_id"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "commit_ts_ms"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "start_scn"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "start_ts_ms"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.oracle.Source",
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_us"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ns"
            }
        ],
        "optional": false,
        "name": "server1.DEBEZIUM.CUSTOMERS.Envelope"
    },
    "payload": {
        "before": null,
        "after": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "annek@noanswer.org"
        },
        "source": {
            "version": "3.3.1.Final",
            "name": "server1",
            "ts_ms": 1520085154000,
            "ts_us": 1520085154000000,
            "ts_ns": 1520085154000000000,
            "txId": "6.28.807",
            "scn": "2122185",
            "commit_scn": "2122185",
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user",
            "snapshot": false,
            "row_id": "AAASgjAAMAAAACnAAA",
            "commit_ts_ms": 1520085154000,
            "start_scn": "2122185",
            "start_ts_ms": 1520085154000
        },
        "op": "c",
        "ts_ms": 1532592105975,
        "ts_us": 1532592105975741,
        "ts_ns": 1532592105975741582
    }
}

在前例中,请注意事件如何定义以下模式:

  • 信封server1.DEBEZIUM.CUSTOMERS.Envelope)。

  • source 结构(io.debezium.connector.oracle.Source,它特定于 Oracle 连接器并在所有事件中重用)。

  • beforeafter 字段的表特定模式。

beforeafter 字段的模式名称形式为 <logicalName>.<schemaName>.<tableName>.Value,因此与所有其他表的模式完全无关。结果是,当您使用 Avro 转换器时,每个逻辑源中表的 Avro 模式都有其自己的演变和历史记录。

此事件payload 部分提供有关事件的信息。它描述了一个行被创建(op=c),并显示 after 字段值包含插入到行中的 IDFIRST_NAMELAST_NAMEEMAIL 列的值。

默认情况下,事件的 JSON 表示比它们描述的行大得多。较大的尺寸是因为 JSON 表示同时包含消息的模式和 payload 部分。您可以使用 Avro 转换器来减小连接器写入 Kafka 主题的消息的大小。

update 事件

以下示例显示了一个update 更改事件,该事件由连接器从与前一个create 事件相同的表中捕获。

{
    "schema": { ... },
    "payload": {
        "before": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "annek@noanswer.org"
        },
        "after": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "anne@example.com"
        },
        "source": {
            "version": "3.3.1.Final",
            "name": "server1",
            "ts_ms": 1520085811000,
            "ts_us": 1520085811000000,
            "ts_ns": 1520085811000000000,
            "txId": "6.9.809",
            "scn": "2125544",
            "commit_scn": "2125544",
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user",
            "snapshot": false,
            "row_id": "AAASgjAAMAAAACnAAA",
            "commit_ts_ms": 152008581100,
            "start_scn": "2125544",
            "start_ts_ms": 152008581100
        },
        "op": "u",
        "ts_ms": 1532592713485,
        "ts_us": 1532592713485152,
        "ts_ns": 1532592713485152954,
    }
}

payload 具有与create(insert)事件的 payload 相同的结构,但以下值不同:

  • op 字段的值为 u,表示此行因更新而更改。

  • before 字段显示了行在update 数据库提交之前存在的旧状态。

  • after 字段显示了行的更新状态,其中 EMAIL 值现在设置为 anne@example.com

  • source 字段的结构包含与之前相同的字段,但值不同,因为连接器从重做日志中的不同位置捕获了事件。

  • ts_ms 字段显示了一个时间戳,指示 Debezium 何时处理了该事件。

payload 部分揭示了其他几条有用的信息。例如,通过比较 beforeafter 结构,我们可以确定行如何因提交而改变。source 结构提供了关于 Oracle 对此更改的记录的信息,提供了可追溯性。它还让我们了解此事件相对于此主题和其他主题中的其他事件发生的时间。它是在另一个事件之前、之后还是作为同一提交的一部分发生的?

当行的主键/唯一键列被更新时,行的键的值会发生变化。因此,Debezium 在此类更新后发出三个事件:

  • 一个 DELETE 事件。

  • 一个带有旧行键的墓碑事件

  • 一个 INSERT 事件,提供行的新键。

delete 事件

以下示例显示了前一个createupdate 事件示例中显示的表的delete 事件。delete 事件的模式部分与这些事件的模式部分相同。

{
    "schema": { ... },
    "payload": {
        "before": {
            "ID": 1004,
            "FIRST_NAME": "Anne",
            "LAST_NAME": "Kretchmar",
            "EMAIL": "anne@example.com"
        },
        "after": null,
        "source": {
            "version": "3.3.1.Final",
            "name": "server1",
            "ts_ms": 1520085153000,
            "ts_us": 1520085153000000,
            "ts_ns": 1520085153000000000,
            "txId": "6.28.807",
            "scn": "2122184",
            "commit_scn": "2122184",
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user",
            "snapshot": false,
            "row_id": "AAASgjAAMAAAACnAAA",
            "commit_ts_ms": 1520085153000,
            "start_scn": "2122184",
            "start_ts_ms": 1520085153000
        },
        "op": "d",
        "ts_ms": 1532592105960,
        "ts_us": 1532592105960854,
        "ts_ns": 1532592105960854693
    }
}

createupdate 事件的 payload 相比,该事件的payload 部分显示了几处差异:

  • op 字段的值为 d,表示该行已被删除。

  • before 字段显示了被数据库提交删除的行的旧状态。

  • after 字段的值为 null,表示该行不再存在。

  • source 字段的结构包含许多存在于createupdate 事件中的键,但 ts_msscntxId 字段中的值不同。

  • ts_ms 显示了指示 Debezium 何时处理此事件的时间戳。

delete 事件为消费者提供了处理该行删除所需的信息。

Oracle 连接器的事件旨在与 Kafka 日志压缩 配合使用,该功能允许删除一些旧消息,只要保留每种键的最新消息即可。这允许 Kafka 重新获得存储空间,同时确保主题包含完整的数据集,并可用于重新加载基于键的状态。

当删除一行时,示例中显示的删除事件值仍然适用于日志压缩,因为 Kafka 能够删除所有使用相同键的早期消息。必须将消息值设置为 null,以指示 Kafka 删除共享相同键的所有消息。为了实现这一点,默认情况下,Debezium 的 Oracle 连接器始终在删除事件后发送一个特殊的墓碑事件,该事件具有相同的键但值为 null。您可以通过设置连接器属性 tombstones.on.delete 来更改默认行为。

截断事件

截断更改事件表示表已被截断。在这种情况下,消息键为 null,消息值如下所示:

{
    "schema": { ... },
    "payload": {
        "before": null,
        "after": null,
        "source": { (1)
            "version": "3.3.1.Final",
            "connector": "oracle",
            "name": "oracle_server",
            "ts_ms": 1638974535000,
            "ts_us": 1638974535000000,
            "ts_ns": 1638974535000000000,
            "snapshot": "false",
            "db": "ORCLPDB1",
            "sequence": null,
            "schema": "DEBEZIUM",
            "table": "TEST_TABLE",
            "txId": "02000a0037030000",
            "scn": "13234397",
            "commit_scn": "13271102",
            "lcr_position": null,
            "rs_id": "001234.00012345.0124",
            "ssn": 1,
            "redo_thread": 1,
            "user_name": "user",
            "commit_ts_ms": 1638974538971,
            "start_scn": "13234396",
            "start_ts_ms": 1638974527000
        },
        "op": "t", (2)
        "ts_ms": 1638974558961, (3)
        "ts_us": 1638974558961987, (3)
        "ts_ns": 1638974558961987251, (3)
        "transaction": null
    }
}
表 9. 截断事件值字段说明
Item Field name (字段名) 描述

1

source (源)

描述事件源元数据的

必需字段。在截断事件值中,source 字段结构与

同一表的创建更新删除事件相同,提供了此元数据:

  • Debezium 版本

  • 连接器类型和名称

  • 包含新行的数据库和表。

  • 模式名称。

  • 如果事件是快照的一部分(截断事件始终为 false

  • 执行操作的事务 ID。

  • 操作的 SCN

  • 数据库中发生更改的时间戳

  • 执行更改的用户名

2

op (操作)

描述操作类型的

必需字符串。op 字段值为 t,表示此表被截断。

3

ts_ms, ts_us, ts_ns

可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。

source 对象中,ts_ms 指示数据库中发生更改的时间。通过比较 payload.source.ts_ms 的值和 payload.ts_ms 的值,您可以确定源数据库更新和 Debezium 之间的延迟。

如果单个 TRUNCATE 操作影响多个表,连接器会为每个被截断的表发出一个截断更改事件记录。

截断事件表示对整个表进行的更改,并且没有消息键。因此,对于具有多个分区的

主题,对于与表相关的更改事件(创建更新等)或截断事件,没有排序保证。例如,如果使用者从多个分区读取表的事件,它可能会在从另一个分区接收到删除表中所有数据的截断事件后,从一个分区接收到表的更新事件。仅保证使用单个分区的主题的排序。

如果您不希望连接器捕获截断事件,请使用 skipped.operations 选项将其过滤掉。

数据类型映射

当 Debezium Oracle 连接器检测到表行值发生更改时,它会发出一个表示该更改的更改事件。每个更改事件记录的结构与原始表相同,事件记录包含一个字段用于每个列值。表列的数据类型决定了连接器如何在更改事件字段中表示列的值,如下一节的表格所示。

对于表中的每个列,Debezium 会将源数据类型映射到字面类型,在某些情况下,还会映射到相应的事件字段中的语义类型

字面类型

描述值的字面表示方式,使用以下 Kafka Connect 模式类型之一:INT8INT16INT32INT64FLOAT32FLOAT64BOOLEANSTRINGBYTESARRAYMAPSTRUCT

语义类型

描述 Kafka Connect 模式如何通过使用字段的 Kafka Connect 模式名称来捕获字段的含义

如果默认数据类型转换不满足您的需求,您可以创建自定义转换器以供连接器使用。

对于某些 Oracle 大对象(CLOB、NCLOB 和 BLOB)和数字数据类型,您可以通过更改默认配置属性设置来修改连接器执行类型映射的方式。有关 Debezium 属性如何控制这些数据类型的映射的更多信息,请参阅 二进制和字符 LOB 类型数字类型

计划在后续版本中支持更多数据类型。如果您发现缺少任何特定类型,请提交一个 JIRA 问题

不支持的数据类型

Debezium Oracle 连接器不支持 Oracle 中使用的所有数据类型。连接器无法处理包含以下数据类型的列:

  • BFILE (二进制文件)

  • BOOLEAN。有关更多信息,请参阅 布尔类型

  • UPDATE 和 DELETE 语句的 FROM 子句中的直接连接

  • 数据用例域

  • 基于 JavaScript 的存储过程

  • LONG

  • LONG RAW

  • ROWID(在 XStream 上不支持)

  • 表值构造器 (TVC)

  • UROWID (通用 ROWID)

  • VECTOR

如果连接器尝试捕获包含具有这些不受支持数据类型之一的列的事件,它将跳过该事件,然后继续处理后续事件。连接器会继续处理配置为捕获的其他列和表,但来自不受支持列的数据将丢失。

字符类型

下表描述了连接器如何映射基本字符类型。

表 10. Oracle 基本字符类型映射
Oracle 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

CHAR[(M)]

STRING

n/a

NCHAR[(M)]

STRING

n/a

NVARCHAR2[(M)]

STRING

n/a

VARCHAR[(M)]

STRING

n/a

VARCHAR2[(M)]

STRING

n/a

二进制和字符 LOB 类型

下表描述了连接器如何映射二进制和字符大型对象 (LOB) 数据类型。

表 11. Oracle 二进制和字符 LOB 类型映射
Oracle 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

BFILE

n/a

此数据类型不受支持

BLOB

BYTES

根据连接器配置中 binary.handling.mode 属性的设置,连接器将此类型的 LOB 值映射到以下语义类型之一:

  • 原始字节(默认)

  • Base64 编码的字符串

  • Base64 URL 安全编码的字符串

  • 十六进制编码的字符串

CLOB

STRING

n/a

LONG

n/a

此数据类型不受支持。

LONG RAW

n/a

此数据类型不受支持。

NCLOB

STRING

n/a

RAW

n/a

根据连接器配置中 binary.handling.mode 属性的设置,连接器将此类型的 LOB 值映射到以下语义类型之一:

  • 原始字节(默认)

  • Base64 编码的字符串

  • Base64 URL 安全编码的字符串

  • 十六进制编码的字符串

Oracle 仅在 SQL 语句中显式设置或更改 CLOBNCLOBBLOB 数据类型时才提供列值。因此,更改事件永远不包含未更改的 CLOBNCLOBBLOB 列的值。相反,它们包含连接器属性 unavailable.value.placeholder 定义的占位符。

如果 CLOBNCLOBBLOB 列的值已更新,新值将放在相应更新更改事件的 after 元素中。before 元素包含不可用值占位符。

数字类型

下表描述了 Debezium Oracle 连接器如何映射数字类型。

您可以通过更改连接器的 decimal.handling.mode 配置属性的值来修改连接器映射 Oracle DECIMALNUMBERNUMERICREAL 数据类型的方式。当属性设置为默认值 precise 时,连接器会将这些 Oracle 数据类型映射到 Kafka Connect org.apache.kafka.connect.data.Decimal 逻辑类型,如表中所示。当属性值设置为 doublestring 时,连接器会对某些 Oracle 数据类型使用备用映射。有关更多信息,请参阅下表的语义类型和注意事项列。

表 12. Oracle 数字数据类型映射
Oracle 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

BINARY_FLOAT

FLOAT32

n/a

BINARY_DOUBLE

FLOAT64

n/a

DECIMAL[(P, S)]

BYTES / INT8 / INT16 / INT32 / INT64

如果使用 BYTES,则为 org.apache.kafka.connect.data.Decimal

等同于 NUMBER 处理(注意 DECIMAL 的 S 默认为 0)。

decimal.handling.mode 属性设置为 double 时,连接器会将 DECIMAL 值表示为 Java double 值,模式类型为 FLOAT64

decimal.handling.mode 属性设置为 string 时,连接器会将 DECIMAL 值表示为其格式化的字符串表示形式,模式类型为 STRING

DOUBLE PRECISION

STRUCT

io.debezium.data.VariableScaleDecimal

包含一个具有两个字段的结构:类型为 INT32scale 字段,包含传输值的精度;类型为 BYTESvalue 字段,包含原始值(未缩放形式)。

FLOAT[(P)]

STRUCT

io.debezium.data.VariableScaleDecimal

包含一个具有两个字段的结构:类型为 INT32scale 字段,包含传输值的精度;类型为 BYTESvalue 字段,包含原始值(未缩放形式)。

INTEGER, INT

BYTES

org.apache.kafka.connect.data.Decimal

Oracle 中的 INTEGER 被映射到 NUMBER(38,0),因此可以存储大于任何 INT 类型的值。

NUMBER[(P[, *])]

STRUCT

io.debezium.data.VariableScaleDecimal

包含一个具有两个字段的结构:类型为 INT32scale 字段,包含传输值的精度;类型为 BYTESvalue 字段,包含原始值(未缩放形式)。

decimal.handling.mode 属性设置为 double 时,连接器会将 NUMBER 值表示为 Java double 值,模式类型为 FLOAT64

decimal.handling.mode 属性设置为 string 时,连接器会将 NUMBER 值表示为其格式化的字符串表示形式,模式类型为 STRING

NUMBER(P, S <= 0)

INT8 / INT16 / INT32 / INT64

NUMBER 列的 SCL 为 0 表示整数。负 SCL 表示 Oracle 中的四舍五入,例如,SCL 为 -2 会四舍五入到百位。

根据精度和 SCL,将选择以下匹配的 Kafka Connect 整数类型之一:

  • P - S < 3, INT8

  • P - S < 5, INT16

  • P - S < 10, INT32

  • P - S < 19, INT64

  • P - S >= 19, BYTES (org.apache.kafka.connect.data.Decimal)

decimal.handling.mode 属性设置为 double 时,连接器会将 NUMBER 值表示为 Java double 值,模式类型为 FLOAT64

decimal.handling.mode 属性设置为 string 时,连接器会将 NUMBER 值表示为其格式化的字符串表示形式,模式类型为 STRING

NUMBER(P, S > 0)

BYTES

org.apache.kafka.connect.data.Decimal

NUMERIC[(P, S)]

BYTES / INT8 / INT16 / INT32 / INT64

如果使用 BYTES,则为 org.apache.kafka.connect.data.Decimal

等同于 NUMBER 处理(注意 NUMERIC 的 S 默认为 0)。

decimal.handling.mode 属性设置为 double 时,连接器会将 NUMERIC 值表示为 Java double 值,模式类型为 FLOAT64

decimal.handling.mode 属性设置为 string 时,连接器会将 NUMERIC 值表示为其格式化的字符串表示形式,模式类型为 STRING

SMALLINT

BYTES

org.apache.kafka.connect.data.Decimal

Oracle 中的 SMALLINT 被映射到 NUMBER(38,0),因此可以存储大于任何 INT 类型的值。

REAL

STRUCT

io.debezium.data.VariableScaleDecimal

包含一个具有两个字段的结构:类型为 INT32scale 字段,包含传输值的精度;类型为 BYTESvalue 字段,包含原始值(未缩放形式)。

decimal.handling.mode 属性设置为 double 时,连接器会将 REAL 值表示为 Java double 值,模式类型为 FLOAT64

decimal.handling.mode 属性设置为 string 时,连接器会将 REAL 值表示为其格式化的字符串表示形式,模式类型为 STRING

如上所述,Oracle 允许 NUMBER 类型使用负 SCL。这在转换为 Avro 格式时可能会出现问题,特别是当数字表示为 Decimal 时。Decimal 类型包含 SCL 信息,但 Avro 规范只允许 SCL 为正值。根据使用的模式注册表,这可能会导致 Avro 序列化失败。为了避免此问题,您可以使用 NumberToZeroScaleConverter,它将具有负 SCL 的足够大的数字 (P - S >= 19) 转换为 SCL 为零的 Decimal 类型。可以按如下方式配置:

converters=zero_scale
zero_scale.type=io.debezium.connector.oracle.converters.NumberToZeroScaleConverter
zero_scale.decimal.mode=precise

默认情况下,数字会转换为 Decimal 类型(zero_scale.decimal.mode=precise),但为完整起见,也支持其余两种受支持的类型(doublestring)。

布尔类型

从 Oracle 23 开始,数据库为 BOOLEAN 数据类型提供了原生支持。但是,Debezium Oracle 连接器不支持此类型。

早期版本的 Oracle 不包含对布尔类型的原生支持。在早期版本中,一些用户采用使用具有特定语义的其他数据类型来模拟逻辑 BOOLEAN 数据类型的概念。

为了使您能够将源列转换为布尔数据类型,Debezium 提供了一个 NumberOneToBooleanConverter 自定义转换器,您可以通过以下方式之一使用它:

  • 将所有 NUMBER(1) 列映射到 BOOLEAN 类型。

  • 使用逗号分隔的正则表达式列表枚举一组列。
    要使用此类型的转换,必须使用 selector 参数设置 converters 配置属性,如以下示例所示:

    converters=boolean
    boolean.type=io.debezium.connector.oracle.converters.NumberOneToBooleanConverter
    boolean.selector=.*MYTABLE.FLAG,.*.IS_ARCHIVED

时间类型

除了 Oracle INTERVALTIMESTAMP WITH TIME ZONETIMESTAMP WITH LOCAL TIME ZONE 数据类型之外,连接器转换时间类型的方式取决于 time.precision.mode 配置属性的值。

time.precision.mode 配置属性设置为 adaptive(默认)时,连接器根据列的数据类型定义来确定时间类型的字面类型和语义类型,以便事件精确地表示数据库中的值。

Oracle 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

DATE

INT64

io.debezium.time.Timestamp

表示自 UNIX 纪元以来的毫秒数,不包含时区信息。

INTERVAL DAY[(M)] TO SECOND

FLOAT64

io.debezium.time.MicroDuration

使用 365.25 / 12.0 公式计算的平均每月天数的时间间隔的微秒数。

(当 interval.handling.mode 设置为 string 时)io.debezium.time.Interval

间隔值的字符串表示形式,遵循 P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S 的模式,例如,P1Y2M3DT4H5M6.78S

INTERVAL YEAR[(M)] TO MONTH

FLOAT64

io.debezium.time.MicroDuration

使用 365.25 / 12.0 公式计算的平均每月天数的时间间隔的微秒数。

(当 interval.handling.mode 设置为 string 时)io.debezium.time.Interval

间隔值的字符串表示形式,遵循 P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S 的模式,例如,P1Y2M3DT4H5M6.78S

TIMESTAMP(0 - 3)

INT64

io.debezium.time.Timestamp

表示自 UNIX 纪元以来的毫秒数,不包含时区信息。

TIMESTAMP, TIMESTAMP(4 - 6)

INT64

io.debezium.time.MicroTimestamp

表示自 UNIX 纪元以来的微秒数,不包含时区信息。

TIMESTAMP(7 - 9)

INT64

io.debezium.time.NanoTimestamp

表示自 UNIX 纪元以来的纳秒数,不包含时区信息。

TIMESTAMP WITH TIME ZONE

STRING

io.debezium.time.ZonedTimestamp

带时区信息的时间戳的字符串表示。

TIMESTAMP WITH LOCAL TIME ZONE

STRING

io.debezium.time.ZonedTimestamp

UTC 时间戳的字符串表示。

time.precision.mode 配置属性设置为 connect 时,连接器将使用预定义的 Kafka Connect 逻辑类型。当使用者仅了解内置的 Kafka Connect 逻辑类型且无法处理可变精度时间值时,这可能很有用。由于 Oracle 支持的精度级别超过了 Kafka Connect 中逻辑类型支持的级别,如果您将 time.precision.mode 设置为 connect,则当数据库列的小数秒精度值大于 3 时,会丢失精度

Oracle 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

DATE

INT32

org.apache.kafka.connect.data.Date

表示自 UNIX 纪元以来的天数。

INTERVAL DAY[(M)] TO SECOND

FLOAT64

io.debezium.time.MicroDuration

使用 365.25 / 12.0 公式计算的平均每月天数的时间间隔的微秒数。

(当 interval.handling.mode 设置为 string 时)io.debezium.time.Interval

间隔值的字符串表示形式,遵循 P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S 的模式,例如,P1Y2M3DT4H5M6.78S

INTERVAL YEAR[(M)] TO MONTH

FLOAT64

io.debezium.time.MicroDuration

使用 365.25 / 12.0 公式计算的平均每月天数的时间间隔的微秒数。

(当 interval.handling.mode 设置为 string 时)io.debezium.time.Interval

间隔值的字符串表示形式,遵循 P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S 的模式,例如,P1Y2M3DT4H5M6.78S

TIMESTAMP(0 - 3)

INT64

org.apache.kafka.connect.data.Timestamp

表示自 UNIX 纪元以来的毫秒数,不包含时区信息。

TIMESTAMP(4 - 6)

INT64

org.apache.kafka.connect.data.Timestamp

表示自 UNIX 纪元以来的毫秒数,不包含时区信息。

TIMESTAMP(7 - 9)

INT64

org.apache.kafka.connect.data.Timestamp

表示自 UNIX 纪元以来的毫秒数,不包含时区信息。

TIMESTAMP WITH TIME ZONE

STRING

io.debezium.time.ZonedTimestamp

带时区信息的时间戳的字符串表示。

TIMESTAMP WITH LOCAL TIME ZONE

STRING

io.debezium.time.ZonedTimestamp

UTC 时间戳的字符串表示。

ROWID 类型

下表描述了连接器如何映射 ROWID(行地址)数据类型。

表 13. Oracle ROWID 数据类型映射
Oracle 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

ROWID

STRING

在使用 Oracle XStream 时,此数据类型不受支持。

UROWID

n/a

此数据类型不受支持.

向量类型

Debezium Oracle 连接器不支持 Oracle VECTOR 数据类型。

XML 类型

XMLTYPE 的支持目前处于孵化阶段。根据收到的反馈,确切的语义、配置选项等在未来的修订版中可能会发生变化。在使用这些数据类型时遇到任何问题,请告知我们。

下表描述了连接器如何映射 XMLTYPE 数据类型。

表 14. Oracle XMLTYPE 数据类型映射
Oracle 数据类型 字面类型 (模式类型) 语义类型 (模式名称) 和注释

XMLTYPE

STRING

io.debezium.data.Xml

用户定义类型

Oracle 允许您定义自定义数据类型,当内置数据类型不满足您的要求时,可以提供灵活性。有几种用户定义类型,如对象类型、REF 数据类型、Varray 和嵌套表。目前,您不能将 Debezium Oracle 连接器与任何这些用户定义类型一起使用。

Oracle 提供的类型

Oracle 提供基于 SQL 的接口,您可以使用它们来定义新类型,当内置或 ANSI 支持的类型不足时。Oracle 提供了几种常用的数据类型来满足各种用途,例如任何空间类型。目前,您不能将 Debezium Oracle 连接器与任何这些数据类型一起使用。

默认值

如果在数据库模式中为列指定了默认值,Oracle 连接器将尝试将该值传播到相应 Kafka 记录字段的模式。支持大多数常见数据类型,包括:

  • 字符类型(CHARNCHARVARCHARVARCHAR2NVARCHARNVARCHAR2

  • 数字类型(INTEGERNUMERIC 等)

  • 时间类型(DATETIMESTAMPINTERVAL 等)

如果时间类型使用函数调用(如 TO_TIMESTAMPTO_DATE)来表示默认值,连接器将通过额外的数据库调用来评估函数以解析默认值。例如,如果一个 DATE 列定义了默认值 TO_DATE('2021-01-02', 'YYYY-MM-DD'),该列的默认值将是自 UNIX 纪元以来该日期的天数,在本例中为 18629

如果时间类型使用 SYSDATE 常量来表示默认值,连接器将根据列定义为 NOT NULL 还是 NULL 来解析此值。如果列允许 null,则不设置默认值;但是,如果列不允许 null,则默认值将解析为 0(对于 DATETIMESTAMP(n) 数据类型)或 1970-01-01T00:00:00Z(对于 TIMESTAMP WITH TIME ZONETIMESTAMP WITH LOCAL TIME ZONE 数据类型)。默认值类型将是数字,除非列是 TIMESTAMP WITH TIME ZONETIMESTAMP WITH LOCAL TIME ZONE,在这种情况下,它将作为字符串发出。

自定义转换器

默认情况下,Debezium Oracle 连接器提供了一些特定于 Oracle 数据类型的 CustomConverter 实现。这些自定义转换器基于连接器配置为特定数据类型提供备用映射。要将 CustomConverter 添加到连接器,请遵循 自定义转换器文档中的说明。

Debezium Oracle 连接器提供以下自定义转换器:

NUMBER(1) 到布尔值

从 23 版本开始,Oracle 数据库提供 BOOLEAN 逻辑数据类型。在早期版本中,数据库通过使用 NUMBER(1) 数据类型来模拟 BOOLEAN 类型,其中 0 表示 false,1 表示 true。

默认情况下,当 Debezium 为使用 NUMBER(1) 数据类型的源列发出更改事件时,它会将数据转换为 INT8 字面类型。如果 NUMBER(1) 数据类型的默认映射不满足您的需求,您可以配置连接器,使其在发出这些列时使用逻辑 BOOL 类型,如以下示例所示配置 NumberOneToBooleanConverter

示例:NumberOneToBooleanConverter 配置
converters=number-to-boolean
number-to-boolean.type=io.debezium.connector.oracle.converters.NumberOneToBooleanConverter
number-to-boolean.selector=.*.MY_TABLE.DATA

在前面的示例中,selector 属性是可选的。selector 属性指定一个正则表达式,该表达式标识转换器适用的表或列。如果省略 selector 属性,则当 Debezium 发出事件时,每个具有 NUMBER(1) 数据类型的列都将转换为使用逻辑 BOOL 类型字段。

NUMBER 转零 SCL

Oracle 支持使用负 SCL 创建基于 NUMBER 的列,即 NUMBER(-2)。并非所有系统都能处理负 SCL 值,因此这些值可能会导致管道处理问题。例如,由于 Apache Avro 不支持这些值,因此如果 Debezium 将事件转换为 Avro 格式,则可能会出现问题。同样,不支持这些值的下游使用者也可能遇到错误。

配置示例
converters=number-zero-scale
number-zero-scale.type=io.debezium.connector.oracle.converters.NumberToZeroScaleConverter
number-zero-scale.decimal.mode=precise

在前面的示例中,decimal.mode 属性指定了连接器如何发出十进制值。此属性是可选的。如果省略 decimal.mode 属性,转换器将默认为使用 PRECISE 十进制处理模式。

RAW 转字符串

尽管 Oracle 不建议使用某些数据类型(如 RAW),但遗留系统可能仍会使用这些类型。默认情况下,Debezium 将 RAW 列类型作为逻辑 BYTES 发出,这是一种允许存储二进制或基于文本数据的类型。

在某些情况下,RAW 列可能将字符数据存储为一系列字节。为了方便使用者消费,您可以将 Debezium 配置为使用 RawToStringConverterRawToStringConverter 提供了一种简单地定位此类 RAW 列并将值作为字符串(而不是字节)发出的方法。以下示例显示了如何将 RawToStringConverter 添加到连接器配置中:

示例:RawToStringConverter 配置
converters=raw-to-string
raw-to-string.type=io.debezium.connector.oracle.converters.RawToStringConverter
raw-to-string.selector=.*.MY_TABLE.DATA

在前面的示例中,selector 属性使您能够定义一个正则表达式,该表达式指定转换器处理的表或列。如果省略 selector 属性,转换器会将所有 RAW 列类型映射到逻辑 STRING 字段类型。

设置 Oracle

设置 Oracle 以与 Debezium Oracle 连接器一起使用需要执行以下步骤。这些步骤假定使用了多租户配置,其中包含一个容器数据库和至少一个可插拔数据库。如果您不打算使用多租户配置,可能需要调整以下步骤。

有关使用 Vagrant 在虚拟机中设置 Oracle 的信息,请参阅 Debezium Oracle 数据库 Vagrant Box GitHub 仓库。

与 Oracle 安装类型的兼容性

Oracle 数据库可以安装为独立实例,也可以使用 Oracle Real Application Cluster (RAC)。Debezium Oracle 连接器与这两种安装类型兼容。

捕获更改事件时排除的模式

当 Debezium Oracle 连接器捕获表时,它会自动排除以下模式下的表:

  • appqossys

  • audsys

  • ctxsys

  • dvsys

  • dbsfwuser

  • dbsnmp

  • qsmadmin_internal

  • lbacsys

  • mdsys

  • ojvmsys

  • olapsys

  • orddata

  • ordsys

  • outln

  • sys

  • system

  • vecsys (Oracle 23+)

  • wmsys

  • xdb

要使连接器能够捕获表的更改,表必须使用不在上述列表中的模式。

捕获时排除的表

当 Debezium Oracle 连接器捕获表时,它会自动排除符合以下规则的表:

  • 匹配模式 CMP[3|4]$[0-9]+ 的压缩顾问表。

  • 匹配模式 SYS_IOT_OVER_% 的索引组织表。

  • 匹配模式 MDRT_%MDRS_%MDXT_% 的空间表。

  • 嵌套表

要使连接器能够捕获名称符合上述任何规则的表,您必须重命名该表。

准备数据库

Oracle LogMiner 所需的配置
ORACLE_SID=ORACLCDB dbz_oracle sqlplus /nolog

CONNECT sys/top_secret AS SYSDBA
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- Should now "Database log mode: Archive Mode"
archive log list

exit;

Oracle AWS RDS 不允许您执行上述命令,也不允许您以 sysdba 身份登录。AWS 提供了以下命令来配置 LogMiner。在执行这些命令之前,请确保您的 Oracle AWS RDS 实例已启用备份。

要确认 Oracle 已启用备份,请先执行以下命令。LOG_MODE 应显示 ARCHIVELOG。如果未显示,您可能需要重启 Oracle AWS RDS 实例。

Oracle AWS RDS LogMiner 所需的配置
SQL> SELECT LOG_MODE FROM V$DATABASE;

LOG_MODE
------------
ARCHIVELOG

一旦 LOG_MODE 设置为 ARCHIVELOG,请执行命令来完成 LogMiner 配置。第一个命令将数据库设置为 archivelogs,第二个命令添加了补充日志。

Oracle AWS RDS LogMiner 所需的配置
exec rdsadmin.rdsadmin_util.set_configuration('archivelog retention hours',24);

exec rdsadmin.rdsadmin_util.alter_supplemental_logging('ADD');

要使 Debezium 能够捕获已更改数据库行的之前状态,您还必须为要捕获的表或整个数据库启用补充日志。以下示例说明了如何为 inventory.customers 单个表的所有列配置补充日志。

ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

为所有表列启用补充日志会增加 Oracle 重做日志的量。为防止日志大小过度增长,请选择性地应用上述配置。

必须在数据库级别启用最小补充日志,并可按如下方式配置:

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

重做日志大小

根据数据库配置,重做日志的大小和数量可能不足以实现可接受的性能。在设置 Debezium Oracle 连接器之前,请确保重做日志的容量足以支持数据库。

数据库的重做日志容量必须足以存储其数据字典。通常,数据字典的大小会随着数据库中表和列数量的增加而增加。如果重做日志容量不足,数据库和 Debezium 连接器都可能出现性能问题。

请咨询您的数据库管理员,评估数据库是否需要增加日志容量。

归档日志目标

Oracle 数据库管理员最多可以配置 31 个不同的归档日志目标。管理员可以为每个目标设置参数,将其指定用于特定用途,例如,用于物理备用数据库的日志传输,或用于外部存储以实现日志的长期保留。Oracle 在 V$ARCHIVE_DEST_STATUS 视图中报告有关归档日志目标的信息。

Debezium Oracle 连接器仅使用状态为 VALID 且类型为 LOCAL 的目标。如果您的 Oracle 环境包含多个满足这些条件的目标,请咨询您的 Oracle 管理员,确定 Debezium 应使用哪个归档日志目标。

过程
  • 要指定 Debezium 要使用的归档日志目标,请在连接器配置中设置 archive.destination.name 属性。

    例如,假设一个数据库配置了两个归档目标路径:/path/one/path/two,并且 V$ARCHIVE_DEST_STATUS 表将这些路径与 DEST_NAME 列中指定的目标名称相关联。如果两个目标都满足 Debezium 的条件——即,如果它们的 statusVALIDtypeLOCAL——要将连接器配置为使用数据库写入 /path/two 的归档日志,请将 archive.destination.name 的值设置为 V$ARCHIVE_DEST_STATUS 表中与 /path/two 关联的 DEST_NAME 列中的值。例如,如果 DEST_NAME 对于 /path/twoLOG_ARCHIVE_DEST_3,则您将按如下方式配置 Debezium:

{
  "archive.destination.name": "LOG_ARCHIVE_DEST_3"
}

请勿将 archive.destination.name 的值设置为数据库用于归档日志的路径。应将属性设置为满足您的归档日志保留策略的 V$ARCHIVE_DEST_STATUS 表中某行的 DEST_NAME 列中的归档日志目标名称。

如果您的 Oracle 环境包含多个满足这些条件的目标,而您未能指定首选目标,Debezium Oracle 连接器将随机选择目标路径。由于每个目标配置的保留策略可能不同,如果连接器选择了一个已删除请求日志数据的路径,这可能会导致错误。

当 Debezium Oracle 连接器在备用数据库使用与主实例不同的归档目标名称的环境中使用时,archive.destination.name 属性可以包含一个逗号分隔的目标名称列表。值应按首选项排序,因为 Debezium 连接器将使用第一个 LOCALVALID 的目标。这在发生故障转移场景时,主数据库切换到备用数据库时,有助于最小化所需的重新配置。

例如,如果主数据库的目标是 LOG_ARCHIVE_DEST_2,备用数据库的目标是 LOG_ARCHIVE_DEST_5,则将 archive.destination.name 属性设置为 LOG_ARCHIVE_DEST_2,LOG_ARCHIVE_DEST_5。这将优先使用目标 #2,它将在主数据库上使用;目标 #5 将在备用数据库上使用。

如果 CDC 的目标对于主实例和备用实例都是相同的,使用单个值就足够了。事实上,只有当任一实例有多个 LOCALVALID 的归档目标时,才需要此属性。

为连接器创建用户帐户

为了让 Debezium Oracle 连接器捕获更改事件,它必须作为一个具有特定权限的 Oracle LogMiner 用户运行。

通常,在创建连接器的 Oracle 帐户时,您会授予该帐户访问权限,允许连接器检测数据库中所有表的更改。但是,在某些环境中,安全策略可能会禁止您授予如此广泛的访问权限。

以下示例显示了在多租户数据库模型中为连接器创建 Oracle 用户帐户的 SQL。示例中的 grant 设置允许 Debezium 用户访问数据库中的所有用户表。

为了符合安全策略,您可以修改 SELECT ANY TABLEFLASHBACK ANY TABLE 的 grant,以便连接器只能访问您打算捕获的表。

请勿修改其他 grant,例如 SELECT ANY TRANSACTION grant,或 SELECT ON V_$ grant 集合,它们提供对动态性能视图 (V$) 的访问。连接器正常运行需要这些 grant。

为防止数据丢失,如果您限制了 SELECTFLASHBACK grant 的范围,请确保修改后的范围与连接器的 include 配置设置兼容。您为帐户设置的权限必须允许读取连接器要捕获的所有表。

示例 3. 创建连接器的 LogMiner 用户
sqlplus sys/top_secret@//:1521/ORCLCDB as sysdba
  CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//:1521/ORCLPDB1 as sysdba
  CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//:1521/ORCLCDB as sysdba

  CREATE USER c##dbzuser IDENTIFIED BY dbz
    DEFAULT TABLESPACE logminer_tbs
    QUOTA UNLIMITED ON logminer_tbs
    CONTAINER=ALL;

  GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL; (1)
  GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL; (2)
  GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL; (3)
  GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL; (4)
  GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL; (5)
  GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL; (6)
  GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL; (7)
  GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL; (8)
  GRANT LOGMINING TO c##dbzuser CONTAINER=ALL; (9)

  GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL; (10)
  GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL; (11)
  GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL; (12)

  GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL; (13)
  GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL; (14)

  GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL; (15)
  GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL; (16)
  GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL; (17)
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL; (18)
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL; (19)
  GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL; (20)
  GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL; (21)
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL; (22)
  GRANT SELECT ON V_$TRANSACTION TO c##dbzuser CONTAINER=ALL; (23)

  GRANT SELECT ON V_$MYSTAT TO c##dbzuser CONTAINER=ALL; (24)
  GRANT SELECT ON V_$STATNAME TO c##dbzuser CONTAINER=ALL; (25)

  exit;
表 15. 权限/grant 说明
Item 角色名称 描述

1

CREATE SESSION

使连接器能够连接到 Oracle。

2

SET CONTAINER

使连接器能够在可插拔数据库之间切换。仅当 Oracle 安装启用了容器数据库支持 (CDB) 时才需要此设置。

3

SELECT ON V_$DATABASE

使连接器能够读取 V$DATABASE 表。

4

FLASHBACK ANY TABLE

使连接器能够执行 Flashback 查询,这是连接器执行数据初始快照的方式。或者,您可以仅为特定表授予 FLASHBACK 权限,而不是为所有表授予 FLASHBACK 权限。

5

SELECT ANY TABLE

使连接器能够读取任何表。或者,您可以仅为特定表授予 SELECT 权限,而不是为所有表授予 SELECT 权限。

6

SELECT_CATALOG_ROLE

使连接器能够读取数据字典,这对于 Oracle LogMiner 会话是必需的。

7

EXECUTE_CATALOG_ROLE

使连接器能够将数据字典写入 Oracle 重做日志,这对于跟踪模式更改是必需的。

8

SELECT ANY TRANSACTION

使快照过程能够对任何事务执行 Flashback 快照查询,以便连接器可以从 LogMiner 读取过去的更改。当授予 FLASHBACK ANY TABLE 时,也应授予此权限。此 grant 对于 Oracle 12c 及更高版本是可选的。

在这些较新版本中,连接器通过 EXECUTE_CATALOG_ROLELOGMINING grant 获取所需权限。

9

LOGMINING

此角色在较新版本的 Oracle 中被添加,作为授予对 Oracle LogMiner 及其包的完全访问权限的一种方式。对于没有此角色的较旧版本的 Oracle,您可以忽略此 grant。

10

CREATE TABLE

使连接器能够在其默认表空间中创建其刷新表。刷新表允许连接器显式控制 LGWR 内部缓冲区刷新到磁盘。

11

LOCK ANY TABLE

使连接器在模式快照期间能够锁定表。如果通过配置显式禁用快照锁,则可以安全地忽略此 grant。

12

CREATE SEQUENCE

使连接器能够在其默认表空间中创建序列。

13

EXECUTE ON DBMS_LOGMNR

使连接器能够运行 DBMS_LOGMNR 包中的方法。这对于与 Oracle LogMiner 交互是必需的。在较新版本的 Oracle 中,这通过 LOGMINING 角色授予,但在较旧版本中,必须显式授予。

14

EXECUTE ON DBMS_LOGMNR_D

使连接器能够运行 DBMS_LOGMNR_D 包中的方法。这对于与 Oracle LogMiner 交互是必需的。在较新版本的 Oracle 中,这通过 LOGMINING 角色授予,但在较旧版本中,必须显式授予。

15 到 25

SELECT ON V_$…​。

使连接器能够读取这些表。连接器必须能够读取有关 Oracle 重做和归档日志以及当前事务状态的信息,才能准备 Oracle LogMiner 会话。没有这些 grant,连接器将无法运行。

备用数据库

备用数据库提供主实例的同步副本。在主数据库发生故障时,备用数据库可提供持续可用性和灾难恢复。Oracle 使用物理和逻辑备用数据库。

物理备用

物理备用数据库是主生产数据库的精确、逐块复制副本,其系统更改号 (SCN) 值与主数据库相同。Debezium Oracle 连接器无法直接从物理备用数据库捕获更改事件,因为物理备用数据库不接受外部连接。只有在备用数据库转换为主数据库后,连接器才能从物理备用数据库捕获事件。然后,连接器像连接任何主数据库一样连接到前备用数据库。

逻辑备用

逻辑备用数据库包含与主数据库相同的数据,但数据可能以不同的物理方式存储。逻辑备用数据库中的 SCN 偏移量与主数据库中的偏移量不同。您可以配置 Debezium Oracle 连接器以从逻辑备用数据库捕获事件

故障转移数据库

在设置故障转移数据库时,最佳实践通常是使用物理备用数据库而不是逻辑备用数据库。物理备用数据库比逻辑备用数据库与主数据库保持更一致的状态。物理备用数据库包含主数据的精确副本,备用数据库的系统更改号 (SCN) 值与主数据库的 SCN 值相同。在 Debezium 环境中,数据库故障转移到物理备用数据库后,一致的 SCN 值可确保连接器能够找到最后处理的 SCN 值。

物理备用数据库处于只读模式,并运行托管恢复以保持同步。当数据库处于备用模式时,它不接受来自客户端的外部 JDBC 连接,外部应用程序也无法访问它。

在发生故障事件后,为了让 Debezium 连接到前物理备用数据库,DBA 必须执行多项操作才能启用故障转移到备用数据库,并将其提升为主数据库。以下列表确定了一些关键操作:

  • 取消备用数据库上的托管恢复。

  • 完成活动恢复过程。

  • 将备用数据库转换为主角色。

  • 将新主数据库打开以进行客户端读写操作。

在原物理备用数据库可供正常使用后,您可以配置 Debezium Oracle 连接器连接到它。要使连接器能够从新主数据库捕获,请编辑连接器配置中的数据库主机名,将原始主数据库的主机名替换为新主数据库的主机名。

配置 Debezium Oracle 连接器以从逻辑备用数据库捕获事件

当 Debezium Oracle 连接器连接到主数据库时,它使用内部刷新表来管理 Oracle Log Writer Buffer (LGWR) 进程的刷新周期。刷新过程要求连接器访问数据库的用户帐户具有创建和写入此刷新表的权限。但是,逻辑备用数据库通常允许只读访问,从而阻止连接器写入数据库。您可以修改连接器配置以允许连接器从逻辑备用数据库捕获事件,或者 DBA 可以创建一个新的可写表空间,连接器可以在其中存储刷新表。

使用逻辑备用数据库的连接器处于孵化状态,并且可能会在不另行通知的情况下进行更改。有一个打开的 Jira 问题正在调查支持从物理备用数据库捕获更改。

过程
  • 要使 Debezium 能够从 Oracle 只读逻辑备用数据库捕获事件,请将以下属性添加到连接器配置中,以禁用刷新表的创建和管理:

    internal.log.mining.read.only=true

    上述设置可阻止数据库创建和更新 LOG_MINING_FLUSH 表。您可以在 Oracle Standalone 数据库或 Oracle RAC 安装中使用 internal.log.mining.read.only 属性。

扩展的最大字符串大小

数据库参数 max_string_size 控制 Oracle 数据库以及 Debezium Oracle 连接器如何解释 VARCHAR2NVARCHAR2RAW 字段的值。默认值 STANDARD 意味着这些数据类型的长度与 Oracle 12c 之前的版本(VARCHAR2NVARCHAR2 为 4000 字节,RAW 为 2000 字节)保持一致。当配置为 EXTENDED 时,这些列现在允许存储最多 32767 字节的数据。

虽然数据库管理员可以将 max_string_sizeSTANDARD 更改为 EXTENDED,但不能反向操作。一旦数据库更新到 EXTENDED 字符串支持,就无法撤销。

对于 Debezium Oracle 连接器,当数据库参数 max_string_sizeEXTENDED 时,应将 lob.enabled 连接器配置选项设置为 true,以捕获对字符串长度超过 4000 字节的 VARCHAR2NVARCHAR2 字段或字节数超过 2000 字节的 RAW 字段所做的更改。

当设置为 EXTENDED 时,Oracle 会在字符串数据字节长度超过旧最大值时,在飞行中隐式转换字符数据。这种隐式转换意味着 Oracle 内部将字符串数据视为 CLOB,因此您获得了将其视为普通字符串的好处,但同时也有数据库层面的存储的所有陷阱和担忧。

由于 Oracle 在内部将这些字符串视为 CLOB,重做日志也反映了 Debezium Oracle 连接器需要知道并应该挖掘的几种独特操作类型。并且由于这些操作类型非常类似于 CLOB 操作,因此重做日志条目必须像任何其他 LOB 类型一样进行挖掘,这就是为什么 lob.enabled 必须设置为 true 才能捕获重做日志中的更改,而无论字符串数据的字节长度如何。

当 Oracle 配置为使用 EXTENDED 字符串大小时,LogMiner 有时无法在重新构造扩展字符串字段的 SQL 时转义单引号(')。如果扩展字符串字段的字节长度未超过旧的最大长度,则可能会发生此问题。因此,这些字段中的值可能会被截断,导致无效的 SQL 语句,Oracle 连接器无法解析。

为了帮助解决部分问题,您可以配置连接器以通过将以下属性设置为 true 来放宽单引号检测:

internal.log.mining.sql.relaxed.quote.detection

请注意,此内部设置的使用目前不是受支持的功能。
有关更多信息,请参阅 DBZ-8034

部署

要部署 Debezium Oracle 连接器,请安装 Debezium Oracle 连接器存档,配置连接器,并通过将其配置添加到 Kafka Connect 来启动连接器。

先决条件
过程
  1. 下载 Debezium Oracle 连接器插件存档

  2. 将文件解压到您的 Kafka Connect 环境。

  3. 从 Maven Central 下载 Oracle 的 JDBC 驱动程序,并将下载的驱动程序文件解压到包含 Debezium Oracle 连接器 JAR 文件的目录。

    如果您将 Debezium Oracle 连接器与 Oracle XStream 一起使用,请将 JDBC 驱动程序作为 Oracle Instant Client 包的一部分获取。有关更多信息,请参阅 获取 Oracle JDBC 驱动程序和 XStreams API 文件
  4. 从 Maven Central 下载 Oracle 的 XDB 库,并将下载的文件解压到包含 Debezium Oracle 连接器 JAR 文件的目录。

  5. 将包含 JAR 文件的目录添加到Kafka Connect 的 plugin.path

  6. 重新启动您的 Kafka Connect 进程以加载新 JAR 文件。

Debezium Oracle 连接器配置

通常,您通过提交一个 JSON 请求来注册 Debezium Oracle 连接器,该请求指定连接器的配置属性。以下示例显示了一个用于向端口 1521 的逻辑名称 server1 注册 Debezium Oracle 连接器实例的 JSON 请求:

您可以选择为数据库中的模式和表的子集生成事件。可选地,您可以忽略、掩盖或截断包含敏感数据、大于指定大小或您不需要的列。

示例:Debezium Oracle 连接器配置
{
    "name": "inventory-connector",  (1)
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",  (2)
        "database.hostname" : "<ORACLE_IP_ADDRESS>",  (3)
        "database.port" : "1521",  (4)
        "database.user" : "c##dbzuser",  (5)
        "database.password" : "dbz",   (6)
        "database.dbname" : "ORCLCDB",  (7)
        "topic.prefix" : "server1",  (8)
        "tasks.max" : "1",  (9)
        "database.pdb.name" : "ORCLPDB1",  (10)
        "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092", (11)
        "schema.history.internal.kafka.topic": "schema-changes.inventory"  (12)
    }
}
1 在向 Kafka Connect 服务注册连接器时分配给它的名称。尝试使用相同的名称再次注册将失败。(此属性是所有 Kafka Connect 连接器所必需的。)
2 连接器对应的 Java 类名称。对于 Oracle 连接器,始终使用 io.debezium.connector.oracle.OracleConnector 的值。
3 Oracle 数据库服务器的 IP 地址或主机名。
4 Oracle 数据库服务器的整数端口号。
5 连接器用于连接 Oracle 数据库服务器的 Oracle 用户帐户的名称。有关详细信息,请参阅 创建连接器用户
6 连接到 Oracle 数据库服务器时使用的密码。有关详细信息,请参阅 创建连接器用户
7 要从中捕获更改的数据库名称。
8 主题前缀,用于标识和提供连接器从中捕获更改的 Oracle 数据库服务器的命名空间。
9 为此连接器创建的最大任务数。
10 连接器从中捕获更改的 Oracle 可插拔数据库的名称。仅在容器数据库 (CDB) 安装中使用。
11 此连接器用于读取和恢复 DDL 语句到数据库模式历史主题的 Kafka 代理列表。
12 连接器读取和恢复 DDL 语句的数据库模式历史主题的名称。此主题仅供内部使用,使用者不应使用。

在前面的示例中,database.hostnamedatabase.port 属性用于定义到数据库主机的连接。但是,在更复杂的 Oracle 部署或使用 Transparent Network Substrate (TNS) 名称的部署中,您可以使用一种替代方法,即指定 JDBC URL。

以下 JSON 示例显示了与前面示例相同的配置,但它使用 JDBC URL 连接到数据库。

示例:使用 JDBC URL 连接数据库的 Debezium Oracle 连接器配置
{
    "name": "inventory-connector",
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",
        "tasks.max" : "1",
        "topic.prefix" : "server1",
        "database.user" : "c##dbzuser",
        "database.password" : "dbz",
        "database.url": "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(LOAD_BALANCE=OFF)(FAILOVER=ON)(ADDRESS=(PROTOCOL=TCP)(HOST=<oracle ip 1>)(PORT=1521))(ADDRESS=(PROTOCOL=TCP)(HOST=<oracle ip 2>)(PORT=1521)))(CONNECT_DATA=SERVICE_NAME=)(SERVER=DEDICATED)))",
        "database.dbname" : "ORCLCDB",
        "database.pdb.name" : "ORCLPDB1",
        "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory"
    }
}

有关可为 Debezium Oracle 连接器设置的完整配置属性列表,请参阅 Oracle 连接器属性

您可以将此配置与 POST 命令一起发送到正在运行的 Kafka Connect 服务。该服务会记录配置并启动一个连接器任务,该任务执行以下操作:

  • 连接到 Oracle 数据库。

  • 读取重做日志。

  • 将更改事件记录到 Kafka 主题。

添加连接器配置

要开始运行 Debezium Oracle 连接器,请创建连接器配置,并将配置添加到您的 Kafka Connect 集群。

先决条件
过程
  1. 创建 Oracle 连接器的配置

  2. 使用Kafka Connect REST API 将该连接器配置添加到您的 Kafka Connect 群集。

结果

连接器启动后,它将对连接器配置的 Oracle 数据库执行一致性快照。然后,连接器开始为行级操作生成数据更改事件,并将更改事件记录流式传输到 Kafka 主题。

可插拔与非可插拔数据库

Oracle Database 支持以下部署类型:

容器数据库 (CDB)

一个可以包含多个可插拔数据库 (PDB) 的数据库。数据库客户端连接到每个 PDB,就像连接到标准非 CDB 数据库一样。

非容器数据库 (non-CDB)

标准的 Oracle 数据库,不支持创建可插拔数据库。

示例:CDB 部署的 Debezium 连接器配置
{
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",
        "tasks.max" : "1",
        "topic.prefix" : "server1",
        "database.hostname" : "<oracle ip>",
        "database.port" : "1521",
        "database.user" : "c##dbzuser",
        "database.password" : "dbz",
        "database.dbname" : "ORCLCDB",
        "database.pdb.name" : "ORCLPDB1",
        "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory"
    }
}

当您为 Oracle CDB 配置 Debezium Oracle 连接器时,必须为 database.pdb.name 属性指定一个值,该属性命名您希望连接器从中捕获更改的 PDB。对于非 CDB 安装,请*不要*指定 database.pdb.name 属性。

示例:非 CDB 部署的 Debezium Oracle 连接器配置
{
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",
        "tasks.max" : "1",
        "topic.prefix" : "server1",
        "database.hostname" : "<oracle ip>",
        "database.port" : "1521",
        "database.user" : "c##dbzuser",
        "database.password" : "dbz",
        "database.dbname" : "ORCLCDB",
        "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory"
    }
}

使用 mTLS 安全连接

使用 mTLS 身份验证连接到 Oracle 时,需要连接器和数据库服务都证明其身份。Debezium for Oracle 连接器依赖 Oracle JDBC 驱动程序的内置功能来支持 mTLS 身份验证。

您可以使用以下任一方法在 Debezium 和 Oracle 之间建立 mTLS 连接:

Java 密钥/信任库

先决条件
  • 验证连接器是否可以访问配置的密钥库和信任库文件。

  • 验证数据库 TNS(透明网络底层)监听器是否支持 TCPS 安全连接。

过程
  • 配置 Debezium Oracle 连接器,如下例所示。

    示例:Debezium Oracle 连接器 TLS 配置
    {
      "database.url": "jdbc:oracle:thin@(DESCRIPTION=(ADDRESS=(PROTOCOL=tcps)(HOST=<host>)(PORT=<port>))(CONNECT_DATA(SERVICE_NAME=<service>)))",
      "driver.javax.net.ssl.keyStore": "<path-to-jks-keystore>",
      "driver.javax.net.ssl.keyStorePassword": "<keystore-password>",
      "driver.javax.net.ssl.keyStoreType": "JKS",
      "driver.javax.net.ssl.trustStore": "<path-to-jks-truststore>",
      "driver.javax.net.ssl.trustStorePassword": "<truststore-password>",
      "driver.javax.net.ssl.trustStoreType": "JKS"
    }

    为了让 Debezium 使用 TLS 加密与 Oracle 通信,需要与服务器建立 TCPS 连接。要建立 TCPS 连接,您必须将连接器配置为使用 database.url 属性,而不是 database.host 属性。与 database.host 属性不同,database.url 属性允许您定义一个显式要求使用 TCPS 协议的 Oracle TNS(透明网络底层)连接字符串。

Oracle Wallet

先决条件
  • 与您的数据库管理员确认 Oracle Wallet 已在 Oracle 数据库服务器上配置。

  • 在 Oracle JDBC 驱动程序存档中找到 oraclepki.jar

过程
  • oraclepki.jar 安装在包含 Debezium Oracle 连接器 JAR 文件的位置。这是您安装 Oracle JDBC 驱动程序的相同位置。

  • 在连接器配置中,设置 database.url 属性,而不是 database.hostname 属性,以指定 Debezium 如何连接到数据库。database.url 属性定义了一个与 Oracle Wallet 交互所需的基于 Oracle TNS 的连接字符串。TNS 连接字符串的示例,请参阅此列表后的 mTLS 配置示例。

  • 设置 Oracle JDBC 驱动程序属性 oracle.net.wallet_location 以明确设置 Oracle JDBC 驱动程序使用的 Oracle Wallet 配置。

示例 mTLS 配置
{
  "database.url": "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCPS)(PORT=xxxx)(HOST=xx.xx.xx.xx))(CONNECT_DATA=(SID=xxxx)))",
  "driver.oracle.net.wallet_location": "(SOURCE=(METHOD=file)(METHOD_DATA=(DIRECTORY=/opt/kafka/external-configuration/oracle_wallet/)))"
}

请确保在 database.url 中设置了正确的宿主、端口和服务标识符 (sid)。此外,请确保 driver.oracle.net.wallet_location 中的目录是可读的。

连接器属性

Debezium Oracle 连接器有许多配置属性,您可以使用它们来实现适合您应用程序的连接器行为。许多属性都有默认值。属性信息组织如下:

必需的 Debezium Oracle 连接器配置属性

以下配置属性是必需的,除非有默认值可用。

属性

Default (默认值)

描述

无默认值

连接器的唯一名称。尝试使用相同的名称再次注册将失败。(此属性是所有 Kafka Connect 连接器所必需的。)

无默认值

连接器的 Java 类名称。对于 Oracle 连接器,始终使用 io.debezium.connector.oracle.OracleConnector 的值。

无默认值

枚举连接器可以使用的一系列自定义转换器的符号名称,用逗号分隔。
例如,boolean
需要此属性才能使连接器能够使用自定义转换器。

对于为连接器配置的每个转换器,您还必须添加一个 .type 属性,该属性指定实现转换器接口的类的完全限定名称。.type 属性使用以下格式:

<converterSymbolicName>.type

For example, (例如,)

boolean.type: io.debezium.connector.oracle.converters.NumberOneToBooleanConverter

如果要进一步控制已配置转换器的行为,可以添加一个或多个配置参数来将值传递给转换器。要将任何附加配置参数与转换器关联,请在参数名称前加上转换器的符号名称。

例如,要定义一个 selector 参数来指定 boolean 转换器处理的列子集,请添加以下属性:

boolean.selector: .*MYTABLE.FLAG,.*.IS_ARCHIVED

1

为此连接器创建的最大任务数。Oracle 连接器始终使用单个任务,因此不使用此值,因此默认值始终是可接受的。

无默认值

Oracle 数据库服务器的 IP 地址或主机名。

无默认值

Oracle 数据库服务器的整数端口号。

无默认值

连接器用于连接 Oracle 数据库服务器的 Oracle 用户帐户的名称。

无默认值

连接到 Oracle 数据库服务器时使用的密码。

无默认值

要连接的数据库名称。在容器数据库环境中,请指定根容器数据库 (CDB) 的名称,而不是包含的可插拔数据库 (PDB) 的名称。

无默认值

指定原始数据库 JDBC URL。使用此属性可以灵活地定义数据库连接。有效值包括原始 TNS 名称和 RAC 连接字符串。

无默认值

要连接的 Oracle 可插拔数据库的名称。仅在容器数据库 (CDB) 安装中使用此属性。

无默认值

主题前缀,为连接器从中捕获更改的 Oracle 数据库服务器提供命名空间。您设置的值将用作连接器发出的所有 Kafka 主题名称的前缀。为您的 Debezium 环境中的所有连接器指定一个唯一的主题前缀。有效字符包括:字母数字字符、连字符、点和下划线。

请勿更改此属性的值。如果您更改名称值,重新启动后,连接器将不会继续将事件发送到原始主题,而是会将后续事件发送到名称基于新值的那些主题。连接器也无法恢复其数据库模式历史主题。

logminer

连接器在流式传输数据库更改时使用的适配器实现。您可以设置以下值:

logminer(默认)

连接器使用原生的 Oracle LogMiner API 和连接器级别的缓冲。

logminer_unbuffered

连接器使用原生的 Oracle LogMiner API 和数据库级别的缓冲。

olr

连接器使用 OpenLogReplicator。

xstream

连接器使用 Oracle XStream API。

initial (初始)

指定连接器用于捕获表快照的模式。您可以设置以下值:

always (始终)

快照包括捕获表的结构和数据。指定此值以在每次连接器启动时,使用捕获表中数据的完整表示形式填充主题。

initial (初始)

快照包括捕获表的结构和数据。指定此值以使用捕获表中数据的完整表示形式填充主题。如果快照成功完成,下次连接器启动时将不再执行快照。

initial_only (仅初始)

快照包括捕获表的结构和数据。连接器执行初始快照,然后停止,而不处理任何后续更改。

schema_only (仅模式)

已弃用,请参阅 no_data

no_data (无数据)

快照仅包括捕获表的结构。如果您希望连接器仅捕获快照后发生的更改的数据,请指定此值。

schema_only_recovery (仅模式恢复)

Deprecated, see recovery. (已弃用,请参阅 recovery。)

recovery (恢复)

这是已在捕获更改的连接器的恢复设置。当您重新启动连接器时,此设置可以恢复损坏或丢失的数据库模式历史主题。您可以定期设置它以“清理”一个意外增长的数据库模式历史主题。数据库模式历史主题需要无限保留。注意,只有在保证从连接器关闭之前的时间点到拍摄快照的时间点之间没有发生任何模式更改时,此模式才是安全的。

快照完成后,连接器将继续从数据库的重做日志读取更改事件,除非 snapshot.mode 配置为 initial_only

when_needed (需要时)

After the connector starts, it performs a snapshot only if it detects one of the following circumstances (连接器启动后,仅当检测到以下任一情况时,它才会执行快照:)

  • It cannot detect any topic offsets. (无法检测到任何主题偏移量。)

  • A previously recorded offset specifies a log position that is not available on the server. (先前记录的偏移量指定了一个服务器上不可用的日志位置。)

configuration_based (基于配置)

使用此选项,您可以通过一组以 'snapshot.mode.configuration.based' 前缀开头的连接器属性来控制快照行为。

custom (自定义)

连接器根据 snapshot.mode.custom.name 属性指定的实现执行快照,该属性定义了 io.debezium.spi.snapshot.Snapshotter 接口的自定义实现。

有关更多信息,请参阅 snapshot.mode 选项表

false

如果 snapshot.mode 设置为 configuration_based,请设置此属性以指定连接器在执行快照时是否包含表数据。

false

如果 snapshot.mode 设置为 configuration_based,请设置此属性以指定连接器在执行快照时是否包含表模式。

false

如果 snapshot.mode 设置为 configuration_based,请设置此属性以指定快照完成后连接器是否开始流式传输更改事件。

false

如果 snapshot.mode 设置为 configuration_based,请设置此属性以指定在模式历史记录主题不可用时,连接器是否在快照中包含表模式。

false

如果 snapshot.mode «设置» «为» configuration_based,「此» «属性» «指定» «是否» «在» «事务» «日志» «中» «未» «找到» «最后» «提交» «的» «偏移量» «时»,«连接器» «尝试» «快照» «表» «数据»。
将 «值» «设置» «为» true «以» «指示» «连接器» «执行» «新» «快照»。

无默认值

如果 snapshot.mode 设置为 custom,请使用此设置指定 'io.debezium.spi.snapshot.Snapshotter' 接口中定义的 name() 方法提供的自定义实现名称。连接器重启后,Debezium 会调用指定的自定义实现来确定是否执行快照。有关更多信息,请参阅自定义快照程序 SPI

shared

控制连接器持有表锁的时间。表锁可防止在连接器执行快照期间发生某些类型的更改操作。您可以设置以下值:

shared

允许对表的并发访问,但阻止任何会话获取表排他锁。连接器在捕获表模式时获取 ROW SHARE 级别的锁。

none

«防止» «连接器» «在» «快照» «期间» «获取» «任何» «表» «锁»。仅当 «创建» «快照» «期间» «可能» «不会» «发生» «模式» «更改» «时» «才» «使用» «此» «设置»。

custom (自定义)

连接器根据 snapshot.locking.mode.custom.name 属性指定的实现执行快照,该属性是 io.debezium.spi.snapshot.SnapshotLock 接口的自定义实现。

无默认值

snapshot.locking.mode 设置为 custom 时,使用此设置指定由“io.debezium.spi.snapshot.SnapshotLock”接口定义的 name() 方法提供的自定义实现的名称。有关更多信息,请参阅 自定义快照器 SPI

select_all

指定连接器在执行快照时如何查询数据。
设置以下选项之一:

select_all

连接器«默认» «执行» «一个» select all «查询»,「并» «可选» «地» «根据» «列» «包含» «和» «排除» «列表» «配置» «调整» «所选» «的» «列»。

custom (自定义)

连接器根据 snapshot.query.mode.custom.name 属性指定的实现执行快照查询,该属性定义了 io.debezium.spi.snapshot.SnapshotQuery 接口的自定义实现。

此设置允许您比使用 snapshot.select.statement.overrides 属性更灵活地管理快照内容。

无默认值

snapshot.query.mode 设置为 custom 时,使用此设置指定由“io.debezium.spi.snapshot.SnapshotQuery”接口定义的 name() 方法提供的自定义实现的名称。有关更多信息,请参阅 自定义快照器 SPI

连接器 table.include.list 属性中指定的所有表。

一个可选的、逗号分隔的正则表达式列表,匹配要包含在快照中的表的完全限定名称(<databaseName>.<schemaName>.<tableName>)。

在多租户容器数据库 (CDB) 环境中,正则表达式必须包含 可插拔数据库 (PDB) 名称,格式为 <pdbName>.<schemaName>.<tableName>

为了匹配表名,Debezium 会将您指定的正则表达式应用为锚定正则表达式。也就是说,指定的表达式会与表的整个名称字符串进行匹配;它不会匹配表中可能存在的子字符串。
在使用 LogMiner 实现的环境中,您只能使用 POSIX 正则表达式。

快照只能包含在连接器的 table.include.list 属性中命名的表。

仅当连接器的 snapshot.mode 属性设置为除 never 之外的值时,此属性才生效。
此属性不影响增量快照的行为。

无默认值

指定要在快照中包含的表行。如果您希望快照仅包含表中一部分行,请使用此属性。此属性仅影响快照。它不适用于连接器从日志中读取的事件。

此属性包含一个逗号分隔的完全限定表名列表,格式为 <schemaName>.<tableName>。例如:

"snapshot.select.statement.overrides": "inventory.products,customers.orders"

对于列表中的每个表,添加一个额外的配置属性,该属性指定连接器在拍摄快照时在该表上运行的 SELECT 语句。指定的 SELECT 语句决定了要在快照中包含的表行的子集。使用以下格式指定此 SELECT 语句属性的名称:

snapshot.select.statement.overrides.<schemaName>.<tableName>

例如,snapshot.select.statement.overrides.customers.orders

示例

从包含软删除列 delete_flagcustomers.orders 表中,如果要快照仅包含未软删除的记录,请添加以下属性:

"snapshot.select.statement.overrides": "customer.orders",
"snapshot.select.statement.overrides.customer.orders": "SELECT * FROM customers.orders WHERE delete_flag = 0 ORDER BY id DESC"

在生成的快照中,连接器仅包含 delete_flag = 0 的记录。

无默认值

一个可选的、逗号分隔的正则表达式列表,匹配您*希望*捕获更改的模式的名称。在 LogMiner 实现的环境中,您必须仅使用 POSIX 正则表达式。未包含在 schema.include.list 中的任何模式名称都将被排除捕获其更改。默认情况下,所有非系统模式都捕获其更改。

为了匹配模式的名称,Debezium 会将您指定的正则表达式作为锚定正则表达式进行匹配。也就是说,指定的表达式会与模式的整个名称字符串进行匹配;它不会匹配可能存在于模式名称中的子字符串。
如果您在此配置中包含此属性,则不要同时设置 schema.exclude.list 属性。

false

布尔值,指定连接器是否应解析和发布有关元数据对象的表和列注释。启用此选项将影响内存使用量。逻辑模式对象的数量和大小主要影响 Debezium 连接器消耗的内存量,并且向每个对象添加可能很大的字符串数据可能非常昂贵。

无默认值

一个可选的、逗号分隔的正则表达式列表,用于匹配您希望捕获更改的模式名称。在使用 LogMiner 实现的环境中,您只能使用 POSIX 正则表达式。
除了系统模式外,所有未包含在 schema.exclude.list 中的模式名称都会捕获其更改。

为了匹配模式的名称,Debezium 会将您指定的正则表达式作为锚定正则表达式进行匹配。也就是说,指定的表达式会与模式的整个名称字符串进行匹配;它不会匹配可能存在于模式名称中的子字符串。
如果在此配置中包含此属性,请勿设置 schema.include.list 属性。

无默认值

一个可选的逗号分隔的正则表达式列表,用于匹配要捕获的表的完全限定表标识符。如果您使用 LogMiner 实现,请仅为此属性使用 POSIX 正则表达式。设置此属性时,连接器仅从指定的表中捕获更改。每个表标识符的格式如下:

<schema_name>.<table_name>

默认情况下,连接器会监视每个捕获数据库中的所有非系统表。

为了匹配表名,Debezium 会将您指定的正则表达式应用为锚定正则表达式。也就是说,指定的表达式会与表的整个名称字符串进行匹配;它不会匹配表中可能存在的子字符串。
如果您在配置中包含此属性,请不要同时设置 table.exclude.list 属性。

无默认值

一个可选的逗号分隔的正则表达式列表,用于匹配要从监视中排除的表的完全限定表标识符。如果您使用 LogMiner 实现,请仅为此属性使用 POSIX 正则表达式。连接器会捕获排除列表中未指定的任何表的更改事件。使用以下格式指定每个表的标识符:

<schemaName>.<tableName>.

为了匹配表名,Debezium 会将您指定的正则表达式应用为锚定正则表达式。也就是说,指定的表达式会与表的整个名称字符串进行匹配;它不会匹配表中可能存在的子字符串。
如果您在配置中包含此属性,请不要同时设置 table.include.list 属性。

无默认值

一个可选的、逗号分隔的正则表达式列表,用于匹配您希望包含在更改事件消息值中的列的完全限定名称。在使用 LogMiner 实现的环境中,您只能使用 POSIX 正则表达式。列的完全限定名称的格式如下:

<Schema_name>.<table_name>.<column_name>

即使您没有在此属性中显式包含主键列的值,它也始终包含在事件的键中。

为了匹配列名,Debezium 会将您指定的正则表达式视为*锚定*正则表达式。也就是说,指定的表达式会与列名的整个字符串进行匹配,而不是匹配列名中可能存在的子字符串。
如果您在此配置中包含此属性,则不要同时设置 column.exclude.list 属性。

无默认值

一个可选的、逗号分隔的正则表达式列表,用于匹配您希望从更改事件消息值中排除的列的完全限定名称。在使用 LogMiner 实现的环境中,您只能使用 POSIX 正则表达式。列的完全限定名称的格式如下:

<schema_name>.<table_name>.<column_name>

即使您在此属性中显式排除主键列的值,它也始终包含在事件的键中。

为了匹配列名,Debezium 会将您指定的正则表达式视为*锚定*正则表达式。也就是说,指定的表达式会与列名的整个字符串进行匹配,而不是匹配列名中可能存在的子字符串。
如果您在此配置中包含此属性,请不要设置 column.include.list 属性。

false

指定当包含的列没有更改时是否跳过发布消息。这实际上会过滤掉没有包含列更改的消息(根据 column.include.listcolumn.exclude.list 属性)。

n/a

一个可选的、逗号分隔的正则表达式列表,用于匹配基于字符的列的完全限定名称。列的完全限定名称的格式为 <schemaName>.<tableName>.<columnName>
为了匹配列名,Debezium 会将您指定的正则表达式视为*锚定*正则表达式。也就是说,指定的表达式会与列名的整个字符串进行匹配;表达式不会匹配列名中可能存在的子字符串。
在生成的更改事件记录中,指定列的值将被替换为假名。

假名由应用指定的hashAlgorithmsalt 产生的哈希值组成。根据使用的哈希函数,可以维护引用完整性,同时用假名替换列值。支持的哈希函数在 Java Cryptography Architecture 标准算法名称文档的MessageDigest 部分中进行了描述。

在下面的示例中,CzQMA0cB5K 是随机选择的 salt。

column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName

如果需要,假名会自动截断为列的长度。连接器配置可以包含多个指定不同哈希算法和 salt 的属性。

取决于使用的hashAlgorithm、选择的salt 和实际数据集,生成的数据集可能无法完全隐藏。

应使用哈希策略版本 2 来确保在不同位置或系统上对值进行哈希处理时的保真度。

bytes

指定如何在更改事件中表示二进制 (blob) 列,包括:bytes 将二进制数据表示为字节数组(默认),base64 将二进制数据表示为 base64 编码的字符串,base64-url-safe 将二进制数据表示为 base64-url-safe 编码的字符串,hex 将二进制数据表示为十六进制编码(base16)字符串。

none

指定如何调整 schema 名称以兼容连接器使用的消息转换器。可能设置:

  • none 不进行任何调整。

  • avro 用下划线替换 Avro 类型名称中不能使用的字符。

  • avro_unicode 用对应的 Unicode(如 _uxxxx)替换 Avro 类型名称中不能使用的下划线或字符。注意:_ 像 Java 中的反斜杠一样是一个转义序列。

none

指定如何调整字段名称以兼容连接器使用的消息转换器。可能设置:

  • none 不进行任何调整。

  • avro 用下划线替换 Avro 类型名称中不能使用的字符。

  • avro_unicode 用对应的 Unicode(如 _uxxxx)替换 Avro 类型名称中不能使用的下划线或字符。注意:_ 像 Java 中的反斜杠一样是一个转义序列。

有关更多详细信息,请参阅 Avro 命名

precise

指定连接器应如何处理 NUMBERDECIMALNUMERIC 列的浮点值。您可以设置以下选项之一:

precise(默认)

使用 java.math.BigDecimal 值精确表示,这些值在更改事件中以二进制形式表示。

double

使用 double 值表示。使用 double 值更简单,但可能导致精度损失。

string

将值编码为格式化字符串。使用 string 选项更容易解析,但会导致关于实际类型的语义信息丢失。有关更多信息,请参阅数值类型

numeric

指定连接器如何处理 interval 列的值。

numeric 使用大约微秒数表示间隔。

string 使用字符串模式表示 P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S 精确表示间隔。例如:P1Y2M3DT4H5M6.78S

fail

指定在处理事件期间发生异常时连接器应如何响应。您可以设置以下选项之一:

fail

传播异常(指示有问题事件的偏移量),导致连接器停止。

warn

跳过有问题事件。然后记录有问题事件的偏移量。

skip

跳过有问题事件。

2048

一个正整数值,指定此连接器每次迭代处理的事件批次的最大大小。

8192

一个正整数值,指定阻塞队列可以容纳的最大记录数。当 Debezium 从数据库读取流式传输的事件时,它会将事件放入阻塞队列,然后将其写入 Kafka。在连接器摄取消息的速度快于其写入 Kafka 的速度,或者 Kafka 不可用时,阻塞队列可以为从数据库读取更改事件提供反压。队列中保存的事件在连接器定期记录偏移量时将被忽略。max.queue.size 的值应始终大于 max.batch.size 的值。

0(禁用)

一个长整数值,指定阻塞队列的最大字节容量。默认情况下,阻塞队列没有容量限制。要指定队列可以消耗的字节数,请将此属性设置为一个正长整型值。
如果还设置了 max.queue.size,当队列大小达到任一属性指定的限制时,将阻止写入队列。例如,如果设置了 max.queue.size=1000max.queue.size.in.bytes=5000,则在队列包含 1000 条记录后,或在队列中的记录量达到 5000 字节后,将阻止写入队列。

500(0.5 秒)

一个正整数值,指定连接器在每次迭代中等待新更改事件出现的时间(以毫秒为单位)。

true

一个布尔值,指定连接器是否将数据库模式中的更改发布到与主题前缀同名的 Kafka 主题。连接器将每个模式更改记录下来,键包含数据库名称,值为描述模式更新的 JSON 结构。此记录模式更改的机制独立于连接器对数据库模式历史记录更改的内部记录。

true

控制“删除”事件后是否会跟一个墓碑事件。可能的值如下:

true

对于每个删除操作,连接器会发出一个“删除”事件和一个随后的墓碑事件。

false

对于每个删除操作,连接器仅发出一个“删除”事件。

在源记录被删除后,墓碑事件(默认行为)可让 Kafka 完全删除主题中具有与已删除行相同键的所有事件,前提是该主题已启用日志压缩

无默认值

«表达式» 列表,用于指定连接器用来为 «发布» 到指定表 «Kafka» «主题» 的更改事件 «记录» «形成» «自定义» «消息» «键» 的 «列»。

默认情况下,Debezium 使用表的 «主键» «列» 作为它发出的 «记录» 的 «消息» «键»。 «代替» 默认值,或为 «缺少» «主键» 的表指定 «键»,您可以根据一个或多个 «列» «配置» «自定义» «消息» «键»。
要为表建立自定义消息键,请列出该表,然后列出用作消息键的列。每个列表条目采用以下格式:

<fullyQualifiedTableName>:<keyColumn>,<keyColumn>

要基于多个列名设置表键,请在列名之间插入逗号。
每个完全限定的表名都是一个正则表达式,格式如下:

<schemaName>.<tableName>

该属性可以包含多个表的条目。使用分号分隔列表中的表条目。
以下示例设置了 inventory.customerspurchase.orders 表的消息键:

inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4

对于表 inventory.customer,列 pk1pk2 被指定为消息键。对于任何模式中的 purchaseorders 表,列 pk3pk4 用作消息键。
用于创建自定义消息键的列数没有限制。但是,最好使用唯一键所需的最小数量。

无默认值

一个可选的、逗号分隔的正则表达式列表,匹配基于字符的列的完全限定名称。如果您希望连接器掩盖一组列的值,请设置此属性(例如,如果它们包含敏感数据)。将 length 设置为正整数,以将指定列中的数据替换为属性名称中由 length 指定的星号(*)字符的数量。将 length 设置为 0(零)以将指定列中的数据替换为空字符串。

完全限定的列名遵循以下格式:<schemaName>.<tableName>.<columnName>。要匹配列名,Debezium 会将您指定的正则表达式作为锚定正则表达式进行匹配。也就是说,指定的表达式与列的整个名称字符串进行匹配;表达式不匹配可能存在于列名称中的子字符串。

您可以在单个配置中指定多个具有不同长度的属性。

无默认值

一个可选的、逗号分隔的正则表达式列表,用于通过用星号 (*) 替换字符来屏蔽更改事件消息中的列名。
在属性名称中指定要替换的字符数,例如 column.mask.with.8.chars
将长度指定为正整数或零。然后,为要在其中应用屏蔽的每个基于字符的列名添加到列表中。对于要应用屏蔽的每个基于字符的列名,添加相应的正则表达式。
使用以下格式指定列的完全限定名称:<schemaName>.<tableName>.<columnName>

连接器配置可以包含多个指定不同长度的属性。

无默认值

一个可选的、逗号分隔的正则表达式列表,匹配您希望连接器为其发出表示列元数据的额外参数的列的完全限定名称。设置此属性时,连接器会将以下字段添加到事件记录的模式中:

  • __debezium.source.column.type

  • __debezium.source.column.length

  • __debezium.source.column.scale

这些参数分别传播列的原始类型名称和长度(对于可变宽度类型)。
启用连接器发出此额外数据有助于正确确定接收器数据库中特定数字或基于字符的列的大小。

列的完全限定名称遵循以下格式之一:<tableName>.<columnName><schemaName>.<tableName>.<columnName>
为了匹配列名,Debezium 将您指定的正则表达式作为锚定正则表达式匹配。也就是说,指定的表达式与列的整个名称字符串进行匹配;它不匹配可能存在于列名中的子字符串。

无默认值

一个可选的、逗号分隔的 «正则表达式» 列表,用于指定数据库中 «列» 定义的 «数据类型» 的«完全限定»名称。设置此属性时,对于具有匹配 «数据类型» 的 «列»,连接器会发出包含以下额外字段的 «模式» 的事件记录:

  • __debezium.source.column.type

  • __debezium.source.column.length

  • __debezium.source.column.scale

这些参数分别传播列的原始类型名称和长度(对于可变宽度类型)。
启用连接器发出此额外数据有助于正确确定接收器数据库中特定数字或基于字符的列的大小。

列的完全限定名称遵循以下格式之一:<tableName>.<typeName><schemaName>.<tableName>.<typeName>
为了匹配 «数据类型» 的名称,Debezium 将您指定的 «正则表达式» 应用为*锚定的* «正则表达式»。也就是说,指定的表达式会针对 «数据类型» 的整个名称字符串进行匹配;该 «表达式» «不会» 匹配可能存在于 «类型» 名称中的子字符串。

有关 Oracle 特定数据类型名称列表,请参阅Oracle 数据类型映射

0

以毫秒为单位指定连接器向心跳主题发送消息的频率。
使用此属性确定连接器是否继续从源数据库接收更改事件。
在捕获的表在很长一段时间内没有发生更改事件的情况下,设置此属性也可能很有用。
在这种情况下,尽管连接器继续读取重做日志,但它不会发出任何更改事件消息,因此 Kafka 主题中的偏移量保持不变。由于连接器不会刷新它从数据库读取的最新系统更改号 (SCN),数据库可能会比必要保留重做日志文件更长时间。如果连接器重新启动,过长的保留期可能导致连接器冗余发送一些更改事件。
默认值 0 可阻止连接器发送任何心跳消息。

无默认值

指定连接器在发送心跳消息时在源数据库上执行的查询。

For example (例如:)

INSERT INTO test_heartbeat_table (text) VALUES ('test_heartbeat')

连接器在发出心跳消息后运行此查询。

设置此属性并创建一个心跳表以接收心跳消息,以解决Debezium 无法同步位于同一主机上的低流量数据库与高流量数据库的偏移量的情况。在连接器将记录插入到配置的表中后,它就能够接收来自低流量数据库的更改并确认数据库中的 SCN 更改,从而使偏移量能够与代理同步。

无默认值

指定连接器在启动后等待快照的时间间隔(以毫秒为单位)。
使用此属性可防止在群集中启动多个连接器时发生快照中断,这可能会导致连接器重新平衡。

0

指定连接器在完成快照后延迟开始流式传输过程的时间(以毫秒为单位)。设置延迟间隔有助于防止连接器在快照完成后但流式传输过程开始之前立即发生故障时重新启动快照。设置一个高于 Kafka Connect 工作节点设置的 offset.flush.interval.ms 属性值的延迟值。

10000

指定在快照时从每个表一次读取的最大行数。连接器以指定大小的多个批次读取表内容。

10000

指定每次数据库往返查询将获取的行数。使用值为 0 将使用 JDBC 驱动程序的默认提取大小。

false

如果您希望 Debezium 生成包含事务边界的事件,并将数据事件的信封丰富为事务元数据,请将属性设置为 true

有关更多详细信息,请参阅事务元数据

online_catalog

指定用于解析表和列 ID 到名称的日志挖掘策略,该策略控制 Oracle LogMiner 如何构建和使用给定的数据字典。
设置以下选项之一:

redo_log_catalog

将数据字典写入在线重做日志,随着时间的推移生成更多的归档日志。这还允许跟踪对捕获表的 DDL 更改,因此如果模式经常更改,这是理想选择。

online_catalog

使用数据库当前的字典来解析对象 ID,并且不将任何额外信息写入在线重做日志。这使得 LogMiner 可以更快地挖掘,但代价是无法跟踪 DDL 更改。如果捕获的表模式更改不频繁或从不更改,这是理想选择。

hybrid

结合使用数据库当前的字典和 Debezium 的内存模式模型来无缝解析表和列名。此模式的性能与 online_catalog LogMiner 策略相当,并且具有 redo_log_catalog 策略的模式跟踪恢复能力,同时不会产生 redo_log_catalog 策略的归档日志生成开销和性能成本。

none

指定控制 Oracle LogMiner 查询如何构建的挖掘查询模式。
设置以下选项之一:

none

查询在不进行任何模式、表或用户名过滤的情况下生成。

in

查询使用标准的 SQL IN 子句生成,以在数据库端过滤模式、表和用户名。模式、表和用户名配置的包含/排除列表不应指定任何正则表达式,因为查询是使用值直接构建的。

regex

查询使用 Oracle 的 REGEXP_LIKE 运算符在数据库端过滤模式和表名,并使用 SQL IN 子句过滤用户名。模式和表配置的包含/排除列表可以安全地指定正则表达式。

memory

缓冲区类型控制连接器如何管理事务数据的缓冲。

memory - 使用 JVM 进程的堆来缓冲所有事务数据。如果您不希望连接器处理大量长时间运行或大型事务,请选择此选项。激活此选项后,缓冲区状态不会在重启之间持久化。重启后,从当前偏移量的 SCN 值重新创建缓冲区。

infinispan_embedded - 此选项使用嵌入式 Infinispan 缓存来缓冲事务数据并将其持久化到磁盘。

infinispan_remote - 此选项使用远程 Infinispan 集群来缓冲事务数据并将其持久化到磁盘。

0

事务在事务缓冲区中最多可以包含的事件数。事件数超过此阈值的事务将不会发出,并将被放弃。默认行为是没有事务事件阈值。

无默认值

Infinispan 全局配置的 XML 配置。有关更多信息,请参阅Infinispan 事件缓冲

无默认值

Infinispan 事务缓存的 XML 配置。有关更多信息,请参阅Infinispan 事件缓冲

无默认值

Infinispan 事件缓存的 XML 配置。有关更多信息,请参阅Infinispan 事件缓冲

无默认值

Infinispan 已处理事务缓存的 XML 配置。有关更多信息,请参阅Infinispan 事件缓冲

无默认值

Infinispan 模式更改缓存的 XML 配置。

false

指定在连接器以正常、预期的方式停止后是否删除缓冲区状态。

此设置仅影响在重启之间持久化状态的缓冲区实现,例如 infinispan
默认行为是缓冲区状态在重启之间始终保留。

仅在测试或开发环境中设置为 true

0

一个 LogMiner 会话可以保持活动状态的最长时间(以毫秒为单位),之后将使用新会话。

对于低流量系统,如果长时间使用同一个 LogMiner 会话,它可能会消耗过多的 PGA 内存。默认行为是仅在检测到日志切换时才使用新的 LogMiner 会话。通过将此值设置为大于 0 的值,指定 LogMiner 会话可以保持活动状态的最长时间,之后它将被停止和启动,以释放和重新分配 PGA 内存。

false

指定在日志切换或挖掘会话达到最大生命周期阈值时是否关闭并重新打开 JDBC 连接。

默认情况下,JDBC 连接不会在日志切换或最大会话生命周期之间关闭。
如果您遇到 LogMiner 导致 Oracle SGA 过度增长的问题,则应启用此选项。

1000

此连接器尝试从重做/归档日志读取的最小 SCN 间隔大小。

100000

连接器从重做/归档日志读取时使用的最大 SCN 间隔大小。

20000

连接器用于从重做/归档日志读取的间隔的增加/减少量。

20000

连接器用于从重做/归档日志读取数据时的起始 SCN 间隔大小。这也可以用作调整批次大小的度量 - 当当前 SCN 与批次的开始/结束 SCN 之间的差值大于此值时,批次大小会增加/减少。

0

连接器从重做/归档日志读取数据后,在再次开始读取数据之前休眠的最短时间。值为毫秒。

3000

连接器从重做/归档日志读取数据后,在再次开始读取数据之前休眠的最长时间。值为毫秒。

1000

连接器从重做/归档日志读取数据后,在再次开始读取数据之前休眠的起始时间。值为毫秒。

200

连接器在读取日志挖掘数据时用于调整最佳睡眠时间的最大增减量。值为毫秒。

0

从 SYSDATE 开始回溯挖掘归档日志的小时数。当使用默认设置 (0) 时,连接器会挖掘所有归档日志。

false

控制连接器仅从归档日志中挖掘更改,还是从在线重做日志和归档日志的组合中挖掘更改(默认)。

重做日志使用循环缓冲区,可以在任何时候归档。在在线重做日志频繁归档的环境中,这可能导致 LogMiner 会话失败。与重做日志不同,归档日志保证可靠。将此选项设置为 true 以强制连接器仅挖掘归档日志。将连接器设置为仅挖掘归档日志后,操作提交与连接器发出相应更改事件之间的时间延迟可能会增加。延迟的程度取决于数据库配置归档在线重做日志的频率。

10000

连接器在轮询以确定起始系统更改号是否存在于归档日志中之间休眠的毫秒数。如果未启用 log.mining.archive.log.only.mode,则不使用此设置。

0

一个正整数值,指定在重做日志切换之间保留长时间运行事务的毫秒数。设置为 0 时,事务将一直保留,直到检测到提交或回滚。

默认情况下,LogMiner 适配器维护一个所有正在运行的事务的内存缓冲区。由于事务的所有 DML 操作都会被缓冲,直到检测到提交或回滚,因此应避免长时间运行的事务,以免溢出该缓冲区。任何超过此配置值的事务都将被完全丢弃,连接器不会为该事务中的操作发出任何消息。

无默认值

指定在使用 LogMiner 挖掘归档日志时要使用的已配置 Oracle 归档目标。

默认行为是自动选择第一个有效的本地配置的目标。但是,您可以通过提供目标名称来使用特定目标,例如 LOG_ARCHIVE_DEST_5

有关在主实例和备用实例使用不同名称的环境中指定归档目标的信息,请参阅归档日志目标

无默认值

包含在 LogMiner 查询中的数据库用户列表。如果您希望捕获过程包含指定用户的更改,设置此属性可能很有用。

无默认值

从 LogMiner 查询中排除的数据库用户列表。如果您希望捕获过程始终排除特定用户的更改,设置此属性可能很有用。

无默认值

包含在 LogMiner 查询中的 JDBC 连接客户端 ID 列表。如果您希望捕获过程仅包含来自特定 JDBC 连接客户端的更改,设置此属性可能很有用。

无默认值

从 LogMiner 查询中排除的 JDBC 连接客户端 ID 列表。如果您希望捕获过程始终排除特定 JDBC 连接客户端的更改,设置此属性可能很有用。

1000000

指定连接器将其与当前 SCN 值和上一个 SCN 值之间的差值进行比较,以确定是否存在 SCN 间隙的值。如果 SCN 值之间的差值大于指定值,并且时间差小于 log.mining.scn.gap.detection.time.interval.max.ms,则检测到 SCN 间隙,连接器将使用大于配置的最大批次的挖掘窗口。

20000

指定一个值(以毫秒为单位),连接器将其与当前 SCN 时间戳和上一个 SCN 时间戳之间的差值进行比较,以确定是否存在 SCN 间隙。如果时间戳之间的差值小于指定值,并且 SCN 差值大于 log.mining.scn.gap.detection.gap.size.min,则检测到 SCN 间隙,连接器将使用大于配置的最大批次的挖掘窗口。

LOG_MINING_FLUSH

指定用于协调将 Oracle LogWriter Buffer (LGWR) 刷新到重做日志的刷新表名称。可以使用 <schemaName>.<tableName><tableName> 格式指定此名称。通常,多个连接器可以使用同一个刷新表。但是,如果连接器遇到表锁定争用错误,请使用此属性为每个连接器部署指定一个专用表。

false

指定重做日志构造的 SQL 语句是否包含在 source.redo_sql 字段中。使用 XStream 或 OpenLogReplicator 适配器时,将忽略此配置。

false

控制是否在更改事件中发出大型对象(CLOB 或 BLOB)列的值。

默认情况下,更改事件包含大型对象列,但列中没有值。处理和管理大型对象列类型和有效载荷会产生一定的开销。要捕获大型对象值并将其序列化到更改事件中,请将此选项设置为 true

__debezium_unavailable_value

指定连接器提供的常量,用于指示原始值未更改且未由数据库提供。

无默认值

Oracle Real Application Clusters (RAC) 节点主机名或地址的逗号分隔列表。此字段是启用与 Oracle RAC 部署的兼容性所必需的。

使用以下任一方法指定 RAC 节点列表:

  • database.port 指定值,并为 rac.nodes 列表中的每个地址使用指定端口值。例如:

    database.port=1521
    rac.nodes=192.168.1.100,192.168.1.101
  • database.port 指定值,并覆盖列表中一个或多个条目的默认端口。列表可以包含使用默认 database.port 值的条目,以及定义其自身唯一端口值的条目。例如:

    database.port=1521
    rac.nodes=192.168.1.100,192.168.1.101:1522

如果您通过 database.url 属性为数据库提供原始 JDBC URL,而不是为 database.port 定义值,则每个 RAC 节点条目都必须显式指定端口值。

t

一个逗号分隔的操作类型列表,您希望连接器在流式传输期间跳过这些操作。您可以配置连接器以跳过以下类型的操作:

  • c (insert/create)

  • u (update)

  • d (delete)

  • t (truncate)

默认情况下,仅跳过 TRUNCATE 操作。

无默认值

用于向连接器发送信号的数据集合的完全限定名称。当您将此属性与 Oracle 可插入数据库 (PDB) 结合使用时,请将其值设置为根数据库的名称。
使用以下格式指定集合名称
<databaseName>.<schemaName>.<tableName>

source (源)

为连接器启用的信号通道名称列表。默认情况下,以下通道可用:

无默认值

为连接器启用的通知通道名称列表。默认情况下,以下通道可用:

1024

在增量快照块期间,连接器获取并加载到内存中的最大行数。增加块大小可以提高效率,因为快照运行的快照查询次数较少但规模更大。但是,较大的块大小也需要更多内存来缓冲快照数据。将块大小调整为能为您的环境提供最佳性能的值。

insert_insert

指定连接器在增量快照期间使用的水印机制,用于对可能被增量快照捕获,然后在流式传输恢复后重新捕获的事件进行去重。
您可以指定以下选项之一:

insert_insert

当您发送信号以启动增量快照时,对于 Debezium 在快照期间读取的每个块,它都会在信号数据集合中写入一个条目,以记录打开快照窗口的信号。快照完成后,Debezium 会插入第二个条目,记录关闭窗口的信号。

insert_delete

当您发送信号以启动增量快照时,对于 Debezium 读取的每个块,它都会在信号数据集合中写入一个条目,以记录打开快照窗口的信号。快照完成后,此条目将被删除。不会为关闭快照窗口的信号创建条目。设置此选项可防止信号数据集合快速增长。

io.debezium.schema.SchemaTopicNamingStrategy

用于确定数据更改、schema 更改、事务、心跳事件等的*主题名称*的 TopicNamingStrategy 类的名称,默认为 SchemaTopicNamingStrategy

.

指定主题名称的分隔符,默认为 .

10000

用于保存有界并发哈希映射中主题名称的大小。此缓存将有助于确定与给定数据集合对应的主题名称。

__debezium-heartbeat

控制连接器向心跳消息发送的心跳消息的主题名称。主题名称的模式如下:

topic.heartbeat.prefix.topic.prefix

例如,如果主题前缀是 fulfillment,则默认主题名称为 __debezium-heartbeat.fulfillment

transaction

控制连接器发送事务元数据消息的主题的名称。主题名称的模式为:

topic.prefix.topic.transaction

例如,如果主题前缀是 fulfillment,则默认主题名称是 fulfillment.transaction

1

指定连接器在执行初始快照时使用的线程数。要启用并行初始快照,请将属性设置为大于 1 的值。在并行初始快照中,连接器会并发处理多个表。

当您启用并行初始快照时,执行每个表快照的线程可能需要不同的时间来完成工作。如果一个表的快照比其他表的快照花费的时间显著长,则已完成工作的线程将处于空闲状态。在某些环境中,网络设备(如负载均衡器或防火墙)会终止长时间空闲的连接。快照完成后,连接器将无法关闭连接,从而导致异常和快照不完整,即使在连接器成功传输了所有快照数据的情况下也是如此。

如果您遇到此问题,请将 snapshot.max.threads 的值恢复为 1,然后重试快照。

0

指定在发生数据库错误时尝试快照表的最大重试次数。此配置属性目前仅重试与 ORA-01466 异常相关的失败。默认情况下,不会执行额外的重试。

无默认值

定义用于通过添加提供上下文信息的元数据来定制 MBean 对象名称的标签。指定键值对的逗号分隔列表。每个键代表 MBean 对象名称的标签,对应的值代表该键的值,例如:
k1=v1,k2=v2

连接器会将指定的标签附加到基本 MBean 对象名称。标签可以帮助您组织和分类指标数据。您可以定义标签来识别特定的应用程序实例、环境、区域、版本等。有关更多信息,请参阅 定制 MBean 名称

-1

指定连接器在操作导致可重试错误(例如连接错误)后如何响应。
设置以下选项之一:

-1

无限制。连接器将自动重新启动,并重试操作,无论先前的失败次数如何。

0

禁用。连接器将立即失败,并且永远不会重试操作。需要用户干预才能重新启动连接器。

> 0

连接器将自动重新启动,直到达到指定的最大重试次数。下一次失败后,连接器将停止,需要用户干预才能重新启动它。

600000

等待查询执行的时间,以毫秒为单位。默认为 600 秒(600,000 毫秒);零表示没有限制。

0

指定连接器可以捕获的最大表数。超过此限制将触发 guardrail.collections.limit.action 指定的操作。将此属性设置为 0 可防止连接器触发保护措施。

warn

指定如果连接器捕获的表数超过 guardrail.collections.max 属性中指定的数量时触发的操作。将属性设置为以下值之一:

fail

连接器失败并报告异常。

warn

连接器记录警告。

true

此属性指定 Debezium 是否将带有 __debezium.context. 前缀的上下文头添加到其发出的消息中。

这些头是 OpenLineage 集成所必需的,并提供元数据,使下游处理系统能够跟踪和识别更改事件的来源。

该属性添加了以下头:

__debezium.context.connectorLogicalName

Debezium 连接器的逻辑名称。

__debezium.context.taskId

连接器任务的唯一标识符。

__debezium.context.connectorName

Debezium 连接器的名称。

Debezium Oracle 连接器数据库模式历史记录配置属性

Debezium 提供了一组 schema.history.internal.* 属性来控制连接器如何与模式历史记录主题进行交互。

下表描述了用于配置 Debezium 连接器的 schema.history.internal 属性。

表 16. 连接器数据库模式历史记录配置属性
属性 Default (默认值) 描述

无默认值

连接器存储数据库模式历史的 Kafka 主题的完整名称。

无默认值

连接器用于建立与 Kafka 集群的初始连接的主机/端口对列表。此连接用于检索连接器先前存储的数据库模式历史,以及写入从源数据库读取的每个 DDL 语句。每个对都应指向 Kafka Connect 进程使用的相同 Kafka 集群。

100

一个整数值,指定连接器在启动/恢复期间轮询持久数据时应等待的最大毫秒数。默认为 100 毫秒。

3000

一个整数值,指定连接器在使用 Kafka 管理客户端获取集群信息时应等待的最大毫秒数。

30000

一个整数值,指定连接器在使用 Kafka 管理客户端创建 Kafka 历史主题时应等待的最大毫秒数。

100

连接器在连接器恢复因错误而失败之前尝试读取持久历史数据的最大次数。在收到无数据后的最长等待时间为 recovery.attempts × recovery.poll.interval.ms

false

一个布尔值,指定连接器是应忽略格式错误或未知的数据库语句,还是停止处理以便人工修复问题。安全默认值为 false。跳过操作应谨慎使用,因为它可能导致在处理 binlog 时丢失或损坏数据。

false

一个布尔值,指定连接器是记录模式或数据库中所有表的模式结构,还是仅记录指定用于捕获的表的模式结构。
指定以下值之一:

false (默认)

在数据库快照期间,连接器会记录所有非系统表的模式数据,包括未指定用于捕获的表。最好保留默认设置。如果您稍后决定捕获未最初指定用于捕获的表的更改,连接器可以轻松开始从这些表中捕获数据,因为它们的模式结构已存储在模式历史记录主题中。Debezium 需要表的模式历史记录,以便它能够识别更改事件发生时存在的结构。

true

在数据库快照期间,连接器仅记录 Debezium 捕获更改事件的表的表模式。如果您更改默认值,并且稍后配置连接器以捕获数据库中的其他表的数据,则连接器将缺少捕获表中更改事件所需的模式信息。

false

一个布尔值,指定连接器是记录实例中所有逻辑数据库的模式结构。
指定以下值之一:

true

连接器仅记录 Debezium 捕获更改事件的逻辑数据库和模式中的表的模式结构。

false

连接器记录所有逻辑数据库的模式结构。

传递的 Oracle 连接器配置属性

该连接器支持直通式属性,使 Debezium 能够为微调 Apache Kafka 生产者和消费者行为指定自定义配置选项。有关 Kafka 生产者和消费者配置属性的完整范围,请参阅 Kafka 文档

用于配置生产者和消费者客户端如何与模式历史记录主题交互的传递属性

Debezium 依赖 Apache Kafka 生产者将模式更改写入数据库模式历史记录主题。同样,它依赖 Kafka 消费者在连接器启动时从数据库模式历史记录主题读取。您可以通过为一组以 schema.history.internal.producer.*schema.history.internal.consumer.* 前缀开头的传递配置属性赋值来定义 Kafka 生产者和消费者客户端的配置。传递的生产者和消费者数据库模式历史记录属性控制一系列行为,例如这些客户端如何与 Kafka 代理建立安全连接,如下例所示:

schema.history.internal.producer.security.protocol=SSL
schema.history.internal.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
schema.history.internal.producer.ssl.keystore.password=test1234
schema.history.internal.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
schema.history.internal.producer.ssl.truststore.password=test1234
schema.history.internal.producer.ssl.key.password=test1234

schema.history.internal.consumer.security.protocol=SSL
schema.history.internal.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
schema.history.internal.consumer.ssl.keystore.password=test1234
schema.history.internal.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
schema.history.internal.consumer.ssl.truststore.password=test1234
schema.history.internal.consumer.ssl.key.password=test1234

Debezium 会从属性名称中剥离前缀,然后再将属性传递给 Kafka 客户端。

有关 Kafka 生产者配置属性Kafka 消费者配置属性 的更多信息,请参阅 Apache Kafka 文档。

用于配置 Oracle 连接器如何与 Kafka 信号主题交互的传递属性

Debezium 提供了一组 signal.* 属性来控制连接器如何与 Kafka 信号主题交互。

下表描述了 Kafka signal 属性。

表 17. Kafka 信号配置属性
属性 Default (默认值) 描述

<topic.prefix>-signal

连接器监视的用于临时信号的 Kafka 主题的名称。

如果自动主题创建被禁用,则必须手动创建所需的信号主题。信号主题对于保留信号顺序是必需的。信号主题必须只有一个分区。

kafka-signal

Kafka 消费者使用的组 ID 的名称。

无默认值

连接器用于建立与 Kafka 集群的初始连接的主机和端口对列表。每个对都引用 Debezium Kafka Connect 进程使用的 Kafka 集群。

100

一个整数值,指定连接器在轮询信号时等待的最大毫秒数。

用于配置信号通道的 Kafka 消费者客户端的传递属性

Debezium 为信号 Kafka 消费者的传递配置提供了支持。传递信号属性以 signal.consumer.* 前缀开头。例如,连接器会将 signal.consumer.security.protocol=SSL 等属性传递给 Kafka 消费者。

Debezium 在将前缀从属性名称中剥离,然后再将属性传递给 Kafka 信号消费者。

用于配置 Oracle 连接器接收器通知通道的传递属性

下表描述了可用于配置 Debezium sink notification 通道的属性。

表 18. 接收器通知配置属性
属性 Default (默认值) 描述

无默认值

接收来自 Debezium 的通知的主题名称。当您将 notification.enabled.channels 属性设置为包含 sink 作为启用的通知通道之一时,需要此属性。

Debezium 连接器传递数据库驱动程序配置属性

Debezium 连接器支持数据库驱动程序的传递配置。传递数据库属性以 driver.* 前缀开头。例如,连接器会将 driver.foobar=false 等属性传递给 JDBC URL。

Debezium 在将前缀从属性名称中剥离,然后再将属性传递给数据库驱动程序。

监视

除了 Apache Kafka 和 Kafka Connect 内置的 JMX 指标支持外,Debezium Oracle 连接器还提供三种指标类型。

有关如何通过 JMX 公开这些指标的详细信息,请参阅监视文档

自定义 MBean 名称

Debezium 连接器通过连接器的 MBean 名称公开指标。这些指标特定于每个连接器实例,提供有关连接器快照、流式传输和模式历史记录进程行为的数据。

默认情况下,当您部署正确配置的连接器时,Debezium 会为不同的连接器指标生成唯一的 MBean 名称。要查看连接器进程的指标,请将您的可观察性堆栈配置为监视其 MBean。但这些默认 MBean 名称取决于连接器配置;配置更改可能导致 MBean 名称发生更改。在这种情况下,更改 MBean 名称会破坏连接器实例和 MBean 之间的链接,从而中断监控活动。您必须重新配置可观察性堆栈以使用新的 MBean 名称才能恢复监控。

为了防止因 MBean 名称更改而导致的监控中断,您可以配置自定义指标标签。通过将 custom.metric.tags 属性添加到连接器配置中来配置自定义指标。该属性接受键值对,其中每个键代表 MBean 对象名称的标签,相应的值代表该标签的值。例如:k1=v1,k2=v2。Debezium 将指定的标签附加到连接器的 MBean 名称。

配置连接器的 custom.metric.tags 属性后,您可以配置可观察性堆栈以检索与指定标签关联的指标。然后,可观察性堆栈使用指定的标签而不是可变的 MBean 名称来唯一标识连接器。之后,如果 Debezium 重新定义了其如何构造 MBean 名称,或者连接器配置中的 topic.prefix 发生更改,指标收集将不会中断,因为指标抓取任务使用指定的标签模式来标识连接器。

使用自定义标签的另一个好处是,您可以使用反映数据管道架构的标签,以便以适合您运营需求的方式组织指标。例如,您可以指定带有值的标签来声明连接器活动的类型、应用程序上下文或数据源,例如 db1-streaming-for-application-abc。如果您指定多个键值对,所有指定的对都将附加到连接器的 MBean 名称。

以下示例说明了标签如何修改默认 MBean 名称。

示例 4. 自定义标签如何修改连接器 MBean 名称

默认情况下,Oracle 连接器使用以下 MBean 名称用于流式处理指标:

debezium.oracle:type=connector-metrics,context=streaming,server=<topic.prefix>

如果您将 custom.metric.tags 的值设置为 database=salesdb-streaming,table=inventory,Debezium 将生成以下自定义 MBean 名称:

debezium.oracle:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory

快照指标

MBeandebezium.oracle:type=connector-metrics,context=snapshot,server=<topic.prefix>

除非快照操作正在活动中,或者自上次连接器启动以来发生过快照,否则快照指标不会暴露。

下表列出了可用的快照指标。

Attributes Type 描述

string

连接器已读取的最后一个快照事件。

long

自连接器读取和处理最近事件以来经过的毫秒数。

long

记录连接器在快照操作期间识别为错误的更改事件的数量。每次连接器在初始、增量或临时快照期间遇到无法处理的事件时,都会递增此指标。事件可能因格式错误、与模式不兼容或在转换期间遇到失败而无法处理。指标值在连接器任务的整个生命周期内保持不变。如果快照被中断,并且连接器任务重新启动,则指标计数将重置为 0

long

自上次启动或重置以来,此连接器已看到的事件总数。

long

已被连接器配置的包含/排除列表过滤规则过滤的事件数量。

string[]

由连接器捕获的表列表。

int

用于在快照程序和主 Kafka Connect 循环之间传递事件的队列的长度。

int

用于在快照程序和主 Kafka Connect 循环之间传递事件的队列的可用容量。

int

包含在快照中的表总数。

int

快照尚未复制的表数。

boolean

快照是否已启动。

boolean

快照是否已暂停。

boolean

快照是否被中止。

boolean

快照是否已完成。

boolean

快照是否被跳过。

long

快照到目前为止所花费的总秒数,即使未完成。也包括快照暂停的时间。

long

快照暂停的总秒数。如果快照被暂停了多次,则暂停时间将累加。

Map<String, Long>

包含快照中每个表扫描的行数的映射。表在处理过程中被增量地添加到 Map 中。每扫描 10,000 行和完成一个表时更新。

long

队列的最大缓冲区(字节)。如果 max.queue.size.in.bytes 设置为正长整型值,则此指标可用。

long

队列中记录的当前字节卷。

当执行增量快照时,连接器还提供以下附加快照指标:

Attributes Type 描述

string

当前快照块的标识符。

string

定义当前块的主键集的下限。

string

定义当前块的主键集的上限。

string

当前快照表的组成键集(主键)的下限。

string

当前快照表的组成键集(主键)的上限。

流式处理指标

MBeandebezium.oracle:type=connector-metrics,context=streaming,server=<topic.prefix>

下表列出了可用的流式传输指标。

Attributes Type 描述

string

连接器已读取的最后一个流式传输事件。

long

自连接器读取和处理最近事件以来经过的毫秒数。

long

记录连接器在流式传输期间识别为错误的更改事件的数量。每次连接器在流式传输会话期间遇到无法处理的事件时,都会递增此指标。事件可能因格式错误、与模式不兼容或在转换期间遇到失败而无法处理。指标值在连接器任务的整个生命周期内保持不变。连接器重启后,指标计数将重置为 0

long

自上次连接器启动或重置以来,源数据库报告的总数据更改事件数。代表 Debezium 需要处理的数据更改工作负载。

long

自上次启动或重置指标以来,连接器处理的总创建事件数。

long

自上次启动或重置指标以来,连接器处理的总更新事件数。

long

自上次启动或重置指标以来,连接器处理的总删除事件数。

long

已被连接器配置的包含/排除列表过滤规则过滤的事件数量。

string[]

由连接器捕获的表列表。

int

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的长度。

int

用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的剩余容量。

boolean

表示连接器当前是否连接到数据库服务器的标志。

long

上次更改事件的时间戳与连接器处理它之间的时间差(以毫秒为单位)。这些值将包含数据库服务器和连接器运行所在机器之间时钟的任何差异。

long

已提交的处理过的事务数。

Map<String, String>

收到的最后一个事件的坐标。

string

已处理的最后一个事务的事务标识符。

long

队列的最大缓冲区(字节)。如果 max.queue.size.in.bytes 设置为正长整型值,则此指标可用。

long

队列中记录的当前字节卷。

Debezium Oracle 连接器还提供以下其他流式处理指标:

表 19. 其他流式处理指标的描述
Attributes Type 描述

BigInteger

已处理的最新系统更改号。

BigInteger

事务缓冲区中最旧的系统更改号。

long

最旧系统更改号的年龄(以毫秒为单位)。如果缓冲区为空,则值为 0

BigInteger

事务缓冲区中最后一个已提交的系统更改号。

BigInteger

当前写入连接器偏移量的系统更改号。

string[]

数据库当前在线重做日志文件的数组。

string[]

当前挖掘会话中所有归档和在线重做日志的数组。

long

为任何 LogMiner 会话指定的日志的最小数量。

long

为任何 LogMiner 会话指定的日志的最大数量。

string[]

数据库中每个在线重做日志的当前状态数组,格式为 filename|status

int

数据库在过去一天中执行日志切换的次数。

long

在上次 LogMiner 会话查询中观察到的 DML 操作数。

long

处理单个 LogMiner 会话查询时观察到的最大 DML 操作数。

long

观察到的 DML 操作总数。

long

执行的 LogMiner 会话查询(也称为批次)总数。

long

上次 LogMiner 会话查询的获取持续时间(以毫秒为单位)。

long

任何 LogMiner 会话查询获取的最大持续时间(以毫秒为单位)。

long

处理上次 LogMiner 查询批次结果的持续时间(以毫秒为单位)。

long

解析 DML 事件 SQL 语句所花费的时间(以毫秒为单位)。

long

启动上次 LogMiner 会话的持续时间(以毫秒为单位)。

long

启动 LogMiner 会话的最长持续时间(以毫秒为单位)。

long

连接器启动 LogMiner 会话所花费的总时间(以毫秒为单位)。

long

处理单个 LogMiner 会话结果的最小持续时间(以毫秒为单位)。

long

处理单个 LogMiner 会话结果的最大持续时间(以毫秒为单位)。

long

处理 LogMiner 会话结果的总持续时间(以毫秒为单位)。

long

JDBC 驱动程序从日志挖掘视图获取下一行以进行处理所花费的总时间(以毫秒为单位)。

long

在所有会话中从日志挖掘视图处理的总行数。

int

日志挖掘查询每次数据库往返获取的条目数。

long

连接器在从日志挖掘视图获取下一批结果之前休眠的毫秒数。

long

从日志挖掘视图处理的每秒最大行数。

long

从日志挖掘处理的平均每秒行数。

long

最后批次从日志挖掘视图处理的平均每秒行数。

long

事务在连接器的内存缓冲区中保留(在提交或回滚之前)的毫秒数,之后将被丢弃。有关更多信息,请参阅log.mining.transaction.retention.ms

long

连接器在 LogMiner 查询之间等待的毫秒数。

long

事务缓冲区中的事件数。

long

事务缓冲区中当前活动的事务数。

long

事务缓冲区中已提交的事务数。

long

由于大小超过 log.mining.buffer.transaction.events.threshold 而被丢弃的事务数。

long

事务缓冲区中已回滚的事务数。

long

在已提交的事务中回滚的事件数,在大多数用例中这意味着约束违规。

long

事务缓冲区中每秒已提交事务的平均数。

long

连接器处理的更改数。

long

更改发生在事务日志中与添加到事务缓冲区之间的毫秒时间差。

long

更改发生在事务日志中与添加到事务缓冲区之间的最大毫秒时间差。

long

更改发生在事务日志中与添加到事务缓冲区之间的最小毫秒时间差。

string[]

由于年龄过大而从事务缓冲区移除的最近被放弃的事务标识符数组。有关详细信息,请参阅log.mining.transaction.retention.ms

long

abandoned transactions 列表中的当前条目数。

string[]

在事务缓冲区中已挖掘并回滚的最近事务标识符数组。

long

上次事务缓冲区提交操作的持续时间(以毫秒为单位)。

long

最长的事务缓冲区提交操作的持续时间(以毫秒为单位)。

int

检测到的错误数。

int

检测到的警告数。

int

检查系统更改号是否已更新但保持不变的次数。高值可能表明正在进行长时间运行的事务,阻止了连接器将最近处理的系统更改号刷新到连接器的偏移量。在最佳条件下,该值应接近或等于 0

int

检测到但 DDL 解析器无法解析的 DDL 记录数。此值应始终为 0;但是,当允许跳过无法解析的 DDL 时,此指标可用于确定是否已将任何警告写入连接器日志。

long

当前挖掘会话的用户全局区域 (UGA) 内存消耗(以字节为单位)。

long

所有挖掘会话中当前挖掘会话的用户全局区域 (UGA) 内存消耗的最大值(以字节为单位)。

long

当前挖掘会话的进程全局区域 (PGA) 内存消耗(以字节为单位)。

long

所有挖掘会话中当前挖掘会话的进程全局区域 (PGA) 内存消耗的最大值(以字节为单位)。

模式历史记录指标

MBeandebezium.oracle:type=connector-metrics,context=schema-history,server=<topic.prefix>

下表列出了可用的模式历史记录指标。

Attributes Type 描述

string

STOPPEDRECOVERING(从存储中恢复历史记录)或 RUNNING 中的一个,描述数据库模式历史记录的状态。

long

恢复开始的时间(以 epoch 秒为单位)。

long

恢复阶段读取的更改数。

long

在恢复和运行时应用的总模式更改数。

long

自从从历史存储中恢复最后一个更改以来经过的毫秒数。

long

自应用最后一个更改以来经过的毫秒数。

string

从历史存储中恢复的最后一个更改的字符串表示。

string

已应用的最后一个更改的字符串表示。

代理模式演进

Oracle 连接器通过解析重做日志中的 DDL 自动跟踪和应用表模式更改。如果 DDL 解析器遇到不兼容的语句,如有必要,连接器将提供一种替代方法来应用模式更改。

默认情况下,连接器在遇到无法解析的 DDL 语句时会停止。您可以使用 Debezium 信号来触发对此类 DDL 语句的模式更新。

模式更新操作的类型为 schema-changes。此操作会更新信号参数中列出的所有表的模式。消息不包含模式更新。相反,它包含完整的模式结构。

表 20. 操作参数
名称 描述

database

Oracle 数据库的名称。

schema

应用更改的模式名称。

changes

包含请求的模式更改的数组。

changes.type

模式更改的类型,通常为 ALTER

changes.id

表的完全限定名称。

changes.table

表的完全限定名称。

changes.table.defaultCharsetName

表的字符集名称(如果与数据库默认值不同)。

changes.table.primaryKeyColumnNames

构成主键的列名数组。

changes.table.columns

列元数据数组。

…​columns.name

列的名称。

…​columns.jdbcType

列的 JDBC 类型,如JDBC API 中定义。

…​columns.typeName

列类型的名称。

…​columns.typeExpression

完整的列类型定义。

…​columns.charsetName

列的字符集(如果与默认值不同)。

…​columns.length

列的长度/大小约束。

…​columns.scale

数字列的精度。

…​columns.position

表中列的位置,从 1 开始。

…​columns.optional

布尔值 true,如果列值不是必需的。

…​columns.autoIncremented

布尔值 true,如果列值是从序列自动计算的。

…​columns.generated

布尔值 true,如果列值是自动计算的。

插入 schema-changes 信号后,必须使用修改后的配置重启连接器,该配置包括将 schema.history.internal.skip.unparseable.ddl 选项指定为 true。在连接器的提交 SCN 超过 DDL 更改后,为了防止无法解析的 DDL 语句被意外跳过,请将连接器配置恢复到以前的状态。

表 21. 日志记录记录示例
Column Value (值)

id

924e3ff8-2245-43ca-ba77-2af9af02fa07

type

schema-changes

data

{
   "database":"ORCLPDB1",
   "schema":"DEBEZIUM",
   "changes":[
      {
         "type":"ALTER",
         "id":"\"ORCLPDB1\".\"DEBEZIUM\".\"CUSTOMER\"",
         "table":{
            "defaultCharsetName":null,
            "primaryKeyColumnNames":[
               "ID",
               "NAME"
            ],
            "columns":[
               {
                  "name":"ID",
                  "jdbcType":2,
                  "typeName":"NUMBER",
                  "typeExpression":"NUMBER",
                  "charsetName":null,
                  "length":9,
                  "scale":0,
                  "position":1,
                  "optional":false,
                  "autoIncremented":false,
                  "generated":false
               },
               {
                  "name":"NAME",
                  "jdbcType":12,
                  "typeName":"VARCHAR2",
                  "typeExpression":"VARCHAR2",
                  "charsetName":null,
                  "length":1000,
                  "position":2,
                  "optional":true,
                  "autoIncremented":false,
                  "generated":false
               },
               {
                  "name":"SCORE",
                  "jdbcType":2,
                  "typeName":"NUMBER",
                  "typeExpression":"NUMBER",
                  "charsetName":null,
                  "length":6,
                  "scale":2,
                  "position":3,
                  "optional":true,
                  "autoIncremented":false,
                  "generated":false
               },
               {
                  "name":"REGISTERED",
                  "jdbcType":93,
                  "typeName":"TIMESTAMP(6)",
                  "typeExpression":"TIMESTAMP(6)",
                  "charsetName":null,
                  "length":6,
                  "position":4,
                  "optional":true,
                  "autoIncremented":false,
                  "generated":false
               }
            ]
         }
      }
   ]
}

OpenLogReplicator 支持

OpenLogReplicator 摄取适配器目前处于孵化阶段,即确切的语义、配置选项等可能会在未来的版本中根据收到的反馈进行更改。如果您遇到任何问题,请告知我们。

Debezium Oracle 连接器默认使用本机 Oracle LogMiner 摄取更改。但是,可以切换连接器以使用 OpenLogReplicator,这是一个开源且免费的第三方应用程序,它直接从重做和归档日志读取 Oracle 更改,对数据库的影响很小。要配置连接器使用 OpenLogReplicator,您必须应用与 LogMiner 使用的不同的特定数据库和连接器配置。

先决条件
  • 下载并编译适用于您数据库环境的 OpenLogReplicator。

  • OpenLogReplicator 必须安装并具有直接访问归档和重做日志文件的权限。如果归档和重做日志可以通过某个共享文件系统访问,则不一定要求将其安装在物理数据库服务器上。

OpenLogReplicator 的工作原理

当 Debezium Oracle 连接器流式传输更改时,OpenLogReplicator 接管 Oracle LogMiner 和 Oracle XStream 的角色。它负责在重做和归档日志中捕获发生的更改,并将这些更改分批处理为逻辑事务。Debezium Oracle 连接器是一个 OpenLogReplicator 的消费者,它连接到 OpenLogReplicator 提供的网络端点,并摄取这些事务的处理批次。

在 OpenLogReplicator 适配器摄取更改后,Debezium Oracle 连接器会将事件转换为数据更改事件,就像任何其他适配器一样。

从网络拓扑的角度来看,Debezium Oracle 连接器依赖于与 Oracle 数据库和 OpenLogReplicator 的网络连接。类似地,OpenLogReplicator 需要与 Oracle 数据库的网络连接,以及对原始重做和归档日志的直接访问。

准备数据库

Oracle 数据库必须配置为生成特定文件,称为归档日志。归档日志是在线重做日志的已保存副本。数据库必须先置于 ARCHIVELOG 模式才能归档重做日志文件。要将数据库置于 ARCHIVELOG 模式,您必须设置特定的配置属性来指定保存归档日志文件的目标。

以下示例显示了将数据库置于归档日志模式的命令:

示例 5. OpenLogReplicator 所需的配置
ORACLE_SID=ORCLCDB dbz_oracle sqlplus /nolog

CONNECT sys/top_secret AS SYSDBA
alter system set db_recovery_file_dest_size = 5G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- Should show "Database log mode: Archive Mode"
archive log list

exit;

为了让 Debezium 生成显示表行之前之后状态的更改事件,必须在数据库上启用补充日志记录。补充日志记录将列数据添加到重做日志中,以标识修改表时受影响的行。

您可以使用不同的方法配置补充日志记录。为了支持 Debezium,您至少必须启用最小的数据库级别补充日志记录。最小补充日志记录会写入能够创建更改事件所需的最少量信息。以下示例显示了您可能用于启用最小补充日志记录的命令:

示例 6. 配置数据库级别的补充日志记录
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA

Debezium for Oracle 可以与更高程度的数据库级别补充日志记录一起工作,如果这些日志记录已为其他目的配置。但如果您不需要更高保真度的日志记录来支持其他应用程序,您可以将数据库级别的日志记录减少到最低级别。

对于每个捕获的表,您必须显式配置一个更高保真度的补充日志记录,称为 (ALL) COLUMNS(ALL) COLUMNS 日志记录级别保证 Oracle 在写入重做日志的重做条目时,会捕获列的状态,无论该列是否已更改。启用更高日志记录级别可以使 Debezium for Oracle 生成提供准确的行之前之后状态的更改事件。

以下示例显示了为每个捕获的表启用补充日志记录的命令:

示例 7. 配置捕获表补充日志记录
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS

每当向 Debezium Oracle 的连接器配置添加新表时,都必须为每个表配置补充日志记录。如果为捕获配置的表未正确配置补充日志记录,在连接器开始流式传输后,它会返回警告消息。

创建连接器用户

Debezium Oracle 连接器需要设置一个具有特定权限的用户帐户,以便连接器能够捕获更改事件。此用户帐户将由 Debezium 和 OpenLogReplicator 共用。下面的示例展示了在多租户 Oracle 环境中部署 Debezium 和 OpenLogReplicator 时用户帐户配置的一种可能方式。

示例 8. 创建 OpenLogReplicator 用户
# Create Log Miner Tablespace and User
sqlplus sys/top_secret@//:1521/ORCLCDB as sysdba <<- EOF
  CREATE TABLESPACE LOGMINER_TBS DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;
EOF

sqlplus sys/top_secret@//:1521/ORCLPDB1 as sysdba <<- EOF
  CREATE TABLESPACE LOGMINER_TBS DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;
EOF

sqlplus sys/top_secret@//:1521/ORCLCDB as sysdba <<- EOF
  CREATE USER c##dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS CONTAINER=ALL;

  -- Debezium specific permissions
  GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
  GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT ON V_\$DATABASE TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT ANY DICTIONARY TO c##dbzuser CONTAINER=ALL;
  GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;
  -- These can be reduced from ANY TABLE to your captured tables depending on your security model
  GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;
  GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;

  -- OpenLogReplicator specific permissions
  ALTER SESSION SET CONTAINER = ORCLPDB1;
  GRANT SELECT, FLASHBACK ON SYS.CCOL$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.CDEF$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.COL$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.DEFERRED_STG$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.ECOL$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.LOB$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.LOBCOMPPART$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.LOBFRAG$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.OBJ$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.TAB$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.TABCOMPART$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.TABPART$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.TABSUBPART$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.TS$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON SYS.USER$ TO c##dbzuser;
  GRANT SELECT, FLASHBACK ON XDB.XDB$TTSET TO c##dbzuser;

  exit;
EOF

配置 OpenLogReplicator 适配器

默认情况下,Debezium 使用 Oracle LogMiner 从 Oracle 摄取更改事件。您可以修改默认设置,将连接器配置为使用 OpenLogReplicator 适配器替代 LogMiner。

在接下来的示例中,将以下属性添加到连接器配置中,以启用连接器使用 OpenLogReplicator 适配器:

  • database.connection.adapter

  • openlogreplicator.source

  • openlogreplicator.host

  • openlogreplicator.port

{
    "name": "inventory-connector",
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",
        "tasks.max" : "1",
        "topic.prefix" : "server1",
        "database.hostname" : "<oracle ip>",
        "database.port" : "1521",
        "database.user" : "c##dbzuser",
        "database.password" : "dbz",
        "database.dbname" : "ORCLCDB",
        "database.pdb.name" : "ORCLPDB1",
        "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory",
        "database.connection.adapter": "olr",
        "openlogreplicator.source": "ORACLE",
        "openlogreplicator.host": "<ip address or hostname of OpenLogReplicator>",
        "openlogreplicator.port": "<port OpenLogReplicator is listening on>"
    }
}

构建 OpenLogReplicator

OpenLogReplicator 不分发二进制文件,因此您必须从代码构建第三方工具。OpenLogReplicator 用 C++ 编写;但是,作者提供了一个容器镜像,用于构建适用于包括 RHEL、Fedora、CentOS、Debian 等各种操作系统的源代码。有关如何使用容器构建工具的信息,请参阅此 OpenLogReplicator GitHub 仓库

获取 OpenLogReplicator 的 Oracle JDBC 驱动程序

当您将 Debezium Oracle 连接器与 OpenLogReplicator 一起使用时,必须获取 Oracle JDBC 驱动程序以连接到 Oracle 数据库。有关更多信息,请参阅 获取 Oracle JDBC 驱动程序

OpenLogReplicator 配置

OpenLogReplicator 是一个第三方独立进程,负责读取 Oracle 的重做日志和归档日志。与 Debezium Oracle 的其他适配器不同,这意味着有两个可配置的组件,它们必须独立配置。

您可以通过一个名为 scripts/OpenLogReplicator.json 的 JSON 文件来配置 OpenLogReplicator。有关此文件所需格式的更多信息,请参阅 OpenLogReplicator 文档

示例 9. OpenLogReplicator 配置
{
  "version": "1.6.0",
  "source": [{
    "alias": "SOURCE",
    "name": "ORACLE", (1)
    "reader": {
      "type": "online",
      "path-mapping": ["/opt/olr/recovery_area", "/opt/olr/recovery_area/ORCLCDB/archivelog"], (2)
      "user": "c##dbzuser", (3)
      "password": "dbz", (4)
      "server": "//<ip>:<port>/ORCLPDB1" (5)
    },
    "format": { (6)
      "type": "json",
      "column": 2,
      "db": 3,
      "interval-dts": 9,
      "interval-ytm": 4,
      "message": 2,
      "rid": 1,
      "schema": 7,
      "scn-all": 1,
      "timestamp-all": 1
    }
  }],
  "target": [{
    "alias": "DEBEZIUM",
    "source": "SOURCE",
    "writer": {
      "type": "network", (7)
      "uri": "<host>:<port>" (8)
    }
  }]
}
1 这应与 openlogreplicator.source 连接器配置匹配。
2 文件路径对列表 [before1,after1,befor2,after2,…​],其中如果日志文件路径与 beforeX 前缀之一匹配,则前缀将被替换为 afterX 路径。当 OpenLogReplicator 运行在与源数据库不同的主机上,并且重做日志和归档日志的路径在数据库和 OpenLogReplicator 进程之间不同时,这非常有用。
3 这应与 database.user 连接器属性匹配。
4 这应与 database.password 连接器属性匹配。
5 这应指向数据库主机、端口和 Oracle SID。
6 这指定了 Debezium Oracle 连接器摄取的载荷格式。请按指定使用这些值,因为除了默认值之外,这些是我们要求的唯一格式选项。
7 这必须指定 network,因为 Debezium Oracle 连接器通过网络连接与 OpenLogReplicator 通信。
8 这指定了 OpenLogReplicator 监听连接的绑定主机和端口。Debezium Oracle 连接器应可访问此地址。

OpenLogReplicator 连接器属性

使用 OpenLogReplicator 时,需要以下配置属性。

属性

Default (默认值)

描述

无默认值

OpenLogReplicator JSON 配置中已配置的 source.name 元素的逻辑名称。

无默认值

OpenLogReplicator 网络服务的主机名或 IP 地址。

无默认值

OpenLogReplicator 网络服务使用的端口号。

OpenLogReplicator ROWID 支持

OpenLogReplicator 不在其更改事件载荷中提供 ROWID 伪列的详细信息。因此,在使用 OpenLogReplicator 适配器的环境中,Debezium Oracle 连接器在源信息块中始终显示空的 row_id 值。

OpenLogReplicator XML 支持

要使用 Debezium Oracle 连接器和 OpenLogReplicator 捕获 XML 列,OpenLogReplicator 必须是 1.5.0 或更高版本。

XStream 支持

Debezium Oracle 连接器默认使用 Oracle LogMiner 摄取更改。可以切换连接器以使用 Oracle XStream。要将连接器配置为使用 Oracle XStream,您必须应用与 LogMiner 使用的配置不同的特定数据库和连接器配置。

先决条件
  • 要使用 XStream API,您必须拥有 GoldenGate 产品的许可证。无需安装 GoldenGate。

准备数据库

Oracle XStream 所需的配置
ORACLE_SID=ORCLCDB dbz_oracle sqlplus /nolog

CONNECT sys/top_secret AS SYSDBA
alter system set db_recovery_file_dest_size = 5G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
alter system set enable_goldengate_replication=true;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- Should show "Database log mode: Archive Mode"
archive log list

exit;

此外,必须为捕获的表或数据库启用补充日志记录,以便数据更改能够捕获已更改数据库行的*之前*状态。以下示例演示了如何在特定表上配置此项,这是最小化 Oracle 重做日志中捕获信息量的理想选择。

ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

为连接器创建 XStream 用户

Debezium Oracle 连接器要求设置具有特定权限的用户帐户,以便连接器能够捕获更改事件。以下示例提供了在多租户数据库模型中创建用户配置的信息。

创建 XStream 管理员用户

sqlplus sys/top_secret@//:1521/ORCLCDB as sysdba
  CREATE TABLESPACE xstream_adm_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/xstream_adm_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//:1521/ORCLPDB1 as sysdba
  CREATE TABLESPACE xstream_adm_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/xstream_adm_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//:1521/ORCLCDB as sysdba
  CREATE USER c##dbzadmin IDENTIFIED BY dbz
    DEFAULT TABLESPACE xstream_adm_tbs
    QUOTA UNLIMITED ON xstream_adm_tbs
    CONTAINER=ALL;

  GRANT CREATE SESSION, SET CONTAINER TO c##dbzadmin CONTAINER=ALL;

  BEGIN
     DBMS_XSTREAM_AUTH.GRANT_ADMIN_PRIVILEGE(
        grantee                 => 'c##dbzadmin',
        privilege_type          => 'CAPTURE',
        grant_select_privileges => TRUE,
        container               => 'ALL'
     );
  END;
  /

  exit;

创建连接器的 XStream 用户

sqlplus sys/top_secret@//:1521/ORCLCDB as sysdba
  CREATE TABLESPACE xstream_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/xstream_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//:1521/ORCLPDB1 as sysdba
  CREATE TABLESPACE xstream_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/xstream_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  exit;

sqlplus sys/top_secret@//:1521/ORCLCDB as sysdba
  CREATE USER c##dbzuser IDENTIFIED BY dbz
    DEFAULT TABLESPACE xstream_tbs
    QUOTA UNLIMITED ON xstream_tbs
    CONTAINER=ALL;

  GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
  GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
  GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
  GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
  GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
  exit;

创建 XStream 出站服务器

创建 XStream 出站服务器。(如果具有正确的权限,连接器将来可能会自动完成此操作,请参阅 DBZ-721

创建 XStream 出站服务器
sqlplus c##dbzadmin/dbz@//:1521/ORCLCDB
DECLARE
  tables  DBMS_UTILITY.UNCL_ARRAY;
  schemas DBMS_UTILITY.UNCL_ARRAY;
BEGIN
    tables(1)  := NULL;
    schemas(1) := 'debezium';
  DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
    server_name     =>  'dbzxout',
    table_names     =>  tables,
    schema_names    =>  schemas);
END;
/
exit;

当您设置 XStream 出站服务器以捕获可插入数据库的更改时,请将可插入数据库名称指定为 source_container_name 参数的值。

配置 XStream 用户帐户以连接到 XStream 出站服务器
sqlplus sys/top_secret@//:1521/ORCLCDB as sysdba
BEGIN
  DBMS_XSTREAM_ADM.ALTER_OUTBOUND(
    server_name  => 'dbzxout',
    connect_user => 'c##dbzuser');
END;
/
exit;

一个 XStream 出站服务器不能被多个 Debezium Oracle 连接器共享。每个连接器都需要配置一个唯一的 XStream 出站连接器。

配置 XStream 适配器

默认情况下,Debezium 使用 Oracle LogMiner 从 Oracle 摄取更改事件。您可以调整连接器配置,以启用连接器使用 Oracle XStream 适配器。

以下配置示例将属性 database.connection.adapterdatabase.out.server.name 添加到配置中,以启用连接器使用 XStream API 实现。

{
    "name": "inventory-connector",
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",
        "tasks.max" : "1",
        "topic.prefix" : "server1",
        "database.hostname" : "<oracle ip>",
        "database.port" : "1521",
        "database.user" : "c##dbzuser",
        "database.password" : "dbz",
        "database.dbname" : "ORCLCDB",
        "database.pdb.name" : "ORCLPDB1",
        "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory",
        "database.connection.adapter": "xstream",
        "database.out.server.name" : "dbzxout"
    }
}

获取 Oracle JDBC 驱动程序和 XStream API 文件

Debezium Oracle 连接器需要 Oracle JDBC 驱动程序 (ojdbc11.jar) 来连接到 Oracle 数据库。如果连接器使用 XStream 访问数据库,您还必须拥有 XStream API (xstreams.jar)。许可要求禁止 Debezium 将这些文件包含在 Oracle 连接器存档中。但是,所需文件可作为 Oracle Instant Client 的一部分免费下载。以下步骤描述了如何下载 Oracle Instant Client 并提取所需文件。

过程
  1. 从浏览器下载适用于您操作系统的 Oracle Instant Client 包

  2. 解压缩存档,然后打开 instantclient_<version> 目录。

    For example (例如:)

    instantclient_21_1/
    ├── adrci
    ├── BASIC_LITE_LICENSE
    ├── BASIC_LITE_README
    ├── genezi
    ├── libclntshcore.so -> libclntshcore.so.21.1
    ├── libclntshcore.so.12.1 -> libclntshcore.so.21.1
    
    ...
    
    ├── ojdbc11.jar
    ├── ucp.jar
    ├── uidrvci
    └── xstreams.jar
  3. 复制 ojdbc11.jarxstreams.jar 文件,并将它们添加到 <kafka_home>/libs 目录中,例如 kafka/libs

  4. 创建一个环境变量 LD_LIBRARY_PATH,并将其值设置为 Instant Client 目录的路径,例如:

    LD_LIBRARY_PATH=/path/to/instant_client/

XStream 连接器属性

使用 XStream 时,以下配置属性是*必需*的,除非有默认值可用。

属性

Default (默认值)

描述

无默认值

在数据库中配置的 XStream 出站服务器的名称。

XStream 和 DBMS_LOB

Oracle 提供了一个名为 DBMS_LOB 的数据库包,它包含一组用于操作 BLOB、CLOB 和 NCLOB 列的程序。其中大部分程序会整体操作 LOB 列,但有一个程序 WRITEAPPEND 能够操作 LOB 数据缓冲区的一部分。

使用 XStream 时,WRITEAPPEND 会为该程序的每次调用发出一个逻辑更改记录 (LCR) 事件。这些 LCR 事件不会像使用 Oracle LogMiner 适配器时那样合并为单个更改事件。因此,主题的消费者可能会收到带有部分列值的事件。这种行为差异已在 DBZ-4741 中记录,并将在将来的版本中解决。

常见问题解答

Oracle 11g 是否受支持?

Oracle 11g 不受支持;但是,我们旨在尽最大努力向后兼容 Oracle 11g。我们依赖社区来沟通与 Oracle 11g 的兼容性问题,并在出现回归时提供错误修复。

Oracle LogMiner 不是已弃用吗?

不,Oracle 只在 Oracle 12c 中弃用了 Oracle LogMiner 的连续挖掘选项,并在 Oracle 19c 中删除了该选项。Debezium Oracle 连接器不依赖此选项即可运行,因此可以安全地与较新版本的 Oracle 一起使用,没有任何影响。

如何更改偏移量中的位置?

Debezium Oracle 连接器在偏移量中维护两个关键值,一个名为 scn 的字段和另一个名为 commit_scn 的字段。scn 字段是一个字符串,表示连接器在捕获更改时使用的低水位线起始位置。

  1. 查找包含连接器偏移量的主题名称。这根据 offset.storage.topic 配置属性的值进行配置。

  2. 找出连接器的最后一个偏移量、存储它的键,并确定用于存储偏移量的分区。这可以使用 Kafka 代理安装提供的 kafkacat 实用脚本来完成。示例可能如下所示:

    kafkacat -b localhost -C -t my_connect_offsets -f 'Partition(%p) %k %s\n'
    Partition(11) ["inventory-connector",{"server":"server1"}] {"scn":"324567897", "commit_scn":"324567897: 0x2832343233323:1"}

    inventory-connector 的键是 ["inventory-connector",{"server":"server1"}],分区是 11,最后一个偏移量是紧随键之后的内容。

  3. 要回溯到之前的偏移量,应停止连接器并执行以下命令

    echo '["inventory-connector",{"server":"server1"}]|{"scn":"3245675000","commit_scn":"324567500"}' | \
    kafkacat -P -b localhost -t my_connect_offsets -K \| -p 11

    这会将给定键和偏移量值写入 my_connect_offsets 主题的 11 分区。在此示例中,我们将连接器恢复到 SCN 3245675000 而不是 324567897

如果连接器找不到具有给定偏移量 SCN 的日志会怎样?

Debezium 连接器在连接器偏移量中维护低高水位线 SCN 值。低水位线 SCN 表示起始位置,必须存在于可用的在线重做日志或归档日志中,连接器才能成功启动。当连接器报告找不到此偏移量 SCN 时,这表明仍可用的日志不包含该 SCN,因此连接器无法从其停止的地方挖掘更改。

发生这种情况时,有两种选择。第一种是删除连接器的历史主题和偏移量,然后重新启动连接器,就像建议的那样执行新的快照。这将保证任何主题使用者都不会丢失数据。第二种是手动修改偏移量,将 SCN 高级到一个在重做日志或归档日志中可用的位置。这会导致旧 SCN 值和新提供的 SCN 值之间的更改丢失,并且不会写入主题。不建议这样做。

各种挖掘策略之间有什么区别?

Debezium Oracle 连接器为 log.mining.strategy 提供了三个选项。

默认值是 online_catalog,它指示连接器不将数据字典写入重做日志。相反,Oracle LogMiner 将始终使用包含表结构的当前状态的在线数据字典。这也意味着,如果表结构发生更改并且不再与在线数据字典匹配,Oracle LogMiner 将无法解析表或列名。如果正在捕获的表经常发生架构更改,则不应使用此挖掘策略选项。重要的是所有数据更改都与架构更改同步,以便从表的日志中捕获了所有更改,然后停止连接器,应用架构更改,然后重新启动连接器并恢复表的*.**数据更改。此选项需要的 Oracle 数据库内存更少,并且 Oracle LogMiner 会话通常启动得更快,因为数据字典不需要由 LogMiner 进程加载或预加载。

第二个选项 redo_log_catalog 会在每次检测到日志切换时将 Oracle 数据字典写入重做日志。此数据字典对于 Oracle LogMiner 在解析重做日志和归档日志时有效跟踪架构更改至关重要。此选项会生成比平时更多的归档日志,但允许在不影响数据更改捕获的情况下实时操纵正在捕获的表。此选项通常需要更多的 Oracle 数据库内存,并且会导致 Oracle LogMiner 会话和进程在每次日志切换后启动的时间稍长。

最后一个选项 hybrid 结合了上述两个策略的优点,同时避免了它们的缺点。此策略利用了 online_catalog 的性能,同时又具备 redo_log_catalog 的架构跟踪弹性,同时还避免了开销和性能成本以及高于正常水平的归档日志生成。此模式使用回退模式,即如果 LogMiner 无法为数据库更改重建 SQL,Debezium 连接器将依赖连接器维护的内存中架构模型来实时重建 SQL。意图是此模式最终将过渡到未来的默认的、很可能唯一的运行模式。

混合挖掘策略与 LogMiner 是否存在任何限制?

是的,log.mining.strategy 的混合模式仍处于开发阶段,因此尚不支持所有数据类型。目前,此模式无法重建涉及对 CLOBNCLOBBLOBXMLJSON 数据类型的操作的 SQL 语句。简而言之,如果您启用了 lob.enabled 并将其值设置为 true,则将无法使用混合策略,并且连接器将无法启动,因为此组合不受支持。

为什么连接器在 AWS 上似乎停止捕获更改?

由于 AWS 网关负载均衡器的固定空闲超时时间为 350 秒,需要超过 350 秒才能完成的 JDBC 调用可能会无限期挂起。

在处理大量数据的 LogMiner 会话与 Oracle 的定期检查点任务并发运行时,调用 Oracle LogMiner API 需要超过 350 秒才能完成,可能会触发超时,导致 AWS 网关负载均衡器挂起。

为防止 AWS 网关负载均衡器发生超时,请在托管连接器的环境中,以 root 用户或超级用户的身份启用从 kafka Connect 或 Debezium Server 环境发出的保持连接数据包,方法是执行以下任务:

  1. 从终端运行以下命令:

    sysctl -w net.ipv4.tcp_keepalive_time=60
  2. 编辑 /etc/sysctl.conf 并将以下变量的值设置为如下所示:

    net.ipv4.tcp_keepalive_time=60
  3. 将 Debezium for Oracle 连接器重新配置为使用 database.url 属性而不是 database.hostname,并添加 (ENABLE=broken) Oracle 连接字符串描述符,如下面的示例所示:

    database.url=jdbc:oracle:thin:username/password!@(DESCRIPTION=(ENABLE=broken)(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(Host=hostname)(Port=port)))(CONNECT_DATA=(SERVICE_NAME=serviceName)))

前面的步骤将 TCP 网络堆栈配置为每 60 秒发送一次保持连接数据包。因此,当 JDBC 调用 LogMiner API 超过 350 秒才能完成时,AWS 网关负载均衡器不会超时,从而使连接器能够继续从数据库的事务日志中读取更改。

ORA-01555 的原因是什么?如何处理?

Debezium Oracle 连接器在初始快照阶段执行时会使用闪回查询。闪回查询是一种特殊类型的查询,它依赖于闪回区域(由数据库的 UNDO_RETENTION 数据库参数维护)来返回查询结果,基于在给定时间点(在本例中为给定 SCN)的表内容。默认情况下,Oracle 通常只维护大约 15 分钟的撤销或闪回区域,除非您的数据库管理员增加了或减少了此设置。对于捕获大表的配置,可能需要超过 15 分钟或您配置的 UNDO_RETENTION 时间来执行初始快照,这最终会导致此异常。

ORA-01555: snapshot too old: rollback segment number 12345 with name "_SYSSMU11_1234567890$" too small

处理此异常的第一种方法是与您的数据库管理员合作,看看他们是否可以暂时增加 UNDO_RETENTION 数据库参数。这不需要重新启动 Oracle 数据库,因此可以联机进行,而不会影响数据库的可用性。但是,更改此设置仍可能导致上述异常或“快照太旧”异常,如果表空间没有足够的空间来存储必要的撤销数据。

处理此异常的第二种方法是不完全依赖初始快照,将 snapshot.mode 设置为 no_data,然后依赖增量快照。增量快照不依赖于闪回查询,因此不受 ORA-01555 异常的影响。

ORA-04036 的原因是什么?如何处理?

当数据库更改不频繁发生时,Debezium Oracle 连接器可能会报告 ORA-04036 异常。Oracle LogMiner 会话会启动并重用,直到检测到日志切换。该会话会重用,因为它提供了 Oracle LogMiner 的最佳性能利用率,但如果发生长时间运行的挖掘会话,这可能导致 PGA 内存使用过高,最终导致此类异常。

ORA-04036: PGA memory used by the instance exceeds PGA_AGGREGATE_LIMIT

可以通过指定 Oracle 重做日志切换的频率或 Debezium Oracle 连接器允许重用挖掘会话的时间来避免此异常。Debezium Oracle 连接器提供了一个配置选项 log.mining.session.max.ms,该选项控制当前 Oracle LogMiner 会话可以重用多长时间,然后再关闭并启动新会话。这允许在不超过数据库允许的 PGA 内存的情况下保持数据库资源的检查。

ORA-01882 的原因是什么?如何处理?

当连接到 Oracle 数据库时,Debezium Oracle 连接器可能会报告以下异常:

ORA-01882: timezone region not found

当 JDBC 驱动程序无法正确解析时区信息时会发生这种情况。为了解决此驱动程序相关问题,需要告知驱动程序不要使用区域来解析时区详细信息。可以通过指定驱动程序传递属性 driver.oracle.jdbc.timezoneAsRegion=false 来完成。

ORA-25191 的原因是什么?如何处理?

Debezium Oracle 连接器会自动忽略索引组织表 (IOT),因为 Oracle LogMiner 不支持它们。但是,如果抛出 ORA-25191 异常,这可能是由于此类映射的一个独特边界情况,并且可能需要其他规则来自动排除它们。ORA-25191 异常的示例可能如下所示:

ORA-25191: cannot reference overflow table of an index-organized table

如果抛出 ORA-25191 异常,请提交一个 Jira 问题,提供有关表及其映射、与其他父表的关系等详细信息。作为一种解决方法,可以调整 include/exclude 配置选项以防止连接器访问此类表。

如何解决 SAX feature external-general-entities not supported 问题

Debezium 2.4 引入了对 Oracle XMLTYPE 列类型的支持。要使用此功能,需要 Oracle xdbxmlparserv2 依赖项。

Oracle 的 xmlparserv2 依赖项实现了基于 SAX 的解析器,如果运行时找到并使用此实现而不是类路径上的其他实现,则会出现此错误。为了专门影响使用的是哪个 SAX 实现,通常需要使用特定参数启动 JVM。

提供以下 JVM 参数后,Oracle 连接器将成功启动而不会出现此错误。

-Djavax.xml.parsers.SAXParserFactory=com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl
ORA-00600 的原因是什么?如何处理?

ORA-00600 是一个通用的内部错误。它表示相关进程遇到了低级别的意外情况,这通常意味着您遇到了一个 bug。有关更多信息,请参阅此 Oracle 博客

ORA-00600 带有多个参数,其中第一个参数最重要。根据第一个参数,有一个假定要应用的解决方案:

ORA-00600: internal error code, arguments: [flg0], ...

避免此问题的解决方案是在数据库级别启用补充日志记录。

ORA-00600: internal error code, arguments: [kdli_iread], ...

通过将 Oracle 数据库升级到 19.25 并应用补丁 32209850,可以避免此异常。

Debezium 2.7 在 decimal.handling.mode 方面引入了破坏性更改,是否有办法使用旧行为?

是的,通过将连接器配置属性 legacy.decimal.handling.strategy 设置为 true 可以启用旧行为。通过将其设置为 true,连接器将不再将所有零标度数字视为 VariableScaleDecimal 类型,而是根据列的大小继续将它们作为 INT8、INT16、INT32 和 INT64 发出。