Commit e6dfa89: seq kmer
dagou committed Jun 21, 2024
1 parent 5ccdfd0

Showing 15 changed files with 530 additions and 642 deletions.
kr2r/src/bin/classify.rs (45 changes: 27 additions & 18 deletions)
@@ -5,8 +5,8 @@ use kr2r::readcounts::{TaxonCounters, TaxonCountersDash};
 use kr2r::report::report_kraken_style;
 use kr2r::taxonomy::Taxonomy;
 use kr2r::utils::{create_sample_file, find_and_sort_files, get_lastest_file_index};
-use kr2r::IndexOptions;
-use seqkmer::{create_reader, read_parallel, Base, HitGroup, Meros, MinimizerIterator, Reader};
+use kr2r::{HitGroup, IndexOptions};
+use seqkmer::{read_parallel, Base, FastxReader, Meros, MinimizerIterator, OptionPair, Reader};
 use std::collections::HashMap;
 use std::fs::File;
 use std::io::{self, BufWriter, Write};
@@ -86,14 +86,14 @@ pub struct Args {
 }
 
 fn process_seq(
+    rows: &mut Vec<Row>,
     m_iter: &mut MinimizerIterator,
     hash_config: &HashConfig,
     chtable: &CHTable,
-) -> HitGroup<Row> {
+    offset: usize,
+) -> usize {
     let chunk_size = hash_config.hash_capacity;
     let value_bits = hash_config.value_bits;
-    let mut rows = Vec::new();
 
     let data: Vec<(usize, u64)> = m_iter.collect();
     for (sort, hash_key) in data {
         let (idx, compacted) = hash_config.compact(hash_key);
@@ -103,12 +103,11 @@ fn process_seq(
         let taxid = chtable.get_from_page(index, compacted, partition_index + 1);
         if taxid > 0 {
             let high = u32::combined(compacted, taxid, value_bits);
-            let row = Row::new(high, 0, sort as u32 + 1);
+            let row = Row::new(high, 0, sort as u32 + 1 + offset as u32);
             rows.push(row);
         }
     }
 
-    HitGroup::new(m_iter.size, rows, 0)
+    m_iter.size + offset
 }
 
 fn process_record(
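
Note: process_seq no longer builds its own HitGroup; it appends into a caller-owned rows vector and returns m_iter.size + offset, a running k-mer count, so the second mate of a paired read continues the position numbering of the first. A minimal sketch of that offset-threading pattern, with plain tuples standing in for the crate's MinimizerIterator and Row types (hypothetical stand-ins, not the repo's API):

    // Offset threading: each chunk's hit positions start where the
    // previous chunk's ended, so positions stay globally unique.
    fn process_chunk(rows: &mut Vec<(u64, u32)>, kmers: &[u64], offset: usize) -> usize {
        for (sort, &kmer) in kmers.iter().enumerate() {
            // 1-based position, shifted by the running offset.
            rows.push((kmer, (sort + 1 + offset) as u32));
        }
        offset + kmers.len()
    }

    fn main() {
        let (mate1, mate2) = ([10u64, 11, 12], [20u64, 21]);
        let mut rows = Vec::new();
        let offset = process_chunk(&mut rows, &mate1, 0); // positions 1..=3
        let total = process_chunk(&mut rows, &mate2, offset); // positions 4..=5
        assert_eq!((total, rows.last().unwrap().1), (5, 5));
    }
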
@@ -120,23 +119,31 @@ fn process_record(
     cur_taxon_counts: &TaxonCountersDash,
     classify_counter: &AtomicUsize,
 ) -> String {
-    let id = &marker.header.id;
-    let hits = marker
-        .body
-        .apply_mut(|m_iter| process_seq(m_iter, &hash_config, chtable));
-    let total_kmers = hits.total_marker_size();
+    let id = &marker.header.id.clone();
+    let rows: Vec<Row> = marker
+        .fold(|rows, m_iter, offset| process_seq(rows, m_iter, &hash_config, chtable, offset));
+
+    let hits = HitGroup::new(rows, marker.range());
 
     let seq_len_str = marker.fmt_seq_size();
 
+    let required_score = hits.required_score(args.confidence_threshold);
     let hit_data = process_hitgroup(
         &hits,
         hash_config,
         taxonomy,
         cur_taxon_counts,
         classify_counter,
-        total_kmers,
-        args.confidence_threshold,
+        required_score,
         args.minimum_hit_groups,
         hash_config.value_mask,
     );
 
+    hit_data.3.iter().for_each(|(key, value)| {
+        cur_taxon_counts
+            .entry(*key)
+            .or_default()
+            .merge(value)
+            .unwrap();
+    });
     format!(
         "{}\t{}\t{}\t{}\t{}\n",
         hit_data.0, id, hit_data.1, seq_len_str, hit_data.2
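
Note: process_record now folds every mate of the read into one rows vector, wraps the result in a single HitGroup spanning marker.range(), and precomputes required_score before calling process_hitgroup; the per-taxon counters returned in hit_data.3 are then merged into cur_taxon_counts. Judging by the commented-out line left behind in resolve.rs, required_score likely rounds confidence * total_kmers up to a hit-count floor; a sketch under that assumption:

    // Assumed semantics of required_score (not verified against the crate):
    // a classification must be supported by at least this many k-mer hits.
    fn required_score(total_kmers: usize, confidence_threshold: f64) -> u64 {
        (confidence_threshold * total_kmers as f64).ceil() as u64
    }

    fn main() {
        // 150 k-mers at a 0.1 confidence threshold need at least 15 hits.
        assert_eq!(required_score(150, 0.1), 15);
    }
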
@@ -269,7 +276,9 @@ fn process_files(
     file_writer.flush().unwrap();
 
     let score = args.minimum_quality_score;
-    let mut reader: Box<dyn Reader> = create_reader(file_pair, file_index, score)?;
+    let paths = OptionPair::from_slice(file_pair);
+    let mut reader = FastxReader::from_paths(paths, file_index, score)?;
+    // let mut reader = create_reader(file_pair, file_index, score)?;
     let (thread_sequences, thread_unclassified) = process_fastx_file(
         &args,
         meros,
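
Note: create_reader is replaced by FastxReader::from_paths over an OptionPair, which models the single-end vs paired-end file distinction in the type. The diff does not show seqkmer's definition, so the following is only an illustrative stand-in for what an OptionPair built by from_slice could look like:

    // Hypothetical OptionPair: one input path, or a pair of mate files.
    #[derive(Debug)]
    enum OptionPair<T> {
        Single(T),
        Pair(T, T),
    }

    impl<T: Clone> OptionPair<T> {
        // Build from a one- or two-element slice of input files.
        fn from_slice(slice: &[T]) -> OptionPair<T> {
            match slice {
                [one] => OptionPair::Single(one.clone()),
                [one, two] => OptionPair::Pair(one.clone(), two.clone()),
                _ => panic!("expected one or two input files"),
            }
        }
    }

    fn main() {
        println!("{:?}", OptionPair::from_slice(&["reads.fq"]));
        println!("{:?}", OptionPair::from_slice(&["reads_1.fq", "reads_2.fq"]));
    }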

kr2r/src/bin/estimate_capacity.rs (18 changes: 8 additions & 10 deletions)
@@ -94,16 +94,14 @@ fn process_sequence(
         let mut minimizer_set = HashSet::new();
 
         for record in record_set {
-            record
-                .body
-                .fold(&mut minimizer_set, |minimizer_set, m_iter| {
-                    let kmer_iter: HashSet<u64> = m_iter
-                        .filter(|(_, hash_key)| *hash_key & RANGE_MASK < range_n)
-                        .map(|(_, hash_key)| hash_key)
-                        .collect();
-
-                    minimizer_set.extend(kmer_iter);
-                });
+            record.body.apply_mut(|m_iter| {
+                let kmer_iter: HashSet<u64> = m_iter
+                    .filter(|(_, hash_key)| *hash_key & RANGE_MASK < range_n)
+                    .map(|(_, hash_key)| hash_key)
+                    .collect();
+
+                minimizer_set.extend(kmer_iter);
+            });
         }
         Some(minimizer_set)
     },
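
Note: the fold-with-accumulator is replaced by apply_mut with a closure that mutably captures minimizer_set, so no accumulator needs to be threaded through and returned. A self-contained sketch of the same pattern, with Vec<u64> standing in for the minimizer iterator:

    use std::collections::HashSet;

    fn main() {
        let records: Vec<Vec<u64>> = vec![vec![1, 2, 2], vec![2, 3]];
        let mut minimizer_set = HashSet::new();
        for record in &records {
            // The closure borrows minimizer_set mutably, like apply_mut's body.
            let mut visit = |m_iter: &[u64]| minimizer_set.extend(m_iter.iter().copied());
            visit(record);
        }
        assert_eq!(minimizer_set.len(), 3); // {1, 2, 3}
    }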

kr2r/src/bin/resolve.rs (81 changes: 49 additions & 32 deletions)
@@ -1,13 +1,14 @@
 use clap::Parser;
 use dashmap::{DashMap, DashSet};
-use kr2r::classify::{add_hitlist_string, count_values, resolve_tree};
+use kr2r::classify::process_hitgroup;
 use kr2r::compact_hash::{HashConfig, Row};
 use kr2r::readcounts::{TaxonCounters, TaxonCountersDash};
 use kr2r::report::report_kraken_style;
 use kr2r::taxonomy::Taxonomy;
 use kr2r::utils::{find_and_sort_files, open_file};
+use kr2r::HitGroup;
 use rayon::prelude::*;
-use seqkmer::trim_pair_info;
+use seqkmer::{trim_pair_info, OptionPair};
 use std::collections::HashMap;
 use std::fs::File;
 use std::io::{self, BufRead, BufReader, BufWriter, Read, Result, Write};
@@ -20,7 +21,7 @@ const BATCH_SIZE: usize = 8 * 1024 * 1024;

 pub fn read_id_to_seq_map<P: AsRef<Path>>(
     filename: P,
-) -> Result<DashMap<u32, (String, String, u32, Option<u32>)>> {
+) -> Result<DashMap<u32, (String, String, usize, Option<usize>)>> {
     let file = open_file(filename)?;
     let reader = BufReader::new(file);
     let id_map = DashMap::new();
@@ -35,9 +36,9 @@ pub fn read_id_to_seq_map<P: AsRef<Path>>(
         let seq_id = parts[1].to_string();
         let seq_size = parts[2].to_string();
         let count_parts: Vec<&str> = parts[3].split('|').collect();
-        let kmer_count1 = count_parts[0].parse::<u32>().unwrap();
+        let kmer_count1 = count_parts[0].parse::<usize>().unwrap();
         let kmer_count2 = if count_parts.len() > 1 {
-            count_parts[1].parse::<u32>().map_or(None, |i| Some(i))
+            count_parts[1].parse::<usize>().map_or(None, |i| Some(i))
         } else {
             None
         };
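
Note: count_parts[1].parse::<usize>().map_or(None, |i| Some(i)) is equivalent to .ok(); a possible simplification the commit leaves as-is:

    fn main() {
        let count_parts = ["42", "7"];
        let kmer_count2: Option<usize> = if count_parts.len() > 1 {
            count_parts[1].parse::<usize>().ok() // same as map_or(None, Some)
        } else {
            None
        };
        assert_eq!(kmer_count2, Some(7));
    }
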
@@ -106,7 +107,7 @@ fn process_batch<P: AsRef<Path>>(
     sample_file: P,
     args: &Args,
     taxonomy: &Taxonomy,
-    id_map: &DashMap<u32, (String, String, u32, Option<u32>)>,
+    id_map: &DashMap<u32, (String, String, usize, Option<usize>)>,
     writer: &Mutex<Box<dyn Write + Send>>,
     value_mask: usize,
 ) -> Result<(TaxonCountersDash, usize, DashSet<u32>)> {
@@ -148,39 +149,50 @@
     hit_counts.into_par_iter().for_each(|(k, mut rows)| {
         if let Some(item) = id_map.get(&k) {
             rows.sort_unstable();
-            let total_kmers: usize = item.2 as usize + item.3.unwrap_or(0) as usize;
             let dna_id = trim_pair_info(&item.0);
-            let (counts, cur_counts, hit_groups) = count_values(&rows, value_mask, item.2);
-            let hit_string = add_hitlist_string(&rows, value_mask, item.2, item.3, taxonomy);
-            let mut call = resolve_tree(&counts, taxonomy, total_kmers, confidence_threshold);
-            if call > 0 && hit_groups < minimum_hit_groups {
-                call = 0;
-            };
-
-            cur_counts.iter().for_each(|entry| {
+            let range = OptionPair::from(((0, item.2), item.3.map(|size| (item.2, size + item.2))));
+            let hits = HitGroup::new(rows, range);
+
+            let hit_data = process_hitgroup(
+                &hits,
+                taxonomy,
+                &classify_counter,
+                hits.required_score(confidence_threshold),
+                minimum_hit_groups,
+                value_mask,
+            );
+            // let (counts, cur_counts, hit_groups) = count_values(&rows, value_mask, item.2);
+            // let hit_string = add_hitlist_string(&rows, value_mask, item.2, item.3, taxonomy);
+            // let require_score = (confidence_threshold * total_kmers as f64).ceil() as u64;
+            // let mut call = resolve_tree(&counts, taxonomy, require_score);
+            // if call > 0 && hit_groups < minimum_hit_groups {
+            //     call = 0;
+            // };
+
+            hit_data.3.iter().for_each(|(key, value)| {
                 cur_taxon_counts
-                    .entry(*entry.key())
+                    .entry(*key)
                     .or_default()
-                    .merge(entry.value())
+                    .merge(value)
                     .unwrap();
             });
 
-            let ext_call = taxonomy.nodes[call as usize].external_id;
-            let clasify = if call > 0 {
-                classify_counter.fetch_add(1, Ordering::SeqCst);
-                cur_taxon_counts
-                    .entry(call as u64)
-                    .or_default()
-                    .increment_read_count();
-
-                "C"
-            } else {
-                "U"
-            };
+            // let ext_call = taxonomy.nodes[call as usize].external_id;
+            // let clasify = if call > 0 {
+            //     classify_counter.fetch_add(1, Ordering::SeqCst);
+            //     cur_taxon_counts
+            //         .entry(call as u64)
+            //         .or_default()
+            //         .increment_read_count();
+
+            //     "C"
+            // } else {
+            //     "U"
+            // };
             // Use a lock to synchronize writes
             let output_line = format!(
                 "{}\t{}\t{}\t{}\t{}\n",
-                clasify, dna_id, ext_call, item.1, hit_string
+                hit_data.0, dna_id, hit_data.1, item.1, hit_data.2
             );
             let mut file = writer.lock().unwrap();
             file.write_all(output_line.as_bytes()).unwrap();
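
Note: the OptionPair::from call above rebuilds the paired-read k-mer ranges from the stored counts: mate 1 covers positions [0, n1) and mate 2 covers [n1, n1 + n2), matching the offset threading introduced in classify.rs. A sketch of just that range arithmetic (illustrative types, not the crate's):

    fn ranges(n1: usize, n2: Option<usize>) -> ((usize, usize), Option<(usize, usize)>) {
        ((0, n1), n2.map(|size| (n1, size + n1)))
    }

    fn main() {
        // Paired read with 100 and 90 k-mers per mate:
        assert_eq!(ranges(100, Some(90)), ((0, 100), Some((100, 190))));
        // Single-end read: only the first range exists.
        assert_eq!(ranges(120, None), ((0, 120), None));
    }
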
@@ -241,8 +253,13 @@ pub fn run(args: Args) -> Result<()> {
         .filter(|item| !hit_seq_set.contains(item.key()))
         .for_each(|item| {
             let dna_id = trim_pair_info(&item.0);
-            let hit_string = add_hitlist_string(&vec![], value_mask, item.2, item.3, &taxo);
-            let output_line = format!("U\t{}\t0\t{}\t{}\n", dna_id, item.1, hit_string);
+            let output_line = format!(
+                "U\t{}\t0\t{}\t{}\n",
+                dna_id,
+                item.1,
+                if item.3.is_none() { "" } else { " |:| " }
+            );
+
             let mut file = writer.lock().unwrap();
             file.write_all(output_line.as_bytes()).unwrap();
         });
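
Note: reads with no hits no longer go through add_hitlist_string; the hit-list column is now a literal, empty for single-end reads and " |:| " (the pair separator with nothing on either side) for paired reads. Illustrative output lines under that reading, with made-up ids and sizes:

    fn main() {
        // Single-end unclassified read:
        print!("{}", format!("U\t{}\t0\t{}\t{}\n", "read_a", "150", ""));
        // Paired unclassified read (assuming sizes are stored as "len1|len2"):
        print!("{}", format!("U\t{}\t0\t{}\t{}\n", "read_b", "150|140", " |:| "));
    }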

kr2r/src/bin/splitr.rs (14 changes: 8 additions & 6 deletions)
@@ -5,7 +5,7 @@ use kr2r::utils::{
     get_lastest_file_index,
 };
 use kr2r::IndexOptions;
-use seqkmer::{create_reader, read_parallel, Meros, MinimizerIterator, Reader};
+use seqkmer::{read_parallel, FastxReader, Meros, MinimizerIterator, OptionPair, Reader};
 use std::fs;
 use std::io::{BufWriter, Write};
 use std::io::{Error, ErrorKind, Result};
@@ -159,11 +159,12 @@ where
         let header = &seq.header;
         let index = header.reads_index;
         let dna_id = header.id.clone();
-        seq.body.fold(&mut init, |init, mut m_iter| {
-            let seq_id = (file_index << 32 | index) as u64;
+        let seq_id = (file_index << 32 | index) as u64;
+
+        seq.body.apply_mut(|m_iter| {
             process_record(
-                init,
-                &mut m_iter,
+                &mut init,
+                m_iter,
                 &hash_config,
                 chunk_size,
                 seq_id,
@@ -221,7 +222,8 @@ fn convert(args: Args, meros: Meros, hash_config: HashConfig) -> Result<()> {
         create_sample_file(args.chunk_dir.join(format!("sample_id_{}.map", file_index)));
 
     let score = args.minimum_quality_score;
-    let mut reader: Box<dyn Reader> = create_reader(file_pair, file_index, score)?;
+    let paths = OptionPair::from_slice(file_pair);
+    let mut reader = FastxReader::from_paths(paths, file_index, score)?;
     process_fastx_file(
         &args,
         meros,
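
Note: as in classify.rs, the reader is built via OptionPair::from_slice plus FastxReader::from_paths, and the fold over the sequence body becomes apply_mut with init captured by mutable reference; seq_id is hoisted out of the closure since it does not depend on m_iter. A sketch of the seq_id bit packing, which this commit leaves unchanged:

    fn main() {
        // Upper 32 bits: file index; lower 32 bits: read index within the file.
        // (Assumes a 64-bit usize and indices that fit in 32 bits.)
        let (file_index, index): (usize, usize) = (3, 42);
        let seq_id = (file_index << 32 | index) as u64;
        assert_eq!(seq_id >> 32, 3);
        assert_eq!(seq_id & 0xFFFF_FFFF, 42);
    }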

(Diff for the remaining 11 changed files not shown.)