Have you ever wondered how complex models with millions to billions of parameters are trained on terabytes of data? In fact, the size of such models can get so large that they may not even fit in the memory of a single processor. Thus training such models becomes impossible via conventional means and we need something else to support such a memory-intensive task. Distributed training is one of the solutions to this problem, let us find out what exactly it is and how these issues can be solved with it.
What is distributed training?
Usually, when faced with a gigantic task in any field, we try to divide it into subtasks and run them in parallel. This saves us time and makes such a complex task doable. When we do the same in deep learning, we call it distributed training.
Precisely, in distributed training, we divide our training workload across multiple processors while training a huge deep learning model. These processors are referred to as worker nodes or simply, workers. These workers are trained in parallel to speed up the training process. Principally, there are two approaches to parallelism — data parallelism and model parallelism.
As the name suggests, in this approach:
- We divide the data into n number of partitions, where n is the total number of available workers in the compute cluster.
- We have a copy of the model in each worker node and each one of them performs the training on its own subset of the data.
- The training loops are carried out either synchronously or asynchronously.
Let’s see the two ways of carrying out distributed training loops and the difference between them.
As we already know that in data parallelism we divide the data into partitions and send each part to a worker. Each worker has a full replica of the model and is trained only on a part of the data.
- In synchronous training, the forward pass begins at the same time in all of the workers and they compute a different output and gradients. Here each worker waits for all other workers to complete their training loops and calculate their respective gradients.
- Now after all the workers have completed computing gradients, all of them start communicating with each other and aggregate the gradients using the all-reduce algorithm, which we will discuss ahead.
- After all the gradients are combined, the copy of these updated gradients is sent to all the workers.
- Now after getting the updated gradients using the all-reduce algorithm, each worker continues with the backward pass and updates the local copy of the weights normally. Unless all the workers do not update their weights, the next forward pass doesn’t begin and that’s why it is called synchronous.
It is important to note that all workers produce different gradients as they are trained on different subsets of data, yet at any point in time, all the workers have the exact same weights.
Using all reduce algorithms, all workers have to share the load of storing and maintaining global parameters. In this algorithm, every worker shares its gradients with all other workers and applies a reduction operation. In simpler words, the all reduce algorithm reduces the target arrays in all workers to a single array and returns the resultant array to all workers.
There are different implementations of all-reduce algorithms that dictate how these parameters are calculated and shared.
- In one implementation, all the workers send their gradients to a single worker known as driver worker which is responsible for the reduction of gradients and sending the updated gradients to all workers. But the problem with this approach is that the driver becomes a bottleneck because its communication and application of the reduction operation increase as the number of processes increases, thus it does not scale well.
- Thus we can use a less naive approach called ring-all reduce in which the workers are set up in a ring. Each worker is in charge of a certain subset of parameters which are shared only with the next worker in the ring. This algorithm is a valuable tool that dramatically reduces synchronization overhead.
As you might have already guessed, in the synchronous approach we are not able to use all the resources efficiently as a worker has to wait for other workers in order to move ahead. This is especially a problem when there is a significant difference in the computation powers among all the workers, in which case the whole process is only as fast as the slowest worker in the cluster.
Thus in asynchronous training, we want workers to work independently in such a way that a worker need not wait for any other worker in the cluster. One way to achieve this is by using a parameter server.
In distributed training, we have a cluster of workers and till now we have seen that all workers perform just one task which is training. But we can assign a different role to each worker in such a way that some workers act as parameter servers and the rest as training workers.
The parameter servers are responsible for holding the parameters of the model and are responsible for updating the global state of our model. While the training workers run the actual training loop and produce the gradients and the loss from the data assigned to them.
We can summarize the process as:
- Replicate the model in all of our workers and each worker uses a subset of data for training.
- Each training worker fetches the parameters from the parameter servers.
- Each training worker performs a training loop and sends the gradients back to all the parameter servers which then update the model parameters.
This allows us to run the training independently but still, there are some disadvantages.
- One disadvantage is that at any particular time only one of the workers is using the updated version of the model and the rest are using the stale version of the model.
- If we are just using one worker as a parameter server, it can become a bottleneck for huge clusters it can become a single point of failure. But of course, the bottleneck problem can be reduced to some extent by introducing multiple parallel servers.
So far, we have seen how to distribute the data and train the model in multiple devices with different chunks of data and this approach works most of the time and is easy to implement as well. But as mentioned earlier, in some rare situations the size of the model may be too large for any single worker, which is why we need model parallelism.
In model parallelism, which is also known as network parallelism, the model is divided either horizontally or vertically into different parts that can run concurrently in different workers with each worker running on the same data. Here the worker only needs to synchronize the shared parameters, usually once for each forward or backward-propagation step.
In vertical partitioning the layers are unaffected, as a result, it can be applied to any deep learning model. So we usually use the vertical partitioning method while horizontal partitioning is only considered as a last resort if there is no other way to fit a layer into the memory of any single machine.
There may be even simpler cases where we can use model parallelism, for example in an encoder-decoder architecture, we can train encoder and decoder in separate workers. The most common use case of model parallelism can be found in NLP (natural language processing) models such as Transformers, GPT-3, BERT, etc.
Centralized and decentralized training
In model parallelism as well as data parallelism, we found out that it is essential that the worker nodes communicate with one another so that they can share the model parameters. There are two ways of communication approaches which are centralized training and decentralized training. We have actually used both the approaches in the previous sections but now let us get to know them formally.
In centralized training there exists a node or group of nodes responsible for updating and storing the model parameters; this node is called a parameter server. We have already seen how this approach is used in asynchronous training. We can also use this approach for synchronous training as Centralized Synchronous Systems. In these systems, the parameter server is dependent on the gradient input from all the workers to update the model and training cannot progress unless all workers pass the gradients to the parameter server. Also, the workers are dependent on the parameter server to get updated weights. As discussed earlier the disadvantage of this approach is that the parameter server can itself become a bottleneck for a huge cluster.
In the decentralized communication pattern, each node communicates with every other node to update the parameters. We have already seen how this approach works for synchronous systems. The advantage of this approach is that there is no single point of failure, peer-peer updates are faster, and sparse updates can be made by exchanging only what has changed. We can also use this approach with Asynchronous training known as Decentralized Asynchronous Systems.
Why do we need distributed training?
When it comes to deep learning, it is mostly about optimizing linear algebra and we all know that it is computationally expensive. The problem arises when we have a lot of training data which can be very common in deep learning, in such cases the training may take months, even years on a single machine with even a powerful accelerator.
When we try to solve complex problems involving images, audio, or text, we use models with complex architectures to get better results. During training, these models may calculate and store millions or billions of updated weight parameters which can cause storage issues. Also, sometimes your machine might crash in between training, which would cause losing all progress and this is a quite frequent problem when the training period is very large as mentioned earlier.
But when we move to multiple machines, we can take advantage of the fact that linear algebra is parallelizable which enables us to split a large model across multiple machines. We can also introduce fault tolerance so that even if one of our machines crashes, the training process is not lost completely and we can continue training without any significant loss.
But is distributed training better in every case, even when we have simpler models with smaller training data? No, with the parallelization overhead, it might actually take you more time to train it on a distributed system compared to training it on a single machine.
Benefits of distributed training
Basically in distributed training, we train our models over a distributed system, and training a model on a distributed system has several benefits as listed below:
- Fault tolerance and reliability
Inherently, distributed systems are more failure tolerant than single machines. If a company has an 8-machine cluster over two data centers, it will continue to work without any issues even if one of the data centers goes down. This equates to increased reliability since when a single computer fails, everything else fails with it. But distributed systems remain operational even if one or more nodes or data centers fail.
Distributed systems allow complex problems to be broken down into smaller chunks and handled on numerous computers in parallel, reducing the amount of time it takes to solve these problems.
Distributed systems are naturally scalable since they run across several machines. So instead of repeatedly updating a single system, a user can install another machine to handle the increasing load. When a system is under heavy stress, each machine can operate at maximum capacity, and when the burden is light, some machines can be turned off. We can say the ability of a user to scale is practically endless.
- Cost effectiveness
When compared to some huge centralized systems, distributed systems are far more cost-effective. Their initial cost is higher than that of standalone systems, but after a certain point, they become more economical to scale.
Distributed training frameworks
Here are some of the Python frameworks that allow us to distribute and parallelize the deep learning models.
Horovod is a distributed deep learning training framework for TensorFlow, Keras, and PyTorch. It is developed by Uber and the goal of Horovod is to make distributed deep learning fast and easy. A training script developed for scale with Horovod can operate on a single GPU, several GPUs without requiring any further code changes.
You can easily set up an environment for Horovod with Azure ML which provides curated training environments for setting up training using a variety of frameworks.
Elephas is a Keras add-on that allows you to use Spark to execute distributed deep learning models at scale. Elephas aims to preserve Keras’s simplicity and ease of use, enabling rapid prototyping of distributed models that can operate on large data sets.
3. Amazon Sagemaker
Both data parallelism and model parallelism are supported by Amazon SageMaker’s distributed training libraries. The libraries are tailored to the SageMaker training environment, assisting you in adapting your distributed training jobs to SageMaker and increasing training speed and throughput.
There are 18 libraries of popular machine learning algorithms in SageMaker. Many of them were completely rewritten from the ground up in order to be scalable and distributed right out of the box.
TensorFlow offers built-in support for distributed training. The tf.distribute.Strategy API makes it possible to spread training across many GPUs with little code modifications. This API is also capable of launching the distributed job via Azure ML. It is easy to use and provides good performance out of the box. Because it is easy to use, it attracts a wide variety of user segments, including researchers, machine learning engineers, and many more.
Azure ML supports running distributed jobs using PyTorch’s native distributed training capabilities. The torch.distributed package in PyTorch provides distributed training capabilities. The features in this package are categorized into three main components:
- Distributed Data-Parallel Training: DDP is a single-program multiple-data training paradigm. The model is duplicated on each process through DDP, and each model replica receives a new set of input data samples. To keep model replicas synchronized, DDP handles gradient communication and overlaps it with gradient calculations to speed up training.
- RPC-Based Distributed Training: RPC allows general training topologies that aren’t suitable for data-parallel training, such as distributed pipeline parallelism, the parameter server paradigm, and DDP-based hybrid training. It aids in the management of remote object lifetimes and extends the autograd engine outside the confines of the machine.
- Collective Communication: It supports sending tensors across processes within a group and offers both collective communication APIs and P2P communication APIs. The DDP and RPC APIs can handle many dispersed training scenarios, so developers don’t need to use this raw communication API directly. However, there are some situations in which this API is still useful. One example is distributed parameter averaging, where instead of utilizing DDP to transmit gradients, applications would like to compute the average values of all model parameters after the backward pass.
Learn more about the tools for distributed training from this article: “Distributed Training: Frameworks and Tools”
May be useful
💡 In Neptune, you can track data of your run from many processes, in particular running on different machines.
In this article, we discussed what distributed training is and how it can solve our problem of training a complex machine learning model on a huge dataset. I am sure that now you are able to distinguish between model parallelism and data parallelism, and can decide which approach might be more suitable for your use case. For further reading you can refer to the following material:
How to Organize Deep Learning Projects – Examples of Best Practices
13 mins read | Author Nilesh Barla | Updated May 31st, 2021
For a successful deep learning project, you need a lot of iterations, a lot of time, and a lot of effort. To make this process less painful, you should try to use your resources to the max.
A good step-by-step workflow will help you do that. With it, your projects become productive, reproducible, and understandable.
In this article you’ll see how to structure work on deep learning projects — from the inception to deployment, monitoring the deployed model, and everything in between.
Along the way, we’ll use Neptune to run, monitor, and analyze your experiments. Neptune is a cool tool for increasing productivity in ML projects.
In this article you will learn:
- About the lifecycle of the project.
- Importance of defining an objective or goal of the project.
- Collecting data based on the requirements of the project.
- Model training and results exploration including:
- Establishing baselines for better results.
- Adopting techniques and approaches from the existing open-source state-of-the-art models research papers and code repositories.
- Experiment tracking and management management
- Model refinement techniques to avoid underfitting and overfitting like:
- Controlling hyperparameters
- Testing and evaluating your project before deployment.
- Model deployment
- Project maintenance