Skip to content

Commit

Permalink
Avoid fd->no_ref thread issues
Browse files Browse the repository at this point in the history
As we now have the capacity to set fd->no_ref from within a thread
(when embed_ref==2 and cram_generate_reference fails), we also need to
guard against fd->no_ref changing while other threads are executing.
As long as an encoder is 100% consistent with no_ref from start to
finish it's fine, so we fetch a local copy of it at the start.

Embed_ref=2 is also incompatible with multi-ref mode, so we switch to
no_ref for that scenario too.  This isn't ideal when we flip back out
of multi-ref mode, though.

Also fixed a dubious-looking fd->multi_seq assignment that was made
outside of a thread lock.
  • Loading branch information
jkbonfield committed Jan 31, 2023
1 parent 314c167 commit e8b30ba
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 35 deletions.
93 changes: 64 additions & 29 deletions cram/cram_encode.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ KHASH_MAP_INIT_STR(m_s2u64, uint64_t)
static int process_one_read(cram_fd *fd, cram_container *c,
cram_slice *s, cram_record *cr,
bam_seq_t *b, int rnum, kstring_t *MD,
int embed_ref);
int embed_ref, int no_ref);

/*
* Returns index of val into key.
Expand All @@ -87,6 +87,10 @@ cram_block *cram_encode_compression_header(cram_fd *fd, cram_container *c,
cram_block *map = cram_new_block(COMPRESSION_HEADER, 0);
int i, mc, r = 0;

pthread_mutex_lock(&fd->metrics_lock);
int no_ref = c->no_ref;
pthread_mutex_unlock(&fd->metrics_lock);

if (!cb || !map)
return NULL;

Expand Down Expand Up @@ -162,7 +166,7 @@ cram_block *cram_encode_compression_header(cram_fd *fd, cram_container *c,
kh_val(h->preservation_map, k).i = h->qs_seq_orient;
}

if (fd->no_ref || embed_ref>0) {
if (no_ref || embed_ref>0) {
// Reference Required == No
k = kh_put(map, h->preservation_map, "RR", &r);
if (-1 == r) return NULL;
Expand Down Expand Up @@ -1746,7 +1750,7 @@ int cram_encode_container(cram_fd *fd, cram_container *c) {
cram_block_compression_hdr *h = c->comp_hdr;
cram_block *c_hdr;
int multi_ref = 0;
int r1, r2, sn, nref, embed_ref;
int r1, r2, sn, nref, embed_ref, no_ref;
spare_bams *spares;

if (CRAM_MAJOR_VERS(fd->version) == 1)
Expand All @@ -1759,10 +1763,11 @@ int cram_encode_container(cram_fd *fd, cram_container *c) {
/* Cache references up-front if we have unsorted access patterns */
pthread_mutex_lock(&fd->ref_lock);
nref = fd->refs->nref;
embed_ref = fd->embed_ref;
embed_ref = c->embed_ref;
no_ref = c->no_ref;
pthread_mutex_unlock(&fd->ref_lock);

if (!fd->no_ref && c->refs_used) {
if (!no_ref && c->refs_used) {
for (i = 0; i < nref; i++) {
if (c->refs_used[i]) {
cram_get_ref(fd, i, 1, 0);
Expand All @@ -1774,7 +1779,7 @@ int cram_encode_container(cram_fd *fd, cram_container *c) {

/* To create M5 strings */
/* Fetch reference sequence */
if (!fd->no_ref) {
if (!no_ref) {
if (!c->bams || !c->bams[0])
goto_err;
bam_seq_t *b = c->bams[0];
Expand All @@ -1793,7 +1798,7 @@ int cram_encode_container(cram_fd *fd, cram_container *c) {
hts_log_warning("NOTE: the CRAM file will be bigger than"
" using an external reference");
pthread_mutex_lock(&fd->ref_lock);
embed_ref = fd->embed_ref = 2;
embed_ref = c->embed_ref = fd->embed_ref = 2;
pthread_mutex_unlock(&fd->ref_lock);
goto auto_ref;
} else if (ref) {
Expand Down Expand Up @@ -1848,8 +1853,8 @@ int cram_encode_container(cram_fd *fd, cram_container *c) {
if (cram_generate_reference(c, s, r1) < 0) {
hts_log_error("Failed to build reference, switching to non-ref mode");
pthread_mutex_lock(&fd->ref_lock);
fd->embed_ref = 0;
fd->no_ref = 1;
c->embed_ref = fd->embed_ref = 0;
c->no_ref = fd->no_ref = 1;
pthread_mutex_unlock(&fd->ref_lock);
goto restart;
}
Expand All @@ -1862,7 +1867,7 @@ int cram_encode_container(cram_fd *fd, cram_container *c) {
bam_seq_t *b = c->bams[r1];

/* If multi-ref we need to cope with changing reference per seq */
if (c->multi_seq && !fd->no_ref) {
if (c->multi_seq && !no_ref) {
if (bam_ref(b) != c->ref_seq_id && bam_ref(b) >= 0) {
if (c->ref_seq_id >= 0)
cram_ref_decr(fd->refs, c->ref_seq_id);
Expand All @@ -1884,7 +1889,8 @@ int cram_encode_container(cram_fd *fd, cram_container *c) {
}
}

if (process_one_read(fd, c, s, cr, b, r2, &MD, embed_ref) != 0) {
if (process_one_read(fd, c, s, cr, b, r2, &MD, embed_ref,
no_ref) != 0) {
free(MD.s);
return -1;
}
Expand Down Expand Up @@ -1951,7 +1957,7 @@ int cram_encode_container(cram_fd *fd, cram_container *c) {
}
}

if (c->multi_seq && !fd->no_ref) {
if (c->multi_seq && !no_ref) {
if (c->ref_seq_id >= 0)
cram_ref_decr(fd->refs, c->ref_seq_id);
}
Expand Down Expand Up @@ -1983,12 +1989,16 @@ int cram_encode_container(cram_fd *fd, cram_container *c) {


/* Compute MD5s */
pthread_mutex_lock(&fd->ref_lock);
no_ref = c->no_ref; // may have been updated in cram_put_bam_seq
pthread_mutex_unlock(&fd->ref_lock);
int is_v4 = CRAM_MAJOR_VERS(fd->version) >= 4 ? 1 : 0;

for (i = 0; i < c->curr_slice; i++) {
cram_slice *s = c->slices[i];

if (CRAM_MAJOR_VERS(fd->version) != 1) {
if (s->hdr->ref_seq_id >= 0 && c->multi_seq == 0 && !fd->no_ref) {
if (s->hdr->ref_seq_id >= 0 && c->multi_seq == 0 && !no_ref) {
hts_md5_context *md5 = hts_md5_init();
if (!md5)
return -1;
Expand Down Expand Up @@ -2365,7 +2375,7 @@ int cram_encode_container(cram_fd *fd, cram_container *c) {
}

/* Cache references up-front if we have unsorted access patterns */
if (!fd->no_ref && c->refs_used) {
if (!no_ref && c->refs_used) {
for (i = 0; i < fd->refs->nref; i++) {
if (c->refs_used[i])
cram_ref_decr(fd->refs, i);
Expand Down Expand Up @@ -2603,7 +2613,7 @@ static sam_hrec_rg_t *cram_encode_aux(cram_fd *fd, bam_seq_t *b,
cram_slice *s, cram_record *cr,
int verbatim_NM, int verbatim_MD,
int NM, kstring_t *MD, int cf_tag,
int *err) {
int no_ref, int *err) {
char *aux, *orig;
sam_hrec_rg_t *brg = NULL;
int aux_size = bam_get_l_aux(b);
Expand Down Expand Up @@ -2655,7 +2665,7 @@ static sam_hrec_rg_t *cram_encode_aux(cram_fd *fd, bam_seq_t *b,

// MD:Z
if (aux[0] == 'M' && aux[1] == 'D' && aux[2] == 'Z') {
if (cr->len && !fd->no_ref && !(cr->flags & BAM_FUNMAP) && !verbatim_MD) {
if (cr->len && !no_ref && !(cr->flags & BAM_FUNMAP) && !verbatim_MD) {
if (MD && MD->s && strncasecmp(MD->s, aux+3, orig + aux_size - (aux+3)) == 0) {
while (*aux++);
if (CRAM_MAJOR_VERS(fd->version) >= 4)
Expand All @@ -2667,7 +2677,7 @@ static sam_hrec_rg_t *cram_encode_aux(cram_fd *fd, bam_seq_t *b,

// NM:i
if (aux[0] == 'N' && aux[1] == 'M') {
if (cr->len && !fd->no_ref && !(cr->flags & BAM_FUNMAP) && !verbatim_NM) {
if (cr->len && !no_ref && !(cr->flags & BAM_FUNMAP) && !verbatim_NM) {
int NM_ = bam_aux2i((uint8_t *)aux+2);
if (NM_ == NM) {
switch(aux[2]) {
Expand Down Expand Up @@ -3101,7 +3111,12 @@ static cram_container *cram_next_container(cram_fd *fd, bam_seq_t *b) {
fd->slices_per_container);
if (!c)
return NULL;

pthread_mutex_lock(&fd->ref_lock);
c->no_ref = fd->no_ref;
c->embed_ref = fd->embed_ref;
c->record_counter = fd->record_counter;
pthread_mutex_unlock(&fd->ref_lock);
c->curr_ref = bam_ref(b);
}

Expand Down Expand Up @@ -3148,7 +3163,7 @@ static cram_container *cram_next_container(cram_fd *fd, bam_seq_t *b) {
static int process_one_read(cram_fd *fd, cram_container *c,
cram_slice *s, cram_record *cr,
bam_seq_t *b, int rnum, kstring_t *MD,
int embed_ref) {
int embed_ref, int no_ref) {
int i, fake_qual = -1, NM = 0;
char *cp;
char *ref, *seq, *qual;
Expand Down Expand Up @@ -3189,7 +3204,7 @@ static int process_one_read(cram_fd *fd, cram_container *c,

// Non reference based encoding means storing the bases verbatim as features, which in
// turn means every base also has a quality already stored.
if (!fd->no_ref || CRAM_MAJOR_VERS(fd->version) >= 3)
if (!no_ref || CRAM_MAJOR_VERS(fd->version) >= 3)
cr->cram_flags |= CRAM_FLAG_PRESERVE_QUAL_SCORES;

if (cr->len <= 0 && CRAM_MAJOR_VERS(fd->version) >= 3)
Expand Down Expand Up @@ -3267,7 +3282,7 @@ static int process_one_read(cram_fd *fd, cram_container *c,
//fprintf(stderr, "\nBAM_CMATCH\nR: %.*s\nS: %.*s\n",
// cig_len, &ref[apos], cig_len, &seq[spos]);
l = 0;
if (!fd->no_ref && cr->len) {
if (!no_ref && cr->len) {
int end = cig_len+apos < c->ref_end
? cig_len : c->ref_end - apos;
char *sp = &seq[spos];
Expand Down Expand Up @@ -3362,7 +3377,7 @@ static int process_one_read(cram_fd *fd, cram_container *c,
}

if (l < cig_len && cr->len) {
if (fd->no_ref) {
if (no_ref) {
if (CRAM_MAJOR_VERS(fd->version) == 3) {
if (cram_add_bases(fd, c, s, cr, spos,
cig_len-l, &seq[spos]))
Expand Down Expand Up @@ -3421,7 +3436,7 @@ static int process_one_read(cram_fd *fd, cram_container *c,
if (cram_add_insertion(c, s, cr, spos, cig_len,
cr->len ? &seq[spos] : NULL))
return -1;
if (fd->no_ref && cr->len) {
if (no_ref && cr->len) {
for (l = 0; l < cig_len; l++, spos++) {
cram_add_quality(fd, c, s, cr, spos, qual[spos]);
}
Expand All @@ -3437,7 +3452,7 @@ static int process_one_read(cram_fd *fd, cram_container *c,
fd->version))
return -1;

if (fd->no_ref &&
if (no_ref &&
!(cr->cram_flags & CRAM_FLAG_PRESERVE_QUAL_SCORES)) {
if (cr->len) {
for (l = 0; l < cig_len; l++, spos++) {
Expand Down Expand Up @@ -3473,7 +3488,7 @@ static int process_one_read(cram_fd *fd, cram_container *c,
return -1;
}
fake_qual = spos;
cr->aend = fd->no_ref ? apos : MIN(apos, c->ref_end);
cr->aend = no_ref ? apos : MIN(apos, c->ref_end);
if (cram_stats_add(c->stats[DS_FN], cr->nfeature) < 0)
goto block_err;

Expand All @@ -3496,7 +3511,7 @@ static int process_one_read(cram_fd *fd, cram_container *c,
int err = 0;
sam_hrec_rg_t *brg =
cram_encode_aux(fd, b, c, s, cr, verbatim_NM, verbatim_MD, NM, MD,
cf_tag, &err);
cf_tag, no_ref, &err);
if (err)
goto block_err;

Expand Down Expand Up @@ -3792,6 +3807,15 @@ int cram_put_bam_seq(cram_fd *fd, bam_seq_t *b) {
}
c = fd->ctr;

// Cache fd embed_ref and no_ref flags so we can create consistent
// containers even when these fd globals are changing in other
// threads.
pthread_mutex_lock(&fd->ref_lock);
int embed_ref;
c->no_ref = fd->no_ref;
embed_ref = c->embed_ref = fd->embed_ref;
pthread_mutex_unlock(&fd->ref_lock);

if (!c->slice || c->curr_rec == c->max_rec ||
(bam_ref(b) != c->curr_ref && c->curr_ref >= -1) ||
(c->s_num_bases >= fd->bases_per_slice)) {
Expand All @@ -3807,9 +3831,6 @@ int cram_put_bam_seq(cram_fd *fd, bam_seq_t *b) {
* The multi_seq var here refers to our intention for the next slice.
* This slice has already been encoded so we output as-is.
*/
pthread_mutex_lock(&fd->ref_lock);
int embed_ref = fd->embed_ref;
pthread_mutex_unlock(&fd->ref_lock);
if (fd->multi_seq == -1 && c->curr_rec < c->max_rec/4+10 &&
fd->last_slice && fd->last_slice < c->max_rec/4+10 &&
embed_ref<=0) {
Expand Down Expand Up @@ -3857,6 +3878,20 @@ int cram_put_bam_seq(cram_fd *fd, bam_seq_t *b) {
c->multi_seq = 1;
c->pos_sorted = 0;

// Cram_next_container may end up flushing an existing one and
// triggering fd->embed_ref=2 if no reference is found.
// Embedded refs are incompatible with multi-seq, so we bail
// out and switch to no_ref in this scenario. We do this
// within the container only, as multi_seq may be temporary
// and we switch back away from it again.
pthread_mutex_lock(&fd->ref_lock);
if (fd->embed_ref) {
hts_log_info("Changing from embed_ref to no_ref mode");
c->embed_ref = 0;//fd->embed_ref = 0;
c->no_ref = 1;//fd->no_ref = 1;
}
pthread_mutex_unlock(&fd->ref_lock);

if (!c->refs_used) {
pthread_mutex_lock(&fd->ref_lock);
c->refs_used = calloc(fd->refs->nref, sizeof(int));
Expand All @@ -3882,8 +3917,8 @@ int cram_put_bam_seq(cram_fd *fd, bam_seq_t *b) {
} else if (c->refs_used && c->refs_used[bam_ref(b)]) {
pthread_mutex_lock(&fd->ref_lock);
fd->unsorted = 1;
pthread_mutex_unlock(&fd->ref_lock);
fd->multi_seq = 1;
pthread_mutex_unlock(&fd->ref_lock);
}
}

Expand Down
2 changes: 2 additions & 0 deletions cram/cram_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -3650,6 +3650,8 @@ cram_container *cram_new_container(int nrec, int nslice) {
c->max_apos = 0;
c->multi_seq = 0;
c->qs_seq_orient = 1;
c->no_ref = 0;
c->embed_ref = 0;

c->bams = NULL;

Expand Down
14 changes: 8 additions & 6 deletions cram/cram_structs.h
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,8 @@ struct cram_container {
/* Copied from fd before encoding, to allow multi-threading */
int ref_start, first_base, last_base, ref_id, ref_end;
char *ref;
int embed_ref; // 1 if embedding ref, 2 if embedding cons
int no_ref; // true if referenceless
//struct ref_entry *ref;

/* For multi-threading */
Expand Down Expand Up @@ -793,14 +795,14 @@ struct cram_fd {
cram_container *ctr_mt;

// positions for encoding or decoding
int first_base, last_base;
int first_base, last_base; // copied to container

// cached reference portion
refs_t *refs; // ref meta-data structure
char *ref, *ref_free; // current portion held in memory
int ref_id;
int ref_start;
int ref_end;
int ref_id; // copied to container
int ref_start; // copied to container
int ref_end; // copied to container
char *ref_fn; // reference fasta filename

// compression level and metrics
Expand All @@ -813,8 +815,8 @@ struct cram_fd {
int seqs_per_slice;
int bases_per_slice;
int slices_per_container;
int embed_ref;
int no_ref;
int embed_ref; // copied to container
int no_ref; // copied to container
int ignore_md5;
int use_bz2;
int use_rans;
Expand Down

0 comments on commit e8b30ba

Please sign in to comment.