Skip to content

Commit

Permalink
Merge pull request #1119 from moreati/ci-resourcewarnings
Browse files Browse the repository at this point in the history
CI: Reliability, eliminate a race condition and some resource leaks
  • Loading branch information
moreati authored Sep 10, 2024
2 parents 16c602a + d032c59 commit b8b1558
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 36 deletions.
2 changes: 2 additions & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ Unreleased
* :gh:issue:`905` Initial support for templated ``ansible_ssh_args``,
``ansible_ssh_common_args``, and ``ansible_ssh_extra_args`` variables.
NB: play or task scoped variables will probably still fail.
* :gh:issue:`694` CI: Fixed a race condition and some resource leaks causing
some of intermittent failures when running the test suite.


v0.3.9 (2024-08-13)
Expand Down
16 changes: 15 additions & 1 deletion mitogen/parent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2542,7 +2542,7 @@ def _signal_child(self, signum):
# because it is setuid, so this is best-effort only.
LOG.debug('%r: sending %s', self.proc, SIGNAL_BY_NUM[signum])
try:
os.kill(self.proc.pid, signum)
self.proc.send_signal(signum)
except OSError:
e = sys.exc_info()[1]
if e.args[0] != errno.EPERM:
Expand Down Expand Up @@ -2662,6 +2662,17 @@ def poll(self):
"""
raise NotImplementedError()

def send_signal(self, sig):
os.kill(self.pid, sig)

def terminate(self):
"Ask the process to gracefully shutdown."
self.send_signal(signal.SIGTERM)

def kill(self):
"Ask the operating system to forcefully destroy the process."
self.send_signal(signal.SIGKILL)


class PopenProcess(Process):
"""
Expand All @@ -2678,6 +2689,9 @@ def __init__(self, proc, stdin, stdout, stderr=None):
def poll(self):
return self.proc.poll()

def send_signal(self, sig):
self.proc.send_signal(sig)


class ModuleForwarder(object):
"""
Expand Down
10 changes: 7 additions & 3 deletions mitogen/unix.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,19 +143,23 @@ def on_shutdown(self, broker):
def on_accept_client(self, sock):
sock.setblocking(True)
try:
pid, = struct.unpack('>L', sock.recv(4))
data = sock.recv(4)
pid, = struct.unpack('>L', data)
except (struct.error, socket.error):
LOG.error('listener: failed to read remote identity: %s',
sys.exc_info()[1])
LOG.error('listener: failed to read remote identity, got %d bytes: %s',
len(data), sys.exc_info()[1])
sock.close()
return

context_id = self._router.id_allocator.allocate()
try:
# FIXME #1109 send() returns number of bytes sent, check it
sock.send(struct.pack('>LLL', context_id, mitogen.context_id,
os.getpid()))
except socket.error:
LOG.error('listener: failed to assign identity to PID %d: %s',
pid, sys.exc_info()[1])
sock.close()
return

context = mitogen.parent.Context(self._router, context_id)
Expand Down
9 changes: 6 additions & 3 deletions tests/connection_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import os
import signal
import sys
Expand Down Expand Up @@ -54,7 +55,9 @@ def do_detach(econtext):
class DetachReapTest(testlib.RouterMixin, testlib.TestCase):
def test_subprocess_preserved_on_shutdown(self):
c1 = self.router.local()
c1_stream = self.router.stream_by_id(c1.context_id)
pid = c1.call(os.getpid)
self.assertEqual(pid, c1_stream.conn.proc.pid)

l = mitogen.core.Latch()
mitogen.core.listen(c1, 'disconnect', l.put)
Expand All @@ -64,8 +67,8 @@ def test_subprocess_preserved_on_shutdown(self):
self.broker.shutdown()
self.broker.join()

os.kill(pid, 0) # succeeds if process still alive
self.assertIsNone(os.kill(pid, 0)) # succeeds if process still alive

# now clean up
os.kill(pid, signal.SIGTERM)
os.waitpid(pid, 0)
c1_stream.conn.proc.terminate()
c1_stream.conn.proc.proc.wait()
1 change: 1 addition & 0 deletions tests/create_child_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def close_proc(proc):
proc.stdout.close()
if proc.stderr:
proc.stderr.close()
proc.proc.wait()


def wait_read(fp, n):
Expand Down
2 changes: 1 addition & 1 deletion tests/data/importer/six_brokenpkg/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,4 @@
else:
from . import _six as six
six_py_file = '{0}.py'.format(os.path.splitext(six.__file__)[0])
exec(open(six_py_file, 'rb').read())
with open(six_py_file, 'rb') as f: exec(f.read())
3 changes: 3 additions & 0 deletions tests/id_allocation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ def test_slave_allocates_id(self):
# Subsequent master allocation does not collide
c2 = self.router.local()
self.assertEqual(1002, c2.context_id)

context.shutdown()
c2.shutdown()
20 changes: 9 additions & 11 deletions tests/reaper_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@


class ReaperTest(testlib.TestCase):
@mock.patch('os.kill')
def test_calc_delay(self, kill):
def test_calc_delay(self):
broker = mock.Mock()
proc = mock.Mock()
proc.poll.return_value = None
Expand All @@ -24,29 +23,28 @@ def test_calc_delay(self, kill):
self.assertEqual(752, int(1000 * reaper._calc_delay(5)))
self.assertEqual(1294, int(1000 * reaper._calc_delay(6)))

@mock.patch('os.kill')
def test_reap_calls(self, kill):
def test_reap_calls(self):
broker = mock.Mock()
proc = mock.Mock()
proc.poll.return_value = None

reaper = mitogen.parent.Reaper(broker, proc, True, True)

reaper.reap()
self.assertEqual(0, kill.call_count)
self.assertEqual(0, proc.send_signal.call_count)

reaper.reap()
self.assertEqual(1, kill.call_count)
self.assertEqual(1, proc.send_signal.call_count)

reaper.reap()
reaper.reap()
reaper.reap()
self.assertEqual(1, kill.call_count)
self.assertEqual(1, proc.send_signal.call_count)

reaper.reap()
self.assertEqual(2, kill.call_count)
self.assertEqual(2, proc.send_signal.call_count)

self.assertEqual(kill.mock_calls, [
mock.call(proc.pid, signal.SIGTERM),
mock.call(proc.pid, signal.SIGKILL),
self.assertEqual(proc.send_signal.mock_calls, [
mock.call(signal.SIGTERM),
mock.call(signal.SIGKILL),
])
1 change: 1 addition & 0 deletions tests/ssh_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ def test_verbose_enabled(self):
self.dockerized_ssh.port,
)
self.assertEqual(name, context.name)
context.shutdown(wait=True)


class StubPermissionDeniedTest(StubSshMixin, testlib.TestCase):
Expand Down
34 changes: 28 additions & 6 deletions tests/testlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,17 @@ def data_path(suffix):
return path


def retry(fn, on, max_attempts, delay):
for i in range(max_attempts):
try:
return fn()
except on:
if i >= max_attempts - 1:
raise
else:
time.sleep(delay)


def threading__thread_is_alive(thread):
"""Return whether the thread is alive (Python version compatibility shim).
Expand Down Expand Up @@ -562,18 +573,24 @@ def wait_for_sshd(self):
wait_for_port(self.get_host(), self.port, pattern='OpenSSH')

def check_processes(self):
args = ['docker', 'exec', self.container_name, 'ps', '-o', 'comm=']
# Get Accounting name (ucomm) & command line (args) of each process
# in the container. No truncation (-ww). No column headers (foo=).
ps_output = subprocess.check_output([
'docker', 'exec', self.container_name,
'ps', '-w', '-w', '-o', 'ucomm=', '-o', 'args=',
])
ps_lines = ps_output.decode().splitlines()
processes = [tuple(line.split(None, 1)) for line in ps_lines]
counts = {}
for comm in subprocess.check_output(args).decode().splitlines():
comm = comm.strip()
counts[comm] = counts.get(comm, 0) + 1
for ucomm, _ in processes:
counts[ucomm] = counts.get(ucomm, 0) + 1

if counts != {'ps': 1, 'sshd': 1}:
assert 0, (
'Docker container %r contained extra running processes '
'after test completed: %r' % (
self.container_name,
counts
processes,
)
)

Expand Down Expand Up @@ -630,7 +647,12 @@ def setUpClass(cls):

@classmethod
def tearDownClass(cls):
cls.dockerized_ssh.check_processes()
retry(
cls.dockerized_ssh.check_processes,
on=AssertionError,
max_attempts=5,
delay=0.1,
)
cls.dockerized_ssh.close()
super(DockerMixin, cls).tearDownClass()

Expand Down
18 changes: 7 additions & 11 deletions tests/unix_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,13 @@ class ListenerTest(testlib.RouterMixin, testlib.TestCase):

def test_constructor_basic(self):
listener = self.klass.build_stream(router=self.router)
capture = testlib.LogCapturer()
capture.start()
try:
self.assertFalse(mitogen.unix.is_path_dead(listener.protocol.path))
os.unlink(listener.protocol.path)
# ensure we catch 0 byte read error log message
self.broker.shutdown()
self.broker.join()
self.broker_shutdown = True
finally:
capture.stop()
self.assertFalse(mitogen.unix.is_path_dead(listener.protocol.path))
os.unlink(listener.protocol.path)

# ensure we catch 0 byte read error log message
self.broker.shutdown()
self.broker.join()
self.broker_shutdown = True


class ClientTest(testlib.TestCase):
Expand Down

0 comments on commit b8b1558

Please sign in to comment.