Skip to content

Commit

Permalink
DAOS-16009 rebuild: fix O_TRUNC file size related handling
Browse files Browse the repository at this point in the history
Fix issues in several layers:
1. rebuild
   Fix rebuilding of punch records: apply the missing recx conversion
   (mrone_recx_daos2_vos) on EC non-parity shards before punching.

2. EC aggregation
1) Fix hole processing: when there is nothing to replicate,
   the parity still needs to be deleted.
2) Fix the case with no valid hole (all hole epochs < parity epoch)
   but with a partial replica whose epoch > parity epoch.

3. dc_array
   Refine file truncate to punch whole dkeys wherever possible.

Required-githooks: true

Signed-off-by: Xuezhao Liu <xuezhao.liu@intel.com>
Co-authored-by: Mohamad Chaarawi <mohamad.chaarawi@intel.com>
Co-authored-by: Jeff Olivier <jeffolivier@google.com>
Co-authored-by: Wang, Di <wddi218@gmail.com>
  • Loading branch information
4 people committed Jul 2, 2024
1 parent ab94708 commit 67af463
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 67 deletions.
99 changes: 64 additions & 35 deletions src/client/array/dc_array.c
Original file line number Diff line number Diff line change
Expand Up @@ -2068,18 +2068,24 @@ free_set_size_cb(tse_task_t *task, void *data)
}

static int
punch_extent(daos_handle_t oh, daos_handle_t th, daos_size_t dkey_val, daos_off_t record_i,
daos_size_t num_records, tse_task_t *task, d_list_t *task_list)
punch_dkey_or_extent(daos_handle_t oh, daos_handle_t th, daos_size_t dkey_val, daos_off_t start,
daos_size_t num_records, bool punch_dkey, tse_task_t *task,
d_list_t *task_list)
{
daos_obj_update_t *io_arg;
daos_obj_punch_t *dkey_punch_arg;
daos_iod_t *iod;
d_sg_list_t *sgl;
daos_key_t *dkey;
struct io_params *params = NULL;
tse_task_t *io_task = NULL;
int rc;
int rc;

D_DEBUG(DB_IO, "Punching (%zu, %zu) in Key %zu\n", record_i + 1, num_records, dkey_val);
if (punch_dkey)
D_DEBUG(DB_IO, "Punching dkey %zu\n", dkey_val);
else
D_DEBUG(DB_IO, "Punching (%zu, %zu) in Key %zu\n",
start, num_records, dkey_val);

D_ALLOC_PTR(params);
if (params == NULL)
Expand All @@ -2093,28 +2099,42 @@ punch_extent(daos_handle_t oh, daos_handle_t th, daos_size_t dkey_val, daos_off_
dkey = &params->dkey;
d_iov_set(dkey, &params->dkey_val, sizeof(uint64_t));

/* set descriptor for KV object */
d_iov_set(&iod->iod_name, &params->akey_val, 1);
iod->iod_nr = 1;
iod->iod_size = 0; /* 0 to punch */
iod->iod_type = DAOS_IOD_ARRAY;
D_ALLOC_PTR(iod->iod_recxs);
if (iod->iod_recxs == NULL)
D_GOTO(free, rc = -DER_NOMEM);
iod->iod_recxs[0].rx_idx = record_i + 1;
iod->iod_recxs[0].rx_nr = num_records;

rc = daos_task_create(DAOS_OPC_OBJ_UPDATE, tse_task2sched(task), 0, NULL, &io_task);
if (rc)
D_GOTO(free_reqs, rc);
if (punch_dkey) {
rc = daos_task_create(DAOS_OPC_OBJ_PUNCH_DKEYS, tse_task2sched(task), 0, NULL,
&io_task);
if (rc)
D_GOTO(free_reqs, rc);

dkey_punch_arg = daos_task_get_args(io_task);
dkey_punch_arg->oh = oh;
dkey_punch_arg->th = th;
dkey_punch_arg->dkey = dkey;
dkey_punch_arg->akeys = NULL;
dkey_punch_arg->akey_nr = 0;
} else {
/* set descriptor for KV object */
d_iov_set(&iod->iod_name, &params->akey_val, 1);
iod->iod_nr = 1;
iod->iod_size = 0; /* 0 to punch */
iod->iod_type = DAOS_IOD_ARRAY;
D_ALLOC_PTR(iod->iod_recxs);
if (iod->iod_recxs == NULL)
D_GOTO(free, rc = -DER_NOMEM);
iod->iod_recxs[0].rx_idx = start;
iod->iod_recxs[0].rx_nr = num_records;

rc = daos_task_create(DAOS_OPC_OBJ_UPDATE, tse_task2sched(task), 0, NULL, &io_task);
if (rc)
D_GOTO(free_reqs, rc);

io_arg = daos_task_get_args(io_task);
io_arg->oh = oh;
io_arg->th = th;
io_arg->dkey = dkey;
io_arg->nr = 1;
io_arg->iods = iod;
io_arg->sgls = sgl;
io_arg = daos_task_get_args(io_task);
io_arg->oh = oh;
io_arg->th = th;
io_arg->dkey = dkey;
io_arg->nr = 1;
io_arg->iods = iod;
io_arg->sgls = sgl;
}

rc = tse_task_register_comp_cb(io_task, free_io_params_cb, &params, sizeof(params));
if (rc)
Expand Down Expand Up @@ -2421,18 +2441,26 @@ adjust_array_size_cb(tse_task_t *task, void *data)
memcpy(&dkey_val, ptr, args->kds[i].kd_key_len);
ptr += args->kds[i].kd_key_len;

/*
* Either punch the entire dkey or an extent in that dkey depending on the offset
* where we are truncating to. The first dkey of the array (dkey 1) will always be
* an extent punch to maintain an epoch there.
*/
if (props->size == 0 || dkey_val > props->dkey_val) {
/** Do nothing for DKEY 0 (metadata) */
if (dkey_val == 0)
continue;
/*
* The dkey is higher than the adjustded size so we could punch it here.
* But it's better to punch the extent so that the max_write for the object
* doesn't get lost by aggregation.
*/
D_DEBUG(DB_IO, "Punch full extent in key "DF_U64"\n", dkey_val);
rc = punch_extent(args->oh, args->th, dkey_val, (daos_off_t)-1,
props->chunk_size, props->ptask, &task_list);
if (dkey_val == 1) {
D_DEBUG(DB_IO, "Punch full extent in key " DF_U64 "\n", dkey_val);
rc = punch_dkey_or_extent(args->oh, args->th, dkey_val,
0, props->chunk_size, false,
props->ptask, &task_list);
} else {
D_DEBUG(DB_IO, "Punch dkey " DF_U64 "\n", dkey_val);
rc = punch_dkey_or_extent(args->oh, args->th, dkey_val,
0, props->chunk_size, true,
props->ptask, &task_list);
}
if (rc)
goto out;
} else if (dkey_val == props->dkey_val && props->record_i) {
Expand All @@ -2443,8 +2471,9 @@ adjust_array_size_cb(tse_task_t *task, void *data)
props->chunk_size);
/** Punch all records above record_i */
D_DEBUG(DB_IO, "Punch extent in key "DF_U64"\n", dkey_val);
rc = punch_extent(args->oh, args->th, dkey_val, props->record_i,
props->num_records, props->ptask, &task_list);
rc = punch_dkey_or_extent(args->oh, args->th, dkey_val,
props->record_i + 1, props->num_records,
false, props->ptask, &task_list);
if (rc)
goto out;
}
Expand Down
7 changes: 4 additions & 3 deletions src/common/pool_map.c
Original file line number Diff line number Diff line change
Expand Up @@ -1692,11 +1692,12 @@ gen_pool_buf(struct pool_map *map, struct pool_buf **map_buf_out, int map_versio
map_comp.co_flags = PO_COMPF_NONE;
map_comp.co_nr = 1;

D_DEBUG(DB_TRACE, "adding target: type=0x%hhx, status=%hhu, idx=%d, "
D_DEBUG(DB_TRACE, "adding target: type=0x%hhx, status=%hhu, idx=%d, id=%d, "
"rank=%d, ver=%d, in_ver=%d, fseq=%u, flags=0x%x, nr=%u\n",
map_comp.co_type, map_comp.co_status, map_comp.co_index,
map_comp.co_rank, map_comp.co_ver, map_comp.co_in_ver,
map_comp.co_fseq, map_comp.co_flags, map_comp.co_nr);
map_comp.co_id, map_comp.co_rank, map_comp.co_ver,
map_comp.co_in_ver, map_comp.co_fseq, map_comp.co_flags,
map_comp.co_nr);

rc = pool_buf_attach(map_buf, &map_comp, 1);
if (rc != 0)
Expand Down
84 changes: 56 additions & 28 deletions src/object/srv_ec_aggregate.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ struct ec_agg_entry {
struct pl_obj_layout *ae_obj_layout;
struct daos_shard_loc ae_peer_pshards[OBJ_EC_MAX_P];
uint32_t ae_grp_idx;
uint32_t ae_is_leader:1;
uint32_t ae_is_leader:1,
ae_process_partial:1;
};

/* Parameters used to drive iterate all.
Expand Down Expand Up @@ -148,6 +149,7 @@ struct ec_agg_stripe_ud {
d_iov_t asu_csum_iov;
struct dcs_iod_csums *asu_iod_csums; /* iod csums */
ABT_eventual asu_eventual; /* Eventual for offload */
bool asu_valid_hole; /* with hole epoch >= parity epoch */
};

/* Represents an replicated data extent.
Expand Down Expand Up @@ -1182,7 +1184,13 @@ agg_process_partial_stripe(struct ec_agg_entry *entry)
estart = entry->ae_cur_stripe.as_offset;
d_list_for_each_entry(extent, &entry->ae_cur_stripe.as_dextents,
ae_link) {
D_ASSERT(!extent->ae_hole);
if (extent->ae_hole) {
/* valid hole processed by agg_process_holes_ult() */
D_ASSERTF(extent->ae_epoch < entry->ae_par_extent.ape_epoch,
"hole ext epoch "DF_X64", parity epoch "DF_X64"\n",
extent->ae_epoch, entry->ae_par_extent.ape_epoch);
continue;
}
if (extent->ae_epoch <= entry->ae_par_extent.ape_epoch) {
has_old_replicas = true;
continue;
Expand Down Expand Up @@ -1549,18 +1557,18 @@ agg_process_holes_ult(void *arg)
uint32_t pidx = ec_age2pidx(entry);
uint32_t peer;
int i, rc = 0;
bool valid_hole = false;
uint32_t max_delay = 0;
uint64_t enqueue_id;

stripe_ud->asu_valid_hole = false;
/* Process extent list to find what to re-replicate -- build recx array
*/
d_list_for_each_entry(agg_extent,
&entry->ae_cur_stripe.as_dextents, ae_link) {
if (agg_extent->ae_epoch < entry->ae_par_extent.ape_epoch)
continue;
if (agg_extent->ae_hole)
valid_hole = true;
stripe_ud->asu_valid_hole = true;
if (agg_extent->ae_recx.rx_idx - ss > last_ext_end) {
stripe_ud->asu_recxs[ext_cnt].rx_idx =
ss + last_ext_end;
Expand All @@ -1576,7 +1584,7 @@ agg_process_holes_ult(void *arg)
break;
}

if (!valid_hole)
if (!stripe_ud->asu_valid_hole)
goto out;

obj = obj_hdl2ptr(entry->ae_obj_hdl);
Expand Down Expand Up @@ -1782,21 +1790,29 @@ agg_process_holes(struct ec_agg_entry *entry)
if (*status != 0)
D_GOTO(ev_out, rc = *status);

/* Update local vos with replicate */
entry->ae_sgl.sg_nr = 1;
if (iod->iod_nr) {
/* write the reps to vos */
rc = vos_obj_update(agg_param->ap_cont_handle, entry->ae_oid,
entry->ae_cur_stripe.as_hi_epoch, 0, 0,
&entry->ae_dkey, 1, iod,
stripe_ud.asu_iod_csums,
&entry->ae_sgl);
if (rc) {
D_ERROR("vos_update_begin failed: "DF_RC"\n",
DP_RC(rc));
goto ev_out;
/* With a valid hole (hole epoch >= parity epoch), update the local vos with the
 * replicas and delete the parity.
 * Without a valid hole, the only possibility is a partial replica with epoch >
 * parity epoch while every hole epoch < parity epoch; in that case set the
 * ae_process_partial flag so that agg_process_partial_stripe() is called later.
 */
if (stripe_ud.asu_valid_hole) {
if (iod->iod_nr) {
/* write the reps to vos */
entry->ae_sgl.sg_nr = 1;
rc = vos_obj_update(agg_param->ap_cont_handle, entry->ae_oid,
entry->ae_cur_stripe.as_hi_epoch, 0, 0,
&entry->ae_dkey, 1, iod,
stripe_ud.asu_iod_csums,
&entry->ae_sgl);
if (rc) {
D_ERROR("vos_update_begin failed: "DF_RC"\n",
DP_RC(rc));
goto ev_out;
}
}
/* Delete parity */

/* Delete parity even when nothing to replicate */
epoch_range.epr_lo = agg_param->ap_epr.epr_lo;
epoch_range.epr_hi = entry->ae_cur_stripe.as_hi_epoch;
recx.rx_nr = ec_age2cs(entry);
Expand All @@ -1806,7 +1822,11 @@ agg_process_holes(struct ec_agg_entry *entry)
entry->ae_oid, &epoch_range,
&entry->ae_dkey, &entry->ae_akey,
&recx);
} else {
D_DEBUG(DB_EPC, "no valid hole, set ae_process_partial flag\n");
entry->ae_process_partial = 1;
}

ev_out:
entry->ae_sgl.sg_nr = AGG_IOV_CNT;
ABT_eventual_free(&stripe_ud.asu_eventual);
Expand All @@ -1816,6 +1836,7 @@ agg_process_holes(struct ec_agg_entry *entry)
daos_csummer_free_ic(stripe_ud.asu_csummer, &stripe_ud.asu_iod_csums);
D_FREE(stripe_ud.asu_csum_iov.iov_buf);
daos_csummer_destroy(&stripe_ud.asu_csummer);

return rc;
}

Expand All @@ -1829,7 +1850,6 @@ agg_process_stripe(struct ec_agg_param *agg_param, struct ec_agg_entry *entry)
struct vos_iter_anchors anchors = { 0 };
bool update_vos = true;
bool write_parity = true;
bool process_holes = false;
int rc = 0;

if (DAOS_FAIL_CHECK(DAOS_FORCE_EC_AGG_FAIL))
Expand Down Expand Up @@ -1900,16 +1920,23 @@ agg_process_stripe(struct ec_agg_param *agg_param, struct ec_agg_entry *entry)
goto out;
}

/* With parity and some newer partial replicas, possibly holes */
if (ec_age_with_hole(entry))
process_holes = true;
else
rc = agg_process_partial_stripe(entry);
/* With parity and some newer partial replicas, possibly holes.
 * Case 1: with a valid hole (hole epoch >= parity epoch), the stripe is handled by
 * agg_process_holes().
 * Case 2: without a valid hole there must be a partial replica, which is handled by
 * agg_process_partial_stripe().
 */
if (ec_age_with_hole(entry)) {
entry->ae_process_partial = 0;
rc = agg_process_holes(entry);
if (rc != 0 || !entry->ae_process_partial)
goto clear_exts;
}

rc = agg_process_partial_stripe(entry);

out:
if (process_holes && rc == 0) {
rc = agg_process_holes(entry);
} else if (update_vos && rc == 0) {
if (update_vos && rc == 0) {
if (ec_age2p(entry) > 1) {
/* offload of ds_obj_update to push remote parity */
rc = agg_peer_update(entry, write_parity);
Expand All @@ -1926,6 +1953,7 @@ agg_process_stripe(struct ec_agg_param *agg_param, struct ec_agg_entry *entry)
}
}

clear_exts:
agg_clear_extents(entry);
return rc;
}
Expand Down
3 changes: 3 additions & 0 deletions src/object/srv_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -2424,6 +2424,8 @@ ds_obj_ec_rep_handler(crt_rpc_t *rpc)
D_ASSERT(ioc.ioc_coc != NULL);
dkey = (daos_key_t *)&oer->er_dkey;
iod = (daos_iod_t *)&oer->er_iod;
if (iod->iod_nr == 0) /* nothing to replicate, directly remove parity */
goto remove_parity;
iod_csums = oer->er_iod_csums.ca_arrays;
rc = vos_update_begin(ioc.ioc_coc->sc_hdl, oer->er_oid, oer->er_epoch_range.epr_hi,
VOS_OF_REBUILD, dkey, 1, iod, iod_csums, 0, &ioh, NULL);
Expand Down Expand Up @@ -2456,6 +2458,7 @@ ds_obj_ec_rep_handler(crt_rpc_t *rpc)
DP_RC(rc));
goto out;
}
remove_parity:
recx.rx_nr = obj_ioc2ec_cs(&ioc);
recx.rx_idx = (oer->er_stripenum * recx.rx_nr) | PARITY_INDICATOR;
rc = vos_obj_array_remove(ioc.ioc_coc->sc_hdl, oer->er_oid, &oer->er_epoch_range, dkey,
Expand Down
8 changes: 7 additions & 1 deletion src/object/srv_obj_migrate.c
Original file line number Diff line number Diff line change
Expand Up @@ -1671,6 +1671,12 @@ migrate_punch(struct migrate_pool_tls *tls, struct migrate_one *mrone,

/* punch records */
if (mrone->mo_punch_iod_num > 0 && mrone->mo_rec_punch_eph <= tls->mpt_max_eph) {
if (daos_oclass_is_ec(&mrone->mo_oca) &&
!is_ec_parity_shard_by_layout_ver(mrone->mo_oid.id_layout_ver,
mrone->mo_dkey_hash, &mrone->mo_oca,
mrone->mo_oid.id_shard))
mrone_recx_daos2_vos(mrone, mrone->mo_punch_iods, mrone->mo_punch_iod_num);

rc = vos_obj_update(cont->sc_hdl, mrone->mo_oid,
mrone->mo_rec_punch_eph,
mrone->mo_version, 0, &mrone->mo_dkey,
Expand Down Expand Up @@ -2365,7 +2371,7 @@ punch_iod_pack(struct migrate_one *mrone, struct dc_object *obj, daos_iod_t *iod

D_DEBUG(DB_TRACE,
"idx %d akey "DF_KEY" nr %d size "DF_U64" type %d\n",
idx, DP_KEY(&iod->iod_name), iod->iod_nr, iod->iod_size,
idx, DP_KEY(&iod->iod_name), mrone->mo_punch_iods->iod_nr, iod->iod_size,
iod->iod_type);

if (mrone->mo_rec_punch_eph < eph)
Expand Down

0 comments on commit 67af463

Please sign in to comment.