Skip to content

Commit

Permalink
Add stream-based merging
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwhite committed Mar 11, 2019
1 parent 7f3be9d commit f67022f
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 99 deletions.
90 changes: 54 additions & 36 deletions src/main/java/htsjdk/samtools/BAMIndexMerger.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,56 +36,61 @@
* Merges BAM index files for (headerless) parts of a BAM file into a single
* index file. The index files must have been produced using an uninitialized window (TODO).
*/
public class BAMIndexMerger {
public class BAMIndexMerger extends IndexMerger<AbstractBAMFileIndex> {

private static final int UNINITIALIZED_WINDOW = -1;

/**
* Merge BAI files for (headerless) BAM file parts.
*
* @param header the header for the file
* @param partLengths the lengths, in bytes, of the headerless BAM file parts
* @param baiStreams streams for the BAI files to merge
* @param baiOut the output stream for the resulting merged BAI
*/
public static void merge(
final SAMFileHeader header,
final List<Long> partLengths,
final List<SeekableStream> baiStreams,
final OutputStream baiOut) {
if (baiStreams.isEmpty()) {
throw new IllegalArgumentException("Cannot merge zero BAI files");
private int numReferences;
private final List<List<BAMIndexContent>> content = new ArrayList<>();
private long noCoordinateCount;

public BAMIndexMerger(final OutputStream out, final long headerLength) {
super(out, headerLength);
}

@Override
public void processIndex(final AbstractBAMFileIndex index, final long partLength) {
this.partLengths.add(partLength);
if (content.isEmpty()) {
numReferences = index.getNumberOfReferences();
for (int ref = 0; ref < numReferences; ref++) {
content.add(new ArrayList<>());
}
}
final SAMSequenceDictionary dict = header.getSequenceDictionary();
final List<AbstractBAMFileIndex> bais = new ArrayList<>();
for (SeekableStream baiStream : baiStreams) {
bais.add(new CachingBAMFileIndex(baiStream, dict));
if (index.getNumberOfReferences() != numReferences) {
throw new IllegalArgumentException(
String.format("Cannot merge BAI files with different number of references, %s and %s.", numReferences, index.getNumberOfReferences()));
}
final int numReferences = bais.get(0).getNumberOfReferences();
for (AbstractBAMFileIndex bai : bais) {
if (bai.getNumberOfReferences() != numReferences) {
throw new IllegalArgumentException(
String.format("Cannot merge BAI files with different number of references, %s and %s.", numReferences, bai.getNumberOfReferences()));
}
for (int ref = 0; ref < numReferences; ref++) {
final List<BAMIndexContent> bamIndexContentList = content.get(ref);
bamIndexContentList.add(index.getQueryResults(ref));
}
noCoordinateCount += index.getNoCoordinateCount();
}

@Override
public void finish(final long dataFileLength) {
if (content.isEmpty()) {
throw new IllegalArgumentException("Cannot merge zero BAI files");
}
final long[] offsets = partLengths.stream().mapToLong(i -> i).toArray();
Arrays.parallelPrefix(offsets, (a, b) -> a + b); // cumulative offsets

try (BinaryBAMIndexWriter writer =
new BinaryBAMIndexWriter(numReferences, baiOut)) {
new BinaryBAMIndexWriter(numReferences, out)) {
for (int ref = 0; ref < numReferences; ref++) {
final List<BAMIndexContent> bamIndexContentList = new ArrayList<>();
for (AbstractBAMFileIndex bai : bais) {
bamIndexContentList.add(bai.getQueryResults(ref));
}
BAMIndexContent bamIndexContent = mergeBAMIndexContent(ref, bamIndexContentList, offsets);
final List<BAMIndexContent> bamIndexContentList = content.get(ref);
final BAMIndexContent bamIndexContent = mergeBAMIndexContent(ref, bamIndexContentList, offsets);
writer.writeReference(bamIndexContent);
}
long noCoordinateCount = bais.stream().mapToLong(AbstractBAMFileIndex::getNoCoordinateCount).sum();
writer.writeNoCoordinateRecordCount(noCoordinateCount);
}
}

/**
 * Opens a BAM index over the given stream.
 *
 * @param stream the stream to read the BAI data from
 * @param dictionary the sequence dictionary handed to the index implementation
 * @return a {@link CachingBAMFileIndex} constructed from {@code stream} and {@code dictionary}
 */
public static AbstractBAMFileIndex openIndex(final SeekableStream stream, final SAMSequenceDictionary dictionary) {
    final AbstractBAMFileIndex index = new CachingBAMFileIndex(stream, dictionary);
    return index;
}

private static BAMIndexContent mergeBAMIndexContent(final int referenceSequence,
final List<BAMIndexContent> bamIndexContentList, final long[] offsets) {
final List<BinningIndexContent.BinList> binLists = new ArrayList<>();
Expand All @@ -103,15 +108,21 @@ private static BAMIndexContent mergeBAMIndexContent(final int referenceSequence,
mergeLinearIndexes(referenceSequence, linearIndexes, offsets));
}

/**
* Merge bins for (headerless) BAM file parts.
* @param binLists the bins to merge
* @param offsets bin <i>i</i> will be shifted by offset <i>i</i>
* @return the merged bins
*/
public static BinningIndexContent.BinList mergeBins(final List<BinningIndexContent.BinList> binLists, final long[] offsets) {
final List<Bin> mergedBins = new ArrayList<>();
final int maxBinNumber = binLists.stream().mapToInt(bl -> bl.maxBinNumber).max().orElse(0);
int commonNonNullBins = 0;
for (int i = 0; i <= maxBinNumber; i++) {
List<Bin> nonNullBins = new ArrayList<>();
final List<Bin> nonNullBins = new ArrayList<>();
for (int j = 0; j < binLists.size(); j++) {
BinningIndexContent.BinList binList = binLists.get(j);
Bin bin = binList.getBin(i);
final BinningIndexContent.BinList binList = binLists.get(j);
final Bin bin = binList.getBin(i);
if (bin != null) {
nonNullBins.add(bin.shift(offsets[j]));
}
Expand Down Expand Up @@ -211,6 +222,13 @@ private static BAMIndexMetaData mergeMetaData(final List<BAMIndexMetaData> metaD
return new BAMIndexMetaData(chunkList);
}

/**
* Merge linear indexes for (headerless) BAM file parts.
* @param referenceSequence the reference sequence number for the linear indexes being merged
* @param linearIndexes the linear indexes to merge
* @param offsets linear index <i>i</i> will be shifted by offset <i>i</i>
* @return the merged linear index
*/
public static LinearIndex mergeLinearIndexes(final int referenceSequence, final List<LinearIndex> linearIndexes, final long[] offsets) {
int maxIndex = -1;
for (LinearIndex li : linearIndexes) {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/htsjdk/samtools/BAMIndexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sub-license, and/or sell
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
Expand All @@ -15,7 +15,7 @@
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
Expand Down
44 changes: 44 additions & 0 deletions src/main/java/htsjdk/samtools/IndexMerger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* The MIT License
*
* Copyright (c) 2018 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package htsjdk.samtools;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

/**
 * Base class for mergers that combine the indexes of the parts of a data file into a
 * single index written to an output stream.
 *
 * @param <T> the type of per-part index accepted by {@link #processIndex(Object, long)}
 */
public abstract class IndexMerger<T> {
    /** The stream supplied at construction time; subclasses write the merged index to it. */
    protected final OutputStream out;

    /** Recorded lengths; the first element is always the header length given to the constructor. */
    protected final List<Long> partLengths = new ArrayList<>();

    /**
     * Creates a merger and records the header length as the first entry of {@link #partLengths}.
     *
     * @param out the output stream for the merged index
     * @param headerLength the length of the data file header
     */
    public IndexMerger(final OutputStream out, final long headerLength) {
        partLengths.add(headerLength);
        this.out = out;
    }

    /**
     * Adds the index for one part of the data file to the merged index.
     *
     * @param index the index for the part
     * @param partLength the length of the part the index refers to
     */
    public abstract void processIndex(final T index, final long partLength);

    /**
     * Completes the merged index.
     *
     * @param dataFileLength the length of the data file
     * @throws IOException if the implementation fails while writing the merged index
     */
    public abstract void finish(final long dataFileLength) throws IOException;
}
9 changes: 5 additions & 4 deletions src/main/java/htsjdk/samtools/SBIIndexMerger.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
/**
* Merges SBI files for parts of a file that have been concatenated.
*/
public final class SBIIndexMerger {
public final class SBIIndexMerger extends IndexMerger<SBIIndex> {

private static final Log log = Log.getInstance(SBIIndexMerger.class);

Expand All @@ -49,6 +49,7 @@ public final class SBIIndexMerger {
* an index
*/
public SBIIndexMerger(final OutputStream out, long headerLength) {
super(out, headerLength);
this.indexWriter = new SBIIndexWriter(out);
this.offset = headerLength;
}
Expand All @@ -57,7 +58,8 @@ public SBIIndexMerger(final OutputStream out, long headerLength) {
* Add an index for a part of the data file to the merged index. This method should be called for
* each index for the data file parts, in order.
*/
public void processIndex(final SBIIndex index) {
@Override
public void processIndex(final SBIIndex index, final long partLength) {
final long[] virtualOffsets = index.getVirtualOffsets();
for (int i = 0; i < virtualOffsets.length - 1; i++) {
indexWriter.writeVirtualOffset(BlockCompressedFilePointerUtil.shift(virtualOffsets[i], offset));
Expand All @@ -77,9 +79,8 @@ public void processIndex(final SBIIndex index) {

/**
* Complete the index, and close the output stream.
*
* @param dataFileLength the length of the data file in bytes
*/
@Override
public void finish(final long dataFileLength) {
final SBIIndex.Header header =
new SBIIndex.Header(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,26 @@
/*
* The MIT License
*
* Copyright (c) 2018 The Broad Institute
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package htsjdk.tribble.index.tabix;

import htsjdk.samtools.BinningIndexContent;
Expand Down Expand Up @@ -44,8 +67,8 @@ public StreamBasedTabixIndexCreator(

@Override
public Index finalizeIndex(long finalFilePosition) {
Index index = super.finalizeIndex(finalFilePosition);
TabixIndex tabixIndex = (TabixIndex) index;
final Index index = super.finalizeIndex(finalFilePosition);
final TabixIndex tabixIndex = (TabixIndex) index;
return new StreamBasedTabixIndex(
tabixIndex.getFormatSpec(),
tabixIndex.getSequenceNames(),
Expand Down
Loading

0 comments on commit f67022f

Please sign in to comment.