Skip to content

Commit

Permalink
Extract OneOffStoreFileByDigest (#5782)
Browse files Browse the repository at this point in the history
There are currently a few implementations of this for standalone
utilities or tests.

I'm about to need another one for local process execution, which is real
production code.
  • Loading branch information
illicitonion authored and Stu Hood committed May 4, 2018
1 parent e96b11f commit cb10df5
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 63 deletions.
31 changes: 3 additions & 28 deletions src/rust/engine/fs/fs_util/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ extern crate futures;
extern crate hashing;
extern crate protobuf;

use boxfuture::{BoxFuture, Boxable};
use bytes::Bytes;
use clap::{App, Arg, SubCommand};
use fs::{ResettablePool, Snapshot, Store, StoreFileByDigest, VFS};
Expand Down Expand Up @@ -221,10 +220,8 @@ fn execute(top_match: clap::ArgMatches) -> Result<(), ExitError> {
.unwrap();
match file {
fs::Stat::File(f) => {
let digest = FileSaver {
store: store.clone(),
posix_fs: Arc::new(posix_fs),
}.store_by_digest(&f)
let digest = fs::OneOffStoreFileByDigest::new(store.clone(), Arc::new(posix_fs))
.store_by_digest(f)
.wait()
.unwrap();
if store_has_remote {
Expand Down Expand Up @@ -280,10 +277,7 @@ fn execute(top_match: clap::ArgMatches) -> Result<(), ExitError> {
.and_then(move |paths| {
Snapshot::from_path_stats(
store_copy.clone(),
FileSaver {
store: store_copy,
posix_fs: posix_fs,
},
fs::OneOffStoreFileByDigest::new(store_copy, posix_fs),
paths,
)
})
Expand Down Expand Up @@ -371,22 +365,3 @@ fn execute(top_match: clap::ArgMatches) -> Result<(), ExitError> {
/// Constructs a `PosixFS` rooted at `root`, backed by the given thread pool.
///
/// The empty `vec![]` passes no ignore patterns, so every file under `root`
/// is visible.
/// NOTE(review): `unwrap()` panics if the root cannot be opened — acceptable
/// for this CLI utility's fail-fast behavior, but confirm callers expect a
/// panic rather than an error here.
fn make_posix_fs<P: AsRef<Path>>(root: P, pool: Arc<ResettablePool>) -> fs::PosixFS {
  fs::PosixFS::new(&root, pool, vec![]).unwrap()
}

// One-off adapter that reads a file from the local filesystem and stores its
// bytes in the Store, keyed by Digest.
// NOTE(review): this duplicates near-identical helpers elsewhere (e.g. in the
// snapshot tests); a single shared implementation would avoid the copies.
#[derive(Clone)]
struct FileSaver {
  store: Store,
  posix_fs: Arc<fs::PosixFS>,
}

impl StoreFileByDigest<String> for FileSaver {
  // Reads `file` via the PosixFS, writes its bytes into the Store, and
  // resolves to the Digest the Store produced.
  fn store_by_digest(&self, file: &fs::File) -> BoxFuture<Digest, String> {
    // Owned copy for the error closure, since `file` stays borrowed by read_file.
    let file_copy = file.clone();
    // Owned Store handle so the and_then closure can move it.
    let store = self.store.clone();
    self
      .posix_fs
      .read_file(&file)
      // NOTE(review): `Error::description()` is deprecated in later Rust;
      // prefer the `Display`/`Debug` formatting of the error itself.
      .map_err(move |err| format!("Error reading file {:?}: {}", file_copy, err.description()))
      // The second argument's meaning (presumably an "initial lease"/pin flag)
      // is not visible here — TODO confirm against Store::store_file_bytes.
      .and_then(move |content| store.store_file_bytes(content.content, true))
      .to_boxed()
  }
}
2 changes: 1 addition & 1 deletion src/rust/engine/fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the Apache License, Version 2.0 (see LICENSE).

mod snapshot;
pub use snapshot::{Snapshot, StoreFileByDigest, EMPTY_DIGEST};
pub use snapshot::{OneOffStoreFileByDigest, Snapshot, StoreFileByDigest, EMPTY_DIGEST};
mod store;
pub use store::Store;
mod pool;
Expand Down
73 changes: 41 additions & 32 deletions src/rust/engine/fs/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures::future::{self, join_all};
use hashing::{Digest, Fingerprint};
use indexmap::{self, IndexMap};
use itertools::Itertools;
use {File, FileContent, PathStat, Store};
use {File, FileContent, PathStat, PosixFS, Store};
use protobuf;
use std::collections::HashMap;
use std::ffi::OsString;
Expand All @@ -31,14 +31,6 @@ pub struct Snapshot {
pub path_stats: Vec<PathStat>,
}

// StoreFileByDigest allows a File to be saved to an underlying Store, in such a way that it can be
// looked up by the Digest produced by the store_by_digest method.
// It is a separate trait so that caching implementations can be written which wrap the Store (used
// to store the bytes) and VFS (used to read the files off disk if needed).
pub trait StoreFileByDigest<Error> {
fn store_by_digest(&self, file: &File) -> BoxFuture<Digest, Error>;
}

impl Snapshot {
pub fn empty() -> Snapshot {
Snapshot {
Expand Down Expand Up @@ -94,7 +86,7 @@ impl Snapshot {
file_futures.push(
file_digester
.clone()
.store_by_digest(&stat)
.store_by_digest(stat.clone())
.map_err(|e| format!("{:?}", e))
.and_then(move |digest| {
let mut file_node = bazel_protos::remote_execution::FileNode::new();
Expand Down Expand Up @@ -392,37 +384,71 @@ fn osstring_as_utf8(path: OsString) -> Result<String, String> {
.map_err(|p| format!("{:?}'s file_name is not representable in UTF8", p))
}

/// StoreFileByDigest allows a File to be saved to an underlying Store, in such a way that it can be
/// looked up by the Digest produced by the store_by_digest method.
///
/// It is a separate trait so that caching implementations can be written which wrap the Store (used
/// to store the bytes) and VFS (used to read the files off disk if needed).
pub trait StoreFileByDigest<Error> {
  /// Stores the given file's contents, resolving to the Digest under which
  /// they can later be looked up. Takes the File by value, letting
  /// implementations move it into their futures without cloning.
  fn store_by_digest(&self, file: File) -> BoxFuture<Digest, Error>;
}

///
/// A StoreFileByDigest which reads with a PosixFS and writes to a Store, with no caching.
///
#[derive(Clone)]
pub struct OneOffStoreFileByDigest {
  store: Store,
  posix_fs: Arc<PosixFS>,
}

impl OneOffStoreFileByDigest {
  /// Builds an instance from the Store that will receive file bytes and the
  /// PosixFS used to read those bytes off disk.
  pub fn new(store: Store, posix_fs: Arc<PosixFS>) -> OneOffStoreFileByDigest {
    Self { store, posix_fs }
  }
}

impl StoreFileByDigest<String> for OneOffStoreFileByDigest {
  /// Reads `file` through the wrapped PosixFS and writes its contents into
  /// the Store, resolving to the Digest under which they were stored.
  fn store_by_digest(&self, file: File) -> BoxFuture<Digest, String> {
    // Owned Store handle so the closure below can take ownership of it.
    let store = self.store.clone();
    let read_future = self.posix_fs.read_file(&file);
    read_future
      .map_err(move |err| format!("Error reading file {:?}: {:?}", file, err))
      .and_then(move |content| store.store_file_bytes(content.content, true))
      .to_boxed()
  }
}

#[cfg(test)]
mod tests {
extern crate tempdir;
extern crate testutil;

use boxfuture::{BoxFuture, Boxable};
use bytes::Bytes;
use futures::future::Future;
use hashing::{Digest, Fingerprint};
use tempdir::TempDir;
use self::testutil::make_file;

use super::OneOffStoreFileByDigest;
use super::super::{Dir, File, FileContent, Path, PathGlobs, PathStat, PosixFS, ResettablePool,
Snapshot, Store, StoreFileByDigest, VFS};
Snapshot, Store, VFS};

use std;
use std::error::Error;
use std::path::PathBuf;
use std::sync::Arc;

const AGGRESSIVE: &str = "Aggressive";
const LATIN: &str = "Chaetophractus villosus";
const STR: &str = "European Burmese";

fn setup() -> (Store, TempDir, Arc<PosixFS>, FileSaver) {
fn setup() -> (Store, TempDir, Arc<PosixFS>, OneOffStoreFileByDigest) {
let pool = Arc::new(ResettablePool::new("test-pool-".to_string()));
// TODO: Pass a remote CAS address through.
let store = Store::local_only(TempDir::new("lmdb_store").unwrap(), pool.clone()).unwrap();
let dir = TempDir::new("root").unwrap();
let posix_fs = Arc::new(PosixFS::new(dir.path(), pool, vec![]).unwrap());
let file_saver = FileSaver(store.clone(), posix_fs.clone());
let file_saver = OneOffStoreFileByDigest::new(store.clone(), posix_fs.clone());
(store, dir, posix_fs, file_saver)
}

Expand Down Expand Up @@ -661,9 +687,6 @@ mod tests {
}
}

// Test-only tuple struct: .0 is the Store written to, .1 the PosixFS read from.
// NOTE(review): superseded by the shared fs::OneOffStoreFileByDigest helper.
#[derive(Clone)]
struct FileSaver(Store, Arc<PosixFS>);

fn make_dir_stat(root: &Path, relpath: &Path) -> PathStat {
std::fs::create_dir(root.join(relpath)).unwrap();
PathStat::dir(relpath.to_owned(), Dir(relpath.to_owned()))
Expand All @@ -684,20 +707,6 @@ mod tests {
)
}

impl StoreFileByDigest<String> for FileSaver {
  // Reads `file` off disk via the PosixFS (.1) and stores the bytes in the
  // Store (.0), resolving to the resulting Digest.
  fn store_by_digest(&self, file: &File) -> BoxFuture<Digest, String> {
    // Owned copy for the error closure, since `file` stays borrowed by read_file.
    let file_copy = file.clone();
    let store = self.0.clone();
    self
      .1
      // NOTE(review): cloning the Arc here looks redundant — read_file is
      // called on the clone and the clone is immediately dropped.
      .clone()
      .read_file(&file)
      // NOTE(review): `Error::description()` is deprecated in later Rust;
      // prefer Display/Debug formatting of the error.
      .map_err(move |err| format!("Error reading file {:?}: {}", file_copy, err.description()))
      .and_then(move |content| store.store_file_bytes(content.content, true))
      .to_boxed()
  }
}

fn expand_all_sorted(posix_fs: Arc<PosixFS>) -> Vec<PathStat> {
let mut v = posix_fs
.expand(PathGlobs::create(&["**".to_owned()], &[]).unwrap())
Expand Down
4 changes: 2 additions & 2 deletions src/rust/engine/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ impl VFS<Failure> for Context {
}

impl StoreFileByDigest<Failure> for Context {
fn store_by_digest(&self, file: &File) -> BoxFuture<hashing::Digest, Failure> {
self.get(DigestFile(file.clone()))
fn store_by_digest(&self, file: File) -> BoxFuture<hashing::Digest, Failure> {
self.get(DigestFile(file))
}
}

Expand Down

0 comments on commit cb10df5

Please sign in to comment.