Debezium 操作员
- 概述
- 使用 Helm 部署 Debezium Operator
- 跟踪 Operator 部署进度
- DebeziumServer 自定义资源
- DebeziumServer 规范
- DebeziumServer Schema 参考
- DebeziumServerStatus Schema 参考
- Condition Schema 参考
- DebeziumServerSpec Schema 参考
- Sink Schema 参考
- Source Schema 参考
- Format Schema 参考
- FormatType Schema 参考
- Quarkus Schema 参考
- Runtime Schema 参考
- RuntimeApi Schema 参考
- RuntimeStorage Schema 参考
- RuntimeEnvironment Schema 参考
- JmxConfig Schema 参考
- JmxAuthentication Schema 参考
- Templates Schema 参考
- Metrics Schema 参考
- JmxExporter Schema 参考
- Transformation Schema 参考
- Predicate Schema 参考
- ConfigMapOffsetStore Schema 参考
- ContainerEnvVar Schema 参考
- ContainerTemplate Schema 参考
- CustomStore Schema 参考
- DataStorage Schema 参考
- FileOffsetStore Schema 参考
- FileSchemaHistoryStore Schema 参考
- InMemoryOffsetStore Schema 参考
- InMemorySchemaHistoryStore Schema 参考
- JdbcOffsetStore Schema 参考
- JdbcOffsetTableConfig Schema 参考
- JdbcSchemaHistoryStore Schema 参考
- JdbcSchemaHistoryTableConfig Schema 参考
- KafkaOffsetStore Schema 参考
- KafkaSchemaHistoryStore Schema 参考
- MetadataTemplate Schema 参考
- Offset Schema 参考
- PodTemplate Schema 参考
- Probe Schema 参考
- Probes Schema 参考
- RedisOffsetStore Schema 参考
- RedisSchemaHistoryStore Schema 参考
- RedisStoreWaitConfig Schema 参考
- SchemaHistory Schema 参考
- 完整的 Debezium Server 配置示例
概述
Debezium Operator 是一个 Kubernetes Operator,它在 Kubernetes 和 OpenShift 平台上提供 Debezium Server 实例的自动化部署和管理。该 Operator 实现了标准的 Kubernetes Operator 模式,可以将用户定义的 DebeziumServer 自定义资源转化为运行中的变更数据捕获 (CDC) 流水线,将数据库更改流式传输到各种 sink 系统。
使用 Helm 部署 Debezium Operator
Helm 是安装 Operator 的首选方法。Operator 可在 Debezium Helm Chart 仓库 中找到。您可以将 Operator 安装到现有命名空间,也可以安装到一个新的命名空间。
-
运行以下命令以创建专用命名空间
kubectl create namespace debezium -
添加 Debezium charts 仓库并运行以下命令来安装 Operator
helm repo add debezium https://charts.debezium.io helm install my-debezium-operator debezium/debezium-operator --version 3.2.0-final -n debezium
您可以在 Debezium Helm Chart 仓库 中找到 Operator 的可用版本。
跟踪 Operator 部署进度
您可以运行命令来监视部署进度并查看 Operator 日志。
-
要监视部署进度,请运行以下命令
kubectl get pod -n debezium --watch -
要监视 Operator 日志,请运行以下命令
kubectl logs deployment/debezium-operator -n debezium -f
Operator 运行后,它会监视新的自定义资源,并根据这些自定义资源创建 Debezium Server 实例。
DebeziumServer 自定义资源
您可以通过在 DebeziumServer 自定义资源中声明属性来指定 Debezium Server 实例的配置。
apiVersion: debezium.io/v1alpha1
kind: DebeziumServer
metadata:
name: my-debezium-server
spec:
image: String
version: String
runtime:
environment:
vars:
- name: String
value: String
from: # EnvFromSource array
- sourceName: String
# add other EnvFromSource fields as needed
storage:
data:
type: persistent | ephemeral
claimName: String # required if type is "persistent"
external: # Volume array
- name: String
# add other Volume fields as needed
jmx:
enabled: boolean
port: int # defaults to 1099
templates:
container:
resources: ResourceRequirements
securityContext: SecurityContext
pod:
metadata:
annotations: # Map<String, String>
key: value
labels: # Map<String, String>
key: value
imagePullSecrets: # List
- name: String
affinity: Affinity
securityContext: PodSecurityContext
quarkus:
config:
# quarkus properties
format:
value:
type: String
config:
# format properties
key:
type: String
config:
# format properties
header:
type: String
config:
# format properties
transforms:
- type: String
predicate: String
negate: Boolean
config:
# transformation properties
predicates:
name:
type: String
config:
# predicate properties
sink:
type: String
config:
# sink properties
source:
class: String
config:
# source connector properties
DebeziumServer 规范
DebeziumServer Schema 参考
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
Debezium Server 的规范 |
||
无默认值 |
Debezium Server 实例的状态。 |
DebeziumServerStatus Schema 参考
在以下位置使用: DebeziumServer
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
状态条件列表 |
||
Long |
0 |
最新的观察到的代 |
Condition Schema 参考
在以下位置使用: DebeziumServerStatus
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
条件的状�?�,可以是 True、False 或 Unknown。 |
|
String |
无默认值 |
人类可读的消息,指示有关条件最后一次转换的详细信息。 |
|
String |
无默认值 |
条件的唯一标识符。 |
DebeziumServerSpec Schema 参考
在以下位置使用: DebeziumServer
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
用于 Debezium Server 容器的镜像。此属性优先于版本。 |
|
String |
与 Operator 相同 |
要使用的 Debezium Server 的版本。 |
|
无默认值 |
Sink 配置。 |
||
无默认值 |
Debezium 源连接器配置。 |
||
无默认值 |
消息输出格式配置。 |
||
无默认值 |
传递给 Debezium Server 进程的 Quarkus 配置。 |
||
无默认值 |
指定您是否可以修改 Debezium Server 运行时的各种方面。 |
||
无默认值 |
此 Debezium Server 实例使用的单消息转换。 |
||
无默认值 |
此 Debezium Server 实例使用的谓词。 |
Sink Schema 参考
在以下位置使用: DebeziumServerSpec
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
此 Debezium Server 实例识别的 Sink 类型。 |
|
Map |
无默认值 |
Sink 配置属性。 |
Source Schema 参考
在以下位置使用: DebeziumServerSpec
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
源连接器 Java 类的完全限定名。 |
|
无默认值 |
Offset 存储配置 |
||
无默认值 |
Schema history 存储配置 |
||
Map |
无默认值 |
Source 连接器配置属性。 |
Format Schema 参考
在以下位置使用: DebeziumServerSpec
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
消息键格式配置。 |
||
无默认值 |
消息值格式配置。 |
||
无默认值 |
消息头格式配置。 |
FormatType Schema 参考
在以下位置使用: Format
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
json |
Debezium Server 识别的格式类型。 |
|
Map |
无默认值 |
格式配置属性。 |
Quarkus Schema 参考
在以下位置使用: DebeziumServerSpec
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
Map |
无默认值 |
Quarkus 配置属性。 |
Runtime Schema 参考
在以下位置使用: DebeziumServerSpec
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
API 配置 |
||
无默认值 |
存储配置 |
||
无默认值 |
此 Debezium Server 实例使用的其他环境变量。 |
||
无默认值 |
JMX 配置。 |
||
无默认值 |
Debezium Server 资源模板。 |
||
String |
无默认值 |
用于运行 Debezium Server Pod 的现有服务帐户 |
|
无默认值 |
Metrics 配置 |
RuntimeApi Schema 参考
在以下位置使用: Runtime
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
boolean |
无默认值 |
是否为该 Debezium Server 实例启用 API |
|
int |
8080 |
用于公开 API 的 k8s 服务的端口号 |
RuntimeStorage Schema 参考
在以下位置使用: Runtime
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
此 Debezium Server 实例使用的文件存储配置。 |
||
无默认值 |
挂载到 /debezium/external 的附加卷 |
RuntimeEnvironment Schema 参考
在以下位置使用: Runtime
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
应用于容器的环境变量。 |
||
无默认值 |
从容器中的 ConfigMaps 或 Secrets 设置的附加环境变量。 |
JmxConfig Schema 参考
在以下位置使用: Runtime
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
boolean |
false |
是否为该 Debezium Server 实例启用 JMX。 |
|
int |
1099 |
JMX 端口。 |
|
无默认值 |
JMX 认证配置。 |
JmxAuthentication Schema 参考
在以下位置使用: JmxConfig
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
boolean |
false |
是否为该 Debezium Server 实例启用 JMX 认证。 |
|
String |
无默认值 |
提供凭据文件的 Secret |
|
String |
jmxremote.access |
JMX 访问文件名和 Secret 密钥 |
|
String |
jmxremote.password |
JMX 密码文件名和 Secret 密钥 |
Templates Schema 参考
在以下位置使用: Runtime
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
容器模板 |
||
无默认值 |
Pod 模板。 |
||
无默认值 |
如果未指定显式声明,则用于数据卷的 PVC 模板。 |
Metrics Schema 参考
在以下位置使用: Runtime
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
Prometheus JMX exporter 配置 |
JmxExporter Schema 参考
在以下位置使用: Metrics
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
boolean |
无默认值 |
启用 JMX Prometheus exporter |
|
ConfigMapKeySelector |
无默认值 |
Config map 键的引用,其值将用作配置文件 |
Transformation Schema 参考
在以下位置使用: DebeziumServerSpec
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
实现转换的 Java 类的完全限定名。 |
|
Map |
无默认值 |
转换特定的配置属性。 |
|
String |
无默认值 |
将应用于此转换的谓词的名称。 |
|
boolean |
false |
确定应用的谓词结果是否将被否定。 |
Predicate Schema 参考
在以下位置使用: DebeziumServerSpec
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
实现谓词的 Java 类的完全限定名。 |
|
Map |
无默认值 |
谓词配置属性。 |
ConfigMapOffsetStore Schema 参考
在以下位置使用: Offset
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
Offset ConfigMap 的名称 |
|
Map |
无默认值 |
其他存储配置属性。 |
ContainerEnvVar Schema 参考
在以下位置使用: RuntimeEnvironment
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
环境变量的名称。 |
|
String |
无默认值 |
环境变量的值。 |
ContainerTemplate Schema 参考
在以下位置使用: Templates
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
CPU 和内存资源要求。 |
||
无默认值 |
容器安全上下文。 |
||
无默认值 |
容器探测配置。 |
CustomStore Schema 参考
在以下位置使用: Offset, SchemaHistory
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
实现存储的 Java 类的完全限定名。 |
|
Map |
无默认值 |
存储配置属性。 |
DataStorage Schema 参考
在以下位置使用: RuntimeStorage
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
ephemeral,persistent |
ephemeral |
存储类型。 |
|
String |
无默认值 |
持久存储的持久卷声明名称。 |
FileOffsetStore Schema 参考
在以下位置使用: Offset
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
Offset 文件的名称(相对于数据根目录) |
|
Map |
无默认值 |
其他存储配置属性。 |
FileSchemaHistoryStore Schema 参考
在以下位置使用: SchemaHistory
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
Offset 文件的名称(相对于数据根目录) |
|
Map |
无默认值 |
其他存储配置属性。 |
InMemoryOffsetStore Schema 参考
在以下位置使用: Offset
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
Map |
无默认值 |
其他存储配置属性。 |
InMemorySchemaHistoryStore Schema 参考
在以下位置使用: SchemaHistory
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
Map |
无默认值 |
其他存储配置属性。 |
JdbcOffsetStore Schema 参考
在以下位置使用: Offset
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
Offset 表的配置 |
||
String |
无默认值 |
JDBC 连接 URL |
|
String |
无默认值 |
连接存储数据库使用的用户名 |
|
String |
无默认值 |
连接存储数据库使用的密码 |
|
long |
无默认值 |
连接失败时的重试延迟(以毫秒为单位) |
|
int |
无默认值 |
连接失败时的最大重试次数 |
|
Map |
无默认值 |
其他存储配置属性。 |
JdbcOffsetTableConfig Schema 参考
在以下位置使用: JdbcOffsetStore
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
Offset 表的名称 |
|
String |
无默认值 |
用于创建 Offset 表的 DDL 语句 |
|
String |
无默认值 |
用于从 Offset 表中选择的语句 |
|
String |
无默认值 |
用于向 Offset 表插入的语句 |
|
String |
无默认值 |
用于更新 Offset 表的语句 |
JdbcSchemaHistoryStore Schema 参考
在以下位置使用: SchemaHistory
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
Offset 表的配置 |
||
String |
无默认值 |
JDBC 连接 URL |
|
String |
无默认值 |
连接存储数据库使用的用户名 |
|
String |
无默认值 |
连接存储数据库使用的密码 |
|
long |
无默认值 |
连接失败时的重试延迟(以毫秒为单位) |
|
int |
无默认值 |
连接失败时的最大重试次数 |
|
Map |
无默认值 |
其他存储配置属性。 |
JdbcSchemaHistoryTableConfig Schema 参考
在以下位置使用: JdbcSchemaHistoryStore
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
Offset 表的名称 |
|
String |
无默认值 |
用于创建 Schema History 表的 DDL 语句 |
|
String |
无默认值 |
用于从 Schema History 表中选择的语句 |
|
String |
无默认值 |
用于向 Schema History 表插入的语句 |
|
String |
无默认值 |
用于检查 Schema History 表中是否存在某些数据的语句 |
KafkaOffsetStore Schema 参考
在以下位置使用: Offset
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
Map |
无默认值 |
其他 Kafka 客户端属性。 |
|
String |
无默认值 |
连接器用于建立与 Kafka 集群的初始连接的主机/端口对列表 |
|
String |
无默认值 |
用于存储 Offset 的 Kafka Topic 名称 |
|
int |
无默认值 |
创建 Offset 存储 Topic 时使用的分区数 |
|
int |
无默认值 |
创建 Offset 存储 Topic 时使用的副本因子 |
|
Map |
无默认值 |
其他存储配置属性。 |
KafkaSchemaHistoryStore Schema 参考
在以下位置使用: SchemaHistory
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
连接器用于建立与 Kafka 集群的初始连接的主机/端口对列表 |
|
String |
无默认值 |
用于存储 Offset 的 Kafka Topic 名称 |
|
int |
无默认值 |
创建 Offset 存储 Topic 时使用的分区数 |
|
int |
无默认值 |
创建 Offset 存储 Topic 时使用的副本因子 |
|
Map |
无默认值 |
其他存储配置属性。 |
MetadataTemplate Schema 参考
在以下位置使用: PodTemplate
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
Map<String, String> |
无默认值 |
添加到 Kubernetes 资源的标签 |
|
Map<String, String> |
无默认值 |
添加到 Kubernetes 资源的注释 |
Offset Schema 参考
在以下位置使用: Source
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
文件后备 Offset 存储配置 |
||
无默认值 |
内存后备 Offset 存储配置 |
||
无默认值 |
Redis 后备 Offset 存储配置 |
||
无默认值 |
Kafka 后备存储配置 |
||
无默认值 |
JDBC 后备存储配置 |
||
无默认值 |
Config Map 后备 Offset 存储配置 |
||
无默认值 |
任意 Offset 存储配置 |
||
long |
60000 |
尝试提交 Offset 的间隔 |
PodTemplate Schema 参考
在以下位置使用: Templates
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
应用于资源的元数据。 |
||
无默认值 |
用于拉取此 Pod 使用的任何镜像的 Secret 的本地引用列表。 |
||
无默认值 |
Pod 亲和性规则 |
||
无默认值 |
Pod 级别的安全属性和容器设置 |
Probe Schema 参考
在以下位置使用: Probes
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
int |
5 |
容器启动后多长时间(秒)开始执行探测。 |
|
int |
10 |
每次执行探测的间隔(秒)。 |
|
int |
10 |
探测超时之前的秒数。 |
|
int |
3 |
连续失败次数,在此之后整个检查将失败。 |
Probes Schema 参考
在以下位置使用: ContainerTemplate
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
应用于容器的就绪探测配置。 |
||
无默认值 |
应用于容器的存活探测配置。 |
RedisOffsetStore Schema 参考
在以下位置使用: Offset
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
用于连接的 Redis 主机:端口 |
|
String |
无默认值 |
Redis 用户名 |
|
String |
无默认值 |
Redis 密码 |
|
boolean |
false |
Redis 用户名 |
|
String |
无默认值 |
Redis 哈希键 |
|
无默认值 |
配置副本写入的验证 |
||
Map |
无默认值 |
其他存储配置属性。 |
RedisSchemaHistoryStore Schema 参考
在以下位置使用: SchemaHistory
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
String |
无默认值 |
用于连接的 Redis 主机:端口 |
|
String |
无默认值 |
Redis 用户名 |
|
String |
无默认值 |
Redis 密码 |
|
boolean |
false |
Redis 用户名 |
|
String |
无默认值 |
Redis 哈希键 |
|
无默认值 |
配置副本写入的验证 |
||
Map |
无默认值 |
其他存储配置属性。 |
RedisStoreWaitConfig Schema 参考
在以下位置使用: RedisOffsetStore, RedisSchemaHistoryStore
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
boolean |
false |
如果 Redis 具有副本,这允许验证数据是否已写入副本 |
|
long |
1000 |
等待副本的超时时间(毫秒) |
|
boolean |
false |
等待副本时启用重试 |
|
long |
1000 |
等待时重试的延迟 |
SchemaHistory Schema 参考
在以下位置使用: Source
| 属性 | Type | Default (默认值) | 描述 |
|---|---|---|---|
无默认值 |
文件后备 Schema history 存储配置 |
||
无默认值 |
内存后备 Schema history 存储配置 |
||
无默认值 |
Redis 后备 Schema history 存储配置 |
||
无默认值 |
Kafka 后备 Schema history 存储配置 |
||
无默认值 |
JDBC 后备 Schema history 存储配置 |
||
无默认值 |
任意 Schema history 存储配置 |
||
Map |
无默认值 |
其他通用 Schema history 存储配置属性。 |
完整的 Debezium Server 配置示例
以下示例显示了一个完整的 Debezium Server 自定义资源,其中包含前面表格中的属性。
apiVersion: debezium.io/v1alpha1
kind: DebeziumServer
metadata:
name: my-debezium-server
spec:
version: "3.2.0"
source:
class: io.debezium.connector.mysql.MySqlConnector
config:
database.hostname: mysql-server
database.port: 3306
database.user: debezium
database.password: secret
database.server.id: 184054
topic.prefix: mysql
database.include.list: inventory
offset:
type: kafka
config:
bootstrap.servers: kafka:9092
topic: debezium-offsets
schemaHistory:
type: kafka
config:
bootstrap.servers: kafka:9092
topic: debezium-schema-history
sink:
type: kafka
config:
bootstrap.servers: kafka:9092
format:
key:
type: json
value:
type: json
config:
schemas.enable: false
runtime:
jmx:
enabled: true
port: 1099
storage:
data:
type: persistent
claimName: debezium-data-pvc
metrics:
jmxExporter:
enabled: true
transforms:
- type: io.debezium.transforms.UnwrapFromEnvelope
config:
drop.tombstones: false
predicate: inventory-filter
negate: false
predicates:
inventory-filter:
type: org.apache.kafka.connect.transforms.predicates.TopicNameMatches
config:
pattern: "mysql.inventory.*"
quarkus:
config:
quarkus.log.level: INFO
quarkus.http.port: 8080