-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathutils.py
55 lines (46 loc) · 1.89 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import os
import torch.distributed as dist
from torch.utils.data import DataLoader
# Rendezvous address used by init_dist_process_group when the MASTER_ADDR
# environment variable is not set.
DEFAULT_MASTER_ADDR = '127.0.0.1'
# Rendezvous port used when MASTER_PORT is not set. Kept as a string because it
# is concatenated directly into the 'tcp://<addr>:<port>' init URL.
DEFAULT_MASTER_PORT = '1234'
def init_dist_process_group(backend='nccl', is_high_priority=True):
    """Read the process topology from the environment and join the process group.

    The topology is resolved from the first matching source:

    1. Launcher variables, selected when ``LOCAL_RANK`` is set: ``LOCAL_RANK``,
       ``RANK``, ``WORLD_SIZE`` and ``LOCAL_SIZE`` (or, if that is absent,
       ``LOCAL_WORLD_SIZE``).
    2. Slurm variables, selected when ``SLURM_JOBID`` is set:
       ``SLURM_LOCALID``, ``SLURM_PROCID``, ``SLURM_NTASKS`` and
       ``SLURM_NTASKS_PER_NODE``.
    3. Otherwise a single-process default (rank 0, size 1).

    ``torch.distributed`` is only initialised when the world size is greater
    than one. The rendezvous uses ``tcp://MASTER_ADDR:MASTER_PORT``, falling
    back to ``DEFAULT_MASTER_ADDR`` / ``DEFAULT_MASTER_PORT``.

    Args:
        backend: Backend name passed to ``dist.init_process_group``.
        is_high_priority: When true and ``backend == 'nccl'``, the group is
            created with ``ProcessGroupNCCL.Options(is_high_priority_stream=True)``.

    Returns:
        The tuple ``(local_rank, local_size, world_rank, world_size)``.

    Raises:
        KeyError: A required environment variable of the selected source is
            missing.
        ValueError: An environment variable does not hold an integer.
        AssertionError: ``torch.distributed`` is unavailable, or the initialised
            group reports a different rank / world size than the environment.
    """
    env = os.environ

    if env.get('LOCAL_RANK') is not None:
        local_rank = int(env['LOCAL_RANK'])
        world_rank = int(env['RANK'])
        world_size = int(env['WORLD_SIZE'])
        # torchrun exports LOCAL_WORLD_SIZE rather than LOCAL_SIZE. Prefer
        # LOCAL_SIZE for backward compatibility, then fall back so that
        # torchrun launches do not die with a KeyError.
        local_size_value = env.get('LOCAL_SIZE')
        if local_size_value is None:
            local_size_value = env.get('LOCAL_WORLD_SIZE')
        if local_size_value is None:
            raise KeyError(
                'LOCAL_SIZE (or LOCAL_WORLD_SIZE) must be set when LOCAL_RANK is set')
        local_size = int(local_size_value)
    elif env.get('SLURM_JOBID') is not None:
        local_rank = int(env['SLURM_LOCALID'])
        world_rank = int(env['SLURM_PROCID'])
        world_size = int(env['SLURM_NTASKS'])
        local_size = int(env['SLURM_NTASKS_PER_NODE'])
    else:
        # No launcher detected: behave as a lone process.
        local_rank = 0
        world_rank = 0
        world_size = 1
        local_size = 1

    if world_size > 1:
        assert dist.is_available(), 'torch.distributed is not available'

        master_addr = env.get('MASTER_ADDR', DEFAULT_MASTER_ADDR)
        master_port = env.get('MASTER_PORT', DEFAULT_MASTER_PORT)
        init_method = 'tcp://' + master_addr + ':' + master_port

        if backend == 'nccl' and is_high_priority:
            pg_options = dist.ProcessGroupNCCL.Options(is_high_priority_stream=True)
        else:
            pg_options = None

        dist.init_process_group(backend,
                                init_method=init_method,
                                rank=world_rank,
                                world_size=world_size,
                                pg_options=pg_options)

        # Sanity-check that the group agrees with what the environment claimed.
        assert dist.get_rank() == world_rank, 'process group rank mismatch'
        assert dist.get_world_size() == world_size, 'process group world size mismatch'

    return local_rank, local_size, world_rank, world_size
def get_data_fetch_fn(loader: DataLoader):
    """Build a zero-argument callable that hands out items from ``loader``.

    One iterator over ``loader`` is created up front; every call to the
    returned function advances it by a single item. When the iterator is
    exhausted the function returns ``None`` instead of raising
    ``StopIteration``.

    Args:
        loader: The iterable to draw batches from.

    Returns:
        A function taking no arguments that returns the next item, or ``None``
        once the iterator has run out.
    """
    batches = iter(loader)

    def _fetch_next():
        # Two-argument next() returns the default when the iterator is done.
        return next(batches, None)

    return _fetch_next