Fix multi-threaded on-the-fly indexing problems #1672

Merged 2 commits on Sep 22, 2023
70 changes: 34 additions & 36 deletions bgzf.c
@@ -230,41 +230,8 @@ int bgzf_idx_push(BGZF *fp, hts_idx_t *hidx, int tid, hts_pos_t beg, hts_pos_t e
return 0;
}

/*
* bgzf analogue to hts_idx_amend_last.
*
* This is needed when multi-threading and writing indices on the fly.
* At the point of writing a record we know the virtual offset for start
* and end, but that end virtual offset may be the end of the current
* block. In standard indexing our end virtual offset becomes the start
* of the next block. Thus to ensure bit for bit compatibility we
* detect this boundary case and fix it up here.
*
* In theory this has no behavioural change, but it also works around
* a bug elsewhere which causes bgzf_read to return 0 when our offset
* is the end of a block rather than the start of the next.
*/
void bgzf_idx_amend_last(BGZF *fp, hts_idx_t *hidx, uint64_t offset) {
mtaux_t *mt = fp->mt;
if (!mt) {
hts_idx_amend_last(hidx, offset);
return;
}

pthread_mutex_lock(&mt->idx_m);
hts_idx_cache_t *ic = &mt->idx_cache;
if (ic->nentries > 0) {
hts_idx_cache_entry *e = &ic->e[ic->nentries-1];
if ((offset & 0xffff) == 0 && e->offset != 0) {
// bumped to next block number
e->offset = 0;
e->block_number++;
}
}
pthread_mutex_unlock(&mt->idx_m);
}

static int bgzf_idx_flush(BGZF *fp) {
static int bgzf_idx_flush(BGZF *fp,
size_t block_uncomp_len, size_t block_comp_len) {
mtaux_t *mt = fp->mt;

if (!mt->idx_cache.e) {
@@ -280,6 +247,37 @@ static int bgzf_idx_flush(BGZF *fp) {
assert(mt->idx_cache.nentries == 0 || mt->block_written <= e[0].block_number);

for (i = 0; i < mt->idx_cache.nentries && e[i].block_number == mt->block_written; i++) {
if (block_uncomp_len > 0 && e[i].offset == block_uncomp_len) {
/*
* If the virtual offset is at the end of the current block,
* adjust it to point to the start of the next one. This
* is needed when on-the-fly indexing has recorded a virtual
* offset just before a new block has been started, and makes
* on-the-fly and standard indexing give exactly the same results.
*
* In theory the two virtual offsets are equivalent, but pointing
* to the end of a block is inefficient, and caused problems with
* versions of HTSlib before 1.11 where bgzf_read() would
* incorrectly return EOF.
*/

// Assert that this is the last entry for the current block_number
assert(i == mt->idx_cache.nentries - 1
|| e[i].block_number < e[i + 1].block_number);

// Work out where the next block starts. For this entry, the
// offset will be zero.
uint64_t next_block_addr = mt->block_address + block_comp_len;
if (hts_idx_push(mt->hts_idx, e[i].tid, e[i].beg, e[i].end,
next_block_addr << 16, e[i].is_mapped) < 0) {
pthread_mutex_unlock(&mt->idx_m);
return -1;
}
// Count this entry and drop out of the loop
i++;
break;
}

if (hts_idx_push(mt->hts_idx, e[i].tid, e[i].beg, e[i].end,
(mt->block_address << 16) + e[i].offset,
e[i].is_mapped) < 0) {
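
For readers unfamiliar with BGZF virtual offsets, the arithmetic behind this boundary fix may be worth spelling out. The sketch below is illustrative only (the helper, variable names, and sizes are assumptions, not htslib internals): a virtual offset packs the compressed file offset of a block into the top 48 bits and the uncompressed offset within that block into the low 16 bits, so an entry recorded at the very end of one block and an entry at the start of the next block address the same uncompressed byte with two different encodings.

```c
#include <assert.h>
#include <stdint.h>

/* A BGZF virtual offset: compressed block address in the top 48 bits,
 * uncompressed offset within the block in the low 16 bits. */
static uint64_t voffset(uint64_t block_address, uint64_t within) {
    return (block_address << 16) | within;
}

int main(void) {
    uint64_t block_address    = 1000;  /* compressed offset of this block (assumed) */
    uint64_t block_comp_len   = 500;   /* its compressed size (assumed) */
    uint64_t block_uncomp_len = 65280; /* its uncompressed size (assumed) */

    /* A record ending exactly at the end of the block can be encoded two
     * ways.  The flush code above rewrites the first form as the second,
     * so on-the-fly indexing matches standard indexing bit for bit. */
    uint64_t end_of_block  = voffset(block_address, block_uncomp_len);
    uint64_t start_of_next = voffset(block_address + block_comp_len, 0);

    /* Same uncompressed byte, different encodings. */
    assert(end_of_block != start_of_next);
    return 0;
}
```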
@@ -1423,7 +1421,7 @@ static void *bgzf_mt_writer(void *vp) {
}

// Flush any cached hts_idx_push calls
if (bgzf_idx_flush(fp) < 0)
if (bgzf_idx_flush(fp, j->uncomp_len, j->comp_len) < 0)
goto err;

if (hwrite(fp->fp, j->comp_data, j->comp_len) != j->comp_len)
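
The ordering in bgzf_mt_writer() matters here: index entries for a block must be flushed while the writer's block address still refers to that block, i.e. before the compressed bytes are written and the address advances. A self-contained sketch of that invariant follows; the types and names are simplified stand-ins, not htslib's actual structures.

```c
#include <stdint.h>
#include <stdio.h>

typedef struct {
    uint8_t *comp_data;
    size_t   comp_len, uncomp_len;
} job_t;

typedef struct {
    uint64_t block_address; /* file offset of the block about to be written */
} writer_t;

/* Stand-in for bgzf_idx_flush(): given both lengths it can rewrite an
 * end-of-block entry as (block_address + comp_len) << 16. */
static int idx_flush(writer_t *w, size_t uncomp_len, size_t comp_len) {
    (void)w; (void)uncomp_len; (void)comp_len;
    return 0;
}

static int write_job(writer_t *w, FILE *out, job_t *j) {
    /* 1. Flush cached index entries for this block first... */
    if (idx_flush(w, j->uncomp_len, j->comp_len) < 0)
        return -1;
    /* 2. ...then write the block and advance the address. */
    if (fwrite(j->comp_data, 1, j->comp_len, out) != j->comp_len)
        return -1;
    w->block_address += j->comp_len;
    return 0;
}
```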
2 changes: 0 additions & 2 deletions sam.c
@@ -904,8 +904,6 @@ static int bam_write_idx1(htsFile *fp, const sam_hdr_t *h, const bam1_t *b) {
return -1;
if (!bfp->mt)
hts_idx_amend_last(fp->idx, bgzf_tell(bfp));
else
bgzf_idx_amend_last(bfp, fp->idx, bgzf_tell(bfp));

int ret = bam_write1(bfp, b);
if (ret < 0)
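
For context, bam_write_idx1() above is the path exercised when writing a BAM with an on-the-fly index and a threaded writer, which is the combination this PR fixes. A minimal usage sketch against the public API, with error cleanup trimmed; file names, min_shift, and thread count are illustrative:

```c
#include <htslib/sam.h>

int bam_copy_with_index(const char *in_name, const char *out_name,
                        const char *idx_name) {
    htsFile *in  = hts_open(in_name, "r");
    htsFile *out = hts_open(out_name, "wb");
    if (!in || !out) return -1;
    sam_hdr_t *hdr = sam_hdr_read(in);
    if (!hdr || sam_hdr_write(out, hdr) < 0) return -1;

    hts_set_threads(out, 4);                      // threaded BGZF writer
    if (sam_idx_init(out, hdr, 14, idx_name) < 0) // min_shift 14 => CSI
        return -1;

    bam1_t *b = bam_init1();
    int r;
    while ((r = sam_read1(in, hdr, b)) >= 0)
        if (sam_write1(out, hdr, b) < 0) { r = -2; break; }

    if (r == -1 && sam_idx_save(out) < 0)         // -1 is normal EOF
        r = -2;
    bam_destroy1(b);
    sam_hdr_destroy(hdr);
    hts_close(in);
    return hts_close(out) < 0 || r != -1 ? -1 : 0;
}
```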
16 changes: 11 additions & 5 deletions vcf.c
@@ -2225,7 +2225,8 @@ int bcf_write(htsFile *hfp, bcf_hdr_t *h, bcf1_t *v)
if ( bgzf_write(fp, v->indiv.s, v->indiv.l) != v->indiv.l ) return -1;

if (hfp->idx) {
if (hts_idx_push(hfp->idx, v->rid, v->pos, v->pos + v->rlen, bgzf_tell(fp), 1) < 0)
if (bgzf_idx_push(fp, hfp->idx, v->rid, v->pos, v->pos + v->rlen,
bgzf_tell(fp), 1) < 0)
return -1;
}

@@ -3959,19 +3960,19 @@ int vcf_write(htsFile *fp, const bcf_hdr_t *h, bcf1_t *v)
if ( fp->format.compression!=no_compression ) {
if (bgzf_flush_try(fp->fp.bgzf, fp->line.l) < 0)
return -1;
if (fp->idx)
hts_idx_amend_last(fp->idx, bgzf_tell(fp->fp.bgzf));
ret = bgzf_write(fp->fp.bgzf, fp->line.s, fp->line.l);
} else {
ret = hwrite(fp->fp.hfile, fp->line.s, fp->line.l);
}

if (fp->idx) {
if (fp->idx && fp->format.compression == bgzf) {
int tid;
if ((tid = hts_idx_tbi_name(fp->idx, v->rid, bcf_seqname_safe(h, v))) < 0)
return -1;

if (hts_idx_push(fp->idx, tid, v->pos, v->pos + v->rlen, bgzf_tell(fp->fp.bgzf), 1) < 0)
if (bgzf_idx_push(fp->fp.bgzf, fp->idx,
tid, v->pos, v->pos + v->rlen,
bgzf_tell(fp->fp.bgzf), 1) < 0)
return -1;
}
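
Switching these call sites from an immediate hts_idx_push() to bgzf_idx_push() matters because, with a threaded writer, a record's final virtual offset is not known until its block has been compressed; bgzf_idx_push() caches the entry until bgzf_idx_flush() can emit it with the real block address. From the caller's side nothing changes. A minimal sketch of on-the-fly VCF indexing through this path, with error cleanup trimmed and illustrative file names:

```c
#include <htslib/vcf.h>

int vcf_copy_with_index(const char *in_name, const char *out_name,
                        const char *idx_name) {
    htsFile *in  = hts_open(in_name, "r");
    htsFile *out = hts_open(out_name, "wz");      // "z" => BGZF-compressed VCF
    if (!in || !out) return -1;
    bcf_hdr_t *hdr = bcf_hdr_read(in);
    if (!hdr || bcf_hdr_write(out, hdr) < 0) return -1;

    hts_set_threads(out, 4);
    if (bcf_idx_init(out, hdr, 0, idx_name) < 0)  // min_shift 0 => TBI
        return -1;

    bcf1_t *rec = bcf_init();
    while (bcf_read(in, hdr, rec) >= 0)           // for text VCF output,
        if (bcf_write(out, hdr, rec) < 0)         // bcf_write() routes
            return -1;                            // through vcf_write()

    if (bcf_idx_save(out) < 0) return -1;
    bcf_destroy(rec);
    bcf_hdr_destroy(hdr);
    hts_close(in);
    return hts_close(out);
}
```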

@@ -4160,6 +4161,11 @@ static int vcf_idx_init(htsFile *fp, bcf_hdr_t *h, int min_shift, const char *fn
int bcf_idx_init(htsFile *fp, bcf_hdr_t *h, int min_shift, const char *fnidx) {
int n_lvls, nids = 0;

if (fp->format.compression != bgzf) {
hts_log_error("Indexing is only supported on BGZF-compressed files");
return -3; // Matches no-compression return for bcf_index_build3()
}

if (fp->format.format == vcf)
return vcf_idx_init(fp, h, min_shift, fnidx);

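
With the new guard, the failure is reported up front: requesting on-the-fly indexing on an uncompressed stream returns -3 immediately rather than misbehaving later. An illustrative fragment (the file names are made up and hdr is assumed to be a valid header obtained elsewhere):

```c
#include <htslib/vcf.h>

int demo_idx_guard(bcf_hdr_t *hdr) {
    htsFile *out = hts_open("plain.vcf", "w");   // "w": no BGZF compression
    if (!out) return -1;
    int ret = bcf_idx_init(out, hdr, 0, "plain.vcf.tbi");
    /* ret == -3 here: indexing requires BGZF, matching the
     * no-compression return of bcf_index_build3(). */
    hts_close(out);
    return ret;
}
```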