11 min read

Some neural networks models are so large they cannot fit in memory of a single device (GPU). Such models need to be split over many devices, carrying out the training in parallel on the devices. This means anyone can now scale out distributed training to 100s of GPUs using TensorFlow. But that’s not the only advantage of distributed TensorFlow, you can also massively reduce your experimentation time by running many experiments in parallel on many GPUs and servers. Today, we will discuss about distributed TensorFlow and present a number of recipes to work with TensorFlow, GPUs, and multiple servers.

Working with TensorFlow and GPUs

We will learn how to use TensorFlow with GPUs: the operation performed is a simple matrix multiplication either on CPU or on GPU.

Getting ready

The first step is to install a version of TensorFlow that supports GPUs. The official TensorFlow Installation Instruction is your starting point . Remember that you need to have an environment supporting GPUs either via CUDA or CuDNN.

How to do it…

We proceed with the recipe as follows:

  1. Start by importing a few modules
import sys

import numpy as np import tensorflow as tf

from datetime import datetime
  1. Get from command line the type of processing unit that you desire to use (either “gpu” or “cpu”)
device_name = sys.argv[1] # Choose device from cmd line. Options: gpu or cpu

shape = (int(sys.argv[2]), int(sys.argv[2])) if device_name == "gpu":

device_name = "/gpu:0" else:

device_name = "/cpu:0"
  1. Execute the matrix multiplication either on GPU or on CPU. The key instruction is with tf.device(device_name). It creates a new context manager, telling TensorFlow to perform those actions on either the GPU or the CPU
with tf.device(device_name):

random_matrix = tf.random_uniform(shape=shape, minval=0, maxval=1) dot_operation = tf.matmul(random_matrix, tf.transpose(random_matrix)) sum_operation = tf.reduce_sum(dot_operation)

startTime = datetime.now()

with tf.Session(config=tf.ConfigProto(log_device_placement=True)) as session:

result = session.run(sum_operation) print(result)
  1.  Print some debug timing just to verify what is the difference between CPU and GPU
print("Shape:", shape, "Device:", device_name) print("Time taken:", datetime.now() - startTime)

How it works…

This recipe explains how to assign TensorFlow computations either to CPUs or to GPUs. The code is pretty simple and it will be used as a basis for the next recipe.

Playing with Distributed TensorFlow: multiple GPUs and one CPU

We will show an example of data parallelism where data is split across multiple GPUs

Getting ready

This recipe is inspired by a good blog posting written by Neil Tenenholtz and available online: https://clindatsci.com/blog/2017/5/31/distributed-tensorflow

How to do it…

We proceed with the recipe as follows:

  1. Consider this piece of code which runs a matrix multiplication on a single GPU.
# single GPU (baseline) import tensorflow as tf
# place the initial data on the cpu with tf.device('/cpu:0'):
input_data = tf.Variable([[1., 2., 3.],
[4., 5., 6.],
[7., 8., 9.],
[10., 11., 12.]])
b = tf.Variable([[1.], [1.], [2.]])
 
# compute the result on the 0th gpu with tf.device('/gpu:0'):
output = tf.matmul(input_data, b)
 
# create a session and run with tf.Session() as sess:
sess.run(tf.global_variables_initializer()) print sess.run(output)
  1. Partition the code with in graph replication as in the following snippet between 2 different GPUs. Note that the CPU is acting as the master node distributing the graph and collecting the final results.
# in-graph replication import tensorflow as tf num_gpus = 2
# place the initial data on the cpu with tf.device('/cpu:0'):
input_data = tf.Variable([[1., 2., 3.],
[4., 5., 6.],
[7., 8., 9.],
[10., 11., 12.]])
b = tf.Variable([[1.], [1.], [2.]])
 
# split the data into chunks for each gpu inputs = tf.split(input_data, num_gpus) outputs = []
 
# loop over available gpus and pass input data for i in range(num_gpus):
with tf.device('/gpu:'+str(i)): outputs.append(tf.matmul(inputs[i], b))
 
# merge the results of the devices with tf.device('/cpu:0'):
output = tf.concat(outputs, axis=0)
 
# create a session and run with tf.Session() as sess:
sess.run(tf.global_variables_initializer()) print sess.run(output)

How it works…

This is a very simple recipe where the graph is split in two parts by the CPU acting as master and distributed to two GPUs acting as distributed workers. The result of the computation is collected back to the CPU.

Playing with Distributed TensorFlow: multiple servers

We will learn how to distribute a TensorFlow computation across multiple servers. The key assumption is that the code is same for both the workers and the parameter servers. Therefore the role of each computation node is passed by a command line argument.

Getting ready

Again, this recipe is inspired by a good blog posting written by Neil Tenenholtz and available online: https://clindatsci.com/blog/2017/5/31/distributed-tensorflow

How to do it…

We proceed with the recipe as follows:

  1. Consider this piece of code where we specify the cluster architecture with one master running on 192.168.1.1:1111 and two workers running on 192.168.1.2:1111 and 192.168.1.3:1111 respectively.
import sys

import tensorflow as tf

# specify the cluster's architecture

cluster = tf.train.ClusterSpec({'ps': ['192.168.1.1:1111'], 'worker': ['192.168.1.2:1111',

'192.168.1.3:1111']

})
  1. Note that the code is replicated on multiple machines and therefore it is important to know what is the role of the current execution node. This information we get from the command line. A machine can be either a worker or a parameter server (ps).
# parse command-line to specify machine

job_type = sys.argv[1] # job type: "worker" or "ps" task_idx = sys.argv[2] # index job in the worker or ps list

# as defined in the ClusterSpec
  1. Run the training server where given a cluster, we bless each computational with a role (either worker or ps), and an id.
# create TensorFlow Server. This is how the machines communicate.

server = tf.train.Server(cluster, job_name=job_type, task_index=task_idx)
  1. The computation is different according to the role of the specific computation node:

If the role is a parameter server, then the condition is to join the server. Note that in this case there is no code to execute because the workers will continuously push updates and the only thing that the Parameter Server has to do is waiting.

Otherwise the worker code is executed on a specific device within the cluster. This part of code is similar to the one executed on a single machine where we first build the model and then we train it locally. Note that all the distribution of the work and the collection of the updated results is done transparently by Tensoflow. Note that TensorFlow provides a convenient tf.train.replica_device_setter that automatically assigns operations to devices.

# parameter server is updated by remote clients.
# will not proceed beyond this if statement. if job_type == 'ps':
server.join() else:
# workers only
with tf.device(tf.train.replica_device_setter( worker_device='/job:worker/task:'+task_idx, cluster=cluster)):
# build your model here as if you only were using a single machine
 
with tf.Session(server.target):
# train your model here

How it works…

We have seen how to create a cluster with multiple computation nodes. A node can be either playing the role of a Parameter server or playing the role of a worker.

In both cases the code executed is the same but the execution of the code is different according to parameters collected from the command line. The parameter server only needs to wait until the workers send updates. Note that tf.train.replica_device_setter(..) takes the role of automatically assigning operations to available devices, while tf.train.ClusterSpec(..) is used for cluster setup.

There is more…

An example of distributed training for MNIST is available online. In addition, to that you can decide to have more than one parameter server for efficiency reasons. Using parameters the server can provide better network utilization, and it allows to scale models to more parallel machines. It is possible to allocate more than one parameter server. The interested reader can have a look in here.

Training a Distributed TensorFlow MNIST classifier

In this we trained a full MNIST classifier in a distributed way. This recipe is inspired by the blog post in http://ischlag.github.io/2016/06/12/async-distributed- tensorflow/ and the code running on TensorFlow 1.2 is available here https://github. com/ischlag/distributed-tensorflow-example

Getting ready

This recipe is based on the previous one. So it might be convenient to read them in order.

How to do it…

We proceed with the recipe as follows:

  1. Import a few standard modules and define the TensorFlow cluster where the computation is run. Then start a server for a specific task
import tensorflow as tf import sys
import time
# cluster specification parameter_servers = ["pc-01:2222"] workers = [ "pc-02:2222",
"pc-03:2222",
"pc-04:2222"]
cluster = tf.train.ClusterSpec({"ps":parameter_servers, "worker":workers})
# input flags
tf.app.flags.DEFINE_string("job_name", "", "Either 'ps' or 'worker'") tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")FLAGS = tf.app.flags.FLAGS
# start a server for a specific task server = tf.train.Server(
cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
  1. Read MNIST data and define the hyperparameters used for training
# config batch_size = 100

learning_rate = 0.0005

training_epochs = 20 logs_path = "/tmp/mnist/1"

# load mnist data set

from tensorflow.examples.tutorials.mnist import input_data mnist = input_data.read_data_sets('MNIST_data', one_hot=True)
  1. Check if your role is Parameter Server or Worker. If worker then define a simple dense neural network, define an optimizer, and the metric used for evaluating the classifier (for example accuracy).
if FLAGS.job_name == "ps": server.join()
elif FLAGS.job_name == "worker":
# Between-graph replication
with tf.device(tf.train.replica_device_setter( 
worker_device="/job:worker/task:%d" % FLAGS.task_index, cluster=cluster)):
# count the number of updates
global_step = tf.get_variable( 'global_step', [], initializer = tf.constant_initializer(0),
trainable = False)
 
# input images
with tf.name_scope('input'):
# None -> batch size can be any size, 784 -> flattened mnist image x = tf.placeholder(tf.float32, shape=[None, 784], name="x-input")
# target 10 output classes
y_ = tf.placeholder(tf.float32, shape=[None, 10], name="y-input")
 
# model parameters will change during training so we use tf.Variable tf.set_random_seed(1)
with tf.name_scope("weights"):
W1 = tf.Variable(tf.random_normal([784, 100])) W2 = tf.Variable(tf.random_normal([100, 10]))
 
# bias
with tf.name_scope("biases"):
b1 = tf.Variable(tf.zeros([100])) b2 = tf.Variable(tf.zeros([10]))
 
# implement model
with tf.name_scope("softmax"):
# y is our prediction
z2 = tf.add(tf.matmul(x,W1),b1) a2 = tf.nn.sigmoid(z2)
z3 = tf.add(tf.matmul(a2,W2),b2) y = tf.nn.softmax(z3)
 
# specify cost function
with tf.name_scope('cross_entropy'):
# this is our cost cross_entropy = tf.reduce_mean(
-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))
 
# specify optimizer
with tf.name_scope('train'):
# optimizer is an "operation" which we can execute in a session grad_op = tf.train.GradientDescentOptimizer(learning_rate)
train_op = grad_op.minimize(cross_entropy, global_step=global_step)
 
with tf.name_scope('Accuracy'):
# accuracy
correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))
 
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
 
# create a summary for our cost and accuracy tf.summary.scalar("cost", cross_entropy) tf.summary.scalar("accuracy", accuracy)
# merge all summaries into a single "operation" which we can execute in a session
summary_op = tf.summary.merge_all()
init_op = tf.global_variables_initializer() print("Variables initialized ...")
  1. Start a supervisor which acts as a Chief machine for the distributed setting. The chief is the worker machine which manages all the rest of the cluster. The session is maintained by the chief and the key instruction is sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0)). Also, with prepare_or_wait_for_session(server.target) the supervisor will wait for the model to be ready for use. Note that each worker will take care of different batched models and the final model is then available for the chief.
sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0), begin_time = time.time()
frequency = 100
with sv.prepare_or_wait_for_session(server.target) as sess:
# create log writer object (this will log on every machine)
writer = tf.summary.FileWriter(logs_path, graph=tf.get_default_graph())
# perform training cycles start_time = time.time()
for epoch in range(training_epochs):
# number of batches in one epoch
batch_count = int(mnist.train.num_examples/batch_size) count = 0
for i in range(batch_count):
batch_x, batch_y = mnist.train.next_batch(batch_size)
# perform the operations we defined earlier on batch
_, cost, summary, step = sess.run(
[train_op, cross_entropy, summary_op, global_step], feed_dict={x: batch_x, y_: batch_y}) writer.add_summary(summary, step)
count += 1
if count % frequency == 0 or i+1 == batch_count: elapsed_time = time.time() - start_time start_time = time.time()
print("Step: %d," % (step+1),
" Epoch: %2d," % (epoch+1), " Batch: %3d of %3d," % (i+1, batch_count),
" Cost: %.4f," % cost, 
"AvgTime:%3.2fms" % float(elapsed_time*1000/frequency)) count = 0
print("Test-Accuracy: %2.2f" % sess.run(accuracy, feed_dict={x: mnist.test.images, y_: mnist.test.labels}))
print("Total Time: %3.2fs" % float(time.time() - begin_time)) print("Final Cost: %.4f" % cost)
sv.stop()
print("done")

How it works…

This recipe describes an example of distributed MNIST classifier. In this example, TensorFlow allows us to define a cluster of three machines. One acts as parameter server and two more machines are used as workers working on separate batches of the training data.

If you enjoyed this excerpt, check out the book TensorFlow 1.x Deep Learning Cookbook, to become an expert in implementing deep learning techniques in real-world applications.

Read Next

Emoji Scavenger Hunt showcases TensorFlow.js

The 5 biggest announcements from TensorFlow Developer Summit 2018

Setting up Logistic Regression model using TensorFlow

 

IT Market Research Analyst trying to better understand how technology is being used in businesses. Football aficionado and Professional Procrastinator.

LEAVE A REPLY

Please enter your comment!
Please enter your name here