Skip to content

Commit

Permalink
DAOS-16721 dtx: handle potential DTX ID reusing trouble (#15408)
Browse files Browse the repository at this point in the history
The patch contains the following improvements:

1. When VOS level logic returns -DER_TX_RESATRT, the object level RPC
   handler should set 'RESEND' flag then restart the transaction with
   newer epoch. Because dtx_abort() logic cannot guarantee all former
   prepared DTX entries (on all related participants) can be aborted,
   especially if the former one failed for some network trouble, that
   may cause restarted transaction hit -DER_TX_ID_REUSED unexpectedly.

2. Compare the epoch for DTX entries with the same transaction ID for
   distinguishing potential reused TX ID more accurately.

3. Add DTX entry into DTX CoS cache if cannot commit it synchronously.
   Then subsequent batched commit logic can handle it.

4. If server complains suspected TX ID reusing, then reports -EIO to
   related application instead of assertion on client.

5. Control DTX related warning message frequency to avoid log flood.

6. Collect more information when generate some error/warning message.

Signed-off-by: Fan Yong <fan.yong@intel.com>
  • Loading branch information
Nasf-Fan authored Nov 5, 2024
1 parent 2469e21 commit 35ee55e
Show file tree
Hide file tree
Showing 15 changed files with 254 additions and 143 deletions.
8 changes: 5 additions & 3 deletions src/dtx/dtx_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,11 @@ dtx_coll_prep_ult(void *arg)
}

if (dcpa->dcpa_result != 0) {
if (dcpa->dcpa_result != -DER_INPROGRESS && dcpa->dcpa_result != -DER_NONEXIST)
D_ERROR("Failed to load mbs for "DF_DTI", opc %u: "DF_RC"\n",
DP_DTI(&dci->dci_xid), opc, DP_RC(rc));
if (dcpa->dcpa_result < 0 &&
dcpa->dcpa_result != -DER_INPROGRESS && dcpa->dcpa_result != -DER_NONEXIST)
D_ERROR("Failed to load mbs for "DF_DTI" in "DF_UUID"/"DF_UUID", opc %u: "
DF_RC"\n", DP_DTI(&dci->dci_xid), DP_UUID(dci->dci_po_uuid),
DP_UUID(dci->dci_co_uuid), opc, DP_RC(dcpa->dcpa_result));
goto out;
}

Expand Down
83 changes: 47 additions & 36 deletions src/dtx/dtx_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1271,7 +1271,6 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul
int status = -1;
int rc = 0;
bool aborted = false;
bool unpin = false;

D_ASSERT(cont != NULL);

Expand Down Expand Up @@ -1339,7 +1338,7 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul
* it persistently. Otherwise, the subsequent DTX resync may not find it as
* to regard it as failed transaction and abort it.
*/
if (result == 0 && !dth->dth_active && !dth->dth_prepared && !dth->dth_solo &&
if (!dth->dth_active && !dth->dth_prepared &&
(dth->dth_dist || dth->dth_modification_cnt > 0)) {
result = vos_dtx_attach(dth, true, dth->dth_ent != NULL ? true : false);
if (unlikely(result < 0)) {
Expand All @@ -1349,7 +1348,7 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul
}
}

if (dth->dth_prepared || dtx_batched_ult_max == 0) {
if ((dth->dth_prepared && !dlh->dlh_coll) || dtx_batched_ult_max == 0) {
dth->dth_sync = 1;
goto sync;
}
Expand All @@ -1363,53 +1362,60 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul
if (DAOS_FAIL_CHECK(DAOS_DTX_MISS_COMMIT))
dth->dth_sync = 1;

/* For synchronous DTX, do not add it into CoS cache, otherwise,
* we may have no way to remove it from the cache.
*/
if (dth->dth_sync)
goto sync;

D_ASSERT(dth->dth_mbs != NULL);

cache:
if (dlh->dlh_coll) {
rc = dtx_cos_add(cont, dlh->dlh_coll_entry, &dth->dth_leader_oid,
dth->dth_dkey_hash, dth->dth_epoch, DCF_EXP_CMT | DCF_COLL);
} else {
size = sizeof(*dte) + sizeof(*mbs) + dth->dth_mbs->dm_data_size;
D_ALLOC(dte, size);
if (dte == NULL) {
dth->dth_sync = 1;
goto sync;
}

mbs = (struct dtx_memberships *)(dte + 1);
memcpy(mbs, dth->dth_mbs, size - sizeof(*dte));

dte->dte_xid = dth->dth_xid;
dte->dte_ver = dth->dth_ver;
dte->dte_refs = 1;
dte->dte_mbs = mbs;
rc = -DER_NOMEM;
} else {
mbs = (struct dtx_memberships *)(dte + 1);
memcpy(mbs, dth->dth_mbs, size - sizeof(*dte));

dte->dte_xid = dth->dth_xid;
dte->dte_ver = dth->dth_ver;
dte->dte_refs = 1;
dte->dte_mbs = mbs;

if (!(mbs->dm_flags & DMF_SRDG_REP))
flags = DCF_EXP_CMT;
else if (dth->dth_modify_shared)
flags = DCF_SHARED;
else
flags = 0;

if (!(mbs->dm_flags & DMF_SRDG_REP))
flags = DCF_EXP_CMT;
else if (dth->dth_modify_shared)
flags = DCF_SHARED;
else
flags = 0;
rc = dtx_cos_add(cont, dte, &dth->dth_leader_oid, dth->dth_dkey_hash,
dth->dth_epoch, flags);
dtx_entry_put(dte);
}
}

rc = dtx_cos_add(cont, dte, &dth->dth_leader_oid, dth->dth_dkey_hash,
dth->dth_epoch, flags);
dtx_entry_put(dte);
/*
* NOTE: If we failed to add the committable DTX into CoS cache, then we also have no way
* to commit (or abort) the DTX because of out of memory. Such DTX will be finally
* committed via next DTX resync (after recovered from OOM).
*
* Here, we only warning to notify the trouble, but not failed the transaction.
*/
if (rc != 0) {
D_WARN(DF_UUID": Fail to cache %s DTX "DF_DTI": "DF_RC"\n",
DP_UUID(cont->sc_uuid), dlh->dlh_coll ? "collective" : "regular",
DP_DTI(&dth->dth_xid), DP_RC(rc));
D_GOTO(out, result = 0);
}

if (rc == 0) {
if (!DAOS_FAIL_CHECK(DAOS_DTX_NO_COMMITTABLE)) {
vos_dtx_mark_committable(dth);
if (cont->sc_dtx_committable_count > DTX_THRESHOLD_COUNT || dlh->dlh_coll)
sched_req_wakeup(dss_get_module_info()->dmi_dtx_cmt_req);
}
} else {
dth->dth_sync = 1;
if (!DAOS_FAIL_CHECK(DAOS_DTX_NO_COMMITTABLE)) {
vos_dtx_mark_committable(dth);
if (cont->sc_dtx_committable_count > DTX_THRESHOLD_COUNT || dlh->dlh_coll)
sched_req_wakeup(dss_get_module_info()->dmi_dtx_cmt_req);
}

sync:
Expand All @@ -1428,10 +1434,15 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul
rc = dtx_commit(cont, &dte, NULL, 1, false);
}

if (rc != 0)
if (rc != 0) {
D_WARN(DF_UUID": Fail to sync %s commit DTX "DF_DTI": "DF_RC"\n",
DP_UUID(cont->sc_uuid), dlh->dlh_coll ? "collective" : "regular",
DP_DTI(&dth->dth_xid), DP_RC(rc));
if (likely(dtx_batched_ult_max != 0)) {
dth->dth_sync = 0;
goto cache;
}
}

/*
* NOTE: The semantics of 'sync' commit does not guarantee that all
Expand All @@ -1451,7 +1462,7 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_hdl *coh, int resul
* to locally retry for avoiding related forwarded RPC timeout, instead,
* The leader will trigger retry globally without abort 'prepared' ones.
*/
if (unpin || (result < 0 && result != -DER_AGAIN && !dth->dth_solo)) {
if (result < 0 && result != -DER_AGAIN && !dth->dth_solo) {
/* 1. Drop partial modification for distributed transaction.
* 2. Remove the pinned DTX entry.
*/
Expand Down
15 changes: 9 additions & 6 deletions src/dtx/dtx_rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1657,8 +1657,9 @@ dtx_coll_commit(struct ds_cont_child *cont, struct dtx_coll_entry *dce, struct d
}

D_CDEBUG(rc != 0 || rc1 != 0 || rc2 != 0, DLOG_ERR, DB_TRACE,
"Collectively commit DTX "DF_DTI": %d/%d/%d\n",
DP_DTI(&dce->dce_xid), rc, rc1, rc2);
"Collectively commit DTX "DF_DTI" in "DF_UUID"/"DF_UUID": %d/%d/%d\n",
DP_DTI(&dce->dce_xid), DP_UUID(cont->sc_pool_uuid), DP_UUID(cont->sc_uuid),
rc, rc1, rc2);

return rc != 0 ? rc : rc1 != 0 ? rc1 : rc2;
}
Expand Down Expand Up @@ -1717,8 +1718,9 @@ dtx_coll_abort(struct ds_cont_child *cont, struct dtx_coll_entry *dce, daos_epoc
rc2 = 0;

D_CDEBUG(rc != 0 || rc1 != 0 || rc2 != 0, DLOG_ERR, DB_TRACE,
"Collectively abort DTX "DF_DTI": %d/%d/%d\n",
DP_DTI(&dce->dce_xid), rc, rc1, rc2);
"Collectively abort DTX "DF_DTI" with epoch "DF_X64" in "
DF_UUID"/"DF_UUID": %d/%d/%d\n", DP_DTI(&dce->dce_xid), epoch,
DP_UUID(cont->sc_pool_uuid), DP_UUID(cont->sc_uuid), rc, rc1, rc2);

return rc != 0 ? rc : rc1 != 0 ? rc1 : rc2;
}
Expand Down Expand Up @@ -1766,8 +1768,9 @@ dtx_coll_check(struct ds_cont_child *cont, struct dtx_coll_entry *dce, daos_epoc
}

D_CDEBUG((rc < 0 && rc != -DER_NONEXIST) || (rc1 < 0 && rc1 != -DER_NONEXIST), DLOG_ERR,
DB_TRACE, "Collectively check DTX "DF_DTI": %d/%d/\n",
DP_DTI(&dce->dce_xid), rc, rc1);
DB_TRACE, "Collectively check DTX "DF_DTI" in "DF_UUID"/"DF_UUID": %d/%d/\n",
DP_DTI(&dce->dce_xid), DP_UUID(cont->sc_pool_uuid), DP_UUID(cont->sc_uuid),
rc, rc1);

return dce->dce_ranks != NULL ? rc : rc1;
}
5 changes: 3 additions & 2 deletions src/dtx/dtx_srv.c
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,9 @@ dtx_coll_handler(crt_rpc_t *rpc)

out:
D_CDEBUG(rc < 0, DLOG_ERR, DB_TRACE,
"Handled collective DTX PRC %u on rank %u for "DF_DTI": "DF_RC"\n",
opc, myrank, DP_DTI(&dci->dci_xid), DP_RC(rc));
"Handled collective DTX PRC %u on rank %u for "DF_DTI" in "
DF_UUID"/"DF_UUID": "DF_RC"\n", opc, myrank, DP_DTI(&dci->dci_xid),
DP_UUID(dci->dci_po_uuid), DP_UUID(dci->dci_co_uuid), DP_RC(rc));

dco->dco_status = rc;
rc = crt_reply_send(rpc);
Expand Down
3 changes: 3 additions & 0 deletions src/include/daos_srv/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ struct ds_cont_child {
uint32_t sc_dtx_committable_count;
uint32_t sc_dtx_committable_coll_count;

/* Last timestamp when EC aggregation reports -DER_INPROGRESS. */
uint64_t sc_ec_agg_busy_ts;

/* The global minimum EC aggregation epoch, which will be upper
* limit for VOS aggregation, i.e. EC object VOS aggregation can
* not cross this limit. For simplification purpose, all objects
Expand Down
2 changes: 1 addition & 1 deletion src/object/cli_coll.c
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,7 @@ queue_coll_query_task(tse_task_t *api_task, struct obj_auxi_args *obj_auxi, stru
0, 0, ocdc);

for (i = 0; i < ocdc->grp_nr; i++) {
obj_coll_disp_dest(ocdc, coa->coa_dcts, &tgt_ep);
obj_coll_disp_dest(ocdc, coa->coa_dcts, &tgt_ep, obj->cob_md.omd_id);

tmp = coa->coa_dcts[ocdc->cur_pos].dct_shards[tgt_ep.ep_tag].dcs_idx;
rc = queue_shard_query_key_task(api_task, obj_auxi, epoch, tmp, map_ver,
Expand Down
23 changes: 16 additions & 7 deletions src/object/cli_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -1718,15 +1718,20 @@ dc_obj_retry_delay(tse_task_t *task, int err, uint16_t *retry_cnt, uint16_t *inp
uint32_t timeout_sec)
{
uint32_t delay = 0;
uint32_t limit = 4;

/*
* Randomly delay 5 - 68 us if it is not the first retry for
* Randomly delay 5 ~ 1028 us if it is not the first retry for
* -DER_INPROGRESS || -DER_UPDATE_AGAIN cases.
*/
++(*retry_cnt);
if (err == -DER_INPROGRESS || err == -DER_UPDATE_AGAIN) {
if (++(*inprogress_cnt) > 1) {
delay = (d_rand() & ((1 << 6) - 1)) + 5;
limit += *inprogress_cnt;
if (limit > 10)
limit = 10;

delay = (d_rand() & ((1 << limit) - 1)) + 5;
/* Rebuild is being established on the server side, wait a bit longer */
if (err == -DER_UPDATE_AGAIN)
delay <<= 10;
Expand Down Expand Up @@ -4856,11 +4861,14 @@ obj_comp_cb(tse_task_t *task, void *data)
D_ASSERT(daos_handle_is_inval(obj_auxi->th));
D_ASSERT(obj_is_modification_opc(obj_auxi->opc));

if (task->dt_result == -DER_TX_ID_REUSED && obj_auxi->retry_cnt != 0)
/* XXX: it is must because miss to set "RESEND" flag, that is bug. */
D_ASSERTF(0,
"Miss 'RESEND' flag (%x) when resend the RPC for task %p: %u\n",
obj_auxi->flags, task, obj_auxi->retry_cnt);
if (task->dt_result == -DER_TX_ID_REUSED && obj_auxi->retry_cnt != 0) {
D_ERROR("TX ID maybe reused for unknown reason, "
"task %p, opc %u, flags %x, retry_cnt %u\n",
task, obj_auxi->opc, obj_auxi->flags, obj_auxi->retry_cnt);
task->dt_result = -DER_IO;
obj_auxi->io_retry = 0;
goto args_fini;
}

if (obj_auxi->opc == DAOS_OBJ_RPC_UPDATE) {
daos_obj_rw_t *api_args = dc_task_get_args(obj_auxi->obj_task);
Expand All @@ -4886,6 +4894,7 @@ obj_comp_cb(tse_task_t *task, void *data)
}
}

args_fini:
if (obj_auxi->opc == DAOS_OBJ_RPC_COLL_PUNCH)
obj_coll_oper_args_fini(&obj_auxi->p_args.pa_coa);

Expand Down
13 changes: 8 additions & 5 deletions src/object/cli_shard.c
Original file line number Diff line number Diff line change
Expand Up @@ -1451,11 +1451,14 @@ obj_shard_coll_punch_cb(tse_task_t *task, void *data)
shard_args->pa_auxi.obj_auxi->max_delay = timeout;
}

DL_CDEBUG(task->dt_result < 0, DLOG_ERR, DB_IO, task->dt_result,
"DAOS_OBJ_RPC_COLL_PUNCH RPC %p for "DF_UOID" with DTX "
DF_DTI" for task %p, map_ver %u/%u, flags %lx/%x, %s layout",
rpc, DP_UOID(ocpi->ocpi_oid), DP_DTI(&ocpi->ocpi_xid), task, ocpi->ocpi_map_ver,
*cb_args->cpca_ver, (unsigned long)ocpi->ocpi_api_flags, ocpi->ocpi_flags,
DL_CDEBUG(task->dt_result < 0 && task->dt_result != -DER_INPROGRESS,
DLOG_ERR, DB_IO, task->dt_result,
"DAOS_OBJ_RPC_COLL_PUNCH RPC %p for "DF_UOID" in "DF_UUID"/"DF_UUID"/"
DF_UUID" with DTX "DF_DTI" for task %p, map_ver %u/%u, flags %lx/%x, %s layout",
rpc, DP_UOID(ocpi->ocpi_oid), DP_UUID(ocpi->ocpi_po_uuid),
DP_UUID(ocpi->ocpi_co_hdl), DP_UUID(ocpi->ocpi_co_uuid), DP_DTI(&ocpi->ocpi_xid),
task, ocpi->ocpi_map_ver, *cb_args->cpca_ver,
(unsigned long)ocpi->ocpi_api_flags, ocpi->ocpi_flags,
cb_args->cpca_shard_args->pa_coa.coa_raw_sparse ? "sparse" : "continuous");

crt_req_decref(rpc);
Expand Down
2 changes: 1 addition & 1 deletion src/object/obj_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,7 @@ int daos_obj_query_merge(struct obj_query_merge_args *oqma);
void obj_coll_disp_init(uint32_t tgt_nr, uint32_t max_tgt_size, uint32_t inline_size,
uint32_t start, uint32_t max_width, struct obj_coll_disp_cursor *ocdc);
void obj_coll_disp_dest(struct obj_coll_disp_cursor *ocdc, struct daos_coll_target *tgts,
crt_endpoint_t *tgt_ep);
crt_endpoint_t *tgt_ep, daos_obj_id_t oid);
void obj_coll_disp_move(struct obj_coll_disp_cursor *ocdc);
int obj_utils_init(void);
void obj_utils_fini(void);
Expand Down
17 changes: 8 additions & 9 deletions src/object/obj_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -616,23 +616,22 @@ obj_coll_disp_init(uint32_t tgt_nr, uint32_t max_tgt_size, uint32_t inline_size,

void
obj_coll_disp_dest(struct obj_coll_disp_cursor *ocdc, struct daos_coll_target *tgts,
crt_endpoint_t *tgt_ep)
crt_endpoint_t *tgt_ep, daos_obj_id_t oid)
{
struct daos_coll_target *dct = &tgts[ocdc->cur_pos];
struct daos_coll_target tmp;
unsigned long rand = 0;
uint32_t size;
int pos;
int i;

if (ocdc->cur_step > 2) {
rand = d_rand();
/*
* Randomly choose an engine as the relay one for load balance.
* If the one corresponding to "pos" is former moved one, then
* use the "cur_pos" as the relay engine.
* Choose an engine (according to the given oid) as the relay one for load balance.
* If the one corresponding to "pos" is former moved one, then use the "cur_pos" as
* the relay engine. Then even if related RPC was resent without changing pool map,
* then the relay one will be the same as the original case.
*/
pos = rand % (ocdc->tgt_nr - ocdc->cur_pos) + ocdc->cur_pos;
pos = oid.lo % (ocdc->tgt_nr - ocdc->cur_pos) + ocdc->cur_pos;
if (pos > ocdc->cur_pos && tgts[pos].dct_rank > dct->dct_rank) {
memcpy(&tmp, &tgts[pos], sizeof(tmp));
memcpy(&tgts[pos], dct, sizeof(tmp));
Expand All @@ -642,8 +641,8 @@ obj_coll_disp_dest(struct obj_coll_disp_cursor *ocdc, struct daos_coll_target *t

size = dct->dct_bitmap_sz << 3;

/* Randomly choose a XS as the local leader on target engine for load balance. */
for (i = 0, pos = (rand != 0 ? rand : d_rand()) % dct->dct_tgt_nr; i < size; i++) {
/* Choose a target as the local agent on the engine for load balance. */
for (i = 0, pos = oid.lo % dct->dct_tgt_nr; i < size; i++) {
if (isset(dct->dct_bitmap, i)) {
pos -= dct->dct_shards[i].dcs_nr;
if (pos < 0)
Expand Down
33 changes: 31 additions & 2 deletions src/object/srv_ec_aggregate.c
Original file line number Diff line number Diff line change
Expand Up @@ -2674,8 +2674,13 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
struct ec_agg_param *ec_agg_param = agg_param->ap_data;
vos_iter_param_t iter_param = { 0 };
struct vos_iter_anchors anchors = { 0 };
struct dtx_handle *dth = NULL;
struct dtx_share_peer *dsp;
struct dtx_id dti = { 0 };
struct dtx_epoch epoch = { 0 };
daos_unit_oid_t oid = { 0 };
int blocks = 0;
int rc = 0;
int blocks = 0;

/*
* Avoid calling into vos_aggregate() when aborting aggregation
Expand Down Expand Up @@ -2722,8 +2727,32 @@ cont_ec_aggregate_cb(struct ds_cont_child *cont, daos_epoch_range_t *epr,
agg_reset_entry(&ec_agg_param->ap_agg_entry, NULL, NULL);

retry:
epoch.oe_value = epr->epr_hi;
rc = dtx_begin(cont->sc_hdl, &dti, &epoch, 0, cont->sc_pool->spc_map_version, &oid,
NULL, 0, 0, NULL, &dth);
if (rc != 0)
goto update_hae;

rc = vos_iterate(&iter_param, VOS_ITER_OBJ, true, &anchors, agg_iterate_pre_cb,
agg_iterate_post_cb, ec_agg_param, NULL);
agg_iterate_post_cb, ec_agg_param, dth);
if (rc == -DER_INPROGRESS && !d_list_empty(&dth->dth_share_tbd_list)) {
uint64_t now = daos_gettime_coarse();

/* Report warning per each 10 seconds to avoid log flood. */
if (now - cont->sc_ec_agg_busy_ts > 10) {
while ((dsp = d_list_pop_entry(&dth->dth_share_tbd_list,
struct dtx_share_peer, dsp_link)) != NULL) {
D_WARN(DF_CONT ": EC aggregate hit non-committed DTX " DF_DTI "\n",
DP_CONT(cont->sc_pool->spc_uuid, cont->sc_uuid),
DP_DTI(&dsp->dsp_xid));
dtx_dsp_free(dsp);
}

cont->sc_ec_agg_busy_ts = now;
}
}

dtx_end(dth, cont, rc);

/* Post_cb may not being executed in some cases */
agg_clear_extents(&ec_agg_param->ap_agg_entry);
Expand Down
Loading

0 comments on commit 35ee55e

Please sign in to comment.