Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCP/RMA/FLUSH: Dynamic Selection of Strong vs. Weak Fence #10474

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
7d123db
UCP/RMA/FLUSH: Add unflushed_lanes to ucp_ep
michal-shalev Jan 22, 2025
74f5e73
UCP/RMA/FLUSH: Add sequence numbers to EP and worker structs
michal-shalev Feb 9, 2025
8d7bf47
UCP/RMA/FLUSH: Delete auto fence mode and change default to strong
michal-shalev Feb 9, 2025
4b5fbc7
UCP/RMA/FLUSH: Add EP-based fence mode
michal-shalev Feb 9, 2025
c8d8e9f
UCP/RMA/FLUSH: Implement per-EP weak and strong fences
michal-shalev Feb 9, 2025
57e8752
UCP/RMA/FLUSH: Handle fence during one-sided operations
michal-shalev Feb 9, 2025
a33f74d
UCP/RMA/FLUSH: Move unflushed_lanes from the union
michal-shalev Feb 19, 2025
2d459bd
UCP/RMA/FLUSH: Move fence_seq from the union
michal-shalev Feb 19, 2025
6160121
UCP/RMA/FLUSH: Add correctness test
michal-shalev Feb 23, 2025
c306d1f
UCP/RMA/FLUSH: PR fixes
michal-shalev Mar 2, 2025
d139b75
UCP/RMA/FLUSH: PR fixes 2.0
michal-shalev Mar 3, 2025
22aef72
UCP/RMA/FLUSH: PR fixes 3.0
michal-shalev Mar 4, 2025
471cfbe
UCP/RMA/FLUSH: PR fixes 4.0
michal-shalev Mar 4, 2025
0101b84
UCP/RMA/FLUSH: PR fixes 5.0
michal-shalev Mar 5, 2025
df2b83f
UCP/RMA/FLUSH: PR fixes 6.0
michal-shalev Mar 5, 2025
74d5cdb
UCP/RMA/FLUSH: PR fixes 7.0
michal-shalev Mar 5, 2025
09eee92
UCP/RMA/FLUSH: PR fixes 8.0
michal-shalev Mar 6, 2025
9126795
UCP/RMA/FLUSH: PR fixes 9.0
michal-shalev Mar 9, 2025
c28dd5a
UCP/RMA/FLUSH: PR fixes 10.0
michal-shalev Mar 9, 2025
1cfc7d8
UCP/RMA/FLUSH: PR fixes 11.0
michal-shalev Mar 10, 2025
1b09153
UCP/RMA/FLUSH: PR fixes 12.0
michal-shalev Mar 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions src/ucp/core/ucp_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,11 @@ static const char *ucp_atomic_modes[] = {
};

static const char *ucp_fence_modes[] = {
[UCP_FENCE_MODE_WEAK] = "weak",
[UCP_FENCE_MODE_STRONG] = "strong",
[UCP_FENCE_MODE_AUTO] = "auto",
[UCP_FENCE_MODE_LAST] = NULL
[UCP_FENCE_MODE_WEAK] = "weak",
[UCP_FENCE_MODE_STRONG] = "strong",
[UCP_FENCE_MODE_AUTO] = "auto",
[UCP_FENCE_MODE_EP_BASED] = "ep_based",
[UCP_FENCE_MODE_LAST] = NULL
};

static const char *ucp_rndv_modes[] = {
Expand Down Expand Up @@ -380,9 +381,10 @@ static ucs_config_field_t ucp_context_config_table[] = {

{"FENCE_MODE", "auto",
"Fence mode used in ucp_worker_fence routine.\n"
" weak - use weak fence mode.\n"
" strong - use strong fence mode.\n"
" auto - automatically detect required fence mode.",
" weak - use weak fence mode.\n"
" strong - use strong fence mode.\n"
" auto - automatically detect fence mode.\n"
" ep_based - use endpoint-based fence mode.",
ucs_offsetof(ucp_context_config_t, fence_mode),
UCS_CONFIG_TYPE_ENUM(ucp_fence_modes)},

Expand Down Expand Up @@ -2224,11 +2226,19 @@ static ucs_status_t ucp_fill_config(ucp_context_h context,
memcpy(context->config.am_mpools.sizes, config->mpool_sizes.memunits,
config->mpool_sizes.count * sizeof(size_t));

context->config.worker_strong_fence =
(context->config.ext.fence_mode == UCP_FENCE_MODE_STRONG) ||
((context->config.ext.fence_mode == UCP_FENCE_MODE_AUTO) &&
((context->config.ext.max_rma_lanes > 1) ||
context->config.ext.proto_enable));
if ((context->config.ext.fence_mode == UCP_FENCE_MODE_EP_BASED) &&
!context->config.ext.proto_enable) {
ucs_error("UCX_FENCE_MODE=ep_based requires UCX_PROTO_ENABLE=y");
status = UCS_ERR_INVALID_PARAM;
goto err_free_key_list;
} else if (context->config.ext.fence_mode == UCP_FENCE_MODE_AUTO) {
context->config.worker_fence_mode =
((context->config.ext.max_rma_lanes > 1) ||
(context->config.ext.proto_enable)) ?
UCP_FENCE_MODE_STRONG : UCP_FENCE_MODE_WEAK;
} else {
context->config.worker_fence_mode = context->config.ext.fence_mode;
}

context->config.progress_wrapper_enabled =
ucs_log_is_enabled(UCS_LOG_LEVEL_TRACE_REQ) ||
Expand Down
2 changes: 1 addition & 1 deletion src/ucp/core/ucp_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ typedef struct ucp_context {
char *env_prefix;

/* worker_fence implementation method */
unsigned worker_strong_fence;
ucp_fence_mode_t worker_fence_mode;

/* Progress wrapper enabled */
int progress_wrapper_enabled;
Expand Down
5 changes: 4 additions & 1 deletion src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ static ucp_ep_h ucp_ep_allocate(ucp_worker_h worker, const char *peer_name)
ep->ext->ka_last_round = 0;
#endif
ep->ext->peer_mem = NULL;
ep->ext->unflushed_lanes = 0;
ep->ext->fence_seq = 0;
ep->ext->uct_eps = NULL;

UCS_STATIC_ASSERT(sizeof(ep->ext->ep_match) >=
Expand Down Expand Up @@ -1765,7 +1767,8 @@ ucs_status_ptr_t ucp_ep_close_nbx(ucp_ep_h ep, const ucp_request_param_t *param)
ucp_ep_disconnected(ep, 1);
} else {
request = ucp_ep_flush_internal(ep, 0, param, NULL,
ucp_ep_close_flushed_callback, "close");
ucp_ep_close_flushed_callback, "close",
UCT_FLUSH_FLAG_LOCAL);
if (!UCS_PTR_IS_PTR(request)) {
if (ucp_ep_is_cm_local_connected(ep)) {
/* lanes already flushed, start disconnect on CM lane */
Expand Down
17 changes: 16 additions & 1 deletion src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ typedef uint16_t ucp_ep_flags_t;
#define ucp_ep_refcount_assert(_ep, _type_refcount, _cmp, _val) \
ucp_ep_refcount_field_assert(_ep, refcounts._type_refcount, _cmp, _val)

#define ucp_ep_check_fence(_ep) \
({ \
/* Apply a fence if EP's sequence is behind worker's */ \
ucs_unlikely((_ep)->ext->fence_seq < (_ep)->worker->fence_seq) ? \
/* Strong fence if unflushed operations exist on multiple lanes */ \
(ucs_likely(ucs_is_pow2_or_zero((_ep)->ext->unflushed_lanes)) ? \
ucp_ep_fence_weak((_ep)) : \
ucp_ep_fence_strong((_ep))) : \
UCS_OK; \
})

#define UCP_SA_DATA_HEADER_VERSION_SHIFT 5

Expand Down Expand Up @@ -545,6 +555,10 @@ typedef struct ucp_ep_ext {
arrived before the first one */
} am;

ucp_lane_map_t unflushed_lanes; /* Bitmap of lanes which have unflushed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can't put this into flush_state which union with ep_match ?
this is because we use RMA operations before ucp_ep_flush_state_reset?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly

operations */
uint64_t fence_seq; /* Sequence number for fence detection */

/**
* UCT endpoints for every slow-path lane that has no room in the base endpoint
* structure. TODO allocate this array dynamically.
Expand Down Expand Up @@ -712,7 +726,8 @@ ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned req_flags,
const ucp_request_param_t *param,
ucp_request_t *worker_req,
ucp_request_callback_t flushed_cb,
const char *debug_name);
const char *debug_name,
unsigned uct_flags);

void ucp_ep_config_key_set_err_mode(ucp_ep_config_key_t *key,
unsigned ep_init_flags);
Expand Down
7 changes: 4 additions & 3 deletions src/ucp/core/ucp_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,10 @@ typedef enum {
* Fence mode.
*/
typedef enum {
UCP_FENCE_MODE_WEAK, /* Use weak fence mode */
UCP_FENCE_MODE_STRONG, /* Use strong fence mode */
UCP_FENCE_MODE_AUTO, /* Automatically detect fence mode */
UCP_FENCE_MODE_WEAK, /* Use weak fence mode */
UCP_FENCE_MODE_STRONG, /* Use strong fence mode */
UCP_FENCE_MODE_AUTO, /* Automatically detect fence mode */
UCP_FENCE_MODE_EP_BASED, /* Use EP-based fence mode */
UCP_FENCE_MODE_LAST
} ucp_fence_mode_t;

Expand Down
1 change: 1 addition & 0 deletions src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -2458,6 +2458,7 @@ ucs_status_t ucp_worker_create(ucp_context_h context,
worker->context = context;
worker->uuid = ucs_generate_uuid((uintptr_t)worker);
worker->flush_ops_count = 0;
worker->fence_seq = 0;
worker->inprogress = 0;
worker->rkey_config_count = 0;
worker->num_active_ifaces = 0;
Expand Down
1 change: 1 addition & 0 deletions src/ucp/core/ucp_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ typedef struct ucp_worker {
char address_name[UCP_WORKER_ADDRESS_NAME_MAX];

unsigned flush_ops_count; /* Number of pending operations */
uint64_t fence_seq; /* Sequence number of the last fence */

int event_fd; /* Allocated (on-demand) event fd for wakeup */
ucs_sys_event_set_t *event_set; /* Allocated UCS event set for wakeup */
Expand Down
8 changes: 8 additions & 0 deletions src/ucp/proto/proto_multi.inl
Original file line number Diff line number Diff line change
Expand Up @@ -355,4 +355,12 @@ ucp_proto_am_zcopy_multi_common_send_func(
&req->send.state.uct_comp);
}

static UCS_F_ALWAYS_INLINE void
ucp_proto_multi_rma_init_func(ucp_request_t *req)
{
const ucp_proto_multi_priv_t *mpriv = req->send.proto_config->priv;

req->send.ep->ext->unflushed_lanes |= mpriv->lane_map;
}

#endif
8 changes: 8 additions & 0 deletions src/ucp/proto/proto_single.inl
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,12 @@ ucp_proto_zcopy_single_progress(ucp_request_t *req, unsigned uct_mem_flags,
spriv->super.lane, status);
}

static UCS_F_ALWAYS_INLINE void
ucp_proto_single_rma_init_func(ucp_request_t *req)
{
const ucp_proto_single_priv_t *spriv = req->send.proto_config->priv;

req->send.ep->ext->unflushed_lanes |= UCS_BIT(spriv->super.lane);
}

#endif
1 change: 1 addition & 0 deletions src/ucp/rma/amo_offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ ucp_proto_amo_progress(uct_pending_req_t *self, ucp_operation_id_t op_id,
ucp_amo_request_reply_mem_type(req), op_size);
}

ucp_proto_single_rma_init_func(req);
req->flags |= UCP_REQUEST_FLAG_PROTO_INITIALIZED;
}

Expand Down
7 changes: 7 additions & 0 deletions src/ucp/rma/amo_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,13 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_atomic_op_nbx,
if (context->config.ext.proto_enable) {
ucp_amo_init_proto(req, ucp_uct_atomic_op_table[opcode], remote_addr,
rkey);

status = ucp_ep_check_fence(ep);
if (status != UCS_OK) {
status_p = UCS_STATUS_PTR(status);
goto out;
}

if (param->op_attr_mask & UCP_OP_ATTR_FIELD_REPLY_BUFFER) {
req->send.amo.reply_buffer = param->reply_buffer;
op_id = (opcode == UCP_ATOMIC_OP_CSWAP) ? UCP_OP_ID_AMO_CSWAP :
Expand Down
3 changes: 2 additions & 1 deletion src/ucp/rma/amo_sw.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <ucs/profile/profile.h>
#include <ucp/dt/datatype_iter.inl>
#include <ucp/proto/proto_init.h>
#include <ucp/proto/proto_single.h>
#include <ucp/proto/proto_single.inl>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can remove the include of proto_single.h

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done



static size_t ucp_amo_sw_pack(void *dest, ucp_request_t *req, int fetch,
Expand Down Expand Up @@ -393,6 +393,7 @@ ucp_proto_amo_sw_progress(uct_pending_req_t *self, uct_pack_callback_t pack_cb,
return status;
}

ucp_proto_single_rma_init_func(req);
req->flags |= UCP_REQUEST_FLAG_PROTO_INITIALIZED;
}

Expand Down
69 changes: 58 additions & 11 deletions src/ucp/rma/flush.c
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,8 @@ ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned req_flags,
const ucp_request_param_t *param,
ucp_request_t *worker_req,
ucp_request_callback_t flushed_cb,
const char *debug_name)
const char *debug_name,
unsigned uct_flags)
{
ucs_status_t status;
ucp_request_t *req;
Expand All @@ -400,9 +401,7 @@ ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned req_flags,
req->status = UCS_OK;
req->send.ep = ep;
req->send.flushed_cb = flushed_cb;
req->send.flush.uct_flags = (worker_req != NULL) ?
worker_req->flush_worker.uct_flags :
UCT_FLUSH_FLAG_LOCAL;
req->send.flush.uct_flags = uct_flags;
req->send.flush.sw_started = 0;
req->send.flush.sw_done = 0;
req->send.flush.num_lanes = ucp_ep_num_lanes(ep);
Expand Down Expand Up @@ -453,7 +452,8 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_ep_flush_nbx, (ep, param),
UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);

request = ucp_ep_flush_internal(ep, 0, param, NULL,
ucp_ep_flushed_callback, "flush_nbx");
ucp_ep_flushed_callback, "flush_nbx",
UCT_FLUSH_FLAG_LOCAL);

UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);

Expand Down Expand Up @@ -590,7 +590,8 @@ static unsigned ucp_worker_flush_progress(void *arg)
ep_flush_request = ucp_ep_flush_internal(ep, UCP_REQUEST_FLAG_RELEASED,
&ucp_request_null_param, req,
ucp_worker_flush_ep_flushed_cb,
"flush_worker");
"flush_worker",
req->flush_worker.uct_flags);
if (UCS_PTR_IS_ERR(ep_flush_request)) {
/* endpoint flush resulted in an error */
status = UCS_PTR_STATUS(ep_flush_request);
Expand Down Expand Up @@ -695,7 +696,8 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_ep_flush, (ep), ucp_ep_h ep)
UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(ep->worker);

request = ucp_ep_flush_internal(ep, 0, &ucp_request_null_param, NULL,
ucp_ep_flushed_callback, "flush");
ucp_ep_flushed_callback, "flush",
UCT_FLUSH_FLAG_LOCAL);
status = ucp_flush_wait(ep->worker, request);

UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(ep->worker);
Expand Down Expand Up @@ -733,17 +735,62 @@ static ucs_status_t ucp_worker_fence_strong(ucp_worker_h worker)

UCS_PROFILE_FUNC(ucs_status_t, ucp_worker_fence, (worker), ucp_worker_h worker)
{
ucs_status_t status;
ucs_status_t status = UCS_OK;

UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);

if (worker->context->config.worker_strong_fence) {
/* force using flush on EPs */
switch (worker->context->config.worker_fence_mode) {
case UCP_FENCE_MODE_EP_BASED:
worker->fence_seq++;
break;
case UCP_FENCE_MODE_STRONG:
status = ucp_worker_fence_strong(worker);
} else {
break;
case UCP_FENCE_MODE_WEAK:
status = ucp_worker_fence_weak(worker);
break;
default:
ucs_error("invalid fence mode %d",
worker->context->config.worker_fence_mode);
status = UCS_ERR_INVALID_PARAM;
}

UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
return status;
}

ucs_status_t ucp_ep_fence_weak(ucp_ep_h ep)
{
ucp_lane_index_t lane;

/* TODO: Handle unflushed_lanes == 0 before reaching this function
* as part of optimizing flush */
ucs_assertv(ep->ext->unflushed_lanes != 0,
"ep=%p unexpected unflushed_lanes=0x%" PRIx64, ep,
ep->ext->unflushed_lanes);

ucs_assertv(ucs_is_pow2(ep->ext->unflushed_lanes),
"ep=%p unexpected unflushed_lanes=0x%" PRIx64, ep,
ep->ext->unflushed_lanes);

lane = ucs_ffs64_safe(ep->ext->unflushed_lanes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe assert that unflushed_lanes is nonzero and power of 2
BTW seems we can get here when unflushed_lanes is 0, is this expeted?

Copy link
Contributor Author

@michal-shalev michal-shalev Mar 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here:
image
If unflushed_lanes is 0, it reaches ucp_ep_fence_weak, which is currently unexpected since one-sided operations handle fencing and set a lane. This can only happen if unflushed_lanes is reset in flush APIs, planned for a future PR.

For now, I’m adding an assertion and noting the planned change.

return uct_ep_fence(ucp_ep_get_lane(ep, lane), 0);
}

ucs_status_t ucp_ep_fence_strong(ucp_ep_h ep)
{
ucs_status_t status;
void *request;

request = ucp_ep_flush_internal(ep, 0, &ucp_request_null_param, NULL,
ucp_ep_flushed_callback, "ep_fence_strong",
UCT_FLUSH_FLAG_REMOTE);
status = ucp_flush_wait(ep->worker, request);
if (status != UCS_OK) {
return status;
}

ep->ext->unflushed_lanes = 0;
ep->ext->fence_seq = ep->worker->fence_seq;
return UCS_OK;
}
1 change: 1 addition & 0 deletions src/ucp/rma/get_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ static ucs_status_t ucp_proto_get_am_bcopy_progress(uct_pending_req_t *self)
req->send.length = req->send.state.dt_iter.length;
req->flags |= UCP_REQUEST_FLAG_PROTO_INITIALIZED;
ucp_send_request_id_alloc(req);
ucp_proto_single_rma_init_func(req);
}

ucp_worker_flush_ops_count_add(worker, +1);
Expand Down
3 changes: 2 additions & 1 deletion src/ucp/rma/get_offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ static ucs_status_t ucp_proto_get_offload_bcopy_progress(uct_pending_req_t *self

if (!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED)) {
ucp_proto_multi_request_init(req);
ucp_proto_multi_rma_init_func(req);
ucp_proto_completion_init(&req->send.state.uct_comp,
ucp_proto_get_offload_bcopy_completion);
req->flags |= UCP_REQUEST_FLAG_PROTO_INITIALIZED;
Expand Down Expand Up @@ -170,7 +171,7 @@ static ucs_status_t ucp_proto_get_offload_zcopy_progress(uct_pending_req_t *self

/* coverity[tainted_data_downcast] */
return ucp_proto_multi_zcopy_progress(
req, req->send.proto_config->priv, NULL,
req, req->send.proto_config->priv, ucp_proto_multi_rma_init_func,
UCT_MD_MEM_ACCESS_LOCAL_WRITE, UCP_DT_MASK_CONTIG_IOV,
ucp_proto_get_offload_zcopy_send_func,
ucp_request_invoke_uct_completion_success,
Expand Down
1 change: 1 addition & 0 deletions src/ucp/rma/put_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ static ucs_status_t ucp_proto_put_am_bcopy_progress(uct_pending_req_t *self)
}

ucp_proto_multi_request_init(req);
ucp_proto_multi_rma_init_func(req);
req->flags |= UCP_REQUEST_FLAG_PROTO_INITIALIZED;
}

Expand Down
5 changes: 4 additions & 1 deletion src/ucp/rma/put_offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ static ucs_status_t ucp_proto_put_offload_short_progress(uct_pending_req_t *self
ucs_status_t status;
uct_rkey_t tl_rkey;

ucp_proto_single_rma_init_func(req);

tl_rkey = ucp_rkey_get_tl_rkey(req->send.rma.rkey, spriv->super.rkey_index);
status = uct_ep_put_short(ucp_ep_get_fast_lane(ep, spriv->super.lane),
req->send.state.dt_iter.type.contig.buffer,
Expand Down Expand Up @@ -133,6 +135,7 @@ static ucs_status_t ucp_proto_put_offload_bcopy_progress(uct_pending_req_t *self

if (!(req->flags & UCP_REQUEST_FLAG_PROTO_INITIALIZED)) {
ucp_proto_multi_request_init(req);
ucp_proto_multi_rma_init_func(req);
req->flags |= UCP_REQUEST_FLAG_PROTO_INITIALIZED;
}

Expand Down Expand Up @@ -224,7 +227,7 @@ ucp_proto_put_offload_zcopy_progress(uct_pending_req_t *self)

/* coverity[tainted_data_downcast] */
return ucp_proto_multi_zcopy_progress(
req, req->send.proto_config->priv, NULL,
req, req->send.proto_config->priv, ucp_proto_multi_rma_init_func,
UCT_MD_MEM_ACCESS_LOCAL_READ, UCP_DT_MASK_CONTIG_IOV,
ucp_proto_put_offload_zcopy_send_func,
ucp_request_invoke_uct_completion_success,
Expand Down
Loading
Loading