Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Use separate keydb for delegated targets #1095

Closed
wants to merge 10 commits into from
2 changes: 1 addition & 1 deletion tests/test_developer_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ def test_write(self):


# Load_signing_keys.
project('delegation').load_signing_key(delegation_private_key)
project('delegation').load_signing_key(delegation_private_key, 'targets')

project.status()

Expand Down
29 changes: 16 additions & 13 deletions tests/test_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,6 @@ def test_1__init__exceptions(self):


def test_1__load_metadata_from_file(self):

# Setup
# Get the 'role1.json' filepath. Manually load the role metadata, and
# compare it against the loaded metadata by '_load_metadata_from_file()'.
Expand All @@ -320,6 +319,8 @@ def test_1__load_metadata_from_file(self):
self.assertEqual(len(self.repository_updater.metadata['current']), 5)

# Verify that the content of root metadata is valid.
# load_json_file does not set 'parent_role'
role1_meta['signed']['delegations']['roles'][0]['parent_role'] = 'role1'
self.assertEqual(self.repository_updater.metadata['current']['role1'],
role1_meta['signed'])

Expand Down Expand Up @@ -353,13 +354,14 @@ def test_1__rebuild_key_and_role_db(self):

self.assertEqual(root_roleinfo['threshold'], root_threshold)

# Ensure we add 2 to the number of root keys (actually, the number of root
# keys multiplied by the number of keyid hash algorithms), to include the
# delegated targets key (+1 for its sha512 keyid). The delegated roles of
# 'targets.json' are also loaded when the repository object is
# instantiated.
# Verify that the keydb for root contains only keys for top level roles

self.assertEqual(number_of_root_keys * 2, len(tuf.keydb._keydb_dict[self.repository_name]))

self.assertEqual(number_of_root_keys * 2 + 2, len(tuf.keydb._keydb_dict[self.repository_name]))
# Ensure that the delegated targets keys are stored in their own keydb.
# The delegated roles of 'targets.json' are also loaded when the repository
# object is instantiated.
self.assertEqual(2, len(tuf.keydb._keydb_dict[self.repository_name + ' targets']))

# Test: normal case.
self.repository_updater._rebuild_key_and_role_db()
Expand Down Expand Up @@ -583,9 +585,10 @@ def test_2__import_delegations(self):
self.repository_updater._import_delegations('targets')

self.assertEqual(len(tuf.roledb._roledb_dict[repository_name]), 5)
# The number of root keys (times the number of key hash algorithms) +
# delegation's key (+1 for its sha512 keyid).
self.assertEqual(len(tuf.keydb._keydb_dict[repository_name]), 4 * 2 + 2)
# The number of root keys (times the number of key hash algorithms)
self.assertEqual(len(tuf.keydb._keydb_dict[repository_name]), 4 * 2)
# The delegation's key (+1 for its sha512 keyid).
self.assertEqual(len(tuf.keydb._keydb_dict[repository_name + ' targets']), 2)

# Verify that roledb dictionary was added.
self.assertTrue('role1' in tuf.roledb._roledb_dict[repository_name])
Expand All @@ -599,7 +602,7 @@ def test_2__import_delegations(self):
keyids.append(signature['keyid'])

for keyid in keyids:
self.assertTrue(keyid in tuf.keydb._keydb_dict[repository_name])
self.assertTrue(keyid in tuf.keydb._keydb_dict[repository_name] or keyid in tuf.keydb._keydb_dict[repository_name + ' targets'])

# Verify that _import_delegations() ignores invalid keytypes in the 'keys'
# field of parent role's 'delegations'.
Expand Down Expand Up @@ -1168,8 +1171,8 @@ def test_6_get_one_valid_targetinfo(self):
repository.targets('role4').add_target(foo_package)

repository.targets.load_signing_key(self.role_keys['targets']['private'])
repository.targets('role3').load_signing_key(self.role_keys['targets']['private'])
repository.targets('role4').load_signing_key(self.role_keys['targets']['private'])
repository.targets('role3').load_signing_key(self.role_keys['targets']['private'], 'targets')
repository.targets('role4').load_signing_key(self.role_keys['targets']['private'], 'targets')
repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'])
repository.timestamp.load_signing_key(self.role_keys['timestamp']['private'])
repository.writeall()
Expand Down
66 changes: 26 additions & 40 deletions tuf/client/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -940,42 +940,19 @@ def _import_delegations(self, parent_role):
return

# This could be quite slow with a large number of delegations.
keys_info = current_parent_metadata['delegations'].get('keys', {})
roles_info = current_parent_metadata['delegations'].get('roles', [])

logger.debug('Adding roles delegated from ' + repr(parent_role) + '.')

# Iterate the keys of the delegated roles of 'parent_role' and load them.
for keyid, keyinfo in six.iteritems(keys_info):
if keyinfo['keytype'] in ['rsa', 'ed25519', 'ecdsa-sha2-nistp256']:

# We specify the keyid to ensure that it's the correct keyid
# for the key.
try:

# The repo may have used hashing algorithms for the generated keyids
# that doesn't match the client's set of hash algorithms. Make sure
# to only used the repo's selected hashing algorithms.
hash_algorithms = securesystemslib.settings.HASH_ALGORITHMS
securesystemslib.settings.HASH_ALGORITHMS = keyinfo['keyid_hash_algorithms']
key, keyids = securesystemslib.keys.format_metadata_to_key(keyinfo)
securesystemslib.settings.HASH_ALGORITHMS = hash_algorithms

for key_id in keyids:
key['keyid'] = key_id
tuf.keydb.add_key(key, keyid=None, repository_name=self.repository_name)

except tuf.exceptions.KeyAlreadyExistsError:
pass

except (securesystemslib.exceptions.FormatError, securesystemslib.exceptions.Error):
logger.exception('Invalid key for keyid: ' + repr(keyid) + '.')
logger.error('Aborting role delegation for parent role ' + parent_role + '.')
raise

else:
logger.warning('Invalid key type for ' + repr(keyid) + '.')
continue
try:
tuf.keydb.create_keydb_from_targets_metadata(current_parent_metadata['delegations'],
self.repository_name, parent_role)
except tuf.exceptions.KeyAlreadyExistsError:
pass
except (securesystemslib.exceptions.FormatError, securesystemslib.exceptions.Error):
logger.error('Aborting role delegation for parent role ' + parent_role + '.')
raise

# Add the roles to the role database.
for roleinfo in roles_info:
Expand All @@ -984,6 +961,7 @@ def _import_delegations(self, parent_role):
# is None.
rolename = roleinfo.get('name')
logger.debug('Adding delegated role: ' + str(rolename) + '.')
roleinfo['parent_role'] = parent_role
tuf.roledb.add_role(rolename, roleinfo, self.repository_name)

except tuf.exceptions.RoleAlreadyExistsError:
Expand Down Expand Up @@ -1380,7 +1358,7 @@ def verify_target_file(target_file_object):


def _verify_uncompressed_metadata_file(self, metadata_file_object,
metadata_role):
metadata_role, delegating_rolename='root'):
"""
<Purpose>
Non-public method that verifies an uncompressed metadata file. An
Expand Down Expand Up @@ -1443,7 +1421,7 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object,

# Verify the signature on the downloaded metadata object.
valid = tuf.sig.verify(metadata_signable, metadata_role,
self.repository_name)
self.repository_name, delegating_rolename=delegating_rolename)

if not valid:
raise securesystemslib.exceptions.BadSignatureError(metadata_role)
Expand All @@ -1453,7 +1431,7 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object,


def _get_metadata_file(self, metadata_role, remote_filename,
upperbound_filelength, expected_version):
upperbound_filelength, expected_version, delegating_rolename='root'):
"""
<Purpose>
Non-public method that tries downloading, up to a certain length, a
Expand Down Expand Up @@ -1578,7 +1556,8 @@ def _get_metadata_file(self, metadata_role, remote_filename,
except KeyError:
logger.info(metadata_role + ' not available locally.')

self._verify_uncompressed_metadata_file(file_object, metadata_role)
self._verify_uncompressed_metadata_file(file_object, metadata_role,
delegating_rolename=delegating_rolename)

except Exception as exception:
# Remember the error from this mirror, and "reset" the target file.
Expand Down Expand Up @@ -1709,7 +1688,8 @@ def _get_file(self, filepath, verify_file_function, file_type, file_length,



def _update_metadata(self, metadata_role, upperbound_filelength, version=None):
def _update_metadata(self, metadata_role, upperbound_filelength, version=None,
delegating_rolename='root'):
"""
<Purpose>
Non-public method that downloads, verifies, and 'installs' the metadata
Expand Down Expand Up @@ -1779,7 +1759,7 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None):

metadata_file_object = \
self._get_metadata_file(metadata_role, remote_filename,
upperbound_filelength, version)
upperbound_filelength, version, delegating_rolename=delegating_rolename)

# The metadata has been verified. Move the metadata file into place.
# First, move the 'current' metadata file to the 'previous' directory
Expand Down Expand Up @@ -1828,7 +1808,7 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None):


def _update_metadata_if_changed(self, metadata_role,
referenced_metadata='snapshot'):
referenced_metadata='snapshot', delegating_rolename='root'):
"""
<Purpose>
Non-public method that updates the metadata for 'metadata_role' if it has
Expand Down Expand Up @@ -1944,7 +1924,7 @@ def _update_metadata_if_changed(self, metadata_role,

try:
self._update_metadata(metadata_role, upperbound_filelength,
expected_versioninfo['version'])
expected_versioninfo['version'], delegating_rolename=delegating_rolename)

except Exception:
# The current metadata we have is not current but we couldn't get new
Expand Down Expand Up @@ -2517,7 +2497,13 @@ def _refresh_targets_metadata(self, rolename='targets',
self._load_metadata_from_file('previous', rolename)
self._load_metadata_from_file('current', rolename)

self._update_metadata_if_changed(rolename)
roleinfo = tuf.roledb.get_roleinfo(rolename, self.repository_name)
try:
delegating_rolename = roleinfo['parent_role']
except KeyError:
delegating_rolename = 'root'
self._update_metadata_if_changed(rolename,
delegating_rolename=delegating_rolename)



Expand Down
Loading