Debezium 操作员
- 概述
- 使用 Helm 部署 Debezium Operator
- 跟踪 Operator 部署的进度
- DebeziumServer 自定义资源
- DebeziumServer 规范
- DebeziumServer 架构参考
- DebeziumServerStatus 架构参考
- Condition 架构参考
- DebeziumServerSpec 架构参考
- Sink 架构参考
- Source 架构参考
- Format 架构参考
- FormatType 架构参考
- Quarkus 架构参考
- Runtime 架构参考
- RuntimeApi 架构参考
- RuntimeStorage 架构参考
- RuntimeEnvironment 架构参考
- JmxConfig 架构参考
- JmxAuthentication 架构参考
- Templates 架构参考
- Metrics 架构参考
- JmxExporter 架构参考
- Transformation 架构参考
- Predicate 架构参考
- ConfigMapOffsetStore 架构参考
- ContainerEnvVar 架构参考
- ContainerTemplate 架构参考
- CustomStore 架构参考
- DataStorage 架构参考
- FileOffsetStore 架构参考
- FileSchemaHistoryStore 架构参考
- InMemoryOffsetStore 架构参考
- InMemorySchemaHistoryStore 架构参考
- JdbcOffsetStore 架构参考
- JdbcOffsetTableConfig 架构参考
- JdbcSchemaHistoryStore 架构参考
- JdbcSchemaHistoryTableConfig 架构参考
- KafkaOffsetStore 架构参考
- KafkaSchemaHistoryStore 架构参考
- MetadataTemplate 架构参考
- Offset 架构参考
- PodTemplate 架构参考
- Probe 架构参考
- Probes 架构参考
- RedisOffsetStore 架构参考
- RedisSchemaHistoryStore 架构参考
- RedisStoreWaitConfig 架构参考
- SchemaHistory 架构参考
- 完整的 Debezium Server 配置示例
概述
Debezium Operator 是一个 Kubernetes Operator,它在 Kubernetes 和 OpenShift 平台提供 Debezium Server 实例的自动化部署和管理。该 Operator 实现了标准的 Kubernetes Operator 模式,将用户定义的 DebeziumServer 自定义资源转化为正在运行的变更数据捕获 (CDC) 管道,将数据库更改流式传输到各种 Sink 系统。
使用 Helm 部署 Debezium Operator
Helm 是安装 Operator 的首选方法。Operator 可在 Debezium Helm Chart Repository 中找到。您可以将 Operator 安装到现有命名空间,也可以安装到新命名空间。
-
输入以下命令创建专用命名空间
kubectl create namespace debezium -
添加 Debezium charts 仓库并通过输入以下命令安装 Operator
helm repo add debezium https://charts.debezium.io helm install my-debezium-operator debezium/debezium-operator --version 3.2.0-final -n debezium由于版本问题,目前您必须指定一个版本号,即使您只想安装最新的稳定版本。
您可以在 Debezium Helm Chart Repository 中找到 Operator 的可用版本。
跟踪 Operator 部署的进度
您可以运行命令来监控部署进度并查看 Operator 日志。
-
要监控部署进度,请运行以下命令
kubectl get pod -n debezium --watch -
要监控 Operator 日志,请输入以下命令
kubectl logs deployment/debezium-operator -n debezium -f
Operator 运行后,它会监听新的自定义资源,并基于这些自定义资源创建 Debezium Server 实例。
DebeziumServer 自定义资源
您可以通过在 DebeziumServer 自定义资源中声明属性来指定 Debezium Server 实例的配置。
apiVersion: debezium.io/v1alpha1
kind: DebeziumServer
metadata:
name: my-debezium-server
spec:
image: String
version: String
runtime:
environment:
vars:
- name: String
value: String
from: # EnvFromSource array
- sourceName: String
# add other EnvFromSource fields as needed
storage:
data:
type: persistent | ephemeral
claimName: String # required if type is "persistent"
external: # Volume array
- name: String
# add other Volume fields as needed
jmx:
enabled: boolean
port: int # defaults to 1099
templates:
container:
resources: ResourceRequirements
securityContext: SecurityContext
pod:
metadata:
annotations: # Map<String, String>
key: value
labels: # Map<String, String>
key: value
imagePullSecrets: # List
- name: String
affinity: Affinity
securityContext: PodSecurityContext
quarkus:
config:
# quarkus properties
format:
value:
type: String
config:
# format properties
key:
type: String
config:
# format properties
header:
type: String
config:
# format properties
transforms:
- type: String
predicate: String
negate: Boolean
config:
# transformation properties
predicates:
name:
type: String
config:
# predicate properties
sink:
type: String
config:
# sink properties
source:
class: String
config:
# source connector properties
DebeziumServer 规范
DebeziumServer 架构参考
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
Debezium Server 的规范 |
||
无默认值 |
Debezium Server 实例的状态。 |
DebeziumServerStatus 架构参考
使用在:DebeziumServer
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
状态条件的列表 |
||
Long |
0 |
最后观测到的代数 |
Condition 架构参考
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
条件的状态,可以是 True、False 或 Unknown。 |
|
String |
无默认值 |
人类可读的消息,指示有关条件最后一次转换的详细信息。 |
|
String |
无默认值 |
条件的唯一标识符。 |
DebeziumServerSpec 架构参考
使用在:DebeziumServer
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
用于 Debezium Server 容器的镜像。此属性优先于 version。 |
|
String |
与 operator 相同 |
要使用的 Debezium Server 版本。 |
|
无默认值 |
Sink 配置。 |
||
无默认值 |
Debezium 源连接器配置。 |
||
无默认值 |
消息输出格式配置。 |
||
无默认值 |
传递给 Debezium Server 进程的 Quarkus 配置。 |
||
无默认值 |
指定您是否可以修改 Debezium Server 运行时的各种方面。 |
||
无默认值 |
此 Debezium Server 实例使用的单消息转换。 |
||
无默认值 |
此 Debezium Server 实例使用的谓词。 |
Sink 架构参考
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
此 Debezium Server 实例识别的 Sink 类型。 |
|
Map |
无默认值 |
Sink 配置属性。 |
Source 架构参考
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
源连接器 Java 类的完全限定名。 |
|
无默认值 |
Offset 存储配置 |
||
无默认值 |
Schema history 存储配置 |
||
Map |
无默认值 |
Source 连接器配置属性。 |
FormatType 架构参考
使用在:Format
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
json |
Debezium Server 识别的格式类型。 |
|
Map |
无默认值 |
格式配置属性。 |
Runtime 架构参考
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
API 配置 |
||
无默认值 |
存储配置 |
||
无默认值 |
此 Debezium Server 实例使用的附加环境变量。 |
||
无默认值 |
JMX 配置。 |
||
无默认值 |
Debezium Server 资源模板。 |
||
String |
无默认值 |
用于运行 Debezium Server pod 的现有服务帐户 |
|
无默认值 |
Metrics 配置 |
RuntimeApi 架构参考
使用在:Runtime
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
boolean |
无默认值 |
是否应为此 Debezium Server 实例启用 API |
|
int |
8080 |
用于公开 API 的 k8s 服务的端口号 |
RuntimeStorage 架构参考
使用在:Runtime
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
此 Debezium Server 实例使用的文件存储配置。 |
||
无默认值 |
挂载到 /debezium/external 的附加卷 |
RuntimeEnvironment 架构参考
使用在:Runtime
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
应用于容器的环境变量。 |
||
无默认值 |
从容器中的 ConfigMaps 或 Secrets 设置的附加环境变量。 |
JmxConfig 架构参考
使用在:Runtime
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
boolean |
false |
是否应为此 Debezium Server 实例启用 JMX。 |
|
int |
1099 |
JMX 端口。 |
|
无默认值 |
JMX 认证配置。 |
JmxAuthentication 架构参考
使用在:JmxConfig
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
boolean |
false |
是否应为此 Debezium Server 实例启用 JMX 认证。 |
|
String |
无默认值 |
提供凭证文件的 Secret |
|
String |
jmxremote.access |
JMX 访问文件名和 Secret 密钥 |
|
String |
jmxremote.password |
JMX 密码文件名和 Secret 密钥 |
Templates 架构参考
使用在:Runtime
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
容器模板 |
||
无默认值 |
Pod 模板。 |
||
无默认值 |
当未指定显式声明时,用于数据卷的 PVC 模板。 |
JmxExporter 架构参考
使用在:Metrics
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
boolean |
无默认值 |
启用 JMX Prometheus exporter |
|
ConfigMapKeySelector |
无默认值 |
Config map key 引用,其值将用作配置文件 |
Transformation 架构参考
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
实现转换的 Java 类的完全限定名。 |
|
Map |
无默认值 |
特定于转换的配置属性。 |
|
String |
无默认值 |
应用于此转换的谓词的名称。 |
|
boolean |
false |
确定所应用谓词的结果是否将被否定。 |
Predicate 架构参考
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
实现谓词的 Java 类的完全限定名。 |
|
Map |
无默认值 |
谓词配置属性。 |
ConfigMapOffsetStore 架构参考
使用在:Offset
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
Offset config map 的名称 |
|
Map |
无默认值 |
附加的存储配置属性。 |
ContainerEnvVar 架构参考
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
环境变量的名称。 |
|
String |
无默认值 |
环境变量的值。 |
ContainerTemplate 架构参考
使用在:Templates
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
CPU 和内存资源要求。 |
||
无默认值 |
容器安全上下文。 |
||
无默认值 |
容器探针配置。 |
CustomStore 架构参考
使用在:Offset, SchemaHistory
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
实现存储的 Java 类的完全限定名。 |
|
Map |
无默认值 |
存储配置属性。 |
DataStorage 架构参考
使用在:RuntimeStorage
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
ephemeral,persistent |
ephemeral |
存储类型。 |
|
String |
无默认值 |
用于持久化存储的持久卷声明的名称。 |
FileOffsetStore 架构参考
使用在:Offset
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
Offset 文件的名称(相对于数据根目录) |
|
Map |
无默认值 |
附加的存储配置属性。 |
FileSchemaHistoryStore 架构参考
使用在:SchemaHistory
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
Offset 文件的名称(相对于数据根目录) |
|
Map |
无默认值 |
附加的存储配置属性。 |
InMemoryOffsetStore 架构参考
使用在:Offset
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
Map |
无默认值 |
附加的存储配置属性。 |
InMemorySchemaHistoryStore 架构参考
使用在:SchemaHistory
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
Map |
无默认值 |
附加的存储配置属性。 |
JdbcOffsetStore 架构参考
使用在:Offset
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
Offset 表的配置 |
||
String |
无默认值 |
JDBC 连接 URL |
|
String |
无默认值 |
连接到存储数据库时使用的用户名 |
|
String |
无默认值 |
连接到存储数据库时使用的密码 |
|
long |
无默认值 |
连接失败时的重试延迟(以毫秒为单位) |
|
int |
无默认值 |
连接失败的最大重试次数 |
|
Map |
无默认值 |
附加的存储配置属性。 |
JdbcOffsetTableConfig 架构参考
使用在:JdbcOffsetStore
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
Offset 表的名称 |
|
String |
无默认值 |
用于创建 Offset 表的 DDL 语句 |
|
String |
无默认值 |
用于从 Offset 表中选择的语句 |
|
String |
无默认值 |
用于向 Offset 表插入的语句 |
|
String |
无默认值 |
用于更新 Offset 表的语句 |
JdbcSchemaHistoryStore 架构参考
使用在:SchemaHistory
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
Offset 表的配置 |
||
String |
无默认值 |
JDBC 连接 URL |
|
String |
无默认值 |
连接到存储数据库时使用的用户名 |
|
String |
无默认值 |
连接到存储数据库时使用的密码 |
|
long |
无默认值 |
连接失败时的重试延迟(以毫秒为单位) |
|
int |
无默认值 |
连接失败的最大重试次数 |
|
Map |
无默认值 |
附加的存储配置属性。 |
JdbcSchemaHistoryTableConfig 架构参考
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
Offset 表的名称 |
|
String |
无默认值 |
用于创建 Schema history 表的 DDL 语句 |
|
String |
无默认值 |
用于从 Schema history 表中选择的语句 |
|
String |
无默认值 |
用于向 Schema history 表插入的语句 |
|
String |
无默认值 |
用于检查 Schema history 表中是否存在某些数据的语句 |
KafkaOffsetStore 架构参考
使用在:Offset
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
Map |
无默认值 |
附加的 Kafka 客户端属性。 |
|
String |
无默认值 |
连接器用于建立到 Kafka 集群的初始连接的主机/端口对列表 |
|
String |
无默认值 |
要存储 Offset 的 Kafka Topic 名称 |
|
int |
无默认值 |
创建 Offset 存储 Topic 时使用的分区数 |
|
int |
无默认值 |
创建 Offset 存储 Topic 时使用的副本因子 |
|
Map |
无默认值 |
附加的存储配置属性。 |
KafkaSchemaHistoryStore 架构参考
使用在:SchemaHistory
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
连接器用于建立到 Kafka 集群的初始连接的主机/端口对列表 |
|
String |
无默认值 |
要存储 Offset 的 Kafka Topic 名称 |
|
int |
无默认值 |
创建 Offset 存储 Topic 时使用的分区数 |
|
int |
无默认值 |
创建 Offset 存储 Topic 时使用的副本因子 |
|
Map |
无默认值 |
附加的存储配置属性。 |
MetadataTemplate 架构参考
使用在:PodTemplate
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
Map<String, String> |
无默认值 |
添加到 Kubernetes 资源的标签 |
|
Map<String, String> |
无默认值 |
添加到 Kubernetes 资源的注解 |
Offset 架构参考
使用在:Source
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
文件后备 Offset 存储配置 |
||
无默认值 |
内存后备 Offset 存储配置 |
||
无默认值 |
Redis 后备 Offset 存储配置 |
||
无默认值 |
Kafka 后备存储配置 |
||
无默认值 |
JDBC 后备存储配置 |
||
无默认值 |
Config map 后备 Offset 存储配置 |
||
无默认值 |
任意 Offset 存储配置 |
||
long |
60000 |
尝试提交 Offset 的间隔 |
PodTemplate 架构参考
使用在:Templates
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
应用于资源的元数据。 |
||
无默认值 |
用于拉取此 Pod 使用的任何镜像的 Secret 的本地引用列表。 |
||
无默认值 |
Pod 亲和性规则 |
||
无默认值 |
Pod 级别的安全属性和容器设置 |
Probe 架构参考
使用在:Probes
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
int |
5 |
容器启动后开始进行探测的秒数。 |
|
int |
10 |
执行探测的频率(以秒为单位)。 |
|
int |
10 |
探测超时前的秒数。 |
|
int |
3 |
连续失败多少次后,整体检查才算失败。 |
RedisOffsetStore 架构参考
使用在:Offset
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
用于连接的 Redis 主机:端口 |
|
String |
无默认值 |
Redis 用户名 |
|
String |
无默认值 |
Redis 密码 |
|
boolean |
false |
Redis 用户名 |
|
String |
无默认值 |
Redis 哈希键 |
|
无默认值 |
配置副本写入验证 |
||
Map |
无默认值 |
附加的存储配置属性。 |
RedisSchemaHistoryStore 架构参考
使用在:SchemaHistory
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
用于连接的 Redis 主机:端口 |
|
String |
无默认值 |
Redis 用户名 |
|
String |
无默认值 |
Redis 密码 |
|
boolean |
false |
Redis 用户名 |
|
String |
无默认值 |
Redis 哈希键 |
|
无默认值 |
配置副本写入验证 |
||
Map |
无默认值 |
附加的存储配置属性。 |
RedisStoreWaitConfig 架构参考
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
boolean |
false |
当 Redis 带有副本时,此选项允许验证数据是否已写入副本 |
|
long |
1000 |
等待副本的超时时间(毫秒) |
|
boolean |
false |
启用等待副本的重试 |
|
long |
1000 |
等待副本的重试延迟 |
SchemaHistory 架构参考
使用在:Source
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
文件后备 Schema history 存储配置 |
||
无默认值 |
内存后备 Schema history 存储配置 |
||
无默认值 |
Redis 后备 Schema history 存储配置 |
||
无默认值 |
Kafka 后备 Schema history 存储配置 |
||
无默认值 |
JDBC 后备 Schema history 存储配置 |
||
无默认值 |
任意 Schema history 存储配置 |
||
Map |
无默认值 |
附加的通用 Schema history 存储配置属性。 |
完整的 Debezium Server 配置示例
以下示例显示了一个完整的 Debezium Server 自定义资源,其中包含了前面表格中的属性。
apiVersion: debezium.io/v1alpha1
kind: DebeziumServer
metadata:
name: my-debezium-server
spec:
version: "3.2.0"
source:
class: io.debezium.connector.mysql.MySqlConnector
config:
database.hostname: mysql-server
database.port: 3306
database.user: debezium
database.password: secret
database.server.id: 184054
topic.prefix: mysql
database.include.list: inventory
offset:
type: kafka
config:
bootstrap.servers: kafka:9092
topic: debezium-offsets
schemaHistory:
type: kafka
config:
bootstrap.servers: kafka:9092
topic: debezium-schema-history
sink:
type: kafka
config:
bootstrap.servers: kafka:9092
format:
key:
type: json
value:
type: json
config:
schemas.enable: false
runtime:
jmx:
enabled: true
port: 1099
storage:
data:
type: persistent
claimName: debezium-data-pvc
metrics:
jmxExporter:
enabled: true
transforms:
- type: io.debezium.transforms.UnwrapFromEnvelope
config:
drop.tombstones: false
predicate: inventory-filter
negate: false
predicates:
inventory-filter:
type: org.apache.kafka.connect.transforms.predicates.TopicNameMatches
config:
pattern: "mysql.inventory.*"
quarkus:
config:
quarkus.log.level: INFO
quarkus.http.port: 8080