Skip to content

Commit

Permalink
Load correctly the delegated Targets objects hierarchy
Browse files Browse the repository at this point in the history
Update load_repository() function to load the delegations metadata
starting from 'targets' and traversing downwards the delegated
roles in order to load correctly the delegations hierarchy.

Signed-off-by: Teodora Sechkova <tsechkova@vmware.com>
  • Loading branch information
sechkova committed Jun 10, 2020
1 parent ff5afe4 commit 1bc320f
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 50 deletions.
53 changes: 53 additions & 0 deletions tuf/repository_lib.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,59 @@ def import_ed25519_privatekey_from_file(filepath, password=None):
return private_key



def get_delegations_filenames(metadata_directory, consistent_snapshot,
storage_backend=None):
"""
Return a dictionary containing all filenames in 'metadata_directory'
except the top-level roles.
If multiple versions of a file exist because of a consistent snapshot,
only the file with biggest version prefix is included.
"""

filenames = {}
loaded_metadata = []
metadata_files = sorted(storage_backend.list_folder(metadata_directory),
reverse=True)
for metadata_role in metadata_files:
metadata_path = os.path.join(metadata_directory, metadata_role)

# Strip the version number if 'consistent_snapshot' is True,
# or if 'metadata_role' is Root.
# Example: '10.django.json' --> 'django.json'
consistent_snapshot = \
metadata_role.endswith('root.json') or consistent_snapshot == True
metadata_name, junk = _strip_version_number(metadata_role,
consistent_snapshot)

if metadata_name.endswith(METADATA_EXTENSION):
extension_length = len(METADATA_EXTENSION)
metadata_name = metadata_name[:-extension_length]

else:
logger.debug('Skipping file with unsupported metadata'
' extension: ' + repr(metadata_path))
continue

# Skip top-level roles, only interested in delegated roles now that the
# top-level roles have already been loaded.
if metadata_name in ['root', 'snapshot', 'targets', 'timestamp']:
continue

# Keep a store of metadata previously loaded metadata to prevent re-loading
# duplicate versions. Duplicate versions may occur with
# 'consistent_snapshot', where the same metadata may be available in
# multiples files (the different hash is included in each filename).
if metadata_name in loaded_metadata:
continue

filenames[metadata_name] = metadata_path
loaded_metadata.append(metadata_name)

return filenames



def get_metadata_filenames(metadata_directory):
"""
<Purpose>
Expand Down
89 changes: 39 additions & 50 deletions tuf/repository_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2393,7 +2393,7 @@ def delegate(self, rolename, public_keys, paths, threshold=1,
self._parent_targets_object.add_delegated_role(rolename,
new_targets_object)

# Add 'new_targets_object' to the 'targets' role object (this object).
# Add 'new_targets_object' to the delegating role object (this object).
self.add_delegated_role(rolename, new_targets_object)

# Update the 'delegations' field of the current role.
Expand Down Expand Up @@ -3104,45 +3104,24 @@ def load_repository(repository_directory, repository_name='default',
# extracted fileinfo is stored in the 'meta' field of the snapshot metadata
# object.
targets_objects = {}
loaded_metadata = []
targets_objects['targets'] = repository.targets
# A list of delegated-delegating role pairs
delegations = []

metadata_files = sorted(storage_backend.list_folder(metadata_directory),
reverse=True)
for metadata_role in metadata_files:
delegations_filenames = repo_lib.get_delegations_filenames(metadata_directory,
consistent_snapshot, storage_backend)

metadata_path = os.path.join(metadata_directory, metadata_role)
metadata_name = \
metadata_path[len(metadata_directory):].lstrip(os.path.sep)
# Top-level roles are already loaded, fetch targets and get its delegations.
# Collect a list of delegated-delegating role pairs, starting from the
# top-level targets: [('role1', 'targets'), ('role2', 'targets'), ... ]
roleinfo = tuf.roledb.get_roleinfo('targets', repository_name)
for role in roleinfo['delegations']['roles']:
delegations.append([role['name'], 'targets'])

# Strip the version number if 'consistent_snapshot' is True,
# or if 'metadata_role' is Root.
# Example: '10.django.json' --> 'django.json'
consistent_snapshot = \
metadata_role.endswith('root.json') or consistent_snapshot == True
metadata_name, junk = repo_lib._strip_version_number(metadata_name,
consistent_snapshot)

if metadata_name.endswith(METADATA_EXTENSION):
extension_length = len(METADATA_EXTENSION)
metadata_name = metadata_name[:-extension_length]

else:
logger.debug('Skipping file with unsupported metadata'
' extension: ' + repr(metadata_path))
continue

# Skip top-level roles, only interested in delegated roles now that the
# top-level roles have already been loaded.
if metadata_name in ['root', 'snapshot', 'targets', 'timestamp']:
continue

# Keep a store of metadata previously loaded metadata to prevent re-loading
# duplicate versions. Duplicate versions may occur with
# 'consistent_snapshot', where the same metadata may be available in
# multiples files (the different hash is included in each filename.
if metadata_name in loaded_metadata:
continue
# Load the delegated roles by starting from 'targets' and continuously
# appending the next level delegations to the list
for rolename, delegating_role in delegations:
metadata_path = delegations_filenames[rolename]

signable = None

Expand All @@ -3156,9 +3135,9 @@ def load_repository(repository_directory, repository_name='default',

metadata_object = signable['signed']

# Extract the metadata attributes of 'metadata_name' and update its
# Extract the metadata attributes of 'metadata_object' and update its
# corresponding roleinfo.
roleinfo = {'name': metadata_name,
roleinfo = {'name': rolename,
'signing_keyids': [],
'signatures': [],
'partial_loaded': False
Expand All @@ -3170,18 +3149,26 @@ def load_repository(repository_directory, repository_name='default',
roleinfo['paths'] = metadata_object['targets']
roleinfo['delegations'] = metadata_object['delegations']

tuf.roledb.add_role(metadata_name, roleinfo, repository_name)
loaded_metadata.append(metadata_name)
tuf.roledb.add_role(rolename, roleinfo, repository_name)

# Generate the Targets object of the delegated role,
# add it to the top-level 'targets' object and to its
# direct delegating role object.
new_targets_object = Targets(targets_directory, rolename,
roleinfo, parent_targets_object=targets_objects['targets'],
repository_name=repository_name)

targets_objects[rolename] = new_targets_object

# Generate the Targets objects of the delegated roles of 'metadata_name'
# and add it to the top-level 'targets' object.
new_targets_object = Targets(targets_directory, metadata_name, roleinfo,
repository_name=repository_name)
targets_object = targets_objects['targets']
targets_objects[metadata_name] = new_targets_object
targets_objects['targets'].add_delegated_role(rolename,
new_targets_object)
targets_objects[delegating_role].add_delegated_role(rolename,
new_targets_object)

targets_object._delegated_roles[(os.path.basename(metadata_name))] = \
new_targets_object
# Append the next level delegations to the list:
# the 'delegated' role becomes the 'delegating'
for delegation in metadata_object['delegations']['roles']:
delegations.append([delegation['name'], rolename])

# Extract the keys specified in the delegations field of the Targets
# role. Add 'key_object' to the list of recognized keys. Keys may be
Expand All @@ -3196,8 +3183,10 @@ def load_repository(repository_directory, repository_name='default',
# that doesn't match the client's set of hash algorithms. Make sure
# to only used the repo's selected hashing algorithms.
hash_algorithms = securesystemslib.settings.HASH_ALGORITHMS
securesystemslib.settings.HASH_ALGORITHMS = key_metadata['keyid_hash_algorithms']
key_object, keyids = securesystemslib.keys.format_metadata_to_key(key_metadata)
securesystemslib.settings.HASH_ALGORITHMS = \
key_metadata['keyid_hash_algorithms']
key_object, keyids = \
securesystemslib.keys.format_metadata_to_key(key_metadata)
securesystemslib.settings.HASH_ALGORITHMS = hash_algorithms
try:
for keyid in keyids: # pragma: no branch
Expand Down

0 comments on commit 1bc320f

Please sign in to comment.