
import os
import time
#!pip install -q -U tensorflow-gpu
import tensorflow as tf
import numpy as np
我们将使用Fashion-MNIST数据集,它是MNIST的替代品,包含数千张Zalando时尚单品的灰度图像。获取训练和测试数据非常简单:
# Load the Fashion-MNIST train/test splits through the Keras dataset helper.
# The right-hand side is parenthesised so the assignment can span two lines.
(train_images, train_labels), (test_images, test_labels) = (
    tf.keras.datasets.fashion_mnist.load_data()
)

TRAINING_SIZE = len(train_images)
TEST_SIZE = len(test_images)

# Convert the train images to float32, divide by 255 and add a channel axis
train_images = np.asarray(train_images, dtype=np.float32) / 255
train_images = train_images.reshape((TRAINING_SIZE, 28, 28, 1))

# Convert the test images the same way and add a channel axis
test_images = np.asarray(test_images, dtype=np.float32) / 255
test_images = test_images.reshape((TEST_SIZE, 28, 28, 1))

# How many categories we are predicting from (0-9)
LABEL_DIMENSIONS = 10

# One-hot encode the integer class labels.
train_labels = tf.keras.utils.to_categorical(train_labels, LABEL_DIMENSIONS)
test_labels = tf.keras.utils.to_categorical(test_labels, LABEL_DIMENSIONS)

# Cast the labels to floats, needed later
train_labels = train_labels.astype(np.float32)
test_labels = test_labels.astype(np.float32)
# tf.keras model
#
# Small convolutional classifier built with the Keras functional API:
# three Conv2D layers (with max-pooling after the first two), then a
# dense hidden layer and a softmax output over LABEL_DIMENSIONS classes.
inputs = tf.keras.Input(shape=(28, 28, 1))  # Returns a placeholder
x = tf.keras.layers.Conv2D(filters=32,
                           kernel_size=(3, 3),
                           activation=tf.nn.relu)(inputs)
x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)
x = tf.keras.layers.Conv2D(filters=64,
                           kernel_size=(3, 3),
                           activation=tf.nn.relu)(x)
x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)
x = tf.keras.layers.Conv2D(filters=64,
                           kernel_size=(3, 3),
                           activation=tf.nn.relu)(x)
x = tf.keras.layers.Flatten()(x)
x = tf.keras.layers.Dense(64, activation=tf.nn.relu)(x)
predictions = tf.keras.layers.Dense(LABEL_DIMENSIONS,
                                    activation=tf.nn.softmax)(x)

model = tf.keras.Model(inputs=inputs, outputs=predictions)

# Labels are one-hot encoded, hence categorical (not sparse) cross-entropy.
optimizer = tf.train.AdamOptimizer(learning_rate=0.001)
model.compile(loss='categorical_crossentropy',
              optimizer=optimizer,
              metrics=['accuracy'])
那么Estimator有什么好处呢?我们先从下面的配置开始:
# Number of GPUs handed to the mirrored distribution strategy.
NUM_GPUS = 2

# Wrap the compiled Keras model in an Estimator whose training is
# distributed by a MirroredStrategy across NUM_GPUS devices.
strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=NUM_GPUS)
config = tf.estimator.RunConfig(train_distribute=strategy)
estimator = tf.keras.estimator.model_to_estimator(
    keras_model=model,
    config=config,
)
def input_fn(images, labels, epochs, batch_size):
    """Build a tf.data input pipeline from in-memory images and labels.

    Args:
        images: Array-like of examples, sliced along its first axis.
        labels: Array-like of targets, sliced in lockstep with ``images``.
        epochs: Number of times the shuffled dataset is repeated.
        batch_size: Number of examples per emitted batch.

    Returns:
        A ``tf.data.Dataset`` yielding ``(images, labels)`` batches.
    """
    # Convert the inputs to a Dataset. (E)
    ds = tf.data.Dataset.from_tensor_slices((images, labels))

    # Shuffle, repeat, and batch the examples. (T)
    SHUFFLE_SIZE = 5000
    ds = ds.shuffle(SHUFFLE_SIZE).repeat(epochs).batch(batch_size)
    # Keep up to two batches buffered ahead of the consumer.
    ds = ds.prefetch(2)

    # Return the dataset. (L)
    return ds
class TimeHistory(tf.train.SessionRunHook):
    """Session hook that records the wall-clock duration of each run call.

    After training, ``times`` holds one elapsed-seconds entry per
    ``before_run``/``after_run`` pair observed by the hook.
    """

    def begin(self):
        """Reset the list of recorded durations."""
        self.times = []

    def before_run(self, run_context):
        """Remember the wall-clock time just before the run call."""
        self.iter_time_start = time.time()

    def after_run(self, run_context, run_values):
        """Append the seconds elapsed since the matching ``before_run``."""
        self.times.append(time.time() - self.iter_time_start)
# Hook instance that collects per-step timings during training.
time_hist = TimeHistory()

BATCH_SIZE = 512
EPOCHS = 5

# Train for EPOCHS passes over the training set; the lambda defers
# dataset construction until the Estimator calls it.
estimator.train(
    input_fn=lambda: input_fn(
        train_images,
        train_labels,
        epochs=EPOCHS,
        batch_size=BATCH_SIZE,
    ),
    hooks=[time_hist],
)
借助我们定义的计时hook,现在可以用它来计算训练的总时间以及平均每秒训练的图像数(平均吞吐量):
# Total training time is the sum of the per-step durations from the hook.
total_time = sum(time_hist.times)
print(f"total time with {NUM_GPUS} GPU(s): {total_time} seconds")

# Average throughput in images per second.
# NOTE(review): assumes every replica consumes BATCH_SIZE images per step,
# so one step processes BATCH_SIZE * NUM_GPUS images — confirm against the
# distribution strategy's batching semantics.
avg_time_per_batch = np.mean(time_hist.times)
# The message is split into two adjacent f-string literals so that no
# string literal contains a raw line break.
print(f"{BATCH_SIZE*NUM_GPUS/avg_time_per_batch} images/second with "
      f"{NUM_GPUS} GPU(s)")


在K80 GPU上使用不同的NUM_GPUS训练Fashion-MNIST时的吞吐量和总时间:扩展性较差。
# Evaluate on the held-out test split with a single pass over the data.
estimator.evaluate(
    input_fn=lambda: input_fn(
        test_images,
        test_labels,
        epochs=1,
        batch_size=BATCH_SIZE,
    )
)

#!pip install kaggle
#!kaggle datasets download -d paultimothymooney/kermany2018

# Class names; each is expected to be a sub-directory name under the
# train/test folders (the '**' component of the patterns below).
labels = ['CNV', 'DME', 'DRUSEN', 'NORMAL']

# Glob patterns for the image files. The extension must be written with an
# ASCII '.' and plain quotes, otherwise the pattern matches nothing.
train_folder = os.path.join('OCT2017', 'train', '**', '*.jpeg')
test_folder = os.path.join('OCT2017', 'test', '**', '*.jpeg')
def input_fn(file_pattern, labels,
             image_size=(224, 224),
             shuffle=False,
             batch_size=64,
             num_epochs=None,
             buffer_size=4096,
             prefetch_buffer_size=None):
    """Build a tf.data pipeline that reads labelled JPEG files from disk.

    The label of each file is taken from the name of its parent directory
    and looked up in ``labels`` to produce a one-hot target.

    Args:
        file_pattern: Glob pattern passed to ``Dataset.list_files``.
        labels: Sequence of class-name strings; the position of a name is
            its class index.
        image_size: Size that every decoded image is resized to.
        shuffle: Whether to shuffle the file names.
        batch_size: Number of examples per emitted batch.
        num_epochs: Number of repetitions of the dataset, or ``None`` to
            skip the repeat step.
        buffer_size: Shuffle buffer size (used only when ``shuffle``).
        prefetch_buffer_size: Forwarded to ``Dataset.prefetch``.

    Returns:
        A ``tf.data.Dataset`` yielding ``(image, one_hot_label)`` batches.
    """
    table = tf.contrib.lookup.index_table_from_tensor(
        mapping=tf.constant(labels))
    num_classes = len(labels)

    def _map_func(filename):
        # Second-to-last path component is the class directory name.
        label = tf.string_split([filename], delimiter=os.sep).values[-2]
        image = tf.image.decode_jpeg(tf.read_file(filename), channels=3)
        image = tf.image.convert_image_dtype(image, dtype=tf.float32)
        image = tf.image.resize_images(image, size=image_size)
        return (image, tf.one_hot(table.lookup(label), num_classes))

    dataset = tf.data.Dataset.list_files(file_pattern, shuffle=shuffle)

    # Pick the fused shuffle+repeat op when both are requested, otherwise
    # apply whichever single transformation was asked for (if any).
    if num_epochs is not None and shuffle:
        dataset = dataset.apply(
            tf.contrib.data.shuffle_and_repeat(buffer_size, num_epochs))
    elif shuffle:
        dataset = dataset.shuffle(buffer_size)
    elif num_epochs is not None:
        dataset = dataset.repeat(num_epochs)

    # Decode and batch in one fused step, parallelised over the CPU cores.
    dataset = dataset.apply(
        tf.contrib.data.map_and_batch(map_func=_map_func,
                                      batch_size=batch_size,
                                      num_parallel_calls=os.cpu_count()))
    dataset = dataset.prefetch(buffer_size=prefetch_buffer_size)

    return dataset
# Transfer-learning model: VGG16 convolutional base (no classifier head)
# followed by a flatten layer and a softmax over the dataset's classes.
keras_vgg16 = tf.keras.applications.VGG16(input_shape=(224, 224, 3),
                                          include_top=False)

output = keras_vgg16.output
output = tf.keras.layers.Flatten()(output)
prediction = tf.keras.layers.Dense(len(labels),
                                   activation=tf.nn.softmax)(output)

model = tf.keras.Model(inputs=keras_vgg16.input,
                       outputs=prediction)

# Freeze every VGG16 layer except the last four so only those (plus the
# new dense head) are updated during training.
for layer in keras_vgg16.layers[:-4]:
    layer.trainable = False

model.compile(loss='categorical_crossentropy',
              optimizer=tf.train.AdamOptimizer(),
              metrics=['accuracy'])
# Number of GPUs handed to the mirrored distribution strategy.
NUM_GPUS = 2

# Convert the compiled VGG16-based Keras model into an Estimator that
# trains under a MirroredStrategy spanning NUM_GPUS devices.
strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=NUM_GPUS)
config = tf.estimator.RunConfig(train_distribute=strategy)
estimator = tf.keras.estimator.model_to_estimator(
    keras_model=model,
    config=config,
)
BATCH_SIZE = 64
EPOCHS = 1

# Train on the shuffled training images for EPOCHS passes, timing each
# step with the same hook instance used earlier.
estimator.train(
    input_fn=lambda: input_fn(
        train_folder,
        labels,
        shuffle=True,
        batch_size=BATCH_SIZE,
        buffer_size=2048,
        num_epochs=EPOCHS,
        prefetch_buffer_size=4,
    ),
    hooks=[time_hist],
)

# Evaluate with one unshuffled pass over the test images.
estimator.evaluate(
    input_fn=lambda: input_fn(
        test_folder,
        labels,
        shuffle=False,
        batch_size=BATCH_SIZE,
        buffer_size=1024,
        num_epochs=1,
    )
)

在K80 GPU上使用不同的NUM_GPUS训练Retinal OCT时的吞吐量和总时间:呈线性扩展。
我们在上面展示了如何使用Estimators API在多个GPU上训练深度学习Keras模型,如何编写一个遵循最佳实践的输入管道来充分利用我们的资源(线性扩展),以及如何通过hook对训练吞吐量进行计时。
