From d45cfd34551537ce6f8317504bd520d7a2a1a588 Mon Sep 17 00:00:00 2001
From: Liangliang Ma
Date: Fri, 27 Sep 2024 04:39:59 +0800
Subject: [PATCH] [XPU] Support DeepNVMe new code structure (#6532)

In DeepNVMe GDS update, many functions are changed into a more abstract
way. Also added some files. These change break zero-infinity on XPU. To
bring this feature back, we have this PR:
1. modify the aio opbuilder for new files.
2. Add custom cpu_op_desc_t for xpu users. (XPU don't handle buffer
aligned here)

---------

Co-authored-by: Olatunji Ruwase
Co-authored-by: Logan Adams <114770087+loadams@users.noreply.github.com>
---
Notes (below the three-dash line, so not part of the commit message):
  * csrc/xpu/aio/deepspeed_cpu_op.cpp defines the cpu_op_desc_t members:
    the constructor keeps `buffer` as _cpu_buffer and stores
    _cpu_buffer.contiguous() in _contiguous_buffer; data_ptr() returns that
    buffer's data pointer; validate() forwards to validate_aio_operation().
  * run() builds an io_xfer_ctxt for thread `tid` at offset
    _num_bytes_per_thread * tid and calls do_aio_operation_overlap() when
    aio_config->_overlap_events is set, do_aio_operation_sequential()
    otherwise.
  * finish() copies _cpu_buffer.to(torch::kXPU) into _buffer when _read_op
    is set and _buffer.is_xpu().
    NOTE(review): the I/O pointer comes from _contiguous_buffer, not
    _cpu_buffer; presumably the two share storage whenever `buffer` is
    already contiguous -- confirm the non-contiguous case cannot occur.
  * op_builder/xpu/async_io.py reflows sources() to one path per line and
    adds deepspeed_py_io_handle.cpp, csrc/xpu/aio/deepspeed_cpu_op.cpp and
    deepspeed_aio_op_desc.cpp.

 csrc/xpu/aio/deepspeed_cpu_op.cpp | 51 +++++++++++++++++++++++++++++++
 op_builder/xpu/async_io.py        | 17 ++++++++---
 2 files changed, 63 insertions(+), 5 deletions(-)
 create mode 100644 csrc/xpu/aio/deepspeed_cpu_op.cpp

diff --git a/csrc/xpu/aio/deepspeed_cpu_op.cpp b/csrc/xpu/aio/deepspeed_cpu_op.cpp
new file mode 100644
index 000000000000..ee98c2d5cac2
--- /dev/null
+++ b/csrc/xpu/aio/deepspeed_cpu_op.cpp
@@ -0,0 +1,51 @@
+// Copyright (c) Microsoft Corporation.
+// SPDX-License-Identifier: Apache-2.0
+
+// DeepSpeed Team
+
+#include "deepspeed_cpu_op.h"
+
+using namespace std;
+
+cpu_op_desc_t::cpu_op_desc_t(const bool read_op,
+                             const torch::Tensor& buffer,
+                             const int fd,
+                             const char* filename,
+                             const long long int file_num_bytes,
+                             const int num_threads,
+                             const bool validate)
+    : io_op_desc_t(read_op, buffer, fd, filename, file_num_bytes, num_threads, validate),
+      _cpu_buffer(buffer)
+{
+    // XPU don't handle buffer here. See XPU Accelerator pin_memory.
+    _contiguous_buffer = _cpu_buffer.contiguous();
+}
+
+char* cpu_op_desc_t::data_ptr() const { return (char*)_contiguous_buffer.data_ptr(); }
+
+void cpu_op_desc_t::finish()
+{
+    if (_read_op && _buffer.is_xpu()) { _buffer.copy_(_cpu_buffer.to(torch::kXPU)); }
+}
+
+void cpu_op_desc_t::validate()
+{
+    validate_aio_operation(_read_op, _filename.c_str(), data_ptr(), _file_num_bytes);
+}
+
+void cpu_op_desc_t::run(const int tid,
+                        std::unique_ptr<aio_context>& aio_ctxt,
+                        deepspeed_aio_config_t* aio_config)
+{
+    assert(tid < _num_threads);
+    const auto base_offset = _num_bytes_per_thread * tid;
+
+    std::unique_ptr<io_xfer_ctxt> xfer_ctxt(
+        new io_xfer_ctxt(_fd, base_offset, _num_bytes_per_thread, data_ptr()));
+
+    if (aio_config->_overlap_events) {
+        do_aio_operation_overlap(_read_op, aio_ctxt, xfer_ctxt, aio_config, nullptr);
+    } else {
+        do_aio_operation_sequential(_read_op, aio_ctxt, xfer_ctxt, aio_config, nullptr);
+    }
+}
diff --git a/op_builder/xpu/async_io.py b/op_builder/xpu/async_io.py
index 7ed527e016fa..6a6798eaeb9c 100644
--- a/op_builder/xpu/async_io.py
+++ b/op_builder/xpu/async_io.py
@@ -21,11 +21,18 @@ def absolute_name(self):
 
     def sources(self):
         return [
-            'csrc/aio/py_lib/deepspeed_py_copy.cpp', 'csrc/aio/py_lib/py_ds_aio.cpp',
-            'csrc/aio/py_lib/deepspeed_py_aio.cpp', 'csrc/aio/py_lib/deepspeed_py_aio_handle.cpp',
-            'csrc/aio/py_lib/deepspeed_aio_thread.cpp', 'csrc/aio/common/deepspeed_aio_utils.cpp',
-            'csrc/aio/common/deepspeed_aio_common.cpp', 'csrc/aio/common/deepspeed_aio_types.cpp',
-            'csrc/aio/py_lib/deepspeed_pin_tensor.cpp'
+            'csrc/aio/py_lib/deepspeed_py_copy.cpp',
+            'csrc/aio/py_lib/py_ds_aio.cpp',
+            'csrc/aio/py_lib/deepspeed_py_aio.cpp',
+            'csrc/aio/py_lib/deepspeed_py_aio_handle.cpp',
+            'csrc/aio/py_lib/deepspeed_aio_thread.cpp',
+            'csrc/aio/common/deepspeed_aio_utils.cpp',
+            'csrc/aio/common/deepspeed_aio_common.cpp',
+            'csrc/aio/common/deepspeed_aio_types.cpp',
+            'csrc/aio/py_lib/deepspeed_pin_tensor.cpp',
+            'csrc/aio/py_lib/deepspeed_py_io_handle.cpp',
+            'csrc/xpu/aio/deepspeed_cpu_op.cpp',
+            'csrc/aio/py_lib/deepspeed_aio_op_desc.cpp',
         ]
 
     def include_paths(self):