Debezium 操作员

概述

Debezium Operator 是一个 Kubernetes Operator,它在 Kubernetes 和 OpenShift 平台提供 Debezium Server 实例的自动化部署和管理。该 Operator 实现了标准的 Kubernetes Operator 模式,将用户定义的 DebeziumServer 自定义资源转化为正在运行的变更数据捕获 (CDC) 管道,将数据库更改流式传输到各种 Sink 系统。

使用 Helm 部署 Debezium Operator

Helm 是安装 Operator 的首选方法。Operator 可在 Debezium Helm Chart Repository 中找到。您可以将 Operator 安装到现有命名空间,也可以安装到新命名空间。

  1. 输入以下命令创建专用命名空间

    kubectl create namespace debezium
  2. 添加 Debezium charts 仓库并通过输入以下命令安装 Operator

    helm repo add debezium https://charts.debezium.io
    helm install my-debezium-operator debezium/debezium-operator --version 3.2.0-final -n debezium

    由于版本问题,目前您必须指定一个版本号,即使您只想安装最新的稳定版本。

    您可以在 Debezium Helm Chart Repository 中找到 Operator 的可用版本。

跟踪 Operator 部署的进度

您可以运行命令来监控部署进度并查看 Operator 日志。

过程
  • 要监控部署进度,请运行以下命令

    kubectl get pod -n debezium --watch
  • 要监控 Operator 日志,请输入以下命令

    kubectl logs deployment/debezium-operator -n debezium -f

Operator 运行后,它会监听新的自定义资源,并基于这些自定义资源创建 Debezium Server 实例。

DebeziumServer 自定义资源

您可以通过在 DebeziumServer 自定义资源中声明属性来指定 Debezium Server 实例的配置。

示例 1. Debezium Server 自定义资源的基本结构
apiVersion: debezium.io/v1alpha1
kind: DebeziumServer
metadata:
  name: my-debezium-server
spec:
  image: String
  version: String
  runtime:
    environment:
      vars:
        - name: String
          value: String
      from:  # EnvFromSource array
        - sourceName: String
          # add other EnvFromSource fields as needed
    storage:
      data:
        type: persistent | ephemeral
        claimName: String  # required if type is "persistent"
      external:  # Volume array
        - name: String
          # add other Volume fields as needed
    jmx:
      enabled: boolean
      port: int  # defaults to 1099
    templates:
      container:
        resources: ResourceRequirements
        securityContext: SecurityContext
      pod:
        metadata:
          annotations:  # Map<String, String>
            key: value
          labels:  # Map<String, String>
            key: value
        imagePullSecrets:  # List
          - name: String
        affinity: Affinity
        securityContext: PodSecurityContext
  quarkus:
    config:
      # quarkus properties
      format:
        value:
          type: String
          config:
            # format properties
      key:
        type: String
        config:
          # format properties
      header:
        type: String
        config:
          # format properties
  transforms:
    - type: String
      predicate: String
      negate: Boolean
      config:
        # transformation properties
  predicates:
    name:
      type: String
      config:
        # predicate properties
  sink:
    type: String
    config:
      # sink properties
  source:
    class: String
    config:
      # source connector properties

DebeziumServer 规范

DebeziumServer 架构参考

表 1. DebeziumServer 属性
属性 Type Default (默认值) 描述

DebeziumServerSpec

无默认值

Debezium Server 的规范

DebeziumServerStatus

无默认值

Debezium Server 实例的状态。

DebeziumServerStatus 架构参考

使用在:DebeziumServer

表 2. DebeziumServerStatus 属性
属性 Type Default (默认值) 描述

List<Condition>

无默认值

状态条件的列表

Long

0

最后观测到的代数

Condition 架构参考

使用在:DebeziumServerStatus

表 3. Condition 属性
属性 Type Default (默认值) 描述

String

无默认值

条件的状态,可以是 True、False 或 Unknown。

String

无默认值

人类可读的消息,指示有关条件最后一次转换的详细信息。

String

无默认值

条件的唯一标识符。

DebeziumServerSpec 架构参考

使用在:DebeziumServer

表 4. DebeziumServerSpec 属性
属性 Type Default (默认值) 描述

String

无默认值

用于 Debezium Server 容器的镜像。此属性优先于 version。

String

与 operator 相同

要使用的 Debezium Server 版本。

Sink

无默认值

Sink 配置。

无默认值

Debezium 源连接器配置。

Format

无默认值

消息输出格式配置。

Quarkus

无默认值

传递给 Debezium Server 进程的 Quarkus 配置。

Runtime

无默认值

指定您是否可以修改 Debezium Server 运行时的各种方面。

List<Transformation>

无默认值

此 Debezium Server 实例使用的单消息转换。

Map<String, Predicate>

无默认值

此 Debezium Server 实例使用的谓词。

Sink 架构参考

使用在:DebeziumServerSpec

表 5. Sink 属性
属性 Type Default (默认值) 描述

String

无默认值

此 Debezium Server 实例识别的 Sink 类型。

Map

无默认值

Sink 配置属性。

Source 架构参考

使用在:DebeziumServerSpec

表 6. Source 属性
属性 Type Default (默认值) 描述

String

无默认值

源连接器 Java 类的完全限定名。

Offset

无默认值

Offset 存储配置

SchemaHistory

无默认值

Schema history 存储配置

Map

无默认值

Source 连接器配置属性。

Format 架构参考

使用在:DebeziumServerSpec

表 7. Format 属性
属性 Type Default (默认值) 描述

key

FormatType

无默认值

消息键格式配置。

FormatType

无默认值

消息值格式配置。

FormatType

无默认值

消息头格式配置。

FormatType 架构参考

使用在:Format

表 8. FormatType 属性
属性 Type Default (默认值) 描述

String

json

Debezium Server 识别的格式类型。

Map

无默认值

格式配置属性。

Quarkus 架构参考

使用在:DebeziumServerSpec

表 9. Quarkus 属性
属性 Type Default (默认值) 描述

Map

无默认值

Quarkus 配置属性。

Runtime 架构参考

使用在:DebeziumServerSpec

表 10. Runtime 属性
属性 Type Default (默认值) 描述

api

RuntimeApi

无默认值

API 配置

RuntimeStorage

无默认值

存储配置

RuntimeEnvironment

无默认值

此 Debezium Server 实例使用的附加环境变量。

jmx

JmxConfig

无默认值

JMX 配置。

Templates

无默认值

Debezium Server 资源模板。

String

无默认值

用于运行 Debezium Server pod 的现有服务帐户

Metrics

无默认值

Metrics 配置

RuntimeApi 架构参考

使用在:Runtime

表 11. RuntimeApi 属性
属性 Type Default (默认值) 描述

boolean

无默认值

是否应为此 Debezium Server 实例启用 API

int

8080

用于公开 API 的 k8s 服务的端口号

RuntimeStorage 架构参考

使用在:Runtime

表 12. RuntimeStorage 属性
属性 Type Default (默认值) 描述

DataStorage

无默认值

此 Debezium Server 实例使用的文件存储配置。

List<Volume>

无默认值

挂载到 /debezium/external 的附加卷

RuntimeEnvironment 架构参考

使用在:Runtime

表 13. RuntimeEnvironment 属性
属性 Type Default (默认值) 描述

List<ContainerEnvVar>

无默认值

应用于容器的环境变量。

List<EnvFromSource>

无默认值

从容器中的 ConfigMaps 或 Secrets 设置的附加环境变量。

JmxConfig 架构参考

使用在:Runtime

表 14. JmxConfig 属性
属性 Type Default (默认值) 描述

boolean

false

是否应为此 Debezium Server 实例启用 JMX。

int

1099

JMX 端口。

JmxAuthentication

无默认值

JMX 认证配置。

JmxAuthentication 架构参考

使用在:JmxConfig

表 15. JmxAuthentication 属性
属性 Type Default (默认值) 描述

boolean

false

是否应为此 Debezium Server 实例启用 JMX 认证。

String

无默认值

提供凭证文件的 Secret

String

jmxremote.access

JMX 访问文件名和 Secret 密钥

String

jmxremote.password

JMX 密码文件名和 Secret 密钥

Templates 架构参考

使用在:Runtime

表 16. Templates 属性
属性 Type Default (默认值) 描述

ContainerTemplate

无默认值

容器模板

pod

PodTemplate

无默认值

Pod 模板。

PersistentVolumeClaimSpec

无默认值

当未指定显式声明时,用于数据卷的 PVC 模板。

Metrics 架构参考

使用在:Runtime

表 17. Metrics 属性
属性 Type Default (默认值) 描述

JmxExporter

无默认值

Prometheus JMX exporter 配置

JmxExporter 架构参考

使用在:Metrics

表 18. JmxExporter 属性
属性 Type Default (默认值) 描述

boolean

无默认值

启用 JMX Prometheus exporter

ConfigMapKeySelector

无默认值

Config map key 引用,其值将用作配置文件

Transformation 架构参考

使用在:DebeziumServerSpec

表 19. Transformation 属性
属性 Type Default (默认值) 描述

String

无默认值

实现转换的 Java 类的完全限定名。

Map

无默认值

特定于转换的配置属性。

String

无默认值

应用于此转换的谓词的名称。

boolean

false

确定所应用谓词的结果是否将被否定。

Predicate 架构参考

使用在:DebeziumServerSpec

表 20. Predicate 属性
属性 Type Default (默认值) 描述

String

无默认值

实现谓词的 Java 类的完全限定名。

Map

无默认值

谓词配置属性。

ConfigMapOffsetStore 架构参考

使用在:Offset

表 21. ConfigMapOffsetStore 属性
属性 Type Default (默认值) 描述

String

无默认值

Offset config map 的名称

Map

无默认值

附加的存储配置属性。

ContainerEnvVar 架构参考

使用在:RuntimeEnvironment

表 22. ContainerEnvVar 属性
属性 Type Default (默认值) 描述

String

无默认值

环境变量的名称。

String

无默认值

环境变量的值。

ContainerTemplate 架构参考

使用在:Templates

表 23. ContainerTemplate 属性
属性 Type Default (默认值) 描述

ResourceRequirements

无默认值

CPU 和内存资源要求。

SecurityContext

无默认值

容器安全上下文。

Probes

无默认值

容器探针配置。

CustomStore 架构参考

使用在:Offset, SchemaHistory

表 24. CustomStore 属性
属性 Type Default (默认值) 描述

String

无默认值

实现存储的 Java 类的完全限定名。

Map

无默认值

存储配置属性。

DataStorage 架构参考

使用在:RuntimeStorage

表 25. DataStorage 属性
属性 Type Default (默认值) 描述

ephemeral,persistent

ephemeral

存储类型。

String

无默认值

用于持久化存储的持久卷声明的名称。

FileOffsetStore 架构参考

使用在:Offset

表 26. FileOffsetStore 属性
属性 Type Default (默认值) 描述

String

无默认值

Offset 文件的名称(相对于数据根目录)

Map

无默认值

附加的存储配置属性。

FileSchemaHistoryStore 架构参考

使用在:SchemaHistory

表 27. FileSchemaHistoryStore 属性
属性 Type Default (默认值) 描述

String

无默认值

Offset 文件的名称(相对于数据根目录)

Map

无默认值

附加的存储配置属性。

InMemoryOffsetStore 架构参考

使用在:Offset

表 28. InMemoryOffsetStore 属性
属性 Type Default (默认值) 描述

Map

无默认值

附加的存储配置属性。

InMemorySchemaHistoryStore 架构参考

使用在:SchemaHistory

表 29. InMemorySchemaHistoryStore 属性
属性 Type Default (默认值) 描述

Map

无默认值

附加的存储配置属性。

JdbcOffsetStore 架构参考

使用在:Offset

表 30. JdbcOffsetStore 属性
属性 Type Default (默认值) 描述

JdbcOffsetTableConfig

无默认值

Offset 表的配置

url

String

无默认值

JDBC 连接 URL

String

无默认值

连接到存储数据库时使用的用户名

String

无默认值

连接到存储数据库时使用的密码

long

无默认值

连接失败时的重试延迟(以毫秒为单位)

int

无默认值

连接失败的最大重试次数

Map

无默认值

附加的存储配置属性。

JdbcOffsetTableConfig 架构参考

使用在:JdbcOffsetStore

表 31. JdbcOffsetTableConfig 属性
属性 Type Default (默认值) 描述

String

无默认值

Offset 表的名称

ddl

String

无默认值

用于创建 Offset 表的 DDL 语句

String

无默认值

用于从 Offset 表中选择的语句

String

无默认值

用于向 Offset 表插入的语句

String

无默认值

用于更新 Offset 表的语句

JdbcSchemaHistoryStore 架构参考

使用在:SchemaHistory

表 32. JdbcSchemaHistoryStore 属性
属性 Type Default (默认值) 描述

JdbcSchemaHistoryTableConfig

无默认值

Offset 表的配置

url

String

无默认值

JDBC 连接 URL

String

无默认值

连接到存储数据库时使用的用户名

String

无默认值

连接到存储数据库时使用的密码

long

无默认值

连接失败时的重试延迟(以毫秒为单位)

int

无默认值

连接失败的最大重试次数

Map

无默认值

附加的存储配置属性。

JdbcSchemaHistoryTableConfig 架构参考

表 33. JdbcSchemaHistoryTableConfig 属性
属性 Type Default (默认值) 描述

String

无默认值

Offset 表的名称

ddl

String

无默认值

用于创建 Schema history 表的 DDL 语句

String

无默认值

用于从 Schema history 表中选择的语句

String

无默认值

用于向 Schema history 表插入的语句

String

无默认值

用于检查 Schema history 表中是否存在某些数据的语句

KafkaOffsetStore 架构参考

使用在:Offset

表 34. KafkaOffsetStore 属性
属性 Type Default (默认值) 描述

Map

无默认值

附加的 Kafka 客户端属性。

String

无默认值

连接器用于建立到 Kafka 集群的初始连接的主机/端口对列表

String

无默认值

要存储 Offset 的 Kafka Topic 名称

int

无默认值

创建 Offset 存储 Topic 时使用的分区数

int

无默认值

创建 Offset 存储 Topic 时使用的副本因子

Map

无默认值

附加的存储配置属性。

KafkaSchemaHistoryStore 架构参考

使用在:SchemaHistory

表 35. KafkaSchemaHistoryStore 属性
属性 Type Default (默认值) 描述

String

无默认值

连接器用于建立到 Kafka 集群的初始连接的主机/端口对列表

String

无默认值

要存储 Offset 的 Kafka Topic 名称

int

无默认值

创建 Offset 存储 Topic 时使用的分区数

int

无默认值

创建 Offset 存储 Topic 时使用的副本因子

Map

无默认值

附加的存储配置属性。

MetadataTemplate 架构参考

使用在:PodTemplate

表 36. MetadataTemplate 属性
属性 Type Default (默认值) 描述

Map<String, String>

无默认值

添加到 Kubernetes 资源的标签

Map<String, String>

无默认值

添加到 Kubernetes 资源的注解

Offset 架构参考

使用在:Source

表 37. Offset 属性
属性 Type Default (默认值) 描述

FileOffsetStore

无默认值

文件后备 Offset 存储配置

InMemoryOffsetStore

无默认值

内存后备 Offset 存储配置

RedisOffsetStore

无默认值

Redis 后备 Offset 存储配置

KafkaOffsetStore

无默认值

Kafka 后备存储配置

JdbcOffsetStore

无默认值

JDBC 后备存储配置

ConfigMapOffsetStore

无默认值

Config map 后备 Offset 存储配置

CustomStore

无默认值

任意 Offset 存储配置

long

60000

尝试提交 Offset 的间隔

PodTemplate 架构参考

使用在:Templates

表 38. PodTemplate 属性
属性 Type Default (默认值) 描述

MetadataTemplate

无默认值

应用于资源的元数据。

List<LocalObjectReference>

无默认值

用于拉取此 Pod 使用的任何镜像的 Secret 的本地引用列表。

Affinity

无默认值

Pod 亲和性规则

PodSecurityContext

无默认值

Pod 级别的安全属性和容器设置

Probe 架构参考

使用在:Probes

表 39. Probe 属性
属性 Type Default (默认值) 描述

int

5

容器启动后开始进行探测的秒数。

int

10

执行探测的频率(以秒为单位)。

int

10

探测超时前的秒数。

int

3

连续失败多少次后,整体检查才算失败。

Probes 架构参考

使用在:ContainerTemplate

表 40. Probes 属性
属性 Type Default (默认值) 描述

Probe

无默认值

应用于容器的就绪探测配置。

Probe

无默认值

应用于容器的存活探测配置。

RedisOffsetStore 架构参考

使用在:Offset

表 41. RedisOffsetStore 属性
属性 Type Default (默认值) 描述

String

无默认值

用于连接的 Redis 主机:端口

String

无默认值

Redis 用户名

String

无默认值

Redis 密码

boolean

false

Redis 用户名

key

String

无默认值

Redis 哈希键

RedisStoreWaitConfig

无默认值

配置副本写入验证

Map

无默认值

附加的存储配置属性。

RedisSchemaHistoryStore 架构参考

使用在:SchemaHistory

表 42. RedisSchemaHistoryStore 属性
属性 Type Default (默认值) 描述

String

无默认值

用于连接的 Redis 主机:端口

String

无默认值

Redis 用户名

String

无默认值

Redis 密码

boolean

false

Redis 用户名

key

String

无默认值

Redis 哈希键

RedisStoreWaitConfig

无默认值

配置副本写入验证

Map

无默认值

附加的存储配置属性。

RedisStoreWaitConfig 架构参考

表 43. RedisStoreWaitConfig 属性
属性 Type Default (默认值) 描述

boolean

false

当 Redis 带有副本时,此选项允许验证数据是否已写入副本

long

1000

等待副本的超时时间(毫秒)

boolean

false

启用等待副本的重试

long

1000

等待副本的重试延迟

SchemaHistory 架构参考

使用在:Source

表 44. SchemaHistory 属性
属性 Type Default (默认值) 描述

FileSchemaHistoryStore

无默认值

文件后备 Schema history 存储配置

InMemorySchemaHistoryStore

无默认值

内存后备 Schema history 存储配置

RedisSchemaHistoryStore

无默认值

Redis 后备 Schema history 存储配置

KafkaSchemaHistoryStore

无默认值

Kafka 后备 Schema history 存储配置

JdbcSchemaHistoryStore

无默认值

JDBC 后备 Schema history 存储配置

CustomStore

无默认值

任意 Schema history 存储配置

Map

无默认值

附加的通用 Schema history 存储配置属性。

完整的 Debezium Server 配置示例

以下示例显示了一个完整的 Debezium Server 自定义资源,其中包含了前面表格中的属性。

示例 2. 完整的 Debezium server 自定义资源
apiVersion: debezium.io/v1alpha1
kind: DebeziumServer
metadata:
  name: my-debezium-server
spec:
  version: "3.2.0"
  source:
    class: io.debezium.connector.mysql.MySqlConnector
    config:
      database.hostname: mysql-server
      database.port: 3306
      database.user: debezium
      database.password: secret
      database.server.id: 184054
      topic.prefix: mysql
      database.include.list: inventory
    offset:
      type: kafka
      config:
        bootstrap.servers: kafka:9092
        topic: debezium-offsets
    schemaHistory:
      type: kafka
      config:
        bootstrap.servers: kafka:9092
        topic: debezium-schema-history
  sink:
    type: kafka
    config:
      bootstrap.servers: kafka:9092
  format:
    key:
      type: json
    value:
      type: json
      config:
        schemas.enable: false
  runtime:
    jmx:
      enabled: true
      port: 1099
    storage:
      data:
        type: persistent
        claimName: debezium-data-pvc
    metrics:
      jmxExporter:
        enabled: true
  transforms:
    - type: io.debezium.transforms.UnwrapFromEnvelope
      config:
        drop.tombstones: false
      predicate: inventory-filter
      negate: false
  predicates:
    inventory-filter:
      type: org.apache.kafka.connect.transforms.predicates.TopicNameMatches
      config:
        pattern: "mysql.inventory.*"
  quarkus:
    config:
      quarkus.log.level: INFO
      quarkus.http.port: 8080