From e8d8e84a847ea5fa48d0d0347ef26728a85d3517 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Mon, 18 May 2020 16:37:50 +0300 Subject: [PATCH 1/7] Load correctly the delegated Targets objects hierarchy Update load_repository() function to load the delegations metadata starting from 'targets' and traversing downwards the delegated roles in order to load correctly the delegations hierarchy. Signed-off-by: Teodora Sechkova --- tuf/repository_lib.py | 53 +++++++++++++++++++++++++ tuf/repository_tool.py | 89 ++++++++++++++++++------------------------ 2 files changed, 92 insertions(+), 50 deletions(-) mode change 100755 => 100644 tuf/repository_lib.py diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py old mode 100755 new mode 100644 index 0966b1f4b9..174d14f580 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -811,6 +811,59 @@ def import_ed25519_privatekey_from_file(filepath, password=None): return private_key + +def get_delegations_filenames(metadata_directory, consistent_snapshot, + storage_backend=None): + """ + Return a dictionary containing all filenames in 'metadata_directory' + except the top-level roles. + If multiple versions of a file exist because of a consistent snapshot, + only the file with biggest version prefix is included. + """ + + filenames = {} + loaded_metadata = [] + metadata_files = sorted(storage_backend.list_folder(metadata_directory), + reverse=True) + for metadata_role in metadata_files: + metadata_path = os.path.join(metadata_directory, metadata_role) + + # Strip the version number if 'consistent_snapshot' is True, + # or if 'metadata_role' is Root. + # Example: '10.django.json' --> 'django.json' + consistent_snapshot = \ + metadata_role.endswith('root.json') or consistent_snapshot == True + metadata_name, junk = _strip_version_number(metadata_role, + consistent_snapshot) + + if metadata_name.endswith(METADATA_EXTENSION): + extension_length = len(METADATA_EXTENSION) + metadata_name = metadata_name[:-extension_length] + + else: + logger.debug('Skipping file with unsupported metadata' + ' extension: ' + repr(metadata_path)) + continue + + # Skip top-level roles, only interested in delegated roles now that the + # top-level roles have already been loaded. + if metadata_name in ['root', 'snapshot', 'targets', 'timestamp']: + continue + + # Keep a store of metadata previously loaded metadata to prevent re-loading + # duplicate versions. Duplicate versions may occur with + # 'consistent_snapshot', where the same metadata may be available in + # multiples files (the different hash is included in each filename). + if metadata_name in loaded_metadata: + continue + + filenames[metadata_name] = metadata_path + loaded_metadata.append(metadata_name) + + return filenames + + + def get_metadata_filenames(metadata_directory): """ diff --git a/tuf/repository_tool.py b/tuf/repository_tool.py index a11816e9df..44fd5c0cba 100755 --- a/tuf/repository_tool.py +++ b/tuf/repository_tool.py @@ -2393,7 +2393,7 @@ def delegate(self, rolename, public_keys, paths, threshold=1, self._parent_targets_object.add_delegated_role(rolename, new_targets_object) - # Add 'new_targets_object' to the 'targets' role object (this object). + # Add 'new_targets_object' to the delegating role object (this object). self.add_delegated_role(rolename, new_targets_object) # Update the 'delegations' field of the current role. @@ -3104,45 +3104,24 @@ def load_repository(repository_directory, repository_name='default', # extracted fileinfo is stored in the 'meta' field of the snapshot metadata # object. targets_objects = {} - loaded_metadata = [] targets_objects['targets'] = repository.targets + # A list of delegated-delegating role pairs + delegations = [] - metadata_files = sorted(storage_backend.list_folder(metadata_directory), - reverse=True) - for metadata_role in metadata_files: + delegations_filenames = repo_lib.get_delegations_filenames(metadata_directory, + consistent_snapshot, storage_backend) - metadata_path = os.path.join(metadata_directory, metadata_role) - metadata_name = \ - metadata_path[len(metadata_directory):].lstrip(os.path.sep) + # Top-level roles are already loaded, fetch targets and get its delegations. + # Collect a list of delegated-delegating role pairs, starting from the + # top-level targets: [('role1', 'targets'), ('role2', 'targets'), ... ] + roleinfo = tuf.roledb.get_roleinfo('targets', repository_name) + for role in roleinfo['delegations']['roles']: + delegations.append([role['name'], 'targets']) - # Strip the version number if 'consistent_snapshot' is True, - # or if 'metadata_role' is Root. - # Example: '10.django.json' --> 'django.json' - consistent_snapshot = \ - metadata_role.endswith('root.json') or consistent_snapshot == True - metadata_name, junk = repo_lib._strip_version_number(metadata_name, - consistent_snapshot) - - if metadata_name.endswith(METADATA_EXTENSION): - extension_length = len(METADATA_EXTENSION) - metadata_name = metadata_name[:-extension_length] - - else: - logger.debug('Skipping file with unsupported metadata' - ' extension: ' + repr(metadata_path)) - continue - - # Skip top-level roles, only interested in delegated roles now that the - # top-level roles have already been loaded. - if metadata_name in ['root', 'snapshot', 'targets', 'timestamp']: - continue - - # Keep a store of metadata previously loaded metadata to prevent re-loading - # duplicate versions. Duplicate versions may occur with - # 'consistent_snapshot', where the same metadata may be available in - # multiples files (the different hash is included in each filename. - if metadata_name in loaded_metadata: - continue + # Load the delegated roles by starting from 'targets' and continuously + # appending the next level delegations to the list + for rolename, delegating_role in delegations: + metadata_path = delegations_filenames[rolename] signable = None @@ -3156,9 +3135,9 @@ def load_repository(repository_directory, repository_name='default', metadata_object = signable['signed'] - # Extract the metadata attributes of 'metadata_name' and update its + # Extract the metadata attributes of 'metadata_object' and update its # corresponding roleinfo. - roleinfo = {'name': metadata_name, + roleinfo = {'name': rolename, 'signing_keyids': [], 'signatures': [], 'partial_loaded': False @@ -3170,18 +3149,26 @@ def load_repository(repository_directory, repository_name='default', roleinfo['paths'] = metadata_object['targets'] roleinfo['delegations'] = metadata_object['delegations'] - tuf.roledb.add_role(metadata_name, roleinfo, repository_name) - loaded_metadata.append(metadata_name) + tuf.roledb.add_role(rolename, roleinfo, repository_name) + + # Generate the Targets object of the delegated role, + # add it to the top-level 'targets' object and to its + # direct delegating role object. + new_targets_object = Targets(targets_directory, rolename, + roleinfo, parent_targets_object=targets_objects['targets'], + repository_name=repository_name) + + targets_objects[rolename] = new_targets_object - # Generate the Targets objects of the delegated roles of 'metadata_name' - # and add it to the top-level 'targets' object. - new_targets_object = Targets(targets_directory, metadata_name, roleinfo, - repository_name=repository_name) - targets_object = targets_objects['targets'] - targets_objects[metadata_name] = new_targets_object + targets_objects['targets'].add_delegated_role(rolename, + new_targets_object) + targets_objects[delegating_role].add_delegated_role(rolename, + new_targets_object) - targets_object._delegated_roles[(os.path.basename(metadata_name))] = \ - new_targets_object + # Append the next level delegations to the list: + # the 'delegated' role becomes the 'delegating' + for delegation in metadata_object['delegations']['roles']: + delegations.append([delegation['name'], rolename]) # Extract the keys specified in the delegations field of the Targets # role. Add 'key_object' to the list of recognized keys. Keys may be @@ -3196,8 +3183,10 @@ def load_repository(repository_directory, repository_name='default', # that doesn't match the client's set of hash algorithms. Make sure # to only used the repo's selected hashing algorithms. hash_algorithms = securesystemslib.settings.HASH_ALGORITHMS - securesystemslib.settings.HASH_ALGORITHMS = key_metadata['keyid_hash_algorithms'] - key_object, keyids = securesystemslib.keys.format_metadata_to_key(key_metadata) + securesystemslib.settings.HASH_ALGORITHMS = \ + key_metadata['keyid_hash_algorithms'] + key_object, keyids = \ + securesystemslib.keys.format_metadata_to_key(key_metadata) securesystemslib.settings.HASH_ALGORITHMS = hash_algorithms try: for keyid in keyids: # pragma: no branch From 73bff87b6813f56d09edaf672dfe089fb61cfa06 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Wed, 10 Jun 2020 17:15:45 +0300 Subject: [PATCH 2/7] Update test_load_repository Add a tests case checking if delegated Targets() objects are loaded correctly. Signed-off-by: Teodora Sechkova --- tests/test_repository_tool.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/test_repository_tool.py b/tests/test_repository_tool.py index ae3c1dfd53..f314ff4a02 100755 --- a/tests/test_repository_tool.py +++ b/tests/test_repository_tool.py @@ -2053,6 +2053,10 @@ def test_load_repository(self): repository = repo_tool.load_repository(repository_directory) self.assertTrue(isinstance(repository, repo_tool.Repository)) + self.assertTrue(isinstance(repository.targets('role1'), + repo_tool.Targets)) + self.assertTrue(isinstance(repository.targets('role1')('role2'), + repo_tool.Targets)) # Verify the expected roles have been loaded. See # 'tuf/tests/repository_data/repository/'. From da09a2286175a13e52babde614f122278eeb27b4 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Wed, 24 Jun 2020 13:15:27 +0300 Subject: [PATCH 3/7] Improve get_delegations_filenames performance and readability Remove unnecessary list keeping track of loaded file names and rewrite outdated comments. Signed-off-by: Teodora Sechkova --- tuf/repository_lib.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index 174d14f580..de088aa462 100644 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -822,9 +822,13 @@ def get_delegations_filenames(metadata_directory, consistent_snapshot, """ filenames = {} - loaded_metadata = [] metadata_files = sorted(storage_backend.list_folder(metadata_directory), reverse=True) + + # Iterate over role metadata files, sorted by their version-number prefix, with + # more recent versions first, and only add the most recent version of any + # (non top-level) metadata to the list of returned filenames. Note that there + # should only be one version of each file, if consistent_snapshot is False. for metadata_role in metadata_files: metadata_path = os.path.join(metadata_directory, metadata_role) @@ -845,20 +849,13 @@ def get_delegations_filenames(metadata_directory, consistent_snapshot, ' extension: ' + repr(metadata_path)) continue - # Skip top-level roles, only interested in delegated roles now that the - # top-level roles have already been loaded. + # Skip top-level roles, only interested in delegated roles. if metadata_name in ['root', 'snapshot', 'targets', 'timestamp']: continue - # Keep a store of metadata previously loaded metadata to prevent re-loading - # duplicate versions. Duplicate versions may occur with - # 'consistent_snapshot', where the same metadata may be available in - # multiples files (the different hash is included in each filename). - if metadata_name in loaded_metadata: - continue - - filenames[metadata_name] = metadata_path - loaded_metadata.append(metadata_name) + # Prevent reloading duplicate versions if consistent_snapshot is True + if metadata_name not in filenames: + filenames[metadata_name] = metadata_path return filenames From f1a66760845dc89286eeff28fca18c0d8b37ddce Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Wed, 24 Jun 2020 13:34:05 +0300 Subject: [PATCH 4/7] Improve delegated roles loading in load_repository() Replace the list used for the delegations graph traversal with a deque and use a set to store already loaded roles and avoid loops in case of cycles in the graph. Improve comments and readability. Signed-off-by: Teodora Sechkova --- tuf/repository_tool.py | 57 +++++++++++++++++++++++++++++------------- 1 file changed, 39 insertions(+), 18 deletions(-) diff --git a/tuf/repository_tool.py b/tuf/repository_tool.py index 44fd5c0cba..e85d0fbbf7 100755 --- a/tuf/repository_tool.py +++ b/tuf/repository_tool.py @@ -39,6 +39,8 @@ import shutil import json +from collections import deque + import tuf import tuf.formats import tuf.roledb @@ -3100,29 +3102,50 @@ def load_repository(repository_directory, repository_name='default', repository, consistent_snapshot = repo_lib._load_top_level_metadata(repository, filenames, repository_name) - # Load the delegated targets metadata and generate their fileinfo. The - # extracted fileinfo is stored in the 'meta' field of the snapshot metadata - # object. + delegated_roles_filenames = repo_lib.get_delegations_filenames( + metadata_directory, consistent_snapshot, storage_backend) + + # Load the delegated targets metadata and their fileinfo. + # The delegated targets roles form a tree/graph which is traversed in a + # breadth-first-search manner starting from 'targets' in order to correctly + # load the delegations hierarchy. targets_objects = {} targets_objects['targets'] = repository.targets - # A list of delegated-delegating role pairs - delegations = [] - delegations_filenames = repo_lib.get_delegations_filenames(metadata_directory, - consistent_snapshot, storage_backend) + # Keep the next delegations to be loaded in a deque structure which + # has the properties of a list but is designed to have fast appends + # and pops from both ends + delegations = deque() + # A set used to keep the already loaded delegations and avoid an infinite + # loop in case of cycles in the delegations graph + loaded_delegations = set() # Top-level roles are already loaded, fetch targets and get its delegations. - # Collect a list of delegated-delegating role pairs, starting from the - # top-level targets: [('role1', 'targets'), ('role2', 'targets'), ... ] + # Store the delegations in the form of delegated-delegating role tuples, + # starting from the top-level targets: + # [('role1', 'targets'), ('role2', 'targets'), ... ] roleinfo = tuf.roledb.get_roleinfo('targets', repository_name) for role in roleinfo['delegations']['roles']: - delegations.append([role['name'], 'targets']) + delegations.append((role['name'], 'targets')) + + # Traverse the graph by appending the next delegation to the deque and + # 'pop'-ing and loading the left-most element. + while delegations: + rolename, delegating_role = delegations.popleft() + if (rolename, delegating_role) in loaded_delegations: + logger.warning('Detected cycle in the delegation graph: ' + + repr(delegating_role) + ' -> ' + + repr(rolename) + + ' is reached more than once.') + continue - # Load the delegated roles by starting from 'targets' and continuously - # appending the next level delegations to the list - for rolename, delegating_role in delegations: - metadata_path = delegations_filenames[rolename] + # Instead of adding only rolename to the set, store the already loaded + # delegated-delegating role tuples. This way a delegated role is added + # to each of its delegating roles but when the role is reached twice + # from the same delegating role an infinite loop is avoided. + loaded_delegations.add((rolename, delegating_role)) + metadata_path = delegated_roles_filenames[rolename] signable = None try: @@ -3149,8 +3172,6 @@ def load_repository(repository_directory, repository_name='default', roleinfo['paths'] = metadata_object['targets'] roleinfo['delegations'] = metadata_object['delegations'] - tuf.roledb.add_role(rolename, roleinfo, repository_name) - # Generate the Targets object of the delegated role, # add it to the top-level 'targets' object and to its # direct delegating role object. @@ -3165,10 +3186,10 @@ def load_repository(repository_directory, repository_name='default', targets_objects[delegating_role].add_delegated_role(rolename, new_targets_object) - # Append the next level delegations to the list: + # Append the next level delegations to the deque: # the 'delegated' role becomes the 'delegating' for delegation in metadata_object['delegations']['roles']: - delegations.append([delegation['name'], rolename]) + delegations.append((delegation['name'], rolename)) # Extract the keys specified in the delegations field of the Targets # role. Add 'key_object' to the list of recognized keys. Keys may be From 97eff9e1cb9583f14791aff344d0b82cee6fcb6d Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Mon, 29 Jun 2020 13:33:21 +0300 Subject: [PATCH 5/7] Reference loaded delegated targets objects by top-level targets Use the top-level targets object to reference already loaded delegated targets instead of storing them in an additional dictionary in load_repository(). Signed-off-by: Teodora Sechkova --- tuf/repository_tool.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/tuf/repository_tool.py b/tuf/repository_tool.py index e85d0fbbf7..960322794b 100755 --- a/tuf/repository_tool.py +++ b/tuf/repository_tool.py @@ -3109,8 +3109,7 @@ def load_repository(repository_directory, repository_name='default', # The delegated targets roles form a tree/graph which is traversed in a # breadth-first-search manner starting from 'targets' in order to correctly # load the delegations hierarchy. - targets_objects = {} - targets_objects['targets'] = repository.targets + parent_targets_object = repository.targets # Keep the next delegations to be loaded in a deque structure which # has the properties of a list but is designed to have fast appends @@ -3176,15 +3175,14 @@ def load_repository(repository_directory, repository_name='default', # add it to the top-level 'targets' object and to its # direct delegating role object. new_targets_object = Targets(targets_directory, rolename, - roleinfo, parent_targets_object=targets_objects['targets'], + roleinfo, parent_targets_object=parent_targets_object, repository_name=repository_name) - targets_objects[rolename] = new_targets_object - - targets_objects['targets'].add_delegated_role(rolename, - new_targets_object) - targets_objects[delegating_role].add_delegated_role(rolename, + parent_targets_object.add_delegated_role(rolename, new_targets_object) + if delegating_role != 'targets': + parent_targets_object(delegating_role).add_delegated_role(rolename, + new_targets_object) # Append the next level delegations to the deque: # the 'delegated' role becomes the 'delegating' From a69208c1c762e3222b49432361d75d3c53dbc442 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Wed, 24 Jun 2020 13:40:27 +0300 Subject: [PATCH 6/7] Rename get_*_metadata_filenames functions Rename repository_lib.get_metadata_filenames() and get_delegations_filenames() to better match their functionality and tuf terminology. Signed-off-by: Teodora Sechkova --- tests/test_repository_lib.py | 13 ++++++++----- tuf/repository_lib.py | 8 ++++---- tuf/repository_tool.py | 4 ++-- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index 9e07b0be25..53ca0000d9 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -201,7 +201,7 @@ def test_import_ed25519_privatekey_from_file(self): - def test_get_metadata_filenames(self): + def test_get_top_level_metadata_filenames(self): # Test normal case. metadata_directory = os.path.join('metadata/') @@ -210,7 +210,8 @@ def test_get_metadata_filenames(self): 'snapshot.json': metadata_directory + 'snapshot.json', 'timestamp.json': metadata_directory + 'timestamp.json'} - self.assertEqual(filenames, repo_lib.get_metadata_filenames('metadata/')) + self.assertEqual(filenames, + repo_lib.get_top_level_metadata_filenames('metadata/')) # If a directory argument is not specified, the current working directory # is used. @@ -219,11 +220,13 @@ def test_get_metadata_filenames(self): 'targets.json': os.path.join(metadata_directory, 'targets.json'), 'snapshot.json': os.path.join(metadata_directory, 'snapshot.json'), 'timestamp.json': os.path.join(metadata_directory, 'timestamp.json')} - self.assertEqual(filenames, repo_lib.get_metadata_filenames(metadata_directory)) + self.assertEqual(filenames, + repo_lib.get_top_level_metadata_filenames(metadata_directory)) # Test improperly formatted argument. - self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.get_metadata_filenames, 3) + self.assertRaises(securesystemslib.exceptions.FormatError, + repo_lib.get_top_level_metadata_filenames, 3) @@ -797,7 +800,7 @@ def test__load_top_level_metadata(self): storage_backend = securesystemslib.storage.FilesystemBackend() repo_lib.write_metadata_file(signable, root_file, 8, False, storage_backend) - filenames = repo_lib.get_metadata_filenames(metadata_directory) + filenames = repo_lib.get_top_level_metadata_filenames(metadata_directory) repository = repo_tool.create_new_repository(repository_directory, repository_name) repo_lib._load_top_level_metadata(repository, filenames, repository_name) diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index de088aa462..5f39d6edff 100644 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -812,8 +812,8 @@ def import_ed25519_privatekey_from_file(filepath, password=None): -def get_delegations_filenames(metadata_directory, consistent_snapshot, - storage_backend=None): +def get_delegated_roles_metadata_filenames(metadata_directory, + consistent_snapshot, storage_backend=None): """ Return a dictionary containing all filenames in 'metadata_directory' except the top-level roles. @@ -861,7 +861,7 @@ def get_delegations_filenames(metadata_directory, consistent_snapshot, -def get_metadata_filenames(metadata_directory): +def get_top_level_metadata_filenames(metadata_directory): """ Return a dictionary containing the filenames of the top-level roles. @@ -1826,7 +1826,7 @@ def _log_status_of_top_level_roles(targets_directory, metadata_directory, # The expected full filenames of the top-level roles needed to write them to # disk. - filenames = get_metadata_filenames(metadata_directory) + filenames = get_top_level_metadata_filenames(metadata_directory) root_filename = filenames[ROOT_FILENAME] targets_filename = filenames[TARGETS_FILENAME] snapshot_filename = filenames[SNAPSHOT_FILENAME] diff --git a/tuf/repository_tool.py b/tuf/repository_tool.py index 960322794b..2bfafd032f 100755 --- a/tuf/repository_tool.py +++ b/tuf/repository_tool.py @@ -3090,7 +3090,7 @@ def load_repository(repository_directory, repository_name='default', repository = Repository(repository_directory, metadata_directory, targets_directory, storage_backend, repository_name) - filenames = repo_lib.get_metadata_filenames(metadata_directory) + filenames = repo_lib.get_top_level_metadata_filenames(metadata_directory) # The Root file is always available without a version number (a consistent # snapshot) attached to the filename. Store the 'consistent_snapshot' value @@ -3102,7 +3102,7 @@ def load_repository(repository_directory, repository_name='default', repository, consistent_snapshot = repo_lib._load_top_level_metadata(repository, filenames, repository_name) - delegated_roles_filenames = repo_lib.get_delegations_filenames( + delegated_roles_filenames = repo_lib.get_delegated_roles_metadata_filenames( metadata_directory, consistent_snapshot, storage_backend) # Load the delegated targets metadata and their fileinfo. From 6ae3ea6d7d2aa80ba0571503a5e6c3808c44ff64 Mon Sep 17 00:00:00 2001 From: Teodora Sechkova Date: Wed, 20 May 2020 21:36:00 +0300 Subject: [PATCH 7/7] Add TOP_LEVEL_ROLES as a global variable Add TOP_LEVEL_ROLES as a global variable in roledb. Signed-off-by: Teodora Sechkova --- tuf/client/updater.py | 4 ++-- tuf/repository_lib.py | 13 +++++-------- tuf/repository_tool.py | 2 +- tuf/roledb.py | 3 +++ 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/tuf/client/updater.py b/tuf/client/updater.py index c52ec661c2..2515ef66fb 100755 --- a/tuf/client/updater.py +++ b/tuf/client/updater.py @@ -755,7 +755,7 @@ def __init__(self, repository_name, repository_mirrors): # Load current and previous metadata. for metadata_set in ['current', 'previous']: - for metadata_role in ['root', 'targets', 'snapshot', 'timestamp']: + for metadata_role in tuf.roledb.TOP_LEVEL_ROLES: self._load_metadata_from_file(metadata_set, metadata_role) # Raise an exception if the repository is missing the required 'root' @@ -2435,7 +2435,7 @@ def all_targets(self): # all roles available on the repository. delegated_targets = [] for role in tuf.roledb.get_rolenames(self.repository_name): - if role in ['root', 'snapshot', 'targets', 'timestamp']: + if role in tuf.roledb.TOP_LEVEL_ROLES: continue else: diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index 5f39d6edff..5eb88a01f0 100644 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -175,7 +175,7 @@ def _generate_and_write_metadata(rolename, metadata_filename, else: logger.debug('Not incrementing ' + repr(rolename) + '\'s version number.') - if rolename in ['root', 'targets', 'snapshot', 'timestamp'] and not allow_partially_signed: + if rolename in tuf.roledb.TOP_LEVEL_ROLES and not allow_partially_signed: # Verify that the top-level 'rolename' is fully signed. Only a delegated # role should not be written to disk without full verification of its # signature(s), since it can only be considered fully signed depending on @@ -394,18 +394,15 @@ def _delete_obsolete_metadata(metadata_directory, snapshot_metadata, else: logger.debug(repr(metadata_role) + ' found in the snapshot role.') - - # Strip metadata extension from filename. The role database does not # include the metadata extension. if metadata_role.endswith(METADATA_EXTENSION): metadata_role = metadata_role[:-len(METADATA_EXTENSION)] - else: logger.debug(repr(metadata_role) + ' does not match' ' supported extension ' + repr(METADATA_EXTENSION)) - if metadata_role in ['root', 'targets', 'snapshot', 'timestamp']: + if metadata_role in tuf.roledb.TOP_LEVEL_ROLES: logger.debug('Not removing top-level metadata ' + repr(metadata_role)) return @@ -850,7 +847,7 @@ def get_delegated_roles_metadata_filenames(metadata_directory, continue # Skip top-level roles, only interested in delegated roles. - if metadata_name in ['root', 'snapshot', 'targets', 'timestamp']: + if metadata_name in tuf.roledb.TOP_LEVEL_ROLES: continue # Prevent reloading duplicate versions if consistent_snapshot is True @@ -1131,7 +1128,7 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot, # Extract the role, threshold, and keyid information of the top-level roles, # which Root stores in its metadata. The necessary role metadata is generated # from this information. - for rolename in ['root', 'targets', 'snapshot', 'timestamp']: + for rolename in tuf.roledb.TOP_LEVEL_ROLES: # If a top-level role is missing from 'tuf.roledb.py', raise an exception. if not tuf.roledb.role_exists(rolename, repository_name): @@ -1507,7 +1504,7 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date, # snapshot and timestamp roles are not listed in snapshot.json, do not # list these roles found in the metadata directory. if tuf.roledb.role_exists(rolename, repository_name) and \ - rolename not in ['root', 'snapshot', 'timestamp', 'targets']: + rolename not in tuf.roledb.TOP_LEVEL_ROLES: fileinfodict[metadata_name] = get_metadata_versioninfo(rolename, repository_name) diff --git a/tuf/repository_tool.py b/tuf/repository_tool.py index 2bfafd032f..be01ad3888 100755 --- a/tuf/repository_tool.py +++ b/tuf/repository_tool.py @@ -295,7 +295,7 @@ def writeall(self, consistent_snapshot=False, use_existing_fileinfo=False): for dirty_rolename in dirty_rolenames: # Ignore top-level roles, they will be generated later in this method. - if dirty_rolename in ['root', 'targets', 'snapshot', 'timestamp']: + if dirty_rolename in tuf.roledb.TOP_LEVEL_ROLES: continue dirty_filename = os.path.join(self._metadata_directory, diff --git a/tuf/roledb.py b/tuf/roledb.py index 5931ce6748..37add72e3a 100755 --- a/tuf/roledb.py +++ b/tuf/roledb.py @@ -73,6 +73,9 @@ _dirty_roles['default'] = set() +TOP_LEVEL_ROLES = ['root', 'targets', 'snapshot', 'timestamp'] + + def create_roledb_from_root_metadata(root_metadata, repository_name='default'): """