Port IsolatedProcess implementation from Python to Rust - Split 1 (#5239)

* Port IsolatedProcess implementation from Python to Rust - Part 1
Problem
From #4397 - Isolated processes currently work for the purposes of testing, but there are lots of abstraction leaks: in particular, a _Snapshots object is exposed, which the Python code uses to reverse-engineer the location of snapshots.
Instead, process execution should be ported to Rust (maybe using duct?) to remove that leak.
Additionally, the outcome of discussion around https://docs.google.com/document/d/1jGq34ds_fhRLPDnmHNV_FHcJiJnbhmk2XEwOf9-w5To/edit#heading=h.ylneg9wnwxcs was that we should align the process execution model (which is implicitly also a "remote process execution" model) more closely with the draft of Bazel's process execution model.

Solution
This is the first split of the bigger branch (master...ity:ity/cloc_rules) that ports the implementation to Rust. This PR can go in independently and will make it easier to work with smaller branches.

Changes made in this PR:
Invoke process execution in Rust from Python.
Plumbing for ExecuteProcessRequest/ExecuteProcessResult (see the sketch after this list).
cloc can be invoked from the command line, which exercises the Rust process execution path.
A test (test_javac_version_example) added in Python, which invokes Rust through engine rules.
Removed directories_to_create after discussion with @illicitonion.
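A minimal sketch of the new Python-side flow this PR plumbs through, mirroring the cloc.py change below. Only the names from the diff are real; the scheduler parameter is assumed to be the LocalScheduler handle that goal_runner.py now threads onto the Context:

import os

from pants.engine.isolated_process import ExecuteProcessRequest, ExecuteProcessResult

def run_echo(scheduler):
  # argv must be a tuple; ExecuteProcessRequest.__new__ rejects anything else.
  req = ExecuteProcessRequest(
    argv=('/usr/bin/env', 'echo', 'hello'),
    env=('PATH', os.environ.get('PATH', '')),
  )
  # product_request returns one product per request in the list.
  result, = scheduler.product_request(ExecuteProcessResult, [req])
  return result.exit_code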
ity authored and stuhood committed Jan 24, 2018
1 parent 9ecbdce commit 01ac566
Showing 14 changed files with 301 additions and 50 deletions.
49 changes: 34 additions & 15 deletions src/python/pants/backend/graph_info/tasks/cloc.py
@@ -11,6 +11,7 @@
from pants.base.exceptions import TaskError
from pants.base.workunit import WorkUnitLabel
from pants.binaries.binary_util import BinaryUtil
from pants.engine.isolated_process import ExecuteProcessRequest, ExecuteProcessResult
from pants.task.console_task import ConsoleTask
from pants.util.contextutil import temporary_dir
from pants.util.process_handler import subprocess
@@ -44,6 +45,10 @@ def console_output(self, targets):
buildroot = get_buildroot()
with temporary_dir() as tmpdir:
# Write the paths of all files we want cloc to process to the so-called 'list file'.
# TODO: 1) list_file, report_file and ignored_file should be relative files within the
# execution "chroot", 2) list_file should be part of an input files Snapshot, and
# 3) report_file and ignored_file should be part of an output files Snapshot, when we have
# that capability.
list_file = os.path.join(tmpdir, 'list_file')
with open(list_file, 'w') as list_file_out:
for target in targets:
@@ -53,22 +58,36 @@

report_file = os.path.join(tmpdir, 'report_file')
ignored_file = os.path.join(tmpdir, 'ignored')
cloc_script = self._get_cloc_script()
# See http://cloc.sourceforge.net/#options for cloc cmd-line options.
cmd = [cloc_script,
'--skip-uniqueness',
'--ignored={}'.format(ignored_file),
'--list-file={}'.format(list_file),
'--report-file={}'.format(report_file)]
with self.context.new_workunit(name='cloc',
labels=[WorkUnitLabel.TOOL],
cmd=' '.join(cmd)) as workunit:
result = subprocess.call(cmd,
stdout=workunit.output('stdout'),
stderr=workunit.output('stderr'))

if result != 0:
raise TaskError('{} ... exited non-zero ({}).'.format(' '.join(cmd), result))
# TODO: Look at how to make BinaryUtil support Snapshots - such as adding an intrinsic to do a
# network fetch directly into a Snapshot.
# See http://cloc.sourceforge.net/#options for cloc cmd-line options.
cmd = (
self._get_cloc_script(),
'--skip-uniqueness',
'--ignored={}'.format(ignored_file),
'--list-file={}'.format(list_file),
'--report-file={}'.format(report_file)
)
if self.context._scheduler is None:
with self.context.new_workunit(
name='cloc',
labels=[WorkUnitLabel.TOOL],
cmd=' '.join(cmd)) as workunit:
result = subprocess.call(
cmd,
stdout=workunit.output('stdout'),
stderr=workunit.output('stderr')
)
else:
# TODO: Longer term we need to figure out what to put on $PATH in a remote execution env.
# Currently, we are adding everything within $PATH to the request.
env_path = ['PATH', os.environ.get('PATH')]
req = ExecuteProcessRequest(cmd, env_path)
execute_process_result, = self.context._scheduler.product_request(ExecuteProcessResult, [req])
exit_code = execute_process_result.exit_code
if exit_code != 0:
raise TaskError('{} ... exited non-zero ({}).'.format(' '.join(cmd), exit_code))

with open(report_file, 'r') as report_file_in:
for line in report_file_in.read().split('\n'):
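Note the design choice above: when the Context has no v2 scheduler (self.context._scheduler is None), the task keeps the existing subprocess.call path inside a workunit; otherwise the same cmd tuple is satisfied by the Rust engine via product_request.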
4 changes: 3 additions & 1 deletion src/python/pants/bin/engine_initializer.py
@@ -12,6 +12,7 @@
from pants.base.file_system_project_tree import FileSystemProjectTree
from pants.engine.build_files import create_graph_rules
from pants.engine.fs import create_fs_rules
from pants.engine.isolated_process import create_process_rules
from pants.engine.legacy.address_mapper import LegacyAddressMapper
from pants.engine.legacy.change_calculator import EngineChangeCalculator
from pants.engine.legacy.graph import HydratedTargets, LegacyBuildGraph, create_legacy_graph_tasks
@@ -168,7 +169,8 @@ def setup_legacy_graph(pants_ignore_patterns,
tasks = (
create_legacy_graph_tasks(symbol_table) +
create_fs_rules() +
create_graph_rules(address_mapper, symbol_table)
create_graph_rules(address_mapper, symbol_table) +
create_process_rules()
)

scheduler = LocalScheduler(workdir, dict(), tasks, project_tree, native, include_trace_on_error=include_trace_on_error)
11 changes: 6 additions & 5 deletions src/python/pants/bin/goal_runner.py
@@ -90,7 +90,7 @@ def _init_graph(self, use_engine, pants_ignore_patterns, build_ignore_patterns,
:param list target_specs: The original target specs.
:param LegacyGraphHelper graph_helper: A LegacyGraphHelper to use for graph construction,
if available. This would usually come from the daemon.
:returns: A tuple of (BuildGraph, AddressMapper, spec_roots).
:returns: A tuple of (BuildGraph, AddressMapper, opt Scheduler, spec_roots).
"""
# N.B. Use of the daemon implies use of the v2 engine.
if graph_helper or use_engine:
@@ -114,15 +114,15 @@ def _init_graph(self, use_engine, pants_ignore_patterns, build_ignore_patterns,
build_root=self._root_dir,
change_calculator=graph_helper.change_calculator)
graph, address_mapper = graph_helper.create_build_graph(target_roots, self._root_dir)
return graph, address_mapper, target_roots.as_specs()
return graph, address_mapper, graph_helper.scheduler, target_roots.as_specs()
else:
spec_roots = TargetRoots.parse_specs(target_specs, self._root_dir)
address_mapper = BuildFileAddressMapper(self._build_file_parser,
get_project_tree(self._global_options),
build_ignore_patterns,
exclude_target_regexps,
subproject_build_roots)
return MutableBuildGraph(address_mapper), address_mapper, spec_roots
return MutableBuildGraph(address_mapper), address_mapper, None, spec_roots

def _determine_goals(self, requested_goals):
"""Check and populate the requested goals for a given run."""
@@ -163,7 +163,7 @@ def _should_be_quiet(self, goals):

def _setup_context(self):
with self._run_tracker.new_workunit(name='setup', labels=[WorkUnitLabel.SETUP]):
self._build_graph, self._address_mapper, spec_roots = self._init_graph(
self._build_graph, self._address_mapper, scheduler, spec_roots = self._init_graph(
self._global_options.enable_v2_engine,
self._global_options.pants_ignore,
self._global_options.build_ignore,
@@ -194,7 +194,8 @@ def _setup_context(self):
build_graph=self._build_graph,
build_file_parser=self._build_file_parser,
address_mapper=self._address_mapper,
invalidation_report=invalidation_report)
invalidation_report=invalidation_report,
scheduler=scheduler)
return goals, context

def setup(self):
78 changes: 75 additions & 3 deletions src/python/pants/engine/isolated_process.py
@@ -11,7 +11,7 @@
from abc import abstractproperty
from binascii import hexlify

from pants.engine.rules import SingletonRule, TaskRule
from pants.engine.rules import RootRule, SingletonRule, TaskRule, rule
from pants.engine.selectors import Select
from pants.util.contextutil import open_tar, temporary_dir
from pants.util.dirutil import safe_mkdir
@@ -29,7 +29,7 @@ def _run_command(binary, sandbox_dir, process_request):
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
cwd=sandbox_dir)
# TODO At some point, we may want to replace this blocking wait with a timed one that returns
# TODO: At some point, we may want to replace this blocking wait with a timed one that returns
# some kind of in progress state.
popen.wait()
logger.debug('Done running command in {}'.format(sandbox_dir))
@@ -58,7 +58,6 @@ def _snapshotted_process(input_conversion,
Receives two conversion functions, some required inputs, and the user-declared inputs.
"""

process_request = input_conversion(*args)

# TODO: resolve what to do with output files, then ensure these tmp dirs are cleaned up.
@@ -82,6 +81,20 @@ def _snapshotted_process(input_conversion,
return output_conversion(process_result, sandbox_dir)


def _setup_process_execution(input_conversion, *args):
"""A pickleable top-level function to setup pre-execution.
"""
return input_conversion(*args)


def _post_process_execution(output_conversion, *args):
"""A pickleable top-level function to execute a process.
Receives two conversion functions, some required inputs, and the user-declared inputs.
"""
return output_conversion(*args)


class Binary(object):
"""Binary in the product graph.
@@ -159,3 +172,62 @@ def create_snapshot_rules():
return [
SingletonRule(_Snapshots, _Snapshots('/dev/null'))
]


class ExecuteProcess(object):
"""A static helper for defining a task rule to execute a process."""

def __new__(cls, *args):
raise ValueError('Use `create` to declare a task function representing a process.')

@staticmethod
def create_in(product_type, input_selectors, input_conversion):
# TODO: combine the create_in/create_out functions
func = functools.partial(_setup_process_execution, input_conversion)
func.__name__ = '{}_and_then_execute_process'.format(input_conversion.__name__)
inputs = list(input_selectors)

# Return a TaskRule that executes the function to produce the product type.
return TaskRule(product_type, inputs, func)

@staticmethod
def create_out(product_type, input_selectors, output_conversion):
func = functools.partial(_post_process_execution,
output_conversion)
func.__name__ = 'execute_process_and_then_{}'.format(output_conversion.__name__)
inputs = list(input_selectors)

# Return a TaskRule that executes the function to produce the product type.
return TaskRule(product_type, inputs, func)


class ExecuteProcessRequest(datatype('ExecuteProcessRequest', ['argv', 'env'])):
"""Request for execution with args and snapshots to extract."""

def __new__(cls, argv, env):
"""
:param argv: Arguments to the process being run.
:param env: A flat tuple of alternating environment variable names and values.
"""
if not isinstance(argv, tuple):
raise ValueError('argv must be a tuple.')
return super(ExecuteProcessRequest, cls).__new__(cls, argv, tuple(env))


class ExecuteProcessResult(datatype('ExecuteProcessResult', ['stdout', 'stderr', 'exit_code'])):
pass


def create_process_rules():
"""Intrinsically replaced on the rust side."""
return [execute_process_noop, RootRule(ExecuteProcessRequest)]


@rule(ExecuteProcessResult, [Select(ExecuteProcessRequest)])
def execute_process_noop(*args):
raise Exception('This task is replaced intrinsically, and should never run.')
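For orientation, a hedged sketch of how create_in/create_out can pair up to declare a process-backed rule, in the spirit of the test_javac_version_example mentioned in the commit message. JavacBinary, JavacVersionOutput, and both conversion functions are hypothetical names for illustration, not part of this diff:

from pants.engine.isolated_process import (ExecuteProcess, ExecuteProcessRequest,
                                           ExecuteProcessResult)
from pants.engine.selectors import Select

# Hypothetical product types, defined only so the sketch is self-contained.
class JavacBinary(object):
  path = '/usr/bin/javac'

class JavacVersionOutput(object):
  def __init__(self, value):
    self.value = value

def javac_version_args(javac_binary):
  # Input conversion: build the request the engine will execute.
  return ExecuteProcessRequest(argv=(javac_binary.path, '-version'), env=())

def parse_javac_version(process_result):
  # Output conversion: interpret the raw process result.
  return JavacVersionOutput(process_result.stderr)

rules = [
  ExecuteProcess.create_in(ExecuteProcessRequest, [Select(JavacBinary)], javac_version_args),
  ExecuteProcess.create_out(JavacVersionOutput, [Select(ExecuteProcessResult)], parse_javac_version),
]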




21 changes: 20 additions & 1 deletion src/python/pants/engine/native.py
@@ -98,6 +98,7 @@
typedef _Bool (*extern_ptr_satisfied_by_type)(ExternContext*, TypeConstraint*, TypeId*);
typedef Value (*extern_ptr_store_list)(ExternContext*, Value**, uint64_t, _Bool);
typedef Value (*extern_ptr_store_bytes)(ExternContext*, uint8_t*, uint64_t);
typedef Value (*extern_ptr_store_i32)(ExternContext*, int32_t);
typedef Value (*extern_ptr_project)(ExternContext*, Value*, uint8_t*, uint64_t, TypeId*);
typedef ValueBuffer (*extern_ptr_project_multi)(ExternContext*, Value*, uint8_t*, uint64_t);
typedef Value (*extern_ptr_project_ignoring_type)(ExternContext*, Value*, uint8_t*, uint64_t);
@@ -136,6 +137,7 @@
extern_ptr_satisfied_by_type,
extern_ptr_store_list,
extern_ptr_store_bytes,
extern_ptr_store_i32,
extern_ptr_project,
extern_ptr_project_ignoring_type,
extern_ptr_project_multi,
@@ -163,6 +165,9 @@
Function,
Function,
Function,
Function,
TypeConstraint,
TypeConstraint,
TypeConstraint,
TypeConstraint,
TypeConstraint,
@@ -217,6 +222,7 @@
_Bool extern_satisfied_by_type(ExternContext*, TypeConstraint*, TypeId*);
Value extern_store_list(ExternContext*, Value**, uint64_t, _Bool);
Value extern_store_bytes(ExternContext*, uint8_t*, uint64_t);
Value extern_store_i32(ExternContext*, int32_t);
Value extern_project(ExternContext*, Value*, uint8_t*, uint64_t, TypeId*);
Value extern_project_ignoring_type(ExternContext*, Value*, uint8_t*, uint64_t);
ValueBuffer extern_project_multi(ExternContext*, Value*, uint8_t*, uint64_t);
@@ -359,6 +365,12 @@ def extern_store_bytes(context_handle, bytes_ptr, bytes_len):
c = ffi.from_handle(context_handle)
return c.to_value(bytes(ffi.buffer(bytes_ptr, bytes_len)))

@ffi.def_extern()
def extern_store_i32(context_handle, i32):
"""Given a context and int32_t, return a new Value to represent the int32_t."""
c = ffi.from_handle(context_handle)
return c.to_value(i32)

@ffi.def_extern()
def extern_project(context_handle, val, field_str_ptr, field_str_len, type_id):
"""Given a Value for `obj`, a field name, and a type, project the field as a new Value."""
Expand Down Expand Up @@ -626,6 +638,7 @@ def init_externs():
self.ffi_lib.extern_satisfied_by_type,
self.ffi_lib.extern_store_list,
self.ffi_lib.extern_store_bytes,
self.ffi_lib.extern_store_i32,
self.ffi_lib.extern_project,
self.ffi_lib.extern_project_ignoring_type,
self.ffi_lib.extern_project_multi,
Expand Down Expand Up @@ -676,6 +689,7 @@ def new_scheduler(self,
construct_dir,
construct_file,
construct_link,
construct_process_result,
constraint_has_products,
constraint_address,
constraint_variants,
Expand All @@ -685,7 +699,9 @@ def new_scheduler(self,
constraint_files_content,
constraint_dir,
constraint_file,
constraint_link):
constraint_link,
constraint_process_request,
constraint_process_result):
"""Create and return an ExternContext and native Scheduler."""

def tc(constraint):
Expand All @@ -702,6 +718,7 @@ def tc(constraint):
Function(self.context.to_id(construct_dir)),
Function(self.context.to_id(construct_file)),
Function(self.context.to_id(construct_link)),
Function(self.context.to_id(construct_process_result)),
# TypeConstraints.
tc(constraint_address),
tc(constraint_has_products),
Expand All @@ -713,6 +730,8 @@ def tc(constraint):
tc(constraint_dir),
tc(constraint_file),
tc(constraint_link),
tc(constraint_process_request),
tc(constraint_process_result),
# Types.
TypeId(self.context.to_id(six.text_type)),
TypeId(self.context.to_id(six.binary_type)),
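A note on how the new extern composes, as this reader understands the diff: on the Rust side, stdout/stderr are stored as engine Values via extern_store_bytes and the exit code via the new extern_store_i32; those Values are then handed to the registered construct_process_result Function, which per the scheduler.py diff below is ExecuteProcessResult itself. A minimal Python analogue of that last step:

from pants.engine.isolated_process import ExecuteProcessResult

# The constructor registered as construct_process_result in scheduler.py:
result = ExecuteProcessResult(stdout=b'out', stderr=b'err', exit_code=0)
assert result.exit_code == 0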
6 changes: 5 additions & 1 deletion src/python/pants/engine/scheduler.py
@@ -15,7 +15,8 @@
from pants.build_graph.address import Address
from pants.engine.addressable import SubclassesOf
from pants.engine.fs import FileContent, FilesContent, Path, PathGlobs, Snapshot
from pants.engine.isolated_process import _Snapshots, create_snapshot_rules
from pants.engine.isolated_process import (ExecuteProcessRequest, ExecuteProcessResult, _Snapshots,
create_snapshot_rules)
from pants.engine.native import Function, TypeConstraint, TypeId
from pants.engine.nodes import Return, State, Throw
from pants.engine.rules import RuleIndex, SingletonRule, TaskRule
@@ -97,6 +98,7 @@ def __init__(self, native, build_root, work_dir, ignore_patterns, rule_index):
Dir,
File,
Link,
ExecuteProcessResult,
has_products_constraint,
constraint_for(Address),
constraint_for(Variants),
@@ -107,6 +109,8 @@ def __init__(self, native, build_root, work_dir, ignore_patterns, rule_index):
constraint_for(Dir),
constraint_for(File),
constraint_for(Link),
constraint_for(ExecuteProcessRequest),
constraint_for(ExecuteProcessResult),
)

def _root_type_ids(self):
Expand Down
5 changes: 4 additions & 1 deletion src/python/pants/goal/context.py
@@ -61,7 +61,7 @@ def fatal(self, *msg_elements):
def __init__(self, options, run_tracker, target_roots,
requested_goals=None, target_base=None, build_graph=None,
build_file_parser=None, address_mapper=None, console_outstream=None, scm=None,
workspace=None, invalidation_report=None):
workspace=None, invalidation_report=None, scheduler=None):
self._options = options
self.build_graph = build_graph
self.build_file_parser = build_file_parser
@@ -80,6 +80,9 @@ def __init__(self, options, run_tracker, target_roots,
self._workspace = workspace or (ScmWorkspace(self._scm) if self._scm else None)
self._replace_targets(target_roots)
self._invalidation_report = invalidation_report
# TODO(#4769): This should not be exposed to anyone.
# Note that the Context created in unit tests by BaseTest uses a different codepath.
self._scheduler = scheduler

@property
def options(self):