Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
franzpoeschel committed Feb 14, 2024
1 parent 11e47ed commit 1e862b7
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 77 deletions.
83 changes: 67 additions & 16 deletions include/openPMD/snapshots/StatefulIterator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include "openPMD/Error.hpp"
#include "openPMD/Iteration.hpp"
#include "openPMD/Series.hpp"
#include "openPMD/Streaming.hpp"
#include "openPMD/auxiliary/Variant.hpp"
#include "openPMD/backend/ParsePreference.hpp"
#include "openPMD/snapshots/IteratorTraits.hpp"

Expand All @@ -41,21 +43,54 @@ namespace internal

namespace detail
{
namespace seek_types
// namespace seek_types
// {
// struct InitNonFileBased_t
// {};
// struct Next_t
// {};
// using seek_impl = std::variant<InitNonFileBased_t, Next_t>;
// } // namespace seek_types
// struct Seek : seek_types::seek_impl
// {
// using InitNonFileBased_t = seek_types::InitNonFileBased_t;
// using Next_t = seek_types::Next_t;

// constexpr static InitNonFileBased_t InitNonFileBased{};
// constexpr static Next_t Next{};
// };
namespace step_status_types
{
struct InitNonFileBased_t
struct Before_t
{};
struct Next_t
struct During_t
{
size_t idx;
Iteration::IterationIndex_t iteration_idx;
bool closed = false;
};
// struct Between_t
// {
// size_t prev_idx;
// };
struct After_t
{};
using seek_impl = std::variant<InitNonFileBased_t, Next_t>;
} // namespace seek_types
struct Seek : seek_types::seek_impl
} // namespace step_status_types
struct CurrentStep
: std::variant<
step_status_types::Before_t,
step_status_types::During_t,
// step_status_types::Between_t,
step_status_types::After_t>
{
using InitNonFileBased_t = seek_types::InitNonFileBased_t;
using Next_t = seek_types::Next_t;

constexpr static InitNonFileBased_t InitNonFileBased{};
constexpr static Next_t Next{};
using Before_t = step_status_types::Before_t;
constexpr static Before_t Before{};
using During_t = step_status_types::During_t;
constexpr static During_t During{};
// using Between_t = step_status_types::Between_t;
// constexpr static Between_t Between{};
using After_t = step_status_types::After_t;
constexpr static After_t After{};
};
} // namespace detail

Expand All @@ -72,6 +107,8 @@ class StatefulIterator

using maybe_series_t = std::optional<Series>;

using CurrentStep = detail::CurrentStep;

struct SharedData
{
SharedData() = default;
Expand All @@ -84,15 +121,28 @@ class StatefulIterator

Series series;
std::vector<iteration_index_t> iterationsInCurrentStep;
// nullopt <-> currently out of step
std::optional<iteration_index_t> currentIteration{};
CurrentStep currentStep = {CurrentStep::Before};
std::optional<internal::ParsePreference> parsePreference;
/*
* Necessary because in the old ADIOS2 schema, old iterations' metadata
* will leak into new steps, making the frontend think that the groups
* are still there and the iterations can be parsed again.
*/
std::set<Iteration::IterationIndex_t> ignoreIterations;

inline std::optional<Iteration::IterationIndex_t>
currentIteration() const
{
using res_t = std::optional<Iteration::IterationIndex_t>;
return std::visit(
auxiliary::overloaded{
[](CurrentStep::During_t const during) -> res_t {
return during.closed ? res_t{during.iteration_idx}
: std::nullopt;
},
[](auto const &) -> res_t { return std::nullopt; }},
currentStep);
}
};

/*
Expand All @@ -110,7 +160,6 @@ class StatefulIterator
using value_type =
typename Container<Iteration, Iteration::IterationIndex_t>::value_type;
using typename parent_t ::difference_type;
using Seek = detail::Seek;
//! construct the end() iterator
explicit StatefulIterator();

Expand Down Expand Up @@ -160,9 +209,11 @@ class StatefulIterator
* the /data/snapshot attribute, this helps figuring out which iteration
* is now active. Hence, recursion_depth.
*/
std::optional<StatefulIterator *> nextStep(Seek const &);
std::optional<StatefulIterator *> nextStep(size_t recursion_depth);

std::optional<StatefulIterator *> loopBody();

std::optional<StatefulIterator *> loopBody(Seek const &);
void initIteratorFilebased();

void deactivateDeadIteration(iteration_index_t);

Expand Down
141 changes: 80 additions & 61 deletions src/snapshots/StatefulIterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,18 @@
#include <iterator>
#include <optional>
#include <stdexcept>
#include <variant>

namespace openPMD
{
StatefulIterator::SharedData::~SharedData()
{
if (auto IOHandler = series.IOHandler(); currentIteration.has_value() &&
IOHandler && IOHandler->m_lastFlushSuccessful)
auto IOHandler = series.IOHandler();
auto current_iteration = currentIteration();
if (IOHandler && current_iteration.has_value() && IOHandler &&
IOHandler->m_lastFlushSuccessful)
{
auto lastIterationIndex = currentIteration.value();
auto lastIterationIndex = *current_iteration;
auto &lastIteration = series.iterations.at(lastIterationIndex);
if (!lastIteration.closed())
{
Expand Down Expand Up @@ -152,12 +155,17 @@ auto StatefulIterator::resetCurrentIterationToBegin() -> bool
auto &data = get();
if (data.iterationsInCurrentStep.empty())
{
data.currentIteration = std::nullopt;
std::visit(
auxiliary::overloaded{
[](CurrentStep::During_t &during) { during.closed = true; },
[](auto const &) {}},
data.currentStep);
return false;
}
else
{
data.currentIteration = *data.iterationsInCurrentStep.begin();
data.currentStep = {
CurrentStep::During_t{*data.iterationsInCurrentStep.begin()}};
return true;
}
}
Expand Down Expand Up @@ -249,47 +257,12 @@ StatefulIterator::StatefulIterator(
{

case IterationEncoding::fileBased: {
if (series.iterations.empty())
{
this->close();
return;
}
data.iterationsInCurrentStep.reserve(series.iterations.size());
std::transform(
series.iterations.begin(),
series.iterations.end(),
std::back_inserter(data.iterationsInCurrentStep),
[](auto const &pair) { return pair.first; });
auto it = series.iterations.begin();
auto end = series.iterations.end();
for (; it != end; ++it)
{
try
{
it->second.open();
break;
}
catch (error::ReadError const &err)
{
std::cerr << "[StatefulIterator] Cannot read iteration '"
<< it->first
<< "' and will skip it due to read error:\n"
<< err.what() << std::endl;
}
}
if (it != end)
{
data.currentIteration = it->first;
}
else
{
this->close();
}
initIteratorFilebased();
break;
}
case IterationEncoding::groupBased:
case IterationEncoding::variableBased:
if (!loopBody({Seek::InitNonFileBased}).has_value())
if (!loopBody().has_value())
{
this->close();
}
Expand Down Expand Up @@ -346,7 +319,8 @@ std::optional<StatefulIterator *> StatefulIterator::nextIterationInStep()
return {this};
}

std::optional<StatefulIterator *> StatefulIterator::nextStep(Seek const &seek)
std::optional<StatefulIterator *>
StatefulIterator::nextStep(size_t recursion_depth)
{
auto &data = get();
// since we are in group-based iteration layout, it does not
Expand All @@ -367,7 +341,7 @@ std::optional<StatefulIterator *> StatefulIterator::nextStep(Seek const &seek)
"below, will skip it.\n"
<< err.what() << std::endl;
data.series.advance(AdvanceMode::ENDSTEP);
return nextStep(seek);
return nextStep(recursion_depth + 1);
}

bool close = [&]() {
Expand All @@ -381,9 +355,9 @@ std::optional<StatefulIterator *> StatefulIterator::nextStep(Seek const &seek)
case AdvanceStatus::RANDOMACCESS:
return std::visit(
auxiliary::overloaded{
[](Seek::InitNonFileBased_t const &) { return false; },
[](Seek::Next_t const &) { return true; }},
seek);
[](CurrentStep::Before_t const &) { return false; },
[](auto const &) { return true; }},
data.currentStep);
}
throw std::runtime_error("Unreachable!");
}();
Expand All @@ -396,11 +370,25 @@ std::optional<StatefulIterator *> StatefulIterator::nextStep(Seek const &seek)
{
data.iterationsInCurrentStep = availableIterations;
resetCurrentIterationToBegin();
std::visit(
auxiliary::overloaded{
[&](CurrentStep::During_t &during) {
during.idx += recursion_depth;
during.closed = false;
},
[&](CurrentStep::Before_t const &) {
data.currentStep = {CurrentStep::During_t{0}};
},
[](CurrentStep::After_t const &) {
throw error::Internal(
"Trying to open next step, but stream is over.");
}},
data.currentStep);
}
return {this};
}

std::optional<StatefulIterator *> StatefulIterator::loopBody(Seek const &seek)
std::optional<StatefulIterator *> StatefulIterator::loopBody()
{
auto &data = get();
Series &series = data.series;
Expand Down Expand Up @@ -483,17 +471,7 @@ std::optional<StatefulIterator *> StatefulIterator::loopBody(Seek const &seek)
}
};

using res_t = std::optional<StatefulIterator *>;
auto optionallyAStep = std::visit(
auxiliary::overloaded{
[](Seek::InitNonFileBased_t const &) -> res_t {
return std::nullopt;
},
[this](Seek::Next_t const &) -> res_t {
return nextIterationInStep();
}},
seek);

auto optionallyAStep = nextIterationInStep();
if (optionallyAStep.has_value())
{
return guardReturn(optionallyAStep);
Expand All @@ -509,10 +487,51 @@ std::optional<StatefulIterator *> StatefulIterator::loopBody(Seek const &seek)
return {this};
}

auto option = nextStep(seek);
auto option = nextStep(/* recursion_depth = */ 1);
return guardReturn(option);
}

void StatefulIterator::initIteratorFilebased()
{
auto &data = get();
auto &series = data.series;
if (series.iterations.empty())
{
this->close();
return;
}
data.iterationsInCurrentStep.reserve(series.iterations.size());
std::transform(
series.iterations.begin(),
series.iterations.end(),
std::back_inserter(data.iterationsInCurrentStep),
[](auto const &pair) { return pair.first; });
auto it = series.iterations.begin();
auto end = series.iterations.end();
for (; it != end; ++it)
{
try
{
it->second.open();
break;
}
catch (error::ReadError const &err)
{
std::cerr << "[StatefulIterator] Cannot read iteration '"
<< it->first << "' and will skip it due to read error:\n"
<< err.what() << std::endl;
}
}
if (it != end)
{
data.currentIteration = it->first;
}
else
{
this->close();
}
}

void StatefulIterator::deactivateDeadIteration(iteration_index_t index)
{
auto &data = get();
Expand Down Expand Up @@ -552,7 +571,7 @@ StatefulIterator &StatefulIterator::operator++()
*/
do
{
res = loopBody({Seek::Next});
res = loopBody();
} while (!res.has_value());

auto resvalue = res.value();
Expand Down

0 comments on commit 1e862b7

Please sign in to comment.