diff --git a/tests/test_repository_tool.py b/tests/test_repository_tool.py old mode 100755 new mode 100644 index 9ca2991f3e..ce09e2e660 --- a/tests/test_repository_tool.py +++ b/tests/test_repository_tool.py @@ -719,10 +719,10 @@ def test_add_verification_key(self): 'consistent_snapshot': False, 'expires': expiration, 'partial_loaded': False} - tuf.roledb.add_role('Root', roleinfo, 'test_repository') - tuf.roledb.add_role('Targets', roleinfo, 'test_repository') - tuf.roledb.add_role('Snapshot', roleinfo, 'test_repository') - tuf.roledb.add_role('Timestamp', roleinfo, 'test_repository') + tuf.roledb.add_role('Root', roleinfo, repository_name='test_repository') + tuf.roledb.add_role('Targets', roleinfo, repository_name='test_repository') + tuf.roledb.add_role('Snapshot', roleinfo, repository_name='test_repository') + tuf.roledb.add_role('Timestamp', roleinfo, repository_name='test_repository') # Test for different top-level role names. self.metadata._rolename = 'Targets' diff --git a/tests/test_roledb.py b/tests/test_roledb.py old mode 100755 new mode 100644 index f036c763fd..bc0ef60409 --- a/tests/test_roledb.py +++ b/tests/test_roledb.py @@ -119,7 +119,7 @@ def test_clear_roledb(self): repository_name = 'example_repository' self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.clear_roledb, repository_name) tuf.roledb.create_roledb(repository_name) - tuf.roledb.add_role(rolename, roleinfo, repository_name) + tuf.roledb.add_role(rolename, roleinfo, repository_name=repository_name) self.assertEqual(roleinfo['keyids'], tuf.roledb.get_role_keyids(rolename, repository_name)) tuf.roledb.clear_roledb(repository_name) self.assertFalse(tuf.roledb.role_exists(rolename, repository_name)) @@ -151,7 +151,7 @@ def test_add_role(self): self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.clear_roledb, repository_name) tuf.roledb.create_roledb(repository_name) - tuf.roledb.add_role(rolename, roleinfo, repository_name) + tuf.roledb.add_role(rolename, roleinfo, repository_name=repository_name) self.assertEqual(roleinfo['keyids'], tuf.roledb.get_role_keyids(rolename, repository_name)) @@ -166,7 +166,8 @@ def test_add_role(self): self.assertRaises(securesystemslib.exceptions.FormatError, tuf.roledb.add_role, rolename, None) self.assertRaises(securesystemslib.exceptions.FormatError, tuf.roledb.add_role, rolename, 123) self.assertRaises(securesystemslib.exceptions.FormatError, tuf.roledb.add_role, rolename, ['']) - self.assertRaises(securesystemslib.exceptions.FormatError, tuf.roledb.add_role, rolename, roleinfo, 123) + self.assertRaises(securesystemslib.exceptions.FormatError, tuf.roledb.add_role, rolename, roleinfo, False, 123) + self.assertRaises(securesystemslib.exceptions.FormatError, tuf.roledb.add_role, rolename, roleinfo, []) # Test condition where the rolename already exists in the role database. @@ -175,7 +176,7 @@ def test_add_role(self): # Test where the repository name does not exist in the role database. self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.add_role, - 'new_role', roleinfo, 'non-existent') + 'new_role', roleinfo, repository_name='non-existent') # Test conditions for invalid rolenames. self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.add_role, ' badrole ', @@ -206,7 +207,7 @@ def test_role_exists(self): tuf.roledb.create_roledb(repository_name) self.assertEqual(False, tuf.roledb.role_exists(rolename, repository_name)) - tuf.roledb.add_role(rolename, roleinfo, repository_name) + tuf.roledb.add_role(rolename, roleinfo, repository_name=repository_name) self.assertTrue(tuf.roledb.role_exists(rolename, repository_name)) # Reset the roledb so that subsequent tests have access to the original, @@ -251,7 +252,7 @@ def test_remove_role(self): self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.remove_role, rolename, repository_name) tuf.roledb.create_roledb(repository_name) - tuf.roledb.add_role(rolename, roleinfo, repository_name) + tuf.roledb.add_role(rolename, roleinfo, repository_name=repository_name) self.assertEqual(roleinfo['keyids'], tuf.roledb.get_role_keyids(rolename, repository_name)) self.assertEqual(None, tuf.roledb.remove_role(rolename, repository_name)) @@ -293,8 +294,8 @@ def test_get_rolenames(self): repository_name = 'example_repository' self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.get_rolenames, repository_name) tuf.roledb.create_roledb(repository_name) - tuf.roledb.add_role(rolename, roleinfo, repository_name) - tuf.roledb.add_role(rolename2, roleinfo, repository_name) + tuf.roledb.add_role(rolename, roleinfo, repository_name=repository_name) + tuf.roledb.add_role(rolename2, roleinfo, repository_name=repository_name) self.assertEqual(set(['targets', 'role1']), set(tuf.roledb.get_rolenames())) @@ -328,7 +329,7 @@ def test_get_role_info(self): rolename, repository_name) tuf.roledb.create_roledb(repository_name) - tuf.roledb.add_role(rolename, roleinfo, repository_name) + tuf.roledb.add_role(rolename, roleinfo, repository_name=repository_name) self.assertEqual(roleinfo, tuf.roledb.get_roleinfo(rolename, repository_name)) # Verify that a roleinfo cannot be retrieved for a non-existent repository @@ -368,7 +369,7 @@ def test_get_role_keyids(self): self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.get_role_keyids, rolename, repository_name) tuf.roledb.create_roledb(repository_name) - tuf.roledb.add_role(rolename, roleinfo, repository_name) + tuf.roledb.add_role(rolename, roleinfo, repository_name=repository_name) self.assertEqual(['123'], tuf.roledb.get_role_keyids(rolename, repository_name)) # Verify that rolekeyids cannot be retrieved from a non-existent repository @@ -406,7 +407,7 @@ def test_get_role_threshold(self): self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.get_role_threshold, rolename, repository_name) tuf.roledb.create_roledb(repository_name) - tuf.roledb.add_role(rolename, roleinfo, repository_name) + tuf.roledb.add_role(rolename, roleinfo, repository_name=repository_name) self.assertEqual(roleinfo['threshold'], tuf.roledb.get_role_threshold(rolename, repository_name)) # Verify that a role's threshold cannot be retrieved from a non-existent @@ -445,7 +446,7 @@ def test_get_role_paths(self): rolename, repository_name) tuf.roledb.create_roledb(repository_name) - tuf.roledb.add_role(rolename2, roleinfo2, repository_name) + tuf.roledb.add_role(rolename2, roleinfo2, repository_name=repository_name) self.assertEqual(roleinfo2['paths'], tuf.roledb.get_role_paths(rolename2, repository_name)) @@ -516,7 +517,7 @@ def test_get_delegated_rolenames(self): self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.get_delegated_rolenames, rolename, repository_name) tuf.roledb.create_roledb(repository_name) - tuf.roledb.add_role(rolename, roleinfo, repository_name) + tuf.roledb.add_role(rolename, roleinfo, repository_name=repository_name) self.assertEqual(set(['django', 'tuf']), set(tuf.roledb.get_delegated_rolenames(rolename, repository_name))) @@ -635,7 +636,7 @@ def test_update_roleinfo(self): mark_role_as_dirty = True self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.clear_roledb, repository_name) tuf.roledb.create_roledb(repository_name) - tuf.roledb.add_role(rolename, roleinfo, repository_name) + tuf.roledb.add_role(rolename, roleinfo, repository_name=repository_name) tuf.roledb.update_roleinfo(rolename, roleinfo, mark_role_as_dirty, repository_name) self.assertEqual(roleinfo['keyids'], tuf.roledb.get_role_keyids(rolename, repository_name)) @@ -667,32 +668,42 @@ def test_update_roleinfo(self): def test_get_dirty_roles(self): # Verify that the dirty roles of a role are returned. rolename = 'targets' + mark_role_as_dirty = True + roleinfo1 = {'keyids': ['123'], 'threshold': 1} - tuf.roledb.add_role(rolename, roleinfo1) + tuf.roledb.add_role(rolename, roleinfo1, mark_role_as_dirty) + # new role added: targets -> snapshot -> timestamp + self.assertEqual(['snapshot', 'targets', 'timestamp'], + tuf.roledb.get_dirty_roles()) + roleinfo2 = {'keyids': ['123'], 'threshold': 2} - mark_role_as_dirty = True tuf.roledb.update_roleinfo(rolename, roleinfo2, mark_role_as_dirty) # Note: The 'default' repository is searched if the repository name is # not given to get_dirty_roles(). - self.assertEqual([rolename], tuf.roledb.get_dirty_roles()) + # update targets key: targets -> snapshot -> timestamp AND targets -> root + self.assertEqual(['root', 'snapshot', 'targets', 'timestamp'], + tuf.roledb.get_dirty_roles()) # Verify that a list of dirty roles is returned for a non-default # repository. repository_name = 'example_repository' tuf.roledb.create_roledb(repository_name) - tuf.roledb.add_role(rolename, roleinfo1, repository_name) + tuf.roledb.add_role(rolename, roleinfo1, repository_name=repository_name) tuf.roledb.update_roleinfo(rolename, roleinfo2, mark_role_as_dirty, repository_name) - self.assertEqual([rolename], tuf.roledb.get_dirty_roles(repository_name)) + self.assertEqual(['root', 'snapshot', 'targets', 'timestamp'], + tuf.roledb.get_dirty_roles(repository_name)) # Verify that dirty roles are not returned for a non-existent repository. - self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.get_dirty_roles, 'non-existent') + self.assertRaises(securesystemslib.exceptions.InvalidNameError, + tuf.roledb.get_dirty_roles, 'non-existent') # Reset the roledb so that subsequent tests have access to a default # roledb. tuf.roledb.remove_roledb(repository_name) # Test for improperly formatted argument. - self.assertRaises(securesystemslib.exceptions.FormatError, tuf.roledb.get_dirty_roles, 123) + self.assertRaises(securesystemslib.exceptions.FormatError, + tuf.roledb.get_dirty_roles, 123) @@ -703,19 +714,31 @@ def test_mark_dirty(self): tuf.roledb.add_role(rolename, roleinfo1) rolename2 = 'dirty_role' roleinfo2 = {'keyids': ['123'], 'threshold': 2} + mark_role_as_dirty = True tuf.roledb.update_roleinfo(rolename, roleinfo1, mark_role_as_dirty) # Note: The 'default' repository is searched if the repository name is # not given to get_dirty_roles(). - self.assertEqual([rolename], tuf.roledb.get_dirty_roles()) - - tuf.roledb.mark_dirty(['dirty_role']) - self.assertEqual([rolename2, rolename], tuf.roledb.get_dirty_roles()) + # targets -> snapshot -> timestamp + self.assertEqual(['snapshot', 'targets', 'timestamp'], + tuf.roledb.get_dirty_roles()) + + # manually mark dirty: new role is added to the dirty list + tuf.roledb.mark_dirty([rolename2]) + self.assertEqual([rolename2, 'snapshot', 'targets', 'timestamp'], + tuf.roledb.get_dirty_roles()) + + # targets' key update: + # targets -> snapshot -> timestamp AND targets -> root + tuf.roledb.unmark_dirty(tuf.roledb.get_dirty_roles()) + tuf.roledb.update_roleinfo(rolename, roleinfo2, mark_role_as_dirty) + self.assertEqual(['root', 'snapshot', 'targets', 'timestamp'], + tuf.roledb.get_dirty_roles()) # Verify that a role cannot be marked as dirty for a non-existent # repository. - self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.mark_dirty, - ['dirty_role'], 'non-existent') + self.assertRaises(securesystemslib.exceptions.InvalidNameError, + tuf.roledb.mark_dirty, ['dirty_role'], 'non-existent') @@ -727,16 +750,19 @@ def test_unmark_dirty(self): rolename2 = 'dirty_role' roleinfo2 = {'keyids': ['123'], 'threshold': 2} tuf.roledb.add_role(rolename2, roleinfo2) + mark_role_as_dirty = True tuf.roledb.update_roleinfo(rolename, roleinfo1, mark_role_as_dirty) # Note: The 'default' repository is searched if the repository name is # not given to get_dirty_roles(). - self.assertEqual([rolename], tuf.roledb.get_dirty_roles()) - tuf.roledb.update_roleinfo(rolename2, roleinfo2, mark_role_as_dirty) + # update targets roleinfo marks as dirty also snapshot and timestamp + role_chain_list = ['snapshot', rolename, 'timestamp'] + self.assertEqual(role_chain_list, tuf.roledb.get_dirty_roles()) - tuf.roledb.unmark_dirty(['dirty_role']) - self.assertEqual([rolename], tuf.roledb.get_dirty_roles()) - tuf.roledb.unmark_dirty(['targets']) + tuf.roledb.update_roleinfo(rolename2, roleinfo2, mark_role_as_dirty) + tuf.roledb.unmark_dirty([rolename2]) + self.assertEqual(role_chain_list, tuf.roledb.get_dirty_roles()) + tuf.roledb.unmark_dirty(role_chain_list) self.assertEqual([], tuf.roledb.get_dirty_roles()) # What happens for a role that isn't dirty? unmark_dirty() should just @@ -745,8 +771,8 @@ def test_unmark_dirty(self): # Verify that a role cannot be unmarked as dirty for a non-existent # repository. - self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.unmark_dirty, - ['dirty_role'], 'non-existent') + self.assertRaises(securesystemslib.exceptions.InvalidNameError, + tuf.roledb.unmark_dirty, [rolename2], 'non-existent') def _test_rolename(self, test_function): diff --git a/tests/test_slow_retrieval_attack.py b/tests/test_slow_retrieval_attack.py old mode 100755 new mode 100644 index 2c7b38c562..75febc90a1 --- a/tests/test_slow_retrieval_attack.py +++ b/tests/test_slow_retrieval_attack.py @@ -193,10 +193,11 @@ def setUp(self): targets_private = repo_tool.import_ed25519_privatekey_from_file(key_file, 'password') - repository.targets.load_signing_key(targets_private) - repository.snapshot.load_signing_key(snapshot_private) - repository.timestamp.load_signing_key(timestamp_private) - + # Mark roles as dirty to force metadata rewrite on writeall() + mark_dirty = True + repository.targets.load_signing_key(targets_private, mark_dirty) + repository.snapshot.load_signing_key(snapshot_private, mark_dirty) + repository.timestamp.load_signing_key(timestamp_private, mark_dirty) repository.writeall() # Move the staged metadata to the "live" metadata. diff --git a/tests/test_tutorial.py b/tests/test_tutorial.py index e3102c66cc..e97ea176f4 100644 --- a/tests/test_tutorial.py +++ b/tests/test_tutorial.py @@ -284,9 +284,8 @@ def test_tutorial(self): repository.dirty_roles() # Concat strings to avoid Python2/3 unicode prefix problems ('' vs. u'') mock_logger.info.assert_called_with( - "Dirty roles: " + str(['targets'])) + "Dirty roles: " + str(['snapshot', 'targets', 'timestamp'])) - repository.mark_dirty(['snapshot', 'timestamp']) repository.writeall() @@ -326,9 +325,8 @@ def test_tutorial(self): repository.dirty_roles() # Concat strings to avoid Python2/3 unicode prefix problems ('' vs. u'') mock_logger.info.assert_called_with( - "Dirty roles: " + str(['targets', 'unclaimed'])) + "Dirty roles: " + str(['snapshot', 'targets', 'timestamp', 'unclaimed'])) - repository.mark_dirty(["snapshot", "timestamp"]) repository.writeall() @@ -378,9 +376,8 @@ def test_tutorial(self): '28-2f', '30-37', '38-3f', '40-47', '48-4f', '50-57', '58-5f', '60-67', '68-6f', '70-77', '78-7f', '80-87', '88-8f', '90-97', '98-9f', 'a0-a7', 'a8-af', 'b0-b7', 'b8-bf', 'c0-c7', 'c8-cf', - 'd0-d7', 'd8-df', 'e0-e7', 'e8-ef', 'f0-f7', 'f8-ff', 'unclaimed'])) + 'd0-d7', 'd8-df', 'e0-e7', 'e8-ef', 'f0-f7', 'f8-ff', 'snapshot', 'timestamp', 'unclaimed'])) - repository.mark_dirty(["snapshot", "timestamp"]) repository.writeall() # ----- Tutorial Section: How to Perform an Update diff --git a/tests/test_updater.py b/tests/test_updater.py index 24c0f236be..ff14fb38c0 100644 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -813,7 +813,8 @@ def test_3__get_metadata_file(self): tuf.SPECIFICATION_VERSION = '0.9.0' repository = repo_tool.load_repository(self.repository_directory) - repository.timestamp.load_signing_key(self.role_keys['timestamp']['private']) + repository.timestamp.load_signing_key( + self.role_keys['timestamp']['private'], True) repository.writeall() # Move the staged metadata to the "live" metadata. @@ -1283,10 +1284,16 @@ def test_6_download_target(self): # is now being set to true (i.e., the pre-generated repository isn't set # to support consistent snapshots. A new version of targets.json is needed # to ensure .filename target files are written to disk. - repository.targets.load_signing_key(self.role_keys['targets']['private']) - repository.root.load_signing_key(self.role_keys['root']['private']) - repository.snapshot.load_signing_key(self.role_keys['snapshot']['private']) - repository.timestamp.load_signing_key(self.role_keys['timestamp']['private']) + # Mark roles as dirty to force metadata rewrite on writeall() + mark_role_as_dirty = True + repository.targets.load_signing_key(self.role_keys['targets']['private'], + mark_role_as_dirty) + repository.root.load_signing_key(self.role_keys['root']['private'], + mark_role_as_dirty) + repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'], + mark_role_as_dirty) + repository.timestamp.load_signing_key(self.role_keys['timestamp']['private'], + mark_role_as_dirty) repository.writeall(consistent_snapshot=True) diff --git a/tuf/client/updater.py b/tuf/client/updater.py old mode 100755 new mode 100644 index c52ec661c2..64fa50aba6 --- a/tuf/client/updater.py +++ b/tuf/client/updater.py @@ -755,7 +755,7 @@ def __init__(self, repository_name, repository_mirrors): # Load current and previous metadata. for metadata_set in ['current', 'previous']: - for metadata_role in ['root', 'targets', 'snapshot', 'timestamp']: + for metadata_role in tuf.roledb.TOP_LEVEL_ROLES: self._load_metadata_from_file(metadata_set, metadata_role) # Raise an exception if the repository is missing the required 'root' @@ -984,7 +984,7 @@ def _import_delegations(self, parent_role): # is None. rolename = roleinfo.get('name') logger.debug('Adding delegated role: ' + str(rolename) + '.') - tuf.roledb.add_role(rolename, roleinfo, self.repository_name) + tuf.roledb.add_role(rolename, roleinfo, repository_name=self.repository_name) except tuf.exceptions.RoleAlreadyExistsError: logger.warning('Role already exists: ' + rolename) @@ -2435,7 +2435,7 @@ def all_targets(self): # all roles available on the repository. delegated_targets = [] for role in tuf.roledb.get_rolenames(self.repository_name): - if role in ['root', 'snapshot', 'targets', 'timestamp']: + if role in tuf.roledb.TOP_LEVEL_ROLES: continue else: diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py old mode 100755 new mode 100644 index 319ec862ea..ad88a09977 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -175,7 +175,7 @@ def _generate_and_write_metadata(rolename, metadata_filename, else: logger.debug('Not incrementing ' + repr(rolename) + '\'s version number.') - if rolename in ['root', 'targets', 'snapshot', 'timestamp'] and not allow_partially_signed: + if rolename in tuf.roledb.TOP_LEVEL_ROLES and not allow_partially_signed: # Verify that the top-level 'rolename' is fully signed. Only a delegated # role should not be written to disk without full verification of its # signature(s), since it can only be considered fully signed depending on @@ -394,18 +394,15 @@ def _delete_obsolete_metadata(metadata_directory, snapshot_metadata, else: logger.debug(repr(metadata_role) + ' found in the snapshot role.') - - # Strip metadata extension from filename. The role database does not # include the metadata extension. if metadata_role.endswith(METADATA_EXTENSION): metadata_role = metadata_role[:-len(METADATA_EXTENSION)] - else: logger.debug(repr(metadata_role) + ' does not match' ' supported extension ' + repr(METADATA_EXTENSION)) - if metadata_role in ['root', 'targets', 'snapshot', 'timestamp']: + if metadata_role in tuf.roledb.TOP_LEVEL_ROLES: logger.debug('Not removing top-level metadata ' + repr(metadata_role)) return @@ -812,7 +809,51 @@ def import_ed25519_privatekey_from_file(filepath, password=None): return private_key -def get_metadata_filenames(metadata_directory): + +def get_delegations_filenames(metadata_directory, consistent_snapshot, + storage_backend=None): + """ + Return a dictionary containing all filenames in 'metadata_directory' + except the top-level roles. + """ + + filenames = {} + metadata_files = sorted(storage_backend.list_folder(metadata_directory), + reverse=True) + for metadata_role in metadata_files: + metadata_path = os.path.join(metadata_directory, metadata_role) + metadata_name = \ + metadata_path[len(metadata_directory):].lstrip(os.path.sep) + + # Strip the version number if 'consistent_snapshot' is True, + # or if 'metadata_role' is Root. + # Example: '10.django.json' --> 'django.json' + consistent_snapshot = \ + metadata_role.endswith('root.json') or consistent_snapshot == True + metadata_name, junk = _strip_version_number(metadata_name, + consistent_snapshot) + + if metadata_name.endswith(METADATA_EXTENSION): + extension_length = len(METADATA_EXTENSION) + metadata_name = metadata_name[:-extension_length] + + else: + logger.debug('Skipping file with unsupported metadata' + ' extension: ' + repr(metadata_path)) + continue + + # Skip top-level roles, only interested in delegated roles now that the + # top-level roles have already been loaded. + if metadata_name in tuf.roledb.TOP_LEVEL_ROLES: + continue + + filenames[metadata_name] = metadata_path + + return filenames + + + +def get_metadata_filenames(metadata_directory=None): """ Return a dictionary containing the filenames of the top-level roles. @@ -1082,7 +1123,7 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot, # Extract the role, threshold, and keyid information of the top-level roles, # which Root stores in its metadata. The necessary role metadata is generated # from this information. - for rolename in ['root', 'targets', 'snapshot', 'timestamp']: + for rolename in tuf.roledb.TOP_LEVEL_ROLES: # If a top-level role is missing from 'tuf.roledb.py', raise an exception. if not tuf.roledb.role_exists(rolename, repository_name): @@ -1444,7 +1485,7 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date, # snapshot and timestamp roles are not listed in snapshot.json, do not # list these roles found in the metadata directory. if tuf.roledb.role_exists(rolename, repository_name) and \ - rolename not in ['root', 'snapshot', 'timestamp', 'targets']: + rolename not in tuf.roledb.TOP_LEVEL_ROLES: fileinfodict[metadata_name] = get_metadata_versioninfo(rolename, repository_name) @@ -1761,130 +1802,66 @@ def _log_status_of_top_level_roles(targets_directory, metadata_directory, should be a directory in a temporary repository directory. """ + _log_role_keys_status(repository_name) + # The expected full filenames of the top-level roles needed to write them to # disk. filenames = get_metadata_filenames(metadata_directory) - root_filename = filenames[ROOT_FILENAME] - targets_filename = filenames[TARGETS_FILENAME] - snapshot_filename = filenames[SNAPSHOT_FILENAME] - timestamp_filename = filenames[TIMESTAMP_FILENAME] - - # Verify that the top-level roles contain a valid number of public keys and - # that their corresponding private keys have been loaded. - for rolename in ['root', 'targets', 'snapshot', 'timestamp']: - try: - _check_role_keys(rolename, repository_name) - - except tuf.exceptions.InsufficientKeysError as e: - logger.info(str(e)) # Do the top-level roles contain a valid threshold of signatures? Top-level # metadata is verified in Root -> Targets -> Snapshot -> Timestamp order. # Verify the metadata of the Root role. dirty_rolenames = tuf.roledb.get_dirty_roles(repository_name) - root_roleinfo = tuf.roledb.get_roleinfo('root', repository_name) - root_is_dirty = None - if 'root' in dirty_rolenames: - root_is_dirty = True - - else: - root_is_dirty = False + for rolename in tuf.roledb.TOP_LEVEL_ROLES: - try: - signable, root_filename = \ - _generate_and_write_metadata('root', root_filename, targets_directory, - metadata_directory, storage_backend, repository_name=repository_name) - _log_status('root', signable, repository_name) - - # 'tuf.exceptions.UnsignedMetadataError' raised if metadata contains an - # invalid threshold of signatures. log the valid/threshold message, where - # valid < threshold. - except tuf.exceptions.UnsignedMetadataError as e: - _log_status('root', e.signable, repository_name) - return - - finally: - tuf.roledb.unmark_dirty(['root'], repository_name) - tuf.roledb.update_roleinfo('root', root_roleinfo, - mark_role_as_dirty=root_is_dirty, repository_name=repository_name) - - # Verify the metadata of the Targets role. - targets_roleinfo = tuf.roledb.get_roleinfo('targets', repository_name) - targets_is_dirty = None - if 'targets' in dirty_rolenames: - targets_is_dirty = True + listed_filenames = None + if rolename == 'snapshot': + listed_filenames = {'root': filenames[ROOT_FILENAME], + 'targets': filenames[TARGETS_FILENAME]} + if rolename == 'timestamp': + listed_filenames = {'snapshot': filenames[SNAPSHOT_FILENAME]} - else: - targets_is_dirty = False + # Verify the metadata of the Timestamp role. + roleinfo = tuf.roledb.get_roleinfo(rolename, repository_name) + role_filename = filenames[rolename + METADATA_EXTENSION] - try: - signable, targets_filename = \ - _generate_and_write_metadata('targets', targets_filename, - targets_directory, metadata_directory, storage_backend, - repository_name=repository_name) - _log_status('targets', signable, repository_name) + try: + signable, role_filename = \ + _generate_and_write_metadata(rolename, role_filename, + targets_directory, metadata_directory, storage_backend, + filenames=listed_filenames, + repository_name=repository_name) + _log_status(rolename, signable, repository_name) + + except tuf.exceptions.UnsignedMetadataError as e: + _log_status(rolename, e.signable, repository_name) + return - except tuf.exceptions.UnsignedMetadataError as e: - _log_status('targets', e.signable, repository_name) - return + finally: + # recover the metadata state + tuf.roledb.unmark_dirty(tuf.roledb.TOP_LEVEL_ROLES, repository_name) + tuf.roledb.mark_dirty(dirty_rolenames, repository_name) + tuf.roledb.update_roleinfo(rolename, roleinfo, + mark_role_as_dirty=False, repository_name=repository_name) - finally: - tuf.roledb.unmark_dirty(['targets'], repository_name) - tuf.roledb.update_roleinfo('targets', targets_roleinfo, - mark_role_as_dirty=targets_is_dirty, repository_name=repository_name) - # Verify the metadata of the snapshot role. - snapshot_roleinfo = tuf.roledb.get_roleinfo('snapshot', repository_name) - snapshot_is_dirty = None - if 'snapshot' in dirty_rolenames: - snapshot_is_dirty = True - else: - snapshot_is_dirty = False - filenames = {'root': root_filename, 'targets': targets_filename} - try: - signable, snapshot_filename = \ - _generate_and_write_metadata('snapshot', snapshot_filename, - targets_directory, metadata_directory, storage_backend, False, - filenames, repository_name=repository_name) - _log_status('snapshot', signable, repository_name) - - except tuf.exceptions.UnsignedMetadataError as e: - _log_status('snapshot', e.signable, repository_name) - return - - finally: - tuf.roledb.unmark_dirty(['snapshot'], repository_name) - tuf.roledb.update_roleinfo('snapshot', snapshot_roleinfo, - mark_role_as_dirty=snapshot_is_dirty, repository_name=repository_name) - - # Verify the metadata of the Timestamp role. - timestamp_roleinfo = tuf.roledb.get_roleinfo('timestamp', repository_name) - timestamp_is_dirty = None - if 'timestamp' in dirty_rolenames: - timestamp_is_dirty = True +def _log_role_keys_status(repository_name): + """ + Verifies that the top-level roles contain a valid number of public keys and + that their corresponding private keys have been loaded. + """ - else: - timestamp_is_dirty = False + # Verify that the top-level roles contain a valid number of public keys and + # that their corresponding private keys have been loaded. + for rolename in tuf.roledb.TOP_LEVEL_ROLES: + try: + _check_role_keys(rolename, repository_name) - filenames = {'snapshot': snapshot_filename} - try: - signable, timestamp_filename = \ - _generate_and_write_metadata('timestamp', timestamp_filename, - targets_directory, metadata_directory, storage_backend, - False, filenames, repository_name=repository_name) - _log_status('timestamp', signable, repository_name) - - except tuf.exceptions.UnsignedMetadataError as e: - _log_status('timestamp', e.signable, repository_name) - return - - finally: - tuf.roledb.unmark_dirty(['timestamp'], repository_name) - tuf.roledb.update_roleinfo('timestamp', timestamp_roleinfo, - mark_role_as_dirty=timestamp_is_dirty, repository_name=repository_name) + except tuf.exceptions.InsufficientKeysError as e: + logger.info(str(e)) diff --git a/tuf/repository_tool.py b/tuf/repository_tool.py old mode 100755 new mode 100644 index 6e2e967f0a..60e5486f2b --- a/tuf/repository_tool.py +++ b/tuf/repository_tool.py @@ -293,7 +293,7 @@ def writeall(self, consistent_snapshot=False, use_existing_fileinfo=False): for dirty_rolename in dirty_rolenames: # Ignore top-level roles, they will be generated later in this method. - if dirty_rolename in ['root', 'targets', 'snapshot', 'timestamp']: + if dirty_rolename in tuf.roledb.TOP_LEVEL_ROLES: continue dirty_filename = os.path.join(self._metadata_directory, @@ -815,7 +815,7 @@ def remove_verification_key(self, key): - def load_signing_key(self, key): + def load_signing_key(self, key, mark_role_as_dirty=False): """ Load the role key, which must contain the private portion, so that role @@ -833,6 +833,12 @@ def load_signing_key(self, key): generated when writeall() or write() is eventually called to generate valid metadata files. + mark_role_as_dirty: + A boolean indicating whether the updated 'roleinfo' for 'rolename' + should be marked as dirty. The caller might not want to mark + 'rolename' as dirty if it is loading metadata from disk and only wants + to populate roledb.py. + securesystemslib.exceptions.FormatError, if 'key' is improperly formatted. @@ -870,12 +876,12 @@ def load_signing_key(self, key): if key['keyid'] not in roleinfo['signing_keyids']: roleinfo['signing_keyids'].append(key['keyid']) - tuf.roledb.update_roleinfo(self.rolename, roleinfo, + tuf.roledb.update_roleinfo(self.rolename, roleinfo, mark_role_as_dirty, repository_name=self._repository_name) - def unload_signing_key(self, key): + def unload_signing_key(self, key, mark_role_as_dirty=False): """ Remove a previously loaded role private key (i.e., load_signing_key()). @@ -891,6 +897,12 @@ def unload_signing_key(self, key): The role key to be unloaded, conformant to 'securesystemslib.formats.ANYKEY_SCHEMA'. + mark_role_as_dirty: + A boolean indicating whether the updated 'roleinfo' for 'rolename' + should be marked as dirty. The caller might not want to mark + 'rolename' as dirty if it is loading metadata from disk and only wants + to populate roledb.py. + securesystemslib.exceptions.FormatError, if the 'key' argument is improperly formatted. @@ -920,7 +932,7 @@ def unload_signing_key(self, key): if key['keyid'] in roleinfo['signing_keyids']: roleinfo['signing_keyids'].remove(key['keyid']) - tuf.roledb.update_roleinfo(self.rolename, roleinfo, + tuf.roledb.update_roleinfo(self.rolename, roleinfo, mark_role_as_dirty, repository_name=self._repository_name) else: @@ -949,10 +961,7 @@ def add_signature(self, signature, mark_role_as_dirty=True): A boolean indicating whether the updated 'roleinfo' for 'rolename' should be marked as dirty. The caller might not want to mark 'rolename' as dirty if it is loading metadata from disk and only wants - to populate roledb.py. Likewise, add_role() would support a similar - boolean to allow the repository tools to successfully load roles via - load_repository() without needing to mark these roles as dirty (default - behavior). + to populate roledb.py. securesystemslib.exceptions.FormatError, if the 'signature' argument is @@ -991,7 +1000,7 @@ def add_signature(self, signature, mark_role_as_dirty=True): - def remove_signature(self, signature): + def remove_signature(self, signature, mark_role_as_dirty=True): """ Remove a previously loaded, or added, role 'signature'. A role must @@ -1006,6 +1015,12 @@ def remove_signature(self, signature): The role signature to remove, conformant to 'securesystemslib.formats.SIGNATURE_SCHEMA'. + mark_role_as_dirty: + A boolean indicating whether the updated 'roleinfo' for 'rolename' + should be marked as dirty. The caller might not want to mark + 'rolename' as dirty if it is loading metadata from disk and only wants + to populate roledb.py. + securesystemslib.exceptions.FormatError, if the 'signature' argument is improperly formatted. @@ -1031,7 +1046,7 @@ def remove_signature(self, signature): if signature in roleinfo['signatures']: roleinfo['signatures'].remove(signature) - tuf.roledb.update_roleinfo(self.rolename, roleinfo, + tuf.roledb.update_roleinfo(self.rolename, roleinfo, mark_role_as_dirty, repository_name=self._repository_name) else: @@ -1450,7 +1465,8 @@ def __init__(self, repository_name): 'signatures': [], 'version': 0, 'consistent_snapshot': False, 'expires': expiration, 'partial_loaded': False} try: - tuf.roledb.add_role(self._rolename, roleinfo, self._repository_name) + tuf.roledb.add_role(self._rolename, roleinfo, + repository_name=self._repository_name) except tuf.exceptions.RoleAlreadyExistsError: pass @@ -1519,7 +1535,8 @@ def __init__(self, repository_name): 'partial_loaded': False} try: - tuf.roledb.add_role(self.rolename, roleinfo, self._repository_name) + tuf.roledb.add_role(self.rolename, roleinfo, + repository_name=self._repository_name) except tuf.exceptions.RoleAlreadyExistsError: pass @@ -1582,7 +1599,8 @@ def __init__(self, repository_name): 'partial_loaded': False} try: - tuf.roledb.add_role(self._rolename, roleinfo, self._repository_name) + tuf.roledb.add_role(self._rolename, roleinfo, + repository_name=self._repository_name) except tuf.exceptions.RoleAlreadyExistsError: pass @@ -1647,7 +1665,8 @@ class Targets(Metadata): """ def __init__(self, targets_directory, rolename='targets', roleinfo=None, - parent_targets_object=None, repository_name='default'): + parent_targets_object=None, mark_role_as_dirty=False, + repository_name='default'): # Do the arguments have the correct format? # Ensure the arguments have the appropriate number of objects and object @@ -1690,7 +1709,8 @@ def __init__(self, targets_directory, rolename='targets', roleinfo=None, # Add the new role to the 'tuf.roledb'. try: - tuf.roledb.add_role(self.rolename, roleinfo, self._repository_name) + tuf.roledb.add_role(self.rolename, roleinfo, mark_role_as_dirty, + repository_name=self._repository_name) except tuf.exceptions.RoleAlreadyExistsError: pass @@ -2210,11 +2230,13 @@ def _create_delegated_target(self, rolename, keyids, threshold, paths): roleinfo = {'name': rolename, 'keyids': keyids, 'signing_keyids': [], 'threshold': threshold, 'version': 0, 'expires': expiration, 'signatures': [], 'partial_loaded': False, - 'paths': paths, 'delegations': {'keys': {}, 'roles': []}} + 'paths': paths, 'delegations': {'keys': {}, 'roles': []}, + 'delegating_role': self.rolename} # The new targets object is added as an attribute to this Targets object. new_targets_object = Targets(self._targets_directory, rolename, roleinfo, parent_targets_object=self._parent_targets_object, + mark_role_as_dirty=True, repository_name=self._repository_name) return new_targets_object @@ -2393,7 +2415,7 @@ def delegate(self, rolename, public_keys, paths, threshold=1, self._parent_targets_object.add_delegated_role(rolename, new_targets_object) - # Add 'new_targets_object' to the 'targets' role object (this object). + # Add 'new_targets_object' to the delegating role object (this object). self.add_delegated_role(rolename, new_targets_object) # Update the 'delegations' field of the current role. @@ -3105,42 +3127,27 @@ def load_repository(repository_directory, repository_name='default', targets_objects = {} loaded_metadata = [] targets_objects['targets'] = repository.targets - - metadata_files = sorted(storage_backend.list_folder(metadata_directory), - reverse=True) - for metadata_role in metadata_files: - - metadata_path = os.path.join(metadata_directory, metadata_role) - metadata_name = \ - metadata_path[len(metadata_directory):].lstrip(os.path.sep) - - # Strip the version number if 'consistent_snapshot' is True, - # or if 'metadata_role' is Root. - # Example: '10.django.json' --> 'django.json' - consistent_snapshot = \ - metadata_role.endswith('root.json') or consistent_snapshot == True - metadata_name, junk = repo_lib._strip_version_number(metadata_name, - consistent_snapshot) - - if metadata_name.endswith(METADATA_EXTENSION): - extension_length = len(METADATA_EXTENSION) - metadata_name = metadata_name[:-extension_length] - - else: - logger.debug('Skipping file with unsupported metadata' - ' extension: ' + repr(metadata_path)) - continue - - # Skip top-level roles, only interested in delegated roles now that the - # top-level roles have already been loaded. - if metadata_name in ['root', 'snapshot', 'targets', 'timestamp']: - continue - + # A list of delegation pairs + delegations = [] + + # Top-level roles are already loaded, fetch targets and get its delegations + # Collect a list of delegation pairs delegated-delegating role. + roleinfo = tuf.roledb.get_roleinfo('targets', repository_name) + for delegation in roleinfo['delegations']['roles']: + delegations.append([delegation['name'], 'targets']) + + filenames = repo_lib.get_delegations_filenames(metadata_directory, + consistent_snapshot, storage_backend) + + # Load the delegated roles by starting from 'targets' and continuously + # adding the next level delegations to the list + for delegated_rolename, delegating_role in delegations: + metadata_path = filenames[delegated_rolename] # Keep a store of metadata previously loaded metadata to prevent re-loading # duplicate versions. Duplicate versions may occur with # 'consistent_snapshot', where the same metadata may be available in # multiples files (the different hash is included in each filename. - if metadata_name in loaded_metadata: + if delegated_rolename in loaded_metadata: continue signable = None @@ -3155,14 +3162,13 @@ def load_repository(repository_directory, repository_name='default', metadata_object = signable['signed'] - # Extract the metadata attributes of 'metadata_name' and update its + # Extract the metadata attributes of 'delegated_rolename' and update its # corresponding roleinfo. - roleinfo = {'name': metadata_name, + roleinfo = {'name': delegated_rolename, 'signing_keyids': [], 'signatures': [], 'partial_loaded': False, - 'paths': {}, - } + 'paths': {}} roleinfo['signatures'].extend(signable['signatures']) roleinfo['version'] = metadata_object['version'] @@ -3171,18 +3177,27 @@ def load_repository(repository_directory, repository_name='default', for filepath, fileinfo in six.iteritems(metadata_object['targets']): roleinfo['paths'].update({filepath: fileinfo.get('custom', {})}) roleinfo['delegations'] = metadata_object['delegations'] + roleinfo['delegating_role'] = delegating_role + + tuf.roledb.add_role(delegated_rolename, roleinfo, + repository_name=repository_name) + loaded_metadata.append(delegated_rolename) - tuf.roledb.add_role(metadata_name, roleinfo, repository_name) - loaded_metadata.append(metadata_name) + # Add the next level delegations to the list: + # the 'delegated' role become the 'delegating' + for delegation in metadata_object['delegations']['roles']: + delegations.append([delegation['name'], delegated_rolename]) - # Generate the Targets objects of the delegated roles of 'metadata_name' + # Generate the Targets objects of the delegated role # and add it to the top-level 'targets' object. - new_targets_object = Targets(targets_directory, metadata_name, roleinfo, - repository_name=repository_name) - targets_object = targets_objects['targets'] - targets_objects[metadata_name] = new_targets_object + new_targets_object = Targets(targets_directory, delegated_rolename, + roleinfo, repository_name=repository_name) + + targets_objects[delegated_rolename] = new_targets_object - targets_object._delegated_roles[(os.path.basename(metadata_name))] = \ + targets_objects['targets']._delegated_roles[delegated_rolename] = \ + new_targets_object + targets_objects[delegating_role]._delegated_roles[delegated_rolename] = \ new_targets_object # Extract the keys specified in the delegations field of the Targets @@ -3210,6 +3225,7 @@ def load_repository(repository_directory, repository_name='default', except tuf.exceptions.KeyAlreadyExistsError: pass + return repository diff --git a/tuf/roledb.py b/tuf/roledb.py old mode 100755 new mode 100644 index 5931ce6748..d602fb6525 --- a/tuf/roledb.py +++ b/tuf/roledb.py @@ -72,6 +72,24 @@ _dirty_roles = {} _dirty_roles['default'] = set() +# A dictionary representing the relations between roles in an update chain. +# e.g _upper_level_role['snapshot'] gives the next role that needs to be +# marked as dirty in case of a version update of 'snapshot'. +_upper_level_role = {'root': None, + 'timestamp': None, + 'snapshot': 'timestamp', + 'targets': 'snapshot'} + +# A dictionary representing a reverse graph of the top-level roles delegations +# Given a delegated role as a key, it gives the delegating role. +# e.g 'root' = _delegating_role['snapshot'] +_delegating_role = {'root': None, + 'timestamp': 'root', + 'snapshot': 'root', + 'targets': 'root'} + +TOP_LEVEL_ROLES = ['root', 'targets', 'snapshot', 'timestamp'] + def create_roledb_from_root_metadata(root_metadata, repository_name='default'): """ @@ -145,7 +163,7 @@ def create_roledb_from_root_metadata(root_metadata, repository_name='default'): roleinfo['paths'] = {} roleinfo['delegations'] = {'keys': {}, 'roles': []} - add_role(rolename, roleinfo, repository_name) + add_role(rolename, roleinfo, repository_name=repository_name) @@ -241,7 +259,7 @@ def remove_roledb(repository_name): -def add_role(rolename, roleinfo, repository_name='default'): +def add_role(rolename, roleinfo, mark_role_as_dirty=False, repository_name='default'): """ Add to the role database the 'roleinfo' associated with 'rolename'. @@ -267,6 +285,12 @@ def add_role(rolename, roleinfo, repository_name='default'): The 'target' role has an additional 'paths' key. Its value is a list of strings representing the path of the target file(s). + mark_role_as_dirty: + A boolean indicating whether the updated 'roleinfo' for 'rolename' should + be marked as dirty. The caller might not want to mark 'rolename' as + dirty if it is loading metadata from disk and only wants to populate + roledb.py. + repository_name: The name of the repository to store 'rolename'. If not supplied, 'rolename' is added to the 'default' repository. @@ -296,6 +320,8 @@ def add_role(rolename, roleinfo, repository_name='default'): # Does 'roleinfo' have the correct object format? tuf.formats.ROLEDB_SCHEMA.check_match(roleinfo) + tuf.formats.BOOLEAN_SCHEMA.check_match(mark_role_as_dirty) + # Is 'repository_name' correctly formatted? securesystemslib.formats.NAME_SCHEMA.check_match(repository_name) @@ -312,6 +338,9 @@ def add_role(rolename, roleinfo, repository_name='default'): _roledb_dict[repository_name][rolename] = copy.deepcopy(roleinfo) + if mark_role_as_dirty: + _propagate_dirty_roles(rolename, False, repository_name) + @@ -347,9 +376,7 @@ def update_roleinfo(rolename, roleinfo, mark_role_as_dirty=True, repository_name A boolean indicating whether the updated 'roleinfo' for 'rolename' should be marked as dirty. The caller might not want to mark 'rolename' as dirty if it is loading metadata from disk and only wants to populate - roledb.py. Likewise, add_role() would support a similar boolean to allow - the repository tools to successfully load roles via load_repository() - without needing to mark these roles as dirty (default behavior). + roledb.py. repository_name: The name of the repository to update the roleinfo of 'rolename'. If not @@ -395,14 +422,94 @@ def update_roleinfo(rolename, roleinfo, mark_role_as_dirty=True, repository_name if rolename not in _roledb_dict[repository_name]: raise tuf.exceptions.UnknownRoleError('Role does not exist: ' + rolename) + if mark_role_as_dirty: + # Check if the update is triggered by a key update and use it to + # determine the chain of roles affected by the update + key_update = _is_key_update(rolename, roleinfo, repository_name) + # Update the global _roledb_dict and _dirty_roles structures so that # the latest 'roleinfo' is available to other modules, and the repository # tools know which roles should be saved to disk. _roledb_dict[repository_name][rolename] = copy.deepcopy(roleinfo) if mark_role_as_dirty: - _dirty_roles[repository_name].add(rolename) + # Recursively mark as dirty all roles affected by this roledb update + _propagate_dirty_roles(rolename, key_update, repository_name) + + + +def _is_key_update(rolename, roleinfo, repository='default'): + """ + Checks roleinfo against the rolename's 'keyids' and 'threshold' stored + in roledb. Returns True if roleinfo contains different values. + """ + + key_update = False + try: + if roleinfo['keyids'] != _roledb_dict[repository][rolename]['keyids'] or \ + roleinfo['threshold'] != _roledb_dict[repository][rolename]['threshold']: + key_update = True + + except KeyError: + pass + + return key_update + + + +def _propagate_dirty_roles(rolename, key_update, repository_name='default'): + """ + + Recursively mark as dirty all roles affected by the roledb update + of rolename e.g.: + 'targets' -> 'snapshot' -> 'timestamp' + and in case of a key update: + 'targets' -> 'root' + + + rolename: + An object representing the role's name, conformant to 'ROLENAME_SCHEMA' + (e.g., 'root', 'snapshot', 'timestamp'). + + key_update: + a boolean indicating if the delegation chain of roles should + be triggered by this update + + repository_name: + The name of the repository to get the dirty roles. If not supplied, the + 'default' repository is searched. + + + None. + """ + + global _dirty_roles + + # Recursion exit case + if rolename is None: + return + + # Add the role to the global dirty roles dictionary + _dirty_roles[repository_name].add(rolename) + + # Check for the next role affected by the update of the current role + try: + # in case of top level roles, the rolenames are already known + # so look them up in the dictionaries + delegating_role = _delegating_role[rolename] + upper_level_role = _upper_level_role[rolename] + except KeyError: + # delegations + upper_level_role = 'snapshot' + delegating_role = \ + _roledb_dict[repository_name][rolename].get('delegating_role') + if key_update: + # In case of a key update propagate the update to the delegating role + _propagate_dirty_roles(delegating_role, False, repository_name) + + # Continue with the next level role + _propagate_dirty_roles(upper_level_role, False, repository_name) @@ -437,9 +544,6 @@ def get_dirty_roles(repository_name='default'): # 'securesystemslib.exceptions.FormatError' if not. securesystemslib.formats.NAME_SCHEMA.check_match(repository_name) - global _roledb_dict - global _dirty_roles - if repository_name not in _roledb_dict or repository_name not in _dirty_roles: raise securesystemslib.exceptions.InvalidNameError('Repository name does' ' not' ' exist: ' + repository_name) @@ -480,7 +584,6 @@ def mark_dirty(roles, repository_name='default'): securesystemslib.formats.NAMES_SCHEMA.check_match(roles) securesystemslib.formats.NAME_SCHEMA.check_match(repository_name) - global _roledb_dict global _dirty_roles if repository_name not in _roledb_dict or repository_name not in _dirty_roles: @@ -523,7 +626,6 @@ def unmark_dirty(roles, repository_name='default'): securesystemslib.formats.NAMES_SCHEMA.check_match(roles) securesystemslib.formats.NAME_SCHEMA.check_match(repository_name) - global _roledb_dict global _dirty_roles if repository_name not in _roledb_dict or repository_name not in _dirty_roles: @@ -629,7 +731,6 @@ def remove_role(rolename, repository_name='default'): _check_rolename(rolename, repository_name) global _roledb_dict - global _dirty_roles # 'rolename' was verified to exist in _check_rolename(). # Remove 'rolename' now. @@ -667,9 +768,6 @@ def get_rolenames(repository_name='default'): # 'securesystemslib.exceptions.FormatError' if it is improperly formatted. securesystemslib.formats.NAME_SCHEMA.check_match(repository_name) - global _roledb_dict - global _dirty_roles - if repository_name not in _roledb_dict or repository_name not in _dirty_roles: raise securesystemslib.exceptions.InvalidNameError('Repository name does' ' not' ' exist: ' + repository_name) @@ -730,9 +828,6 @@ def get_roleinfo(rolename, repository_name='default'): # securesystemslib.exceptions.InvalidNameError. _check_rolename(rolename, repository_name) - global _roledb_dict - global _dirty_roles - return copy.deepcopy(_roledb_dict[repository_name][rolename]) @@ -783,9 +878,6 @@ def get_role_keyids(rolename, repository_name='default'): # securesystemslib.exceptions.InvalidNameError. _check_rolename(rolename, repository_name) - global _roledb_dict - global _dirty_roles - roleinfo = _roledb_dict[repository_name][rolename] return roleinfo['keyids'] @@ -835,9 +927,6 @@ def get_role_threshold(rolename, repository_name='default'): # securesystemslib.exceptions.InvalidNameError. _check_rolename(rolename, repository_name) - global _roledb_dict - global _dirty_roles - roleinfo = _roledb_dict[repository_name][rolename] return roleinfo['threshold'] @@ -886,9 +975,6 @@ def get_role_paths(rolename, repository_name='default'): # securesystemslib.exceptions.InvalidNameError. _check_rolename(rolename, repository_name) - global _roledb_dict - global _dirty_roles - roleinfo = _roledb_dict[repository_name][rolename] # Paths won't exist for non-target roles. @@ -946,9 +1032,6 @@ def get_delegated_rolenames(rolename, repository_name='default'): # securesystemslib.exceptions.InvalidNameError. _check_rolename(rolename, repository_name) - global _roledb_dict - global _dirty_roles - # get_roleinfo() raises a 'securesystemslib.exceptions.InvalidNameError' if # 'repository_name' does not exist in the role database. roleinfo = get_roleinfo(rolename, repository_name) @@ -1035,9 +1118,6 @@ def _check_rolename(rolename, repository_name='default'): # Raises securesystemslib.exceptions.InvalidNameError. _validate_rolename(rolename) - global _roledb_dict - global _dirty_roles - if repository_name not in _roledb_dict or repository_name not in _dirty_roles: raise securesystemslib.exceptions.InvalidNameError('Repository name does not' ' exist: ' + repository_name)