Skip to content

Commit

Permalink
Merge pull request #2593 from JasonRuonanWang/dataman
Browse files Browse the repository at this point in the history
Dataman step combination
  • Loading branch information
JasonRuonanWang authored Jan 26, 2021
2 parents 42a83dc + 9c3d56f commit 3b4e3e0
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 41 deletions.
4 changes: 2 additions & 2 deletions source/adios2/engine/dataman/DataManMonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ void DataManMonitor::EndTransport()
{
auto latency = std::chrono::duration_cast<std::chrono::microseconds>(
(std::chrono::system_clock::now() -
m_TransportTimers.back().second))
m_TransportTimers.front().second))
.count();
if (m_Verbose)
{
std::lock_guard<std::mutex> l(m_PrintMutex);
std::cout << "Step " << m_TransportTimers.back().first
std::cout << "Step " << m_TransportTimers.front().first
<< ", Latency milliseconds "
<< static_cast<double>(latency) / 1000.0 << std::endl;
}
Expand Down
2 changes: 1 addition & 1 deletion source/adios2/engine/dataman/DataManReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ StepStatus DataManReader::BeginStep(StepMode stepMode,
}

m_CurrentStepMetadata = m_Serializer.GetEarliestLatestStep(
m_CurrentStep, m_PublisherAddresses.size(), timeout, true);
m_CurrentStep, m_PublisherAddresses.size(), timeout, false);

if (m_CurrentStepMetadata == nullptr)
{
Expand Down
83 changes: 67 additions & 16 deletions source/adios2/engine/dataman/DataManWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ DataManWriter::DataManWriter(IO &io, const std::string &name,
m_MpiRank = m_Comm.Rank();
m_MpiSize = m_Comm.Size();

m_Serializer.NewWriterBuffer(m_SerializerBufferSize);

helper::GetParameter(m_IO.m_Parameters, "IPAddress", m_IPAddress);
helper::GetParameter(m_IO.m_Parameters, "Port", m_Port);
helper::GetParameter(m_IO.m_Parameters, "Timeout", m_Timeout);
Expand All @@ -36,6 +38,7 @@ DataManWriter::DataManWriter(IO &io, const std::string &name,
helper::GetParameter(m_IO.m_Parameters, "DoubleBuffer", m_DoubleBuffer);
helper::GetParameter(m_IO.m_Parameters, "TransportMode", m_TransportMode);
helper::GetParameter(m_IO.m_Parameters, "Monitor", m_MonitorActive);
helper::GetParameter(m_IO.m_Parameters, "CombiningSteps", m_CombiningSteps);

if (m_IPAddress.empty())
{
Expand Down Expand Up @@ -114,7 +117,10 @@ DataManWriter::~DataManWriter()
StepStatus DataManWriter::BeginStep(StepMode mode, const float timeout_sec)
{
++m_CurrentStep;
m_Serializer.NewWriterBuffer(m_SerializerBufferSize);
if (m_CombinedSteps == 0)
{
m_Serializer.NewWriterBuffer(m_SerializerBufferSize);
}

if (m_MonitorActive)
{
Expand All @@ -140,28 +146,44 @@ void DataManWriter::EndStep()
m_Serializer.PutAttributes(m_IO);
}

m_Serializer.AttachAttributesToLocalPack();
const auto buffer = m_Serializer.GetLocalPack();
if (buffer->size() > m_SerializerBufferSize)
{
m_SerializerBufferSize = buffer->size();
}
++m_CombinedSteps;

if (m_MonitorActive)
if (m_CombinedSteps >= m_CombiningSteps)
{
m_Monitor.BeginTransport(m_CurrentStep);
}
m_CombinedSteps = 0;
m_Serializer.AttachAttributesToLocalPack();
const auto buffer = m_Serializer.GetLocalPack();
if (buffer->size() > m_SerializerBufferSize)
{
m_SerializerBufferSize = buffer->size();
}

if (m_DoubleBuffer || m_TransportMode == "reliable")
{
PushBufferQueue(buffer);
if (m_MonitorActive)
{
m_Monitor.BeginTransport(m_CurrentStep);
}

if (m_DoubleBuffer || m_TransportMode == "reliable")
{
PushBufferQueue(buffer);
}
else
{
m_Publisher.Send(buffer);
if (m_MonitorActive)
{
for (int i = 0; i < m_CombiningSteps; ++i)
{
m_Monitor.EndTransport();
}
}
}
}
else
{
m_Publisher.Send(buffer);
if (m_MonitorActive)
{
m_Monitor.EndTransport();
m_Monitor.BeginTransport(m_CurrentStep);
}
}

Expand Down Expand Up @@ -194,6 +216,32 @@ ADIOS2_FOREACH_STDTYPE_1ARG(declare_type)

void DataManWriter::DoClose(const int transportIndex)
{
if (m_CombinedSteps < m_CombiningSteps && m_CombinedSteps > 0)
{
m_Serializer.AttachAttributesToLocalPack();
const auto buffer = m_Serializer.GetLocalPack();
if (buffer->size() > m_SerializerBufferSize)
{
m_SerializerBufferSize = buffer->size();
}

if (m_DoubleBuffer || m_TransportMode == "reliable")
{
PushBufferQueue(buffer);
}
else
{
m_Publisher.Send(buffer);
if (m_MonitorActive)
{
for (int i = 0; i < m_CombiningSteps; ++i)
{
m_Monitor.EndTransport();
}
}
}
}

nlohmann::json endSignal;
endSignal["FinalStep"] = static_cast<int64_t>(m_CurrentStep);
std::string s = endSignal.dump() + '\0';
Expand Down Expand Up @@ -268,7 +316,10 @@ void DataManWriter::PublishThread()
m_Publisher.Send(buffer);
if (m_MonitorActive)
{
m_Monitor.EndTransport();
for (int i = 0; i < m_CombiningSteps; ++i)
{
m_Monitor.EndTransport();
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/engine/dataman/DataManWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class DataManWriter : public Engine
bool m_DoubleBuffer = false;
std::string m_TransportMode = "fast";
bool m_MonitorActive = false;
int m_CombiningSteps = 1;
int m_CombinedSteps = 0;

std::string m_AllAddresses;
std::string m_PublisherAddress;
Expand Down
18 changes: 0 additions & 18 deletions source/adios2/toolkit/format/dataman/DataManSerializer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -137,24 +137,6 @@ void DataManSerializer::PutData(
metaj["E"] = m_IsLittleEndian;
}

for (const auto &op : ops)
{
const auto opName = op.Op->m_Type;
if (opName == "zfp" or opName == "bzip2" or opName == "sz")
{
/*
m_CompressionParams[variable.m_Name]["CompressionMethod"] =
opName;
for (const auto &p : op.Parameters)
{
m_CompressionParams[variable.m_Name]
[opName + ":" + p.first] = p.second;
}
break;
*/
}
}

size_t datasize = 0;
bool compressed = false;
std::string compressionMethod;
Expand Down
4 changes: 2 additions & 2 deletions source/adios2/toolkit/zmq/zmqreqrep/ZmqReqRep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ void ZmqReqRep::SendReply(const void *reply, const size_t size)
}

std::shared_ptr<std::vector<char>>
ZmqReqRep::Request(const void *request, const size_t size,
ZmqReqRep::Request(const char *request, const size_t size,
const std::string &address)
{
auto reply = std::make_shared<std::vector<char>>();
Expand Down Expand Up @@ -183,7 +183,7 @@ ZmqReqRep::Request(const void *request, const size_t size,
return reply;
}

std::shared_ptr<std::vector<char>> ZmqReqRep::Request(const void *request,
std::shared_ptr<std::vector<char>> ZmqReqRep::Request(const char *request,
const size_t size)
{
auto reply = std::make_shared<std::vector<char>>();
Expand Down
4 changes: 2 additions & 2 deletions source/adios2/toolkit/zmq/zmqreqrep/ZmqReqRep.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class ZmqReqRep
void OpenRequester(const std::string &address, const int timeout,
const size_t receiverBufferSize);
std::shared_ptr<std::vector<char>>
Request(const void *request, const size_t size, const std::string &address);
std::shared_ptr<std::vector<char>> Request(const void *request,
Request(const char *request, const size_t size, const std::string &address);
std::shared_ptr<std::vector<char>> Request(const char *request,
const size_t size);

// replier
Expand Down

0 comments on commit 3b4e3e0

Please sign in to comment.