Skip to content

Commit

Permalink
A start - not done
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Hursey <jhursey@us.ibm.com>
  • Loading branch information
jjhursey committed Jan 30, 2020
1 parent 3a91e04 commit ed37976
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 2 deletions.
3 changes: 3 additions & 0 deletions src/mca/iof/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#include "src/class/prrte_bitmap.h"
#include "src/mca/mca.h"
#include "src/event/event-internal.h"
#include "src/pmix/pmix-internal.h"
#include "src/util/fd.h"

#include "src/mca/iof/iof.h"
Expand Down Expand Up @@ -131,6 +132,7 @@ typedef struct {
prrte_iof_read_event_t *revstderr;
prrte_list_t *subscribers;
bool copy;
prrte_mutex_t lock;
} prrte_iof_proc_t;
PRRTE_EXPORT PRRTE_CLASS_DECLARATION(prrte_iof_proc_t);

Expand Down Expand Up @@ -275,6 +277,7 @@ PRRTE_EXPORT extern prrte_iof_base_t prrte_iof_base;
PRRTE_EXPORT int prrte_iof_base_write_output(const prrte_process_name_t *name, prrte_iof_tag_t stream,
const unsigned char *data, int numbytes,
prrte_iof_write_event_t *channel);
PRRTE_EXPORT void prrte_iof_base_static_drop_input(prrte_iof_proc_t* proct);
PRRTE_EXPORT void prrte_iof_base_static_dump_output(prrte_iof_read_event_t *rev);
PRRTE_EXPORT void prrte_iof_base_write_handler(int fd, short event, void *cbdata);

Expand Down
2 changes: 2 additions & 0 deletions src/mca/iof/base/iof_base_frame.c
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ static void prrte_iof_base_proc_construct(prrte_iof_proc_t* ptr)
ptr->revstderr = NULL;
ptr->subscribers = NULL;
ptr->copy = true;
PRRTE_CONSTRUCT(&(ptr->lock), prrte_mutex_t);
}
static void prrte_iof_base_proc_destruct(prrte_iof_proc_t* ptr)
{
Expand All @@ -196,6 +197,7 @@ static void prrte_iof_base_proc_destruct(prrte_iof_proc_t* ptr)
if (NULL != ptr->subscribers) {
PRRTE_LIST_RELEASE(ptr->subscribers);
}
PRRTE_DESTRUCT(&(ptr->lock));
}
PRRTE_CLASS_INSTANCE(prrte_iof_proc_t,
prrte_list_item_t,
Expand Down
12 changes: 12 additions & 0 deletions src/mca/iof/base/iof_base_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,18 @@ int prrte_iof_base_write_output(const prrte_process_name_t *name, prrte_iof_tag_
return num_buffered;
}

void prrte_iof_base_static_drop_input(prrte_iof_proc_t* proct)
{
prrte_output(0,"%s iof:hnp dropping all stdin to process %s",
PRRTE_NAME_PRINT(PRRTE_PROC_MY_NAME),
PRRTE_NAME_PRINT(&proct->name));


prrte_output(0,"%s iof:hnp drop to process %s - all done",
PRRTE_NAME_PRINT(PRRTE_PROC_MY_NAME),
PRRTE_NAME_PRINT(&proct->name));
}

void prrte_iof_base_static_dump_output(prrte_iof_read_event_t *rev)
{
bool dump;
Expand Down
34 changes: 32 additions & 2 deletions src/mca/iof/hnp/iof_hnp.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ static int init(void)
NULL);

PRRTE_CONSTRUCT(&prrte_iof_hnp_component.procs, prrte_list_t);
PRRTE_CONSTRUCT(&prrte_iof_hnp_component.procs_lock, prrte_mutex_t);
prrte_iof_hnp_component.stdinev = NULL;

return PRRTE_SUCCESS;
Expand Down Expand Up @@ -154,6 +155,7 @@ static int hnp_push(const prrte_process_name_t* dst_name, prrte_iof_tag_t src_ta
fd, PRRTE_NAME_PRINT(dst_name)));

/* do we already have this process in our list? */
prrte_mutex_lock(&prrte_iof_hnp_component.procs_lock);
PRRTE_LIST_FOREACH(proct, &prrte_iof_hnp_component.procs, prrte_iof_proc_t) {
if (PRRTE_EQUAL == prrte_util_compare_name_fields(mask, &proct->name, dst_name)) {
/* found it */
Expand All @@ -167,6 +169,7 @@ static int hnp_push(const prrte_process_name_t* dst_name, prrte_iof_tag_t src_ta
prrte_list_append(&prrte_iof_hnp_component.procs, &proct->super);

SETUP:
prrte_mutex_unlock(&prrte_iof_hnp_component.procs_lock);
/* set the file descriptor to non-blocking - do this before we setup
* and activate the read event in case it fires right away
*/
Expand Down Expand Up @@ -206,6 +209,7 @@ static int hnp_push(const prrte_process_name_t* dst_name, prrte_iof_tag_t src_ta
if (proct->copy) {
/* see if there are any wildcard subscribers out there that
* apply to us */
prrte_mutex_lock(&prrte_iof_hnp_component.procs_lock);
PRRTE_LIST_FOREACH(pptr, &prrte_iof_hnp_component.procs, prrte_iof_proc_t) {
if (dst_name->jobid == pptr->name.jobid &&
PRRTE_VPID_WILDCARD == pptr->name.vpid &&
Expand All @@ -215,6 +219,7 @@ static int hnp_push(const prrte_process_name_t* dst_name, prrte_iof_tag_t src_ta
break;
}
}
prrte_mutex_unlock(&prrte_iof_hnp_component.procs_lock);
}
PRRTE_IOF_READ_ACTIVATE(proct->revstdout);
if (!prrte_iof_base.redirect_app_stderr_to_stdout) {
Expand Down Expand Up @@ -250,15 +255,25 @@ static int push_stdin(const prrte_process_name_t* dst_name,

/* do we already have this process in our list? */
proct = NULL;
prrte_mutex_lock(&prrte_iof_hnp_component.procs_lock);
PRRTE_LIST_FOREACH(pptr, &prrte_iof_hnp_component.procs, prrte_iof_proc_t) {
if (PRRTE_EQUAL == prrte_util_compare_name_fields(mask, &pptr->name, dst_name)) {
/* found it */
proct = pptr;
}
}
if (NULL == proct) {
prrte_mutex_unlock(&prrte_iof_hnp_component.procs_lock);
return PRRTE_ERR_NOT_FOUND;
}
// Grab the process lock before giving up the list lock
// this prevents the caller from trashing this proc
prrte_mutex_lock(&(proct->lock));
prrte_mutex_unlock(&prrte_iof_hnp_component.procs_lock);

prrte_output(0, "%s iof:hnp stdin %s - locked",
PRRTE_NAME_PRINT(PRRTE_PROC_MY_NAME),
PRRTE_NAME_PRINT(dst_name));

/* pass the data to the sink */
prrte_output(0,"%s iof:hnp pushing stdin for process %s: size %zu",
Expand All @@ -275,6 +290,7 @@ static int push_stdin(const prrte_process_name_t* dst_name,
prrte_output(0, "%s iof:hnp pushing stdin for process %s failed stdinev is no longer valid (JJH DEBUG)",
PRRTE_NAME_PRINT(PRRTE_PROC_MY_NAME),
PRRTE_NAME_PRINT(dst_name));
prrte_mutex_unlock(&(proct->lock));
return PRRTE_SUCCESS;
}

Expand All @@ -298,6 +314,7 @@ static int push_stdin(const prrte_process_name_t* dst_name,

PRRTE_OUTPUT_VERBOSE((1, prrte_iof_base_framework.framework_output,
"buffer backed up - holding"));
prrte_mutex_unlock(&(proct->lock));
return PRRTE_ERR_OUT_OF_RESOURCE;
}
}
Expand Down Expand Up @@ -325,7 +342,7 @@ static int push_stdin(const prrte_process_name_t* dst_name,
}
}
}

prrte_mutex_unlock(&(proct->lock));
return PRRTE_SUCCESS;
}

Expand Down Expand Up @@ -364,6 +381,7 @@ static int hnp_pull(const prrte_process_name_t* dst_name,
}

/* do we already have this process in our list? */
prrte_mutex_lock(&prrte_iof_hnp_component.procs_lock);
PRRTE_LIST_FOREACH(proct, &prrte_iof_hnp_component.procs, prrte_iof_proc_t) {
if (PRRTE_EQUAL == prrte_util_compare_name_fields(mask, &proct->name, dst_name)) {
/* found it */
Expand All @@ -377,6 +395,7 @@ static int hnp_pull(const prrte_process_name_t* dst_name,
prrte_list_append(&prrte_iof_hnp_component.procs, &proct->super);

SETUP:
prrte_mutex_unlock(&prrte_iof_hnp_component.procs_lock);
PRRTE_IOF_SINK_DEFINE(&proct->stdinev, dst_name, fd, PRRTE_IOF_STDIN,
stdin_write_handler);
proct->stdinev->daemon.jobid = PRRTE_PROC_MY_NAME->jobid;
Expand All @@ -398,11 +417,13 @@ static int hnp_close(const prrte_process_name_t* peer,
prrte_output(0,"%s iof:hnp closing connection to process %s",
PRRTE_NAME_PRINT(PRRTE_PROC_MY_NAME),
PRRTE_NAME_PRINT(peer));

prrte_mutex_lock(&prrte_iof_hnp_component.procs_lock);
PRRTE_LIST_FOREACH(proct, &prrte_iof_hnp_component.procs, prrte_iof_proc_t) {
if (PRRTE_EQUAL == prrte_util_compare_name_fields(mask, &proct->name, peer)) {
prrte_mutex_lock(&(proct->lock));
if (PRRTE_IOF_STDIN & source_tag) {
if (NULL != proct->stdinev) {
prrte_iof_base_static_drop_input(proct);
PRRTE_RELEASE(proct->stdinev);
}
proct->stdinev = NULL;
Expand All @@ -422,6 +443,8 @@ static int hnp_close(const prrte_process_name_t* peer,
}
proct->revstderr = NULL;
}
prrte_mutex_unlock(&(proct->lock));

/* if we closed them all, then remove this proc */
if (NULL == proct->stdinev &&
NULL == proct->revstdout &&
Expand All @@ -432,6 +455,8 @@ static int hnp_close(const prrte_process_name_t* peer,
break;
}
}
prrte_mutex_unlock(&prrte_iof_hnp_component.procs_lock);

return PRRTE_SUCCESS;
}

Expand All @@ -440,6 +465,7 @@ static void hnp_complete(const prrte_job_t *jdata)
prrte_iof_proc_t *proct, *next;

/* cleanout any lingering sinks */
prrte_mutex_lock(&prrte_iof_hnp_component.procs_lock);
PRRTE_LIST_FOREACH_SAFE(proct, next, &prrte_iof_hnp_component.procs, prrte_iof_proc_t) {
if (jdata->jobid == proct->name.jobid) {
prrte_list_remove_item(&prrte_iof_hnp_component.procs, &proct->super);
Expand All @@ -456,6 +482,7 @@ static void hnp_complete(const prrte_job_t *jdata)
PRRTE_RELEASE(proct);
}
}
prrte_mutex_unlock(&prrte_iof_hnp_component.procs_lock);
}

static int finalize(void)
Expand Down Expand Up @@ -503,6 +530,7 @@ static int finalize(void)

/* cycle thru the procs and ensure all their output was delivered
* if they were writing to files */
prrte_mutex_lock(&prrte_iof_hnp_component.procs_lock);
while (NULL != (proct = (prrte_iof_proc_t*)prrte_list_remove_first(&prrte_iof_hnp_component.procs))) {
if (NULL != proct->revstdout) {
prrte_iof_base_static_dump_output(proct->revstdout);
Expand All @@ -513,6 +541,8 @@ static int finalize(void)
PRRTE_RELEASE(proct);
}
PRRTE_DESTRUCT(&prrte_iof_hnp_component.procs);
prrte_mutex_unlock(&prrte_iof_hnp_component.procs_lock);
PRRTE_DESTRUCT(&prrte_iof_hnp_component.procs_lock);

return PRRTE_SUCCESS;
}
Expand Down
1 change: 1 addition & 0 deletions src/mca/iof/hnp/iof_hnp.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ BEGIN_C_DECLS
struct prrte_iof_hnp_component_t {
prrte_iof_base_component_t super;
prrte_list_t procs;
prrte_mutex_t procs_lock;
prrte_iof_read_event_t *stdinev;
prrte_event_t stdinsig;
};
Expand Down
6 changes: 6 additions & 0 deletions src/mca/iof/hnp/iof_hnp_receive.c
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ void prrte_iof_hnp_recv(int status, prrte_process_name_t* sender,
exclusive = false;
}
/* do we already have this process in our list? */
prrte_mutex_lock(&prrte_iof_hnp_component.procs_lock);
PRRTE_LIST_FOREACH(proct, &prrte_iof_hnp_component.procs, prrte_iof_proc_t) {
if (PRRTE_EQUAL == prrte_util_compare_name_fields(mask, &proct->name, &origin)) {
/* found it */
Expand All @@ -152,6 +153,7 @@ void prrte_iof_hnp_recv(int status, prrte_process_name_t* sender,
prrte_list_append(&prrte_iof_hnp_component.procs, &proct->super);

PROCESS:
prrte_mutex_unlock(&prrte_iof_hnp_component.procs_lock);
/* a tool is requesting that we send it a copy of the specified stream(s)
* from the specified process(es), so create a sink for it
*/
Expand Down Expand Up @@ -191,6 +193,7 @@ void prrte_iof_hnp_recv(int status, prrte_process_name_t* sender,
/* a tool is requesting that we no longer forward a copy of the
* specified stream(s) from the specified process(es) - remove the sink
*/
prrte_mutex_lock(&prrte_iof_hnp_component.procs_lock);
PRRTE_LIST_FOREACH(proct, &prrte_iof_hnp_component.procs, prrte_iof_proc_t) {
if (PRRTE_EQUAL != prrte_util_compare_name_fields(mask, &proct->name, &origin)) {
continue;
Expand All @@ -215,6 +218,7 @@ void prrte_iof_hnp_recv(int status, prrte_process_name_t* sender,
}
}
}
prrte_mutex_unlock(&prrte_iof_hnp_component.procs_lock);
goto CLEAN_RETURN;
}

Expand All @@ -232,6 +236,7 @@ void prrte_iof_hnp_recv(int status, prrte_process_name_t* sender,
PRRTE_NAME_PRINT(&origin)));

/* do we already have this process in our list? */
prrte_mutex_lock(&prrte_iof_hnp_component.procs_lock);
PRRTE_LIST_FOREACH(proct, &prrte_iof_hnp_component.procs, prrte_iof_proc_t) {
if (PRRTE_EQUAL == prrte_util_compare_name_fields(mask, &proct->name, &origin)) {
/* found it */
Expand All @@ -245,6 +250,7 @@ void prrte_iof_hnp_recv(int status, prrte_process_name_t* sender,
prrte_list_append(&prrte_iof_hnp_component.procs, &proct->super);

NSTEP:
prrte_mutex_unlock(&prrte_iof_hnp_component.procs_lock);
/* cycle through the endpoints to see if someone else wants a copy */
exclusive = false;
if (NULL != proct->subscribers) {
Expand Down

0 comments on commit ed37976

Please sign in to comment.