Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bp5 params lowercase #3101

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 55 additions & 18 deletions source/adios2/engine/bp5/BP5Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,20 @@ BP5Engine::GetBPSubStreamNames(const std::vector<std::string> &names,

void BP5Engine::ParseParams(IO &io, struct BP5Params &Params)
{
adios2::Params params_lowercase;
for (auto &p : io.m_Parameters)
{
const std::string key = helper::LowerCase(p.first);
const std::string value = helper::LowerCase(p.second);
params_lowercase[key] = value;
}

auto lf_SetBoolParameter = [&](const std::string key, bool &parameter,
bool def) {
auto itKey = io.m_Parameters.find(key);
const std::string lkey = helper::LowerCase(std::string(key));
auto itKey = params_lowercase.find(lkey);
parameter = def;
if (itKey != io.m_Parameters.end())
if (itKey != params_lowercase.end())
{
std::string value = itKey->second;
std::transform(value.begin(), value.end(), value.begin(),
Expand All @@ -174,9 +183,10 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params)

auto lf_SetFloatParameter = [&](const std::string key, float &parameter,
float def) {
auto itKey = io.m_Parameters.find(key);
const std::string lkey = helper::LowerCase(std::string(key));
auto itKey = params_lowercase.find(lkey);
parameter = def;
if (itKey != io.m_Parameters.end())
if (itKey != params_lowercase.end())
{
std::string value = itKey->second;
parameter =
Expand All @@ -186,9 +196,10 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params)

auto lf_SetSizeBytesParameter = [&](const std::string key,
size_t &parameter, size_t def) {
auto itKey = io.m_Parameters.find(key);
const std::string lkey = helper::LowerCase(std::string(key));
auto itKey = params_lowercase.find(lkey);
parameter = def;
if (itKey != io.m_Parameters.end())
if (itKey != params_lowercase.end())
{
std::string value = itKey->second;
parameter = helper::StringToByteUnits(
Expand All @@ -198,9 +209,10 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params)

auto lf_SetIntParameter = [&](const std::string key, int &parameter,
int def) {
auto itKey = io.m_Parameters.find(key);
const std::string lkey = helper::LowerCase(std::string(key));
auto itKey = params_lowercase.find(lkey);
parameter = def;
if (itKey != io.m_Parameters.end())
if (itKey != params_lowercase.end())
{
parameter = std::stoi(itKey->second);
return true;
Expand All @@ -210,9 +222,10 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params)

auto lf_SetUIntParameter = [&](const std::string key,
unsigned int &parameter, unsigned int def) {
auto itKey = io.m_Parameters.find(key);
const std::string lkey = helper::LowerCase(std::string(key));
auto itKey = params_lowercase.find(lkey);
parameter = def;
if (itKey != io.m_Parameters.end())
if (itKey != params_lowercase.end())
{
unsigned long result = std::stoul(itKey->second);
if (result > std::numeric_limits<unsigned>::max())
Expand All @@ -227,8 +240,10 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params)

auto lf_SetStringParameter = [&](const std::string key,
std::string &parameter, const char *def) {
auto itKey = io.m_Parameters.find(key);
if (itKey != io.m_Parameters.end())
const std::string lkey = helper::LowerCase(std::string(key));
auto itKey = params_lowercase.find(lkey);
parameter = def;
if (itKey != params_lowercase.end())
{
parameter = itKey->second;
return true;
Expand All @@ -238,9 +253,10 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params)

auto lf_SetBufferVTypeParameter = [&](const std::string key, int &parameter,
int def) {
auto itKey = io.m_Parameters.find(key);
const std::string lkey = helper::LowerCase(std::string(key));
auto itKey = params_lowercase.find(lkey);
parameter = def;
if (itKey != io.m_Parameters.end())
if (itKey != params_lowercase.end())
{
std::string value = itKey->second;
std::transform(value.begin(), value.end(), value.begin(),
Expand All @@ -265,9 +281,10 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params)

auto lf_SetAggregationTypeParameter = [&](const std::string key,
int &parameter, int def) {
auto itKey = io.m_Parameters.find(key);
const std::string lkey = helper::LowerCase(std::string(key));
auto itKey = params_lowercase.find(lkey);
parameter = def;
if (itKey != io.m_Parameters.end())
if (itKey != params_lowercase.end())
{
std::string value = itKey->second;
std::transform(value.begin(), value.end(), value.begin(),
Expand Down Expand Up @@ -297,9 +314,10 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params)

auto lf_SetAsyncWriteParameter = [&](const std::string key, int &parameter,
int def) {
auto itKey = io.m_Parameters.find(key);
const std::string lkey = helper::LowerCase(std::string(key));
auto itKey = params_lowercase.find(lkey);
parameter = def;
if (itKey != io.m_Parameters.end())
if (itKey != params_lowercase.end())
{
std::string value = itKey->second;
std::transform(value.begin(), value.end(), value.begin(),
Expand Down Expand Up @@ -331,8 +349,27 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params)

#define get_params(Param, Type, Typedecl, Default) \
lf_Set##Type##Parameter(#Param, Params.Param, Default);

BP5_FOREACH_PARAMETER_TYPE_4ARGS(get_params);
#undef get_params

if (Params.verbose > 0 && !m_RankMPI)
{
std::cout << "---------------- " << io.m_EngineType
<< " engine parameters --------------\n";
#define print_params(Param, Type, Typedecl, Default) \
lf_Set##Type##Parameter(#Param, Params.Param, Default); \
if (!m_RankMPI) \
{ \
std::cout << " " << std::string(#Param) << " = " << Params.Param \
<< " default = " << Default << std::endl; \
}

BP5_FOREACH_PARAMETER_TYPE_4ARGS(print_params);
#undef print_params
std::cout << "-----------------------------------------------------"
<< std::endl;
}
};

} // namespace engine
Expand Down
15 changes: 12 additions & 3 deletions source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ class BP5Engine
static constexpr size_t m_VersionTagPosition = 0;
static constexpr size_t m_VersionTagLength = 32;

static constexpr uint8_t m_BP5MinorVersion = 2;

/** Index record types */
enum IndexRecord
{
StepRecord = 's',
WriterMapRecord = 'w',
};

std::vector<std::string>
GetBPSubStreamNames(const std::vector<std::string> &names,
size_t subFileIndex) const noexcept;
Expand Down Expand Up @@ -117,13 +126,13 @@ class BP5Engine
MACRO(BeginStepPollingFrequencySecs, Float, float, 1.0f) \
MACRO(StreamReader, Bool, bool, false) \
MACRO(BurstBufferDrain, Bool, bool, true) \
MACRO(BurstBufferPath, String, std::string, (char *)(intptr_t)0) \
MACRO(BurstBufferPath, String, std::string, "") \
MACRO(NodeLocal, Bool, bool, false) \
MACRO(verbose, Int, int, 0) \
MACRO(CollectiveMetadata, Bool, bool, true) \
MACRO(NumAggregators, UInt, unsigned int, 0) \
MACRO(AggregatorRatio, UInt, unsigned int, 0) \
MACRO(NumSubFiles, UInt, unsigned int, 999999) \
MACRO(NumSubFiles, UInt, unsigned int, 0) \
MACRO(StripeSize, UInt, unsigned int, 4096) \
MACRO(DirectIO, Bool, bool, false) \
MACRO(DirectIOAlignOffset, UInt, unsigned int, 512) \
Expand All @@ -139,7 +148,7 @@ class BP5Engine
MACRO(MaxShmSize, SizeBytes, size_t, DefaultMaxShmSize) \
MACRO(BufferVType, BufferVType, int, (int)BufferVType::ChunkVType) \
MACRO(AppendAfterSteps, Int, int, INT_MAX) \
MACRO(SelectSteps, String, std::string, (char *)(intptr_t)0) \
MACRO(SelectSteps, String, std::string, "") \
MACRO(ReaderShortCircuitReads, Bool, bool, false)

struct BP5Params
Expand Down
148 changes: 93 additions & 55 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,16 @@ size_t BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,

// BP minor version, unused
position = m_BPMinorVersionPosition;
const uint8_t minorversion = helper::ReadValue<uint8_t>(
buffer, position, m_Minifooter.IsLittleEndian);
if (minorversion != m_BP5MinorVersion)
{
helper::Throw<std::runtime_error>(
"Engine", "BP5Reader", "ParseMetadataIndex",
"Current ADIOS2 BP5 Engine only supports version 5." +
std::to_string(m_BP5MinorVersion) + ", found 5." +
std::to_string(minorversion) + " version");
}

// Writer active flag
position = m_ActiveFlagPosition;
Expand Down Expand Up @@ -735,27 +745,22 @@ size_t BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,
m_FilteredMetadataInfo.clear();
uint64_t minfo_pos = 0;
uint64_t minfo_size = 0;
int n = 0; // a loop counter for current run
int n = 0; // a loop counter for current run4
int nrec = 0; // number of records in current run

while (position < buffer.size() &&
metadataSizeToRead < maxMetadataSizeInMemory)
{
std::vector<uint64_t> ptrs;
const uint64_t MetadataPos = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
const uint64_t MetadataSize = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
const uint64_t FlushCount = helper::ReadValue<uint64_t>(

const unsigned char recordID = helper::ReadValue<unsigned char>(
buffer, position, m_Minifooter.IsLittleEndian);
const uint64_t hasWriterMap = helper::ReadValue<uint64_t>(
const uint64_t recordLength = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
const size_t dbgRecordStartPosition = position;

if (!n)
switch (recordID)
{
minfo_pos = MetadataPos; // initialize minfo_pos properly
MetadataPosTotalSkip = MetadataPos;
}

if (hasWriterMap)
case IndexRecord::WriterMapRecord:
{
auto p = m_WriterMap.emplace(m_StepsCount, WriterMapStruct());
auto &s = p.first->second;
Expand All @@ -775,60 +780,93 @@ size_t BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,
}
m_LastMapStep = m_StepsCount;
m_LastWriterCount = s.WriterCount;
break;
}

if (m_SelectedSteps.IsSelected(m_AbsStepsInFile))
case IndexRecord::StepRecord:
{
m_WriterMapIndex.push_back(m_LastMapStep);
std::vector<uint64_t> ptrs;
const uint64_t MetadataPos = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
const uint64_t MetadataSize = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
const uint64_t FlushCount = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);

// pos in metadata in memory
ptrs.push_back(MetadataPos - MetadataPosTotalSkip);
ptrs.push_back(MetadataSize);
ptrs.push_back(FlushCount);
ptrs.push_back(position);
// absolute pos in file before read
ptrs.push_back(MetadataPos);
m_MetadataIndexTable[m_StepsCount] = ptrs;
#ifdef DUMPDATALOCINFO
for (uint64_t i = 0; i < m_WriterCount; i++)
if (!n)
{
size_t DataPosPos = ptrs[3];
std::cout << "Writer " << i << " data at ";
for (uint64_t j = 0; j < FlushCount; j++)
minfo_pos = MetadataPos; // initialize minfo_pos properly
MetadataPosTotalSkip = MetadataPos;
}

if (m_SelectedSteps.IsSelected(m_AbsStepsInFile))
{
m_WriterMapIndex.push_back(m_LastMapStep);

// pos in metadata in memory
ptrs.push_back(MetadataPos - MetadataPosTotalSkip);
ptrs.push_back(MetadataSize);
ptrs.push_back(FlushCount);
ptrs.push_back(position);
// absolute pos in file before read
ptrs.push_back(MetadataPos);
m_MetadataIndexTable[m_StepsCount] = ptrs;
#ifdef DUMPDATALOCINFO
for (uint64_t i = 0; i < m_WriterCount; i++)
{
size_t DataPosPos = ptrs[3];
std::cout << "Writer " << i << " data at ";
for (uint64_t j = 0; j < FlushCount; j++)
{
const uint64_t DataPos = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
const uint64_t DataSize = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
std::cout << "loc:" << DataPos << " siz:" << DataSize
<< "; ";
}
const uint64_t DataPos = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
const uint64_t DataSize = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
std::cout << "loc:" << DataPos << " siz:" << DataSize
<< "; ";
std::cout << "loc:" << DataPos << std::endl;
}
const uint64_t DataPos = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
std::cout << "loc:" << DataPos << std::endl;
}
#endif
minfo_size += MetadataSize;
metadataSizeToRead += MetadataSize;
m_StepsCount++;
}
else
{
MetadataPosTotalSkip += MetadataSize;
if (minfo_size > 0)
minfo_size += MetadataSize;
metadataSizeToRead += MetadataSize;
m_StepsCount++;
}
else
{
m_FilteredMetadataInfo.push_back(
std::make_pair(minfo_pos, minfo_size));
MetadataPosTotalSkip += MetadataSize;
if (minfo_size > 0)
{
m_FilteredMetadataInfo.push_back(
std::make_pair(minfo_pos, minfo_size));
}
minfo_pos = MetadataPos;
minfo_size = 0;
}
minfo_pos = MetadataPos;
minfo_size = 0;

// skip over the writer -> data file offset records
position +=
sizeof(uint64_t) * m_LastWriterCount * ((2 * FlushCount) + 1);
++m_AbsStepsInFile;
++n;
break;
}
}
// dbg
if ((position - dbgRecordStartPosition) != (size_t)recordLength)
{
helper::Throw<std::runtime_error>(
"Engine", "BP5Reader", "ParseMetadataIndex",
"Record " + std::to_string(nrec) + " (id = " +
std::to_string(recordID) + ") has invalid length " +
std::to_string(recordLength) + ". We parsed " +
std::to_string(position - dbgRecordStartPosition) +
" bytes for this record"

// skip over the writer -> data file offset records
position +=
sizeof(uint64_t) * m_LastWriterCount * ((2 * FlushCount) + 1);
++m_AbsStepsInFile;
++n;
);
}
++nrec;
}
if (minfo_size > 0)
{
Expand Down
Loading