Skip to content

Commit

Permalink
test enhancement, resolving race condition when tracking cgroups
Browse files Browse the repository at this point in the history
  • Loading branch information
pgombar committed Sep 12, 2019
1 parent e7f38c4 commit 99d907c
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 120 deletions.
52 changes: 31 additions & 21 deletions azurelinuxagent/common/cgroupapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from azurelinuxagent.common.exception import CGroupsException, ExtensionError, ExtensionErrorCodes
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common.utils import fileutil, shellutil
from azurelinuxagent.common.utils.processutil import read_output, start_subprocess_and_wait_for_completion, \
from azurelinuxagent.common.utils.extensionprocessutil import read_output, handle_process_completion, \
wait_for_process_completion_or_timeout
from azurelinuxagent.common.version import AGENT_NAME, CURRENT_VERSION

Expand Down Expand Up @@ -348,16 +348,21 @@ def pre_exec_function():
logger.warn("Failed to add extension {0} to its cgroup. Resource usage will not be tracked. "
"Error: {1}".format(extension_name, ustr(e)))

process = subprocess.Popen(command,
shell=shell,
cwd=cwd,
env=env,
stdout=stdout,
stderr=stderr,
preexec_fn=pre_exec_function)

self.track_cgroups(extension_cgroups)
process_output = start_subprocess_and_wait_for_completion(command=command,
timeout=timeout,
shell=shell,
cwd=cwd,
env=env,
stdout=stdout,
stderr=stderr,
preexec_fn=pre_exec_function,
error_code=error_code)
process_output = handle_process_completion(process=process,
command=command,
timeout=timeout,
stdout=stdout,
stderr=stderr,
error_code=error_code)

return extension_cgroups, process_output

Expand Down Expand Up @@ -463,7 +468,7 @@ def create_cgroup(controller):
return cgroups

@staticmethod
def is_systemd_failure(scope_name, process_output):
def _is_systemd_failure(scope_name, process_output):
unit_not_found = "Unit {0} not found.".format(scope_name)
return unit_not_found in process_output or scope_name not in process_output

Expand Down Expand Up @@ -501,7 +506,7 @@ def create_cgroup(controller):
else:
# The extension didn't terminate successfully. Determine whether it was due to systemd errors or
# extension errors.
systemd_failure = self.is_systemd_failure(scope_name, process_output)
systemd_failure = self._is_systemd_failure(scope_name, process_output)

if not systemd_failure:
# There was an extension error; it either timed out or returned a non-zero exit code.
Expand All @@ -528,15 +533,20 @@ def create_cgroup(controller):
stderr.truncate(0)

# Try invoking the process again, this time without systemd-run
process_output = start_subprocess_and_wait_for_completion(command=command,
timeout=timeout,
shell=shell,
cwd=cwd,
env=env,
stdout=stdout,
stderr=stderr,
preexec_fn=os.setsid,
error_code=error_code)
process = subprocess.Popen(command,
shell=shell,
cwd=cwd,
env=env,
stdout=stdout,
stderr=stderr,
preexec_fn=os.setsid)

process_output = handle_process_completion(process=process,
command=command,
timeout=timeout,
stdout=stdout,
stderr=stderr,
error_code=error_code)

return [], process_output

Expand Down
28 changes: 17 additions & 11 deletions azurelinuxagent/common/cgroupconfigurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
# Requires Python 2.6+ and Openssl 1.0+

import os
import subprocess

from azurelinuxagent.common import logger
from azurelinuxagent.common.cgroupapi import CGroupsApi
from azurelinuxagent.common.cgroupstelemetry import CGroupsTelemetry
from azurelinuxagent.common.exception import CGroupsException, ExtensionError, ExtensionErrorCodes
from azurelinuxagent.common.exception import CGroupsException, ExtensionErrorCodes
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common.osutil import get_osutil
from azurelinuxagent.common.utils.processutil import start_subprocess_and_wait_for_completion
from azurelinuxagent.common.utils.extensionprocessutil import handle_process_completion
from azurelinuxagent.common.version import AGENT_NAME, CURRENT_VERSION
from azurelinuxagent.common.event import add_event, WALAEventOperation

Expand Down Expand Up @@ -155,15 +156,20 @@ def start_extension_command(self, extension_name, command, timeout, shell, cwd,
:param error_code: Extension error code to raise in case of error
"""
if not self.enabled():
process_output = start_subprocess_and_wait_for_completion(command=command,
timeout=timeout,
shell=shell,
cwd=cwd,
env=env,
stdout=stdout,
stderr=stderr,
preexec_fn=os.setsid,
error_code=error_code)
process = subprocess.Popen(command,
shell=shell,
cwd=cwd,
env=env,
stdout=stdout,
stderr=stderr,
preexec_fn=os.setsid)

process_output = handle_process_completion(process=process,
command=command,
timeout=timeout,
stdout=stdout,
stderr=stderr,
error_code=error_code)
else:
extension_cgroups, process_output = self._cgroups_api.start_extension_command(extension_name,
command,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,19 @@
from azurelinuxagent.common.future import ustr
import os
import signal
import subprocess
import time

TELEMETRY_MESSAGE_MAX_LEN = 3200


def wait_for_process_completion_or_timeout(process, timeout):
"""
Utility function that waits for the process to complete within the given time frame. This function will terminate
the process if when the given time frame elapses.
:param process: Reference to a running process
:param timeout: Number of seconds to wait for the process to complete before killing it
:return: Two parameters: boolean for if the process timed out and the return code of the process (None if timed out)
"""
while timeout > 0 and process.poll() is None:
time.sleep(1)
timeout -= 1
Expand All @@ -44,16 +50,19 @@ def wait_for_process_completion_or_timeout(process, timeout):
return timeout == 0, return_code


def start_subprocess_and_wait_for_completion(command, timeout, shell, cwd, env, stdout, stderr, preexec_fn, error_code):
process = subprocess.Popen(
command,
shell=shell,
cwd=cwd,
env=env,
stdout=stdout,
stderr=stderr,
preexec_fn=preexec_fn)

def handle_process_completion(process, command, timeout, stdout, stderr, error_code):
"""
Utility function that waits for process completion and retrieves its output (stdout and stderr) if it completed
before the timeout period. Otherwise, the process will get killed and an ExtensionError will be raised.
In case the return code is non-zero, ExtensionError will be raised.
:param process: Reference to a running process
:param command: The extension command to run
:param timeout: Number of seconds to wait before killing the process
:param stdout: Must be a file since we seek on it when parsing the subprocess output
:param stderr: Must be a file since we seek on it when parsing the subprocess outputs
:param error_code: The error code to set if we raise an ExtensionError
:return:
"""
# Wait for process completion or timeout
timed_out, return_code = wait_for_process_completion_or_timeout(process, timeout)
process_output = read_output(stdout, stderr)
Expand All @@ -70,6 +79,12 @@ def start_subprocess_and_wait_for_completion(command, timeout, shell, cwd, env,


def read_output(stdout, stderr):
"""
Read the output of the process sent to stdout and stderr and trim them to the max appropriate length.
:param stdout: File containing the stdout of the process
:param stderr: File containing the stderr of the process
:return: Returns the formatted concatenated stdout and stderr of the process
"""
try:
stdout.seek(0)
stderr.seek(0)
Expand Down Expand Up @@ -124,4 +139,3 @@ def to_s(captured_stdout, stdout_offset, captured_stderr, stderr_offset):
return to_s(stdout, -1*stdout_len, stderr, 0)
else:
return to_s(stdout, -1*max_len_each, stderr, -1*max_len_each)

1 change: 0 additions & 1 deletion azurelinuxagent/ga/exthandlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
get_properties, \
set_properties
from azurelinuxagent.common.utils.flexible_version import FlexibleVersion
from azurelinuxagent.common.utils.processutil import read_output
from azurelinuxagent.common.version import AGENT_NAME, CURRENT_VERSION, GOAL_STATE_AGENT_VERSION, \
DISTRO_NAME, DISTRO_VERSION, PY_VERSION_MAJOR, PY_VERSION_MINOR, PY_VERSION_MICRO

Expand Down
51 changes: 33 additions & 18 deletions tests/common/test_cgroupapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,16 +313,24 @@ def test_start_extension_command_should_add_the_child_process_to_the_extension_c
stdout=stdout,
stderr=stderr)

expected_output = "[stdout]\n{0}\n\n\n[stderr]\n"
# The expected format of the process output is [stdout]\n{PID}\n\n\n[stderr]\n"
pattern = re.compile(r"\[stdout\]\n(\d+)\n\n\n\[stderr\]\n")
m = pattern.match(process_output)

try:
pid_from_output = int(m.group(1))
except Exception as e:
self.fail("No PID could be extracted from the process output! Error: {0}".format(ustr(e)))

for cgroup in extension_cgroups:
cgroups_procs_path = os.path.join(cgroup.path, "cgroup.procs")
with open(cgroups_procs_path, "r") as f:
contents = f.read()
pid = int(contents)
pid_from_cgroup = int(contents)

self.assertEquals(process_output, expected_output.format(pid),
"The PID of the command was not added to {0}. Expected:\n{1}\ngot:\n{2}".format(cgroups_procs_path, expected_output.format(pid), process_output))
self.assertEquals(pid_from_output, pid_from_cgroup,
"The PID from the process output ({0}) does not match the PID found in the"
"process cgroup {1} ({2})".format(pid_from_output, cgroups_procs_path, pid_from_cgroup))


@skip_if_predicate_false(is_systemd_present, "Systemd cgroups API doesn't manage cgroups on systems not using systemd.")
Expand Down Expand Up @@ -517,7 +525,7 @@ def mock_popen(*args, **kwargs):
with patch("azurelinuxagent.common.cgroupapi.subprocess.Popen", side_effect=mock_popen):
with patch("azurelinuxagent.common.cgroupapi.wait_for_process_completion_or_timeout",
return_value=[True, None]):
with patch("azurelinuxagent.common.cgroupapi.SystemdCgroupsApi.is_systemd_failure",
with patch("azurelinuxagent.common.cgroupapi.SystemdCgroupsApi._is_systemd_failure",
return_value=True):
extension_cgroups, process_output = SystemdCgroupsApi().start_extension_command(
extension_name="Microsoft.Compute.TestExtension-1.2.3",
Expand All @@ -539,19 +547,26 @@ def test_start_extension_command_should_not_use_fallback_option_if_extension_fai

with tempfile.TemporaryFile(dir=self.tmp_dir, mode="w+b") as stdout:
with tempfile.TemporaryFile(dir=self.tmp_dir, mode="w+b") as stderr:
with self.assertRaises(ExtensionError) as context_manager:
SystemdCgroupsApi().start_extension_command(
extension_name="Microsoft.Compute.TestExtension-1.2.3",
command="ls folder_does_not_exist",
timeout=300,
shell=True,
cwd=self.tmp_dir,
env={},
stdout=stdout,
stderr=stderr)
with patch("azurelinuxagent.common.cgroupapi.subprocess.Popen", wraps=subprocess.Popen) \
as patch_mock_popen:
with self.assertRaises(ExtensionError) as context_manager:
SystemdCgroupsApi().start_extension_command(
extension_name="Microsoft.Compute.TestExtension-1.2.3",
command="ls folder_does_not_exist",
timeout=300,
shell=True,
cwd=self.tmp_dir,
env={},
stdout=stdout,
stderr=stderr)

# We should have invoked the extension command only once, in the systemd-run case
self.assertEquals(1, patch_mock_popen.call_count)
args = patch_mock_popen.call_args[0][0]
self.assertIn("systemd-run --unit", args)

self.assertEquals(context_manager.exception.code, ExtensionErrorCodes.PluginUnknownFailure)
self.assertIn("Non-zero exit code", ustr(context_manager.exception))
self.assertEquals(context_manager.exception.code, ExtensionErrorCodes.PluginUnknownFailure)
self.assertIn("Non-zero exit code", ustr(context_manager.exception))

@attr('requires_sudo')
@patch("azurelinuxagent.common.cgroupapi.add_event")
Expand All @@ -562,7 +577,7 @@ def test_start_extension_command_should_not_use_fallback_option_if_extension_tim
with tempfile.TemporaryFile(dir=self.tmp_dir, mode="w+b") as stderr:
with patch("azurelinuxagent.common.cgroupapi.wait_for_process_completion_or_timeout",
return_value=[True, None]):
with patch("azurelinuxagent.common.cgroupapi.SystemdCgroupsApi.is_systemd_failure",
with patch("azurelinuxagent.common.cgroupapi.SystemdCgroupsApi._is_systemd_failure",
return_value=False):
with self.assertRaises(ExtensionError) as context_manager:
SystemdCgroupsApi().start_extension_command(
Expand Down
2 changes: 1 addition & 1 deletion tests/common/test_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
WALAEventOperation, elapsed_milliseconds
from azurelinuxagent.common.exception import EventError
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common.utils.processutil import read_output
from azurelinuxagent.common.utils.extensionprocessutil import read_output
from azurelinuxagent.common.version import CURRENT_VERSION
from azurelinuxagent.ga.monitor import MonitorHandler

Expand Down
6 changes: 3 additions & 3 deletions tests/ga/test_exthandlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from azurelinuxagent.ga.exthandlers import parse_ext_status, ExtHandlerInstance, get_exthandlers_handler
from azurelinuxagent.common.exception import ProtocolError, ExtensionError, ExtensionErrorCodes
from azurelinuxagent.common.event import WALAEventOperation
from azurelinuxagent.common.utils.processutil import TELEMETRY_MESSAGE_MAX_LEN, format_stdout_stderr, read_output
from azurelinuxagent.common.utils.extensionprocessutil import TELEMETRY_MESSAGE_MAX_LEN, format_stdout_stderr, read_output
from tests.tools import *


Expand Down Expand Up @@ -539,7 +539,7 @@ def test_it_should_read_only_the_head_of_large_outputs(self):
# Mocking the call to file.read() is difficult, so instead we mock the call to format_stdout_stderr, which takes the
# return value of the calls to file.read(). The intention of the test is to verify we never read (and load in memory)
# more than a few KB of data from the files used to capture stdout/stderr
with patch('azurelinuxagent.common.utils.processutil.format_stdout_stderr', side_effect=format_stdout_stderr) as mock_format:
with patch('azurelinuxagent.common.utils.extensionprocessutil.format_stdout_stderr', side_effect=format_stdout_stderr) as mock_format:
output = self.ext_handler_instance.launch_command(command)

self.assertGreaterEqual(len(output), 1024)
Expand Down Expand Up @@ -571,7 +571,7 @@ def test_it_should_handle_errors_while_reading_the_command_output(self):
def capture_process_output(stdout_file, stderr_file):
return original_capture_process_output(None, None)

with patch('azurelinuxagent.common.utils.processutil.read_output', side_effect=capture_process_output):
with patch('azurelinuxagent.common.utils.extensionprocessutil.read_output', side_effect=capture_process_output):
output = self.ext_handler_instance.launch_command(command)

self.assertIn("[stderr]\nCannot read stdout/stderr:", output)
Loading

0 comments on commit 99d907c

Please sign in to comment.