[XPU] support dygraph unittest for collective OPs (PaddlePaddle#59273)
* [XPU] support dygraph unittest for collective OPs

* update license && code style polish

* code style

* skip collective test if device_cnt < 2

* code style
XiaociZhang authored and SecretXV committed Nov 28, 2023
1 parent f8d25c5 commit fb4aa41
Showing 22 changed files with 1,505 additions and 222 deletions.
4 changes: 4 additions & 0 deletions paddle/fluid/pybind/pybind.cc
@@ -2417,6 +2417,10 @@ All parameter, weight, gradient are variables in Paddle.
m.def("get_ipu_device_count", platform::GetIPUDeviceCount);
#endif

#ifdef PADDLE_WITH_XPU
m.def("get_xpu_device_count", platform::GetXPUDeviceCount);
#endif

py::enum_<platform::TracerOption>(m, "TracerOption", py::arithmetic())
.value("kDefault", platform::TracerOption::kDefault)
.value("kOpDetail", platform::TracerOption::kOpDetail)
7 changes: 2 additions & 5 deletions paddle/phi/backends/xpu/xpu2_op_list.cc
@@ -107,11 +107,9 @@ XPUOpMap& get_kl2_ops() {
{"c_allgather",
XPUKernelSet({phi::DataType::FLOAT16,
phi::DataType::FLOAT32,
phi::DataType::FLOAT64,
phi::DataType::INT32,
phi::DataType::INT64,
phi::DataType::UINT8,
phi::DataType::BOOL})},
phi::DataType::UINT8})},
{"c_allreduce_max",
XPUKernelSet({phi::DataType::FLOAT16,
phi::DataType::FLOAT32,
@@ -150,8 +148,7 @@ XPUOpMap& get_kl2_ops() {
{"c_softmax_with_cross_entropy", XPUKernelSet({phi::DataType::FLOAT32})},
{"c_softmax_with_cross_entropy_grad",
XPUKernelSet({phi::DataType::FLOAT32})},
{"c_reduce_sum",
XPUKernelSet({phi::DataType::FLOAT16, phi::DataType::FLOAT32})},
{"c_reduce_sum", XPUKernelSet({phi::DataType::FLOAT32})},
{"c_split",
XPUKernelSet({phi::DataType::FLOAT16,
phi::DataType::FLOAT32,
3 changes: 1 addition & 2 deletions paddle/phi/backends/xpu/xpu3_op_list.cc
@@ -141,8 +141,7 @@ XPUOpMap& get_kl3_ops() {
{"c_softmax_with_cross_entropy", XPUKernelSet({phi::DataType::FLOAT32})},
{"c_softmax_with_cross_entropy_grad",
XPUKernelSet({phi::DataType::FLOAT32})},
{"c_reduce_sum",
XPUKernelSet({phi::DataType::FLOAT16, phi::DataType::FLOAT32})},
{"c_reduce_sum", XPUKernelSet({phi::DataType::FLOAT32})},
{"c_split",
XPUKernelSet({phi::DataType::FLOAT16,
phi::DataType::FLOAT32,
25 changes: 25 additions & 0 deletions python/paddle/device/xpu/__init__.py
@@ -58,3 +58,28 @@ def synchronize(device=None):
        raise ValueError("device type must be int or paddle.XPUPlace")

    return core._xpu_device_synchronize(device_id)


def device_count():
    '''
    Return the number of XPUs available.

    Returns:
        int: the number of XPUs available.

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> paddle.device.xpu.device_count()
    '''

    num_xpus = (
        core.get_xpu_device_count()
        if hasattr(core, 'get_xpu_device_count')
        else 0
    )

    return num_xpus
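
A minimal usage sketch for the new helper, tying it back to the "skip collective test if device_cnt < 2" item in the commit message. The guard below is illustrative only; it is not the skip mechanism the test suite itself uses:

import paddle

# device_count() falls back to 0 when core.get_xpu_device_count is absent,
# so this also runs on builds without XPU support.
num_xpus = paddle.device.xpu.device_count()

if num_xpus < 2:
    print(f"only {num_xpus} XPU(s) visible; collective tests would be skipped")
else:
    print(f"{num_xpus} XPUs visible; collective tests can run")
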
144 changes: 144 additions & 0 deletions test/xpu/collective_allgather_api.py
@@ -0,0 +1,144 @@
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

import test_collective_api_base as test_base

import paddle
import paddle.distributed as dist
from paddle import base, framework
from paddle.base import data_feeder

paddle.enable_static()


def all_gather_new(tensor_list, tensor, group=None):
    op_type = 'all_gather'
    helper = framework.LayerHelper(op_type, **locals())
    out = helper.create_variable_for_type_inference(dtype=tensor.dtype)
    for elem in tensor_list:
        data_feeder.check_variable_and_dtype(
            elem,
            'tensor_list',
            [
                'float16',
                'float32',
                'float64',
                'int32',
                'int64',
            ],
            op_type,
        )
    data_feeder.check_variable_and_dtype(
        tensor,
        'tensor',
        [
            'float16',
            'float32',
            'float64',
            'int32',
            'int64',
        ],
        op_type,
    )

    ring_id = 0 if group is None else group.id
    nranks = dist.get_world_size()
    helper.append_op(
        type=op_type,
        inputs={'x': [tensor]},
        outputs={'out': [out]},
        attrs={
            'ring_id': ring_id,
            'nranks': nranks,
        },
    )
    tensor_list.clear()
    tensor_list.extend(paddle.split(out, nranks, 0))


class TestCollectiveAllgatherAPI(test_base.TestCollectiveAPIRunnerBase):
    def __init__(self):
        self.global_ring_id = 0

    def get_model(self, main_prog, startup_program, rank, dtype="float32"):
        with base.program_guard(main_prog, startup_program):
            tensor_list = []
            tindata = paddle.static.data(
                name="tindata", shape=[10, 1000], dtype=dtype
            )
            paddle.distributed.all_gather(tensor_list, tindata)
            return tensor_list

    def get_model_new(
        self, main_prog, startup_program, rank, dtype=None, reduce_type=None
    ):
        with base.program_guard(main_prog, startup_program):
            tensor_list = []
            tindata = paddle.static.data(
                name="tindata", shape=[10, 1000], dtype=dtype
            )
            all_gather_new(tensor_list, tindata)
            return tensor_list

    def run_trainer(self, args):
        train_prog = base.Program()
        startup_prog = base.Program()
        endpoints = args["endpoints"].split(",")
        rank = args["trainerid"]
        current_endpoint = args["currentendpoint"]
        nranks = 2
        if args["use_comm_context"] or args["dynamic_static_unified_comm"]:
            paddle.distributed.collective._init_parallel_env(args["backend"])
        else:
            paddle.distributed.init_parallel_env()
        if args['backend'] == 'nccl':
            device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
            place = base.CUDAPlace(
                device_id
            )  # if args.use_gpu else base.CPUPlace()
        elif args['backend'] == 'bkcl':
            device_id = int(os.getenv("FLAGS_selected_xpus", "0"))
            place = base.XPUPlace(device_id)
        else:
            place = base.CPUPlace()
        indata = test_base.create_test_data(
            shape=(10, 1000), dtype=args["dtype"], seed=os.getpid()
        )
        assert (
            args['static_mode'] == 1
        ), "collective_allgather_api only support static graph mode"
        result = (
            self.get_model_new(
                train_prog, startup_prog, rank, dtype=args["dtype"]
            )
            if args["use_comm_context"]
            else self.get_model(
                train_prog, startup_prog, rank, dtype=args["dtype"]
            )
        )
        exe = base.Executor(place)
        exe.run(startup_prog)
        fetch_list = []
        for elem in result:
            fetch_list.append(elem.name)
        out = exe.run(
            train_prog, feed={'tindata': indata}, fetch_list=fetch_list
        )
        test_base.dump_output(out)


if __name__ == "__main__":
    test_base.runtime_main(TestCollectiveAllgatherAPI, "allgather")
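
For orientation, a single-process sketch of the shape bookkeeping behind all_gather_new above, assuming nranks = 2 and synthetic data; no communication happens here. The gather output stacks the per-rank (10, 1000) inputs along axis 0, and paddle.split(out, nranks, 0) rebuilds the per-rank list:

import numpy as np

import paddle

nranks = 2  # assumed world size for this sketch

# Stand-ins for the (10, 1000) tensor each rank would contribute.
per_rank = [
    paddle.to_tensor(np.full((10, 1000), r, dtype="float32"))
    for r in range(nranks)
]

# What the gather op produces: all ranks' tensors stacked along axis 0.
gathered = paddle.concat(per_rank, axis=0)
assert gathered.shape == [nranks * 10, 1000]

# What all_gather_new does with the op output to rebuild the per-rank list.
tensor_list = paddle.split(gathered, nranks, 0)
assert len(tensor_list) == nranks
assert tensor_list[0].shape == [10, 1000]
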
43 changes: 43 additions & 0 deletions test/xpu/collective_allgather_api_dygraph.py
@@ -0,0 +1,43 @@
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import test_collective_api_base as test_base

import paddle
import paddle.distributed as dist
from paddle import base


class TestCollectiveAllgatherAPI(test_base.TestCollectiveAPIRunnerBase):
    def __init__(self):
        self.global_ring_id = 0

    def get_model(self, main_prog, startup_program, rank, indata=None):
        with base.program_guard(main_prog, startup_program):
            tensor_list = []
            # NOTE: this is a hack relying on an undocumented behavior that `to_tensor` uses uint16 to replace bfloat16
            if indata.dtype == "bfloat16":
                tindata = paddle.to_tensor(indata, "float32").cast("uint16")
                dist.all_gather(tensor_list, tindata)
                return [
                    tensor.cast("float32").numpy() for tensor in tensor_list
                ]
            else:
                tindata = paddle.to_tensor(indata)
                dist.all_gather(tensor_list, tindata)
                return [tensor.numpy() for tensor in tensor_list]


if __name__ == "__main__":
    test_base.runtime_main(TestCollectiveAllgatherAPI, "allgather")
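
A short sketch of the cast round trip behind the NOTE in get_model, assuming the same undocumented convention that to_tensor/cast use uint16 as the carrier dtype for bfloat16; it only illustrates the dtype handling, with no collective call:

import numpy as np

import paddle

# Stand-in for the per-rank input that create_test_data would produce.
indata = np.random.random((10, 1000)).astype("float32")

# Load as float32, then cast to "uint16", which the test relies on Paddle
# treating as bfloat16 (undocumented behavior, per the NOTE above).
tindata = paddle.to_tensor(indata, "float32").cast("uint16")

# Results are cast back to float32 before comparison with numpy references.
restored = tindata.cast("float32").numpy()
print(restored.shape, restored.dtype)
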
59 changes: 0 additions & 59 deletions test/xpu/collective_allgather_op_xpu.py

This file was deleted.
