diff --git a/source/adios2/toolkit/sst/dp/mpi_dp.c b/source/adios2/toolkit/sst/dp/mpi_dp.c index d82fc189b4..1be46913dc 100644 --- a/source/adios2/toolkit/sst/dp/mpi_dp.c +++ b/source/adios2/toolkit/sst/dp/mpi_dp.c @@ -30,6 +30,7 @@ #include +#include #include #include #include @@ -54,7 +55,7 @@ typedef struct _MpiWriterContactInfo { char ContactString[MPI_DP_CONTACT_STRING_LEN]; void *StreamWPR; - int PID; + long taskID; } *MpiWriterContactInfo; /* Base Stream class, used implicitly */ @@ -62,7 +63,7 @@ typedef struct _MpiStream { void *CP_Stream; int Rank; - int PID; + long taskID; } MpiStream; /* Link Stream class, used implicitly */ @@ -218,7 +219,7 @@ static FMField MpiWriterContactList[] = { {"ContactString", "char[" MACRO_TO_STR(MPI_DP_CONTACT_STRING_LEN) "]", sizeof(char), FMOffset(MpiWriterContactInfo, ContactString)}, {"writer_ID", "integer", sizeof(void *), FMOffset(MpiWriterContactInfo, StreamWPR)}, - {"PID", "integer", sizeof(int), FMOffset(MpiWriterContactInfo, PID)}, + {"taskID", "integer", sizeof(long), FMOffset(MpiWriterContactInfo, taskID)}, {NULL, NULL, 0, 0}}; static FMStructDescRec MpiWriterContactStructs[] = { @@ -227,6 +228,16 @@ static FMStructDescRec MpiWriterContactStructs[] = { /*****Internal functions*****************************************************/ +/** + * Return an unique process ID (Task ID) for the current process. We do this by + * combining the PID of the process and the hostid (as return the same output + * as `hostid` or the content of /etc/machine-id in modern UNIX-like systems). + */ +static uint64_t GetUniqueTaskId() +{ + return ((uint32_t)getpid() * (1ll << 32ll)) | (uint32_t)gethostid(); +} + static void MpiReadReplyHandler(CManager cm, CMConnection conn, void *msg_v, void *client_Data, attr_list attrs); @@ -242,9 +253,7 @@ static void MpiReadRequestHandler(CManager cm, CMConnection conn, void *msg_v, v * the reader side. It should do whatever is necessary to initialize a new * reader-side data plane. A pointer to per-reader-rank contact information * should be placed in *ReaderContactInfoPtr. The structure of that - * information should be described by DPInterface.ReaderContactFormats. (This - * is an FFS format description. See - * https://www.cc.gatech.edu/systems/projects/FFS/.) + * information should be described by DPInterface.ReaderContactFormats. */ static DP_RS_Stream MpiInitReader(CP_Services Svcs, void *CP_Stream, void **ReaderContactInfoPtr, struct _SstParams *Params, attr_list WriterContact, @@ -256,7 +265,7 @@ static DP_RS_Stream MpiInitReader(CP_Services Svcs, void *CP_Stream, void **Read CMFormat F; Stream->Stream.CP_Stream = CP_Stream; - Stream->Stream.PID = getpid(); + Stream->Stream.taskID = GetUniqueTaskId(); Stream->Link.Stats = Stats; SMPI_Comm_rank(comm, &Stream->Stream.Rank); @@ -305,7 +314,7 @@ static DP_WS_Stream MpiInitWriter(CP_Services Svcs, void *CP_Stream, struct _Sst SMPI_Comm_rank(comm, &Stream->Stream.Rank); Stream->Stream.CP_Stream = CP_Stream; - Stream->Stream.PID = getpid(); + Stream->Stream.taskID = GetUniqueTaskId(); STAILQ_INIT(&Stream->TimeSteps); TAILQ_INIT(&Stream->Readers); @@ -329,8 +338,7 @@ static DP_WS_Stream MpiInitWriter(CP_Services Svcs, void *CP_Stream, struct _Sst * on the connecting peer in InitReader) and should create its own * per-writer-rank contact information and place it in *writerContactInfoPtr. * The structure of that information should be described by - * DPInterface.WriterContactFormats. (This is an FFS format description. See - * https://www.cc.gatech.edu/systems/projects/FFS/.) + * DPInterface.WriterContactFormats. */ static DP_WSR_Stream MpiInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_Stream_v, int readerCohortSize, CP_PeerCohort PeerCohort, @@ -372,7 +380,7 @@ static DP_WSR_Stream MpiInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_St "Writer Rank %d, test contact", Rank); StreamWPR->MyContactInfo.StreamWPR = StreamWPR; - StreamWPR->MyContactInfo.PID = StreamWR->Stream.PID; + StreamWPR->MyContactInfo.taskID = StreamWR->Stream.taskID; *WriterContactInfoPtr = &StreamWPR->MyContactInfo; return StreamWPR; @@ -474,7 +482,7 @@ static void *MpiReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v, int Ra ret->cm = cm; ret->CPStream = Stream->Stream.CP_Stream; ret->DestinationRank = Rank; - ret->CommType = (TargetContact->PID == Stream->Stream.PID) ? MPI_DP_LOCAL : MPI_DP_REMOTE; + ret->CommType = (TargetContact->taskID == Stream->Stream.taskID) ? MPI_DP_LOCAL : MPI_DP_REMOTE; if (ret->CommType == MPI_DP_REMOTE) { @@ -541,7 +549,7 @@ static int MpiWaitForCompletion(CP_Services Svcs, void *Handle_v) } else { - Svcs->verbose(Handle->CPStream, DPTraceVerbose, + Svcs->verbose(Handle->CPStream, DPCriticalVerbose, "Remote memory read to rank %d with condition %d has FAILED" "because of " "writer failure\n", @@ -580,7 +588,7 @@ static void MpiReadRequestHandler(CManager cm, CMConnection conn, void *msg_v, v if (!RequestedData) { PERFSTUBS_TIMER_STOP_FUNC(timer); - Svcs->verbose(StreamWR->Stream.CP_Stream, DPPerStepVerbose, + Svcs->verbose(StreamWR->Stream.CP_Stream, DPCriticalVerbose, "Failed to read TimeStep %ld, not found\n", ReadRequestMsg->TimeStep); return; } @@ -799,11 +807,42 @@ static void MpiNotifyConnFailure(CP_Services Svcs, DP_RS_Stream Stream_v, int Fa FailedPeerRank); } +/** MpiDisconnectWriterPerReader. + * + * This is called whenever a reader disconnect from a writer. This function + * simply disconnect the mpi communicator, it does not frees any data + * structure. We must do it in this way since: + * + * - There is the possibility of the failed peer to re-enter in the network. + * - We must disconnect the MPI port for that particular mpi reader task since + * otherwise it the reader task might hung in mpi_finalize, in the case the + * the failure leads to a application graceful exit. + */ +static void MpiDisconnectWriterPerReader(CP_Services Svcs, DP_WSR_Stream WSR_Stream_v) +{ + MpiStreamWPR StreamWPR = (MpiStreamWPR)WSR_Stream_v; + MpiStreamWR StreamWR = StreamWPR->StreamWR; + + const int CohortSize = StreamWPR->Link.CohortSize; + + Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose, + "MpiDisconnectWriterPerReader invoked [rank:%d;cohortSize:%d]\n", CohortSize, + StreamWR->Stream.Rank); + + for (int i = 0; i < CohortSize; i++) + { + if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL) + { + MPI_Comm_disconnect(&StreamWPR->CohortMpiComms[i]); + } + } +} + /** * MpiDestroyWriterPerReader. * - * This is called whenever a reader disconnect from a writer. This function - * also removes the StreamWPR from its own StreamWR. + * This is called by the MpiDestroyWriter function. This function will free any resource + * allocated to the particulare WriterPerReader instance (StreamWPR). */ static void MpiDestroyWriterPerReader(CP_Services Svcs, DP_WSR_Stream WSR_Stream_v) { @@ -812,6 +851,10 @@ static void MpiDestroyWriterPerReader(CP_Services Svcs, DP_WSR_Stream WSR_Stream const int CohortSize = StreamWPR->Link.CohortSize; + Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose, + "MpiDestroyWriterPerReader invoked [rank:%d;cohortSize:%d]", CohortSize, + StreamWR->Stream.Rank); + for (int i = 0; i < CohortSize; i++) { if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL) @@ -837,6 +880,9 @@ static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v) { MpiStreamWR StreamWR = (MpiStreamWR)WS_Stream_v; + Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose, + "MpiDestroyWriter invoked [rank:%d]\n", StreamWR->Stream.Rank); + pthread_mutex_lock(&StreamWR->MutexReaders); while (!TAILQ_EMPTY(&StreamWR->Readers)) { @@ -866,6 +912,10 @@ static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v) static void MpiDestroyReader(CP_Services Svcs, DP_RS_Stream RS_Stream_v) { MpiStreamRD StreamRS = (MpiStreamRD)RS_Stream_v; + + Svcs->verbose(StreamRS->Stream.CP_Stream, DPTraceVerbose, + "MpiDestroyReader invoked [rank:%d]\n", StreamRS->Stream.Rank); + const int CohortSize = StreamRS->Link.CohortSize; for (int i = 0; i < CohortSize; i++) @@ -896,7 +946,7 @@ extern CP_DP_Interface LoadMpiDP() .getPriority = MpiGetPriority, .destroyReader = MpiDestroyReader, .destroyWriter = MpiDestroyWriter, - .destroyWriterPerReader = MpiDestroyWriterPerReader, + .destroyWriterPerReader = MpiDisconnectWriterPerReader, .notifyConnFailure = MpiNotifyConnFailure, };