Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAOS-15829 object: fix potential DRAM leak when retry after DTX refresh #14394

Merged
merged 1 commit into from
May 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 98 additions & 99 deletions src/object/srv_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -697,20 +697,21 @@ obj_set_reply_nrs(crt_rpc_t *rpc, daos_handle_t ioh, d_sg_list_t *echo_sgl, uint
/* Re-entry case. */
if (orwo->orw_nrs.ca_count != 0) {
D_ASSERT(orwo->orw_nrs.ca_count == nrs_count);
return 0;
}

/* return sg_nr_out and data size for sgl */
D_ALLOC(orwo->orw_nrs.ca_arrays,
nrs_count * (sizeof(uint32_t) + sizeof(daos_size_t)));
if (orwo->orw_nrs.ca_arrays == NULL)
return -DER_NOMEM;
D_ASSERT(orwo->orw_data_sizes.ca_count == nrs_count);
D_ASSERT(orwo->orw_nrs.ca_arrays != NULL);
D_ASSERT(orwo->orw_data_sizes.ca_arrays != NULL);
} else {
/* return sg_nr_out and data size for sgl */
D_ALLOC(orwo->orw_nrs.ca_arrays,
nrs_count * (sizeof(uint32_t) + sizeof(daos_size_t)));
if (orwo->orw_nrs.ca_arrays == NULL)
return -DER_NOMEM;

orwo->orw_nrs.ca_count = nrs_count;
orwo->orw_data_sizes.ca_count = nrs_count;
orwo->orw_data_sizes.ca_arrays =
(void *)((char *)orwo->orw_nrs.ca_arrays +
nrs_count * (sizeof(uint32_t)));
orwo->orw_nrs.ca_count = nrs_count;
orwo->orw_data_sizes.ca_count = nrs_count;
orwo->orw_data_sizes.ca_arrays = (void *)((char *)orwo->orw_nrs.ca_arrays +
nrs_count * (sizeof(uint32_t)));
}

nrs = orwo->orw_nrs.ca_arrays;
data_sizes = orwo->orw_data_sizes.ca_arrays;
Expand Down Expand Up @@ -850,13 +851,20 @@ obj_fetch_csum_init(struct ds_cont_child *cont, struct obj_rw_in *orw, struct ob
*
* The memory will be freed in obj_rw_reply
*/
rc = daos_csummer_alloc_iods_csums(cont->sc_csummer, orw->orw_iod_array.oia_iods,
orw->orw_iod_array.oia_iod_nr, false, NULL,
&orwo->orw_iod_csums.ca_arrays);

if (rc >= 0) {
orwo->orw_iod_csums.ca_count = (uint64_t)rc;
rc = 0;
/* Re-entry case. */
if (orwo->orw_iod_csums.ca_count != 0) {
D_ASSERT(orwo->orw_iod_csums.ca_arrays != NULL);
rc = 0;
} else {
rc = daos_csummer_alloc_iods_csums(cont->sc_csummer, orw->orw_iod_array.oia_iods,
orw->orw_iod_array.oia_iod_nr, false, NULL,
&orwo->orw_iod_csums.ca_arrays);

if (rc >= 0) {
orwo->orw_iod_csums.ca_count = rc;
rc = 0;
}
}

return rc;
Expand Down Expand Up @@ -1061,10 +1069,10 @@ obj_fetch_create_maps(crt_rpc_t *rpc, struct bio_desc *biod, daos_iod_t *iods, u
if (skips == NULL)
D_ASSERTF(total_nr == iods_nr, "total nr %d, iods_nr %d\n", total_nr, iods_nr);

/* Re-entry case. */
if (orwo->orw_maps.ca_count != 0) {
D_ASSERT(orwo->orw_maps.ca_count == total_nr);
return 0;
/* Re-entry case, iods may be changed, let's re-generate the maps. */
if (orwo->orw_maps.ca_arrays != NULL) {
ds_iom_free(&orwo->orw_maps.ca_arrays, orwo->orw_maps.ca_count);
orwo->orw_maps.ca_count = 0;
}

rc = ds_iom_create(biod, iods, iods_nr, flags, &maps);
Expand Down Expand Up @@ -1144,6 +1152,10 @@ obj_prep_fetch_sgls(crt_rpc_t *rpc, struct obj_io_context *ioc)
int j;
int rc = 0;

/* Re-entry case. */
if (ioc->ioc_free_sgls)
return 0;

for (i = 0; i < nr; i++) {
for (j = 0; j < sgls[i].sg_nr; j++) {
d_iov_t *iov = &sgls[i].sg_iovs[j];
Expand Down Expand Up @@ -1938,7 +1950,7 @@ obj_get_iods_offs(daos_unit_oid_t uoid, struct obj_iod_array *iod_array,
}

static int
obj_local_rw(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dtx_handle *dth)
obj_local_rw_internal_wrap(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dtx_handle *dth)
{
struct obj_rw_in *orw = crt_req_get(rpc);
daos_iod_t iod = { 0 };
Expand All @@ -1952,32 +1964,13 @@ obj_local_rw(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dtx_handle *dth)
uint8_t *skips = (uint8_t *)&local_skips;
uint32_t nr = 0;
int rc;
int count = 0;

rc = obj_get_iods_offs(orw->orw_oid, &orw->orw_iod_array, &ioc->ioc_oca,
orw->orw_dkey_hash, ioc->ioc_layout_ver, &iods,
&offs, &skips, &csums, &csum_info, &nr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm: why was the retry moved outside of obj_local_rw_internal_wrap()?
Previously the retry happened inside this function, so obj_get_iods_offs() did not need to be redone and the memory did not need to be freed on each retry — was there a bug with that approach?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because obj_local_rw_internal() may internally allocate some DRAM. If we do not release it before retry, then on re-entry to obj_local_rw_internal() the related DRAM may leak, such as the iods. It is possible to trace the allocations one by one, but it is easy to miss some corner cases. So here, we introduce obj_local_rw_internal_wrap(), which will clean up after obj_local_rw_internal() before re-entry.

if (rc != 0)
D_GOTO(out, rc);
again:
rc = obj_local_rw_internal(rpc, ioc, iods, csums, offs, skips, nr, dth);
if (dth != NULL && obj_dtx_need_refresh(dth, rc)) {
if (unlikely(++count % 10 == 3)) {
struct dtx_share_peer *dsp;

dsp = d_list_entry(dth->dth_share_tbd_list.next, struct dtx_share_peer,
dsp_link);
D_WARN("DTX refresh for "DF_DTI" because of "DF_DTI" (%d) for %d times, "
"maybe dead loop\n", DP_DTI(&dth->dth_xid), DP_DTI(&dsp->dsp_xid),
dth->dth_share_tbd_count, count);
}

rc = dtx_refresh(dth, ioc->ioc_coc);
if (rc == -DER_AGAIN)
goto again;
}
if (rc == 0)
rc = obj_local_rw_internal(rpc, ioc, iods, csums, offs, skips, nr, dth);

out:
if (csums != NULL && csums != &csum && csums != orw->orw_iod_array.oia_iod_csums) {
int i;

Expand All @@ -1997,6 +1990,32 @@ obj_local_rw(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dtx_handle *dth)
return rc;
}

/*
 * Local read/write entry point: run obj_local_rw_internal_wrap() and, when the
 * DTX layer reports that shared DTX entries must be refreshed, refresh them and
 * retry the whole operation. A warning is logged periodically if the retry
 * count keeps growing, since that may indicate a dead loop.
 */
static int
obj_local_rw(crt_rpc_t *rpc, struct obj_io_context *ioc, struct dtx_handle *dth)
{
	uint32_t	attempts = 0;
	int		rc;

	for (;;) {
		rc = obj_local_rw_internal_wrap(rpc, ioc, dth);
		if (dth == NULL || !obj_dtx_need_refresh(dth, rc))
			break;

		/* Every 10th retry (starting at the 3rd), warn about a possible dead loop. */
		if (unlikely(++attempts % 10 == 3)) {
			struct dtx_share_peer	*dsp;

			dsp = d_list_entry(dth->dth_share_tbd_list.next, struct dtx_share_peer,
					   dsp_link);
			D_WARN("DTX refresh for "DF_DTI" because of "DF_DTI" (%d) for %d times, "
			       "maybe dead loop\n", DP_DTI(&dth->dth_xid), DP_DTI(&dsp->dsp_xid),
			       dth->dth_share_tbd_count, attempts);
		}

		rc = dtx_refresh(dth, ioc->ioc_coc);
		if (rc != -DER_AGAIN)
			break;
	}

	return rc;
}

static int
obj_capa_check(struct ds_cont_hdl *coh, bool is_write, bool is_agg_migrate)
{
Expand Down Expand Up @@ -3046,45 +3065,12 @@ obj_enum_complete(crt_rpc_t *rpc, int status, int map_version,
D_FREE(oeo->oeo_csum_iov.iov_buf);
}

/*
 * Restore the enumeration arguments in @des back to the snapshot in @src so
 * that a failed/retried enumeration can re-pack from the saved state.
 * Also resets the reply-side buffers (kds array, sgl) that the previous
 * pack attempt may have partially filled.
 *
 * \param[in]  rpc  The enumeration RPC; its request/reply payloads are used
 *                  to re-create the reply sgl.
 * \param[out] des  Destination argument set to restore into.
 * \param[in]  src  Saved copy of the argument set taken before packing.
 *
 * \return 0 on success, negative DER error (e.g. -DER_NOMEM) otherwise.
 */
static int
obj_restore_enum_args(crt_rpc_t *rpc, struct ds_obj_enum_arg *des,
		      struct ds_obj_enum_arg *src)
{
	struct obj_key_enum_out	*oeo = crt_reply_get(rpc);
	struct obj_key_enum_in	*oei = crt_req_get(rpc);
	int			 rc;

	/* Free any checksum iov accumulated by the previous attempt. */
	if (!des->fill_recxs && des->csum_iov.iov_buf != NULL)
		daos_iov_free(&des->csum_iov);

	/* Structure copy: roll @des back to the saved snapshot. */
	*des = *src;

	/* In fill_recxs mode there are no kds/sgl buffers to reset. */
	if (des->fill_recxs)
		return 0;

	/* Clear the key descriptors filled by the previous attempt. */
	if (des->kds != NULL)
		memset(des->kds, 0, des->kds_cap * sizeof(daos_key_desc_t));
	des->kds_len = 0;

	if (oeo->oeo_sgl.sg_iovs == NULL)
		return 0;

	/* Re-create the reply sgl from the request's sgl description. */
	d_sgl_fini(&oeo->oeo_sgl, true);
	rc = daos_sgls_alloc(&oeo->oeo_sgl, &oei->oei_sgl, 1);
	if (rc != 0)
		return rc;

	des->sgl = &oeo->oeo_sgl;
	return 0;
}

static int
obj_local_enum(struct obj_io_context *ioc, crt_rpc_t *rpc,
struct vos_iter_anchors *anchors, struct ds_obj_enum_arg *enum_arg,
daos_epoch_t *e_out)
{
vos_iter_param_t param = { 0 };
struct ds_obj_enum_arg saved_arg;
struct obj_key_enum_in *oei = crt_req_get(rpc);
struct dtx_handle *dth = NULL;
uint32_t flags = 0;
Expand Down Expand Up @@ -3147,7 +3133,7 @@ obj_local_enum(struct obj_io_context *ioc, crt_rpc_t *rpc,
D_ASSERT(opc == DAOS_OBJ_RPC_ENUMERATE);
type = VOS_ITER_DKEY;
param.ip_flags |= VOS_IT_RECX_VISIBLE;
if (daos_anchor_get_flags(&anchors[0].ia_dkey) &
if (daos_anchor_get_flags(&anchors->ia_dkey) &
DIOF_WITH_SPEC_EPOCH) {
/* For obj verification case. */
param.ip_epc_expr = VOS_IT_EPC_RR;
Expand Down Expand Up @@ -3177,7 +3163,7 @@ obj_local_enum(struct obj_io_context *ioc, crt_rpc_t *rpc,
* 'type' to indicate the anchor is on SV tree or EV tree.
*/
if (type == VOS_ITER_SINGLE)
anchors[0].ia_sv = anchors[0].ia_ev;
anchors->ia_sv = anchors->ia_ev;
else if (oei->oei_oid.id_shard % 3 == 1 &&
DAOS_FAIL_CHECK(DAOS_VC_LOST_REPLICA))
D_GOTO(failed, rc = -DER_NONEXIST);
Expand All @@ -3193,9 +3179,6 @@ obj_local_enum(struct obj_io_context *ioc, crt_rpc_t *rpc,
goto failed;
}

anchors[1] = anchors[0];
saved_arg = *enum_arg;

if (oei->oei_flags & ORF_FOR_MIGRATION)
flags = DTX_FOR_MIGRATION;

Expand All @@ -3206,16 +3189,12 @@ obj_local_enum(struct obj_io_context *ioc, crt_rpc_t *rpc,
goto failed;

re_pack:
rc = ds_obj_enum_pack(&param, type, recursive, &anchors[0], enum_arg, vos_iterate, dth);
rc = ds_obj_enum_pack(&param, type, recursive, anchors, enum_arg, vos_iterate, dth);
if (obj_dtx_need_refresh(dth, rc)) {
rc = dtx_refresh(dth, ioc->ioc_coc);
if (rc == -DER_AGAIN) {
anchors[0] = anchors[1];
obj_restore_enum_args(rpc, enum_arg, &saved_arg);
if (opc == DAOS_OBJ_RPC_ENUMERATE)
fill_oid(oei->oei_oid, enum_arg);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please explain a little why the above restore section is not needed any more? Thanks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because vos_iterate() takes an anchor parameter that is both input and output. When vos_iterate() returns for retry, the anchor contains enough information to locate the next item. So when retrying after a DTX refresh, it is better to resume from the break point (that is, the item that triggered the DTX refresh) instead of from the relative beginning via the restore() method.

/* After DTX refresh, re_pack will resume from the position at \@anchors. */
if (rc == -DER_AGAIN)
goto re_pack;
}
}

if ((rc == -DER_KEY2BIG) && opc == DAOS_OBJ_RPC_ENUMERATE &&
Expand All @@ -3241,7 +3220,7 @@ obj_local_enum(struct obj_io_context *ioc, crt_rpc_t *rpc,
rc = rc_tmp;

if (type == VOS_ITER_SINGLE)
anchors[0].ia_ev = anchors[0].ia_sv;
anchors->ia_ev = anchors->ia_sv;

D_DEBUG(DB_IO, ""DF_UOID" iterate "DF_X64"-"DF_X64" type %d tag %d"
" rc %d\n", DP_UOID(oei->oei_oid), param.ip_epr.epr_lo,
Expand Down Expand Up @@ -3338,13 +3317,13 @@ ds_obj_enum_handler(crt_rpc_t *rpc)
dss_get_module_info()->dmi_xs_id,
oei->oei_map_ver, ioc.ioc_map_ver);

D_ALLOC_ARRAY(anchors, 2);
D_ALLOC_PTR(anchors);
if (anchors == NULL)
D_GOTO(out, rc = -DER_NOMEM);

anchors[0].ia_dkey = oei->oei_dkey_anchor;
anchors[0].ia_akey = oei->oei_akey_anchor;
anchors[0].ia_ev = oei->oei_anchor;
anchors->ia_dkey = oei->oei_dkey_anchor;
anchors->ia_akey = oei->oei_akey_anchor;
anchors->ia_ev = oei->oei_anchor;

/* TODO: Transfer the inline_thres from enumerate RPC */
enum_arg.inline_thres = 32;
Expand Down Expand Up @@ -3395,9 +3374,9 @@ ds_obj_enum_handler(crt_rpc_t *rpc)
if (rc)
D_GOTO(out, rc);

oeo->oeo_dkey_anchor = anchors[0].ia_dkey;
oeo->oeo_akey_anchor = anchors[0].ia_akey;
oeo->oeo_anchor = anchors[0].ia_ev;
oeo->oeo_dkey_anchor = anchors->ia_dkey;
oeo->oeo_akey_anchor = anchors->ia_akey;
oeo->oeo_anchor = anchors->ia_ev;

if (enum_arg.eprs)
oeo->oeo_eprs.ca_count = enum_arg.eprs_len;
Expand Down Expand Up @@ -3446,7 +3425,9 @@ obj_local_punch(struct obj_punch_in *opi, crt_opcode_t opc,
struct obj_io_context *ioc, struct dtx_handle *dth)
{
struct ds_cont_child *cont = ioc->ioc_coc;
struct dtx_share_peer *dsp;
uint64_t sched_seq;
uint32_t retry = 0;
int rc = 0;

if (daos_is_zero_dti(&opi->opi_dti)) {
Expand Down Expand Up @@ -3493,6 +3474,14 @@ obj_local_punch(struct obj_punch_in *opi, crt_opcode_t opc,
}

if (dth != NULL && obj_dtx_need_refresh(dth, rc)) {
if (unlikely(++retry % 10 == 3)) {
dsp = d_list_entry(dth->dth_share_tbd_list.next,
struct dtx_share_peer, dsp_link);
D_WARN("DTX refresh for "DF_DTI" because of "DF_DTI" (%d) for %d "
"times, maybe dead loop\n", DP_DTI(&dth->dth_xid),
DP_DTI(&dsp->dsp_xid), dth->dth_share_tbd_count, retry);
}

rc = dtx_refresh(dth, ioc->ioc_coc);
if (rc != -DER_AGAIN)
goto out;
Expand Down Expand Up @@ -4830,11 +4819,21 @@ ds_cpd_handle_one_wrap(crt_rpc_t *rpc, struct daos_cpd_sub_head *dcsh,
struct daos_cpd_disp_ent *dcde, struct daos_cpd_sub_req *dcsrs,
struct obj_io_context *ioc, struct dtx_handle *dth)
{
int rc;
struct dtx_share_peer *dsp;
uint32_t retry = 0;
int rc;

again:
rc = ds_cpd_handle_one(rpc, dcsh, dcde, dcsrs, ioc, dth);
if (obj_dtx_need_refresh(dth, rc)) {
if (unlikely(++retry % 10 == 3)) {
dsp = d_list_entry(dth->dth_share_tbd_list.next,
struct dtx_share_peer, dsp_link);
D_WARN("DTX refresh for "DF_DTI" because of "DF_DTI" (%d) for %d "
"times, maybe dead loop\n", DP_DTI(&dth->dth_xid),
DP_DTI(&dsp->dsp_xid), dth->dth_share_tbd_count, retry);
}

rc = dtx_refresh(dth, ioc->ioc_coc);
if (rc == -DER_AGAIN)
goto again;
Expand Down
Loading