From 267a3d830d17090fc0aa4aca2335c14df635ae61 Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Thu, 16 Nov 2017 16:53:32 +0000 Subject: [PATCH] Snapshots are stored in the LMDB store not tar files This has a few advantages: 1. It makes the Bazel remote execution API much easier to implement. 2. We store each unique file by content exactly one time, rather than once per snapshot it belongs to. It also allows us to delete a lot of code which handles awkward specifics of tar files. --- src/rust/engine/Cargo.lock | 44 ----- src/rust/engine/fs/Cargo.toml | 1 - src/rust/engine/fs/fs_util/src/main.rs | 2 +- src/rust/engine/fs/src/hash.rs | 41 ---- src/rust/engine/fs/src/lib.rs | 249 +----------------------- src/rust/engine/fs/src/snapshot.rs | 257 +++++++++++++++---------- src/rust/engine/src/context.rs | 36 ++-- src/rust/engine/src/nodes.rs | 39 ++-- src/rust/engine/src/tasks.rs | 10 - 9 files changed, 199 insertions(+), 480 deletions(-) diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 271f47c6f208..193701b96248 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -66,11 +66,6 @@ name = "cc" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "cfg-if" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "cmake" version = "0.1.26" @@ -102,16 +97,6 @@ name = "fake-simd" version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "filetime" -version = "0.1.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.32 (registry+https://github.com/rust-lang/crates.io-index)", - "redox_syscall 0.1.31 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "fixedbitset" version = "0.1.8" @@ -140,7 +125,6 @@ dependencies = [ "ordermap 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "sha2 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tar 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -387,11 +371,6 @@ dependencies = [ "libc 0.2.32 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "redox_syscall" -version = "0.1.31" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "regex" version = "0.2.2" @@ -430,16 +409,6 @@ dependencies = [ "generic-array 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "tar" -version = "0.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "filetime 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.32 (registry+https://github.com/rust-lang/crates.io-index)", - "xattr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "tempdir" version = "0.3.5" @@ -498,14 +467,6 @@ name = "winapi-build" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "xattr" -version = "0.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "libc 0.2.32 (registry+https://github.com/rust-lang/crates.io-index)", -] - [metadata] "checksum aho-corasick 0.6.3 (registry+https://github.com/rust-lang/crates.io-index)" = "500909c4f87a9e52355b26626d890833e9e1d53ac566db76c36faa984b889699" "checksum bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "aad18937a628ec6abcd26d1489012cc0e18c21798210f491af69ded9b881106d" @@ -513,13 +474,11 @@ dependencies = [ "checksum block-buffer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1339a1042f5d9f295737ad4d9a6ab6bf81c84a933dba110b9200cd6d1448b814" "checksum byte-tools 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "560c32574a12a89ecd91f5e742165893f86e3ab98d21f8ea548658eb9eef5f40" "checksum cc 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "ef4019bdb99c0c1ddd56c12c2f507c174d729c6915eca6bd9d27c42f3d93b0f4" -"checksum cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d4c819a1287eb618df47cc647173c5c4c66ba19d888a6e50d605672aed3140de" "checksum cmake 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)" = "357c07e7a1fc95732793c1edb5901e1a1f305cfcf63a90eb12dbd22bdb6b789d" "checksum crossbeam 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "0c5ea215664ca264da8a9d9c3be80d2eaf30923c259d03e870388eb927508f97" "checksum digest 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e5b29bf156f3f4b3c4f610a25ff69370616ae6e0657d416de22645483e72af0a" "checksum either 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e311a7479512fbdf858fb54d91ec59f3b9f85bc0113659f46bba12b199d273ce" "checksum fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" -"checksum filetime 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "aa75ec8f7927063335a9583e7fa87b0110bb888cf766dc01b54c0ff70d760c8e" "checksum fixedbitset 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "85cb8fec437468d86dc7c83ca7cfc933341d561873275f22dd5eedefa63a6478" "checksum fnv 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "6cc484842f1e2884faf56f529f960cc12ad8c71ce96cc7abba0a067c98fee344" "checksum fuchsia-zircon 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f6c0581a4e363262e52b87f59ee2afe3415361c6ec35e665924eb08afe8ff159" @@ -552,12 +511,10 @@ dependencies = [ "checksum pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "3a8b4c6b8165cd1a1cd4b9b120978131389f64bdaf456435caa41e630edba903" "checksum protobuf 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "99c7a6694a7896f7c039bc20a6947b83781b019d7d40df77ae069cd2a432e4a7" "checksum rand 0.3.17 (registry+https://github.com/rust-lang/crates.io-index)" = "61efcbcd9fa8d8fbb07c84e34a8af18a1ff177b449689ad38a6e9457ecc7b2ae" -"checksum redox_syscall 0.1.31 (registry+https://github.com/rust-lang/crates.io-index)" = "8dde11f18c108289bef24469638a04dce49da56084f2d50618b226e47eb04509" "checksum regex 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1731164734096285ec2a5ec7fea5248ae2f5485b3feeb0115af4fda2183b2d1b" "checksum regex-syntax 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "ad890a5eef7953f55427c50575c680c42841653abd2b028b68cd223d157f62db" "checksum same-file 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "70a18720d745fb9ca6a041b37cb36d0b21066006b6cff8b5b360142d4b81fb60" "checksum sha2 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7d963c78ce367df26d7ea8b8cc655c651b42e8a1e584e869c1e17dae3ccb116a" -"checksum tar 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)" = "281285b717926caa919ad905ef89c63d75805c7d89437fb873100925a53f2b1b" "checksum tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "87974a6f5c1dfb344d733055601650059a3363de2a6104819293baff662132d6" "checksum thread_local 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "1697c4b57aeeb7a536b647165a2825faddffb1d3bad386d507709bd51a90bb14" "checksum typenum 1.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "13a99dc6780ef33c78780b826cf9d2a78840b72cae9474de4bcaf9051e60ebbd" @@ -567,4 +524,3 @@ dependencies = [ "checksum walkdir 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "40b6d201f4f8998a837196b6de9c73e35af14c992cbb92c4ab641d2c2dce52de" "checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" "checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" -"checksum xattr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "5f04de8a1346489a2f9e9bd8526b73d135ec554227b17568456e86aa35b6f3fc" diff --git a/src/rust/engine/fs/Cargo.toml b/src/rust/engine/fs/Cargo.toml index 6f41a4d3d2df..69d83f3a39d7 100644 --- a/src/rust/engine/fs/Cargo.toml +++ b/src/rust/engine/fs/Cargo.toml @@ -18,7 +18,6 @@ lmdb = "0.7.2" ordermap = "0.2.8" protobuf = "1.4.1" sha2 = "0.6.0" -tar = "0.4.13" tempdir = "0.3.5" [dev-dependencies] diff --git a/src/rust/engine/fs/fs_util/src/main.rs b/src/rust/engine/fs/fs_util/src/main.rs index ed8625620c98..feaf173375ac 100644 --- a/src/rust/engine/fs/fs_util/src/main.rs +++ b/src/rust/engine/fs/fs_util/src/main.rs @@ -227,7 +227,7 @@ fn execute(top_match: clap::ArgMatches) -> Result<(), ExitError> { paths, ) }) - .map(|snapshot| snapshot.digest.unwrap()) + .map(|snapshot| snapshot.digest) .wait()?; Ok(println!("{} {}", digest.0, digest.1)) } diff --git a/src/rust/engine/fs/src/hash.rs b/src/rust/engine/fs/src/hash.rs index 176d84fc3cc0..104485a46071 100644 --- a/src/rust/engine/fs/src/hash.rs +++ b/src/rust/engine/fs/src/hash.rs @@ -1,12 +1,8 @@ // Copyright 2017 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). -use digest::{Digest, FixedOutput}; -use sha2::Sha256; - use std::error::Error; use std::fmt; -use std::io::{self, Write}; use hex; @@ -66,43 +62,6 @@ impl AsRef<[u8]> for Fingerprint { } } -/// -/// A Write instance that fingerprints all data that passes through it. -/// -pub struct WriterHasher { - hasher: Sha256, - inner: W, -} - -impl WriterHasher { - pub fn new(inner: W) -> WriterHasher { - WriterHasher { - hasher: Sha256::default(), - inner: inner, - } - } - - /// - /// Returns the result of fingerprinting this stream, and Drops the stream. - /// - pub fn finish(self) -> Fingerprint { - Fingerprint::from_bytes_unsafe(&self.hasher.fixed_result()) - } -} - -impl Write for WriterHasher { - fn write(&mut self, buf: &[u8]) -> io::Result { - let written = self.inner.write(buf)?; - // Hash the bytes that were successfully written. - self.hasher.input(&buf[0..written]); - Ok(written) - } - - fn flush(&mut self) -> io::Result<()> { - self.inner.flush() - } -} - #[cfg(test)] mod fingerprint_tests { use super::Fingerprint; diff --git a/src/rust/engine/fs/src/lib.rs b/src/rust/engine/fs/src/lib.rs index af509d3d14ee..afdbad35b09a 100644 --- a/src/rust/engine/fs/src/lib.rs +++ b/src/rust/engine/fs/src/lib.rs @@ -25,26 +25,22 @@ extern crate lmdb; extern crate ordermap; extern crate protobuf; extern crate sha2; -extern crate tar; extern crate tempdir; use std::collections::HashSet; use std::os::unix::fs::PermissionsExt; use std::path::{Component, Path, PathBuf}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::{fmt, fs}; use std::io::{self, Read}; use std::cmp::min; use futures::future::{self, Future}; -use futures_cpupool::CpuFuture; use glob::Pattern; use ignore::gitignore::{Gitignore, GitignoreBuilder}; use ordermap::OrderMap; -use tempdir::TempDir; use boxfuture::{Boxable, BoxFuture}; -use hash::WriterHasher; #[derive(Clone, Debug, Eq, Hash, PartialEq)] @@ -815,249 +811,6 @@ pub fn safe_create_dir_all_ioerror(path: &Path) -> Result<(), io::Error> { } } -fn safe_create_dir_all(path: &Path) -> Result<(), String> { - safe_create_dir_all_ioerror(path).map_err(|e| { - format!("Failed to create dir {:?} due to {:?}", path, e) - }) -} - -fn safe_create_tmpdir_in(base_dir: &Path, prefix: &str) -> Result { - safe_create_dir_all(&base_dir)?; - Ok(TempDir::new_in(&base_dir, prefix).map_err(|e| { - format!("Failed to create tempdir {:?} due to {:?}", base_dir, e) - })?) -} - -/// -/// A facade for the snapshot directory, which lives under the pants workdir. -/// -pub struct Snapshots { - snapshots_dir: PathBuf, - snapshots_generator: Mutex<(TempDir, usize)>, -} - -impl Snapshots { - pub fn new(snapshots_dir: PathBuf) -> Result { - let snapshots_tmpdir = safe_create_tmpdir_in(&snapshots_dir, ".tmp")?; - - Ok(Snapshots { - snapshots_dir: snapshots_dir, - snapshots_generator: Mutex::new((snapshots_tmpdir, 0)), - }) - } - - pub fn snapshot_path(&self) -> &Path { - self.snapshots_dir.as_path() - } - - fn next_temp_path(&self) -> Result { - let mut gen = self.snapshots_generator.lock().unwrap(); - gen.1 += 1; - - // N.B. Sometimes, in e.g. a `./pants clean-all test ...` the snapshot tempdir created at the - // beginning of a run can be removed out from under us by e.g. the `clean-all` task. Here, we - // we double check existence of the `TempDir`'s path when the path is accessed and replace if - // necessary. - if !gen.0.path().exists() { - gen.0 = safe_create_tmpdir_in(&self.snapshots_dir, ".tmp")?; - } - - Ok(gen.0.path().join(format!("{}.tmp", gen.1))) - } - - /// - /// A non-canonical (does not expand symlinks) in-memory form of normalize. Used to collapse - /// parent and cur components, which are legal in symbolic paths in PathStats, but not in - /// Tar files. - /// - fn normalize(path: &Path) -> Result { - let mut res = PathBuf::new(); - for component in path.components() { - match component { - Component::Prefix(..) | - Component::RootDir => return Err(format!("Absolute paths not supported: {:?}", path)), - Component::CurDir => continue, - Component::ParentDir => { - // Pop the previous component. - if !res.pop() { - return Err(format!( - "Globs may not traverse outside the root: {:?}", - path - )); - } else { - continue; - } - } - Component::Normal(p) => res.push(p), - } - } - Ok(res) - } - - /// - /// Create a tar file on the given Write instance containing the given paths, or - /// return an error string. - /// - fn tar_create( - dest: W, - paths: &Vec, - relative_to: &Dir, - ) -> Result { - let mut tar_builder = tar::Builder::new(dest); - tar_builder.mode(tar::HeaderMode::Deterministic); - for path_stat in paths { - // Append the PathStat using the symbolic name and underlying stat. - let append_res = match path_stat { - &PathStat::File { ref path, ref stat } => { - let normalized = Snapshots::normalize(path)?; - let mut input = fs::File::open(relative_to.0.join(stat.path.as_path())) - .map_err(|e| format!("Failed to open {:?}: {:?}", path_stat, e))?; - tar_builder.append_file(normalized, &mut input) - } - &PathStat::Dir { ref path, ref stat } => { - let normalized = Snapshots::normalize(path)?; - tar_builder.append_dir(normalized, relative_to.0.join(stat.0.as_path())) - } - }; - append_res.map_err(|e| { - format!("Failed to tar {:?}: {:?}", path_stat, e) - })?; - } - - // Finish the tar file, returning ownership of the stream to the caller. - Ok(tar_builder.into_inner().map_err(|e| { - format!("Failed to finalize snapshot tar: {:?}", e) - })?) - } - - /// - /// Create a tar file at the given dest Path containing the given paths, while - /// fingerprinting the written stream. - /// - fn tar_create_fingerprinted( - dest: &Path, - paths: &Vec, - relative_to: &Dir, - ) -> Result { - // Wrap buffering around a fingerprinted stream above a File. - let stream = io::BufWriter::new(WriterHasher::new(fs::File::create(dest).map_err(|e| { - format!("Failed to create destination file: {:?}", e) - })?)); - - // Then append the tar to the stream, and retrieve the Fingerprint to flush all writers. - Ok( - Snapshots::tar_create(stream, paths, relative_to)? - .into_inner() - .map_err(|e| { - format!("Failed to flush to {:?}: {:?}", dest, e.error()) - })? - .finish(), - ) - } - - /// - /// Attempts to rename src to dst, and _succeeds_ if dst already exists. This is safe in - /// the case of Snapshots because the destination path is unique to its content. - /// - fn finalize(temp_path: &Path, dest_path: &Path) -> Result<(), String> { - if dest_path.is_file() { - // The Snapshot has already been created. - fs::remove_file(temp_path).unwrap_or(()); - Ok(()) - } else { - let dest_dir = dest_path.parent().expect( - "All snapshot paths must have parent directories.", - ); - safe_create_dir_all(dest_dir)?; - match fs::rename(temp_path, dest_path) { - Ok(_) => Ok(()), - Err(_) if dest_path.is_file() => Ok(()), - Err(e) => Err(format!( - "Failed to finalize snapshot at {:?}: {:?}", - dest_path, - e - )), - } - } - } - - fn path_for(&self, fingerprint: &Fingerprint) -> PathBuf { - Snapshots::path_under_for(self.snapshot_path(), fingerprint) - } - - fn path_under_for(path: &Path, fingerprint: &Fingerprint) -> PathBuf { - let hex = fingerprint.to_hex(); - path.join(&hex[0..2]).join(&hex[2..4]).join( - format!("{}.tar", hex), - ) - } - - /// - /// Creates a Snapshot for the given paths under the given VFS. - /// - pub fn create(&self, fs: &PosixFS, paths: Vec) -> CpuFuture { - let dest_dir = self.snapshot_path().to_owned(); - let root = fs.root.clone(); - let temp_path = self.next_temp_path().expect( - "Couldn't get the next temp path.", - ); - - fs.pool.spawn_fn(move || { - // Write the tar deterministically to a temporary file while fingerprinting. - let fingerprint = Snapshots::tar_create_fingerprinted(temp_path.as_path(), &paths, &root)?; - - // Rename to the final path if it does not already exist. - Snapshots::finalize( - temp_path.as_path(), - Snapshots::path_under_for(&dest_dir, &fingerprint).as_path(), - )?; - - Ok(Snapshot { - fingerprint: fingerprint, - digest: None, - path_stats: paths, - }) - }) - } - - fn contents_for_sync(snapshot: Snapshot, path: PathBuf) -> Result, io::Error> { - let mut archive = fs::File::open(path).map(|f| tar::Archive::new(f))?; - - // Zip the in-memory Snapshot to the on disk representation, validating as we go. - let mut files_content = Vec::new(); - for (entry_res, path_stat) in archive.entries()?.zip(snapshot.path_stats.into_iter()) { - let mut entry = entry_res?; - if entry.header().entry_type() == tar::EntryType::file() { - let path = match path_stat { - PathStat::File { path, .. } => path, - PathStat::Dir { .. } => panic!("Snapshot contents changed after storage."), - }; - let mut content = Vec::new(); - io::Read::read_to_end(&mut entry, &mut content)?; - files_content.push(FileContent { - path: path, - content: content, - }); - } - } - Ok(files_content) - } - - pub fn contents_for( - &self, - fs: &PosixFS, - snapshot: Snapshot, - ) -> CpuFuture, String> { - let archive_path = self.path_for(&snapshot.fingerprint); - fs.pool.spawn_fn(move || { - let snapshot_str = format!("{:?}", snapshot); - Snapshots::contents_for_sync(snapshot, archive_path).map_err(|e| { - format!("Failed to open Snapshot {}: {:?}", snapshot_str, e) - }) - }) - } -} - #[cfg(test)] mod posixfs_test { extern crate tempdir; diff --git a/src/rust/engine/fs/src/snapshot.rs b/src/rust/engine/fs/src/snapshot.rs index e6a59f090ec5..a6811f8e040f 100644 --- a/src/rust/engine/fs/src/snapshot.rs +++ b/src/rust/engine/fs/src/snapshot.rs @@ -3,23 +3,21 @@ use bazel_protos; use boxfuture::{Boxable, BoxFuture}; +use futures; use futures::Future; use futures::future::join_all; use itertools::Itertools; -use {Digest, File, PathStat, Store}; +use {Digest, File, FileContent, PathStat, Store}; use hash::Fingerprint; use protobuf; use std::ffi::OsString; use std::fmt; +use std::path::PathBuf; use std::sync::Arc; #[derive(Clone, PartialEq)] pub struct Snapshot { - // TODO: In a follow-up commit, fingerprint will be removed, and digest will be made non-optional. - // They both exist right now as a compatibility shim so that the tar-based code and - // Directory-based code can peacefully co-exist. - pub fingerprint: Fingerprint, - pub digest: Option, + pub digest: Digest, pub path_stats: Vec, } @@ -41,83 +39,127 @@ impl Snapshot { path_stats.sort_by(|a, b| a.path().cmp(b.path())); for (first_component, group) in - &path_stats.iter().cloned().group_by(|s| { - s.path().components().next().unwrap().as_os_str().to_owned() - }) - { - let mut path_group: Vec = group.collect(); - if path_group.len() == 1 && path_group.get(0).unwrap().path().components().count() == 1 { - // Exactly one entry with exactly one component indicates either a file in this directory, - // or an empty directory. - // If the child is a non-empty directory, or a file therein, there must be multiple - // PathStats with that prefix component, and we will handle that in the recursive - // save_directory call. - - match path_group.pop().unwrap() { - PathStat::File { ref stat, .. } => { - let is_executable = stat.is_executable; - file_futures.push( - file_digester - .clone() - .digest(&stat) - .map_err(|e| format!("{:?}", e)) - .and_then(move |digest| { - let mut file_node = bazel_protos::remote_execution::FileNode::new(); - file_node.set_name(osstring_as_utf8(first_component)?); - file_node.set_digest(digest.into()); - file_node.set_is_executable(is_executable); - Ok(file_node) - }) - .to_boxed(), - ); - } - PathStat::Dir { .. } => { - // Because there are no children of this Dir, it must be empty. + &path_stats.iter().cloned().group_by(|s| { + s.path().components().next().unwrap().as_os_str().to_owned() + }) + { + let mut path_group: Vec = group.collect(); + if path_group.len() == 1 && path_group.get(0).unwrap().path().components().count() == 1 { + // Exactly one entry with exactly one component indicates either a file in this directory, + // or an empty directory. + // If the child is a non-empty directory, or a file therein, there must be multiple + // PathStats with that prefix component, and we will handle that in the recursive + // save_directory call. + + match path_group.pop().unwrap() { + PathStat::File { ref stat, .. } => { + let is_executable = stat.is_executable; + file_futures.push( + file_digester + .clone() + .digest(&stat) + .map_err(|e| format!("{:?}", e)) + .and_then(move |digest| { + let mut file_node = bazel_protos::remote_execution::FileNode::new(); + file_node.set_name(osstring_as_utf8(first_component)?); + file_node.set_digest(digest.into()); + file_node.set_is_executable(is_executable); + Ok(file_node) + }) + .to_boxed(), + ); + } + PathStat::Dir { .. } => { + // Because there are no children of this Dir, it must be empty. + dir_futures.push( + store + .record_directory(&bazel_protos::remote_execution::Directory::new()) + .map(move |digest| { + let mut directory_node = bazel_protos::remote_execution::DirectoryNode::new(); + directory_node.set_name(osstring_as_utf8(first_component).unwrap()); + directory_node.set_digest(digest.into()); + directory_node + }) + .to_boxed(), + ); + } + } + } else { dir_futures.push( - store - .record_directory(&bazel_protos::remote_execution::Directory::new()) - .map(move |digest| { - let mut directory_node = bazel_protos::remote_execution::DirectoryNode::new(); - directory_node.set_name(osstring_as_utf8(first_component).unwrap()); - directory_node.set_digest(digest.into()); - directory_node - }) - .to_boxed(), + // TODO: Memoize this in the graph + Snapshot::from_path_stats( + store.clone(), + file_digester.clone(), + paths_of_child_dir(path_group), + ).and_then(move |snapshot| { + let mut dir_node = bazel_protos::remote_execution::DirectoryNode::new(); + dir_node.set_name(osstring_as_utf8(first_component)?); + dir_node.set_digest(snapshot.digest.into()); + Ok(dir_node) + }) + .to_boxed(), ); } } - } else { - dir_futures.push( - // TODO: Memoize this in the graph - Snapshot::from_path_stats( - store.clone(), - file_digester.clone(), - paths_of_child_dir(path_group), - ).and_then(move |snapshot| { - let mut dir_node = bazel_protos::remote_execution::DirectoryNode::new(); - dir_node.set_name(osstring_as_utf8(first_component)?); - dir_node.set_digest(snapshot.digest.unwrap().into()); - Ok(dir_node) - }) - .to_boxed(), - ); - } - } join_all(dir_futures) - .join(join_all(file_futures)) - .and_then(move |(dirs, files)| { - let mut directory = bazel_protos::remote_execution::Directory::new(); - directory.set_directories(protobuf::RepeatedField::from_vec(dirs)); - directory.set_files(protobuf::RepeatedField::from_vec(files)); - store.record_directory(&directory).map(move |digest| { - Snapshot { - fingerprint: digest.0, - digest: Some(digest), - path_stats: path_stats, - } + .join(join_all(file_futures)) + .and_then(move |(dirs, files)| { + let mut directory = bazel_protos::remote_execution::Directory::new(); + directory.set_directories(protobuf::RepeatedField::from_vec(dirs)); + directory.set_files(protobuf::RepeatedField::from_vec(files)); + store.record_directory(&directory).map(move |digest| { + Snapshot { + digest: digest, + path_stats: path_stats, + } + }) }) - }) - .to_boxed() + .to_boxed() + } + + // TODO: Rewrite this to execute in parallel + pub fn contents(self, store: Arc) -> BoxFuture, String> { + futures::future::done(Snapshot::contents_for_directory_sync( + self.digest.0, + store, + PathBuf::from(""), + )).to_boxed() + } + + fn contents_for_directory_sync( + fingerprint: Fingerprint, + store: Arc, + path_so_far: PathBuf, + ) -> Result, String> { + let directory = store.load_directory_proto(fingerprint).wait().unwrap().ok_or_else(|| { + format!("Could not find snapshot {}", fingerprint) + })?; + + let mut contents = Vec::new(); + + for file_node in directory.get_files() { + contents.push(FileContent { + path: path_so_far.join(file_node.get_name()), + content: store + .load_file_bytes(Fingerprint::from_hex_string( + file_node.get_digest().get_hash(), + ).unwrap()).wait().unwrap() + .unwrap(), + }); + } + + for dir_node in directory.get_directories() { + contents.extend( + Snapshot::contents_for_directory_sync( + Fingerprint::from_hex_string(dir_node.get_digest().get_hash()).unwrap(), + store.clone(), + path_so_far.join(dir_node.get_name()), + ).unwrap(), + ); + } + + contents.sort_by(|a, b| a.path.cmp(&b.path)); + Ok(contents) } } @@ -125,8 +167,7 @@ impl fmt::Debug for Snapshot { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, - "Snapshot({}, digest={:?}, entries={})", - self.fingerprint.to_hex(), + "Snapshot(digest={:?}, entries={})", self.digest, self.path_stats.len() ) @@ -203,17 +244,17 @@ mod tests { make_file(&dir.path().join(&file_name), STR.as_bytes(), 0o600); let path_stats = expand_all_sorted(posix_fs); - // TODO: Inline when only used once - let fingerprint = Fingerprint::from_hex_string( - "63949aa823baf765eff07b946050d76ec0033144c785a94d3ebd82baa931cd16", - ).unwrap(); assert_eq!( Snapshot::from_path_stats(store, digester, path_stats.clone()) .wait() .unwrap(), Snapshot { - fingerprint: fingerprint, - digest: Some(Digest(fingerprint, 80)), + digest: Digest( + Fingerprint::from_hex_string( + "63949aa823baf765eff07b946050d76ec0033144c785a94d3ebd82baa931cd16", + ).unwrap(), + 80, + ), path_stats: path_stats, } ); @@ -229,17 +270,17 @@ mod tests { make_file(&dir.path().join(&roland), STR.as_bytes(), 0o600); let path_stats = expand_all_sorted(posix_fs); - // TODO: Inline when only used once - let fingerprint = Fingerprint::from_hex_string( - "8b1a7ea04eaa2527b35683edac088bc826117b53b7ec6601740b55e20bce3deb", - ).unwrap(); assert_eq!( Snapshot::from_path_stats(store, digester, path_stats.clone()) .wait() .unwrap(), Snapshot { - fingerprint: fingerprint, - digest: Some(Digest(fingerprint, 78)), + digest: Digest( + Fingerprint::from_hex_string( + "8b1a7ea04eaa2527b35683edac088bc826117b53b7ec6601740b55e20bce3deb", + ).unwrap(), + 78, + ), path_stats: path_stats, } ); @@ -261,22 +302,44 @@ mod tests { let sorted_path_stats = expand_all_sorted(posix_fs); let mut unsorted_path_stats = sorted_path_stats.clone(); unsorted_path_stats.reverse(); - // TODO: Inline when only used once - let fingerprint = Fingerprint::from_hex_string( - "fbff703bdaac62accf2ea5083bcfed89292073bf710ef9ad14d9298c637e777b", - ).unwrap(); assert_eq!( Snapshot::from_path_stats(store, digester, unsorted_path_stats) .wait() .unwrap(), Snapshot { - fingerprint: fingerprint, - digest: Some(Digest(fingerprint, 232)), + digest: Digest( + Fingerprint::from_hex_string( + "fbff703bdaac62accf2ea5083bcfed89292073bf710ef9ad14d9298c637e777b", + ).unwrap(), + 232, + ), path_stats: sorted_path_stats, } ); } + // TODO: Write more tests which involve multiple files, and directories, and stuff. + // I will do this before merging this PR, but want to check the approach before doing so. + #[test] + fn contents_for_one_file() { + let (store, dir, posix_fs, digester) = setup(); + + let file_name = PathBuf::from("roland"); + make_file(&dir.path().join(&file_name), STR.as_bytes(), 0o600); + + let path_stats = expand_all_sorted(posix_fs); + let contents = Snapshot::from_path_stats(store.clone(), digester, path_stats.clone()) + .wait() + .unwrap() + .contents(store) + .wait() + .unwrap(); + // TODO: Write helper for asserting equality of FileContents (and Vecs thereof). + assert_eq!(contents.len(), 1); + assert_eq!(contents.get(0).unwrap().path, file_name); + assert_eq!(contents.get(0).unwrap().content, STR.as_bytes().to_vec()); + } + struct FileSaver(Arc, Arc); impl GetFileDigest for FileSaver { diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 4dfde11de22a..7086fe4d98aa 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -2,14 +2,14 @@ // Licensed under the Apache License, Version 2.0 (see LICENSE). use std; -use std::os::unix::ffi::OsStrExt; use std::path::{Path, PathBuf}; use std::sync::Arc; -use core::TypeId; -use externs; -use fs::{PosixFS, Snapshots, Store, safe_create_dir_all_ioerror, ResettablePool}; +use boxfuture::BoxFuture; +use core::{Failure, TypeId}; +use fs::{Digest, File, GetFileDigest, PosixFS, Store, safe_create_dir_all_ioerror, ResettablePool}; use graph::{EntryId, Graph}; +use nodes::{DigestFile, GetNode}; use rule_graph::RuleGraph; use tasks::Tasks; use types::Types; @@ -24,15 +24,14 @@ pub struct Core { pub rule_graph: RuleGraph, pub types: Types, pub pool: Arc, - pub snapshots: Snapshots, - pub store: Store, + pub store: Arc, pub vfs: PosixFS, } impl Core { pub fn new( root_subject_types: Vec, - mut tasks: Tasks, + tasks: Tasks, types: Types, build_root: &Path, ignore_patterns: Vec, @@ -57,20 +56,6 @@ impl Core { }, ); - // TODO: Create the Snapshots directory, and then expose it as a singleton to python. - // see: https://github.com/pantsbuild/pants/issues/4397 - let snapshots = Snapshots::new(snapshots_dir).unwrap_or_else(|e| { - panic!("Could not initialize Snapshot directory: {:?}", e); - }); - tasks.singleton_replace( - externs::invoke_unsafe( - &types.construct_snapshots, - &vec![ - externs::store_bytes(snapshots.snapshot_path().as_os_str().as_bytes()), - ], - ), - types.snapshots.clone(), - ); let rule_graph = RuleGraph::new(&tasks, root_subject_types); Core { @@ -79,8 +64,7 @@ impl Core { rule_graph: rule_graph, types: types, pool: pool.clone(), - snapshots: snapshots, - store: store, + store: Arc::new(store), // FIXME: Errors in initialization should definitely be exposed as python // exceptions, rather than as panics. vfs: PosixFS::new(build_root, pool, ignore_patterns).unwrap_or_else(|e| { @@ -109,6 +93,12 @@ impl Context { } } +impl GetFileDigest for Context { + fn digest(&self, file: &File) -> BoxFuture { + self.get(DigestFile(file.clone())) + } +} + pub trait ContextFactory { fn create(&self, entry_id: EntryId) -> Context; } diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index 1c2170925eff..9fcb61f4695d 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -6,6 +6,7 @@ use std::error::Error; use std::fmt; use std::os::unix::ffi::OsStrExt; use std::path::{Path, PathBuf}; +use std::sync::Arc; use futures::future::{self, Future}; use ordermap::OrderMap; @@ -52,7 +53,10 @@ fn was_required(failure: Failure) -> Failure { } } -trait GetNode { +// TODO: This shouldn't be public (is for context to implement GetFileDigest). +// How should this *actually* be consumed, in such a way that it can be delegated to by traits which +// have multiple implementations (either for tests, or for weird tools like fs_util). +pub trait GetNode { fn get(&self, node: N) -> NodeFuture; } @@ -301,13 +305,12 @@ impl Select { vec![ self .get_snapshot(&context) - .and_then(move |snapshot| + .and_then( + move |snapshot| // Request the file contents of the Snapshot, and then store them. - context.core.snapshots.contents_for(&context.core.vfs, snapshot) - .then(move |files_content_res| match files_content_res { - Ok(files_content) => Ok(Snapshot::store_files_content(&context, &files_content)), - Err(e) => Err(throw(&e)), - })) + snapshot.contents(context.core.store.clone()).map_err(|e| throw(&e)) + .map(move |files_content| Snapshot::store_files_content(&context, &files_content)) + ) .to_boxed(), ] } else if let Some(&(_, ref value)) = context.core.tasks.gen_singleton(self.product()) { @@ -777,7 +780,7 @@ impl From for NodeKey { /// A Node that represents reading a file and fingerprinting its contents. /// #[derive(Clone, Debug, Eq, Hash, PartialEq)] -pub struct DigestFile(File); +pub struct DigestFile(pub File); impl Node for DigestFile { type Output = fs::Digest; @@ -883,7 +886,7 @@ pub struct Snapshot { } impl Snapshot { - fn create(context: Context, path_globs: PathGlobs) -> NodeFuture { + fn create(context: Arc, path_globs: PathGlobs) -> NodeFuture { // Recursively expand PathGlobs into PathStats while tracking their dependencies. context .expand(path_globs) @@ -901,10 +904,16 @@ impl Snapshot { // And then create a Snapshot. stats .and_then(move |_| { - context - .core - .snapshots - .create(&context.core.vfs, path_stats) + let context = context.clone(); + let pool = context.core.pool.clone(); + pool + .spawn_fn(move || { + fs::Snapshot::from_path_stats( + context.core.store.clone(), + context.clone(), + path_stats, + ) + }) .map_err(move |e| throw(&format!("Snapshot failed: {}", e))) }) .to_boxed() @@ -936,7 +945,7 @@ impl Snapshot { externs::invoke_unsafe( &context.core.types.construct_snapshot, &vec![ - externs::store_bytes(&item.fingerprint.0), + externs::store_bytes(&(item.digest.0).0), externs::store_list(path_stats.iter().collect(), false), ], ) @@ -1009,7 +1018,7 @@ impl Node for Snapshot { .then(move |path_globs_res| match path_globs_res { Ok(path_globs_val) => { match Self::lift_path_globs(&path_globs_val) { - Ok(pgs) => Snapshot::create(context, pgs), + Ok(pgs) => Snapshot::create(Arc::new(context), pgs), Err(e) => err(throw(&format!("Failed to parse PathGlobs: {}", e))), } } diff --git a/src/rust/engine/src/tasks.rs b/src/rust/engine/src/tasks.rs index d6cd9ccbe5d5..07f5b616badd 100644 --- a/src/rust/engine/src/tasks.rs +++ b/src/rust/engine/src/tasks.rs @@ -86,16 +86,6 @@ impl Tasks { ); } - // TODO: Only exists in order to support the `Snapshots` singleton replacement in `context.rs`: - // Fix by porting isolated processes to rust: - // see: https://github.com/pantsbuild/pants/issues/4397 - pub fn singleton_replace(&mut self, value: Value, product: TypeConstraint) { - self.singletons.insert( - product, - (externs::key_for(&value), value), - ); - } - /// /// The following methods define the Task registration lifecycle. ///