Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract OneOffStoreFileByDigest #5782

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 3 additions & 28 deletions src/rust/engine/fs/fs_util/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ extern crate futures;
extern crate hashing;
extern crate protobuf;

use boxfuture::{BoxFuture, Boxable};
use bytes::Bytes;
use clap::{App, Arg, SubCommand};
use fs::{ResettablePool, Snapshot, Store, StoreFileByDigest, VFS};
Expand Down Expand Up @@ -221,10 +220,8 @@ fn execute(top_match: clap::ArgMatches) -> Result<(), ExitError> {
.unwrap();
match file {
fs::Stat::File(f) => {
let digest = FileSaver {
store: store.clone(),
posix_fs: Arc::new(posix_fs),
}.store_by_digest(&f)
let digest = fs::OneOffStoreFileByDigest::new(store.clone(), Arc::new(posix_fs))
.store_by_digest(f)
.wait()
.unwrap();
if store_has_remote {
Expand Down Expand Up @@ -280,10 +277,7 @@ fn execute(top_match: clap::ArgMatches) -> Result<(), ExitError> {
.and_then(move |paths| {
Snapshot::from_path_stats(
store_copy.clone(),
FileSaver {
store: store_copy,
posix_fs: posix_fs,
},
fs::OneOffStoreFileByDigest::new(store_copy, posix_fs),
paths,
)
})
Expand Down Expand Up @@ -371,22 +365,3 @@ fn execute(top_match: clap::ArgMatches) -> Result<(), ExitError> {
fn make_posix_fs<P: AsRef<Path>>(root: P, pool: Arc<ResettablePool>) -> fs::PosixFS {
fs::PosixFS::new(&root, pool, vec![]).unwrap()
}

#[derive(Clone)]
struct FileSaver {
store: Store,
posix_fs: Arc<fs::PosixFS>,
}

impl StoreFileByDigest<String> for FileSaver {
fn store_by_digest(&self, file: &fs::File) -> BoxFuture<Digest, String> {
let file_copy = file.clone();
let store = self.store.clone();
self
.posix_fs
.read_file(&file)
.map_err(move |err| format!("Error reading file {:?}: {}", file_copy, err.description()))
.and_then(move |content| store.store_file_bytes(content.content, true))
.to_boxed()
}
}
2 changes: 1 addition & 1 deletion src/rust/engine/fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the Apache License, Version 2.0 (see LICENSE).

mod snapshot;
pub use snapshot::{Snapshot, StoreFileByDigest, EMPTY_DIGEST};
pub use snapshot::{OneOffStoreFileByDigest, Snapshot, StoreFileByDigest, EMPTY_DIGEST};
mod store;
pub use store::Store;
mod pool;
Expand Down
73 changes: 41 additions & 32 deletions src/rust/engine/fs/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures::future::{self, join_all};
use hashing::{Digest, Fingerprint};
use indexmap::{self, IndexMap};
use itertools::Itertools;
use {File, FileContent, PathStat, Store};
use {File, FileContent, PathStat, PosixFS, Store};
use protobuf;
use std::collections::HashMap;
use std::ffi::OsString;
Expand All @@ -31,14 +31,6 @@ pub struct Snapshot {
pub path_stats: Vec<PathStat>,
}

// StoreFileByDigest allows a File to be saved to an underlying Store, in such a way that it can be
// looked up by the Digest produced by the store_by_digest method.
// It is a separate trait so that caching implementations can be written which wrap the Store (used
// to store the bytes) and VFS (used to read the files off disk if needed).
pub trait StoreFileByDigest<Error> {
fn store_by_digest(&self, file: &File) -> BoxFuture<Digest, Error>;
}

impl Snapshot {
pub fn empty() -> Snapshot {
Snapshot {
Expand Down Expand Up @@ -94,7 +86,7 @@ impl Snapshot {
file_futures.push(
file_digester
.clone()
.store_by_digest(&stat)
.store_by_digest(stat.clone())
.map_err(|e| format!("{:?}", e))
.and_then(move |digest| {
let mut file_node = bazel_protos::remote_execution::FileNode::new();
Expand Down Expand Up @@ -392,37 +384,71 @@ fn osstring_as_utf8(path: OsString) -> Result<String, String> {
.map_err(|p| format!("{:?}'s file_name is not representable in UTF8", p))
}

// StoreFileByDigest allows a File to be saved to an underlying Store, in such a way that it can be
// looked up by the Digest produced by the store_by_digest method.
// It is a separate trait so that caching implementations can be written which wrap the Store (used
// to store the bytes) and VFS (used to read the files off disk if needed).
pub trait StoreFileByDigest<Error> {
fn store_by_digest(&self, file: File) -> BoxFuture<Digest, Error>;
}

///
/// A StoreFileByDigest which reads with a PosixFS and writes to a Store, with no caching.
///
#[derive(Clone)]
pub struct OneOffStoreFileByDigest {
store: Store,
posix_fs: Arc<PosixFS>,
}

impl OneOffStoreFileByDigest {
pub fn new(store: Store, posix_fs: Arc<PosixFS>) -> OneOffStoreFileByDigest {
OneOffStoreFileByDigest { store, posix_fs }
}
}

impl StoreFileByDigest<String> for OneOffStoreFileByDigest {
fn store_by_digest(&self, file: File) -> BoxFuture<Digest, String> {
let store = self.store.clone();
self
.posix_fs
.read_file(&file)
.map_err(move |err| format!("Error reading file {:?}: {:?}", file, err))
.and_then(move |content| store.store_file_bytes(content.content, true))
.to_boxed()
}
}

#[cfg(test)]
mod tests {
extern crate tempdir;
extern crate testutil;

use boxfuture::{BoxFuture, Boxable};
use bytes::Bytes;
use futures::future::Future;
use hashing::{Digest, Fingerprint};
use tempdir::TempDir;
use self::testutil::make_file;

use super::OneOffStoreFileByDigest;
use super::super::{Dir, File, FileContent, Path, PathGlobs, PathStat, PosixFS, ResettablePool,
Snapshot, Store, StoreFileByDigest, VFS};
Snapshot, Store, VFS};

use std;
use std::error::Error;
use std::path::PathBuf;
use std::sync::Arc;

const AGGRESSIVE: &str = "Aggressive";
const LATIN: &str = "Chaetophractus villosus";
const STR: &str = "European Burmese";

fn setup() -> (Store, TempDir, Arc<PosixFS>, FileSaver) {
fn setup() -> (Store, TempDir, Arc<PosixFS>, OneOffStoreFileByDigest) {
let pool = Arc::new(ResettablePool::new("test-pool-".to_string()));
// TODO: Pass a remote CAS address through.
let store = Store::local_only(TempDir::new("lmdb_store").unwrap(), pool.clone()).unwrap();
let dir = TempDir::new("root").unwrap();
let posix_fs = Arc::new(PosixFS::new(dir.path(), pool, vec![]).unwrap());
let file_saver = FileSaver(store.clone(), posix_fs.clone());
let file_saver = OneOffStoreFileByDigest::new(store.clone(), posix_fs.clone());
(store, dir, posix_fs, file_saver)
}

Expand Down Expand Up @@ -661,9 +687,6 @@ mod tests {
}
}

#[derive(Clone)]
struct FileSaver(Store, Arc<PosixFS>);

fn make_dir_stat(root: &Path, relpath: &Path) -> PathStat {
std::fs::create_dir(root.join(relpath)).unwrap();
PathStat::dir(relpath.to_owned(), Dir(relpath.to_owned()))
Expand All @@ -684,20 +707,6 @@ mod tests {
)
}

impl StoreFileByDigest<String> for FileSaver {
fn store_by_digest(&self, file: &File) -> BoxFuture<Digest, String> {
let file_copy = file.clone();
let store = self.0.clone();
self
.1
.clone()
.read_file(&file)
.map_err(move |err| format!("Error reading file {:?}: {}", file_copy, err.description()))
.and_then(move |content| store.store_file_bytes(content.content, true))
.to_boxed()
}
}

fn expand_all_sorted(posix_fs: Arc<PosixFS>) -> Vec<PathStat> {
let mut v = posix_fs
.expand(PathGlobs::create(&["**".to_owned()], &[]).unwrap())
Expand Down
4 changes: 2 additions & 2 deletions src/rust/engine/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ impl VFS<Failure> for Context {
}

impl StoreFileByDigest<Failure> for Context {
fn store_by_digest(&self, file: &File) -> BoxFuture<hashing::Digest, Failure> {
self.get(DigestFile(file.clone()))
fn store_by_digest(&self, file: File) -> BoxFuture<hashing::Digest, Failure> {
self.get(DigestFile(file))
}
}

Expand Down