diff --git a/convert2rhel/backup.py b/convert2rhel/backup.py index b2c4ec13bf..8d21ad2d04 100644 --- a/convert2rhel/backup.py +++ b/convert2rhel/backup.py @@ -348,6 +348,125 @@ def restore(self): super(RestorableRpmKey, self).restore() +class NewRestorableFile(RestorableChange): + def __init__(self, filepath): + super(NewRestorableFile, self).__init__() + self.filepath = filepath + + def enable(self): + """Save current version of a file""" + # Prevent multiple backup + if self.enabled: + return + + loggerinst.info("Backing up %s." % self.filepath) + if os.path.isfile(self.filepath): + try: + shutil.copy2(self.filepath, BACKUP_DIR) + loggerinst.debug("Copied %s to %s." % (self.filepath, BACKUP_DIR)) + + except (OSError, IOError) as err: + # IOError for py2 and OSError for py3 + loggerinst.critical("Error(%s): %s" % (err.errno, err.strerror)) + else: + loggerinst.info("Can't find %s.", self.filepath) + + # Set the enabled value + super(NewRestorableFile, self).enable() + + def restore(self, rollback=True): + """Restore a previously backed up file""" + if rollback: + loggerinst.task("Rollback: Restore %s from backup" % self.filepath) + else: + loggerinst.info("Restoring %s from backup" % self.filepath) + + # We do not have backup or not backed up by this + if not self.enabled: + loggerinst.info("%s hasn't been backed up." % self.filepath) + return + + backup_filepath = os.path.join(BACKUP_DIR, os.path.basename(self.filepath)) + + if not os.path.isfile(backup_filepath): + loggerinst.info("%s hasn't been backed up." % self.filepath) + return + + try: + shutil.copy2(backup_filepath, self.filepath) + except (OSError, IOError) as err: + # Do not call 'critical' which would halt the program. We are in + # a rollback phase now and we want to rollback as much as possible. + # IOError for py2 and OSError for py3 + loggerinst.warning("Error(%s): %s" % (err.errno, err.strerror)) + return + + if rollback: + loggerinst.info("File %s restored." % self.filepath) + super(NewRestorableFile, self).restore() + else: + loggerinst.debug("File %s restored." % self.filepath) + # not setting enabled to false since this is not being rollback + # restoring the backed up file for conversion purposes + + # Probably will be deprecated and unusable since using the BackupController + # Depends on specific usage of this + def remove(self): + """Remove restored file from original place, backup isn't removed""" + try: + os.remove(self.filepath) + loggerinst.debug("File %s removed." % self.filepath) + except (OSError, IOError): + loggerinst.debug("Couldn't remove restored file %s" % self.filepath) + + +class MissingFile(RestorableChange): + """ + File not present before conversion. Could be created during + conversion so should be removed in rollback. + """ + + def __init__(self, filepath): + super(MissingFile, self).__init__() + self.filepath = filepath + + def enable(self): + if self.enabled: + return + + if os.path.isfile(self.filepath): + loggerinst.debug( + "Shouldn't be called, file {filepath} is present before conversion".format(filepath=self.filepath) + ) + return + + loggerinst.info("Marking file {filepath} as missing on system.".format(filepath=self.filepath)) + super(MissingFile, self).enable() + + def restore(self): + if not self.enabled: + return + + loggerinst.task("Rollback: remove file created during conversion {filepath}".format(filepath=self.filepath)) + + if not os.path.isfile(self.filepath): + loggerinst.info("File {filepath} wasn't created during conversion".format(filepath=self.filepath)) + else: + try: + os.remove(self.filepath) + loggerinst.info("File {filepath} removed".format(filepath=self.filepath)) + except OSError as err: + # Do not call 'critical' which would halt the program. We are in + # a rollback phase now and we want to rollback as much as possible. + loggerinst.warning("Error(%s): %s" % (err.errno, err.strerror)) + return + + super(MissingFile, self).restore() + + +# Legacy class for creating the restorable file +# Can be removed after porting to new BackupController is finished +# https://issues.redhat.com/browse/RHELC-1153 class RestorableFile: def __init__(self, filepath): self.filepath = filepath diff --git a/convert2rhel/unit_tests/backup_test.py b/convert2rhel/unit_tests/backup_test.py index ba8ee28eef..bb954abb85 100644 --- a/convert2rhel/unit_tests/backup_test.py +++ b/convert2rhel/unit_tests/backup_test.py @@ -1,13 +1,14 @@ __metaclass__ = type import os +import shutil import pytest import six from convert2rhel import backup, exceptions, repo, unit_tests, utils # Imports unit_tests/__init__.py from convert2rhel.unit_tests import DownloadPkgMocked, ErrorOnRestoreRestorable, MinimalRestorable, RunSubprocessMocked -from convert2rhel.unit_tests.conftest import centos8 +from convert2rhel.unit_tests.conftest import centos7, centos8 six.add_move(six.MovedModule("mock", "mock", "unittest.mock")) @@ -656,6 +657,228 @@ def test_restore_previously_installed(self, run_subprocess_with_empty_rpmdb, rpm assert rpm_key.enabled is False +class TestNewRestorableFile: + @pytest.fixture + def get_backup_file_dir(self, tmpdir, filename="filename", content="content", backup_dir_name="backup"): + """Prepare the file for backup and backup folder""" + file_for_backup = tmpdir.join(filename) + file_for_backup.write(content) + backup_dir = tmpdir.mkdir(backup_dir_name) + return file_for_backup, backup_dir + + @pytest.mark.parametrize( + ("filename", "message_backup", "message_remove", "message_restore", "backup_exists"), + ( + ( + "filename", + "Copied {file_for_backup} to {backup_dir}.", + "File {file_for_backup} removed.", + "File {file_for_backup} restored.", + True, + ), + ( + None, + "Can't find {file_for_backup}.", + "Couldn't remove restored file {file_for_backup}", + "{file_for_backup} hasn't been backed up.", + False, + ), + ), + ) + def test_restorablefile_all( + self, + caplog, + filename, + get_backup_file_dir, + monkeypatch, + message_backup, + message_remove, + message_restore, + backup_exists, + ): + """Test the complete process of backup and restore the file using the BackupController. + Can be used as an example how to work with BackupController""" + # Prepare file and folder for backup + file_for_backup, backup_dir = get_backup_file_dir + + if filename: + # location, where the file should be after backup + backedup_file = os.path.join(str(backup_dir), filename) + else: + file_for_backup = "/invalid/path/invalid_name" + backedup_file = os.path.join(str(backup_dir), "invalid_name") + + # Format the messages which should be in output + message_backup = message_backup.format(file_for_backup=str(file_for_backup), backup_dir=str(backup_dir)) + message_restore = message_restore.format(file_for_backup=str(file_for_backup)) + message_remove = message_remove.format(file_for_backup=str(file_for_backup)) + + monkeypatch.setattr(backup, "BACKUP_DIR", str(backup_dir)) + + backup_controller = backup.BackupController() + file_backup = backup.NewRestorableFile(str(file_for_backup)) + + # Create the backup, testing method enable + backup_controller.push(file_backup) + assert message_backup in caplog.records[-1].message + assert os.path.isfile(backedup_file) == backup_exists + + # Remove the file from original place, testing method remove + file_backup.remove() + assert message_remove in caplog.records[-1].message + assert os.path.isfile(backedup_file) == backup_exists + if filename: + assert not os.path.isfile(str(file_for_backup)) + + # Restore the file + backup_controller.pop() + assert message_restore in caplog.records[-1].message + if filename: + assert os.path.isfile(str(file_for_backup)) + + @pytest.mark.parametrize( + ("filename", "enabled_preset", "enabled_value", "message", "backed_up"), + ( + ("filename", False, True, "Copied {file_for_backup} to {backup_dir}.", True), + (None, False, True, "Can't find {file_for_backup}.", False), + ("filename", True, True, "", False), + ), + ) + def test_restorablefile_enable( + self, + filename, + get_backup_file_dir, + monkeypatch, + enabled_preset, + enabled_value, + message, + caplog, + backed_up, + ): + # Prepare file and folder for backup + file_for_backup, backup_dir = get_backup_file_dir + # Prepare path where the file should be backed up + if filename: + backedup_file = os.path.join(str(backup_dir), filename) + else: + file_for_backup = "/invalid/path/invalid_name" + backedup_file = os.path.join(str(backup_dir), "invalid_name") + + # Prepare message + message = message.format(file_for_backup=file_for_backup, backup_dir=backup_dir) + + monkeypatch.setattr(backup, "BACKUP_DIR", str(backup_dir)) + file_backup = backup.NewRestorableFile(str(file_for_backup)) + # Set the enabled value if needed, default is False + file_backup.enabled = enabled_preset + + # Run the backup + file_backup.enable() + + assert os.path.isfile(backedup_file) == backed_up + assert file_backup.enabled == enabled_value + if message: + assert message in caplog.records[-1].message + else: + assert not caplog.records + + @pytest.mark.parametrize( + ("filename", "messages", "enabled", "rollback"), + ( + ("filename", ["Rollback: Restore {orig_path} from backup", "File {orig_path} restored."], True, True), + (None, ["Rollback: Restore {orig_path} from backup", "{orig_path} hasn't been backed up."], True, True), + ( + "filename", + ["Rollback: Restore {orig_path} from backup", "{orig_path} hasn't been backed up."], + False, + True, + ), + ("filename", ["Restoring {orig_path} from backup", "File {orig_path} restored."], True, False), + ), + ) + def test_restorablefile_restore(self, tmpdir, monkeypatch, caplog, filename, messages, enabled, rollback): + backup_dir = tmpdir.mkdir("backup") + orig_path = os.path.join(str(tmpdir), "filename") + + if filename: + file_for_restore = tmpdir.join("backup/filename") + file_for_restore.write("content") + + for i, _ in enumerate(messages): + messages[i] = messages[i].format(orig_path=orig_path) + + monkeypatch.setattr(backup, "BACKUP_DIR", str(backup_dir)) + file_backup = backup.NewRestorableFile(str(orig_path)) + + file_backup.enabled = enabled + + file_backup.restore(rollback=rollback) + + for i, message in enumerate(messages): + assert message in caplog.records[i].message + if filename and enabled: + assert os.path.isfile(orig_path) + + @centos7 + def test_restorablefile_backup_ioerror(self, tmpdir, caplog, monkeypatch, pretend_os): + backup_dir = tmpdir.mkdir("backup") + orig_path = os.path.join(str(tmpdir), "filename") + file_for_restore = tmpdir.join("backup/filename") + file_for_restore.write("content") + + copy2 = mock.Mock(side_effect=OSError(2, "No such file or directory")) + + monkeypatch.setattr(shutil, "copy2", copy2) + monkeypatch.setattr(backup, "BACKUP_DIR", str(backup_dir)) + file_backup = backup.NewRestorableFile(str(orig_path)) + + file_backup.enabled = True + + file_backup.restore() + + assert "Error(2): No such file or directory" in caplog.records[-1].message + + @centos8 + def test_restorablefile_backup_oserror(self, tmpdir, caplog, monkeypatch, pretend_os): + backup_dir = tmpdir.mkdir("backup") + orig_path = os.path.join(str(tmpdir), "filename") + file_for_restore = tmpdir.join("backup/filename") + file_for_restore.write("content") + + copy2 = mock.Mock(side_effect=OSError(2, "No such file or directory")) + + monkeypatch.setattr(shutil, "copy2", copy2) + monkeypatch.setattr(backup, "BACKUP_DIR", str(backup_dir)) + file_backup = backup.NewRestorableFile(str(orig_path)) + + file_backup.enabled = True + + file_backup.restore() + + assert "Error(2): No such file or directory" in caplog.records[-1].message + + @pytest.mark.parametrize( + ("file", "filepath", "message"), + ( + (False, "/invalid/path", "Couldn't remove restored file /invalid/path"), + (True, "filename", "File %s removed."), + ), + ) + def test_newrestorable_file_remove(self, tmpdir, caplog, file, filepath, message): + if file: + path = tmpdir.join(filepath) + path.write("content") + path = str(path) + message = message % path + else: + path = filepath + + restorable_file = backup.NewRestorableFile(path) + restorable_file.remove() + + assert message in caplog.text + + @pytest.mark.parametrize( ("pkg_nevra", "nvra_without_epoch"), ( @@ -689,3 +912,108 @@ def test_restorable_file_remove(tmpdir, caplog, file, filepath, message): restorable_file.remove() assert message in caplog.text + + +class TestMissingFile: + @pytest.mark.parametrize( + ("exists", "expected", "message"), + ( + (True, False, "Shouldn't be called, file {filepath} is present before conversion"), + (False, True, "Marking file {filepath} as missing on system."), + ), + ) + def test_created_file_enable(self, exists, expected, tmpdir, caplog, message): + path = tmpdir.join("filename") + + if exists: + path.write("content") + + created_file = backup.MissingFile(str(path)) + + created_file.enable() + + assert created_file.enabled == expected + assert message.format(filepath=str(path)) == caplog.records[-1].message + + @pytest.mark.parametrize( + ("exists", "enabled", "message"), + ( + (True, True, "File {filepath} removed"), + (True, False, None), + (False, True, "File {filepath} wasn't created during conversion"), + ), + ) + def test_created_file_restore(self, tmpdir, exists, enabled, message, caplog): + path = tmpdir.join("filename") + + if exists: + path.write("content") + + created_file = backup.MissingFile(str(path)) + created_file.enabled = enabled + + created_file.restore() + + if enabled and exists: + assert not exists == os.path.isfile(str(path)) + else: + assert exists == os.path.isfile(str(path)) + + if enabled: + assert message.format(filepath=str(path)) == caplog.records[-1].message + else: + assert not caplog.records + + def test_created_file_restore_oserror(self, monkeypatch, tmpdir, caplog): + path = tmpdir.join("filename") + path.write("content") + + remove = mock.Mock(side_effect=OSError(2, "No such file or directory")) + monkeypatch.setattr(os, "remove", remove) + + created_file = backup.MissingFile(str(path)) + created_file.enabled = True + + created_file.restore() + + assert "Error(2): No such file or directory" in caplog.records[-1].message + + @pytest.mark.parametrize( + ("exists", "created", "message_push", "message_pop"), + ( + (False, True, "Marking file {filepath} as missing on system.", "File {filepath} removed"), + (True, False, "Shouldn't be called, file {filepath} is present before conversion", None), + ( + False, + False, + "Marking file {filepath} as missing on system.", + "File {filepath} wasn't created during conversion", + ), + ), + ) + def test_created_file_all(self, tmpdir, exists, message_push, message_pop, caplog, created): + path = tmpdir.join("filename") + + if exists: + # exists before conversion + path.write("content") + + backup_controller = backup.BackupController() + created_file = backup.MissingFile(str(path)) + + backup_controller.push(created_file) + + assert message_push.format(filepath=str(path)) == caplog.records[-1].message + + if created: + # created during conversion the file + path.write("content") + + backup_controller.pop() + + if message_pop: + assert message_pop.format(filepath=str(path)) == caplog.records[-1].message + if exists: + assert os.path.isfile(str(path)) + else: + assert not os.path.isfile(str(path))