Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIO File Offsets #6641

Merged
merged 39 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
b2848a5
file offset for gds
Aug 23, 2024
63a278f
update thread cxt for gds handle
Aug 23, 2024
99896b6
toward cpu opp w/offset
Aug 28, 2024
7b9d26b
offset updates
jomayeri Sep 12, 2024
33fe0dd
update test api
jomayeri Sep 23, 2024
92e3502
Merge branch 'master' into jomayeri/aio-file-offset
jomayeri Sep 23, 2024
b5ad64f
Merge branch 'master' into jomayeri/aio-file-offset
jomayeri Oct 1, 2024
4f212b2
Merge branch 'master' into jomayeri/aio-file-offset
jomayeri Oct 8, 2024
cb40d94
Merge branch 'master' into jomayeri/aio-file-offset
jomayeri Oct 10, 2024
f872b53
mid compile test
jomayeri Oct 10, 2024
4820710
fixing compil error
jomayeri Oct 10, 2024
43bba31
offset into swap_tensor
jomayeri Oct 11, 2024
d1be6e9
Merge branch 'master' into jomayeri/aio-file-offset
jomayeri Oct 18, 2024
9cd8bb0
adding all the offsets
jomayeri Oct 18, 2024
3d89bf3
fixing compiles
jomayeri Oct 18, 2024
1400c3f
pytest additions
jomayeri Oct 19, 2024
b8da1cc
fix formatting
jomayeri Oct 19, 2024
461181e
Merge branch 'master' into jomayeri/aio-file-offset
tohtana Oct 22, 2024
8b13dea
Merge branch 'master' into jomayeri/aio-file-offset
jomayeri Oct 28, 2024
441c7e3
removing commented code
jomayeri Oct 28, 2024
4daa101
Merge branch 'jomayeri/aio-file-offset' of github.com:microsoft/DeepS…
jomayeri Oct 28, 2024
5aa4e2a
Merge branch 'master' into jomayeri/aio-file-offset
tjruwase Oct 29, 2024
dd8fe54
Merge branch 'master' into jomayeri/aio-file-offset
tjruwase Oct 30, 2024
988be5a
moving offset to last argument
jomayeri Oct 30, 2024
b7bef24
Merge branch 'jomayeri/aio-file-offset' of github.com:microsoft/DeepS…
jomayeri Oct 30, 2024
43a5659
Merge branch 'master' into jomayeri/aio-file-offset
loadams Oct 30, 2024
7454159
moving offset argumetn
jomayeri Oct 30, 2024
26dc7d0
Merge branch 'jomayeri/aio-file-offset' of github.com:microsoft/DeepS…
jomayeri Oct 30, 2024
da4ef46
Merge branch 'master' into jomayeri/aio-file-offset
tjruwase Oct 31, 2024
29f0048
Merge branch 'master' into jomayeri/aio-file-offset
tjruwase Nov 1, 2024
0619528
Merge branch 'master' into jomayeri/aio-file-offset
loadams Nov 4, 2024
6d0a89e
skipping pin mem when not cuda
jomayeri Nov 4, 2024
25f94d7
formatting
jomayeri Nov 4, 2024
2f5446a
Merge branch 'master' into jomayeri/aio-file-offset
jomayeri Nov 5, 2024
58bc018
Merge branch 'master' into jomayeri/aio-file-offset
loadams Nov 6, 2024
6ff3b1f
Merge branch 'master' into jomayeri/aio-file-offset
loadams Nov 8, 2024
00bd284
Merge branch 'master' into jomayeri/aio-file-offset
tjruwase Nov 12, 2024
b4beedc
Merge branch 'master' into jomayeri/aio-file-offset
loadams Nov 12, 2024
206df0f
Merge branch 'master' into jomayeri/aio-file-offset
loadams Nov 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions accelerator/cpu_accelerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def device_count(self):
# In flat mode, HBM is in separate NUMA node with no cores on this node.
# Ignore these NUMA nodes with no cores.
numa_core_lists = get_numa_cores()
if not numa_core_lists:
return 1
numa_count = 0
prev_core_list = []
for core_list in numa_core_lists:
Expand Down
18 changes: 12 additions & 6 deletions csrc/aio/common/deepspeed_aio_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ const int c_io_queue_depth = 8;

io_xfer_ctxt::io_xfer_ctxt(const int fd,
const int64_t file_offset,
const int64_t buffer_offset,
const int64_t num_bytes,
const void* buffer)
: _fd(fd), _base_offset(file_offset), _mem_buffer(buffer), _num_bytes(num_bytes)
: _fd(fd),
_file_base_offset(file_offset),
_buffer_base_offset(buffer_offset),
_mem_buffer(buffer),
_num_bytes(num_bytes)
{
}

Expand All @@ -41,9 +46,10 @@ void io_prep_context::prep_iocbs(const int n_iocbs,
assert(static_cast<size_t>(n_iocbs) <= _iocbs->size());
for (auto i = 0; i < n_iocbs; ++i) {
const auto shift = i * _block_size;
const auto xfer_buffer = (char*)start_buffer + _xfer_ctxt->_base_offset + shift;
const auto xfer_offset = _xfer_ctxt->_base_offset + start_offset + shift;
const auto xfer_buffer = (char*)start_buffer + _xfer_ctxt->_buffer_base_offset + shift;
const auto xfer_offset = _xfer_ctxt->_file_base_offset + start_offset + shift;
auto byte_count = _block_size;

if ((shift + _block_size) > num_bytes) { byte_count = num_bytes - shift; }

if (_read_op) {
Expand Down Expand Up @@ -79,10 +85,10 @@ int io_prep_generator::prep_iocbs(const int n_iocbs, std::vector<struct iocb*>*

auto actual_n_iocbs = min(static_cast<int64_t>(n_iocbs), _remaining_io_blocks);
for (auto i = 0; i < actual_n_iocbs; ++i, ++_next_iocb_index) {
const auto xfer_offset = _xfer_ctxt->_base_offset + (_next_iocb_index * _block_size);
const auto xfer_buffer = (char*)_xfer_ctxt->_mem_buffer + xfer_offset;
const auto xfer_buffer = (char*)_xfer_ctxt->_mem_buffer + _xfer_ctxt->_buffer_base_offset +
(_next_iocb_index * _block_size);
const auto xfer_offset = _xfer_ctxt->_file_base_offset + (_next_iocb_index * _block_size);
const auto num_bytes = min(static_cast<int64_t>(_block_size), _remaining_bytes);

if (_read_op) {
io_prep_pread(iocbs->at(i), _xfer_ctxt->_fd, xfer_buffer, num_bytes, xfer_offset);
} else {
Expand Down
4 changes: 3 additions & 1 deletion csrc/aio/common/deepspeed_aio_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ Functionality for swapping optimizer tensors to/from (NVMe) storage devices.

struct io_xfer_ctxt {
const int _fd;
const int64_t _base_offset;
const int64_t _file_base_offset;
const int64_t _buffer_base_offset;
const void* _mem_buffer;
const int64_t _num_bytes;

io_xfer_ctxt(const int fd,
const int64_t file_offset,
const int64_t buffer_offset,
const int64_t num_bytes,
const void* buffer);
};
Expand Down
6 changes: 4 additions & 2 deletions csrc/aio/py_lib/deepspeed_aio_op_desc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@ io_op_desc_t::io_op_desc_t(const bool read_op,
const char* filename,
const int64_t file_num_bytes,
const int intra_op_parallelism,
const bool validate)
const bool validate,
const int64_t file_offset)
: _read_op(read_op),
_buffer(buffer),
_fd(fd),
_filename(filename),
_file_num_bytes(file_num_bytes),
_file_offset(file_offset),
_intra_op_parallelism(intra_op_parallelism),
_num_bytes_per_thread(file_num_bytes / intra_op_parallelism),
_num_bytes_per_thread(static_cast<int64_t>(buffer.nbytes()) / intra_op_parallelism),
_validate(validate)
{
}
Expand Down
4 changes: 3 additions & 1 deletion csrc/aio/py_lib/deepspeed_aio_op_desc.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ struct io_op_desc_t {
const int64_t _num_bytes_per_thread;
torch::Tensor _contiguous_buffer;
const bool _validate;
const int64_t _file_offset;

io_op_desc_t(const bool read_op,
const torch::Tensor& buffer,
const int fd,
const char* filename,
const int64_t file_num_bytes,
const int intra_op_parallelism,
const bool validate);
const bool validate,
const int64_t file_offset);

virtual void run(const int tid,
std::unique_ptr<aio_context>& aio_ctxt,
Expand Down
19 changes: 14 additions & 5 deletions csrc/aio/py_lib/deepspeed_cpu_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,16 @@ cpu_op_desc_t::cpu_op_desc_t(
const char* filename,
const int64_t file_num_bytes,
const int intra_op_parallelism,
const bool validate)
: io_op_desc_t(read_op, buffer, fd, filename, file_num_bytes, intra_op_parallelism, validate),
const bool validate,
const int64_t file_offset)
: io_op_desc_t(read_op,
buffer,
fd,
filename,
file_num_bytes,
intra_op_parallelism,
validate,
file_offset),
_cpu_buffer(buffer),
_pinned_tensor_mgr(pinned_tensor_mgr),
_is_managed_bounce_buffer(false)
Expand Down Expand Up @@ -66,10 +74,11 @@ void cpu_op_desc_t::run(const int tid,
deepspeed_aio_config_t* aio_config)
{
assert(tid < _intra_op_parallelism);
const auto base_offset = _num_bytes_per_thread * tid;
const auto buffer_base_offset = _num_bytes_per_thread * tid;
const auto file_base_offset = _file_offset + (_num_bytes_per_thread * tid);

std::unique_ptr<io_xfer_ctxt> xfer_ctxt(
new io_xfer_ctxt(_fd, base_offset, _num_bytes_per_thread, data_ptr()));
std::unique_ptr<io_xfer_ctxt> xfer_ctxt(new io_xfer_ctxt(
_fd, file_base_offset, buffer_base_offset, _num_bytes_per_thread, data_ptr()));

if (aio_config->_overlap_events) {
do_aio_operation_overlap(_read_op, aio_ctxt, xfer_ctxt, aio_config, nullptr);
Expand Down
3 changes: 2 additions & 1 deletion csrc/aio/py_lib/deepspeed_cpu_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ struct cpu_op_desc_t : io_op_desc_t {
const char* filename,
const int64_t file_num_bytes,
const int intra_op_parallelism,
const bool validate);
const bool validate,
const int64_t file_offset);

void run(const int tid,
std::unique_ptr<aio_context>& aio_ctxt,
Expand Down
7 changes: 5 additions & 2 deletions csrc/aio/py_lib/deepspeed_py_aio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ int deepspeed_py_aio_write(const torch::Tensor& buffer,

auto write_buffer = (char*)buffer.data_ptr();
const auto num_write_bytes = static_cast<int64_t>(buffer.nbytes());
std::unique_ptr<io_xfer_ctxt> xfer_ctxt(new io_xfer_ctxt(fd, 0, num_write_bytes, write_buffer));

std::unique_ptr<io_xfer_ctxt> xfer_ctxt(
new io_xfer_ctxt(fd, 0, 0, num_write_bytes, write_buffer));
std::unique_ptr<aio_context> aio_ctxt(new aio_context(config._block_size, config._queue_depth));

if (config._overlap_events) {
Expand Down Expand Up @@ -97,7 +99,8 @@ int deepspeed_py_aio_read(torch::Tensor& buffer,
auto read_buffer = (char*)buffer.data_ptr();
assert(static_cast<int64_t>(buffer.nbytes()) == num_file_bytes);

std::unique_ptr<io_xfer_ctxt> xfer_ctxt(new io_xfer_ctxt(fd, 0, num_file_bytes, read_buffer));
std::unique_ptr<io_xfer_ctxt> xfer_ctxt(
new io_xfer_ctxt(fd, 0, 0, num_file_bytes, read_buffer));
std::unique_ptr<aio_context> aio_ctxt(new aio_context(config._block_size, config._queue_depth));

if (config._overlap_events) {
Expand Down
65 changes: 41 additions & 24 deletions csrc/aio/py_lib/deepspeed_py_io_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ const bool deepspeed_io_handle_t::get_overlap_events() const { return _overlap_e

const int deepspeed_io_handle_t::get_intra_op_parallelism() const { return _intra_op_parallelism; }

int deepspeed_io_handle_t::read(torch::Tensor& buffer, const char* filename, const bool validate)
int deepspeed_io_handle_t::read(torch::Tensor& buffer,
const char* filename,
const bool validate,
const int64_t file_offset)
{
const auto start_time = std::chrono::high_resolution_clock::now();

Expand All @@ -76,7 +79,8 @@ int deepspeed_io_handle_t::read(torch::Tensor& buffer, const char* filename, con
if (fd == -1) { return -1; }

auto read_buffer = (char*)buffer.data_ptr();
std::unique_ptr<io_xfer_ctxt> xfer_ctxt(new io_xfer_ctxt(fd, 0, num_file_bytes, read_buffer));
std::unique_ptr<io_xfer_ctxt> xfer_ctxt(
new io_xfer_ctxt(fd, file_offset, 0, num_file_bytes, read_buffer));

if (_aio_config._overlap_events) {
do_aio_operation_overlap(true, _aio_ctxt, xfer_ctxt, &_aio_config, nullptr);
Expand All @@ -98,7 +102,8 @@ int deepspeed_io_handle_t::read(torch::Tensor& buffer, const char* filename, con

int deepspeed_io_handle_t::write(const torch::Tensor& buffer,
const char* filename,
const bool validate)
const bool validate,
const int64_t file_offset)
{
assert(_aio_ctxt);

Expand All @@ -109,7 +114,8 @@ int deepspeed_io_handle_t::write(const torch::Tensor& buffer,

auto write_buffer = (char*)buffer.data_ptr();
const auto num_write_bytes = static_cast<int64_t>(buffer.nbytes());
std::unique_ptr<io_xfer_ctxt> xfer_ctxt(new io_xfer_ctxt(fd, 0, num_write_bytes, write_buffer));
std::unique_ptr<io_xfer_ctxt> xfer_ctxt(
new io_xfer_ctxt(fd, file_offset, 0, num_write_bytes, write_buffer));

if (_aio_config._overlap_events) {
do_aio_operation_overlap(false, _aio_ctxt, xfer_ctxt, &_aio_config, nullptr);
Expand Down Expand Up @@ -206,7 +212,8 @@ std::shared_ptr<struct io_op_desc_t> deepspeed_io_handle_t::_create_io_op_desc(
const int fd,
const char* filename,
const int64_t file_num_bytes,
const bool validate)
const bool validate,
const int64_t file_offset)
{
return std::make_shared<cpu_op_desc_t>(read_op,
buffer,
Expand All @@ -215,34 +222,34 @@ std::shared_ptr<struct io_op_desc_t> deepspeed_io_handle_t::_create_io_op_desc(
filename,
file_num_bytes,
_intra_op_parallelism,
validate);
validate,
file_offset);
}

int deepspeed_io_handle_t::pread(const torch::Tensor& buffer,
const char* filename,
const bool validate,
const bool async)
const bool async,
const int64_t file_offset)
{
int64_t num_file_bytes;
if (-1 == get_file_size(filename, num_file_bytes)) {
const auto error_code = errno;
report_file_error(filename, " fstat for read", error_code);
return -1;
}

// buffer can exceed file size to enable 4k alignment
const auto buffer_bytes = static_cast<int64_t>(buffer.nbytes());
if (buffer_bytes != num_file_bytes) {
std::cout << filename << ": buffer nbytes != file bytes " << buffer_bytes
<< " != " << num_file_bytes << std::endl;
}
assert(buffer_bytes == num_file_bytes);
assert((num_file_bytes % _intra_op_parallelism) == 0);

if (!_is_valid_parallel_aio_op(true, num_file_bytes)) { return -1; }
if (!_is_valid_parallel_aio_op(true, buffer_bytes)) { return -1; }

const auto fd = open_file(filename, true);
if (fd == -1) { return -1; }

auto scheduled_op = _create_io_op_desc(true, buffer, fd, filename, num_file_bytes, validate);
auto scheduled_op =
_create_io_op_desc(true, buffer, fd, filename, num_file_bytes, validate, file_offset);

_schedule_aio_work(scheduled_op);

Expand All @@ -254,7 +261,8 @@ int deepspeed_io_handle_t::pread(const torch::Tensor& buffer,
int deepspeed_io_handle_t::pwrite(const torch::Tensor& buffer,
const char* filename,
const bool validate,
const bool async)
const bool async,
const int64_t file_offset)
{
const auto num_write_bytes = static_cast<int64_t>(buffer.nbytes());
assert((num_write_bytes % _intra_op_parallelism) == 0);
Expand All @@ -264,7 +272,8 @@ int deepspeed_io_handle_t::pwrite(const torch::Tensor& buffer,
const auto fd = open_file(filename, false);
if (fd == -1) { return -1; }

auto scheduled_op = _create_io_op_desc(false, buffer, fd, filename, num_write_bytes, validate);
auto scheduled_op =
_create_io_op_desc(false, buffer, fd, filename, num_write_bytes, validate, file_offset);

_schedule_aio_work(scheduled_op);

Expand All @@ -273,24 +282,32 @@ int deepspeed_io_handle_t::pwrite(const torch::Tensor& buffer,
return wait();
}

int deepspeed_io_handle_t::sync_pread(torch::Tensor& buffer, const char* filename)
int deepspeed_io_handle_t::sync_pread(torch::Tensor& buffer,
const char* filename,
const int64_t file_offset)
{
return pread(buffer, filename, false, false);
return pread(buffer, filename, false, false, file_offset);
}

int deepspeed_io_handle_t::sync_pwrite(const torch::Tensor& buffer, const char* filename)
int deepspeed_io_handle_t::sync_pwrite(const torch::Tensor& buffer,
const char* filename,
const int64_t file_offset)
{
return pwrite(buffer, filename, false, false);
return pwrite(buffer, filename, false, false, file_offset);
}

int deepspeed_io_handle_t::async_pread(torch::Tensor& buffer, const char* filename)
int deepspeed_io_handle_t::async_pread(torch::Tensor& buffer,
const char* filename,
const int64_t file_offset)
{
return pread(buffer, filename, false, true);
return pread(buffer, filename, false, true, file_offset);
}

int deepspeed_io_handle_t::async_pwrite(const torch::Tensor& buffer, const char* filename)
int deepspeed_io_handle_t::async_pwrite(const torch::Tensor& buffer,
const char* filename,
const int64_t file_offset)
{
return pwrite(buffer, filename, false, true);
return pwrite(buffer, filename, false, true, file_offset);
}

at::Tensor deepspeed_io_handle_t::new_cpu_locked_tensor(const int64_t num_elem,
Expand Down
27 changes: 18 additions & 9 deletions csrc/aio/py_lib/deepspeed_py_io_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,35 @@ struct deepspeed_io_handle_t {
const bool get_overlap_events() const;
const int get_intra_op_parallelism() const;

int read(torch::Tensor& buffer, const char* filename, const bool validate);
int read(torch::Tensor& buffer,
const char* filename,
const bool validate,
const int64_t file_offset);

int write(const torch::Tensor& buffer, const char* filename, const bool validate);
int write(const torch::Tensor& buffer,
const char* filename,
const bool validate,
const int64_t file_offset);

int pread(const torch::Tensor& buffer,
const char* filename,
const bool validate,
const bool async);
const bool async,
const int64_t file_offset);

int pwrite(const torch::Tensor& buffer,
const char* filename,
const bool validate,
const bool async);
const bool async,
const int64_t file_offset);

int sync_pread(torch::Tensor& buffer, const char* filename);
int sync_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset);

int sync_pwrite(const torch::Tensor& buffer, const char* filename);
int sync_pwrite(const torch::Tensor& buffer, const char* filename, const int64_t file_offset);

int async_pread(torch::Tensor& buffer, const char* filename);
int async_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset);

int async_pwrite(const torch::Tensor& buffer, const char* filename);
int async_pwrite(const torch::Tensor& buffer, const char* filename, const int64_t file_offset);

// TODO: Make API's args to be shape and dtype.
torch::Tensor new_cpu_locked_tensor(const int64_t num_elem,
Expand All @@ -81,5 +89,6 @@ struct deepspeed_io_handle_t {
const int fd,
const char* filename,
const int64_t file_num_bytes,
const bool validate);
const bool validate,
const int64_t file_offset);
};
Loading