From 2ef88d6f0726a773028751f859a122e9dcd6ff96 Mon Sep 17 00:00:00 2001 From: GhostScreaming Date: Wed, 13 Sep 2023 20:39:09 +0800 Subject: [PATCH 1/5] Add paddle.distributed.reshard API. It supports reshard for DistTensor. --- paddle/fluid/pybind/auto_parallel_py.cc | 23 +++ python/paddle/distributed/__init__.py | 1 + .../paddle/distributed/auto_parallel/api.py | 39 +++++ test/auto_parallel/reshard_api.py | 144 ++++++++++++++++++ test/auto_parallel/test_reshard_api.py | 44 ++++++ 5 files changed, 251 insertions(+) create mode 100644 test/auto_parallel/reshard_api.py create mode 100644 test/auto_parallel/test_reshard_api.py diff --git a/paddle/fluid/pybind/auto_parallel_py.cc b/paddle/fluid/pybind/auto_parallel_py.cc index 27d6a75ba0736..875c686f6d4f5 100644 --- a/paddle/fluid/pybind/auto_parallel_py.cc +++ b/paddle/fluid/pybind/auto_parallel_py.cc @@ -32,11 +32,15 @@ #include "paddle/fluid/distributed/auto_parallel/spmd_rules/common.h" #include "paddle/fluid/distributed/auto_parallel/spmd_rules/dist_tensor_spec.h" +#include "paddle/phi/api/lib/data_transform.h" +#include "paddle/phi/backends/context_pool.h" +#include "paddle/phi/core/dense_tensor.h" #include "paddle/phi/core/distributed/auto_parallel/dist_tensor.h" #include "paddle/phi/core/distributed/auto_parallel/p_to_r_reshard_function.h" #include "paddle/phi/core/distributed/auto_parallel/r_to_p_reshard_function.h" #include "paddle/phi/core/distributed/auto_parallel/r_to_s_reshard_function.h" #include "paddle/phi/core/distributed/auto_parallel/s_to_r_reshard_function.h" +#include "paddle/phi/core/enforce.h" #ifdef PADDLE_WITH_DISTRIBUTE #include "paddle/phi/infermeta/spmd_rules/rules.h" @@ -625,6 +629,25 @@ void BindAutoParallel(py::module *m) { }, py::return_value_policy::reference); + m->def( + "reshard", + [](py::handle py_tensor, const TensorDistAttr &dist_attr) { + auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); + auto dev_ctx = phi::DeviceContextPool::Instance().Get(tensor.place()); + if (phi::distributed::DistTensor::classof(tensor.impl().get())) { + auto dist_out_ptr = paddle::experimental::ReshardDistTensor( + dev_ctx, tensor, dist_attr); + return paddle::Tensor(dist_out_ptr); + } else { + PADDLE_THROW(phi::errors::InvalidArgument( + "The input tensor of shard function should be " + "``phi::distributed::DistTensor``. 
" + "However it's %s", + typeid(*tensor.impl().get()).name())); + } + }, + py::return_value_policy::reference); + // TODO(liuzhenhai): DistributedMapper is not used for now, but // dist_mapper_test need the symbols forch DistributedMapper to be linked, // remove it latter diff --git a/python/paddle/distributed/__init__.py b/python/paddle/distributed/__init__.py index fe914bbb3422a..996a4376819b3 100644 --- a/python/paddle/distributed/__init__.py +++ b/python/paddle/distributed/__init__.py @@ -67,6 +67,7 @@ from .auto_parallel import shard_op # noqa: F401 from .auto_parallel.api import shard_tensor # noqa: F401 from .auto_parallel.api import dtensor_from_fn # noqa: F401 +from .auto_parallel.api import reshard # noqa: F401 from .fleet import BoxPSDataset # noqa: F401 diff --git a/python/paddle/distributed/auto_parallel/api.py b/python/paddle/distributed/auto_parallel/api.py index 680b9cc95bc2b..5cdfe59301265 100644 --- a/python/paddle/distributed/auto_parallel/api.py +++ b/python/paddle/distributed/auto_parallel/api.py @@ -171,3 +171,42 @@ def dtensor_from_fn(fn, dist_attr, *args, **kwargs): """ tensor = fn(*args, **kwargs) return shard_tensor(tensor, dist_attr=dist_attr) + + +def reshard(dist_tensor, dist_attr): + """ + Reshard a distributed ``paddle.Tensor`` with given distributed attributes. + + Args: + data(Tensor): the distributed tensor to be resharded. + dist_attr(paddle.distributed.DistAttr): Specify how tensors are distributed or sliced on ProcessMesh. + + Returns: + Tensor: A Distributed Tensor reshared with distributed attributes. + + Examples: + + .. code-block:: python + + import paddle + import paddle.distributed as dist + + mesh = dist.ProcessMesh([[2, 4, 5], [0, 1, 3]], dim_names=["x", "y"]) + dist_attr = dist.DistAttr(mesh=mesh, sharding_specs=['x', 'y']) + + out_mesh = dist.ProcessMesh([[2, 4, 5], [0, 1, 3]], dim_names=["x", "y"]) + out_dist_attr = dist.DistAttr(mesh=out_mesh, sharding_specs=[None, None]) + + # dense tensor + a = paddle.to_tensor([[1,2,3], + [5,6,7]]) + # distributed tensor + d_tensor = dist.shard_tensor(a, dist_attr=dist_attr) + + out_d_tensor = dist.reshard(d_tensor, out_dist_attr) + + print(d_tensor) + print(out_d_tensor) + """ + + return paddle.base.core.reshard(dist_tensor, dist_attr) diff --git a/test/auto_parallel/reshard_api.py b/test/auto_parallel/reshard_api.py new file mode 100644 index 0000000000000..8d3d8d8144ab0 --- /dev/null +++ b/test/auto_parallel/reshard_api.py @@ -0,0 +1,144 @@ +# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os + +import numpy as np + +import paddle +import paddle.distributed as dist + + +class TestReshardAPI: + def __init__(self): + self._shape = eval(os.getenv("shape")) + self._dtype = os.getenv("dtype") + self._seeds = eval(os.getenv("seeds")) + self._backend = os.getenv("backend") + self._mesh = dist.ProcessMesh([0, 1], dim_names=["x"]) + + def run_test_cases(self): + if self._backend == "cpu": + paddle.set_device("cpu") + self.test_case_p_to_r() + self.test_case_r_to_p() + + for shape in [[5, 7], [10, 20]]: + for shard in [0, 1]: + self._shape = shape + self._shard = shard + self.test_case_r_to_s() + + for shard in [0, 1]: + self._shard = shard + self.test_case_s_to_r() + + def test_case_p_to_r(self): + a = paddle.ones(self._shape) + in_shard_specs = [None for i in range(len(self._shape))] + out_shard_specs = [None for i in range(len(self._shape))] + dist_attr = dist.DistAttr( + mesh=self._mesh, sharding_specs=in_shard_specs + ) + dist_attr._set_partial_dims([0]) + out_dist_attr = dist.DistAttr( + mesh=self._mesh, sharding_specs=out_shard_specs + ) + + input_tensor = dist.shard_tensor(a, dist_attr=dist_attr) + output_tensor = dist.reshard(input_tensor, dist_attr=out_dist_attr) + + input_tensor = dist.shard_tensor(a, dist_attr=dist_attr) + assert np.equal(output_tensor.shape, input_tensor.shape).all() + np.testing.assert_equal(output_tensor._local_value().numpy(), a.numpy()) + + def test_case_r_to_p(self): + a = paddle.ones(self._shape) + in_shard_specs = [None for i in range(len(self._shape))] + out_shard_specs = [None for i in range(len(self._shape))] + dist_attr = dist.DistAttr( + mesh=self._mesh, sharding_specs=in_shard_specs + ) + out_dist_attr = dist.DistAttr( + mesh=self._mesh, sharding_specs=out_shard_specs + ) + out_dist_attr._set_partial_dims([0]) + + input_tensor = dist.shard_tensor(a, dist_attr=dist_attr) + output_tensor = dist.reshard(input_tensor, dist_attr=out_dist_attr) + + if dist.get_rank() == 0: + np.testing.assert_equal( + output_tensor._local_value().numpy(), input_tensor.numpy() + ) + else: + zeros = paddle.zeros(self._shape) + np.testing.assert_equal( + output_tensor._local_value().numpy(), zeros.numpy() + ) + assert np.equal(output_tensor.shape, input_tensor.shape).all() + assert np.equal( + output_tensor._local_shape, input_tensor._local_shape + ).all() + + def test_case_r_to_s(self): + a = paddle.ones(self._shape) + in_shard_specs = [None for i in range(len(self._shape))] + out_shard_specs = [None for i in range(len(self._shape))] + out_shard_specs[self._shard] = "x" + dist_attr = dist.DistAttr( + mesh=self._mesh, sharding_specs=in_shard_specs + ) + out_dist_attr = dist.DistAttr( + mesh=self._mesh, sharding_specs=out_shard_specs + ) + + input_tensor = dist.shard_tensor(a, dist_attr=dist_attr) + output_tensor = dist.reshard(input_tensor, dist_attr=out_dist_attr) + + out_shape = list(self._shape) + if out_shape[self._shard] % 2 == 0: + out_shape[self._shard] = out_shape[self._shard] // 2 + np.testing.assert_equal(output_tensor.numpy(), input_tensor.numpy()) + else: + out_shape[self._shard] = ( + out_shape[self._shard] // 2 + if dist.get_rank() == 1 + else out_shape[self._shard] // 2 + 1 + ) + + assert np.equal(output_tensor.shape, input_tensor.shape).all() + assert np.equal(output_tensor._local_shape, out_shape).all() + + def test_case_s_to_r(self): + a = paddle.ones(self._shape) + in_shard_specs = [None for i in range(len(self._shape))] + in_shard_specs[self._shard] = "x" + out_shard_specs = [None for i in range(len(self._shape))] + + dist_attr = 
dist.DistAttr( + mesh=self._mesh, sharding_specs=in_shard_specs + ) + out_dist_attr = dist.DistAttr( + mesh=self._mesh, sharding_specs=out_shard_specs + ) + + input_tensor = dist.shard_tensor(a, dist_attr=dist_attr) + output_tensor = dist.reshard(input_tensor, dist_attr=out_dist_attr) + assert np.equal(output_tensor.shape, output_tensor._local_shape).all() + assert np.equal(output_tensor.shape, input_tensor.shape).all() + + +if __name__ == '__main__': + TestReshardAPI().run_test_cases() diff --git a/test/auto_parallel/test_reshard_api.py b/test/auto_parallel/test_reshard_api.py new file mode 100644 index 0000000000000..b4b8206f8fc29 --- /dev/null +++ b/test/auto_parallel/test_reshard_api.py @@ -0,0 +1,44 @@ +# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +import collective.test_communication_api_base as test_base + + +class TestReshardAPI(test_base.CommunicationTestDistBase): + def setUp(self): + super().setUp(num_of_devices=2, timeout=120) + self._default_envs = { + "shape": "(10, 20)", + "dtype": "float32", + "seeds": str(self._seeds), + } + self._changeable_envs = { + "backend": ["cpu", "gpu"], + } + + def test_reshard_api(self): + envs_list = test_base.gen_product_envs_list( + self._default_envs, self._changeable_envs + ) + for envs in envs_list: + self.run_test_case( + "reshard_api.py", + user_defined_envs=envs, + ) + + +if __name__ == "__main__": + unittest.main() From 50d2740c5be54c72f93c934f0f28d520e3a32523 Mon Sep 17 00:00:00 2001 From: GhostScreaming Date: Thu, 14 Sep 2023 18:14:43 +0800 Subject: [PATCH 2/5] Polish code with review comments. 
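
The functional core of the pybind change below is that the reshard entry now
compares the tensor's current dist_attr with the requested one: only when they
differ is a concrete reshard function chosen via ChooseProperReshardFunction
and evaluated, otherwise the DistTensor is passed through unchanged. A rough
dygraph-level sketch of what that means at the Python level (a sketch only: it
assumes a two-rank launch like the one test_reshard_api.py drives, and reuses
the private _local_shape helper that reshard_api.py already relies on):

    import paddle
    import paddle.distributed as dist

    mesh = dist.ProcessMesh([0, 1], dim_names=["x"])
    replicated = dist.DistAttr(mesh=mesh, sharding_specs=[None, None])
    sharded = dist.DistAttr(mesh=mesh, sharding_specs=["x", None])

    d = dist.shard_tensor(paddle.ones([10, 20]), dist_attr=replicated)

    # Target dist_attr equals the current one: the binding takes the
    # pass-through branch and no reshard function is evaluated.
    same = dist.reshard(d, dist_attr=replicated)
    print(same._local_shape)   # still [10, 20] on every rank

    # Target differs: an r_to_s reshard function is chosen and evaluated,
    # producing a tensor sharded along dim 0 across the two ranks.
    out = dist.reshard(d, dist_attr=sharded)
    print(out._local_shape)    # [5, 20] per rank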
--- paddle/fluid/pybind/auto_parallel_py.cc | 21 ++++++- .../paddle/distributed/auto_parallel/api.py | 10 +++- test/auto_parallel/reshard_api.py | 59 +------------------ test/auto_parallel/test_reshard_api.py | 1 + 4 files changed, 28 insertions(+), 63 deletions(-) diff --git a/paddle/fluid/pybind/auto_parallel_py.cc b/paddle/fluid/pybind/auto_parallel_py.cc index 875c686f6d4f5..66826addd4f7b 100644 --- a/paddle/fluid/pybind/auto_parallel_py.cc +++ b/paddle/fluid/pybind/auto_parallel_py.cc @@ -634,16 +634,31 @@ void BindAutoParallel(py::module *m) { [](py::handle py_tensor, const TensorDistAttr &dist_attr) { auto tensor = CastPyArg2Tensor(py_tensor.ptr(), 0); auto dev_ctx = phi::DeviceContextPool::Instance().Get(tensor.place()); + std::shared_ptr dist_out_ptr = nullptr; if (phi::distributed::DistTensor::classof(tensor.impl().get())) { - auto dist_out_ptr = paddle::experimental::ReshardDistTensor( - dev_ctx, tensor, dist_attr); + auto tensor_in = tensor.impl(); + if (tensor_in) { + phi::distributed::DistTensor *dist_tensor = + static_cast(tensor_in.get()); + if (dist_tensor->dist_attr() != dist_attr) { + VLOG(6) << "reshard func, reshard tensor from " + << dist_tensor->dist_attr() << " to " << dist_attr; + auto *func = phi::distributed::ChooseProperReshardFunction( + *dist_tensor, dist_attr); + dist_out_ptr = func->Eval(dev_ctx, *dist_tensor, dist_attr); + } else { + dist_out_ptr = + std::static_pointer_cast( + tensor_in); + } + } return paddle::Tensor(dist_out_ptr); } else { PADDLE_THROW(phi::errors::InvalidArgument( "The input tensor of shard function should be " "``phi::distributed::DistTensor``. " "However it's %s", - typeid(*tensor.impl().get()).name())); + typeid(tensor.impl().get()).name())); } }, py::return_value_policy::reference); diff --git a/python/paddle/distributed/auto_parallel/api.py b/python/paddle/distributed/auto_parallel/api.py index 5cdfe59301265..b1c4ee71b0454 100644 --- a/python/paddle/distributed/auto_parallel/api.py +++ b/python/paddle/distributed/auto_parallel/api.py @@ -178,7 +178,7 @@ def reshard(dist_tensor, dist_attr): Reshard a distributed ``paddle.Tensor`` with given distributed attributes. Args: - data(Tensor): the distributed tensor to be resharded. + dist_tensor(Tensor): the distributed tensor to be resharded. dist_attr(paddle.distributed.DistAttr): Specify how tensors are distributed or sliced on ProcessMesh. Returns: @@ -209,4 +209,10 @@ def reshard(dist_tensor, dist_attr): print(out_d_tensor) """ - return paddle.base.core.reshard(dist_tensor, dist_attr) + if paddle.framework.in_dynamic_mode: + return paddle.base.core.reshard(dist_tensor, dist_attr) + else: + # TODO(GhostScreaming): Support static DistTensor later. + raise RuntimeError( + "paddle.dist.reshard only support dynamic graph now. It will be supported for static graph later." 
+ ) diff --git a/test/auto_parallel/reshard_api.py b/test/auto_parallel/reshard_api.py index 8d3d8d8144ab0..c77cb9b773cac 100644 --- a/test/auto_parallel/reshard_api.py +++ b/test/auto_parallel/reshard_api.py @@ -26,23 +26,13 @@ def __init__(self): self._dtype = os.getenv("dtype") self._seeds = eval(os.getenv("seeds")) self._backend = os.getenv("backend") + self._shard = eval(os.getenv("shard")) self._mesh = dist.ProcessMesh([0, 1], dim_names=["x"]) def run_test_cases(self): if self._backend == "cpu": paddle.set_device("cpu") self.test_case_p_to_r() - self.test_case_r_to_p() - - for shape in [[5, 7], [10, 20]]: - for shard in [0, 1]: - self._shape = shape - self._shard = shard - self.test_case_r_to_s() - - for shard in [0, 1]: - self._shard = shard - self.test_case_s_to_r() def test_case_p_to_r(self): a = paddle.ones(self._shape) @@ -63,35 +53,6 @@ def test_case_p_to_r(self): assert np.equal(output_tensor.shape, input_tensor.shape).all() np.testing.assert_equal(output_tensor._local_value().numpy(), a.numpy()) - def test_case_r_to_p(self): - a = paddle.ones(self._shape) - in_shard_specs = [None for i in range(len(self._shape))] - out_shard_specs = [None for i in range(len(self._shape))] - dist_attr = dist.DistAttr( - mesh=self._mesh, sharding_specs=in_shard_specs - ) - out_dist_attr = dist.DistAttr( - mesh=self._mesh, sharding_specs=out_shard_specs - ) - out_dist_attr._set_partial_dims([0]) - - input_tensor = dist.shard_tensor(a, dist_attr=dist_attr) - output_tensor = dist.reshard(input_tensor, dist_attr=out_dist_attr) - - if dist.get_rank() == 0: - np.testing.assert_equal( - output_tensor._local_value().numpy(), input_tensor.numpy() - ) - else: - zeros = paddle.zeros(self._shape) - np.testing.assert_equal( - output_tensor._local_value().numpy(), zeros.numpy() - ) - assert np.equal(output_tensor.shape, input_tensor.shape).all() - assert np.equal( - output_tensor._local_shape, input_tensor._local_shape - ).all() - def test_case_r_to_s(self): a = paddle.ones(self._shape) in_shard_specs = [None for i in range(len(self._shape))] @@ -121,24 +82,6 @@ def test_case_r_to_s(self): assert np.equal(output_tensor.shape, input_tensor.shape).all() assert np.equal(output_tensor._local_shape, out_shape).all() - def test_case_s_to_r(self): - a = paddle.ones(self._shape) - in_shard_specs = [None for i in range(len(self._shape))] - in_shard_specs[self._shard] = "x" - out_shard_specs = [None for i in range(len(self._shape))] - - dist_attr = dist.DistAttr( - mesh=self._mesh, sharding_specs=in_shard_specs - ) - out_dist_attr = dist.DistAttr( - mesh=self._mesh, sharding_specs=out_shard_specs - ) - - input_tensor = dist.shard_tensor(a, dist_attr=dist_attr) - output_tensor = dist.reshard(input_tensor, dist_attr=out_dist_attr) - assert np.equal(output_tensor.shape, output_tensor._local_shape).all() - assert np.equal(output_tensor.shape, input_tensor.shape).all() - if __name__ == '__main__': TestReshardAPI().run_test_cases() diff --git a/test/auto_parallel/test_reshard_api.py b/test/auto_parallel/test_reshard_api.py index b4b8206f8fc29..bdefc9c90fe79 100644 --- a/test/auto_parallel/test_reshard_api.py +++ b/test/auto_parallel/test_reshard_api.py @@ -24,6 +24,7 @@ def setUp(self): "shape": "(10, 20)", "dtype": "float32", "seeds": str(self._seeds), + "shard": "0", } self._changeable_envs = { "backend": ["cpu", "gpu"], From c7973cb662c0f3356b596577af3ee618d49c9826 Mon Sep 17 00:00:00 2001 From: GhostScreaming Date: Fri, 15 Sep 2023 10:43:58 +0800 Subject: [PATCH 3/5] Fix problem of in_dynamic_mode --- 
python/paddle/distributed/auto_parallel/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/paddle/distributed/auto_parallel/api.py b/python/paddle/distributed/auto_parallel/api.py index b1c4ee71b0454..e1615bac183bb 100644 --- a/python/paddle/distributed/auto_parallel/api.py +++ b/python/paddle/distributed/auto_parallel/api.py @@ -209,7 +209,7 @@ def reshard(dist_tensor, dist_attr): print(out_d_tensor) """ - if paddle.framework.in_dynamic_mode: + if paddle.framework.in_dynamic_mode(): return paddle.base.core.reshard(dist_tensor, dist_attr) else: # TODO(GhostScreaming): Support static DistTensor later. From 32eca4678982650de60a2a6fee43b684fe15cb83 Mon Sep 17 00:00:00 2001 From: GhostScreaming Date: Mon, 18 Sep 2023 14:02:04 +0800 Subject: [PATCH 4/5] Fix some problems according to review comments. --- python/paddle/distributed/__init__.py | 1 + test/auto_parallel/CMakeLists.txt | 2 ++ 2 files changed, 3 insertions(+) diff --git a/python/paddle/distributed/__init__.py b/python/paddle/distributed/__init__.py index 996a4376819b3..cf7c20eacaee1 100644 --- a/python/paddle/distributed/__init__.py +++ b/python/paddle/distributed/__init__.py @@ -129,4 +129,5 @@ "DistAttr", "shard_tensor", "dtensor_from_fn", + "reshard", ] diff --git a/test/auto_parallel/CMakeLists.txt b/test/auto_parallel/CMakeLists.txt index 8857dc530f947..c8ec8231cc2bd 100644 --- a/test/auto_parallel/CMakeLists.txt +++ b/test/auto_parallel/CMakeLists.txt @@ -186,6 +186,8 @@ if(WITH_DISTRIBUTE AND WITH_GPU) py_test_modules(test_engine_save_load MODULES test_engine_save_load) py_test_modules(test_rule_based_tuner MODULES test_rule_based_tuner) py_test_modules(test_dist_tensor MODULES test_dist_tensor) + py_test_modules(test_reshard_api MODULES test_reshard_api) + py_test_modules(test_api_dist_branch MODULES test_api_dist_branch) py_test_modules(test_shard_tensor_api MODULES test_shard_tensor_api) py_test_modules(test_cost_interface MODULES test_cost_interface) # End of unittests WITH single card WITHOUT timeout From 5c33d9ec3b679d0f06b4b8b3d34018d827bd79c4 Mon Sep 17 00:00:00 2001 From: GhostScreaming Date: Mon, 18 Sep 2023 16:01:22 +0800 Subject: [PATCH 5/5] Set test_reshard_api as multi-cards testcase. And set its timeout. 
--- test/auto_parallel/CMakeLists.txt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/auto_parallel/CMakeLists.txt b/test/auto_parallel/CMakeLists.txt index c8ec8231cc2bd..ff23fab4977e1 100644 --- a/test/auto_parallel/CMakeLists.txt +++ b/test/auto_parallel/CMakeLists.txt @@ -85,6 +85,9 @@ if(WITH_DISTRIBUTE AND WITH_GPU) py_test_modules(test_pass_quantization MODULES test_pass_quantization) set_tests_properties(test_pass_quantization PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 60) + py_test_modules(test_reshard_api MODULES test_reshard_api) + set_tests_properties(test_reshard_api PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" + TIMEOUT 150) py_test_modules(test_reshard_s_to_r MODULES test_reshard_s_to_r) set_tests_properties(test_reshard_s_to_r PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 100) @@ -186,7 +189,6 @@ if(WITH_DISTRIBUTE AND WITH_GPU) py_test_modules(test_engine_save_load MODULES test_engine_save_load) py_test_modules(test_rule_based_tuner MODULES test_rule_based_tuner) py_test_modules(test_dist_tensor MODULES test_dist_tensor) - py_test_modules(test_reshard_api MODULES test_reshard_api) py_test_modules(test_api_dist_branch MODULES test_api_dist_branch) py_test_modules(test_shard_tensor_api MODULES test_shard_tensor_api) py_test_modules(test_cost_interface MODULES test_cost_interface)
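
A minimal usage sketch for the paddle.distributed.reshard API added by this
series, mirroring the p_to_r case kept in test/auto_parallel/reshard_api.py.
It is only a sketch: it assumes two devices (started, for example, with
something like python -m paddle.distributed.launch --devices=0,1
reshard_demo.py, where reshard_demo.py is a hypothetical script name), it
assumes dynamic graph mode (after patch 3 the Python wrapper raises a
RuntimeError under static graph), and it reuses the private
_set_partial_dims/_local_value helpers exactly as the test file does.

    import paddle
    import paddle.distributed as dist

    mesh = dist.ProcessMesh([0, 1], dim_names=["x"])

    # Input is partial along mesh dim 0 (each rank holds a partial value);
    # the target dist_attr is plain replicated.
    in_attr = dist.DistAttr(mesh=mesh, sharding_specs=[None, None])
    in_attr._set_partial_dims([0])   # private helper, used the same way in the test
    out_attr = dist.DistAttr(mesh=mesh, sharding_specs=[None, None])

    a = paddle.ones([10, 20])
    d_tensor = dist.shard_tensor(a, dist_attr=in_attr)

    out = dist.reshard(d_tensor, dist_attr=out_attr)

    print(out.shape)                   # global shape unchanged: [10, 20]
    print(out._local_value().numpy())  # the test expects this to equal a.numpy()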