Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Remove the need of manually marking roles as dirty #1038

Closed
wants to merge 10 commits into from
8 changes: 4 additions & 4 deletions tests/test_repository_tool.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -719,10 +719,10 @@ def test_add_verification_key(self):
'consistent_snapshot': False, 'expires': expiration,
'partial_loaded': False}

tuf.roledb.add_role('Root', roleinfo, 'test_repository')
tuf.roledb.add_role('Targets', roleinfo, 'test_repository')
tuf.roledb.add_role('Snapshot', roleinfo, 'test_repository')
tuf.roledb.add_role('Timestamp', roleinfo, 'test_repository')
tuf.roledb.add_role('Root', roleinfo, repository_name='test_repository')
tuf.roledb.add_role('Targets', roleinfo, repository_name='test_repository')
tuf.roledb.add_role('Snapshot', roleinfo, repository_name='test_repository')
tuf.roledb.add_role('Timestamp', roleinfo, repository_name='test_repository')

# Test for different top-level role names.
self.metadata._rolename = 'Targets'
Expand Down
94 changes: 60 additions & 34 deletions tests/test_roledb.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def test_clear_roledb(self):
repository_name = 'example_repository'
self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.clear_roledb, repository_name)
tuf.roledb.create_roledb(repository_name)
tuf.roledb.add_role(rolename, roleinfo, repository_name)
tuf.roledb.add_role(rolename, roleinfo, repository_name=repository_name)
self.assertEqual(roleinfo['keyids'], tuf.roledb.get_role_keyids(rolename, repository_name))
tuf.roledb.clear_roledb(repository_name)
self.assertFalse(tuf.roledb.role_exists(rolename, repository_name))
Expand Down Expand Up @@ -151,7 +151,7 @@ def test_add_role(self):
self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.clear_roledb,
repository_name)
tuf.roledb.create_roledb(repository_name)
tuf.roledb.add_role(rolename, roleinfo, repository_name)
tuf.roledb.add_role(rolename, roleinfo, repository_name=repository_name)
self.assertEqual(roleinfo['keyids'], tuf.roledb.get_role_keyids(rolename,
repository_name))

Expand All @@ -166,7 +166,8 @@ def test_add_role(self):
self.assertRaises(securesystemslib.exceptions.FormatError, tuf.roledb.add_role, rolename, None)
self.assertRaises(securesystemslib.exceptions.FormatError, tuf.roledb.add_role, rolename, 123)
self.assertRaises(securesystemslib.exceptions.FormatError, tuf.roledb.add_role, rolename, [''])
self.assertRaises(securesystemslib.exceptions.FormatError, tuf.roledb.add_role, rolename, roleinfo, 123)
self.assertRaises(securesystemslib.exceptions.FormatError, tuf.roledb.add_role, rolename, roleinfo, False, 123)
self.assertRaises(securesystemslib.exceptions.FormatError, tuf.roledb.add_role, rolename, roleinfo, [])


# Test condition where the rolename already exists in the role database.
Expand All @@ -175,7 +176,7 @@ def test_add_role(self):

# Test where the repository name does not exist in the role database.
self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.add_role,
'new_role', roleinfo, 'non-existent')
'new_role', roleinfo, repository_name='non-existent')

# Test conditions for invalid rolenames.
self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.add_role, ' badrole ',
Expand Down Expand Up @@ -206,7 +207,7 @@ def test_role_exists(self):

tuf.roledb.create_roledb(repository_name)
self.assertEqual(False, tuf.roledb.role_exists(rolename, repository_name))
tuf.roledb.add_role(rolename, roleinfo, repository_name)
tuf.roledb.add_role(rolename, roleinfo, repository_name=repository_name)
self.assertTrue(tuf.roledb.role_exists(rolename, repository_name))

# Reset the roledb so that subsequent tests have access to the original,
Expand Down Expand Up @@ -251,7 +252,7 @@ def test_remove_role(self):
self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.remove_role, rolename, repository_name)
tuf.roledb.create_roledb(repository_name)

tuf.roledb.add_role(rolename, roleinfo, repository_name)
tuf.roledb.add_role(rolename, roleinfo, repository_name=repository_name)
self.assertEqual(roleinfo['keyids'], tuf.roledb.get_role_keyids(rolename, repository_name))
self.assertEqual(None, tuf.roledb.remove_role(rolename, repository_name))

Expand Down Expand Up @@ -293,8 +294,8 @@ def test_get_rolenames(self):
repository_name = 'example_repository'
self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.get_rolenames, repository_name)
tuf.roledb.create_roledb(repository_name)
tuf.roledb.add_role(rolename, roleinfo, repository_name)
tuf.roledb.add_role(rolename2, roleinfo, repository_name)
tuf.roledb.add_role(rolename, roleinfo, repository_name=repository_name)
tuf.roledb.add_role(rolename2, roleinfo, repository_name=repository_name)

self.assertEqual(set(['targets', 'role1']),
set(tuf.roledb.get_rolenames()))
Expand Down Expand Up @@ -328,7 +329,7 @@ def test_get_role_info(self):
rolename, repository_name)

tuf.roledb.create_roledb(repository_name)
tuf.roledb.add_role(rolename, roleinfo, repository_name)
tuf.roledb.add_role(rolename, roleinfo, repository_name=repository_name)
self.assertEqual(roleinfo, tuf.roledb.get_roleinfo(rolename, repository_name))

# Verify that a roleinfo cannot be retrieved for a non-existent repository
Expand Down Expand Up @@ -368,7 +369,7 @@ def test_get_role_keyids(self):
self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.get_role_keyids,
rolename, repository_name)
tuf.roledb.create_roledb(repository_name)
tuf.roledb.add_role(rolename, roleinfo, repository_name)
tuf.roledb.add_role(rolename, roleinfo, repository_name=repository_name)
self.assertEqual(['123'], tuf.roledb.get_role_keyids(rolename, repository_name))

# Verify that rolekeyids cannot be retrieved from a non-existent repository
Expand Down Expand Up @@ -406,7 +407,7 @@ def test_get_role_threshold(self):
self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.get_role_threshold,
rolename, repository_name)
tuf.roledb.create_roledb(repository_name)
tuf.roledb.add_role(rolename, roleinfo, repository_name)
tuf.roledb.add_role(rolename, roleinfo, repository_name=repository_name)
self.assertEqual(roleinfo['threshold'], tuf.roledb.get_role_threshold(rolename, repository_name))

# Verify that a role's threshold cannot be retrieved from a non-existent
Expand Down Expand Up @@ -445,7 +446,7 @@ def test_get_role_paths(self):
rolename, repository_name)

tuf.roledb.create_roledb(repository_name)
tuf.roledb.add_role(rolename2, roleinfo2, repository_name)
tuf.roledb.add_role(rolename2, roleinfo2, repository_name=repository_name)
self.assertEqual(roleinfo2['paths'], tuf.roledb.get_role_paths(rolename2,
repository_name))

Expand Down Expand Up @@ -516,7 +517,7 @@ def test_get_delegated_rolenames(self):
self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.get_delegated_rolenames,
rolename, repository_name)
tuf.roledb.create_roledb(repository_name)
tuf.roledb.add_role(rolename, roleinfo, repository_name)
tuf.roledb.add_role(rolename, roleinfo, repository_name=repository_name)
self.assertEqual(set(['django', 'tuf']),
set(tuf.roledb.get_delegated_rolenames(rolename, repository_name)))

Expand Down Expand Up @@ -635,7 +636,7 @@ def test_update_roleinfo(self):
mark_role_as_dirty = True
self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.clear_roledb, repository_name)
tuf.roledb.create_roledb(repository_name)
tuf.roledb.add_role(rolename, roleinfo, repository_name)
tuf.roledb.add_role(rolename, roleinfo, repository_name=repository_name)
tuf.roledb.update_roleinfo(rolename, roleinfo, mark_role_as_dirty, repository_name)
self.assertEqual(roleinfo['keyids'], tuf.roledb.get_role_keyids(rolename, repository_name))

Expand Down Expand Up @@ -667,32 +668,42 @@ def test_update_roleinfo(self):
def test_get_dirty_roles(self):
# Verify that the dirty roles of a role are returned.
rolename = 'targets'
mark_role_as_dirty = True

roleinfo1 = {'keyids': ['123'], 'threshold': 1}
tuf.roledb.add_role(rolename, roleinfo1)
tuf.roledb.add_role(rolename, roleinfo1, mark_role_as_dirty)
# new role added: targets -> snapshot -> timestamp
self.assertEqual(['snapshot', 'targets', 'timestamp'],
tuf.roledb.get_dirty_roles())

roleinfo2 = {'keyids': ['123'], 'threshold': 2}
mark_role_as_dirty = True
tuf.roledb.update_roleinfo(rolename, roleinfo2, mark_role_as_dirty)
# Note: The 'default' repository is searched if the repository name is
# not given to get_dirty_roles().
self.assertEqual([rolename], tuf.roledb.get_dirty_roles())
# update targets key: targets -> snapshot -> timestamp AND targets -> root
self.assertEqual(['root', 'snapshot', 'targets', 'timestamp'],
tuf.roledb.get_dirty_roles())

# Verify that a list of dirty roles is returned for a non-default
# repository.
repository_name = 'example_repository'
tuf.roledb.create_roledb(repository_name)
tuf.roledb.add_role(rolename, roleinfo1, repository_name)
tuf.roledb.add_role(rolename, roleinfo1, repository_name=repository_name)
tuf.roledb.update_roleinfo(rolename, roleinfo2, mark_role_as_dirty, repository_name)
self.assertEqual([rolename], tuf.roledb.get_dirty_roles(repository_name))
self.assertEqual(['root', 'snapshot', 'targets', 'timestamp'],
tuf.roledb.get_dirty_roles(repository_name))

# Verify that dirty roles are not returned for a non-existent repository.
self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.get_dirty_roles, 'non-existent')
self.assertRaises(securesystemslib.exceptions.InvalidNameError,
tuf.roledb.get_dirty_roles, 'non-existent')

# Reset the roledb so that subsequent tests have access to a default
# roledb.
tuf.roledb.remove_roledb(repository_name)

# Test for improperly formatted argument.
self.assertRaises(securesystemslib.exceptions.FormatError, tuf.roledb.get_dirty_roles, 123)
self.assertRaises(securesystemslib.exceptions.FormatError,
tuf.roledb.get_dirty_roles, 123)



Expand All @@ -703,19 +714,31 @@ def test_mark_dirty(self):
tuf.roledb.add_role(rolename, roleinfo1)
rolename2 = 'dirty_role'
roleinfo2 = {'keyids': ['123'], 'threshold': 2}

mark_role_as_dirty = True
tuf.roledb.update_roleinfo(rolename, roleinfo1, mark_role_as_dirty)
# Note: The 'default' repository is searched if the repository name is
# not given to get_dirty_roles().
self.assertEqual([rolename], tuf.roledb.get_dirty_roles())

tuf.roledb.mark_dirty(['dirty_role'])
self.assertEqual([rolename2, rolename], tuf.roledb.get_dirty_roles())
# targets -> snapshot -> timestamp
self.assertEqual(['snapshot', 'targets', 'timestamp'],
tuf.roledb.get_dirty_roles())

# manually mark dirty: new role is added to the dirty list
tuf.roledb.mark_dirty([rolename2])
self.assertEqual([rolename2, 'snapshot', 'targets', 'timestamp'],
tuf.roledb.get_dirty_roles())

# targets' key update:
# targets -> snapshot -> timestamp AND targets -> root
tuf.roledb.unmark_dirty(tuf.roledb.get_dirty_roles())
tuf.roledb.update_roleinfo(rolename, roleinfo2, mark_role_as_dirty)
self.assertEqual(['root', 'snapshot', 'targets', 'timestamp'],
tuf.roledb.get_dirty_roles())

# Verify that a role cannot be marked as dirty for a non-existent
# repository.
self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.mark_dirty,
['dirty_role'], 'non-existent')
self.assertRaises(securesystemslib.exceptions.InvalidNameError,
tuf.roledb.mark_dirty, ['dirty_role'], 'non-existent')



Expand All @@ -727,16 +750,19 @@ def test_unmark_dirty(self):
rolename2 = 'dirty_role'
roleinfo2 = {'keyids': ['123'], 'threshold': 2}
tuf.roledb.add_role(rolename2, roleinfo2)

mark_role_as_dirty = True
tuf.roledb.update_roleinfo(rolename, roleinfo1, mark_role_as_dirty)
# Note: The 'default' repository is searched if the repository name is
# not given to get_dirty_roles().
self.assertEqual([rolename], tuf.roledb.get_dirty_roles())
tuf.roledb.update_roleinfo(rolename2, roleinfo2, mark_role_as_dirty)
# update targets roleinfo marks as dirty also snapshot and timestamp
role_chain_list = ['snapshot', rolename, 'timestamp']
self.assertEqual(role_chain_list, tuf.roledb.get_dirty_roles())

tuf.roledb.unmark_dirty(['dirty_role'])
self.assertEqual([rolename], tuf.roledb.get_dirty_roles())
tuf.roledb.unmark_dirty(['targets'])
tuf.roledb.update_roleinfo(rolename2, roleinfo2, mark_role_as_dirty)
tuf.roledb.unmark_dirty([rolename2])
self.assertEqual(role_chain_list, tuf.roledb.get_dirty_roles())
tuf.roledb.unmark_dirty(role_chain_list)
self.assertEqual([], tuf.roledb.get_dirty_roles())

# What happens for a role that isn't dirty? unmark_dirty() should just
Expand All @@ -745,8 +771,8 @@ def test_unmark_dirty(self):

# Verify that a role cannot be unmarked as dirty for a non-existent
# repository.
self.assertRaises(securesystemslib.exceptions.InvalidNameError, tuf.roledb.unmark_dirty,
['dirty_role'], 'non-existent')
self.assertRaises(securesystemslib.exceptions.InvalidNameError,
tuf.roledb.unmark_dirty, [rolename2], 'non-existent')


def _test_rolename(self, test_function):
Expand Down
9 changes: 5 additions & 4 deletions tests/test_slow_retrieval_attack.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,11 @@ def setUp(self):
targets_private = repo_tool.import_ed25519_privatekey_from_file(key_file,
'password')

repository.targets.load_signing_key(targets_private)
repository.snapshot.load_signing_key(snapshot_private)
repository.timestamp.load_signing_key(timestamp_private)

# Mark roles as dirty to force metadata rewrite on writeall()
mark_dirty = True
repository.targets.load_signing_key(targets_private, mark_dirty)
repository.snapshot.load_signing_key(snapshot_private, mark_dirty)
repository.timestamp.load_signing_key(timestamp_private, mark_dirty)
repository.writeall()

# Move the staged metadata to the "live" metadata.
Expand Down
9 changes: 3 additions & 6 deletions tests/test_tutorial.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,8 @@ def test_tutorial(self):
repository.dirty_roles()
# Concat strings to avoid Python2/3 unicode prefix problems ('' vs. u'')
mock_logger.info.assert_called_with(
"Dirty roles: " + str(['targets']))
"Dirty roles: " + str(['snapshot', 'targets', 'timestamp']))

repository.mark_dirty(['snapshot', 'timestamp'])
repository.writeall()


Expand Down Expand Up @@ -326,9 +325,8 @@ def test_tutorial(self):
repository.dirty_roles()
# Concat strings to avoid Python2/3 unicode prefix problems ('' vs. u'')
mock_logger.info.assert_called_with(
"Dirty roles: " + str(['targets', 'unclaimed']))
"Dirty roles: " + str(['snapshot', 'targets', 'timestamp', 'unclaimed']))

repository.mark_dirty(["snapshot", "timestamp"])
repository.writeall()


Expand Down Expand Up @@ -378,9 +376,8 @@ def test_tutorial(self):
'28-2f', '30-37', '38-3f', '40-47', '48-4f', '50-57', '58-5f',
'60-67', '68-6f', '70-77', '78-7f', '80-87', '88-8f', '90-97',
'98-9f', 'a0-a7', 'a8-af', 'b0-b7', 'b8-bf', 'c0-c7', 'c8-cf',
'd0-d7', 'd8-df', 'e0-e7', 'e8-ef', 'f0-f7', 'f8-ff', 'unclaimed']))
'd0-d7', 'd8-df', 'e0-e7', 'e8-ef', 'f0-f7', 'f8-ff', 'snapshot', 'timestamp', 'unclaimed']))

repository.mark_dirty(["snapshot", "timestamp"])
repository.writeall()

# ----- Tutorial Section: How to Perform an Update
Expand Down
17 changes: 12 additions & 5 deletions tests/test_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,8 @@ def test_3__get_metadata_file(self):
tuf.SPECIFICATION_VERSION = '0.9.0'

repository = repo_tool.load_repository(self.repository_directory)
repository.timestamp.load_signing_key(self.role_keys['timestamp']['private'])
repository.timestamp.load_signing_key(
self.role_keys['timestamp']['private'], True)
repository.writeall()

# Move the staged metadata to the "live" metadata.
Expand Down Expand Up @@ -1283,10 +1284,16 @@ def test_6_download_target(self):
# is now being set to true (i.e., the pre-generated repository isn't set
# to support consistent snapshots. A new version of targets.json is needed
# to ensure <digest>.filename target files are written to disk.
repository.targets.load_signing_key(self.role_keys['targets']['private'])
repository.root.load_signing_key(self.role_keys['root']['private'])
repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'])
repository.timestamp.load_signing_key(self.role_keys['timestamp']['private'])
# Mark roles as dirty to force metadata rewrite on writeall()
mark_role_as_dirty = True
repository.targets.load_signing_key(self.role_keys['targets']['private'],
mark_role_as_dirty)
repository.root.load_signing_key(self.role_keys['root']['private'],
mark_role_as_dirty)
repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'],
mark_role_as_dirty)
repository.timestamp.load_signing_key(self.role_keys['timestamp']['private'],
mark_role_as_dirty)

repository.writeall(consistent_snapshot=True)

Expand Down
6 changes: 3 additions & 3 deletions tuf/client/updater.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ def __init__(self, repository_name, repository_mirrors):

# Load current and previous metadata.
for metadata_set in ['current', 'previous']:
for metadata_role in ['root', 'targets', 'snapshot', 'timestamp']:
for metadata_role in tuf.roledb.TOP_LEVEL_ROLES:
self._load_metadata_from_file(metadata_set, metadata_role)

# Raise an exception if the repository is missing the required 'root'
Expand Down Expand Up @@ -984,7 +984,7 @@ def _import_delegations(self, parent_role):
# is None.
rolename = roleinfo.get('name')
logger.debug('Adding delegated role: ' + str(rolename) + '.')
tuf.roledb.add_role(rolename, roleinfo, self.repository_name)
tuf.roledb.add_role(rolename, roleinfo, repository_name=self.repository_name)

except tuf.exceptions.RoleAlreadyExistsError:
logger.warning('Role already exists: ' + rolename)
Expand Down Expand Up @@ -2435,7 +2435,7 @@ def all_targets(self):
# all roles available on the repository.
delegated_targets = []
for role in tuf.roledb.get_rolenames(self.repository_name):
if role in ['root', 'snapshot', 'targets', 'timestamp']:
if role in tuf.roledb.TOP_LEVEL_ROLES:
continue

else:
Expand Down
Loading