Add rank table for locality-aware streaming (#1505)
* Python bindings: Extract MPI adaptor to Mpi.hpp
* Auxiliary: MPI helpers
* Remove unnecessary MPI helper
* Some fixes in HDF5 createDataset
* Series.(get)mpiRanksMetaInfo, hosttable
* Python bindings: mpi_ranks_meta_info
* Python bindings for HOST_INFO
* Test chunk table
* CI fixes: Windows
* SerialIOTests: Windows compatibility
Initialize Winsock API
* CI fixes: NVIDIA
* Add review suggestions
* Make hostname() implementation explicit to the user
* Remove Winsocks functionality
* rank_table: "hostname": pick mpi impl if parallel series
* Extend verbose mode a bit
* Initialize rank_table in parallel for HDF5
* [wip] fix open/close file issue
* Extended testing
* Reduce support for file-based encoding
* Rename mpiRanksMetaInfo() -> rankTable()
* Fix unused parameter warning
* Documentation
* Add line in docs
* CI fix
* Test writing of MPI rank table
* Fixes after rebase
franzpoeschel authored May 31, 2024
1 parent 8c57285 commit 0baf09f
Showing 19 changed files with 1,171 additions and 91 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -460,6 +460,7 @@ set(CORE_SOURCE
src/auxiliary/Date.cpp
src/auxiliary/Filesystem.cpp
src/auxiliary/JSON.cpp
src/auxiliary/Mpi.cpp
src/backend/Attributable.cpp
src/backend/BaseRecordComponent.cpp
src/backend/MeshRecordComponent.cpp
4 changes: 4 additions & 0 deletions docs/source/details/backendconfig.rst
@@ -77,6 +77,8 @@ For a consistent user interface, backends shall follow the following rules:
Backend-independent JSON configuration
--------------------------------------

.. _backend_independent_config:

The openPMD backend can be chosen via the JSON/TOML key ``backend`` which recognizes the alternatives ``["hdf5", "adios2", "json"]``.

The iteration encoding can be chosen via the JSON/TOML key ``iteration_encoding`` which recognizes the alternatives ``["file_based", "group_based", "variable_based"]``.
@@ -97,6 +99,8 @@ If it is set to ``{"resizable": true}``, this declares that it shall be allowed to
For HDF5, resizable Datasets come with a performance penalty.
For JSON and ADIOS2, all datasets are resizable, independent of this option.

The key ``rank_table`` allows specifying the creation of a **rank table**, used for tracking :ref:`chunk provenance, especially in streaming setups <rank_table>`; refer to the streaming documentation for details.
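
A minimal sketch of enabling this key through the options string of an MPI-parallel ``Series`` (an illustration only; the file name ``data.h5`` is a placeholder and an MPI-enabled build is assumed):

.. code-block:: cpp

   #include <openPMD/openPMD.hpp>
   #include <mpi.h>

   int main(int argc, char **argv)
   {
       MPI_Init(&argc, &argv);
       {
           // The options string is JSON; "hostname" records each writing
           // rank's host name in the rank table.
           openPMD::Series series(
               "data.h5",
               openPMD::Access::CREATE,
               MPI_COMM_WORLD,
               R"({"rank_table": "hostname"})");
           // ... write data ...
       }
       MPI_Finalize();
   }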

Configuration Structure per Backend
-----------------------------------

8 changes: 5 additions & 3 deletions docs/source/details/mpi.rst
@@ -13,11 +13,13 @@ A **collective** operation needs to be executed by *all* MPI ranks of the MPI co
Contrarily, **independent** operations can also be called by a subset of these MPI ranks.
For more information, please see the `MPI standard documents <https://www.mpi-forum.org/docs/>`_, for example MPI-3.1 in `"Section 2.4 - Semantic Terms" <https://www.mpi-forum.org/docs/mpi-3.1/mpi31-report.pdf>`_.

============================ ================== ================================
Functionality                Behavior           Description
============================ ================== ================================
``Series``                   **collective**     open and close
``::flush()``                **collective**     read and write
``::setRankTable()``         **collective**     write, performed at flush
``::rankTable()``            **coll**/indep.    behavior specified by bool param
``Iteration`` [1]_           independent        declare and open
``::open()`` [4]_            **collective**     explicit open
``Mesh`` [1]_                independent        declare, open, write
@@ -30,7 +32,7 @@ Functionality Behavior Description
``::storeChunk`` [1]_        independent        write
``::loadChunk``              independent        read
``::availableChunks`` [4]_   collective         read, immediate result
============================ ================== ================================

.. [1] Individual backends, i.e. :ref:`parallel HDF5 <backends-hdf5>`, will only support independent operations if the default, non-collective (aka independent) behavior is kept.
Otherwise these operations are collective.
27 changes: 27 additions & 0 deletions docs/source/usage/streaming.rst
@@ -95,3 +95,30 @@ This pays tribute to the fact that in streaming mode, an iteration is sent to th

.. literalinclude:: 10_streaming_write.py
:language: python3


Chunk provenance tracking using a rank table
--------------------------------------------

.. _rank_table:

In a large parallel streaming setup, data locality is an important consideration when deciding which data to load from the producer.
The openPMD-api provides several mechanisms to help with this:

The API call ``BaseRecordComponent::availableChunks()``/``Base_Record_Component.available_chunks()`` returns the data chunks within a specific dataset that are available for loading.
Each chunk annotates the MPI rank that wrote it within the *data producer* in ``WrittenChunkInfo::sourceID``/``WrittenChunkInfo::source_ID``.

In order to correlate this information with the MPI ranks of the *data consumer*, a **rank table** can be used to transmit an additional tag for each of the producer's MPI ranks. On the data producer side, the rank table can be set manually or automatically:


* **automatically:** Using the :ref:`JSON/TOML option <backend_independent_config>` ``rank_table``.
  The suggested specification is ``{"rank_table": "hostname"}``, although the explicit values ``"mpi_processor_name"`` and ``"posix_hostname"`` are also accepted.
  ``"hostname"`` resolves to the MPI processor name if the Series has been initialized with MPI, and to the POSIX hostname otherwise (if that is available).
* **manually:** Using the API call ``Series::setRankTable(std::string const &myRankInfo)``, which specifies the current rank's tag.
  This can be used to set custom tags, identifying e.g. NUMA nodes or groups of compute nodes.

The rank table takes the form of a 2-dimensional dataset that lists the tags as null-terminated strings line by line, in order of the MPI ranks. It can be loaded using ``Series::rankTable()``/``Series.get_rank_table()``.

Setting the rank table is **collective**, though the collective action is only performed upon flushing.
Reading the rank table requires specifying whether the read operation should be performed collectively (recommended for performance) or independently.

In order to retrieve the corresponding information on the **consumer side**, the function ``host_info::byMethod()``/``HostInfo.get()`` can be used to retrieve the local rank's information, or alternatively ``host_info::byMethodCollective()``/``HostInfo.get_info()`` to collect this information across all consumer ranks.
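
The following consumer-side sketch ties these calls together. It is an illustration only: it assumes an MPI-parallel build, an ADIOS2/SST stream named ``stream.sst``, and placeholder record names (``"E"``/``"x"``); the exact point at which the rank table becomes readable may depend on the backend.

.. code-block:: cpp

   #include <openPMD/openPMD.hpp>
   #include <mpi.h>

   #include <string>

   int main(int argc, char **argv)
   {
       MPI_Init(&argc, &argv);
       {
           openPMD::Series series(
               "stream.sst", openPMD::Access::READ_LINEAR, MPI_COMM_WORLD);

           // Producer rank -> tag (e.g. host name); collective read is
           // recommended for performance.
           auto producerTable = series.rankTable(/* collective = */ true);

           // This consumer rank's own tag, using the MPI processor name.
           std::string myHost = openPMD::host_info::byMethod(
               openPMD::host_info::Method::MPI_PROCESSOR_NAME);

           for (auto iteration : series.readIterations())
           {
               auto component = iteration.meshes["E"]["x"];
               for (auto const &chunk : component.availableChunks())
               {
                   // Prefer chunks whose producer rank ran on this host.
                   bool local = producerTable.at(chunk.sourceID) == myHost;
                   // ... schedule loadChunk() calls, preferring local chunks ...
                   (void)local;
               }
               iteration.close();
           }
       }
       MPI_Finalize();
   }
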
60 changes: 60 additions & 0 deletions include/openPMD/ChunkInfo.hpp
@@ -20,8 +20,16 @@
*/
#pragma once

#include "openPMD/config.hpp"

#include "openPMD/Dataset.hpp" // Offset, Extent

#if openPMD_HAVE_MPI
#include <mpi.h>
#endif

#include <map>
#include <string>
#include <vector>

namespace openPMD
@@ -73,4 +81,56 @@ struct WrittenChunkInfo : ChunkInfo
};

using ChunkTable = std::vector<WrittenChunkInfo>;

namespace chunk_assignment
{
using RankMeta = std::map<unsigned int, std::string>;
} // namespace chunk_assignment

namespace host_info
{
/**
* Methods for retrieving hostname / processor identifiers that openPMD-api
* is aware of. These can be used for locality-aware chunk distribution
* schemes in streaming setups.
*/
enum class Method
{
POSIX_HOSTNAME,
MPI_PROCESSOR_NAME
};

/**
* @brief Is the method available on the current system?
*
* @return true If it is available.
* @return false Otherwise.
*/
bool methodAvailable(Method);

/**
* @brief Wrapper for the native hostname retrieval functions such as
* POSIX gethostname().
*
* @return std::string The hostname / processor name returned by the native
* function.
*/
std::string byMethod(Method);

#if openPMD_HAVE_MPI
/**
* @brief Retrieve the hostname information on all MPI ranks and distribute
* a map of "rank -> hostname" to all ranks.
*
* This call is MPI collective.
*
* @return chunk_assignment::RankMeta Hostname / processor name information
* for all MPI ranks known to the communicator.
* The result is returned on all ranks.
*/
chunk_assignment::RankMeta byMethodCollective(MPI_Comm, Method);
#endif
} // namespace host_info
} // namespace openPMD

#undef openPMD_POSIX_AVAILABLE
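
A brief usage sketch for the ``host_info`` API above (an illustration only; it assumes an MPI-enabled build of openPMD-api, since ``byMethodCollective()`` is only declared in that case):

    #include <openPMD/ChunkInfo.hpp>

    #include <mpi.h>

    #include <iostream>
    #include <string>

    int main(int argc, char **argv)
    {
        MPI_Init(&argc, &argv);
        using namespace openPMD;

        // Prefer the MPI processor name, fall back to the POSIX hostname.
        auto method =
            host_info::methodAvailable(host_info::Method::MPI_PROCESSOR_NAME)
            ? host_info::Method::MPI_PROCESSOR_NAME
            : host_info::Method::POSIX_HOSTNAME;

        // This rank's identifier ...
        std::string mine = host_info::byMethod(method);
        // ... and the rank -> identifier map, returned on all ranks
        // (MPI collective).
        chunk_assignment::RankMeta all =
            host_info::byMethodCollective(MPI_COMM_WORLD, method);

        int rank = 0;
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        std::cout << "Rank " << rank << " runs on " << mine << " ("
                  << all.size() << " ranks total)\n";

        MPI_Finalize();
    }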
67 changes: 67 additions & 0 deletions include/openPMD/ChunkInfo_internal.hpp
@@ -0,0 +1,67 @@
/* Copyright 2024 Franz Poeschel
*
* This file is part of openPMD-api.
*
* openPMD-api is free software: you can redistribute it and/or modify
* it under the terms of of either the GNU General Public License or
* the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* openPMD-api is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License and the GNU Lesser General Public License
* for more details.
*
* You should have received a copy of the GNU General Public License
* and the GNU Lesser General Public License along with openPMD-api.
* If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once

#include "openPMD/ChunkInfo.hpp"
#include <string>

namespace openPMD::host_info
{

/**
* @brief This defines the method identifiers used
* in `{"rank_table": "hostname"}`
*
* Currently recognized are:
*
* * posix_hostname
* * mpi_processor_name
*
* For backwards compatibility reasons, "hostname" is also recognized as a
* deprecated alternative for "posix_hostname".
*
* @return Method enum identifier. The identifier is returned even if the
* method is not available on the system. This should be checked
* via methodAvailable().
* @throws std::out_of_range If an unknown string identifier is passed.
*/
Method methodFromStringDescription(std::string const &descr, bool consider_mpi);

/*
* The following block contains one wrapper for each native hostname
* retrieval method. The purpose is to have the same function pointer type
* for all of them.
*/

#ifdef _WIN32
#define openPMD_POSIX_AVAILABLE false
#else
#define openPMD_POSIX_AVAILABLE true
#endif

#if openPMD_POSIX_AVAILABLE
std::string posix_hostname();
#endif

#if openPMD_HAVE_MPI
std::string mpi_processor_name();
#endif
} // namespace openPMD::host_info
58 changes: 58 additions & 0 deletions include/openPMD/Series.hpp
@@ -49,6 +49,7 @@
#include <stdexcept>
#include <string>
#include <tuple>
#include <variant>

// expose private and protected members for invasive testing
#ifndef OPENPMD_private
@@ -201,6 +202,36 @@ namespace internal
m_deferred_initialization = std::nullopt;

void close();

#if openPMD_HAVE_MPI
/*
* @todo Once we have separate MPI headers, move this there.
*/
std::optional<MPI_Comm> m_communicator;
#endif

struct NoSourceSpecified
{};
struct SourceSpecifiedViaJSON
{
std::string value;
};
struct SourceSpecifiedManually
{
std::string value;
};

struct RankTableData
{
Attributable m_attributable;
std::variant<
NoSourceSpecified,
SourceSpecifiedViaJSON,
SourceSpecifiedManually>
m_rankTableSource;
std::optional<chunk_assignment::RankMeta> m_bufferedRead;
};
RankTableData m_rankTable;
}; // SeriesData

class SeriesInternal;
@@ -388,6 +419,32 @@ class Series : public Attributable
*/
Series &setMeshesPath(std::string const &meshesPath);

/**
 * @throw no_such_attribute_error If optional attribute is not present.
 * @param collective Run this read operation collectively.
 *        There might be an enormous IO overhead if running this
 *        operation non-collectively.
 *        To make this explicit to users, there is no default parameter.
 *        The parameter is ignored when compiling without MPI support
 *        (it is present for the sake of a consistent API).
 * @return A map with one string per (writing) MPI rank, indicating
 *         user-defined meta information per rank. Example: host name.
 */
#if openPMD_HAVE_MPI
chunk_assignment::RankMeta rankTable(bool collective);
#else
chunk_assignment::RankMeta rankTable(bool collective = false);
#endif

/**
 * @brief Set this rank's entry in the rank table, i.e. user-defined
 *        meta information for the current (writing) MPI rank.
 *        Example: host name.
 *        The table itself is written collectively upon flushing.
 *
 * @return Reference to modified series.
 */
Series &setRankTable(std::string const &myRankInfo);

/**
* @throw no_such_attribute_error If optional attribute is not present.
* @return String representing the path to particle species, relative(!) to
@@ -745,6 +802,7 @@ OPENPMD_private
bool flushIOHandler = true);
void flushMeshesPath();
void flushParticlesPath();
void flushRankTable();
void readFileBased();
void readOneIterationFileBased(std::string const &filePath);
/**
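
A short usage sketch for the manual ``setRankTable()`` variant declared above (an illustration only; the tag ``"node042-numa0"`` and the file name are placeholders, and an MPI-parallel build is assumed):

    #include <openPMD/openPMD.hpp>
    #include <mpi.h>

    void writeWithRankTable(MPI_Comm comm)
    {
        openPMD::Series series("data.h5", openPMD::Access::CREATE, comm);
        // Tag the current rank; the format of the tag is up to the
        // application (e.g. host name, NUMA node, node group).
        series.setRankTable("node042-numa0");
        // ... define datasets, store chunks ...
        // The collective write of the rank table happens at flush time.
        series.flush();
    }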
50 changes: 50 additions & 0 deletions include/openPMD/auxiliary/Mpi.hpp
@@ -26,6 +26,9 @@

#if openPMD_HAVE_MPI
#include <mpi.h>

#include <string>
#include <vector>
#endif

#include <type_traits>
@@ -64,5 +67,52 @@ namespace
}
} // namespace

/**
* Multiple variable-length strings represented in one single buffer
* with a fixed line width.
* Strings smaller than the maximum width are padded with zeros.
* Each line is zero-terminated with at least one zero character.
* The length of char_buffer should be equal to the product of line_length
* and num_lines.
*/
struct StringMatrix
{
std::vector<char> char_buffer;
size_t line_length = 0;
size_t num_lines = 0;
};

/*
* These are mostly internal helper functions, so this defines only those that
* we need.
* Logically, these should be complemented by `collectStringsTo()` and
* `distributeStringsAsMatrixToAllRanks()`, but we don't need them (yet).
*/

/**
* @brief Collect multiple variable-length strings to one rank in MPI_Gatherv
* fashion. Uses two collective MPI calls, the first to gather the
* different string lengths, the second to gather the actual strings.
*
* @param communicator MPI communicator
* @param destRank Target rank for MPI_Gatherv
* @param thisRankString The current MPI rank's contribution to the data.
* @return StringMatrix See documentation of StringMatrix struct.
*/
StringMatrix collectStringsAsMatrixTo(
MPI_Comm communicator, int destRank, std::string const &thisRankString);

/**
* @brief Collect multiple variable-length strings to all ranks in
* MPI_Allgatherv fashion. Uses two collective MPI calls, the first to
* gather the different string lengths, the second to gather the actual
* strings.
*
* @param communicator MPI communicator
* @param thisRankString The current MPI rank's contribution to the data.
* @return std::vector<std::string> All ranks' strings, returned on all ranks.
*/
std::vector<std::string> distributeStringsToAllRanks(
MPI_Comm communicator, std::string const &thisRankString);
#endif
} // namespace openPMD::auxiliary
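
A hedged usage sketch for these helpers (internal auxiliary API; an illustration only, assuming an MPI-enabled build):

    #include "openPMD/auxiliary/Mpi.hpp"

    #include <mpi.h>

    #include <string>
    #include <vector>

    void gatherTags(MPI_Comm comm, std::string const &myTag)
    {
        using namespace openPMD::auxiliary;

        // Every rank receives the full list of tags, indexed by rank.
        std::vector<std::string> allTags =
            distributeStringsToAllRanks(comm, myTag);

        // Gather variant: the packed, zero-padded matrix is collected on
        // the destination rank (see the StringMatrix documentation above).
        StringMatrix packed =
            collectStringsAsMatrixTo(comm, /* destRank = */ 0, myTag);

        (void)allTags;
        (void)packed;
    }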