From 376d89accfcc11f394f4da39bc69626ac9e86824 Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Mon, 24 Jan 2022 15:31:46 -0500 Subject: [PATCH 1/8] BP5 append after specified step. Supports different number of writers/aggregators/subfiles. It changes the BP5 index format with writing the writers => subfiles map every time it changes. --- source/adios2/engine/bp5/BP5Engine.h | 1 + source/adios2/engine/bp5/BP5Writer.cpp | 447 ++++++++++++------ source/adios2/engine/bp5/BP5Writer.h | 16 +- source/utils/bp5dbg/adios2/bp5dbg/idxtable.py | 54 ++- source/utils/bp5dbg/adios2/bp5dbg/utils.py | 33 +- 5 files changed, 361 insertions(+), 190 deletions(-) diff --git a/source/adios2/engine/bp5/BP5Engine.h b/source/adios2/engine/bp5/BP5Engine.h index 7ed67ada31..78b5b1f2a6 100644 --- a/source/adios2/engine/bp5/BP5Engine.h +++ b/source/adios2/engine/bp5/BP5Engine.h @@ -130,6 +130,7 @@ class BP5Engine MACRO(BufferChunkSize, SizeBytes, size_t, DefaultBufferChunkSize) \ MACRO(MaxShmSize, SizeBytes, size_t, DefaultMaxShmSize) \ MACRO(BufferVType, BufferVType, int, (int)BufferVType::ChunkVType) \ + MACRO(AppendAfterStep, UInt, unsigned int, INT_MAX) \ MACRO(ReaderShortCircuitReads, Bool, bool, false) struct BP5Params diff --git a/source/adios2/engine/bp5/BP5Writer.cpp b/source/adios2/engine/bp5/BP5Writer.cpp index 3fdb5fddbc..42efd00504 100644 --- a/source/adios2/engine/bp5/BP5Writer.cpp +++ b/source/adios2/engine/bp5/BP5Writer.cpp @@ -311,16 +311,32 @@ void BP5Writer::WriteData_EveryoneWrites(format::BufferV *Data, void BP5Writer::WriteMetadataFileIndex(uint64_t MetaDataPos, uint64_t MetaDataSize) { - m_FileMetadataManager.FlushFiles(); - std::vector buf; - buf.resize(3 + ((FlushPosSizeInfo.size() * 2) + 1) * m_Comm.Size()); + std::vector buf( + 4 + ((FlushPosSizeInfo.size() * 2) + 1) * m_Comm.Size() + 3 + + m_Comm.Size()); + buf.resize(4 + ((FlushPosSizeInfo.size() * 2) + 1) * m_Comm.Size()); buf[0] = MetaDataPos; buf[1] = MetaDataSize; buf[2] = FlushPosSizeInfo.size(); + buf[3] = static_cast(m_WriterSubfileMap.size() > 0); - uint64_t pos = 3; + uint64_t pos = 4; + + if (!m_WriterSubfileMap.empty()) + { + // Add Writer to Subfiles Map + buf.resize(buf.size() + 3 + m_Comm.Size()); + buf[4] = static_cast(m_Comm.Size()); + buf[5] = static_cast(m_Aggregator->m_NumAggregators); + buf[6] = static_cast(m_Aggregator->m_SubStreams); + pos += 3; + std::copy(m_WriterSubfileMap.begin(), m_WriterSubfileMap.end(), + buf.begin() + pos); + m_WriterSubfileMap.clear(); + pos += m_Comm.Size(); + } for (int writer = 0; writer < m_Comm.Size(); writer++) { @@ -551,43 +567,6 @@ void BP5Writer::Init() InitBPBuffer(); } -#define declare_type(T) \ - void BP5Writer::DoPut(Variable &variable, \ - typename Variable::Span &span, \ - const bool initialize, const T &value) \ - { \ - PERFSTUBS_SCOPED_TIMER("BP5Writer::Put"); \ - PutCommonSpan(variable, span, initialize, value); \ - } - -ADIOS2_FOREACH_PRIMITIVE_STDTYPE_1ARG(declare_type) -#undef declare_type - -#define declare_type(T) \ - void BP5Writer::DoPutSync(Variable &variable, const T *data) \ - { \ - PutCommon(variable, data, true); \ - } \ - void BP5Writer::DoPutDeferred(Variable &variable, const T *data) \ - { \ - PutCommon(variable, data, false); \ - } - -ADIOS2_FOREACH_STDTYPE_1ARG(declare_type) -#undef declare_type - -#define declare_type(T, L) \ - T *BP5Writer::DoBufferData_##L(const int bufferIdx, \ - const size_t payloadPosition, \ - const size_t bufferID) noexcept \ - { \ - return reinterpret_cast( \ - m_BP5Serializer.GetPtr(bufferIdx, payloadPosition)); \ - } - 
-ADIOS2_FOREACH_PRIMITVE_STDTYPE_2ARGS(declare_type) -#undef declare_type - void BP5Writer::InitParameters() { ParseParams(m_IO, m_Parameters); @@ -615,6 +594,188 @@ void BP5Writer::InitParameters() } } +uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL) +{ + const auto &buffer = bufferSTL.m_Buffer; + size_t &position = bufferSTL.m_Position; + + if (buffer.size() < 64) + { + return 0; + } + + // Check endinanness + position = m_EndianFlagPosition; + const uint8_t endianness = helper::ReadValue(buffer, position); + bool IsLittleEndian = (endianness == 0) ? true : false; + if (helper::IsLittleEndian() != IsLittleEndian) + { + std::string m = (IsLittleEndian ? "Little" : "Big"); + throw std::runtime_error( + "ERROR: ADIOS2 BP5 Engine only supports appending with the same " + "endianness. The existing file is " + + m + "Endian\n"); + } + + // BP version + position = m_BPVersionPosition; + uint8_t Version = + helper::ReadValue(buffer, position, IsLittleEndian); + if (Version != 5) + { + throw std::runtime_error( + "ERROR: ADIOS2 BP5 Engine only supports bp format " + "version 5, found " + + std::to_string(Version) + " version \n"); + } + + position = m_ColumnMajorFlagPosition; + const uint8_t columnMajor = + helper::ReadValue(buffer, position, IsLittleEndian); + const uint8_t NowColumnMajor = + (m_IO.m_ArrayOrder == ArrayOrdering::ColumnMajor) ? 'y' : 'n'; + if (columnMajor != NowColumnMajor) + { + std::string m = (columnMajor == 'y' ? "column" : "row"); + throw std::runtime_error( + "ERROR: ADIOS2 BP5 Engine only supports appending with the same " + "column/row major settings as it was written." + " Existing file is " + + m + " major\n"); + } + + position = 64; // after the header + // Just count the steps first + unsigned int availableSteps = 0; + uint64_t nDataFiles = 0; + while (position < buffer.size()) + { + position += 2 * sizeof(uint64_t); // MetadataPos, MetadataSize + const uint64_t FlushCount = + helper::ReadValue(buffer, position, IsLittleEndian); + const uint64_t hasWriterMap = + helper::ReadValue(buffer, position, IsLittleEndian); + if (hasWriterMap) + { + m_AppendWriterCount = + helper::ReadValue(buffer, position, IsLittleEndian); + m_AppendAggregatorCount = + helper::ReadValue(buffer, position, IsLittleEndian); + m_AppendSubfileCount = + helper::ReadValue(buffer, position, IsLittleEndian); + if (m_AppendSubfileCount > nDataFiles) + { + nDataFiles = m_AppendSubfileCount; + } + // jump over writermap + position += m_AppendWriterCount * sizeof(uint64_t); + } + + position += + sizeof(uint64_t) * m_AppendWriterCount * ((2 * FlushCount) + 1); + availableSteps++; + } + + unsigned int targetStep = m_Parameters.AppendAfterStep; + if (targetStep < 0) + { + // -1 means last step + targetStep = availableSteps + m_Parameters.AppendAfterStep + 1; + if (targetStep < 0) + { + targetStep = 0; + } + } + if (targetStep > availableSteps) + { + targetStep = availableSteps; + } + + if (!targetStep) + { + // append at 0 is like writing new file + return 0; + } + + m_AppendDataPos.resize(nDataFiles); + std::fill(m_AppendDataPos.begin(), m_AppendDataPos.end(), MaxSizeT); + m_AppendMetadataPos = MaxSizeT; // size of header + m_AppendMetadataIndexPos = MaxSizeT; + + if (targetStep == availableSteps) + { + // append after existing steps + return targetStep; + } + + // append but not at 0 and not after existing steps + // Read each record now completely to get offsets at step+1 + position = 64; + unsigned int currentStep = 0; + std::vector writerToFileMap; + // reading one step 
beyond target to get correct offsets + while (currentStep <= targetStep && position < buffer.size()) + { + m_AppendMetadataIndexPos = position; + const uint64_t MetadataPos = + helper::ReadValue(buffer, position, IsLittleEndian); + position += sizeof(uint64_t); // MetadataSize + const uint64_t FlushCount = + helper::ReadValue(buffer, position, IsLittleEndian); + const uint64_t hasWriterMap = + helper::ReadValue(buffer, position, IsLittleEndian); + if (hasWriterMap) + { + m_AppendWriterCount = + helper::ReadValue(buffer, position, IsLittleEndian); + m_AppendAggregatorCount = + helper::ReadValue(buffer, position, IsLittleEndian); + m_AppendSubfileCount = + helper::ReadValue(buffer, position, IsLittleEndian); + + // Get the process -> subfile map + writerToFileMap.clear(); + for (uint64_t i = 0; i < m_AppendWriterCount; i++) + { + const uint64_t subfileIdx = helper::ReadValue( + buffer, position, IsLittleEndian); + writerToFileMap.push_back(subfileIdx); + } + } + + m_AppendMetadataPos = static_cast(MetadataPos); + + if (currentStep == targetStep) + { + // we need the very first (smallest) write position to each subfile + // Offsets and sizes, 2*FlushCount + 1 per writer + for (uint64_t i = 0; i < m_AppendWriterCount; i++) + { + // first flush/write position will do + const size_t FirstDataPos = + static_cast(helper::ReadValue( + buffer, position, IsLittleEndian)); + position += + sizeof(uint64_t) * 2 * FlushCount; // no need to read + std::cout << "Writer " << i << " subfile " << writerToFileMap[i] + << " first data loc:" << FirstDataPos << std::endl; + if (FirstDataPos < m_AppendDataPos[writerToFileMap[i]]) + { + m_AppendDataPos[writerToFileMap[i]] = FirstDataPos; + } + } + } + else + { + // jump over all data offsets in this step + position += + sizeof(uint64_t) * m_AppendWriterCount * (1 + 2 * FlushCount); + } + currentStep++; + } + return targetStep; +} + void BP5Writer::InitAggregator() { // in BP5, aggregation is "always on", but processes may be alone, so @@ -724,6 +885,7 @@ void BP5Writer::InitTransports() m_FileMetadataManager.MkDirsBarrier(m_MetadataFileNames, m_IO.m_TransportsParameters, m_Parameters.NodeLocal || m_WriteToBB); + /* Create the directories on burst buffer if used */ if (m_DrainBB) { /* Create the directories on target anyway by main thread */ @@ -940,95 +1102,6 @@ void BP5Writer::UpdateActiveFlag(const bool active) } } -uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL) -{ - const auto &buffer = bufferSTL.m_Buffer; - size_t &position = bufferSTL.m_Position; - - // Check endinanness - position = m_EndianFlagPosition; - const uint8_t endianness = helper::ReadValue(buffer, position); - bool IsLittleEndian = (endianness == 0) ? 
true : false; -#ifndef ADIOS2_HAVE_ENDIAN_REVERSE - if (helper::IsLittleEndian() != IsLittleEndian) - { - throw std::runtime_error( - "ERROR: reader found BigEndian bp file, " - "this version of ADIOS2 wasn't compiled " - "with the cmake flag -DADIOS2_USE_Endian_Reverse=ON " - "explicitly, in call to Open\n"); - } -#endif - - // BP version - position = m_BPVersionPosition; - uint8_t Version = - helper::ReadValue(buffer, position, IsLittleEndian); - if (Version != 5) - { - throw std::runtime_error( - "ERROR: ADIOS2 BP5 Engine only supports bp format " - "version 5, found " + - std::to_string(Version) + " version \n"); - } - - position = m_WriterCountPosition; - uint32_t WriterCount = - helper::ReadValue(buffer, position, IsLittleEndian); - if ((int)WriterCount != m_Comm.Size()) - { - throw std::runtime_error( - "ERROR: ADIOS2 BP5 Engine only supports appending with the same " - "number of processes as it was written." - " Number of writers in the existing file = " + - std::to_string(WriterCount) + "\n"); - } - - position = m_AggregatorCountPosition; - uint32_t AggregatorCount = - helper::ReadValue(buffer, position, IsLittleEndian); - if ((size_t)AggregatorCount != m_Aggregator->m_NumAggregators) - { - throw std::runtime_error( - "ERROR: ADIOS2 BP5 Engine only supports appending with the same " - "number of aggregators as it was written." - " Number of aggregators in the existing file = " + - std::to_string(AggregatorCount) + - " current number of aggregators = " + - std::to_string(m_Aggregator->m_NumAggregators) + "\n"); - } - - position = m_ColumnMajorFlagPosition; - const uint8_t columnMajor = - helper::ReadValue(buffer, position, IsLittleEndian); - const uint8_t NowColumnMajor = - (m_IO.m_ArrayOrder == ArrayOrdering::ColumnMajor) ? 'y' : 'n'; - if (columnMajor != NowColumnMajor) - { - std::string m = (columnMajor == 'y' ? "column" : "row"); - throw std::runtime_error( - "ERROR: ADIOS2 BP5 Engine only supports appending with the same " - "column/row major settings as it was written." - " Existing file is " + - m + " major\n"); - } - - // move position to first row - position = 64 + WriterCount * sizeof(uint64_t); - - // Read each record now - uint64_t currentStep = 0; - while (position < buffer.size()) - { - position += 2 * sizeof(uint64_t); // MetadataPos, MetadataSize - const uint64_t FlushCount = - helper::ReadValue(buffer, position, IsLittleEndian); - position += sizeof(uint64_t) * WriterCount * ((2 * FlushCount) + 1); - currentStep++; - } - return currentStep; -} - void BP5Writer::InitBPBuffer() { if (m_OpenMode == Mode::Append) @@ -1049,56 +1122,72 @@ void BP5Writer::InitBPBuffer() } m_Comm.BroadcastVector(preMetadataIndex.m_Buffer); m_WriterStep = CountStepsInMetadataIndex(preMetadataIndex); - if (m_WriterStep > 0) - { - if (m_Aggregator->m_IsAggregator) - { - m_DataPos = m_FileDataManager.GetFileSize(0); - } - - if (m_Comm.Rank() == 0) - { - // Get the size of existing metametadata file - m_BP5Serializer.m_PreMetaMetadataFileLength = - m_FileMetaMetadataManager.GetFileSize(0); - // Get the size of existing metadata file - m_MetaDataPos = m_FileMetadataManager.GetFileSize(0); - } - } } if (!m_WriterStep) { - /* This is a new file. 
+ /* This is a new file or append at 0 * Make headers in data buffer and metadata buffer (but do not write * them yet so that Open() can stay free of writing to disk) */ - const uint64_t a = - static_cast(m_Aggregator->m_SubStreamIndex); - std::vector Assignment = m_Comm.GatherValues(a, 0); if (m_Comm.Rank() == 0) { format::BufferSTL b; MakeHeader(b, "Metadata", false); + m_FileMetadataManager.SeekToFileBegin(); m_FileMetadataManager.WriteFiles(b.m_Buffer.data(), b.m_Position); m_MetaDataPos = b.m_Position; format::BufferSTL bi; MakeHeader(bi, "Index Table", true); + m_FileMetadataIndexManager.SeekToFileBegin(); m_FileMetadataIndexManager.WriteFiles(bi.m_Buffer.data(), bi.m_Position); - // where each rank's data will end up - m_FileMetadataIndexManager.WriteFiles((char *)Assignment.data(), - sizeof(Assignment[0]) * - Assignment.size()); + } + if (m_Aggregator->m_IsAggregator) + { + m_FileDataManager.SeekTo(0); } } else { + if (m_Aggregator->m_IsAggregator) + { + const size_t off = m_AppendDataPos[m_Aggregator->m_SubStreamIndex]; + if (off < MaxSizeT) + { + m_FileDataManager.SeekTo(off); + m_DataPos = off; + } + else + { + m_DataPos = m_FileDataManager.GetFileSize(0); + } + } + if (m_Comm.Rank() == 0) { + // Get the size of existing metadata file + if (m_AppendMetadataPos < MaxSizeT) + { + m_MetaDataPos = m_AppendMetadataPos; + m_FileMetadataManager.SeekTo(m_MetaDataPos); + } + else + { + m_MetaDataPos = m_FileMetadataManager.GetFileSize(0); + } + // Set the flag in the header of metadata index table to 1 again // to indicate a new run begins UpdateActiveFlag(true); + if (m_AppendMetadataIndexPos < MaxSizeT) + { + m_FileMetadataIndexManager.SeekTo(m_AppendMetadataIndexPos); + } + else + { + m_FileMetadataIndexManager.SeekToFileEnd(); + } } } @@ -1106,6 +1195,19 @@ void BP5Writer::InitBPBuffer() { m_WriterDataPos.resize(m_Comm.Size()); } + + if (!m_WriterStep || + m_AppendWriterCount != static_cast(m_Comm.Size()) || + m_AppendAggregatorCount != + static_cast(m_Aggregator->m_NumAggregators) || + m_AppendSubfileCount != + static_cast(m_Aggregator->m_SubStreams)) + { + // new Writer Map is needed, generate now, write later + const uint64_t a = + static_cast(m_Aggregator->m_SubStreamIndex); + m_WriterSubfileMap = m_Comm.GatherValues(a, 0); + } } void BP5Writer::NotifyEngineAttribute(std::string name, DataType type) noexcept @@ -1357,6 +1459,43 @@ size_t BP5Writer::DebugGetDataBufferSize() const return m_BP5Serializer.DebugGetDataBufferSize(); } +#define declare_type(T) \ + void BP5Writer::DoPut(Variable &variable, \ + typename Variable::Span &span, \ + const bool initialize, const T &value) \ + { \ + PERFSTUBS_SCOPED_TIMER("BP5Writer::Put"); \ + PutCommonSpan(variable, span, initialize, value); \ + } + +ADIOS2_FOREACH_PRIMITIVE_STDTYPE_1ARG(declare_type) +#undef declare_type + +#define declare_type(T) \ + void BP5Writer::DoPutSync(Variable &variable, const T *data) \ + { \ + PutCommon(variable, data, true); \ + } \ + void BP5Writer::DoPutDeferred(Variable &variable, const T *data) \ + { \ + PutCommon(variable, data, false); \ + } + +ADIOS2_FOREACH_STDTYPE_1ARG(declare_type) +#undef declare_type + +#define declare_type(T, L) \ + T *BP5Writer::DoBufferData_##L(const int bufferIdx, \ + const size_t payloadPosition, \ + const size_t bufferID) noexcept \ + { \ + return reinterpret_cast( \ + m_BP5Serializer.GetPtr(bufferIdx, payloadPosition)); \ + } + +ADIOS2_FOREACH_PRIMITVE_STDTYPE_2ARGS(declare_type) +#undef declare_type + } // end namespace engine } // end namespace core } // end namespace adios2 
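
[Editorial illustration, not part of the patch] The WriteMetadataFileIndex and CountStepsInMetadataIndex hunks above define the revised per-step index record: MetadataPos, MetadataSize, FlushCount and a hasWriterMap flag (uint64 each), then an optional writer map (WriterCount, AggregatorCount, SubfileCount plus a rank-to-subfile array), followed by (2*FlushCount + 1) uint64 values per writer. The sketch below decodes one such record the way bp5dbg's idxtable.py does. The function name parse_index_record is hypothetical, and reading the per-writer tail as FlushCount (offset, size) pairs plus one final data offset is inferred from the (2*FlushCount + 1) stride used in the C++ code; treat it as an assumption, not the authoritative layout.

    import numpy as np

    def parse_index_record(table, pos, writer_count, rank_to_subfile=None):
        """Decode one per-step record of the BP5 metadata index (all uint64)."""
        hdr = np.frombuffer(table, dtype=np.uint64, count=4, offset=pos)
        record = {"MetadataPos": int(hdr[0]), "MetadataSize": int(hdr[1]),
                  "FlushCount": int(hdr[2]), "hasWriterMap": int(hdr[3])}
        pos += 4 * 8
        if record["hasWriterMap"]:
            wmap = np.frombuffer(table, dtype=np.uint64, count=3, offset=pos)
            writer_count = int(wmap[0])           # writer count may change here
            record["AggregatorCount"] = int(wmap[1])
            record["SubfileCount"] = int(wmap[2])
            pos += 3 * 8
            rank_to_subfile = np.frombuffer(table, dtype=np.uint64,
                                            count=writer_count, offset=pos)
            pos += writer_count * 8
        # Per writer: FlushCount (offset, size) pairs plus one final data offset
        # (assumed interpretation of the 2*FlushCount + 1 stride).
        pos += writer_count * (2 * record["FlushCount"] + 1) * 8
        return pos, writer_count, rank_to_subfile, record

A caller would loop over the index buffer starting at byte 64 (after the header), carrying writer_count and rank_to_subfile forward from step to step, since the writer map is only re-emitted when it changes.
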
diff --git a/source/adios2/engine/bp5/BP5Writer.h b/source/adios2/engine/bp5/BP5Writer.h index de2f0c3465..918097aef0 100644 --- a/source/adios2/engine/bp5/BP5Writer.h +++ b/source/adios2/engine/bp5/BP5Writer.h @@ -106,7 +106,7 @@ class BP5Writer : public BP5Engine, public core::Engine void InitParameters() final; /** Set up the aggregator */ void InitAggregator(); - /** Parses transports and parameters from IO AddTransport */ + /** Complete opening/createing metadata and data files */ void InitTransports() final; /** Allocates memory and starts a PG group */ void InitBPBuffer(); @@ -238,6 +238,20 @@ class BP5Writer : public BP5Engine, public core::Engine void MakeHeader(format::BufferSTL &b, const std::string fileType, const bool isActive); + std::vector m_WriterSubfileMap; // rank => subfile index + + // Append helper data + std::vector m_AppendDataPos; // each subfile append pos + size_t m_AppendMetadataPos; // metadata file append pos + size_t m_AppendMetadataIndexPos; // index file append pos + uint32_t m_AppendWriterCount; // last active number of writers + unsigned int m_AppendAggregatorCount; // last active number of aggr + unsigned int m_AppendSubfileCount; // last active number of subfiles + /* Process existing index, fill in append variables, + * and return the actual step we land after appending. + * Uses parameter AppendAfterStep + * It resets m_Aggregator->m_NumAggregators so init aggregators later + */ uint64_t CountStepsInMetadataIndex(format::BufferSTL &bufferSTL); /* Async write's future */ diff --git a/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py b/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py index 0d68354bf7..57e1af3214 100644 --- a/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py +++ b/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py @@ -1,26 +1,35 @@ +from ast import Sub import numpy as np from os import fstat from .utils import * WriterCount = -1 -def ReadWriterArray(f, fileSize, WriterCount): +def ReadWriterMap(bytearray, pos): + data = np.frombuffer(bytearray, dtype=np.uint64, count=3, + offset=pos) + WriterCount = int(data[0]) + AggregatorCount = int(data[1]) + SubfileCount = int(data[2]) + pos = pos + 3 * 8 - print("Writer count is " + str(WriterCount)) - array = f.read(WriterCount * 8) - print("=====================") - print("| Rank | Subfile |") - print("=====================") + print(" WriterMap: Writers = {0} Aggregators = {1} Subfiles = {2}".format( + WriterCount, AggregatorCount, SubfileCount)) + data = np.frombuffer(bytearray, dtype=np.uint64, count=WriterCount, + offset=pos) + print(" =====================") + print(" | Rank | Subfile |") + print(" ---------------------") for r in range(0, WriterCount): - pos = r * 8 - data = np.frombuffer(array, dtype=np.uint64, count=1, offset=pos) rank = str(r).rjust(7) - sub = str(data[0]).rjust(9) - print("|" + rank + " | FlushCount = " + sub + " |") - print("=====================") - return True + sub = str(data[r]).rjust(8) + print(" |" + rank + " | " + sub + " |") + print(" =====================") + + pos = pos + WriterCount *8 + return pos, WriterCount, AggregatorCount, SubfileCount -def ReadIndex(f, fileSize, WriterCount): +def ReadIndex(f, fileSize): nBytes = fileSize - f.tell() if nBytes <= 0: return True @@ -30,17 +39,24 @@ def ReadIndex(f, fileSize, WriterCount): while pos < nBytes: print("-----------------------------------------------" + "---------------------------------------------------") - data = np.frombuffer(table, dtype=np.uint64, count=3, + data = np.frombuffer(table, dtype=np.uint64, count=4, 
offset=pos) stepstr = str(step).ljust(6) mdatapos = str(data[0]).ljust(10) mdatasize = str(data[1]).ljust(10) flushcount = str(data[2]).ljust(3) FlushCount = data[2] + haswritermap = data[3] print("| Step = " + stepstr + "| MetadataPos = " + mdatapos + - " | MetadataSize = " + mdatasize + " |" + flushcount + "|") + " | MetadataSize = " + mdatasize + " | FlushCount = " + + flushcount + "| hasWriterMap = " + str(haswritermap).ljust(3) + "|") + + pos = pos + 4 * 8 + + if (haswritermap > 0): + pos, WriterCount, AggregatorCount, SubfileCount = ReadWriterMap(table, pos) + - pos = pos + 3 * 8 for Writer in range(0, WriterCount): start = " Writer " + str(Writer) + " data " thiswriter = np.frombuffer(table, dtype=np.uint64, @@ -76,10 +92,10 @@ def DumpIndexTable(fileName): if isinstance(status, list): WriterCount = status[1] status = status[0] + #if status: + # status = ReadWriterArray(f, fileSize, WriterCount) if status: - status = ReadWriterArray(f, fileSize, WriterCount) - if status: - status = ReadIndex(f, fileSize, WriterCount) + status = ReadIndex(f, fileSize) return status diff --git a/source/utils/bp5dbg/adios2/bp5dbg/utils.py b/source/utils/bp5dbg/adios2/bp5dbg/utils.py index bcc207d490..48c58c8f00 100644 --- a/source/utils/bp5dbg/adios2/bp5dbg/utils.py +++ b/source/utils/bp5dbg/adios2/bp5dbg/utils.py @@ -143,23 +143,24 @@ def ReadHeader(f, fileSize, fileType): iscolumnmajor = header[49] # 45..63 unused - print("-----------------------------------------------------------" - "-----------------------------------------------------------") - print("| Version string | Major | Minor | Patch " - "| unused | Endian | BP version | Active | WriterCount | AggCount" + - " | ColumnMajor | unused |") - print("| 32 bytes | 1B | 1B | 1B " - "| 1B | 1B | 1B | 1B | 4b | 4b " + - "| 1b | 16B |") - print("+----------------------------------------------------------" - "----------------------------------------------------------+") - print("| {0} | {1} | {2} | {3} | | {4} " - "| {5} | {6} | {7:d} | {8:d} | " + - "{9} | |".format( - versionStr, major, minor, micro, endian, bpversion, activeStr, + print("---------------------------------------------------------------" + "---------------------------------------------------------------") + print("| Version string |Major|Minor|Patch" + "|unused|Endian|BP version|Active|WriterCount|AggCount" + + "|ColumnMajor|unused|") + print("| 32 bytes | 1B | 1B | 1B " + "| 1B | 1B | 1B | 1B | 4B | 4B " + + "| 1B | 16B |") + print("+----------------------------------+-----+-----+-----+------+---" + "---+----------+------+-----------+--------+-----------+------+") + print("| {0} | {1} | {2} | {3} | | {4} " + "| {5} | {6} | {7:d} | {8:d} | " + "{9} | |".format( + versionStr, str(major).center(3), str(minor).center(3), + str(micro).center(3), endian, bpversion, activeStr, WriterCount, aggregatorcount, iscolumnmajor)) - print("-----------------------------------------------------------" - "-----------------------------------------------------------") + print("---------------------------------------------------------------" + "---------------------------------------------------------------") return [status, WriterCount] From ff1a74b85964d21a09dfa4f9064df750e33170fa Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Mon, 24 Jan 2022 16:50:59 -0500 Subject: [PATCH 2/8] Add Transport.Truncate() and use that at appending to existing datasets --- source/adios2/engine/bp5/BP5Writer.cpp | 10 ++++++++- source/adios2/toolkit/transport/Transport.h | 2 ++ 
.../toolkit/transport/file/FileDaos.cpp | 5 +++++ .../adios2/toolkit/transport/file/FileDaos.h | 2 ++ .../toolkit/transport/file/FileFStream.cpp | 18 +++++++++++++++ .../toolkit/transport/file/FileFStream.h | 2 ++ .../adios2/toolkit/transport/file/FileIME.cpp | 5 +++++ .../adios2/toolkit/transport/file/FileIME.h | 2 ++ .../toolkit/transport/file/FilePOSIX.cpp | 17 +++++++++++++- .../adios2/toolkit/transport/file/FilePOSIX.h | 2 ++ .../toolkit/transport/file/FileStdio.cpp | 15 +++++++++++++ .../adios2/toolkit/transport/file/FileStdio.h | 2 ++ .../toolkit/transport/null/NullTransport.cpp | 10 +++++++++ .../toolkit/transport/null/NullTransport.h | 2 ++ .../toolkit/transportman/TransportMan.cpp | 22 +++++++++++++++++++ .../toolkit/transportman/TransportMan.h | 2 ++ 16 files changed, 116 insertions(+), 2 deletions(-) diff --git a/source/adios2/engine/bp5/BP5Writer.cpp b/source/adios2/engine/bp5/BP5Writer.cpp index 42efd00504..a84a826802 100644 --- a/source/adios2/engine/bp5/BP5Writer.cpp +++ b/source/adios2/engine/bp5/BP5Writer.cpp @@ -1155,6 +1155,7 @@ void BP5Writer::InitBPBuffer() const size_t off = m_AppendDataPos[m_Aggregator->m_SubStreamIndex]; if (off < MaxSizeT) { + m_FileDataManager.Truncate(off); m_FileDataManager.SeekTo(off); m_DataPos = off; } @@ -1166,15 +1167,20 @@ void BP5Writer::InitBPBuffer() if (m_Comm.Rank() == 0) { - // Get the size of existing metadata file + // Truncate existing metadata file if (m_AppendMetadataPos < MaxSizeT) { m_MetaDataPos = m_AppendMetadataPos; + m_FileMetadataManager.Truncate(m_MetaDataPos); + // SeekTo instead of SeetToFileEnd in case a transport + // does not support actual truncate + // Plus truncate does not seek anyway m_FileMetadataManager.SeekTo(m_MetaDataPos); } else { m_MetaDataPos = m_FileMetadataManager.GetFileSize(0); + m_FileMetadataManager.SeekToFileEnd(); } // Set the flag in the header of metadata index table to 1 again @@ -1182,6 +1188,8 @@ void BP5Writer::InitBPBuffer() UpdateActiveFlag(true); if (m_AppendMetadataIndexPos < MaxSizeT) { + m_FileMetadataIndexManager.Truncate(m_AppendMetadataIndexPos); + // SeekTo in case a transport does not support actual truncate m_FileMetadataIndexManager.SeekTo(m_AppendMetadataIndexPos); } else diff --git a/source/adios2/toolkit/transport/Transport.h b/source/adios2/toolkit/transport/Transport.h index 3ccde6be19..dafeb94ac1 100644 --- a/source/adios2/toolkit/transport/Transport.h +++ b/source/adios2/toolkit/transport/Transport.h @@ -154,6 +154,8 @@ class Transport virtual void Seek(const size_t start = MaxSizeT) = 0; + virtual void Truncate(const size_t length) = 0; + virtual void MkDir(const std::string &fileName) = 0; protected: diff --git a/source/adios2/toolkit/transport/file/FileDaos.cpp b/source/adios2/toolkit/transport/file/FileDaos.cpp index adfed83b40..0ba08b0ee6 100644 --- a/source/adios2/toolkit/transport/file/FileDaos.cpp +++ b/source/adios2/toolkit/transport/file/FileDaos.cpp @@ -946,5 +946,10 @@ void FileDaos::Seek(const size_t start) } } +void FileDaos::Truncate(const size_t length) +{ + throw std::ios_base::failure("ERROR: Daos Truncate is not implemented yet"); +} + } // end namespace transport } // end namespace adios2 diff --git a/source/adios2/toolkit/transport/file/FileDaos.h b/source/adios2/toolkit/transport/file/FileDaos.h index d6f533b576..810267a5cc 100644 --- a/source/adios2/toolkit/transport/file/FileDaos.h +++ b/source/adios2/toolkit/transport/file/FileDaos.h @@ -57,6 +57,8 @@ class FileDaos : public Transport void Seek(const size_t start = MaxSizeT) final; + void 
Truncate(const size_t length) final; + void MkDir(const std::string &fileName) final; private: diff --git a/source/adios2/toolkit/transport/file/FileFStream.cpp b/source/adios2/toolkit/transport/file/FileFStream.cpp index e9da737894..2ccdd931af 100644 --- a/source/adios2/toolkit/transport/file/FileFStream.cpp +++ b/source/adios2/toolkit/transport/file/FileFStream.cpp @@ -354,6 +354,24 @@ void FileFStream::Seek(const size_t start) } } +#if __cplusplus >= 201703L +#include +#endif + +void FileFStream::Truncate(const size_t length) +{ +#if __cplusplus >= 201703L + // C++17 specific stuff here + WaitForOpen(); + std::filesystem::path p(m_Name); + std::filesystem::resize_file(p, static_cast(length)); + CheckFile("couldn't move to offset " + std::to_string(start) + " of file " + + m_Name + ", in call to fstream seekp"); +#else + // Trunation is not supported in a portable manner pre C++17 +#endif +} + void FileFStream::MkDir(const std::string &fileName) {} } // end namespace transport diff --git a/source/adios2/toolkit/transport/file/FileFStream.h b/source/adios2/toolkit/transport/file/FileFStream.h index bc3b7ac0ef..f9441c52d3 100644 --- a/source/adios2/toolkit/transport/file/FileFStream.h +++ b/source/adios2/toolkit/transport/file/FileFStream.h @@ -59,6 +59,8 @@ class FileFStream : public Transport void Seek(const size_t start = MaxSizeT) final; + void Truncate(const size_t length) final; + void MkDir(const std::string &fileName) final; private: diff --git a/source/adios2/toolkit/transport/file/FileIME.cpp b/source/adios2/toolkit/transport/file/FileIME.cpp index 5a46b17cc3..7047743731 100644 --- a/source/adios2/toolkit/transport/file/FileIME.cpp +++ b/source/adios2/toolkit/transport/file/FileIME.cpp @@ -355,6 +355,11 @@ void FileIME::Seek(const size_t start) } } +void FileIME::Truncate(const size_t length) +{ + throw std::ios_base::failure("ERROR: Daos Truncate is not implemented yet"); +} + void FileIME::MkDir(const std::string &fileName) {} } // end namespace transport diff --git a/source/adios2/toolkit/transport/file/FileIME.h b/source/adios2/toolkit/transport/file/FileIME.h index 24b9b60fbc..f45b63a9a4 100644 --- a/source/adios2/toolkit/transport/file/FileIME.h +++ b/source/adios2/toolkit/transport/file/FileIME.h @@ -59,6 +59,8 @@ class FileIME : public Transport void Seek(const size_t start = MaxSizeT) final; + void Truncate(const size_t length) final; + void MkDir(const std::string &fileName) final; private: diff --git a/source/adios2/toolkit/transport/file/FilePOSIX.cpp b/source/adios2/toolkit/transport/file/FilePOSIX.cpp index 52dc677767..a197541462 100644 --- a/source/adios2/toolkit/transport/file/FilePOSIX.cpp +++ b/source/adios2/toolkit/transport/file/FilePOSIX.cpp @@ -17,7 +17,7 @@ #include // open, fstat #include // open #include // writev -#include // write, close +#include // write, close, ftruncate #include @@ -566,6 +566,21 @@ void FilePOSIX::Seek(const size_t start) } } +void FilePOSIX::Truncate(const size_t length) +{ + WaitForOpen(); + errno = 0; + const int status = ftruncate(m_FileDescriptor, static_cast(length)); + m_Errno = errno; + if (status == -1) + { + throw std::ios_base::failure( + "ERROR: couldn't truncate to " + std::to_string(length) + + " bytes of file " + m_Name + ", in call to POSIX IO truncate" + + SysErrMsg()); + } +} + void FilePOSIX::MkDir(const std::string &fileName) {} } // end namespace transport diff --git a/source/adios2/toolkit/transport/file/FilePOSIX.h b/source/adios2/toolkit/transport/file/FilePOSIX.h index 2585d4a093..255dbba2b3 100644 --- 
a/source/adios2/toolkit/transport/file/FilePOSIX.h +++ b/source/adios2/toolkit/transport/file/FilePOSIX.h @@ -66,6 +66,8 @@ class FilePOSIX : public Transport void Seek(const size_t start = MaxSizeT) final; + void Truncate(const size_t length) final; + void MkDir(const std::string &fileName) final; private: diff --git a/source/adios2/toolkit/transport/file/FileStdio.cpp b/source/adios2/toolkit/transport/file/FileStdio.cpp index 0db79d7254..3ec3fdb5df 100644 --- a/source/adios2/toolkit/transport/file/FileStdio.cpp +++ b/source/adios2/toolkit/transport/file/FileStdio.cpp @@ -372,6 +372,21 @@ void FileStdio::Seek(const size_t start) } } +#include // ftruncate +void FileStdio::Truncate(const size_t length) +{ + + WaitForOpen(); + int fd = fileno(m_File); + const auto status = ftruncate(fd, length); + if (status == -1) + { + throw std::ios_base::failure("ERROR: couldn't truncate to " + + std::to_string(length) + " of file " + + m_Name + ", in call to stdio Truncate\n"); + } +} + void FileStdio::MkDir(const std::string &fileName) {} } // end namespace transport } // end namespace adios2 diff --git a/source/adios2/toolkit/transport/file/FileStdio.h b/source/adios2/toolkit/transport/file/FileStdio.h index d3b9e9c44a..37d7ded3d4 100644 --- a/source/adios2/toolkit/transport/file/FileStdio.h +++ b/source/adios2/toolkit/transport/file/FileStdio.h @@ -57,6 +57,8 @@ class FileStdio : public Transport void Seek(const size_t start) final; + void Truncate(const size_t length) final; + void MkDir(const std::string &fileName) final; private: diff --git a/source/adios2/toolkit/transport/null/NullTransport.cpp b/source/adios2/toolkit/transport/null/NullTransport.cpp index 41825fd3df..2a478c36f0 100644 --- a/source/adios2/toolkit/transport/null/NullTransport.cpp +++ b/source/adios2/toolkit/transport/null/NullTransport.cpp @@ -141,6 +141,16 @@ void NullTransport::Seek(const size_t start) Impl->CurPos = start; } +void NullTransport::Truncate(const size_t length) +{ + if (!Impl->IsOpen) + { + throw std::runtime_error( + "ERROR: NullTransport::Truncate: The transport is not open."); + } + Impl->Capacity = length; +} + void NullTransport::MkDir(const std::string &fileName) { return; } void NullTransport::CheckName() const { return; } diff --git a/source/adios2/toolkit/transport/null/NullTransport.h b/source/adios2/toolkit/transport/null/NullTransport.h index f8b95c78a6..28c9f272b0 100644 --- a/source/adios2/toolkit/transport/null/NullTransport.h +++ b/source/adios2/toolkit/transport/null/NullTransport.h @@ -58,6 +58,8 @@ class NullTransport : public Transport void Seek(const size_t start = MaxSizeT) override; + void Truncate(const size_t length) override; + protected: struct NullTransportImpl; std::unique_ptr Impl; diff --git a/source/adios2/toolkit/transportman/TransportMan.cpp b/source/adios2/toolkit/transportman/TransportMan.cpp index 307416064c..0f6b9eab26 100644 --- a/source/adios2/toolkit/transportman/TransportMan.cpp +++ b/source/adios2/toolkit/transportman/TransportMan.cpp @@ -376,6 +376,28 @@ void TransportMan::SeekTo(const size_t start, const int transportIndex) } } +void TransportMan::Truncate(const size_t length, const int transportIndex) +{ + if (transportIndex == -1) + { + for (auto &transportPair : m_Transports) + { + auto &transport = transportPair.second; + if (transport->m_Type == "File") + { + transport->Truncate(length); + } + } + } + else + { + auto itTransport = m_Transports.find(transportIndex); + CheckFile(itTransport, ", in call to Truncate with index " + + std::to_string(transportIndex)); + 
itTransport->second->Truncate(length); + } +} + size_t TransportMan::GetFileSize(const size_t transportIndex) const { auto itTransport = m_Transports.find(transportIndex); diff --git a/source/adios2/toolkit/transportman/TransportMan.h b/source/adios2/toolkit/transportman/TransportMan.h index 8d901cc723..ba60c84897 100644 --- a/source/adios2/toolkit/transportman/TransportMan.h +++ b/source/adios2/toolkit/transportman/TransportMan.h @@ -203,6 +203,8 @@ class TransportMan void SeekTo(const size_t start, const int transportIndex = -1); + void Truncate(const size_t length, const int transportIndex = -1); + /** * Check if a file exists. * @param name From 03106ecf9c9ad798df0ffe574b905f2425fb24cc Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Tue, 25 Jan 2022 09:03:54 -0500 Subject: [PATCH 3/8] Modify BP5 header. Modify reader to read writer maps. Reader still only works if there is the same number of writers every step because it does not deal with changes over time. --- source/adios2/engine/bp5/BP5Engine.h | 8 +-- source/adios2/engine/bp5/BP5Reader.cpp | 61 +++++++++++++------ source/adios2/engine/bp5/BP5Reader.h | 20 +++++- source/adios2/engine/bp5/BP5Writer.cpp | 45 +++++++------- source/utils/bp5dbg/adios2/bp5dbg/utils.py | 41 ++++++------- .../engine/bp/TestBPWriteAppendReadADIOS2.cpp | 55 +++++++---------- 6 files changed, 124 insertions(+), 106 deletions(-) diff --git a/source/adios2/engine/bp5/BP5Engine.h b/source/adios2/engine/bp5/BP5Engine.h index 78b5b1f2a6..b64094086d 100644 --- a/source/adios2/engine/bp5/BP5Engine.h +++ b/source/adios2/engine/bp5/BP5Engine.h @@ -47,11 +47,9 @@ class BP5Engine static constexpr size_t m_IndexHeaderSize = 64; static constexpr size_t m_EndianFlagPosition = 36; static constexpr size_t m_BPVersionPosition = 37; - static constexpr size_t m_ActiveFlagPosition = 38; - static constexpr size_t m_BPMinorVersionPosition = 39; - static constexpr size_t m_WriterCountPosition = 40; - static constexpr size_t m_AggregatorCountPosition = 44; - static constexpr size_t m_ColumnMajorFlagPosition = 48; + static constexpr size_t m_BPMinorVersionPosition = 38; + static constexpr size_t m_ActiveFlagPosition = 39; + static constexpr size_t m_ColumnMajorFlagPosition = 40; static constexpr size_t m_VersionTagPosition = 0; static constexpr size_t m_VersionTagLength = 32; diff --git a/source/adios2/engine/bp5/BP5Reader.cpp b/source/adios2/engine/bp5/BP5Reader.cpp index 040e25c8de..96eda6e064 100644 --- a/source/adios2/engine/bp5/BP5Reader.cpp +++ b/source/adios2/engine/bp5/BP5Reader.cpp @@ -40,8 +40,10 @@ void BP5Reader::InstallMetadataForTimestep(size_t Step) { size_t pgstart = m_MetadataIndexTable[Step][0]; size_t Position = pgstart + sizeof(uint64_t); // skip total data size - size_t MDPosition = Position + 2 * sizeof(uint64_t) * m_WriterCount; - for (size_t WriterRank = 0; WriterRank < m_WriterCount; WriterRank++) + const uint64_t WriterCount = + m_WriterMap[m_WriterMapIndex[Step]].WriterCount; + size_t MDPosition = Position + 2 * sizeof(uint64_t) * WriterCount; + for (size_t WriterRank = 0; WriterRank < WriterCount; WriterRank++) { // variable metadata for timestep size_t ThisMDSize = helper::ReadValue( @@ -58,7 +60,7 @@ void BP5Reader::InstallMetadataForTimestep(size_t Step) } MDPosition += ThisMDSize; } - for (size_t WriterRank = 0; WriterRank < m_WriterCount; WriterRank++) + for (size_t WriterRank = 0; WriterRank < WriterCount; WriterRank++) { // attribute metadata for timestep size_t ThisADSize = helper::ReadValue( @@ -173,7 +175,8 @@ void 
BP5Reader::ReadData(const size_t WriterRank, const size_t Timestep, { size_t FlushCount = m_MetadataIndexTable[Timestep][2]; size_t DataPosPos = m_MetadataIndexTable[Timestep][3]; - size_t SubfileNum = m_WriterToFileMap[WriterRank]; + size_t SubfileNum = static_cast( + m_WriterMap[m_WriterMapIndex[Timestep]].RankToSubfile[WriterRank]); // check if subfile is already opened if (m_DataFileManager.m_Transports.count(SubfileNum) == 0) @@ -572,7 +575,7 @@ void BP5Reader::InitBuffer(const TimePoint &timeoutInstant, // done m_BP5Deserializer = new format::BP5Deserializer( - m_WriterCount, m_WriterIsRowMajor, m_ReaderIsRowMajor, + m_WriterMap[0].WriterCount, m_WriterIsRowMajor, m_ReaderIsRowMajor, (m_OpenMode == Mode::ReadRandomAccess)); m_BP5Deserializer->m_Engine = this; @@ -647,33 +650,27 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL, std::to_string(m_Minifooter.Version) + " version \n"); } + // BP minor version, unused + position = m_BPMinorVersionPosition; + // Writer active flag position = m_ActiveFlagPosition; const char activeChar = helper::ReadValue( buffer, position, m_Minifooter.IsLittleEndian); m_WriterIsActive = (activeChar == '\1' ? true : false); - position = m_WriterCountPosition; - m_WriterCount = helper::ReadValue( - buffer, position, m_Minifooter.IsLittleEndian); - position = m_AggregatorCountPosition; - m_AggregatorCount = helper::ReadValue( - buffer, position, m_Minifooter.IsLittleEndian); + position = m_ColumnMajorFlagPosition; const uint8_t val = helper::ReadValue( buffer, position, m_Minifooter.IsLittleEndian); m_WriterIsRowMajor = val == 'n'; // move position to first row - position = 64; - } - - for (uint64_t i = 0; i < m_WriterCount; i++) - { - m_WriterToFileMap.push_back(helper::ReadValue( - buffer, position, m_Minifooter.IsLittleEndian)); + position = m_IndexHeaderSize; } // Read each record now uint64_t currentStep = 0; + uint64_t lastMapStep = 0; + uint64_t lastWriterCount = 0; do { std::vector ptrs; @@ -683,6 +680,31 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL, buffer, position, m_Minifooter.IsLittleEndian); const uint64_t FlushCount = helper::ReadValue( buffer, position, m_Minifooter.IsLittleEndian); + const uint64_t hasWriterMap = helper::ReadValue( + buffer, position, m_Minifooter.IsLittleEndian); + + if (hasWriterMap) + { + auto p = m_WriterMap.emplace(currentStep, WriterMapStruct()); + auto &s = p.first->second; + s.WriterCount = helper::ReadValue( + buffer, position, m_Minifooter.IsLittleEndian); + s.AggregatorCount = helper::ReadValue( + buffer, position, m_Minifooter.IsLittleEndian); + s.SubfileCount = helper::ReadValue( + buffer, position, m_Minifooter.IsLittleEndian); + // Get the process -> subfile map + s.RankToSubfile.reserve(s.WriterCount); + for (uint64_t i = 0; i < s.WriterCount; i++) + { + const uint64_t subfileIdx = helper::ReadValue( + buffer, position, m_Minifooter.IsLittleEndian); + s.RankToSubfile.push_back(subfileIdx); + } + lastMapStep = currentStep; + lastWriterCount = s.WriterCount; + } + m_WriterMapIndex.push_back(lastMapStep); ptrs.push_back(MetadataPos); ptrs.push_back(MetadataSize); @@ -708,7 +730,8 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL, } #endif - position += sizeof(uint64_t) * m_WriterCount * ((2 * FlushCount) + 1); + // skip over the writer -> data file offset records + position += sizeof(uint64_t) * lastWriterCount * ((2 * FlushCount) + 1); m_StepsCount++; currentStep++; } while (!oneStepOnly && position < buffer.size()); diff --git 
a/source/adios2/engine/bp5/BP5Reader.h b/source/adios2/engine/bp5/BP5Reader.h index a84fb0d70b..e7103910ff 100644 --- a/source/adios2/engine/bp5/BP5Reader.h +++ b/source/adios2/engine/bp5/BP5Reader.h @@ -20,6 +20,8 @@ #include "adios2/toolkit/transportman/TransportMan.h" #include +#include +#include namespace adios2 { @@ -189,12 +191,11 @@ class BP5Reader : public BP5Engine, public Engine #undef declare_type size_t DoSteps() const final; - uint32_t m_WriterCount = 0; - uint32_t m_AggregatorCount = 0; + uint32_t m_WriterColumnMajor = 0; bool m_ReaderIsRowMajor = true; bool m_WriterIsRowMajor = true; - std::vector m_WriterToFileMap; + format::BufferSTL m_MetadataIndex; format::BufferSTL m_MetaMetadata; format::BufferSTL m_Metadata; @@ -205,6 +206,19 @@ class BP5Reader : public BP5Engine, public Engine void ReadData(const size_t WriterRank, const size_t Timestep, const size_t StartOffset, const size_t Length, char *Destination); + + struct WriterMapStruct + { + uint32_t WriterCount = 0; + uint32_t AggregatorCount = 0; + uint32_t SubfileCount = 0; + std::vector RankToSubfile; // size WriterCount + }; + + // step -> writermap but not for all steps + std::map m_WriterMap; + // step -> writermap index (for all steps) + std::vector m_WriterMapIndex; }; } // end namespace engine diff --git a/source/adios2/engine/bp5/BP5Writer.cpp b/source/adios2/engine/bp5/BP5Writer.cpp index a84a826802..b3ba8fd913 100644 --- a/source/adios2/engine/bp5/BP5Writer.cpp +++ b/source/adios2/engine/bp5/BP5Writer.cpp @@ -599,7 +599,7 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL) const auto &buffer = bufferSTL.m_Buffer; size_t &position = bufferSTL.m_Position; - if (buffer.size() < 64) + if (buffer.size() < m_IndexHeaderSize) { return 0; } @@ -644,7 +644,7 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL) m + " major\n"); } - position = 64; // after the header + position = m_IndexHeaderSize; // after the header // Just count the steps first unsigned int availableSteps = 0; uint64_t nDataFiles = 0; @@ -710,7 +710,7 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL) // append but not at 0 and not after existing steps // Read each record now completely to get offsets at step+1 - position = 64; + position = m_IndexHeaderSize; unsigned int currentStep = 0; std::vector writerToFileMap; // reading one step beyond target to get correct offsets @@ -981,9 +981,9 @@ void BP5Writer::MakeHeader(format::BufferSTL &b, const std::string fileType, " bytes."); } - if (b.GetAvailableSize() < 64) + if (b.GetAvailableSize() < m_IndexHeaderSize) { - b.Resize(position + 64, "BP4Serializer::MakeHeader " + fileType); + b.Resize(m_IndexHeaderSize, "BP4Serializer::MakeHeader " + fileType); } const std::string majorVersion(std::to_string(ADIOS2_VERSION_MAJOR)); @@ -1035,7 +1035,7 @@ void BP5Writer::MakeHeader(format::BufferSTL &b, const std::string fileType, if (position != m_EndianFlagPosition) { throw std::runtime_error( - "ADIOS Coding ERROR in BP4Serializer::MakeHeader. Endian Flag " + "ADIOS Coding ERROR in BP5Writer::MakeHeader. Endian Flag " "position mismatch"); } const uint8_t endianness = helper::IsLittleEndian() ? 0 : 1; @@ -1045,41 +1045,40 @@ void BP5Writer::MakeHeader(format::BufferSTL &b, const std::string fileType, if (position != m_BPVersionPosition) { throw std::runtime_error( - "ADIOS Coding ERROR in BP4Serializer::MakeHeader. Active Flag " + "ADIOS Coding ERROR in BP5Writer::MakeHeader. 
BP Version " "position mismatch"); } const uint8_t version = 5; helper::CopyToBuffer(buffer, position, &version); - // byte 38: Active flag (used in Index Table only) + // byte 38: BP Minor version 1 + if (position != m_BPMinorVersionPosition) + { + throw std::runtime_error( + "ADIOS Coding ERROR in BP5Writer::MakeHeader. BP Minor version " + "position mismatch"); + } + const uint8_t minorversion = 1; + helper::CopyToBuffer(buffer, position, &minorversion); + + // byte 39: Active flag (used in Index Table only) if (position != m_ActiveFlagPosition) { throw std::runtime_error( - "ADIOS Coding ERROR in BP4Serializer::MakeHeader. Active Flag " + "ADIOS Coding ERROR in BP5Writer::MakeHeader. Active Flag " "position mismatch"); } const uint8_t activeFlag = (isActive ? 1 : 0); helper::CopyToBuffer(buffer, position, &activeFlag); - // byte 39: Minor file version - const uint8_t subversion = 0; - helper::CopyToBuffer(buffer, position, &subversion); - - // bytes 40-43 writer count - const uint32_t WriterCount = m_Comm.Size(); - helper::CopyToBuffer(buffer, position, &WriterCount); - // bytes 44-47 aggregator count - const uint32_t AggregatorCount = - static_cast(m_Aggregator->m_NumAggregators); - helper::CopyToBuffer(buffer, position, &AggregatorCount); - // byte 48 columnMajor + // byte 40 columnMajor // write if data is column major in metadata and data const uint8_t columnMajor = (m_IO.m_ArrayOrder == ArrayOrdering::ColumnMajor) ? 'y' : 'n'; helper::CopyToBuffer(buffer, position, &columnMajor); - // byte 49-63: unused - position += 15; + // byte 41-63: unused + position += 23; absolutePosition = position; } diff --git a/source/utils/bp5dbg/adios2/bp5dbg/utils.py b/source/utils/bp5dbg/adios2/bp5dbg/utils.py index 48c58c8f00..8a79b9aa2e 100644 --- a/source/utils/bp5dbg/adios2/bp5dbg/utils.py +++ b/source/utils/bp5dbg/adios2/bp5dbg/utils.py @@ -130,38 +130,35 @@ def ReadHeader(f, fileSize, fileType): status = False bpversion = int(header[37]) - active = int(header[38]) + bpminorversion = int(header[38]) + active = int(header[39]) if active == 0: activeStr = ' no' else: activeStr = 'yes' - - # unused = hStr[39] - - WriterCount = int(header[40]) - aggregatorcount = int(header[44]) - iscolumnmajor = header[49] + iscolumnmajor = hStr[40] + if iscolumnmajor == 'n': + clmnStr = ' no' + else: + clmnStr = ' yes' # 45..63 unused - print("---------------------------------------------------------------" - "---------------------------------------------------------------") + print("---------------------------------------------------------" + "---------------------------------------------------------") print("| Version string |Major|Minor|Patch" - "|unused|Endian|BP version|Active|WriterCount|AggCount" + - "|ColumnMajor|unused|") + "|unused|Endian|BP version|BP minor|Active|ColumnMajor|unused|") print("| 32 bytes | 1B | 1B | 1B " - "| 1B | 1B | 1B | 1B | 4B | 4B " + - "| 1B | 16B |") - print("+----------------------------------+-----+-----+-----+------+---" - "---+----------+------+-----------+--------+-----------+------+") + "| 1B | 1B | 1B | 1B | 1B | 1B | 23B |") + print("+----------------------------------+-----+-----+-----" + "+------+------+----------+--------+------+-----------+------+") print("| {0} | {1} | {2} | {3} | | {4} " - "| {5} | {6} | {7:d} | {8:d} | " - "{9} | |".format( + "| {5} | {6} | {7} | {8} | |".format( versionStr, str(major).center(3), str(minor).center(3), - str(micro).center(3), endian, bpversion, activeStr, - WriterCount, aggregatorcount, iscolumnmajor)) - 
print("---------------------------------------------------------------" - "---------------------------------------------------------------") - return [status, WriterCount] + str(micro).center(3), endian, str(bpversion).center(3), + str(bpminorversion).center(3), activeStr, clmnStr)) + print("---------------------------------------------------------" + "---------------------------------------------------------") + return status if __name__ == "__main__": diff --git a/testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp b/testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp index 4617c8252d..114ec156cb 100644 --- a/testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp +++ b/testing/adios2/engine/bp/TestBPWriteAppendReadADIOS2.cpp @@ -836,48 +836,35 @@ TEST_F(BPWriteAppendReadTestADIOS2, ADIOS2BPWriteAppendReadVaryingAggregation) /* Write phase II: append */ io.SetParameter("NumAggregators", "1"); - if (engineName == "BP5" && mpiSize > 1) + bpWriter = io.Open(fname, adios2::Mode::Append); + for (size_t step = NSteps; step < 2 * NSteps; ++step) { - EXPECT_THROW(bpWriter = io.Open(fname, adios2::Mode::Append), - std::runtime_error); + // Generate test data for each process uniquely + SmallTestData currentTestData = generateNewSmallTestData( + m_TestData, static_cast(step), mpiRank, mpiSize); + bpWriter.BeginStep(); + bpWriter.Put(var_i32, currentTestData.I32.data()); + bpWriter.EndStep(); } - else - { - bpWriter = io.Open(fname, adios2::Mode::Append); - for (size_t step = NSteps; step < 2 * NSteps; ++step) - { - // Generate test data for each process uniquely - SmallTestData currentTestData = generateNewSmallTestData( - m_TestData, static_cast(step), mpiRank, mpiSize); - bpWriter.BeginStep(); - bpWriter.Put(var_i32, currentTestData.I32.data()); - bpWriter.EndStep(); - } - bpWriter.Close(); + bpWriter.Close(); - /* Write phase III: append */ - io.SetParameter("NumAggregators", "2"); - bpWriter = io.Open(fname, adios2::Mode::Append); - for (size_t step = 2 * NSteps; step < 3 * NSteps; ++step) - { - // Generate test data for each process uniquely - SmallTestData currentTestData = generateNewSmallTestData( - m_TestData, static_cast(step), mpiRank, mpiSize); - bpWriter.BeginStep(); - bpWriter.Put(var_i32, currentTestData.I32.data()); - bpWriter.EndStep(); - } - bpWriter.Close(); + /* Write phase III: append */ + io.SetParameter("NumAggregators", "2"); + bpWriter = io.Open(fname, adios2::Mode::Append); + for (size_t step = 2 * NSteps; step < 3 * NSteps; ++step) + { + // Generate test data for each process uniquely + SmallTestData currentTestData = generateNewSmallTestData( + m_TestData, static_cast(step), mpiRank, mpiSize); + bpWriter.BeginStep(); + bpWriter.Put(var_i32, currentTestData.I32.data()); + bpWriter.EndStep(); } + bpWriter.Close(); } { size_t NumSteps = 3 * NSteps; - if (engineName == "BP5" && mpiSize > 1) - { - NumSteps = NSteps; - } - adios2::IO io = adios.DeclareIO("ReadIO"); io.SetEngine(engineName); From f1e5588644201b407a8ea11c7fa8bf72ac8c7893 Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Tue, 25 Jan 2022 09:59:42 -0500 Subject: [PATCH 4/8] disable Truncate in FileStdio on Windows --- source/adios2/toolkit/transport/file/FileStdio.cpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/source/adios2/toolkit/transport/file/FileStdio.cpp b/source/adios2/toolkit/transport/file/FileStdio.cpp index 3ec3fdb5df..8f213610ca 100644 --- a/source/adios2/toolkit/transport/file/FileStdio.cpp +++ b/source/adios2/toolkit/transport/file/FileStdio.cpp @@ -372,6 +372,13 @@ 
void FileStdio::Seek(const size_t start) } } +#ifdef _WIN32 +void FileStdio::Truncate(const size_t length) +{ + throw std::ios_base::failure( + "ERROR: FileStdio::Truncate is not supported in Windows\n"); +} +#else #include // ftruncate void FileStdio::Truncate(const size_t length) { @@ -386,7 +393,9 @@ void FileStdio::Truncate(const size_t length) m_Name + ", in call to stdio Truncate\n"); } } +#endif void FileStdio::MkDir(const std::string &fileName) {} + } // end namespace transport } // end namespace adios2 From 5f2e480b9550eaa8cdce473a72c1fc30013fd62e Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Tue, 25 Jan 2022 10:00:13 -0500 Subject: [PATCH 5/8] flake8 python --- source/utils/bp5dbg/adios2/bp5dbg/idxtable.py | 19 ++++++++----------- source/utils/bp5dbg/adios2/bp5dbg/utils.py | 2 +- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py b/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py index 57e1af3214..c6e4852d15 100644 --- a/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py +++ b/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py @@ -7,16 +7,15 @@ def ReadWriterMap(bytearray, pos): data = np.frombuffer(bytearray, dtype=np.uint64, count=3, - offset=pos) + offset=pos) WriterCount = int(data[0]) AggregatorCount = int(data[1]) SubfileCount = int(data[2]) pos = pos + 3 * 8 - print(" WriterMap: Writers = {0} Aggregators = {1} Subfiles = {2}".format( - WriterCount, AggregatorCount, SubfileCount)) + print(" WriterMap: Writers = {0} Aggregators = {1} Subfiles = {2}" .format(WriterCount, AggregatorCount, SubfileCount)) data = np.frombuffer(bytearray, dtype=np.uint64, count=WriterCount, - offset=pos) + offset=pos) print(" =====================") print(" | Rank | Subfile |") print(" ---------------------") @@ -26,7 +25,7 @@ def ReadWriterMap(bytearray, pos): print(" |" + rank + " | " + sub + " |") print(" =====================") - pos = pos + WriterCount *8 + pos = pos + WriterCount * 8 return pos, WriterCount, AggregatorCount, SubfileCount def ReadIndex(f, fileSize): @@ -49,13 +48,14 @@ def ReadIndex(f, fileSize): haswritermap = data[3] print("| Step = " + stepstr + "| MetadataPos = " + mdatapos + " | MetadataSize = " + mdatasize + " | FlushCount = " + - flushcount + "| hasWriterMap = " + str(haswritermap).ljust(3) + "|") + flushcount + "| hasWriterMap = " + + str(haswritermap).ljust(3) + "|") pos = pos + 4 * 8 if (haswritermap > 0): - pos, WriterCount, AggregatorCount, SubfileCount = ReadWriterMap(table, pos) - + pos, WriterCount, AggregatorCount, SubfileCount = ReadWriterMap( + table, pos) for Writer in range(0, WriterCount): start = " Writer " + str(Writer) + " data " @@ -90,10 +90,7 @@ def DumpIndexTable(fileName): fileSize = fstat(f.fileno()).st_size status = ReadHeader(f, fileSize, "Index Table") if isinstance(status, list): - WriterCount = status[1] status = status[0] - #if status: - # status = ReadWriterArray(f, fileSize, WriterCount) if status: status = ReadIndex(f, fileSize) return status diff --git a/source/utils/bp5dbg/adios2/bp5dbg/utils.py b/source/utils/bp5dbg/adios2/bp5dbg/utils.py index 8a79b9aa2e..b24e50eb21 100644 --- a/source/utils/bp5dbg/adios2/bp5dbg/utils.py +++ b/source/utils/bp5dbg/adios2/bp5dbg/utils.py @@ -153,7 +153,7 @@ def ReadHeader(f, fileSize, fileType): "+------+------+----------+--------+------+-----------+------+") print("| {0} | {1} | {2} | {3} | | {4} " "| {5} | {6} | {7} | {8} | |".format( - versionStr, str(major).center(3), str(minor).center(3), + versionStr, str(major).center(3), 
str(minor).center(3), str(micro).center(3), endian, str(bpversion).center(3), str(bpminorversion).center(3), activeStr, clmnStr)) print("---------------------------------------------------------" From ed23c6104e7f8766d325aa2be15e1795fc12c237 Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Tue, 25 Jan 2022 10:20:46 -0500 Subject: [PATCH 6/8] reflake --- source/utils/bp5dbg/adios2/bp5dbg/idxtable.py | 3 ++- source/utils/bp5dbg/adios2/bp5dbg/utils.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py b/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py index c6e4852d15..cfae81fa4c 100644 --- a/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py +++ b/source/utils/bp5dbg/adios2/bp5dbg/idxtable.py @@ -13,7 +13,8 @@ def ReadWriterMap(bytearray, pos): SubfileCount = int(data[2]) pos = pos + 3 * 8 - print(" WriterMap: Writers = {0} Aggregators = {1} Subfiles = {2}" .format(WriterCount, AggregatorCount, SubfileCount)) + print(" WriterMap: Writers = {0} Aggregators = {1} Subfiles = {2}" + .format(WriterCount, AggregatorCount, SubfileCount)) data = np.frombuffer(bytearray, dtype=np.uint64, count=WriterCount, offset=pos) print(" =====================") diff --git a/source/utils/bp5dbg/adios2/bp5dbg/utils.py b/source/utils/bp5dbg/adios2/bp5dbg/utils.py index b24e50eb21..22dbe45e6b 100644 --- a/source/utils/bp5dbg/adios2/bp5dbg/utils.py +++ b/source/utils/bp5dbg/adios2/bp5dbg/utils.py @@ -153,7 +153,7 @@ def ReadHeader(f, fileSize, fileType): "+------+------+----------+--------+------+-----------+------+") print("| {0} | {1} | {2} | {3} | | {4} " "| {5} | {6} | {7} | {8} | |".format( - versionStr, str(major).center(3), str(minor).center(3), + versionStr, str(major).center(3), str(minor).center(3), str(micro).center(3), endian, str(bpversion).center(3), str(bpminorversion).center(3), activeStr, clmnStr)) print("---------------------------------------------------------" From d9d98b4e0a90c24f08356e5ee1f92ed297419039 Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Wed, 26 Jan 2022 07:46:26 -0500 Subject: [PATCH 7/8] Truncate properly when appending after 0 steps. Rename parameter to AppendAfterSteps. 
---
 source/adios2/engine/bp5/BP5Engine.h   |   2 +-
 source/adios2/engine/bp5/BP5Writer.cpp | 114 ++++++++++++++++---------
 source/adios2/engine/bp5/BP5Writer.h   |   1 +
 3 files changed, 74 insertions(+), 43 deletions(-)

diff --git a/source/adios2/engine/bp5/BP5Engine.h b/source/adios2/engine/bp5/BP5Engine.h
index b64094086d..6d0e6b82f8 100644
--- a/source/adios2/engine/bp5/BP5Engine.h
+++ b/source/adios2/engine/bp5/BP5Engine.h
@@ -128,7 +128,7 @@ class BP5Engine
     MACRO(BufferChunkSize, SizeBytes, size_t, DefaultBufferChunkSize)         \
     MACRO(MaxShmSize, SizeBytes, size_t, DefaultMaxShmSize)                   \
     MACRO(BufferVType, BufferVType, int, (int)BufferVType::ChunkVType)        \
-    MACRO(AppendAfterStep, UInt, unsigned int, INT_MAX)                       \
+    MACRO(AppendAfterSteps, Int, int, INT_MAX)                                \
     MACRO(ReaderShortCircuitReads, Bool, bool, false)

 struct BP5Params
diff --git a/source/adios2/engine/bp5/BP5Writer.cpp b/source/adios2/engine/bp5/BP5Writer.cpp
index b3ba8fd913..53af024677 100644
--- a/source/adios2/engine/bp5/BP5Writer.cpp
+++ b/source/adios2/engine/bp5/BP5Writer.cpp
@@ -601,6 +601,11 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL)

     if (buffer.size() < m_IndexHeaderSize)
     {
+        m_AppendMetadataPos = 0;
+        m_AppendMetaMetadataPos = 0;
+        m_AppendMetadataIndexPos = 0;
+        m_AppendDataPos.resize(m_Aggregator->m_NumAggregators,
+                               0ULL); // safe bet
         return 0;
     }

@@ -676,31 +681,40 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL)
         availableSteps++;
     }

-    unsigned int targetStep = m_Parameters.AppendAfterStep;
-    if (targetStep < 0)
+    unsigned int targetStep = 0;
+    if (m_Parameters.AppendAfterSteps + static_cast<int>(availableSteps) < 0)
+    {
+        targetStep = 0;
+    }
+    else if (m_Parameters.AppendAfterSteps < 0)
     {
         // -1 means last step
-        targetStep = availableSteps + m_Parameters.AppendAfterStep + 1;
-        if (targetStep < 0)
-        {
-            targetStep = 0;
-        }
+        targetStep = availableSteps + m_Parameters.AppendAfterSteps + 1;
+    }
+    else
+    {
+        targetStep = static_cast<unsigned int>(m_Parameters.AppendAfterSteps);
     }
     if (targetStep > availableSteps)
     {
         targetStep = availableSteps;
     }

+    m_AppendDataPos.resize(nDataFiles, 0ULL);
+
     if (!targetStep)
     {
         // append at 0 is like writing new file
+        m_AppendMetadataPos = 0;
+        m_AppendMetaMetadataPos = 0;
+        m_AppendMetadataIndexPos = 0;
         return 0;
     }

-    m_AppendDataPos.resize(nDataFiles);
-    std::fill(m_AppendDataPos.begin(), m_AppendDataPos.end(), MaxSizeT);
     m_AppendMetadataPos = MaxSizeT; // size of header
+    m_AppendMetaMetadataPos = MaxSizeT;
     m_AppendMetadataIndexPos = MaxSizeT;
+    std::fill(m_AppendDataPos.begin(), m_AppendDataPos.end(), MaxSizeT);

     if (targetStep == availableSteps)
     {
@@ -757,8 +771,9 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL)
                 buffer, position, IsLittleEndian));
             position += sizeof(uint64_t) * 2 * FlushCount; // no need to read
-            std::cout << "Writer " << i << " subfile " << writerToFileMap[i]
-                      << " first data loc:" << FirstDataPos << std::endl;
+            /* std::cout << "Writer " << i << " subfile " <<
+               writerToFileMap[i] << " first data loc:" << FirstDataPos <<
+               std::endl; */
             if (FirstDataPos < m_AppendDataPos[writerToFileMap[i]])
             {
                 m_AppendDataPos[writerToFileMap[i]] = FirstDataPos;
             }
@@ -1121,40 +1136,17 @@ void BP5Writer::InitBPBuffer()
         }
         m_Comm.BroadcastVector(preMetadataIndex.m_Buffer);
         m_WriterStep = CountStepsInMetadataIndex(preMetadataIndex);
-    }

-    if (!m_WriterStep)
-    {
-        /* This is a new file or append at 0
-         * Make headers in data buffer and metadata buffer (but do not write
-         * them yet so that Open() can stay free of writing to disk)
-         */
-        if (m_Comm.Rank() == 0)
-        {
-            format::BufferSTL b;
-            MakeHeader(b, "Metadata", false);
-            m_FileMetadataManager.SeekToFileBegin();
-            m_FileMetadataManager.WriteFiles(b.m_Buffer.data(), b.m_Position);
-            m_MetaDataPos = b.m_Position;
-            format::BufferSTL bi;
-            MakeHeader(bi, "Index Table", true);
-            m_FileMetadataIndexManager.SeekToFileBegin();
-            m_FileMetadataIndexManager.WriteFiles(bi.m_Buffer.data(),
-                                                  bi.m_Position);
-        }
-        if (m_Aggregator->m_IsAggregator)
-        {
-            m_FileDataManager.SeekTo(0);
-        }
-    }
-    else
-    {
+        // truncate and seek
         if (m_Aggregator->m_IsAggregator)
         {
             const size_t off = m_AppendDataPos[m_Aggregator->m_SubStreamIndex];
             if (off < MaxSizeT)
             {
                 m_FileDataManager.Truncate(off);
+                // Seek is needed since truncate does not seek.
+                // SeekTo instead of SeetToFileEnd in case a transport
+                // does not support actual truncate.
                 m_FileDataManager.SeekTo(off);
                 m_DataPos = off;
             }
@@ -1171,9 +1163,6 @@ void BP5Writer::InitBPBuffer()
             {
                 m_MetaDataPos = m_AppendMetadataPos;
                 m_FileMetadataManager.Truncate(m_MetaDataPos);
-                // SeekTo instead of SeetToFileEnd in case a transport
-                // does not support actual truncate
-                // Plus truncate does not seek anyway
                 m_FileMetadataManager.SeekTo(m_MetaDataPos);
             }
             else
@@ -1182,13 +1171,25 @@ void BP5Writer::InitBPBuffer()
                 m_FileMetadataManager.SeekToFileEnd();
             }

+            // Truncate existing meta-meta file
+            if (m_AppendMetaMetadataPos < MaxSizeT)
+            {
+                m_FileMetaMetadataManager.Truncate(m_AppendMetaMetadataPos);
+                m_FileMetaMetadataManager.SeekTo(m_AppendMetaMetadataPos);
+            }
+            else
+            {
+                m_FileMetadataIndexManager.SeekToFileEnd();
+            }
+
             // Set the flag in the header of metadata index table to 1 again
             // to indicate a new run begins
             UpdateActiveFlag(true);
+
+            // Truncate existing index file
             if (m_AppendMetadataIndexPos < MaxSizeT)
             {
                 m_FileMetadataIndexManager.Truncate(m_AppendMetadataIndexPos);
-                // SeekTo in case a transport does not support actual truncate
                 m_FileMetadataIndexManager.SeekTo(m_AppendMetadataIndexPos);
             }
             else
@@ -1196,6 +1197,35 @@ void BP5Writer::InitBPBuffer()
                 m_FileMetadataIndexManager.SeekToFileEnd();
             }
         }
+        m_AppendDataPos.clear();
+    }
+
+    if (!m_WriterStep)
+    {
+        /* This is a new file or append at 0
+         * Make headers in data buffer and metadata buffer (but do not write
+         * them yet so that Open() can stay free of writing to disk)
+         */
+        if (m_Comm.Rank() == 0)
+        {
+            format::BufferSTL b;
+            MakeHeader(b, "Metadata", false);
+            m_FileMetadataManager.SeekToFileBegin();
+            m_FileMetadataManager.WriteFiles(b.m_Buffer.data(), b.m_Position);
+            m_MetaDataPos = b.m_Position;
+            format::BufferSTL bi;
+            MakeHeader(bi, "Index Table", true);
+            m_FileMetadataIndexManager.SeekToFileBegin();
+            m_FileMetadataIndexManager.WriteFiles(bi.m_Buffer.data(),
+                                                  bi.m_Position);
+            m_FileMetaMetadataManager.SeekToFileBegin();
+        }
+        // last attempt to clean up datafile if called with append mode,
+        // data existed but index was missing
+        if (m_Aggregator->m_IsAggregator)
+        {
+            m_FileDataManager.SeekTo(0);
+        }
     }

     if (m_Comm.Rank() == 0)
diff --git a/source/adios2/engine/bp5/BP5Writer.h b/source/adios2/engine/bp5/BP5Writer.h
index 918097aef0..d452de2c3b 100644
--- a/source/adios2/engine/bp5/BP5Writer.h
+++ b/source/adios2/engine/bp5/BP5Writer.h
@@ -243,6 +243,7 @@ class BP5Writer : public BP5Engine, public core::Engine
     // Append helper data
     std::vector<uint64_t> m_AppendDataPos; // each subfile append pos
     size_t m_AppendMetadataPos;      // metadata file append pos
+    size_t m_AppendMetaMetadataPos;  // meta-metadata file append pos
     size_t m_AppendMetadataIndexPos; // index file append pos
    uint32_t m_AppendWriterCount;         // last active number of writers
    unsigned int m_AppendAggregatorCount; // last active number of aggr

From 4fbeca521afc773b506e8548cb08001efda693f0 Mon Sep 17 00:00:00 2001
From: Norbert Podhorszki
Date: Wed, 26 Jan 2022 08:04:25 -0500
Subject: [PATCH 8/8] Fix target step calculation avoiding integer overflow

---
 source/adios2/engine/bp5/BP5Writer.cpp | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/source/adios2/engine/bp5/BP5Writer.cpp b/source/adios2/engine/bp5/BP5Writer.cpp
index 53af024677..726adc1b82 100644
--- a/source/adios2/engine/bp5/BP5Writer.cpp
+++ b/source/adios2/engine/bp5/BP5Writer.cpp
@@ -682,14 +682,16 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL)
     }

     unsigned int targetStep = 0;
-    if (m_Parameters.AppendAfterSteps + static_cast<int>(availableSteps) < 0)
-    {
-        targetStep = 0;
-    }
-    else if (m_Parameters.AppendAfterSteps < 0)
+
+    if (m_Parameters.AppendAfterSteps < 0)
     {
-        // -1 means last step
-        targetStep = availableSteps + m_Parameters.AppendAfterSteps + 1;
+        // -1 means append after last step
+        int s = (int)availableSteps + m_Parameters.AppendAfterSteps + 1;
+        if (s < 0)
+        {
+            s = 0;
+        }
+        targetStep = static_cast<unsigned int>(s);
     }
     else
     {
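
The previous check summed m_Parameters.AppendAfterSteps (default INT_MAX)
with the available step count before comparing against 0, which the commit
subject notes can overflow; the reworked code only does the signed arithmetic
in the negative branch and clamps the intermediate result. A standalone
sketch of the resulting rule, an illustration that mirrors the logic shown
above rather than the engine code itself (it assumes the clamp to
availableSteps that follows the if/else remains in place):

    #include <climits>
    #include <cstdio>

    // Mirrors the target-step selection in CountStepsInMetadataIndex after this patch.
    static unsigned int TargetStep(int appendAfterSteps, unsigned int availableSteps)
    {
        unsigned int targetStep = 0;
        if (appendAfterSteps < 0)
        {
            // negative values count back from the end; -1 appends after the last step
            int s = (int)availableSteps + appendAfterSteps + 1;
            if (s < 0)
            {
                s = 0;
            }
            targetStep = static_cast<unsigned int>(s);
        }
        else
        {
            targetStep = static_cast<unsigned int>(appendAfterSteps);
        }
        if (targetStep > availableSteps)
        {
            targetStep = availableSteps; // the INT_MAX default lands here: keep all steps
        }
        return targetStep;
    }

    int main()
    {
        // assume a file with 10 existing steps
        std::printf("%u\n", TargetStep(INT_MAX, 10)); // 10: append after all existing steps
        std::printf("%u\n", TargetStep(-1, 10));      // 10: append after the last step
        std::printf("%u\n", TargetStep(3, 10));       //  3: keep steps 0..2, truncate the rest
        std::printf("%u\n", TargetStep(-12, 10));     //  0: like writing a brand new file
        return 0;
    }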