Skip to content

Commit

Permalink
Fix: Keep track of file offset (m_DataPos) in the aggregator when wri…
Browse files Browse the repository at this point in the history
…ting out a non-aggregator's data, otherwise next step will start writing at a low offset overwriting the previous step..
  • Loading branch information
pnorbert committed Jul 16, 2021
1 parent e35f5ea commit 6bfeba8
Showing 1 changed file with 41 additions and 12 deletions.
53 changes: 41 additions & 12 deletions source/adios2/engine/bp5/BP5Writer_TwoLevelShm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <adios2-perfstubs-interface.h>

#include <ctime>
#include <iomanip>
#include <iostream>

namespace adios2
Expand Down Expand Up @@ -177,6 +178,26 @@ void BP5Writer::WriteMyOwnData(format::BufferV::BufferV_iovec DataVec)
}
}

/*std::string DoubleBufferToString(const double *b, int n)
{
std::ostringstream out;
out.precision(1);
out << std::fixed << "[";
char s[32];
for (int i = 0; i < n; ++i)
{
snprintf(s, sizeof(s), "%g", b[i]);
out << s;
if (i < n - 1)
{
out << ", ";
}
}
out << "]";
return out.str();
}*/

void BP5Writer::SendDataToAggregator(format::BufferV::BufferV_iovec DataVec,
const size_t TotalSize)
{
Expand Down Expand Up @@ -237,14 +258,17 @@ void BP5Writer::SendDataToAggregator(format::BufferV::BufferV_iovec DataVec,
}
sent += b->actual_size;

/*std::cout << "Rank " << m_Comm.Rank()
<< " filled shm, data_size = " << b->actual_size
<< " block = " << block << " temp offset = " << temp_offset
<< " sent = " << sent
<< " buf = " << static_cast<void *>(b->buf) << " = ["
<< (int)b->buf[0] << (int)b->buf[1] << "..."
<< (int)b->buf[b->actual_size - 2]
<< (int)b->buf[b->actual_size - 1] << "]" << std::endl;*/
/*if (m_RankMPI >= 42)
{
std::cout << "Rank " << m_Comm.Rank()
<< " filled shm, data_size = " << b->actual_size
<< " block = " << block
<< " temp offset = " << temp_offset << " sent = " << sent
<< " buf = " << static_cast<void *>(b->buf) << " = "
<< DoubleBufferToString((double *)b->buf,
b->actual_size / sizeof(double))
<< std::endl;
}*/

a->UnlockProducerBuffer();
}
Expand All @@ -264,10 +288,14 @@ void BP5Writer::WriteOthersData(size_t TotalSize)
/*std::cout << "Rank " << m_Comm.Rank()
<< " write from shm, data_size = " << b->actual_size
<< " total so far = " << wrote
<< " buf = " << static_cast<void *>(b->buf) << " = ["
<< (int)b->buf[0] << (int)b->buf[1] << "..."
<< (int)b->buf[b->actual_size - 2]
<< (int)b->buf[b->actual_size - 1] << "]" << std::endl;*/
<< " buf = " << static_cast<void *>(b->buf) << " = "
<< DoubleBufferToString((double *)b->buf,
b->actual_size / sizeof(double))
<< std::endl;*/
/*<< " buf = " << static_cast<void *>(b->buf) << " = ["
<< (int)b->buf[0] << (int)b->buf[1] << "..."
<< (int)b->buf[b->actual_size - 2]
<< (int)b->buf[b->actual_size - 1] << "]" << std::endl;*/

// b->actual_size: how much we need to write
m_FileDataManager.WriteFiles(b->buf, b->actual_size);
Expand All @@ -276,6 +304,7 @@ void BP5Writer::WriteOthersData(size_t TotalSize)

a->UnlockConsumerBuffer();
}
m_DataPos += TotalSize;
}

} // end namespace engine
Expand Down

0 comments on commit 6bfeba8

Please sign in to comment.