Skip to content

Commit

Permalink
Merge branch 'develop' into vameru/refactor-default-fields-logic
Browse files Browse the repository at this point in the history
  • Loading branch information
vrdmr committed Nov 16, 2019
2 parents 7eeeae8 + 5ef3e86 commit 3ae9e05
Show file tree
Hide file tree
Showing 19 changed files with 650 additions and 247 deletions.
20 changes: 20 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# Virtualenv
py3env/
Expand All @@ -27,6 +28,7 @@ var/

# PyCharm
.idea/
.idea_modules/

# PyInstaller
# Usually these files are written by a python script from a template
Expand Down Expand Up @@ -72,3 +74,21 @@ bin/waagent2.0c

# mac osx specific files
.DS_Store

### VirtualEnv template
# Virtualenv
# http://iamzed.com/2009/05/07/a-primer-on-virtualenv/
.Python
pyvenv.cfg
.venv
pip-selfcheck.json

# virtualenv
venv/
ENV/

# dotenv
.env

# pyenv
.python-version
70 changes: 52 additions & 18 deletions azurelinuxagent/common/cgroupstelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
# Requires Python 2.6+ and Openssl 1.0+
import errno
import threading
from collections import namedtuple
from datetime import datetime as dt

from azurelinuxagent.common import logger
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common.exception import CGroupsException
from azurelinuxagent.common.future import ustr


MetricValue = namedtuple('Metric', ['category', 'counter', 'instance', 'value'])


class CGroupsTelemetry(object):
Expand Down Expand Up @@ -93,6 +97,16 @@ def stop_tracking(cgroup):

@staticmethod
def report_all_tracked():
"""
The purpose of report_all_tracked is to collect the data from the tracked cgroups and process each metric into a
data structure via _process_cgroup_metric. The perf metric is added to the data structure and returned to the
caller.
report_all_tracked will be removed soon, in favor of calling report_metric directly when polling the data
from the tracked cgroups.
:return collected_metrics: dictionary of cgroups metrics.
"""
collected_metrics = {}

for name, cgroup_metrics in CGroupsTelemetry._cgroup_metrics.items():
Expand All @@ -112,18 +126,43 @@ def report_all_tracked():

@staticmethod
def poll_all_tracked():
metrics = []

with CGroupsTelemetry._rlock:
for cgroup in CGroupsTelemetry._tracked[:]:

# noinspection PyBroadException
if cgroup.name not in CGroupsTelemetry._cgroup_metrics:
CGroupsTelemetry._cgroup_metrics[cgroup.name] = CgroupMetrics()

CGroupsTelemetry._cgroup_metrics[cgroup.name].collect_data(cgroup)

try:
if cgroup.controller == "cpu":
current_cpu_usage = cgroup.get_cpu_usage()
CGroupsTelemetry._cgroup_metrics[cgroup.name].add_cpu_usage(current_cpu_usage)
metrics.append(MetricValue("Process", "% Processor Time", cgroup.name, current_cpu_usage))
elif cgroup.controller == "memory":
current_memory_usage = cgroup.get_memory_usage()
CGroupsTelemetry._cgroup_metrics[cgroup.name].add_memory_usage(current_memory_usage)
metrics.append(MetricValue("Memory", "Total Memory Usage", cgroup.name, current_memory_usage))

max_memory_usage = cgroup.get_max_memory_usage()
CGroupsTelemetry._cgroup_metrics[cgroup.name].add_max_memory_usage(max_memory_usage)
metrics.append(MetricValue("Memory", "Max Memory Usage", cgroup.name, max_memory_usage))
else:
raise CGroupsException('CGroup controller {0} is not supported for cgroup {1}'.format(
cgroup.controller, cgroup.name))
except Exception as e:
# There can be scenarios when the CGroup has been deleted by the time we are fetching the values
# from it. This would raise IOError with file entry not found (ERRNO: 2). We do not want to log
# every occurrence of such a case, as it would be very verbose. We do want to log all the other
# exceptions which could occur, which is why we do a periodic log for all the other errors.
if not isinstance(e, (IOError, OSError)) or e.errno != errno.ENOENT:
logger.periodic_warn(logger.EVERY_HOUR, '[PERIODIC] Could not collect metrics for cgroup '
'{0}. Error : {1}'.format(cgroup.name, ustr(e)))
if not cgroup.is_active():
CGroupsTelemetry.stop_tracking(cgroup)
CGroupsTelemetry._cgroup_metrics[cgroup.name].marked_for_delete = True

return metrics

@staticmethod
def prune_all_tracked():
with CGroupsTelemetry._rlock:
Expand All @@ -145,19 +184,14 @@ def __init__(self):
self._cpu_usage = Metric()
self.marked_for_delete = False

def collect_data(self, cgroup):
# noinspection PyBroadException
try:
if cgroup.controller == "cpu":
self._cpu_usage.append(cgroup.get_cpu_usage())
elif cgroup.controller == "memory":
self._memory_usage.append(cgroup.get_memory_usage())
self._max_memory_usage.append(cgroup.get_max_memory_usage())
else:
raise CGroupsException('CGroup controller {0} is not supported'.format(controller))
except Exception as e:
if not isinstance(e, (IOError, OSError)) or e.errno != errno.ENOENT:
logger.periodic_warn(logger.EVERY_HALF_HOUR, 'Could not collect metrics for cgroup {0}. Error : {1}'.format(cgroup.path, ustr(e)))
def add_memory_usage(self, usage):
self._memory_usage.append(usage)

def add_max_memory_usage(self, usage):
self._max_memory_usage.append(usage)

def add_cpu_usage(self, usage):
self._cpu_usage.append(usage)

def get_memory_usage(self):
return self._memory_usage
Expand Down
32 changes: 19 additions & 13 deletions azurelinuxagent/common/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

_EVENT_MSG = "Event: name={0}, op={1}, message={2}, duration={3}"
TELEMETRY_EVENT_PROVIDER_ID = "69B669B9-4AF8-4C50-BDC4-6006FA76E975"
TELEMETRY_METRICS_EVENT_ID = 4

# Store the last retrieved container id as an environment variable to be shared between threads for telemetry purposes
CONTAINER_ID_ENV_VARIABLE = "AZURE_GUEST_AGENT_CONTAINER_ID"
Expand Down Expand Up @@ -271,14 +272,15 @@ def add_event(self, name, op=WALAEventOperation.Unknown, is_success=True, durati

def _add_event(self, duration, evt_type, is_internal, is_success, message, name, op, version, event_id):
event = TelemetryEvent(event_id, TELEMETRY_EVENT_PROVIDER_ID)
event.parameters.append(TelemetryEventParam('Name', name))

event.parameters.append(TelemetryEventParam('Name', str(name)))
event.parameters.append(TelemetryEventParam('Version', str(version)))
event.parameters.append(TelemetryEventParam('IsInternal', is_internal))
event.parameters.append(TelemetryEventParam('Operation', op))
event.parameters.append(TelemetryEventParam('OperationSuccess', is_success))
event.parameters.append(TelemetryEventParam('Message', message))
event.parameters.append(TelemetryEventParam('Duration', duration))
event.parameters.append(TelemetryEventParam('ExtensionType', evt_type))
event.parameters.append(TelemetryEventParam('IsInternal', bool(is_internal)))
event.parameters.append(TelemetryEventParam('Operation', str(op)))
event.parameters.append(TelemetryEventParam('OperationSuccess', bool(is_success)))
event.parameters.append(TelemetryEventParam('Message', str(message)))
event.parameters.append(TelemetryEventParam('Duration', int(duration)))
event.parameters.append(TelemetryEventParam('ExtensionType', str(evt_type)))

event.parameters = self.add_default_parameters_to_event(event.parameters)
data = get_properties(event)
Expand Down Expand Up @@ -329,11 +331,11 @@ def add_metric(self, category, counter, instance, value, log_event=False):
message = "Metric {0}/{1} [{2}] = {3}".format(category, counter, instance, value)
_log_event(AGENT_NAME, "METRIC", message, 0)

event = TelemetryEvent(4, "69B669B9-4AF8-4C50-BDC4-6006FA76E975")
event.parameters.append(TelemetryEventParam('Category', category))
event.parameters.append(TelemetryEventParam('Counter', counter))
event.parameters.append(TelemetryEventParam('Instance', instance))
event.parameters.append(TelemetryEventParam('Value', value))
event = TelemetryEvent(TELEMETRY_METRICS_EVENT_ID, TELEMETRY_EVENT_PROVIDER_ID)
event.parameters.append(TelemetryEventParam('Category', str(category)))
event.parameters.append(TelemetryEventParam('Counter', str(counter)))
event.parameters.append(TelemetryEventParam('Instance', str(instance)))
event.parameters.append(TelemetryEventParam('Value', float(value)))

event.parameters = self.add_default_parameters_to_event(event.parameters)
data = get_properties(event)
Expand Down Expand Up @@ -438,7 +440,11 @@ def report_metric(category, counter, instance, value, log_event=False, reporter=
message = "Metric {0}/{1} [{2}] = {3}".format(category, counter, instance, value)
_log_event(AGENT_NAME, "METRIC", message, 0)
return
reporter.add_metric(category, counter, instance, value, log_event)
try:
reporter.add_metric(category, counter, instance, float(value), log_event)
except ValueError:
logger.periodic_warn(logger.EVERY_HALF_HOUR, "[PERIODIC] Cannot cast the metric value. Details of the Metric - "
"{0}/{1} [{2}] = {3}".format(category, counter, instance, value))


def add_event(name, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION), message="",
Expand Down
6 changes: 3 additions & 3 deletions azurelinuxagent/common/protocol/wire.py
Original file line number Diff line number Diff line change
Expand Up @@ -891,11 +891,11 @@ def get_current_handlers(self):

def get_ext_conf(self):
if self.ext_conf is None:
goal_state = self.get_goal_state()
if goal_state.ext_uri is None:
local_goal_state = self.get_goal_state()
if local_goal_state.ext_uri is None:
self.ext_conf = ExtensionsConfig(None)
else:
local_file = EXT_CONF_FILE_NAME.format(goal_state.incarnation)
local_file = EXT_CONF_FILE_NAME.format(local_goal_state.incarnation)
local_file = os.path.join(conf.get_lib_dir(), local_file)
xml_text = self.fetch_cache(local_file)
self.ext_conf = ExtensionsConfig(xml_text)
Expand Down
33 changes: 23 additions & 10 deletions azurelinuxagent/ga/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,22 @@
import azurelinuxagent.common.conf as conf
import azurelinuxagent.common.logger as logger
import azurelinuxagent.common.utils.networkutil as networkutil

from azurelinuxagent.common.cgroupstelemetry import CGroupsTelemetry
from azurelinuxagent.common.datacontract import set_properties
from azurelinuxagent.common.errorstate import ErrorState
from azurelinuxagent.common.event import add_event, WALAEventOperation, CONTAINER_ID_ENV_VARIABLE, \
get_container_id_from_env, EventLogger
from azurelinuxagent.common.event import EventLogger
from azurelinuxagent.common.event import add_event, WALAEventOperation, report_metric
from azurelinuxagent.common.exception import EventError, ProtocolError, OSUtilError, HttpError
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common.osutil import get_osutil
from azurelinuxagent.common.protocol import get_protocol_util
from azurelinuxagent.common.protocol.healthservice import HealthService
from azurelinuxagent.common.protocol.imds import get_imds_client
from azurelinuxagent.common.telemetryevent import TelemetryEvent, TelemetryEventParam, TelemetryEventList
from azurelinuxagent.common.datacontract import set_properties
from azurelinuxagent.common.utils.restutil import IOErrorCounter
from azurelinuxagent.common.utils.textutil import parse_doc, findall, find, getattrib, hash_strings
from azurelinuxagent.common.version import DISTRO_NAME, DISTRO_VERSION, \
DISTRO_CODE_NAME, AGENT_NAME, CURRENT_AGENT, CURRENT_VERSION, AGENT_EXECUTION_MODE
DISTRO_CODE_NAME, AGENT_NAME, CURRENT_VERSION, AGENT_EXECUTION_MODE


def parse_event(data_str):
Expand Down Expand Up @@ -178,8 +177,8 @@ def init_sysinfo(self):
try:
ram = self.osutil.get_total_mem()
processors = self.osutil.get_processor_cores()
self.sysinfo.append(TelemetryEventParam("RAM", ram))
self.sysinfo.append(TelemetryEventParam("Processors", processors))
self.sysinfo.append(TelemetryEventParam("RAM", int(ram)))
self.sysinfo.append(TelemetryEventParam("Processors", int(processors)))
except OSUtilError as e:
logger.warn("Failed to get system info: {0}", ustr(e))

Expand Down Expand Up @@ -207,7 +206,7 @@ def init_sysinfo(self):
self.sysinfo.append(TelemetryEventParam('VMId',
vminfo.vmId))
self.sysinfo.append(TelemetryEventParam('ImageOrigin',
vminfo.image_origin))
int(vminfo.image_origin)))
except (HttpError, ValueError) as e:
logger.warn("failed to get IMDS info: {0}", ustr(e))

Expand Down Expand Up @@ -281,7 +280,7 @@ def daemon(self):
while self.should_run:
self.send_telemetry_heartbeat()
self.poll_telemetry_metrics()
self.send_telemetry_metrics()
self.send_telemetry_metrics() # This will be removed in favor of poll_telemetry_metrics() and it'll directly send the perf data for each cgroup.
self.collect_and_send_events()
self.send_host_plugin_heartbeat()
self.send_imds_heartbeat()
Expand Down Expand Up @@ -456,16 +455,30 @@ def send_telemetry_heartbeat(self):
self.last_telemetry_heartbeat = datetime.datetime.utcnow()

def poll_telemetry_metrics(self):
"""
This method polls the tracked cgroups to get data from the cgroups filesystem and send the data directly.
:return:
"""
time_now = datetime.datetime.utcnow()
if not self.last_cgroup_polling_telemetry:
self.last_cgroup_polling_telemetry = time_now

if time_now >= (self.last_cgroup_polling_telemetry +
MonitorHandler.CGROUP_TELEMETRY_POLLING_PERIOD):
CGroupsTelemetry.poll_all_tracked()
metrics = CGroupsTelemetry.poll_all_tracked()
self.last_cgroup_polling_telemetry = time_now

if metrics:
for metric in metrics:
report_metric(metric.category, metric.counter, metric.instance, metric.value)

def send_telemetry_metrics(self):
"""
send_telemetry_metrics will soon be removed in favor of sending performance metrics directly.
:return:
"""
time_now = datetime.datetime.utcnow()

if not self.last_cgroup_report_telemetry:
Expand Down
Loading

0 comments on commit 3ae9e05

Please sign in to comment.