From 277995489ca9fa911df89f0d2d13ed5a978dae5e Mon Sep 17 00:00:00 2001 From: Martin Vrachev Date: Tue, 25 May 2021 18:09:12 +0300 Subject: [PATCH] New API: Comprehensive serialization testing The idea of this commit is to separate (de)serialization testing outside test_api.py and make sure we are testing from_dict/to_dict for all possible valid data for all classes. As a result of this commit, there will be a single place where one can have a quick look at what use-cases are we testing for (de)serialization. Signed-off-by: Martin Vrachev --- tests/test_api.py | 135 +-------------- tests/test_metadata_serilization.py | 259 ++++++++++++++++++++++++++++ 2 files changed, 261 insertions(+), 133 deletions(-) create mode 100644 tests/test_metadata_serilization.py diff --git a/tests/test_api.py b/tests/test_api.py index 7d00a85720..8bdb73a6d7 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -260,27 +260,6 @@ def test_metafile_class(self): self.assertEqual(metafile_obj.to_dict(), data) - def test_targetfile_class(self): - # Test from_dict and to_dict with all attributes. - data = { - "custom": { - "file_permissions": "0644" - }, - "hashes": { - "sha256": "65b8c67f51c993d898250f40aa57a317d854900b3a04895464313e48785440da", - "sha512": "467430a68afae8e9f9c0771ea5d78bf0b3a0d79a2d3d3b40c69fde4dd42c461448aef76fcef4f5284931a1ffd0ac096d138ba3a0d6ca83fa8d7285a47a296f77" - }, - "length": 31 - } - targetfile_obj = TargetFile.from_dict(copy.copy(data)) - self.assertEqual(targetfile_obj.to_dict(), data) - - # Test from_dict and to_dict without custom. - del data["custom"] - targetfile_obj = TargetFile.from_dict(copy.copy(data)) - self.assertEqual(targetfile_obj.to_dict(), data) - - def test_metadata_snapshot(self): snapshot_path = os.path.join( self.repo_dir, 'metadata', 'snapshot.json') @@ -299,13 +278,6 @@ def test_metadata_snapshot(self): snapshot.signed.meta['role1.json'].to_dict(), fileinfo.to_dict() ) - # Test from_dict and to_dict without hashes and length. - snapshot_dict = snapshot.to_dict() - del snapshot_dict['signed']['meta']['role1.json']['length'] - del snapshot_dict['signed']['meta']['role1.json']['hashes'] - test_dict = copy.deepcopy(snapshot_dict['signed']) - snapshot = Snapshot.from_dict(test_dict) - self.assertEqual(snapshot_dict['signed'], snapshot.to_dict()) def test_metadata_timestamp(self): timestamp_path = os.path.join( @@ -344,13 +316,6 @@ def test_metadata_timestamp(self): timestamp.signed.meta['snapshot.json'].to_dict(), fileinfo.to_dict() ) - # Test from_dict and to_dict without hashes and length. - timestamp_dict = timestamp.to_dict() - del timestamp_dict['signed']['meta']['snapshot.json']['length'] - del timestamp_dict['signed']['meta']['snapshot.json']['hashes'] - test_dict = copy.deepcopy(timestamp_dict['signed']) - timestamp_test = Timestamp.from_dict(test_dict) - self.assertEqual(timestamp_dict['signed'], timestamp_test.to_dict()) def test_key_class(self): keys = { @@ -363,21 +328,12 @@ def test_key_class(self): }, } for key_dict in keys.values(): - # Testing that the workflow of deserializing and serializing - # a key dictionary doesn't change the content. - test_key_dict = key_dict.copy() - key_obj = Key.from_dict("id", test_key_dict) - self.assertEqual(key_dict, key_obj.to_dict()) # Test creating an instance without a required attribute. for key in key_dict.keys(): test_key_dict = key_dict.copy() del test_key_dict[key] with self.assertRaises(KeyError): - Key.from_dict("id", test_key_dict) - # Test creating a Key instance with wrong keyval format. - key_dict["keyval"] = {} - with self.assertRaises(ValueError): - Key.from_dict("id", key_dict) + Key.from_dict(test_key_dict) def test_role_class(self): @@ -396,23 +352,12 @@ def test_role_class(self): }, } for role_dict in roles.values(): - # Testing that the workflow of deserializing and serializing - # a role dictionary doesn't change the content. - test_role_dict = role_dict.copy() - role_obj = Role.from_dict(test_role_dict) - self.assertEqual(role_dict, role_obj.to_dict()) # Test creating an instance without a required attribute. for role_attr in role_dict.keys(): test_role_dict = role_dict.copy() del test_role_dict[role_attr] with self.assertRaises(KeyError): - Key.from_dict("id", test_role_dict) - # Test creating a Role instance with keyid dublicates. - # for keyid in role_dict["keyids"]: - role_dict["keyids"].append(role_dict["keyids"][0]) - test_role_dict = role_dict.copy() - with self.assertRaises(ValueError): - Role.from_dict(test_role_dict) + Key.from_dict(test_role_dict) def test_metadata_root(self): @@ -459,84 +404,8 @@ def test_metadata_root(self): with self.assertRaises(KeyError): root.signed.remove_key('root', 'nosuchkey') - # Test serializing and deserializing without consistent_snapshot. - root_dict = root.to_dict() - del root_dict["signed"]["consistent_snapshot"] - root = Root.from_dict(copy.deepcopy(root_dict["signed"])) - self.assertEqual(root_dict["signed"], root.to_dict()) - - def test_delegated_role_class(self): - roles = [ - { - "keyids": [ - "c8022fa1e9b9cb239a6b362bbdffa9649e61ad2cb699d2e4bc4fdf7930a0e64a" - ], - "name": "role1", - "paths": [ - "file3.txt" - ], - "terminating": False, - "threshold": 1 - } - ] - for role in roles: - # Testing that the workflow of deserializing and serializing - # a delegation role dictionary doesn't change the content. - key_obj = DelegatedRole.from_dict(role.copy()) - self.assertEqual(role, key_obj.to_dict()) - - # Test creating a DelegatedRole object with both "paths" and - # "path_hash_prefixes" set. - role["path_hash_prefixes"] = "foo" - with self.assertRaises(ValueError): - DelegatedRole.from_dict(role.copy()) - - # Test creating DelegatedRole only with "path_hash_prefixes" (an empty one) - del role["paths"] - role["path_hash_prefixes"] = [] - role_obj = DelegatedRole.from_dict(role.copy()) - self.assertEqual(role_obj.to_dict(), role) - - # Test creating DelegatedRole only with "paths" (now an empty one) - del role["path_hash_prefixes"] - role["paths"] = [] - role_obj = DelegatedRole.from_dict(role.copy()) - self.assertEqual(role_obj.to_dict(), role) - - # Test creating DelegatedRole without "paths" and - # "path_hash_prefixes" set - del role["paths"] - role_obj = DelegatedRole.from_dict(role.copy()) - self.assertEqual(role_obj.to_dict(), role) - def test_delegation_class(self): - roles = [ - { - "keyids": [ - "c8022fa1e9b9cb239a6b362bbdffa9649e61ad2cb699d2e4bc4fdf7930a0e64a" - ], - "name": "role1", - "paths": [ - "file3.txt" - ], - "terminating": False, - "threshold": 1 - } - ] - keys = { - "59a4df8af818e9ed7abe0764c0b47b4240952aa0d179b5b78346c470ac30278d":{ - "keytype": "ed25519", - "keyval": { - "public": "edcd0a32a07dce33f7c7873aaffbff36d20ea30787574ead335eefd337e4dacd" - }, - "scheme": "ed25519" - }, - } - delegations_dict = {"keys": keys, "roles": roles} - delegations = Delegations.from_dict(copy.deepcopy(delegations_dict)) - self.assertEqual(delegations_dict, delegations.to_dict()) - # empty keys and roles delegations_dict = {"keys":{}, "roles":[]} delegations = Delegations.from_dict(delegations_dict.copy()) diff --git a/tests/test_metadata_serilization.py b/tests/test_metadata_serilization.py new file mode 100644 index 0000000000..2b3b05cdeb --- /dev/null +++ b/tests/test_metadata_serilization.py @@ -0,0 +1,259 @@ +# Copyright New York University and the TUF contributors +# SPDX-License-Identifier: MIT OR Apache-2.0 + +""" Unit tests testing tuf/api/metadata.py classes +serialization and deserialization. + +""" + +import json +import sys +import logging +import os +import shutil +import tempfile +import unittest +import copy + +from typing import Any, Dict, List + +from tests import utils + +from tuf.api.metadata import ( + Metadata, + Root, + Snapshot, + Timestamp, + Targets, + Key, + Role, + MetaFile, + TargetFile, + Delegations, + DelegatedRole, +) + +logger = logging.getLogger(__name__) + +def copy_recur(input_dict: Dict[str, Any], ignore_attr_list: List[str]): + """Recursivly make a deep copy of the input_dict ignoring the attributes in + ignore_attr_list.""" + res_dict = {} + # Iterate over all items in the dict. + # If an item is a dict, recursivly call copy_recur to iterate its items, + # otherwise save it in res_dict. + for valname, value in input_dict.items(): + if valname in ignore_attr_list: + continue + if isinstance(value, dict): + res_dict[valname] = copy_recur(value, ignore_attr_list) + else: + res_dict[valname] = value + + return res_dict + + +def _get_metadata_dict(tmp_directory, metadata): + metadata_path = os.path.join(tmp_directory, "metadata", metadata + ".json") + with open(metadata_path) as f: + data = json.loads(f.read()) + return data["signed"] + + +class TestSerialization(unittest.TestCase): + + @classmethod + def setUpClass(cls): + # Create a temporary directory to store the repository, metadata, and + # target files. 'temporary_directory' must be deleted in + # tearDownClass() so that temporary files are always removed, even when + # exceptions occur. + cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) + + test_repo_data = os.path.join( + os.path.dirname(os.path.realpath(__file__)), 'repository_data') + + cls.repo_dir = os.path.join(cls.temporary_directory, 'repository') + shutil.copytree( + os.path.join(test_repo_data, 'repository'), cls.repo_dir) + + # Preparare root valid cases + root = _get_metadata_dict(cls.repo_dir, "root") + cls.valid_root_cases = { + "all_attributes": root, + "no_consistent_snapshot": copy_recur(root, ["consistent_snapshot"]) + } + # Preparare timestamp valid cases + cls.valid_timestamp_cases = { + "all_attributes": _get_metadata_dict(cls.repo_dir, "timestamp") + } + # Preparare snapshot valid cases + cls.valid_snapshot_cases = { + "all_attributes": _get_metadata_dict(cls.repo_dir, "snapshot") + } + # Preparare targets valid cases + targets = _get_metadata_dict(cls.repo_dir, "targets") + empty_targets = copy.deepcopy(targets) + empty_targets["targets"] = {} + cls.valid_targets_cases = { + "all_attributes": targets, + "empty_targets": empty_targets, + "no_delegations": copy_recur(targets, ["delegations"]) + } + + @classmethod + def tearDownClass(cls): + # Remove the temporary repository directory, which should contain all + # the metadata, targets, and key files generated for the test cases. + shutil.rmtree(cls.temporary_directory) + + + def test_key_serialization(self): + keys = self.valid_root_cases["all_attributes"]["keys"] + valid_key_cases = { + "all_attributes": keys + } + for case, data in valid_key_cases.items(): + with self.subTest(case=case): + for key_dict in data.values(): + key_obj = Key.from_dict(copy.deepcopy(key_dict)) + self.assertDictEqual(key_obj.to_dict(), key_dict) + + # Test creating a Key instance with wrong keyval format. + tmp_dict = copy.deepcopy(key_dict) + tmp_dict["keyval"] = {} + with self.assertRaises(ValueError): + Key.from_dict(tmp_dict) + + + def test_role_serialization(self): + roles = self.valid_root_cases["all_attributes"]["roles"] + valid_role_cases = { + "all_attributes": roles, + } + for case, data in valid_role_cases.items(): + with self.subTest(case=case): + for role_dict in data.values(): + role_obj = Role.from_dict(copy.deepcopy(role_dict)) + self.assertDictEqual(role_obj.to_dict(), role_dict) + + # Test creating a Role instance with keyid dublicates. + # for keyid in role_dict["keyids"]: + test_dict = copy.deepcopy(role_dict) + test_dict["keyids"].append(test_dict["keyids"][0]) + with self.assertRaises(ValueError): + Role.from_dict(test_dict) + + + def test_root_serialization(self): + for case, data in self.valid_root_cases.items(): + with self.subTest(case=case): + root = Root.from_dict(copy.deepcopy(data)) + self.assertDictEqual(root.to_dict(), data) + + def test_metafile_serialization(self): + meta = self.valid_timestamp_cases["all_attributes"]["meta"] + metafiles = meta["snapshot.json"] + valid_meta_file_cases = { + "all_attributes": metafiles, + "no_length": copy_recur(metafiles, ["length"]), + "no_hashes": copy_recur(metafiles, ["hashes"]) + } + for case, data in valid_meta_file_cases.items(): + with self.subTest(case=case): + metafile = MetaFile.from_dict(copy.deepcopy(data)) + self.assertDictEqual(metafile.to_dict(), data) + + def test_timestamp_serialization(self): + for case, data in self.valid_timestamp_cases.items(): + with self.subTest(case=case): + timestamp = Timestamp.from_dict(copy.deepcopy(data)) + self.assertDictEqual(timestamp.to_dict(), data) + + def test_snapshot_serialization(self): + for case, data in self.valid_snapshot_cases.items(): + with self.subTest(case=case): + snapshot = Snapshot.from_dict(copy.deepcopy(data)) + self.assertDictEqual(snapshot.to_dict(), data) + + def test_delegation_serialization(self): + delegations = self.valid_targets_cases["all_attributes"]["delegations"] + valid_delegation_cases = { + "all_attributes": delegations + } + for case, data in valid_delegation_cases.items(): + with self.subTest(case=case): + delegations = Delegations.from_dict(copy.deepcopy(data)) + self.assertDictEqual(delegations.to_dict(), data) + + + def _setup_delegated_role( + self, + delegations: Dict[str, Any], + ignore_attr: List[str] + ): + delegated_roles = delegations["roles"] + # Delegated roles is a list of dictionaries. + # That's why when setting up a new valid case we have to iterate + # through the different roles in the list. + delegated_roles_res = [] + for role in delegated_roles: + delegated_roles_res.append(copy_recur(role, ignore_attr)) + return delegated_roles_res + + + def test_delegated_role_serialization(self): + delegations = self.valid_targets_cases["all_attributes"]["delegations"] + delegated_roles = delegations["roles"] + delegated_roles_no_paths = self._setup_delegated_role( + delegations, "paths" + ) + delegated_roles_no_path_hash_prefixes = self._setup_delegated_role( + delegations, ["path_hash_prefixes"] + ) + + for role in delegated_roles_no_path_hash_prefixes: + role["paths"] = "foo" + delegated_no_optional_attr = self._setup_delegated_role( + delegations, ["paths", "path_hash_prefixes"] + ) + valid_delegatedrole_cases = { + "all_attributes": delegated_roles, + "no_paths": delegated_roles_no_paths, + "no_path_hash_prefixes": delegated_roles_no_path_hash_prefixes, + "no_optional_attributes": delegated_no_optional_attr + } + for case, data in valid_delegatedrole_cases.items(): + for role_d in data: + with self.subTest(case=case): + delegatedroles = DelegatedRole.from_dict( + copy.deepcopy(role_d) + ) + self.assertDictEqual(delegatedroles.to_dict(), role_d) + + + def test_targetfile_serialization(self): + targets = self.valid_targets_cases["all_attributes"]["targets"] + valid_targetfile_cases = { + "all_attributes": targets, + "targetfiles_no_custom": copy_recur(targets, ["custom"]) + } + for case, data in valid_targetfile_cases.items(): + for targetfile in data.values(): + with self.subTest(case=case): + targetfile_obj = TargetFile.from_dict( + copy.deepcopy(targetfile) + ) + self.assertDictEqual(targetfile_obj.to_dict(), targetfile) + + + def test_targets_serialization(self): + for case, data in self.valid_targets_cases.items(): + with self.subTest(case=case): + targets = Targets.from_dict(copy.deepcopy(data)) + self.assertDictEqual(targets.to_dict(), data) + +# Run unit test. +if __name__ == '__main__': + utils.configure_test_logging(sys.argv) + unittest.main()