From c3de15b4e327b06e5dece1de1aace66cba0e2d7f Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Thu, 16 Nov 2017 16:53:32 +0000 Subject: [PATCH] Snapshots are stored in the LMDB store not tar files This has a few advantages: 1. It makes the Bazel remote execution API much easier to implement. 2. We store each unique file by content exactly one time, rather than once per snapshot it belongs to. It also allows us to delete a lot of code which handles awkward specifics of tar files. --- src/rust/engine/Cargo.lock | 39 ---- src/rust/engine/fs/Cargo.toml | 1 - src/rust/engine/fs/fs_util/Cargo.lock | 33 --- src/rust/engine/fs/fs_util/src/main.rs | 15 +- src/rust/engine/fs/src/lib.rs | 251 +-------------------- src/rust/engine/fs/src/snapshot.rs | 292 ++++++++++++++++++++----- src/rust/engine/fs/src/store.rs | 18 +- src/rust/engine/src/context.rs | 37 ++-- src/rust/engine/src/nodes.rs | 90 +++----- src/rust/engine/src/tasks.rs | 10 - 10 files changed, 305 insertions(+), 481 deletions(-) diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 8764f38b3bf7..1c3ca37f2784 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -117,16 +117,6 @@ name = "fake-simd" version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "filetime" -version = "0.1.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.35 (registry+https://github.com/rust-lang/crates.io-index)", - "redox_syscall 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "fixedbitset" version = "0.1.8" @@ -159,7 +149,6 @@ dependencies = [ "ordermap 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "sha2 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tar 0.4.14 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -414,11 +403,6 @@ dependencies = [ "libc 0.2.35 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "redox_syscall" -version = "0.1.37" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "regex" version = "0.2.5" @@ -456,17 +440,6 @@ dependencies = [ "generic-array 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "tar" -version = "0.4.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "filetime 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.35 (registry+https://github.com/rust-lang/crates.io-index)", - "redox_syscall 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)", - "xattr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "tempdir" version = "0.3.5" @@ -539,14 +512,6 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "xattr" -version = "0.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "libc 0.2.35 (registry+https://github.com/rust-lang/crates.io-index)", -] - [metadata] "checksum aho-corasick 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d6531d44de723825aa81398a6415283229725a00fa30713812ab9323faa82fc4" "checksum bitflags 
0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4efd02e230a02e18f92fc2735f44597385ed02ad8f831e7c1c1156ee5e1ab3a5" @@ -562,7 +527,6 @@ dependencies = [ "checksum digest 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e5b29bf156f3f4b3c4f610a25ff69370616ae6e0657d416de22645483e72af0a" "checksum either 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "740178ddf48b1a9e878e6d6509a1442a2d42fd2928aae8e7a6f8a36fb01981b3" "checksum fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" -"checksum filetime 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "aa75ec8f7927063335a9583e7fa87b0110bb888cf766dc01b54c0ff70d760c8e" "checksum fixedbitset 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "85cb8fec437468d86dc7c83ca7cfc933341d561873275f22dd5eedefa63a6478" "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" "checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" @@ -595,12 +559,10 @@ dependencies = [ "checksum pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "3a8b4c6b8165cd1a1cd4b9b120978131389f64bdaf456435caa41e630edba903" "checksum protobuf 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "bec26e67194b7d991908145fdf21b7cae8b08423d96dcb9e860cd31f854b9506" "checksum rand 0.3.20 (registry+https://github.com/rust-lang/crates.io-index)" = "512870020642bb8c221bf68baa1b2573da814f6ccfe5c9699b1c303047abe9b1" -"checksum redox_syscall 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)" = "0d92eecebad22b767915e4d529f89f28ee96dbbf5a4810d2b844373f136417fd" "checksum regex 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "744554e01ccbd98fff8c457c3b092cd67af62a555a43bfe97ae8a0451f7799fa" "checksum regex-syntax 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "8e931c58b93d86f080c734bfd2bce7dd0079ae2331235818133c8be7f422e20e" "checksum same-file 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "cfb6eded0b06a0b512c8ddbcf04089138c9b4362c2f696f3c3d76039d68f3637" "checksum sha2 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7d963c78ce367df26d7ea8b8cc655c651b42e8a1e584e869c1e17dae3ccb116a" -"checksum tar 0.4.14 (registry+https://github.com/rust-lang/crates.io-index)" = "1605d3388ceb50252952ffebab4b5dc43017ead7e4481b175961c283bb951195" "checksum tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "87974a6f5c1dfb344d733055601650059a3363de2a6104819293baff662132d6" "checksum thread_local 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "279ef31c19ededf577bfd12dfae728040a21f635b06a24cd670ff510edd38963" "checksum typenum 1.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "13a99dc6780ef33c78780b826cf9d2a78840b72cae9474de4bcaf9051e60ebbd" @@ -612,4 +574,3 @@ dependencies = [ "checksum winapi 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "b09fb3b6f248ea4cd42c9a65113a847d612e17505d6ebd1f7357ad68a8bf8693" "checksum winapi-i686-pc-windows-gnu 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "ec6667f60c23eca65c561e63a13d81b44234c2e38a6b6c959025ee907ec614cc" "checksum winapi-x86_64-pc-windows-gnu 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = 
"98f12c52b2630cd05d2c3ffd8e008f7f48252c042b4871c72aed9dc733b96668" -"checksum xattr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "5f04de8a1346489a2f9e9bd8526b73d135ec554227b17568456e86aa35b6f3fc" diff --git a/src/rust/engine/fs/Cargo.toml b/src/rust/engine/fs/Cargo.toml index bb398e15cd4a..2bbcc8f3a643 100644 --- a/src/rust/engine/fs/Cargo.toml +++ b/src/rust/engine/fs/Cargo.toml @@ -22,7 +22,6 @@ lmdb = "0.7.2" ordermap = "0.2.8" protobuf = "1.4.1" sha2 = "0.6.0" -tar = "0.4.13" tempdir = "0.3.5" [dev-dependencies] diff --git a/src/rust/engine/fs/fs_util/Cargo.lock b/src/rust/engine/fs/fs_util/Cargo.lock index 2498dad4ac90..903b2f7e5691 100644 --- a/src/rust/engine/fs/fs_util/Cargo.lock +++ b/src/rust/engine/fs/fs_util/Cargo.lock @@ -130,16 +130,6 @@ name = "fake-simd" version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "filetime" -version = "0.1.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.35 (registry+https://github.com/rust-lang/crates.io-index)", - "redox_syscall 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "fnv" version = "1.0.6" @@ -167,7 +157,6 @@ dependencies = [ "ordermap 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "sha2 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tar 0.4.14 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -465,17 +454,6 @@ name = "strsim" version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "tar" -version = "0.4.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "filetime 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.35 (registry+https://github.com/rust-lang/crates.io-index)", - "redox_syscall 0.1.37 (registry+https://github.com/rust-lang/crates.io-index)", - "xattr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "tempdir" version = "0.3.5" @@ -576,14 +554,6 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "xattr" -version = "0.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "libc 0.2.35 (registry+https://github.com/rust-lang/crates.io-index)", -] - [metadata] "checksum aho-corasick 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d6531d44de723825aa81398a6415283229725a00fa30713812ab9323faa82fc4" "checksum ansi_term 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6b3568b48b7cefa6b8ce125f9bb4989e52fbcc29ebea88df04cc7c5f12f70455" @@ -602,7 +572,6 @@ dependencies = [ "checksum digest 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e5b29bf156f3f4b3c4f610a25ff69370616ae6e0657d416de22645483e72af0a" "checksum either 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "740178ddf48b1a9e878e6d6509a1442a2d42fd2928aae8e7a6f8a36fb01981b3" "checksum fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" -"checksum filetime 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = 
"aa75ec8f7927063335a9583e7fa87b0110bb888cf766dc01b54c0ff70d760c8e" "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" "checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" "checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" @@ -639,7 +608,6 @@ dependencies = [ "checksum same-file 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f3257af0472da4b8b8902102a57bafffd9991f0f43772a8af6153d597e6e4ae2" "checksum sha2 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7d963c78ce367df26d7ea8b8cc655c651b42e8a1e584e869c1e17dae3ccb116a" "checksum strsim 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b4d15c810519a91cf877e7e36e63fe068815c678181439f2f29e2562147c3694" -"checksum tar 0.4.14 (registry+https://github.com/rust-lang/crates.io-index)" = "1605d3388ceb50252952ffebab4b5dc43017ead7e4481b175961c283bb951195" "checksum tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "87974a6f5c1dfb344d733055601650059a3363de2a6104819293baff662132d6" "checksum termion 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "689a3bdfaab439fd92bc87df5c4c78417d3cbe537487274e9b0b2dce76e92096" "checksum textwrap 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c0b59b6b4b44d867f1370ef1bd91bfb262bf07bf0ae65c202ea2fbc16153b693" @@ -655,4 +623,3 @@ dependencies = [ "checksum winapi 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "b09fb3b6f248ea4cd42c9a65113a847d612e17505d6ebd1f7357ad68a8bf8693" "checksum winapi-i686-pc-windows-gnu 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "ec6667f60c23eca65c561e63a13d81b44234c2e38a6b6c959025ee907ec614cc" "checksum winapi-x86_64-pc-windows-gnu 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "98f12c52b2630cd05d2c3ffd8e008f7f48252c042b4871c72aed9dc733b96668" -"checksum xattr 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)" = "5f04de8a1346489a2f9e9bd8526b73d135ec554227b17568456e86aa35b6f3fc" diff --git a/src/rust/engine/fs/fs_util/src/main.rs b/src/rust/engine/fs/fs_util/src/main.rs index db2eb29f858d..4ea7ef36f09d 100644 --- a/src/rust/engine/fs/fs_util/src/main.rs +++ b/src/rust/engine/fs/fs_util/src/main.rs @@ -8,7 +8,7 @@ extern crate protobuf; use boxfuture::{Boxable, BoxFuture}; use clap::{App, Arg, SubCommand}; -use fs::{GetFileDigest, ResettablePool, Snapshot, Store, VFS}; +use fs::{ResettablePool, Snapshot, Store, StoreFileByDigest, VFS}; use futures::future::{self, Future, join_all}; use hashing::{Digest, Fingerprint}; use protobuf::Message; @@ -206,7 +206,7 @@ fn execute(top_match: clap::ArgMatches) -> Result<(), ExitError> { let digest = FileSaver { store: store, posix_fs: Arc::new(posix_fs), - }.digest(&f) + }.store_by_digest(&f) .wait() .unwrap(); Ok(println!("{} {}", digest.0, digest.1)) @@ -246,14 +246,14 @@ fn execute(top_match: clap::ArgMatches) -> Result<(), ExitError> { .and_then(move |paths| { Snapshot::from_path_stats( store.clone(), - Arc::new(FileSaver { + FileSaver { store: store.clone(), posix_fs: posix_fs, - }), + }, paths, ) }) - .map(|snapshot| snapshot.digest.unwrap()) + .map(|snapshot| snapshot.digest) .wait()?; Ok(println!("{} {}", digest.0, digest.1)) } @@ -331,13 +331,14 @@ fn make_posix_fs>(root: 
P, pool: Arc) -> fs::Posi fs::PosixFS::new(&root, pool, vec![]).unwrap() } +#[derive(Clone)] struct FileSaver { store: Arc, posix_fs: Arc, } -impl GetFileDigest for FileSaver { - fn digest(&self, file: &fs::File) -> BoxFuture { +impl StoreFileByDigest for FileSaver { + fn store_by_digest(&self, file: &fs::File) -> BoxFuture { let file_copy = file.clone(); let store = self.store.clone(); self diff --git a/src/rust/engine/fs/src/lib.rs b/src/rust/engine/fs/src/lib.rs index 5f2e360005a7..5828157d0fa3 100644 --- a/src/rust/engine/fs/src/lib.rs +++ b/src/rust/engine/fs/src/lib.rs @@ -2,7 +2,7 @@ // Licensed under the Apache License, Version 2.0 (see LICENSE). mod snapshot; -pub use snapshot::{GetFileDigest, Snapshot}; +pub use snapshot::{Snapshot, StoreFileByDigest}; mod store; pub use store::Store; mod pool; @@ -29,24 +29,20 @@ extern crate mock; extern crate ordermap; extern crate protobuf; extern crate sha2; -extern crate tar; extern crate tempdir; use std::collections::HashSet; use std::os::unix::fs::PermissionsExt; use std::path::{Component, Path, PathBuf}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::{fmt, fs}; use std::io::{self, Read}; use std::cmp::min; use futures::future::{self, Future}; -use futures_cpupool::CpuFuture; use glob::Pattern; -use hashing::{Fingerprint, WriterHasher}; use ignore::gitignore::{Gitignore, GitignoreBuilder}; use ordermap::OrderMap; -use tempdir::TempDir; use boxfuture::{Boxable, BoxFuture}; @@ -819,249 +815,6 @@ pub fn safe_create_dir_all_ioerror(path: &Path) -> Result<(), io::Error> { } } -fn safe_create_dir_all(path: &Path) -> Result<(), String> { - safe_create_dir_all_ioerror(path).map_err(|e| { - format!("Failed to create dir {:?} due to {:?}", path, e) - }) -} - -fn safe_create_tmpdir_in(base_dir: &Path, prefix: &str) -> Result { - safe_create_dir_all(&base_dir)?; - Ok(TempDir::new_in(&base_dir, prefix).map_err(|e| { - format!("Failed to create tempdir {:?} due to {:?}", base_dir, e) - })?) -} - -/// -/// A facade for the snapshot directory, which lives under the pants workdir. -/// -pub struct Snapshots { - snapshots_dir: PathBuf, - snapshots_generator: Mutex<(TempDir, usize)>, -} - -impl Snapshots { - pub fn new(snapshots_dir: PathBuf) -> Result { - let snapshots_tmpdir = safe_create_tmpdir_in(&snapshots_dir, ".tmp")?; - - Ok(Snapshots { - snapshots_dir: snapshots_dir, - snapshots_generator: Mutex::new((snapshots_tmpdir, 0)), - }) - } - - pub fn snapshot_path(&self) -> &Path { - self.snapshots_dir.as_path() - } - - fn next_temp_path(&self) -> Result { - let mut gen = self.snapshots_generator.lock().unwrap(); - gen.1 += 1; - - // N.B. Sometimes, in e.g. a `./pants clean-all test ...` the snapshot tempdir created at the - // beginning of a run can be removed out from under us by e.g. the `clean-all` task. Here, we - // we double check existence of the `TempDir`'s path when the path is accessed and replace if - // necessary. - if !gen.0.path().exists() { - gen.0 = safe_create_tmpdir_in(&self.snapshots_dir, ".tmp")?; - } - - Ok(gen.0.path().join(format!("{}.tmp", gen.1))) - } - - /// - /// A non-canonical (does not expand symlinks) in-memory form of normalize. Used to collapse - /// parent and cur components, which are legal in symbolic paths in PathStats, but not in - /// Tar files. - /// - fn normalize(path: &Path) -> Result { - let mut res = PathBuf::new(); - for component in path.components() { - match component { - Component::Prefix(..) 
| - Component::RootDir => return Err(format!("Absolute paths not supported: {:?}", path)), - Component::CurDir => continue, - Component::ParentDir => { - // Pop the previous component. - if !res.pop() { - return Err(format!( - "Globs may not traverse outside the root: {:?}", - path - )); - } else { - continue; - } - } - Component::Normal(p) => res.push(p), - } - } - Ok(res) - } - - /// - /// Create a tar file on the given Write instance containing the given paths, or - /// return an error string. - /// - fn tar_create( - dest: W, - paths: &Vec, - relative_to: &Dir, - ) -> Result { - let mut tar_builder = tar::Builder::new(dest); - tar_builder.mode(tar::HeaderMode::Deterministic); - for path_stat in paths { - // Append the PathStat using the symbolic name and underlying stat. - let append_res = match path_stat { - &PathStat::File { ref path, ref stat } => { - let normalized = Snapshots::normalize(path)?; - let mut input = fs::File::open(relative_to.0.join(stat.path.as_path())) - .map_err(|e| format!("Failed to open {:?}: {:?}", path_stat, e))?; - tar_builder.append_file(normalized, &mut input) - } - &PathStat::Dir { ref path, ref stat } => { - let normalized = Snapshots::normalize(path)?; - tar_builder.append_dir(normalized, relative_to.0.join(stat.0.as_path())) - } - }; - append_res.map_err(|e| { - format!("Failed to tar {:?}: {:?}", path_stat, e) - })?; - } - - // Finish the tar file, returning ownership of the stream to the caller. - Ok(tar_builder.into_inner().map_err(|e| { - format!("Failed to finalize snapshot tar: {:?}", e) - })?) - } - - /// - /// Create a tar file at the given dest Path containing the given paths, while - /// fingerprinting the written stream. - /// - fn tar_create_fingerprinted( - dest: &Path, - paths: &Vec, - relative_to: &Dir, - ) -> Result { - // Wrap buffering around a fingerprinted stream above a File. - let stream = io::BufWriter::new(WriterHasher::new(fs::File::create(dest).map_err(|e| { - format!("Failed to create destination file: {:?}", e) - })?)); - - // Then append the tar to the stream, and retrieve the Fingerprint to flush all writers. - Ok( - Snapshots::tar_create(stream, paths, relative_to)? - .into_inner() - .map_err(|e| { - format!("Failed to flush to {:?}: {:?}", dest, e.error()) - })? - .finish(), - ) - } - - /// - /// Attempts to rename src to dst, and _succeeds_ if dst already exists. This is safe in - /// the case of Snapshots because the destination path is unique to its content. - /// - fn finalize(temp_path: &Path, dest_path: &Path) -> Result<(), String> { - if dest_path.is_file() { - // The Snapshot has already been created. - fs::remove_file(temp_path).unwrap_or(()); - Ok(()) - } else { - let dest_dir = dest_path.parent().expect( - "All snapshot paths must have parent directories.", - ); - safe_create_dir_all(dest_dir)?; - match fs::rename(temp_path, dest_path) { - Ok(_) => Ok(()), - Err(_) if dest_path.is_file() => Ok(()), - Err(e) => Err(format!( - "Failed to finalize snapshot at {:?}: {:?}", - dest_path, - e - )), - } - } - } - - fn path_for(&self, fingerprint: &Fingerprint) -> PathBuf { - Snapshots::path_under_for(self.snapshot_path(), fingerprint) - } - - fn path_under_for(path: &Path, fingerprint: &Fingerprint) -> PathBuf { - let hex = fingerprint.to_hex(); - path.join(&hex[0..2]).join(&hex[2..4]).join( - format!("{}.tar", hex), - ) - } - - /// - /// Creates a Snapshot for the given paths under the given VFS. 
- /// - pub fn create(&self, fs: &PosixFS, paths: Vec) -> CpuFuture { - let dest_dir = self.snapshot_path().to_owned(); - let root = fs.root.clone(); - let temp_path = self.next_temp_path().expect( - "Couldn't get the next temp path.", - ); - - fs.pool.spawn_fn(move || { - // Write the tar deterministically to a temporary file while fingerprinting. - let fingerprint = Snapshots::tar_create_fingerprinted(temp_path.as_path(), &paths, &root)?; - - // Rename to the final path if it does not already exist. - Snapshots::finalize( - temp_path.as_path(), - Snapshots::path_under_for(&dest_dir, &fingerprint).as_path(), - )?; - - Ok(Snapshot { - fingerprint: fingerprint, - digest: None, - path_stats: paths, - }) - }) - } - - fn contents_for_sync(snapshot: Snapshot, path: PathBuf) -> Result, io::Error> { - let mut archive = fs::File::open(path).map(|f| tar::Archive::new(f))?; - - // Zip the in-memory Snapshot to the on disk representation, validating as we go. - let mut files_content = Vec::new(); - for (entry_res, path_stat) in archive.entries()?.zip(snapshot.path_stats.into_iter()) { - let mut entry = entry_res?; - if entry.header().entry_type() == tar::EntryType::file() { - let path = match path_stat { - PathStat::File { path, .. } => path, - PathStat::Dir { .. } => panic!("Snapshot contents changed after storage."), - }; - let mut content = Vec::new(); - io::Read::read_to_end(&mut entry, &mut content)?; - files_content.push(FileContent { - path: path, - content: content, - }); - } - } - Ok(files_content) - } - - pub fn contents_for( - &self, - fs: &PosixFS, - snapshot: Snapshot, - ) -> CpuFuture, String> { - let archive_path = self.path_for(&snapshot.fingerprint); - fs.pool.spawn_fn(move || { - let snapshot_str = format!("{:?}", snapshot); - Snapshots::contents_for_sync(snapshot, archive_path).map_err(|e| { - format!("Failed to open Snapshot {}: {:?}", snapshot_str, e) - }) - }) - } -} - #[cfg(test)] mod posixfs_test { extern crate tempdir; diff --git a/src/rust/engine/fs/src/snapshot.rs b/src/rust/engine/fs/src/snapshot.rs index 11e3eeff05ae..352425b4569a 100644 --- a/src/rust/engine/fs/src/snapshot.rs +++ b/src/rust/engine/fs/src/snapshot.rs @@ -7,39 +7,57 @@ use futures::Future; use futures::future::join_all; use hashing::{Digest, Fingerprint}; use itertools::Itertools; -use {File, PathStat, Store}; +use {File, FileContent, PathStat, Store}; use protobuf; +use std::collections::HashMap; use std::ffi::OsString; use std::fmt; -use std::sync::Arc; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; #[derive(Clone, PartialEq)] pub struct Snapshot { - // TODO: In a follow-up commit, fingerprint will be removed, and digest will be made non-optional. - // They both exist right now as a compatibility shim so that the tar-based code and - // Directory-based code can peacefully co-exist. - pub fingerprint: Fingerprint, - pub digest: Option, + pub digest: Digest, pub path_stats: Vec, } -pub trait GetFileDigest { - fn digest(&self, file: &File) -> BoxFuture; +// StoreFileByDigest allows a File to be saved to an underlying Store, in such a way that it can be +// looked up by the Digest produced by the store_by_digest method. +// It is a separate trait so that caching implementations can be written which wrap the Store (used +// to store the bytes) and VFS (used to read the files off disk if needed). 
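+// Implementations should be cheap to clone, because Snapshot::from_path_stats clones the digester
+// once per ingested file and once per ingested child directory.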
+pub trait StoreFileByDigest { + fn store_by_digest(&self, file: &File) -> BoxFuture; } impl Snapshot { - pub fn from_path_stats + Sized, Error: fmt::Debug + 'static + Send>( + pub fn from_path_stats< + SFBD: StoreFileByDigest + Sized + Clone, + Error: fmt::Debug + 'static + Send, + >( store: Arc, - file_digester: Arc, - mut path_stats: Vec, + file_digester: SFBD, + path_stats: Vec, ) -> BoxFuture { + let mut sorted_path_stats = path_stats.clone(); + sorted_path_stats.sort_by(|a, b| a.path().cmp(b.path())); + Snapshot::ingest_directory_from_sorted_path_stats(store, file_digester, sorted_path_stats) + .map(|digest| Snapshot { digest, path_stats }) + .to_boxed() + } + + fn ingest_directory_from_sorted_path_stats< + SFBD: StoreFileByDigest + Sized + Clone, + Error: fmt::Debug + 'static + Send, + >( + store: Arc, + file_digester: SFBD, + path_stats: Vec, + ) -> BoxFuture { let mut file_futures: Vec> = Vec::new(); let mut dir_futures: Vec> = Vec::new(); - path_stats.sort_by(|a, b| a.path().cmp(b.path())); - for (first_component, group) in &path_stats.iter().cloned().group_by(|s| { s.path().components().next().unwrap().as_os_str().to_owned() @@ -59,7 +77,7 @@ impl Snapshot { file_futures.push( file_digester .clone() - .digest(&stat) + .store_by_digest(&stat) .map_err(|e| format!("{:?}", e)) .and_then(move |digest| { let mut file_node = bazel_protos::remote_execution::FileNode::new(); @@ -89,14 +107,14 @@ impl Snapshot { } else { dir_futures.push( // TODO: Memoize this in the graph - Snapshot::from_path_stats( + Snapshot::ingest_directory_from_sorted_path_stats( store.clone(), file_digester.clone(), paths_of_child_dir(path_group), - ).and_then(move |snapshot| { + ).and_then(move |digest| { let mut dir_node = bazel_protos::remote_execution::DirectoryNode::new(); dir_node.set_name(osstring_as_utf8(first_component)?); - dir_node.set_digest((&snapshot.digest.unwrap()).into()); + dir_node.set_digest((&digest).into()); Ok(dir_node) }) .to_boxed(), @@ -109,14 +127,102 @@ impl Snapshot { let mut directory = bazel_protos::remote_execution::Directory::new(); directory.set_directories(protobuf::RepeatedField::from_vec(dirs)); directory.set_files(protobuf::RepeatedField::from_vec(files)); - store.record_directory(&directory, true).map(move |digest| { - Snapshot { - fingerprint: digest.0, - digest: Some(digest), - path_stats: path_stats, + store.record_directory(&directory, true) + }) + .to_boxed() + } + + pub fn contents(self, store: Arc) -> BoxFuture, String> { + let contents = Arc::new(Mutex::new(HashMap::new())); + let path_stats = self.path_stats; + Snapshot::contents_for_directory_helper( + self.digest.0, + store, + PathBuf::from(""), + contents.clone(), + ).map(move |_| { + let mut contents = contents.lock().unwrap(); + let mut vec = Vec::new(); + for path in path_stats.iter().filter_map(|path_stat| match path_stat { + &PathStat::File { ref path, .. } => Some(path.to_path_buf()), + &PathStat::Dir { .. } => None, + }) + { + match contents.remove(&path) { + Some(content) => vec.push(FileContent { path, content }), + None => { + panic!(format!( + "PathStat for {:?} was present in path_stats but missing from Snapshot contents", + path + )); } + } + } + vec + }) + .to_boxed() + } + + // Assumes that all fingerprints it encounters are valid. + // Returns an unsorted Vec. 
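+  // File contents are accumulated into contents_wrapped, keyed by path relative to the Snapshot
+  // root; the caller (contents) re-assembles them in path_stats order.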
+ fn contents_for_directory_helper( + fingerprint: Fingerprint, + store: Arc, + path_so_far: PathBuf, + contents_wrapped: Arc>>>, + ) -> BoxFuture<(), String> { + store + .load_directory(fingerprint) + .and_then(move |maybe_dir| { + maybe_dir.ok_or_else(|| { + format!("Could not find directory with fingerprint {}", fingerprint) }) }) + .and_then(move |dir| { + let contents_wrapped_copy = contents_wrapped.clone(); + let path_so_far_copy = path_so_far.clone(); + let store_copy = store.clone(); + let file_futures = join_all( + dir + .get_files() + .iter() + .map(move |file_node| { + let path = path_so_far_copy.join(file_node.get_name()); + let contents_wrapped_copy = contents_wrapped_copy.clone(); + store_copy + .load_file_bytes_with( + Fingerprint::from_hex_string(file_node.get_digest().get_hash()).unwrap(), + |b| b.to_vec(), + ) + .and_then(move |maybe_bytes| { + maybe_bytes.ok_or_else(|| format!("Couldn't find file contents for {:?}", path)) + .map(move |bytes| { + let mut contents = contents_wrapped_copy.lock().unwrap(); + contents.insert(path, bytes); + }) + }) + }) + .collect::>(), + ); + let contents_wrapped_copy2 = contents_wrapped.clone(); + let store_copy = store.clone(); + let dir_futures = join_all( + dir + .get_directories() + .iter() + .map(move |dir_node| { + Snapshot::contents_for_directory_helper( + Fingerprint::from_hex_string(dir_node.get_digest().get_hash()).unwrap(), + store_copy.clone(), + path_so_far.join(dir_node.get_name()), + contents_wrapped_copy2.clone(), + ) + }) + .collect::>(), + ); + file_futures.join(dir_futures) + }) + .map(|(_, _)| ()) .to_boxed() } } @@ -125,8 +231,7 @@ impl fmt::Debug for Snapshot { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, - "Snapshot({}, digest={:?}, entries={})", - self.fingerprint.to_hex(), + "Snapshot(digest={:?}, entries={})", self.digest, self.path_stats.len() ) @@ -175,17 +280,19 @@ mod tests { use tempdir::TempDir; use self::testutil::make_file; - use super::super::{File, GetFileDigest, PathGlobs, PathStat, PosixFS, ResettablePool, Snapshot, - Store, VFS}; + use super::super::{File, FileContent, PathGlobs, PathStat, PosixFS, ResettablePool, Snapshot, + Store, StoreFileByDigest, VFS}; use std; use std::error::Error; - use std::path::PathBuf; + use std::path::{Path, PathBuf}; use std::sync::Arc; + const AGGRESSIVE: &str = "Aggressive"; + const LATIN: &str = "Chaetophractus villosus"; const STR: &str = "European Burmese"; - fn setup() -> (Arc, TempDir, Arc, Arc) { + fn setup() -> (Arc, TempDir, Arc, FileSaver) { let pool = Arc::new(ResettablePool::new("test-pool-".to_string())); // TODO: Pass a remote CAS address through. 
let store = Arc::new( @@ -193,7 +300,7 @@ mod tests { ); let dir = TempDir::new("root").unwrap(); let posix_fs = Arc::new(PosixFS::new(dir.path(), pool, vec![]).unwrap()); - let digester = Arc::new(FileSaver(store.clone(), posix_fs.clone())); + let digester = FileSaver(store.clone(), posix_fs.clone()); (store, dir, posix_fs, digester) } @@ -205,17 +312,17 @@ mod tests { make_file(&dir.path().join(&file_name), STR.as_bytes(), 0o600); let path_stats = expand_all_sorted(posix_fs); - // TODO: Inline when only used once - let fingerprint = Fingerprint::from_hex_string( - "63949aa823baf765eff07b946050d76ec0033144c785a94d3ebd82baa931cd16", - ).unwrap(); assert_eq!( Snapshot::from_path_stats(store, digester, path_stats.clone()) .wait() .unwrap(), Snapshot { - fingerprint: fingerprint, - digest: Some(Digest(fingerprint, 80)), + digest: Digest( + Fingerprint::from_hex_string( + "63949aa823baf765eff07b946050d76ec0033144c785a94d3ebd82baa931cd16", + ).unwrap(), + 80, + ), path_stats: path_stats, } ); @@ -231,17 +338,17 @@ mod tests { make_file(&dir.path().join(&roland), STR.as_bytes(), 0o600); let path_stats = expand_all_sorted(posix_fs); - // TODO: Inline when only used once - let fingerprint = Fingerprint::from_hex_string( - "8b1a7ea04eaa2527b35683edac088bc826117b53b7ec6601740b55e20bce3deb", - ).unwrap(); assert_eq!( Snapshot::from_path_stats(store, digester, path_stats.clone()) .wait() .unwrap(), Snapshot { - fingerprint: fingerprint, - digest: Some(Digest(fingerprint, 78)), + digest: Digest( + Fingerprint::from_hex_string( + "8b1a7ea04eaa2527b35683edac088bc826117b53b7ec6601740b55e20bce3deb", + ).unwrap(), + 78, + ), path_stats: path_stats, } ); @@ -263,26 +370,95 @@ mod tests { let sorted_path_stats = expand_all_sorted(posix_fs); let mut unsorted_path_stats = sorted_path_stats.clone(); unsorted_path_stats.reverse(); - // TODO: Inline when only used once - let fingerprint = Fingerprint::from_hex_string( - "fbff703bdaac62accf2ea5083bcfed89292073bf710ef9ad14d9298c637e777b", - ).unwrap(); assert_eq!( - Snapshot::from_path_stats(store, digester, unsorted_path_stats) + Snapshot::from_path_stats(store, digester, unsorted_path_stats.clone()) .wait() .unwrap(), Snapshot { - fingerprint: fingerprint, - digest: Some(Digest(fingerprint, 232)), - path_stats: sorted_path_stats, + digest: Digest( + Fingerprint::from_hex_string( + "fbff703bdaac62accf2ea5083bcfed89292073bf710ef9ad14d9298c637e777b", + ).unwrap(), + 232, + ), + path_stats: unsorted_path_stats, } ); } + #[test] + fn contents_for_one_file() { + let (store, dir, posix_fs, digester) = setup(); + + let file_name = PathBuf::from("roland"); + make_file(&dir.path().join(&file_name), STR.as_bytes(), 0o600); + + let contents = Snapshot::from_path_stats(store.clone(), digester, expand_all_sorted(posix_fs)) + .wait() + .unwrap() + .contents(store) + .wait() + .unwrap(); + assert_snapshot_contents(contents, vec![(&file_name, STR)]); + } + + #[test] + fn contents_for_files_in_multiple_directories() { + let (store, dir, posix_fs, digester) = setup(); + + let armadillos = PathBuf::from("armadillos"); + let armadillos_abs = dir.path().join(&armadillos); + std::fs::create_dir_all(&armadillos_abs).unwrap(); + let amy = armadillos.join("amy"); + make_file(&dir.path().join(&amy), LATIN.as_bytes(), 0o600); + let rolex = armadillos.join("rolex"); + make_file(&dir.path().join(&rolex), AGGRESSIVE.as_bytes(), 0o600); + + let cats = PathBuf::from("cats"); + let cats_abs = dir.path().join(&cats); + std::fs::create_dir_all(&cats_abs).unwrap(); + let roland = cats.join("roland"); 
+ make_file(&dir.path().join(&roland), STR.as_bytes(), 0o600); + + let dogs = PathBuf::from("dogs"); + let dogs_abs = dir.path().join(&dogs); + std::fs::create_dir_all(&dogs_abs).unwrap(); + + let path_stats_sorted = expand_all_sorted(posix_fs); + let mut path_stats_reversed = path_stats_sorted.clone(); + path_stats_reversed.reverse(); + { + let contents = + Snapshot::from_path_stats(store.clone(), digester.clone(), path_stats_reversed) + .wait() + .unwrap() + .contents(store.clone()) + .wait() + .unwrap(); + assert_snapshot_contents( + contents, + vec![(&roland, STR), (&rolex, AGGRESSIVE), (&amy, LATIN)], + ); + } + { + let contents = Snapshot::from_path_stats(store.clone(), digester, path_stats_sorted) + .wait() + .unwrap() + .contents(store) + .wait() + .unwrap(); + assert_snapshot_contents( + contents, + vec![(&amy, LATIN), (&rolex, AGGRESSIVE), (&roland, STR)], + ); + } + } + + #[derive(Clone)] struct FileSaver(Arc, Arc); - impl GetFileDigest for FileSaver { - fn digest(&self, file: &File) -> BoxFuture { + impl StoreFileByDigest for FileSaver { + fn store_by_digest(&self, file: &File) -> BoxFuture { let file_copy = file.clone(); let store = self.0.clone(); self @@ -305,4 +481,16 @@ v.sort_by(|a, b| a.path().cmp(b.path())); v } + + fn assert_snapshot_contents(contents: Vec, expected: Vec<(&Path, &str)>) { + let expected_with_array: Vec<_> = expected + .into_iter() + .map(|(path, s)| (path.to_path_buf(), s.as_bytes().to_vec())) + .collect(); + let got: Vec<_> = contents + .into_iter() + .map(|file_content| (file_content.path, file_content.content)) + .collect(); + assert_eq!(expected_with_array, got); + } } diff --git a/src/rust/engine/fs/src/store.rs b/src/rust/engine/fs/src/store.rs index 4dbf27aa0bb6..07c9d287848b 100644 --- a/src/rust/engine/fs/src/store.rs +++ b/src/rust/engine/fs/src/store.rs @@ -257,8 +257,8 @@ mod local { use digest::{Digest as DigestTrait, FixedOutput}; use futures::Future; use hashing::{Digest, Fingerprint}; - use lmdb::{self, Cursor, Database, DatabaseFlags, Environment, NO_OVERWRITE, RwTransaction, - Transaction, WriteFlags}; + use lmdb::{self, Cursor, Database, DatabaseFlags, Environment, NO_OVERWRITE, NO_TLS, + RwTransaction, Transaction, WriteFlags}; use lmdb::Error::{KeyExist, MapFull, NotFound}; use sha2::Sha256; use std::collections::BinaryHeap; @@ -288,8 +288,20 @@ impl ByteStore { pub fn new>(path: P, pool: Arc) -> Result { - // 3 DBs; one for file contents, one for directories, one for leases. let env = Environment::new() + // Without this flag, each time a read transaction is started, it eats into our transaction + // limit (default: 126) until that thread dies. + // + // With this flag, transactions are removed from that limit when they are dropped, rather + // than when their thread dies. This is important, because we perform reads from a thread + // pool, so our threads never die. Without this flag, all read requests will fail after the + // first 126. + // + // The only down-side is that you need to make sure that no individual OS thread tries to + // perform multiple write transactions concurrently. Fortunately, this property + // holds for us. + .set_flags(NO_TLS) + // 3 DBs; one for file contents, one for directories, one for leases. 
.set_max_dbs(3) .set_map_size(MAX_LOCAL_STORE_SIZE_BYTES) .open(path.as_ref()) diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index e4b36c03d34d..1b9e44869c0e 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -2,16 +2,17 @@ // Licensed under the Apache License, Version 2.0 (see LICENSE). use std; -use std::os::unix::ffi::OsStrExt; use std::path::{Path, PathBuf}; use std::sync::Arc; -use core::TypeId; +use boxfuture::BoxFuture; +use core::{Failure, TypeId}; use externs; -use fs::{PosixFS, Snapshots, Store, safe_create_dir_all_ioerror, ResettablePool}; +use fs::{File, PosixFS, Store, safe_create_dir_all_ioerror, ResettablePool, StoreFileByDigest}; use graph::{EntryId, Graph}; use handles::maybe_drain_handles; -use nodes::{Node, NodeFuture}; +use hashing::Digest; +use nodes::{DigestFile, Node, NodeFuture}; use rule_graph::RuleGraph; use tasks::Tasks; use types::Types; @@ -26,15 +27,14 @@ pub struct Core { pub rule_graph: RuleGraph, pub types: Types, pub pool: Arc, - pub snapshots: Snapshots, - pub store: Store, + pub store: Arc, pub vfs: PosixFS, } impl Core { pub fn new( root_subject_types: Vec, - mut tasks: Tasks, + tasks: Tasks, types: Types, build_root: &Path, ignore_patterns: Vec, @@ -59,20 +59,6 @@ impl Core { }, ); - // TODO: Create the Snapshots directory, and then expose it as a singleton to python. - // see: https://github.com/pantsbuild/pants/issues/4397 - let snapshots = Snapshots::new(snapshots_dir).unwrap_or_else(|e| { - panic!("Could not initialize Snapshot directory: {:?}", e); - }); - tasks.singleton_replace( - externs::invoke_unsafe( - &types.construct_snapshots, - &vec![ - externs::store_bytes(snapshots.snapshot_path().as_os_str().as_bytes()), - ], - ), - types.snapshots.clone(), - ); let rule_graph = RuleGraph::new(&tasks, root_subject_types); Core { @@ -81,8 +67,7 @@ impl Core { rule_graph: rule_graph, types: types, pool: pool.clone(), - snapshots: snapshots, - store: store, + store: Arc::new(store), // FIXME: Errors in initialization should definitely be exposed as python // exceptions, rather than as panics. vfs: PosixFS::new(build_root, pool, ignore_patterns).unwrap_or_else(|e| { @@ -120,6 +105,12 @@ impl Context { } } +impl StoreFileByDigest for Context { + fn store_by_digest(&self, file: &File) -> BoxFuture { + self.get(DigestFile(file.clone())) + } +} + pub trait ContextFactory { fn create(&self, entry_id: EntryId) -> Context; } diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index 2bcd9dde97ea..724ab6d4a450 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -43,7 +43,7 @@ fn was_required(failure: Failure) -> Failure { } } -trait GetNode { +pub trait GetNode { fn get(&self, node: N) -> NodeFuture; } @@ -282,13 +282,12 @@ impl Select { vec![ self .get_snapshot(&context) - .and_then(move |snapshot| + .and_then( + move |snapshot| // Request the file contents of the Snapshot, and then store them. 
- context.core.snapshots.contents_for(&context.core.vfs, snapshot) - .then(move |files_content_res| match files_content_res { - Ok(files_content) => Ok(Snapshot::store_files_content(&context, &files_content)), - Err(e) => Err(throw(&e)), - })) + snapshot.contents(context.core.store.clone()).map_err(|e| throw(&e)) + .map(move |files_content| Snapshot::store_files_content(&context, &files_content)) + ) .to_boxed(), ] } else if self.product() == &context.core.types.process_result { @@ -814,7 +813,7 @@ impl From for NodeKey { /// A Node that represents reading a file and fingerprinting its contents. /// #[derive(Clone, Debug, Eq, Hash, PartialEq)] -pub struct DigestFile(File); +pub struct DigestFile(pub File); impl Node for DigestFile { type Output = hashing::Digest; @@ -849,31 +848,6 @@ impl From for NodeKey { } } -/// -/// A Node that represents consuming the stat for some path. -/// -/// NB: Because the `Scandir` operation gets the stats for a parent directory in a single syscall, -/// this operation results in no data, and is simply a placeholder for `Snapshot` Nodes to use to -/// declare a dependency on the existence/content of a particular path. This makes them more error -/// prone, unfortunately. -/// -#[derive(Clone, Debug, Eq, Hash, PartialEq)] -pub struct Stat(PathBuf); - -impl Node for Stat { - type Output = (); - - fn run(self, _: Context) -> NodeFuture<()> { - future::ok(()).to_boxed() - } -} - -impl From for NodeKey { - fn from(n: Stat) -> Self { - NodeKey::Stat(n) - } -} - /// /// A Node that represents executing a directory listing that returns a Stat per directory /// entry (generally in one syscall). No symlinks are expanded. @@ -923,33 +897,17 @@ pub struct Snapshot { impl Snapshot { fn create(context: Context, path_globs: PathGlobs) -> NodeFuture { - // Recursively expand PathGlobs into PathStats while tracking their dependencies. + // Recursively expand PathGlobs into PathStats. + // We rely on Context::expand tracking dependencies for scandirs, + // and fs::Snapshot::from_path_stats tracking dependencies for file digests. context .expand(path_globs) - .then(move |path_stats_res| match path_stats_res { - Ok(path_stats) => { - // Declare dependencies on the relevant Stats, and then create a Snapshot. - let stats = future::join_all( - path_stats - .iter() - .map( - |path_stat| context.get(Stat(path_stat.path().to_owned())), // for recording only - ) - .collect::>(), - ); - // And then create a Snapshot. 
- stats - .and_then(move |_| { - context - .core - .snapshots - .create(&context.core.vfs, path_stats) - .map_err(move |e| throw(&format!("Snapshot failed: {}", e))) - }) - .to_boxed() - } - Err(e) => err(throw(&format!("PathGlobs expansion failed: {:?}", e))), + .map_err(|e| format!("PathGlobs expansion failed: {:?}", e)) + .and_then(move |path_stats| { + fs::Snapshot::from_path_stats(context.core.store.clone(), context.clone(), path_stats) + .map_err(move |e| format!("Snapshot failed: {}", e)) }) + .map_err(|e| throw(&e)) .to_boxed() } @@ -975,7 +933,7 @@ impl Snapshot { externs::invoke_unsafe( &context.core.types.construct_snapshot, &vec![ - externs::store_bytes(&item.fingerprint.0), + externs::store_bytes(&(item.digest.0).0), externs::store_list(path_stats.iter().collect(), false), ], ) @@ -1138,7 +1096,6 @@ pub enum NodeKey { ExecuteProcess(ExecuteProcess), ReadLink(ReadLink), Scandir(Scandir), - Stat(Stat), Select(Select), Snapshot(Snapshot), Task(Task), @@ -1157,7 +1114,6 @@ impl NodeKey { &NodeKey::ExecuteProcess(ref s) => format!("ExecuteProcess({:?}", s.0), &NodeKey::ReadLink(ref s) => format!("ReadLink({:?})", s.0), &NodeKey::Scandir(ref s) => format!("Scandir({:?})", s.0), - &NodeKey::Stat(ref s) => format!("Stat({:?})", s.0), &NodeKey::Select(ref s) => { format!( "Select({}, {})", @@ -1189,7 +1145,6 @@ impl NodeKey { &NodeKey::DigestFile(..) => "DigestFile".to_string(), &NodeKey::ReadLink(..) => "LinkDest".to_string(), &NodeKey::Scandir(..) => "DirectoryListing".to_string(), - &NodeKey::Stat(..) => "Stat".to_string(), } } @@ -1198,10 +1153,18 @@ /// pub fn fs_subject(&self) -> Option<&Path> { match self { + &NodeKey::DigestFile(ref s) => Some(s.0.path.as_path()), &NodeKey::ReadLink(ref s) => Some((s.0).0.as_path()), &NodeKey::Scandir(ref s) => Some((s.0).0.as_path()), - &NodeKey::Stat(ref s) => Some(s.0.as_path()), - _ => None, + + // Not FS operations: + // Explicitly listed so that if people add new NodeKeys they need to consider whether their + // NodeKey represents an FS operation, and accordingly whether they need to add it to the + // above list or the below list. + &NodeKey::ExecuteProcess { .. } | + &NodeKey::Select { .. } | + &NodeKey::Snapshot { .. } | + &NodeKey::Task { .. } => None, } } } @@ -1214,7 +1177,6 @@ impl Node for NodeKey { NodeKey::DigestFile(n) => n.run(context).map(|v| v.into()).to_boxed(), NodeKey::ExecuteProcess(n) => n.run(context).map(|v| v.into()).to_boxed(), NodeKey::ReadLink(n) => n.run(context).map(|v| v.into()).to_boxed(), - NodeKey::Stat(n) => n.run(context).map(|v| v.into()).to_boxed(), NodeKey::Scandir(n) => n.run(context).map(|v| v.into()).to_boxed(), NodeKey::Select(n) => n.run(context).map(|v| v.into()).to_boxed(), NodeKey::Snapshot(n) => n.run(context).map(|v| v.into()).to_boxed(), diff --git a/src/rust/engine/src/tasks.rs b/src/rust/engine/src/tasks.rs index d6cd9ccbe5d5..07f5b616badd 100644 --- a/src/rust/engine/src/tasks.rs +++ b/src/rust/engine/src/tasks.rs @@ -86,16 +86,6 @@ impl Tasks { ); } - // TODO: Only exists in order to support the `Snapshots` singleton replacement in `context.rs`: - // Fix by porting isolated processes to rust: - // see: https://github.com/pantsbuild/pants/issues/4397 - pub fn singleton_replace(&mut self, value: Value, product: TypeConstraint) { - self.singletons.insert( - product, - (externs::key_for(&value), value), - ); - } - /// /// The following methods define the Task registration lifecycle. ///