Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async Store #5117

Merged
merged 8 commits into from
Nov 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 125 additions & 80 deletions src/rust/engine/fs/fs_util/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ extern crate protobuf;
use boxfuture::{Boxable, BoxFuture};
use clap::{App, Arg, SubCommand};
use fs::{Digest, Fingerprint, Store, VFS, ResettablePool};
use futures::future::{Future, join_all};
use futures::future::{self, Future, join_all};
use itertools::Itertools;
use std::error::Error;
use std::ffi::OsString;
Expand Down Expand Up @@ -136,7 +136,8 @@ to this directory.",

fn execute(top_match: clap::ArgMatches) -> Result<(), ExitError> {
let store_dir = top_match.value_of("local-store-path").unwrap();
let store = Arc::new(Store::new(store_dir).map_err(|e| {
let pool = Arc::new(ResettablePool::new("fsutil-pool-".to_string()));
let store = Arc::new(Store::new(store_dir, pool.clone()).map_err(|e| {
format!(
"Failed to open/create store for directory {}: {}",
store_dir,
Expand All @@ -149,29 +150,31 @@ fn execute(top_match: clap::ArgMatches) -> Result<(), ExitError> {
match sub_match.subcommand() {
("cat", Some(args)) => {
let fingerprint = Fingerprint::from_hex_string(args.value_of("fingerprint").unwrap())?;
match store.load_file_bytes(&fingerprint)? {
Some(bytes) => {
io::stdout().write(&bytes).unwrap();
Ok(())
}
None => Err(ExitError(
let write_result = store
.load_file_bytes_with(fingerprint, |bytes| io::stdout().write_all(&bytes).unwrap())
.wait()?;
write_result.ok_or_else(|| {
ExitError(
format!("File with fingerprint {} not found", fingerprint),
ExitCode::NotFound,
)),
}
)
})
}
("save", Some(args)) => {
let path = PathBuf::from(args.value_of("path").unwrap());
// Canonicalize path to guarantee that a relative path has a parent.
let posix_fs = make_posix_fs(path
.canonicalize()
.map_err(|e| {
format!("Error canonicalizing path {:?}: {}", path, e.description())
})?
.parent()
.ok_or_else(|| {
format!("File being saved must have parent but {:?} did not", path)
})?);
let posix_fs = make_posix_fs(
path
.canonicalize()
.map_err(|e| {
format!("Error canonicalizing path {:?}: {}", path, e.description())
})?
.parent()
.ok_or_else(|| {
format!("File being saved must have parent but {:?} did not", path)
})?,
pool,
);
let file = posix_fs
.stat(PathBuf::from(path.file_name().unwrap()))
.unwrap();
Expand All @@ -196,12 +199,12 @@ fn execute(top_match: clap::ArgMatches) -> Result<(), ExitError> {
("directory", Some(sub_match)) => {
match sub_match.subcommand() {
("materialize", Some(args)) => {
let destination = Path::new(args.value_of("destination").unwrap());
let destination = PathBuf::from(args.value_of("destination").unwrap());
let fingerprint = Fingerprint::from_hex_string(args.value_of("fingerprint").unwrap())?;
Ok(materialize_directory(store, destination, &fingerprint)?)
materialize_directory(store, destination, fingerprint).wait()
}
("save", Some(args)) => {
let posix_fs = Arc::new(make_posix_fs(args.value_of("root").unwrap()));
let posix_fs = Arc::new(make_posix_fs(args.value_of("root").unwrap(), pool));
let digest = posix_fs
.expand(fs::PathGlobs::create(
&args
Expand All @@ -219,11 +222,13 @@ fn execute(top_match: clap::ArgMatches) -> Result<(), ExitError> {
("cat-proto", Some(args)) => {
let fingerprint = Fingerprint::from_hex_string(args.value_of("fingerprint").unwrap())?;
let proto_bytes = match args.value_of("output-format").unwrap() {
"binary" => store.load_directory_proto_bytes(&fingerprint),
"binary" => store.load_directory_proto_bytes(fingerprint).wait(),
"text" => {
store.load_directory_proto(&fingerprint).map(|maybe_p| {
maybe_p.map(|p| format!("{:?}\n", p).as_bytes().to_vec())
})
store.load_directory_proto(fingerprint).wait().map(
|maybe_p| {
maybe_p.map(|p| format!("{:?}\n", p).as_bytes().to_vec())
},
)
}
format => Err(format!(
"Unexpected value of --output-format arg: {}",
Expand All @@ -249,8 +254,8 @@ fn execute(top_match: clap::ArgMatches) -> Result<(), ExitError> {
}
("cat", Some(args)) => {
let fingerprint = Fingerprint::from_hex_string(args.value_of("fingerprint").unwrap())?;
let v = match store.load_file_bytes(&fingerprint)? {
None => store.load_directory_proto_bytes(&fingerprint)?,
let v = match store.load_file_bytes(fingerprint).wait()? {
None => store.load_directory_proto_bytes(fingerprint).wait()?,
some => some,
};
match v {
Expand All @@ -269,12 +274,8 @@ fn execute(top_match: clap::ArgMatches) -> Result<(), ExitError> {
}
}

fn make_posix_fs<P: AsRef<Path>>(root: P) -> fs::PosixFS {
fs::PosixFS::new(
&root,
Arc::new(ResettablePool::new("fsutil-pool-".to_string())),
vec![],
).unwrap()
fn make_posix_fs<P: AsRef<Path>>(root: P, pool: Arc<ResettablePool>) -> fs::PosixFS {
fs::PosixFS::new(&root, pool, vec![]).unwrap()
}

fn save_file(
Expand All @@ -287,7 +288,7 @@ fn save_file(
.map_err(move |err| {
format!("Error reading file {:?}: {}", file, err.description())
})
.and_then(move |content| store.store_file_bytes(&content.content))
.and_then(move |content| store.store_file_bytes(content.content))
.to_boxed()
}

Expand Down Expand Up @@ -332,13 +333,17 @@ fn save_directory(
}
fs::PathStat::Dir { .. } => {
// Because there are no children of this Dir, it must be empty.
let digest = store
.record_directory(&bazel_protos::remote_execution::Directory::new())
.unwrap();
let mut directory_node = bazel_protos::remote_execution::DirectoryNode::new();
directory_node.set_name(osstring_as_utf8(first_component).unwrap());
directory_node.set_digest(digest.into());
dir_futures.push(futures::future::ok(directory_node).to_boxed());
dir_futures.push(
store
.record_directory(&bazel_protos::remote_execution::Directory::new())
.map(move |digest| {
let mut directory_node = bazel_protos::remote_execution::DirectoryNode::new();
directory_node.set_name(osstring_as_utf8(first_component).unwrap());
directory_node.set_digest(digest.into());
directory_node
})
.to_boxed(),
);
}
}
} else {
Expand Down Expand Up @@ -395,52 +400,92 @@ fn paths_of_child_dir(paths: Vec<fs::PathStat>) -> Vec<fs::PathStat> {

fn materialize_directory(
store: Arc<Store>,
destination: &Path,
fingerprint: &Fingerprint,
) -> Result<(), ExitError> {
let directory = store.load_directory_proto(&fingerprint)?.ok_or_else(|| {
ExitError(
format!("Directory with fingerprint {} not found", fingerprint),
ExitCode::NotFound,
)
})?;
make_clean_dir(&destination).map_err(|e| {
destination: PathBuf,
fingerprint: Fingerprint,
) -> BoxFuture<(), ExitError> {
let mkdir = make_clean_dir(&destination).map_err(|e| {
format!(
"Error making directory {:?}: {}",
destination,
e.description()
)
})?;
for file_node in directory.get_files() {
let fingerprint = &Fingerprint::from_hex_string(&file_node.get_digest().get_hash())?;
match store.load_file_bytes(fingerprint)? {
Some(bytes) => {
let path = destination.join(file_node.get_name());
File::create(&path)
.and_then(|mut f| f.write_all(&bytes))
.map_err(|e| {
format!("Error writing file {:?}: {}", path, e.description())
})?;
}
).into()
});
match mkdir {
Ok(()) => {}
Err(e) => return future::err(e).to_boxed(),
};
store
.load_directory_proto(fingerprint)
.map_err(|e| e.into())
.and_then(move |directory_opt| {
directory_opt.ok_or_else(|| {
ExitError(
format!("Directory with fingerprint {} not found", fingerprint),
ExitCode::NotFound,
)
})
})
.and_then(move |directory| {
let file_futures = directory
.get_files()
.iter()
.map(|file_node| {
let store = store.clone();
let path = destination.join(file_node.get_name());
future::result(Fingerprint::from_hex_string(
file_node.get_digest().get_hash(),
)).map_err(|e| e.into())
.and_then(move |fingerprint| {
materialize_file(store, path, fingerprint)
})
})
.collect::<Vec<_>>();
let directory_futures = directory
.get_directories()
.iter()
.map(|directory_node| {
let store = store.clone();
let path = destination.join(directory_node.get_name());
future::result(Fingerprint::from_hex_string(
directory_node.get_digest().get_hash(),
)).map_err(|e| e.into())
.and_then(move |fingerprint| {
materialize_directory(store, path, fingerprint)
})
})
.collect::<Vec<_>>();
join_all(file_futures)
.join(join_all(directory_futures))
.map(|_| ())
})
.to_boxed()
}

fn materialize_file(
store: Arc<Store>,
destination: PathBuf,
fingerprint: Fingerprint,
) -> BoxFuture<(), ExitError> {
store
.load_file_bytes_with(fingerprint, move |bytes| {
File::create(&destination)
.and_then(|mut f| f.write_all(bytes))
.map_err(|e| {
format!("Error writing file {:?}: {}", destination, e.description())
})
})
.map_err(|e| e.into())
.and_then(move |write_result| match write_result {
Some(Ok(())) => Ok(()),
Some(Err(e)) => Err(e.into()),
None => {
return Err(ExitError(
format!(
"File with fingerprint {} not found",
file_node.get_digest().get_hash()
),
Err(ExitError(
format!("File with fingerprint {} not found", fingerprint),
ExitCode::NotFound,
))
}
}
}
for directory_node in directory.get_directories() {
materialize_directory(
store.clone(),
&destination.join(directory_node.get_name()),
&Fingerprint::from_hex_string(directory_node.get_digest().get_hash())?,
)?;
}
Ok(())
})
.to_boxed()
}

fn osstring_as_utf8(path: OsString) -> Result<String, String> {
Expand Down
71 changes: 5 additions & 66 deletions src/rust/engine/fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ mod snapshot;
pub use snapshot::Snapshot;
mod store;
pub use store::{Digest, Store};
mod pool;
pub use pool::ResettablePool;

extern crate bazel_protos;
extern crate boxfuture;
Expand All @@ -28,13 +30,13 @@ extern crate tempdir;
use std::collections::HashSet;
use std::os::unix::fs::PermissionsExt;
use std::path::{Component, Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Arc, Mutex};
use std::{fmt, fs};
use std::io::{self, Read};
use std::cmp::min;

use futures::future::{self, IntoFuture, Future};
use futures_cpupool::{CpuFuture, CpuPool};
use futures::future::{self, Future};
use futures_cpupool::CpuFuture;
use glob::Pattern;
use ignore::gitignore::{Gitignore, GitignoreBuilder};
use ordermap::OrderMap;
Expand Down Expand Up @@ -358,69 +360,6 @@ fn is_ignored(ignore: &Gitignore, stat: &Stat) -> bool {
}
}

///
/// A wrapper around a CpuPool, to add the ability to drop the pool before forking,
/// and then lazily re-initialize it in a new process.
///
/// When a process forks, the kernel clones only the thread that called fork: all other
/// threads are effectively destroyed. If a CpuPool has live threads during a fork, it
/// will not be able to perform any work or be dropped cleanly (it will hang instead).
/// It's thus necessary to drop the pool before forking, and to re-create it after forking.
///
pub struct ResettablePool {
name_prefix: String,
pool: RwLock<Option<CpuPool>>,
}

impl ResettablePool {
pub fn new(name_prefix: String) -> ResettablePool {
ResettablePool {
name_prefix: name_prefix,
pool: RwLock::new(None),
}
}

///
/// Delegates to `CpuPool::spawn_fn`, and shares its signature.
/// http://alexcrichton.com/futures-rs/futures_cpupool/struct.CpuPool.html#method.spawn_fn
///
pub fn spawn_fn<F, R>(&self, f: F) -> CpuFuture<R::Item, R::Error>
where
F: FnOnce() -> R + Send + 'static,
R: IntoFuture + 'static,
R::Future: Send + 'static,
R::Item: Send + 'static,
R::Error: Send + 'static,
{
{
// The happy path: pool is already initialized.
let pool_opt = self.pool.read().unwrap();
if let Some(ref pool) = *pool_opt {
return pool.spawn_fn(f);
}
}
{
// Initialize the pool, but then release the write lock.
let mut pool_opt = self.pool.write().unwrap();
pool_opt.get_or_insert_with(|| self.new_pool());
}

// Recurse to run the function under the read lock.
self.spawn_fn(f)
}

pub fn reset(&self) {
let mut pool = self.pool.write().unwrap();
*pool = None;
}

fn new_pool(&self) -> CpuPool {
futures_cpupool::Builder::new()
.name_prefix(self.name_prefix.clone())
.create()
}
}

///
/// All Stats consumed or return by this type are relative to the root.
///
Expand Down
Loading