
如何確保機器學習模型獲得茁壯成長所需的高質量資料?在當今的機器學習領域,妥善處理資料與構建強大的模型同等重要。將高質量、結構良好的資料輸入到模型中可以顯著影響效能和訓練速度。TensorFlow Dataset API 透過提供一系列工具來構建、管理和最佳化資料管道,從而簡化了這一過程。在本指南中,我們將從使用 Vertex AI Workbench 配置開發環境,到從各種來源載入資料,再到將這些管道納入模型訓練流程,一步步進行講解。
- 從記憶體陣列以及 CSV 和 TFRecord 檔案等外部資料來源構建資料集。
- 利用對映、洗牌、批處理、快取和預取等操作來簡化資料處理。
- 將您的資料集無縫整合到 TensorFlow 的模型訓練例程中,以實現高效的模型開發。
- 學習啟動 Vertex AI Workbench 例項、設定 Jupyter Notebook 並開始工作。
- 在資料管道中直接應用資料增強技術,從而增強機器學習模型。
什麼是TensorFlow?
TensorFlow 是谷歌為機器學習和深度學習研究開發的開源平臺。它提供了一個廣泛的工具和庫生態系統,使研究人員能夠突破機器學習的極限,使開發人員能夠輕鬆構建和部署智慧應用程式。TensorFlow 既支援高階 API(如 Keras),也支援低階操作,使初學者也能使用,而高階使用者仍能使用其強大功能。
什麼是Vertex AI工作臺?
Vertex AI Workbench 是谷歌雲中的一個託管開發環境,旨在幫助您構建和訓練機器學習模型。它提供完全託管的 Jupyter Notebook 體驗以及預裝的機器學習庫,包括 TensorFlow 和 PyTorch。藉助 Vertex AI Workbench,您可以將本地開發與雲端計算資源無縫整合,從而更輕鬆地開展大型專案,而不必擔心基礎設施的設定。
在本指南中,您不僅將學習如何使用 TensorFlow 的資料集 API,還將瞭解如何使用 Vertex AI Workbench 設定環境。我們將介紹從啟動新例項、建立 Jupyter Notebook 到載入資料集的所有內容。
瞭解TensorFlow資料集API
TensorFlow 資料集 API 是一套旨在簡化資料輸入管道構建過程的工具。在任何機器學習任務中,模型的效能不僅取決於演算法本身,還取決於輸入資料的質量和流量。資料集應用程式介面(Dataset API)可讓您隨時隨地執行載入資料、預處理資料和轉換資料等任務。
該應用程式介面之所以如此強大,是因為它能以簡單易懂的順序將多個操作串聯起來。您可以從各種來源載入資料,應用必要的轉換(如縮放或歸一化),甚至可以對資料進行洗牌,以防止模型過度擬合。這種方法不僅能讓程式碼更簡潔、更易於維護,還能利用快取和預取等技術最佳化效能。
使用Vertex AI工作臺設定環境
在開始使用 TensorFlow 資料集 API 之前,您需要一個強大的環境。Vertex AI Workbench 是一個很好的選擇,因為它提供了一個完全託管、基於雲的開發環境,並預裝了您需要的所有工具。
啟動Vertex AI工作臺例項
- 首先登入谷歌雲賬戶。從導航選單中搜尋並選擇 Vertex AI。

- 點選“Enable All Recommended APIs”按鈕。這將確保您的專案可以訪問所有必要的 API 服務。
- 在導航選單中點選工作臺。確保您處於“Instances”檢視中。

- 單擊“Create New”啟動新的工作臺例項。系統將提示您配置例項:
- 名稱:給例項起一個有意義的名字,如 lab-workbench。
- 地區和區域:選擇例項所在的適當地區和區域。
- 高階選項:如有需要,可透過選擇機器型別或磁碟大小等選項自定義例項設定。
- 配置完成後,單擊“Create”。例項設定可能需要幾分鐘時間。一旦準備就緒,你會看到其名稱旁邊有一個綠色的複選標記。

- 單擊例項名稱旁邊的 Open JupyterLab。這將在瀏覽器的新標籤頁中開啟 Jupyter Lab 介面。

建立Jupyter Notebook
開啟 JupyterLab 介面後,點選 Python 3 圖示就可以建立一個新的 Python 筆記本。將筆記本重新命名為描述性名稱是個好主意。為此,右鍵單擊檔名(最初可能是 Untitled.ipynb),然後選擇重新命名筆記本。選擇一個能反映專案的名稱,如“new_project”。同時將核心從 python 3 改為 TensorFlow 2-11 (Local)。

使用tf.data處理資料
首先將 taxi-train.csv 和 taxi-valid.csv 資料集上傳至筆記本。

匯入所需庫函式
首先,我們需要匯入 TensorFlow 和 NumPy,然後將 TensorFlow 的日誌級別設定為最小。這樣可以減少執行過程中的日誌冗餘。
print("TensorFlow version:", tf.version.VERSION)
# Set minimal TF logging level.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import tensorflow as tf
import numpy as np
print("TensorFlow version:", tf.version.VERSION)
# Set minimal TF logging level.
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import tensorflow as tf
import numpy as np
print("TensorFlow version:", tf.version.VERSION)
# Set minimal TF logging level.
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
從記憶體中建立資料集
環境設定完成後,就可以開始處理資料了。最簡單的方法是從記憶體中建立資料集。這意味著將儲存在計算機記憶體中的資料(如列表或 NumPy 陣列)轉換為 TensorFlow 可以處理的格式。
想象一下,你有一小組數字,想用來做一個基本實驗。透過 TensorFlow 資料集 API,您可以快速將這些數字轉換為可進一步處理的資料集。這個過程非常簡單,而且可以擴充套件到更復雜的資料結構。
例如,您可以從一個包含多個數字的簡單 NumPy 陣列開始。使用資料集 API,您可以從這個陣列建立一個資料集。然後可以對資料集進行迭代,並應用各種轉換,例如為每個元素對映一個函式。
建立合成資料集
我們首先建立一個合成資料集。在本例中,我們使用線性方程 y=2x+10 生成特徵向量 X 和相應的標籤向量 Y。
X = tf.constant(range(N_POINTS), dtype=tf.float32)
N_POINTS = 10
X = tf.constant(range(N_POINTS), dtype=tf.float32)
Y = 2 * X + 10
N_POINTS = 10
X = tf.constant(range(N_POINTS), dtype=tf.float32)
Y = 2 * X + 10
接下來,我們定義了一個函式,該函式接受我們的特徵和標籤陣列,以及訓練次數(epochs)和所需的批次大小。該函式透過切分張量、重複指定次數的訓練和批處理(丟棄任何剩餘示例以保持批處理大小一致)來構建 TensorFlow 資料集。
def make_synthetic_dataset(X, Y, epochs, batch_size):
# Create the dataset from tensor slices
ds = tf.data.Dataset.from_tensor_slices((X, Y))
# Repeat the dataset and batch it (drop the remainder for consistency)
ds = ds.repeat(epochs).batch(batch_size, drop_remainder=True)
def make_synthetic_dataset(X, Y, epochs, batch_size):
# Create the dataset from tensor slices
ds = tf.data.Dataset.from_tensor_slices((X, Y))
# Repeat the dataset and batch it (drop the remainder for consistency)
ds = ds.repeat(epochs).batch(batch_size, drop_remainder=True)
return ds
def make_synthetic_dataset(X, Y, epochs, batch_size):
# Create the dataset from tensor slices
ds = tf.data.Dataset.from_tensor_slices((X, Y))
# Repeat the dataset and batch it (drop the remainder for consistency)
ds = ds.repeat(epochs).batch(batch_size, drop_remainder=True)
return ds
讓我們以 3 個資料點為一批,在資料集上迭代兩次,測試我們的函式:
dataset = make_synthetic_dataset(X, Y, epochs=EPOCHS, batch_size=BATCH_SIZE)
print("Synthetic dataset batches:")
for i, (x_batch, y_batch) in enumerate(dataset):
print(f"Batch {i}: x: {x_batch.numpy()} y: {y_batch.numpy()}")
assert len(x_batch) == BATCH_SIZE
assert len(y_batch) == BATCH_SIZE
BATCH_SIZE = 3
EPOCHS = 2
dataset = make_synthetic_dataset(X, Y, epochs=EPOCHS, batch_size=BATCH_SIZE)
print("Synthetic dataset batches:")
for i, (x_batch, y_batch) in enumerate(dataset):
print(f"Batch {i}: x: {x_batch.numpy()} y: {y_batch.numpy()}")
assert len(x_batch) == BATCH_SIZE
assert len(y_batch) == BATCH_SIZE
BATCH_SIZE = 3
EPOCHS = 2
dataset = make_synthetic_dataset(X, Y, epochs=EPOCHS, batch_size=BATCH_SIZE)
print("Synthetic dataset batches:")
for i, (x_batch, y_batch) in enumerate(dataset):
print(f"Batch {i}: x: {x_batch.numpy()} y: {y_batch.numpy()}")
assert len(x_batch) == BATCH_SIZE
assert len(y_batch) == BATCH_SIZE

損失函式和梯度計算
接下來,我們將定義均方誤差(MSE)損失函式和計算梯度的輔助函式。這些函式仍與我們之前的實現類似。
def loss_mse(X, Y, w0, w1):
error = (Y_pred - Y) ** 2
return tf.reduce_mean(error)
def compute_gradients(X, Y, w0, w1):
with tf.GradientTape() as tape:
current_loss = loss_mse(X, Y, w0, w1)
return tape.gradient(current_loss, [w0, w1]), current_loss
def loss_mse(X, Y, w0, w1):
Y_pred = w0 * X + w1
error = (Y_pred - Y) ** 2
return tf.reduce_mean(error)
def compute_gradients(X, Y, w0, w1):
with tf.GradientTape() as tape:
current_loss = loss_mse(X, Y, w0, w1)
return tape.gradient(current_loss, [w0, w1]), current_loss
def loss_mse(X, Y, w0, w1):
Y_pred = w0 * X + w1
error = (Y_pred - Y) ** 2
return tf.reduce_mean(error)
def compute_gradients(X, Y, w0, w1):
with tf.GradientTape() as tape:
current_loss = loss_mse(X, Y, w0, w1)
return tape.gradient(current_loss, [w0, w1]), current_loss
訓練迴圈
現在,我們更新訓練迴圈,使其遍歷函式建立的 tf.data.Dataset 資料集。在本例中,我們使用 2 的批次大小對模型進行了 250 次訓練。
首先,將模型引數初始化為 TensorFlow 變數:
# Initialize model parameters
# Create the training dataset (synthetic)
train_dataset = make_synthetic_dataset(X, Y, epochs=EPOCHS_TRAIN, batch_size=BATCH_SIZE_TRAIN)
# Initialize model parameters
w0 = tf.Variable(0.0)
w1 = tf.Variable(0.0)
EPOCHS_TRAIN = 250
BATCH_SIZE_TRAIN = 2
LEARNING_RATE = 0.02
# Create the training dataset (synthetic)
train_dataset = make_synthetic_dataset(X, Y, epochs=EPOCHS_TRAIN, batch_size=BATCH_SIZE_TRAIN)
# Initialize model parameters
w0 = tf.Variable(0.0)
w1 = tf.Variable(0.0)
EPOCHS_TRAIN = 250
BATCH_SIZE_TRAIN = 2
LEARNING_RATE = 0.02
# Create the training dataset (synthetic)
train_dataset = make_synthetic_dataset(X, Y, epochs=EPOCHS_TRAIN, batch_size=BATCH_SIZE_TRAIN)
然後,我們使用隨機梯度下降法執行訓練迴圈。該迴圈每更新一批模型引數,我們每 100 步列印一次訓練狀態。
print("\nStarting training loop for synthetic linear regression:")
MSG = "Step {step} - loss: {loss:.6f}, w0: {w0:.6f}, w1: {w1:.6f}"
for step, (X_batch, Y_batch) in enumerate(train_dataset):
grads, loss_val = compute_gradients(X_batch, Y_batch, w0, w1)
# Update the parameters using gradient descent
w0.assign_sub(LEARNING_RATE * grads[0])
w1.assign_sub(LEARNING_RATE * grads[1])
print(MSG.format(step=step, loss=loss_val.numpy(), w0=w0.numpy(), w1=w1.numpy()))
# Final assertions (tolerance based)
assert abs(w0.numpy() - 2) < 1e-3
assert abs(w1.numpy() - 10) < 1e-3
# Training loop
print("\nStarting training loop for synthetic linear regression:")
MSG = "Step {step} - loss: {loss:.6f}, w0: {w0:.6f}, w1: {w1:.6f}"
for step, (X_batch, Y_batch) in enumerate(train_dataset):
grads, loss_val = compute_gradients(X_batch, Y_batch, w0, w1)
# Update the parameters using gradient descent
w0.assign_sub(LEARNING_RATE * grads[0])
w1.assign_sub(LEARNING_RATE * grads[1])
if step % 100 == 0:
print(MSG.format(step=step, loss=loss_val.numpy(), w0=w0.numpy(), w1=w1.numpy()))
# Final assertions (tolerance based)
assert loss_val < 1e-6
assert abs(w0.numpy() - 2) < 1e-3
assert abs(w1.numpy() - 10) < 1e-3
# Training loop
print("\nStarting training loop for synthetic linear regression:")
MSG = "Step {step} - loss: {loss:.6f}, w0: {w0:.6f}, w1: {w1:.6f}"
for step, (X_batch, Y_batch) in enumerate(train_dataset):
grads, loss_val = compute_gradients(X_batch, Y_batch, w0, w1)
# Update the parameters using gradient descent
w0.assign_sub(LEARNING_RATE * grads[0])
w1.assign_sub(LEARNING_RATE * grads[1])
if step % 100 == 0:
print(MSG.format(step=step, loss=loss_val.numpy(), w0=w0.numpy(), w1=w1.numpy()))
# Final assertions (tolerance based)
assert loss_val < 1e-6
assert abs(w0.numpy() - 2) < 1e-3
assert abs(w1.numpy() - 10) < 1e-3

從磁碟載入資料
在實際應用中,資料通常儲存在磁碟上而不是記憶體中。使用這些方法從磁碟載入資料可確保高效處理大型資料集,併為模型訓練做好準備。兩種常見的資料儲存格式是 CSV 和 TFRecord。
載入CSV檔案
CSV(逗號分隔值)檔案廣泛用於儲存表格資料。TensorFlow 資料集 API 提供了讀取 CSV 檔案的便捷方法。這一過程包括解析檔案的每一行,將文字轉換為數字資料,批處理結果,以及應用任何其他轉換。
下面,我們將定義 CSV 檔案的列名和預設值:
LABEL_COLUMN = 'fare_amount'
DEFAULTS = [[0.0], ['na'], [0.0], [0.0], [0.0], [0.0], [0.0], ['na']]
CSV_COLUMNS = [
'fare_amount',
'pickup_datetime',
'pickup_longitude',
'pickup_latitude',
'dropoff_longitude',
'dropoff_latitude',
'passenger_count',
'key'
]
LABEL_COLUMN = 'fare_amount'
DEFAULTS = [[0.0], ['na'], [0.0], [0.0], [0.0], [0.0], [0.0], ['na']]
CSV_COLUMNS = [
'fare_amount',
'pickup_datetime',
'pickup_longitude',
'pickup_latitude',
'dropoff_longitude',
'dropoff_latitude',
'passenger_count',
'key'
]
LABEL_COLUMN = 'fare_amount'
DEFAULTS = [[0.0], ['na'], [0.0], [0.0], [0.0], [0.0], [0.0], ['na']]
接下來,我們將 CSV 資料集的建立封裝到一個函式中,該函式根據檔案模式和指定的批次大小讀取檔案:
def make_csv_dataset(pattern, batch_size):
# Create dataset from CSV files with specified column names and defaults.
ds = tf.data.experimental.make_csv_dataset(
column_names=CSV_COLUMNS,
column_defaults=DEFAULTS,
# For demonstration, assume the CSV files are located in '../toy_data/'.
temp_ds = make_csv_dataset('taxi-train.csv', batch_size=2)
print("\nSample CSV dataset (prefetched):")
def make_csv_dataset(pattern, batch_size):
# Create dataset from CSV files with specified column names and defaults.
ds = tf.data.experimental.make_csv_dataset(
file_pattern=pattern,
batch_size=batch_size,
column_names=CSV_COLUMNS,
column_defaults=DEFAULTS,
header=True
)
return ds
# For demonstration, assume the CSV files are located in '../toy_data/'.
temp_ds = make_csv_dataset('taxi-train.csv', batch_size=2)
print("\nSample CSV dataset (prefetched):")
print(temp_ds)
def make_csv_dataset(pattern, batch_size):
# Create dataset from CSV files with specified column names and defaults.
ds = tf.data.experimental.make_csv_dataset(
file_pattern=pattern,
batch_size=batch_size,
column_names=CSV_COLUMNS,
column_defaults=DEFAULTS,
header=True
)
return ds
# For demonstration, assume the CSV files are located in '../toy_data/'.
temp_ds = make_csv_dataset('taxi-train.csv', batch_size=2)
print("\nSample CSV dataset (prefetched):")
print(temp_ds)

為了提高可讀性,讓我們遍歷該資料集的前兩個元素,並將它們轉換成標準的 Python 字典:
for data in temp_ds.take(2):
print({k: v.numpy() for k, v in data.items()})
for data in temp_ds.take(2):
print({k: v.numpy() for k, v in data.items()})
print("\n")
for data in temp_ds.take(2):
print({k: v.numpy() for k, v in data.items()})
print("\n")

載入TFRecord檔案
TFRecord 是一種針對 TensorFlow 最佳化的二進位制格式。與 CSV 檔案相比,它的讀取速度更快,對大型資料集而言效率很高。雖然此處提供的程式碼主要針對 CSV,但在處理 TFRecord 檔案時也可以應用類似的技術。
例如:
def parse_tfrecord(example_proto):
# Define the features expected in the TFRecord
'feature1': tf.io.FixedLenFeature([], tf.float32),
'feature2': tf.io.FixedLenFeature([], tf.float32)
return tf.io.parse_single_example(example_proto, feature_description)
# Create a dataset from a TFRecord file
tfrecord_dataset = tf.data.TFRecordDataset("data/sample_data.tfrecord")
tfrecord_dataset = tfrecord_dataset.map(parse_tfrecord)
tfrecord_dataset = tfrecord_dataset.batch(4)
# Iterate through the TFRecord dataset
for batch in tfrecord_dataset:
def parse_tfrecord(example_proto):
# Define the features expected in the TFRecord
feature_description = {
'feature1': tf.io.FixedLenFeature([], tf.float32),
'feature2': tf.io.FixedLenFeature([], tf.float32)
}
return tf.io.parse_single_example(example_proto, feature_description)
# Create a dataset from a TFRecord file
tfrecord_dataset = tf.data.TFRecordDataset("data/sample_data.tfrecord")
tfrecord_dataset = tfrecord_dataset.map(parse_tfrecord)
tfrecord_dataset = tfrecord_dataset.batch(4)
# Iterate through the TFRecord dataset
for batch in tfrecord_dataset:
print(batch)
def parse_tfrecord(example_proto):
# Define the features expected in the TFRecord
feature_description = {
'feature1': tf.io.FixedLenFeature([], tf.float32),
'feature2': tf.io.FixedLenFeature([], tf.float32)
}
return tf.io.parse_single_example(example_proto, feature_description)
# Create a dataset from a TFRecord file
tfrecord_dataset = tf.data.TFRecordDataset("data/sample_data.tfrecord")
tfrecord_dataset = tfrecord_dataset.map(parse_tfrecord)
tfrecord_dataset = tfrecord_dataset.batch(4)
# Iterate through the TFRecord dataset
for batch in tfrecord_dataset:
print(batch)
轉換資料集:對映、批處理和洗牌
建立資料集後,下一步就是轉換資料集。轉換是一個寬泛的術語,涵蓋多種操作:
- 對映:該操作將特定函式應用到資料集中的每個元素。例如,您可以將每個數字乘以 2 或執行更復雜的數學運算。
- 洗牌:對資料集進行洗牌至關重要,因為它可以隨機化資料的順序。隨機化有助於防止模型學習到與資料順序相關的偏差,從而提高模型的泛化能力。
- 批處理:批處理涉及將資料分組為更小的塊。批處理允許您一次性處理多個資料點,而不是將單個資料點輸入模型,這樣可以提高訓練效率。
對於我們的計程車資料集,我們希望將特徵與標籤(車費金額)分開。我們還希望刪除不需要的列,如 pickup_datetime 和 key。
# Specify columns that we do not want in our feature dictionary.
UNWANTED_COLS = ['pickup_datetime', 'key']
def extract_features_and_label(row):
# Extract the label (fare_amount)
label = row[LABEL_COLUMN]
# Create a features dictionary by copying the row and removing unwanted columns and the label
features.pop(LABEL_COLUMN)
for col in UNWANTED_COLS:
# Specify columns that we do not want in our feature dictionary.
UNWANTED_COLS = ['pickup_datetime', 'key']
def extract_features_and_label(row):
# Extract the label (fare_amount)
label = row[LABEL_COLUMN]
# Create a features dictionary by copying the row and removing unwanted columns and the label
features = row.copy()
features.pop(LABEL_COLUMN)
for col in UNWANTED_COLS:
features.pop(col, None)
return features, label
# Specify columns that we do not want in our feature dictionary.
UNWANTED_COLS = ['pickup_datetime', 'key']
def extract_features_and_label(row):
# Extract the label (fare_amount)
label = row[LABEL_COLUMN]
# Create a features dictionary by copying the row and removing unwanted columns and the label
features = row.copy()
features.pop(LABEL_COLUMN)
for col in UNWANTED_COLS:
features.pop(col, None)
return features, label
我們可以對 CSV 資料集中的幾個示例進行迭代,以測試我們的函式:
for row in temp_ds.take(2):
features, label = extract_features_and_label(row)
assert UNWANTED_COLS[0] not in features.keys()
assert UNWANTED_COLS[1] not in features.keys()
for row in temp_ds.take(2):
features, label = extract_features_and_label(row)
print(features)
print(label, "\n")
assert UNWANTED_COLS[0] not in features.keys()
assert UNWANTED_COLS[1] not in features.keys()
for row in temp_ds.take(2):
features, label = extract_features_and_label(row)
print(features)
print(label, "\n")
assert UNWANTED_COLS[0] not in features.keys()
assert UNWANTED_COLS[1] not in features.keys()

批處理資料
我們可以透過批處理和應用特徵標籤提取功能來完善資料集建立過程。這有助於形成可直接用於訓練迴圈的資料批次。
def create_dataset(pattern, batch_size):
# The tf.data.experimental.make_csv_dataset() method reads CSV files into a dataset
dataset = tf.data.experimental.make_csv_dataset(
pattern, batch_size, CSV_COLUMNS, DEFAULTS)
return dataset.map(extract_features_and_label)
temp_ds = create_dataset('taxi-train.csv', batch_size=2)
for X_batch, Y_batch in temp_ds.take(2):
print({k: v.numpy() for k, v in X_batch.items()})
print(Y_batch.numpy(), "\n")
assert len(Y_batch) == BATCH_SIZE
def create_dataset(pattern, batch_size):
# The tf.data.experimental.make_csv_dataset() method reads CSV files into a dataset
dataset = tf.data.experimental.make_csv_dataset(
pattern, batch_size, CSV_COLUMNS, DEFAULTS)
return dataset.map(extract_features_and_label)
BATCH_SIZE = 2
temp_ds = create_dataset('taxi-train.csv', batch_size=2)
for X_batch, Y_batch in temp_ds.take(2):
print({k: v.numpy() for k, v in X_batch.items()})
print(Y_batch.numpy(), "\n")
assert len(Y_batch) == BATCH_SIZE
def create_dataset(pattern, batch_size):
# The tf.data.experimental.make_csv_dataset() method reads CSV files into a dataset
dataset = tf.data.experimental.make_csv_dataset(
pattern, batch_size, CSV_COLUMNS, DEFAULTS)
return dataset.map(extract_features_and_label)
BATCH_SIZE = 2
temp_ds = create_dataset('taxi-train.csv', batch_size=2)
for X_batch, Y_batch in temp_ds.take(2):
print({k: v.numpy() for k, v in X_batch.items()})
print(Y_batch.numpy(), "\n")
assert len(Y_batch) == BATCH_SIZE

洗牌和預取實現高效訓練
在訓練深度學習模型時,關鍵是要對資料進行洗牌,以便不同的工作者同時處理資料集的不同部分。此外,預取資料有助於將資料載入過程與模型訓練重疊,從而提高整體效率。
我們可以擴充套件資料集建立功能,使其包含洗牌、快取和預取功能。我們引入了一個模式引數來區分訓練(需要洗牌和重複)和評估(不需要)。
def build_csv_pipeline(pattern, batch_size=1, mode='eval'):
ds = tf.data.experimental.make_csv_dataset(
column_names=CSV_COLUMNS,
column_defaults=DEFAULTS,
# Map each row to (features, label)
ds = ds.map(extract_features_and_label)
# Cache the dataset to improve speed if reading from disk repeatedly.
# Shuffle with a buffer size (here, arbitrarily using 1000) and repeat indefinitely.
ds = ds.shuffle(buffer_size=1000).repeat()
# Prefetch the next batch (AUTOTUNE uses optimal settings)
ds = ds.prefetch(tf.data.AUTOTUNE)
# Testing the pipeline in training mode
print("\nSample batch from training pipeline:")
train_ds = build_csv_pipeline('taxi-train.csv', batch_size=2, mode='train')
for features, label in train_ds.take(1):
print({k: v.numpy() for k, v in features.items()})
print("Label:", label.numpy())
# Testing the pipeline in evaluation mode
print("\nSample batch from evaluation pipeline:")
eval_ds = build_csv_pipeline('taxi-valid.csv', batch_size=2, mode='eval')
for features, label in eval_ds.take(1):
print({k: v.numpy() for k, v in features.items()})
print("Label:", label.numpy())
def build_csv_pipeline(pattern, batch_size=1, mode='eval'):
ds = tf.data.experimental.make_csv_dataset(
file_pattern=pattern,
batch_size=batch_size,
column_names=CSV_COLUMNS,
column_defaults=DEFAULTS,
header=True
)
# Map each row to (features, label)
ds = ds.map(extract_features_and_label)
# Cache the dataset to improve speed if reading from disk repeatedly.
ds = ds.cache()
if mode == 'train':
# Shuffle with a buffer size (here, arbitrarily using 1000) and repeat indefinitely.
ds = ds.shuffle(buffer_size=1000).repeat()
# Prefetch the next batch (AUTOTUNE uses optimal settings)
ds = ds.prefetch(tf.data.AUTOTUNE)
return ds
# Testing the pipeline in training mode
print("\nSample batch from training pipeline:")
train_ds = build_csv_pipeline('taxi-train.csv', batch_size=2, mode='train')
for features, label in train_ds.take(1):
print({k: v.numpy() for k, v in features.items()})
print("Label:", label.numpy())
# Testing the pipeline in evaluation mode
print("\nSample batch from evaluation pipeline:")
eval_ds = build_csv_pipeline('taxi-valid.csv', batch_size=2, mode='eval')
for features, label in eval_ds.take(1):
print({k: v.numpy() for k, v in features.items()})
print("Label:", label.numpy())
def build_csv_pipeline(pattern, batch_size=1, mode='eval'):
ds = tf.data.experimental.make_csv_dataset(
file_pattern=pattern,
batch_size=batch_size,
column_names=CSV_COLUMNS,
column_defaults=DEFAULTS,
header=True
)
# Map each row to (features, label)
ds = ds.map(extract_features_and_label)
# Cache the dataset to improve speed if reading from disk repeatedly.
ds = ds.cache()
if mode == 'train':
# Shuffle with a buffer size (here, arbitrarily using 1000) and repeat indefinitely.
ds = ds.shuffle(buffer_size=1000).repeat()
# Prefetch the next batch (AUTOTUNE uses optimal settings)
ds = ds.prefetch(tf.data.AUTOTUNE)
return ds
# Testing the pipeline in training mode
print("\nSample batch from training pipeline:")
train_ds = build_csv_pipeline('taxi-train.csv', batch_size=2, mode='train')
for features, label in train_ds.take(1):
print({k: v.numpy() for k, v in features.items()})
print("Label:", label.numpy())
# Testing the pipeline in evaluation mode
print("\nSample batch from evaluation pipeline:")
eval_ds = build_csv_pipeline('taxi-valid.csv', batch_size=2, mode='eval')
for features, label in eval_ds.take(1):
print({k: v.numpy() for k, v in features.items()})
print("Label:", label.numpy())

資料增強和高階技術
資料增強是深度學習的一項基本技術,尤其是在影像處理等領域。資料集應用程式介面(Dataset API)允許您將增強技術直接整合到您的管道中。例如,如果您希望在資料集中新增隨機噪音,您可以使用資料集 API 來實現這一功能:
return x + tf.random.uniform([], -0.5, 0.5)
# Apply data augmentation
augmented_dataset = dataset.map(augment_data)
def augment_data(x):
return x + tf.random.uniform([], -0.5, 0.5)
# Apply data augmentation
augmented_dataset = dataset.map(augment_data)
def augment_data(x):
return x + tf.random.uniform([], -0.5, 0.5)
# Apply data augmentation
augmented_dataset = dataset.map(augment_data)
這一步驟可以增加資料的多樣性,幫助模型在訓練過程中更好地泛化。
最佳化資料管道
為了進一步提高效能,可以考慮使用快取和預取技術。快取可將已處理資料集的狀態儲存在記憶體或磁碟中,而預取則可將資料準備與模型執行重疊:
optimized_dataset = dataset.cache().shuffle(100).batch(32).prefetch(tf.data.AUTOTUNE)
optimized_dataset = dataset.cache().shuffle(100).batch(32).prefetch(tf.data.AUTOTUNE)
optimized_dataset = dataset.cache().shuffle(100).batch(32).prefetch(tf.data.AUTOTUNE)
生產管道的最佳實踐
從實驗轉向生產時,請考慮以下最佳實踐:
- 模組化管道設計:將管道分解為可重複使用的小功能。
- 強大的錯誤處理:實施從容處理損壞或丟失資料的機制。
- 可擴充套件性測試:在擴充套件到更大的資料集之前,先用小型資料子集驗證你的管道。
- 效能監控:持續跟蹤管道效能,識別並解決潛在瓶頸。
透過遵循這些指導原則,您可以確保您的資料管道即使在生產負荷很重的情況下也能保持高效和可靠。
您可以在連結中找到筆記本和輸出結果 – 單擊此處。
參考資料:谷歌雲平臺程式碼庫
小結
TensorFlow 資料集 API 是建立高效、可擴充套件機器學習管道的基本元件。在本指南中,我們首先更新了線性迴歸示例,以便使用在記憶體中建立的 TensorFlow 資料集。然後,我們演示瞭如何從磁碟(尤其是 CSV 檔案)載入資料,並解釋瞭如何為訓練和評估轉換、批處理和洗牌資料。
在本指南中,我們探討了如何使用 TensorFlow 資料集 API 構建和最佳化資料管道。從記憶體中生成的合成資料開始,我們逐步建立資料集、應用轉換並將這些管道整合到訓練迴圈中。我們還介紹了從磁碟(尤其是 CSV 檔案)載入資料的實用技術,並演示瞭如何結合洗牌、快取和預取來提高效能。
透過使用函式來提取特徵和標籤、批次處理資料,並利用洗牌、快取和預取功能構建強大的管道,您可以簡化機器學習模型的資料攝取流程。這些技術不僅能簡化程式碼,還能確保資料高效地進入訓練迴圈,從而提高模型效能。
- 高效的資料處理是關鍵:TensorFlow 資料集 API 簡化了資料管道,提高了模型效能。
- Vertex AI 工作臺簡化了 ML 開發:可管理的 Jupyter Notebook 環境,預裝了 ML 庫。
- 最佳化資料載入:使用批處理、快取和預取等操作來提高訓練效率。
- 無縫模型整合:輕鬆將資料集納入 TensorFlow 訓練例程。
- 資料增強可增強 ML 模型:利用轉換技術增強訓練資料集,提高準確性。
評論留言