From abdc4bd3d29fb3a54bfff95c43ee57cea3d8af71 Mon Sep 17 00:00:00 2001 From: Brett Holman Date: Wed, 21 Feb 2024 16:31:40 -0700 Subject: [PATCH] Requested changes, implement integration tests Make ftps:// only support ftp over TLS Make ftp:// only support unencrypted ftp Add integration tests that assert the following: ftp:// against an ftp-only server succeeds at retrieving configs (doesn't use TLS) ftp:// against an ftps-only server fails (doesn't use TLS) ftps:// against an ftps-only server succeeds at retrieving configs (uses TLS) ftps:// against an ftp-only server succeeds (cannot use TLS) --- cloudinit/sources/DataSourceNoCloud.py | 11 + cloudinit/url_helper.py | 194 +++++------ cloudinit/util.py | 11 +- .../datasources/test_nocloud.py | 309 +++++++++++++----- .../test_kernel_commandline_match.py | 59 +--- tests/integration_tests/util.py | 191 ++++++++++- 6 files changed, 540 insertions(+), 235 deletions(-) diff --git a/cloudinit/sources/DataSourceNoCloud.py b/cloudinit/sources/DataSourceNoCloud.py index ed018d0b01f4..a453ca3a720d 100644 --- a/cloudinit/sources/DataSourceNoCloud.py +++ b/cloudinit/sources/DataSourceNoCloud.py @@ -393,3 +393,14 @@ def ds_detect(self): # Return a list of data sources that match this set of dependencies def get_datasource_list(depends): return sources.list_from_depends(depends, datasources) + + +if __name__ == "__main__": + from sys import argv + + logging.basicConfig(level=logging.DEBUG) + seedfrom = argv[1] + md_seed, ud, vd = util.read_seeded(seedfrom) + print(f"seeded: {md_seed}") + print(f"ud: {ud}") + print(f"vd: {vd}") diff --git a/cloudinit/url_helper.py b/cloudinit/url_helper.py index 6ea8d1541590..335b0842ebc9 100644 --- a/cloudinit/url_helper.py +++ b/cloudinit/url_helper.py @@ -61,35 +61,45 @@ def combine_single(url, add_on): return url -def read_ftps(url: str, timeout: Optional[float] = None) -> "FtpResponse": +def ftp_get_return_code_from_exception(exc) -> int: + """helper for read_ftps to map return codes to a number""" + # ftplib doesn't expose error codes, so use this lookup table + ftp_error_codes = { + ftplib.error_reply: 300, # unexpected [123]xx reply + ftplib.error_temp: 400, # 4xx errors + ftplib.error_perm: 500, # 5xx errors + ftplib.error_proto: 600, # response does not begin with [1-5] + EOFError: 700, # made up + # OSError is also possible. Use OSError.errno for that. + } + code = ftp_error_codes.get(type(exc)) # pyright: ignore + if not code: + if isinstance(exc, OSError): + code = exc.errno + else: + LOG.warning( + "Unexpected exception type while connecting to ftp server." + ) + code = -99 + return code + + +def read_ftps(url: str, timeout: float = 5.0, **kwargs: dict) -> "FtpResponse": """connect to URL using ftp over TLS and read a file when using strict mode (ftps://), raise exception in event of failure when not using strict mode (ftp://), fall back to using unencrypted ftp + url: string containing the desination to read a file from. The url is + parsed with urllib.urlsplit to identify username, password, host, + path, and port in the following format: + ftps://[username:password@]host[:port]/[path] + host is the only required component + timeout: maximum time for the connection to take + kwargs: unused, for compatibility with read_url returns: UrlResponse """ - def get_return_code_from_exception(exc): - # ftplib doesn't expose error codes, so use this lookup table - ftp_error_codes = { - ftplib.error_reply: 300, # unexpected [123]xx reply - ftplib.error_temp: 400, # 4xx errors - ftplib.error_perm: 500, # 5xx errors - ftplib.error_proto: 600, # response does not begin with [1-5] - EOFError: 700, # made up - # OSError is also possible. Use OSError.errno for that. - } - code = ftp_error_codes.get(type(exc)) # pyright: ignore - if not code: - if isinstance(exc, OSError): - code = exc.errno - else: - LOG.warning( - "Unexpected exception type while connecting to ftp server." - ) - return code - url_parts = urlsplit(url) if not url_parts.hostname: raise UrlError( @@ -98,86 +108,86 @@ def get_return_code_from_exception(exc): with io.BytesIO() as buffer: port = url_parts.port or 21 user = url_parts.username or "anonymous" - try: - ftp_tls = ftplib.FTP_TLS( - context=create_default_context(), - ) - LOG.debug( - "Attempting to connect to %s via port [%s] over tls.", - url, port - ) - ftp_tls.connect( - host=url_parts.hostname, - port=port, - timeout=timeout or 0.0, # Python docs are wrong about types - ) - LOG.debug("Attempting to login with user [%s]", user) - ftp_tls.login( - user=user, - passwd=url_parts.password or "", - ) - LOG.debug("Creating a secure connection", user) - ftp_tls.prot_p() - LOG.debug("Reading file: %s", url_parts.path) - ftp_tls.retrbinary(f"RETR {url_parts.path}", callback=buffer.write) - response = FtpResponse(url_parts.path, contents=buffer) - LOG.debug("Closing connection", url_parts.path) - ftp_tls.close() - return response - except ftplib.all_errors as e: - code = get_return_code_from_exception(e), - if "ftps" == url_parts.scheme: + if "ftps" == url_parts.scheme: + try: + ftp_tls = ftplib.FTP_TLS( + context=create_default_context(), + ) + LOG.debug( + "Attempting to connect to %s via port [%s] over tls.", + url, + port, + ) + ftp_tls.connect( + host=url_parts.hostname, + port=port, + timeout=timeout or 5.0, # uses float internally + ) + LOG.debug("Attempting to login with user [%s]", user) + ftp_tls.login( + user=user, + passwd=url_parts.password or "", + ) + LOG.debug("Creating a secure connection") + ftp_tls.prot_p() + LOG.debug("Reading file: %s", url_parts.path) + ftp_tls.retrbinary( + f"RETR {url_parts.path}", callback=buffer.write + ) + response = FtpResponse(url_parts.path, contents=buffer) + LOG.debug("Closing connection") + ftp_tls.close() + return response + except ftplib.all_errors as e: + code = ftp_get_return_code_from_exception(e) raise UrlError( cause=( - "Connecting to ftp server over tls " + "Reading file from server over tls " f"failed for url {url} [{code}]" ), code=code, headers=None, url=url, ) from e - LOG.info( - "Connecting to ftp server over tls " - "failed for url %s [%s]", url, code - ) - try: - LOG.debug( - "Couldn't connect to %s over tls. Strict mode not " - "required (using protocol 'ftp://' not 'ftps://'), so falling " - "back to ftp. Use 'ftps://' to prevent unencrypted ftp.", - url_parts.hostname, - ) + else: + try: + ftp = ftplib.FTP() + LOG.debug( + "Attempting to connect to %s via port %s.", url, port) + ftp.connect( + host=url_parts.hostname, + port=port, + timeout=timeout or 5.0, # uses float internally + ) + LOG.debug("Attempting to login with user [%s]", user) + ftp.login( + user=user, + passwd=url_parts.password or "", + ) + LOG.debug("Reading file: %s", url_parts.path) + ftp.retrbinary(f"RETR {url_parts.path}", callback=buffer.write) + response = FtpResponse(url_parts.path, contents=buffer) + LOG.debug("Closing connection") + ftp.close() + return response + except ftplib.all_errors as e: + code=ftp_get_return_code_from_exception(e), + raise UrlError( + cause=( + "Reading file from ftp server" + f" failed for url {url} [{code}]" + ), + code=code, + headers=None, + url=url, + ) from e - ftp = ftplib.FTP() - LOG.debug("Attempting to connect to %s via port %s.", url, port) - ftp.connect( - host=url_parts.hostname, - port=port, - timeout=timeout or 0.0, # Python docs are wrong about types - ) - LOG.debug("Attempting to login with user [%s]", user) - ftp.login( - user=user, - passwd=url_parts.password or "", - ) - LOG.debug("Reading file: %s", url_parts.path) - ftp.retrbinary(f"RETR {url_parts.path}", callback=buffer.write) - response = FtpResponse(url_parts.path, contents=buffer) - LOG.debug("Closing connection", url_parts.path) - ftp.close() - return response - except ftplib.all_errors as e: - raise UrlError( - cause=( - f"Connecting to ftp server failed for url {url} [{code}]" - ), - code=get_return_code_from_exception(e), - headers=None, - url=url, - ) from e +def _read_file(path: str, **kwargs) -> "FileResponse": + """read a binary file and return a FileResponse -def read_file(path: str, **kwargs) -> "FileResponse": + matches function signature with read_ftps and read_url + """ if kwargs.get("data"): LOG.warning("Unable to post data to file resource %s", path) try: @@ -203,9 +213,9 @@ def read_file_or_url( parsed = urlparse(url) scheme = parsed.scheme if scheme == "file" or (url and "/" == url[0]): - return read_file(parsed.path, **kwargs) + return _read_file(parsed.path, **kwargs) elif scheme in ("ftp", "ftps"): - return read_ftps(url) + return read_ftps(url, **kwargs) elif scheme in ("http", "https"): return readurl(url, **kwargs) else: diff --git a/cloudinit/util.py b/cloudinit/util.py index 6b0acb5b3b87..15e229c20e6f 100644 --- a/cloudinit/util.py +++ b/cloudinit/util.py @@ -940,13 +940,14 @@ def del_dir(path): shutil.rmtree(path) -# read_optional_seed -# returns boolean indicating success or failure (presense of files) -# if files are present, populates 'fill' dictionary with 'user-data' and -# 'meta-data' entries def read_optional_seed(fill, base="", ext="", timeout=5): + """ + returns boolean indicating success or failure (presense of files) + if files are present, populates 'fill' dictionary with 'user-data' and + 'meta-data' entries + """ try: - (md, ud, vd) = read_seeded(base, ext, timeout) + md, ud, vd = read_seeded(base=base, ext=ext, timeout=timeout) fill["user-data"] = ud fill["vendor-data"] = vd fill["meta-data"] = md diff --git a/tests/integration_tests/datasources/test_nocloud.py b/tests/integration_tests/datasources/test_nocloud.py index 90c8ac1f68f6..c0535157d076 100644 --- a/tests/integration_tests/datasources/test_nocloud.py +++ b/tests/integration_tests/datasources/test_nocloud.py @@ -1,15 +1,16 @@ """NoCloud datasource integration tests.""" -import os from textwrap import dedent import pytest +from pycloudlib.lxd.instance import LXDInstance from cloudinit.subp import subp -from pycloudlib.lxd.instance import LXDInstance from tests.integration_tests.instances import IntegrationInstance from tests.integration_tests.integration_settings import PLATFORM -from tests.integration_tests.test_kernel_commandline_match import ( +from tests.integration_tests.releases import CURRENT_RELEASE, FOCAL +from tests.integration_tests.util import ( override_kernel_cmdline, + verify_clean_boot, ) VENDOR_DATA = """\ @@ -98,91 +99,233 @@ def test_nocloud_seedfrom_vendordata(client: IntegrationInstance): @pytest.mark.skipif(PLATFORM != "lxd_vm", reason="Modifies grub config") @pytest.mark.lxd_use_exec -def test_nocloud_ftp(client: IntegrationInstance): - # creating an ftp service to run prior - # to cloud-config is bonkers, lets - # hope that users don't see this kind of - # test code as an example to follow +class TestFTP: + """Test nocloud's support for unencrypted FTP and FTP over TLS (ftps). - client.execute("apt update && apt install -yq python3-pyftpdlib") + These tests work by setting up a local ftp server on the test instance + and then rebooting the instance clean (cloud-init clean --logs --reboot). - client.write_to_file( - "/server.py", - dedent( - """\ - #!/usr/bin/python3 - from pyftpdlib.authorizers import DummyAuthorizer - from pyftpdlib.handlers import FTPHandler, TLS_FTPHandler - from pyftpdlib.servers import FTPServer - from pyftpdlib.filesystems import UnixFilesystem - - # yeah, it's not secure but that's not the point - authorizer = DummyAuthorizer() - - # Define a read-only anonymous user - authorizer.add_anonymous("/home/anonymous") - - # Instantiate FTP handler class - handler = FTPHandler - handler.authorizer = authorizer - handler.abstracted_fs = UnixFilesystem - server = FTPServer(("localhost", 2121), handler) - - # start the ftp server - server.serve_forever() - """ - ), - ) - client.execute("chmod +x /server.py") - client.write_to_file( - "/lib/systemd/system/local-ftp.service", - dedent( - """\ - [Unit] - Description=run a local ftp server against - Wants=cloud-init-local.service - DefaultDependencies=no - - # we want the network up for network operations - # and NoCloud operates in network timeframe - After=systemd-networkd-wait-online.service - After=networking.service - Before=cloud-init.service - - [Service] - Type=exec - ExecStart=/server.py - - [Install] - WantedBy=cloud-init.target - """ + Check for the existence (or non-existence) of specific log messages to + verify functionality. + """ + + # should we really be surfacing this netplan stderr as a warning? + # i.e. how does it affect the users? + expected_warnings = [ + "Falling back to a hard restart of systemd-networkd.service" + ] + + @staticmethod + def _boot_with_cmdline( + cmdline: str, client: IntegrationInstance, encrypted: bool = False + ) -> None: + """configure an ftp server to start prior to network timeframe + optionally install certs and make the server support only FTP over TLS + + cmdline: a string containing the kernel commandline set on reboot + client: an instance to configure + encrypted: a boolean which modifies the configured ftp server + """ + + # install the essential bits + assert client.execute( + "apt update && apt install -yq python3-pyftpdlib " + "python3-openssl ca-certificates libnss3-tools" + ).ok + + # How do you reliably run a ftp server for your instance to + # read files from during early boot? In typical production + # environments, the ftp server would be separate from the instance. + # + # For a reliable server that fits with the framework of running tests + # on a single instance, it is easier to just install an ftp server + # that runs on the second boot prior to the cloud-init unit which + # reaches out to the ftp server. This achieves reaching out to an + # ftp(s) server for testing - cloud-init just doesn't have to reach + # very far to get what it needs. + # + # DO NOT use these concepts in a production. + # + # This configuration is neither secure nor production-grade - intended + # only for testing purposes. + client.write_to_file( + "/server.py", + dedent( + """\ + #!/usr/bin/python3 + import logging + + from pyftpdlib.authorizers import DummyAuthorizer + from pyftpdlib.handlers import FTPHandler, TLS_FTPHandler + from pyftpdlib.servers import FTPServer + from pyftpdlib.filesystems import UnixFilesystem + + encrypted = """ + + str(encrypted) + + """ + + logging.basicConfig(level=logging.DEBUG) + + # yeah, it's not secure but that's not the point + authorizer = DummyAuthorizer() + + # Define a read-only anonymous user + authorizer.add_anonymous("/home/anonymous") + + # Instantiate FTP handler class + if not encrypted: + handler = FTPHandler + logging.info("Running unencrypted ftp server") + else: + handler = TLS_FTPHandler + handler.certfile = "/cert.pem" + handler.keyfile = "/key.pem" + logging.info("Running encrypted ftp server") + + handler.authorizer = authorizer + handler.abstracted_fs = UnixFilesystem + server = FTPServer(("localhost", 2121), handler) + + # start the ftp server + server.serve_forever() + """ + ), ) - ) - client.execute("chmod 644 /lib/systemd/system/local-ftp.service") - client.execute("systemctl enable local-ftp.service") + assert client.execute("chmod +x /server.py").ok - client.execute("mkdir /home/anonymous/") - client.write_to_file( - "/home/anonymous/user-data", - dedent( - """ - #cloud-config + if encrypted: + if CURRENT_RELEASE > FOCAL: + assert client.execute("apt install -yq mkcert").ok + else: + + # install golang + assert client.execute("apt install -yq golang").ok - hostname: ftp-bootstrapper - """ + # build mkcert from source + # + # we could check out a tag, but the project hasn't + # been updated in 2 years + # + # instructions from https://github.com/FiloSottile/mkcert + assert client.execute( + "git clone https://github.com/FiloSottile/mkcert && " + "cd mkcert && " + "go build -ldflags " + '"-X main.Version=$(git describe --tags)"' + ).ok + + # giddyup + assert client.execute( + "ln -s $HOME/mkcert/mkcert /usr/local/bin/mkcert" + ).ok + + # more palatable than openssl commands + assert client.execute( + "mkcert -install -cert-file /cert.pem -key-file /key.pem " + "localhost 127.0.0.1 0.0.0.0 ::1" + ).ok + + client.write_to_file( + "/lib/systemd/system/local-ftp.service", + dedent( + """\ + [Unit] + Description=run a local ftp server against + Wants=cloud-init-local.service + DefaultDependencies=no + + # we want the network up for network operations + # and NoCloud operates in network timeframe + After=systemd-networkd-wait-online.service + After=networking.service + Before=cloud-init.service + + [Service] + Type=exec + ExecStart=/server.py + + [Install] + WantedBy=cloud-init.target + """ + ), ) - ) - client.write_to_file( - "/home/anonymous/meta-data", - dedent( - """ - instance-id: ftp-instance - """ + assert client.execute( + "chmod 644 /lib/systemd/system/local-ftp.service" + ).ok + assert client.execute("systemctl enable local-ftp.service").ok + assert client.execute("mkdir /home/anonymous").ok + + client.write_to_file( + "/user-data", + dedent( + """\ + #cloud-config + + hostname: ftp-bootstrapper + """ + ), ) - ) - client.write_to_file("/home/anonymous/vendor-data", "") + client.write_to_file( + "/meta-data", + dedent( + """\ + instance-id: ftp-instance + """ + ), + ) + client.write_to_file("/vendor-data", "\n") - # set the kernel commandline, reboot with it - override_kernel_cmdline( - "ds=nocloud;seedfrom=ftp://0.0.0.0:2121", client - ) + # set the kernel commandline, reboot with it + override_kernel_cmdline(cmdline, client) + + def test_nocloud_ftp_unencrypted_server_succeeds( + self, client: IntegrationInstance + ): + """check that ftp:// succeeds to unencrypted ftp server + + this mode allows administrators to choose unencrypted ftp, + at their own risk + """ + cmdline = "ds=nocloud;seedfrom=ftp://0.0.0.0:2121" + self._boot_with_cmdline(cmdline, client) + verify_clean_boot(client, ignore_warnings=self.expected_warnings) + + def test_nocloud_ftps_unencrypted_server_fails( + self, client: IntegrationInstance + ): + """check that ftps:// fails to unencrypted ftp server + + this mode allows administrators to enforce TLS encryption + """ + cmdline = "ds=nocloud;seedfrom=ftps://localhost:2121" + self._boot_with_cmdline(cmdline, client) + log = client.read_from_file("/var/log/cloud-init.log") + assert "Reading file from server over tls failed for url" in log + verify_clean_boot( + client, + ignore_warnings=self.expected_warnings, + require_warnings=[ + "Getting data from failed", + "Used fallback datasource", + ], + ) + + def test_nocloud_ftps_encrypted_server_succeeds( + self, client: IntegrationInstance + ): + """check that ftps:// encrypted ftp server succeeds + + this mode allows administrators to enforce TLS encryption + """ + cmdline = "ds=nocloud;seedfrom=ftps://localhost:2121" + self._boot_with_cmdline(cmdline, client, encrypted=True) + verify_clean_boot(client, ignore_warnings=self.expected_warnings) + + def test_nocloud_ftp_encrypted_server_fails( + self, client: IntegrationInstance + ): + """check that using ftp:// to encrypted ftp server fails""" + cmdline = "ds=nocloud;seedfrom=ftp://0.0.0.0:2121" + self._boot_with_cmdline(cmdline, client, encrypted=True) + verify_clean_boot(client, ignore_warnings=self.expected_warnings) diff --git a/tests/integration_tests/test_kernel_commandline_match.py b/tests/integration_tests/test_kernel_commandline_match.py index dbaf567cb642..06b70a06ff3b 100644 --- a/tests/integration_tests/test_kernel_commandline_match.py +++ b/tests/integration_tests/test_kernel_commandline_match.py @@ -6,64 +6,15 @@ from tests.integration_tests.conftest import get_validated_source from tests.integration_tests.instances import IntegrationInstance from tests.integration_tests.integration_settings import PLATFORM -from tests.integration_tests.util import wait_for_cloud_init +from tests.integration_tests.util import ( + override_kernel_cmdline, + restart_cloud_init, + wait_for_cloud_init, +) log = logging.getLogger("integration_testing") -def restart_cloud_init(c): - client = c - client.instance.shutdown(wait=False) - try: - client.instance.wait_for_state("STOPPED", num_retries=20) - except RuntimeError as e: - log.warning( - "Retrying shutdown due to timeout on initial shutdown request %s", - str(e), - ) - client.instance.shutdown() - - client.instance.execute_via_ssh = False - client.instance.start() - client.execute("cloud-init status --wait") - - -def override_kernel_cmdline(ds_str: str, c: IntegrationInstance): - """ - Configure grub's kernel command line to tell cloud-init to use OpenStack - - even though LXD should naturally be detected. - - This runs on LXD, but forces cloud-init to attempt to run OpenStack. - This will inevitably fail on LXD, but we only care that it tried - on - Ironic, for example, it will succeed. - """ - client = c - - # The final output in /etc/default/grub should be: - # - # GRUB_CMDLINE_LINUX="'ds=nocloud;s=http://my-url/'" - # - # That ensures that the kernel commandline passed into - # /boot/efi/EFI/ubuntu/grub.cfg will be properly single-quoted - # - # Example: - # - # linux /boot/vmlinuz-5.15.0-1030-kvm ro 'ds=nocloud;s=http://my-url/' - # - # Not doing this will result in a semicolon-delimited ds argument - # terminating the kernel arguments prematurely. - client.execute('printf "GRUB_CMDLINE_LINUX=\\"" >> /etc/default/grub') - client.execute('printf "\'" >> /etc/default/grub') - client.execute(f"printf '{ds_str}' >> /etc/default/grub") - client.execute('printf "\'\\"" >> /etc/default/grub') - - # We should probably include non-systemd distros at some point. This should - # most likely be as simple as updating the output path for grub-mkconfig - client.execute("grub-mkconfig -o /boot/efi/EFI/ubuntu/grub.cfg") - client.execute("cloud-init clean --logs") - restart_cloud_init(client) - - @pytest.mark.skipif(PLATFORM != "lxd_vm", reason="Modifies grub config") @pytest.mark.lxd_use_exec @pytest.mark.parametrize( diff --git a/tests/integration_tests/util.py b/tests/integration_tests/util.py index 8e10b4872b83..b45ba1015992 100644 --- a/tests/integration_tests/util.py +++ b/tests/integration_tests/util.py @@ -1,3 +1,4 @@ +import json import logging import multiprocessing import os @@ -8,12 +9,14 @@ from functools import lru_cache from itertools import chain from pathlib import Path -from typing import TYPE_CHECKING, Set +from typing import TYPE_CHECKING, List, Optional, Set, Union import pytest from cloudinit.subp import subp +LOG = logging.getLogger("integration_testing.util") + if TYPE_CHECKING: # instances.py has imports util.py, so avoid circular import from tests.integration_tests.instances import IntegrationInstance @@ -42,6 +45,132 @@ def verify_ordered_items_in_text(to_verify: list, text: str): index = matched.start() +def format_found(header: str, items: list) -> str: + """Helper function to format assertion message """ + + # do nothing, allows this formatter to be "stackable" + if not items: + return "" + + # if only one error put the header and the error message on a single line + if 1 == len(items): + return f"\n{header}: {items.pop(0)}" + + # otherwise make a list after header + else: + return f"\n{header}:\n\t- " + "\n\t- ".join(items) + + +def verify_clean_boot( + c: "IntegrationInstance", + ignore_warnings: Optional[Union[List[str], bool]] = None, + ignore_errors: Optional[Union[List[str], bool]] = None, + require_warnings: Optional[list] = None, + require_errors: Optional[list] = None, +): + """raise assertions if the client experienced unexpected warnings or errors + + fail when an required error isn't found + + This function is similar to verify_clean_log, hence the similar name. + + differences from verify_clean_log: + + - more expressive syntax + - extensible (can be easily extended for other log levels) + - less resource intensive (no log copying required) + - nice error formatting + + c: test instance + ignored_warnings: list of expected warnings to ignore, + or true to ignore all + ignored_errors: list of expected errors to ignore, or true to ignore all + require_warnings: Optional[list] = None, + require_errors: Optional[list] = None, + fail_when_expected_not_found: optional list of expected errors + """ + unexpected_warnings = [] + unexpected_errors = [] + ignore_errors = ignore_errors or [] + ignore_warnings = ignore_warnings or [] + require_errors = require_errors or [] + require_warnings = require_warnings or [] + status = json.loads(c.execute("cloud-init status --format=json")) + + unexpected_errors = set() + unexpected_warnings = set() + + required_warnings_found = set() + required_errors_found = set() + + for current_error in status["errors"]: + + # check for required errors + for expected in require_errors: + if expected in current_error: + required_errors_found.add(expected) + + # check for unexpected errors + if ignore_errors is True: + continue + for expected in [*ignore_errors, *require_errors]: + if expected in current_error: + break + else: + unexpected_errors.add(current_error) + + # check for unexpected warnings + for current_warning in status["recoverable_errors"].get("WARNING", []): + + # check for required warnings + for expected in require_warnings: + if expected in current_warning: + required_warnings_found.add(expected) + + # check for unexpected warnings + if ignore_warnings is True: + continue + for expected in [*ignore_warnings, *require_warnings]: + if expected in current_warning: + break + else: + unexpected_warnings.add(current_warning) + + required_errors_not_found = set(require_errors) - required_errors_found + required_warnings_not_found = ( + set(require_warnings) - required_warnings_found + ) + + errors = [ + *unexpected_errors, + *required_errors_not_found, + *unexpected_warnings, + *required_warnings_not_found, + ] + if errors: + message = "" + # if there is only one message, don't include the generic header + # so that the user can read the exact message in the pytest summary + if len(errors) > 1: + # more than one error, so include a generic message + message += "Unexpected warnings or errors found" + + # errors are probably more important, order them first + message += format_found( + "Found unexpected errors", list(unexpected_errors) + ) + message += format_found( + "Required errors not found", list(required_errors_not_found) + ) + message += format_found( + "Found unexpected warnings", list(unexpected_warnings) + ) + message += format_found( + "Required warnings not found", list(required_warnings_not_found) + ) + assert not errors, message + + def verify_clean_log(log: str, ignore_deprecations: bool = True): """Assert no unexpected tracebacks or warnings in logs""" if ignore_deprecations: @@ -220,3 +349,63 @@ def get_feature_flag_value(client: "IntegrationInstance", key): if "NameError" in value: raise NameError(f"name '{key}' is not defined") return value + + +def restart_cloud_init(c: "IntegrationInstance"): + """restart cloud-init on an instance, wait for cloud-init to boot + + c: instance to restart + """ + client = c + client.instance.shutdown(wait=False) + try: + client.instance.wait_for_state("STOPPED", num_retries=20) + except RuntimeError as e: + log.warning( + "Retrying shutdown due to timeout on initial shutdown request %s", + str(e), + ) + client.instance.shutdown() + + # required for lxc + client.instance.execute_via_ssh = False + client.instance.start() + client.execute("cloud-init status --wait") + + +def override_kernel_cmdline(ds_str: str, c: "IntegrationInstance"): + """set the kernel commandline and reboot, return after boot done + + This will not work with containers. This is only tested with lxd vms + but in theory should work on any virtual machine using grub. + + ds_str: the string that will be inserted into /proc/cmdline + c: instance to set kernel commandline for + """ + client = c + + # The final output in /etc/default/grub should be: + # + # GRUB_CMDLINE_LINUX="'ds=nocloud;s=http://my-url/'" + # + # That ensures that the kernel commandline passed into + # /boot/efi/EFI/ubuntu/grub.cfg will be properly single-quoted + # + # Example: + # + # linux /boot/vmlinuz-5.15.0-1030-kvm ro 'ds=nocloud;s=http://my-url/' + # + # Not doing this will result in a semicolon-delimited ds argument + # terminating the kernel arguments prematurely. + assert client.execute( + 'printf "GRUB_CMDLINE_LINUX=\\"" >> /etc/default/grub' + ).ok + assert client.execute('printf "\'" >> /etc/default/grub').ok + assert client.execute(f"printf '{ds_str}' >> /etc/default/grub").ok + assert client.execute('printf "\'\\"" >> /etc/default/grub').ok + + # We should probably include non-systemd distros at some point. This should + # most likely be as simple as updating the output path for grub-mkconfig + assert client.execute("grub-mkconfig -o /boot/efi/EFI/ubuntu/grub.cfg").ok + assert client.execute("cloud-init clean --logs").ok + restart_cloud_init(client)