Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

extension telemetry pipeline scenario #2901

Merged
merged 42 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
da72c37
Update version to dummy 1.0.0.0'
maddieford Nov 8, 2022
59dbd22
Revert version change
maddieford Nov 8, 2022
633a826
Merge remote-tracking branch 'upstream/develop' into develop
maddieford Nov 21, 2022
14a743f
Merge remote-tracking branch 'upstream/develop' into develop
maddieford Dec 8, 2022
54ea0f3
Merge remote-tracking branch 'upstream/develop' into develop
maddieford Jan 10, 2023
e79c4c5
Merge remote-tracking branch 'upstream/develop' into develop
maddieford Feb 8, 2023
498b612
Merge remote-tracking branch 'upstream/develop' into develop
maddieford Feb 14, 2023
1e269f4
Merge remote-tracking branch 'upstream/develop' into develop
maddieford Mar 13, 2023
7b49e76
Merge remote-tracking branch 'upstream/develop' into develop
maddieford Mar 24, 2023
0a426cc
Merge remote-tracking branch 'upstream/develop' into develop
maddieford Apr 6, 2023
17fbf6a
Merge remote-tracking branch 'upstream/develop' into develop
maddieford Apr 7, 2023
995cbb9
Merge remote-tracking branch 'upstream/develop' into develop
maddieford Apr 13, 2023
eaadc83
Merge remote-tracking branch 'upstream/develop' into develop
maddieford Apr 24, 2023
fb03e07
Merge remote-tracking branch 'upstream/develop' into develop
maddieford Apr 27, 2023
6a8e0d6
Merge remote-tracking branch 'upstream/develop' into develop
maddieford May 19, 2023
b4951c8
Merge branch 'develop' of github.com:Azure/WALinuxAgent into develop
maddieford Jun 6, 2023
c6d9300
Merge branch 'develop' of github.com:maddieford/WALinuxAgent into dev…
maddieford Jun 23, 2023
f650fe4
Merge remote-tracking branch 'upstream/develop' into develop
maddieford Jul 10, 2023
a10bdfa
Merge branch 'develop' of github.com:maddieford/WALinuxAgent into dev…
maddieford Jul 10, 2023
50dcec5
Merge remote-tracking branch 'upstream/develop' into develop
maddieford Jul 18, 2023
b87db37
merge changes
maddieford Jul 20, 2023
56e84c1
Merge remote-tracking branch 'upstream/develop' into develop
maddieford Jul 25, 2023
fd5d0f5
Merge remote-tracking branch 'upstream/develop' into develop
maddieford Aug 2, 2023
b04fd81
Merge remote-tracking branch 'upstream/develop' into develop
maddieford Aug 14, 2023
20d9a34
Merge remote-tracking branch 'upstream/develop' into develop
maddieford Aug 15, 2023
d55f499
Merge remote-tracking branch 'upstream/develop' into develop
maddieford Aug 16, 2023
af3e211
Barebones for etp
maddieford Aug 17, 2023
854d26d
Scenario should own VM because of conf change
maddieford Aug 19, 2023
0042de6
Add extension telemetry pipeline test
maddieford Aug 20, 2023
7dabf43
Clean up code
maddieford Aug 20, 2023
d3e0bcf
Improve log messages
maddieford Aug 21, 2023
d515c4f
Fix pylint errors
maddieford Aug 21, 2023
f4725a9
Improve logging
maddieford Aug 21, 2023
a9748e8
Improve code comments
maddieford Aug 21, 2023
6678977
VmAccess is not supported on flatcar
maddieford Aug 21, 2023
f78eec6
Merge branch 'develop' into etp_scenario
maddieford Aug 21, 2023
3627b2e
Address PR comments
maddieford Aug 22, 2023
9b6952b
Add support_distros in VmExtensionIdentifier
maddieford Aug 22, 2023
bf43a65
Fix logic for support_distros in VmExtensionIdentifier
maddieford Aug 22, 2023
1ba6f44
Merge branch 'develop' into etp_scenario
maddieford Aug 22, 2023
73ca28d
Merge branch 'develop' into etp_scenario
maddieford Aug 23, 2023
4bf9c7a
Use run_remote_test for remote script
maddieford Aug 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests_e2e/orchestrator/runbook.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ variable:
#
# The test suites to execute
- name: test_suites
value: "agent_bvt, no_outbound_connections, extensions_disabled, agent_not_provisioned, fips, agent_ext_workflow, agent_update, agent_status, multi_config_ext, agent_cgroups, ext_cgroups, agent_firewall"
value: "agent_bvt, no_outbound_connections, extensions_disabled, agent_not_provisioned, fips, agent_ext_workflow, agent_update, agent_status, multi_config_ext, agent_cgroups, ext_cgroups, agent_firewall, ext_telemetry_pipeline"
- name: cloud
value: "AzureCloud"
is_case_visible: true
Expand Down
9 changes: 9 additions & 0 deletions tests_e2e/test_suites/ext_telemetry_pipeline.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#
# This test ensures that the agent does not throw any errors while trying to transmit events to wireserver. It does not
# validate if the events actually make it to wireserver
#
name: "ExtTelemetryPipeline"
tests:
- "agent_bvt/vm_access.py"
- "ext_telemetry_pipeline/ext_telemetry_pipeline.py"
images: "random(endorsed)"
4 changes: 2 additions & 2 deletions tests_e2e/tests/agent_bvt/vm_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
class VmAccessBvt(AgentTest):
def run(self):
ssh: SshClient = self._context.create_ssh_client()
if "-flatcar" in ssh.run_command("uname -a"):
raise TestSkipped("Currently VMAccess is not supported on Flatcar")
if not VmExtensionIds.VmAccess.supports_distro(ssh.run_command("uname -a")):
raise TestSkipped("Currently VMAccess is not supported on this distro")

# Try to use a unique username for each test run (note that we truncate to 32 chars to
# comply with the rules for usernames)
Expand Down
109 changes: 109 additions & 0 deletions tests_e2e/tests/ext_telemetry_pipeline/ext_telemetry_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#!/usr/bin/env python3

# Microsoft Azure Linux Agent
#
# Copyright 2018 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

#
# This test ensures that the agent does not throw any errors while trying to transmit events to wireserver. It does not
# validate if the events actually make it to wireserver
# TODO: Update this test suite to verify that the agent picks up AND sends telemetry produced by extensions
# (work item https://dev.azure.com/msazure/One/_workitems/edit/24903999)
#
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intention of the test should actually be to verify that the agent picks up and sends telemetry produced by extensions

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may end up adding this as a TODO and come back to it once I finish migrating my other scenarios. Right now this test takes the current dcr scenario behavior

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a todo with work item


import random
from typing import List, Dict, Any

from azurelinuxagent.common.conf import get_etp_collection_period

from tests_e2e.tests.lib.agent_test import AgentTest
from tests_e2e.tests.lib.identifiers import VmExtensionIds
from tests_e2e.tests.lib.logging import log
from tests_e2e.tests.lib.ssh_client import SshClient
from tests_e2e.tests.lib.virtual_machine_extension_client import VirtualMachineExtensionClient


class ExtTelemetryPipeline(AgentTest):
def run(self):
ssh_client: SshClient = self._context.create_ssh_client()

# Extensions we will create events for
extensions = ["Microsoft.Azure.Extensions.CustomScript"]
if VmExtensionIds.VmAccess.supports_distro(ssh_client.run_command("uname -a")):
extensions.append("Microsoft.OSTCExtensions.VMAccessForLinux")

# Set the etp collection period to 30 seconds instead of default 5 minutes
default_collection_period = get_etp_collection_period()
log.info("")
log.info("Set ETP collection period to 30 seconds on the test VM [%s]", self._context.vm.name)
output = ssh_client.run_command("update-waagent-conf Debug.EtpCollectionPeriod=30", use_sudo=True)
log.info("Updated waagent conf with Debug.ETPCollectionPeriod=30 completed:\n%s", output)

# Add CSE to the test VM twice to ensure its events directory still exists after re-enabling
log.info("")
log.info("Add CSE to the test VM...")
cse = VirtualMachineExtensionClient(self._context.vm, VmExtensionIds.CustomScript, resource_name="CustomScript")
cse.enable(settings={'commandToExecute': "echo 'enable'"})
cse.assert_instance_view()

log.info("")
log.info("Add CSE to the test VM again...")
cse.enable(settings={'commandToExecute': "echo 'enable again'"})
cse.assert_instance_view()

# Check agent log to verify ETP is enabled
command = "agent_ext_workflow-check_data_in_agent_log.py --data 'Extension Telemetry pipeline enabled: True'"
log.info("")
log.info("Check agent log to verify ETP is enabled...")
log.info("Remote command [%s] completed:\n%s", command, ssh_client.run_command(command))

# Add good extension events for each extension and check that the TelemetryEventsCollector collects them
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather than the test creating events on behalf of the extension, it should check that the agent is picking up the events generated by the extension

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment here. might add this as a TODO and come back to it once I finish migrating my other scenarios. Right now this test takes the current dcr scenario behavior

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added todo with work item

# TODO: Update test suite to check that the agent is picking up the events generated by the extension, instead
# of generating on the extensions' behalf
# (work item - https://dev.azure.com/msazure/One/_workitems/edit/24903999)
log.info("")
log.info("Add good extension events and check they are reported...")
max_events = random.randint(10, 50)
self._run_remote_test(f"ext_telemetry_pipeline-add_extension_events.py "
f"--extensions {','.join(extensions)} "
f"--num_events_total {max_events}", use_sudo=True)
log.info("")
log.info("Good extension events were successfully reported.")

# Add invalid events for each extension and check that the TelemetryEventsCollector drops them
log.info("")
log.info("Add bad extension events and check they are reported...")
self._run_remote_test(f"ext_telemetry_pipeline-add_extension_events.py "
f"--extensions {','.join(extensions)} "
f"--num_events_total {max_events} "
f"--num_events_bad {random.randint(5, max_events-5)}", use_sudo=True)
log.info("")
log.info("Bad extension events were successfully dropped.")

# Reset the etp collection period to the default value so this VM can be shared with other suites
log.info("")
log.info("Reset ETP collection period to {0} seconds on the test VM [{1}]".format(default_collection_period, self._context.vm.name))
output = ssh_client.run_command("update-waagent-conf Debug.EtpCollectionPeriod={0}".format(default_collection_period), use_sudo=True)
log.info("Updated waagent conf with default collection period completed:\n%s", output)

def get_ignore_error_rules(self) -> List[Dict[str, Any]]:
return [
{'message': r"Dropped events for Extension.*"}
]


if __name__ == "__main__":
ExtTelemetryPipeline.run_from_command_line()
15 changes: 15 additions & 0 deletions tests_e2e/tests/lib/identifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# limitations under the License.
#

from typing import Dict, List


class VmIdentifier(object):
def __init__(self, cloud: str, location: str, subscription: str, resource_group: str, name: str):
Expand Down Expand Up @@ -45,6 +47,19 @@ def __init__(self, publisher: str, ext_type: str, version: str):
self.type: str = ext_type
self.version: str = version

unsupported_distros: Dict[str, List[str]] = {
"Microsoft.OSTCExtensions.VMAccessForLinux": ["flatcar"]
}

def supports_distro(self, system_info: str) -> bool:
"""
Returns true if an unsupported distro name for the extension is found in the provided system info
"""
ext_unsupported_distros = VmExtensionIdentifier.unsupported_distros.get(self.publisher + "." + self.type)
if ext_unsupported_distros is not None and any(distro in system_info for distro in ext_unsupported_distros):
return False
return True

def __str__(self):
return f"{self.publisher}.{self.type}"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
#!/usr/bin/env pypy3

# Microsoft Azure Linux Agent
#
# Copyright 2018 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Adds extension events for each provided extension and verifies the TelemetryEventsCollector collected or dropped them
#

import argparse
import json
import os
import sys
import time
import uuid

from assertpy import fail
from datetime import datetime, timedelta
from random import choice
from typing import List

from tests_e2e.tests.lib.agent_log import AgentLog
from tests_e2e.tests.lib.logging import log


def add_extension_events(extensions: List[str], bad_event_count=0, no_of_events_per_extension=50):
def missing_key(bad_event):
key = choice(list(bad_event.keys()))
del bad_event[key]
return "MissingKeyError: {0}".format(key)

def oversize_error(bad_event):
bad_event["EventLevel"] = "ThisIsAnOversizeError\n" * 300
return "OversizeEventError"

def empty_message(bad_event):
bad_event["Message"] = ""
return "EmptyMessageError"

errors = [
missing_key,
oversize_error,
empty_message
]

sample_ext_event = {
"EventLevel": "INFO",
"Message": "Starting IaaS ScriptHandler Extension v1",
"Version": "1.0",
"TaskName": "Extension Info",
"EventPid": "3228",
"EventTid": "1",
"OperationId": "519e4beb-018a-4bd9-8d8e-c5226cf7f56e",
"TimeStamp": "2019-12-12T01:20:05.0950244Z"
}

sample_messages = [
"Starting IaaS ScriptHandler Extension v1",
"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.",
"The quick brown fox jumps over the lazy dog",
"Cursus risus at ultrices mi.",
"Doing Something",
"Iaculis eu non diam phasellus.",
"Doing other thing",
"Look ma, lemons",
"Pretium quam vulputate dignissim suspendisse.",
"Man this is insane",
"I wish it worked as it should and not as it ain't",
"Ut faucibus pulvinar elementum integer enim neque volutpat ac tincidunt."
"Did you get any of that?",
"Non-English message - 此文字不是英文的"
"κόσμε",
"�",
"Quizdeltagerne spiste jordbær med fløde, mens cirkusklovnen Wolther spillede på xylofon.",
"Falsches Üben von Xylophonmusik quält jeden größeren Zwerg",
"Zwölf Boxkämpfer jagten Eva quer über den Sylter Deich",
"Heizölrückstoßabdämpfung",
"Γαζέες καὶ μυρτιὲς δὲν θὰ βρῶ πιὰ στὸ χρυσαφὶ ξέφωτο",
"Ξεσκεπάζω τὴν ψυχοφθόρα βδελυγμία",
"El pingüino Wenceslao hizo kilómetros bajo exhaustiva lluvia y frío, añoraba a su querido cachorro.",
"Portez ce vieux whisky au juge blond qui fume sur son île intérieure, à côté de l'alcôve ovoïde, où les bûches",
"se consument dans l'âtre, ce qui lui permet de penser à la cænogenèse de l'être dont il est question",
"dans la cause ambiguë entendue à Moÿ, dans un capharnaüm qui, pense-t-il, diminue çà et là la qualité de son œuvre.",
"D'fhuascail Íosa, Úrmhac na hÓighe Beannaithe, pór Éava agus Ádhaimh",
"Árvíztűrő tükörfúrógép",
"Kæmi ný öxi hér ykist þjófum nú bæði víl og ádrepa",
"Sævör grét áðan því úlpan var ónýt",
"いろはにほへとちりぬるを わかよたれそつねならむ うゐのおくやまけふこえて あさきゆめみしゑひもせす",
"イロハニホヘト チリヌルヲ ワカヨタレソ ツネナラム ウヰノオクヤマ ケフコエテ アサキユメミシ ヱヒモセスン",
"? דג סקרן שט בים מאוכזב ולפתע מצא לו חברה איך הקליטה"
"Pchnąć w tę łódź jeża lub ośm skrzyń fig",
"В чащах юга жил бы цитрус? Да, но фальшивый экземпляр!",
"๏ เป็นมนุษย์สุดประเสริฐเลิศคุณค่า กว่าบรรดาฝูงสัตว์เดรัจฉาน",
"Pijamalı hasta, yağız şoföre çabucak güvendi."
]

for ext in extensions:
bad_count = bad_event_count
event_dir = os.path.join("/var/log/azure/", ext, "events")
if not os.path.isdir(event_dir):
fail(f"Expected events dir: {event_dir} does not exist")

log.info("")
log.info("Expected dir: {0} exists".format(event_dir))
log.info("Creating random extension events for {0}. No of Good Events: {1}, No of Bad Events: {2}".format(
ext, no_of_events_per_extension - bad_event_count, bad_event_count))

new_opr_id = str(uuid.uuid4())
event_list = []

for _ in range(no_of_events_per_extension):
event = sample_ext_event.copy()
event["OperationId"] = new_opr_id
event["TimeStamp"] = datetime.utcnow().strftime(u'%Y-%m-%dT%H:%M:%S.%fZ')
event["Message"] = choice(sample_messages)

if bad_count != 0:
# Make this event a bad event
reason = choice(errors)(event)
bad_count -= 1

# Missing key error might delete the TaskName key from the event
if "TaskName" in event:
event["TaskName"] = "{0}. This is a bad event: {1}".format(event["TaskName"], reason)
else:
event["EventLevel"] = "{0}. This is a bad event: {1}".format(event["EventLevel"], reason)

event_list.append(event)

file_name = os.path.join(event_dir, '{0}.json'.format(int(time.time() * 1000000)))
log.info("Create json with extension events in event directory: {0}".format(file_name))
with open("{0}.tmp".format(file_name), 'w+') as f:
json.dump(event_list, f)
os.rename("{0}.tmp".format(file_name), file_name)


def wait_for_extension_events_dir_empty(extensions: List[str]):
# By ensuring events dir to be empty, we verify that the telemetry events collector has completed its run
start_time = datetime.now()
timeout = timedelta(minutes=2)
ext_event_dirs = [os.path.join("/var/log/azure/", ext, "events") for ext in extensions]

while (start_time + timeout) >= datetime.now():
log.info("")
log.info("Waiting for extension event directories to be empty...")
all_dir_empty = True
for event_dir in ext_event_dirs:
if not os.path.exists(event_dir) or len(os.listdir(event_dir)) != 0:
log.info("Dir: {0} is not yet empty".format(event_dir))
all_dir_empty = False

if all_dir_empty:
log.info("Extension event directories are empty: \n{0}".format(ext_event_dirs))
return

time.sleep(20)

fail("Extension events dir not empty before 2 minute timeout")


def main():
# This test is a best effort test to ensure that the agent does not throw any errors while trying to transmit
# events to wireserver. We're not validating if the events actually make it to wireserver.

parser = argparse.ArgumentParser()
parser.add_argument("--extensions", dest='extensions', type=str, required=True)
parser.add_argument("--num_events_total", dest='num_events_total', type=int, required=True)
parser.add_argument("--num_events_bad", dest='num_events_bad', type=int, required=False, default=0)
args, _ = parser.parse_known_args()

extensions = args.extensions.split(',')
add_extension_events(extensions=extensions, bad_event_count=args.num_events_bad,
no_of_events_per_extension=args.num_events_total)

# Ensure that the event collector ran after adding the events
wait_for_extension_events_dir_empty(extensions=extensions)

# Sleep for a min to ensure that the TelemetryService has enough time to send events and report errors if any
time.sleep(60)
found_error = False
agent_log = AgentLog()

log.info("")
log.info("Check that the TelemetryEventsCollector did not emit any errors while collecting and reporting events...")
telemetry_event_collector_name = "TelemetryEventsCollector"
for agent_record in agent_log.read():
if agent_record.thread == telemetry_event_collector_name and agent_record.level == "ERROR":
found_error = True
log.info("waagent.log contains the following errors emitted by the {0} thread: \n{1}".format(telemetry_event_collector_name, agent_record))

if found_error:
fail("Found error(s) emitted by the TelemetryEventsCollector, but none were expected.")
log.info("The TelemetryEventsCollector did not emit any errors while collecting and reporting events")

for ext in extensions:
good_count = args.num_events_total - args.num_events_bad
log.info("")
if not agent_log.agent_log_contains("Collected {0} events for extension: {1}".format(good_count, ext)):
fail("The TelemetryEventsCollector did not collect the expected number of events: {0} for {1}".format(good_count, ext))
log.info("All {0} good events for {1} were collected by the TelemetryEventsCollector".format(good_count, ext))

if args.num_events_bad != 0:
log.info("")
if not agent_log.agent_log_contains("Dropped events for Extension: {0}".format(ext)):
fail("The TelemetryEventsCollector did not drop bad events for {0} as expected".format(ext))
log.info("The TelemetryEventsCollector dropped bad events for {0} as expected".format(ext))

sys.exit(0)


if __name__ == "__main__":
main()
Loading