Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snapshot can get FileContent #5494

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 206 additions & 17 deletions src/rust/engine/fs/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@

use bazel_protos;
use boxfuture::{Boxable, BoxFuture};
use bytes::Bytes;
use futures::Future;
use futures::future::join_all;
use hashing::{Digest, Fingerprint};
use itertools::Itertools;
use {File, PathStat, Store};
use {File, FileContent, PathStat, Store};
use protobuf;
use std::collections::HashMap;
use std::ffi::OsString;
use std::fmt;
use std::sync::Arc;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};

#[derive(Clone, PartialEq)]
pub struct Snapshot {
Expand All @@ -38,15 +41,34 @@ impl Snapshot {
>(
store: Arc<Store>,
file_digester: S,
mut path_stats: Vec<PathStat>,
path_stats: Vec<PathStat>,
) -> BoxFuture<Snapshot, String> {
let mut sorted_path_stats = path_stats.clone();
sorted_path_stats.sort_by(|a, b| a.path().cmp(b.path()));
Snapshot::ingest_directory_from_sorted_path_stats(store, file_digester, sorted_path_stats)
.map(|digest| {
Snapshot {
fingerprint: digest.0,
digest: Some(digest),
path_stats: path_stats,
}
})
.to_boxed()
}

fn ingest_directory_from_sorted_path_stats<
S: StoreFileByDigest<Error> + Sized + Clone,
Error: fmt::Debug + 'static + Send,
>(
store: Arc<Store>,
file_digester: S,
path_stats: Vec<PathStat>,
) -> BoxFuture<Digest, String> {
let mut file_futures: Vec<BoxFuture<bazel_protos::remote_execution::FileNode, String>> =
Vec::new();
let mut dir_futures: Vec<BoxFuture<bazel_protos::remote_execution::DirectoryNode, String>> =
Vec::new();

path_stats.sort_by(|a, b| a.path().cmp(b.path()));

for (first_component, group) in
&path_stats.iter().cloned().group_by(|s| {
s.path().components().next().unwrap().as_os_str().to_owned()
Expand Down Expand Up @@ -96,14 +118,14 @@ impl Snapshot {
} else {
dir_futures.push(
// TODO: Memoize this in the graph
Snapshot::from_path_stats(
Snapshot::ingest_directory_from_sorted_path_stats(
store.clone(),
file_digester.clone(),
paths_of_child_dir(path_group),
).and_then(move |snapshot| {
).and_then(move |digest| {
let mut dir_node = bazel_protos::remote_execution::DirectoryNode::new();
dir_node.set_name(osstring_as_utf8(first_component)?);
dir_node.set_digest((&snapshot.digest.unwrap()).into());
dir_node.set_digest((&digest).into());
Ok(dir_node)
})
.to_boxed(),
Expand All @@ -116,14 +138,100 @@ impl Snapshot {
let mut directory = bazel_protos::remote_execution::Directory::new();
directory.set_directories(protobuf::RepeatedField::from_vec(dirs));
directory.set_files(protobuf::RepeatedField::from_vec(files));
store.record_directory(&directory, true).map(move |digest| {
Snapshot {
fingerprint: digest.0,
digest: Some(digest),
path_stats: path_stats,
store.record_directory(&directory, true)
})
.to_boxed()
}

// Preserves the order of Snapshot's path_stats in its returned Vec.
pub fn contents(self, store: Arc<Store>) -> BoxFuture<Vec<FileContent>, String> {
let contents = Arc::new(Mutex::new(HashMap::new()));
let path_stats = self.path_stats;
Snapshot::contents_for_directory_helper(
self.digest.unwrap(),
store,
PathBuf::from(""),
contents.clone(),
).map(move |_| {
let mut contents = contents.lock().unwrap();
let mut vec = Vec::new();
for path in path_stats.iter().filter_map(|path_stat| match path_stat {
&PathStat::File { ref path, .. } => Some(path.to_path_buf()),
&PathStat::Dir { .. } => None,
})
{
match contents.remove(&path) {
Some(content) => vec.push(FileContent { path, content }),
None => {
panic!(format!(
"PathStat for {:?} was present in path_stats but missing from Snapshot contents",
path
));
}
}
}
vec
})
.to_boxed()
}

// Assumes that all fingerprints it encounters are valid.
fn contents_for_directory_helper(
digest: Digest,
store: Arc<Store>,
path_so_far: PathBuf,
contents_wrapped: Arc<Mutex<HashMap<PathBuf, Bytes>>>,
) -> BoxFuture<(), String> {
store
.load_directory(digest)
.and_then(move |maybe_dir| {
maybe_dir.ok_or_else(|| {
format!("Could not find directory with digest {:?}", digest)
})
})
.and_then(move |dir| {
let contents_wrapped_copy = contents_wrapped.clone();
let path_so_far_copy = path_so_far.clone();
let store_copy = store.clone();
let file_futures = join_all(
dir
.get_files()
.iter()
.map(move |file_node| {
let path = path_so_far_copy.join(file_node.get_name());
let contents_wrapped_copy = contents_wrapped_copy.clone();
store_copy
.load_file_bytes_with(file_node.get_digest().into(), |b| b)
.and_then(move |maybe_bytes| {
maybe_bytes
.ok_or_else(|| format!("Couldn't find file contents for {:?}", path))
.map(move |bytes| {
let mut contents = contents_wrapped_copy.lock().unwrap();
contents.insert(path, bytes);
})
})
})
.collect::<Vec<_>>(),
);
let contents_wrapped_copy2 = contents_wrapped.clone();
let store_copy = store.clone();
let dir_futures = join_all(
dir
.get_directories()
.iter()
.map(move |dir_node| {
Snapshot::contents_for_directory_helper(
dir_node.get_digest().into(),
store_copy.clone(),
path_so_far.join(dir_node.get_name()),
contents_wrapped_copy2.clone(),
)
})
.collect::<Vec<_>>(),
);
file_futures.join(dir_futures)
})
.map(|(_, _)| ())
.to_boxed()
}
}
Expand Down Expand Up @@ -177,19 +285,22 @@ mod tests {
extern crate tempdir;

use boxfuture::{BoxFuture, Boxable};
use bytes::Bytes;
use futures::future::Future;
use hashing::{Digest, Fingerprint};
use tempdir::TempDir;
use self::testutil::make_file;

use super::super::{File, PathGlobs, PathStat, PosixFS, ResettablePool, Snapshot, Store,
StoreFileByDigest, VFS};
use super::super::{File, FileContent, Path, PathGlobs, PathStat, PosixFS, ResettablePool,
Snapshot, Store, StoreFileByDigest, VFS};

use std;
use std::error::Error;
use std::path::PathBuf;
use std::sync::Arc;

const AGGRESSIVE: &str = "Aggressive";
const LATIN: &str = "Chaetophractus villosus";
const STR: &str = "European Burmese";

fn setup() -> (Arc<Store>, TempDir, Arc<PosixFS>, FileSaver) {
Expand Down Expand Up @@ -275,17 +386,83 @@ mod tests {
"fbff703bdaac62accf2ea5083bcfed89292073bf710ef9ad14d9298c637e777b",
).unwrap();
assert_eq!(
Snapshot::from_path_stats(store, digester, unsorted_path_stats)
Snapshot::from_path_stats(store, digester, unsorted_path_stats.clone())
.wait()
.unwrap(),
Snapshot {
fingerprint: fingerprint,
digest: Some(Digest(fingerprint, 232)),
path_stats: sorted_path_stats,
path_stats: unsorted_path_stats,
}
);
}

#[test]
fn contents_for_one_file() {
let (store, dir, posix_fs, digester) = setup();

let file_name = PathBuf::from("roland");
make_file(&dir.path().join(&file_name), STR.as_bytes(), 0o600);

let contents = Snapshot::from_path_stats(store.clone(), digester, expand_all_sorted(posix_fs))
.wait()
.unwrap()
.contents(store)
.wait()
.unwrap();
assert_snapshot_contents(contents, vec![(&file_name, STR)]);
}

#[test]
fn contents_for_files_in_multiple_directories() {
let (store, dir, posix_fs, digester) = setup();

let armadillos = PathBuf::from("armadillos");
let armadillos_abs = dir.path().join(&armadillos);
std::fs::create_dir_all(&armadillos_abs).unwrap();
let amy = armadillos.join("amy");
make_file(&dir.path().join(&amy), LATIN.as_bytes(), 0o600);
let rolex = armadillos.join("rolex");
make_file(&dir.path().join(&rolex), AGGRESSIVE.as_bytes(), 0o600);

let cats = PathBuf::from("cats");
let cats_abs = dir.path().join(&cats);
std::fs::create_dir_all(&cats_abs).unwrap();
let roland = cats.join("roland");
make_file(&dir.path().join(&roland), STR.as_bytes(), 0o600);

let dogs = PathBuf::from("dogs");
let dogs_abs = dir.path().join(&dogs);
std::fs::create_dir_all(&dogs_abs).unwrap();

let path_stats_sorted = expand_all_sorted(posix_fs);
let mut path_stats_reversed = path_stats_sorted.clone();
path_stats_reversed.reverse();
{
let snapshot =
Snapshot::from_path_stats(store.clone(), digester.clone(), path_stats_reversed)
.wait()
.unwrap();
let contents = snapshot.contents(store.clone()).wait().unwrap();
assert_snapshot_contents(
contents,
vec![(&roland, STR), (&rolex, AGGRESSIVE), (&amy, LATIN)],
);
}
{
let contents = Snapshot::from_path_stats(store.clone(), digester, path_stats_sorted)
.wait()
.unwrap()
.contents(store)
.wait()
.unwrap();
assert_snapshot_contents(
contents,
vec![(&amy, LATIN), (&rolex, AGGRESSIVE), (&roland, STR)],
);
}
}

#[derive(Clone)]
struct FileSaver(Arc<Store>, Arc<PosixFS>);

Expand Down Expand Up @@ -313,4 +490,16 @@ mod tests {
v.sort_by(|a, b| a.path().cmp(b.path()));
v
}

fn assert_snapshot_contents(contents: Vec<FileContent>, expected: Vec<(&Path, &str)>) {
let expected_with_array: Vec<_> = expected
.into_iter()
.map(|(path, s)| (path.to_path_buf(), Bytes::from(s)))
.collect();
let got: Vec<_> = contents
.into_iter()
.map(|file_content| (file_content.path, file_content.content))
.collect();
assert_eq!(expected_with_array, got);
}
}