카테고리 없음

DistributedDataParallel in Pytorch

잡담연구소 2021. 8. 29. 23:05

개쩌는 의식의 흐름대로 쓰는 distributed data parallel

 

https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

 

Getting Started with Distributed Data Parallel — PyTorch Tutorials 1.9.0+cu102 documentation

Getting Started with Distributed Data Parallel Author: Shen Li Edited by: Joe Zhu Prerequisites: DistributedDataParallel (DDP) implements data parallelism at the module level which can run across multiple machines. Applications using DDP should spawn multi

pytorch.org

 

 

 

Point-to-Point Communication 

: 프로세스 2개가 서로 데이터를 주고받는 방법 , send/recv 함수를 통해 데이터를 송신함.

Collective Communication

: 그룹 내 모든 프로세스끼리 데이터를 주고받는 방법 , 

- world라는 단위의 그룹 내 프로세스끼리 데이터를 주고 받는데, world의 디폴트는 모든 프로세스다. 

- dist.new_group([그룹을 이루고 싶은 랭크들]) 로 world를 재 설정할 수있다. 

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def setup(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    dist.init_process_group("gloo", rank=rank, world_size=world_size)


def cleanup():
    dist.destroy_process_group()


def run(rank, size):
    """Simple collective communication."""
    setup(rank, size)
    group = dist.new_group([0, 1])
    tensor = torch.ones(1).cuda()
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
    print("Rank ", rank, " has data ", tensor[0])


if __name__ == "__main__":
    world_size = torch.cuda.device_count()
    mp.spawn(run, args=(world_size,), nprocs=world_size, join=True)

collective communcation 중 "all-reduce" 에 대한 예제이다. 

진짜 프로세스끼리 소통(?)할 수 있나...? 🤔 하고 해봤는데 오 진짜 된다. 

지금 vm에서 작업 중인데 vm 내에는 v100이 4개가 있다. 그래서 world_size가 4가 되고, 이대로 프로세스 그룹을 초기화 했다. 

그 후 run 함수에서  rank가 0,1인 프로세스들만을 가지고 새로운 group을 만들었다. 그리고 torch.distributed.all_reduce 함수에 새로운 group을 넣어주고 매 Rank마다 출력문을 찍어보면,,,,!!?!??! 

# Rank  2  has data  tensor(1., device='cuda:0')
# Rank  3  has data  tensor(1., device='cuda:0')
# Rank  1  has data  tensor(2., device='cuda:0')
# Rank  0  has data  tensor(2., device='cuda:0')

새로운 group인 0,1 rank들을 서로 데이터들끼리 주고 받아 "all_reduce"연산을 해서 2가 tensor가 2가 됐다.

하지만 연산자 내 그룹에 속하지 못한 2,3은 tensor가 그대로 1인 걸 볼 수 있다. 

오오오오오오오옹 신기하다.

all reduce 말고도 6개의 연산이 존재한다. 

 

하나하나 해보다가 X나 신기한 거 발견 

reduce는 all_reduce 와 다르게 연산에 대한 결과를 dst에 저장해준다. 

def run(rank, size):
    """Simple collective communication."""
    setup(rank, size)
    tensor = torch.ones(1).cuda()
    dist.reduce(tensor, dst=0, op=dist.ReduceOp.SUM)
    print("Rank ", rank, " has data ", tensor[0])

이렇게 되면 0을 제외한 모든 rank는 1이고, 0만 4일줄 알았는데 아래처럼 순차적으로 나온다,,, 왜지?

Rank  3  has data  tensor(1., device='cuda:0')
Rank  1  has data  tensor(3., device='cuda:0')
Rank  2  has data  tensor(2., device='cuda:0')
Rank  0  has data  tensor(4., device='cuda:0')

 

all_gather는 모든 그룹에 있는 tensor들이 하나의 list (= tensor_list)로 저장된다. 

tensor_list를 넘겨줘야 하는데,,, 그러면 미리 gather되었을 때 어떤 모양일지, 각 process에서 gather할 tensor들이 어떻게 생겼을 지 알아야 하는 건가?

 

오 이래서 백엔드를 nccl을 선택하는 건가 보다. 

근데 백엔드가 왜 필요한건지 모르겠다; 네트워크의 영역인 것인가?

 

 

페이지 맨 처음에 나오는 dist.init_process_group을 보자.

import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp

from torch.nn.parallel import DistributedDataParallel as DDP


def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

근데 공식 docs를 보고 좀 이상하다고 느꼈는데,,,

분명 프로세스 그룹을 초기화 하기 위해서는 

1. store, rank, world_size를 명시해주거나

2. init_method를 구체화해서 넣어주라고 한다. 

 

근데 왜 위의 basic usecase의 코드에는 rank, world_size만 명시해주고 환경변수로 addr, port를 받는걸까,...ㅜㅜ...

그래서 코드를 까본 건 아니고 좀 타고타고 들어가봤다. 

위 그림의 조건에서 1,2를 둘 다 만족시키지 못하면 Init_method가 env://로 자동 설정된다. 

rendezvous함수로 넘어가는데 자동으로 환경변수 설정한 거 중에 port랑 ip adress가 있는 지 찾는다. 

 

 



느낌 점,,, 네트워크 ㅈ됐다 시급하다