Debezium JDBC 连接器

概述

Debezium JDBC 连接器是一个 Kafka Connect 接收器连接器实现,它可以从多个源主题消费事件,然后使用 JDBC 驱动程序将这些事件写入关系型数据库。该连接器支持多种数据库方言,包括 Db2、MySQL、Oracle、PostgreSQL 和 SQL Server。

JDBC 连接器的工作原理

Debezium JDBC 连接器是一个 Kafka Connect 接收器连接器,因此需要 Kafka Connect 运行时。该连接器会定期轮询其订阅的 Kafka 主题,从这些主题消费事件,然后将事件写入配置的关系型数据库。该连接器支持幂等写入操作,使用 upsert 语义和基本的模式演进。

Debezium JDBC 连接器提供以下功能:

消费复杂的 Debezium 变更事件

默认情况下,Debezium 源连接器会生成复杂、分层的变更事件。当 Debezium 连接器与其他 JDBC 接收器连接器实现一起使用时,您可能需要应用 ExtractNewRecordState 单消息转换 (SMT) 来展平变更事件的有效负载,以便接收器实现能够消费。如果您运行 Debezium JDBC 接收器连接器,则无需部署 SMT,因为 Debezium 接收器连接器可以直接消费原生 Debezium 变更事件,而无需使用转换。

当 JDBC 接收器连接器从 Debezium 源连接器消费复杂变更事件时,它会从原始 insertupdate 事件的 after 部分提取值。当接收器连接器消费删除事件时,不会咨询事件有效负载的任何部分。

Debezium JDBC 接收器连接器并非设计用于读取模式变更主题。如果您的源连接器配置为捕获模式变更,请在 JDBC 连接器配置中设置 topicstopics.regex 属性,以避免连接器消费模式变更主题。

至少一次交付

Debezium JDBC 接收器连接器保证它从 Kafka 主题消费的事件至少被处理一次。

多个任务

您可以跨多个 Kafka Connect 任务运行 Debezium JDBC 接收器连接器。要跨多个任务运行连接器,请将 tasks.max 配置属性设置为您希望连接器使用的任务数。Kafka Connect 运行时会启动指定数量的任务,并为每个任务运行一个连接器实例。多个任务可以通过并行读取和处理来自多个源主题的变更来提高性能。

数据和列类型映射

为了使 Debezium JDBC 接收器连接器能够将入站消息字段的数据类型正确映射到出站消息字段,连接器需要有关源事件中每个字段数据类型的信息。该连接器支持不同数据库方言之间的各种列类型映射。为了正确地将目标列类型从事件字段中的 type 元数据转换为目标列类型,连接器会应用为源数据库定义的数据类型映射。您可以通过在源连接器配置中设置 column.propagate.source.typedatatype.propagate.source.type 选项来增强连接器解析列数据类型的方式。启用这些选项后,Debezium 会包含额外的参数元数据,这有助于 JDBC 接收器连接器更准确地解析目标列的数据类型。

为了使 Debezium JDBC 接收器连接器能够处理 Kafka 主题中的事件,Kafka 主题消息键(如果存在)必须是原始数据类型或 Struct。此外,源消息的有效负载必须是 Struct,该 Struct 具有展平的结构(没有嵌套的 struct 类型),或者具有符合 Debezium 复杂、分层结构的嵌套 struct 布局。

如果 Kafka 主题中事件的结构不符合这些规则,您必须实现自定义单消息转换,将源事件的结构转换为可用格式。

主键处理

默认情况下,Debezium JDBC 接收器连接器不会将源事件中的任何字段转换为事件的主键。不幸的是,缺少稳定的主键可能会使事件处理复杂化,具体取决于您的业务需求,或者当接收器连接器使用 upsert 语义时。要定义一致的主键,您可以将连接器配置为使用以下表格中描述的主键模式之一。

模式 描述

none

创建表时未指定主键字段。

kafka

主键由以下三个列组成:

  • __connect_topic

  • __connect_partition

  • __connect_offset

这些列的值来自 Kafka 事件的坐标。

record_key

主键由 Kafka 事件的键组成。

如果主键是原始类型,请通过设置 primary.key.fields 属性来指定要使用的列名。如果主键是结构类型,则结构中的字段将映射为主键的列。您可以使用 primary.key.fields 属性将主键限制为列的子集。

record_value

主键由 Kafka 事件的值组成。

由于 Kafka 事件的值始终是 Struct,因此默认情况下,值中的所有字段都成为主键的列。要使用主键中的字段子集,请设置 primary.key.fields 属性,以指定要从中派生主键列的值中的字段的逗号分隔列表。

record_header

主键由 Kafka 事件的头组成。

Kafka 事件的头可能包含多个头,每个头都可以是 Struct 或原始数据类型,连接器会将这些头组合成一个 Struct。因此,此 Struct 中的所有字段都成为主键的列。要使用主键中的字段子集,请设置 primary.key.fields 属性,以指定要从中派生主键列的值中的字段的逗号分隔列表。

如果将 primary.key.mode 设置为 kafka 并将 schema.evolution 设置为 basic,某些数据库方言可能会抛出异常。当方言将 STRING 数据类型映射到可变长度字符串数据类型(如 TEXTCLOB)并且方言不允许主键列具有无限长度时,就会发生此异常。为避免此问题,请在环境中应用以下设置:

  • 不要将 schema.evolution 设置为 basic

  • 提前创建数据库表和主键映射。

如果列映射到目标数据库不允许作为主键的数据类型,则在 primary.key.fields 中需要一个显式列列表,排除此类列。请查阅您特定数据库供应商的文档,了解哪些数据类型是允许或不允许的。

删除模式

Debezium JDBC 接收器连接器可以在消费 DELETE墓碑事件时删除目标数据库中的行。默认情况下,JDBC 接收器连接器不启用删除模式。

如果您希望连接器删除行,则必须在连接器配置中显式设置 delete.enabled=true。要使用此模式,您还必须将 primary.key.fields 设置为 none 以外的值。前面的配置是必需的,因为删除是基于主键映射执行的,所以如果目标表没有主键映射,连接器就无法删除行。

幂等写入

Debezium JDBC 接收器连接器可以执行幂等写入,使其能够反复重放相同的记录而不会更改最终数据库状态。

要使连接器能够执行幂等写入,您必须在连接器配置中将 insert.mode 显式设置为 upsertupsert 操作将根据指定的主键是否已存在而执行为 updateinsert

如果主键值已存在,则操作会更新行中的值。如果指定的主键值不存在,则 insert 会添加新行。

每个数据库方言处理幂等写入的方式不同,因为没有标准的 SQL 用于upsert操作。下表显示了 Debezium 支持的数据库方言的 upsert DML 语法。

方言 Upsert 语法

Db2

MERGE …

MySQL

INSERT … ON DUPLICATE KEY UPDATE …

Oracle

MERGE …

PostgreSQL

INSERT … ON CONFLICT … DO UPDATE SET …

SQL Server

MERGE …

模式演进

您可以使用以下模式演进模式与 Debezium JDBC 接收器连接器:

模式 描述

none

该连接器不执行任何 DDL 模式演进。

basic

连接器自动检测事件有效负载中存在但目标表中不存在的字段。连接器会修改目标表以添加新字段。

schema.evolution 设置为 basic 时,连接器会根据入站事件的结构自动创建或修改目标数据库表。

当第一次从主题接收事件,并且目标表尚不存在时,Debezium JDBC 接收器连接器会使用事件的键或记录的模式结构来解析表的列结构。如果启用了模式演进,连接器会在应用 DML 事件到目标表之前准备并执行 CREATE TABLE SQL 语句。

当 Debezium JDBC 连接器从主题接收事件时,如果记录的模式结构与目标表的模式结构不同,连接器会使用事件的键或其模式结构来识别哪些列是新的,并且必须添加到数据库表中。如果启用了模式演进,连接器会在应用 DML 事件到目标表之前准备并执行 ALTER TABLE SQL 语句。由于更改列数据类型、删除列和调整主键可能被视为危险操作,因此连接器被禁止执行这些操作。

每个字段的模式决定了一个列是 NULL 还是 NOT NULL。模式还定义了每个列的默认值。如果连接器尝试创建一个具有您不想要的 nullability 设置或默认值的表,您必须提前手动创建表,或在接收器连接器处理事件之前调整相关字段的模式。要调整 nullability 设置或默认值,您可以引入自定义单消息转换,在管道中应用更改,或修改源数据库中定义的列状态。

字段的数据类型是根据预定义的映射集解析的。有关更多信息,请参阅 JDBC 字段类型

当您将新字段引入到目标数据库中已存在的表的事件结构时,必须将新字段定义为可选的,或者字段必须在数据库模式中指定默认值。如果您希望从目标表中删除某个字段,请使用以下选项之一:

  • 手动删除字段。

  • 删除列。

  • 为字段分配默认值。

  • 将字段定义为可为空。

引用和大小写敏感性

Debezium JDBC 接收器连接器通过构建将在目标数据库上执行的 DDL(模式更改)或 DML(数据更改)SQL 语句来消费 Kafka 消息。默认情况下,连接器使用源主题和事件字段的名称作为目标表中表和列名称的基础。构造的 SQL 不会自动用引号分隔标识符以保留原始字符串的大小写。因此,默认情况下,目标数据库中表或列名称的文本大小写完全取决于数据库在未指定大小时如何处理名称字符串。

例如,如果目标数据库方言是 Oracle 并且事件的主题是 orders,则目标表将创建为 ORDERS,因为 Oracle 在未引用名称时默认为大写名称。类似地,如果目标数据库方言是 PostgreSQL 并且事件的主题是 ORDERS,则目标表将创建为 orders,因为 PostgreSQL 在未引用名称时默认为小写名称。

要显式保留 Kafka 事件中存在的表名和字段名的大小写,请在连接器配置中将 quote.identifiers 属性的值设置为 true。当此选项设置为 true 时,如果入站事件的主题为 orders,并且目标数据库方言为 Oracle,则连接器会创建一个名为 orders 的表,因为构造的 SQL 将表名定义为 "orders"。启用引用后,当连接器创建列名时也会发生相同的行为。

连接空闲超时

Debezium 的 JDBC 接收器连接器利用连接池来提高性能。连接池旨在建立一组初始连接,维护指定数量的连接,并根据需要有效地将连接分配给应用程序。但是,当连接在池中长时间空闲时会出现一个挑战,如果它们保持不活动状态超过数据库配置的空闲超时阈值,则可能触发超时。

为减轻空闲连接线程可能触发超时的可能性,连接池提供了一种机制,可以定期验证每个连接的活动状态。此验证可确保连接保持活动状态,并防止数据库将其标记为空闲。如果发生网络中断,并且 Debezium 尝试使用已终止的连接,连接器将提示池生成新连接。

默认情况下,Debezium JDBC 接收器连接器不会执行空闲超时测试。但是,您可以通过设置 hibernate.c3p0.idle_test_period 属性来配置连接器,使其按指定的时间间隔请求池执行超时测试。例如:

示例超时配置
{
  "hibernate.c3p0.idle_test_period": "300"
}

Debezium JDBC 接收器连接器使用 Hibernate C3P0 连接池。您可以通过在 hibernate.c3p0.*` 配置名称空间中设置属性来定制 CP30 连接池。在前面的示例中,设置 hibernate.c3p0.idle_test_period 属性会将连接池配置为每 300 秒执行一次空闲超时测试。应用配置后,连接池将开始每五分钟评估一次未使用的连接。

数据类型映射

在 Debezium JDBC 接收器连接器将数据发送到接收数据库之前,它会将原始源记录中的数据类型转换为目标系统中相应的数据类型。适当的数据类型映射可确保原始数据在目标数据库中得到准确表示。

Debezium JDBC 接收器连接器使用逻辑或原始类型映射系统来解析列的数据类型。原始类型包括整数、浮点数、布尔值、字符串和字节等值。通常,Kafka 消息使用特定的 Kafka Connect Schema 类型代码来表示原始数据类型。

相比之下,为了表示更复杂的数据,Debezium 使用逻辑模式名称,这些名称提供命名字段的逻辑分组,可以表示一系列数据类型(字符串、数组、JSON、XML 等)。这些结构化类型考虑了数据的语义含义,并对如何序列化底层原始类型提供了更明确的解释。逻辑类型在表示具有特定编码的值时很有用,例如表示自纪元以来的时间数字。

以下示例显示了原始和逻辑数据类型的代表性结构。

示例 1. 原始字段模式
{
  "schema": {
    "type": "INT64"
  }
}
示例 2. 逻辑字段模式
{
  "schema": {
    "type": "INT64",
    "name": "org.apache.kafka.connect.data.Date"
  }
}

Kafka Connect 并不是这些复杂逻辑类型的唯一来源。事实上,当 Debezium 源连接器发出变更事件时,它可以为事件字段分配类似的逻辑类型来表示时间戳、日期甚至 JSON 数据等数据类型。

Debezium JDBC 接收器连接器使用这些原始和逻辑类型将列类型解析为 JDBC SQL 代码,该代码表示列类型。这些 JDBC SQL 代码随后被底层 Hibernate 持久化框架用于将列类型解析为所用方言的逻辑数据类型。下表说明了 Kafka Connect 和 JDBC SQL 类型之间,以及 Debezium 和 JDBC SQL 类型之间的原始和逻辑映射。实际的最终列类型因数据库类型而异。

Kafka Connect 原始数据类型映射

表 1. Kafka Connect 原始数据类型与列数据类型之间的映射
原始类型 JDBC SQL 类型

INT8

Types.TINYINT

INT16

Types.SMALLINT

INT32

Types.INTEGER

INT64

Types.BIGINT

FLOAT32

Types.FLOAT

FLOAT64

Types.DOUBLE

BOOLEAN

Types.BOOLEAN

STRING

Types.CHAR, Types.NCHAR, Types.VARCHAR, Types.NVARCHAR

BYTES

Types.VARBINARY

对于 Oracle 23,该连接器不支持 BOOLEAN 数据。如果您将 JDBC 连接器配置为使用 Oracle 23 数据库作为接收目标,则无法依赖模式演进自动创建使用 Oracle 23 BOOLEAN 数据类型的字段。连接器将 BOOLEAN 数据映射到 BIT 数据类型,该数据类型在所有 Oracle 版本中通用。作为一种变通方法,如果您需要为 Oracle 23 接收器使用 BOOLEAN 数据类型,请手动将该字段添加到目标表中。

Kafka Connect 逻辑数据类型映射

表 2. Kafka Connect 逻辑数据类型与列数据类型之间的映射
逻辑类型 JDBC SQL 类型

org.apache.kafka.connect.data.Decimal

Types.DECIMAL

org.apache.kafka.connect.data.Date

Types.DATE

org.apache.kafka.connect.data.Time

Types.TIMESTAMP

org.apache.kafka.connect.data.Timestamp

Types.TIMESTAMP

Debezium 逻辑数据类型映射

表 3. Debezium 逻辑类型与列数据类型之间的映射
逻辑类型 JDBC SQL 类型

io.debezium.time.Date

Types.DATE

io.debezium.time.Time

Types.TIMESTAMP

io.debezium.time.MicroTime

Types.TIMESTAMP

io.debezium.time.NanoTime

Types.TIMESTAMP

io.debezium.time.ZonedTime

Types.TIME_WITH_TIMEZONE

io.debezium.time.Timestamp

Types.TIMESTAMP

io.debezium.time.MicroTimestamp

Types.TIMESTAMP

io.debezium.time.NanoTimestamp

Types.TIMESTAMP

io.debezium.time.ZonedTimestamp

Types.TIMESTAMP_WITH_TIMEZONE

io.debezium.data.VariableScaleDecimal

Types.DOUBLE

如果数据库不支持带时区的日期或时间戳,则映射会解析为其不带时区的等效项。

Debezium 方言特定的数据类型映射

表 4. Debezium 方言特定的逻辑类型与列数据类型之间的映射
逻辑类型 MySQL SQL 类型 PostgreSQL SQL 类型 SQL Server SQL 类型

io.debezium.data.Bits

bit(n)

bit(n)bit varying

varbinary(n)

io.debezium.data.Enum

enum

Types.VARCHAR

n/a

io.debezium.data.Json

json

json

n/a

io.debezium.data.EnumSet

set

n/a

n/a

io.debezium.time.Year

year(n)

n/a

n/a

io.debezium.time.MicroDuration

n/a

interval

n/a

io.debezium.data.Ltree

n/a

ltree

n/a

io.debezium.data.Uuid

n/a

uuid

n/a

io.debezium.data.Xml

n/a

xml

xml

Debezium 逻辑向量类型映射

Debezium JDBC 接收器连接器支持将逻辑向量数据类型从源事件直接映射到接收目标,前提是目标数据库支持可比的表示。

如果源中的向量字段具有逻辑名称,Debezium 会使用它来确定适当的映射。对于某些数据库,连接器会在 io.debezium.data.vector.* 名称空间下识别以下逻辑名称:

  • FloatVector

  • DoubleVector

  • SparseVector

当连接器处理消息时,如果存在这些逻辑类型,连接器会检查目标数据库中是否存在相应的映射。如果存在映射,则应用它;否则,它将根据 Kafka Connect 模式类型将数据序列化为字符串。

例如,配置为将数据发送到 PostgreSQL 数据库接收器的连接器会将具有 io.debezium.data.FloatVector 逻辑名称的字段映射到 halfvector 列类型,使用特殊的覆盖映射。相比之下,如果接收数据库中没有直接映射,例如对于 Oracle 接收器,连接器将默认以其原始字符串格式序列化向量数据。

并非所有支持数据库的版本都支持特殊的向量类型。接收数据库必须满足以下要求才能支持逻辑向量类型的直接映射:

  • MariaDB 版本 11.7 或更高版本

  • MySQL 版本 9.0 或更高版本

  • PostgreSQL 需要 pgvector 扩展

早期版本的 MariaDB 或 MySQL,以及没有 pgvector 的 PostgreSQL 不支持特殊的向量类型。

表 5. Debezium 向量类型与列数据类型之间的映射
逻辑类型 Db2 MySQL PostgreSQL Oracle SQL Server

io.debezium.data.DoubleVector

不支持

'vector'

vector

不支持

不支持

io.debezium.data.FloatVector

不支持

'vector'

halfvec

不支持

不支持

io.debezium.data.SparseVector

不支持

不支持

sparsevec

不支持

不支持

如果 Debezium 向量逻辑类型与目标关系型数据库的列类型之间没有直接映射,您可以使用 VectorToJsonConverter 将向量逻辑类型转换为 JSON,以便将其写入任何目标关系型数据库。

列和数据类型传播

除了上表中所示的原始和逻辑映射外,如果变更事件的源是 Debezium 源连接器,则列类型及其长度、精度和小数位数的解析可以通过启用列或数据类型传播来进一步影响。列和数据类型传播有助于确定传入数据的结构和数据类型如何被转换并应用于接收目标。如前文 数据和列类型映射 所述,您可以通过在源连接器配置中设置以下属性之一来强制传播:

  • column.propagate.source.type

  • datatype.propagate.source.type

Debezium JDBC 接收器连接器仅应用具有更高优先级的相应值。

为了说明传播如何影响连接器映射数据类型,我们来看一个例子。以下示例显示了一个可能包含在变更事件中的字段模式:

示例 3. 启用了列或数据类型传播的 Debezium 变更事件字段模式
{
  "schema": {
    "type": "INT8",
    "parameters": {
      "__debezium.source.column.type": "TINYINT",
      "__debezium.source.column.length": "1"
    }
  }
}

由于发出事件的源连接器配置为使用列或数据类型传播,因此事件包含指定列 typelength 的参数。

如果源连接器未启用传播,则 typelength 参数将不存在,Debezium JDBC 接收器连接器将默认将 type 字段中的 INT8 值映射到 Types.SMALLINT 列类型。根据目标数据库的 SQL 方言,连接器随后会将 JDBC Types.SMALLINT 类型解析为多种逻辑类型之一。例如,对于 MySQL 接收目标,JDBC 连接器默认将 Types.SMALLINT 转换为 TINYINT 列类型,不指定长度。

然而,由于源连接器中启用了类型传播,它发出的事件包含 typelength,提供了更具体的映射说明。因此,Debezium JDBC 接收器连接器不使用默认映射,而是使用给定的参数值来精炼映射,并在接收数据库中创建一个类型为 TINYINT(1) 的列。

通常,在源数据库和接收数据库类型相同时,使用列或数据类型传播的效果最为显著。因为源数据库和接收数据库共享相同的底层模式结构,所以更容易在它们之间映射数据类型,而无需应用复杂的转换或解释。我们一直在寻找改进异构数据库之间映射的方法,当前的类型系统允许我们根据反馈继续精炼这些映射。如果您发现某个映射可以改进,请告知我们。

转换

Debezium JDBC 连接器提供了几种转换,可以添加到连接器配置中,在连接器处理之前动态修改消费的事件。

命名转换

Debezium JDBC 连接器提供两种命名转换,您可以将其应用于更改事件中的主题或字段名称的命名样式。另外,您还可以使用命名转换为转换后的名称添加前缀或后缀。

您可以配置命名转换以使用以下命名样式之一:

camel_case

删除所有句点 (.) 和下划线 (_) 字符,并将紧随其后的字符转换为大写。例如,值 inventory.customers 将更改为 inventoryCustomers

snake_case

删除所有句点 (.) 字符,并将其替换为下划线 (_) 字符。此外,所有数字序列都将加上下划线 (_) 前缀,所有大写字符都将转换为小写,然后加上下划线 (_) 前缀。例如,值 public.inventory 变为 public_inventory,而 TopicWith123Numbers 变为 topic_with_123_numbers

upper_case

转换为大写。例如,值 public.inventory 将变为 PUBLIC.INVENTORY

lower_case

转换为小写。例如,值 PUBLIC.INVENTORY 将变为 public.inventory

连接器提供以下命名转换:

CollectionNameTransformation

CollectionNameTransformation 提供了一种在事件被消费之前更改主题名称大小写的方法。此转换具有以下配置属性:

属性 Default (默认值) 描述

none

指定要应用于事件主题的命名样式。

指定转换后应用于主题名称的前缀。

指定转换后应用于主题名称的后缀。

以下配置示例说明了如何使用 CollectionNameTransformation SMT 将主题名称设置为大写,并在主题名称前加上 ADT_。例如,给定一个名为 public.inventory 的主题,主题名称将变为 ADT_PUBLIC.INVENTORY

示例 CollectionNameTransformation 配置
{
  "transforms": "topic-uppercase",
  "transforms.topic-uppercase.type": "io.debezium.connector.jdbc.transforms.CollectionNameTransformation",
  "transforms.topic-uppercase.collection.naming.style": "upper_case",
  "transforms.topic-uppercase.collection.naming.prefix": "ADT_"
}
FieldNameTransformation

FieldNameTransformation 提供了一种在事件被消费之前更改字段名称大小写的方法。如果事件是 Debezium 源连接器事件,则仅更改 beforeafter 部分中的字段。当事件不是 Debezium 源连接器事件时,将更改所有顶级字段名称。

此转换具有以下配置属性:

属性 Default (默认值) 描述

none

指定要应用于字段名称的命名样式。

指定转换后应用于字段名称的前缀。

指定转换后应用于字段名称的后缀。

以下示例展示了如何配置 FieldNameTransformation SMT 将字段名称转换为小写,并在名称前加上字符串 adt_。将 SMT 应用于包含名称为 ID 的字段的事件后,该字段将重命名为 adt_id

示例 FieldNameTransformation 配置
{
  "transforms": "topic-lowercase",
  "transforms.topic-lowercase.type": "io.debezium.connector.jdbc.transforms.FieldNameTransformation",
  "transforms.topic-lowercase.collection.naming.style": "lower_case",
  "transforms.topic-lowercase.collection.naming.prefix": "adt_"
}

部署

要部署 Debezium JDBC 连接器,您需要安装 Debezium JDBC 连接器归档文件,配置连接器,并通过将其配置添加到 Kafka Connect 来启动连接器。

先决条件
  • 已安装 Apache KafkaKafka Connect

  • 您有一个 Kafka 主题,连接器可以从中读取变更事件记录。

  • 已安装并配置为接受 JDBC 连接的目标数据库。

过程
  1. 下载 Debezium JDBC 连接器插件归档文件

  2. 将文件解压到您的 Kafka Connect 环境。

  3. 可选地从 Maven Central 下载 JDBC 驱动程序,并将下载的驱动程序文件提取到包含 JDBC 接收器连接器 JAR 文件的目录中。

    Oracle 和 Db2 的驱动程序不包含在 JDBC 接收器连接器中。您必须手动下载并安装这些驱动程序。

  4. 将包含 JAR 文件的目录添加到 Kafka Connect 的 plugin.path。请确保您安装 JDBC 接收器连接器的路径是 Kafka Connect plugin.path 的一部分。

  5. 重新启动 Kafka Connect 进程以加载新的 JAR 文件。

Debezium JDBC 连接器配置

通常,您可以通过提交一个 JSON 请求来注册 Debezium JDBC 连接器,该请求指定连接器的配置属性。以下示例显示了注册 Debezium JDBC 接收器连接器实例的 JSON 请求,该连接器从名为 orders 的主题消费事件,并使用最常见的配置设置。

示例:Debezium JDBC 连接器配置
{
    "name": "jdbc-connector",  (1)
    "config": {
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",  (2)
        "tasks.max": "1",  (3)
        "connection.url": "jdbc:postgresql:///db",  (4)
        "connection.username": "pguser",  (5)
        "connection.password": "pgpassword",  (6)
        "insert.mode": "upsert",  (7)
        "delete.enabled": "true",  (8)
        "primary.key.mode": "record_key",  (9)
        "schema.evolution": "basic",  (10)
        "use.time.zone": "UTC",  (11)
        "topics": "orders" (12)
    }
}

JDBC 连接器配置设置的说明

Item 描述

1

在您向 Kafka Connect 服务注册连接器时为其分配的名称。如果尝试在注册连接器时重复使用此名称,则会失败。此属性是所有 Kafka Connect 连接器必需的。

2

连接器类的名称。

3

为此连接器创建的最大任务数。

4

连接器用于连接到其写入数据的接收数据库的 JDBC URL。

5

用于身份验证的数据库用户名。

6

用于身份验证的数据库用户的密码。

7

连接器使用的 insert.mode

8

启用在数据库中删除记录。有关更多信息,请参阅 delete.enabled 配置属性。

9

指定用于解析主键列的方法。有关更多信息,请参阅 primary.key.mode 配置属性。

10

启用连接器演进目标数据库的模式。有关更多信息,请参阅 schema.evolution 配置属性。

11

写入时间字段类型时使用的时区。

12

要消费的主题列表,以逗号分隔。

有关您可以为 Debezium JDBC 连接器设置的完整配置属性列表,请参阅 JDBC 连接器属性

您可以将此配置与 POST 命令一起发送到正在运行的 Kafka Connect 服务。该服务会记录配置并启动一个接收器连接器任务,该任务执行以下操作:

  • 连接到数据库。

  • 消费来自订阅的 Kafka 主题的事件。

  • 将事件写入配置的数据库。

连接器属性

Debezium JDBC 接收器连接器具有多个您可以使用的配置属性,以实现满足您需求的连接器行为。许多属性都有默认值。有关属性的信息组织如下:

JDBC 连接器 Kafka 消费者属性

属性 Default (默认值) 描述

无默认值

在 Kafka Connect 服务中注册连接器时为其分配的唯一名称。如果尝试在注册连接器时重复使用此名称,则会失败。此属性是所有 Kafka Connect 连接器必需的。

无默认值

连接器的 Java 类名称。对于 Debezium JDBC 连接器,请指定值 io.debezium.connector.jdbc.JdbcSinkConnector

1

此连接器的最大任务数。

无默认值

要消费的主题列表,以逗号分隔。请勿将此属性与 topics.regex 属性结合使用。

无默认值

一个指定要消费主题的正则表达式。在内部,正则表达式会被编译为 java.util.regex.Pattern。请勿将此属性与 topics 属性结合使用。

JDBC 连接器连接属性

属性 Default (默认值) 描述

org.hibernate.c3p0.internal.C3P0ConnectionProvider

要使用的连接提供程序实现。

无默认值

用于连接数据库的 JDBC 连接 URL。

无默认值

连接器用于连接数据库的数据库用户帐户名称。

无默认值

连接器用于连接数据库的密码。

5

指定池中的最小连接数。

32

指定池维护的最大并发连接数。

32

指定当连接池超出其最大大小时,连接器尝试获取的连接数。

1800

指定在丢弃之前保留空闲连接的时间(以秒为单位)。

false

指定连接器在瞬态 JDBC 连接错误后是否重试。

启用时(true),连接器会将连接问题(如套接字关闭或超时)视为可重试的,允许它重试处理而不是使任务失败。这减少了停机时间并提高了对临时中断的弹性。

将此选项设置为 true 可以减少停机时间。但是,在具有异步复制的主备环境中,如果重试发生在所有更改完全复制之前,可能会导致数据丢失。

对于需要强数据一致性的情况,请谨慎使用。

JDBC 连接器运行时属性

属性 Default (默认值) 描述

UTC

指定插入 JDBC 时间值时使用的时区。

false

指定连接器是否处理 DELETE墓碑事件,并从数据库中删除相应的行。此选项的使用要求您将 primary.key.mode 设置为 record.key

false

指定连接器是否处理 TRUNCATE 事件,并从数据库中截断相应的表。

尽管自 9.7 版本以来,Db2 就支持 TRUNCATE 语句,但目前 JDBC 连接器无法处理 Db2 连接器发出的标准 TRUNCATE 事件。

为确保 JDBC 连接器能够处理从 Db2 接收到的 TRUNCATE 事件,请使用 TRUNCATE TABLE 语句以外的其他方法执行截断。例如:

ALTER TABLE <table_name> ACTIVATE NOT LOGGED INITIALLY WITH EMPTY TABLE

提交上述查询的用户帐户需要对要截断的表拥有 ALTER 权限。

insert

指定用于将事件插入数据库的策略。可用的选项如下:

insert

指定所有事件都应构造基于 INSERT 的 SQL 语句。仅当不使用主键,或者您可以确定具有现有主键值的行不会发生更新时,才使用此选项。

update

指定所有事件都应构造基于 UPDATE 的 SQL 语句。仅当您可以确定连接器仅接收适用于现有行的事件时,才使用此选项。

upsert

指定连接器使用 upsert 语义将事件添加到表中。也就是说,如果主键不存在,连接器将执行 INSERT 操作;如果键已存在,连接器将执行 UPDATE 操作。当需要幂等写入时,应将连接器配置为此选项。

none

指定连接器如何从事件解析主键列。

none

指定不创建主键列。

kafka

指定连接器使用 Kafka 坐标作为主键列。键坐标从事件的主题名称、分区和偏移量定义,并映射到以下名称的列:

  • __connect_topic

  • __connect_partition

  • __connect_offset

record_key

指定主键列源自事件的记录键。如果记录键是原始类型,则需要 primary.key.fields 属性来指定主键列的名称。如果记录键是结构类型,则 primary.key.fields 属性是可选的,可用于将事件键中的一部分列指定为主键。

record_value

指定主键列源自事件的值。您可以设置 primary.key.fields 属性将主键定义为事件值中的字段子集;否则,默认情况下使用所有字段。

无默认值

主键列的名称或用于从其派生主键的字段列表(逗号分隔)。

primary.key.mode 设置为 record_key 且事件的键是原始类型时,此属性应指定要用于该键的列名。

primary.key.mode 设置为 record_key(非原始键)或 record_value 时,此属性应指定源自键或值中字段的逗号分隔列表。如果 primary.key.mode 设置为 record_key(非原始键)或 record_value,并且未指定此属性,则连接器将从记录键或记录值的所有字段派生主键,具体取决于指定的模式。

false

指定生成的 SQL 语句是否使用引号分隔表名和列名。有关更多详细信息,请参阅 JDBC 引用大小写敏感性 部分。

none

指定连接器如何演进目标表模式。有关更多信息,请参阅 模式演进。以下选项可用:

none

指定连接器不演进目标模式。

basic

指定发生基本演进。连接器通过比较入站事件的记录模式与数据库表结构来添加表中缺失的列。

${topic}

指定连接器用于构建目标表名称的字符串模式。
当属性设置为默认值 ${topic} 时,在连接器从 Kafka 读取事件后,它会将事件记录写入名称与源主题名称匹配的目标表中。

您还可以配置此属性以从入站事件记录中的特定字段提取值,然后使用这些值动态生成目标表名称。这种从消息源中的值生成表名称的能力,否则需要使用自定义 Kafka Connect 单消息转换 (SMT)。

要将此属性配置为动态生成目标表名称,请将其值设置为类似 ${source._field_} 的模式。当您指定此类型的模式时,连接器会从 Debezium 变更事件的 source 块中提取值,然后使用这些值构建表名。例如,您可以将属性值设置为模式 ${source.schema}_${source.table}。根据此模式,如果连接器读取的事件中源块的 schema 字段包含值 usertable 字段包含值 tab,则连接器会将事件记录写入名为 user_tab 的表中。

public

指定安装了 PostgreSQL PostGIS 扩展的模式名称。默认值为 public;但是,如果 PostGIS 扩展安装在其他模式中,则应使用此属性指定替代模式名称。

false

指定连接器是否在向 SQL Server 表的标识列执行 INSERTUPSERT 操作之前自动设置 IDENTITY_INSERT,并在操作之后立即取消设置。当默认设置(false)生效时,向表的 IDENTITY 列执行 INSERTUPSERT 操作将导致 SQL 异常。

500

指定尝试将多少条记录批量写入目标表。

请注意,如果您将 Connect 工作进程属性中的 consumer.max.poll.records 设置为低于 batch.size 的值,批处理将由 consumer.max.poll.records 限制,并且无法达到所需的 batch.size。您也可以通过连接器配置中的 consumer.override.max.poll.records 来配置连接器底层消费者的 max.poll.records

false

指定是否启用 Debezium JDBC 连接器的缩减缓冲区。

选择以下设置之一:

false

(default) 连接器将从 Kafka 消费的每个变更事件作为单独的逻辑 SQL 更改写入。

true

连接器使用缩减缓冲区在将变更事件写入接收数据库之前对其进行缩减。也就是说,如果多个事件引用了相同的主键,连接器会合并 SQL 查询,并且仅根据最新偏移量记录中报告的行状态写入单个逻辑 SQL 更改。
选择此选项可减少目标数据库上的 SQL 负载。

为了在启用缩减缓冲区的情况下优化 PostgreSQL 接收数据库中的查询处理,您还必须通过将 reWriteBatchedInserts 参数添加到 JDBC 连接 URL 来启用数据库执行批量查询。

空字符串

一个可选的、逗号分隔的字段名称列表,这些字段名称与要从变更事件值中包含的字段的完全限定名称匹配。字段的完全限定名称形式为 fieldNametopicName:_fieldName_

如果在此配置中包含此属性,请不要设置 field.exclude.list 属性。

空字符串

一个可选的、逗号分隔的字段名称列表,这些字段名称与要从变更事件值中排除的字段的完全限定名称匹配。字段的完全限定名称形式为 fieldNametopicName:_fieldName_

如果在此配置中包含此属性,请不要设置 field.include.list 属性。

5

指定在尝试将更改刷新到目标数据库导致某些数据库错误后,连接器执行的最大重试次数。如果重试次数超过重试值,则接收器连接器将进入 FAILED 状态。

1000

指定连接器在重试失败的刷新操作之前等待的毫秒数。

当您同时设置 flush.retry.delay.msflush.max.retries 属性时,这会影响 Kafka max.poll.interval.ms 属性的行为。为防止连接器重新平衡,请将总重试时间(flush.retry.delay.ms * flush.max.retries)设置为小于 max.poll.interval.ms(默认为 5 分钟)的值。

JDBC 连接器可扩展属性

属性 Default (默认值) 描述

io.debezium.connector.jdbc.naming.DefaultColumnNamingStrategy

指定 ColumnNamingStrategy 实现的完全限定类名,连接器使用该实现从入站事件字段名解析列名。

默认行为是将字段名用作列名,不进行任何转换。

io.debezium.connector.jdbc.nnaming.DefaultCollectionNamingStrategy

指定 CollectionNamingStrategy 实现的完全限定类名,连接器使用该实现从入站事件主题名解析表名。

默认行为是:

  • 清理主题名称,用下划线 (_) 替换点 (.)。

  • 用事件的主题替换 collection.name.format 配置属性中的 ${topic} 占位符。

JDBC 连接器 hibernate.* 直通属性

Kafka Connect 支持直通配置,允许您通过将某些属性直接从连接器配置传递来修改底层系统的行为。默认情况下,一些 Hibernate 属性通过 JDBC 连接器 连接属性(例如,connection.urlconnection.usernameconnection.pool.*_size)以及通过连接器的 运行时属性(例如,use.time.zonequote.identifiers)暴露。

如果您想自定义其他 Hibernate 行为,可以通过将使用 hibernate.* 名称空间的属性添加到连接器配置中来利用直通机制。例如,为了帮助 Hibernate 解析目标数据库的类型和版本,您可以添加 hibernate.dialect 属性并将其设置为数据库的完全限定类名,例如 org.hibernate.dialect.MariaDBDialect

常见问题解答

是否需要 ExtractNewRecordState 单消息转换?

否,这实际上是 Debezium JDBC 连接器与其他实现的一个区别点。虽然该连接器能够像竞争对手一样摄取展平的事件,但它也可以原生摄取 Debezium 的复杂变更事件结构,而无需任何特定类型的转换。

如果列的类型发生更改,或者列被重命名或删除,是否由模式演进处理?

否,Debezium JDBC 连接器不对现有列进行任何更改。连接器支持的模式演进非常基本。它只是比较事件结构中的字段与表的列列表,然后添加表中尚未定义为列的任何字段。如果列的类型或默认值发生更改,连接器不会在目标数据库中进行调整。如果列被重命名,旧列将保持原样,连接器会将一个新名称的列添加到表中;但是,具有旧列数据的现有行将保持不变。这些类型的模式更改应手动处理。

如果列的类型未解析为我想要的类型,我该如何强制映射到不同的数据类型?

Debezium JDBC 连接器使用复杂的类型系统来解析列的数据类型。有关此类型系统如何解析特定字段的模式定义到 JDBC 类型的详细信息,请参阅 数据和列类型映射 部分。如果您想应用不同的数据类型映射,请手动定义表以显式获取首选的列类型。

如何在不更改 Kafka 主题名称的情况下为表名称指定前缀或后缀?

为了在目标表名称中添加前缀或后缀,请调整 collection.name.format 连接器配置属性以应用所需的前缀或后缀。例如,要在所有表名前添加 jdbc_ 前缀,请将 collection.name.format 配置属性的值指定为 jdbc_${topic}。如果连接器订阅了一个名为 orders 的主题,则生成的表将创建为 jdbc_orders

为什么有些列会自动被引用,即使未启用标识符引用?

在某些情况下,特定的列名或表名可能会被显式引用,即使未启用 quote.identifiers。当列名或表名以特定约定开头或使用特定约定时,这通常是必需的,因为否则会被视为非法语法。例如,当 primary.key.mode 设置为 kafka 时,一些数据库只允许列名以下划线开头,如果该列名被引用。引用行为是特定于方言的,并且因不同类型的数据库而异。