Skip to content

Commit

Permalink
Rework parallelism in directory iterator.
Browse files Browse the repository at this point in the history
Previously, ignore::WalkParallel would invoke the callback for all
*explicitly* given file paths in a single thread, which effectively
meant that `rg pattern foo bar baz ...` didn't actually search foo, bar
and baz in parallel.

The code was structured that way to avoid spinning up workers if no
directory paths were given. The original intention was probably to have
a separate pool of threads responsible for searching, but ripgrep ended
up just reusing the ignore::WalkParallel workers themselves for searching,
and was thereby subjected to its sub-par performance in this case.

The code has been restructured so that file paths are sent to the workers,
which brings back parallelism.

Fixes #226
  • Loading branch information
BurntSushi committed Nov 9, 2016
1 parent 2dce0dc commit 5b73dcc
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 28 deletions.
61 changes: 34 additions & 27 deletions ignore/src/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,40 +747,33 @@ impl WalkParallel {
let mut f = mkf();
let threads = self.threads();
let queue = Arc::new(MsQueue::new());
let mut any_dirs = false;
let mut any_work = false;
// Send the initial set of root paths to the pool of workers.
// Note that every root path is sent, files and directories alike. For
// explicitly given non-directory paths, a worker invokes the callback
// directly instead of descending.
for path in self.paths {
if path == Path::new("-") {
if f(Ok(DirEntry::new_stdin())).is_quit() {
return;
}
continue;
}
let dent = match DirEntryRaw::from_path(0, path) {
Ok(dent) => DirEntry::new_raw(dent, None),
Err(err) => {
if f(Err(err)).is_quit() {
return;
let dent =
if path == Path::new("-") {
DirEntry::new_stdin()
} else {
match DirEntryRaw::from_path(0, path) {
Ok(dent) => DirEntry::new_raw(dent, None),
Err(err) => {
if f(Err(err)).is_quit() {
return;
}
continue;
}
}
continue;
}
};
if !dent.file_type().map_or(false, |t| t.is_dir()) {
if f(Ok(dent)).is_quit() {
return;
}
} else {
any_dirs = true;
queue.push(Message::Work(Work {
dent: dent,
ignore: self.ig_root.clone(),
}));
}
};
queue.push(Message::Work(Work {
dent: dent,
ignore: self.ig_root.clone(),
}));
any_work = true;
}
// ... but there's no need to start workers if we don't need them.
if !any_dirs {
if !any_work {
return;
}
// Create the workers and then wait for them to finish.
Expand Down Expand Up @@ -839,6 +832,11 @@ struct Work {
}

impl Work {
/// Reports whether this work item refers to a directory.
///
/// Yields `false` when the entry's file type is unavailable.
fn is_dir(&self) -> bool {
    let kind = self.dent.file_type();
    kind.map(|ft| ft.is_dir()).unwrap_or(false)
}

/// Adds ignore rules for parent directories.
///
/// Note that this only applies to entries at depth 0. On all other
Expand Down Expand Up @@ -921,6 +919,15 @@ impl Worker {
fn run(mut self) {
while let Some(mut work) = self.get_work() {
let depth = work.dent.depth();
// If this is an explicitly given path and is not a directory,
// then execute the caller's callback and move on.
if depth == 0 && !work.is_dir() {
if (self.f)(Ok(work.dent)).is_quit() {
self.quit_now();
return;
}
continue;
}
if self.parents {
if let Some(err) = work.add_parents() {
if (self.f)(Err(err)).is_quit() {
Expand Down
3 changes: 2 additions & 1 deletion src/args.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::cmp;
use std::env;
use std::io;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -376,7 +377,7 @@ impl RawArgs {
};
let threads =
if self.flag_threads == 0 {
num_cpus::get()
cmp::min(12, num_cpus::get())
} else {
self.flag_threads
};
Expand Down

0 comments on commit 5b73dcc

Please sign in to comment.