Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tweaks for BP5 on windows. #3682

Merged
merged 12 commits into from
Jul 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmake/DetectOptions.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ if(DAOS_FOUND)
endif()

# BP5
if(ADIOS2_USE_BP5 AND NOT WIN32)
if(ADIOS2_USE_BP5)
set(ADIOS2_HAVE_BP5 TRUE)
endif()

Expand Down
4 changes: 4 additions & 0 deletions source/adios2/core/Variable.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,13 @@ std::pair<T, T> Variable<T>::DoMinMax(const size_t step) const
MinMaxStruct MM;
if (m_Engine->VariableMinMax(*this, step, MM))
{
if (std::is_same<T, std::string>::value) {
return minMax;
} else {
minMax.first = *(T *)&MM.MinUnion;
minMax.second = *(T *)&MM.MaxUnion;
return minMax;
}
}
}
if (m_Engine != nullptr && !m_FirstStreamingStep)
Expand Down
12 changes: 6 additions & 6 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ void BP5Reader::PerformGets()
// then main thread process the last subset
for (size_t tid = 0; tid < nThreads - 1; ++tid)
{
futures[tid] = std::async(std::launch::async, lf_Reader, tid + 1,
futures[tid] = std::async(std::launch::async, lf_Reader, (int)(tid + 1),
maxOpenFiles);
}
// main thread runs last subset of reads
Expand Down Expand Up @@ -513,9 +513,9 @@ void BP5Reader::InitParameters()
}

size_t limit = helper::RaiseLimitNoFile();
if (m_Parameters.MaxOpenFilesAtOnce > limit - 8)
if (m_Parameters.MaxOpenFilesAtOnce > (unsigned int) limit - 8)
{
m_Parameters.MaxOpenFilesAtOnce = limit - 8;
m_Parameters.MaxOpenFilesAtOnce = (unsigned int) limit - 8;
}
}

Expand Down Expand Up @@ -986,11 +986,11 @@ size_t BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,
{
auto p = m_WriterMap.emplace(m_StepsCount, WriterMapStruct());
auto &s = p.first->second;
s.WriterCount = helper::ReadValue<uint64_t>(
s.WriterCount = (uint32_t)helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
s.AggregatorCount = helper::ReadValue<uint64_t>(
s.AggregatorCount = (uint32_t)helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
s.SubfileCount = helper::ReadValue<uint64_t>(
s.SubfileCount = (uint32_t)helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
// Get the process -> subfile map
s.RankToSubfile.reserve(s.WriterCount);
Expand Down
54 changes: 31 additions & 23 deletions source/adios2/engine/bp5/BP5Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ void BP5Writer::WriteMetaMetadata(
m_FileMetaMetadataManager.WriteFiles((char *)b.MetaMetaInfo,
b.MetaMetaInfoLen);
}
m_FileMetaMetadataManager.FlushFiles();
}

uint64_t
Expand Down Expand Up @@ -206,6 +207,8 @@ BP5Writer::WriteMetadata(const std::vector<core::iovec> &MetaDataBlocks,
MetaDataSize += b.iov_len;
}

m_FileMetadataManager.FlushFiles();

m_MetaDataPos += MetaDataSize;
return MetaDataSize;
}
Expand Down Expand Up @@ -272,6 +275,7 @@ void BP5Writer::WriteData(format::BufferV *Data)
std::to_string(m_Parameters.AggregationType) +
"is not supported in BP5");
}
m_FileDataManager.FlushFiles();
delete Data;
}
}
Expand Down Expand Up @@ -337,8 +341,6 @@ void BP5Writer::WriteData_EveryoneWrites(format::BufferV *Data,
void BP5Writer::WriteMetadataFileIndex(uint64_t MetaDataPos,
uint64_t MetaDataSize)
{
m_FileMetadataManager.FlushFiles();

// bufsize: Step record
size_t bufsize =
1 + (4 + ((FlushPosSizeInfo.size() * 2) + 1) * m_Comm.Size()) *
Expand Down Expand Up @@ -407,7 +409,6 @@ void BP5Writer::WriteMetadataFileIndex(uint64_t MetaDataPos,
}

m_FileMetadataIndexManager.WriteFiles((char *)buf.data(), buf.size());

#ifdef DUMPDATALOCINFO
std::cout << "Flush count is :" << FlushPosSizeInfo.size() << std::endl;
std::cout << "Write Index positions = {" << std::endl;
Expand All @@ -427,6 +428,8 @@ void BP5Writer::WriteMetadataFileIndex(uint64_t MetaDataPos,
}
std::cout << "}" << std::endl;
#endif
m_FileMetadataIndexManager.FlushFiles();

/* reset for next timestep */
FlushPosSizeInfo.clear();
}
Expand Down Expand Up @@ -472,7 +475,7 @@ void BP5Writer::MarshalAttributes()

if (!attributePair.second->m_IsSingleValue)
{
element_count = (*baseAttr)->m_Elements;
element_count = (int)(*baseAttr)->m_Elements;
}

if (type == DataType::None)
Expand Down Expand Up @@ -511,7 +514,7 @@ void BP5Writer::MarshalAttributes()
void *data_addr = &attribute.m_DataSingleValue; \
if (!attribute.m_IsSingleValue) \
{ \
element_count = attribute.m_Elements; \
element_count = (int)attribute.m_Elements; \
data_addr = attribute.m_DataArray.data(); \
} \
m_BP5Serializer.MarshalAttribute(attribute.m_Name.c_str(), type, \
Expand All @@ -536,7 +539,7 @@ void BP5Writer::EndStep()

// true: advances step
auto TSInfo = m_BP5Serializer.CloseTimestep(
m_WriterStep, m_Parameters.AsyncWrite || m_Parameters.DirectIO);
(int)m_WriterStep, m_Parameters.AsyncWrite || m_Parameters.DirectIO);

/* TSInfo includes NewMetaMetaBlocks, the MetaEncodeBuffer, the
* AttributeEncodeBuffer and the data encode Vector */
Expand Down Expand Up @@ -684,6 +687,10 @@ void BP5Writer::EndStep()
m_AsyncWriteLock.unlock();
}
}
m_FileMetadataIndexManager.FlushFiles();
m_FileMetadataManager.FlushFiles();
m_FileMetaMetadataManager.FlushFiles();
m_FileDataManager.FlushFiles();

m_Profiler.Stop("ES");
m_WriterStep++;
Expand Down Expand Up @@ -752,7 +759,8 @@ void BP5Writer::InitParameters()
{
size_t k =
m_Parameters.StripeSize / m_Parameters.DirectIOAlignOffset + 1;
m_Parameters.StripeSize = k * m_Parameters.DirectIOAlignOffset;
m_Parameters.StripeSize =
(unsigned int)(k * m_Parameters.DirectIOAlignOffset);
}
if (m_Parameters.BufferChunkSize % m_Parameters.DirectIOAlignOffset)
{
Expand Down Expand Up @@ -852,12 +860,12 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL)
{
case IndexRecord::WriterMapRecord:
{
m_AppendWriterCount =
helper::ReadValue<uint64_t>(buffer, position, IsLittleEndian);
m_AppendAggregatorCount =
helper::ReadValue<uint64_t>(buffer, position, IsLittleEndian);
m_AppendSubfileCount =
helper::ReadValue<uint64_t>(buffer, position, IsLittleEndian);
m_AppendWriterCount = (uint32_t)helper::ReadValue<uint64_t>(
buffer, position, IsLittleEndian);
m_AppendAggregatorCount = (uint32_t)helper::ReadValue<uint64_t>(
buffer, position, IsLittleEndian);
m_AppendSubfileCount = (uint32_t)helper::ReadValue<uint64_t>(
buffer, position, IsLittleEndian);
if (m_AppendSubfileCount > nDataFiles)
{
nDataFiles = m_AppendSubfileCount;
Expand Down Expand Up @@ -939,12 +947,12 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL)
{
case IndexRecord::WriterMapRecord:
{
m_AppendWriterCount =
helper::ReadValue<uint64_t>(buffer, position, IsLittleEndian);
m_AppendAggregatorCount =
helper::ReadValue<uint64_t>(buffer, position, IsLittleEndian);
m_AppendSubfileCount =
helper::ReadValue<uint64_t>(buffer, position, IsLittleEndian);
m_AppendWriterCount = (uint32_t)helper::ReadValue<uint64_t>(
buffer, position, IsLittleEndian);
m_AppendAggregatorCount = (uint32_t)helper::ReadValue<uint64_t>(
buffer, position, IsLittleEndian);
m_AppendSubfileCount = (uint32_t)helper::ReadValue<uint64_t>(
buffer, position, IsLittleEndian);

// Get the process -> subfile map
writerToFileMap.clear();
Expand Down Expand Up @@ -1796,8 +1804,8 @@ void BP5Writer::PutCommon(VariableBase &variable, const void *values, bool sync)
helper::DimsArray MemoryCount(variable.m_MemoryCount);
helper::DimsArray varCount(variable.m_Count);

int DimCount = variable.m_Count.size();
std::vector<size_t> ZeroDims(DimCount);
int DimCount = (int)variable.m_Count.size();
helper::DimsArray ZeroDims(DimCount, (size_t)0);
// get a temporary span then fill with memselection now
format::BufferV::BufferPos bp5span(0, 0, 0);

Expand All @@ -1816,8 +1824,8 @@ void BP5Writer::PutCommon(VariableBase &variable, const void *values, bool sync)
}
helper::NdCopy((const char *)values, helper::CoreDims(ZeroDims),
MemoryCount, sourceRowMajor, false, (char *)ptr,
MemoryStart, varCount, sourceRowMajor, false, ObjSize,
helper::CoreDims(), helper::CoreDims(),
MemoryStart, varCount, sourceRowMajor, false,
(int)ObjSize, helper::CoreDims(), helper::CoreDims(),
helper::CoreDims(), helper::CoreDims(),
false /* safemode */, variable.m_MemSpace);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ void BP5Writer::AsyncWriteOwnData(AsyncWriteInfo *info,
size_t wrote = 0;
size_t block = 0;
size_t temp_offset = 0;
size_t max_size = std::max(1024 * 1024UL, totalsize / 100UL);
size_t max_size = std::max((size_t)1024 * 1024UL, totalsize / 100UL);

bool firstWrite = seekOnFirstWrite;
while (block < nBlocks)
Expand Down
Loading