Skip to content

Commit

Permalink
set params on all transports
Browse files Browse the repository at this point in the history
  • Loading branch information
eisenhauer committed Jan 28, 2024
1 parent b0b4987 commit f108c3f
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 5 deletions.
2 changes: 1 addition & 1 deletion source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ std::pair<double, double> BP5Reader::ReadData(adios2::transportman::TransportMan
{
Params transportParameters;
transportParameters["FailOnEOF"] = "true";
FileManager.SetParameters(transportParameters);
FileManager.SetParameters(transportParameters, -1);
}
}
TP endSubfile = NOW();
Expand Down
20 changes: 16 additions & 4 deletions source/adios2/toolkit/transportman/TransportMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,10 +410,22 @@ void TransportMan::ReadFile(char *buffer, const size_t size, const size_t start,

void TransportMan::SetParameters(const Params &params, const size_t transportIndex)
{
auto itTransport = m_Transports.find(transportIndex);
CheckFile(itTransport,
", in call to SetParameters with index " + std::to_string(transportIndex));
itTransport->second->SetParameters(params);
if (transportIndex == -1)
{
for (auto &transportPair : m_Transports)
{
auto &transport = transportPair.second;

transport->SetParameters(params);
}
}
else
{
auto itTransport = m_Transports.find(transportIndex);
CheckFile(itTransport,
", in call to SetParameters with index " + std::to_string(transportIndex));
itTransport->second->SetParameters(params);
}
}

void TransportMan::FlushFiles(const int transportIndex)
Expand Down
130 changes: 130 additions & 0 deletions testing/adios2/engine/bp/operations/TestBPWriteReadBZIP2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,135 @@ void BZIP2Accuracy1D(const std::string accuracy)
}
}

void BZIP2Accuracy1DLocal(const std::string accuracy)
{
// Each process would write a 1x8 array and all processes would
// form a mpiSize * Nx 1D array
const std::string fname("BPWR_BZIP2_1D_Local_" + accuracy + ".bp");

int mpiRank = 0, mpiSize = 1;
// Number of rows
const size_t Nx = 1000;

// Number of steps
const size_t NSteps = 1;

std::vector<float> r32s(Nx);
std::vector<double> r64s(Nx);

// range 0 to 999
std::iota(r32s.begin(), r32s.end(), 0.f);
std::iota(r64s.begin(), r64s.end(), 0.);

#if ADIOS2_USE_MPI
MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank);
MPI_Comm_size(MPI_COMM_WORLD, &mpiSize);
#endif

#if ADIOS2_USE_MPI
adios2::ADIOS adios(MPI_COMM_WORLD);
#else
adios2::ADIOS adios;
#endif
{
adios2::IO io = adios.DeclareIO("TestIO");

if (!engineName.empty())
{
io.SetEngine(engineName);
}
else
{
// Create the BP Engine
io.SetEngine("BPFile");
}

const adios2::Dims shape{};
const adios2::Dims start{};
const adios2::Dims count{Nx};

adios2::Variable<float> var_r32 =
io.DefineVariable<float>("r32", shape, start, count, adios2::ConstantDims);
adios2::Variable<double> var_r64 =
io.DefineVariable<double>("r64", shape, start, count, adios2::ConstantDims);

// add operations
adios2::Operator BZIP2Op =
adios.DefineOperator("BZIP2Compressor", adios2::ops::LosslessBZIP2);

var_r32.AddOperation(BZIP2Op, {{adios2::ops::bzip2::key::blockSize100k, accuracy}});
var_r64.AddOperation(BZIP2Op, {{adios2::ops::bzip2::key::blockSize100k, accuracy}});

adios2::Engine bpWriter = io.Open(fname, adios2::Mode::Write);

for (size_t step = 0; step < NSteps; ++step)
{
bpWriter.BeginStep();
bpWriter.Put<float>("r32", r32s.data());
bpWriter.Put<double>("r64", r64s.data());
bpWriter.EndStep();
}

bpWriter.Close();
}

{
adios2::IO io = adios.DeclareIO("ReadIO");

if (!engineName.empty())
{
io.SetEngine(engineName);
}
else
{
// Create the BP Engine
io.SetEngine("BPFile");
}

adios2::Engine bpReader = io.Open(fname, adios2::Mode::Read);

unsigned int t = 0;
std::vector<float> decompressedR32s;
std::vector<double> decompressedR64s;

while (bpReader.BeginStep() == adios2::StepStatus::OK)
{
auto var_r32 = io.InquireVariable<float>("r32");
EXPECT_TRUE(var_r32);
ASSERT_EQ(var_r32.ShapeID(), adios2::ShapeID::LocalArray);
ASSERT_EQ(var_r32.Steps(), NSteps);

auto var_r64 = io.InquireVariable<double>("r64");
EXPECT_TRUE(var_r64);
ASSERT_EQ(var_r64.ShapeID(), adios2::ShapeID::LocalArray);
ASSERT_EQ(var_r64.Steps(), NSteps);
auto r32_info = bpReader.BlocksInfo(var_r32, -1);
auto r64_info = bpReader.BlocksInfo(var_r64, -1);

var_r32.SetBlockSelection(mpiRank);
var_r64.SetBlockSelection(mpiRank);

bpReader.Get(var_r32, decompressedR32s);
bpReader.Get(var_r64, decompressedR64s);
bpReader.EndStep();

for (size_t i = 0; i < Nx; ++i)
{
std::stringstream ss;
ss << "t=" << t << " i=" << i << " rank=" << mpiRank;
std::string msg = ss.str();
ASSERT_EQ(decompressedR32s[i], r32s[i]) << msg;
ASSERT_EQ(decompressedR64s[i], r64s[i]) << msg;
}
++t;
}

EXPECT_EQ(t, NSteps);

bpReader.Close();
}
}

void BZIP2Accuracy2D(const std::string accuracy)
{
// Each process would write a 1x8 array and all processes would
Expand Down Expand Up @@ -830,6 +959,7 @@ class BPWriteReadBZIP2 : public ::testing::TestWithParam<std::string>
};

TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP21D) { BZIP2Accuracy1D(GetParam()); }
TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP21DLocal) { BZIP2Accuracy1DLocal(GetParam()); }
TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP22D) { BZIP2Accuracy2D(GetParam()); }
TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP23D) { BZIP2Accuracy3D(GetParam()); }
TEST_P(BPWriteReadBZIP2, ADIOS2BPWriteReadBZIP21DSel) { BZIP2Accuracy1DSel(GetParam()); }
Expand Down

0 comments on commit f108c3f

Please sign in to comment.