Shortcuts

Source code for torch.distributed.rpc

from __future__ import absolute_import, division, print_function, unicode_literals

import numbers
import sys

import torch
import torch.distributed as dist


def is_available():
    return sys.version_info >= (3, 0) and hasattr(torch._C, "_rpc_init")


if is_available() and not torch._C._rpc_init():
    raise RuntimeError("Failed to initialize torch.distributed.rpc")


if is_available():
    from . import api, backend_registry
    from .api import *  # noqa: F401
    import torch.distributed.autograd as dist_autograd

[docs] def init_rpc( name, backend=backend_registry.BackendType.PROCESS_GROUP, rank=-1, world_size=None, rpc_backend_options=None, ): r""" Initializes RPC primitives such as the local RPC agent and distributed autograd. Initializes the local RPC agent which immediately makes the current process ready to send and receive RPCs. This method also properly initializes a default process group backend that uses Gloo for communication. Arguments: backend (Enum): type of RPC backend implementation. Currently, process group backend is the only available backend implementation. (default: ``RpcBackend.PROCESS_GROUP``). name (str): a globally unique name of this node. (e.g., ``Trainer3``, ``ParameterServer2``, ``Master``, ``Worker1``) Name can only contain number, alphabet, underscore, and/or dash, and must be shorter than 128 characters. rank (int): a globally unique id/rank of this node. world_size (int): The number of workers in the group. rpc_backend_options (RpcBackendOptions): The options passed to RpcAgent constructor. It contains RpcAgent specific initialization configurations. By default, it contains ``rpc_timeout = timedelta(seconds=60)``, ``init_method = "env://"``, ``num_send_recv_threads = 4`` for process group agent. If using the default ``rpc_backend_options``, RPC would initialize the underlying process group backend using ``init_method = "env://"``, meaning that environment variables ``MASTER_ADDRESS`` and ``MASTER_PORT`` needs to be set properly. See :class:`~torch.distributed.rpc.ProcessGroupRpcBackendOptions` for examples. """ if not rpc_backend_options: # default construct a set of RPC backend options. rpc_backend_options = backend_registry.construct_rpc_backend_options( backend ) # Rendezvous. # This rendezvous state sometimes is destroyed before all processes # finishing handshaking. To avoid that issue, we make it global to # keep it alive. global rendezvous_iterator rendezvous_iterator = torch.distributed.rendezvous( rpc_backend_options.init_method, rank=rank, world_size=world_size ) store, _, _ = next(rendezvous_iterator) # Initialize autograd before RPC since _init_rpc_backend guarantees all # processes sync via the store. If we initialize autograd after RPC, # there could be a race where some nodes might have initialized autograd # and others might not have. As a result, a node calling # torch.distributed.autograd.backward() would run into errors since # other nodes might not have been initialized. dist_autograd._init(rank) # Initialize RPC. api._init_rpc_backend(backend, store, name, rank, world_size, rpc_backend_options)
@api._require_initialized def _get_debug_info(): from . import _rref_context_get_debug_info info = _rref_context_get_debug_info() info.update(api._get_current_rpc_agent().get_debug_info()) info.update(dist_autograd._get_debug_info()) return info

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources