Debezium JDBC 连接器
概述
Debezium JDBC 连接器是一个 Kafka Connect 接收器连接器实现,它可以从多个源主题消费事件,然后使用 JDBC 驱动程序将这些事件写入关系型数据库。该连接器支持多种数据库方言,包括 Db2、MySQL、Oracle、PostgreSQL 和 SQL Server。
JDBC 连接器的工作原理
Debezium JDBC 连接器是一个 Kafka Connect 接收器连接器,因此需要 Kafka Connect 运行时。该连接器会定期轮询其订阅的 Kafka 主题,从这些主题消费事件,然后将事件写入配置的关系型数据库。该连接器支持幂等写入操作,使用 upsert 语义和基本的模式演进。
Debezium JDBC 连接器提供以下功能:
消费复杂的 Debezium 变更事件
默认情况下,Debezium 源连接器会生成复杂、分层的变更事件。当 Debezium 连接器与其他 JDBC 接收器连接器实现一起使用时,您可能需要应用 ExtractNewRecordState 单消息转换 (SMT) 来展平变更事件的有效负载,以便接收器实现能够消费。如果您运行 Debezium JDBC 接收器连接器,则无需部署 SMT,因为 Debezium 接收器连接器可以直接消费原生 Debezium 变更事件,而无需使用转换。
当 JDBC 接收器连接器从 Debezium 源连接器消费复杂变更事件时,它会从原始 insert 或 update 事件的 after 部分提取值。当接收器连接器消费删除事件时,不会咨询事件有效负载的任何部分。
|
Debezium JDBC 接收器连接器并非设计用于读取模式变更主题。如果您的源连接器配置为捕获模式变更,请在 JDBC 连接器配置中设置 |
多个任务
您可以跨多个 Kafka Connect 任务运行 Debezium JDBC 接收器连接器。要跨多个任务运行连接器,请将 tasks.max 配置属性设置为您希望连接器使用的任务数。Kafka Connect 运行时会启动指定数量的任务,并为每个任务运行一个连接器实例。多个任务可以通过并行读取和处理来自多个源主题的变更来提高性能。
数据和列类型映射
为了使 Debezium JDBC 接收器连接器能够将入站消息字段的数据类型正确映射到出站消息字段,连接器需要有关源事件中每个字段数据类型的信息。该连接器支持不同数据库方言之间的各种列类型映射。为了正确地将目标列类型从事件字段中的 type 元数据转换为目标列类型,连接器会应用为源数据库定义的数据类型映射。您可以通过在源连接器配置中设置 column.propagate.source.type 或 datatype.propagate.source.type 选项来增强连接器解析列数据类型的方式。启用这些选项后,Debezium 会包含额外的参数元数据,这有助于 JDBC 接收器连接器更准确地解析目标列的数据类型。
为了使 Debezium JDBC 接收器连接器能够处理 Kafka 主题中的事件,Kafka 主题消息键(如果存在)必须是原始数据类型或 Struct。此外,源消息的有效负载必须是 Struct,该 Struct 具有展平的结构(没有嵌套的 struct 类型),或者具有符合 Debezium 复杂、分层结构的嵌套 struct 布局。
如果 Kafka 主题中事件的结构不符合这些规则,您必须实现自定义单消息转换,将源事件的结构转换为可用格式。
主键处理
默认情况下,Debezium JDBC 接收器连接器不会将源事件中的任何字段转换为事件的主键。不幸的是,缺少稳定的主键可能会使事件处理复杂化,具体取决于您的业务需求,或者当接收器连接器使用 upsert 语义时。要定义一致的主键,您可以将连接器配置为使用以下表格中描述的主键模式之一。
| 模式 | 描述 |
|---|---|
|
创建表时未指定主键字段。 |
|
主键由以下三个列组成:
这些列的值来自 Kafka 事件的坐标。 |
|
主键由 Kafka 事件的键组成。 |
|
主键由 Kafka 事件的值组成。 |
|
主键由 Kafka 事件的头组成。 |
|
如果将
|
|
如果列映射到目标数据库不允许作为主键的数据类型,则在 |
删除模式
Debezium JDBC 接收器连接器可以在消费 DELETE 或墓碑事件时删除目标数据库中的行。默认情况下,JDBC 接收器连接器不启用删除模式。
如果您希望连接器删除行,则必须在连接器配置中显式设置 delete.enabled=true。要使用此模式,您还必须将 primary.key.fields 设置为 none 以外的值。前面的配置是必需的,因为删除是基于主键映射执行的,所以如果目标表没有主键映射,连接器就无法删除行。
幂等写入
Debezium JDBC 接收器连接器可以执行幂等写入,使其能够反复重放相同的记录而不会更改最终数据库状态。
要使连接器能够执行幂等写入,您必须在连接器配置中将 insert.mode 显式设置为 upsert。upsert 操作将根据指定的主键是否已存在而执行为 update 或 insert。
如果主键值已存在,则操作会更新行中的值。如果指定的主键值不存在,则 insert 会添加新行。
每个数据库方言处理幂等写入的方式不同,因为没有标准的 SQL 用于upsert操作。下表显示了 Debezium 支持的数据库方言的 upsert DML 语法。
| 方言 | Upsert 语法 |
|---|---|
Db2 |
|
MySQL |
|
Oracle |
|
PostgreSQL |
|
SQL Server |
|
模式演进
您可以使用以下模式演进模式与 Debezium JDBC 接收器连接器:
| 模式 | 描述 |
|---|---|
|
该连接器不执行任何 DDL 模式演进。 |
|
连接器自动检测事件有效负载中存在但目标表中不存在的字段。连接器会修改目标表以添加新字段。 |
当 schema.evolution 设置为 basic 时,连接器会根据入站事件的结构自动创建或修改目标数据库表。
当第一次从主题接收事件,并且目标表尚不存在时,Debezium JDBC 接收器连接器会使用事件的键或记录的模式结构来解析表的列结构。如果启用了模式演进,连接器会在应用 DML 事件到目标表之前准备并执行 CREATE TABLE SQL 语句。
当 Debezium JDBC 连接器从主题接收事件时,如果记录的模式结构与目标表的模式结构不同,连接器会使用事件的键或其模式结构来识别哪些列是新的,并且必须添加到数据库表中。如果启用了模式演进,连接器会在应用 DML 事件到目标表之前准备并执行 ALTER TABLE SQL 语句。由于更改列数据类型、删除列和调整主键可能被视为危险操作,因此连接器被禁止执行这些操作。
每个字段的模式决定了一个列是 NULL 还是 NOT NULL。模式还定义了每个列的默认值。如果连接器尝试创建一个具有您不想要的 nullability 设置或默认值的表,您必须提前手动创建表,或在接收器连接器处理事件之前调整相关字段的模式。要调整 nullability 设置或默认值,您可以引入自定义单消息转换,在管道中应用更改,或修改源数据库中定义的列状态。
字段的数据类型是根据预定义的映射集解析的。有关更多信息,请参阅 JDBC 字段类型。
|
当您将新字段引入到目标数据库中已存在的表的事件结构时,必须将新字段定义为可选的,或者字段必须在数据库模式中指定默认值。如果您希望从目标表中删除某个字段,请使用以下选项之一:
|
引用和大小写敏感性
Debezium JDBC 接收器连接器通过构建将在目标数据库上执行的 DDL(模式更改)或 DML(数据更改)SQL 语句来消费 Kafka 消息。默认情况下,连接器使用源主题和事件字段的名称作为目标表中表和列名称的基础。构造的 SQL 不会自动用引号分隔标识符以保留原始字符串的大小写。因此,默认情况下,目标数据库中表或列名称的文本大小写完全取决于数据库在未指定大小时如何处理名称字符串。
例如,如果目标数据库方言是 Oracle 并且事件的主题是 orders,则目标表将创建为 ORDERS,因为 Oracle 在未引用名称时默认为大写名称。类似地,如果目标数据库方言是 PostgreSQL 并且事件的主题是 ORDERS,则目标表将创建为 orders,因为 PostgreSQL 在未引用名称时默认为小写名称。
要显式保留 Kafka 事件中存在的表名和字段名的大小写,请在连接器配置中将 quote.identifiers 属性的值设置为 true。当此选项设置为 true 时,如果入站事件的主题为 orders,并且目标数据库方言为 Oracle,则连接器会创建一个名为 orders 的表,因为构造的 SQL 将表名定义为 "orders"。启用引用后,当连接器创建列名时也会发生相同的行为。
连接空闲超时
Debezium 的 JDBC 接收器连接器利用连接池来提高性能。连接池旨在建立一组初始连接,维护指定数量的连接,并根据需要有效地将连接分配给应用程序。但是,当连接在池中长时间空闲时会出现一个挑战,如果它们保持不活动状态超过数据库配置的空闲超时阈值,则可能触发超时。
为减轻空闲连接线程可能触发超时的可能性,连接池提供了一种机制,可以定期验证每个连接的活动状态。此验证可确保连接保持活动状态,并防止数据库将其标记为空闲。如果发生网络中断,并且 Debezium 尝试使用已终止的连接,连接器将提示池生成新连接。
默认情况下,Debezium JDBC 接收器连接器不会执行空闲超时测试。但是,您可以通过设置 hibernate.c3p0.idle_test_period 属性来配置连接器,使其按指定的时间间隔请求池执行超时测试。例如:
{
"hibernate.c3p0.idle_test_period": "300"
}
Debezium JDBC 接收器连接器使用 Hibernate C3P0 连接池。您可以通过在 hibernate.c3p0.*` 配置名称空间中设置属性来定制 CP30 连接池。在前面的示例中,设置 hibernate.c3p0.idle_test_period 属性会将连接池配置为每 300 秒执行一次空闲超时测试。应用配置后,连接池将开始每五分钟评估一次未使用的连接。
数据类型映射
在 Debezium JDBC 接收器连接器将数据发送到接收数据库之前,它会将原始源记录中的数据类型转换为目标系统中相应的数据类型。适当的数据类型映射可确保原始数据在目标数据库中得到准确表示。
Debezium JDBC 接收器连接器使用逻辑或原始类型映射系统来解析列的数据类型。原始类型包括整数、浮点数、布尔值、字符串和字节等值。通常,Kafka 消息使用特定的 Kafka Connect Schema 类型代码来表示原始数据类型。
相比之下,为了表示更复杂的数据,Debezium 使用逻辑模式名称,这些名称提供命名字段的逻辑分组,可以表示一系列数据类型(字符串、数组、JSON、XML 等)。这些结构化类型考虑了数据的语义含义,并对如何序列化底层原始类型提供了更明确的解释。逻辑类型在表示具有特定编码的值时很有用,例如表示自纪元以来的时间数字。
以下示例显示了原始和逻辑数据类型的代表性结构。
{
"schema": {
"type": "INT64"
}
}
{
"schema": {
"type": "INT64",
"name": "org.apache.kafka.connect.data.Date"
}
}
Kafka Connect 并不是这些复杂逻辑类型的唯一来源。事实上,当 Debezium 源连接器发出变更事件时,它可以为事件字段分配类似的逻辑类型来表示时间戳、日期甚至 JSON 数据等数据类型。
Debezium JDBC 接收器连接器使用这些原始和逻辑类型将列类型解析为 JDBC SQL 代码,该代码表示列类型。这些 JDBC SQL 代码随后被底层 Hibernate 持久化框架用于将列类型解析为所用方言的逻辑数据类型。下表说明了 Kafka Connect 和 JDBC SQL 类型之间,以及 Debezium 和 JDBC SQL 类型之间的原始和逻辑映射。实际的最终列类型因数据库类型而异。
Kafka Connect 原始数据类型映射
| 原始类型 | JDBC SQL 类型 |
|---|---|
INT8 |
Types.TINYINT |
INT16 |
Types.SMALLINT |
INT32 |
Types.INTEGER |
INT64 |
Types.BIGINT |
FLOAT32 |
Types.FLOAT |
FLOAT64 |
Types.DOUBLE |
BOOLEAN |
Types.BOOLEAN |
STRING |
Types.CHAR, Types.NCHAR, Types.VARCHAR, Types.NVARCHAR |
BYTES |
Types.VARBINARY |
|
对于 Oracle 23,该连接器不支持 |
Kafka Connect 逻辑数据类型映射
| 逻辑类型 | JDBC SQL 类型 |
|---|---|
org.apache.kafka.connect.data.Decimal |
Types.DECIMAL |
org.apache.kafka.connect.data.Date |
Types.DATE |
org.apache.kafka.connect.data.Time |
Types.TIMESTAMP |
org.apache.kafka.connect.data.Timestamp |
Types.TIMESTAMP |
Debezium 逻辑数据类型映射
| 逻辑类型 | JDBC SQL 类型 |
|---|---|
io.debezium.time.Date |
Types.DATE |
io.debezium.time.Time |
Types.TIMESTAMP |
io.debezium.time.MicroTime |
Types.TIMESTAMP |
io.debezium.time.NanoTime |
Types.TIMESTAMP |
io.debezium.time.ZonedTime |
Types.TIME_WITH_TIMEZONE |
io.debezium.time.Timestamp |
Types.TIMESTAMP |
io.debezium.time.MicroTimestamp |
Types.TIMESTAMP |
io.debezium.time.NanoTimestamp |
Types.TIMESTAMP |
io.debezium.time.ZonedTimestamp |
Types.TIMESTAMP_WITH_TIMEZONE |
io.debezium.data.VariableScaleDecimal |
Types.DOUBLE |
|
如果数据库不支持带时区的日期或时间戳,则映射会解析为其不带时区的等效项。 |
Debezium 方言特定的数据类型映射
| 逻辑类型 | MySQL SQL 类型 | PostgreSQL SQL 类型 | SQL Server SQL 类型 |
|---|---|---|---|
io.debezium.data.Bits |
|
|
|
io.debezium.data.Enum |
|
Types.VARCHAR |
n/a |
io.debezium.data.Json |
|
|
n/a |
io.debezium.data.EnumSet |
|
n/a |
n/a |
io.debezium.time.Year |
|
n/a |
n/a |
io.debezium.time.MicroDuration |
n/a |
|
n/a |
io.debezium.data.Ltree |
n/a |
|
n/a |
io.debezium.data.Uuid |
n/a |
|
n/a |
io.debezium.data.Xml |
n/a |
|
|
Debezium 逻辑向量类型映射
Debezium JDBC 接收器连接器支持将逻辑向量数据类型从源事件直接映射到接收目标,前提是目标数据库支持可比的表示。
如果源中的向量字段具有逻辑名称,Debezium 会使用它来确定适当的映射。对于某些数据库,连接器会在 io.debezium.data.vector.* 名称空间下识别以下逻辑名称:
-
FloatVector -
DoubleVector -
SparseVector
当连接器处理消息时,如果存在这些逻辑类型,连接器会检查目标数据库中是否存在相应的映射。如果存在映射,则应用它;否则,它将根据 Kafka Connect 模式类型将数据序列化为字符串。
例如,配置为将数据发送到 PostgreSQL 数据库接收器的连接器会将具有 io.debezium.data.FloatVector 逻辑名称的字段映射到 halfvector 列类型,使用特殊的覆盖映射。相比之下,如果接收数据库中没有直接映射,例如对于 Oracle 接收器,连接器将默认以其原始字符串格式序列化向量数据。
并非所有支持数据库的版本都支持特殊的向量类型。接收数据库必须满足以下要求才能支持逻辑向量类型的直接映射:
-
MariaDB 版本 11.7 或更高版本
-
MySQL 版本 9.0 或更高版本
-
PostgreSQL 需要 pgvector 扩展
早期版本的 MariaDB 或 MySQL,以及没有 pgvector 的 PostgreSQL 不支持特殊的向量类型。
| 逻辑类型 | Db2 | MySQL | PostgreSQL | Oracle | SQL Server |
|---|---|---|---|---|---|
io.debezium.data.DoubleVector |
不支持 |
'vector' |
|
不支持 |
不支持 |
io.debezium.data.FloatVector |
不支持 |
'vector' |
|
不支持 |
不支持 |
io.debezium.data.SparseVector |
不支持 |
不支持 |
|
不支持 |
不支持 |
|
如果 Debezium 向量逻辑类型与目标关系型数据库的列类型之间没有直接映射,您可以使用 |
列和数据类型传播
除了上表中所示的原始和逻辑映射外,如果变更事件的源是 Debezium 源连接器,则列类型及其长度、精度和小数位数的解析可以通过启用列或数据类型传播来进一步影响。列和数据类型传播有助于确定传入数据的结构和数据类型如何被转换并应用于接收目标。如前文 数据和列类型映射 所述,您可以通过在源连接器配置中设置以下属性之一来强制传播:
-
column.propagate.source.type -
datatype.propagate.source.type
Debezium JDBC 接收器连接器仅应用具有更高优先级的相应值。
为了说明传播如何影响连接器映射数据类型,我们来看一个例子。以下示例显示了一个可能包含在变更事件中的字段模式:
{
"schema": {
"type": "INT8",
"parameters": {
"__debezium.source.column.type": "TINYINT",
"__debezium.source.column.length": "1"
}
}
}
由于发出事件的源连接器配置为使用列或数据类型传播,因此事件包含指定列 type 和 length 的参数。
如果源连接器未启用传播,则 type 和 length 参数将不存在,Debezium JDBC 接收器连接器将默认将 type 字段中的 INT8 值映射到 Types.SMALLINT 列类型。根据目标数据库的 SQL 方言,连接器随后会将 JDBC Types.SMALLINT 类型解析为多种逻辑类型之一。例如,对于 MySQL 接收目标,JDBC 连接器默认将 Types.SMALLINT 转换为 TINYINT 列类型,不指定长度。
然而,由于源连接器中启用了类型传播,它发出的事件包含 type 和 length,提供了更具体的映射说明。因此,Debezium JDBC 接收器连接器不使用默认映射,而是使用给定的参数值来精炼映射,并在接收数据库中创建一个类型为 TINYINT(1) 的列。
|
通常,在源数据库和接收数据库类型相同时,使用列或数据类型传播的效果最为显著。因为源数据库和接收数据库共享相同的底层模式结构,所以更容易在它们之间映射数据类型,而无需应用复杂的转换或解释。我们一直在寻找改进异构数据库之间映射的方法,当前的类型系统允许我们根据反馈继续精炼这些映射。如果您发现某个映射可以改进,请告知我们。 |
转换
Debezium JDBC 连接器提供了几种转换,可以添加到连接器配置中,在连接器处理之前动态修改消费的事件。
命名转换
Debezium JDBC 连接器提供两种命名转换,您可以将其应用于更改事件中的主题或字段名称的命名样式。另外,您还可以使用命名转换为转换后的名称添加前缀或后缀。
您可以配置命名转换以使用以下命名样式之一:
camel_case-
删除所有句点 (
.) 和下划线 (_) 字符,并将紧随其后的字符转换为大写。例如,值inventory.customers将更改为inventoryCustomers。 snake_case-
删除所有句点 (
.) 字符,并将其替换为下划线 (_) 字符。此外,所有数字序列都将加上下划线 (_) 前缀,所有大写字符都将转换为小写,然后加上下划线 (_) 前缀。例如,值public.inventory变为public_inventory,而TopicWith123Numbers变为topic_with_123_numbers。 upper_case-
转换为大写。例如,值
public.inventory将变为PUBLIC.INVENTORY。 lower_case-
转换为小写。例如,值
PUBLIC.INVENTORY将变为public.inventory。
连接器提供以下命名转换:
- CollectionNameTransformation
-
CollectionNameTransformation提供了一种在事件被消费之前更改主题名称大小写的方法。此转换具有以下配置属性:属性 Default (默认值) 描述 none指定要应用于事件主题的命名样式。
空
指定转换后应用于主题名称的前缀。
空
指定转换后应用于主题名称的后缀。
以下配置示例说明了如何使用
CollectionNameTransformationSMT 将主题名称设置为大写,并在主题名称前加上ADT_。例如,给定一个名为public.inventory的主题,主题名称将变为ADT_PUBLIC.INVENTORY。示例 CollectionNameTransformation 配置{ "transforms": "topic-uppercase", "transforms.topic-uppercase.type": "io.debezium.connector.jdbc.transforms.CollectionNameTransformation", "transforms.topic-uppercase.collection.naming.style": "upper_case", "transforms.topic-uppercase.collection.naming.prefix": "ADT_" } - FieldNameTransformation
-
FieldNameTransformation提供了一种在事件被消费之前更改字段名称大小写的方法。如果事件是 Debezium 源连接器事件,则仅更改before和after部分中的字段。当事件不是 Debezium 源连接器事件时,将更改所有顶级字段名称。此转换具有以下配置属性:
属性 Default (默认值) 描述 none指定要应用于字段名称的命名样式。
空
指定转换后应用于字段名称的前缀。
空
指定转换后应用于字段名称的后缀。
以下示例展示了如何配置
FieldNameTransformationSMT 将字段名称转换为小写,并在名称前加上字符串adt_。将 SMT 应用于包含名称为ID的字段的事件后,该字段将重命名为adt_id。示例 FieldNameTransformation 配置{ "transforms": "topic-lowercase", "transforms.topic-lowercase.type": "io.debezium.connector.jdbc.transforms.FieldNameTransformation", "transforms.topic-lowercase.collection.naming.style": "lower_case", "transforms.topic-lowercase.collection.naming.prefix": "adt_" }
部署
要部署 Debezium JDBC 连接器,您需要安装 Debezium JDBC 连接器归档文件,配置连接器,并通过将其配置添加到 Kafka Connect 来启动连接器。
-
已安装 Apache Kafka 和 Kafka Connect。
-
您有一个 Kafka 主题,连接器可以从中读取变更事件记录。
-
已安装并配置为接受 JDBC 连接的目标数据库。
-
下载 Debezium JDBC 连接器插件归档文件。
-
将文件解压到您的 Kafka Connect 环境。
-
可选地从 Maven Central 下载 JDBC 驱动程序,并将下载的驱动程序文件提取到包含 JDBC 接收器连接器 JAR 文件的目录中。
Oracle 和 Db2 的驱动程序不包含在 JDBC 接收器连接器中。您必须手动下载并安装这些驱动程序。
-
将包含 JAR 文件的目录添加到 Kafka Connect 的
plugin.path。请确保您安装 JDBC 接收器连接器的路径是 Kafka Connectplugin.path的一部分。 -
重新启动 Kafka Connect 进程以加载新的 JAR 文件。
Debezium JDBC 连接器配置
通常,您可以通过提交一个 JSON 请求来注册 Debezium JDBC 连接器,该请求指定连接器的配置属性。以下示例显示了注册 Debezium JDBC 接收器连接器实例的 JSON 请求,该连接器从名为 orders 的主题消费事件,并使用最常见的配置设置。
{
"name": "jdbc-connector", (1)
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector", (2)
"tasks.max": "1", (3)
"connection.url": "jdbc:postgresql:///db", (4)
"connection.username": "pguser", (5)
"connection.password": "pgpassword", (6)
"insert.mode": "upsert", (7)
"delete.enabled": "true", (8)
"primary.key.mode": "record_key", (9)
"schema.evolution": "basic", (10)
"use.time.zone": "UTC", (11)
"topics": "orders" (12)
}
}
JDBC 连接器配置设置的说明
| Item | 描述 |
|---|---|
1 |
在您向 Kafka Connect 服务注册连接器时为其分配的名称。如果尝试在注册连接器时重复使用此名称,则会失败。此属性是所有 Kafka Connect 连接器必需的。 |
2 |
连接器类的名称。 |
3 |
为此连接器创建的最大任务数。 |
4 |
连接器用于连接到其写入数据的接收数据库的 JDBC URL。 |
5 |
用于身份验证的数据库用户名。 |
6 |
用于身份验证的数据库用户的密码。 |
7 |
连接器使用的 insert.mode。 |
8 |
启用在数据库中删除记录。有关更多信息,请参阅 delete.enabled 配置属性。 |
9 |
指定用于解析主键列的方法。有关更多信息,请参阅 primary.key.mode 配置属性。 |
10 |
启用连接器演进目标数据库的模式。有关更多信息,请参阅 schema.evolution 配置属性。 |
11 |
写入时间字段类型时使用的时区。 |
12 |
要消费的主题列表,以逗号分隔。 |
有关您可以为 Debezium JDBC 连接器设置的完整配置属性列表,请参阅 JDBC 连接器属性。
您可以将此配置与 POST 命令一起发送到正在运行的 Kafka Connect 服务。该服务会记录配置并启动一个接收器连接器任务,该任务执行以下操作:
-
连接到数据库。
-
消费来自订阅的 Kafka 主题的事件。
-
将事件写入配置的数据库。
连接器属性
Debezium JDBC 接收器连接器具有多个您可以使用的配置属性,以实现满足您需求的连接器行为。许多属性都有默认值。有关属性的信息组织如下:
JDBC 连接器 Kafka 消费者属性
| 属性 | Default (默认值) | 描述 |
|---|---|---|
无默认值 |
在 Kafka Connect 服务中注册连接器时为其分配的唯一名称。如果尝试在注册连接器时重复使用此名称,则会失败。此属性是所有 Kafka Connect 连接器必需的。 |
|
无默认值 |
连接器的 Java 类名称。对于 Debezium JDBC 连接器,请指定值 |
|
1 |
此连接器的最大任务数。 |
|
无默认值 |
要消费的主题列表,以逗号分隔。请勿将此属性与 |
|
无默认值 |
一个指定要消费主题的正则表达式。在内部,正则表达式会被编译为 |
JDBC 连接器连接属性
| 属性 | Default (默认值) | 描述 | ||
|---|---|---|---|---|
|
要使用的连接提供程序实现。 |
|||
无默认值 |
用于连接数据库的 JDBC 连接 URL。 |
|||
无默认值 |
连接器用于连接数据库的数据库用户帐户名称。 |
|||
无默认值 |
连接器用于连接数据库的密码。 |
|||
|
指定池中的最小连接数。 |
|||
|
指定池维护的最大并发连接数。 |
|||
|
指定当连接池超出其最大大小时,连接器尝试获取的连接数。 |
|||
|
指定在丢弃之前保留空闲连接的时间(以秒为单位)。 |
|||
|
指定连接器在瞬态 JDBC 连接错误后是否重试。 启用时(
|
JDBC 连接器运行时属性
| 属性 | Default (默认值) | 描述 | ||
|---|---|---|---|---|
|
指定插入 JDBC 时间值时使用的时区。 |
|||
|
指定连接器是否处理 |
|||
|
指定连接器是否处理
|
|||
|
指定用于将事件插入数据库的策略。可用的选项如下:
|
|||
|
指定连接器如何从事件解析主键列。
|
|||
无默认值 |
主键列的名称或用于从其派生主键的字段列表(逗号分隔)。 |
|||
|
指定生成的 SQL 语句是否使用引号分隔表名和列名。有关更多详细信息,请参阅 JDBC 引用大小写敏感性 部分。 |
|||
|
指定连接器如何演进目标表模式。有关更多信息,请参阅 模式演进。以下选项可用:
|
|||
|
指定连接器用于构建目标表名称的字符串模式。 |
|||
|
指定安装了 PostgreSQL PostGIS 扩展的模式名称。默认值为 |
|||
|
指定连接器是否在向 SQL Server 表的标识列执行 |
|||
|
指定尝试将多少条记录批量写入目标表。
|
|||
|
指定是否启用 Debezium JDBC 连接器的缩减缓冲区。 选择以下设置之一:
为了在启用缩减缓冲区的情况下优化 PostgreSQL 接收数据库中的查询处理,您还必须通过将 |
|||
空字符串 |
一个可选的、逗号分隔的字段名称列表,这些字段名称与要从变更事件值中包含的字段的完全限定名称匹配。字段的完全限定名称形式为 |
|||
空字符串 |
一个可选的、逗号分隔的字段名称列表,这些字段名称与要从变更事件值中排除的字段的完全限定名称匹配。字段的完全限定名称形式为 |
|||
5 |
指定在尝试将更改刷新到目标数据库导致某些数据库错误后,连接器执行的最大重试次数。如果重试次数超过重试值,则接收器连接器将进入 FAILED 状态。 |
|||
1000 |
指定连接器在重试失败的刷新操作之前等待的毫秒数。
|
JDBC 连接器可扩展属性
| 属性 | Default (默认值) | 描述 |
|---|---|---|
|
指定 |
|
|
指定
|
JDBC 连接器 hibernate.* 直通属性
Kafka Connect 支持直通配置,允许您通过将某些属性直接从连接器配置传递来修改底层系统的行为。默认情况下,一些 Hibernate 属性通过 JDBC 连接器 连接属性(例如,connection.url、connection.username 和 connection.pool.*_size)以及通过连接器的 运行时属性(例如,use.time.zone、quote.identifiers)暴露。
如果您想自定义其他 Hibernate 行为,可以通过将使用 hibernate.* 名称空间的属性添加到连接器配置中来利用直通机制。例如,为了帮助 Hibernate 解析目标数据库的类型和版本,您可以添加 hibernate.dialect 属性并将其设置为数据库的完全限定类名,例如 org.hibernate.dialect.MariaDBDialect。
常见问题解答
- 是否需要
ExtractNewRecordState单消息转换? -
否,这实际上是 Debezium JDBC 连接器与其他实现的一个区别点。虽然该连接器能够像竞争对手一样摄取展平的事件,但它也可以原生摄取 Debezium 的复杂变更事件结构,而无需任何特定类型的转换。
- 如果列的类型发生更改,或者列被重命名或删除,是否由模式演进处理?
-
否,Debezium JDBC 连接器不对现有列进行任何更改。连接器支持的模式演进非常基本。它只是比较事件结构中的字段与表的列列表,然后添加表中尚未定义为列的任何字段。如果列的类型或默认值发生更改,连接器不会在目标数据库中进行调整。如果列被重命名,旧列将保持原样,连接器会将一个新名称的列添加到表中;但是,具有旧列数据的现有行将保持不变。这些类型的模式更改应手动处理。
- 如果列的类型未解析为我想要的类型,我该如何强制映射到不同的数据类型?
-
Debezium JDBC 连接器使用复杂的类型系统来解析列的数据类型。有关此类型系统如何解析特定字段的模式定义到 JDBC 类型的详细信息,请参阅 数据和列类型映射 部分。如果您想应用不同的数据类型映射,请手动定义表以显式获取首选的列类型。
- 如何在不更改 Kafka 主题名称的情况下为表名称指定前缀或后缀?
-
为了在目标表名称中添加前缀或后缀,请调整 collection.name.format 连接器配置属性以应用所需的前缀或后缀。例如,要在所有表名前添加
jdbc_前缀,请将collection.name.format配置属性的值指定为jdbc_${topic}。如果连接器订阅了一个名为orders的主题,则生成的表将创建为jdbc_orders。 - 为什么有些列会自动被引用,即使未启用标识符引用?
-
在某些情况下,特定的列名或表名可能会被显式引用,即使未启用
quote.identifiers。当列名或表名以特定约定开头或使用特定约定时,这通常是必需的,因为否则会被视为非法语法。例如,当 primary.key.mode 设置为kafka时,一些数据库只允许列名以下划线开头,如果该列名被引用。引用行为是特定于方言的,并且因不同类型的数据库而异。