From ed37976c36e267bf4a46eb20bee91bdbd8e81aa9 Mon Sep 17 00:00:00 2001 From: Joshua Hursey Date: Thu, 30 Jan 2020 13:34:34 -0500 Subject: [PATCH] A start - not done Signed-off-by: Joshua Hursey --- src/mca/iof/base/base.h | 3 +++ src/mca/iof/base/iof_base_frame.c | 2 ++ src/mca/iof/base/iof_base_output.c | 12 +++++++++++ src/mca/iof/hnp/iof_hnp.c | 34 ++++++++++++++++++++++++++++-- src/mca/iof/hnp/iof_hnp.h | 1 + src/mca/iof/hnp/iof_hnp_receive.c | 6 ++++++ 6 files changed, 56 insertions(+), 2 deletions(-) diff --git a/src/mca/iof/base/base.h b/src/mca/iof/base/base.h index f4d27f52ab..c8df772a92 100644 --- a/src/mca/iof/base/base.h +++ b/src/mca/iof/base/base.h @@ -51,6 +51,7 @@ #include "src/class/prrte_bitmap.h" #include "src/mca/mca.h" #include "src/event/event-internal.h" +#include "src/pmix/pmix-internal.h" #include "src/util/fd.h" #include "src/mca/iof/iof.h" @@ -131,6 +132,7 @@ typedef struct { prrte_iof_read_event_t *revstderr; prrte_list_t *subscribers; bool copy; + prrte_mutex_t lock; } prrte_iof_proc_t; PRRTE_EXPORT PRRTE_CLASS_DECLARATION(prrte_iof_proc_t); @@ -275,6 +277,7 @@ PRRTE_EXPORT extern prrte_iof_base_t prrte_iof_base; PRRTE_EXPORT int prrte_iof_base_write_output(const prrte_process_name_t *name, prrte_iof_tag_t stream, const unsigned char *data, int numbytes, prrte_iof_write_event_t *channel); +PRRTE_EXPORT void prrte_iof_base_static_drop_input(prrte_iof_proc_t* proct); PRRTE_EXPORT void prrte_iof_base_static_dump_output(prrte_iof_read_event_t *rev); PRRTE_EXPORT void prrte_iof_base_write_handler(int fd, short event, void *cbdata); diff --git a/src/mca/iof/base/iof_base_frame.c b/src/mca/iof/base/iof_base_frame.c index 714e90f2b1..4e4959ec4c 100644 --- a/src/mca/iof/base/iof_base_frame.c +++ b/src/mca/iof/base/iof_base_frame.c @@ -181,6 +181,7 @@ static void prrte_iof_base_proc_construct(prrte_iof_proc_t* ptr) ptr->revstderr = NULL; ptr->subscribers = NULL; ptr->copy = true; + PRRTE_CONSTRUCT(&(ptr->lock), prrte_mutex_t); } static void prrte_iof_base_proc_destruct(prrte_iof_proc_t* ptr) { @@ -196,6 +197,7 @@ static void prrte_iof_base_proc_destruct(prrte_iof_proc_t* ptr) if (NULL != ptr->subscribers) { PRRTE_LIST_RELEASE(ptr->subscribers); } + PRRTE_DESTRUCT(&(ptr->lock)); } PRRTE_CLASS_INSTANCE(prrte_iof_proc_t, prrte_list_item_t, diff --git a/src/mca/iof/base/iof_base_output.c b/src/mca/iof/base/iof_base_output.c index 92c6776af4..f88d675d69 100644 --- a/src/mca/iof/base/iof_base_output.c +++ b/src/mca/iof/base/iof_base_output.c @@ -279,6 +279,18 @@ int prrte_iof_base_write_output(const prrte_process_name_t *name, prrte_iof_tag_ return num_buffered; } +void prrte_iof_base_static_drop_input(prrte_iof_proc_t* proct) +{ + prrte_output(0,"%s iof:hnp dropping all stdin to process %s", + PRRTE_NAME_PRINT(PRRTE_PROC_MY_NAME), + PRRTE_NAME_PRINT(&proct->name)); + + + prrte_output(0,"%s iof:hnp drop to process %s - all done", + PRRTE_NAME_PRINT(PRRTE_PROC_MY_NAME), + PRRTE_NAME_PRINT(&proct->name)); +} + void prrte_iof_base_static_dump_output(prrte_iof_read_event_t *rev) { bool dump; diff --git a/src/mca/iof/hnp/iof_hnp.c b/src/mca/iof/hnp/iof_hnp.c index bd0246edbc..e2a1c6251b 100644 --- a/src/mca/iof/hnp/iof_hnp.c +++ b/src/mca/iof/hnp/iof_hnp.c @@ -115,6 +115,7 @@ static int init(void) NULL); PRRTE_CONSTRUCT(&prrte_iof_hnp_component.procs, prrte_list_t); + PRRTE_CONSTRUCT(&prrte_iof_hnp_component.procs_lock, prrte_mutex_t); prrte_iof_hnp_component.stdinev = NULL; return PRRTE_SUCCESS; @@ -154,6 +155,7 @@ static int hnp_push(const prrte_process_name_t* dst_name, prrte_iof_tag_t src_ta fd, PRRTE_NAME_PRINT(dst_name))); /* do we already have this process in our list? */ + prrte_mutex_lock(&prrte_iof_hnp_component.procs_lock); PRRTE_LIST_FOREACH(proct, &prrte_iof_hnp_component.procs, prrte_iof_proc_t) { if (PRRTE_EQUAL == prrte_util_compare_name_fields(mask, &proct->name, dst_name)) { /* found it */ @@ -167,6 +169,7 @@ static int hnp_push(const prrte_process_name_t* dst_name, prrte_iof_tag_t src_ta prrte_list_append(&prrte_iof_hnp_component.procs, &proct->super); SETUP: + prrte_mutex_unlock(&prrte_iof_hnp_component.procs_lock); /* set the file descriptor to non-blocking - do this before we setup * and activate the read event in case it fires right away */ @@ -206,6 +209,7 @@ static int hnp_push(const prrte_process_name_t* dst_name, prrte_iof_tag_t src_ta if (proct->copy) { /* see if there are any wildcard subscribers out there that * apply to us */ + prrte_mutex_lock(&prrte_iof_hnp_component.procs_lock); PRRTE_LIST_FOREACH(pptr, &prrte_iof_hnp_component.procs, prrte_iof_proc_t) { if (dst_name->jobid == pptr->name.jobid && PRRTE_VPID_WILDCARD == pptr->name.vpid && @@ -215,6 +219,7 @@ static int hnp_push(const prrte_process_name_t* dst_name, prrte_iof_tag_t src_ta break; } } + prrte_mutex_unlock(&prrte_iof_hnp_component.procs_lock); } PRRTE_IOF_READ_ACTIVATE(proct->revstdout); if (!prrte_iof_base.redirect_app_stderr_to_stdout) { @@ -250,6 +255,7 @@ static int push_stdin(const prrte_process_name_t* dst_name, /* do we already have this process in our list? */ proct = NULL; + prrte_mutex_lock(&prrte_iof_hnp_component.procs_lock); PRRTE_LIST_FOREACH(pptr, &prrte_iof_hnp_component.procs, prrte_iof_proc_t) { if (PRRTE_EQUAL == prrte_util_compare_name_fields(mask, &pptr->name, dst_name)) { /* found it */ @@ -257,8 +263,17 @@ static int push_stdin(const prrte_process_name_t* dst_name, } } if (NULL == proct) { + prrte_mutex_unlock(&prrte_iof_hnp_component.procs_lock); return PRRTE_ERR_NOT_FOUND; } + // Grab the process lock before giving up the list lock + // this prevents the caller from trashing this proc + prrte_mutex_lock(&(proct->lock)); + prrte_mutex_unlock(&prrte_iof_hnp_component.procs_lock); + + prrte_output(0, "%s iof:hnp stdin %s - locked", + PRRTE_NAME_PRINT(PRRTE_PROC_MY_NAME), + PRRTE_NAME_PRINT(dst_name)); /* pass the data to the sink */ prrte_output(0,"%s iof:hnp pushing stdin for process %s: size %zu", @@ -275,6 +290,7 @@ static int push_stdin(const prrte_process_name_t* dst_name, prrte_output(0, "%s iof:hnp pushing stdin for process %s failed stdinev is no longer valid (JJH DEBUG)", PRRTE_NAME_PRINT(PRRTE_PROC_MY_NAME), PRRTE_NAME_PRINT(dst_name)); + prrte_mutex_unlock(&(proct->lock)); return PRRTE_SUCCESS; } @@ -298,6 +314,7 @@ static int push_stdin(const prrte_process_name_t* dst_name, PRRTE_OUTPUT_VERBOSE((1, prrte_iof_base_framework.framework_output, "buffer backed up - holding")); + prrte_mutex_unlock(&(proct->lock)); return PRRTE_ERR_OUT_OF_RESOURCE; } } @@ -325,7 +342,7 @@ static int push_stdin(const prrte_process_name_t* dst_name, } } } - + prrte_mutex_unlock(&(proct->lock)); return PRRTE_SUCCESS; } @@ -364,6 +381,7 @@ static int hnp_pull(const prrte_process_name_t* dst_name, } /* do we already have this process in our list? */ + prrte_mutex_lock(&prrte_iof_hnp_component.procs_lock); PRRTE_LIST_FOREACH(proct, &prrte_iof_hnp_component.procs, prrte_iof_proc_t) { if (PRRTE_EQUAL == prrte_util_compare_name_fields(mask, &proct->name, dst_name)) { /* found it */ @@ -377,6 +395,7 @@ static int hnp_pull(const prrte_process_name_t* dst_name, prrte_list_append(&prrte_iof_hnp_component.procs, &proct->super); SETUP: + prrte_mutex_unlock(&prrte_iof_hnp_component.procs_lock); PRRTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, fd, PRRTE_IOF_STDIN, stdin_write_handler); proct->stdinev->daemon.jobid = PRRTE_PROC_MY_NAME->jobid; @@ -398,11 +417,13 @@ static int hnp_close(const prrte_process_name_t* peer, prrte_output(0,"%s iof:hnp closing connection to process %s", PRRTE_NAME_PRINT(PRRTE_PROC_MY_NAME), PRRTE_NAME_PRINT(peer)); - + prrte_mutex_lock(&prrte_iof_hnp_component.procs_lock); PRRTE_LIST_FOREACH(proct, &prrte_iof_hnp_component.procs, prrte_iof_proc_t) { if (PRRTE_EQUAL == prrte_util_compare_name_fields(mask, &proct->name, peer)) { + prrte_mutex_lock(&(proct->lock)); if (PRRTE_IOF_STDIN & source_tag) { if (NULL != proct->stdinev) { + prrte_iof_base_static_drop_input(proct); PRRTE_RELEASE(proct->stdinev); } proct->stdinev = NULL; @@ -422,6 +443,8 @@ static int hnp_close(const prrte_process_name_t* peer, } proct->revstderr = NULL; } + prrte_mutex_unlock(&(proct->lock)); + /* if we closed them all, then remove this proc */ if (NULL == proct->stdinev && NULL == proct->revstdout && @@ -432,6 +455,8 @@ static int hnp_close(const prrte_process_name_t* peer, break; } } + prrte_mutex_unlock(&prrte_iof_hnp_component.procs_lock); + return PRRTE_SUCCESS; } @@ -440,6 +465,7 @@ static void hnp_complete(const prrte_job_t *jdata) prrte_iof_proc_t *proct, *next; /* cleanout any lingering sinks */ + prrte_mutex_lock(&prrte_iof_hnp_component.procs_lock); PRRTE_LIST_FOREACH_SAFE(proct, next, &prrte_iof_hnp_component.procs, prrte_iof_proc_t) { if (jdata->jobid == proct->name.jobid) { prrte_list_remove_item(&prrte_iof_hnp_component.procs, &proct->super); @@ -456,6 +482,7 @@ static void hnp_complete(const prrte_job_t *jdata) PRRTE_RELEASE(proct); } } + prrte_mutex_unlock(&prrte_iof_hnp_component.procs_lock); } static int finalize(void) @@ -503,6 +530,7 @@ static int finalize(void) /* cycle thru the procs and ensure all their output was delivered * if they were writing to files */ + prrte_mutex_lock(&prrte_iof_hnp_component.procs_lock); while (NULL != (proct = (prrte_iof_proc_t*)prrte_list_remove_first(&prrte_iof_hnp_component.procs))) { if (NULL != proct->revstdout) { prrte_iof_base_static_dump_output(proct->revstdout); @@ -513,6 +541,8 @@ static int finalize(void) PRRTE_RELEASE(proct); } PRRTE_DESTRUCT(&prrte_iof_hnp_component.procs); + prrte_mutex_unlock(&prrte_iof_hnp_component.procs_lock); + PRRTE_DESTRUCT(&prrte_iof_hnp_component.procs_lock); return PRRTE_SUCCESS; } diff --git a/src/mca/iof/hnp/iof_hnp.h b/src/mca/iof/hnp/iof_hnp.h index 0aad9b3b39..76333bebcd 100644 --- a/src/mca/iof/hnp/iof_hnp.h +++ b/src/mca/iof/hnp/iof_hnp.h @@ -64,6 +64,7 @@ BEGIN_C_DECLS struct prrte_iof_hnp_component_t { prrte_iof_base_component_t super; prrte_list_t procs; + prrte_mutex_t procs_lock; prrte_iof_read_event_t *stdinev; prrte_event_t stdinsig; }; diff --git a/src/mca/iof/hnp/iof_hnp_receive.c b/src/mca/iof/hnp/iof_hnp_receive.c index edb1d779ce..d625651afb 100644 --- a/src/mca/iof/hnp/iof_hnp_receive.c +++ b/src/mca/iof/hnp/iof_hnp_receive.c @@ -139,6 +139,7 @@ void prrte_iof_hnp_recv(int status, prrte_process_name_t* sender, exclusive = false; } /* do we already have this process in our list? */ + prrte_mutex_lock(&prrte_iof_hnp_component.procs_lock); PRRTE_LIST_FOREACH(proct, &prrte_iof_hnp_component.procs, prrte_iof_proc_t) { if (PRRTE_EQUAL == prrte_util_compare_name_fields(mask, &proct->name, &origin)) { /* found it */ @@ -152,6 +153,7 @@ void prrte_iof_hnp_recv(int status, prrte_process_name_t* sender, prrte_list_append(&prrte_iof_hnp_component.procs, &proct->super); PROCESS: + prrte_mutex_unlock(&prrte_iof_hnp_component.procs_lock); /* a tool is requesting that we send it a copy of the specified stream(s) * from the specified process(es), so create a sink for it */ @@ -191,6 +193,7 @@ void prrte_iof_hnp_recv(int status, prrte_process_name_t* sender, /* a tool is requesting that we no longer forward a copy of the * specified stream(s) from the specified process(es) - remove the sink */ + prrte_mutex_lock(&prrte_iof_hnp_component.procs_lock); PRRTE_LIST_FOREACH(proct, &prrte_iof_hnp_component.procs, prrte_iof_proc_t) { if (PRRTE_EQUAL != prrte_util_compare_name_fields(mask, &proct->name, &origin)) { continue; @@ -215,6 +218,7 @@ void prrte_iof_hnp_recv(int status, prrte_process_name_t* sender, } } } + prrte_mutex_unlock(&prrte_iof_hnp_component.procs_lock); goto CLEAN_RETURN; } @@ -232,6 +236,7 @@ void prrte_iof_hnp_recv(int status, prrte_process_name_t* sender, PRRTE_NAME_PRINT(&origin))); /* do we already have this process in our list? */ + prrte_mutex_lock(&prrte_iof_hnp_component.procs_lock); PRRTE_LIST_FOREACH(proct, &prrte_iof_hnp_component.procs, prrte_iof_proc_t) { if (PRRTE_EQUAL == prrte_util_compare_name_fields(mask, &proct->name, &origin)) { /* found it */ @@ -245,6 +250,7 @@ void prrte_iof_hnp_recv(int status, prrte_process_name_t* sender, prrte_list_append(&prrte_iof_hnp_component.procs, &proct->super); NSTEP: + prrte_mutex_unlock(&prrte_iof_hnp_component.procs_lock); /* cycle through the endpoints to see if someone else wants a copy */ exclusive = false; if (NULL != proct->subscribers) {