Skip to content

Commit

Permalink
[FEATURE] add parallel and progress'ised BWT construction
Browse files Browse the repository at this point in the history
  • Loading branch information
h-2 committed Jan 25, 2018
1 parent 7e4881f commit e59cc1f
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 23 deletions.
2 changes: 2 additions & 0 deletions src/mkindex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ realMain(LambdaIndexerOptions const & options,
TRedAlph(),
Fwd());
}
#ifdef LAMBDA_LEGACY_PATHS
else
{
using TIndexSpec = IndexSa<TIndexSpecSpec>;
Expand All @@ -277,6 +278,7 @@ realMain(LambdaIndexerOptions const & options,
TRedAlph(),
Fwd());
}
#endif

// dump options
for (auto && s : std::initializer_list<std::pair<std::string, std::string>>
Expand Down
65 changes: 43 additions & 22 deletions src/mkindex_algo.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -652,18 +652,18 @@ parseAndDumpTaxTree(std::vector<bool> & taxIdIsPresent,
for (uint32_t i = 0; i < length(taxonParentIDs); ++i)
{
if (!taxIdIsPresentOrParent[i] && (taxonParentIDs[i] != 0))
std::cout << "WARNING: TaxID " << i << " has parent, but shouldn't.\n";
std::cerr << "WARNING: TaxID " << i << " has parent, but shouldn't.\n";

if (taxIdIsPresentOrParent[i] && (taxonParentIDs[i] == 0))
std::cout << "WARNING: TaxID " << i << " has no parent, but should.\n";
std::cerr << "WARNING: TaxID " << i << " has no parent, but should.\n";
if (taxIdIsPresent[i] && (taxonParentIDs[i] == 0))
std::cout << "WARNING: TaxID " << i << " has no parent, but should. 2\n";
std::cerr << "WARNING: TaxID " << i << " has no parent, but should. 2\n";

if (taxIdIsPresent[i] && !taxIdIsPresentOrParent[i])
std::cout << "WARNING: TaxID " << i << " disappeared, but shouldn't have.\n";
std::cerr << "WARNING: TaxID " << i << " disappeared, but shouldn't have.\n";

if (!taxIdIsPresent[i] && taxIdIsPresentOrParent[i] && (inDegrees[i] == 1))
std::cout << "WARNING: TaxID " << i << " should have disappeared, but didn't.\n";
std::cerr << "WARNING: TaxID " << i << " should have disappeared, but didn't.\n";
}
#endif

Expand Down Expand Up @@ -786,7 +786,8 @@ template <typename TText, typename TSpec, typename TConfig, typename TLambda>
void
indexCreateProgress(Index<TText, FMIndex<TSpec, TConfig> > & index,
FibreSALF const &,
TLambda && progressCallback)
TLambda && progressCallback,
LambdaIndexerOptions const & options)
{
typedef Index<TText, FMIndex<TSpec, TConfig> > TIndex;
typedef typename Fibre<TIndex, FibreTempSA>::Type TTempSA;
Expand All @@ -799,46 +800,63 @@ indexCreateProgress(Index<TText, FMIndex<TSpec, TConfig> > & index,
return;

TTempSA tempSA;
auto progressCallback2 = progressCallback; // need second lambda because counter internally increased

std::cout << "Generating 0% 10% 20% 30% 40% 50% 60% 70% 80% 90% 100%\n"
" (1) SuffixArray |" << std::flush;
double s = sysTime();
myPrint(options, 1, "Generating 0% 10% 20% 30% 40% 50% 60% 70% 80% 90% 100%\n"
" (1) SuffixArray |");
// Create the full SA.
resize(tempSA, lengthSum(text), Exact());
createSuffixArray(tempSA, text, TAlgo(), progressCallback);
double e = sysTime() - s;
myPrint(options, 2, " Runtime: ", e, "s");
myPrint(options, 1, "\n");

std::cout << " (2) FM-Index..." << std::flush;
s = sysTime();
myPrint(options, 1, " (2) BWT |");
// Create the LF table.
createLF(indexLF(index), text, tempSA);
createLFProgress(indexLF(index), text, tempSA, progressCallback2);
// createLF(indexLF(index), text, tempSA);

// Set the FMIndex LF as the CompressedSA LF.
setFibre(indexSA(index), indexLF(index), FibreLF());
e = sysTime() - s;
myPrint(options, 2, " Runtime: ", e, "s");
myPrint(options, 1, "\n");


// Create the compressed SA.
s = sysTime();
myPrint(options, 1, " (3) Sampling SA...");
TSize numSentinel = countSequences(text);
createCompressedSa(indexSA(index), tempSA, numSentinel);
std::cout << " done.\n" << std::flush;
myPrint(options, 1, " done.\n");
e = sysTime() - s;
myPrint(options, 2, " Runtime: ", e, "s\n");
}

template <typename TText, typename TSpec, typename TConfig, typename TLambda>
void
indexCreateProgress(Index<TText, BidirectionalIndex<FMIndex<TSpec, TConfig> > > & index,
FibreSALF const &,
TLambda && progressCallback)
TLambda && progressCallback,
LambdaIndexerOptions const & options)
{
auto progressCallback2 = progressCallback; // need second lambda because counter internally increased

std::cout << "Bi-Directional Index [forward]\n";
indexCreateProgress(index.fwd, FibreSALF(), progressCallback);
myPrint(options, 1, "Bi-Directional Index [forward]\n");
indexCreateProgress(index.fwd, FibreSALF(), progressCallback, options);

std::cout << "Bi-Directional Index [backward]\n";
indexCreateProgress(index.rev, FibreSALF(), progressCallback2);
myPrint(options, 1, "Bi-Directional Index [backward]\n");
indexCreateProgress(index.rev, FibreSALF(), progressCallback2, options);
}

template <typename TText, typename TSpec, typename TLambda>
void
indexCreateProgress(Index<TText, IndexSa<TSpec> > & index,
FibreSA const &,
TLambda && progressCallback)
TLambda && progressCallback,
LambdaIndexerOptions const & options)
{
typedef Index<TText, IndexSa<TSpec> > TIndex;
typedef typename Fibre<TIndex, FibreSA>::Type TSA;
Expand All @@ -851,8 +869,8 @@ indexCreateProgress(Index<TText, IndexSa<TSpec> > & index,

TSA & sa = getFibre(index, FibreSA());

std::cout << "Generating 0% 10% 20% 30% 40% 50% 60% 70% 80% 90% 100%\n"
" SuffixArray |" << std::flush;
myPrint(options, 1, "Generating Index 0% 10% 20% 30% 40% 50% 60% 70% 80% 90% 100%\n"
" Progress: |");
// Create the full SA.
resize(sa, lengthSum(text), Exact());
createSuffixArray(sa, text, TAlgo(), progressCallback);
Expand Down Expand Up @@ -932,7 +950,7 @@ generateIndexAndDump(StringSet<TString, TSpec> & seqs,

double s = sysTime();

// std::cout << "indexIsFM: " << int(indexIsFM) << std::endl;
// std::cerr << "indexIsFM: " << int(indexIsFM) << std::endl;

// FM-Index needs reverse input
if (indexIsFM && std::is_same<Tag<TDirection>, Fwd>::value)
Expand All @@ -952,7 +970,8 @@ generateIndexAndDump(StringSet<TString, TSpec> & seqs,
SEQAN_OMP_PRAGMA(critical(progressBar))
// if (TID == 0)
printProgressBar(_lastPercent, curPerc);
});
},
options);
}
else
{
Expand All @@ -979,8 +998,10 @@ generateIndexAndDump(StringSet<TString, TSpec> & seqs,

double e = sysTime() - s;
if (!hasProgress)
{
myPrint(options, 1, " done.\n");
myPrint(options, 2, "Runtime: ", e, "s \n\n");
myPrint(options, 2, "Runtime: ", e, "s \n\n");
}

// Dump Index
myPrint(options, 1, "Writing Index to disk...");
Expand Down
99 changes: 99 additions & 0 deletions src/mkindex_misc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ bool setEnv(TString const & key, TValue & value)
// Class ComparisonCounter
// ----------------------------------------------------------------------------

#if 0 // why is this still here?

template <typename TText, typename TSpec>
struct ComparisonCounter;

Expand Down Expand Up @@ -165,6 +167,7 @@ struct ComparisonCounter<TText, std::true_type>
};
#endif

#endif

// ----------------------------------------------------------------------------
// function _readMappingFileNCBI
Expand Down Expand Up @@ -337,4 +340,100 @@ _readMappingFileNCBI(TInputIterator & fit
// return 0;
// }

// ============================================================================
// Parallel BWT construction
// ============================================================================

template <typename TText, typename TSSetSpec, typename TSpec, typename TConfig,
typename TOtherText,
typename TSA,
typename TCallback>
inline void
createRankDictionaryProgress(LF<StringSet<TText, TSSetSpec>, TSpec, TConfig> & lf,
TOtherText const & text,
TSA const & sa,
TCallback && progress)
{
typedef typename Value<TSA>::Type TSAValue;
typedef typename Size<TSA>::Type TSize;

// Resize the RankDictionary.
TSize seqNum = countSequences(text);
TSize totalLen = lengthSum(text);
resize(lf.sentinels, seqNum + totalLen, Exact());
resize(lf.bwt, seqNum + totalLen, Exact());

// Fill the sentinel positions (they are all at the beginning of the bwt).
for (TSize i = 0; i < seqNum; ++i)
{
if (length(text[seqNum - (i + 1)]) > 0)
{
setValue(lf.bwt, i, back(text[seqNum - (i + 1)]));
setValue(lf.sentinels, i, false);
}
}

/* Compute the rest of the bwt.*/

// align the chunk_size to underlying word boundaries to prevent parallel write to word spanning chunk boundary
uint64_t chunkSize = _max((length(sa) / omp_get_max_threads() / 64) * 64, 1ull);
uint64_t twoPercent = chunkSize / 50;
// the 0th thread might get an additional chunk because of the above alignment so we count from the 1st instead
uint32_t countThreadID = omp_get_max_threads() > 1 ? 1 : 0;

SEQAN_OMP_PRAGMA(parallel for schedule(static, chunkSize))
for (TSize i = 0; i < length(sa); ++i)
{
TSAValue pos; // = SA[i];
posLocalize(pos, sa[i], stringSetLimits(text));

if (getSeqOffset(pos) != 0)
{
setValue(lf.bwt, i + seqNum, getValue(getValue(text, getSeqNo(pos)), getSeqOffset(pos) - 1));
setValue(lf.sentinels, i + seqNum, false);
}
else
{
setValue(lf.bwt, i + seqNum, lf.sentinelSubstitute);
setValue(lf.sentinels, i + seqNum, true);
}

if (((static_cast<uint32_t>(omp_get_thread_num()) == countThreadID) && ((i % chunkSize) % twoPercent == 0)))
progress(((i % chunkSize) / twoPercent) * 2);
}

// Update all ranks.
updateRanks(lf.bwt);
// Update the auxiliary RankDictionary of sentinel positions.
updateRanks(lf.sentinels);
}

template <typename TText, typename TSpec, typename TConfig, typename TOtherText, typename TSA, typename TCallback>
inline void
createLFProgress(LF<TText, TSpec, TConfig> & lf, TOtherText const & text, TSA const & sa, TCallback && progress)
{
typedef LF<TText, TSpec, TConfig> TLF;
typedef typename Value<TLF>::Type TValue;
typedef typename Size<TLF>::Type TSize;

// Clear assuming undefined state.
clear(lf);

// Compute prefix sum.
prefixSums<TValue>(lf.sums, text);

// Choose the sentinel substitute.
_setSentinelSubstitute(lf);

// Create and index BWT bwt for rank queries.
createRankDictionaryProgress(lf, text, sa, progress);

// Add sentinels to prefix sum.
TSize sentinelsCount = countSequences(text);
for (TSize i = 0; i < length(lf.sums); ++i)
lf.sums[i] += sentinelsCount;

progress(100);
}

#endif // LAMBDA_INDEXER_MISC_HPP_
2 changes: 2 additions & 0 deletions src/search.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,8 @@ realMain(LambdaOptions & options,
}
}

myPrint(options, 1, "\n");

myWriteFooter(globalHolder, options);

if (!options.doubleIndexing)
Expand Down
2 changes: 1 addition & 1 deletion src/shared_misc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ printProgressBar(uint64_t & lastPercent, uint64_t curPerc)
for (uint64_t i = lastPercent + 2; i <= curPerc; i+=2)
{
if (i == 100)
std::cout << "|\n" << std::flush;
std::cout << "|" << std::flush;
else if (i % 10 == 0)
std::cout << ":" << std::flush;
else
Expand Down

0 comments on commit e59cc1f

Please sign in to comment.