Skip to content

Commit

Permalink
Backport ornladios#3759 to Release 2.10 branch. (cherry-picked from m…
Browse files Browse the repository at this point in the history
…aster branch ornladios#4340) (ornladios#4341)

Organize the processes into groups so that the two steps of metadata aggregation have more or less the same number of participants. This replaces in-node aggregation in the first step. The new strategy balances the size of the metadata gathered in the two steps.
  • Loading branch information
pnorbert authored Sep 11, 2024
1 parent d6e61c8 commit 5283b27
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 13 deletions.
37 changes: 24 additions & 13 deletions source/adios2/engine/bp5/BP5Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -617,13 +617,13 @@ void BP5Writer::EndStep()
MetaBuffer = m_BP5Serializer.CopyMetadataToContiguous(
TSInfo.NewMetaMetaBlocks, {m}, {a}, {m_ThisTimestepDataSize}, {m_StartDataPos});

if (m_Aggregator->m_Comm.Size() > 1)
if (m_AggregatorMetadata.m_Comm.Size() > 1)
{ // level 1
m_Profiler.Start("ES_meta1_gather");
size_t LocalSize = MetaBuffer.size();
std::vector<size_t> RecvCounts = m_Aggregator->m_Comm.GatherValues(LocalSize, 0);
std::vector<size_t> RecvCounts = m_AggregatorMetadata.m_Comm.GatherValues(LocalSize, 0);
std::vector<char> RecvBuffer;
if (m_Aggregator->m_Comm.Rank() == 0)
if (m_AggregatorMetadata.m_Comm.Rank() == 0)
{
uint64_t TotalSize = 0;
for (auto &n : RecvCounts)
Expand All @@ -633,10 +633,10 @@ void BP5Writer::EndStep()
<< TotalSize << " bytes from aggregator group"
<< std::endl;*/
}
m_Aggregator->m_Comm.GathervArrays(MetaBuffer.data(), LocalSize, RecvCounts.data(),
RecvCounts.size(), RecvBuffer.data(), 0);
m_AggregatorMetadata.m_Comm.GathervArrays(MetaBuffer.data(), LocalSize, RecvCounts.data(),
RecvCounts.size(), RecvBuffer.data(), 0);
m_Profiler.Stop("ES_meta1_gather");
if (m_Aggregator->m_Comm.Rank() == 0)
if (m_AggregatorMetadata.m_Comm.Rank() == 0)
{
std::vector<format::BP5Base::MetaMetaInfoBlock> UniqueMetaMetaBlocks;
std::vector<uint64_t> DataSizes;
Expand All @@ -654,17 +654,17 @@ void BP5Writer::EndStep()
m_Profiler.Stop("ES_meta1");
m_Profiler.Start("ES_meta2");
// level 2
if (m_Aggregator->m_Comm.Rank() == 0)
if (m_AggregatorMetadata.m_Comm.Rank() == 0)
{
std::vector<char> RecvBuffer;
std::vector<char> *buf;
std::vector<size_t> RecvCounts;
size_t LocalSize = MetaBuffer.size();
if (m_CommAggregators.Size() > 1)
if (m_CommMetadataAggregators.Size() > 1)
{
m_Profiler.Start("ES_meta2_gather");
RecvCounts = m_CommAggregators.GatherValues(LocalSize, 0);
if (m_CommAggregators.Rank() == 0)
RecvCounts = m_CommMetadataAggregators.GatherValues(LocalSize, 0);
if (m_CommMetadataAggregators.Rank() == 0)
{
uint64_t TotalSize = 0;
for (auto &n : RecvCounts)
Expand All @@ -675,8 +675,8 @@ void BP5Writer::EndStep()
<< std::endl;*/
}

m_CommAggregators.GathervArrays(MetaBuffer.data(), LocalSize, RecvCounts.data(),
RecvCounts.size(), RecvBuffer.data(), 0);
m_CommMetadataAggregators.GathervArrays(MetaBuffer.data(), LocalSize, RecvCounts.data(),
RecvCounts.size(), RecvBuffer.data(), 0);
buf = &RecvBuffer;
m_Profiler.Stop("ES_meta2_gather");
}
Expand All @@ -686,7 +686,7 @@ void BP5Writer::EndStep()
RecvCounts.push_back(LocalSize);
}

if (m_CommAggregators.Rank() == 0)
if (m_CommMetadataAggregators.Rank() == 0)
{
std::vector<format::BP5Base::MetaMetaInfoBlock> UniqueMetaMetaBlocks;
std::vector<uint64_t> DataSizes;
Expand Down Expand Up @@ -1072,6 +1072,17 @@ void BP5Writer::InitAggregator()
*/
int color = m_Aggregator->m_Comm.Rank();
m_CommAggregators = m_Comm.Split(color, 0, "creating level 2 chain of aggregators at Open");

/* Metadata aggregator for two-level metadata aggregation */
{
size_t n = static_cast<size_t>(m_Comm.Size());
size_t a = (int)floor(sqrt((double)n));
m_AggregatorMetadata.Init(a, a, m_Comm);
/* chain of rank 0s form the second level of aggregation */
int color = m_AggregatorMetadata.m_Comm.Rank();
m_CommMetadataAggregators =
m_Comm.Split(color, 0, "creating level 2 chain of aggregators at Open");
}
}

void BP5Writer::InitTransports()
Expand Down
5 changes: 5 additions & 0 deletions source/adios2/engine/bp5/BP5Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@ class BP5Writer : public BP5Engine, public core::Engine
helper::Comm *DataWritingComm; // processes that write the same data file
// aggregators only (valid if m_Aggregator->m_Comm.Rank() == 0)
helper::Comm m_CommAggregators;

/* two-level metadata aggregation */
aggregator::MPIChain m_AggregatorMetadata; // first level
helper::Comm m_CommMetadataAggregators; // second level

adios2::profiling::JSONProfiler m_Profiler;

protected:
Expand Down

0 comments on commit 5283b27

Please sign in to comment.