Skip to content

Commit

Permalink
Merge pull request #1052 from sechkova/issue-1045
Browse files Browse the repository at this point in the history
Correctly load the delegated Targets objects hierarchy
  • Loading branch information
lukpueh authored Jun 29, 2020
2 parents 5d16f91 + 6ae3ea6 commit 49031b4
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 75 deletions.
13 changes: 8 additions & 5 deletions tests/test_repository_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ def test_import_ed25519_privatekey_from_file(self):



def test_get_metadata_filenames(self):
def test_get_top_level_metadata_filenames(self):

# Test normal case.
metadata_directory = os.path.join('metadata/')
Expand All @@ -210,7 +210,8 @@ def test_get_metadata_filenames(self):
'snapshot.json': metadata_directory + 'snapshot.json',
'timestamp.json': metadata_directory + 'timestamp.json'}

self.assertEqual(filenames, repo_lib.get_metadata_filenames('metadata/'))
self.assertEqual(filenames,
repo_lib.get_top_level_metadata_filenames('metadata/'))

# If a directory argument is not specified, the current working directory
# is used.
Expand All @@ -219,11 +220,13 @@ def test_get_metadata_filenames(self):
'targets.json': os.path.join(metadata_directory, 'targets.json'),
'snapshot.json': os.path.join(metadata_directory, 'snapshot.json'),
'timestamp.json': os.path.join(metadata_directory, 'timestamp.json')}
self.assertEqual(filenames, repo_lib.get_metadata_filenames(metadata_directory))
self.assertEqual(filenames,
repo_lib.get_top_level_metadata_filenames(metadata_directory))


# Test improperly formatted argument.
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.get_metadata_filenames, 3)
self.assertRaises(securesystemslib.exceptions.FormatError,
repo_lib.get_top_level_metadata_filenames, 3)



Expand Down Expand Up @@ -797,7 +800,7 @@ def test__load_top_level_metadata(self):
storage_backend = securesystemslib.storage.FilesystemBackend()
repo_lib.write_metadata_file(signable, root_file, 8, False, storage_backend)

filenames = repo_lib.get_metadata_filenames(metadata_directory)
filenames = repo_lib.get_top_level_metadata_filenames(metadata_directory)
repository = repo_tool.create_new_repository(repository_directory, repository_name)
repo_lib._load_top_level_metadata(repository, filenames, repository_name)

Expand Down
4 changes: 4 additions & 0 deletions tests/test_repository_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2053,6 +2053,10 @@ def test_load_repository(self):

repository = repo_tool.load_repository(repository_directory)
self.assertTrue(isinstance(repository, repo_tool.Repository))
self.assertTrue(isinstance(repository.targets('role1'),
repo_tool.Targets))
self.assertTrue(isinstance(repository.targets('role1')('role2'),
repo_tool.Targets))

# Verify the expected roles have been loaded. See
# 'tuf/tests/repository_data/repository/'.
Expand Down
4 changes: 2 additions & 2 deletions tuf/client/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ def __init__(self, repository_name, repository_mirrors):

# Load current and previous metadata.
for metadata_set in ['current', 'previous']:
for metadata_role in ['root', 'targets', 'snapshot', 'timestamp']:
for metadata_role in tuf.roledb.TOP_LEVEL_ROLES:
self._load_metadata_from_file(metadata_set, metadata_role)

# Raise an exception if the repository is missing the required 'root'
Expand Down Expand Up @@ -2435,7 +2435,7 @@ def all_targets(self):
# all roles available on the repository.
delegated_targets = []
for role in tuf.roledb.get_rolenames(self.repository_name):
if role in ['root', 'snapshot', 'targets', 'timestamp']:
if role in tuf.roledb.TOP_LEVEL_ROLES:
continue

else:
Expand Down
65 changes: 56 additions & 9 deletions tuf/repository_lib.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def _generate_and_write_metadata(rolename, metadata_filename,
else:
logger.debug('Not incrementing ' + repr(rolename) + '\'s version number.')

if rolename in ['root', 'targets', 'snapshot', 'timestamp'] and not allow_partially_signed:
if rolename in tuf.roledb.TOP_LEVEL_ROLES and not allow_partially_signed:
# Verify that the top-level 'rolename' is fully signed. Only a delegated
# role should not be written to disk without full verification of its
# signature(s), since it can only be considered fully signed depending on
Expand Down Expand Up @@ -394,18 +394,15 @@ def _delete_obsolete_metadata(metadata_directory, snapshot_metadata,
else:
logger.debug(repr(metadata_role) + ' found in the snapshot role.')



# Strip metadata extension from filename. The role database does not
# include the metadata extension.
if metadata_role.endswith(METADATA_EXTENSION):
metadata_role = metadata_role[:-len(METADATA_EXTENSION)]

else:
logger.debug(repr(metadata_role) + ' does not match'
' supported extension ' + repr(METADATA_EXTENSION))

if metadata_role in ['root', 'targets', 'snapshot', 'timestamp']:
if metadata_role in tuf.roledb.TOP_LEVEL_ROLES:
logger.debug('Not removing top-level metadata ' + repr(metadata_role))
return

Expand Down Expand Up @@ -811,7 +808,57 @@ def import_ed25519_privatekey_from_file(filepath, password=None):
return private_key


def get_metadata_filenames(metadata_directory):

def get_delegated_roles_metadata_filenames(metadata_directory,
    consistent_snapshot, storage_backend=None):
  """
  <Purpose>
    Return a dictionary mapping delegated role names to the paths of their
    metadata files in 'metadata_directory', excluding the top-level roles.

    If multiple versions of a file exist because of a consistent snapshot,
    only the first file encountered in reverse-sorted directory order is
    kept.  NOTE(review): the sort is lexicographic, so multi-digit version
    prefixes (e.g. '10.role.json' vs '9.role.json') may not order
    numerically -- confirm whether numeric ordering is required.

  <Arguments>
    metadata_directory:
      The directory containing the role metadata files.

    consistent_snapshot:
      Boolean.  If True, a version-number prefix is stripped from every
      filename; otherwise only from Root's filename (Root is always
      written with a version prefix).

    storage_backend:
      An object implementing
      securesystemslib.storage.StorageBackendInterface.  If None, a
      filesystem backend is instantiated.

  <Returns>
    A dictionary of the form {rolename: metadata_path, ...} containing
    only delegated (non top-level) roles.
  """

  # The original signature defaulted 'storage_backend' to None but then
  # dereferenced it unconditionally; fall back to the filesystem backend.
  if storage_backend is None:
    import securesystemslib.storage
    storage_backend = securesystemslib.storage.FilesystemBackend()

  filenames = {}
  metadata_files = sorted(storage_backend.list_folder(metadata_directory),
      reverse=True)

  # Iterate over role metadata files, sorted by their version-number prefix,
  # with more recent versions first, and only add the most recent version of
  # any (non top-level) metadata to the list of returned filenames.  Note
  # that there should only be one version of each file, if
  # consistent_snapshot is False.
  for metadata_role in metadata_files:
    metadata_path = os.path.join(metadata_directory, metadata_role)

    # Strip the version number if 'consistent_snapshot' is True, or if
    # 'metadata_role' is Root (Root is always versioned).
    # Example: '10.django.json' --> 'django.json'
    # Use a per-file local instead of reassigning the 'consistent_snapshot'
    # argument, so that encountering 'root.json' cannot change how later
    # files in the same directory listing are parsed.
    strip_version = \
        metadata_role.endswith('root.json') or consistent_snapshot
    metadata_name, _ = _strip_version_number(metadata_role, strip_version)

    if metadata_name.endswith(METADATA_EXTENSION):
      extension_length = len(METADATA_EXTENSION)
      metadata_name = metadata_name[:-extension_length]

    else:
      logger.debug('Skipping file with unsupported metadata'
          ' extension: ' + repr(metadata_path))
      continue

    # Skip top-level roles, only interested in delegated roles.
    if metadata_name in tuf.roledb.TOP_LEVEL_ROLES:
      continue

    # Prevent reloading duplicate versions if consistent_snapshot is True:
    # the first (most recent, per the reverse sort) occurrence wins.
    if metadata_name not in filenames:
      filenames[metadata_name] = metadata_path

  return filenames



def get_top_level_metadata_filenames(metadata_directory):
"""
<Purpose>
Return a dictionary containing the filenames of the top-level roles.
Expand Down Expand Up @@ -1081,7 +1128,7 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot,
# Extract the role, threshold, and keyid information of the top-level roles,
# which Root stores in its metadata. The necessary role metadata is generated
# from this information.
for rolename in ['root', 'targets', 'snapshot', 'timestamp']:
for rolename in tuf.roledb.TOP_LEVEL_ROLES:

# If a top-level role is missing from 'tuf.roledb.py', raise an exception.
if not tuf.roledb.role_exists(rolename, repository_name):
Expand Down Expand Up @@ -1457,7 +1504,7 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date,
# snapshot and timestamp roles are not listed in snapshot.json, do not
# list these roles found in the metadata directory.
if tuf.roledb.role_exists(rolename, repository_name) and \
rolename not in ['root', 'snapshot', 'timestamp', 'targets']:
rolename not in tuf.roledb.TOP_LEVEL_ROLES:
fileinfodict[metadata_name] = get_metadata_versioninfo(rolename,
repository_name)

Expand Down Expand Up @@ -1776,7 +1823,7 @@ def _log_status_of_top_level_roles(targets_directory, metadata_directory,

# The expected full filenames of the top-level roles needed to write them to
# disk.
filenames = get_metadata_filenames(metadata_directory)
filenames = get_top_level_metadata_filenames(metadata_directory)
root_filename = filenames[ROOT_FILENAME]
targets_filename = filenames[TARGETS_FILENAME]
snapshot_filename = filenames[SNAPSHOT_FILENAME]
Expand Down
126 changes: 67 additions & 59 deletions tuf/repository_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import shutil
import json

from collections import deque

import tuf
import tuf.formats
import tuf.roledb
Expand Down Expand Up @@ -293,7 +295,7 @@ def writeall(self, consistent_snapshot=False, use_existing_fileinfo=False):
for dirty_rolename in dirty_rolenames:

# Ignore top-level roles, they will be generated later in this method.
if dirty_rolename in ['root', 'targets', 'snapshot', 'timestamp']:
if dirty_rolename in tuf.roledb.TOP_LEVEL_ROLES:
continue

dirty_filename = os.path.join(self._metadata_directory,
Expand Down Expand Up @@ -2393,7 +2395,7 @@ def delegate(self, rolename, public_keys, paths, threshold=1,
self._parent_targets_object.add_delegated_role(rolename,
new_targets_object)

# Add 'new_targets_object' to the 'targets' role object (this object).
# Add 'new_targets_object' to the delegating role object (this object).
self.add_delegated_role(rolename, new_targets_object)

# Update the 'delegations' field of the current role.
Expand Down Expand Up @@ -3088,7 +3090,7 @@ def load_repository(repository_directory, repository_name='default',
repository = Repository(repository_directory, metadata_directory,
targets_directory, storage_backend, repository_name)

filenames = repo_lib.get_metadata_filenames(metadata_directory)
filenames = repo_lib.get_top_level_metadata_filenames(metadata_directory)

# The Root file is always available without a version number (a consistent
# snapshot) attached to the filename. Store the 'consistent_snapshot' value
Expand All @@ -3100,50 +3102,49 @@ def load_repository(repository_directory, repository_name='default',
repository, consistent_snapshot = repo_lib._load_top_level_metadata(repository,
filenames, repository_name)

# Load the delegated targets metadata and generate their fileinfo. The
# extracted fileinfo is stored in the 'meta' field of the snapshot metadata
# object.
targets_objects = {}
loaded_metadata = []
targets_objects['targets'] = repository.targets

metadata_files = sorted(storage_backend.list_folder(metadata_directory),
reverse=True)
for metadata_role in metadata_files:

metadata_path = os.path.join(metadata_directory, metadata_role)
metadata_name = \
metadata_path[len(metadata_directory):].lstrip(os.path.sep)

# Strip the version number if 'consistent_snapshot' is True,
# or if 'metadata_role' is Root.
# Example: '10.django.json' --> 'django.json'
consistent_snapshot = \
metadata_role.endswith('root.json') or consistent_snapshot == True
metadata_name, junk = repo_lib._strip_version_number(metadata_name,
consistent_snapshot)

if metadata_name.endswith(METADATA_EXTENSION):
extension_length = len(METADATA_EXTENSION)
metadata_name = metadata_name[:-extension_length]

else:
logger.debug('Skipping file with unsupported metadata'
' extension: ' + repr(metadata_path))
delegated_roles_filenames = repo_lib.get_delegated_roles_metadata_filenames(
metadata_directory, consistent_snapshot, storage_backend)

# Load the delegated targets metadata and their fileinfo.
# The delegated targets roles form a tree/graph which is traversed in a
# breadth-first-search manner starting from 'targets' in order to correctly
# load the delegations hierarchy.
parent_targets_object = repository.targets

# Keep the next delegations to be loaded in a deque structure which
# has the properties of a list but is designed to have fast appends
# and pops from both ends
delegations = deque()
# A set used to keep the already loaded delegations and avoid an infinite
# loop in case of cycles in the delegations graph
loaded_delegations = set()

# Top-level roles are already loaded, fetch targets and get its delegations.
# Store the delegations in the form of delegated-delegating role tuples,
# starting from the top-level targets:
# [('role1', 'targets'), ('role2', 'targets'), ... ]
roleinfo = tuf.roledb.get_roleinfo('targets', repository_name)
for role in roleinfo['delegations']['roles']:
delegations.append((role['name'], 'targets'))

# Traverse the graph by appending the next delegation to the deque and
# 'pop'-ing and loading the left-most element.
while delegations:
rolename, delegating_role = delegations.popleft()
if (rolename, delegating_role) in loaded_delegations:
logger.warning('Detected cycle in the delegation graph: ' +
repr(delegating_role) + ' -> ' +
repr(rolename) +
' is reached more than once.')
continue

# Skip top-level roles, only interested in delegated roles now that the
# top-level roles have already been loaded.
if metadata_name in ['root', 'snapshot', 'targets', 'timestamp']:
continue

    # Keep a store of previously loaded metadata to prevent re-loading
    # duplicate versions. Duplicate versions may occur with
    # 'consistent_snapshot', where the same metadata may be available in
    # multiple files (a different hash is included in each filename).
if metadata_name in loaded_metadata:
continue
# Instead of adding only rolename to the set, store the already loaded
# delegated-delegating role tuples. This way a delegated role is added
# to each of its delegating roles but when the role is reached twice
# from the same delegating role an infinite loop is avoided.
loaded_delegations.add((rolename, delegating_role))

metadata_path = delegated_roles_filenames[rolename]
signable = None

try:
Expand All @@ -3156,9 +3157,9 @@ def load_repository(repository_directory, repository_name='default',

metadata_object = signable['signed']

# Extract the metadata attributes of 'metadata_name' and update its
# Extract the metadata attributes of 'metadata_object' and update its
# corresponding roleinfo.
roleinfo = {'name': metadata_name,
roleinfo = {'name': rolename,
'signing_keyids': [],
'signatures': [],
'partial_loaded': False
Expand All @@ -3170,18 +3171,23 @@ def load_repository(repository_directory, repository_name='default',
roleinfo['paths'] = metadata_object['targets']
roleinfo['delegations'] = metadata_object['delegations']

tuf.roledb.add_role(metadata_name, roleinfo, repository_name)
loaded_metadata.append(metadata_name)

# Generate the Targets objects of the delegated roles of 'metadata_name'
# and add it to the top-level 'targets' object.
new_targets_object = Targets(targets_directory, metadata_name, roleinfo,
repository_name=repository_name)
targets_object = targets_objects['targets']
targets_objects[metadata_name] = new_targets_object
# Generate the Targets object of the delegated role,
# add it to the top-level 'targets' object and to its
# direct delegating role object.
new_targets_object = Targets(targets_directory, rolename,
roleinfo, parent_targets_object=parent_targets_object,
repository_name=repository_name)

parent_targets_object.add_delegated_role(rolename,
new_targets_object)
if delegating_role != 'targets':
parent_targets_object(delegating_role).add_delegated_role(rolename,
new_targets_object)

targets_object._delegated_roles[(os.path.basename(metadata_name))] = \
new_targets_object
# Append the next level delegations to the deque:
# the 'delegated' role becomes the 'delegating'
for delegation in metadata_object['delegations']['roles']:
delegations.append((delegation['name'], rolename))

# Extract the keys specified in the delegations field of the Targets
# role. Add 'key_object' to the list of recognized keys. Keys may be
Expand All @@ -3196,8 +3202,10 @@ def load_repository(repository_directory, repository_name='default',
# that doesn't match the client's set of hash algorithms. Make sure
# to only used the repo's selected hashing algorithms.
hash_algorithms = securesystemslib.settings.HASH_ALGORITHMS
securesystemslib.settings.HASH_ALGORITHMS = key_metadata['keyid_hash_algorithms']
key_object, keyids = securesystemslib.keys.format_metadata_to_key(key_metadata)
securesystemslib.settings.HASH_ALGORITHMS = \
key_metadata['keyid_hash_algorithms']
key_object, keyids = \
securesystemslib.keys.format_metadata_to_key(key_metadata)
securesystemslib.settings.HASH_ALGORITHMS = hash_algorithms
try:
for keyid in keyids: # pragma: no branch
Expand Down
3 changes: 3 additions & 0 deletions tuf/roledb.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@
_dirty_roles['default'] = set()


# Names of the roles that every TUF repository defines at the top level of
# the delegation hierarchy; all other roles are delegated (targets) roles.
TOP_LEVEL_ROLES = ['root', 'targets', 'snapshot', 'timestamp']


def create_roledb_from_root_metadata(root_metadata, repository_name='default'):
"""
<Purpose>
Expand Down

0 comments on commit 49031b4

Please sign in to comment.