Skip to content
This repository has been archived by the owner on Jul 17, 2023. It is now read-only.

Commit

Permalink
MANTA-1521 Improve evidence bam thread usage
Browse files Browse the repository at this point in the history
Use finer-grained file locking when writing out evidence bams
  • Loading branch information
ctsa committed Feb 19, 2019
1 parent b199bae commit 39d3446
Show file tree
Hide file tree
Showing 13 changed files with 196 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ parseGSCOptions(
"minimum number of supporting spanning observations required to become an SV candidate")
("min-scored-sv-size", po::value(&opt.minScoredVariantSize)->default_value(opt.minScoredVariantSize),
"minimum size for variants which are scored and output following initial candidate generation")
("evidence-bam-stub", po::value(&opt.supportBamStub)->default_value(opt.supportBamStub),
("evidence-bam-stub", po::value(&opt.evidenceBamStub)->default_value(opt.evidenceBamStub),
"Directory and prefix of bams storing the supporting reads of SVs")
("output-contigs", po::value(&opt.isOutputContig)->zero_tokens(),
"Output assembled contig sequences in VCF files")
Expand Down
8 changes: 7 additions & 1 deletion src/c++/lib/applications/GenerateSVCandidates/GSCOptions.hh
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ struct GSCOptions
return (! tumorOutputFilename.empty());
}

/// \brief Test whether an evidence BAM stub has been provided
///
/// \return true if evidenceBamStub is non-empty
bool
isGenerateEvidenceBam() const
{
    const bool isStubEmpty(evidenceBamStub.empty());
    return (! isStubEmpty);
}

AlignmentFileOptions alignFileOpt;
EdgeOptions edgeOpt;
ReadScannerOptions scanOpt;
Expand All @@ -75,7 +81,7 @@ struct GSCOptions
std::string somaticOutputFilename;
std::string tumorOutputFilename;
std::string rnaOutputFilename;
std::string supportBamStub;
std::string evidenceBamStub;

bool isVerbose = false; ///< provide some high-level log info to assist in debugging

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ struct EdgeThreadLocalData
GSCEdgeStatsManager edgeStatMan;
std::unique_ptr<SVFinder> svFindPtr;
std::unique_ptr<SVCandidateProcessor> svProcessorPtr;
std::unique_ptr<SVEvidenceWriter> svEvidenceWriterPtr;
SVCandidateSetData svData;
std::vector<SVCandidate> svs;
std::vector<SVMultiJunctionCandidate> mjSVs;
Expand Down Expand Up @@ -256,7 +257,7 @@ runGSC(
}

const SVWriter svWriter(opt, bamHeader, progName, progVersion);
SVEvidenceWriter svEvidenceWriter(opt);
auto svEvidenceWriterSharedData(std::make_shared<SVEvidenceWriterSharedData>(opt));

// Initialize all thread-local edge data:
std::shared_ptr<SynchronizedOutputStream> edgeTrackerStreamPtr;
Expand All @@ -272,7 +273,7 @@ runGSC(
edgeData.svFindPtr.reset(new SVFinder(opt, readScanner, bamHeader, cset.getAllSampleReadCounts(),
edgeData.edgeTrackerPtr, edgeData.edgeStatMan));
edgeData.svProcessorPtr.reset(
new SVCandidateProcessor(opt, readScanner, cset, svWriter, svEvidenceWriter,
new SVCandidateProcessor(opt, readScanner, cset, svWriter, svEvidenceWriterSharedData,
edgeData.edgeTrackerPtr, edgeData.edgeStatMan));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ SVCandidateProcessor(
const SVLocusScanner& readScanner,
const SVLocusSet& cset,
const SVWriter& svWriter,
SVEvidenceWriter& svEvidenceWriter,
std::shared_ptr<SVEvidenceWriterSharedData> svEvidenceWriterSharedData,
std::shared_ptr<EdgeRuntimeTracker> edgeTrackerPtr,
GSCEdgeStatsManager& edgeStatMan) :
_opt(opt),
_cset(cset),
_svWriter(svWriter),
_svEvidenceWriter(svEvidenceWriter),
_svEvidenceWriter(opt, svEvidenceWriterSharedData),
_edgeTrackerPtr(edgeTrackerPtr),
_edgeStatMan(edgeStatMan),
_svRefine(opt, cset.getBamHeader(), cset.getAllSampleReadCounts(), _edgeTrackerPtr),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct SVCandidateProcessor
const SVLocusScanner& readScanner,
const SVLocusSet& cset,
const SVWriter& svWriter,
SVEvidenceWriter& svEvidenceWriter,
std::shared_ptr<SVEvidenceWriterSharedData> svEvidenceWriterSharedData,
std::shared_ptr<EdgeRuntimeTracker> edgeTrackerPtr,
GSCEdgeStatsManager& edgeStatMan);

Expand Down Expand Up @@ -84,7 +84,7 @@ private:
const GSCOptions& _opt;
const SVLocusSet& _cset;
const SVWriter& _svWriter;
SVEvidenceWriter& _svEvidenceWriter;
SVEvidenceWriter _svEvidenceWriter;
std::shared_ptr<EdgeRuntimeTracker> _edgeTrackerPtr;
GSCEdgeStatsManager& _edgeStatMan;
SVCandidateAssemblyRefiner _svRefine;
Expand Down
65 changes: 38 additions & 27 deletions src/c++/lib/applications/GenerateSVCandidates/SVEvidenceWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ processBamRecords(
bam_streamer& origBamStream,
const GenomeInterval& interval,
const support_fragments_t& supportFrags,
bam_dumper& bamDumper)
SynchronizedBamWriter& bamWriter)
{
#ifdef DEBUG_SUPPORT
log_os << __FUNCTION__ << " target interval: "
Expand Down Expand Up @@ -147,7 +147,8 @@ processBamRecords(
// Update bam record bin value
bam_update_bin(br);
// write to bam
bamDumper.put_record(&br);

bamWriter.put_record(&br);
}
}
}
Expand All @@ -156,9 +157,10 @@ processBamRecords(

void
SVEvidenceWriter::
writeSupportBam(const bam_streamer_ptr& origBamStreamPtr,
const SVEvidenceWriterSampleData& svSupportFrags,
const bam_dumper_ptr& supportBamDumperPtr)
writeSupportBam(
const bam_streamer_ptr& origBamStreamPtr,
const SVEvidenceWriterSampleData& svSupportFrags,
SynchronizedBamWriter& bamWriter)
{
std::vector<SVEvidenceWriterRead> supportReads;
const support_fragments_t& supportFrags(svSupportFrags.supportFrags);
Expand Down Expand Up @@ -193,37 +195,49 @@ writeSupportBam(const bam_streamer_ptr& origBamStreamPtr,
}

bam_streamer& origBamStream(*origBamStreamPtr);
bam_dumper& supportBamDumper(*supportBamDumperPtr);
for (const auto& interval : intervals)
{
processBamRecords(origBamStream, interval,
supportFrags, supportBamDumper);
processBamRecords(origBamStream, interval, supportFrags, bamWriter);
}

}



SVEvidenceWriter::
SVEvidenceWriter(
const GSCOptions& opt) :
m_isGenerateSupportBam(opt.supportBamStub.size() > 0),
m_sampleSize(opt.alignFileOpt.alignmentFilenames.size())
SVEvidenceWriterSharedData::
SVEvidenceWriterSharedData(
const GSCOptions& opt)
{
if (! m_isGenerateSupportBam) return;

openBamStreams(opt.referenceFilename, opt.alignFileOpt.alignmentFilenames, m_origBamStreamPtrs);
if (! opt.isGenerateEvidenceBam()) return;

assert(m_origBamStreamPtrs.size() == m_sampleSize);
for (unsigned sampleIndex(0); sampleIndex<m_sampleSize; ++sampleIndex)
const unsigned sampleSize(opt.alignFileOpt.alignmentFilenames.size());
for (unsigned sampleIndex(0); sampleIndex<sampleSize; ++sampleIndex)
{
std::string supportBamName(opt.supportBamStub
const std::string evidenceBamName(opt.evidenceBamStub
+ ".bam_" + std::to_string(sampleIndex)
+ ".bam");
const bam_hdr_t& header(m_origBamStreamPtrs[sampleIndex]->get_header());
bam_dumper_ptr bamDumperPtr(new bam_dumper(supportBamName.c_str(), header));
m_supportBamDumperPtrs.push_back(bamDumperPtr);
const bam_streamer bamStreamer(
opt.alignFileOpt.alignmentFilenames[sampleIndex].c_str(), opt.referenceFilename.c_str());

m_evidenceBamWriterPtrs.push_back(
std::make_shared<SynchronizedBamWriter>(evidenceBamName.c_str(), bamStreamer.get_header()));
}

}


/// \brief Construct an evidence writer which holds on to \p sharedData
///
/// When evidence BAM output is not enabled in \p opt, no input alignment streams are opened.
///
/// \param opt Options supplying the evidence-BAM switch, reference filename and alignment filenames
/// \param sharedData Shared writer data, stored in m_sharedData
SVEvidenceWriter::
SVEvidenceWriter(
    const GSCOptions& opt,
    std::shared_ptr<SVEvidenceWriterSharedData> sharedData) :
    m_isGenerateEvidenceBam(opt.isGenerateEvidenceBam()),
    m_sampleSize(opt.alignFileOpt.alignmentFilenames.size()),
    m_sharedData(sharedData)
{
    if (m_isGenerateEvidenceBam)
    {
        const auto& alignmentFilenames(opt.alignFileOpt.alignmentFilenames);
        openBamStreams(opt.referenceFilename, alignmentFilenames, m_origBamStreamPtrs);

        // Expect one input stream per alignment file
        assert(m_origBamStreamPtrs.size() == m_sampleSize);
    }
}


Expand All @@ -233,15 +247,12 @@ SVEvidenceWriter::
write(
const SVEvidenceWriterData& svEvidenceWriterData)
{
if (! m_isGenerateSupportBam) return;

/// TODO more specific I/O lock could be needed to make the SV caller faster when evidence BAM output is turned on:
std::lock_guard<std::mutex> lock(m_writerMutex);
if (! m_isGenerateEvidenceBam) return;

for (unsigned sampleIndex(0); sampleIndex<m_sampleSize; ++sampleIndex)
{
writeSupportBam(m_origBamStreamPtrs[sampleIndex],
svEvidenceWriterData.sampleData[sampleIndex],
m_supportBamDumperPtrs[sampleIndex]);
m_sharedData->getBamWriter(sampleIndex));
}
}
39 changes: 29 additions & 10 deletions src/c++/lib/applications/GenerateSVCandidates/SVEvidenceWriter.hh
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@

#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include "manta/SVCandidateSetData.hh"
#include "GSCOptions.hh"
#include "htsapi/bam_streamer.hh"
#include "htsapi/bam_dumper.hh"
#include "SynchronizedBamWriter.hh"


/// Records a single read that supports one or more SVs for evidence-BAM output
Expand Down Expand Up @@ -219,16 +218,38 @@ std::ostream&
operator<<( std::ostream& os, const SVEvidenceWriterData& data);



/// \brief State common to several SVEvidenceWriter objects, which may be running on different threads
///
/// Holds the SynchronizedBamWriter objects and hands them out by index.
class SVEvidenceWriterSharedData
{
public:
    /// \param opt Options used to set up the evidence BAM writers
    explicit
    SVEvidenceWriterSharedData(
        const GSCOptions& opt);

    /// \brief Get the evidence BAM writer stored at position \p bamIndex
    ///
    /// \param bamIndex Position of the requested writer; asserted to be in range and to refer to a
    ///                 non-null writer
    SynchronizedBamWriter&
    getBamWriter(const unsigned bamIndex)
    {
        assert(bamIndex < m_evidenceBamWriterPtrs.size());
        const auto& writerPtr(m_evidenceBamWriterPtrs[bamIndex]);
        assert(writerPtr);
        return *writerPtr;
    }

private:
    std::vector<std::shared_ptr<SynchronizedBamWriter>> m_evidenceBamWriterPtrs;
};



/// \brief Coordinate all bookkeeping and data structures required to output evidence BAMs
///
/// Evidence BAMs are intended for visualization in debugging scenarios only. They are not part of the standard
/// calling workflow.
class SVEvidenceWriter
{
public:
explicit
SVEvidenceWriter(
const GSCOptions& opt);
const GSCOptions& opt,
std::shared_ptr<SVEvidenceWriterSharedData> sharedData);

/// Write SV evidence reads into bam files
void
Expand All @@ -238,26 +259,24 @@ public:
// Everything below is conceptually private, but kept here for unit testing until a fix can be written:

typedef std::shared_ptr<bam_streamer> bam_streamer_ptr;
typedef std::shared_ptr<bam_dumper> bam_dumper_ptr;

static
void
processBamRecords(bam_streamer& origBamStream,
const GenomeInterval& interval,
const support_fragments_t& supportFrags,
bam_dumper& bamDumper);
SynchronizedBamWriter& bamWriter);

static
void
writeSupportBam(const bam_streamer_ptr& origBamStream,
const SVEvidenceWriterSampleData& svSupportFrags,
const bam_dumper_ptr& supportBamDumper);
SynchronizedBamWriter& bamWriter);

private:

bool m_isGenerateSupportBam;
bool m_isGenerateEvidenceBam;
unsigned m_sampleSize;
std::vector<bam_streamer_ptr> m_origBamStreamPtrs;
std::vector<bam_dumper_ptr> m_supportBamDumperPtrs;
std::mutex m_writerMutex;
std::shared_ptr<SVEvidenceWriterSharedData> m_sharedData;
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//
// Manta - Structural Variant and Indel Caller
// Copyright (c) 2013-2019 Illumina, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
//

/// \file
/// \author Chris Saunders
///

#pragma once

#include <mutex>

#include "boost/noncopyable.hpp"

#include "htsapi/bam_dumper.hh"


/// \brief Wrapper around the standard BAM writer that lets multiple threads write to the same file
///
/// Every put_record call holds an internal mutex while the record is passed to the wrapped writer.
class SynchronizedBamWriter : private boost::noncopyable
{
public:
    /// \param filename Forwarded to the wrapped bam_dumper
    /// \param header Forwarded to the wrapped bam_dumper
    SynchronizedBamWriter(
        const char* filename,
        const bam_hdr_t& header)
        : m_bamWriter(filename, header)
    {}

    /// \brief Append one BAM record to the file while holding the write mutex
    ///
    /// The file must not be closed.
    void
    put_record(const bam1_t* brec)
    {
        const std::lock_guard<std::mutex> writeGuard(m_writeMutex);
        m_bamWriter.put_record(brec);
    }

private:
    bam_dumper m_bamWriter;
    std::mutex m_writeMutex;
};
Loading

0 comments on commit 39d3446

Please sign in to comment.