Skip to content

Commit

Permalink
Merge pull request #329 from liulab-dfci/dev
Browse files Browse the repository at this point in the history
Parallelize the read preprocessing and kmer count steps in the assembly stage.
  • Loading branch information
mourisl authored Nov 13, 2024
2 parents 92c8bdf + 2df08f6 commit 54967e3
Show file tree
Hide file tree
Showing 5 changed files with 423 additions and 228 deletions.
58 changes: 42 additions & 16 deletions KmerCount.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <map>
#include <algorithm>
#include <pthread.h>

#include "KmerCode.hpp"

Expand All @@ -13,9 +14,9 @@ class KmerCount
private:
std::map<uint64_t, int> *count ;
int kmerLength ;
KmerCode kmerCode ;
int maxReadLen ;
int khashMax ;
pthread_mutex_t **locks ;

int *c ;

Expand All @@ -24,30 +25,40 @@ class KmerCount
return k % khashMax ;
}
public:
KmerCount( int k, int hmax = 1000003 ): kmerCode( k )
KmerCount( int k, int hmax = 1000003 )
{
kmerLength = k ;
khashMax = hmax ;
maxReadLen = -1 ;
c = NULL ;
count = new std::map<uint64_t, int>[khashMax] ;
locks = NULL ;
}

KmerCount(const KmerCount &b): kmerCode(b.kmerLength)
KmerCount(const KmerCount &b)
{
kmerLength = b.kmerLength ;
khashMax = b.khashMax ;
maxReadLen = -1 ;
c = NULL ;
count = new std::map<uint64_t, int>[khashMax] ;
locks = NULL ;
}

~KmerCount()
{
if ( c != NULL )
delete[] c ;
if ( count != NULL )
delete[] count ;
Release() ;
}

void SetPthreadLocks()
{
locks = new pthread_mutex_t *[khashMax] ;
int i ;
for (i = 0 ; i < khashMax ; ++i)
{
locks[i] = new pthread_mutex_t ;
pthread_mutex_init(locks[i], NULL) ;
}
}

int AddCount( char *read )
Expand All @@ -57,7 +68,7 @@ class KmerCount
if ( len < kmerLength )
return 0 ;

kmerCode.Restart() ;
KmerCode kmerCode(kmerLength) ;
for ( i = 0 ; i < kmerLength - 1 ; ++i )
kmerCode.Append( read[i] ) ;

Expand All @@ -67,7 +78,12 @@ class KmerCount
if ( kmerCode.IsValid() )
{
uint64_t kcode = kmerCode.GetCanonicalKmerCode() ;
++count[ GetHash(kcode) ][ kcode ] ;
int h = GetHash(kcode) ;
if (locks)
pthread_mutex_lock(locks[h]) ;
++count[h][ kcode ] ;
if (locks)
pthread_mutex_unlock(locks[h]) ;
/*if ( count[ GetHash( kcode ) ][ kcode ] >= 500 )
{
printf( "%s\n", read + i - kmerLength + 1 ) ;
Expand All @@ -86,6 +102,7 @@ class KmerCount
char buffer[100] ;
int i ;

KmerCode kmerCode(kmerLength) ;
while ( fscanf( fp, "%s", buffer ) != EOF )
{
int c = atoi( &buffer[ 1 ] ) ;
Expand All @@ -102,10 +119,9 @@ class KmerCount
fclose( fp ) ;
}

void Output( char *file )
void Output(FILE *fp)
{
int i, j ;
FILE *fp = fopen( file, "r" ) ;
char *buffer = new char[kmerLength + 1] ;
for ( i = 0 ; i < khashMax ; ++i )
{
Expand All @@ -116,7 +132,7 @@ class KmerCount

for ( j = 0 ; j < kmerLength ; ++j )
{
buffer[j] = nucToNum[ ( it->first >> ( 2 * j ) ) & 3 ] ;
buffer[j] = numToNuc[ ( it->first >> ( 2 * j ) ) & 3 ] ;
}
buffer[j] = '\0' ;
fprintf( fp, ">%d\n%s\n", it->second, buffer ) ;
Expand All @@ -141,7 +157,7 @@ class KmerCount
int GetCount( char *kmer )
{
int i ;
kmerCode.Restart() ;
KmerCode kmerCode(kmerLength) ;
for ( i = 0 ; i < kmerLength ; ++i )
kmerCode.Append( kmer[i] ) ;
if ( kmerCode.IsValid() )
Expand All @@ -165,7 +181,6 @@ class KmerCount
int sum ;
//minCount = medianCount = avgCount = 1 ;
//return 0 ;

if ( maxReadLen == -1 )
return 0 ;

Expand All @@ -183,8 +198,7 @@ class KmerCount
return 0 ;
}

//kmerCode.Restart() ;
KmerCode kmerCode( this->kmerCode.GetKmerLength() ) ;
KmerCode kmerCode(kmerLength) ;
for ( i = 0 ; i < kmerLength - 1 ; ++i )
kmerCode.Append( read[i] ) ;
k = 0 ;
Expand Down Expand Up @@ -289,6 +303,18 @@ class KmerCount

c = NULL ;
count = NULL ;

if (locks != NULL)
{
int i ;
for (i = 0 ; i < khashMax ; ++i)
{
pthread_mutex_destroy(locks[i]) ;
delete locks[i] ;
}
delete[] locks ;
locks = NULL ;
}
}
} ;

Expand Down
12 changes: 12 additions & 0 deletions ReadFiles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,18 @@ class ReadFiles
fileInd = currentFpInd ;
return batchSize ;
}

void FreeBatch(struct _Read *readBatch, int batchSize)
{
int i ;
for (i = 0 ; i < batchSize ; ++i)
{
free(readBatch[i].id) ;
free(readBatch[i].seq) ;
if (readBatch[i].qual)
free(readBatch[i].qual) ;
}
}

int GetFpUsed()
{
Expand Down
28 changes: 27 additions & 1 deletion SeqSet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2452,7 +2452,7 @@ class SeqSet
seqIndex.SetConsiderBarcode(s) ;
}

void ReverseComplement( char *rcSeq, char *seq, int len )
void ReverseComplement( char *rcSeq, const char *seq, int len )
{
int i ;
for ( i = 0 ; i < len ; ++i )
Expand All @@ -2465,6 +2465,32 @@ class SeqSet
rcSeq[i] = '\0' ;
}

void ReverseComplementInPlace(char *seq, int len)
{
int i, j ;
for (i = 0, j = len - 1 ; i < j ; ++i, --j)
{
char tmp = seq[j] ;

if (seq[i] != 'N')
seq[j] = numToNuc[3-nucToNum[seq[i] - 'A']];
else
seq[j] = 'N' ;

if (tmp != 'N')
seq[i] = numToNuc[3-nucToNum[tmp-'A']] ;
else
seq[i] = 'N' ;
}
if (i == j)
{
if (seq[i] != 'N')
seq[i] = numToNuc[3 - nucToNum[seq[i] - 'A']] ;
else
seq[i] = 'N' ;
}
}

void ReverseComplementInSeqSet(int idx)
{
// Should be used only when posWeight is not very meaningful.
Expand Down
Loading

0 comments on commit 54967e3

Please sign in to comment.