From 2a0356a56309622e0085ea489c81b5310e71362f Mon Sep 17 00:00:00 2001 From: Stu Hood Date: Tue, 27 Feb 2018 23:34:57 -0800 Subject: [PATCH] Add generator `@rule` integration, with support for `Get`ing products for subjects. --- src/python/pants/engine/BUILD | 1 + src/python/pants/engine/native.py | 125 ++++++++++++------ src/python/pants/engine/scheduler.py | 2 + src/python/pants/engine/selectors.py | 21 +++- src/rust/engine/fs/src/lib.rs | 11 +- src/rust/engine/src/externs.rs | 181 ++++++++++++++++----------- src/rust/engine/src/lib.rs | 21 ++-- src/rust/engine/src/nodes.rs | 68 +++++++++- src/rust/engine/src/types.rs | 1 + 9 files changed, 299 insertions(+), 132 deletions(-) diff --git a/src/python/pants/engine/BUILD b/src/python/pants/engine/BUILD index 3d2bae81869f..39b21d9933ba 100644 --- a/src/python/pants/engine/BUILD +++ b/src/python/pants/engine/BUILD @@ -182,6 +182,7 @@ python_library( sources=['native.py'], dependencies=[ ':native_engine_shared_library', + ':selectors', '3rdparty/python:cffi', '3rdparty/python:setuptools', 'src/python/pants/binaries:binary_util', diff --git a/src/python/pants/engine/native.py b/src/python/pants/engine/native.py index 30c2e5534f41..18277f432b86 100644 --- a/src/python/pants/engine/native.py +++ b/src/python/pants/engine/native.py @@ -17,6 +17,7 @@ import pkg_resources import six +from pants.engine.selectors import Get, constraint_for from pants.util.contextutil import temporary_dir from pants.util.dirutil import safe_mkdir, safe_mkdtemp from pants.util.memo import memoized_property @@ -83,6 +84,12 @@ Value value; } PyResult; +typedef struct { + uint8_t tag; + ValueBuffer values; + ValueBuffer constraints; +} PyGeneratorResponse; + typedef struct { int64_t hash_; Value value; @@ -92,25 +99,26 @@ typedef void ExternContext; // On the rust side the integration is defined in externs.rs -typedef void (*extern_ptr_log)(ExternContext*, uint8_t, uint8_t*, uint64_t); -typedef uint8_t extern_log_level; -typedef Ident (*extern_ptr_identify)(ExternContext*, Value*); -typedef _Bool (*extern_ptr_equals)(ExternContext*, Value*, Value*); -typedef Value (*extern_ptr_clone_val)(ExternContext*, Value*); -typedef void (*extern_ptr_drop_handles)(ExternContext*, Handle*, uint64_t); -typedef Buffer (*extern_ptr_type_to_str)(ExternContext*, TypeId); -typedef Buffer (*extern_ptr_val_to_str)(ExternContext*, Value*); -typedef _Bool (*extern_ptr_satisfied_by)(ExternContext*, Value*, Value*); -typedef _Bool (*extern_ptr_satisfied_by_type)(ExternContext*, Value*, TypeId*); -typedef Value (*extern_ptr_store_list)(ExternContext*, Value**, uint64_t, _Bool); -typedef Value (*extern_ptr_store_bytes)(ExternContext*, uint8_t*, uint64_t); -typedef Value (*extern_ptr_store_i32)(ExternContext*, int32_t); -typedef Value (*extern_ptr_project)(ExternContext*, Value*, uint8_t*, uint64_t, TypeId*); -typedef ValueBuffer (*extern_ptr_project_multi)(ExternContext*, Value*, uint8_t*, uint64_t); -typedef Value (*extern_ptr_project_ignoring_type)(ExternContext*, Value*, uint8_t*, uint64_t); -typedef Value (*extern_ptr_create_exception)(ExternContext*, uint8_t*, uint64_t); -typedef PyResult (*extern_ptr_call)(ExternContext*, Value*, Value*, uint64_t); -typedef PyResult (*extern_ptr_eval)(ExternContext*, uint8_t*, uint64_t); +typedef void (*extern_ptr_log)(ExternContext*, uint8_t, uint8_t*, uint64_t); +typedef uint8_t extern_log_level; +typedef Ident (*extern_ptr_identify)(ExternContext*, Value*); +typedef _Bool (*extern_ptr_equals)(ExternContext*, Value*, Value*); +typedef Value (*extern_ptr_clone_val)(ExternContext*, Value*); +typedef void (*extern_ptr_drop_handles)(ExternContext*, Handle*, uint64_t); +typedef Buffer (*extern_ptr_type_to_str)(ExternContext*, TypeId); +typedef Buffer (*extern_ptr_val_to_str)(ExternContext*, Value*); +typedef _Bool (*extern_ptr_satisfied_by)(ExternContext*, Value*, Value*); +typedef _Bool (*extern_ptr_satisfied_by_type)(ExternContext*, Value*, TypeId*); +typedef Value (*extern_ptr_store_list)(ExternContext*, Value**, uint64_t, _Bool); +typedef Value (*extern_ptr_store_bytes)(ExternContext*, uint8_t*, uint64_t); +typedef Value (*extern_ptr_store_i32)(ExternContext*, int32_t); +typedef Value (*extern_ptr_project)(ExternContext*, Value*, uint8_t*, uint64_t, TypeId*); +typedef ValueBuffer (*extern_ptr_project_multi)(ExternContext*, Value*, uint8_t*, uint64_t); +typedef Value (*extern_ptr_project_ignoring_type)(ExternContext*, Value*, uint8_t*, uint64_t); +typedef Value (*extern_ptr_create_exception)(ExternContext*, uint8_t*, uint64_t); +typedef PyResult (*extern_ptr_call)(ExternContext*, Value*, Value*, uint64_t); +typedef PyGeneratorResponse (*extern_ptr_generator_send)(ExternContext*, Value*, Value*); +typedef PyResult (*extern_ptr_eval)(ExternContext*, uint8_t*, uint64_t); typedef void Tasks; typedef void Scheduler; @@ -136,6 +144,7 @@ extern_ptr_log, extern_log_level, extern_ptr_call, + extern_ptr_generator_send, extern_ptr_eval, extern_ptr_identify, extern_ptr_equals, @@ -189,6 +198,7 @@ TypeConstraint, TypeConstraint, TypeConstraint, + TypeConstraint, TypeId, TypeId, Buffer, @@ -226,24 +236,25 @@ CFFI_EXTERNS = ''' extern "Python" { - void extern_log(ExternContext*, uint8_t, uint8_t*, uint64_t); - PyResult extern_call(ExternContext*, Value*, Value*, uint64_t); - PyResult extern_eval(ExternContext*, uint8_t*, uint64_t); - Ident extern_identify(ExternContext*, Value*); - _Bool extern_equals(ExternContext*, Value*, Value*); - Value extern_clone_val(ExternContext*, Value*); - void extern_drop_handles(ExternContext*, Handle*, uint64_t); - Buffer extern_type_to_str(ExternContext*, TypeId); - Buffer extern_val_to_str(ExternContext*, Value*); - _Bool extern_satisfied_by(ExternContext*, Value*, Value*); - _Bool extern_satisfied_by_type(ExternContext*, Value*, TypeId*); - Value extern_store_list(ExternContext*, Value**, uint64_t, _Bool); - Value extern_store_bytes(ExternContext*, uint8_t*, uint64_t); - Value extern_store_i32(ExternContext*, int32_t); - Value extern_project(ExternContext*, Value*, uint8_t*, uint64_t, TypeId*); - Value extern_project_ignoring_type(ExternContext*, Value*, uint8_t*, uint64_t); - ValueBuffer extern_project_multi(ExternContext*, Value*, uint8_t*, uint64_t); - Value extern_create_exception(ExternContext*, uint8_t*, uint64_t); + void extern_log(ExternContext*, uint8_t, uint8_t*, uint64_t); + PyResult extern_call(ExternContext*, Value*, Value*, uint64_t); + PyGeneratorResponse extern_generator_send(ExternContext*, Value*, Value*); + PyResult extern_eval(ExternContext*, uint8_t*, uint64_t); + Ident extern_identify(ExternContext*, Value*); + _Bool extern_equals(ExternContext*, Value*, Value*); + Value extern_clone_val(ExternContext*, Value*); + void extern_drop_handles(ExternContext*, Handle*, uint64_t); + Buffer extern_type_to_str(ExternContext*, TypeId); + Buffer extern_val_to_str(ExternContext*, Value*); + _Bool extern_satisfied_by(ExternContext*, Value*, Value*); + _Bool extern_satisfied_by_type(ExternContext*, Value*, TypeId*); + Value extern_store_list(ExternContext*, Value**, uint64_t, _Bool); + Value extern_store_bytes(ExternContext*, uint8_t*, uint64_t); + Value extern_store_i32(ExternContext*, int32_t); + Value extern_project(ExternContext*, Value*, uint8_t*, uint64_t, TypeId*); + Value extern_project_ignoring_type(ExternContext*, Value*, uint8_t*, uint64_t); + ValueBuffer extern_project_multi(ExternContext*, Value*, uint8_t*, uint64_t); + Value extern_create_exception(ExternContext*, uint8_t*, uint64_t); } ''' @@ -459,6 +470,41 @@ def extern_create_exception(context_handle, msg_ptr, msg_len): msg = to_py_str(msg_ptr, msg_len) return c.to_value(Exception(msg)) + @ffi.def_extern() + def extern_generator_send(context_handle, func, arg): + """Given a generator, send it the given value and return a response.""" + c = ffi.from_handle(context_handle) + try: + res = c.from_value(func).send(c.from_value(arg)) + if isinstance(res, Get): + # Get. + values = [res.subject] + constraints = [constraint_for(res.product)] + tag = 2 + elif type(res) in (tuple, list): + # GetMulti. + values = [g.subject for g in res] + constraints = [constraint_for(g.product) for g in res] + tag = 3 + else: + # Break. + values = [res] + constraints = [] + tag = 0 + except Exception as e: + # Throw. + val = e + val._formatted_exc = traceback.format_exc() + values = [val] + constraints = [] + tag = 1 + + return ( + tag, + c.vals_buf([c.to_value(v) for v in values]), + c.vals_buf([c.to_value(v) for v in constraints]) + ) + @ffi.def_extern() def extern_call(context_handle, func, args_ptr, args_len): """Given a callable, call it.""" @@ -632,6 +678,7 @@ def init_externs(): self.ffi_lib.extern_log, logger.getEffectiveLevel(), self.ffi_lib.extern_call, + self.ffi_lib.extern_generator_send, self.ffi_lib.extern_eval, self.ffi_lib.extern_identify, self.ffi_lib.extern_equals, @@ -705,7 +752,8 @@ def new_scheduler(self, constraint_file, constraint_link, constraint_process_request, - constraint_process_result): + constraint_process_result, + constraint_generator): """Create and return an ExternContext and native Scheduler.""" def func(constraint): @@ -738,6 +786,7 @@ def tc(constraint): tc(constraint_link), tc(constraint_process_request), tc(constraint_process_result), + tc(constraint_generator), # Types. TypeId(self.context.to_id(six.text_type)), TypeId(self.context.to_id(six.binary_type)), diff --git a/src/python/pants/engine/scheduler.py b/src/python/pants/engine/scheduler.py index 3e98ea5fb1f5..5091597adc85 100644 --- a/src/python/pants/engine/scheduler.py +++ b/src/python/pants/engine/scheduler.py @@ -9,6 +9,7 @@ import os import time from collections import defaultdict +from types import GeneratorType from pants.base.exceptions import TaskError from pants.base.project_tree import Dir, File, Link @@ -111,6 +112,7 @@ def __init__(self, native, build_root, work_dir, ignore_patterns, rule_index): constraint_for(Link), constraint_for(ExecuteProcessRequest), constraint_for(ExecuteProcessResult), + constraint_for(GeneratorType), ) def _root_type_ids(self): diff --git a/src/python/pants/engine/selectors.py b/src/python/pants/engine/selectors.py index 6f6cda4203cb..ff7b58aebf2b 100644 --- a/src/python/pants/engine/selectors.py +++ b/src/python/pants/engine/selectors.py @@ -32,11 +32,30 @@ def constraint_for(type_or_constraint): raise TypeError("Expected a type or constraint: got: {}".format(type_or_constraint)) +class Get(datatype('Get', ['product', 'subject'])): + """TODO: Experimental synchronous generator API. + + May be called equivalently as either: + # verbose form: Get(product_type, subject_type, subject) + # shorthand form: Get(product_type, subject_type(subject)) + """ + + def __new__(cls, *args): + if len(args) == 2: + product, subject = args + elif len(args) == 3: + product, _, subject = args + else: + raise Exception('Expected either two or three arguments to {}; got {}.'.format( + Get.__name__, args)) + return super(Get, cls).__new__(cls, product, subject) + + class Selector(AbstractClass): - # The type constraint for the product type for this selector. @property def type_constraint(self): + """The type constraint for the product type for this selector.""" return constraint_for(self.product) @abstractproperty diff --git a/src/rust/engine/fs/src/lib.rs b/src/rust/engine/fs/src/lib.rs index 8e6564859b45..3560ebbe6184 100644 --- a/src/rust/engine/fs/src/lib.rs +++ b/src/rust/engine/fs/src/lib.rs @@ -697,19 +697,14 @@ pub trait VFS: Clone + Send + Sync + 'static { // If there were any new PathGlobs, continue the expansion. if expansion.todo.is_empty() { - future::Loop::Break(expansion) + future::Loop::Break(expansion.outputs) } else { future::Loop::Continue(expansion) } }) - }).map(|expansion| { - assert!( - expansion.todo.is_empty(), - "Loop shouldn't have exited with work to do: {:?}", - expansion.todo, - ); + }).map(|expansion_outputs| { // Finally, capture the resulting PathStats from the expansion. - expansion.outputs.into_iter().map(|(k, _)| k).collect() + expansion_outputs.into_iter().map(|(k, _)| k).collect() }) .to_boxed() } diff --git a/src/rust/engine/src/externs.rs b/src/rust/engine/src/externs.rs index 9e06ef4682b6..f02970272431 100644 --- a/src/rust/engine/src/externs.rs +++ b/src/rust/engine/src/externs.rs @@ -146,6 +146,33 @@ pub fn call(func: &Value, args: &[Value]) -> Result { with_externs(|e| (e.call)(e.context, func, args.as_ptr(), args.len() as u64)).into() } +pub fn generator_send(generator: &Value, arg: &Value) -> Result { + let response = with_externs(|e| (e.generator_send)(e.context, generator, arg)); + match response.res_type { + PyGeneratorResponseType::Break => Ok(GeneratorResponse::Break(response.values.unwrap_one())), + PyGeneratorResponseType::Throw => Err(PyResult::failure_from(response.values.unwrap_one())), + PyGeneratorResponseType::Get => { + let mut interns = INTERNS.write().unwrap(); + let constraint = TypeConstraint(interns.insert(response.constraints.unwrap_one())); + Ok(GeneratorResponse::Get(Get( + constraint, + interns.insert(response.values.unwrap_one()), + ))) + } + PyGeneratorResponseType::GetMulti => { + let mut interns = INTERNS.write().unwrap(); + let continues = response + .constraints + .to_vec() + .into_iter() + .zip(response.values.to_vec().into_iter()) + .map(|(c, v)| Get(TypeConstraint(interns.insert(c)), interns.insert(v))) + .collect(); + Ok(GeneratorResponse::GetMulti(continues)) + } + } +} + /// /// NB: Panics on failure. Only recommended for use with built-in functions, such as /// those configured in types::Types. @@ -206,84 +233,35 @@ where pub type ExternContext = raw::c_void; pub struct Externs { - context: *const ExternContext, - log: LogExtern, - log_level: u8, - call: CallExtern, - eval: EvalExtern, - identify: IdentifyExtern, - equals: EqualsExtern, - clone_val: CloneValExtern, - drop_handles: DropHandlesExtern, - satisfied_by: SatisfiedByExtern, - satisfied_by_type: SatisfiedByTypeExtern, - store_list: StoreListExtern, - store_bytes: StoreBytesExtern, - store_i32: StoreI32Extern, - project: ProjectExtern, - project_ignoring_type: ProjectIgnoringTypeExtern, - project_multi: ProjectMultiExtern, - type_to_str: TypeToStrExtern, - val_to_str: ValToStrExtern, - create_exception: CreateExceptionExtern, + pub context: *const ExternContext, + pub log_level: u8, + pub log: LogExtern, + pub call: CallExtern, + pub generator_send: GeneratorSendExtern, + pub eval: EvalExtern, + pub identify: IdentifyExtern, + pub equals: EqualsExtern, + pub clone_val: CloneValExtern, + pub drop_handles: DropHandlesExtern, + pub satisfied_by: SatisfiedByExtern, + pub satisfied_by_type: SatisfiedByTypeExtern, + pub store_list: StoreListExtern, + pub store_bytes: StoreBytesExtern, + pub store_i32: StoreI32Extern, + pub project: ProjectExtern, + pub project_ignoring_type: ProjectIgnoringTypeExtern, + pub project_multi: ProjectMultiExtern, + pub type_to_str: TypeToStrExtern, + pub val_to_str: ValToStrExtern, + pub create_exception: CreateExceptionExtern, // TODO: This type is also declared on `types::Types`. - py_str_type: TypeId, + pub py_str_type: TypeId, } // The pointer to the context is safe for sharing between threads. unsafe impl Sync for Externs {} unsafe impl Send for Externs {} -impl Externs { - pub fn new( - ext_context: *const ExternContext, - log: LogExtern, - log_level: u8, - call: CallExtern, - eval: EvalExtern, - identify: IdentifyExtern, - equals: EqualsExtern, - clone_val: CloneValExtern, - drop_handles: DropHandlesExtern, - type_to_str: TypeToStrExtern, - val_to_str: ValToStrExtern, - satisfied_by: SatisfiedByExtern, - satisfied_by_type: SatisfiedByTypeExtern, - store_list: StoreListExtern, - store_bytes: StoreBytesExtern, - store_i32: StoreI32Extern, - project: ProjectExtern, - project_ignoring_type: ProjectIgnoringTypeExtern, - project_multi: ProjectMultiExtern, - create_exception: CreateExceptionExtern, - py_str_type: TypeId, - ) -> Externs { - Externs { - context: ext_context, - log: log, - log_level: log_level, - call: call, - eval: eval, - identify: identify, - equals: equals, - clone_val: clone_val, - drop_handles: drop_handles, - satisfied_by: satisfied_by, - satisfied_by_type: satisfied_by_type, - store_list: store_list, - store_bytes: store_bytes, - store_i32: store_i32, - project: project, - project_ignoring_type: project_ignoring_type, - project_multi: project_multi, - type_to_str: type_to_str, - val_to_str: val_to_str, - create_exception: create_exception, - py_str_type: py_str_type, - } - } -} - pub type LogExtern = extern "C" fn(*const ExternContext, u8, str_ptr: *const u8, str_len: u64); // TODO: Type alias used to avoid rustfmt breaking itself by rendering a 101 character line. @@ -324,11 +302,17 @@ pub struct PyResult { value: Value, } +impl PyResult { + fn failure_from(v: Value) -> Failure { + let traceback = project_str(&v, "_formatted_exc"); + Failure::Throw(v, traceback) + } +} + impl From for Result { fn from(result: PyResult) -> Self { if result.is_throw { - let traceback = project_str(&result.value, "_formatted_exc"); - Err(Failure::Throw(result.value, traceback)) + Err(PyResult::failure_from(result.value)) } else { Ok(result.value) } @@ -350,6 +334,32 @@ impl From> for PyResult { } } +// Only constructed from the python side. +#[allow(dead_code)] +#[repr(u8)] +pub enum PyGeneratorResponseType { + Break = 0, + Throw = 1, + Get = 2, + GetMulti = 3, +} + +#[repr(C)] +pub struct PyGeneratorResponse { + res_type: PyGeneratorResponseType, + values: ValueBuffer, + constraints: ValueBuffer, +} + +#[derive(Debug)] +pub struct Get(pub TypeConstraint, pub Key); + +pub enum GeneratorResponse { + Break(Value), + Get(Get), + GetMulti(Vec), +} + // The result of an `identify` call, including the __hash__ of a Value and its TypeId. #[repr(C)] pub struct Ident { @@ -358,7 +368,13 @@ pub struct Ident { pub type_id: TypeId, } -// Points to an array containing a series of values allocated by Python. +/// +/// Points to an array containing a series of values allocated by Python. +/// +/// TODO: An interesting optimization might be possible where we avoid actually +/// allocating the values array for values_len == 1, and instead store the Value in +/// the `handle_` field. +/// #[repr(C)] pub struct ValueBuffer { values_ptr: *mut Value, @@ -375,6 +391,20 @@ impl ValueBuffer { |value_vec| unsafe { value_vec.iter().map(|v| v.clone_without_handle()).collect() }, ) } + + /// Asserts that the ValueBuffer contains one value, and returns it. + pub fn unwrap_one(&self) -> Value { + assert!( + self.values_len == 1, + "ValueBuffer contained more than one value: {}", + self.values_len + ); + with_vec( + self.values_ptr, + self.values_len as usize, + |value_vec| unsafe { value_vec.iter().next().unwrap().clone_without_handle() }, + ) + } } // Points to an array of TypeIds. @@ -467,6 +497,9 @@ pub type CreateExceptionExtern = pub type CallExtern = extern "C" fn(*const ExternContext, *const Value, *const Value, u64) -> PyResult; +pub type GeneratorSendExtern = + extern "C" fn(*const ExternContext, *const Value, *const Value) -> PyGeneratorResponse; + pub type EvalExtern = extern "C" fn(*const ExternContext, python_ptr: *const u8, python_len: u64) -> PyResult; diff --git a/src/rust/engine/src/lib.rs b/src/rust/engine/src/lib.rs index 30907d878394..e87bea657c86 100644 --- a/src/rust/engine/src/lib.rs +++ b/src/rust/engine/src/lib.rs @@ -42,10 +42,11 @@ use std::path::{Path, PathBuf}; use context::Core; use core::{Failure, Function, Key, TypeConstraint, TypeId, Value}; use externs::{Buffer, BufferBuffer, CallExtern, CloneValExtern, CreateExceptionExtern, - DropHandlesExtern, EqualsExtern, EvalExtern, ExternContext, Externs, IdentifyExtern, - LogExtern, ProjectExtern, ProjectIgnoringTypeExtern, ProjectMultiExtern, PyResult, - SatisfiedByExtern, SatisfiedByTypeExtern, StoreBytesExtern, StoreI32Extern, - StoreListExtern, TypeIdBuffer, TypeToStrExtern, ValToStrExtern}; + DropHandlesExtern, EqualsExtern, EvalExtern, ExternContext, Externs, + GeneratorSendExtern, IdentifyExtern, LogExtern, ProjectExtern, + ProjectIgnoringTypeExtern, ProjectMultiExtern, PyResult, SatisfiedByExtern, + SatisfiedByTypeExtern, StoreBytesExtern, StoreI32Extern, StoreListExtern, + TypeIdBuffer, TypeToStrExtern, ValToStrExtern}; use rule_graph::{GraphMaker, RuleGraph}; use scheduler::{ExecutionRequest, RootResult, Scheduler}; use tasks::Tasks; @@ -119,10 +120,11 @@ impl RawNodes { #[no_mangle] pub extern "C" fn externs_set( - ext_context: *const ExternContext, + context: *const ExternContext, log: LogExtern, log_level: u8, call: CallExtern, + generator_send: GeneratorSendExtern, eval: EvalExtern, identify: IdentifyExtern, equals: EqualsExtern, @@ -141,11 +143,12 @@ pub extern "C" fn externs_set( create_exception: CreateExceptionExtern, py_str_type: TypeId, ) { - externs::set_externs(Externs::new( - ext_context, + externs::set_externs(Externs { + context, log, log_level, call, + generator_send, eval, identify, equals, @@ -163,7 +166,7 @@ pub extern "C" fn externs_set( project_multi, create_exception, py_str_type, - )); + }); } #[no_mangle] @@ -206,6 +209,7 @@ pub extern "C" fn scheduler_create( type_link: TypeConstraint, type_process_request: TypeConstraint, type_process_result: TypeConstraint, + type_generator: TypeConstraint, type_string: TypeId, type_bytes: TypeId, build_root_buf: Buffer, @@ -244,6 +248,7 @@ pub extern "C" fn scheduler_create( link: type_link, process_request: type_process_request, process_result: type_process_result, + generator: type_generator, string: type_string, bytes: type_bytes, }, diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index 07cdb278e608..f23990b83ae8 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -908,6 +908,58 @@ impl Task { } } } + + /// + /// TODO: Merge with `get` once all edges are statically declared. + /// + fn gen_get(context: &Context, gets: Vec) -> NodeFuture> { + let get_futures = gets + .into_iter() + .map(|get| { + let externs::Get(constraint, subject) = get; + let selector = selectors::Select::without_variant(constraint.clone()); + let edges_res = context + .core + .rule_graph + .find_root_edges(*subject.type_id(), selectors::Selector::Select(selector)) + .ok_or_else(|| { + throw(&format!( + "No rules were available to compute {} for {}", + externs::key_to_str(&constraint.0), + externs::key_to_str(&subject) + )) + }); + let context = context.clone(); + future::result(edges_res).and_then(move |edges| { + Select::new(constraint, subject, Default::default(), &edges).run(context.clone()) + }) + }) + .collect::>(); + future::join_all(get_futures).to_boxed() + } + + /// + /// Given a python generator Value, loop to request the generator's dependencies until + /// it completes with a result Value. + /// + fn generate(context: Context, generator: Value) -> NodeFuture { + future::loop_fn(externs::eval("None").unwrap(), move |input| { + let context = context.clone(); + future::result(externs::generator_send(&generator, &input)).and_then(move |response| { + match response { + externs::GeneratorResponse::Get(get) => Self::gen_get(&context, vec![get]) + .map(|vs| future::Loop::Continue(vs.into_iter().next().unwrap())) + .to_boxed() as BoxFuture<_, _>, + externs::GeneratorResponse::GetMulti(gets) => Self::gen_get(&context, gets) + .map(|vs| future::Loop::Continue(externs::store_list(vs.iter().collect(), false))) + .to_boxed() as BoxFuture<_, _>, + externs::GeneratorResponse::Break(val) => { + future::ok(future::Loop::Break(val)).to_boxed() as BoxFuture<_, _> + } + } + }) + }).to_boxed() + } } impl Node for Task { @@ -923,11 +975,21 @@ impl Node for Task { .collect::>(), ); - let task = self.task.clone(); + let func = self.task.func.clone(); deps .then(move |deps_result| match deps_result { - Ok(deps) => externs::call(&externs::val_for(&task.func.0), &deps), - Err(err) => Err(err), + Ok(deps) => externs::call(&externs::val_for(&func.0), &deps), + Err(failure) => Err(failure), + }) + .then(move |task_result| match task_result { + Ok(val) => { + if externs::satisfied_by(&context.core.types.generator, &val) { + Self::generate(context, val) + } else { + ok(val) + } + } + Err(failure) => err(failure), }) .to_boxed() } diff --git a/src/rust/engine/src/types.rs b/src/rust/engine/src/types.rs index 3ad622b07e89..956a6c3f8c1b 100644 --- a/src/rust/engine/src/types.rs +++ b/src/rust/engine/src/types.rs @@ -22,6 +22,7 @@ pub struct Types { pub link: TypeConstraint, pub process_request: TypeConstraint, pub process_result: TypeConstraint, + pub generator: TypeConstraint, pub string: TypeId, pub bytes: TypeId, }