From 839159eb8b80f8f9522094720a49e54cd20d58f0 Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Wed, 2 Mar 2022 08:46:05 -0500 Subject: [PATCH 1/3] BP5 index format is now record-based: 1+8 byte for recordID + record length, then content. recordID 's' for Steps, 'w' for writer map Writer map always PRECEEDS the step record where it was recorded. --- source/adios2/engine/bp5/BP5Engine.h | 9 + source/adios2/engine/bp5/BP5Reader.cpp | 148 +++++++----- source/adios2/engine/bp5/BP5Writer.cpp | 214 +++++++++++------- source/adios2/engine/bp5/BP5Writer.h | 4 +- source/utils/bp5dbg/adios2/bp5dbg/idxtable.py | 76 ++++--- source/utils/bpls/bpls.cpp | 5 +- 6 files changed, 284 insertions(+), 172 deletions(-) diff --git a/source/adios2/engine/bp5/BP5Engine.h b/source/adios2/engine/bp5/BP5Engine.h index 4c995087c1..1aa2df4c18 100644 --- a/source/adios2/engine/bp5/BP5Engine.h +++ b/source/adios2/engine/bp5/BP5Engine.h @@ -59,6 +59,15 @@ class BP5Engine static constexpr size_t m_VersionTagPosition = 0; static constexpr size_t m_VersionTagLength = 32; + static constexpr uint8_t m_BP5MinorVersion = 2; + + /** Index record types */ + enum IndexRecord + { + StepRecord = 's', + WriterMapRecord = 'w', + }; + std::vector GetBPSubStreamNames(const std::vector &names, size_t subFileIndex) const noexcept; diff --git a/source/adios2/engine/bp5/BP5Reader.cpp b/source/adios2/engine/bp5/BP5Reader.cpp index fb03ba0c7c..815ae752e4 100644 --- a/source/adios2/engine/bp5/BP5Reader.cpp +++ b/source/adios2/engine/bp5/BP5Reader.cpp @@ -706,6 +706,16 @@ size_t BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL, // BP minor version, unused position = m_BPMinorVersionPosition; + const uint8_t minorversion = helper::ReadValue( + buffer, position, m_Minifooter.IsLittleEndian); + if (minorversion != m_BP5MinorVersion) + { + helper::Throw( + "Engine", "BP5Reader", "ParseMetadataIndex", + "Current ADIOS2 BP5 Engine only supports version 5." + + std::to_string(m_BP5MinorVersion) + ", found 5." + + std::to_string(minorversion) + " version"); + } // Writer active flag position = m_ActiveFlagPosition; @@ -735,27 +745,22 @@ size_t BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL, m_FilteredMetadataInfo.clear(); uint64_t minfo_pos = 0; uint64_t minfo_size = 0; - int n = 0; // a loop counter for current run + int n = 0; // a loop counter for current run4 + int nrec = 0; // number of records in current run + while (position < buffer.size() && metadataSizeToRead < maxMetadataSizeInMemory) { - std::vector ptrs; - const uint64_t MetadataPos = helper::ReadValue( - buffer, position, m_Minifooter.IsLittleEndian); - const uint64_t MetadataSize = helper::ReadValue( - buffer, position, m_Minifooter.IsLittleEndian); - const uint64_t FlushCount = helper::ReadValue( + + const unsigned char recordID = helper::ReadValue( buffer, position, m_Minifooter.IsLittleEndian); - const uint64_t hasWriterMap = helper::ReadValue( + const uint64_t recordLength = helper::ReadValue( buffer, position, m_Minifooter.IsLittleEndian); + const size_t dbgRecordStartPosition = position; - if (!n) + switch (recordID) { - minfo_pos = MetadataPos; // initialize minfo_pos properly - MetadataPosTotalSkip = MetadataPos; - } - - if (hasWriterMap) + case IndexRecord::WriterMapRecord: { auto p = m_WriterMap.emplace(m_StepsCount, WriterMapStruct()); auto &s = p.first->second; @@ -775,60 +780,93 @@ size_t BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL, } m_LastMapStep = m_StepsCount; m_LastWriterCount = s.WriterCount; + break; } - - if (m_SelectedSteps.IsSelected(m_AbsStepsInFile)) + case IndexRecord::StepRecord: { - m_WriterMapIndex.push_back(m_LastMapStep); + std::vector ptrs; + const uint64_t MetadataPos = helper::ReadValue( + buffer, position, m_Minifooter.IsLittleEndian); + const uint64_t MetadataSize = helper::ReadValue( + buffer, position, m_Minifooter.IsLittleEndian); + const uint64_t FlushCount = helper::ReadValue( + buffer, position, m_Minifooter.IsLittleEndian); - // pos in metadata in memory - ptrs.push_back(MetadataPos - MetadataPosTotalSkip); - ptrs.push_back(MetadataSize); - ptrs.push_back(FlushCount); - ptrs.push_back(position); - // absolute pos in file before read - ptrs.push_back(MetadataPos); - m_MetadataIndexTable[m_StepsCount] = ptrs; -#ifdef DUMPDATALOCINFO - for (uint64_t i = 0; i < m_WriterCount; i++) + if (!n) { - size_t DataPosPos = ptrs[3]; - std::cout << "Writer " << i << " data at "; - for (uint64_t j = 0; j < FlushCount; j++) + minfo_pos = MetadataPos; // initialize minfo_pos properly + MetadataPosTotalSkip = MetadataPos; + } + + if (m_SelectedSteps.IsSelected(m_AbsStepsInFile)) + { + m_WriterMapIndex.push_back(m_LastMapStep); + + // pos in metadata in memory + ptrs.push_back(MetadataPos - MetadataPosTotalSkip); + ptrs.push_back(MetadataSize); + ptrs.push_back(FlushCount); + ptrs.push_back(position); + // absolute pos in file before read + ptrs.push_back(MetadataPos); + m_MetadataIndexTable[m_StepsCount] = ptrs; +#ifdef DUMPDATALOCINFO + for (uint64_t i = 0; i < m_WriterCount; i++) { + size_t DataPosPos = ptrs[3]; + std::cout << "Writer " << i << " data at "; + for (uint64_t j = 0; j < FlushCount; j++) + { + const uint64_t DataPos = helper::ReadValue( + buffer, DataPosPos, m_Minifooter.IsLittleEndian); + const uint64_t DataSize = helper::ReadValue( + buffer, DataPosPos, m_Minifooter.IsLittleEndian); + std::cout << "loc:" << DataPos << " siz:" << DataSize + << "; "; + } const uint64_t DataPos = helper::ReadValue( buffer, DataPosPos, m_Minifooter.IsLittleEndian); - const uint64_t DataSize = helper::ReadValue( - buffer, DataPosPos, m_Minifooter.IsLittleEndian); - std::cout << "loc:" << DataPos << " siz:" << DataSize - << "; "; + std::cout << "loc:" << DataPos << std::endl; } - const uint64_t DataPos = helper::ReadValue( - buffer, DataPosPos, m_Minifooter.IsLittleEndian); - std::cout << "loc:" << DataPos << std::endl; - } #endif - minfo_size += MetadataSize; - metadataSizeToRead += MetadataSize; - m_StepsCount++; - } - else - { - MetadataPosTotalSkip += MetadataSize; - if (minfo_size > 0) + minfo_size += MetadataSize; + metadataSizeToRead += MetadataSize; + m_StepsCount++; + } + else { - m_FilteredMetadataInfo.push_back( - std::make_pair(minfo_pos, minfo_size)); + MetadataPosTotalSkip += MetadataSize; + if (minfo_size > 0) + { + m_FilteredMetadataInfo.push_back( + std::make_pair(minfo_pos, minfo_size)); + } + minfo_pos = MetadataPos; + minfo_size = 0; } - minfo_pos = MetadataPos; - minfo_size = 0; + + // skip over the writer -> data file offset records + position += + sizeof(uint64_t) * m_LastWriterCount * ((2 * FlushCount) + 1); + ++m_AbsStepsInFile; + ++n; + break; + } } + // dbg + if ((position - dbgRecordStartPosition) != (size_t)recordLength) + { + helper::Throw( + "Engine", "BP5Reader", "ParseMetadataIndex", + "Record " + std::to_string(nrec) + " (id = " + + std::to_string(recordID) + ") has invalid length " + + std::to_string(recordLength) + ". We parsed " + + std::to_string(position - dbgRecordStartPosition) + + " bytes for this record" - // skip over the writer -> data file offset records - position += - sizeof(uint64_t) * m_LastWriterCount * ((2 * FlushCount) + 1); - ++m_AbsStepsInFile; - ++n; + ); + } + ++nrec; } if (minfo_size > 0) { diff --git a/source/adios2/engine/bp5/BP5Writer.cpp b/source/adios2/engine/bp5/BP5Writer.cpp index db975d267d..c9f6da8008 100644 --- a/source/adios2/engine/bp5/BP5Writer.cpp +++ b/source/adios2/engine/bp5/BP5Writer.cpp @@ -319,57 +319,74 @@ void BP5Writer::WriteMetadataFileIndex(uint64_t MetaDataPos, { m_FileMetadataManager.FlushFiles(); + // bufsize: Step record + size_t bufsize = + 1 + (4 + ((FlushPosSizeInfo.size() * 2) + 1) * m_Comm.Size()) * + sizeof(uint64_t); if (MetaDataPos == 0) { // First time, write the headers - format::BufferSTL bi; - MakeHeader(bi, "Index Table", true); - m_FileMetadataIndexManager.WriteFiles(bi.m_Buffer.data(), - bi.m_Position); + bufsize += m_IndexHeaderSize; + } + if (!m_WriterSubfileMap.empty()) + { + // WriterMap record + bufsize += 1 + (4 + m_Comm.Size()) * sizeof(uint64_t); } - std::vector buf; - buf.reserve(4 + ((FlushPosSizeInfo.size() * 2) + 1) * m_Comm.Size() + 3 + - m_Comm.Size()); - - buf.resize(buf.size() + 4 + - ((FlushPosSizeInfo.size() * 2) + 1) * m_Comm.Size()); - buf[0] = MetaDataPos; - buf[1] = MetaDataSize; - buf[2] = FlushPosSizeInfo.size(); - buf[3] = static_cast(m_WriterSubfileMap.size() > 0); + std::vector buf(bufsize); + size_t pos = 0; + uint64_t d; + unsigned char record; - uint64_t pos = 4; + if (MetaDataPos == 0) + { + // First time, write the headers + MakeHeader(buf, pos, "Index Table", true); + } + // WriterMap record if (!m_WriterSubfileMap.empty()) { - // Add Writer to Subfiles Map - buf.resize(buf.size() + 3 + m_Comm.Size()); - buf[4] = static_cast(m_Comm.Size()); - buf[5] = static_cast(m_Aggregator->m_NumAggregators); - buf[6] = static_cast(m_Aggregator->m_SubStreams); - pos += 3; - std::copy(m_WriterSubfileMap.begin(), m_WriterSubfileMap.end(), - buf.begin() + pos); + record = WriterMapRecord; + helper::CopyToBuffer(buf, pos, &record, 1); // record type + d = (3 + m_Comm.Size()) * sizeof(uint64_t); + helper::CopyToBuffer(buf, pos, &d, 1); // record length + d = static_cast(m_Comm.Size()); + helper::CopyToBuffer(buf, pos, &d, 1); + d = static_cast(m_Aggregator->m_NumAggregators); + helper::CopyToBuffer(buf, pos, &d, 1); + d = static_cast(m_Aggregator->m_SubStreams); + helper::CopyToBuffer(buf, pos, &d, 1); + helper::CopyToBuffer(buf, pos, m_WriterSubfileMap.data(), + m_Comm.Size()); m_WriterSubfileMap.clear(); - pos += m_Comm.Size(); } + // Step record + record = StepRecord; + helper::CopyToBuffer(buf, pos, &record, 1); // record type + d = (3 + ((FlushPosSizeInfo.size() * 2) + 1) * m_Comm.Size()) * + sizeof(uint64_t); + helper::CopyToBuffer(buf, pos, &d, 1); // record length + helper::CopyToBuffer(buf, pos, &MetaDataPos, 1); + helper::CopyToBuffer(buf, pos, &MetaDataSize, 1); + d = static_cast(FlushPosSizeInfo.size()); + helper::CopyToBuffer(buf, pos, &d, 1); + for (int writer = 0; writer < m_Comm.Size(); writer++) { for (size_t flushNum = 0; flushNum < FlushPosSizeInfo.size(); flushNum++) { - buf[pos + (flushNum * 2)] = FlushPosSizeInfo[flushNum][2 * writer]; - buf[pos + (flushNum * 2) + 1] = - FlushPosSizeInfo[flushNum][2 * writer + 1]; + // add two numbers here + helper::CopyToBuffer(buf, pos, + &FlushPosSizeInfo[flushNum][2 * writer], 2); } - buf[pos + FlushPosSizeInfo.size() * 2] = m_WriterDataPos[writer]; - pos += (FlushPosSizeInfo.size() * 2) + 1; + helper::CopyToBuffer(buf, pos, &m_WriterDataPos[writer], 1); } - m_FileMetadataIndexManager.WriteFiles((char *)buf.data(), - buf.size() * sizeof(uint64_t)); + m_FileMetadataIndexManager.WriteFiles((char *)buf.data(), buf.size()); #ifdef DUMPDATALOCINFO std::cout << "Flush count is :" << FlushPosSizeInfo.size() << std::endl; @@ -675,6 +692,19 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL) std::to_string(Version) + " version"); } + // BP minor version + position = m_BPMinorVersionPosition; + uint8_t minorVersion = + helper::ReadValue(buffer, position, IsLittleEndian); + if (minorVersion != m_BP5MinorVersion) + { + helper::Throw( + "Engine", "BP5Writer", "CountStepsInMetadataIndex", + "Current ADIOS2 BP5 Engine can only append to bp format 5." + + std::to_string(m_BP5MinorVersion) + " but this file is 5." + + std::to_string(minorVersion) + " version"); + } + position = m_ColumnMajorFlagPosition; const uint8_t columnMajor = helper::ReadValue(buffer, position, IsLittleEndian); @@ -697,12 +727,13 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL) uint64_t nDataFiles = 0; while (position < buffer.size()) { - position += 2 * sizeof(uint64_t); // MetadataPos, MetadataSize - const uint64_t FlushCount = - helper::ReadValue(buffer, position, IsLittleEndian); - const uint64_t hasWriterMap = - helper::ReadValue(buffer, position, IsLittleEndian); - if (hasWriterMap) + const unsigned char recordID = + helper::ReadValue(buffer, position, IsLittleEndian); + position += sizeof(uint64_t); // recordLength + + switch (recordID) + { + case IndexRecord::WriterMapRecord: { m_AppendWriterCount = helper::ReadValue(buffer, position, IsLittleEndian); @@ -716,11 +747,20 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL) } // jump over writermap position += m_AppendWriterCount * sizeof(uint64_t); + break; + } + case IndexRecord::StepRecord: + { + position += 2 * sizeof(uint64_t); // MetadataPos, MetadataSize + const uint64_t FlushCount = + helper::ReadValue(buffer, position, IsLittleEndian); + // jump over the metadata positions + position += + sizeof(uint64_t) * m_AppendWriterCount * ((2 * FlushCount) + 1); + availableSteps++; + break; + } } - - position += - sizeof(uint64_t) * m_AppendWriterCount * ((2 * FlushCount) + 1); - availableSteps++; } unsigned int targetStep = 0; @@ -774,15 +814,13 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL) // reading one step beyond target to get correct offsets while (currentStep <= targetStep && position < buffer.size()) { - m_AppendMetadataIndexPos = position; - const uint64_t MetadataPos = - helper::ReadValue(buffer, position, IsLittleEndian); - position += sizeof(uint64_t); // MetadataSize - const uint64_t FlushCount = - helper::ReadValue(buffer, position, IsLittleEndian); - const uint64_t hasWriterMap = - helper::ReadValue(buffer, position, IsLittleEndian); - if (hasWriterMap) + const unsigned char recordID = + helper::ReadValue(buffer, position, IsLittleEndian); + position += sizeof(uint64_t); // recordLength + + switch (recordID) + { + case IndexRecord::WriterMapRecord: { m_AppendWriterCount = helper::ReadValue(buffer, position, IsLittleEndian); @@ -799,38 +837,50 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL) buffer, position, IsLittleEndian); writerToFileMap.push_back(subfileIdx); } + break; } + case IndexRecord::StepRecord: + { + m_AppendMetadataIndexPos = position; + const uint64_t MetadataPos = + helper::ReadValue(buffer, position, IsLittleEndian); + position += sizeof(uint64_t); // MetadataSize + const uint64_t FlushCount = + helper::ReadValue(buffer, position, IsLittleEndian); - m_AppendMetadataPos = static_cast(MetadataPos); + m_AppendMetadataPos = static_cast(MetadataPos); - if (currentStep == targetStep) - { - // we need the very first (smallest) write position to each - // subfile Offsets and sizes, 2*FlushCount + 1 per writer - for (uint64_t i = 0; i < m_AppendWriterCount; i++) + if (currentStep == targetStep) { - // first flush/write position will do - const size_t FirstDataPos = - static_cast(helper::ReadValue( - buffer, position, IsLittleEndian)); - position += - sizeof(uint64_t) * 2 * FlushCount; // no need to read - /* std::cout << "Writer " << i << " subfile " << - writerToFileMap[i] << " first data loc:" << - FirstDataPos << std::endl; */ - if (FirstDataPos < m_AppendDataPos[writerToFileMap[i]]) + // we need the very first (smallest) write position to each + // subfile Offsets and sizes, 2*FlushCount + 1 per writer + for (uint64_t i = 0; i < m_AppendWriterCount; i++) { - m_AppendDataPos[writerToFileMap[i]] = FirstDataPos; + // first flush/write position will do + const size_t FirstDataPos = + static_cast(helper::ReadValue( + buffer, position, IsLittleEndian)); + position += + sizeof(uint64_t) * 2 * FlushCount; // no need to read + /* std::cout << "Writer " << i << " subfile " << + writerToFileMap[i] << " first data loc:" << + FirstDataPos << std::endl; */ + if (FirstDataPos < m_AppendDataPos[writerToFileMap[i]]) + { + m_AppendDataPos[writerToFileMap[i]] = FirstDataPos; + } } } + else + { + // jump over all data offsets in this step + position += sizeof(uint64_t) * m_AppendWriterCount * + (1 + 2 * FlushCount); + } + currentStep++; + break; } - else - { - // jump over all data offsets in this step - position += - sizeof(uint64_t) * m_AppendWriterCount * (1 + 2 * FlushCount); } - currentStep++; } return targetStep; } @@ -1029,17 +1079,17 @@ void BP5Writer::InitTransports() } /*generate the header for the metadata index file*/ -void BP5Writer::MakeHeader(format::BufferSTL &b, const std::string fileType, - const bool isActive) +void BP5Writer::MakeHeader(std::vector &buffer, size_t &position, + const std::string fileType, const bool isActive) { auto lf_CopyVersionChar = [](const std::string version, std::vector &buffer, size_t &position) { helper::CopyToBuffer(buffer, position, version.c_str()); }; - auto &buffer = b.m_Buffer; - auto &position = b.m_Position; - auto &absolutePosition = b.m_AbsolutePosition; + // auto &buffer = b.m_Buffer; + // auto &position = b.m_Position; + // auto &absolutePosition = b.m_AbsolutePosition; if (position > 0) { helper::Throw( @@ -1050,9 +1100,9 @@ void BP5Writer::MakeHeader(format::BufferSTL &b, const std::string fileType, std::to_string(position) + " bytes."); } - if (b.GetAvailableSize() < m_IndexHeaderSize) + if (buffer.size() < m_IndexHeaderSize) { - b.Resize(m_IndexHeaderSize, "BP4Serializer::MakeHeader " + fileType); + buffer.resize(m_IndexHeaderSize); } const std::string majorVersion(std::to_string(ADIOS2_VERSION_MAJOR)); @@ -1131,7 +1181,7 @@ void BP5Writer::MakeHeader(format::BufferSTL &b, const std::string fileType, "ADIOS Coding ERROR in BP5Writer::MakeHeader. BP Minor version " "position mismatch"); } - const uint8_t minorversion = 1; + const uint8_t minorversion = m_BP5MinorVersion; helper::CopyToBuffer(buffer, position, &minorversion); // byte 39: Active flag (used in Index Table only) @@ -1153,7 +1203,7 @@ void BP5Writer::MakeHeader(format::BufferSTL &b, const std::string fileType, // byte 41-63: unused position += 23; - absolutePosition = position; + // absolutePosition = position; } void BP5Writer::UpdateActiveFlag(const bool active) diff --git a/source/adios2/engine/bp5/BP5Writer.h b/source/adios2/engine/bp5/BP5Writer.h index e2db854d7b..a0b1b22361 100644 --- a/source/adios2/engine/bp5/BP5Writer.h +++ b/source/adios2/engine/bp5/BP5Writer.h @@ -227,8 +227,8 @@ class BP5Writer : public BP5Engine, public core::Engine std::vector> FlushPosSizeInfo; - void MakeHeader(format::BufferSTL &b, const std::string fileType, - const bool isActive); + void MakeHeader(std::vector &buffer, size_t &position, + const std::string fileType, const bool isActive); std::vector m_WriterSubfileMap; // rank => subfile index diff --git a/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py b/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py index cfae81fa4c..7d2854e7e8 100644 --- a/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py +++ b/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py @@ -3,7 +3,6 @@ from os import fstat from .utils import * -WriterCount = -1 def ReadWriterMap(bytearray, pos): data = np.frombuffer(bytearray, dtype=np.uint64, count=3, @@ -29,50 +28,67 @@ def ReadWriterMap(bytearray, pos): pos = pos + WriterCount * 8 return pos, WriterCount, AggregatorCount, SubfileCount + def ReadIndex(f, fileSize): nBytes = fileSize - f.tell() if nBytes <= 0: return True table = f.read(nBytes) + WriterCount = 0 pos = 0 step = 0 while pos < nBytes: print("-----------------------------------------------" + "---------------------------------------------------") - data = np.frombuffer(table, dtype=np.uint64, count=4, - offset=pos) - stepstr = str(step).ljust(6) - mdatapos = str(data[0]).ljust(10) - mdatasize = str(data[1]).ljust(10) - flushcount = str(data[2]).ljust(3) - FlushCount = data[2] - haswritermap = data[3] - print("| Step = " + stepstr + "| MetadataPos = " + mdatapos + - " | MetadataSize = " + mdatasize + " | FlushCount = " + - flushcount + "| hasWriterMap = " + - str(haswritermap).ljust(3) + "|") - - pos = pos + 4 * 8 - - if (haswritermap > 0): + record = chr(table[pos]) + pos = pos + 1 + reclen = np.frombuffer(table, dtype=np.uint64, count=1, + offset=pos) + pos = pos + 8 + print("Record '{0}', length = {1}".format(record, reclen)) + if record == 's': + # print("Step record, length = {0}".format(reclen)) + data = np.frombuffer(table, dtype=np.uint64, count=3, + offset=pos) + stepstr = str(step).ljust(6) + mdatapos = str(data[0]).ljust(10) + mdatasize = str(data[1]).ljust(10) + flushcount = str(data[2]).ljust(3) + FlushCount = data[2] + + print("| Step = " + stepstr + "| MetadataPos = " + mdatapos + + " | MetadataSize = " + mdatasize + " | FlushCount = " + + flushcount + "|") + + pos = pos + 3 * 8 + + for Writer in range(0, WriterCount): + start = " Writer " + str(Writer) + " data " + thiswriter = np.frombuffer(table, dtype=np.uint64, + count=int(FlushCount * 2 + 1), + offset=pos) + + for i in range(0, FlushCount): # for flushes + start += ("loc:" + str(thiswriter[int(i * 2)]) + " siz:" + + str(thiswriter[i * 2 + 1]) + "; ") + start += "loc:" + str(thiswriter[int(FlushCount * 2)]) + pos = int(pos + (FlushCount * 2 + 1) * 8) + print(start) + step = step + 1 + + elif record == 'w': + # print("WriterMap record") pos, WriterCount, AggregatorCount, SubfileCount = ReadWriterMap( table, pos) - for Writer in range(0, WriterCount): - start = " Writer " + str(Writer) + " data " - thiswriter = np.frombuffer(table, dtype=np.uint64, - count=int(FlushCount * 2 + 1), - offset=pos) - - for i in range(0, FlushCount): # for flushes - start += ("loc:" + str(thiswriter[int(i * 2)]) + " siz:" + - str(thiswriter[i * 2 + 1]) + "; ") - start += "loc:" + str(thiswriter[int(FlushCount * 2)]) - pos = int(pos + (FlushCount * 2 + 1) * 8) - print(start) + elif record == 'm': + print("MetaMeta record") + + else: + print("Unknown record {0}, lenght = {1}".format(record, reclen)) + print("---------------------------------------------------" + "-----------------------------------------------") - step = step + 1 if fileSize - f.tell() > 1: print("ERROR: There are {0} bytes at the end of file" diff --git a/source/utils/bpls/bpls.cpp b/source/utils/bpls/bpls.cpp index d74c90f46a..00f2788b9b 100644 --- a/source/utils/bpls/bpls.cpp +++ b/source/utils/bpls/bpls.cpp @@ -1549,9 +1549,8 @@ int doList(const char *path) } catch (std::exception &e) { - if (verbose > 2) - printf("Failed to open with %s engine: %s\n", - engineName.c_str(), e.what()); + printf("Failed to open with %s engine: %s\n", + engineName.c_str(), e.what()); } if (fp != nullptr) break; From fe7d128c58a0811ec67d95c6e263fa9e9819ee12 Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Thu, 10 Mar 2022 14:52:17 -0500 Subject: [PATCH 2/3] format --- source/utils/bpls/bpls.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/utils/bpls/bpls.cpp b/source/utils/bpls/bpls.cpp index 00f2788b9b..2d5fcca6ee 100644 --- a/source/utils/bpls/bpls.cpp +++ b/source/utils/bpls/bpls.cpp @@ -1549,8 +1549,8 @@ int doList(const char *path) } catch (std::exception &e) { - printf("Failed to open with %s engine: %s\n", - engineName.c_str(), e.what()); + printf("Failed to open with %s engine: %s\n", engineName.c_str(), + e.what()); } if (fp != nullptr) break; From 5bc3374984b5621a55e3a06a11fcd47388838b9d Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Fri, 11 Mar 2022 11:22:45 -0500 Subject: [PATCH 3/3] Find and accept parameters CASE INSENSITIVE (e.g. nUmaggReGatoRs is NumAggregators). Set NumSubFiles default to 0 as other default sets NumAggregators in BP5Writer::InitParameters() --- source/adios2/engine/bp5/BP5Engine.cpp | 73 +++++++++++++++++++------- source/adios2/engine/bp5/BP5Engine.h | 6 +-- 2 files changed, 58 insertions(+), 21 deletions(-) diff --git a/source/adios2/engine/bp5/BP5Engine.cpp b/source/adios2/engine/bp5/BP5Engine.cpp index 9ffcb6b13f..0ef8619142 100644 --- a/source/adios2/engine/bp5/BP5Engine.cpp +++ b/source/adios2/engine/bp5/BP5Engine.cpp @@ -146,11 +146,20 @@ BP5Engine::GetBPSubStreamNames(const std::vector &names, void BP5Engine::ParseParams(IO &io, struct BP5Params &Params) { + adios2::Params params_lowercase; + for (auto &p : io.m_Parameters) + { + const std::string key = helper::LowerCase(p.first); + const std::string value = helper::LowerCase(p.second); + params_lowercase[key] = value; + } + auto lf_SetBoolParameter = [&](const std::string key, bool ¶meter, bool def) { - auto itKey = io.m_Parameters.find(key); + const std::string lkey = helper::LowerCase(std::string(key)); + auto itKey = params_lowercase.find(lkey); parameter = def; - if (itKey != io.m_Parameters.end()) + if (itKey != params_lowercase.end()) { std::string value = itKey->second; std::transform(value.begin(), value.end(), value.begin(), @@ -174,9 +183,10 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params) auto lf_SetFloatParameter = [&](const std::string key, float ¶meter, float def) { - auto itKey = io.m_Parameters.find(key); + const std::string lkey = helper::LowerCase(std::string(key)); + auto itKey = params_lowercase.find(lkey); parameter = def; - if (itKey != io.m_Parameters.end()) + if (itKey != params_lowercase.end()) { std::string value = itKey->second; parameter = @@ -186,9 +196,10 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params) auto lf_SetSizeBytesParameter = [&](const std::string key, size_t ¶meter, size_t def) { - auto itKey = io.m_Parameters.find(key); + const std::string lkey = helper::LowerCase(std::string(key)); + auto itKey = params_lowercase.find(lkey); parameter = def; - if (itKey != io.m_Parameters.end()) + if (itKey != params_lowercase.end()) { std::string value = itKey->second; parameter = helper::StringToByteUnits( @@ -198,9 +209,10 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params) auto lf_SetIntParameter = [&](const std::string key, int ¶meter, int def) { - auto itKey = io.m_Parameters.find(key); + const std::string lkey = helper::LowerCase(std::string(key)); + auto itKey = params_lowercase.find(lkey); parameter = def; - if (itKey != io.m_Parameters.end()) + if (itKey != params_lowercase.end()) { parameter = std::stoi(itKey->second); return true; @@ -210,9 +222,10 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params) auto lf_SetUIntParameter = [&](const std::string key, unsigned int ¶meter, unsigned int def) { - auto itKey = io.m_Parameters.find(key); + const std::string lkey = helper::LowerCase(std::string(key)); + auto itKey = params_lowercase.find(lkey); parameter = def; - if (itKey != io.m_Parameters.end()) + if (itKey != params_lowercase.end()) { unsigned long result = std::stoul(itKey->second); if (result > std::numeric_limits::max()) @@ -227,8 +240,10 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params) auto lf_SetStringParameter = [&](const std::string key, std::string ¶meter, const char *def) { - auto itKey = io.m_Parameters.find(key); - if (itKey != io.m_Parameters.end()) + const std::string lkey = helper::LowerCase(std::string(key)); + auto itKey = params_lowercase.find(lkey); + parameter = def; + if (itKey != params_lowercase.end()) { parameter = itKey->second; return true; @@ -238,9 +253,10 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params) auto lf_SetBufferVTypeParameter = [&](const std::string key, int ¶meter, int def) { - auto itKey = io.m_Parameters.find(key); + const std::string lkey = helper::LowerCase(std::string(key)); + auto itKey = params_lowercase.find(lkey); parameter = def; - if (itKey != io.m_Parameters.end()) + if (itKey != params_lowercase.end()) { std::string value = itKey->second; std::transform(value.begin(), value.end(), value.begin(), @@ -265,9 +281,10 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params) auto lf_SetAggregationTypeParameter = [&](const std::string key, int ¶meter, int def) { - auto itKey = io.m_Parameters.find(key); + const std::string lkey = helper::LowerCase(std::string(key)); + auto itKey = params_lowercase.find(lkey); parameter = def; - if (itKey != io.m_Parameters.end()) + if (itKey != params_lowercase.end()) { std::string value = itKey->second; std::transform(value.begin(), value.end(), value.begin(), @@ -297,9 +314,10 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params) auto lf_SetAsyncWriteParameter = [&](const std::string key, int ¶meter, int def) { - auto itKey = io.m_Parameters.find(key); + const std::string lkey = helper::LowerCase(std::string(key)); + auto itKey = params_lowercase.find(lkey); parameter = def; - if (itKey != io.m_Parameters.end()) + if (itKey != params_lowercase.end()) { std::string value = itKey->second; std::transform(value.begin(), value.end(), value.begin(), @@ -331,8 +349,27 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params) #define get_params(Param, Type, Typedecl, Default) \ lf_Set##Type##Parameter(#Param, Params.Param, Default); + BP5_FOREACH_PARAMETER_TYPE_4ARGS(get_params); #undef get_params + + if (Params.verbose > 0 && !m_RankMPI) + { + std::cout << "---------------- " << io.m_EngineType + << " engine parameters --------------\n"; +#define print_params(Param, Type, Typedecl, Default) \ + lf_Set##Type##Parameter(#Param, Params.Param, Default); \ + if (!m_RankMPI) \ + { \ + std::cout << " " << std::string(#Param) << " = " << Params.Param \ + << " default = " << Default << std::endl; \ + } + + BP5_FOREACH_PARAMETER_TYPE_4ARGS(print_params); +#undef print_params + std::cout << "-----------------------------------------------------" + << std::endl; + } }; } // namespace engine diff --git a/source/adios2/engine/bp5/BP5Engine.h b/source/adios2/engine/bp5/BP5Engine.h index 1aa2df4c18..b07c515026 100644 --- a/source/adios2/engine/bp5/BP5Engine.h +++ b/source/adios2/engine/bp5/BP5Engine.h @@ -126,13 +126,13 @@ class BP5Engine MACRO(BeginStepPollingFrequencySecs, Float, float, 1.0f) \ MACRO(StreamReader, Bool, bool, false) \ MACRO(BurstBufferDrain, Bool, bool, true) \ - MACRO(BurstBufferPath, String, std::string, (char *)(intptr_t)0) \ + MACRO(BurstBufferPath, String, std::string, "") \ MACRO(NodeLocal, Bool, bool, false) \ MACRO(verbose, Int, int, 0) \ MACRO(CollectiveMetadata, Bool, bool, true) \ MACRO(NumAggregators, UInt, unsigned int, 0) \ MACRO(AggregatorRatio, UInt, unsigned int, 0) \ - MACRO(NumSubFiles, UInt, unsigned int, 999999) \ + MACRO(NumSubFiles, UInt, unsigned int, 0) \ MACRO(StripeSize, UInt, unsigned int, 4096) \ MACRO(DirectIO, Bool, bool, false) \ MACRO(DirectIOAlignOffset, UInt, unsigned int, 512) \ @@ -148,7 +148,7 @@ class BP5Engine MACRO(MaxShmSize, SizeBytes, size_t, DefaultMaxShmSize) \ MACRO(BufferVType, BufferVType, int, (int)BufferVType::ChunkVType) \ MACRO(AppendAfterSteps, Int, int, INT_MAX) \ - MACRO(SelectSteps, String, std::string, (char *)(intptr_t)0) \ + MACRO(SelectSteps, String, std::string, "") \ MACRO(ReaderShortCircuitReads, Bool, bool, false) struct BP5Params