diff --git a/_static/img/thumbnails/cropped/PyTorch-Distributed-Overview.png b/_static/img/thumbnails/cropped/PyTorch-Distributed-Overview.png
new file mode 100644
index 00000000000..426a14d98f5
Binary files /dev/null and b/_static/img/thumbnails/cropped/PyTorch-Distributed-Overview.png differ
diff --git a/beginner_source/dist_overview.rst b/beginner_source/dist_overview.rst
new file mode 100644
index 00000000000..bc9f7fe6bf0
--- /dev/null
+++ b/beginner_source/dist_overview.rst
@@ -0,0 +1,197 @@
+PyTorch Distributed Overview
+============================
+**Author**: `Shen Li `_
+
+
+This is the overview page for the ``torch.distributed`` package. As more and
+more documents, examples, and tutorials are added in different locations, it
+becomes unclear which one to consult for a specific problem or what the best
+order is to read them. The goal of this page is to address this problem by
+categorizing documents into different topics and briefly describing each of
+them. If this is your first time building distributed training applications
+using PyTorch, it is recommended to use this document to navigate to the
+technology that can best serve your use case.
+
+
+Introduction
+------------
+
+As of PyTorch v1.6.0, features in ``torch.distributed`` can be categorized into
+three main components:
+
+* `Distributed Data-Parallel Training `__
+  (DDP) is a widely adopted single-program multiple-data training paradigm. With
+  DDP, the model is replicated on every process, and every model replica will be
+  fed with a different set of input data samples. DDP takes care of gradient
+  communications to keep model replicas synchronized and overlaps it with the
+  gradient computations to speed up training.
+* `RPC-Based Distributed Training `__
+  (RPC) is developed to support general training structures that cannot fit into
+  data-parallel training, such as distributed pipeline parallelism, the parameter
+  server paradigm, and combinations of DDP with other training paradigms. It
+  helps manage remote object lifetimes and extends the autograd engine beyond
+  machine boundaries.
+* The `Collective Communication `__
+  (c10d) library supports sending tensors across processes within a group. It
+  offers both collective communication APIs (e.g.,
+  `all_reduce `__
+  and `all_gather `__)
+  and P2P communication APIs (e.g.,
+  `send `__
+  and `isend `__).
+  DDP and RPC (`ProcessGroup Backend `__)
+  are built on c10d as of v1.6.0, where the former uses collective communications
+  and the latter uses P2P communications. Usually, developers do not need to
+  directly use this raw communication API, as the DDP and RPC features above can
+  serve many distributed training scenarios. However, there are use cases where
+  this API is still helpful. One example would be distributed parameter averaging,
+  where applications would like to compute the average values of all model
+  parameters after the backward pass instead of using DDP to communicate
+  gradients. This can decouple communications from computations and allow
+  finer-grained control over what to communicate, but, on the other hand, it also
+  gives up the performance optimizations offered by DDP. A minimal sketch of this
+  parameter-averaging pattern is shown at the end of this section. The
+  `Writing Distributed Applications with PyTorch `__
+  tutorial shows examples of using the c10d communication APIs.
+
+
+Most of the existing documents are written for either DDP or RPC; the remainder
+of this page elaborates on the materials for these two components.
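+
+As an illustration of the c10d APIs, the snippet below sketches the
+parameter-averaging pattern described above using ``all_reduce`` directly. It is
+a minimal, illustrative example rather than part of any tutorial, and it assumes
+that the default process group has already been initialized in every process
+(e.g., with ``init_process_group``):
+
+.. code-block:: python
+
+    import torch.distributed as dist
+
+    def average_parameters(model):
+        # Assumes dist.init_process_group(...) has already been called in every process.
+        world_size = dist.get_world_size()
+        for param in model.parameters():
+            # Sum each parameter across all processes in place, then divide to average.
+            dist.all_reduce(param.data, op=dist.ReduceOp.SUM)
+            param.data /= world_size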
+
+
+Data Parallel Training
+----------------------
+
+PyTorch provides several options for data-parallel training. For applications
+that gradually grow from simple to complex and from prototype to production, a
+common development trajectory is:
+
+1. Use single-device training, if the data and model can fit in one GPU, and the
+   training speed is not a concern.
+2. Use single-machine multi-GPU
+   `DataParallel `__,
+   if there are multiple GPUs on the server, and you would like to speed up
+   training with minimal code changes.
+3. Use single-machine multi-GPU
+   `DistributedDataParallel `__,
+   if you would like to further speed up training and are willing to write a
+   little more code to set it up.
+4. Use multi-machine `DistributedDataParallel `__
+   and the `launching script `__,
+   if the application needs to scale across machine boundaries.
+5. Use `torchelastic `__ to launch distributed
+   training, if errors (e.g., OOM) are expected or if resources can join and
+   leave dynamically during training.
+
+
+.. note:: Data-parallel training also works with `Automatic Mixed Precision (AMP) `__.
+
+
+``torch.nn.DataParallel``
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The `DataParallel `__
+package enables single-machine multi-GPU parallelism with the lowest coding
+hurdle. It only requires a one-line change to the application code. The tutorial
+`Optional: Data Parallelism `__
+shows an example. The caveat is that, although ``DataParallel`` is very easy to
+use, it usually does not offer the best performance. This is because the
+implementation of ``DataParallel`` replicates the model in every forward pass,
+and its single-process multi-thread parallelism naturally suffers from GIL
+contention. To get better performance, please consider using
+`DistributedDataParallel `__.
+
+
+``torch.nn.parallel.DistributedDataParallel``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Compared to `DataParallel `__,
+`DistributedDataParallel `__
+requires one more step to set up, i.e., calling
+`init_process_group `__.
+DDP uses multi-process parallelism, and hence there is no GIL contention across
+model replicas. Moreover, the model is broadcast at DDP construction time instead
+of in every forward pass, which also helps to speed up training. DDP ships
+with several performance optimization techniques. For a more in-depth
+explanation, please refer to this
+`DDP paper `__ (VLDB'20). A minimal DDP setup
+sketch is shown after the reading list below.
+
+
+DDP materials are listed below:
+
+1. `DDP notes `__
+   offer a starter example and some brief descriptions of its design and
+   implementation. If this is your first time using DDP, please start from this
+   document.
+2. `Getting Started with Distributed Data Parallel <../intermediate/ddp_tutorial.html>`__
+   explains some common problems with DDP training, including unbalanced
+   workload, checkpointing, and multi-device models. Note that DDP can be
+   easily combined with single-machine multi-device model parallelism, which is
+   described in the
+   `Single-Machine Model Parallel Best Practices <../intermediate/model_parallel_tutorial.html>`__
+   tutorial.
+3. The `Launching and configuring distributed data parallel applications `__
+   document shows how to use the DDP launching script.
+4. `PyTorch Distributed Trainer with Amazon AWS `__
+   demonstrates how to use DDP on AWS.
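+
+As a minimal illustration of the DDP setup described above, the sketch below
+spawns two processes on a single machine, initializes a process group in each,
+and wraps a toy model in DDP. The ``gloo`` backend, the toy ``nn.Linear`` model,
+and the hyperparameters are placeholders; see the DDP notes and the Getting
+Started tutorial above for complete, explained examples.
+
+.. code-block:: python
+
+    import os
+
+    import torch
+    import torch.distributed as dist
+    import torch.multiprocessing as mp
+    import torch.nn as nn
+    import torch.optim as optim
+    from torch.nn.parallel import DistributedDataParallel as DDP
+
+    def demo_worker(rank, world_size):
+        # Every process joins the same process group; "gloo" is a placeholder backend.
+        os.environ["MASTER_ADDR"] = "localhost"
+        os.environ["MASTER_PORT"] = "29500"
+        dist.init_process_group("gloo", rank=rank, world_size=world_size)
+
+        model = nn.Linear(10, 10)   # toy model standing in for a real one
+        ddp_model = DDP(model)      # DDP keeps the model replicas synchronized
+        optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
+
+        outputs = ddp_model(torch.randn(20, 10))
+        loss = outputs.sum()
+        loss.backward()             # gradient allreduce overlaps with the backward pass
+        optimizer.step()
+
+        dist.destroy_process_group()
+
+    if __name__ == "__main__":
+        mp.spawn(demo_worker, args=(2,), nprocs=2, join=True)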
+
+TorchElastic
+~~~~~~~~~~~~
+
+With the growth of application complexity and scale, failure recovery becomes an
+imperative requirement. Sometimes, it is inevitable to hit errors
+like OOM when using DDP, but DDP itself cannot recover from those errors, nor
+does a basic ``try-except`` block work. This is because DDP requires all processes
+to operate in a closely synchronized manner and all ``AllReduce`` communications
+launched in different processes must match. If one of the processes in the group
+throws an OOM exception, it is likely to lead to desynchronization (mismatched
+``AllReduce`` operations), which would then cause a crash or hang. If you expect
+failures to occur during training or if resources might leave and join
+dynamically, please launch distributed data-parallel training using
+`torchelastic `__.
+
+
+General Distributed Training
+----------------------------
+
+Many training paradigms do not fit into data parallelism, e.g., the parameter
+server paradigm, distributed pipeline parallelism, and reinforcement learning
+applications with multiple observers or agents. The
+`torch.distributed.rpc `__ package aims at
+supporting general distributed training scenarios.
+
+The `torch.distributed.rpc `__ package
+has four main pillars:
+
+* `RPC `__ supports running
+  a given function on a remote worker.
+* `RRef `__ helps to manage the
+  lifetime of a remote object. The reference counting protocol is presented in the
+  `RRef notes `__.
+* `Distributed Autograd `__
+  extends the autograd engine beyond machine boundaries. Please refer to
+  `Distributed Autograd Design `__
+  for more details.
+* `Distributed Optimizer `__
+  automatically reaches out to all participating workers to update
+  parameters using gradients computed by the distributed autograd engine.
+
+A minimal sketch of how these four pillars fit together is shown at the end of
+this section.
+
+RPC tutorials are listed below:
+
+1. The `Getting Started with Distributed RPC Framework <../intermediate/rpc_tutorial.html>`__
+   tutorial first uses a simple Reinforcement Learning (RL) example to
+   demonstrate RPC and RRef. Then, it applies a basic distributed model
+   parallelism to an RNN example to show how to use distributed autograd and
+   the distributed optimizer.
+2. The `Implementing a Parameter Server Using Distributed RPC Framework <../intermediate/rpc_param_server_tutorial.html>`__
+   tutorial borrows the spirit of
+   `HogWild! training `__
+   and applies it to an asynchronous parameter server (PS) training application.
+3. The `Distributed Pipeline Parallelism Using RPC <../intermediate/dist_pipeline_parallel_tutorial.html>`__
+   tutorial extends the single-machine pipeline parallel example (presented in
+   `Single-Machine Model Parallel Best Practices <../intermediate/model_parallel_tutorial.html>`__)
+   to a distributed environment and shows how to implement it using RPC.
+4. The `Implementing Batch RPC Processing Using Asynchronous Executions <../intermediate/rpc_async_execution.html>`__
+   tutorial demonstrates how to implement RPC batch processing using the
+   `@rpc.functions.async_execution `__
+   decorator, which can help speed up inference and training. It uses RL and PS
+   examples similar to those in tutorials 1 and 2 above.
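+
+To give a feel for how these pillars fit together, the sketch below wires up two
+workers: a "trainer" that owns a small local model and a "helper" that runs a
+function via RPC. The worker names, the toy ``remote_square`` function, the
+``nn.Linear`` model, and the hyperparameters are illustrative placeholders; the
+tutorials listed above walk through complete versions of this pattern.
+
+.. code-block:: python
+
+    import os
+
+    import torch
+    import torch.distributed.autograd as dist_autograd
+    import torch.distributed.rpc as rpc
+    import torch.multiprocessing as mp
+    import torch.optim as optim
+    from torch.distributed.optim import DistributedOptimizer
+    from torch.distributed.rpc import RRef
+
+    def remote_square(x):
+        # An ordinary function that the trainer asks the "helper" worker to run via RPC.
+        return x * x
+
+    def run_trainer():
+        model = torch.nn.Linear(4, 1)
+        # Wrap the parameters in RRefs so that DistributedOptimizer can address them.
+        param_rrefs = [RRef(p) for p in model.parameters()]
+        opt = DistributedOptimizer(optim.SGD, param_rrefs, lr=0.05)
+
+        # Distributed autograd records RPCs made in the forward pass within this context.
+        with dist_autograd.context() as context_id:
+            x = torch.randn(8, 4, requires_grad=True)
+            squared = rpc.rpc_sync("helper", remote_square, args=(x,))  # RPC call
+            loss = model(squared).sum()
+            dist_autograd.backward(context_id, [loss])  # gradients flow across workers
+            opt.step(context_id)  # applies the gradients stored in this context
+
+    def run(rank, world_size):
+        os.environ["MASTER_ADDR"] = "localhost"
+        os.environ["MASTER_PORT"] = "29501"
+        name = "trainer" if rank == 0 else "helper"
+        rpc.init_rpc(name, rank=rank, world_size=world_size)
+        if rank == 0:
+            run_trainer()  # the "helper" worker only responds to incoming RPCs
+        rpc.shutdown()
+
+    if __name__ == "__main__":
+        mp.spawn(run, args=(2,), nprocs=2, join=True)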
diff --git a/index.rst b/index.rst
index 8df662113d8..0e04e92bf35 100644
--- a/index.rst
+++ b/index.rst
@@ -297,6 +297,13 @@ Welcome to PyTorch Tutorials

 .. Parallel-and-Distributed-Training

+.. customcarditem::
+   :header: PyTorch Distributed Overview
+   :card_description: Briefly go over all concepts and features in the distributed package. Use this document to find the distributed training technology that can best serve your application.
+   :image: _static/img/thumbnails/cropped/PyTorch-Distributed-Overview.png
+   :link: beginner/dist_overview.html
+   :tags: Parallel-and-Distributed-Training
+
 .. customcarditem::
    :header: Single-Machine Model Parallel Best Practices
    :card_description: Learn how to implement model parallel, a distributed training technique which splits a single model onto different GPUs, rather than replicating the entire model on each GPU
@@ -311,6 +318,13 @@ Welcome to PyTorch Tutorials
    :link: intermediate/ddp_tutorial.html
    :tags: Parallel-and-Distributed-Training

+.. customcarditem::
+   :header: (advanced) PyTorch 1.0 Distributed Trainer with Amazon AWS
+   :card_description: Set up the distributed package of PyTorch, use the different communication strategies, and go over some of the internals of the package.
+   :image: _static/img/thumbnails/cropped/advanced-PyTorch-1point0-Distributed-Trainer-with-Amazon-AWS.png
+   :link: beginner/aws_distributed_training_tutorial.html
+   :tags: Parallel-and-Distributed-Training
+
 .. customcarditem::
    :header: Writing Distributed Applications with PyTorch
    :card_description: Set up the distributed package of PyTorch, use the different communication strategies, and go over some the internals of the package.
@@ -325,13 +339,6 @@ Welcome to PyTorch Tutorials
    :link: intermediate/rpc_tutorial.html
    :tags: Parallel-and-Distributed-Training

-.. customcarditem::
-   :header: (advanced) PyTorch 1.0 Distributed Trainer with Amazon AWS
-   :card_description: Set up the distributed package of PyTorch, use the different communication strategies, and go over some the internals of the package.
-   :image: _static/img/thumbnails/cropped/advanced-PyTorch-1point0-Distributed-Trainer-with-Amazon-AWS.png
-   :link: beginner/aws_distributed_training_tutorial.html
-   :tags: Parallel-and-Distributed-Training
-
 .. customcarditem::
    :header: Implementing a Parameter Server Using Distributed RPC Framework
    :card_description: Walk through a through a simple example of implementing a parameter server using PyTorch’s Distributed RPC framework.
@@ -513,6 +520,7 @@ Additional Resources
    :hidden:
    :caption: Parallel and Distributed Training

+   beginner/dist_overview
    intermediate/model_parallel_tutorial
    intermediate/ddp_tutorial
    intermediate/dist_tuto
diff --git a/intermediate_source/ddp_tutorial.rst b/intermediate_source/ddp_tutorial.rst
index 4935b4c5652..68409e4daeb 100644
--- a/intermediate_source/ddp_tutorial.rst
+++ b/intermediate_source/ddp_tutorial.rst
@@ -2,6 +2,13 @@ Getting Started with Distributed Data Parallel
 =================================================
 **Author**: `Shen Li `_

+Prerequisites:
+
+- `PyTorch Distributed Overview <../beginner/dist_overview.html>`__
+- `DistributedDataParallel API documents `__
+- `DistributedDataParallel notes `__
+
+
 `DistributedDataParallel `__
 (DDP) implements data parallelism at the module level which can run across
 multiple machines.
Applications using DDP should spawn multiple processes and @@ -202,9 +209,9 @@ and elasticity support, please refer to `TorchElastic `__ - `Single-Machine Model Parallel Best Practices `__ - `Getting started with Distributed RPC Framework `__ - RRef helper functions: diff --git a/intermediate_source/dist_tuto.rst b/intermediate_source/dist_tuto.rst index 76538a81c90..1838abe8f72 100644 --- a/intermediate_source/dist_tuto.rst +++ b/intermediate_source/dist_tuto.rst @@ -2,6 +2,10 @@ Writing Distributed Applications with PyTorch ============================================= **Author**: `Séb Arnold `_ +Prerequisites: + +- `PyTorch Distributed Overview <../beginner/dist_overview.html>`__ + In this short tutorial, we will be going over the distributed package of PyTorch. We'll see how to set up the distributed setting, use the different communication strategies, and go over some the internals of diff --git a/intermediate_source/rpc_async_execution.rst b/intermediate_source/rpc_async_execution.rst index e3e42f9135e..08ba5028d5b 100644 --- a/intermediate_source/rpc_async_execution.rst +++ b/intermediate_source/rpc_async_execution.rst @@ -5,8 +5,9 @@ Implementing Batch RPC Processing Using Asynchronous Executions Prerequisites: -- `Getting started with Distributed RPC Framework `__ -- `Implementing a Parameter Server using Distributed RPC Framework `__ +- `PyTorch Distributed Overview <../beginner/dist_overview.html>`__ +- `Getting started with Distributed RPC Framework `__ +- `Implementing a Parameter Server using Distributed RPC Framework `__ - `RPC Asynchronous Execution Decorator `__ This tutorial demonstrates how to build batch-processing RPC applications with diff --git a/intermediate_source/rpc_param_server_tutorial.rst b/intermediate_source/rpc_param_server_tutorial.rst index cea2be7d647..0516cf60031 100644 --- a/intermediate_source/rpc_param_server_tutorial.rst +++ b/intermediate_source/rpc_param_server_tutorial.rst @@ -4,6 +4,11 @@ Implementing a Parameter Server Using Distributed RPC Framework **Author**\ : `Rohan Varma `_ +Prerequisites: + +- `PyTorch Distributed Overview <../beginner/dist_overview.html>`__ +- `RPC API documents `__ + This tutorial walks through a simple example of implementing a parameter server using PyTorch's `Distributed RPC framework `_. The parameter server framework is a paradigm in which a set of servers store parameters, such as large embedding tables, and several trainers query the parameter servers in order to retrieve the most up to date parameters. These trainers can run a training loop locally and occasionally synchronize with the parameter server to get the latest parameters. For more reading on the parameter server approach, check out `this paper `_. Using the Distributed RPC Framework, we'll build an example where multiple trainers use RPC to communicate with the same parameter server and use `RRef `_ to access states on the remote parameter server instance. Each trainer will launch its dedicated backward pass in a distributed fashion through stitching of the autograd graph across multiple nodes using distributed autograd. @@ -78,7 +83,7 @@ Next, let's define some helper functions that will be useful for the rest of our # On the local node, call a method with first arg as the value held by the # RRef. Other args are passed in as arguments to the function called. - # Useful for calling instance methods. method could be any matching function, including + # Useful for calling instance methods. 
method could be any matching function, including # class methods. def call_method(method, rref, *args, **kwargs): return method(rref.local_value(), *args, **kwargs) @@ -119,7 +124,7 @@ Next, we'll define our forward pass. Note that regardless of the device of the m # Tensors must be moved in and out of GPU memory due to this. out = out.to("cpu") return out -Next, we'll define a few miscellaneous functions useful for training and verification purposes. The first, ``get_dist_gradients``\ , will take in a Distributed Autograd context ID and call into the ``dist_autograd.get_gradients`` API in order to retrieve gradients computed by distributed autograd. More information can be found in the `distributed autograd documentation `_. Note that we also iterate through the resulting dictionary and convert each tensor to a CPU tensor, as the framework currently only supports sending tensors over RPC. Next, ``get_param_rrefs`` will iterate through our model parameters and wrap them as a (local) `RRef `_. This method will be invoked over RPC by trainer nodes and will return a list of the parameters to be optimized. This is required as input to the `Distributed Optimizer `_\ , which requires all parameters it must optimize as a list of ``RRef``\ s. +Next, we'll define a few miscellaneous functions useful for training and verification purposes. The first, ``get_dist_gradients``\ , will take in a Distributed Autograd context ID and call into the ``dist_autograd.get_gradients`` API in order to retrieve gradients computed by distributed autograd. More information can be found in the `distributed autograd documentation `_. Note that we also iterate through the resulting dictionary and convert each tensor to a CPU tensor, as the framework currently only supports sending tensors over RPC. Next, ``get_param_rrefs`` will iterate through our model parameters and wrap them as a (local) `RRef `_. This method will be invoked over RPC by trainer nodes and will return a list of the parameters to be optimized. This is required as input to the `Distributed Optimizer `_\ , which requires all parameters it must optimize as a list of ``RRef``\ s. .. code-block:: python @@ -224,7 +229,7 @@ Below, we initialize our ``TrainerNet`` and build a ``DistributedOptimizer``. No # Build DistributedOptimizer. param_rrefs = net.get_global_param_rrefs() opt = DistributedOptimizer(optim.SGD, param_rrefs, lr=0.03) -Next, we define our main training loop. We loop through iterables given by PyTorch's `DataLoader `_. Before writing our typical forward/backward/optimizer loop, we first wrap the logic within a `Distributed Autograd context `_. Note that this is needed to record RPCs invoked in the model's forward pass, so that an appropriate graph can be constructed which includes all participating distributed workers in the backward pass. The distributed autograd context returns a ``context_id`` which serves as an identifier for accumulating and optimizing gradients corresponding to a particular iteration. +Next, we define our main training loop. We loop through iterables given by PyTorch's `DataLoader `_. Before writing our typical forward/backward/optimizer loop, we first wrap the logic within a `Distributed Autograd context `_. Note that this is needed to record RPCs invoked in the model's forward pass, so that an appropriate graph can be constructed which includes all participating distributed workers in the backward pass. 
The distributed autograd context returns a ``context_id`` which serves as an identifier for accumulating and optimizing gradients corresponding to a particular iteration. As opposed to calling the typical ``loss.backward()`` which would kick off the backward pass on this local worker, we call ``dist_autograd.backward()`` and pass in our context_id as well as ``loss``\ , which is the root at which we want the backward pass to begin. In addition, we pass this ``context_id`` into our optimizer call, which is required to be able to look up the corresponding gradients computed by this particular backwards pass across all nodes. @@ -259,7 +264,7 @@ The following simply computes the accuracy of our model after we're done trainin model.eval() correct_sum = 0 # Use GPU to evaluate if possible - device = torch.device("cuda:0" if model.num_gpus > 0 + device = torch.device("cuda:0" if model.num_gpus > 0 and torch.cuda.is_available() else "cpu") with torch.no_grad(): for i, (data, target) in enumerate(test_loader): @@ -330,7 +335,7 @@ We've now completed our trainer and parameter server specific code, and all that assert args.num_gpus <= 3, f"Only 0-2 GPUs currently supported (got {args.num_gpus})." os.environ['MASTER_ADDR'] = args.master_addr os.environ["MASTER_PORT"] = args.master_port -Now, we'll create a process corresponding to either a parameter server or trainer depending on our command line arguments. We'll create a ``ParameterServer`` if our passed in rank is 0, and a ``TrainerNet`` otherwise. Note that we're using ``torch.multiprocessing`` to launch a subprocess corresponding to the function that we want to execute, and waiting on this process's completion from the main thread with ``p.join()``. In the case of initializing our trainers, we also use PyTorch's `dataloaders `_ in order to specify train and test data loaders on the MNIST dataset. +Now, we'll create a process corresponding to either a parameter server or trainer depending on our command line arguments. We'll create a ``ParameterServer`` if our passed in rank is 0, and a ``TrainerNet`` otherwise. Note that we're using ``torch.multiprocessing`` to launch a subprocess corresponding to the function that we want to execute, and waiting on this process's completion from the main thread with ``p.join()``. In the case of initializing our trainers, we also use PyTorch's `dataloaders `_ in order to specify train and test data loaders on the MNIST dataset. .. code-block:: python diff --git a/intermediate_source/rpc_tutorial.rst b/intermediate_source/rpc_tutorial.rst index 6d149e80837..c4ee6536830 100644 --- a/intermediate_source/rpc_tutorial.rst +++ b/intermediate_source/rpc_tutorial.rst @@ -3,6 +3,11 @@ Getting Started with Distributed RPC Framework **Author**: `Shen Li `_ +Prerequisites: + +- `PyTorch Distributed Overview <../beginner/dist_overview.html>`__ +- `RPC API documents `__ + This tutorial uses two simple examples to demonstrate how to build distributed training with the `torch.distributed.rpc `__ package which is first introduced as an experimental feature in PyTorch v1.4.