Skip to content

Commit

Permalink
Go back using active flag in metadata index header instead of using a…
Browse files Browse the repository at this point in the history
… separate 'active' file, which introduced race conditions. A separate file cannot guarantee the order of the following two events: removal of the active file (a file system metadata-server operation) and arrival of the metadata index file's content on disk (a storage operation). Index content may be en route to disk but held up for an indefinite time while the active file disappears, so a reader may miss the last steps, believing the file is no longer active. In contrast, the active flag is written to the same file after the last steps are written, so the order is guaranteed as long as the file system guarantees the order of writes to the same file.
  • Loading branch information
pnorbert committed May 18, 2020
1 parent d124d6d commit f29b2e9
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 63 deletions.
41 changes: 20 additions & 21 deletions source/adios2/engine/bp4/BP4Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,15 @@ void BP4Reader::OpenFiles(const TimePoint &timeoutInstant,
if (m_BP4Deserializer.m_RankMPI == 0)
{
throw std::ios_base::failure(
"ERROR: File " + m_Name +
" could not be found within timeout: " + lasterrmsg);
"ERROR: File " + m_Name + " could not be found within the " +
std::to_string(timeoutSeconds.count()) +
"s timeout: " + lasterrmsg);
}
else
{
throw std::ios_base::failure("ERROR: File " + m_Name +
" could not be found within timeout");
throw std::ios_base::failure(
"ERROR: File " + m_Name + " could not be found within the " +
std::to_string(timeoutSeconds.count()) + "s timeout");
}
}

Expand Down Expand Up @@ -508,13 +510,10 @@ bool BP4Reader::CheckWriterActive()
size_t flag = 0;
if (m_BP4Deserializer.m_RankMPI == 0)
{
/* Look for the active flag file */
const std::string activeFlagFile(
m_BP4Deserializer.GetBPActiveFlagFileName(m_Name));

flag = m_ActiveFlagFileManager.FileExists(
activeFlagFile, m_IO.m_TransportsParameters[0],
m_BP4Deserializer.m_Profiler.m_IsActive);
std::vector<char> header(64, '\0');
m_MDIndexFileManager.ReadFile(header.data(), 64, 0, 0);
bool active = m_BP4Deserializer.ReadActiveFlag(header);
flag = (active ? 1 : 0);
}
flag = m_BP4Deserializer.m_Comm.BroadcastValue(flag, 0);
m_WriterIsActive = (flag > 0);
Expand All @@ -526,7 +525,6 @@ StepStatus BP4Reader::CheckForNewSteps(Seconds timeoutSeconds)
/* Do a collective wait for a step within timeout.
Make sure every reader comes to the same conclusion */
StepStatus retval = StepStatus::OK;
bool haveNewStep = false;

if (timeoutSeconds < Seconds::zero())
{
Expand All @@ -547,16 +545,14 @@ StepStatus BP4Reader::CheckForNewSteps(Seconds timeoutSeconds)
// Hack: processing metadata for multiple new steps only works
// when pretending not to be in streaming mode
const bool saveReadStreaming = m_IO.m_ReadStreaming;
size_t newIdxSize = 0;

m_IO.m_ReadStreaming = false;
while (m_WriterIsActive)
{
size_t newIdxSize = UpdateBuffer(timeoutInstant, pollSeconds / 10);
newIdxSize = UpdateBuffer(timeoutInstant, pollSeconds / 10);
if (newIdxSize > 0)
{
haveNewStep = true;
/* we have new metadata in memory. Need to parse it now */
ProcessMetadataForNewSteps(newIdxSize);
break;
}
if (!CheckWriterActive())
Expand All @@ -570,7 +566,13 @@ StepStatus BP4Reader::CheckForNewSteps(Seconds timeoutSeconds)
}
}

if (!haveNewStep)
if (newIdxSize > 0)
{
/* we have new metadata in memory. Need to parse it now */
ProcessMetadataForNewSteps(newIdxSize);
retval = StepStatus::OK;
}
else
{
m_IO.m_ReadStreaming = false;
if (m_WriterIsActive)
Expand All @@ -582,10 +584,7 @@ StepStatus BP4Reader::CheckForNewSteps(Seconds timeoutSeconds)
retval = StepStatus::EndOfStream;
}
}
else
{
retval = StepStatus::OK;
}

m_IO.m_ReadStreaming = saveReadStreaming;

return retval;
Expand Down
67 changes: 35 additions & 32 deletions source/adios2/engine/bp4/BP4Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ BP4Writer::BP4Writer(IO &io, const std::string &name, const Mode mode,
helper::Comm comm)
: Engine("BP4Writer", io, name, mode, std::move(comm)), m_BP4Serializer(m_Comm),
m_FileDataManager(m_Comm), m_FileMetadataManager(m_Comm),
m_FileMetadataIndexManager(m_Comm), m_FileActiveFlagManager(m_Comm),
m_FileDrainer()
m_FileMetadataIndexManager(m_Comm), m_FileDrainer()
{
TAU_SCOPED_TIMER("BP4Writer::Open");
m_IO.m_ReadStreaming = false;
Expand Down Expand Up @@ -244,16 +243,6 @@ void BP4Writer::InitTransports()
m_FileMetadataManager.GetFilesBaseNames(
m_BBName, m_IO.m_TransportsParameters);

/* Create active flag file now to indicate a producer is active.
* When the index file is created, a reader might start polling on it,
* so the active flag should already exist. */
m_ActiveFlagFileNames =
m_BP4Serializer.GetBPActiveFlagFileNames(transportsNames);

m_FileActiveFlagManager.OpenFiles(
m_ActiveFlagFileNames, Mode::Write, m_IO.m_TransportsParameters,
m_BP4Serializer.m_Profiler.m_IsActive);

m_MetadataFileNames =
m_BP4Serializer.GetBPMetadataFileNames(transportsNames);

Expand All @@ -278,13 +267,7 @@ void BP4Writer::InitTransports()
m_DrainMetadataIndexFileNames =
m_BP4Serializer.GetBPMetadataIndexFileNames(
drainTransportNames);
m_DrainActiveFlagFileNames =
m_BP4Serializer.GetBPActiveFlagFileNames(drainTransportNames);

for (const auto &name : m_DrainActiveFlagFileNames)
{
m_FileDrainer.AddOperationOpen(name, m_OpenMode);
}
for (const auto &name : m_DrainMetadataFileNames)
{
m_FileDrainer.AddOperationOpen(name, m_OpenMode);
Expand All @@ -301,8 +284,6 @@ void BP4Writer::InitBPBuffer()
{
if (m_OpenMode == Mode::Append)
{
// throw std::invalid_argument(
// "ADIOS2: OpenMode Append hasn't been implemented, yet");
// TODO: Get last pg timestep and update timestep counter in
format::BufferSTL preMetadataIndex;
size_t preMetadataIndexFileSize;
Expand Down Expand Up @@ -362,20 +343,30 @@ void BP4Writer::InitBPBuffer()
if (m_BP4Serializer.m_PreDataFileLength == 0)
{
/* This is a new file.
* Make headers in data buffer and metadata buffer
* Make headers in data buffer and metadata buffer (but do not write
* them yet so that Open() can stay free of writing to disk)
*/
if (m_BP4Serializer.m_RankMPI == 0)
{
m_BP4Serializer.MakeHeader(m_BP4Serializer.m_Metadata, "Metadata",
false);
m_BP4Serializer.MakeHeader(m_BP4Serializer.m_MetadataIndex,
"Index Table", false);
"Index Table", true);
}
if (m_BP4Serializer.m_Aggregator.m_IsConsumer)
{
m_BP4Serializer.MakeHeader(m_BP4Serializer.m_Data, "Data", false);
}
}
else
{
if (m_BP4Serializer.m_RankMPI == 0)
{
// Set the flag in the header of metadata index table to 1 again
// to indicate a new run begins
UpdateActiveFlag(true);
}
}

m_BP4Serializer.PutProcessGroupIndex(
m_IO.m_Name, m_IO.m_HostLanguage,
Expand Down Expand Up @@ -436,22 +427,15 @@ void BP4Writer::DoClose(const int transportIndex)

if (m_BP4Serializer.m_RankMPI == 0)
{
// Update the active flag in index to indicate current run is over.
UpdateActiveFlag(false);

// close metadata file
m_FileMetadataManager.CloseFiles();

// close metadata index file
m_FileMetadataIndexManager.CloseFiles();

// Delete the active flag file to indicate current run is over.
m_FileActiveFlagManager.DeleteFiles();
if (m_DrainBB)
{
for (const auto &name : m_DrainActiveFlagFileNames)
{
m_FileDrainer.AddOperationDelete(name);
}
}

// Delete metadata files from temporary storage if draining was on
if (m_DrainBB)
{
Expand Down Expand Up @@ -547,6 +531,25 @@ void BP4Writer::PopulateMetadataIndexFileContent(
position += 8;
}

/**
 * Overwrite the one-byte "writer active" flag stored in the metadata index
 * file at m_BP4Serializer.m_ActiveFlagPosition.
 *
 * The byte is written in place, the index file is flushed, and the file
 * position is moved back to the end of the file. When burst-buffer draining
 * (m_DrainBB) is enabled, the same in-place write followed by a seek-to-end is
 * queued on the drainer for every drain-target metadata index file.
 *
 * @param active true writes '\1' (a run is in progress), false writes '\0'
 *               (the run is over)
 */
void BP4Writer::UpdateActiveFlag(const bool active)
{
    const char activeChar = (active ? '\1' : '\0');

    // Patch the flag byte in place, then return to the end of the file.
    // Presumably the seek is so that later index records are appended rather
    // than written over the header - confirm against TransportMan.
    m_FileMetadataIndexManager.WriteFileAt(
        &activeChar, 1, m_BP4Serializer.m_ActiveFlagPosition, 0);
    m_FileMetadataIndexManager.FlushFiles();
    m_FileMetadataIndexManager.SeekToFileEnd();

    if (m_DrainBB)
    {
        // Iterate over the drain file names themselves. Bounding the loop by
        // the size of a different vector (m_MetadataIndexFileNames) would
        // index out of range if the two lists ever differ in length.
        // NOTE(review): &activeChar points to a local; this assumes
        // AddOperationWriteAt copies the byte before returning - confirm.
        for (const auto &drainName : m_DrainMetadataIndexFileNames)
        {
            m_FileDrainer.AddOperationWriteAt(
                drainName, m_BP4Serializer.m_ActiveFlagPosition, 1,
                &activeChar);
            m_FileDrainer.AddOperationSeekEnd(drainName);
        }
    }
}

void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)
{

Expand Down
5 changes: 2 additions & 3 deletions source/adios2/engine/bp4/BP4Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ class BP4Writer : public core::Engine
/* transport manager for managing the metadata index file */
transportman::TransportMan m_FileMetadataIndexManager;

transportman::TransportMan m_FileActiveFlagManager;

/*
* Burst buffer variables
*/
Expand All @@ -87,7 +85,6 @@ class BP4Writer : public core::Engine
std::vector<std::string> m_MetadataIndexFileNames;
std::vector<std::string> m_DrainMetadataIndexFileNames;
std::vector<std::string> m_ActiveFlagFileNames;
std::vector<std::string> m_DrainActiveFlagFileNames;

void Init() final;

Expand Down Expand Up @@ -137,6 +134,8 @@ class BP4Writer : public core::Engine
const uint64_t variablesIndexStart, const uint64_t attributesIndexStart,
const uint64_t currentStepEndPos, const uint64_t currentTimeStamp);

void UpdateActiveFlag(const bool active);

void WriteCollectiveMetadataFile(const bool isFinal = false);

/**
Expand Down
27 changes: 20 additions & 7 deletions source/adios2/toolkit/format/bp/bp4/BP4Deserializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,11 @@ void BP4Deserializer::ParseMetadataIndex(const BufferSTL &bufferSTL,
std::to_string(m_Minifooter.Version) + " version \n");
}

/* Writer active flag (not used anymore)
//
//position = m_ActiveFlagPosition;
//const char activeChar = helper::ReadValue<uint8_t>(
// buffer, position, m_Minifooter.IsLittleEndian);
//m_WriterIsActive = (activeChar == '\1' ? true : false);
*/
// Writer active flag
position = m_ActiveFlagPosition;
const char activeChar = helper::ReadValue<uint8_t>(
buffer, position, m_Minifooter.IsLittleEndian);
m_WriterIsActive = (activeChar == '\1' ? true : false);

// move position to first row
position = 64;
Expand Down Expand Up @@ -634,6 +632,21 @@ void BP4Deserializer::ClipMemory(const std::string &variableName, core::IO &io,
#undef declare_type
}

/**
 * Read the writer-active flag out of a buffer holding the beginning of a
 * metadata index file, and cache the result in m_WriterIsActive.
 *
 * @param buffer bytes of the metadata index file starting at offset 0; must
 *               contain the byte at index m_ActiveFlagPosition
 * @return true if the flag byte equals '\1', false otherwise
 * @throws std::runtime_error if buffer is too short to contain the flag byte
 */
bool BP4Deserializer::ReadActiveFlag(std::vector<char> &buffer)
{
    // The flag is the byte AT index m_ActiveFlagPosition, so the buffer needs
    // at least m_ActiveFlagPosition + 1 bytes. A buffer of exactly
    // m_ActiveFlagPosition bytes would be read one past its end.
    if (buffer.size() <= m_ActiveFlagPosition)
    {
        throw std::runtime_error("BP4Deserializer::ReadActiveFlag() is called "
                                 "with a buffer smaller than required");
    }

    // Writer active flag
    size_t position = m_ActiveFlagPosition;
    const char activeChar = helper::ReadValue<uint8_t>(
        buffer, position, m_Minifooter.IsLittleEndian);
    m_WriterIsActive = (activeChar == '\1');
    return m_WriterIsActive;
}

#define declare_template_instantiation(T) \
template void BP4Deserializer::GetSyncVariableDataFromStream( \
core::Variable<T> &, BufferSTL &) const; \
Expand Down
4 changes: 4 additions & 0 deletions source/adios2/toolkit/format/bp/bp4/BP4Deserializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class BP4Deserializer : virtual public BP4Base
/** BP Minifooter fields */
Minifooter m_Minifooter;

bool m_WriterIsActive = false;

/**
* Unique constructor
* @param comm multi-process communicator
Expand Down Expand Up @@ -176,6 +178,8 @@ class BP4Deserializer : virtual public BP4Base
const Box<Dims> &blockBox,
const Box<Dims> &intersectionBox) const;

bool ReadActiveFlag(std::vector<char> &buffer);

// TODO: will deprecate
bool m_PerformedGets = false;

Expand Down

0 comments on commit f29b2e9

Please sign in to comment.