Update (Oct. 11 2019): An alternative and much simpler way to run Debezium (and Apache Kafka and Kafka Connect in general) on Kubernetes is to use a K8s operator such as Strimzi. You can find instructions for setting up Debezium on OpenShift here, and similar steps apply for plain Kubernetes.

Our Debezium tutorial walks you step by step through using Debezium by installing, starting, and linking together all of the Docker containers running on a single host machine. Of course, you can use tools like Docker Compose or your own scripts to make this easier, but that would just automate running all the containers on a single machine. What you really want is to run the containers on a cluster of machines. In this blog post, we'll run Debezium using Kubernetes, a container cluster manager from Red Hat and Google.

Kubernetes is a container (Docker/Rocket/Hyper.sh) cluster management tool. Like many other popular cluster management and compute resource scheduling platforms, Kubernetes' roots are in Google, which is no stranger to running containers at scale. Google starts, stops, and clusters two billion containers per week, and they contributed a lot of the core Linux underpinnings that make containers possible. One of their famous papers describes an internal cluster manager named Borg. With Kubernetes, Google got tired of everyone implementing their papers in Java, so they decided to implement this one themselves :)

Kubernetes is written in Go and is quickly becoming the de facto API for scheduling, managing, and clustering containers at scale. This blog post isn't intended to be a primer on Kubernetes, so we recommend heading over to the Getting Started docs to learn more about it.

Getting started

To get started, we need access to a Kubernetes cluster. Getting one up and running is straightforward: just follow the Getting Started guides. We like using OpenShift's all-in-one VM or the Red Hat Container Development Kit, both of which provide a hardened, production-ready distribution of Kubernetes. Once you've installed it and logged in, you should be able to run kubectl get pod to get a list of any Kubernetes pods you may have running. You don't need anything else running in Kubernetes to get started.

To get and build the Kubernetes manifest files (yaml descriptors), clone the Debezium Kubernetes repo and run the following commands:

$ mvn clean
$ mvn install

The project uses the powerful Fabric8 Maven plugin to automatically generate the Kubernetes manifest files. Here's an example of what gets generated in $PROJECT_ROOT/zk-standalone/target/classes/kubernetes.yml:

apiVersion: "v1"
items:
- apiVersion: "v1"
  kind: "Service"
  metadata:
    annotations: {}
    labels:
      project: "zookeeper"
      provider: "debezium"
      version: "0.1-SNAPSHOT"
      group: "io.debezium"
    name: "zookeeper"
  spec:
    deprecatedPublicIPs: []
    externalIPs: []
    ports:
    - port: 2181
      protocol: "TCP"
      targetPort: 2181
    selector:
      project: "zookeeper"
      provider: "debezium"
      group: "io.debezium"
- apiVersion: "v1"
  kind: "ReplicationController"
  metadata:
    annotations:
      fabric8.io/git-branch: "master"
      fabric8.io/git-commit: "004e222462749fbaf12c3ee33edca9b077ee9003"
    labels:
      project: "zookeeper"
      provider: "debezium"
      version: "0.1-SNAPSHOT"
      group: "io.debezium"
    name: "zk-standalone"
  spec:
    replicas: 1
    selector:
      project: "zookeeper"
      provider: "debezium"
      version: "0.1-SNAPSHOT"
      group: "io.debezium"
    template:
      metadata:
        annotations: {}
        labels:
          project: "zookeeper"
          provider: "debezium"
          version: "0.1-SNAPSHOT"
          group: "io.debezium"
      spec:
        containers:
        - args: []
          command: []
          env:
          - name: "KUBERNETES_NAMESPACE"
            valueFrom:
              fieldRef:
                fieldPath: "metadata.namespace"
          image: "docker.io/debezium/zookeeper:0.1"
          imagePullPolicy: "IfNotPresent"
          name: "zk-standalone"
          ports:
          - containerPort: 3888
            name: "election"
          - containerPort: 2888
            name: "peer"
          - containerPort: 2181
            name: "client"
          securityContext: {}
          volumeMounts: []
        imagePullSecrets: []
        nodeSelector: {}
        volumes: []
kind: "List"

Start Zookeeper and Kafka on Kubernetes

To start Apache Zookeeper or Apache Kafka in Kubernetes, you have two options. If you have the kubectl command line tool installed on your local machine (or the oc tool from the OpenShift client distribution), you can apply any of the newly generated Kubernetes manifest files like this:

$ kubectl create -f <path_to_file>

Alternatively, you can use the Fabric8 Maven plugin and its fabric8:apply goal to apply the manifest files. Note that for either of these options to work, you must currently be logged in to your Kubernetes cluster. (With OpenShift, oc login <url> makes this dead simple; otherwise see Logging into a Kubernetes cluster with kubectl for more information.)

First, let's deploy Zookeeper to our Kubernetes cluster. We need to change into the $PROJECT_ROOT/zk-standalone directory and then apply our Kubernetes configuration. Here's how to do it with the kubectl command:

$ cd zk-standalone
$ kubectl create -f target/classes/kubernetes.yml

service "zookeeper" created
replicationcontroller "zk-standalone" created

You can do the same thing with Maven and the fabric8 Maven plugin:

$ cd zk-standalone
$ mvn fabric8:apply

Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=1512m; support was removed in 8.0
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building zk-standalone 0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- fabric8-maven-plugin:2.2.115:apply (default-cli) @ zk-standalone ---
[INFO] Using kubernetes at: https://172.28.128.4:8443/ in namespace ticket
[INFO] Kubernetes JSON: /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/zk-standalone/target/classes/kubernetes.json
[INFO] OpenShift platform detected
[INFO] Using namespace: ticket
[INFO] Looking at repo with directory /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/.git
[INFO] Creating a Service from kubernetes.json namespace ticket name zookeeper
[INFO] Created Service: zk-standalone/target/fabric8/applyJson/ticket/service-zookeeper.json
[INFO] Creating a ReplicationController from kubernetes.json namespace ticket name zk-standalone
[INFO] Created ReplicationController: zk-standalone/target/fabric8/applyJson/ticket/replicationcontroller-zk-standalone.json
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.661 s
[INFO] Finished at: 2016-05-19T15:59:26-07:00
[INFO] Final Memory: 26M/260M
[INFO] ------------------------------------------------------------------------

Zookeeper is deployed, so let's move on to deploying Kafka. Navigate to $PROJECT_ROOT/kafka and apply the Kafka deployment configuration:

$ cd ../kafka
$ kubectl create -f target/classes/kubernetes.yml

service "kafka" created
replicationcontroller "kafka" created

Or with the fabric8 Maven plugin:

$ cd ../kafka
$ mvn fabric8:apply

Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=1512m; support was removed in 8.0
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building kafka 0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- fabric8-maven-plugin:2.2.115:apply (default-cli) @ kafka ---
[INFO] Using kubernetes at: https://172.28.128.4:8443/ in namespace ticket
[INFO] Kubernetes JSON: /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/kafka/target/classes/kubernetes.json
[INFO] OpenShift platform detected
[INFO] Using namespace: ticket
[INFO] Looking at repo with directory /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/.git
[INFO] Creating a Service from kubernetes.json namespace ticket name kafka
[INFO] Created Service: kafka/target/fabric8/applyJson/ticket/service-kafka.json
[INFO] Creating a ReplicationController from kubernetes.json namespace ticket name kafka
[INFO] Created ReplicationController: kafka/target/fabric8/applyJson/ticket/replicationcontroller-kafka.json
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.563 s
[INFO] Finished at: 2016-05-19T16:03:25-07:00
[INFO] Final Memory: 26M/259M
[INFO] ------------------------------------------------------------------------

Use the kubectl get pod command to see what's running:

$ kubectl get pod

NAME                  READY     STATUS    RESTARTS   AGE
kafka-mqmxt           1/1       Running   0          46s
zk-standalone-4mo02   1/1       Running   0          4m

Did you notice that we didn't manually "link" the containers when we started them? Kubernetes has cluster service discovery built in, known as Kubernetes Services, which load-balances over the pods and lets us discover them with internal DNS (or cluster IPs). For example, in the Kafka kubernetes.yml deployment configuration you'll see the following:

    ...
    containers:
    - args: []
      command: []
      env:
      - name: "KAFKA_ADVERTISED_PORT"
        value: "9092"
      - name: "KAFKA_ADVERTISED_HOST_NAME"
        value: "kafka"
      - name: "KAFKA_ZOOKEEPER_CONNECT"
        value: "zookeeper:2181"
      - name: "KAFKA_PORT"
        value: "9092"
      - name: "KUBERNETES_NAMESPACE"
        valueFrom:
          fieldRef:
            fieldPath: "metadata.namespace"
      image: "docker.io/debezium/kafka:0.1"
      imagePullPolicy: "IfNotPresent"
      name: "kafka"
    ...

We specify a value for the KAFKA_ZOOKEEPER_CONNECT environment variable used by the Docker image, enabling Kafka to discover the Zookeeper pod(s) wherever they may be running. We could have used any hostname, but to keep things simple we just use zookeeper as the DNS name. So if you were logged in to one of the pods and tried to reach a host named zookeeper, Kubernetes would transparently resolve that request to one of the Zookeeper pods (if there are multiple). Slick! This discovery mechanism is used for the rest of the components, too. (Note that the cluster IP this DNS resolves to never changes for the life of the Service, regardless of how many pods exist for the given service. This means you can rely on this service discovery without running into the DNS caching issues you might otherwise encounter.)

The next step is to create a schema-changes topic that Debezium's MySQL connector will use. Let's use the Kafka tools to create it:

$ KAFKA_POD_NAME=$(kubectl get pod | grep -i running | grep kafka | awk '{ print $1 }')

$ kubectl exec $KAFKA_POD_NAME --  /kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic schema-changes.inventory
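The KAFKA_POD_NAME pipeline above simply scrapes the pod name out of kubectl's tabular output. As an illustration of what it extracts, here is the same grep/awk pipeline run against a hypothetical sample of kubectl get pod output (the sample text below is inlined only for this sketch; against a live cluster you would pipe kubectl get pod in directly, as above):

```shell
# Hypothetical `kubectl get pod` output, inlined so the pipeline can be
# demonstrated without a cluster.
sample='NAME                  READY     STATUS    RESTARTS   AGE
kafka-mqmxt           1/1       Running   0          46s
zk-standalone-4mo02   1/1       Running   0          4m'

# Same filtering as above: keep rows for Running pods, keep the kafka pod,
# print the first column (the pod name).
KAFKA_POD_NAME=$(echo "$sample" | grep -i running | grep kafka | awk '{ print $1 }')
echo "$KAFKA_POD_NAME"   # prints kafka-mqmxt
```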

Start a MySQL database on Kubernetes

Starting a MySQL database follows the same steps as installing Zookeeper or Kafka. We'll navigate to the $PROJECT_ROOT/mysql56 directory, and we'll use the MySQL 5.6 OpenShift Docker image so that it can run on both plain Kubernetes and OpenShift v3.x. Here's the kubectl command to start our MySQL instance:

$ cd ../mysql56
$ kubectl create -f target/classes/kubernetes.yml

service "mysql" created
replicationcontroller "mysql56" created

Or the equivalent Maven command:

$ cd mysql56
$ mvn fabric8:apply

Now when we run kubectl get pod, we should also see our MySQL database running:

NAME                  READY     STATUS    RESTARTS   AGE
kafka-mqmxt           1/1       Running   0          17m
mysql56-b4f36         1/1       Running   0          9m
zk-standalone-4mo02   1/1       Running   0          21m

Let's run a command to get client access to the database. First, set a couple of environment variables for the pod's name and IP address:

$ MYSQL_POD_NAME=$(kubectl get pod | grep Running | grep ^mysql | awk '{ print $1 }')
$ MYSQL_POD_IP=$(kubectl describe pod $MYSQL_POD_NAME | grep IP | awk '{ print $2 }')

Then log in to the Kubernetes pod running the MySQL database and start the MySQL command line client:

$ kubectl exec -it $MYSQL_POD_NAME   -- /opt/rh/rh-mysql56/root/usr/bin/mysql -h$MYSQL_POD_IP -P3306 -uroot -padmin
Warning: Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 1
Server version: 5.6.26-log MySQL Community Server (GPL)

Copyright (c) 2000, 2015, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql>

This shows how the kubectl command line gives us easy access to a pod or Docker container, regardless of where it's running in the cluster.

Next, exit the mysql shell (type exit) and run the following command to download a SQL script that populates an inventory sample database:

$ kubectl exec  -it $MYSQL_POD_NAME -- bash -c "curl -s -L https://gist.github.com/christian-posta/e20ddb5c945845b4b9f6eba94a98af09/raw | /opt/rh/rh-mysql56/root/usr/bin/mysql -h$MYSQL_POD_IP -P3306 -uroot -padmin"

Now, if we log back in to the MySQL pod, we can show the databases and tables:

$ kubectl exec -it $MYSQL_POD_NAME   -- /opt/rh/rh-mysql56/root/usr/bin/mysql -h$MYSQL_POD_IP -P3306 -uroot -padmin -e 'use inventory; show tables;'

+---------------------+
| Tables_in_inventory |
+---------------------+
| customers           |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
4 rows in set (0.00 sec)

Start Kafka Connect and Debezium

Navigate to the $PROJECT_ROOT/connect-mysql directory. Here we're going to start a Kubernetes pod that runs Kafka Connect with the Debezium MySQL connector already installed. The Debezium MySQL connector connects to a MySQL database, reads the binlog, and writes those row events to Kafka. Starting Kafka Connect with Debezium on Kubernetes is similar to the previous components:

$ cd ../connect-mysql
$ kubectl create -f target/classes/kubernetes.yml

service "connect-mysql" created
replicationcontroller "connect-mysql" created

Or with the fabric8 Maven plugin:

$ cd ../connect-mysql
$ mvn fabric8:apply
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=1512m; support was removed in 8.0
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building connect-mysql 0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- fabric8-maven-plugin:2.2.115:apply (default-cli) @ connect-mysql ---
[INFO] Using kubernetes at: https://172.28.128.4:8443/ in namespace ticket
[INFO] Kubernetes JSON: /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/connect-mysql/target/classes/kubernetes.json
[INFO] OpenShift platform detected
[INFO] Using namespace: ticket
[INFO] Looking at repo with directory /Users/ceposta/dev/idea-workspace/dbz/debezium-kubernetes/.git
[INFO] Creating a Service from kubernetes.json namespace ticket name connect-mysql
[INFO] Created Service: connect-mysql/target/fabric8/applyJson/ticket/service-connect-mysql.json
[INFO] Creating a ReplicationController from kubernetes.json namespace ticket name connect-mysql
[INFO] Created ReplicationController: connect-mysql/target/fabric8/applyJson/ticket/replicationcontroller-connect-mysql.json
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.255 s
[INFO] Finished at: 2016-05-25T09:21:04-07:00
[INFO] Final Memory: 27M/313M
[INFO] ------------------------------------------------------------------------

Just like in Debezium's Docker tutorial, we now want to send a JSON object to the Kafka Connect API to start our Debezium connector. First, we need to expose the API of the Kafka Connect cluster. You can do this however you like: on Kubernetes with Ingress definitions, NodePort services, etc., or on OpenShift with OpenShift Routes. For this simple example, we'll use plain pod port-forwarding to forward port 8083 of the connect-mysql pod to our local machine (again giving us easy access regardless of where the pod is actually running; this is a really awesome feature of Kubernetes that makes developing distributed services so simple!).
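For reference, the NodePort option mentioned above could look roughly like the following Service. This is only a sketch: the name connect-mysql-external and the nodePort value are illustrative choices, and the selector labels must match whatever labels the generated connect-mysql manifest actually puts on its pods.

```yaml
# Sketch of a NodePort Service exposing the Kafka Connect REST API on
# every cluster node. nodePort 30083 is an arbitrary value in the
# allowed 30000-32767 range.
apiVersion: v1
kind: Service
metadata:
  name: connect-mysql-external
spec:
  type: NodePort
  selector:
    project: "connect-mysql"
    provider: "debezium"
    group: "io.debezium"
  ports:
  - port: 8083
    targetPort: 8083
    nodePort: 30083
    protocol: TCP
```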

Let's determine the pod name and then set up the port forward to our local machine:

$ CONNECT_POD_NAME=$(kubectl get pod | grep -i running | grep ^connect | awk '{ print $1 }')
$ kubectl port-forward $CONNECT_POD_NAME 8083:8083

I0525 09:30:08.390491    6651 portforward.go:213] Forwarding from 127.0.0.1:8083 -> 8083
I0525 09:30:08.390631    6651 portforward.go:213] Forwarding from [::1]:8083 -> 8083

We're forwarding the pod's port 8083 to port 8083 on our local machine. Now if we hit http://localhost:8083, the request will be directed to the pod running our Kafka Connect and Debezium services.

Since it can be useful to watch the pod's output for any exceptions, start another terminal and enter these commands to tail Kafka Connect's output:

$ CONNECT_POD_NAME=$(kubectl get pod | grep -i running | grep ^connect | awk '{ print $1 }')
$ kubectl logs -f $CONNECT_POD_NAME

Now, let's use an HTTP client to post the Debezium connector/task to the endpoint we just exposed locally:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "replicator", "database.password": "replpass", "database.server.id": "184054", "database.server.name": "mysql-server-1", "database.binlog": "mysql-bin.000001", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "schema-changes.inventory" } }'
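The inline JSON above is hard to read (and easy to mistype), so one option is to put the same connector configuration in a file and post that instead. The sketch below writes the exact configuration from the command above to a file named register-inventory.json and validates it; the final curl is left as a comment because it only works while the port-forward from the previous step is running:

```shell
# Write the connector configuration (identical to the inline version
# above) to a file for readability.
cat > register-inventory.json <<'EOF'
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "replicator",
    "database.password": "replpass",
    "database.server.id": "184054",
    "database.server.name": "mysql-server-1",
    "database.binlog": "mysql-bin.000001",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}
EOF

# Sanity-check that the file is valid JSON before posting it.
python3 -m json.tool register-inventory.json > /dev/null && echo "valid JSON"

# With the port-forward running, register the connector from the file:
# curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
#      http://localhost:8083/connectors/ -d @register-inventory.json
```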

If we're watching the log output of the connect-mysql pod, we'll eventually see something like this:

2016-05-27 18:50:14,580 - WARN  [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 2 : {mysql-server-1.inventory.products=LEADER_NOT_AVAILABLE}
2016-05-27 18:50:14,690 - WARN  [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 3 : {mysql-server-1.inventory.products=LEADER_NOT_AVAILABLE}
2016-05-27 18:50:14,911 - WARN  [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 7 : {mysql-server-1.inventory.products_on_hand=LEADER_NOT_AVAILABLE}
2016-05-27 18:50:15,136 - WARN  [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 10 : {mysql-server-1.inventory.customers=LEADER_NOT_AVAILABLE}
2016-05-27 18:50:15,362 - WARN  [kafka-producer-network-thread | producer-1:NetworkClient$DefaultMetadataUpdater@582] - Error while fetching metadata with correlation id 13 : {mysql-server-1.inventory.orders=LEADER_NOT_AVAILABLE}

These errors are just Kafka's way of telling us that the topics did not exist yet and have been created.

If we now list the topics in Kafka, we should see one Kafka topic for each table in the MySQL inventory database:

$ kubectl exec  $KAFKA_POD_NAME --  /kafka/bin/kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
my-connect-configs
my-connect-offsets
mysql-server-1.inventory.customers
mysql-server-1.inventory.orders
mysql-server-1.inventory.products
mysql-server-1.inventory.products_on_hand
schema-changes.inventory

Let's take a look at the contents of one of these topics:

$ kubectl exec  $KAFKA_POD_NAME --  /kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --new-consumer --topic mysql-server-1.inventory.customers --from-beginning --property print.key=true
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"inventory.customers/pk"},"payload":{"id":1001}}   {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":false,"name":"inventory.customers"},"payload":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"inventory.customers/pk"},"payload":{"id":1002}}   {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":false,"name":"inventory.customers"},"payload":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"inventory.customers/pk"},"payload":{"id":1003}}   {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":false,"name":"inventory.customers"},"payload":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"inventory.customers/pk"},"payload":{"id":1004}}   {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":false,"name":"inventory.customers"},"payload":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"}}
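Each event value above pairs a schema describing the row with a payload holding the actual row data. To peel out just the payload, you can pipe an event through a small Python one-liner. In this sketch the value of the first event is inlined verbatim so it runs without a cluster; in practice you would pipe the console consumer's output in instead:

```shell
# The value of the first event above, verbatim: a schema plus a payload.
event='{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":false,"name":"inventory.customers"},"payload":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"}}'

# Extract just the row payload, dropping the schema envelope.
echo "$event" | python3 -c 'import json,sys; print(json.load(sys.stdin)["payload"])'
```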

What happened? When we started Debezium's MySQL connector, it began reading the binary replication log from the MySQL server, replaying all of the history and generating an event for each INSERT, UPDATE, and DELETE operation (though in our sample inventory database we only had INSERTs). If we or some client applications were to commit other changes to the database, Debezium would see them immediately and write them to the correct topic. In other words, Debezium records all of the changes to our MySQL database as events in Kafka topics! From there, any tool, connector, or service can independently consume those event streams from Kafka and process them or put them into a different database, Hadoop, Elasticsearch, data grids, etc.

Clean up

If you want to delete the connector, simply issue a REST request to remove it:

curl -i -X DELETE -H "Accept:application/json" http://localhost:8083/connectors/inventory-connector

Christian Posta

Christian is a Principal Middleware Architect at Red Hat and an open source, Apache, Cloud, Integration, Kubernetes, Docker, OpenShift, and Fabric8 enthusiast.

About Debezium

Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.

Get involved

We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas how we can improve Debezium, please let us know or log an issue.

Copyright © Debezium and its authors. All rights reserved. For details on our trademarks, please visit our Trademark Policy and Trademark List. Third-party trademarks are the property of their respective owners; references here do not imply endorsement or affiliation.