Skip to content

Commit

Permalink
Merge branch 'develop' into devuan_support_new
Browse files Browse the repository at this point in the history
  • Loading branch information
narrieta authored May 18, 2022
2 parents b6ab267 + b8ca432 commit e2d16e0
Show file tree
Hide file tree
Showing 43 changed files with 1,117 additions and 237 deletions.
2 changes: 1 addition & 1 deletion CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@
#
# Linux Agent team
#
* @narrieta @larohra @kevinclark19a @ZhidongPeng @dhivyaganesan @nagworld9
* @narrieta @kevinclark19a @ZhidongPeng @dhivyaganesan @nagworld9
14 changes: 9 additions & 5 deletions azurelinuxagent/common/cgroup.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def _get_cpu_ticks(self, allow_no_such_file_or_directory_error=False):

return cpu_ticks

def _get_throttled_time(self):
def get_throttled_time(self):
try:
with open(os.path.join(self.path, 'cpu.stat')) as cpu_stat:
#
Expand Down Expand Up @@ -205,7 +205,7 @@ def initialize_cpu_usage(self):
raise CGroupsException("initialize_cpu_usage() should be invoked only once")
self._current_cgroup_cpu = self._get_cpu_ticks(allow_no_such_file_or_directory_error=True)
self._current_system_cpu = self._osutil.get_total_cpu_ticks_since_boot()
self._current_throttled_time = self._get_throttled_time()
self._current_throttled_time = self.get_throttled_time()

def get_cpu_usage(self):
"""
Expand All @@ -229,16 +229,20 @@ def get_cpu_usage(self):

return round(100.0 * self._osutil.get_processor_cores() * float(cgroup_delta) / float(system_delta), 3)

def get_throttled_time(self):
def get_cpu_throttled_time(self, read_previous_throttled_time=True):
"""
Computes the throttled time (in seconds) since the last call to this function.
NOTE: initialize_cpu_usage() must be invoked before calling this function
Compute only current throttled time if read_previous_throttled_time set to False
"""
if not read_previous_throttled_time:
return float(self.get_throttled_time() / 1E9)

if not self._cpu_usage_initialized():
raise CGroupsException("initialize_cpu_usage() must be invoked before the first call to get_throttled_time()")

self._previous_throttled_time = self._current_throttled_time
self._current_throttled_time = self._get_throttled_time()
self._current_throttled_time = self.get_throttled_time()

return float(self._current_throttled_time - self._previous_throttled_time) / 1E9

Expand All @@ -249,7 +253,7 @@ def get_tracked_metrics(self, **kwargs):
tracked.append(MetricValue(MetricsCategory.CPU_CATEGORY, MetricsCounter.PROCESSOR_PERCENT_TIME, self.name, cpu_usage))

if 'track_throttled_time' in kwargs and kwargs['track_throttled_time']:
throttled_time = self.get_throttled_time()
throttled_time = self.get_cpu_throttled_time()
if cpu_usage >= float(0) and throttled_time >= float(0):
tracked.append(MetricValue(MetricsCategory.CPU_CATEGORY, MetricsCounter.THROTTLED_TIME, self.name, throttled_time))

Expand Down
6 changes: 4 additions & 2 deletions azurelinuxagent/common/cgroupapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ def start_extension_command(self, extension_name, command, cmd_name, timeout, sh

logger.info("Started extension in unit '{0}'", scope_name)

cpu_cgroup = None
try:
cgroup_relative_path = os.path.join('azure.slice/azure-vmextensions.slice', extension_slice_name)

Expand All @@ -289,7 +290,8 @@ def start_extension_command(self, extension_name, command, cmd_name, timeout, sh
logger.info("The CPU controller is not mounted; will not track resource usage")
else:
cpu_cgroup_path = os.path.join(cpu_cgroup_mountpoint, cgroup_relative_path)
CGroupsTelemetry.track_cgroup(CpuCgroup(extension_name, cpu_cgroup_path))
cpu_cgroup = CpuCgroup(extension_name, cpu_cgroup_path)
CGroupsTelemetry.track_cgroup(cpu_cgroup)

except IOError as e:
if e.errno == 2: # 'No such file or directory'
Expand All @@ -301,7 +303,7 @@ def start_extension_command(self, extension_name, command, cmd_name, timeout, sh
# Wait for process completion or timeout
try:
return handle_process_completion(process=process, command=command, timeout=timeout, stdout=stdout,
stderr=stderr, error_code=error_code)
stderr=stderr, error_code=error_code, cpu_cgroup=cpu_cgroup)
except ExtensionError as e:
# The extension didn't terminate successfully. Determine whether it was due to systemd errors or
# extension errors.
Expand Down
123 changes: 102 additions & 21 deletions azurelinuxagent/common/cgroupconfigurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# limitations under the License.
#
# Requires Python 2.6+ and Openssl 1.0+
import glob
import json
import os
import re
import subprocess
Expand Down Expand Up @@ -56,6 +58,7 @@
Before=slices.target
[Slice]
CPUAccounting=yes
CPUQuota={cpu_quota}
"""
LOGCOLLECTOR_SLICE = "azure-walinuxagent-logcollector.slice"
# More info on resource limits properties in systemd here:
Expand Down Expand Up @@ -357,12 +360,16 @@ def __setup_azure_slice():
CGroupConfigurator._Impl.__cleanup_unit_file(unit_file)
return

# reload the systemd configuration; the new slices will be used once the agent's service restarts
try:
logger.info("Executing systemctl daemon-reload...")
shellutil.run_command(["systemctl", "daemon-reload"])
except Exception as exception:
_log_cgroup_warning("daemon-reload failed (create azure slice): {0}", ustr(exception))
CGroupConfigurator._Impl.__reload_systemd_config()

@staticmethod
def __reload_systemd_config():
# reload the systemd configuration; the new slices will be used once the agent's service restarts
try:
logger.info("Executing systemctl daemon-reload...")
shellutil.run_command(["systemctl", "daemon-reload"])
except Exception as exception:
_log_cgroup_warning("daemon-reload failed (create azure slice): {0}", ustr(exception))

@staticmethod
def __create_unit_file(path, contents):
Expand Down Expand Up @@ -478,18 +485,23 @@ def enable(self):
self.__set_cpu_quota(conf.get_agent_cpu_quota())

def disable(self, reason, disable_cgroups):
# Todo: disable/reset extension when ext quotas introduced
if disable_cgroups == DisableCgroups.ALL: # disable all
self._agent_cgroups_enabled = False
self._extensions_cgroups_enabled = False
# Reset quotas
self.__reset_agent_cpu_quota()
extension_services = self.get_extension_services_list()
for extension in extension_services:
logger.info("Resetting extension : {0} and it's services: {1} CPUQuota".format(extension, extension_services[extension]))
self.__reset_extension_cpu_quota(extension_name=extension)
self.__reset_extension_services_cpu_quota(extension_services[extension])
self.__reload_systemd_config()

CGroupsTelemetry.reset()
self._agent_cgroups_enabled = False
self._extensions_cgroups_enabled = False
elif disable_cgroups == DisableCgroups.AGENT: # disable agent
self._agent_cgroups_enabled = False
self.__reset_agent_cpu_quota()
CGroupsTelemetry.stop_tracking(CpuCgroup(AGENT_NAME_TELEMETRY, self._agent_cpu_cgroup_path))
elif disable_cgroups == DisableCgroups.EXTENSIONS: # disable extensions
self._extensions_cgroups_enabled = False

message = "[CGW] Disabling resource usage monitoring. Reason: {0}".format(reason)
logger.info(message) # log as INFO for now, in the future it should be logged as WARNING
Expand Down Expand Up @@ -518,7 +530,6 @@ def __reset_agent_cpu_quota():
"""
logger.info("Resetting agent's CPUQuota")
if CGroupConfigurator._Impl.__try_set_cpu_quota(''): # setting an empty value resets to the default (infinity)
CGroupsTelemetry.set_track_throttled_time(False)
_log_cgroup_info('CPUQuota: {0}', systemd.get_unit_property(systemd.get_agent_unit_name(), "CPUQuotaPerSecUSec"))

@staticmethod
Expand Down Expand Up @@ -762,24 +773,35 @@ def start_extension_command(self, extension_name, command, cmd_name, timeout, sh
process = subprocess.Popen(command, shell=shell, cwd=cwd, env=env, stdout=stdout, stderr=stderr, preexec_fn=os.setsid) # pylint: disable=W1509
return handle_process_completion(process=process, command=command, timeout=timeout, stdout=stdout, stderr=stderr, error_code=error_code)

def setup_extension_slice(self, extension_name):
def __reset_extension_cpu_quota(self, extension_name):
"""
Removes any CPUQuota on the extension
NOTE: This resets the quota on the extension's slice; any local overrides on the VM will take precedence
over this setting.
"""
if self.enabled():
self.setup_extension_slice(extension_name, cpu_quota=None)

def setup_extension_slice(self, extension_name, cpu_quota):
"""
Each extension runs under its own slice (Ex "Microsoft.CPlat.Extension.slice"). All the slices for
extensions are grouped under "azure-vmextensions.slice.
This method ensures that the extension slice is created. Setup should create
under /lib/systemd/system if it is not exist.
TODO: set cpu and memory quotas
TODO: set memory quotas
"""
if self.enabled():
unit_file_install_path = systemd.get_unit_file_install_path()
extension_slice_path = os.path.join(unit_file_install_path,
SystemdCgroupsApi.get_extension_slice_name(extension_name))
try:
slice_contents = _EXTENSION_SLICE_CONTENTS.format(extension_name=extension_name)
cpu_quota = str(cpu_quota) + "%" if cpu_quota is not None else "" # setting an empty value resets to the default (infinity)
slice_contents = _EXTENSION_SLICE_CONTENTS.format(extension_name=extension_name, cpu_quota=cpu_quota)
CGroupConfigurator._Impl.__create_unit_file(extension_slice_path, slice_contents)
except Exception as exception:
_log_cgroup_warning("Failed to create unit files for the extension slice: {0}", ustr(exception))
_log_cgroup_warning("Failed to set the extension {0} slice and quotas: {1}", extension_name, ustr(exception))
CGroupConfigurator._Impl.__cleanup_unit_file(extension_slice_path)

def remove_extension_slice(self, extension_name):
Expand All @@ -800,7 +822,7 @@ def set_extension_services_cpu_memory_quota(self, services_list):
Each extension service will have name, systemd path and it's quotas.
This method ensures that drop-in files are created under service.d folder if quotas given.
ex: /lib/systemd/system/extension.service.d/11-CPUAccounting.conf
TODO: set cpu and memory quotas
TODO: set memory quotas
"""
if self.enabled() and services_list is not None:
for service in services_list:
Expand All @@ -813,14 +835,43 @@ def set_extension_services_cpu_memory_quota(self, services_list):
_DROP_IN_FILE_CPU_ACCOUNTING)
files_to_create.append((drop_in_file_cpu_accounting, _DROP_IN_FILE_CPU_ACCOUNTING_CONTENTS))

cpu_quota = service.get('cpuQuotaPercentage', None)
if cpu_quota is not None:
cpu_quota = str(cpu_quota) + "%"
drop_in_file_cpu_quota = os.path.join(drop_in_path, _DROP_IN_FILE_CPU_QUOTA)
cpu_quota_contents = _DROP_IN_FILE_CPU_QUOTA_CONTENTS_FORMAT.format(cpu_quota)
files_to_create.append((drop_in_file_cpu_quota, cpu_quota_contents))

self.__create_all_files(files_to_create)
self.__reload_systemd_config()

# reload the systemd configuration; the new unit will be used once the service restarts
def __reset_extension_services_cpu_quota(self, services_list):
"""
Removes any CPUQuota on the extension service
NOTE: This resets the quota on the extension service's default dropin file; any local overrides on the VM will take precedence
over this setting.
"""
if self.enabled() and services_list is not None:
try:
logger.info("Executing systemctl daemon-reload...")
shellutil.run_command(["systemctl", "daemon-reload"])
service_name = None
for service in services_list:
service_name = service.get('name', None)
unit_file_path = systemd.get_unit_file_install_path()
if service_name is not None and unit_file_path is not None:
files_to_create = []
drop_in_path = os.path.join(unit_file_path, "{0}.d".format(service_name))
cpu_quota = "" # setting an empty value resets to the default (infinity)
drop_in_file_cpu_quota = os.path.join(drop_in_path, _DROP_IN_FILE_CPU_QUOTA)
cpu_quota_contents = _DROP_IN_FILE_CPU_QUOTA_CONTENTS_FORMAT.format(cpu_quota)
if os.path.exists(drop_in_file_cpu_quota):
with open(drop_in_file_cpu_quota, "r") as file_:
if file_.read() == cpu_quota_contents:
return
files_to_create.append((drop_in_file_cpu_quota, cpu_quota_contents))
self.__create_all_files(files_to_create)
except Exception as exception:
_log_cgroup_warning("daemon-reload failed (create service unit files): {0}", ustr(exception))
_log_cgroup_warning('Failed to reset CPUQuota for {0} : {1}', service_name, ustr(exception))

def remove_extension_services_drop_in_files(self, services_list):
"""
Expand All @@ -836,6 +887,11 @@ def remove_extension_services_drop_in_files(self, services_list):
drop_in_file_cpu_accounting = os.path.join(drop_in_path,
_DROP_IN_FILE_CPU_ACCOUNTING)
files_to_cleanup.append(drop_in_file_cpu_accounting)
cpu_quota = service.get('cpuQuotaPercentage', None)
if cpu_quota is not None:
drop_in_file_cpu_quota = os.path.join(drop_in_path, _DROP_IN_FILE_CPU_QUOTA)
files_to_cleanup.append(drop_in_file_cpu_quota)

CGroupConfigurator._Impl.__cleanup_all_files(files_to_cleanup)
_log_cgroup_info("Drop in files removed for {0}".format(service_name))

Expand All @@ -859,6 +915,31 @@ def start_tracking_extension_services_cgroups(self, services_list):
if service_name is not None:
self.start_tracking_unit_cgroups(service_name)

@staticmethod
def get_extension_services_list():
"""
ResourceLimits for extensions are coming from <extName>/HandlerManifest.json file.
Use this pattern to determine all the installed extension HandlerManifest files and
read the extension services if ResourceLimits are present.
"""
extensions_services = {}
for manifest_path in glob.iglob(os.path.join(conf.get_lib_dir(), "*/HandlerManifest.json")):
match = re.search("(?P<extname>[\\w+\\.-]+).HandlerManifest\\.json", manifest_path)
if match is not None:
extensions_name = match.group('extname')
if not extensions_name.startswith('WALinuxAgent'):
try:
data = json.loads(fileutil.read_file(manifest_path))
resource_limits = data[0].get('resourceLimits', None)
services = resource_limits.get('services') if resource_limits else None
extensions_services[extensions_name] = services
except (IOError, OSError) as e:
_log_cgroup_warning(
'Failed to load manifest file ({0}): {1}'.format(manifest_path, e.strerror))
except ValueError:
_log_cgroup_warning('Malformed manifest file ({0}).'.format(manifest_path))
return extensions_services

# unique instance for the singleton
_instance = None

Expand Down
4 changes: 4 additions & 0 deletions azurelinuxagent/common/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self, logger=None, prefix=None):
self.logger = self if logger is None else logger
self.periodic_messages = {}
self.prefix = prefix
self.silent = False

def reset_periodic(self):
self.logger.periodic_messages = {}
Expand Down Expand Up @@ -124,6 +125,9 @@ def write_log(log_appender): # pylint: disable=W0612
finally:
log_appender.appender_lock = False

if self.silent:
return

# if msg_format is not unicode convert it to unicode
if type(msg_format) is not ustr:
msg_format = ustr(msg_format, errors="backslashreplace")
Expand Down
5 changes: 4 additions & 1 deletion azurelinuxagent/common/osutil/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from .nsbsd import NSBSDOSUtil
from .openbsd import OpenBSDOSUtil
from .openwrt import OpenWRTOSUtil
from .redhat import RedhatOSUtil, Redhat6xOSUtil
from .redhat import RedhatOSUtil, Redhat6xOSUtil, RedhatOSModernUtil
from .suse import SUSEOSUtil, SUSE11OSUtil
from .photonos import PhotonOSUtil
from .ubuntu import UbuntuOSUtil, Ubuntu12OSUtil, Ubuntu14OSUtil, \
Expand Down Expand Up @@ -118,6 +118,9 @@ def _get_osutil(distro_name, distro_code_name, distro_version, distro_full_name)
if Version(distro_version) < Version("7"):
return Redhat6xOSUtil()

if Version(distro_version) == Version("8.6") or Version(distro_version) > Version("9"):
return RedhatOSModernUtil()

return RedhatOSUtil()

if distro_name == "euleros":
Expand Down
22 changes: 22 additions & 0 deletions azurelinuxagent/common/osutil/redhat.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,25 @@ def get_dhcp_lease_endpoint(self):
endpoint = self.get_endpoint_from_leases_path('/var/lib/NetworkManager/dhclient-*.lease')

return endpoint


class RedhatOSModernUtil(RedhatOSUtil):
def __init__(self): # pylint: disable=W0235
super(RedhatOSModernUtil, self).__init__()

def restart_if(self, ifname, retries=3, wait=5):
"""
Restart an interface by bouncing the link. systemd-networkd observes
this event, and forces a renew of DHCP.
"""
retry_limit = retries + 1
for attempt in range(1, retry_limit):
return_code = shellutil.run("ip link set {0} down && ip link set {0} up".format(ifname))
if return_code == 0:
return
logger.warn("failed to restart {0}: return code {1}".format(ifname, return_code))
if attempt < retry_limit:
logger.info("retrying in {0} seconds".format(wait))
time.sleep(wait)
else:
logger.warn("exceeded restart retries")
Loading

0 comments on commit e2d16e0

Please sign in to comment.