Cassandra 连接器
Cassanadra 连接器可以监控 Cassandra 集群并记录所有行级别更改。该连接器必须本地部署在 Cassandra 集群的每个节点上。当连接器首次连接到 Cassandra 节点时,它会对所有键空间中启用了 CDC 的表进行快照。连接器还将读取写入 Cassandra 提交日志的更改,并生成相应的插入、更新和删除事件。每个表的事件都记录在单独的 Kafka 主题中,应用程序和服务可以轻松地对其进行消费。
有关此连接器兼容的 Cassandra 版本信息,请参阅 Debezium 版本概述。
概述
Cassandra 是一个开源的 NoSQL 数据库。与大多数数据库类似,Cassandra 的写入路径以立即将更改记录到其提交日志开始。提交日志驻留在每个节点本地,记录对该节点的每一次写入。
自 Cassandra 3.0 起,引入了 更改数据捕获 (CDC) 功能。可以通过设置表属性 cdc=true 在表级别启用 CDC 功能,之后包含 CDC 启用表数据的任何提交日志在丢弃时都会被移动到 cassandra.yaml 中指定的 CDC 目录。
Cassandra 连接器驻留在每个 Cassandra 节点上,并监控 cdc_raw 目录中的更改。它会处理检测到的所有本地提交日志段,为提交日志中的每一行插入、更新和删除操作生成一个更改事件,将每个表的事件发布到单独的 Kafka 主题,最后从 cdc_raw 目录中删除提交日志。最后一步很重要,因为一旦启用了 CDC,Cassandra 本身就无法清除提交日志。如果 cdc_free_space_in_mb 填满,将拒绝对已启用 CDC 的表的写入。
该连接器具有故障容忍能力。当连接器读取提交日志并生成事件时,它会记录每个提交日志段的文件名和位置以及每个事件。如果连接器因任何原因停止(包括通信故障、网络问题或崩溃),在重新启动时,它将继续从上次停止的地方读取提交日志。这包括快照:如果在连接器停止时快照未完成,重新启动时将开始一个新的快照。我们将在后面讨论连接器 在出现问题时 的行为。
|
Cassandra 与其他 Debezium 连接器不同,因为它不是基于 Kafka Connect 框架实现的。相反,它是一个独立的 JVM 进程,驻留在每个 Cassandra 节点上,并通过 Kafka 生产者将事件发布到 Kafka。 |
|
以下功能目前不受 Cassandra 连接器支持。任何这些功能导致的更改都将被忽略
|
设置 Cassandra
在使用 Debezium Cassandra 连接器监控 Cassandra 集群的更改之前,必须在节点级别和表级别启用 CDC。
在节点上启用 CDC
要启用 CDC,请在 cassandra.yaml 中更新以下 CDC 配置
cdc_enabled: true
其他 CDC 配置具有以下默认值
cdc_raw_directory: $CASSANDRA_HOME/data/cdc_raw
cdc_free_space_in_mb: 4096
cdc_free_space_check_interval_ms: 250
-
cdc_enabled启用或禁用节点范围内的 CDC 操作 -
cdc_raw_directory确定在所有 memtables 刷新后提交日志段移动到的目标位置。 -
cdc_free_space_in_mb是分配用于存储提交日志段的最大容量,默认为 4096 MB 和卷空间 1/8 中的较小者。 -
cdc_free_space_check_interval_ms是重新计算cdc_raw_directory所占用空间频率,以避免在容量已满时浪费 CPU 周期。
Cassandra 连接器的工作原理
本节详细介绍了 Cassandra 连接器如何执行快照,将提交日志事件转换为 Debezium 更改事件,处理提交日志生命周期,将事件记录到 Kafka,管理模式演进以及在出现问题时的行为。
快照
当 Cassandra 连接器首次在 Cassandra 节点上启动时,默认情况下将执行初始快照。这是默认模式,因为大多数情况下 CDC 是在非空表上启用的,并且提交日志不包含完整的历史记录。
快照读取器发出 SELECT 语句来查询表中的所有列。Cassandra 允许全局或在语句级别设置一致性级别。对于快照,一致性级别默认在语句级别设置为 ALL,以提供最高的一致性。这意味着如果在快照期间一个节点宕机,快照将无法继续,并且在节点恢复在线后需要进行后续的重新快照。您可以将快照的一致性级别调整为较低的一致性级别以提高可用性,前提是您了解一致性方面的权衡。
|
在 Cassandra 3.X 中,无法仅从本地 Cassandra 节点读取。从 Cassandra 4.0 开始,将添加 |
与关系数据库不同,快照期间不应用读锁定,因此写入 Cassandra 不会被阻塞。如果在快照期间查询的数据被另一个客户端修改,这些更改可能会反映在快照结果集中。
如果连接器在快照完成之前失败或停止,连接器将在重新启动时开始一个新的快照。在默认快照模式 (initial) 下,一旦连接器完成初始快照,它将不再执行任何其他快照。唯一的例外是在连接器重启期间:如果对某个表启用了 CDC,然后重启了连接器,那么该表将被快照。
第二个快照模式 (always) 允许连接器在需要时执行快照。它会定期检查新启用的 CDC 表,并在检测到后立即对其进行快照。
第三个快照模式 ('never') 确保连接器从不执行快照。当以这种方式配置新连接器时,它只会读取 CDC 目录中的提交日志。这不是默认行为,因为以这种模式(无快照)启动新连接器要求提交日志包含所有已启用 CDC 表的完整历史记录,而这通常并非如此。此模式的另一个用例是,如果已经有一个连接器正在进行快照,您可以禁用其他连接器的快照以避免重复工作。
读取提交日志
Cassandra 连接器通常会花费大部分时间读取 Cassandra 节点上的本地提交日志。在 Cassandra 4.0 中,每次 fsync 都会更新一个索引文件以反映最新的偏移量。这消除了 Cassandra 3.X 中 CDC 功能的处理延迟,并且可以通过将配置设置为 commit.log.real.time.processing.enabled 为 true 来在 Cassandra 4 Debezium 连接器中启用。索引文件轮询的频率由 commit.log.marked.complete.poll.interval.ms 决定。
提交日志的二进制数据使用 Cassandra 的 CommitLogReader 和 CommitLogReadHandler 进行反序列化。每个反序列化的对象在 Cassandra 中称为一个 mutation。一个 mutation 包含一个或多个更改事件。
当 Cassandra 连接器读取提交日志时,它会将日志事件转换为 Debezium 的*创建*、*更新*或*删除*事件,其中包括事件在提交日志中的位置。Cassandra 连接器使用 Kafka Connect 转换器对这些更改事件进行编码,并将它们发布到相应的 Kafka 主题。
提交日志的局限性
Cassandra 的提交日志存在一系列限制,这些限制对于正确解释 CDC 事件至关重要
-
只有当提交日志已满时,它们才会进入
cdc_raw目录,然后会被刷新/丢弃。这意味着事件被记录与事件被捕获之间存在延迟。 -
单个 Cassandra 节点上的提交日志不反映集群的所有写入,它们仅反映存储在该节点上的写入。因此,有必要监控 Cassandra 集群中所有节点的更改。然而,由于复制因子,这也意味着下游事件消费者需要处理去重。
-
单个 Cassandra 节点上的写入按到达顺序记录。然而,这些事件可能以与其发出顺序不同的顺序到达。这些事件的下游消费者必须理解并实现类似于 Cassandra 读取路径的逻辑以获得正确的结果。
-
表的模式更改未记录在提交日志中,仅记录数据更改。因此,模式更改的检测是尽力而为的。为避免数据丢失,建议在模式更改期间暂停对表的写入。
-
Cassandra 不执行写前读,因此提交日志不记录已更改行的每一列的值,它只记录已修改列的值(分区键列除外,它们始终被记录,因为它们在 Cassandra DML 命令中是必需的)。
-
由于 CQL 的特性,*insert* DML 可能导致行插入或更新;*update* DML 可能导致行插入、更新或删除;*delete* DML 可能导致行更新或删除。由于查询未记录在提交日志中,CDC 事件类型根据其对关系数据库意义上的行的影响进行分类。
待办:是否有方法确定与实际 Cassandra DML 语句对应的事件类型?如果有,这是否比这些事件的语义更可取?
管理提交日志生命周期
默认情况下,Cassandra 连接器会删除已处理的提交日志。不建议在禁用提交日志删除的情况下启动连接器,因为这可能会导致磁盘存储膨胀并阻止对 Cassandra 集群进行进一步写入。要以自定义方式管理提交日志(例如,将其上传到云提供商),可以实现 CommitLogTransfer 接口。
主题名称
Cassandra 连接器将单个表上的所有插入、更新和删除操作的事件写入单个 Kafka 主题。Kafka 主题的名称始终采用以下形式
clusterName.keyspaceName.tableName
其中 clusterName 是连接器的逻辑名称,如 topic.prefix 配置属性指定,keyspaceName 是操作发生的键空间的名称,tableName 是操作发生的表的名称。
例如,假设有一个 Cassandra 安装,其中有一个 inventory 键空间,包含四个表:products、products_on_hand、customers 和 orders。如果监控此数据库的连接器被赋予的逻辑服务器名称为 fulfillment,则该连接器将在以下四个 Kafka 主题上生成事件
-
fulfillment.inventory.products -
fulfillment.inventory.products_on_hand -
fulfillment.inventory.customers -
fulfillment.inventory.orders
待办:对于主题名称,*clusterName*.*keyspaceName*.*tableName* 是否可以?还是应该是 *connectorName*.*keyspaceName*.*tableName* 或 *connectorName*.*clusterName*.*keyspaceName*.*tableName*?
模式演进
DDL 未记录在提交日志中。当表模式发生更改时,此更改从 Cassandra 节点之一发出,并通过 Gossip 协议传播到其他节点。
Cassandra 中的模式更改将被实现的 SchemaChangeListener 检测到,延迟小于 1 秒,然后更新从 Cassandra 加载的模式实例以及为每个表缓存的 Kafka 键值模式。
请注意,通过当前的模式演进方法,在以下情况下,Cassandra 连接器将在很短的时间内无法提供准确的数据更改信息
-
如果表禁用了 CDC,那么在禁用 CDC 之前发生的数据更改将被跳过。
-
如果从表中删除了一个列,那么在此列删除之前涉及该列的数据更改将无法正确反序列化,将被跳过。
事件
Cassandra 连接器生成的所有数据更改事件都具有键和值,尽管键和值的结构取决于更改事件起源的表(请参阅 主题名称)。
更改事件的键
对于给定的表,更改事件的键的结构包含一个字段,用于表示事件创建时表中主键的每个列。考虑一个 inventory 数据库,其中一个 customers 表定义如下
CREATE TABLE customers (
id bigint,
registration_date timestamp,
first_name text,
last_name text,
email text,
PRIMARY KEY (id, registration_date)
);
对于 customers 表的所有更改事件(在其具有此定义时),将具有相同的键架构,在 JSON 表示中如下所示
{
"type": "record",
"name": "cassandra-cluster-1.inventory.customers.Key",
"namespace": "io.debezium.connector.cassandra",
"fields": [
{
"name": "id",
"type": "long"
},
{
"name": "registration_date",
"type": "long",
"logicalType": "timestamp-millis"
}
]
}
对于 id = 1001 和 registration_date = 1562202942545,键的 JSON 表示如下
{
"id": 1001,
"registration_date": 1562202942545
}
|
虽然 |
更改事件的值
更改事件消息的值更复杂一些。Cassandra 连接器生成的每个更改事件值都包含一个信封结构,其中包含以下字段
op (操作)-
一个强制字段,包含一个字符串值,描述操作类型。Cassandra 连接器的值为
i表示插入,u表示更新,d表示删除。 after-
如果存在,一个可选字段,包含事件发生*后*行的状态。结构将由
cassandra-cluster-1.inventory.customers.ValueKafka Connect 模式描述,该模式表示事件引用的集群、键空间和表。 source (源)-
一个强制字段,包含描述事件源元数据的结构,对于 Cassandra,该结构包含以下字段
-
Debezium 版本。
-
连接器名称。
-
Cassandra 集群名称。
-
事件在其中记录的提交日志文件名,该提交日志文件中的事件出现位置,此事件是否属于快照,受影响的键空间和表的名称,以及分区更新的最大时间戳(微秒)。
-
ts_ms-
(可选) 如果存在,包含连接器处理事件的时间,基于运行 Cassandra 连接器的 JVM 的系统时钟。
|
由于 Cassandra 不执行写前读,Cassandra 提交日志不记录更改应用前行的值。因此,Cassandra 更改事件记录不包含 |
以下是我们 customers 表的*创建*事件的值模式的 JSON 表示
{
"type": "record",
"name": "cassandra-cluster-1.inventory.customers.Envelope",
"namespace": "io.debezium.connector.cassandra",
"fields": [
{
"name": "op",
"type": "string"
},
{
"name": "ts_ms",
"type": "long",
"logicalType": "timestamp-millis"
},
{
"name": "after",
"type": "record",
"fields": [
{
"name": "id",
"type": [
"null",
{
"name": "id",
"type": "record",
"fields": [
{
"name":"value",
"type": "string"
},
{
"name":"deletion_ts",
"type": ["null", "long"],
"default" : "null"
},
{
"name":"set",
"type": "boolean"
}
]
}
]
},
{
"name": "registration_date",
"type": [
"null",
{
"name": "registration_date",
"type": "record",
"fields": [
{
"name":"value",
"type": "long",
"logical_type": "timestamp-millis"
},
{
"name":"deletion_ts",
"type": ["null", "long"],
"default" : "null"
},
{
"name":"set",
"type": "boolean"
}
]
}
]
},
{
"name": "first_name",
"type": [
"null",
{
"name": "first_name",
"type": "record",
"fields": [
{
"name":"value",
"type": "string"
},
{
"name":"deletion_ts",
"type": ["null", "long"],
"default" : "null"
},
{
"name":"set",
"type": "boolean"
}
]
}
]
},
{
"name": "last_name",
"type": [
"null",
{
"name": "last_name",
"type": "record",
"fields": [
{
"name":"value",
"type": "string"
},
{
"name":"deletion_ts",
"type": ["null", "long"],
"default" : "null"
},
{
"name":"set",
"type": "boolean"
}
]
}
]
},
{
"name": "last_name",
"type": [
"null",
{
"name": "email",
"type": "record",
"fields": [
{
"name":"value",
"type": "string"
},
{
"name":"deletion_ts",
"type": ["null", "long"],
"default" : "null"
},
{
"name":"set",
"type": "boolean"
}
]
}
]
}
]
},
{
"name": "source",
"type": "record",
"fields": [
{
"name": "version",
"type": "string"
},
{
"name": "connector",
"type": "string"
},
{
"name": "cluster",
"type": "string"
},
{
"name": "snapshot",
"type": "boolean"
},
{
"name": "keyspace",
"type": "string"
},
{
"name": "table",
"type": "string"
},
{
"name": "file",
"type": "string"
},
{
"name": "position",
"type": "int"
},
{
"name": "ts_ms",
"type": "long",
"logicalType": "timestamp-micros"
}
]
}
]
}
待办:在删除 DDL 的情况下,验证 max timestamp != deletion timestamp
给定以下 insert DML
INSERT INTO customers (
id,
registration_date,
first_name,
last_name,
email)
VALUES (
1001,
now(),
"Anne",
"Kretchmar",
"annek@noanswer.org"
);
JSON 表示的值负载如下
{
"op": "c",
"ts_ms": 1562202942832,
"ts_us": 1562202942832014,
"ts_ns": 1562202942832014962,
"after": {
"id": {
"value": 1001,
"deletion_ts": null,
"set": true
},
"registration_date": {
"value": 1562202942545,
"deletion_ts": null,
"set": true
},
"first_name": {
"value": "Anne",
"deletion_ts": null,
"set": true
},
"last_name": {
"value": "Kretchmar",
"deletion_ts": null,
"set": true
},
"email": {
"value": "annek@noanswer.org",
"deletion_ts": null,
"set": true
}
},
"source": {
"version": "3.3.1.Final",
"connector": "cassandra",
"cluster": "cassandra-cluster-1",
"snapshot": false,
"keyspace": "inventory",
"table": "customers",
"file": "commitlog-6-123456.log",
"pos": 54,
"ts_ms": 1562202942666382,
"ts_us": 1562202942666382000,
"ts_ns": 1562202942666382000000
}
}
给定以下 update DML
UPDATE customers
SET email = "annek_new@noanswer.org"
WHERE id = 1001 AND registration_date = 1562202942545
JSON 表示的值负载如下
{
"op": "u",
"ts_ms": 1562202942912,
"ts_us": 1562202942912014,
"ts_ns": 1562202942912014982,
"after": {
"id": {
"value": 1001,
"deletion_ts": null,
"set": true
},
"registration_date": {
"value": 1562202942545,
"deletion_ts": null,
"set": true
},
"first_name": null,
"last_name": null,
"email": {
"value": "annek_new@noanswer.org",
"deletion_ts": null,
"set": true
}
},
"source": {
"version": "3.3.1.Final",
"connector": "cassandra",
"cluster": "cassandra-cluster-1",
"snapshot": false,
"keyspace": "inventory",
"table": "customers",
"file": "commitlog-6-123456.log",
"pos": 102,
"ts_ms": 1562202942666490,
"ts_us": 1562202942666490000,
"ts_ns": 1562202942666490000000
}
}
与*insert* 事件中的值进行比较,我们可以看到几个不同之处
-
op字段值现在是u,表示该行由于更新而更改。 -
after字段现在包含行的更新状态,这里可以看到 email 值现在是annek_new@noanswer.org。请注意,first_name和last_name为 null,因为这些字段在此更新中未更改。但是,id和registration_date仍然包含在内,因为它们是此表的主键。 -
source字段结构与以前的字段相同,但值不同,因为此事件来自提交日志中的不同位置。 -
ts_ms显示连接器处理此事件的时间戳(毫秒)。
最后,给定以下 delete DML
DELETE FROM customers
WHERE id = 1001 AND registration_date = 1562202942545;
JSON 表示的值负载如下
{
"op": "d",
"ts_ms": 1562202942912,
"ts_us": 1562202942912047,
"ts_ns": 1562202942912047921,
"after": {
"id": {
"value": 1001,
"deletion_ts": 1562202972545,
"set": true
},
"registration_date": {
"value": 1562202942545,
"deletion_ts": 1562202972545,
"set": true
},
"first_name": null,
"last_name": null,
"email": null
},
"source": {
"version": "3.3.1.Final",
"connector": "cassandra",
"cluster": "cassandra-cluster-1",
"snapshot": false,
"keyspace": "inventory",
"table": "customers",
"file": "commitlog-6-123456.log",
"pos": 102,
"ts_ms": 1562202942666490,
"ts_us": 1562202942666490000,
"ts_ns": 1562202942666490000000
}
}
与*insert* 和 *update* 事件中的值进行比较,我们可以看到几个不同之处
-
op字段的值现在是d,表示此行因删除而更改。 -
after字段仅包含id和registration_date的值,因为这是按主键删除。 -
source字段结构与以前的字段相同,但值不同,因为此事件来自提交日志中的不同位置。 -
ts_ms显示连接器处理此事件的时间戳(毫秒)。
待办:鉴于 TTL 目前不受支持,是否最好移除 delete_ts?通过查看每个列是否为 null 来推断字段是否已设置是否也可以?
待办:讨论 Cassandra 连接器中的墓碑事件
数据类型
如上所述,Cassandra 连接器使用与行所在表结构相似的事件来表示行的更改。事件包含一个字段用于表示每个列的值,该值如何表示在事件中取决于列的 Cassandra 数据类型。本节描述了此映射。
下表描述了连接器如何将每种 Cassandra 数据类型映射到 Kafka Connect 数据类型。
Cassandra 数据类型 |
字面类型 (Schema 类型) |
语义类型 (Schema 名称) |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
|
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
n/a |
|
|
|
|
|
|
|
|
n/a |
|
|
n/a |
|
|
|
|
|
n/a |
|
|
n/a |
|
|
|
待办:添加逻辑类型
任意精度整数类型
Cassandra 连接器根据 varint.handling.mode 连接器配置属性 的设置来处理 varint 值。
- varint.handling.mode=long
-
表 1. 当 varint.handling.mode=long时的映射Cassandra 类型 文字类型 语义类型 varintINT64n/a
- varint.handling.mode=precise
-
表 2. 当 decimal.handling.mode=precise时的映射Cassandra 类型 文字类型 语义类型 varintBYTESorg.apache.kafka.connect.data.Decimalscaleschema 参数设置为零。 - varint.handling.mode=string
-
表 3. 当 varint.handling.mode=string时的映射Cassandra 类型 文字类型 语义类型 varintSTRINGn/a
Decimal 类型
Cassandra 连接器根据 decimal.handling.mode 连接器配置属性 的设置来处理 decimal 值。
- decimal.handling.mode=double
-
表 4. 当 decimal.handling.mode=double时的映射Cassandra 类型 文字类型 语义类型 decimalFLOAT64n/a
- decimal.handling.mode=precise
-
表 5. 当 decimal.handling.mode=precise时的映射Cassandra 类型 文字类型 语义类型 decimalSTRUCTio.debezium.data.VariableScaleDecimal
包含一个具有两个字段的结构:类型为INT32的scale字段,包含传输值的精度;类型为BYTES的value字段,包含原始值(未缩放形式)。 - decimal.handling.mode=string
-
表 6. 当 decimal.handling.mode=string时的映射Cassandra 类型 文字类型 语义类型 decimalSTRINGn/a
如果默认数据类型转换不满足您的需求,您可以创建自定义转换器以供连接器使用。
出现问题时
配置和启动错误
如果配置无效或连接器无法使用指定的连接参数成功连接到 Cassandra,Cassandra 连接器将在启动时失败,并在日志中报告错误或异常,然后停止运行。在这种情况下,错误将提供有关问题的更多详细信息,并可能建议一种解决方法。在更正配置后,可以重新启动连接器。
Cassandra 变得不可用
一旦连接器正在运行,如果 Cassandra 节点因任何原因变得不可用,连接器将失败并停止。在这种情况下,请在服务器可用后重新启动连接器。如果这发生在快照期间,它将从表的开头重新引导整个表。
Cassandra 连接器正常停止
如果 Cassandra 连接器被正常关闭,在停止进程之前,它将确保将 ChangeEventQueue 中的所有事件刷新到 Kafka。Cassandra 连接器跟踪每次流式记录发送到 Kafka 时的文件名和偏移量。因此,当连接器重新启动时,它将从上次停止的地方继续。它通过搜索目录中最旧的提交日志,开始处理该提交日志,跳过已读取的记录,直到找到尚未处理的最新记录来实现这一点。如果在快照期间停止了 Cassandra 连接器,它将从该表继续,但会重新引导整个表。
Cassandra 连接器崩溃
如果 Cassandra 连接器意外崩溃,那么 Cassandra 连接器可能会在未记录最近处理的偏移量的情况下终止。在这种情况下,当连接器重新启动时,它将从最近记录的偏移量继续。这意味着可能会有重复(鉴于我们已经从 RF 获得重复,这很容易)。请注意,由于偏移量仅在记录成功发送到 Kafka 时更新,因此在崩溃期间丢失 ChangeEventQueue 中未发出的数据是可以接受的,因为这些事件将被重新创建。
Kafka 变得不可用
当连接器生成更改事件时,它将使用 Kafka 生产者 API 将这些事件发布到 Kafka。如果 Kafka 代理变得不可用(生产者遇到 TimeoutException),Cassandra 连接器将每秒尝试重新连接到代理一次,直到成功为止。
Cassandra 连接器停止一段时间
根据表的写入负载,当 Cassandra 连接器长时间停止时,它可能会达到 cdc_total_space_in_mb 容量。一旦达到此上限,Cassandra 将停止接受对此表的写入;这意味着在运行 Cassandra 连接器时监控此空间非常重要。在最坏的情况下,如果发生这种情况,请完成以下步骤
-
关闭 Cassandra 连接器。
-
禁用表的 CDC,使其停止生成其他写入。由于提交日志未进行过滤,同一节点上其他已启用 CDC 的表的写入仍可能影响提交日志文件的生成。
-
从偏移量文件中删除记录的偏移量
-
在增加容量或控制目录已用空间后,重新启动连接器以重新引导该表。
部署连接器
Cassandra 连接器应部署在 Cassandra 集群的每个 Cassandra 节点上。Cassandra 连接器 Jar 文件接受一个 CDC 配置(.properties)文件。有关参考,请参阅 示例配置。
示例配置
以下是一个用于在本地运行和测试 Cassandra 连接器的 .properties 配置文件的示例
connector.name=test_connector
commit.log.relocation.dir=/Users/test_user/debezium-connector-cassandra/test_dir/relocation/
http.port=8000
cassandra.config=/usr/local/etc/cassandra/cassandra.yaml
cassandra.hosts=127.0.0.1
cassandra.port=9042
kafka.producer.bootstrap.servers=127.0.0.1:9092
kafka.producer.retries=3
kafka.producer.retry.backoff.ms=1000
topic.prefix=test_prefix
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url: https://:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url: https://:8081
offset.backing.store.dir=/Users/test_user/debezium-connector-cassandra/test_dir/
snapshot.consistency=ONE
snapshot.mode=ALWAYS
latest.commit.log.only=true
连接配置
Cassanadra 连接器使用 Cassandra 驱动程序来配置到 Cassanadra 的连接,这必须使用单独的 application.conf 文件提供。您可以在 这里 找到驱动程序配置的完整参考,下面是一个示例
datastax-java-driver {
basic {
request.timeout = 20 seconds
contact-points = [ "spark-master-1:9042" ]
load-balancing-policy {
local-datacenter = "dc1"
}
}
advanced {
auth-provider {
class = PlainTextAuthProvider
username = user
password = pass
}
ssl-engine-factory {
...
}
}
}
为了让 Debezium 连接器读取/使用此应用程序配置文件,必须在连接器属性文件中进行设置,如下所示
cassandra.driver.config.file=/path/to/application/configuration.conf
监控
Cassandra 连接器内置了对 JMX 指标的支持。Cassandra 驱动程序还发布了许多关于驱动程序活动的指标,这些指标可以通过 JMX 进行监控。该连接器有两种类型的指标。
快照指标
MBean 是 debezium.cassandra:type=connector-metrics,context=snapshot,server=<topic.prefix>。
除非快照操作正在活动中,或者自上次连接器启动以来发生过快照,否则快照指标不会暴露。
下表列出了可用的快照指标。
| 属性名称 | Type | 描述 |
|---|---|---|
|
|
包含在快照中的表总数。 |
|
|
快照尚未复制的表数。 |
|
|
快照是否已启动。 |
|
|
快照是否被中止。 |
|
|
快照是否已完成。 |
|
|
到目前为止快照所花费的总秒数,即使未完成。 |
|
|
包含快照中每个表扫描的行数的映射。表在处理过程中被增量地添加到 Map 中。每扫描 10,000 行和完成一个表时更新。 |
流式处理指标
MBean 是 debezium.cassandra:type=connector-metrics,context=streaming,server=<topic.prefix>。
下表列出了可用的流式传输指标。
| 属性名称 | Type | 描述 |
|---|---|---|
|
|
连接器最近读取的提交日志文件名。 |
|
|
连接器读取的提交日志中的最新位置(以字节为单位)。 |
|
|
已处理的 mutations 数量。 |
|
|
处理提交日志时发生不可恢复错误的数量。 |
连接器属性
属性 |
Default (默认值) |
描述 |
||
|
指定 Cassandra 连接器代理启动时运行快照(例如,初始同步)的标准。必须是 'INITIAL'、'ALWAYS' 或 'NEVER' 之一。默认快照模式为 'INITIAL'。 |
|||
|
指定用于快照查询的 {@link ConsistencyLevel}。 |
|||
|
HTTP 服务器用于 ping、健康检查和构建信息的端口 |
|||
无默认值 |
Cassandra 节点使用的 YAML 配置文件绝对路径。 |
|||
|
Cassandra 驱动程序配置文件路径 |
|||
|
仅适用于 Cassandra 4,如果设置为 true,Cassandra 连接器代理将通过监视提交日志索引文件的更新来增量读取提交日志,并以 |
|||
10000 |
仅适用于 Cassandra 4,并且当通过 |
|||
无默认值 |
提交日志处理后从 cdc_raw 目录移动到的本地目录。 |
|||
|
确定 CommitLogPostProcessor 是否运行以将已处理的提交日志从重定位目录移出。如果禁用,提交日志将不会从重定位目录中移出。 |
|||
10000 |
CommitLogPostProcessor 等待重新获取重定位目录中所有已处理提交日志的时间。 |
|||
|
CommitLogPostProcessor 用于将已处理的提交日志从重定位目录移出的类。内置的传输类是 |
|||
false |
确定 CommitLogProcessor 是否重新处理错误提交日志。 |
|||
COMMITLOG_FILE |
指定如何确保更改事件的顺序。每个选项都指示用于哈希的属性。共享相同哈希值的事件将保持其顺序。建议使用 'PARTITION_VALUES' 来使哈希策略与 Kafka 中的消息对齐。必须是 'COMMITLOG_FILE' 或 'PARTITION_VALUES' 之一。 |
|||
无默认值 |
枚举连接器可以使用*自定义转换器*的符号名称的逗号分隔列表。例如:
您必须设置 对于为连接器配置的每个转换器,您还必须添加一个
For example, (例如,) isbn.type: io.debezium.test.IsbnConverter 如果您想进一步控制已配置转换器的行为,您可以添加一个或多个配置参数来传递值给转换器。要将任何其他配置参数与转换器关联,请在参数名称前加上转换器的符号名称。例如: isbn.schema.name: io.debezium.cassandra.type.Isbn |
|||
无默认值 |
存储偏移量跟踪文件的目录。 |
|||
|
提交偏移量之前等待的最短时间。默认值 0 表示每次都刷新偏移量。 |
|||
|
在需要将偏移量刷新到磁盘之前允许处理的最大记录数。此配置仅在 offset_flush_interval_ms != 0 时有效。 |
|||
|
正整数值,指定从提交日志读取的更改事件在写入 Kafka 之前放置的阻塞队列的最大大小。此队列可以为提交日志读取器提供回压,例如,当写入 Kafka 变慢或 Kafka 不可用时。出现在队列中的事件不包括在此连接器定期记录的偏移量中。默认为 8192,应始终大于 max.batch.size 属性中指定的 maximum batch size。在这些事件被转换为 Kafka Connect struct 并发送到 Kafka 之前,队列容纳反序列化记录的能力。 |
|||
|
每次要出队的更改事件的最大数量。 |
|||
|
一个长整数值,指定阻塞队列的最大字节容量。默认情况下,阻塞队列没有容量限制。要指定队列可以消耗的字节数,请将此属性设置为一个正长整型值。 |
|||
|
正整数值,指定提交日志处理器在每次迭代中等待新更改事件出现在队列中的毫秒数。默认为 1000 毫秒,即 1 秒。 |
|||
|
正整数值,指定模式处理器在刷新缓存的 Cassandra 表模式之前等待的毫秒数。 |
|||
|
每次轮询最多等待的时间,然后重试。 |
|||
|
正整数值,指定快照处理器在重新扫描表以查找新的 CDC 启用表之前等待的毫秒数。默认为 10000 毫秒,即 10 秒。 |
|||
|
删除事件是否应具有后续的墓碑事件(true)或不(false)。重要的是要注意,在 Cassandra 中,两个具有相同键的事件可能正在更新给定表的不同列。因此,如果在压缩过程中这些记录尚未被消费者消费,可能会导致记录丢失。换句话说,如果您启用了 Kafka 压缩,请勿将此设置为 true。 |
|||
无默认值 |
一个逗号分隔的字段完全限定名称列表,这些字段应从更改事件消息值中排除。字段的完全限定名称格式为 keyspace_name>.<field_name>.<nested_field_name>。 |
|||
|
更改事件队列和队列处理器的数量。默认为 1。 |
|||
|
在流式处理期间将被跳过的操作类型的逗号分隔列表。操作包括: |
|||
|
用于确定数据更改、schema 更改、事务、心跳事件等的*主题名称*的 TopicNamingStrategy 类的名称,默认为 |
|||
|
指定主题名称的分隔符,默认为 |
|||
无默认值 |
将用于所有主题的前缀名称。
|
|||
|
用于保存主题名称的有界并发哈希映射的大小。此缓存有助于确定与给定数据集合对应的主题名称。 |
|||
|
控制连接器发送心跳消息的主题名称。主题名称具有以下模式 |
|||
|
指定 |
|||
|
指定 |
|||
none |
指定如何调整 schema 名称以兼容连接器使用的消息转换器。可能设置:
|
|||
none |
指定如何调整字段名称以兼容连接器使用的消息转换器。可能设置:
有关更多详细信息,请参阅 Avro 命名。 |
|||
|
自定义指标标签将接受键值对来自定义 MBean 对象名称,该名称应附加到常规名称的末尾。每个键将代表 MBean 对象名称的标签,其对应的值将是该标签的值。例如: |
|||
|
指定在操作因可重试错误(如连接错误)失败后连接器的响应方式。
|
|||
true |
此属性指定 Debezium 是否将带有 该属性添加了以下头:
|
如果 Cassandra 代理使用 SSL 连接到 Cassandra 节点,则需要 SSL 配置文件。以下示例显示了如何编写 SSL 配置文件
keyStore.location=/var/private/ssl/cassandra.keystore.jks
keyStore.password=cassandra
keyStore.type=JKS
trustStore.location=/var/private/ssl/cassandra.truststore.jks
trustStore.password=cassandra
trustStore.type=JKS
keyManager.algorithm=SunX509
trustManager.algorithm=SunX509
cipherSuites=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
|
cipherSuites 字段不是强制性的,它只是允许您添加一个(或多个)不存在的密码。trustStore.type 和 keyStore.type 的默认值是 JKS。keyManager.algorithm 和 trustManager.algorithm 的默认值是 SunX509。 |
该连接器还支持在创建 Kafka 生产者时使用的传递配置属性。具体来说,所有以 kafka.producer. 前缀开头的连接器配置属性(不带前缀)在创建将事件写入 Kafka 的 Kafka 生产者时都会被使用。
例如,以下连接器配置属性可用于 保护到 Kafka 代理的连接
kafka.producer.security.protocol=SSL
kafka.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
kafka.producer.ssl.keystore.password=test1234
kafka.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
kafka.producer.ssl.truststore.password=test1234
kafka.producer.ssl.key.password=test1234
kafka.consumer.security.protocol=SSL
kafka.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
kafka.consumer.ssl.keystore.password=test1234
kafka.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
kafka.consumer.ssl.truststore.password=test1234
kafka.consumer.ssl.key.password=test1234
请务必查阅 Kafka 文档 以获取 Kafka 生产者的所有配置属性。
该连接器支持以下 Kafka Connect 转换器用于键/值序列化
io.confluent.connect.avro.AvroConverter
org.apache.kafka.connect.storage.StringConverter
org.apache.kafka.connect.json.JsonConverter
com.blueapron.connect.protobuf.ProtobufConverter