This repository has been archived by the owner on Aug 2, 2023. It is now read-only.

Commit

Merge pull request #994 from int19h/503
 Fix #503: Subprocesses are not killed when stopping the debugger
int19h authored Nov 9, 2018
2 parents f3cfd0f + 73dafb2 commit a45272b
Showing 13 changed files with 199 additions and 589 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -46,7 +46,7 @@ ci-lint: depends lint
ci-test: depends
# For now we use --quick-py2.
$(PYTHON) -m tests -v --full --no-network --quick-py2
$(PYTHON) setup.py test
$(PYTHON) -m pytest -vv

.PHONY: ci-coverage
ci-coverage: depends
28 changes: 16 additions & 12 deletions ptvsd/messaging.py
@@ -262,8 +262,6 @@ class JsonMessageChannel(object):

def __init__(self, stream, handlers=None, name='vsc_messaging'):
self.stream = stream
self.send_callback = lambda channel, message: None
self.receive_callback = lambda channel, message: None
self._lock = threading.Lock()
self._stop = threading.Event()
self._seq_iter = itertools.count(1)
@@ -293,7 +291,6 @@ def _send_message(self, type, rest={}):
with self._lock:
yield seq
self.stream.write_json(message)
self.send_callback(self, message)

def send_request(self, command, arguments=None):
d = {'command': command}
@@ -327,7 +324,6 @@ def _send_response(self, request_seq, success, command, error_message, body):
pass

def on_message(self, message):
self.receive_callback(self, message)
seq = message['seq']
typ = message['type']
if typ == 'request':
@@ -359,7 +355,7 @@ def on_request(self, seq, command, arguments):
request = Request(self, seq, command, arguments)
try:
response_body = handler(request)
except Exception as ex:
except RequestFailure as ex:
self._send_response(seq, False, command, str(ex), None)
else:
if isinstance(response_body, Exception):
@@ -387,6 +383,14 @@ def on_response(self, seq, request_seq, success, command, error_message, body):
body = RequestFailure(error_message)
return request._handle_response(seq, command, body)

def on_disconnect(self):
# There are no more incoming messages, so any requests that are still pending
# must be marked as failed to unblock anyone waiting on them.
with self._lock:
for request in self._requests.values():
request._handle_response(None, request.command, EOFError('No response'))
getattr(self._handlers, 'disconnect', lambda: None)()

def _process_incoming_messages(self):
try:
while True:
@@ -401,17 +405,17 @@ def _process_incoming_messages(self):
traceback.print_exc(file=sys.__stderr__)
raise
finally:
# There are no more incoming messages, so any requests that are still pending
# must be marked as failed to unblock anyone waiting on them.
with self._lock:
for request in self._requests.values():
request._handle_response(None, request.command, EOFError('No response'))

try:
self.on_disconnect()
except Exception:
print('Error while processing disconnect', file=sys.__stderr__)
traceback.print_exc(file=sys.__stderr__)
raise

class MessageHandlers(object):
"""A simple delegating message handlers object for use with JsonMessageChannel.
For every argument provided, the object has an attribute with the corresponding
name and value. Example:
name and value.
"""

def __init__(self, **kwargs):
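Note on the on_disconnect change above: once the stream closes, every request still in self._requests is completed with EOFError('No response') so that any thread blocked waiting for a reply wakes up, and an optional disconnect handler is then dispatched via getattr(self._handlers, 'disconnect', lambda: None)(). For illustration only (this is not ptvsd code, and the class names are invented), a minimal self-contained Python sketch of the same pattern:

import threading

class FakeRequest(object):
    def __init__(self, command):
        self.command = command
        self.response = None
        self._got_response = threading.Event()

    def _handle_response(self, seq, command, body):
        self.response = body
        self._got_response.set()

    def wait_for_response(self):
        self._got_response.wait()
        return self.response

class FakeChannel(object):
    def __init__(self, handlers=None):
        self._handlers = handlers
        self._lock = threading.Lock()
        self._requests = {}
        self._next_seq = 0

    def send_request(self, command):
        request = FakeRequest(command)
        with self._lock:
            self._next_seq += 1
            self._requests[self._next_seq] = request
        return request

    def on_disconnect(self):
        # No more incoming messages: fail anything still pending, then notify handlers.
        with self._lock:
            for request in self._requests.values():
                request._handle_response(None, request.command, EOFError('No response'))
        getattr(self._handlers, 'disconnect', lambda: None)()

class Handlers(object):
    def disconnect(self):
        print('channel disconnected')

channel = FakeChannel(Handlers())
pending = channel.send_request('launch')
channel.on_disconnect()             # prints 'channel disconnected'
print(pending.wait_for_response())  # prints 'No response' (the EOFError)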
37 changes: 37 additions & 0 deletions ptvsd/multiproc.py
@@ -8,8 +8,10 @@
import itertools
import os
import re
import signal
import socket
import sys
import threading
import time
import traceback

@@ -27,8 +29,15 @@
from _pydevd_bundle.pydevd_comm import get_global_debugger


subprocess_lock = threading.Lock()

subprocess_listener_socket = None

subprocesses = {}
"""List of known subprocesses. Keys are process IDs, values are JsonMessageChannel
instances; subprocess_lock must be used to synchronize access.
"""

subprocess_queue = queue.Queue()
"""A queue of incoming 'ptvsd_subprocess' notifications. Whenenever a new request
is received, a tuple of (subprocess_request, subprocess_response) is placed in the
@@ -66,6 +75,7 @@ def listen_for_subprocesses():

subprocess_listener_socket = create_server('localhost', 0)
atexit.register(stop_listening_for_subprocesses)
atexit.register(kill_subprocesses)
new_hidden_thread('SubprocessListener', _subprocess_listener).start()


@@ -80,6 +90,18 @@ def stop_listening_for_subprocesses():
subprocess_listener_socket = None


def kill_subprocesses():
with subprocess_lock:
pids = list(subprocesses.keys())
for pid in pids:
with subprocess_lock:
subprocesses.pop(pid, None)
try:
os.kill(pid, signal.SIGTERM)
except Exception:
pass


def subprocess_listener_port():
if subprocess_listener_socket is None:
return None
@@ -100,6 +122,8 @@ def _subprocess_listener():

def _handle_subprocess(n, stream):
class Handlers(object):
_pid = None

def ptvsd_subprocess_request(self, request):
# When a child process is spawned, the notification it sends only
# contains information about itself and its immediate parent.
@@ -110,12 +134,21 @@ def ptvsd_subprocess_request(self, request):
'rootStartRequest': root_start_request,
})

self._pid = arguments['processId']
with subprocess_lock:
subprocesses[self._pid] = channel

debug('ptvsd_subprocess: %r' % arguments)
response = {'incomingConnection': False}
subprocess_queue.put((arguments, response))
subprocess_queue.join()
return response

def disconnect(self):
if self._pid is not None:
with subprocess_lock:
subprocesses.pop(self._pid, None)

name = 'SubprocessListener-%d' % n
channel = JsonMessageChannel(stream, Handlers(), name)
channel.start()
@@ -151,6 +184,10 @@ def notify_root(port):
traceback.print_exc()
sys.exit(0)

# Keep the channel open until we exit - root process uses open channels to keep
# track of which subprocesses are alive and which are not.
atexit.register(lambda: channel.close())

if not response['incomingConnection']:
debugger = get_global_debugger()
while debugger is None:
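The multiproc.py changes above fit together as follows: every child that announces itself over the subprocess listener is recorded in the lock-guarded subprocesses map, the channel's disconnect handler removes it again if it exits on its own, and kill_subprocesses() (registered with atexit and also called on the 'disconnect' request, see wrapper.py below) sends SIGTERM to whatever is still registered. For illustration only (names invented, not ptvsd code), a self-contained sketch of that registry pattern:

import atexit
import os
import signal
import subprocess
import sys
import threading

_lock = threading.Lock()
_children = {}  # pid -> opaque handle (ptvsd keeps the child's JsonMessageChannel here)

def register(pid, handle):
    with _lock:
        _children[pid] = handle

def unregister(pid):
    # Called when a child disconnects on its own, so it is not killed later.
    with _lock:
        _children.pop(pid, None)

def kill_children():
    with _lock:
        pids = list(_children.keys())
    for pid in pids:
        unregister(pid)
        try:
            os.kill(pid, signal.SIGTERM)
        except Exception:
            pass  # already gone, or not ours to kill

atexit.register(kill_children)

# Demo: spawn a child that would otherwise outlive this process; atexit reaps it.
child = subprocess.Popen([sys.executable, '-c', 'import time; time.sleep(60)'])
register(child.pid, child)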
2 changes: 2 additions & 0 deletions ptvsd/wrapper.py
@@ -1154,6 +1154,8 @@ def on_configurationDone(self, request, args):
self._notify_ready()

def on_disconnect(self, request, args):
multiproc.kill_subprocesses()

debugger_attached.clear()
self._restart_debugger = args.get('restart', False)

5 changes: 4 additions & 1 deletion pytests/conftest.py
@@ -11,6 +11,7 @@
import types

from . import helpers
from .helpers.printer import wait_for_output
from .helpers.session import DebugSession


@@ -29,9 +30,11 @@ def pytest_runtest_makereport(item, call):

@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_pyfunc_call(pyfuncitem):
# Resets the timestamp zero for every new test.
# Resets the timestamp to zero for every new test, and ensures that
# all output is printed after the test.
helpers.timestamp_zero = helpers.clock()
yield
wait_for_output()


@pytest.fixture
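The conftest.py hook above relies on pytest's hookwrapper mechanism: code before the yield runs before each test function is called, code after the yield runs once it has finished, which is what lets the suite reset its timestamp up front and flush buffered output afterwards. As a separate, minimal illustration of the same mechanism (not part of this commit), a conftest.py that times each test might look like:

import time

import pytest

@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_pyfunc_call(pyfuncitem):
    started = time.time()            # runs before the test body
    yield                            # the test function executes here
    elapsed = time.time() - started
    print('%s took %.3fs' % (pyfuncitem.name, elapsed))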
61 changes: 57 additions & 4 deletions pytests/func/test_multiproc.py
@@ -10,7 +10,7 @@

from pytests.helpers.pattern import ANY
from pytests.helpers.session import DebugSession
from pytests.helpers.timeline import Event, Request
from pytests.helpers.timeline import Event, Request, Response


@pytest.mark.timeout(60)
@@ -137,6 +137,7 @@ def child(q):

assert debug_session.read_json() == 'done'


@pytest.mark.timeout(60)
@pytest.mark.skipif(sys.version_info < (3, 0) and (platform.system() != 'Windows'),
reason='Bug #935')
@@ -152,8 +153,7 @@ def parent():
import os
import subprocess
import sys
argv = [sys.executable]
argv += [sys.argv[1], '--arg1', '--arg2', '--arg3']
argv = [sys.executable, sys.argv[1], '--arg1', '--arg2', '--arg3']
env = os.environ.copy()
process = subprocess.Popen(argv, env=env, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
process.wait()
@@ -180,10 +180,11 @@ def parent():
'arguments': root_start_request.arguments,
}
})
child_pid = child_subprocess.body['processId']
child_port = child_subprocess.body['port']
debug_session.proceed()

child_session = DebugSession(method='attach_socket', ptvsd_port=child_port)
child_session = DebugSession(method='attach_socket', ptvsd_port=child_port, pid=child_pid)
child_session.ignore_unobserved = debug_session.ignore_unobserved
child_session.connect()
child_session.handshake()
@@ -192,4 +193,56 @@ def parent():
child_argv = debug_session.read_json()
assert child_argv == [child, '--arg1', '--arg2', '--arg3']

child_session.wait_for_exit()
debug_session.wait_for_exit()


@pytest.mark.timeout(60)
@pytest.mark.skipif(sys.version_info < (3, 0) and (platform.system() != 'Windows'),
reason='Bug #935')
def test_autokill(debug_session, pyfile):
@pyfile
def child():
while True:
pass

@pyfile
def parent():
import backchannel
import os
import subprocess
import sys
argv = [sys.executable, sys.argv[1]]
env = os.environ.copy()
subprocess.Popen(argv, env=env, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
backchannel.read_json()

debug_session.multiprocess = True
debug_session.program_args += [child]
debug_session.prepare_to_run(filename=parent, backchannel=True)
debug_session.start_debugging()

child_subprocess = debug_session.wait_for_next(Event('ptvsd_subprocess'))
child_pid = child_subprocess.body['processId']
child_port = child_subprocess.body['port']

debug_session.proceed()

child_session = DebugSession(method='attach_socket', ptvsd_port=child_port, pid=child_pid)
child_session.expected_returncode = ANY
child_session.connect()
child_session.handshake()
child_session.start_debugging()

if debug_session.method == 'launch':
# In launch scenario, terminate the parent process by disconnecting from it.
debug_session.expected_returncode = ANY
disconnect = debug_session.send_request('disconnect', {})
debug_session.wait_for_next(Response(disconnect))
else:
# In attach scenario, just let the parent process run to completion.
debug_session.expected_returncode = 0
debug_session.write_json(None)

debug_session.wait_for_exit()
child_session.wait_for_exit()
21 changes: 3 additions & 18 deletions pytests/helpers/__init__.py
@@ -23,24 +23,6 @@ def timestamp():
return clock() - timestamp_zero


print_lock = threading.Lock()
real_print = print

def print(*args, **kwargs):
"""Like builtin print(), but synchronized using a global lock,
and adds a timestamp
"""
from . import colors
timestamped = kwargs.pop('timestamped', True)
with print_lock:
if timestamped:
t = timestamp()
real_print(colors.LIGHT_BLACK, end='')
real_print('@%09.6f: ' % t, end='')
real_print(colors.RESET, end='')
real_print(*args, **kwargs)


def dump_stacks():
"""Dump the stacks of all threads except the current thread"""
current_ident = threading.current_thread().ident
@@ -72,3 +54,6 @@ def dumper():
thread = threading.Thread(target=dumper)
thread.daemon = True
thread.start()


from .printer import print
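The synchronized, timestamped print() helper being removed above now lives in pytests/helpers/printer.py, which also provides the wait_for_output() used by conftest.py; that file is not among the hunks shown here. Purely as a hypothetical sketch of what such a module might look like (all names and details below are assumptions, with timestamping and colors omitted), a queue-plus-worker design gives both serialized output and a way to wait for it to drain:

from __future__ import print_function

import threading

try:
    import queue
except ImportError:      # Python 2
    import Queue as queue

_queue = queue.Queue()
_real_print = print

def _worker():
    while True:
        args, kwargs = _queue.get()
        try:
            _real_print(*args, **kwargs)
        finally:
            _queue.task_done()

_thread = threading.Thread(target=_worker, name='Printer')
_thread.daemon = True
_thread.start()

def print(*args, **kwargs):
    """Like the builtin print(), but all output goes through one worker thread."""
    _queue.put((args, kwargs))

def wait_for_output():
    """Block until everything queued so far has actually been printed."""
    _queue.join()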
13 changes: 7 additions & 6 deletions pytests/helpers/colors.py
@@ -4,14 +4,15 @@

from __future__ import print_function, with_statement, absolute_import

import platform


if platform.system() != 'Linux':
# pytest-timeout seems to be buggy wrt colorama when capturing output.
#
# TODO: re-enable after enabling proper ANSI sequence handling:
if True:
# On Win32, colorama is not active when pytest-timeout dumps captured output
# on timeout, and ANSI sequences aren't properly interpreted.
# TODO: re-enable on Windows after enabling proper ANSI sequence handling:
# https://docs.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences
#
# Azure Pipelines doesn't support ANSI sequences at all.
# TODO: re-enable on all platforms after adding Azure Pipelines detection.

RESET = ''
BLACK = ''