-
Notifications
You must be signed in to change notification settings - Fork 441
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
UCP/RMA/FLUSH: Dynamic Selection of Strong vs. Weak Fence #10474
base: master
Are you sure you want to change the base?
Changes from all commits
7d123db
74f5e73
8d7bf47
4b5fbc7
c8d8e9f
57e8752
a33f74d
2d459bd
6160121
c306d1f
d139b75
22aef72
471cfbe
0101b84
df2b83f
74d5cdb
09eee92
9126795
c28dd5a
1cfc7d8
1b09153
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,7 +17,7 @@ | |
#include <ucs/profile/profile.h> | ||
#include <ucp/dt/datatype_iter.inl> | ||
#include <ucp/proto/proto_init.h> | ||
#include <ucp/proto/proto_single.h> | ||
#include <ucp/proto/proto_single.inl> | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The include of proto_single.h can be removed. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Done. |
||
|
||
|
||
static size_t ucp_amo_sw_pack(void *dest, ucp_request_t *req, int fetch, | ||
|
@@ -393,6 +393,7 @@ ucp_proto_amo_sw_progress(uct_pending_req_t *self, uct_pack_callback_t pack_cb, | |
return status; | ||
} | ||
|
||
ucp_proto_single_rma_init_func(req); | ||
req->flags |= UCP_REQUEST_FLAG_PROTO_INITIALIZED; | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -378,7 +378,8 @@ ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned req_flags, | |
const ucp_request_param_t *param, | ||
ucp_request_t *worker_req, | ||
ucp_request_callback_t flushed_cb, | ||
const char *debug_name) | ||
const char *debug_name, | ||
unsigned uct_flags) | ||
{ | ||
ucs_status_t status; | ||
ucp_request_t *req; | ||
|
@@ -400,9 +401,7 @@ ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned req_flags, | |
req->status = UCS_OK; | ||
req->send.ep = ep; | ||
req->send.flushed_cb = flushed_cb; | ||
req->send.flush.uct_flags = (worker_req != NULL) ? | ||
worker_req->flush_worker.uct_flags : | ||
UCT_FLUSH_FLAG_LOCAL; | ||
req->send.flush.uct_flags = uct_flags; | ||
req->send.flush.sw_started = 0; | ||
req->send.flush.sw_done = 0; | ||
req->send.flush.num_lanes = ucp_ep_num_lanes(ep); | ||
|
@@ -453,7 +452,8 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_ep_flush_nbx, (ep, param), | |
UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker); | ||
|
||
request = ucp_ep_flush_internal(ep, 0, param, NULL, | ||
ucp_ep_flushed_callback, "flush_nbx"); | ||
ucp_ep_flushed_callback, "flush_nbx", | ||
UCT_FLUSH_FLAG_LOCAL); | ||
|
||
UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker); | ||
|
||
|
@@ -590,7 +590,8 @@ static unsigned ucp_worker_flush_progress(void *arg) | |
ep_flush_request = ucp_ep_flush_internal(ep, UCP_REQUEST_FLAG_RELEASED, | ||
&ucp_request_null_param, req, | ||
ucp_worker_flush_ep_flushed_cb, | ||
"flush_worker"); | ||
"flush_worker", | ||
req->flush_worker.uct_flags); | ||
if (UCS_PTR_IS_ERR(ep_flush_request)) { | ||
/* endpoint flush resulted in an error */ | ||
status = UCS_PTR_STATUS(ep_flush_request); | ||
|
@@ -695,7 +696,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_ep_flush, (ep), ucp_ep_h ep) | |
UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker); | ||
|
||
request = ucp_ep_flush_internal(ep, 0, &ucp_request_null_param, NULL, | ||
ucp_ep_flushed_callback, "flush"); | ||
ucp_ep_flushed_callback, "flush", | ||
UCT_FLUSH_FLAG_LOCAL); | ||
status = ucp_flush_wait(ep->worker, request); | ||
|
||
UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker); | ||
|
@@ -733,17 +735,62 @@ static ucs_status_t ucp_worker_fence_strong(ucp_worker_h worker) | |
|
||
/*
 * Worker-level fence entry point.
 *
 * Runs between UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker) and
 * UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker), and dispatches on
 * worker->context->config.worker_fence_mode:
 *   - UCP_FENCE_MODE_EP_BASED: only increments worker->fence_seq
 *   - UCP_FENCE_MODE_STRONG:   returns the status of ucp_worker_fence_strong()
 *   - UCP_FENCE_MODE_WEAK:     returns the status of ucp_worker_fence_weak()
 *   - any other value:         logs an error, returns UCS_ERR_INVALID_PARAM
 * The status is UCS_OK unless one of the calls above changes it.
 *
 * NOTE(review): this hunk appears to be rendered without +/- markers, so the
 * lines of the replaced worker_strong_fence if/else branch seem to be
 * interleaved with the new switch statement - confirm against the real file.
 */
UCS_PROFILE_FUNC(ucs_status_t, ucp_worker_fence, (worker), ucp_worker_h worker) | ||
{ | ||
ucs_status_t status; | ||
ucs_status_t status = UCS_OK; | ||
|
||
UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker); | ||
|
||
if (worker->context->config.worker_strong_fence) { | ||
/* force using flush on EPs */ | ||
switch (worker->context->config.worker_fence_mode) { | ||
case UCP_FENCE_MODE_EP_BASED: | ||
worker->fence_seq++; | ||
break; | ||
case UCP_FENCE_MODE_STRONG: | ||
status = ucp_worker_fence_strong(worker); | ||
} else { | ||
break; | ||
case UCP_FENCE_MODE_WEAK: | ||
status = ucp_worker_fence_weak(worker); | ||
break; | ||
default: | ||
ucs_error("invalid fence mode %d", | ||
worker->context->config.worker_fence_mode); | ||
status = UCS_ERR_INVALID_PARAM; | ||
} | ||
|
||
UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker); | ||
return status; | ||
} | ||
|
||
/*
 * Weak fence on a single endpoint.
 *
 * Asserts that ep->ext->unflushed_lanes is non-zero and a power of two,
 * derives one lane index from that mask with ucs_ffs64_safe(), and calls
 * uct_ep_fence() with flags 0 on that lane.
 *
 * Returns the status of uct_ep_fence().
 *
 * NOTE(review): presumably the power-of-two assertion means exactly one lane
 * bit is set, and ucs_ffs64_safe() yields that bit's index - confirm against
 * the helper's definition.
 */
ucs_status_t ucp_ep_fence_weak(ucp_ep_h ep) | ||
{ | ||
ucp_lane_index_t lane; | ||
|
||
/* TODO: Handle unflushed_lanes == 0 before reaching this function | ||
* as part of optimizing flush */ | ||
ucs_assertv(ep->ext->unflushed_lanes != 0, | ||
"ep=%p unexpected unflushed_lanes=0x%" PRIx64, ep, | ||
ep->ext->unflushed_lanes); | ||
|
||
ucs_assertv(ucs_is_pow2(ep->ext->unflushed_lanes), | ||
"ep=%p unexpected unflushed_lanes=0x%" PRIx64, ep, | ||
ep->ext->unflushed_lanes); | ||
|
||
lane = ucs_ffs64_safe(ep->ext->unflushed_lanes); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe assert that unflushed_lanes is nonzero and power of 2 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here: For now, I’m adding an assertion and noting the planned change. |
||
return uct_ep_fence(ucp_ep_get_lane(ep, lane), 0); | ||
} | ||
|
||
/*
 * Strong fence on a single endpoint.
 *
 * Starts a flush through ucp_ep_flush_internal() with UCT_FLUSH_FLAG_REMOTE
 * and waits for it with ucp_flush_wait() on the endpoint's worker.
 *
 * Returns the wait status if it is not UCS_OK. Otherwise clears
 * ep->ext->unflushed_lanes, copies the worker's fence_seq into
 * ep->ext->fence_seq and returns UCS_OK.
 */
ucs_status_t ucp_ep_fence_strong(ucp_ep_h ep) | ||
{ | ||
ucs_status_t status; | ||
void *request; | ||
|
||
request = ucp_ep_flush_internal(ep, 0, &ucp_request_null_param, NULL, | ||
ucp_ep_flushed_callback, "ep_fence_strong", | ||
UCT_FLUSH_FLAG_REMOTE); | ||
status = ucp_flush_wait(ep->worker, request); | ||
if (status != UCS_OK) { | ||
return status; | ||
} | ||
|
||
/* Reached only when the flush wait returned UCS_OK: reset the endpoint's
 * unflushed-lane mask and record the worker's current fence sequence */
ep->ext->unflushed_lanes = 0; | ||
ep->ext->fence_seq = ep->worker->fence_seq; | ||
return UCS_OK; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't put this into `flush_state`, which is in a union with `ep_match`?
Is this because we use RMA operations before `ucp_ep_flush_state_reset`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exactly