diff --git a/src/rust/engine/fs/src/snapshot.rs b/src/rust/engine/fs/src/snapshot.rs index f722aef02a3..7386d74fc0c 100644 --- a/src/rust/engine/fs/src/snapshot.rs +++ b/src/rust/engine/fs/src/snapshot.rs @@ -3,15 +3,18 @@ use bazel_protos; use boxfuture::{Boxable, BoxFuture}; +use bytes::Bytes; use futures::Future; use futures::future::join_all; use hashing::{Digest, Fingerprint}; use itertools::Itertools; -use {File, PathStat, Store}; +use {File, FileContent, PathStat, Store}; use protobuf; +use std::collections::HashMap; use std::ffi::OsString; use std::fmt; -use std::sync::Arc; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; #[derive(Clone, PartialEq)] pub struct Snapshot { @@ -38,15 +41,34 @@ impl Snapshot { >( store: Arc, file_digester: S, - mut path_stats: Vec, + path_stats: Vec, ) -> BoxFuture { + let mut sorted_path_stats = path_stats.clone(); + sorted_path_stats.sort_by(|a, b| a.path().cmp(b.path())); + Snapshot::ingest_directory_from_sorted_path_stats(store, file_digester, sorted_path_stats) + .map(|digest| { + Snapshot { + fingerprint: digest.0, + digest: Some(digest), + path_stats: path_stats, + } + }) + .to_boxed() + } + + fn ingest_directory_from_sorted_path_stats< + S: StoreFileByDigest + Sized + Clone, + Error: fmt::Debug + 'static + Send, + >( + store: Arc, + file_digester: S, + path_stats: Vec, + ) -> BoxFuture { let mut file_futures: Vec> = Vec::new(); let mut dir_futures: Vec> = Vec::new(); - path_stats.sort_by(|a, b| a.path().cmp(b.path())); - for (first_component, group) in &path_stats.iter().cloned().group_by(|s| { s.path().components().next().unwrap().as_os_str().to_owned() @@ -96,14 +118,14 @@ impl Snapshot { } else { dir_futures.push( // TODO: Memoize this in the graph - Snapshot::from_path_stats( + Snapshot::ingest_directory_from_sorted_path_stats( store.clone(), file_digester.clone(), paths_of_child_dir(path_group), - ).and_then(move |snapshot| { + ).and_then(move |digest| { let mut dir_node = bazel_protos::remote_execution::DirectoryNode::new(); dir_node.set_name(osstring_as_utf8(first_component)?); - dir_node.set_digest((&snapshot.digest.unwrap()).into()); + dir_node.set_digest((&digest).into()); Ok(dir_node) }) .to_boxed(), @@ -116,14 +138,100 @@ impl Snapshot { let mut directory = bazel_protos::remote_execution::Directory::new(); directory.set_directories(protobuf::RepeatedField::from_vec(dirs)); directory.set_files(protobuf::RepeatedField::from_vec(files)); - store.record_directory(&directory, true).map(move |digest| { - Snapshot { - fingerprint: digest.0, - digest: Some(digest), - path_stats: path_stats, + store.record_directory(&directory, true) + }) + .to_boxed() + } + + // Preserves the order of Snapshot's path_stats in its returned Vec. + pub fn contents(self, store: Arc) -> BoxFuture, String> { + let contents = Arc::new(Mutex::new(HashMap::new())); + let path_stats = self.path_stats; + Snapshot::contents_for_directory_helper( + self.digest.unwrap(), + store, + PathBuf::from(""), + contents.clone(), + ).map(move |_| { + let mut contents = contents.lock().unwrap(); + let mut vec = Vec::new(); + for path in path_stats.iter().filter_map(|path_stat| match path_stat { + &PathStat::File { ref path, .. } => Some(path.to_path_buf()), + &PathStat::Dir { .. } => None, + }) + { + match contents.remove(&path) { + Some(content) => vec.push(FileContent { path, content }), + None => { + panic!(format!( + "PathStat for {:?} was present in path_stats but missing from Snapshot contents", + path + )); } + } + } + vec + }) + .to_boxed() + } + + // Assumes that all fingerprints it encounters are valid. + fn contents_for_directory_helper( + digest: Digest, + store: Arc, + path_so_far: PathBuf, + contents_wrapped: Arc>>, + ) -> BoxFuture<(), String> { + store + .load_directory(digest) + .and_then(move |maybe_dir| { + maybe_dir.ok_or_else(|| { + format!("Could not find directory with digest {:?}", digest) }) }) + .and_then(move |dir| { + let contents_wrapped_copy = contents_wrapped.clone(); + let path_so_far_copy = path_so_far.clone(); + let store_copy = store.clone(); + let file_futures = join_all( + dir + .get_files() + .iter() + .map(move |file_node| { + let path = path_so_far_copy.join(file_node.get_name()); + let contents_wrapped_copy = contents_wrapped_copy.clone(); + store_copy + .load_file_bytes_with(file_node.get_digest().into(), |b| b) + .and_then(move |maybe_bytes| { + maybe_bytes + .ok_or_else(|| format!("Couldn't find file contents for {:?}", path)) + .map(move |bytes| { + let mut contents = contents_wrapped_copy.lock().unwrap(); + contents.insert(path, bytes); + }) + }) + }) + .collect::>(), + ); + let contents_wrapped_copy2 = contents_wrapped.clone(); + let store_copy = store.clone(); + let dir_futures = join_all( + dir + .get_directories() + .iter() + .map(move |dir_node| { + Snapshot::contents_for_directory_helper( + dir_node.get_digest().into(), + store_copy.clone(), + path_so_far.join(dir_node.get_name()), + contents_wrapped_copy2.clone(), + ) + }) + .collect::>(), + ); + file_futures.join(dir_futures) + }) + .map(|(_, _)| ()) .to_boxed() } } @@ -177,19 +285,22 @@ mod tests { extern crate tempdir; use boxfuture::{BoxFuture, Boxable}; + use bytes::Bytes; use futures::future::Future; use hashing::{Digest, Fingerprint}; use tempdir::TempDir; use self::testutil::make_file; - use super::super::{File, PathGlobs, PathStat, PosixFS, ResettablePool, Snapshot, Store, - StoreFileByDigest, VFS}; + use super::super::{File, FileContent, Path, PathGlobs, PathStat, PosixFS, ResettablePool, + Snapshot, Store, StoreFileByDigest, VFS}; use std; use std::error::Error; use std::path::PathBuf; use std::sync::Arc; + const AGGRESSIVE: &str = "Aggressive"; + const LATIN: &str = "Chaetophractus villosus"; const STR: &str = "European Burmese"; fn setup() -> (Arc, TempDir, Arc, FileSaver) { @@ -275,17 +386,83 @@ mod tests { "fbff703bdaac62accf2ea5083bcfed89292073bf710ef9ad14d9298c637e777b", ).unwrap(); assert_eq!( - Snapshot::from_path_stats(store, digester, unsorted_path_stats) + Snapshot::from_path_stats(store, digester, unsorted_path_stats.clone()) .wait() .unwrap(), Snapshot { fingerprint: fingerprint, digest: Some(Digest(fingerprint, 232)), - path_stats: sorted_path_stats, + path_stats: unsorted_path_stats, } ); } + #[test] + fn contents_for_one_file() { + let (store, dir, posix_fs, digester) = setup(); + + let file_name = PathBuf::from("roland"); + make_file(&dir.path().join(&file_name), STR.as_bytes(), 0o600); + + let contents = Snapshot::from_path_stats(store.clone(), digester, expand_all_sorted(posix_fs)) + .wait() + .unwrap() + .contents(store) + .wait() + .unwrap(); + assert_snapshot_contents(contents, vec![(&file_name, STR)]); + } + + #[test] + fn contents_for_files_in_multiple_directories() { + let (store, dir, posix_fs, digester) = setup(); + + let armadillos = PathBuf::from("armadillos"); + let armadillos_abs = dir.path().join(&armadillos); + std::fs::create_dir_all(&armadillos_abs).unwrap(); + let amy = armadillos.join("amy"); + make_file(&dir.path().join(&amy), LATIN.as_bytes(), 0o600); + let rolex = armadillos.join("rolex"); + make_file(&dir.path().join(&rolex), AGGRESSIVE.as_bytes(), 0o600); + + let cats = PathBuf::from("cats"); + let cats_abs = dir.path().join(&cats); + std::fs::create_dir_all(&cats_abs).unwrap(); + let roland = cats.join("roland"); + make_file(&dir.path().join(&roland), STR.as_bytes(), 0o600); + + let dogs = PathBuf::from("dogs"); + let dogs_abs = dir.path().join(&dogs); + std::fs::create_dir_all(&dogs_abs).unwrap(); + + let path_stats_sorted = expand_all_sorted(posix_fs); + let mut path_stats_reversed = path_stats_sorted.clone(); + path_stats_reversed.reverse(); + { + let snapshot = + Snapshot::from_path_stats(store.clone(), digester.clone(), path_stats_reversed) + .wait() + .unwrap(); + let contents = snapshot.contents(store.clone()).wait().unwrap(); + assert_snapshot_contents( + contents, + vec![(&roland, STR), (&rolex, AGGRESSIVE), (&amy, LATIN)], + ); + } + { + let contents = Snapshot::from_path_stats(store.clone(), digester, path_stats_sorted) + .wait() + .unwrap() + .contents(store) + .wait() + .unwrap(); + assert_snapshot_contents( + contents, + vec![(&amy, LATIN), (&rolex, AGGRESSIVE), (&roland, STR)], + ); + } + } + #[derive(Clone)] struct FileSaver(Arc, Arc); @@ -313,4 +490,16 @@ mod tests { v.sort_by(|a, b| a.path().cmp(b.path())); v } + + fn assert_snapshot_contents(contents: Vec, expected: Vec<(&Path, &str)>) { + let expected_with_array: Vec<_> = expected + .into_iter() + .map(|(path, s)| (path.to_path_buf(), Bytes::from(s))) + .collect(); + let got: Vec<_> = contents + .into_iter() + .map(|file_content| (file_content.path, file_content.content)) + .collect(); + assert_eq!(expected_with_array, got); + } }