From f561662dd672c36e78f22776ba26e8ce654d9126 Mon Sep 17 00:00:00 2001 From: Stanislav Datskevych Date: Thu, 14 Sep 2023 18:38:05 +0000 Subject: [PATCH] Implement DataSourceCloudStack.get_hostname() that returns correct hostname and FQDN --- cloudinit/sources/DataSourceCloudStack.py | 52 ++++++ tests/unittests/sources/test_cloudstack.py | 200 ++++++++++++++++++++- 2 files changed, 251 insertions(+), 1 deletion(-) diff --git a/cloudinit/sources/DataSourceCloudStack.py b/cloudinit/sources/DataSourceCloudStack.py index fd2482a3fb6..bc6c7d224fd 100644 --- a/cloudinit/sources/DataSourceCloudStack.py +++ b/cloudinit/sources/DataSourceCloudStack.py @@ -93,6 +93,58 @@ def __init__(self, sys_cfg, distro, paths): self.metadata_address = "http://%s/" % (self.vr_addr,) self.cfg = {} + def _get_domainname(self): + """ + Try obtaining a "domain-name" DHCP lease parameter: + - From systemd-networkd lease + - From dhclient lease + """ + LOG.debug("Try obtaining domain name from networkd leases") + domainname = dhcp.networkd_get_option_from_leases("DOMAINNAME") + if domainname: + return domainname + LOG.debug( + "Could not obtain FQDN from networkd leases. " + "Falling back to ISC dhclient" + ) + + lease_file = dhcp.IscDhclient.get_latest_lease() + if not lease_file: + LOG.debug("Dhclient lease file wasn't found") + return None + + latest_lease = dhcp.IscDhclient.parse_dhcp_lease_file(lease_file)[-1] + domainname = latest_lease.get("domain-name", None) + return domainname if domainname else None + + def get_hostname( + self, + fqdn=False, + resolve_ip=False, + metadata_only=False, + ): + """ + Returns instance's hostname / fqdn + First probes the parent class method. + + If fqdn is requested, and the parent method didn't return it, + then attach the domain-name from DHCP response. + """ + hostname = super().get_hostname(fqdn, resolve_ip, metadata_only) + if fqdn and "." not in hostname.hostname: + LOG.debug("FQDN requested") + domainname = self._get_domainname() + if domainname: + fqdn = f"{hostname.hostname}.{domainname}" + LOG.debug("Obtained the following FQDN: %s", fqdn) + return sources.DataSourceHostname(fqdn, hostname.is_default) + LOG.debug( + "Could not determine domain name for FQDN. " + "Fall back to hostname as an FQDN: %s", + fqdn, + ) + return hostname + def wait_for_metadata_service(self): url_params = self.get_url_params() diff --git a/tests/unittests/sources/test_cloudstack.py b/tests/unittests/sources/test_cloudstack.py index 463a9c7a5ed..1ea4889a45c 100644 --- a/tests/unittests/sources/test_cloudstack.py +++ b/tests/unittests/sources/test_cloudstack.py @@ -5,11 +5,209 @@ from cloudinit import helpers, util from cloudinit.net.dhcp import IscDhclient +from cloudinit.sources import DataSourceHostname from cloudinit.sources.DataSourceCloudStack import DataSourceCloudStack from tests.unittests.helpers import CiTestCase, ExitStack, mock -MOD_PATH = "cloudinit.sources.DataSourceCloudStack" +SOURCES_PATH = "cloudinit.sources" +MOD_PATH = SOURCES_PATH + ".DataSourceCloudStack" DS_PATH = MOD_PATH + ".DataSourceCloudStack" +DHCP_MOD_PATH = "cloudinit.net.dhcp" + + +class TestCloudStackHostname(CiTestCase): + def setUp(self): + super(TestCloudStackHostname, self).setUp() + self.patches = ExitStack() + self.addCleanup(self.patches.close) + self.hostname = "vm-hostname" + self.networkd_domainname = "networkd.local" + self.isc_dhclient_domainname = "dhclient.local" + + # Mock the parent class get_hostname() method to return + # a non-fqdn hostname + get_hostname_parent = mock.MagicMock( + return_value=DataSourceHostname(self.hostname, True) + ) + self.patches.enter_context( + mock.patch( + SOURCES_PATH + ".DataSource.get_hostname", get_hostname_parent + ) + ) + + # Mock cloudinit.net.dhcp.networkd_get_option_from_leases() method \ + # result since we don't have a DHCP client running + networkd_get_option_from_leases = mock.MagicMock( + return_value=self.networkd_domainname + ) + self.patches.enter_context( + mock.patch( + DHCP_MOD_PATH + ".networkd_get_option_from_leases", + networkd_get_option_from_leases, + ) + ) + + # Mock cloudinit.net.dhcp.get_latest_lease() method \ + # result since we don't have a DHCP client running + isc_dhclient_get_latest_lease = mock.MagicMock( + return_value="/var/run/dhclient.eth0.lease" + ) + self.patches.enter_context( + mock.patch( + DHCP_MOD_PATH + ".IscDhclient.get_latest_lease", + isc_dhclient_get_latest_lease, + ) + ) + + # Mock cloudinit.net.dhcp.networkd_get_option_from_leases() method \ + # result since we don't have a DHCP client running + parse_dhcp_lease_file = mock.MagicMock( + return_value=[ + { + "interface": "eth0", + "fixed-address": "192.168.0.1", + "subnet-mask": "255.255.255.0", + "routers": "192.168.0.1", + "domain-name": self.isc_dhclient_domainname, + "renew": "4 2017/07/27 18:02:30", + "expire": "5 2017/07/28 07:08:15", + } + ] + ) + + self.patches.enter_context( + mock.patch( + DHCP_MOD_PATH + ".IscDhclient.parse_dhcp_lease_file", + parse_dhcp_lease_file, + ) + ) + + # Mock get_vr_address() method as it relies to + # parsing DHCP/networkd files + self.patches.enter_context( + mock.patch( + MOD_PATH + ".get_vr_address", + mock.MagicMock(return_value="192.168.0.1"), + ) + ) + + self.tmp = self.tmp_dir() + + def test_get_domainname_networkd(self): + """ + Test if DataSourceCloudStack._get_domainname() + gets domain name from systemd-networkd leases. + """ + ds = DataSourceCloudStack( + {}, None, helpers.Paths({"run_dir": self.tmp}) + ) + result = ds._get_domainname() + self.assertEqual(self.networkd_domainname, result) + + def test_get_domainname_isc_dhclient(self): + """ + Test if DataSourceCloudStack._get_domainname() + gets domain name from isc-dhcp-client leases + """ + + # Override systemd-networkd reply mock to None + # to force the code to fallback to IscDhclient + get_networkd_domain = mock.MagicMock(return_value=None) + self.patches.enter_context( + mock.patch( + DHCP_MOD_PATH + ".networkd_get_option_from_leases", + get_networkd_domain, + ) + ) + + ds = DataSourceCloudStack( + {}, None, helpers.Paths({"run_dir": self.tmp}) + ) + result = ds._get_domainname() + self.assertEqual(self.isc_dhclient_domainname, result) + + def test_get_hostname_non_fqdn(self): + """ + Test get_hostname() method implementation + with fqdn parameter=False. + It should call the parent class method and should + return its response intact. + """ + expected = DataSourceHostname(self.hostname, True) + + ds = DataSourceCloudStack( + {}, None, helpers.Paths({"run_dir": self.tmp}) + ) + result = ds.get_hostname(fqdn=False) + self.assertTupleEqual(expected, result) + + def test_get_hostname_fqdn(self): + """ + Test get_hostname() method implementation + with fqdn parameter=True. + It should look for domain name in DHCP leases. + """ + expected = DataSourceHostname( + self.hostname + "." + self.networkd_domainname, True + ) + + ds = DataSourceCloudStack( + {}, None, helpers.Paths({"run_dir": self.tmp}) + ) + result = ds.get_hostname(fqdn=True) + self.assertTupleEqual(expected, result) + + def test_get_hostname_fqdn_fallback(self): + """ + Test get_hostname() when some error happens + during domainname discovery. + + We mock both systemd-networkd discovery as None, + And the IscDhclient not having domain-name option + in the lease. + + It should return the hostname without domainname + in such cases. + """ + expected = DataSourceHostname(self.hostname, True) + + # Override systemd-networkd reply mock to None + # to force the code to fallback to IscDhclient + get_networkd_domain = mock.MagicMock(return_value=None) + self.patches.enter_context( + mock.patch( + DHCP_MOD_PATH + ".networkd_get_option_from_leases", + get_networkd_domain, + ) + ) + + # Override IscDhclient.parse_dhcp_lease_file() + # to return a lease without domain-name option. + parse_dhcp_lease_file = mock.MagicMock( + return_value=[ + { + "interface": "eth0", + "fixed-address": "192.168.0.1", + "subnet-mask": "255.255.255.0", + "routers": "192.168.0.1", + "renew": "4 2017/07/27 18:02:30", + "expire": "5 2017/07/28 07:08:15", + } + ] + ) + + self.patches.enter_context( + mock.patch( + DHCP_MOD_PATH + ".IscDhclient.parse_dhcp_lease_file", + parse_dhcp_lease_file, + ) + ) + + ds = DataSourceCloudStack( + {}, None, helpers.Paths({"run_dir": self.tmp}) + ) + result = ds.get_hostname(fqdn=True) + self.assertTupleEqual(expected, result) class TestCloudStackPasswordFetching(CiTestCase):