Cassandra 连接器

Cassanadra 连接器可以监控 Cassandra 集群并记录所有行级别更改。该连接器必须本地部署在 Cassandra 集群的每个节点上。当连接器首次连接到 Cassandra 节点时,它会对所有键空间中启用了 CDC 的表进行快照。连接器还将读取写入 Cassandra 提交日志的更改,并生成相应的插入、更新和删除事件。每个表的事件都记录在单独的 Kafka 主题中,应用程序和服务可以轻松地对其进行消费。

有关此连接器兼容的 Cassandra 版本信息,请参阅 Debezium 版本概述

概述

Cassandra 是一个开源的 NoSQL 数据库。与大多数数据库类似,Cassandra 的写入路径以立即将更改记录到其提交日志开始。提交日志驻留在每个节点本地,记录对该节点的每一次写入。

自 Cassandra 3.0 起,引入了 更改数据捕获 (CDC) 功能。可以通过设置表属性 cdc=true 在表级别启用 CDC 功能,之后包含 CDC 启用表数据的任何提交日志在丢弃时都会被移动到 cassandra.yaml 中指定的 CDC 目录。

Cassandra 连接器驻留在每个 Cassandra 节点上,并监控 cdc_raw 目录中的更改。它会处理检测到的所有本地提交日志段,为提交日志中的每一行插入、更新和删除操作生成一个更改事件,将每个表的事件发布到单独的 Kafka 主题,最后从 cdc_raw 目录中删除提交日志。最后一步很重要,因为一旦启用了 CDC,Cassandra 本身就无法清除提交日志。如果 cdc_free_space_in_mb 填满,将拒绝对已启用 CDC 的表的写入。

该连接器具有故障容忍能力。当连接器读取提交日志并生成事件时,它会记录每个提交日志段的文件名和位置以及每个事件。如果连接器因任何原因停止(包括通信故障、网络问题或崩溃),在重新启动时,它将继续从上次停止的地方读取提交日志。这包括快照:如果在连接器停止时快照未完成,重新启动时将开始一个新的快照。我们将在后面讨论连接器 在出现问题时 的行为。

Cassandra 与其他 Debezium 连接器不同,因为它不是基于 Kafka Connect 框架实现的。相反,它是一个独立的 JVM 进程,驻留在每个 Cassandra 节点上,并通过 Kafka 生产者将事件发布到 Kafka。

以下功能目前不受 Cassandra 连接器支持。任何这些功能导致的更改都将被忽略

  • 集合类型列上的 TTL

  • 范围删除

  • 静态列

  • 触发器

  • 物化视图

  • 二级索引

  • 轻量级事务

设置 Cassandra

在使用 Debezium Cassandra 连接器监控 Cassandra 集群的更改之前,必须在节点级别和表级别启用 CDC。

在节点上启用 CDC

要启用 CDC,请在 cassandra.yaml 中更新以下 CDC 配置

cdc_enabled: true

其他 CDC 配置具有以下默认值

cdc_raw_directory: $CASSANDRA_HOME/data/cdc_raw
cdc_free_space_in_mb: 4096
cdc_free_space_check_interval_ms: 250
  • cdc_enabled 启用或禁用节点范围内的 CDC 操作

  • cdc_raw_directory 确定在所有 memtables 刷新后提交日志段移动到的目标位置。

  • cdc_free_space_in_mb 是分配用于存储提交日志段的最大容量,默认为 4096 MB 和卷空间 1/8 中的较小者。

  • cdc_free_space_check_interval_ms 是重新计算 cdc_raw_directory 所占用空间频率,以避免在容量已满时浪费 CPU 周期。

在表上启用 CDC

一旦在 Cassandra 节点上启用了 CDC,每个表也必须通过 CREATE TABLE 或 ALTER TABLE 命令显式启用 CDC。例如

CREATE TABLE foo (a int, b text, PRIMARY KEY(a)) WITH cdc=true;

ALTER TABLE foo WITH cdc=true;

Cassandra 连接器的工作原理

本节详细介绍了 Cassandra 连接器如何执行快照,将提交日志事件转换为 Debezium 更改事件,处理提交日志生命周期,将事件记录到 Kafka,管理模式演进以及在出现问题时的行为。

快照

当 Cassandra 连接器首次在 Cassandra 节点上启动时,默认情况下将执行初始快照。这是默认模式,因为大多数情况下 CDC 是在非空表上启用的,并且提交日志不包含完整的历史记录。

快照读取器发出 SELECT 语句来查询表中的所有列。Cassandra 允许全局或在语句级别设置一致性级别。对于快照,一致性级别默认在语句级别设置为 ALL,以提供最高的一致性。这意味着如果在快照期间一个节点宕机,快照将无法继续,并且在节点恢复在线后需要进行后续的重新快照。您可以将快照的一致性级别调整为较低的一致性级别以提高可用性,前提是您了解一致性方面的权衡。

在 Cassandra 3.X 中,无法仅从本地 Cassandra 节点读取。从 Cassandra 4.0 开始,将添加 NODE_LOCAL 一致性级别。这将允许 Cassandra 连接器仅从其所在的节点读取(这与提交日志的处理方式一致)。

与关系数据库不同,快照期间不应用读锁定,因此写入 Cassandra 不会被阻塞。如果在快照期间查询的数据被另一个客户端修改,这些更改可能会反映在快照结果集中。

如果连接器在快照完成之前失败或停止,连接器将在重新启动时开始一个新的快照。在默认快照模式 (initial) 下,一旦连接器完成初始快照,它将不再执行任何其他快照。唯一的例外是在连接器重启期间:如果对某个表启用了 CDC,然后重启了连接器,那么该表将被快照。

第二个快照模式 (always) 允许连接器在需要时执行快照。它会定期检查新启用的 CDC 表,并在检测到后立即对其进行快照。

第三个快照模式 ('never') 确保连接器从不执行快照。当以这种方式配置新连接器时,它只会读取 CDC 目录中的提交日志。这不是默认行为,因为以这种模式(无快照)启动新连接器要求提交日志包含所有已启用 CDC 表的完整历史记录,而这通常并非如此。此模式的另一个用例是,如果已经有一个连接器正在进行快照,您可以禁用其他连接器的快照以避免重复工作。

读取提交日志

Cassandra 连接器通常会花费大部分时间读取 Cassandra 节点上的本地提交日志。在 Cassandra 4.0 中,每次 fsync 都会更新一个索引文件以反映最新的偏移量。这消除了 Cassandra 3.X 中 CDC 功能的处理延迟,并且可以通过将配置设置为 commit.log.real.time.processing.enabledtrue 来在 Cassandra 4 Debezium 连接器中启用。索引文件轮询的频率由 commit.log.marked.complete.poll.interval.ms 决定。

提交日志的二进制数据使用 Cassandra 的 CommitLogReader 和 CommitLogReadHandler 进行反序列化。每个反序列化的对象在 Cassandra 中称为一个 mutation。一个 mutation 包含一个或多个更改事件。

当 Cassandra 连接器读取提交日志时,它会将日志事件转换为 Debezium 的*创建*、*更新*或*删除*事件,其中包括事件在提交日志中的位置。Cassandra 连接器使用 Kafka Connect 转换器对这些更改事件进行编码,并将它们发布到相应的 Kafka 主题。

提交日志的局限性

Cassandra 的提交日志存在一系列限制,这些限制对于正确解释 CDC 事件至关重要

  • 只有当提交日志已满时,它们才会进入 cdc_raw 目录,然后会被刷新/丢弃。这意味着事件被记录与事件被捕获之间存在延迟。

  • 单个 Cassandra 节点上的提交日志不反映集群的所有写入,它们仅反映存储在该节点上的写入。因此,有必要监控 Cassandra 集群中所有节点的更改。然而,由于复制因子,这也意味着下游事件消费者需要处理去重。

  • 单个 Cassandra 节点上的写入按到达顺序记录。然而,这些事件可能以与其发出顺序不同的顺序到达。这些事件的下游消费者必须理解并实现类似于 Cassandra 读取路径的逻辑以获得正确的结果。

  • 表的模式更改未记录在提交日志中,仅记录数据更改。因此,模式更改的检测是尽力而为的。为避免数据丢失,建议在模式更改期间暂停对表的写入。

  • Cassandra 不执行写前读,因此提交日志不记录已更改行的每一列的值,它只记录已修改列的值(分区键列除外,它们始终被记录,因为它们在 Cassandra DML 命令中是必需的)。

  • 由于 CQL 的特性,*insert* DML 可能导致行插入或更新;*update* DML 可能导致行插入、更新或删除;*delete* DML 可能导致行更新或删除。由于查询未记录在提交日志中,CDC 事件类型根据其对关系数据库意义上的行的影响进行分类。

待办:是否有方法确定与实际 Cassandra DML 语句对应的事件类型?如果有,这是否比这些事件的语义更可取?

管理提交日志生命周期

默认情况下,Cassandra 连接器会删除已处理的提交日志。不建议在禁用提交日志删除的情况下启动连接器,因为这可能会导致磁盘存储膨胀并阻止对 Cassandra 集群进行进一步写入。要以自定义方式管理提交日志(例如,将其上传到云提供商),可以实现 CommitLogTransfer 接口。

主题名称

Cassandra 连接器将单个表上的所有插入、更新和删除操作的事件写入单个 Kafka 主题。Kafka 主题的名称始终采用以下形式

clusterName.keyspaceName.tableName

其中 clusterName 是连接器的逻辑名称,如 topic.prefix 配置属性指定,keyspaceName 是操作发生的键空间的名称,tableName 是操作发生的表的名称。

例如,假设有一个 Cassandra 安装,其中有一个 inventory 键空间,包含四个表:productsproducts_on_handcustomersorders。如果监控此数据库的连接器被赋予的逻辑服务器名称为 fulfillment,则该连接器将在以下四个 Kafka 主题上生成事件

  • fulfillment.inventory.products

  • fulfillment.inventory.products_on_hand

  • fulfillment.inventory.customers

  • fulfillment.inventory.orders

待办:对于主题名称,*clusterName*.*keyspaceName*.*tableName* 是否可以?还是应该是 *connectorName*.*keyspaceName*.*tableName* 或 *connectorName*.*clusterName*.*keyspaceName*.*tableName*?

模式演进

DDL 未记录在提交日志中。当表模式发生更改时,此更改从 Cassandra 节点之一发出,并通过 Gossip 协议传播到其他节点。

Cassandra 中的模式更改将被实现的 SchemaChangeListener 检测到,延迟小于 1 秒,然后更新从 Cassandra 加载的模式实例以及为每个表缓存的 Kafka 键值模式。

请注意,通过当前的模式演进方法,在以下情况下,Cassandra 连接器将在很短的时间内无法提供准确的数据更改信息

  1. 如果表禁用了 CDC,那么在禁用 CDC 之前发生的数据更改将被跳过。

  2. 如果从表中删除了一个列,那么在此列删除之前涉及该列的数据更改将无法正确反序列化,将被跳过。

事件

Cassandra 连接器生成的所有数据更改事件都具有键和值,尽管键和值的结构取决于更改事件起源的表(请参阅 主题名称)。

更改事件的键

对于给定的表,更改事件的键的结构包含一个字段,用于表示事件创建时表中主键的每个列。考虑一个 inventory 数据库,其中一个 customers 表定义如下

CREATE TABLE customers (
  id bigint,
  registration_date timestamp,
  first_name text,
  last_name text,
  email text,
  PRIMARY KEY (id, registration_date)
);

对于 customers 表的所有更改事件(在其具有此定义时),将具有相同的键架构,在 JSON 表示中如下所示

{
  "type": "record",
  "name": "cassandra-cluster-1.inventory.customers.Key",
  "namespace": "io.debezium.connector.cassandra",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "registration_date",
      "type": "long",
      "logicalType": "timestamp-millis"
    }
  ]
}

对于 id = 1001 和 registration_date = 1562202942545,键的 JSON 表示如下

{
  "id": 1001,
  "registration_date": 1562202942545
}

虽然 field.exclude.list 配置属性允许您从事件值中删除列,但主键中的所有列始终包含在事件的键中。

更改事件的值

更改事件消息的值更复杂一些。Cassandra 连接器生成的每个更改事件值都包含一个信封结构,其中包含以下字段

op (操作)

一个强制字段,包含一个字符串值,描述操作类型。Cassandra 连接器的值为 i 表示插入,u 表示更新,d 表示删除。

after

如果存在,一个可选字段,包含事件发生*后*行的状态。结构将由 cassandra-cluster-1.inventory.customers.Value Kafka Connect 模式描述,该模式表示事件引用的集群、键空间和表。

source (源)

一个强制字段,包含描述事件源元数据的结构,对于 Cassandra,该结构包含以下字段

  • Debezium 版本。

  • 连接器名称。

  • Cassandra 集群名称。

  • 事件在其中记录的提交日志文件名,该提交日志文件中的事件出现位置,此事件是否属于快照,受影响的键空间和表的名称,以及分区更新的最大时间戳(微秒)。

ts_ms

(可选) 如果存在,包含连接器处理事件的时间,基于运行 Cassandra 连接器的 JVM 的系统时钟。

由于 Cassandra 不执行写前读,Cassandra 提交日志不记录更改应用前行的值。因此,Cassandra 更改事件记录不包含 before 字段。

以下是我们 customers 表的*创建*事件的值模式的 JSON 表示

{
  "type": "record",
  "name": "cassandra-cluster-1.inventory.customers.Envelope",
  "namespace": "io.debezium.connector.cassandra",
  "fields": [
      {
        "name": "op",
        "type": "string"
      },
      {
        "name": "ts_ms",
        "type": "long",
        "logicalType": "timestamp-millis"
      },
      {
        "name": "after",
        "type": "record",
        "fields": [
          {
            "name": "id",
            "type": [
              "null",
              {
            "name": "id",
            "type": "record",
            "fields": [
              {
                "name":"value",
                "type": "string"
              },
              {
                "name":"deletion_ts",
                "type": ["null", "long"],
                "default" : "null"
              },
              {
                "name":"set",
                "type": "boolean"
              }
            ]
            }
          ]
          },
          {
            "name": "registration_date",
            "type": [
              "null",
              {
            "name": "registration_date",
            "type": "record",
            "fields": [
              {
                "name":"value",
                "type": "long",
                "logical_type": "timestamp-millis"
              },
              {
                "name":"deletion_ts",
                "type": ["null", "long"],
                "default" : "null"
              },
              {
                "name":"set",
                "type": "boolean"
              }
            ]
            }
          ]
          },
          {
            "name": "first_name",
            "type": [
              "null",
              {
            "name": "first_name",
            "type": "record",
            "fields": [
              {
                "name":"value",
                "type": "string"
              },
              {
                "name":"deletion_ts",
                "type": ["null", "long"],
                "default" : "null"
              },
              {
                "name":"set",
                "type": "boolean"
              }
            ]
            }
          ]
          },
          {
            "name": "last_name",
            "type": [
              "null",
              {
            "name": "last_name",
            "type": "record",
            "fields": [
              {
                "name":"value",
                "type": "string"
              },
              {
                "name":"deletion_ts",
                "type": ["null", "long"],
                "default" : "null"
              },
              {
                "name":"set",
                "type": "boolean"
              }
            ]
            }
          ]
          },
          {
            "name": "last_name",
            "type": [
              "null",
              {
            "name": "email",
            "type": "record",
            "fields": [
              {
                "name":"value",
                "type": "string"
              },
              {
                "name":"deletion_ts",
                "type": ["null", "long"],
                "default" : "null"
              },
              {
                "name":"set",
                "type": "boolean"
              }
            ]
            }
          ]
          }
        ]
      },
      {
        "name": "source",
        "type": "record",
        "fields": [
          {
            "name": "version",
            "type": "string"
          },
          {
            "name": "connector",
            "type": "string"
          },
          {
            "name": "cluster",
            "type": "string"
          },
          {
            "name": "snapshot",
            "type": "boolean"
          },
          {
            "name": "keyspace",
            "type": "string"
          },
          {
            "name": "table",
            "type": "string"
          },
          {
            "name": "file",
            "type": "string"
          },
          {
            "name": "position",
            "type": "int"
          },
          {
            "name": "ts_ms",
            "type": "long",
            "logicalType": "timestamp-micros"
          }
        ]
      }
  ]
}

待办:在删除 DDL 的情况下,验证 max timestamp != deletion timestamp

给定以下 insert DML

INSERT INTO customers (
  id,
  registration_date,
  first_name,
  last_name,
  email)
VALUES (
  1001,
  now(),
  "Anne",
  "Kretchmar",
  "annek@noanswer.org"
);

JSON 表示的值负载如下

{
  "op": "c",
  "ts_ms": 1562202942832,
  "ts_us": 1562202942832014,
  "ts_ns": 1562202942832014962,
  "after": {
    "id": {
    "value": 1001,
    "deletion_ts": null,
    "set": true
  },
    "registration_date": {
    "value": 1562202942545,
    "deletion_ts": null,
    "set": true
  },
  "first_name": {
    "value": "Anne",
    "deletion_ts": null,
    "set": true
  },
  "last_name": {
    "value": "Kretchmar",
    "deletion_ts": null,
    "set": true
  },
  "email": {
    "value": "annek@noanswer.org",
    "deletion_ts": null,
    "set": true
  }
  },
  "source": {
    "version": "3.3.1.Final",
    "connector": "cassandra",
    "cluster": "cassandra-cluster-1",
    "snapshot": false,
    "keyspace": "inventory",
    "table": "customers",
    "file": "commitlog-6-123456.log",
    "pos": 54,
    "ts_ms": 1562202942666382,
    "ts_us": 1562202942666382000,
    "ts_ns": 1562202942666382000000
  }
}

给定以下 update DML

UPDATE customers
SET email = "annek_new@noanswer.org"
WHERE id = 1001 AND registration_date = 1562202942545

JSON 表示的值负载如下

{
  "op": "u",
  "ts_ms": 1562202942912,
  "ts_us": 1562202942912014,
  "ts_ns": 1562202942912014982,
  "after": {
    "id": {
    "value": 1001,
    "deletion_ts": null,
    "set": true
  },
    "registration_date": {
    "value": 1562202942545,
    "deletion_ts": null,
    "set": true
  },
  "first_name": null,
  "last_name": null,
  "email": {
    "value": "annek_new@noanswer.org",
    "deletion_ts": null,
    "set": true
  }
  },
  "source": {
    "version": "3.3.1.Final",
    "connector": "cassandra",
    "cluster": "cassandra-cluster-1",
    "snapshot": false,
    "keyspace": "inventory",
    "table": "customers",
    "file": "commitlog-6-123456.log",
    "pos": 102,
    "ts_ms": 1562202942666490,
    "ts_us": 1562202942666490000,
    "ts_ns": 1562202942666490000000
  }
}

与*insert* 事件中的值进行比较,我们可以看到几个不同之处

  • op 字段值现在是 u,表示该行由于更新而更改。

  • after 字段现在包含行的更新状态,这里可以看到 email 值现在是 annek_new@noanswer.org。请注意,first_namelast_name 为 null,因为这些字段在此更新中未更改。但是,idregistration_date 仍然包含在内,因为它们是此表的主键。

  • source 字段结构与以前的字段相同,但值不同,因为此事件来自提交日志中的不同位置。

  • ts_ms 显示连接器处理此事件的时间戳(毫秒)。

最后,给定以下 delete DML

DELETE FROM customers
WHERE id = 1001 AND registration_date = 1562202942545;

JSON 表示的值负载如下

{
  "op": "d",
  "ts_ms": 1562202942912,
  "ts_us": 1562202942912047,
  "ts_ns": 1562202942912047921,
  "after": {
    "id": {
    "value": 1001,
    "deletion_ts": 1562202972545,
    "set": true
  },
    "registration_date": {
    "value": 1562202942545,
    "deletion_ts": 1562202972545,
    "set": true
  },
  "first_name": null,
  "last_name": null,
  "email": null
  },
  "source": {
    "version": "3.3.1.Final",
    "connector": "cassandra",
    "cluster": "cassandra-cluster-1",
    "snapshot": false,
    "keyspace": "inventory",
    "table": "customers",
    "file": "commitlog-6-123456.log",
    "pos": 102,
    "ts_ms": 1562202942666490,
    "ts_us": 1562202942666490000,
    "ts_ns": 1562202942666490000000
  }
}

与*insert* 和 *update* 事件中的值进行比较,我们可以看到几个不同之处

  • op 字段的值现在是 d,表示此行因删除而更改。

  • after 字段仅包含 idregistration_date 的值,因为这是按主键删除。

  • source 字段结构与以前的字段相同,但值不同,因为此事件来自提交日志中的不同位置。

  • ts_ms 显示连接器处理此事件的时间戳(毫秒)。

待办:鉴于 TTL 目前不受支持,是否最好移除 delete_ts?通过查看每个列是否为 null 来推断字段是否已设置是否也可以?

待办:讨论 Cassandra 连接器中的墓碑事件

数据类型

如上所述,Cassandra 连接器使用与行所在表结构相似的事件来表示行的更改。事件包含一个字段用于表示每个列的值,该值如何表示在事件中取决于列的 Cassandra 数据类型。本节描述了此映射。

下表描述了连接器如何将每种 Cassandra 数据类型映射到 Kafka Connect 数据类型。

Cassandra 数据类型

字面类型 (Schema 类型)

语义类型 (Schema 名称)

ascii

string

n/a

bigint

int64

n/a

blob

bytes

n/a

boolean

boolean

n/a

counter

int64

n/a

date

int32

io.debezium.time.Date

decimal

float64

n/a

double

float64

n/a

float

float32

n/a

frozen

bytes

n/a

inet

string

n/a

int

int32

n/a

列表

array

n/a

map

map

n/a

set

array

n/a

smallint

int16

n/a

text

string

n/a

time

int64

n/a

timestamp

int64

io.debezium.time.Timestamp

timeuuid

string

io.debezium.data.Uuid

tinyint

int8

n/a

tuple

map

n/a

uuid

string

io.debezium.data.Uuid

varchar

string

n/a

varint

int64

n/a

duration

int64

io.debezium.time.NanoDuration (纳秒级持续时间值的近似表示)

待办:添加逻辑类型

任意精度整数类型

Cassandra 连接器根据 varint.handling.mode 连接器配置属性 的设置来处理 varint 值。

varint.handling.mode=long
表 1. 当 varint.handling.mode=long 时的映射
Cassandra 类型 文字类型 语义类型

varint

INT64

n/a

varint.handling.mode=precise
表 2. 当 decimal.handling.mode=precise 时的映射
Cassandra 类型 文字类型 语义类型

varint

BYTES

org.apache.kafka.connect.data.Decimal
scale schema 参数设置为零。

varint.handling.mode=string
表 3. 当 varint.handling.mode=string 时的映射
Cassandra 类型 文字类型 语义类型

varint

STRING

n/a

Decimal 类型

Cassandra 连接器根据 decimal.handling.mode 连接器配置属性 的设置来处理 decimal 值。

decimal.handling.mode=double
表 4. 当 decimal.handling.mode=double 时的映射
Cassandra 类型 文字类型 语义类型

decimal

FLOAT64

n/a

decimal.handling.mode=precise
表 5. 当 decimal.handling.mode=precise 时的映射
Cassandra 类型 文字类型 语义类型

decimal

STRUCT

io.debezium.data.VariableScaleDecimal
包含一个具有两个字段的结构:类型为 INT32scale 字段,包含传输值的精度;类型为 BYTESvalue 字段,包含原始值(未缩放形式)。

decimal.handling.mode=string
表 6. 当 decimal.handling.mode=string 时的映射
Cassandra 类型 文字类型 语义类型

decimal

STRING

n/a

如果默认数据类型转换不满足您的需求,您可以创建自定义转换器以供连接器使用。

出现问题时

配置和启动错误

如果配置无效或连接器无法使用指定的连接参数成功连接到 Cassandra,Cassandra 连接器将在启动时失败,并在日志中报告错误或异常,然后停止运行。在这种情况下,错误将提供有关问题的更多详细信息,并可能建议一种解决方法。在更正配置后,可以重新启动连接器。

Cassandra 变得不可用

一旦连接器正在运行,如果 Cassandra 节点因任何原因变得不可用,连接器将失败并停止。在这种情况下,请在服务器可用后重新启动连接器。如果这发生在快照期间,它将从表的开头重新引导整个表。

Cassandra 连接器正常停止

如果 Cassandra 连接器被正常关闭,在停止进程之前,它将确保将 ChangeEventQueue 中的所有事件刷新到 Kafka。Cassandra 连接器跟踪每次流式记录发送到 Kafka 时的文件名和偏移量。因此,当连接器重新启动时,它将从上次停止的地方继续。它通过搜索目录中最旧的提交日志,开始处理该提交日志,跳过已读取的记录,直到找到尚未处理的最新记录来实现这一点。如果在快照期间停止了 Cassandra 连接器,它将从该表继续,但会重新引导整个表。

Cassandra 连接器崩溃

如果 Cassandra 连接器意外崩溃,那么 Cassandra 连接器可能会在未记录最近处理的偏移量的情况下终止。在这种情况下,当连接器重新启动时,它将从最近记录的偏移量继续。这意味着可能会有重复(鉴于我们已经从 RF 获得重复,这很容易)。请注意,由于偏移量仅在记录成功发送到 Kafka 时更新,因此在崩溃期间丢失 ChangeEventQueue 中未发出的数据是可以接受的,因为这些事件将被重新创建。

Kafka 变得不可用

当连接器生成更改事件时,它将使用 Kafka 生产者 API 将这些事件发布到 Kafka。如果 Kafka 代理变得不可用(生产者遇到 TimeoutException),Cassandra 连接器将每秒尝试重新连接到代理一次,直到成功为止。

Cassandra 连接器停止一段时间

根据表的写入负载,当 Cassandra 连接器长时间停止时,它可能会达到 cdc_total_space_in_mb 容量。一旦达到此上限,Cassandra 将停止接受对此表的写入;这意味着在运行 Cassandra 连接器时监控此空间非常重要。在最坏的情况下,如果发生这种情况,请完成以下步骤

  1. 关闭 Cassandra 连接器。

  2. 禁用表的 CDC,使其停止生成其他写入。由于提交日志未进行过滤,同一节点上其他已启用 CDC 的表的写入仍可能影响提交日志文件的生成。

  3. 从偏移量文件中删除记录的偏移量

  4. 在增加容量或控制目录已用空间后,重新启动连接器以重新引导该表。

Cassandra 表 CDC 已启用,然后暂时禁用,之后再次启用

如果 Cassandra 表暂时禁用 CDC,然后过一段时间后重新启用它,则必须重新引导。要重新引导单个表,可以手动删除 snapshot_offset.properties 文件中与该表对应的记录的偏移量行。

部署连接器

Cassandra 连接器应部署在 Cassandra 集群的每个 Cassandra 节点上。Cassandra 连接器 Jar 文件接受一个 CDC 配置(.properties)文件。有关参考,请参阅 示例配置

示例配置

以下是一个用于在本地运行和测试 Cassandra 连接器的 .properties 配置文件的示例

connector.name=test_connector
commit.log.relocation.dir=/Users/test_user/debezium-connector-cassandra/test_dir/relocation/
http.port=8000

cassandra.config=/usr/local/etc/cassandra/cassandra.yaml
cassandra.hosts=127.0.0.1
cassandra.port=9042

kafka.producer.bootstrap.servers=127.0.0.1:9092
kafka.producer.retries=3
kafka.producer.retry.backoff.ms=1000
topic.prefix=test_prefix

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url: https://:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url: https://:8081

offset.backing.store.dir=/Users/test_user/debezium-connector-cassandra/test_dir/

snapshot.consistency=ONE
snapshot.mode=ALWAYS

latest.commit.log.only=true

连接配置

Cassanadra 连接器使用 Cassandra 驱动程序来配置到 Cassanadra 的连接,这必须使用单独的 application.conf 文件提供。您可以在 这里 找到驱动程序配置的完整参考,下面是一个示例

datastax-java-driver {
  basic {
    request.timeout = 20 seconds
    contact-points = [ "spark-master-1:9042" ]
    load-balancing-policy {
      local-datacenter = "dc1"
    }
  }
  advanced {
    auth-provider {
      class = PlainTextAuthProvider
      username = user
      password = pass
    }
    ssl-engine-factory {
     ...
    }
  }
}

为了让 Debezium 连接器读取/使用此应用程序配置文件,必须在连接器属性文件中进行设置,如下所示

cassandra.driver.config.file=/path/to/application/configuration.conf

监控

Cassandra 连接器内置了对 JMX 指标的支持。Cassandra 驱动程序还发布了许多关于驱动程序活动的指标,这些指标可以通过 JMX 进行监控。该连接器有两种类型的指标。

  • 快照指标 提供有关连接器在执行快照期间的操作信息。

  • 流式处理指标 提供有关连接器在捕获更改并流式传输更改事件记录时操作的信息。

快照指标

MBeandebezium.cassandra:type=connector-metrics,context=snapshot,server=<topic.prefix>

除非快照操作正在活动中,或者自上次连接器启动以来发生过快照,否则快照指标不会暴露。

下表列出了可用的快照指标。

属性名称 Type 描述

TotalTableCount

int

包含在快照中的表总数。

RemainingTableCount

int

快照尚未复制的表数。

SnapshotRunning

boolean

快照是否已启动。

SnapshotAborted

boolean

快照是否被中止。

SnapshotCompleted

boolean

快照是否已完成。

SnapshotDurationInSeconds

long

到目前为止快照所花费的总秒数,即使未完成。

RowsScanned

Map<String, Long>

包含快照中每个表扫描的行数的映射。表在处理过程中被增量地添加到 Map 中。每扫描 10,000 行和完成一个表时更新。

流式处理指标

MBeandebezium.cassandra:type=connector-metrics,context=streaming,server=<topic.prefix>

下表列出了可用的流式传输指标。

属性名称 Type 描述

CommitLogFilename

string

连接器最近读取的提交日志文件名。

CommitLogPosition

long

连接器读取的提交日志中的最新位置(以字节为单位)。

NumberOfProcessedMutations

long

已处理的 mutations 数量。

NumberOfUnrecoverableErrors

long

处理提交日志时发生不可恢复错误的数量。

连接器属性

属性

Default (默认值)

描述

INITIAL

指定 Cassandra 连接器代理启动时运行快照(例如,初始同步)的标准。必须是 'INITIAL'、'ALWAYS' 或 'NEVER' 之一。默认快照模式为 'INITIAL'。

ALL

指定用于快照查询的 {@link ConsistencyLevel}。

8000

HTTP 服务器用于 ping、健康检查和构建信息的端口

无默认值

Cassandra 节点使用的 YAML 配置文件绝对路径。

application.conf

Cassandra 驱动程序配置文件路径

false

仅适用于 Cassandra 4,如果设置为 true,Cassandra 连接器代理将通过监视提交日志索引文件的更新来增量读取提交日志,并以 commit.log.marked.complete.poll.interval.ms 确定的频率实时流式传输数据。如果设置为 false,则 Cassandra 4 连接器会等待提交日志文件被标记为完成,然后才进行处理。

10000

仅适用于 Cassandra 4,并且当通过 commit.log.real.time.processing.enabled 启用实时流式处理时。此配置决定了提交日志索引文件轮询偏移量值更新的频率。

无默认值

提交日志处理后从 cdc_raw 目录移动到的本地目录。

true

确定 CommitLogPostProcessor 是否运行以将已处理的提交日志从重定位目录移出。如果禁用,提交日志将不会从重定位目录中移出。

10000

CommitLogPostProcessor 等待重新获取重定位目录中所有已处理提交日志的时间。

io.debezium.connector.cassandra.BlackHoleCommitLogTransfer

CommitLogPostProcessor 用于将已处理的提交日志从重定位目录移出的类。内置的传输类是 BlackHoleCommitLogTransfer,它只是从重定位目录中删除所有已处理的提交日志。用户应根据需要实现自己的自定义提交日志传输类。

false

确定 CommitLogProcessor 是否重新处理错误提交日志。

COMMITLOG_FILE

指定如何确保更改事件的顺序。每个选项都指示用于哈希的属性。共享相同哈希值的事件将保持其顺序。建议使用 'PARTITION_VALUES' 来使哈希策略与 Kafka 中的消息对齐。必须是 'COMMITLOG_FILE' 或 'PARTITION_VALUES' 之一。

无默认值

枚举连接器可以使用*自定义转换器*的符号名称的逗号分隔列表。例如:

isbn

您必须设置 converters 属性才能启用连接器使用自定义转换器。

对于为连接器配置的每个转换器,您还必须添加一个 .type 属性,该属性指定实现转换器接口的类的完全限定名称。.type 属性使用以下格式:

<converterSymbolicName>.type

For example, (例如,)

isbn.type: io.debezium.test.IsbnConverter

如果您想进一步控制已配置转换器的行为,您可以添加一个或多个配置参数来传递值给转换器。要将任何其他配置参数与转换器关联,请在参数名称前加上转换器的符号名称。例如:

isbn.schema.name: io.debezium.cassandra.type.Isbn

无默认值

存储偏移量跟踪文件的目录。

0

提交偏移量之前等待的最短时间。默认值 0 表示每次都刷新偏移量。

100

在需要将偏移量刷新到磁盘之前允许处理的最大记录数。此配置仅在 offset_flush_interval_ms != 0 时有效。

8192

正整数值,指定从提交日志读取的更改事件在写入 Kafka 之前放置的阻塞队列的最大大小。此队列可以为提交日志读取器提供回压,例如,当写入 Kafka 变慢或 Kafka 不可用时。出现在队列中的事件不包括在此连接器定期记录的偏移量中。默认为 8192,应始终大于 max.batch.size 属性中指定的 maximum batch size。在这些事件被转换为 Kafka Connect struct 并发送到 Kafka 之前,队列容纳反序列化记录的能力。

2048

每次要出队的更改事件的最大数量。

0

一个长整数值,指定阻塞队列的最大字节容量。默认情况下,阻塞队列没有容量限制。要指定队列可以消耗的字节数,请将此属性设置为一个正长整型值。
如果同时设置了 max.queue.size,当队列大小达到任一属性指定的限制时,写入队列将被阻塞。例如,如果设置了 max.queue.size=1000max.queue.size.in.bytes=5000,则在队列包含 1000 条记录后,或在队列中的记录量达到 5000 字节后,写入队列将被阻塞。

1000

正整数值,指定提交日志处理器在每次迭代中等待新更改事件出现在队列中的毫秒数。默认为 1000 毫秒,即 1 秒。

10000

正整数值,指定模式处理器在刷新缓存的 Cassandra 表模式之前等待的毫秒数。

10000

每次轮询最多等待的时间,然后重试。

10000

正整数值,指定快照处理器在重新扫描表以查找新的 CDC 启用表之前等待的毫秒数。默认为 10000 毫秒,即 10 秒。

false

删除事件是否应具有后续的墓碑事件(true)或不(false)。重要的是要注意,在 Cassandra 中,两个具有相同键的事件可能正在更新给定表的不同列。因此,如果在压缩过程中这些记录尚未被消费者消费,可能会导致记录丢失。换句话说,如果您启用了 Kafka 压缩,请勿将此设置为 true。

无默认值

一个逗号分隔的字段完全限定名称列表,这些字段应从更改事件消息值中排除。字段的完全限定名称格式为 keyspace_name>.<field_name>.<nested_field_name>。

1

更改事件队列和队列处理器的数量。默认为 1。

t

在流式处理期间将被跳过的操作类型的逗号分隔列表。操作包括:c 表示插入/创建,u 表示更新,d 表示删除,t 表示截断,none 表示不跳过任何操作。默认情况下,截断操作被跳过(不从此连接器发出)。

io.debezium.schema.SchemaTopicNamingStrategy

用于确定数据更改、schema 更改、事务、心跳事件等的*主题名称*的 TopicNamingStrategy 类的名称,默认为 SchemaTopicNamingStrategy

.

指定主题名称的分隔符,默认为 .

无默认值

将用于所有主题的前缀名称。

请勿更改此属性的值。如果您更改名称值,重新启动后,连接器将不会继续将事件发送到原始主题,而是会将后续事件发送到名称基于新值的那些主题。连接器也无法恢复其数据库模式历史主题。

10000

用于保存主题名称的有界并发哈希映射的大小。此缓存有助于确定与给定数据集合对应的主题名称。

__debezium-heartbeat

控制连接器发送心跳消息的主题名称。主题名称具有以下模式

topic.heartbeat.prefix.topic.prefix

例如,如果数据库服务器名称或主题前缀是 fulfillment,则默认主题名称是 __debezium-heartbeat.fulfillment

long

指定 varint 列如何在更改事件中表示。可能设置为

long(默认)使用 Java 的 long 表示值,这可能不提供精度,但消费者易于使用。

precise 使用 java.math.BigDecimal 表示值,这些值使用二进制表示和 Kafka Connect 的 org.apache.kafka.connect.data.Decimal 类型在更改事件中进行编码。

string 将值编码为格式化字符串,易于消费。

double

指定 decimal 列如何在更改事件中表示。可能设置为

double(默认)使用 Java 的 double 表示值,这可能不提供精度,但消费者易于使用。

precise 使用 java.math.BigDecimal 表示值,这些值使用二进制表示和 Kafka Connect 的 org.apache.kafka.connect.data.VariableScaleDecimal 类型在更改事件中进行编码。

string 将值编码为格式化字符串,易于消费。

none

指定如何调整 schema 名称以兼容连接器使用的消息转换器。可能设置:

  • none 不进行任何调整。

  • avro 用下划线替换 Avro 类型名称中不能使用的字符。

  • avro_unicode 用对应的 Unicode(如 _uxxxx)替换 Avro 类型名称中不能使用的下划线或字符。注意:_ 像 Java 中的反斜杠一样是一个转义序列。

none

指定如何调整字段名称以兼容连接器使用的消息转换器。可能设置:

  • none 不进行任何调整。

  • avro 用下划线替换 Avro 类型名称中不能使用的字符。

  • avro_unicode 用对应的 Unicode(如 _uxxxx)替换 Avro 类型名称中不能使用的下划线或字符。注意:_ 像 Java 中的反斜杠一样是一个转义序列。

有关更多详细信息,请参阅 Avro 命名

无默认值

自定义指标标签将接受键值对来自定义 MBean 对象名称,该名称应附加到常规名称的末尾。每个键将代表 MBean 对象名称的标签,其对应的值将是该标签的值。例如:k1=v1,k2=v2

-1

指定在操作因可重试错误(如连接错误)失败后连接器的响应方式。
设置以下选项之一:

-1

无限制。连接器将自动重新启动,并重试操作,无论先前的失败次数如何。

0

禁用。连接器将立即失败,并且永远不会重试操作。需要用户干预才能重新启动连接器。

> 0

连接器将自动重新启动,直到达到指定的最大重试次数。下一次失败后,连接器将停止,需要用户干预才能重新启动它。

true

此属性指定 Debezium 是否将带有 __debezium.context. 前缀的上下文头添加到其发出的消息中。OpenLineage 集成需要这些头,并提供元数据,使下游处理系统能够跟踪和识别更改事件的来源。

该属性添加了以下头:

__debezium.context.connectorLogicalName

Debezium 连接器的逻辑名称。

__debezium.context.taskId

连接器任务的唯一标识符。

__debezium.context.connectorName

Debezium 连接器的名称。

如果 Cassandra 代理使用 SSL 连接到 Cassandra 节点,则需要 SSL 配置文件。以下示例显示了如何编写 SSL 配置文件

keyStore.location=/var/private/ssl/cassandra.keystore.jks
keyStore.password=cassandra
keyStore.type=JKS
trustStore.location=/var/private/ssl/cassandra.truststore.jks
trustStore.password=cassandra
trustStore.type=JKS
keyManager.algorithm=SunX509
trustManager.algorithm=SunX509
cipherSuites=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384

cipherSuites 字段不是强制性的,它只是允许您添加一个(或多个)不存在的密码。trustStore.type 和 keyStore.type 的默认值是 JKS。keyManager.algorithm 和 trustManager.algorithm 的默认值是 SunX509。

该连接器还支持在创建 Kafka 生产者时使用的传递配置属性。具体来说,所有以 kafka.producer. 前缀开头的连接器配置属性(不带前缀)在创建将事件写入 Kafka 的 Kafka 生产者时都会被使用。

例如,以下连接器配置属性可用于 保护到 Kafka 代理的连接

kafka.producer.security.protocol=SSL
kafka.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
kafka.producer.ssl.keystore.password=test1234
kafka.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
kafka.producer.ssl.truststore.password=test1234
kafka.producer.ssl.key.password=test1234
kafka.consumer.security.protocol=SSL
kafka.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
kafka.consumer.ssl.keystore.password=test1234
kafka.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
kafka.consumer.ssl.truststore.password=test1234
kafka.consumer.ssl.key.password=test1234

请务必查阅 Kafka 文档 以获取 Kafka 生产者的所有配置属性。

该连接器支持以下 Kafka Connect 转换器用于键/值序列化

io.confluent.connect.avro.AvroConverter
org.apache.kafka.connect.storage.StringConverter
org.apache.kafka.connect.json.JsonConverter
com.blueapron.connect.protobuf.ProtobufConverter