Skip to content

Commit

Permalink
Adding a regression test for the hanging direct+ssh combination
Browse files Browse the repository at this point in the history
This test checks that submitting a long job over the direct scheduler
does not "hang" with any plugin.
  • Loading branch information
giovannipizzi committed Feb 10, 2021
1 parent d0ccfb0 commit 041b78b
Showing 1 changed file with 81 additions and 97 deletions.
178 changes: 81 additions & 97 deletions tests/transports/test_all_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,19 @@
Plugin specific tests will be written in the plugin itself.
"""
import io
import os
import random
import tempfile
import signal
import shutil
import string
import time
import unittest

import psutil

from aiida.plugins import SchedulerFactory

# TODO : test for copy with pattern
# TODO : test for copy with/without patterns, overwriting folder
# TODO : test for exotic cases of copy with source = destination
Expand All @@ -35,7 +46,6 @@ def get_all_custom_transports():
it was found)
"""
import importlib
import os

modulename = __name__.rpartition('.')[0]
this_full_fname = __file__
Expand Down Expand Up @@ -133,11 +143,6 @@ def test_makedirs(self, custom_transport):
"""
Verify the functioning of makedirs command
"""
# Imports required later
import random
import string
import os

with custom_transport as transport:
location = transport.normalize(os.path.join('/', 'tmp'))
directory = 'temp_dir_test'
Expand Down Expand Up @@ -176,11 +181,6 @@ def test_rmtree(self, custom_transport):
"""
Verify the functioning of rmtree command
"""
# Imports required later
import random
import string
import os

with custom_transport as transport:
location = transport.normalize(os.path.join('/', 'tmp'))
directory = 'temp_dir_test'
Expand Down Expand Up @@ -221,12 +221,6 @@ def test_listdir(self, custom_transport):
"""
create directories, verify listdir, delete a folder with subfolders
"""
# Imports required later
import tempfile
import random
import string
import os

with custom_transport as trans:
# We cannot use tempfile.mkdtemp because we're on a remote folder
location = trans.normalize(os.path.join('/', 'tmp'))
Expand Down Expand Up @@ -270,11 +264,6 @@ def test_listdir_withattributes(self, custom_transport):
"""
create directories, verify listdir_withattributes, delete a folder with subfolders
"""
# Imports required later
import tempfile
import random
import string
import os

def simplify_attributes(data):
"""
Expand Down Expand Up @@ -340,11 +329,6 @@ def simplify_attributes(data):
@run_for_all_plugins
def test_dir_creation_deletion(self, custom_transport):
"""Test creating and deleting directories."""
# Imports required later
import random
import string
import os

with custom_transport as transport:
location = transport.normalize(os.path.join('/', 'tmp'))
directory = 'temp_dir_test'
Expand All @@ -370,11 +354,6 @@ def test_dir_copy(self, custom_transport):
Verify if in the copy of a directory also the protection bits
are carried over
"""
# Imports required later
import random
import string
import os

with custom_transport as transport:
location = transport.normalize(os.path.join('/', 'tmp'))
directory = 'temp_dir_test'
Expand Down Expand Up @@ -403,11 +382,6 @@ def test_dir_permissions_creation_modification(self, custom_transport): # pylin
verify if chmod raises IOError when trying to change bits on a
non-existing folder
"""
# Imports required later
import random
import string
import os

with custom_transport as transport:
location = transport.normalize(os.path.join('/', 'tmp'))
directory = 'temp_dir_test'
Expand Down Expand Up @@ -460,11 +434,6 @@ def test_dir_reading_permissions(self, custom_transport):
Try to enter a directory with no read permissions.
Verify that the cwd has not changed after failed try.
"""
# Imports required later
import random
import string
import os

with custom_transport as transport:
location = transport.normalize(os.path.join('/', 'tmp'))
directory = 'temp_dir_test'
Expand Down Expand Up @@ -503,8 +472,6 @@ def test_isfile_isdir_to_empty_string(self, custom_transport):
I check that isdir or isfile return False when executed on an
empty string
"""
import os

with custom_transport as transport:
location = transport.normalize(os.path.join('/', 'tmp'))
transport.chdir(location)
Expand All @@ -517,8 +484,6 @@ def test_isfile_isdir_to_non_existing_string(self, custom_transport):
I check that isdir or isfile return False when executed on an
empty string
"""
import os

with custom_transport as transport:
location = transport.normalize(os.path.join('/', 'tmp'))
transport.chdir(location)
Expand All @@ -535,8 +500,6 @@ def test_chdir_to_empty_string(self, custom_transport):
not change (this is a paramiko default behavior), but getcwd()
is still correctly defined.
"""
import os

with custom_transport as transport:
new_dir = transport.normalize(os.path.join('/', 'tmp'))
transport.chdir(new_dir)
Expand All @@ -555,10 +518,6 @@ class TestPutGetFile(unittest.TestCase):
@run_for_all_plugins
def test_put_and_get(self, custom_transport):
"""Test putting and getting files."""
import os
import random
import string

local_dir = os.path.join('/', 'tmp')
remote_dir = local_dir
directory = 'tmp_try'
Expand Down Expand Up @@ -605,10 +564,6 @@ def test_put_get_abs_path(self, custom_transport):
"""
test of exception for non existing files and abs path
"""
import os
import random
import string

local_dir = os.path.join('/', 'tmp')
remote_dir = local_dir
directory = 'tmp_try'
Expand Down Expand Up @@ -669,10 +624,6 @@ def test_put_get_empty_string(self, custom_transport):
test of exception put/get of empty strings
"""
# TODO : verify the correctness of \n at the end of a file
import os
import random
import string

local_dir = os.path.join('/', 'tmp')
remote_dir = local_dir
directory = 'tmp_try'
Expand Down Expand Up @@ -752,10 +703,6 @@ class TestPutGetTree(unittest.TestCase):
@run_for_all_plugins
def test_put_and_get(self, custom_transport):
"""Test putting and getting files."""
import os
import random
import string

local_dir = os.path.join('/', 'tmp')
remote_dir = local_dir
directory = 'tmp_try'
Expand Down Expand Up @@ -807,8 +754,6 @@ def test_put_and_get(self, custom_transport):
self.assertTrue('file.txt' in list_pushed_file)
self.assertTrue('file.txt' in list_retrieved_file)

import shutil

shutil.rmtree(local_subfolder)
shutil.rmtree(retrieved_subfolder)
transport.rmtree(remote_subfolder)
Expand All @@ -819,11 +764,6 @@ def test_put_and_get(self, custom_transport):
@run_for_all_plugins
def test_put_and_get_overwrite(self, custom_transport):
"""Test putting and getting files with overwrites."""
import os
import random
import shutil
import string

local_dir = os.path.join('/', 'tmp')
remote_dir = local_dir
directory = 'tmp_try'
Expand Down Expand Up @@ -877,10 +817,6 @@ def test_put_and_get_overwrite(self, custom_transport):
@run_for_all_plugins
def test_copy(self, custom_transport):
"""Test copying."""
import os
import random
import string

local_dir = os.path.join('/', 'tmp')
remote_dir = local_dir
directory = 'tmp_try'
Expand Down Expand Up @@ -952,10 +888,6 @@ def test_put(self, custom_transport):
# pylint: disable=too-many-statements
# exactly the same tests of copy, just with the put function
# and therefore the local path must be absolute
import os
import random
import string

local_dir = os.path.join('/', 'tmp')
remote_dir = local_dir
directory = 'tmp_try'
Expand Down Expand Up @@ -1033,11 +965,6 @@ def test_get(self, custom_transport):
# pylint: disable=too-many-statements
# exactly the same tests of copy, just with the put function
# and therefore the local path must be absolute
import os
import random
import shutil
import string

local_dir = os.path.join('/', 'tmp')
remote_dir = local_dir
directory = 'tmp_try'
Expand Down Expand Up @@ -1119,10 +1046,6 @@ def test_put_get_abs_path(self, custom_transport):
"""
test of exception for non existing files and abs path
"""
import os
import random
import string

local_dir = os.path.join('/', 'tmp')
remote_dir = local_dir
directory = 'tmp_try'
Expand Down Expand Up @@ -1194,10 +1117,6 @@ def test_put_get_empty_string(self, custom_transport):
test of exception put/get of empty strings
"""
# TODO : verify the correctness of \n at the end of a file
import os
import random
import string

local_dir = os.path.join('/', 'tmp')
remote_dir = local_dir
directory = 'tmp_try'
Expand Down Expand Up @@ -1263,9 +1182,6 @@ def test_put_get_empty_string(self, custom_transport):
@run_for_all_plugins
def test_gettree_nested_directory(self, custom_transport): # pylint: disable=no-self-use
"""Test `gettree` for a nested directory."""
import os
import tempfile

with tempfile.TemporaryDirectory() as dir_remote, tempfile.TemporaryDirectory() as dir_local:
content = b'dummy\ncontent'
filepath = os.path.join(dir_remote, 'sub', 'path', 'filename.txt')
Expand Down Expand Up @@ -1294,8 +1210,6 @@ def test_exec_pwd(self, custom_transport):
creation (which should be done by paramiko) and in the command
execution (done in this module, in the _exec_command_internal function).
"""
import os

# Start value
delete_at_end = False

Expand Down Expand Up @@ -1365,3 +1279,73 @@ def test_exec_with_wrong_stdin(self, custom_transport):
with custom_transport as transport:
with self.assertRaises(ValueError):
transport.exec_command_wait('cat', stdin=1)


class TestDirectScheduler(unittest.TestCase):
    """
    Test how the direct scheduler works.

    While this is technically a scheduler test, it lives under the transport tests
    because 1) in reality it tests the interaction of each transport with the
    direct scheduler; 2) the direct scheduler is always available; 3) it reuses
    the infrastructure to test on multiple transport plugins.
    """

    @run_for_all_plugins
    def test_asynchronous_execution(self, custom_transport):
        """Test that the execution of a long(ish) command via the direct scheduler does not block.

        This is a regression test for #3094, where running a long job on the direct scheduler
        (via SSH) would lock the interpreter until the job was done.

        The test uploads a script that sleeps 10 seconds, submits it, and checks that
        (a) control comes back in less than 5 seconds and (b) the job process is still
        alive about one second later. Cleanup (killing the job and removing the script)
        happens in a ``finally`` block so that a failing assertion does not leave the
        script or the sleeping process behind.

        IMPORTANT: the liveness check and the cleanup assume that all transports act on
        the *same* local machine and that the job id returned by the 'direct' scheduler
        is the PID of the job process.
        """
        # Use a unique name, using the current process PID as well, to avoid concurrent tests (or very rapid
        # tests that follow each other) to overwrite the same destination
        script_fname = f'sleep-submit-{os.getpid()}-{custom_transport.__class__.__name__}.sh'

        # Stays None until the scheduler has returned an id that parses as an integer,
        # so that the cleanup knows whether there is anything to kill
        job_id = None

        scheduler = SchedulerFactory('direct')()
        scheduler.set_transport(custom_transport)
        with custom_transport as transport:
            try:
                # The local temporary file is only needed until it has been uploaded
                with tempfile.NamedTemporaryFile() as tmpf:
                    # Put a submission script that sleeps 10 seconds
                    tmpf.write(b'#!/bin/bash\nsleep 10\n')
                    tmpf.flush()

                    transport.chdir('/tmp')
                    transport.putfile(tmpf.name, script_fname)

                timestamp_before = time.time()
                job_id_string = scheduler.submit_from_script('/tmp', script_fname)
                elapsed_time = time.time() - timestamp_before

                # Parse the job id before asserting anything, so that the cleanup below can
                # kill the job even if one of the assertions fails.
                # This needs to be adapted if:
                # - a new transport plugin is tested and this does not test the same machine
                # - a new scheduler is used and does not use the process PID, or the job_id of the
                #   'direct' scheduler is not anymore simply the job PID
                job_id = int(job_id_string)

                # We want to get back control. If it takes < 5 seconds, it means that it is not blocking
                # as the job is taking at least 10 seconds. The threshold is 5 as the machine could be slow
                # (including the SSH connection etc.) and false failures should be avoided.
                # Actually, if the time is short, it could mean also that the execution failed!
                # So the success of the execution is double checked below.
                self.assertLess(
                    elapsed_time, 5,
                    'Getting back control after remote execution took more than 5 seconds! Probably submission is blocking'
                )

                # Wait 1 more second, to avoid a super-quick check that might return True
                # even if the job is not actually sleeping
                time.sleep(1)

                # Check that the job is still running
                self.assertTrue(
                    psutil.pid_exists(job_id), 'The job is not there after a bit more than 1 second! Probably it failed'
                )
            finally:
                # Clean up by killing the remote job, to avoid leaving it around. It is not critical
                # (it is a sleep that will die in < 10 seconds anyway), so a job that has already
                # exited is silently ignored.
                if job_id is not None:
                    try:
                        os.kill(job_id, signal.SIGTERM)
                    except ProcessLookupError:
                        pass

                # Also remove the script; if it was never created there is nothing to do
                try:
                    os.remove(f'/tmp/{script_fname}')
                except FileNotFoundError:
                    pass

0 comments on commit 041b78b

Please sign in to comment.