Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Go back using active flag in metadata index header instead of using a… #2267

Merged
merged 1 commit into from
May 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 20 additions & 21 deletions source/adios2/engine/bp4/BP4Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,15 @@ void BP4Reader::OpenFiles(const TimePoint &timeoutInstant,
if (m_BP4Deserializer.m_RankMPI == 0)
{
throw std::ios_base::failure(
"ERROR: File " + m_Name +
" could not be found within timeout: " + lasterrmsg);
"ERROR: File " + m_Name + " could not be found within the " +
std::to_string(timeoutSeconds.count()) +
"s timeout: " + lasterrmsg);
}
else
{
throw std::ios_base::failure("ERROR: File " + m_Name +
" could not be found within timeout");
throw std::ios_base::failure(
"ERROR: File " + m_Name + " could not be found within the " +
std::to_string(timeoutSeconds.count()) + "s timeout");
}
}

Expand Down Expand Up @@ -508,13 +510,10 @@ bool BP4Reader::CheckWriterActive()
size_t flag = 0;
if (m_BP4Deserializer.m_RankMPI == 0)
{
/* Look for the active flag file */
const std::string activeFlagFile(
m_BP4Deserializer.GetBPActiveFlagFileName(m_Name));

flag = m_ActiveFlagFileManager.FileExists(
activeFlagFile, m_IO.m_TransportsParameters[0],
m_BP4Deserializer.m_Profiler.m_IsActive);
std::vector<char> header(64, '\0');
m_MDIndexFileManager.ReadFile(header.data(), 64, 0, 0);
bool active = m_BP4Deserializer.ReadActiveFlag(header);
flag = (active ? 1 : 0);
}
flag = m_BP4Deserializer.m_Comm.BroadcastValue(flag, 0);
m_WriterIsActive = (flag > 0);
Expand All @@ -526,7 +525,6 @@ StepStatus BP4Reader::CheckForNewSteps(Seconds timeoutSeconds)
/* Do a collective wait for a step within timeout.
Make sure every reader comes to the same conclusion */
StepStatus retval = StepStatus::OK;
bool haveNewStep = false;

if (timeoutSeconds < Seconds::zero())
{
Expand All @@ -547,16 +545,14 @@ StepStatus BP4Reader::CheckForNewSteps(Seconds timeoutSeconds)
// Hack: processing metadata for multiple new steps only works
// when pretending not to be in streaming mode
const bool saveReadStreaming = m_IO.m_ReadStreaming;
size_t newIdxSize = 0;

m_IO.m_ReadStreaming = false;
while (m_WriterIsActive)
{
size_t newIdxSize = UpdateBuffer(timeoutInstant, pollSeconds / 10);
newIdxSize = UpdateBuffer(timeoutInstant, pollSeconds / 10);
if (newIdxSize > 0)
{
haveNewStep = true;
/* we have new metadata in memory. Need to parse it now */
ProcessMetadataForNewSteps(newIdxSize);
break;
}
if (!CheckWriterActive())
Expand All @@ -570,7 +566,13 @@ StepStatus BP4Reader::CheckForNewSteps(Seconds timeoutSeconds)
}
}

if (!haveNewStep)
if (newIdxSize > 0)
{
/* we have new metadata in memory. Need to parse it now */
ProcessMetadataForNewSteps(newIdxSize);
retval = StepStatus::OK;
}
else
{
m_IO.m_ReadStreaming = false;
if (m_WriterIsActive)
Expand All @@ -582,10 +584,7 @@ StepStatus BP4Reader::CheckForNewSteps(Seconds timeoutSeconds)
retval = StepStatus::EndOfStream;
}
}
else
{
retval = StepStatus::OK;
}

m_IO.m_ReadStreaming = saveReadStreaming;

return retval;
Expand Down
67 changes: 35 additions & 32 deletions source/adios2/engine/bp4/BP4Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ BP4Writer::BP4Writer(IO &io, const std::string &name, const Mode mode,
helper::Comm comm)
: Engine("BP4Writer", io, name, mode, std::move(comm)), m_BP4Serializer(m_Comm),
m_FileDataManager(m_Comm), m_FileMetadataManager(m_Comm),
m_FileMetadataIndexManager(m_Comm), m_FileActiveFlagManager(m_Comm),
m_FileDrainer()
m_FileMetadataIndexManager(m_Comm), m_FileDrainer()
{
TAU_SCOPED_TIMER("BP4Writer::Open");
m_IO.m_ReadStreaming = false;
Expand Down Expand Up @@ -244,16 +243,6 @@ void BP4Writer::InitTransports()
m_FileMetadataManager.GetFilesBaseNames(
m_BBName, m_IO.m_TransportsParameters);

/* Create active flag file now to indicate a producer is active.
* When the index file is created, a reader might start polling on it,
* so the active flag should already exist. */
m_ActiveFlagFileNames =
m_BP4Serializer.GetBPActiveFlagFileNames(transportsNames);

m_FileActiveFlagManager.OpenFiles(
m_ActiveFlagFileNames, Mode::Write, m_IO.m_TransportsParameters,
m_BP4Serializer.m_Profiler.m_IsActive);

m_MetadataFileNames =
m_BP4Serializer.GetBPMetadataFileNames(transportsNames);

Expand All @@ -278,13 +267,7 @@ void BP4Writer::InitTransports()
m_DrainMetadataIndexFileNames =
m_BP4Serializer.GetBPMetadataIndexFileNames(
drainTransportNames);
m_DrainActiveFlagFileNames =
m_BP4Serializer.GetBPActiveFlagFileNames(drainTransportNames);

for (const auto &name : m_DrainActiveFlagFileNames)
{
m_FileDrainer.AddOperationOpen(name, m_OpenMode);
}
for (const auto &name : m_DrainMetadataFileNames)
{
m_FileDrainer.AddOperationOpen(name, m_OpenMode);
Expand All @@ -301,8 +284,6 @@ void BP4Writer::InitBPBuffer()
{
if (m_OpenMode == Mode::Append)
{
// throw std::invalid_argument(
// "ADIOS2: OpenMode Append hasn't been implemented, yet");
// TODO: Get last pg timestep and update timestep counter in
format::BufferSTL preMetadataIndex;
size_t preMetadataIndexFileSize;
Expand Down Expand Up @@ -362,20 +343,30 @@ void BP4Writer::InitBPBuffer()
if (m_BP4Serializer.m_PreDataFileLength == 0)
{
/* This is a new file.
* Make headers in data buffer and metadata buffer
* Make headers in data buffer and metadata buffer (but do not write
* them yet so that Open() can stay free of writing to disk)
*/
if (m_BP4Serializer.m_RankMPI == 0)
{
m_BP4Serializer.MakeHeader(m_BP4Serializer.m_Metadata, "Metadata",
false);
m_BP4Serializer.MakeHeader(m_BP4Serializer.m_MetadataIndex,
"Index Table", false);
"Index Table", true);
}
if (m_BP4Serializer.m_Aggregator.m_IsConsumer)
{
m_BP4Serializer.MakeHeader(m_BP4Serializer.m_Data, "Data", false);
}
}
else
{
if (m_BP4Serializer.m_RankMPI == 0)
{
// Set the flag in the header of metadata index table to 1 again
// to indicate a new run begins
UpdateActiveFlag(true);
}
}

m_BP4Serializer.PutProcessGroupIndex(
m_IO.m_Name, m_IO.m_HostLanguage,
Expand Down Expand Up @@ -436,22 +427,15 @@ void BP4Writer::DoClose(const int transportIndex)

if (m_BP4Serializer.m_RankMPI == 0)
{
// Update the active flag in index to indicate current run is over.
UpdateActiveFlag(false);

// close metadata file
m_FileMetadataManager.CloseFiles();

// close metadata index file
m_FileMetadataIndexManager.CloseFiles();

// Delete the active flag file to indicate current run is over.
m_FileActiveFlagManager.DeleteFiles();
if (m_DrainBB)
{
for (const auto &name : m_DrainActiveFlagFileNames)
{
m_FileDrainer.AddOperationDelete(name);
}
}

// Delete metadata files from temporary storage if draining was on
if (m_DrainBB)
{
Expand Down Expand Up @@ -547,6 +531,25 @@ void BP4Writer::PopulateMetadataIndexFileContent(
position += 8;
}

void BP4Writer::UpdateActiveFlag(const bool active)
{
const char activeChar = (active ? '\1' : '\0');
m_FileMetadataIndexManager.WriteFileAt(
&activeChar, 1, m_BP4Serializer.m_ActiveFlagPosition, 0);
m_FileMetadataIndexManager.FlushFiles();
m_FileMetadataIndexManager.SeekToFileEnd();
if (m_DrainBB)
{
for (int i = 0; i < m_MetadataIndexFileNames.size(); ++i)
{
m_FileDrainer.AddOperationWriteAt(
m_DrainMetadataIndexFileNames[i],
m_BP4Serializer.m_ActiveFlagPosition, 1, &activeChar);
m_FileDrainer.AddOperationSeekEnd(m_DrainMetadataIndexFileNames[i]);
}
}
}

void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)
{

Expand Down
5 changes: 2 additions & 3 deletions source/adios2/engine/bp4/BP4Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ class BP4Writer : public core::Engine
/* transport manager for managing the metadata index file */
transportman::TransportMan m_FileMetadataIndexManager;

transportman::TransportMan m_FileActiveFlagManager;

/*
* Burst buffer variables
*/
Expand All @@ -87,7 +85,6 @@ class BP4Writer : public core::Engine
std::vector<std::string> m_MetadataIndexFileNames;
std::vector<std::string> m_DrainMetadataIndexFileNames;
std::vector<std::string> m_ActiveFlagFileNames;
std::vector<std::string> m_DrainActiveFlagFileNames;

void Init() final;

Expand Down Expand Up @@ -137,6 +134,8 @@ class BP4Writer : public core::Engine
const uint64_t variablesIndexStart, const uint64_t attributesIndexStart,
const uint64_t currentStepEndPos, const uint64_t currentTimeStamp);

void UpdateActiveFlag(const bool active);

void WriteCollectiveMetadataFile(const bool isFinal = false);

/**
Expand Down
27 changes: 20 additions & 7 deletions source/adios2/toolkit/format/bp/bp4/BP4Deserializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,11 @@ void BP4Deserializer::ParseMetadataIndex(const BufferSTL &bufferSTL,
std::to_string(m_Minifooter.Version) + " version \n");
}

/* Writer active flag (not used anymore)
//
//position = m_ActiveFlagPosition;
//const char activeChar = helper::ReadValue<uint8_t>(
// buffer, position, m_Minifooter.IsLittleEndian);
//m_WriterIsActive = (activeChar == '\1' ? true : false);
*/
// Writer active flag
position = m_ActiveFlagPosition;
const char activeChar = helper::ReadValue<uint8_t>(
buffer, position, m_Minifooter.IsLittleEndian);
m_WriterIsActive = (activeChar == '\1' ? true : false);

// move position to first row
position = 64;
Expand Down Expand Up @@ -634,6 +632,21 @@ void BP4Deserializer::ClipMemory(const std::string &variableName, core::IO &io,
#undef declare_type
}

bool BP4Deserializer::ReadActiveFlag(std::vector<char> &buffer)
{
if (buffer.size() < m_ActiveFlagPosition)
{
throw std::runtime_error("BP4Deserializer::ReadActiveFlag() is called "
"with a buffer smaller than required");
}
// Writer active flag
size_t position = m_ActiveFlagPosition;
const char activeChar = helper::ReadValue<uint8_t>(
buffer, position, m_Minifooter.IsLittleEndian);
m_WriterIsActive = (activeChar == '\1' ? true : false);
return m_WriterIsActive;
}

#define declare_template_instantiation(T) \
template void BP4Deserializer::GetSyncVariableDataFromStream( \
core::Variable<T> &, BufferSTL &) const; \
Expand Down
4 changes: 4 additions & 0 deletions source/adios2/toolkit/format/bp/bp4/BP4Deserializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class BP4Deserializer : virtual public BP4Base
/** BP Minifooter fields */
Minifooter m_Minifooter;

bool m_WriterIsActive = false;

/**
* Unique constructor
* @param comm multi-process communicator
Expand Down Expand Up @@ -176,6 +178,8 @@ class BP4Deserializer : virtual public BP4Base
const Box<Dims> &blockBox,
const Box<Dims> &intersectionBox) const;

bool ReadActiveFlag(std::vector<char> &buffer);

// TODO: will deprecate
bool m_PerformedGets = false;

Expand Down