Skip to content

Commit

Permalink
Merge pull request #2791 from pnorbert/bp5-aggregator-fix
Browse files Browse the repository at this point in the history
Bp5 aggregator fix
  • Loading branch information
pnorbert authored Jul 16, 2021
2 parents 8dba298 + 6bfeba8 commit 0d7d045
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 29 deletions.
53 changes: 41 additions & 12 deletions source/adios2/engine/bp5/BP5Writer_TwoLevelShm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <adios2-perfstubs-interface.h>

#include <ctime>
#include <iomanip>
#include <iostream>

namespace adios2
Expand Down Expand Up @@ -177,6 +178,26 @@ void BP5Writer::WriteMyOwnData(format::BufferV::BufferV_iovec DataVec)
}
}

/*std::string DoubleBufferToString(const double *b, int n)
{
std::ostringstream out;
out.precision(1);
out << std::fixed << "[";
char s[32];
for (int i = 0; i < n; ++i)
{
snprintf(s, sizeof(s), "%g", b[i]);
out << s;
if (i < n - 1)
{
out << ", ";
}
}
out << "]";
return out.str();
}*/

void BP5Writer::SendDataToAggregator(format::BufferV::BufferV_iovec DataVec,
const size_t TotalSize)
{
Expand Down Expand Up @@ -237,14 +258,17 @@ void BP5Writer::SendDataToAggregator(format::BufferV::BufferV_iovec DataVec,
}
sent += b->actual_size;

/*std::cout << "Rank " << m_Comm.Rank()
<< " filled shm, data_size = " << b->actual_size
<< " block = " << block << " temp offset = " << temp_offset
<< " sent = " << sent
<< " buf = " << static_cast<void *>(b->buf) << " = ["
<< (int)b->buf[0] << (int)b->buf[1] << "..."
<< (int)b->buf[b->actual_size - 2]
<< (int)b->buf[b->actual_size - 1] << "]" << std::endl;*/
/*if (m_RankMPI >= 42)
{
std::cout << "Rank " << m_Comm.Rank()
<< " filled shm, data_size = " << b->actual_size
<< " block = " << block
<< " temp offset = " << temp_offset << " sent = " << sent
<< " buf = " << static_cast<void *>(b->buf) << " = "
<< DoubleBufferToString((double *)b->buf,
b->actual_size / sizeof(double))
<< std::endl;
}*/

a->UnlockProducerBuffer();
}
Expand All @@ -264,10 +288,14 @@ void BP5Writer::WriteOthersData(size_t TotalSize)
/*std::cout << "Rank " << m_Comm.Rank()
<< " write from shm, data_size = " << b->actual_size
<< " total so far = " << wrote
<< " buf = " << static_cast<void *>(b->buf) << " = ["
<< (int)b->buf[0] << (int)b->buf[1] << "..."
<< (int)b->buf[b->actual_size - 2]
<< (int)b->buf[b->actual_size - 1] << "]" << std::endl;*/
<< " buf = " << static_cast<void *>(b->buf) << " = "
<< DoubleBufferToString((double *)b->buf,
b->actual_size / sizeof(double))
<< std::endl;*/
/*<< " buf = " << static_cast<void *>(b->buf) << " = ["
<< (int)b->buf[0] << (int)b->buf[1] << "..."
<< (int)b->buf[b->actual_size - 2]
<< (int)b->buf[b->actual_size - 1] << "]" << std::endl;*/

// b->actual_size: how much we need to write
m_FileDataManager.WriteFiles(b->buf, b->actual_size);
Expand All @@ -276,6 +304,7 @@ void BP5Writer::WriteOthersData(size_t TotalSize)

a->UnlockConsumerBuffer();
}
m_DataPos += TotalSize;
}

} // end namespace engine
Expand Down
22 changes: 5 additions & 17 deletions source/adios2/toolkit/aggregator/mpi/MPIShmChain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,7 @@ namespace aggregator

// Default constructor: only invokes the MPIAggregator base-class constructor;
// the body is empty, so no other work is done here.
MPIShmChain::MPIShmChain() : MPIAggregator() {}

// Destructor: the only active statement is the call to Close().
MPIShmChain::~MPIShmChain()
{
Close();
// Disabled code: an inline cleanup that, when m_IsActive is set, would free
// the four communicators listed below. It is commented out and never runs.
// NOTE(review): presumably Close() performs this cleanup instead — confirm
// against the Close() implementation.
/*if (m_IsActive)
{
m_NodeComm.Free("free per-node comm in ~MPIShmChain()");
m_OnePerNodeComm.Free("free chain of nodes in ~MPIShmChain()");
m_AllAggregatorsComm.Free(
"free comm of all aggregators in ~MPIShmChain()");
m_AggregatorChainComm.Free(
"free chains of aggregators in ~MPIShmChain()");
}*/
}
// Destructor: delegates all teardown to Close(); no other statements run here.
MPIShmChain::~MPIShmChain() { Close(); }

void MPIShmChain::Close()
{
Expand Down Expand Up @@ -286,7 +274,7 @@ MPIShmChain::ShmDataBuffer *MPIShmChain::LockProducerBuffer()
std::this_thread::sleep_for(std::chrono::duration<double>(0.00001));
}

m_Comm.Win_Lock(helper::Comm::LockType::Exclusive, 0, 0, m_Win);
m_Shm->lockSegment.lock();
if (m_Shm->producerBuffer == LastBufferUsed::A)

{
Expand All @@ -302,7 +290,7 @@ MPIShmChain::ShmDataBuffer *MPIShmChain::LockProducerBuffer()
// point to shm data buffer (in local process memory)
sdb->buf = m_Shm->bufA;
}
m_Comm.Win_Unlock(0, m_Win);
m_Shm->lockSegment.unlock();

// We determined we want a specific buffer
// Now we need to get a lock on it in case consumer is using it
Expand Down Expand Up @@ -343,7 +331,7 @@ MPIShmChain::ShmDataBuffer *MPIShmChain::LockConsumerBuffer()
// At this point we know buffer A has content or is going to have content
// when we successfully lock it

m_Comm.Win_Lock(helper::Comm::LockType::Exclusive, 0, 0, m_Win);
m_Shm->lockSegment.lock();
if (m_Shm->consumerBuffer == LastBufferUsed::A)

{
Expand All @@ -359,7 +347,7 @@ MPIShmChain::ShmDataBuffer *MPIShmChain::LockConsumerBuffer()
// point to shm data buffer (in local process memory)
sdb->buf = m_Shm->bufA;
}
m_Comm.Win_Unlock(0, m_Win);
m_Shm->lockSegment.unlock();

// We determined we want a specific buffer
// Now we need to get a lock on it in case producer is using it
Expand Down
1 change: 1 addition & 0 deletions source/adios2/toolkit/aggregator/mpi/MPIShmChain.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ class MPIShmChain : public MPIAggregator
// user facing structs
ShmDataBuffer sdbA;
ShmDataBuffer sdbB;
aggregator::Spinlock lockSegment;
// locks for individual buffers (sdb and buf)
aggregator::Spinlock lockA;
aggregator::Spinlock lockB;
Expand Down

0 comments on commit 0d7d045

Please sign in to comment.