随着 ChatGPT 近期取得的成功,我们可以看到 AI 领域和机器学习领域再次引起了广泛关注。这个领域之前的关注浪潮,至少在一定程度上,是由像 TensorFlow、PyTorch 这样的优秀机器学习框架,以及像 Spark 这样的通用数据处理框架变得可用,使得编写机器学习模型变得更加直接。自那时以来,这些框架已经成熟,编写模型变得更加容易,正如您稍后将在本文中看到的那样。然而,数据集的准备和从各种来源收集数据有时会耗费大量时间和精力。创建一个完整的管道,能够提取现有或新创建的数据,对其进行调整,并将其摄取到选定的机器学习库中,这可能具有挑战性。让我们来探讨一下 Debezium 是否能帮助完成这项任务,并了解如何利用 Debezium 的功能来使其更容易。
机器学习管道中的变更数据捕获和 Debezium
变更数据捕获 (CDC) 在机器学习中可能是一个引人注目的概念,尤其是在在线机器学习中。然而,使用预训练模型时,CDC 也可以成为管道的重要组成部分。我们可以利用 CDC 将新数据立即传递给预训练模型,模型可以对其进行评估,管道的其他部分可以根据模型的输出实时采取任何行动。
除了这些用例之外,Debezium 非常适合任何管道,包括从数据库加载数据。Debezium 可以捕获现有数据,也可以流式传输任何新创建的数据。Debezium 的另一个重要特性是支持单个消息转换。我们可以在整个管道的最开始调整数据。在应用转换或过滤器时,我们可以将数据传输限制在仅对我们感兴趣的数据上,从而节省带宽并提高管道内的速度。此外,Debezium 可以将记录传递给多个消息代理,并且正在添加更多代理(在最新的 2.2.0 版本中提供了几个新的代理)。这些持续的改进增加了将 Debezium 与其他工具链或数据管道集成的机会。可能性是无限的,Debezium 的通用连接器框架可能允许 CDC 超出数据库的范围。
这是理论。现在让我们探讨一下它在现实中是如何工作的。本文将介绍如何将数据流式传输到 TensorFlow。根据社区的兴趣,这可能会导致一系列博客文章,我们在其中探索与其他机器学习库和框架的可能集成。
Debezium 和 TensorFlow 集成
TensorFlow 是最受欢迎的机器学习框架之一。它为在各种应用程序中构建、训练和部署机器学习模型提供了一个全面的平台。
为保持简单,我们将实现一个识别手写数字的模型,这在神经网络领域大致相当于“Hello World”。本次演示的最终目标是使用 Debezium 从 Postgres 中加载 MNIST 数据样本(这些数据会持续存储),将其传递给我们在 TensorFlow 中实现的模型进行训练,并使用这个训练好的模型对图像进行实时分类。
下图描绘了完整的管道。
本文后面提到的所有代码都可以在 Debezium 示例存储库 中作为 Debezium 示例找到。
数据样本
我们将使用 MNIST 数据样本。训练样本包含 60,000 张手写数字 0 到 9 的图像以及相同数量的对应数字标签。测试样本包含 1,000 张图像。样本以 gzip 二进制文件的形式提供。由于我们假设一个用例是感兴趣的数据在数据库中,因此我们需要先将数据加载到数据库中。
我们需要生成两个 SQL 文件,一个用于训练数据集,名为 mnist_train.sql,另一个用于测试数据样本,名为 mnist_test.sql。每个文件将包含用于创建具有两列的表的 SQL 命令:pixels 列,类型为 BYTEA,包含原始图像字节;labels 列,类型为 SMALLINT,包含给定表行中图像对应的数字。文件的其余部分将包含用于填充表的命令。图像字节可以解码为十六进制字符串。
由于我们将在稍后的文章中展示如何利用 Debezium 进行数据流式传输,因此我们最初会将训练数据集加载到数据库中。训练数据的 SQL 文件将直接由 Postgres 容器使用 - 当它启动时,它会将这些数据加载到训练表中。我们稍后将使用测试数据 SQL 文件。然而,数据的准备对于训练和测试样本都是相同的,我们可以一次性准备好两者。
要准备这些 SQL 文件,您可以使用 Debezium tensorflow-mnist 示例 中的 mnist2sql.py 脚本。
$ ./mnist2sql.py --download 该脚本假定 MNIST 数据集在 postgres 目录下可用。当使用 --download 参数时,该脚本首先会将 MNIST 数据样本下载到 postgres 目录中。postgres 目录将包含生成的 SQL 文件。
将流式数据加载到 TensorFlow
最常见的 Debezium 用法是将记录流式传输到 Kafka。TensorFlow 提供了 TensorFlow I/O 模块,用于从各种源加载数据。除了其他源之外,它还允许从 Kafka 加载数据。有几种方法可以做到这一点。IODataset.from_kafka() 方法仅加载指定 Kafka 主题中存在的现有数据。两个实验性类支持流式传输数据,KafkaBatchIODataset 和 KafkaGroupIODataset。两者非常相似,允许它们处理流式数据,即它们不仅从 Kafka 主题读取现有数据,还会等待新数据并最终将新记录传递到 TensorFlow。当在指定的时间范围内没有新事件时,流式传输结束。
这听起来不错。然而,最大的问题是 Dataset 中记录的表示。这些 Kafka 加载器完全忽略了 Kafka 提供的记录的模式,这意味着键和值是原始数据字节。此外,摄取管道通过将这些字节转换为字符串(即对对象调用 toString())来使过程复杂化。因此,如果您通过 Kafka 传递,例如,原始图像字节,使用 Kafka BYTES_SCHEMA,其结果将类似于此:
<tf.Tensor: shape=(64,), dtype=string, numpy=
array([b'[B@418b353d', b'[B@6aa28a4c', b'[B@b626485', b'[B@6d7491cd',
b'[B@13fa86c5', b'[B@7c3bc352', b'[B@64e5d61c', b'[B@2dd6d9b4',
b'[B@6addae65', b'[B@48ded13f', b'[B@2c1bb0e', b'[B@19c1d99b',
b'[B@1ee8f240', b'[B@20019f8b', b'[B@2f17494e', b'[B@380d4036',
b'[B@61aecf85', b'[B@4d7fe9fc', b'[B@58b79424', b'[B@ae963f4',
b'[B@1dac57cb', b'[B@2fae7d8b', b'[B@4b5ccaee', b'[B@aebf6b2',
b'[B@7506ea2b', b'[B@29989325', b'[B@43e2742', b'[B@51350f11',
b'[B@13a0f0ae', b'[B@7e4c4844', b'[B@b3d64f8', b'[B@7209bf09',
b'[B@66380466', b'[B@7aaa7e8d', b'[B@1ad0cf84', b'[B@259eca20',
b'[B@3a3f1c1', b'[B@36e4ff1f', b'[B@6578fc29', b'[B@79c924be',
b'[B@765b7f70', b'[B@67567aa3', b'[B@456d4bd4', b'[B@75317b13',
b'[B@58bc3a3a', b'[B@c6bc0ec', b'[B@2377095e', b'[B@5de017c0',
b'[B@64b48bac', b'[B@360a5b76', b'[B@2d2c9910', b'[B@70afd562',
b'[B@3006c930', b'[B@54b3e5ad', b'[B@1d1e0232', b'[B@1394d036',
b'[B@155dd43d', b'[B@5e88d5b6', b'[B@33ea53c7', b'[B@64a30ec',
b'[B@7dcdf024', b'[B@6570bf4e', b'[B@4e5bc4c', b'[B@537f216c'],
dtype=object)>, 您得到的是 Java 字节数组的字符串表示,而不是可以直接在 TensorFlow 中进一步处理的原始图像字节批次,这非常不实用。
最简单的解决方案是在将原始图像字节发送到 Kafka 之前将其转换为数字,以减轻这个问题。由于 TensorFlow 提供了解析 CSV 输入的方法,我们可以将每个图像转换为一行数字的 CSV。由于 TensorFlow 主要处理数字,无论如何我们都需要将图像转换为数字。我们可以将图像上的数字作为消息键传递。现在,Debezium 支持的单个消息转换就派上用场了。转换可以这样进行:
@Override
public R apply(R r) {
final Struct value = (Struct) r.value();
String key = value.getInt16(labelFieldName).toString();
StringBuilder builder = new StringBuilder();
for (byte pixel : value.getBytes(pixlesFieldName)) {
builder.append(pixel & 0xFF).append(",");
}
if (builder.length() > 0) {
builder.deleteCharAt(builder.length() - 1);
}
String newValue = builder.toString();
return r.newRecord(r.topic(), r.kafkaPartition(), Schema.STRING_SCHEMA, key, Schema.STRING_SCHEMA, newValue, r.timestamp());
} 在 TensorFlow 端,我们必须将从 Kafka 消息中获得的字节转换为数字。以下演示了一个方便处理此问题的映射函数:
def decode_kafka_record(record):
img_int = tf.io.decode_csv(record.message, [[0.0] for i in range(NUM_COLUMNS)])
img_norm = tf.cast(img_int, tf.float32) / 255.
label_int = tf.strings.to_number(record.key, out_type=tf.dtypes.int32)
return (img_norm, label_int) 在这里,我们解析 CSV 行(可能以原始字节形式提供),并立即将数字缩放到 <0, 1> 区间,这对于稍后训练我们的模型非常方便。加载数据和创建数据批次非常直接:
train_ds = tfio.IODataset.from_kafka(KAFKA_TRAIN_TOPIC, partition=0, offset=0, servers=KAFKA_SERVERS)
train_ds = train_ds.map(decode_kafka_record)
train_ds = train_ds.batch(BATCH_SIZE) 在这里,我们使用 IODataset.from_kafka() 来加载 Kafka 主题中的现有数据,使用我们的映射函数将字节转换为数字,然后缩放数字。最后一步是从数据集中创建批次以进行更有效的处理。tfio.IODataset.from_kafka() 的参数是不言自明的,可能不需要进一步评论。
结果是,我们得到了一个由二维张量组成的数据集。第一个维度是一个表示图像的浮点向量,而第二个维度是一个描述图片上数字的单个数字(标量)。一旦我们准备好了训练数据集,我们就可以定义我们的神经网络模型。
定义模型
为保持简单,因为本文的主要目标不是展示最佳的手写数字分类器,而是展示如何创建数据管道,让我们使用一个非常简单的模型:
model = tf.keras.models.Sequential([
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
]) 该模型仅包含两个层。尽管这个模型非常简单,但它在识别手写数字方面仍然做得相当不错。可能比模型本身更有趣的是在 TensorFlow(或者实际上是 Keras,但它现在是 TensorFlow 的一部分)中编写模型有多么容易。
同样容易的是定义模型优化器和损失函数:
model.compile(
optimizer=tf.keras.optimizers.Adam(0.001),
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=[tf.keras.metrics.SparseCategoricalAccuracy()],
) 解释这些函数超出了本文的范围,您可以查阅几乎任何在线机器学习课程或教科书以获得详细解释。
一旦模型准备好,我们就可以在上一节准备好的训练数据集上进行训练:
model.fit(train_ds,epochs=MAX_EPOCHS) 此步骤可能需要很长时间才能完成。然而,一旦完成,我们的模型就可以识别手写数字了!
将数据流式传输到模型
让我们看看我们的模型在数字识别方面的表现如何。但由于我们的主要目标是探索将数据摄取到 TensorFlow 的方法,我们将从一个空的(或者更准确地说,甚至不存在的)Kafka 主题开始模型评估,看看我们是否能够实时评估数据,当它们首先出现在数据库中,然后出现在相应的 Kafka 主题中时。为此,我们可以使用上面提到的一种流式传输类:
test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
topics=[KAFKA_TEST_TOPIC],
group_id=KAFKA_CONSUMER_GROUP,
servers=KAFKA_SERVERS,
stream_timeout=9000,
configuration=[
"session.timeout.ms=10000",
"max.poll.interval.ms=10000",
"auto.offset.reset=earliest"
],
) 同样,参数大多是不言自明的。有两点可能需要进一步解释:stream_timeout 和 configuration 参数。stream_timeout 确定了(以毫秒为单位)不活动的间隔,在此之后流式传输将终止。configuration 是 librdkafka 配置。它是 Kafka 客户端的配置;您至少应配置会话超时(session.timeout.ms)及其轮询间隔(max.poll.interval.ms)。这些参数的值应高于 stream_timeout 的值。
此加载器提供的数据集略有不同 - 它不提供包含消息及其键的单个记录,而是将键和消息分开提供。因此,我们必须定义一个略有修改的映射函数,该函数带有两个参数:
def decode_kafka_stream_record(message, key):
img_int = tf.io.decode_csv(message, [[0.0] for i in range(NUM_COLUMNS)])
img_norm = tf.cast(img_int, tf.float32) / 255.
label_int = tf.strings.to_number(key, out_type=tf.dtypes.int32)
return (img_norm, label_int) 使用此函数,我们可以调整数据集并像以前一样创建批次:
test_ds = test_ds.map(decode_kafka_stream_record)
test_ds = test_ds.batch(BATCH_SIZE) 并评估模型:
model.evaluate(test_ds) 您可以在 Jupyter notebook 中执行模型评估单元格。执行将等待,因为 Kafka 中不存在这样的主题,并且数据库中没有包含测试数据的表。流式传输超时为 9 秒,因此必须在启动模型评估后的此时间范围内提供数据。在此演示开始时,我们在 postgres 目录中创建了一个名为 mnist_test.sql 的 SQL 文件,该文件可以生成我们所需的测试数据。
$ export PGPASSWORD=postgres
$ psql -h localhost -U postgres -f postgres/mnist_test.sql 片刻之后,您应该会在 Jupyter notebook 的输出中看到一些数据已进入模型,然后不久之后,模型将完成最终评估。
为了使结果更接近人类,让我们手动定义一个图像并将其提供给模型。我们也可以在 Jupyter notebook 中轻松显示图像。用于绘制图像并将模型预测作为绘图标题的函数可能如下所示:
def plot_and_predict(pixels):
test = tf.constant([pixels])
tf.shape(test)
test_norm = tf.cast(test, tf.float32) / 255.
prediction = model.predict(test_norm)
number = tf.nn.softmax(prediction).numpy().argmax()
pixels_array = np.asarray(pixels)
raw_img = np.split(pixels_array, 28)
plt.imshow(raw_img)
plt.title(number)
plt.axis("off") 此函数中可能唯一晦涩的行是包含 softmax() 函数的那一行。该函数将结果向量转换为概率向量。该向量的元素表示给定位置的数字是图像上数字的概率。因此,概率最高的那个位置是模型的预测,其中 argmax() 源自于此。
例如,我们可以尝试一个包含手写数字 3 的图像:
pixels = [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,108,43,6,6,6,6,5,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,10,84,248,254,254,254,254,254,241,45,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,90,254,254,254,223,173,173,173,253,156,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,79,157,228,245,251,188,63,17,0,0,54,252,132,0,0,0,0,0,0,0,0,0,0,0,0,0,0,32,254,254,254,244,131,0,0,0,0,13,220,254,122,0,0,0,0,0,0,0,0,0,0,0,0,0,0,83,254,225,160,47,0,0,0,0,59,211,254,206,50,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,21,14,0,0,0,2,17,146,245,250,194,12,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,81,140,140,171,254,254,254,203,55,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,211,254,254,254,254,179,211,254,254,202,171,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,167,233,193,69,16,3,9,16,107,231,248,195,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,73,229,182,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,26,99,252,254,146,0,0,0,0,0,0,0,0,79,142,0,0,0,0,0,0,0,0,0,26,28,116,147,247,254,239,150,22,0,0,0,0,0,0,0,0,175,230,174,155,66,66,132,174,174,174,174,250,255,254,192,189,99,36,0,0,0,0,0,0,0,0,0,0,106,226,254,254,254,254,254,254,254,254,217,151,80,43,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,4,7,114,114,114,46,5,5,5,3,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
plot_and_predict(pixels) 结果如下:
您可以通过从 Kafka 流中读取来执行相同的操作,我们可以为此目的重复使用现有的主题。由于我们已经读取了测试流中的所有记录,因此如果我们想使用流式 KafkaGroupIODataset 重新读取它,就需要更改 Kafka 消费者组。
manual_ds = tfio.experimental.streaming.KafkaGroupIODataset(
topics=[KAFKA_TEST_TOPIC],
group_id="mnistcg2",
servers=KAFKA_SERVERS,
stream_timeout=9000,
configuration=[
"session.timeout.ms=10000",
"max.poll.interval.ms=10000",
"auto.offset.reset=earliest"
],
)
manual_ds = manual_ds.map(decode_kafka_stream_record) 如果您想创建一个新的流并验证我们的模型在新数据到来时是否能提供预测,您可以轻松做到:
$ head -5 mnist_test.sql | sed s/test/manual/ > mnist_manual.sql
$ psql -h localhost -U postgres -f postgres/mnist_manual.sql 在这种情况下,您无需更改 Kafka 消费者组,但必须更改 Kafka 主题。
manual_ds = tfio.experimental.streaming.KafkaGroupIODataset(
topics=["tf.public.mnist_manual"],
group_id=KAFKA_CONSUMER_GROUP,
servers=KAFKA_SERVERS,
stream_timeout=9000,
configuration=[
"session.timeout.ms=10000",
"max.poll.interval.ms=10000",
"auto.offset.reset=earliest"
],
)
manual_ds = manual_ds.map(decode_kafka_stream_record) 无论哪种情况,结果都应如下所示:
结论
在此演示中,我们展示了如何从数据库加载现有数据,即时对其进行转换,通过 Kafka 将其摄取到 TensorFlow 模型中,并用于模型训练。之后,我们使用 CDC 和数据流式传输将新创建的数据摄取到这个预训练模型中,并获得了有意义的结果。Debezium 不仅可以为本文所述的用例提供有价值的服务,还可以在将数据摄取到在线机器学习管道中发挥关键作用。
尽管整个管道相对容易实现,但仍有一些领域可以改进,以提升用户体验和/或使整个管道更加顺畅。由于我们(Debezium 开发者)的背景主要不在机器学习和数据科学领域,因此我们非常希望得到社区的反馈,了解 Debezium 如何帮助机器学习管道(或者是否已用于此类情况,如果有的话),以及哪些方面还有改进空间。我们也欢迎有关 Debezium 或更广泛的变更数据捕获如何在此领域提供帮助的新想法。这些想法进一步揭示了 Debezium 将数据摄取到机器学习管道中的潜力,并为整个过程带来更好的用户体验。如果您有任何相关反馈,请随时通过 Zulip 聊天、邮件列表 与我们联系,或者您可以直接将您的想法转化为 Jira 功能请求。
关于 Debezium
Debezium 是一个开源的分布式平台,可以将现有数据库转变为事件流,使应用程序能够几乎即时地看到并响应数据库中已提交的每个行级更改。Debezium 构建在 Kafka 之上,并提供了 Kafka Connect 兼容的连接器,用于监控特定的数据库管理系统。Debezium 将数据更改的历史记录在 Kafka 日志中,这样您的应用程序可以随时停止和重新启动,并可以轻松地消费在未运行时错过的所有事件,确保所有事件都被正确且完整地处理。Debezium 在 Apache 许可证 2.0 下是 开源 的。