From d0739068b9142f2984ce398e8f4cfd34d9091ef5 Mon Sep 17 00:00:00 2001 From: dagou Date: Sat, 15 Jun 2024 21:09:09 +0800 Subject: [PATCH] seq kmer --- kr2r/src/bin/classify.rs | 386 ++++++++------------------------------- kr2r/src/kr2r_data.rs | 12 ++ seqkmer/Cargo.toml | 1 - seqkmer/src/fasta.rs | 106 +++++++++++ seqkmer/src/fastq.rs | 205 +++++++++++++++++---- seqkmer/src/lib.rs | 5 +- seqkmer/src/mmscanner.rs | 47 +++-- seqkmer/src/parallel.rs | 105 +++++------ seqkmer/src/reader.rs | 43 ++++- seqkmer/src/seq.rs | 52 +++++- 10 files changed, 535 insertions(+), 427 deletions(-) create mode 100644 seqkmer/src/fasta.rs diff --git a/kr2r/src/bin/classify.rs b/kr2r/src/bin/classify.rs index 7c8e890..ab448c6 100644 --- a/kr2r/src/bin/classify.rs +++ b/kr2r/src/bin/classify.rs @@ -1,18 +1,14 @@ use clap::Parser; use kr2r::classify::{add_hitlist_string, count_values, resolve_tree, trim_pair_info}; use kr2r::compact_hash::{CHTable, Compact, HashConfig, Row}; -use kr2r::mmscanner::MinimizerScanner; use kr2r::readcounts::{TaxonCounters, TaxonCountersDash}; use kr2r::report::report_kraken_style; -use kr2r::seq::{self, open_fasta_reader, SeqX}; use kr2r::taxonomy::Taxonomy; use kr2r::utils::{ create_sample_file, detect_file_format, find_and_sort_files, get_lastest_file_index, FileFormat, }; -use kr2r::{IndexOptions, Meros}; -use seq_io::fasta::Record; -use seq_io::fastq::Record as FqRecord; -use seq_io::parallel::read_parallel; +use kr2r::IndexOptions; +use seqkmer::seq::BaseType; use std::collections::HashMap; use std::fs::File; use std::io::{self, BufWriter, Write}; @@ -21,10 +17,10 @@ use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Instant; -use seqkmer::fastq::FastqReader; -use seqkmer::parallel::read_parallel as s_parallel; -use seqkmer::reader::Reader; -use seqkmer::Meros as SMeros; +use seqkmer::parallel::read_parallel; +use seqkmer::reader::SeqMer; +use seqkmer::Meros; +use seqkmer::{reader::Reader, FastaReader, FastqPairReader, FastqReader}; #[derive(Parser, Debug, Clone)] #[clap( @@ -97,17 +93,16 @@ pub struct Args { } fn process_seq( - miner: MinimizerScanner, + minimizer: &Vec, hash_config: &HashConfig, chtable: &CHTable, offset: u32, -) -> (u32, Vec) { +) -> Vec { let chunk_size = hash_config.hash_capacity; let value_bits = hash_config.value_bits; let mut rows = Vec::new(); - let mut kmer_count = 0; - for (sort, hash_key) in miner.into_iter().enumerate() { + for (sort, &hash_key) in minimizer.iter().enumerate() { let idx = hash_config.index(hash_key); let partition_index = idx / chunk_size; let index = idx % chunk_size; @@ -118,132 +113,46 @@ fn process_seq( let row = Row::new(high, 0, sort as u32 + 1 + offset); rows.push(row); } - kmer_count += 1; } - (kmer_count, rows) + rows } fn process_record( dna_id: String, - seq1: Vec, - seq2: Option>, + seq: &SeqMer, args: &Args, taxonomy: &Taxonomy, - meros: Meros, chtable: &CHTable, hash_config: &HashConfig, cur_taxon_counts: &TaxonCountersDash, classify_counter: &AtomicUsize, ) -> String { let value_mask = hash_config.value_mask; - let mut seq_len_str = String::new(); - let seq1_len = seq1.len(); - seq_len_str.push_str(&seq1_len.to_string()); - - let scan1 = MinimizerScanner::new(&seq1, meros); - let (kmer_count1, mut rows) = process_seq(scan1, &hash_config, chtable, 0); - let kmer_count2 = if let Some(seq) = seq2 { - let scan2 = MinimizerScanner::new(&seq, meros); - let (kmer_count2, rows2) = process_seq(scan2, &hash_config, chtable, kmer_count1); - rows.extend_from_slice(&rows2); - seq_len_str.push_str(format!("|{}", seq.len()).as_str()); - Some(kmer_count2) - } else { - None - }; - let total_kmers: usize = (kmer_count1 + kmer_count2.unwrap_or(0)) as usize; - let (counts, cur_counts, hit_groups) = count_values(&rows, value_mask, kmer_count1); - let hit_string = add_hitlist_string(&rows, value_mask, kmer_count1, kmer_count2, taxonomy); - let mut call = resolve_tree(&counts, taxonomy, total_kmers, args.confidence_threshold); - if call > 0 && hit_groups < args.minimum_hit_groups { - call = 0; - }; - cur_counts.iter().for_each(|entry| { - cur_taxon_counts - .entry(*entry.key()) - .or_default() - .merge(entry.value()) - .unwrap(); - }); - - let ext_call = taxonomy.nodes[call as usize].external_id; - let clasify = if call > 0 { - classify_counter.fetch_add(1, Ordering::SeqCst); - cur_taxon_counts - .entry(call as u64) - .or_default() - .increment_read_count(); - - "C" - } else { - "U" - }; - // 使用锁来同步写入 - let output_line = format!( - "{}\t{}\t{}\t{}\t{}\n", - clasify, dna_id, ext_call, seq_len_str, hit_string - ); - output_line -} - -fn process_seq1( - miner: Vec, - hash_config: &HashConfig, - chtable: &CHTable, - offset: u32, -) -> (u32, Vec) { - let chunk_size = hash_config.hash_capacity; - let value_bits = hash_config.value_bits; - - let mut rows = Vec::new(); - let mut kmer_count = 0; - for (sort, hash_key) in miner.into_iter().enumerate() { - let idx = hash_config.index(hash_key); - let partition_index = idx / chunk_size; - let index = idx % chunk_size; - let taxid = chtable.get_from_page(index, hash_key, partition_index + 1); - if taxid > 0 { - let compacted_key = hash_key.left(value_bits) as u32; - let high = u32::combined(compacted_key, taxid, value_bits); - let row = Row::new(high, 0, sort as u32 + 1 + offset); - rows.push(row); + let seq_len_str = seq.fmt_size(); + let (kmer_count1, kmer_count2, rows) = match &seq.marker { + BaseType::Single(marker) => ( + marker.size(), + 0, + process_seq(&marker.minimizer, &hash_config, chtable, 0), + ), + BaseType::Pair((marker1, marker2)) => { + let mut rows = process_seq(&marker1.minimizer, &hash_config, chtable, 0); + let seq_len1 = marker1.size(); + let rows2 = process_seq(&marker2.minimizer, &hash_config, chtable, seq_len1 as u32); + rows.extend_from_slice(&rows2); + (seq_len1, marker2.size(), rows) } - kmer_count += 1; - } - (kmer_count, rows) -} - -fn process_record1( - dna_id: String, - seq1: Vec, - seq2: Option>, - args: &Args, - taxonomy: &Taxonomy, - meros: Meros, - chtable: &CHTable, - hash_config: &HashConfig, - cur_taxon_counts: &TaxonCountersDash, - classify_counter: &AtomicUsize, -) -> String { - let value_mask = hash_config.value_mask; - let mut seq_len_str = String::new(); - let seq1_len = seq1.len(); - seq_len_str.push_str(&seq1_len.to_string()); - - let (kmer_count1, mut rows) = process_seq1(seq1, &hash_config, chtable, 0); - let kmer_count2 = if let Some(seq) = seq2 { - let scan2 = MinimizerScanner::new(&seq, meros); - let (kmer_count2, rows2) = process_seq(scan2, &hash_config, chtable, kmer_count1); - rows.extend_from_slice(&rows2); - seq_len_str.push_str(format!("|{}", seq.len()).as_str()); - Some(kmer_count2) - } else { - None }; - let total_kmers: usize = (kmer_count1 + kmer_count2.unwrap_or(0)) as usize; - let (counts, cur_counts, hit_groups) = count_values(&rows, value_mask, kmer_count1); - let hit_string = add_hitlist_string(&rows, value_mask, kmer_count1, kmer_count2, taxonomy); + let total_kmers = kmer_count1 + kmer_count2; + let (counts, cur_counts, hit_groups) = count_values(&rows, value_mask, kmer_count1 as u32); + let hit_string = add_hitlist_string( + &rows, + value_mask, + kmer_count1 as u32, + Some(kmer_count2 as u32), + taxonomy, + ); let mut call = resolve_tree(&counts, taxonomy, total_kmers, args.confidence_threshold); if call > 0 && hit_groups < args.minimum_hit_groups { call = 0; @@ -277,20 +186,16 @@ fn process_record1( output_line } -fn process_fasta_file( +fn process_fastx_file( args: &Args, meros: Meros, hash_config: HashConfig, file_index: usize, - files: &[String], + reader: &mut Box, chtable: &CHTable, taxonomy: &Taxonomy, total_taxon_counts: &mut TaxonCounters, ) -> io::Result<(usize, usize)> { - let score = args.minimum_quality_score; - let mut files_iter = files.iter(); - let file1 = files_iter.next().cloned().unwrap(); - let mut writer: Box = match &args.kraken_output_dir { Some(ref file_path) => { let filename = file_path.join(format!("output_{}.txt", file_index)); @@ -301,30 +206,26 @@ fn process_fasta_file( }; let cur_taxon_counts = TaxonCountersDash::new(); + let sequence_count = AtomicUsize::new(0); let classify_counter = AtomicUsize::new(0); - let reader = open_fasta_reader(&file1).expect("Unable to create fasta reader from path"); - read_parallel( + let _ = read_parallel( reader, - args.num_threads as u32, - args.num_threads as usize, - |record_set| { + 13, + 15, + meros, + |seqs| { let mut buffer = String::new(); - - for records in record_set.into_iter() { - let dna_id = trim_pair_info(records.id().unwrap_or_default()); + for seq in seqs { + let dna_id = trim_pair_info(&seq.id); sequence_count.fetch_add(1, Ordering::SeqCst); - let seq1: Vec = records.seq_x(score); - let seq2 = None; let output_line = process_record( dna_id, - seq1, - seq2, + &seq, args, taxonomy, - meros, chtable, &hash_config, &cur_taxon_counts, @@ -333,13 +234,14 @@ fn process_fasta_file( buffer.push_str(&output_line); } - buffer + + Some(buffer) }, - |record_sets| { - while let Some(Ok((_, buffer))) = record_sets.next() { + |dataset| { + while let Ok(Some(res)) = dataset.next() { writer - .write_all(buffer.as_bytes()) - .expect("write data error"); + .write_all(res.as_bytes()) + .expect("Failed to write date to file"); } }, ); @@ -379,138 +281,6 @@ fn process_fasta_file( Ok((thread_sequences, thread_sequences - thread_classified)) } -/// fastq -fn process_fastq_file( - args: &Args, - meros: Meros, - hash_config: HashConfig, - file_index: usize, - files: &[String], - chtable: &CHTable, - taxonomy: &Taxonomy, - total_taxon_counts: &mut TaxonCounters, -) -> io::Result<(usize, usize)> { - let score = args.minimum_quality_score; - let mut files_iter = files.iter(); - let file1 = files_iter.next().cloned().unwrap(); - let file2 = files_iter.next().cloned(); - - let mut writer: Box = match &args.kraken_output_dir { - Some(ref file_path) => { - let filename = file_path.join(format!("output_{}.txt", file_index)); - let file = File::create(filename)?; - Box::new(BufWriter::new(file)) as Box - } - None => Box::new(io::stdout()) as Box, - }; - - let cur_taxon_counts = TaxonCountersDash::new(); - - let sequence_count = AtomicUsize::new(0); - let classify_counter = AtomicUsize::new(0); - - let mut reader1 = FastqReader::from_path(&file1, 1, 0)?; - let _ = s_parallel( - &mut reader1, - 13, - 15, - None, - SMeros::default(), - |seq1, seq| { - let dna_id = trim_pair_info(&seq.id); - sequence_count.fetch_add(1, Ordering::SeqCst); - - let seq2 = None; - let output_line = process_record1( - dna_id, - seq1, - seq2, - args, - taxonomy, - meros, - chtable, - &hash_config, - &cur_taxon_counts, - &classify_counter, - ); - None - }, - ); - // let reader = seq::PairFastqReader::from_path(&file1, file2.as_ref()) - // .expect("Unable to create pair reader from paths"); - // read_parallel( - // reader, - // args.num_threads as u32, - // args.num_threads as usize, - // |record_set| { - // let mut buffer = String::new(); - - // for records in record_set.into_iter() { - // let dna_id = trim_pair_info(records.0.id().unwrap_or_default()); - // sequence_count.fetch_add(1, Ordering::SeqCst); - // let seq1: Vec = records.0.seq_x(score); - // let seq2 = records.1.map(|seq| seq.seq_x(score)); - // let output_line = process_record( - // dna_id, - // seq1, - // seq2, - // args, - // taxonomy, - // meros, - // chtable, - // &hash_config, - // &cur_taxon_counts, - // &classify_counter, - // ); - - // buffer.push_str(&output_line); - // } - // buffer - // }, - // |record_sets| { - // while let Some(Ok((_, buffer))) = record_sets.next() { - // writer - // .write_all(buffer.as_bytes()) - // .expect("write data error"); - // } - // }, - // ); - - let mut sample_taxon_counts: HashMap< - u64, - kr2r::readcounts::ReadCounts>, - > = HashMap::new(); - cur_taxon_counts.iter().for_each(|entry| { - total_taxon_counts - .entry(*entry.key()) - .or_default() - .merge(&entry.value()) - .unwrap(); - sample_taxon_counts - .entry(*entry.key()) - .or_default() - .merge(&entry.value()) - .unwrap(); - }); - - let thread_sequences = sequence_count.load(Ordering::SeqCst); - let thread_classified = classify_counter.load(Ordering::SeqCst); - if let Some(output) = &args.kraken_output_dir { - let filename = output.join(format!("output_{}.kreport2", file_index)); - report_kraken_style( - filename, - args.report_zero_counts, - args.report_kmer_data, - &taxonomy, - &sample_taxon_counts, - thread_sequences as u64, - (thread_sequences - thread_classified) as u64, - )?; - } - - Ok((thread_sequences, thread_sequences - thread_classified)) -} - fn process_files( args: Args, meros: Meros, @@ -542,36 +312,34 @@ fn process_files( writeln!(file_writer, "{}\t{}", file_index, file_pair.join(","))?; file_writer.flush().unwrap(); - match detect_file_format(&file_pair[0])? { + let mut files_iter = file_pair.iter(); + let file1 = files_iter.next().cloned().unwrap(); + let file2 = files_iter.next().cloned(); + let score = args.minimum_quality_score; + + let mut reader: Box = match detect_file_format(&file_pair[0])? { FileFormat::Fastq => { - let (thread_sequences, thread_unclassified) = process_fastq_file( - &args, - meros, - hash_config, - file_index, - file_pair, - chtable, - taxonomy, - &mut total_taxon_counts, - )?; - total_seqs += thread_sequences; - total_unclassified += thread_unclassified; - } - FileFormat::Fasta => { - let (thread_sequences, thread_unclassified) = process_fasta_file( - &args, - meros, - hash_config, - file_index, - file_pair, - chtable, - taxonomy, - &mut total_taxon_counts, - )?; - total_seqs += thread_sequences; - total_unclassified += thread_unclassified; + if let Some(file2) = file2 { + Box::new(FastqPairReader::from_path(file1, file2, file_index, score)?) + } else { + Box::new(FastqReader::from_path(file1, file_index, score)?) + } } - } + FileFormat::Fasta => Box::new(FastaReader::from_path(file1, file_index)?), + }; + + let (thread_sequences, thread_unclassified) = process_fastx_file( + &args, + meros, + hash_config, + file_index, + &mut reader, + chtable, + taxonomy, + &mut total_taxon_counts, + )?; + total_seqs += thread_sequences; + total_unclassified += thread_unclassified; } if let Some(output) = &args.kraken_output_dir { let filename = output.join("output.kreport2"); @@ -624,7 +392,7 @@ pub fn run(args: Args) -> Result<()> { } println!("start..."); let start = Instant::now(); - let meros = idx_opts.as_meros(); + let meros = idx_opts.as_smeros(); let hash_files = find_and_sort_files(&args.k2d_dir, "hash", ".k2d")?; let chtable = CHTable::from_hash_files(hash_config, hash_files)?; diff --git a/kr2r/src/kr2r_data.rs b/kr2r/src/kr2r_data.rs index b993634..792f2c1 100644 --- a/kr2r/src/kr2r_data.rs +++ b/kr2r/src/kr2r_data.rs @@ -9,6 +9,8 @@ use std::io::{Read, Result as IoResult, Write}; use std::mem; use std::path::Path; +use seqkmer::Meros as SMeros; + pub fn parse_binary(src: &str) -> Result { u64::from_str_radix(src, 2) } @@ -180,4 +182,14 @@ impl IndexOptions { u64_to_option(self.minimum_acceptable_hash_value), ) } + + pub fn as_smeros(&self) -> SMeros { + SMeros::new( + self.k, + self.l, + u64_to_option(self.spaced_seed_mask), + u64_to_option(self.toggle_mask), + u64_to_option(self.minimum_acceptable_hash_value), + ) + } } diff --git a/seqkmer/Cargo.toml b/seqkmer/Cargo.toml index 03a50de..76a0d27 100644 --- a/seqkmer/Cargo.toml +++ b/seqkmer/Cargo.toml @@ -11,7 +11,6 @@ crossbeam-channel = "0.5" scoped_threadpool = "0.1.9" flate2 = "1.0" - [features] default = ["dna"] dna = [] diff --git a/seqkmer/src/fasta.rs b/seqkmer/src/fasta.rs new file mode 100644 index 0000000..aa0e173 --- /dev/null +++ b/seqkmer/src/fasta.rs @@ -0,0 +1,106 @@ +use crate::reader::{dyn_reader, trim_end, Reader, BUFSIZE}; +use crate::seq::{BaseType, SeqFormat, Sequence}; +use std::io::{BufRead, BufReader, Read, Result}; +use std::path::Path; + +/// FastaReader +pub struct FastaReader +where + R: Read + Send, +{ + reader: BufReader, + file_index: usize, + reads_index: usize, + header: Vec, + seq: Vec, +} + +impl FastaReader +where + R: Read + Send, +{ + pub fn new(reader: R, file_index: usize) -> Self { + Self::with_capacity(reader, file_index, BUFSIZE) + } + + pub fn with_capacity(reader: R, file_index: usize, capacity: usize) -> Self { + assert!(capacity >= 3); + Self { + reader: BufReader::with_capacity(capacity, reader), + file_index, + reads_index: 0, + header: Vec::new(), + seq: Vec::new(), + } + } + + pub fn read_next(&mut self) -> Result> { + // 读取fastq文件header部分 + self.header.clear(); + if self.reader.read_until(b'\n', &mut self.header)? == 0 { + return Ok(None); + } + // 读取fasta文件seq部分 + self.seq.clear(); + if self.reader.read_until(b'>', &mut self.seq)? == 0 { + return Ok(None); + } + trim_end(&mut self.seq); + Ok(Some(())) + } +} + +impl FastaReader> { + #[inline] + pub fn from_path>(path: P, file_index: usize) -> Result { + let reader = dyn_reader(path)?; + Ok(Self::new(reader, file_index)) + } +} + +fn check_sequence_length(seq: &Vec) -> bool { + let limit = u64::pow(2, 32); + // 检查seq的长度是否大于2的32次方 + (seq.len() as u64) > limit +} + +impl Reader for FastaReader { + fn next(&mut self) -> Result>> { + if self.read_next()?.is_none() { + return Ok(None); + } + + if check_sequence_length(&self.seq) { + eprintln!("Sequence length exceeds 2^32, which is not handled."); + return Ok(None); + } + + let seq_id = unsafe { + let slice = if self.header.starts_with(b">") { + &self.header[1..] + } else { + &self.header[..] + }; + + let s = std::str::from_utf8_unchecked(slice); + let first_space_index = s + .as_bytes() + .iter() + .position(|&c| c == b' ') + .unwrap_or(s.len()); + + // 直接从原始切片创建第一个单词的切片 + &s[..first_space_index] + }; + self.reads_index += 1; + + let sequence = Sequence { + file_index: self.file_index, + reads_index: self.reads_index, + id: seq_id.to_owned(), + seq: BaseType::Single(self.seq.to_owned()), + format: SeqFormat::Fasta, + }; + Ok(Some(vec![sequence])) + } +} diff --git a/seqkmer/src/fastq.rs b/seqkmer/src/fastq.rs index 698114c..ab61784 100644 --- a/seqkmer/src/fastq.rs +++ b/seqkmer/src/fastq.rs @@ -1,46 +1,35 @@ use crate::reader::{dyn_reader, trim_end, Reader, BUFSIZE}; -use crate::seq::{SeqFormat, Sequence}; +use crate::seq::{BaseType, SeqFormat, Sequence}; use std::io::{BufRead, BufReader, Read, Result}; use std::path::Path; -/// FastqReader -pub struct FastqReader { - pub reader: BufReader, - pub file_index: u64, - pub reads_index: u64, - pub seq_id: String, +struct QReader { + reader: BufReader, + quality_score: i32, - score: i32, header: Vec, seq: Vec, plus: Vec, quals: Vec, } -impl FastqReader +impl QReader where R: Read + Send, { - pub fn new(reader: R, file_index: u64, score: i32) -> Self { - Self::with_capacity(reader, file_index, BUFSIZE, score) - } - - pub fn with_capacity<'a>(reader: R, file_index: u64, capacity: usize, score: i32) -> Self { + pub fn with_capacity(reader: R, capacity: usize, quality_score: i32) -> Self { assert!(capacity >= 3); Self { reader: BufReader::with_capacity(capacity, reader), - file_index, - reads_index: 0, - seq_id: String::new(), header: Vec::new(), seq: Vec::new(), plus: Vec::new(), quals: Vec::new(), - score, + quality_score, } } - pub fn read_lines(&mut self) -> Result> { + pub fn read_next(&mut self) -> Result> { // 读取fastq文件header部分 self.header.clear(); if self.reader.read_until(b'\n', &mut self.header)? == 0 { @@ -66,8 +55,58 @@ where } trim_end(&mut self.quals); + if self.quality_score > 0 { + for (base, &qscore) in self.seq.iter_mut().zip(self.quals.iter()) { + if (qscore as i32 - '!' as i32) < self.quality_score { + *base = b'x'; + } + } + } + + Ok(Some(())) + } +} + +/// FastqReader +pub struct FastqReader { + inner: QReader, + file_index: usize, + reads_index: usize, + // 批量读取 + batch_size: usize, +} + +impl FastqReader +where + R: Read + Send, +{ + pub fn new(reader: R, file_index: usize, quality_score: i32) -> Self { + Self::with_capacity(reader, file_index, BUFSIZE, quality_score, 30) + } + + pub fn with_capacity<'a>( + reader: R, + file_index: usize, + capacity: usize, + quality_score: i32, + batch_size: usize, + ) -> Self { + assert!(capacity >= 3); + Self { + inner: QReader::with_capacity(reader, capacity, quality_score), + file_index, + reads_index: 0, + batch_size, + } + } + + pub fn read_next(&mut self) -> Result> { + if self.inner.read_next()?.is_none() { + return Ok(None); + } + let seq_id = unsafe { - let s = std::str::from_utf8_unchecked(&self.header[1..]); + let s = std::str::from_utf8_unchecked(&self.inner.header[1..]); let first_space_index = s .as_bytes() .iter() @@ -79,20 +118,12 @@ where }; self.reads_index += 1; - if self.score > 0 { - for (base, &qscore) in self.seq.iter_mut().zip(self.quals.iter()) { - if (qscore as i32 - '!' as i32) < self.score { - *base = b'x'; - } - } - } - let sequence = Sequence { file_index: self.file_index, reads_index: self.reads_index, id: seq_id.to_owned(), - seq: self.seq.to_owned(), - format: SeqFormat::FASTQ, + seq: BaseType::Single(self.inner.seq.to_owned()), + format: SeqFormat::Fastq, }; Ok(Some(sequence)) } @@ -100,17 +131,121 @@ where impl FastqReader> { #[inline] - pub fn from_path>(path: P, file_index: u64, score: i32) -> Result { + pub fn from_path>( + path: P, + file_index: usize, + quality_score: i32, + ) -> Result { let reader = dyn_reader(path)?; - Ok(Self::new(reader, file_index, score)) + Ok(Self::new(reader, file_index, quality_score)) } } -impl Reader for FastqReader +impl Reader for FastqReader where R: Read + Send, { - fn next(&mut self) -> Result> { - self.read_lines() + fn next(&mut self) -> Result>> { + let seqs: Vec = (0..self.batch_size) + .filter_map(|_| self.read_next().transpose()) // 将 Result, _> 转换为 Option> + .collect::>>()?; + + Ok(Some(seqs).filter(|v| !v.is_empty())) + } +} + +/// FastqPairReader +pub struct FastqPairReader { + inner1: QReader, + inner2: QReader, + file_index: usize, + reads_index: usize, + // 批量读取 + batch_size: usize, +} + +impl FastqPairReader +where + R: Read + Send, +{ + pub fn new(reader1: R, reader2: R, file_index: usize, score: i32) -> Self { + Self::with_capacity(reader1, reader2, file_index, BUFSIZE, score, 30) + } + + pub fn with_capacity<'a>( + reader1: R, + reader2: R, + file_index: usize, + capacity: usize, + score: i32, + batch_size: usize, + ) -> Self { + assert!(capacity >= 3); + Self { + inner1: QReader::with_capacity(reader1, capacity, score), + inner2: QReader::with_capacity(reader2, capacity, score), + file_index, + reads_index: 0, + batch_size, + } + } + + pub fn read_next(&mut self) -> Result> { + if self.inner1.read_next()?.is_none() { + return Ok(None); + } + + if self.inner2.read_next()?.is_none() { + return Ok(None); + } + + let seq_id = unsafe { + let s = std::str::from_utf8_unchecked(&self.inner1.header[1..]); + let first_space_index = s + .as_bytes() + .iter() + .position(|&c| c == b' ') + .unwrap_or(s.len()); + + // 直接从原始切片创建第一个单词的切片 + &s[..first_space_index] + }; + self.reads_index += 1; + + let sequence = Sequence { + file_index: self.file_index, + reads_index: self.reads_index, + id: seq_id.to_owned(), + seq: BaseType::Pair((self.inner1.seq.to_owned(), self.inner2.seq.to_owned())), + format: SeqFormat::PairFastq, + }; + Ok(Some(sequence)) + } +} + +impl FastqPairReader> { + #[inline] + pub fn from_path>( + path1: P, + path2: P, + file_index: usize, + quality_score: i32, + ) -> Result { + let reader1 = dyn_reader(path1)?; + let reader2 = dyn_reader(path2)?; + Ok(Self::new(reader1, reader2, file_index, quality_score)) + } +} + +impl Reader for FastqPairReader +where + R: Read + Send, +{ + fn next(&mut self) -> Result>> { + let seqs: Vec = (0..self.batch_size) + .filter_map(|_| self.read_next().transpose()) // 将 Result, _> 转换为 Option> + .collect::>>()?; + + Ok(Some(seqs).filter(|v| !v.is_empty())) } } diff --git a/seqkmer/src/lib.rs b/seqkmer/src/lib.rs index 758e421..6a6d58d 100644 --- a/seqkmer/src/lib.rs +++ b/seqkmer/src/lib.rs @@ -1,4 +1,5 @@ -pub mod fastq; +mod fasta; +mod fastq; mod feat; pub mod mmscanner; pub mod reader; @@ -6,3 +7,5 @@ pub mod seq; pub use feat::constants::*; pub use feat::*; pub mod parallel; +pub use fasta::*; +pub use fastq::*; diff --git a/seqkmer/src/mmscanner.rs b/seqkmer/src/mmscanner.rs index b0d1fed..8fe7498 100644 --- a/seqkmer/src/mmscanner.rs +++ b/seqkmer/src/mmscanner.rs @@ -1,4 +1,5 @@ // kraken 2 使用的是murmur_hash3 算法的 fmix64作为 hash +use crate::seq::{BaseType, Marker}; use crate::{ canonical_representation, char_to_value, fmix64 as murmur_hash3, Meros, BITS_PER_CHAR, }; @@ -15,7 +16,7 @@ fn to_candidate_lmer(meros: &Meros, lmer: u64) -> u64 { #[derive(Debug)] struct MinimizerData { - pub pos: usize, + pos: usize, candidate_lmer: u64, } @@ -134,14 +135,14 @@ impl Cursor { } pub struct MinimizerScanner<'a> { - seq: &'a [u8], + seq: &'a BaseType>, meros: Meros, cursor: Cursor, window: MinimizerWindow, } impl<'a> MinimizerScanner<'a> { - pub fn new(seq: &'a [u8], meros: Meros) -> Self { + pub fn new(seq: &'a BaseType>, meros: Meros) -> Self { MinimizerScanner { seq, meros, @@ -156,27 +157,33 @@ impl<'a> MinimizerScanner<'a> { self.window.clear(); } - pub fn iter(&mut self) -> Vec { - self.seq + fn iter_seq(&mut self, seq: &Vec) -> Marker { + let minimizer = seq .iter() .filter_map(|&ch| { - // if ch == b'\n' || ch == b'\r' { - // None - // } else { - match char_to_value(ch) { - Some(code) => self.cursor.next_lmer(code).and_then(|lmer| { - let candidate_lmer: u64 = to_candidate_lmer(&self.meros, lmer); - self.window - .next(candidate_lmer) - .map(|minimizer| murmur_hash3(minimizer ^ self.meros.toggle_mask)) - }), - None => { - self.clear(); - None + if ch == b'\n' || ch == b'\r' { + None + } else { + match char_to_value(ch) { + Some(code) => self.cursor.next_lmer(code).and_then(|lmer| { + let candidate_lmer: u64 = to_candidate_lmer(&self.meros, lmer); + self.window + .next(candidate_lmer) + .map(|minimizer| murmur_hash3(minimizer ^ self.meros.toggle_mask)) + }), + None => { + self.clear(); + None + } } } - // } }) - .collect() + .collect(); + + Marker::new(seq.len(), minimizer) + } + + pub fn iter(&mut self) -> BaseType { + self.seq.apply(|seq| self.iter_seq(seq)) } } diff --git a/seqkmer/src/parallel.rs b/seqkmer/src/parallel.rs index 505b3d1..644c6ee 100644 --- a/seqkmer/src/parallel.rs +++ b/seqkmer/src/parallel.rs @@ -1,83 +1,86 @@ -use crate::mmscanner::MinimizerScanner; -use crate::reader::Reader; +use crate::reader::{Reader, SeqMer}; use crate::seq::Sequence; use crate::Meros; -use crossbeam_channel::bounded; +use crossbeam_channel::{bounded, Receiver, RecvError}; use scoped_threadpool::Pool; -use std::fs::File; -use std::io::Read; -use std::io::{self, BufWriter, Result, Write}; -use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::io::Result; use std::sync::Arc; -pub fn read_parallel( - reader: &mut dyn Reader, +pub struct ParallelResult

+where + P: Send, +{ + recv: Receiver

, +} + +impl

ParallelResult

+where + P: Send, +{ + #[inline] + pub fn next(&mut self) -> std::result::Result { + self.recv.recv() + } +} + +pub fn read_parallel( + reader: &mut Box, n_threads: usize, buffer_len: usize, - output_file: Option<&PathBuf>, meros: Meros, work: W, + func: F, ) -> Result<()> where - R: Read + Send, - W: Send + Sync + Fn(Vec, Sequence) -> Option, + O: Send, + Out: Send + Default, + W: Send + Sync + Fn(Vec) -> Option, + F: FnOnce(&mut ParallelResult>) -> Out + Send, { + assert!(n_threads > 2); assert!(n_threads <= buffer_len); - let (sender, receiver) = bounded::(buffer_len); + let (sender, receiver) = bounded::>(buffer_len); + let (done_send, done_recv) = bounded::>(buffer_len); let receiver = Arc::new(receiver); // 使用 Arc 来共享 receiver - let mut pool = Pool::new(10); - - let counter = Arc::new(AtomicUsize::new(0)); + let done_send = Arc::new(done_send); + let mut pool = Pool::new(n_threads as u32); - let mut writer: Box = match output_file { - Some(file_name) => { - let file = File::create(file_name)?; - Box::new(BufWriter::new(file)) as Box - } - None => Box::new(io::stdout()) as Box, - }; + let mut parallel_result = ParallelResult { recv: done_recv }; - let _ = pool.scoped(|pool_scope| -> Result<()> { + pool.scoped(|pool_scope| { // 生产者线程 pool_scope.execute(move || { - while let Some(seq) = reader.next().unwrap() { - sender.send(seq).unwrap(); + while let Ok(Some(seqs)) = reader.next() { + sender.send(seqs).expect("Failed to send sequences"); } }); // 消费者线程 - for i in 0..n_threads { + for _ in 0..n_threads - 2 { let receiver = Arc::clone(&receiver); - let counter_clone = Arc::clone(&counter); let work = &work; - - let mut temp_writer: Box = match output_file { - Some(file_name) => { - let parent_dir = file_name.parent().unwrap_or_else(|| Path::new("")); - let file_name = file_name.file_name().unwrap().to_str().unwrap(); - let filename = parent_dir.join(format!("{}.tmp.{}", file_name, i)); - let file = File::create(filename)?; - Box::new(BufWriter::new(file)) as Box - } - None => Box::new(io::stdout()) as Box, - }; + let done_send = Arc::clone(&done_send); pool_scope.execute(move || { - while let Ok(seq) = receiver.recv() { - counter_clone.fetch_add(1, Ordering::Relaxed); - let mut ms = MinimizerScanner::new(&seq.seq, meros); - let res = ms.iter(); - if let Some(out) = work(res, seq) { - temp_writer - .write_all(out.as_bytes()) - .expect("write data error"); - } + while let Ok(seqs) = receiver.recv() { + let seq_mers: Vec = seqs + .iter() + .map(|seq| SeqMer::from_seq(seq, meros)) + .collect(); + + let output = work(seq_mers); + done_send.send(output).expect("Failed to send outputs"); } }); } + + // 引用计数减掉一个,这样都子线程结束时, done_send还能完全释放 + drop(done_send); + pool_scope.execute(move || { + let _ = func(&mut parallel_result); + }); + pool_scope.join_all(); - Ok(()) }); - println!("counter {:?}", counter.load(Ordering::Relaxed)); + Ok(()) } diff --git a/seqkmer/src/reader.rs b/seqkmer/src/reader.rs index c7bc395..04c27e1 100644 --- a/seqkmer/src/reader.rs +++ b/seqkmer/src/reader.rs @@ -1,4 +1,5 @@ -use crate::seq::Sequence; +use crate::seq::{BaseType, Marker, Sequence}; +use crate::{mmscanner::MinimizerScanner, Meros}; use flate2::read::GzDecoder; use std::fs::File; use std::io::{self, Read, Result, Seek}; @@ -32,13 +33,45 @@ pub fn open_file>(path: P) -> Result { } pub fn trim_end(buffer: &mut Vec) { - while let Some(&b'\n' | &b'\r') = buffer.last() { + while let Some(&b'\n' | &b'\r' | &b'>' | &b'@') = buffer.last() { buffer.pop(); } } -pub const BUFSIZE: usize = 8 * 1024 * 1024; +pub const BUFSIZE: usize = 16 * 1024 * 1024; -pub trait Reader: Send { - fn next(&mut self) -> Result>; +pub trait Reader: Send { + fn next(&mut self) -> Result>>; +} + +#[derive(Debug, Clone)] +pub struct SeqMer { + pub id: String, + pub file_index: usize, + pub reads_index: usize, + pub marker: BaseType, +} + +impl SeqMer { + pub fn from_seq(seq: &Sequence, meros: Meros) -> Self { + let mut ms = MinimizerScanner::new(&seq.seq, meros); + let marker = ms.iter(); + Self { + marker, + id: seq.id.clone(), + file_index: seq.file_index, + reads_index: seq.reads_index, + } + } + + pub fn size_str(&self) -> BaseType { + self.marker.apply(|marker| marker.cap.to_string()) + } + + pub fn fmt_size(&self) -> String { + match &self.marker { + BaseType::Single(marker1) => marker1.cap.to_string(), + BaseType::Pair((marker1, marker2)) => format!("{}:{}", marker1.cap, marker2.cap), + } + } } diff --git a/seqkmer/src/seq.rs b/seqkmer/src/seq.rs index bfd4de8..c0e2840 100644 --- a/seqkmer/src/seq.rs +++ b/seqkmer/src/seq.rs @@ -1,14 +1,56 @@ #[derive(Debug, Clone, PartialEq, Eq, Copy)] pub enum SeqFormat { - FASTA, - FASTQ, + Fasta, + Fastq, + PairFastq, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum BaseType { + Single(U), + Pair((U, U)), +} + +impl BaseType { + // 泛型方法,根据序列类型执行操作 + pub fn apply(&self, mut func: F) -> BaseType + where + F: FnMut(&T) -> U, + { + match self { + BaseType::Single(seq) => BaseType::Single(func(seq)), + BaseType::Pair((seq1, seq2)) => BaseType::Pair((func(seq1), func(seq2))), + } + } +} + +impl BaseType> { + pub fn len(&self) -> BaseType { + self.apply(|seq| seq.len()) + } +} + +#[derive(Debug, Clone)] +pub struct Marker { + pub cap: usize, + pub minimizer: Vec, +} + +impl Marker { + pub fn new(cap: usize, minimizer: Vec) -> Self { + Self { cap, minimizer } + } + + pub fn size(&self) -> usize { + self.minimizer.len() + } } #[derive(Debug, Clone)] pub struct Sequence { - pub file_index: u64, - pub reads_index: u64, + pub file_index: usize, + pub reads_index: usize, pub id: String, - pub seq: Vec, + pub seq: BaseType>, pub format: SeqFormat, }