Create efficient data pipelines with tf.data
Build efficient, scalable data pipelines using the tf.data API for optimal training performance. This skill covers dataset creation, transformations, batching, shuffling, prefetching, and advanced optimization techniques to maximize GPU/TPU utilization.
import tensorflow as tf
import numpy as np
# Create dataset from numpy arrays
x_train = np.random.rand(1000, 28, 28, 1)
y_train = np.random.randint(0, 10, 1000)
# Method 1: from_tensor_slices
dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
# Apply transformations
dataset = dataset.shuffle(buffer_size=1024)
dataset = dataset.batch(32)
dataset = dataset.prefetch(tf.data.AUTOTUNE)
# Iterate through dataset
for batch_x, batch_y in dataset.take(2):
    print(f"Batch shape: {batch_x.shape}, Labels shape: {batch_y.shape}")
def data_generator():
    """Generator function for custom data loading."""
    for i in range(1000):
        # Simulate loading data from disk or API
        x = np.random.rand(28, 28, 1).astype(np.float32)
        y = np.random.randint(0, 10)
        yield x, y
# Create dataset from generator
dataset = tf.data.Dataset.from_generator(
    data_generator,
    output_signature=(
        tf.TensorSpec(shape=(28, 28, 1), dtype=tf.float32),
        tf.TensorSpec(shape=(), dtype=tf.int32)
    )
)
dataset = dataset.batch(32).prefetch(tf.data.AUTOTUNE)
# Create simple range dataset
dataset = tf.data.Dataset.range(1000)
# Use with custom mapping
dataset = dataset.map(lambda x: (tf.random.normal([28, 28, 1]), x % 10))
dataset = dataset.batch(32)
def normalize(image, label):
    """Normalize pixel values."""
    image = tf.cast(image, tf.float32) / 255.0
    return image, label

# Apply normalization
train_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .map(normalize, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
def augment(image, label):
    """Apply random augmentations."""
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_brightness(image, 0.2)
    image = tf.image.random_contrast(image, 0.8, 1.2)
    return image, label
# Build complete pipeline
train_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .map(normalize, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()  # Cache after normalization
    .shuffle(1000)
    .map(augment, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
def resize_image(image, label):
    """Resize images to target size."""
    image = tf.image.resize(image, [224, 224])
    return image, label
# Requires the TensorFlow Addons package for the rotation op
import tensorflow_addons as tfa

def apply_random_rotation(image, label):
    """Apply random rotation augmentation."""
    angle = tf.random.uniform([], -0.2, 0.2)
    image = tfa.image.rotate(image, angle)
    return image, label
# Chain multiple transformations
dataset = (
    tf.data.Dataset.from_tensor_slices((images, labels))
    .map(resize_image, num_parallel_calls=tf.data.AUTOTUNE)
    .map(normalize, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(10000)
    .map(augment, num_parallel_calls=tf.data.AUTOTUNE)
    .map(apply_random_rotation, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(64)
    .prefetch(tf.data.AUTOTUNE)
)
# Batch size
BATCH_SIZE = 64
# Buffer size to shuffle the dataset
# (TF data is designed to work with possibly infinite sequences,
# so it doesn't attempt to shuffle the entire sequence in memory. Instead,
# it maintains a buffer in which it shuffles elements).
BUFFER_SIZE = 10000
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)
# Variable batch sizes based on sequence length
def batch_by_sequence_length(dataset, batch_size, max_length):
    """Group sequences of similar length into buckets to minimize padding."""
    def key_func(x, y):
        # Bucket by length (10 buckets spread across [0, max_length])
        return tf.cast(tf.size(x) / max_length * 10, tf.int64)

    def reduce_func(key, windowed):
        # Pad within each bucket so similar-length sequences batch together
        return windowed.padded_batch(batch_size)

    return dataset.group_by_window(
        key_func=key_func,
        reduce_func=reduce_func,
        window_size=batch_size
    )
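For example, a minimal usage sketch with synthetic variable-length sequences (the toy data below is purely illustrative):

# Toy dataset of (sequence, label) pairs where sequence length varies from 1 to 50
sequences = tf.data.Dataset.range(1, 1000).map(
    lambda n: (tf.range(n % 50 + 1), n % 10)
)
bucketed = batch_by_sequence_length(sequences, batch_size=32, max_length=50)
for batch_x, batch_y in bucketed.take(1):
    print(batch_x.shape)  # (32, longest sequence within this bucket)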
def create_stratified_dataset(features, labels, batch_size, num_classes):
    """Create dataset with balanced class sampling."""
    # Separate by class
    datasets = []
    for class_id in range(num_classes):
        mask = labels == class_id
        class_dataset = tf.data.Dataset.from_tensor_slices(
            (features[mask], labels[mask])
        )
        datasets.append(class_dataset)

    # Sample equally from each class
    balanced_dataset = tf.data.Dataset.sample_from_datasets(
        datasets,
        weights=[1.0 / num_classes] * num_classes
    )
    return balanced_dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
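A usage sketch with synthetic, imbalanced data (shapes and class counts are made up for illustration):

# Hypothetical imbalanced data: 900 examples of class 0, 100 of class 1
features = np.random.rand(1000, 28, 28, 1).astype(np.float32)
labels = np.concatenate([np.zeros(900, dtype=np.int64), np.ones(100, dtype=np.int64)])
balanced = create_stratified_dataset(features, labels, batch_size=32, num_classes=2)

Note that sample_from_datasets falls back to the remaining datasets once a class runs out of examples; adding .repeat() to each per-class dataset inside the function keeps batches balanced indefinitely.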
# Cache in memory (for small datasets)
dataset = dataset.cache()
# Cache to disk (for larger datasets)
dataset = dataset.cache('/tmp/dataset_cache')
# Optimal caching placement
dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .map(expensive_preprocessing, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()  # Cache after expensive operations
    .shuffle(buffer_size)
    .map(cheap_augmentation, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(batch_size)
    .prefetch(tf.data.AUTOTUNE)
)
# Automatic prefetching
dataset = dataset.prefetch(tf.data.AUTOTUNE)
# Manual prefetch buffer size
dataset = dataset.prefetch(buffer_size=2)
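Pipeline-wide behavior can also be tuned with tf.data.Options; the sketch below relaxes deterministic ordering, which can improve throughput when element order does not matter:

options = tf.data.Options()
options.deterministic = False  # allow elements to be produced out of order for speed
dataset = dataset.with_options(options)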
# Complete optimized pipeline
optimized_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(10000)
    .batch(64)
    .prefetch(tf.data.AUTOTUNE)
)
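To verify the input pipeline is not the bottleneck, a rough timing loop such as this sketch is often enough; with cache() in place, the second pass should be noticeably faster than the first:

import time

for pass_num in range(2):
    start = time.perf_counter()
    for _ in optimized_dataset:
        pass  # exercise the input pipeline without any model work
    print(f"Pass {pass_num}: {time.perf_counter() - start:.2f}s")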
# Use num_parallel_calls for CPU-bound operations
dataset = dataset.map(
    preprocessing_function,
    num_parallel_calls=tf.data.AUTOTUNE
)

# Interleave for parallel file reading
def make_dataset_from_file(filename):
    return tf.data.TextLineDataset(filename)

filenames = tf.data.Dataset.list_files('/path/to/data/*.csv')
dataset = filenames.interleave(
    make_dataset_from_file,
    cycle_length=4,
    num_parallel_calls=tf.data.AUTOTUNE
)
# Use take() and skip() for train/val split without loading all data
total_size = 10000
train_size = int(0.8 * total_size)
full_dataset = tf.data.Dataset.from_tensor_slices((x, y))
train_dataset = (
    full_dataset
    .take(train_size)
    .shuffle(1000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
val_dataset = (
    full_dataset
    .skip(train_size)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
# Basic iteration
for i in tf.data.Dataset.range(3):
    tf.print('iteration:', i)

# With dataset iterator
for i in iter(tf.data.Dataset.range(3)):
    tf.print('iteration:', i)

# Distribute dataset across devices
for i in tf.distribute.OneDeviceStrategy('cpu').experimental_distribute_dataset(
        tf.data.Dataset.range(3)):
    tf.print('iteration:', i)
# Multi-GPU distribution
strategy = tf.distribute.MirroredStrategy()
distributed_dataset = strategy.experimental_distribute_dataset(dataset)
# Execute training loop over dataset
for images, labels in train_ds:
    if optimizer.iterations > TRAIN_STEPS:
        break
    train_step(images, labels)
def f(args):
    embeddings, index = args
    # embeddings [vocab_size, embedding_dim]
    # index []
    # desired result: [embedding_dim]
    return tf.gather(params=embeddings, indices=index)

@tf.function
def f_auto_vectorized(embeddings, indices):
    # embeddings [num_heads, vocab_size, embedding_dim]
    # indices [num_heads]
    # desired result: [num_heads, embedding_dim]
    return tf.vectorized_map(f, [embeddings, indices])

concrete_vectorized = f_auto_vectorized.get_concrete_function(
    tf.TensorSpec(shape=[None, 100, 16], dtype=tf.float32),
    tf.TensorSpec(shape=[None], dtype=tf.int32))
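For illustration, calling the vectorized function on toy tensors whose shapes match the concrete signature above:

embeddings = tf.random.normal([4, 100, 16])             # [num_heads, vocab_size, embedding_dim]
indices = tf.constant([0, 5, 10, 99], dtype=tf.int32)   # one row index per head
result = f_auto_vectorized(embeddings, indices)
print(result.shape)  # (4, 16)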
# Use dataset with model
model = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28, 1)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(10, activation='softmax')
])
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')
model.fit(train_dataset, epochs=1)
# Create separate train and validation datasets
train_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
val_dataset = (
    tf.data.Dataset.from_tensor_slices((x_val, y_val))
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

# Train with validation
history = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=10
)
@tf.function
def train_step(images, labels):
    with tf.GradientTape() as tape:
        predictions = model(images, training=True)
        loss = loss_fn(labels, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return loss

# Training loop with dataset
for epoch in range(epochs):
    for images, labels in train_dataset:
        loss = train_step(images, labels)
    print(f'Epoch {epoch}, Loss: {loss.numpy():.4f}')
# Reading TFRecord files
def parse_tfrecord(example_proto):
    feature_description = {
        'image': tf.io.FixedLenFeature([], tf.string),
        'label': tf.io.FixedLenFeature([], tf.int64),
    }
    parsed = tf.io.parse_single_example(example_proto, feature_description)
    image = tf.io.decode_raw(parsed['image'], tf.float32)
    image = tf.reshape(image, [28, 28, 1])
    label = parsed['label']
    return image, label

# Load TFRecord dataset
tfrecord_dataset = (
    tf.data.TFRecordDataset(['data_shard_1.tfrecord', 'data_shard_2.tfrecord'])
    .map(parse_tfrecord, num_parallel_calls=tf.data.AUTOTUNE)
    .shuffle(10000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
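For reference, a sketch of how matching records could be written; the raw-float encoding and field names are assumed to mirror parse_tfrecord above:

def serialize_example(image, label):
    # image: float32 array of shape (28, 28, 1); label: integer class id
    feature = {
        'image': tf.train.Feature(
            bytes_list=tf.train.BytesList(value=[image.astype(np.float32).tobytes()])),
        'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[int(label)])),
    }
    return tf.train.Example(features=tf.train.Features(feature=feature)).SerializeToString()

with tf.io.TFRecordWriter('data_shard_1.tfrecord') as writer:
    for image, label in zip(x_train, y_train):
        writer.write(serialize_example(image, label))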
# Load CSV dataset
def parse_csv(line):
    columns = tf.io.decode_csv(line, record_defaults=[0.0] * 785)
    label = tf.cast(columns[0], tf.int32)
    features = tf.stack(columns[1:])
    features = tf.reshape(features, [28, 28, 1])
    return features, label

csv_dataset = (
    tf.data.TextLineDataset(['data.csv'])
    .skip(1)  # Skip header
    .map(parse_csv, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
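Alternatively, tf.data.experimental.make_csv_dataset handles parsing, batching, and shuffling in one call; the label column name below is an assumption about the file's header:

csv_dataset = tf.data.experimental.make_csv_dataset(
    'data.csv',
    batch_size=32,
    label_name='label',   # assumed header name for the label column
    num_epochs=1,
    shuffle=True,
    shuffle_buffer_size=10000
)

Note it yields (feature-dict, label) batches rather than a dense image tensor, so downstream code differs from parse_csv above.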
def load_and_preprocess_image(path, label):
    """Load image from file and preprocess."""
    image = tf.io.read_file(path)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, [224, 224])
    image = tf.cast(image, tf.float32) / 255.0
    return image, label

# Create dataset from image paths
image_paths = ['/path/to/image1.jpg', '/path/to/image2.jpg', ...]
labels = [0, 1, ...]
image_dataset = (
    tf.data.Dataset.from_tensor_slices((image_paths, labels))
    .map(load_and_preprocess_image, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(1000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
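If the images are already organized into one sub-directory per class, tf.keras.utils.image_dataset_from_directory builds a comparable dataset; the directory path and layout here are assumptions:

image_dataset = tf.keras.utils.image_dataset_from_directory(
    '/path/to/images',        # assumed layout: one sub-directory per class
    image_size=(224, 224),
    batch_size=32,
    shuffle=True
)
# Pixel values arrive in [0, 255]; rescale and prefetch as before
image_dataset = image_dataset.map(
    lambda x, y: (tf.cast(x, tf.float32) / 255.0, y),
    num_parallel_calls=tf.data.AUTOTUNE
).prefetch(tf.data.AUTOTUNE)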
# Generate TensorFlow dataset with batching
def gen_dataset(
    batch_size=1,
    is_training=False,
    shuffle=False,
    input_pipeline_context=None,
    preprocess=None,
    drop_remainder=True,
    total_steps=None
):
    """Generate dataset with specified configuration."""
    dataset = tf.data.Dataset.from_tensor_slices((features, labels))
    if shuffle:
        dataset = dataset.shuffle(buffer_size=10000)
    if preprocess:
        dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(batch_size, drop_remainder=drop_remainder)
    if is_training:
        dataset = dataset.repeat()
    dataset = dataset.prefetch(tf.data.AUTOTUNE)
    if total_steps:
        dataset = dataset.take(total_steps)
    return dataset
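A possible call for training (a hedged sketch; steps_per_epoch is needed because is_training=True makes the dataset repeat indefinitely):

train_ds = gen_dataset(batch_size=64, is_training=True, shuffle=True)
model.fit(train_ds, epochs=10, steps_per_epoch=len(features) // 64)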
Use the tensorflow-data-pipelines skill when you need to: