-
Notifications
You must be signed in to change notification settings - Fork 4
/
manysearch.rs
242 lines (215 loc) · 9.25 KB
/
manysearch.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
/// manysearch: massively parallel search of many queries against many large subjects.
/// the OG MAGsearch, branchwater, etc.
///
/// Note: this function loads all _queries_ into memory, and iterates over
/// the database once.
use anyhow::Result;
use rayon::prelude::*;
use stats::{median, stddev};
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;
use crate::utils::{csvwriter_thread, load_collection, load_sketches, ReportType, SearchResult};
use sourmash::ani_utils::ani_from_containment;
use sourmash::errors::SourmashError;
use sourmash::selection::Selection;
use sourmash::signature::SigsTrait;
use sourmash::sketch::minhash::KmerMinHash;
/// Search every query sketch against every sketch of a database, in parallel.
///
/// All query sketches are loaded into memory up front. The database
/// ("against") collection is then walked once with rayon; each database
/// sketch is loaded individually and compared with every query. Matches are
/// streamed over a bounded channel to a dedicated CSV writer thread.
///
/// # Arguments
///
/// * `query_filepath` - location handed to `load_collection` for the queries.
/// * `against_filepath` - location handed to `load_collection` for the database.
/// * `selection` - sketch selection applied when loading both collections and
///   the query sketches.
/// * `threshold` - a match is reported only when the containment of the query
///   in the target is strictly greater than this value.
/// * `output` - forwarded unchanged to `csvwriter_thread`.
/// * `allow_failed_sigpaths` - forwarded unchanged to `load_collection`.
/// * `ignore_abundance` - when `true`, abundance statistics are never
///   computed, even if the database sketch tracks abundance.
///
/// # Errors
///
/// Returns an error if either collection, or the query sketches, cannot be
/// loaded. Problems with individual database sketches are reported on stderr
/// and counted, but do not abort the search.
///
/// # Panics
///
/// Panics with "incompatible sketches" if `count_common` fails for a
/// query/database pair.
pub fn manysearch(
    query_filepath: String,
    against_filepath: String,
    selection: &Selection,
    threshold: f64,
    output: Option<String>,
    allow_failed_sigpaths: bool,
    ignore_abundance: bool,
) -> Result<()> {
    // Load query collection
    let query_collection = load_collection(
        &query_filepath,
        selection,
        ReportType::Query,
        allow_failed_sigpaths,
    )?;

    // Load all query sketches into memory, downsampling on the way.
    // A failure here is propagated to the caller rather than panicking.
    let query_sketchlist = load_sketches(query_collection, selection, ReportType::Query)?;

    // Against: Load all _paths_, not signatures, into memory.
    let against_collection = load_collection(
        &against_filepath,
        selection,
        ReportType::Against,
        allow_failed_sigpaths,
    )?;

    // set up a multi-producer, single-consumer channel.
    let (send, recv) = std::sync::mpsc::sync_channel::<SearchResult>(rayon::current_num_threads());

    // & spawn a thread that is dedicated to printing to a buffered output
    let thrd = csvwriter_thread(recv, output);

    //
    // Main loop: iterate (in parallel) over all search signature paths,
    // loading them individually and searching them. Stuff results into
    // the writer thread above.
    //

    let processed_sigs = AtomicUsize::new(0);
    let skipped_paths = AtomicUsize::new(0);
    let failed_paths = AtomicUsize::new(0);

    let send_result = against_collection
        .par_iter()
        .flat_map(|(_idx, record)| {
            let i = processed_sigs.fetch_add(1, atomic::Ordering::SeqCst);
            if i % 1000 == 0 && i > 0 {
                eprintln!("Processed {} search sigs", i);
            }

            let mut results = vec![];

            // against downsampling happens here
            match against_collection.sig_from_record(record) {
                Ok(against_sig) => {
                    if let Some(against_mh) = against_sig.minhash() {
                        // These depend only on the database sketch, so compute
                        // them once instead of once per query.
                        let ksize = against_mh.ksize() as f64;
                        let target_size = against_mh.size() as f64;
                        let calc_abund_stats = against_mh.track_abundance() && !ignore_abundance;

                        for query in query_sketchlist.iter() {
                            // avoid calculating details unless there is overlap
                            let overlap = query
                                .minhash
                                .count_common(against_mh, true)
                                .expect("incompatible sketches")
                                as f64;
                            let query_size = query.minhash.size() as f64;
                            let containment_query_in_target = overlap / query_size;

                            // only report pairs whose containment exceeds the threshold
                            if containment_query_in_target > threshold {
                                let containment_target_in_query = overlap / target_size;
                                let max_containment =
                                    containment_query_in_target.max(containment_target_in_query);
                                let jaccard = overlap / (target_size + query_size - overlap);

                                let qani = ani_from_containment(containment_query_in_target, ksize);
                                let mani = ani_from_containment(containment_target_in_query, ksize);
                                let query_containment_ani = Some(qani);
                                let match_containment_ani = Some(mani);
                                let average_containment_ani = Some((qani + mani) / 2.);
                                let max_containment_ani = Some(f64::max(qani, mani));

                                let (
                                    total_weighted_hashes,
                                    n_weighted_found,
                                    average_abund,
                                    median_abund,
                                    std_abund,
                                ) = if calc_abund_stats {
                                    match downsample_and_inflate_abundances(
                                        &query.minhash,
                                        against_mh,
                                    ) {
                                        Ok(stats) => stats,
                                        Err(err) => {
                                            // Report the problem instead of silently
                                            // throwing away every result collected for
                                            // this database sketch; the containment
                                            // values remain valid, so keep the match
                                            // and leave the abundance columns empty.
                                            eprintln!(
                                                "WARNING: could not calculate abundance statistics for query '{}' against '{}': {}",
                                                query.name,
                                                record.internal_location(),
                                                err
                                            );
                                            (None, None, None, None, None)
                                        }
                                    }
                                } else {
                                    (None, None, None, None, None)
                                };

                                results.push(SearchResult {
                                    query_name: query.name.clone(),
                                    query_md5: query.md5sum.clone(),
                                    match_name: against_sig.name(),
                                    containment: containment_query_in_target,
                                    intersect_hashes: overlap as usize,
                                    match_md5: Some(against_sig.md5sum()),
                                    jaccard: Some(jaccard),
                                    max_containment: Some(max_containment),
                                    average_abund,
                                    median_abund,
                                    std_abund,
                                    query_containment_ani,
                                    match_containment_ani,
                                    average_containment_ani,
                                    max_containment_ani,
                                    n_weighted_found,
                                    total_weighted_hashes,
                                });
                            }
                        }
                    } else {
                        eprintln!(
                            "WARNING: no compatible sketches in path '{}'",
                            record.internal_location()
                        );
                        let _ = skipped_paths.fetch_add(1, atomic::Ordering::SeqCst);
                    }
                }
                Err(err) => {
                    // A load failure is counted as "failed", not "skipped", so
                    // that the summary printed at the end reports it correctly.
                    eprintln!("Sketch loading error: {}", err);
                    eprintln!(
                        "WARNING: could not load sketches from path '{}'",
                        record.internal_location()
                    );
                    let _ = failed_paths.fetch_add(1, atomic::Ordering::SeqCst);
                }
            }

            results
        })
        .try_for_each_with(send, |s, m| s.send(m));

    // do some cleanup and error handling -
    if let Err(e) = send_result {
        eprintln!("Unable to send internal data: {:?}", e);
    }

    if let Err(e) = thrd.join() {
        eprintln!("Unable to join internal thread: {:?}", e);
    }

    // done!
    let i: usize = processed_sigs.load(atomic::Ordering::SeqCst);
    eprintln!("DONE. Processed {} search sigs", i);

    let skipped_paths = skipped_paths.load(atomic::Ordering::SeqCst);
    let failed_paths = failed_paths.load(atomic::Ordering::SeqCst);

    if skipped_paths > 0 {
        eprintln!(
            "WARNING: skipped {} search paths - no compatible signatures.",
            skipped_paths
        );
    }
    if failed_paths > 0 {
        eprintln!(
            "WARNING: {} search paths failed to load. See error messages above.",
            failed_paths
        );
    }

    Ok(())
}
/// Compute abundance statistics for the hashes of `query`, taking the
/// abundances from `against`.
///
/// If the two sketches have different `scaled` values, `against` is first
/// downsampled to the query's `scaled`; otherwise it is used as-is.
///
/// # Returns
///
/// A tuple of, in order:
///
/// 1. the sum of all abundances in the (possibly downsampled) `against` sketch,
/// 2. the weighted sum returned by `inflated_abundances`,
/// 3. the average of the inflated abundances,
/// 4. the median of the inflated abundances,
/// 5. the standard deviation of the inflated abundances.
///
/// Items 3-5 are `None` when `inflated_abundances` returns no abundances, so
/// no average, median or deviation can be defined.
///
/// # Errors
///
/// Returns the underlying `SourmashError` if downsampling `against` or
/// inflating the abundances fails.
fn downsample_and_inflate_abundances(
    query: &KmerMinHash,
    against: &KmerMinHash,
) -> Result<
    (
        Option<usize>,
        Option<usize>,
        Option<f64>,
        Option<f64>,
        Option<f64>,
    ),
    SourmashError,
> {
    let query_scaled = query.scaled();
    let against_scaled = against.scaled();

    // avoid downsampling if we can
    let (abunds, sum_weighted, sum_all_abunds): (Vec<u64>, u64, usize) =
        if against_scaled != query_scaled {
            // Propagate a downsampling failure; the caller already handles `Err`.
            let against_ds = against.downsample_scaled(query_scaled)?;
            let (abunds, sum_weighted) = query.inflated_abundances(&against_ds)?;
            (abunds, sum_weighted, against_ds.sum_abunds() as usize)
        } else {
            let (abunds, sum_weighted) = query.inflated_abundances(against)?;
            (abunds, sum_weighted, against.sum_abunds() as usize)
        };

    // With no abundances the average would be 0/0 and there is no median,
    // so report the three statistics as absent rather than panicking.
    let (average_abund, median_abund, std_abund) = if abunds.is_empty() {
        (None, None, None)
    } else {
        (
            Some(sum_weighted as f64 / abunds.len() as f64),
            median(abunds.iter().copied()),
            Some(stddev(abunds.iter().copied())),
        )
    };

    Ok((
        Some(sum_all_abunds),
        Some(sum_weighted as usize),
        average_abund,
        median_abund,
        std_abund,
    ))
}