您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

Debezium JDBC 连接器

概述

Debezium JDBC 连接器是一个 Kafka Connect 接收器连接器实现,它可以从多个源主题消费事件,然后使用 JDBC 驱动程序将这些事件写入关系数据库。该连接器支持多种数据库方言,包括 Db2、MySQL、Oracle、PostgreSQL 和 SQL Server。

JDBC 连接器的工作原理

Debezium JDBC 连接器是一个 Kafka Connect 接收器连接器,因此需要 Kafka Connect 运行时。该连接器会定期轮询它订阅的 Kafka 主题,从中消费事件,然后将这些事件写入配置的关系数据库。该连接器通过使用 upsert 语义和基本的模式演进支持幂等写入操作。

Debezium JDBC 连接器提供以下功能:

处理复杂的 Debezium 更改事件

默认情况下,Debezium 源连接器会生成复杂、分层的更改事件。当 Debezium 连接器与其他 JDBC 接收器连接器实现一起使用时,您可能需要应用 ExtractNewRecordState 单个消息转换 (SMT) 来展平事件的有效负载,以便接收器实现能够消费它们。如果您运行 Debezium JDBC 接收器连接器,则无需部署 SMT,因为 Debezium 接收器连接器可以直接消费原生的 Debezium 更改事件,而无需进行转换。

当 JDBC 接收器连接器从 Debezium 源连接器消费复杂更改事件时,它会提取原始 insertupdate 事件的 after 部分的值。当接收器连接器消费删除事件时,不会考虑事件的有效负载的任何部分。

Debezium JDBC 接收器连接器不设计用于读取模式更改主题。如果您的源连接器配置为捕获模式更改,请在 JDBC 连接器配置中设置 topicstopics.regex 属性,以便连接器不从模式更改主题中进行消费。

至少一次传递

Debezium JDBC 接收器连接器保证它从 Kafka 主题消费的事件至少被处理一次。

多个任务

您可以跨多个 Kafka Connect 任务运行 Debezium JDBC 接收器连接器。要跨多个任务运行连接器,请将 tasks.max 配置属性设置为您希望连接器使用的任务数量。Kafka Connect 运行时会启动指定数量的任务,并为每个任务运行一个连接器实例。多个任务可以通过并行读取和处理多个源主题的更改来提高性能。

数据和列类型映射

为了使 Debezium JDBC 接收器连接器能够正确地将入站消息字段中的数据类型映射到出站消息字段中的数据类型,连接器需要有关源事件中存在的每个字段的数据类型的信息。该连接器支持跨不同数据库方言的广泛的列类型映射。为了正确地从事件字段中的 type 元数据转换目标列类型,连接器应用为源数据库定义的数据类型映射。您可以通过在源连接器配置中设置 column.propagate.source.typedatatype.propagate.source.type 选项来增强连接器解析列数据类型的方式。启用这些选项后,Debezium 会包含额外的参数元数据,这有助于 JDBC 接收器连接器更准确地解析目标列的数据类型。

要使 Debezium JDBC 接收器连接器能够处理 Kafka 主题中的事件,Kafka 主题消息键(如果存在)必须是原始数据类型或 Struct。此外,源消息的有效负载必须是 Struct,它具有一个展平的结构,没有嵌套的 struct 类型,或者具有符合 Debezium 的复杂、分层结构的嵌套 struct 布局。

如果 Kafka 主题中事件的结构不符合这些规则,您必须实现自定义的单个消息转换,将源事件的结构转换为可用的格式。

主键处理

默认情况下,Debezium JDBC 接收器连接器不会将源事件中的任何字段转换为事件的主键。不幸的是,稳定的主键缺失可能会使事件处理复杂化,具体取决于您的业务需求,或者当接收器连接器使用 upsert 语义时。要定义一致的主键,您可以将连接器配置为使用下表中描述的主键模式之一。

模式 描述

none

创建表时未指定任何主键字段。

kafka

主键由以下三个列组成:

  • __connect_topic

  • __connect_partition

  • __connect_offset

这些列的值来自 Kafka 事件的坐标。

record_key

主键由 Kafka 事件的键组成。

如果主键是原始类型,请通过设置 primary.key.fields 属性来指定要使用的列名。如果主键是 struct 类型,则 struct 中的字段将被映射为主键的列。您可以使用 primary.key.fields 属性将主键限制为部分列。

record_value

主键由 Kafka 事件的值组成。

由于 Kafka 事件的值始终是 Struct,因此默认情况下,值中的所有字段都成为主键的列。要使用主键的一部分字段,请设置 primary.key.fields 属性,以指定要从中派生主键列的值中的字段的逗号分隔列表。

record_header

主键由 Kafka 事件的头部组成。

Kafka 事件的头部可能包含多个头部,每个头部可能是 Struct 或原始数据类型,连接器会将其制作成一个 Struct。因此,此 Struct 中的所有字段都成为主键的列。要使用主键的一部分字段,请设置 primary.key.fields 属性,以指定要从中派生主键列的值中的字段的逗号分隔列表。

如果将 primary.key.mode 设置为 kafka 并将 schema.evolution 设置为 basic,某些数据库方言可能会抛出异常。当某个方言将 STRING 数据类型映射到可变长度字符串数据类型(如 TEXTCLOB)时,并且该方言不允许主键列具有无界长度时,就会发生此异常。为避免此问题,请在环境中应用以下设置:

  • 不要将 schema.evolution 设置为 basic

  • 提前创建数据库表和主键映射。

如果某个列映射到目标数据库不允许作为主键的数据类型,则需要在 primary.key.fields 中明确列出这些列,排除不允许的列。有关允许和不允许的数据类型,请参阅您特定的数据库供应商文档。

删除模式

当消费到 DELETEtombstone 事件时,Debezium JDBC 接收器连接器可以删除目标数据库中的行。默认情况下,JDBC 接收器连接器不启用删除模式。

如果您希望连接器删除行,则必须在连接器配置中显式设置 delete.enabled=true。要使用此模式,您还必须将 primary.key.fields 设置为 none 以外的值。上述配置是必需的,因为删除是基于主键映射执行的,所以如果目标表没有主键映射,连接器就无法删除行。

幂等写入

Debezium JDBC 接收器连接器可以执行幂等写入,从而使其能够反复重放相同的记录,而不会改变最终的数据库状态。

要使连接器能够执行幂等写入,您必须在连接器中显式将 insert.mode 设置为 upsertupsert 操作根据指定的主键是否已存在,执行为 updateinsert

如果主键值已存在,操作将更新行中的值。如果指定的主键值不存在,insert 将添加新行。

每个数据库方言处理幂等写入的方式不同,因为没有 SQL 标准来处理 upsert 操作。下表显示了 Debezium 支持的数据库方言的 upsert DML 语法。

方言 Upsert 语法

Db2

MERGE …​

MySQL

INSERT …​ ON DUPLICATE KEY UPDATE …​

Oracle

MERGE …​

PostgreSQL

INSERT …​ ON CONFLICT …​ DO UPDATE SET …​

SQL Server

MERGE …​

模式演进

您可以使用以下模式演进与 Debezium JDBC 接收器连接器:

模式 描述

none

连接器不执行任何 DDL 模式演进。

basic

连接器会自动检测事件有效负载中存在但目标表中不存在的字段。连接器会更改目标表以添加新字段。

schema.evolution 设置为 basic 时,连接器会根据传入事件的结构自动创建或修改目标数据库表。

当第一次从主题接收事件,并且目标表尚不存在时,Debezium JDBC 接收器连接器会使用事件的键或记录的结构来解析表的列结构。如果启用了模式演进,连接器会在应用 DML 事件到目标表之前,准备并执行 CREATE TABLE SQL 语句。

当 Debezium JDBC 连接器从主题接收事件时,如果记录的模式结构与目标表的模式结构不同,连接器会使用事件的键或其模式结构来识别哪些列是新的,需要添加到数据库表中。如果启用了模式演进,连接器会在应用 DML 事件到目标表之前,准备并执行 ALTER TABLE SQL 语句。由于更改列数据类型、删除列和调整主键可能被视为危险操作,因此连接器不允许执行这些操作。

每个字段的模式决定了列是 NULL 还是 NOT NULL。模式还定义了每列的默认值。如果连接器尝试创建一个具有您不想要的 nullability 设置或默认值的表,您必须手动提前创建表,或者在接收器连接器处理事件之前调整相关字段的模式。要调整 nullability 设置或默认值,您可以引入自定义的单个消息转换,在管道中应用更改,或修改源数据库中定义的列状态。

字段的数据类型是根据预定义的映射集解析的。有关更多信息,请参阅 JDBC 字段类型

当您向目标数据库中已存在的表的事件结构添加新字段时,必须将新字段定义为可选字段,或者数据库模式中必须指定一个默认值。如果您希望从目标表中删除某个字段,请使用以下选项之一:

  • 手动删除该字段。

  • 删除该列。

  • 为该字段分配默认值。

  • 将该字段定义为可空。

引号和大小写敏感性

Debezium JDBC 接收器连接器通过构建 DDL(模式更改)或 DML(数据更改)SQL 语句来消费 Kafka 消息,这些语句会在目标数据库上执行。默认情况下,连接器使用源主题和事件字段的名称作为目标表中表和列名称的基础。构造的 SQL 不会自动用引号分隔标识符以保留原始字符串的大小写。因此,默认情况下,目标数据库中表或列名称的文本大小写完全取决于数据库在未指定大小写时如何处理名称字符串。

例如,如果目标数据库方言是 Oracle,并且事件主题是 orders,则目标表将创建为 ORDERS,因为 Oracle 在未引用名称时默认使用大写名称。同样,如果目标数据库方言是 PostgreSQL,并且事件主题是 ORDERS,则目标表将创建为 orders,因为 PostgreSQL 在未引用名称时默认使用小写名称。

要显式保留 Kafka 事件中的表和字段名称的大小写,请在连接器配置中将 quote.identifiers 属性的值设置为 true。当此选项设置为 true 时,如果传入事件的主题名为 orders,并且目标数据库方言是 Oracle,则连接器将创建一个名为 orders 的表,因为构造的 SQL 将表名定义为 "orders"。启用引号会导致连接器创建列名时产生相同的行为。

连接空闲超时

Debezium 的 JDBC 接收器连接器利用连接池来提高性能。连接池旨在建立一组初始连接,维护指定数量的连接,并根据需要有效地将连接分配给应用程序。但是,当连接在池中长时间空闲时会出现问题,如果它们保持不活动状态超过数据库配置的空闲超时阈值,可能会导致超时。

为了减轻空闲连接线程触发超时的可能性,连接池提供了一种机制,该机制会定期验证每个连接的活动状态。此验证可确保连接保持活动状态,并防止数据库将其标记为空闲。在发生网络中断的情况下,如果 Debezium 尝试使用已终止的连接,连接器会提示池生成新连接。

默认情况下,Debezium JDBC 接收器连接器不执行空闲超时测试。但是,您可以通过设置 hibernate.c3p0.idle_test_period 属性来配置连接器,使其以指定的时间间隔执行超时测试。例如:

超时配置示例
{
  "hibernate.c3p0.idle_test_period": "300"
}

Debezium JDBC 接收器连接器使用 Hibernate C3P0 连接池。您可以通过在 hibernate.c3p0.* 配置命名空间中设置属性来定制 CP30 连接池。在前面的示例中,设置 hibernate.c3p0.idle_test_period 属性会将连接池配置为每 300 秒执行一次空闲超时测试。应用配置后,连接池将开始每五分钟评估一次未使用的连接。

数据类型映射

在 Debezium JDBC 接收器连接器将数据发送到接收器数据库之前,它会将原始源记录中的数据类型转换为目标系统中的相应类型。适当的数据类型映射可确保原始数据在目标数据库中得到准确表示。

Debezium JDBC 接收器连接器通过使用逻辑或原始类型映射系统来解析列的数据类型。原始类型包括整数、浮点数、布尔值、字符串和字节等值。通常,Kafka 消息使用特定的 Kafka Connect Schema 类型代码来表示原始数据类型。

相比之下,为了表示更复杂的数据,Debezium 使用逻辑模式名称,它们提供命名字段的逻辑分组,可以表示一系列数据类型(字符串、数组、JSON、XML 等)。这些结构化类型会考虑数据的语义含义,并对如何序列化底层原始类型提供更明确的解释。逻辑类型在表示具有特定编码的值时很有用,例如表示自纪元以来的时间的数字。

以下示例显示了原始数据类型和逻辑数据类型的代表性结构。

示例 1. 原始字段模式
{
  "schema": {
    "type": "INT64"
  }
}
示例 2. 逻辑字段模式
{
  "schema": {
    "type": "INT64",
    "name": "org.apache.kafka.connect.data.Date"
  }
}

Kafka Connect 并非这些复杂逻辑类型的唯一来源。实际上,当 Debezium 源连接器发出更改事件时,它可以为事件字段分配类似的逻辑类型来表示时间戳、日期甚至 JSON 数据等数据类型。

Debezium JDBC 接收器连接器使用这些原始类型和逻辑类型来将列的类型解析为 JDBC SQL 代码,后者表示列的类型。这些 JDBC SQL 代码随后被底层 Hibernate 持久化框架用于将列的类型解析为所用方言的逻辑数据类型。下表说明了 Kafka Connect 和 JDBC SQL 类型之间,以及 Debezium 和 JDBC SQL 类型之间的原始和逻辑映射。实际的最终列类型因数据库类型而异。

Kafka Connect 原始数据类型映射

表 1. Kafka Connect 原始数据类型与列数据类型之间的映射
原始类型 JDBC SQL 类型

INT8

Types.TINYINT

INT16

Types.SMALLINT

INT32

Types.INTEGER

INT64

Types.BIGINT

FLOAT32

Types.FLOAT

FLOAT64

Types.DOUBLE

BOOLEAN

Types.BOOLEAN

STRING

Types.CHAR, Types.NCHAR, Types.VARCHAR, Types.NVARCHAR

BYTES

Types.VARBINARY

该连接器不支持 Oracle 23 中的 BOOLEAN 数据。如果您将 JDBC 连接器配置为使用 Oracle 23 数据库作为接收器目标,则不能依赖模式演进自动创建使用 Oracle 23 BOOLEAN 数据类型的字段。连接器会将 BOOLEAN 数据映射到 BIT 数据类型,后者在所有 Oracle 版本中都是通用的。作为一种变通方法,如果您需要为 Oracle 23 接收器使用 BOOLEAN 数据类型,请手动将该字段添加到目标表中。

Kafka Connect 逻辑数据类型映射

表 2. Kafka Connect 逻辑数据类型与列数据类型之间的映射
逻辑类型 JDBC SQL 类型

org.apache.kafka.connect.data.Decimal

Types.DECIMAL

org.apache.kafka.connect.data.Date

Types.DATE

org.apache.kafka.connect.data.Time

Types.TIMESTAMP

org.apache.kafka.connect.data.Timestamp

Types.TIMESTAMP

Debezium 逻辑数据类型映射

表 3. Debezium 逻辑类型与列数据类型之间的映射
逻辑类型 JDBC SQL 类型

io.debezium.time.Date

Types.DATE

io.debezium.time.Time

Types.TIMESTAMP

io.debezium.time.MicroTime

Types.TIMESTAMP

io.debezium.time.NanoTime

Types.TIMESTAMP

io.debezium.time.ZonedTime

Types.TIME_WITH_TIMEZONE

io.debezium.time.Timestamp

Types.TIMESTAMP

io.debezium.time.MicroTimestamp

Types.TIMESTAMP

io.debezium.time.NanoTimestamp

Types.TIMESTAMP

io.debezium.time.ZonedTimestamp

Types.TIMESTAMP_WITH_TIMEZONE

io.debezium.data.VariableScaleDecimal

Types.DOUBLE

如果数据库不支持带时区的日期或时间戳,则映射将解析为其不带时区的等效项。

Debezium 特定于方言的数据类型映射

表 4. Debezium 特定于方言的逻辑类型与列数据类型之间的映射
逻辑类型 MySQL SQL 类型 PostgreSQL SQL 类型 SQL Server SQL 类型

io.debezium.data.Bits

bit(n)

bit(n)bit varying

varbinary(n)

io.debezium.data.Enum

enum

Types.VARCHAR

n/a

io.debezium.data.Json

json

json

n/a

io.debezium.data.EnumSet

set

n/a

n/a

io.debezium.time.Year

year(n)

n/a

n/a

io.debezium.time.MicroDuration

n/a

interval

n/a

io.debezium.data.Ltree

n/a

ltree

n/a

io.debezium.data.Uuid

n/a

uuid

n/a

io.debezium.data.Xml

n/a

xml

xml

Debezium 逻辑向量类型映射

Debezium JDBC 接收器连接器支持将逻辑向量数据类型从源事件直接映射到接收器目标,前提是目标数据库支持可比较的表示。

如果源中的向量字段具有逻辑名称,Debezium 会使用它来确定适当的映射。对于某些数据库,连接器会在 io.debezium.data.vector.* 命名空间下识别以下逻辑名称:

  • FloatVector

  • DoubleVector

  • SparseVector

当连接器处理消息时,如果存在这些逻辑类型,连接器会检查目标数据库中是否存在相应的映射。如果存在映射,则应用它;否则,它将根据 Kafka Connect 模式类型将数据序列化为字符串。

例如,配置为将数据发送到 PostgreSQL 数据库接收器的连接器会将具有 io.debezium.data.FloatVector 逻辑名称的字段映射到 halfvector 列类型,使用特殊的覆盖映射。相比之下,如果接收器数据库中不存在直接映射,例如 Oracle 接收器,则连接器将默认将向量数据序列化为其原始字符串格式。

并非所有支持的数据库版本都提供对特殊向量类型的支持。接收器数据库必须满足以下要求才能支持逻辑向量类型的直接映射:

  • MariaDB 版本 11.7 或更高版本

  • MySQL 版本 9.0 或更高版本

  • PostgreSQL 需要 pgvector 扩展

早期版本的 MariaDB 或 MySQL,以及没有 pgvector 的 PostgreSQL 不支持特殊向量类型。

表 5. Debezium 向量类型与列数据类型之间的映射
逻辑类型 Db2 MySQL PostgreSQL Oracle SQL Server

io.debezium.data.DoubleVector

不支持

'vector'

vector

不支持

不支持

io.debezium.data.FloatVector

不支持

'vector'

halfvec

不支持

不支持

io.debezium.data.SparseVector

不支持

不支持

sparsevec

不支持

不支持

如果向量逻辑类型与您的目标关系数据库的列类型之间没有直接映射,您可以使用 VectorToJsonConverter 将向量逻辑类型转换为 JSON,以便将其写入任何目标关系数据库。

列和数据类型传播

除了前面表格中显示的原始和逻辑映射之外,如果更改事件的源是 Debezium 源连接器,那么列类型以及其长度、精度和小数位的解析还可以通过启用列或数据类型传播来进一步影响。列和数据类型传播有助于确定传入数据的结构和数据类型如何被翻译并应用于接收器目标。如前文 数据和列类型映射 中所述,您可以通过在源连接器配置中设置以下属性之一来强制传播:

  • column.propagate.source.type

  • datatype.propagate.source.type

Debezium JDBC 接收器连接器仅应用具有更高优先级的那些值。

为了说明传播如何影响连接器映射数据类型,让我们看一个例子。以下示例显示了可能包含在更改事件中的字段模式。

示例 3. 启用列或数据类型传播的 Debezium 更改事件字段模式
{
  "schema": {
    "type": "INT8",
    "parameters": {
      "__debezium.source.column.type": "TINYINT",
      "__debezium.source.column.length": "1"
    }
  }
}

由于发出事件的源连接器配置为使用列或数据类型传播,因此事件包含了指定列 typelength 的参数。

如果源连接器未启用传播,则 typelength 参数将缺失,Debezium JDBC 接收器连接器将默认将 type 字段中的 INT8 值映射到 Types.SMALLINT 列类型。根据目标数据库的 SQL 方言,连接器随后会将 JDBC Types.SMALLINT 类型解析为多个逻辑类型之一。例如,对于 MySQL 接收器目标,JDBC 连接器默认将 Types.SMALLINT 转换为 TINYINT 列类型,不指定长度。

但是,由于源连接器中启用了类型传播,它发出的事件包含了 typelength,提供了更具体的映射说明。因此,Debezium JDBC 接收器连接器不使用默认映射,而是使用给定的参数值来优化映射,并在接收器数据库中创建一个类型为 TINYINT(1) 的列。

通常,当源数据库和接收器数据库类型相同时,使用列或数据类型传播的效果最为显著。因为源数据库和接收器数据库具有相同的底层模式结构,所以在它们之间映射数据类型而无需应用复杂转换或解释会更容易。我们一直在寻找改善异构数据库之间映射的方法,并且当前的类型系统允许我们根据反馈不断优化这些映射。如果您发现某个映射可以改进,请告诉我们。

转换

Debezium JDBC 连接器提供几种转换,可以添加到连接器配置中,在连接器处理消耗的事件之前对其进行修改。

命名转换

Debezium JDBC 连接器提供了两种命名转换,您可以将其应用于更改事件中的主题或字段名称的命名样式。可选地,您还可以使用命名转换为转换后的名称添加前缀或后缀。

您可以将命名转换配置为使用以下命名样式之一:

camel_case

删除所有句点 (.) 和下划线 (_) 字符,并将紧跟的下一个字符转换为大写。例如,值 inventory.customers 将更改为 inventoryCustomers

snake_case

删除所有句点 (.) 字符并用下划线 (_) 替换。此外,所有数字序列都以一个下划线 (_) 作为前缀,所有大写字符都转换为小写,然后以一个下划线 (_) 作为前缀。例如,值 public.inventory 将变为 public_inventory,而 TopicWith123Numbers 将变为 topic_with_123_numbers

upper_case

转换为大写。例如,值 public.inventory 将变为 PUBLIC.INVENTORY

lower_case

转换为小写。例如,值 PUBLIC.INVENTORY 将变为 public.inventory

连接器提供以下命名转换:

CollectionNameTransformation

CollectionNameTransformation 提供了一种在事件被消费之前更改主题名称大小写的方式。此转换具有以下配置属性:

属性 Default (默认值) 描述

none

指定要应用于事件主题的命名样式。

指定转换后应用于主题名称的前缀。

指定转换后应用于主题名称的后缀。

以下配置示例演示了如何使用 CollectionNameTransformation SMT 将主题名称设置为大写,并以 ADT_ 为前缀。例如,给定一个名为 public.inventory 的主题,主题名称将变为 ADT_PUBLIC.INVENTORY

CollectionNameTransformation 配置示例
{
  "transforms": "topic-uppercase",
  "transforms.topic-uppercase.type": "io.debezium.connector.jdbc.transforms.CollectionNameTransformation",
  "transforms.topic-uppercase.collection.naming.style": "upper_case",
  "transforms.topic-uppercase.collection.naming.prefix": "ADT_"
}
FieldNameTransformation

FieldNameTransformation 提供了一种在事件被消费之前更改字段名称大小写的方式。如果事件是 Debezium 源连接器事件,则仅更改 beforeafter 部分中的字段。如果事件不是 Debezium 源连接器事件,则会更改所有顶层字段名称。

此转换具有以下配置属性:

属性 Default (默认值) 描述

none

指定要应用于字段名称的命名样式。

指定转换后应用于字段名称的前缀。

指定转换后应用于字段名称的后缀。

以下示例展示了如何配置 FieldNameTransformation SMT,将字段名称转换为小写,并以字符串 adt_ 作为前缀。将 SMT 应用于包含名称为 ID 的字段的事件后,该字段将被重命名为 adt_id

FieldNameTransformation 配置示例
{
  "transforms": "topic-lowercase",
  "transforms.topic-lowercase.type": "io.debezium.connector.jdbc.transforms.FieldNameTransformation",
  "transforms.topic-lowercase.collection.naming.style": "lower_case",
  "transforms.topic-lowercase.collection.naming.prefix": "adt_"
}

部署

要部署 Debezium JDBC 连接器,请安装 Debezium JDBC 连接器存档,配置连接器,然后通过将配置添加到 Kafka Connect 来启动连接器。

先决条件
  • 已安装 Apache KafkaKafka Connect

  • 您有一个 Kafka 主题,连接器可以从中读取更改事件记录。

  • 已安装并配置为接受 JDBC 连接的目标数据库。

过程
  1. 下载 Debezium JDBC 连接器插件存档

  2. 将文件解压到您的 Kafka Connect 环境。

  3. 可选地,从 Maven Central 下载 JDBC 驱动程序,并将下载的驱动程序文件提取到包含 JDBC 接收器连接器 JAR 文件的目录中。

    Oracle 和 Db2 的驱动程序不包含在 JDBC 接收器连接器中。您必须手动下载并安装驱动程序。

  4. 将包含 JAR 文件的目录添加到 Kafka Connect 的 plugin.path。确保您安装 JDBC 接收器连接器的路径是 Kafka Connect plugin.path 的一部分。

  5. 重新启动 Kafka Connect 进程以加载新的 JAR 文件。

Debezium JDBC 连接器配置

通常,您通过提交一个 JSON 请求来注册 Debezium JDBC 连接器,该请求指定连接器的配置属性。以下示例显示了一个用于注册 Debezium JDBC 接收器连接器实例的 JSON 请求,该实例从名为 orders 的主题消费事件,并使用最常见的配置设置。

示例:Debezium JDBC 连接器配置
{
    "name": "jdbc-connector",  (1)
    "config": {
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",  (2)
        "tasks.max": "1",  (3)
        "connection.url": "jdbc:postgresql:///db",  (4)
        "connection.username": "pguser",  (5)
        "connection.password": "pgpassword",  (6)
        "insert.mode": "upsert",  (7)
        "delete.enabled": "true",  (8)
        "primary.key.mode": "record_key",  (9)
        "schema.evolution": "basic",  (10)
        "use.time.zone": "UTC",  (11)
        "topics": "orders" (12)
    }
}

JDBC 连接器配置设置说明

Item 描述

1

当您将连接器注册到 Kafka Connect 服务时分配给连接器的名称。

2

连接器的 Java 类名。

3

为此连接器创建的最大任务数。

4

连接器用于连接到其写入的接收器数据库的 JDBC URL。

5

用于身份验证的数据库用户名。

6

用于身份验证的数据库用户的密码。

7

连接器使用的 insert.mode

8

启用数据库中记录的删除。有关更多信息,请参阅 delete.enabled 配置属性。

9

指定用于解析主键列的方法。有关更多信息,请参阅 primary.key.mode 配置属性。

10

启用连接器演进目标数据库的模式。有关更多信息,请参阅 schema.evolution 配置属性。

11

写入时间字段类型时使用的时区。

12

要消费的主题列表,用逗号分隔。

有关您可以为 Debezium JDBC 连接器设置的完整配置属性列表,请参阅 JDBC 连接器属性

您可以将此配置与 POST 命令发送到正在运行的 Kafka Connect 服务。该服务会记录配置并启动一个或多个接收器连接器任务,执行以下操作:

  • 连接到数据库。

  • 从订阅的 Kafka 主题中消费事件。

  • 将事件写入配置的数据库。

连接器属性

Debezium JDBC 接收器连接器有多个配置属性,您可以用来实现满足您需求的连接器行为。许多属性都有默认值。有关属性的信息组织如下:

JDBC 连接器 Kafka 消费者属性

属性 Default (默认值) 描述

无默认值

连接器的唯一名称。如果尝试在注册连接器时重复使用此名称,则会失败。此属性是所有 Kafka Connect 连接器所必需的。

无默认值

连接器的 Java 类名。对于 Debezium JDBC 连接器,请指定值 io.debezium.connector.jdbc.JdbcSinkConnector

1

此连接器的最大任务数。

无默认值

要消费的主题列表,用逗号分隔。请勿将此属性与 topics.regex 属性结合使用。

无默认值

一个正则表达式,用于指定要消费的主题。内部,正则表达式会被编译为 java.util.regex.Pattern。请勿将此属性与 topics 属性结合使用。

JDBC 连接器连接属性

属性 Default (默认值) 描述

org.hibernate.c3p0.internal.C3P0ConnectionProvider

要使用的连接提供程序实现。

无默认值

用于连接到数据库的 JDBC 连接 URL。

无默认值

连接器用于连接到数据库的数据库用户帐户名称。

无默认值

连接器用于连接到数据库的密码。

5

指定池中的最小连接数。

32

指定池维护的最大并发连接数。

32

指定当连接池超出其最大大小时,连接器尝试获取的连接数。

1800

指定在丢弃未使用连接之前将其保留的时间(秒)。

false

指定连接器在出现暂时的 JDBC 连接错误后是否重试。

启用(true)后,连接器会将连接问题(如套接字关闭或超时)视为可重试的,从而允许它重试处理而不是使任务失败。这可以减少停机时间,并提高对临时中断的弹性。

将此选项设置为 true 可以减少停机时间。但是,在具有异步复制的主副本环境中,如果重试发生在所有更改完全复制之前,可能会导致数据丢失。

在需要强数据一致性的地方谨慎使用。

JDBC 连接器运行时属性

属性 Default (默认值) 描述

UTC

指定插入 JDBC 时间值时使用的时区。

false

指定连接器是否处理 DELETEtombstone 事件并将相应的行从数据库中删除。使用此选项需要将 primary.key.mode 设置为 record.key

false

指定连接器是否处理 TRUNCATE 事件并将相应的表从数据库中截断。

尽管自 9.7 版本以来 Db2 就支持 TRUNCATE 语句,但目前 JDBC 连接器无法处理 Db2 连接器发出的标准 TRUNCATE 事件。

为确保 JDBC 连接器可以处理从 Db2 收到的 TRUNCATE 事件,请使用标准 TRUNCATE TABLE 语句的替代方法执行截断。例如:

ALTER TABLE <table_name> ACTIVATE NOT LOGGED INITIALLY WITH EMPTY TABLE

提交上述查询的用户帐户需要对要截断的表具有 ALTER 权限。

insert

指定将事件插入数据库的策略。以下选项可用:

insert

指定所有事件都应构造基于 INSERT 的 SQL 语句。仅在未使用主键时,或当您可以确定具有现有主键值的行不会发生更新时,才使用此选项。

update

指定所有事件都应构造基于 UPDATE 的 SQL 语句。仅在您可以确定连接器仅接收适用于现有行的事件时,才使用此选项。

upsert

指定连接器使用 upsert 语义将事件添加到表中。也就是说,如果主键不存在,则连接器执行 INSERT 操作;如果键已存在,则连接器执行 UPDATE 操作。当需要幂等写入时,应配置连接器使用此选项。

none

指定连接器如何从事件中解析主键列。

none

指定不创建任何主键列。

kafka

指定连接器使用 Kafka 坐标作为主键列。键坐标从事件的主题名称、分区和偏移量定义,并映射到以下名称的列:

  • __connect_topic

  • __connect_partition

  • __connect_offset

record_key

指定主键列来自事件的记录键。如果记录键是原始类型,则需要 primary.key.fields 属性来指定主键列的名称。如果记录键是 struct 类型,则 primary.key.fields 属性是可选的,可用于将事件键中的部分列指定为主键。

record_value

指定主键列来自事件的值。您可以设置 primary.key.fields 属性将主键定义为事件值中的部分字段;否则,默认情况下使用所有字段。

无默认值

主键列的名称,或用于派生主键的字段的逗号分隔列表。

primary.key.mode 设置为 record_key 并且事件的键是原始类型时,预期此属性指定要用于键的列名。

primary.key.mode 设置为 record_key(非原始键)或 record_value 时,预期此属性指定从键或值中派生表主键的字段名称的逗号分隔列表。如果 primary.key.mode 设置为 record_key(非原始键)或 record_value,并且未指定此属性,则连接器将从记录键或记录值的所有字段派生主键,具体取决于指定的模式。

false

指定生成的 SQL 语句是否使用引号来分隔表名和列名。有关更多详细信息,请参阅 JDBC 引号大小写敏感性 部分。

none

指定连接器如何演进目标表模式。有关更多信息,请参阅 模式演进。以下选项可用:

none

指定连接器不演进目标模式。

basic

指定发生基本演进。连接器通过比较传入事件的记录模式与数据库表结构来添加缺失的列。

${topic}

指定连接器用于构造目标表名称的字符串模式。
当该属性设置为其默认值 ${topic} 时,在连接器从 Kafka 读取事件后,它会将事件记录写入一个名称与源主题名称匹配的目标表中。

您还可以配置此属性以从传入事件记录中的特定字段提取值,然后使用这些值动态生成目标表名称。这种从消息源中的值生成表名称的能力,否则需要使用自定义 Kafka Connect 单个消息转换 (SMT)。

要将该属性配置为动态生成目标表名称,请将其值设置为类似 ${source._field_} 的模式。当您指定此类型的模式时,连接器会从 Debezium 更改事件的 source 块中提取值,然后使用这些值来构造表名称。例如,您可以将该属性的值设置为模式 ${source.schema}_${source.table}。基于此模式,如果连接器读取一个事件,其中源块中的 schema 字段包含值 user,并且 table 字段包含值 tab,则连接器会将事件记录写入名为 user_tab 的表中。

public

指定安装 PostgreSQL PostGIS 扩展的模式名称。默认值为 public;但是,如果 PostGIS 扩展安装在其他模式中,则应使用此属性来指定替代模式名称。

false

指定连接器是否在将数据插入 SQL Server 表的标识列之前自动设置 IDENTITY_INSERT,并在操作后立即取消设置。当默认设置(false)生效时,将数据插入表的 IDENTITY 列会导致 SQL 异常。

500

指定要尝试将多少条记录批量处理到目标表中。

请注意,如果您将 Connect 工作程序属性中的 consumer.max.poll.records 设置为一个低于 batch.size 的值,则批量处理将被 consumer.max.poll.records 限制,并且不会达到所需的 batch.size。您还可以使用连接器配置中的 consumer.override.max.poll.records 来配置连接器底层消费者的 max.poll.records

false

指定是否启用 Debezium JDBC 连接器的缩减缓冲区。

选择以下设置之一:

false

(默认) 连接器将从 Kafka 消耗的每个更改事件作为单独的逻辑 SQL 更改写入。

true

连接器使用缩减缓冲区在将更改事件写入接收器数据库之前对其进行缩减。也就是说,如果多个事件引用同一个主键,连接器会合并 SQL 查询,并仅根据最新偏移量记录中报告的行状态写入单个逻辑 SQL 更改。
选择此选项以减少对目标数据库的 SQL 负载。

为了在启用了缩减缓冲区的 PostgreSQL 接收器数据库中优化查询处理,您还必须通过将 reWriteBatchedInserts 参数添加到 JDBC 连接 URL 来启用数据库执行批量查询。

空字符串

一个可选的、逗号分隔的字段名称列表,这些字段名称与要包含在更改事件值中的字段的完全限定名称匹配。字段的完全限定名称形式为 fieldNametopicName:_fieldName_

如果在此配置中包含此属性,请不要设置 field.exclude.list 属性。

空字符串

一个可选的、逗号分隔的字段名称列表,这些字段名称与要从更改事件值中排除的字段的完全限定名称匹配。字段的完全限定名称形式为 fieldNametopicName:_fieldName_

如果在此配置中包含此属性,请不要设置 field.include.list 属性。

5

指定连接器在尝试将更改刷新到目标数据库导致某些数据库错误后执行的最大重试次数。如果重试次数超过重试值,接收器连接器将进入 FAILED 状态。

1000

指定连接器等待重试失败的刷新操作的毫秒数。

当您同时设置 flush.retry.delay.msflush.max.retries 属性时,这会影响 Kafka 的 max.poll.interval.ms 属性的行为。为防止连接器重新平衡,请将总重试时间(flush.retry.delay.ms * flush.max.retries)设置为小于 max.poll.interval.ms(默认为 5 分钟)的值。

JDBC 连接器可扩展属性

属性 Default (默认值) 描述

io.debezium.connector.jdbc.naming.DefaultColumnNamingStrategy

指定连接器用于从传入事件字段名称解析列名称的 ColumnNamingStrategy 实现的完全限定类名。

默认行为是使用字段名称作为列名称,不做任何转换。

io.debezium.connector.jdbc.nnaming.DefaultCollectionNamingStrategy

指定连接器用于从传入事件主题名称解析表名称的 CollectionNamingStrategy 实现的完全限定类名。

默认行为是:

  • 清理主题名称,将点 (.) 替换为下划线 (_)。

  • collection.name.format 配置属性中,用事件的主题替换 ${topic} 占位符。

JDBC 连接器 hibernate.* 直通属性

Kafka Connect 支持直通配置,使您能够通过将某些属性直接从连接器配置传递来修改底层系统的行为。默认情况下,一些 Hibernate 属性通过 JDBC 连接器的 连接属性(例如,connection.urlconnection.usernameconnection.pool.*_size)以及通过连接器的 运行时属性(例如,use.time.zonequote.identifiers)公开。

如果您想自定义其他 Hibernate 行为,可以通过在连接器配置中添加使用 hibernate.* 命名空间的属性来利用直通机制。例如,为了帮助 Hibernate 解析目标数据库的类型和版本,您可以添加 hibernate.dialect 属性并将其设置为数据库的完全限定类名,例如 org.hibernate.dialect.MariaDBDialect

常见问题解答

是否需要 ExtractNewRecordState 单个消息转换?

不需要,这实际上是 Debezium JDBC 连接器与其他实现区分开来的一个因素。虽然连接器能够像竞争对手一样摄取展平的事件,但它也可以原生摄取 Debezium 的复杂更改事件结构,而无需任何特定类型的转换。

如果列的类型更改,或者列被重命名或删除,这是否由模式演进处理?

不,Debezium JDBC 连接器不会对现有列进行任何更改。连接器支持的模式演进非常基本。它只是将事件结构中的字段与表的列列表进行比较,然后添加表中尚未定义为列的任何字段。如果列的类型或默认值发生更改,连接器不会在目标数据库中进行调整。如果列被重命名,旧列将保持不变,连接器会将一个新名称的列添加到表中;但是,旧列中包含数据的现有行将保持不变。这些类型的模式更改应手动处理。

如果列的类型未解析为我想要的类型,我该如何强制映射到不同的数据类型?

Debezium JDBC 连接器使用复杂的类型系统来解析列的数据类型。有关此类型系统如何将特定字段的模式定义解析为 JDBC 类型的详细信息,请参阅 数据和列类型映射 部分。如果您想应用不同的数据类型映射,请手动定义表以明确获取首选的列类型。

如何在不更改 Kafka 主题名称的情况下为表名称指定前缀或后缀?

为了向目标表名称添加前缀或后缀,请调整 collection.name.format 连接器配置属性以应用您想要的前缀或后缀。例如,要将所有表名称加上 jdbc_ 前缀,请将 collection.name.format 配置属性设置为 jdbc_${topic}。如果连接器订阅了名为 orders 的主题,则生成的表将创建为 jdbc_orders

为什么某些列会自动被引用,即使未启用标识符引用?

在某些情况下,特定的列名或表名可能会被显式引用,即使未启用 quote.identifiers。当列名或表名以某个特定约定开头或使用该约定,否则将被视为非法语法时,这通常是必要的。例如,当 primary.key.mode 设置为 kafka 时,某些数据库只允许列名以划线开头,前提是该列名被引用。引用行为是特定于方言的,并且在不同类型的数据库之间有所不同。