Skip to content

Commit

Permalink
Fix PID tracking for cgroups (#1489)
Browse files Browse the repository at this point in the history
* add fixes for cgroup setup and PID tracking

* cleanup code comments

* disable cgroups test if not supported by environment

* remove wrapper cgroup tracking and other nits

* fix whitespace

* nit fixes; addressing CR comments
  • Loading branch information
pgombar authored Mar 26, 2019
1 parent ca1d395 commit ff01b37
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 31 deletions.
59 changes: 38 additions & 21 deletions azurelinuxagent/common/cgroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
from azurelinuxagent.common.version import AGENT_NAME, CURRENT_VERSION


WRAPPER_CGROUP_NAME = "Agent+Extensions"
WRAPPER_CGROUP_NAME = "WALinuxAgent"
AGENT_CGROUP_NAME = "WALinuxAgent"
METRIC_HIERARCHIES = ['cpu', 'memory']
MEMORY_DEFAULT = -1

Expand Down Expand Up @@ -215,6 +216,7 @@ def track_systemd_service(name):
service_name = "{0}.service".format(name).lower()
if CGroups.enabled() and not CGroupsTelemetry.is_tracked(service_name):
cgroup = CGroups.for_systemd_service(service_name)
logger.info("Now tracking cgroup {0}".format(service_name))
tracker = CGroupsTelemetry(service_name, cgroup=cgroup)
CGroupsTelemetry._tracked[service_name] = tracker

Expand Down Expand Up @@ -248,9 +250,12 @@ def track_agent():
if not CGroups.enabled():
return
if CGroups.is_systemd_manager():
CGroupsTelemetry.track_systemd_service(AGENT_NAME)
logger.info("Tracking systemd cgroup for {0}".format(AGENT_CGROUP_NAME))
CGroupsTelemetry.track_systemd_service(AGENT_CGROUP_NAME)
else:
CGroupsTelemetry.track_cgroup(CGroups.for_extension(AGENT_NAME))
logger.info("Tracking cgroup for {0}".format(AGENT_CGROUP_NAME))
# This creates /sys/fs/cgroup/{cpu,memory}/WALinuxAgent/WALinuxAgent
CGroupsTelemetry.track_cgroup(CGroups.for_extension(AGENT_CGROUP_NAME))

@staticmethod
def is_tracked(name):
Expand Down Expand Up @@ -282,7 +287,6 @@ def collect_all_tracked():
limits = {}

for cgroup_name, collector in CGroupsTelemetry._tracked.copy().items():
cgroup_name = cgroup_name if cgroup_name else WRAPPER_CGROUP_NAME
results[cgroup_name] = collector.collect()
limits[cgroup_name] = collector.cgroup.threshold

Expand Down Expand Up @@ -364,10 +368,12 @@ class CGroups(object):

@staticmethod
def _construct_custom_path_for_hierarchy(hierarchy, cgroup_name):
return os.path.join(BASE_CGROUPS, hierarchy, AGENT_NAME, cgroup_name).rstrip(os.path.sep)
# This creates /sys/fs/cgroup/{cpu,memory}/WALinuxAgent/cgroup_name
return os.path.join(BASE_CGROUPS, hierarchy, WRAPPER_CGROUP_NAME, cgroup_name).rstrip(os.path.sep)

@staticmethod
def _construct_systemd_path_for_hierarchy(hierarchy, cgroup_name):
    """
    Return the path of a systemd-managed cgroup inside the given hierarchy.

    The result has the form BASE_CGROUPS/<hierarchy>/system.slice/<cgroup_name>
    (presumably /sys/fs/cgroup/{cpu,memory}/system.slice/<cgroup_name> - confirm
    against the definition of BASE_CGROUPS). This only builds the path string;
    nothing is created on disk here.

    :param hierarchy: name of the cgroup hierarchy, e.g. 'cpu' or 'memory'
    :param cgroup_name: name of the cgroup under system.slice
    :return: the joined path with any trailing path separator removed
    """
    joined_path = os.path.join(BASE_CGROUPS, hierarchy, 'system.slice', cgroup_name)
    # An empty cgroup_name makes os.path.join leave a trailing separator; drop it
    return joined_path.rstrip(os.path.sep)

@staticmethod
Expand Down Expand Up @@ -420,8 +426,9 @@ def __init__(self, name, path_maker, limits=None):
cgroup_name = "" if self.is_wrapper_cgroup else self.name
cgroup_path = path_maker(hierarchy, cgroup_name)
if not os.path.isdir(cgroup_path):
logger.info("Creating cgroup directory {0}".format(cgroup_path))
CGroups._try_mkdir(cgroup_path)
logger.info("Created cgroup {0}".format(cgroup_path))

self.cgroups[hierarchy] = cgroup_path

@staticmethod
Expand Down Expand Up @@ -539,6 +546,7 @@ def _setup_wrapper_groups():
For each hierarchy, construct the wrapper cgroup and apply the appropriate limits
"""
for hierarchy in METRIC_HIERARCHIES:
# This creates /sys/fs/cgroup/{cpu,memory}/WALinuxAgent
root_dir = CGroups._construct_custom_path_for_hierarchy(hierarchy, "")
CGroups._try_mkdir(root_dir)
CGroups._apply_wrapper_limits(root_dir, hierarchy)
Expand All @@ -556,18 +564,24 @@ def setup(suppress_process_add=False):
try:
CGroups._osutil.mount_cgroups()
if not suppress_process_add:
# Creates /sys/fs/cgroup/{cpu,memory}/WALinuxAgent wrapper cgroup
CGroups._setup_wrapper_groups()
pid = int(os.getpid())
if not CGroups.is_systemd_manager():
cg = CGroups.for_extension(AGENT_NAME)
logger.info("Add daemon process pid {0} to {1} cgroup".format(pid, cg.name))
if CGroups.is_systemd_manager():
# When daemon is running as a service, it's called walinuxagent.service
# and is created and tracked by systemd, so we don't explicitly add the PID ourselves,
# just track it for our reporting purposes
cg = CGroups.for_systemd_service(AGENT_CGROUP_NAME.lower() + ".service")
logger.info("Daemon process id {0} is tracked in systemd cgroup {1}".format(pid, cg.name))
# systemd sets limits; any limits we write would be overwritten
else:
# Creates /sys/fs/cgroup/{cpu,memory}/WALinuxAgent/WALinuxAgent cgroup
cg = CGroups.for_extension(AGENT_CGROUP_NAME)
logger.info("Daemon process id {0} is tracked in cgroup {1}".format(pid, cg.name))
cg.add(pid)
cg.set_limits()
else:
cg = CGroups.for_systemd_service(AGENT_NAME)
logger.info("Add daemon process pid {0} to {1} systemd cgroup".format(pid, cg.name))
# systemd sets limits; any limits we write would be overwritten
status = "ok"

status = "successfully set up agent cgroup"
except CGroupsException as cge:
status = cge.msg
CGroups.disable()
Expand All @@ -590,7 +604,7 @@ def setup(suppress_process_add=False):
log_event=False)

@staticmethod
def add_to_extension_cgroup(name, pid=int(os.getpid())):
def add_to_extension_cgroup(name, pid):
"""
Create cgroup directories for this extension in each of the hierarchies and add this process to the new cgroup.
Should only be called when creating sub-processes and invoked inside the fork/exec window. As a result,
Expand All @@ -602,15 +616,16 @@ def add_to_extension_cgroup(name, pid=int(os.getpid())):
"""
if not CGroups.enabled():
return
if name == AGENT_NAME:
logger.warn('Extension cgroup name cannot match agent cgroup name ({0})'.format(AGENT_NAME))
if name == AGENT_CGROUP_NAME:
logger.warn('Extension cgroup name cannot match extension handler cgroup name ({0}). ' \
'Will not track extension.'.format(AGENT_CGROUP_NAME))
return

try:
logger.info("Move process {0} into cgroups for extension {1}".format(pid, name))
logger.info("Move process {0} into cgroup for extension {1}".format(pid, name))
CGroups.for_extension(name).add(pid)
except Exception as ex:
logger.warn("Unable to move process {0} into cgroups for extension {1}: {2}".format(pid, name, ex))
logger.warn("Unable to move process {0} into cgroup for extension {1}: {2}".format(pid, name, ex))

@staticmethod
def get_my_cgroup_path(hierarchy_id):
Expand Down Expand Up @@ -796,7 +811,9 @@ def __init__(self, cgroup_name, threshold=None):
@staticmethod
def get_default_cpu_limits(cgroup_name):
# default values
cpu_limit = DEFAULT_CPU_LIMIT_AGENT if AGENT_NAME.lower() in cgroup_name.lower() else DEFAULT_CPU_LIMIT_EXT
cpu_limit = DEFAULT_CPU_LIMIT_EXT
if AGENT_CGROUP_NAME.lower() in cgroup_name.lower():
cpu_limit = DEFAULT_CPU_LIMIT_AGENT
return cpu_limit

@staticmethod
Expand All @@ -807,6 +824,6 @@ def get_default_memory_limits(cgroup_name):
mem_limit = max(DEFAULT_MEM_LIMIT_MIN_MB, round(os_util.get_total_mem() * DEFAULT_MEM_LIMIT_PCT / 100, 0))

# agent values
if AGENT_NAME.lower() in cgroup_name.lower():
if AGENT_CGROUP_NAME.lower() in cgroup_name.lower():
mem_limit = min(DEFAULT_MEM_LIMIT_MAX_MB, mem_limit)
return mem_limit
2 changes: 1 addition & 1 deletion azurelinuxagent/ga/exthandlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1128,7 +1128,7 @@ def pre_exec_function():
the fork() and the exec() of sub-process creation.
"""
os.setsid()
CGroups.add_to_extension_cgroup(self.ext_handler.name)
CGroups.add_to_extension_cgroup(self.ext_handler.name, os.getpid())

process = subprocess.Popen(full_path,
shell=True,
Expand Down
24 changes: 18 additions & 6 deletions azurelinuxagent/ga/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,16 @@ def send_telemetry_heartbeat(self):

@staticmethod
def init_cgroups():
# Track metrics for the roll-up cgroup and for the agent cgroup
# Track metrics for the wrapper cgroup and for the agent cgroup
try:
CGroupsTelemetry.track_cgroup(CGroups.for_extension(""))
# This creates the wrapper cgroup for everything under agent,
# /sys/fs/cgroup/{cpu,memory}/WALinuxAgent/
# There is no need to track this cgroup, as it only serves
# as an umbrella for the agent and extensions cgroups
CGroups.for_extension("")
# This creates the agent's cgroup (for the daemon and extension handler)
# /sys/fs/cgroup/{cpu,memory}/WALinuxAgent/WALinuxAgent
# If the system is using systemd, it would have already been set up under /system.slice
CGroupsTelemetry.track_agent()
except Exception as e:
# when a hierarchy is not mounted, we raise an exception
Expand All @@ -430,8 +437,10 @@ def send_cgroup_telemetry(self):
# Memory is collected in bytes, and limit is set in megabytes.
if value >= CGroups._format_memory_value('megabytes', thresholds.memory_limit):
msg = "CGroup {0}: Crossed the Memory Threshold. " \
"Current Value:{1} bytes, Threshold:{2} megabytes.".format(cgroup_name, value,
thresholds.memory_limit)
"Current Value: {1} bytes, Threshold: {2} megabytes." \
.format(cgroup_name, value, thresholds.memory_limit)

logger.warn(msg)
add_event(name=AGENT_NAME,
version=CURRENT_VERSION,
op=WALAEventOperation.CGroupsLimitsCrossed,
Expand All @@ -441,8 +450,11 @@ def send_cgroup_telemetry(self):

if metric_group == "Process":
if value >= thresholds.cpu_limit:
msg = "CGroup {0}: Crossed the Processor Threshold. Current Value:{1}, Threshold:{2}.".format(
cgroup_name, value, thresholds.cpu_limit)
msg = "CGroup {0}: Crossed the Processor Threshold. " \
"Current Value: {1}, Threshold: {2}." \
.format(cgroup_name, value, thresholds.cpu_limit)

logger.warn(msg)
add_event(name=AGENT_NAME,
version=CURRENT_VERSION,
op=WALAEventOperation.CGroupsLimitsCrossed,
Expand Down
49 changes: 46 additions & 3 deletions tests/ga/test_exthandlers.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache License.
import json
import re
import stat

from azurelinuxagent.common.protocol.restapi import ExtensionStatus, Extension, ExtHandler, ExtHandlerProperties
from azurelinuxagent.ga.exthandlers import parse_ext_status, ExtHandlerInstance, get_exthandlers_handler
from azurelinuxagent.common.exception import ProtocolError, ExtensionError, ExtensionErrorCodes
from azurelinuxagent.common.event import WALAEventOperation
from azurelinuxagent.common.utils.processutil import TELEMETRY_MESSAGE_MAX_LEN, format_stdout_stderr
from azurelinuxagent.common.cgroups import CGroups
from tests.tools import *


Expand Down Expand Up @@ -221,9 +223,17 @@ def setUp(self):

ext_handler_properties = ExtHandlerProperties()
ext_handler_properties.version = "1.2.3"
ext_handler = ExtHandler(name='foo')
ext_handler.properties = ext_handler_properties
self.ext_handler_instance = ExtHandlerInstance(ext_handler=ext_handler, protocol=None)
self.ext_handler = ExtHandler(name='foo')
self.ext_handler.properties = ext_handler_properties
self.ext_handler_instance = ExtHandlerInstance(ext_handler=self.ext_handler, protocol=None)

self.base_cgroups = os.path.join(self.tmp_dir, "cgroup")
os.mkdir(self.base_cgroups)
os.mkdir(os.path.join(self.base_cgroups, "cpu"))
os.mkdir(os.path.join(self.base_cgroups, "memory"))

self.mock__base_cgroups = patch("azurelinuxagent.common.cgroups.BASE_CGROUPS", self.base_cgroups)
self.mock__base_cgroups.start()

self.mock_get_base_dir = patch("azurelinuxagent.ga.exthandlers.ExtHandlerInstance.get_base_dir", lambda *_: self.tmp_dir)
self.mock_get_base_dir.start()
Expand All @@ -235,6 +245,7 @@ def setUp(self):
def tearDown(self):
    """
    Stop the log-dir, base-dir and base-cgroups patchers held by this test case,
    then run AgentTestCase's own tearDown.
    """
    # Patchers are looked up and stopped one at a time, in this fixed order
    for patcher_name in ("mock_get_log_dir", "mock_get_base_dir", "mock__base_cgroups"):
        getattr(self, patcher_name).stop()

    AgentTestCase.tearDown(self)

Expand Down Expand Up @@ -599,3 +610,35 @@ def test_it_should_handle_exceptions_from_cgroups_and_run_command(self):
self.ext_handler_instance.launch_command(command)

self.assertTrue(os.path.exists(signal_file))

@skip_if_predicate_false(CGroups.enabled, "CGroups not supported in this environment")
def test_it_should_add_the_child_process_to_its_own_cgroup(self):
    # We are checking for the parent PID here since the PID getting written to the corresponding cgroup
    # would be from the shell process started before launch_command invokes the actual command.
    # In a non-mocked scenario, the kernel would also write all the children's PIDs to the procs
    # file, but here we are mocking the base cgroup path, so that is not taken care of for us.
    command = self._create_script("output_parent_pid.py", '''
import os
print(os.getppid())
''')

    output = self.ext_handler_instance.launch_command(command)

    # Raw string: '\d' in a plain string literal is an invalid escape sequence
    match = re.match(LaunchCommandTestCase._output_regex(r'(\d+)', '.*'), output)
    if match is None or match.group(1) is None:
        raise Exception("Could not extract the PID of the child command from its output")

    expected_pid = int(match.group(1))

    # Every controller directory under the mocked base path must have recorded the same PID
    for controller in os.listdir(self.base_cgroups):
        procs = os.path.join(self.base_cgroups, controller, "WALinuxAgent", self.ext_handler.name, "cgroup.procs")
        with open(procs, "r") as f:
            contents = f.read()
        pid = int(contents)

        self.assertNotEqual(os.getpid(), pid, "The PID {0} added to {1} was of the launch command caller, not the command itself.".format(pid, procs))
        # assertEqual rather than assertEquals: the alias is deprecated and removed in Python 3.12
        self.assertEqual(pid, expected_pid, "The PID of the command was not added to {0}. Expected: {1}, got: {2}".format(procs, expected_pid, pid))


0 comments on commit ff01b37

Please sign in to comment.