[Dygraph] Support process group in dp with fleet api #41119

Merged · 6 commits · Apr 5, 2022

1 change: 1 addition & 0 deletions python/paddle/distributed/parallel.py
@@ -217,6 +217,7 @@ def train():
"required to create a process group.")
master_addr = os.getenv("MASTER_ADDR", None)
master_port = os.getenv("MASTER_PORT", None)
endpoints = None
if not master_addr or not master_port:
endpoints = os.getenv("PADDLE_MASTER", None)
if endpoints is None:
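
The one-line change above initializes `endpoints` before the conditional so the variable is always bound, whether or not `MASTER_ADDR`/`MASTER_PORT` are set. A minimal sketch of that fallback pattern, assuming a hypothetical helper name and an `ip:port` format for `PADDLE_MASTER` (both illustrative, not part of the diff):

```python
import os

def resolve_master_endpoint():
    """Sketch of the env-var fallback surrounding the added `endpoints = None` line."""
    master_addr = os.getenv("MASTER_ADDR", None)
    master_port = os.getenv("MASTER_PORT", None)
    endpoints = None  # bound up front so the later check never touches an undefined name
    if not master_addr or not master_port:
        endpoints = os.getenv("PADDLE_MASTER", None)
        if endpoints is None:
            raise ValueError(
                "Set MASTER_ADDR/MASTER_PORT or PADDLE_MASTER so a process "
                "group can be created.")
        master_addr, master_port = endpoints.split(":")  # assumed "ip:port" format
    return master_addr, int(master_port)
```
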
32 changes: 10 additions & 22 deletions python/paddle/fluid/dygraph/parallel.py
@@ -398,16 +398,6 @@ def sync_params_buffers(model,
'axis': 0})


@imperative_base.no_grad
@framework.dygraph_only
def sync_eager_params(model, comm_group=None, src_rank=0):
for _, param in model._obtain_parameters_buffers().items():
if not isinstance(param, core.eager.Tensor):
raise TypeError("The data type of '%s' must be '%s'" %
(param.name, core.eager.Tensor))
comm_group.broadcast(param, src_rank).synchronize()


class DataParallel(layers.Layer):
"""
Run the dygraph module with data parallelism.
@@ -575,7 +565,7 @@ def __init__(self,
comm_buffer_size=25,
last_comm_buffer_size=1,
find_unused_parameters=False,
process_group=None):
group=None):
super(DataParallel,
self).__init__(layers.full_name() + "_data_parallel")

@@ -585,7 +575,7 @@ def __init__(self,
self._layers = layers
self.find_unused_parameters = find_unused_parameters
self.grad_need_sync = True
self.process_group = process_group
self.group = group
self.var_dtype = core.eager.Tensor if in_dygraph_mode(
) else core.VarBase

@@ -604,20 +594,18 @@ def __init__(self,
"ParallelContext must be initialized before. You should use init_parallel_env() before" \
"constructing the DataParallel."

if self.process_group is None and in_dygraph_mode():
raise RuntimeError(
"Process group should be built for DataParallel in eager mode."
)
if in_dygraph_mode():
self.group = paddle.distributed.collective._get_default_group(
) if self.group is None else self.group

assert isinstance(self.group, paddle.distributed.collective.Group), \
"ProcessGroup must be an instance of Group in DataParallel."

# sync buffer and params
# TODO(liuyuhui) Currently not support xpu. xpu is
# still broadcasting parameters when calling layer
if not paddle.is_compiled_with_xpu():
if in_dygraph_mode():
sync_eager_params(
self._layers, comm_group=self.process_group)
elif _in_legacy_dygraph():
sync_params_buffers(self._layers)
sync_params_buffers(self._layers)

self.comm_buffer_size = int(comm_buffer_size * 1024 * 1024)
# NOTE(shenliang03): We can set environment variables to control
@@ -678,7 +666,7 @@ def check_layer_sparse(sublayer):
self._reducer = core.EagerReducer(
trainable_parameters,
list(reversed(self.group_indices)), is_sparse_gradient,
self.process_group,
self.group.process_group,
[self.last_comm_buffer_size, self.comm_buffer_size],
self.find_unused_parameters)
elif _in_legacy_dygraph():
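
For eager mode, this diff replaces the raw `process_group` argument of `DataParallel` with a `group` argument that must be a `paddle.distributed.collective.Group`, defaulting to the global group built by `init_parallel_env()`; the reducer then reaches the low-level handle via `group.process_group`. A hedged usage sketch, assuming the script is launched across multiple GPUs (for example with `paddle.distributed.launch`):

```python
import paddle
import paddle.distributed as dist

def wrap_with_data_parallel(layer):
    # init_parallel_env() builds the default communication Group in eager mode.
    dist.init_parallel_env()
    # Passing group=None (or omitting it) falls back to the default group;
    # a custom Group can be supplied instead. Before this PR the keyword
    # was `process_group=` and took a raw core process group.
    return paddle.DataParallel(layer,
                               find_unused_parameters=True,
                               group=None)
```
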
17 changes: 6 additions & 11 deletions python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -39,9 +39,7 @@ if (WITH_GPU OR WITH_XPU OR WITH_ASCEND OR WITH_ASCEND_CL)
endif()
list(APPEND DIST_TEST_OPS test_parallel_dygraph_unused_variables)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_control_flow)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_control_flow_in_eager_mode)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_no_sync)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_no_sync_in_eager_mode)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_no_sync_gradient_check)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_dataparallel)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_pipeline_parallel)
@@ -278,9 +276,7 @@ if ((NOT WITH_GPU) AND (NOT WITH_ROCM))
LIST(REMOVE_ITEM TEST_OPS test_parallel_dygraph_transformer)
LIST(REMOVE_ITEM TEST_OPS test_parallel_dygraph_sync_batch_norm)
list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_control_flow)
list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_control_flow_in_eager_mode)
list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_no_sync)
list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_no_sync_in_eager_mode)
list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_no_sync_gradient_check)
list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_dataparallel)
list(REMOVE_ITEM TEST_OPS test_parallel_dygraph_pipeline_parallel)
@@ -1127,12 +1123,11 @@ set_tests_properties(test_cumprod_op PROPERTIES TIMEOUT 120)
set_tests_properties(test_split_program PROPERTIES TIMEOUT 120)
if(WITH_DISTRIBUTE AND WITH_GPU AND WITH_NCCL)
set_tests_properties(test_parallel_dygraph_dataparallel PROPERTIES TIMEOUT 120)
set_tests_properties(test_parallel_dygraph_mnist PROPERTIES TIMEOUT 120)
set_tests_properties(test_parallel_dygraph_unused_variables PROPERTIES TIMEOUT 300)
set_tests_properties(test_parallel_dygraph_control_flow PROPERTIES TIMEOUT 200)
set_tests_properties(test_parallel_dygraph_control_flow_in_eager_mode PROPERTIES TIMEOUT 150)
set_tests_properties(test_parallel_dygraph_no_sync PROPERTIES TIMEOUT 150)
set_tests_properties(test_parallel_dygraph_no_sync_in_eager_mode PROPERTIES TIMEOUT 150)
set_tests_properties(test_parallel_dygraph_mnist PROPERTIES TIMEOUT 200)
set_tests_properties(test_parallel_dygraph_se_resnext PROPERTIES TIMEOUT 200)
set_tests_properties(test_parallel_dygraph_unused_variables PROPERTIES TIMEOUT 350)
set_tests_properties(test_parallel_dygraph_control_flow PROPERTIES TIMEOUT 350)
set_tests_properties(test_parallel_dygraph_no_sync PROPERTIES TIMEOUT 300)
set_tests_properties(test_parallel_dygraph_no_sync_gradient_check PROPERTIES TIMEOUT 30)
set_tests_properties(test_parallel_dygraph_pipeline_parallel PROPERTIES TIMEOUT 200)
set_tests_properties(test_parallel_dygraph_tensor_parallel PROPERTIES TIMEOUT 200)
@@ -1154,8 +1149,8 @@ if(WITH_DISTRIBUTE AND WITH_GPU AND WITH_NCCL)

if(${NCCL_VERSION} VERSION_GREATER_EQUAL 2212)
set_tests_properties(test_parallel_dygraph_sparse_embedding PROPERTIES TIMEOUT 200)
set_tests_properties(test_parallel_dygraph_transformer PROPERTIES TIMEOUT 200)
set_tests_properties(test_parallel_dygraph_sparse_embedding_over_height PROPERTIES TIMEOUT 150)
set_tests_properties(test_parallel_dygraph_transformer PROPERTIES TIMEOUT 150)
endif()
endif()

2 changes: 2 additions & 0 deletions python/paddle/fluid/tests/unittests/dygraph_fleet_api.py
@@ -57,4 +57,6 @@ def test_dygraph_fleet_api(self):


if __name__ == "__main__":
with _test_eager_guard():
pass
unittest.main()

This file was deleted.

@@ -36,19 +36,6 @@
out_dim = 20


def init_process_group(strategy=None):
nranks = ParallelEnv().nranks
rank = ParallelEnv().local_rank
is_master = True if rank == 0 else False
current_env = copy.copy(os.environ.copy())
port = 6175
if 'PADDLE_DIST_UT_PORT' in current_env.keys():
port = int(current_env['PADDLE_DIST_UT_PORT'])
store = paddle.fluid.core.TCPStore("127.0.0.1", port, is_master, nranks)
group = core.ProcessGroupNCCL(store, rank, nranks)
return group


class SimpleNet(fluid.Layer):
def __init__(self, train_id):
super(SimpleNet, self).__init__()
@@ -83,12 +70,9 @@ def forward(self, x):

class TestDistTraning(unittest.TestCase):
def test_multiple_gpus(self):
dist.init_parallel_env()
self.trainer_id = dist.get_rank()

process_group = init_process_group()
self.pg = process_group
with _test_eager_guard():
self.pg = dist.init_parallel_env()

model_a = SimpleNet(self.trainer_id)
model_b = SimpleNet(self.trainer_id)
@@ -97,13 +81,9 @@ def test_multiple_gpus(self):
model_b.set_state_dict(state_dict)

model_a = paddle.DataParallel(
model_a,
find_unused_parameters=True,
process_group=process_group)
model_a, find_unused_parameters=True, group=self.pg)
model_b = paddle.DataParallel(
model_b,
find_unused_parameters=True,
process_group=process_group)
model_b, find_unused_parameters=True, group=self.pg)

ones_input = paddle.ones(shape=(batch, in_dim))
ones_input.stop_gradient = True
@@ -150,7 +130,7 @@ def print_trainer_0(self, *args):
print(*args)

def broadcast_param(self, param, root):
self.pg.broadcast(param, root)
self.pg.process_group.broadcast(param, root)
return param

def check_gradient(self, params):
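
In the updated test, `dist.init_parallel_env()` runs under the eager guard and returns the default `Group`; the hand-rolled `init_process_group` helper and its TCPStore setup are gone, and low-level collectives go through the group's `process_group` handle, as in the `broadcast_param` helper above. A small sketch of that pattern, assuming an already-initialized multi-GPU eager environment:

```python
import paddle.distributed as dist

def broadcast_from_rank0(param, group):
    # `group` is the paddle.distributed.collective.Group returned by
    # dist.init_parallel_env(); its `process_group` attribute is the
    # low-level core handle used for in-place collectives such as broadcast.
    group.process_group.broadcast(param, 0)
    return param
```
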