常见问题解答
- 什么是 Debezium?
- “Debezium”这个名字的由来是什么?
- 什么是变更数据捕获(Change Data Capture)?
- Debezium 可以监控哪些数据库?
- Debezium 的一些用途是什么?
- 为什么 Debezium 是一个分布式系统?
- 我的应用程序可以直接监控单个数据库吗?
- Debezium 平台是什么样的?
- 可以监控多少个数据库?
- Debezium 如何影响源数据库?
- 数据库的事件是如何组织的?
- 为什么事件如此之大?
- 如何使用模式注册表(schema registry)?
- 当应用程序停止或崩溃时会发生什么?
- 当 Debezium 停止或崩溃时会发生什么?
- 当被监控的数据库停止或崩溃时会发生什么?
- 为什么消费应用程序必须预期重复的事件?
- 是什么导致了 MySQL 连接器的间歇性
EventDataDeserializationException
异常? - 什么是 Kafka?
- 什么是 Kafka Connect?
- 使用 Debezium Docker 镜像时如何连接到 Kafka?
- 使用 Connect 镜像的默认内存设置可以处理多大的数据库?
- 如何从二进制表示中检索 DECIMAL 字段?
- 如何更改源数据库的偏移量(offsets)?
- 如何为连接器移除已提交的偏移量?
- 为什么 KSQL 无法打印墓碑事件(tombstone events)?
- 为什么 Debezium MySQL 连接器在进行模式更改时崩溃?
- 如何在每个租户一个数据库模式(database-per-tenant pattern)中降低内存消耗?
- 如何解决偏移量刷新超时(offset flush timeouts)问题?
- 为什么在某些情况下我看不到 DELETE 事件?
- 为什么 Debezium MySQL 连接器无法从 RDS MySQL 只读副本(read replica)消费数据?
- 为什么 Debezium PostgreSQL 连接器导致 WAL 数据库磁盘空间异常消耗?
- 为什么在交换了表中的两个列的内容后,没有创建新的模式版本?
- 如何增大发送到 Kafka 的消息的最大大小?
- 为什么 JSON 消息不包含模式(schema)?
- 如何启用不区分大小写的名称过滤?
- 为什么 MySQL
TIMESTAMP
列的变更事件值在快照和流式传输之间存在差异? - MongoDB 连接器因最大的 BSON 大小错误而失败
- Debezium 引擎应用程序因“无法找到最小快照锁定模式。请检查您的配置”错误而失败
什么是 Debezium?
Debezium 是一组分布式服务,用于捕获数据库中的行级更改,以便您的应用程序能够看到并响应这些更改。Debezium 在事务日志中记录提交到每个数据库表的每个行级更改。每个应用程序只需读取它们感兴趣的事务日志,就能以发生的相同顺序看到所有事件。Debezium 是一组分布式服务,用于捕获数据库中的行级更改,以便您的应用程序能够看到并响应这些更改。Debezium 在事务日志中记录提交到每个数据库表的每个行级更改。每个应用程序只需读取它们感兴趣的事务日志,就能以发生的相同顺序看到所有事件。
“Debezium”这个名字的由来是什么?
该名称是“DBs”(多个数据库的缩写)和许多元素周期表元素名称中使用的“-ium”后缀的组合。快速说出来:“DBs-ium”。如果这对您有帮助,我们发音为“dee-BEE-zee-uhm”。
什么是变更数据捕获(Change Data Capture)?
变更数据捕获(Change Data Capture),简称 CDC,是一个较老的术语,用于描述监控和捕获数据更改的系统,以便其他软件可以响应这些更改。数据仓库通常内置 CDC 支持,因为数据仓库需要随着上游 OLTP 数据库中数据的变化而保持最新。
Debezium 本质上是一个现代的、分布式的开源变更数据捕获平台,最终将支持监控各种数据库系统。
Debezium 可以监控哪些数据库?
Debezium 的最新版本包括对监控 MySQL 数据库服务器、MongoDB 复制集或分片集群、PostgreSQL 服务器、Oracle 服务器(基于 LogMiner 和 XStream)、Db2 服务器、Cassandra 数据库(3.x 和 4.x)以及 SQL Server 数据库的支持。此外,还有正在开发中的 Debezium 连接器,用于 Vitess 服务器和 Google Spanner,截至 Debezium 2.2,它们以预览(孵化)版本发布。
请注意,监控 PostgreSQL 需要在 PostgreSQL 服务器中安装一个扩展(“逻辑解码插件”)。Debezium 与 Decoderbufs(由 Debezium 社区维护)和 pgoutput(由 Postgres 社区维护)配合使用。
如果您的数据库托管在(托管的)云服务上,则必须安装这些插件之一(或者您必须能够自行安装)。否则,您将无法使用 Debezium 监控您的数据库。pgoutput 是 Postgres 10 及更高版本的一部分;是否可以启用它取决于特定数据库的配置。
在 CloudSQL 上,pgoutput 是受支持的扩展,可以安装,如 配置 PostgreSQL 扩展中所述。
在 Amazon RDS 以及 Azure Database for PostgreSQL 上,都可以使用 pgoutput。两者都可以与 Debezium 一起使用。
未来的版本将添加对其他 DBMS 的支持。请参阅我们的 路线图。
Debezium 的一些用途是什么?
Debezium 的主要用途是使应用程序能够几乎立即响应数据库中的数据更改。应用程序可以对插入、更新和删除事件执行任何操作。它们可能使用事件来了解何时从缓存中删除条目。它们可能使用数据更新搜索索引。它们可能使用相同的信息或计算出的信息来更新派生数据存储,例如使用 命令查询责任分离(CQRS)。它们可能发送推送通知给一个或多个移动设备。它们可能聚合更改并生成实体补丁流。它们可能被存储以形成审计日志。它们可能驱动流式查询,例如使用 Apache Flink 或 Kafka Streams。它们可能用于在微服务之间传播数据,例如采用 outbox 模式。
您可以在 2019 年 QCon 旧金山大会的本次演示文稿中了解更多关于 CDC 的用例。
为什么 Debezium 是一个分布式系统?
Debezium 的架构旨在容忍故障和失败,实现这一目标唯一有效的方法是采用分布式系统。Debezium 将监控进程或连接器分布在多台机器上,以便在出现任何问题时可以重新启动连接器。事件被记录并跨多台机器复制,以最大程度地降低信息丢失的风险。
我的应用程序可以直接监控单个数据库吗?
是的。尽管我们建议大多数人使用完整的 Debezium 平台,但单个应用程序也可以 嵌入 Debezium 连接器,以便它可以监控数据库并响应事件。这种方法确实更简单,移动部件更少,但其功能更有限,容错能力也远不如分布式系统。如果您的应用程序需要至少一次(at-least-once)的消息传递保证,请考虑使用完整的分布式系统。
Debezium 平台是什么样的?
正在运行的 Debezium 系统由几个部分组成。一组 Apache Kafka 代理提供了持久化、复制和分区的事务日志,Debezium 在其中记录所有事件,应用程序也从中消费所有事件。Kafka 代理的数量在很大程度上取决于事件的量、正在监控的数据库表的数量以及正在消费事件的应用程序的数量。Kafka 依赖一小组 Zookeeper 节点来管理每个代理的职责。
每个 Debezium 连接器监控一个数据库集群/服务器,连接器被配置并部署到一个 Kafka Connect 服务集群中,该集群确保每个连接器始终运行,即使 Kafka Connect 服务实例离开和加入集群。每个 Kafka Connect 服务集群(又名组)都是独立的,因此每个组织内的组可以管理自己的集群。
所有连接器都将其事件(和其他信息)记录到 Apache Kafka,Kafka 会持久化、复制并为每个表将事件分区到单独的主题中。多个 Kafka Connect 服务集群可以共享一个 Kafka 代理集群,但 Kafka 代理的数量在很大程度上取决于事件的量、正在监控的数据库表的数量以及正在消费事件的应用程序的数量。
应用程序直接连接到 Kafka 并消费相应主题中的事件。
可以监控多少个数据库?
Debezium 可以监控任意数量的数据库。可以部署到单个 Kafka Connect 服务集群的连接器数量取决于事件的量和速率。但是,Debezium 支持多个 Kafka Connect 服务集群,并且如果需要,还可以支持多个 Kafka 集群。
Debezium 如何影响源数据库?
大多数数据库在 Debezium 能够监控它们之前都需要进行配置。例如,MySQL 服务器必须配置为使用行级 binlog,并拥有读取 binlog 的特权用户;Debezium 连接器必须配置正确的信息,包括特权用户。有关详细信息,请参阅特定连接器文档。
Debezium 连接器不将任何信息存储在向上游数据库中。但是,运行连接器可能会给源数据库带来额外负载。
数据库的事件是如何组织的?
大多数连接器会将单个数据库表的所有事件记录到单个主题中。此外,主题中的所有事件都是完全有序的,这意味着所有这些事件的顺序都将被维护。(即使在失败期间事件被重复,应用所有事件后的最终结果也将保持不变。)
例如,一个监控 MySQL 服务器/集群(逻辑名称为“dbserverA”)的 MySQL 连接器会将“Customers”数据库中“Addresses”表的所有更改记录到名为 dbserverA.Customers.Addresses
的主题中。同样,同一数据库中“PaymentMethods”表的所有更改将记录到名为 dbserverA.customers.PaymentMethods
的主题中。
为什么事件如此之大?
Debezium 被设计用于监控上游数据库,并为每个行级更改生成一个或多个相应的事件,这些事件完整地描述了这些更改。但 Debezium 连接器是持续工作的,即使上游数据库中的表结构随时间而改变,其事件也必须是有意义的。对于消费者来说,如果它只需要一次处理一个事件,而不是跟踪事件流整个历史的状态,那么编写起来也更容易。
这意味着每个事件都需要完全自包含:事件的键和值都包含一个包含实际信息的有效负载和一个模式,该模式完整地描述了信息的结构。消费应用程序可以处理每个事件,使用模式来理解该事件中信息的结构,然后正确处理事件的有效负载。消费应用程序可以利用这样一个事实:模式在许多事件中保持不变,只有当模式发生变化时,消费应用程序才可能需要做更多的工作来为更改的结构做准备。
同时,Kafka Connect 服务将连接器的事件序列化并记录到 Kafka 中。JSON 转换器非常通用且非常简单,但它别无选择,只能序列化整个事件信息。因此,以 JSON 表示的事件确实是冗长且庞大的。
然而,有一个替代方案:使用模式注册表。这样,实际的模式信息由注册表管理,而实际的变更事件仅包含注册表中相应模式的 ID。这导致发送到 Kafka 的事件的表示更有效。模式注册表可以与 JSON 或 Avro 等不同格式一起使用。利用 Avro 作为消息格式还有一个额外的优势,即有效负载被序列化成一种非常紧凑的二进制形式。
使用这种方法,Kafka Connect 转换器和模式注册表协同工作,以跟踪每个模式随时间的演变历史。同时,在消费者端,相同的转换器解码事件的紧凑二进制形式,读取该消息使用的模式版本的标识符,如果尚未看到该模式版本,则从模式注册表下载模式,最后使用该模式解码事件的有效负载。同样,一系列事件中的许多事件将共享相同的模式(和模式版本),因此大多数时候转换器都可以简单地将原始的紧凑事件解码为消费者期望的相同模式和有效负载。
如何使用模式注册表(schema registry)?
模式注册表的选项包括 Apicurio API 和模式注册表以及 Confluent 模式注册表。两者都提供了用于在注册表中存储/获取 JSON 和 Avro 模式的转换器。
如果您将 Debezium 连接器部署到 Kafka Connect 工作者服务,只需确保您的注册表的转换器 JAR 文件可用,并配置工作者服务使用正确的转换器。例如,您需要将转换器指向您的 Apicurio 模式注册表。然后,只需将 Debezium 连接器(或者实际上是任何其他 Kafka Connect 连接器)部署到您的工作者服务。请参阅 Avro 序列化,以详细了解如何将 Avro 转换器与 Apicurio 和 Confluent 注册表一起使用。
GitHub 上的 教程示例详细介绍了如何将模式注册表和随附的转换器与 Debezium 一起使用。
我们的 Kafka Connect Docker 镜像将 Avro 转换器作为一种选项。
当应用程序停止或崩溃时会发生什么?
为了消费数据库的变更事件,应用程序会创建一个 Kafka 消费者,该消费者连接到 Kafka 代理并消费与该数据库关联的所有主题的所有事件。消费者被配置为定期记录其在每个主题中的位置(也称为偏移量)。当应用程序正常停止并关闭消费者时,消费者将记录每个主题中最后一个事件的偏移量。当应用程序稍后重新启动时,消费者会查找这些偏移量并开始读取每个主题中的下一个事件。因此,在正常操作场景下,应用程序会看到每个事件恰好一次。
如果应用程序意外崩溃,那么在重新启动后,应用程序的消费者将查找每个主题的上次记录的偏移量,并从每个主题的最后一个偏移量开始消费事件。在大多数情况下,应用程序会看到一些在崩溃前(但在记录偏移量之后)看到的相同事件,然后是它尚未看到的事件。因此,应用程序会看到每个事件至少一次。应用程序可以通过更频繁地记录偏移量来减少看到的重复事件数量,尽管这样做会负面影响客户端的性能和吞吐量。
请注意,Kafka 消费者可以配置为连接并从每个主题的最新偏移量开始读取。这可能导致事件丢失,尽管这对于某些用例是完全可以接受的。
当 Debezium 停止或崩溃时会发生什么?
Debezium 的行为取决于停止或崩溃的组件。如果停止或崩溃的 Kafka 代理数量过多,以至于每个主题分区由少于最低数量的同步副本(in-sync replicas)组成,那么写入这些主题的连接器和从这些主题读取的消费应用程序将简单地阻塞,直到 Kafka 代理可以重新启动或新的代理上线。因此,最低同步副本数量对可用性有很大影响,出于一致性原因,它应始终至少为 1(如果不是 3)。
Kafka Connect 服务被配置为定期记录每个连接器的位置和偏移量。如果其集群中的一个 Kafka Connect 服务实例被正常停止,则在该进程中运行的所有连接器都将被正常停止(这意味着所有位置和偏移量都将被记录),并且这些连接器将在同一集群中的其他 Kafka Connect 服务实例上重新启动。当这些连接器重新启动时,它们将从上次中断的地方继续记录事件,不会记录重复事件。
当其集群中的一个 Kafka Connect 服务实例正常停止时,连接器将完成其当前工作,并将最新的位置和偏移量记录到 Kafka 中。从主题消费的下游应用程序将简单地等待直到添加新事件。
如果其集群中的一个 Kafka Connect 服务实例意外崩溃,则在该崩溃进程中运行的所有连接器将在同一集群中的其他 Kafka Connect 服务实例上重新启动。但是,当这些连接器重新启动时,它们将从连接器崩溃前最后记录的位置/偏移量开始从数据库记录事件。这意味着新重新启动的连接器很可能会记录一些在崩溃前已经记录过的相同事件,并且这些重复事件将始终对下游消费应用程序可见。
当被监控的数据库停止或崩溃时会发生什么?
当 Debezium 监控的数据库服务器停止或崩溃时,Debezium 连接器很可能会尝试重新建立通信。Debezium 会定期将连接器的位置和偏移量记录到 Kafka 中,因此一旦连接器建立通信,它应该会继续从上次记录的位置和偏移量读取。
为什么消费应用程序必须预期重复的事件?
当所有系统正常运行时,或者当某些或所有系统被正常关闭时,消费应用程序可以预期看到每个事件恰好一次。然而,当出现问题时,消费应用程序总有可能看到事件至少一次。
当 Debezium 的系统崩溃时,它们并非总能记录其最后的位置/偏移量。当它们重新启动时,它们将从最后已知的位置恢复,因此消费应用程序将始终看到每个事件,但可能至少会看到一些消息在恢复过程中重复。
此外,网络故障可能导致 Debezium 连接器无法收到写入确认,导致同一事件被记录一次或多次(直到收到确认)。
是什么导致了 MySQL 连接器的间歇性 EventDataDeserializationException
异常?
当您在启动连接器约 1 分钟后遇到间歇性的反序列化异常,根本原因是 EOFException
或 java.net.SocketException: Connection reset
时
Caused by: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1542193955000, eventType=GTID, serverId=91111, headerLength=19, dataLength=46, nextPosition=1058898202, flags=0}
Caused by: java.lang.RuntimeException: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1542193955000, eventType=GTID, serverId=91111, headerLength=19, dataLength=46, nextPosition=1058898202, flags=0}
Caused by: java.io.EOFException
or
Caused by: java.net.SocketException: Connection reset
那么像这样更新这些 MySQL 服务器全局属性将解决问题
set global slave_net_timeout = 120; (default was 30sec)
set global thread_pool_idle_timeout = 120;
什么是 Kafka?
Apache Kafka 是一个快速、可伸缩、持久化且分布式的消息系统,它将所有消息记录在复制、分区和完全排序的事务日志中。消费者跟踪它们在日志中的位置,并且可以独立于所有其他消费者控制此位置。这意味着某些消费者可以从日志的开头开始,而其他消费者则可以跟上最新记录的消息。Kafka 作为一组动态的代理运行。每个日志分区都被复制到多个代理,因此,如果任何代理失败,集群仍然拥有该分区的多个副本。
Debezium 连接器将所有事件记录到 Kafka 集群,应用程序通过 Kafka 消费这些事件。
什么是 Kafka Connect?
Kafka Connect 是一个用于在 Apache Kafka 和其他系统之间大规模可靠地流式传输数据的框架。它是 Kafka 社区最近添加的一个组件,它使得定义将大量数据移入和移出 Kafka 的连接器变得简单,而框架则完成了大部分记录连接器偏移量的繁重工作。Kafka Connect 服务拥有一个 RESTful API,用于管理和部署连接器;该服务可以进行集群化,并会自动将连接器分布到集群中,确保连接器始终运行。
Debezium 使用 Kafka Connect 框架。Debezium 的所有连接器都是 Kafka Connector 的源连接器,因此它们可以使用 Kafka Connect 服务进行部署和管理。
使用 Debezium Docker 镜像时如何连接到 Kafka?
在使用 Docker for Mac 或 Docker for Windows 时,Docker 容器在轻量级 VM 中运行。为了从您的主机系统连接到 Kafka,例如使用 IDE 中测试启动的 Kafka 消费者,您需要将您主机系统的 IP 地址或主机名指定为 Kafka 容器的 ADVERTISED_HOST_NAME
:docker run -it --rm --name kafka -p 9092:9092 -e ADVERTISED_HOST_NAME=<%YOUR_HOST_NAME%> --link zookeeper:zookeeper debezium/kafka:{debezium-docker-label}
。此名称将由 Zookeeper 发布给询问 Kafka 代理名称的客户端。
使用 Connect 镜像的默认内存设置可以处理多大的数据库?
启动和运行时内存消耗取决于 Debezium 监控的数据库中表的总数、每个表中的列数以及数据库产生的事件量。粗略地说,默认内存设置(最大堆设置为 256 MB)可以处理数据库,其中所有表的总列数少于 10000。
如何从二进制表示中检索 DECIMAL 字段?
如果 Debezium 配置为以精确方式处理 DECIMAL 值,则它将其编码为 org.apache.kafka.connect.data.Decimal
。此类型被转换为 BigInteger
并序列化为字节数组。要将其解码回,我们需要预先知道值的精度,或者必须从模式中获取。解包代码可能如下面的代码片段之一,具体取决于编码值是作为字节数组还是字符串提供。
byte[] encoded = ...;
int scale = ...;
final BigDecimal decoded = new BigDecimal(new BigInteger(encoded), scale);
String encoded = ...;
int scale = ...;
final BigDecimal decoded = new BigDecimal(new BigInteger(Base64.getDecoder().decode(encoded)), scale);
如何更改源数据库的偏移量(offsets)?
这是一个高度技术性的操作,涉及操作 Kafka Connect 内部。请仅将此作为最后的解决方案。 |
有时数据库日志包含无效数据(如无效日期)需要跳过,或者有必要重新处理一部分历史日志。通常没有直接的方法(MySQL 连接器的 event.deserialization.failure.handling.mode
除外)来实现此操作,但有一个解决方法可以操作 Kafka Connect 的内部数据。
第一步是找出包含插件偏移量的主题的名称。这在 offset.storage.topic
选项中配置。
下一步是找出给定连接器的最后偏移量、存储它的键以及用于存储偏移量的分区。示例如下
$ kafkacat -b localhost -C -t my_connect_offsets -f 'Partition(%p) %k %s\n'
Partition(11) ["inventory-connector",{"server":"dbserver1"}] {"ts_sec":1530088501,"file":"mysql-bin.000003","pos":817,"row":1,"server_id":223344,"event":2}
Partition(11) ["inventory-connector",{"server":"dbserver1"}] {"ts_sec":1530168941,"file":"mysql-bin.000004","pos":3261,"row":1,"server_id":223344,"event":2}
inventory-connector
的键是 ["inventory-connector",{"server":"dbserver1"}]
,分区号是 11
,最后一个偏移量是 {"ts_sec":1530168941,"file":"mysql-bin.000004","pos":3261,"row":1,"server_id":223344,"event":2}
。
要回溯到之前的偏移量,应停止连接器并执行以下命令
$ echo '["inventory-connector",{"server":"dbserver1"}]|{"ts_sec":1530168950,"file":"mysql-bin.000003","pos":817,"row":1,"server_id":223344,"event":2}' | \
kafkacat -P -b localhost -t my_connect_offsets -K \| -p 11
如何为连接器移除已提交的偏移量?
这是一个高度技术性的操作,涉及操作 Kafka Connect 内部。请仅将此作为最后的解决方案。 |
有时在进行实验时(或当连接器在开始时配置错误时),有必要重置连接器的偏移量以从干净状态开始。
根据您的 Kafka Connect 版本,有两种重置偏移量的方法
使用 Kafka Connect 3.6 或更高版本
Kafka Connect 在 KIP-875 中引入了一组用于管理连接器偏移量的专用 REST API 端点。这些端点通过几个简单的 REST 端点调用简化了移除连接器偏移量的过程。
- 步骤 1 - 停止连接器
-
为了干净地移除连接器的偏移量,连接器必须处于停止状态。要将连接器置于停止状态,请向
/connectors/<connector-nane>/stop
端点发出 HTTPPUT
请求。curl -X PUT -H "Content-Type: application/json" \ "https://<host>:<port>/connectors/<connector-name>/stop"
在继续之前,请等待连接器正常关闭。
- 步骤 2 - 重置偏移量
-
下一步涉及实际移除连接器的现有偏移量。这是一个破坏性操作,无法撤销。要移除偏移量,请向
/connectors/<connector-name>/offsets
端点发出 HTTPDELETE
请求。curl -X DELETE -H "Content-Type: application/json" \ "https://<host>:<port>/connectors/<connector-name>/offsets"
在执行此步骤之前,连接器必须处于停止状态,请参阅步骤 1。如果连接器未停止,则重置偏移量将产生不可预测的结果。 - 步骤 3 - 移除模式历史记录主题
-
当连接器不再有偏移量时,模式历史记录主题也不应存在。为确保连接器无错误地启动,还必须移除模式历史记录主题。这通常适用于所有 Debezium 连接器,但 PostgreSQL 除外。
- 步骤 4 - 重启连接器
-
在步骤 1 中,连接器被置于停止状态但未被移除。为了重启连接器使其开始捕获更改,请向
/connectors/<connector-nane>/resume
端点发出 HTTPPUT
请求。curl -X PUT -H "Content-Type: application/json" \ "https://<host>:<port>/connectors/<connector-name>/resume"
如果您选择在步骤 1 中取消部署连接器而不是使用停止端点,那么在此步骤中,您应该重新部署连接器而不是使用恢复端点。
使用 Kafka Connect 3.5 或更早版本
第一步是找出包含插件偏移量的主题的名称。这在 offset.storage.topic
选项中配置。
下一步是找出给定连接器的最后偏移量、存储它的键以及用于存储偏移量的分区。示例如下
$ kafkacat -b localhost -C -t my_connect_offsets -f 'Partition(%p) %k %s\n'
Partition(11) ["inventory-connector",{"server":"dbserver1"}] {"ts_sec":1530088501,"file":"mysql-bin.000003","pos":817,"row":1,"server_id":223344,"event":2}
Partition(11) ["inventory-connector",{"server":"dbserver1"}] {"ts_sec":1530168941,"file":"mysql-bin.000004","pos":3261,"row":1,"server_id":223344,"event":2}
inventory-connector
的键是 ["inventory-connector",{"server":"dbserver1"}]
,分区号是 11
,最后一个偏移量是 {"ts_sec":1530168941,"file":"mysql-bin.000004","pos":3261,"row":1,"server_id":223344,"event":2}
。
要删除连接器偏移量,应停止连接器并执行以下命令
$ echo '["inventory-connector",{"server":"dbserver1"}]|' | \
kafkacat -P -Z -b localhost -t my_connect_offsets -K \| -p 11
此命令为给定键写入一个 NULL
消息,该消息在逻辑上被翻译为移除给定连接器的存储偏移量。
为什么 KSQL 无法打印墓碑事件(tombstone events)?
在使用 KSQL 流式查询引擎时,默认情况下 Debezium 连接器在删除捕获表中的记录时创建的墓碑事件(tombstone events)不受支持。
PRINT 'dbserver.inventory.orders' FROM BEGINNING;
com.fasterxml.jackson.databind.node.NullNode cannot be cast to com.fasterxml.jackson.databind.node.ObjectNode
请考虑使用 after state extraction SMT 及其删除墓碑的选项来移除墓碑事件。
为什么 Debezium MySQL 连接器在进行模式更改时崩溃?
当 MySQL 连接器监控一个正在应用 Gh-ost 或 pt-online-schema-change 等模式更改工具的表时,MySQL 连接器可能会因值转换器抛出的异常而崩溃。这些工具在迁移过程中会创建辅助表,需要将这些辅助表包含在白名单中。
如何在每个租户一个数据库模式(database-per-tenant pattern)中降低内存消耗?
如果您的多租户基于单租户数据库,您的 Debezium 连接器将不得不多次存储列和表的元数据。您可以使用 JVM -XX:+UseStringDeduplication
标志来降低内存消耗。所有 JVM 参数都可以通过 KAFKA_OPTS
环境变量传递。您的 Dockerfile 中的示例如下
ENV KAFKA_OPTS="-XX:+UseStringDeduplication"
如何解决偏移量刷新超时(offset flush timeouts)问题?
当日志中出现类似 Failed to flush, timed out while waiting for producer to flush outstanding 218630 messages
的错误时,这意味着 Kafka Connect 无法足够快地将偏移量记录到偏移量主题中。
可能存在多种解决方案和问题的根本原因
-
Kafka 选项
acks
设置为 all,并且其中一个副本代理在处理写入时速度缓慢 -
连接器记录生成速度非常快,应调整 Kafka Connect 选项
offset.flush.interval.ms
和offset.flush.timeout.ms
。应缩短间隔并增加超时。 -
Debezium 生成非常大的批次记录,请减小参数
max.batch.size
和max.queue.size
。
为什么在某些情况下我看不到 DELETE 事件?
这可能是由于使用了 CASCADE DELETE
语句。在这种情况下,数据库生成的删除事件不包含在 binlog 中,因此无法被 Debezium 捕获。
为什么 Debezium MySQL 连接器无法从 RDS MySQL 只读副本(read replica)消费数据?
Debezium MySQL 需要启用服务器 binlog。对于 RDS MySQL,log_bin
属性由 AWS 直接管理,默认为 OFF
。当 Debezium MySQL 在快照期间执行 SHOW MASTER STATUS
命令时,结果集为空并抛出异常。
Caused by: java.lang.IllegalStateException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured
at io.debezium.connector.mysql.SnapshotReader.lambda$readBinlogPosition$16(SnapshotReader.java:761)
at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:444)
at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:385)
at io.debezium.connector.mysql.SnapshotReader.readBinlogPosition(SnapshotReader.java:745)
at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:370)
解决方案是间接启用 log_bin
属性,激活 RDS MySQL 中的某些产品功能:只读副本和/或自动备份。激活其中任何一个后,bin_log
属性值将自动变为 ON
,连接器将能够成功完成快照。
为什么 Debezium PostgreSQL 连接器导致 WAL 数据库磁盘空间异常消耗?
请参阅 PostgreSQL 连接器文档中的 WAL 磁盘空间消耗。
为什么在交换了表中的两个列的内容后,没有创建新的模式版本?
如果表中两个列的交换方式导致更改后的表结构与之前相同,那么模式注册表中就不会创建新的模式版本。此类操作的一个示例如下
-
原始表 -
id
,c1
,c2
,其中c1
和c2
类型相同 -
列交换 -
id
,c2
,c1
-
列重命名 -
id
,c1
,c2
模式注册表仅在模式逻辑上发生更改时才创建新版本的模式,但在这种情况下,从外部观察者来看,更改后的模式是相同的。
如何增大发送到 Kafka 的消息的最大大小?
对于大型事务,Kafka Connect 可能会发出大于预设最大值消息的消息。日志通常包含类似的异常
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1740572 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
要解决此问题,必须在 Kafka Connect 工作者配置文件 connect-distributed.properties
中设置配置选项 producer.max.request.size
。如果全局更改不理想,连接器可以使用配置选项 producer.override.max.request.size
设置为更大的值来覆盖默认设置。
在后一种情况下,还必须在 Kafka Connect 工作者配置文件 connect-distributed.properties
中配置 connector.client.config.override.policy=ALL
选项。对于 Debezium connect
Docker 镜像,可以使用环境变量 CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY
来配置此选项。
为什么 JSON 消息不包含模式(schema)?
如果您使用 JsonConverter
来转换和序列化 Debezium 发出的消息,默认情况下消息中不包含模式。通过将 schemas.enable
转换器配置参数设置为 true
来启用模式,该参数可以在工作者级别(例如 connect-distibuted.properties
)设置
key.converter.schemas.enable=true
value.converter.schemas.enable=true
或者在连接器级别设置,具体取决于转换器的配置位置。
如何启用不区分大小写的名称过滤?
几个配置选项,如 table.include.list
,它们定义了一组正则表达式,在应用于标识符时是不区分大小写的。
如果您的环境需要区分大小写的匹配(例如,两个标识符仅在字母大小写上有所不同),则可以使用标志 (?-i)
为给定表达式强制执行区分大小写的匹配。
为什么 MySQL TIMESTAMP
列的变更事件值在快照和流式传输之间存在差异?
如果 Debezium(或者更确切地说,MySQL JDBC 驱动程序)由于某种原因无法检索数据库的时区,则可能出现这种情况。在这种情况下,TIMESTAMP
值可能无法规范化为 UTC。在此情况下,必须通过 database.connectionTimeZone
透传连接器选项(对于 Debezium 1.7 之前的版本必须使用 database.serverTimezone
)显式指定数据库时区。
MongoDB 连接器因最大的 BSON 大小错误而失败
当 Debezium(或者更确切地说,MongoDB 更改流游标)遇到总大小超过 16MB 的 BSON 文档大小限制的更改事件文档时,可能出现这种情况。请注意,根据使用的 capture.mode
,即使存储文档的实际值远低于此限制,此问题仍可能出现。
为缓解此问题,请参阅 cursor.oversize.handling.mode 和 cursor.oversize.skip.threshold 属性的文档。
Debezium 引擎应用程序因“无法找到最小快照锁定模式。请检查您的配置”错误而失败
Debezium 使用 ServiceLoader 通过 SPI 加载实现。实现可以基于连接器类型,也可以是自定义实现。
某些接口有多个实现。例如,io.debezium.snapshot.spi.SnapshotLock
在核心中有一个默认实现,以及每个连接器的特定实现。为确保 Debezium 能够找到所需的实现,您必须显式配置构建工具以合并 META-INF/services
文件。
例如,如果您使用的是 Maven shade 插件,请添加 ServicesResourceTransformer
转换器,如下例所示
...
<configuration>
<transformers>
...
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
...
</transformers>
...
</configuration>
或者,如果您使用 Maven Assembly 插件,您可以使用 metaInf-services 容器描述符处理器。