Skip to content

Commit

Permalink
Backport #3759 to master. (#4340)
Browse files Browse the repository at this point in the history
Organize the processes into groups so that the two steps of metadata aggregation has more or less the same number or participants. This replaces in-node aggregation in first step. The new strategy balances the size of metadata gathered in the two steps.
  • Loading branch information
pnorbert authored Sep 10, 2024
1 parent 4c05ba9 commit faa8a49
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 13 deletions.
37 changes: 24 additions & 13 deletions source/adios2/engine/bp5/BP5Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -617,13 +617,13 @@ void BP5Writer::EndStep()
MetaBuffer = m_BP5Serializer.CopyMetadataToContiguous(
TSInfo.NewMetaMetaBlocks, {m}, {a}, {m_ThisTimestepDataSize}, {m_StartDataPos});

if (m_Aggregator->m_Comm.Size() > 1)
if (m_AggregatorMetadata.m_Comm.Size() > 1)
{ // level 1
m_Profiler.Start("ES_meta1_gather");
size_t LocalSize = MetaBuffer.size();
std::vector<size_t> RecvCounts = m_Aggregator->m_Comm.GatherValues(LocalSize, 0);
std::vector<size_t> RecvCounts = m_AggregatorMetadata.m_Comm.GatherValues(LocalSize, 0);
std::vector<char> RecvBuffer;
if (m_Aggregator->m_Comm.Rank() == 0)
if (m_AggregatorMetadata.m_Comm.Rank() == 0)
{
uint64_t TotalSize = 0;
for (auto &n : RecvCounts)
Expand All @@ -633,10 +633,10 @@ void BP5Writer::EndStep()
<< TotalSize << " bytes from aggregator group"
<< std::endl;*/
}
m_Aggregator->m_Comm.GathervArrays(MetaBuffer.data(), LocalSize, RecvCounts.data(),
RecvCounts.size(), RecvBuffer.data(), 0);
m_AggregatorMetadata.m_Comm.GathervArrays(MetaBuffer.data(), LocalSize, RecvCounts.data(),
RecvCounts.size(), RecvBuffer.data(), 0);
m_Profiler.Stop("ES_meta1_gather");
if (m_Aggregator->m_Comm.Rank() == 0)
if (m_AggregatorMetadata.m_Comm.Rank() == 0)
{
std::vector<format::BP5Base::MetaMetaInfoBlock> UniqueMetaMetaBlocks;
std::vector<uint64_t> DataSizes;
Expand All @@ -654,17 +654,17 @@ void BP5Writer::EndStep()
m_Profiler.Stop("ES_meta1");
m_Profiler.Start("ES_meta2");
// level 2
if (m_Aggregator->m_Comm.Rank() == 0)
if (m_AggregatorMetadata.m_Comm.Rank() == 0)
{
std::vector<char> RecvBuffer;
std::vector<char> *buf;
std::vector<size_t> RecvCounts;
size_t LocalSize = MetaBuffer.size();
if (m_CommAggregators.Size() > 1)
if (m_CommMetadataAggregators.Size() > 1)
{
m_Profiler.Start("ES_meta2_gather");
RecvCounts = m_CommAggregators.GatherValues(LocalSize, 0);
if (m_CommAggregators.Rank() == 0)
RecvCounts = m_CommMetadataAggregators.GatherValues(LocalSize, 0);
if (m_CommMetadataAggregators.Rank() == 0)
{
uint64_t TotalSize = 0;
for (auto &n : RecvCounts)
Expand All @@ -675,8 +675,8 @@ void BP5Writer::EndStep()
<< std::endl;*/
}

m_CommAggregators.GathervArrays(MetaBuffer.data(), LocalSize, RecvCounts.data(),
RecvCounts.size(), RecvBuffer.data(), 0);
m_CommMetadataAggregators.GathervArrays(MetaBuffer.data(), LocalSize, RecvCounts.data(),
RecvCounts.size(), RecvBuffer.data(), 0);
buf = &RecvBuffer;
m_Profiler.Stop("ES_meta2_gather");
}
Expand All @@ -686,7 +686,7 @@ void BP5Writer::EndStep()
RecvCounts.push_back(LocalSize);
}

if (m_CommAggregators.Rank() == 0)
if (m_CommMetadataAggregators.Rank() == 0)
{
std::vector<format::BP5Base::MetaMetaInfoBlock> UniqueMetaMetaBlocks;
std::vector<uint64_t> DataSizes;
Expand Down Expand Up @@ -1072,6 +1072,17 @@ void BP5Writer::InitAggregator()
*/
int color = m_Aggregator->m_Comm.Rank();
m_CommAggregators = m_Comm.Split(color, 0, "creating level 2 chain of aggregators at Open");

/* Metadata aggregator for two-level metadata aggregation */
{
size_t n = static_cast<size_t>(m_Comm.Size());
size_t a = (int)floor(sqrt((double)n));
m_AggregatorMetadata.Init(a, a, m_Comm);
/* chain of rank 0s form the second level of aggregation */
int color = m_AggregatorMetadata.m_Comm.Rank();
m_CommMetadataAggregators =
m_Comm.Split(color, 0, "creating level 2 chain of aggregators at Open");
}
}

void BP5Writer::InitTransports()
Expand Down
5 changes: 5 additions & 0 deletions source/adios2/engine/bp5/BP5Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@ class BP5Writer : public BP5Engine, public core::Engine
helper::Comm *DataWritingComm; // processes that write the same data file
// aggregators only (valid if m_Aggregator->m_Comm.Rank() == 0)
helper::Comm m_CommAggregators;

/* two-level metadata aggregation */
aggregator::MPIChain m_AggregatorMetadata; // first level
helper::Comm m_CommMetadataAggregators; // second level

adios2::profiling::JSONProfiler m_Profiler;

protected:
Expand Down

0 comments on commit faa8a49

Please sign in to comment.