From 0baf09f5f125a64431e6e69722327b8a1fd65e70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Fri, 31 May 2024 20:04:06 +0200 Subject: [PATCH] Add rank table for locality-aware streaming (#1505) * Python bindings: Extract MPI adaptor to Mpi.hpp * Auxiliary: MPI helpers * Remove unnecessary MPI helper * Some fixes in HDF5 createDataset * Series.(get)mpiRanksMetaInfo, hosttable * Python bindings: mpi_ranks_meta_info * Python bindings for HOST_INFO * Test chunk table * CI fixes: Windows * SerialIOTests: Windows compatibility Initialize Winsock API * CI fixes: NVIDIA * Add review suggestions * Make hostname() implementation explicit to the user * Remove Winsocks functionality * rank_table: "hostname": pick mpi impl if parallel series * Extend verbose mode a bit * Initialize rank_table in parallel for HDF5 * [wip] fix open/close file issue * Extended testing * Reduce support for file-based encoding * Rename mpiRanksMetaInfo() -> rankTable() * Fix unused parameter warning * Documentation * Add line in docs * CI fix * Test writing of MPI rank table * Fixes after rebase --- CMakeLists.txt | 1 + docs/source/details/backendconfig.rst | 4 + docs/source/details/mpi.rst | 8 +- docs/source/usage/streaming.rst | 27 +++ include/openPMD/ChunkInfo.hpp | 60 +++++ include/openPMD/ChunkInfo_internal.hpp | 67 ++++++ include/openPMD/Series.hpp | 58 +++++ include/openPMD/auxiliary/Mpi.hpp | 50 +++++ include/openPMD/binding/python/Mpi.hpp | 100 +++++++++ src/ChunkInfo.cpp | 117 ++++++++++ src/IO/AbstractIOHandlerImpl.cpp | 63 +++++- src/IO/HDF5/HDF5IOHandler.cpp | 13 +- src/Iteration.cpp | 9 + src/Series.cpp | 290 ++++++++++++++++++++++++- src/auxiliary/Mpi.cpp | 113 ++++++++++ src/binding/python/ChunkInfo.cpp | 27 +++ src/binding/python/Series.cpp | 85 +------- test/ParallelIOTest.cpp | 92 +++++++- test/SerialIOTest.cpp | 78 ++++++- 19 files changed, 1171 insertions(+), 91 deletions(-) create mode 100644 include/openPMD/ChunkInfo_internal.hpp create mode 100644 include/openPMD/binding/python/Mpi.hpp create mode 100644 src/auxiliary/Mpi.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index ef3f77bf9d..232c97b3ca 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -460,6 +460,7 @@ set(CORE_SOURCE src/auxiliary/Date.cpp src/auxiliary/Filesystem.cpp src/auxiliary/JSON.cpp + src/auxiliary/Mpi.cpp src/backend/Attributable.cpp src/backend/BaseRecordComponent.cpp src/backend/MeshRecordComponent.cpp diff --git a/docs/source/details/backendconfig.rst b/docs/source/details/backendconfig.rst index c8d82a2bc3..47c6e2d63b 100644 --- a/docs/source/details/backendconfig.rst +++ b/docs/source/details/backendconfig.rst @@ -77,6 +77,8 @@ For a consistent user interface, backends shall follow the following rules: Backend-independent JSON configuration -------------------------------------- +.. _backend_independent_config: + The openPMD backend can be chosen via the JSON/TOML key ``backend`` which recognizes the alternatives ``["hdf5", "adios2", "json"]``. The iteration encoding can be chosen via the JSON/TOML key ``iteration_encoding`` which recognizes the alternatives ``["file_based", "group_based", "variable_based"]``. @@ -97,6 +99,8 @@ It if set to ``{"resizable": true}``, this declares that it shall be allowed to For HDF5, resizable Datasets come with a performance penalty. For JSON and ADIOS2, all datasets are resizable, independent of this option. 
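+
+A minimal sketch of the ``resizable`` option in C++ (``series`` and the mesh record component below are illustrative placeholders, not a fixed API contract):
+
+.. code-block:: cpp
+
+   // declare a dataset whose extent may be enlarged later
+   openPMD::Dataset ds(openPMD::Datatype::DOUBLE, {10}, R"({"resizable": true})");
+   auto E_x = series.iterations[0].meshes["E"]["x"];
+   E_x.resetDataset(ds);
+
+   // later: grow the extent before storing further chunks
+   // (allowed because the dataset was declared resizable)
+   E_x.resetDataset(openPMD::Dataset(openPMD::Datatype::DOUBLE, {20}));
+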
+The key ``rank_table`` allows specifying the creation of a **rank table**, used for tracking :ref:`chunk provenance especially in streaming setups `, refer to the streaming documentation for details. + Configuration Structure per Backend ----------------------------------- diff --git a/docs/source/details/mpi.rst b/docs/source/details/mpi.rst index ea4ec0551e..f59a3b0aa7 100644 --- a/docs/source/details/mpi.rst +++ b/docs/source/details/mpi.rst @@ -13,11 +13,13 @@ A **collective** operation needs to be executed by *all* MPI ranks of the MPI co Contrarily, **independent** operations can also be called by a subset of these MPI ranks. For more information, please see the `MPI standard documents `_, for example MPI-3.1 in `"Section 2.4 - Semantic Terms" `_. -============================ ================== =========================== +============================ ================== ================================ Functionality Behavior Description -============================ ================== =========================== +============================ ================== ================================ ``Series`` **collective** open and close ``::flush()`` **collective** read and write +``::setRankTable()`` **collective** write, performed at flush +``::rankTable()`` **coll**/indep. behavior specified by bool param ``Iteration`` [1]_ independent declare and open ``::open()`` [4]_ **collective** explicit open ``Mesh`` [1]_ independent declare, open, write @@ -30,7 +32,7 @@ Functionality Behavior Description ``::storeChunk`` [1]_ independent write ``::loadChunk`` independent read ``::availableChunks`` [4]_ collective read, immediate result -============================ ================== =========================== +============================ ================== ================================ .. [1] Individual backends, i.e. :ref:`parallel HDF5 `, will only support independent operations if the default, non-collective (aka independent) behavior is kept. Otherwise these operations are collective. diff --git a/docs/source/usage/streaming.rst b/docs/source/usage/streaming.rst index d70b929389..118e3a6e9d 100644 --- a/docs/source/usage/streaming.rst +++ b/docs/source/usage/streaming.rst @@ -95,3 +95,30 @@ This pays tribute to the fact that in streaming mode, an iteration is sent to th .. literalinclude:: 10_streaming_write.py :language: python3 + + +Chunk provenance tracking using a rank table +-------------------------------------------- + +.. _rank_table: + +In a large parallel streaming setup, it is important to adhere to a certain concept of data locality when deciding which data to load from the producer. +The openPMD-api has some mechanisms to help with this process: + +The API call ``BaseRecordComponent::availableChunks()``/``Base_Record_Component.available_chunks()`` returns the data chunks within a specific dataset that are available for loading, each chunk hereby annotating its MPI rank within the *data producer* in ``WrittenChunkInfo::sourceID``/``WrittenChunkInfo::source_ID``. + +In order to correlate this information with the MPI ranks of the *data consumer*, a **rank table** can be used in order to transmit an additional tag for each of the producer's MPI ranks. On the data producer side, the rank table can be set manually or automatically: + + +* **automatically** Using the :ref:`JSON/TOML option ` ``rank_table``. + The suggested specification is ``{"rank_table": "hostname"}``, although the explicit values ``"mpi_processor_name"`` and ``"posix_hostname"`` are also accepted. 
+ ``"hostname"`` resolves to the MPI processor name when the Series has been initialized with MPI, to the POSIX hostname otherwise (if that is available). +* **manually:** Using the API call ``Series::setRankTable(std::string const &myRankInfo)`` that specifies the current rank's tag. + This can be used to set custom tags, identifying e.g. NUMA nodes or groups of compute nodes. + +The rank table takes the form of a 2-dimensional dataset, listing the tags as null-terminated strings line by line in order of the MPI ranks and can be loaded using ``Series::rankTable()``/``Series.get_rank_table()``. + +Setting the rank table is **collective**, though the collective action is only performed upon flushing. +Reading the rank table requires specifying if the read operation should be done collectively (better for performance), or independently. + +In order to retrieve the corresponding information on the **consumer side**, the function ``host_info::byMethod()``/``HostInfo.get()`` can be used for retrieving the local rank's information, or alternatively ``host_info::byMethodCollective()``/``HostInfo.get_info()`` for retrieving the rank table for all consumer ranks. diff --git a/include/openPMD/ChunkInfo.hpp b/include/openPMD/ChunkInfo.hpp index 5be3c27b56..9bc6e94972 100644 --- a/include/openPMD/ChunkInfo.hpp +++ b/include/openPMD/ChunkInfo.hpp @@ -20,8 +20,16 @@ */ #pragma once +#include "openPMD/config.hpp" + #include "openPMD/Dataset.hpp" // Offset, Extent +#if openPMD_HAVE_MPI +#include +#endif + +#include +#include #include namespace openPMD @@ -73,4 +81,56 @@ struct WrittenChunkInfo : ChunkInfo }; using ChunkTable = std::vector; + +namespace chunk_assignment +{ + using RankMeta = std::map; +} // namespace chunk_assignment + +namespace host_info +{ + /** + * Methods for retrieving hostname / processor identifiers that openPMD-api + * is aware of. These can be used for locality-aware chunk distribution + * schemes in streaming setups. + */ + enum class Method + { + POSIX_HOSTNAME, + MPI_PROCESSOR_NAME + }; + + /** + * @brief Is the method available on the current system? + * + * @return true If it is available. + * @return false Otherwise. + */ + bool methodAvailable(Method); + + /** + * @brief Wrapper for the native hostname retrieval functions such as + * POSIX gethostname(). + * + * @return std::string The hostname / processor name returned by the native + * function. + */ + std::string byMethod(Method); + +#if openPMD_HAVE_MPI + /** + * @brief Retrieve the hostname information on all MPI ranks and distribute + * a map of "rank -> hostname" to all ranks. + * + * This call is MPI collective. + * + * @return chunk_assignment::RankMeta Hostname / processor name information + * for all MPI ranks known to the communicator. + * The result is returned on all ranks. + */ + chunk_assignment::RankMeta byMethodCollective(MPI_Comm, Method); +#endif +} // namespace host_info } // namespace openPMD + +#undef openPMD_POSIX_AVAILABLE diff --git a/include/openPMD/ChunkInfo_internal.hpp b/include/openPMD/ChunkInfo_internal.hpp new file mode 100644 index 0000000000..b14ff0f7ad --- /dev/null +++ b/include/openPMD/ChunkInfo_internal.hpp @@ -0,0 +1,67 @@ +/* Copyright 2024 Franz Poeschel + * + * This file is part of openPMD-api. 
+ * + * openPMD-api is free software: you can redistribute it and/or modify + * it under the terms of of either the GNU General Public License or + * the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * openPMD-api is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License and the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License + * and the GNU Lesser General Public License along with openPMD-api. + * If not, see . + */ +#pragma once + +#include "openPMD/ChunkInfo.hpp" +#include + +namespace openPMD::host_info +{ + +/** + * @brief This defines the method identifiers used + * in `{"rank_table": "hostname"}` + * + * Currently recognized are: + * + * * posix_hostname + * * mpi_processor_name + * + * For backwards compatibility reasons, "hostname" is also recognized as a + * deprecated alternative for "posix_hostname". + * + * @return Method enum identifier. The identifier is returned even if the + * method is not available on the system. This should by checked + * via methodAvailable(). + * @throws std::out_of_range If an unknown string identifier is passed. + */ +Method methodFromStringDescription(std::string const &descr, bool consider_mpi); + +/* + * The following block contains one wrapper for each native hostname + * retrieval method. The purpose is to have the same function pointer type + * for all of them. + */ + +#ifdef _WIN32 +#define openPMD_POSIX_AVAILABLE false +#else +#define openPMD_POSIX_AVAILABLE true +#endif + +#if openPMD_POSIX_AVAILABLE +std::string posix_hostname(); +#endif + +#if openPMD_HAVE_MPI +std::string mpi_processor_name(); +#endif +} // namespace openPMD::host_info diff --git a/include/openPMD/Series.hpp b/include/openPMD/Series.hpp index 7f4306ad07..4a3417d149 100644 --- a/include/openPMD/Series.hpp +++ b/include/openPMD/Series.hpp @@ -49,6 +49,7 @@ #include #include #include +#include // expose private and protected members for invasive testing #ifndef OPENPMD_private @@ -201,6 +202,36 @@ namespace internal m_deferred_initialization = std::nullopt; void close(); + +#if openPMD_HAVE_MPI + /* + * @todo Once we have separate MPI headers, move this there. + */ + std::optional m_communicator; +#endif + + struct NoSourceSpecified + {}; + struct SourceSpecifiedViaJSON + { + std::string value; + }; + struct SourceSpecifiedManually + { + std::string value; + }; + + struct RankTableData + { + Attributable m_attributable; + std::variant< + NoSourceSpecified, + SourceSpecifiedViaJSON, + SourceSpecifiedManually> + m_rankTableSource; + std::optional m_bufferedRead; + }; + RankTableData m_rankTable; }; // SeriesData class SeriesInternal; @@ -388,6 +419,32 @@ class Series : public Attributable */ Series &setMeshesPath(std::string const &meshesPath); + /** + * @throw no_such_attribute_error If optional attribute is not present. + * @param collective Run this read operation collectively. + There might be an enormous IO overhead if running this + operation non-collectively. + To make this explicit to users, there is no default parameter. + Parameter is ignored if compiling without MPI support, (it is + present for the sake of a consistent API). 
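+     * With file-based iteration encoding, an iteration should be open
+     * before calling this method.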
+ * @return Vector with a String per (writing) MPI rank, indicating user- + * defined meta information per rank. Example: host name. + */ +#if openPMD_HAVE_MPI + chunk_assignment::RankMeta rankTable(bool collective); +#else + chunk_assignment::RankMeta rankTable(bool collective = false); +#endif + + /** + * @brief Set the Mpi Ranks Meta Info attribute, i.e. a Vector with + * a String per (writing) MPI rank, indicating user- + * defined meta information per rank. Example: host name. + * + * @return Reference to modified series. + */ + Series &setRankTable(std::string const &myRankInfo); + /** * @throw no_such_attribute_error If optional attribute is not present. * @return String representing the path to particle species, relative(!) to @@ -745,6 +802,7 @@ OPENPMD_private bool flushIOHandler = true); void flushMeshesPath(); void flushParticlesPath(); + void flushRankTable(); void readFileBased(); void readOneIterationFileBased(std::string const &filePath); /** diff --git a/include/openPMD/auxiliary/Mpi.hpp b/include/openPMD/auxiliary/Mpi.hpp index 940ec026a3..f8eefe0cc5 100644 --- a/include/openPMD/auxiliary/Mpi.hpp +++ b/include/openPMD/auxiliary/Mpi.hpp @@ -26,6 +26,9 @@ #if openPMD_HAVE_MPI #include + +#include +#include #endif #include @@ -64,5 +67,52 @@ namespace } } // namespace +/** + * Multiple variable-length strings represented in one single buffer + * with a fixed line width. + * Strings smaller than the maximum width are padded with zeros. + * Each line is zero-terminated with at least one zero character. + * The length of char_buffer should be equal to the product of line_length + * and num_lines. + */ +struct StringMatrix +{ + std::vector char_buffer; + size_t line_length = 0; + size_t num_lines = 0; +}; + +/* + * These are mostly internal helper functions, so this defines only those that + * we need. + * Logically, these should be complemented by `collectStringsTo()` and + * `distributeStringsAsMatrixToAllRanks()`, but we don't need them (yet). + */ + +/** + * @brief Collect multiple variable-length strings to one rank in MPI_Gatherv + * fashion. Uses two collective MPI calls, the first to gather the + * different string lengths, the second to gather the actual strings. + * + * @param communicator MPI communicator + * @param destRank Target rank for MPI_Gatherv + * @param thisRankString The current MPI rank's contribution to the data. + * @return StringMatrix See documentation of StringMatrix struct. + */ +StringMatrix collectStringsAsMatrixTo( + MPI_Comm communicator, int destRank, std::string const &thisRankString); + +/** + * @brief Collect multiple variable-length strings to all ranks in + * MPI_Allgatherv fashion. Uses two collective MPI calls, the first to + * gather the different string lengths, the second to gather the actual + * strings. + * + * @param communicator communicator + * @param thisRankString The current MPI rank's contribution to the data. + * @return std::vector All ranks' strings, returned on all ranks. + */ +std::vector distributeStringsToAllRanks( + MPI_Comm communicator, std::string const &thisRankString); #endif } // namespace openPMD::auxiliary diff --git a/include/openPMD/binding/python/Mpi.hpp b/include/openPMD/binding/python/Mpi.hpp new file mode 100644 index 0000000000..dc110e0ca1 --- /dev/null +++ b/include/openPMD/binding/python/Mpi.hpp @@ -0,0 +1,100 @@ +/* Copyright 2021 Axel Huebl and Franz Poeschel + * + * This file is part of openPMD-api. 
+ * + * openPMD-api is free software: you can redistribute it and/or modify + * it under the terms of of either the GNU General Public License or + * the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * openPMD-api is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License and the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License + * and the GNU Lesser General Public License along with openPMD-api. + * If not, see . + */ + +#pragma once + +#include "openPMD/config.hpp" + +#if openPMD_HAVE_MPI + +#include "openPMD/binding/python/Common.hpp" + +#include + +/** mpi4py communicator wrapper + * + * refs: + * - https://github.com/mpi4py/mpi4py/blob/3.0.0/src/mpi4py/libmpi.pxd#L35-L36 + * - https://github.com/mpi4py/mpi4py/blob/3.0.0/src/mpi4py/MPI.pxd#L100-L105 + * - installed: include/mpi4py/mpi4py.MPI.h + */ +struct openPMD_PyMPICommObject +{ + PyObject_HEAD MPI_Comm ob_mpi; + unsigned int flags; +}; +using openPMD_PyMPIIntracommObject = openPMD_PyMPICommObject; + +inline std::variant +pythonObjectAsMpiComm(pybind11::object &comm) +{ + namespace py = pybind11; + //! TODO perform mpi4py import test and check min-version + //! careful: double MPI_Init risk? only import mpi4py.MPI? + //! required C-API init? probably just checks: + //! refs: + //! - https://bitbucket.org/mpi4py/mpi4py/src/3.0.0/demo/wrap-c/helloworld.c + //! - installed: include/mpi4py/mpi4py.MPI_api.h + // if( import_mpi4py() < 0 ) { here be dragons } + + if (comm.ptr() == Py_None) + return {"MPI communicator cannot be None."}; + if (comm.ptr() == nullptr) + return {"MPI communicator is a nullptr."}; + + // check type string to see if this is mpi4py + // __str__ (pretty) + // __repr__ (unambiguous) + // mpi4py: + // pyMPI: ... (TODO) + py::str const comm_pystr = py::repr(comm); + std::string const comm_str = comm_pystr.cast(); + if (comm_str.substr(0, 12) != std::string(" >( + comm.get_type())) + // TODO add mpi4py version from above import check to error message + return { + "comm has unexpected type layout in " + comm_str + + " (Mismatched MPI at compile vs. runtime? " + "Breaking mpi4py release?)"}; + + // todo other possible implementations: + // - pyMPI (inactive since 2008?): import mpi; mpi.WORLD + + // reimplementation of mpi4py's: + // MPI_Comm* mpiCommPtr = PyMPIComm_Get(comm.ptr()); + MPI_Comm *mpiCommPtr = + &((openPMD_PyMPIIntracommObject *)(comm.ptr()))->ob_mpi; + + if (PyErr_Occurred()) + return {"MPI communicator access error."}; + if (mpiCommPtr == nullptr) + { + return { + "MPI communicator cast failed. " + "(Mismatched MPI at compile vs. runtime?)"}; + } + return {*mpiCommPtr}; +} + +#endif diff --git a/src/ChunkInfo.cpp b/src/ChunkInfo.cpp index 3c01b7b681..5acb1ea07e 100644 --- a/src/ChunkInfo.cpp +++ b/src/ChunkInfo.cpp @@ -19,9 +19,22 @@ * If not, see . 
*/ #include "openPMD/ChunkInfo.hpp" +#include "openPMD/ChunkInfo_internal.hpp" + +#include "openPMD/auxiliary/Mpi.hpp" #include +#ifdef _WIN32 +#define openPMD_POSIX_AVAILABLE false +#else +#define openPMD_POSIX_AVAILABLE true +#endif + +#if openPMD_POSIX_AVAILABLE +#include +#endif + namespace openPMD { ChunkInfo::ChunkInfo(Offset offset_in, Extent extent_in) @@ -48,4 +61,108 @@ bool WrittenChunkInfo::operator==(WrittenChunkInfo const &other) const return this->sourceID == other.sourceID && this->ChunkInfo::operator==(other); } + +namespace host_info +{ + constexpr size_t MAX_HOSTNAME_LENGTH = 256; + + Method methodFromStringDescription( + std::string const &descr, [[maybe_unused]] bool consider_mpi) + { + static std::map const map{ + {"posix_hostname", Method::POSIX_HOSTNAME}, +#if openPMD_HAVE_MPI + {"hostname", + consider_mpi ? Method::MPI_PROCESSOR_NAME + : Method::POSIX_HOSTNAME}, +#else + {"hostname", Method::POSIX_HOSTNAME}, +#endif + {"mpi_processor_name", Method::MPI_PROCESSOR_NAME}}; + return map.at(descr); + } + + bool methodAvailable(Method method) + { + switch (method) + { + + case Method::POSIX_HOSTNAME: + return openPMD_POSIX_AVAILABLE; + case Method::MPI_PROCESSOR_NAME: + return openPMD_HAVE_MPI == 1; + } + throw std::runtime_error("Unreachable!"); + } + + std::string byMethod(Method method) + { + static std::map const map{ +#if openPMD_POSIX_AVAILABLE + {Method::POSIX_HOSTNAME, &posix_hostname}, +#endif +#if openPMD_HAVE_MPI + {Method::MPI_PROCESSOR_NAME, &mpi_processor_name}, +#endif + }; + try + { + return (*map.at(method))(); + } + catch (std::out_of_range const &) + { + throw std::runtime_error( + "[hostname::byMethod] Specified method is not available."); + } + } + +#if openPMD_HAVE_MPI + chunk_assignment::RankMeta byMethodCollective(MPI_Comm comm, Method method) + { + auto myHostname = byMethod(method); + chunk_assignment::RankMeta res; + auto allHostnames = + auxiliary::distributeStringsToAllRanks(comm, myHostname); + for (size_t i = 0; i < allHostnames.size(); ++i) + { + res[i] = allHostnames[i]; + } + return res; + } + + std::string mpi_processor_name() + { + std::string res; + res.resize(MPI_MAX_PROCESSOR_NAME); + int string_len; + if (MPI_Get_processor_name(res.data(), &string_len) != 0) + { + throw std::runtime_error( + "[mpi_processor_name] Could not inquire processor name."); + } + // MPI_Get_processor_name returns the string length without null + // terminator and std::string::resize() does not use null terminator + // either. So, no +-1 necessary. 
+ res.resize(string_len); + res.shrink_to_fit(); + return res; + } +#endif + +#if openPMD_POSIX_AVAILABLE + std::string posix_hostname() + { + char hostname[MAX_HOSTNAME_LENGTH]; + if (gethostname(hostname, MAX_HOSTNAME_LENGTH)) + { + throw std::runtime_error( + "[posix_hostname] Could not inquire hostname."); + } + std::string res(hostname); + return res; + } +#endif +} // namespace host_info } // namespace openPMD + +#undef openPMD_POSIX_AVAILABLE diff --git a/src/IO/AbstractIOHandlerImpl.cpp b/src/IO/AbstractIOHandlerImpl.cpp index bbab360b4d..109942df51 100644 --- a/src/IO/AbstractIOHandlerImpl.cpp +++ b/src/IO/AbstractIOHandlerImpl.cpp @@ -25,7 +25,9 @@ #include "openPMD/backend/Writable.hpp" #include +#include #include +#include namespace openPMD { @@ -38,12 +40,67 @@ AbstractIOHandlerImpl::AbstractIOHandlerImpl(AbstractIOHandler *handler) } } +namespace +{ + template + auto vec_as_string(Vec const &vec) -> std::string + { + if (vec.empty()) + { + return "[]"; + } + else + { + std::stringstream res; + res << '['; + auto it = vec.begin(); + res << *it++; + auto end = vec.end(); + for (; it != end; ++it) + { + res << ", " << *it; + } + res << ']'; + return res.str(); + } + } + + template + struct self_or_invoked + { + using type = T; + }; + + template + struct self_or_invoked>> + { + using type = std::invoke_result_t; + }; + + template + using self_or_invoked_t = typename self_or_invoked::type; + + template + auto + undefer_string(DeferredString &&str) -> self_or_invoked_t + { + if constexpr (std::is_invocable_v) + { + return str(); + } + else + { + return std::forward(str); + } + } +} // namespace + template void AbstractIOHandlerImpl::writeToStderr([[maybe_unused]] Args &&...args) const { if (m_verboseIOTasks) { - (std::cerr << ... << args) << std::endl; + (std::cerr << ... << undefer_string(args)) << std::endl; } } @@ -108,7 +165,9 @@ std::future AbstractIOHandlerImpl::flush() "->", i.writable, "] CREATE_DATASET: ", - parameter.name); + parameter.name, + ", extent=", + [¶meter]() { return vec_as_string(parameter.extent); }); createDataset(i.writable, parameter); break; } diff --git a/src/IO/HDF5/HDF5IOHandler.cpp b/src/IO/HDF5/HDF5IOHandler.cpp index d0f75e5734..c98349a5b3 100644 --- a/src/IO/HDF5/HDF5IOHandler.cpp +++ b/src/IO/HDF5/HDF5IOHandler.cpp @@ -618,8 +618,17 @@ void HDF5IOHandlerImpl::createDataset( #endif /* Open H5Object to write into */ - auto res = getFile(writable); - File file = res ? res.value() : getFile(writable->parent).value(); + File file{}; + if (auto opt = getFile(writable->parent); opt.has_value()) + { + file = opt.value(); + } + else + { + throw error::Internal( + "[HDF5] CREATE_DATASET task must have a parent with an " + "associated file."); + } hid_t node_id = H5Gopen(file.id, concrete_h5_file_position(writable).c_str(), gapl); VERIFY( diff --git a/src/Iteration.cpp b/src/Iteration.cpp index c50758ed92..b56fb21096 100644 --- a/src/Iteration.cpp +++ b/src/Iteration.cpp @@ -209,6 +209,15 @@ void Iteration::flushFileBased( fCreate.name = filename; IOHandler()->enqueue(IOTask(&s.writable(), fCreate)); + /* + * If it was written before, then in the context of another iteration. 
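+     * Each iteration file in file-based encoding carries its own copy of the
+     * rank table, so reset the written flag and the file position before
+     * flushing it again into the file created above.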
+ */ + s.get().m_rankTable.m_attributable.written() = false; + s.get() + .m_rankTable.m_attributable.get() + .m_writable.abstractFilePosition.reset(); + s.flushRankTable(); + /* create basePath */ Parameter pCreate; pCreate.path = auxiliary::replace_first(s.basePath(), "%T/", ""); diff --git a/src/Series.cpp b/src/Series.cpp index 9079f4d791..9140011c15 100644 --- a/src/Series.cpp +++ b/src/Series.cpp @@ -19,22 +19,28 @@ * If not, see . */ #include "openPMD/Series.hpp" +#include "openPMD/ChunkInfo.hpp" +#include "openPMD/ChunkInfo_internal.hpp" #include "openPMD/Error.hpp" #include "openPMD/IO/AbstractIOHandler.hpp" #include "openPMD/IO/AbstractIOHandlerHelper.hpp" #include "openPMD/IO/Access.hpp" #include "openPMD/IO/DummyIOHandler.hpp" #include "openPMD/IO/Format.hpp" +#include "openPMD/IO/IOTask.hpp" #include "openPMD/IterationEncoding.hpp" #include "openPMD/ReadIterations.hpp" #include "openPMD/ThrowError.hpp" #include "openPMD/auxiliary/Date.hpp" #include "openPMD/auxiliary/Filesystem.hpp" #include "openPMD/auxiliary/JSON_internal.hpp" +#include "openPMD/auxiliary/Mpi.hpp" #include "openPMD/auxiliary/StringManip.hpp" +#include "openPMD/auxiliary/Variant.hpp" #include "openPMD/backend/Attributable.hpp" #include "openPMD/version.hpp" +#include #include #include #include @@ -190,6 +196,270 @@ Series &Series::setMeshesPath(std::string const &mp) return *this; } +#if openPMD_HAVE_MPI +chunk_assignment::RankMeta Series::rankTable(bool collective) +#else +chunk_assignment::RankMeta Series::rankTable([[maybe_unused]] bool collective) +#endif +{ + auto &series = get(); + auto &rankTable = series.m_rankTable; + if (rankTable.m_bufferedRead.has_value()) + { + return *rankTable.m_bufferedRead; + } + if (iterationEncoding() == IterationEncoding::fileBased) + { + std::cerr << "[Series] Use rank table in file-based iteration encoding " + "at your own risk. Make sure to have an iteration open " + "before calling this." 
+ << std::endl; + if (iterations.empty()) + { + return {}; + } +#if 0 + Parameter openFile; + openFile.name = iterationFilename(iterations.begin()->first); + // @todo: check if the series currently has an open file, check if + // collective is true + IOHandler()->enqueue(IOTask(this, openFile)); +#endif + } + Parameter listDatasets; + IOHandler()->enqueue(IOTask(this, listDatasets)); + IOHandler()->flush(internal::defaultFlushParams); + if (std::none_of( + listDatasets.datasets->begin(), + listDatasets.datasets->end(), + [](std::string const &str) { return str == "rankTable"; })) + { + rankTable.m_bufferedRead = chunk_assignment::RankMeta{}; + return {}; + } + Parameter openDataset; + openDataset.name = "rankTable"; + IOHandler()->enqueue(IOTask(&rankTable.m_attributable, openDataset)); + + IOHandler()->flush(internal::defaultFlushParams); + if (openDataset.extent->size() != 2) + { + // @todo use better error type + throw std::runtime_error("[Series] rankTable must be 2D."); + } + if (*openDataset.dtype != Datatype::CHAR && + *openDataset.dtype != Datatype::UCHAR && + *openDataset.dtype != Datatype::SCHAR) + { + // @todo use better error type + throw std::runtime_error("[Series] rankTable must have char type."); + } + + auto writerRanks = (*openDataset.extent)[0]; + auto lineWidth = (*openDataset.extent)[1]; + + if (lineWidth < 1) + { + // Check this because our indexing logic later relies on this + // @todo use better error type + throw std::runtime_error("[Series] rankTable lines must not be empty."); + } + + std::shared_ptr get{ + new char[writerRanks * lineWidth], + [](char const *ptr) { delete[] ptr; }}; + + auto doReadDataset = [&openDataset, this, &get, &rankTable]() { + Parameter readDataset; + // read the whole thing + readDataset.offset.resize(2); + readDataset.extent = *openDataset.extent; + // @todo better cross-platform support by switching over + // *openDataset.dtype + readDataset.dtype = Datatype::CHAR; + readDataset.data = get; + + IOHandler()->enqueue(IOTask(&rankTable.m_attributable, readDataset)); + IOHandler()->flush(internal::defaultFlushParams); + }; + +#if openPMD_HAVE_MPI + if (collective && series.m_communicator.has_value()) + { + auto comm = series.m_communicator.value(); + int rank{0}, size{1}; + MPI_Comm_rank(comm, &rank); + MPI_Comm_size(comm, &size); + if (rank == 0) + { + doReadDataset(); + } + MPI_Bcast(get.get(), writerRanks * lineWidth, MPI_CHAR, 0, comm); + } + else + { + doReadDataset(); + } +#else + doReadDataset(); +#endif + +#if 0 + if (iterationEncoding() == IterationEncoding::fileBased) + { + // @todo only do this if the file was previously not open + auto &it = iterations.begin()->second; + Parameter closeFile; + IOHandler()->enqueue(IOTask(this, closeFile)); + it.get().m_closed = internal::CloseStatus::ClosedTemporarily; + IOHandler()->flush(internal::defaultFlushParams); + } +#endif + + chunk_assignment::RankMeta res; + for (size_t i = 0; i < writerRanks; ++i) + { + if (get.get()[(i + 1) * lineWidth - 1] != 0) + { + throw std::runtime_error( + "[Series] rankTable lines must be null-terminated strings."); + } + // Use C-String constructor for std::string in the following line + // std::string::string(char const*); + res[i] = get.get() + i * lineWidth; + } + rankTable.m_bufferedRead = res; + return res; +} + +Series &Series::setRankTable(const std::string &myRankInfo) +{ + get().m_rankTable.m_rankTableSource = + internal::SeriesData::SourceSpecifiedManually{myRankInfo}; + return *this; +} + +void Series::flushRankTable() +{ + auto &series = get(); 
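+    // Determine this rank's table entry: the source is either unset,
+    // specified via the JSON/TOML key "rank_table", or set manually via
+    // setRankTable().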
+ auto &rankTable = series.m_rankTable; + auto maybeMyRankInfo = std::visit( + auxiliary::overloaded{ + [](internal::SeriesData::NoSourceSpecified &) + -> std::optional { return std::nullopt; }, + [&series](internal::SeriesData::SourceSpecifiedViaJSON &viaJson) + -> std::optional { + host_info::Method method; + try + { +#if openPMD_HAVE_MPI + bool consider_mpi = series.m_communicator.has_value(); +#else + (void)series; + bool consider_mpi = false; +#endif + method = host_info::methodFromStringDescription( + viaJson.value, consider_mpi); + } + catch (std::out_of_range const &) + { + throw error::WrongAPIUsage( + "[Series] Wrong value for JSON option 'rank_table': '" + + viaJson.value + "'."); + } + return host_info::byMethod(method); + }, + [](internal::SeriesData::SourceSpecifiedManually &manually) + -> std::optional { return manually.value; }}, + rankTable.m_rankTableSource); + if (!maybeMyRankInfo.has_value()) + { + return; + } + + auto myRankInfo = std::move(*maybeMyRankInfo); + + unsigned long long mySize = myRankInfo.size() + 1; // null character + int rank{0}, size{1}; + unsigned long long maxSize = mySize; + + auto createRankTable = [&size, &maxSize, &rankTable, this]() { + if (rankTable.m_attributable.written()) + { + return; + } + Parameter param; + param.name = "rankTable"; + param.dtype = Datatype::CHAR; + param.extent = {uint64_t(size), uint64_t(maxSize)}; + IOHandler()->enqueue( + IOTask(&rankTable.m_attributable, std::move(param))); + }; + + auto writeDataset = [&rank, &maxSize, this, &rankTable]( + std::shared_ptr put, size_t num_lines = 1) { + Parameter chunk; + chunk.dtype = Datatype::CHAR; + chunk.offset = {uint64_t(rank), 0}; + chunk.extent = {num_lines, maxSize}; + chunk.data = std::move(put); + IOHandler()->enqueue( + IOTask(&rankTable.m_attributable, std::move(chunk))); + }; + +#if openPMD_HAVE_MPI + if (series.m_communicator.has_value()) + { + auto comm = *series.m_communicator; + MPI_Comm_rank(comm, &rank); + MPI_Comm_size(comm, &size); + // todo char portability + auto [charBuffer, lineLength, numLines] = + auxiliary::collectStringsAsMatrixTo(comm, 0, myRankInfo); + (void)numLines; // it's the MPI size + maxSize = lineLength; + + if (backend() == "MPI_HDF5") + { + MPI_Bcast(&maxSize, 1, MPI_UNSIGNED_LONG_LONG, 0, comm); + } + if (rank == 0 || backend() == "MPI_HDF5") + { + createRankTable(); + } + + if (rank == 0) + { + auto asRawPtr = new std::vector(std::move(charBuffer)); + std::shared_ptr put{ + asRawPtr->data(), + /* + * A nicer solution would be to std::move() the vector into the + * closure and let RAII deal with it. But clang6 doesn't + * correctly implement C++17 closure move initialization, so + * we go the extra mile and use raw pointers. + * > [m_charBuffer = std::move(charBuffer)](char *){ + * > // no-op + * > } + */ + [asRawPtr](char *) { delete asRawPtr; }}; + writeDataset(std::move(put), /* num_lines = */ size); + } + return; + } +#endif + // sic! 
no else + // if the Series was initialized without a communicator, then this code will + // run as well + createRankTable(); + + std::shared_ptr put{ + new char[maxSize]{}, [](char const *ptr) { delete[] ptr; }}; + std::copy_n(myRankInfo.c_str(), mySize, put.get()); + + writeDataset(std::move(put)); +} + std::string Series::particlesPath() const { return getAttribute("particlesPath").get(); @@ -601,6 +871,7 @@ void Series::init( std::make_unique(parsed_directory, at)); auto &series = get(); series.iterations.linkHierarchy(writable()); + series.m_rankTable.m_attributable.linkHierarchy(writable()); series.m_deferred_initialization = [called_this_already = false, filepath, options, at, comm...]( Series &s) mutable { @@ -815,6 +1086,7 @@ void Series::initSeries( series.iterations.linkHierarchy(writable); series.iterations.writable().ownKeyWithinParent = "iterations"; + series.m_rankTable.m_attributable.linkHierarchy(writable); series.m_name = input->name; @@ -1190,6 +1462,8 @@ void Series::flushGorVBased( Parameter fCreate; fCreate.name = series.m_name; IOHandler()->enqueue(IOTask(this, fCreate)); + + flushRankTable(); } series.iterations.flush( @@ -2413,7 +2687,7 @@ namespace * The string is converted to lower case. */ template - void getJsonOptionLowerCase( + bool getJsonOptionLowerCase( json::TracingJSON &config, std::string const &key, Dest &dest) { if (config.json().contains(key)) @@ -2429,6 +2703,11 @@ namespace throw error::BackendConfigSchema( {key}, "Must be convertible to string type."); } + return true; + } + else + { + return false; } } } // namespace @@ -2439,6 +2718,11 @@ void Series::parseJsonOptions(TracingJSON &options, ParsedInput &input) auto &series = get(); getJsonOption( options, "defer_iteration_parsing", series.m_parseLazily); + internal::SeriesData::SourceSpecifiedViaJSON rankTableSource; + if (getJsonOptionLowerCase(options, "rank_table", rankTableSource.value)) + { + series.m_rankTable.m_rankTableSource = std::move(rankTableSource); + } // backend key { std::map const backendDescriptors{ @@ -2590,7 +2874,9 @@ Series::Series( std::string const &options) : Attributable(NoInit()) { - setData(std::make_shared()); + auto data = std::make_shared(); + data->m_communicator = comm; + setData(std::move(data)); init(filepath, at, options, comm); } #endif diff --git a/src/auxiliary/Mpi.cpp b/src/auxiliary/Mpi.cpp new file mode 100644 index 0000000000..1873237cb6 --- /dev/null +++ b/src/auxiliary/Mpi.cpp @@ -0,0 +1,113 @@ +#include "openPMD/auxiliary/Mpi.hpp" + +#include +#include + +#if openPMD_HAVE_MPI + +namespace openPMD::auxiliary +{ +StringMatrix collectStringsAsMatrixTo( + MPI_Comm communicator, int destRank, std::string const &thisRankString) +{ + int rank, size; + MPI_Comm_rank(communicator, &rank); + MPI_Comm_size(communicator, &size); + int sendLength = thisRankString.size() + 1; + std::vector recvcounts; + + if (rank == destRank) + { + recvcounts.resize(size); + } + + MPI_Gather( + &sendLength, + 1, + MPI_INT, + recvcounts.data(), + 1, + MPI_INT, + destRank, + MPI_COMM_WORLD); + int maxLength = std::accumulate( + recvcounts.begin(), recvcounts.end(), 0, [](int a, int b) { + return std::max(a, b); + }); + + StringMatrix res; + std::vector displs; + if (rank == destRank) + { + res.line_length = maxLength; + res.num_lines = size; + res.char_buffer.resize(maxLength * res.num_lines); + displs.reserve(size); + for (int i = 0; i < size; ++i) + { + displs.emplace_back(i * maxLength); + } + } + + MPI_Gatherv( + thisRankString.c_str(), + sendLength, + MPI_CHAR, + 
res.char_buffer.data(), + recvcounts.data(), + displs.data(), + MPI_CHAR, + destRank, + MPI_COMM_WORLD); + + return res; +} + +std::vector distributeStringsToAllRanks( + MPI_Comm communicator, std::string const &thisRankString) +{ + int rank, size; + MPI_Comm_rank(communicator, &rank); + MPI_Comm_size(communicator, &size); + int sendLength = thisRankString.size() + 1; + + int *sizesBuffer = new int[size]; + int *displs = new int[size]; + + MPI_Allgather( + &sendLength, 1, MPI_INT, sizesBuffer, 1, MPI_INT, MPI_COMM_WORLD); + + char *namesBuffer; + { + size_t sum = 0; + for (int i = 0; i < size; ++i) + { + displs[i] = sum; + sum += sizesBuffer[i]; + } + namesBuffer = new char[sum]; + } + + MPI_Allgatherv( + thisRankString.c_str(), + sendLength, + MPI_CHAR, + namesBuffer, + sizesBuffer, + displs, + MPI_CHAR, + MPI_COMM_WORLD); + + std::vector hostnames(size); + for (int i = 0; i < size; ++i) + { + hostnames[i] = std::string(namesBuffer + displs[i]); + } + + delete[] sizesBuffer; + delete[] displs; + delete[] namesBuffer; + return hostnames; +} +} // namespace openPMD::auxiliary +#endif diff --git a/src/binding/python/ChunkInfo.cpp b/src/binding/python/ChunkInfo.cpp index 86bcb0128a..a392cdd3e2 100644 --- a/src/binding/python/ChunkInfo.cpp +++ b/src/binding/python/ChunkInfo.cpp @@ -19,6 +19,7 @@ * If not, see . */ #include "openPMD/ChunkInfo.hpp" +#include "openPMD/binding/python/Mpi.hpp" #include "openPMD/binding/python/Common.hpp" @@ -73,4 +74,30 @@ void init_Chunk(py::module &m) return WrittenChunkInfo(offset, extent, sourceID); })); + + py::enum_(m, "HostInfo") + .value("POSIX_HOSTNAME", host_info::Method::POSIX_HOSTNAME) + .value("MPI_PROCESSOR_NAME", host_info::Method::MPI_PROCESSOR_NAME) +#if openPMD_HAVE_MPI + .def( + "get_collective", + [](host_info::Method const &self, py::object &comm) { + auto variant = pythonObjectAsMpiComm(comm); + if (auto errorMsg = std::get_if(&variant)) + { + throw std::runtime_error("[Series] " + *errorMsg); + } + else + { + return host_info::byMethodCollective( + std::get(variant), self); + } + }) +#endif + .def( + "get", + [](host_info::Method const &self) { + return host_info::byMethod(self); + }) + .def("available", &host_info::methodAvailable); } diff --git a/src/binding/python/Series.cpp b/src/binding/python/Series.cpp index 346c36f625..9a87da3bdb 100644 --- a/src/binding/python/Series.cpp +++ b/src/binding/python/Series.cpp @@ -29,28 +29,13 @@ #if openPMD_HAVE_MPI // re-implemented signatures: // include +#include "openPMD/binding/python/Mpi.hpp" #include #endif #include #include -#if openPMD_HAVE_MPI -/** mpi4py communicator wrapper - * - * refs: - * - https://github.com/mpi4py/mpi4py/blob/3.0.0/src/mpi4py/libmpi.pxd#L35-L36 - * - https://github.com/mpi4py/mpi4py/blob/3.0.0/src/mpi4py/MPI.pxd#L100-L105 - * - installed: include/mpi4py/mpi4py.MPI.h - */ -struct openPMD_PyMPICommObject -{ - PyObject_HEAD MPI_Comm ob_mpi; - unsigned int flags; -}; -using openPMD_PyMPIIntracommObject = openPMD_PyMPICommObject; -#endif - struct SeriesIteratorPythonAdaptor : SeriesIterator { SeriesIteratorPythonAdaptor(SeriesIterator it) @@ -229,65 +214,17 @@ It will be replaced with an automatically determined file name extension: Access at, py::object &comm, std::string const &options) { - //! TODO perform mpi4py import test and check min-version - //! careful: double MPI_Init risk? only import mpi4py.MPI? - //! required C-API init? probably just checks: - //! refs: - //! - - //! https://bitbucket.org/mpi4py/mpi4py/src/3.0.0/demo/wrap-c/helloworld.c - //! 
- installed: include/mpi4py/mpi4py.MPI_api.h - // if( import_mpi4py() < 0 ) { here be dragons } - - if (comm.ptr() == Py_None) - throw std::runtime_error( - "Series: MPI communicator cannot be None."); - if (comm.ptr() == nullptr) - throw std::runtime_error( - "Series: MPI communicator is a nullptr."); - - // check type string to see if this is mpi4py - // __str__ (pretty) - // __repr__ (unambiguous) - // mpi4py: - // pyMPI: ... (TODO) - py::str const comm_pystr = py::repr(comm); - std::string const comm_str = comm_pystr.cast(); - if (comm_str.substr(0, 12) != std::string(" >( - comm.get_type())) - // TODO add mpi4py version from above import check to error - // message - throw std::runtime_error( - "Series: comm has unexpected type layout in " + - comm_str + - " (Mismatched MPI at compile vs. runtime? " - "Breaking mpi4py release?)"); - - // todo other possible implementations: - // - pyMPI (inactive since 2008?): import mpi; mpi.WORLD - - // reimplementation of mpi4py's: - // MPI_Comm* mpiCommPtr = PyMPIComm_Get(comm.ptr()); - MPI_Comm *mpiCommPtr = - &((openPMD_PyMPIIntracommObject *)(comm.ptr()))->ob_mpi; - - if (PyErr_Occurred()) - throw std::runtime_error( - "Series: MPI communicator access error."); - if (mpiCommPtr == nullptr) + auto variant = pythonObjectAsMpiComm(comm); + if (auto errorMsg = std::get_if(&variant)) { - throw std::runtime_error( - "Series: MPI communicator cast failed. " - "(Mismatched MPI at compile vs. runtime?)"); + throw std::runtime_error("[Series] " + *errorMsg); + } + else + { + py::gil_scoped_release release; + return new Series( + filepath, at, std::get(variant), options); } - - py::gil_scoped_release release; - return new Series(filepath, at, *mpiCommPtr, options); }), py::arg("filepath"), py::arg("access"), @@ -338,6 +275,8 @@ this method. .def_property("base_path", &Series::basePath, &Series::setBasePath) .def_property( "meshes_path", &Series::meshesPath, &Series::setMeshesPath) + .def("get_rank_table", &Series::rankTable, py::arg("collective")) + .def("set_rank_table", &Series::setRankTable, py::arg("my_rank_info")) .def_property( "particles_path", &Series::particlesPath, &Series::setParticlesPath) .def_property("author", &Series::author, &Series::setAuthor) diff --git a/test/ParallelIOTest.cpp b/test/ParallelIOTest.cpp index 72c376cb63..0c8c2989d8 100644 --- a/test/ParallelIOTest.cpp +++ b/test/ParallelIOTest.cpp @@ -2,6 +2,7 @@ * To guarantee a correct call to Init, launch the tests manually. 
*/ #include "openPMD/IO/ADIOS/macros.hpp" +#include "openPMD/IO/Access.hpp" #include "openPMD/auxiliary/Environment.hpp" #include "openPMD/auxiliary/Filesystem.hpp" #include "openPMD/openPMD.hpp" @@ -417,7 +418,8 @@ void available_chunks_test(std::string const &file_ending) << "\"" << std::to_string(mpi_size) << "\"" << R"END( } } - } + }, + "rank_table": "hostname" } )END"; @@ -525,8 +527,11 @@ TEST_CASE("extend_dataset", "[parallel]") #if openPMD_HAVE_ADIOS2 && openPMD_HAVE_MPI TEST_CASE("adios_write_test", "[parallel][adios]") { - Series o = - Series("../samples/parallel_write.bp", Access::CREATE, MPI_COMM_WORLD); + Series o = Series( + "../samples/parallel_write.bp", + Access::CREATE, + MPI_COMM_WORLD, + R"(rank_table= "hostname")"); int size{-1}; int rank{-1}; @@ -564,6 +569,48 @@ TEST_CASE("adios_write_test", "[parallel][adios]") e["positionOffset"]["x"].storeChunk(positionOffset_local, {mpi_rank}, {1}); o.flush(); + o.close(); + + chunk_assignment::RankMeta compare; + { + auto hostname = + host_info::byMethod(host_info::Method::MPI_PROCESSOR_NAME); + for (int i = 0; i < size; ++i) + { + compare[i] = hostname; + } + } + + { + Series i( + "../samples/parallel_write.bp", + Access::READ_LINEAR, + MPI_COMM_WORLD); + i.parseBase(); + REQUIRE(i.rankTable(/* collective = */ true) == compare); + } + { + Series i( + "../samples/parallel_write.bp", + Access::READ_LINEAR, + MPI_COMM_WORLD); + i.parseBase(); + REQUIRE(i.rankTable(/* collective = */ false) == compare); + } + { + Series i( + "../samples/parallel_write.bp", + Access::READ_RANDOM_ACCESS, + MPI_COMM_WORLD); + REQUIRE(i.rankTable(/* collective = */ true) == compare); + } + { + Series i( + "../samples/parallel_write.bp", + Access::READ_RANDOM_ACCESS, + MPI_COMM_WORLD); + REQUIRE(i.rankTable(/* collective = */ false) == compare); + } } TEST_CASE("adios_write_test_zero_extent", "[parallel][adios]") @@ -716,7 +763,8 @@ void close_iteration_test(std::string const &file_ending) std::vector data{2, 4, 6, 8}; // { // we do *not* need these parentheses - Series write(name, Access::CREATE, MPI_COMM_WORLD); + Series write( + name, Access::CREATE, MPI_COMM_WORLD, R"(rank_table= "hostname")"); { Iteration it0 = write.iterations[0]; auto E_x = it0.meshes["E"]["x"]; @@ -765,6 +813,42 @@ void close_iteration_test(std::string const &file_ending) auto read_again = E_x_read.loadChunk({0, 0}, {mpi_size, 4}); REQUIRE_THROWS(read.flush()); } + + chunk_assignment::RankMeta compare; + { + auto hostname = + host_info::byMethod(host_info::Method::MPI_PROCESSOR_NAME); + for (unsigned i = 0; i < mpi_size; ++i) + { + compare[i] = hostname; + } + } + + for (auto const &filename : + {"../samples/close_iterations_parallel_%T.", + "../samples/close_iterations_parallel_0.", + "../samples/close_iterations_parallel_1."}) + { + for (auto const &[at, read_collectively] : + {std::make_pair(Access::READ_LINEAR, true), + std::make_pair(Access::READ_LINEAR, false), + std::make_pair(Access::READ_RANDOM_ACCESS, true), + std::make_pair(Access::READ_RANDOM_ACCESS, false)}) + { + std::cout << filename << file_ending << "\t" + << (at == Access::READ_LINEAR ? 
"linear" : "random") + << "\t" << read_collectively << std::endl; + Series i(filename + file_ending, at, MPI_COMM_WORLD); + if (at == Access::READ_LINEAR) + { + i.parseBase(); + } + // Need this in file-based iteration encoding + i.iterations.begin()->second.open(); + REQUIRE( + i.rankTable(/* collective = */ read_collectively) == compare); + } + } } TEST_CASE("close_iteration_test", "[parallel]") diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp index 770138bb04..4c78d16e16 100644 --- a/test/SerialIOTest.cpp +++ b/test/SerialIOTest.cpp @@ -1,4 +1,5 @@ // expose private and protected members for invasive testing +#include "openPMD/ChunkInfo_internal.hpp" #include "openPMD/Datatype.hpp" #include "openPMD/IO/Access.hpp" #if openPMD_USE_INVASIVE_TESTS @@ -40,6 +41,12 @@ #include #endif +#ifdef _WIN32 +#include +// windows.h defines this macro and it breaks any function with the same name +#undef max +#endif + using namespace openPMD; struct BackendSelection @@ -1555,7 +1562,17 @@ struct ReadFromAnyType inline void write_test(const std::string &backend) { - Series o = Series("../samples/serial_write." + backend, Access::CREATE); +#ifdef _WIN32 + std::string jsonCfg = "{}"; +#else + std::string jsonCfg = R"({"rank_table": "posix_hostname"})"; + chunk_assignment::RankMeta compare{ + {0, + host_info::byMethod( + host_info::methodFromStringDescription("posix_hostname", false))}}; +#endif + Series o = + Series("../samples/serial_write." + backend, Access::CREATE, jsonCfg); ParticleSpecies &e_1 = o.iterations[1].particles["e"]; @@ -1666,6 +1683,10 @@ inline void write_test(const std::string &backend) << '\'' << std::endl; }, variantTypeDataset); + +#ifndef _WIN32 + REQUIRE(read.rankTable(/* collective = */ false) == compare); +#endif } TEST_CASE("write_test", "[serial]") @@ -1816,13 +1837,19 @@ fileBased_add_EDpic(ParticleSpecies &e, uint64_t const num_particles) inline void fileBased_write_test(const std::string &backend) { +#ifdef _WIN32 + std::string jsonCfg = "{}"; +#else + std::string jsonCfg = R"({"rank_table": "posix_hostname"})"; +#endif if (auxiliary::directory_exists("../samples/subdir")) auxiliary::remove_directory("../samples/subdir"); { Series o = Series( "../samples/subdir/serial_fileBased_write%03T." + backend, - Access::CREATE); + Access::CREATE, + jsonCfg); ParticleSpecies &e_1 = o.iterations[1].particles["e"]; @@ -1941,7 +1968,8 @@ inline void fileBased_write_test(const std::string &backend) { Series o = Series( "../samples/subdir/serial_fileBased_write%T." + backend, - Access::READ_ONLY); + Access::READ_ONLY, + jsonCfg); REQUIRE(o.iterations.size() == 5); REQUIRE(o.iterations.count(1) == 1); @@ -2018,7 +2046,8 @@ inline void fileBased_write_test(const std::string &backend) // padding Series o = Series( "../samples/subdir/serial_fileBased_write%T." + backend, - Access::READ_WRITE); + Access::READ_WRITE, + jsonCfg); REQUIRE(o.iterations.size() == 5); o.iterations[6]; @@ -2059,7 +2088,8 @@ inline void fileBased_write_test(const std::string &backend) { Series o = Series( "../samples/subdir/serial_fileBased_write%01T." + backend, - Access::READ_WRITE); + Access::READ_WRITE, + jsonCfg); REQUIRE(o.iterations.size() == 1); /* @@ -2152,6 +2182,44 @@ inline void fileBased_write_test(const std::string &backend) Access::READ_ONLY}; helper::listSeries(list); } + +#ifdef __unix__ + /* + * Check that the ranktable was written correctly to every iteration file. 
+ */ + { + int dirfd = open("../samples/subdir/", O_RDONLY); + if (dirfd < 0) + { + throw std::system_error( + std::error_code(errno, std::system_category())); + } + DIR *directory = fdopendir(dirfd); + if (!directory) + { + close(dirfd); + throw std::system_error( + std::error_code(errno, std::system_category())); + } + chunk_assignment::RankMeta compare{{0, host_info::posix_hostname()}}; + dirent *entry; + while ((entry = readdir(directory)) != nullptr) + { + if (strcmp(entry->d_name, ".") == 0 || + strcmp(entry->d_name, "..") == 0 || + !auxiliary::ends_with(entry->d_name, "." + backend)) + { + continue; + } + std::string fullPath = + std::string("../samples/subdir/") + entry->d_name; + Series single_file(fullPath, Access::READ_ONLY); + REQUIRE(single_file.rankTable(/* collective = */ false) == compare); + } + closedir(directory); + close(dirfd); + } +#endif // defined(__unix__) } TEST_CASE("fileBased_write_test", "[serial]")