Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flush target newstep #1632

Merged
merged 6 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/source/details/backendconfig.rst
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,11 @@ Explanation of the single keys:

* If ``"disk"``, data will be moved to disk on every flush.
* If ``"buffer"``, then only upon ending an IO step or closing an engine.
* If ``"new_step"``, then a new step will be created. This should be used in combination with the ADIOS2 option ``adios2.engine.parameters.FlattenSteps = "on"``.

This behavior can be overridden on a per-flush basis by specifying this JSON/TOML key as an optional parameter to the ``Series::flush()`` or ``Attributable::seriesFlush()`` methods.

Additionally, specifying ``"disk_override"`` or ``"buffer_override"`` will take precedence over options specified without the ``_override`` suffix, allowing to invert the normal precedence order.
Additionally, specifying ``"disk_override"``, ``"buffer_override"`` or ``"new_step_override"`` will take precedence over options specified without the ``_override`` suffix, allowing to invert the normal precedence order.
This way, a data producing code can hardcode the preferred flush target per ``flush()`` call, but users can e.g. still entirely deactivate flushing to disk in the ``Series`` constructor by specifying ``preferred_flush_target = buffer_override``.
This is useful when applying the asynchronous IO capabilities of the BP5 engine.
* ``adios2.dataset.operators``: This key contains a list of ADIOS2 `operators <https://adios2.readthedocs.io/en/latest/components/components.html#operator>`_, used to enable compression or dataset transformations.
Expand Down
4 changes: 3 additions & 1 deletion include/openPMD/IO/ADIOS/ADIOS2Auxiliary.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ namespace adios_defs
Buffer,
Buffer_Override,
Disk,
Disk_Override
Disk_Override,
NewStep,
NewStep_Override
};

using FlushTarget = adios_defs::FlushTarget;
Expand Down
5 changes: 5 additions & 0 deletions include/openPMD/IO/ADIOS/macros.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
#define openPMD_HAS_ADIOS_2_10 \
(ADIOS2_VERSION_MAJOR * 100 + ADIOS2_VERSION_MINOR >= 210)

// Evaluates to true when compiling against ADIOS2 v2.10.1 or newer.
// Every version component is given two decimal digits so that a two-digit
// minor or patch number cannot carry over into the next-higher component
// (e.g. a hypothetical 2.9.12 must not compare as >= 2.10.1).
#define openPMD_HAS_ADIOS_2_10_1                                               \
    (ADIOS2_VERSION_MAJOR * 10000 + ADIOS2_VERSION_MINOR * 100 +               \
         ADIOS2_VERSION_PATCH >=                                               \
     21001)

#if defined(ADIOS2_HAVE_BP5) || openPMD_HAS_ADIOS_2_10
// ADIOS2 v2.10 no longer defines this
#define openPMD_HAVE_ADIOS2_BP5 1
Expand Down
25 changes: 20 additions & 5 deletions include/openPMD/IO/AbstractIOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,28 @@ class AbstractIOHandler
{
/*
* In file-based iteration encoding, the APPEND mode is handled entirely
* by the frontend, the backend should just treat it as CREATE mode
* by the frontend, the backend should just treat it as CREATE mode.
* Similar for READ_LINEAR which should be treated as READ_RANDOM_ACCESS
* in the backend.
*/
if (encoding == IterationEncoding::fileBased &&
m_backendAccess == Access::APPEND)
if (encoding == IterationEncoding::fileBased)
{
// do we really want to have those as const members..?
*const_cast<Access *>(&m_backendAccess) = Access::CREATE;
switch (m_backendAccess)
{

case Access::READ_LINEAR:
// do we really want to have those as const members..?
*const_cast<Access *>(&m_backendAccess) =
Access::READ_RANDOM_ACCESS;
break;
case Access::APPEND:
*const_cast<Access *>(&m_backendAccess) = Access::CREATE;
break;
case Access::READ_RANDOM_ACCESS:
case Access::READ_WRITE:
case Access::CREATE:
break;
}
}

m_encoding = encoding;
Expand Down
10 changes: 9 additions & 1 deletion src/IO/ADIOS/ADIOS2Auxiliary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,19 @@ FlushTarget flushTargetFromString(std::string const &str)
{
return FlushTarget::Disk_Override;
}
else if (str == "new_step")
{
return FlushTarget::NewStep;
}
else if (str == "new_step_override")
{
return FlushTarget::NewStep_Override;
}
else
{
throw error::BackendConfigSchema(
{"adios2", "engine", adios_defaults::str_flushtarget},
"Flush target must be either 'disk' or 'buffer', but "
"Flush target must be either 'disk', 'buffer' or 'new_step', but "
"was " +
str + ".");
}
Expand Down
124 changes: 81 additions & 43 deletions src/IO/ADIOS/ADIOS2File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
#include "openPMD/IO/ADIOS/ADIOS2File.hpp"
#include "openPMD/Error.hpp"
#include "openPMD/IO/ADIOS/ADIOS2IOHandler.hpp"
#include "openPMD/IO/AbstractIOHandler.hpp"
#include "openPMD/auxiliary/Environment.hpp"
#include "openPMD/auxiliary/StringManip.hpp"

#include <stdexcept>

#if openPMD_USE_VERIFY
#define VERIFY(CONDITION, TEXT) \
{ \
Expand Down Expand Up @@ -1028,60 +1031,95 @@ void ADIOS2File::flush_impl(

void ADIOS2File::flush_impl(ADIOS2FlushParams flushParams, bool writeLatePuts)
{
auto decideFlushAPICall =
[this, flushTarget = flushParams.flushTarget](adios2::Engine &engine) {
auto decideFlushAPICall = [this, flushTarget = flushParams.flushTarget](
adios2::Engine &engine) {
#if ADIOS2_VERSION_MAJOR * 1000000000 + ADIOS2_VERSION_MINOR * 100000000 + \
ADIOS2_VERSION_PATCH * 1000000 + ADIOS2_VERSION_TWEAK >= \
2701001223
bool performDataWrite{};
switch (flushTarget)
enum class CleanedFlushTarget
{
Buffer,
Disk,
Step
};

CleanedFlushTarget target{};
switch (flushTarget)
{
case FlushTarget::Disk:
case FlushTarget::Disk_Override:
if (m_engineType == "bp5" ||
/* this second check should be sufficient, but we leave the
first check in as a safeguard against renamings in
ADIOS2. Also do a lowerCase transform since the docstring
of `Engine::Type()` claims that the return value is in
lowercase, but for BP5 this does not seem true. */
auxiliary::lowerCase(engine.Type()) == "bp5writer")
{
case FlushTarget::Disk:
case FlushTarget::Disk_Override:
performDataWrite = true;
break;
case FlushTarget::Buffer:
case FlushTarget::Buffer_Override:
performDataWrite = false;
break;
target = CleanedFlushTarget::Disk;
}
performDataWrite = performDataWrite &&
(m_engineType == "bp5" ||
/* this second check should be sufficient, but we leave the
first check in as a safeguard against renamings in ADIOS2.
Also do a lowerCase transform since the docstring of
`Engine::Type()` claims that the return value is in
lowercase, but for BP5 this does not seem true. */
auxiliary::lowerCase(engine.Type()) == "bp5writer");

if (performDataWrite)
else
{
/*
* Deliberately don't write buffered attributes now since
* readers won't be able to see them before EndStep anyway,
* so there's no use. In fact, writing them now is harmful
* because they can't be overwritten after this anymore in the
* current step.
* Draining the uniquePtrPuts now is good however, since we
* should use this chance to free the memory.
*/
for (auto &entry : m_uniquePtrPuts)
{
entry.run(*this);
}
engine.PerformDataWrite();
m_uniquePtrPuts.clear();
target = CleanedFlushTarget::Buffer;
}
else
break;
case FlushTarget::Buffer:
case FlushTarget::Buffer_Override:
target = CleanedFlushTarget::Buffer;
break;
case FlushTarget::NewStep:
case FlushTarget::NewStep_Override:
target = CleanedFlushTarget::Step;
break;
}

switch (target)
{
case CleanedFlushTarget::Disk:
/*
* Draining the uniquePtrPuts now to use this chance to free the
* memory.
*/
for (auto &entry : m_uniquePtrPuts)
{
engine.PerformPuts();
entry.run(*this);
}
#else
(void)this;
(void)flushTarget;
engine.PerformDataWrite();
m_uniquePtrPuts.clear();
m_updateSpans.clear();
break;
case CleanedFlushTarget::Buffer:
engine.PerformPuts();
break;
case CleanedFlushTarget::Step:
if (streamStatus != StreamStatus::DuringStep)
{
throw error::OperationUnsupportedInBackend(
"ADIOS2",
"Trying to flush to a new step while no step is active");
}
/*
* Draining the uniquePtrPuts now to use this chance to free the
* memory.
*/
for (auto &entry : m_uniquePtrPuts)
{
entry.run(*this);
}
engine.EndStep();
engine.BeginStep();
// ++m_currentStep; // think we should keep this as the logical step
m_uniquePtrPuts.clear();
uncommittedAttributes.clear();
m_updateSpans.clear();
break;
}
#else
(void)this;
(void)flushTarget;
engine.PerformPuts();
#endif
};
};

flush_impl(
flushParams,
Expand Down
2 changes: 2 additions & 0 deletions src/IO/ADIOS/ADIOS2IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,11 @@ overrideFlushTarget(FlushTarget &inplace, FlushTarget new_val)
{
case FlushTarget::Buffer:
case FlushTarget::Disk:
case FlushTarget::NewStep:
return true;
case FlushTarget::Buffer_Override:
case FlushTarget::Disk_Override:
case FlushTarget::NewStep_Override:
return false;
}
return true;
Expand Down
7 changes: 7 additions & 0 deletions src/Series.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1550,6 +1550,13 @@ void Series::readFileBased()
Parameter<Operation::OPEN_FILE> fOpen;
Parameter<Operation::READ_ATT> aRead;

// Tell the backend that we are parsing file-based iteration encoding.
// This especially means that READ_RANDOM_ACCESS will be used instead of
// READ_LINEAR, as READ_LINEAR is implemented in the frontend for file-based
// encoding. Don't set the iteration encoding in the frontend yet, will be
// set after reading the iteration encoding attribute from the opened file.
IOHandler()->setIterationEncoding(IterationEncoding::fileBased);

if (!auxiliary::directory_exists(IOHandler()->directory))
throw error::ReadError(
error::AffectedObject::File,
Expand Down
41 changes: 41 additions & 0 deletions test/SerialIOTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4437,6 +4437,47 @@ BufferChunkSize = 2147483646 # 2^31 - 2
}
#endif

#if openPMD_HAVE_ADIOS2_BP5
/*
 * Exercises the "new_step" flush target of the ADIOS2 backend.
 *
 * Five iterations are written in file-based encoding with the engine
 * parameter FlattenSteps = "on". Each iteration holds a 10x10 float dataset
 * that is filled one row at a time; after every row the Series is flushed
 * with preferred_flush_target = "new_step".
 *
 * When openPMD_HAS_ADIOS_2_10_1 holds, the files are read back with both
 * read access modes and every dataset is compared against the values that
 * were written.
 */
TEST_CASE("adios2_flush_via_step")
{
    Series series(
        "../samples/adios2_flush_via_step/simData_%T.bp5",
        Access::CREATE,
        R"(adios2.engine.parameters.FlattenSteps = "on")");

    // Per-flush backend configuration selecting the "new_step" flush target.
    char const *const flushToNewStep =
        R"(adios2.engine.preferred_flush_target = "new_step")";

    // Holds one row (10 values) while writing.
    std::vector<float> data(10);
    for (Iteration::IterationIndex_t step = 0; step < 5; ++step)
    {
        Iteration current = series.writeIterations()[step];
        auto component = current.meshes["E"]["x"];
        component.resetDataset({Datatype::FLOAT, {10, 10}});

        for (Extent::value_type row = 0; row < 10; ++row)
        {
            // Row `row` of iteration `step` starts at step*100 + row*10 and
            // counts upwards, so the full dataset is step*100 .. step*100+99.
            auto const firstValue = step * 100 + row * 10;
            std::iota(data.begin(), data.end(), firstValue);
            component.storeChunk(data, {row, 0}, {1, 10});
            series.flush(flushToNewStep);
        }
        current.close();
    }

#if openPMD_HAS_ADIOS_2_10_1
    for (auto mode : {Access::READ_RANDOM_ACCESS, Access::READ_LINEAR})
    {
        Series reader("../samples/adios2_flush_via_step/simData_%T.%E", mode);
        std::vector<float> load_data(100);
        // From here on `data` holds the expected contents of a full dataset.
        data.resize(100);
        for (auto iteration : reader.readIterations())
        {
            std::iota(data.begin(), data.end(), iteration.iterationIndex * 100);
            auto component = iteration.meshes["E"]["x"];
            component.loadChunkRaw(load_data.data(), {0, 0}, {10, 10});
            // Close the iteration before comparing the loaded values.
            iteration.close();
            REQUIRE(load_data == data);
        }
    }
#endif
}
#endif

TEST_CASE("adios2_engines_and_file_endings")
{
size_t filenameCounter = 0;
Expand Down
Loading