Skip to content

Commit

Permalink
UCP/RMA/FLUSH: Handle fence during one-sided operations
Browse files Browse the repository at this point in the history
  • Loading branch information
michal-shalev committed Feb 10, 2025
1 parent 7eaaafc commit 9ce878a
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 0 deletions.
21 changes: 21 additions & 0 deletions src/ucp/core/ucp_ep.inl
Original file line number Diff line number Diff line change
Expand Up @@ -301,4 +301,25 @@ ucp_ep_mark_flushed(ucp_ep_h ep)
ep->ext->flush_state.unflushed_lanes = 0;
}

/*
 * Tell whether a fence on this endpoint has to be a strong one.
 *
 * A strong fence is required when more than one lane is still unflushed,
 * i.e. when more than one bit is set in flush_state.unflushed_lanes.
 *
 * @param [in] ep  Endpoint whose unflushed lane map is examined.
 *
 * @return 1 if two or more lanes are unflushed, 0 otherwise.
 */
static UCS_F_ALWAYS_INLINE int
ucp_ep_is_strong_fence(ucp_ep_h ep)
{
    /* x & (x - 1) clears the lowest set bit, so the result is non-zero only
     * when at least two bits are set. Compare against zero explicitly so the
     * leftover mask is never narrowed to the int return type, which could
     * drop high bits if the lane map is wider than int. */
    return (ep->ext->flush_state.unflushed_lanes &
            (ep->ext->flush_state.unflushed_lanes - 1)) != 0;
}

/*
 * Check whether the endpoint still has to act on a fence.
 *
 * @param [in] ep  Endpoint to check.
 *
 * @return Non-zero only when the fence sequence recorded on the endpoint is
 *         lower than the worker's fence sequence and the context is
 *         configured with fence mode UCP_FENCE_MODE_EP_BASED; 0 otherwise.
 */
static UCS_F_ALWAYS_INLINE int
ucp_ep_is_fence_required(ucp_ep_h ep)
{
    /* Endpoint sequence is not behind the worker's: nothing to do */
    if (ep->ext->flush_state.fence_seq >= ep->worker->fence_seq) {
        return 0;
    }

    return ep->worker->context->config.ext.fence_mode ==
           UCP_FENCE_MODE_EP_BASED;
}

/*
 * Copy the worker's current fence sequence number into the endpoint's
 * flush state.
 *
 * @param [in] ep  Endpoint whose flush_state.fence_seq is overwritten with
 *                 ep->worker->fence_seq.
 */
static UCS_F_ALWAYS_INLINE void
ucp_ep_update_fence_seq(ucp_ep_h ep)
{
/* After this store the endpoint's sequence equals the worker's */
ep->ext->flush_state.fence_seq = ep->worker->fence_seq;
}

#endif
9 changes: 9 additions & 0 deletions src/ucp/rma/amo_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,15 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_atomic_op_nbx,
if (context->config.ext.proto_enable) {
ucp_amo_init_proto(req, ucp_uct_atomic_op_table[opcode], remote_addr,
rkey);

if (ucp_ep_is_fence_required(ep)) {
status = ucp_ep_handle_fence(ep);
if (status != UCS_OK) {
status_p = UCS_STATUS_PTR(status);
goto out;
}
}

if (param->op_attr_mask & UCP_OP_ATTR_FIELD_REPLY_BUFFER) {
req->send.amo.reply_buffer = param->reply_buffer;
op_id = (opcode == UCP_ATOMIC_OP_CSWAP) ? UCP_OP_ID_AMO_CSWAP :
Expand Down
11 changes: 11 additions & 0 deletions src/ucp/rma/rma.inl
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,15 @@ ucp_proto_sw_rma_cfg_thresh(ucp_context_h context, size_t default_value)
default_value;
}

/*
 * Apply a fence on the endpoint.
 *
 * Records the worker's fence sequence on the endpoint, then performs a
 * strong fence if ucp_ep_is_strong_fence() reports one is needed, and a weak
 * fence otherwise.
 *
 * @param [in] ep  Endpoint to fence.
 *
 * @return Status returned by ucp_ep_fence_strong() or ucp_ep_fence_weak().
 */
static UCS_F_ALWAYS_INLINE ucs_status_t ucp_ep_handle_fence(ucp_ep_h ep)
{
    /* NOTE(review): the sequence is recorded before the fence outcome is
     * known, so a failed fence is presumably not retried by the next
     * operation - confirm this is intended. */
    ucp_ep_update_fence_seq(ep);

    return ucp_ep_is_strong_fence(ep) ? ucp_ep_fence_strong(ep) :
                                        ucp_ep_fence_weak(ep);
}

#endif
16 changes: 16 additions & 0 deletions src/ucp/rma/rma_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,14 @@ ucs_status_ptr_t ucp_put_nbx(ucp_ep_h ep, const void *buffer, size_t count,
contig_length = count;
}

if (ucp_ep_is_fence_required(ep)) {
status = ucp_ep_handle_fence(ep);
if (status != UCS_OK) {
ret = UCS_STATUS_PTR(status);
goto out_unlock;
}
}

ret = ucp_proto_request_send_op(
ep, &ucp_rkey_config(worker, rkey)->proto_select,
rkey->cfg_index, req, UCP_OP_ID_PUT, buffer, count, datatype,
Expand Down Expand Up @@ -397,6 +405,14 @@ ucs_status_ptr_t ucp_get_nbx(ucp_ep_h ep, void *buffer, size_t count,
contig_length = ucp_contig_dt_length(datatype, count);
}

if (ucp_ep_is_fence_required(ep)) {
status = ucp_ep_handle_fence(ep);
if (status != UCS_OK) {
ret = UCS_STATUS_PTR(status);
goto out_unlock;
}
}

ret = ucp_proto_request_send_op(
ep, &ucp_rkey_config(worker, rkey)->proto_select,
rkey->cfg_index, req, UCP_OP_ID_GET, buffer, count, datatype,
Expand Down

0 comments on commit 9ce878a

Please sign in to comment.