diff --git a/src/c++/lib/applications/GenerateSVCandidates/GSCOptions.cpp b/src/c++/lib/applications/GenerateSVCandidates/GSCOptions.cpp index 7548cebd..5cfb9730 100644 --- a/src/c++/lib/applications/GenerateSVCandidates/GSCOptions.cpp +++ b/src/c++/lib/applications/GenerateSVCandidates/GSCOptions.cpp @@ -115,7 +115,7 @@ parseGSCOptions( "minimum number of supporting spanning observations required to become an SV candidate") ("min-scored-sv-size", po::value(&opt.minScoredVariantSize)->default_value(opt.minScoredVariantSize), "minimum size for variants which are scored and output following initial candidate generation") - ("evidence-bam-stub", po::value(&opt.supportBamStub)->default_value(opt.supportBamStub), + ("evidence-bam-stub", po::value(&opt.evidenceBamStub)->default_value(opt.evidenceBamStub), "Directory and prefix of bams storing the supporting reads of SVs") ("output-contigs", po::value(&opt.isOutputContig)->zero_tokens(), "Output assembled contig sequences in VCF files") diff --git a/src/c++/lib/applications/GenerateSVCandidates/GSCOptions.hh b/src/c++/lib/applications/GenerateSVCandidates/GSCOptions.hh index 21419e65..c0d00333 100644 --- a/src/c++/lib/applications/GenerateSVCandidates/GSCOptions.hh +++ b/src/c++/lib/applications/GenerateSVCandidates/GSCOptions.hh @@ -51,6 +51,12 @@ struct GSCOptions return (! tumorOutputFilename.empty()); } + bool + isGenerateEvidenceBam() const + { + return (! evidenceBamStub.empty()); + } + AlignmentFileOptions alignFileOpt; EdgeOptions edgeOpt; ReadScannerOptions scanOpt; @@ -75,7 +81,7 @@ struct GSCOptions std::string somaticOutputFilename; std::string tumorOutputFilename; std::string rnaOutputFilename; - std::string supportBamStub; + std::string evidenceBamStub; bool isVerbose = false; ///< provide some high-level log info to assist in debugging diff --git a/src/c++/lib/applications/GenerateSVCandidates/GenerateSVCandidates.cpp b/src/c++/lib/applications/GenerateSVCandidates/GenerateSVCandidates.cpp index a47e0de5..7dd67996 100644 --- a/src/c++/lib/applications/GenerateSVCandidates/GenerateSVCandidates.cpp +++ b/src/c++/lib/applications/GenerateSVCandidates/GenerateSVCandidates.cpp @@ -154,6 +154,7 @@ struct EdgeThreadLocalData GSCEdgeStatsManager edgeStatMan; std::unique_ptr svFindPtr; std::unique_ptr svProcessorPtr; + std::unique_ptr svEvidenceWriterPtr; SVCandidateSetData svData; std::vector svs; std::vector mjSVs; @@ -256,7 +257,7 @@ runGSC( } const SVWriter svWriter(opt, bamHeader, progName, progVersion); - SVEvidenceWriter svEvidenceWriter(opt); + auto svEvidenceWriterSharedData(std::make_shared(opt)); // Initialize all thread-local edge data: std::shared_ptr edgeTrackerStreamPtr; @@ -272,7 +273,7 @@ runGSC( edgeData.svFindPtr.reset(new SVFinder(opt, readScanner, bamHeader, cset.getAllSampleReadCounts(), edgeData.edgeTrackerPtr, edgeData.edgeStatMan)); edgeData.svProcessorPtr.reset( - new SVCandidateProcessor(opt, readScanner, cset, svWriter, svEvidenceWriter, + new SVCandidateProcessor(opt, readScanner, cset, svWriter, svEvidenceWriterSharedData, edgeData.edgeTrackerPtr, edgeData.edgeStatMan)); } diff --git a/src/c++/lib/applications/GenerateSVCandidates/SVCandidateProcessor.cpp b/src/c++/lib/applications/GenerateSVCandidates/SVCandidateProcessor.cpp index bba442ce..0b1b41d0 100644 --- a/src/c++/lib/applications/GenerateSVCandidates/SVCandidateProcessor.cpp +++ b/src/c++/lib/applications/GenerateSVCandidates/SVCandidateProcessor.cpp @@ -39,13 +39,13 @@ SVCandidateProcessor( const SVLocusScanner& readScanner, const SVLocusSet& cset, const SVWriter& svWriter, - SVEvidenceWriter& svEvidenceWriter, + std::shared_ptr svEvidenceWriterSharedData, std::shared_ptr edgeTrackerPtr, GSCEdgeStatsManager& edgeStatMan) : _opt(opt), _cset(cset), _svWriter(svWriter), - _svEvidenceWriter(svEvidenceWriter), + _svEvidenceWriter(opt, svEvidenceWriterSharedData), _edgeTrackerPtr(edgeTrackerPtr), _edgeStatMan(edgeStatMan), _svRefine(opt, cset.getBamHeader(), cset.getAllSampleReadCounts(), _edgeTrackerPtr), diff --git a/src/c++/lib/applications/GenerateSVCandidates/SVCandidateProcessor.hh b/src/c++/lib/applications/GenerateSVCandidates/SVCandidateProcessor.hh index 30d018be..bc092769 100644 --- a/src/c++/lib/applications/GenerateSVCandidates/SVCandidateProcessor.hh +++ b/src/c++/lib/applications/GenerateSVCandidates/SVCandidateProcessor.hh @@ -43,7 +43,7 @@ struct SVCandidateProcessor const SVLocusScanner& readScanner, const SVLocusSet& cset, const SVWriter& svWriter, - SVEvidenceWriter& svEvidenceWriter, + std::shared_ptr svEvidenceWriterSharedData, std::shared_ptr edgeTrackerPtr, GSCEdgeStatsManager& edgeStatMan); @@ -84,7 +84,7 @@ private: const GSCOptions& _opt; const SVLocusSet& _cset; const SVWriter& _svWriter; - SVEvidenceWriter& _svEvidenceWriter; + SVEvidenceWriter _svEvidenceWriter; std::shared_ptr _edgeTrackerPtr; GSCEdgeStatsManager& _edgeStatMan; SVCandidateAssemblyRefiner _svRefine; diff --git a/src/c++/lib/applications/GenerateSVCandidates/SVEvidenceWriter.cpp b/src/c++/lib/applications/GenerateSVCandidates/SVEvidenceWriter.cpp index aad486e1..9da53901 100644 --- a/src/c++/lib/applications/GenerateSVCandidates/SVEvidenceWriter.cpp +++ b/src/c++/lib/applications/GenerateSVCandidates/SVEvidenceWriter.cpp @@ -91,7 +91,7 @@ processBamRecords( bam_streamer& origBamStream, const GenomeInterval& interval, const support_fragments_t& supportFrags, - bam_dumper& bamDumper) + SynchronizedBamWriter& bamWriter) { #ifdef DEBUG_SUPPORT log_os << __FUNCTION__ << " target interval: " @@ -147,7 +147,8 @@ processBamRecords( // Update bam record bin value bam_update_bin(br); // write to bam - bamDumper.put_record(&br); + + bamWriter.put_record(&br); } } } @@ -156,9 +157,10 @@ processBamRecords( void SVEvidenceWriter:: -writeSupportBam(const bam_streamer_ptr& origBamStreamPtr, - const SVEvidenceWriterSampleData& svSupportFrags, - const bam_dumper_ptr& supportBamDumperPtr) +writeSupportBam( + const bam_streamer_ptr& origBamStreamPtr, + const SVEvidenceWriterSampleData& svSupportFrags, + SynchronizedBamWriter& bamWriter) { std::vector supportReads; const support_fragments_t& supportFrags(svSupportFrags.supportFrags); @@ -193,37 +195,49 @@ writeSupportBam(const bam_streamer_ptr& origBamStreamPtr, } bam_streamer& origBamStream(*origBamStreamPtr); - bam_dumper& supportBamDumper(*supportBamDumperPtr); for (const auto& interval : intervals) { - processBamRecords(origBamStream, interval, - supportFrags, supportBamDumper); + processBamRecords(origBamStream, interval, supportFrags, bamWriter); } } -SVEvidenceWriter:: -SVEvidenceWriter( - const GSCOptions& opt) : - m_isGenerateSupportBam(opt.supportBamStub.size() > 0), - m_sampleSize(opt.alignFileOpt.alignmentFilenames.size()) +SVEvidenceWriterSharedData:: +SVEvidenceWriterSharedData( + const GSCOptions& opt) { - if (! m_isGenerateSupportBam) return; - - openBamStreams(opt.referenceFilename, opt.alignFileOpt.alignmentFilenames, m_origBamStreamPtrs); + if (! opt.isGenerateEvidenceBam()) return; - assert(m_origBamStreamPtrs.size() == m_sampleSize); - for (unsigned sampleIndex(0); sampleIndexget_header()); - bam_dumper_ptr bamDumperPtr(new bam_dumper(supportBamName.c_str(), header)); - m_supportBamDumperPtrs.push_back(bamDumperPtr); + const bam_streamer bamStreamer( + opt.alignFileOpt.alignmentFilenames[sampleIndex].c_str(), opt.referenceFilename.c_str()); + + m_evidenceBamWriterPtrs.push_back( + std::make_shared(evidenceBamName.c_str(), bamStreamer.get_header())); } + +} + + +SVEvidenceWriter:: +SVEvidenceWriter( + const GSCOptions& opt, + std::shared_ptr sharedData) : + m_isGenerateEvidenceBam(opt.isGenerateEvidenceBam()), + m_sampleSize(opt.alignFileOpt.alignmentFilenames.size()), + m_sharedData(sharedData) +{ + if (! m_isGenerateEvidenceBam) return; + + openBamStreams(opt.referenceFilename, opt.alignFileOpt.alignmentFilenames, m_origBamStreamPtrs); + assert(m_origBamStreamPtrs.size() == m_sampleSize); } @@ -233,15 +247,12 @@ SVEvidenceWriter:: write( const SVEvidenceWriterData& svEvidenceWriterData) { - if (! m_isGenerateSupportBam) return; - - /// TODO more specific I/O lock could be needed to make the SV caller faster when evidence BAM output is turned on: - std::lock_guard lock(m_writerMutex); + if (! m_isGenerateEvidenceBam) return; for (unsigned sampleIndex(0); sampleIndexgetBamWriter(sampleIndex)); } } diff --git a/src/c++/lib/applications/GenerateSVCandidates/SVEvidenceWriter.hh b/src/c++/lib/applications/GenerateSVCandidates/SVEvidenceWriter.hh index b3ac09ad..fb56db16 100644 --- a/src/c++/lib/applications/GenerateSVCandidates/SVEvidenceWriter.hh +++ b/src/c++/lib/applications/GenerateSVCandidates/SVEvidenceWriter.hh @@ -25,14 +25,13 @@ #include #include -#include #include #include #include "manta/SVCandidateSetData.hh" #include "GSCOptions.hh" #include "htsapi/bam_streamer.hh" -#include "htsapi/bam_dumper.hh" +#include "SynchronizedBamWriter.hh" /// Records a single read that supports one or more SVs for evidence-BAM output @@ -219,6 +218,28 @@ std::ostream& operator<<( std::ostream& os, const SVEvidenceWriterData& data); + +/// Data that are shared by multiple SVEvidenceWriters (potentially operating on multiple threads) +class SVEvidenceWriterSharedData { +public: + explicit + SVEvidenceWriterSharedData( + const GSCOptions& opt); + + SynchronizedBamWriter& + getBamWriter(const unsigned bamIndex) + { + assert(bamIndex < m_evidenceBamWriterPtrs.size()); + assert(m_evidenceBamWriterPtrs[bamIndex]); + return *(m_evidenceBamWriterPtrs[bamIndex]); + } + +private: + std::vector> m_evidenceBamWriterPtrs; +}; + + + /// \brief Coordinate all bookkeeping and data structures required to output evidence BAMs /// /// Evidence BAMs are intended for visualization in debugging scenarios only. They are not part of the standard @@ -226,9 +247,9 @@ operator<<( std::ostream& os, const SVEvidenceWriterData& data); class SVEvidenceWriter { public: - explicit SVEvidenceWriter( - const GSCOptions& opt); + const GSCOptions& opt, + std::shared_ptr sharedData); /// Write SV evidence reads into bam files void @@ -238,26 +259,24 @@ public: // Everything below is conceptually private, but kept here for unit testing until a fix can be written: typedef std::shared_ptr bam_streamer_ptr; - typedef std::shared_ptr bam_dumper_ptr; static void processBamRecords(bam_streamer& origBamStream, const GenomeInterval& interval, const support_fragments_t& supportFrags, - bam_dumper& bamDumper); + SynchronizedBamWriter& bamWriter); static void writeSupportBam(const bam_streamer_ptr& origBamStream, const SVEvidenceWriterSampleData& svSupportFrags, - const bam_dumper_ptr& supportBamDumper); + SynchronizedBamWriter& bamWriter); private: - bool m_isGenerateSupportBam; + bool m_isGenerateEvidenceBam; unsigned m_sampleSize; std::vector m_origBamStreamPtrs; - std::vector m_supportBamDumperPtrs; - std::mutex m_writerMutex; + std::shared_ptr m_sharedData; }; diff --git a/src/c++/lib/applications/GenerateSVCandidates/SynchronizedBamWriter.hh b/src/c++/lib/applications/GenerateSVCandidates/SynchronizedBamWriter.hh new file mode 100644 index 00000000..f0516076 --- /dev/null +++ b/src/c++/lib/applications/GenerateSVCandidates/SynchronizedBamWriter.hh @@ -0,0 +1,54 @@ +// +// Manta - Structural Variant and Indel Caller +// Copyright (c) 2013-2019 Illumina, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +// +// + +/// \file +/// \author Chris Saunders +/// + +#pragma once + +#include + +#include "boost/noncopyable.hpp" + +#include "htsapi/bam_dumper.hh" + + +/// Extends the standard BAM writer to allow synchronized writing to the same file from multiple threads +class SynchronizedBamWriter : private boost::noncopyable +{ +public: + SynchronizedBamWriter( + const char* filename, + const bam_hdr_t& header) + : m_bamWriter(filename, header) + {} + + /// Add another BAM record to the file. File must not be closed. + void + put_record(const bam1_t* brec) + { + std::lock_guard lock(m_writeMutex); + m_bamWriter.put_record(brec); + } + +private: + bam_dumper m_bamWriter; + std::mutex m_writeMutex; +}; diff --git a/src/c++/lib/applications/GenerateSVCandidates/test/SVCandidateProcessorTest.cpp b/src/c++/lib/applications/GenerateSVCandidates/test/SVCandidateProcessorTest.cpp index 7bb47267..d8aa28d0 100644 --- a/src/c++/lib/applications/GenerateSVCandidates/test/SVCandidateProcessorTest.cpp +++ b/src/c++/lib/applications/GenerateSVCandidates/test/SVCandidateProcessorTest.cpp @@ -298,9 +298,9 @@ BOOST_AUTO_TEST_CASE( test_tumorOnly ) // block-scope svWriter to force file flush at end of scope: { const SVWriter svWriter(options, bamHeader, programName.c_str(), version.c_str()); - SVEvidenceWriter svEvidenceWriter(options); - SVCandidateProcessor candidateProcessor(options, scanner.operator*(), set1, svWriter, svEvidenceWriter, - edgeTrackerPtr, edgeStatMan); + auto svEvidenceWriterSharedData(std::make_shared(options)); + SVCandidateProcessor candidateProcessor(options, scanner.operator*(), set1, svWriter, + svEvidenceWriterSharedData, edgeTrackerPtr, edgeStatMan); candidateProcessor.evaluateCandidates(edgeInfo, mjSvs, svData); } @@ -413,9 +413,9 @@ BOOST_AUTO_TEST_CASE( test_RNA ) // block-scope svWriter to force file flush at end of scope: { const SVWriter svWriter(options, bamHeader, programName.c_str(), version.c_str()); - SVEvidenceWriter svEvidenceWriter(options); - SVCandidateProcessor candidateProcessor(options, scanner.operator*(), set1, svWriter, svEvidenceWriter, - edgeTrackerPtr, edgeStatMan); + auto svEvidenceWriterSharedData(std::make_shared(options)); + SVCandidateProcessor candidateProcessor(options, scanner.operator*(), set1, svWriter, + svEvidenceWriterSharedData, edgeTrackerPtr, edgeStatMan); candidateProcessor.evaluateCandidates(edgeInfo, mjSvs, svData); } @@ -529,9 +529,9 @@ BOOST_AUTO_TEST_CASE( test_Diploid ) // block-scope svWriter to force file flush at end of scope: { const SVWriter svWriter(options, bamHeader, programName.c_str(), version.c_str()); - SVEvidenceWriter svEvidenceWriter(options); - SVCandidateProcessor candidateProcessor(options, scanner.operator*(), set1, svWriter, svEvidenceWriter, - edgeTrackerPtr, edgeStatMan); + auto svEvidenceWriterSharedData(std::make_shared(options)); + SVCandidateProcessor candidateProcessor(options, scanner.operator*(), set1, svWriter, + svEvidenceWriterSharedData, edgeTrackerPtr, edgeStatMan); candidateProcessor.evaluateCandidates(edgeInfo, mjSvs, svData); } @@ -668,9 +668,9 @@ BOOST_AUTO_TEST_CASE( test_Somatic ) // block-scope svWriter to force file flush at end of scope: { const SVWriter svWriter(options, bamHeader, programName.c_str(), version.c_str()); - SVEvidenceWriter svEvidenceWriter(options); - SVCandidateProcessor candidateProcessor(options, scanner.operator*(), set1, svWriter, svEvidenceWriter, - edgeTrackerPtr, edgeStatMan); + auto svEvidenceWriterSharedData(std::make_shared(options)); + SVCandidateProcessor candidateProcessor(options, scanner.operator*(), set1, svWriter, + svEvidenceWriterSharedData, edgeTrackerPtr, edgeStatMan); candidateProcessor.evaluateCandidates(edgeInfo, mjSvs, svData); } diff --git a/src/c++/lib/applications/GenerateSVCandidates/test/SVEvidenceWriterTest.cpp b/src/c++/lib/applications/GenerateSVCandidates/test/SVEvidenceWriterTest.cpp index 7298688a..1db3a7e2 100644 --- a/src/c++/lib/applications/GenerateSVCandidates/test/SVEvidenceWriterTest.cpp +++ b/src/c++/lib/applications/GenerateSVCandidates/test/SVEvidenceWriterTest.cpp @@ -300,11 +300,15 @@ BOOST_AUTO_TEST_CASE( test_ProcessRecords ) suppFragments.supportFrags[readsToAdd[0].qname()] = suppFragment1; suppFragments.supportFrags[readsToAdd[1].qname()] = suppFragment2; - bam_dumper bamDumper1(bamFileName.c_str(), bamHeaderManager.get()); - // Process the bam record and add ZM tag for all reads which are intersection to genomeInterval1 - SVEvidenceWriter::processBamRecords(bamStream.operator*(), genomeInterval1, suppFragments.supportFrags, bamDumper1); - bamDumper1.close(); - // creating bam index + { + // this code block forces bamWriter to flush at block end: + SynchronizedBamWriter bamWriter(bamFileName.c_str(), bamHeaderManager.get()); + // Process the bam record and add ZM tag for all reads which are intersection to genomeInterval1 + SVEvidenceWriter::processBamRecords(bamStream.operator*(), genomeInterval1, suppFragments.supportFrags, + bamWriter); + } + + // create bam index const int indexStatus1 = bam_index_build(bamFileName.c_str(), 0); // check whether .bai file for bam file is build successfully or not. BOOST_REQUIRE(indexStatus1 >=0); @@ -312,10 +316,15 @@ BOOST_AUTO_TEST_CASE( test_ProcessRecords ) // check the evidence bam for bamRecord1 as genomeInterval1 intersects with bamRecord1. checkEvidenceBam(genomeInterval1, bamFileName, "INS_1|PR", "bamRecord1"); - bam_dumper bamDumper2(bamFileName.c_str(), bamHeaderManager.get()); - // Process the bam record and add ZM tag for all reads which are intersection to genomeInterval2 - SVEvidenceWriter::processBamRecords(bamStream.operator*(), genomeInterval1, suppFragments.supportFrags, bamDumper2); - bamDumper2.close(); + { + // this code block forces bamWriter to flush at block end: + SynchronizedBamWriter bamWriter(bamFileName.c_str(), bamHeaderManager.get()); + // Process the bam record and add ZM tag for all reads which are intersection to genomeInterval2 + SVEvidenceWriter::processBamRecords(bamStream.operator*(), genomeInterval1, suppFragments.supportFrags, + bamWriter); + } + + // create bam index const int indexStatus2 = bam_index_build(bamFileName.c_str(), 0); // check whether .bai file for bam file is build successfully or not. BOOST_REQUIRE(indexStatus2 >=0); @@ -347,11 +356,12 @@ BOOST_AUTO_TEST_CASE( test_writeEvidenceBam ) suppFragments.supportFrags[readsToAdd[1].qname()] = suppFragment2; { - SVEvidenceWriter::bam_dumper_ptr bamDumperPtr(new bam_dumper(bamFileName.c_str(), bamHeaderManager.get())); - // Write both the bamRecord1 and bamRecord2 in the evidence bam - SVEvidenceWriter::writeSupportBam(bamStream, suppFragments, bamDumperPtr); + // this code block forces bamWriter to flush at block end: + SynchronizedBamWriter bamWriter(bamFileName.c_str(), bamHeaderManager.get()); + SVEvidenceWriter::writeSupportBam(bamStream, suppFragments, bamWriter); } - // creating bam index + + // create bam index const int indexStatus = bam_index_build(bamFileName.c_str(), 0); // check whether .bai file for bam file is build successfully or not. BOOST_REQUIRE(indexStatus >=0); diff --git a/src/c++/lib/blt_util/io_util.cpp b/src/c++/lib/blt_util/io_util.cpp index 33cb84bd..9e8fd0ec 100644 --- a/src/c++/lib/blt_util/io_util.cpp +++ b/src/c++/lib/blt_util/io_util.cpp @@ -91,6 +91,6 @@ void SynchronizedOutputStream:: write(const std::string& msg) { - std::lock_guard lock(m_writerMutex); + std::lock_guard lock(m_writeMutex); *m_osPtr << msg; } diff --git a/src/c++/lib/blt_util/io_util.hh b/src/c++/lib/blt_util/io_util.hh index 24aa9aaa..bd71f374 100644 --- a/src/c++/lib/blt_util/io_util.hh +++ b/src/c++/lib/blt_util/io_util.hh @@ -65,6 +65,6 @@ public: private: std::unique_ptr m_osPtr; - std::mutex m_writerMutex; + std::mutex m_writeMutex; }; diff --git a/src/python/lib/mantaWorkflow.py b/src/python/lib/mantaWorkflow.py index 71c9b34c..afc032ab 100644 --- a/src/python/lib/mantaWorkflow.py +++ b/src/python/lib/mantaWorkflow.py @@ -336,14 +336,9 @@ def workflow(self2) : -def sortBams(self, sortBamTasks, taskPrefix="", binStr="", isNormal=True, bamIdx=0, dependencies=None): +def sortEvidenceBams(self, sortBamTasks, taskPrefix="", binStr="", dependencies=None): - if isNormal: - bamList = self.params.normalBamList - else: - bamList = self.params.tumorBamList - - for _ in bamList: + for bamIdx, _ in enumerate(self.params.normalBamList + self.params.tumorBamList): supportBam = self.paths.getSupportBamPath(bamIdx, binStr) sortedBam = self.paths.getSortedSupportBamPath(bamIdx, binStr) @@ -355,9 +350,6 @@ def sortBams(self, sortBamTasks, taskPrefix="", binStr="", isNormal=True, bamIdx sortBamTask = preJoin(taskPrefix, "sortEvidenceBam_%s_%s" % (binStr, bamIdx)) sortBamTasks.add(self.addTask(sortBamTask, sortBamCmd, dependencies=dependencies)) - bamIdx += 1 - - return bamIdx def sortAllVcfs(self, taskPrefix="", dependencies=None) : @@ -451,36 +443,28 @@ def extractSmall(inPath, outPath) : return nextStepWait -def mergeSupportBams(self, mergeBamTasks, taskPrefix="", isNormal=True, bamIdx=0, dependencies=None) : - if isNormal: - bamList = self.params.normalBamList - else: - bamList = self.params.tumorBamList +def mergeEvidenceBams(self, mergeBamTasks, taskPrefix="", dependencies=None) : - for bamPath in bamList: + for bamIdx, bamPath in enumerate(self.params.normalBamList + self.params.tumorBamList) : # merge support bams - supportBamFile = self.paths.getFinalSupportBamPath(bamPath, bamIdx) + evidenceBamFile = self.paths.getFinalSupportBamPath(bamPath, bamIdx) mergeCmd = [ sys.executable, self.params.mantaMergeBam, self.params.samtoolsBin, self.paths.getSortedSupportBamMask(bamIdx), - supportBamFile, + evidenceBamFile, self.paths.getSupportBamListPath(bamIdx) ] mergeBamTask=self.addTask(preJoin(taskPrefix,"merge_evidenceBam_%s" % (bamIdx)), mergeCmd, dependencies=dependencies) - mergeBamTasks.add(mergeBamTask) # index the filtered bam ### TODO still needs to handle the case where supportBamFile does not exist - indexCmd = [ self.params.samtoolsBin, "index", supportBamFile ] + indexCmd = [ self.params.samtoolsBin, "index", evidenceBamFile ] indexBamTask = self.addTask(preJoin(taskPrefix,"index_evidenceBam_%s" % (bamIdx)), indexCmd, dependencies=mergeBamTask) mergeBamTasks.add(indexBamTask) - bamIdx += 1 - - return bamIdx def runHyGen(self, taskPrefix="", dependencies=None) : @@ -502,7 +486,7 @@ def runHyGen(self, taskPrefix="", dependencies=None) : hygenTasks=set() if self.params.isGenerateSupportBam : - sortBamVcfTasks = set() + sortEvidenceBamTasks = set() self.candidateVcfPaths = [] self.diploidVcfPaths = [] @@ -598,41 +582,27 @@ def isEnableRemoteReadRetrieval() : hygenTask = preJoin(taskPrefix,"generateCandidateSV_"+binStr) hygenTasks.add(self.addTask(hygenTask,hygenCmd,dependencies=dirTask, nCores=self.getNCores(), memMb=self.params.hyGenMemMb)) - # TODO: if the bam is large, for efficiency, consider - # 1) filtering the bin-specific bam first w.r.t. the final candidate vcf - # 2) then sort the bin-specific bam and merge them - # This would require moving the filter/sort bam jobs outside the hygen loop if self.params.isGenerateSupportBam : - bamIndex = 0 - # sort supporting bams extracted from normal samples - bamIndex = sortBams(self, sortBamVcfTasks, - taskPrefix=taskPrefix, binStr=binStr, - isNormal=True, bamIdx=bamIndex, - dependencies=hygenTask) - # sort supporting bams extracted from tumor samples - bamIndex = sortBams(self, sortBamVcfTasks, - taskPrefix=taskPrefix, binStr=binStr, - isNormal=False, bamIdx=bamIndex, - dependencies=hygenTask) + # sort evidence bams + # + # TODO: if the bam is large, for efficiency, consider + # 1) filtering the bin-specific bam first w.r.t. the final candidate vcf + # 2) then sort the bin-specific bam and merge them + # This would require moving the filter/sort bam jobs outside the hygen loop + sortEvidenceBams(self, sortEvidenceBamTasks, taskPrefix=taskPrefix, binStr=binStr, dependencies=hygenTask) - vcfTasks = sortAllVcfs(self,taskPrefix=taskPrefix,dependencies=hygenTasks) nextStepWait = copy.deepcopy(hygenTasks) + vcfTasks = sortAllVcfs(self,taskPrefix=taskPrefix,dependencies=hygenTasks) + nextStepWait = nextStepWait.union(vcfTasks) + if self.params.isGenerateSupportBam : - sortBamVcfTasks.union(vcfTasks) mergeBamTasks = set() - bamCount = 0 - # merge supporting bams for each normal sample - bamCount = mergeSupportBams(self, mergeBamTasks, taskPrefix=taskPrefix, - isNormal=True, bamIdx=bamCount, - dependencies=sortBamVcfTasks) - - # merge supporting bams for each tumor sample - bamCount = mergeSupportBams(self, mergeBamTasks, taskPrefix=taskPrefix, - isNormal=False, bamIdx=bamCount, - dependencies=sortBamVcfTasks) - - nextStepWait = nextStepWait.union(sortBamVcfTasks) + + # merge evidence bams + mergeEvidenceBams(self, mergeBamTasks, taskPrefix=taskPrefix, + dependencies=sortEvidenceBamTasks) + nextStepWait = nextStepWait.union(mergeBamTasks) #