diff --git a/src/python/pants/engine/native.py b/src/python/pants/engine/native.py index 18277f432b86..c14866547f81 100644 --- a/src/python/pants/engine/native.py +++ b/src/python/pants/engine/native.py @@ -168,6 +168,7 @@ Tasks* tasks_create(void); void tasks_task_begin(Tasks*, Function, TypeConstraint); +void tasks_add_get(Tasks*, TypeConstraint, TypeId); void tasks_add_select(Tasks*, TypeConstraint); void tasks_add_select_variant(Tasks*, TypeConstraint, Buffer); void tasks_add_select_dependencies(Tasks*, TypeConstraint, TypeConstraint, Buffer, TypeIdBuffer); diff --git a/src/python/pants/engine/rules.py b/src/python/pants/engine/rules.py index aa0cc4d3a123..597cb227beac 100644 --- a/src/python/pants/engine/rules.py +++ b/src/python/pants/engine/rules.py @@ -5,6 +5,8 @@ from __future__ import (absolute_import, division, generators, nested_scopes, print_function, unicode_literals, with_statement) +import ast +import inspect import logging from abc import abstractproperty from collections import OrderedDict @@ -12,7 +14,7 @@ from twitter.common.collections import OrderedSet from pants.engine.addressable import Exactly -from pants.engine.selectors import type_or_constraint_repr +from pants.engine.selectors import Get, type_or_constraint_repr from pants.util.meta import AbstractClass from pants.util.objects import datatype @@ -20,6 +22,30 @@ logger = logging.getLogger(__name__) +class _RuleVisitor(ast.NodeVisitor): + def __init__(self): + super(_RuleVisitor, self).__init__() + self.gets = [] + + def visit_Call(self, node): + if not isinstance(node.func, ast.Name) or node.func.id != Get.__name__: + return + + # TODO: Validation. + if len(node.args) == 2: + product_type, subject_constructor = node.args + if not isinstance(product_type, ast.Name) or not isinstance(subject_constructor, ast.Call): + raise Exception('TODO: Implement validation of Get shapes.') + self.gets.append((product_type.id, subject_constructor.func.id)) + elif len(node.args) == 3: + product_type, subject_type, _ = node.args + if not isinstance(product_type, ast.Name) or not isinstance(subject_type, ast.Name): + raise Exception('TODO: Implement validation of Get shapes.') + self.gets.append((product_type.id, subject_type.id)) + else: + raise Exception('Invalid {}: {}'.format(Get.__name__, node.args)) + + def rule(output_type, input_selectors): """A @decorator that declares that a particular static function may be used as a TaskRule. @@ -29,7 +55,20 @@ def rule(output_type, input_selectors): to the @decorated function. """ def wrapper(func): - func._rule = TaskRule(output_type, input_selectors, func) + caller_frame = inspect.stack()[1][0] + module_ast = ast.parse(inspect.getsource(func)) + + def resolve(name): + return caller_frame.f_globals.get(name) or caller_frame.f_builtins.get(name) + + gets = [] + for node in ast.iter_child_nodes(module_ast): + if isinstance(node, ast.FunctionDef) and node.name == func.__name__: + rule_visitor = _RuleVisitor() + rule_visitor.visit(node) + gets.extend(Get(resolve(p), resolve(s)) for p, s in rule_visitor.gets) + + func._rule = TaskRule(output_type, input_selectors, func, input_gets=gets) return func return wrapper @@ -50,10 +89,13 @@ def input_selectors(self): """Collection of input selectors.""" -class TaskRule(datatype('TaskRule', ['output_constraint', 'input_selectors', 'func']), Rule): - """A Rule that runs a task function when all of its input selectors are satisfied.""" +class TaskRule(datatype('TaskRule', ['output_constraint', 'input_selectors', 'input_gets', 'func']), Rule): + """A Rule that runs a task function when all of its input selectors are satisfied. + + TODO: Make input_gets non-optional when more/all rules are using them. + """ - def __new__(cls, output_type, input_selectors, func): + def __new__(cls, output_type, input_selectors, func, input_gets=None): # Validate result type. if isinstance(output_type, Exactly): constraint = output_type @@ -68,8 +110,14 @@ def __new__(cls, output_type, input_selectors, func): raise TypeError("Expected a list of Selectors for rule `{}`, got: {}".format( func.__name__, type(input_selectors))) + # Validate gets. + input_gets = [] if input_gets is None else input_gets + if not isinstance(input_gets, list): + raise TypeError("Expected a list of Gets for rule `{}`, got: {}".format( + func.__name__, type(input_gets))) + # Create. - return super(TaskRule, cls).__new__(cls, constraint, tuple(input_selectors), func) + return super(TaskRule, cls).__new__(cls, constraint, tuple(input_selectors), tuple(input_gets), func) def __str__(self): return '({}, {!r}, {})'.format(type_or_constraint_repr(self.output_constraint), diff --git a/src/python/pants/engine/scheduler.py b/src/python/pants/engine/scheduler.py index 5091597adc85..dcf9a0f4903b 100644 --- a/src/python/pants/engine/scheduler.py +++ b/src/python/pants/engine/scheduler.py @@ -191,10 +191,9 @@ def _register_singleton(self, output_constraint, rule): def _register_task(self, output_constraint, rule): """Register the given TaskRule with the native scheduler.""" - input_selects = rule.input_selectors func = rule.func self._native.lib.tasks_task_begin(self._tasks, Function(self._to_key(func)), output_constraint) - for selector in input_selects: + for selector in rule.input_selectors: selector_type = type(selector) product_constraint = self._to_constraint(selector.product) if selector_type is Select: @@ -218,6 +217,10 @@ def _register_task(self, output_constraint, rule): self._to_constraint(selector.input_product)) else: raise ValueError('Unrecognized Selector type: {}'.format(selector)) + for get in rule.input_gets: + self._native.lib.tasks_add_get(self._tasks, + self._to_constraint(get.product), + TypeId(self._to_id(get.subject))) self._native.lib.tasks_task_end(self._tasks) def visualize_graph_to_file(self, execution_request, filename): diff --git a/src/rust/engine/src/lib.rs b/src/rust/engine/src/lib.rs index e87bea657c86..f8d71dd3971e 100644 --- a/src/rust/engine/src/lib.rs +++ b/src/rust/engine/src/lib.rs @@ -328,6 +328,13 @@ pub extern "C" fn tasks_task_begin( }) } +#[no_mangle] +pub extern "C" fn tasks_add_get(tasks_ptr: *mut Tasks, product: TypeConstraint, subject: TypeId) { + with_tasks(tasks_ptr, |tasks| { + tasks.add_get(product, subject); + }) +} + #[no_mangle] pub extern "C" fn tasks_add_select(tasks_ptr: *mut Tasks, product: TypeConstraint) { with_tasks(tasks_ptr, |tasks| { diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index f23990b83ae8..58860edeed6f 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -9,6 +9,7 @@ use std::collections::BTreeMap; use std::fmt; use std::os::unix::ffi::OsStrExt; use std::path::{Path, PathBuf}; +use std::sync::Arc; use futures::future::{self, Future}; use tempdir::TempDir; @@ -124,6 +125,21 @@ impl Select { } } + pub fn new_with_entries( + product: TypeConstraint, + subject: Key, + variants: Variants, + entries: rule_graph::Entries, + ) -> Select { + let selector = selectors::Select::without_variant(product); + Select { + selector: selector, + subject: subject, + variants: variants, + entries: entries, + } + } + pub fn new_with_selector( selector: selectors::Select, subject: Key, @@ -356,7 +372,7 @@ impl Select { product: self.product().clone(), variants: self.variants.clone(), task: task, - entry: entry.clone(), + entry: Arc::new(entry.clone()), }) }) .collect::>>() @@ -883,7 +899,7 @@ pub struct Task { product: TypeConstraint, variants: Variants, task: tasks::Task, - entry: rule_graph::Entry, + entry: Arc, } impl Task { @@ -909,30 +925,25 @@ impl Task { } } - /// - /// TODO: Merge with `get` once all edges are statically declared. - /// - fn gen_get(context: &Context, gets: Vec) -> NodeFuture> { + fn gen_get( + context: &Context, + entry: Arc, + gets: Vec, + ) -> NodeFuture> { let get_futures = gets .into_iter() .map(|get| { - let externs::Get(constraint, subject) = get; - let selector = selectors::Select::without_variant(constraint.clone()); - let edges_res = context + let externs::Get(product, subject) = get; + let entries = context .core .rule_graph - .find_root_edges(*subject.type_id(), selectors::Selector::Select(selector)) - .ok_or_else(|| { - throw(&format!( - "No rules were available to compute {} for {}", - externs::key_to_str(&constraint.0), - externs::key_to_str(&subject) - )) - }); - let context = context.clone(); - future::result(edges_res).and_then(move |edges| { - Select::new(constraint, subject, Default::default(), &edges).run(context.clone()) - }) + .edges_for_inner(&entry) + .expect("edges for task exist.") + .entries_for(&rule_graph::SelectKey::JustGet(selectors::Get { + product: product, + subject: subject.type_id().clone(), + })); + Select::new_with_entries(product, subject, Default::default(), entries).run(context.clone()) }) .collect::>(); future::join_all(get_futures).to_boxed() @@ -942,15 +953,20 @@ impl Task { /// Given a python generator Value, loop to request the generator's dependencies until /// it completes with a result Value. /// - fn generate(context: Context, generator: Value) -> NodeFuture { + fn generate( + context: Context, + entry: Arc, + generator: Value, + ) -> NodeFuture { future::loop_fn(externs::eval("None").unwrap(), move |input| { let context = context.clone(); + let entry = entry.clone(); future::result(externs::generator_send(&generator, &input)).and_then(move |response| { match response { - externs::GeneratorResponse::Get(get) => Self::gen_get(&context, vec![get]) + externs::GeneratorResponse::Get(get) => Self::gen_get(&context, entry, vec![get]) .map(|vs| future::Loop::Continue(vs.into_iter().next().unwrap())) .to_boxed() as BoxFuture<_, _>, - externs::GeneratorResponse::GetMulti(gets) => Self::gen_get(&context, gets) + externs::GeneratorResponse::GetMulti(gets) => Self::gen_get(&context, entry, gets) .map(|vs| future::Loop::Continue(externs::store_list(vs.iter().collect(), false))) .to_boxed() as BoxFuture<_, _>, externs::GeneratorResponse::Break(val) => { @@ -976,6 +992,7 @@ impl Node for Task { ); let func = self.task.func.clone(); + let entry = self.entry.clone(); deps .then(move |deps_result| match deps_result { Ok(deps) => externs::call(&externs::val_for(&func.0), &deps), @@ -984,7 +1001,7 @@ impl Node for Task { .then(move |task_result| match task_result { Ok(val) => { if externs::satisfied_by(&context.core.types.generator, &val) { - Self::generate(context, val) + Self::generate(context, entry, val) } else { ok(val) } diff --git a/src/rust/engine/src/rule_graph.rs b/src/rust/engine/src/rule_graph.rs index bebc2fb3612d..818a21b05025 100644 --- a/src/rust/engine/src/rule_graph.rs +++ b/src/rust/engine/src/rule_graph.rs @@ -8,7 +8,7 @@ use std::io; use core::{Function, Key, TypeConstraint, TypeId, Value, ANY_TYPE}; use externs; -use selectors::{Select, SelectDependencies, Selector}; +use selectors::{Get, Select, SelectDependencies, Selector}; use tasks::{Task, Tasks}; #[derive(Eq, Hash, PartialEq, Clone, Debug)] @@ -49,7 +49,11 @@ impl Entry { #[derive(Eq, Hash, PartialEq, Clone, Debug)] pub struct RootEntry { subject_type: TypeId, + // TODO: A RootEntry can only have one declared `Selector`, and no declared `Get`s, but these + // are shaped as Vecs to temporarily minimize the re-shuffling in `_construct_graph`. Remove in + // a future commit. clause: Vec, + gets: Vec, } impl From for Entry { @@ -146,6 +150,8 @@ impl Entry { /// #[derive(Eq, Hash, PartialEq, Clone, Debug)] pub enum SelectKey { + // A Get for a particular product/subject pair. + JustGet(Get), // A bare select with no projection. JustSelect(Select), // The initial select of a multi-select operator, eg SelectDependencies. @@ -323,10 +329,18 @@ impl<'t> GraphMaker<'t> { let mut was_unfulfillable = false; match entry { Entry::InnerEntry(InnerEntry { - rule: Task { ref clause, .. }, + rule: Task { + ref clause, + ref gets, + .. + }, .. }) - | Entry::Root(RootEntry { ref clause, .. }) => { + | Entry::Root(RootEntry { + ref clause, + ref gets, + .. + }) => { for selector in clause { match selector { &Selector::Select(ref select) => { @@ -503,6 +517,39 @@ impl<'t> GraphMaker<'t> { } } } + for get in gets { + match get { + &Get { + ref subject, + ref product, + } => { + let rules_or_literals_for_selector = rhs(&self.tasks, subject.clone(), product); + if rules_or_literals_for_selector.is_empty() { + mark_unfulfillable( + &mut unfulfillable_rules, + &entry, + subject.clone(), + format!( + "no rule was available to compute {} for {}", + type_constraint_str(product.clone()), + type_str(subject.clone()) + ), + ); + was_unfulfillable = true; + continue; + } + add_rules_to_graph( + &mut rules_to_traverse, + &mut rule_dependency_edges, + &mut unfulfillable_rules, + &mut root_rule_dependency_edges, + &entry, + SelectKey::JustGet(get.clone()), + rules_or_literals_for_selector, + ); + } + } + } } _ => panic!( "Entry type that cannot dependencies was not filtered out {:?}", @@ -611,6 +658,7 @@ impl<'t> GraphMaker<'t> { variant_key: None, }), ], + gets: vec![], }) } } @@ -638,6 +686,7 @@ pub struct RuleGraph { unfulfillable_rules: UnfulfillableRuleMap, } +// TODO: Take by reference. fn type_constraint_str(type_constraint: TypeConstraint) -> String { let str_val = externs::call_method(&to_val(type_constraint), "graph_str", &[]) .expect("string from calling repr"); @@ -761,6 +810,7 @@ impl RuleGraph { let root = RootEntry { subject_type: subject_type, clause: vec![selector], + gets: vec![], }; self.root_dependencies.get(&root).map(|e| e.clone()) } @@ -852,7 +902,6 @@ impl RuleGraph { }) } - // TODO instead of this, make own fmt thing that accepts externs pub fn visualize(&self, f: &mut io::Write) -> io::Result<()> { if self.root_dependencies.is_empty() && self.rule_dependency_edges.is_empty() { write!(f, "digraph {{\n")?; @@ -1029,13 +1078,17 @@ fn update_edges_based_on_unfulfillable_entry( } fn rhs_for_select(tasks: &Tasks, subject_type: TypeId, select: &Select) -> Entries { - if externs::satisfied_by_type(&select.product, &subject_type) { + rhs(tasks, subject_type, &select.product) +} + +fn rhs(tasks: &Tasks, subject_type: TypeId, product_type: &TypeConstraint) -> Entries { + if externs::satisfied_by_type(product_type, &subject_type) { // NB a matching subject is always picked first vec![Entry::new_subject_is_product(subject_type)] - } else if let Some(&(ref key, _)) = tasks.gen_singleton(&select.product) { - vec![Entry::new_singleton(key.clone(), select.product.clone())] + } else if let Some(&(ref key, _)) = tasks.gen_singleton(product_type) { + vec![Entry::new_singleton(key.clone(), product_type.clone())] } else { - match tasks.gen_tasks(&select.product) { + match tasks.gen_tasks(product_type) { Some(ref matching_tasks) => matching_tasks .iter() .map(|t| Entry::new_inner(subject_type, t)) diff --git a/src/rust/engine/src/selectors.rs b/src/rust/engine/src/selectors.rs index ff7159cadf2a..f47cfdf658da 100644 --- a/src/rust/engine/src/selectors.rs +++ b/src/rust/engine/src/selectors.rs @@ -3,6 +3,12 @@ use core::{Field, TypeConstraint, TypeId}; +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub struct Get { + pub product: TypeConstraint, + pub subject: TypeId, +} + #[derive(Clone, Debug, Eq, Hash, PartialEq)] pub struct Select { pub product: TypeConstraint, diff --git a/src/rust/engine/src/tasks.rs b/src/rust/engine/src/tasks.rs index 625b2929c1ce..c65043e36ea1 100644 --- a/src/rust/engine/src/tasks.rs +++ b/src/rust/engine/src/tasks.rs @@ -5,12 +5,13 @@ use std::collections::{HashMap, HashSet}; use core::{Field, Function, Key, TypeConstraint, TypeId, Value, FNV}; use externs; -use selectors::{Select, SelectDependencies, SelectProjection, Selector}; +use selectors::{Get, Select, SelectDependencies, SelectProjection, Selector}; #[derive(Clone, Debug, Eq, Hash, PartialEq)] pub struct Task { pub product: TypeConstraint, pub clause: Vec, + pub gets: Vec, pub func: Function, pub cacheable: bool, } @@ -95,10 +96,23 @@ impl Tasks { cacheable: true, product: product, clause: Vec::new(), + gets: Vec::new(), func: func, }); } + pub fn add_get(&mut self, product: TypeConstraint, subject: TypeId) { + self + .preparing + .as_mut() + .expect("Must `begin()` a task creation before adding gets!") + .gets + .push(Get { + product: product, + subject: subject, + }); + } + pub fn add_select(&mut self, product: TypeConstraint, variant_key: Option) { self.clause(Selector::Select(Select { product: product, @@ -163,6 +177,7 @@ impl Tasks { tasks, ); task.clause.shrink_to_fit(); + task.gets.shrink_to_fit(); tasks.push(task); } }