Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Independent writes: Make workaround more flexible #1660

Merged
merged 6 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/openPMD/Iteration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ class Iteration : public Attributable
friend class internal::AttributableData;
template <typename T>
friend T &internal::makeOwning(T &self, Series);
friend class Writable;

public:
Iteration(Iteration const &) = default;
Expand Down
3 changes: 2 additions & 1 deletion include/openPMD/RecordComponent.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ RecordComponent::storeChunk(Offset o, Extent e, F &&createBuffer)
* Flush the openPMD hierarchy to the backend without flushing any actual
* data yet.
*/
seriesFlush({FlushLevel::SkeletonOnly});
seriesFlush_impl</* flush_entire_series = */ false>(
{FlushLevel::SkeletonOnly});

size_t size = 1;
for (auto ext : e)
Expand Down
12 changes: 12 additions & 0 deletions include/openPMD/Series.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "openPMD/IterationEncoding.hpp"
#include "openPMD/Streaming.hpp"
#include "openPMD/WriteIterations.hpp"
#include "openPMD/auxiliary/TypeTraits.hpp"
#include "openPMD/auxiliary/Variant.hpp"
#include "openPMD/backend/Attributable.hpp"
#include "openPMD/backend/Container.hpp"
Expand Down Expand Up @@ -692,6 +693,17 @@ class Series : public Attributable
*/
void close();

/**
* This overrides Attributable::iterationFlush() which will fail on Series.
*/
template <typename X = void, typename... Args>
auto iterationFlush(Args &&...)
{
static_assert(
auxiliary::dependent_false_v<X>,
"Cannot call this on an instance of Series.");
}

// clang-format off
OPENPMD_private
// clang-format on
Expand Down
23 changes: 22 additions & 1 deletion include/openPMD/backend/Attributable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,20 @@ class Attributable
*/
void seriesFlush(std::string backendConfig = "{}");

/** Flush the containing Iteration.
*
* Writable connects all objects of an openPMD series through a linked list
* of parents. This method will walk up the parent list to find
* the containing Iteration.
* The Iteration will be flushed regardless if it is dirty.
*
* @param backendConfig Further backend-specific instructions on how to
* implement this flush call.
* Must be provided in-line, configuration is not read
* from files.
*/
void iterationFlush(std::string backendConfig = "{}");

/** String serialization to describe an Attributable
*
* This object contains the Series data path as well as the openPMD object
Expand Down Expand Up @@ -308,6 +322,12 @@ class Attributable
*/
MyPath myPath() const;

/**
* @brief Sets the object dirty to make internal procedures think it has
* been modified.
*/
void touch();

// clang-format off
OPENPMD_protected
// clang-format on
Expand All @@ -330,7 +350,8 @@ OPENPMD_protected
internal::SeriesData *>;
/** @} */

void seriesFlush(internal::FlushParams const &);
template <bool flush_entire_series>
void seriesFlush_impl(internal::FlushParams const &);

void flushAttributes(internal::FlushParams const &);

Expand Down
2 changes: 2 additions & 0 deletions include/openPMD/backend/Writable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,14 @@ class Writable final
* an object that has no parent, which is the Series object, and flush()-es
* it.
*/
template <bool flush_entire_series>
void seriesFlush(std::string backendConfig = "{}");

// clang-format off
OPENPMD_private
// clang-format on

template <bool flush_entire_series>
void seriesFlush(internal::FlushParams const &);
/*
* These members need to be shared pointers since distinct instances of
Expand Down
23 changes: 20 additions & 3 deletions src/backend/Attributable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,14 @@ Attributable &Attributable::setComment(std::string const &c)

void Attributable::seriesFlush(std::string backendConfig)
{
writable().seriesFlush(std::move(backendConfig));
writable().seriesFlush</* flush_entire_series = */ true>(
std::move(backendConfig));
}

void Attributable::iterationFlush(std::string backendConfig)
{
writable().seriesFlush</* flush_entire_series = */ false>(
std::move(backendConfig));
}

Series Attributable::retrieveSeries() const
Expand Down Expand Up @@ -240,10 +247,20 @@ auto Attributable::myPath() const -> MyPath
return res;
}

void Attributable::seriesFlush(internal::FlushParams const &flushParams)
void Attributable::touch()
{
setDirtyRecursive(true);
}

template <bool flush_entire_series>
void Attributable::seriesFlush_impl(internal::FlushParams const &flushParams)
{
writable().seriesFlush(flushParams);
writable().seriesFlush<flush_entire_series>(flushParams);
}
template void
Attributable::seriesFlush_impl<true>(internal::FlushParams const &flushParams);
template void
Attributable::seriesFlush_impl<false>(internal::FlushParams const &flushParams);

void Attributable::flushAttributes(internal::FlushParams const &flushParams)
{
Expand Down
54 changes: 47 additions & 7 deletions src/backend/Writable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
* If not, see <http://www.gnu.org/licenses/>.
*/
#include "openPMD/backend/Writable.hpp"
#include "openPMD/Error.hpp"
#include "openPMD/IO/AbstractIOHandler.hpp"
#include "openPMD/Series.hpp"
#include "openPMD/auxiliary/DerefDynamicCast.hpp"
#include <stdexcept>

namespace openPMD
{
Expand All @@ -42,25 +45,62 @@ Writable::~Writable()
IOTask(this, Parameter<Operation::DEREGISTER>(parent)));
}

template <bool flush_entire_series>
void Writable::seriesFlush(std::string backendConfig)
{
seriesFlush({FlushLevel::UserFlush, std::move(backendConfig)});
seriesFlush<flush_entire_series>(
internal::FlushParams{FlushLevel::UserFlush, std::move(backendConfig)});
}
template void Writable::seriesFlush<true>(std::string backendConfig);
template void Writable::seriesFlush<false>(std::string backendConfig);

template <bool flush_entire_series>
void Writable::seriesFlush(internal::FlushParams const &flushParams)
{
Attributable impl;
impl.setData({attributable, [](auto const *) {}});
auto [iteration_internal, series_internal] = impl.containingIteration();
if (iteration_internal)
{
(*iteration_internal)
->asInternalCopyOf<Iteration>()
.setDirtyRecursive(true);
(*iteration_internal)->asInternalCopyOf<Iteration>().touch();
}
auto series = series_internal->asInternalCopyOf<Series>();
series.flush_impl(
series.iterations.begin(), series.iterations.end(), flushParams);
auto [begin, end] = [&, &iteration_internal_lambda = iteration_internal]()
-> std::pair<Series::iterations_iterator, Series::iterations_iterator> {
if (!flush_entire_series)
{
if (!iteration_internal_lambda.has_value())
{
throw std::runtime_error(
"[Writable::seriesFlush()] Requested flushing the "
"containing Iteration, but no Iteration was found?");
}
auto it = series.iterations.begin();
auto end_lambda = series.iterations.end();
for (; it != end_lambda; ++it)
{
if (&it->second.Iteration::get() == *iteration_internal_lambda)
{
auto next = it;
++next;
return {it, next};
}
}
throw std::runtime_error(
"[Writable::seriesFlush()] Found a containing Iteration that "
"seems to not be part of the containing Series?? You might try "
"running this with `flushing_entire_series=false` as a "
"workaround, but something is still wrong.");
}
else
{
return {series.iterations.begin(), series.iterations.end()};
}
}();
series.flush_impl(begin, end, flushParams);
}

template void
Writable::seriesFlush<true>(internal::FlushParams const &flushParams);
template void
Writable::seriesFlush<false>(internal::FlushParams const &flushParams);
} // namespace openPMD
4 changes: 4 additions & 0 deletions src/binding/python/Attributable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,10 @@ void init_Attributable(py::module &m)
"series_flush",
py::overload_cast<std::string>(&Attributable::seriesFlush),
py::arg("backend_config") = "{}")
.def(
"iteration_flush",
py::overload_cast<std::string>(&Attributable::iterationFlush),
py::arg("backend_config") = "{}")

.def_property_readonly(
"attributes",
Expand Down
5 changes: 2 additions & 3 deletions test/ParallelIOTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,7 @@ TEST_CASE("independent_write_with_collective_flush", "[parallel]")
Access::CREATE,
MPI_COMM_WORLD,
"adios2.engine.preferred_flush_target = \"buffer\"");
write.seriesFlush();
int size, rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
Expand All @@ -1182,11 +1183,9 @@ TEST_CASE("independent_write_with_collective_flush", "[parallel]")
* conflict with the default buffer target that will run in the destructor,
* unless the flush in the next line really is collective.
*/
std::cout << "ENTER" << std::endl;
MPI_Barrier(MPI_COMM_WORLD);
iteration.seriesFlush("adios2.engine.preferred_flush_target = \"disk\"");
iteration.iterationFlush("adios2.engine.preferred_flush_target = \"disk\"");
MPI_Barrier(MPI_COMM_WORLD);
std::cout << "LEAVE" << std::endl;
}
#endif

Expand Down
Loading