Introduction to Libuv TCPStore Backend
======================================
**Authors**: `Xilun Wu <https://github.com/XilunWu>`_

.. note::
    |edit| View and edit this tutorial in `github <https://github.com/pytorch/tutorials/blob/main/intermediate_source/TCPStore_libuv_backend.rst>`__.

Prerequisites:

- `TCPStore API Documents <https://pytorch.org/docs/main/distributed.html#torch.distributed.TCPStore>`__

Introduction
------------

Recently, we rolled out a new TCPStore server backend built on libuv, a third-party library for asynchronous I/O. This new server backend aims to
address scalability and robustness challenges in large-scale distributed training jobs, such as those with more than 1024 ranks. We ran a series of
benchmarks comparing the libuv backend against the old one. The results demonstrated significant improvements in store initialization time while
maintaining comparable performance in store I/O operations.

As a result of these findings, the libuv backend has been made the default TCPStore server backend in PyTorch 2.4. This change is expected to enhance
the performance and scalability of distributed training jobs.

This change does introduce a slight incompatibility in store initialization. For users who wish to continue using the old backend, this tutorial
provides guidance on how to specify the previous TCPStore server backend.


Performance Benchmark
---------------------

To better demonstrate the benefit of the new libuv TCPStore backend, we set up a benchmark over a wide range of job sizes, from 1024 (1K) to 98304 (96K) ranks.
We first measured the TCPStore initialization time using the code snippet below:

.. code:: python

    import logging
    import os

    from time import perf_counter

    import torch
    import torch.distributed as dist

    logger: logging.Logger = logging.getLogger(__name__)

    # Environment variables are preset when launching the benchmark
    env_rank = os.environ.get("RANK", 0)
    env_world_size = os.environ.get("WORLD_SIZE", 1)
    env_master_addr = os.environ.get("MASTER_ADDR", "localhost")
    env_master_port = os.environ.get("MASTER_PORT", "23456")

    start = perf_counter()
    tcp_store = dist.TCPStore(
        env_master_addr,
        int(env_master_port),
        world_size=int(env_world_size),
        is_master=(int(env_rank) == 0),
    )
    end = perf_counter()
    time_elapsed = end - start
    logger.info(
        f"Complete TCPStore init with rank={env_rank}, world_size={env_world_size} in {time_elapsed} seconds."
    )

Since the execution of the TCPStore server thread is blocked until all clients are successfully connected, we take the time measured on rank 0 as the total
TCPStore initialization runtime. The experiment numbers are reported in the figure below:

.. figure:: /_static/img/distributed/tcpstore_init_time.png
    :width: 100%
    :align: center
    :alt: TCPStore Initialization Runtime Benchmark Result

Figure 1 shows significant evidence that the libuv backend is superior to the old backend:

- TCPStore with the libuv backend always initializes faster than with the old backend, especially at super-large scale
- The old backend timed out during server-client connection at the 96K scale (that is, after more than 30 minutes), while the libuv backend completed the initialization in 100 seconds

The second benchmark measures the runtime of the TCPStore ``store_based_barrier`` operation:

.. code:: python

    import logging
    import os
    import time

    from datetime import timedelta
    from time import perf_counter

    import torch
    import torch.distributed as dist

    DistStoreError = torch._C._DistStoreError
    logger: logging.Logger = logging.getLogger(__name__)

    # Since dist._store_based_barrier is a private function and cannot be directly
    # called, we need to write a function that does the same thing
    def store_based_barrier(
        rank,
        store,
        group_name,
        rendezvous_count,
        timeout=dist.constants.default_pg_timeout,
        logging_interval=timedelta(seconds=10),
    ):
        store_key = f"store_based_barrier_key:{group_name}"
        store.add(store_key, 1)

        world_size = rendezvous_count
        worker_count = store.add(store_key, 0)

        last_worker_key = f"{store_key}:last_worker"
        if worker_count == world_size:
            store.set(last_worker_key, "1")

        start = time.time()
        while True:
            try:
                # This will throw an exception after the logging_interval, in which
                # case we print out the status of the group, or time out officially,
                # raising a DistStoreError
                store.wait([last_worker_key], logging_interval)
                break
            except RuntimeError as e:
                worker_count = store.add(store_key, 0)
                # Print status periodically to keep track
                logger.info(
                    "Waiting in store based barrier to initialize process group for "
                    "rank: %s, key: %s (world_size=%s, num_workers_joined=%s, timeout=%s), "
                    "error: %s",
                    rank,
                    store_key,
                    world_size,
                    worker_count,
                    timeout,
                    e,
                )

                if timedelta(seconds=(time.time() - start)) > timeout:
                    raise DistStoreError(
                        "Timed out initializing process group in store based barrier on "
                        "rank {}, for key: {} (world_size={}, num_workers_joined={}, timeout={})".format(
                            rank, store_key, world_size, worker_count, timeout
                        )
                    )

        logger.info(
            "Rank %s: Completed store-based barrier for key:%s with %s nodes.",
            rank,
            store_key,
            world_size,
        )

    # Environment variables are preset when launching the benchmark
    env_rank = os.environ.get("RANK", 0)
    env_world_size = os.environ.get("WORLD_SIZE", 1)
    env_master_addr = os.environ.get("MASTER_ADDR", "localhost")
    env_master_port = os.environ.get("MASTER_PORT", "23456")

    tcp_store = dist.TCPStore(
        env_master_addr,
        int(env_master_port),
        world_size=int(env_world_size),
        is_master=(int(env_rank) == 0),
    )

    # Sync workers before timing the barrier runs
    store_based_barrier(int(env_rank), tcp_store, "tcpstore_test", int(env_world_size))

    number_runs = 10
    start = perf_counter()
    for _ in range(number_runs):
        store_based_barrier(
            int(env_rank), tcp_store, "tcpstore_test", int(env_world_size)
        )
    end = perf_counter()
    time_elapsed = end - start
    logger.info(
        f"Complete {number_runs} TCPStore barrier runs with rank={env_rank}, world_size={env_world_size} in {time_elapsed} seconds."
    )

We compute the average by dividing the runtime measured on rank 0 by ``number_runs`` and report it in the figure below:

.. figure:: /_static/img/distributed/tcpstore_barrier_time.png
    :width: 100%
    :align: center
    :alt: TCPStore Barrier Runtime Benchmark Result

Figure 2 shows that the I/O performance of the libuv backend is comparable to that of the old backend:

- The libuv backend delivers comparable performance across the whole spectrum of rank counts
- The libuv backend runtime is more stable than the old backend's as the number of ranks grows



Impact
------

One incompatibility that users may need to pay attention to is that TCPStore currently does not support initialization with a listen fd when using the libuv backend.
Users who want to keep using this initialization method can simply pass ``use_libuv=False`` to stay with the old TCPStore backend.

.. code:: python

    import socket

    import torch
    import torch.distributed as dist

    listen_sock: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listen_sock.bind(("localhost", 0))
    addr, port, *_ = listen_sock.getsockname()
    listen_fd = listen_sock.detach()

    tcpstore = dist.TCPStore(addr, port, 1, True, master_listen_fd=listen_fd)  # expect NotImplementedError
    tcpstore = dist.TCPStore(addr, port, 1, True, master_listen_fd=listen_fd, use_libuv=False)  # OK. Use old backend


Exit Route 1: Pass ``use_libuv=False`` to TCPStore Initialization
-----------------------------------------------------------------

As the code snippet above shows, if the user calls the TCPStore init method to create a store, simply passing ``use_libuv=False`` keeps the old
TCPStore backend in use. This override takes the highest priority among the approaches that determine which backend the TCPStore server should choose.

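As a minimal sketch of this route (the port number ``29501`` is an arbitrary free local port chosen for illustration, not something the tutorial prescribes), a store created this way behaves like any other TCPStore:

.. code:: python

    from datetime import timedelta

    import torch.distributed as dist

    # Create the store server directly on the old (non-libuv) backend
    store = dist.TCPStore(
        "localhost",
        29501,
        world_size=1,
        is_master=True,
        timeout=timedelta(seconds=30),
        use_libuv=False,  # opt out of the libuv backend
    )

    # The store supports the same key-value I/O regardless of backend
    store.set("example_key", "example_value")
    print(store.get("example_key"))  # prints b'example_value'

Because the override is applied at construction time, no environment variable or ``init_method`` query option can switch this store back to libuv afterwards.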

Exit Route 2: Add ``use_libuv=0`` to ``init_method`` at ProcessGroup Initialization
-----------------------------------------------------------------------------------

ProcessGroup creates a TCPStore if the user does not explicitly pass one to its initialization. Users can add the query option ``use_libuv=0`` to ``init_method`` when
initializing the ProcessGroup. This approach has lower priority than Exit Route 1.

.. code:: python

    import torch
    import torch.distributed as dist

    addr = "localhost"
    port = 23456
    dist.init_process_group(
        backend="cpu:gloo,cuda:nccl",
        rank=0,
        world_size=1,
        init_method=f"tcp://{addr}:{port}?use_libuv=0",
    )
    dist.destroy_process_group()


Exit Route 3: Set Environment Variable ``USE_LIBUV`` to ``0``
-------------------------------------------------------------

When ProcessGroup creates a TCPStore, it also checks the environment variable ``USE_LIBUV`` to determine which TCPStore backend to use. Users can set the environment
variable ``USE_LIBUV`` to ``0`` to specify the use of the old TCPStore backend. This approach has lower priority than Exit Route 2: if the user sets the environment
variable ``USE_LIBUV`` to ``1`` and also passes ``use_libuv=0`` in ``init_method``, then the old store backend will be chosen.

.. code:: python

    import os

    import torch
    import torch.distributed as dist

    addr = "localhost"
    port = 23456
    os.environ["USE_LIBUV"] = "0"
    dist.init_process_group(
        backend="cpu:gloo,cuda:nccl",
        rank=0,
        world_size=1,
        init_method=f"tcp://{addr}:{port}",
    )
    dist.destroy_process_group()


Conclusion
----------

In PyTorch 2.4, we made the new libuv TCPStore backend the default. Although the new backend is incompatible with initialization from a listen fd, it
shows significant performance improvements in store initialization at large scale and comparable performance in store I/O at small, medium, and large scales, which
brings a major benefit to distributed training's control plane. This tutorial explains our motivation, walks through the performance benchmark, notifies users
of the potential impact, and introduces three exit routes for continuing to use the old backend. In the long term, we aim to eventually deprecate the old backend.