diff --git a/cloudinit/net/netplan.py b/cloudinit/net/netplan.py index 32047c0c90b..43dce44777a 100644 --- a/cloudinit/net/netplan.py +++ b/cloudinit/net/netplan.py @@ -5,6 +5,7 @@ import logging import os import textwrap +from tempfile import SpooledTemporaryFile from typing import Optional, cast from cloudinit import features, safeyaml, subp, util @@ -224,6 +225,79 @@ def _clean_default(target=None): os.unlink(f) +def netplan_api_write_yaml_file(net_config_content: str) -> bool: + """Use netplan.State._write_yaml_file to write netplan config + + Where netplan python API exists, prefer to use of the private + _write_yaml_file to ensure proper permissions and file locations + are chosen by the netplan python bindings in the environment. + + By calling the netplan API, allow netplan versions to change behavior + related to file permissions and treatment of sensitive configuration + under the API call to _write_yaml_file. + + In future netplan releases, security-sensitive config may be written to + separate file or directory paths than world-readable configuration parts. + """ + try: + from netplan.parser import Parser # type: ignore + from netplan.state import State # type: ignore + except ImportError: + LOG.debug( + "No netplan python module. Fallback to write %s", + CLOUDINIT_NETPLAN_FILE, + ) + return False + try: + with SpooledTemporaryFile(mode="w") as f: + f.write(net_config_content) + f.flush() + f.seek(0, io.SEEK_SET) + parser = Parser() + parser.load_yaml(f) + state_output_file = State() + state_output_file.import_parser_results(parser) + + # Write our desired basename 50-cloud-init.yaml, allow netplan to + # determine default root-dir /etc/netplan and/or specialized + # filenames or read permissions based on whether this config + # contains secrets. + state_output_file._write_yaml_file( + os.path.basename(CLOUDINIT_NETPLAN_FILE) + ) + except Exception as e: + LOG.warning( + "Unable to render network config using netplan python module." + " Fallback to write %s. %s", + CLOUDINIT_NETPLAN_FILE, + e, + ) + return False + LOG.debug("Rendered netplan config using netplan python API") + return True + + +def has_netplan_config_changed(cfg_file: str, content: str) -> bool: + """Return True when new netplan config has changed vs previous.""" + if not os.path.exists(cfg_file): + # This is our first write of netplan's cfg_file, representing change. + return True + # Check prev cfg vs current cfg. Ignore comments + prior_cfg = util.load_yaml(util.load_text_file(cfg_file)) + return prior_cfg != util.load_yaml(content) + + +def fallback_write_netplan_yaml(cfg_file: str, content: str): + """Write netplan config to cfg_file because python API was unavailable.""" + mode = 0o600 if features.NETPLAN_CONFIG_ROOT_READ_ONLY else 0o644 + if os.path.exists(cfg_file): + current_mode = util.get_permissions(cfg_file) + if current_mode & mode == current_mode: + # preserve mode if existing perms are more strict + mode = current_mode + util.write_file(cfg_file, content, mode=mode) + + class Renderer(renderer.Renderer): """Renders network information in a /etc/netplan/network.yaml format.""" @@ -276,33 +350,22 @@ def render_network_state( header += "\n" content = header + content - # determine if existing config files have the same content - same_content = False - if os.path.exists(fpnplan): - hashed_content = util.hash_buffer(io.BytesIO(content.encode())) - with open(fpnplan, "rb") as f: - hashed_original_content = util.hash_buffer(f) - if hashed_content == hashed_original_content: - same_content = True - - mode = 0o600 if features.NETPLAN_CONFIG_ROOT_READ_ONLY else 0o644 - if not same_content and os.path.exists(fpnplan): - current_mode = util.get_permissions(fpnplan) - if current_mode & mode == current_mode: - # preserve mode if existing perms are more strict than default - mode = current_mode - util.write_file(fpnplan, content, mode=mode) + netplan_config_changed = has_netplan_config_changed(fpnplan, content) + if not netplan_api_write_yaml_file(content): + fallback_write_netplan_yaml(fpnplan, content) if self.clean_default: _clean_default(target=target) - self._netplan_generate(run=self._postcmds, same_content=same_content) + self._netplan_generate( + run=self._postcmds, config_changed=netplan_config_changed + ) self._net_setup_link(run=self._postcmds) - def _netplan_generate(self, run: bool = False, same_content: bool = False): + def _netplan_generate(self, run: bool, config_changed: bool): if not run: - LOG.debug("netplan generate postcmd disabled") + LOG.debug("netplan generate postcmds disabled") return - if same_content: + if not config_changed: LOG.debug( "skipping call to `netplan generate`." " reason: identical netplan config" diff --git a/tests/integration_tests/modules/test_combined.py b/tests/integration_tests/modules/test_combined.py index 29219155e6d..8e7ec3c7f5b 100644 --- a/tests/integration_tests/modules/test_combined.py +++ b/tests/integration_tests/modules/test_combined.py @@ -19,7 +19,7 @@ from tests.integration_tests.decorators import retry from tests.integration_tests.instances import IntegrationInstance from tests.integration_tests.integration_settings import PLATFORM -from tests.integration_tests.releases import CURRENT_RELEASE, IS_UBUNTU +from tests.integration_tests.releases import CURRENT_RELEASE, IS_UBUNTU, MANTIC from tests.integration_tests.util import ( get_feature_flag_value, get_inactive_modules, @@ -89,6 +89,14 @@ def test_netplan_permissions(self, class_client: IntegrationInstance): """ Test that netplan config file is generated with proper permissions """ + log = class_client.read_from_file("/var/log/cloud-init.log") + if CURRENT_RELEASE < MANTIC: + assert ( + "No netplan python module. Fallback to write" + " /etc/netplan/50-cloud-init.yaml" in log + ) + else: + assert "Rendered netplan config using netplan python API" in log file_perms = class_client.execute( "stat -c %a /etc/netplan/50-cloud-init.yaml" ) diff --git a/tests/integration_tests/test_networking.py b/tests/integration_tests/test_networking.py index 6c133c33151..3a53b8da6c0 100644 --- a/tests/integration_tests/test_networking.py +++ b/tests/integration_tests/test_networking.py @@ -66,6 +66,13 @@ def test_skip(self, client: IntegrationInstance): client.execute( "mv /var/log/cloud-init.log /var/log/cloud-init.log.bak" ) + if CURRENT_RELEASE < MANTIC: + assert ( + "No netplan python module. Fallback to write" + " /etc/netplan/50-cloud-init.yaml" in log + ) + else: + assert "Rendered netplan config using netplan python API" in log netplan = yaml.safe_load( client.execute("cat /etc/netplan/50-cloud-init.yaml") ) diff --git a/tests/unittests/net/test_netplan.py b/tests/unittests/net/test_netplan.py index 28b0891d9f2..86bb32b106c 100644 --- a/tests/unittests/net/test_netplan.py +++ b/tests/unittests/net/test_netplan.py @@ -17,25 +17,37 @@ def renderer(tmp_path): class TestNetplanRenderer: - @pytest.mark.parametrize("write_config", [True, False]) - def test_skip_netplan_generate(self, renderer, write_config, mocker): - """Check `netplan generate` is called if netplan config has changed.""" + @pytest.mark.parametrize( + "orig_config", ["", "{'orig_cfg': true}", "{'new_cfg': true}"] + ) + def test_skip_netplan_generate(self, renderer, orig_config, mocker): + """Check `netplan generate` called when netplan config has changed.""" header = "\n" - content = "foo" + new_config = "{'new_cfg': true}" renderer_mocks = mocker.patch.multiple( renderer, - _render_content=mocker.Mock(return_value=content), + _render_content=mocker.Mock(return_value=new_config), _netplan_generate=mocker.DEFAULT, _net_setup_link=mocker.DEFAULT, ) - if write_config: + if orig_config: util.ensure_dir(os.path.dirname(renderer.netplan_path)) with open(renderer.netplan_path, "w") as f: f.write(header) - f.write(content) - + f.write(orig_config) renderer.render_network_state(mocker.Mock()) - + config_changed = bool(orig_config != new_config) assert renderer_mocks["_netplan_generate"].call_args_list == [ - mock.call(run=True, same_content=write_config) + mock.call(run=True, config_changed=config_changed) ] + + +class TestNetplanAPIWriteYAMLFile: + def test_no_netplan_python_api(self, caplog): + """Skip when no netplan available.""" + with mock.patch("builtins.__import__", side_effect=ImportError): + netplan.netplan_api_write_yaml_file("network: {version: 2}") + assert ( + "No netplan python module. Fallback to write" + f" {netplan.CLOUDINIT_NETPLAN_FILE}" in caplog.text + ) diff --git a/tests/unittests/test_net.py b/tests/unittests/test_net.py index 98c38b5345b..970f3338451 100644 --- a/tests/unittests/test_net.py +++ b/tests/unittests/test_net.py @@ -3394,7 +3394,7 @@ def test_netplan_render_calls_postcmds( mock_subp.side_effect = iter([subp.ProcessExecutionError]) renderer.render_network_state(ns, target=render_dir) - mock_netplan_generate.assert_called_with(run=True, same_content=False) + mock_netplan_generate.assert_called_with(run=True, config_changed=True) mock_net_setup_link.assert_called_with(run=True) @mock.patch("cloudinit.util.SeLinuxGuard")