diff --git a/tests_e2e/orchestrator/runbook.yml b/tests_e2e/orchestrator/runbook.yml index 016f79546e..3492e9c80c 100644 --- a/tests_e2e/orchestrator/runbook.yml +++ b/tests_e2e/orchestrator/runbook.yml @@ -51,7 +51,7 @@ variable: # # The test suites to execute - name: test_suites - value: "agent_bvt, no_outbound_connections, extensions_disabled, agent_not_provisioned, fips, agent_ext_workflow, agent_update, agent_status, multi_config_ext, agent_cgroups, ext_cgroups, agent_firewall" + value: "agent_bvt, no_outbound_connections, extensions_disabled, agent_not_provisioned, fips, agent_ext_workflow, agent_update, agent_status, multi_config_ext, agent_cgroups, ext_cgroups, agent_firewall, ext_telemetry_pipeline" - name: cloud value: "AzureCloud" is_case_visible: true diff --git a/tests_e2e/test_suites/ext_telemetry_pipeline.yml b/tests_e2e/test_suites/ext_telemetry_pipeline.yml new file mode 100644 index 0000000000..f309f5cb8a --- /dev/null +++ b/tests_e2e/test_suites/ext_telemetry_pipeline.yml @@ -0,0 +1,9 @@ +# +# This test ensures that the agent does not throw any errors while trying to transmit events to wireserver. 
It does not +# validate if the events actually make it to wireserver +# +name: "ExtTelemetryPipeline" +tests: + - "agent_bvt/vm_access.py" + - "ext_telemetry_pipeline/ext_telemetry_pipeline.py" +images: "random(endorsed)" diff --git a/tests_e2e/tests/agent_bvt/vm_access.py b/tests_e2e/tests/agent_bvt/vm_access.py index 7983d41479..9b52ac2453 100755 --- a/tests_e2e/tests/agent_bvt/vm_access.py +++ b/tests_e2e/tests/agent_bvt/vm_access.py @@ -39,8 +39,8 @@ class VmAccessBvt(AgentTest): def run(self): ssh: SshClient = self._context.create_ssh_client() - if "-flatcar" in ssh.run_command("uname -a"): - raise TestSkipped("Currently VMAccess is not supported on Flatcar") + if not VmExtensionIds.VmAccess.supports_distro(ssh.run_command("uname -a")): + raise TestSkipped("Currently VMAccess is not supported on this distro") # Try to use a unique username for each test run (note that we truncate to 32 chars to # comply with the rules for usernames) diff --git a/tests_e2e/tests/ext_telemetry_pipeline/ext_telemetry_pipeline.py b/tests_e2e/tests/ext_telemetry_pipeline/ext_telemetry_pipeline.py new file mode 100755 index 0000000000..de051485ad --- /dev/null +++ b/tests_e2e/tests/ext_telemetry_pipeline/ext_telemetry_pipeline.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python3 + +# Microsoft Azure Linux Agent +# +# Copyright 2018 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+
+#
+# This test ensures that the agent does not throw any errors while trying to transmit events to wireserver. It does not
+# validate if the events actually make it to wireserver
+# TODO: Update this test suite to verify that the agent picks up AND sends telemetry produced by extensions
+# (work item https://dev.azure.com/msazure/One/_workitems/edit/24903999)
+#
+
+import random
+from typing import List, Dict, Any
+
+from azurelinuxagent.common.conf import get_etp_collection_period
+
+from tests_e2e.tests.lib.agent_test import AgentTest
+from tests_e2e.tests.lib.identifiers import VmExtensionIds
+from tests_e2e.tests.lib.logging import log
+from tests_e2e.tests.lib.ssh_client import SshClient
+from tests_e2e.tests.lib.virtual_machine_extension_client import VirtualMachineExtensionClient
+
+
+class ExtTelemetryPipeline(AgentTest):
+    def run(self):
+        ssh_client: SshClient = self._context.create_ssh_client()
+
+        # Extensions we will create events for
+        extensions = ["Microsoft.Azure.Extensions.CustomScript"]
+        if VmExtensionIds.VmAccess.supports_distro(ssh_client.run_command("uname -a")):
+            extensions.append("Microsoft.OSTCExtensions.VMAccessForLinux")
+
+        # Set the etp collection period to 30 seconds instead of default 5 minutes
+        default_collection_period = get_etp_collection_period()
+        log.info("")
+        log.info("Set ETP collection period to 30 seconds on the test VM [%s]", self._context.vm.name)
+        output = ssh_client.run_command("update-waagent-conf Debug.EtpCollectionPeriod=30", use_sudo=True)
+        log.info("Updated waagent conf with Debug.ETPCollectionPeriod=30 completed:\n%s", output)
+
+        try:
+            # Add CSE to the test VM twice to ensure its events directory still exists after re-enabling
+            log.info("")
+            log.info("Add CSE to the test VM...")
+            cse = VirtualMachineExtensionClient(self._context.vm, VmExtensionIds.CustomScript, resource_name="CustomScript")
+            cse.enable(settings={'commandToExecute': "echo 'enable'"})
+            cse.assert_instance_view()
+
+            log.info("")
+            log.info("Add CSE to the test VM again...")
+            cse.enable(settings={'commandToExecute': "echo 'enable again'"})
+            cse.assert_instance_view()
+
+            # Check agent log to verify ETP is enabled
+            command = "agent_ext_workflow-check_data_in_agent_log.py --data 'Extension Telemetry pipeline enabled: True'"
+            log.info("")
+            log.info("Check agent log to verify ETP is enabled...")
+            log.info("Remote command [%s] completed:\n%s", command, ssh_client.run_command(command))
+
+            # Add good extension events for each extension and check that the TelemetryEventsCollector collects them
+            # TODO: Update test suite to check that the agent is picking up the events generated by the extension, instead
+            # of generating on the extensions' behalf
+            # (work item - https://dev.azure.com/msazure/One/_workitems/edit/24903999)
+            log.info("")
+            log.info("Add good extension events and check they are reported...")
+            max_events = random.randint(10, 50)
+            self._run_remote_test(f"ext_telemetry_pipeline-add_extension_events.py "
+                                  f"--extensions {','.join(extensions)} "
+                                  f"--num_events_total {max_events}", use_sudo=True)
+            log.info("")
+            log.info("Good extension events were successfully reported.")
+
+            # Add invalid events for each extension and check that the TelemetryEventsCollector drops them
+            log.info("")
+            log.info("Add bad extension events and check they are dropped...")
+            self._run_remote_test(f"ext_telemetry_pipeline-add_extension_events.py "
+                                  f"--extensions {','.join(extensions)} "
+                                  f"--num_events_total {max_events} "
+                                  f"--num_events_bad {random.randint(5, max_events-5)}", use_sudo=True)
+            log.info("")
+            log.info("Bad extension events were successfully dropped.")
+        finally:
+            # Always reset the etp collection period to the default value (even when the test fails) so this VM can be shared with other suites
+            log.info("")
+            log.info("Reset ETP collection period to {0} seconds on the test VM [{1}]".format(default_collection_period, self._context.vm.name))
+            output = ssh_client.run_command("update-waagent-conf Debug.EtpCollectionPeriod={0}".format(default_collection_period), use_sudo=True)
+            log.info("Updated waagent conf with default collection period completed:\n%s", output)
+
+    def get_ignore_error_rules(self) -> List[Dict[str, Any]]:
+        """Returns the rule matching the 'Dropped events for Extension' message, which the bad events added by this test trigger on purpose."""
+        return [{'message': r"Dropped events for Extension.*"}]
+
+
+if __name__ == "__main__":
+    ExtTelemetryPipeline.run_from_command_line()
diff --git a/tests_e2e/tests/lib/identifiers.py b/tests_e2e/tests/lib/identifiers.py
index 7bb067a835..45af22745f 100644
--- a/tests_e2e/tests/lib/identifiers.py
+++ b/tests_e2e/tests/lib/identifiers.py
@@ -15,6 +15,8 @@
 # limitations under the License.
 #
 
+from typing import Dict, List
+
 
 class VmIdentifier(object):
     def __init__(self, cloud: str, location: str, subscription: str, resource_group: str, name: str):
@@ -45,6 +47,19 @@ def __init__(self, publisher: str, ext_type: str, version: str):
         self.type: str = ext_type
         self.version: str = version
 
+    unsupported_distros: Dict[str, List[str]] = {
+        "Microsoft.OSTCExtensions.VMAccessForLinux": ["flatcar"]
+    }
+
+    def supports_distro(self, system_info: str) -> bool:
+        """
+        Returns False if the name of a distro that the extension does not support is found in the provided system info; True otherwise
+        """
+        ext_unsupported_distros = VmExtensionIdentifier.unsupported_distros.get(self.publisher + "." + self.type)
+        if ext_unsupported_distros is not None and any(distro in system_info for distro in ext_unsupported_distros):
+            return False
+        return True
+
     def __str__(self):
         return f"{self.publisher}.{self.type}"
 
diff --git a/tests_e2e/tests/scripts/ext_telemetry_pipeline-add_extension_events.py b/tests_e2e/tests/scripts/ext_telemetry_pipeline-add_extension_events.py
new file mode 100755
index 0000000000..2e5776c714
--- /dev/null
+++ b/tests_e2e/tests/scripts/ext_telemetry_pipeline-add_extension_events.py
@@ -0,0 +1,224 @@
+#!/usr/bin/env pypy3
+
+# Microsoft Azure Linux Agent
+#
+# Copyright 2018 Microsoft Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Adds extension events for each provided extension and verifies the TelemetryEventsCollector collected or dropped them
+#
+
+import argparse
+import json
+import os
+import sys
+import time
+import uuid
+
+from assertpy import fail
+from datetime import datetime, timedelta
+from random import choice
+from typing import List
+
+from tests_e2e.tests.lib.agent_log import AgentLog
+from tests_e2e.tests.lib.logging import log
+
+
+def add_extension_events(extensions: List[str], bad_event_count=0, no_of_events_per_extension=50):
+    """Writes a JSON file with no_of_events_per_extension sample events (the first bad_event_count made invalid) to each extension's events dir."""
+    def missing_key(bad_event):
+        key = choice(list(bad_event.keys()))
+        del bad_event[key]
+        return "MissingKeyError: {0}".format(key)
+
+    def oversize_error(bad_event):
+        bad_event["EventLevel"] = "ThisIsAnOversizeError\n" * 300
+        return "OversizeEventError"
+
+    def empty_message(bad_event):
+        bad_event["Message"] = ""
+        return "EmptyMessageError"
+
+    errors = [missing_key, oversize_error, empty_message]
+
+    sample_ext_event = {
+        "EventLevel": "INFO",
+        "Message": "Starting IaaS ScriptHandler Extension v1",
+        "Version": "1.0",
+        "TaskName": "Extension Info",
+        "EventPid": "3228",
+        "EventTid": "1",
+        "OperationId": "519e4beb-018a-4bd9-8d8e-c5226cf7f56e",
+        "TimeStamp": "2019-12-12T01:20:05.0950244Z"
+    }
+
+    sample_messages = [
+        "Starting IaaS ScriptHandler Extension v1",
+        "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.",
+        "The quick brown fox jumps over the lazy dog",
+        "Cursus risus at ultrices mi.",
+        "Doing Something",
+        "Iaculis eu non diam phasellus.",
+        "Doing other thing",
+        "Look ma, lemons",
+        "Pretium quam vulputate dignissim suspendisse.",
+        "Man this is insane",
+        "I wish it worked as it should and not as it ain't",
+        "Ut faucibus pulvinar elementum integer enim neque volutpat ac tincidunt.",
+        "Did you get any of that?",
+        "Non-English message - 此文字不是英文的",
+        "κόσμε",
+        "�",
+        "Quizdeltagerne spiste jordbær med fløde, mens cirkusklovnen Wolther spillede på xylofon.",
+        "Falsches Üben von Xylophonmusik quält jeden größeren Zwerg",
+        "Zwölf Boxkämpfer jagten Eva quer über den Sylter Deich",
+        "Heizölrückstoßabdämpfung",
+        "Γαζέες καὶ μυρτιὲς δὲν θὰ βρῶ πιὰ στὸ χρυσαφὶ ξέφωτο",
+        "Ξεσκεπάζω τὴν ψυχοφθόρα βδελυγμία",
+        "El pingüino Wenceslao hizo kilómetros bajo exhaustiva lluvia y frío, añoraba a su querido cachorro.",
+        "Portez ce vieux whisky au juge blond qui fume sur son île intérieure, à côté de l'alcôve ovoïde, où les bûches",
+        "se consument dans l'âtre, ce qui lui permet de penser à la cænogenèse de l'être dont il est question",
+        "dans la cause ambiguë entendue à Moÿ, dans un capharnaüm qui, pense-t-il, diminue çà et là la qualité de son œuvre.",
+        "D'fhuascail Íosa, Úrmhac na hÓighe Beannaithe, pór Éava agus Ádhaimh",
+        "Árvíztűrő tükörfúrógép",
+        "Kæmi ný öxi hér ykist þjófum nú bæði víl og ádrepa",
+        "Sævör grét áðan því úlpan var ónýt",
+        "いろはにほへとちりぬるを わかよたれそつねならむ うゐのおくやまけふこえて あさきゆめみしゑひもせす",
+        "イロハニホヘト チリヌルヲ ワカヨタレソ ツネナラム ウヰノオクヤマ ケフコエテ アサキユメミシ ヱヒモセスン",
+        "? דג סקרן שט בים מאוכזב ולפתע מצא לו חברה איך הקליטה",
+        "Pchnąć w tę łódź jeża lub ośm skrzyń fig",
+        "В чащах юга жил бы цитрус? Да, но фальшивый экземпляр!",
+        "๏ เป็นมนุษย์สุดประเสริฐเลิศคุณค่า กว่าบรรดาฝูงสัตว์เดรัจฉาน",
+        "Pijamalı hasta, yağız şoföre çabucak güvendi."
+    ]
+
+    for ext in extensions:
+        bad_count = bad_event_count
+        event_dir = os.path.join("/var/log/azure/", ext, "events")
+        if not os.path.isdir(event_dir):
+            fail(f"Expected events dir: {event_dir} does not exist")
+
+        log.info("")
+        log.info("Expected dir: {0} exists".format(event_dir))
+        log.info("Creating random extension events for {0}. No of Good Events: {1}, No of Bad Events: {2}".format(
+            ext, no_of_events_per_extension - bad_event_count, bad_event_count))
+
+        new_opr_id = str(uuid.uuid4())
+        event_list = []
+
+        for _ in range(no_of_events_per_extension):
+            event = sample_ext_event.copy()
+            event["OperationId"] = new_opr_id
+            event["TimeStamp"] = datetime.utcnow().strftime(u'%Y-%m-%dT%H:%M:%S.%fZ')
+            event["Message"] = choice(sample_messages)
+
+            if bad_count != 0:
+                # Make this event a bad event
+                reason = choice(errors)(event)
+                bad_count -= 1
+
+                # Missing key error might delete the TaskName key from the event
+                if "TaskName" in event:
+                    event["TaskName"] = "{0}. This is a bad event: {1}".format(event["TaskName"], reason)
+                else:
+                    event["EventLevel"] = "{0}. This is a bad event: {1}".format(event["EventLevel"], reason)
+
+            event_list.append(event)
+
+        file_name = os.path.join(event_dir, '{0}.json'.format(int(time.time() * 1000000)))
+        log.info("Create json with extension events in event directory: {0}".format(file_name))
+        # Write to a '.tmp' file first and then rename it (presumably so the agent never reads a partially written file)
+        with open("{0}.tmp".format(file_name), 'w+') as f:
+            json.dump(event_list, f)
+        os.rename("{0}.tmp".format(file_name), file_name)
+
+
+def wait_for_extension_events_dir_empty(extensions: List[str]):
+    """Polls every 20 seconds, for up to 2 minutes, until the events dir of every extension is empty; fails on timeout."""
+    # By ensuring events dir to be empty, we verify that the telemetry events collector has completed its run
+    start_time = datetime.now()
+    timeout = timedelta(minutes=2)
+    ext_event_dirs = [os.path.join("/var/log/azure/", ext, "events") for ext in extensions]
+
+    while (start_time + timeout) >= datetime.now():
+        log.info("")
+        log.info("Waiting for extension event directories to be empty...")
+        all_dir_empty = True
+        for event_dir in ext_event_dirs:
+            if not os.path.exists(event_dir) or len(os.listdir(event_dir)) != 0:
+                log.info("Dir: {0} is not yet empty".format(event_dir))
+                all_dir_empty = False
+
+        if all_dir_empty:
+            log.info("Extension event directories are empty: \n{0}".format(ext_event_dirs))
+            return
+
+        time.sleep(20)
+
+    fail("Extension events dir not empty before 2 minute timeout")
+
+
+def main():
+    """Adds the requested events, waits for the events dirs to empty, then checks the agent log for collector errors and the expected messages."""
+    # This test is a best effort test to ensure that the agent does not throw any errors while trying to transmit
+    # events to wireserver. We're not validating if the events actually make it to wireserver.
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--extensions", dest='extensions', type=str, required=True)
+    parser.add_argument("--num_events_total", dest='num_events_total', type=int, required=True)
+    parser.add_argument("--num_events_bad", dest='num_events_bad', type=int, required=False, default=0)
+    args, _ = parser.parse_known_args()
+
+    extensions = args.extensions.split(',')
+    add_extension_events(extensions=extensions, bad_event_count=args.num_events_bad,
+                         no_of_events_per_extension=args.num_events_total)
+
+    # Ensure that the event collector ran after adding the events
+    wait_for_extension_events_dir_empty(extensions=extensions)
+
+    # Sleep for a min to ensure that the TelemetryService has enough time to send events and report errors if any
+    time.sleep(60)
+    found_error = False
+    agent_log = AgentLog()
+
+    log.info("")
+    log.info("Check that the TelemetryEventsCollector did not emit any errors while collecting and reporting events...")
+    telemetry_event_collector_name = "TelemetryEventsCollector"
+    for agent_record in agent_log.read():
+        if agent_record.thread == telemetry_event_collector_name and agent_record.level == "ERROR":
+            found_error = True
+            log.info("waagent.log contains the following errors emitted by the {0} thread: \n{1}".format(telemetry_event_collector_name, agent_record))
+
+    if found_error:
+        fail("Found error(s) emitted by the TelemetryEventsCollector, but none were expected.")
+    log.info("The TelemetryEventsCollector did not emit any errors while collecting and reporting events")
+
+    for ext in extensions:
+        good_count = args.num_events_total - args.num_events_bad
+        log.info("")
+        if not agent_log.agent_log_contains("Collected {0} events for extension: {1}".format(good_count, ext)):
+            fail("The TelemetryEventsCollector did not collect the expected number of events: {0} for {1}".format(good_count, ext))
+        log.info("All {0} good events for {1} were collected by the TelemetryEventsCollector".format(good_count, ext))
+
+        if args.num_events_bad != 0:
+            log.info("")
+            if not agent_log.agent_log_contains("Dropped events for Extension: {0}".format(ext)):
+                fail("The TelemetryEventsCollector did not drop bad events for {0} as expected".format(ext))
+            log.info("The TelemetryEventsCollector dropped bad events for {0} as expected".format(ext))
+
+    sys.exit(0)
+
+
+if __name__ == "__main__":
+    main()