Skip to content

Commit

Permalink
Merge pull request #50 from JKLiang9714/huawei-dev-ljk
Browse files Browse the repository at this point in the history
add ucg more features
  • Loading branch information
ChenQiangFYQ authored Sep 24, 2021
2 parents 4f85705 + 01731cd commit 8f386ff
Show file tree
Hide file tree
Showing 31 changed files with 6,813 additions and 1,529 deletions.
4 changes: 3 additions & 1 deletion Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ if HAVE_UCG

SUBDIRS = base builtin hicoll

UCG_VERSION=2:0:1

lib_LTLIBRARIES = libucg.la
libucg_la_CFLAGS = $(BASE_CFLAGS)
libucg_la_CPPFLAGS = $(BASE_CPPFLAGS)
libucg_la_LDFLAGS = -ldl -version-info $(SOVERSION)
libucg_la_LDFLAGS = -ldl -version-info $(UCG_VERSION)
libucg_ladir = $(includedir)/ucg
libucg_la_SOURCES =
libucg_la_LIBADD = \
Expand Down
126 changes: 95 additions & 31 deletions api/ucg.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright (C) Huawei Technologies Co., Ltd. 2019-2020. ALL RIGHTS RESERVED.
* See file LICENSE for terms.
* Copyright (c) Huawei Technologies Co., Ltd. 2019-2021. All rights reserved.
* Description: UCG common header file
*/

#ifndef UCG_H_
Expand Down Expand Up @@ -78,6 +78,7 @@ enum ucg_collective_modifiers {

UCG_GROUP_COLLECTIVE_MODIFIER_ALLTOALL = UCS_BIT(13), /* MPI_ALLTOALL */
UCG_GROUP_COLLECTIVE_MODIFIER_ALLGATHER = UCS_BIT(14), /* MPI_ALLGATHER */
UCG_GROUP_COLLECTIVE_MODIFIER_ALLTOALLV = UCS_BIT(15), /* MPI_ALLTOALLV */

UCG_GROUP_COLLECTIVE_MODIFIER_MASK = UCS_MASK(16)
};
Expand All @@ -87,6 +88,18 @@ typedef struct ucg_collective_type {
ucg_group_member_index_t root :48;
} ucg_collective_type_t;

/*
 * Identifiers of the collective operations that are already supported.
 * Values are consecutive and start at zero, so COLL_TYPE_NUMS (always the
 * last enumerator) equals the number of collective types listed before it.
 */
typedef enum {
    COLL_TYPE_BARRIER = 0,
    COLL_TYPE_BCAST,
    COLL_TYPE_ALLREDUCE,
    COLL_TYPE_ALLTOALLV,
    /* Add a collective operation above this line only once it is supported. */
    COLL_TYPE_NUMS /* count of the enumerators above; keep last */
} coll_type_t;

enum UCS_S_PACKED ucg_group_member_distance {
UCG_GROUP_MEMBER_DISTANCE_SELF = 0,
UCG_GROUP_MEMBER_DISTANCE_L3CACHE,
Expand All @@ -105,33 +118,48 @@ enum UCS_S_PACKED ucg_group_hierarchy_level {
typedef int (*dt_convert_f)(void *dt_ext, ucp_datatype_t *ucp_datatype);
typedef ptrdiff_t (*dt_span_f)(void *dt_ext, int count, ptrdiff_t *gap);

/*
 * INC parameters; embedded in ucg_group_params_t as the inc_param member.
 * NOTE(review): "INC" is not expanded anywhere in this header (presumably
 * in-network computing) -- confirm before relying on that reading.
 */
typedef struct inc_params {
uint16_t comm_id; /* INC comm id */
uint8_t switch_info_got; /* indicates whether the switch supports INC under the current parameters */
uint8_t feature_used; /* indicates whether the current collective operation is supported */
uint32_t spine_select; /* selected spine ip in 2-layer networking */
uint8_t coll_operation_type; /* supported collective operation */
uint16_t data_operation_type; /* supported allreduce operation type */
uint16_t data_type; /* supported collective data type */
uint16_t max_data_size; /* max data size in INC without padding */
int node_under_tor; /* node/socket num under the tor */
unsigned header_under_tor; /* for now, the minimum rank under the tor */
uint8_t req_id; /* index (1-255) of this collective operation in INC; must increase continuously */
/* rank id in MPI_COMM_WORLD; together with job_id, comm_id and cid it uniquely identifies a task and its communication */
int world_rank;
unsigned ppn; /* presumably processes per node -- TODO confirm */
} inc_params_t;

typedef enum ucg_group_member_distance (*rank_dist_f)(void *comm, int rank1, int rank2);

/*
 * Process-placement summary; carried in ucg_group_params_t as the topo_args
 * member.
 */
typedef struct {
uint16_t ppn_local; /* number of processes on my node */
uint16_t pps_local; /* number of processes on my socket */
uint16_t ppn_max; /* max number of processes on all nodes */
uint16_t node_nums; /* presumably the number of nodes -- TODO confirm */
/*
 * Single-bit flags and padding. NOTE(review): their meanings are not stated
 * in this header and can only be guessed from the names -- confirm against
 * the code that sets them before documenting each one.
 */
uint16_t ppn_unbalance : 1;
uint16_t pps_unbalance : 1;
uint16_t nrank_uncontinue : 1;
uint16_t srank_uncontinue : 1;
uint16_t bind_to_none : 1;
uint16_t reserved : 7;
uint16_t rank_continuous_in_node : 1;
uint16_t rank_continuous_in_sock : 1;
uint16_t rank_balance_in_node : 1;
uint16_t rank_balance_in_sock : 1;
} ucg_topo_args_t;

typedef struct ucg_group_params {
ucg_group_member_index_t member_count; /* number of group members */
ucg_group_member_index_t member_index; /* My member index within the group */
uint32_t cid; /* Assign value to group_id */

char **topo_map; /* Global topology map, topo_map[i][j] means Distance between rank i and rank j. */

/*
* This array contains information about the process placement of different
* group members, which is used to select the best topology for collectives.
*
*
* For example, for 2 nodes, 3 sockets each, 4 cores per socket, each member
* should be passed the distance array contents as follows:
* 1st group member distance array: 0111222222223333333333333333
* 2nd group member distance array: 1011222222223333333333333333
* 3rd group member distance array: 1101222222223333333333333333
* 4th group member distance array: 1110222222223333333333333333
* 5th group member distance array: 2222011122223333333333333333
* 6th group member distance array: 2222101122223333333333333333
* 7th group member distance array: 2222110122223333333333333333
* 8th group member distance array: 2222111022223333333333333333
* ...
* 12th group member distance array: 3333333333333333011122222222
* 13th group member distance array: 3333333333333333101122222222
* ...
*/
enum ucg_group_member_distance *distance;
ucg_topo_args_t topo_args;

/* node index */
uint16_t *node_index;
Expand All @@ -156,14 +184,19 @@ typedef struct ucg_group_params {

/* Callback function for get rank in MPI_COMM_WORLD */
ucg_group_member_index_t (*mpi_global_idx_f) (void *cb_group_obj, ucg_group_member_index_t index);

rank_dist_f mpi_rank_distance;
dt_span_f mpi_datatype_span;

int (*get_operate_param_f)(void *mpi_op, void *mpi_dt, int *op, int *dt);

/* INC params */
inc_params_t inc_param;
char is_socket_balance;
} ucg_group_params_t;

typedef struct ucg_collective {
ucg_collective_type_t type; /* the type (and root) of the collective */
ucg_hash_index_t plan_cache_index; /* the index of collective type in plan cache. */
coll_type_t coll_type;

struct {
void *buf; /* buffer location to use */
Expand Down Expand Up @@ -260,7 +293,13 @@ unsigned ucg_worker_progress(ucg_worker_h worker);
* @param [in] group Group object to query.
*/
const ucg_group_params_t* ucg_group_get_params(ucg_group_h group);

/**
* @ingroup UCG_GROUP
* @brief Get group member count.
*
* @param [in] group Group object to query.
*
* @return The member count of @a group.
*/
ucg_group_member_index_t ucg_group_get_member_count(ucg_group_h group);

/**
* @ingroup UCG_GROUP
Expand Down Expand Up @@ -292,7 +331,9 @@ ucs_status_t ucg_collective_create(ucg_group_h group,
* @return otherwise - Operation was scheduled for send and can be
* completed in any point in time. The request handle
* is returned to the application in order to track
* progress of the message.
* progress of the message. The application is
* responsible for releasing the handle using the
* @ref ucg_request_free routine.
*/
ucs_status_ptr_t ucg_collective_start_nb(ucg_coll_h coll);

Expand Down Expand Up @@ -345,9 +386,27 @@ void ucg_collective_destroy(ucg_coll_h coll);
* @return Error code as defined by @ref ucs_status_t
*/
ucs_status_t ucg_request_check_status(void *request);

/**
* @ingroup UCG_GROUP
* @brief Cancel an outstanding communications request.
*
* @param [in] worker UCG worker.
* @param [in] request Non-blocking request to cancel.
*
* This routine tries to cancel an outstanding communication request.
*/
void ucg_request_cancel(ucg_worker_h worker, void *request);

/**
* @ingroup UCG_GROUP
* @brief Release a communications request.
*
* @param [in] request Non-blocking request to release.
*
* This routine releases the non-blocking request back to the library, regardless
* of its current state. Communications operations associated with this request
* will make progress internally, however no further notifications or callbacks
* will be invoked for this request.
*/
void ucg_request_free(void *request);


Expand All @@ -365,6 +424,11 @@ ucs_status_t ucg_worker_create(ucp_context_h context,
const ucp_worker_params_t *params,
ucp_worker_h *worker_p);

/*
 * NOTE(review): this declaration has no documentation in the header. Judging
 * by the name it presumably validates the arguments of a collective operation
 * -- confirm against the implementation, including which ucs_status_t values
 * it returns and why coll is taken as a pointer to a const handle.
 */
ucs_status_t ucg_collective_check_input(ucg_group_h group,
const ucg_collective_params_t *params,
const ucg_coll_h *coll);


END_C_DECLS

#endif
9 changes: 1 addition & 8 deletions api/ucg_def.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) Huawei Technologies Co., Ltd. 2019-2020. ALL RIGHTS RESERVED.
* Copyright (C) Huawei Technologies Co., Ltd. 2019-2021. ALL RIGHTS RESERVED.
* See file LICENSE for terms.
*/

Expand Down Expand Up @@ -66,12 +66,5 @@ typedef uint64_t ucg_group_member_index_t;
*/
typedef void (*ucg_collective_callback_t)(void *request, ucs_status_t status);

/**
* @ingroup ucg_collective
* @brief Hash index for each hash table.
*
* This type is used as index of hash array.
*/
typedef uint32_t ucg_hash_index_t;

#endif
34 changes: 21 additions & 13 deletions api/ucg_mpi.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright (C) Huawei Technologies Co., Ltd. 2019-2020. ALL RIGHTS RESERVED.
* See file LICENSE for terms.
* Copyright (C) Huawei Technologies Co., Ltd. 2019-2021. All rights reserved.
* Description: Init function for MPI collective operations
*/

#ifndef UCG_MPI_H_
Expand All @@ -24,6 +24,7 @@ enum ucg_predefined {
UCG_PRIMITIVE_SCATTER,
UCG_PRIMITIVE_ALLREDUCE,
UCG_PRIMITIVE_ALLTOALL,
UCG_PRIMITIVE_ALLTOALLV,
UCG_PRIMITIVE_REDUCE_SCATTER,
UCG_PRIMITIVE_ALLGATHER,
UCG_PRIMITIVE_ALLGATHERV,
Expand All @@ -45,6 +46,8 @@ static enum ucg_collective_modifiers ucg_predefined_modifiers[] = {
[UCG_PRIMITIVE_ALLREDUCE] = UCG_GROUP_COLLECTIVE_MODIFIER_AGGREGATE |
UCG_GROUP_COLLECTIVE_MODIFIER_BROADCAST,
[UCG_PRIMITIVE_ALLTOALL] = UCG_GROUP_COLLECTIVE_MODIFIER_ALLTOALL,
[UCG_PRIMITIVE_ALLTOALLV] = UCG_GROUP_COLLECTIVE_MODIFIER_ALLTOALLV |
UCG_GROUP_COLLECTIVE_MODIFIER_VARIABLE_LENGTH,
[UCG_PRIMITIVE_REDUCE_SCATTER] = UCG_GROUP_COLLECTIVE_MODIFIER_AGGREGATE |
UCG_GROUP_COLLECTIVE_MODIFIER_SINGLE_SOURCE,
[UCG_PRIMITIVE_ALLGATHER] = UCG_GROUP_COLLECTIVE_MODIFIER_BROADCAST |
Expand All @@ -58,11 +61,6 @@ static enum ucg_collective_modifiers ucg_predefined_modifiers[] = {
UCG_GROUP_COLLECTIVE_MODIFIER_VARIABLE_DATATYPE,
};

/*
 * Return the hash index for a predefined collective: the enum ucg_predefined
 * value itself, cast to ucg_hash_index_t.
 */
static ucg_hash_index_t UCS_F_ALWAYS_INLINE ucg_mpi_coll_hash(enum ucg_predefined mpi_coll_type)
{
return (ucg_hash_index_t)mpi_coll_type;
}

#define UCG_COLL_PARAMS_BUF_R(_buf, _count, _dt_len, _dt_ext) \
.buf = (_buf), \
.count = (_count), \
Expand Down Expand Up @@ -96,7 +94,7 @@ static UCS_F_ALWAYS_INLINE ucs_status_t ucg_coll_##_lname##_init(__VA_ARGS__, \
.modifiers = flags, \
.root = root, \
}, \
.plan_cache_index = ucg_mpi_coll_hash(UCG_PRIMITIVE_##_uname), \
.coll_type = COLL_TYPE_##_uname, \
.send = { \
UCG_COLL_PARAMS_BUF##_stype _sargs \
}, \
Expand Down Expand Up @@ -132,6 +130,14 @@ UCG_COLL_INIT_FUNC(_lname, _uname,
int *rcounts, size_t len_rdtype, \
void *mpi_rdtype, int *rdispls)

/*
 * Instantiate UCG_COLL_INIT_FUNC for a collective whose send and receive
 * sides both use the _V buffer variant. Each side passes a buffer, a counts
 * pointer, a single datatype length, a single datatype handle and a
 * displacements pointer. The const-qualified sbuf is cast to char* before
 * being handed to the send-side arguments.
 */
#define UCG_COLL_INIT_FUNC_SVN_RVN(_lname, _uname) \
UCG_COLL_INIT_FUNC(_lname, _uname, \
_V, ((char*)sbuf, scounts, len_sdtype, mpi_sdtype, sdispls), \
_V, (rbuf, rcounts, len_rdtype, mpi_rdtype, rdispls), \
const void *sbuf, int *scounts, size_t len_sdtype, \
void *mpi_sdtype, int *sdispls, void *rbuf, int *rcounts, \
size_t len_rdtype, void *mpi_rdtype, int *rdispls)

#define UCG_COLL_INIT_FUNC_SWN_RWN(_lname, _uname) \
UCG_COLL_INIT_FUNC(_lname, _uname, \
_W, ((char*)sbuf, scounts, len_sdtypes, mpi_sdtypes, sdispls), \
Expand All @@ -141,20 +147,22 @@ UCG_COLL_INIT_FUNC(_lname, _uname,
int *rcounts, size_t *len_rdtypes, void **mpi_rdtypes, \
int *rdispls)



UCG_COLL_INIT_FUNC_SR1_RR1(allreduce, ALLREDUCE)
UCG_COLL_INIT_FUNC_SR1_RR1(reduce, REDUCE)
UCG_COLL_INIT_FUNC_SR1_RR1(bcast, BCAST)
UCG_COLL_INIT_FUNC(barrier, BARRIER, _R, (0, 0, 0, 0), _R, (0, 0, 0, 0), int ign)
UCG_COLL_INIT_FUNC_SVN_RVN(alltoallv, ALLTOALLV)

#ifdef UCG_COLL_ALREADY_SUPPORTED
UCG_COLL_INIT_FUNC_SR1_RR1(reduce, REDUCE)
UCG_COLL_INIT_FUNC_SR1_RRN(gather, GATHER)
UCG_COLL_INIT_FUNC_SR1_RRN(scatter, SCATTER)
UCG_COLL_INIT_FUNC_SR1_RRN(allgather, ALLGATHER)
UCG_COLL_INIT_FUNC_SR1_RVN(allgatherv, ALLGATHERV)
UCG_COLL_INIT_FUNC_SR1_RRN(alltoall, ALLTOALL)
UCG_COLL_INIT_FUNC_SWN_RWN(alltoallw, ALLTOALLW)
UCG_COLL_INIT_FUNC_SWN_RWN(neighbor_alltoallw, NEIGHBOR_ALLTOALLW)
UCG_COLL_INIT_FUNC(barrier, BARRIER, _R, (0, 0, 0, 0), _R, (0, 0, 0, 0), int ign)
#endif /* UCG_COLL_ALREADY_SUPPORTED */

END_C_DECLS

#endif
#endif
32 changes: 9 additions & 23 deletions api/ucg_plan_component.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright (C) Huawei Technologies Co., Ltd. 2019-2020. ALL RIGHTS RESERVED.
* See file LICENSE for terms.
* Copyright (C) Huawei Technologies Co., Ltd. 2019-2021. All rights reserved.
* Description: UCG plan component
*/

#ifndef UCG_PLAN_COMPONENT_H_
Expand Down Expand Up @@ -101,10 +101,11 @@ typedef struct ucg_base_plan {
/* Plan lookup - caching mechanism */
ucg_collective_type_t type;
ucs_list_link_t op_head; /**< List of requests following this plan */

int op_cnt;
/* Plan progress */
ucg_plan_component_t *planner;
ucg_group_id_t group_id;
short up_offset; /* In allreduce-tree algo, my position in my father's reduce buffer */
ucg_group_member_index_t my_index;
ucg_group_h group;
ucs_mpool_t *am_mp;
Expand All @@ -113,14 +114,13 @@ typedef struct ucg_base_plan {
/* Attribute */
int support_non_commutative;
int support_large_datatype;
int is_noncontig_allreduce;
int is_ring_plan_topo_type;

} ucg_plan_t;

enum ucg_request_common_flags {
UCG_REQUEST_COMMON_FLAG_COMPLETED = UCS_BIT(0),

UCG_REQUEST_COMMON_FLAG_MASK = UCS_MASK(1)
UCG_REQUEST_COMMON_FLAG_INC_FAIL = UCS_BIT(1),
UCG_REQUEST_COMMON_FLAG_MASK = UCS_MASK(2)
};

typedef struct ucg_request {
Expand Down Expand Up @@ -164,10 +164,8 @@ struct ucg_plan_component {
unsigned (*progress)(ucg_group_h group);

/* plan a collective operation with this component */
ucs_status_t (*plan) (ucg_plan_component_t *plan_component,
const ucg_collective_type_t *coll_type,
const size_t msg_size,
ucg_group_h group,
ucs_status_t (*plan) (ucg_group_h group,
int algo_id,
ucg_collective_params_t *coll_params,
ucg_plan_t **plan_p);
/* Prepare an operation to follow the given plan */
Expand Down Expand Up @@ -268,18 +266,6 @@ ucs_status_t ucg_plan_select(ucg_group_h group, const char* planner_name,
/* Start pending operations after a barrier has been completed */
ucs_status_t ucg_collective_release_barrier(ucg_group_h group);

/* Check whether the plan supports non-commutative operations (returns the plan's support_non_commutative field). */
static inline int ucg_plan_support_non_commutative(ucg_plan_t *plan)
{
return plan->support_non_commutative;
}

/* Check whether the plan supports large datatypes (returns the plan's support_large_datatype field). */
static inline int ucg_plan_support_large_datatype(ucg_plan_t *plan)
{
return plan->support_large_datatype;
}

END_C_DECLS

#endif
Loading

0 comments on commit 8f386ff

Please sign in to comment.