Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor logcollector memory usage #2637

Merged
merged 4 commits into from
Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions azurelinuxagent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
PY_VERSION_MAJOR, PY_VERSION_MINOR, \
PY_VERSION_MICRO, GOAL_STATE_AGENT_VERSION, \
get_daemon_version, set_daemon_version
from azurelinuxagent.ga.collect_logs import CollectLogsHandler
from azurelinuxagent.ga.collect_logs import CollectLogsHandler, get_log_collector_monitor_handler
from azurelinuxagent.pa.provision.default import ProvisionHandler


Expand Down Expand Up @@ -196,36 +196,45 @@ def show_configuration(self):
print("{0} = {1}".format(k, configuration[k]))

def collect_logs(self, is_full_mode):
    """
    Run the log collector (the "-collect-logs" command line option).

    Verifies that the process was placed in the expected log collector cgroup slice
    (exiting with INVALID_CGROUPS_ERRCODE if not), then collects the logs while a
    monitor thread tracks the collector's resource usage.

    :param is_full_mode: True to collect the full set of logs, False for the normal set.
    """
    logger.set_prefix("LogCollector")

    if is_full_mode:
        logger.info("Running log collector mode full")
    else:
        logger.info("Running log collector mode normal")

    # Check the cgroups unit: the collector must be running inside the logcollector
    # slice so that the CPU/memory limits configured for that slice actually apply.
    cpu_cgroup_path, memory_cgroup_path, log_collector_monitor = None, None, None
    if CollectLogsHandler.should_validate_cgroups():
        cgroups_api = SystemdCgroupsApi()
        cpu_cgroup_path, memory_cgroup_path = cgroups_api.get_process_cgroup_paths("self")

        cpu_slice_matches = (cgroupconfigurator.LOGCOLLECTOR_SLICE in cpu_cgroup_path)
        memory_slice_matches = (cgroupconfigurator.LOGCOLLECTOR_SLICE in memory_cgroup_path)

        if not cpu_slice_matches or not memory_slice_matches:
            logger.info("The Log Collector process is not in the proper cgroups:")
            if not cpu_slice_matches:
                logger.info("\tunexpected cpu slice")
            if not memory_slice_matches:
                logger.info("\tunexpected memory slice")

            sys.exit(logcollector.INVALID_CGROUPS_ERRCODE)

    try:
        log_collector = LogCollector(is_full_mode, cpu_cgroup_path, memory_cgroup_path)
        # Monitor the collector's cgroups (CPU/memory) while the collection runs.
        log_collector_monitor = get_log_collector_monitor_handler(log_collector.cgroups)
        log_collector_monitor.run()
        archive = log_collector.collect_logs_and_get_archive()
        logger.info("Log collection successfully completed. Archive can be found at {0} "
                    "and detailed log output can be found at {1}".format(archive, OUTPUT_RESULTS_FILE_PATH))
    except Exception as e:
        logger.error("Log collection completed unsuccessfully. Error: {0}".format(ustr(e)))
        logger.info("Detailed log output can be found at {0}".format(OUTPUT_RESULTS_FILE_PATH))
        sys.exit(1)
    finally:
        # Always stop the monitor thread, whether collection succeeded or not.
        if log_collector_monitor is not None:
            log_collector_monitor.stop()

@staticmethod
def setup_firewall(firewall_metadata):
Expand Down
1 change: 1 addition & 0 deletions azurelinuxagent/common/cgroup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
_DEFAULT_REPORT_PERIOD = timedelta(seconds=conf.get_cgroup_check_period())

AGENT_NAME_TELEMETRY = "walinuxagent.service" # Name used for telemetry; it needs to be consistent even if the name of the service changes
AGENT_LOG_COLLECTOR = "azure-walinuxagent-logcollector"


class CounterNotFound(Exception):
Expand Down
6 changes: 2 additions & 4 deletions azurelinuxagent/common/cgroupconfigurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,9 @@
CPUAccounting=yes
CPUQuota={cpu_quota}
MemoryAccounting=yes
MemoryLimit={memory_limit}
"""
_LOGCOLLECTOR_CPU_QUOTA = "5%"
_LOGCOLLECTOR_MEMORY_LIMIT = "30M" # K for kb, M for mb
LOGCOLLECTOR_MEMORY_LIMIT = 30 * 1024 ** 2 # 30Mb

_AGENT_DROP_IN_FILE_SLICE = "10-Slice.conf"
_AGENT_DROP_IN_FILE_SLICE_CONTENTS = """
Expand Down Expand Up @@ -349,8 +348,7 @@ def __setup_azure_slice():
files_to_create.append((vmextensions_slice, _VMEXTENSIONS_SLICE_CONTENTS))

if not os.path.exists(logcollector_slice):
slice_contents = _LOGCOLLECTOR_SLICE_CONTENTS_FMT.format(cpu_quota=_LOGCOLLECTOR_CPU_QUOTA,
memory_limit=_LOGCOLLECTOR_MEMORY_LIMIT)
slice_contents = _LOGCOLLECTOR_SLICE_CONTENTS_FMT.format(cpu_quota=_LOGCOLLECTOR_CPU_QUOTA)

files_to_create.append((logcollector_slice, slice_contents))

Expand Down
27 changes: 25 additions & 2 deletions azurelinuxagent/common/logcollector.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
from datetime import datetime
from heapq import heappush, heappop

from azurelinuxagent.common.cgroup import CpuCgroup, AGENT_LOG_COLLECTOR, MemoryCgroup
from azurelinuxagent.common.conf import get_lib_dir, get_ext_log_dir, get_agent_log_file
from azurelinuxagent.common.event import initialize_event_logger_vminfo_common_parameters
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common.logcollector_manifests import MANIFEST_NORMAL, MANIFEST_FULL

# Please note: be careful when adding agent dependencies in this module.
# This module uses its own logger and logs to its own file, not to the agent log.
from azurelinuxagent.common.protocol.util import get_protocol_util

_EXTENSION_LOG_DIR = get_ext_log_dir()
_AGENT_LIB_DIR = get_lib_dir()
Expand All @@ -45,7 +48,7 @@

CGROUPS_UNIT = "collect-logs.scope"

FORCE_KILLED_ERRCODE = -9
GRACEFUL_KILL_ERRCODE = 3
INVALID_CGROUPS_ERRCODE = 2

_MUST_COLLECT_FILES = [
Expand All @@ -67,12 +70,14 @@ class LogCollector(object):

_TRUNCATED_FILE_PREFIX = "truncated_"

def __init__(self, is_full_mode=False, cpu_cgroup_path=None, memory_cgroup_path=None):
    """
    Prepare the log collector: select the manifest, create the output directories,
    configure logging, initialize telemetry, and start tracking the collector's cgroups.

    :param is_full_mode: True to use the full collection manifest, False for the normal one.
    :param cpu_cgroup_path: Path of the CPU cgroup the collector runs in (None if unknown).
    :param memory_cgroup_path: Path of the memory cgroup the collector runs in (None if unknown).
    """
    self._is_full_mode = is_full_mode
    self._manifest = MANIFEST_FULL if is_full_mode else MANIFEST_NORMAL
    self._must_collect_files = self._expand_must_collect_files()
    self._create_base_dirs()
    self._set_logger()
    self._initialize_telemetry()
    # List of cgroup trackers; read by the monitor thread to poll resource usage.
    self.cgroups = self._set_resource_usage_cgroups(cpu_cgroup_path, memory_cgroup_path)

@staticmethod
def _mkdir(dirname):
Expand All @@ -99,6 +104,24 @@ def _set_logger():
_LOGGER.addHandler(_f_handler)
_LOGGER.setLevel(logging.INFO)

@staticmethod
def _set_resource_usage_cgroups(cpu_cgroup_path, memory_cgroup_path):
    """Create CPU and memory cgroup trackers for the log collector and return them as a list."""
    tracked_cgroups = []

    cpu_tracker = CpuCgroup(AGENT_LOG_COLLECTOR, cpu_cgroup_path)
    _LOGGER.info("Started tracking cpu cgroup {0}".format(cpu_tracker))
    # Take a baseline CPU reading so later polls report usage deltas.
    cpu_tracker.initialize_cpu_usage()
    tracked_cgroups.append(cpu_tracker)

    memory_tracker = MemoryCgroup(AGENT_LOG_COLLECTOR, memory_cgroup_path)
    _LOGGER.info("Started tracking memory cgroup {0}".format(memory_tracker))
    tracked_cgroups.append(memory_tracker)

    return tracked_cgroups

@staticmethod
def _initialize_telemetry():
    """Fetch the goal state and set up the common parameters for telemetry events."""
    protocol_util = get_protocol_util()
    protocol = protocol_util.get_protocol()
    # Force a goal state refresh so telemetry reflects the current VM information.
    protocol.client.update_goal_state(force_update=True)
    # Initialize the common parameters for telemetry events
    initialize_event_logger_vminfo_common_parameters(protocol)

@staticmethod
def _run_shell_command(command, stdout=subprocess.PIPE, log_output=False):
"""
Expand Down
154 changes: 124 additions & 30 deletions azurelinuxagent/ga/collect_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#
# Requires Python 2.6+ and Openssl 1.0+
#

import datetime
import os
import sys
Expand All @@ -26,17 +25,19 @@

import azurelinuxagent.common.conf as conf
from azurelinuxagent.common import logger
from azurelinuxagent.common.event import elapsed_milliseconds, add_event, WALAEventOperation
from azurelinuxagent.common.future import subprocess_dev_null, ustr
from azurelinuxagent.common.cgroup import MetricsCounter
from azurelinuxagent.common.event import elapsed_milliseconds, add_event, WALAEventOperation, report_metric
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common.interfaces import ThreadHandlerInterface
from azurelinuxagent.common.logcollector import COMPRESSED_ARCHIVE_PATH
from azurelinuxagent.common.cgroupconfigurator import CGroupConfigurator
from azurelinuxagent.common.logcollector import COMPRESSED_ARCHIVE_PATH, GRACEFUL_KILL_ERRCODE
from azurelinuxagent.common.cgroupconfigurator import CGroupConfigurator, LOGCOLLECTOR_MEMORY_LIMIT
from azurelinuxagent.common.protocol.util import get_protocol_util
from azurelinuxagent.common.utils import shellutil
from azurelinuxagent.common.utils.shellutil import CommandError
from azurelinuxagent.common.version import PY_VERSION_MAJOR, PY_VERSION_MINOR, AGENT_NAME, CURRENT_VERSION

_INITIAL_LOG_COLLECTION_DELAY = 5 * 60 # Five minutes of delay
_INITIAL_LOG_COLLECTION_DELAY = 5 * 60 # Five minutes of delay


def get_collect_logs_handler():
return CollectLogsHandler()
Expand Down Expand Up @@ -128,7 +129,10 @@ def stopped(self):
def stop(self):
    """Signal the periodic log collection to stop and wait for its thread to exit."""
    self.should_run = False
    if self.is_alive():
        try:
            self.join()
        except RuntimeError:
            # join() raises RuntimeError when called from the thread being joined
            # (e.g. when the collection thread disables itself after a persistent
            # failure). The thread is exiting anyway, so the error is safe to ignore.
            pass

def init_protocols(self):
# The initialization of ProtocolUtil for the log collection thread should be done within the thread itself
Expand Down Expand Up @@ -167,28 +171,31 @@ def collect_and_send_logs(self):
def _collect_logs(self):
logger.info("Starting log collection...")

# Invoke the command line tool in the agent to collect logs, with resource limits on CPU and memory (RAM).
# Invoke the command line tool in the agent to collect logs, with resource limits on CPU.
# Some distros, like Ubuntu 20, have CPU and memory accounting enabled by default, which creates nested cgroups under the logcollector slice.
# Disabling CPU and memory accounting therefore prevents the nested cgroups from being created, so that all the counters are present in the logcollector cgroup.

systemd_cmd = [
"systemd-run", "--unit={0}".format(logcollector.CGROUPS_UNIT),
"systemd-run", "--property=CPUAccounting=no", "--property=MemoryAccounting=no",
narrieta marked this conversation as resolved.
Show resolved Hide resolved
"--unit={0}".format(logcollector.CGROUPS_UNIT),
"--slice={0}".format(cgroupconfigurator.LOGCOLLECTOR_SLICE), "--scope"
]

# The log tool is invoked from the current agent's egg with the command line option
collect_logs_cmd = [sys.executable, "-u", sys.argv[0], "-collect-logs"]
final_command = systemd_cmd + collect_logs_cmd

def exec_command(output_file):
def exec_command():
start_time = datetime.datetime.utcnow()
success = False
msg = None
try:
shellutil.run_command(final_command, log_error=False, stdout=output_file, stderr=output_file)
shellutil.run_command(final_command, log_error=False)
duration = elapsed_milliseconds(start_time)
archive_size = os.path.getsize(COMPRESSED_ARCHIVE_PATH)

msg = "Successfully collected logs. Archive size: {0} b, elapsed time: {1} ms.".format(archive_size,
duration)
duration)
logger.info(msg)
success = True

Expand All @@ -201,16 +208,17 @@ def exec_command(output_file):
# pylint has limited (i.e. no) awareness of control flow w.r.t. typing. we disable=no-member
# here because we know e must be a CommandError but pylint still considers the case where
# e is a different type of exception.
err_msg = ustr("Log Collector exited with code {0}").format(e.returncode) # pylint: disable=no-member
err_msg = ustr("Log Collector exited with code {0}").format(
e.returncode) # pylint: disable=no-member

if e.returncode == logcollector.INVALID_CGROUPS_ERRCODE: # pylint: disable=no-member
if e.returncode == logcollector.INVALID_CGROUPS_ERRCODE: # pylint: disable=no-member
logger.info("Disabling periodic log collection until service restart due to process error.")
self.stop()
# When the OOM killer is invoked on the log collector process, this error code is
# returned. Stop the periodic operation because it seems to be persistent.
elif e.returncode == logcollector.FORCE_KILLED_ERRCODE: # pylint: disable=no-member
logger.info("Disabling periodic log collection until service restart due to OOM error.")

# When the log collector memory limit is exceeded, Agent gracefully exit the process with this error code.
# Stop the periodic operation because it seems to be persistent.
elif e.returncode == logcollector.GRACEFUL_KILL_ERRCODE: # pylint: disable=no-member
logger.info("Disabling periodic log collection until service restart due to exceeded process memory limit.")
self.stop()
else:
logger.info(err_msg)
Expand All @@ -227,17 +235,8 @@ def exec_command(output_file):
is_success=success,
message=msg,
log_event=False)

try:
logfile = open(conf.get_agent_log_file(), "a+")
except Exception:
with subprocess_dev_null() as DEVNULL:
return exec_command(DEVNULL)
else:
return exec_command(logfile)
finally:
if logfile is not None:
logfile.close()

return exec_command()

def _send_logs(self):
msg = None
Expand All @@ -261,3 +260,98 @@ def _send_logs(self):
is_success=success,
message=msg,
log_event=False)


def get_log_collector_monitor_handler(cgroups):
    """Factory for the LogCollectorMonitorHandler that monitors the given cgroup trackers."""
    return LogCollectorMonitorHandler(cgroups)


class LogCollectorMonitorHandler(ThreadHandlerInterface):
    """
    Periodically monitor and checks the Log collector Cgroups and sends telemetry to Kusto.

    If the observed memory usage exceeds LOGCOLLECTOR_MEMORY_LIMIT, the process exits
    with GRACEFUL_KILL_ERRCODE so the parent agent can disable periodic collection.
    """

    _THREAD_NAME = "LogCollectorMonitorHandler"

    @staticmethod
    def get_thread_name():
        return LogCollectorMonitorHandler._THREAD_NAME

    def __init__(self, cgroups):
        # cgroups: list of cgroup trackers (CPU/memory) to poll for metrics.
        self.event_thread = None
        self.should_run = True
        self.period = 2  # Log collector monitor runs every 2 secs.
        self.cgroups = cgroups
        self.__log_metrics = conf.get_cgroup_log_metrics()

    def run(self):
        self.start()

    def stop(self):
        """Signal the monitor loop to stop and wait for the thread to finish."""
        self.should_run = False
        if self.is_alive():
            self.join()

    def join(self):
        self.event_thread.join()

    def stopped(self):
        return not self.should_run

    def is_alive(self):
        return self.event_thread is not None and self.event_thread.is_alive()

    def start(self):
        # Daemon thread: it must not keep the process alive once collection finishes.
        self.event_thread = threading.Thread(target=self.daemon)
        self.event_thread.setDaemon(True)
        self.event_thread.setName(self.get_thread_name())
        self.event_thread.start()

    def daemon(self):
        """Monitor loop: poll resource usage, report telemetry, enforce the memory limit."""
        try:
            while not self.stopped():
                try:
                    metrics = self._poll_resource_usage()
                    self._send_telemetry(metrics)
                    self._verify_memory_limit(metrics)
                except Exception as e:
                    logger.error("An error occurred in the log collection monitor thread loop; "
                                 "will skip the current iteration.\n{0}", ustr(e))
                finally:
                    time.sleep(self.period)
        except Exception as e:
            logger.error(
                "An error occurred in the MonitorLogCollectorCgroupsHandler thread; will exit the thread.\n{0}",
                ustr(e))

    def _poll_resource_usage(self):
        """Collect the current metrics from every tracked cgroup."""
        metrics = []
        for cgroup in self.cgroups:
            metrics.extend(cgroup.get_tracked_metrics(track_throttled_time=True))
        return metrics

    def _send_telemetry(self, metrics):
        for metric in metrics:
            report_metric(metric.category, metric.counter, metric.instance, metric.value, log_event=self.__log_metrics)

    def _verify_memory_limit(self, metrics):
        """Exit the process with GRACEFUL_KILL_ERRCODE if memory usage exceeds the limit."""
        current_usage = 0
        max_usage = 0
        for metric in metrics:
            # Current usage is resident memory plus swap; max usage is reported separately.
            if metric.counter == MetricsCounter.TOTAL_MEM_USAGE:
                current_usage += metric.value
            elif metric.counter == MetricsCounter.SWAP_MEM_USAGE:
                current_usage += metric.value
            elif metric.counter == MetricsCounter.MAX_MEM_USAGE:
                max_usage = metric.value

        current_max = max(current_usage, max_usage)
        if current_max > LOGCOLLECTOR_MEMORY_LIMIT:
            msg = "Log collector memory limit {0} bytes exceeded. The max reported usage is {1} bytes.".format(LOGCOLLECTOR_MEMORY_LIMIT, current_max)
            logger.info(msg)
            add_event(
                name=AGENT_NAME,
                version=CURRENT_VERSION,
                op=WALAEventOperation.LogCollection,
                message=msg)
            # os._exit terminates immediately from this daemon thread, without running
            # cleanup handlers, so the parent sees the graceful-kill return code.
            os._exit(GRACEFUL_KILL_ERRCODE)
Loading