利用Vertex AI构建TensorFlow管道

利用Vertex AI构建TensorFlow管道

如何确保机器学习模型获得茁壮成长所需的高质量数据?在当今的机器学习领域,妥善处理数据与构建强大的模型同等重要。将高质量、结构良好的数据输入到模型中可以显著影响性能和训练速度。TensorFlow Dataset API 通过提供一系列工具来构建、管理和优化数据管道,从而简化了这一过程。在本指南中,我们将从使用 Vertex AI Workbench 配置开发环境,到从各种来源加载数据,再到将这些管道纳入模型训练流程,一步步进行讲解。

  • 从内存数组以及 CSV 和 TFRecord 文件等外部数据源构建数据集。
  • 利用映射、洗牌、批处理、缓存和预取等操作来简化数据处理。
  • 将您的数据集无缝集成到 TensorFlow 的模型训练例程中,以实现高效的模型开发。
  • 学习启动 Vertex AI Workbench 实例、设置 Jupyter Notebook 并开始工作。
  • 在数据管道中直接应用数据增强技术,从而增强机器学习模型。

什么是TensorFlow?

TensorFlow 是谷歌为机器学习和深度学习研究开发的开源平台。它提供了一个广泛的工具和库生态系统,使研究人员能够突破机器学习的极限,使开发人员能够轻松构建和部署智能应用程序。TensorFlow 既支持高级 API(如 Keras),也支持低级操作,使初学者也能使用,而高级用户仍能使用其强大功能。

什么是Vertex AI工作台?

Vertex AI Workbench 是谷歌云中的一个托管开发环境,旨在帮助您构建和训练机器学习模型。它提供完全托管的 Jupyter Notebook 体验以及预装的机器学习库,包括 TensorFlow 和 PyTorch。借助 Vertex AI Workbench,您可以将本地开发与云计算资源无缝整合,从而更轻松地开展大型项目,而不必担心基础设施的设置。

在本指南中,您不仅将学习如何使用 TensorFlow 的数据集 API,还将了解如何使用 Vertex AI Workbench 设置环境。我们将介绍从启动新实例、创建 Jupyter Notebook 到加载数据集的所有内容。

了解TensorFlow数据集API

TensorFlow 数据集 API 是一套旨在简化数据输入管道构建过程的工具。在任何机器学习任务中,模型的性能不仅取决于算法本身,还取决于输入数据的质量和流量。数据集应用程序接口(Dataset API)可让您随时随地执行加载数据、预处理数据和转换数据等任务。

该应用程序接口之所以如此强大,是因为它能以简单易懂的顺序将多个操作串联起来。您可以从各种来源加载数据,应用必要的转换(如缩放或归一化),甚至可以对数据进行洗牌,以防止模型过度拟合。这种方法不仅能让代码更简洁、更易于维护,还能利用缓存和预取等技术优化性能。

使用Vertex AI工作台设置环境

在开始使用 TensorFlow 数据集 API 之前,您需要一个强大的环境。Vertex AI Workbench 是一个很好的选择,因为它提供了一个完全托管、基于云的开发环境,并预装了您需要的所有工具。

启动Vertex AI工作台实例

  • 首先登录谷歌云账户。从导航菜单中搜索并选择 Vertex AI。

Vertex AI工作台

  • 点击“Enable All Recommended APIs”按钮。这将确保您的项目可以访问所有必要的 API 服务。
  • 在导航菜单中点击工作台。确保您处于“Instances”视图中。

Vertex AI工作台实例

  • 单击“Create New”启动新的工作台实例。系统将提示您配置实例:
    • 名称:给实例起一个有意义的名字,如 lab-workbench。
    • 地区和区域:选择实例所在的适当地区和区域。
    • 高级选项:如有需要,可通过选择机器类型或磁盘大小等选项自定义实例设置。
  • 配置完成后,单击“Create”。实例设置可能需要几分钟时间。一旦准备就绪,你会看到其名称旁边有一个绿色的复选标记。

创建工作台实例 启动工作台实例

  • 单击实例名称旁边的 Open JupyterLab。这将在浏览器的新标签页中打开 Jupyter Lab 界面。

打开 Jupyter Lab 界面

创建Jupyter Notebook

打开 JupyterLab 界面后,点击 Python 3 图标就可以创建一个新的 Python 笔记本。将笔记本重命名为描述性名称是个好主意。为此,右键单击文件名(最初可能是 Untitled.ipynb),然后选择重命名笔记本。选择一个能反映项目的名称,如“new_project”。同时将内核从 python 3 改为 TensorFlow 2-11 (Local)。

创建Jupyter Notebook

使用tf.data处理数据

首先将 taxi-train.csvtaxi-valid.csv 数据集上传至笔记本。

使用tf.data处理数据

导入所需库函数

首先,我们需要导入 TensorFlow 和 NumPy,然后将 TensorFlow 的日志级别设置为最小。这样可以减少执行过程中的日志冗余。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
import tensorflow as tf
import numpy as np
print("TensorFlow version:", tf.version.VERSION)
# Set minimal TF logging level.
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import tensorflow as tf import numpy as np print("TensorFlow version:", tf.version.VERSION) # Set minimal TF logging level. import os os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import tensorflow as tf
import numpy as np
print("TensorFlow version:", tf.version.VERSION)
# Set minimal TF logging level.
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

从内存中创建数据集

环境设置完成后,就可以开始处理数据了。最简单的方法是从内存中创建数据集。这意味着将存储在计算机内存中的数据(如列表或 NumPy 数组)转换为 TensorFlow 可以处理的格式。

想象一下,你有一小组数字,想用来做一个基本实验。通过 TensorFlow 数据集 API,您可以快速将这些数字转换为可进一步处理的数据集。这个过程非常简单,而且可以扩展到更复杂的数据结构。

例如,您可以从一个包含多个数字的简单 NumPy 数组开始。使用数据集 API,您可以从这个数组创建一个数据集。然后可以对数据集进行迭代,并应用各种转换,例如为每个元素映射一个函数。

创建合成数据集

我们首先创建一个合成数据集。在本例中,我们使用线性方程 y=2x+10 生成特征向量 X 和相应的标签向量 Y。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
N_POINTS = 10
X = tf.constant(range(N_POINTS), dtype=tf.float32)
Y = 2 * X + 10
N_POINTS = 10 X = tf.constant(range(N_POINTS), dtype=tf.float32) Y = 2 * X + 10
N_POINTS = 10
X = tf.constant(range(N_POINTS), dtype=tf.float32)
Y = 2 * X + 10

接下来,我们定义了一个函数,该函数接受我们的特征和标签数组,以及训练次数(epochs)和所需的批量大小。该函数通过切分张量、重复指定次数的训练和批处理(丢弃任何剩余示例以保持批处理大小一致)来构建 TensorFlow 数据集。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
def make_synthetic_dataset(X, Y, epochs, batch_size):
# Create the dataset from tensor slices
ds = tf.data.Dataset.from_tensor_slices((X, Y))
# Repeat the dataset and batch it (drop the remainder for consistency)
ds = ds.repeat(epochs).batch(batch_size, drop_remainder=True)
return ds
def make_synthetic_dataset(X, Y, epochs, batch_size): # Create the dataset from tensor slices ds = tf.data.Dataset.from_tensor_slices((X, Y)) # Repeat the dataset and batch it (drop the remainder for consistency) ds = ds.repeat(epochs).batch(batch_size, drop_remainder=True) return ds
def make_synthetic_dataset(X, Y, epochs, batch_size):
# Create the dataset from tensor slices
ds = tf.data.Dataset.from_tensor_slices((X, Y))
# Repeat the dataset and batch it (drop the remainder for consistency)
ds = ds.repeat(epochs).batch(batch_size, drop_remainder=True)
return ds

让我们以 3 个数据点为一批,在数据集上迭代两次,测试我们的函数:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
BATCH_SIZE = 3
EPOCHS = 2
dataset = make_synthetic_dataset(X, Y, epochs=EPOCHS, batch_size=BATCH_SIZE)
print("Synthetic dataset batches:")
for i, (x_batch, y_batch) in enumerate(dataset):
print(f"Batch {i}: x: {x_batch.numpy()} y: {y_batch.numpy()}")
assert len(x_batch) == BATCH_SIZE
assert len(y_batch) == BATCH_SIZE
BATCH_SIZE = 3 EPOCHS = 2 dataset = make_synthetic_dataset(X, Y, epochs=EPOCHS, batch_size=BATCH_SIZE) print("Synthetic dataset batches:") for i, (x_batch, y_batch) in enumerate(dataset): print(f"Batch {i}: x: {x_batch.numpy()} y: {y_batch.numpy()}") assert len(x_batch) == BATCH_SIZE assert len(y_batch) == BATCH_SIZE
BATCH_SIZE = 3
EPOCHS = 2
dataset = make_synthetic_dataset(X, Y, epochs=EPOCHS, batch_size=BATCH_SIZE)
print("Synthetic dataset batches:")
for i, (x_batch, y_batch) in enumerate(dataset):
print(f"Batch {i}: x: {x_batch.numpy()}  y: {y_batch.numpy()}")
assert len(x_batch) == BATCH_SIZE
assert len(y_batch) == BATCH_SIZE

合成数据库

损失函数和梯度计算

接下来,我们将定义均方误差(MSE)损失函数和计算梯度的辅助函数。这些函数仍与我们之前的实现类似。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
def loss_mse(X, Y, w0, w1):
Y_pred = w0 * X + w1
error = (Y_pred - Y) ** 2
return tf.reduce_mean(error)
def compute_gradients(X, Y, w0, w1):
with tf.GradientTape() as tape:
current_loss = loss_mse(X, Y, w0, w1)
return tape.gradient(current_loss, [w0, w1]), current_loss
def loss_mse(X, Y, w0, w1): Y_pred = w0 * X + w1 error = (Y_pred - Y) ** 2 return tf.reduce_mean(error) def compute_gradients(X, Y, w0, w1): with tf.GradientTape() as tape: current_loss = loss_mse(X, Y, w0, w1) return tape.gradient(current_loss, [w0, w1]), current_loss
def loss_mse(X, Y, w0, w1):
Y_pred = w0 * X + w1
error = (Y_pred - Y) ** 2
return tf.reduce_mean(error)
def compute_gradients(X, Y, w0, w1):
with tf.GradientTape() as tape:
current_loss = loss_mse(X, Y, w0, w1)
return tape.gradient(current_loss, [w0, w1]), current_loss

训练循环

现在,我们更新训练循环,使其遍历函数创建的 tf.data.Dataset 数据集。在本例中,我们使用 2 的批量大小对模型进行了 250 次训练。

首先,将模型参数初始化为 TensorFlow 变量:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
# Initialize model parameters
w0 = tf.Variable(0.0)
w1 = tf.Variable(0.0)
EPOCHS_TRAIN = 250
BATCH_SIZE_TRAIN = 2
LEARNING_RATE = 0.02
# Create the training dataset (synthetic)
train_dataset = make_synthetic_dataset(X, Y, epochs=EPOCHS_TRAIN, batch_size=BATCH_SIZE_TRAIN)
# Initialize model parameters w0 = tf.Variable(0.0) w1 = tf.Variable(0.0) EPOCHS_TRAIN = 250 BATCH_SIZE_TRAIN = 2 LEARNING_RATE = 0.02 # Create the training dataset (synthetic) train_dataset = make_synthetic_dataset(X, Y, epochs=EPOCHS_TRAIN, batch_size=BATCH_SIZE_TRAIN)
# Initialize model parameters
w0 = tf.Variable(0.0)
w1 = tf.Variable(0.0)
EPOCHS_TRAIN = 250
BATCH_SIZE_TRAIN = 2
LEARNING_RATE = 0.02
# Create the training dataset (synthetic)
train_dataset = make_synthetic_dataset(X, Y, epochs=EPOCHS_TRAIN, batch_size=BATCH_SIZE_TRAIN)

然后,我们使用随机梯度下降法运行训练循环。该循环每更新一批模型参数,我们每 100 步打印一次训练状态。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
# Training loop
print("\nStarting training loop for synthetic linear regression:")
MSG = "Step {step} - loss: {loss:.6f}, w0: {w0:.6f}, w1: {w1:.6f}"
for step, (X_batch, Y_batch) in enumerate(train_dataset):
grads, loss_val = compute_gradients(X_batch, Y_batch, w0, w1)
# Update the parameters using gradient descent
w0.assign_sub(LEARNING_RATE * grads[0])
w1.assign_sub(LEARNING_RATE * grads[1])
if step % 100 == 0:
print(MSG.format(step=step, loss=loss_val.numpy(), w0=w0.numpy(), w1=w1.numpy()))
# Final assertions (tolerance based)
assert loss_val < 1e-6
assert abs(w0.numpy() - 2) < 1e-3
assert abs(w1.numpy() - 10) < 1e-3
# Training loop print("\nStarting training loop for synthetic linear regression:") MSG = "Step {step} - loss: {loss:.6f}, w0: {w0:.6f}, w1: {w1:.6f}" for step, (X_batch, Y_batch) in enumerate(train_dataset): grads, loss_val = compute_gradients(X_batch, Y_batch, w0, w1) # Update the parameters using gradient descent w0.assign_sub(LEARNING_RATE * grads[0]) w1.assign_sub(LEARNING_RATE * grads[1]) if step % 100 == 0: print(MSG.format(step=step, loss=loss_val.numpy(), w0=w0.numpy(), w1=w1.numpy())) # Final assertions (tolerance based) assert loss_val < 1e-6 assert abs(w0.numpy() - 2) < 1e-3 assert abs(w1.numpy() - 10) < 1e-3
# Training loop
print("\nStarting training loop for synthetic linear regression:")
MSG = "Step {step} - loss: {loss:.6f}, w0: {w0:.6f}, w1: {w1:.6f}"
for step, (X_batch, Y_batch) in enumerate(train_dataset):
grads, loss_val = compute_gradients(X_batch, Y_batch, w0, w1)
# Update the parameters using gradient descent
w0.assign_sub(LEARNING_RATE * grads[0])
w1.assign_sub(LEARNING_RATE * grads[1])
if step % 100 == 0:
print(MSG.format(step=step, loss=loss_val.numpy(), w0=w0.numpy(), w1=w1.numpy()))
# Final assertions (tolerance based)
assert loss_val < 1e-6
assert abs(w0.numpy() - 2) < 1e-3
assert abs(w1.numpy() - 10) < 1e-3

训练循环

从磁盘加载数据

在实际应用中,数据通常存储在磁盘上而不是内存中。使用这些方法从磁盘加载数据可确保高效处理大型数据集,并为模型训练做好准备。两种常见的数据存储格式是 CSV 和 TFRecord。

加载CSV文件

CSV(逗号分隔值)文件广泛用于存储表格数据。TensorFlow 数据集 API 提供了读取 CSV 文件的便捷方法。这一过程包括解析文件的每一行,将文本转换为数字数据,批处理结果,以及应用任何其他转换。

下面,我们将定义 CSV 文件的列名和默认值:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
CSV_COLUMNS = [
'fare_amount',
'pickup_datetime',
'pickup_longitude',
'pickup_latitude',
'dropoff_longitude',
'dropoff_latitude',
'passenger_count',
'key'
]
LABEL_COLUMN = 'fare_amount'
DEFAULTS = [[0.0], ['na'], [0.0], [0.0], [0.0], [0.0], [0.0], ['na']]
CSV_COLUMNS = [ 'fare_amount', 'pickup_datetime', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude', 'passenger_count', 'key' ] LABEL_COLUMN = 'fare_amount' DEFAULTS = [[0.0], ['na'], [0.0], [0.0], [0.0], [0.0], [0.0], ['na']]
CSV_COLUMNS = [
'fare_amount',
'pickup_datetime',
'pickup_longitude',
'pickup_latitude',
'dropoff_longitude',
'dropoff_latitude',
'passenger_count',
'key'
]
LABEL_COLUMN = 'fare_amount'
DEFAULTS = [[0.0], ['na'], [0.0], [0.0], [0.0], [0.0], [0.0], ['na']]

接下来,我们将 CSV 数据集的创建封装到一个函数中,该函数根据文件模式和指定的批量大小读取文件:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
def make_csv_dataset(pattern, batch_size):
# Create dataset from CSV files with specified column names and defaults.
ds = tf.data.experimental.make_csv_dataset(
file_pattern=pattern,
batch_size=batch_size,
column_names=CSV_COLUMNS,
column_defaults=DEFAULTS,
header=True
)
return ds
# For demonstration, assume the CSV files are located in '../toy_data/'.
temp_ds = make_csv_dataset('taxi-train.csv', batch_size=2)
print("\nSample CSV dataset (prefetched):")
print(temp_ds)
def make_csv_dataset(pattern, batch_size): # Create dataset from CSV files with specified column names and defaults. ds = tf.data.experimental.make_csv_dataset( file_pattern=pattern, batch_size=batch_size, column_names=CSV_COLUMNS, column_defaults=DEFAULTS, header=True ) return ds # For demonstration, assume the CSV files are located in '../toy_data/'. temp_ds = make_csv_dataset('taxi-train.csv', batch_size=2) print("\nSample CSV dataset (prefetched):") print(temp_ds)
def make_csv_dataset(pattern, batch_size):
# Create dataset from CSV files with specified column names and defaults.
ds = tf.data.experimental.make_csv_dataset(
file_pattern=pattern,
batch_size=batch_size,
column_names=CSV_COLUMNS,
column_defaults=DEFAULTS,
header=True
)
return ds 
# For demonstration, assume the CSV files are located in '../toy_data/'.
temp_ds = make_csv_dataset('taxi-train.csv', batch_size=2)
print("\nSample CSV dataset (prefetched):")
print(temp_ds)

csv 数据集

为了提高可读性,让我们遍历该数据集的前两个元素,并将它们转换成标准的 Python 字典:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
for data in temp_ds.take(2):
print({k: v.numpy() for k, v in data.items()})
print("\n")
for data in temp_ds.take(2): print({k: v.numpy() for k, v in data.items()}) print("\n")
for data in temp_ds.take(2):
print({k: v.numpy() for k, v in data.items()})
print("\n")

转换成标准的 Python 字典

加载TFRecord文件

TFRecord 是一种针对 TensorFlow 优化的二进制格式。与 CSV 文件相比,它的读取速度更快,对大型数据集而言效率很高。虽然此处提供的代码主要针对 CSV,但在处理 TFRecord 文件时也可以应用类似的技术。

例如:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
def parse_tfrecord(example_proto):
# Define the features expected in the TFRecord
feature_description = {
'feature1': tf.io.FixedLenFeature([], tf.float32),
'feature2': tf.io.FixedLenFeature([], tf.float32)
}
return tf.io.parse_single_example(example_proto, feature_description)
# Create a dataset from a TFRecord file
tfrecord_dataset = tf.data.TFRecordDataset("data/sample_data.tfrecord")
tfrecord_dataset = tfrecord_dataset.map(parse_tfrecord)
tfrecord_dataset = tfrecord_dataset.batch(4)
# Iterate through the TFRecord dataset
for batch in tfrecord_dataset:
print(batch)
def parse_tfrecord(example_proto): # Define the features expected in the TFRecord feature_description = { 'feature1': tf.io.FixedLenFeature([], tf.float32), 'feature2': tf.io.FixedLenFeature([], tf.float32) } return tf.io.parse_single_example(example_proto, feature_description) # Create a dataset from a TFRecord file tfrecord_dataset = tf.data.TFRecordDataset("data/sample_data.tfrecord") tfrecord_dataset = tfrecord_dataset.map(parse_tfrecord) tfrecord_dataset = tfrecord_dataset.batch(4) # Iterate through the TFRecord dataset for batch in tfrecord_dataset: print(batch)
def parse_tfrecord(example_proto):
# Define the features expected in the TFRecord
feature_description = {
'feature1': tf.io.FixedLenFeature([], tf.float32),
'feature2': tf.io.FixedLenFeature([], tf.float32)
}
return tf.io.parse_single_example(example_proto, feature_description)
# Create a dataset from a TFRecord file
tfrecord_dataset = tf.data.TFRecordDataset("data/sample_data.tfrecord")
tfrecord_dataset = tfrecord_dataset.map(parse_tfrecord)
tfrecord_dataset = tfrecord_dataset.batch(4)
# Iterate through the TFRecord dataset
for batch in tfrecord_dataset:
print(batch)

转换数据集:映射、批处理和洗牌

创建数据集后,下一步就是转换数据集。转换是一个宽泛的术语,涵盖多种操作:

  • 映射:该操作将特定函数应用到数据集中的每个元素。例如,您可以将每个数字乘以 2 或执行更复杂的数学运算。
  • 洗牌:对数据集进行洗牌至关重要,因为它可以随机化数据的顺序。随机化有助于防止模型学习到与数据顺序相关的偏差,从而提高模型的泛化能力。
  • 批处理:批处理涉及将数据分组为更小的块。批处理允许您一次性处理多个数据点,而不是将单个数据点输入模型,这样可以提高训练效率。

对于我们的出租车数据集,我们希望将特征与标签(车费金额)分开。我们还希望删除不需要的列,如 pickup_datetime 和 key。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
# Specify columns that we do not want in our feature dictionary.
UNWANTED_COLS = ['pickup_datetime', 'key']
def extract_features_and_label(row):
# Extract the label (fare_amount)
label = row[LABEL_COLUMN]
# Create a features dictionary by copying the row and removing unwanted columns and the label
features = row.copy()
features.pop(LABEL_COLUMN)
for col in UNWANTED_COLS:
features.pop(col, None)
return features, label
# Specify columns that we do not want in our feature dictionary. UNWANTED_COLS = ['pickup_datetime', 'key'] def extract_features_and_label(row): # Extract the label (fare_amount) label = row[LABEL_COLUMN] # Create a features dictionary by copying the row and removing unwanted columns and the label features = row.copy() features.pop(LABEL_COLUMN) for col in UNWANTED_COLS: features.pop(col, None) return features, label
# Specify columns that we do not want in our feature dictionary.
UNWANTED_COLS = ['pickup_datetime', 'key']
def extract_features_and_label(row):
# Extract the label (fare_amount)
label = row[LABEL_COLUMN]
# Create a features dictionary by copying the row and removing unwanted columns and the label
features = row.copy()
features.pop(LABEL_COLUMN)
for col in UNWANTED_COLS:
features.pop(col, None)
return features, label

我们可以对 CSV 数据集中的几个示例进行迭代,以测试我们的函数:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
for row in temp_ds.take(2):
features, label = extract_features_and_label(row)
print(features)
print(label, "\n")
assert UNWANTED_COLS[0] not in features.keys()
assert UNWANTED_COLS[1] not in features.keys()
for row in temp_ds.take(2): features, label = extract_features_and_label(row) print(features) print(label, "\n") assert UNWANTED_COLS[0] not in features.keys() assert UNWANTED_COLS[1] not in features.keys()
for row in temp_ds.take(2):
features, label = extract_features_and_label(row)
print(features)
print(label, "\n")
assert UNWANTED_COLS[0] not in features.keys()
assert UNWANTED_COLS[1] not in features.keys()

有序指令

批处理数据

我们可以通过批处理和应用特征标签提取功能来完善数据集创建过程。这有助于形成可直接用于训练循环的数据批次。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
def create_dataset(pattern, batch_size):
# The tf.data.experimental.make_csv_dataset() method reads CSV files into a dataset
dataset = tf.data.experimental.make_csv_dataset(
pattern, batch_size, CSV_COLUMNS, DEFAULTS)
return dataset.map(extract_features_and_label)
BATCH_SIZE = 2
temp_ds = create_dataset('taxi-train.csv', batch_size=2)
for X_batch, Y_batch in temp_ds.take(2):
print({k: v.numpy() for k, v in X_batch.items()})
print(Y_batch.numpy(), "\n")
assert len(Y_batch) == BATCH_SIZE
def create_dataset(pattern, batch_size): # The tf.data.experimental.make_csv_dataset() method reads CSV files into a dataset dataset = tf.data.experimental.make_csv_dataset( pattern, batch_size, CSV_COLUMNS, DEFAULTS) return dataset.map(extract_features_and_label) BATCH_SIZE = 2 temp_ds = create_dataset('taxi-train.csv', batch_size=2) for X_batch, Y_batch in temp_ds.take(2): print({k: v.numpy() for k, v in X_batch.items()}) print(Y_batch.numpy(), "\n") assert len(Y_batch) == BATCH_SIZE
def create_dataset(pattern, batch_size):
# The tf.data.experimental.make_csv_dataset() method reads CSV files into a dataset
dataset = tf.data.experimental.make_csv_dataset(
pattern, batch_size, CSV_COLUMNS, DEFAULTS)
return dataset.map(extract_features_and_label)
BATCH_SIZE = 2
temp_ds = create_dataset('taxi-train.csv', batch_size=2)
for X_batch, Y_batch in temp_ds.take(2):
print({k: v.numpy() for k, v in X_batch.items()})
print(Y_batch.numpy(), "\n")
assert len(Y_batch) == BATCH_SIZE

批处理数据

洗牌和预取实现高效训练

在训练深度学习模型时,关键是要对数据进行洗牌,以便不同的工作者同时处理数据集的不同部分。此外,预取数据有助于将数据加载过程与模型训练重叠,从而提高整体效率。

我们可以扩展数据集创建功能,使其包含洗牌、缓存和预取功能。我们引入了一个模式参数来区分训练(需要洗牌和重复)和评估(不需要)。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
def build_csv_pipeline(pattern, batch_size=1, mode='eval'):
ds = tf.data.experimental.make_csv_dataset(
file_pattern=pattern,
batch_size=batch_size,
column_names=CSV_COLUMNS,
column_defaults=DEFAULTS,
header=True
)
# Map each row to (features, label)
ds = ds.map(extract_features_and_label)
# Cache the dataset to improve speed if reading from disk repeatedly.
ds = ds.cache()
if mode == 'train':
# Shuffle with a buffer size (here, arbitrarily using 1000) and repeat indefinitely.
ds = ds.shuffle(buffer_size=1000).repeat()
# Prefetch the next batch (AUTOTUNE uses optimal settings)
ds = ds.prefetch(tf.data.AUTOTUNE)
return ds
# Testing the pipeline in training mode
print("\nSample batch from training pipeline:")
train_ds = build_csv_pipeline('taxi-train.csv', batch_size=2, mode='train')
for features, label in train_ds.take(1):
print({k: v.numpy() for k, v in features.items()})
print("Label:", label.numpy())
# Testing the pipeline in evaluation mode
print("\nSample batch from evaluation pipeline:")
eval_ds = build_csv_pipeline('taxi-valid.csv', batch_size=2, mode='eval')
for features, label in eval_ds.take(1):
print({k: v.numpy() for k, v in features.items()})
print("Label:", label.numpy())
def build_csv_pipeline(pattern, batch_size=1, mode='eval'): ds = tf.data.experimental.make_csv_dataset( file_pattern=pattern, batch_size=batch_size, column_names=CSV_COLUMNS, column_defaults=DEFAULTS, header=True ) # Map each row to (features, label) ds = ds.map(extract_features_and_label) # Cache the dataset to improve speed if reading from disk repeatedly. ds = ds.cache() if mode == 'train': # Shuffle with a buffer size (here, arbitrarily using 1000) and repeat indefinitely. ds = ds.shuffle(buffer_size=1000).repeat() # Prefetch the next batch (AUTOTUNE uses optimal settings) ds = ds.prefetch(tf.data.AUTOTUNE) return ds # Testing the pipeline in training mode print("\nSample batch from training pipeline:") train_ds = build_csv_pipeline('taxi-train.csv', batch_size=2, mode='train') for features, label in train_ds.take(1): print({k: v.numpy() for k, v in features.items()}) print("Label:", label.numpy()) # Testing the pipeline in evaluation mode print("\nSample batch from evaluation pipeline:") eval_ds = build_csv_pipeline('taxi-valid.csv', batch_size=2, mode='eval') for features, label in eval_ds.take(1): print({k: v.numpy() for k, v in features.items()}) print("Label:", label.numpy())
def build_csv_pipeline(pattern, batch_size=1, mode='eval'):
ds = tf.data.experimental.make_csv_dataset(
file_pattern=pattern,
batch_size=batch_size,
column_names=CSV_COLUMNS,
column_defaults=DEFAULTS,
header=True
)
# Map each row to (features, label)
ds = ds.map(extract_features_and_label)
# Cache the dataset to improve speed if reading from disk repeatedly.
ds = ds.cache()
if mode == 'train':
# Shuffle with a buffer size (here, arbitrarily using 1000) and repeat indefinitely.
ds = ds.shuffle(buffer_size=1000).repeat()
# Prefetch the next batch (AUTOTUNE uses optimal settings)
ds = ds.prefetch(tf.data.AUTOTUNE)
return ds
# Testing the pipeline in training mode
print("\nSample batch from training pipeline:")
train_ds = build_csv_pipeline('taxi-train.csv', batch_size=2, mode='train')
for features, label in train_ds.take(1):
print({k: v.numpy() for k, v in features.items()})
print("Label:", label.numpy())
# Testing the pipeline in evaluation mode
print("\nSample batch from evaluation pipeline:")
eval_ds = build_csv_pipeline('taxi-valid.csv', batch_size=2, mode='eval')
for features, label in eval_ds.take(1):
print({k: v.numpy() for k, v in features.items()})
print("Label:", label.numpy())

洗牌和预取实现高效训练

数据增强和高级技术

数据增强是深度学习的一项基本技术,尤其是在图像处理等领域。数据集应用程序接口(Dataset API)允许您将增强技术直接集成到您的管道中。例如,如果您希望在数据集中添加随机噪音,您可以使用数据集 API 来实现这一功能:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
def augment_data(x):
return x + tf.random.uniform([], -0.5, 0.5)
# Apply data augmentation
augmented_dataset = dataset.map(augment_data)
def augment_data(x): return x + tf.random.uniform([], -0.5, 0.5) # Apply data augmentation augmented_dataset = dataset.map(augment_data)
def augment_data(x):
return x + tf.random.uniform([], -0.5, 0.5)
# Apply data augmentation
augmented_dataset = dataset.map(augment_data)

这一步骤可以增加数据的多样性,帮助模型在训练过程中更好地泛化。

优化数据管道

为了进一步提高性能,可以考虑使用缓存和预取技术。缓存可将已处理数据集的状态保存在内存或磁盘中,而预取则可将数据准备与模型执行重叠:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
optimized_dataset = dataset.cache().shuffle(100).batch(32).prefetch(tf.data.AUTOTUNE)
optimized_dataset = dataset.cache().shuffle(100).batch(32).prefetch(tf.data.AUTOTUNE)
optimized_dataset = dataset.cache().shuffle(100).batch(32).prefetch(tf.data.AUTOTUNE)

生产管道的最佳实践

从实验转向生产时,请考虑以下最佳实践:

  • 模块化管道设计:将管道分解为可重复使用的小功能。
  • 强大的错误处理:实施从容处理损坏或丢失数据的机制。
  • 可扩展性测试:在扩展到更大的数据集之前,先用小型数据子集验证你的管道。
  • 性能监控:持续跟踪管道性能,识别并解决潜在瓶颈。

通过遵循这些指导原则,您可以确保您的数据管道即使在生产负荷很重的情况下也能保持高效和可靠。

您可以在链接中找到笔记本和输出结果 – 单击此处

参考资料:谷歌云平台代码库

小结

TensorFlow 数据集 API 是创建高效、可扩展机器学习管道的基本组件。在本指南中,我们首先更新了线性回归示例,以便使用在内存中创建的 TensorFlow 数据集。然后,我们演示了如何从磁盘(尤其是 CSV 文件)加载数据,并解释了如何为训练和评估转换、批处理和洗牌数据。

在本指南中,我们探讨了如何使用 TensorFlow 数据集 API 构建和优化数据管道。从内存中生成的合成数据开始,我们逐步创建数据集、应用转换并将这些管道集成到训练循环中。我们还介绍了从磁盘(尤其是 CSV 文件)加载数据的实用技术,并演示了如何结合洗牌、缓存和预取来提高性能。

通过使用函数来提取特征和标签、批量处理数据,并利用洗牌、缓存和预取功能构建强大的管道,您可以简化机器学习模型的数据摄取流程。这些技术不仅能简化代码,还能确保数据高效地进入训练循环,从而提高模型性能。

  • 高效的数据处理是关键:TensorFlow 数据集 API 简化了数据管道,提高了模型性能。
  • Vertex AI 工作台简化了 ML 开发:可管理的 Jupyter Notebook 环境,预装了 ML 库。
  • 优化数据加载:使用批处理、缓存和预取等操作来提高训练效率。
  • 无缝模型集成:轻松将数据集纳入 TensorFlow 训练例程。
  • 数据增强可增强 ML 模型:利用转换技术增强训练数据集,提高准确性。

评论留言