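Welcome to the third article in our series on Debezium signaling and notifications. In this post, we continue exploring Debezium signals and notifications, focusing on how to enable and manage them through the JMX channel.
We will also look at how to send signals and retrieve notifications through the REST API provided by Jolokia.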
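Interacting with Debezium via JMX
JMX (Java Management Extensions) is a Java technology for managing and monitoring Java applications. It provides a standardized way to monitor application performance, inspect and change configuration settings, and interact with a running Java application from a variety of management tools and clients. JMX is especially useful for managing and monitoring complex, distributed, enterprise-grade Java applications.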
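Enabling signals through the JMX channel
Signaling in Debezium lets you trigger an action on a connector while it is running normally. As described in the previous articles, Debezium provides several signaling channels out of the box. In this article, we focus on the JMX channel.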
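To start using the JMX signaling channel, we need to:
- enable the JMX server on the Kafka Connect service;
- add jmx to the signal.enabled.channels connector configuration property;
- connect to the JMX server with a JMX client to send signals.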
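Debezium exposes a signal MBean named debezium.<connector-type>:type=management,context=signals,server=<server>. This bean exposes a signal operation that accepts three parameters:
- p0: the ID of the signal.
- p1: the type of the signal, for example execute-snapshot.
- p2: a JSON data field that contains additional information for the specified signal type.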
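The following minimal Python sketch shows how the MBean name and the three arguments fit together. It builds them for the execute-snapshot signal with ID 12345 that is used later in this article. The helper names signal_mbean_name and signal_arguments are hypothetical and are not part of any Debezium API. The main point is that p2 travels as a JSON string, not as a structured object. JMX ObjectName keys are unordered, so this name identifies the same bean that jmxterm prints as debezium.postgres:context=signals,server=dbserver1,type=management.

```python
import json


def signal_mbean_name(connector_type: str, server: str) -> str:
    """Build the signal MBean name using the pattern described above."""
    return f"debezium.{connector_type}:type=management,context=signals,server={server}"


def signal_arguments(signal_id: str, signal_type: str, data: dict) -> list:
    """Return [p0, p1, p2] for the signal operation.

    p2 is serialized to a compact JSON string, because the additional
    data is passed to the operation as a string, not as a nested object.
    """
    return [signal_id, signal_type, json.dumps(data, separators=(",", ":"))]


if __name__ == "__main__":
    name = signal_mbean_name("postgres", "dbserver1")
    args = signal_arguments(
        "12345",
        "execute-snapshot",
        {"data-collections": ["inventory.orders"], "type": "INCREMENTAL"},
    )
    print(name)
    print(args)
```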
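Enabling notifications through the JMX channel
Notifications are essential for keeping you informed about what is happening inside Debezium. Accessing them through the JMX channel makes it easy to monitor Debezium, for example to follow the progress of an incremental snapshot.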
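To start using the JMX notification channel, we need to:
- enable the JMX server on the Kafka Connect service;
- add jmx to the notification.enabled.channels connector configuration property;
- connect to the JMX server with a JMX client to access the notifications.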
Debezium exposes a notification MBean named debezium.<connector-type>:type=management,context=notifications,server=<server>. This bean has a Notifications attribute that holds a list of JMX CompositeData values with the following attributes:
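| Attribute | Description |
|---|---|
| id | Unique identifier assigned to the notification. For incremental snapshot notifications, the id is the same as the one sent with the execute-snapshot signal. |
| aggregate_type | The data type of the aggregate root to which the notification relates. In domain-driven design, exported events should always refer to an aggregate. |
| type | Provides status information about the event specified in the aggregate_type field. |
| additional_data | A Map<String,String> with detailed information about the notification. |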
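If you process notifications in a script, a small typed model helps. The following minimal Python sketch assumes that each entry has already been decoded into a dict keyed like the CompositeData shown later in the jmxterm output: id, aggregateType, type, additionalData. Note that these are camelCase, while the table above uses snake_case. The class name DebeziumNotification is hypothetical.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Mapping


@dataclass
class DebeziumNotification:
    """Plain-Python view of one entry of the Notifications attribute."""

    id: str
    aggregate_type: str
    type: str
    additional_data: Dict[str, str] = field(default_factory=dict)

    @classmethod
    def from_composite(cls, raw: Mapping[str, Any]) -> "DebeziumNotification":
        # Assumption: keys are camelCase (aggregateType, additionalData),
        # as they appear in the jmxterm and Jolokia output of this article.
        extra = raw.get("additionalData") or {}
        return cls(
            id=str(raw["id"]),
            aggregate_type=str(raw["aggregateType"]),
            type=str(raw["type"]),
            additional_data={str(k): str(v) for k, v in extra.items()},
        )


if __name__ == "__main__":
    n = DebeziumNotification.from_composite(
        {
            "id": "12345",
            "aggregateType": "Incremental Snapshot",
            "type": "STARTED",
            "additionalData": {
                "connector_name": "dbserver1",
                "data_collections": "inventory.orders",
            },
        }
    )
    print(n.aggregate_type, n.type, n.additional_data["data_collections"])
```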
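Let's take a moment to see how to send an incremental snapshot signal through the JMX channel and receive notifications about its progress.
Sending an incremental snapshot signal through the JMX channel
In this example, we use the Debezium Docker images with a PostgreSQL database.
We can start all the required services with the following Docker Compose file: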
version: '2'
services:
  zookeeper:
    container_name: zookeeper
    image: quay.io/debezium/zookeeper:2.4
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    container_name: kafka
    image: quay.io/debezium/kafka:2.4
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  postgres:
    container_name: postgres
    image: quay.io/debezium/example-postgres:2.4
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  connect:
    container_name: connect
    image: quay.io/debezium/connect:2.4
    ports:
      - 8083:8083
      - 9012:9012 (1)
      - 8778:8778
    links:
      - kafka
      - postgres
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - JMXPORT=9012 (2)
      - JMXHOST=0.0.0.0 (3)
      - ENABLE_JOLOKIA=true
| 1 | Exposes port 9012, which is used to connect to the JMX server. |
| 2 | Enables JMX and specifies the port number used for JMX. The value is used to set the JVM argument -Dcom.sun.management.jmxremote.port=$JMX_PORT. |
| 3 | The IP address or resolvable hostname of the Docker host, which JMX uses to construct the URL it sends to the JMX client. A value of localhost or 127.0.0.1 will not work; 0.0.0.0 can typically be used. The value is used to set the JVM argument -Djava.rmi.server.hostname=$JMXHOST. |
After saving the file as debezium.yaml, we can start all the services with the following command:
docker compose -f debezium.yaml up -d
The output will look similar to this:
[+] Running 5/5
✔ Network deploy_default Created 0.1s
✔ Container deploy-zookeeper-1 Started 0.1s
✔ Container deploy-postgres-1 Started 0.1s
✔ Container deploy-kafka-1 Started 0.1s
✔ Container deploy-connect-1 Started
Now we can check that all services are up and running by executing the following command:
docker ps
The output should look similar to this:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
f1d49fb79dba quay.io/debezium/connect:2.4 "/docker-entrypoint.…" 3 seconds ago Up 2 seconds 0.0.0.0:8083->8083/tcp, 0.0.0.0:8778->8778/tcp, 0.0.0.0:9012->9012/tcp, 9092/tcp deploy-connect-1
e164b2651fbf quay.io/debezium/kafka:2.4 "/docker-entrypoint.…" 3 seconds ago Up 2 seconds 0.0.0.0:9092->9092/tcp deploy-kafka-1
e61116f22f9d quay.io/debezium/example-postgres:2.4 "docker-entrypoint.s…" 4 seconds ago Up 2 seconds 0.0.0.0:5432->5432/tcp deploy-postgres-1
ccb502882928 quay.io/debezium/zookeeper:2.4 "/docker-entrypoint.…" 4 seconds ago Up 2 seconds 0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 0.0.0.0:3888->3888/tcp deploy-zookeeper-1
At this point all services are up and running, so we can register the connector with the following configuration:
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.server.id": "184054",
"database.dbname": "postgres",
"topic.prefix": "dbserver1",
"snapshot.mode": "NEVER",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory",
"signal.enabled.channels": "source,jmx", (1)
"signal.data.collection": "inventory.debezium_signal", (2)
"notification.enabled.channels": "jmx"
}
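}
| 1 | This configuration enables the source and jmx channels. Even though we only want to use JMX to send the signal that triggers the incremental snapshot, the source channel is still required: Debezium uses the signal table to write watermarks into the database log, which it relies on to deduplicate events. |
| 2 | Sets the table used for signals. |
| Don't worry about the notification.enabled.channels property for now; we will dive into it later. |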
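You can catch a missing channel before registering the connector. The following minimal Python sketch encodes the requirements described in this article for a PostgreSQL connector that receives incremental snapshot signals over JMX: jmx plus source in signal.enabled.channels, a signal.data.collection, and jmx in notification.enabled.channels. The function check_jmx_snapshot_config is hypothetical and is not a Debezium validator. It also deliberately ignores the MySQL exception mentioned below.

```python
from typing import Dict, List, Set


def _channels(config: Dict[str, str], key: str) -> Set[str]:
    """Split a comma-separated channel list into a set of channel names."""
    return {c.strip() for c in config.get(key, "").split(",") if c.strip()}


def check_jmx_snapshot_config(config: Dict[str, str]) -> List[str]:
    """Return a list of problems; an empty list means the settings look usable.

    Encodes only the rules stated in this article (PostgreSQL, JMX signals,
    incremental snapshots); it is not an exhaustive Debezium config check.
    """
    problems = []
    signals = _channels(config, "signal.enabled.channels")
    if "jmx" not in signals:
        problems.append("add 'jmx' to signal.enabled.channels")
    if "source" not in signals:
        problems.append("incremental snapshots also need the 'source' channel")
    if not config.get("signal.data.collection"):
        problems.append("set signal.data.collection to the signal table")
    if "jmx" not in _channels(config, "notification.enabled.channels"):
        problems.append("add 'jmx' to notification.enabled.channels")
    return problems


if __name__ == "__main__":
    cfg = {
        "signal.enabled.channels": "source,jmx",
        "signal.data.collection": "inventory.debezium_signal",
        "notification.enabled.channels": "jmx",
    }
    print(check_jmx_snapshot_config(cfg))  # [] for the configuration above
```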
After saving this configuration to a file named postgres-jmx.json, we can register it.
To register the connector, we can call the Kafka Connect REST API with curl:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":"1","database.hostname":"postgres","database.port":"5432","database.user":"postgres","database.password":"postgres","database.server.id":"184054","database.dbname":"postgres","topic.prefix":"dbserver1","snapshot.mode":"NEVER","schema.history.internal.kafka.bootstrap.servers":"kafka:9092","schema.history.internal.kafka.topic":"schema-changes.inventory","signal.enabled.channels":"source,jmx","signal.data.collection":"inventory.debezium_signal","notification.enabled.channels":"jmx"}}'
Alternatively, I recommend using the kcctl tool to interact with Kafka Connect. It is a modern and intuitive command-line client for Kafka Connect.
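You might prefer to script the registration rather than run curl or kcctl by hand. The following minimal Python sketch builds the same POST /connectors/ request with the standard library. It assumes that Kafka Connect is reachable at http://localhost:8083, as in the compose file above. The helper names build_register_request and register are hypothetical.

```python
import json
import urllib.request


def build_register_request(
    connector: dict, connect_url: str = "http://localhost:8083"
) -> urllib.request.Request:
    """Build the POST /connectors/ request that the curl command above sends."""
    return urllib.request.Request(
        f"{connect_url}/connectors/",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


def register(connector: dict) -> int:
    """Send the request and return the HTTP status code.

    urlopen raises urllib.error.HTTPError for 4xx/5xx responses.
    """
    with urllib.request.urlopen(build_register_request(connector)) as resp:
        return resp.status


if __name__ == "__main__":
    req = build_register_request({"name": "inventory-connector", "config": {"tasks.max": "1"}})
    print(req.get_method(), req.full_url)  # POST http://localhost:8083/connectors/
    # register({...}) would actually send the request to Kafka Connect.
```

Building the request separately from sending it keeps the payload easy to inspect before anything touches the cluster.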
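With kcctl, we first need to create a configuration context that points to Kafka Connect:
kcctl config set-context local --cluster http://localhost:8083
Then we can register the connector by running:
kcctl apply -f postgres-jmx.json
Now we can look at the logs of the connect container:
docker logs connect
and check that the connector has started streaming events:
INFO Postgres|dbserver1|streaming Starting streaming [io.debezium.pipeline.ChangeEventSourceCoordinator]
Preparing the database for incremental snapshots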
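Because incremental snapshots require signal.data.collection to be defined, we need to create the signal table in our postgres database.
| A signal data collection is not required when using MySQL with GTIDs enabled and read.only set to true. |
To create the signal table, we need to connect to our postgres instance. We can use the psql client inside the postgres container:
docker exec -it postgres bash
Once inside the container, we can connect to the postgres instance with the following command:
psql -h localhost -d postgres -U postgres
| The password is postgres. |
Then we can check that the inventory schema already contains some tables:
\dt inventory.*
The command should return something like this: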
List of relations
Schema | Name | Type | Owner
-----------+------------------+-------+----------
inventory | customers | table | postgres
inventory | geom | table | postgres
inventory | orders | table | postgres
inventory | products | table | postgres
inventory | products_on_hand | table | postgres
inventory | spatial_ref_sys | table | postgres
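(6 rows)
We need to create the signal table with the following command:
CREATE TABLE inventory.debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
Sending the incremental snapshot signal
To send a signal through the JMX channel, we must connect to the JMX server. We use the jmxterm client; after downloading it, we can run it as follows: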
java -jar jmxterm-1.0.4-uber.jar (1)
open localhost:9012 (2)
beans -d debezium.postgres (3)
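run -b debezium.postgres:context=signals,server=dbserver1,type=management signal 12345 execute-snapshot {"data-collections":["inventory.orders"],"type":"INCREMENTAL"} (4)
| 1 | Runs the jmxterm client |
| 2 | Opens a connection to the JMX server |
| 3 | Searches for beans under the debezium.postgres domain |
| 4 | Runs the signal operation to execute an incremental snapshot of the inventory.orders table. |
Checking the data
Next, we want to check that all the data in the orders table has been correctly captured in its corresponding Kafka topic.
We can open a shell in the Kafka container with the following command:
docker exec -it kafka bash
Once inside the container, we can read all the messages in the dbserver1.inventory.orders topic with the following command:
kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic dbserver1.inventory.orders --from-beginning
The output should look similar to this: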
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
},
{
"type": "int32",
"optional": false,
"name": "io.debezium.time.Date",
"version": 1,
"field": "order_date"
},
{
"type": "int32",
"optional": false,
"field": "purchaser"
},
{
"type": "int32",
"optional": false,
"field": "quantity"
},
{
"type": "int32",
"optional": false,
"field": "product_id"
}
],
"optional": true,
"name": "dbserver1.inventory.orders.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
},
{
"type": "int32",
"optional": false,
"name": "io.debezium.time.Date",
"version": 1,
"field": "order_date"
},
{
"type": "int32",
"optional": false,
"field": "purchaser"
},
{
"type": "int32",
"optional": false,
"field": "quantity"
},
{
"type": "int32",
"optional": false,
"field": "product_id"
}
],
"optional": true,
"name": "dbserver1.inventory.orders.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "dbserver1.inventory.orders.Envelope",
"version": 1
},
"payload": {
"before": null,
"after": {
"id": 10001,
"order_date": 16816,
"purchaser": 1001,
"quantity": 1,
"product_id": 102
},
"source": {
"version": "2.4.0-SNAPSHOT",
"connector": "postgresql",
"name": "dbserver1",
"ts_ms": 1695631605203,
"snapshot": "incremental",
"db": "postgres",
"sequence": "[\"34837776\",\"34837776\"]",
"schema": "inventory",
"table": "orders",
"txId": null,
"lsn": null,
"xmin": null
},
"op": "r",
"ts_ms": 1695631605204,
"transaction": null
}
}
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
},
{
"type": "int32",
"optional": false,
"name": "io.debezium.time.Date",
"version": 1,
"field": "order_date"
},
{
"type": "int32",
"optional": false,
"field": "purchaser"
},
{
"type": "int32",
"optional": false,
"field": "quantity"
},
{
"type": "int32",
"optional": false,
"field": "product_id"
}
],
"optional": true,
"name": "dbserver1.inventory.orders.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
},
{
"type": "int32",
"optional": false,
"name": "io.debezium.time.Date",
"version": 1,
"field": "order_date"
},
{
"type": "int32",
"optional": false,
"field": "purchaser"
},
{
"type": "int32",
"optional": false,
"field": "quantity"
},
{
"type": "int32",
"optional": false,
"field": "product_id"
}
],
"optional": true,
"name": "dbserver1.inventory.orders.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "dbserver1.inventory.orders.Envelope",
"version": 1
},
"payload": {
"before": null,
"after": {
"id": 10002,
"order_date": 16817,
"purchaser": 1002,
"quantity": 2,
"product_id": 105
},
"source": {
"version": "2.4.0-SNAPSHOT",
"connector": "postgresql",
"name": "dbserver1",
"ts_ms": 1695631605204,
"snapshot": "incremental",
"db": "postgres",
"sequence": "[\"34837776\",\"34837776\"]",
"schema": "inventory",
"table": "orders",
"txId": null,
"lsn": null,
"xmin": null
},
"op": "r",
"ts_ms": 1695631605204,
"transaction": null
}
}
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
},
{
"type": "int32",
"optional": false,
"name": "io.debezium.time.Date",
"version": 1,
"field": "order_date"
},
{
"type": "int32",
"optional": false,
"field": "purchaser"
},
{
"type": "int32",
"optional": false,
"field": "quantity"
},
{
"type": "int32",
"optional": false,
"field": "product_id"
}
],
"optional": true,
"name": "dbserver1.inventory.orders.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
},
{
"type": "int32",
"optional": false,
"name": "io.debezium.time.Date",
"version": 1,
"field": "order_date"
},
{
"type": "int32",
"optional": false,
"field": "purchaser"
},
{
"type": "int32",
"optional": false,
"field": "quantity"
},
{
"type": "int32",
"optional": false,
"field": "product_id"
}
],
"optional": true,
"name": "dbserver1.inventory.orders.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "dbserver1.inventory.orders.Envelope",
"version": 1
},
"payload": {
"before": null,
"after": {
"id": 10003,
"order_date": 16850,
"purchaser": 1002,
"quantity": 2,
"product_id": 106
},
"source": {
"version": "2.4.0-SNAPSHOT",
"connector": "postgresql",
"name": "dbserver1",
"ts_ms": 1695631605204,
"snapshot": "incremental",
"db": "postgres",
"sequence": "[\"34837776\",\"34837776\"]",
"schema": "inventory",
"table": "orders",
"txId": null,
"lsn": null,
"xmin": null
},
"op": "r",
"ts_ms": 1695631605204,
"transaction": null
}
}
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
},
{
"type": "int32",
"optional": false,
"name": "io.debezium.time.Date",
"version": 1,
"field": "order_date"
},
{
"type": "int32",
"optional": false,
"field": "purchaser"
},
{
"type": "int32",
"optional": false,
"field": "quantity"
},
{
"type": "int32",
"optional": false,
"field": "product_id"
}
],
"optional": true,
"name": "dbserver1.inventory.orders.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
},
{
"type": "int32",
"optional": false,
"name": "io.debezium.time.Date",
"version": 1,
"field": "order_date"
},
{
"type": "int32",
"optional": false,
"field": "purchaser"
},
{
"type": "int32",
"optional": false,
"field": "quantity"
},
{
"type": "int32",
"optional": false,
"field": "product_id"
}
],
"optional": true,
"name": "dbserver1.inventory.orders.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "dbserver1.inventory.orders.Envelope",
"version": 1
},
"payload": {
"before": null,
"after": {
"id": 10004,
"order_date": 16852,
"purchaser": 1003,
"quantity": 1,
"product_id": 107
},
"source": {
"version": "2.4.0-SNAPSHOT",
"connector": "postgresql",
"name": "dbserver1",
"ts_ms": 1695631605204,
"snapshot": "incremental",
"db": "postgres",
"sequence": "[\"34837776\",\"34837776\"]",
"schema": "inventory",
"table": "orders",
"txId": null,
"lsn": null,
"xmin": null
},
"op": "r",
"ts_ms": 1695631605204,
"transaction": null
}
}
That's it! We have sent an incremental snapshot signal using the JMX channel.
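You may want to check the captured rows programmatically rather than by eye. The following minimal Python sketch keeps only the read events (op "r") that carry "snapshot": "incremental" in their source block. It assumes that each consumer line holds one complete JSON message with the schema/payload envelope shown above; the console consumer prints one record value per line, and the output above was pretty-printed for readability. The function name incremental_snapshot_rows is hypothetical. For the orders table above, it should return four rows, with ids 10001 to 10004.

```python
import json
from typing import Iterable, List


def incremental_snapshot_rows(lines: Iterable[str]) -> List[dict]:
    """Collect the 'after' images of read events emitted by an incremental snapshot.

    Assumes one JSON message per line with a top-level "payload" field,
    as produced with the JSON converter and schemas enabled.
    """
    rows = []
    for line in lines:
        if not line.strip():
            continue  # skip blank lines in the consumer output
        payload = json.loads(line)["payload"]
        if payload.get("op") == "r" and payload["source"].get("snapshot") == "incremental":
            rows.append(payload["after"])
    return rows


if __name__ == "__main__":
    sample = [
        json.dumps({"payload": {"op": "r", "after": {"id": 10001}, "source": {"snapshot": "incremental"}}}),
        json.dumps({"payload": {"op": "c", "after": {"id": 10005}, "source": {"snapshot": "false"}}}),
    ]
    print(incremental_snapshot_rows(sample))  # [{'id': 10001}]
```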
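Monitoring incremental snapshot progress through the JMX channel
Now that we have run an incremental snapshot, we can read the notifications Debezium produced through the JMX channel.
We registered our connector with the following configuration: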
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.server.id": "184054",
"database.dbname": "postgres",
"topic.prefix": "dbserver1",
"snapshot.mode": "NEVER",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory",
"signal.enabled.channels": "source,jmx",
"signal.data.collection": "inventory.debezium_signal",
"notification.enabled.channels": "jmx" (1)
}
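}
| 1 | This configuration enables the jmx notification channel. |
To access the notifications, we need to connect to the JMX server again. As we did for signals, we will use jmxterm: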
java -jar jmxterm-1.0.4-uber.jar (1)
open localhost:9012 (2)
beans -d debezium.postgres (3)
get -b debezium.postgres:context=notifications,server=dbserver1,type=management Notifications (4)
| 1 | Runs the jmxterm client |
| 2 | Opens a connection to the JMX server |
| 3 | Searches for beans under the debezium.postgres domain |
| 4 | Gets the notifications. |
You should see the following output:
#mbean = debezium.postgres:context=notifications,server=dbserver1,type=management:
Notifications = [ {
additionalData = {
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
};
aggregateType = Initial Snapshot;
id = b20bec8d-f21f-4d74-bb75-cdd7f4c7d933;
type = SKIPPED;
}, (1)
{
additionalData = {
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
( data_collections ) = {
key = data_collections;
value = inventory.orders;
};
};
aggregateType = Incremental Snapshot;
id = 12345;
type = STARTED;
}, (2)
{
additionalData = {
( current_collection_in_progress ) = {
key = current_collection_in_progress;
value = inventory.orders;
};
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
( maximum_key ) = {
key = maximum_key;
value = 10004;
};
( last_processed_key ) = {
key = last_processed_key;
value = 10004;
};
( data_collections ) = {
key = data_collections;
value = inventory.orders;
};
};
aggregateType = Incremental Snapshot;
id = 12345;
type = IN_PROGRESS;
}, (3)
{
additionalData = {
( scanned_collection ) = {
key = scanned_collection;
value = inventory.orders;
};
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
( total_rows_scanned ) = {
key = total_rows_scanned;
value = 4;
};
( status ) = {
key = status;
value = SUCCEEDED;
};
( data_collections ) = {
key = data_collections;
value = inventory.orders;
};
};
aggregateType = Incremental Snapshot;
id = 12345;
type = TABLE_SCAN_COMPLETED;
}, (4)
{
additionalData = {
( connector_name ) = {
key = connector_name;
value = dbserver1;
};
};
aggregateType = Incremental Snapshot;
id = 12345;
type = COMPLETED;
} (5)
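];
| 1 | This notification comes from the initial snapshot. Its status is SKIPPED because our connector is configured with "snapshot.mode": "NEVER". |
| 2 | This notification reports that the incremental snapshot has started. |
| 3 | This notification reports that the snapshot of the inventory.orders table is in progress, and provides useful information about the last processed key and the maximum key. In this example we get only one IN_PROGRESS notification, but depending on your table size and snapshot.fetch.size, you may get more. |
| 4 | This notification reports that the snapshot of a specific table has completed, and provides information about the total number of rows processed. |
| 5 | This is the last notification in our example; it reports that the whole incremental snapshot process has completed. |
| JMX also has its own notification mechanism, and Debezium emits these JMX notifications as well. Depending on your JMX client, you can subscribe to them and receive them as they happen, without polling the Notifications attribute. |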
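When you poll the Notifications attribute, a compact per-table summary is often easier to read than the raw list. The following minimal Python sketch assumes that the notifications are already available as a list of dicts with the camelCase keys seen in the output above, for example as returned in the value field of the Jolokia read response described below. It keeps the latest status per data collection for one signal ID. The function name snapshot_progress is hypothetical. So is the assumption that several collections in data_collections would be comma-separated.

```python
from typing import Dict, Iterable, Mapping


def snapshot_progress(notifications: Iterable[Mapping], signal_id: str) -> Dict[str, str]:
    """Summarize incremental-snapshot notifications per data collection.

    Later notifications overwrite earlier ones, so each collection ends up
    with the most recent status seen for the given signal id.
    """
    progress: Dict[str, str] = {}
    for n in notifications:
        if n.get("aggregateType") != "Incremental Snapshot" or str(n.get("id")) != signal_id:
            continue
        data = n.get("additionalData") or {}
        kind = n.get("type")
        if kind == "STARTED":
            # Assumption: several collections would be listed comma-separated.
            for coll in str(data.get("data_collections", "")).split(","):
                if coll.strip():
                    progress[coll.strip()] = "STARTED"
        elif kind == "IN_PROGRESS":
            coll = data.get("current_collection_in_progress")
            progress[coll] = (
                f"IN_PROGRESS (last key {data.get('last_processed_key')}"
                f" of {data.get('maximum_key')})"
            )
        elif kind == "TABLE_SCAN_COMPLETED":
            coll = data.get("scanned_collection")
            progress[coll] = f"{data.get('status')} ({data.get('total_rows_scanned')} rows)"
    return progress
```

For the notifications listed above and signal ID 12345, this sketch would report inventory.orders as "SUCCEEDED (4 rows)".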
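Leveraging Jolokia for REST-based signals and notifications
Jolokia is a JMX-HTTP bridge: it lets you interact with a JMX server and exposes its MBeans over a REST-style HTTP API. With it, we can interact with Debezium over REST through the signal and notification JMX beans, so you can send signals and receive notifications through a more familiar REST API.
To use Jolokia, we need to enable its agent in our Kafka Connect container.
This is the Docker Compose file used in our example: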
version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:2.4
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:2.4
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  postgres:
    image: quay.io/debezium/example-postgres:2.4
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  connect:
    image: quay.io/debezium/connect:2.4
    ports:
      - 8083:8083
      - 9012:9012
      - 8778:8778 (1)
    links:
      - kafka
      - postgres
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - JMXPORT=9012
      - JMXHOST=0.0.0.0
      - ENABLE_JOLOKIA=true (2)
| 1 | Exposes the port used by the Jolokia agent. |
| 2 | Enables the Jolokia agent that is already included in our test image. If you want to enable the agent in your own installation, see the official documentation. |
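Sending signals via Jolokia
To send a signal via Jolokia, we send an HTTP POST request to the Jolokia endpoint with the desired signal and its arguments.
Continuing our incremental snapshot example, you can trigger a snapshot of the inventory.products table by running the following command:
curl -X POST 'http://localhost:8778/jolokia/exec' -d '{"type":"EXEC","mbean":"debezium.postgres:context=signals,server=dbserver1,type=management","operation":"signal","arguments":["12345","execute-snapshot","{\"data-collections\": [\"inventory.products\"], \"type\": \"INCREMENTAL\"}"]}' | jq
The output should be: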
{
"request": {
"mbean": "debezium.postgres:context=signals,server=dbserver1,type=management",
"arguments": [
"12345",
"execute-snapshot",
"{\"data-collections\": [\"inventory.products\"], \"type\": \"INCREMENTAL\"}"
],
"type": "exec",
"operation": "signal"
},
"value": null,
"timestamp": 1695651387,
"status": 200
}
Receiving notifications via Jolokia
Jolokia also lets you retrieve notifications from Debezium with an HTTP GET request:
curl -X GET 'http://localhost:8778/jolokia/read/debezium.postgres:context=notifications,server=dbserver1,type=management/Notifications' | jq
The output should be:
{
"request": {
"mbean": "debezium.postgres:context=notifications,server=dbserver1,type=management",
"attribute": "Notifications",
"type": "read"
},
"value": [
{
"additionalData": {
"connector_name": "dbserver1"
},
"id": "b20bec8d-f21f-4d74-bb75-cdd7f4c7d933",
"type": "SKIPPED",
"aggregateType": "Initial Snapshot"
},
{
"additionalData": {
"connector_name": "dbserver1",
"data_collections": "inventory.orders"
},
"id": "12345",
"type": "STARTED",
"aggregateType": "Incremental Snapshot"
},
{
"additionalData": {
"last_processed_key": "10004",
"current_collection_in_progress": "inventory.orders",
"connector_name": "dbserver1",
"maximum_key": "10004",
"data_collections": "inventory.orders"
},
"id": "12345",
"type": "IN_PROGRESS",
"aggregateType": "Incremental Snapshot"
},
{
"additionalData": {
"scanned_collection": "inventory.orders",
"connector_name": "dbserver1",
"total_rows_scanned": "4",
"status": "SUCCEEDED",
"data_collections": "inventory.orders"
},
"id": "12345",
"type": "TABLE_SCAN_COMPLETED",
"aggregateType": "Incremental Snapshot"
},
{
"additionalData": {
"connector_name": "dbserver1"
},
"id": "12345",
"type": "COMPLETED",
"aggregateType": "Incremental Snapshot"
},
{
"additionalData": {
"connector_name": "dbserver1",
"data_collections": "inventory.products"
},
"id": "12345",
"type": "STARTED",
"aggregateType": "Incremental Snapshot"
},
{
"additionalData": {
"last_processed_key": "109",
"current_collection_in_progress": "inventory.products",
"connector_name": "dbserver1",
"maximum_key": "109",
"data_collections": "inventory.products"
},
"id": "12345",
"type": "IN_PROGRESS",
"aggregateType": "Incremental Snapshot"
},
{
"additionalData": {
"scanned_collection": "inventory.products",
"connector_name": "dbserver1",
"total_rows_scanned": "9",
"status": "SUCCEEDED",
"data_collections": "inventory.products"
},
"id": "12345",
"type": "TABLE_SCAN_COMPLETED",
"aggregateType": "Incremental Snapshot"
},
{
"additionalData": {
"connector_name": "dbserver1"
},
"id": "12345",
"type": "COMPLETED",
"aggregateType": "Incremental Snapshot"
}
],
"timestamp": 1695652278,
"status": 200
}
As you can see, we now also received notifications about the incremental snapshot of the inventory.products table, which we triggered through the REST API.
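The two curl calls above translate directly into a small client. The following minimal Python sketch uses only the standard library and mirrors the exec and read requests of this section. It assumes that the agent is published at http://localhost:8778/jolokia, as in the compose file. It also assumes that Jolokia reports failures through the status and error fields of its JSON reply. All function names are hypothetical.

```python
import json
import urllib.request
from typing import Any

# Assumption: Jolokia agent port published on localhost:8778 as in the compose file.
JOLOKIA_URL = "http://localhost:8778/jolokia"


def _mbean(connector_type: str, context: str, server: str) -> str:
    """Debezium management MBean name for the signals or notifications context."""
    return f"debezium.{connector_type}:context={context},server={server},type=management"


def exec_signal_request(server: str, signal_id: str, signal_type: str, data: dict,
                        connector_type: str = "postgres") -> urllib.request.Request:
    """POST request equivalent to the curl exec call above."""
    body = {
        "type": "EXEC",
        "mbean": _mbean(connector_type, "signals", server),
        "operation": "signal",
        # The third argument is itself a JSON string, so it is serialized twice.
        "arguments": [signal_id, signal_type, json.dumps(data)],
    }
    return urllib.request.Request(
        f"{JOLOKIA_URL}/exec",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def read_notifications_request(server: str, connector_type: str = "postgres") -> urllib.request.Request:
    """GET request equivalent to the curl read call above."""
    mbean = _mbean(connector_type, "notifications", server)
    return urllib.request.Request(f"{JOLOKIA_URL}/read/{mbean}/Notifications", method="GET")


def call(req: urllib.request.Request) -> Any:
    """Send a request and return Jolokia's 'value' field, failing on a non-200 status."""
    with urllib.request.urlopen(req) as resp:
        reply = json.load(resp)
    if reply.get("status") != 200:
        raise RuntimeError(f"Jolokia request failed: {reply.get('error')}")
    return reply.get("value")
```

With the stack running, call(exec_signal_request("dbserver1", "12345", "execute-snapshot", {"data-collections": ["inventory.products"], "type": "INCREMENTAL"})) sends the signal. call(read_notifications_request("dbserver1")) should then return the same list that appears under value in the output above.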
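Conclusion
In this third part of our series on Debezium signaling and notifications, we learned how to enable and manage signals and notifications with JMX and Jolokia. Signals let you control Debezium's behavior dynamically, while notifications keep you informed about important events. By combining these features with Jolokia, you can manage, monitor, and interact with your data streaming workflows effectively, keeping Debezium under your control at all times.
About Debezium
Debezium is an open source distributed platform that turns your existing databases into event streams, so that applications can see and respond almost instantly to each committed row-level change in the databases. Debezium is built on top of Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, so your application can be stopped and restarted at any time and can easily consume all of the events it missed while it was not running, ensuring that all events are processed correctly and completely. Debezium is open source under the Apache License, Version 2.0.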