您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

Debezium Cassandra 连接器

Cassanadra 连接器可以监视 Cassandra 集群并记录所有行级别的更改。连接器必须本地部署在 Cassandra 集群的每个节点上。连接器首次连接到 Cassandra 节点时,它会为所有键空间中的所有启用了 CDC 的表执行一次快照。连接器还会读取写入 Cassandra 提交日志的更改,并生成相应的插入、更新和删除事件。每个表的事件都记录在单独的 Kafka 主题中,应用程序和服务可以轻松地对其进行消费。

有关此连接器兼容的 Cassandra 版本的信息,请参阅 Debezium 版本概述

概述

Cassandra 是一个开源的 NoSQL 数据库。与大多数数据库类似,Cassandra 的写入路径始于将更改立即记录到其提交日志中。提交日志位于每个节点的本地,记录对该节点进行的每一次写入。

自 Cassandra 3.0 起,引入了 更改数据捕获 (CDC) 功能。通过将表属性 cdc=true 设置为 true,可以在表级别启用 CDC 功能。之后,包含 CDC 启用表数据的任何提交日志在被丢弃时,都会被移动到 cassandra.yaml 中指定的 CDC 目录。

Cassandra 连接器驻留在每个 Cassandra 节点上,并监视 cdc_raw 目录以捕获更改。它处理检测到的所有本地提交日志段,为提交日志中的每个行级别插入、更新和删除操作生成更改事件,将每个表的更改事件发布到单独的 Kafka 主题,最后从 cdc_raw 目录中删除提交日志。最后一步很重要,因为一旦启用了 CDC,Cassandra 本身就无法清除提交日志。如果 cdc_free_space_in_mb 满了,对启用了 CDC 的表的写入将被拒绝。

连接器具有故障容忍性。连接器读取提交日志并生成事件时,会记录每个提交日志段的文件名和位置以及每个事件。如果连接器因任何原因停止(包括通信故障、网络问题或崩溃),在重新启动时,它将从上次中断的地方继续读取提交日志。这包括快照:如果连接器停止时快照未完成,在重新启动时它将开始新的快照。稍后我们将讨论连接器 出现问题时 的行为。

Cassandra 与其他 Debezium 连接器不同,因为它不是构建在 Kafka Connect 框架之上的。相反,它是一个单一的 JVM 进程,旨在驻留在每个 Cassandra 节点上,并通过 Kafka 生产者将事件发布到 Kafka。

以下功能目前不受 Cassandra 连接器支持。任何这些功能导致的更改都将被忽略

  • 集合类型列上的 TTL

  • 范围删除

  • 静态列

  • 触发器

  • 物化视图

  • 二级索引

  • 轻量级事务

设置 Cassandra

在使用 Debezium Cassandra 连接器监视 Cassandra 集群中的更改之前,必须在节点级别和表级别启用 CDC。

在节点上启用 CDC

要启用 CDC,请在 cassandra.yaml 中更新以下 CDC 配置

cdc_enabled: true

其他 CDC 配置具有以下默认值

cdc_raw_directory: $CASSANDRA_HOME/data/cdc_raw
cdc_free_space_in_mb: 4096
cdc_free_space_check_interval_ms: 250
  • cdc_enabled 启用或禁用整个节点上的 CDC 操作

  • cdc_raw_directory 确定提交日志段在刷新所有相应的 memtables 后要移动到的目标位置。

  • cdc_free_space_in_mb 是用于存储提交日志段的最大容量,默认为 4096 MB 和卷空间 1/8 中的最小值。

  • cdc_free_space_check_interval_ms 是重新计算 cdc_raw_directory 所占空间频率,以避免在容量不足时浪费 CPU 周期。

在表上启用 CDC

在 Cassandra 节点上启用 CDC 后,还必须通过 CREATE TABLE 或 ALTER TABLE 命令为每个表显式启用 CDC。例如

CREATE TABLE foo (a int, b text, PRIMARY KEY(a)) WITH cdc=true;

ALTER TABLE foo WITH cdc=true;

Cassandra 连接器的工作原理

本节详细介绍了 Cassandra 连接器如何执行快照、将提交日志事件转换为 Debezium 更改事件、处理提交日志生命周期、将事件记录到 Kafka、管理模式演进以及在出现问题时的行为。

快照

当 Cassandra 连接器首次在 Cassandra 节点上启动时,默认情况下它将执行一次集群的初始快照。这是默认模式,因为大多数时候 CDC 是在非空表上启用的,并且提交日志不包含完整的历史记录。

快照读取器发出 SELECT 语句来查询表中的所有列。Cassandra 允许全局或语句级别的 Consistency Level 设置。对于快照,Consistency Level 默认在语句级别设置为 ALL 以提供最高的 Consistency。这意味着如果一个节点在快照期间宕机,快照将无法继续,并且在节点恢复联机后需要进行后续的重新快照。您可以将快照的 Consistency Level 调整为较低的 Consistency Level 以提高可用性,前提是您了解 Consistency 的权衡。

在 Cassandra 3.X 中,无法仅从本地 Cassandra 节点读取。从 Cassandra 4.0 开始,将添加 NODE_LOCAL Consistency Level。这将允许 Cassandra 连接器仅从其所在的节点读取(这将与提交日志的处理方式一致)。

与关系型数据库不同,快照期间没有应用读锁定,因此在快照期间不会阻止对 Cassandra 的写入。如果查询的数据在快照期间被另一个客户端修改,这些更改可能会反映在快照结果集中。

如果连接器在快照完成之前失败或停止,连接器将在重新启动时开始新的快照。在默认快照模式(initial)下,连接器完成初始快照后,将不再执行任何其他快照。唯一的例外是连接器重启期间:如果启用了表的 CDC,然后连接器重启,则该表将被快照。

第二个快照模式(always)允许连接器在需要时执行快照。它定期检查新启用的 CDC 的表,并在检测到后立即对其进行快照。

第三个快照模式('never')确保连接器从不执行快照。当以这种方式配置新连接器时,它将仅读取 CDC 目录中的提交日志。这不是默认行为,因为在这种模式下(没有快照)启动新连接器需要提交日志包含所有启用 CDC 的表的完整历史记录,这通常不是这种情况。此模式的另一个用例是,如果已经有一个连接器在进行快照,您可以禁用其他连接器的快照以避免重复工作。

读取提交日志

Cassandra 连接器通常会花费大部分时间读取 Cassandra 节点上的本地提交日志。在 Cassandra 4.0 中,每次 fsync 后,都会更新索引文件以反映最新的偏移量。这消除了 Cassandra 3.X 中 CDC 功能的处理延迟,并且可以通过将配置设置为 commit.log.real.time.processing.enabledtrue 来在 Cassandra 4 Debezium 连接器中启用。索引文件的轮询频率由 commit.log.marked.complete.poll.interval.ms 确定。

使用 Cassandra 的 CommitLogReader 和 CommitLogReadHandler 反序列化提交日志的二进制数据。每个反序列化的对象称为 Cassandra 中的一个 mutation。一个 mutation 包含一个或多个更改事件。

当 Cassandra 连接器读取提交日志时,它会将日志事件转换为 Debezium 的创建更新删除事件,其中包括事件在提交日志中的位置。Cassandra 连接器使用 Kafka Connect 转换器对这些更改事件进行编码,并将它们发布到相应的 Kafka 主题。

提交日志的限制

Cassandra 的提交日志存在一系列限制,这些限制对于正确解释 CDC 事件至关重要

  • 提交日志仅在 cdc_raw 目录已满时才会被写入,此时它将被刷新/丢弃。这意味着事件记录和事件捕获之间存在延迟。

  • 单个 Cassandra 节点上的提交日志并不反映集群的所有写入,它们只反映存储在该节点上的写入。这就是为什么必须监视 Cassandra 集群中所有节点的更改。然而,由于副本因子,这也意味着下游的事件消费者需要处理去重。

  • 单个 Cassandra 节点上的写入在到达时被记录。然而,这些事件可能与它们发出的顺序相反。下游的事件消费者必须理解并实现类似于 Cassandra 读取路径的逻辑以获得正确的输出。

  • 表的模式更改未记录在提交日志中,仅记录数据更改。因此,模式更改是基于最佳努力检测的。为避免数据丢失,建议在模式更改期间暂停对表的写入。

  • Cassandra 不执行写前读,因此提交日志不记录更改行中每个列的值,它仅记录已修改的列的值(分区键列除外,因为它们在 Cassandra DML 命令中总是必需的)。

  • 由于 CQL 的性质,插入 DML 可能导致行插入或更新;更新 DML 可能导致行插入、更新或删除;删除 DML 可能导致行更新或删除。由于查询未记录在提交日志中,CDC 事件类型是根据其对关系数据库中的行的影响来分类的。

待办事项:是否有方法确定与实际 Cassandra DML 语句对应的事件类型?如果有,这是否比这些事件的语义更优?

管理提交日志的生命周期

默认情况下,Cassandra 连接器将删除已处理的提交日志。不建议在禁用提交日志删除的情况下启动连接器,因为这可能会导致磁盘存储膨胀并阻止对 Cassandra 集群的进一步写入。要以自定义方式管理提交日志(例如,将其上传到云提供商),可以实现 CommitLogTransfer 接口。

主题名称

Cassandra 连接器将对单个表上的所有插入、更新和删除操作的事件写入单个 Kafka 主题。Kafka 主题的名称始终采用以下形式

clusterName.keyspaceName.tableName

其中 clusterName 是连接器的逻辑名称,如 topic.prefix 配置属性所指定的,keyspaceName 是操作发生键空间的名称,tableName 是操作发生表的名称。

例如,考虑一个具有 inventory 键空间,其中包含四个表的 Cassandra 安装:productsproducts_on_handcustomersorders。如果监视此数据库的连接器被赋予逻辑服务器名称 fulfillment,则连接器将在以下四个 Kafka 主题上生成事件

  • fulfillment.inventory.products

  • fulfillment.inventory.products_on_hand

  • fulfillment.inventory.customers

  • fulfillment.inventory.orders

待办事项:对于主题名称,clusterName.keyspaceName.tableName 可以吗?还是应该是 connectorName.keyspaceName.tableNameconnectorName.clusterName.keyspaceName.tableName

模式演进

DDL 未在提交日志中记录。当表的模式发生更改时,此更改将从 Cassandra 节点之一发出,并通过 Gossip 协议传播到其他节点。

Cassandra 中的模式更改将被实现的 SchemaChangeListener 检测到,延迟小于 1 秒,然后它将更新从 Cassandra 加载的模式实例以及为每个表缓存的 Kafka 键值模式。

请注意,使用当前的模式演进方法,在以下情况下,Cassandra 连接器无法在很短的时间内提供准确的数据更改信息

  1. 如果表的 CDC 被禁用,CDC 被禁用之前发生的数据更改将被跳过。

  2. 如果从表中删除了列,在删除之前涉及此列的数据更改将无法正确反序列化,将被跳过。

事件

Cassandra 连接器生成的所有数据更改事件都包含一个键和一个值,尽管键和值的结构取决于更改事件产生的表(请参阅 主题名称)。

更改事件的键

对于给定的表,更改事件的键将具有一个包含表中主键中每个列的字段的结构,截至事件创建时。考虑一个 inventory 数据库,其中有一个 customers 表定义如下

CREATE TABLE customers (
  id bigint,
  registration_date timestamp,
  first_name text,
  last_name text,
  email text,
  PRIMARY KEY (id, registration_date)
);

在具有此定义的 customers 表的所有更改事件中,键的模式将是相同的,其 JSON 表示形式如下

{
  "type": "record",
  "name": "cassandra-cluster-1.inventory.customers.Key",
  "namespace": "io.debezium.connector.cassandra",
  "fields": [
    {
      "name": "id",
      "type": "long"
    },
    {
      "name": "registration_date",
      "type": "long",
      "logicalType": "timestamp-millis"
    }
  ]
}

对于 id = 1001 和 registration_date = 1562202942545,键的 JSON 表示形式的有效负载如下

{
  "id": 1001,
  "registration_date": 1562202942545
}

虽然 field.exclude.list 配置属性允许您从事件值中删除列,但主键中的所有列始终包含在事件的键中。

更改事件的值

更改事件消息的值要复杂一些。Cassandra 连接器生成的每个更改事件值都有一个包含以下字段的信封结构

op (操作)

一个强制字段,包含一个字符串值,描述操作的类型。Cassandra 连接器的值为 i 表示插入,u 表示更新,d 表示删除。

after

如果存在,则是一个可选字段,包含事件发生行的状态。其结构由 cassandra-cluster-1.inventory.customers.Value Kafka Connect 模式描述,该模式表示事件所指的集群、键空间和表。

source (源)

一个强制字段,包含一个描述事件源元数据的结构,在 Cassandra 的情况下,该结构包含以下字段

  • Debezium 版本。

  • 连接器名称。

  • Cassandra 集群名称。

  • 事件记录的提交日志文件名,事件在该提交日志文件中的位置,此事件是否为快照的一部分,受影响的键空间和表名,以及分区更新的最大时间戳(以微秒为单位)。

ts_ms

(可选) 如果存在,包含连接器处理事件的时间,基于运行 Cassandra 连接器的 JVM 的系统时钟。

由于 Cassandra 不执行写前读,Cassandra 提交日志不记录更改应用前行的值。因此,Cassandra 更改事件记录不包含 before 字段。

以下是我们 customers 表的创建事件的值模式的 JSON 表示

{
  "type": "record",
  "name": "cassandra-cluster-1.inventory.customers.Envelope",
  "namespace": "io.debezium.connector.cassandra",
  "fields": [
      {
        "name": "op",
        "type": "string"
      },
      {
        "name": "ts_ms",
        "type": "long",
        "logicalType": "timestamp-millis"
      },
      {
        "name": "after",
        "type": "record",
        "fields": [
          {
            "name": "id",
            "type": [
              "null",
              {
            "name": "id",
            "type": "record",
            "fields": [
              {
                "name":"value",
                "type": "string"
              },
              {
                "name":"deletion_ts",
                "type": ["null", "long"],
                "default" : "null"
              },
              {
                "name":"set",
                "type": "boolean"
              }
            ]
            }
          ]
          },
          {
            "name": "registration_date",
            "type": [
              "null",
              {
            "name": "registration_date",
            "type": "record",
            "fields": [
              {
                "name":"value",
                "type": "long",
                "logical_type": "timestamp-millis"
              },
              {
                "name":"deletion_ts",
                "type": ["null", "long"],
                "default" : "null"
              },
              {
                "name":"set",
                "type": "boolean"
              }
            ]
            }
          ]
          },
          {
            "name": "first_name",
            "type": [
              "null",
              {
            "name": "first_name",
            "type": "record",
            "fields": [
              {
                "name":"value",
                "type": "string"
              },
              {
                "name":"deletion_ts",
                "type": ["null", "long"],
                "default" : "null"
              },
              {
                "name":"set",
                "type": "boolean"
              }
            ]
            }
          ]
          },
          {
            "name": "last_name",
            "type": [
              "null",
              {
            "name": "last_name",
            "type": "record",
            "fields": [
              {
                "name":"value",
                "type": "string"
              },
              {
                "name":"deletion_ts",
                "type": ["null", "long"],
                "default" : "null"
              },
              {
                "name":"set",
                "type": "boolean"
              }
            ]
            }
          ]
          },
          {
            "name": "last_name",
            "type": [
              "null",
              {
            "name": "email",
            "type": "record",
            "fields": [
              {
                "name":"value",
                "type": "string"
              },
              {
                "name":"deletion_ts",
                "type": ["null", "long"],
                "default" : "null"
              },
              {
                "name":"set",
                "type": "boolean"
              }
            ]
            }
          ]
          }
        ]
      },
      {
        "name": "source",
        "type": "record",
        "fields": [
          {
            "name": "version",
            "type": "string"
          },
          {
            "name": "connector",
            "type": "string"
          },
          {
            "name": "cluster",
            "type": "string"
          },
          {
            "name": "snapshot",
            "type": "boolean"
          },
          {
            "name": "keyspace",
            "type": "string"
          },
          {
            "name": "table",
            "type": "string"
          },
          {
            "name": "file",
            "type": "string"
          },
          {
            "name": "position",
            "type": "int"
          },
          {
            "name": "ts_ms",
            "type": "long",
            "logicalType": "timestamp-micros"
          }
        ]
      }
  ]
}

待办事项:验证在删除 DDL 的情况下,最大时间戳是否不等于删除时间戳

给定以下 insert DML

INSERT INTO customers (
  id,
  registration_date,
  first_name,
  last_name,
  email)
VALUES (
  1001,
  now(),
  "Anne",
  "Kretchmar",
  "annek@noanswer.org"
);

JSON 表示形式的值有效负载如下

{
  "op": "c",
  "ts_ms": 1562202942832,
  "ts_us": 1562202942832014,
  "ts_ns": 1562202942832014962,
  "after": {
    "id": {
    "value": 1001,
    "deletion_ts": null,
    "set": true
  },
    "registration_date": {
    "value": 1562202942545,
    "deletion_ts": null,
    "set": true
  },
  "first_name": {
    "value": "Anne",
    "deletion_ts": null,
    "set": true
  },
  "last_name": {
    "value": "Kretchmar",
    "deletion_ts": null,
    "set": true
  },
  "email": {
    "value": "annek@noanswer.org",
    "deletion_ts": null,
    "set": true
  }
  },
  "source": {
    "version": "3.3.0.Final",
    "connector": "cassandra",
    "cluster": "cassandra-cluster-1",
    "snapshot": false,
    "keyspace": "inventory",
    "table": "customers",
    "file": "commitlog-6-123456.log",
    "pos": 54,
    "ts_ms": 1562202942666382,
    "ts_us": 1562202942666382000,
    "ts_ns": 1562202942666382000000
  }
}

给定以下 update DML

UPDATE customers
SET email = "annek_new@noanswer.org"
WHERE id = 1001 AND registration_date = 1562202942545

JSON 表示形式的值有效负载如下

{
  "op": "u",
  "ts_ms": 1562202942912,
  "ts_us": 1562202942912014,
  "ts_ns": 1562202942912014982,
  "after": {
    "id": {
    "value": 1001,
    "deletion_ts": null,
    "set": true
  },
    "registration_date": {
    "value": 1562202942545,
    "deletion_ts": null,
    "set": true
  },
  "first_name": null,
  "last_name": null,
  "email": {
    "value": "annek_new@noanswer.org",
    "deletion_ts": null,
    "set": true
  }
  },
  "source": {
    "version": "3.3.0.Final",
    "connector": "cassandra",
    "cluster": "cassandra-cluster-1",
    "snapshot": false,
    "keyspace": "inventory",
    "table": "customers",
    "file": "commitlog-6-123456.log",
    "pos": 102,
    "ts_ms": 1562202942666490,
    "ts_us": 1562202942666490000,
    "ts_ns": 1562202942666490000000
  }
}

插入事件中的值进行比较时,我们会看到几个不同之处

  • op 字段值现在是 u,表示该行由于更新而更改。

  • after 字段现在包含了行的更新状态,在这里我们可以看到 email 的值现在是 annek_new@noanswer.org。请注意,first_namelast_name 为 null,因为这些字段在此更新期间未更改。但是,idregistration_date 仍包含在内,因为它们是此表的主键。

  • source 字段的结构与之前相同,但值不同,因为此事件来自提交日志中的不同位置。

  • ts_ms 显示连接器处理此事件的毫秒时间戳。

最后,给定以下 delete DML

DELETE FROM customers
WHERE id = 1001 AND registration_date = 1562202942545;

JSON 表示形式的值有效负载如下

{
  "op": "d",
  "ts_ms": 1562202942912,
  "ts_us": 1562202942912047,
  "ts_ns": 1562202942912047921,
  "after": {
    "id": {
    "value": 1001,
    "deletion_ts": 1562202972545,
    "set": true
  },
    "registration_date": {
    "value": 1562202942545,
    "deletion_ts": 1562202972545,
    "set": true
  },
  "first_name": null,
  "last_name": null,
  "email": null
  },
  "source": {
    "version": "3.3.0.Final",
    "connector": "cassandra",
    "cluster": "cassandra-cluster-1",
    "snapshot": false,
    "keyspace": "inventory",
    "table": "customers",
    "file": "commitlog-6-123456.log",
    "pos": 102,
    "ts_ms": 1562202942666490,
    "ts_us": 1562202942666490000,
    "ts_ns": 1562202942666490000000
  }
}

插入更新事件中的值进行比较时,我们会看到几个不同之处

  • op 字段值现在是 d,表示该行由于删除而更改。

  • after 字段仅包含 idregistration_date 的值,因为这是通过主键进行的删除。

  • source 字段的结构与之前相同,但值不同,因为此事件来自提交日志中的不同位置。

  • ts_ms 显示连接器处理此事件的毫秒时间戳。

待办事项:鉴于 TTL 目前不受支持,删除删除时间戳是否更好?通过查看每个列是否为 null 来确定字段是否已设置是否也可以?

待办事项:讨论 Cassandra 连接器中的墓碑事件

数据类型

如上所述,Cassandra 连接器使用与行所在表结构相似的事件来表示行的更改。事件包含一个字段用于每个列值,该值在事件中的表示方式取决于列的 Cassandra 数据类型。本节将描述这种映射。

下表描述了连接器如何将每种 Cassandra 数据类型映射到 Kafka Connect 数据类型。

Cassandra 数据类型

字面类型 (Schema 类型)

语义类型 (Schema 名称)

ascii

string

n/a

bigint

int64

n/a

blob

bytes

n/a

boolean

boolean

n/a

counter

int64

n/a

date

int32

io.debezium.time.Date

decimal

float64

n/a

double

float64

n/a

float

float32

n/a

frozen

bytes

n/a

inet

string

n/a

int

int32

n/a

列表

array

n/a

map

map

n/a

set

array

n/a

smallint

int16

n/a

text

string

n/a

time

int64

n/a

timestamp

int64

io.debezium.time.Timestamp

timeuuid

string

io.debezium.data.Uuid

tinyint

int8

n/a

tuple

map

n/a

uuid

string

io.debezium.data.Uuid

varchar

string

n/a

varint

int64

n/a

duration

int64

io.debezium.time.NanoDuration (纳秒的持续时间值的近似表示)

待办事项:添加逻辑类型

任意精度整数类型

Cassandra 连接器根据 varint.handling.mode 连接器配置属性 的设置来处理 varint 值。

varint.handling.mode=long
表 1. varint.handling.mode=long 时的映射
Cassandra 类型 文字类型 语义类型

varint

INT64

n/a

varint.handling.mode=precise
表 2. decimal.handling.mode=precise 时的映射
Cassandra 类型 文字类型 语义类型

varint

BYTES

org.apache.kafka.connect.data.Decimal
scale schema 参数设置为零。

varint.handling.mode=string
表 3. varint.handling.mode=string 时的映射
Cassandra 类型 文字类型 语义类型

varint

STRING

n/a

Decimal 类型

Cassandra 连接器根据 decimal.handling.mode 连接器配置属性 的设置来处理 decimal 值。

decimal.handling.mode=double
表 4. decimal.handling.mode=double 时的映射
Cassandra 类型 文字类型 语义类型

decimal

FLOAT64

n/a

decimal.handling.mode=precise
表 5. decimal.handling.mode=precise 时的映射
Cassandra 类型 文字类型 语义类型

decimal

STRUCT

io.debezium.data.VariableScaleDecimal
包含一个具有两个字段的结构:类型为 INT32scale 字段,包含传输值的精度;类型为 BYTESvalue 字段,包含原始值(未缩放形式)。

decimal.handling.mode=string
表 6. decimal.handling.mode=string 时的映射
Cassandra 类型 文字类型 语义类型

decimal

STRING

n/a

如果默认数据类型转换不满足您的需求,您可以创建自定义转换器以供连接器使用。

出现问题时

配置和启动错误

如果配置无效或连接器无法使用指定的连接参数成功连接到 Cassandra,Cassandra 连接器将在启动时失败,在日志中报告错误或异常,并停止运行。在这种情况下,错误将包含有关问题的更多详细信息,并可能建议一种解决方法。在纠正配置后,可以重新启动连接器。

Cassandra 变得不可用

连接器运行后,如果 Cassandra 节点因任何原因变得不可用,连接器将失败并停止。在这种情况下,请在服务器可用后重新启动连接器。如果这发生在快照期间,它将从表的开头重新引导整个表。

Cassandra 连接器正常停止

如果 Cassandra 连接器被正常关闭,在停止进程之前,它将确保将 ChangeEventQueue 中的所有事件刷新到 Kafka。Cassandra 连接器跟踪每次将流式记录发送到 Kafka 时的文件名和偏移量。因此,当连接器重新启动时,它将从上次中断的地方继续。它通过查找目录中最旧的提交日志,开始处理该提交日志,跳过已读取的记录,直到找到尚未处理的最新记录来实现这一点。如果 Cassandra 连接器在快照期间停止,它将从该表继续,但会重新引导整个表。

Cassandra 连接器崩溃

如果 Cassandra 连接器意外崩溃,则 Cassandra 连接器可能在未记录最近处理的偏移量的情况下终止。在这种情况下,当连接器重新启动时,它将从最近记录的偏移量继续。这意味着可能会有重复(由于我们已经从 RF 获得重复,因此这很容易)。请注意,由于偏移量仅在记录成功发送到 Kafka 时才更新,因此在崩溃期间丢失 ChangeEventQueue 中未发出的数据是可以的,因为这些事件将被重新生成。

Kafka 变得不可用

当连接器生成更改事件时,它将使用 Kafka producer API 将这些事件发布到 Kafka。如果 Kafka 代理变得不可用(生产者遇到 TimeoutException),Cassandra 连接器将每秒尝试一次重新连接到代理,直到成功重试。

Cassandra 连接器停止一段时间

根据表的写入负载,当 Cassandra 连接器停止很长时间时,它可能会面临 cdc_total_space_in_mb 容量问题。一旦达到此上限,Cassandra 将停止接受对该表的写入;这意味着在运行 Cassandra 连接器时监视此空间非常重要。在最坏的情况下,如果发生这种情况,请完成以下步骤

  1. 关闭 Cassandra 连接器。

  2. 禁用表的 CDC,使其停止生成额外的写入。由于提交日志未经过滤,同一节点上其他启用 CDC 的表的写入仍可能影响提交日志文件生成。

  3. 从偏移量文件中删除已记录的偏移量

  4. 在增加容量或目录使用的空间得到控制后,重新启动连接器,使其重新引导表。

Cassandra 表 CDC 被启用,然后暂时禁用,之后再次启用

如果 Cassandra 表暂时禁用 CDC,然后在一段时间后重新启用它,则必须重新引导。要重新引导单个表,可以手动删除 snapshot_offset.properties 文件中与该表对应的已记录偏移量行。

部署连接器

Cassandra 连接器应部署在 Cassandra 集群的每个 Cassandra 节点上。Cassandra 连接器 Jar 文件需要一个 CDC 配置文件(.properties 文件)。有关参考,请参阅 配置示例

配置示例

以下是一个用于在本地运行和测试 Cassandra 连接器的 .properties 配置文件的示例

connector.name=test_connector
commit.log.relocation.dir=/Users/test_user/debezium-connector-cassandra/test_dir/relocation/
http.port=8000

cassandra.config=/usr/local/etc/cassandra/cassandra.yaml
cassandra.hosts=127.0.0.1
cassandra.port=9042

kafka.producer.bootstrap.servers=127.0.0.1:9092
kafka.producer.retries=3
kafka.producer.retry.backoff.ms=1000
topic.prefix=test_prefix

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url: https://:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url: https://:8081

offset.backing.store.dir=/Users/test_user/debezium-connector-cassandra/test_dir/

snapshot.consistency=ONE
snapshot.mode=ALWAYS

latest.commit.log.only=true

连接配置

Cassanadra 连接器使用 Cassandra 驱动程序配置与 Cassanadra 的连接,该连接必须使用单独的 application.conf 文件提供。您可以在此处找到驱动程序配置的完整参考 这里,下面是一个示例

datastax-java-driver {
  basic {
    request.timeout = 20 seconds
    contact-points = [ "spark-master-1:9042" ]
    load-balancing-policy {
      local-datacenter = "dc1"
    }
  }
  advanced {
    auth-provider {
      class = PlainTextAuthProvider
      username = user
      password = pass
    }
    ssl-engine-factory {
     ...
    }
  }
}

为了让 Debezium 连接器读取/使用此应用程序配置文件,它必须在连接器属性文件中按如下方式设置

cassandra.driver.config.file=/path/to/application/configuration.conf

监控

Cassandra 连接器内置了对 JMX 指标的支持。Cassandra 驱动程序还发布了许多有关驱动程序活动的指标,这些指标可以通过 JMX 进行监视。连接器有两种类型的指标。

  • 快照指标 提供有关连接器执行快照期间操作的信息。

  • 流式指标 提供有关连接器捕获更改并流式传输更改事件记录时操作的信息。

快照指标

MBeandebezium.cassandra:type=connector-metrics,context=snapshot,server=<topic.prefix>

除非快照操作正在活动中,或者自上次连接器启动以来发生过快照,否则快照指标不会暴露。

下表列出了可用的快照指标。

属性名称 Type 描述

TotalTableCount

int

包含在快照中的表总数。

RemainingTableCount

int

快照尚未复制的表数。

SnapshotRunning

boolean

快照是否已启动。

SnapshotAborted

boolean

快照是否被中止。

SnapshotCompleted

boolean

快照是否已完成。

SnapshotDurationInSeconds

long

快照至今所花费的总秒数,即使未完成。

RowsScanned

Map<String, Long>

包含快照中每个表扫描的行数的映射。表在处理过程中被增量地添加到 Map 中。每扫描 10,000 行和完成一个表时更新。

流式指标

MBeandebezium.cassandra:type=connector-metrics,context=streaming,server=<topic.prefix>

下表列出了可用的流式传输指标。

属性名称 Type 描述

CommitLogFilename

string

连接器最近读取的提交日志文件名。

CommitLogPosition

long

连接器已读取的提交日志中的最近位置(以字节为单位)。

NumberOfProcessedMutations

long

已处理的变异数量。

NumberOfUnrecoverableErrors

long

处理提交日志时不可恢复错误的数量。

连接器属性

属性

Default (默认值)

描述

INITIAL

指定 Cassandra 连接器代理启动时执行快照(例如,初始同步)的标准。必须是 'INITIAL'、'ALWAYS' 或 'NEVER' 之一。默认快照模式为 'INITIAL'。

ALL

指定用于快照查询的 {@link ConsistencyLevel}。

8000

HTTP 服务器用于 ping、健康检查和构建信息的端口

无默认值

Cassandra 节点使用的 YAML 配置文件绝对路径。

application.conf

Cassandra 驱动程序配置文件路径

false

仅适用于 Cassandra 4,如果设置为 true,Cassandra 连接器代理将通过监视提交日志索引文件的更新来增量读取提交日志,并以由 commit.log.marked.complete.poll.interval.ms 确定的频率实时流式传输数据。如果设置为 false,则 Cassandra 4 连接器等待提交日志文件被标记为完成后再进行处理。

10000

仅适用于 Cassandra 4,并且在 commit.log.real.time.processing.enabled 启用了实时流式传输时。此配置确定轮询提交日志索引文件以获取偏移量更新的频率。

无默认值

提交日志在处理后从 cdc_raw 目录迁移到的本地目录。

true

确定 CommitLogPostProcessor 是否应运行以从迁移目录中移动已处理的提交日志。如果禁用,提交日志将不会移出迁移目录。

10000

CommitLogPostProcessor 等待重新获取迁移目录中所有已处理提交日志的时间。

io.debezium.connector.cassandra.BlackHoleCommitLogTransfer

CommitLogPostProcessor 用于将已处理的提交日志从迁移目录中移出的类。内置的传输类是 BlackHoleCommitLogTransfer,它只是从迁移目录中删除所有已处理的提交日志。如果需要,用户应实现自己的自定义提交日志传输类。

false

确定 CommitLogProcessor 是否应重新处理错误提交日志。

COMMITLOG_FILE

指定如何确保更改事件的顺序。每个选项都表示用于哈希的属性。共享相同哈希值的事件将保持其顺序。建议使用 'PARTITION_VALUES' 将哈希策略与 Kafka 中的消息对齐。必须是 'COMMITLOG_FILE' 或 'PARTITION_VALUES' 之一。

无默认值

枚举连接器可以使用*自定义转换器*的符号名称的逗号分隔列表。例如:

isbn

您必须设置 converters 属性才能启用连接器使用自定义转换器。

对于为连接器配置的每个转换器,您还必须添加一个 .type 属性,该属性指定实现转换器接口的类的完全限定名称。.type 属性使用以下格式:

<converterSymbolicName>.type

For example, (例如,)

isbn.type: io.debezium.test.IsbnConverter

如果您想进一步控制已配置转换器的行为,您可以添加一个或多个配置参数来传递值给转换器。要将任何其他配置参数与转换器关联,请在参数名称前加上转换器的符号名称。例如:

isbn.schema.name: io.debezium.cassandra.type.Isbn

无默认值

用于存储偏移量跟踪文件的目录。

0

在提交偏移量之前等待的最短时间。默认值 0 表示偏移量将在每次触发时刷新。

100

在需要将偏移量刷新到磁盘之前允许处理的最大记录数。此配置仅在 offset_flush_interval_ms != 0 时有效。

8192

正整数值,指定从提交日志读取的更改事件在写入 Kafka 之前放入的阻塞队列的最大大小。此队列可以为提交日志读取器提供反压,例如,当写入 Kafka 速度较慢或 Kafka 不可用时。出现在队列中的事件不包含在此连接器定期记录的偏移量中。默认为 8192,并且应始终大于 max.batch.size 属性中指定的 max batch size。队列持有反序列化记录直到它们被转换为 Kafka Connect struct 并发射到 Kafka 的容量。

2048

每次要出队的更改事件的最大数量。

0

一个长整数值,指定阻塞队列的最大字节容量。默认情况下,阻塞队列没有容量限制。要指定队列可以消耗的字节数,请将此属性设置为一个正长整型值。
如果同时设置了 max.queue.size,当队列大小达到任一属性指定的限制时,写入队列将被阻塞。例如,如果您设置了 max.queue.size=1000max.queue.size.in.bytes=5000,那么当队列包含 1000 条记录后,或者当队列中的记录总量达到 5000 字节后,写入队列将被阻塞。

1000

正整数值,指定提交日志处理器在每次迭代中等待新更改事件出现在队列中的毫秒数。默认为 1000 毫秒,或 1 秒。

10000

正整数值,指定模式处理器在刷新缓存的 Cassandra 表模式之前等待的毫秒数。

10000

在每次轮询之前等待的最大时间,然后重试。

10000

正整数值,指定快照处理器在重新扫描表以查找新启用的 CDC 表之前等待的毫秒数。默认为 10000 毫秒,或 10 秒。

false

删除事件是否应该有一个后续的墓碑事件(true)或不(false)。需要注意的是,在 Cassandra 中,两个具有相同键的事件可能正在更新给定表的不同列。因此,如果这些记录尚未被消费者消费,可能会导致在压缩期间丢失记录。换句话说,如果您启用了 Kafka 压缩,请不要将此设置为 true。

无默认值

字段的完全限定名称的逗号分隔列表,这些字段应从更改事件消息值中排除。字段的完全限定名称为 keyspace_name>.<field_name>.<nested_field_name> 格式。

1

更改事件队列和队列处理器的数量。默认为 1。

t

在流式传输期间将被跳过的操作类型的逗号分隔列表。操作包括:c 表示插入/创建,u 表示更新,d 表示删除,t 表示截断,none 表示不跳过任何操作。默认情况下,截断操作被跳过(此连接器不发出)。

io.debezium.schema.SchemaTopicNamingStrategy

用于确定数据更改、schema 更改、事务、心跳事件等的*主题名称*的 TopicNamingStrategy 类的名称,默认为 SchemaTopicNamingStrategy

.

指定主题名称的分隔符,默认为 .

无默认值

将用于所有主题的名称前缀。

请勿更改此属性的值。如果您更改名称值,重新启动后,连接器将不会继续将事件发送到原始主题,而是会将后续事件发送到名称基于新值的那些主题。连接器也无法恢复其数据库模式历史主题。

10000

用于在有界并发哈希映射中保存主题名称的大小。此缓存有助于确定与给定数据集合对应的主题名称。

__debezium-heartbeat

控制连接器发送心跳消息的主题的名称。主题名称的模式如下

topic.heartbeat.prefix.topic.prefix

例如,如果数据库服务器名称或主题前缀是 fulfillment,则默认主题名称是 __debezium-heartbeat.fulfillment

long

指定 varint 列在更改事件中应如何表示。可能的设置是

long(默认)使用 Java 的 long 表示值,它可能不提供精度,但易于在消费者中使用。

precise 使用 java.math.BigDecimal 来表示值,这些值在更改事件中使用二进制表示和 Kafka Connect 的 org.apache.kafka.connect.data.Decimal 类型进行编码。

string 将值编码为格式化的字符串,易于消费。

double

指定 decimal 列在更改事件中应如何表示。可能的设置是

double(默认)使用 Java 的 double 表示值,它可能不提供精度,但易于在消费者中使用。

precise 使用 java.math.BigDecimal 来表示值,这些值在更改事件中使用二进制表示和 Kafka Connect 的 org.apache.kafka.connect.data.VariableScaleDecimal 类型进行编码。

string 将值编码为格式化的字符串,易于消费。

none

指定如何调整 schema 名称以兼容连接器使用的消息转换器。可能设置:

  • none 不进行任何调整。

  • avro 用下划线替换 Avro 类型名称中不能使用的字符。

  • avro_unicode 用对应的 Unicode(如 _uxxxx)替换 Avro 类型名称中不能使用的下划线或字符。注意:_ 像 Java 中的反斜杠一样是一个转义序列。

none

指定如何调整字段名称以兼容连接器使用的消息转换器。可能设置:

  • none 不进行任何调整。

  • avro 用下划线替换 Avro 类型名称中不能使用的字符。

  • avro_unicode 用对应的 Unicode(如 _uxxxx)替换 Avro 类型名称中不能使用的下划线或字符。注意:_ 像 Java 中的反斜杠一样是一个转义序列。

有关更多详细信息,请参阅 Avro 命名

无默认值

自定义指标标签将接受键值对来自定义 MBean 对象名称,该名称应附加到常规名称的末尾。每个键将代表 MBean 对象名称的标签,其对应的值将是该标签的值。例如:k1=v1,k2=v2

-1

指定连接器在操作因可重试错误(如连接错误)而失败后如何响应。
设置以下选项之一:

-1

无限制。连接器将自动重新启动,并重试操作,无论先前的失败次数如何。

0

禁用。连接器将立即失败,并且永远不会重试操作。需要用户干预才能重新启动连接器。

> 0

连接器将自动重新启动,直到达到指定的最大重试次数。下一次失败后,连接器将停止,需要用户干预才能重新启动它。

true

此属性指定 Debezium 是否将具有 __debezium.context. 前缀的上下文标头添加到其发出的消息中。OpenLineage 集成需要这些标头,并提供使下游处理系统能够跟踪和识别更改事件源的元数据。

该属性添加了以下头:

__debezium.context.connectorLogicalName

Debezium 连接器的逻辑名称。

__debezium.context.taskId

连接器任务的唯一标识符。

__debezium.context.connectorName

Debezium 连接器的名称。

如果 Cassandra 代理使用 SSL 连接到 Cassandra 节点,则需要 SSL 配置文件。以下示例显示了如何编写 SSL 配置文件

keyStore.location=/var/private/ssl/cassandra.keystore.jks
keyStore.password=cassandra
keyStore.type=JKS
trustStore.location=/var/private/ssl/cassandra.truststore.jks
trustStore.password=cassandra
trustStore.type=JKS
keyManager.algorithm=SunX509
trustManager.algorithm=SunX509
cipherSuites=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384

cipherSuites 字段不是强制性的,它只是允许您添加一个(或多个)不存在的密码。trustStore.type 和 keyStore.type 的默认值为 JKS。keyManager.algorithm 和 trustManager.algorithm 的默认值为 SunX509。

连接器还支持在创建 Kafka 生产者时使用的直通配置属性。具体来说,所有以 kafka.producer. 前缀开头的连接器配置属性(不带前缀)在创建将事件写入 Kafka 的 Kafka 生产者时使用。

例如,以下连接器配置属性可用于 保护到 Kafka 代理的连接

kafka.producer.security.protocol=SSL
kafka.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
kafka.producer.ssl.keystore.password=test1234
kafka.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
kafka.producer.ssl.truststore.password=test1234
kafka.producer.ssl.key.password=test1234
kafka.consumer.security.protocol=SSL
kafka.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
kafka.consumer.ssl.keystore.password=test1234
kafka.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
kafka.consumer.ssl.truststore.password=test1234
kafka.consumer.ssl.key.password=test1234

请务必查阅 Kafka 文档 以获取 Kafka 生产者的所有配置属性。

连接器支持以下 Kafka Connect 转换器进行键/值序列化

io.confluent.connect.avro.AvroConverter
org.apache.kafka.connect.storage.StringConverter
org.apache.kafka.connect.json.JsonConverter
com.blueapron.connect.protobuf.ProtobufConverter