Skip to content
This repository has been archived by the owner on Apr 9, 2020. It is now read-only.

Commit

Permalink
walker: migrate to new futures
Browse files Browse the repository at this point in the history
Summary:
walker: migrate to new futures (0.3.1).

* walk.rs is not fully migrated, due to a call to `bounded_traversal_stream`, which still uses the old futures.

* `scrub::scrub_objects`, `sizing::compression_benefit` and `validate::validate` return `BoxFuture` instead of being regular `async` functions, due to a limitation tracked in Rust issue [#63033](rust-lang/rust#63033).

Reviewed By: farnz

Differential Revision: D19536696

fbshipit-source-id: a0df337b86d7b067a44bf3b18834193d3f63f5dc
  • Loading branch information
wangbj authored and facebook-github-bot committed Jan 29, 2020
1 parent 58f0f6b commit d28b99d
Show file tree
Hide file tree
Showing 9 changed files with 326 additions and 264 deletions.
4 changes: 2 additions & 2 deletions mononoke/walker/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use anyhow::{format_err, Error};
use bookmarks::BookmarkName;
use filenodes::FilenodeInfo;
use filestore::Alias;
use futures_ext::BoxStream;
use futures_preview::stream::BoxStream;
use mercurial_types::{
blobs::HgBlobChangeset, FileBytes, HgChangesetId, HgFileEnvelope, HgFileNodeId, HgManifest,
HgManifestId,
Expand Down Expand Up @@ -204,7 +204,7 @@ impl fmt::Display for EdgeType {

/// File content gets a special two-state content so we can chose when to read the data
pub enum FileContentData {
ContentStream(BoxStream<FileBytes, Error>),
ContentStream(BoxStream<'static, Result<FileBytes, Error>>),
Consumed(usize),
}

Expand Down
14 changes: 6 additions & 8 deletions mononoke/walker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@

#![deny(warnings)]
#![feature(process_exitcode_placeholder)]

#![feature(async_closure)]
use anyhow::Error;
use fbinit::FacebookInit;
use futures::IntoFuture;
use futures_ext::FutureExt;
use futures_preview::compat::Future01CompatExt;
use futures_preview::future::{self, FutureExt};

use cmdlib::{args, helpers::block_execute};

Expand Down Expand Up @@ -41,13 +39,13 @@ fn main(fb: FacebookInit) -> Result<(), Error> {
sizing::compression_benefit(fb, logger.clone(), &matches, sub_m)
}
(setup::VALIDATE, Some(sub_m)) => validate::validate(fb, logger.clone(), &matches, sub_m),
_ => Err(Error::msg("Invalid Arguments, pass --help for usage."))
.into_future()
.boxify(),
_ => {
future::err::<_, Error>(Error::msg("Invalid Arguments, pass --help for usage.")).boxed()
}
};

block_execute(
future.compat(),
future,
fb,
app_name,
&logger,
Expand Down
48 changes: 28 additions & 20 deletions mononoke/walker/src/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use crate::state::StepStats;
use anyhow::Error;
use cloned::cloned;
use context::CoreContext;
use futures::{
future::{self},
Future, Stream,
use futures_preview::{
future::FutureExt,
stream::{Stream, StreamExt, TryStreamExt},
};
use slog::{info, Logger};
use stats::prelude::*;
Expand Down Expand Up @@ -328,50 +328,58 @@ impl<Inner> Clone for ProgressStateMutex<Inner> {
// Print some status update, passing on all data unchanged
pub fn progress_stream<InStream, PS, ND, SS>(
quiet: bool,
progress_state: PS,
progress_state: &PS,
s: InStream,
) -> impl Stream<Item = (Node, Option<ND>, Option<SS>), Error = Error>
) -> impl Stream<Item = Result<(Node, Option<ND>, Option<SS>), Error>>
where
InStream: 'static + Stream<Item = (Node, Option<ND>, Option<SS>), Error = Error> + Send,
InStream: Stream<Item = Result<(Node, Option<ND>, Option<SS>), Error>> + 'static + Send,
PS: 'static + Send + Clone + ProgressRecorder<SS> + ProgressReporter,
{
s.map(move |(n, data_opt, stats_opt)| {
progress_state.record_step(&n, stats_opt.as_ref());
if !quiet {
progress_state.report_throttled();
s.map({
let progress_state = progress_state.clone();
move |r| {
r.and_then(|(n, data_opt, stats_opt)| {
progress_state.record_step(&n, stats_opt.as_ref());
if !quiet {
progress_state.report_throttled();
}
Ok((n, data_opt, stats_opt))
})
}
(n, data_opt, stats_opt)
})
}

// Final status summary, plus count of seen nodes
pub fn report_state<InStream, PS, ND, SS>(
pub async fn report_state<InStream, PS, ND, SS>(
ctx: CoreContext,
progress_state: PS,
s: InStream,
) -> impl Future<Item = (), Error = Error>
) -> Result<(), Error>
where
InStream: Stream<Item = (Node, Option<ND>, Option<SS>), Error = Error>,
InStream: Stream<Item = Result<(Node, Option<ND>, Option<SS>), Error>> + 'static + Send,
PS: 'static + Send + Clone + ProgressReporter,
{
let init_stats: (usize, usize) = (0, 0);
s.fold(init_stats, {
move |(mut seen, mut loaded), (_n, nd, _ss)| {
s.try_fold(init_stats, {
async move |(mut seen, mut loaded), (_n, nd, _ss)| {
let data_count = match nd {
None => 0,
_ => 1,
};
seen += 1;
loaded += data_count;
future::ok::<_, Error>((seen, loaded))
Ok((seen, loaded))
}
})
.map({
cloned!(ctx);
move |stats| {
info!(ctx.logger(), "Final count: {:?}", stats);
progress_state.report_progress();
()
stats.and_then(|stats| {
info!(ctx.logger(), "Final count: {:?}", stats);
progress_state.report_progress();
Ok(())
})
}
})
.await
}
97 changes: 46 additions & 51 deletions mononoke/walker/src/scrub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,43 +14,43 @@ use crate::tail::walk_exact_tail;

use anyhow::Error;
use clap::ArgMatches;
use cloned::cloned;
use context::CoreContext;
use fbinit::FacebookInit;
use futures::{
future::{self},
Future, Stream,
use futures_preview::{
future::{self, BoxFuture, FutureExt},
stream::{Stream, StreamExt, TryStreamExt},
};
use futures_ext::{try_boxfuture, BoxFuture, FutureExt};
use slog::Logger;

// Force load of leaf data like file contents that graph traversal did not need
pub fn loading_stream<InStream, SS>(
limit_data_fetch: bool,
scheduled_max: usize,
s: InStream,
) -> impl Stream<Item = (Node, Option<NodeData>, Option<SS>), Error = Error>
) -> impl Stream<Item = Result<(Node, Option<NodeData>, Option<SS>), Error>>
where
InStream: Stream<Item = (Node, Option<NodeData>, Option<SS>), Error = Error>,
InStream: Stream<Item = Result<(Node, Option<NodeData>, Option<SS>), Error>> + 'static + Send,
{
s.map(move |(n, nd, ss)| match nd {
Some(NodeData::FileContent(FileContentData::ContentStream(file_bytes_stream)))
if !limit_data_fetch =>
{
file_bytes_stream
.fold(0, |acc, file_bytes| {
future::ok::<_, Error>(acc + file_bytes.size())
})
.map(|num_bytes| {
(
s.then(async move |r| match r {
Err(e) => future::err(e),
Ok((n, nd, ss)) => match nd {
Some(NodeData::FileContent(FileContentData::ContentStream(file_bytes_stream)))
if !limit_data_fetch =>
{
let res = file_bytes_stream
.try_fold(0, async move |acc, file_bytes| Ok(acc + file_bytes.size()))
.await;
match res {
Err(e) => future::err(e),
Ok(bytes) => future::ok((
n,
Some(NodeData::FileContent(FileContentData::Consumed(num_bytes))),
Some(NodeData::FileContent(FileContentData::Consumed(bytes))),
ss,
)
})
.left_future()
}
_ => future::ok((n, nd, ss)).right_future(),
)),
}
}
_ => future::ok((n, nd, ss)),
},
})
.buffer_unordered(scheduled_max)
}
Expand All @@ -61,35 +61,30 @@ pub fn scrub_objects(
logger: Logger,
matches: &ArgMatches<'_>,
sub_m: &ArgMatches<'_>,
) -> BoxFuture<(), Error> {
let (datasources, walk_params) =
try_boxfuture!(setup_common(SCRUB, fb, &logger, matches, sub_m));
let ctx = CoreContext::new_with_logger(fb, logger.clone());
) -> BoxFuture<'static, Result<(), Error>> {
match setup_common(SCRUB, fb, &logger, matches, sub_m) {
Err(e) => future::err::<_, Error>(e).boxed(),
Ok((datasources, walk_params)) => {
let ctx = CoreContext::new_with_logger(fb, logger.clone());
let limit_data_fetch = sub_m.is_present(LIMIT_DATA_FETCH_ARG);

let limit_data_fetch = sub_m.is_present(LIMIT_DATA_FETCH_ARG);
let make_sink = {
let scheduled_max = walk_params.scheduled_max;
let quiet = walk_params.quiet;
let progress_state = walk_params.progress_state.clone();
let ctx = ctx.clone();
async move |walk_output| {
let loading = loading_stream(limit_data_fetch, scheduled_max, walk_output);
let show_progress = progress_stream(quiet, &progress_state, loading);
report_state(ctx, progress_state, show_progress).await
}
};

let make_sink = {
cloned!(
ctx,
walk_params.progress_state,
walk_params.quiet,
walk_params.scheduled_max
);
move |walk_output| {
cloned!(ctx, progress_state);
let loading = loading_stream(limit_data_fetch, scheduled_max, walk_output);
let show_progress = progress_stream(quiet, progress_state.clone(), loading);
let one_fut = report_state(ctx, progress_state.clone(), show_progress);
one_fut
let walk_state = WalkState::new(WalkStateCHashMap::new(
walk_params.include_node_types.clone(),
walk_params.include_edge_types.clone(),
));
walk_exact_tail(ctx, datasources, walk_params, walk_state, make_sink).boxed()
}
};
cloned!(
walk_params.include_node_types,
walk_params.include_edge_types
);
let walk_state = WalkState::new(WalkStateCHashMap::new(
include_node_types,
include_edge_types,
));
walk_exact_tail(ctx, datasources, walk_params, walk_state, make_sink).boxify()
}
}
10 changes: 7 additions & 3 deletions mononoke/walker/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@ use bookmarks::BookmarkName;
use clap::{App, Arg, ArgMatches, SubCommand, Values};
use cmdlib::args;
use fbinit::FacebookInit;
use futures_ext::{BoxFuture, FutureExt};
use futures_preview::{
compat::Future01CompatExt,
future::{BoxFuture, FutureExt},
};
use lazy_static::lazy_static;
use metaconfig_types::{Redaction, ScrubAction};
use scuba_ext::{ScubaSampleBuilder, ScubaSampleBuilderExt};
use slog::{info, warn, Logger};
use std::{collections::HashSet, iter::FromIterator, str::FromStr, time::Duration};

pub struct RepoWalkDatasources {
pub blobrepo: BoxFuture<BlobRepo, Error>,
pub blobrepo: BoxFuture<'static, Result<BlobRepo, Error>>,
pub scuba_builder: ScubaSampleBuilder,
}

Expand Down Expand Up @@ -696,7 +699,8 @@ pub fn setup_common(
config.filestore,
readonly_storage,
)
.boxify();
.compat()
.boxed();

let mut progress_node_types = include_node_types.clone();
for e in &walk_roots {
Expand Down
Loading

0 comments on commit d28b99d

Please sign in to comment.