diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 271f47c6f208..193701b96248 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -66,11 +66,6 @@ name = "cc" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "cfg-if" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "cmake" version = "0.1.26" @@ -102,16 +97,6 @@ name = "fake-simd" version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "filetime" -version = "0.1.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.32 (registry+https://github.com/rust-lang/crates.io-index)", - "redox_syscall 0.1.31 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "fixedbitset" version = "0.1.8" @@ -140,7 +125,6 @@ dependencies = [ "ordermap 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "sha2 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tar 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -387,11 +371,6 @@ dependencies = [ "libc 0.2.32 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "redox_syscall" -version = "0.1.31" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "regex" version = "0.2.2" @@ -430,16 +409,6 @@ dependencies = [ "generic-array 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "tar" -version = "0.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "filetime 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.32 (registry+https://github.com/rust-lang/crates.io-index)", - "xattr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "tempdir" version = "0.3.5" @@ -498,14 +467,6 @@ name = "winapi-build" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "xattr" -version = "0.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "libc 0.2.32 (registry+https://github.com/rust-lang/crates.io-index)", -] - [metadata] "checksum aho-corasick 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "500909c4f87a9e52355b26626d890833e9e1d53ac566db76c36faa984b889699" "checksum bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "aad18937a628ec6abcd26d1489012cc0e18c21798210f491af69ded9b881106d" @@ -513,13 +474,11 @@ dependencies = [ "checksum block-buffer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1339a1042f5d9f295737ad4d9a6ab6bf81c84a933dba110b9200cd6d1448b814" "checksum byte-tools 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "560c32574a12a89ecd91f5e742165893f86e3ab98d21f8ea548658eb9eef5f40" "checksum cc 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "ef4019bdb99c0c1ddd56c12c2f507c174d729c6915eca6bd9d27c42f3d93b0f4" -"checksum cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d4c819a1287eb618df47cc647173c5c4c66ba19d888a6e50d605672aed3140de" "checksum cmake 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)" = "357c07e7a1fc95732793c1edb5901e1a1f305cfcf63a90eb12dbd22bdb6b789d" "checksum crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "0c5ea215664ca264da8a9d9c3be80d2eaf30923c259d03e870388eb927508f97" "checksum digest 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e5b29bf156f3f4b3c4f610a25ff69370616ae6e0657d416de22645483e72af0a" "checksum either 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e311a7479512fbdf858fb54d91ec59f3b9f85bc0113659f46bba12b199d273ce" "checksum fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" -"checksum filetime 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "aa75ec8f7927063335a9583e7fa87b0110bb888cf766dc01b54c0ff70d760c8e" "checksum fixedbitset 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "85cb8fec437468d86dc7c83ca7cfc933341d561873275f22dd5eedefa63a6478" "checksum fnv 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "6cc484842f1e2884faf56f529f960cc12ad8c71ce96cc7abba0a067c98fee344" "checksum fuchsia-zircon 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f6c0581a4e363262e52b87f59ee2afe3415361c6ec35e665924eb08afe8ff159" @@ -552,12 +511,10 @@ dependencies = [ "checksum pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "3a8b4c6b8165cd1a1cd4b9b120978131389f64bdaf456435caa41e630edba903" "checksum protobuf 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "99c7a6694a7896f7c039bc20a6947b83781b019d7d40df77ae069cd2a432e4a7" "checksum rand 0.3.17 (registry+https://github.com/rust-lang/crates.io-index)" = "61efcbcd9fa8d8fbb07c84e34a8af18a1ff177b449689ad38a6e9457ecc7b2ae" -"checksum redox_syscall 0.1.31 (registry+https://github.com/rust-lang/crates.io-index)" = "8dde11f18c108289bef24469638a04dce49da56084f2d50618b226e47eb04509" "checksum regex 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1731164734096285ec2a5ec7fea5248ae2f5485b3feeb0115af4fda2183b2d1b" "checksum regex-syntax 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ad890a5eef7953f55427c50575c680c42841653abd2b028b68cd223d157f62db" "checksum same-file 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "70a18720d745fb9ca6a041b37cb36d0b21066006b6cff8b5b360142d4b81fb60" "checksum sha2 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7d963c78ce367df26d7ea8b8cc655c651b42e8a1e584e869c1e17dae3ccb116a" -"checksum tar 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)" = "281285b717926caa919ad905ef89c63d75805c7d89437fb873100925a53f2b1b" "checksum tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "87974a6f5c1dfb344d733055601650059a3363de2a6104819293baff662132d6" "checksum thread_local 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "1697c4b57aeeb7a536b647165a2825faddffb1d3bad386d507709bd51a90bb14" "checksum typenum 1.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "13a99dc6780ef33c78780b826cf9d2a78840b72cae9474de4bcaf9051e60ebbd" @@ -567,4 +524,3 @@ dependencies = [ "checksum walkdir 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "40b6d201f4f8998a837196b6de9c73e35af14c992cbb92c4ab641d2c2dce52de" "checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" "checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" -"checksum xattr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "5f04de8a1346489a2f9e9bd8526b73d135ec554227b17568456e86aa35b6f3fc" diff --git a/src/rust/engine/fs/Cargo.toml b/src/rust/engine/fs/Cargo.toml index 6f41a4d3d2df..69d83f3a39d7 100644 --- a/src/rust/engine/fs/Cargo.toml +++ b/src/rust/engine/fs/Cargo.toml @@ -18,7 +18,6 @@ lmdb = "0.7.2" ordermap = "0.2.8" protobuf = "1.4.1" sha2 = "0.6.0" -tar = "0.4.13" tempdir = "0.3.5" [dev-dependencies] diff --git a/src/rust/engine/fs/fs_util/src/main.rs b/src/rust/engine/fs/fs_util/src/main.rs index ed8625620c98..feaf173375ac 100644 --- a/src/rust/engine/fs/fs_util/src/main.rs +++ b/src/rust/engine/fs/fs_util/src/main.rs @@ -227,7 +227,7 @@ fn execute(top_match: clap::ArgMatches) -> Result<(), ExitError> { paths, ) }) - .map(|snapshot| snapshot.digest.unwrap()) + .map(|snapshot| snapshot.digest) .wait()?; Ok(println!("{} {}", digest.0, digest.1)) } diff --git a/src/rust/engine/fs/src/hash.rs b/src/rust/engine/fs/src/hash.rs index 176d84fc3cc0..104485a46071 100644 --- a/src/rust/engine/fs/src/hash.rs +++ b/src/rust/engine/fs/src/hash.rs @@ -1,12 +1,8 @@ // Copyright 2017 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). -use digest::{Digest, FixedOutput}; -use sha2::Sha256; - use std::error::Error; use std::fmt; -use std::io::{self, Write}; use hex; @@ -66,43 +62,6 @@ impl AsRef<[u8]> for Fingerprint { } } -/// -/// A Write instance that fingerprints all data that passes through it. -/// -pub struct WriterHasher { - hasher: Sha256, - inner: W, -} - -impl WriterHasher { - pub fn new(inner: W) -> WriterHasher { - WriterHasher { - hasher: Sha256::default(), - inner: inner, - } - } - - /// - /// Returns the result of fingerprinting this stream, and Drops the stream. - /// - pub fn finish(self) -> Fingerprint { - Fingerprint::from_bytes_unsafe(&self.hasher.fixed_result()) - } -} - -impl Write for WriterHasher { - fn write(&mut self, buf: &[u8]) -> io::Result { - let written = self.inner.write(buf)?; - // Hash the bytes that were successfully written. - self.hasher.input(&buf[0..written]); - Ok(written) - } - - fn flush(&mut self) -> io::Result<()> { - self.inner.flush() - } -} - #[cfg(test)] mod fingerprint_tests { use super::Fingerprint; diff --git a/src/rust/engine/fs/src/lib.rs b/src/rust/engine/fs/src/lib.rs index af509d3d14ee..afdbad35b09a 100644 --- a/src/rust/engine/fs/src/lib.rs +++ b/src/rust/engine/fs/src/lib.rs @@ -25,26 +25,22 @@ extern crate lmdb; extern crate ordermap; extern crate protobuf; extern crate sha2; -extern crate tar; extern crate tempdir; use std::collections::HashSet; use std::os::unix::fs::PermissionsExt; use std::path::{Component, Path, PathBuf}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::{fmt, fs}; use std::io::{self, Read}; use std::cmp::min; use futures::future::{self, Future}; -use futures_cpupool::CpuFuture; use glob::Pattern; use ignore::gitignore::{Gitignore, GitignoreBuilder}; use ordermap::OrderMap; -use tempdir::TempDir; use boxfuture::{Boxable, BoxFuture}; -use hash::WriterHasher; #[derive(Clone, Debug, Eq, Hash, PartialEq)] @@ -815,249 +811,6 @@ pub fn safe_create_dir_all_ioerror(path: &Path) -> Result<(), io::Error> { } } -fn safe_create_dir_all(path: &Path) -> Result<(), String> { - safe_create_dir_all_ioerror(path).map_err(|e| { - format!("Failed to create dir {:?} due to {:?}", path, e) - }) -} - -fn safe_create_tmpdir_in(base_dir: &Path, prefix: &str) -> Result { - safe_create_dir_all(&base_dir)?; - Ok(TempDir::new_in(&base_dir, prefix).map_err(|e| { - format!("Failed to create tempdir {:?} due to {:?}", base_dir, e) - })?) -} - -/// -/// A facade for the snapshot directory, which lives under the pants workdir. -/// -pub struct Snapshots { - snapshots_dir: PathBuf, - snapshots_generator: Mutex<(TempDir, usize)>, -} - -impl Snapshots { - pub fn new(snapshots_dir: PathBuf) -> Result { - let snapshots_tmpdir = safe_create_tmpdir_in(&snapshots_dir, ".tmp")?; - - Ok(Snapshots { - snapshots_dir: snapshots_dir, - snapshots_generator: Mutex::new((snapshots_tmpdir, 0)), - }) - } - - pub fn snapshot_path(&self) -> &Path { - self.snapshots_dir.as_path() - } - - fn next_temp_path(&self) -> Result { - let mut gen = self.snapshots_generator.lock().unwrap(); - gen.1 += 1; - - // N.B. Sometimes, in e.g. a `./pants clean-all test ...` the snapshot tempdir created at the - // beginning of a run can be removed out from under us by e.g. the `clean-all` task. Here, we - // we double check existence of the `TempDir`'s path when the path is accessed and replace if - // necessary. - if !gen.0.path().exists() { - gen.0 = safe_create_tmpdir_in(&self.snapshots_dir, ".tmp")?; - } - - Ok(gen.0.path().join(format!("{}.tmp", gen.1))) - } - - /// - /// A non-canonical (does not expand symlinks) in-memory form of normalize. Used to collapse - /// parent and cur components, which are legal in symbolic paths in PathStats, but not in - /// Tar files. - /// - fn normalize(path: &Path) -> Result { - let mut res = PathBuf::new(); - for component in path.components() { - match component { - Component::Prefix(..) | - Component::RootDir => return Err(format!("Absolute paths not supported: {:?}", path)), - Component::CurDir => continue, - Component::ParentDir => { - // Pop the previous component. - if !res.pop() { - return Err(format!( - "Globs may not traverse outside the root: {:?}", - path - )); - } else { - continue; - } - } - Component::Normal(p) => res.push(p), - } - } - Ok(res) - } - - /// - /// Create a tar file on the given Write instance containing the given paths, or - /// return an error string. - /// - fn tar_create( - dest: W, - paths: &Vec, - relative_to: &Dir, - ) -> Result { - let mut tar_builder = tar::Builder::new(dest); - tar_builder.mode(tar::HeaderMode::Deterministic); - for path_stat in paths { - // Append the PathStat using the symbolic name and underlying stat. - let append_res = match path_stat { - &PathStat::File { ref path, ref stat } => { - let normalized = Snapshots::normalize(path)?; - let mut input = fs::File::open(relative_to.0.join(stat.path.as_path())) - .map_err(|e| format!("Failed to open {:?}: {:?}", path_stat, e))?; - tar_builder.append_file(normalized, &mut input) - } - &PathStat::Dir { ref path, ref stat } => { - let normalized = Snapshots::normalize(path)?; - tar_builder.append_dir(normalized, relative_to.0.join(stat.0.as_path())) - } - }; - append_res.map_err(|e| { - format!("Failed to tar {:?}: {:?}", path_stat, e) - })?; - } - - // Finish the tar file, returning ownership of the stream to the caller. - Ok(tar_builder.into_inner().map_err(|e| { - format!("Failed to finalize snapshot tar: {:?}", e) - })?) - } - - /// - /// Create a tar file at the given dest Path containing the given paths, while - /// fingerprinting the written stream. - /// - fn tar_create_fingerprinted( - dest: &Path, - paths: &Vec, - relative_to: &Dir, - ) -> Result { - // Wrap buffering around a fingerprinted stream above a File. - let stream = io::BufWriter::new(WriterHasher::new(fs::File::create(dest).map_err(|e| { - format!("Failed to create destination file: {:?}", e) - })?)); - - // Then append the tar to the stream, and retrieve the Fingerprint to flush all writers. - Ok( - Snapshots::tar_create(stream, paths, relative_to)? - .into_inner() - .map_err(|e| { - format!("Failed to flush to {:?}: {:?}", dest, e.error()) - })? - .finish(), - ) - } - - /// - /// Attempts to rename src to dst, and _succeeds_ if dst already exists. This is safe in - /// the case of Snapshots because the destination path is unique to its content. - /// - fn finalize(temp_path: &Path, dest_path: &Path) -> Result<(), String> { - if dest_path.is_file() { - // The Snapshot has already been created. - fs::remove_file(temp_path).unwrap_or(()); - Ok(()) - } else { - let dest_dir = dest_path.parent().expect( - "All snapshot paths must have parent directories.", - ); - safe_create_dir_all(dest_dir)?; - match fs::rename(temp_path, dest_path) { - Ok(_) => Ok(()), - Err(_) if dest_path.is_file() => Ok(()), - Err(e) => Err(format!( - "Failed to finalize snapshot at {:?}: {:?}", - dest_path, - e - )), - } - } - } - - fn path_for(&self, fingerprint: &Fingerprint) -> PathBuf { - Snapshots::path_under_for(self.snapshot_path(), fingerprint) - } - - fn path_under_for(path: &Path, fingerprint: &Fingerprint) -> PathBuf { - let hex = fingerprint.to_hex(); - path.join(&hex[0..2]).join(&hex[2..4]).join( - format!("{}.tar", hex), - ) - } - - /// - /// Creates a Snapshot for the given paths under the given VFS. - /// - pub fn create(&self, fs: &PosixFS, paths: Vec) -> CpuFuture { - let dest_dir = self.snapshot_path().to_owned(); - let root = fs.root.clone(); - let temp_path = self.next_temp_path().expect( - "Couldn't get the next temp path.", - ); - - fs.pool.spawn_fn(move || { - // Write the tar deterministically to a temporary file while fingerprinting. - let fingerprint = Snapshots::tar_create_fingerprinted(temp_path.as_path(), &paths, &root)?; - - // Rename to the final path if it does not already exist. - Snapshots::finalize( - temp_path.as_path(), - Snapshots::path_under_for(&dest_dir, &fingerprint).as_path(), - )?; - - Ok(Snapshot { - fingerprint: fingerprint, - digest: None, - path_stats: paths, - }) - }) - } - - fn contents_for_sync(snapshot: Snapshot, path: PathBuf) -> Result, io::Error> { - let mut archive = fs::File::open(path).map(|f| tar::Archive::new(f))?; - - // Zip the in-memory Snapshot to the on disk representation, validating as we go. - let mut files_content = Vec::new(); - for (entry_res, path_stat) in archive.entries()?.zip(snapshot.path_stats.into_iter()) { - let mut entry = entry_res?; - if entry.header().entry_type() == tar::EntryType::file() { - let path = match path_stat { - PathStat::File { path, .. } => path, - PathStat::Dir { .. } => panic!("Snapshot contents changed after storage."), - }; - let mut content = Vec::new(); - io::Read::read_to_end(&mut entry, &mut content)?; - files_content.push(FileContent { - path: path, - content: content, - }); - } - } - Ok(files_content) - } - - pub fn contents_for( - &self, - fs: &PosixFS, - snapshot: Snapshot, - ) -> CpuFuture, String> { - let archive_path = self.path_for(&snapshot.fingerprint); - fs.pool.spawn_fn(move || { - let snapshot_str = format!("{:?}", snapshot); - Snapshots::contents_for_sync(snapshot, archive_path).map_err(|e| { - format!("Failed to open Snapshot {}: {:?}", snapshot_str, e) - }) - }) - } -} - #[cfg(test)] mod posixfs_test { extern crate tempdir; diff --git a/src/rust/engine/fs/src/snapshot.rs b/src/rust/engine/fs/src/snapshot.rs index e6a59f090ec5..a6811f8e040f 100644 --- a/src/rust/engine/fs/src/snapshot.rs +++ b/src/rust/engine/fs/src/snapshot.rs @@ -3,23 +3,21 @@ use bazel_protos; use boxfuture::{Boxable, BoxFuture}; +use futures; use futures::Future; use futures::future::join_all; use itertools::Itertools; -use {Digest, File, PathStat, Store}; +use {Digest, File, FileContent, PathStat, Store}; use hash::Fingerprint; use protobuf; use std::ffi::OsString; use std::fmt; +use std::path::PathBuf; use std::sync::Arc; #[derive(Clone, PartialEq)] pub struct Snapshot { - // TODO: In a follow-up commit, fingerprint will be removed, and digest will be made non-optional. - // They both exist right now as a compatibility shim so that the tar-based code and - // Directory-based code can peacefully co-exist. - pub fingerprint: Fingerprint, - pub digest: Option, + pub digest: Digest, pub path_stats: Vec, } @@ -41,83 +39,127 @@ impl Snapshot { path_stats.sort_by(|a, b| a.path().cmp(b.path())); for (first_component, group) in - &path_stats.iter().cloned().group_by(|s| { - s.path().components().next().unwrap().as_os_str().to_owned() - }) - { - let mut path_group: Vec = group.collect(); - if path_group.len() == 1 && path_group.get(0).unwrap().path().components().count() == 1 { - // Exactly one entry with exactly one component indicates either a file in this directory, - // or an empty directory. - // If the child is a non-empty directory, or a file therein, there must be multiple - // PathStats with that prefix component, and we will handle that in the recursive - // save_directory call. - - match path_group.pop().unwrap() { - PathStat::File { ref stat, .. } => { - let is_executable = stat.is_executable; - file_futures.push( - file_digester - .clone() - .digest(&stat) - .map_err(|e| format!("{:?}", e)) - .and_then(move |digest| { - let mut file_node = bazel_protos::remote_execution::FileNode::new(); - file_node.set_name(osstring_as_utf8(first_component)?); - file_node.set_digest(digest.into()); - file_node.set_is_executable(is_executable); - Ok(file_node) - }) - .to_boxed(), - ); - } - PathStat::Dir { .. } => { - // Because there are no children of this Dir, it must be empty. + &path_stats.iter().cloned().group_by(|s| { + s.path().components().next().unwrap().as_os_str().to_owned() + }) + { + let mut path_group: Vec = group.collect(); + if path_group.len() == 1 && path_group.get(0).unwrap().path().components().count() == 1 { + // Exactly one entry with exactly one component indicates either a file in this directory, + // or an empty directory. + // If the child is a non-empty directory, or a file therein, there must be multiple + // PathStats with that prefix component, and we will handle that in the recursive + // save_directory call. + + match path_group.pop().unwrap() { + PathStat::File { ref stat, .. } => { + let is_executable = stat.is_executable; + file_futures.push( + file_digester + .clone() + .digest(&stat) + .map_err(|e| format!("{:?}", e)) + .and_then(move |digest| { + let mut file_node = bazel_protos::remote_execution::FileNode::new(); + file_node.set_name(osstring_as_utf8(first_component)?); + file_node.set_digest(digest.into()); + file_node.set_is_executable(is_executable); + Ok(file_node) + }) + .to_boxed(), + ); + } + PathStat::Dir { .. } => { + // Because there are no children of this Dir, it must be empty. + dir_futures.push( + store + .record_directory(&bazel_protos::remote_execution::Directory::new()) + .map(move |digest| { + let mut directory_node = bazel_protos::remote_execution::DirectoryNode::new(); + directory_node.set_name(osstring_as_utf8(first_component).unwrap()); + directory_node.set_digest(digest.into()); + directory_node + }) + .to_boxed(), + ); + } + } + } else { dir_futures.push( - store - .record_directory(&bazel_protos::remote_execution::Directory::new()) - .map(move |digest| { - let mut directory_node = bazel_protos::remote_execution::DirectoryNode::new(); - directory_node.set_name(osstring_as_utf8(first_component).unwrap()); - directory_node.set_digest(digest.into()); - directory_node - }) - .to_boxed(), + // TODO: Memoize this in the graph + Snapshot::from_path_stats( + store.clone(), + file_digester.clone(), + paths_of_child_dir(path_group), + ).and_then(move |snapshot| { + let mut dir_node = bazel_protos::remote_execution::DirectoryNode::new(); + dir_node.set_name(osstring_as_utf8(first_component)?); + dir_node.set_digest(snapshot.digest.into()); + Ok(dir_node) + }) + .to_boxed(), ); } } - } else { - dir_futures.push( - // TODO: Memoize this in the graph - Snapshot::from_path_stats( - store.clone(), - file_digester.clone(), - paths_of_child_dir(path_group), - ).and_then(move |snapshot| { - let mut dir_node = bazel_protos::remote_execution::DirectoryNode::new(); - dir_node.set_name(osstring_as_utf8(first_component)?); - dir_node.set_digest(snapshot.digest.unwrap().into()); - Ok(dir_node) - }) - .to_boxed(), - ); - } - } join_all(dir_futures) - .join(join_all(file_futures)) - .and_then(move |(dirs, files)| { - let mut directory = bazel_protos::remote_execution::Directory::new(); - directory.set_directories(protobuf::RepeatedField::from_vec(dirs)); - directory.set_files(protobuf::RepeatedField::from_vec(files)); - store.record_directory(&directory).map(move |digest| { - Snapshot { - fingerprint: digest.0, - digest: Some(digest), - path_stats: path_stats, - } + .join(join_all(file_futures)) + .and_then(move |(dirs, files)| { + let mut directory = bazel_protos::remote_execution::Directory::new(); + directory.set_directories(protobuf::RepeatedField::from_vec(dirs)); + directory.set_files(protobuf::RepeatedField::from_vec(files)); + store.record_directory(&directory).map(move |digest| { + Snapshot { + digest: digest, + path_stats: path_stats, + } + }) }) - }) - .to_boxed() + .to_boxed() + } + + // TODO: Rewrite this to execute in parallel + pub fn contents(self, store: Arc) -> BoxFuture, String> { + futures::future::done(Snapshot::contents_for_directory_sync( + self.digest.0, + store, + PathBuf::from(""), + )).to_boxed() + } + + fn contents_for_directory_sync( + fingerprint: Fingerprint, + store: Arc, + path_so_far: PathBuf, + ) -> Result, String> { + let directory = store.load_directory_proto(fingerprint).wait().unwrap().ok_or_else(|| { + format!("Could not find snapshot {}", fingerprint) + })?; + + let mut contents = Vec::new(); + + for file_node in directory.get_files() { + contents.push(FileContent { + path: path_so_far.join(file_node.get_name()), + content: store + .load_file_bytes(Fingerprint::from_hex_string( + file_node.get_digest().get_hash(), + ).unwrap()).wait().unwrap() + .unwrap(), + }); + } + + for dir_node in directory.get_directories() { + contents.extend( + Snapshot::contents_for_directory_sync( + Fingerprint::from_hex_string(dir_node.get_digest().get_hash()).unwrap(), + store.clone(), + path_so_far.join(dir_node.get_name()), + ).unwrap(), + ); + } + + contents.sort_by(|a, b| a.path.cmp(&b.path)); + Ok(contents) } } @@ -125,8 +167,7 @@ impl fmt::Debug for Snapshot { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, - "Snapshot({}, digest={:?}, entries={})", - self.fingerprint.to_hex(), + "Snapshot(digest={:?}, entries={})", self.digest, self.path_stats.len() ) @@ -203,17 +244,17 @@ mod tests { make_file(&dir.path().join(&file_name), STR.as_bytes(), 0o600); let path_stats = expand_all_sorted(posix_fs); - // TODO: Inline when only used once - let fingerprint = Fingerprint::from_hex_string( - "63949aa823baf765eff07b946050d76ec0033144c785a94d3ebd82baa931cd16", - ).unwrap(); assert_eq!( Snapshot::from_path_stats(store, digester, path_stats.clone()) .wait() .unwrap(), Snapshot { - fingerprint: fingerprint, - digest: Some(Digest(fingerprint, 80)), + digest: Digest( + Fingerprint::from_hex_string( + "63949aa823baf765eff07b946050d76ec0033144c785a94d3ebd82baa931cd16", + ).unwrap(), + 80, + ), path_stats: path_stats, } ); @@ -229,17 +270,17 @@ mod tests { make_file(&dir.path().join(&roland), STR.as_bytes(), 0o600); let path_stats = expand_all_sorted(posix_fs); - // TODO: Inline when only used once - let fingerprint = Fingerprint::from_hex_string( - "8b1a7ea04eaa2527b35683edac088bc826117b53b7ec6601740b55e20bce3deb", - ).unwrap(); assert_eq!( Snapshot::from_path_stats(store, digester, path_stats.clone()) .wait() .unwrap(), Snapshot { - fingerprint: fingerprint, - digest: Some(Digest(fingerprint, 78)), + digest: Digest( + Fingerprint::from_hex_string( + "8b1a7ea04eaa2527b35683edac088bc826117b53b7ec6601740b55e20bce3deb", + ).unwrap(), + 78, + ), path_stats: path_stats, } ); @@ -261,22 +302,44 @@ mod tests { let sorted_path_stats = expand_all_sorted(posix_fs); let mut unsorted_path_stats = sorted_path_stats.clone(); unsorted_path_stats.reverse(); - // TODO: Inline when only used once - let fingerprint = Fingerprint::from_hex_string( - "fbff703bdaac62accf2ea5083bcfed89292073bf710ef9ad14d9298c637e777b", - ).unwrap(); assert_eq!( Snapshot::from_path_stats(store, digester, unsorted_path_stats) .wait() .unwrap(), Snapshot { - fingerprint: fingerprint, - digest: Some(Digest(fingerprint, 232)), + digest: Digest( + Fingerprint::from_hex_string( + "fbff703bdaac62accf2ea5083bcfed89292073bf710ef9ad14d9298c637e777b", + ).unwrap(), + 232, + ), path_stats: sorted_path_stats, } ); } + // TODO: Write more tests which involve multiple files, and directories, and stuff. + // I will do this before merging this PR, but want to check the approach before doing so. + #[test] + fn contents_for_one_file() { + let (store, dir, posix_fs, digester) = setup(); + + let file_name = PathBuf::from("roland"); + make_file(&dir.path().join(&file_name), STR.as_bytes(), 0o600); + + let path_stats = expand_all_sorted(posix_fs); + let contents = Snapshot::from_path_stats(store.clone(), digester, path_stats.clone()) + .wait() + .unwrap() + .contents(store) + .wait() + .unwrap(); + // TODO: Write helper for asserting equality of FileContents (and Vecs thereof). + assert_eq!(contents.len(), 1); + assert_eq!(contents.get(0).unwrap().path, file_name); + assert_eq!(contents.get(0).unwrap().content, STR.as_bytes().to_vec()); + } + struct FileSaver(Arc, Arc); impl GetFileDigest for FileSaver { diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 4dfde11de22a..7086fe4d98aa 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -2,14 +2,14 @@ // Licensed under the Apache License, Version 2.0 (see LICENSE). use std; -use std::os::unix::ffi::OsStrExt; use std::path::{Path, PathBuf}; use std::sync::Arc; -use core::TypeId; -use externs; -use fs::{PosixFS, Snapshots, Store, safe_create_dir_all_ioerror, ResettablePool}; +use boxfuture::BoxFuture; +use core::{Failure, TypeId}; +use fs::{Digest, File, GetFileDigest, PosixFS, Store, safe_create_dir_all_ioerror, ResettablePool}; use graph::{EntryId, Graph}; +use nodes::{DigestFile, GetNode}; use rule_graph::RuleGraph; use tasks::Tasks; use types::Types; @@ -24,15 +24,14 @@ pub struct Core { pub rule_graph: RuleGraph, pub types: Types, pub pool: Arc, - pub snapshots: Snapshots, - pub store: Store, + pub store: Arc, pub vfs: PosixFS, } impl Core { pub fn new( root_subject_types: Vec, - mut tasks: Tasks, + tasks: Tasks, types: Types, build_root: &Path, ignore_patterns: Vec, @@ -57,20 +56,6 @@ impl Core { }, ); - // TODO: Create the Snapshots directory, and then expose it as a singleton to python. - // see: https://github.com/pantsbuild/pants/issues/4397 - let snapshots = Snapshots::new(snapshots_dir).unwrap_or_else(|e| { - panic!("Could not initialize Snapshot directory: {:?}", e); - }); - tasks.singleton_replace( - externs::invoke_unsafe( - &types.construct_snapshots, - &vec![ - externs::store_bytes(snapshots.snapshot_path().as_os_str().as_bytes()), - ], - ), - types.snapshots.clone(), - ); let rule_graph = RuleGraph::new(&tasks, root_subject_types); Core { @@ -79,8 +64,7 @@ impl Core { rule_graph: rule_graph, types: types, pool: pool.clone(), - snapshots: snapshots, - store: store, + store: Arc::new(store), // FIXME: Errors in initialization should definitely be exposed as python // exceptions, rather than as panics. vfs: PosixFS::new(build_root, pool, ignore_patterns).unwrap_or_else(|e| { @@ -109,6 +93,12 @@ impl Context { } } +impl GetFileDigest for Context { + fn digest(&self, file: &File) -> BoxFuture { + self.get(DigestFile(file.clone())) + } +} + pub trait ContextFactory { fn create(&self, entry_id: EntryId) -> Context; } diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index 1c2170925eff..9fcb61f4695d 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -6,6 +6,7 @@ use std::error::Error; use std::fmt; use std::os::unix::ffi::OsStrExt; use std::path::{Path, PathBuf}; +use std::sync::Arc; use futures::future::{self, Future}; use ordermap::OrderMap; @@ -52,7 +53,10 @@ fn was_required(failure: Failure) -> Failure { } } -trait GetNode { +// TODO: This shouldn't be public (is for context to implement GetFileDigest). +// How should this *actually* be consumed, in such a way that it can be delegated to by traits which +// have multiple implementations (either for tests, or for weird tools like fs_util). +pub trait GetNode { fn get(&self, node: N) -> NodeFuture; } @@ -301,13 +305,12 @@ impl Select { vec![ self .get_snapshot(&context) - .and_then(move |snapshot| + .and_then( + move |snapshot| // Request the file contents of the Snapshot, and then store them. - context.core.snapshots.contents_for(&context.core.vfs, snapshot) - .then(move |files_content_res| match files_content_res { - Ok(files_content) => Ok(Snapshot::store_files_content(&context, &files_content)), - Err(e) => Err(throw(&e)), - })) + snapshot.contents(context.core.store.clone()).map_err(|e| throw(&e)) + .map(move |files_content| Snapshot::store_files_content(&context, &files_content)) + ) .to_boxed(), ] } else if let Some(&(_, ref value)) = context.core.tasks.gen_singleton(self.product()) { @@ -777,7 +780,7 @@ impl From for NodeKey { /// A Node that represents reading a file and fingerprinting its contents. /// #[derive(Clone, Debug, Eq, Hash, PartialEq)] -pub struct DigestFile(File); +pub struct DigestFile(pub File); impl Node for DigestFile { type Output = fs::Digest; @@ -883,7 +886,7 @@ pub struct Snapshot { } impl Snapshot { - fn create(context: Context, path_globs: PathGlobs) -> NodeFuture { + fn create(context: Arc, path_globs: PathGlobs) -> NodeFuture { // Recursively expand PathGlobs into PathStats while tracking their dependencies. context .expand(path_globs) @@ -901,10 +904,16 @@ impl Snapshot { // And then create a Snapshot. stats .and_then(move |_| { - context - .core - .snapshots - .create(&context.core.vfs, path_stats) + let context = context.clone(); + let pool = context.core.pool.clone(); + pool + .spawn_fn(move || { + fs::Snapshot::from_path_stats( + context.core.store.clone(), + context.clone(), + path_stats, + ) + }) .map_err(move |e| throw(&format!("Snapshot failed: {}", e))) }) .to_boxed() @@ -936,7 +945,7 @@ impl Snapshot { externs::invoke_unsafe( &context.core.types.construct_snapshot, &vec![ - externs::store_bytes(&item.fingerprint.0), + externs::store_bytes(&(item.digest.0).0), externs::store_list(path_stats.iter().collect(), false), ], ) @@ -1009,7 +1018,7 @@ impl Node for Snapshot { .then(move |path_globs_res| match path_globs_res { Ok(path_globs_val) => { match Self::lift_path_globs(&path_globs_val) { - Ok(pgs) => Snapshot::create(context, pgs), + Ok(pgs) => Snapshot::create(Arc::new(context), pgs), Err(e) => err(throw(&format!("Failed to parse PathGlobs: {}", e))), } } diff --git a/src/rust/engine/src/tasks.rs b/src/rust/engine/src/tasks.rs index d6cd9ccbe5d5..07f5b616badd 100644 --- a/src/rust/engine/src/tasks.rs +++ b/src/rust/engine/src/tasks.rs @@ -86,16 +86,6 @@ impl Tasks { ); } - // TODO: Only exists in order to support the `Snapshots` singleton replacement in `context.rs`: - // Fix by porting isolated processes to rust: - // see: https://github.com/pantsbuild/pants/issues/4397 - pub fn singleton_replace(&mut self, value: Value, product: TypeConstraint) { - self.singletons.insert( - product, - (externs::key_for(&value), value), - ); - } - /// /// The following methods define the Task registration lifecycle. ///