Skip to content

Commit

Permalink
Initiate reading of group/variable-based encoding with nextStep()
Browse files Browse the repository at this point in the history
  • Loading branch information
franzpoeschel committed Feb 14, 2024
1 parent 2d67df8 commit 11e47ed
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 95 deletions.
26 changes: 24 additions & 2 deletions include/openPMD/snapshots/StatefulIterator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,27 @@ namespace internal
{
class SeriesData;
}

namespace detail
{
namespace seek_types
{
struct InitNonFileBased_t
{};
struct Next_t
{};
using seek_impl = std::variant<InitNonFileBased_t, Next_t>;
} // namespace seek_types
struct Seek : seek_types::seek_impl
{
using InitNonFileBased_t = seek_types::InitNonFileBased_t;
using Next_t = seek_types::Next_t;

constexpr static InitNonFileBased_t InitNonFileBased{};
constexpr static Next_t Next{};
};
} // namespace detail

class StatefulIterator
: public AbstractSeriesIterator<
StatefulIterator,
Expand Down Expand Up @@ -89,6 +110,7 @@ class StatefulIterator
using value_type =
typename Container<Iteration, Iteration::IterationIndex_t>::value_type;
using typename parent_t ::difference_type;
using Seek = detail::Seek;
//! construct the end() iterator
explicit StatefulIterator();

Expand Down Expand Up @@ -138,9 +160,9 @@ class StatefulIterator
* the /data/snapshot attribute, this helps figuring out which iteration
* is now active. Hence, recursion_depth.
*/
std::optional<StatefulIterator *> nextStep();
std::optional<StatefulIterator *> nextStep(Seek const &);

std::optional<StatefulIterator *> loopBody();
std::optional<StatefulIterator *> loopBody(Seek const &);

void deactivateDeadIteration(iteration_index_t);

Expand Down
170 changes: 77 additions & 93 deletions src/snapshots/StatefulIterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@

#include "openPMD/Iteration.hpp"
#include "openPMD/Series.hpp"
#include "openPMD/auxiliary/Variant.hpp"

#include <iostream>
#include <iterator>
#include <optional>
#include <stdexcept>

Expand Down Expand Up @@ -243,92 +245,55 @@ StatefulIterator::StatefulIterator(
initSeriesInLinearReadMode();
}

auto it = series.get().iterations.begin();
if (it == series.get().iterations.end())
switch (series.iterationEncoding())
{
this->close();
return;
}
else if (
it->second.get().m_closed == internal::CloseStatus::ClosedInBackend)
{
throw error::WrongAPIUsage(
"Trying to call Series::readIterations() on a (partially) read "
"Series.");
}
else
{
auto openIteration = [](Iteration &iteration) {
/*
* @todo
* Is that really clean?
* Use case: See Python ApiTest testListSeries:
* Call listSeries twice.
*/
if (iteration.get().m_closed !=
internal::CloseStatus::ClosedInBackend)
{
iteration.open();
}
};
AdvanceStatus status{};
switch (series.iterationEncoding())
{
case IterationEncoding::fileBased:
/*
* The file needs to be accessed before beginning a step upon it.
* In file-based iteration layout it maybe is not accessed yet,
* so do that now. There is only one step per file, so beginning
* the step after parsing the file is ok.
*/

openIteration(series.iterations.begin()->second);
status = it->second.beginStep(/* reread = */ true);
for (auto const &pair : series.iterations)
case IterationEncoding::fileBased: {
if (series.iterations.empty())
{
this->close();
return;
}
data.iterationsInCurrentStep.reserve(series.iterations.size());
std::transform(
series.iterations.begin(),
series.iterations.end(),
std::back_inserter(data.iterationsInCurrentStep),
[](auto const &pair) { return pair.first; });
auto it = series.iterations.begin();
auto end = series.iterations.end();
for (; it != end; ++it)
{
try
{
data.iterationsInCurrentStep.push_back(pair.first);
it->second.open();
break;
}
break;
case IterationEncoding::groupBased:
case IterationEncoding::variableBased: {
/*
* In group-based iteration layout, we have definitely already had
* access to the file until now. Better to begin a step right away,
* otherwise we might get another step's data.
*/
Iteration::BeginStepStatus::AvailableIterations_t
availableIterations;
std::tie(status, availableIterations) = Iteration::beginStep(
{},
series,
/* reread = */ reread(data.parsePreference));
/*
* In random-access mode, do not use the information read in the
* `snapshot` attribute, instead simply go through iterations
* one by one in ascending order (fallback implementation in the
* second if branch).
*/
data.iterationsInCurrentStep = availableIterations;
if (!data.iterationsInCurrentStep.empty())
catch (error::ReadError const &err)
{
openIteration(
series.iterations.at(data.iterationsInCurrentStep.at(0)));
std::cerr << "[StatefulIterator] Cannot read iteration '"
<< it->first
<< "' and will skip it due to read error:\n"
<< err.what() << std::endl;
}
break;
}
if (it != end)
{
data.currentIteration = it->first;
}

if (status == AdvanceStatus::OVER)
else
{
this->close();
return;
}
if (!resetCurrentIterationToBegin())
break;
}
case IterationEncoding::groupBased:
case IterationEncoding::variableBased:
if (!loopBody({Seek::InitNonFileBased}).has_value())
{
this->close();
return;
}
it->second.setStepStatus(StepStatus::DuringStep);
break;
}
}

Expand Down Expand Up @@ -381,12 +346,7 @@ std::optional<StatefulIterator *> StatefulIterator::nextIterationInStep()
return {this};
}

void breakpoint()
{
std::cout << "BREAKPOINT" << std::endl;
}

std::optional<StatefulIterator *> StatefulIterator::nextStep()
std::optional<StatefulIterator *> StatefulIterator::nextStep(Seek const &seek)
{
auto &data = get();
// since we are in group-based iteration layout, it does not
Expand All @@ -407,24 +367,40 @@ std::optional<StatefulIterator *> StatefulIterator::nextStep()
"below, will skip it.\n"
<< err.what() << std::endl;
data.series.advance(AdvanceMode::ENDSTEP);
return nextStep();
return nextStep(seek);
}

switch (status)
bool close = [&]() {
switch (status)
{

case AdvanceStatus::OK:
return false;
case AdvanceStatus::OVER:
return true;
case AdvanceStatus::RANDOMACCESS:
return std::visit(
auxiliary::overloaded{
[](Seek::InitNonFileBased_t const &) { return false; },
[](Seek::Next_t const &) { return true; }},
seek);
}
throw std::runtime_error("Unreachable!");
}();

if (close)
{
case AdvanceStatus::OVER:
case AdvanceStatus::RANDOMACCESS:
this->close();
break;
case AdvanceStatus::OK:
}
else
{
data.iterationsInCurrentStep = availableIterations;
resetCurrentIterationToBegin();
break;
}
return {this};
}

std::optional<StatefulIterator *> StatefulIterator::loopBody()
std::optional<StatefulIterator *> StatefulIterator::loopBody(Seek const &seek)
{
auto &data = get();
Series &series = data.series;
Expand Down Expand Up @@ -507,12 +483,20 @@ std::optional<StatefulIterator *> StatefulIterator::loopBody()
}
};

using res_t = std::optional<StatefulIterator *>;
auto optionallyAStep = std::visit(
auxiliary::overloaded{
[](Seek::InitNonFileBased_t const &) -> res_t {
return std::nullopt;
},
[this](Seek::Next_t const &) -> res_t {
return nextIterationInStep();
}},
seek);

if (optionallyAStep.has_value())
{
auto optionallyAStep = nextIterationInStep();
if (optionallyAStep.has_value())
{
return guardReturn(optionallyAStep);
}
return guardReturn(optionallyAStep);
}

// The currently active iterations have been exhausted.
Expand All @@ -525,7 +509,7 @@ std::optional<StatefulIterator *> StatefulIterator::loopBody()
return {this};
}

auto option = nextStep();
auto option = nextStep(seek);
return guardReturn(option);
}

Expand Down Expand Up @@ -568,7 +552,7 @@ StatefulIterator &StatefulIterator::operator++()
*/
do
{
res = loopBody();
res = loopBody({Seek::Next});
} while (!res.has_value());

auto resvalue = res.value();
Expand Down

0 comments on commit 11e47ed

Please sign in to comment.