Skip to content

Commit

Permalink
Parallelise file contents fetching
Browse files · Browse the repository at this point in the history
  • Loading branch information
illicitonion committed Nov 28, 2017
1 parent 7f9bd7f commit 9726abe
Showing 1 changed file with 115 additions and 44 deletions.
159 changes: 115 additions & 44 deletions src/rust/engine/fs/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,53 +130,81 @@ impl Snapshot {
.to_boxed()
}

// TODO: Rewrite this to execute in parallel
pub fn contents(self, store: Arc<Store>) -> BoxFuture<Vec<FileContent>, String> {
futures::future::done(Snapshot::contents_for_directory_sync(
self.digest.0,
store,
PathBuf::from(""),
)).to_boxed()
Snapshot::contents_for_directory_helper(self.digest.0, store, PathBuf::from(""))
.map(|mut v| {
v.sort_by(|a, b| a.path.cmp(&b.path));
v
})
.to_boxed()
}

fn contents_for_directory_sync(
// Assumes that all fingerprints it encounters are valid.
// Returns an unsorted Vec.
fn contents_for_directory_helper(
fingerprint: Fingerprint,
store: Arc<Store>,
path_so_far: PathBuf,
) -> Result<Vec<FileContent>, String> {
let directory = store
) -> BoxFuture<Vec<FileContent>, String> {
store
.load_directory_proto(fingerprint)
.wait()
.unwrap()
.ok_or_else(|| format!("Could not find snapshot {}", fingerprint))?;

let mut contents = Vec::new();

for file_node in directory.get_files() {
contents.push(FileContent {
path: path_so_far.join(file_node.get_name()),
content: store
.load_file_bytes(
Fingerprint::from_hex_string(file_node.get_digest().get_hash()).unwrap(),
)
.wait()
.unwrap()
.unwrap(),
});
}

for dir_node in directory.get_directories() {
contents.extend(
Snapshot::contents_for_directory_sync(
Fingerprint::from_hex_string(dir_node.get_digest().get_hash()).unwrap(),
store.clone(),
path_so_far.join(dir_node.get_name()),
).unwrap(),
);
}

contents.sort_by(|a, b| a.path.cmp(&b.path));
Ok(contents)
.and_then(move |maybe_dir| {
maybe_dir.ok_or_else(|| {
format!("Could not find directory with fingerprint {}", fingerprint)
})
})
.and_then(move |dir| {
let file_futures = join_all(
dir
.get_files()
.iter()
.map(|file_node| {
let path = path_so_far.join(file_node.get_name());
let maybe_bytes =
store.load_file_bytes(
Fingerprint::from_hex_string(file_node.get_digest().get_hash()).unwrap(),
);
futures::future::ok(path).join(maybe_bytes)
})
.collect::<Vec<_>>(),
);
let dir_futures = join_all(
dir
.get_directories()
.iter()
.map(|dir_node| {
Snapshot::contents_for_directory_helper(
Fingerprint::from_hex_string(dir_node.get_digest().get_hash()).unwrap(),
store.clone(),
path_so_far.join(dir_node.get_name()),
)
})
.collect::<Vec<_>>(),
);
file_futures.join(dir_futures)
})
.and_then(
move |(paths_and_maybe_byteses, dirs): (Vec<(PathBuf, Option<Vec<u8>>)>,
Vec<Vec<FileContent>>)| {
join_all(
paths_and_maybe_byteses
.into_iter()
.map(|(path, maybe_bytes)| {
maybe_bytes
.ok_or_else(|| format!("Couldn't find file contents for {:?}", path))
.map(|content| FileContent { path, content })
})
.collect::<Vec<Result<FileContent, _>>>(),
).join(futures::future::ok(dirs))
},
)
.map(|(mut files, dirs)| {
for mut dir in dirs.into_iter() {
files.append(&mut dir)
}
files
})
.to_boxed()
}
}

Expand Down Expand Up @@ -240,6 +268,8 @@ mod tests {
use std::path::PathBuf;
use std::sync::Arc;

const AGGRESSIVE: &str = "Aggressive";
const LATIN: &str = "Chaetophractus villosus";
const STR: &str = "European Burmese";

fn setup() -> (Arc<Store>, TempDir, Arc<PosixFS>, FileSaver) {
Expand Down Expand Up @@ -335,17 +365,14 @@ mod tests {
);
}

// TODO: Write more tests which involve multiple files, and directories, and stuff.
// I will do this before merging this PR, but want to check the approach before doing so.
#[test]
fn contents_for_one_file() {
let (store, dir, posix_fs, digester) = setup();

let file_name = PathBuf::from("roland");
make_file(&dir.path().join(&file_name), STR.as_bytes(), 0o600);

let path_stats = expand_all_sorted(posix_fs);
let contents = Snapshot::from_path_stats(store.clone(), digester, path_stats.clone())
let contents = Snapshot::from_path_stats(store.clone(), digester, expand_all_sorted(posix_fs))
.wait()
.unwrap()
.contents(store)
Expand All @@ -357,6 +384,50 @@ mod tests {
assert_eq!(contents.get(0).unwrap().content, STR.as_bytes().to_vec());
}

#[test]
fn contents_for_files_in_multiple_directories() {
let (store, dir, posix_fs, digester) = setup();

let armadillos = PathBuf::from("armadillos");
let armadillos_abs = dir.path().join(&armadillos);
std::fs::create_dir_all(&armadillos_abs).unwrap();
let amy = armadillos.join("amy");
make_file(&dir.path().join(&amy), LATIN.as_bytes(), 0o600);
let rolex = armadillos.join("rolex");
make_file(&dir.path().join(&rolex), AGGRESSIVE.as_bytes(), 0o600);

let cats = PathBuf::from("cats");
let cats_abs = dir.path().join(&cats);
std::fs::create_dir_all(&cats_abs).unwrap();
let roland = cats.join("roland");
make_file(&dir.path().join(&roland), STR.as_bytes(), 0o600);

let dogs = PathBuf::from("dogs");
let dogs_abs = dir.path().join(&dogs);
std::fs::create_dir_all(&dogs_abs).unwrap();

let path_stats_sorted = expand_all_sorted(posix_fs);
let mut path_stats_reversed = path_stats_sorted.clone();
path_stats_reversed.reverse();
let contents = Snapshot::from_path_stats(store.clone(), digester, path_stats_reversed)
.wait()
.unwrap()
.contents(store)
.wait()
.unwrap();
// TODO: Write helper for asserting equality of FileContents (and Vecs thereof).
assert_eq!(contents.len(), 3);
assert_eq!(contents.get(0).unwrap().path, amy);
assert_eq!(contents.get(0).unwrap().content, LATIN.as_bytes().to_vec());
assert_eq!(contents.get(1).unwrap().path, rolex);
assert_eq!(
contents.get(1).unwrap().content,
AGGRESSIVE.as_bytes().to_vec()
);
assert_eq!(contents.get(2).unwrap().path, roland);
assert_eq!(contents.get(2).unwrap().content, STR.as_bytes().to_vec());
}

#[derive(Clone)]
struct FileSaver(Arc<Store>, Arc<PosixFS>);

Expand Down

0 comments on commit 9726abe

Please sign in to comment.