在 OpenShift 上部署 Debezium

此过程用于在 Red Hat 的 OpenShift 容器平台上设置 Debezium 连接器。对于在 OpenShift 上进行开发或测试,您可以使用 CodeRady Containers

为了更快上手,请尝试 Debezium 在线学习场景。它会为您启动一个 OpenShift 集群,让您可以在几分钟内在浏览器中使用 Debezium。

先决条件

为了将容器与其他工作负载隔离开,请为 Debezium 创建一个专用的项目。在本文档的其余部分,将使用 debezium-example 命名空间

$ oc new-project debezium-example

部署 Strimzi Operator

对于 Debezium 的部署,我们将使用 Strimzi 项目,它负责管理 OpenShift 集群上的 Kafka 部署。安装 Strimzi 最简单的方法是从 OperatorHub 安装 Strimzi 算子。在 OpenShift UI 中导航到“OperatorHub”选项卡,选择“Strimzi”并单击“安装”按钮。

openshift strimzi operator

如果您偏爱命令行工具,也可以通过这种方式安装 Strimzi 算子

$ cat << EOF | oc create -f -
apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:
  name: my-strimzi-kafka-operator
  namespace: openshift-operators
spec:
  channel: stable
  name: strimzi-kafka-operator
  source: operatorhubio-catalog
  sourceNamespace: olm
EOF

创建数据库的 Secret

稍后,在部署 Debezium Kafka 连接器时,我们需要提供用户名和密码供连接器连接到数据库。出于安全原因,最佳实践是不要直接提供凭据,而是将它们保存在一个单独的安全位置。OpenShift 为此目的提供了 Secret 对象。除了创建 Secret 对象本身,我们还必须创建一个角色和一个角色绑定,以便 Kafka 能够访问凭据。

首先,让我们创建 Secret 对象:

$ cat << EOF | oc create -f -
apiVersion: v1
kind: Secret
metadata:
  name: debezium-secret
  namespace: debezium-example
type: Opaque
data:
  username: ZGViZXppdW0=
  password: ZGJ6
EOF

usernamepassword 包含 base64 编码的凭据(debezium/dbz),用于连接我们稍后将部署的 MySQL 数据库。

现在,我们可以创建一个引用上一步创建的 secret 的角色

$ cat << EOF | oc create -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connector-configuration-role
  namespace: debezium-example
rules:
- apiGroups: [""]
  resources: ["secrets"]
  resourceNames: ["debezium-secret"]
  verbs: ["get"]
EOF

我们还必须将此角色绑定到 Kafka Connect 集群的服务帐户,以便 Kafka Connect 可以访问 secret:

$ cat << EOF | oc create -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connector-configuration-role-binding
  namespace: debezium-example
subjects:
- kind: ServiceAccount
  name: debezium-connect-cluster-connect
  namespace: debezium-example
roleRef:
  kind: Role
  name: connector-configuration-role
  apiGroup: rbac.authorization.k8s.io
EOF

当部署 Kafka Connect 时,Strimzi 将创建服务帐户。服务帐户的名称来自 $KafkaConnectName-connect。稍后,我们将创建一个名为 debezium-connect-cluster 的 Kafka Connect 集群,因此我们在 subjects.name 中使用了 debezium-connect-cluster-connect

部署 Apache Kafka

接下来,部署一个 Kafka 集群(通过 NodePool 部署 3 个节点)

$ cat << EOF | oc create -n debezium-example -f -
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: debezium-cluster-node-pool
  labels:
    strimzi.io/cluster: debezium-cluster
spec:
  replicas: 3
  roles:
    - broker
    - controller
  storage:
    type: ephemeral
---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: debezium-cluster
  annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    version: "3.9.0"
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external
        port: 9094
        type: nodeport
        tls: false
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
  entityOperator:
    topicOperator: {}
    userOperator: {}
---
EOF
  • 等待其就绪:

$ oc wait kafka/debezium-cluster --for=condition=Ready --timeout=300s

部署数据源

在以下示例中,将使用 MySQL 作为数据源。除了运行一个带有 MySQL 的 pod 之外,还需要一个指向数据库 pod 的适当服务。例如,可以这样创建:

$ cat << EOF | oc create -f -
apiVersion: v1
kind: Service
metadata:
  name: mysql
spec:
  ports:
  - port: 3306
  selector:
    app: mysql
  clusterIP: None
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mysql
spec:
  selector:
    matchLabels:
      app: mysql
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: mysql
    spec:
      containers:
      - image: quay.io/debezium/example-mysql:3.3
        name: mysql
        env:
        - name: MYSQL_ROOT_PASSWORD
          value: debezium
        - name: MYSQL_USER
          value: mysqluser
        - name: MYSQL_PASSWORD
          value: mysqlpw
        ports:
        - containerPort: 3306
          name: mysql
EOF

部署 Debezium 连接器

要部署 Debezium 连接器,您需要在实例化实际连接器本身之前,先部署一个带有所需连接器插件的 Kafka Connect 集群。作为第一步,必须创建一个带有插件的 Kafka Connect 容器镜像。如果您已经有一个在注册表中构建好的容器镜像,则可以跳过此步骤。本文档将使用 MySQL 连接器作为示例。

创建 Kafka Connect 集群

同样,我们将使用 Strimzi 来创建 Kafka Connect 集群。Strimzi 也可以为我们构建和推送所需的容器镜像。实际上,这两项任务可以合并在一起,并且构建容器镜像的说明可以直接在 KafkaConnect 对象规范中提供。

$ cat << EOF | oc create -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.1.0
  replicas: 1
  bootstrapServers: debezium-cluster-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output:
      type: docker
      image: image-registry.openshift-image-registry.svc:5000/debezium-example/debezium-connect-mysql:latest
    plugins:
      - name: debezium-mysql-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/{debezium-version}/debezium-connector-mysql-{debezium-version}-plugin.tar.gz
EOF

在这里,我们利用了 OpenShift 内置的注册表,该注册表已经作为一个服务在 OpenShift 集群上运行。

为了简化,我们跳过了对下载的工件的校验和验证。如果您想确保工件已正确下载,请通过 sha512sum 属性指定其校验和。有关更多详细信息,请参阅 Strimzi 文档

如果您已经在本地或远程注册表(如 quay.io 或 DockerHub)中拥有合适的容器镜像,则可以使用此简化版本:

$ cat << EOF | oc create -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.1.0
  image: 10.110.154.103/debezium-connect-mysql:latest
  replicas: 1
  bootstrapServers: debezium-cluster-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
EOF

您还可以注意到,我们已将 secret 提供程序配置为使用 Strimzi secret 提供程序。Strimzi secret 提供程序将为这个 Kafka Connect 集群创建服务帐户(我们已经将其绑定到相应的角色),并允许 Kafka Connect 访问我们的 Secret 对象。

在创建 Debezium 连接器之前,请检查所有 pod 是否都已运行

openshift pods

创建 Debezium 连接器

要创建 Debezium 连接器,只需创建一个具有适当配置(在此例中为 MySQL)的 KafkaConnector

$ cat << EOF | oc create -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-mysql
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    tasks.max: 1
    database.hostname: mysql
    database.port: 3306
    database.user: ${secrets:debezium-example/debezium-secret:username}
    database.password: ${secrets:debezium-example/debezium-secret:password}
    database.server.id: 184054
    topic.prefix: mysql
    database.include.list: inventory
    schema.history.internal.kafka.bootstrap.servers: debezium-cluster-kafka-bootstrap:9092
    schema.history.internal.kafka.topic: schema-changes.inventory
EOF

您会注意到,我们在连接器配置中没有使用明文用户名和密码,而是引用了我们之前创建的 Secret 对象。

验证部署

要验证一切是否正常工作,您可以例如开始监视 mysql.inventory.customers Kafka 主题:

$ oc run -n debezium-example -it --rm --image=quay.io/debezium/tooling:1.2  --restart=Never watcher -- kcat -b debezium-cluster-kafka-bootstrap:9092 -C -o beginning -t mysql.inventory.customers

连接到 MySQL 数据库:

$ oc run -n debezium-example -it --rm --image=mysql:8.2 --restart=Never --env MYSQL_ROOT_PASSWORD=debezium mysqlterm -- mysql -hmysql -P3306 -uroot -pdebezium

customers 表中进行一些更改:

sql> update customers set first_name="Sally Marie" where id=1001;

您现在应该能够观察到 Kafka 主题中的更改事件:

{
...
  "payload": {
    "before": {
      "id": 1001,
      "first_name": "Sally",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "after": {
      "id": 1001,
      "first_name": "Sally Marie",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "source": {
      "version": "{debezium-version}",
      "connector": "mysql",
      "name": "mysql",
      "ts_ms": 1646300467000,
      "ts_us": 1646300467000000,
      "ts_ns": 1646300467000000000,
      "snapshot": "false",
      "db": "inventory",
      "sequence": null,
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 401,
      "row": 0,
      "thread": null,
      "query": null
    },
    "op": "u",
    "ts_ms": 1646300467746,
    "ts_us": 1646300467746178,
    "ts_ns": 1646300467746178963,
    "transaction": null
  }
}

如果您有任何关于在 Kubernetes 或 OpenShift 上运行 Debezium 的问题或请求,请在我们的 用户组 或 Debezium 开发者聊天 中告诉我们。