Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UCT/ROCM: add control of ipc cache usage #10497

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/uct/rocm/base/rocm_signal.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ unsigned uct_rocm_base_progress(ucs_queue_head_t *signal_queue)
static const unsigned max_signals = 16;
unsigned count = 0;
uct_rocm_base_signal_desc_t *rocm_signal;
hsa_status_t status;

ucs_queue_for_each_extract(rocm_signal, signal_queue, queue,
(hsa_signal_load_scacquire(rocm_signal->signal) == 0) &&
Expand All @@ -53,6 +54,14 @@ unsigned uct_rocm_base_progress(ucs_queue_head_t *signal_queue)
uct_invoke_completion(rocm_signal->comp, UCS_OK);
}

if (rocm_signal->mapped_addr != NULL) {
status = hsa_amd_ipc_memory_detach(rocm_signal->mapped_addr);
if (status != HSA_STATUS_SUCCESS) {
ucs_warn("failed to detach ipc memory region");
}
rocm_signal->mapped_addr = NULL;
}

ucs_trace_poll("rocm signal done :%p", rocm_signal);
ucs_mpool_put(rocm_signal);
count++;
Expand Down
2 changes: 1 addition & 1 deletion src/uct/rocm/copy/rocm_copy_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ ucs_status_t uct_rocm_copy_ep_zcopy(uct_ep_h tl_ep, uint64_t remote_addr,
ret = UCS_OK;
} else {
rocm_copy_signal->comp = comp;
rocm_copy_signal->mapped_addr = dst_addr;
rocm_copy_signal->mapped_addr = NULL;
ucs_queue_push(&iface->signal_queue, &rocm_copy_signal->queue);
}

Expand Down
25 changes: 20 additions & 5 deletions src/uct/rocm/ipc/rocm_ipc_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,22 @@ ucs_status_t uct_rocm_ipc_ep_zcopy(uct_ep_h tl_ep,
return UCS_ERR_INVALID_ADDR;
}

ret = uct_rocm_ipc_cache_map_memhandle((void *)ep->remote_memh_cache, key,
if (iface->config.enable_ipc_handle_cache) {
ret = uct_rocm_ipc_cache_map_memhandle((void*)ep->remote_memh_cache,
key, &remote_base_addr);
if (ucs_unlikely(ret != UCS_OK)) {
ucs_error("fail to attach ipc mem %p %d\n", (void*)key->address,
ret);
return ret;
}
} else {
status = hsa_amd_ipc_memory_attach(&key->ipc, key->length, 0, NULL,
&remote_base_addr);
if (ret != UCS_OK) {
ucs_error("fail to attach ipc mem %p %d\n", (void *)key->address, ret);
return ret;
if (ucs_unlikely(status != HSA_STATUS_SUCCESS)) {
ucs_error("failed to open ipc mem handle. addr:%p len:%lu",
(void*)key->address, key->length);
return UCS_ERR_INVALID_ADDR;
}
}

remote_copy_addr = UCS_PTR_BYTE_OFFSET(remote_base_addr,
Expand Down Expand Up @@ -156,7 +167,11 @@ ucs_status_t uct_rocm_ipc_ep_zcopy(uct_ep_h tl_ep,
}

rocm_ipc_signal->comp = comp;
rocm_ipc_signal->mapped_addr = remote_base_addr;
if (iface->config.enable_ipc_handle_cache) {
rocm_ipc_signal->mapped_addr = NULL;
} else {
rocm_ipc_signal->mapped_addr = remote_base_addr;
}
ucs_queue_push(&iface->signal_queue, &rocm_ipc_signal->queue);

ucs_trace("rocm async copy issued :%p remote:%p, local:%p len:%ld",
Expand Down
12 changes: 8 additions & 4 deletions src/uct/rocm/ipc/rocm_ipc_iface.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@ static ucs_config_field_t uct_rocm_ipc_iface_config_table[] = {
UCS_CONFIG_TYPE_TABLE(uct_iface_config_table)},

{"MIN_ZCOPY", "128", "Minimum data size for ROCm/IPC zcopy protocols",
ucs_offsetof(uct_rocm_ipc_iface_config_t, min_zcopy),
ucs_offsetof(uct_rocm_ipc_iface_config_t, params.min_zcopy),
UCS_CONFIG_TYPE_MEMUNITS},

{"LAT", "1e-7", "Latency",
ucs_offsetof(uct_rocm_ipc_iface_config_t, latency), UCS_CONFIG_TYPE_TIME},
ucs_offsetof(uct_rocm_ipc_iface_config_t, params.latency),
UCS_CONFIG_TYPE_TIME},

{"CACHE_IPC_HANDLES", "y", "Enable caching IPC handles",
ucs_offsetof(uct_rocm_ipc_iface_config_t, params.enable_ipc_handle_cache),
UCS_CONFIG_TYPE_BOOL},

{NULL}
};
Expand Down Expand Up @@ -199,8 +204,7 @@ static UCS_CLASS_INIT_FUNC(uct_rocm_ipc_iface_t, uct_md_h md, uct_worker_h worke
tl_config UCS_STATS_ARG(params->stats_root)
UCS_STATS_ARG(UCT_ROCM_IPC_TL_NAME));

self->config.min_zcopy = config->min_zcopy;
self->config.latency = config->latency;
self->config = config->params;

ucs_mpool_params_reset(&mp_params);
mp_params.elem_size = sizeof(uct_rocm_base_signal_desc_t);
Expand Down
23 changes: 12 additions & 11 deletions src/uct/rocm/ipc/rocm_ipc_iface.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,22 @@

#define UCT_ROCM_IPC_TL_NAME "rocm_ipc"

typedef struct uct_rocm_ipc_iface {
uct_base_iface_t super;
ucs_mpool_t signal_pool;
ucs_queue_head_t signal_queue;
struct {
size_t min_zcopy;
double latency;
} config;
/**
 * Tunable parameters of the ROCm IPC interface, grouped in one struct.
 * Each field is filled by an entry of uct_rocm_ipc_iface_config_table.
 */
typedef struct uct_rocm_ipc_iface_config_params {
/* MIN_ZCOPY (default "128"): minimum data size for ROCm/IPC zcopy protocols */
size_t min_zcopy;
/* LAT (default "1e-7"): latency, parsed as a time value */
double latency;
/* CACHE_IPC_HANDLES (default "y"): boolean, enables caching IPC handles */
int enable_ipc_handle_cache;
} uct_rocm_ipc_iface_config_params_t;

/**
 * Interface object of the "rocm_ipc" transport (UCT_ROCM_IPC_TL_NAME).
 */
typedef struct uct_rocm_ipc_iface {
/* Base interface; kept as the first member */
uct_base_iface_t super;
/* NOTE(review): presumably the pool of uct_rocm_base_signal_desc_t
 * elements set up at interface init - confirm against the init code */
ucs_mpool_t signal_pool;
/* Queue onto which zcopy operations push their signal descriptors */
ucs_queue_head_t signal_queue;
/* Interface parameters, assigned from the parsed configuration at init */
uct_rocm_ipc_iface_config_params_t config;
} uct_rocm_ipc_iface_t;

typedef struct uct_rocm_ipc_iface_config {
uct_iface_config_t super;
size_t min_zcopy;
double latency;
uct_iface_config_t super;
uct_rocm_ipc_iface_config_params_t params;
} uct_rocm_ipc_iface_config_t;

#endif
Loading