From 9ce878a542adf7ba22a5d7c61e5a3f8e9b70f0b1 Mon Sep 17 00:00:00 2001 From: Michal Shalev Date: Sun, 9 Feb 2025 17:39:50 +0000 Subject: [PATCH] UCP/RMA/FLUSH: Handle fence during one-sided operations --- src/ucp/core/ucp_ep.inl | 21 +++++++++++++++++++++ src/ucp/rma/amo_send.c | 9 +++++++++ src/ucp/rma/rma.inl | 11 +++++++++++ src/ucp/rma/rma_send.c | 16 ++++++++++++++++ 4 files changed, 57 insertions(+) diff --git a/src/ucp/core/ucp_ep.inl b/src/ucp/core/ucp_ep.inl index 0e41a2f590ac..68e2bf4385bb 100644 --- a/src/ucp/core/ucp_ep.inl +++ b/src/ucp/core/ucp_ep.inl @@ -301,4 +301,25 @@ ucp_ep_mark_flushed(ucp_ep_h ep) ep->ext->flush_state.unflushed_lanes = 0; } +static UCS_F_ALWAYS_INLINE int +ucp_ep_is_strong_fence(ucp_ep_h ep) +{ + /* Strong fence is required if there is more than one unflushed lane: x & (x - 1) clears the lowest set bit, so the result is nonzero only when two or more lane bits are set. NOTE(review): the result is returned as int - confirm the lane map is not wider than int, otherwise high bits could be truncated */ + return ep->ext->flush_state.unflushed_lanes & + (ep->ext->flush_state.unflushed_lanes - 1); +} + +static UCS_F_ALWAYS_INLINE int +ucp_ep_is_fence_required(ucp_ep_h ep) +{ /* Nonzero when the endpoint fence_seq is behind the worker fence_seq and fence_mode is UCP_FENCE_MODE_EP_BASED */ + return (ep->ext->flush_state.fence_seq < ep->worker->fence_seq) && + (ep->worker->context->config.ext.fence_mode == UCP_FENCE_MODE_EP_BASED); +} + +static UCS_F_ALWAYS_INLINE void +ucp_ep_update_fence_seq(ucp_ep_h ep) +{ /* Copy the worker's current fence_seq into the endpoint flush state */ + ep->ext->flush_state.fence_seq = ep->worker->fence_seq; +} + #endif diff --git a/src/ucp/rma/amo_send.c b/src/ucp/rma/amo_send.c index 5d6aafb8823f..3dc468930912 100644 --- a/src/ucp/rma/amo_send.c +++ b/src/ucp/rma/amo_send.c @@ -215,6 +215,15 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_atomic_op_nbx, if (context->config.ext.proto_enable) { ucp_amo_init_proto(req, ucp_uct_atomic_op_table[opcode], remote_addr, rkey); + + if (ucp_ep_is_fence_required(ep)) { /* Handle a pending fence first; on failure jump to out with the error status in status_p. NOTE(review): req was already initialized above - confirm the out path releases it */ + status = ucp_ep_handle_fence(ep); + if (status != UCS_OK) { + status_p = UCS_STATUS_PTR(status); + goto out; + } + } + if (param->op_attr_mask & UCP_OP_ATTR_FIELD_REPLY_BUFFER) { req->send.amo.reply_buffer = param->reply_buffer; op_id = (opcode == UCP_ATOMIC_OP_CSWAP) ? 
UCP_OP_ID_AMO_CSWAP : diff --git a/src/ucp/rma/rma.inl b/src/ucp/rma/rma.inl index 7e8d5482dbe7..9051a5fa8e84 100644 --- a/src/ucp/rma/rma.inl +++ b/src/ucp/rma/rma.inl @@ -144,4 +144,15 @@ ucp_proto_sw_rma_cfg_thresh(ucp_context_h context, size_t default_value) default_value; } +static UCS_F_ALWAYS_INLINE ucs_status_t ucp_ep_handle_fence(ucp_ep_h ep) +{ /* Records the worker's fence_seq on the endpoint, then issues ucp_ep_fence_strong() if ucp_ep_is_strong_fence() is nonzero, otherwise ucp_ep_fence_weak(), and returns that call's status. NOTE(review): fence_seq is updated before the fence runs, so ucp_ep_is_fence_required() is no longer true for this worker fence_seq even if the fence fails - confirm this ordering is intended */ + ucp_ep_update_fence_seq(ep); + + if (ucp_ep_is_strong_fence(ep)) { + return ucp_ep_fence_strong(ep); + } else { + return ucp_ep_fence_weak(ep); + } +} + #endif diff --git a/src/ucp/rma/rma_send.c b/src/ucp/rma/rma_send.c index 56bf7b3d84b7..dad5d6834198 100644 --- a/src/ucp/rma/rma_send.c +++ b/src/ucp/rma/rma_send.c @@ -291,6 +291,14 @@ ucs_status_ptr_t ucp_put_nbx(ucp_ep_h ep, const void *buffer, size_t count, contig_length = count; } + if (ucp_ep_is_fence_required(ep)) { /* Handle a pending fence before ucp_proto_request_send_op(); on failure the error status is returned via ret. NOTE(review): confirm out_unlock releases req if it was already allocated */ + status = ucp_ep_handle_fence(ep); + if (status != UCS_OK) { + ret = UCS_STATUS_PTR(status); + goto out_unlock; + } + } + ret = ucp_proto_request_send_op( ep, &ucp_rkey_config(worker, rkey)->proto_select, rkey->cfg_index, req, UCP_OP_ID_PUT, buffer, count, datatype, @@ -397,6 +405,14 @@ ucs_status_ptr_t ucp_get_nbx(ucp_ep_h ep, void *buffer, size_t count, contig_length = ucp_contig_dt_length(datatype, count); } + if (ucp_ep_is_fence_required(ep)) { /* Handle a pending fence before ucp_proto_request_send_op(); on failure the error status is returned via ret. NOTE(review): confirm out_unlock releases req if it was already allocated */ + status = ucp_ep_handle_fence(ep); + if (status != UCS_OK) { + ret = UCS_STATUS_PTR(status); + goto out_unlock; + } + } + ret = ucp_proto_request_send_op( ep, &ucp_rkey_config(worker, rkey)->proto_select, rkey->cfg_index, req, UCP_OP_ID_GET, buffer, count, datatype,