Skip to content

Commit

Permalink
Merge pull request #3588 from vicentebolea/fix-mpi-dp
Browse files Browse the repository at this point in the history
Fix MPI Data plane cohort handling
  • Loading branch information
vicentebolea authored Oct 31, 2023
2 parents 2054006 + 4e2f6eb commit c77df61
Showing 1 changed file with 67 additions and 17 deletions.
84 changes: 67 additions & 17 deletions source/adios2/toolkit/sst/dp/mpi_dp.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include <mpi.h>

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Expand All @@ -54,15 +55,15 @@ typedef struct _MpiWriterContactInfo
{
char ContactString[MPI_DP_CONTACT_STRING_LEN];
void *StreamWPR;
int PID;
long taskID;
} *MpiWriterContactInfo;

/* Base Stream class, used implicitly */
typedef struct _MpiStream
{
void *CP_Stream;
int Rank;
int PID;
long taskID;
} MpiStream;

/* Link Stream class, used implicitly */
Expand Down Expand Up @@ -218,7 +219,7 @@ static FMField MpiWriterContactList[] = {
{"ContactString", "char[" MACRO_TO_STR(MPI_DP_CONTACT_STRING_LEN) "]", sizeof(char),
FMOffset(MpiWriterContactInfo, ContactString)},
{"writer_ID", "integer", sizeof(void *), FMOffset(MpiWriterContactInfo, StreamWPR)},
{"PID", "integer", sizeof(int), FMOffset(MpiWriterContactInfo, PID)},
{"taskID", "integer", sizeof(long), FMOffset(MpiWriterContactInfo, taskID)},
{NULL, NULL, 0, 0}};

static FMStructDescRec MpiWriterContactStructs[] = {
Expand All @@ -227,6 +228,16 @@ static FMStructDescRec MpiWriterContactStructs[] = {

/*****Internal functions*****************************************************/

/**
 * Return a unique process identifier (task ID) for the calling process.
 *
 * The value packs the process ID returned by getpid() into the upper 32 bits
 * and the host identifier returned by gethostid() (the value the `hostid`
 * utility prints) into the lower 32 bits. Both inputs are truncated to 32
 * bits before being combined, so two processes only get distinct task IDs if
 * their PID or their host identifier differs.
 *
 * @return the 64-bit task ID: (PID << 32) | host ID.
 */
static uint64_t GetUniqueTaskId(void)
{
    const uint64_t Pid = (uint32_t)getpid();
    const uint64_t HostId = (uint32_t)gethostid();

    /* Shift in an unsigned 64-bit type so the packing never relies on signed
     * arithmetic, whatever 32-bit value the PID happens to have. */
    return (Pid << 32) | HostId;
}

static void MpiReadReplyHandler(CManager cm, CMConnection conn, void *msg_v, void *client_Data,
attr_list attrs);

Expand All @@ -242,9 +253,7 @@ static void MpiReadRequestHandler(CManager cm, CMConnection conn, void *msg_v, v
* the reader side. It should do whatever is necessary to initialize a new
* reader-side data plane. A pointer to per-reader-rank contact information
* should be placed in *ReaderContactInfoPtr. The structure of that
* information should be described by DPInterface.ReaderContactFormats. (This
* is an FFS format description. See
* https://www.cc.gatech.edu/systems/projects/FFS/.)
* information should be described by DPInterface.ReaderContactFormats.
*/
static DP_RS_Stream MpiInitReader(CP_Services Svcs, void *CP_Stream, void **ReaderContactInfoPtr,
struct _SstParams *Params, attr_list WriterContact,
Expand All @@ -256,7 +265,7 @@ static DP_RS_Stream MpiInitReader(CP_Services Svcs, void *CP_Stream, void **Read
CMFormat F;

Stream->Stream.CP_Stream = CP_Stream;
Stream->Stream.PID = getpid();
Stream->Stream.taskID = GetUniqueTaskId();
Stream->Link.Stats = Stats;

SMPI_Comm_rank(comm, &Stream->Stream.Rank);
Expand Down Expand Up @@ -305,7 +314,7 @@ static DP_WS_Stream MpiInitWriter(CP_Services Svcs, void *CP_Stream, struct _Sst
SMPI_Comm_rank(comm, &Stream->Stream.Rank);

Stream->Stream.CP_Stream = CP_Stream;
Stream->Stream.PID = getpid();
Stream->Stream.taskID = GetUniqueTaskId();
STAILQ_INIT(&Stream->TimeSteps);
TAILQ_INIT(&Stream->Readers);

Expand All @@ -329,8 +338,7 @@ static DP_WS_Stream MpiInitWriter(CP_Services Svcs, void *CP_Stream, struct _Sst
* on the connecting peer in InitReader) and should create its own
* per-writer-rank contact information and place it in *writerContactInfoPtr.
* The structure of that information should be described by
* DPInterface.WriterContactFormats. (This is an FFS format description. See
* https://www.cc.gatech.edu/systems/projects/FFS/.)
* DPInterface.WriterContactFormats.
*/
static DP_WSR_Stream MpiInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_Stream_v,
int readerCohortSize, CP_PeerCohort PeerCohort,
Expand Down Expand Up @@ -372,7 +380,7 @@ static DP_WSR_Stream MpiInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_St
"Writer Rank %d, test contact", Rank);

StreamWPR->MyContactInfo.StreamWPR = StreamWPR;
StreamWPR->MyContactInfo.PID = StreamWR->Stream.PID;
StreamWPR->MyContactInfo.taskID = StreamWR->Stream.taskID;
*WriterContactInfoPtr = &StreamWPR->MyContactInfo;

return StreamWPR;
Expand Down Expand Up @@ -474,7 +482,7 @@ static void *MpiReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v, int Ra
ret->cm = cm;
ret->CPStream = Stream->Stream.CP_Stream;
ret->DestinationRank = Rank;
ret->CommType = (TargetContact->PID == Stream->Stream.PID) ? MPI_DP_LOCAL : MPI_DP_REMOTE;
ret->CommType = (TargetContact->taskID == Stream->Stream.taskID) ? MPI_DP_LOCAL : MPI_DP_REMOTE;

if (ret->CommType == MPI_DP_REMOTE)
{
Expand Down Expand Up @@ -541,7 +549,7 @@ static int MpiWaitForCompletion(CP_Services Svcs, void *Handle_v)
}
else
{
Svcs->verbose(Handle->CPStream, DPTraceVerbose,
Svcs->verbose(Handle->CPStream, DPCriticalVerbose,
"Remote memory read to rank %d with condition %d has FAILED"
"because of "
"writer failure\n",
Expand Down Expand Up @@ -580,7 +588,7 @@ static void MpiReadRequestHandler(CManager cm, CMConnection conn, void *msg_v, v
if (!RequestedData)
{
PERFSTUBS_TIMER_STOP_FUNC(timer);
Svcs->verbose(StreamWR->Stream.CP_Stream, DPPerStepVerbose,
Svcs->verbose(StreamWR->Stream.CP_Stream, DPCriticalVerbose,
"Failed to read TimeStep %ld, not found\n", ReadRequestMsg->TimeStep);
return;
}
Expand Down Expand Up @@ -799,11 +807,42 @@ static void MpiNotifyConnFailure(CP_Services Svcs, DP_RS_Stream Stream_v, int Fa
FailedPeerRank);
}

/**
 * MpiDisconnectWriterPerReader.
 *
 * Called whenever a reader disconnects from a writer. This function only
 * disconnects the MPI communicators held for the reader cohort; it does not
 * free any data structure. It is done this way because:
 *
 * - The failed peer may re-enter the network.
 * - The MPI connection to that particular reader task must be disconnected,
 *   since otherwise the reader task might hang in MPI_Finalize when the
 *   failure leads to a graceful application exit.
 *
 * @param Svcs          control-plane services; used here for trace logging.
 * @param WSR_Stream_v  the writer-per-reader stream (MpiStreamWPR) to disconnect.
 */
static void MpiDisconnectWriterPerReader(CP_Services Svcs, DP_WSR_Stream WSR_Stream_v)
{
    MpiStreamWPR StreamWPR = (MpiStreamWPR)WSR_Stream_v;
    MpiStreamWR StreamWR = StreamWPR->StreamWR;

    const int CohortSize = StreamWPR->Link.CohortSize;

    /* Arguments follow the format string order: rank first, then cohort size. */
    Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose,
                  "MpiDisconnectWriterPerReader invoked [rank:%d;cohortSize:%d]\n",
                  StreamWR->Stream.Rank, CohortSize);

    for (int i = 0; i < CohortSize; i++)
    {
        /* Skip slots that hold no communicator. */
        if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL)
        {
            MPI_Comm_disconnect(&StreamWPR->CohortMpiComms[i]);
        }
    }
}

/**
* MpiDestroyWriterPerReader.
*
* This is called whenever a reader disconnect from a writer. This function
* also removes the StreamWPR from its own StreamWR.
* This is called by the MpiDestroyWriter function. It frees every resource
* allocated to the particular WriterPerReader instance (StreamWPR).
*/
static void MpiDestroyWriterPerReader(CP_Services Svcs, DP_WSR_Stream WSR_Stream_v)
{
Expand All @@ -812,6 +851,10 @@ static void MpiDestroyWriterPerReader(CP_Services Svcs, DP_WSR_Stream WSR_Stream

const int CohortSize = StreamWPR->Link.CohortSize;

Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose,
"MpiDestroyWriterPerReader invoked [rank:%d;cohortSize:%d]", CohortSize,
StreamWR->Stream.Rank);

for (int i = 0; i < CohortSize; i++)
{
if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL)
Expand All @@ -837,6 +880,9 @@ static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v)
{
MpiStreamWR StreamWR = (MpiStreamWR)WS_Stream_v;

Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose,
"MpiDestroyWriter invoked [rank:%d]\n", StreamWR->Stream.Rank);

pthread_mutex_lock(&StreamWR->MutexReaders);
while (!TAILQ_EMPTY(&StreamWR->Readers))
{
Expand Down Expand Up @@ -866,6 +912,10 @@ static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v)
static void MpiDestroyReader(CP_Services Svcs, DP_RS_Stream RS_Stream_v)
{
MpiStreamRD StreamRS = (MpiStreamRD)RS_Stream_v;

Svcs->verbose(StreamRS->Stream.CP_Stream, DPTraceVerbose,
"MpiDestroyReader invoked [rank:%d]\n", StreamRS->Stream.Rank);

const int CohortSize = StreamRS->Link.CohortSize;

for (int i = 0; i < CohortSize; i++)
Expand Down Expand Up @@ -896,7 +946,7 @@ extern CP_DP_Interface LoadMpiDP()
.getPriority = MpiGetPriority,
.destroyReader = MpiDestroyReader,
.destroyWriter = MpiDestroyWriter,
.destroyWriterPerReader = MpiDestroyWriterPerReader,
.destroyWriterPerReader = MpiDisconnectWriterPerReader,
.notifyConnFailure = MpiNotifyConnFailure,
};

Expand Down

0 comments on commit c77df61

Please sign in to comment.