Skip to content

Commit

Permalink
tests: cc_set_passoword update for systemd, non-systemd distros (#1449)
Browse files Browse the repository at this point in the history
- Also refactor a couple CiTestCases to pytest
- use get_cloud instead of self.tmp_cloud where possible

Co-authored-by: Chad Smith <chad.smith@canonical.com>
  • Loading branch information
TheRealFalcon and blackboxsw authored May 12, 2022
1 parent f0c457c commit 136ccae
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 148 deletions.
4 changes: 2 additions & 2 deletions cloudinit/config/cc_set_passwords.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ def handle_ssh_pwauth(pw_auth, distro: Distro):
if e.exit_code == 3:
# Service is not running. Write ssh config.
LOG.warning(
"Writing config 'ssh_pwauth: %s'."
" SSH service '%s' will not be restarted because is stopped.",
"Writing config 'ssh_pwauth: %s'. SSH service '%s'"
" will not be restarted because it is stopped.",
pw_auth,
service,
)
Expand Down
301 changes: 155 additions & 146 deletions tests/unittests/config/test_cc_set_passwords.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# This file is part of cloud-init. See LICENSE file for license information.

import logging
from unittest import mock

import pytest
Expand All @@ -12,177 +13,183 @@
validate_cloudconfig_schema,
)
from tests.unittests.helpers import CiTestCase, skipUnlessJsonSchema
from tests.unittests.util import get_cloud

MODPATH = "cloudinit.config.cc_set_passwords."


class TestHandleSshPwauth(CiTestCase):
"""Test cc_set_passwords handling of ssh_pwauth in handle_ssh_pwauth."""
@pytest.fixture()
def mock_uses_systemd(mocker):
mocker.patch("cloudinit.distros.uses_systemd", return_value=True)

with_logs = True

class TestHandleSSHPwauth:
@pytest.mark.parametrize(
"uses_systemd,cmd",
(
(True, ["systemctl", "status", "ssh"]),
(False, ["service", "ssh", "status"]),
),
)
@mock.patch("cloudinit.distros.subp.subp")
def test_unknown_value_logs_warning(self, m_subp):
cloud = self.tmp_cloud(distro="ubuntu")
setpass.handle_ssh_pwauth("floo", cloud.distro)
self.assertIn(
"Unrecognized value: ssh_pwauth=floo", self.logs.getvalue()
)
self.assertEqual(
[mock.call(["systemctl", "status", "ssh"], capture=True)],
m_subp.call_args_list,
)

@mock.patch(MODPATH + "update_ssh_config", return_value=True)
@mock.patch("cloudinit.distros.subp.subp")
def test_systemctl_as_service_cmd(self, m_subp, m_update_ssh_config):
"""If systemctl in service cmd: systemctl restart name."""
cloud = self.tmp_cloud(distro="ubuntu")
cloud.distro.init_cmd = ["systemctl"]
setpass.handle_ssh_pwauth(True, cloud.distro)
m_subp.assert_called_with(
["systemctl", "restart", "ssh"], capture=True
)
self.assertIn("DEBUG: Restarted the SSH daemon.", self.logs.getvalue())
def test_unknown_value_logs_warning(
self, m_subp, uses_systemd, cmd, caplog
):
cloud = get_cloud("ubuntu")
with mock.patch.object(
cloud.distro, "uses_systemd", return_value=uses_systemd
):
setpass.handle_ssh_pwauth("floo", cloud.distro)
assert "Unrecognized value: ssh_pwauth=floo" in caplog.text
assert [mock.call(cmd, capture=True)] == m_subp.call_args_list

@mock.patch(MODPATH + "update_ssh_config", return_value=False)
@pytest.mark.parametrize(
"uses_systemd,ssh_updated,cmd,expected_log",
(
(
True,
True,
["systemctl", "restart", "ssh"],
"Restarted the SSH daemon.",
),
(
True,
False,
["systemctl", "status", "ssh"],
"No need to restart SSH",
),
(
False,
True,
["service", "ssh", "restart"],
"Restarted the SSH daemon.",
),
(
False,
False,
["service", "ssh", "status"],
"No need to restart SSH",
),
),
)
@mock.patch(MODPATH + "update_ssh_config")
@mock.patch("cloudinit.distros.subp.subp")
def test_not_restarted_if_not_updated(self, m_subp, m_update_ssh_config):
"""If config is not updated, then no system restart should be done."""
cloud = self.tmp_cloud(distro="ubuntu")
setpass.handle_ssh_pwauth(True, cloud.distro)
self.assertEqual(
[mock.call(["systemctl", "status", "ssh"], capture=True)],
m_subp.call_args_list,
def test_restart_ssh_only_when_changes_made_and_ssh_installed(
self,
m_subp,
update_ssh_config,
uses_systemd,
ssh_updated,
cmd,
expected_log,
caplog,
):
update_ssh_config.return_value = ssh_updated
cloud = get_cloud("ubuntu")
with mock.patch.object(
cloud.distro, "uses_systemd", return_value=uses_systemd
):
setpass.handle_ssh_pwauth(True, cloud.distro)
if ssh_updated:
m_subp.assert_called_with(cmd, capture=True)
else:
assert [mock.call(cmd, capture=True)] == m_subp.call_args_list
assert expected_log in "\n".join(
r.msg for r in caplog.records if r.levelname == "DEBUG"
)
self.assertIn("No need to restart SSH", self.logs.getvalue())

@mock.patch(MODPATH + "update_ssh_config", return_value=True)
@mock.patch("cloudinit.distros.subp.subp")
def test_unchanged_does_nothing(self, m_subp, m_update_ssh_config):
def test_unchanged_value_does_nothing(
self, m_subp, update_ssh_config, mock_uses_systemd
):
"""If 'unchanged', then no updates to config and no restart."""
cloud = self.tmp_cloud(distro="ubuntu")
update_ssh_config.assert_not_called()
cloud = get_cloud("ubuntu")
setpass.handle_ssh_pwauth("unchanged", cloud.distro)
m_update_ssh_config.assert_not_called()
self.assertEqual(m_update_ssh_config.call_count, 0)
self.assertEqual(
[mock.call(["systemctl", "status", "ssh"], capture=True)],
m_subp.call_args_list,
)
assert [
mock.call(["systemctl", "status", "ssh"], capture=True)
] == m_subp.call_args_list

@pytest.mark.allow_subp_for("systemctl")
@mock.patch("cloudinit.distros.subp.subp")
def test_valid_change_values(self, m_subp):
"""If value is a valid changen value, then update should be called."""
cloud = self.tmp_cloud(distro="ubuntu")
def test_valid_value_changes_updates_ssh(self, m_subp, mock_uses_systemd):
"""If value is a valid changed value, then update will be called."""
cloud = get_cloud("ubuntu")
upname = MODPATH + "update_ssh_config"
optname = "PasswordAuthentication"
for n, value in enumerate(util.FALSE_STRINGS + util.TRUE_STRINGS, 1):
optval = "yes" if value in util.TRUE_STRINGS else "no"
with mock.patch(upname, return_value=False) as m_update:
setpass.handle_ssh_pwauth(value, cloud.distro)
self.assertEqual(
mock.call({optname: optval}), m_update.call_args_list[-1]
assert (
mock.call({optname: optval}) == m_update.call_args_list[-1]
)
self.assertEqual(m_subp.call_count, n)
self.assertEqual(
mock.call(["systemctl", "status", "ssh"], capture=True),
m_subp.call_args_list[-1],
)

@mock.patch(MODPATH + "update_ssh_config", return_value=True)
@mock.patch("cloudinit.distros.subp.subp")
def test_failed_ssh_service_is_not_runing(
self, m_subp, m_update_ssh_config
):
"""If the ssh service is not running, then the config is updated and
no restart.
"""
cloud = self.tmp_cloud(distro="ubuntu")
cloud.distro.init_cmd = ["systemctl"]
cloud.distro.manage_service = mock.Mock(
side_effect=subp.ProcessExecutionError(
stderr="Service is not running.", exit_code=3
)
)

setpass.handle_ssh_pwauth(True, cloud.distro)
self.assertIn(
r"WARNING: Writing config 'ssh_pwauth: True'."
r" SSH service 'ssh' will not be restarted because is stopped.",
self.logs.getvalue(),
)
self.assertIn(
r"DEBUG: Not restarting SSH service: service is stopped.",
self.logs.getvalue(),
)
self.assertEqual(
[mock.call("status", "ssh")],
cloud.distro.manage_service.call_args_list,
)
self.assertEqual(m_update_ssh_config.call_count, 1)
self.assertEqual(m_subp.call_count, 0)

@mock.patch(MODPATH + "update_ssh_config", return_value=True)
@mock.patch("cloudinit.distros.subp.subp")
def test_failed_ssh_service_is_not_installed(
self, m_subp, m_update_ssh_config
):
"""If the ssh service is not installed, then no updates config and
no restart.
"""
cloud = self.tmp_cloud(distro="ubuntu")
cloud.distro.init_cmd = ["systemctl"]
cloud.distro.manage_service = mock.Mock(
side_effect=subp.ProcessExecutionError(
stderr="Service is not installed.", exit_code=4
)
)

setpass.handle_ssh_pwauth(True, cloud.distro)
self.assertIn(
r"WARNING: Ignoring config 'ssh_pwauth: True'."
r" SSH service 'ssh' is not installed.",
self.logs.getvalue(),
)
self.assertEqual(
[mock.call("status", "ssh")],
cloud.distro.manage_service.call_args_list,
)
self.assertEqual(m_update_ssh_config.call_count, 0)
self.assertEqual(m_subp.call_count, 0)
assert m_subp.call_count == n

@pytest.mark.parametrize(
"raised_error,warning_log,debug_log,update_ssh_call_count",
(
(
subp.ProcessExecutionError(
stderr="Service is not running.", exit_code=3
),
"Writing config 'ssh_pwauth: True'. SSH service"
" 'ssh' will not be restarted because it is stopped.",
"Not restarting SSH service: service is stopped.",
1,
),
(
subp.ProcessExecutionError(
stderr="Service is not installed.", exit_code=4
),
"Ignoring config 'ssh_pwauth: True'. SSH service 'ssh' is"
" not installed.",
None,
0,
),
(
subp.ProcessExecutionError(
stderr="Service is not available.", exit_code=2
),
"Ignoring config 'ssh_pwauth: True'. SSH service 'ssh'"
" is not available. Error: ",
None,
0,
),
),
)
@mock.patch(MODPATH + "update_ssh_config", return_value=True)
@mock.patch("cloudinit.distros.subp.subp")
def test_failed_ssh_service_is_not_available(
self, m_subp, m_update_ssh_config
def test_no_restart_when_service_is_not_running(
self,
m_subp,
m_update_ssh_config,
raised_error,
warning_log,
debug_log,
update_ssh_call_count,
caplog,
):
"""If the ssh service is not available, then no updates config and
no restart.
"""
cloud = self.tmp_cloud(distro="ubuntu")
cloud.distro.init_cmd = ["systemctl"]
process_error = "Service is not available."
cloud.distro.manage_service = mock.Mock(
side_effect=subp.ProcessExecutionError(
stderr=process_error, exit_code=2
)
)
"""Write config but don't restart SSH service when not running."""
cloud = get_cloud("ubuntu")
cloud.distro.manage_service = mock.Mock(side_effect=raised_error)

setpass.handle_ssh_pwauth(True, cloud.distro)
self.assertIn(
r"WARNING: Ignoring config 'ssh_pwauth: True'."
r" SSH service 'ssh' is not available. Error: ",
self.logs.getvalue(),
)
self.assertIn(process_error, self.logs.getvalue())
self.assertEqual(
[mock.call("status", "ssh")],
cloud.distro.manage_service.call_args_list,
)
self.assertEqual(m_update_ssh_config.call_count, 0)
self.assertEqual(m_subp.call_count, 0)
logs_by_level = {logging.WARNING: [], logging.DEBUG: []}
for _, level, msg in caplog.record_tuples:
logs_by_level[level].append(msg)
assert warning_log in "\n".join(logs_by_level[logging.WARNING])
if debug_log:
assert debug_log in logs_by_level[logging.DEBUG]
assert [
mock.call("status", "ssh")
] == cloud.distro.manage_service.call_args_list
assert m_update_ssh_config.call_count == update_ssh_call_count
m_subp.assert_not_called()


@pytest.mark.usefixtures("mock_uses_systemd")
class TestSetPasswordsHandle(CiTestCase):
"""Test cc_set_passwords.handle"""

Expand Down Expand Up @@ -231,19 +238,21 @@ def test_handle_on_chpasswd_list_parses_common_hashes(self, m_subp):
called = chpasswd.call_args[0][1]
self.assertEqual(valid, called)

@mock.patch(MODPATH + "util.is_BSD")
@mock.patch(MODPATH + "util.is_BSD", return_value=True)
@mock.patch(MODPATH + "subp.subp")
def test_bsd_calls_custom_pw_cmds_to_set_and_expire_passwords(
self, m_subp, m_is_bsd
):
"""BSD don't use chpasswd"""
m_is_bsd.return_value = True
cloud = self.tmp_cloud(distro="freebsd")
cloud = get_cloud(distro="freebsd")
valid_pwds = ["ubuntu:passw0rd"]
cfg = {"chpasswd": {"list": valid_pwds}}
setpass.handle(
"IGNORED", cfg=cfg, cloud=cloud, log=self.logger, args=[]
)
with mock.patch.object(
cloud.distro, "uses_systemd", return_value=False
):
setpass.handle(
"IGNORED", cfg=cfg, cloud=cloud, log=self.logger, args=[]
)
self.assertEqual(
[
mock.call(
Expand All @@ -252,7 +261,7 @@ def test_bsd_calls_custom_pw_cmds_to_set_and_expire_passwords(
logstring="chpasswd for ubuntu",
),
mock.call(["pw", "usermod", "ubuntu", "-p", "01-Jan-1970"]),
mock.call(["systemctl", "status", "sshd"], capture=True),
mock.call(["service", "sshd", "status"], capture=True),
],
m_subp.call_args_list,
)
Expand Down

0 comments on commit 136ccae

Please sign in to comment.