Commit
Merge pull request #9 from iraikov/feature/chunked_collcomm
Chunked collective communication
iraikov authored Jan 8, 2025
2 parents 294f192 + 5a527b8 commit d06e26a
Showing 25 changed files with 634 additions and 301 deletions.
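The thread running through these files: count and displacement bookkeeping for collective exchanges moves from int to size_t, and transfers large enough to overflow MPI's int-typed count arguments are split into chunks. MPI_Alltoallv and its relatives cannot describe more than INT_MAX (2^31 - 1) elements per rank in a single call; the new CHUNK_SIZE of 2^30 elements keeps each chunked call a factor of two below that limit.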
14 changes: 7 additions & 7 deletions include/cell/cell_attributes.hh
@@ -466,7 +466,7 @@ namespace neuroh5

vector<CELL_IDX_T> gid_recvbuf;
{
- vector<int> idx_sendcounts(size, 0), idx_sdispls(size, 0), idx_recvcounts(size, 0), idx_rdispls(size, 0);
+ vector<size_t> idx_sendcounts(size, 0), idx_sdispls(size, 0), idx_recvcounts(size, 0), idx_rdispls(size, 0);
idx_sendcounts[io_dests[rank]] = local_index_vector.size();

throw_assert(mpi::alltoallv_vector<CELL_IDX_T>(comm, MPI_CELL_IDX_T,
@@ -478,7 +478,7 @@ namespace neuroh5
vector<ATTR_PTR_T> attr_ptr;
{
vector<ATTR_PTR_T> attr_size_recvbuf;
- vector<int> attr_size_sendcounts(size, 0), attr_size_sdispls(size, 0), attr_size_recvcounts(size, 0), attr_size_rdispls(size, 0);
+ vector<size_t> attr_size_sendcounts(size, 0), attr_size_sdispls(size, 0), attr_size_recvcounts(size, 0), attr_size_rdispls(size, 0);
attr_size_sendcounts[io_dests[rank]] = local_attr_size_vector.size();

throw_assert(mpi::alltoallv_vector<ATTR_PTR_T>(comm, MPI_ATTR_PTR_T,
@@ -522,7 +522,7 @@ namespace neuroh5
local_value_vector.insert(local_value_vector.end(),v.begin(),v.end());
}

- vector<int> value_sendcounts(size, 0), value_sdispls(size, 0), value_recvcounts(size, 0), value_rdispls(size, 0);
+ vector<size_t> value_sendcounts(size, 0), value_sdispls(size, 0), value_recvcounts(size, 0), value_rdispls(size, 0);
value_sendcounts[io_dests[rank]] = local_value_size;


@@ -902,7 +902,7 @@ namespace neuroh5

vector<CELL_IDX_T> gid_recvbuf;
{
- vector<int> idx_sendcounts(size, 0), idx_sdispls(size, 0), idx_recvcounts(size, 0), idx_rdispls(size, 0);
+ vector<size_t> idx_sendcounts(size, 0), idx_sdispls(size, 0), idx_recvcounts(size, 0), idx_rdispls(size, 0);
idx_sendcounts[io_dests[rank]] = local_index_vector.size();

throw_assert(mpi::alltoallv_vector<CELL_IDX_T>(comm, MPI_CELL_IDX_T,
@@ -914,7 +914,7 @@ namespace neuroh5
vector<ATTR_PTR_T> attr_ptr;
{
vector<ATTR_PTR_T> attr_size_recvbuf;
- vector<int> attr_size_sendcounts(size, 0), attr_size_sdispls(size, 0), attr_size_recvcounts(size, 0), attr_size_rdispls(size, 0);
+ vector<size_t> attr_size_sendcounts(size, 0), attr_size_sdispls(size, 0), attr_size_recvcounts(size, 0), attr_size_rdispls(size, 0);
attr_size_sendcounts[io_dests[rank]] = local_attr_size_vector.size();

throw_assert(mpi::alltoallv_vector<ATTR_PTR_T>(comm, MPI_ATTR_PTR_T,
@@ -925,7 +925,7 @@ namespace neuroh5
if ((is_io_rank) && (attr_size_recvbuf.size() > 0))
{
ATTR_PTR_T attr_ptr_offset = 0;
- for (size_t s=0; s<ssize; s++)
+ for (size_t s=0; s<size; s++)
{
int count = attr_size_recvcounts[s];
for (size_t i=attr_size_rdispls[s]; i<attr_size_rdispls[s]+count; i++)
@@ -957,7 +957,7 @@ namespace neuroh5
local_value_vector.insert(local_value_vector.end(),v.begin(),v.end());
}

- vector<int> value_sendcounts(size, 0), value_sdispls(size, 0), value_recvcounts(size, 0), value_rdispls(size, 0);
+ vector<size_t> value_sendcounts(size, 0), value_sdispls(size, 0), value_recvcounts(size, 0), value_rdispls(size, 0);
value_sendcounts[io_dests[rank]] = local_value_size;


69 changes: 69 additions & 0 deletions include/data/chunk_info.hh
@@ -0,0 +1,69 @@
#ifndef CHUNK_INFO_HH
#define CHUNK_INFO_HH

#include <set>
#include <algorithm>

using namespace std;

namespace neuroh5
{
namespace data
{

// Constants for chunking
constexpr size_t CHUNK_SIZE = 1ULL << 30; // 1GB chunks for safety margin

template<typename T>
struct ChunkInfo {
std::vector<int> sendcounts;
std::vector<int> sdispls;
std::vector<int> recvcounts;
std::vector<int> rdispls;
size_t total_send_size;
size_t total_recv_size;
};


template<typename T>
ChunkInfo<T> calculate_chunk_sizes(
const std::vector<size_t>& full_sendcounts,
const std::vector<size_t>& full_sdispls,
const std::vector<size_t>& full_recvcounts,
const std::vector<size_t>& full_rdispls,
size_t chunk_start,
size_t chunk_size)
{
const size_t size = full_sendcounts.size();
ChunkInfo<T> chunk;
chunk.sendcounts.resize(size);
chunk.sdispls.resize(size);
chunk.recvcounts.resize(size);
chunk.rdispls.resize(size);

chunk.total_send_size = 0;
chunk.total_recv_size = 0;

for (size_t i = 0; i < size; ++i) {
// Calculate how much data to send in this chunk
size_t send_remaining = (chunk_start < full_sendcounts[i]) ?
full_sendcounts[i] - chunk_start : 0;
chunk.sendcounts[i] = static_cast<int>(std::min(send_remaining, chunk_size));
chunk.sdispls[i] = static_cast<int>(full_sdispls[i] + chunk_start);
chunk.total_send_size += chunk.sendcounts[i];

// Calculate how much data to receive in this chunk
size_t recv_remaining = (chunk_start < full_recvcounts[i]) ?
full_recvcounts[i] - chunk_start : 0;
chunk.recvcounts[i] = static_cast<int>(std::min(recv_remaining, chunk_size));
chunk.rdispls[i] = static_cast<int>(full_rdispls[i] + chunk_start);
chunk.total_recv_size += chunk.recvcounts[i];
}

return chunk;
}


}
}
#endif
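A minimal sketch of how this helper might drive a chunked exchange. The wrapper name alltoallv_in_chunks, the MPI_UNSIGNED_LONG_LONG reduction, and the pre-sized recvbuf are assumptions for illustration; in the tree itself, mpi::alltoallv_vector (seen in cell_attributes.hh above) presumably fills this role:

#include <mpi.h>
#include <algorithm>
#include <vector>
#include "chunk_info.hh"

// Hypothetical driver (not part of this commit): performs an alltoallv
// exchange whose per-rank counts are tracked in size_t, issuing one
// MPI_Alltoallv per chunk so that the int-typed counts handed to MPI
// never exceed CHUNK_SIZE.  recvbuf must be pre-sized to the sum of
// recvcounts.
template <typename T>
void alltoallv_in_chunks(MPI_Comm comm, MPI_Datatype dtype,
                         const std::vector<size_t>& sendcounts,
                         const std::vector<size_t>& sdispls,
                         const std::vector<size_t>& recvcounts,
                         const std::vector<size_t>& rdispls,
                         const std::vector<T>& sendbuf,
                         std::vector<T>& recvbuf)
{
  using namespace neuroh5::data;

  // All ranks must execute the same number of chunk rounds, so the loop
  // bound is the global maximum of any rank's send/recv count.
  unsigned long long local_max = 0, global_max = 0;
  for (size_t i = 0; i < sendcounts.size(); ++i)
    local_max = std::max({local_max,
                          (unsigned long long)sendcounts[i],
                          (unsigned long long)recvcounts[i]});
  MPI_Allreduce(&local_max, &global_max, 1,
                MPI_UNSIGNED_LONG_LONG, MPI_MAX, comm);

  for (size_t chunk_start = 0; chunk_start < global_max; chunk_start += CHUNK_SIZE)
    {
      ChunkInfo<T> chunk = calculate_chunk_sizes<T>
        (sendcounts, sdispls, recvcounts, rdispls, chunk_start, CHUNK_SIZE);

      // Ranks with no data left in this round contribute zero counts but
      // still enter the collective call.
      MPI_Alltoallv(sendbuf.data(), chunk.sendcounts.data(),
                    chunk.sdispls.data(), dtype,
                    recvbuf.data(), chunk.recvcounts.data(),
                    chunk.rdispls.data(), dtype, comm);
    }
}

Note that calculate_chunk_sizes caps each count at CHUNK_SIZE before the cast to int, so the counts passed to MPI stay below INT_MAX; the displacements are still cast to int, which assumes each rank's aggregate buffer remains addressable within that range.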
10 changes: 5 additions & 5 deletions include/data/serialize_cell_attributes.hh
@@ -4,7 +4,7 @@
///
/// Functions for serializing cell attributes.
///
- /// Copyright (C) 2017-2020 Project Neuroh5.
+ /// Copyright (C) 2017-2024 Project Neuroh5.
//==============================================================================

#ifndef SERIALIZE_CELL_ATTRIBUTES_HH
@@ -27,14 +27,14 @@ namespace neuroh5
void serialize_rank_attr_map (const size_t num_ranks,
const size_t start_rank,
const map <rank_t, AttrMap>& rank_attr_map,
- std::vector<int>& sendcounts,
+ std::vector<size_t>& sendcounts,
std::vector<char> &sendbuf,
- std::vector<int> &sdispls);
+ std::vector<size_t> &sdispls);

void deserialize_rank_attr_map (const size_t num_ranks,
const std::vector<char> &recvbuf,
- const std::vector<int>& recvcounts,
- const std::vector<int>& rdispls,
+ const std::vector<size_t>& recvcounts,
+ const std::vector<size_t>& rdispls,
AttrMap& all_attr_map);


10 changes: 5 additions & 5 deletions include/data/serialize_edge.hh
@@ -4,7 +4,7 @@
///
/// Functions for serializing edge data.
///
- /// Copyright (C) 2017 Project Neuroh5.
+ /// Copyright (C) 2017-2024 Project Neuroh5.
//==============================================================================

#ifndef SERIALIZE_EDGE_HH
@@ -31,14 +31,14 @@ namespace neuroh5
const size_t start_rank,
const rank_edge_map_t& prj_rank_edge_map,
size_t &num_packed_edges,
- vector<int>& sendcounts,
+ vector<size_t>& sendcounts,
vector<char> &sendbuf,
- vector<int> &sdispls);
+ vector<size_t> &sdispls);

void deserialize_rank_edge_map (const size_t num_ranks,
const vector<char> &recvbuf,
- const vector<int>& recvcounts,
- const vector<int>& rdispls,
+ const vector<size_t>& recvcounts,
+ const vector<size_t>& rdispls,
edge_map_t& prj_edge_map,
size_t& num_unpacked_nodes,
size_t& num_unpacked_edges
12 changes: 6 additions & 6 deletions include/data/serialize_tree.hh
@@ -26,21 +26,21 @@ namespace neuroh5
void serialize_rank_tree_map (const size_t num_ranks,
const size_t start_rank,
const std::map <rank_t, std::map<CELL_IDX_T, neurotree_t> >& rank_tree_map,
- std::vector<int>& sendcounts,
+ std::vector<size_t>& sendcounts,
std::vector<char> &sendbuf,
- std::vector<int> &sdispls);
+ std::vector<size_t> &sdispls);

void deserialize_rank_tree_map (const size_t num_ranks,
const std::vector<char> &recvbuf,
- const std::vector<int>& recvcounts,
- const std::vector<int>& rdispls,
+ const std::vector<size_t>& recvcounts,
+ const std::vector<size_t>& rdispls,
std::map<CELL_IDX_T, neurotree_t> &all_tree_map
);

void deserialize_rank_tree_list (const size_t num_ranks,
const vector<char> &recvbuf,
- const vector<int>& recvcounts,
- const vector<int>& rdispls,
+ const vector<size_t>& recvcounts,
+ const vector<size_t>& rdispls,
forward_list<neurotree_t> &all_tree_list);
}
}
5 changes: 3 additions & 2 deletions include/graph/node_attributes.hh
@@ -213,6 +213,9 @@ namespace neuroh5
value_offset = value_offset + v.size();
}
//attr_ptr.push_back(value_offset);

+ T dummy;
+ MPI_Datatype mpi_type = infer_mpi_datatype(dummy);

vector<int> value_sendcounts(size, 0), value_sdispls(size, 0), value_recvcounts(size, 0), value_rdispls(size, 0);
value_sendcounts[io_dests[rank]] = local_value_size;
@@ -241,8 +244,6 @@ namespace neuroh5
//assert(recvbuf_size > 0);
vector<T> value_recvbuf(value_recvbuf_size);

- T dummy;
- MPI_Datatype mpi_type = infer_mpi_datatype(dummy);
throw_assert(MPI_Alltoallv(&value_vector[0], &value_sendcounts[0], &value_sdispls[0], mpi_type,
&value_recvbuf[0], &value_recvcounts[0], &value_rdispls[0], mpi_type,
comm) == MPI_SUCCESS, "error in MPI_Alltoallv");
47 changes: 38 additions & 9 deletions include/hdf5/hdf5_edge_attributes.hh
@@ -22,6 +22,8 @@ namespace neuroh5
namespace hdf5
{

+ // Constants for chunking
+ constexpr size_t EDGE_ATTR_CHUNK_SIZE = 1ULL << 30; // 1GB chunks for safety margin

void size_edge_attributes
(
@@ -92,13 +94,15 @@ namespace neuroh5
attr_name,
current_value_size);

+ MPI_Request request;
size_t my_count = value.size();
std::vector<size_t> all_counts(size, 0);
- throw_assert(MPI_Allgather(&my_count, 1, MPI_SIZE_T, &all_counts[0], 1,
- MPI_SIZE_T, comm) == MPI_SUCCESS,
- "append_edge_attribute: error in MPI_Allgather");
- throw_assert(MPI_Barrier(comm) == MPI_SUCCESS,
- "append_edge_attribute: error in MPI_Barrier");
+ throw_assert(MPI_Iallgather(&my_count, 1, MPI_SIZE_T, &all_counts[0], 1,
+ MPI_SIZE_T, comm, &request) == MPI_SUCCESS,
+ "append_edge_attribute: error in MPI_Iallgather");
+
+ throw_assert(MPI_Wait(&request, MPI_STATUS_IGNORE) == MPI_SUCCESS,
+ "append_edge_attribute: error in MPI_Wait");

// calculate the total dataset size and the offset of my piece
hsize_t local_value_start = current_value_size,
@@ -126,10 +130,35 @@ namespace neuroh5
string path = edge_attribute_path(src_pop_name, dst_pop_name,
attr_namespace, attr_name);

- status = write<T> (file, path,
- global_value_size, local_value_start, local_value_size,
- mtype, value, wapl);

+ while (global_value_size - current_value_size > 0)
+ {
+ hsize_t local_write_size = std::min((hsize_t)EDGE_ATTR_CHUNK_SIZE, local_value_size);
+ std::vector<size_t> all_write_counts(size, 0);
+
+ throw_assert(MPI_Iallgather(&local_write_size, 1, MPI_SIZE_T, &all_write_counts[0], 1,
+ MPI_SIZE_T, comm, &request) == MPI_SUCCESS,
+ "append_edge_attribute: error in MPI_Iallgather");
+
+ throw_assert(MPI_Wait(&request, MPI_STATUS_IGNORE) == MPI_SUCCESS,
+ "append_edge_attribute: error in MPI_Wait");
+
+ status = write<T> (file, path,
+ global_value_size, local_value_start, local_write_size,
+ mtype, value, wapl);
+
+ if (local_value_size > 0)
+ {
+ local_value_size -= local_write_size;
+ local_value_start += local_write_size;
+ }
+
+ for (size_t p = 0; p < size; ++p)
+ {
+ current_value_size += (hsize_t) all_write_counts[p];
+ }
+ }
throw_assert(H5Pclose(wapl) >= 0, "error in H5Pclose");
}
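The loop above keeps every rank in the collective until the global total is written: a rank whose local data is exhausted continues to call write<T> with a zero-length chunk, since a parallel-HDF5 collective write must be entered by all ranks. A condensed sketch of the same drain pattern, with a hypothetical write_fn standing in for write<T> and an MPI_Allreduce replacing the MPI_Iallgather-plus-sum used above:

#include <mpi.h>
#include <algorithm>
#include <functional>

// Lock-step chunked drain (sketch): write_fn(start, count) must itself be
// collective, e.g. a parallel-HDF5 hyperslab write.
inline void chunked_collective_write(MPI_Comm comm,
                                     unsigned long long start,
                                     unsigned long long local_count,
                                     unsigned long long chunk_size,
                                     const std::function<void(unsigned long long,
                                                              unsigned long long)>& write_fn)
{
  unsigned long long remaining = local_count, global_remaining = 0;
  MPI_Allreduce(&local_count, &global_remaining, 1,
                MPI_UNSIGNED_LONG_LONG, MPI_SUM, comm);

  while (global_remaining > 0)
    {
      unsigned long long n = std::min(chunk_size, remaining);
      write_fn(start, n);   // collective; n may be zero on drained ranks
      start += n;
      remaining -= n;

      // every rank learns how much was written globally this round
      unsigned long long written = 0;
      MPI_Allreduce(&n, &written, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, comm);
      global_remaining -= written;
    }
}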

