DAOS-16330 cart: Reduce error logs during timeouts (#14905) #14915

Merged 2 commits on Aug 15, 2024
91 changes: 60 additions & 31 deletions src/cart/crt_context.c
@@ -1026,9 +1026,7 @@ crt_req_timeout_track(struct crt_rpc_priv *rpc_priv)
if (rc == 0) {
rpc_priv->crp_in_binheap = 1;
} else {
RPC_ERROR(rpc_priv,
"d_binheap_insert failed, rc: %d\n",
rc);
RPC_ERROR(rpc_priv, "d_binheap_insert failed, rc: %d\n", rc);
RPC_DECREF(rpc_priv);
}

@@ -1130,24 +1128,22 @@ crt_req_timeout_hdlr(struct crt_rpc_priv *rpc_priv)
switch (rpc_priv->crp_state) {
case RPC_STATE_INITED:
case RPC_STATE_QUEUED:
RPC_ERROR(rpc_priv, "aborting %s rpc to group %s, tgt %d:%d, tgt_uri %s\n",
rpc_priv->crp_state == RPC_STATE_QUEUED ? "queued" : "inited",
grp_priv->gp_pub.cg_grpid, tgt_ep->ep_rank, tgt_ep->ep_tag,
rpc_priv->crp_tgt_uri);
RPC_INFO(rpc_priv, "aborting %s rpc to group %s, tgt %d:%d, tgt_uri %s\n",
rpc_priv->crp_state == RPC_STATE_QUEUED ? "queued" : "inited",
grp_priv->gp_pub.cg_grpid, tgt_ep->ep_rank, tgt_ep->ep_tag,
rpc_priv->crp_tgt_uri);
crt_context_req_untrack(rpc_priv);
crt_rpc_complete_and_unlock(rpc_priv, -DER_TIMEDOUT);
break;
case RPC_STATE_URI_LOOKUP:
ul_req = rpc_priv->crp_ul_req;
D_ASSERT(ul_req != NULL);
ul_in = crt_req_get(ul_req);
RPC_ERROR(rpc_priv,
"failed due to URI_LOOKUP(rpc_priv %p) to group %s,"
"rank %d through PSR %d timedout\n",
container_of(ul_req, struct crt_rpc_priv, crp_pub),
ul_in->ul_grp_id,
ul_in->ul_rank,
ul_req->cr_ep.ep_rank);
RPC_INFO(rpc_priv,
"failed due to URI_LOOKUP(rpc_priv %p) to group %s,"
"rank %d through PSR %d timedout\n",
container_of(ul_req, struct crt_rpc_priv, crp_pub), ul_in->ul_grp_id,
ul_in->ul_rank, ul_req->cr_ep.ep_rank);

if (crt_gdata.cg_use_sensors)
d_tm_inc_counter(crt_ctx->cc_timedout_uri, 1);
@@ -1162,24 +1158,24 @@ crt_req_timeout_hdlr(struct crt_rpc_priv *rpc_priv)
crt_rpc_unlock(rpc_priv);
break;
case RPC_STATE_FWD_UNREACH:
RPC_ERROR(rpc_priv,
"failed due to group %s, rank %d, tgt_uri %s can't reach the target\n",
grp_priv->gp_pub.cg_grpid,
tgt_ep->ep_rank,
rpc_priv->crp_tgt_uri);
RPC_INFO(rpc_priv,
"failed due to group %s, rank %d, tgt_uri %s can't reach the target\n",
grp_priv->gp_pub.cg_grpid, tgt_ep->ep_rank, rpc_priv->crp_tgt_uri);
crt_context_req_untrack(rpc_priv);
crt_rpc_complete_and_unlock(rpc_priv, -DER_UNREACH);
break;
case RPC_STATE_REQ_SENT:
/* At this point, RPC should always be completed by
* Mercury
*/
RPC_ERROR(rpc_priv, "aborting in-flight to group %s, rank %d, tgt_uri %s\n",
grp_priv->gp_pub.cg_grpid, tgt_ep->ep_rank, rpc_priv->crp_tgt_uri);
RPC_INFO(rpc_priv, "aborting in-flight to group %s, rank %d, tgt_uri %s\n",
grp_priv->gp_pub.cg_grpid, tgt_ep->ep_rank, rpc_priv->crp_tgt_uri);
rc = crt_hg_req_cancel(rpc_priv);
if (rc != 0) {
RPC_ERROR(rpc_priv, "crt_hg_req_cancel failed, rc: %d, "
"opc: %#x.\n", rc, rpc_priv->crp_pub.cr_opc);
RPC_WARN(rpc_priv,
"crt_hg_req_cancel failed, rc: %d, "
"opc: %#x.\n",
rc, rpc_priv->crp_pub.cr_opc);
crt_context_req_untrack(rpc_priv);
}
crt_rpc_unlock(rpc_priv);
@@ -1198,6 +1194,8 @@ crt_context_timeout_check(struct crt_context *crt_ctx)
struct d_binheap_node *bh_node;
d_list_t timeout_list;
uint64_t ts_now;
int err_to_print = 0;
int left_to_print = 0;

D_ASSERT(crt_ctx != NULL);

@@ -1225,17 +1223,48 @@ crt_context_timeout_check(struct crt_context *crt_ctx)
};
D_MUTEX_UNLOCK(&crt_ctx->cc_mutex);

/* Limit logging when many RPCs time out at the same time */
d_list_for_each_entry(rpc_priv, &timeout_list, crp_tmp_link_timeout) {
left_to_print++;
}

/* TODO: consider exposing this limit via an environment variable in the future */
err_to_print = 1;

/* handle the timeout RPCs */
while ((rpc_priv =
d_list_pop_entry(&timeout_list, struct crt_rpc_priv, crp_tmp_link_timeout))) {
RPC_ERROR(rpc_priv,
"ctx_id %d, (status: %#x) timed out (%d seconds), "
"target (%d:%d)\n",
crt_ctx->cc_idx,
rpc_priv->crp_state,
rpc_priv->crp_timeout_sec,
rpc_priv->crp_pub.cr_ep.ep_rank,
rpc_priv->crp_pub.cr_ep.ep_tag);
/*
 * This error is noisy and fills the logs. For now, limit the burst of error
 * messages emitted when many RPCs time out at the same time.
 *
 * Ideally we would also limit it to one error per target; that could be
 * tracked in the per-target cache used for hg_addrs.
 *
 * The extra cost of the cache-entry lookup would be acceptable, as this is
 * an error path.
 */

if (err_to_print > 0) {
RPC_ERROR(rpc_priv,
"ctx_id %d, (status: %#x) timed out (%d seconds), "
"target (%d:%d)\n",
crt_ctx->cc_idx, rpc_priv->crp_state, rpc_priv->crp_timeout_sec,
rpc_priv->crp_pub.cr_ep.ep_rank, rpc_priv->crp_pub.cr_ep.ep_tag);
err_to_print--;
left_to_print--;

if (err_to_print == 0 && left_to_print > 0)
D_ERROR(" %d more rpcs timed out. rest logged at INFO level\n",
left_to_print);

} else {
RPC_INFO(rpc_priv,
"ctx_id %d, (status: %#x) timed out (%d seconds), "
"target (%d:%d)\n",
crt_ctx->cc_idx, rpc_priv->crp_state, rpc_priv->crp_timeout_sec,
rpc_priv->crp_pub.cr_ep.ep_rank, rpc_priv->crp_pub.cr_ep.ep_tag);
left_to_print--;
}

crt_req_timeout_hdlr(rpc_priv);
RPC_DECREF(rpc_priv);
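
Note on the TODO above about exposing the error budget through the environment: below is a minimal sketch of what that could look like, assuming a hypothetical CRT_TIMEOUT_ERR_LIMIT variable read with plain libc getenv(). Neither the variable nor the helper exists in this PR; they are illustrative only.

#include <stdlib.h>

/* Hypothetical helper: read the per-pass error-log budget from the
 * environment, falling back to the current hard-coded value of 1.
 * CRT_TIMEOUT_ERR_LIMIT is an assumed name, not an existing knob. */
static int
timeout_err_limit(void)
{
	const char *val   = getenv("CRT_TIMEOUT_ERR_LIMIT");
	int         limit = 1; /* matches err_to_print = 1 above */

	if (val != NULL && atoi(val) > 0)
		limit = atoi(val);

	return limit;
}

crt_context_timeout_check() would then set err_to_print = timeout_err_limit() instead of the constant.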
8 changes: 4 additions & 4 deletions src/cart/crt_group.c
@@ -2368,6 +2368,8 @@ crt_grp_psr_reload(struct crt_grp_priv *grp_priv)
*
* Each time a node is removed, a corresponding index is added back
* to the free index list
*
* Returns index on success or -DER_NOSPACE if all indices are used up
*/
static int
grp_get_free_index(struct crt_grp_priv *priv)
@@ -2377,11 +2379,9 @@ grp_get_free_index(struct crt_grp_priv *priv)

free_index = d_list_pop_entry(&priv->gp_membs.cgm_free_indices,
struct free_index, fi_link);

if (!free_index) {
D_DEBUG(DB_ALL, "No more free indices left\n");
/* Forces caller to expand the list */
if (!free_index)
return -DER_NOSPACE;
}

ret = free_index->fi_index;
D_FREE(free_index);
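
The new doc comment makes the -DER_NOSPACE contract explicit. A sketch of the calling pattern it implies follows; grow_membership_index_list() is a hypothetical helper used only to illustrate the expand-and-retry flow, not a function in this codebase.

/* Illustrative only: on -DER_NOSPACE, expand the free-index list and
 * retry the allocation once. */
static int
grp_get_index_with_expand(struct crt_grp_priv *grp_priv)
{
	int index;
	int rc;

	index = grp_get_free_index(grp_priv);
	if (index == -DER_NOSPACE) {
		rc = grow_membership_index_list(grp_priv); /* hypothetical */
		if (rc == 0)
			index = grp_get_free_index(grp_priv);
	}

	return index;
}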
62 changes: 31 additions & 31 deletions src/cart/crt_init.c
@@ -216,6 +216,26 @@ prov_data_init(struct crt_prov_gdata *prov_data, crt_provider_t provider,
return DER_SUCCESS;
}

#define DUMP_GDATA_FIELD(format, x) D_INFO("\t%s = " format " \n", #x, crt_gdata.x)

static void
crt_gdata_dump(void)
{
D_INFO("settings:\n");
DUMP_GDATA_FIELD("%d", cg_post_init);
DUMP_GDATA_FIELD("%d", cg_post_incr);
DUMP_GDATA_FIELD("%d", cg_timeout);
DUMP_GDATA_FIELD("%d", cg_swim_crt_idx);
DUMP_GDATA_FIELD("%d", cg_credit_ep_ctx);
DUMP_GDATA_FIELD("%d", cg_iv_inline_limit);
DUMP_GDATA_FIELD("%d", cg_auto_swim_disable);
DUMP_GDATA_FIELD("%d", cg_server);
DUMP_GDATA_FIELD("%d", cg_use_sensors);
DUMP_GDATA_FIELD("%d", cg_provider_is_primary);
DUMP_GDATA_FIELD("0x%lx", cg_rpcid);
DUMP_GDATA_FIELD("%ld", cg_num_cores);
DUMP_GDATA_FIELD("%d", cg_rpc_quota);
}
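
For reference, the macro uses #x to stringify the field name and crt_gdata.x to read its value, so a call such as DUMP_GDATA_FIELD("%d", cg_timeout) expands to:

D_INFO("\t%s = " "%d" " \n", "cg_timeout", crt_gdata.cg_timeout);

String-literal concatenation then folds the format into a single string at compile time.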

/* first step init - for initializing crt_gdata */
static int data_init(int server, crt_init_options_t *opt)
@@ -228,10 +248,7 @@ static int data_init(int server, crt_init_options_t *opt)
uint32_t post_init = CRT_HG_POST_INIT, post_incr = CRT_HG_POST_INCR;
int rc = 0;

D_DEBUG(DB_ALL, "initializing crt_gdata...\n");
crt_env_dump();
D_DEBUG(DB_ALL, "Starting RPCID %#lx. Num cores: %ld\n",
crt_gdata.cg_rpcid, crt_gdata.cg_num_cores);

/* Set context post init / post incr to tune number of pre-posted recvs */
crt_env_get(D_POST_INIT, &post_init);
@@ -264,13 +281,8 @@ static int data_init(int server, crt_init_options_t *opt)
else
crt_gdata.cg_timeout = timeout;

D_DEBUG(DB_ALL, "set the global timeout value as %d second.\n",
crt_gdata.cg_timeout);

crt_gdata.cg_swim_crt_idx = CRT_DEFAULT_PROGRESS_CTX_IDX;

D_DEBUG(DB_ALL, "SWIM context idx=%d\n", crt_gdata.cg_swim_crt_idx);

/* Override defaults and environment if option is set */
if (opt && opt->cio_use_credits) {
credits = opt->cio_ep_credits;
@@ -290,23 +302,12 @@ static int data_init(int server, crt_init_options_t *opt)
/* This is a workaround for CART-871 if universe size is not set */
crt_env_get(FI_UNIVERSE_SIZE, &fi_univ_size);
if (fi_univ_size == 0) {
D_INFO("FI_UNIVERSE_SIZE was not set; setting to 2048\n");
d_setenv("FI_UNIVERSE_SIZE", "2048", 1);
}

if (credits == 0) {
D_DEBUG(DB_ALL, "CRT_CREDIT_EP_CTX set as 0, flow control disabled.\n");
} else if (credits > CRT_MAX_CREDITS_PER_EP_CTX) {
D_DEBUG(DB_ALL, "ENV CRT_CREDIT_EP_CTX's value %d exceed max "
"allowed value, use %d for flow control.\n",
credits, CRT_MAX_CREDITS_PER_EP_CTX);
if (credits > CRT_MAX_CREDITS_PER_EP_CTX)
credits = CRT_MAX_CREDITS_PER_EP_CTX;
} else {
D_DEBUG(DB_ALL, "CRT_CREDIT_EP_CTX set as %d for flow "
"control.\n", credits);
}
crt_gdata.cg_credit_ep_ctx = credits;
D_ASSERT(crt_gdata.cg_credit_ep_ctx <= CRT_MAX_CREDITS_PER_EP_CTX);

/** Enable statistics only for the server side and if requested */
if (opt && opt->cio_use_sensors && server) {
@@ -330,6 +331,8 @@ static int data_init(int server, crt_init_options_t *opt)
}

gdata_init_flag = 1;
crt_gdata_dump();

return rc;
}

@@ -431,15 +434,14 @@ __split_arg(char *s_arg_to_split, const char *delim, char **first_arg, char **se
return DER_SUCCESS;
}


int
crt_provider_t
crt_str_to_provider(const char *str_provider)
{
int provider_idx = CRT_PROV_UNKNOWN;
int i;
crt_provider_t prov = CRT_PROV_UNKNOWN;
int i;

if (str_provider == NULL)
return provider_idx;
return prov;

for (i = 0; crt_na_dict[i].nad_str != NULL; i++) {

@@ -448,12 +450,12 @@ crt_str_to_provider(const char *str_provider)
(crt_na_dict[i].nad_alt_str &&
!strncmp(str_provider, crt_na_dict[i].nad_alt_str,
strlen(crt_na_dict[i].nad_alt_str) + 1))) {
provider_idx = crt_na_dict[i].nad_type;
prov = crt_na_dict[i].nad_type;
break;
}
}

return provider_idx;
return prov;
}
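
With the return type changed from int to crt_provider_t, callers compare against the enum directly. A minimal usage sketch is shown below; the "ofi+tcp" literal is an assumption for illustration, the real strings come from crt_na_dict.

/* Sketch: map a provider string to the enum and reject unknown values. */
crt_provider_t prov = crt_str_to_provider("ofi+tcp"); /* string is illustrative */

if (prov == CRT_PROV_UNKNOWN)
	D_ERROR("unsupported provider string\n");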

static int
@@ -465,14 +467,12 @@ check_grpid(crt_group_id_t grpid)
return rc;

if (crt_validate_grpid(grpid) != 0) {
D_ERROR("grpid contains invalid characters "
"or is too long\n");
D_ERROR("grpid contains invalid characters or is too long\n");
D_GOTO(out, rc = -DER_INVAL);
}

if (strcmp(grpid, CRT_DEFAULT_GRPID) == 0) {
D_ERROR("invalid client grpid (same as "
"CRT_DEFAULT_GRPID).\n");
D_ERROR("invalid client grpid (same as CRT_DEFAULT_GRPID).\n");
D_GOTO(out, rc = -DER_INVAL);
}
out:
29 changes: 27 additions & 2 deletions src/cart/crt_internal.h
@@ -1,5 +1,5 @@
/*
* (C) Copyright 2016-2023 Intel Corporation.
* (C) Copyright 2016-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
@@ -42,7 +42,7 @@
##__VA_ARGS__); \
} while (0)

/* Log an error with a RPC descriptor */
/* Log an error with an RPC descriptor */
#define RPC_ERROR(rpc, fmt, ...) \
do { \
char *_module; \
@@ -55,6 +55,31 @@
##__VA_ARGS__); \
} while (0)

/* Log a warning with an RPC descriptor */
#define RPC_WARN(rpc, fmt, ...) \
do { \
char *_module; \
char *_opc; \
\
crt_opc_decode((rpc)->crp_pub.cr_opc, &_module, &_opc); \
D_TRACE_WARN((rpc), "[opc=%#x (%s:%s) rpcid=%#lx rank:tag=%d:%d] " fmt, \
(rpc)->crp_pub.cr_opc, _module, _opc, (rpc)->crp_req_hdr.cch_rpcid, \
(rpc)->crp_pub.cr_ep.ep_rank, (rpc)->crp_pub.cr_ep.ep_tag, \
##__VA_ARGS__); \
} while (0)

/* Log an info message with an RPC descriptor */
#define RPC_INFO(rpc, fmt, ...) \
do { \
char *_module; \
char *_opc; \
\
crt_opc_decode((rpc)->crp_pub.cr_opc, &_module, &_opc); \
D_TRACE_INFO((rpc), "[opc=%#x (%s:%s) rpcid=%#lx rank:tag=%d:%d] " fmt, \
(rpc)->crp_pub.cr_opc, _module, _opc, (rpc)->crp_req_hdr.cch_rpcid, \
(rpc)->crp_pub.cr_ep.ep_rank, (rpc)->crp_pub.cr_ep.ep_tag, \
##__VA_ARGS__); \
} while (0)
/**
* If \a cond is false, this is equivalent to an RPC_ERROR (i.e., \a mask is
* ignored). If \a cond is true, this is equivalent to an RPC_TRACE.
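
The new RPC_WARN and RPC_INFO macros appear to mirror RPC_ERROR, differing only in the trace level used (D_TRACE_WARN and D_TRACE_INFO, respectively). They are invoked exactly like RPC_ERROR, as in this call from the crt_context.c hunk above:

RPC_WARN(rpc_priv, "crt_hg_req_cancel failed, rc: %d, opc: %#x.\n",
	 rc, rpc_priv->crp_pub.cr_opc);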
4 changes: 1 addition & 3 deletions src/cart/crt_internal_fns.h
@@ -1,5 +1,5 @@
/*
* (C) Copyright 2016-2023 Intel Corporation.
* (C) Copyright 2016-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
@@ -14,8 +14,6 @@
/** crt_init.c */
bool crt_initialized(void);

int crt_str_to_provider(const char *provider);

/** crt_register.c */
int crt_opc_map_create(void);
void crt_opc_map_destroy(struct crt_opc_map *map);
21 changes: 11 additions & 10 deletions src/cart/crt_internal_types.h
@@ -119,16 +119,17 @@ struct crt_gdata {
volatile unsigned int cg_refcount;

/** flags to keep track of states */
unsigned int cg_inited : 1,
cg_grp_inited : 1,
cg_swim_inited : 1,
cg_auto_swim_disable : 1,
/** whether it is a client or server */
cg_server : 1,
/** whether scalable endpoint is enabled */
cg_use_sensors : 1,
/** whether we are on a primary provider */
cg_provider_is_primary : 1;
unsigned int cg_inited : 1;
unsigned int cg_grp_inited : 1;
unsigned int cg_swim_inited : 1;
unsigned int cg_auto_swim_disable : 1;

/** whether it is a client or server */
unsigned int cg_server : 1;
/** whether metrics are used */
unsigned int cg_use_sensors : 1;
/** whether we are on a primary provider */
unsigned int cg_provider_is_primary : 1;

ATOMIC uint64_t cg_rpcid; /* rpc id */

Expand Down