Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix PID tracking for cgroups #1489

Merged
merged 11 commits into from
Mar 26, 2019
59 changes: 38 additions & 21 deletions azurelinuxagent/common/cgroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
from azurelinuxagent.common.version import AGENT_NAME, CURRENT_VERSION


WRAPPER_CGROUP_NAME = "Agent+Extensions"
WRAPPER_CGROUP_NAME = "WALinuxAgent"
AGENT_CGROUP_NAME = "WALinuxAgent"
METRIC_HIERARCHIES = ['cpu', 'memory']
MEMORY_DEFAULT = -1

Expand Down Expand Up @@ -215,6 +216,7 @@ def track_systemd_service(name):
service_name = "{0}.service".format(name).lower()
if CGroups.enabled() and not CGroupsTelemetry.is_tracked(service_name):
cgroup = CGroups.for_systemd_service(service_name)
logger.info("Now tracking cgroup {0}".format(service_name))
tracker = CGroupsTelemetry(service_name, cgroup=cgroup)
CGroupsTelemetry._tracked[service_name] = tracker

Expand Down Expand Up @@ -248,9 +250,12 @@ def track_agent():
if not CGroups.enabled():
return
if CGroups.is_systemd_manager():
CGroupsTelemetry.track_systemd_service(AGENT_NAME)
logger.info("Tracking systemd cgroup for {0}".format(AGENT_CGROUP_NAME))
CGroupsTelemetry.track_systemd_service(AGENT_CGROUP_NAME)
else:
CGroupsTelemetry.track_cgroup(CGroups.for_extension(AGENT_NAME))
logger.info("Tracking cgroup for {0}".format(AGENT_CGROUP_NAME))
# This creates /sys/fs/cgroup/{cpu,memory}/WALinuxAgent/WALinuxAgent
CGroupsTelemetry.track_cgroup(CGroups.for_extension(AGENT_CGROUP_NAME))

@staticmethod
def is_tracked(name):
Expand Down Expand Up @@ -282,7 +287,6 @@ def collect_all_tracked():
limits = {}

for cgroup_name, collector in CGroupsTelemetry._tracked.copy().items():
cgroup_name = cgroup_name if cgroup_name else WRAPPER_CGROUP_NAME
results[cgroup_name] = collector.collect()
limits[cgroup_name] = collector.cgroup.threshold

Expand Down Expand Up @@ -364,10 +368,12 @@ class CGroups(object):

@staticmethod
def _construct_custom_path_for_hierarchy(hierarchy, cgroup_name):
return os.path.join(BASE_CGROUPS, hierarchy, AGENT_NAME, cgroup_name).rstrip(os.path.sep)
# This builds the path /sys/fs/cgroup/{cpu,memory}/WALinuxAgent/cgroup_name
return os.path.join(BASE_CGROUPS, hierarchy, WRAPPER_CGROUP_NAME, cgroup_name).rstrip(os.path.sep)

@staticmethod
def _construct_systemd_path_for_hierarchy(hierarchy, cgroup_name):
    """
    Build the path of a cgroup that lives under systemd's system.slice:
    <BASE_CGROUPS>/<hierarchy>/system.slice/<cgroup_name>

    Only the path string is computed here; nothing is created on disk.

    :param hierarchy: name of the cgroup hierarchy (controller), e.g. 'cpu' or 'memory'
    :param cgroup_name: name of the cgroup (for a service, the unit name)
    :return: the joined path with any trailing path separator removed
    """
    systemd_path = os.path.join(BASE_CGROUPS, hierarchy, 'system.slice', cgroup_name)
    # An empty cgroup_name leaves a trailing separator after the join; strip it
    return systemd_path.rstrip(os.path.sep)

@staticmethod
Expand Down Expand Up @@ -420,8 +426,9 @@ def __init__(self, name, path_maker, limits=None):
cgroup_name = "" if self.is_wrapper_cgroup else self.name
cgroup_path = path_maker(hierarchy, cgroup_name)
if not os.path.isdir(cgroup_path):
logger.info("Creating cgroup directory {0}".format(cgroup_path))
CGroups._try_mkdir(cgroup_path)
logger.info("Created cgroup {0}".format(cgroup_path))

self.cgroups[hierarchy] = cgroup_path

@staticmethod
Expand Down Expand Up @@ -539,6 +546,7 @@ def _setup_wrapper_groups():
For each hierarchy, construct the wrapper cgroup and apply the appropriate limits
"""
for hierarchy in METRIC_HIERARCHIES:
# This creates /sys/fs/cgroup/{cpu,memory}/WALinuxAgent
root_dir = CGroups._construct_custom_path_for_hierarchy(hierarchy, "")
CGroups._try_mkdir(root_dir)
CGroups._apply_wrapper_limits(root_dir, hierarchy)
Expand All @@ -556,18 +564,24 @@ def setup(suppress_process_add=False):
try:
CGroups._osutil.mount_cgroups()
if not suppress_process_add:
# Creates /sys/fs/cgroup/{cpu,memory}/WALinuxAgent wrapper cgroup
CGroups._setup_wrapper_groups()
pid = int(os.getpid())
if not CGroups.is_systemd_manager():
cg = CGroups.for_extension(AGENT_NAME)
logger.info("Add daemon process pid {0} to {1} cgroup".format(pid, cg.name))
if CGroups.is_systemd_manager():
# When the daemon is running as a service, it's called walinuxagent.service
# and is created and tracked by systemd, so we don't explicitly add the PID ourselves,
# just track it for our reporting purposes
cg = CGroups.for_systemd_service(AGENT_CGROUP_NAME.lower() + ".service")
logger.info("Daemon process id {0} is tracked in systemd cgroup {1}".format(pid, cg.name))
# systemd sets limits; any limits we write would be overwritten
else:
# Creates /sys/fs/cgroup/{cpu,memory}/WALinuxAgent/WALinuxAgent cgroup
cg = CGroups.for_extension(AGENT_CGROUP_NAME)
logger.info("Daemon process id {0} is tracked in cgroup {1}".format(pid, cg.name))
cg.add(pid)
cg.set_limits()
else:
cg = CGroups.for_systemd_service(AGENT_NAME)
logger.info("Add daemon process pid {0} to {1} systemd cgroup".format(pid, cg.name))
# systemd sets limits; any limits we write would be overwritten
status = "ok"

status = "successfully set up agent cgroup"
except CGroupsException as cge:
status = cge.msg
CGroups.disable()
Expand All @@ -590,7 +604,7 @@ def setup(suppress_process_add=False):
log_event=False)

@staticmethod
def add_to_extension_cgroup(name, pid=int(os.getpid())):
def add_to_extension_cgroup(name, pid):
"""
Create cgroup directories for this extension in each of the hierarchies and add this process to the new cgroup.
Should only be called when creating sub-processes and invoked inside the fork/exec window. As a result,
Expand All @@ -602,15 +616,16 @@ def add_to_extension_cgroup(name, pid=int(os.getpid())):
"""
if not CGroups.enabled():
return
if name == AGENT_NAME:
logger.warn('Extension cgroup name cannot match agent cgroup name ({0})'.format(AGENT_NAME))
if name == AGENT_CGROUP_NAME:
logger.warn('Extension cgroup name cannot match extension handler cgroup name ({0}). ' \
'Will not track extension.'.format(AGENT_CGROUP_NAME))
return

try:
logger.info("Move process {0} into cgroups for extension {1}".format(pid, name))
logger.info("Move process {0} into cgroup for extension {1}".format(pid, name))
CGroups.for_extension(name).add(pid)
except Exception as ex:
logger.warn("Unable to move process {0} into cgroups for extension {1}: {2}".format(pid, name, ex))
logger.warn("Unable to move process {0} into cgroup for extension {1}: {2}".format(pid, name, ex))

@staticmethod
def get_my_cgroup_path(hierarchy_id):
Expand Down Expand Up @@ -796,7 +811,9 @@ def __init__(self, cgroup_name, threshold=None):
@staticmethod
def get_default_cpu_limits(cgroup_name):
# default values
cpu_limit = DEFAULT_CPU_LIMIT_AGENT if AGENT_NAME.lower() in cgroup_name.lower() else DEFAULT_CPU_LIMIT_EXT
cpu_limit = DEFAULT_CPU_LIMIT_EXT
if AGENT_CGROUP_NAME.lower() in cgroup_name.lower():
cpu_limit = DEFAULT_CPU_LIMIT_AGENT
return cpu_limit

@staticmethod
Expand All @@ -807,6 +824,6 @@ def get_default_memory_limits(cgroup_name):
mem_limit = max(DEFAULT_MEM_LIMIT_MIN_MB, round(os_util.get_total_mem() * DEFAULT_MEM_LIMIT_PCT / 100, 0))

# agent values
if AGENT_NAME.lower() in cgroup_name.lower():
if AGENT_CGROUP_NAME.lower() in cgroup_name.lower():
mem_limit = min(DEFAULT_MEM_LIMIT_MAX_MB, mem_limit)
return mem_limit
2 changes: 1 addition & 1 deletion azurelinuxagent/ga/exthandlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1128,7 +1128,7 @@ def pre_exec_function():
the fork() and the exec() of sub-process creation.
"""
os.setsid()
CGroups.add_to_extension_cgroup(self.ext_handler.name)
CGroups.add_to_extension_cgroup(self.ext_handler.name, os.getpid())

process = subprocess.Popen(full_path,
shell=True,
Expand Down
24 changes: 18 additions & 6 deletions azurelinuxagent/ga/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,16 @@ def send_telemetry_heartbeat(self):

@staticmethod
def init_cgroups():
# Track metrics for the roll-up cgroup and for the agent cgroup
# Track metrics for the wrapper cgroup and for the agent cgroup
try:
CGroupsTelemetry.track_cgroup(CGroups.for_extension(""))
# This creates the wrapper cgroup for everything under agent,
# /sys/fs/cgroup/{cpu,memory}/WALinuxAgent/
# There is no need to track this cgroup, as it only serves
# as an umbrella for the agent and extensions cgroups
CGroups.for_extension("")
# This creates the agent's cgroup (for the daemon and extension handler)
# /sys/fs/cgroup/{cpu,memory}/WALinuxAgent/WALinuxAgent
# If the system is using systemd, it would have already been set up under /system.slice
CGroupsTelemetry.track_agent()
except Exception as e:
# when a hierarchy is not mounted, we raise an exception
Expand All @@ -430,8 +437,10 @@ def send_cgroup_telemetry(self):
# Memory is collected in bytes, and limit is set in megabytes.
if value >= CGroups._format_memory_value('megabytes', thresholds.memory_limit):
msg = "CGroup {0}: Crossed the Memory Threshold. " \
"Current Value:{1} bytes, Threshold:{2} megabytes.".format(cgroup_name, value,
thresholds.memory_limit)
"Current Value: {1} bytes, Threshold: {2} megabytes." \
.format(cgroup_name, value, thresholds.memory_limit)

logger.warn(msg)
add_event(name=AGENT_NAME,
version=CURRENT_VERSION,
op=WALAEventOperation.CGroupsLimitsCrossed,
Expand All @@ -441,8 +450,11 @@ def send_cgroup_telemetry(self):

if metric_group == "Process":
if value >= thresholds.cpu_limit:
msg = "CGroup {0}: Crossed the Processor Threshold. Current Value:{1}, Threshold:{2}.".format(
cgroup_name, value, thresholds.cpu_limit)
msg = "CGroup {0}: Crossed the Processor Threshold. " \
"Current Value: {1}, Threshold: {2}." \
.format(cgroup_name, value, thresholds.cpu_limit)

logger.warn(msg)
add_event(name=AGENT_NAME,
version=CURRENT_VERSION,
op=WALAEventOperation.CGroupsLimitsCrossed,
Expand Down
49 changes: 46 additions & 3 deletions tests/ga/test_exthandlers.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the Apache License.
import json
import re
import stat

from azurelinuxagent.common.protocol.restapi import ExtensionStatus, Extension, ExtHandler, ExtHandlerProperties
from azurelinuxagent.ga.exthandlers import parse_ext_status, ExtHandlerInstance, get_exthandlers_handler
from azurelinuxagent.common.exception import ProtocolError, ExtensionError, ExtensionErrorCodes
from azurelinuxagent.common.event import WALAEventOperation
from azurelinuxagent.common.utils.processutil import TELEMETRY_MESSAGE_MAX_LEN, format_stdout_stderr
from azurelinuxagent.common.cgroups import CGroups
from tests.tools import *


Expand Down Expand Up @@ -221,9 +223,17 @@ def setUp(self):

ext_handler_properties = ExtHandlerProperties()
ext_handler_properties.version = "1.2.3"
ext_handler = ExtHandler(name='foo')
ext_handler.properties = ext_handler_properties
self.ext_handler_instance = ExtHandlerInstance(ext_handler=ext_handler, protocol=None)
self.ext_handler = ExtHandler(name='foo')
self.ext_handler.properties = ext_handler_properties
self.ext_handler_instance = ExtHandlerInstance(ext_handler=self.ext_handler, protocol=None)

self.base_cgroups = os.path.join(self.tmp_dir, "cgroup")
os.mkdir(self.base_cgroups)
os.mkdir(os.path.join(self.base_cgroups, "cpu"))
os.mkdir(os.path.join(self.base_cgroups, "memory"))

self.mock__base_cgroups = patch("azurelinuxagent.common.cgroups.BASE_CGROUPS", self.base_cgroups)
self.mock__base_cgroups.start()

self.mock_get_base_dir = patch("azurelinuxagent.ga.exthandlers.ExtHandlerInstance.get_base_dir", lambda *_: self.tmp_dir)
self.mock_get_base_dir.start()
Expand All @@ -235,6 +245,7 @@ def setUp(self):
def tearDown(self):
    """
    Stop the patchers held on this test case, then run the base-class teardown.
    """
    # Stopped in the same order as before: log dir, base dir, cgroups base path
    active_patchers = (self.mock_get_log_dir, self.mock_get_base_dir, self.mock__base_cgroups)
    for patcher in active_patchers:
        patcher.stop()

    AgentTestCase.tearDown(self)


Expand Down Expand Up @@ -599,3 +610,35 @@ def test_it_should_handle_exceptions_from_cgroups_and_run_command(self):
self.ext_handler_instance.launch_command(command)

self.assertTrue(os.path.exists(signal_file))

@skip_if_predicate_false(CGroups.enabled, "CGroups not supported in this environment")
def test_it_should_add_the_child_process_to_its_own_cgroup(self):
    """
    launch_command should add the launched process, not the caller, to the extension's cgroup.

    The test runs a script that prints its parent's PID, then checks that the PID written to
    cgroup.procs under each controller directory of the (patched) base cgroup path is that PID
    and is different from the PID of the test process itself.
    """
    # We are checking for the parent PID here since the PID getting written to the corresponding cgroup
    # would be from the shell process started before launch_command invokes the actual command.
    # In a non-mocked scenario, the kernel would actually also write all the children's PIDs to the procs
    # file as well, but here we are mocking the base cgroup path, so it is not taken care of for us.
    command = self._create_script("output_parent_pid.py", '''
import os

print(os.getppid())

''')

    output = self.ext_handler_instance.launch_command(command)

    # Raw string: '\d' in a plain literal is an invalid escape sequence and triggers a warning
    match = re.match(LaunchCommandTestCase._output_regex(r'(\d+)', '.*'), output)
    if match is None or match.group(1) is None:
        raise Exception("Could not extract the PID of the child command from its output")

    expected_pid = int(match.group(1))

    # One directory per controller (cpu, memory) is expected under the patched base path
    controllers = os.listdir(self.base_cgroups)
    for c in controllers:
        procs = os.path.join(self.base_cgroups, c, "WALinuxAgent", self.ext_handler.name, "cgroup.procs")
        with open(procs, "r") as f:
            contents = f.read()
        pid = int(contents)

        self.assertNotEqual(os.getpid(), pid, "The PID {0} added to {1} was of the launch command caller, not the command itself.".format(pid, procs))
        # assertEqual instead of the deprecated assertEquals alias (removed in Python 3.12)
        self.assertEqual(pid, expected_pid, "The PID of the command was not added to {0}. Expected: {1}, got: {2}".format(procs, expected_pid, pid))