Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow nailgun execution for RscCompile by bundling together the tool classpaths #7092

Merged
merged 7 commits into from
Jan 18, 2019
204 changes: 122 additions & 82 deletions src/python/pants/backend/jvm/tasks/jvm_compile/rsc/rsc_compile.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from pants.util.contextutil import Timer
from pants.util.dirutil import (fast_relpath, fast_relpath_optional, maybe_read_file,
safe_file_dump, safe_mkdir)
from pants.util.memo import memoized_property


#
Expand Down Expand Up @@ -150,7 +151,8 @@ def register_options(cls, register):
],
custom_rules=[
Shader.exclude_package('rsc', recursive=True),
])
]
)
cls.register_jvm_tool(
register,
'metacp',
Expand All @@ -163,7 +165,8 @@ def register_options(cls, register):
],
custom_rules=[
Shader.exclude_package('scala', recursive=True),
])
]
)
cls.register_jvm_tool(
register,
'metai',
Expand All @@ -176,7 +179,31 @@ def register_options(cls, register):
],
custom_rules=[
Shader.exclude_package('scala', recursive=True),
])
]
)

# TODO: allow @memoized_method to convert lists into tuples so they can be hashed!
@memoized_property
def _nailgunnable_combined_classpath(self):
"""Register all of the component tools of the rsc compile task as a "combined" jvm tool.

This allows us to invoke their combined classpath in a single nailgun instance (see #7089 and
#7092). We still invoke their classpaths separately when not using nailgun, however.
"""
cp = []
for component_tool_name in ['rsc', 'metai', 'metacp']:
cp.extend(self.tool_classpath(component_tool_name))
# Add zinc's classpath so that it can be invoked from the same nailgun instance.
cp.extend(super(RscCompile, self).get_zinc_compiler_classpath())
return cp

# Overrides the normal zinc compiler classpath, which only contains zinc.
def get_zinc_compiler_classpath(self):
return self.do_for_execution_strategy_variant({
self.HERMETIC: lambda: super(RscCompile, self).get_zinc_compiler_classpath(),
self.SUBPROCESS: lambda: super(RscCompile, self).get_zinc_compiler_classpath(),
self.NAILGUN: lambda: self._nailgunnable_combined_classpath,
})

def register_extra_products_from_contexts(self, targets, compile_contexts):
super(RscCompile, self).register_extra_products_from_contexts(targets, compile_contexts)
Expand Down Expand Up @@ -747,87 +774,100 @@ def create_compile_context(self, target, target_workdir):
)
]

def _runtool(
self, main, tool_name, args, distribution, tgt=None, input_files=tuple(), input_digest=None, output_dir=None):
if self.execution_strategy == self.HERMETIC:
with self.context.new_workunit(tool_name) as wu:
tool_classpath_abs = self.tool_classpath(tool_name)
tool_classpath = fast_relpath_collection(tool_classpath_abs)
def _runtool_hermetic(self, main, tool_name, args, distribution, tgt=None, input_files=tuple(), input_digest=None, output_dir=None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's much cleaner having these broken out as methods. Thanks!

tool_classpath_abs = self.tool_classpath(tool_name)
tool_classpath = fast_relpath_collection(tool_classpath_abs)

classpath_for_cmd = os.pathsep.join(tool_classpath)
cmd = [
distribution.java,
]
cmd.extend(self.get_options().jvm_options)
cmd.extend(['-cp', classpath_for_cmd])
cmd.extend([main])
cmd.extend(args)

pathglobs = list(tool_classpath)
pathglobs.extend(f if os.path.isfile(f) else '{}/**'.format(f) for f in input_files)

if pathglobs:
root = PathGlobsAndRoot(
PathGlobs(tuple(pathglobs)),
text_type(get_buildroot()))
# dont capture snapshot, if pathglobs is empty
path_globs_input_digest = self.context._scheduler.capture_snapshots((root,))[0].directory_digest

if path_globs_input_digest and input_digest:
epr_input_files = self.context._scheduler.merge_directories(
(path_globs_input_digest, input_digest))
else:
epr_input_files = path_globs_input_digest or input_digest

epr = ExecuteProcessRequest(
argv=tuple(cmd),
input_files=epr_input_files,
output_files=tuple(),
output_directories=(output_dir,),
timeout_seconds=15*60,
description='run {} for {}'.format(tool_name, tgt),
# TODO: These should always be unicodes
# Since this is always hermetic, we need to use `underlying_dist`
jdk_home=text_type(self._zinc.underlying_dist.home),
)
res = self.context.execute_process_synchronously_without_raising(
epr,
self.name(),
[WorkUnitLabel.TOOL])

if res.exit_code != 0:
raise TaskError(res.stderr)

if output_dir:
dump_digest(output_dir, res.output_directory_digest)
self.context._scheduler.materialize_directories((
DirectoryToMaterialize(
# NB the first element here is the root to materialize into, not the dir to snapshot
text_type(get_buildroot()),
res.output_directory_digest),
))
# TODO drop a file containing the digest, named maybe output_dir.digest
return res
classpath_for_cmd = os.pathsep.join(tool_classpath)
cmd = [
distribution.java,
]
cmd.extend(self.get_options().jvm_options)
cmd.extend(['-cp', classpath_for_cmd])
cmd.extend([main])
cmd.extend(args)

pathglobs = list(tool_classpath)
pathglobs.extend(f if os.path.isfile(f) else '{}/**'.format(f) for f in input_files)

if pathglobs:
root = PathGlobsAndRoot(
PathGlobs(tuple(pathglobs)),
text_type(get_buildroot()))
# dont capture snapshot, if pathglobs is empty
path_globs_input_digest = self.context._scheduler.capture_snapshots((root,))[0].directory_digest

if path_globs_input_digest and input_digest:
epr_input_files = self.context._scheduler.merge_directories(
(path_globs_input_digest, input_digest))
else:
with self.context.new_workunit(tool_name) as wu:
result = self.runjava(classpath=self.tool_classpath(tool_name),
main=main,
jvm_options=self.get_options().jvm_options,
args=args,
workunit_name=tool_name,
workunit_labels=[WorkUnitLabel.TOOL],
dist=distribution
)
if result != 0:
raise TaskError('Running {} failed'.format(tool_name))
runjava_wu = None
for c in wu.children:
if c.name is tool_name:
runjava_wu = c
break
if runjava_wu is None:
raise Exception('couldnt find work unit for underlying execution')
return runjava_wu
epr_input_files = path_globs_input_digest or input_digest

epr = ExecuteProcessRequest(
argv=tuple(cmd),
input_files=epr_input_files,
output_files=tuple(),
output_directories=(output_dir,),
timeout_seconds=15*60,
description='run {} for {}'.format(tool_name, tgt),
# TODO: These should always be unicodes
# Since this is always hermetic, we need to use `underlying_dist`
jdk_home=text_type(self._zinc.underlying_dist.home),
)
res = self.context.execute_process_synchronously_without_raising(
epr,
self.name(),
[WorkUnitLabel.TOOL])

if res.exit_code != 0:
raise TaskError(res.stderr)

if output_dir:
dump_digest(output_dir, res.output_directory_digest)
self.context._scheduler.materialize_directories((
DirectoryToMaterialize(
# NB the first element here is the root to materialize into, not the dir to snapshot
text_type(get_buildroot()),
res.output_directory_digest),
))
# TODO drop a file containing the digest, named maybe output_dir.digest
return res

# The classpath is parameterized so that we can have a single nailgun instance serving all of our
# execution requests.
def _runtool_nonhermetic(self, parent_workunit, classpath, main, tool_name, args, distribution):
result = self.runjava(classpath=classpath,
main=main,
jvm_options=self.get_options().jvm_options,
args=args,
workunit_name=tool_name,
workunit_labels=[WorkUnitLabel.TOOL],
dist=distribution
)
if result != 0:
raise TaskError('Running {} failed'.format(tool_name))
runjava_workunit = None
for c in parent_workunit.children:
if c.name is tool_name:
runjava_workunit = c
break
# TODO: figure out and document when would this happen.
if runjava_workunit is None:
raise Exception('couldnt find work unit for underlying execution')
return runjava_workunit

def _runtool(self, main, tool_name, args, distribution,
tgt=None, input_files=tuple(), input_digest=None, output_dir=None):
with self.context.new_workunit(tool_name) as wu:
return self.do_for_execution_strategy_variant({
self.HERMETIC: lambda: self._runtool_hermetic(
main, tool_name, args, distribution,
tgt=tgt, input_files=input_files, input_digest=input_digest, output_dir=output_dir),
self.SUBPROCESS: lambda: self._runtool_nonhermetic(
wu, self.tool_classpath(tool_name), main, tool_name, args, distribution),
self.NAILGUN: lambda: self._runtool_nonhermetic(
wu, self._nailgunnable_combined_classpath, main, tool_name, args, distribution),
})

def _run_metai_tool(self,
distribution,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ def relative_to_exec_root(path):
# TODO: This should probably return a ClasspathEntry rather than a Digest
return res.output_directory_digest
else:
if self.runjava(classpath=[self._zinc.zinc],
if self.runjava(classpath=self.get_zinc_compiler_classpath(),
main=Zinc.ZINC_COMPILE_MAIN,
jvm_options=jvm_options,
args=zinc_args,
Expand All @@ -452,6 +452,16 @@ def relative_to_exec_root(path):
dist=self._zinc.dist):
raise TaskError('Zinc compile failed.')

def get_zinc_compiler_classpath(self):
"""Get the classpath for the zinc compiler JVM tool.

This will just be the zinc compiler tool classpath normally, but tasks which invoke zinc along
with other JVM tools with nailgun (such as RscCompile) require zinc to be invoked with this
method to ensure a single classpath is used for all the tools they need to invoke so that the
nailgun instance (which is keyed by classpath and JVM options) isn't invalidated.
"""
return [self._zinc.zinc]

def _verify_zinc_classpath(self, classpath, allow_dist=True):
def is_outside(path, putative_parent):
return os.path.relpath(path, putative_parent).startswith(os.pardir)
Expand Down
18 changes: 18 additions & 0 deletions src/python/pants/backend/jvm/tasks/nailgun_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,24 @@ class NailgunTaskBase(JvmToolTaskMixin, TaskBase):
SUBPROCESS = 'subprocess'
HERMETIC = 'hermetic'

class InvalidExecutionStrategyMapping(Exception): pass

_all_execution_strategies = frozenset([NAILGUN, SUBPROCESS, HERMETIC])

def do_for_execution_strategy_variant(self, mapping):
"""Invoke the method in `mapping` with the key corresponding to the execution strategy.

`mapping` is a dict mapping execution strategy -> zero-argument lambda.
"""
variants = frozenset(mapping.keys())
if variants != self._all_execution_strategies:
raise self.InvalidExecutionStrategyMapping(
'Must specify a mapping with exactly the keys {} (was: {})'
.format(self._all_execution_strategies, variants))
method_for_variant = mapping[self.execution_strategy]
# The methods need not return a value, but we pass it along if they do.
return method_for_variant()

@classmethod
def register_options(cls, register):
super(NailgunTaskBase, cls).register_options(register)
Expand Down
2 changes: 1 addition & 1 deletion src/python/pants/java/nailgun_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def run(this, stdout=None, stderr=None, stdin=None, cwd=None):

def _check_nailgun_state(self, new_fingerprint):
running = self.is_alive()
updated = self.needs_restart(new_fingerprint) or self.cmd != self._distribution.java
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this changing? I don't think it should be?

Copy link
Contributor Author

@cosmicexplorer cosmicexplorer Jan 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree! I mentioned:

Remove the mysterious or self.cmd != self._distribution.java in nailgun_executor.py -- this is necessary for this PR to work, but an option can be plumbed in if it breaks pantsd

When adding a ton of debug logging I realized the nailguns were restarting despite matching fingerprints because of this line of code, and everything immediately started working after I removed it. I'm of course very concerned this will silently break pantsd, and an option can be plumbed through to avoid removing this check for specific nailguns, but the logic for nailgun and pailgun isn't clearly demarcated -- #6579 makes the distinction between nailgun and pailgun implementations more clear and could make this process easier.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could delve more into why the restarting is occurring, because to start at least self.cmd is a list, not a string like self._distribution.java.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be good to see whether this is still necessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think cmd is a string representation of command (

), but that doesn't clear up why we're comparing that string to self._distribution.java, which is the location of java being used.

I think self.cmd != self._distribution.java would only be False if the cmd was to just run java without any arguments.

Copy link
Contributor Author

@cosmicexplorer cosmicexplorer Jan 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is still necessary, if you run e.g. ./pants test tests/python/pants_test/backend/jvm/tasks/jvm_compile/rsc: -- -vs -k test_executing_multi_target_binary_nonhermetic it will reliably fail with the error described in the OP without this change, and that's not a fluke. Since the git log of that line only shows moving it from elsewhere (the first commit from git log -L 156,156:src/python/pants/java/nailgun_executor.py is 0d62074), I don't know how to evaluate this. It would be great if there were comments or tests about it, but in the worst case if we start seeing random pantsd issues and #6579 hasn't been merged yet (which I suspect would make all of this much more clear), we know what to fix. I also thought it was strange that it was comparing it to a bare java command and was wondering if it was necessary a long time ago and not anymore.

Copy link
Contributor Author

@cosmicexplorer cosmicexplorer Jan 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I could see that being used as a testing hack to invalidate dummy java processes, for example, but I don't know where to find more context about this)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wrong, looking at the wrong level of nesting. self.cmd in this context is from the ProcessManager, not the Runner. It's the first element in the command line.

def cmd(self):
"""The first element of the process commandline e.g. '/usr/bin/python2.7'.
:returns: The first element of the process command line or else `None` if the underlying
process has died.
"""

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it confusing that cmd has so many meanings in that file.

Copy link
Contributor Author

@cosmicexplorer cosmicexplorer Jan 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep!!!! Thank you for finding that, I thought I had tracked it down to executor.py.

updated = self.needs_restart(new_fingerprint)
logging.debug('Nailgun {nailgun} state: updated={up!s} running={run!s} fingerprint={old_fp} '
'new_fingerprint={new_fp} distribution={old_dist} new_distribution={new_dist}'
.format(nailgun=self._identity, up=updated, run=running,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,34 @@
from __future__ import absolute_import, division, print_function, unicode_literals

import os
from functools import wraps

from pants.util.contextutil import temporary_dir
from pants_test.backend.jvm.tasks.jvm_compile.base_compile_integration_test import BaseCompileIT


def _execution_strategies(strategies, workers_range=[1]):
def wrapper(func):
@wraps(func)
def wrapper_self(*args, **kwargs):
for worker_count in workers_range:
for strategy in strategies:
func(*args, execution_strategy=strategy, worker_count=worker_count, **kwargs)
return wrapper_self
return wrapper


class RscCompileIntegration(BaseCompileIT):
def test_basic_binary(self):
@_execution_strategies(['nailgun', 'subprocess'])
def test_basic_binary_nonhermetic(self, execution_strategy, worker_count):
with temporary_dir() as cache_dir:
config = {
'cache.compile.rsc': {'write_to': [cache_dir]},
'jvm-platform': {'compiler': 'rsc'},
'compile.rsc': {'execution_strategy': 'subprocess'},
'compile.rsc': {
'execution_strategy': execution_strategy,
'worker_count': worker_count,
},
}

pants_run = self.run_pants(
Expand Down Expand Up @@ -59,12 +75,16 @@ def test_basic_binary_hermetic(self):
'compile/rsc/current/.scala-library-synthetic/current/rsc/index/scala-library-synthetics.jar')
self.assertTrue(os.path.exists(path))

def test_executing_multi_target_binary(self):
@_execution_strategies(['nailgun', 'subprocess'], [2])
def test_executing_multi_target_binary_nonhermetic(self, execution_strategy, worker_count):
with temporary_dir() as cache_dir:
config = {
'cache.compile.rsc': {'write_to': [cache_dir]},
'jvm-platform': {'compiler': 'rsc'},
'compile.rsc': {'execution_strategy': 'subprocess'}
'compile.rsc': {
'execution_strategy': execution_strategy,
'worker_count': worker_count,
}
}
with self.temporary_workdir() as workdir:
pants_run = self.run_pants_with_workdir(
Expand Down Expand Up @@ -97,14 +117,16 @@ def test_executing_multi_target_binary_hermetic(self):
self.assert_success(pants_run)
self.assertIn('Hello, Resource World!', pants_run.stdout_data)

def test_java_with_transitive_exported_scala_dep(self):
@_execution_strategies(['nailgun', 'subprocess'], [2])
def test_java_with_transitive_exported_scala_dep_nonhermetic(self, execution_strategy, worker_count):
with temporary_dir() as cache_dir:
config = {
'cache.compile.rsc': {'write_to': [cache_dir]},
'jvm-platform': {'compiler': 'rsc'},
'compile.rsc': {
'execution_strategy': 'subprocess',
'worker_count': 1}
'execution_strategy': execution_strategy,
'worker_count': worker_count,
},
}
with self.temporary_workdir() as workdir:
pants_run = self.run_pants_with_workdir(
Expand Down