Debezium SQL Server 连接器
|
想帮助我们进一步完善和改进它吗?了解如何操作。 |
- 概述
- SQL Server 连接器的工作原理
- Snapshots (快照)
- Ad hoc snapshots (即席快照)
- Incremental snapshots (增量快照)
- Custom snapshotter SPI (自定义快照器 SPI)
- Blocking snapshots (阻塞快照)
- 读取更改数据表
- 数据库中未记录最大 LSN
- 局限性
- Topic names (主题名称)
- Schema history topic (模式历史主题)
- Schema change topic (模式变更主题)
- Data change events (数据变更事件)
- Transaction metadata (事务元数据)
- Data type mappings (数据类型映射)
- 设置 SQL Server
- Deployment (部署)
- 数据库模式演进
- 通知
- 监控
Debezium SQL Server 连接器捕获 SQL Server 数据库模式中发生的行级更改。
有关与此连接器兼容的 SQL Server 版本的信息,请参阅Debezium 版本概览。
Debezium SQL Server 连接器首次连接到 SQL Server 数据库或群集时,它会创建数据库模式的一致快照。在初始快照完成后,连接器会持续捕获已启用 CDC 的 SQL Server 数据库中已提交的 INSERT、UPDATE 或 DELETE 操作的行级更改。连接器为每个数据更改操作生成事件,并将它们流式传输到 Kafka 主题。连接器将表的所有事件流式传输到专用的 Kafka 主题。应用程序和服务随后可以从该主题中消费数据更改事件记录。
概览
Debezium SQL Server 连接器基于 SQL Server 2016 Service Pack 1 (SP1) 及更高版本标准版或企业版中提供的更改数据捕获功能。SQL Server 捕获进程监控指定的数据库和表,并将更改存储到专门创建的更改表中,这些表具有存储过程接口。
要使 Debezium SQL Server 连接器能够捕获数据库操作的更改事件记录,您必须首先在 SQL Server 数据库上启用更改数据捕获。CDC 必须同时在数据库和要捕获的每个表上启用。在源数据库上设置 CDC 后,连接器可以捕获数据库中发生的行级 INSERT、UPDATE 和 DELETE 操作。连接器将每个源表的事件记录写入一个专为此表设计的 Kafka 主题。每个捕获的表都有一个主题。客户端应用程序读取它们跟踪的数据库表的 Kafka 主题,并可以响应从这些主题中消费的行级事件。
连接器首次连接到 SQL Server 数据库或群集时,它会创建其配置为捕获更改的所有表的模式的一致快照,并将此状态流式传输到 Kafka。快照完成后,连接器将持续捕获后续发生的行级更改。通过首先建立所有数据的完整视图,连接器可以继续读取而不会丢失在快照过程中发生的任何更改。
Debezium SQL Server 连接器具有故障容忍性。当连接器读取更改并生成事件时,它会定期记录数据库日志中的事件位置(LSN / Log Sequence Number)。如果连接器因任何原因停止(包括通信故障、网络问题或崩溃),在重新启动后,连接器将从上次读取的位置继续读取 SQL Server CDC 表。
| 偏移量会定期提交。它们不会在更改事件发生时提交。因此,在发生故障后,可能会生成重复的事件。 |
容错也适用于快照。也就是说,如果连接器在快照过程中停止,连接器会在重新启动时开始新的快照。
SQL Server 连接器的工作原理
为了优化配置和运行 Debezium SQL Server 连接器,了解连接器如何执行快照、流式传输更改事件、确定 Kafka 主题名称以及使用元数据将非常有帮助。
快照
SQL Server CDC 并非设计用于存储数据库更改的完整历史记录。为了让 Debezium SQL Server 连接器建立数据库当前状态的基线,它使用一个称为快照的过程。初始快照捕获数据库中表的结构和数据。
Debezium SQL Server 连接器执行初始快照的默认工作流
以下工作流列出了 Debezium 创建快照的步骤。这些步骤描述了当 snapshot.mode 配置属性设置为默认值 initial 时的快照过程。您可以通过更改 snapshot.mode 属性的值来自定义连接器创建快照的方式。如果配置了不同的快照模式,连接器将使用此工作流的修改版本来完成快照。
-
Establish a connection to the database. (建立数据库连接。)
-
确定要捕获的表。默认情况下,连接器捕获所有非系统表。要让连接器捕获表的子集或表元素,您可以设置一系列
include和exclude属性来过滤数据,例如,table.include.list或table.exclude.list。 -
在 CDC 启用的 SQL Server 表上获取锁,以防止在创建快照期间发生结构更改。锁的级别由
snapshot.isolation.mode配置属性确定。 -
读取服务器事务日志中的最大日志序列号 (LSN) 位置。
-
捕获所有非系统表或所有指定要捕获的表的结构。连接器将此信息持久化到其内部数据库模式历史记录主题中。模式历史记录提供了在发生更改事件时生效的结构信息。
默认情况下,连接器捕获处于捕获模式的数据库中每个表的模式,包括未配置为捕获的表。如果表未配置为捕获,初始快照仅捕获其结构;不捕获任何表数据。有关为什么快照会为未包含在初始快照中的表持久化模式信息,请参阅 理解为什么初始快照会捕获所有表的模式历史记录。
-
如果需要,释放步骤 3 中获取的锁。其他数据库客户端现在可以写入任何先前锁定的表。
-
在步骤 4 读取的 LSN 位置,连接器会扫描要捕获的表。在扫描过程中,连接器将完成以下任务
-
Confirms that the table was created before the snapshot began. If the table was created after the snapshot began, the connector skips the table. After the snapshot is complete, and the connector transitions to streaming, it emits change events for any tables that were created after the snapshot began. (确认表在快照开始之前已创建。如果表在快照开始之后创建,连接器将跳过该表。快照完成后,连接器过渡到流式传输,它会为快照开始后创建的任何表发出变更事件。)
-
为从表中捕获的每一行生成一个
read事件。所有read事件都具有相同的 LSN 位置,该位置是步骤 4 中获取的 LSN 位置。 -
将每个
read事件发出到表的 Kafka 主题。
-
-
在连接器偏移量中记录快照的成功完成。
生成的初始快照捕获已启用 CDC 的表中每行的当前状态。从此基线状态开始,连接器将捕获后续发生的更改。
After the snapshot process begins, if the process is interrupted due to connector failure, rebalancing, or other reasons, the process restarts after the connector restarts. (在快照过程开始后,如果由于连接器故障、重新平衡或其他原因导致过程中断,该过程将在连接器重新启动后重新启动。)
连接器完成后期 snapshot,它将从步骤 4 中读取的位置继续流式传输,以确保不遗漏任何更新。
If the connector stops again for any reason, after it restarts, it resumes streaming changes from where it previously left off. (如果连接器因任何原因再次停止,则在重新启动后,它将从之前停止的地方继续流式传输更改。)
| Setting (设置) | 描述 |
|---|---|
|
在每次连接器启动时执行快照。快照完成后,连接器将开始流式传输后续数据库更改的事件记录。 |
|
连接器按 执行初始快照的默认工作流中所述执行数据库快照。快照完成后,连接器将开始流式传输后续数据库更改的事件记录。 |
|
连接器执行数据库快照,并在流式传输任何更改事件记录之前停止,不允许捕获任何后续更改事件。 |
|
Deprecated, see |
|
连接器捕获所有相关表的结构,执行 默认快照工作流中描述的所有步骤,但它不会创建 |
|
Set this option to restore a database schema history topic that is lost or corrupted. After a restart, the connector runs a snapshot that rebuilds the topic from the source tables. You can also set the property to periodically prune a database schema history topic that experiences unexpected growth. (将此选项设置为恢复已丢失或损坏的数据库模式历史主题。重新启动后,连接器将运行一个快照,从源表重建该主题。您还可以将此属性设置为定期修剪经历意外增长的数据库模式历史主题。) + 警告:如果模式更改在上次连接器关闭后已提交到数据库,请勿使用此模式执行快照。 |
|
After the connector starts, it performs a snapshot only if it detects one of the following circumstances (连接器启动后,仅当检测到以下任一情况时,它才会执行快照:)
|
|
Set the snapshot mode to |
|
|
有关更多信息,请参阅连接器配置属性表中的 snapshot.mode。
了解为什么初始快照会捕获所有表的模式历史记录
The initial snapshot that a connector runs captures two types of information (连接器运行的初始快照捕获两种类型的信息:)
- Table data (表数据)
-
有关连接器
table.include.list属性中命名的表中INSERT、UPDATE和DELETE操作的信息。 - Schema data (模式数据)
-
DDL statements that describe the structural changes that are applied to tables. Schema data is persisted to both the internal schema history topic, and to the connector’s schema change topic, if one is configured. (描述应用于表的结构化更改的 DDL 语句。模式数据将持久化到内部模式历史主题,以及连接器的模式变更主题(如果已配置)。)
After you run an initial snapshot, you might notice that the snapshot captures schema information for tables that are not designated for capture. By default, initial snapshots are designed to capture schema information for every table that is present in the database, not only from tables that are designated for capture. Connectors require that the table’s schema is present in the schema history topic before they can capture a table. By enabling the initial snapshot to capture schema data for tables that are not part of the original capture set, Debezium prepares the connector to readily capture event data from these tables should that later become necessary. If the initial snapshot does not capture a table’s schema, you must add the schema to the history topic before the connector can capture data from the table. (运行初始快照后,您可能会注意到快照捕获了未指定捕获的表的模式信息。默认情况下,初始快照旨在捕获数据库中存在的所有表的模式信息,而不仅仅是指定捕获的表。连接器要求表的模式存在于模式历史主题中,然后才能捕获表。通过允许初始快照捕获非原始捕获集一部分的表的模式数据,Debezium 会为连接器做好准备,以便在将来需要时能够轻松地从这些表中捕获事件数据。如果初始快照未捕获表的模式,您必须在连接器能够从表中捕获数据之前将模式添加到历史主题。)
In some cases, you might want to limit schema capture in the initial snapshot. This can be useful when you want to reduce the time required to complete a snapshot. Or when Debezium connects to the database instance through a user account that has access to multiple logical databases, but you want the connector to capture changes only from tables in a specific logic database. (在某些情况下,您可能希望在初始快照中限制模式捕获。当您想减少完成快照所需的时间时,这可能很有用。或者,当 Debezium 通过具有访问多个逻辑数据库的用户帐户连接到数据库实例时,而您只想让连接器捕获特定逻辑数据库中表的更改。)
-
设置
schema.history.internal.store.only.captured.tables.ddl属性以指定要从中捕获模式信息的表。 -
设置
schema.history.internal.store.only.captured.databases.ddl属性以指定要从中捕获模式更改的逻辑数据库。
捕获初始快照未捕获的表的数据(无模式更改)
In some cases, you might want the connector to capture data from a table whose schema was not captured by the initial snapshot. Depending on the connector configuration, the initial snapshot might capture the table schema only for specific tables in the database. If the table schema is not present in the history topic, the connector fails to capture the table, and reports a missing schema error. (在某些情况下,您可能希望连接器捕获模式未被初始快照捕获的表中的数据。根据连接器配置,初始快照可能仅捕获数据库中特定表的模式。如果表模式不存在于历史主题中,连接器将无法捕获该表,并报告模式丢失错误。)
You might still be able to capture data from the table, but you must perform additional steps to add the table schema. (您仍然可以从该表中捕获数据,但必须执行额外步骤来添加表模式。)
-
You want to capture data from a table with a schema that the connector did not capture during the initial snapshot. (您想从一个连接器在初始快照期间未捕获其模式的表中捕获数据。)
-
在连接器读取的最早和最新的更改表条目 LSN 之间,未对该表应用模式更改。有关捕获新表的(已进行结构更改的)数据的信息,请参阅捕获初始快照未捕获的表的数据(有模式更改)。
-
Stop the connector. (停止连接器。)
-
删除由
schema.history.internal.kafka.topic属性指定的内部数据库模式历史记录主题。 -
Clear the offsets in the configured Kafka Connect
offset.storage.topic. For more information about how to remove offsets, see the Debezium community FAQ. (清除配置的 Kafka Connectoffset.storage.topic中的偏移量。有关如何删除偏移量的更多信息,请参阅Debezium 社区 FAQ。)Removing offsets should be performed only by advanced users who have experience in manipulating internal Kafka Connect data. This operation is potentially destructive, and should be performed only as a last resort. (仅应由有经验处理内部 Kafka Connect 数据的用户执行移除偏移量的操作。此操作可能具有破坏性,应仅作为最后的手段执行。)
-
Apply the following changes to the connector configuration (对连接器配置应用以下更改:)
-
(可选)将
schema.history.internal.store.only.captured.tables.ddl的值设置为false。此设置使连接器捕获所有表的模式,并保证将来连接器可以重建所有表的模式历史记录。捕获所有表模式的 snapshot 需要更长的时间来完成。
-
将要捕获的表添加到
table.include.list。 -
将
snapshot.mode设置为以下值之一initial (初始)-
当您重新启动连接器时,它将对数据库进行完整 snapshot,捕获表数据和表结构。
如果选择此选项,请考虑将schema.history.internal.store.only.captured.tables.ddl属性的值设置为false,以使连接器能够捕获所有表的模式。 schema_only (仅模式)-
当您重新启动连接器时,它将执行一个仅捕获表模式的 snapshot。与完整数据 snapshot 不同,此选项不捕获任何表数据。如果您希望连接器比完整 snapshot 更快地重新启动,可以使用此选项。
-
-
重新启动连接器。连接器将完成由
snapshot.mode指定的 snapshot 类型。 -
(可选)如果连接器执行了
schema_only快照,则在快照完成后,启动增量快照以捕获已添加表的中的数据。连接器在继续流式传输实时更改的同时运行快照。运行增量快照会捕获以下数据更改-
对于连接器之前已捕获的表,增量 snapshot 会捕获在连接器停机期间发生的更改,即在连接器停止和当前重新启动之间的时间间隔。
-
对于新添加的表,增量 snapshot 会捕获所有现有的表行。
-
捕获初始快照未捕获的表的数据(有模式更改)
If a schema change is applied to a table, records that are committed before the schema change have different structures than those that were committed after the change. When Debezium captures data from a table, it reads the schema history to ensure that it applies the correct schema to each event. If the schema is not present in the schema history topic, the connector is unable to capture the table, and an error results. (如果模式更改应用于某个表,则在模式更改之前提交的记录与更改之后提交的记录具有不同的结构。当 Debezium 从表中捕获数据时,它会读取模式历史记录以确保它将正确的模式应用于每个事件。如果模式不存在于模式历史主题中,连接器将无法捕获该表,并导致错误。)
If you want to capture data from a table that was not captured by the initial snapshot, and the schema of the table was modified, you must add the schema to the history topic, if it is not already available. You can add the schema by running a new schema snapshot, or by running an initial snapshot for the table. (如果您想从初始快照未捕获的表中捕获数据,并且该表的模式已修改,则必须将模式添加到历史主题(如果尚未提供)。您可以通过运行新的模式快照或为该表运行初始快照来添加模式。)
-
You want to capture data from a table with a schema that the connector did not capture during the initial snapshot. (您想从一个连接器在初始快照期间未捕获其模式的表中捕获数据。)
-
A schema change was applied to the table so that the records to be captured do not have a uniform structure. (已对表应用了模式更改,以便要捕获的记录没有统一的结构。)
- Initial snapshot captured the schema for all tables (
store.only.captured.tables.ddlwas set tofalse) (初始快照捕获了所有表的模式(store.only.captured.tables.ddl设置为false)) -
-
编辑
table.include.list属性以指定要捕获的表。 -
Restart the connector. (重新启动连接器。)
-
如果要在首次快照中捕获新添加的表中的现有数据,请启动增量快照。
-
- Initial snapshot did not capture the schema for all tables (
store.only.captured.tables.ddlwas set totrue) (初始快照未捕获所有表的模式(store.only.captured.tables.ddl设置为true)) -
If the initial snapshot did not save the schema of the table that you want to capture, complete one of the following procedures (如果初始快照未保存您要捕获的表的模式,请完成以下任一程序:)
- Procedure 1: Schema snapshot, followed by incremental snapshot (过程 1:模式快照,然后是增量快照)
-
In this procedure, the connector first performs a schema snapshot. You can then initiate an incremental snapshot to enable the connector to synchronize data. (在此过程中,连接器首先执行模式快照。然后,您可以启动增量快照以使连接器能够同步数据。)
-
Stop the connector. (停止连接器。)
-
删除由
schema.history.internal.kafka.topic属性指定的内部数据库模式历史记录主题。 -
Clear the offsets in the configured Kafka Connect
offset.storage.topic. For more information about how to remove offsets, see the Debezium community FAQ. (清除配置的 Kafka Connectoffset.storage.topic中的偏移量。有关如何删除偏移量的更多信息,请参阅Debezium 社区 FAQ。)Removing offsets should be performed only by advanced users who have experience in manipulating internal Kafka Connect data. This operation is potentially destructive, and should be performed only as a last resort. (仅应由有经验处理内部 Kafka Connect 数据的用户执行移除偏移量的操作。此操作可能具有破坏性,应仅作为最后的手段执行。)
-
Set values for properties in the connector configuration as described in the following steps (按以下步骤为连接器配置中的属性设置值:)
-
将
snapshot.mode属性的值设置为schema_only。 -
编辑
table.include.list以添加要捕获的表。
-
-
Restart the connector. (重新启动连接器。)
-
Wait for Debezium to capture the schema of the new and existing tables. Data changes that occurred any tables after the connector stopped are not captured. (等待 Debezium 捕获新表和现有表的模式。连接器停止后在任何表上发生的数据更改都不会被捕获。)
-
为确保不丢失任何数据,请启动增量快照。
-
- Procedure 2: Initial snapshot, followed by optional incremental snapshot (过程 2:初始快照,然后是可选的增量快照)
-
In this procedure the connector performs a full initial snapshot of the database. As with any initial snapshot, in a database with many large tables, running an initial snapshot can be a time-consuming operation. After the snapshot completes, you can optionally trigger an incremental snapshot to capture any changes that occur while the connector is off-line. (在此过程中,连接器将执行数据库的完整初始快照。与任何初始快照一样,在具有许多大表的数据库中,运行初始快照可能是一项耗时的操作。快照完成后,您可以选择触发增量快照以捕获连接器离线期间发生的任何更改。)
-
Stop the connector. (停止连接器。)
-
删除由
schema.history.internal.kafka.topic属性指定的内部数据库模式历史记录主题。 -
Clear the offsets in the configured Kafka Connect
offset.storage.topic. For more information about how to remove offsets, see the Debezium community FAQ. (清除配置的 Kafka Connectoffset.storage.topic中的偏移量。有关如何删除偏移量的更多信息,请参阅Debezium 社区 FAQ。)Removing offsets should be performed only by advanced users who have experience in manipulating internal Kafka Connect data. This operation is potentially destructive, and should be performed only as a last resort. (仅应由有经验处理内部 Kafka Connect 数据的用户执行移除偏移量的操作。此操作可能具有破坏性,应仅作为最后的手段执行。)
-
编辑
table.include.list以添加要捕获的表。 -
Set values for properties in the connector configuration as described in the following steps (按以下步骤为连接器配置中的属性设置值:)
-
将
snapshot.mode属性的值设置为initial。 -
(可选)将
schema.history.internal.store.only.captured.tables.ddl设置为false。
-
-
Restart the connector. The connector takes a full database snapshot. After the snapshot completes, the connector transitions to streaming. (重新启动连接器。连接器将进行完整的数据库快照。快照完成后,连接器将过渡到流式传输。)
-
(可选)要捕获连接器离线期间发生的任何数据更改,请启动增量快照。
-
临时快照
By default, a connector runs an initial snapshot operation only after it starts for the first time. Following this initial snapshot, under normal circumstances, the connector does not repeat the snapshot process. Any future change event data that the connector captures comes in through the streaming process only. (默认情况下,连接器仅在首次启动后运行初始快照操作。在初始快照之后,在正常情况下,连接器不会重复快照过程。连接器捕获的任何未来变更事件数据仅通过流式传输过程进入。)
However, in some situations the data that the connector obtained during the initial snapshot might become stale, lost, or incomplete. To provide a mechanism for recapturing table data, Debezium includes an option to perform ad hoc snapshots. You might want to perform an ad hoc snapshot after any of the following changes occur in your Debezium environment (但是,在某些情况下,连接器在初始快照期间获取的数据可能会过时、丢失或不完整。为了提供重新捕获表数据的机制,Debezium 提供了一个执行即席快照的选项。您可能希望在 Debezium 环境中发生以下任何更改后执行即席快照:)
-
The connector configuration is modified to capture a different set of tables. (修改了连接器配置以捕获不同的表集。)
-
Kafka topics are deleted and must be rebuilt. (Kafka 主题被删除且必须重建。)
-
Data corruption occurs due to a configuration error or some other problem. (由于配置错误或其他问题导致数据损坏。)
You can re-run a snapshot for a table for which you previously captured a snapshot by initiating a so-called ad hoc snapshot. Ad hoc snapshots require the use of signaling tables. You initiate an ad hoc snapshot by sending a signal request to the Debezium signaling table. (您可以通过启动所谓的即席快照来重新运行之前捕获了快照的表的快照。即席快照需要使用信号表。通过向 Debezium 信号表发送信号请求来启动即席快照。)
When you initiate an ad hoc snapshot of an existing table, the connector appends content to the topic that already exists for the table. If a previously existing topic was removed, Debezium can create a topic automatically if automatic topic creation is enabled. (当您启动现有表的即席快照时,连接器会将内容追加到该表已存在的主题中。如果之前存在的主题已被删除,并且启用了自动主题创建,则 Debezium 可以自动创建主题。)
Ad hoc snapshot signals specify the tables to include in the snapshot. The snapshot can capture the entire contents of the database, or capture only a subset of the tables in the database. Also, the snapshot can capture a subset of the contents of the table(s) in the database. (即席快照信号指定要包含在快照中的表。快照可以捕获数据库的全部内容,或仅捕获数据库中表的部分内容。此外,快照还可以捕获数据库中表的部分内容。)
You specify the tables to capture by sending an execute-snapshot message to the signaling table. Set the type of the execute-snapshot signal to incremental or blocking, and provide the names of the tables to include in the snapshot, as described in the following table (您可以通过向信号表发送 execute-snapshot 消息来指定要捕获的表。将 execute-snapshot 信号的类型设置为 incremental 或 blocking,并提供要在快照中包含的表名称,如下表所示:)
| Field (字段) | Default (默认值) | Value (值) |
|---|---|---|
|
|
Specifies the type of snapshot that you want to run. (指定您要运行的快照类型。) |
|
N/A |
An array that contains regular expressions matching the fully-qualified names of the tables to include in the snapshot. (一个包含正则表达式的数组,匹配要包含在快照中的表的完全限定名称。) |
|
N/A |
An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. (一个可选数组,指定连接器用于确定要包含在快照中的记录子集的附加条件集。)
|
|
N/A |
An optional string that specifies the column name that the connector uses as the primary key of a table during the snapshot process. (一个可选字符串,指定连接器在快照过程中用作表主键的列名。) |
您可以通过将 execute-snapshot 信号类型添加到信号表,或向 Kafka 信号主题发送信号消息来启动临时增量快照。在连接器处理完消息后,它将开始快照操作。快照过程读取第一个和最后一个主键值,并将这些值用作每个表的开始和结束点。根据表中的条目数量和配置的块大小,Debezium 将表分成块,然后逐个快照每个块。
有关更多信息,请参阅增量快照。
You initiate an ad hoc blocking snapshot by adding an entry with the execute-snapshot signal type to the signaling table or signaling topic. After the connector processes the message, it begins the snapshot operation. The connector temporarily stops streaming, and then initiates a snapshot of the specified table, following the same process that it uses during an initial snapshot. After the snapshot completes, the connector resumes streaming. (您可以通过向信号表或信号主题添加具有 execute-snapshot 信号类型的条目来启动即席阻塞快照。在连接器处理消息后,它将开始快照操作。连接器将暂时停止流式传输,然后启动指定表的快照,遵循其在初始快照期间使用的相同过程。快照完成后,连接器将恢复流式传输。)
有关更多信息,请参阅阻塞快照。
增量快照
|
SQL Server 排序规则
每个 SQL Server 服务器或数据库都配置为使用特定的排序规则,该规则决定字符数据如何存储、排序、比较和显示。某些排序规则集(如SQL Server 排序规则 (SQL_*))的排序规则与 Unicode 排序算法不兼容。在某些情况下,不兼容的排序规则可能导致在连接器运行临时快照时丢失数据。例如,如果 SQL Server 配置为将字符串作为 Unicode 发送(即,连接属性 有关使用 |
To provide flexibility in managing snapshots, Debezium includes a supplementary snapshot mechanism, known as incremental snapshotting. Incremental snapshots rely on the Debezium mechanism for sending signals to a Debezium connector. Incremental snapshots are based on the DDD-3 design document. (为了在管理快照时提供灵活性,Debezium 提供了一个补充快照机制,称为增量快照。增量快照依赖于 Debezium 向 Debezium 连接器发送信号的机制。增量快照基于DDD-3 设计文档。)
在增量快照中,Debezium 不是一次捕获数据库的完整状态(如初始快照),而是分阶段、一系列可配置的块来捕获每个表。您可以指定快照要捕获的表以及每个块的大小。块大小决定了快照在每次从数据库获取操作期间收集的行数。增量快照的默认块大小为 1024 行。
As an incremental snapshot proceeds, Debezium uses watermarks to track its progress, maintaining a record of each table row that it captures. This phased approach to capturing data provides the following advantages over the standard initial snapshot process (随着增量快照的进行,Debezium 使用水位标记来跟踪其进度,并维护捕获的每个表行的记录。与标准的初始快照过程相比,这种分阶段的数据捕获方法提供了以下优势:)
-
You can run incremental snapshots in parallel with streamed data capture, instead of postponing streaming until the snapshot completes. The connector continues to capture near real-time events from the change log throughout the snapshot process, and neither operation blocks the other. (您可以与流式数据捕获并行运行增量快照,而不是将流式传输推迟到快照完成。在整个快照过程中,连接器会继续从变更日志中捕获近乎实时事件,并且任一操作都不会阻塞另一个操作。)
-
If the progress of an incremental snapshot is interrupted, you can resume it without losing any data. After the process resumes, the snapshot begins at the point where it stopped, rather than recapturing the table from the beginning. (如果增量快照的进度被中断,您可以恢复它而不会丢失任何数据。在过程恢复后,快照将从停止的点开始,而不是从头重新捕获表。)
-
您可以随时按需运行增量快照,并根据需要重复该过程以适应数据库更新。例如,在修改连接器配置以向其
table.include.list属性添加表后,您可能需要重新运行快照。
当您运行增量快照时,Debezium 会按主键对每个表进行排序,然后根据配置的块大小将表拆分成块。然后,它逐块捕获块中的每一行。对于它捕获的每一行,快照都会发出一个 READ 事件。该事件表示快照开始时该行中的值。
As a snapshot proceeds, it’s likely that other processes continue to access the database, potentially modifying table records. To reflect such changes, INSERT, UPDATE, or DELETE operations are committed to the transaction log as per usual. Similarly, the ongoing Debezium streaming process continues to detect these change events and emits corresponding change event records to Kafka. (随着快照的进行,其他进程很可能会继续访问数据库,可能修改表记录。为了反映这些更改,INSERT、UPDATE 或 DELETE 操作照常提交到事务日志。同样,正在进行的 Debezium 流式传输过程会继续检测这些变更事件,并将相应的变更事件记录发送到 Kafka。)
In some cases, the UPDATE or DELETE events that the streaming process emits are received out of sequence. That is, the streaming process might emit an event that modifies a table row before the snapshot captures the chunk that contains the READ event for that row. When the snapshot eventually emits the corresponding READ event for the row, its value is already superseded. To ensure that incremental snapshot events that arrive out of sequence are processed in the correct logical order, Debezium employs a buffering scheme for resolving collisions. Only after collisions between the snapshot events and the streamed events are resolved does Debezium emit an event record to Kafka. (在某些情况下,流式传输过程发出的 UPDATE 或 DELETE 事件可能会乱序接收。也就是说,流式传输过程可能会在快照捕获包含该行 READ 事件的块之前发出修改表行的事件。当快照最终发出行的相应 READ 事件时,其值已过时。为了确保乱序到达的增量快照事件按正确的逻辑顺序处理,Debezium 采用缓冲方案来解决冲突。仅当快照事件与流式事件之间的冲突得到解决后,Debezium 才会向 Kafka 发送事件记录。)
To assist in resolving collisions between late-arriving READ events and streamed events that modify the same table row, Debezium employs a so-called snapshot window. The snapshot window demarcates the interval during which an incremental snapshot captures data for a specified table chunk. Before the snapshot window for a chunk opens, Debezium follows its usual behavior and emits events from the transaction log directly downstream to the target Kafka topic. But from the moment that the snapshot for a particular chunk opens, until it closes, Debezium performs a de-duplication step to resolve collisions between events that have the same primary key. (为了帮助解决延迟到达的 READ 事件与修改同一表行的流式事件之间的冲突,Debezium 采用所谓的快照窗口。快照窗口划定了增量快照捕获指定表块数据的间隔。在某个块的快照窗口打开之前,Debezium 会遵循其常规行为,将事务日志中的事件直接向下游发送到目标 Kafka 主题。但在某个块的快照打开到关闭的整个过程中,Debezium 会执行去重步骤以解决具有相同主键的事件之间的冲突。)
For each data collection, the Debezium emits two types of events, and stores the records for them both in a single destination Kafka topic. The snapshot records that it captures directly from a table are emitted as READ operations. Meanwhile, as users continue to update records in the data collection, and the transaction log is updated to reflect each commit, Debezium emits UPDATE or DELETE operations for each change. (对于每个数据集合,Debezium 会发出两种类型的事件,并将两者的记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为 READ 操作发出。同时,随着用户继续更新数据集合中的记录,并且事务日志更新以反映每次提交,Debezium 会为每个更改发出 UPDATE 或 DELETE 操作。)
As the snapshot window opens, and Debezium begins processing a snapshot chunk, it delivers snapshot records to a memory buffer. During the snapshot windows, the primary keys of the READ events in the buffer are compared to the primary keys of the incoming streamed events. If no match is found, the streamed event record is sent directly to Kafka. If Debezium detects a match, it discards the buffered READ event, and writes the streamed record to the destination topic, because the streamed event logically supersede the static snapshot event. After the snapshot window for the chunk closes, the buffer contains only READ events for which no related transaction log events exist. Debezium emits these remaining READ events to the table’s Kafka topic. (随着快照窗口的打开,Debezium 开始处理快照块,它会将快照记录传递到内存缓冲区。在快照窗口期间,缓冲区中 READ 事件的主键与传入的流式事件的主键进行比较。如果未找到匹配项,则将流式事件记录直接发送到 Kafka。如果 Debezium 检测到匹配,它将丢弃已缓冲的 READ 事件,并将流式记录写入目标主题,因为流式事件在逻辑上取代了静态快照事件。当块的快照窗口关闭后,缓冲区将仅包含没有相关事务日志事件的 READ 事件。Debezium 将这些剩余的 READ 事件发送到表的 Kafka 主题。)
The connector repeats the process for each snapshot chunk. (连接器对每个快照块重复此过程。)
|
To enable Debezium to perform incremental snapshots, you must grant the connector permission to write to the signaling table. (要使 Debezium 能够执行增量快照,您必须授予连接器写入信号表的权限。) Write permission is unnecessary only for connectors that can be configured to perform read-only incrementals snapshots (MariaDB, MySQL, or PostgreSQL). (仅对于可以配置为执行只读增量快照的连接器(MariaDB、MySQL 或 PostgreSQL)来说,写入权限是可选的。) |
Currently, you can use either of the following methods to initiate an incremental snapshot (当前,您可以使用以下任一方法启动增量快照:)
|
Debezium SQL Server 连接器不支持在增量快照运行时进行模式更改。 |
触发增量快照
To initiate an incremental snapshot, you can send an ad hoc snapshot signal to the signaling table on the source database. You submit snapshot signals as SQL INSERT queries. (要启动增量快照,您可以将即席快照信号发送到源数据库上的信号表。您将快照信号作为 SQL INSERT 查询提交。)
After Debezium detects the change in the signaling table, it reads the signal, and runs the requested snapshot operation. (在 Debezium 检测到信号表中的更改后,它会读取信号并运行请求的快照操作。)
The query that you submit specifies the tables to include in the snapshot, and, optionally, specifies the type of snapshot operation. Debezium currently supports the incremental and blocking snapshot types. (您提交的查询指定了要包含在快照中的表,并可选地指定了快照操作的类型。Debezium 目前支持 incremental 和 blocking 快照类型。)
To specify the tables to include in the snapshot, provide a data-collections array that lists the tables, or an array of regular expressions used to match tables, for example, (要指定要包含在快照中的表,请提供一个列出表的 data-collections 数组,或者一个用于匹配表的正则表达式数组,例如:)
{"data-collections": ["public.MyFirstTable", "public.MySecondTable"]}
The data-collections array for an incremental snapshot signal has no default value. If the data-collections array is empty, Debezium interprets the empty array to mean that no action is required, and it does not perform a snapshot. (增量快照信号的 data-collections 数组没有默认值。如果 data-collections 数组为空,Debezium 会将空数组解释为无需执行任何操作,并且不会执行快照。)
|
If the name of a table that you want to include in a snapshot contains a dot ( |
-
-
A signaling data collection exists on the source database. (源数据库上存在信号数据集合。)
-
信号数据集合在
signal.data.collection属性中指定。
-
-
Send a SQL query to add the ad hoc incremental snapshot request to the signaling table (发送 SQL 查询将即席增量快照请求添加到信号表)
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');For example, (例如,)
INSERT INTO db1.myschema.debezium_signal (id, type, data) (1) values ('ad-hoc-1', (2) 'execute-snapshot', (3) '{"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], (4) "type":"incremental", (5) "additional-conditions":[{"data-collection": "db1.schema1.table1" ,"filter":"color=\'blue\'"}]}'); (6)The values of the
id,type, anddataparameters in the command correspond to the fields of the signaling table. (命令中的id、type和data参数的值对应于信号表的字段。)
The following table describes the parameters in the example (下表描述了示例中的参数:)表 3. 用于将增量 snapshot 信号发送到信号表的 SQL 命令中字段的描述 Item Value (值) 描述 1
database.schema.debezium_signalSpecifies the fully-qualified name of the signaling table on the source database. (指定源数据库上信号表的完全限定名称。)
2
ad-hoc-1The
idparameter specifies an arbitrary string that is assigned as theididentifier for the signal request. (id参数指定一个任意字符串,该字符串被分配为信号请求的id标识符。)
Use this string to identify logging messages to entries in the signaling table. Debezium does not use this string. Rather, during the snapshot, Debezium generates its ownidstring as a watermarking signal. (使用此字符串将日志消息标识到信号表中的条目。Debezium 不使用此字符串。而是在快照期间,Debezium 生成自己的id字符串作为水位标记信号。)3
execute-snapshotThe
typeparameter specifies the operation that the signal is intended to trigger. (type参数指定信号旨在触发的操作。)4
data-collections (数据集合)A required component of the
datafield of a signal that specifies an array of table names or regular expressions to match table names to include in the snapshot. (信号的data字段的必需组件,该字段指定一个表名称数组或匹配要包含在快照中的表名称的正则表达式。)
数组列出了使用database.schema.table格式的正则表达式,以匹配数据集合的完全限定名。此格式与您用于指定连接器 信号表 名称的格式相同。5
incremental (增量)An optional
typecomponent of thedatafield of a signal that specifies the type of snapshot operation to run. (信号的data字段的可选type组件,指定要运行的快照操作的类型。)
Valid values areincrementalandblocking. (有效值为incremental和blocking。)
If you do not specify a value, the connector defaults to performing an incremental snapshot. (如果您不指定值,连接器将默认执行增量快照。)6
additional-conditions (附加条件)An optional array that specifies a set of additional conditions that the connector evaluates to determine the subset of records to include in a snapshot. (一个可选数组,指定连接器用于确定要包含在快照中的记录子集的附加条件集。)
Each additional condition is an object withdata-collectionandfilterproperties. You can specify different filters for each data collection. (每个附加条件是一个具有data-collection和filter属性的对象。您可以为每个数据集合指定不同的过滤器。)
*data-collection属性是过滤器适用的数据集合的完全限定名称。有关additional-conditions参数的更多信息,请参阅使用additional-conditions运行临时增量快照。
使用 additional-conditions 运行临时增量快照
If you want a snapshot to include only a subset of the content in a table, you can modify the signal request by appending an additional-conditions parameter to the snapshot signal. (如果您希望快照仅包含表内容的子集,则可以通过将 additional-conditions 参数附加到快照信号来修改信号请求。)
The SQL query for a typical snapshot takes the following form (典型快照的 SQL 查询形式如下:)
SELECT * FROM <tableName> ....
By adding an additional-conditions parameter, you append a WHERE condition to the SQL query, as in the following example (通过添加 additional-conditions 参数,您可以将 WHERE 条件附加到 SQL 查询,如下例所示:)
SELECT * FROM <data-collection> WHERE <filter> ....
The following example shows a SQL query to send an ad hoc incremental snapshot request with an additional condition to the signaling table (以下示例显示了一个 SQL 查询,用于向信号表发送带有附加条件的即席增量快照请求:)
INSERT INTO <signalTable> (id, type, data) VALUES ('<id>', '<snapshotType>', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"<snapshotType>","additional-conditions":[{"data-collection": "<fullyQualfiedTableName>", "filter": "<additional-condition>"}]}');
For example, suppose you have a products table that contains the following columns (例如,假设您有一个 products 表,其中包含以下列:)
-
id(primary key) -
color (颜色) -
quantity (数量)
If you want an incremental snapshot of the products table to include only the data items where color=blue, you can use the following SQL statement to trigger the snapshot (如果您希望 products 表的增量快照仅包含 color=blue 的数据项,您可以使用以下 SQL 语句来触发快照:)
INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue"}]}');
The additional-conditions parameter also enables you to pass conditions that are based on more than one column. For example, using the products table from the previous example, you can submit a query that triggers an incremental snapshot that includes the data of only those items for which color=blue and quantity>10 (additional-conditions 参数还允许您传递基于多个列的条件。例如,使用前面示例中的 products 表,您可以提交一个查询来触发增量快照,其中仅包含 color=blue 且 quantity>10 的项目数据:)
INSERT INTO db1.myschema.debezium_signal (id, type, data) VALUES('ad-hoc-1', 'execute-snapshot', '{"data-collections": ["db1.schema1.products"],"type":"incremental", "additional-conditions":[{"data-collection": "db1.schema1.products", "filter": "color=blue AND quantity>10"}]}');
The following example, shows the JSON for an incremental snapshot event that is captured by a connector. (以下示例显示了连接器捕获的增量快照事件的 JSON。)
{
"before":null,
"after": {
"pk":"1",
"value":"New data"
},
"source": {
...
"snapshot":"incremental" (1)
},
"op":"r", (2)
"ts_ms":"1620393591654",
"ts_us":"1620393591654547",
"ts_ns":"1620393591654547920",
"transaction":null
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
Specifies the type of snapshot operation to run. (指定要运行的快照操作的类型。) |
2 |
|
Specifies the event type. (指定事件类型。) |
使用 Kafka 信号通道触发增量快照
You can send a message to the configured Kafka topic to request the connector to run an ad hoc incremental snapshot. (您可以向配置的 Kafka 主题发送消息,请求连接器运行即席增量快照。)
The key of the Kafka message must match the value of the topic.prefix connector configuration option. (Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。)
The value of the message is a JSON object with type and data fields. (消息的值是一个带有 type 和 data 字段的 JSON 对象。)
The signal type is execute-snapshot, and the data field must have the following fields (信号类型为 execute-snapshot,data 字段必须具有以下字段:)
| Field (字段) | Default (默认值) | Value (值) |
|---|---|---|
|
|
The type of the snapshot to be executed. Currently Debezium supports the |
|
N/A |
An array of comma-separated regular expressions that match the fully-qualified names of tables to include in the snapshot. (一个逗号分隔的正则表达式数组,匹配要包含在快照中的表的完全限定名称。) |
|
N/A |
An optional array of additional conditions that specifies criteria that the connector evaluates to designate a subset of records to include in a snapshot. (一个可选的附加条件数组,指定连接器评估以指定要包含在快照中的记录子集的标准。) |
execute-snapshot Kafka message (示例 2. 一个 execute-snapshot Kafka 消息)Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["{collection-container}.table1", "{collection-container}.table2"], "type": "INCREMENTAL"}}`
Debezium uses the additional-conditions field to select a subset of a table’s content. (Debezium 使用 additional-conditions 字段来选择表内容的子集。)
Typically, when Debezium runs a snapshot, it runs a SQL query such as (通常,当 Debezium 运行快照时,它会运行一个 SQL 查询,例如:)
SELECT * FROM <tableName> ….
When the snapshot request includes an additional-conditions property, the data-collection and filter parameters of the property are appended to the SQL query, for example (当快照请求包含 additional-conditions 属性时,该属性的 data-collection 和 filter 参数将被附加到 SQL 查询中,例如:)
SELECT * FROM <data-collection> WHERE <filter> ….
For example, given a products table with the columns id (primary key), color, and brand, if you want a snapshot to include only content for which color='blue', when you request the snapshot, you could add the additional-conditions property to filter the content: :leveloffset: +1 (例如,给定一个具有 id(主键)、color 和 brand 列的 products 表,如果您希望快照仅包含 color='blue' 的内容,在请求快照时,您可以添加 additional-conditions 属性来过滤内容::leveloffset: +1)
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue'"}]}}`
You can also use the additional-conditions property to pass conditions based on multiple columns. For example, using the same products table as in the previous example, if you want a snapshot to include only the content from the products table for which color='blue', and brand='MyBrand', you could send the following request: :leveloffset: +1 (您还可以使用 additional-conditions 属性来传递基于多个列的条件。例如,使用前面示例中的相同 products 表,如果您希望快照仅包含 products 表中 color='blue' 且 brand='MyBrand' 的内容,您可以发送以下请求::leveloffset: +1)
Key = `test_connector`
Value = `{"type":"execute-snapshot","data": {"data-collections": ["db1.schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{"data-collection": "db1.schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`
停止增量快照
In some situations, it might be necessary to stop an incremental snapshot. For example, you might realize that snapshot was not configured correctly, or maybe you want to ensure that resources are available for other database operations. You can stop a snapshot that is already running by sending a signal to the signaling table on the source database. (在某些情况下,可能需要停止增量快照。例如,您可能发现快照配置不正确,或者您想确保资源可用于其他数据库操作。您可以通过向源数据库上的信号表发送信号来停止正在运行的快照。)
You submit a stop snapshot signal to the signaling table by sending it in a SQL INSERT query. The stop-snapshot signal specifies the type of the snapshot operation as incremental, and optionally specifies the tables that you want to omit from the currently running snapshot. After Debezium detects the change in the signaling table, it reads the signal, and stops the incremental snapshot operation if it’s in progress. (您通过在 SQL INSERT 查询中发送来将停止快照信号提交到信号表。stop-snapshot 信号将快照操作的 type 指定为 incremental,并可选地指定您希望从当前正在运行的快照中排除的表。在 Debezium 检测到信号表中的更改后,它会读取信号,并在增量快照操作进行中时停止它。)
您还可以通过向Kafka 信号主题发送 JSON 消息来停止增量快照。
-
-
A signaling data collection exists on the source database. (源数据库上存在信号数据集合。)
-
信号数据集合在
signal.data.collection属性中指定。
-
-
Send a SQL query to stop the ad hoc incremental snapshot to the signaling table (发送 SQL 查询以停止添加到信号表的即席增量快照)
INSERT INTO <signalTable> (id, type, data) values ('<id>', 'stop-snapshot', '{"data-collections": ["<fullyQualfiedTableName>","<fullyQualfiedTableName>"],"type":"incremental"}');For example, (例如,)
INSERT INTO db1.myschema.debezium_signal (id, type, data) (1) values ('ad-hoc-1', (2) 'stop-snapshot', (3) '{"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], (4) "type":"incremental"}'); (5)The values of the
id,type, anddataparameters in the signal command correspond to the fields of the signaling table. (信号命令中的id、type和data参数的值对应于信号表的字段。)
The following table describes the parameters in the example (下表描述了示例中的参数:)表 6. 用于将停止增量 snapshot 信号发送到信号表的 SQL 命令中字段的描述 Item Value (值) 描述 1
database.schema.debezium_signalSpecifies the fully-qualified name of the signaling table on the source database. (指定源数据库上信号表的完全限定名称。)
2
ad-hoc-1The
idparameter specifies an arbitrary string that is assigned as theididentifier for the signal request. (id参数指定一个任意字符串,该字符串被分配为信号请求的id标识符。)
Use this string to identify logging messages to entries in the signaling table. Debezium does not use this string. (使用此字符串将日志消息标识到信号表中的条目。Debezium 不使用此字符串。)3
stop-snapshotSpecifies
typeparameter specifies the operation that the signal is intended to trigger. (type参数指定信号旨在触发的操作。)4
data-collections (数据集合)An optional component of the
datafield of a signal that specifies an array of table names or regular expressions to match table names to remove from the snapshot. (信号的data字段的可选组件,指定一个表名称数组或匹配要从快照中删除的表名称的正则表达式。)
数组列出了匹配表名称的正则表达式,格式为database.schema.tableIf you omit this component from the
datafield, the signal stops the entire incremental snapshot that is in progress. (如果您从data字段中省略此组件,则信号将停止正在进行的整个增量快照。)5
incremental (增量)A required component of the
datafield of a signal that specifies the type of snapshot operation to be stopped. (信号的data字段的必需组件,指定要停止的快照操作的类型。)
Currently, the only valid option isincremental. (目前,唯一有效选项是incremental。)
If you do not specify atypevalue, the signal fails to stop the incremental snapshot. (如果您不指定type值,信号将无法停止增量快照。)
使用 Kafka 信号通道停止增量快照
You can send a signal message to the configured Kafka signaling topic to stop an ad hoc incremental snapshot. (您可以向配置的 Kafka 信号主题发送信号消息,以停止即席增量快照。)
The key of the Kafka message must match the value of the topic.prefix connector configuration option. (Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。)
The value of the message is a JSON object with type and data fields. (消息的值是一个带有 type 和 data 字段的 JSON 对象。)
The signal type is stop-snapshot, and the data field must have the following fields (信号类型为 stop-snapshot,data 字段必须具有以下字段:)
| Field (字段) | Default (默认值) | Value (值) |
|---|---|---|
|
|
The type of the snapshot to be executed. Currently Debezium supports only the |
|
N/A |
An optional array of comma-separated regular expressions that match the fully-qualified names of the tables an array of table names or regular expressions to match table names to remove from the snapshot. (一个可选的逗号分隔的正则表达式数组,匹配要从快照中删除的表的表名称或匹配表名称的正则表达式数组。) |
The following example shows a typical stop-snapshot Kafka message (以下示例显示了一个典型的 stop-snapshot Kafka 消息:)
Key = `test_connector`
Value = `{"type":"stop-snapshot","data": {"data-collections": ["db1.schema1.table1", "db1.schema1.table2"], "type": "INCREMENTAL"}}`
Custom snapshotter SPI (自定义快照器 SPI)
For more advanced uses, you can fine-tune control of the snapshot by implementing one of the following interfaces (对于更高级的用途,您可以实现以下接口之一来微调快照的控制:)
io.debezium.snapshot.spi.Snapshotter-
Controls whether the connector takes a snapshot. (控制连接器是否执行快照。)
io.debezium.snapshot.spi.SnapshotQuery-
Controls how data is queried during a snapshot. (控制快照期间如何查询数据。)
io.debezium.snapshot.spi.SnapshotLock-
Controls whether the connector locks tables when taking a snapshot. (控制连接器在进行快照时是否锁定表。)
io.debezium.snapshot.spi.Snapshotter 接口。所有内置快照模式都实现了此接口。)/**
* {@link Snapshotter} is used to determine the following details about the snapshot process:
* <p>
* - Whether a snapshot occurs. <br>
* - Whether streaming continues during the snapshot. <br>
* - Whether the snapshot includes schema (if supported). <br>
* - Whether to snapshot data or schema following an error.
* <p>
* Although Debezium provides many default snapshot modes,
* to provide more advanced functionality, such as partial snapshots,
* you can customize implementation of the interface.
* For more information, see the documentation.
*
*
*
*/
@Incubating
public interface Snapshotter extends Configurable {
/**
* @return the name of the snapshotter.
*
*
*/
String name();
/**
* @param offsetExists is {@code true} when the connector has an offset context (i.e. restarted)
* @param snapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
*
* @return {@code true} if the snapshotter should take a data snapshot
*/
boolean shouldSnapshotData(boolean offsetExists, boolean snapshotInProgress);
/**
* @param offsetExists is {@code true} when the connector has an offset context (i.e. restarted)
* @param snapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
*
* @return {@code true} if the snapshotter should take a schema snapshot
*/
boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress);
/**
* @return {@code true} if the snapshotter should stream after taking a snapshot
*/
boolean shouldStream();
/**
* @return {@code true} whether the schema can be recovered if database schema history is corrupted.
*/
boolean shouldSnapshotOnSchemaError();
/**
* @return {@code true} whether the snapshot should be re-executed when there is a gap in data stream.
*/
boolean shouldSnapshotOnDataError();
/**
*
* @return {@code true} if streaming should resume from the start of the snapshot
* transaction, or {@code false} for when a connector resumes and takes a snapshot,
* streaming should resume from where streaming previously left off.
*/
default boolean shouldStreamEventsStartingFromSnapshot() {
return true;
}
/**
* Lifecycle hook called after the snapshot phase is successful.
*/
default void snapshotCompleted() {
// no operation
}
/**
* Lifecycle hook called after the snapshot phase is aborted.
*/
default void snapshotAborted() {
// no operation
}
}
io.debezium.snapshot.spi.SnapshotQuery 接口。所有内置快照查询模式都实现了此接口。)/**
* {@link SnapshotQuery} is used to determine the query used during a data snapshot
*
*
*/
public interface SnapshotQuery extends Configurable, Service {
/**
* @return the name of the snapshot lock.
*
*
*/
String name();
/**
* Generate a valid query string for the specified table, or an empty {@link Optional}
* to skip snapshotting this table (but that table will still be streamed from)
*
* @param tableId the table to generate a query for
* @param snapshotSelectColumns the columns to be used in the snapshot select based on the column
* include/exclude filters
* @return a valid query string, or none to skip snapshotting this table
*/
Optional<String> snapshotQuery(String tableId, List<String> snapshotSelectColumns);
}
io.debezium.snapshot.spi.SnapshotLock 接口。所有内置快照锁定模式都实现了此接口。)/**
* {@link SnapshotLock} is used to determine the table lock mode used during schema snapshot
*
*
*/
public interface SnapshotLock extends Configurable, Service {
/**
* @return the name of the snapshot lock.
*
*
*/
String name();
/**
* Returns a SQL statement for locking the given table during snapshotting, if required by the specific snapshotter
* implementation.
*/
Optional<String> tableLockingStatement(Duration lockTimeout, String tableId);
}
阻塞快照
To provide more flexibility in managing snapshots, Debezium includes a supplementary ad hoc snapshot mechanism, known as a blocking snapshot. Blocking snapshots rely on the Debezium mechanism for sending signals to a Debezium connector. (为了在管理快照时提供更大的灵活性,Debezium 提供了一个补充的即席快照机制,称为阻塞快照。阻塞快照依赖于 Debezium 向 Debezium 连接器发送信号的机制。)
A blocking snapshot behaves just like an initial snapshot, except that you can trigger it at run time. (阻塞快照的行为与初始快照完全相同,只是您可以随时在运行时触发它。)
You might want to run a blocking snapshot rather than use the standard initial snapshot process in the following situations (您可能希望在以下情况下运行阻塞快照,而不是使用标准的初始快照过程:)
-
You add a new table and you want to complete the snapshot while the connector is running. (添加了一个新表,并且您希望在连接器运行时完成快照。)
-
You add a large table, and you want the snapshot to complete in less time than is possible with an incremental snapshot. (添加了一个大表,并且您希望快照的完成时间比增量快照更快。)
When you run a blocking snapshot, Debezium stops streaming, and then initiates a snapshot of the specified table, following the same process that it uses during an initial snapshot. After the snapshot completes, the streaming is resumed. (运行阻塞快照时,Debezium 会停止流式传输,然后启动指定表的快照,遵循其在初始快照期间使用的相同过程。快照完成后,流式传输将恢复。)
You can set the following properties in the data component of a signal (您可以在信号的 data 组件中设置以下属性:)
-
data-collections: to specify which tables must be snapshot. (data-collections:用于指定必须进行快照的表。)
-
data-collections: Specifies the tables that you want the snapshot to include. (data-collections:指定您希望快照包含的表。)
This property accepts a comma-separated list of regular expressions that match fully-qualified table names. The behavior of the property is similar to the behavior of thetable.include.listproperty, which specifies the tables to capture in a blocking snapshot. (此属性接受逗号分隔的正则表达式列表,这些表达式匹配完全限定的表名。此属性的行为类似于table.include.list属性的行为,后者指定要在阻塞快照中捕获的表。) -
additional-conditions: You can specify different filters for different table. (additional-conditions:您可以为不同的表指定不同的过滤器。)
-
The
data-collectionproperty is the fully-qualified name of the table for which the filter will be applied, and can be case-sensitive or case-insensitive depending on the database. (data-collection属性是过滤器将应用的表的完全限定名称,并且根据数据库的不同,可能区分大小写或不区分大小写。) -
The
filterproperty will have the same value used in thesnapshot.select.statement.overrides, the fully-qualified name of the table that should match by case. (filter属性将具有与snapshot.select.statement.overrides中使用的值相同的值,即应区分大小写匹配的表的完全限定名称。)
-
For example (例如:)
{"type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{"data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {"data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}
A delay might exist between the time that you send the signal to trigger the snapshot, and the time when streaming stops and the snapshot starts. As a result of this delay, after the snapshot completes, the connector might emit some event records that duplicate records captured by the snapshot. (从您发送触发快照的信号到流式传输停止并开始快照之间可能存在延迟。由于此延迟,快照完成后,连接器可能会发出一些重复快照捕获记录的事件记录。)
读取更改数据表
当连接器首次启动时,它会捕获捕获表的结构的快照,并将其持久化到其内部数据库模式历史记录主题。然后,连接器会为每个源表标识一个更改表,并完成以下步骤。
-
对于每个更改表,连接器读取在上次存储的最大 LSN 和当前最大 LSN 之间创建的所有更改。
-
连接器根据其提交 LSN 和更改 LSN 的值按升序排序读取的更改。此排序顺序可确保 Debezium 按更改在数据库中发生的顺序重放更改。
-
连接器将提交和更改 LSN 作为偏移量传递给 Kafka Connect。
-
连接器存储最大 LSN 并从步骤 1 重新开始该过程。
重启后,连接器将从上次读取的偏移量(提交和更改 LSN)继续处理。
连接器能够检测包含的源表是否启用了 CDC,并相应地调整其行为。
数据库中未记录最大 LSN
可能出现未在数据库中记录最大 LSN 的情况,因为
-
SQL Server 代理未运行
-
更改表中尚未记录任何更改
-
数据库活动量低,并且 cdc 清理作业会定期清除 cdc 表中的条目
在这些可能性中,由于运行 SQL Server 代理是先决条件,因此第 1 点是一个真正的问题(而第 2 点和第 3 点是正常的)。
为了缓解此问题并区分第 1 点和其他点,通过以下查询检查 SQL Server 代理的状态:"SELECT CASE WHEN dss.[status]=4 THEN 1 ELSE 0 END AS isRunning FROM [#db].sys.dm_server_services dss WHERE dss.[servicename] LIKE N’SQL Server Agent (%';"。如果 SQL Server 代理未运行,日志中将写入 ERROR:“数据库中未记录最大 LSN;SQL Server 代理未运行”。
|
SQL Server 代理运行状态查询需要 |
主题名称
默认情况下,SQL Server 连接器将表中发生的所有 INSERT、UPDATE 和 DELETE 操作的事件写入一个特定于该表的 Apache Kafka 主题。连接器使用以下约定命名更改事件主题:<topicPrefix>.<schemaName>.<tableName>
以下列表提供了默认名称组件的定义:
- topicPrefix
-
服务器的逻辑名称,由
topic.prefix配置属性指定。 - schemaName (模式名称)
-
更改事件发生的数据库模式的名称。
- tableName
-
更改事件发生的数据库表的名称。
例如,如果 fulfillment 是逻辑服务器名称,dbo 是模式名称,并且数据库包含名为 products、products_on_hand、customers 和 orders 的表,则连接器会将更改事件记录流式传输到以下 Kafka 主题:
-
fulfillment.testDB.dbo.products -
fulfillment.testDB.dbo.products_on_hand -
fulfillment.testDB.dbo.customers -
fulfillment.testDB.dbo.orders
如果默认主题名称不符合您的要求,您可以配置自定义主题名称。要配置自定义主题名称,请在逻辑主题路由 SMT 中指定正则表达式。有关使用逻辑主题路由 SMT 自定义主题命名的更多信息,请参阅 主题路由。
模式历史记录主题
当数据库客户端查询数据库时,客户端使用数据库的当前模式。但是,数据库模式可以随时更改,这意味着连接器必须能够识别在记录每次插入、更新或删除操作时模式是什么。此外,连接器不一定能将当前模式应用于每个事件。如果事件相对较旧,它可能是在应用当前模式之前记录的。
为确保正确处理模式更改后的更改事件,Debezium SQL Server 连接器会将新模式的快照存储在 SQL Server 更改表中,这些表镜像了其关联数据表的结构。连接器将表模式信息以及导致模式更改的操作的 LSN 存储在数据库模式历史记录 Kafka 主题中。连接器使用存储的模式表示来生成更改事件,这些事件正确地镜像了每次插入、更新或删除操作时表的结构。
当连接器在崩溃或正常停止后重新启动时,它将从上次读取的位置继续读取 SQL Server CDC 表中的条目。根据连接器从数据库模式历史记录主题读取的模式信息,连接器将应用连接器重新启动时生效的表结构。
如果更新了处于捕获模式的 SQL Server 表的模式,那么您还必须更新相应更改表的模式。您必须是具有提升权限的 SQL Server 数据库管理员才能更新数据库模式。有关在 Debezium 环境中更新 SQL Server 数据库模式的更多信息,请参阅数据库模式演进。
数据库模式历史记录主题仅供连接器内部使用。可选地,连接器还可以将模式更改事件发送到另一个供消费者应用程序使用的主题。
模式更改主题
对于启用了 CDC 的每个表,Debezium SQL Server 连接器都会存储数据库中应用于表的模式更改事件的历史记录。连接器将模式更改事件写入一个名为 <topicPrefix> 的 Kafka 主题,其中 topicPrefix 是在 topic.prefix 配置属性中指定的逻辑服务器名称。
连接器发送到模式更改主题的消息包含一个 payload,并且可以选择性地包含更改事件消息的模式。
模式变更事件的模式具有以下元素:
name (名称)-
模式变更事件消息的名称。
type-
事件消息类型的类型。
version (版本)-
模式的版本。版本是一个整数,每次模式更改时都会递增。
fields (字段)-
变更事件消息中包含的字段。
以下示例以 JSON 格式显示了典型的模式。
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "databaseName"
}
],
"optional": false,
"name": "io.debezium.connector.sqlserver.SchemaChangeKey",
"version": 1
},
"payload": {
"databaseName": "inventory"
}
}
模式变更事件消息的 payload 包含以下元素:
databaseName (数据库名称)-
应用于语句的数据库名称。
databaseName的值用作消息键。 tableChanges (表变更)-
模式更改后整个表模式的结构化表示。
tableChanges字段包含一个数组,其中包含表每个列的条目。由于结构化表示以 JSON 或 Avro 格式呈现数据,因此消费者无需先通过 DDL 解析器处理消息即可轻松读取消息。
|
当连接器配置为捕获表时,它会将该表模式更改的历史记录不仅存储在模式更改主题中,还存储在内部数据库模式历史记录主题中。内部数据库模式历史记录主题仅供连接器使用,不打算由消费应用程序直接使用。确保需要模式更改通知的应用程序仅从模式更改主题中获取该信息。 |
|
连接器发送到其模式更改主题的消息的格式处于孵化阶段,可能会随时更改。 |
发生以下事件时,Debezium 会向模式更改主题发出消息
-
您为表启用了 CDC。
-
您禁用了表的 CDC。
-
您已按照模式演进过程修改了 CDC 启用的表的结构。
以下示例显示了模式更改主题中的一条消息。该消息包含表的逻辑表示。
{
"schema": {
...
},
"payload": {
"source": {
"version": "3.3.1.Final",
"connector": "sqlserver",
"name": "server1",
"ts_ms": 0,
"snapshot": "true",
"db": "testDB",
"schema": "dbo",
"table": "customers",
"change_lsn": null,
"commit_lsn": "00000025:00000d98:00a2",
"event_serial_no": null
},
"ts_ms": 1588252618953, (1)
"databaseName": "testDB", (2)
"schemaName": "dbo",
"ddl": null, (3)
"tableChanges": [ (4)
{
"type": "CREATE", (5)
"id": "\"testDB\".\"dbo\".\"customers\"", (6)
"table": { (7)
"defaultCharsetName": null,
"primaryKeyColumnNames": [ (8)
"id"
],
"columns": [ (9)
{
"name": "id",
"jdbcType": 4,
"nativeType": null,
"typeName": "int identity",
"typeExpression": "int identity",
"charsetName": null,
"length": 10,
"scale": 0,
"position": 1,
"optional": false,
"autoIncremented": false,
"generated": false
},
{
"name": "first_name",
"jdbcType": 12,
"nativeType": null,
"typeName": "varchar",
"typeExpression": "varchar",
"charsetName": null,
"length": 255,
"scale": null,
"position": 2,
"optional": false,
"autoIncremented": false,
"generated": false
},
{
"name": "last_name",
"jdbcType": 12,
"nativeType": null,
"typeName": "varchar",
"typeExpression": "varchar",
"charsetName": null,
"length": 255,
"scale": null,
"position": 3,
"optional": false,
"autoIncremented": false,
"generated": false
},
{
"name": "email",
"jdbcType": 12,
"nativeType": null,
"typeName": "varchar",
"typeExpression": "varchar",
"charsetName": null,
"length": 255,
"scale": null,
"position": 4,
"optional": false,
"autoIncremented": false,
"generated": false
}
],
"attributes": [ (10)
{
"customAttribute": "attributeValue"
}
]
}
}
]
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
可选字段,显示连接器处理事件的时间。时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 在 source 对象中,ts_ms 指示数据库中发生更改的时间。通过比较 payload.source.ts_ms 的值与 payload.ts_ms 的值,您可以确定源数据库更新与 Debezium 之间的延迟。 |
2 |
|
标识包含更改的数据库和模式。 |
3 |
|
对于 SQL Server 连接器,始终为 |
4 |
|
包含 DDL 命令生成的模式变更的一个或多个项目的数组。 |
5 |
|
描述更改的类型。值是以下之一:
|
6 |
|
已创建、已修改或已删除的表的完整标识符。 |
7 |
|
表示应用更改后表的元数据。 |
8 |
|
组成表主键的列的列表。 |
9 |
|
已更改表中每个列的元数据。 |
10 |
|
每个表变更的自定义属性元数据。 |
在连接器发送到模式更改主题的消息中,键是包含模式更改的数据库的名称。在以下示例中,payload 字段包含键
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "databaseName"
}
],
"optional": false,
"name": "io.debezium.connector.sqlserver.SchemaChangeKey",
"version": 1
},
"payload": {
"databaseName": "testDB"
}
}
数据更改事件
Debezium SQL Server 连接器为每个行级 INSERT、UPDATE 和 DELETE 操作生成一个数据更改事件。每个事件包含一个键和一个值。键和值的结构取决于已更改的表。
Debezium 和 Kafka Connect 的设计围绕着连续的事件消息流。但是,这些事件的结构可能会随时间变化,这对于使用者来说很难处理。为了解决这个问题,每个事件都包含其内容的模式,或者,如果您使用的是模式注册表,则包含一个模式 ID,使用者可以使用该 ID 从注册表中获取模式。这使得每个事件都是自包含的。
以下骨架 JSON 显示了更改事件的基本四个部分。但是,您如何配置应用程序中使用的 Kafka Connect 转换器决定了这四个部分在更改事件中的表示。schema 字段仅在配置转换器生成它时才存在于更改事件中。同样,只有配置转换器生成事件键和事件有效负载时,它们才会在更改事件中出现。如果您使用 JSON 转换器并配置它生成所有四个基本更改事件部分,则更改事件将具有此结构:
{
"schema": { (1)
...
},
"payload": { (2)
...
},
"schema": { (3)
...
},
"payload": { (4)
...
},
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
第一个 主键(或唯一键,如果表没有主键)的结构。 可以通过设置 message.key.columns 连接器配置属性来覆盖表的主键。在这种情况下,第一个模式字段描述了该属性标识的键的结构。 |
2 |
|
第一个 |
3 |
|
第二个 |
4 |
|
第二个 |
默认情况下,连接器将更改事件记录流式传输到名称与事件源表相同的表中。有关更多信息,请参阅主题名称。
|
SQL Server 连接器确保所有 Kafka Connect 模式名称都符合Avro 模式名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称中的每个剩余字符以及数据库和表名称中的每个字符都必须是拉丁字母、数字或下划线,即 a-z、A-Z、0-9 或 \_。如果存在无效字符,则会用下划线字符替换。 如果逻辑服务器名称、数据库名称或表名称包含无效字符,并且区分名称的唯一字符是无效的并因此被替换为下划线,这可能会导致意外冲突。 |
更改事件键
更改事件的键包含已更改表的键的模式以及已更改行的实际键。模式及其相应的负载都包含已更改表的主键(或唯一约束)在连接器创建事件时生效的每个列的字段。
考虑以下 customers 表,后面是该表更改事件键的示例。
CREATE TABLE customers (
id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE
);
捕获 customers 表更改的每个更改事件都有相同的事件键模式。只要 customers 表具有先前定义,捕获 customers 表更改的每个更改事件都具有以下键结构,在 JSON 中如下所示:
{
"schema": { (1)
"type": "struct",
"fields": [ (2)
{
"type": "int32",
"optional": false,
"field": "id"
}
],
"optional": false, (3)
"name": "server1.testDB.dbo.customers.Key" (4)
},
"payload": { (5)
"id": 1004
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
键的模式部分指定了一个 Kafka Connect 模式,该模式描述了键的 主键的结构。 |
2 |
|
指定 |
3 |
|
指示事件键的 |
4 |
|
定义键负载结构的模式的名称。此模式描述了已更改表的 P-key 结构。键模式名称的格式为 connector-name.database-schema-name.table-name.
|
5 |
|
包含为此更改事件生成的行的键。在此示例中,键包含一个名为 |
|
当 Debezium 发出更改事件记录时,它会将每个记录的消息键设置为源表的主键或唯一键列的名称。Debezium 必须能够读取这些列才能正常工作。如果您在连接器配置中设置了 |
|
如果表没有主键或唯一键,则更改事件的键为 null。这是有意义的,因为没有主键或唯一键约束的表中的行无法唯一标识。 |
更改事件值
更改事件中的值比键要复杂一些。与键一样,值也包含 schema 部分和 payload 部分。schema 部分包含描述 payload 部分的 Envelope 结构的模式,包括其嵌套字段。对于创建、更新或删除数据的操作的更改事件,其值有效负载都具有信封结构。
考虑用于显示更改事件键示例的相同样本表。
CREATE TABLE customers (
id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE
);
对该表进行的更改的更改事件的值部分根据每个事件类型进行描述。
create 事件
以下示例显示了连接器为在 customers 表中创建数据的操作生成的更改事件的值部分:
{
"schema": { (1)
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "server1.dbo.testDB.customers.Value", (2)
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "server1.dbo.testDB.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "int64",
"optional": false,
"field": "ts_us"
},
{
"type": "int64",
"optional": false,
"field": "ts_ns"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "string",
"optional": true,
"field": "change_lsn"
},
{
"type": "string",
"optional": true,
"field": "commit_lsn"
},
{
"type": "int64",
"optional": true,
"field": "event_serial_no"
}
],
"optional": false,
"name": "io.debezium.connector.sqlserver.Source", (3)
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "int64",
"optional": true,
"field": "ts_us"
},
{
"type": "int64",
"optional": true,
"field": "ts_ns"
}
],
"optional": false,
"name": "server1.dbo.testDB.customers.Envelope" (4)
},
"payload": { (5)
"before": null, (6)
"after": { (7)
"id": 1005,
"first_name": "john",
"last_name": "doe",
"email": "john.doe@example.org"
},
"source": { (8)
"version": "3.3.1.Final",
"connector": "sqlserver",
"name": "server1",
"ts_ms": 1559729468470,
"ts_us": 1559729468470000,
"ts_ns": 1559729468470000000,
"snapshot": false,
"db": "testDB",
"schema": "dbo",
"table": "customers",
"change_lsn": "00000027:00000758:0003",
"commit_lsn": "00000027:00000758:0005",
"event_serial_no": "1"
},
"op": "c", (9)
"ts_ms": 1559729471739, (10)
"ts_ms": 1559729471739876, (10)
"ts_ms": 1559729471739876149 (10)
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
描述值有效负载结构的 值模式。更改事件的值模式对于连接器为特定表生成的每个更改事件都相同。 |
2 |
|
在 |
3 |
|
|
4 |
|
|
5 |
|
值 的实际数据。这是更改事件提供的信息。 事件的 JSON 表示可能比它们描述的行大得多。这是因为 JSON 表示必须包含消息的模式和有效负载部分。但是,通过使用Avro 转换器,您可以显著减小连接器流式传输到 Kafka 主题的消息的大小。 |
6 |
|
一个可选字段,指定事件发生前行的状态。当 |
7 |
|
一个可选字段,指定事件发生后行的状态。在此示例中, |
8 |
|
描述事件源元数据的 必需字段。此字段包含可用于将此事件与其他事件进行比较的信息,关于事件的来源、事件发生的顺序以及事件是否属于同一事务。源元数据包括:
|
9 |
|
描述导致连接器生成事件的操作类型的 必需字符串。在此示例中,
|
10 |
|
可选字段,显示连接器处理事件的时间。在事件消息信封中,时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
update 事件
对于样本 customers 表中的更新,更改事件的值具有与该表的创建事件相同的模式。同样,事件值
的有效负载也具有相同的结构。但是,在更新事件中,事件值有效负载包含不同的值。以下是一个更改事件值在连接器为 customers 表中的更新生成的事件中的示例:
{
"schema": { ... },
"payload": {
"before": { (1)
"id": 1005,
"first_name": "john",
"last_name": "doe",
"email": "john.doe@example.org"
},
"after": { (2)
"id": 1005,
"first_name": "john",
"last_name": "doe",
"email": "noreply@example.org"
},
"source": { (3)
"version": "3.3.1.Final",
"connector": "sqlserver",
"name": "server1",
"ts_ms": 1559729995937,
"ts_us": 1559729995937000,
"ts_ns": 1559729995937000000,
"snapshot": false,
"db": "testDB",
"schema": "dbo",
"table": "customers",
"change_lsn": "00000027:00000ac0:0002",
"commit_lsn": "00000027:00000ac0:0007",
"event_serial_no": "2"
},
"op": "u", (4)
"ts_ms": 1559729998706, (5)
"ts_us": 1559729998706318, (5)
"ts_ns": 1559729998706318547 (5)
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
一个可选字段,指定事件发生前行的状态。在 update 事件值中, |
2 |
|
一个可选字段,指定事件发生后行的状态。您可以比较 |
3 |
|
描述事件源元数据所必需的字段。
|
4 |
|
描述操作类型的 必需字符串。在更新事件值中, |
5 |
|
可选字段,显示连接器处理事件的时间。在事件消息信封中,时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
|
更新行主键/唯一键的列会更改该行键的值。当键更改时,Debezium 会输出三个事件:一个带有旧键的delete事件和墓碑事件,然后是一个带有新键的create事件。 |
delete 事件
删除更改事件中的值具有与
同一表的创建和更新事件相同的 schema 部分。对于样本 customers 表,删除事件中的 payload 部分如下所示:
{
"schema": { ... },
},
"payload": {
"before": { <>
"id": 1005,
"first_name": "john",
"last_name": "doe",
"email": "noreply@example.org"
},
"after": null, (2)
"source": { (3)
"version": "3.3.1.Final",
"connector": "sqlserver",
"name": "server1",
"ts_ms": 1559730445243,
"ts_us": 1559730445243000,
"ts_ns": 1559730445243000000,
"snapshot": false,
"db": "testDB",
"schema": "dbo",
"table": "customers",
"change_lsn": "00000027:00000db0:0005",
"commit_lsn": "00000027:00000db0:0007",
"event_serial_no": "1"
},
"op": "d", (4)
"ts_ms": 1559730450205, (5)
"ts_us": 1559730450205387, (5)
"ts_ns": 1559730450205387492 (5)
}
}
| Item | Field name (字段名) | 描述 |
|---|---|---|
1 |
|
一个可选字段,指定事件发生前行的状态。在删除事件值中, |
2 |
|
一个可选字段,指定事件发生后行的状态。在删除事件值中, |
3 |
|
描述事件源元数据的 必需字段。在删除事件值中, 同一表的创建和更新事件相同。许多
|
4 |
|
描述操作类型的 必需字符串。 |
5 |
|
可选字段,显示连接器处理事件的时间。在事件消息信封中,时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。 |
SQL Server 连接器事件旨在与Kafka 日志压缩配合使用。日志压缩允许删除一些较旧的消息,只要保留每个键的至少最新消息。这使 Kafka 可以回收存储空间,同时确保主题包含完整的数据集,并可用于重新加载基于键的状态。
当行被删除时,delete 事件值仍然可以与日志压缩一起使用,因为 Kafka 可以删除具有该相同键的所有先前消息。但是,为了让 Kafka 删除具有该相同键的所有消息,消息值必须为 null。为了实现这一点,在 Debezium 的 SQL Server 连接器发出 delete 事件后,连接器会发出一个特殊的墓碑事件,该事件具有相同的键但值为 null。
事务元数据
Debezium 可以生成表示事务边界的事件,并丰富数据更改事件消息。
|
Debezium 接收事务元数据的限制
Debezium 仅为部署连接器后发生的事务注册和接收元数据。部署连接器之前发生的事务的元数据不可用。 |
数据库事务由 BEGIN 和 END 关键字括起来的语句块表示。Debezium 为每个事务中的 BEGIN 和 END 分隔符生成事务边界事件。事务边界事件包含以下字段:
status-
BEGIN或END。 id-
唯一事务标识符的字符串表示。
ts_ms-
数据源中事务边界事件(
BEGIN或END事件)的时间。如果数据源未向 Debezium 提供事件时间,则该字段将代表 Debezium 处理事件的时间。 event_count(针对END事件)-
事务发出的事件总数。
data_collections(针对END事件)-
一对
data_collection和event_count元素的数组,指示连接器为源自数据集合的更改发出的事件数量。
|
Debezium 无法可靠地确定事务何时结束。因此,事务 |
以下示例显示了一个典型的事务边界消息:
{
"status": "BEGIN",
"id": "00000025:00000d08:0025",
"ts_ms": 1486500577125,
"event_count": null,
"data_collections": null
}
{
"status": "END",
"id": "00000025:00000d08:0025",
"ts_ms": 1486500577691,
"event_count": 2,
"data_collections": [
{
"data_collection": "testDB.dbo.testDB.tablea",
"event_count": 1
},
{
"data_collection": "testDB.dbo.testDB.tableb",
"event_count": 1
}
]
}
除非通过 topic.transaction 选项进行覆盖,否则事务事件将写入名为 <topic.prefix>.transaction 的主题。
数据更改事件的丰富
启用事务元数据后,数据消息 Envelope 将会添加一个 transaction 字段。此字段提供有关每个事件的信息,形式为字段的组合:
id-
表示事务标识符的字符串表示
total_order-
事件在事务生成的所有事件中的绝对位置
data_collection_order-
事务中发出所有事件的每个数据集合的位置
以下示例显示了典型消息的外观
{
"before": null,
"after": {
"pk": "2",
"aa": "1"
},
"source": {
...
},
"op": "c",
"ts_ms": "1580390884335",
"ts_us": "1580390884335172",
"ts_ns": "1580390884335172574",
"transaction": {
"id": "00000025:00000d08:0025",
"total_order": "1",
"data_collection_order": "1"
}
}
数据类型映射
Debezium SQL Server 连接器通过生成与行所在的表结构相似的事件来表示表行数据的更改。每个事件都包含字段来表示行的列值。事件表示操作的列值的方式取决于列的 SQL 数据类型。在事件中,连接器将每个 SQL Server 数据类型的字段映射到文字类型和语义类型。
连接器可以将 SQL Server 数据类型映射到文字和语义类型。
- 文字类型
-
描述了如何使用 Kafka Connect 模式类型(即
INT8、INT16、INT32、INT64、FLOAT32、FLOAT64、BOOLEAN、STRING、BYTES、ARRAY、MAP和STRUCT)来表示该值。 - 语义类型
-
描述了 Kafka Connect 模式如何使用字段的 Kafka Connect 模式名称来捕获字段的含义。
如果默认数据类型转换不满足您的需求,您可以创建自定义转换器以供连接器使用。
基本类型
下表显示了连接器如何映射基本 SQL Server 数据类型。
| SQL Server 数据类型 | 字面类型 (模式类型) | 语义类型 (模式名称) 和注释 |
|---|---|---|
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
|
|
|
|
其他数据类型映射在以下各节中进行描述。
如果存在,列的默认值会传播到相应字段的 Kafka Connect 模式。更改消息将包含字段的默认值(除非已提供显式列值),因此很少需要从模式中获取默认值。但是,当使用 Avro 作为序列化格式以及 Confluent 模式注册表时,传递默认值有助于满足兼容性规则。
时间值
除了 SQL Server 的 DATETIMEOFFSET 数据类型(包含时区信息)之外,其他时间类型取决于 time.precision.mode 配置属性的值。当 time.precision.mode 配置属性设置为 adaptive(默认值)时,连接器将根据列的数据类型定义确定时间类型和时间戳类型的文字类型和语义类型,以便事件精确表示数据库中的值。
| SQL Server 数据类型 | 字面类型 (模式类型) | 语义类型 (模式名称) 和注释 |
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
当 time.precision.mode 配置属性设置为 connect 时,连接器将使用预定义的 Kafka Connect 逻辑类型。当消费者仅了解内置的 Kafka Connect 逻辑类型并且无法处理可变精度时间值时,这可能很有用。另一方面,由于 SQL Server 支持十分之一微秒的精度,因此具有 connect 时间精度模式的连接器生成的时间戳事件在数据库列的小数秒精度值大于 3 时将导致精度损失。
| SQL Server 数据类型 | 字面类型 (模式类型) | 语义类型 (模式名称) 和注释 |
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
小数类型
Debezium 连接器根据decimal.handling.mode 连接器配置属性的设置来处理小数。
- decimal.handling.mode=precise
-
表 15. decimal.handling.mode=precise时的映射SQL Server 类型 字面类型 (模式类型) 语义类型(模式名称)。 NUMERIC[(P[,S])]BYTESorg.apache.kafka.connect.data.Decimalscale模式参数包含一个表示小数点移动了多少位的整数。DECIMAL[(P[,S])]BYTESorg.apache.kafka.connect.data.Decimalscale模式参数包含一个表示小数点移动了多少位的整数。SMALLMONEYBYTESorg.apache.kafka.connect.data.Decimalscale模式参数包含一个表示小数点移动了多少位的整数。MONEYBYTESorg.apache.kafka.connect.data.Decimalscale模式参数包含一个表示小数点移动了多少位的整数。 - decimal.handling.mode=double
-
表 16. decimal.handling.mode=double时的映射SQL Server 类型 文字类型 语义类型 NUMERIC[(M[,D])]FLOAT64n/a
DECIMAL[(M[,D])]FLOAT64n/a
SMALLMONEY[(M[,D])]FLOAT64n/a
MONEY[(M[,D])]FLOAT64n/a
- decimal.handling.mode=string
-
表 17. decimal.handling.mode=string时的映射SQL Server 类型 文字类型 语义类型 NUMERIC[(M[,D])]STRINGn/a
DECIMAL[(M[,D])]STRINGn/a
SMALLMONEY[(M[,D])]STRINGn/a
MONEY[(M[,D])]STRINGn/a
设置 SQL Server
为了让 Debezium 能够从 SQL Server 表捕获更改事件,具有必要权限的 SQL Server 管理员必须首先运行查询以在数据库上启用 CDC。然后,管理员必须为 Debezium 要捕获的每个表启用 CDC。
|
默认情况下,到 Microsoft SQL Server 的 JDBC 连接受到 SSL 加密的保护。如果 SQL Server 数据库未启用 SSL,或者您想在不使用 SSL 的情况下连接到数据库,可以通过将连接器配置中的 |
应用 CDC 后,它会捕获已启用 CDC 的表中已提交的所有 INSERT、UPDATE 和 DELETE 操作。然后,Debezium 连接器可以捕获这些事件并将其发送到 Kafka 主题。
在 SQL Server 数据库上启用 CDC
在为表启用 CDC 之前,必须先为 SQL Server 数据库启用 CDC。SQL Server 管理员通过运行系统存储过程来启用 CDC。系统存储过程可以使用 SQL Server Management Studio 或 Transact-SQL 来运行。
-
您是 SQL Server 的sysadmin 固定服务器角色的成员。
-
您是数据库的 db_owner。
-
SQL Server Agent 正在运行。
SQL Server CDC 功能仅处理用户创建的表中的更改。您无法在 SQL Server master 数据库上启用 CDC。 |
-
在 SQL Server Management Studio 中,从视图菜单中,单击模板资源管理器。
-
在模板浏览器中,展开SQL Server 模板。
-
展开更改数据捕获 > 配置,然后单击启用数据库进行 CDC。
-
在模板中,将
USE语句中的数据库名称替换为您要为其启用 CDC 的数据库名称。 -
运行存储过程
sys.sp_cdc_enable_db以启用数据库的 CDC。在数据库启用 CDC 后,将创建一个名为
cdc的模式,以及一个 CDC 用户、元数据表和其他系统对象。以下示例显示了如何为
MyDB数据库启用 CDC示例:为 CDC 模板启用 SQL Server 数据库USE MyDB GO EXEC sys.sp_cdc_enable_db GO
在 SQL Server 表上启用 CDC
SQL Server 管理员必须在 Debezium 要捕获的源表上启用更改数据捕获。数据库必须已启用 CDC。要在表上启用 CDC,SQL Server 管理员需要为该表运行存储过程 sys.sp_cdc_enable_table。可以使用 SQL Server Management Studio 或 Transact-SQL 来运行存储过程。必须为要捕获的每个表启用 SQL Server CDC。
-
CDC 已在 SQL Server 数据库上启用。
-
SQL Server Agent 正在运行。
-
您是数据库的
db_owner固定数据库角色的成员。
-
在 SQL Server Management Studio 中,从视图菜单中,单击模板资源管理器。
-
在模板浏览器中,展开SQL Server 模板。
-
展开更改数据捕获 > 配置,然后单击启用表指定文件组选项。
-
在模板中,将
USE语句中的表名替换为您要捕获的表名。 -
运行存储过程
sys.sp_cdc_enable_table。以下示例显示了如何为
MyTable表启用 CDC示例:为 SQL Server 表启用 CDCUSE MyDB GO EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'MyTable', (1) @role_name = N'MyRole', (2) @filegroup_name = N'MyDB_CT',(3) @supports_net_changes = 0 GO1 指定要捕获的表的名称。 2 指定一个角色 MyRole,您可以将用户添加到该角色,以授予他们对源表捕获列的SELECT权限。sysadmin或db_owner角色的用户也可以访问指定的更改表。如果@role_name的值显式设置为NULL,则不使用角色来限制对捕获信息的访问。3 指定 SQL Server 将更改表放置在已捕获表的文件组。指定的文件组必须已存在。最好不要将更改表与用于源表的文件组放在同一位置。
验证用户是否拥有 CDC 表的访问权限
SQL Server 管理员可以运行系统存储过程来查询数据库或表以检索其 CDC 配置信息。可以使用 SQL Server Management Studio 或 Transact-SQL 来运行存储过程。
-
您对捕获实例的所有捕获列都有
SELECT权限。db_owner数据库角色的成员可以查看所有已定义捕获实例的信息。 -
您属于查询中包含的表信息定义的任何限制角色。
-
在 SQL Server Management Studio 中,从视图菜单中,单击对象资源管理器。
-
在对象资源管理器中,展开数据库,然后展开您的数据库对象,例如MyDB。
-
展开可编程性 > 存储过程 > 系统存储过程。
-
运行
sys.sp_cdc_help_change_data_capture存储过程来查询表。查询不应返回空结果。
以下示例在
MyDB数据库上运行sys.sp_cdc_help_change_data_capture存储过程示例:查询表以获取 CDC 配置信息USE MyDB; GO EXEC sys.sp_cdc_help_change_data_capture GO查询返回数据库中启用了 CDC 且调用者有权访问的更改数据的所有表的配置信息。如果结果为空,请验证用户是否具有访问捕获实例和 CDC 表的权限。
Azure 上的 SQL Server
Debezium SQL Server 连接器可与 Azure 上的 SQL Server 一起使用。请参阅此示例,了解如何配置 Azure 上的 SQL Server 的 CDC 并与 Debezium 结合使用。
SQL Server Always On 副本
SQL Server 连接器可以从 Always On 只读副本捕获更改。
-
更改数据捕获已在主节点上配置并启用。SQL Server 不直接支持副本上的 CDC。
-
配置选项
driver.applicationIntent设置为ReadOnly。SQL Server 需要此设置。当 Debezium 检测到此配置选项时,它将通过执行以下操作进行响应:-
将
snapshot.isolation.mode设置为snapshot,这是只读副本支持的唯一事务隔离模式。 -
在流式查询循环的每次执行中提交(只读)事务,这对于获取 CDC 数据的最新视图是必需的。
-
SQL Server 捕获作业代理配置对服务器负载和延迟的影响
当数据库管理员为源表启用更改数据捕获时,捕获作业代理将开始运行。该代理从事务日志中读取新的更改事件记录,并将事件记录复制到更改数据表中。在更改在源表中提交的时间与更改出现在相应更改表的时间之间,总会有一个小的延迟间隔。此延迟间隔表示更改在源表中发生与其可供 Debezium 流式传输到 Apache Kafka 之间的时间差距。
理想情况下,对于必须快速响应数据更改的应用程序,您希望保持源表和更改表之间的紧密同步。您可能会认为,让捕获代理以尽可能快的速度持续处理更改事件,可能会提高吞吐量并降低延迟——尽可能快地在新事件记录填充更改表,几乎实时。然而,这并非总是如此。为了追求更即时的同步,会付出性能代价。捕获作业代理每次查询数据库以获取新事件记录时,都会增加数据库主机上的 CPU 负载。服务器上的额外负载可能会对整体数据库性能产生负面影响,并可能降低事务效率,尤其是在数据库使用高峰期。
监控数据库指标非常重要,以便您知道数据库是否已达到服务器无法再支持捕获代理活动水平的程度。如果您注意到性能问题,可以调整 SQL Server 捕获代理设置,以在整体数据库主机的 CPU 负载与可容忍的延迟度之间取得平衡。
SQL Server 捕获作业代理配置参数
在 SQL Server 上,控制捕获作业代理行为的参数定义在 SQL Server 表 msdb.dbo.cdc_jobs 中。如果您在运行捕获作业代理时遇到性能问题,请通过运行sys.sp_cdc_change_job 存储过程并提供新值来调整捕获作业设置以减少 CPU 负载。
|
有关如何配置 SQL Server 捕获作业代理参数的具体指导超出了本文档的范围。 |
以下参数对于修改捕获代理行为以与 Debezium SQL Server 连接器配合使用最为重要:
pollinginterval-
-
指定捕获代理在日志扫描周期之间等待的秒数。
-
较高的值会降低数据库主机的负载并增加延迟。
-
值为
0表示扫描之间没有等待。 -
默认值为
5。
-
maxtrans-
-
指定在每个日志扫描周期中处理的最大事务数。在捕获作业处理完指定数量的事务后,它会根据
pollinginterval指定的时间暂停,然后再开始下一次扫描。 -
较低的值会降低数据库主机的负载并增加延迟。
-
默认值为
500。
-
maxscans-
-
指定捕获作业在捕获数据库事务日志的完整内容时可以尝试的扫描周期的数量限制。如果
continuous参数设置为1,则作业会根据pollinginterval指定的时间暂停,然后再恢复扫描。 -
较低的值会降低数据库主机的负载并增加延迟。
-
默认值为
10。
-
-
有关捕获代理参数的更多信息,请参阅 SQL Server 文档。
部署
要部署 Debezium SQL Server 连接器,请安装 Debezium SQL Server 连接器存档,配置连接器,并通过将其配置添加到 Kafka Connect 来启动连接器。
-
已安装 Apache Kafka 和 Kafka Connect。
-
SQL Server 已安装,已配置为 CDC,并已准备好与 Debezium 连接器一起使用。
-
下载 Debezium SQL Server 连接器插件存档
-
将文件解压到您的 Kafka Connect 环境。
-
将包含 JAR 文件的目录添加到Kafka Connect 的
plugin.path。 -
重新启动您的 Kafka Connect 进程以加载新 JAR 文件。
如果您使用的是不可变容器,请参阅Debezium 的容器映像,用于 Apache Kafka 和 Kafka Connect。您可以从 Docker Hub 拉取 Microsoft SQL Server on Linux 的官方容器映像。
|
从 |
SQL Server 连接器配置示例
以下是一个连接器实例的配置示例,该实例捕获位于 192.168.99.100 上端口 1433 的 SQL Server 服务器的数据,我们将其逻辑命名为 fullfillment。通常,您通过设置连接器可用的配置属性来在 JSON 文件中配置 Debezium SQL Server 连接器。
您可以选择为数据库中的模式和表的子集生成事件。可选地,您可以忽略、掩盖或截断包含敏感数据、大于指定大小或您不需要的列。
{
"name": "inventory-connector", (1)
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", (2)
"database.hostname": "192.168.99.100", (3)
"database.port": "1433", (4)
"database.user": "sa", (5)
"database.password": "Password!", (6)
"database.names": "testDB1,testDB2", (7)
"topic.prefix": "fullfillment", (8)
"table.include.list": "dbo.customers", (9)
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092", (10)
"schema.history.internal.kafka.topic": "schemahistory.fullfillment", (11)
"database.ssl.truststore": "path/to/trust-store", (12)
"database.ssl.truststore.password": "password-for-trust-store" (13)
}
}
| 1 | 我们在将其注册到 Kafka Connect 服务时使用的连接器名称。 |
| 2 | 此 SQL Server 连接器的 Java 类名。 |
| 3 | SQL Server 数据库服务器的地址。 |
| 4 | SQL Server 数据库服务器的端口号。 |
| 5 | 连接到 SQL Server 数据库服务器的用户名 |
| 6 | SQL Server 用户密码 |
| 7 | 要从中捕获更改的数据库名称。 |
| 8 | SQL Server 实例/群集的用于命名空间的连接器前缀,该前缀用于连接器写入的所有 Kafka 主题的名称、Kafka Connect 模式名称以及使用Avro 转换器时相应 Avro 模式的命名空间。 |
| 9 | Debezium 要捕获其更改的所有表的列表。 |
| 10 | 此连接器将用于写入和恢复 DDL 语句到数据库模式历史记录主题的 Kafka Broker 的列表。 |
| 11 | 连接器将写入和恢复 DDL 语句的数据库模式历史记录主题的名称。此主题仅供内部使用,消费者不应使用。 |
| 12 | 存储服务器签名证书的 SSL 信任库的路径。除非禁用数据库加密(database.encrypt=false),否则需要此属性。 |
| 13 | SSL 信任库密码。除非禁用数据库加密(database.encrypt=false),否则需要此属性。 |
有关可以为 Debezium SQL Server 连接器设置的配置属性的完整列表,请参阅SQL Server 连接器属性。
您可以通过 POST 命令将此配置发送到正在运行的 Kafka Connect 服务。该服务将记录配置并启动执行以下任务的单个连接器任务:
-
连接到 SQL Server 数据库。
-
读取事务日志。
-
将更改事件记录到 Kafka 主题。
添加连接器配置
要开始运行 Debezium SQL Server 连接器,请创建连接器配置,并将该配置添加到您的 Kafka Connect 群集。
-
Debezium SQL Server 连接器已安装。
-
为 SQL Server 连接器创建配置。
-
使用Kafka Connect REST API 将该连接器配置添加到您的 Kafka Connect 群集。
当连接器启动时,它将对连接器配置的 SQL Server 数据库执行一致的快照。然后,连接器将开始为行级操作生成数据更改事件,并将更改事件记录流式传输到 Kafka 主题。
连接器属性
Debezium SQL Server 连接器有许多配置属性,您可以使用它们来实现适合您应用程序的正确连接器行为。许多属性都有默认值。
属性信息组织如下:
-
数据库模式历史记录连接器配置属性,用于控制 Debezium 如何处理从数据库模式历史记录主题读取的事件。
必需的 Debezium SQL Server 连接器配置属性
以下配置属性是必需的,除非有默认值可用。
| 属性 | Default (默认值) | 描述 | ||
|---|---|---|---|---|
无默认值 |
连接器的唯一名称。尝试使用相同的名称再次注册将失败。(此属性是所有 Kafka Connect 连接器必需的。) |
|||
无默认值 |
连接器的 Java 类名。对于 SQL Server 连接器,始终使用 |
|||
|
指定连接器可用于从数据库实例捕获数据的最大任务数。如果 |
|||
无默认值 |
SQL Server 数据库服务器的 IP 地址或主机名。 |
|||
|
SQL Server 数据库服务器的整数端口号。如果同时指定了 |
|||
无默认值 |
连接到 SQL Server 数据库服务器时使用的用户名。使用 Kerberos 身份验证时可以省略,Kerberos 身份验证可以使用直通属性进行配置。 |
|||
无默认值 |
连接到 SQL Server 数据库服务器时使用的密码。 |
|||
无默认值 |
指定 SQL Server 命名实例的实例名称。如果同时指定了 |
|||
无默认值 |
要从中流式传输更改的 SQL Server 数据库名称的逗号分隔列表。 |
|||
无默认值 |
提供要 Debezium 捕获的 SQL Server 数据库服务器命名空间的连接器前缀。此前缀在所有其他连接器中都应该是唯一的,因为它用作接收此连接器记录的所有 Kafka 主题名称的前缀。数据库服务器逻辑名称中只能使用字母数字字符、连字符、点和下划线。
|
|||
无默认值 |
一个可选的、逗号分隔的正则表达式列表,用于匹配您希望捕获其更改的模式的名称。未包含在 要匹配模式名称,Debezium 会将您指定的正则表达式应用于整个模式名称字符串;它不会匹配可能存在于模式名称中的子字符串。 |
|||
无默认值 |
一个可选的、逗号分隔的正则表达式列表,匹配您不想捕获更改的模式名称。名称未包含在 要匹配模式名称,Debezium 会将您指定的正则表达式应用于整个模式名称字符串;它不会匹配可能存在于模式名称中的子字符串。 |
|||
无默认值 |
一个可选的逗号分隔的正则表达式列表,用于匹配您希望 Debezium 捕获的表的完全限定表标识符。默认情况下,连接器捕获指定模式的所有非系统表。当设置此属性时,连接器仅从指定的表中捕获更改。每个标识符的格式为 schemaName.tableName。 为了匹配表名,Debezium 会将您指定的正则表达式应用为锚定正则表达式。也就是说,指定的表达式会与表的整个名称字符串进行匹配;它不会匹配表中可能存在的子字符串。 |
|||
无默认值 |
一个可选的逗号分隔的正则表达式列表,用于匹配您要排除捕获的表的完全限定表标识符。Debezium 捕获所有未包含在 为了匹配表名,Debezium 会将您指定的正则表达式应用为锚定正则表达式。也就是说,指定的表达式会与表的整个名称字符串进行匹配;它不会匹配表中可能存在的子字符串。 |
|||
空字符串 |
一个可选的逗号分隔的正则表达式列表,用于匹配应包含在更改事件消息值中的列的完全限定名称。列的完全限定名称格式为 schemaName.tableName.columnName。
为了匹配列名,Debezium 将您指定的正则表达式作为锚定正则表达式匹配。也就是说,指定的表达式与列的整个名称字符串进行匹配;它不匹配可能存在于列名中的子字符串。 |
|||
空字符串 |
一个可选的逗号分隔的正则表达式列表,用于匹配应从更改事件消息值中排除的列的完全限定名称。列的完全限定名称格式为 schemaName.tableName.columnName。请注意,主键列始终包含在事件的键中,即使它们已从值中排除。 为了匹配列名,Debezium 将您指定的正则表达式作为锚定正则表达式匹配。也就是说,指定的表达式与列的整个名称字符串进行匹配;它不匹配可能存在于列名中的子字符串。 |
|||
|
指定当包含的列没有更改时是否跳过发布消息。这实际上会过滤掉没有包含列更改的消息(根据 |
|||
n/a |
一个可选的、逗号分隔的正则表达式列表,用于匹配基于字符的列的完全限定名称。列的完全限定名称格式为 `<schemaName>.<tableName>.<columnName>。 假名由应用指定的hashAlgorithm 和salt 产生的哈希值组成。根据使用的哈希函数,可以维护引用完整性,同时用假名替换列值。支持的哈希函数在 Java Cryptography Architecture 标准算法名称文档的MessageDigest 部分中进行了描述。 column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName 如果需要,假名会自动截断为列的长度。连接器配置可以包含多个指定不同哈希算法和 salt 的属性。 |
|||
|
时间、日期和时间戳可以表示为不同精度的值,包括: |
|||
|
指定连接器如何处理 |
|||
|
一个布尔值,指定连接器是否将数据库模式中的更改发布到与主题前缀同名的 Kafka 主题。连接器将每个模式更改记录下来,键包含数据库名称,值为描述模式更新的 JSON 结构。此记录模式更改的机制独立于连接器对数据库模式历史记录更改的内部记录。 |
|||
|
控制是否在*delete* 事件后跟一个墓碑事件。 |
|||
n/a |
一个可选的、逗号分隔的正则表达式列表,匹配基于字符的列的完全限定名称。如果您希望在列数据超过属性名称中由 length 指定的字符数时截断这些列中的数据,请设置此属性。将 完全限定的列名遵循以下格式: 您可以在单个配置中指定多个具有不同长度的属性。 |
|||
n/a 列的完全限定名称格式为 schemaName.tableName.columnName。 |
一个可选的、逗号分隔的正则表达式列表,匹配基于字符的列的完全限定名称。如果您希望连接器掩盖一组列的值,请设置此属性(例如,如果它们包含敏感数据)。将 «列» 的 «完全限定» 名称遵循以下格式:schemaName.tableName.columnName。为了匹配 «列» 的名称,Debezium 将您指定的 «正则表达式» 应用为*锚定的* «正则表达式»。也就是说,指定的表达式会针对 «列» 的整个名称字符串进行匹配;该 «表达式» «不会» 匹配可能存在于 «列» 名称中的子字符串。 您可以在单个配置中指定多个具有不同长度的属性。 |
|||
n/a |
一个可选的、逗号分隔的正则表达式列表,匹配您希望连接器为其发出表示列元数据的额外参数的列的完全限定名称。设置此属性时,连接器会将以下字段添加到事件记录的模式中:
这些参数分别传播列的原始类型名称和长度(对于可变宽度类型)。 «列» 的 «完全限定» 名称遵循以下格式:schemaName.tableName.columnName。 |
|||
n/a |
一个可选的、逗号分隔的 «正则表达式» 列表,用于指定数据库中 «列» 定义的 «数据类型» 的«完全限定»名称。设置此属性时,对于具有匹配 «数据类型» 的 «列»,连接器会发出包含以下额外字段的 «模式» 的事件记录:
这些参数分别传播列的原始类型名称和长度(对于可变宽度类型)。 列的完全限定名称遵循以下格式:schemaName.tableName.typeName。 有关 SQL Server 特定数据类型名称的列表,请参阅SQL Server 数据类型映射。 |
|||
n/a |
«表达式» 列表,用于指定连接器用来为 «发布» 到指定表 «Kafka» «主题» 的更改事件 «记录» «形成» «自定义» «消息» «键» 的 «列»。 默认情况下,Debezium 使用表的 «主键» «列» 作为它发出的 «记录» 的 «消息» «键»。 «代替» 默认值,或为 «缺少» «主键» 的表指定 «键»,您可以根据一个或多个 «列» «配置» «自定义» «消息» «键»。 每个完全限定的表名都是一个正则表达式,格式如下: 用于创建自定义消息键的列数没有限制。但是,最好使用唯一键所需的最小数量。 |
|||
bytes |
指定二进制( |
|||
none |
指定如何调整 schema 名称以兼容连接器使用的消息转换器。可能设置:
|
|||
none |
指定如何调整字段名称以兼容连接器使用的消息转换器。可能设置:
有关更多信息,请参阅 Avro 命名。 |
高级 SQL Server 连接器配置属性
以下高级配置属性具有可以满足大多数情况的良好默认值,因此很少需要在连接器的配置中指定。
| 属性 | Default (默认值) | 描述 | ||
|---|---|---|---|---|
无默认值 |
枚举连接器可以使用*自定义转换器*的符号名称的逗号分隔列表。例如:
您必须设置 对于为连接器配置的每个转换器,您还必须添加一个
For example, (例如,) isbn.type: io.debezium.test.IsbnConverter 如果您想进一步控制已配置转换器的行为,您可以添加一个或多个配置参数来传递值给转换器。要将任何其他配置参数与转换器关联,请在参数名称前加上转换器的符号名称。例如: isbn.schema.name: io.debezium.sqlserver.type.Isbn |
|||
initial (初始) |
一种用于对捕获表进行结构快照(可选包含数据)的模式。快照完成后,连接器将继续从数据库的重做日志中读取更改事件。支持以下值:
有关更多信息,请参阅自定义快照程序 SPI。 |
|||
false |
如果 |
|||
false |
如果 |
|||
false |
如果 |
|||
false |
如果 |
|||
false |
如果 |
|||
无默认值 |
如果 |
|||
exclusive |
控制连接器持有一个表锁的时间长短。表锁可防止连接器在执行快照时发生某些类型的表更改操作。您可以设置以下值:
|
|||
无默认值 |
当 |
|||
|
指定连接器在执行快照时如何查询数据。
与使用 |
|||
无默认值 |
当 |
|||
|
一个可选的、逗号分隔的正则表达式列表,用于匹配要在快照中包含的表的完全限定名称( 为了匹配表名,Debezium 会将您指定的正则表达式应用为锚定正则表达式。也就是说,指定的表达式会与表的整个名称字符串进行匹配;它不会匹配表中可能存在的子字符串。 |
|||
repeatable_read |
控制使用哪种事务隔离级别以及连接器锁定要捕获的表的时间。支持以下值:
模式选择还会影响数据一致性。只有 |
|||
|
指定连接器在处理事件期间如何响应异常。 |
|||
|
一个正整数值,指定连接器在检查数据库是否有新更改事件之前等待的毫秒数。 为防止此设置延迟心跳的发出,请将其设置为小于或等于 |
|||
|
一个正整数值,指定阻塞队列可以容纳的最大记录数。当 Debezium 读取从数据库流式传输的事件时,它会将事件放入阻塞队列,然后再写入 Kafka。在连接器摄取消息的速度快于其写入 Kafka 的速度,或者 Kafka 不可用时,阻塞队列可以为从数据库读取更改事件提供反压。队列中保存的事件在连接器定期记录偏移量时会被忽略。始终将 |
|||
|
一个长整数值,指定阻塞队列的最大字节容量。默认情况下,阻塞队列没有容量限制。要指定队列可以消耗的字节数,请将此属性设置为一个正长整型值。 |
|||
|
正整数值,指定在此连接器的每次迭代中处理的事件批次的最大大小。 |
|||
|
以毫秒为单位指定一个间隔,该间隔决定连接器向 Kafka 心跳主题发送消息的频率,无论数据库是否发生更改。
|
|||
无默认值 |
指定连接器在发送心跳消息时在源数据库上执行的查询。 |
|||
无默认值 |
连接器在启动后等待多长时间(以毫秒为单位)才进行快照。 |
|||
0 |
指定连接器在完成快照后延迟开始流式传输过程的时间(以毫秒为单位)。设置延迟间隔有助于防止连接器在快照完成后但流式传输过程开始之前立即发生故障时重新启动快照。设置一个高于 Kafka Connect 工作节点设置的 |
|||
|
指定在进行快照时每次从每个表中读取的最大行数。连接器将以该大小的多个批次读取表内容。默认为 2000。 |
|||
无默认值 |
指定对给定查询的每次数据库往返将获取的行数。默认为 JDBC 驱动程序的默认获取大小。 |
|||
|
一个整数值,指定在执行快照时获取表锁的最大等待时间(以毫秒为单位)。如果在此时间间隔内无法获取表锁,则快照将失败(另请参阅 快照)。 |
|||
无默认值 |
指定要在快照中包含的表行。如果您希望快照仅包含表中一部分行,请使用此属性。此属性仅影响快照。它不适用于连接器从日志中读取的事件。 此属性包含一个逗号分隔的完全限定表名列表,格式为 从包含软删除列 "snapshot.select.statement.overrides": "customer.orders", "snapshot.select.statement.overrides.customer.orders": "SELECT * FROM customers.orders WHERE delete_flag = 0 ORDER BY id DESC" 在生成的快照中,连接器仅包含 |
|||
v2 |
CDC 事件中 |
|||
|
设置为 |
|||
10000 (10 秒) |
发生可重试错误后等待多少毫秒才能重新启动连接器。 |
|||
|
一个逗号分隔的操作类型列表,您希望连接器在流式传输期间跳过这些操作。您可以配置连接器以跳过以下类型的操作:
如果不希望连接器跳过任何操作,请将值设置为 |
|||
无默认值 |
用于将 信号 发送给连接器的数据集合的完全限定名称。 |
|||
source (源) |
为连接器启用的信号通道名称列表。默认情况下,以下通道可用:
|
|||
无默认值 |
为连接器启用的通知通道名称列表。默认情况下,以下通道可用:
|
|||
|
允许在增量快照期间进行架构更改。启用后,连接器将在增量快照期间检测到架构更改并重新选择当前块,以避免锁定 DDL。 |
|||
|
在增量快照块期间,连接器获取并加载到内存中的最大行数。增加块大小可以提高效率,因为快照运行的快照查询次数较少但规模更大。但是,较大的块大小也需要更多内存来缓冲快照数据。将块大小调整为能为您的环境提供最佳性能的值。 |
|||
|
指定连接器在增量快照期间使用的水印机制,用于对可能被增量快照捕获,然后在流式传输恢复后重新捕获的事件进行去重。
|
|||
500 |
指定用于在从数据库中的多个表流式传输更改时减少内存占用的每次迭代的最大事务数。设置为 |
|||
|
对增量快照期间使用的所有 SELECT 语句使用 OPTION(RECOMPILE) 查询选项。这有助于解决可能发生的参数嗅探问题,但根据查询执行的频率,可能会增加源数据库的 CPU 负载。 |
|||
|
用于确定数据更改、schema 更改、事务、心跳事件等的*主题名称*的 TopicNamingStrategy 类的名称,默认为 |
|||
|
指定主题名称的分隔符,默认为 |
|||
|
用于保存有界并发哈希映射中主题名称的大小。此缓存将有助于确定与给定数据集合对应的主题名称。 |
|||
|
控制连接器向心跳消息发送的心跳消息的主题名称。主题名称的模式如下: |
|||
|
控制连接器发送事务元数据消息的主题的名称。主题名称的模式为: 有关更多信息,请参阅 事务元数据。 |
|||
|
指定连接器在执行初始快照时使用的线程数。要启用并行初始快照,请将属性设置为大于 1 的值。在并行初始快照中,连接器会并发处理多个表。
|
|||
|
定义用于通过添加提供上下文信息的元数据来定制 MBean 对象名称的标签。指定键值对的逗号分隔列表。每个键代表 MBean 对象名称的标签,对应的值代表该键的值,例如: 连接器会将指定的标签附加到基本 MBean 对象名称。标签可以帮助您组织和分类指标数据。您可以定义标签来识别特定的应用程序实例、环境、区域、版本等。有关更多信息,请参阅 定制 MBean 名称。 |
|||
|
指定连接器在操作导致可重试错误(例如连接错误)后如何响应。
|
|||
|
控制连接器如何查询 CDC 数据。支持以下模式:
|
|||
|
指定连接器等待查询完成的时间(以毫秒为单位)。将值设置为 |
|||
|
指定在流式传输期间每次从每个表中读取的最大行数。连接器将以该大小的多个批次读取表内容。默认为 |
|||
|
指定连接器可以捕获的最大表数。超过此限制将触发 |
|||
|
指定如果连接器捕获的表数超过
|
|||
true |
此属性指定 Debezium 是否将带有 这些头是 OpenLineage 集成所必需的,并提供元数据,使下游处理系统能够跟踪和识别更改事件的来源。 该属性添加了以下头:
|
Debezium SQL Server 连接器数据库架构历史配置属性
Debezium 提供了一组 schema.history.internal.* 属性来控制连接器如何与模式历史记录主题进行交互。
下表描述了用于配置 Debezium 连接器的 schema.history.internal 属性。
| 属性 | Default (默认值) | 描述 |
|---|---|---|
无默认值 |
连接器存储数据库模式历史的 Kafka 主题的完整名称。 |
|
无默认值 |
连接器用于建立与 Kafka 集群的初始连接的主机/端口对列表。此连接用于检索连接器先前存储的数据库模式历史,以及写入从源数据库读取的每个 DDL 语句。每个对都应指向 Kafka Connect 进程使用的相同 Kafka 集群。 |
|
|
一个整数值,指定连接器在启动/恢复期间轮询持久数据时应等待的最大毫秒数。默认为 100 毫秒。 |
|
|
一个整数值,指定连接器在使用 Kafka 管理客户端获取集群信息时应等待的最大毫秒数。 |
|
|
一个整数值,指定连接器在使用 Kafka 管理客户端创建 Kafka 历史主题时应等待的最大毫秒数。 |
|
|
连接器在连接器恢复因错误而失败之前尝试读取持久历史数据的最大次数。在收到无数据后的最长等待时间为 |
|
|
一个布尔值,指定连接器是应忽略格式错误或未知的数据库语句,还是停止处理以便人工修复问题。安全默认值为 |
|
|
一个布尔值,指定连接器是记录模式或数据库中所有表的模式结构,还是仅记录指定用于捕获的表的模式结构。
|
|
|
一个布尔值,指定连接器是记录实例中所有逻辑数据库的模式结构。
|
SQL Server 连接器直通配置属性
该连接器支持直通式属性,使 Debezium 能够为微调 Apache Kafka 生产者和消费者行为指定自定义配置选项。有关 Kafka 生产者和消费者配置属性的完整范围,请参阅 Kafka 文档。
用于配置生产者和消费者客户端如何与架构历史主题交互的直通属性
Debezium 依赖 Apache Kafka 生产者将模式更改写入数据库模式历史记录主题。同样,它依赖 Kafka 消费者在连接器启动时从数据库模式历史记录主题读取。您可以通过为一组以 schema.history.internal.producer.* 和 schema.history.internal.consumer.* 前缀开头的传递配置属性赋值来定义 Kafka 生产者和消费者客户端的配置。传递的生产者和消费者数据库模式历史记录属性控制一系列行为,例如这些客户端如何与 Kafka 代理建立安全连接,如下例所示:
schema.history.internal.producer.security.protocol=SSL
schema.history.internal.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
schema.history.internal.producer.ssl.keystore.password=test1234
schema.history.internal.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
schema.history.internal.producer.ssl.truststore.password=test1234
schema.history.internal.producer.ssl.key.password=test1234
schema.history.internal.consumer.security.protocol=SSL
schema.history.internal.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
schema.history.internal.consumer.ssl.keystore.password=test1234
schema.history.internal.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
schema.history.internal.consumer.ssl.truststore.password=test1234
schema.history.internal.consumer.ssl.key.password=test1234
Debezium 会从属性名称中剥离前缀,然后再将属性传递给 Kafka 客户端。
有关 Kafka 生产者配置属性 和 Kafka 消费者配置属性 的更多信息,请参阅 Apache Kafka 文档。
用于配置 SQL Server 连接器如何与 Kafka 信号主题交互的直通属性
Debezium 提供了一组 signal.* 属性来控制连接器如何与 Kafka 信号主题交互。
下表描述了 Kafka signal 属性。
| 属性 | Default (默认值) | 描述 | ||
|---|---|---|---|---|
<topic.prefix>-signal |
连接器监视的用于临时信号的 Kafka 主题的名称。
|
|||
kafka-signal |
Kafka 消费者使用的组 ID 的名称。 |
|||
无默认值 |
连接器用于建立与 Kafka 集群的初始连接的主机和端口对列表。每个对都引用 Debezium Kafka Connect 进程使用的 Kafka 集群。 |
|||
|
一个整数值,指定连接器在轮询信号时等待的最大毫秒数。 |
用于配置信号通道的 Kafka 消费者客户端的直通属性
Debezium 为信号 Kafka 消费者的传递配置提供了支持。传递信号属性以 signal.consumer.* 前缀开头。例如,连接器会将 signal.consumer.security.protocol=SSL 等属性传递给 Kafka 消费者。
Debezium 在将前缀从属性名称中剥离,然后再将属性传递给 Kafka 信号消费者。
用于配置 SQL Server 连接器接收器通知通道的直通属性
下表描述了可用于配置 Debezium sink notification 通道的属性。
| 属性 | Default (默认值) | 描述 |
|---|---|---|
无默认值 |
接收来自 Debezium 的通知的主题名称。当您将 |
数据库架构演变
当启用 SQL Server 表的变更数据捕获 (CDC) 时,随着表中发生更改,事件记录会持久化到服务器上的捕获表中。如果您更改源表更改的结构,例如,通过添加新列,该更改不会动态反映在捕获表中。只要捕获表继续使用过时的架构,Debezium 连接器就无法正确地为该表发出数据更改事件。您必须进行干预以刷新捕获表,以便连接器能够恢复处理更改事件。
由于 SQL Server 中 CDC 的实现方式,您无法使用 Debezium 更新捕获表。要刷新捕获表,必须是具有提升权限的 SQL Server 数据库操作员。作为 Debezium 用户,您必须与 SQL Server 数据库操作员协调任务,以完成架构刷新并恢复流式传输到 Kafka 主题。
您可以使用以下方法之一在架构更改后更新捕获表:
使用每种类型的方法都有优点和缺点。
|
无论您使用联机还是脱机更新方法,都必须在对同一源表应用后续架构更新之前完成整个架构更新过程。最佳做法是分批执行所有 DDL,以便只能运行一次该过程。 |
|
在已启用 CDC 的源表上不支持某些架构更改。例如,如果 CDC 已在表上启用,SQL Server 不允许您在重命名了表中的列或更改了列类型的情况下更改表的架构。 |
|
在将源表中的列从 |
|
在您使用 |
脱机架构更新
脱机架构更新提供了更新捕获表的最安全的方法。但是,对于需要高可用性的应用程序,脱机更新可能不可行。
-
已提交对已启用 CDC 的 SQL Server 表架构的更新。
-
您是一名具有提升权限的 SQL Server 数据库操作员。
-
暂停更新数据库的应用程序。
-
等待 Debezium 连接器流式传输所有未流式传输的更改事件记录。
-
停止 Debezium 连接器。
-
将所有更改应用于源表的模式。
-
使用
sys.sp_cdc_enable_table过程创建一个用于更新源表的新捕获表,并为@capture_instance参数提供一个唯一值。 -
恢复您在步骤 1 中暂停的应用程序。
-
启动 Debezium 连接器。
-
在 Debezium 连接器开始从新捕获表流式传输后,通过运行存储过程
sys.sp_cdc_disable_table并将@capture_instance参数设置为旧的捕获实例名称来删除旧的捕获表。连接器在完成从旧捕获实例读取后会发出通知。有关更多信息,请参阅 通知。
联机架构更新
完成联机架构更新的过程比运行脱机架构更新的过程更简单,并且您可以在不中断应用程序和数据处理的情况下完成它。但是,在联机架构更新中,在源数据库中更新架构之后,但在创建新的捕获实例之前,可能会发生处理间隙。在此期间,更改事件会继续被更改表的旧实例捕获,并且保存到旧表中的更改数据会保留早期架构的结构。因此,例如,如果您向源表添加了一个新列,那么在新的捕获表就绪之前生成的更改事件不包含新列的字段。如果您的应用程序无法容忍这种过渡期,最好使用脱机架构更新过程。
-
已提交对已启用 CDC 的 SQL Server 表架构的更新。
-
您是一名具有提升权限的 SQL Server 数据库操作员。
-
将所有更改应用于源表的模式。
-
通过运行
sys.sp_cdc_enable_table存储过程为更新源表创建一个新的捕获表,并为@capture_instance参数提供一个唯一值。 -
在 Debezium 开始从新的捕获表流式传输后,您可以通过运行
sys.sp_cdc_disable_table存储过程(将@capture_instance参数设置为旧的捕获实例名称)来删除旧的捕获表。连接器在完成从旧捕获实例读取后会发出通知。有关更多信息,请参阅 通知。
我们将部署基于 SQL Server 的 Debezium 教程 来演示联机架构更新。
在下面的示例中,将列 phone_number 添加到 customers 表。
-
键入以下命令以启动数据库 shell:
docker-compose -f docker-compose-sqlserver.yaml exec sqlserver bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD -d testDB'
-
通过运行以下查询来修改
customers源表的架构,以添加phone_number字段:ALTER TABLE customers ADD phone_number VARCHAR(32); -
通过运行
sys.sp_cdc_enable_table存储过程创建新的捕获实例。EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0, @capture_instance = 'dbo_customers_v2'; GO -
通过运行以下查询将新数据插入
customers表:INSERT INTO customers(first_name,last_name,email,phone_number) VALUES ('John','Doe','john.doe@example.com', '+1-555-123456'); GOKafka Connect 日志通过类似以下的日志条目报告配置更新:
connect_1 | 2019-01-17 10:11:14,924 INFO || Multiple capture instances present for the same table: Capture instance "dbo_customers" [sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_CT, startLsn=00000024:00000d98:0036, changeTableObjectId=1525580473, stopLsn=00000025:00000ef8:0048] and Capture instance "dbo_customers_v2" [sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL] [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource] connect_1 | 2019-01-17 10:11:14,924 INFO || Schema will be changed for ChangeTable [captureInstance=dbo_customers_v2, sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL] [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource] ... connect_1 | 2019-01-17 10:11:33,719 INFO || Migrating schema to ChangeTable [captureInstance=dbo_customers_v2, sourceTableId=testDB.dbo.customers, changeTableId=testDB.cdc.dbo_customers_v2_CT, startLsn=00000025:00000ef8:0048, changeTableObjectId=1749581271, stopLsn=NULL] [io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource]最终,
phone_number字段将添加到架构中,并且其值将显示在写入 Kafka 主题的消息中。... { "type": "string", "optional": true, "field": "phone_number" } ... "after": { "id": 1005, "first_name": "John", "last_name": "Doe", "email": "john.doe@example.com", "phone_number": "+1-555-123456" }, -
通过运行
sys.sp_cdc_disable_table存储过程来删除旧的捕获实例。EXEC sys.sp_cdc_disable_table @source_schema = 'dbo', @source_name = 'dbo_customers', @capture_instance = 'dbo_customers'; GO
通知
如果您为连接器启用了通知,在进行架构更改后,连接器会发出通知来报告更改的状态。有关通知以及如何启用它们的更多信息,请参阅 Debezium 通知文档。
以下示例显示了 Debezium 在架构演变更改后可能发出的状态通知。
{
"id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
"aggregate_type":"Capture Instance",
"type":"COMPLETED",
"additional_data":{
"connector_name":"my-connector",
"capture_instance":"dbo_customers",
"server":"server1",
"database":"testDB",
"start_lsn":"00000027:00000758:0003",
"stop_lsn":"00000028:00000759:0004",
"commit_lsn":"00000029:00000760:0005"
},
"timestamp": "1695817046353"
}
监控
Debezium SQL Server 连接器除了支持 Kafka 和 Kafka Connect 提供的内置 JMX 指标外,还提供三种类型的指标。该连接器提供以下指标:
有关如何通过 JMX 公开上述指标的信息,请参阅 Debezium 监控文档。
自定义 MBean 名称
Debezium 连接器通过连接器的 MBean 名称公开指标。这些指标特定于每个连接器实例,提供有关连接器快照、流式传输和模式历史记录进程行为的数据。
默认情况下,当您部署正确配置的连接器时,Debezium 会为不同的连接器指标生成唯一的 MBean 名称。要查看连接器进程的指标,请将您的可观察性堆栈配置为监视其 MBean。但这些默认 MBean 名称取决于连接器配置;配置更改可能导致 MBean 名称发生更改。在这种情况下,更改 MBean 名称会破坏连接器实例和 MBean 之间的链接,从而中断监控活动。您必须重新配置可观察性堆栈以使用新的 MBean 名称才能恢复监控。
为了防止因 MBean 名称更改而导致的监控中断,您可以配置自定义指标标签。通过将 custom.metric.tags 属性添加到连接器配置中来配置自定义指标。该属性接受键值对,其中每个键代表 MBean 对象名称的标签,相应的值代表该标签的值。例如:k1=v1,k2=v2。Debezium 将指定的标签附加到连接器的 MBean 名称。
配置连接器的 custom.metric.tags 属性后,您可以配置可观察性堆栈以检索与指定标签关联的指标。然后,可观察性堆栈使用指定的标签而不是可变的 MBean 名称来唯一标识连接器。之后,如果 Debezium 重新定义了其如何构造 MBean 名称,或者连接器配置中的 topic.prefix 发生更改,指标收集将不会中断,因为指标抓取任务使用指定的标签模式来标识连接器。
使用自定义标签的另一个好处是,您可以使用反映数据管道架构的标签,以便以适合您运营需求的方式组织指标。例如,您可以指定带有值的标签来声明连接器活动的类型、应用程序上下文或数据源,例如 db1-streaming-for-application-abc。如果您指定多个键值对,所有指定的对都将附加到连接器的 MBean 名称。
以下示例说明了标签如何修改默认 MBean 名称。
默认情况下,SQL Server 连接器使用以下 MBean 名称来表示流式传输指标:
debezium.sqlserver:type=connector-metrics,context=streaming,server=<topic.prefix>
如果您将 custom.metric.tags 的值设置为 database=salesdb-streaming,table=inventory,Debezium 将生成以下自定义 MBean 名称:
debezium.sqlserver:type=connector-metrics,context=streaming,server=<topic.prefix>,database=salesdb-streaming,table=inventory
快照指标
MBean 为 debezium.sql_server:type=connector-metrics,server=<topic.prefix>,task=<task.id>,context=snapshot。
除非快照操作正在活动中,或者自上次连接器启动以来发生过快照,否则快照指标不会暴露。
下表列出了可用的快照指标。
| Attributes | Type | 描述 |
|---|---|---|
|
连接器已读取的最后一个快照事件。 |
|
|
自连接器读取和处理最近事件以来经过的毫秒数。 |
|
|
记录连接器在快照操作期间识别为错误的更改事件的数量。每次连接器在初始、增量或临时快照期间遇到无法处理的事件时,都会递增此指标。事件可能因格式错误、与模式不兼容或在转换期间遇到失败而无法处理。指标值在连接器任务的整个生命周期内保持不变。如果快照被中断,并且连接器任务重新启动,则指标计数将重置为 |
|
|
自上次启动或重置以来,此连接器已看到的事件总数。 |
|
|
已被连接器配置的包含/排除列表过滤规则过滤的事件数量。 |
|
|
由连接器捕获的表列表。 |
|
|
用于在快照程序和主 Kafka Connect 循环之间传递事件的队列的长度。 |
|
|
用于在快照程序和主 Kafka Connect 循环之间传递事件的队列的可用容量。 |
|
|
包含在快照中的表总数。 |
|
|
快照尚未复制的表数。 |
|
|
快照是否已启动。 |
|
|
快照是否已暂停。 |
|
|
快照是否被中止。 |
|
|
快照是否已完成。 |
|
|
快照是否被跳过。 |
|
|
快照到目前为止所花费的总秒数,即使未完成。也包括快照暂停的时间。 |
|
|
快照暂停的总秒数。如果快照被暂停了多次,则暂停时间将累加。 |
|
|
包含快照中每个表扫描的行数的映射。表在处理过程中被增量地添加到 Map 中。每扫描 10,000 行和完成一个表时更新。 |
|
|
队列的最大缓冲区(字节)。如果 |
|
|
队列中记录的当前字节卷。 |
当执行增量快照时,连接器还提供以下附加快照指标:
| Attributes | Type | 描述 |
|---|---|---|
|
当前快照块的标识符。 |
|
|
定义当前块的主键集的下限。 |
|
|
定义当前块的主键集的上限。 |
|
|
当前快照表的组成键集(主键)的下限。 |
|
|
当前快照表的组成键集(主键)的上限。 |
流式传输指标
MBean 为 debezium.sql_server:type=connector-metrics,server=<topic.prefix>,task=<task.id>,context=streaming。
下表列出了可用的流式传输指标。
| Attributes | Type | 描述 |
|---|---|---|
|
连接器已读取的最后一个流式传输事件。 |
|
|
自连接器读取和处理最近事件以来经过的毫秒数。 |
|
|
记录连接器在流式传输期间识别为错误的更改事件的数量。每次连接器在流式传输会话期间遇到无法处理的事件时,都会递增此指标。事件可能因格式错误、与模式不兼容或在转换期间遇到失败而无法处理。指标值在连接器任务的整个生命周期内保持不变。连接器重启后,指标计数将重置为 |
|
|
自上次连接器启动或重置以来,源数据库报告的总数据更改事件数。代表 Debezium 需要处理的数据更改工作负载。 |
|
|
自上次启动或重置指标以来,连接器处理的总创建事件数。 |
|
|
自上次启动或重置指标以来,连接器处理的总更新事件数。 |
|
|
自上次启动或重置指标以来,连接器处理的总删除事件数。 |
|
|
已被连接器配置的包含/排除列表过滤规则过滤的事件数量。 |
|
|
由连接器捕获的表列表。 |
|
|
用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的长度。 |
|
|
用于在流式传输器和主 Kafka Connect 循环之间传递事件的队列的剩余容量。 |
|
|
表示连接器当前是否连接到数据库服务器的标志。 |
|
|
上次更改事件的时间戳与连接器处理它之间的时间差(以毫秒为单位)。这些值将包含数据库服务器和连接器运行所在机器之间时钟的任何差异。 |
|
|
已提交的处理过的事务数。 |
|
|
收到的最后一个事件的坐标。 |
|
|
已处理的最后一个事务的事务标识符。 |
|
|
队列的最大缓冲区(字节)。如果 |
|
|
队列中记录的当前字节卷。 |
架构历史指标
MBean 为 debezium.sql_server:type=connector-metrics,context=schema-history,server=<topic.prefix>,task=<task.id>。
下表列出了可用的模式历史记录指标。
| Attributes | Type | 描述 |
|---|---|---|
|
|
|
|
恢复开始的时间(以 epoch 秒为单位)。 |
|
|
恢复阶段读取的更改数。 |
|
|
在恢复和运行时应用的总模式更改数。 |
|
|
自从从历史存储中恢复最后一个更改以来经过的毫秒数。 |
|
|
自应用最后一个更改以来经过的毫秒数。 |
|
|
从历史存储中恢复的最后一个更改的字符串表示。 |
|
|
已应用的最后一个更改的字符串表示。 |