Skip to content

Commit

Permalink
DAOS-15800 client: create cart context on specific interface
Browse files Browse the repository at this point in the history
Cart has added the ability to select network interface on context creation. The daos_agent also added a numa-fabric map that can be queried at init time. Update the DAOS client to query from the agent a map of numa to network interface on daos_init(), and on EQ creation, select the best interface for the network context based on the numa of the calling thread.

Features: test_pil4dfs_vs_dfs

Required-githooks: true

Signed-off-by: Mohamad Chaarawi <mohamad.chaarawi@intel.com>
  • Loading branch information
mchaarawi committed Aug 19, 2024
1 parent 427d135 commit ba8013a
Show file tree
Hide file tree
Showing 11 changed files with 176 additions and 22 deletions.
2 changes: 1 addition & 1 deletion src/cart/README.env
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ This file lists the environment variables used in CaRT.

. CRT_CTX_NUM
If set, specifies the limit of number of allowed CaRT contexts to be created.
Valid range is [1, 64], with default being 64 if unset.
Valid range is [1, 128], with default being 128 if unset.

. D_FI_CONFIG
Specifies the fault injection configuration file. If this variable is not set
Expand Down
2 changes: 1 addition & 1 deletion src/cart/crt_internal_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#define CRT_CONTEXT_NULL (NULL)

#ifndef CRT_SRV_CONTEXT_NUM
#define CRT_SRV_CONTEXT_NUM (64) /* Maximum number of contexts */
#define CRT_SRV_CONTEXT_NUM (128) /* Maximum number of contexts */
#endif


Expand Down
2 changes: 1 addition & 1 deletion src/client/api/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def scons():

if prereqs.client_requested():
libdaos = env.d_library('daos', libdaos_tgts, SHLIBVERSION=API_VERSION,
LIBS=['daos_common'])
LIBS=['daos_common', 'numa'])
if hasattr(env, 'InstallVersionedLib'):
env.InstallVersionedLib('$PREFIX/lib64/', libdaos, SHLIBVERSION=API_VERSION)
else:
Expand Down
39 changes: 34 additions & 5 deletions src/client/api/event.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,24 @@ daos_eq_lib_init(crt_init_options_t *crt_info)
D_GOTO(unlock, rc);
}

/* use a global shared context for all eq for now */
rc = crt_context_create(&daos_eq_ctx);
if (d_dynamic_ctx_g) {
char iface[DAOS_SYS_INFO_STRING_MAX];

rc = dc_mgmt_get_iface(&iface[0]);
if (rc && rc != -DER_NONEXIST) {
D_ERROR("failed to get iface: " DF_RC "\n", DP_RC(rc));
D_GOTO(crt, rc);
}
/** if no interface returned, use the default */
if (rc == -DER_NONEXIST)
rc = crt_context_create(&daos_eq_ctx);
else
rc = crt_context_create_on_iface(iface, &daos_eq_ctx);
} else {
rc = crt_context_create(&daos_eq_ctx);
}
if (rc != 0) {
D_ERROR("failed to create client context: "DF_RC"\n",
DP_RC(rc));
D_ERROR("failed to create client context: " DF_RC "\n", DP_RC(rc));
D_GOTO(crt, rc);
}

Expand Down Expand Up @@ -656,7 +669,23 @@ daos_eq_create(daos_handle_t *eqh)

eqx = daos_eq2eqx(eq);

rc = crt_context_create(&eqx->eqx_ctx);
if (d_dynamic_ctx_g) {
char iface[DAOS_SYS_INFO_STRING_MAX];

rc = dc_mgmt_get_iface(&iface[0]);
if (rc) {
D_ERROR("failed to get iface: " DF_RC "\n", DP_RC(rc));
daos_eq_free(&eqx->eqx_hlink);
return rc;
}
/** if no interface returned, use the default */
if (rc == -DER_NONEXIST)
rc = crt_context_create(&eqx->eqx_ctx);
else
rc = crt_context_create_on_iface(iface, &eqx->eqx_ctx);
} else {
rc = crt_context_create(&eqx->eqx_ctx);
}
if (rc) {
D_WARN("Failed to create CART context; using the global one, "DF_RC"\n", DP_RC(rc));
eqx->eqx_ctx = daos_eq_ctx;
Expand Down
2 changes: 1 addition & 1 deletion src/engine/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def scons():
denv.Append(CPPDEFINES=['-DDAOS_PMEM_BUILD'])
libraries = ['daos_common_pmem', 'gurt', 'cart', 'vos_srv']
libraries += ['bio', 'dl', 'uuid', 'pthread', 'abt']
libraries += ['hwloc', 'pmemobj', 'protobuf-c', 'isal']
libraries += ['hwloc', 'pmemobj', 'protobuf-c', 'isal', 'numa']

denv.require('argobots', 'protobufc', 'pmdk', 'isal')

Expand Down
6 changes: 6 additions & 0 deletions src/include/daos/mgmt.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include <daos/pool.h>
#include "svc.pb-c.h"

extern bool d_dynamic_ctx_g;

int dc_mgmt_init(void);

void dc_mgmt_fini(void);
Expand All @@ -41,6 +43,8 @@ struct dc_mgmt_sys_info {
d_rank_list_t *ms_ranks;
char system_name[DAOS_SYS_INFO_STRING_MAX + 1];
uint32_t provider_idx; /* Provider index (if more than one available) */
daos_size_t numa_entries_nr;
daos_size_t *numa_iface_idx_rr;
};

/** Client system handle */
Expand Down Expand Up @@ -78,5 +82,7 @@ int dc_get_attach_info(const char *name, bool all_ranks, struct dc_mgmt_sys_info
void dc_put_attach_info(struct dc_mgmt_sys_info *info, Mgmt__GetAttachInfoResp *resp);
int dc_mgmt_cache_attach_info(const char *name);
void dc_mgmt_drop_attach_info(void);
int
dc_mgmt_get_iface(char *iface);
int dc_mgmt_tm_register(const char *sys, const char *jobid, key_t shm_key, uid_t *owner_uid);
#endif
131 changes: 121 additions & 10 deletions src/mgmt/cli_mgmt.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@

#define D_LOGFAC DD_FAC(mgmt)

#include <daos/mgmt.h>

#include <daos/agent.h>
#include <daos/drpc_modules.h>
#include <daos/event.h>
#include <daos/job.h>
#include <daos/mgmt.h>
#include <daos/pool.h>
#include <daos/security.h>
#include "svc.pb-c.h"
#include "rpc.h"
#include <errno.h>
#include <numa.h>
#include <stdlib.h>
#include <sys/ipc.h>

Expand All @@ -31,6 +31,7 @@ char agent_sys_name[DAOS_SYS_NAME_MAX + 1] = DAOS_DEFAULT_SYS_NAME;
static struct dc_mgmt_sys_info info_g;
static Mgmt__GetAttachInfoResp *resp_g;

bool d_dynamic_ctx_g;
int dc_mgmt_proto_version;

int
Expand Down Expand Up @@ -241,6 +242,7 @@ put_attach_info(struct dc_mgmt_sys_info *info, Mgmt__GetAttachInfoResp *resp)
if (resp != NULL)
free_get_attach_info_resp(resp);
d_rank_list_free(info->ms_ranks);
D_FREE(info->numa_iface_idx_rr);
}

void
Expand Down Expand Up @@ -413,9 +415,23 @@ dc_get_attach_info(const char *name, bool all_ranks, struct dc_mgmt_sys_info *in
int
dc_mgmt_cache_attach_info(const char *name)
{
int rc;

if (name != NULL && strcmp(name, agent_sys_name) != 0)
return -DER_INVAL;
return get_attach_info(name, true, &info_g, &resp_g);
rc = get_attach_info(name, true, &info_g, &resp_g);
if (rc)
return rc;

info_g.numa_entries_nr = resp_g->n_numa_fabric_interfaces;
D_ALLOC_ARRAY(info_g.numa_iface_idx_rr, info_g.numa_entries_nr);
if (info_g.numa_iface_idx_rr == NULL)
D_GOTO(err_rank_list, rc = -DER_NOMEM);
return 0;

err_rank_list:
d_rank_list_free(info_g.ms_ranks);
return rc;
}

static void
Expand Down Expand Up @@ -625,14 +641,51 @@ dc_mgmt_net_cfg(const char *name, crt_init_options_t *crt_info)
D_STRNDUP(crt_info->cio_provider, info->provider, DAOS_SYS_INFO_STRING_MAX);
if (NULL == crt_info->cio_provider)
D_GOTO(cleanup, rc = -DER_NOMEM);
D_STRNDUP(crt_info->cio_interface, info->interface, DAOS_SYS_INFO_STRING_MAX);
if (NULL == crt_info->cio_interface)
D_GOTO(cleanup, rc = -DER_NOMEM);
D_STRNDUP(crt_info->cio_domain, info->domain, DAOS_SYS_INFO_STRING_MAX);
if (NULL == crt_info->cio_domain)
D_GOTO(cleanup, rc = -DER_NOMEM);

D_INFO("Network interface: %s, Domain: %s\n", info->interface, info->domain);
d_getenv_bool("D_DYNAMIC_CTX", &d_dynamic_ctx_g);
if (d_dynamic_ctx_g) {
int i;

D_ALLOC(crt_info->cio_interface,
DAOS_SYS_INFO_STRING_MAX * resp->n_numa_fabric_interfaces);
if (crt_info->cio_interface == NULL)
D_GOTO(cleanup, rc = -DER_NOMEM);
D_ALLOC(crt_info->cio_domain,
DAOS_SYS_INFO_STRING_MAX * resp->n_numa_fabric_interfaces);
if (crt_info->cio_domain == NULL)
D_GOTO(cleanup, rc = -DER_NOMEM);

for (i = 0; i < resp->n_numa_fabric_interfaces; i++) {
Mgmt__FabricInterfaces *numa_ifaces = resp_g->numa_fabric_interfaces[i];
int j;

for (j = 0; j < numa_ifaces->n_ifaces; j++) {
if (i != 0 || j != 0) {
strcat(crt_info->cio_interface, ",");
strcat(crt_info->cio_domain, ",");
}
strcat(crt_info->cio_interface, numa_ifaces->ifaces[j]->interface);
strcat(crt_info->cio_domain, numa_ifaces->ifaces[j]->domain);
}
/*
* If we have multiple interfaces per numa node, we want to randomize the
* first interface selected in case we have multiple processes running
* there. So initialize the index array at that interface to -1 to know that
* this is the first selection later.
*/
if (numa_ifaces->n_ifaces)
info_g.numa_iface_idx_rr[i] = -1;
}
} else {
D_STRNDUP(crt_info->cio_interface, info->interface, DAOS_SYS_INFO_STRING_MAX);
if (NULL == crt_info->cio_interface)
D_GOTO(cleanup, rc = -DER_NOMEM);
D_STRNDUP(crt_info->cio_domain, info->domain, DAOS_SYS_INFO_STRING_MAX);
if (NULL == crt_info->cio_domain)
D_GOTO(cleanup, rc = -DER_NOMEM);
}
D_INFO("Network interface: %s, Domain: %s, Provider: %s\n", crt_info->cio_interface,
crt_info->cio_domain, crt_info->cio_provider);
D_DEBUG(DB_MGMT,
"CaRT initialization with:\n"
"\tD_PROVIDER: %s, CRT_TIMEOUT: %d, CRT_SECONDARY_PROVIDER: %s\n",
Expand Down Expand Up @@ -667,6 +720,64 @@ int dc_mgmt_net_cfg_check(const char *name)
return 0;
}

int
dc_mgmt_get_iface(char *iface)
{
int cpu;
int numa;
int i;

cpu = sched_getcpu();
if (cpu < 0) {
D_ERROR("sched_getcpu() failed: %d (%s)\n", errno, strerror(errno));
return d_errno2der(errno);
}

numa = numa_node_of_cpu(cpu);
if (numa < 0) {
D_ERROR("numa_node_of_cpu() failed: %d (%s)\n", errno, strerror(errno));
return d_errno2der(errno);
}

if (resp_g->n_numa_fabric_interfaces <= 0) {
D_ERROR("No fabric interfaces initialized.\n");
return -DER_INVAL;
}

for (i = 0; i < resp_g->n_numa_fabric_interfaces; i++) {
Mgmt__FabricInterfaces *numa_ifaces = resp_g->numa_fabric_interfaces[i];
int idx;

if (numa_ifaces->numa_node != numa)
continue;

/*
* Randomize the first interface used to avoid multiple processes starting on the
* first interface (if there is more than 1).
*/
if (info_g.numa_iface_idx_rr[i] == -1) {
d_srand(getpid());
info_g.numa_iface_idx_rr[i] = d_rand() % numa_ifaces->n_ifaces;
}
idx = info_g.numa_iface_idx_rr[i] % numa_ifaces->n_ifaces;
D_ASSERT(numa_ifaces->ifaces[idx]->numa_node == numa);
info_g.numa_iface_idx_rr[i]++;

if (copy_str(iface, numa_ifaces->ifaces[idx]->interface) != 0) {
D_ERROR("Interface string too long.\n");
return -DER_INVAL;
}
D_DEBUG(DB_MGMT, "Numa: %d, Interface Selected: IDX: %d, Name = %s\n", numa, idx,
iface);
break;
}
if (i == resp_g->n_numa_fabric_interfaces) {
D_DEBUG(DB_MGMT, "No iface on numa %d\n", numa);
return -DER_NONEXIST;
}
return 0;
}

static int send_monitor_request(struct dc_pool *pool, int request_type)
{
struct drpc *ctx;
Expand Down
2 changes: 1 addition & 1 deletion src/rdb/tests/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def scons():
# rdbt client
rdbt = tenv.d_program('rdbt', ['rdbt.c', 'rpc.c'] + libdaos_tgts,
LIBS=['daos_common_pmem', 'cart', 'gurt', 'uuid', 'isal', 'protobuf-c',
'pthread'])
'pthread', 'numa'])
tenv.Install('$PREFIX/bin', rdbt)


Expand Down
2 changes: 1 addition & 1 deletion src/tests/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def build_tests(env):
daos_perf = denv.d_program('daos_perf', ['daos_perf.c', perf_common], LIBS=libs_client)
denv.Install('$PREFIX/bin/', daos_perf)

libs_server += ['vos', 'bio', 'abt']
libs_server += ['vos', 'bio', 'abt', 'numa']
vos_engine = denv.StaticObject(['vos_engine.c'])

if denv["STACK_MMAP"] == 1:
Expand Down
8 changes: 7 additions & 1 deletion src/tests/ftest/dfuse/pil4dfs_fio.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from cpu_utils import CpuInfo
from dfuse_utils import get_dfuse, start_dfuse
from fio_utils import FioCommand
from general_utils import bytes_to_human, percent_change
from general_utils import bytes_to_human, get_log_file, percent_change


class Pil4dfsFio(TestWithServers):
Expand Down Expand Up @@ -115,6 +115,9 @@ def _run_fio_pil4dfs(self, ioengine):
"global", "cpus_allowed", self.fio_cpus_allowed,
f"fio --name=global --cpus_allowed={self.fio_cpus_allowed}")
fio_cmd.env['LD_PRELOAD'] = os.path.join(self.prefix, 'lib64', 'libpil4dfs.so')
fio_cmd.env['D_DYNAMIC_CTX'] = 1
fio_cmd.env["D_LOG_FILE"] = get_log_file(self.client_log)
fio_cmd.env["D_LOG_MASK"] = 'INFO'
fio_cmd.hosts = self.hostlist_clients

bws = {}
Expand Down Expand Up @@ -154,6 +157,9 @@ def _run_fio_dfs(self):
fio_cmd.update(
"job", "pool", container.pool.uuid, f"fio --name=job --pool={container.pool.uuid}")
fio_cmd.update("job", "cont", container.uuid, f"fio --name=job --cont={container.uuid}")
fio_cmd.env['D_DYNAMIC_CTX'] = 1
fio_cmd.env["D_LOG_FILE"] = get_log_file(self.client_log)
fio_cmd.env["D_LOG_MASK"] = 'INFO'
fio_cmd.hosts = self.hostlist_clients

bws = {}
Expand Down
2 changes: 2 additions & 0 deletions src/tests/ftest/dfuse/pil4dfs_fio.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ server_config:
fabric_iface: ib0
fabric_iface_port: 31317
log_file: daos_server0.log
log_mask: INFO
storage: auto
1:
pinned_numa_node: 1
fabric_iface: ib1
fabric_iface_port: 31417
log_file: daos_server1.log
log_mask: INFO
storage: auto

pool:
Expand Down

0 comments on commit ba8013a

Please sign in to comment.