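Welcome to the third post in our series on Debezium signaling and notifications. Here we continue exploring Debezium signals and notifications, and focus on how to enable and manage them through the JMX channel.

We will also look at how to send signals and retrieve notifications through Jolokia's REST API.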

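Interacting with Debezium via JMX

JMX, short for Java Management Extensions, is a Java technology for managing and monitoring Java applications. It offers a standardized way to do three things, using a variety of management tools and clients:

  • monitor application performance;
  • inspect and change configuration settings;
  • interact with a running Java application.

JMX is particularly useful for complex, distributed, enterprise-grade Java applications.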

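Enabling signals through the JMX channel

A signal in Debezium is a way to make the connector perform an action during its normal execution. As described in the previous posts, Debezium provides several signal channels out of the box. This post focuses on the JMX channel.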

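To start using the JMX signal channel, we need to:

  • enable the JMX server on the Kafka Connect service;

  • add jmx to the signal.enabled.channels connector configuration property;

  • connect to the JMX server with a JMX client to send signals.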

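Debezium exposes a signaling MBean named debezium.<connector-type>:type=management,context=signals,server=<server>. This bean exposes a signal operation that takes three parameters:

  • p0: the ID of the signal.

  • p1: the type of the signal, for example execute-snapshot.

  • p2: a JSON data field with additional information for the given signal type.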
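The following minimal Python sketch assembles the object name and the three signal arguments, to make the naming pattern and argument order concrete. The helper names signal_mbean_name and signal_arguments are hypothetical and are not part of any Debezium API. The main point they illustrate is that p2 is passed as a JSON string, not as a structured value.

```python
import json


def signal_mbean_name(connector_type: str, server: str) -> str:
    """Hypothetical helper: build the signaling MBean name following the pattern
    debezium.<connector-type>:type=management,context=signals,server=<server>."""
    return f"debezium.{connector_type}:type=management,context=signals,server={server}"


def signal_arguments(signal_id: str, signal_type: str, data: dict) -> list:
    """Hypothetical helper: return [p0, p1, p2] for the signal operation.
    p2 must be a JSON string, so the dict is serialized here."""
    return [signal_id, signal_type, json.dumps(data)]


if __name__ == "__main__":
    print(signal_mbean_name("postgres", "dbserver1"))
    print(signal_arguments("12345", "execute-snapshot",
                           {"data-collections": ["inventory.orders"], "type": "INCREMENTAL"}))
```

Key order inside a JMX ObjectName is not significant. Tools such as jmxterm may therefore display the same bean in its canonical, alphabetically sorted form (context=...,server=...,type=...).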

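Enabling notifications through the JMX channel

Notifications are how Debezium tells you what it is doing. Reading them through the JMX channel makes it easy to monitor Debezium, for example the progress of an incremental snapshot.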

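To start using the JMX notification channel, we need to:

  • enable the JMX server on the Kafka Connect service;

  • add jmx to the notification.enabled.channels connector configuration property;

  • connect to the JMX server with a JMX client to access the notifications.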

Debezium exposes a notification MBean named debezium.<connector-type>:type=management,context=notifications,server=<server>. This bean provides a Notifications attribute. The attribute holds a list of JMX CompositeData entries with the following properties:

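Property Description

id

The unique identifier assigned to the notification. For incremental snapshot notifications, the id is the same as the ID of the execute-snapshot signal that was sent.

aggregate_type

The data type of the aggregate root that the notification relates to. In domain-driven design, exported events should always refer to an aggregate.

type

Status information about the event specified in the aggregate_type field.

additional_data

A Map<String,String> with detailed information about the notification.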
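The structure above can be mirrored in a small data class, for example to post-process notifications in a monitoring script. This is a sketch under two assumptions.

  • Each entry arrives as a plain dictionary with camelCase keys (aggregateType, additionalData), as they appear in the jmxterm output later in this post.
  • Each additionalData value may be either a plain string or a {"key": ..., "value": ...} pair.

Adjust the keys to whatever your client actually returns.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Mapping


@dataclass
class Notification:
    id: str
    aggregate_type: str
    type: str
    additional_data: Dict[str, str] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, raw: Mapping[str, Any]) -> "Notification":
        # Assumption: camelCase keys as shown by jmxterm. Map entries may be
        # rendered as {"key": ..., "value": ...}; unwrap them to plain strings.
        data = {}
        for k, v in (raw.get("additionalData") or {}).items():
            data[k] = v["value"] if isinstance(v, Mapping) and "value" in v else v
        return cls(
            id=raw["id"],
            aggregate_type=raw["aggregateType"],
            type=raw["type"],
            additional_data=data,
        )
```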

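Let's see how to trigger an incremental snapshot through the JMX channel and receive notifications about its progress.

Sending an incremental snapshot signal through the JMX channel

In this example, we use the Debezium Docker images with a PostgreSQL database.

We can start all the required services with the following Docker Compose file: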

version: '2'
services:
  zookeeper:
    container_name: zookeeper
    image: quay.io/debezium/zookeeper:2.4
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    container_name: kafka
    image: quay.io/debezium/kafka:2.4
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  postgres:
    container_name: postgres
    image: quay.io/debezium/example-postgres:2.4
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  connect:
    container_name: connect
    image: quay.io/debezium/connect:2.4
    ports:
      - 8083:8083
      - 9012:9012 # (1)
      - 8778:8778
    links:
      - kafka
      - postgres
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - JMXPORT=9012 # (2)
      - JMXHOST=0.0.0.0 # (3)
      - ENABLE_JOLOKIA=true
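1 Exposes port 9012, which is used to connect to the JMX server.
2 Enables JMX and sets the port number that JMX uses. The value is used to set the JVM parameter -Dcom.sun.management.jmxremote.port=$JMXPORT
3 The IP address or resolvable host name of the Docker host. JMX uses it to build the URL sent to the JMX client. The values localhost and 127.0.0.1 will not work; 0.0.0.0 usually does. The value is used to set the JVM parameter -Djava.rmi.server.hostname=$JMXHOST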

After saving the file as debezium.yaml, we can start all the services with the following command:

docker compose -f debezium.yaml up -d

The output will look similar to this:

[+] Running 5/5
 ✔ Network deploy_default        Created    0.1s
 ✔ Container deploy-zookeeper-1  Started    0.1s
 ✔ Container deploy-postgres-1   Started    0.1s
 ✔ Container deploy-kafka-1      Started    0.1s
 ✔ Container deploy-connect-1    Started

Now we can check that all services are up and running with the following command:

docker ps

The output should look similar to this:

CONTAINER ID   IMAGE                            COMMAND                  CREATED         STATUS         PORTS                                                                              NAMES
f1d49fb79dba   quay.io/debezium/connect:2.4                "/docker-entrypoint.…"   3 seconds ago   Up 2 seconds   0.0.0.0:8083->8083/tcp, 0.0.0.0:8778->8778/tcp, 0.0.0.0:9012->9012/tcp, 9092/tcp   deploy-connect-1
e164b2651fbf   quay.io/debezium/kafka:2.4       "/docker-entrypoint.…"   3 seconds ago   Up 2 seconds   0.0.0.0:9092->9092/tcp                                                             deploy-kafka-1
e61116f22f9d   quay.io/debezium/example-postgres:2.4    "docker-entrypoint.s…"   4 seconds ago   Up 2 seconds   0.0.0.0:5432->5432/tcp                                                             deploy-postgres-1
ccb502882928   quay.io/debezium/zookeeper:2.4   "/docker-entrypoint.…"   4 seconds ago   Up 2 seconds   0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 0.0.0.0:3888->3888/tcp             deploy-zookeeper-1
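docker ps shows that the containers are running, but not that the published ports are ready. The following Python sketch tries TCP connections to localhost on three ports from the connect service in the Compose file: 8083 (Kafka Connect REST), 9012 (JMX) and 8778 (Jolokia). A successful connection only proves that something is listening, not that the service behind the port is healthy.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.5)  # retry until the deadline passes


if __name__ == "__main__":
    # Ports published by the connect service in debezium.yaml.
    for name, port in [("Kafka Connect REST", 8083), ("JMX", 9012), ("Jolokia", 8778)]:
        state = "up" if wait_for_port("localhost", port, timeout=10) else "NOT reachable"
        print(f"{name} on port {port}: {state}")
```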

All services are now up and running, so we can register a connector with the following configuration:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.server.id": "184054",
    "database.dbname": "postgres",
    "topic.prefix": "dbserver1",
    "snapshot.mode": "NEVER",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "signal.enabled.channels": "source,jmx",
    "signal.data.collection": "inventory.debezium_signal",
    "notification.enabled.channels": "jmx"
  }
}
signal.enabled.channels enables both the source and jmx channels. We only want to use JMX to send the signal that triggers the incremental snapshot, but the source channel is still required. Debezium uses the signaling table to write watermarks into the database log, and it needs those watermarks to deduplicate events.
signal.data.collection sets the table used for signaling.
Don't worry about the notification.enabled.channels property for now; we'll look at it in detail later.

After saving this configuration to a file named postgres-jmx.json, we can register it.

To register the connector, we can call the Kafka Connect REST API with curl:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":"1","database.hostname":"postgres","database.port":"5432","database.user":"postgres","database.password":"postgres","database.server.id":"184054","database.dbname":"postgres","topic.prefix":"dbserver1","snapshot.mode":"NEVER","schema.history.internal.kafka.bootstrap.servers":"kafka:9092","schema.history.internal.kafka.topic":"schema-changes.inventory","signal.enabled.channels":"source,jmx","signal.data.collection":"inventory.debezium_signal","notification.enabled.channels":"log,sink,jmx","notification.sink.topic.name":"io.debezium.notification"}}'
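The same registration can be scripted. The following Python sketch uses only the standard library and makes two assumptions: the Kafka Connect REST API is reachable at http://localhost:8083, as published by the Compose file above, and the definition is stored in postgres-jmx.json. The script parses the file before sending it, so a malformed definition fails locally instead of coming back as an error from Kafka Connect.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083/connectors"  # Kafka Connect REST API from the Compose file


def load_config(path: str) -> dict:
    """Parse the connector definition; raises ValueError if the file is not valid JSON."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def build_register_request(config: dict, url: str = CONNECT_URL) -> urllib.request.Request:
    """Build the POST request that creates the connector described by config."""
    return urllib.request.Request(
        url,
        data=json.dumps(config).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    request = build_register_request(load_config("postgres-jmx.json"))
    with urllib.request.urlopen(request) as response:
        # Kafka Connect answers 201 Created with the stored connector definition.
        print(response.status, response.read().decode("utf-8"))
```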

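Alternatively, I recommend kcctl for interacting with Kafka Connect. It is a modern and intuitive command-line client for Kafka Connect.

First, we need to create a configuration context for connecting to Kafka Connect:

kcctl config set-context local --cluster http://localhost:8083

Then we can register the connector by running: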

kcctl apply -f postgres-jmx.json

Now we can fetch the logs of the connect container:

docker logs connect

and check that the connector has started streaming events:

INFO   Postgres|dbserver1|streaming  Starting streaming   [io.debezium.pipeline.ChangeEventSourceCoordinator]
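The Kafka Connect REST API also reports connector and task state at GET /connectors/<name>/status. The following Python sketch treats the connector as healthy only when the connector itself and every task report RUNNING. RUNNING only means the task is alive; the log line above remains the clearer sign that streaming has begun.

```python
import json
import urllib.request


def is_running(status: dict) -> bool:
    """True if the connector and all of its tasks report the RUNNING state."""
    tasks = status.get("tasks", [])
    return (
        status.get("connector", {}).get("state") == "RUNNING"
        and len(tasks) > 0
        and all(t.get("state") == "RUNNING" for t in tasks)
    )


def fetch_status(name: str, base_url: str = "http://localhost:8083") -> dict:
    """Read the connector status document from the Kafka Connect REST API."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


if __name__ == "__main__":
    print("running:", is_running(fetch_status("inventory-connector")))
```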

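Preparing the database for the incremental snapshot

Incremental snapshots require signal.data.collection to be defined, so we need to create the signaling table in our postgres database.

With MySQL, when GTIDs are enabled and read.only is set to true, the data collection is not required.

To create the signaling table, we connect to our postgres instance with the psql client inside the postgres container: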

docker exec -it postgres bash

Once inside the container, we can connect to the postgres instance with:

psql -h localhost -d postgres -U postgres
The password is postgres.

Then we can check that the inventory schema already contains some tables:

\dt inventory.*

The command should return something like this:

                List of relations
  Schema   |       Name       | Type  |  Owner
-----------+------------------+-------+----------
 inventory | customers        | table | postgres
 inventory | geom             | table | postgres
 inventory | orders           | table | postgres
 inventory | products         | table | postgres
 inventory | products_on_hand | table | postgres
 inventory | spatial_ref_sys  | table | postgres
(6 rows)

We create the signaling table with the following command:

CREATE TABLE inventory.debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);

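Sending the incremental snapshot signal

To send a signal through the JMX channel, we must connect to the JMX server. We use the jmxterm client; after downloading it, we can run it as follows: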

java -jar jmxterm-1.0.4-uber.jar (1)

open localhost:9012 (2)

beans -d debezium.postgres (3)

run -b debezium.postgres:context=signals,server=dbserver1,type=management signal 12345 execute-snapshot {"data-collections":["inventory.orders"],"type":"INCREMENTAL"} (4)
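1 Runs the jmxterm client.
2 Opens a connection to the JMX server.
3 Lists the beans in the debezium.postgres domain.
4 Invokes the signal operation to run an incremental snapshot of the inventory.orders table.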
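If you script your jmxterm sessions, you can generate the run line instead of typing it. The following sketch uses a hypothetical function, jmxterm_signal_command. It serializes the payload compactly, with no spaces, just like the command above. Keeping the JSON argument free of spaces is a precaution, because an interactive shell such as jmxterm may treat whitespace as an argument separator.

```python
import json
from typing import Iterable


def jmxterm_signal_command(server: str, signal_id: str, collections: Iterable[str],
                           connector_type: str = "postgres") -> str:
    """Hypothetical helper: build a jmxterm 'run' line that sends an
    incremental execute-snapshot signal for the given collections."""
    bean = f"debezium.{connector_type}:context=signals,server={server},type=management"
    # Compact separators keep the JSON argument free of spaces.
    data = json.dumps({"data-collections": list(collections), "type": "INCREMENTAL"},
                      separators=(",", ":"))
    return f"run -b {bean} signal {signal_id} execute-snapshot {data}"


if __name__ == "__main__":
    print(jmxterm_signal_command("dbserver1", "12345", ["inventory.orders"]))
```

Calling jmxterm_signal_command("dbserver1", "12345", ["inventory.orders"]) reproduces the run line used above. To snapshot several tables with one signal, pass several collection names.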

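Checking the data

Next, we want to verify that all rows of the orders table have been captured in the corresponding Kafka topic.

We can enter the Kafka container with the following command:

docker exec -it kafka bash

Once inside the container, we can read all messages in the dbserver1.inventory.orders topic with: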

kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic dbserver1.inventory.orders --from-beginning

The output should look similar to this:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "default": 0,
            "field": "id"
          },
          {
            "type": "int32",
            "optional": false,
            "name": "io.debezium.time.Date",
            "version": 1,
            "field": "order_date"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "purchaser"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "quantity"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "product_id"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.orders.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "default": 0,
            "field": "id"
          },
          {
            "type": "int32",
            "optional": false,
            "name": "io.debezium.time.Date",
            "version": 1,
            "field": "order_date"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "purchaser"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "quantity"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "product_id"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.orders.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "txId"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "xmin"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "name": "event.block",
        "version": 1,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.orders.Envelope",
    "version": 1
  },
  "payload": {
    "before": null,
    "after": {
      "id": 10001,
      "order_date": 16816,
      "purchaser": 1001,
      "quantity": 1,
      "product_id": 102
    },
    "source": {
      "version": "2.4.0-SNAPSHOT",
      "connector": "postgresql",
      "name": "dbserver1",
      "ts_ms": 1695631605203,
      "snapshot": "incremental",
      "db": "postgres",
      "sequence": "[\"34837776\",\"34837776\"]",
      "schema": "inventory",
      "table": "orders",
      "txId": null,
      "lsn": null,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1695631605204,
    "transaction": null
  }
}
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "default": 0,
            "field": "id"
          },
          {
            "type": "int32",
            "optional": false,
            "name": "io.debezium.time.Date",
            "version": 1,
            "field": "order_date"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "purchaser"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "quantity"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "product_id"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.orders.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "default": 0,
            "field": "id"
          },
          {
            "type": "int32",
            "optional": false,
            "name": "io.debezium.time.Date",
            "version": 1,
            "field": "order_date"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "purchaser"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "quantity"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "product_id"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.orders.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "txId"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "xmin"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "name": "event.block",
        "version": 1,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.orders.Envelope",
    "version": 1
  },
  "payload": {
    "before": null,
    "after": {
      "id": 10002,
      "order_date": 16817,
      "purchaser": 1002,
      "quantity": 2,
      "product_id": 105
    },
    "source": {
      "version": "2.4.0-SNAPSHOT",
      "connector": "postgresql",
      "name": "dbserver1",
      "ts_ms": 1695631605204,
      "snapshot": "incremental",
      "db": "postgres",
      "sequence": "[\"34837776\",\"34837776\"]",
      "schema": "inventory",
      "table": "orders",
      "txId": null,
      "lsn": null,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1695631605204,
    "transaction": null
  }
}
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "default": 0,
            "field": "id"
          },
          {
            "type": "int32",
            "optional": false,
            "name": "io.debezium.time.Date",
            "version": 1,
            "field": "order_date"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "purchaser"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "quantity"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "product_id"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.orders.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "default": 0,
            "field": "id"
          },
          {
            "type": "int32",
            "optional": false,
            "name": "io.debezium.time.Date",
            "version": 1,
            "field": "order_date"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "purchaser"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "quantity"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "product_id"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.orders.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "txId"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "xmin"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "name": "event.block",
        "version": 1,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.orders.Envelope",
    "version": 1
  },
  "payload": {
    "before": null,
    "after": {
      "id": 10003,
      "order_date": 16850,
      "purchaser": 1002,
      "quantity": 2,
      "product_id": 106
    },
    "source": {
      "version": "2.4.0-SNAPSHOT",
      "connector": "postgresql",
      "name": "dbserver1",
      "ts_ms": 1695631605204,
      "snapshot": "incremental",
      "db": "postgres",
      "sequence": "[\"34837776\",\"34837776\"]",
      "schema": "inventory",
      "table": "orders",
      "txId": null,
      "lsn": null,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1695631605204,
    "transaction": null
  }
}
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "default": 0,
            "field": "id"
          },
          {
            "type": "int32",
            "optional": false,
            "name": "io.debezium.time.Date",
            "version": 1,
            "field": "order_date"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "purchaser"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "quantity"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "product_id"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.orders.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "default": 0,
            "field": "id"
          },
          {
            "type": "int32",
            "optional": false,
            "name": "io.debezium.time.Date",
            "version": 1,
            "field": "order_date"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "purchaser"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "quantity"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "product_id"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.orders.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "txId"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "xmin"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "name": "event.block",
        "version": 1,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.orders.Envelope",
    "version": 1
  },
  "payload": {
    "before": null,
    "after": {
      "id": 10004,
      "order_date": 16852,
      "purchaser": 1003,
      "quantity": 1,
      "product_id": 107
    },
    "source": {
      "version": "2.4.0-SNAPSHOT",
      "connector": "postgresql",
      "name": "dbserver1",
      "ts_ms": 1695631605204,
      "snapshot": "incremental",
      "db": "postgres",
      "sequence": "[\"34837776\",\"34837776\"]",
      "schema": "inventory",
      "table": "orders",
      "txId": null,
      "lsn": null,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1695631605204,
    "transaction": null
  }
}
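Scrolling through the raw envelopes gets tedious for larger tables. The consumer output can instead be piped into a short Python script that keeps only the read events (op = r) whose source.snapshot is incremental. The script makes two assumptions:

  • one JSON message value per line, which is how kafka-console-consumer.sh prints values;
  • messages either use the schema-plus-payload envelope shown above or are schemaless.

The script name is up to you.

```python
import json
import sys
from typing import Iterable, List


def incremental_snapshot_ids(lines: Iterable[str], key: str = "id") -> List[int]:
    """Return the key values of read events ("op": "r") produced by an
    incremental snapshot, given one JSON-encoded message value per line."""
    ids = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        msg = json.loads(line)
        if not isinstance(msg, dict):  # e.g. a tombstone printed as "null"
            continue
        payload = msg.get("payload", msg)  # with or without the schema envelope
        source = payload.get("source") or {}
        if payload.get("op") == "r" and source.get("snapshot") == "incremental":
            ids.append(payload["after"][key])
    return ids


if __name__ == "__main__":
    found = incremental_snapshot_ids(sys.stdin)
    print(f"{len(found)} incremental snapshot records, ids: {found}")
```

Adding --timeout-ms to the consumer command makes it exit once the topic has been drained, which lets the script print its summary.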

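That's it! We've sent an incremental snapshot signal through the JMX channel.

Monitoring incremental snapshot progress through the JMX channel

Now that the incremental snapshot has run, we can read the notifications that Debezium generated through the JMX channel.

We registered our connector with the following configuration: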

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.server.id": "184054",
    "database.dbname": "postgres",
    "topic.prefix": "dbserver1",
    "snapshot.mode": "NEVER",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "signal.enabled.channels": "source,jmx",
    "signal.data.collection": "inventory.debezium_signal",
    "notification.enabled.channels": "jmx" (1)
  }
}
1 This setting enables the jmx notification channel.

To access the notifications, we connect to the JMX server again. As we did for signals, we use jmxterm:

java -jar jmxterm-1.0.4-uber.jar (1)

open localhost:9012 (2)

beans -d debezium.postgres (3)

get -b debezium.postgres:context=notifications,server=dbserver1,type=management Notifications (4)
1 Runs the jmxterm client.
2 Opens a connection to the JMX server.
3 Lists the beans in the debezium.postgres domain.
4 Reads the notifications.

You should see the following output:

#mbean = debezium.postgres:context=notifications,server=dbserver1,type=management:
Notifications = [ {
  additionalData = {
    ( connector_name ) = {
      key = connector_name;
      value = dbserver1;
     };
   };
  aggregateType = Initial Snapshot;
  id = b20bec8d-f21f-4d74-bb75-cdd7f4c7d933;
  type = SKIPPED;
 }, (1)
{
  additionalData = {
    ( connector_name ) = {
      key = connector_name;
      value = dbserver1;
     };
    ( data_collections ) = {
      key = data_collections;
      value = inventory.orders;
     };
   };
  aggregateType = Incremental Snapshot;
  id = 12345;
  type = STARTED;
 }, (2)
{
  additionalData = {
    ( current_collection_in_progress ) = {
      key = current_collection_in_progress;
      value = inventory.orders;
     };
    ( connector_name ) = {
      key = connector_name;
      value = dbserver1;
     };
    ( maximum_key ) = {
      key = maximum_key;
      value = 10004;
     };
    ( last_processed_key ) = {
      key = last_processed_key;
      value = 10004;
     };
    ( data_collections ) = {
      key = data_collections;
      value = inventory.orders;
     };
   };
  aggregateType = Incremental Snapshot;
  id = 12345;
  type = IN_PROGRESS;
 }, (3)
{
  additionalData = {
    ( scanned_collection ) = {
      key = scanned_collection;
      value = inventory.orders;
     };
    ( connector_name ) = {
      key = connector_name;
      value = dbserver1;
     };
    ( total_rows_scanned ) = {
      key = total_rows_scanned;
      value = 4;
     };
    ( status ) = {
      key = status;
      value = SUCCEEDED;
     };
    ( data_collections ) = {
      key = data_collections;
      value = inventory.orders;
     };
   };
  aggregateType = Incremental Snapshot;
  id = 12345;
  type = TABLE_SCAN_COMPLETED;
 }, (4)
{
  additionalData = {
    ( connector_name ) = {
      key = connector_name;
      value = dbserver1;
     };
   };
  aggregateType = Incremental Snapshot;
  id = 12345;
  type = COMPLETED;
 } (5)
];
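1 The notification from the initial snapshot. Its status is SKIPPED because our connector is configured with "snapshot.mode": "NEVER".
2 The notification that the incremental snapshot has started.
3 This notification indicates that the snapshot of the inventory.orders table is in progress. It reports the last processed key and the maximum key. This example produced only one IN_PROGRESS notification; depending on your table size and incremental.snapshot.chunk.size, you may receive more.
4 This notification indicates that the snapshot of a specific table has completed. It reports the total number of rows scanned.
5 The last notification in our example. It indicates that the entire incremental snapshot process has completed.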
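JMX also has its own notification mechanism, and Debezium emits these JMX notifications as well. Depending on your JMX client, you can subscribe to them and receive them as soon as they are emitted, without having to poll the Notification bean.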

Leveraging Jolokia for REST-based signals and notifications

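Jolokia is a powerful tool that lets you interact with a JMX server by exposing it over REST. With it, we can interact with Debezium over REST through the signal and notification JMX beans. This way you can send signals and receive notifications seamlessly, using a more familiar REST API.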

To enable Jolokia, we need to enable its agent in our Kafka Connect container.

This is the Docker Compose file used in our example:

version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:2.4
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:2.4
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  postgres:
    image: quay.io/debezium/example-postgres:2.4
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  connect:
    image: quay.io/debezium/connect:2.4
    ports:
      - 8083:8083
      - 9012:9012
      - 8778:8778 # (1)
    links:
      - kafka
      - postgres
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - JMXPORT=9012
      - JMXHOST=0.0.0.0
      - ENABLE_JOLOKIA=true # (2)
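1 Exposes the port used by the Jolokia agent.
2 Enables the Jolokia agent that is already included in our test image. If you want to enable the agent in your own installation, see the official documentation.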
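Outside the Debezium image, one common way to attach the Jolokia JVM agent is the standard -javaagent JVM option, passed through KAFKA_OPTS, which Kafka's start scripts hand to the JVM. The following is a minimal sketch under that assumption; the jar path is a hypothetical placeholder, and the official Jolokia documentation remains the reference for the agent options.

```shell
#!/bin/sh
# Hypothetical path to the Jolokia JVM agent jar; point this at your own copy.
JOLOKIA_AGENT_JAR="${JOLOKIA_AGENT_JAR:-/opt/jolokia/jolokia-jvm-agent.jar}"

# Append the -javaagent option to any KAFKA_OPTS that are already set.
# port=8778 matches the port published in the Compose file above;
# host=0.0.0.0 makes the agent bind to every network interface.
KAFKA_OPTS="${KAFKA_OPTS:+$KAFKA_OPTS }-javaagent:${JOLOKIA_AGENT_JAR}=port=8778,host=0.0.0.0"
export KAFKA_OPTS

# Then start Kafka Connect as usual, for example:
#   bin/connect-distributed.sh config/connect-distributed.properties
echo "KAFKA_OPTS=$KAFKA_OPTS"
```

Binding to 0.0.0.0 is what makes the agent reachable through a published container port, but as configured here the agent has no authentication, so anyone who can reach port 8778 can invoke the signal operation.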

Sending signals via Jolokia

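To send a signal via Jolokia, we can send an HTTP POST request to the Jolokia endpoint containing the desired signal and its arguments.

Continuing our incremental snapshot example, you can trigger it by running the following command: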

curl -X POST 'http://localhost:8778/jolokia/exec' -d '{"type":"EXEC","mbean":"debezium.postgres:context=signals,server=dbserver1,type=management","operation":"signal","arguments":["12345","execute-snapshot","{\"data-collections\": [\"inventory.products\"], \"type\": \"INCREMENTAL\"}"]}' | jq

The output should look like this:

{
  "request": {
    "mbean": "debezium.postgres:context=signals,server=dbserver1,type=management",
    "arguments": [
      "12345",
      "execute-snapshot",
      "{\"data-collections\": [\"inventory.products\"], \"type\": \"INCREMENTAL\"}"
    ],
    "type": "exec",
    "operation": "signal"
  },
  "value": null,
  "timestamp": 1695651387,
  "status": 200
}
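Escaping the signal's JSON data by hand inside the Jolokia request, as in the command above, is easy to get wrong. Here is a minimal POSIX sh sketch that builds the same request body from the three arguments of the signal operation (p0 id, p1 type, p2 data) and posts it with curl. The function names and the JOLOKIA_URL default are assumptions chosen for this example setup.

```shell
#!/bin/sh
# Assumptions: Jolokia listens on localhost:8778 and the connector is the
# PostgreSQL connector with server name dbserver1, as in this example.
JOLOKIA_URL="${JOLOKIA_URL:-http://localhost:8778/jolokia}"
SIGNAL_MBEAN="debezium.postgres:context=signals,server=dbserver1,type=management"

# Escape backslashes and double quotes so a value can be embedded in a JSON string.
# Enough for single-line JSON such as the signal data; control characters are not handled.
json_escape() {
  printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g'
}

# Print the Jolokia "exec" request body for signal(p0 = id, p1 = type, p2 = data).
signal_body() {
  printf '{"type":"exec","mbean":"%s","operation":"signal","arguments":["%s","%s","%s"]}' \
    "$SIGNAL_MBEAN" "$(json_escape "$1")" "$(json_escape "$2")" "$(json_escape "$3")"
}

# POST the request to Jolokia; --fail makes curl exit non-zero on HTTP errors.
send_signal() {
  curl --silent --show-error --fail -X POST "$JOLOKIA_URL/exec" \
    -H 'Content-Type: application/json' \
    -d "$(signal_body "$1" "$2" "$3")"
}
```

For example, `send_signal 12345 execute-snapshot '{"data-collections": ["inventory.products"], "type": "INCREMENTAL"}' | jq` sends the same request as the command above. Jolokia typically reports a failed operation in the status and error fields of the JSON response rather than through the HTTP status code, so it is worth checking status too.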

Receiving notifications via Jolokia

Jolokia also lets you retrieve notifications from Debezium with an HTTP GET request:

curl -X GET 'http://localhost:8778/jolokia/read/debezium.postgres:context=notifications,server=dbserver1,type=management/Notifications' | jq

The output should look like this:

{
  "request": {
    "mbean": "debezium.postgres:context=notifications,server=dbserver1,type=management",
    "attribute": "Notifications",
    "type": "read"
  },
  "value": [
    {
      "additionalData": {
        "connector_name": "dbserver1"
      },
      "id": "b20bec8d-f21f-4d74-bb75-cdd7f4c7d933",
      "type": "SKIPPED",
      "aggregateType": "Initial Snapshot"
    },
    {
      "additionalData": {
        "connector_name": "dbserver1",
        "data_collections": "inventory.orders"
      },
      "id": "12345",
      "type": "STARTED",
      "aggregateType": "Incremental Snapshot"
    },
    {
      "additionalData": {
        "last_processed_key": "10004",
        "current_collection_in_progress": "inventory.orders",
        "connector_name": "dbserver1",
        "maximum_key": "10004",
        "data_collections": "inventory.orders"
      },
      "id": "12345",
      "type": "IN_PROGRESS",
      "aggregateType": "Incremental Snapshot"
    },
    {
      "additionalData": {
        "scanned_collection": "inventory.orders",
        "connector_name": "dbserver1",
        "total_rows_scanned": "4",
        "status": "SUCCEEDED",
        "data_collections": "inventory.orders"
      },
      "id": "12345",
      "type": "TABLE_SCAN_COMPLETED",
      "aggregateType": "Incremental Snapshot"
    },
    {
      "additionalData": {
        "connector_name": "dbserver1"
      },
      "id": "12345",
      "type": "COMPLETED",
      "aggregateType": "Incremental Snapshot"
    },
    {
      "additionalData": {
        "connector_name": "dbserver1",
        "data_collections": "inventory.products"
      },
      "id": "12345",
      "type": "STARTED",
      "aggregateType": "Incremental Snapshot"
    },
    {
      "additionalData": {
        "last_processed_key": "109",
        "current_collection_in_progress": "inventory.products",
        "connector_name": "dbserver1",
        "maximum_key": "109",
        "data_collections": "inventory.products"
      },
      "id": "12345",
      "type": "IN_PROGRESS",
      "aggregateType": "Incremental Snapshot"
    },
    {
      "additionalData": {
        "scanned_collection": "inventory.products",
        "connector_name": "dbserver1",
        "total_rows_scanned": "9",
        "status": "SUCCEEDED",
        "data_collections": "inventory.products"
      },
      "id": "12345",
      "type": "TABLE_SCAN_COMPLETED",
      "aggregateType": "Incremental Snapshot"
    },
    {
      "additionalData": {
        "connector_name": "dbserver1"
      },
      "id": "12345",
      "type": "COMPLETED",
      "aggregateType": "Incremental Snapshot"
    }
  ],
  "timestamp": 1695652278,
  "status": 200
}

As you can see, we now also received the notifications about the incremental snapshot of the inventory.products table, which we triggered through the REST API.
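Because the Notifications attribute keeps every notification, a script that waits for a snapshot usually filters on the signal id. The following is a minimal sketch, assuming POSIX sh plus curl and jq (which the commands above already pipe into); the function names are hypothetical. Note that both signals in this walkthrough reuse the id 12345, so the COMPLETED notification of the earlier inventory.orders snapshot already matches. Giving each signal its own id keeps them apart.

```shell
#!/bin/sh
# Assumptions: Jolokia on localhost:8778 and the PostgreSQL connector with
# server name dbserver1, as in this example. Requires curl and jq.
JOLOKIA_URL="${JOLOKIA_URL:-http://localhost:8778/jolokia}"
NOTIFICATION_MBEAN="debezium.postgres:context=notifications,server=dbserver1,type=management"

# Read the Notifications attribute of the notifications MBean through Jolokia.
fetch_notifications() {
  curl --silent --show-error --fail "$JOLOKIA_URL/read/$NOTIFICATION_MBEAN/Notifications"
}

# Print "aggregateType<TAB>type<TAB>data_collections" for every notification
# whose id equals $1; reads the Jolokia response from stdin.
summarize_notifications() {
  jq -r --arg id "$1" '.value[]
    | select(.id == $id)
    | [.aggregateType, .type, (.additionalData.data_collections // "-")]
    | @tsv'
}

# Poll until an "Incremental Snapshot" COMPLETED notification with id $1 appears.
# Usage: wait_for_snapshot <signal-id> [timeout-seconds] [poll-interval-seconds]
wait_for_snapshot() {
  id=$1 timeout=${2:-60} interval=${3:-2} elapsed=0
  while [ "$elapsed" -lt "$timeout" ]; do
    # jq -e exits with 0 only when the expression evaluates to true.
    if fetch_notifications | jq -e --arg id "$id" \
        'any(.value[]; .id == $id and .aggregateType == "Incremental Snapshot" and .type == "COMPLETED")' \
        >/dev/null; then
      return 0
    fi
    sleep "$interval"
    elapsed=$((elapsed + interval))
  done
  return 1
}
```

For example, `fetch_notifications | summarize_notifications 12345` lists the progress of the snapshots triggered with id 12345, and `wait_for_snapshot 12345 120` returns 0 once a matching COMPLETED notification is present, or 1 after two minutes.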

Conclusion

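In this third part of our series on Debezium signals and notifications, we learned how to enable and manage signals and notifications with JMX and Jolokia. Signals let you control Debezium's behavior dynamically, while notifications keep you informed about key events. By using these features together with Jolokia, you can effectively manage, monitor, and interact with your data streaming workflows, so that you always stay in control of Debezium.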

Fiore Mario Vitale

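Mario is active in the open source community, has contributed to several projects, and is deeply involved in the development of Debezium, the distributed change data capture platform. Over the course of his career he has gained extensive experience with event-driven architectures heavily shaped by data, and he has mainly focused on data-intensive software and product development, which has sharpened his sensitivity to developer experience and data-driven applications. Beyond his professional pursuits, Mario finds his comfort zone where technology meets his personal interests. He loves photography and has a particular talent for capturing beautiful moments. He is also passionate about motorsport and racing. When he is not coding, you will often find him outdoors exploring on his mountain bike, feeding his love of adventure.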

   


About Debezium

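Debezium is an open source distributed platform that turns your existing databases into event streams, so applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.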

Get involved

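We hope you find Debezium interesting and useful, and want to give it a try. Follow us on Twitter @debezium, chat with us on Zulip, or join our mailing list to talk with the community. All of the code is open source on GitHub, so build the code locally and help us improve our existing connectors and add even more connectors. If you find problems or have ideas for how we can improve Debezium, please let us know or log an issue.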

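Copyright © Debezium and its authors. All rights reserved. For details on our trademarks, please visit our trademark policy and trademark list. Third-party trademarks belong to their respective owners, and their mention here does not suggest any endorsement or association.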