Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[pantsd] Add RunTracker stats. #5374

Merged
merged 5 commits into from
Jan 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/python/pants/bin/daemon_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ class DaemonPantsRunner(ProcessManager):
N.B. this class is primarily used by the PailgunService in pantsd.
"""

def __init__(self, socket, exiter, args, env, graph_helper, fork_lock, deferred_exception=None):
def __init__(self, socket, exiter, args, env, graph_helper, fork_lock, preceding_graph_size,
deferred_exception=None):
"""
:param socket socket: A connected socket capable of speaking the nailgun protocol.
:param Exiter exiter: The Exiter instance for this run.
Expand All @@ -82,6 +83,7 @@ def __init__(self, socket, exiter, args, env, graph_helper, fork_lock, deferred_
construction. In the event of an exception, this will be
None.
:param threading.RLock fork_lock: A lock to use during forking for thread safety.
:param int preceding_graph_size: The size of the graph pre-warming, for stats.
:param Exception deferred_exception: A deferred exception from the daemon's graph construction.
If present, this will be re-raised in the client context.
"""
Expand All @@ -92,6 +94,7 @@ def __init__(self, socket, exiter, args, env, graph_helper, fork_lock, deferred_
self._env = env
self._graph_helper = graph_helper
self._fork_lock = fork_lock
self._preceding_graph_size = preceding_graph_size
self._deferred_exception = deferred_exception

def _make_identity(self):
Expand Down Expand Up @@ -201,7 +204,9 @@ def post_fork_child(self):
self._raise_deferred_exc()

# Otherwise, conduct a normal run.
LocalPantsRunner(self._exiter, self._args, self._env, self._graph_helper).run()
runner = LocalPantsRunner(self._exiter, self._args, self._env, self._graph_helper)
runner.set_preceding_graph_size(self._preceding_graph_size)
runner.run()
except KeyboardInterrupt:
self._exiter.exit(1, msg='Interrupted by user.\n')
except Exception:
Expand Down
1 change: 1 addition & 0 deletions src/python/pants/bin/goal_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ def run(self):

try:
result = self._execute_engine()
self._context.set_resulting_graph_size_in_runtracker()
if result:
self._run_tracker.set_root_outcome(WorkUnit.FAILURE)
except KeyboardInterrupt:
Expand Down
7 changes: 7 additions & 0 deletions src/python/pants/bin/local_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ def __init__(self, exiter, args, env, daemon_build_graph=None, options_bootstrap
self._env = env
self._daemon_build_graph = daemon_build_graph
self._options_bootstrapper = options_bootstrapper
self._preceding_graph_size = -1

def set_preceding_graph_size(self, size):
  """Records the size of the daemon's resident product graph prior to this run's warming.

  Stored here so it can later be reported via the run tracker's pantsd stats
  (see the `run_tracker.pantsd_stats.set_preceding_graph_size` call in `_run`).

  :param int size: The node count of the pre-existing product graph.
  """
  self._preceding_graph_size = size

def run(self):
profile_path = self._env.get('PANTS_PROFILE')
Expand Down Expand Up @@ -70,6 +74,9 @@ def _run(self):
if repro:
repro.capture(run_tracker.run_info.get_as_dict())

# Record the preceding product graph size.
run_tracker.pantsd_stats.set_preceding_graph_size(self._preceding_graph_size)

# Setup and run GoalRunner.
goal_runner = GoalRunner.Factory(root_dir,
options,
Expand Down
6 changes: 6 additions & 0 deletions src/python/pants/goal/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ python_library(
],
)

python_library(
name = 'pantsd_stats',
sources = ['pantsd_stats.py'],
)

python_library(
name = 'products',
sources = ['products.py'],
Expand All @@ -76,6 +81,7 @@ python_library(
dependencies = [
':aggregated_timings',
':artifact_cache_stats',
':pantsd_stats',
'3rdparty/python:requests',
'3rdparty/python:pyopenssl',
'src/python/pants/base:build_environment',
Expand Down
8 changes: 6 additions & 2 deletions src/python/pants/goal/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ def __init__(self, options, run_tracker, target_roots,
self._workspace = workspace or (ScmWorkspace(self._scm) if self._scm else None)
self._replace_targets(target_roots)
self._invalidation_report = invalidation_report
# TODO(#4769): This should not be exposed to anyone.
# Note that the Context created in unit tests by BaseTest uses a different codepath.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This note is probably still useful...

(I think @stuhood had plans to unify them? :))

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that work should already be tracked in #4401, from what I can tell

self._scheduler = scheduler

@property
Expand Down Expand Up @@ -157,6 +155,12 @@ def __str__(self):
ident = Target.identify(self.targets())
return 'Context(id:{}, targets:{})'.format(ident, self.targets())

def set_resulting_graph_size_in_runtracker(self):
  """Records the current product graph node count in the run tracker's daemon stats.

  :returns: The node count that was recorded.
  """
  graph_size = self._scheduler.graph_len()
  self.run_tracker.pantsd_stats.set_resulting_graph_size(graph_size)
  return graph_size

def submit_background_work_chain(self, work_chain, parent_workunit_name=None):
"""
:API: public
Expand Down
26 changes: 26 additions & 0 deletions src/python/pants/goal/pantsd_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# coding=utf-8
# Copyright 2018 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from __future__ import (absolute_import, division, generators, nested_scopes, print_function,
unicode_literals, with_statement)


class PantsDaemonStats(object):
  """Tracks statistics about the pants daemon for stats reporting."""

  def __init__(self):
    # Both sizes are unknown until explicitly recorded by a setter.
    self.preceding_graph_size = None
    self.resulting_graph_size = None

  def set_preceding_graph_size(self, size):
    """Records the product graph size as it was before the run's warming."""
    self.preceding_graph_size = size

  def set_resulting_graph_size(self, size):
    """Records the product graph size as it stands after the run."""
    self.resulting_graph_size = size

  def get_all(self):
    """Returns all tracked daemon stats as a dict suitable for reporting."""
    return dict(preceding_graph_size=self.preceding_graph_size,
                resulting_graph_size=self.resulting_graph_size)
16 changes: 12 additions & 4 deletions src/python/pants/goal/run_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from pants.build_graph.target import Target
from pants.goal.aggregated_timings import AggregatedTimings
from pants.goal.artifact_cache_stats import ArtifactCacheStats
from pants.goal.pantsd_stats import PantsDaemonStats
from pants.reporting.report import Report
from pants.stats.statsdb import StatsDBFactory
from pants.subsystem.subsystem import Subsystem
Expand Down Expand Up @@ -92,6 +93,7 @@ def __init__(self, *args, **kwargs):
self.cumulative_timings = None
self.self_timings = None
self.artifact_cache_stats = None
self.pantsd_stats = None

# Initialized in `start()`.
self.report = None
Expand Down Expand Up @@ -160,8 +162,10 @@ def initialize(self):
# Initialize the run.
millis = int((self._run_timestamp * 1000) % 1000)
run_id = 'pants_run_{}_{}_{}'.format(
time.strftime('%Y_%m_%d_%H_%M_%S', time.localtime(self._run_timestamp)), millis,
uuid.uuid4().hex)
time.strftime('%Y_%m_%d_%H_%M_%S', time.localtime(self._run_timestamp)),
millis,
uuid.uuid4().hex
)

info_dir = os.path.join(self.get_options().pants_workdir, self.options_scope)
self.run_info_dir = os.path.join(info_dir, run_id)
Expand All @@ -182,8 +186,11 @@ def initialize(self):
self.self_timings = AggregatedTimings(os.path.join(self.run_info_dir, 'self_timings'))

# Hit/miss stats for the artifact cache.
self.artifact_cache_stats = \
ArtifactCacheStats(os.path.join(self.run_info_dir, 'artifact_cache_stats'))
self.artifact_cache_stats = ArtifactCacheStats(os.path.join(self.run_info_dir,
'artifact_cache_stats'))

# Daemon stats.
self.pantsd_stats = PantsDaemonStats()

return run_id

Expand Down Expand Up @@ -342,6 +349,7 @@ def store_stats(self):
'cumulative_timings': self.cumulative_timings.get_all(),
'self_timings': self.self_timings.get_all(),
'artifact_cache_stats': self.artifact_cache_stats.get_all(),
'pantsd_stats': self.pantsd_stats.get_all(),
'outcomes': self.outcomes
}
# Dump individual stat file.
Expand Down
38 changes: 21 additions & 17 deletions src/python/pants/pantsd/service/pailgun_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,25 +57,28 @@ def runner_factory(sock, arguments, environment):
graph_helper = None
deferred_exc = None

# Capture the size of the graph prior to any warming, for stats.
preceding_graph_size = self._scheduler_service.product_graph_len()
self._logger.debug('resident graph size: %s', preceding_graph_size)

self._logger.debug('execution commandline: %s', arguments)
if self._scheduler_service:
self._logger.debug('args are: %s', arguments)
options, _ = OptionsInitializer(OptionsBootstrapper(args=arguments)).setup(init_logging=False)
target_roots = self._target_roots_class.create(
options,
change_calculator=self._scheduler_service.change_calculator
options, _ = OptionsInitializer(OptionsBootstrapper(args=arguments)).setup(init_logging=False)
target_roots = self._target_roots_class.create(
options,
change_calculator=self._scheduler_service.change_calculator
)

try:
self._logger.debug('warming the product graph via %s', self._scheduler_service)
# N.B. This call is made in the pre-fork daemon context for reach and reuse of the
# resident scheduler.
graph_helper = self._scheduler_service.warm_product_graph(target_roots)
except Exception:
deferred_exc = sys.exc_info()
self._logger.warning(
'encountered exception during SchedulerService.warm_product_graph(), deferring:\n%s',
''.join(traceback.format_exception(*deferred_exc))
)
try:
self._logger.debug('warming the product graph via %s', self._scheduler_service)
# N.B. This call is made in the pre-fork daemon context for reach and reuse of the
# resident scheduler.
graph_helper = self._scheduler_service.warm_product_graph(target_roots)
except Exception:
deferred_exc = sys.exc_info()
self._logger.warning(
'encountered exception during SchedulerService.warm_product_graph(), deferring:\n%s',
''.join(traceback.format_exception(*deferred_exc))
)

return self._runner_class(
sock,
Expand All @@ -84,6 +87,7 @@ def runner_factory(sock, arguments, environment):
environment,
graph_helper,
self.fork_lock,
preceding_graph_size,
deferred_exc
)

Expand Down
7 changes: 7 additions & 0 deletions src/python/pants/pantsd/service/scheduler_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ def _process_event_queue(self):

self._event_queue.task_done()

def product_graph_len(self):
  """Returns the node count of the captive scheduler's product graph.

  :returns: The current size of the captive product graph.
  """
  return self._scheduler.graph_len()

def warm_product_graph(self, spec_roots):
"""Runs an execution request against the captive scheduler given a set of input specs to warm.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def test_stats_local_json_file(self):
self.assertIn('run_info', stats_json)
self.assertIn('self_timings', stats_json)
self.assertIn('cumulative_timings', stats_json)
self.assertIn('pantsd_stats', stats_json)

def test_workunit_failure(self):
pants_run = self.run_pants([
Expand Down