diff --git a/ompi/mca/coll/acoll/coll_acoll.h b/ompi/mca/coll/acoll/coll_acoll.h index 36769db03fc..aaf636b1bae 100644 --- a/ompi/mca/coll/acoll/coll_acoll.h +++ b/ompi/mca/coll/acoll/coll_acoll.h @@ -33,6 +33,7 @@ BEGIN_C_DECLS /* Globally exported variables */ OMPI_DECLSPEC extern const mca_coll_base_component_3_0_0_t mca_coll_acoll_component; extern int mca_coll_acoll_priority; +extern int mca_coll_acoll_max_comms; extern int mca_coll_acoll_sg_size; extern int mca_coll_acoll_sg_scale; extern int mca_coll_acoll_node_size; @@ -75,7 +76,6 @@ int mca_coll_acoll_barrier_intra(struct ompi_communicator_t *comm, mca_coll_base END_C_DECLS -#define MCA_COLL_ACOLL_MAX_CID 100 #define MCA_COLL_ACOLL_ROOT_CHANGE_THRESH 10 typedef enum MCA_COLL_ACOLL_SG_SIZES { @@ -208,8 +208,10 @@ struct mca_coll_acoll_module_t { int mnode_log2_sg_size; int allg_lin; int allg_ring; - coll_acoll_subcomms_t subc[MCA_COLL_ACOLL_MAX_CID]; + int max_comms; + coll_acoll_subcomms_t **subc; coll_acoll_reserve_mem_t reserve_mem_s; + int num_subc; }; #ifdef HAVE_XPMEM_H diff --git a/ompi/mca/coll/acoll/coll_acoll_allgather.c b/ompi/mca/coll/acoll/coll_acoll_allgather.c index 26287215de2..5e4db719277 100644 --- a/ompi/mca/coll/acoll/coll_acoll_allgather.c +++ b/ompi/mca/coll/acoll/coll_acoll_allgather.c @@ -481,21 +481,21 @@ int mca_coll_acoll_allgather(const void *sbuf, size_t scount, struct ompi_dataty int brank, last_brank; int use_rd_base; mca_coll_acoll_module_t *acoll_module = (mca_coll_acoll_module_t *) module; - coll_acoll_subcomms_t *subc; - int cid = ompi_comm_get_local_cid(comm); + coll_acoll_subcomms_t *subc = NULL; char *local_rbuf; ompi_communicator_t *intra_comm; - /* Fallback to ring if cid is beyond supported limit */ - if (cid >= MCA_COLL_ACOLL_MAX_CID) { + /* Obtain the subcomms structure */ + err = check_and_create_subc(comm, acoll_module, &subc); + /* Fallback to ring if subc is not obtained */ + if (NULL == subc) { return ompi_coll_base_allgather_intra_ring(sbuf, scount, sdtype, 
rbuf, rcount, rdtype, comm, module); } - subc = &acoll_module->subc[cid]; size = ompi_comm_size(comm); if (!subc->initialized && size > 2) { - err = mca_coll_acoll_comm_split_init(comm, acoll_module, 0); + err = mca_coll_acoll_comm_split_init(comm, acoll_module, subc, 0); if (MPI_SUCCESS != err) { return err; } diff --git a/ompi/mca/coll/acoll/coll_acoll_allreduce.c b/ompi/mca/coll/acoll/coll_acoll_allreduce.c index 6a248a73c5a..46c5554810c 100644 --- a/ompi/mca/coll/acoll/coll_acoll_allreduce.c +++ b/ompi/mca/coll/acoll/coll_acoll_allreduce.c @@ -28,7 +28,8 @@ void mca_coll_acoll_sync(coll_acoll_data_t *data, int offset, int *group, int gp int mca_coll_acoll_allreduce_small_msgs_h(const void *sbuf, void *rbuf, size_t count, struct ompi_datatype_t *dtype, struct ompi_op_t *op, struct ompi_communicator_t *comm, - mca_coll_base_module_t *module, int intra); + mca_coll_base_module_t *module, + coll_acoll_subcomms_t *subc, int intra); static inline int coll_allreduce_decision_fixed(int comm_size, size_t msg_size) @@ -52,16 +53,13 @@ static inline int coll_allreduce_decision_fixed(int comm_size, size_t msg_size) static inline int mca_coll_acoll_reduce_xpmem_h(const void *sbuf, void *rbuf, size_t count, struct ompi_datatype_t *dtype, struct ompi_op_t *op, struct ompi_communicator_t *comm, - mca_coll_base_module_t *module) + mca_coll_base_module_t *module, + coll_acoll_subcomms_t *subc) { int size; size_t total_dsize, dsize; - mca_coll_acoll_module_t *acoll_module = (mca_coll_acoll_module_t *) module; - coll_acoll_subcomms_t *subc; - int cid = ompi_comm_get_local_cid(comm); - subc = &acoll_module->subc[cid]; - coll_acoll_init(module, comm, subc->data); + coll_acoll_init(module, comm, subc->data, subc); coll_acoll_data_t *data = subc->data; if (NULL == data) { return -1; @@ -188,16 +186,13 @@ static inline int mca_coll_acoll_allreduce_xpmem_f(const void *sbuf, void *rbuf, struct ompi_datatype_t *dtype, struct ompi_op_t *op, struct ompi_communicator_t *comm, - 
mca_coll_base_module_t *module) + mca_coll_base_module_t *module, + coll_acoll_subcomms_t *subc) { int size; size_t total_dsize, dsize; - mca_coll_acoll_module_t *acoll_module = (mca_coll_acoll_module_t *) module; - coll_acoll_subcomms_t *subc; - int cid = ompi_comm_get_local_cid(comm); - subc = &acoll_module->subc[cid]; - coll_acoll_init(module, comm, subc->data); + coll_acoll_init(module, comm, subc->data, subc); coll_acoll_data_t *data = subc->data; if (NULL == data) { return -1; @@ -361,15 +356,13 @@ void mca_coll_acoll_sync(coll_acoll_data_t *data, int offset, int *group, int gp int mca_coll_acoll_allreduce_small_msgs_h(const void *sbuf, void *rbuf, size_t count, struct ompi_datatype_t *dtype, struct ompi_op_t *op, struct ompi_communicator_t *comm, - mca_coll_base_module_t *module, int intra) + mca_coll_base_module_t *module, + coll_acoll_subcomms_t *subc, int intra) { size_t dsize; int err = MPI_SUCCESS; - mca_coll_acoll_module_t *acoll_module = (mca_coll_acoll_module_t *) module; - coll_acoll_subcomms_t *subc; - int cid = ompi_comm_get_local_cid(comm); - subc = &acoll_module->subc[cid]; - coll_acoll_init(module, comm, subc->data); + + coll_acoll_init(module, comm, subc->data, subc); coll_acoll_data_t *data = subc->data; if (NULL == data) { return -1; @@ -385,7 +378,6 @@ int mca_coll_acoll_allreduce_small_msgs_h(const void *sbuf, void *rbuf, size_t c int l1_local_rank = data->l1_local_rank; int l2_local_rank = data->l2_local_rank; - int comm_id = ompi_comm_get_local_cid(comm); int offset1 = data->offset[0]; int offset2 = data->offset[1]; @@ -441,8 +433,8 @@ int mca_coll_acoll_allreduce_small_msgs_h(const void *sbuf, void *rbuf, size_t c } } - if (intra && (ompi_comm_size(acoll_module->subc[comm_id].numa_comm) > 1)) { - err = mca_coll_acoll_bcast(rbuf, count, dtype, 0, acoll_module->subc[comm_id].numa_comm, module); + if (intra && (ompi_comm_size(subc->numa_comm) > 1)) { + err = mca_coll_acoll_bcast(rbuf, count, dtype, 0, subc->numa_comm, module); } return 
err; } @@ -466,25 +458,23 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count, return MPI_SUCCESS; } - coll_acoll_subcomms_t *subc; - int cid = ompi_comm_get_local_cid(comm); - subc = &acoll_module->subc[cid]; - /* Falling back to recursivedoubling for non-commutative operators to be safe */ if (!ompi_op_is_commute(op)) { return ompi_coll_base_allreduce_intra_recursivedoubling(sbuf, rbuf, count, dtype, op, comm, module); } - /* Fallback to knomial if cid is beyond supported limit */ - if (cid >= MCA_COLL_ACOLL_MAX_CID) { + /* Obtain the subcomms structure */ + coll_acoll_subcomms_t *subc = NULL; + err = check_and_create_subc(comm, acoll_module, &subc); + + /* Fallback to knomial if subc is not obtained */ + if (NULL == subc) { return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op, comm, module); } - - subc = &acoll_module->subc[cid]; if (!subc->initialized) { - err = mca_coll_acoll_comm_split_init(comm, acoll_module, 0); + err = mca_coll_acoll_comm_split_init(comm, acoll_module, subc, 0); if (MPI_SUCCESS != err) return err; } @@ -499,7 +489,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count, comm, module); } else if (total_dsize < 512) { return mca_coll_acoll_allreduce_small_msgs_h(sbuf, rbuf, count, dtype, op, comm, module, - 1); + subc, 1); } else if (total_dsize <= 2048) { return ompi_coll_base_allreduce_intra_recursivedoubling(sbuf, rbuf, count, dtype, op, comm, module); @@ -517,7 +507,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count, } else if (total_dsize < 4194304) { #ifdef HAVE_XPMEM_H if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1)) { - return mca_coll_acoll_allreduce_xpmem_f(sbuf, rbuf, count, dtype, op, comm, module); + return mca_coll_acoll_allreduce_xpmem_f(sbuf, rbuf, count, dtype, op, comm, module, subc); } else { return 
ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op, comm, module); @@ -529,7 +519,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count, } else if (total_dsize <= 16777216) { #ifdef HAVE_XPMEM_H if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1)) { - mca_coll_acoll_reduce_xpmem_h(sbuf, rbuf, count, dtype, op, comm, module); + mca_coll_acoll_reduce_xpmem_h(sbuf, rbuf, count, dtype, op, comm, module, subc); return mca_coll_acoll_bcast(rbuf, count, dtype, 0, comm, module); } else { return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, @@ -542,7 +532,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count, } else { #ifdef HAVE_XPMEM_H if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1)) { - return mca_coll_acoll_allreduce_xpmem_f(sbuf, rbuf, count, dtype, op, comm, module); + return mca_coll_acoll_allreduce_xpmem_f(sbuf, rbuf, count, dtype, op, comm, module, subc); } else { return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op, comm, module); diff --git a/ompi/mca/coll/acoll/coll_acoll_barrier.c b/ompi/mca/coll/acoll/coll_acoll_barrier.c index a138027f444..8272136ad25 100644 --- a/ompi/mca/coll/acoll/coll_acoll_barrier.c +++ b/ompi/mca/coll/acoll/coll_acoll_barrier.c @@ -130,21 +130,22 @@ int mca_coll_acoll_barrier_intra(struct ompi_communicator_t *comm, mca_coll_base ompi_request_t **reqs; int num_nodes; mca_coll_acoll_module_t *acoll_module = (mca_coll_acoll_module_t *) module; - coll_acoll_subcomms_t *subc; - int cid = ompi_comm_get_local_cid(comm); + coll_acoll_subcomms_t *subc = NULL; - /* Fallback to linear if cid is beyond supported limit */ - if (cid >= MCA_COLL_ACOLL_MAX_CID) { + /* Obtain the subcomms structure */ + err = check_and_create_subc(comm, acoll_module, &subc); + + /* Fallback to linear if 
subcomms structure is not obtained */ + if (NULL == subc) { return ompi_coll_base_barrier_intra_basic_linear(comm, module); } - subc = &acoll_module->subc[cid]; size = ompi_comm_size(comm); if (size == 1) { return err; } if (!subc->initialized && size > 1) { - err = mca_coll_acoll_comm_split_init(comm, acoll_module, 0); + err = mca_coll_acoll_comm_split_init(comm, acoll_module, subc, 0); if (MPI_SUCCESS != err) { return err; } diff --git a/ompi/mca/coll/acoll/coll_acoll_bcast.c b/ompi/mca/coll/acoll/coll_acoll_bcast.c index b423479db21..22103317d22 100644 --- a/ompi/mca/coll/acoll/coll_acoll_bcast.c +++ b/ompi/mca/coll/acoll/coll_acoll_bcast.c @@ -444,24 +444,25 @@ int mca_coll_acoll_bcast(void *buff, size_t count, struct ompi_datatype_t *datat size_t total_dsize, dsize; mca_coll_acoll_module_t *acoll_module = (mca_coll_acoll_module_t *) module; bcast_subc_func bcast_func[2] = {&bcast_binomial, &bcast_flat_tree}; - coll_acoll_subcomms_t *subc; + coll_acoll_subcomms_t *subc = NULL; struct ompi_communicator_t *subcomms[MCA_COLL_ACOLL_NUM_SC] = {NULL}; int subc_roots[MCA_COLL_ACOLL_NUM_SC] = {-1}; - int cid = ompi_comm_get_local_cid(comm); - /* Fallback to knomial if cid is beyond supported limit */ - if (cid >= MCA_COLL_ACOLL_MAX_CID) { + /* Obtain the subcomms structure */ + err = check_and_create_subc(comm, acoll_module, &subc); + /* Fallback to knomial if subcomms is not obtained */ + if (NULL == subc) { return ompi_coll_base_bcast_intra_knomial(buff, count, datatype, root, comm, module, 0, 4); } - subc = &acoll_module->subc[cid]; /* Fallback to knomial if no. 
of root changes is beyond a threshold */ - if (subc->num_root_change > MCA_COLL_ACOLL_ROOT_CHANGE_THRESH) { + if ((subc->num_root_change > MCA_COLL_ACOLL_ROOT_CHANGE_THRESH) + && (root != subc->prev_init_root)) { return ompi_coll_base_bcast_intra_knomial(buff, count, datatype, root, comm, module, 0, 4); } size = ompi_comm_size(comm); if ((!subc->initialized || (root != subc->prev_init_root)) && size > 2) { - err = mca_coll_acoll_comm_split_init(comm, acoll_module, root); + err = mca_coll_acoll_comm_split_init(comm, acoll_module, subc, root); if (MPI_SUCCESS != err) { return err; } diff --git a/ompi/mca/coll/acoll/coll_acoll_component.c b/ompi/mca/coll/acoll/coll_acoll_component.c index 0214bfc89be..8f15b6b265c 100644 --- a/ompi/mca/coll/acoll/coll_acoll_component.c +++ b/ompi/mca/coll/acoll/coll_acoll_component.c @@ -25,6 +25,7 @@ const char *mca_coll_acoll_component_version_string * Global variables */ int mca_coll_acoll_priority = 0; +int mca_coll_acoll_max_comms = 10; int mca_coll_acoll_sg_size = 8; int mca_coll_acoll_sg_scale = 1; int mca_coll_acoll_node_size = 128; @@ -91,6 +92,11 @@ static int acoll_register(void) MCA_BASE_VAR_SCOPE_READONLY, &mca_coll_acoll_priority); /* Defaults on topology */ + (void) + mca_base_component_var_register(&mca_coll_acoll_component.collm_version, "max_comms", + "Maximum no. 
of communicators using subgroup based algorithms", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, &mca_coll_acoll_max_comms); (void) mca_base_component_var_register(&mca_coll_acoll_component.collm_version, "sg_size", "Size of subgroup to be used for subgroup based algorithms", @@ -186,47 +192,10 @@ static int acoll_register(void) */ static void mca_coll_acoll_module_construct(mca_coll_acoll_module_t *module) { - for (int i = 0; i < MCA_COLL_ACOLL_MAX_CID; i++) { - coll_acoll_subcomms_t *subc = &module->subc[i]; - subc->initialized = 0; - subc->is_root_node = 0; - subc->is_root_sg = 0; - subc->is_root_numa = 0; - subc->outer_grp_root = -1; - subc->subgrp_root = 0; - subc->num_nodes = 1; - subc->prev_init_root = -1; - subc->num_root_change = 0; - subc->numa_root = 0; - subc->socket_ldr_root = -1; - subc->local_comm = NULL; - subc->local_r_comm = NULL; - subc->leader_comm = NULL; - subc->subgrp_comm = NULL; - subc->socket_comm = NULL; - subc->socket_ldr_comm = NULL; - for (int j = 0; j < MCA_COLL_ACOLL_NUM_LAYERS; j++) { - for (int k = 0; k < MCA_COLL_ACOLL_NUM_BASE_LYRS; k++) { - subc->base_comm[k][j] = NULL; - subc->base_root[k][j] = -1; - } - subc->local_root[j] = 0; - } - subc->numa_comm = NULL; - subc->numa_comm_ldrs = NULL; - subc->node_comm = NULL; - subc->inter_comm = NULL; - subc->cid = -1; - subc->initialized_data = false; - subc->initialized_shm_data = false; - subc->data = NULL; -#ifdef HAVE_XPMEM_H - subc->xpmem_buf_size = mca_coll_acoll_xpmem_buffer_size; - subc->without_xpmem = mca_coll_acoll_without_xpmem; - subc->xpmem_use_sr_buf = mca_coll_acoll_xpmem_use_sr_buf; -#endif - } + /* Set number of subcomms to 0 */ + module->num_subc = 0; + module->subc = NULL; /* Reserve memory init. Lazy allocation of memory when needed. 
*/ (module->reserve_mem_s).reserve_mem = NULL; @@ -246,9 +215,8 @@ static void mca_coll_acoll_module_construct(mca_coll_acoll_module_t *module) */ static void mca_coll_acoll_module_destruct(mca_coll_acoll_module_t *module) { - - for (int i = 0; i < MCA_COLL_ACOLL_MAX_CID; i++) { - coll_acoll_subcomms_t *subc = &module->subc[i]; + for (int i = 0; i < module->num_subc; i++) { + coll_acoll_subcomms_t *subc = module->subc[i]; if (subc->initialized_data) { if (subc->initialized_shm_data) { if (subc->orig_comm != NULL) { @@ -334,8 +302,14 @@ static void mca_coll_acoll_module_destruct(mca_coll_acoll_module_t *module) } } subc->initialized = 0; + free(subc); + module->subc[i] = NULL; } + module->num_subc = 0; + free(module->subc); + module->subc = NULL; + if ((true == (module->reserve_mem_s).reserve_mem_allocate) && (NULL != (module->reserve_mem_s).reserve_mem)) { free((module->reserve_mem_s).reserve_mem); diff --git a/ompi/mca/coll/acoll/coll_acoll_module.c b/ompi/mca/coll/acoll/coll_acoll_module.c index b3b2afddc8b..3d9242226cf 100644 --- a/ompi/mca/coll/acoll/coll_acoll_module.c +++ b/ompi/mca/coll/acoll/coll_acoll_module.c @@ -77,6 +77,7 @@ mca_coll_base_module_t *mca_coll_acoll_comm_query(struct ompi_communicator_t *co *priority = mca_coll_acoll_priority; /* Set topology params */ + acoll_module->max_comms = mca_coll_acoll_max_comms; acoll_module->sg_scale = mca_coll_acoll_sg_scale; acoll_module->sg_size = mca_coll_acoll_sg_size; acoll_module->sg_cnt = mca_coll_acoll_sg_size / mca_coll_acoll_sg_scale; diff --git a/ompi/mca/coll/acoll/coll_acoll_reduce.c b/ompi/mca/coll/acoll/coll_acoll_reduce.c index 836c8893158..505c3da5206 100644 --- a/ompi/mca/coll/acoll/coll_acoll_reduce.c +++ b/ompi/mca/coll/acoll/coll_acoll_reduce.c @@ -47,10 +47,10 @@ static inline int coll_reduce_decision_fixed(int comm_size, size_t msg_size) static inline int coll_acoll_reduce_topo(const void *sbuf, void *rbuf, size_t count, struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, 
struct ompi_communicator_t *comm, - mca_coll_base_module_t *module) + mca_coll_base_module_t *module, + coll_acoll_subcomms_t *subc) { int ret = MPI_SUCCESS, rank, sz; - int cid = ompi_comm_get_local_cid(comm); ptrdiff_t dsize, gap = 0; char *free_buffer = NULL; @@ -59,7 +59,6 @@ static inline int coll_acoll_reduce_topo(const void *sbuf, void *rbuf, size_t co char *tmp_sbuf = NULL; mca_coll_acoll_module_t *acoll_module = (mca_coll_acoll_module_t *) module; - coll_acoll_subcomms_t *subc = &acoll_module->subc[cid]; coll_acoll_reserve_mem_t *reserve_mem_rbuf_reduce = &(acoll_module->reserve_mem_s); rank = ompi_comm_rank(comm); @@ -158,7 +157,8 @@ static inline int coll_acoll_reduce_topo(const void *sbuf, void *rbuf, size_t co static inline int mca_coll_acoll_reduce_xpmem(const void *sbuf, void *rbuf, size_t count, struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, struct ompi_communicator_t *comm, - mca_coll_base_module_t *module) + mca_coll_base_module_t *module, + coll_acoll_subcomms_t *subc) { int size; size_t total_dsize, dsize; @@ -166,10 +166,7 @@ static inline int mca_coll_acoll_reduce_xpmem(const void *sbuf, void *rbuf, size mca_coll_acoll_module_t *acoll_module = (mca_coll_acoll_module_t *) module; - coll_acoll_subcomms_t *subc; - int cid = ompi_comm_get_local_cid(comm); - subc = &acoll_module->subc[cid]; - coll_acoll_init(module, comm, subc->data); + coll_acoll_init(module, comm, subc->data, subc); coll_acoll_reserve_mem_t *reserve_mem_rbuf_reduce = NULL; if (subc->xpmem_use_sr_buf != 0) { reserve_mem_rbuf_reduce = &(acoll_module->reserve_mem_s); @@ -337,19 +334,18 @@ int mca_coll_acoll_reduce_intra(const void *sbuf, void *rbuf, size_t count, alg = coll_reduce_decision_fixed(size, total_dsize); - coll_acoll_subcomms_t *subc; - int cid = ompi_comm_get_local_cid(comm); - subc = &acoll_module->subc[cid]; + /* Obtain the subcomms structure */ + coll_acoll_subcomms_t *subc = NULL; + ret = check_and_create_subc(comm, acoll_module, &subc); - /* Fallback 
to knomial if cid is beyond supported limit */ - if (cid >= MCA_COLL_ACOLL_MAX_CID) { + /* Fallback to knomial if subc is not obtained */ + if (NULL == subc) { return ompi_coll_base_reduce_intra_binomial(sbuf, rbuf, count, dtype, op, root, comm, module, 0, 0); } - subc = &acoll_module->subc[cid]; if (!subc->initialized || (root != subc->prev_init_root)) { - ret = mca_coll_acoll_comm_split_init(comm, acoll_module, 0); + ret = mca_coll_acoll_comm_split_init(comm, acoll_module, subc, 0); if (MPI_SUCCESS != ret) { return ret; } @@ -360,7 +356,8 @@ int mca_coll_acoll_reduce_intra(const void *sbuf, void *rbuf, size_t count, if (num_nodes == 1) { if (total_dsize < 262144) { if (alg == -1 /* interaction with xpmem implementation causing issues 0*/) { - return coll_acoll_reduce_topo(sbuf, rbuf, count, dtype, op, root, comm, module); + return coll_acoll_reduce_topo(sbuf, rbuf, count, dtype, op, root, comm, module, + subc); } else if (alg == 1) { return ompi_coll_base_reduce_intra_basic_linear(sbuf, rbuf, count, dtype, op, root, comm, module); @@ -379,7 +376,7 @@ int mca_coll_acoll_reduce_intra(const void *sbuf, void *rbuf, size_t count, || ((subc->xpmem_use_sr_buf == 0) && (subc->xpmem_buf_size > 2 * total_dsize))) && (subc->without_xpmem != 1)) { return mca_coll_acoll_reduce_xpmem(sbuf, rbuf, count, dtype, op, root, comm, - module); + module, subc); } else { return ompi_coll_base_reduce_intra_binomial(sbuf, rbuf, count, dtype, op, root, comm, module, 0, 0); diff --git a/ompi/mca/coll/acoll/coll_acoll_utils.h b/ompi/mca/coll/acoll/coll_acoll_utils.h index 4b98a73ccd5..2ba56275db3 100644 --- a/ompi/mca/coll/acoll/coll_acoll_utils.h +++ b/ompi/mca/coll/acoll/coll_acoll_utils.h @@ -30,6 +30,10 @@ #define LEADER_SHM_SIZE 16384 #define PER_RANK_SHM_SIZE 8192 +extern uint64_t mca_coll_acoll_xpmem_buffer_size; +extern int mca_coll_acoll_without_xpmem; +extern int mca_coll_acoll_xpmem_use_sr_buf; + /* Function to allocate scratch buffer */ static inline void 
*coll_acoll_buf_alloc(coll_acoll_reserve_mem_t *reserve_mem_ptr,
                                          uint64_t size)
@@ -73,6 +77,99 @@ static inline void coll_acoll_buf_free(coll_acoll_reserve_mem_t *reserve_mem_ptr
     }
 }
 
+/* Find, or lazily create, the subcomms entry for comm's local CID.
+ * *subc_ptr receives the entry, or NULL when none is available (max_comms
+ * not positive, max_comms limit reached, or allocation failure).
+ * Returns OMPI_ERR_OUT_OF_RESOURCE when malloc fails, else MPI_SUCCESS.
+ */
+static inline int check_and_create_subc(ompi_communicator_t *comm,
+                                        mca_coll_acoll_module_t *acoll_module,
+                                        coll_acoll_subcomms_t **subc_ptr)
+{
+    int cid = ompi_comm_get_local_cid(comm);
+    int num_subc = acoll_module->num_subc;
+    coll_acoll_subcomms_t *subc;
+
+    *subc_ptr = NULL; /* defined result on every early return below */
+    /* Return if max comms is not positive */
+    if (acoll_module->max_comms <= 0) {
+        OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
+                     "coll:acoll WARNING Set mca_coll_acoll_max_comms to positive value to use acoll!"));
+        return MPI_SUCCESS;
+    }
+
+    /* Allocate the array of entry pointers on first use */
+    if (NULL == acoll_module->subc) {
+        acoll_module->subc = malloc(sizeof(*acoll_module->subc) * acoll_module->max_comms);
+        if (NULL == acoll_module->subc) {
+            return OMPI_ERR_OUT_OF_RESOURCE;
+        }
+    }
+
+    /* Check if subcomms structure is already created for the communicator */
+    for (int i = 0; i < num_subc; i++) {
+        if (acoll_module->subc[i]->cid == cid) {
+            *subc_ptr = acoll_module->subc[i];
+            return MPI_SUCCESS;
+        }
+    }
+
+    /* Subcomms structure is not present, create one if within limit */
+    if (num_subc == acoll_module->max_comms) {
+        OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
+                     "coll:acoll WARNING Falling back to base since max communicators limit %d exceeded, set mca_coll_acoll_max_comms to higher value to use acoll!", acoll_module->max_comms));
+        return MPI_SUCCESS;
+    }
+    subc = malloc(sizeof(*subc));
+    if (NULL == subc) {
+        return OMPI_ERR_OUT_OF_RESOURCE; /* report OOM like the array path above */
+    }
+    /* Publish the new entry to the module and to the caller */
+    acoll_module->subc[num_subc] = subc;
+    acoll_module->num_subc++;
+    *subc_ptr = subc;
+    /* Initialize elements of subc */
+    subc->cid = cid;
+    subc->initialized = 0;
+    subc->is_root_node = 0;
+    subc->is_root_sg = 0;
+    subc->is_root_numa = 0;
+    subc->outer_grp_root = -1;
+    subc->subgrp_root = 0;
+    subc->num_nodes = 1;
+    subc->prev_init_root = -1;
+    subc->num_root_change = 0;
+    subc->numa_root = 0;
+    subc->socket_ldr_root = -1;
+    subc->orig_comm = comm;
+    subc->local_comm = NULL;
+    subc->local_r_comm = NULL;
+    subc->leader_comm = NULL;
+    subc->subgrp_comm = NULL;
+    subc->socket_comm = NULL;
+    subc->socket_ldr_comm = NULL;
+    for (int j = 0; j < MCA_COLL_ACOLL_NUM_LAYERS; j++) {
+        for (int k = 0; k < MCA_COLL_ACOLL_NUM_BASE_LYRS; k++) {
+            subc->base_comm[k][j] = NULL;
+            subc->base_root[k][j] = -1;
+        }
+        subc->local_root[j] = 0;
+    }
+    subc->numa_comm = NULL;
+    subc->numa_comm_ldrs = NULL;
+    subc->node_comm = NULL;
+    subc->inter_comm = NULL;
+    subc->initialized_data = false;
+    subc->initialized_shm_data = false;
+    subc->data = NULL;
+#ifdef HAVE_XPMEM_H
+    subc->xpmem_buf_size = mca_coll_acoll_xpmem_buffer_size;
+    subc->without_xpmem = mca_coll_acoll_without_xpmem;
+    subc->xpmem_use_sr_buf = mca_coll_acoll_xpmem_use_sr_buf;
+#endif
+    return MPI_SUCCESS;
+}
+
 /* Function to compare integer elements */
 static int compare_values(const void *ptra, const void *ptrb)
 {
@@ -155,7 +252,9 @@ static inline int mca_coll_acoll_create_base_comm(ompi_communicator_t **parent_c
 }
 
 static inline int mca_coll_acoll_comm_split_init(ompi_communicator_t *comm,
-                                                 mca_coll_acoll_module_t *acoll_module, int root)
+                                                 mca_coll_acoll_module_t *acoll_module,
+                                                 coll_acoll_subcomms_t *subc,
+                                                 int root)
 {
     opal_info_t comm_info;
     mca_coll_base_module_allreduce_fn_t coll_allreduce_org = (comm)->c_coll->coll_allreduce;
@@ -164,19 +263,9 @@ static inline int mca_coll_acoll_comm_split_init(ompi_communicator_t *comm,
     mca_coll_base_module_allreduce_fn_t coll_allreduce_loc, coll_allreduce_soc;
     mca_coll_base_module_allgather_fn_t coll_allgather_loc, coll_allgather_soc;
     mca_coll_base_module_bcast_fn_t coll_bcast_loc, coll_bcast_soc;
-
coll_acoll_subcomms_t *subc; int err; int size = ompi_comm_size(comm); int rank = ompi_comm_rank(comm); - int cid = ompi_comm_get_local_cid(comm); - if (cid >= MCA_COLL_ACOLL_MAX_CID) { - return MPI_SUCCESS; - } - - /* Derive subcomm structure */ - subc = &acoll_module->subc[cid]; - subc->cid = cid; - subc->orig_comm = comm; (comm)->c_coll->coll_allgather = ompi_coll_base_allgather_intra_ring; (comm)->c_coll->coll_allreduce = ompi_coll_base_allreduce_intra_recursivedoubling; @@ -506,18 +595,14 @@ static inline int mca_coll_acoll_xpmem_deregister(void *xpmem_apid, #endif static inline int coll_acoll_init(mca_coll_base_module_t *module, ompi_communicator_t *comm, - coll_acoll_data_t *data) + coll_acoll_data_t *data, coll_acoll_subcomms_t *subc) { int size, ret = 0, rank, line; - mca_coll_acoll_module_t *acoll_module = (mca_coll_acoll_module_t *) module; - coll_acoll_subcomms_t *subc; int cid = ompi_comm_get_local_cid(comm); - subc = &acoll_module->subc[cid]; if (subc->initialized_data) { return ret; } - subc->cid = cid; data = (coll_acoll_data_t *) malloc(sizeof(coll_acoll_data_t)); if (NULL == data) { line = __LINE__; @@ -630,9 +715,6 @@ static inline int coll_acoll_init(mca_coll_base_module_t *module, ompi_communica /* temporary variables */ int tmp1, tmp2, tmp3 = 0; - - - comm_grp_ranks_local(comm, subc->numa_comm, &tmp1, &tmp2, &data->l1_gp, tmp3); data->l1_gp_size = ompi_comm_size(subc->numa_comm); data->l1_local_rank = ompi_comm_rank(subc->numa_comm);