Pantsd terminates if its pidfile changes (#5877)
This avoids stray pantsd processes. I'm not sure how these have ended up
appearing, but I've had multiple reports of this happening. Hopefully
the log it adds will provide sufficient information to help debug the
root cause, but this is a secondary aim.
illicitonion authored and Stu Hood committed Jun 4, 2018
1 parent 4673459 commit d1749d1
Showing 6 changed files with 145 additions and 27 deletions.
14 changes: 13 additions & 1 deletion src/python/pants/pantsd/pants_daemon.py
@@ -165,11 +165,23 @@ def _setup_services(build_root, bootstrap_options, legacy_graph_scheduler, watch
bootstrap_options.pantsd_fs_event_workers
)

pidfile_absolute = PantsDaemon.metadata_file_path('pantsd', 'pid', bootstrap_options.pants_subprocessdir)
if pidfile_absolute.startswith(build_root):
  pidfile = os.path.relpath(pidfile_absolute, build_root)
else:
  pidfile = None
  logging.getLogger(__name__).warning(
    'Not watching pantsd pidfile because subprocessdir is outside of buildroot. Having '
    'subprocessdir be a child of buildroot (as it is by default) may help avoid stray '
    'pantsd processes.'
  )

scheduler_service = SchedulerService(
fs_event_service,
legacy_graph_scheduler,
build_root,
bootstrap_options.pantsd_invalidation_globs
bootstrap_options.pantsd_invalidation_globs,
pidfile,
)

pailgun_service = PailgunService(
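For readers skimming the hunk above: the daemon only watches its pidfile when that file sits under the build root, because the Watchman subscription (added in fs_event_service.py below) is expressed as a build-root-relative path. A minimal standalone sketch of that decision, with illustrative paths and the default subprocessdir layout assumed:

```python
import logging
import os


def relative_pidfile_path(pidfile_absolute, build_root):
  """Sketch of the branch added to _setup_services: return the pidfile path relative to the
  build root, or None (after a warning) when the pidfile lives outside the build root."""
  if pidfile_absolute.startswith(build_root):
    return os.path.relpath(pidfile_absolute, build_root)
  logging.getLogger(__name__).warning('Not watching pantsd pidfile because subprocessdir '
                                      'is outside of buildroot.')
  return None


# Illustrative only: with the default subprocessdir under the build root, a watch is registered.
print(relative_pidfile_path('/repo/.pids/pantsd/pid', '/repo'))  # -> .pids/pantsd/pid
print(relative_pidfile_path('/tmp/pids/pantsd/pid', '/repo'))    # -> None, with a warning
```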
22 changes: 15 additions & 7 deletions src/python/pants/pantsd/process_manager.py
@@ -148,16 +148,24 @@ def file_waiter():
action_msg = 'file {} to appear'.format(filename)
return cls._deadline_until(file_waiter, action_msg, timeout=timeout)

def _get_metadata_dir_by_name(self, name):
@staticmethod
def _get_metadata_dir_by_name(name, metadata_base_dir):
"""Retrieve the metadata dir by name.
This should always live outside of the workdir to survive a clean-all.
"""
return os.path.join(self._metadata_base_dir, name)
return os.path.join(metadata_base_dir, name)

def _maybe_init_metadata_dir_by_name(self, name):
"""Initialize the metadata directory for a named identity if it doesn't exist."""
safe_mkdir(self._get_metadata_dir_by_name(name))
safe_mkdir(self.__class__._get_metadata_dir_by_name(name, self._metadata_base_dir))

def _metadata_file_path(self, name, metadata_key):
return self.metadata_file_path(name, metadata_key, self._metadata_base_dir)

@classmethod
def metadata_file_path(cls, name, metadata_key, metadata_base_dir):
return os.path.join(cls._get_metadata_dir_by_name(name, metadata_base_dir), metadata_key)

def read_metadata_by_name(self, name, metadata_key, caster=None):
"""Read process metadata using a named identity.
@@ -166,8 +174,8 @@ def read_metadata_by_name(self, name, metadata_key, caster=None):
:param string metadata_key: The metadata key (e.g. 'pid').
:param func caster: A casting callable to apply to the read value (e.g. `int`).
"""
file_path = self._metadata_file_path(name, metadata_key)
try:
file_path = os.path.join(self._get_metadata_dir_by_name(name), metadata_key)
return self._maybe_cast(read_file(file_path).strip(), caster)
except (IOError, OSError):
return None
@@ -180,7 +188,7 @@ def write_metadata_by_name(self, name, metadata_key, metadata_value):
:param string metadata_value: The metadata value (e.g. '1729').
"""
self._maybe_init_metadata_dir_by_name(name)
file_path = os.path.join(self._get_metadata_dir_by_name(name), metadata_key)
file_path = self._metadata_file_path(name, metadata_key)
safe_file_dump(file_path, metadata_value)

def await_metadata_by_name(self, name, metadata_key, timeout, caster=None):
@@ -193,7 +201,7 @@ def await_metadata_by_name(self, name, metadata_key, timeout, caster=None):
:returns: The value of the metadata key (read from disk post-write).
:raises: :class:`ProcessMetadataManager.Timeout` on timeout.
"""
file_path = os.path.join(self._get_metadata_dir_by_name(name), metadata_key)
file_path = self._metadata_file_path(name, metadata_key)
self._wait_for_file(file_path, timeout=timeout)
return self.read_metadata_by_name(name, metadata_key, caster)

@@ -202,7 +210,7 @@ def purge_metadata_by_name(self, name):
:raises: `ProcessManager.MetadataError` when OSError is encountered on metadata dir removal.
"""
meta_dir = self._get_metadata_dir_by_name(name)
meta_dir = self._get_metadata_dir_by_name(name, self._metadata_base_dir)
logger.debug('purging metadata directory: {}'.format(meta_dir))
try:
rm_rf(meta_dir)
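The refactor above exists so daemon-setup code can compute the pidfile location without holding a ProcessMetadataManager instance: _get_metadata_dir_by_name becomes a staticmethod taking metadata_base_dir explicitly, and the new metadata_file_path classmethod composes the full path. A trimmed sketch of the resulting helpers (the subprocessdir value is illustrative); PantsDaemon, which ultimately derives from this class, calls the classmethod directly in the pants_daemon.py hunk above:

```python
import os


class ProcessMetadataManager(object):
  """Trimmed sketch of the refactored path helpers; only the pieces relevant to the pidfile."""

  def __init__(self, metadata_base_dir):
    self._metadata_base_dir = metadata_base_dir

  @staticmethod
  def _get_metadata_dir_by_name(name, metadata_base_dir):
    # e.g. <subprocessdir>/pantsd
    return os.path.join(metadata_base_dir, name)

  @classmethod
  def metadata_file_path(cls, name, metadata_key, metadata_base_dir):
    # e.g. <subprocessdir>/pantsd/pid
    return os.path.join(cls._get_metadata_dir_by_name(name, metadata_base_dir), metadata_key)

  def _metadata_file_path(self, name, metadata_key):
    # Instance-level convenience wrapper used by the read/write/await methods.
    return self.metadata_file_path(name, metadata_key, self._metadata_base_dir)


# No instance needed, which is what the PantsDaemon.metadata_file_path(...) call relies on:
print(ProcessMetadataManager.metadata_file_path('pantsd', 'pid', '/repo/.pids'))
# -> /repo/.pids/pantsd/pid
```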
22 changes: 22 additions & 0 deletions src/python/pants/pantsd/service/fs_event_service.py
@@ -25,6 +25,8 @@ class FSEventService(PantsService):

ZERO_DEPTH = ['depth', 'eq', 0]

PANTS_PID_SUBSCRIPTION_NAME = 'pantsd_pid'

def __init__(self, watchman, build_root, worker_count):
"""
:param Watchman watchman: The Watchman instance as provided by the WatchmanLauncher subsystem.
@@ -80,6 +82,26 @@ def register_all_files_handler(self, callback, name='all_files'):
callback
)

def register_pidfile_handler(self, pidfile_path, callback):
"""
:param pidfile_path: Path to the pidfile, relative to the build root
:param callback:
:return:
"""
  self.register_handler(
    self.PANTS_PID_SUBSCRIPTION_NAME,
    dict(
      fields=['name'],
      expression=[
        'allof',
        ['dirname', os.path.dirname(pidfile_path)],
        ['name', os.path.basename(pidfile_path)],
      ],
    ),
    callback,
  )

def register_handler(self, name, metadata, callback):
"""Register subscriptions and their event handlers.
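The new register_pidfile_handler above narrows the Watchman subscription to exactly one file by combining a dirname clause with a name clause. Roughly, for a pidfile at the default location, the subscription metadata handed to register_handler would look like the following sketch (path illustrative):

```python
import os

# Illustrative build-root-relative pidfile path (default subprocessdir layout).
pidfile_path = '.pids/pantsd/pid'

# The metadata register_pidfile_handler builds and passes to register_handler:
subscription = dict(
  fields=['name'],
  expression=[
    'allof',
    ['dirname', os.path.dirname(pidfile_path)],   # matches files under '.pids/pantsd'
    ['name', os.path.basename(pidfile_path)],     # whose basename is 'pid'
  ],
)
print(subscription['expression'])
# -> ['allof', ['dirname', '.pids/pantsd'], ['name', 'pid']]
```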
45 changes: 41 additions & 4 deletions src/python/pants/pantsd/service/scheduler_service.py
@@ -6,6 +6,7 @@
unicode_literals, with_statement)

import logging
import os
import Queue
import threading

@@ -24,7 +25,14 @@ class SchedulerService(PantsService):

QUEUE_SIZE = 64

def __init__(self, fs_event_service, legacy_graph_scheduler, build_root, invalidation_globs):
def __init__(
self,
fs_event_service,
legacy_graph_scheduler,
build_root,
invalidation_globs,
pantsd_pidfile,
):
"""
:param FSEventService fs_event_service: An unstarted FSEventService instance for setting up
filesystem event handlers.
@@ -39,6 +47,7 @@ def __init__(self, fs_event_service, legacy_graph_scheduler, build_root, invalid
self._graph_helper = legacy_graph_scheduler
self._invalidation_globs = invalidation_globs
self._build_root = build_root
self._pantsd_pidfile = pantsd_pidfile

self._scheduler = legacy_graph_scheduler.scheduler
self._logger = logging.getLogger(__name__)
@@ -65,23 +74,48 @@ def setup(self, lifecycle_lock, fork_lock):
)
self._logger.info('watching invalidating files: {}'.format(self._invalidating_files))

if self._pantsd_pidfile:
  self._fs_event_service.register_pidfile_handler(self._pantsd_pidfile, self._enqueue_fs_event)

def _enqueue_fs_event(self, event):
"""Watchman filesystem event handler for BUILD/requirements.txt updates. Called via a thread."""
self._logger.info('enqueuing {} changes for subscription {}'
.format(len(event['files']), event['subscription']))
self._event_queue.put(event)

def _maybe_invalidate_scheduler(self, files):
def _maybe_invalidate_scheduler_batch(self, files):
  invalidating_files = self._invalidating_files
  if any(f in invalidating_files for f in files):
    self._logger.fatal('saw file events covered by invalidation globs, terminating the daemon.')
    self.terminate()

def _maybe_invalidate_scheduler_pidfile(self):
  new_pid = self._check_pid_changed()
  if new_pid is not False:
    self._logger.fatal('{} says pantsd PID is {} but my PID is: {}: terminating'.format(
      self._pantsd_pidfile,
      new_pid,
      os.getpid(),
    ))
    self.terminate()

def _check_pid_changed(self):
  """Reads pidfile and returns False if its PID is ours, else a printable (maybe falsey) value."""
  try:
    with open(os.path.join(self._build_root, self._pantsd_pidfile), "r") as f:
      pid_from_file = f.read()
  except IOError:
    return "[no file could be read]"
  if int(pid_from_file) != os.getpid():
    return pid_from_file
  else:
    return False

def _handle_batch_event(self, files):
self._logger.debug('handling change event for: %s', files)

with self.lifecycle_lock:
self._maybe_invalidate_scheduler(files)
self._maybe_invalidate_scheduler_batch(files)

with self.fork_lock:
self._scheduler.invalidate_files(files)
@@ -106,7 +140,10 @@ def _process_event_queue(self):

# The first watchman event is a listing of all files - ignore it.
if not is_initial_event:
  self._handle_batch_event(files)
  if subscription == self._fs_event_service.PANTS_PID_SUBSCRIPTION_NAME:
    self._maybe_invalidate_scheduler_pidfile()
  else:
    self._handle_batch_event(files)

if not self._watchman_is_running.is_set():
self._watchman_is_running.set()
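Putting the scheduler-side pieces together: events from the pantsd_pid subscription bypass the normal batch invalidation and instead re-read the pidfile, and the daemon terminates unless the file still names its own PID. A standalone sketch of that check under illustrative paths (the real code joins build_root with the relative pidfile path and calls PantsService.terminate):

```python
import os


def check_pid_changed(pidfile_path, own_pid):
  """Sketch of _check_pid_changed: returns False if the pidfile still names own_pid,
  otherwise a printable description of what it found (or failed to find)."""
  try:
    with open(pidfile_path, 'r') as f:
      pid_from_file = f.read()
  except IOError:
    # Covers the deleted-pidfile case exercised by test_pantsd_pid_deleted below.
    return '[no file could be read]'
  if int(pid_from_file) != own_pid:
    return pid_from_file
  return False


# Sketch of how the service reacts: anything other than False means "not ours any more".
new_pid = check_pid_changed('/repo/.pids/pantsd/pid', os.getpid())
if new_pid is not False:
  print('pidfile now says {!r}, not our PID; terminating'.format(new_pid))
```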
65 changes: 52 additions & 13 deletions tests/python/pants_test/pantsd/test_pantsd_integration.py
@@ -132,7 +132,8 @@ def pantsd_successful_run_context(self, log_level='info', extra_config=None):
pantsd_config
),
checker,
workdir
workdir,
pantsd_config,
)

def _run_count(self, workdir):
@@ -171,13 +172,13 @@ def assert_success_runner(self, workdir, config, cmd, extra_config={}, expected_
return run

def test_pantsd_compile(self):
with self.pantsd_successful_run_context('debug') as (pantsd_run, checker, _):
with self.pantsd_successful_run_context('debug') as (pantsd_run, checker, _, _):
# This tests a deeper pantsd-based run by actually invoking a full compile.
pantsd_run(['compile', 'examples/src/scala/org/pantsbuild/example/hello/welcome'])
checker.assert_started()

def test_pantsd_run(self):
with self.pantsd_successful_run_context('debug') as (pantsd_run, checker, workdir):
with self.pantsd_successful_run_context('debug') as (pantsd_run, checker, workdir, _):
pantsd_run(['list', '3rdparty:'])
checker.assert_started()

@@ -207,7 +208,7 @@ def test_pantsd_broken_pipe(self):
checker.assert_started()

def test_pantsd_stacktrace_dump(self):
with self.pantsd_successful_run_context() as (pantsd_run, checker, workdir):
with self.pantsd_successful_run_context() as (pantsd_run, checker, workdir, _):
pantsd_run(['help'])
checker.assert_started()

@@ -239,7 +240,7 @@ def test_pantsd_lifecycle_invalidation(self):
"""Runs pants commands with pantsd enabled, in a loop, alternating between options that
should invalidate pantsd and incur a restart and then asserts for pid consistency.
"""
with self.pantsd_successful_run_context() as (pantsd_run, checker, _):
with self.pantsd_successful_run_context() as (pantsd_run, checker, _, _):
variants = (
['debug', 'help'],
['info', 'help']
@@ -258,7 +259,7 @@ def test_pantsd_lifecycle_non_invalidation(self):
checker.assert_running()

def test_pantsd_lifecycle_non_invalidation(self):
with self.pantsd_successful_run_context() as (pantsd_run, checker, _):
with self.pantsd_successful_run_context() as (pantsd_run, checker, _, _):
variants = (
['-q', 'help'],
['--no-colors', 'help'],
@@ -287,7 +288,7 @@ def test_pantsd_lifecycle_non_invalidation_on_config_string(self):
with open(invalidating_config, 'wb') as fh:
fh.write('[GLOBAL]\npants_distdir: {}\n'.format(os.path.join(dist_dir_root, 'v2')))

with self.pantsd_successful_run_context() as (pantsd_run, checker, _):
with self.pantsd_successful_run_context() as (pantsd_run, checker, _, _):
variants = [['--pants-config-files={}'.format(f), 'help'] for f in config_files]
pantsd_pid = None
for cmd in itertools.chain(*itertools.repeat(variants, 2)):
@@ -306,7 +307,7 @@ def test_pantsd_stray_runners(self):
cmd = os.environ.get('PANTS_TEST_PANTSD_STRESS_CMD', 'help').split()

with no_lingering_process_by_command('pantsd-runner'):
with self.pantsd_successful_run_context('debug') as (pantsd_run, checker, _):
with self.pantsd_successful_run_context('debug') as (pantsd_run, checker, _, _):
pantsd_run(cmd)
checker.assert_started()
for _ in range(attempts):
@@ -328,7 +329,7 @@ def test_pantsd_aligned_output(self):

non_daemon_runs = [self.run_pants(cmd) for cmd in cmds]

with self.pantsd_successful_run_context() as (pantsd_run, checker, workdir):
with self.pantsd_successful_run_context() as (pantsd_run, checker, workdir, _):
daemon_runs = [pantsd_run(cmd) for cmd in cmds]
checker.assert_started()

@@ -341,7 +342,7 @@ def test_pantsd_filesystem_invalidation(self):

def test_pantsd_filesystem_invalidation(self):
"""Runs with pantsd enabled, in a loop, while another thread invalidates files."""
with self.pantsd_successful_run_context() as (pantsd_run, checker, workdir):
with self.pantsd_successful_run_context() as (pantsd_run, checker, workdir, _):
cmd = ['list', '::']
pantsd_run(cmd)
checker.assert_started()
@@ -358,7 +359,7 @@ def test_pantsd_client_env_var_is_inherited_by_pantsd_runner_children(self):

def test_pantsd_client_env_var_is_inherited_by_pantsd_runner_children(self):
EXPECTED_VALUE = '333'
with self.pantsd_successful_run_context() as (pantsd_run, checker, workdir):
with self.pantsd_successful_run_context() as (pantsd_run, checker, workdir, _):
# First, launch the daemon without any local env vars set.
pantsd_run(['help'])
checker.assert_started()
@@ -399,7 +400,9 @@ def test_pantsd_launch_env_var_is_not_inherited_by_pantsd_runner_children(self):
def test_pantsd_invalidation_file_tracking(self):
test_file = 'testprojects/src/python/print_env/main.py'
config = {'pantsd_invalidation_globs': '["testprojects/src/python/print_env/*"]'}
with self.pantsd_successful_run_context(extra_config=config) as (pantsd_run, checker, workdir):
with self.pantsd_successful_run_context(extra_config=config) as (
pantsd_run, checker, workdir, _
):
pantsd_run(['help'])
checker.assert_started()

@@ -420,6 +423,42 @@

self.assertIn('saw file events covered by invalidation globs', full_pantsd_log(workdir))

def test_pantsd_pid_deleted(self):
  with self.pantsd_successful_run_context() as (pantsd_run, checker, workdir, config):
    pantsd_run(['help'])
    checker.assert_started()

    # Let any fs events quiesce.
    time.sleep(5)

    checker.assert_running()
    os.unlink(os.path.join(config["GLOBAL"]["pants_subprocessdir"], "pantsd", "pid"))

    # Permit ample time for the async file event to propagate in CI.
    time.sleep(10)
    checker.assert_stopped()

def test_pantsd_pid_change(self):
  with self.pantsd_successful_run_context() as (pantsd_run, checker, workdir, config):
    pantsd_run(['help'])
    checker.assert_started()

    # Let any fs events quiesce.
    time.sleep(5)

    checker.assert_running()

    pidpath = os.path.join(config["GLOBAL"]["pants_subprocessdir"], "pantsd", "pid")
    with open(pidpath, 'w') as f:
      f.write('9')

    # Permit ample time for the async file event to propagate in CI.
    time.sleep(10)
    checker.assert_stopped()

    # Remove the pidfile so that the teardown script doesn't try to kill process 9.
    os.unlink(pidpath)

def test_pantsd_invalidation_stale_sources(self):
test_path = 'tests/python/pants_test/daemon_correctness_test_0001'
test_build_file = os.path.join(test_path, 'BUILD')
@@ -428,7 +467,7 @@ def test_pantsd_invalidation_stale_sources(self):
export_cmd = ['export', test_path]

try:
with self.pantsd_successful_run_context() as (pantsd_run, checker, workdir):
with self.pantsd_successful_run_context() as (pantsd_run, checker, workdir, _):
safe_mkdir(test_path, clean=True)

pantsd_run(['help'])
4 changes: 2 additions & 2 deletions tests/python/pants_test/pantsd/test_process_manager.py
@@ -118,14 +118,14 @@ def test_maybe_cast(self):

def test_get_metadata_dir_by_name(self):
self.pmm = ProcessMetadataManager(metadata_base_dir=self.BUILDROOT)
self.assertEqual(self.pmm._get_metadata_dir_by_name(self.NAME),
self.assertEqual(self.pmm._get_metadata_dir_by_name(self.NAME, self.BUILDROOT),
os.path.join(self.BUILDROOT, self.NAME))

def test_maybe_init_metadata_dir_by_name(self):
with mock.patch('pants.pantsd.process_manager.safe_mkdir', **PATCH_OPTS) as mock_mkdir:
self.pmm._maybe_init_metadata_dir_by_name(self.NAME)
mock_mkdir.assert_called_once_with(
self.pmm._get_metadata_dir_by_name(self.NAME))
self.pmm._get_metadata_dir_by_name(self.NAME, self.subprocess_dir))

def test_readwrite_metadata_by_name(self):
with temporary_dir() as tmpdir, \
