From c68378fea1b91fe6208a077ebdcc988f0658eb55 Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Tue, 3 Mar 2020 09:42:16 -0500 Subject: [PATCH 1/9] added MpiHandshake --- source/adios2/CMakeLists.txt | 1 + source/adios2/engine/ssc/SscReader.cpp | 8 +- source/adios2/engine/ssc/SscReader.h | 4 + source/adios2/engine/ssc/SscWriter.cpp | 6 +- source/adios2/engine/ssc/SscWriter.h | 4 + source/adios2/helper/adiosMpiHandshake.cpp | 273 +++++++++++++++++++++ source/adios2/helper/adiosMpiHandshake.h | 60 +++++ 7 files changed, 354 insertions(+), 2 deletions(-) create mode 100644 source/adios2/helper/adiosMpiHandshake.cpp create mode 100644 source/adios2/helper/adiosMpiHandshake.h diff --git a/source/adios2/CMakeLists.txt b/source/adios2/CMakeLists.txt index c25c5eb390..94b8bf09f8 100644 --- a/source/adios2/CMakeLists.txt +++ b/source/adios2/CMakeLists.txt @@ -231,6 +231,7 @@ if(ADIOS2_HAVE_MPI) target_sources(adios2 PRIVATE core/IOMPI.cpp helper/adiosCommMPI.h helper/adiosCommMPI.cpp + helper/adiosMpiHandshake.h helper/adiosMpiHandshake.cpp engine/insitumpi/InSituMPIWriter.cpp engine/insitumpi/InSituMPIWriter.tcc engine/insitumpi/InSituMPIReader.cpp engine/insitumpi/InSituMPIReader.tcc engine/insitumpi/InSituMPIFunctions.cpp engine/insitumpi/InSituMPISchedules.cpp diff --git a/source/adios2/engine/ssc/SscReader.cpp b/source/adios2/engine/ssc/SscReader.cpp index 837daae56c..4f88adee85 100644 --- a/source/adios2/engine/ssc/SscReader.cpp +++ b/source/adios2/engine/ssc/SscReader.cpp @@ -10,6 +10,7 @@ #include "SscReader.tcc" #include "adios2/helper/adiosComm.h" +#include "adios2/helper/adiosCommMPI.h" #include "adios2/helper/adiosFunctions.h" #include "adios2/helper/adiosJSONcomplex.h" #include "nlohmann/json.hpp" @@ -191,13 +192,18 @@ void SscReader::EndStep() void SscReader::SyncMpiPattern() { + TAU_SCOPED_TIMER_FUNC(); + if (m_Verbosity >= 5) { std::cout << "SscReader::SyncMpiPattern, World Rank " << m_WorldRank << ", Reader Rank " << m_ReaderRank << std::endl; } - TAU_SCOPED_TIMER_FUNC(); + m_MpiHandshake.Start(4, 128, 2, 'r', m_Name, CommAsMPI(m_Comm) ); + m_MpiHandshake.Wait(m_Name); + m_MpiHandshake.PrintMaps(); + if (m_WorldSize == m_ReaderSize) { throw(std::runtime_error("no writers are found")); diff --git a/source/adios2/engine/ssc/SscReader.h b/source/adios2/engine/ssc/SscReader.h index c899ec6f0e..47c805aed1 100644 --- a/source/adios2/engine/ssc/SscReader.h +++ b/source/adios2/engine/ssc/SscReader.h @@ -14,6 +14,7 @@ #include "SscHelper.h" #include "adios2/core/Engine.h" #include "adios2/toolkit/profiling/taustubs/tautimer.hpp" +#include "adios2/helper/adiosMpiHandshake.h" #include #include @@ -56,6 +57,9 @@ class SscReader : public Engine int m_WorldSize; int m_ReaderRank; int m_ReaderSize; + + helper::MpiHandshake m_MpiHandshake; + int m_AppID = 0; int m_AppSize = 0; std::vector> m_WriterGlobalMpiInfo; diff --git a/source/adios2/engine/ssc/SscWriter.cpp b/source/adios2/engine/ssc/SscWriter.cpp index ad1cdfeeb3..db697a36e2 100644 --- a/source/adios2/engine/ssc/SscWriter.cpp +++ b/source/adios2/engine/ssc/SscWriter.cpp @@ -10,10 +10,10 @@ #include "SscWriter.tcc" #include "adios2/helper/adiosComm.h" +#include "adios2/helper/adiosCommMPI.h" #include "adios2/helper/adiosJSONcomplex.h" #include "nlohmann/json.hpp" -#include "adios2/helper/adiosCommMPI.h" namespace adios2 { @@ -192,6 +192,10 @@ void SscWriter::SyncMpiPattern() { TAU_SCOPED_TIMER_FUNC(); + m_MpiHandshake.Start(4, 128, 2, 'w', m_Name, CommAsMPI(m_Comm) ); + m_MpiHandshake.Wait(m_Name); + m_MpiHandshake.PrintMaps(); + if (m_Verbosity >= 5) { std::cout << "SscWriter::SyncMpiPattern, World Rank " << m_WorldRank diff --git a/source/adios2/engine/ssc/SscWriter.h b/source/adios2/engine/ssc/SscWriter.h index 158e065dbc..498e554aab 100644 --- a/source/adios2/engine/ssc/SscWriter.h +++ b/source/adios2/engine/ssc/SscWriter.h @@ -14,6 +14,7 @@ #include "SscHelper.h" #include "adios2/core/Engine.h" #include "adios2/toolkit/profiling/taustubs/tautimer.hpp" +#include "adios2/helper/adiosMpiHandshake.h" #include #include @@ -57,6 +58,9 @@ class SscWriter : public Engine int m_WorldSize; int m_WriterRank; int m_WriterSize; + + helper::MpiHandshake m_MpiHandshake; + int m_AppID = 0; int m_AppSize = 0; std::vector> m_WriterGlobalMpiInfo; diff --git a/source/adios2/helper/adiosMpiHandshake.cpp b/source/adios2/helper/adiosMpiHandshake.cpp new file mode 100644 index 0000000000..645ca93166 --- /dev/null +++ b/source/adios2/helper/adiosMpiHandshake.cpp @@ -0,0 +1,273 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * adiosMpiHandshake.cpp + */ + +#include +#include "adiosMpiHandshake.h" + +namespace adios2 +{ +namespace helper +{ + +std::vector MpiHandshake::m_Buffer; +std::vector> MpiHandshake::m_SendRequests; +std::vector> MpiHandshake::m_RecvRequests; +size_t MpiHandshake::m_MaxStreamsPerApp; +size_t MpiHandshake::m_MaxFilenameLength; +size_t MpiHandshake::m_ItemSize; +std::map MpiHandshake::m_AppsForStreams; +size_t MpiHandshake::m_StreamID = 0; +int MpiHandshake::m_WorldSize; +int MpiHandshake::m_WorldRank; +int MpiHandshake::m_LocalSize; +int MpiHandshake::m_LocalRank; +int MpiHandshake::m_LocalMasterRank; +std::map>> MpiHandshake::m_WritersMap; +std::map>> MpiHandshake::m_ReadersMap; +std::map MpiHandshake::m_AppsSize; + + +size_t MpiHandshake::PlaceInBuffer(size_t stream, int rank) +{ + return rank * m_MaxStreamsPerApp * m_ItemSize + stream * m_ItemSize; +} + +void MpiHandshake::Test() +{ + int success; + MPI_Status status; + + for(int rank=0; rank(m_Buffer.data() + offset)[0]; + offset += sizeof(int); + int appSize = reinterpret_cast(m_Buffer.data() + offset)[0]; + offset += sizeof(int); + std::string filename = m_Buffer.data() + offset; + m_AppsSize[appMasterRank] = appSize; + if(mode == 'w') + { + auto &ranks = m_WritersMap[filename][appMasterRank]; + bool existed = false; + for(const auto r : ranks) + { + if(r == rank) + { + existed = true; + } + } + if(not existed) + { + ranks.push_back(rank); + } + } + else if(mode == 'r') + { + auto &ranks = m_ReadersMap[filename][appMasterRank]; + bool existed = false; + for(const auto r : ranks) + { + if(r == rank) + { + existed = true; + } + } + if(not existed) + { + ranks.push_back(rank); + } + } + } + } + } +} + +bool MpiHandshake::Check(const std::string &filename) +{ + Test(); + + if(m_WritersMap[filename].size() + m_ReadersMap[filename].size() != m_AppsForStreams[filename]) + { + return false; + } + + for (const auto &app : m_WritersMap[filename]) + { + if(app.second.size() != m_AppsSize[app.first]) + { + return false; + } + } + + for (const auto &app : m_ReadersMap[filename]) + { + if(app.second.size() != m_AppsSize[app.first]) + { + return false; + } + } + + return true; +} + +void MpiHandshake::Start(const size_t maxStreamsPerApp, const size_t maxFilenameLength, const size_t appsForThisStream, const char mode, const std::string &filename, MPI_Comm localComm) +{ + m_AppsForStreams[filename] = appsForThisStream; + + if(m_StreamID == 0) + { + MPI_Comm_size(MPI_COMM_WORLD, &m_WorldSize); + MPI_Comm_rank(MPI_COMM_WORLD, &m_WorldRank); + MPI_Comm_size(localComm, &m_LocalSize); + MPI_Comm_rank(localComm, &m_LocalRank); + m_MaxStreamsPerApp = maxStreamsPerApp; + m_MaxFilenameLength = maxFilenameLength; + m_ItemSize = maxFilenameLength + sizeof(char) + sizeof(int) * 2; + + if(m_LocalRank == 0) + { + m_LocalMasterRank = m_WorldRank; + } + + MPI_Bcast(&m_LocalMasterRank, 1, MPI_INT, 0, localComm); + + m_SendRequests.resize(m_WorldSize); + m_RecvRequests.resize(m_WorldSize); + for(int i=0; i maxFilenameLength) + { + throw(std::runtime_error("Filename too long")); + } + size_t offset = 0; + std::vector buffer(m_ItemSize); + std::memcpy(buffer.data(), &mode, sizeof(char)); + offset += sizeof(char); + std::memcpy(buffer.data() + offset, &m_LocalMasterRank, sizeof(int)); + offset += sizeof(int); + std::memcpy(buffer.data() + offset, &m_LocalSize, sizeof(int)); + offset += sizeof(int); + std::memcpy(buffer.data() + offset, filename.data(), filename.size()); + + for(int i=0; i> & MpiHandshake::GetWriterMap(const std::string &filename) +{ + return m_WritersMap[filename]; +} +const std::map> & MpiHandshake::GetReaderMap(const std::string &filename) +{ + return m_ReadersMap[filename]; +} + +void MpiHandshake::Finalize() +{ + --m_StreamID; + if(m_StreamID == 0) + { + for(auto &rs : m_RecvRequests) + { + for(auto &r : rs) + { + MPI_Status status; + int success; + MPI_Test(&r, &success, &status); + if(not success) + { + MPI_Cancel(&r); + } + } + } + m_RecvRequests.clear(); + m_SendRequests.clear(); + } +} + +void MpiHandshake::PrintMaps() +{ + for(int printRank = 0; printRank < m_WorldSize; ++printRank) + { + MPI_Barrier(MPI_COMM_WORLD); + if(m_WorldRank == printRank) + { + std::cout << "For rank " << printRank << " ********************* " << std::endl; + std::cout << "Writers: " << std::endl; + for (const auto &stream : m_WritersMap) + { + std::cout << " Stream " << stream.first << std::endl; + for (const auto &app : stream.second) + { + std::cout << " App Master Rank " << app.first << std::endl; + std::cout << " "; + for(const auto &rank : app.second) + { + std::cout << rank << ", "; + } + std::cout << std::endl; + } + } + std::cout << "Readers: " << std::endl; + for (const auto &stream : m_ReadersMap) + { + std::cout << " Stream " << stream.first << std::endl; + for (const auto &app : stream.second) + { + std::cout << " App Master Rank " << app.first << std::endl; + std::cout << " "; + for(const auto &rank : app.second) + { + std::cout << rank << ", "; + } + std::cout << std::endl; + } + } + } + } +} + +} // end namespace helper +} // end namespace adios2 diff --git a/source/adios2/helper/adiosMpiHandshake.h b/source/adios2/helper/adiosMpiHandshake.h new file mode 100644 index 0000000000..037633363f --- /dev/null +++ b/source/adios2/helper/adiosMpiHandshake.h @@ -0,0 +1,60 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * adiosMpiHandshake.h + */ + +#ifndef ADIOS2_HELPER_ADIOSMPIHANDSHAKE_H_ +#define ADIOS2_HELPER_ADIOSMPIHANDSHAKE_H_ + +#include "adios2/common/ADIOSConfig.h" +#ifndef ADIOS2_HAVE_MPI +#error "Do not include adiosCommMPI.h without ADIOS2_HAVE_MPI." +#endif + +#include +#include +#include +#include + +namespace adios2 +{ +namespace helper +{ + +class MpiHandshake +{ + public: + static void Start(const size_t maxStreamsPerApp, const size_t maxFilenameLength, const size_t appsForThisStream, const char mode, const std::string &filename, MPI_Comm localComm); + static void Wait(const std::string &filename); + static void Finalize(); + static const std::map> & GetWriterMap(const std::string &filename); + static const std::map> & GetReaderMap(const std::string &filename); + static void PrintMaps(); + private: + static void Test(); + static bool Check(const std::string &filename); + static size_t PlaceInBuffer(const size_t stream, const int rank); + static std::vector m_Buffer; + static std::vector> m_SendRequests; + static std::vector> m_RecvRequests; + static size_t m_MaxStreamsPerApp; + static size_t m_MaxFilenameLength; + static size_t m_ItemSize; + static std::map m_AppsForStreams; + static size_t m_StreamID; + static int m_WorldSize; + static int m_WorldRank; + static int m_LocalSize; + static int m_LocalRank; + static int m_LocalMasterRank; + static std::map>> m_WritersMap; + static std::map>> m_ReadersMap; + static std::map m_AppsSize; +}; + +} // end namespace helper +} // end namespace adios2 + +#endif // ADIOS2_HELPER_ADIOSMPIHANDSHAKE_H_ From ef25fb768ef5100bc026a3970f9f9bb919a4abe4 Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Tue, 3 Mar 2020 11:55:14 -0500 Subject: [PATCH 2/9] added engine parameters necessary for mpi handshake --- source/adios2/engine/ssc/SscHelper.cpp | 37 ++++++++++++++++++++++++++ source/adios2/engine/ssc/SscHelper.h | 3 +++ source/adios2/engine/ssc/SscReader.cpp | 25 +++++------------ source/adios2/engine/ssc/SscReader.h | 3 +++ source/adios2/engine/ssc/SscWriter.cpp | 25 +++++------------ source/adios2/engine/ssc/SscWriter.h | 3 +++ 6 files changed, 58 insertions(+), 38 deletions(-) diff --git a/source/adios2/engine/ssc/SscHelper.cpp b/source/adios2/engine/ssc/SscHelper.cpp index 3228b6a1c6..99b5b17d06 100644 --- a/source/adios2/engine/ssc/SscHelper.cpp +++ b/source/adios2/engine/ssc/SscHelper.cpp @@ -303,6 +303,43 @@ void PrintMpiInfo(const MpiInfo &writersInfo, const MpiInfo &readersInfo) std::cout << std::endl; } +bool GetParameter(const Params ¶ms, const std::string &key, int &value) +{ + auto it = params.find(key); + if (it == params.end()) + { + return false; + } + else + { + try + { + value = std::stoi(it->second); + } + catch (...) + { + std::string error = "Engine parameter " + key + " can only be integer numbers"; + std::cerr << error << std::endl; + return false; + } + } + return true; +} + +bool GetParameter(const Params ¶ms, const std::string &key, std::string &value) +{ + auto it = params.find(key); + if (it == params.end()) + { + return false; + } + else + { + value = it->second; + } + return true; +} + } // end namespace ssc } // end namespace engine } // end namespace core diff --git a/source/adios2/engine/ssc/SscHelper.h b/source/adios2/engine/ssc/SscHelper.h index 383e672eb7..e84bec3aa2 100644 --- a/source/adios2/engine/ssc/SscHelper.h +++ b/source/adios2/engine/ssc/SscHelper.h @@ -66,6 +66,9 @@ void JsonToBlockVecVec(const std::string &input, BlockVecVec &output); bool AreSameDims(const Dims &a, const Dims &b); +bool GetParameter(const Params ¶ms, const std::string &key, int &value); +bool GetParameter(const Params ¶ms, const std::string &key, std::string &value); + } // end namespace ssc } // end namespace engine } // end namespace core diff --git a/source/adios2/engine/ssc/SscReader.cpp b/source/adios2/engine/ssc/SscReader.cpp index 4f88adee85..07c6fa3129 100644 --- a/source/adios2/engine/ssc/SscReader.cpp +++ b/source/adios2/engine/ssc/SscReader.cpp @@ -32,24 +32,11 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode, m_ReaderRank = m_Comm.Rank(); m_ReaderSize = m_Comm.Size(); - auto it = m_IO.m_Parameters.find("MpiMode"); - if (it != m_IO.m_Parameters.end()) - { - m_MpiMode = it->second; - } - it = m_IO.m_Parameters.find("Verbose"); - if (it != m_IO.m_Parameters.end()) - { - try - { - m_Verbosity = std::stoi(it->second); - } - catch (...) - { - std::cerr << "Engine parameter Verbose can only be integer numbers" - << std::endl; - } - } + ssc::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode); + ssc::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity); + ssc::GetParameter(m_IO.m_Parameters, "MaxStreamsPerApp", m_MaxStreamsPerApp); + ssc::GetParameter(m_IO.m_Parameters, "MaxFilenameLength", m_MaxFilenameLength); + ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount", m_RendezvousAppCount); m_Buffer.resize(1); @@ -200,7 +187,7 @@ void SscReader::SyncMpiPattern() << ", Reader Rank " << m_ReaderRank << std::endl; } - m_MpiHandshake.Start(4, 128, 2, 'r', m_Name, CommAsMPI(m_Comm) ); + m_MpiHandshake.Start(m_MaxStreamsPerApp, m_MaxFilenameLength, m_RendezvousAppCount, 'r', m_Name, CommAsMPI(m_Comm) ); m_MpiHandshake.Wait(m_Name); m_MpiHandshake.PrintMaps(); diff --git a/source/adios2/engine/ssc/SscReader.h b/source/adios2/engine/ssc/SscReader.h index 47c805aed1..df78c6f483 100644 --- a/source/adios2/engine/ssc/SscReader.h +++ b/source/adios2/engine/ssc/SscReader.h @@ -106,6 +106,9 @@ class SscReader : public Engine ssc::RankPosMap &allOverlapRanks); int m_Verbosity = 0; + int m_MaxStreamsPerApp = 4; + int m_MaxFilenameLength = 128; + int m_RendezvousAppCount = 2; }; } // end namespace engine diff --git a/source/adios2/engine/ssc/SscWriter.cpp b/source/adios2/engine/ssc/SscWriter.cpp index db697a36e2..2b536c13bc 100644 --- a/source/adios2/engine/ssc/SscWriter.cpp +++ b/source/adios2/engine/ssc/SscWriter.cpp @@ -32,24 +32,11 @@ SscWriter::SscWriter(IO &io, const std::string &name, const Mode mode, m_WriterRank = m_Comm.Rank(); m_WriterSize = m_Comm.Size(); - auto it = m_IO.m_Parameters.find("MpiMode"); - if (it != m_IO.m_Parameters.end()) - { - m_MpiMode = it->second; - } - it = m_IO.m_Parameters.find("Verbose"); - if (it != m_IO.m_Parameters.end()) - { - try - { - m_Verbosity = std::stoi(it->second); - } - catch (...) - { - std::cerr << "Engine parameter Verbose can only be integer numbers" - << std::endl; - } - } + ssc::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode); + ssc::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity); + ssc::GetParameter(m_IO.m_Parameters, "MaxStreamsPerApp", m_MaxStreamsPerApp); + ssc::GetParameter(m_IO.m_Parameters, "MaxFilenameLength", m_MaxFilenameLength); + ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount", m_RendezvousAppCount); m_GlobalWritePattern.resize(m_WorldSize); m_GlobalReadPattern.resize(m_WorldSize); @@ -192,7 +179,7 @@ void SscWriter::SyncMpiPattern() { TAU_SCOPED_TIMER_FUNC(); - m_MpiHandshake.Start(4, 128, 2, 'w', m_Name, CommAsMPI(m_Comm) ); + m_MpiHandshake.Start(m_MaxStreamsPerApp, m_MaxFilenameLength, m_RendezvousAppCount, 'w', m_Name, CommAsMPI(m_Comm) ); m_MpiHandshake.Wait(m_Name); m_MpiHandshake.PrintMaps(); diff --git a/source/adios2/engine/ssc/SscWriter.h b/source/adios2/engine/ssc/SscWriter.h index 498e554aab..bb580d2966 100644 --- a/source/adios2/engine/ssc/SscWriter.h +++ b/source/adios2/engine/ssc/SscWriter.h @@ -96,6 +96,9 @@ class SscWriter : public Engine ssc::RankPosMap &allOverlapRanks); int m_Verbosity = 0; + int m_MaxStreamsPerApp = 4; + int m_MaxFilenameLength = 128; + int m_RendezvousAppCount = 2; }; } // end namespace engine From 909060694b2f3f5e1af4a836a4ede487e3b497ec Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Tue, 3 Mar 2020 16:25:28 -0500 Subject: [PATCH 3/9] mpi handshake worked for all ssc tests --- source/adios2/engine/ssc/SscReader.cpp | 243 ++---------------------- source/adios2/engine/ssc/SscReader.h | 7 +- source/adios2/engine/ssc/SscWriter.cpp | 250 ++----------------------- source/adios2/engine/ssc/SscWriter.h | 7 +- 4 files changed, 32 insertions(+), 475 deletions(-) diff --git a/source/adios2/engine/ssc/SscReader.cpp b/source/adios2/engine/ssc/SscReader.cpp index 07c6fa3129..20a7fa4c8b 100644 --- a/source/adios2/engine/ssc/SscReader.cpp +++ b/source/adios2/engine/ssc/SscReader.cpp @@ -34,9 +34,9 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode, ssc::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode); ssc::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity); - ssc::GetParameter(m_IO.m_Parameters, "MaxStreamsPerApp", m_MaxStreamsPerApp); ssc::GetParameter(m_IO.m_Parameters, "MaxFilenameLength", m_MaxFilenameLength); ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount", m_RendezvousAppCount); + ssc::GetParameter(m_IO.m_Parameters, "RendezvousStreamCount", m_RendezvousStreamCount); m_Buffer.resize(1); @@ -104,11 +104,6 @@ StepStatus SscReader::BeginStep(const StepMode stepMode, { TAU_SCOPED_TIMER_FUNC(); - if (m_Verbosity >= 5) - { - std::cout << "SscReader::BeginStep, World Rank " << m_WorldRank - << ", Reader Rank " << m_ReaderRank << std::endl; - } if (m_InitialStep) { @@ -141,6 +136,12 @@ StepStatus SscReader::BeginStep(const StepMode stepMode, } } + if (m_Verbosity >= 5) + { + std::cout << "SscReader::BeginStep, World Rank " << m_WorldRank + << ", Reader Rank " << m_ReaderRank << ", Step "<< m_CurrentStep << std::endl; + } + if (m_Buffer[0] == 1) { return StepStatus::EndOfStream; @@ -187,236 +188,21 @@ void SscReader::SyncMpiPattern() << ", Reader Rank " << m_ReaderRank << std::endl; } - m_MpiHandshake.Start(m_MaxStreamsPerApp, m_MaxFilenameLength, m_RendezvousAppCount, 'r', m_Name, CommAsMPI(m_Comm) ); + m_MpiHandshake.Start(m_RendezvousStreamCount, m_MaxFilenameLength, m_RendezvousAppCount, 'r', m_Name, CommAsMPI(m_Comm) ); m_MpiHandshake.Wait(m_Name); m_MpiHandshake.PrintMaps(); - if (m_WorldSize == m_ReaderSize) + for (const auto &app : m_MpiHandshake.GetWriterMap(m_Name)) { - throw(std::runtime_error("no writers are found")); - } - - std::vector lrbuf; - std::vector grbuf; - - // Process m_WorldRank == 0 to gather all the local rank m_WriterRank, and - // find out all the m_WriterRank == 0 - if (m_WorldRank == 0) - { - grbuf.resize(m_WorldSize); - } - - MPI_Gather(&m_ReaderRank, 1, MPI_INT, grbuf.data(), 1, MPI_INT, 0, - MPI_COMM_WORLD); - - std::vector AppStart; // m_WorldRank of the local rank 0 process - if (m_WorldRank == 0) - { - for (int i = 0; i < m_WorldSize; ++i) - { - if (grbuf[i] == 0) - { - AppStart.push_back(i); - } - } - m_AppSize = AppStart.size(); - } - - // Each local rank 0 process send their type (0 for writer, 1 for reader) to - // the world rank 0 process The AppStart are re-ordered to put all writers - // ahead of all the readers. - std::vector - AppType; // Vector to record the type of the local rank 0 process - if (m_ReaderRank == 0) // Send type from each local rank 0 process to the - // world rank 0 process - { - if (m_WorldRank == 0) // App_ID - { - AppType.resize(m_AppSize); - for (int i = 0; i < m_AppSize; ++i) - { - if (i == 0) - { - AppType[i] = 1; - ; - } - else - { - int tmp = 1; - MPI_Recv(&tmp, 1, MPI_INT, AppStart[i], 96, MPI_COMM_WORLD, - MPI_STATUS_IGNORE); - AppType[i] = tmp; - } - } - } - else - { - int tmp = 1; // type 1 for reader - MPI_Send(&tmp, 1, MPI_INT, 0, 96, MPI_COMM_WORLD); // - } - } - - if (m_WorldRank == 0) - { - std::vector AppWriter; - std::vector AppReader; - - for (int i = 0; i < m_AppSize; ++i) - { - if (AppType[i] == 0) - { - AppWriter.push_back(AppStart[i]); - } - else - { - AppReader.push_back(AppStart[i]); - } - } - m_WriterGlobalMpiInfo.resize(AppWriter.size()); - m_ReaderGlobalMpiInfo.resize(AppReader.size()); - AppStart = AppWriter; - AppStart.insert(AppStart.end(), AppReader.begin(), AppReader.end()); - } - - // Send the m_AppSize and m_AppID to each local rank 0 process - if (m_ReaderRank == 0) // Send m_AppID to each local rank 0 process - { - if (m_WorldRank == 0) // App_ID - { - for (int i = 0; i < m_AppSize; ++i) - { - MPI_Send(&i, 1, MPI_INT, AppStart[i], 99, MPI_COMM_WORLD); // - } - } - else - { - MPI_Recv(&m_AppID, 1, MPI_INT, 0, 99, MPI_COMM_WORLD, - MPI_STATUS_IGNORE); - } - } - - m_Comm.Bcast(&m_AppID, sizeof(int), - 0); // Local rank 0 process broadcast the m_AppID within the - // local communicator. - - MPI_Bcast(&m_AppSize, 1, MPI_INT, 0, MPI_COMM_WORLD); // Bcast the m_AppSize - - // In each local communicator, each local rank 0 process gathers the world - // rank of all the rest local processes. - if (m_ReaderRank == 0) - { - lrbuf.resize(m_ReaderSize); - } - - m_Comm.Gather(&m_WorldRank, 1, lrbuf.data(), 1, 0); - - // Send the WorldRank vector of each local communicator to the m_WorldRank - // == 0 process. - int WriterInfoSize = 0; - int ReaderInfoSize = 0; - if (m_ReaderRank == 0) - { - if (m_WorldRank == 0) // App_ID - { - for (int i = 0; i < m_WriterGlobalMpiInfo.size(); ++i) - { - if (i == 0) - { - m_WriterGlobalMpiInfo[i] = lrbuf; - ++WriterInfoSize; - } - else - { - int j_writersize; - MPI_Recv(&j_writersize, 1, MPI_INT, AppStart[i], 96, - MPI_COMM_WORLD, MPI_STATUS_IGNORE); // - ++WriterInfoSize; - - m_WriterGlobalMpiInfo[i].resize(j_writersize); - MPI_Recv(m_WriterGlobalMpiInfo[i].data(), j_writersize, - MPI_INT, AppStart[i], 98, MPI_COMM_WORLD, - MPI_STATUS_IGNORE); // - } - } - - for (int i = m_WriterGlobalMpiInfo.size(); i < m_AppSize; ++i) - { - if (i == 0) - { - m_ReaderGlobalMpiInfo[i] = lrbuf; - ++ReaderInfoSize; - } - else - { - int j_readersize; - MPI_Recv(&j_readersize, 1, MPI_INT, AppStart[i], 95, - MPI_COMM_WORLD, MPI_STATUS_IGNORE); // - ++ReaderInfoSize; - - m_ReaderGlobalMpiInfo[i - m_WriterGlobalMpiInfo.size()] - .resize(j_readersize); - MPI_Recv( - m_ReaderGlobalMpiInfo[i - m_WriterGlobalMpiInfo.size()] - .data(), - j_readersize, MPI_INT, AppStart[i], 97, MPI_COMM_WORLD, - MPI_STATUS_IGNORE); // - } - } - } - else - { - MPI_Send(&m_ReaderSize, 1, MPI_INT, 0, 95, MPI_COMM_WORLD); - MPI_Send(lrbuf.data(), lrbuf.size(), MPI_INT, 0, 97, - MPI_COMM_WORLD); - } - } - - // Broadcast m_WriterGlobalMpiInfo and m_ReaderGlobalMpiInfo to all the - // processes. - MPI_Bcast(&WriterInfoSize, 1, MPI_INT, 0, - MPI_COMM_WORLD); // Broadcast writerinfo size - MPI_Bcast(&ReaderInfoSize, 1, MPI_INT, 0, MPI_COMM_WORLD); - - m_WriterGlobalMpiInfo.resize(WriterInfoSize); - m_ReaderGlobalMpiInfo.resize(ReaderInfoSize); - - for (int i = 0; i < WriterInfoSize; ++i) - { - int ilen; - if (m_WorldRank == 0) - { - ilen = m_WriterGlobalMpiInfo[i].size(); - } - MPI_Bcast(&ilen, 1, MPI_INT, 0, MPI_COMM_WORLD); - m_WriterGlobalMpiInfo[i].resize(ilen); - MPI_Bcast(m_WriterGlobalMpiInfo[i].data(), ilen, MPI_INT, 0, - MPI_COMM_WORLD); // Broadcast readerinfo size - } - - for (int i = 0; i < ReaderInfoSize; ++i) - { - int ilen; - if (m_WorldRank == 0) - { - ilen = m_ReaderGlobalMpiInfo[i].size(); - } - MPI_Bcast(&ilen, 1, MPI_INT, 0, MPI_COMM_WORLD); - m_ReaderGlobalMpiInfo[i].resize(ilen); - MPI_Bcast(m_ReaderGlobalMpiInfo[i].data(), ilen, MPI_INT, 0, - MPI_COMM_WORLD); // Broadcast readerinfo size - } - - for (const auto &app : m_WriterGlobalMpiInfo) - { - for (int rank : app) + for (int rank : app.second) { m_AllWriterRanks.push_back(rank); } } - for (const auto &app : m_ReaderGlobalMpiInfo) + for (const auto &app : m_MpiHandshake.GetReaderMap(m_Name)) { - for (int rank : app) + for (int rank : app.second) { m_AllReaderRanks.push_back(rank); } @@ -427,10 +213,6 @@ void SscReader::SyncMpiPattern() MPI_Group_incl(worldGroup, m_AllWriterRanks.size(), m_AllWriterRanks.data(), &m_MpiAllWritersGroup); - if (m_Verbosity >= 10 and m_WorldRank == 0) - { - ssc::PrintMpiInfo(m_WriterGlobalMpiInfo, m_ReaderGlobalMpiInfo); - } } void SscReader::SyncWritePattern() @@ -680,6 +462,7 @@ void SscReader::DoClose(const int transportIndex) << ", Reader Rank " << m_ReaderRank << std::endl; } MPI_Win_free(&m_MpiWin); + m_MpiHandshake.Finalize(); } } // end namespace engine diff --git a/source/adios2/engine/ssc/SscReader.h b/source/adios2/engine/ssc/SscReader.h index df78c6f483..c20639dbb0 100644 --- a/source/adios2/engine/ssc/SscReader.h +++ b/source/adios2/engine/ssc/SscReader.h @@ -59,11 +59,6 @@ class SscReader : public Engine int m_ReaderSize; helper::MpiHandshake m_MpiHandshake; - - int m_AppID = 0; - int m_AppSize = 0; - std::vector> m_WriterGlobalMpiInfo; - std::vector> m_ReaderGlobalMpiInfo; std::vector m_AllWriterRanks; std::vector m_AllReaderRanks; @@ -106,8 +101,8 @@ class SscReader : public Engine ssc::RankPosMap &allOverlapRanks); int m_Verbosity = 0; - int m_MaxStreamsPerApp = 4; int m_MaxFilenameLength = 128; + int m_RendezvousStreamCount = 1; int m_RendezvousAppCount = 2; }; diff --git a/source/adios2/engine/ssc/SscWriter.cpp b/source/adios2/engine/ssc/SscWriter.cpp index 2b536c13bc..d8e36d1c2d 100644 --- a/source/adios2/engine/ssc/SscWriter.cpp +++ b/source/adios2/engine/ssc/SscWriter.cpp @@ -34,9 +34,9 @@ SscWriter::SscWriter(IO &io, const std::string &name, const Mode mode, ssc::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode); ssc::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity); - ssc::GetParameter(m_IO.m_Parameters, "MaxStreamsPerApp", m_MaxStreamsPerApp); ssc::GetParameter(m_IO.m_Parameters, "MaxFilenameLength", m_MaxFilenameLength); ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount", m_RendezvousAppCount); + ssc::GetParameter(m_IO.m_Parameters, "RendezvousStreamCount", m_RendezvousStreamCount); m_GlobalWritePattern.resize(m_WorldSize); m_GlobalReadPattern.resize(m_WorldSize); @@ -48,12 +48,6 @@ StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds) { TAU_SCOPED_TIMER_FUNC(); - if (m_Verbosity >= 5) - { - std::cout << "SscWriter::BeginStep, World Rank " << m_WorldRank - << ", Writer Rank " << m_WriterRank << std::endl; - } - if (m_InitialStep) { m_InitialStep = false; @@ -62,6 +56,13 @@ StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds) { ++m_CurrentStep; } + + if (m_Verbosity >= 5) + { + std::cout << "SscWriter::BeginStep, World Rank " << m_WorldRank + << ", Writer Rank " << m_WriterRank << ", Step "<< m_CurrentStep << std::endl; + } + return StepStatus::OK; } @@ -179,239 +180,27 @@ void SscWriter::SyncMpiPattern() { TAU_SCOPED_TIMER_FUNC(); - m_MpiHandshake.Start(m_MaxStreamsPerApp, m_MaxFilenameLength, m_RendezvousAppCount, 'w', m_Name, CommAsMPI(m_Comm) ); - m_MpiHandshake.Wait(m_Name); - m_MpiHandshake.PrintMaps(); - if (m_Verbosity >= 5) { std::cout << "SscWriter::SyncMpiPattern, World Rank " << m_WorldRank << ", Writer Rank " << m_WriterRank << std::endl; } - if (m_WorldSize == m_WriterSize) - { - throw(std::runtime_error("no readers are found")); - } - - std::vector lrbuf; - std::vector grbuf; - - // Process m_WorldRank == 0 to gather all the local rank m_WriterRank, and - // find out all the m_WriterRank == 0 - if (m_WorldRank == 0) - { - grbuf.resize(m_WorldSize); - } - - MPI_Gather(&m_WriterRank, 1, MPI_INT, grbuf.data(), 1, MPI_INT, 0, - MPI_COMM_WORLD); - - std::vector AppStart; // m_WorldRank of the local rank 0 process - if (m_WorldRank == 0) - { - for (int i = 0; i < m_WorldSize; ++i) - { - if (grbuf[i] == 0) - { - AppStart.push_back(i); - } - } - m_AppSize = AppStart.size(); - } - - // Each local rank 0 process send their type (0 for writer, 1 for reader) to - // the world rank 0 process The AppStart are re-ordered to put all writers - // ahead of all the readers. - std::vector - AppType; // Vector to record the type of the local rank 0 process - if (m_WriterRank == 0) // Send type from each local rank 0 process to the - // world rank 0 process - { - if (m_WorldRank == 0) // App_ID - { - AppType.resize(m_AppSize); - for (int i = 0; i < m_AppSize; ++i) - { - if (i == 0) - { - AppType[i] = 0; - } - else - { - int tmp = 1; - MPI_Recv(&tmp, 1, MPI_INT, AppStart[i], 96, MPI_COMM_WORLD, - MPI_STATUS_IGNORE); - AppType[i] = tmp; - } - } - } - else - { - int tmp = 0; // type 0 for writer - MPI_Send(&tmp, 1, MPI_INT, 0, 96, MPI_COMM_WORLD); // - } - } - - if (m_WorldRank == 0) - { - std::vector AppWriter; - std::vector AppReader; - - for (int i = 0; i < m_AppSize; ++i) - { - if (AppType[i] == 0) - { - AppWriter.push_back(AppStart[i]); - } - else - { - AppReader.push_back(AppStart[i]); - } - } - m_WriterGlobalMpiInfo.resize(AppWriter.size()); - m_ReaderGlobalMpiInfo.resize(AppReader.size()); - AppStart = AppWriter; - AppStart.insert(AppStart.end(), AppReader.begin(), AppReader.end()); - } - - // Send the m_AppSize and m_AppID to each local rank 0 process - if (m_WriterRank == 0) // Send m_AppID to each local rank 0 process - { - if (m_WorldRank == 0) // App_ID - { - for (int i = 0; i < m_AppSize; ++i) - { - MPI_Send(&i, 1, MPI_INT, AppStart[i], 99, MPI_COMM_WORLD); // - } - } - else - { - MPI_Recv(&m_AppID, 1, MPI_INT, 0, 99, MPI_COMM_WORLD, - MPI_STATUS_IGNORE); - } - } - - m_Comm.Bcast(&m_AppID, 1, 0); // Local rank 0 process broadcast the m_AppID - // within the local communicator. - - MPI_Bcast(&m_AppSize, 1, MPI_INT, 0, MPI_COMM_WORLD); // Bcast the m_AppSize - - // In each local communicator, each local rank 0 process gathers the world - // rank of all the rest local processes. - if (m_WriterRank == 0) - { - lrbuf.resize(m_WriterSize); - } - m_Comm.Gather(&m_WorldRank, 1, lrbuf.data(), 1, 0); - - // Send the WorldRank vector of each local communicator to the m_WorldRank - // == 0 process. - int WriterInfoSize = 0; - int ReaderInfoSize = 0; - if (m_WriterRank == 0) - { - if (m_WorldRank == 0) // App_ID - { - for (int i = 0; i < m_WriterGlobalMpiInfo.size(); ++i) - { - if (i == 0) - { - m_WriterGlobalMpiInfo[i] = lrbuf; - ++WriterInfoSize; - } - else - { - int j_writersize; - MPI_Recv(&j_writersize, 1, MPI_INT, AppStart[i], 96, - MPI_COMM_WORLD, MPI_STATUS_IGNORE); // - ++WriterInfoSize; - - m_WriterGlobalMpiInfo[i].resize(j_writersize); - MPI_Recv(m_WriterGlobalMpiInfo[i].data(), j_writersize, - MPI_INT, AppStart[i], 98, MPI_COMM_WORLD, - MPI_STATUS_IGNORE); // - } - } - - for (int i = m_WriterGlobalMpiInfo.size(); i < m_AppSize; ++i) - { - if (i == 0) - { - m_ReaderGlobalMpiInfo[i] = lrbuf; - ++ReaderInfoSize; - } - else - { - int j_readersize; - MPI_Recv(&j_readersize, 1, MPI_INT, AppStart[i], 95, - MPI_COMM_WORLD, MPI_STATUS_IGNORE); // - ++ReaderInfoSize; - - m_ReaderGlobalMpiInfo[i - m_WriterGlobalMpiInfo.size()] - .resize(j_readersize); - MPI_Recv( - m_ReaderGlobalMpiInfo[i - m_WriterGlobalMpiInfo.size()] - .data(), - j_readersize, MPI_INT, AppStart[i], 97, MPI_COMM_WORLD, - MPI_STATUS_IGNORE); // - } - } - } - else - { - MPI_Send(&m_WriterSize, 1, MPI_INT, 0, 96, MPI_COMM_WORLD); - MPI_Send(lrbuf.data(), lrbuf.size(), MPI_INT, 0, 98, - MPI_COMM_WORLD); - } - } - - // Broadcast m_WriterGlobalMpiInfo and m_ReaderGlobalMpiInfo to all the - // processes. - MPI_Bcast(&WriterInfoSize, 1, MPI_INT, 0, MPI_COMM_WORLD); - MPI_Bcast(&ReaderInfoSize, 1, MPI_INT, 0, MPI_COMM_WORLD); - - m_WriterGlobalMpiInfo.resize(WriterInfoSize); - m_ReaderGlobalMpiInfo.resize(ReaderInfoSize); - - for (int i = 0; i < WriterInfoSize; ++i) - { - int ilen; - if (m_WorldRank == 0) - { - ilen = m_WriterGlobalMpiInfo[i].size(); - } - MPI_Bcast(&ilen, 1, MPI_INT, 0, MPI_COMM_WORLD); - - m_WriterGlobalMpiInfo[i].resize(ilen); - MPI_Bcast(m_WriterGlobalMpiInfo[i].data(), ilen, MPI_INT, 0, - MPI_COMM_WORLD); - } - - for (int i = 0; i < ReaderInfoSize; ++i) - { - int ilen; - if (m_WorldRank == 0) - { - ilen = m_ReaderGlobalMpiInfo[i].size(); - } - MPI_Bcast(&ilen, 1, MPI_INT, 0, MPI_COMM_WORLD); - m_ReaderGlobalMpiInfo[i].resize(ilen); - MPI_Bcast(m_ReaderGlobalMpiInfo[i].data(), ilen, MPI_INT, 0, - MPI_COMM_WORLD); - } + m_MpiHandshake.Start(m_RendezvousStreamCount, m_MaxFilenameLength, m_RendezvousAppCount, 'w', m_Name, CommAsMPI(m_Comm) ); + m_MpiHandshake.Wait(m_Name); + m_MpiHandshake.PrintMaps(); - for (const auto &app : m_WriterGlobalMpiInfo) + for (const auto &app : m_MpiHandshake.GetWriterMap(m_Name)) { - for (int rank : app) + for (int rank : app.second) { m_AllWriterRanks.push_back(rank); } } - for (const auto &app : m_ReaderGlobalMpiInfo) + for (const auto &app : m_MpiHandshake.GetReaderMap(m_Name)) { - for (int rank : app) + for (int rank : app.second) { m_AllReaderRanks.push_back(rank); } @@ -419,13 +208,7 @@ void SscWriter::SyncMpiPattern() MPI_Group worldGroup; MPI_Comm_group(MPI_COMM_WORLD, &worldGroup); - MPI_Group_incl(worldGroup, m_AllReaderRanks.size(), m_AllReaderRanks.data(), - &m_MpiAllReadersGroup); - - if (m_Verbosity >= 10 and m_WorldRank == 0) - { - ssc::PrintMpiInfo(m_WriterGlobalMpiInfo, m_ReaderGlobalMpiInfo); - } + MPI_Group_incl(worldGroup, m_AllReaderRanks.size(), m_AllReaderRanks.data(), &m_MpiAllReadersGroup); } void SscWriter::SyncWritePattern() @@ -690,6 +473,7 @@ void SscWriter::DoClose(const int transportIndex) } MPI_Win_free(&m_MpiWin); + m_MpiHandshake.Finalize(); } } // end namespace engine diff --git a/source/adios2/engine/ssc/SscWriter.h b/source/adios2/engine/ssc/SscWriter.h index bb580d2966..7fd4e929fe 100644 --- a/source/adios2/engine/ssc/SscWriter.h +++ b/source/adios2/engine/ssc/SscWriter.h @@ -60,11 +60,6 @@ class SscWriter : public Engine int m_WriterSize; helper::MpiHandshake m_MpiHandshake; - - int m_AppID = 0; - int m_AppSize = 0; - std::vector> m_WriterGlobalMpiInfo; - std::vector> m_ReaderGlobalMpiInfo; std::vector m_AllWriterRanks; std::vector m_AllReaderRanks; @@ -96,8 +91,8 @@ class SscWriter : public Engine ssc::RankPosMap &allOverlapRanks); int m_Verbosity = 0; - int m_MaxStreamsPerApp = 4; int m_MaxFilenameLength = 128; + int m_RendezvousStreamCount = 1; int m_RendezvousAppCount = 2; }; From 4e2ac8359e41196c78cc8eade47370c96ec3adb8 Mon Sep 17 00:00:00 2001 From: Ruonan Wang Date: Wed, 4 Mar 2020 19:26:52 -0500 Subject: [PATCH 4/9] minor changes and clang-format --- source/adios2/engine/ssc/SscHelper.cpp | 6 +- source/adios2/engine/ssc/SscHelper.h | 3 +- source/adios2/engine/ssc/SscReader.cpp | 20 ++-- source/adios2/engine/ssc/SscReader.h | 2 +- source/adios2/engine/ssc/SscWriter.cpp | 22 +++-- source/adios2/engine/ssc/SscWriter.h | 2 +- source/adios2/helper/adiosMpiHandshake.cpp | 108 ++++++++++++--------- source/adios2/helper/adiosMpiHandshake.h | 66 +++++++------ 8 files changed, 131 insertions(+), 98 deletions(-) diff --git a/source/adios2/engine/ssc/SscHelper.cpp b/source/adios2/engine/ssc/SscHelper.cpp index 99b5b17d06..0192c8931c 100644 --- a/source/adios2/engine/ssc/SscHelper.cpp +++ b/source/adios2/engine/ssc/SscHelper.cpp @@ -318,7 +318,8 @@ bool GetParameter(const Params ¶ms, const std::string &key, int &value) } catch (...) { - std::string error = "Engine parameter " + key + " can only be integer numbers"; + std::string error = + "Engine parameter " + key + " can only be integer numbers"; std::cerr << error << std::endl; return false; } @@ -326,7 +327,8 @@ bool GetParameter(const Params ¶ms, const std::string &key, int &value) return true; } -bool GetParameter(const Params ¶ms, const std::string &key, std::string &value) +bool GetParameter(const Params ¶ms, const std::string &key, + std::string &value) { auto it = params.find(key); if (it == params.end()) diff --git a/source/adios2/engine/ssc/SscHelper.h b/source/adios2/engine/ssc/SscHelper.h index e84bec3aa2..682c980ba7 100644 --- a/source/adios2/engine/ssc/SscHelper.h +++ b/source/adios2/engine/ssc/SscHelper.h @@ -67,7 +67,8 @@ void JsonToBlockVecVec(const std::string &input, BlockVecVec &output); bool AreSameDims(const Dims &a, const Dims &b); bool GetParameter(const Params ¶ms, const std::string &key, int &value); -bool GetParameter(const Params ¶ms, const std::string &key, std::string &value); +bool GetParameter(const Params ¶ms, const std::string &key, + std::string &value); } // end namespace ssc } // end namespace engine diff --git a/source/adios2/engine/ssc/SscReader.cpp b/source/adios2/engine/ssc/SscReader.cpp index 20a7fa4c8b..1cdbc0fd76 100644 --- a/source/adios2/engine/ssc/SscReader.cpp +++ b/source/adios2/engine/ssc/SscReader.cpp @@ -34,9 +34,12 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode, ssc::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode); ssc::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity); - ssc::GetParameter(m_IO.m_Parameters, "MaxFilenameLength", m_MaxFilenameLength); - ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount", m_RendezvousAppCount); - ssc::GetParameter(m_IO.m_Parameters, "RendezvousStreamCount", m_RendezvousStreamCount); + ssc::GetParameter(m_IO.m_Parameters, "MaxFilenameLength", + m_MaxFilenameLength); + ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount", + m_RendezvousAppCount); + ssc::GetParameter(m_IO.m_Parameters, "RendezvousStreamCount", + m_RendezvousStreamCount); m_Buffer.resize(1); @@ -104,7 +107,6 @@ StepStatus SscReader::BeginStep(const StepMode stepMode, { TAU_SCOPED_TIMER_FUNC(); - if (m_InitialStep) { m_InitialStep = false; @@ -139,7 +141,8 @@ StepStatus SscReader::BeginStep(const StepMode stepMode, if (m_Verbosity >= 5) { std::cout << "SscReader::BeginStep, World Rank " << m_WorldRank - << ", Reader Rank " << m_ReaderRank << ", Step "<< m_CurrentStep << std::endl; + << ", Reader Rank " << m_ReaderRank << ", Step " + << m_CurrentStep << std::endl; } if (m_Buffer[0] == 1) @@ -188,7 +191,8 @@ void SscReader::SyncMpiPattern() << ", Reader Rank " << m_ReaderRank << std::endl; } - m_MpiHandshake.Start(m_RendezvousStreamCount, m_MaxFilenameLength, m_RendezvousAppCount, 'r', m_Name, CommAsMPI(m_Comm) ); + m_MpiHandshake.Start(m_RendezvousStreamCount, m_MaxFilenameLength, + m_RendezvousAppCount, 'r', m_Name, CommAsMPI(m_Comm)); m_MpiHandshake.Wait(m_Name); m_MpiHandshake.PrintMaps(); @@ -208,11 +212,12 @@ void SscReader::SyncMpiPattern() } } + m_MpiHandshake.Finalize(); + MPI_Group worldGroup; MPI_Comm_group(MPI_COMM_WORLD, &worldGroup); MPI_Group_incl(worldGroup, m_AllWriterRanks.size(), m_AllWriterRanks.data(), &m_MpiAllWritersGroup); - } void SscReader::SyncWritePattern() @@ -462,7 +467,6 @@ void SscReader::DoClose(const int transportIndex) << ", Reader Rank " << m_ReaderRank << std::endl; } MPI_Win_free(&m_MpiWin); - m_MpiHandshake.Finalize(); } } // end namespace engine diff --git a/source/adios2/engine/ssc/SscReader.h b/source/adios2/engine/ssc/SscReader.h index c20639dbb0..23446787ae 100644 --- a/source/adios2/engine/ssc/SscReader.h +++ b/source/adios2/engine/ssc/SscReader.h @@ -13,8 +13,8 @@ #include "SscHelper.h" #include "adios2/core/Engine.h" -#include "adios2/toolkit/profiling/taustubs/tautimer.hpp" #include "adios2/helper/adiosMpiHandshake.h" +#include "adios2/toolkit/profiling/taustubs/tautimer.hpp" #include #include diff --git a/source/adios2/engine/ssc/SscWriter.cpp b/source/adios2/engine/ssc/SscWriter.cpp index d8e36d1c2d..5aca28dee4 100644 --- a/source/adios2/engine/ssc/SscWriter.cpp +++ b/source/adios2/engine/ssc/SscWriter.cpp @@ -14,7 +14,6 @@ #include "adios2/helper/adiosJSONcomplex.h" #include "nlohmann/json.hpp" - namespace adios2 { namespace core @@ -34,9 +33,12 @@ SscWriter::SscWriter(IO &io, const std::string &name, const Mode mode, ssc::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode); ssc::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity); - ssc::GetParameter(m_IO.m_Parameters, "MaxFilenameLength", m_MaxFilenameLength); - ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount", m_RendezvousAppCount); - ssc::GetParameter(m_IO.m_Parameters, "RendezvousStreamCount", m_RendezvousStreamCount); + ssc::GetParameter(m_IO.m_Parameters, "MaxFilenameLength", + m_MaxFilenameLength); + ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount", + m_RendezvousAppCount); + ssc::GetParameter(m_IO.m_Parameters, "RendezvousStreamCount", + m_RendezvousStreamCount); m_GlobalWritePattern.resize(m_WorldSize); m_GlobalReadPattern.resize(m_WorldSize); @@ -60,7 +62,8 @@ StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds) if (m_Verbosity >= 5) { std::cout << "SscWriter::BeginStep, World Rank " << m_WorldRank - << ", Writer Rank " << m_WriterRank << ", Step "<< m_CurrentStep << std::endl; + << ", Writer Rank " << m_WriterRank << ", Step " + << m_CurrentStep << std::endl; } return StepStatus::OK; @@ -186,7 +189,8 @@ void SscWriter::SyncMpiPattern() << ", Writer Rank " << m_WriterRank << std::endl; } - m_MpiHandshake.Start(m_RendezvousStreamCount, m_MaxFilenameLength, m_RendezvousAppCount, 'w', m_Name, CommAsMPI(m_Comm) ); + m_MpiHandshake.Start(m_RendezvousStreamCount, m_MaxFilenameLength, + m_RendezvousAppCount, 'w', m_Name, CommAsMPI(m_Comm)); m_MpiHandshake.Wait(m_Name); m_MpiHandshake.PrintMaps(); @@ -206,9 +210,12 @@ void SscWriter::SyncMpiPattern() } } + m_MpiHandshake.Finalize(); + MPI_Group worldGroup; MPI_Comm_group(MPI_COMM_WORLD, &worldGroup); - MPI_Group_incl(worldGroup, m_AllReaderRanks.size(), m_AllReaderRanks.data(), &m_MpiAllReadersGroup); + MPI_Group_incl(worldGroup, m_AllReaderRanks.size(), m_AllReaderRanks.data(), + &m_MpiAllReadersGroup); } void SscWriter::SyncWritePattern() @@ -473,7 +480,6 @@ void SscWriter::DoClose(const int transportIndex) } MPI_Win_free(&m_MpiWin); - m_MpiHandshake.Finalize(); } } // end namespace engine diff --git a/source/adios2/engine/ssc/SscWriter.h b/source/adios2/engine/ssc/SscWriter.h index 7fd4e929fe..e78354b025 100644 --- a/source/adios2/engine/ssc/SscWriter.h +++ b/source/adios2/engine/ssc/SscWriter.h @@ -13,8 +13,8 @@ #include "SscHelper.h" #include "adios2/core/Engine.h" -#include "adios2/toolkit/profiling/taustubs/tautimer.hpp" #include "adios2/helper/adiosMpiHandshake.h" +#include "adios2/toolkit/profiling/taustubs/tautimer.hpp" #include #include diff --git a/source/adios2/helper/adiosMpiHandshake.cpp b/source/adios2/helper/adiosMpiHandshake.cpp index 645ca93166..bf5847f4b8 100644 --- a/source/adios2/helper/adiosMpiHandshake.cpp +++ b/source/adios2/helper/adiosMpiHandshake.cpp @@ -5,8 +5,8 @@ * adiosMpiHandshake.cpp */ -#include #include "adiosMpiHandshake.h" +#include namespace adios2 { @@ -26,11 +26,12 @@ int MpiHandshake::m_WorldRank; int MpiHandshake::m_LocalSize; int MpiHandshake::m_LocalRank; int MpiHandshake::m_LocalMasterRank; -std::map>> MpiHandshake::m_WritersMap; -std::map>> MpiHandshake::m_ReadersMap; +std::map>> + MpiHandshake::m_WritersMap; +std::map>> + MpiHandshake::m_ReadersMap; std::map MpiHandshake::m_AppsSize; - size_t MpiHandshake::PlaceInBuffer(size_t stream, int rank) { return rank * m_MaxStreamsPerApp * m_ItemSize + stream * m_ItemSize; @@ -41,50 +42,52 @@ void MpiHandshake::Test() int success; MPI_Status status; - for(int rank=0; rank(m_Buffer.data() + offset)[0]; + int appMasterRank = + reinterpret_cast(m_Buffer.data() + offset)[0]; offset += sizeof(int); - int appSize = reinterpret_cast(m_Buffer.data() + offset)[0]; + int appSize = + reinterpret_cast(m_Buffer.data() + offset)[0]; offset += sizeof(int); std::string filename = m_Buffer.data() + offset; m_AppsSize[appMasterRank] = appSize; - if(mode == 'w') + if (mode == 'w') { auto &ranks = m_WritersMap[filename][appMasterRank]; bool existed = false; - for(const auto r : ranks) + for (const auto r : ranks) { - if(r == rank) + if (r == rank) { existed = true; } } - if(not existed) + if (not existed) { ranks.push_back(rank); } } - else if(mode == 'r') + else if (mode == 'r') { auto &ranks = m_ReadersMap[filename][appMasterRank]; bool existed = false; - for(const auto r : ranks) + for (const auto r : ranks) { - if(r == rank) + if (r == rank) { existed = true; } } - if(not existed) + if (not existed) { ranks.push_back(rank); } @@ -98,14 +101,15 @@ bool MpiHandshake::Check(const std::string &filename) { Test(); - if(m_WritersMap[filename].size() + m_ReadersMap[filename].size() != m_AppsForStreams[filename]) + if (m_WritersMap[filename].size() + m_ReadersMap[filename].size() != + m_AppsForStreams[filename]) { return false; } for (const auto &app : m_WritersMap[filename]) { - if(app.second.size() != m_AppsSize[app.first]) + if (app.second.size() != m_AppsSize[app.first]) { return false; } @@ -113,7 +117,7 @@ bool MpiHandshake::Check(const std::string &filename) for (const auto &app : m_ReadersMap[filename]) { - if(app.second.size() != m_AppsSize[app.first]) + if (app.second.size() != m_AppsSize[app.first]) { return false; } @@ -122,11 +126,14 @@ bool MpiHandshake::Check(const std::string &filename) return true; } -void MpiHandshake::Start(const size_t maxStreamsPerApp, const size_t maxFilenameLength, const size_t appsForThisStream, const char mode, const std::string &filename, MPI_Comm localComm) +void MpiHandshake::Start(const size_t maxStreamsPerApp, + const size_t maxFilenameLength, + const size_t appsForThisStream, const char mode, + const std::string &filename, MPI_Comm localComm) { m_AppsForStreams[filename] = appsForThisStream; - if(m_StreamID == 0) + if (m_StreamID == 0) { MPI_Comm_size(MPI_COMM_WORLD, &m_WorldSize); MPI_Comm_rank(MPI_COMM_WORLD, &m_WorldRank); @@ -136,7 +143,7 @@ void MpiHandshake::Start(const size_t maxStreamsPerApp, const size_t maxFilename m_MaxFilenameLength = maxFilenameLength; m_ItemSize = maxFilenameLength + sizeof(char) + sizeof(int) * 2; - if(m_LocalRank == 0) + if (m_LocalRank == 0) { m_LocalMasterRank = m_WorldRank; } @@ -145,7 +152,7 @@ void MpiHandshake::Start(const size_t maxStreamsPerApp, const size_t maxFilename m_SendRequests.resize(m_WorldSize); m_RecvRequests.resize(m_WorldSize); - for(int i=0; i maxFilenameLength) + if (filename.size() > maxFilenameLength) { throw(std::runtime_error("Filename too long")); } @@ -177,29 +186,31 @@ void MpiHandshake::Start(const size_t maxStreamsPerApp, const size_t maxFilename offset += sizeof(int); std::memcpy(buffer.data() + offset, filename.data(), filename.size()); - for(int i=0; i> & MpiHandshake::GetWriterMap(const std::string &filename) +const std::map> & +MpiHandshake::GetWriterMap(const std::string &filename) { return m_WritersMap[filename]; } -const std::map> & MpiHandshake::GetReaderMap(const std::string &filename) +const std::map> & +MpiHandshake::GetReaderMap(const std::string &filename) { return m_ReadersMap[filename]; } @@ -207,16 +218,16 @@ const std::map> & MpiHandshake::GetReaderMap(const std::st void MpiHandshake::Finalize() { --m_StreamID; - if(m_StreamID == 0) + if (m_StreamID == 0) { - for(auto &rs : m_RecvRequests) + for (auto &rs : m_RecvRequests) { - for(auto &r : rs) + for (auto &r : rs) { MPI_Status status; int success; MPI_Test(&r, &success, &status); - if(not success) + if (not success) { MPI_Cancel(&r); } @@ -229,21 +240,23 @@ void MpiHandshake::Finalize() void MpiHandshake::PrintMaps() { - for(int printRank = 0; printRank < m_WorldSize; ++printRank) + for (int printRank = 0; printRank < m_WorldSize; ++printRank) { MPI_Barrier(MPI_COMM_WORLD); - if(m_WorldRank == printRank) + if (m_WorldRank == printRank) { - std::cout << "For rank " << printRank << " ********************* " << std::endl; + std::cout << "For rank " << printRank << " ********************* " + << std::endl; std::cout << "Writers: " << std::endl; for (const auto &stream : m_WritersMap) { std::cout << " Stream " << stream.first << std::endl; for (const auto &app : stream.second) { - std::cout << " App Master Rank " << app.first << std::endl; - std::cout << " "; - for(const auto &rank : app.second) + std::cout << " App Master Rank " << app.first + << std::endl; + std::cout << " "; + for (const auto &rank : app.second) { std::cout << rank << ", "; } @@ -256,9 +269,10 @@ void MpiHandshake::PrintMaps() std::cout << " Stream " << stream.first << std::endl; for (const auto &app : stream.second) { - std::cout << " App Master Rank " << app.first << std::endl; - std::cout << " "; - for(const auto &rank : app.second) + std::cout << " App Master Rank " << app.first + << std::endl; + std::cout << " "; + for (const auto &rank : app.second) { std::cout << rank << ", "; } diff --git a/source/adios2/helper/adiosMpiHandshake.h b/source/adios2/helper/adiosMpiHandshake.h index 037633363f..ad2b3aafa4 100644 --- a/source/adios2/helper/adiosMpiHandshake.h +++ b/source/adios2/helper/adiosMpiHandshake.h @@ -10,13 +10,13 @@ #include "adios2/common/ADIOSConfig.h" #ifndef ADIOS2_HAVE_MPI -#error "Do not include adiosCommMPI.h without ADIOS2_HAVE_MPI." +#error "Do not include adiosMpiHandshake.h without ADIOS2_HAVE_MPI." #endif -#include #include -#include #include +#include +#include namespace adios2 { @@ -25,33 +25,39 @@ namespace helper class MpiHandshake { - public: - static void Start(const size_t maxStreamsPerApp, const size_t maxFilenameLength, const size_t appsForThisStream, const char mode, const std::string &filename, MPI_Comm localComm); - static void Wait(const std::string &filename); - static void Finalize(); - static const std::map> & GetWriterMap(const std::string &filename); - static const std::map> & GetReaderMap(const std::string &filename); - static void PrintMaps(); - private: - static void Test(); - static bool Check(const std::string &filename); - static size_t PlaceInBuffer(const size_t stream, const int rank); - static std::vector m_Buffer; - static std::vector> m_SendRequests; - static std::vector> m_RecvRequests; - static size_t m_MaxStreamsPerApp; - static size_t m_MaxFilenameLength; - static size_t m_ItemSize; - static std::map m_AppsForStreams; - static size_t m_StreamID; - static int m_WorldSize; - static int m_WorldRank; - static int m_LocalSize; - static int m_LocalRank; - static int m_LocalMasterRank; - static std::map>> m_WritersMap; - static std::map>> m_ReadersMap; - static std::map m_AppsSize; +public: + static void Start(const size_t maxStreamsPerApp, + const size_t maxFilenameLength, + const size_t appsForThisStream, const char mode, + const std::string &filename, MPI_Comm localComm); + static void Wait(const std::string &filename); + static void Finalize(); + static const std::map> & + GetWriterMap(const std::string &filename); + static const std::map> & + GetReaderMap(const std::string &filename); + static void PrintMaps(); + +private: + static void Test(); + static bool Check(const std::string &filename); + static size_t PlaceInBuffer(const size_t stream, const int rank); + static std::vector m_Buffer; + static std::vector> m_SendRequests; + static std::vector> m_RecvRequests; + static size_t m_MaxStreamsPerApp; + static size_t m_MaxFilenameLength; + static size_t m_ItemSize; + static std::map m_AppsForStreams; + static size_t m_StreamID; + static int m_WorldSize; + static int m_WorldRank; + static int m_LocalSize; + static int m_LocalRank; + static int m_LocalMasterRank; + static std::map>> m_WritersMap; + static std::map>> m_ReadersMap; + static std::map m_AppsSize; }; } // end namespace helper From 819dada383b10ab048f04bf646eccf2a7799fd4b Mon Sep 17 00:00:00 2001 From: Ruonan Wang Date: Wed, 4 Mar 2020 20:15:38 -0500 Subject: [PATCH 5/9] added header file --- source/adios2/helper/adiosMpiHandshake.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/source/adios2/helper/adiosMpiHandshake.cpp b/source/adios2/helper/adiosMpiHandshake.cpp index bf5847f4b8..3c86c63b42 100644 --- a/source/adios2/helper/adiosMpiHandshake.cpp +++ b/source/adios2/helper/adiosMpiHandshake.cpp @@ -6,6 +6,7 @@ */ #include "adiosMpiHandshake.h" +#include #include namespace adios2 From 12ae522d23b9c3c4fc43fc07bffc00d0618c86f8 Mon Sep 17 00:00:00 2001 From: Ruonan Wang Date: Thu, 5 Mar 2020 00:10:42 -0500 Subject: [PATCH 6/9] trying to fix issues with windows build --- source/adios2/helper/adiosMpiHandshake.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/adios2/helper/adiosMpiHandshake.cpp b/source/adios2/helper/adiosMpiHandshake.cpp index 3c86c63b42..8e75102125 100644 --- a/source/adios2/helper/adiosMpiHandshake.cpp +++ b/source/adios2/helper/adiosMpiHandshake.cpp @@ -72,7 +72,7 @@ void MpiHandshake::Test() existed = true; } } - if (not existed) + if (!existed) { ranks.push_back(rank); } @@ -88,7 +88,7 @@ void MpiHandshake::Test() existed = true; } } - if (not existed) + if (!existed) { ranks.push_back(rank); } @@ -199,7 +199,7 @@ void MpiHandshake::Start(const size_t maxStreamsPerApp, void MpiHandshake::Wait(const std::string &filename) { bool finished = false; - while (not finished) + while (!finished) { finished = Check(filename); } @@ -228,7 +228,7 @@ void MpiHandshake::Finalize() MPI_Status status; int success; MPI_Test(&r, &success, &status); - if (not success) + if (!success) { MPI_Cancel(&r); } From b8fc7346d308b96650ebcebeffec2450fb4ef1c7 Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Thu, 5 Mar 2020 14:46:00 -0500 Subject: [PATCH 7/9] refined MpiHandshake and added timeout --- source/adios2/engine/ssc/SscReader.cpp | 14 +- source/adios2/engine/ssc/SscReader.h | 3 +- source/adios2/engine/ssc/SscWriter.cpp | 14 +- source/adios2/engine/ssc/SscWriter.h | 3 +- source/adios2/helper/adiosMpiHandshake.cpp | 184 ++++++++++----------- source/adios2/helper/adiosMpiHandshake.h | 14 +- 6 files changed, 114 insertions(+), 118 deletions(-) diff --git a/source/adios2/engine/ssc/SscReader.cpp b/source/adios2/engine/ssc/SscReader.cpp index 1cdbc0fd76..5de16fd3fd 100644 --- a/source/adios2/engine/ssc/SscReader.cpp +++ b/source/adios2/engine/ssc/SscReader.cpp @@ -38,8 +38,9 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode, m_MaxFilenameLength); ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount", m_RendezvousAppCount); - ssc::GetParameter(m_IO.m_Parameters, "RendezvousStreamCount", - m_RendezvousStreamCount); + ssc::GetParameter(m_IO.m_Parameters, "MaxStreamsPerApp", + m_MaxStreamsPerApp); + ssc::GetParameter(m_IO.m_Parameters, "OpenTimeoutSecs", m_OpenTimeoutSecs); m_Buffer.resize(1); @@ -191,10 +192,9 @@ void SscReader::SyncMpiPattern() << ", Reader Rank " << m_ReaderRank << std::endl; } - m_MpiHandshake.Start(m_RendezvousStreamCount, m_MaxFilenameLength, - m_RendezvousAppCount, 'r', m_Name, CommAsMPI(m_Comm)); - m_MpiHandshake.Wait(m_Name); - m_MpiHandshake.PrintMaps(); + m_MpiHandshake.Handshake(m_Name, 'r', m_OpenTimeoutSecs, m_MaxStreamsPerApp, + m_MaxFilenameLength, m_RendezvousAppCount, + CommAsMPI(m_Comm)); for (const auto &app : m_MpiHandshake.GetWriterMap(m_Name)) { @@ -212,8 +212,6 @@ void SscReader::SyncMpiPattern() } } - m_MpiHandshake.Finalize(); - MPI_Group worldGroup; MPI_Comm_group(MPI_COMM_WORLD, &worldGroup); MPI_Group_incl(worldGroup, m_AllWriterRanks.size(), m_AllWriterRanks.data(), diff --git a/source/adios2/engine/ssc/SscReader.h b/source/adios2/engine/ssc/SscReader.h index 23446787ae..4690bb19c7 100644 --- a/source/adios2/engine/ssc/SscReader.h +++ b/source/adios2/engine/ssc/SscReader.h @@ -102,8 +102,9 @@ class SscReader : public Engine int m_Verbosity = 0; int m_MaxFilenameLength = 128; - int m_RendezvousStreamCount = 1; + int m_MaxStreamsPerApp = 1; int m_RendezvousAppCount = 2; + int m_OpenTimeoutSecs = 10; }; } // end namespace engine diff --git a/source/adios2/engine/ssc/SscWriter.cpp b/source/adios2/engine/ssc/SscWriter.cpp index 5aca28dee4..3e4946c249 100644 --- a/source/adios2/engine/ssc/SscWriter.cpp +++ b/source/adios2/engine/ssc/SscWriter.cpp @@ -37,8 +37,9 @@ SscWriter::SscWriter(IO &io, const std::string &name, const Mode mode, m_MaxFilenameLength); ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount", m_RendezvousAppCount); - ssc::GetParameter(m_IO.m_Parameters, "RendezvousStreamCount", - m_RendezvousStreamCount); + ssc::GetParameter(m_IO.m_Parameters, "MaxStreamsPerApp", + m_MaxStreamsPerApp); + ssc::GetParameter(m_IO.m_Parameters, "OpenTimeoutSecs", m_OpenTimeoutSecs); m_GlobalWritePattern.resize(m_WorldSize); m_GlobalReadPattern.resize(m_WorldSize); @@ -189,10 +190,9 @@ void SscWriter::SyncMpiPattern() << ", Writer Rank " << m_WriterRank << std::endl; } - m_MpiHandshake.Start(m_RendezvousStreamCount, m_MaxFilenameLength, - m_RendezvousAppCount, 'w', m_Name, CommAsMPI(m_Comm)); - m_MpiHandshake.Wait(m_Name); - m_MpiHandshake.PrintMaps(); + m_MpiHandshake.Handshake(m_Name, 'w', m_OpenTimeoutSecs, m_MaxStreamsPerApp, + m_MaxFilenameLength, m_RendezvousAppCount, + CommAsMPI(m_Comm)); for (const auto &app : m_MpiHandshake.GetWriterMap(m_Name)) { @@ -210,8 +210,6 @@ void SscWriter::SyncMpiPattern() } } - m_MpiHandshake.Finalize(); - MPI_Group worldGroup; MPI_Comm_group(MPI_COMM_WORLD, &worldGroup); MPI_Group_incl(worldGroup, m_AllReaderRanks.size(), m_AllReaderRanks.data(), diff --git a/source/adios2/engine/ssc/SscWriter.h b/source/adios2/engine/ssc/SscWriter.h index e78354b025..531ddfba34 100644 --- a/source/adios2/engine/ssc/SscWriter.h +++ b/source/adios2/engine/ssc/SscWriter.h @@ -92,8 +92,9 @@ class SscWriter : public Engine int m_Verbosity = 0; int m_MaxFilenameLength = 128; - int m_RendezvousStreamCount = 1; + int m_MaxStreamsPerApp = 1; int m_RendezvousAppCount = 2; + int m_OpenTimeoutSecs = 10; }; } // end namespace engine diff --git a/source/adios2/helper/adiosMpiHandshake.cpp b/source/adios2/helper/adiosMpiHandshake.cpp index 8e75102125..946780368c 100644 --- a/source/adios2/helper/adiosMpiHandshake.cpp +++ b/source/adios2/helper/adiosMpiHandshake.cpp @@ -6,6 +6,8 @@ */ #include "adiosMpiHandshake.h" +#include +#include #include #include @@ -20,7 +22,7 @@ std::vector> MpiHandshake::m_RecvRequests; size_t MpiHandshake::m_MaxStreamsPerApp; size_t MpiHandshake::m_MaxFilenameLength; size_t MpiHandshake::m_ItemSize; -std::map MpiHandshake::m_AppsForStreams; +std::map MpiHandshake::m_RendezvousAppCounts; size_t MpiHandshake::m_StreamID = 0; int MpiHandshake::m_WorldSize; int MpiHandshake::m_WorldRank; @@ -64,15 +66,8 @@ void MpiHandshake::Test() if (mode == 'w') { auto &ranks = m_WritersMap[filename][appMasterRank]; - bool existed = false; - for (const auto r : ranks) - { - if (r == rank) - { - existed = true; - } - } - if (!existed) + if (std::find(ranks.begin(), ranks.end(), rank) == + ranks.end()) { ranks.push_back(rank); } @@ -80,15 +75,8 @@ void MpiHandshake::Test() else if (mode == 'r') { auto &ranks = m_ReadersMap[filename][appMasterRank]; - bool existed = false; - for (const auto r : ranks) - { - if (r == rank) - { - existed = true; - } - } - if (!existed) + if (std::find(ranks.begin(), ranks.end(), rank) == + ranks.end()) { ranks.push_back(rank); } @@ -102,12 +90,16 @@ bool MpiHandshake::Check(const std::string &filename) { Test(); + // check if RendezvousAppCount reached + if (m_WritersMap[filename].size() + m_ReadersMap[filename].size() != - m_AppsForStreams[filename]) + m_RendezvousAppCounts[filename]) { return false; } + // check if all ranks' info is received + for (const auto &app : m_WritersMap[filename]) { if (app.second.size() != m_AppsSize[app.first]) @@ -127,56 +119,62 @@ bool MpiHandshake::Check(const std::string &filename) return true; } -void MpiHandshake::Start(const size_t maxStreamsPerApp, - const size_t maxFilenameLength, - const size_t appsForThisStream, const char mode, - const std::string &filename, MPI_Comm localComm) +void MpiHandshake::Handshake(const std::string &filename, const char mode, + const int timeoutSeconds, + const size_t maxStreamsPerApp, + const size_t maxFilenameLength, + const size_t rendezvousAppCountForStream, + MPI_Comm localComm) { - m_AppsForStreams[filename] = appsForThisStream; - if (m_StreamID == 0) + // initialize variables + + if (filename.size() > maxFilenameLength) { - MPI_Comm_size(MPI_COMM_WORLD, &m_WorldSize); - MPI_Comm_rank(MPI_COMM_WORLD, &m_WorldRank); - MPI_Comm_size(localComm, &m_LocalSize); - MPI_Comm_rank(localComm, &m_LocalRank); - m_MaxStreamsPerApp = maxStreamsPerApp; - m_MaxFilenameLength = maxFilenameLength; - m_ItemSize = maxFilenameLength + sizeof(char) + sizeof(int) * 2; - - if (m_LocalRank == 0) - { - m_LocalMasterRank = m_WorldRank; - } + throw(std::runtime_error("Filename too long")); + } - MPI_Bcast(&m_LocalMasterRank, 1, MPI_INT, 0, localComm); + MPI_Comm_size(MPI_COMM_WORLD, &m_WorldSize); + MPI_Comm_rank(MPI_COMM_WORLD, &m_WorldRank); + MPI_Comm_size(localComm, &m_LocalSize); + MPI_Comm_rank(localComm, &m_LocalRank); + m_MaxStreamsPerApp = maxStreamsPerApp; + m_MaxFilenameLength = maxFilenameLength; + m_RendezvousAppCounts[filename] = rendezvousAppCountForStream; - m_SendRequests.resize(m_WorldSize); - m_RecvRequests.resize(m_WorldSize); - for (int i = 0; i < m_WorldSize; ++i) - { - m_SendRequests[i].resize(maxStreamsPerApp); - m_RecvRequests[i].resize(maxStreamsPerApp); - } + m_SendRequests.resize(m_WorldSize); + m_RecvRequests.resize(m_WorldSize); + for (int rank = 0; rank < m_WorldSize; ++rank) + { + m_SendRequests[rank].resize(maxStreamsPerApp); + m_RecvRequests[rank].resize(maxStreamsPerApp); + } - size_t bufferSize = m_WorldSize * maxStreamsPerApp * m_ItemSize; - m_Buffer.resize(bufferSize); + m_ItemSize = maxFilenameLength + sizeof(char) + sizeof(int) * 2; + m_Buffer.resize(m_WorldSize * maxStreamsPerApp * m_ItemSize); - for (int rank = 0; rank < m_WorldSize; ++rank) - { - for (size_t stream = 0; stream < maxStreamsPerApp; ++stream) - { - MPI_Irecv(m_Buffer.data() + PlaceInBuffer(stream, rank), - m_ItemSize, MPI_CHAR, rank, rank, MPI_COMM_WORLD, - &m_RecvRequests[rank][stream]); - } - } + // broadcast local master rank's world rank to use as app ID + + if (m_LocalRank == 0) + { + m_LocalMasterRank = m_WorldRank; } + MPI_Bcast(&m_LocalMasterRank, 1, MPI_INT, 0, localComm); - if (filename.size() > maxFilenameLength) + // start receiving + + for (int rank = 0; rank < m_WorldSize; ++rank) { - throw(std::runtime_error("Filename too long")); + for (size_t stream = 0; stream < maxStreamsPerApp; ++stream) + { + MPI_Irecv(m_Buffer.data() + PlaceInBuffer(stream, rank), m_ItemSize, + MPI_CHAR, rank, rank, MPI_COMM_WORLD, + &m_RecvRequests[rank][stream]); + } } + + // start sending + size_t offset = 0; std::vector buffer(m_ItemSize); std::memcpy(buffer.data(), &mode, sizeof(char)); @@ -187,22 +185,44 @@ void MpiHandshake::Start(const size_t maxStreamsPerApp, offset += sizeof(int); std::memcpy(buffer.data() + offset, filename.data(), filename.size()); - for (int i = 0; i < m_WorldSize; ++i) + for (int rank = 0; rank < m_WorldSize; ++rank) { - MPI_Isend(buffer.data(), m_ItemSize, MPI_CHAR, i, m_WorldRank, - MPI_COMM_WORLD, &m_SendRequests[i][m_StreamID]); + MPI_Isend(buffer.data(), m_ItemSize, MPI_CHAR, rank, m_WorldRank, + MPI_COMM_WORLD, &m_SendRequests[rank][m_StreamID]); } - ++m_StreamID; -} + // wait and check if required RendezvousAppCount reached -void MpiHandshake::Wait(const std::string &filename) -{ - bool finished = false; - while (!finished) + auto start_time = std::chrono::system_clock::now(); + while (!Check(filename)) { - finished = Check(filename); + auto now_time = std::chrono::system_clock::now(); + auto duration = std::chrono::duration_cast( + now_time - start_time); + if (duration.count() > timeoutSeconds) + { + throw(std::runtime_error("Mpi handshake timeout")); + } } + + // clean up MPI requests + + for (auto &rs : m_RecvRequests) + { + for (auto &r : rs) + { + MPI_Status status; + int success; + MPI_Test(&r, &success, &status); + if (!success) + { + MPI_Cancel(&r); + } + } + } + m_RecvRequests.clear(); + + ++m_StreamID; } const std::map> & @@ -216,29 +236,6 @@ MpiHandshake::GetReaderMap(const std::string &filename) return m_ReadersMap[filename]; } -void MpiHandshake::Finalize() -{ - --m_StreamID; - if (m_StreamID == 0) - { - for (auto &rs : m_RecvRequests) - { - for (auto &r : rs) - { - MPI_Status status; - int success; - MPI_Test(&r, &success, &status); - if (!success) - { - MPI_Cancel(&r); - } - } - } - m_RecvRequests.clear(); - m_SendRequests.clear(); - } -} - void MpiHandshake::PrintMaps() { for (int printRank = 0; printRank < m_WorldSize; ++printRank) @@ -246,7 +243,8 @@ void MpiHandshake::PrintMaps() MPI_Barrier(MPI_COMM_WORLD); if (m_WorldRank == printRank) { - std::cout << "For rank " << printRank << " ********************* " + std::cout << "For rank " << printRank + << "============================================" << std::endl; std::cout << "Writers: " << std::endl; for (const auto &stream : m_WritersMap) diff --git a/source/adios2/helper/adiosMpiHandshake.h b/source/adios2/helper/adiosMpiHandshake.h index ad2b3aafa4..42c56e6b37 100644 --- a/source/adios2/helper/adiosMpiHandshake.h +++ b/source/adios2/helper/adiosMpiHandshake.h @@ -26,12 +26,12 @@ namespace helper class MpiHandshake { public: - static void Start(const size_t maxStreamsPerApp, - const size_t maxFilenameLength, - const size_t appsForThisStream, const char mode, - const std::string &filename, MPI_Comm localComm); - static void Wait(const std::string &filename); - static void Finalize(); + static void Handshake(const std::string &filename, const char mode, + const int timeoutSeconds, + const size_t maxStreamsPerApp, + const size_t maxFilenameLength, + const size_t RendezvousAppCountForStream, + MPI_Comm localComm); static const std::map> & GetWriterMap(const std::string &filename); static const std::map> & @@ -48,7 +48,7 @@ class MpiHandshake static size_t m_MaxStreamsPerApp; static size_t m_MaxFilenameLength; static size_t m_ItemSize; - static std::map m_AppsForStreams; + static std::map m_RendezvousAppCounts; static size_t m_StreamID; static int m_WorldSize; static int m_WorldRank; From efe04da7f69d590c54aff77db2ec3d234356afc9 Mon Sep 17 00:00:00 2001 From: Ruonan Wang Date: Fri, 6 Mar 2020 12:28:56 -0500 Subject: [PATCH 8/9] added sleep --- source/adios2/helper/adiosMpiHandshake.cpp | 8 +++-- source/adios2/helper/adiosMpiHandshake.h | 38 +++++++++++++++++++++- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/source/adios2/helper/adiosMpiHandshake.cpp b/source/adios2/helper/adiosMpiHandshake.cpp index 946780368c..23da4126d6 100644 --- a/source/adios2/helper/adiosMpiHandshake.cpp +++ b/source/adios2/helper/adiosMpiHandshake.cpp @@ -10,6 +10,7 @@ #include #include #include +#include namespace adios2 { @@ -193,12 +194,13 @@ void MpiHandshake::Handshake(const std::string &filename, const char mode, // wait and check if required RendezvousAppCount reached - auto start_time = std::chrono::system_clock::now(); + auto startTime = std::chrono::system_clock::now(); while (!Check(filename)) { - auto now_time = std::chrono::system_clock::now(); + std::this_thread::sleep_for(std::chrono::microseconds(100)); + auto nowTime = std::chrono::system_clock::now(); auto duration = std::chrono::duration_cast( - now_time - start_time); + nowTime - startTime); if (duration.count() > timeoutSeconds) { throw(std::runtime_error("Mpi handshake timeout")); diff --git a/source/adios2/helper/adiosMpiHandshake.h b/source/adios2/helper/adiosMpiHandshake.h index 42c56e6b37..eaea203828 100644 --- a/source/adios2/helper/adiosMpiHandshake.h +++ b/source/adios2/helper/adiosMpiHandshake.h @@ -26,12 +26,48 @@ namespace helper class MpiHandshake { public: + /** + * Start the handshake operations and wait until the rendezvous conditions + * are reached, or timeout. + * + * @param filename: name of the staging stream, must be within the length of + * maxFilenameLength + * + * @param mode: 'r' or 'w', read or write + * + * @param timeoutSeconds: timeout for the handshake, will throw exception + * when reaching this timeout + * + * @param maxStreamsPerApp: the maximum number of streams that all apps + * sharing this MPI_COMM_WORLD can possibly open. It is required that this + * number is consistent across all ranks. This is used for pre-allocating + * the vectors holding MPI requests and must be specified correctly, + * otherwise strange errors could occur. This class does not provide any + * mechanism to check whether this number being passed is actually correct + * or not accross all ranks, because implementing this logic for an + * arbitrary communication pattern is overly expensive, if not impossible. + * + * @param maxFilenameLength: the maximum possible length of filename that + * all apps sharing this MPI_COMM_WORLD could possibly define. It is + * required that this number is consistent across all ranks. This is used + * for pre-allocating the buffer for aggregating the global MPI information. + * An exception will be thrown if any filename on any rank is found to be + * longer than this. + * + * @param rendezvousAppCountForStream: the number of apps, including both + * writers and readers, that will work on this stream. The function will + * block until it receives the MPI handshake information from all these + * apps, or until timeoutSeconds is passed. + * + * @param localComm: local MPI communicator for the app + */ static void Handshake(const std::string &filename, const char mode, const int timeoutSeconds, const size_t maxStreamsPerApp, const size_t maxFilenameLength, - const size_t RendezvousAppCountForStream, + const size_t rendezvousAppCountForStream, MPI_Comm localComm); + static const std::map> & GetWriterMap(const std::string &filename); static const std::map> & From 70baa4d59f23d17289c70e8206d37e2aa522d20d Mon Sep 17 00:00:00 2001 From: Ruonan Wang Date: Fri, 6 Mar 2020 15:22:53 -0500 Subject: [PATCH 9/9] modified test parameters for the new handshake mechanism --- source/adios2/helper/adiosMpiHandshake.cpp | 38 +++++++++++++++++++ source/adios2/helper/adiosMpiHandshake.h | 1 + testing/adios2/engine/ssc/TestSscMultiApp.cpp | 2 +- 3 files changed, 40 insertions(+), 1 deletion(-) diff --git a/source/adios2/helper/adiosMpiHandshake.cpp b/source/adios2/helper/adiosMpiHandshake.cpp index 23da4126d6..4e8f5c14c2 100644 --- a/source/adios2/helper/adiosMpiHandshake.cpp +++ b/source/adios2/helper/adiosMpiHandshake.cpp @@ -238,6 +238,44 @@ MpiHandshake::GetReaderMap(const std::string &filename) return m_ReadersMap[filename]; } +void MpiHandshake::PrintMaps(const int printRank) +{ + if (m_WorldRank == printRank) + { + std::cout << "Writers: " << std::endl; + for (const auto &stream : m_WritersMap) + { + std::cout << " Stream " << stream.first << std::endl; + for (const auto &app : stream.second) + { + std::cout << " App Master Rank " << app.first + << std::endl; + std::cout << " "; + for (const auto &rank : app.second) + { + std::cout << rank << ", "; + } + std::cout << std::endl; + } + } + std::cout << "Readers: " << std::endl; + for (const auto &stream : m_ReadersMap) + { + std::cout << " Stream " << stream.first << std::endl; + for (const auto &app : stream.second) + { + std::cout << " App Master Rank " << app.first + << std::endl; + std::cout << " "; + for (const auto &rank : app.second) + { + std::cout << rank << ", "; + } + std::cout << std::endl; + } + } + } +} void MpiHandshake::PrintMaps() { for (int printRank = 0; printRank < m_WorldSize; ++printRank) diff --git a/source/adios2/helper/adiosMpiHandshake.h b/source/adios2/helper/adiosMpiHandshake.h index eaea203828..9790f60b98 100644 --- a/source/adios2/helper/adiosMpiHandshake.h +++ b/source/adios2/helper/adiosMpiHandshake.h @@ -73,6 +73,7 @@ class MpiHandshake static const std::map> & GetReaderMap(const std::string &filename); static void PrintMaps(); + static void PrintMaps(const int printRank); private: static void Test(); diff --git a/testing/adios2/engine/ssc/TestSscMultiApp.cpp b/testing/adios2/engine/ssc/TestSscMultiApp.cpp index 422e62513b..3101c758d1 100644 --- a/testing/adios2/engine/ssc/TestSscMultiApp.cpp +++ b/testing/adios2/engine/ssc/TestSscMultiApp.cpp @@ -500,7 +500,7 @@ void Reader2(const Dims &shape, const Dims &start, const Dims &count, TEST_F(SscEngineTest, TestSscMultiApp) { std::string filename = "TestSscMultiApp"; - adios2::Params engineParams = {}; + adios2::Params engineParams = {{"RendezvousAppCount", "4"}}; int worldRank, worldSize; Dims start, count, shape;