Skip to content

Commit

Permalink
DAOS-15800 client: create cart context on specific interface
Browse files Browse the repository at this point in the history
CaRT has added the ability to select a network interface at context creation, and the daos_agent now provides a NUMA-to-fabric-interface map that can be queried at init time. Update the DAOS client to query that map from the agent in daos_init() and, on EQ creation, to select the best interface for the network context based on the NUMA node of the calling thread.

Quick-Functional: true
Test-tag: DaosCoreTestDfs

Required-githooks: true

Signed-off-by: Mohamad Chaarawi <mohamad.chaarawi@intel.com>
  • Loading branch information
mchaarawi committed Aug 5, 2024
1 parent 5ea9557 commit 99406dc
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 22 deletions.
2 changes: 1 addition & 1 deletion src/cart/crt_internal_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#define CRT_CONTEXT_NULL (NULL)

#ifndef CRT_SRV_CONTEXT_NUM
#define CRT_SRV_CONTEXT_NUM (64) /* Maximum number of contexts */
#define CRT_SRV_CONTEXT_NUM (128) /* Maximum number of contexts */
#endif


Expand Down
2 changes: 1 addition & 1 deletion src/client/api/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def scons():

if prereqs.client_requested():
libdaos = env.d_library('daos', libdaos_tgts, SHLIBVERSION=API_VERSION,
LIBS=['daos_common'])
LIBS=['daos_common', 'numa'])
if hasattr(env, 'InstallVersionedLib'):
env.InstallVersionedLib('$PREFIX/lib64/', libdaos, SHLIBVERSION=API_VERSION)
else:
Expand Down
39 changes: 34 additions & 5 deletions src/client/api/event.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,24 @@ daos_eq_lib_init(crt_init_options_t *crt_info)
D_GOTO(unlock, rc);
}

/* use a global shared context for all eq for now */
rc = crt_context_create(&daos_eq_ctx);
/*
 * Create the global client context. When dynamic contexts are enabled,
 * ask dc_mgmt_get_iface() which interface to use; -DER_NONEXIST from that
 * call is not treated as an error and falls back to the default context
 * creation, while any other failure aborts initialization.
 */
if (d_dynamic_ctx_g) {
char iface[DAOS_SYS_INFO_STRING_MAX];

rc = dc_mgmt_get_iface(&iface[0]);
if (rc && rc != -DER_NONEXIST) {
D_ERROR("failed to get iface: " DF_RC "\n", DP_RC(rc));
D_GOTO(crt, rc);
}
/** if no interface returned, use the default */
if (rc == -DER_NONEXIST)
rc = crt_context_create(&daos_eq_ctx);
else
rc = crt_context_create_on_iface(iface, &daos_eq_ctx);
} else {
/* dynamic contexts disabled: always use the default context creation */
rc = crt_context_create(&daos_eq_ctx);
}
if (rc != 0) {
D_ERROR("failed to create client context: "DF_RC"\n",
DP_RC(rc));
D_ERROR("failed to create client context: " DF_RC "\n", DP_RC(rc));
D_GOTO(crt, rc);
}

Expand Down Expand Up @@ -656,7 +669,23 @@ daos_eq_create(daos_handle_t *eqh)

eqx = daos_eq2eqx(eq);

rc = crt_context_create(&eqx->eqx_ctx);
/*
 * Create the per-EQ network context. When dynamic contexts are enabled,
 * ask dc_mgmt_get_iface() which interface to use for the calling thread.
 * -DER_NONEXIST from that call is not a failure: it falls back to the
 * default context creation. Any other error aborts EQ creation.
 */
if (d_dynamic_ctx_g) {
	char iface[DAOS_SYS_INFO_STRING_MAX];

	rc = dc_mgmt_get_iface(&iface[0]);
	/*
	 * -DER_NONEXIST must be excluded here, otherwise the fallback
	 * below can never be reached.
	 */
	if (rc != 0 && rc != -DER_NONEXIST) {
		D_ERROR("failed to get iface: " DF_RC "\n", DP_RC(rc));
		daos_eq_free(&eqx->eqx_hlink);
		return rc;
	}
	/** if no interface returned, use the default */
	if (rc == -DER_NONEXIST)
		rc = crt_context_create(&eqx->eqx_ctx);
	else
		rc = crt_context_create_on_iface(iface, &eqx->eqx_ctx);
} else {
	rc = crt_context_create(&eqx->eqx_ctx);
}
if (rc) {
D_WARN("Failed to create CART context; using the global one, "DF_RC"\n", DP_RC(rc));
eqx->eqx_ctx = daos_eq_ctx;
Expand Down
2 changes: 1 addition & 1 deletion src/engine/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def scons():
denv.Append(CPPDEFINES=['-DDAOS_PMEM_BUILD'])
libraries = ['daos_common_pmem', 'gurt', 'cart', 'vos_srv']
libraries += ['bio', 'dl', 'uuid', 'pthread', 'abt']
libraries += ['hwloc', 'pmemobj', 'protobuf-c', 'isal']
libraries += ['hwloc', 'pmemobj', 'protobuf-c', 'isal', 'numa']

denv.require('argobots', 'protobufc', 'pmdk', 'isal')

Expand Down
6 changes: 6 additions & 0 deletions src/include/daos/mgmt.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include <daos/pool.h>
#include "svc.pb-c.h"

extern bool d_dynamic_ctx_g;

int dc_mgmt_init(void);

void dc_mgmt_fini(void);
Expand All @@ -41,6 +43,8 @@ struct dc_mgmt_sys_info {
d_rank_list_t *ms_ranks;
char system_name[DAOS_SYS_INFO_STRING_MAX + 1];
uint32_t provider_idx; /* Provider index (if more than one available) */
daos_size_t numa_entries_nr;
daos_size_t *numa_iface_idx_rr;
};

/** Client system handle */
Expand Down Expand Up @@ -78,5 +82,7 @@ int dc_get_attach_info(const char *name, bool all_ranks, struct dc_mgmt_sys_info
void dc_put_attach_info(struct dc_mgmt_sys_info *info, Mgmt__GetAttachInfoResp *resp);
int dc_mgmt_cache_attach_info(const char *name);
void dc_mgmt_drop_attach_info(void);
/**
 * Select a fabric interface for the calling thread.
 *
 * Looks up the NUMA node of the CPU the caller is currently running on and
 * picks one of the interfaces listed for that node in the cached attach info,
 * rotating through that node's interfaces on successive calls.
 *
 * \param[out] iface	Buffer receiving the interface name. Callers in this
 *			change pass a DAOS_SYS_INFO_STRING_MAX byte array;
 *			presumably that is the required minimum size - confirm
 *			against copy_str().
 *
 * \return		0 on success,
 *			-DER_NONEXIST if no entry matches the caller's NUMA node,
 *			-DER_INVAL if no fabric interfaces are cached or the
 *			interface name cannot be copied,
 *			or an error converted from errno if the CPU/NUMA lookup
 *			fails.
 */
int
dc_mgmt_get_iface(char *iface);
int dc_mgmt_tm_register(const char *sys, const char *jobid, key_t shm_key, uid_t *owner_uid);
#endif
115 changes: 105 additions & 10 deletions src/mgmt/cli_mgmt.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@

#define D_LOGFAC DD_FAC(mgmt)

#include <daos/mgmt.h>

#include <daos/agent.h>
#include <daos/drpc_modules.h>
#include <daos/event.h>
#include <daos/job.h>
#include <daos/mgmt.h>
#include <daos/pool.h>
#include <daos/security.h>
#include "svc.pb-c.h"
#include "rpc.h"
#include <errno.h>
#include <numa.h>
#include <stdlib.h>
#include <sys/ipc.h>

Expand All @@ -31,6 +31,7 @@ char agent_sys_name[DAOS_SYS_NAME_MAX + 1] = DAOS_DEFAULT_SYS_NAME;
static struct dc_mgmt_sys_info info_g;
static Mgmt__GetAttachInfoResp *resp_g;

bool d_dynamic_ctx_g;
int dc_mgmt_proto_version;

int
Expand Down Expand Up @@ -241,6 +242,7 @@ put_attach_info(struct dc_mgmt_sys_info *info, Mgmt__GetAttachInfoResp *resp)
if (resp != NULL)
free_get_attach_info_resp(resp);
d_rank_list_free(info->ms_ranks);
D_FREE(info->numa_iface_idx_rr);
}

void
Expand Down Expand Up @@ -413,9 +415,23 @@ dc_get_attach_info(const char *name, bool all_ranks, struct dc_mgmt_sys_info *in
int
dc_mgmt_cache_attach_info(const char *name)
{
int rc;

if (name != NULL && strcmp(name, agent_sys_name) != 0)
return -DER_INVAL;
return get_attach_info(name, true, &info_g, &resp_g);
rc = get_attach_info(name, true, &info_g, &resp_g);
if (rc)
return rc;

info_g.numa_entries_nr = resp_g->n_numa_fabric_interfaces;
D_ALLOC_ARRAY(info_g.numa_iface_idx_rr, info_g.numa_entries_nr);
if (info_g.numa_iface_idx_rr == NULL)
D_GOTO(err_rank_list, rc = -DER_NOMEM);
return 0;

err_rank_list:
d_rank_list_free(info_g.ms_ranks);
return rc;
}

static void
Expand Down Expand Up @@ -625,14 +641,43 @@ dc_mgmt_net_cfg(const char *name, crt_init_options_t *crt_info)
D_STRNDUP(crt_info->cio_provider, info->provider, DAOS_SYS_INFO_STRING_MAX);
if (NULL == crt_info->cio_provider)
D_GOTO(cleanup, rc = -DER_NOMEM);
D_STRNDUP(crt_info->cio_interface, info->interface, DAOS_SYS_INFO_STRING_MAX);
if (NULL == crt_info->cio_interface)
D_GOTO(cleanup, rc = -DER_NOMEM);
D_STRNDUP(crt_info->cio_domain, info->domain, DAOS_SYS_INFO_STRING_MAX);
if (NULL == crt_info->cio_domain)
D_GOTO(cleanup, rc = -DER_NOMEM);

D_INFO("Network interface: %s, Domain: %s\n", info->interface, info->domain);
d_getenv_bool("D_DYNAMIC_CTX", &d_dynamic_ctx_g);
/*
 * Fill crt_info->cio_interface / cio_domain. With dynamic contexts enabled
 * both become comma-separated lists holding every interface (and matching
 * domain) of every NUMA node; otherwise the single interface/domain from
 * the attach info is duplicated.
 */
if (d_dynamic_ctx_g) {
	size_t nr_ifaces = 0;
	size_t buf_len;
	bool   first = true;
	int    i;

	/*
	 * Size the buffers by the total number of interfaces, not by the
	 * number of NUMA nodes: a node may expose several interfaces.
	 * NOTE(review): the loop bound reads resp while the entries are read
	 * from resp_g, as in the original - confirm both refer to the same
	 * response.
	 */
	for (i = 0; i < resp->n_numa_fabric_interfaces; i++)
		nr_ifaces += resp_g->numa_fabric_interfaces[i]->n_ifaces;

	/* per entry: at most DAOS_SYS_INFO_STRING_MAX chars + ','; plus final NUL */
	buf_len = nr_ifaces * (DAOS_SYS_INFO_STRING_MAX + 1) + 1;

	/* the appends below assume D_ALLOC returns zeroed memory, as the original did */
	D_ALLOC(crt_info->cio_interface, buf_len);
	if (crt_info->cio_interface == NULL)
		D_GOTO(cleanup, rc = -DER_NOMEM);
	D_ALLOC(crt_info->cio_domain, buf_len);
	if (crt_info->cio_domain == NULL)
		D_GOTO(cleanup, rc = -DER_NOMEM);

	for (i = 0; i < resp->n_numa_fabric_interfaces; i++) {
		Mgmt__FabricInterfaces *numa_ifaces = resp_g->numa_fabric_interfaces[i];
		int                     j;

		for (j = 0; j < numa_ifaces->n_ifaces; j++) {
			/*
			 * Separate entries by what has actually been
			 * appended, so an empty leading NUMA node does not
			 * produce a leading comma.
			 */
			if (!first) {
				strcat(crt_info->cio_interface, ",");
				strcat(crt_info->cio_domain, ",");
			}
			first = false;

			/* bounded append: never exceeds the per-entry budget above */
			strncat(crt_info->cio_interface, numa_ifaces->ifaces[j]->interface,
				DAOS_SYS_INFO_STRING_MAX);
			strncat(crt_info->cio_domain, numa_ifaces->ifaces[j]->domain,
				DAOS_SYS_INFO_STRING_MAX);
		}
	}
} else {
	D_STRNDUP(crt_info->cio_interface, info->interface, DAOS_SYS_INFO_STRING_MAX);
	if (NULL == crt_info->cio_interface)
		D_GOTO(cleanup, rc = -DER_NOMEM);
	D_STRNDUP(crt_info->cio_domain, info->domain, DAOS_SYS_INFO_STRING_MAX);
	if (NULL == crt_info->cio_domain)
		D_GOTO(cleanup, rc = -DER_NOMEM);
}
D_INFO("Network interface: %s, Domain: %s, Provider: %s\n", crt_info->cio_interface,
crt_info->cio_domain, crt_info->cio_provider);
D_DEBUG(DB_MGMT,
"CaRT initialization with:\n"
"\tD_PROVIDER: %s, CRT_TIMEOUT: %d, CRT_SECONDARY_PROVIDER: %s\n",
Expand Down Expand Up @@ -667,6 +712,56 @@ int dc_mgmt_net_cfg_check(const char *name)
return 0;
}

/**
 * Select a fabric interface for the calling thread.
 *
 * Determines the NUMA node of the CPU the caller is currently running on,
 * finds the matching entry in the cached attach info (resp_g) and copies the
 * name of one of that node's interfaces into \a iface. Successive calls rotate
 * through the node's interfaces using the per-entry counters in
 * info_g.numa_iface_idx_rr.
 *
 * NOTE(review): the counter update is a plain read-modify-write; confirm
 * whether concurrent callers need it to be atomic.
 *
 * \param[out] iface	Buffer receiving the selected interface name.
 *
 * \return		0 on success,
 *			-DER_NONEXIST if no interface is available for the
 *			caller's NUMA node,
 *			-DER_INVAL if the attach info has not been cached, has
 *			no fabric interfaces, or the name cannot be copied,
 *			or an error converted from errno if the CPU/NUMA lookup
 *			fails.
 */
int
dc_mgmt_get_iface(char *iface)
{
	int cpu;
	int numa;
	int err;
	int i;

	cpu = sched_getcpu();
	if (cpu < 0) {
		/* capture errno before logging can overwrite it */
		err = errno;
		D_ERROR("sched_getcpu() failed: %d (%s)\n", err, strerror(err));
		return d_errno2der(err);
	}

	numa = numa_node_of_cpu(cpu);
	if (numa < 0) {
		err = errno;
		D_ERROR("numa_node_of_cpu() failed: %d (%s)\n", err, strerror(err));
		return d_errno2der(err);
	}

	/* the cache and its counters must exist before either is dereferenced */
	if (resp_g == NULL || info_g.numa_iface_idx_rr == NULL ||
	    resp_g->n_numa_fabric_interfaces <= 0) {
		D_ERROR("No fabric interfaces initialized.\n");
		return -DER_INVAL;
	}

	for (i = 0; i < resp_g->n_numa_fabric_interfaces; i++) {
		Mgmt__FabricInterfaces *numa_ifaces = resp_g->numa_fabric_interfaces[i];
		int                     idx;

		if (numa_ifaces->numa_node != numa)
			continue;

		/* an entry without interfaces cannot be indexed (modulo by zero) */
		if (numa_ifaces->n_ifaces == 0)
			continue;

		/* round-robin over the interfaces attached to this NUMA node */
		idx = info_g.numa_iface_idx_rr[i] % numa_ifaces->n_ifaces;
		D_ASSERT(numa_ifaces->ifaces[idx]->numa_node == numa);
		info_g.numa_iface_idx_rr[i]++;

		if (copy_str(iface, numa_ifaces->ifaces[idx]->interface) != 0) {
			D_ERROR("Interface string too long.\n");
			return -DER_INVAL;
		}
		D_DEBUG(DB_MGMT, "Numa: %d, Interface Selected: IDX: %d, Name = %s\n", numa, idx,
			iface);
		return 0;
	}

	D_DEBUG(DB_MGMT, "No iface on numa %d\n", numa);
	return -DER_NONEXIST;
}

static int send_monitor_request(struct dc_pool *pool, int request_type)
{
struct drpc *ctx;
Expand Down
2 changes: 1 addition & 1 deletion src/rdb/tests/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def scons():
# rdbt client
rdbt = tenv.d_program('rdbt', ['rdbt.c', 'rpc.c'] + libdaos_tgts,
LIBS=['daos_common_pmem', 'cart', 'gurt', 'uuid', 'isal', 'protobuf-c',
'pthread'])
'pthread', 'numa'])
tenv.Install('$PREFIX/bin', rdbt)


Expand Down
2 changes: 1 addition & 1 deletion src/tests/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def build_tests(env):
daos_perf = denv.d_program('daos_perf', ['daos_perf.c', perf_common], LIBS=libs_client)
denv.Install('$PREFIX/bin/', daos_perf)

libs_server += ['vos', 'bio', 'abt']
libs_server += ['vos', 'bio', 'abt', 'numa']
vos_engine = denv.StaticObject(['vos_engine.c'])

if denv["STACK_MMAP"] == 1:
Expand Down
4 changes: 2 additions & 2 deletions src/tests/ftest/util/daos_core_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ def run_subtest(self):
self.hostlist_clients, self.subtest_name, self.outputdir, self.test_dir, self.log)
daos_test_env = cmocka_utils.get_cmocka_env()
daos_test_env["D_LOG_FILE"] = get_log_file(self.client_log)
daos_test_env["D_LOG_MASK"] = self.get_test_param("test_log_mask", "DEBUG")
daos_test_env["DD_MASK"] = "mgmt,io,md,epc,rebuild,test"
daos_test_env["D_LOG_MASK"] = "DEBUG"
daos_test_env["COVFILE"] = "/tmp/test.cov"
daos_test_env["POOL_SCM_SIZE"] = str(scm_size)
daos_test_env["POOL_NVME_SIZE"] = str(nvme_size)
daos_test_env["D_DYNAMIC_CTX"] = 1
daos_test_cmd = cmocka_utils.get_cmocka_command(
" ".join([self.daos_test, "-n", dmg_config_file, "".join(["-", subtest]), str(args)]))
job = get_job_manager(self, "Orterun", daos_test_cmd, mpi_type="openmpi")
Expand Down

0 comments on commit 99406dc

Please sign in to comment.