Skip to content

Commit

Permalink
Merge pull request #2074 from eisenhauer/SstReaderRace
Browse files Browse the repository at this point in the history
Reader side connection close race
  • Loading branch information
eisenhauer authored Mar 27, 2020
2 parents f3cb3f5 + 195d4f8 commit e877f36
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 29 deletions.
1 change: 1 addition & 0 deletions source/adios2/toolkit/sst/cp/cp_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ struct _SstStream
long DiscardPriorTimestep; /* timesteps numerically less than this will be
discarded with prejudice */
long LastDPNotifiedTimestep;
int FailureContactRank;

/* reader side marshal info */
FFSContext ReaderFFSContext;
Expand Down
37 changes: 26 additions & 11 deletions source/adios2/toolkit/sst/cp/cp_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "cp_internal.h"

#define gettid() pthread_self()
#define MUTEX_DEBUG
#ifdef MUTEX_DEBUG
#define STREAM_MUTEX_LOCK(Stream) \
{ \
Expand Down Expand Up @@ -241,16 +240,31 @@ extern void ReaderConnCloseHandler(CManager cm, CMConnection ClosedConn,

if (Stream->Status == Established)
{
/*
* tag our reader instance as failed.
* If any instance is failed, we should remove all, but that requires a
* global operation, so prep.
*/
CP_verbose(Stream, "Reader-side Rank received a "
"connection-close event during normal "
"operations, peer likely failed\n");
Stream->Status = PeerFailed;
STREAM_CONDITION_SIGNAL(Stream);
if ((Stream->WriterConfigParams->CPCommPattern == SstCPCommMin) &&
(Stream->Rank != 0))
{
CP_verbose(Stream, "Reader-side Rank received a "
"connection-close event during normal "
"operations, but might be part of shutdown "
"Don't change stream status.\n");
/* if this happens and *is* a failure, we'll get the status from
* rank 0 later */
}
else
{
/*
* tag our reader instance as failed, IFF this came from someone we
* should have gotten a CLOSE from. I.E. a reverse peer
*/
CP_verbose(Stream, "Reader-side Rank received a "
"connection-close event during normal "
"operations, peer likely failed\n");
if (FailedPeerRank == Stream->FailureContactRank)
{
Stream->Status = PeerFailed;
STREAM_CONDITION_SIGNAL(Stream);
}
}
CP_verbose(
Stream,
"The close was for connection to writer peer %d, notifying DP\n",
Expand Down Expand Up @@ -730,6 +744,7 @@ extern void CP_PeerSetupHandler(CManager cm, CMConnection conn, void *Msg_v,
{
Stream->ConnectionsToWriter[Msg->WriterRank].CMconn = conn;
CMConnection_add_reference(conn);
Stream->FailureContactRank == Msg->WriterRank;
}
CMconn_register_close_handler(conn, ReaderConnCloseHandler, (void *)Stream);
STREAM_CONDITION_SIGNAL(Stream);
Expand Down
83 changes: 65 additions & 18 deletions source/adios2/toolkit/sst/dp/evpath_dp.c
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ static void SendSpeculativePreloadMsgs(CP_Services Svcs,
Evpath_WSR_Stream WSR_Stream,
TimestepList TS);

// reader-side routine, called by the main thread
static DP_RS_Stream EvpathInitReader(CP_Services Svcs, void *CP_Stream,
void **ReaderContactInfoPtr,
struct _SstParams *Params,
Expand Down Expand Up @@ -301,6 +302,7 @@ static DP_RS_Stream EvpathInitReader(CP_Services Svcs, void *CP_Stream,
return Stream;
}

// reader-side routine, called by the main thread
static void EvpathDestroyReader(CP_Services Svcs, DP_RS_Stream RS_Stream_v)
{
Evpath_RS_Stream RS_Stream = (Evpath_RS_Stream)RS_Stream_v;
Expand All @@ -315,6 +317,7 @@ static void EvpathDestroyReader(CP_Services Svcs, DP_RS_Stream RS_Stream_v)
free(RS_Stream);
}

// writer side routine, called by the
static void MarkReadRequest(TimestepList TS, DP_WSR_Stream Reader,
int RequestingRank)
{
Expand All @@ -329,6 +332,7 @@ static void MarkReadRequest(TimestepList TS, DP_WSR_Stream Reader,
}
}

// writer side routine, called by the network handler thread
static void EvpathReadRequestHandler(CManager cm, CMConnection conn,
void *msg_v, void *client_Data,
attr_list attrs)
Expand Down Expand Up @@ -427,6 +431,7 @@ typedef struct _EvpathCompletionHandle
struct _EvpathCompletionHandle *Next;
} * EvpathCompletionHandle;

// reader-side routine called by the network handler thread
static void EvpathReadReplyHandler(CManager cm, CMConnection conn, void *msg_v,
void *client_Data, attr_list attrs)
{
Expand Down Expand Up @@ -473,6 +478,7 @@ static void EvpathReadReplyHandler(CManager cm, CMConnection conn, void *msg_v,
TAU_STOP_FUNC();
}

// reader-side routine, called from the main program
static int HandleRequestWithPreloaded(CP_Services Svcs,
Evpath_RS_Stream RS_Stream, int Rank,
long Timestep, size_t Offset,
Expand All @@ -499,6 +505,7 @@ static int HandleRequestWithPreloaded(CP_Services Svcs,
return 1;
}

// reader-side routine, called from the main program
static void DiscardPriorPreloaded(CP_Services Svcs, Evpath_RS_Stream RS_Stream,
long Timestep)
{
Expand Down Expand Up @@ -535,13 +542,13 @@ static void DiscardPriorPreloaded(CP_Services Svcs, Evpath_RS_Stream RS_Stream,
pthread_mutex_unlock(&RS_Stream->DataLock);
}

// reader-side routine, called from the network handler thread
static void EvpathPreloadHandler(CManager cm, CMConnection conn, void *msg_v,
void *client_Data, attr_list attrs)
{
EvpathPreloadMsg PreloadMsg = (EvpathPreloadMsg)msg_v;
Evpath_RS_Stream RS_Stream = PreloadMsg->RS_Stream;
CP_Services Svcs = (CP_Services)client_Data;
EvpathCompletionHandle Handle = NULL;
RSTimestepList Entry = calloc(1, sizeof(*Entry));

Svcs->verbose(
Expand All @@ -565,6 +572,7 @@ static void EvpathPreloadHandler(CManager cm, CMConnection conn, void *msg_v,
return;
}

// writer-side routine, called from the main program
static DP_WS_Stream EvpathInitWriter(CP_Services Svcs, void *CP_Stream,
struct _SstParams *Params,
attr_list DPAttrs)
Expand Down Expand Up @@ -606,6 +614,7 @@ static DP_WS_Stream EvpathInitWriter(CP_Services Svcs, void *CP_Stream,
return (void *)Stream;
}

// writer-side routine, called from the main program
static void EvpathDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v)
{
Evpath_WS_Stream WS_Stream = (Evpath_WS_Stream)WS_Stream_v;
Expand All @@ -630,6 +639,7 @@ static void EvpathDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v)
free(WS_Stream);
}

// writer-side routine, called from the main program
static DP_WSR_Stream EvpathInitWriterPerReader(CP_Services Svcs,
DP_WS_Stream WS_Stream_v,
int readerCohortSize,
Expand Down Expand Up @@ -694,13 +704,15 @@ static DP_WSR_Stream EvpathInitWriterPerReader(CP_Services Svcs,
return WSR_Stream;
}

// writer-side routine, called from the main program
static void EvpathDestroyWriterPerReader(CP_Services Svcs,
DP_WSR_Stream WSR_Stream_v)
{
Evpath_WSR_Stream WSR_Stream = (Evpath_WSR_Stream)WSR_Stream_v;
free(WSR_Stream);
}

// reader-side routine, called from the main program
static void EvpathProvideWriterDataToReader(CP_Services Svcs,
DP_RS_Stream RS_Stream_v,
int writerCohortSize,
Expand Down Expand Up @@ -743,6 +755,7 @@ static void AddRequestToList(CP_Services Svcs, Evpath_RS_Stream Stream,
pthread_mutex_unlock(&Stream->DataLock);
}

// reader-side routine, called from the main program
static void RemoveRequestFromList(CP_Services Svcs, Evpath_RS_Stream Stream,
EvpathCompletionHandle Handle)
{
Expand Down Expand Up @@ -773,19 +786,23 @@ static void RemoveRequestFromList(CP_Services Svcs, Evpath_RS_Stream Stream,
pthread_mutex_unlock(&Stream->DataLock);
}

// reader-side routine, called from the network handler (close handler)
static void FailRequestsToRank(CP_Services Svcs, CManager cm,
Evpath_RS_Stream Stream, int FailedRank)
{
EvpathCompletionHandle Tmp;
Svcs->verbose(Stream->CP_Stream, "Fail all pending requests on stream %p\n",
int FailedSomethingToRank = 0;
Svcs->verbose(Stream->CP_Stream,
"Fail pending requests to rank %d on stream %p\n", FailedRank,
Stream);
pthread_mutex_lock(&Stream->DataLock);
Tmp = Stream->PendingReadRequests;
while (Tmp != NULL)
{
if (Tmp->Failed != 1)
if ((Tmp->Failed != 1) && (Tmp->Rank == FailedRank))
{
Tmp->Failed = 1;
FailedSomethingToRank = 1;
Svcs->verbose(Tmp->CPStream,
"Found a pending remote memory read "
"to writer rank %d, marking as "
Expand All @@ -797,9 +814,36 @@ static void FailRequestsToRank(CP_Services Svcs, CManager cm,
}
Tmp = Tmp->Next;
}
if (FailedSomethingToRank)
{
Tmp = Stream->PendingReadRequests;
Svcs->verbose(Stream->CP_Stream,
"We were waiting for requests on rank %d, fail *all* "
"pending requests on stream %p\n",
FailedRank, Stream);

while (Tmp != NULL)
{
if (Tmp->Failed != 1)
{
Tmp->Failed = 1;
FailedSomethingToRank = 1;
Svcs->verbose(Tmp->CPStream,
"Found a pending remote memory read "
"to writer rank %d, marking as "
"failed and signalling condition %d\n",
Tmp->Rank, Tmp->CMcondition);
CMCondition_signal(cm, Tmp->CMcondition);
Svcs->verbose(Tmp->CPStream, "Did the signal of condition %d\n",
Tmp->Rank, Tmp->CMcondition);
}
Tmp = Tmp->Next;
}
}
pthread_mutex_unlock(&Stream->DataLock);
Svcs->verbose(Stream->CP_Stream,
"Done Failing requests to writer from stream %p\n", Stream);
"Done Failing requests to writer %d from stream %p\n",
FailedRank, Stream);
}

typedef struct _EvpathPerTimestepInfo
Expand All @@ -808,6 +852,7 @@ typedef struct _EvpathPerTimestepInfo
int CheckInt;
} * EvpathPerTimestepInfo;

// reader-side routine, called from the main program
static void *EvpathReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v,
int Rank, long Timestep, size_t Offset,
size_t Length, void *Buffer,
Expand All @@ -817,7 +862,8 @@ static void *EvpathReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v,
Stream_v; /* DP_RS_Stream is the return from InitReader */
CManager cm = Svcs->getCManager(Stream->CP_Stream);
EvpathCompletionHandle ret = malloc(sizeof(struct _EvpathCompletionHandle));
EvpathPerTimestepInfo TimestepInfo = (EvpathPerTimestepInfo)DP_TimestepInfo;
// EvpathPerTimestepInfo TimestepInfo =
// (EvpathPerTimestepInfo)DP_TimestepInfo;
struct _EvpathReadRequestMsg ReadRequestMsg;

int HadPreload;
Expand Down Expand Up @@ -1065,8 +1111,9 @@ static void EvpathReaderReleaseTimestep(CP_Services Svcs,
Evpath_WSR_Stream WSR_Stream = (Evpath_WSR_Stream)Stream_v;
Evpath_WS_Stream WS_Stream =
WSR_Stream->WS_Stream; /* pointer to writer struct */
TimestepList tmp = WS_Stream->Timesteps;
TimestepList tmp;

tmp = WS_Stream->Timesteps;
if ((!WSR_Stream->ReaderRequests) &&
(Timestep >= WSR_Stream->ReadPatternLockTimestep))
{
Expand Down Expand Up @@ -1128,7 +1175,7 @@ static void EvpathProvideTimestep(CP_Services Svcs, DP_WS_Stream Stream_v,
{
Evpath_WS_Stream Stream = (Evpath_WS_Stream)Stream_v;
TimestepList Entry = malloc(sizeof(struct _TimestepEntry));
struct _EvpathPerTimestepInfo *Info = NULL;
// struct _EvpathPerTimestepInfo *Info = NULL;
// malloc(sizeof(struct _EvpathPerTimestepInfo));

// This section exercised the CP's ability to distribute DP per timestep
Expand Down Expand Up @@ -1244,17 +1291,17 @@ static FMStructDescRec EvpathWriterContactStructs[] = {
sizeof(struct _EvpathWriterContactInfo), NULL},
{NULL, NULL, 0, NULL}};

static FMField EvpathTimestepInfoList[] = {
{"CheckString", "string", sizeof(char *),
FMOffset(EvpathPerTimestepInfo, CheckString)},
{"CheckInt", "integer", sizeof(void *),
FMOffset(EvpathPerTimestepInfo, CheckInt)},
{NULL, NULL, 0, 0}};

static FMStructDescRec EvpathTimestepInfoStructs[] = {
{"EvpathTimestepInfo", EvpathTimestepInfoList,
sizeof(struct _EvpathPerTimestepInfo), NULL},
{NULL, NULL, 0, NULL}};
// static FMField EvpathTimestepInfoList[] = {
// {"CheckString", "string", sizeof(char *),
// FMOffset(EvpathPerTimestepInfo, CheckString)},
// {"CheckInt", "integer", sizeof(void *),
// FMOffset(EvpathPerTimestepInfo, CheckInt)},
// {NULL, NULL, 0, 0}};

// static FMStructDescRec EvpathTimestepInfoStructs[] = {
// {"EvpathTimestepInfo", EvpathTimestepInfoList,
// sizeof(struct _EvpathPerTimestepInfo), NULL},
// {NULL, NULL, 0, NULL}};

static struct _CP_DP_Interface evpathDPInterface;

Expand Down

0 comments on commit e877f36

Please sign in to comment.