Introduction
The distributed package included in PyTorch (i.e., torch.distributed) enables researchers and practitioners to easily parallelize their computations across processes and clusters of machines. To do so, it leverages message passing semantics, allowing each process to communicate data to any of the other processes.
Setup
In order to get started, we need the ability to run multiple processes simultaneously, so we open two Python prompts (one process each).
# 1st prompt
Type "help", "copyright", "credits" or "license" for more information.
>>> import os
>>> import torch.distributed as dist
>>> os.getpid()
1092
>>> os.environ['MASTER_ADDR'] = '127.0.0.1'
>>> os.environ['MASTER_PORT'] = '29500'
>>> dist.init_process_group('gloo', rank=0, world_size=2)
>>>
# 2nd prompt
Type "help", "copyright", "credits" or "license" for more information.
>>> import os
>>> import torch.distributed as dist
>>> os.getpid()
12504
>>> os.environ['MASTER_ADDR'] = '127.0.0.1'
>>> os.environ['MASTER_PORT'] = '29500'
>>> dist.init_process_group('gloo', rank=1, world_size=2)
>>>
To set up the distributed environment, we have to initialize the process group in each prompt (i.e., in each process).
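The two prompts above can also be replaced by a single script. The sketch below is a minimal, hypothetical example (the file name demo.py and the run function are illustrative, not part of the prompts above): it spawns one process per rank with torch.multiprocessing and performs the same initialization.
# demo.py -- hypothetical single-script version of the two prompts above
import os
import torch.distributed as dist
import torch.multiprocessing as mp

def run(rank, size):
    # Each process lands here once its process group is initialized.
    print(f"Hello from rank {rank} of {size}")

def init_process(rank, size, fn, backend='gloo'):
    # Same environment variables and init call as in the prompts above.
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)

if __name__ == "__main__":
    size = 2
    mp.set_start_method('spawn')
    processes = []
    for rank in range(size):
        p = mp.Process(target=init_process, args=(rank, size, run))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()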
Point-to-Point Communication
A transfer of data from one process to another is called a point-to-point communication. These are achieved through the send and recv functions or their immediate counterparts, isend and irecv.
# 1st prompt
>>> import torch
>>> tensor = torch.rand(2)
>>> tensor
tensor([0.1447, 0.4135])
>>> dist.send(tensor, 1)
>>> tensor
tensor([0.1447, 0.4135])
>>>
# 2nd prompt
>>> import torch
>>> tensor = torch.ones(2)
>>> tensor
tensor([1., 1.])
>>> dist.recv(tensor, 0)
0
>>> tensor
tensor([0.1447, 0.4135])
>>>
Notice that send/recv are blocking: both processes stop until the communication is completed. On the other hand, immediates are non-blocking; the script continues its execution and the methods return a Work object upon which we can choose to wait().
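As a minimal sketch of the immediate variants (reusing the two prompts and the tensor from above; the variable name req is illustrative):
# 1st prompt
>>> req = dist.isend(tensor, 1)     # returns immediately with a Work object
>>> req.wait()                      # block until the send has actually completed
# 2nd prompt
>>> req = dist.irecv(tensor, 0)     # returns immediately; tensor is not ready yet
>>> req.wait()                      # after this, tensor holds the received data
Until wait() returns, the tensor being sent or received should not be modified or read, since the transfer may still be in flight.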
Collective Communication
As opposed to point-to-point communication, collectives allow for communication patterns across all processes in a group. A group is a subset of all our processes. To create a group, we can pass a list of ranks to dist.new_group(group). By default, collectives are executed on all processes, also known as the world.
# 1st prompt
>>> dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
>>> tensor
tensor([0.2894, 0.8269])
>>>
# 2nd prompt
>>> dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
>>> tensor
tensor([0.2894, 0.8269])
>>>
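To restrict a collective to a group, pass the handle returned by dist.new_group to the call. A minimal sketch, run identically at both prompts (with only two processes this group happens to coincide with the world, but the same pattern works for any subset of ranks):
>>> group = dist.new_group([0, 1])    # must be called by every process
>>> dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)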
PyTorch comes with four reduction operations, all working at the element-wise level:
dist.ReduceOp.SUM
dist.ReduceOp.PRODUCT
dist.ReduceOp.MAX
dist.ReduceOp.MIN
In addition to dist.all_reduce(tensor, op, group), there are a total of six collectives currently implemented by PyTorch.
dist.broadcast(tensor, src, group): Copies tensor from src to all other processes.
dist.reduce(tensor, dst, op, group): Applies op to every tensor and stores the result in dst.
dist.all_reduce(tensor, op, group): Same as reduce, but the result is stored in all processes.
dist.scatter(tensor, scatter_list, src, group): Copies the $i^\text{th}$ tensor scatter_list[i] to the $i^\text{th}$ process.
dist.gather(tensor, gather_list, dst, group): Copies tensor from all processes in dst.
dist.all_gather(tensor_list, tensor, group): Copies tensor from all processes to tensor_list, on all processes.
dist.barrier(group): Blocks all processes in group until each one has entered this function.
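For example, broadcast and all_gather can be tried directly in the two prompts. The sketch below is illustrative (the variable names t and out are not part of the session above):
# 1st prompt (rank 0)
>>> t = torch.arange(2.)                       # tensor([0., 1.])
>>> dist.broadcast(t, src=0)                   # rank 0's t is copied to every process
>>> out = [torch.zeros(2) for _ in range(2)]
>>> dist.all_gather(out, t)                    # out now holds every rank's t
# 2nd prompt (rank 1)
>>> t = torch.zeros(2)
>>> dist.broadcast(t, src=0)                   # t becomes tensor([0., 1.])
>>> out = [torch.zeros(2) for _ in range(2)]
>>> dist.all_gather(out, t)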
Initialization Methods
We will go over the different initialization methods, which are responsible for the initial coordination step between the processes.
1. Environment Variable
By setting the following four environment variables on all machines, all processes will be able to properly connect to the master, obtain information about the other processes, and finally handshake with them.
MASTER_PORT: A free port on the machine that will host the process with rank 0.
MASTER_ADDR: IP address of the machine that will host the process with rank 0.
WORLD_SIZE: The total number of processes, so that the master knows how many workers to wait for.
RANK: Rank of each process, so they will know whether it is the master or a worker.
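With all four variables exported, the call to dist.init_process_group can be as short as the sketch below, because the default init method (env://) reads RANK and WORLD_SIZE from the environment when they are not passed explicitly (the choice of the gloo backend here is only an example):
# hypothetical snippet, run on every machine after exporting the four variables
import torch.distributed as dist
dist.init_process_group(backend='gloo', init_method='env://')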
2. TCP
Initializing via TCP can be achieved by providing the IP address of the process with rank 0 and a reachable port number. Here, all workers will be able to connect to the process with rank 0 and exchange information on how to reach each other.
dist.init_process_group(
    init_method='tcp://10.1.1.20:23456',
    rank=args.rank,
    world_size=4)
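The snippet above assumes that args.rank is already defined. A self-contained, hypothetical version (the file name, the --rank flag, and the gloo backend are illustrative) might look like this:
# worker.py -- hypothetical launcher; run on each of the 4 machines with its own --rank
import argparse
import torch.distributed as dist

parser = argparse.ArgumentParser()
parser.add_argument('--rank', type=int, required=True)
args = parser.parse_args()

dist.init_process_group(
    backend='gloo',
    init_method='tcp://10.1.1.20:23456',   # IP and port of the rank-0 machine
    rank=args.rank,
    world_size=4)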