diff --git a/src/client/api/SConscript b/src/client/api/SConscript index 62ba96ef6002..43af58a6c868 100644 --- a/src/client/api/SConscript +++ b/src/client/api/SConscript @@ -1,7 +1,7 @@ """Build DAOS client""" LIBDAOS_SRC = ['agent.c', 'array.c', 'container.c', 'event.c', 'init.c', 'job.c', 'kv.c', 'mgmt.c', - 'object.c', 'pool.c', 'rpc.c', 'task.c', 'tx.c', 'pipeline.c'] + 'object.c', 'pool.c', 'rpc.c', 'task.c', 'tx.c', 'pipeline.c', 'metrics.c'] def scons(): diff --git a/src/client/api/init.c b/src/client/api/init.c index da02c71631ce..b357e2088da7 100644 --- a/src/client/api/init.c +++ b/src/client/api/init.c @@ -23,6 +23,7 @@ #include #include #include +#include #if BUILD_PIPELINE #include #endif @@ -242,19 +243,25 @@ daos_init(void) if (rc != 0) D_GOTO(out_co, rc); + rc = dc_tm_init(); + if (rc) + D_GOTO(out_obj, rc); + #if BUILD_PIPELINE /** set up pipeline */ rc = dc_pipeline_init(); if (rc != 0) - D_GOTO(out_obj, rc); + D_GOTO(out_tm, rc); #endif module_initialized++; D_GOTO(unlock, rc = 0); #if BUILD_PIPELINE +out_tm: + dc_tm_fini(); +#endif out_obj: dc_obj_fini(); -#endif out_co: dc_cont_fini(); out_pool: @@ -322,6 +329,7 @@ daos_fini(void) D_ERROR("failed to disconnect some resources may leak, " DF_RC"\n", DP_RC(rc)); + dc_tm_fini(); dc_agent_fini(); dc_job_fini(); diff --git a/src/client/api/metrics.c b/src/client/api/metrics.c new file mode 100644 index 000000000000..c04ff52d535c --- /dev/null +++ b/src/client/api/metrics.c @@ -0,0 +1,167 @@ +/* + * (C) Copyright 2020-2023 Intel Corporation. + * + * SPDX-License-Identifier: BSD-2-Clause-Patent + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define INIT_JOB_NUM 1024 +bool client_metric; +bool client_metric_retain; + +#define MAX_IDS_SIZE(num) (num * D_TM_METRIC_SIZE) +/* The client side metrics structure looks like + * root/job_id/pid/.... + */ +int +dc_tm_init(void) +{ + struct d_tm_node_t *job_node; + struct d_tm_context *current_ctx; + struct daos_thread_local_storage *dtls; + int metrics_tag; + pid_t pid; + int rc; + + d_getenv_bool(DAOS_CLIENT_METRICS_ENV, &client_metric); + if (!client_metric) + return 0; + + d_getenv_bool(DAOS_CLIENT_METRICS_RETAIN_ENV, &client_metric_retain); + + metrics_tag = D_TM_CLIENT_PROCESS | D_TM_OPEN_OR_CREATE; + if (client_metric_retain) + metrics_tag |= D_TM_RETAIN_SHMEM; + else + metrics_tag |= D_TM_RETAIN_SHMEM_IF_NON_EMPTY; + + rc = d_tm_init(DC_TM_JOB_ROOT_ID, MAX_IDS_SIZE(INIT_JOB_NUM), metrics_tag); + if (rc != 0) { + D_ERROR("init job root id %u: %d\n", DC_TM_JOB_ROOT_ID, rc); + return rc; + } + + pid = getpid(); + D_INFO("INIT %s/%u metrics\n", dc_jobid, pid); + rc = d_tm_add_metric(&job_node, D_TM_DIRECTORY, + "job id directory", "dir", + "%s/%u", dc_jobid, pid); + /* Close job root sheme */ + d_tm_fini(); + if (rc != 0) { + D_ERROR("add metric %s/%u failed: %d\n", dc_jobid, pid, rc); + D_GOTO(out, rc); + } + + metrics_tag = D_TM_CLIENT_PROCESS; + if (client_metric_retain) + metrics_tag |= D_TM_RETAIN_SHMEM; + rc = d_tm_init(pid, MAX_IDS_SIZE(INIT_JOB_NUM), metrics_tag); + if (rc != 0) + D_GOTO(out, rc); + + current_ctx = d_tm_open(pid); + if (current_ctx == NULL) + D_GOTO(out, rc = -DER_NOMEM); + + dtls = dc_tls_init(DAOS_CLI_TAG, pid); + if (dtls == NULL) + D_GOTO(out, rc = -DER_NOMEM); +out: + if (rc) + d_tm_fini(); + + return rc; +} + +static void +iter_dump(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, + char *path, int format, int opt_fields, void *arg) +{ + d_tm_print_node(ctx, node, level, path, format, opt_fields, (FILE *)arg); +} + +static int +dump_tm_file(const char *dump_path) +{ + struct d_tm_context *ctx; + struct d_tm_node_t *root; + uint32_t filter; + FILE *dump_file; + pid_t pid; + int rc = 0; + + dump_file = fopen(dump_path, "w+"); + if (dump_file == NULL) { + D_INFO("cannot open %s", dump_path); + return -DER_INVAL; + } + + filter = D_TM_COUNTER | D_TM_DURATION | D_TM_TIMESTAMP | D_TM_MEMINFO | + D_TM_TIMER_SNAPSHOT | D_TM_GAUGE | D_TM_STATS_GAUGE; + + pid = getpid(); + ctx = d_tm_open(pid); + if (ctx == NULL) + D_GOTO(close, rc = -DER_NOMEM); + + root = d_tm_get_root(ctx); + if (root == NULL) { + D_INFO("no root exist for %u\n", pid); + D_GOTO(close_ctx, rc = -DER_NONEXIST); + } + + d_tm_print_field_descriptors(0, dump_file); + + d_tm_iterate(ctx, root, 0, filter, NULL, D_TM_CSV, 0, iter_dump, dump_file); + +close_ctx: + d_tm_close(&ctx); +close: + fclose(dump_file); + return rc; +} + +void +dc_tm_fini() +{ + pid_t pid = getpid(); + char *dump_path; + int rc; + + if (!client_metric) + return; + + dump_path = getenv(METRIC_DUMP_ENV); + if (dump_path != NULL) + dump_tm_file(dump_path); + + dc_tls_fini(); + /* close current pid ctct */ + d_tm_fini(); + + if (client_metric_retain) + return; + + rc = d_tm_init(DC_TM_JOB_ROOT_ID, MAX_IDS_SIZE(INIT_JOB_NUM), + D_TM_CLIENT_PROCESS | D_TM_RETAIN_SHMEM_IF_NON_EMPTY | + D_TM_OPEN_OR_CREATE); + if (rc != 0) + return; + + D_INFO("delete pid %s/%u\n", dc_jobid, pid); + d_tm_del_node("%s/%d", dc_jobid, pid); + d_tm_del_node("%s", dc_jobid); + + d_tm_fini(); +} diff --git a/src/common/SConscript b/src/common/SConscript index 151ba5f0ed46..432b72403e51 100644 --- a/src/common/SConscript +++ b/src/common/SConscript @@ -9,7 +9,7 @@ COMMON_FILES = ['debug.c', 'mem.c', 'fail_loc.c', 'lru.c', 'dedup.c', 'profile.c', 'compression.c', 'compression_isal.c', 'compression_qat.c', 'multihash.c', 'multihash_isal.c', 'cipher.c', 'cipher_isal.c', 'qat.c', 'fault_domain.c', - 'policy.c'] + 'policy.c', 'tls.c'] def build_daos_common(denv, client): diff --git a/src/common/tls.c b/src/common/tls.c new file mode 100644 index 000000000000..7a01bc8d7893 --- /dev/null +++ b/src/common/tls.c @@ -0,0 +1,191 @@ +/** + * (C) Copyright 2016-2023 Intel Corporation. + * + * SPDX-License-Identifier: BSD-2-Clause-Patent + */ +/** + * It implements thread-local storage (TLS) for DAOS. + */ +#include +#include + +/* The array remember all of registered module keys on one node. */ +struct daos_module_key *daos_module_keys[DAOS_MODULE_KEYS_NR] = { NULL }; +pthread_mutex_t daos_module_keys_lock = PTHREAD_MUTEX_INITIALIZER; + +void +daos_register_key(struct daos_module_key *key) +{ + int i; + + D_MUTEX_LOCK(&daos_module_keys_lock); + for (i = 0; i < DAOS_MODULE_KEYS_NR; i++) { + if (daos_module_keys[i] == NULL) { + daos_module_keys[i] = key; + key->dmk_index = i; + break; + } + } + D_MUTEX_UNLOCK(&daos_module_keys_lock); + D_ASSERT(i < DAOS_MODULE_KEYS_NR); +} + +void +daos_unregister_key(struct daos_module_key *key) +{ + if (key == NULL) + return; + D_ASSERT(key->dmk_index >= 0); + D_ASSERT(key->dmk_index < DAOS_MODULE_KEYS_NR); + D_MUTEX_LOCK(&daos_module_keys_lock); + daos_module_keys[key->dmk_index] = NULL; + D_MUTEX_UNLOCK(&daos_module_keys_lock); +} + +/** + * Init thread context + * + * \param[in]dtls Init the thread context to allocate the + * local thread variable for each module. + * + * \retval 0 if initialization succeeds + * \retval negative errno if initialization fails + */ +static int +daos_thread_local_storage_init(struct daos_thread_local_storage *dtls, + int xs_id, int tgt_id) +{ + int rc = 0; + int i; + + if (dtls->dtls_values == NULL) { + D_ALLOC_ARRAY(dtls->dtls_values, + (int)ARRAY_SIZE(daos_module_keys)); + if (dtls->dtls_values == NULL) + return -DER_NOMEM; + } + + for (i = 0; i < DAOS_MODULE_KEYS_NR; i++) { + struct daos_module_key *dmk = daos_module_keys[i]; + + if (dmk != NULL && dtls->dtls_tag & dmk->dmk_tags) { + D_ASSERT(dmk->dmk_init != NULL); + dtls->dtls_values[i] = dmk->dmk_init(dtls->dtls_tag, xs_id, tgt_id); + if (dtls->dtls_values[i] == NULL) { + rc = -DER_NOMEM; + break; + } + } + } + return rc; +} + +/** + * Finish module context + * + * \param[in]dtls Finish the thread context to free the + * local thread variable for each module. + */ +static void +daos_thread_local_storage_fini(struct daos_thread_local_storage *dtls) +{ + int i; + + if (dtls->dtls_values != NULL) { + for (i = DAOS_MODULE_KEYS_NR - 1; i >= 0; i--) { + struct daos_module_key *dmk = daos_module_keys[i]; + + if (dmk != NULL && dtls->dtls_tag & dmk->dmk_tags) { + D_ASSERT(dtls->dtls_values[i] != NULL); + D_ASSERT(dmk->dmk_fini != NULL); + dmk->dmk_fini(dtls->dtls_tag, dtls->dtls_values[i]); + } + } + } + + D_FREE(dtls->dtls_values); +} + +pthread_key_t dss_tls_key; +pthread_key_t dc_tls_key; + +/* + * Allocate daos_thread_local_storage for a particular thread on server and + * store the pointer in a thread-specific value which can be fetched at any + * time with daos_tls_get(). + */ +static struct daos_thread_local_storage * +daos_tls_init(int tag, int xs_id, int tgt_id, bool server) +{ + struct daos_thread_local_storage *dtls; + int rc; + + D_ALLOC_PTR(dtls); + if (dtls == NULL) + return NULL; + + dtls->dtls_tag = tag; + rc = daos_thread_local_storage_init(dtls, xs_id, tgt_id); + if (rc != 0) { + D_FREE(dtls); + return NULL; + } + + if (server) + rc = pthread_setspecific(dss_tls_key, dtls); + else + rc = pthread_setspecific(dc_tls_key, dtls); + if (rc) { + D_ERROR("failed to initialize tls: %d\n", rc); + daos_thread_local_storage_fini(dtls); + D_FREE(dtls); + return NULL; + } + + return dtls; +} + + +/* Free DTC for a particular thread. */ +static void +daos_tls_fini(struct daos_thread_local_storage *dtls, bool server) +{ + daos_thread_local_storage_fini(dtls); + D_FREE(dtls); + if (server) + pthread_setspecific(dss_tls_key, NULL); + else + pthread_setspecific(dc_tls_key, NULL); +} + +/* Allocate local per thread storage. */ +struct daos_thread_local_storage * +dc_tls_init(int tag, uint32_t pid) +{ + return daos_tls_init(tag, -1, pid, false); +} + +/* Free DTC for a particular thread. */ +void +dc_tls_fini(void) +{ + struct daos_thread_local_storage *dtls; + + dtls = (struct daos_thread_local_storage *)pthread_getspecific(dc_tls_key); + if (dtls != NULL) + daos_tls_fini(dtls, false); +} + +/* Allocate local per thread storage. */ +struct daos_thread_local_storage * +dss_tls_init(int tag, int xs_id, int tgt_id) +{ + return daos_tls_init(tag, xs_id, tgt_id, true); +} + +/* Free DTC for a particular thread. */ +void +dss_tls_fini(struct daos_thread_local_storage *dtls) +{ + daos_tls_fini(dtls, true); +} diff --git a/src/engine/SConscript b/src/engine/SConscript index ceb00a409d09..e94b6a83dd61 100644 --- a/src/engine/SConscript +++ b/src/engine/SConscript @@ -29,7 +29,7 @@ def scons(): 'drpc_handler.c', 'drpc_listener.c', 'drpc_progress.c', 'init.c', 'module.c', 'srv_cli.c', 'profile.c', 'rpc.c', - 'server_iv.c', 'srv.c', 'srv.pb-c.c', 'tls.c', + 'server_iv.c', 'srv.c', 'srv.pb-c.c', 'sched.c', 'ult.c', 'event.pb-c.c', 'srv_metrics.c'] + libdaos_tgts diff --git a/src/engine/init.c b/src/engine/init.c index eb3bca9edb14..6828f04aca71 100644 --- a/src/engine/init.c +++ b/src/engine/init.c @@ -22,6 +22,7 @@ #include #include #include +#include #include "srv_internal.h" #include "drpc_internal.h" #include @@ -618,14 +619,14 @@ server_id_cb(uint32_t *tid, uint64_t *uid) } if (tid != NULL) { - struct dss_thread_local_storage *dtc; - struct dss_module_info *dmi; + struct daos_thread_local_storage *dtc; + struct daos_module_info *dmi; int index = daos_srv_modkey.dmk_index; - /* Avoid assertion in dss_module_key_get() */ + /* Avoid assertion in daos_module_key_get() */ dtc = dss_tls_get(); if (dtc != NULL && index >= 0 && index < DAOS_MODULE_KEYS_NR && - dss_module_keys[index] == &daos_srv_modkey) { + daos_module_keys[index] == &daos_srv_modkey) { dmi = dss_get_module_info(); if (dmi != NULL) *tid = dmi->dmi_xs_id; diff --git a/src/engine/srv.c b/src/engine/srv.c index aa6cbd706e8f..634fecf81d73 100644 --- a/src/engine/srv.c +++ b/src/engine/srv.c @@ -383,7 +383,7 @@ static void dss_srv_handler(void *arg) { struct dss_xstream *dx = (struct dss_xstream *)arg; - struct dss_thread_local_storage *dtc; + struct daos_thread_local_storage *dtc; struct dss_module_info *dmi; int rc; bool track_mem = false; diff --git a/src/engine/srv_internal.h b/src/engine/srv_internal.h index 92504c026ca3..d3a06d79db37 100644 --- a/src/engine/srv_internal.h +++ b/src/engine/srv_internal.h @@ -314,10 +314,6 @@ sched_create_thread(struct dss_xstream *dx, void (*func)(void *), void *arg, return dss_abterr2der(rc); } -/* tls.c */ -void dss_tls_fini(struct dss_thread_local_storage *dtls); -struct dss_thread_local_storage *dss_tls_init(int tag, int xs_id, int tgt_id); - /* server_iv.c */ void ds_iv_init(void); void ds_iv_fini(void); diff --git a/src/engine/tls.c b/src/engine/tls.c deleted file mode 100644 index 90ea6cce7c58..000000000000 --- a/src/engine/tls.c +++ /dev/null @@ -1,155 +0,0 @@ -/** - * (C) Copyright 2016-2021 Intel Corporation. - * - * SPDX-License-Identifier: BSD-2-Clause-Patent - */ -/** - * This file is part of the DAOS server. It implements thread-local storage - * (TLS) for DAOS service threads. - */ -#define D_LOGFAC DD_FAC(server) - -#include -#include "srv_internal.h" - -/* The array remember all of registered module keys on one node. */ -struct dss_module_key *dss_module_keys[DAOS_MODULE_KEYS_NR] = { NULL }; - -pthread_mutex_t dss_module_keys_lock = PTHREAD_MUTEX_INITIALIZER; - -void -dss_register_key(struct dss_module_key *key) -{ - int i; - - D_MUTEX_LOCK(&dss_module_keys_lock); - for (i = 0; i < DAOS_MODULE_KEYS_NR; i++) { - if (dss_module_keys[i] == NULL) { - dss_module_keys[i] = key; - key->dmk_index = i; - break; - } - } - D_MUTEX_UNLOCK(&dss_module_keys_lock); - D_ASSERT(i < DAOS_MODULE_KEYS_NR); -} - -void -dss_unregister_key(struct dss_module_key *key) -{ - if (key == NULL) - return; - D_ASSERT(key->dmk_index >= 0); - D_ASSERT(key->dmk_index < DAOS_MODULE_KEYS_NR); - D_MUTEX_LOCK(&dss_module_keys_lock); - dss_module_keys[key->dmk_index] = NULL; - D_MUTEX_UNLOCK(&dss_module_keys_lock); -} - -/** - * Init thread context - * - * \param[in]dtls Init the thread context to allocate the - * local thread variable for each module. - * - * \retval 0 if initialization succeeds - * \retval negative errno if initialization fails - */ -static int -dss_thread_local_storage_init(struct dss_thread_local_storage *dtls, - int xs_id, int tgt_id) -{ - int rc = 0; - int i; - - if (dtls->dtls_values == NULL) { - D_ALLOC_ARRAY(dtls->dtls_values, - (int)ARRAY_SIZE(dss_module_keys)); - if (dtls->dtls_values == NULL) - return -DER_NOMEM; - } - - for (i = 0; i < DAOS_MODULE_KEYS_NR; i++) { - struct dss_module_key *dmk = dss_module_keys[i]; - - if (dmk != NULL && dtls->dtls_tag & dmk->dmk_tags) { - D_ASSERT(dmk->dmk_init != NULL); - dtls->dtls_values[i] = dmk->dmk_init(dtls->dtls_tag, xs_id, tgt_id); - if (dtls->dtls_values[i] == NULL) { - rc = -DER_NOMEM; - break; - } - } - } - return rc; -} - -/** - * Finish module context - * - * \param[in]dtls Finish the thread context to free the - * local thread variable for each module. - */ -static void -dss_thread_local_storage_fini(struct dss_thread_local_storage *dtls) -{ - int i; - - if (dtls->dtls_values != NULL) { - for (i = DAOS_MODULE_KEYS_NR - 1; i >= 0; i--) { - struct dss_module_key *dmk = dss_module_keys[i]; - - if (dmk != NULL && dtls->dtls_tag & dmk->dmk_tags) { - D_ASSERT(dtls->dtls_values[i] != NULL); - D_ASSERT(dmk->dmk_fini != NULL); - dmk->dmk_fini(dtls->dtls_tag, dtls->dtls_values[i]); - } - } - } - - D_FREE(dtls->dtls_values); -} - -pthread_key_t dss_tls_key; - -/* - * Allocate dss_thread_local_storage for a particular thread and - * store the pointer in a thread-specific value which can be - * fetched at any time with dss_tls_get(). - */ -struct dss_thread_local_storage * -dss_tls_init(int tag, int xs_id, int tgt_id) -{ - struct dss_thread_local_storage *dtls; - int rc; - - D_ALLOC_PTR(dtls); - if (dtls == NULL) - return NULL; - - dtls->dtls_tag = tag; - rc = dss_thread_local_storage_init(dtls, xs_id, tgt_id); - if (rc != 0) { - D_FREE(dtls); - return NULL; - } - - rc = pthread_setspecific(dss_tls_key, dtls); - if (rc) { - D_ERROR("failed to initialize tls: %d\n", rc); - dss_thread_local_storage_fini(dtls); - D_FREE(dtls); - return NULL; - } - - return dtls; -} - -/* Free DTC for a particular thread. */ -void -dss_tls_fini(struct dss_thread_local_storage *dtls) -{ - dss_thread_local_storage_fini(dtls); - D_FREE(dtls); - pthread_setspecific(dss_tls_key, NULL); -} diff --git a/src/gurt/examples/telem_consumer_example.c b/src/gurt/examples/telem_consumer_example.c index 6b7b1653a163..f2b506bbc79d 100644 --- a/src/gurt/examples/telem_consumer_example.c +++ b/src/gurt/examples/telem_consumer_example.c @@ -147,6 +147,13 @@ void read_metrics(struct d_tm_context *ctx, struct d_tm_node_t *root, d_tm_list_free(head); } +static void +iter_print(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, + char *path, int format, int opt_fields, void *arg) +{ + d_tm_print_node(ctx, node, level, path, format, opt_fields, (FILE *)arg); +} + int main(int argc, char **argv) { @@ -178,7 +185,7 @@ main(int argc, char **argv) D_TM_DURATION | D_TM_GAUGE | D_TM_DIRECTORY); show_meta = true; d_tm_iterate(ctx, root, 0, filter, NULL, D_TM_STANDARD, - D_TM_INCLUDE_METADATA, D_TM_ITER_READ, stdout); + D_TM_INCLUDE_METADATA, iter_print, stdout); sprintf(dirname, "manually added"); filter = (D_TM_COUNTER | D_TM_TIMESTAMP | D_TM_TIMER_SNAPSHOT | diff --git a/src/gurt/telemetry.c b/src/gurt/telemetry.c index 3294c0662262..1272289fcedc 100644 --- a/src/gurt/telemetry.c +++ b/src/gurt/telemetry.c @@ -69,8 +69,9 @@ static struct d_tm_shmem { struct d_tm_context *ctx; /** context for the producer */ struct d_tm_node_t *root; /** root node of shmem */ pthread_mutex_t add_lock; /** for synchronized access */ - bool sync_access; /** whether to sync access */ - bool retain; /** retain shmem region on exit */ + uint32_t retain:1, /* retain shmem region during exit */ + sync_access:1, /** retain shmem region if it is not empty */ + retain_non_empty:1; int id; /** Instance ID */ } tm_shmem; @@ -200,6 +201,7 @@ attach_shmem(key_t key, size_t size, int flags, struct d_tm_shmem_hdr **shmem) return -DER_SHMEM_PERMS; } + D_INFO("allocate shmid %d key 0x%x addr %p\n", shmid, key, addr); *shmem = addr; return shmid; } @@ -529,7 +531,7 @@ init_node(struct d_tm_shmem_hdr *shmem, struct d_tm_node_t *node, D_ERROR("cannot allocate node name [%s]\n", name); return -DER_NO_SHMEM; } - strncpy(node->dtn_name, name, buff_len); + strncpy(conv_ptr(shmem, node->dtn_name), name, buff_len); node->dtn_shmem_key = shmem->sh_key; node->dtn_child = NULL; /* may be reinitializing an existing node, in which case we shouldn't @@ -569,11 +571,11 @@ alloc_node(struct d_tm_shmem_hdr *shmem, struct d_tm_node_t **newnode, rc = -DER_NO_SHMEM; goto out; } - rc = init_node(shmem, node, name); + rc = init_node(shmem, conv_ptr(shmem, node), name); if (rc != 0) goto out; - node->dtn_metric = NULL; - node->dtn_sibling = NULL; + ((struct d_tm_node_t *)conv_ptr(shmem, node))->dtn_metric = NULL; + ((struct d_tm_node_t *)conv_ptr(shmem, node))->dtn_sibling = NULL; *newnode = node; out: @@ -624,10 +626,10 @@ add_child(struct d_tm_node_t **newnode, struct d_tm_node_t *parent, * 1) a previously-cleared link node that can be reused, or * 2) the right place to attach a newly allocated node. */ - child = parent->dtn_child; + child = conv_ptr(shmem, parent->dtn_child); while (child != NULL && !is_cleared_link(tm_shmem.ctx, child)) { sibling = child; - child = child->dtn_sibling; + child = conv_ptr(shmem, child->dtn_sibling); } if (is_cleared_link(tm_shmem.ctx, child)) { @@ -657,6 +659,7 @@ add_child(struct d_tm_node_t **newnode, struct d_tm_node_t *parent, else sibling->dtn_sibling = *newnode; + *newnode = conv_ptr(shmem, *newnode); return 0; failure: @@ -772,7 +775,7 @@ destroy_shmem_with_key(key_t key) int d_tm_init(int id, uint64_t mem_size, int flags) { - struct d_tm_shmem_hdr *new_shmem; + struct d_tm_shmem_hdr *new_shmem = NULL; key_t key; int shmid; char tmp[D_TM_MAX_NAME_LEN]; @@ -780,31 +783,48 @@ d_tm_init(int id, uint64_t mem_size, int flags) memset(&tm_shmem, 0, sizeof(tm_shmem)); - if ((flags & ~(D_TM_SERIALIZATION | D_TM_RETAIN_SHMEM)) != 0) { - D_ERROR("Invalid flags\n"); + if ((flags & ~(D_TM_SERIALIZATION | D_TM_RETAIN_SHMEM | + D_TM_RETAIN_SHMEM_IF_NON_EMPTY | D_TM_CLIENT_PROCESS | + D_TM_OPEN_OR_CREATE)) != 0) { + D_ERROR("Invalid flags 0x%x\n", flags); rc = -DER_INVAL; goto failure; } if (flags & D_TM_SERIALIZATION) { - tm_shmem.sync_access = true; + tm_shmem.sync_access = 1; D_INFO("Serialization enabled for id %d\n", id); } if (flags & D_TM_RETAIN_SHMEM) { - tm_shmem.retain = true; + tm_shmem.retain = 1; D_INFO("Retaining shared memory for id %d\n", id); } + if (flags & D_TM_RETAIN_SHMEM_IF_NON_EMPTY) { + tm_shmem.retain_non_empty = 1; + D_INFO("Retaining shared memory for id %d if not empty\n", id); + } + tm_shmem.id = id; snprintf(tmp, sizeof(tmp), "ID: %d", id); key = d_tm_get_srv_key(id); - rc = destroy_shmem_with_key(key); - if (rc != 0) - goto failure; - rc = create_shmem(tmp, key, mem_size, &shmid, &new_shmem); - if (rc != 0) - goto failure; + if (flags & D_TM_OPEN_OR_CREATE) { + rc = open_shmem(key, &new_shmem); + if (rc > 0) { + D_ASSERT(new_shmem != NULL); + shmid = rc; + } + } + + if (new_shmem == NULL) { + rc = destroy_shmem_with_key(key); + if (rc != 0) + goto failure; + rc = create_shmem(tmp, key, mem_size, &shmid, &new_shmem); + if (rc != 0) + goto failure; + } rc = alloc_ctx(&tm_shmem.ctx, new_shmem, shmid); if (rc != 0) @@ -837,14 +857,24 @@ d_tm_init(int id, uint64_t mem_size, int flags) void d_tm_fini(void) { - bool destroy_shmem = false; + bool destroy_shmem = true; if (tm_shmem.ctx == NULL) goto out; - if (!tm_shmem.retain) - destroy_shmem = true; + if (tm_shmem.retain) + destroy_shmem = false; + + if (tm_shmem.retain_non_empty) { + struct d_tm_node_t *root; + + root = d_tm_get_root(tm_shmem.ctx); + if (root->dtn_child != NULL) + destroy_shmem = false; + } + D_INFO("Delete share memory for id %u destory %s\n", tm_shmem.id, + destroy_shmem ? "yes":"no"); /* close with the option to destroy the shmem region if needed */ close_all_shmem(tm_shmem.ctx, destroy_shmem); d_tm_close(&tm_shmem.ctx); @@ -1452,9 +1482,9 @@ _reset_node(struct d_tm_context *ctx, struct d_tm_node_t *node) return DER_SUCCESS; } -static void -reset_node(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, - char *path, int format, int opt_fields, FILE *stream) +void +d_tm_reset_node(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, + char *path, int format, int opt_fields, FILE *stream) { char *name = NULL; @@ -1468,7 +1498,7 @@ reset_node(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, switch (node->dtn_type) { case D_TM_LINK: node = d_tm_follow_link(ctx, node); - reset_node(ctx, node, level, path, format, opt_fields, stream); + d_tm_reset_node(ctx, node, level, path, format, opt_fields, stream); break; case D_TM_DIRECTORY: case D_TM_COUNTER: @@ -1508,20 +1538,19 @@ reset_node(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, * Choose D_TM_CSV for comma separated values. * \param[in] opt_fields A bitmask. Set D_TM_INCLUDE_* as desired for * the optional output fields. - * \param[in] show_timestamp Set to true to print the timestamp the metric - * was read by the consumer. - * \param[in] stream Direct output to this stream (stdout, stderr) + * \param[in] iter_cb iterate callback. + * \param[in] cb_arg argument for iterate callback. */ void d_tm_iterate(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, int filter, char *path, int format, - int opt_fields, uint32_t ops, FILE *stream) + int opt_fields, d_tm_iter_cb_t iter_cb, void *cb_arg) { struct d_tm_shmem_hdr *shmem = NULL; char *fullpath = NULL; char *parent_name = NULL; - if ((node == NULL) || (stream == NULL)) + if (node == NULL) return; if (node->dtn_type == D_TM_LINK) { @@ -1534,14 +1563,8 @@ d_tm_iterate(struct d_tm_context *ctx, struct d_tm_node_t *node, if (shmem == NULL) return; - if (node->dtn_type & filter) { - if (ops & D_TM_ITER_READ) - d_tm_print_node(ctx, node, level, path, format, - opt_fields, stream); - if (ops & D_TM_ITER_RESET) - reset_node(ctx, node, level, path, format, - opt_fields, stream); - } + if (node->dtn_type & filter) + iter_cb(ctx, node, level, path, format, opt_fields, cb_arg); parent_name = conv_ptr(shmem, node->dtn_name); node = node->dtn_child; @@ -1557,7 +1580,7 @@ d_tm_iterate(struct d_tm_context *ctx, struct d_tm_node_t *node, D_ASPRINTF(fullpath, "%s/%s", path, parent_name); d_tm_iterate(ctx, node, level + 1, filter, fullpath, format, - opt_fields, ops, stream); + opt_fields, iter_cb, cb_arg); D_FREE(fullpath); node = node->dtn_sibling; node = conv_ptr(shmem, node); @@ -2106,6 +2129,115 @@ is_initialized(void) tm_shmem.ctx->shmem_root != NULL; } +static struct d_tm_node_t* +delete_child(struct d_tm_context *ctx, struct d_tm_node_t *parent, const char *name) +{ + struct d_tm_shmem_hdr *shmem; + struct d_tm_node_t *child = NULL; + struct d_tm_node_t *pre_sibling = NULL; + char *client_name; + + if (parent == NULL) + return NULL; + + if (parent->dtn_type == D_TM_LINK) { + parent = d_tm_follow_link(ctx, parent); + if (parent == NULL) + return NULL; + } + + shmem = get_shmem_for_key(ctx, parent->dtn_shmem_key); + if (shmem == NULL) + return NULL; + + if (parent->dtn_child == NULL) + return NULL; + + child = conv_ptr(shmem, parent->dtn_child); + if (child == NULL) + return NULL; + + client_name = conv_ptr(shmem, child->dtn_name); + + /* + * cleared links don't have names but we still want to traverse + * their siblings + */ + while ((child != NULL) && (client_name == NULL || + strncmp(client_name, name, D_TM_MAX_NAME_LEN) != 0)) { + pre_sibling = child; + child = conv_ptr(shmem, child->dtn_sibling); + client_name = NULL; + if (child == NULL) + break; + client_name = conv_ptr(shmem, child->dtn_name); + } + + if (child == NULL) + return NULL; + + if (pre_sibling != NULL) + pre_sibling->dtn_sibling = child->dtn_sibling; + else + parent->dtn_child = NULL; + + return child; +} + +/* + * Get a pointer to the last token in the path without modifying the original + * string. + */ +static const char * +get_last_token(const char *path) +{ + const char *substr = path; + const char *ch; + bool next_token = false; + + for (ch = path; *ch != '\0'; ch++) { + if (*ch == '/') { + next_token = true; + } else if (next_token) { + substr = ch; + next_token = false; + } + } + + return substr; +} + +static int +delete_metric(struct d_tm_context *ctx, char *path) +{ + struct d_tm_node_t *parent_node; + struct d_tm_node_t *temp; + char *token; + char *rest; + char *last_name; + int rc = 0; + + rest = path; + parent_node = d_tm_get_root(ctx); + temp = parent_node; + token = strtok_r(rest, "/", &rest); + last_name = (char *)get_last_token(path); + while (token != NULL && token != last_name) { + parent_node = temp; + temp = find_child(ctx, parent_node, token); + if (temp == NULL) + D_GOTO(out, rc = -DER_NONEXIST); + + token = strtok_r(rest, "/", &rest); + } + + temp = delete_child(ctx, parent_node, last_name); + if (temp == NULL) + D_GOTO(out, rc = -DER_NONEXIST); +out: + return rc; +} + static int add_metric(struct d_tm_context *ctx, struct d_tm_node_t **node, int metric_type, char *desc, char *units, char *path) @@ -2114,6 +2246,7 @@ add_metric(struct d_tm_context *ctx, struct d_tm_node_t **node, int metric_type, struct d_tm_node_t *parent_node; struct d_tm_node_t *temp = NULL; struct d_tm_shmem_hdr *shmem; + struct d_tm_metric_t *metric; char *token; char *rest; char *unit_string; @@ -2155,11 +2288,11 @@ add_metric(struct d_tm_context *ctx, struct d_tm_node_t **node, int metric_type, } } - temp->dtn_metric->dtm_stats = NULL; + metric = conv_ptr(shmem, temp->dtn_metric); + metric->dtm_stats = NULL; if (has_stats(temp)) { - temp->dtn_metric->dtm_stats = - shmalloc(shmem, sizeof(struct d_tm_stats_t)); - if (temp->dtn_metric->dtm_stats == NULL) { + metric->dtm_stats = shmalloc(shmem, sizeof(struct d_tm_stats_t)); + if (metric->dtm_stats == NULL) { rc = -DER_NO_SHMEM; goto out; } @@ -2176,14 +2309,14 @@ add_metric(struct d_tm_context *ctx, struct d_tm_node_t **node, int metric_type, if (buff_len > 0) { buff_len += 1; /** make room for the trailing null */ - temp->dtn_metric->dtm_desc = shmalloc(shmem, buff_len); - if (temp->dtn_metric->dtm_desc == NULL) { + metric->dtm_desc = shmalloc(shmem, buff_len); + if (metric->dtm_desc == NULL) { rc = -DER_NO_SHMEM; goto out; } - strncpy(temp->dtn_metric->dtm_desc, desc, buff_len); + strncpy(conv_ptr(shmem, metric->dtm_desc), desc, buff_len); } else { - temp->dtn_metric->dtm_desc = NULL; + metric->dtm_desc = NULL; } unit_string = units; @@ -2217,14 +2350,14 @@ add_metric(struct d_tm_context *ctx, struct d_tm_node_t **node, int metric_type, if (buff_len > 0) { buff_len += 1; /** make room for the trailing null */ - temp->dtn_metric->dtm_units = shmalloc(shmem, buff_len); - if (temp->dtn_metric->dtm_units == NULL) { + metric->dtm_units = shmalloc(shmem, buff_len); + if (metric->dtm_units == NULL) { rc = -DER_NO_SHMEM; goto out; } - strncpy(temp->dtn_metric->dtm_units, unit_string, buff_len); + strncpy(conv_ptr(shmem, metric->dtm_units), unit_string, buff_len); } else { - temp->dtn_metric->dtm_units = NULL; + metric->dtm_units = NULL; } temp->dtn_protect = false; @@ -2344,6 +2477,65 @@ int d_tm_add_metric(struct d_tm_node_t **node, int metric_type, char *desc, return rc; } +/** + * delete the node at the specified path + * + * \param[in] fmt Format specifier for the name and full path of + * the new metric followed by optional args to + * populate the string, printf style. + * \return DER_SUCCESS Success + * -DER_NO_SHMEM Out of shared memory + * -DER_NOMEM Out of global heap + * -DER_EXCEEDS_PATH_LEN node name exceeds + * path len or \a units + * exceeds length + * -DER_INVAL node is invalid or + * invalid units were + * specified for the metric + * type + * -DER_ADD_METRIC_FAILED Operation failed + * -DER_UNINIT API not initialized + */ +int d_tm_del_node(const char *fmt, ...) +{ + struct d_tm_node_t *tmp_node = NULL; + char path[D_TM_MAX_NAME_LEN] = {}; + int rc = 0; + va_list args; + + if (!is_initialized()) + return -DER_UNINIT; + + if (fmt == NULL) + return -DER_INVAL; + + va_start(args, fmt); + rc = parse_path_fmt(path, sizeof(path), fmt, args); + va_end(args); + if (rc != 0) + D_GOTO(unlock, rc); + + rc = d_tm_lock_shmem(); + if (rc != 0) { + D_ERROR("Failed to get mutex: " DF_RC "\n", DP_RC(rc)); + D_GOTO(unlock, rc); + } + + /* If delete node does not exist, just return. */ + tmp_node = d_tm_find_metric(tm_shmem.ctx, path); + if (tmp_node == NULL) + D_GOTO(unlock, rc = -DER_NONEXIST); + + rc = delete_metric(tm_shmem.ctx, path); + if (rc != 0) + D_GOTO(unlock, rc); + + D_DEBUG(DB_TRACE, "successfully deleted item: [%s]\n", path); +unlock: + d_tm_unlock_shmem(); + return rc; +} + static void invalidate_link_node(struct d_tm_node_t *node) { @@ -2413,29 +2605,6 @@ get_unique_shmem_key(const char *path, int id) return (key_t)d_hash_string_u32(salted, sizeof(salted)); } -/* - * Get a pointer to the last token in the path without modifying the original - * string. - */ -static const char * -get_last_token(const char *path) -{ - const char *substr = path; - const char *ch; - bool next_token = false; - - for (ch = path; *ch != '\0'; ch++) { - if (*ch == '/') { - next_token = true; - } else if (next_token) { - substr = ch; - next_token = false; - } - } - - return substr; -} - /** * Creates a directory in the metric tree at the path designated by fmt that * can be deleted later, with all its children. @@ -3669,7 +3838,7 @@ shmalloc(struct d_tm_shmem_hdr *shmem, int length) D_DEBUG(DB_TRACE, "Allocated %d bytes. Now %" PRIu64 " remain\n", length, shmem->sh_bytes_free); - memset(new_mem, 0, length); + memset(conv_ptr(shmem, new_mem), 0, length); return new_mem; } diff --git a/src/gurt/tests/test_gurt_telem_producer.c b/src/gurt/tests/test_gurt_telem_producer.c index bf3db9d19c95..76a9b7b27f25 100644 --- a/src/gurt/tests/test_gurt_telem_producer.c +++ b/src/gurt/tests/test_gurt_telem_producer.c @@ -1226,6 +1226,13 @@ test_verify_object_count(void **state) assert_int_equal(num, exp_total); } +static void +iter_print(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, + char *path, int format, int opt_fields, void *arg) +{ + d_tm_print_node(ctx, node, level, path, format, opt_fields, (FILE *)arg); +} + static void test_print_metrics(void **state) { @@ -1239,14 +1246,14 @@ test_print_metrics(void **state) D_TM_DURATION | D_TM_GAUGE | D_TM_DIRECTORY); d_tm_iterate(cli_ctx, node, 0, filter, NULL, D_TM_STANDARD, - D_TM_INCLUDE_METADATA, D_TM_ITER_READ, stdout); + D_TM_INCLUDE_METADATA, iter_print, stdout); d_tm_print_field_descriptors(D_TM_INCLUDE_TIMESTAMP | D_TM_INCLUDE_METADATA, stdout); filter &= ~D_TM_DIRECTORY; d_tm_iterate(cli_ctx, node, 0, filter, NULL, D_TM_CSV, - D_TM_INCLUDE_METADATA, D_TM_ITER_READ, stdout); + D_TM_INCLUDE_METADATA, iter_print, stdout); } static void diff --git a/src/include/daos/metric.h b/src/include/daos/metric.h new file mode 100644 index 000000000000..9417b52fdc93 --- /dev/null +++ b/src/include/daos/metric.h @@ -0,0 +1,19 @@ +/* + * (C) Copyright 2020-2023 Intel Corporation. + * + * SPDX-License-Identifier: BSD-2-Clause-Patent + */ +#ifndef __DAOS_METRIC_H__ +#define __DAOS_METRIC_H__ + +/** + * Called during library initialization to init metrics. + */ +int dc_tm_init(void); + +/** + * Called during library finalization to free metrics resources + */ +void dc_tm_fini(void); + +#endif /* __DAOS_TM_H__ */ diff --git a/src/include/daos/tls.h b/src/include/daos/tls.h new file mode 100644 index 000000000000..938cc4daf2c4 --- /dev/null +++ b/src/include/daos/tls.h @@ -0,0 +1,124 @@ +/** + * (C) Copyright 2016-2023 Intel Corporation. + * + * SPDX-License-Identifier: BSD-2-Clause-Patent + */ +/** + * This file is part of daos + * + * src/include/daos/tls.h + */ + +#ifndef __DAOS_TLS_H__ +#define __DAOS_TLS_H__ + +#include +#include + +/** + * Stackable Module API + * Provides a modular interface to load and register server-side code on + * demand. A module is composed of: + * - a set of request handlers which are registered when the module is loaded. + * - a server-side API (see header files suffixed by "_srv") used for + * inter-module direct calls. + * + * For now, all loaded modules are assumed to be trustful, but sandboxes can be + * implemented in the future. + */ +/* + * Thead-local storage + */ +struct daos_thread_local_storage { + uint32_t dtls_tag; + void **dtls_values; +}; + +enum daos_module_tag { + DAOS_SYS_TAG = 1 << 0, /** only run on system xstream */ + DAOS_TGT_TAG = 1 << 1, /** only run on target xstream */ + DAOS_RDB_TAG = 1 << 2, /** only run on rdb xstream */ + DAOS_OFF_TAG = 1 << 3, /** only run on offload/helper xstream */ + DAOS_CLI_TAG = 1 << 4, /** only run on client stack */ + DAOS_SERVER_TAG = 0xff, /** run on all xstream */ +}; + +/* The module key descriptor for each xstream */ +struct daos_module_key { + /* Indicate where the keys should be instantiated */ + enum daos_module_tag dmk_tags; + + /* The position inside the daos_module_keys */ + int dmk_index; + /* init keys for context */ + void *(*dmk_init)(int tags, int xs_id, int tgt_id); + + /* fini keys for context */ + void (*dmk_fini)(int tags, void *data); +}; + +#define DAOS_MODULE_KEYS_NR 10 +extern pthread_key_t dc_tls_key; /* client side TLS key */ +extern pthread_key_t dss_tls_key; /* server side TLS key */ +extern struct daos_module_key *daos_module_keys[DAOS_MODULE_KEYS_NR]; + +static inline struct daos_thread_local_storage * +dss_tls_get() +{ + return (struct daos_thread_local_storage *) + pthread_getspecific(dss_tls_key); +} + +static inline struct daos_thread_local_storage * +dc_tls_get() +{ + return (struct daos_thread_local_storage *) + pthread_getspecific(dc_tls_key); +} + +/* For now TLS is only enabled if metrics are enabled */ +#define METRIC_DUMP_ENV "DAOS_METRIC_DUMP_ENV" +#define DAOS_CLIENT_METRICS_ENV "DAOS_CLIENT_METRICS" +#define DAOS_CLIENT_METRICS_RETAIN_ENV "DAOS_CLIENT_METRICS_RETAIN" +extern bool client_metric; +extern bool client_metric_retain; + +/** + * Get value from context by the key + * + * Get value inside dtls by key. So each module will use this API to + * retrieve their own value in the thread context. + * + * \param[in] dtls the thread context. + * \param[in] key key used to retrieve the dtls_value. + * + * \retval the dtls_value retrieved by key. + */ +static inline void * +daos_module_key_get(struct daos_thread_local_storage *dtls, + struct daos_module_key *key) +{ + D_ASSERT(key->dmk_index >= 0); + D_ASSERT(key->dmk_index < DAOS_MODULE_KEYS_NR); + D_ASSERT(daos_module_keys[key->dmk_index] == key); + D_ASSERT(dtls != NULL); + + return dtls->dtls_values[key->dmk_index]; +} + +#define dss_module_key_get daos_module_key_get +#define dss_register_key daos_register_key +#define dss_unregister_key daos_unregister_key +#define dss_module_info daos_module_info +#define dss_module_tag daos_module_tag +#define dss_module_key daos_module_key +#define dss_thread_local_storage daos_thread_local_storage + +void daos_register_key(struct daos_module_key *key); +void daos_unregister_key(struct daos_module_key *key); +struct daos_thread_local_storage * dc_tls_init(int tag, uint32_t pid); +void dc_tls_fini(void); +struct daos_thread_local_storage * dss_tls_init(int tag, int xs_id, int tgt_id); +void dss_tls_fini(struct daos_thread_local_storage *dtls); + +#endif /*__DAOS_TLS_H__*/ diff --git a/src/include/daos_srv/daos_engine.h b/src/include/daos_srv/daos_engine.h index be491483fbcf..db7418a21e95 100644 --- a/src/include/daos_srv/daos_engine.h +++ b/src/include/daos_srv/daos_engine.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -54,84 +55,6 @@ extern unsigned int dss_instance_idx; /** Bypass for the nvme health check */ extern bool dss_nvme_bypass_health_check; -/** - * Stackable Module API - * Provides a modular interface to load and register server-side code on - * demand. A module is composed of: - * - a set of request handlers which are registered when the module is loaded. - * - a server-side API (see header files suffixed by "_srv") used for - * inter-module direct calls. - * - * For now, all loaded modules are assumed to be trustful, but sandboxes can be - * implemented in the future. - */ -/* - * Thead-local storage - */ -struct dss_thread_local_storage { - uint32_t dtls_tag; - void **dtls_values; -}; - -enum dss_module_tag { - DAOS_SYS_TAG = 1 << 0, /** only run on system xstream */ - DAOS_TGT_TAG = 1 << 1, /** only run on target xstream */ - DAOS_RDB_TAG = 1 << 2, /** only run on rdb xstream */ - DAOS_OFF_TAG = 1 << 3, /** only run on offload/helper xstream */ - DAOS_SERVER_TAG = 0xff, /** run on all xstream */ -}; - -/* The module key descriptor for each xstream */ -struct dss_module_key { - /* Indicate where the keys should be instantiated */ - enum dss_module_tag dmk_tags; - - /* The position inside the dss_module_keys */ - int dmk_index; - /* init keys for context */ - void *(*dmk_init)(int tags, int xs_id, int tgt_id); - - /* fini keys for context */ - void (*dmk_fini)(int tags, void *data); -}; - -extern pthread_key_t dss_tls_key; -extern struct dss_module_key *dss_module_keys[]; -#define DAOS_MODULE_KEYS_NR 10 - -static inline struct dss_thread_local_storage * -dss_tls_get() -{ - return (struct dss_thread_local_storage *) - pthread_getspecific(dss_tls_key); -} - -/** - * Get value from context by the key - * - * Get value inside dtls by key. So each module will use this API to - * retrieve their own value in the thread context. - * - * \param[in] dtls the thread context. - * \param[in] key key used to retrieve the dtls_value. - * - * \retval the dtls_value retrieved by key. - */ -static inline void * -dss_module_key_get(struct dss_thread_local_storage *dtls, - struct dss_module_key *key) -{ - D_ASSERT(key->dmk_index >= 0); - D_ASSERT(key->dmk_index < DAOS_MODULE_KEYS_NR); - D_ASSERT(dss_module_keys[key->dmk_index] == key); - D_ASSERT(dtls != NULL); - - return dtls->dtls_values[key->dmk_index]; -} - -void dss_register_key(struct dss_module_key *key); -void dss_unregister_key(struct dss_module_key *key); - /** pthread names are limited to 16 chars */ #define DSS_XS_NAME_LEN (32) @@ -172,7 +95,7 @@ static inline struct dss_module_info * dss_get_module_info(void) { struct dss_module_info *dmi; - struct dss_thread_local_storage *dtc; + struct daos_thread_local_storage *dtc; dtc = dss_tls_get(); dmi = (struct dss_module_info *) diff --git a/src/include/gurt/telemetry_common.h b/src/include/gurt/telemetry_common.h index 983ec2553f23..6471bc25ae6b 100644 --- a/src/include/gurt/telemetry_common.h +++ b/src/include/gurt/telemetry_common.h @@ -155,6 +155,9 @@ enum { D_TM_SERVER_PROCESS = 0x000, D_TM_SERIALIZATION = 0x001, D_TM_RETAIN_SHMEM = 0x002, + D_TM_RETAIN_SHMEM_IF_NON_EMPTY = 0x004, + D_TM_CLIENT_PROCESS = 0x008, + D_TM_OPEN_OR_CREATE = 0x010, }; /** Output formats */ @@ -176,6 +179,7 @@ enum { D_TM_ITER_RESET = 0x002, }; +#define DC_TM_JOB_ROOT_ID 256 /** * @brief Statistics for gauge and duration metrics * diff --git a/src/include/gurt/telemetry_consumer.h b/src/include/gurt/telemetry_consumer.h index f0b1d706be71..9b8de3d70fae 100644 --- a/src/include/gurt/telemetry_consumer.h +++ b/src/include/gurt/telemetry_consumer.h @@ -49,12 +49,21 @@ int d_tm_list(struct d_tm_context *ctx, struct d_tm_nodeList_t **head, int d_tm_list_subdirs(struct d_tm_context *ctx, struct d_tm_nodeList_t **head, struct d_tm_node_t *node, uint64_t *node_count, int max_depth); + +typedef void (*d_tm_iter_cb_t)(struct d_tm_context *ctx, struct d_tm_node_t *node, + int level, char *path, int format, int opt_fields, + void *cb_arg); + void d_tm_iterate(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, int filter, char *path, int format, - int opt_fields, uint32_t ops, FILE *stream); + int opt_fields, d_tm_iter_cb_t iter_cb, void *cb_arg); void d_tm_print_node(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, char *name, int format, int opt_fields, FILE *stream); + +void d_tm_reset_node(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, + char *path, int format, int opt_fields, FILE *stream); + void d_tm_print_field_descriptors(int opt_fields, FILE *stream); void d_tm_print_counter(uint64_t val, char *name, int format, char *units, int opt_fields, FILE *stream); diff --git a/src/include/gurt/telemetry_producer.h b/src/include/gurt/telemetry_producer.h index 5cd323637d47..fea596054acb 100644 --- a/src/include/gurt/telemetry_producer.h +++ b/src/include/gurt/telemetry_producer.h @@ -29,5 +29,6 @@ int d_tm_add_metric(struct d_tm_node_t **node, int metric_type, char *desc, int d_tm_add_ephemeral_dir(struct d_tm_node_t **node, size_t size_bytes, const char *fmt, ...); int d_tm_del_ephemeral_dir(const char *fmt, ...); +int d_tm_del_node(const char *fmt, ...); void d_tm_fini(void); #endif /* __TELEMETRY_PRODUCER_H__ */ diff --git a/src/object/cli_mod.c b/src/object/cli_mod.c index 79c13fee9489..b56e6e7c7e1a 100644 --- a/src/object/cli_mod.c +++ b/src/object/cli_mod.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2016-2022 Intel Corporation. + * (C) Copyright 2016-2023 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -12,6 +12,9 @@ #include #include #include +#include +#include +#include #include #include "obj_rpc.h" #include "obj_internal.h" @@ -19,14 +22,96 @@ unsigned int srv_io_mode = DIM_DTX_FULL_ENABLED; int dc_obj_proto_version; +static void* +dc_obj_tls_init(int tags, int xs_id, int pid) +{ + struct dc_obj_tls *tls; + int opc; + int rc; + + D_ALLOC_PTR(tls); + if (tls == NULL) + return NULL; + + /** register different per-opcode sensors */ + for (opc = 0; opc < OBJ_PROTO_CLI_COUNT; opc++) { + /** Start with number of active requests, of type gauge */ + rc = d_tm_add_metric(&tls->cot_op_active[opc], D_TM_STATS_GAUGE, + "number of active object RPCs", "ops", + "ops/%s/active", obj_opc_to_str(opc)); + if (rc) { + D_WARN("Failed to create active counter: "DF_RC"\n", DP_RC(rc)); + D_GOTO(out, rc); + } + + if (opc == DAOS_OBJ_RPC_UPDATE || + opc == DAOS_OBJ_RPC_TGT_UPDATE || + opc == DAOS_OBJ_RPC_FETCH) + /** See below, latency reported per size for those */ + continue; + + /** And finally the per-opcode latency, of type gauge */ + rc = d_tm_add_metric(&tls->cot_op_lat[opc], D_TM_STATS_GAUGE, + "object RPC processing time", "us", + "ops/%s/latency", obj_opc_to_str(opc)); + if (rc) { + D_WARN("Failed to create latency sensor: "DF_RC"\n", DP_RC(rc)); + D_GOTO(out, rc); + } + } + + /** + * Maintain per-I/O size latency for update & fetch RPCs + * of type gauge + */ + rc = obj_latency_tm_init(DAOS_OBJ_RPC_UPDATE, pid, tls->cot_update_lat, + obj_opc_to_str(DAOS_OBJ_RPC_UPDATE), + "update RPC processing time", false); + if (rc) + D_GOTO(out, rc); + + rc = obj_latency_tm_init(DAOS_OBJ_RPC_FETCH, pid, tls->cot_fetch_lat, + obj_opc_to_str(DAOS_OBJ_RPC_FETCH), + "fetch RPC processing time", false); + if (rc) + D_GOTO(out, rc); + +out: + if (rc) { + D_FREE(tls); + tls = NULL; + } + + return tls; +} + +static void +dc_obj_tls_fini(int tags, void *data) +{ + struct dc_obj_tls *tls = data; + + D_FREE(tls); +} + +struct daos_module_key dc_obj_module_key = { + .dmk_tags = DAOS_CLI_TAG, + .dmk_index = -1, + .dmk_init = dc_obj_tls_init, + .dmk_fini = dc_obj_tls_fini, +}; + /** * Initialize object interface */ int dc_obj_init(void) { - uint32_t ver_array[2] = {DAOS_OBJ_VERSION - 1, DAOS_OBJ_VERSION}; - int rc; + uint32_t ver_array[2] = {DAOS_OBJ_VERSION - 1, DAOS_OBJ_VERSION}; + int rc; + + d_getenv_bool(DAOS_CLIENT_METRICS_ENV, &client_metric); + if (client_metric) + daos_register_key(&dc_obj_module_key); rc = obj_utils_init(); if (rc) @@ -78,6 +163,7 @@ dc_obj_init(void) out_utils: if (rc) obj_utils_fini(); + return rc; } @@ -94,4 +180,6 @@ dc_obj_fini(void) obj_ec_codec_fini(); obj_class_fini(); obj_utils_fini(); + if (client_metric) + daos_unregister_key(&dc_obj_module_key); } diff --git a/src/object/cli_shard.c b/src/object/cli_shard.c index 2dd9ef9ac398..99cab8c041a8 100644 --- a/src/object/cli_shard.c +++ b/src/object/cli_shard.c @@ -14,6 +14,8 @@ #include #include #include +#include +#include #include "cli_csum.h" #include "obj_rpc.h" #include "obj_internal.h" @@ -105,6 +107,7 @@ struct rw_cb_args { daos_iom_t *maps; crt_endpoint_t tgt_ep; struct shard_rw_args *shard_args; + uint64_t send_time; }; static d_iov_t * @@ -640,6 +643,90 @@ dc_shard_update_size(struct rw_cb_args *rw_args, int fetch_rc) return rc; } +daos_size_t +obj_get_fetch_size(struct rw_cb_args *arg) +{ + struct obj_rw_v10_out *orwo; + daos_size_t size = 0; + + orwo = crt_reply_get(arg->rpc); + + if (orwo->orw_sgls.ca_count > 0) { + /* inline transfer */ + size = daos_sgls_packed_size(orwo->orw_sgls.ca_arrays, + orwo->orw_sgls.ca_count, NULL); + } else if (arg->rwaa_sgls != NULL) { + /* bulk transfer */ + daos_size_t *replied_sizes = orwo->orw_data_sizes.ca_arrays; + int i; + + for (i = 0; i < orwo->orw_data_sizes.ca_count; i++) + size += replied_sizes[i]; + } + + return size; +} + +static void +obj_shard_update_metrics_begin(crt_rpc_t *rpc) +{ + struct dc_obj_tls *tls = dc_obj_tls_get(); + int opc; + + if (tls == NULL || !client_metric) + return; + + opc = opc_get(rpc->cr_opc); + d_tm_inc_gauge(tls->cot_op_active[opc], 1); +} + +static void +obj_shard_update_metrics_end(crt_rpc_t *rpc, uint64_t send_time, void *arg, int ret) +{ + struct dc_obj_tls *tls = dc_obj_tls_get(); + struct rw_cb_args *rw_args; + struct obj_rw_in *orw; + struct d_tm_node_t *lat = NULL; + daos_size_t size; + uint64_t time; + int opc; + + if (tls == NULL || !client_metric) + return; + + opc = opc_get(rpc->cr_opc); + orw = crt_req_get(rpc); + d_tm_dec_gauge(tls->cot_op_active[opc], 1); + + if (ret != 0) + return; + /** + * Measure latency of successful I/O only. + * Use bit shift for performance and tolerate some inaccuracy. + */ + time = daos_get_ntime() - send_time; + time >>= 10; + + switch (opc) { + case DAOS_OBJ_RPC_UPDATE: + rw_args = arg; + size = daos_sgls_packed_size(rw_args->rwaa_sgls, orw->orw_nr, NULL); + lat = tls->cot_update_lat[lat_bucket(size)]; + break; + case DAOS_OBJ_RPC_FETCH: + rw_args = arg; + size = obj_get_fetch_size(rw_args); + lat = tls->cot_fetch_lat[lat_bucket(size)]; + break; + default: + lat = tls->cot_op_lat[opc]; + break; + } + + if (lat != NULL) + d_tm_set_gauge(lat, time); +} + static int dc_rw_cb(tse_task_t *task, void *arg) { @@ -956,10 +1043,15 @@ dc_rw_cb(tse_task_t *task, void *arg) out: if (rc == -DER_CSUM && opc == DAOS_OBJ_RPC_FETCH) dc_shard_csum_report(task, &rw_args->tgt_ep, rw_args->rpc); + + obj_shard_update_metrics_end(rw_args->rpc, rw_args->send_time, rw_args, + ret == 0 ? rc : ret); + crt_req_decref(rw_args->rpc); if (ret == 0 || obj_retry_error(rc)) ret = rc; + return ret; } @@ -1129,7 +1221,9 @@ dc_obj_shard_rw(struct dc_obj_shard *shard, enum obj_rpc_opc opc, rw_args.co = shard->do_co; rw_args.shard_args = args; /* remember the sgl to copyout the data inline for fetch */ - rw_args.rwaa_sgls = (opc == DAOS_OBJ_RPC_FETCH) ? sgls : NULL; + rw_args.rwaa_sgls = sgls; + rw_args.send_time = daos_get_ntime(); + obj_shard_update_metrics_begin(req); if (args->reasb_req && args->reasb_req->orr_recov) { rw_args.maps = NULL; orw->orw_flags |= ORF_EC_RECOV; @@ -1189,6 +1283,7 @@ struct obj_punch_cb_args { crt_rpc_t *rpc; unsigned int *map_ver; struct shard_punch_args *shard_args; + uint64_t send_time; }; static int @@ -1217,7 +1312,11 @@ obj_shard_punch_cb(tse_task_t *task, void *data) } } + obj_shard_update_metrics_end(cb_args->rpc, cb_args->send_time, cb_args, + task->dt_result); + crt_req_decref(rpc); + return task->dt_result; } @@ -1262,6 +1361,8 @@ dc_obj_shard_punch(struct dc_obj_shard *shard, enum obj_rpc_opc opc, cb_args.rpc = req; cb_args.map_ver = &args->pa_auxi.map_ver; cb_args.shard_args = args; + cb_args.send_time = daos_get_ntime(); + obj_shard_update_metrics_begin(req); rc = tse_task_register_comp_cb(task, obj_shard_punch_cb, &cb_args, sizeof(cb_args)); if (rc != 0) @@ -1324,6 +1425,7 @@ struct obj_enum_args { struct dtx_epoch *epoch; daos_handle_t *th; uint64_t *enqueue_id; + uint64_t send_time; uint32_t *max_delay; }; @@ -1652,10 +1754,15 @@ dc_enumerate_cb(tse_task_t *task, void *arg) crt_bulk_free(oei->oei_bulk); if (oei->oei_kds_bulk != NULL) crt_bulk_free(oei->oei_kds_bulk); + + obj_shard_update_metrics_end(enum_args->rpc, enum_args->send_time, + enum_args, ret == 0 ? rc : ret); + crt_req_decref(enum_args->rpc); if (ret == 0 || obj_retry_error(rc)) ret = rc; + return ret; } @@ -1805,6 +1912,8 @@ dc_obj_shard_list(struct dc_obj_shard *obj_shard, enum obj_rpc_opc opc, enum_args.th = &obj_args->th; enum_args.enqueue_id = &args->la_auxi.enqueue_id; enum_args.max_delay = &args->la_auxi.obj_auxi->max_delay; + enum_args.send_time = daos_get_ntime(); + obj_shard_update_metrics_begin(req); rc = tse_task_register_comp_cb(task, dc_enumerate_cb, &enum_args, sizeof(enum_args)); if (rc != 0) @@ -1838,6 +1947,7 @@ struct obj_query_key_cb_args { daos_handle_t th; uint32_t *max_delay; uint64_t *queue_id; + uint64_t send_time; }; static void @@ -2048,6 +2158,7 @@ obj_shard_query_key_cb(tse_task_t *task, void *data) D_SPIN_UNLOCK(&cb_args->obj->cob_spin); out: + obj_shard_update_metrics_end(rpc, cb_args->send_time, cb_args, ret == 0 ? rc : ret); crt_req_decref(rpc); if (ret == 0 || obj_retry_error(rc)) ret = rc; @@ -2101,6 +2212,8 @@ dc_obj_shard_query_key(struct dc_obj_shard *shard, struct dtx_epoch *epoch, uint cb_args.max_epoch = max_epoch; cb_args.queue_id = queue_id; cb_args.max_delay = max_delay; + cb_args.send_time = daos_get_ntime(); + obj_shard_update_metrics_begin(req); rc = tse_task_register_comp_cb(task, obj_shard_query_key_cb, &cb_args, sizeof(cb_args)); if (rc != 0) @@ -2147,6 +2260,7 @@ struct obj_shard_sync_cb_args { uint32_t *map_ver; uint32_t *max_delay; uint64_t *enqueue_id; + uint64_t send_time; }; static int @@ -2202,6 +2316,8 @@ obj_shard_sync_cb(tse_task_t *task, void *data) oso->oso_epoch, oso->oso_map_version); out: + obj_shard_update_metrics_end(rpc, cb_args->send_time, cb_args, rc); + crt_req_decref(rpc); return rc; } @@ -2248,7 +2364,8 @@ dc_obj_shard_sync(struct dc_obj_shard *shard, enum obj_rpc_opc opc, cb_args.map_ver = &args->sa_auxi.map_ver; cb_args.max_delay = &args->sa_auxi.obj_auxi->max_delay; cb_args.enqueue_id = &args->sa_auxi.enqueue_id; - + cb_args.send_time = daos_get_ntime(); + obj_shard_update_metrics_begin(req); rc = tse_task_register_comp_cb(task, obj_shard_sync_cb, &cb_args, sizeof(cb_args)); if (rc != 0) @@ -2284,8 +2401,9 @@ struct obj_k2a_args { struct dtx_epoch *epoch; daos_handle_t *th; daos_anchor_t *anchor; - uint32_t shard; uint64_t *enqueue_id; + uint64_t send_time; + uint32_t shard; uint32_t *max_delay; }; @@ -2353,6 +2471,8 @@ dc_k2a_cb(tse_task_t *task, void *arg) enum_anchor_copy(k2a_args->anchor, &oko->oko_anchor); dc_obj_shard2anchor(k2a_args->anchor, k2a_args->shard); out: + obj_shard_update_metrics_end(k2a_args->rpc, k2a_args->send_time, k2a_args, + ret == 0 ? rc : ret); if (k2a_args->eaa_obj != NULL) obj_shard_decref(k2a_args->eaa_obj); crt_req_decref(k2a_args->rpc); @@ -2429,6 +2549,8 @@ dc_obj_shard_key2anchor(struct dc_obj_shard *obj_shard, enum obj_rpc_opc opc, cb_args.shard = obj_shard->do_shard_idx; cb_args.enqueue_id = &args->ka_auxi.enqueue_id; cb_args.max_delay = &args->ka_auxi.obj_auxi->max_delay; + cb_args.send_time = daos_get_ntime(); + obj_shard_update_metrics_begin(req); rc = tse_task_register_comp_cb(task, dc_k2a_cb, &cb_args, sizeof(cb_args)); if (rc != 0) D_GOTO(out_eaa, rc); diff --git a/src/object/obj_internal.h b/src/object/obj_internal.h index 8a2b12fff55a..f09d437e249d 100644 --- a/src/object/obj_internal.h +++ b/src/object/obj_internal.h @@ -22,6 +22,7 @@ #include #include #include +#include #include "obj_rpc.h" #include "obj_ec.h" @@ -539,6 +540,59 @@ struct dc_obj_verify_args { struct dc_obj_verify_cursor cursor; }; +/* + * Report latency on a per-I/O size. + * Buckets starts at [0; 256B[ and are increased by power of 2 + * (i.e. [256B; 512B[, [512B; 1KB[) up to [4MB; infinity[ + * Since 4MB = 2^22 and 256B = 2^8, this means + * (22 - 8 + 1) = 15 buckets plus the 4MB+ bucket, so + * 16 buckets in total. + */ +#define NR_LATENCY_BUCKETS 16 + +struct dc_obj_tls { + /** Measure update/fetch latency based on I/O size (type = gauge) */ + struct d_tm_node_t *cot_update_lat[NR_LATENCY_BUCKETS]; + struct d_tm_node_t *cot_fetch_lat[NR_LATENCY_BUCKETS]; + + /** Measure per-operation latency in us (type = gauge) */ + struct d_tm_node_t *cot_op_lat[OBJ_PROTO_CLI_COUNT]; + /** Count number of per-opcode active requests (type = gauge) */ + struct d_tm_node_t *cot_op_active[OBJ_PROTO_CLI_COUNT]; +}; + +int +obj_latency_tm_init(uint32_t opc, int tgt_id, struct d_tm_node_t **tm, + char *op, char *desc, bool server); +extern struct daos_module_key dc_obj_module_key; + +static inline struct dc_obj_tls * +dc_obj_tls_get() +{ + if (dc_tls_get() == NULL) + return NULL; + + return daos_module_key_get(dc_tls_get(), &dc_obj_module_key); +} + +static inline unsigned int +lat_bucket(uint64_t size) +{ + int nr; + + if (size <= 256) + return 0; + + /** return number of leading zero-bits */ + nr = __builtin_clzl(size - 1); + + /** >4MB, return last bucket */ + if (nr < 42) + return NR_LATENCY_BUCKETS - 1; + + return 56 - nr; +} + static inline int dc_cont2uuid(struct dc_cont *dc_cont, uuid_t *hdl_uuid, uuid_t *uuid) { diff --git a/src/object/obj_utils.c b/src/object/obj_utils.c index 8312c6719d89..5eba8c80e7ab 100644 --- a/src/object/obj_utils.c +++ b/src/object/obj_utils.c @@ -1,5 +1,5 @@ /** - * (C) Copyright 2018-2022 Intel Corporation. + * (C) Copyright 2018-2023 Intel Corporation. * * SPDX-License-Identifier: BSD-2-Clause-Patent */ @@ -10,6 +10,9 @@ #define DDSUBSYS DDFAC(object) #include +#include +#include +#include #include "obj_internal.h" static daos_size_t @@ -86,6 +89,57 @@ daos_iods_free(daos_iod_t *iods, int nr, bool need_free) D_FREE(iods); } +int +obj_latency_tm_init(uint32_t opc, int tgt_id, struct d_tm_node_t **tm, char *op, + char *desc, bool server) +{ + unsigned int bucket_max = 256; + int i; + int rc = 0; + + for (i = 0; i < NR_LATENCY_BUCKETS; i++) { + char *path; + + if (server) { + if (bucket_max < 1024) /** B */ + D_ASPRINTF(path, "io/latency/%s/%uB/tgt_%u", + op, bucket_max, tgt_id); + else if (bucket_max < 1024 * 1024) /** KB */ + D_ASPRINTF(path, "io/latency/%s/%uKB/tgt_%u", + op, bucket_max / 1024, tgt_id); + else if (bucket_max <= 1024 * 1024 * 4) /** MB */ + D_ASPRINTF(path, "io/latency/%s/%uMB/tgt_%u", + op, bucket_max / (1024 * 1024), tgt_id); + else /** >4MB */ + D_ASPRINTF(path, "io/latency/%s/GT4MB/tgt_%u", + op, tgt_id); + } else { + if (bucket_max < 1024) /** B */ + D_ASPRINTF(path, "io/latency/%s/%uB", + op, bucket_max); + else if (bucket_max < 1024 * 1024) /** KB */ + D_ASPRINTF(path, "io/latency/%s/%uKB", + op, bucket_max / 1024); + else if (bucket_max <= 1024 * 1024 * 4) /** MB */ + D_ASPRINTF(path, "io/latency/%s/%uMB", + op, bucket_max / (1024 * 1024)); + else /** >4MB */ + D_ASPRINTF(path, "io/latency/%s/GT4MB", op); + + } + rc = d_tm_add_metric(&tm[i], D_TM_STATS_GAUGE, desc, "us", path); + if (rc) + D_WARN("Failed to create per-I/O size latency " + "sensor: "DF_RC"\n", DP_RC(rc)); + D_FREE(path); + + bucket_max <<= 1; + } + + return rc; +} + + struct recx_rec { daos_recx_t *rr_recx; }; diff --git a/src/object/srv_internal.h b/src/object/srv_internal.h index 4452e0404861..4ac391678b24 100644 --- a/src/object/srv_internal.h +++ b/src/object/srv_internal.h @@ -107,16 +107,6 @@ struct migrate_pool_tls { void migrate_pool_tls_destroy(struct migrate_pool_tls *tls); -/* - * Report latency on a per-I/O size. - * Buckets starts at [0; 256B[ and are increased by power of 2 - * (i.e. [256B; 512B[, [512B; 1KB[) up to [4MB; infinity[ - * Since 4MB = 2^22 and 256B = 2^8, this means - * (22 - 8 + 1) = 15 buckets plus the 4MB+ bucket, so - * 16 buckets in total. - */ -#define NR_LATENCY_BUCKETS 16 - struct obj_pool_metrics { /** Count number of total per-opcode requests (type = counter) */ struct d_tm_node_t *opm_total[OBJ_PROTO_CLI_COUNT]; @@ -168,24 +158,6 @@ obj_tls_get() return dss_module_key_get(dss_tls_get(), &obj_module_key); } -static inline unsigned int -lat_bucket(uint64_t size) -{ - int nr; - - if (size <= 256) - return 0; - - /** return number of leading zero-bits */ - nr = __builtin_clzl(size - 1); - - /** >4MB, return last bucket */ - if (nr < 42) - return NR_LATENCY_BUCKETS - 1; - - return 56 - nr; -} - enum latency_type { BULK_LATENCY, BIO_LATENCY, diff --git a/src/object/srv_mod.c b/src/object/srv_mod.c index 72a25ba97de1..de3436513e60 100644 --- a/src/object/srv_mod.c +++ b/src/object/srv_mod.c @@ -77,41 +77,6 @@ static struct daos_rpc_handler obj_handlers_v10[] = { #undef X -static int -obj_latency_tm_init(uint32_t opc, int tgt_id, struct d_tm_node_t **tm, char *op, char *desc) -{ - unsigned int bucket_max = 256; - int i; - int rc = 0; - - for (i = 0; i < NR_LATENCY_BUCKETS; i++) { - char *path; - - if (bucket_max < 1024) /** B */ - D_ASPRINTF(path, "io/latency/%s/%uB/tgt_%u", - op, bucket_max, tgt_id); - else if (bucket_max < 1024 * 1024) /** KB */ - D_ASPRINTF(path, "io/latency/%s/%uKB/tgt_%u", - op, bucket_max / 1024, tgt_id); - else if (bucket_max <= 1024 * 1024 * 4) /** MB */ - D_ASPRINTF(path, "io/latency/%s/%uMB/tgt_%u", - op, bucket_max / (1024 * 1024), tgt_id); - else /** >4MB */ - D_ASPRINTF(path, "io/latency/%s/GT4MB/tgt_%u", - op, tgt_id); - - rc = d_tm_add_metric(&tm[i], D_TM_STATS_GAUGE, desc, "us", path); - if (rc) - D_WARN("Failed to create per-I/O size latency " - "sensor: "DF_RC"\n", DP_RC(rc)); - D_FREE(path); - - bucket_max <<= 1; - } - - return rc; -} - static void * obj_tls_init(int tags, int xs_id, int tgt_id) { @@ -162,27 +127,33 @@ obj_tls_init(int tags, int xs_id, int tgt_id) */ obj_latency_tm_init(DAOS_OBJ_RPC_UPDATE, tgt_id, tls->ot_update_lat, - obj_opc_to_str(DAOS_OBJ_RPC_UPDATE), "update RPC processing time"); + obj_opc_to_str(DAOS_OBJ_RPC_UPDATE), "update RPC processing time", + true); obj_latency_tm_init(DAOS_OBJ_RPC_FETCH, tgt_id, tls->ot_fetch_lat, - obj_opc_to_str(DAOS_OBJ_RPC_FETCH), "fetch RPC processing time"); + obj_opc_to_str(DAOS_OBJ_RPC_FETCH), "fetch RPC processing time", + true); obj_latency_tm_init(DAOS_OBJ_RPC_TGT_UPDATE, tgt_id, tls->ot_tgt_update_lat, obj_opc_to_str(DAOS_OBJ_RPC_TGT_UPDATE), - "update tgt RPC processing time"); + "update tgt RPC processing time", + true); obj_latency_tm_init(DAOS_OBJ_RPC_UPDATE, tgt_id, tls->ot_update_bulk_lat, - "bulk_update", "Bulk update processing time"); + "bulk_update", "Bulk update processing time", + true); obj_latency_tm_init(DAOS_OBJ_RPC_FETCH, tgt_id, tls->ot_fetch_bulk_lat, - "bulk_fetch", "Bulk fetch processing time"); + "bulk_fetch", "Bulk fetch processing time", + true); obj_latency_tm_init(DAOS_OBJ_RPC_UPDATE, tgt_id, tls->ot_update_vos_lat, - "vos_update", "VOS update processing time"); + "vos_update", "VOS update processing time", + true); obj_latency_tm_init(DAOS_OBJ_RPC_FETCH, tgt_id, tls->ot_fetch_vos_lat, - "vos_fetch", "VOS fetch processing time"); + "vos_fetch", "VOS fetch processing time", true); obj_latency_tm_init(DAOS_OBJ_RPC_UPDATE, tgt_id, tls->ot_update_bio_lat, - "bio_update", "BIO update processing time"); + "bio_update", "BIO update processing time", true); obj_latency_tm_init(DAOS_OBJ_RPC_FETCH, tgt_id, tls->ot_fetch_bio_lat, - "bio_fetch", "BIO fetch processing time"); + "bio_fetch", "BIO fetch processing time", true); return tls; } diff --git a/src/utils/daos_metrics/daos_metrics.c b/src/utils/daos_metrics/daos_metrics.c index 8a8190d5203c..dade4c281b58 100644 --- a/src/utils/daos_metrics/daos_metrics.c +++ b/src/utils/daos_metrics/daos_metrics.c @@ -57,25 +57,113 @@ print_usage(const char *prog_name) prog_name); } -int -main(int argc, char **argv) +static int +process_metrics(int metric_id, char *dirname, int format, int filter, + int extra_descriptors, int delay, int num_iter, + d_tm_iter_cb_t iter_cb, void *arg) { struct d_tm_node_t *root = NULL; struct d_tm_node_t *node = NULL; struct d_tm_context *ctx = NULL; + int iteration = 0; + int rc = 0; + + ctx = d_tm_open(metric_id); + if (!ctx) + D_GOTO(out, rc = 0); + + root = d_tm_get_root(ctx); + if (!root) + D_GOTO(out, rc = -DER_NONEXIST); + + if (strncmp(dirname, "/", D_TM_MAX_NAME_LEN) != 0) { + node = d_tm_find_metric(ctx, dirname); + if (node != NULL) { + root = node; + } else { + printf("No metrics found at: '%s'\n", dirname); + D_GOTO(out, rc = 0); + } + } + + if (format == D_TM_CSV) + d_tm_print_field_descriptors(extra_descriptors, (FILE *)arg); + + while ((num_iter == 0) || (iteration < num_iter)) { + d_tm_iterate(ctx, root, 0, filter, NULL, format, extra_descriptors, + iter_cb, arg); + iteration++; + sleep(delay); + if (format == D_TM_STANDARD) + printf("\n\n"); + } + +out: + if (ctx != NULL) + d_tm_close(&ctx); + return rc; +} + +static void +iter_print(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, + char *path, int format, int opt_fields, void *arg) +{ + d_tm_print_node(ctx, node, level, path, format, opt_fields, (FILE *)arg); +} + +static void +iter_reset(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, + char *path, int format, int opt_fields, void *arg) +{ + d_tm_reset_node(ctx, node, level, path, format, opt_fields, (FILE *)arg); +} + +struct iter_pid_arg { + char *dirname; + d_tm_iter_cb_t iter_cb; + int filter; + int delay; + int num_iter; +}; + +static void +iter_per_pid(struct d_tm_context *ctx, struct d_tm_node_t *node, int level, + char *path, int format, int opt_fields, void *arg) +{ + char *name; + struct iter_pid_arg *pid_arg = arg; + int pid; + int rc; + + name = d_tm_get_name(ctx, node); + if (name == NULL) + return; + + pid = atoi(d_tm_get_name(ctx, node)); + rc = process_metrics(pid, pid_arg->dirname, format, pid_arg->filter, + opt_fields, pid_arg->delay, pid_arg->num_iter, + pid_arg->iter_cb, stdout); + if (rc != 0) + printf("Can not get pid %d metrics: %d\n", pid, rc); +} + +int +main(int argc, char **argv) +{ char dirname[D_TM_MAX_NAME_LEN] = {0}; + char jobid[D_TM_MAX_NAME_LEN] = {0}; bool show_meta = false; bool show_when_read = false; bool show_type = false; int srv_idx = 0; - int iteration = 0; int num_iter = 1; int filter = 0; int delay = 1; int format = D_TM_STANDARD; int opt; int extra_descriptors = 0; - uint32_t ops = 0; + d_tm_iter_cb_t iter_cb = NULL; + int rc; sprintf(dirname, "/"); @@ -97,11 +185,12 @@ main(int argc, char **argv) {"type", no_argument, NULL, 'T'}, {"read", no_argument, NULL, 'r'}, {"reset", no_argument, NULL, 'e'}, + {"jobid", required_argument, NULL, 'j'}, {"help", no_argument, NULL, 'h'}, {NULL, 0, NULL, 0} }; - opt = getopt_long_only(argc, argv, "S:cCdtsgi:p:D:MmTrhe", + opt = getopt_long_only(argc, argv, "S:cCdtsgi:p:D:MmTrj:he", long_options, NULL); if (opt == -1) break; @@ -150,7 +239,10 @@ main(int argc, char **argv) delay = atoi(optarg); break; case 'e': - ops |= D_TM_ITER_RESET; + iter_cb = iter_reset; + break; + case 'j': + snprintf(jobid, sizeof(jobid), "%s", optarg); break; case 'h': case '?': @@ -160,37 +252,13 @@ main(int argc, char **argv) } } - if (ops == 0) - ops |= D_TM_ITER_READ; + if (iter_cb == NULL) + iter_cb = iter_print; if (filter == 0) filter = D_TM_COUNTER | D_TM_DURATION | D_TM_TIMESTAMP | D_TM_MEMINFO | D_TM_TIMER_SNAPSHOT | D_TM_GAUGE | D_TM_STATS_GAUGE; - ctx = d_tm_open(srv_idx); - if (!ctx) - goto failure; - - root = d_tm_get_root(ctx); - if (!root) - goto failure; - - if (strncmp(dirname, "/", D_TM_MAX_NAME_LEN) != 0) { - node = d_tm_find_metric(ctx, dirname); - if (node != NULL) { - root = node; - } else { - printf("No metrics found at: '%s'\n", dirname); - exit(0); - } - } - - if (format == D_TM_CSV) - filter &= ~D_TM_DIRECTORY; - else - filter |= D_TM_DIRECTORY; - - if (show_when_read) extra_descriptors |= D_TM_INCLUDE_TIMESTAMP; if (show_meta) @@ -199,27 +267,50 @@ main(int argc, char **argv) extra_descriptors |= D_TM_INCLUDE_TYPE; if (format == D_TM_CSV) - d_tm_print_field_descriptors(extra_descriptors, stdout); + filter &= ~D_TM_DIRECTORY; + else + filter |= D_TM_DIRECTORY; - while ((num_iter == 0) || (iteration < num_iter)) { - d_tm_iterate(ctx, root, 0, filter, NULL, format, extra_descriptors, - ops, stdout); - iteration++; - sleep(delay); - if (format == D_TM_STANDARD) - printf("\n\n"); + if (strlen(jobid) > 0) { + struct d_tm_node_t *root; + struct d_tm_context *ctx; + struct iter_pid_arg iter_arg; + + /* fetch metrics from client side by jobid */ + ctx = d_tm_open(DC_TM_JOB_ROOT_ID); + if (!ctx) { + printf("Unable to find job %s\n", jobid); + exit(0); + } + + root = d_tm_find_metric(ctx, jobid); + if (root == NULL) { + d_tm_close(&ctx); + printf("Unable to find job %s\n", jobid); + exit(0); + } + + iter_arg.dirname = dirname; + iter_arg.filter = filter; + iter_arg.iter_cb = iter_cb; + iter_arg.delay = delay; + iter_arg.num_iter = num_iter; + d_tm_iterate(ctx, root, 0, D_TM_DIRECTORY, NULL, format, extra_descriptors, + iter_per_pid, &iter_arg); + + d_tm_close(&ctx); + + return 0; } - d_tm_close(&ctx); - return 0; - -failure: - printf("Unable to attach to the shared memory for the server index: %d" - "\nMake sure to run the I/O Engine with the same index to " - "initialize the shared memory and populate it with metrics.\n" - "Verify user/group settings match those that started the I/O " - "Engine.\n", - srv_idx); - d_tm_close(&ctx); - return -1; + /* fetch metrics from server side */ + rc = process_metrics(srv_idx, dirname, format, filter, extra_descriptors, + delay, num_iter, iter_cb, stdout); + if (rc) + printf("Unable to attach to the shared memory for the server index: %d" + "\nMake sure to run the I/O Engine with the same index to " + "initialize the shared memory and populate it with metrics.\n" + "Verify user/group settings match those that started the I/O " + "Engine.\n", srv_idx); + return rc != 0 ? -1 : 0; }