在 OpenShift 上部署 Debezium
本操作流程用于在 Red Hat 的 OpenShift 容器平台上设置 Debezium 连接器。要在 OpenShift 上进行开发或测试,可以使用 CodeRady Containers。
为了更快地开始,请尝试 Debezium 在线学习场景。它会为你启动一个 OpenShift 集群,让你在几分钟内就可以在浏览器中使用 Debezium。
先决条件
为了将容器与其他集群工作负载隔离,请为 Debezium 创建一个专用的项目。在本文档的其余部分,将使用 debezium-example 命名空间
$ oc new-project debezium-example
部署 Strimzi Operator
对于 Debezium 的部署,我们将使用 Strimzi 项目,它负责管理 OpenShift 集群上的 Kafka 部署。安装 Strimzi 的最简单方法是从 OperatorHub 安装 Strimzi Operator。在 OpenShift UI 中导航到“OperatorHub”选项卡,选择“Strimzi”,然后单击“安装”按钮。
如果你更喜欢命令行工具,也可以通过这种方式安装 Strimzi Operator。
$ cat << EOF | oc create -f -
apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:
name: my-strimzi-kafka-operator
namespace: openshift-operators
spec:
channel: stable
name: strimzi-kafka-operator
source: operatorhubio-catalog
sourceNamespace: olm
EOF
创建数据库的 Secret
稍后,在部署 Debezium Kafka 连接器时,我们需要为连接器提供用户名和密码,以便它能够连接到数据库。出于安全原因,最好不要直接提供凭据,而是将它们保存在一个单独的安全位置。OpenShift 提供 Secret 对象来实现此目的。除了创建 Secret 对象本身,我们还需要创建一个角色和一个角色绑定,以便 Kafka 可以访问凭据。
首先,让我们创建 Secret 对象:
$ cat << EOF | oc create -f -
apiVersion: v1
kind: Secret
metadata:
name: debezium-secret
namespace: debezium-example
type: Opaque
data:
username: ZGViZXppdW0=
password: ZGJ6
EOF
username 和 password 包含 base64 编码的凭据(debezium/dbz),用于连接我们稍后将部署的 MySQL 数据库。
现在,我们可以创建一个引用上一步创建的 secret 的角色。
$ cat << EOF | oc create -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: connector-configuration-role
namespace: debezium-example
rules:
- apiGroups: [""]
resources: ["secrets"]
resourceNames: ["debezium-secret"]
verbs: ["get"]
EOF
我们还必须将此角色绑定到 Kafka Connect 集群的服务帐户,以便 Kafka Connect 可以访问 secret:
$ cat << EOF | oc create -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: connector-configuration-role-binding
namespace: debezium-example
subjects:
- kind: ServiceAccount
name: debezium-connect-cluster-connect
namespace: debezium-example
roleRef:
kind: Role
name: connector-configuration-role
apiGroup: rbac.authorization.k8s.io
EOF
Strimzi 在我们部署 Kafka Connect 时会自动创建服务账户。服务账户的名称取自 $KafkaConnectName-connect。稍后,我们将创建一个名为 debezium-connect-cluster 的 Kafka Connect 集群,因此我们在这里使用了 debezium-connect-cluster-connect 作为 subjects.name。
部署 Apache Kafka
接下来,部署一个 Kafka 集群(通过 NodePool 部署 3 个节点)。
$ cat << EOF | oc create -n debezium-example -f -
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
name: debezium-cluster-node-pool
labels:
strimzi.io/cluster: debezium-cluster
spec:
replicas: 3
roles:
- broker
- controller
storage:
type: ephemeral
---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: debezium-cluster
annotations:
strimzi.io/node-pools: enabled
strimzi.io/kraft: enabled
spec:
kafka:
version: "3.9.0"
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
authentication:
type: tls
- name: external
port: 9094
type: nodeport
tls: false
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
entityOperator:
topicOperator: {}
userOperator: {}
---
EOF
-
等待其就绪:
$ oc wait kafka/debezium-cluster --for=condition=Ready --timeout=300s
部署数据源
在以下示例中,将使用 MySQL 作为数据源。除了运行一个带有 MySQL 的 pod 之外,还需要一个指向数据库 pod 的适当服务。例如,可以这样创建:
$ cat << EOF | oc create -f -
apiVersion: v1
kind: Service
metadata:
name: mysql
spec:
ports:
- port: 3306
selector:
app: mysql
clusterIP: None
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: mysql
spec:
selector:
matchLabels:
app: mysql
strategy:
type: Recreate
template:
metadata:
labels:
app: mysql
spec:
containers:
- image: quay.io/debezium/example-mysql:3.3
name: mysql
env:
- name: MYSQL_ROOT_PASSWORD
value: debezium
- name: MYSQL_USER
value: mysqluser
- name: MYSQL_PASSWORD
value: mysqlpw
ports:
- containerPort: 3306
name: mysql
EOF
部署 Debezium 连接器
要部署 Debezium 连接器,您需要在实例化实际连接器本身之前,先部署一个带有所需连接器插件的 Kafka Connect 集群。作为第一步,必须创建一个带有插件的 Kafka Connect 容器镜像。如果您已经有一个在注册表中构建好的容器镜像,则可以跳过此步骤。本文档将使用 MySQL 连接器作为示例。
创建 Kafka Connect 集群
同样,我们将使用 Strimzi 来创建 Kafka Connect 集群。Strimzi 也可以为我们构建和推送所需的容器镜像。实际上,这两项任务可以合并在一起,并且构建容器镜像的说明可以直接在 KafkaConnect 对象规范中提供。
$ cat << EOF | oc create -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: debezium-connect-cluster
annotations:
strimzi.io/use-connector-resources: "true"
spec:
version: 3.1.0
replicas: 1
bootstrapServers: debezium-cluster-kafka-bootstrap:9092
config:
config.providers: secrets
config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
group.id: connect-cluster
offset.storage.topic: connect-cluster-offsets
config.storage.topic: connect-cluster-configs
status.storage.topic: connect-cluster-status
# -1 means it will use the default replication factor configured in the broker
config.storage.replication.factor: -1
offset.storage.replication.factor: -1
status.storage.replication.factor: -1
build:
output:
type: docker
image: image-registry.openshift-image-registry.svc:5000/debezium-example/debezium-connect-mysql:latest
plugins:
- name: debezium-mysql-connector
artifacts:
- type: tgz
url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/{debezium-version}/debezium-connector-mysql-{debezium-version}-plugin.tar.gz
EOF
在这里,我们利用了 OpenShift 内置的注册表,它已经在 OpenShift 集群上作为服务运行。
|
为了简化,我们跳过了对下载的工件的校验和验证。如果您想确保工件已正确下载,请通过 |
如果您已经在本地或远程注册表(如 quay.io 或 DockerHub)中拥有合适的容器镜像,则可以使用此简化版本:
$ cat << EOF | oc create -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: debezium-connect-cluster
annotations:
strimzi.io/use-connector-resources: "true"
spec:
version: 3.1.0
image: 10.110.154.103/debezium-connect-mysql:latest
replicas: 1
bootstrapServers: debezium-cluster-kafka-bootstrap:9092
config:
config.providers: secrets
config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
group.id: connect-cluster
offset.storage.topic: connect-cluster-offsets
config.storage.topic: connect-cluster-configs
status.storage.topic: connect-cluster-status
# -1 means it will use the default replication factor configured in the broker
config.storage.replication.factor: -1
offset.storage.replication.factor: -1
status.storage.replication.factor: -1
EOF
你还可以注意到,我们已将 secret 提供程序配置为使用 Strimzi secret 提供程序。Strimzi secret 提供程序将为这个 Kafka Connect 集群创建一个服务账户(我们已经将其绑定到适当的角色),并允许 Kafka Connect 访问我们的 Secret 对象。
在创建 Debezium 连接器之前,请检查所有 pod 是否都已运行。
创建 Debezium 连接器
要创建 Debezium 连接器,只需创建一个具有适当配置(在此例中为 MySQL)的 KafkaConnector:
$ cat << EOF | oc create -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: debezium-connector-mysql
labels:
strimzi.io/cluster: debezium-connect-cluster
spec:
class: io.debezium.connector.mysql.MySqlConnector
tasksMax: 1
config:
tasks.max: 1
database.hostname: mysql
database.port: 3306
database.user: ${secrets:debezium-example/debezium-secret:username}
database.password: ${secrets:debezium-example/debezium-secret:password}
database.server.id: 184054
topic.prefix: mysql
database.include.list: inventory
schema.history.internal.kafka.bootstrap.servers: debezium-cluster-kafka-bootstrap:9092
schema.history.internal.kafka.topic: schema-changes.inventory
EOF
您会注意到,我们在连接器配置中没有使用明文用户名和密码,而是引用了我们之前创建的 Secret 对象。
验证部署
要验证一切是否正常工作,您可以例如开始监视 mysql.inventory.customers Kafka 主题:
$ oc run -n debezium-example -it --rm --image=quay.io/debezium/tooling:1.2 --restart=Never watcher -- kcat -b debezium-cluster-kafka-bootstrap:9092 -C -o beginning -t mysql.inventory.customers
连接到 MySQL 数据库:
$ oc run -n debezium-example -it --rm --image=mysql:8.2 --restart=Never --env MYSQL_ROOT_PASSWORD=debezium mysqlterm -- mysql -hmysql -P3306 -uroot -pdebezium
在 customers 表中进行一些更改:
sql> update customers set first_name="Sally Marie" where id=1001;
您现在应该能够观察到 Kafka 主题中的更改事件:
{
...
"payload": {
"before": {
"id": 1001,
"first_name": "Sally",
"last_name": "Thomas",
"email": "sally.thomas@acme.com"
},
"after": {
"id": 1001,
"first_name": "Sally Marie",
"last_name": "Thomas",
"email": "sally.thomas@acme.com"
},
"source": {
"version": "{debezium-version}",
"connector": "mysql",
"name": "mysql",
"ts_ms": 1646300467000,
"ts_us": 1646300467000000,
"ts_ns": 1646300467000000000,
"snapshot": "false",
"db": "inventory",
"sequence": null,
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 401,
"row": 0,
"thread": null,
"query": null
},
"op": "u",
"ts_ms": 1646300467746,
"ts_us": 1646300467746178,
"ts_ns": 1646300467746178963,
"transaction": null
}
}