diff --git a/azurelinuxagent/agent.py b/azurelinuxagent/agent.py index b70429a129..8c303482e8 100644 --- a/azurelinuxagent/agent.py +++ b/azurelinuxagent/agent.py @@ -45,7 +45,7 @@ PY_VERSION_MAJOR, PY_VERSION_MINOR, \ PY_VERSION_MICRO, GOAL_STATE_AGENT_VERSION, \ get_daemon_version, set_daemon_version -from azurelinuxagent.ga.collect_logs import CollectLogsHandler +from azurelinuxagent.ga.collect_logs import CollectLogsHandler, get_log_collector_monitor_handler from azurelinuxagent.pa.provision.default import ProvisionHandler @@ -196,36 +196,45 @@ def show_configuration(self): print("{0} = {1}".format(k, configuration[k])) def collect_logs(self, is_full_mode): + logger.set_prefix("LogCollector") + if is_full_mode: - print("Running log collector mode full") + logger.info("Running log collector mode full") else: - print("Running log collector mode normal") + logger.info("Running log collector mode normal") # Check the cgroups unit + cpu_cgroup_path, memory_cgroup_path, log_collector_monitor = None, None, None if CollectLogsHandler.should_validate_cgroups(): - cpu_cgroup_path, memory_cgroup_path = SystemdCgroupsApi.get_process_cgroup_relative_paths("self") + cgroups_api = SystemdCgroupsApi() + cpu_cgroup_path, memory_cgroup_path = cgroups_api.get_process_cgroup_paths("self") cpu_slice_matches = (cgroupconfigurator.LOGCOLLECTOR_SLICE in cpu_cgroup_path) memory_slice_matches = (cgroupconfigurator.LOGCOLLECTOR_SLICE in memory_cgroup_path) if not cpu_slice_matches or not memory_slice_matches: - print("The Log Collector process is not in the proper cgroups:") + logger.info("The Log Collector process is not in the proper cgroups:") if not cpu_slice_matches: - print("\tunexpected cpu slice") + logger.info("\tunexpected cpu slice") if not memory_slice_matches: - print("\tunexpected memory slice") + logger.info("\tunexpected memory slice") sys.exit(logcollector.INVALID_CGROUPS_ERRCODE) try: - log_collector = LogCollector(is_full_mode) + log_collector = LogCollector(is_full_mode, cpu_cgroup_path, memory_cgroup_path) + log_collector_monitor = get_log_collector_monitor_handler(log_collector.cgroups) + log_collector_monitor.run() archive = log_collector.collect_logs_and_get_archive() - print("Log collection successfully completed. Archive can be found at {0} " + logger.info("Log collection successfully completed. Archive can be found at {0} " "and detailed log output can be found at {1}".format(archive, OUTPUT_RESULTS_FILE_PATH)) except Exception as e: - print("Log collection completed unsuccessfully. Error: {0}".format(ustr(e))) - print("Detailed log output can be found at {0}".format(OUTPUT_RESULTS_FILE_PATH)) + logger.error("Log collection completed unsuccessfully. Error: {0}".format(ustr(e))) + logger.info("Detailed log output can be found at {0}".format(OUTPUT_RESULTS_FILE_PATH)) sys.exit(1) + finally: + if log_collector_monitor is not None: + log_collector_monitor.stop() @staticmethod def setup_firewall(firewall_metadata): diff --git a/azurelinuxagent/common/cgroup.py b/azurelinuxagent/common/cgroup.py index 2af4c0a117..b22ea2994e 100644 --- a/azurelinuxagent/common/cgroup.py +++ b/azurelinuxagent/common/cgroup.py @@ -29,6 +29,7 @@ _DEFAULT_REPORT_PERIOD = timedelta(seconds=conf.get_cgroup_check_period()) AGENT_NAME_TELEMETRY = "walinuxagent.service" # Name used for telemetry; it needs to be consistent even if the name of the service changes +AGENT_LOG_COLLECTOR = "azure-walinuxagent-logcollector" class CounterNotFound(Exception): diff --git a/azurelinuxagent/common/cgroupconfigurator.py b/azurelinuxagent/common/cgroupconfigurator.py index d79af493d6..31b8457894 100644 --- a/azurelinuxagent/common/cgroupconfigurator.py +++ b/azurelinuxagent/common/cgroupconfigurator.py @@ -74,10 +74,9 @@ CPUAccounting=yes CPUQuota={cpu_quota} MemoryAccounting=yes -MemoryLimit={memory_limit} """ _LOGCOLLECTOR_CPU_QUOTA = "5%" -_LOGCOLLECTOR_MEMORY_LIMIT = "30M" # K for kb, M for mb +LOGCOLLECTOR_MEMORY_LIMIT = 30 * 1024 ** 2 # 30Mb _AGENT_DROP_IN_FILE_SLICE = "10-Slice.conf" _AGENT_DROP_IN_FILE_SLICE_CONTENTS = """ @@ -349,8 +348,7 @@ def __setup_azure_slice(): files_to_create.append((vmextensions_slice, _VMEXTENSIONS_SLICE_CONTENTS)) if not os.path.exists(logcollector_slice): - slice_contents = _LOGCOLLECTOR_SLICE_CONTENTS_FMT.format(cpu_quota=_LOGCOLLECTOR_CPU_QUOTA, - memory_limit=_LOGCOLLECTOR_MEMORY_LIMIT) + slice_contents = _LOGCOLLECTOR_SLICE_CONTENTS_FMT.format(cpu_quota=_LOGCOLLECTOR_CPU_QUOTA) files_to_create.append((logcollector_slice, slice_contents)) diff --git a/azurelinuxagent/common/logcollector.py b/azurelinuxagent/common/logcollector.py index 9b88681fa1..b0da848fc5 100644 --- a/azurelinuxagent/common/logcollector.py +++ b/azurelinuxagent/common/logcollector.py @@ -26,12 +26,15 @@ from datetime import datetime from heapq import heappush, heappop +from azurelinuxagent.common.cgroup import CpuCgroup, AGENT_LOG_COLLECTOR, MemoryCgroup from azurelinuxagent.common.conf import get_lib_dir, get_ext_log_dir, get_agent_log_file +from azurelinuxagent.common.event import initialize_event_logger_vminfo_common_parameters from azurelinuxagent.common.future import ustr from azurelinuxagent.common.logcollector_manifests import MANIFEST_NORMAL, MANIFEST_FULL # Please note: be careful when adding agent dependencies in this module. # This module uses its own logger and logs to its own file, not to the agent log. +from azurelinuxagent.common.protocol.util import get_protocol_util _EXTENSION_LOG_DIR = get_ext_log_dir() _AGENT_LIB_DIR = get_lib_dir() @@ -45,7 +48,7 @@ CGROUPS_UNIT = "collect-logs.scope" -FORCE_KILLED_ERRCODE = -9 +GRACEFUL_KILL_ERRCODE = 3 INVALID_CGROUPS_ERRCODE = 2 _MUST_COLLECT_FILES = [ @@ -67,12 +70,14 @@ class LogCollector(object): _TRUNCATED_FILE_PREFIX = "truncated_" - def __init__(self, is_full_mode=False): + def __init__(self, is_full_mode=False, cpu_cgroup_path=None, memory_cgroup_path=None): self._is_full_mode = is_full_mode self._manifest = MANIFEST_FULL if is_full_mode else MANIFEST_NORMAL self._must_collect_files = self._expand_must_collect_files() self._create_base_dirs() self._set_logger() + self._initialize_telemetry() + self.cgroups = self._set_resource_usage_cgroups(cpu_cgroup_path, memory_cgroup_path) @staticmethod def _mkdir(dirname): @@ -99,6 +104,24 @@ def _set_logger(): _LOGGER.addHandler(_f_handler) _LOGGER.setLevel(logging.INFO) + @staticmethod + def _set_resource_usage_cgroups(cpu_cgroup_path, memory_cgroup_path): + cpu_cgroup = CpuCgroup(AGENT_LOG_COLLECTOR, cpu_cgroup_path) + msg = "Started tracking cpu cgroup {0}".format(cpu_cgroup) + _LOGGER.info(msg) + cpu_cgroup.initialize_cpu_usage() + memory_cgroup = MemoryCgroup(AGENT_LOG_COLLECTOR, memory_cgroup_path) + msg = "Started tracking memory cgroup {0}".format(memory_cgroup) + _LOGGER.info(msg) + return [cpu_cgroup, memory_cgroup] + + @staticmethod + def _initialize_telemetry(): + protocol = get_protocol_util().get_protocol() + protocol.client.update_goal_state(force_update=True) + # Initialize the common parameters for telemetry events + initialize_event_logger_vminfo_common_parameters(protocol) + @staticmethod def _run_shell_command(command, stdout=subprocess.PIPE, log_output=False): """ diff --git a/azurelinuxagent/ga/collect_logs.py b/azurelinuxagent/ga/collect_logs.py index dc62fccf21..616d875a30 100644 --- a/azurelinuxagent/ga/collect_logs.py +++ b/azurelinuxagent/ga/collect_logs.py @@ -16,7 +16,6 @@ # # Requires Python 2.6+ and Openssl 1.0+ # - import datetime import os import sys @@ -26,17 +25,19 @@ import azurelinuxagent.common.conf as conf from azurelinuxagent.common import logger -from azurelinuxagent.common.event import elapsed_milliseconds, add_event, WALAEventOperation -from azurelinuxagent.common.future import subprocess_dev_null, ustr +from azurelinuxagent.common.cgroup import MetricsCounter +from azurelinuxagent.common.event import elapsed_milliseconds, add_event, WALAEventOperation, report_metric +from azurelinuxagent.common.future import ustr from azurelinuxagent.common.interfaces import ThreadHandlerInterface -from azurelinuxagent.common.logcollector import COMPRESSED_ARCHIVE_PATH -from azurelinuxagent.common.cgroupconfigurator import CGroupConfigurator +from azurelinuxagent.common.logcollector import COMPRESSED_ARCHIVE_PATH, GRACEFUL_KILL_ERRCODE +from azurelinuxagent.common.cgroupconfigurator import CGroupConfigurator, LOGCOLLECTOR_MEMORY_LIMIT from azurelinuxagent.common.protocol.util import get_protocol_util from azurelinuxagent.common.utils import shellutil from azurelinuxagent.common.utils.shellutil import CommandError from azurelinuxagent.common.version import PY_VERSION_MAJOR, PY_VERSION_MINOR, AGENT_NAME, CURRENT_VERSION -_INITIAL_LOG_COLLECTION_DELAY = 5 * 60 # Five minutes of delay +_INITIAL_LOG_COLLECTION_DELAY = 5 * 60 # Five minutes of delay + def get_collect_logs_handler(): return CollectLogsHandler() @@ -128,7 +129,10 @@ def stopped(self): def stop(self): self.should_run = False if self.is_alive(): - self.join() + try: + self.join() + except RuntimeError: + pass def init_protocols(self): # The initialization of ProtocolUtil for the log collection thread should be done within the thread itself @@ -167,10 +171,13 @@ def collect_and_send_logs(self): def _collect_logs(self): logger.info("Starting log collection...") - # Invoke the command line tool in the agent to collect logs, with resource limits on CPU and memory (RAM). + # Invoke the command line tool in the agent to collect logs, with resource limits on CPU. + # Some distros like ubuntu20 by default cpu and memory accounting enabled. Thus create nested cgroups under the logcollector slice + # So disabling CPU and Memory accounting prevents from creating nested cgroups, so that all the counters will be present in logcollector Cgroup systemd_cmd = [ - "systemd-run", "--unit={0}".format(logcollector.CGROUPS_UNIT), + "systemd-run", "--property=CPUAccounting=no", "--property=MemoryAccounting=no", + "--unit={0}".format(logcollector.CGROUPS_UNIT), "--slice={0}".format(cgroupconfigurator.LOGCOLLECTOR_SLICE), "--scope" ] @@ -178,17 +185,17 @@ def _collect_logs(self): collect_logs_cmd = [sys.executable, "-u", sys.argv[0], "-collect-logs"] final_command = systemd_cmd + collect_logs_cmd - def exec_command(output_file): + def exec_command(): start_time = datetime.datetime.utcnow() success = False msg = None try: - shellutil.run_command(final_command, log_error=False, stdout=output_file, stderr=output_file) + shellutil.run_command(final_command, log_error=False) duration = elapsed_milliseconds(start_time) archive_size = os.path.getsize(COMPRESSED_ARCHIVE_PATH) msg = "Successfully collected logs. Archive size: {0} b, elapsed time: {1} ms.".format(archive_size, - duration) + duration) logger.info(msg) success = True @@ -201,16 +208,17 @@ def exec_command(output_file): # pylint has limited (i.e. no) awareness of control flow w.r.t. typing. we disable=no-member # here because we know e must be a CommandError but pylint still considers the case where # e is a different type of exception. - err_msg = ustr("Log Collector exited with code {0}").format(e.returncode) # pylint: disable=no-member + err_msg = ustr("Log Collector exited with code {0}").format( + e.returncode) # pylint: disable=no-member - if e.returncode == logcollector.INVALID_CGROUPS_ERRCODE: # pylint: disable=no-member + if e.returncode == logcollector.INVALID_CGROUPS_ERRCODE: # pylint: disable=no-member logger.info("Disabling periodic log collection until service restart due to process error.") self.stop() - - # When the OOM killer is invoked on the log collector process, this error code is - # returned. Stop the periodic operation because it seems to be persistent. - elif e.returncode == logcollector.FORCE_KILLED_ERRCODE: # pylint: disable=no-member - logger.info("Disabling periodic log collection until service restart due to OOM error.") + + # When the log collector memory limit is exceeded, Agent gracefully exit the process with this error code. + # Stop the periodic operation because it seems to be persistent. + elif e.returncode == logcollector.GRACEFUL_KILL_ERRCODE: # pylint: disable=no-member + logger.info("Disabling periodic log collection until service restart due to exceeded process memory limit.") self.stop() else: logger.info(err_msg) @@ -227,17 +235,8 @@ def exec_command(output_file): is_success=success, message=msg, log_event=False) - - try: - logfile = open(conf.get_agent_log_file(), "a+") - except Exception: - with subprocess_dev_null() as DEVNULL: - return exec_command(DEVNULL) - else: - return exec_command(logfile) - finally: - if logfile is not None: - logfile.close() + + return exec_command() def _send_logs(self): msg = None @@ -261,3 +260,98 @@ def _send_logs(self): is_success=success, message=msg, log_event=False) + + +def get_log_collector_monitor_handler(cgroups): + return LogCollectorMonitorHandler(cgroups) + + +class LogCollectorMonitorHandler(ThreadHandlerInterface): + """ + Periodically monitor and checks the Log collector Cgroups and sends telemetry to Kusto. + """ + + _THREAD_NAME = "LogCollectorMonitorHandler" + + @staticmethod + def get_thread_name(): + return LogCollectorMonitorHandler._THREAD_NAME + + def __init__(self, cgroups): + self.event_thread = None + self.should_run = True + self.period = 2 # Log collector monitor runs every 2 secs. + self.cgroups = cgroups + self.__log_metrics = conf.get_cgroup_log_metrics() + + def run(self): + self.start() + + def stop(self): + self.should_run = False + if self.is_alive(): + self.join() + + def join(self): + self.event_thread.join() + + def stopped(self): + return not self.should_run + + def is_alive(self): + return self.event_thread is not None and self.event_thread.is_alive() + + def start(self): + self.event_thread = threading.Thread(target=self.daemon) + self.event_thread.setDaemon(True) + self.event_thread.setName(self.get_thread_name()) + self.event_thread.start() + + def daemon(self): + try: + while not self.stopped(): + try: + metrics = self._poll_resource_usage() + self._send_telemetry(metrics) + self._verify_memory_limit(metrics) + except Exception as e: + logger.error("An error occurred in the log collection monitor thread loop; " + "will skip the current iteration.\n{0}", ustr(e)) + finally: + time.sleep(self.period) + except Exception as e: + logger.error( + "An error occurred in the MonitorLogCollectorCgroupsHandler thread; will exit the thread.\n{0}", + ustr(e)) + + def _poll_resource_usage(self): + metrics = [] + for cgroup in self.cgroups: + metrics.extend(cgroup.get_tracked_metrics(track_throttled_time=True)) + return metrics + + def _send_telemetry(self, metrics): + for metric in metrics: + report_metric(metric.category, metric.counter, metric.instance, metric.value, log_event=self.__log_metrics) + + def _verify_memory_limit(self, metrics): + current_usage = 0 + max_usage = 0 + for metric in metrics: + if metric.counter == MetricsCounter.TOTAL_MEM_USAGE: + current_usage += metric.value + elif metric.counter == MetricsCounter.SWAP_MEM_USAGE: + current_usage += metric.value + elif metric.counter == MetricsCounter.MAX_MEM_USAGE: + max_usage = metric.value + + current_max = max(current_usage, max_usage) + if current_max > LOGCOLLECTOR_MEMORY_LIMIT: + msg = "Log collector memory limit {0} bytes exceeded. The max reported usage is {1} bytes.".format(LOGCOLLECTOR_MEMORY_LIMIT, current_max) + logger.info(msg) + add_event( + name=AGENT_NAME, + version=CURRENT_VERSION, + op=WALAEventOperation.LogCollection, + message=msg) + os._exit(GRACEFUL_KILL_ERRCODE) diff --git a/tests/common/test_logcollector.py b/tests/common/test_logcollector.py index d601d93cab..521e0f23ed 100644 --- a/tests/common/test_logcollector.py +++ b/tests/common/test_logcollector.py @@ -23,8 +23,9 @@ import zipfile from azurelinuxagent.common.logcollector import LogCollector +from azurelinuxagent.common.utils import fileutil from azurelinuxagent.common.utils.fileutil import rm_dirs, mkdir, rm_files -from tests.tools import AgentTestCase, is_python_version_26, patch, skip_if_predicate_true +from tests.tools import AgentTestCase, is_python_version_26, patch, skip_if_predicate_true, data_dir SMALL_FILE_SIZE = 1 * 1024 * 1024 # 1 MB LARGE_FILE_SIZE = 5 * 1024 * 1024 # 5 MB @@ -43,6 +44,7 @@ def setUpClass(cls): mkdir(cls.root_collect_dir) cls._mock_constants() + cls._mock_cgroup() @classmethod def _mock_constants(cls): @@ -69,6 +71,22 @@ def _mock_constants(cls): cls.compressed_archive_path) cls.mock_compressed_archive_path.start() + @classmethod + def _mock_cgroup(cls): + # CPU Cgroups compute usage based on /proc/stat and /sys/fs/cgroup/.../cpuacct.stat; use mock data for those + # files + original_read_file = fileutil.read_file + + def mock_read_file(filepath, **args): + if filepath == "/proc/stat": + filepath = os.path.join(data_dir, "cgroups", "proc_stat_t0") + elif filepath.endswith("/cpuacct.stat"): + filepath = os.path.join(data_dir, "cgroups", "cpuacct.stat_t0") + return original_read_file(filepath, **args) + + cls._mock_read_cpu_cgroup_file = patch("azurelinuxagent.common.utils.fileutil.read_file", side_effect=mock_read_file) + cls._mock_read_cpu_cgroup_file.start() + @classmethod def tearDownClass(cls): cls.mock_manifest.stop() @@ -76,6 +94,7 @@ def tearDownClass(cls): cls.mock_truncated_files_dir.stop() cls.mock_output_results_file_path.stop() cls.mock_compressed_archive_path.stop() + cls._mock_read_cpu_cgroup_file.stop() shutil.rmtree(cls.tmp_dir) @@ -192,8 +211,9 @@ def test_log_collector_parses_commands_in_manifest(self): diskinfo,""".format(folder_to_list, file_to_collect) with patch("azurelinuxagent.common.logcollector.MANIFEST_NORMAL", manifest): - log_collector = LogCollector() - archive = log_collector.collect_logs_and_get_archive() + with patch('azurelinuxagent.common.logcollector.LogCollector._initialize_telemetry'): + log_collector = LogCollector(cpu_cgroup_path="dummy_cpu_path", memory_cgroup_path="dummy_memory_path") + archive = log_collector.collect_logs_and_get_archive() with open(self.output_results_file_path, "r") as fh: results = fh.readlines() @@ -220,8 +240,9 @@ def test_log_collector_uses_full_manifest_when_full_mode_enabled(self): """.format(file_to_collect) with patch("azurelinuxagent.common.logcollector.MANIFEST_FULL", manifest): - log_collector = LogCollector(is_full_mode=True) - archive = log_collector.collect_logs_and_get_archive() + with patch('azurelinuxagent.common.logcollector.LogCollector._initialize_telemetry'): + log_collector = LogCollector(is_full_mode=True, cpu_cgroup_path="dummy_cpu_path", memory_cgroup_path="dummy_memory_path") + archive = log_collector.collect_logs_and_get_archive() self._assert_archive_created(archive) self._assert_files_are_in_archive(expected_files=[file_to_collect]) @@ -233,8 +254,9 @@ def test_log_collector_should_collect_all_files(self): # All files in the manifest should be collected, since none of them are over the individual file size limit, # and combined they do not cross the archive size threshold. - log_collector = LogCollector() - archive = log_collector.collect_logs_and_get_archive() + with patch('azurelinuxagent.common.logcollector.LogCollector._initialize_telemetry'): + log_collector = LogCollector(cpu_cgroup_path="dummy_cpu_path", memory_cgroup_path="dummy_memory_path") + archive = log_collector.collect_logs_and_get_archive() self._assert_archive_created(archive) @@ -254,8 +276,9 @@ def test_log_collector_should_collect_all_files(self): def test_log_collector_should_truncate_large_text_files_and_ignore_large_binary_files(self): # Set the size limit so that some files are too large to collect in full. with patch("azurelinuxagent.common.logcollector._FILE_SIZE_LIMIT", SMALL_FILE_SIZE): - log_collector = LogCollector() - archive = log_collector.collect_logs_and_get_archive() + with patch('azurelinuxagent.common.logcollector.LogCollector._initialize_telemetry'): + log_collector = LogCollector(cpu_cgroup_path="dummy_cpu_path", memory_cgroup_path="dummy_memory_path") + archive = log_collector.collect_logs_and_get_archive() self._assert_archive_created(archive) @@ -287,8 +310,9 @@ def test_log_collector_should_prioritize_important_files_if_archive_too_big(self with patch("azurelinuxagent.common.logcollector._UNCOMPRESSED_ARCHIVE_SIZE_LIMIT", 10 * 1024 * 1024): with patch("azurelinuxagent.common.logcollector._MUST_COLLECT_FILES", must_collect_files): - log_collector = LogCollector() - archive = log_collector.collect_logs_and_get_archive() + with patch('azurelinuxagent.common.logcollector.LogCollector._initialize_telemetry'): + log_collector = LogCollector(cpu_cgroup_path="dummy_cpu_path", memory_cgroup_path="dummy_memory_path") + archive = log_collector.collect_logs_and_get_archive() self._assert_archive_created(archive) @@ -337,8 +361,9 @@ def test_log_collector_should_prioritize_important_files_if_archive_too_big(self def test_log_collector_should_update_archive_when_files_are_new_or_modified_or_deleted(self): # Ensure the archive reflects the state of files on the disk at collection time. If a file was updated, it # needs to be updated in the archive, deleted if removed from disk, and added if not previously seen. - log_collector = LogCollector() - first_archive = log_collector.collect_logs_and_get_archive() + with patch('azurelinuxagent.common.logcollector.LogCollector._initialize_telemetry'): + log_collector = LogCollector(cpu_cgroup_path="dummy_cpu_path", memory_cgroup_path="dummy_memory_path") + first_archive = log_collector.collect_logs_and_get_archive() self._assert_archive_created(first_archive) # Everything should be in the archive @@ -407,8 +432,9 @@ def test_log_collector_should_clean_up_uncollected_truncated_files(self): with patch("azurelinuxagent.common.logcollector._UNCOMPRESSED_ARCHIVE_SIZE_LIMIT", 2 * SMALL_FILE_SIZE): with patch("azurelinuxagent.common.logcollector._MUST_COLLECT_FILES", must_collect_files): with patch("azurelinuxagent.common.logcollector._FILE_SIZE_LIMIT", SMALL_FILE_SIZE): - log_collector = LogCollector() - archive = log_collector.collect_logs_and_get_archive() + with patch('azurelinuxagent.common.logcollector.LogCollector._initialize_telemetry'): + log_collector = LogCollector(cpu_cgroup_path="dummy_cpu_path", memory_cgroup_path="dummy_memory_path") + archive = log_collector.collect_logs_and_get_archive() self._assert_archive_created(archive) @@ -428,8 +454,9 @@ def test_log_collector_should_clean_up_uncollected_truncated_files(self): with patch("azurelinuxagent.common.logcollector._UNCOMPRESSED_ARCHIVE_SIZE_LIMIT", 2 * SMALL_FILE_SIZE): with patch("azurelinuxagent.common.logcollector._MUST_COLLECT_FILES", must_collect_files): with patch("azurelinuxagent.common.logcollector._FILE_SIZE_LIMIT", SMALL_FILE_SIZE): - log_collector = LogCollector() - second_archive = log_collector.collect_logs_and_get_archive() + with patch('azurelinuxagent.common.logcollector.LogCollector._initialize_telemetry'): + log_collector = LogCollector(cpu_cgroup_path="dummy_cpu_path", memory_cgroup_path="dummy_memory_path") + second_archive = log_collector.collect_logs_and_get_archive() expected_files = [ os.path.join(self.root_collect_dir, "waagent.log"), diff --git a/tests/ga/test_collect_logs.py b/tests/ga/test_collect_logs.py index 27a03f5598..14593726d6 100644 --- a/tests/ga/test_collect_logs.py +++ b/tests/ga/test_collect_logs.py @@ -18,15 +18,18 @@ import os from azurelinuxagent.common import logger, conf +from azurelinuxagent.common.cgroup import CpuCgroup, MemoryCgroup, MetricValue from azurelinuxagent.common.cgroupconfigurator import CGroupConfigurator from azurelinuxagent.common.logger import Logger from azurelinuxagent.common.protocol.util import ProtocolUtil -from azurelinuxagent.ga.collect_logs import get_collect_logs_handler, is_log_collection_allowed +from azurelinuxagent.common.utils import fileutil +from azurelinuxagent.ga.collect_logs import get_collect_logs_handler, is_log_collection_allowed, \ + get_log_collector_monitor_handler from tests.protocol.mocks import mock_wire_protocol, MockHttpResponse from tests.protocol.HttpRequestPredicates import HttpRequestPredicates from tests.protocol.mockwiredata import DATA_FILE from tests.tools import Mock, MagicMock, patch, AgentTestCase, clear_singleton_instances, skip_if_predicate_true, \ - is_python_version_26 + is_python_version_26, data_dir @contextlib.contextmanager @@ -46,13 +49,14 @@ def _create_collect_logs_handler(iterations=1, cgroups_enabled=True, collect_log protocol_util = MagicMock() protocol_util.get_protocol = Mock(return_value=protocol) with patch("azurelinuxagent.ga.collect_logs.get_protocol_util", return_value=protocol_util): - with patch("azurelinuxagent.ga.collect_logs.CollectLogsHandler.stopped", side_effect=[False] * iterations + [True]): + with patch("azurelinuxagent.ga.collect_logs.CollectLogsHandler.stopped", + side_effect=[False] * iterations + [True]): with patch("time.sleep"): - # Grab the singleton to patch it cgroups_configurator_singleton = CGroupConfigurator.get_instance() with patch.object(cgroups_configurator_singleton, "enabled", return_value=cgroups_enabled): - with patch("azurelinuxagent.ga.collect_logs.conf.get_collect_logs", return_value=collect_logs_conf): + with patch("azurelinuxagent.ga.collect_logs.conf.get_collect_logs", + return_value=collect_logs_conf): def run_and_wait(): collect_logs_handler.run() collect_logs_handler.join() @@ -140,11 +144,14 @@ def http_put_handler(url, content, **__): collect_logs_handler.run_and_wait() self.assertEqual(http_put_handler.counter, 1, "The PUT API to upload logs should have been called once") self.assertTrue(os.path.exists(self.archive_path), "The archive file should exist on disk") - self.assertEqual(archive_size, len(http_put_handler.archive), "The archive file should have {0} bytes, not {1}".format(archive_size, len(http_put_handler.archive))) + self.assertEqual(archive_size, len(http_put_handler.archive), + "The archive file should have {0} bytes, not {1}".format(archive_size, + len(http_put_handler.archive))) def test_it_does_not_upload_logs_when_collection_is_unsuccessful(self): with _create_collect_logs_handler() as collect_logs_handler: - with patch("azurelinuxagent.ga.collect_logs.shellutil.run_command", side_effect=Exception("test exception")): + with patch("azurelinuxagent.ga.collect_logs.shellutil.run_command", + side_effect=Exception("test exception")): def http_put_handler(url, _, **__): if self.is_host_plugin_put_logs_request(url): http_put_handler.counter += 1 @@ -158,3 +165,75 @@ def http_put_handler(url, _, **__): collect_logs_handler.run_and_wait() self.assertFalse(os.path.exists(self.archive_path), "The archive file should not exist on disk") self.assertEqual(http_put_handler.counter, 0, "The PUT API to upload logs shouldn't have been called") + + +@contextlib.contextmanager +def _create_log_collector_monitor_handler(iterations=1): + """ + Creates an instance of LogCollectorMonitorHandler that + * Runs its main loop only the number of times given in the 'iterations' parameter, and + * Does not sleep at the end of each iteration + + The returned CollectLogsHandler is augmented with 2 methods: + * run_and_wait() - invokes run() and wait() on the CollectLogsHandler + + """ + with patch("azurelinuxagent.ga.collect_logs.LogCollectorMonitorHandler.stopped", + side_effect=[False] * iterations + [True]): + with patch("time.sleep"): + + original_read_file = fileutil.read_file + + def mock_read_file(filepath, **args): + if filepath == "/proc/stat": + filepath = os.path.join(data_dir, "cgroups", "proc_stat_t0") + elif filepath.endswith("/cpuacct.stat"): + filepath = os.path.join(data_dir, "cgroups", "cpuacct.stat_t0") + return original_read_file(filepath, **args) + + with patch("azurelinuxagent.common.utils.fileutil.read_file", side_effect=mock_read_file): + def run_and_wait(): + monitor_log_collector.run() + monitor_log_collector.join() + + cgroups = [ + CpuCgroup("test", "dummy_cpu_path"), + MemoryCgroup("test", "dummy_memory_path") + ] + monitor_log_collector = get_log_collector_monitor_handler(cgroups) + monitor_log_collector.run_and_wait = run_and_wait + yield monitor_log_collector + + +class TestLogCollectorMonitorHandler(AgentTestCase): + + @patch('azurelinuxagent.common.event.EventLogger.add_metric') + @patch("azurelinuxagent.ga.collect_logs.LogCollectorMonitorHandler._poll_resource_usage") + def test_send_extension_metrics_telemetry(self, patch_poll_resource_usage, patch_add_metric): + with _create_log_collector_monitor_handler() as log_collector_monitor_handler: + patch_poll_resource_usage.return_value = [MetricValue("Process", "% Processor Time", "service", 1), + MetricValue("Process", "Throttled Time", "service", 1), + MetricValue("Memory", "Total Memory Usage", "service", 1), + MetricValue("Memory", "Max Memory Usage", "service", 1), + MetricValue("Memory", "Swap Memory Usage", "service", 1) + ] + log_collector_monitor_handler.run_and_wait() + self.assertEqual(1, patch_poll_resource_usage.call_count) + self.assertEqual(5, patch_add_metric.call_count) # Five metrics being sent. + + @patch("os._exit", side_effect=Exception) + @patch("azurelinuxagent.ga.collect_logs.LogCollectorMonitorHandler._poll_resource_usage") + def test_verify_log_collector_memory_limit_exceeded(self, patch_poll_resource_usage, mock_exit): + with _create_log_collector_monitor_handler() as log_collector_monitor_handler: + with patch("azurelinuxagent.common.cgroupconfigurator.LOGCOLLECTOR_MEMORY_LIMIT", 8): + patch_poll_resource_usage.return_value = [MetricValue("Process", "% Processor Time", "service", 1), + MetricValue("Process", "Throttled Time", "service", 1), + MetricValue("Memory", "Total Memory Usage", "service", 9), + MetricValue("Memory", "Max Memory Usage", "service", 7), + MetricValue("Memory", "Swap Memory Usage", "service", 0) + + ] + try: + log_collector_monitor_handler.run_and_wait() + except Exception: + self.assertEqual(mock_exit.call_count, 1)