Debezium JDBC 连接器
概述
Debezium JDBC 连接器是一个 Kafka Connect 接收器连接器实现,它可以从多个源主题消费事件,然后使用 JDBC 驱动程序将这些事件写入关系数据库。该连接器支持多种数据库方言,包括 Db2、MySQL、Oracle、PostgreSQL 和 SQL Server。
JDBC 连接器的工作原理
Debezium JDBC 连接器是一个 Kafka Connect 接收器连接器,因此需要 Kafka Connect 运行时。该连接器会定期轮询它订阅的 Kafka 主题,从中消费事件,然后将这些事件写入配置的关系数据库。该连接器通过使用 upsert 语义和基本的模式演进支持幂等写入操作。
Debezium JDBC 连接器提供以下功能:
处理复杂的 Debezium 更改事件
默认情况下,Debezium 源连接器会生成复杂、分层的更改事件。当 Debezium 连接器与其他 JDBC 接收器连接器实现一起使用时,您可能需要应用 ExtractNewRecordState 单个消息转换 (SMT) 来展平事件的有效负载,以便接收器实现能够消费它们。如果您运行 Debezium JDBC 接收器连接器,则无需部署 SMT,因为 Debezium 接收器连接器可以直接消费原生的 Debezium 更改事件,而无需进行转换。
当 JDBC 接收器连接器从 Debezium 源连接器消费复杂更改事件时,它会提取原始 insert 或 update 事件的 after 部分的值。当接收器连接器消费删除事件时,不会考虑事件的有效负载的任何部分。
|
Debezium JDBC 接收器连接器不设计用于读取模式更改主题。如果您的源连接器配置为捕获模式更改,请在 JDBC 连接器配置中设置 |
多个任务
您可以跨多个 Kafka Connect 任务运行 Debezium JDBC 接收器连接器。要跨多个任务运行连接器,请将 tasks.max 配置属性设置为您希望连接器使用的任务数量。Kafka Connect 运行时会启动指定数量的任务,并为每个任务运行一个连接器实例。多个任务可以通过并行读取和处理多个源主题的更改来提高性能。
数据和列类型映射
为了使 Debezium JDBC 接收器连接器能够正确地将入站消息字段中的数据类型映射到出站消息字段中的数据类型,连接器需要有关源事件中存在的每个字段的数据类型的信息。该连接器支持跨不同数据库方言的广泛的列类型映射。为了正确地从事件字段中的 type 元数据转换目标列类型,连接器应用为源数据库定义的数据类型映射。您可以通过在源连接器配置中设置 column.propagate.source.type 或 datatype.propagate.source.type 选项来增强连接器解析列数据类型的方式。启用这些选项后,Debezium 会包含额外的参数元数据,这有助于 JDBC 接收器连接器更准确地解析目标列的数据类型。
要使 Debezium JDBC 接收器连接器能够处理 Kafka 主题中的事件,Kafka 主题消息键(如果存在)必须是原始数据类型或 Struct。此外,源消息的有效负载必须是 Struct,它具有一个展平的结构,没有嵌套的 struct 类型,或者具有符合 Debezium 的复杂、分层结构的嵌套 struct 布局。
如果 Kafka 主题中事件的结构不符合这些规则,您必须实现自定义的单个消息转换,将源事件的结构转换为可用的格式。
主键处理
默认情况下,Debezium JDBC 接收器连接器不会将源事件中的任何字段转换为事件的主键。不幸的是,稳定的主键缺失可能会使事件处理复杂化,具体取决于您的业务需求,或者当接收器连接器使用 upsert 语义时。要定义一致的主键,您可以将连接器配置为使用下表中描述的主键模式之一。
| 模式 | 描述 |
|---|---|
|
创建表时未指定任何主键字段。 |
|
主键由以下三个列组成:
这些列的值来自 Kafka 事件的坐标。 |
|
主键由 Kafka 事件的键组成。 |
|
主键由 Kafka 事件的值组成。 |
|
主键由 Kafka 事件的头部组成。 |
|
如果将
|
|
如果某个列映射到目标数据库不允许作为主键的数据类型,则需要在 |
删除模式
当消费到 DELETE 或 tombstone 事件时,Debezium JDBC 接收器连接器可以删除目标数据库中的行。默认情况下,JDBC 接收器连接器不启用删除模式。
如果您希望连接器删除行,则必须在连接器配置中显式设置 delete.enabled=true。要使用此模式,您还必须将 primary.key.fields 设置为 none 以外的值。上述配置是必需的,因为删除是基于主键映射执行的,所以如果目标表没有主键映射,连接器就无法删除行。
幂等写入
Debezium JDBC 接收器连接器可以执行幂等写入,从而使其能够反复重放相同的记录,而不会改变最终的数据库状态。
要使连接器能够执行幂等写入,您必须在连接器中显式将 insert.mode 设置为 upsert。upsert 操作根据指定的主键是否已存在,执行为 update 或 insert。
如果主键值已存在,操作将更新行中的值。如果指定的主键值不存在,insert 将添加新行。
每个数据库方言处理幂等写入的方式不同,因为没有 SQL 标准来处理 upsert 操作。下表显示了 Debezium 支持的数据库方言的 upsert DML 语法。
| 方言 | Upsert 语法 |
|---|---|
Db2 |
|
MySQL |
|
Oracle |
|
PostgreSQL |
|
SQL Server |
|
模式演进
您可以使用以下模式演进与 Debezium JDBC 接收器连接器:
| 模式 | 描述 |
|---|---|
|
连接器不执行任何 DDL 模式演进。 |
|
连接器会自动检测事件有效负载中存在但目标表中不存在的字段。连接器会更改目标表以添加新字段。 |
当 schema.evolution 设置为 basic 时,连接器会根据传入事件的结构自动创建或修改目标数据库表。
当第一次从主题接收事件,并且目标表尚不存在时,Debezium JDBC 接收器连接器会使用事件的键或记录的结构来解析表的列结构。如果启用了模式演进,连接器会在应用 DML 事件到目标表之前,准备并执行 CREATE TABLE SQL 语句。
当 Debezium JDBC 连接器从主题接收事件时,如果记录的模式结构与目标表的模式结构不同,连接器会使用事件的键或其模式结构来识别哪些列是新的,需要添加到数据库表中。如果启用了模式演进,连接器会在应用 DML 事件到目标表之前,准备并执行 ALTER TABLE SQL 语句。由于更改列数据类型、删除列和调整主键可能被视为危险操作,因此连接器不允许执行这些操作。
每个字段的模式决定了列是 NULL 还是 NOT NULL。模式还定义了每列的默认值。如果连接器尝试创建一个具有您不想要的 nullability 设置或默认值的表,您必须手动提前创建表,或者在接收器连接器处理事件之前调整相关字段的模式。要调整 nullability 设置或默认值,您可以引入自定义的单个消息转换,在管道中应用更改,或修改源数据库中定义的列状态。
字段的数据类型是根据预定义的映射集解析的。有关更多信息,请参阅 JDBC 字段类型。
|
当您向目标数据库中已存在的表的事件结构添加新字段时,必须将新字段定义为可选字段,或者数据库模式中必须指定一个默认值。如果您希望从目标表中删除某个字段,请使用以下选项之一:
|
引号和大小写敏感性
Debezium JDBC 接收器连接器通过构建 DDL(模式更改)或 DML(数据更改)SQL 语句来消费 Kafka 消息,这些语句会在目标数据库上执行。默认情况下,连接器使用源主题和事件字段的名称作为目标表中表和列名称的基础。构造的 SQL 不会自动用引号分隔标识符以保留原始字符串的大小写。因此,默认情况下,目标数据库中表或列名称的文本大小写完全取决于数据库在未指定大小写时如何处理名称字符串。
例如,如果目标数据库方言是 Oracle,并且事件主题是 orders,则目标表将创建为 ORDERS,因为 Oracle 在未引用名称时默认使用大写名称。同样,如果目标数据库方言是 PostgreSQL,并且事件主题是 ORDERS,则目标表将创建为 orders,因为 PostgreSQL 在未引用名称时默认使用小写名称。
要显式保留 Kafka 事件中的表和字段名称的大小写,请在连接器配置中将 quote.identifiers 属性的值设置为 true。当此选项设置为 true 时,如果传入事件的主题名为 orders,并且目标数据库方言是 Oracle,则连接器将创建一个名为 orders 的表,因为构造的 SQL 将表名定义为 "orders"。启用引号会导致连接器创建列名时产生相同的行为。
连接空闲超时
Debezium 的 JDBC 接收器连接器利用连接池来提高性能。连接池旨在建立一组初始连接,维护指定数量的连接,并根据需要有效地将连接分配给应用程序。但是,当连接在池中长时间空闲时会出现问题,如果它们保持不活动状态超过数据库配置的空闲超时阈值,可能会导致超时。
为了减轻空闲连接线程触发超时的可能性,连接池提供了一种机制,该机制会定期验证每个连接的活动状态。此验证可确保连接保持活动状态,并防止数据库将其标记为空闲。在发生网络中断的情况下,如果 Debezium 尝试使用已终止的连接,连接器会提示池生成新连接。
默认情况下,Debezium JDBC 接收器连接器不执行空闲超时测试。但是,您可以通过设置 hibernate.c3p0.idle_test_period 属性来配置连接器,使其以指定的时间间隔执行超时测试。例如:
{
"hibernate.c3p0.idle_test_period": "300"
}
Debezium JDBC 接收器连接器使用 Hibernate C3P0 连接池。您可以通过在 hibernate.c3p0.* 配置命名空间中设置属性来定制 CP30 连接池。在前面的示例中,设置 hibernate.c3p0.idle_test_period 属性会将连接池配置为每 300 秒执行一次空闲超时测试。应用配置后,连接池将开始每五分钟评估一次未使用的连接。
数据类型映射
在 Debezium JDBC 接收器连接器将数据发送到接收器数据库之前,它会将原始源记录中的数据类型转换为目标系统中的相应类型。适当的数据类型映射可确保原始数据在目标数据库中得到准确表示。
Debezium JDBC 接收器连接器通过使用逻辑或原始类型映射系统来解析列的数据类型。原始类型包括整数、浮点数、布尔值、字符串和字节等值。通常,Kafka 消息使用特定的 Kafka Connect Schema 类型代码来表示原始数据类型。
相比之下,为了表示更复杂的数据,Debezium 使用逻辑模式名称,它们提供命名字段的逻辑分组,可以表示一系列数据类型(字符串、数组、JSON、XML 等)。这些结构化类型会考虑数据的语义含义,并对如何序列化底层原始类型提供更明确的解释。逻辑类型在表示具有特定编码的值时很有用,例如表示自纪元以来的时间的数字。
以下示例显示了原始数据类型和逻辑数据类型的代表性结构。
{
"schema": {
"type": "INT64"
}
}
{
"schema": {
"type": "INT64",
"name": "org.apache.kafka.connect.data.Date"
}
}
Kafka Connect 并非这些复杂逻辑类型的唯一来源。实际上,当 Debezium 源连接器发出更改事件时,它可以为事件字段分配类似的逻辑类型来表示时间戳、日期甚至 JSON 数据等数据类型。
Debezium JDBC 接收器连接器使用这些原始类型和逻辑类型来将列的类型解析为 JDBC SQL 代码,后者表示列的类型。这些 JDBC SQL 代码随后被底层 Hibernate 持久化框架用于将列的类型解析为所用方言的逻辑数据类型。下表说明了 Kafka Connect 和 JDBC SQL 类型之间,以及 Debezium 和 JDBC SQL 类型之间的原始和逻辑映射。实际的最终列类型因数据库类型而异。
Kafka Connect 原始数据类型映射
| 原始类型 | JDBC SQL 类型 |
|---|---|
INT8 |
Types.TINYINT |
INT16 |
Types.SMALLINT |
INT32 |
Types.INTEGER |
INT64 |
Types.BIGINT |
FLOAT32 |
Types.FLOAT |
FLOAT64 |
Types.DOUBLE |
BOOLEAN |
Types.BOOLEAN |
STRING |
Types.CHAR, Types.NCHAR, Types.VARCHAR, Types.NVARCHAR |
BYTES |
Types.VARBINARY |
|
该连接器不支持 Oracle 23 中的 |
Kafka Connect 逻辑数据类型映射
| 逻辑类型 | JDBC SQL 类型 |
|---|---|
org.apache.kafka.connect.data.Decimal |
Types.DECIMAL |
org.apache.kafka.connect.data.Date |
Types.DATE |
org.apache.kafka.connect.data.Time |
Types.TIMESTAMP |
org.apache.kafka.connect.data.Timestamp |
Types.TIMESTAMP |
Debezium 逻辑数据类型映射
| 逻辑类型 | JDBC SQL 类型 |
|---|---|
io.debezium.time.Date |
Types.DATE |
io.debezium.time.Time |
Types.TIMESTAMP |
io.debezium.time.MicroTime |
Types.TIMESTAMP |
io.debezium.time.NanoTime |
Types.TIMESTAMP |
io.debezium.time.ZonedTime |
Types.TIME_WITH_TIMEZONE |
io.debezium.time.Timestamp |
Types.TIMESTAMP |
io.debezium.time.MicroTimestamp |
Types.TIMESTAMP |
io.debezium.time.NanoTimestamp |
Types.TIMESTAMP |
io.debezium.time.ZonedTimestamp |
Types.TIMESTAMP_WITH_TIMEZONE |
io.debezium.data.VariableScaleDecimal |
Types.DOUBLE |
|
如果数据库不支持带时区的日期或时间戳,则映射将解析为其不带时区的等效项。 |
Debezium 特定于方言的数据类型映射
| 逻辑类型 | MySQL SQL 类型 | PostgreSQL SQL 类型 | SQL Server SQL 类型 |
|---|---|---|---|
io.debezium.data.Bits |
|
|
|
io.debezium.data.Enum |
|
Types.VARCHAR |
n/a |
io.debezium.data.Json |
|
|
n/a |
io.debezium.data.EnumSet |
|
n/a |
n/a |
io.debezium.time.Year |
|
n/a |
n/a |
io.debezium.time.MicroDuration |
n/a |
|
n/a |
io.debezium.data.Ltree |
n/a |
|
n/a |
io.debezium.data.Uuid |
n/a |
|
n/a |
io.debezium.data.Xml |
n/a |
|
|
Debezium 逻辑向量类型映射
Debezium JDBC 接收器连接器支持将逻辑向量数据类型从源事件直接映射到接收器目标,前提是目标数据库支持可比较的表示。
如果源中的向量字段具有逻辑名称,Debezium 会使用它来确定适当的映射。对于某些数据库,连接器会在 io.debezium.data.vector.* 命名空间下识别以下逻辑名称:
-
FloatVector -
DoubleVector -
SparseVector
当连接器处理消息时,如果存在这些逻辑类型,连接器会检查目标数据库中是否存在相应的映射。如果存在映射,则应用它;否则,它将根据 Kafka Connect 模式类型将数据序列化为字符串。
例如,配置为将数据发送到 PostgreSQL 数据库接收器的连接器会将具有 io.debezium.data.FloatVector 逻辑名称的字段映射到 halfvector 列类型,使用特殊的覆盖映射。相比之下,如果接收器数据库中不存在直接映射,例如 Oracle 接收器,则连接器将默认将向量数据序列化为其原始字符串格式。
并非所有支持的数据库版本都提供对特殊向量类型的支持。接收器数据库必须满足以下要求才能支持逻辑向量类型的直接映射:
-
MariaDB 版本 11.7 或更高版本
-
MySQL 版本 9.0 或更高版本
-
PostgreSQL 需要 pgvector 扩展
早期版本的 MariaDB 或 MySQL,以及没有 pgvector 的 PostgreSQL 不支持特殊向量类型。
| 逻辑类型 | Db2 | MySQL | PostgreSQL | Oracle | SQL Server |
|---|---|---|---|---|---|
io.debezium.data.DoubleVector |
不支持 |
'vector' |
|
不支持 |
不支持 |
io.debezium.data.FloatVector |
不支持 |
'vector' |
|
不支持 |
不支持 |
io.debezium.data.SparseVector |
不支持 |
不支持 |
|
不支持 |
不支持 |
|
如果向量逻辑类型与您的目标关系数据库的列类型之间没有直接映射,您可以使用 |
列和数据类型传播
除了前面表格中显示的原始和逻辑映射之外,如果更改事件的源是 Debezium 源连接器,那么列类型以及其长度、精度和小数位的解析还可以通过启用列或数据类型传播来进一步影响。列和数据类型传播有助于确定传入数据的结构和数据类型如何被翻译并应用于接收器目标。如前文 数据和列类型映射 中所述,您可以通过在源连接器配置中设置以下属性之一来强制传播:
-
column.propagate.source.type -
datatype.propagate.source.type
Debezium JDBC 接收器连接器仅应用具有更高优先级的那些值。
为了说明传播如何影响连接器映射数据类型,让我们看一个例子。以下示例显示了可能包含在更改事件中的字段模式。
{
"schema": {
"type": "INT8",
"parameters": {
"__debezium.source.column.type": "TINYINT",
"__debezium.source.column.length": "1"
}
}
}
由于发出事件的源连接器配置为使用列或数据类型传播,因此事件包含了指定列 type 和 length 的参数。
如果源连接器未启用传播,则 type 和 length 参数将缺失,Debezium JDBC 接收器连接器将默认将 type 字段中的 INT8 值映射到 Types.SMALLINT 列类型。根据目标数据库的 SQL 方言,连接器随后会将 JDBC Types.SMALLINT 类型解析为多个逻辑类型之一。例如,对于 MySQL 接收器目标,JDBC 连接器默认将 Types.SMALLINT 转换为 TINYINT 列类型,不指定长度。
但是,由于源连接器中启用了类型传播,它发出的事件包含了 type 和 length,提供了更具体的映射说明。因此,Debezium JDBC 接收器连接器不使用默认映射,而是使用给定的参数值来优化映射,并在接收器数据库中创建一个类型为 TINYINT(1) 的列。
|
通常,当源数据库和接收器数据库类型相同时,使用列或数据类型传播的效果最为显著。因为源数据库和接收器数据库具有相同的底层模式结构,所以在它们之间映射数据类型而无需应用复杂转换或解释会更容易。我们一直在寻找改善异构数据库之间映射的方法,并且当前的类型系统允许我们根据反馈不断优化这些映射。如果您发现某个映射可以改进,请告诉我们。 |
转换
Debezium JDBC 连接器提供几种转换,可以添加到连接器配置中,在连接器处理消耗的事件之前对其进行修改。
命名转换
Debezium JDBC 连接器提供了两种命名转换,您可以将其应用于更改事件中的主题或字段名称的命名样式。可选地,您还可以使用命名转换为转换后的名称添加前缀或后缀。
您可以将命名转换配置为使用以下命名样式之一:
camel_case-
删除所有句点 (
.) 和下划线 (_) 字符,并将紧跟的下一个字符转换为大写。例如,值inventory.customers将更改为inventoryCustomers。 snake_case-
删除所有句点 (
.) 字符并用下划线 (_) 替换。此外,所有数字序列都以一个下划线 (_) 作为前缀,所有大写字符都转换为小写,然后以一个下划线 (_) 作为前缀。例如,值public.inventory将变为public_inventory,而TopicWith123Numbers将变为topic_with_123_numbers。 upper_case-
转换为大写。例如,值
public.inventory将变为PUBLIC.INVENTORY。 lower_case-
转换为小写。例如,值
PUBLIC.INVENTORY将变为public.inventory。
连接器提供以下命名转换:
- CollectionNameTransformation
-
CollectionNameTransformation提供了一种在事件被消费之前更改主题名称大小写的方式。此转换具有以下配置属性:属性 Default (默认值) 描述 none指定要应用于事件主题的命名样式。
空
指定转换后应用于主题名称的前缀。
空
指定转换后应用于主题名称的后缀。
以下配置示例演示了如何使用
CollectionNameTransformationSMT 将主题名称设置为大写,并以ADT_为前缀。例如,给定一个名为public.inventory的主题,主题名称将变为ADT_PUBLIC.INVENTORY。CollectionNameTransformation 配置示例{ "transforms": "topic-uppercase", "transforms.topic-uppercase.type": "io.debezium.connector.jdbc.transforms.CollectionNameTransformation", "transforms.topic-uppercase.collection.naming.style": "upper_case", "transforms.topic-uppercase.collection.naming.prefix": "ADT_" } - FieldNameTransformation
-
FieldNameTransformation提供了一种在事件被消费之前更改字段名称大小写的方式。如果事件是 Debezium 源连接器事件,则仅更改before和after部分中的字段。如果事件不是 Debezium 源连接器事件,则会更改所有顶层字段名称。此转换具有以下配置属性:
属性 Default (默认值) 描述 none指定要应用于字段名称的命名样式。
空
指定转换后应用于字段名称的前缀。
空
指定转换后应用于字段名称的后缀。
以下示例展示了如何配置
FieldNameTransformationSMT,将字段名称转换为小写,并以字符串adt_作为前缀。将 SMT 应用于包含名称为ID的字段的事件后,该字段将被重命名为adt_id。FieldNameTransformation 配置示例{ "transforms": "topic-lowercase", "transforms.topic-lowercase.type": "io.debezium.connector.jdbc.transforms.FieldNameTransformation", "transforms.topic-lowercase.collection.naming.style": "lower_case", "transforms.topic-lowercase.collection.naming.prefix": "adt_" }
部署
要部署 Debezium JDBC 连接器,请安装 Debezium JDBC 连接器存档,配置连接器,然后通过将配置添加到 Kafka Connect 来启动连接器。
-
已安装 Apache Kafka 和 Kafka Connect。
-
您有一个 Kafka 主题,连接器可以从中读取更改事件记录。
-
已安装并配置为接受 JDBC 连接的目标数据库。
-
下载 Debezium JDBC 连接器插件存档。
-
将文件解压到您的 Kafka Connect 环境。
-
可选地,从 Maven Central 下载 JDBC 驱动程序,并将下载的驱动程序文件提取到包含 JDBC 接收器连接器 JAR 文件的目录中。
Oracle 和 Db2 的驱动程序不包含在 JDBC 接收器连接器中。您必须手动下载并安装驱动程序。
-
将包含 JAR 文件的目录添加到 Kafka Connect 的
plugin.path。确保您安装 JDBC 接收器连接器的路径是 Kafka Connectplugin.path的一部分。 -
重新启动 Kafka Connect 进程以加载新的 JAR 文件。
Debezium JDBC 连接器配置
通常,您通过提交一个 JSON 请求来注册 Debezium JDBC 连接器,该请求指定连接器的配置属性。以下示例显示了一个用于注册 Debezium JDBC 接收器连接器实例的 JSON 请求,该实例从名为 orders 的主题消费事件,并使用最常见的配置设置。
{
"name": "jdbc-connector", (1)
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector", (2)
"tasks.max": "1", (3)
"connection.url": "jdbc:postgresql:///db", (4)
"connection.username": "pguser", (5)
"connection.password": "pgpassword", (6)
"insert.mode": "upsert", (7)
"delete.enabled": "true", (8)
"primary.key.mode": "record_key", (9)
"schema.evolution": "basic", (10)
"use.time.zone": "UTC", (11)
"topics": "orders" (12)
}
}
JDBC 连接器配置设置说明
| Item | 描述 |
|---|---|
1 |
当您将连接器注册到 Kafka Connect 服务时分配给连接器的名称。 |
2 |
连接器的 Java 类名。 |
3 |
为此连接器创建的最大任务数。 |
4 |
连接器用于连接到其写入的接收器数据库的 JDBC URL。 |
5 |
用于身份验证的数据库用户名。 |
6 |
用于身份验证的数据库用户的密码。 |
7 |
连接器使用的 |
8 |
启用数据库中记录的删除。有关更多信息,请参阅 |
9 |
指定用于解析主键列的方法。有关更多信息,请参阅 |
10 |
启用连接器演进目标数据库的模式。有关更多信息,请参阅 |
11 |
写入时间字段类型时使用的时区。 |
12 |
要消费的主题列表,用逗号分隔。 |
有关您可以为 Debezium JDBC 连接器设置的完整配置属性列表,请参阅 JDBC 连接器属性。
您可以将此配置与 POST 命令发送到正在运行的 Kafka Connect 服务。该服务会记录配置并启动一个或多个接收器连接器任务,执行以下操作:
-
连接到数据库。
-
从订阅的 Kafka 主题中消费事件。
-
将事件写入配置的数据库。
连接器属性
Debezium JDBC 接收器连接器有多个配置属性,您可以用来实现满足您需求的连接器行为。许多属性都有默认值。有关属性的信息组织如下:
JDBC 连接器 Kafka 消费者属性
| 属性 | Default (默认值) | 描述 |
|---|---|---|
无默认值 |
连接器的唯一名称。如果尝试在注册连接器时重复使用此名称,则会失败。此属性是所有 Kafka Connect 连接器所必需的。 |
|
无默认值 |
连接器的 Java 类名。对于 Debezium JDBC 连接器,请指定值 |
|
1 |
此连接器的最大任务数。 |
|
无默认值 |
要消费的主题列表,用逗号分隔。请勿将此属性与 |
|
无默认值 |
一个正则表达式,用于指定要消费的主题。内部,正则表达式会被编译为 |
JDBC 连接器连接属性
| 属性 | Default (默认值) | 描述 | ||
|---|---|---|---|---|
|
要使用的连接提供程序实现。 |
|||
无默认值 |
用于连接到数据库的 JDBC 连接 URL。 |
|||
无默认值 |
连接器用于连接到数据库的数据库用户帐户名称。 |
|||
无默认值 |
连接器用于连接到数据库的密码。 |
|||
|
指定池中的最小连接数。 |
|||
|
指定池维护的最大并发连接数。 |
|||
|
指定当连接池超出其最大大小时,连接器尝试获取的连接数。 |
|||
|
指定在丢弃未使用连接之前将其保留的时间(秒)。 |
|||
|
指定连接器在出现暂时的 JDBC 连接错误后是否重试。 启用(
|
JDBC 连接器运行时属性
| 属性 | Default (默认值) | 描述 | ||
|---|---|---|---|---|
|
指定插入 JDBC 时间值时使用的时区。 |
|||
|
指定连接器是否处理 |
|||
|
指定连接器是否处理
|
|||
|
指定将事件插入数据库的策略。以下选项可用:
|
|||
|
指定连接器如何从事件中解析主键列。
|
|||
无默认值 |
主键列的名称,或用于派生主键的字段的逗号分隔列表。 |
|||
|
指定生成的 SQL 语句是否使用引号来分隔表名和列名。有关更多详细信息,请参阅 JDBC 引号大小写敏感性 部分。 |
|||
|
指定连接器如何演进目标表模式。有关更多信息,请参阅 模式演进。以下选项可用:
|
|||
|
指定连接器用于构造目标表名称的字符串模式。 |
|||
|
指定安装 PostgreSQL PostGIS 扩展的模式名称。默认值为 |
|||
|
指定连接器是否在将数据插入 SQL Server 表的标识列之前自动设置 |
|||
|
指定要尝试将多少条记录批量处理到目标表中。
|
|||
|
指定是否启用 Debezium JDBC 连接器的缩减缓冲区。 选择以下设置之一:
为了在启用了缩减缓冲区的 PostgreSQL 接收器数据库中优化查询处理,您还必须通过将 |
|||
空字符串 |
一个可选的、逗号分隔的字段名称列表,这些字段名称与要包含在更改事件值中的字段的完全限定名称匹配。字段的完全限定名称形式为 |
|||
空字符串 |
一个可选的、逗号分隔的字段名称列表,这些字段名称与要从更改事件值中排除的字段的完全限定名称匹配。字段的完全限定名称形式为 |
|||
5 |
指定连接器在尝试将更改刷新到目标数据库导致某些数据库错误后执行的最大重试次数。如果重试次数超过重试值,接收器连接器将进入 FAILED 状态。 |
|||
1000 |
指定连接器等待重试失败的刷新操作的毫秒数。
|
JDBC 连接器可扩展属性
| 属性 | Default (默认值) | 描述 |
|---|---|---|
|
指定连接器用于从传入事件字段名称解析列名称的 |
|
|
指定连接器用于从传入事件主题名称解析表名称的
|
JDBC 连接器 hibernate.* 直通属性
Kafka Connect 支持直通配置,使您能够通过将某些属性直接从连接器配置传递来修改底层系统的行为。默认情况下,一些 Hibernate 属性通过 JDBC 连接器的 连接属性(例如,connection.url、connection.username 和 connection.pool.*_size)以及通过连接器的 运行时属性(例如,use.time.zone、quote.identifiers)公开。
如果您想自定义其他 Hibernate 行为,可以通过在连接器配置中添加使用 hibernate.* 命名空间的属性来利用直通机制。例如,为了帮助 Hibernate 解析目标数据库的类型和版本,您可以添加 hibernate.dialect 属性并将其设置为数据库的完全限定类名,例如 org.hibernate.dialect.MariaDBDialect。
常见问题解答
- 是否需要
ExtractNewRecordState单个消息转换? -
不需要,这实际上是 Debezium JDBC 连接器与其他实现区分开来的一个因素。虽然连接器能够像竞争对手一样摄取展平的事件,但它也可以原生摄取 Debezium 的复杂更改事件结构,而无需任何特定类型的转换。
- 如果列的类型更改,或者列被重命名或删除,这是否由模式演进处理?
-
不,Debezium JDBC 连接器不会对现有列进行任何更改。连接器支持的模式演进非常基本。它只是将事件结构中的字段与表的列列表进行比较,然后添加表中尚未定义为列的任何字段。如果列的类型或默认值发生更改,连接器不会在目标数据库中进行调整。如果列被重命名,旧列将保持不变,连接器会将一个新名称的列添加到表中;但是,旧列中包含数据的现有行将保持不变。这些类型的模式更改应手动处理。
- 如果列的类型未解析为我想要的类型,我该如何强制映射到不同的数据类型?
-
Debezium JDBC 连接器使用复杂的类型系统来解析列的数据类型。有关此类型系统如何将特定字段的模式定义解析为 JDBC 类型的详细信息,请参阅 数据和列类型映射 部分。如果您想应用不同的数据类型映射,请手动定义表以明确获取首选的列类型。
- 如何在不更改 Kafka 主题名称的情况下为表名称指定前缀或后缀?
-
为了向目标表名称添加前缀或后缀,请调整
collection.name.format连接器配置属性以应用您想要的前缀或后缀。例如,要将所有表名称加上jdbc_前缀,请将collection.name.format配置属性设置为jdbc_${topic}。如果连接器订阅了名为orders的主题,则生成的表将创建为jdbc_orders。 - 为什么某些列会自动被引用,即使未启用标识符引用?
-
在某些情况下,特定的列名或表名可能会被显式引用,即使未启用
quote.identifiers。当列名或表名以某个特定约定开头或使用该约定,否则将被视为非法语法时,这通常是必要的。例如,当primary.key.mode设置为kafka时,某些数据库只允许列名以划线开头,前提是该列名被引用。引用行为是特定于方言的,并且在不同类型的数据库之间有所不同。