From 428d8e7a306e232389be4efd59525608f73bf756 Mon Sep 17 00:00:00 2001 From: Kairo de Araujo Date: Tue, 24 May 2022 19:13:17 +0200 Subject: [PATCH] Removed RolesPayload data structure The RolesPayload was used by the TUF initialization (for development purposes) and during the Roles Delegations. The RolesPayload is no longer necessary during the TUF development initialization once all the configuration is available on request settings. During the Roles Delegations, it was replaced by the python-TUF data structure DelegatedRole, reusing it from ``tuf.repository``. Signed-off-by: Kairo de Araujo --- tests/conftest.py | 22 ++- tests/unit/cli/test_tuf.py | 10 +- tests/unit/tuf/test_repository.py | 314 +++++++++--------------------- tests/unit/tuf/test_services.py | 115 +++-------- tests/unit/tuf/test_tasks.py | 6 +- warehouse/cli/tuf.py | 6 +- warehouse/tuf/interfaces.py | 2 +- warehouse/tuf/repository.py | 139 +++++-------- warehouse/tuf/services.py | 108 +++++----- warehouse/tuf/tasks.py | 4 +- 10 files changed, 257 insertions(+), 469 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 34a4b70f6c41..3b9243f053cc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -356,14 +356,32 @@ def xmlrpc(self, path, method, *args): @pytest.fixture -def tuf_repository(): +def tuf_repository(db_request): class FakeStorageBackend(StorageBackendInterface): pass class FakeKeyBackend(IKeyService): pass - tuf_repo = MetadataRepository(FakeStorageBackend, FakeKeyBackend) + db_request.registry.settings = { + "tuf.keytype": "ed25519", + "tuf.root.threshold": 1, + "tuf.root.expiry": 31536000, + "tuf.snapshot.threshold": 1, + "tuf.snapshot.expiry": 86400, + "tuf.targets.threshold": 2, + "tuf.targets.expiry": 31536000, + "tuf.timestamp.threshold": 1, + "tuf.timestamp.expiry": 86400, + "tuf.bins.threshold": 1, + "tuf.bins.expiry": 31536000, + "tuf.bin-n.threshold": 1, + "tuf.bin-n.expiry": 604800, + } + + tuf_repo = MetadataRepository( + FakeStorageBackend, FakeKeyBackend, db_request.registry.settings + ) return tuf_repo diff --git a/tests/unit/cli/test_tuf.py b/tests/unit/cli/test_tuf.py index 382647aee461..d70f767f7940 100644 --- a/tests/unit/cli/test_tuf.py +++ b/tests/unit/cli/test_tuf.py @@ -27,7 +27,7 @@ add_hashed_targets as _add_hashed_targets, bump_bin_n_roles as _bump_bin_n_roles, bump_snapshot as _bump_snapshot, - init_repository as _init_repository, + init_dev_repository as _init_dev_repository, init_targets_delegation as _init_targets_delegation, ) @@ -76,8 +76,8 @@ def test_init_repo(self, cli): assert result.exit_code == 0 assert "Repository Initialization finished." in result.output assert config.task.calls == [ - pretend.call(_init_repository), - pretend.call(_init_repository), + pretend.call(_init_dev_repository), + pretend.call(_init_dev_repository), ] assert task.get_request.calls == [pretend.call()] assert task.run.calls == [pretend.call(request)] @@ -96,8 +96,8 @@ def test_init_repo_raise_fileexistserror(self, cli): assert result.exit_code == 1 assert "Error: TUF Error detail\n" == result.output assert config.task.calls == [ - pretend.call(_init_repository), - pretend.call(_init_repository), + pretend.call(_init_dev_repository), + pretend.call(_init_dev_repository), ] assert task.get_request.calls == [pretend.call()] diff --git a/tests/unit/tuf/test_repository.py b/tests/unit/tuf/test_repository.py index 0d585e417a31..d080c168bb82 100644 --- a/tests/unit/tuf/test_repository.py +++ b/tests/unit/tuf/test_repository.py @@ -17,8 +17,6 @@ from securesystemslib.exceptions import StorageError from tuf.api.metadata import ( TOP_LEVEL_ROLE_NAMES, - DelegatedRole, - Delegations, MetaFile, Snapshot, StorageBackendInterface, @@ -29,7 +27,7 @@ class TestMetadataRepository: - def test_basic_init(self, tuf_repository): + def test_basic_init(self, db_request): class FakeStorageBackend(StorageBackendInterface): pass @@ -37,7 +35,7 @@ class FakeKeyBackend(IKeyService): pass tuf_repository = repository.MetadataRepository( - FakeStorageBackend, FakeKeyBackend + FakeStorageBackend, FakeKeyBackend, db_request.registry.settings ) assert tuf_repository.storage_backend == FakeStorageBackend assert tuf_repository.key_backend == FakeKeyBackend @@ -81,6 +79,15 @@ def test_is_initialized_raise_storageerror(self, tuf_repository): pretend.call("timestamp"), ] + def test__set_expiration_for_role(self, tuf_repository, monkeypatch): + fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1) + fake_datetime = pretend.stub(now=pretend.call_recorder(lambda: fake_time)) + monkeypatch.setattr("warehouse.tuf.repository.datetime", fake_datetime) + + result = tuf_repository._set_expiration_for_role(Snapshot.type) + assert str(result) == "2019-06-17 09:05:01" + assert fake_datetime.now.calls == [pretend.call()] + def test__create_delegated_targets_roles(self, tuf_repository, monkeypatch): fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1) fake_targets_md = pretend.stub( @@ -93,18 +100,20 @@ def test__create_delegated_targets_roles(self, tuf_repository, monkeypatch): tuf_repository.load_role = pretend.call_recorder( lambda role: fake_snapshot_md if role == Snapshot.type else None ) - tuf_repository._build_delegations = pretend.call_recorder( - lambda *a, **kw: "delegation" - ) + tuf_repository._store = pretend.call_recorder(lambda *a, **kw: None) test_delegate_roles_parameters = [ - repository.RolesPayload( - fake_time, - 1, + ( + repository.DelegatedRole( + "test_bin", + ["key1", "key2"], + 1, + False, + paths=["*/*"], + ), [{"keyid": "key1"}, {"keyid": "key2"}], - "test_bin", - paths=["*/*"], + fake_time, ) ] @@ -117,23 +126,16 @@ def test__create_delegated_targets_roles(self, tuf_repository, monkeypatch): ) monkeypatch.setattr( "warehouse.tuf.repository.Key.from_securesystemslib_key", - lambda *a, **kw: None, + lambda *a, **kw: "fake_Key", ) result = tuf_repository._create_delegated_targets_roles( delegator_metadata=fake_targets_md, - delegate_role_parameters=test_delegate_roles_parameters, + delegatees=test_delegate_roles_parameters, ) assert "test_bin.json" in result.signed.meta assert tuf_repository.load_role.calls == [ pretend.call(Snapshot.type), - pretend.call("test_bin"), - ] - assert tuf_repository._build_delegations.calls[0].args[0] == "test_bin" - assert type(tuf_repository._build_delegations.calls[0].args[1]) == DelegatedRole - assert tuf_repository._build_delegations.calls[0].args[2] == [ - {"keyid": "key1"}, - {"keyid": "key2"}, ] assert tuf_repository._store.calls[0].args[0] == "test_bin" @@ -151,74 +153,19 @@ def test__create_delegated_targets_roles_with_snapshot_md( tuf_repository.load_role = pretend.call_recorder( lambda role: fake_snapshot_md if role == Snapshot.type else None ) - tuf_repository._build_delegations = pretend.call_recorder( - lambda *a, **kw: "delegation" - ) tuf_repository._store = pretend.call_recorder(lambda *a, **kw: None) test_delegate_roles_parameters = [ - repository.RolesPayload( - fake_time, - 1, + ( + repository.DelegatedRole( + "test_bin", + ["key1", "key2"], + 1, + False, + paths=["*/*"], + ), [{"keyid": "key1"}, {"keyid": "key2"}], - "test_bin", - paths=["*/*"], - ) - ] - - fake_metadata = pretend.stub( - sign=pretend.call_recorder(lambda *a, **kw: None), - signed=pretend.stub(version=3), - ) - monkeypatch.setattr( - "warehouse.tuf.repository.Metadata", lambda *a, **kw: fake_metadata - ) - monkeypatch.setattr( - "warehouse.tuf.repository.Key.from_securesystemslib_key", - lambda *a, **kw: None, - ) - result = tuf_repository._create_delegated_targets_roles( - delegator_metadata=fake_targets_md, - delegate_role_parameters=test_delegate_roles_parameters, - snapshot_metadata=fake_snapshot_md, - ) - - assert "test_bin.json" in result.signed.meta - assert tuf_repository.load_role.calls == [ - pretend.call("test_bin"), - ] - assert tuf_repository._build_delegations.calls[0].args[0] == "test_bin" - assert type(tuf_repository._build_delegations.calls[0].args[1]) == DelegatedRole - assert tuf_repository._build_delegations.calls[0].args[2] == [ - {"keyid": "key1"}, - {"keyid": "key2"}, - ] - assert tuf_repository._store.calls[0].args[0] == "test_bin" - - def test__create_delegated_targets_roles_raises_storageerror( - self, tuf_repository, monkeypatch - ): - fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1) - fake_targets_md = pretend.stub( - signed=pretend.stub( - delegations=None, add_key=pretend.call_recorder(lambda *a, **kw: None) - ) - ) - fake_snapshot_md = pretend.stub(signed=pretend.stub(meta={})) - - tuf_repository.load_role = pretend.raiser(StorageError()) - tuf_repository._build_delegations = pretend.call_recorder( - lambda *a, **kw: "delegation" - ) - tuf_repository._store = pretend.call_recorder(lambda *a, **kw: None) - - test_delegate_roles_parameters = [ - repository.RolesPayload( fake_time, - 1, - [{"keyid": "key1"}, {"keyid": "key2"}], - "test_bin", - paths=["*/*"], ) ] @@ -235,80 +182,14 @@ def test__create_delegated_targets_roles_raises_storageerror( ) result = tuf_repository._create_delegated_targets_roles( delegator_metadata=fake_targets_md, - delegate_role_parameters=test_delegate_roles_parameters, + delegatees=test_delegate_roles_parameters, snapshot_metadata=fake_snapshot_md, ) assert "test_bin.json" in result.signed.meta - assert tuf_repository._build_delegations.calls[0].args[0] == "test_bin" - assert type(tuf_repository._build_delegations.calls[0].args[1]) == DelegatedRole - assert tuf_repository._build_delegations.calls[0].args[2] == [ - {"keyid": "key1"}, - {"keyid": "key2"}, - ] + assert tuf_repository.load_role.calls == [] assert tuf_repository._store.calls[0].args[0] == "test_bin" - def test__create_delegated_targets_roles_missing_delegated_role( - self, tuf_repository - ): - fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1) - fake_targets_md = pretend.stub( - signed=pretend.stub( - delegations=None, add_key=pretend.call_recorder(lambda *a, **kw: None) - ) - ) - fake_snapshot_md = pretend.stub(signed=pretend.stub(meta={})) - - test_delegate_roles_parameters = [ - repository.RolesPayload( - expiration=fake_time, - threshold=1, - keys=[{"keyid": "key1"}, {"keyid": "key2"}], - paths=["*/*"], - ) - ] - - with pytest.raises(ValueError) as err: - tuf_repository._create_delegated_targets_roles( - delegator_metadata=fake_targets_md, - delegate_role_parameters=test_delegate_roles_parameters, - snapshot_metadata=fake_snapshot_md, - ) - - assert "A delegation role name is required." in str(err.value) - - def test__create_delegated_targets_roles_raise_fileexists(self, tuf_repository): - fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1) - fake_targets_md = pretend.stub( - signed=pretend.stub( - delegations=None, add_key=pretend.call_recorder(lambda *a, **kw: None) - ) - ) - fake_snapshot_md = pretend.stub(signed=pretend.stub(meta={})) - - tuf_repository.load_role = pretend.call_recorder( - lambda role: fake_snapshot_md if role == Snapshot.type else True - ) - test_delegate_roles_parameters = [ - repository.RolesPayload( - fake_time, - 1, - [{"keyid": "key1"}, {"keyid": "key2"}], - "test_bin", - paths=["*/*"], - ) - ] - - with pytest.raises(FileExistsError) as err: - tuf_repository._create_delegated_targets_roles( - delegator_metadata=fake_targets_md, - delegate_role_parameters=test_delegate_roles_parameters, - snapshot_metadata=fake_snapshot_md, - ) - - assert "Role test_bin already exists." in str(err.value) - assert tuf_repository.load_role.calls == [pretend.call("test_bin")] - def test__create_delegated_targets_roles_has_delegations( self, tuf_repository, monkeypatch ): @@ -327,12 +208,16 @@ def test__create_delegated_targets_roles_has_delegations( tuf_repository._store = pretend.call_recorder(lambda *a, **kw: None) test_delegate_roles_parameters = [ - repository.RolesPayload( - fake_time, - 1, + ( + repository.DelegatedRole( + "test_bin", + ["key1", "key2"], + 1, + False, + paths=["*/*"], + ), [{"keyid": "key1"}, {"keyid": "key2"}], - "test_bin", - paths=["*/*"], + fake_time, ) ] @@ -349,13 +234,12 @@ def test__create_delegated_targets_roles_has_delegations( ) result = tuf_repository._create_delegated_targets_roles( delegator_metadata=fake_targets_md, - delegate_role_parameters=test_delegate_roles_parameters, + delegatees=test_delegate_roles_parameters, ) assert "test_bin.json" in result.signed.meta assert tuf_repository.load_role.calls == [ pretend.call(Snapshot.type), - pretend.call("test_bin"), ] assert "role1" in fake_targets_md.signed.delegations.roles.keys() assert "test_bin" in fake_targets_md.signed.delegations.roles.keys() @@ -380,29 +264,6 @@ def test__store(self, tuf_repository): assert result is None assert fake_metadata.to_file.calls[0].args[0] == "1.root.json" - def test__build_delegations(self, tuf_repository, monkeypatch): - - fake_delegated_role = DelegatedRole( - name="xxxx-yyyy", - keyids=["fake_key1", "fake_key2"], - threshold=1, - terminating=None, - paths=None, - path_hash_prefixes=["00-07"], - ) - - monkeypatch.setattr( - "warehouse.tuf.repository.Key.from_securesystemslib_key", - lambda *a, **kw: {"key": "data"}, - ) - result = tuf_repository._build_delegations( - "00-07", fake_delegated_role, [{"keyid": "key1"}, {"keyid": "key2"}] - ) - - assert type(result) == Delegations - assert "00-07" in result.roles.keys() - assert result.keys == {"key1": {"key": "data"}, "key2": {"key": "data"}} - def test_initialization(self, tuf_repository): fake_key = { "keytype": "ed25519", @@ -420,13 +281,10 @@ def test_initialization(self, tuf_repository): ), }, } + top_roles_payload = dict() for role in TOP_LEVEL_ROLE_NAMES: - top_roles_payload[role] = repository.RolesPayload( - expiration=datetime.datetime(2019, 6, 16, 9, 5, 1), - threshold=1, - keys=[fake_key], - ) + top_roles_payload[role] = [fake_key, fake_key] tuf_repository.load_role = pretend.call_recorder(lambda *a, **kw: None) tuf_repository._store = pretend.call_recorder(lambda *a, **kw: None) @@ -459,11 +317,7 @@ def test_initialization_store_false(self, tuf_repository): } top_roles_payload = dict() for role in TOP_LEVEL_ROLE_NAMES: - top_roles_payload[role] = repository.RolesPayload( - expiration=datetime.datetime(2019, 6, 16, 9, 5, 1), - threshold=1, - keys=[fake_key], - ) + top_roles_payload[role] = [fake_key, fake_key] tuf_repository.load_role = pretend.call_recorder(lambda *a, **kw: None) tuf_repository._store = pretend.call_recorder(lambda *a, **kw: None) @@ -480,11 +334,8 @@ def test_initialization_store_false(self, tuf_repository): def test_initialization_already_initialized(self, tuf_repository): top_roles_payload = dict() for role in TOP_LEVEL_ROLE_NAMES: - top_roles_payload[role] = repository.RolesPayload( - expiration=datetime.datetime(2019, 6, 16, 9, 5, 1), - threshold=1, - keys=[{"key1": "key1_data"}], - ) + top_roles_payload[role] = [{"key1": "key1_data"}] + tuf_repository.load_role = pretend.call_recorder(lambda *a, **kw: True) with pytest.raises(FileExistsError) as err: tuf_repository.initialize(top_roles_payload, store=False) @@ -516,11 +367,7 @@ def test_initialization_threshold_more_than_keys(self, tuf_repository): } top_roles_payload = dict() for role in TOP_LEVEL_ROLE_NAMES: - top_roles_payload[role] = repository.RolesPayload( - expiration=datetime.datetime(2019, 6, 16, 9, 5, 1), - threshold=2, - keys=[fake_key], - ) + top_roles_payload[role] = [fake_key] tuf_repository.load_role = pretend.call_recorder(lambda *a, **kw: None) tuf_repository._store = pretend.call_recorder(lambda *a, **kw: None) @@ -562,17 +409,7 @@ def test_delegate_targets_roles(self, tuf_repository): ), }, } - payload = { - "xxxx-yyyy": [ - repository.RolesPayload( - expiration=fake_time, - threshold=1, - keys=[fake_key], - delegation_role="targets", - paths=["*/*"], - ) - ] - } + payload = {"xxxx-yyyy": [fake_key]} fake_targets_md = pretend.stub( signed=pretend.stub( delegations=None, @@ -595,6 +432,9 @@ def test_delegate_targets_roles(self, tuf_repository): tuf_repository.snapshot_update_meta = pretend.call_recorder( lambda *a, **kw: fake_snapshot_md ) + tuf_repository._set_expiration_for_role = pretend.call_recorder( + lambda *a: fake_time + ) result = tuf_repository.delegate_targets_roles(payload) assert result == fake_snapshot_md assert tuf_repository.load_role.calls == [ @@ -616,6 +456,9 @@ def test_delegate_targets_roles(self, tuf_repository): assert tuf_repository.snapshot_update_meta.calls == [ pretend.call("xxxx-yyyy", 2, fake_snapshot_md) ] + assert tuf_repository._set_expiration_for_role.calls == [ + pretend.call("xxxx-yyyy") + ] def test_bump_role_version(self, tuf_repository): fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1) @@ -690,19 +533,22 @@ def test_bump_timestamp_version(self, tuf_repository): ), sign=lambda *a, **kw: None, ) - + tuf_repository._set_expiration_for_role = pretend.call_recorder( + lambda role: fake_new_time + ) tuf_repository.load_role = pretend.call_recorder(lambda role: fake_timestamp_md) tuf_repository.key_backend = pretend.stub( get=pretend.call_recorder(lambda role: [{"key": "key_data"}]) ) - result = tuf_repository.timestamp_bump_version( - snapshot_version=20, timestamp_expires=fake_new_time - ) + result = tuf_repository.timestamp_bump_version(snapshot_version=20) assert result.signed.version == initial_version + 1 assert result.signed.expires == fake_new_time assert tuf_repository.load_role.calls == [pretend.call("timestamp")] assert tuf_repository.key_backend.get.calls == [pretend.call("timestamp")] + assert tuf_repository._set_expiration_for_role.calls == [ + pretend.call("timestamp") + ] def test_bump_timestamp_version_store_true(self, tuf_repository): fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1) @@ -719,16 +565,20 @@ def test_bump_timestamp_version_store_true(self, tuf_repository): tuf_repository.key_backend = pretend.stub( get=pretend.call_recorder(lambda role: [{"key": "key_data"}]) ) + tuf_repository._set_expiration_for_role = pretend.call_recorder( + lambda role: fake_new_time + ) tuf_repository._store = pretend.call_recorder(lambda role, role_md: None) - result = tuf_repository.timestamp_bump_version( - snapshot_version=20, timestamp_expires=fake_new_time, store=True - ) + result = tuf_repository.timestamp_bump_version(snapshot_version=20, store=True) assert result.signed.version == initial_version + 1 assert result.signed.expires == fake_new_time assert result.signed.snapshot_meta.version == 20 assert tuf_repository.load_role.calls == [pretend.call("timestamp")] assert tuf_repository.key_backend.get.calls == [pretend.call("timestamp")] + assert tuf_repository._set_expiration_for_role.calls == [ + pretend.call("timestamp") + ] assert tuf_repository._store.calls == [ pretend.call("timestamp", fake_timestamp_md) ] @@ -745,15 +595,21 @@ def test_bump_snapshot_version(self, tuf_repository): ) tuf_repository.load_role = pretend.call_recorder(lambda role: fake_snapshot_md) + tuf_repository._set_expiration_for_role = pretend.call_recorder( + lambda role: fake_new_time + ) tuf_repository.key_backend = pretend.stub( get=pretend.call_recorder(lambda role: [{"key": "key_data"}]) ) - result = tuf_repository.snapshot_bump_version(fake_new_time) + result = tuf_repository.snapshot_bump_version() assert result.signed.version == initial_version + 1 assert result.signed.expires == fake_new_time assert tuf_repository.load_role.calls == [pretend.call("snapshot")] assert tuf_repository.key_backend.get.calls == [pretend.call("snapshot")] + assert tuf_repository._set_expiration_for_role.calls == [ + pretend.call("snapshot") + ] def test_bump_snapshot_version_store_true(self, tuf_repository): fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1) @@ -770,13 +626,19 @@ def test_bump_snapshot_version_store_true(self, tuf_repository): tuf_repository.key_backend = pretend.stub( get=pretend.call_recorder(lambda role: [{"key": "key_data"}]) ) + tuf_repository._set_expiration_for_role = pretend.call_recorder( + lambda role: fake_new_time + ) tuf_repository._store = pretend.call_recorder(lambda role, role_md: None) - result = tuf_repository.snapshot_bump_version(fake_new_time, store=True) + result = tuf_repository.snapshot_bump_version(store=True) assert result.signed.version == initial_version + 1 assert result.signed.expires == fake_new_time assert tuf_repository.load_role.calls == [pretend.call("snapshot")] assert tuf_repository.key_backend.get.calls == [pretend.call("snapshot")] + assert tuf_repository._set_expiration_for_role.calls == [ + pretend.call("snapshot") + ] assert tuf_repository._store.calls == [ pretend.call("snapshot", fake_snapshot_md) ] @@ -795,11 +657,17 @@ def test_bump_snapshot_version_with_snapshot_metadata(self, tuf_repository): tuf_repository.key_backend = pretend.stub( get=pretend.call_recorder(lambda role: [{"key": "key_data"}]) ) + tuf_repository._set_expiration_for_role = pretend.call_recorder( + lambda role: fake_new_time + ) - result = tuf_repository.snapshot_bump_version(fake_new_time, fake_snapshot_md) + result = tuf_repository.snapshot_bump_version(fake_snapshot_md) assert result.signed.version == initial_version + 1 assert result.signed.expires == fake_new_time assert tuf_repository.key_backend.get.calls == [pretend.call("snapshot")] + assert tuf_repository._set_expiration_for_role.calls == [ + pretend.call("snapshot") + ] def test_snapshot_update_meta(self, tuf_repository): diff --git a/tests/unit/tuf/test_services.py b/tests/unit/tuf/test_services.py index 8b93d0b45134..c110b46b411b 100644 --- a/tests/unit/tuf/test_services.py +++ b/tests/unit/tuf/test_services.py @@ -25,7 +25,6 @@ from warehouse.tuf.constants import BIN_N_COUNT, Role from warehouse.tuf.hash_bins import HashBins from warehouse.tuf.interfaces import IKeyService, IRepositoryService, IStorageService -from warehouse.tuf.repository import TargetFile class TestLocalKeyService: @@ -296,7 +295,7 @@ class TestRepositoryService: def test_verify_service(self): assert verifyClass(IRepositoryService, services.RepositoryService) - def test_create_service(self): + def test_create_service(self, db_request): fake_service = "Fake Service" request = pretend.stub( find_service=pretend.call_recorder(lambda interface: fake_service) @@ -342,38 +341,6 @@ def test__get_hash_bins_production(self, db_request): assert result.number_of_prefixes == 65536 assert result.bin_size == 4 - def test__set_expiration_for_role_development(self, db_request, monkeypatch): - fake_storage = pretend.stub() - fake_key_storage = pretend.stub() - db_request.registry.settings["warehouse.env"] = Environment.development - db_request.registry.settings["tuf.development_metadata_expiry"] = 31536000 - - fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1) - fake_datetime = pretend.stub(now=pretend.call_recorder(lambda: fake_time)) - monkeypatch.setattr(datetime, "datetime", fake_datetime) - - service = services.RepositoryService(fake_storage, fake_key_storage, db_request) - - result = service._set_expiration_for_role(Role.ROOT.value) - - assert str(result) == "2020-06-15 09:05:01" - assert fake_datetime.now.calls == [pretend.call()] - - def test__set_expiration_for_role_production(self, db_request, monkeypatch): - fake_storage = pretend.stub() - fake_key_storage = pretend.stub() - db_request.registry.settings["warehouse.env"] = Environment.production - db_request.registry.settings["tuf.root.expiry"] = 94608000 - - fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1) - fake_datetime = pretend.stub(now=pretend.call_recorder(lambda: fake_time)) - monkeypatch.setattr(datetime, "datetime", fake_datetime) - - service = services.RepositoryService(fake_storage, fake_key_storage, db_request) - result = service._set_expiration_for_role(Role.ROOT.value) - assert str(result) == "2022-06-15 09:05:01" - assert fake_datetime.now.calls == [pretend.call()] - def test_init_repository(self, db_request, monkeypatch): fake_storage = pretend.stub() fake_key_storage = pretend.stub( @@ -399,7 +366,8 @@ def test_init_repository(self, db_request, monkeypatch): db_request.registry.settings[name] = value fake_metadata_repository = pretend.stub( - is_initialized=False, initialize=pretend.call_recorder(lambda *a: None) + is_initialized=False, + initialize=pretend.call_recorder(lambda *a, **kw: None), ) monkeypatch.setattr( "warehouse.tuf.services.MetadataRepository", @@ -407,48 +375,20 @@ def test_init_repository(self, db_request, monkeypatch): ) service = services.RepositoryService(fake_storage, fake_key_storage, db_request) - result = service.init_repository() + result = service.init_dev_repository() assert result is None - # one call for role (4) - assert fake_datetime.now.calls == [ - pretend.call(), - pretend.call(), - pretend.call(), - pretend.call(), + assert fake_metadata_repository.initialize.calls == [ + pretend.call( + { + "targets": "fake_key", + "root": "fake_key", + "timestamp": "fake_key", + "snapshot": "fake_key", + }, + store=True, + ) ] - call_args = fake_metadata_repository.initialize.calls[0].args[0] - assert str(call_args["snapshot"].expiration) == "2019-06-17 09:05:01" - assert str(call_args["timestamp"].expiration) == "2019-06-17 09:05:01" - assert str(call_args["root"].expiration) == "2020-06-15 09:05:01" - assert str(call_args["targets"].expiration) == "2020-06-15 09:05:01" - assert ( - call_args["snapshot"].threshold == test_tuf_config["tuf.snapshot.threshold"] - ) - assert ( - call_args["timestamp"].threshold - == test_tuf_config["tuf.timestamp.threshold"] - ) - assert call_args["root"].threshold == test_tuf_config["tuf.root.threshold"] - assert ( - call_args["targets"].threshold == test_tuf_config["tuf.targets.threshold"] - ) - assert call_args["snapshot"].keys == "fake_key" - assert call_args["timestamp"].keys == "fake_key" - assert call_args["root"].keys == "fake_key" - assert call_args["targets"].keys == "fake_key" - assert call_args["snapshot"].delegation_role is None - assert call_args["timestamp"].delegation_role is None - assert call_args["root"].delegation_role is None - assert call_args["targets"].delegation_role is None - assert call_args["snapshot"].paths is None - assert call_args["timestamp"].paths is None - assert call_args["root"].paths is None - assert call_args["targets"].paths is None - assert call_args["snapshot"].path_hash_prefixes is None - assert call_args["timestamp"].path_hash_prefixes is None - assert call_args["root"].path_hash_prefixes is None - assert call_args["targets"].path_hash_prefixes is None for test_call in [ pretend.call(Role.SNAPSHOT.value), pretend.call(Role.ROOT.value), @@ -474,14 +414,16 @@ def test_init_repository_already_initialized(self, db_request, monkeypatch): service = services.RepositoryService(fake_storage, fake_key_storage, db_request) with pytest.raises(FileExistsError) as err: - service.init_repository() + service.init_dev_repository() assert "TUF Metadata Repository files already exists." in str(err.value) def test_init_targets_delegation(self, db_request, monkeypatch): fake_storage = pretend.stub() fake_key_storage = pretend.stub( - get=pretend.call_recorder(lambda role: "fake_key") + get=pretend.call_recorder( + lambda role: [{"keyid": "key1"}, {"keyid": "key2"}] + ) ) fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1) @@ -505,6 +447,7 @@ def test_init_targets_delegation(self, db_request, monkeypatch): fake_metadata_repository = pretend.stub( is_initialized=False, delegate_targets_roles=pretend.call_recorder(lambda *a: None), + _set_expiration_for_role=pretend.call_recorder(lambda *a: fake_datetime), ) monkeypatch.setattr( "warehouse.tuf.services.MetadataRepository", @@ -519,16 +462,15 @@ def test_init_targets_delegation(self, db_request, monkeypatch): call_args = fake_metadata_repository.delegate_targets_roles.calls[0].args[0] assert sorted(["targets", "bins"]) == sorted(list(call_args.keys())) assert len(call_args["targets"]) == 1 - assert call_args["targets"][0].paths == ["*/*", "*/*/*/*"] - assert call_args["targets"][0].path_hash_prefixes is None - assert call_args["targets"][0].delegation_role == Role.BINS.value - assert call_args["targets"][0].threshold == 1 + assert type(call_args["targets"][0][0]) == services.DelegatedRole + assert call_args["targets"][0][1] == [{"keyid": "key1"}, {"keyid": "key2"}] assert ( len(call_args["bins"]) == 16384 ) # PEP458 https://peps.python.org/pep-0458/#metadata-scalability - assert call_args["bins"][0].paths is None - assert type(call_args["bins"][0].path_hash_prefixes) == list - assert len(call_args["bins"][0].path_hash_prefixes) == 4 + assert type(call_args["bins"][0][0]) == services.DelegatedRole + assert call_args["bins"][0][1] == [{"keyid": "key1"}, {"keyid": "key2"}] + # 1 target + # PEP458 https://peps.python.org/pep-0458/#metadata-scalability + assert len(fake_metadata_repository._set_expiration_for_role.calls) == 16385 def test_bump_snapshot(self, db_request, monkeypatch): fake_storage = pretend.stub() @@ -568,10 +510,8 @@ def test_bump_snapshot(self, db_request, monkeypatch): assert result is None assert fake_metadata_repository.load_role.calls == [pretend.call("snapshot")] - assert str(bump_s_calls["snapshot_expires"]) == "2019-06-17 09:05:01" assert bump_s_calls["snapshot_metadata"].signed.version == 2 assert bump_s_calls["store"] is True - assert str(bump_t_calls["timestamp_expires"]) == "2019-06-17 09:05:01" assert bump_t_calls["snapshot_version"] == 2 assert bump_t_calls["store"] is True @@ -611,10 +551,8 @@ def test_bump_snapshot_specific_snapshot_metadata(self, db_request, monkeypatch) bump_t_calls = fake_metadata_repository.timestamp_bump_version.calls[0].kwargs assert result is None - assert str(bump_s_calls["snapshot_expires"]) == "2019-06-17 09:05:01" assert bump_s_calls["snapshot_metadata"].signed.version == 2 assert bump_s_calls["store"] is True - assert str(bump_t_calls["timestamp_expires"]) == "2019-06-17 09:05:01" assert bump_t_calls["snapshot_version"] == 2 assert bump_t_calls["store"] is True @@ -653,6 +591,7 @@ def test_bump_bin_n_roles(self, db_request, monkeypatch): lambda *a, **kw: "snapshot_metadata" ), timestamp_bump_version=pretend.call_recorder(lambda *a, **kw: None), + _set_expiration_for_role=pretend.call_recorder(lambda *a: fake_datetime), ) monkeypatch.setattr( "warehouse.tuf.services.MetadataRepository", @@ -671,11 +610,11 @@ def test_bump_bin_n_roles(self, db_request, monkeypatch): assert ( len(fake_metadata_repository.load_role.calls) == 16385 ) # +1 snapshot call - assert ( fake_metadata_repository.load_role.calls.count(pretend.call("snapshot")) == 1 ) + assert len(fake_metadata_repository._set_expiration_for_role.calls) == 16384 assert service.bump_snapshot.calls == [pretend.call("snapshot_metadata")] def test_add_hashed_targets(self, db_request, monkeypatch): diff --git a/tests/unit/tuf/test_tasks.py b/tests/unit/tuf/test_tasks.py index 3467ed72c92b..250fab5f7820 100644 --- a/tests/unit/tuf/test_tasks.py +++ b/tests/unit/tuf/test_tasks.py @@ -89,7 +89,7 @@ class TestInitRepository: def test_success(self, db_request): fake_irepository = pretend.stub() - fake_irepository.init_repository = pretend.call_recorder(lambda: None) + fake_irepository.init_dev_repository = pretend.call_recorder(lambda: None) db_request.registry.settings["celery.scheduler_url"] = "fake_schedule" db_request.find_service = pretend.call_recorder( @@ -97,9 +97,9 @@ def test_success(self, db_request): ) task = pretend.stub() - tasks.init_repository(task, db_request) + tasks.init_dev_repository(task, db_request) - assert fake_irepository.init_repository.calls == [pretend.call()] + assert fake_irepository.init_dev_repository.calls == [pretend.call()] assert db_request.find_service.calls == [pretend.call(IRepositoryService)] diff --git a/warehouse/cli/tuf.py b/warehouse/cli/tuf.py index 3f32b22d89ee..4812eeeed491 100644 --- a/warehouse/cli/tuf.py +++ b/warehouse/cli/tuf.py @@ -23,7 +23,7 @@ add_hashed_targets as _add_hashed_targets, bump_bin_n_roles as _bump_bin_n_roles, bump_snapshot as _bump_snapshot, - init_repository as _init_repository, + init_dev_repository as _init_dev_repository, init_targets_delegation as _init_targets_delegation, ) @@ -68,8 +68,8 @@ def init_repo(config): Initialize a new TUF repository if it does not exist. """ try: - request = config.task(_init_repository).get_request() - config.task(_init_repository).run(request) + request = config.task(_init_dev_repository).get_request() + config.task(_init_dev_repository).run(request) except FileExistsError as err: raise click.ClickException(str(err)) diff --git a/warehouse/tuf/interfaces.py b/warehouse/tuf/interfaces.py index ed3653abb7e4..c32c3cdfa673 100644 --- a/warehouse/tuf/interfaces.py +++ b/warehouse/tuf/interfaces.py @@ -57,7 +57,7 @@ def create_service(context, request): created. """ - def init_repository(): + def init_dev_repository(): """ Initializes a Metadata Repository from scratch, including a new root. """ diff --git a/warehouse/tuf/repository.py b/warehouse/tuf/repository.py index f39d6c348ca3..94cb119ac746 100644 --- a/warehouse/tuf/repository.py +++ b/warehouse/tuf/repository.py @@ -10,9 +10,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from dataclasses import dataclass -from datetime import datetime -from typing import Any, Dict, List, Optional +from datetime import datetime, timedelta +from typing import Any, Dict, List, Optional, Tuple from securesystemslib.exceptions import StorageError # type: ignore from securesystemslib.signer import SSlibSigner # type: ignore @@ -39,27 +38,6 @@ SPEC_VERSION = ".".join(SPECIFICATION_VERSION) -@dataclass -class RolesPayload: - """ - Container for various role data. - - This includes data that can be assigned to any role (``expiration`` and - ``threshold``), data that can only be assigned to delegating roles - (``delegation_role``, ``paths``, ``path_hash_prefixes`` and the keyids or public - portions of ``keys``), and data that is used to sign any roles (private portion of - ``keys``). - - """ - - expiration: datetime - threshold: int - keys: List[Dict[str, Any]] - delegation_role: Optional[str] = None - paths: Optional[List[str]] = None - path_hash_prefixes: Optional[List[str]] = None - - class MetadataRepository: """ TUF metadata repository abstraction to create and maintain role metadata. @@ -69,9 +47,11 @@ def __init__( self, storage_backend: StorageBackendInterface, key_backend: IKeyService, + settings: Dict, ): self.storage_backend: StorageBackendInterface = storage_backend self.key_backend: IKeyService = key_backend + self.settings: Dict = settings @property def is_initialized(self) -> bool: @@ -86,10 +66,18 @@ def is_initialized(self) -> bool: return False + def _set_expiration_for_role(self, role_name): + """ + Returns a metadata expiration date (now + role-specific interval). + """ + return datetime.now().replace(microsecond=0) + timedelta( + seconds=self.settings[f"tuf.{role_name}.expiry"] + ) + def _create_delegated_targets_roles( self, delegator_metadata: Metadata, - delegate_role_parameters: List[RolesPayload], + delegatees: List[Tuple[DelegatedRole, List[Dict[str, Any]], datetime]], snapshot_metadata: Optional[Metadata[Snapshot]] = None, ) -> Metadata[Snapshot]: """ @@ -98,45 +86,29 @@ def _create_delegated_targets_roles( if snapshot_metadata is None: snapshot_metadata = self.load_role(Snapshot.type) - for role_parameter in delegate_role_parameters: - rolename = role_parameter.delegation_role - if rolename is None: - raise ValueError("A delegation role name is required.") - try: - if self.load_role(rolename): - raise FileExistsError(f"Role {rolename} already exists.") - except StorageError: - pass - - delegated_role = DelegatedRole( - name=rolename, - keyids=[key["keyid"] for key in role_parameter.keys], - threshold=role_parameter.threshold, - terminating=False, - paths=role_parameter.paths, - path_hash_prefixes=role_parameter.path_hash_prefixes, - ) + for delegatee, keys, expiration in delegatees: if delegator_metadata.signed.delegations is None: - delegation = self._build_delegations( - rolename, delegated_role, role_parameter.keys + delegator_metadata.signed.delegations = Delegations( + {key["keyid"]: Key.from_securesystemslib_key(key) for key in keys}, + {delegatee.name: delegatee}, ) - delegator_metadata.signed.delegations = delegation else: - delegator_metadata.signed.delegations.roles[rolename] = delegated_role + delegator_metadata.signed.delegations.roles[delegatee.name] = delegatee - targets = Targets(1, SPEC_VERSION, role_parameter.expiration, {}, None) + targets = Targets(1, SPEC_VERSION, expiration, {}, None) role_metadata = Metadata(targets, {}) - for key in role_parameter.keys: + for key in keys: delegator_metadata.signed.add_key( - rolename, Key.from_securesystemslib_key(key) + delegatee.name, Key.from_securesystemslib_key(key) ) role_metadata.sign(SSlibSigner(key), append=True) - self._store(rolename, role_metadata) + self._store(delegatee.name, role_metadata) + snapshot_metadata = self.snapshot_update_meta( - rolename, role_metadata.signed.version, snapshot_metadata + delegatee.name, role_metadata.signed.version, snapshot_metadata ) return snapshot_metadata @@ -159,25 +131,14 @@ def _store(self, rolename: str, metadata: Metadata) -> None: filename = self._filename(rolename, metadata.signed.version) metadata.to_file(filename, JSONSerializer(), self.storage_backend) - def _build_delegations( - self, rolename: str, delegated_role: DelegatedRole, keys: List[Dict[str, Any]] - ) -> Delegations: - """ - Returns ``Delegations`` object assigning passed keys and roles information. - """ - return Delegations( - keys={key["keyid"]: Key.from_securesystemslib_key(key) for key in keys}, - roles={rolename: delegated_role}, - ) - def initialize( - self, payload: Dict[str, RolesPayload], store: Optional[bool] + self, keys: Dict[str, List[Dict[str, Any]]], store: Optional[bool] ) -> Dict[str, Metadata]: """ Initializes metadata repository with basic top-level role metadata. Args: - payload: Initial per-role infos to populate metadata. + keys: per-role keys for signing. store: Indicates whether metadata should be written to storage. Raises: @@ -192,43 +153,52 @@ def initialize( if self.is_initialized: raise FileExistsError("Metadata already exists in the Storage Service") - targets = Targets(1, SPEC_VERSION, payload[Targets.type].expiration, {}, None) + targets = Targets( + 1, SPEC_VERSION, self._set_expiration_for_role(Targets.type), {}, None + ) targets_metadata = Metadata(targets, {}) top_level_roles_metadata[Targets.type] = targets_metadata meta = {"targets.json": MetaFile(targets.version)} - snapshot = Snapshot(1, SPEC_VERSION, payload[Snapshot.type].expiration, meta) + snapshot = Snapshot( + 1, SPEC_VERSION, self._set_expiration_for_role(Snapshot.type), meta + ) snapshot_metadata = Metadata(snapshot, {}) top_level_roles_metadata[Snapshot.type] = snapshot_metadata snapshot_meta = MetaFile(snapshot.version) timestamp = Timestamp( - 1, SPEC_VERSION, payload[Timestamp.type].expiration, snapshot_meta + 1, + SPEC_VERSION, + self._set_expiration_for_role(Timestamp.type), + snapshot_meta, ) timestamp_metadata = Metadata(timestamp, {}) top_level_roles_metadata[Timestamp.type] = timestamp_metadata roles = { - role_name: Role([], payload[role_name].threshold) + role_name: Role([], self.settings[f"tuf.{role_name}.threshold"]) for role_name in TOP_LEVEL_ROLE_NAMES } - root = Root(1, SPEC_VERSION, payload[Root.type].expiration, {}, roles, True) + root = Root( + 1, SPEC_VERSION, self._set_expiration_for_role(Root.type), {}, roles, True + ) # Sign all top level roles metadata signers = dict() for role in TOP_LEVEL_ROLE_NAMES: - if payload[role].threshold > len(payload[role].keys): + role_keys = keys[role] + if self.settings[f"tuf.{role}.threshold"] > len(role_keys): raise ValueError( f"Role {role} has missing Key(s) " - f"to match to defined threshold {payload[role].threshold}." + f"to match to defined threshold " + f"{self.settings[f'tuf.{role}.threshold']}." ) - for key in payload[role].keys: + for key in role_keys: root.add_key(role, Key.from_securesystemslib_key(key)) - signers[role] = { - key["keyid"]: SSlibSigner(key) for key in payload[role].keys - } + signers[role] = {key["keyid"]: SSlibSigner(key) for key in role_keys} root_metadata = Metadata(root, {}) top_level_roles_metadata[Root.type] = root_metadata @@ -255,7 +225,7 @@ def load_role(self, rolename: str) -> Metadata: def delegate_targets_roles( self, - payload: Dict[str, List[RolesPayload]], + payload: Dict[str, List[Tuple[DelegatedRole, List[Dict[str, Any]], datetime]]], ) -> Metadata[Snapshot]: """ Performs targets delegation for delegator-to-delegates items in passed payload. @@ -274,19 +244,18 @@ def delegate_targets_roles( Updated snapshot metadata ``tuf.api.metadata.Metadata[Snapshot]`` """ - snapshot_metadata = self.load_role(Snapshot.type) - for delegator, delegate_role_parameters in payload.items(): + for delegator, delegatee in payload.items(): delegator_metadata = self.load_role(delegator) snapshot_metadata = self._create_delegated_targets_roles( delegator_metadata, - delegate_role_parameters, + delegatee, snapshot_metadata, ) delegator_metadata = self.bump_role_version( rolename=delegator, role_metadata=delegator_metadata, - role_expires=delegator_metadata.signed.expires, + role_expires=self._set_expiration_for_role(delegator), key_rolename=None, store=True, ) @@ -336,7 +305,6 @@ def bump_role_version( def timestamp_bump_version( self, snapshot_version: int, - timestamp_expires: datetime, store: bool = False, ) -> Metadata[Timestamp]: """ @@ -353,7 +321,9 @@ def timestamp_bump_version( """ timestamp_metadata = self.load_role(Timestamp.type) timestamp_metadata.signed.version += 1 - timestamp_metadata.signed.expires = timestamp_expires + timestamp_metadata.signed.expires = self._set_expiration_for_role( + Timestamp.type + ) timestamp_metadata.signed.snapshot_meta = MetaFile(version=snapshot_version) timestamp_keys = self.key_backend.get(Timestamp.type) for key in timestamp_keys: @@ -366,7 +336,6 @@ def timestamp_bump_version( def snapshot_bump_version( self, - snapshot_expires: datetime, snapshot_metadata: Optional[Metadata[Snapshot]] = None, store: Optional[bool] = False, ) -> Metadata[Snapshot]: @@ -388,7 +357,7 @@ def snapshot_bump_version( snapshot_metadata = self.load_role(Snapshot.type) snapshot_metadata.signed.version += 1 - snapshot_metadata.signed.expires = snapshot_expires + snapshot_metadata.signed.expires = self._set_expiration_for_role(Snapshot.type) snapshot_keys = self.key_backend.get(Snapshot.type) for key in snapshot_keys: snapshot_metadata.sign(SSlibSigner(key), append=True) diff --git a/warehouse/tuf/services.py b/warehouse/tuf/services.py index 47138377517d..7f0f7bb83eb0 100644 --- a/warehouse/tuf/services.py +++ b/warehouse/tuf/services.py @@ -11,7 +11,6 @@ # limitations under the License. -import datetime import glob import os.path import shutil @@ -31,8 +30,8 @@ from warehouse.tuf.interfaces import IKeyService, IRepositoryService, IStorageService from warehouse.tuf.repository import ( TOP_LEVEL_ROLE_NAMES, + DelegatedRole, MetadataRepository, - RolesPayload, TargetFile, ) @@ -180,50 +179,28 @@ def _get_hash_bins(self): return HashBins(number_of_bins) - def _set_expiration_for_role(self, role_name): + def init_dev_repository(self): """ - Returns a metadata expiration date (now + role-specific interval). - """ - # In a development environment metadata expires less frequently so that - # developers don't have to continually re-initialize it. - if self._request.registry.settings["warehouse.env"] == Environment.development: - return datetime.datetime.now().replace(microsecond=0) + datetime.timedelta( - seconds=self._request.registry.settings[ - "tuf.development_metadata_expiry" - ] - ) - else: - return datetime.datetime.now().replace(microsecond=0) + datetime.timedelta( - seconds=self._request.registry.settings[f"tuf.{role_name}.expiry"] - ) - - def init_repository(self): - """ - Creates TUF top-level role metadata (root, targets, snapshot, timestamp). - - The metadata is populated with configured expiration times, signature thresholds - and verification keys, and signed and persisted using the configured key and - storage services. + Creates development TUF top-level role metadata (root, targets, snapshot, + timestamp). FIXME: In production 'root' and 'targets' roles require offline singing keys, which may not be available at the time of initializing this metadata. """ metadata_repository = MetadataRepository( - self._storage_backend, self._key_storage_backend + self._storage_backend, + self._key_storage_backend, + self._request.registry.settings, ) if metadata_repository.is_initialized: raise FileExistsError("TUF Metadata Repository files already exists.") - top_roles_payload = dict() + keys = dict() for role in TOP_LEVEL_ROLE_NAMES: - top_roles_payload[role] = RolesPayload( - expiration=self._set_expiration_for_role(role), - threshold=self._request.registry.settings[f"tuf.{role}.threshold"], - keys=self._key_storage_backend.get(role), - ) + keys[role] = self._key_storage_backend.get(role) - metadata_repository.initialize(top_roles_payload, True) + metadata_repository.initialize(keys, store=True) def init_targets_delegation(self): """ @@ -239,39 +216,50 @@ def init_targets_delegation(self): """ hash_bins = self._get_hash_bins() metadata_repository = MetadataRepository( - self._storage_backend, self._key_storage_backend + self._storage_backend, + self._key_storage_backend, + self._request.registry.settings, ) # Top-level 'targets' role delegates trust for all target files to 'bins' role. + keys = self._key_storage_backend.get(Role.BINS.value) delegate_roles_payload = dict() delegate_roles_payload["targets"] = list() delegate_roles_payload["targets"].append( - RolesPayload( - expiration=self._set_expiration_for_role(Role.BINS.value), - threshold=self._request.registry.settings[ - f"tuf.{Role.BINS.value}.threshold" - ], - keys=self._key_storage_backend.get(Role.BINS.value), - delegation_role=Role.BINS.value, - paths=["*/*", "*/*/*/*"], + ( + DelegatedRole( + Role.BINS.value, + [key["keyid"] for key in keys], + self._request.registry.settings[f"tuf.{Role.BINS.value}.threshold"], + False, + paths=["*/*", "*/*/*/*"], + ), + keys, + metadata_repository._set_expiration_for_role(Role.BIN_N.value), ) ) + # The 'bins' role delegates trust for target files to 'bin-n' roles based on # target file path hash prefixes. delegate_roles_payload[Role.BINS.value] = list() + keys = self._key_storage_backend.get(Role.BIN_N.value) + key_ids = [key["keyid"] for key in keys] for bin_n_name, bin_n_hash_prefixes in hash_bins.generate(): delegate_roles_payload[Role.BINS.value].append( - RolesPayload( - expiration=self._set_expiration_for_role(Role.BIN_N.value), - threshold=self._request.registry.settings[ - f"tuf.{Role.BIN_N.value}.threshold" - ], - keys=self._key_storage_backend.get(Role.BIN_N.value), - delegation_role=bin_n_name, - path_hash_prefixes=bin_n_hash_prefixes, + ( + DelegatedRole( + bin_n_name, + key_ids, + self._request.registry.settings[ + f"tuf.{Role.BIN_N.value}.threshold" + ], + False, + path_hash_prefixes=bin_n_hash_prefixes, + ), + keys, + metadata_repository._set_expiration_for_role(Role.BIN_N.value), ) ) - snapshot_metadata = metadata_repository.delegate_targets_roles( delegate_roles_payload, ) @@ -288,21 +276,21 @@ def bump_snapshot(self, snapshot_metadata=None): Bumping 'snapshot' transitively bumps the 'timestamp' role. """ metadata_repository = MetadataRepository( - self._storage_backend, self._key_storage_backend + self._storage_backend, + self._key_storage_backend, + self._request.registry.settings, ) if snapshot_metadata is None: snapshot_metadata = metadata_repository.load_role(Role.SNAPSHOT.value) snapshot_metadata = metadata_repository.snapshot_bump_version( - snapshot_expires=self._set_expiration_for_role(Role.SNAPSHOT.value), snapshot_metadata=snapshot_metadata, store=True, ) metadata_repository.timestamp_bump_version( snapshot_version=snapshot_metadata.signed.version, - timestamp_expires=self._set_expiration_for_role(Role.TIMESTAMP.value), store=True, ) @@ -319,7 +307,9 @@ def bump_bin_n_roles(self): # 1. Grab metadata Repository metadata_repository = MetadataRepository( - self._storage_backend, self._key_storage_backend + self._storage_backend, + self._key_storage_backend, + self._request.registry.settings, ) # 2. Load Snapshot role. @@ -332,7 +322,9 @@ def bump_bin_n_roles(self): metadata_repository.bump_role_version( rolename=bin_n_name, role_metadata=role_metadata, - role_expires=self._set_expiration_for_role(Role.BINS.value), + role_expires=metadata_repository._set_expiration_for_role( + Role.BINS.value + ), key_rolename=Role.BIN_N.value, store=True, ) @@ -367,7 +359,9 @@ def add_hashed_targets(self, targets): targets_payload[delegated_role_bin_name].append(target_file) metadata_repository = MetadataRepository( - self._storage_backend, self._key_storage_backend + self._storage_backend, + self._key_storage_backend, + self._request.registry.settings, ) snapshot_metadata = metadata_repository.add_targets( diff --git a/warehouse/tuf/tasks.py b/warehouse/tuf/tasks.py index 241004d9be06..9fa44fe0a3c6 100644 --- a/warehouse/tuf/tasks.py +++ b/warehouse/tuf/tasks.py @@ -35,9 +35,9 @@ def bump_bin_n_roles(task, request): @task(bind=True, ignore_result=True, acks_late=True) -def init_repository(task, request): +def init_dev_repository(task, request): repository_service = request.find_service(IRepositoryService) - repository_service.init_repository() + repository_service.init_dev_repository() @task(bind=True, ignore_result=True, acks_late=True)