[PyTorch] Writing Distributed Applications with PyTorch


Introduction

The distributed package included in PyTorch (i.e., torch.distributed) enables researchers and practitioners to easily parallelize their computations across processes and clusters of machines. To do so, it leverages message-passing semantics, allowing each process to communicate data to any of the other processes.

Setup

To get started, we need the ability to run multiple processes simultaneously, so we open two Python prompts (one per process).

# 1st prompt
Type "help", "copyright", "credits" or "license" for more information.
>>> import os
>>> import torch.distributed as dist
>>> os.getpid()
1092
>>> os.environ['MASTER_ADDR'] = '127.0.0.1'
>>> os.environ['MASTER_PORT'] = '29500'
>>> dist.init_process_group('gloo', rank=0, world_size=2)
>>>

# 2nd prompt
Type "help", "copyright", "credits" or "license" for more information.
>>> import os
>>> import torch.distributed as dist
>>> os.getpid()
12504
>>> os.environ['MASTER_ADDR'] = '127.0.0.1'
>>> os.environ['MASTER_PORT'] = '29500'
>>> dist.init_process_group('gloo', rank=1, world_size=2)
>>>

To set up the distributed environment, we have to initialize the process group in each prompt (process).
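
The same setup can also be launched from a single script instead of two interactive prompts. Below is a minimal sketch (the file name and helper function are my own, not from the original post) that spawns both ranks with torch.multiprocessing:

# setup_two_procs.py -- a sketch, assuming a single machine and the gloo backend
import os
import torch.distributed as dist
import torch.multiprocessing as mp

def init_process(rank, world_size):
    # Every process sets the same master address/port and joins the group.
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group('gloo', rank=rank, world_size=world_size)
    print(f'rank {rank} initialized in pid {os.getpid()}')
    dist.destroy_process_group()

if __name__ == '__main__':
    world_size = 2
    # mp.spawn calls init_process(rank, world_size) once per rank.
    mp.spawn(init_process, args=(world_size,), nprocs=world_size)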

Point-to-Point Communication

A transfer of data from one process to another is called point-to-point communication. It is achieved through the send and recv functions or their immediate counterparts, isend and irecv.

# 1st prompt
>>> import torch
>>> tensor = torch.rand(2)
>>> tensor
tensor([0.1447, 0.4135])
>>> dist.send(tensor, 1)
>>> tensor
tensor([0.1447, 0.4135])
>>>

# 2nd prompt
>>> import torch
>>> tensor = torch.ones(2)
>>> tensor
tensor([1., 1.])
>>> dist.recv(tensor, 0)
0
>>> tensor
tensor([0.1447, 0.4135])
>>>

Notice that send/recv are blocking: both processes stop until the communication is completed. On the other hand, the immediate versions are non-blocking; the script continues its execution and the methods return a Work object upon which we can choose to wait().
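
As a minimal sketch of the immediate versions in the same two-prompt session (output values are omitted since they depend on the random tensor):

# 1st prompt
>>> tensor = torch.rand(2)
>>> req = dist.isend(tensor, 1)   # returns immediately with a Work object
>>> req.wait()                    # block until the send has completed

# 2nd prompt
>>> tensor = torch.zeros(2)
>>> req = dist.irecv(tensor, 0)   # returns immediately with a Work object
>>> req.wait()                    # block until the data has arrived
>>> tensor                        # now holds the values sent by rank 0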

Collective Communication

As opposed to point-to-point communication, collectives allow for communication patterns across all processes in a group. A group is a subset of all our processes. To create a group, we can pass a list of ranks to dist.new_group(group). By default, collectives are executed on all processes, also known as the world.

# 1st prompt
>>> dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
>>> tensor
tensor([0.2894, 0.8269])
>>>

# 2nd prompt
>>> dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
>>> tensor
tensor([0.2894, 0.8269])
>>>
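
To illustrate the group argument mentioned above, the sketch below restricts the same all_reduce to a group built with dist.new_group; with only two ranks the group coincides with the world, so the behavior is unchanged:

# both prompts (new_group must be called by every process in the world)
>>> group = dist.new_group([0, 1])
>>> dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)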

PyTorch provides four reduction operations:

  • dist.ReduceOp.SUM
  • dist.ReduceOp.PRODUCT
  • dist.ReduceOp.MAX
  • dist.ReduceOp.MIN

 

In addition to dist.all_reduce(tensor, op, group), the following collectives are currently implemented in PyTorch:

  • dist.broadcast(tensor, src, group): Copies tensor from src to all other processes.
  • dist.reduce(tensor, dst, op, group): Applies op to every tensor and stores the result in dst.
  • dist.all_reduce(tensor, op, group): Same as reduce, but the result is stored in all processes.
  • dist.scatter(tensor, scatter_list, src, group): Copies the i-th tensor scatter_list[i] to the i-th process.
  • dist.gather(tensor, gather_list, dst, group): Copies tensor from all processes into gather_list on dst.
  • dist.all_gather(tensor_list, tensor, group): Copies tensor from all processes to tensor_list, on all processes.
  • dist.barrier(group): Blocks all processes in group until each one has entered this function.
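
As a short sketch of two of these collectives in the same two-prompt session (the tensor values are illustrative assumptions, not from the original post):

# 1st prompt (rank 0)
>>> t = torch.tensor([1., 2.])
>>> dist.broadcast(t, src=0)                  # send t to every process
>>> out = [torch.zeros(2) for _ in range(2)]
>>> dist.all_gather(out, t)                   # out[i] now holds rank i's t

# 2nd prompt (rank 1)
>>> t = torch.zeros(2)
>>> dist.broadcast(t, src=0)                  # t is overwritten with rank 0's values
>>> out = [torch.zeros(2) for _ in range(2)]
>>> dist.all_gather(out, t)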

Initialization Methods

We will go over the different initialization methods, which are responsible for the initial coordination step among the processes.

1. Environment Variable

By setting the following four environment variables on all machines, all processes will be able to properly connect to the master, obtain information about the other processes, and finally handshake with them.

  • MASTER_PORT: A free port on the machine that will host the process with rank 0.
  • MASTER_ADDR: IP address of the machine that will host the process with rank 0.
  • WORLD_SIZE: The total number of processes, so that the master knows how many workers to wait for.
  • RANK: Rank of each process, so that it knows whether it is the master or a worker.
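
A minimal sketch of this method from a script, assuming the variables are exported before launch and the gloo backend (the command line and file name below are illustrative):

# launch, e.g. for rank 0 of 2:
#   MASTER_ADDR=127.0.0.1 MASTER_PORT=29500 WORLD_SIZE=2 RANK=0 python train.py
import torch.distributed as dist

# With init_method='env://' (the default), rank and world size are read
# from the RANK and WORLD_SIZE environment variables.
dist.init_process_group(backend='gloo', init_method='env://')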

2. TCP

Initializing via TCP can be achieved by providing the IP address of the process with rank 0 and a reachable port number. Here, all workers will be able to connect to the process with rank 0 and exchange information on how to reach each other.

dist.init_process_group(
    backend='gloo',
    init_method='tcp://10.1.1.20:23456',
    rank=args.rank,
    world_size=4)