本文是三部分系列文章的一部分,旨在探讨如何使用 Debezium 和 Oracle LogMiner 从 Oracle 数据库摄取变更。如果您错过了第一部分,可以在 这里 查看。
在第二部分中,我们将基于第一部分的内容,使用 Zookeeper、Kafka 和 Kafka Connect 部署 Oracle 连接器。我们将讨论连接器的各种配置选项以及它们的重要性。最后,我们将实际演示连接器如何工作!
设置 Kafka Connect 和先决条件
为了使用 Debezium,需要启动三个独立的服务
我们将使用 Docker 容器来运行上述服务。使用独立的容器可以简化部署过程,以便您能够看到 Debezium 的运行。此外,我们还将下载 Oracle JDBC 驱动程序,并将其作为 Kafka Connect 容器的一部分进行挂载。
| 在生产环境中使用多个服务实例可提供性能、可靠性和容错能力。部署通常会涉及像 OpenShift 或 Kubernetes 这样的平台来管理多个容器,或者您可以使用专用硬件并手动管理。 在本博文中,为了保持简单,我们将使用每个服务的单个实例。 |
| Zookeeper 和 Kafka 容器是临时的。通常,会将卷挂载到主机上,以便当容器停止时,容器管理的数据仍然存在。为了简单起见,我们跳过此步骤,这样当容器停止时,数据就会丢失。 |
先决条件:启动 Zookeeper
Zookeeper 服务是启动的第一个服务。Kafka 代理使用 Zookeeper 来处理 Kafka 代理的领导者选举,并管理集群内的服务发现,以便每个代理都知道当同级加入或离开时,或者当代理终止时,以及对于给定的主题/分区元组,谁是新领导者。
打开一个新的终端窗口并运行以下命令
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 \
quay.io/debezium/zookeeper:1.9 zookeeper 容器以交互模式启动,并在停止时销毁。该容器的名称为 zookeeper,这在启动后续容器时很重要。
先决条件:启动 Kafka
Kafka 服务是必须启动的第二个服务,它依赖于 Zookeeper 服务。Debezium 会生成更改事件,发送到由 Kafka 代理管理的主题。
打开一个新的终端窗口并运行以下命令
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper \
quay.io/debezium/kafka:1.9 kafka 容器以交互模式启动,并在停止时销毁。该容器的名称为 kafka,这在启动后续容器时很重要。此外,kafka 服务还链接到 zookeeper 服务,这意味着规范名称 zookeeper 将解析为运行 zookeeper 服务的容器。
先决条件:下载 Oracle JDBC 驱动程序
Debezium Kafka Connect 镜像不包含 Oracle JDBC 驱动程序。要使用 Debezium for Oracle,必须手动下载 JDBC 驱动程序并将其挂载到 Debezium Kafka Connect 镜像中。
导航到 Oracle Database JDBC 驱动程序 下载页面。在本文发布时,最新的 Oracle 数据库是 Oracle 21,因此请点击 Oracle 21c 部分下的 ojdbc8.jar 链接。下载的 jar 将在下一部分中使用,将驱动程序添加到 Debezium 的 Kafka Connect 容器的基镜像中。
先决条件:启动 Kafka Connect
Kafka Connect 服务是必须启动的第三个也是最后一个服务,它依赖于 Kafka 服务。Kafka Connect 负责管理所有连接器及其相关工作负载,并且是负责在我们将很快部署 Debezium Connector for Oracle 时运行它的运行时环境。
打开一个新的终端窗口并运行以下命令
docker run -it --rm --name connect -p 8083:8083 \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
-e STATUS_STORAGE_TOPIC=my_connect_statuses \
--link kafka:kafka \
--link dbz_oracle21:dbz_oracle21 \
-v /path/to/ojdbc8.jar:/kafka/libs/ojdbc8.jar \
quay.io/debezium/connect:1.9 connect 容器以交互模式启动,并在停止时销毁。该容器的名称为 connect,并且有几个环境变量控制着几个必需主题的命名以及一些必需的配置参数。此外,connect 容器链接到 kafka 容器,这意味着规范名称 kafka 将解析为运行 kafka 代理服务的容器。
| 与之前的容器不同,我们使用 local-path 代表 |
创建一些初始测试数据
如果在本系列第一部分中创建的 Oracle 数据库使用了 Oracle 容器注册表镜像,则数据库中不存在任何初始数据。虽然这不一定会造成问题,但我们理想情况下希望在部署 Oracle 连接器时快照一些数据;因此,在部署之前必须存在一些初始数据。
在一个新的终端中,让我们使用 SQL*Plus 连接到数据库,并创建一个带有初始数据的新表。以下命令使用了公共用户,连接到可插拔数据库 ORCLPDB1。当连接到具有要捕获的表的现有环境时,您可以安全地跳过此步骤。
docker exec -it -e ORACLE_SID=ORCLPDB1 dbz_oracle21 sqlplus c##dbzuser@ORCLPDB1 连接后,使用以下 SQL 创建一个表和一些初始数据
CREATE TABLE customers (id number(9,0) primary key, name varchar2(50));
INSERT INTO customers VALUES (1001, 'Salles Thomas');
INSERT INTO customers VALUES (1002, 'George Bailey');
INSERT INTO customers VALUES (1003, 'Edward Walker');
INSERT INTO customers VALUES (1004, 'Anne Kretchmar');
COMMIT; 默认情况下,重做日志仅捕获 CUSTOMERS 表中更改的少量信息,因为补充日志记录仅在数据库级别设置。
如果您熟悉 PostgreSQL 的 REPLICA IDENTITY 或 MySQL 的 binlog_format,Oracle 提供了一个类似的机制,称为表级补充日志记录,我们在本系列第一部分中提到了它。表级补充日志记录控制用户修改行时在重做日志中捕获的列。将表的补充日志级别设置为 (ALL) COLUMNS 可确保 Oracle 在重做日志中捕获与 INSERT、UPDATE 和 DELETE 操作相关的更改。
使用以下 SQL 设置表的补充日志级别
ALTER TABLE customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; | 如果捕获表的补充日志级别设置不正确,连接器将记录一个警告,告知您存在问题,以便您可以调整表的设置来捕获所有更改。 |
值得注意的是,虽然此示例使用与连接器连接相同的用户帐户创建此 CUSTOMERS 表,但连接器使用的用户与 Oracle 数据库中表的拥有者不同并非不常见。在这种情况下,连接器用户必须具有读取捕获表的权限,这需要每个表的 SELECT 权限。
部署 Oracle 连接器
我们现在已准备好部署 Debezium Oracle 连接器。在向 Kafka Connect 注册连接器之前,让我们深入了解一下配置。
以下是我们将在本示例中使用的配置样本
{
"name": "customers-connector",
"config": {
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"tasks.max": "1",
"database.hostname": "dbz_oracle21",
"database.port": "1521",
"database.user": "c##dbzuser",
"database.password": "dbz",
"database.dbname": "ORCLCDB",
"database.pdb.name": "ORCLPDB1",
"database.server.name": "server1",
"table.include.list": "C##DBZUSER.CUSTOMERS",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes"
}
} 让我们深入了解每个配置选项的含义。
name (名称)-
这是分配给连接器的名称,在 Kafka Connect 集群中必须是唯一的。
connector.class-
这是已部署连接器的类实现。每个 Debezium 源连接器都有一个唯一的类名来标识正在部署的连接器。
tasks.max-
这是将分配给 Kafka Connect 中连接器部署的最大任务数。大多数 Debezium 连接器按顺序读取源数据库中的更改,因此,值
1通常是有意义的。 database.hostname-
这是数据库主机名或 IP 地址。由于我们在启动 Kafka Connect 时指定了链接到
dbz_oracle21容器,因此我们可以在此处使用该名称来标识运行 Oracle 数据库的容器。如果您在其他主机上拥有预先存在的 Oracle 环境,请在此配置属性中指定该主机的名称。 database.port-
这是数据库用于监听连接的端口。Oracle 的默认端口是
1521,但数据库管理员可以将其配置为任何可用端口。如果您连接到预先存在的 Oracle 实例,请使用数据库使用的端口。 database.user-
这是用于 JDBC 连接的数据库用户帐户。这应该是我们在第一部分中创建的公共用户
c##dbzuser。如果您连接到不支持多租户的环境,这将是您在根数据库中创建的用户,不带公共用户前缀。 database.password-
这是数据库用户帐户的密码。
database.dbname-
这是连接器通信的数据库服务。无论是否启用多租户,这始终是单一或根容器数据库。
database.pdb.name-
这是可选的可插拔数据库系统标识符。连接到支持多租户的数据库并引用 PDB 时,必须提供此属性。如果省略此字段,连接器将假定数据库不支持多租户。
database.server.name-
连接器创建的所有主题的前缀。此值在 Kafka Connect 集群中的所有主题部署中必须是唯一的。
table.include.list-
一个由逗号分隔的正则表达式或简单表名列表,格式为
<schema>.<table>,用于标识连接器将捕获哪些表。 database.history.kafka.bootstrap.servers-
这是数据库历史记录主题将存储的 Kafka 代理的 URL。由于我们在启动 Kafka Connect 时指定了链接到
kafka容器,因此我们可以在此处使用该名称指向代理及其端口。 database.history.kafka.topic-
这是将存储数据库模式历史记录的主题的名称。当连接器重新启动时,将从该主题恢复此主题,并使用此主题填充内存中的关系模型。
| 除 PostgreSQL 之外,所有 Debezium 连接器都使用模式历史记录来存储所有表的模式。对于 Oracle 数据库来说,这通常不是理想的选择,尤其是在未启用多租户的情况下部署连接器时。 要将存储限制为仅包含包含列表中的表,请将连接器的配置修改为将 |
有关其他连接器属性的更多信息,您可以查阅 Oracle 文档以获取更多详细信息。
要部署连接器,请将以上配置保存到一个名为 register-oracle.json 的文件中。现在,打开一个新的终端窗口,并使用 curl 命令将连接器注册到 Kafka Connect
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" \
localhost:8083/connectors \
-d @register-oracle.json 如果注册成功,正在运行 connect 容器的终端将开始对 CUSTOMERS 表中的数据进行快照。我们还可以通过使用 Kafka 控制台消费者工具并读取主题内容到本地终端来确认数据是否存在于 Kafka 中。
要检查主题的内容,请使用与注册连接器相同的终端并执行以下命令
docker exec -it kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server 0.0.0.0:9092 \
--from-beginning \
--property print.key=true \
--topic server1.C__DBZUSER.CUSTOMERS | 由于主题命名策略会自动确保主题名称与 Avro 兼容(Avro 不允许使用哈希符号),因此该主题会将模式名称从 |
上述命令的输出应与以下内容类似
{
"schema":{
...
},
"payload":{
"before":null,
"after":{
"ID":"1001",
"NAME":"Salles Thomas"
},
"source":{
"version":"1.9.6.Final",
"connector":"oracle",
"name":"server1",
"ts_ms":1665102121000,
"snapshot":"true",
"db":"ORCLPDB1",
"sequence":null,
"schema":"C##DBZUSER",
"table":"CUSTOMERS",
"txId":null,
"scn":"2868546",
"commit_scn":null,
"lcr_position":null,
"rs_id":null,
"ssn":0,
"redo_thread":null
},
"op":"r",
"ts_ms":1665102126961,
"transaction":null
}
}
... 您现在可以使用创建初始测试数据的 SQLPlus 终端在 CUSTOMERS 表中 INSERT、UPDATE 或 DELETE 记录。您将在当前正在跟踪 server1.C__DBZUSER.CUSTOMERS 主题的终端中看到相应的更改事件。
| 请注意,SQLPlus 默认不启用 |
结论
在本系列第一部分中,我们讨论了 Oracle 是什么,为什么它在数据库领域如此受欢迎,以及如何安装和配置数据库。在本系列的这一部分中,我们讨论了如何安装所有先决服务,包括 Zookeeper、Apache Kafka 和 Apache Kafka Connect。此外,我们还部署了一个示例 Oracle 连接器,捕获了 CUSTOMERS 表的更改。
在本系列的下一部分中,我将讨论性能、如何监控连接器,以及最重要的指标及其重要性。我们甚至可能会构建一个小型仪表板来显示这些指标。
关于 Debezium
Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。