diff --git a/cram/cram_encode.c b/cram/cram_encode.c index cee699b3a..c878be626 100644 --- a/cram/cram_encode.c +++ b/cram/cram_encode.c @@ -61,7 +61,7 @@ KHASH_MAP_INIT_STR(m_s2u64, uint64_t) static int process_one_read(cram_fd *fd, cram_container *c, cram_slice *s, cram_record *cr, bam_seq_t *b, int rnum, kstring_t *MD, - int embed_ref); + int embed_ref, int no_ref); /* * Returns index of val into key. @@ -87,6 +87,10 @@ cram_block *cram_encode_compression_header(cram_fd *fd, cram_container *c, cram_block *map = cram_new_block(COMPRESSION_HEADER, 0); int i, mc, r = 0; + pthread_mutex_lock(&fd->metrics_lock); + int no_ref = c->no_ref; + pthread_mutex_unlock(&fd->metrics_lock); + if (!cb || !map) return NULL; @@ -162,7 +166,7 @@ cram_block *cram_encode_compression_header(cram_fd *fd, cram_container *c, kh_val(h->preservation_map, k).i = h->qs_seq_orient; } - if (fd->no_ref || embed_ref>0) { + if (no_ref || embed_ref>0) { // Reference Required == No k = kh_put(map, h->preservation_map, "RR", &r); if (-1 == r) return NULL; @@ -1746,7 +1750,7 @@ int cram_encode_container(cram_fd *fd, cram_container *c) { cram_block_compression_hdr *h = c->comp_hdr; cram_block *c_hdr; int multi_ref = 0; - int r1, r2, sn, nref, embed_ref; + int r1, r2, sn, nref, embed_ref, no_ref; spare_bams *spares; if (CRAM_MAJOR_VERS(fd->version) == 1) @@ -1759,10 +1763,11 @@ int cram_encode_container(cram_fd *fd, cram_container *c) { /* Cache references up-front if we have unsorted access patterns */ pthread_mutex_lock(&fd->ref_lock); nref = fd->refs->nref; - embed_ref = fd->embed_ref; + embed_ref = c->embed_ref; + no_ref = c->no_ref; pthread_mutex_unlock(&fd->ref_lock); - if (!fd->no_ref && c->refs_used) { + if (!no_ref && c->refs_used) { for (i = 0; i < nref; i++) { if (c->refs_used[i]) { cram_get_ref(fd, i, 1, 0); @@ -1774,7 +1779,7 @@ int cram_encode_container(cram_fd *fd, cram_container *c) { /* To create M5 strings */ /* Fetch reference sequence */ - if (!fd->no_ref) { + if (!no_ref) { if (!c->bams || !c->bams[0]) goto_err; bam_seq_t *b = c->bams[0]; @@ -1793,7 +1798,7 @@ int cram_encode_container(cram_fd *fd, cram_container *c) { hts_log_warning("NOTE: the CRAM file will be bigger than" " using an external reference"); pthread_mutex_lock(&fd->ref_lock); - embed_ref = fd->embed_ref = 2; + embed_ref = c->embed_ref = fd->embed_ref = 2; pthread_mutex_unlock(&fd->ref_lock); goto auto_ref; } else if (ref) { @@ -1848,8 +1853,8 @@ int cram_encode_container(cram_fd *fd, cram_container *c) { if (cram_generate_reference(c, s, r1) < 0) { hts_log_error("Failed to build reference, switching to non-ref mode"); pthread_mutex_lock(&fd->ref_lock); - fd->embed_ref = 0; - fd->no_ref = 1; + c->embed_ref = fd->embed_ref = 0; + c->no_ref = fd->no_ref = 1; pthread_mutex_unlock(&fd->ref_lock); goto restart; } @@ -1862,7 +1867,7 @@ int cram_encode_container(cram_fd *fd, cram_container *c) { bam_seq_t *b = c->bams[r1]; /* If multi-ref we need to cope with changing reference per seq */ - if (c->multi_seq && !fd->no_ref) { + if (c->multi_seq && !no_ref) { if (bam_ref(b) != c->ref_seq_id && bam_ref(b) >= 0) { if (c->ref_seq_id >= 0) cram_ref_decr(fd->refs, c->ref_seq_id); @@ -1884,7 +1889,8 @@ int cram_encode_container(cram_fd *fd, cram_container *c) { } } - if (process_one_read(fd, c, s, cr, b, r2, &MD, embed_ref) != 0) { + if (process_one_read(fd, c, s, cr, b, r2, &MD, embed_ref, + no_ref) != 0) { free(MD.s); return -1; } @@ -1951,7 +1957,7 @@ int cram_encode_container(cram_fd *fd, cram_container *c) { } } - if (c->multi_seq && !fd->no_ref) { + if (c->multi_seq && !no_ref) { if (c->ref_seq_id >= 0) cram_ref_decr(fd->refs, c->ref_seq_id); } @@ -1983,12 +1989,16 @@ int cram_encode_container(cram_fd *fd, cram_container *c) { /* Compute MD5s */ + pthread_mutex_lock(&fd->ref_lock); + no_ref = c->no_ref; // may have been updated in cram_put_bam_seq + pthread_mutex_unlock(&fd->ref_lock); int is_v4 = CRAM_MAJOR_VERS(fd->version) >= 4 ? 1 : 0; + for (i = 0; i < c->curr_slice; i++) { cram_slice *s = c->slices[i]; if (CRAM_MAJOR_VERS(fd->version) != 1) { - if (s->hdr->ref_seq_id >= 0 && c->multi_seq == 0 && !fd->no_ref) { + if (s->hdr->ref_seq_id >= 0 && c->multi_seq == 0 && !no_ref) { hts_md5_context *md5 = hts_md5_init(); if (!md5) return -1; @@ -2365,7 +2375,7 @@ int cram_encode_container(cram_fd *fd, cram_container *c) { } /* Cache references up-front if we have unsorted access patterns */ - if (!fd->no_ref && c->refs_used) { + if (!no_ref && c->refs_used) { for (i = 0; i < fd->refs->nref; i++) { if (c->refs_used[i]) cram_ref_decr(fd->refs, i); @@ -2603,7 +2613,7 @@ static sam_hrec_rg_t *cram_encode_aux(cram_fd *fd, bam_seq_t *b, cram_slice *s, cram_record *cr, int verbatim_NM, int verbatim_MD, int NM, kstring_t *MD, int cf_tag, - int *err) { + int no_ref, int *err) { char *aux, *orig; sam_hrec_rg_t *brg = NULL; int aux_size = bam_get_l_aux(b); @@ -2655,7 +2665,7 @@ static sam_hrec_rg_t *cram_encode_aux(cram_fd *fd, bam_seq_t *b, // MD:Z if (aux[0] == 'M' && aux[1] == 'D' && aux[2] == 'Z') { - if (cr->len && !fd->no_ref && !(cr->flags & BAM_FUNMAP) && !verbatim_MD) { + if (cr->len && !no_ref && !(cr->flags & BAM_FUNMAP) && !verbatim_MD) { if (MD && MD->s && strncasecmp(MD->s, aux+3, orig + aux_size - (aux+3)) == 0) { while (*aux++); if (CRAM_MAJOR_VERS(fd->version) >= 4) @@ -2667,7 +2677,7 @@ static sam_hrec_rg_t *cram_encode_aux(cram_fd *fd, bam_seq_t *b, // NM:i if (aux[0] == 'N' && aux[1] == 'M') { - if (cr->len && !fd->no_ref && !(cr->flags & BAM_FUNMAP) && !verbatim_NM) { + if (cr->len && !no_ref && !(cr->flags & BAM_FUNMAP) && !verbatim_NM) { int NM_ = bam_aux2i((uint8_t *)aux+2); if (NM_ == NM) { switch(aux[2]) { @@ -3101,7 +3111,12 @@ static cram_container *cram_next_container(cram_fd *fd, bam_seq_t *b) { fd->slices_per_container); if (!c) return NULL; + + pthread_mutex_lock(&fd->ref_lock); + c->no_ref = fd->no_ref; + c->embed_ref = fd->embed_ref; c->record_counter = fd->record_counter; + pthread_mutex_unlock(&fd->ref_lock); c->curr_ref = bam_ref(b); } @@ -3148,7 +3163,7 @@ static cram_container *cram_next_container(cram_fd *fd, bam_seq_t *b) { static int process_one_read(cram_fd *fd, cram_container *c, cram_slice *s, cram_record *cr, bam_seq_t *b, int rnum, kstring_t *MD, - int embed_ref) { + int embed_ref, int no_ref) { int i, fake_qual = -1, NM = 0; char *cp; char *ref, *seq, *qual; @@ -3189,7 +3204,7 @@ static int process_one_read(cram_fd *fd, cram_container *c, // Non reference based encoding means storing the bases verbatim as features, which in // turn means every base also has a quality already stored. - if (!fd->no_ref || CRAM_MAJOR_VERS(fd->version) >= 3) + if (!no_ref || CRAM_MAJOR_VERS(fd->version) >= 3) cr->cram_flags |= CRAM_FLAG_PRESERVE_QUAL_SCORES; if (cr->len <= 0 && CRAM_MAJOR_VERS(fd->version) >= 3) @@ -3267,7 +3282,7 @@ static int process_one_read(cram_fd *fd, cram_container *c, //fprintf(stderr, "\nBAM_CMATCH\nR: %.*s\nS: %.*s\n", // cig_len, &ref[apos], cig_len, &seq[spos]); l = 0; - if (!fd->no_ref && cr->len) { + if (!no_ref && cr->len) { int end = cig_len+apos < c->ref_end ? cig_len : c->ref_end - apos; char *sp = &seq[spos]; @@ -3362,7 +3377,7 @@ static int process_one_read(cram_fd *fd, cram_container *c, } if (l < cig_len && cr->len) { - if (fd->no_ref) { + if (no_ref) { if (CRAM_MAJOR_VERS(fd->version) == 3) { if (cram_add_bases(fd, c, s, cr, spos, cig_len-l, &seq[spos])) @@ -3421,7 +3436,7 @@ static int process_one_read(cram_fd *fd, cram_container *c, if (cram_add_insertion(c, s, cr, spos, cig_len, cr->len ? &seq[spos] : NULL)) return -1; - if (fd->no_ref && cr->len) { + if (no_ref && cr->len) { for (l = 0; l < cig_len; l++, spos++) { cram_add_quality(fd, c, s, cr, spos, qual[spos]); } @@ -3437,7 +3452,7 @@ static int process_one_read(cram_fd *fd, cram_container *c, fd->version)) return -1; - if (fd->no_ref && + if (no_ref && !(cr->cram_flags & CRAM_FLAG_PRESERVE_QUAL_SCORES)) { if (cr->len) { for (l = 0; l < cig_len; l++, spos++) { @@ -3473,7 +3488,7 @@ static int process_one_read(cram_fd *fd, cram_container *c, return -1; } fake_qual = spos; - cr->aend = fd->no_ref ? apos : MIN(apos, c->ref_end); + cr->aend = no_ref ? apos : MIN(apos, c->ref_end); if (cram_stats_add(c->stats[DS_FN], cr->nfeature) < 0) goto block_err; @@ -3496,7 +3511,7 @@ static int process_one_read(cram_fd *fd, cram_container *c, int err = 0; sam_hrec_rg_t *brg = cram_encode_aux(fd, b, c, s, cr, verbatim_NM, verbatim_MD, NM, MD, - cf_tag, &err); + cf_tag, no_ref, &err); if (err) goto block_err; @@ -3792,6 +3807,15 @@ int cram_put_bam_seq(cram_fd *fd, bam_seq_t *b) { } c = fd->ctr; + // Cache fd embed_ref and no_ref flags so we can create consistent + // containers even when these fd globals are changing in other + // threads. + pthread_mutex_lock(&fd->ref_lock); + int embed_ref; + c->no_ref = fd->no_ref; + embed_ref = c->embed_ref = fd->embed_ref; + pthread_mutex_unlock(&fd->ref_lock); + if (!c->slice || c->curr_rec == c->max_rec || (bam_ref(b) != c->curr_ref && c->curr_ref >= -1) || (c->s_num_bases >= fd->bases_per_slice)) { @@ -3807,9 +3831,6 @@ int cram_put_bam_seq(cram_fd *fd, bam_seq_t *b) { * The multi_seq var here refers to our intention for the next slice. * This slice has already been encoded so we output as-is. */ - pthread_mutex_lock(&fd->ref_lock); - int embed_ref = fd->embed_ref; - pthread_mutex_unlock(&fd->ref_lock); if (fd->multi_seq == -1 && c->curr_rec < c->max_rec/4+10 && fd->last_slice && fd->last_slice < c->max_rec/4+10 && embed_ref<=0) { @@ -3857,6 +3878,20 @@ int cram_put_bam_seq(cram_fd *fd, bam_seq_t *b) { c->multi_seq = 1; c->pos_sorted = 0; + // Cram_next_container may end up flushing an existing one and + // triggering fd->embed_ref=2 if no reference is found. + // Embedded refs are incompatible with multi-seq, so we bail + // out and switch to no_ref in this scenario. We do this + // within the container only, as multi_seq may be temporary + // and we switch back away from it again. + pthread_mutex_lock(&fd->ref_lock); + if (fd->embed_ref) { + hts_log_info("Changing from embed_ref to no_ref mode"); + c->embed_ref = 0;//fd->embed_ref = 0; + c->no_ref = 1;//fd->no_ref = 1; + } + pthread_mutex_unlock(&fd->ref_lock); + if (!c->refs_used) { pthread_mutex_lock(&fd->ref_lock); c->refs_used = calloc(fd->refs->nref, sizeof(int)); @@ -3882,8 +3917,8 @@ int cram_put_bam_seq(cram_fd *fd, bam_seq_t *b) { } else if (c->refs_used && c->refs_used[bam_ref(b)]) { pthread_mutex_lock(&fd->ref_lock); fd->unsorted = 1; - pthread_mutex_unlock(&fd->ref_lock); fd->multi_seq = 1; + pthread_mutex_unlock(&fd->ref_lock); } } diff --git a/cram/cram_io.c b/cram/cram_io.c index e7b239c3f..ba63e1f47 100644 --- a/cram/cram_io.c +++ b/cram/cram_io.c @@ -3650,6 +3650,8 @@ cram_container *cram_new_container(int nrec, int nslice) { c->max_apos = 0; c->multi_seq = 0; c->qs_seq_orient = 1; + c->no_ref = 0; + c->embed_ref = 0; c->bams = NULL; diff --git a/cram/cram_structs.h b/cram/cram_structs.h index cbb226b70..724f6cb78 100644 --- a/cram/cram_structs.h +++ b/cram/cram_structs.h @@ -457,6 +457,8 @@ struct cram_container { /* Copied from fd before encoding, to allow multi-threading */ int ref_start, first_base, last_base, ref_id, ref_end; char *ref; + int embed_ref; // 1 if embedding ref, 2 if embedding cons + int no_ref; // true if referenceless //struct ref_entry *ref; /* For multi-threading */ @@ -793,14 +795,14 @@ struct cram_fd { cram_container *ctr_mt; // positions for encoding or decoding - int first_base, last_base; + int first_base, last_base; // copied to container // cached reference portion refs_t *refs; // ref meta-data structure char *ref, *ref_free; // current portion held in memory - int ref_id; - int ref_start; - int ref_end; + int ref_id; // copied to container + int ref_start; // copied to container + int ref_end; // copied to container char *ref_fn; // reference fasta filename // compression level and metrics @@ -813,8 +815,8 @@ struct cram_fd { int seqs_per_slice; int bases_per_slice; int slices_per_container; - int embed_ref; - int no_ref; + int embed_ref; // copied to container + int no_ref; // copied to container int ignore_md5; int use_bz2; int use_rans;