Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add API for uploading logs via host plugin #1902

Merged
merged 10 commits into from
Aug 4, 2020
2 changes: 2 additions & 0 deletions azurelinuxagent/common/protocol/hostplugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ def put_vm_log(self, content):
else:
logger.info("HostGAPlugin: Upload VM logs succeeded")

return response

def put_vm_status(self, status_blob, sas_url, config_blob_type=None):
"""
Try to upload the VM status via the host plugin /status channel
Expand Down
147 changes: 48 additions & 99 deletions azurelinuxagent/common/protocol/wire.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import azurelinuxagent.common.logger as logger
import azurelinuxagent.common.utils.textutil as textutil
from azurelinuxagent.common.datacontract import validate_param
from azurelinuxagent.common.event import add_event, add_periodic, EVENTS_DIRECTORY
from azurelinuxagent.common.event import report_event, WALAEventOperation
from azurelinuxagent.common.event import add_event, add_periodic, WALAEventOperation, EVENTS_DIRECTORY, EventLogger, \
report_event
from azurelinuxagent.common.exception import ProtocolNotFoundError, \
ResourceGoneError, ExtensionDownloadError, InvalidContainerError, ProtocolError, HttpError
from azurelinuxagent.common.future import httpclient, bytebuffer, ustr
Expand Down Expand Up @@ -879,41 +879,8 @@ def check_wire_protocol_version(self):
"advised by Fabric.").format(PROTOCOL_VERSION)
raise ProtocolNotFoundError(error)

def send_request_using_appropriate_channel(self, direct_func, host_func):
# A wrapper method for all function calls that send HTTP requests. The purpose of the method is to
# define which channel to use, direct or through the host plugin. For the host plugin channel,
# also implement a retry mechanism.

# By default, the direct channel is the default channel. If that is the case, try getting a response
# through that channel. On failure, fall back to the host plugin channel.

# When using the host plugin channel, regardless if it's set as default or not, try sending the request first.
# On specific failures that indicate a stale goal state (such as resource gone or invalid container parameter),
# refresh the goal state and try again. If successful, set the host plugin channel as default. If failed,
# raise the exception.

# NOTE: direct_func and host_func are passed as lambdas. Be careful about capturing goal state data in them as
# they will not be refreshed even if a goal state refresh is called before retrying the host_func.

if not HostPluginProtocol.is_default_channel():
ret = None
try:
ret = direct_func()

# Different direct channel functions report failure in different ways: by returning None, False,
# or raising ResourceGone or InvalidContainer exceptions.
if not ret:
logger.periodic_info(logger.EVERY_HOUR, "[PERIODIC] Request failed using the direct channel, "
"switching to host plugin.")
except (ResourceGoneError, InvalidContainerError) as e:
logger.periodic_info(logger.EVERY_HOUR, "[PERIODIC] Request failed using the direct channel, "
"switching to host plugin. Error: {0}".format(ustr(e)))

if ret:
return ret
else:
logger.periodic_info(logger.EVERY_HALF_DAY, "[PERIODIC] Using host plugin as default channel.")

def call_hostplugin_with_container_check(self, host_func):
pgombar marked this conversation as resolved.
Show resolved Hide resolved
ret = None
try:
ret = host_func()
except (ResourceGoneError, InvalidContainerError) as e:
Expand Down Expand Up @@ -963,6 +930,45 @@ def send_request_using_appropriate_channel(self, direct_func, host_func):
log_event=True)
raise

return ret

def send_request_using_appropriate_channel(self, direct_func, host_func):
# A wrapper method for all function calls that send HTTP requests. The purpose of the method is to
# define which channel to use, direct or through the host plugin. For the host plugin channel,
# also implement a retry mechanism.

# Unless the host plugin has been set as the default channel, the direct channel is tried first.
# If the direct channel fails, fall back to the host plugin channel.

# When using the host plugin channel, regardless of whether it is set as the default, try sending the request first.
# On specific failures that indicate a stale goal state (such as resource gone or invalid container parameter),
# refresh the goal state and try again. If successful, set the host plugin channel as default. If failed,
# raise the exception.

# NOTE: direct_func and host_func are passed as lambdas. Be careful about capturing goal state data in them as
# they will not be refreshed even if a goal state refresh is called before retrying the host_func.

if not HostPluginProtocol.is_default_channel():
ret = None
try:
ret = direct_func()

# Different direct channel functions report failure in different ways: by returning None, False,
# or raising ResourceGone or InvalidContainer exceptions.
if not ret:
logger.periodic_info(logger.EVERY_HOUR, "[PERIODIC] Request failed using the direct channel, "
"switching to host plugin.")
except (ResourceGoneError, InvalidContainerError) as e:
logger.periodic_info(logger.EVERY_HOUR, "[PERIODIC] Request failed using the direct channel, "
"switching to host plugin. Error: {0}".format(ustr(e)))

if ret:
return ret
else:
logger.periodic_info(logger.EVERY_HALF_DAY, "[PERIODIC] Using host plugin as default channel.")

ret = self.call_hostplugin_with_container_check(host_func)

if not HostPluginProtocol.is_default_channel():
logger.info("Setting host plugin as default channel from now on. "
"Restart the agent to reset the default channel.")
Expand Down Expand Up @@ -1227,69 +1233,12 @@ def get_artifacts_profile(self):
return artifacts_profile

def upload_logs(self, content):
try:
host_plugin = self.get_host_plugin()
host_plugin.put_vm_log(content)

msg = "Upload VM logs request succeeded using the host plugin channel with " \
"container id {0} and role config file {1}".format(host_plugin.container_id,
host_plugin.role_config_name)
logger.info(msg)
add_event(name=AGENT_NAME,
version=CURRENT_VERSION,
op=WALAEventOperation.HostPlugin,
is_success=True,
message=msg,
log_event=False)
return
except (ResourceGoneError, InvalidContainerError) as e:
host_plugin = self.get_host_plugin()
old_container_id = host_plugin.container_id
old_role_config_name = host_plugin.role_config_name

msg = "Upload VM logs request failed with the current host plugin configuration. " \
"ContainerId: {0}, role config file: {1}. Fetching new goal state and retrying the call." \
"Error: {2}".format(old_container_id, old_role_config_name, ustr(e))
logger.info(msg)

self.update_host_plugin_from_goal_state()

new_container_id = host_plugin.container_id
new_role_config_name = host_plugin.role_config_name
msg = "Host plugin reconfigured with new parameters. " \
"ContainerId: {0}, role config file: {1}.".format(new_container_id, new_role_config_name)
logger.info(msg)
host_func = lambda: self._upload_logs_through_host(content)
return self.call_hostplugin_with_container_check(host_func)

try:
host = self.get_host_plugin()
host.put_vm_log(content)

msg = "Upload VM logs request succeeded using the host plugin channel after goal state refresh. " \
"ContainerId changed from {0} to {1}, " \
"role config file changed from {2} to {3}.".format(old_container_id, new_container_id,
old_role_config_name, new_role_config_name)
logger.info(msg)
add_event(name=AGENT_NAME,
version=CURRENT_VERSION,
op=WALAEventOperation.HostPlugin,
is_success=True,
message=msg,
log_event=False)
return
except (ResourceGoneError, InvalidContainerError) as e:
msg = "Upload VM logs request failed using the host plugin channel after goal state refresh. " \
"ContainerId changed from {0} to {1}, role config file changed from {2} to {3}. " \
"Exception type: {4}.".format(old_container_id, new_container_id, old_role_config_name,
new_role_config_name, type(e).__name__)
add_event(name=AGENT_NAME,
version=CURRENT_VERSION,
op=WALAEventOperation.HostPlugin,
is_success=False,
message=msg,
log_event=True)
raise
except Exception as e:
raise ProtocolError("Failed to upload logs. Error: {0}".format(ustr(e)))
def _upload_logs_through_host(self, content):
    """
    Send the given log content through the host plugin's put_vm_log call
    and hand back whatever that call returns.
    """
    return self.get_host_plugin().put_vm_log(content)


class VersionInfo(object):
Expand Down
25 changes: 12 additions & 13 deletions tests/common/test_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@
import re
import shutil
import threading
from datetime import datetime, timedelta

import xml.dom
from datetime import datetime, timedelta

import azurelinuxagent.common.utils.textutil as textutil
from azurelinuxagent.common import event, logger
Expand All @@ -35,6 +34,8 @@
TELEMETRY_EVENT_EVENT_ID, TELEMETRY_EVENT_PROVIDER_ID, TELEMETRY_LOG_EVENT_ID, TELEMETRY_LOG_PROVIDER_ID
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common.osutil import get_osutil
from azurelinuxagent.common.telemetryevent import CommonTelemetryEventSchema, GuestAgentGenericLogsSchema, \
GuestAgentExtensionEventsSchema, GuestAgentPerfCounterEventsSchema
from azurelinuxagent.common.version import CURRENT_AGENT, CURRENT_VERSION, AGENT_EXECUTION_MODE
from tests.protocol import mockwiredata
from tests.protocol.mocks import mock_wire_protocol, HttpRequestPredicates, MockHttpResponse
Expand All @@ -54,23 +55,21 @@ def setUp(self):
self.expected_common_parameters = {
# common parameters computed at event creation; the timestamp (stored as the opcode name) is not included
# here and is checked separately from these parameters
'GAVersion': CURRENT_AGENT,
'ContainerId': AgentGlobals.get_container_id(),
'EventTid': threading.current_thread().ident,
'EventPid': os.getpid(),
'TaskName': threading.current_thread().getName(),
'KeywordName': '',
'IsInternal': False,
CommonTelemetryEventSchema.GAVersion: CURRENT_AGENT,
pgombar marked this conversation as resolved.
Show resolved Hide resolved
CommonTelemetryEventSchema.ContainerId: AgentGlobals.get_container_id(),
CommonTelemetryEventSchema.EventTid: threading.current_thread().ident,
CommonTelemetryEventSchema.EventPid: os.getpid(),
CommonTelemetryEventSchema.TaskName: threading.current_thread().getName(),
CommonTelemetryEventSchema.KeywordName: '',
# common parameters computed from the OS platform
CommonTelemetryEventSchema.OSVersion: EventLoggerTools.get_expected_os_version(),
CommonTelemetryEventSchema.ExecutionMode: AGENT_EXECUTION_MODE,
CommonTelemetryEventSchema.RAM: int(osutil.get_total_mem()),
CommonTelemetryEventSchema.Processors: osutil.get_processor_cores(),
# common parameters from the goal state
'VMName': 'MachineRole_IN_0',
'TenantName': 'db00a7755a5e4e8a8fe4b19bc3b330c3',
'RoleName': 'MachineRole',
'RoleInstanceName': 'b61f93d0-e1ed-40b2-b067-22c243233448.MachineRole_IN_0',
CommonTelemetryEventSchema.TenantName: 'db00a7755a5e4e8a8fe4b19bc3b330c3',
CommonTelemetryEventSchema.RoleName: 'MachineRole',
CommonTelemetryEventSchema.RoleInstanceName: 'b61f93d0-e1ed-40b2-b067-22c243233448.MachineRole_IN_0',
# common parameters
CommonTelemetryEventSchema.Location: EventLoggerTools.mock_imds_data['location'],
CommonTelemetryEventSchema.SubscriptionId: EventLoggerTools.mock_imds_data['subscriptionId'],
Expand Down
31 changes: 27 additions & 4 deletions tests/protocol/test_hostplugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,12 +538,11 @@ def test_validate_page_blobs(self):
test_goal_state,
exp_method, exp_url, exp_data)

def test_validate_http_request_when_uploading_logs(self):
def test_validate_http_request_for_put_vm_log(self):
def http_put_handler(url, *args, **kwargs):
if self.is_host_plugin_put_logs_request(url):
http_put_handler.args, http_put_handler.kwargs = args, kwargs
return MockResponse(body=b'', status_code=200)
self.fail('The upload logs request was sent to the wrong url: {0}'.format(url))

http_put_handler.args, http_put_handler.kwargs = [], {}

Expand Down Expand Up @@ -579,8 +578,32 @@ def http_put_handler(url, *args, **kwargs):

# Special check for correlation id header value, check for pattern, not exact value
self.assertTrue("x-ms-client-correlationid" in headers.keys(), "Correlation id not found in headers!")
self.assertTrue(UUID_PATTERN.match(headers["x-ms-client-correlationid"]),
"Correlation id is not in GUID form!")
self.assertTrue(UUID_PATTERN.match(headers["x-ms-client-correlationid"]), "Correlation id is not in GUID form!")

def test_put_vm_log_should_raise_an_exception_when_request_fails(self):
    """
    put_vm_log should raise an HttpError carrying the status code and body
    when the log upload request is answered with 410 Gone.
    """

    def gone_handler(url, *args, **kwargs):
        # Any PUT that is not the host plugin log upload gets no response from this handler.
        if not self.is_host_plugin_put_logs_request(url):
            return None
        # Keep the arguments of the intercepted upload on the handler itself.
        gone_handler.args, gone_handler.kwargs = args, kwargs
        return MockResponse(body=ustr('Gone'), status_code=410)

    gone_handler.args, gone_handler.kwargs = [], {}

    with mock_wire_protocol(DATA_FILE, http_put_handler=gone_handler) as protocol:
        goal_state = protocol.client.get_goal_state()
        plugin = wire.HostPluginProtocol(wireserver_url,
                                         goal_state.container_id,
                                         goal_state.role_config_name)

        self.assertFalse(plugin.is_initialized, "Host plugin should not be initialized!")

        with self.assertRaises(HttpError) as raised:
            payload = b"test"
            plugin.put_vm_log(payload)

        # NOTE(review): these checks are placed after the assertRaises block so that they
        # actually execute; confirm the nesting against the upstream file.
        self.assertIsInstance(raised.exception, HttpError)
        self.assertIn("410", ustr(raised.exception))
        self.assertIn("Gone", ustr(raised.exception))

def test_validate_get_extension_artifacts(self):
with mock_wire_protocol(DATA_FILE) as protocol:
Expand Down
2 changes: 0 additions & 2 deletions tests/protocol/test_wire.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ def test_upload_status_blob_should_use_the_host_channel_by_default(self, *_):
def http_put_handler(url, *_, **__):
if protocol.get_endpoint() in url and url.endswith('/status'):
return MockResponse(body=b'', status_code=200)
self.fail('The upload status request was sent to the wrong url: {0}'.format(url))

with mock_wire_protocol(mockwiredata.DATA_FILE, http_put_handler=http_put_handler) as protocol:
HostPluginProtocol.set_default_channel(False)
Expand Down Expand Up @@ -820,7 +819,6 @@ def test_upload_logs_should_not_refresh_plugin_when_first_attempt_succeeds(self)
def http_put_handler(url, *_, **__):
if self.is_host_plugin_put_logs_request(url):
return MockResponse(body=b'', status_code=200)
self.fail('The upload logs request was sent to the wrong url: {0}'.format(url))

with mock_wire_protocol(mockwiredata.DATA_FILE, http_put_handler=http_put_handler) as protocol:
content = b"test"
Expand Down