您正在查看 Debezium 未发布版本的文档。
如果您想查看此页面的最新稳定版本,请在此 查看。

在 OpenShift 上部署 Debezium

本操作流程用于在 Red Hat 的 OpenShift 容器平台上设置 Debezium 连接器。要在 OpenShift 上进行开发或测试,可以使用 CodeRady Containers

为了更快地开始,请尝试 Debezium 在线学习场景。它会为你启动一个 OpenShift 集群,让你在几分钟内就可以在浏览器中使用 Debezium。

先决条件

为了将容器与其他集群工作负载隔离,请为 Debezium 创建一个专用的项目。在本文档的其余部分,将使用 debezium-example 命名空间

$ oc new-project debezium-example

部署 Strimzi Operator

对于 Debezium 的部署,我们将使用 Strimzi 项目,它负责管理 OpenShift 集群上的 Kafka 部署。安装 Strimzi 的最简单方法是从 OperatorHub 安装 Strimzi Operator。在 OpenShift UI 中导航到“OperatorHub”选项卡,选择“Strimzi”,然后单击“安装”按钮。

openshift strimzi operator

如果你更喜欢命令行工具,也可以通过这种方式安装 Strimzi Operator。

$ cat << EOF | oc create -f -
apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:
  name: my-strimzi-kafka-operator
  namespace: openshift-operators
spec:
  channel: stable
  name: strimzi-kafka-operator
  source: operatorhubio-catalog
  sourceNamespace: olm
EOF

创建数据库的 Secret

稍后,在部署 Debezium Kafka 连接器时,我们需要为连接器提供用户名和密码,以便它能够连接到数据库。出于安全原因,最好不要直接提供凭据,而是将它们保存在一个单独的安全位置。OpenShift 提供 Secret 对象来实现此目的。除了创建 Secret 对象本身,我们还需要创建一个角色和一个角色绑定,以便 Kafka 可以访问凭据。

首先,让我们创建 Secret 对象:

$ cat << EOF | oc create -f -
apiVersion: v1
kind: Secret
metadata:
  name: debezium-secret
  namespace: debezium-example
type: Opaque
data:
  username: ZGViZXppdW0=
  password: ZGJ6
EOF

usernamepassword 包含 base64 编码的凭据(debezium/dbz),用于连接我们稍后将部署的 MySQL 数据库。

现在,我们可以创建一个引用上一步创建的 secret 的角色。

$ cat << EOF | oc create -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connector-configuration-role
  namespace: debezium-example
rules:
- apiGroups: [""]
  resources: ["secrets"]
  resourceNames: ["debezium-secret"]
  verbs: ["get"]
EOF

我们还必须将此角色绑定到 Kafka Connect 集群的服务帐户,以便 Kafka Connect 可以访问 secret:

$ cat << EOF | oc create -f -
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connector-configuration-role-binding
  namespace: debezium-example
subjects:
- kind: ServiceAccount
  name: debezium-connect-cluster-connect
  namespace: debezium-example
roleRef:
  kind: Role
  name: connector-configuration-role
  apiGroup: rbac.authorization.k8s.io
EOF

Strimzi 在我们部署 Kafka Connect 时会自动创建服务账户。服务账户的名称取自 $KafkaConnectName-connect。稍后,我们将创建一个名为 debezium-connect-cluster 的 Kafka Connect 集群,因此我们在这里使用了 debezium-connect-cluster-connect 作为 subjects.name

部署 Apache Kafka

接下来,部署一个 Kafka 集群(通过 NodePool 部署 3 个节点)。

$ cat << EOF | oc create -n debezium-example -f -
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: debezium-cluster-node-pool
  labels:
    strimzi.io/cluster: debezium-cluster
spec:
  replicas: 3
  roles:
    - broker
    - controller
  storage:
    type: ephemeral
---
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: debezium-cluster
  annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    version: "3.9.0"
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external
        port: 9094
        type: nodeport
        tls: false
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
  entityOperator:
    topicOperator: {}
    userOperator: {}
---
EOF
  • 等待其就绪:

$ oc wait kafka/debezium-cluster --for=condition=Ready --timeout=300s

部署数据源

在以下示例中,将使用 MySQL 作为数据源。除了运行一个带有 MySQL 的 pod 之外,还需要一个指向数据库 pod 的适当服务。例如,可以这样创建:

$ cat << EOF | oc create -f -
apiVersion: v1
kind: Service
metadata:
  name: mysql
spec:
  ports:
  - port: 3306
  selector:
    app: mysql
  clusterIP: None
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mysql
spec:
  selector:
    matchLabels:
      app: mysql
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: mysql
    spec:
      containers:
      - image: quay.io/debezium/example-mysql:3.3
        name: mysql
        env:
        - name: MYSQL_ROOT_PASSWORD
          value: debezium
        - name: MYSQL_USER
          value: mysqluser
        - name: MYSQL_PASSWORD
          value: mysqlpw
        ports:
        - containerPort: 3306
          name: mysql
EOF

部署 Debezium 连接器

要部署 Debezium 连接器,您需要在实例化实际连接器本身之前,先部署一个带有所需连接器插件的 Kafka Connect 集群。作为第一步,必须创建一个带有插件的 Kafka Connect 容器镜像。如果您已经有一个在注册表中构建好的容器镜像,则可以跳过此步骤。本文档将使用 MySQL 连接器作为示例。

创建 Kafka Connect 集群

同样,我们将使用 Strimzi 来创建 Kafka Connect 集群。Strimzi 也可以为我们构建和推送所需的容器镜像。实际上,这两项任务可以合并在一起,并且构建容器镜像的说明可以直接在 KafkaConnect 对象规范中提供。

$ cat << EOF | oc create -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.1.0
  replicas: 1
  bootstrapServers: debezium-cluster-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output:
      type: docker
      image: image-registry.openshift-image-registry.svc:5000/debezium-example/debezium-connect-mysql:latest
    plugins:
      - name: debezium-mysql-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/{debezium-version}/debezium-connector-mysql-{debezium-version}-plugin.tar.gz
EOF

在这里,我们利用了 OpenShift 内置的注册表,它已经在 OpenShift 集群上作为服务运行。

为了简化,我们跳过了对下载的工件的校验和验证。如果您想确保工件已正确下载,请通过 sha512sum 属性指定其校验和。有关更多详细信息,请参阅 Strimzi 文档

如果您已经在本地或远程注册表(如 quay.io 或 DockerHub)中拥有合适的容器镜像,则可以使用此简化版本:

$ cat << EOF | oc create -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.1.0
  image: 10.110.154.103/debezium-connect-mysql:latest
  replicas: 1
  bootstrapServers: debezium-cluster-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
EOF

你还可以注意到,我们已将 secret 提供程序配置为使用 Strimzi secret 提供程序。Strimzi secret 提供程序将为这个 Kafka Connect 集群创建一个服务账户(我们已经将其绑定到适当的角色),并允许 Kafka Connect 访问我们的 Secret 对象。

在创建 Debezium 连接器之前,请检查所有 pod 是否都已运行。

openshift pods

创建 Debezium 连接器

要创建 Debezium 连接器,只需创建一个具有适当配置(在此例中为 MySQL)的 KafkaConnector

$ cat << EOF | oc create -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-mysql
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    tasks.max: 1
    database.hostname: mysql
    database.port: 3306
    database.user: ${secrets:debezium-example/debezium-secret:username}
    database.password: ${secrets:debezium-example/debezium-secret:password}
    database.server.id: 184054
    topic.prefix: mysql
    database.include.list: inventory
    schema.history.internal.kafka.bootstrap.servers: debezium-cluster-kafka-bootstrap:9092
    schema.history.internal.kafka.topic: schema-changes.inventory
EOF

您会注意到,我们在连接器配置中没有使用明文用户名和密码,而是引用了我们之前创建的 Secret 对象。

验证部署

要验证一切是否正常工作,您可以例如开始监视 mysql.inventory.customers Kafka 主题:

$ oc run -n debezium-example -it --rm --image=quay.io/debezium/tooling:1.2  --restart=Never watcher -- kcat -b debezium-cluster-kafka-bootstrap:9092 -C -o beginning -t mysql.inventory.customers

连接到 MySQL 数据库:

$ oc run -n debezium-example -it --rm --image=mysql:8.2 --restart=Never --env MYSQL_ROOT_PASSWORD=debezium mysqlterm -- mysql -hmysql -P3306 -uroot -pdebezium

customers 表中进行一些更改:

sql> update customers set first_name="Sally Marie" where id=1001;

您现在应该能够观察到 Kafka 主题中的更改事件:

{
...
  "payload": {
    "before": {
      "id": 1001,
      "first_name": "Sally",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "after": {
      "id": 1001,
      "first_name": "Sally Marie",
      "last_name": "Thomas",
      "email": "sally.thomas@acme.com"
    },
    "source": {
      "version": "{debezium-version}",
      "connector": "mysql",
      "name": "mysql",
      "ts_ms": 1646300467000,
      "ts_us": 1646300467000000,
      "ts_ns": 1646300467000000000,
      "snapshot": "false",
      "db": "inventory",
      "sequence": null,
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 401,
      "row": 0,
      "thread": null,
      "query": null
    },
    "op": "u",
    "ts_ms": 1646300467746,
    "ts_us": 1646300467746178,
    "ts_ns": 1646300467746178963,
    "transaction": null
  }
}

如果您有任何关于在 Kubernetes 或 OpenShift 上运行 Debezium 的问题或要求,请在我们的 用户组 或 Debezium 开发者聊天室 中告诉我们。