From ff01b372acf090cece0564c396b496783985624a Mon Sep 17 00:00:00 2001 From: Paula Gombar Date: Tue, 26 Mar 2019 09:46:47 -0700 Subject: [PATCH] Fix PID tracking for cgroups (#1489) * add fixes for cgroup setup and PID tracking * cleanup code comments * disable cgroups test if not supported by environment * remove wrapper cgroup tracking and other nits * fix whitespace * nit fixes; addressing CR comments --- azurelinuxagent/common/cgroups.py | 59 ++++++++++++++++++++----------- azurelinuxagent/ga/exthandlers.py | 2 +- azurelinuxagent/ga/monitor.py | 24 +++++++++---- tests/ga/test_exthandlers.py | 49 +++++++++++++++++++++++-- 4 files changed, 103 insertions(+), 31 deletions(-) diff --git a/azurelinuxagent/common/cgroups.py b/azurelinuxagent/common/cgroups.py index fad8629ee4..556dc91f76 100644 --- a/azurelinuxagent/common/cgroups.py +++ b/azurelinuxagent/common/cgroups.py @@ -28,7 +28,8 @@ from azurelinuxagent.common.version import AGENT_NAME, CURRENT_VERSION -WRAPPER_CGROUP_NAME = "Agent+Extensions" +WRAPPER_CGROUP_NAME = "WALinuxAgent" +AGENT_CGROUP_NAME = "WALinuxAgent" METRIC_HIERARCHIES = ['cpu', 'memory'] MEMORY_DEFAULT = -1 @@ -215,6 +216,7 @@ def track_systemd_service(name): service_name = "{0}.service".format(name).lower() if CGroups.enabled() and not CGroupsTelemetry.is_tracked(service_name): cgroup = CGroups.for_systemd_service(service_name) + logger.info("Now tracking cgroup {0}".format(service_name)) tracker = CGroupsTelemetry(service_name, cgroup=cgroup) CGroupsTelemetry._tracked[service_name] = tracker @@ -248,9 +250,12 @@ def track_agent(): if not CGroups.enabled(): return if CGroups.is_systemd_manager(): - CGroupsTelemetry.track_systemd_service(AGENT_NAME) + logger.info("Tracking systemd cgroup for {0}".format(AGENT_CGROUP_NAME)) + CGroupsTelemetry.track_systemd_service(AGENT_CGROUP_NAME) else: - CGroupsTelemetry.track_cgroup(CGroups.for_extension(AGENT_NAME)) + logger.info("Tracking cgroup for {0}".format(AGENT_CGROUP_NAME)) + # This creates /sys/fs/cgroup/{cpu,memory}/WALinuxAgent/WALinuxAgent + CGroupsTelemetry.track_cgroup(CGroups.for_extension(AGENT_CGROUP_NAME)) @staticmethod def is_tracked(name): @@ -282,7 +287,6 @@ def collect_all_tracked(): limits = {} for cgroup_name, collector in CGroupsTelemetry._tracked.copy().items(): - cgroup_name = cgroup_name if cgroup_name else WRAPPER_CGROUP_NAME results[cgroup_name] = collector.collect() limits[cgroup_name] = collector.cgroup.threshold @@ -364,10 +368,12 @@ class CGroups(object): @staticmethod def _construct_custom_path_for_hierarchy(hierarchy, cgroup_name): - return os.path.join(BASE_CGROUPS, hierarchy, AGENT_NAME, cgroup_name).rstrip(os.path.sep) + # This creates /sys/fs/cgroup/{cpu,memory}/WALinuxAgent/cgroup_name + return os.path.join(BASE_CGROUPS, hierarchy, WRAPPER_CGROUP_NAME, cgroup_name).rstrip(os.path.sep) @staticmethod def _construct_systemd_path_for_hierarchy(hierarchy, cgroup_name): + # This creates /sys/fs/cgroup/{cpu,memory}/system.slice/cgroup_name return os.path.join(BASE_CGROUPS, hierarchy, 'system.slice', cgroup_name).rstrip(os.path.sep) @staticmethod @@ -420,8 +426,9 @@ def __init__(self, name, path_maker, limits=None): cgroup_name = "" if self.is_wrapper_cgroup else self.name cgroup_path = path_maker(hierarchy, cgroup_name) if not os.path.isdir(cgroup_path): - logger.info("Creating cgroup directory {0}".format(cgroup_path)) CGroups._try_mkdir(cgroup_path) + logger.info("Created cgroup {0}".format(cgroup_path)) + self.cgroups[hierarchy] = cgroup_path @staticmethod @@ -539,6 +546,7 @@ def _setup_wrapper_groups(): For each hierarchy, construct the wrapper cgroup and apply the appropriate limits """ for hierarchy in METRIC_HIERARCHIES: + # This creates /sys/fs/cgroup/{cpu,memory}/WALinuxAgent root_dir = CGroups._construct_custom_path_for_hierarchy(hierarchy, "") CGroups._try_mkdir(root_dir) CGroups._apply_wrapper_limits(root_dir, hierarchy) @@ -556,18 +564,24 @@ def setup(suppress_process_add=False): try: CGroups._osutil.mount_cgroups() if not suppress_process_add: + # Creates /sys/fs/cgroup/{cpu,memory}/WALinuxAgent wrapper cgroup CGroups._setup_wrapper_groups() pid = int(os.getpid()) - if not CGroups.is_systemd_manager(): - cg = CGroups.for_extension(AGENT_NAME) - logger.info("Add daemon process pid {0} to {1} cgroup".format(pid, cg.name)) + if CGroups.is_systemd_manager(): + # When daemon is running as a service, it's called walinuxagent.service + # and is created and tracked by systemd, so we don't explicitly add the PID ourselves, + # just track it for our reporting purposes + cg = CGroups.for_systemd_service(AGENT_CGROUP_NAME.lower() + ".service") + logger.info("Daemon process id {0} is tracked in systemd cgroup {1}".format(pid, cg.name)) + # systemd sets limits; any limits we write would be overwritten + else: + # Creates /sys/fs/cgroup/{cpu,memory}/WALinuxAgent/WALinuxAgent cgroup + cg = CGroups.for_extension(AGENT_CGROUP_NAME) + logger.info("Daemon process id {0} is tracked in cgroup {1}".format(pid, cg.name)) cg.add(pid) cg.set_limits() - else: - cg = CGroups.for_systemd_service(AGENT_NAME) - logger.info("Add daemon process pid {0} to {1} systemd cgroup".format(pid, cg.name)) - # systemd sets limits; any limits we write would be overwritten - status = "ok" + + status = "successfully set up agent cgroup" except CGroupsException as cge: status = cge.msg CGroups.disable() @@ -590,7 +604,7 @@ def setup(suppress_process_add=False): log_event=False) @staticmethod - def add_to_extension_cgroup(name, pid=int(os.getpid())): + def add_to_extension_cgroup(name, pid): """ Create cgroup directories for this extension in each of the hierarchies and add this process to the new cgroup. Should only be called when creating sub-processes and invoked inside the fork/exec window. As a result, @@ -602,15 +616,16 @@ def add_to_extension_cgroup(name, pid=int(os.getpid())): """ if not CGroups.enabled(): return - if name == AGENT_NAME: - logger.warn('Extension cgroup name cannot match agent cgroup name ({0})'.format(AGENT_NAME)) + if name == AGENT_CGROUP_NAME: + logger.warn('Extension cgroup name cannot match extension handler cgroup name ({0}). ' \ + 'Will not track extension.'.format(AGENT_CGROUP_NAME)) return try: - logger.info("Move process {0} into cgroups for extension {1}".format(pid, name)) + logger.info("Move process {0} into cgroup for extension {1}".format(pid, name)) CGroups.for_extension(name).add(pid) except Exception as ex: - logger.warn("Unable to move process {0} into cgroups for extension {1}: {2}".format(pid, name, ex)) + logger.warn("Unable to move process {0} into cgroup for extension {1}: {2}".format(pid, name, ex)) @staticmethod def get_my_cgroup_path(hierarchy_id): @@ -796,7 +811,9 @@ def __init__(self, cgroup_name, threshold=None): @staticmethod def get_default_cpu_limits(cgroup_name): # default values - cpu_limit = DEFAULT_CPU_LIMIT_AGENT if AGENT_NAME.lower() in cgroup_name.lower() else DEFAULT_CPU_LIMIT_EXT + cpu_limit = DEFAULT_CPU_LIMIT_EXT + if AGENT_CGROUP_NAME.lower() in cgroup_name.lower(): + cpu_limit = DEFAULT_CPU_LIMIT_AGENT return cpu_limit @staticmethod @@ -807,6 +824,6 @@ def get_default_memory_limits(cgroup_name): mem_limit = max(DEFAULT_MEM_LIMIT_MIN_MB, round(os_util.get_total_mem() * DEFAULT_MEM_LIMIT_PCT / 100, 0)) # agent values - if AGENT_NAME.lower() in cgroup_name.lower(): + if AGENT_CGROUP_NAME.lower() in cgroup_name.lower(): mem_limit = min(DEFAULT_MEM_LIMIT_MAX_MB, mem_limit) return mem_limit diff --git a/azurelinuxagent/ga/exthandlers.py b/azurelinuxagent/ga/exthandlers.py index d7503935f4..4797ac901e 100644 --- a/azurelinuxagent/ga/exthandlers.py +++ b/azurelinuxagent/ga/exthandlers.py @@ -1128,7 +1128,7 @@ def pre_exec_function(): the fork() and the exec() of sub-process creation. """ os.setsid() - CGroups.add_to_extension_cgroup(self.ext_handler.name) + CGroups.add_to_extension_cgroup(self.ext_handler.name, os.getpid()) process = subprocess.Popen(full_path, shell=True, diff --git a/azurelinuxagent/ga/monitor.py b/azurelinuxagent/ga/monitor.py index f59aa72417..c4eb6a3c9f 100644 --- a/azurelinuxagent/ga/monitor.py +++ b/azurelinuxagent/ga/monitor.py @@ -401,9 +401,16 @@ def send_telemetry_heartbeat(self): @staticmethod def init_cgroups(): - # Track metrics for the roll-up cgroup and for the agent cgroup + # Track metrics for the wrapper cgroup and for the agent cgroup try: - CGroupsTelemetry.track_cgroup(CGroups.for_extension("")) + # This creates the wrapper cgroup for everything under agent, + # /sys/fs/cgroup/{cpu,memory}/WALinuxAgent/ + # There is no need in tracking this cgroup, as it only serves + # as an umbrella for the agent and extensions cgroups + CGroups.for_extension("") + # This creates the agent's cgroup (for the daemon and extension handler) + # /sys/fs/cgroup/{cpu,memory}/WALinuxAgent/WALinuxAgent + # If the system is using systemd, it would have already been set up under /system.slice CGroupsTelemetry.track_agent() except Exception as e: # when a hierarchy is not mounted, we raise an exception @@ -430,8 +437,10 @@ def send_cgroup_telemetry(self): # Memory is collected in bytes, and limit is set in megabytes. if value >= CGroups._format_memory_value('megabytes', thresholds.memory_limit): msg = "CGroup {0}: Crossed the Memory Threshold. " \ - "Current Value:{1} bytes, Threshold:{2} megabytes.".format(cgroup_name, value, - thresholds.memory_limit) + "Current Value: {1} bytes, Threshold: {2} megabytes." \ + .format(cgroup_name, value, thresholds.memory_limit) + + logger.warn(msg) add_event(name=AGENT_NAME, version=CURRENT_VERSION, op=WALAEventOperation.CGroupsLimitsCrossed, @@ -441,8 +450,11 @@ def send_cgroup_telemetry(self): if metric_group == "Process": if value >= thresholds.cpu_limit: - msg = "CGroup {0}: Crossed the Processor Threshold. Current Value:{1}, Threshold:{2}.".format( - cgroup_name, value, thresholds.cpu_limit) + msg = "CGroup {0}: Crossed the Processor Threshold. " \ + "Current Value: {1}, Threshold: {2}." \ + .format(cgroup_name, value, thresholds.cpu_limit) + + logger.warn(msg) add_event(name=AGENT_NAME, version=CURRENT_VERSION, op=WALAEventOperation.CGroupsLimitsCrossed, diff --git a/tests/ga/test_exthandlers.py b/tests/ga/test_exthandlers.py index c93e04e811..2d905867ab 100644 --- a/tests/ga/test_exthandlers.py +++ b/tests/ga/test_exthandlers.py @@ -1,6 +1,7 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the Apache License. import json +import re import stat from azurelinuxagent.common.protocol.restapi import ExtensionStatus, Extension, ExtHandler, ExtHandlerProperties @@ -8,6 +9,7 @@ from azurelinuxagent.common.exception import ProtocolError, ExtensionError, ExtensionErrorCodes from azurelinuxagent.common.event import WALAEventOperation from azurelinuxagent.common.utils.processutil import TELEMETRY_MESSAGE_MAX_LEN, format_stdout_stderr +from azurelinuxagent.common.cgroups import CGroups from tests.tools import * @@ -221,9 +223,17 @@ def setUp(self): ext_handler_properties = ExtHandlerProperties() ext_handler_properties.version = "1.2.3" - ext_handler = ExtHandler(name='foo') - ext_handler.properties = ext_handler_properties - self.ext_handler_instance = ExtHandlerInstance(ext_handler=ext_handler, protocol=None) + self.ext_handler = ExtHandler(name='foo') + self.ext_handler.properties = ext_handler_properties + self.ext_handler_instance = ExtHandlerInstance(ext_handler=self.ext_handler, protocol=None) + + self.base_cgroups = os.path.join(self.tmp_dir, "cgroup") + os.mkdir(self.base_cgroups) + os.mkdir(os.path.join(self.base_cgroups, "cpu")) + os.mkdir(os.path.join(self.base_cgroups, "memory")) + + self.mock__base_cgroups = patch("azurelinuxagent.common.cgroups.BASE_CGROUPS", self.base_cgroups) + self.mock__base_cgroups.start() self.mock_get_base_dir = patch("azurelinuxagent.ga.exthandlers.ExtHandlerInstance.get_base_dir", lambda *_: self.tmp_dir) self.mock_get_base_dir.start() @@ -235,6 +245,7 @@ def setUp(self): def tearDown(self): self.mock_get_log_dir.stop() self.mock_get_base_dir.stop() + self.mock__base_cgroups.stop() AgentTestCase.tearDown(self) @@ -599,3 +610,35 @@ def test_it_should_handle_exceptions_from_cgroups_and_run_command(self): self.ext_handler_instance.launch_command(command) self.assertTrue(os.path.exists(signal_file)) + + @skip_if_predicate_false(CGroups.enabled, "CGroups not supported in this environment") + def test_it_should_add_the_child_process_to_its_own_cgroup(self): + # We are checking for the parent PID here since the PID getting written to the corresponding cgroup + # would be from the shell process started before launch_command invokes the actual command. + # In a non-mocked scenario, the kernel would actually also write all the children's PIDs to the procs + # file as well, but here we are mocking the base cgroup path, so it is not taken care for us. + command = self._create_script("output_parent_pid.py", ''' +import os + +print(os.getppid()) + +''') + + output = self.ext_handler_instance.launch_command(command) + + match = re.match(LaunchCommandTestCase._output_regex('(\d+)', '.*'), output) + if match is None or match.group(1) is None: + raise Exception("Could not extract the PID of the child command from its output") + + expected_pid = int(match.group(1)) + + controllers = os.listdir(self.base_cgroups) + for c in controllers: + procs = os.path.join(self.base_cgroups, c, "WALinuxAgent", self.ext_handler.name, "cgroup.procs") + with open(procs, "r") as f: + contents = f.read() + pid = int(contents) + + self.assertNotEqual(os.getpid(), pid, "The PID {0} added to {1} was of the launch command caller, not the command itself.".format(pid, procs)) + self.assertEquals(pid, expected_pid, "The PID of the command was not added to {0}. Expected: {1}, got: {2}".format(procs, expected_pid, pid)) +