Fix the multi-threaded CRAM multi-region iterator regression.
Fixes #1061

This appears to have been introduced in bfc9f0d and fixed for BAM only
in 6149ea6.  The effect on multi-threaded decoding of CRAM was
significant.

This fix takes a totally different approach, which not only fixes the
regression but surpasses the original pre-regression performance.
CRAM can do CRAM_OPT_RANGE queries, used by the single-region version
of the iterator, which inform the decode of both the start and end
positions of the range.  When threading, this means we don't
preemptively decode the next container unless it'll actually be used.
This same logic is now used in the multi-region iterator, although
it's complex.
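
For illustration, this is roughly what a range-limited query looks
like from the CRAM API side (a minimal sketch using htslib's
cram_range struct and cram_set_option(); fd is an open cram_fd and
the tid/beg/end values are hypothetical):

    // Hypothetical values: decode only refid 'tid' between beg and end.
    // With threading enabled the decoder stops queueing containers once
    // it passes 'end', instead of reading ahead speculatively.
    cram_range r = { tid, beg, end };
    if (cram_set_option(fd, CRAM_OPT_RANGE, &r) != 0)
        return -1; // range setup / seek failed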

The general strategy is as follows:

- Ensure we know the next container's start offset from the index.
  This needs a small tweak to the cram_index struct, as the next
  container isn't quite the same as this container + slice offset +
  slice size (sadly).  I think it's missing the size of the container
  struct itself.

- When given a start..end offset, step into the reg_list to find the
  corresponding chr:start-end range.

- Produce a new CRAM_OPT_RANGE_NOSEEK option that does the same job
  as the old CRAM_OPT_RANGE opt but without the seek.  This is
  necessary as hts.c does the seek for us, and the pseek/ptell
  functions only work with each other and not within the cram subdir
  code itself.

- Identify neighbouring file offsets and merge together their
  corresponding ranges, so a block of adjacent offsets becomes a
  single CRAM_OPT_RANGE query (see the sketch after this list).  We
  cache the end offset (.v) in iter->end so we can avoid duplicating
  the seek / range request in subsequent intervals.

- Manage the EOF vs EOR (end of range) return values.  For EOR we
  have to do the incrementing ourselves, as we need to restart the
  loop without triggering the end-of-file or end-of-multi-iterator
  logic below it.

- Tweak the region sorting a bit so ties are resolved by the max
  value.  We want the index into reg_list to also be sorted, so we can
  accurately step through them.  Note this logic is CRAM specific, as
  the sorting wouldn't work on BAI anyway due to the R-tree.
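
To make the merging step concrete, here is a simplified sketch, not
the exact hts.c code.  off[] stands for htslib's sorted
hts_pair64_max_t chunk list (u/v = start/end file offsets, max packs
tid<<32 | interval number); intv[], n_off and i are hypothetical names
for the surrounding state:

    // Starting from chunk i, absorb every chunk whose start offset
    // lies within the running end offset 'v'; those chunks share
    // containers, so one CRAM_OPT_RANGE query can serve them all.
    int j = i + 1;
    int64_t v   = off[i].v;                          // merged file end
    int64_t end = intv[off[i].max & 0xffffffff].end; // merged genomic end
    while (j < n_off && off[j].u <= v) {
        int64_t e = intv[off[j].max & 0xffffffff].end;
        if (end < e)        end = e;                 // widen genomic range
        if (v < off[j].v)   v = off[j].v;            // widen file range
        j++;
    }
    // Chunks i..j-1 now form a single range request; 'v' is cached in
    // iter->end so later chunks inside it skip the seek entirely.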

Some benchmarks on ~37,000 regions returning ~110 million seqs across
NA06985.final.cram:

CRAM-1.10 no threads:  real  4m37.361s   user  4m21.128s   sys  0m7.988s
CRAM-1.10 -@16:        real  3m14.371s   user 28m48.872s   sys  4m52.442s
CRAM-dev -@16:         real  5m55.670s   user 61m0.005s    sys 11m23.147s
CRAM-current -@16:     real  1m55.701s   user  5m54.234s   sys  0m51.699s

The increase in user time between unthreaded and 16 threads is modest.
The system time increase is considerable, but this appears to be
primarily down to glibc malloc inefficiencies.  Using libtcmalloc.so
instead gives:

1M CRAM-current -@16:  real  1m30.882s   user  6m9.103s    sys  0m4.960s
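
(tcmalloc was swapped in via the usual LD_PRELOAD mechanism rather
than a rebuild; an illustrative invocation with a hypothetical library
path, not the exact benchmark command, is:

    LD_PRELOAD=/usr/lib/libtcmalloc.so samtools view -@16 -M -L regions.bed NA06985.final.cram > /dev/null
)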

We're still nowhere near fully utilising 16 threads, but this is
because the average size of each region is significantly smaller than
16 containers' worth of data.
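
(Rough arithmetic: ~110 million records over ~37,000 regions is about
3,000 records per region; with CRAM's default of around 10,000 reads
per slice, a typical region spans well under one container, so a
16-container read-ahead has little to keep it busy.)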
jkbonfield authored and valeriuo committed Jun 2, 2020
1 parent 9de45b7 commit d314715
Showing 5 changed files with 211 additions and 14 deletions.
28 changes: 28 additions & 0 deletions cram/cram_index.c
@@ -481,6 +481,34 @@ cram_index *cram_index_query_last(cram_fd *fd, int refid, hts_pos_t end) {
        first++;
    }

+   // Compute the start location of next container.
+   //
+   // This is useful for stitching containers together in the multi-region
+   // iterator. Sadly we can't compute this from the single index line.
+   //
+   // Note we can have neighbouring index entries at the same location
+   // for when we have multi-reference mode and/or multiple slices per
+   // container.
+   cram_index *next = first;
+   do {
+       if (next >= last) {
+           // Next non-empty reference
+           while (++refid+1 < fd->index_sz)
+               if (fd->index[refid+1].nslice)
+                   break;
+           if (refid+1 >= fd->index_sz) {
+               next = NULL;
+           } else {
+               next = fd->index[refid+1].e;
+               last = fd->index[refid+1].e + fd->index[refid+1].nslice;
+           }
+       } else {
+           next++;
+       }
+   } while (next && next->offset == first->offset);
+
+   first->next = next ? next->offset : 0;

    return first;
}

19 changes: 19 additions & 0 deletions cram/cram_io.c
@@ -4870,6 +4870,25 @@ int cram_set_voption(cram_fd *fd, enum hts_fmt_option opt, va_list args) {
        return r;
    }

+   case CRAM_OPT_RANGE_NOSEEK: {
+       // As per CRAM_OPT_RANGE, but no seeking
+       pthread_mutex_lock(&fd->range_lock);
+       cram_range *r = va_arg(args, cram_range *);
+       fd->range = *r;
+       if (r->refid == HTS_IDX_NOCOOR) {
+           fd->range.refid = -1;
+           fd->range.start = 0;
+       } else if (r->refid == HTS_IDX_START || r->refid == HTS_IDX_REST) {
+           fd->range.refid = -2; // special case in cram_next_slice
+       }
+       if (fd->range.refid != -2)
+           fd->required_fields |= SAM_POS;
+       fd->ooc = 0;
+       fd->eof = 0;
+       pthread_mutex_unlock(&fd->range_lock);
+       return 0;
+   }

    case CRAM_OPT_REFERENCE:
        return cram_load_reference(fd, va_arg(args, char *));

1 change: 1 addition & 0 deletions cram/cram_structs.h
@@ -663,6 +663,7 @@ typedef struct cram_index {
    int slice;      // 1.0 landmark index, 1.1 landmark value
    int len;        // 1.1 - size of slice in bytes
    int64_t offset; // 1.0 1.1
+   int64_t next;   // derived: offset of next container.
} cram_index;

typedef struct {
176 changes: 162 additions & 14 deletions hts.c
@@ -1603,9 +1603,11 @@ int hts_check_EOF(htsFile *fp)
#define META_BIN(idx) ((idx)->n_bins + 1)

#define pair64_lt(a,b) ((a).u < (b).u)
+#define pair64max_lt(a,b) ((a).u < (b).u || \
+                           ((a).u == (b).u && (a).max < (b).max))

KSORT_INIT_STATIC(_off, hts_pair64_t, pair64_lt)
-KSORT_INIT_STATIC(_off_max, hts_pair64_max_t, pair64_lt)
+KSORT_INIT_STATIC(_off_max, hts_pair64_max_t, pair64max_lt)

typedef struct {
    int32_t m, n;
@@ -2832,7 +2834,9 @@ int hts_itr_multi_cram(const hts_idx_t *idx, hts_itr_t *iter)
        }

        if (e) {
-           off[n_off++].v = e->offset + e->slice + e->len;
+           off[n_off++].v = e->next
+               ? e->next
+               : e->offset + e->slice + e->len;
        } else {
            hts_log_warning("Could not set offset end for region %d:%"PRIhts_pos"-%"PRIhts_pos". Skipping", tid, beg, end);
        }
Expand Down Expand Up @@ -3381,6 +3385,7 @@ int hts_itr_multi_next(htsFile *fd, hts_itr_t *iter, void *r)
    // A NULL iter->off should always be accompanied by iter->finished.
    assert(iter->off != NULL || iter->nocoor != 0);

+   int next_range = 0;
    for (;;) {
        // Note that due to the way bam indexing works, iter->off may contain
        // file chunks that are not actually needed as they contain data
@@ -3390,31 +3395,66 @@ int hts_itr_multi_next(htsFile *fd, hts_itr_t *iter, void *r)
        // associated with the file region with iter->curr_tid and
        // iter->curr_intv.

-       if (iter->curr_off == 0
+       if (next_range
+           || iter->curr_off == 0
            || iter->i >= iter->n_off
            || iter->curr_off >= iter->off[iter->i].v
            || (iter->off[iter->i].max >> 32 == iter->curr_tid
                && (iter->off[iter->i].max & 0xffffffff) < iter->curr_intv)) {

            // Jump to the next chunk. It may be necessary to skip more
            // than one as the iter->off list can include overlapping entries.
            do {
                iter->i++;
            } while (iter->i < iter->n_off
                     && (iter->curr_off >= iter->off[iter->i].v
                         || (iter->off[iter->i].max >> 32 == iter->curr_tid
                             && (iter->off[iter->i].max & 0xffffffff) < iter->curr_intv)));

+           if (iter->is_cram && iter->i < iter->n_off) {
+               // Ensure iter->curr_reg is correct.
+               //
+               // We need this for CRAM as we shortcut some of the later
+               // logic by getting an end-of-range and continuing to the
+               // next offset.
+               //
+               // We cannot do this for BAM (and fortunately do not need to
+               // either) because in BAM world a query to genomic positions
+               // GX and GY leading to seek offsets PX and PY may have
+               // GX > GY and PX < PY.  (This is due to the R-tree and
+               // falling between intervals, bumping up to a higher bin.)
+               // CRAM strictly follows PX >= PY if GX >= GY, so this logic
+               // works.
+               int want_tid = iter->off[iter->i].max >> 32;
+               if (!(iter->curr_reg < iter->n_reg &&
+                     iter->reg_list[iter->curr_reg].tid == want_tid)) {
+                   int j;
+                   for (j = 0; j < iter->n_reg; j++)
+                       if (iter->reg_list[j].tid == want_tid)
+                           break;
+                   if (j == iter->n_reg)
+                       return -1;
+                   iter->curr_reg = j;
+                   iter->curr_tid = iter->reg_list[iter->curr_reg].tid;
+               }
+               iter->curr_intv = iter->off[iter->i].max & 0xffffffff;
+           }

            if (iter->i >= iter->n_off) { // no more chunks, except NOCOORs
                if (iter->nocoor) {
+                   next_range = 0;
                    if (iter->seek(fp, iter->nocoor_off, SEEK_SET) < 0) {
                        hts_log_error("Seek at offset %" PRIu64 " failed.", iter->nocoor_off);
                        return -1;
                    }
+                   if (iter->is_cram) {
+                       cram_range r = { -1 };
+                       cram_set_option(fp, CRAM_OPT_RANGE_NOSEEK, &r);
+                   }

-                   //The first slice covering the unmapped reads might contain a few mapped reads, so scroll
-                   //forward until finding the first unmapped read.
+                   // The first slice covering the unmapped reads might
+                   // contain a few mapped reads, so scroll
+                   // forward until finding the first unmapped read.
                    do {
                        ret = iter->readrec(fp, fd, r, &tid, &beg, &end);
                    } while (tid >= 0 && ret >= 0);
@@ -3436,27 +3476,134 @@ int hts_itr_multi_next(htsFile *fd, hts_itr_t *iter, void *r)
            } else if (iter->i < iter->n_off) {
                // New chunk may overlap the last one, so ensure we
                // only seek forwards.
-               if (iter->curr_off < iter->off[iter->i].u) {
+               if (iter->curr_off < iter->off[iter->i].u || next_range) {
                    iter->curr_off = iter->off[iter->i].u;
-                   if (iter->seek(fp, iter->curr_off, SEEK_SET) < 0) {
-                       hts_log_error("Seek at offset %" PRIu64 " failed.", iter->curr_off);
-                       return -1;

+                   // CRAM has the capability of setting an end location.
+                   // This means multi-threaded decodes can stop once they
+                   // reach that point, rather than pointlessly decoding
+                   // more slices than we'll be using.
+                   //
+                   // We have to be careful here.  Whenever we set the cram
+                   // range we need a corresponding seek in order to ensure
+                   // we can safely decode at that offset.  We use the
+                   // next_range var to ensure this is always true; it is
+                   // set on the end-of-range condition.  It's never
+                   // modified for BAM.
+                   if (iter->is_cram) {
+                       // Next offset.[uv] tuple, but it's already been
+                       // included in our cram range, so don't seek and don't
+                       // reset the range, so we can efficiently multi-thread.
+                       if (!next_range && iter->curr_off < iter->end)
+                           goto range_already_known;

+                       if (iter->seek(fp, iter->curr_off, SEEK_SET) < 0) {
+                           hts_log_error("Seek at offset %" PRIu64
+                                         " failed.", iter->curr_off);
+                           return -1;
+                       }

+                       // Find the genomic range matching this interval.
+                       int j;
+                       hts_reglist_t *rl = &iter->reg_list[iter->curr_reg];
+                       cram_range r = {
+                           rl->tid,
+                           rl->intervals[iter->curr_intv].beg,
+                           rl->intervals[iter->curr_intv].end
+                       };

+                       // Expand it up to cover neighbouring intervals.
+                       // Note we can only have a single chromosome in a
+                       // range, so if we detect our blocks span chromosomes
+                       // or we have a multi-ref mode slice, we just use the
+                       // HTS_IDX_START refid instead.  This doesn't actually
+                       // seek (due to CRAM_OPT_RANGE_NOSEEK) and is simply
+                       // an indicator of decoding with no end limit.
+                       //
+                       // That isn't as efficient as it could be, but it's
+                       // no poorer than before and it works.
+                       int tid = r.refid;
+                       int64_t end = r.end;
+                       int64_t v = iter->off[iter->i].v;
+                       j = iter->i+1;
+                       while (j < iter->n_off) {
+                           if (iter->off[j].u > v)
+                               break;

+                           uint64_t max = iter->off[j].max;
+                           if ((max>>32) != tid)
+                               tid = HTS_IDX_START; // => no range limit

+                           if (end < rl->intervals[max & 0xffffffff].end)
+                               end = rl->intervals[max & 0xffffffff].end;
+                           if (v < iter->off[j].v)
+                               v = iter->off[j].v;
+                           j++;
+                       }
+                       r.refid = tid;
+                       r.end = end;

+                       // Remember the maximum 'v' here so we don't do
+                       // unnecessary subsequent seeks for the next
+                       // regions.  We can't change curr_off, but
+                       // beg/end are used only by the single region
+                       // iterator, so we cache it there to avoid changing
+                       // the struct.
+                       iter->end = v;

+                       cram_set_option(fp, CRAM_OPT_RANGE_NOSEEK, &r);

+                   } else { // Not CRAM
+                       if (iter->seek(fp, iter->curr_off, SEEK_SET) < 0) {
+                           hts_log_error("Seek at offset %" PRIu64 " failed.",
+                                         iter->curr_off);
+                           return -1;
+                       }
+                   }
                }
            }

+       range_already_known:
+           next_range = 0;
        }

        ret = iter->readrec(fp, fd, r, &tid, &beg, &end);
-       if (ret < 0)
-           break;
+       if (ret < 0) {
+           if (iter->is_cram && cram_eof(fp)) {
+               // Skip to end of range
+               //
+               // We should never be adjusting curr_off manually unless
+               // we can also guarantee we'll be doing a seek afterwards
+               // to a new location.  Otherwise we'll be reading the wrong
+               // offset for the next container.
+               //
+               // We ensure this by adjusting our CRAM_OPT_RANGE
+               // accordingly above, but to double check we also
+               // set the next_range flag to enforce a seek as well.
+               iter->curr_off = iter->off[iter->i].v;
+               next_range = 1;

+               // Next region
+               if (++iter->curr_intv >= iter->reg_list[iter->curr_reg].count) {
+                   if (++iter->curr_reg >= iter->n_reg)
+                       break;
+                   iter->curr_intv = 0;
+                   iter->curr_tid = iter->reg_list[iter->curr_reg].tid;
+               }
+               continue;
+           } else {
+               break;
+           }
+       }

        iter->curr_off = iter->tell(fp);

        if (tid != iter->curr_tid) {
            hts_reglist_t key;
            key.tid = tid;

-           found_reg = (hts_reglist_t *)bsearch(&key, iter->reg_list, iter->n_reg, sizeof(hts_reglist_t), compare_regions);
+           found_reg = (hts_reglist_t *)bsearch(&key, iter->reg_list,
+                                                iter->n_reg,
+                                                sizeof(hts_reglist_t),
+                                                compare_regions);
            if (!found_reg)
                continue;

@@ -3469,7 +3616,8 @@ int hts_itr_multi_next(htsFile *fd, hts_itr_t *iter, void *r)
            ci = iter->curr_intv;

            for (i = ci; i < iter->reg_list[cr].count; i++) {
-               if (end > iter->reg_list[cr].intervals[i].beg && iter->reg_list[cr].intervals[i].end > beg) {
+               if (end > iter->reg_list[cr].intervals[i].beg &&
+                   iter->reg_list[cr].intervals[i].end > beg) {
                    iter->curr_beg = beg;
                    iter->curr_end = end;
                    iter->curr_intv = i;
1 change: 1 addition & 0 deletions htslib/hts.h
@@ -297,6 +297,7 @@ enum hts_fmt_option {
    CRAM_OPT_BASES_PER_SLICE,
    CRAM_OPT_STORE_MD,
    CRAM_OPT_STORE_NM,
+   CRAM_OPT_RANGE_NOSEEK, // CRAM_OPT_RANGE minus the seek

    // General purpose
    HTS_OPT_COMPRESSION_LEVEL = 100,
