Skip to content

Commit

Permalink
Make use of faster preset and multithreaded minimap2
Browse files Browse the repository at this point in the history
  • Loading branch information
Ge0rges committed Mar 16, 2024
1 parent b94a60f commit 18bc3bf
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 37 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ bio = "1.3.1"
clap = { version = "4.2.1", features = ["derive"] }
rayon = "1.7.0"
approx = "0.5.1"
minimap2 = "0.1.15+minimap2.26"
minimap2 = "0.1.17+minimap2.2.27"
84 changes: 48 additions & 36 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ fn is_file(pathname: &str) -> Result<(), String> {

fn main() {
let args = Cli::parse();
rayon::ThreadPoolBuilder::new()
.num_threads(args.threads)
.build_global()
.expect("Error: Unable to build threadpool");

filter(&mut io::stdin(), args);
}

Expand All @@ -74,47 +79,53 @@ where
Some(ref fas) => {
is_file(fas)
.unwrap_or_else(|_| panic!("Fasta file for filtering contaminants is invalid",));
let mut total_reads = 0;
let mut output_reads = 0;
let aligner = setup_contamination_filter(fas);
fastq::Reader::new(input).records().for_each(|record| {
let record = record.expect("ERROR: problem parsing fastq record");
total_reads += 1;
if !record.is_empty() {
let read_len = record.seq().len();
// If a read is shorter than what is to be cropped the read is dropped entirely (filtered out)
if args.headcrop + args.tailcrop < read_len {
let average_quality =
ave_qual(&record.qual().iter().map(|i| i - 33).collect::<Vec<u8>>());
if (!args.inverse
&& average_quality >= args.minqual
&& average_quality <= args.maxqual
&& read_len >= args.minlength
&& read_len <= args.maxlength
&& !is_contamination(&record.seq(), &aligner))
|| (args.inverse
&& (average_quality < args.minqual
|| average_quality > args.maxqual
|| read_len < args.minlength
|| read_len > args.maxlength
|| is_contamination(&record.seq(), &aligner)))
{
write_record(record, &args, read_len);
output_reads += 1;

let total_reads_ = Arc::new(AtomicUsize::new(0));
let output_reads_ = Arc::new(AtomicUsize::new(0));

let aligner = setup_contamination_filter(fas, &args.threads);
fastq::Reader::new(input)
.records()
.par_bridge()
.for_each(|record| {
let record = record.expect("ERROR: problem parsing fastq record");
total_reads_.fetch_add(1, Ordering::SeqCst);
if !record.is_empty() {
let read_len = record.seq().len();
// If a read is shorter than what is to be cropped the read is dropped entirely (filtered out)
if args.headcrop + args.tailcrop < read_len {
let average_quality = ave_qual(
&record.qual().iter().map(|i| i - 33).collect::<Vec<u8>>(),
);
if (!args.inverse
&& average_quality >= args.minqual
&& average_quality <= args.maxqual
&& read_len >= args.minlength
&& read_len <= args.maxlength
&& !is_contamination(&record.seq(), &aligner))
|| (args.inverse
&& (average_quality < args.minqual
|| average_quality > args.maxqual
|| read_len < args.minlength
|| read_len > args.maxlength
|| is_contamination(&record.seq(), &aligner)))
{
write_record(record, &args, read_len);
output_reads_.fetch_add(1, Ordering::SeqCst);
}
}
}
}
});
});

let output_reads = output_reads_.load(Ordering::SeqCst);
let total_reads = total_reads_.load(Ordering::SeqCst);
eprintln!("Kept {output_reads} reads out of {total_reads} reads");
}

None => {
let total_reads_ = Arc::new(AtomicUsize::new(0));
let output_reads_ = Arc::new(AtomicUsize::new(0));
rayon::ThreadPoolBuilder::new()
.num_threads(args.threads)
.build_global()
.expect("Error: Unable to build threadpool");

fastq::Reader::new(input)
.records()
.par_bridge()
Expand Down Expand Up @@ -145,6 +156,7 @@ where
}
}
});

let output_reads = output_reads_.load(Ordering::SeqCst);
let total_reads = total_reads_.load(Ordering::SeqCst);
eprintln!("Kept {output_reads} reads out of {total_reads} reads");
Expand Down Expand Up @@ -181,11 +193,11 @@ fn ave_qual(quals: &[u8]) -> f64 {
(probability_sum / quals.len() as f64).log10() * -10.0
}

fn setup_contamination_filter(contam_fasta: &str) -> Aligner {
fn setup_contamination_filter(contam_fasta: &str, threads: &usize) -> Aligner {
Aligner::builder()
.with_threads(8)
.preset(Preset::LrHq)
.with_threads(*threads)
.with_cigar()
.map_ont()
.with_index(contam_fasta, None)
.expect("Unable to build index")
}
Expand Down

0 comments on commit 18bc3bf

Please sign in to comment.