在 Kubernetes 上部署 Debezium

Debezium 可以轻松地部署在开源容器管理平台 Kubernetes 上。部署利用了 Strimzi 项目,该项目旨在通过*自定义资源*来简化 Apache Kafka 在 Kubernetes 上的部署。

为了测试您的部署,您可以使用 minikube,它可以在您的本地计算机上启动一个 Kubernetes 集群。有关如何在您的计算机上安装 minikube 的详细信息,请参阅 minikube 文档。如果您想完全在 minikube 上测试本文档所述的 Debezium 部署,您需要设置一个不安全的容器镜像仓库。为此,您需要使用 --insecure-registry 标志启动 minikube

$ minikube start --insecure-registry "10.0.0.0/24"

10.0.0.1 是默认的服务集群 IP,因此此设置允许在整个集群内部拉取镜像。有关更多详细信息,请参阅 minikube 文档。您还需要启用 registry minikube 插件

$ minikube addons enable registry

先决条件

为了将容器与集群中的其他工作负载隔离开,请为 Debezium 创建一个专用命名空间。在本档的其余部分,将使用 debezium-example 命名空间

$ kubectl create ns debezium-example

部署 Strimzi Operator

如上所述,对于 Debezium 部署,我们将使用 Strimzi,它负责在 Kubernetes 上管理 Kafka 部署。有关如何在您的 Kubernetes 集群上部署 Strimzi 的更多详细信息,请参阅 Strimzi 部署文档

安装 Strimzi 最简单的方法是通过 Operator Lifecycle Manager (OLM)。如果您尚未在集群中安装 OLM,可以通过运行以下命令进行安装

$ curl -sL https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.20.0/install.sh | bash -s v0.20.0

现在,安装 Strimzi 操作程序本身

$ kubectl create -f https://operatorhub.io/install/strimzi-kafka-operator.yaml

创建数据库的 Secret

稍后,在部署 Debezium Kafka 连接器时,我们需要提供用户名和密码,以便连接器能够连接到数据库。出于安全原因,最好不要直接提供凭据,而是将其保存在单独的安全位置。Kubernetes 为此目的提供了 Secret 对象。除了创建 Secret 对象本身之外,我们还必须创建一个角色和一个角色绑定,以便 Kafka 可以访问凭据。

首先,让我们创建 Secret 对象:

$ cat << EOF | kubectl create -n debezium-example -f -
apiVersion: v1
kind: Secret
metadata:
  name: debezium-secret
  namespace: debezium-example
type: Opaque
data:
  username: ZGViZXppdW0=
  password: ZGJ6
EOF

usernamepassword 包含 base64 编码的凭据(debezium/dbz),用于连接我们稍后将部署的 MySQL 数据库。

现在,我们可以创建一个引用上一步中创建的 secret 的角色

$ cat << EOF | kubectl create -n debezium-example -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connector-configuration-role
  namespace: debezium-example
rules:
- apiGroups: [""]
  resources: ["secrets"]
  resourceNames: ["debezium-secret"]
  verbs: ["get"]
EOF

我们还必须将此角色绑定到 Kafka Connect 集群的服务帐户,以便 Kafka Connect 可以访问 secret:

$ cat << EOF | kubectl create -n debezium-example -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connector-configuration-role-binding
  namespace: debezium-example
subjects:
- kind: ServiceAccount
  name: debezium-connect-cluster-connect
  namespace: debezium-example
roleRef:
  kind: Role
  name: connector-configuration-role
  apiGroup: rbac.authorization.k8s.io
EOF

当我们部署 Kafka Connect 时,Strimzi 将创建服务帐户。服务帐户的名称形式为 $KafkaConnectName-connect。稍后,我们将创建名为 debezium-connect-cluster 的 Kafka Connect 集群,因此这里我们将 debezium-connect-cluster-connect 用作 subjects.name

部署 Apache Kafka

接下来,部署一个(单节点)Kafka 集群

$ cat << EOF | kubectl create -n debezium-example -f -
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: debezium-cluster-node-pool
  labels:
    strimzi.io/cluster: debezium-cluster
spec:
  replicas: 3
  roles:
    - broker
    - controller
  storage:
    type: ephemeral
---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: debezium-cluster
  annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    version: "3.9.0"
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external
        port: 9094
        type: nodeport
        tls: false
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
  entityOperator:
    topicOperator: {}
    userOperator: {}
---
EOF
  • 等待其就绪:

$ kubectl wait kafka/debezium-cluster --for=condition=Ready --timeout=300s -n debezium-example

部署数据源

在以下示例中,将使用 MySQL 作为数据源。除了运行一个带有 MySQL 的 pod 之外,还需要一个指向数据库 pod 的适当服务。例如,可以这样创建:

$ cat << EOF | kubectl create -n debezium-example -f -
apiVersion: v1
kind: Service
metadata:
  name: mysql
spec:
  ports:
  - port: 3306
  selector:
    app: mysql
  clusterIP: None
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mysql
spec:
  selector:
    matchLabels:
      app: mysql
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: mysql
    spec:
      containers:
      - image: quay.io/debezium/example-mysql:3.3
        name: mysql
        env:
        - name: MYSQL_ROOT_PASSWORD
          value: debezium
        - name: MYSQL_USER
          value: mysqluser
        - name: MYSQL_PASSWORD
          value: mysqlpw
        ports:
        - containerPort: 3306
          name: mysql
EOF

部署 Debezium 连接器

要部署 Debezium 连接器,您需要在实例化实际连接器本身之前,先部署一个带有所需连接器插件的 Kafka Connect 集群。作为第一步,必须创建一个带有插件的 Kafka Connect 容器镜像。如果您已经有一个在注册表中构建好的容器镜像,则可以跳过此步骤。本文档将使用 MySQL 连接器作为示例。

创建 Kafka Connect 集群

同样,我们将使用 Strimzi 来创建 Kafka Connect 集群。Strimzi 也可以为我们构建和推送所需的容器镜像。实际上,这两项任务可以合并在一起,并且构建容器镜像的说明可以直接在 KafkaConnect 对象规范中提供。

$ cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.1.0
  replicas: 1
  bootstrapServers: debezium-cluster-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output:
      type: docker
      image: 10.110.154.103/debezium-connect-mysql:latest
    plugins:
      - name: debezium-mysql-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/{debezium-version}/debezium-connector-mysql-{debezium-version}-plugin.tar.gz
EOF

您必须将镜像仓库的 IP 地址 10.110.154.103 替换为您自己的镜像仓库地址。如果您在 minikube 上使用 registry 插件运行,您可以将镜像推送到内部 minikube 镜像仓库。镜像仓库的 IP 地址可以通过例如运行以下命令获得

$ kubectl -n kube-system get svc registry -o jsonpath='{.spec.clusterIP}'

为了简化,我们跳过了对下载的工件的校验和验证。如果您想确保工件已正确下载,请通过 sha512sum 属性指定其校验和。有关更多详细信息,请参阅 Strimzi 文档

如果您已经在本地或远程注册表(如 quay.io 或 DockerHub)中拥有合适的容器镜像,则可以使用此简化版本:

$ cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.1.0
  image: 10.110.154.103/debezium-connect-mysql:latest
  replicas: 1
  bootstrapServers: debezium-cluster-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
EOF

另请注意,我们已配置了 Strimzi secret provider。此 secret provider 将为该 Kafka Connect 集群创建服务帐户(我们已将其绑定到适当的角色),并允许 Kafka Connect 访问我们的 Secret 对象。

创建 Debezium 连接器

要创建 Debezium 连接器,只需创建一个具有适当配置(在此例中为 MySQL)的 KafkaConnector

$ cat << EOF | kubectl create -n debezium-example -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-mysql
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    tasks.max: 1
    database.hostname: mysql
    database.port: 3306
    database.user: ${secrets:debezium-example/debezium-secret:username}
    database.password: ${secrets:debezium-example/debezium-secret:password}
    database.server.id: 184054
    topic.prefix: mysql
    database.include.list: inventory
    schema.history.internal.kafka.bootstrap.servers: debezium-cluster-kafka-bootstrap:9092
    schema.history.internal.kafka.topic: schema-changes.inventory
EOF

您会注意到,我们在连接器配置中没有使用明文用户名和密码,而是引用了我们之前创建的 Secret 对象。

验证部署

要验证一切是否正常工作,您可以例如开始监视 mysql.inventory.customers Kafka 主题:

$ kubectl run -n debezium-example -it --rm --image=quay.io/debezium/tooling:1.2  --restart=Never watcher -- kcat -b debezium-cluster-kafka-bootstrap:9092 -C -o beginning -t mysql.inventory.customers

连接到 MySQL 数据库:

$ kubectl run -n debezium-example -it --rm --image=mysql:8.2 --restart=Never --env MYSQL_ROOT_PASSWORD=debezium mysqlterm -- mysql -hmysql -P3306 -uroot -pdebezium

customers 表中进行一些更改:

sql> update customers set first_name="Sally Marie" where id=1001;

您现在应该能够观察到 Kafka 主题中的更改事件:

{
...
  "payload": {
    "before": {
      "id": 1001,
      "first_name": "Sally",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "after": {
      "id": 1001,
      "first_name": "Sally Marie",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "source": {
      "version": "{debezium-version}",
      "connector": "mysql",
      "name": "mysql",
      "ts_ms": 1646300467000,
      "ts_us": 1646300467000000,
      "ts_ns": 1646300467000000000,
      "snapshot": "false",
      "db": "inventory",
      "sequence": null,
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 401,
      "row": 0,
      "thread": null,
      "query": null
    },
    "op": "u",
    "ts_ms": 1646300467746,
    "ts_us": 1646300467746041,
    "ts_ns": 1646300467746041975,
    "transaction": null
  }
}