Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add API for uploading logs via host plugin #1902

Merged
merged 10 commits into from
Aug 4, 2020
88 changes: 67 additions & 21 deletions azurelinuxagent/common/protocol/hostplugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,39 @@
import base64
import datetime
import json
import uuid
pgombar marked this conversation as resolved.
Show resolved Hide resolved

from azurelinuxagent.common import logger
from azurelinuxagent.common.errorstate import ErrorState, ERROR_STATE_HOST_PLUGIN_FAILURE
from azurelinuxagent.common.event import WALAEventOperation, add_event
from azurelinuxagent.common.exception import HttpError, ProtocolError
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common.protocol.healthservice import HealthService
from azurelinuxagent.common.utils import restutil
from azurelinuxagent.common.utils import textutil
from azurelinuxagent.common.utils.textutil import remove_bom
from azurelinuxagent.common.version import PY_VERSION_MAJOR
from azurelinuxagent.common.version import AGENT_NAME, AGENT_VERSION, PY_VERSION_MAJOR

# Port used for every host plugin request; substituted for {1} in the URI templates below.
HOST_PLUGIN_PORT = 32526

# URI templates for the host plugin endpoints: {0} is the endpoint address, {1} is the port.
URI_FORMAT_GET_API_VERSIONS = "http://{0}:{1}/versions"
URI_FORMAT_GET_EXTENSION_ARTIFACT = "http://{0}:{1}/extensionArtifact"
URI_FORMAT_PUT_VM_STATUS = "http://{0}:{1}/status"
# Endpoint that put_vm_log() sends the VM log archive to.
URI_FORMAT_PUT_LOG = "http://{0}:{1}/vmAgentLog"
URI_FORMAT_HEALTH = "http://{0}:{1}/health"

# Sent as the x-ms-version header; ensure_initialized() also requires it to be among the
# versions returned by get_api_versions() before the host plugin is considered available.
API_VERSION = "2015-09-01"
HEADER_CONTAINER_ID = "x-ms-containerid"
HEADER_VERSION = "x-ms-version"
HEADER_HOST_CONFIG_NAME = "x-ms-host-config-name"
HEADER_ARTIFACT_LOCATION = "x-ms-artifact-location"
HEADER_ARTIFACT_MANIFEST_LOCATION = "x-ms-artifact-manifest-location"

# Names of the HTTP headers sent with host plugin requests (module-private).
# The client name/version, correlation id and deployment id headers are populated by
# _build_log_headers() for log uploads.
_HEADER_CLIENT_NAME = "x-ms-client-name"
_HEADER_CLIENT_VERSION = "x-ms-client-version"
_HEADER_CORRELATION_ID = "x-ms-client-correlationid"
_HEADER_CONTAINER_ID = "x-ms-containerid"
_HEADER_DEPLOYMENT_ID = "x-ms-vmagentlog-deploymentid"
_HEADER_VERSION = "x-ms-version"
_HEADER_HOST_CONFIG_NAME = "x-ms-host-config-name"
_HEADER_ARTIFACT_LOCATION = "x-ms-artifact-location"
_HEADER_ARTIFACT_MANIFEST_LOCATION = "x-ms-artifact-manifest-location"

MAXIMUM_PAGEBLOB_PAGE_SIZE = 4 * 1024 * 1024 # Max page size: 4MB


Expand All @@ -60,7 +70,7 @@ def __init__(self, endpoint, container_id, role_config_name):
self.api_versions = None
self.endpoint = endpoint
self.container_id = container_id
self.deployment_id = None
self.deployment_id = self._extract_deployment_id(role_config_name)
larohra marked this conversation as resolved.
Show resolved Hide resolved
self.role_config_name = role_config_name
self.manifest_uri = None
self.health_service = HealthService(endpoint)
Expand All @@ -69,6 +79,11 @@ def __init__(self, endpoint, container_id, role_config_name):
self.fetch_last_timestamp = None
self.status_last_timestamp = None

@staticmethod
def _extract_deployment_id(role_config_name):
# Role config name consists of: <deployment id>.<incarnation>(...)
return role_config_name.split(".")[0] if role_config_name is not None else None

@staticmethod
def is_default_channel():
    """
    Report whether the host plugin is currently flagged as the default channel.
    :return: the current value of HostPluginProtocol._is_default_channel
    """
    default_channel_flag = HostPluginProtocol._is_default_channel
    return default_channel_flag
Expand All @@ -82,6 +97,7 @@ def update_container_id(self, new_container_id):

def update_role_config_name(self, new_role_config_name):
    """
    Store a new role config name and refresh the deployment id derived from it.
    :param new_role_config_name: the role config name to use from now on
    """
    self.role_config_name = new_role_config_name
    # The deployment id is derived from the role config name, so keep the two in sync.
    refreshed_deployment_id = self._extract_deployment_id(new_role_config_name)
    self.deployment_id = refreshed_deployment_id

def update_manifest_uri(self, new_manifest_uri):
self.manifest_uri = new_manifest_uri
Expand All @@ -91,9 +107,8 @@ def ensure_initialized(self):
self.api_versions = self.get_api_versions()
self.is_available = API_VERSION in self.api_versions
self.is_initialized = self.is_available
from azurelinuxagent.common.event import WALAEventOperation, report_event
report_event(WALAEventOperation.InitializeHostPlugin,
is_success=self.is_available)
add_event(WALAEventOperation.InitializeHostPlugin,
is_success=self.is_available)
return self.is_available

def get_health(self):
Expand All @@ -116,7 +131,7 @@ def get_api_versions(self):
error_response = ''
is_healthy = False
try:
headers = {HEADER_CONTAINER_ID: self.container_id}
headers = {_HEADER_CONTAINER_ID: self.container_id}
response = restutil.http_get(url, headers)

if restutil.request_failed(response):
Expand All @@ -142,13 +157,13 @@ def get_artifact_request(self, artifact_url, artifact_manifest_url=None):

url = URI_FORMAT_GET_EXTENSION_ARTIFACT.format(self.endpoint,
HOST_PLUGIN_PORT)
headers = {HEADER_VERSION: API_VERSION,
HEADER_CONTAINER_ID: self.container_id,
HEADER_HOST_CONFIG_NAME: self.role_config_name,
HEADER_ARTIFACT_LOCATION: artifact_url}
headers = {_HEADER_VERSION: API_VERSION,
_HEADER_CONTAINER_ID: self.container_id,
_HEADER_HOST_CONFIG_NAME: self.role_config_name,
_HEADER_ARTIFACT_LOCATION: artifact_url}

if artifact_manifest_url is not None:
headers[HEADER_ARTIFACT_MANIFEST_LOCATION] = artifact_manifest_url
headers[_HEADER_ARTIFACT_MANIFEST_LOCATION] = artifact_manifest_url

return url, headers

Expand Down Expand Up @@ -202,7 +217,28 @@ def should_report(is_healthy, error_state, last_timestamp, period):
return datetime.datetime.utcnow() >= (last_timestamp + period)

def put_vm_log(self, content):
raise NotImplementedError("Unimplemented")
"""
Try to upload VM logs, a compressed zip file, via the host plugin /vmAgentLog channel.
:param content: the binary content of the zip file to upload
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, so the host GA plugin requires loading the whole file into memory? What is the max size of the zip, again?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't figured out a way to avoid loading it. Everything we send using restutil is loaded into memory. The restriction on the agent side for the uncompressed archive is 150 MB (in my testing on actual log data, this results in a compressed archive of 13-15 MB). On the host, the restriction is 100 MB.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we plan on adding resource limiters to this process/thread, can that potentially break/delay uploading the logs to Host?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory, it could. However, from the memory point of view, all this is adding is an overhead of ~15 MB (worst case scenario) every hour. Disk I/O could be a bigger problem, but we don't have any plans for limiting that in the near future.

"""
if not self.ensure_initialized():
raise ProtocolError("HostGAPlugin: HostGAPlugin is not available")

if content is None:
raise ProtocolError("HostGAPlugin: Invalid argument passed to upload VM logs. Content was not provided.")

url = URI_FORMAT_PUT_LOG.format(self.endpoint, HOST_PLUGIN_PORT)
response = restutil.http_put(url,
pgombar marked this conversation as resolved.
Show resolved Hide resolved
data=content,
headers=self._build_log_headers())

if restutil.request_failed(response):
error_response = restutil.read_response_error(response)
raise HttpError("HostGAPlugin: Upload VM logs failed: {0}".format(error_response))
else:
logger.info("HostGAPlugin: Upload VM logs succeeded")

return response

def put_vm_status(self, status_blob, sas_url, config_blob_type=None):
"""
Expand Down Expand Up @@ -322,12 +358,22 @@ def _build_status_data(self, sas_url, blob_headers, content=None):

def _build_status_headers(self):
return {
HEADER_VERSION: API_VERSION,
_HEADER_VERSION: API_VERSION,
"Content-type": "application/json",
HEADER_CONTAINER_ID: self.container_id,
HEADER_HOST_CONFIG_NAME: self.role_config_name
_HEADER_CONTAINER_ID: self.container_id,
_HEADER_HOST_CONFIG_NAME: self.role_config_name
}


def _build_log_headers(self):
    """
    Build the HTTP headers sent along with a VM log upload.

    A fresh correlation id is generated on every call.
    :return: dict mapping header names to their values
    """
    log_headers = {}
    log_headers[_HEADER_VERSION] = API_VERSION
    log_headers[_HEADER_CONTAINER_ID] = self.container_id
    log_headers[_HEADER_DEPLOYMENT_ID] = self.deployment_id
    log_headers[_HEADER_CLIENT_NAME] = AGENT_NAME
    log_headers[_HEADER_CLIENT_VERSION] = AGENT_VERSION
    log_headers[_HEADER_CORRELATION_ID] = str(uuid.uuid4())
    return log_headers

def _base64_encode(self, data):
s = base64.b64encode(bytes(data))
if PY_VERSION_MAJOR > 2:
Expand Down
94 changes: 54 additions & 40 deletions azurelinuxagent/common/protocol/wire.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
import azurelinuxagent.common.logger as logger
import azurelinuxagent.common.utils.textutil as textutil
from azurelinuxagent.common.datacontract import validate_param
from azurelinuxagent.common.event import add_event, add_periodic, WALAEventOperation, EVENTS_DIRECTORY, EventLogger
from azurelinuxagent.common.event import add_event, add_periodic, WALAEventOperation, EVENTS_DIRECTORY, EventLogger, \
report_event
from azurelinuxagent.common.exception import ProtocolNotFoundError, \
ResourceGoneError, ExtensionDownloadError, InvalidContainerError, ProtocolError, HttpError
from azurelinuxagent.common.future import httpclient, bytebuffer, ustr
Expand Down Expand Up @@ -192,6 +193,9 @@ def report_event(self, events):
validate_param(EVENTS_DIRECTORY, events, TelemetryEventList)
self.client.report_event(events)

def upload_logs(self, logs):
    """
    Upload the given logs by delegating to the underlying client.
    :param logs: the logs to upload; handed to the client unchanged
    """
    wire_client = self.client
    wire_client.upload_logs(logs)


def _build_role_properties(container_id, role_instance_id, thumbprint):
xml = (u"<?xml version=\"1.0\" encoding=\"utf-8\"?>"
Expand Down Expand Up @@ -875,41 +879,8 @@ def check_wire_protocol_version(self):
"advised by Fabric.").format(PROTOCOL_VERSION)
raise ProtocolNotFoundError(error)

def send_request_using_appropriate_channel(self, direct_func, host_func):
# A wrapper method for all function calls that send HTTP requests. The purpose of the method is to
# define which channel to use, direct or through the host plugin. For the host plugin channel,
# also implement a retry mechanism.

# By default, the direct channel is the default channel. If that is the case, try getting a response
# through that channel. On failure, fall back to the host plugin channel.

# When using the host plugin channel, regardless if it's set as default or not, try sending the request first.
# On specific failures that indicate a stale goal state (such as resource gone or invalid container parameter),
# refresh the goal state and try again. If successful, set the host plugin channel as default. If failed,
# raise the exception.

# NOTE: direct_func and host_func are passed as lambdas. Be careful about capturing goal state data in them as
# they will not be refreshed even if a goal state refresh is called before retrying the host_func.

if not HostPluginProtocol.is_default_channel():
ret = None
try:
ret = direct_func()

# Different direct channel functions report failure in different ways: by returning None, False,
# or raising ResourceGone or InvalidContainer exceptions.
if not ret:
logger.periodic_info(logger.EVERY_HOUR, "[PERIODIC] Request failed using the direct channel, "
"switching to host plugin.")
except (ResourceGoneError, InvalidContainerError) as e:
logger.periodic_info(logger.EVERY_HOUR, "[PERIODIC] Request failed using the direct channel, "
"switching to host plugin. Error: {0}".format(ustr(e)))

if ret:
return ret
else:
logger.periodic_info(logger.EVERY_HALF_DAY, "[PERIODIC] Using host plugin as default channel.")

def call_hostplugin_with_container_check(self, host_func):
pgombar marked this conversation as resolved.
Show resolved Hide resolved
ret = None
try:
ret = host_func()
except (ResourceGoneError, InvalidContainerError) as e:
Expand Down Expand Up @@ -959,6 +930,45 @@ def send_request_using_appropriate_channel(self, direct_func, host_func):
log_event=True)
raise

return ret

def send_request_using_appropriate_channel(self, direct_func, host_func):
# A wrapper method for all function calls that send HTTP requests. The purpose of the method is to
# define which channel to use, direct or through the host plugin. For the host plugin channel,
# also implement a retry mechanism.

# By default, the direct channel is the default channel. If that is the case, try getting a response
# through that channel. On failure, fall back to the host plugin channel.

# When using the host plugin channel, regardless if it's set as default or not, try sending the request first.
# On specific failures that indicate a stale goal state (such as resource gone or invalid container parameter),
# refresh the goal state and try again. If successful, set the host plugin channel as default. If failed,
# raise the exception.

# NOTE: direct_func and host_func are passed as lambdas. Be careful about capturing goal state data in them as
# they will not be refreshed even if a goal state refresh is called before retrying the host_func.

if not HostPluginProtocol.is_default_channel():
ret = None
try:
ret = direct_func()

# Different direct channel functions report failure in different ways: by returning None, False,
# or raising ResourceGone or InvalidContainer exceptions.
if not ret:
logger.periodic_info(logger.EVERY_HOUR, "[PERIODIC] Request failed using the direct channel, "
"switching to host plugin.")
except (ResourceGoneError, InvalidContainerError) as e:
logger.periodic_info(logger.EVERY_HOUR, "[PERIODIC] Request failed using the direct channel, "
"switching to host plugin. Error: {0}".format(ustr(e)))

if ret:
return ret
else:
logger.periodic_info(logger.EVERY_HALF_DAY, "[PERIODIC] Using host plugin as default channel.")

ret = self.call_hostplugin_with_container_check(host_func)

if not HostPluginProtocol.is_default_channel():
logger.info("Setting host plugin as default channel from now on. "
"Restart the agent to reset the default channel.")
Expand Down Expand Up @@ -1139,9 +1149,6 @@ def report_event(self, event_list):
self.send_encoded_event(provider_id, buf[provider_id])

def report_status_event(self, message, is_success):
from azurelinuxagent.common.event import report_event, \
WALAEventOperation

report_event(op=WALAEventOperation.ReportStatus,
is_success=is_success,
message=message,
Expand Down Expand Up @@ -1218,14 +1225,21 @@ def get_artifacts_profile(self):
msg = "Content: [{0}]".format(profile)
logger.verbose(msg)

from azurelinuxagent.common.event import report_event, WALAEventOperation
report_event(op=WALAEventOperation.ArtifactsProfileBlob,
is_success=False,
message=msg,
log_event=False)

return artifacts_profile

def upload_logs(self, content):
    """
    Upload logs through the host plugin channel.

    The upload is handed to call_hostplugin_with_container_check as a callable, so that
    method decides when (and how often) the request is actually issued.
    :param content: the log content to upload
    :return: whatever call_hostplugin_with_container_check returns
    """
    def upload_through_host():
        return self._upload_logs_through_host(content)

    return self.call_hostplugin_with_container_check(upload_through_host)

def _upload_logs_through_host(self, content):
host = self.get_host_plugin()
return host.put_vm_log(content)


class VersionInfo(object):
def __init__(self, xml_text):
Expand Down
13 changes: 7 additions & 6 deletions tests/common/test_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,21 @@
import threading
import xml.dom
from datetime import datetime, timedelta

import azurelinuxagent.common.utils.textutil as textutil
from azurelinuxagent.common import event, logger
from azurelinuxagent.common.AgentGlobals import AgentGlobals
from azurelinuxagent.common.event import add_event, add_periodic, add_log_event, elapsed_milliseconds, report_metric, \
WALAEventOperation, parse_xml_event, parse_json_event, AGENT_EVENT_FILE_EXTENSION, EVENTS_DIRECTORY, \
TELEMETRY_EVENT_EVENT_ID, TELEMETRY_EVENT_PROVIDER_ID, TELEMETRY_LOG_EVENT_ID, TELEMETRY_LOG_PROVIDER_ID
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common.osutil import get_osutil
from azurelinuxagent.common.telemetryevent import CommonTelemetryEventSchema, GuestAgentGenericLogsSchema, \
GuestAgentExtensionEventsSchema, GuestAgentPerfCounterEventsSchema
from azurelinuxagent.common.version import CURRENT_AGENT, CURRENT_VERSION, AGENT_EXECUTION_MODE
from tests.protocol import mockwiredata
from tests.protocol.mocks import mock_wire_protocol, HttpRequestPredicates, MockHttpResponse
from azurelinuxagent.common.version import CURRENT_AGENT, CURRENT_VERSION, AGENT_EXECUTION_MODE
from azurelinuxagent.common.osutil import get_osutil
from tests.tools import AgentTestCase, data_dir, load_data, Mock, patch, skip_if_predicate_true
from tests.tools import AgentTestCase, data_dir, load_data, patch, skip_if_predicate_true
from tests.utils.event_logger_tools import EventLoggerTools


Expand All @@ -52,8 +53,8 @@ def setUp(self):
osutil = get_osutil()

self.expected_common_parameters = {
# common parameters computed at event creation; the timestamp (stored as the opcode name) is not included here and
# is checked separately from these parameters
# common parameters computed at event creation; the timestamp (stored as the opcode name) is not included
# here and is checked separately from these parameters
CommonTelemetryEventSchema.GAVersion: CURRENT_AGENT,
pgombar marked this conversation as resolved.
Show resolved Hide resolved
CommonTelemetryEventSchema.ContainerId: AgentGlobals.get_container_id(),
CommonTelemetryEventSchema.EventTid: threading.current_thread().ident,
Expand All @@ -68,7 +69,7 @@ def setUp(self):
# common parameters from the goal state
CommonTelemetryEventSchema.TenantName: 'db00a7755a5e4e8a8fe4b19bc3b330c3',
CommonTelemetryEventSchema.RoleName: 'MachineRole',
CommonTelemetryEventSchema.RoleInstanceName: 'MachineRole_IN_0',
CommonTelemetryEventSchema.RoleInstanceName: 'b61f93d0-e1ed-40b2-b067-22c243233448.MachineRole_IN_0',
# common parameters
CommonTelemetryEventSchema.Location: EventLoggerTools.mock_imds_data['location'],
CommonTelemetryEventSchema.SubscriptionId: EventLoggerTools.mock_imds_data['subscriptionId'],
Expand Down
4 changes: 2 additions & 2 deletions tests/data/wire/goal_state.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
<ContainerId>c6d5526c-5ac2-4200-b6e2-56f2b70c5ab2</ContainerId>
<RoleInstanceList>
<RoleInstance>
<InstanceId>MachineRole_IN_0</InstanceId>
<InstanceId>b61f93d0-e1ed-40b2-b067-22c243233448.MachineRole_IN_0</InstanceId>
larohra marked this conversation as resolved.
Show resolved Hide resolved
<State>Started</State>
<Configuration>
<HostingEnvironmentConfig>http://168.63.129.16:80/hostingenvuri/</HostingEnvironmentConfig>
<SharedConfig>http://168.63.129.16:80/sharedconfiguri/</SharedConfig>
<Certificates>http://168.63.129.16:80/certificatesuri/</Certificates>
<ExtensionsConfig>http://168.63.129.16:80/extensionsconfiguri/</ExtensionsConfig>
<FullConfig>http://168.63.129.16:80/fullconfiguri/</FullConfig>
<ConfigName>DummyRoleConfigName.xml</ConfigName>
<ConfigName>b61f93d0-e1ed-40b2-b067-22c243233448.1.b61f93d0-e1ed-40b2-b067-22c243233448.2.MachineRole_IN_0.xml</ConfigName>
</Configuration>
</RoleInstance>
</RoleInstanceList>
Expand Down
3 changes: 2 additions & 1 deletion tests/data/wire/goal_state_no_ext.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
<ContainerId>c6d5526c-5ac2-4200-b6e2-56f2b70c5ab2</ContainerId>
<RoleInstanceList>
<RoleInstance>
<InstanceId>MachineRole_IN_0</InstanceId>
<InstanceId>b61f93d0-e1ed-40b2-b067-22c243233448.MachineRole_IN_0</InstanceId>
<State>Started</State>
<Configuration>
<HostingEnvironmentConfig>http://168.63.129.16:80/hostingenvuri/</HostingEnvironmentConfig>
<SharedConfig>http://168.63.129.16:80/sharedconfiguri/</SharedConfig>
<Certificates>http://168.63.129.16:80/certificatesuri/</Certificates>
<FullConfig>http://168.63.129.16:80/fullconfiguri/</FullConfig>
<ConfigName>b61f93d0-e1ed-40b2-b067-22c243233448.1.b61f93d0-e1ed-40b2-b067-22c243233448.2.MachineRole_IN_0.xml</ConfigName>
</Configuration>
</RoleInstance>
</RoleInstanceList>
Expand Down
Loading