From fea707d2ca88f9fc675dc85bfb3892699a949c5a Mon Sep 17 00:00:00 2001 From: Kairo de Araujo Date: Wed, 25 May 2022 20:05:27 +0200 Subject: [PATCH] Replaced the Key signatures dict to Signers This commit adds a refactoring on the key signature used. Instead of using from Key Storage Service keys as a dictionary, uses that as a ``securesystemslib.signer.Signer``. It gives more flexibility and uses the same data structure across the services, repository and TUF. Signed-off-by: Kairo de Araujo --- tests/unit/tuf/test_repository.py | 103 +++++++++++++++++++++++------- tests/unit/tuf/test_services.py | 25 ++++++-- warehouse/tuf/repository.py | 88 +++++++++++-------------- warehouse/tuf/services.py | 30 +++++---- 4 files changed, 153 insertions(+), 93 deletions(-) diff --git a/tests/unit/tuf/test_repository.py b/tests/unit/tuf/test_repository.py index d080c168bb82..2dae6561e207 100644 --- a/tests/unit/tuf/test_repository.py +++ b/tests/unit/tuf/test_repository.py @@ -100,8 +100,13 @@ def test__create_delegated_targets_roles(self, tuf_repository, monkeypatch): tuf_repository.load_role = pretend.call_recorder( lambda role: fake_snapshot_md if role == Snapshot.type else None ) - tuf_repository._store = pretend.call_recorder(lambda *a, **kw: None) + fake_signers = [ + pretend.stub( + key_dict={"keyid": "key1"}, + sign=pretend.call_recorder(lambda *a: "key1"), + ) + ] test_delegate_roles_parameters = [ ( @@ -112,7 +117,7 @@ def test__create_delegated_targets_roles(self, tuf_repository, monkeypatch): False, paths=["*/*"], ), - [{"keyid": "key1"}, {"keyid": "key2"}], + fake_signers, fake_time, ) ] @@ -149,6 +154,12 @@ def test__create_delegated_targets_roles_with_snapshot_md( ) ) fake_snapshot_md = pretend.stub(signed=pretend.stub(meta={})) + fake_signers = [ + pretend.stub( + key_dict={"keyid": "key1"}, + sign=pretend.call_recorder(lambda *a: "key1"), + ) + ] tuf_repository.load_role = pretend.call_recorder( lambda role: fake_snapshot_md if role == Snapshot.type else None @@ -164,7 +175,7 @@ def test__create_delegated_targets_roles_with_snapshot_md( False, paths=["*/*"], ), - [{"keyid": "key1"}, {"keyid": "key2"}], + fake_signers, fake_time, ) ] @@ -201,6 +212,12 @@ def test__create_delegated_targets_roles_has_delegations( ) ) fake_snapshot_md = pretend.stub(signed=pretend.stub(meta={})) + fake_signers = [ + pretend.stub( + key_dict={"keyid": "key1"}, + sign=pretend.call_recorder(lambda *a: "key1"), + ) + ] tuf_repository.load_role = pretend.call_recorder( lambda role: fake_snapshot_md if role == Snapshot.type else None @@ -216,7 +233,7 @@ def test__create_delegated_targets_roles_has_delegations( False, paths=["*/*"], ), - [{"keyid": "key1"}, {"keyid": "key2"}], + fake_signers, fake_time, ) ] @@ -281,10 +298,20 @@ def test_initialization(self, tuf_repository): ), }, } + fake_signers = [ + pretend.stub( + key_dict=fake_key, + sign=pretend.call_recorder(lambda *a: pretend.stub(keyid="key1")), + ), + pretend.stub( + key_dict=fake_key, + sign=pretend.call_recorder(lambda *a: pretend.stub(keyid="key2")), + ), + ] top_roles_payload = dict() for role in TOP_LEVEL_ROLE_NAMES: - top_roles_payload[role] = [fake_key, fake_key] + top_roles_payload[role] = fake_signers tuf_repository.load_role = pretend.call_recorder(lambda *a, **kw: None) tuf_repository._store = pretend.call_recorder(lambda *a, **kw: None) @@ -315,9 +342,20 @@ def test_initialization_store_false(self, tuf_repository): ), }, } + fake_signers = [ + pretend.stub( + key_dict=fake_key, + sign=pretend.call_recorder(lambda *a: pretend.stub(keyid="key1")), + ), + pretend.stub( + key_dict=fake_key, + sign=pretend.call_recorder(lambda *a: pretend.stub(keyid="key2")), + ), + ] + top_roles_payload = dict() for role in TOP_LEVEL_ROLE_NAMES: - top_roles_payload[role] = [fake_key, fake_key] + top_roles_payload[role] = fake_signers tuf_repository.load_role = pretend.call_recorder(lambda *a, **kw: None) tuf_repository._store = pretend.call_recorder(lambda *a, **kw: None) @@ -365,9 +403,15 @@ def test_initialization_threshold_more_than_keys(self, tuf_repository): ), }, } + fake_signers = [ + pretend.stub( + key_dict=fake_key, + sign=pretend.call_recorder(lambda *a: pretend.stub(keyid="key1")), + ) + ] top_roles_payload = dict() for role in TOP_LEVEL_ROLE_NAMES: - top_roles_payload[role] = [fake_key] + top_roles_payload[role] = fake_signers tuf_repository.load_role = pretend.call_recorder(lambda *a, **kw: None) tuf_repository._store = pretend.call_recorder(lambda *a, **kw: None) @@ -409,7 +453,12 @@ def test_delegate_targets_roles(self, tuf_repository): ), }, } - payload = {"xxxx-yyyy": [fake_key]} + fake_signers = [ + pretend.stub( + key_dict=fake_key, sign=pretend.call_recorder(lambda *a: "key1") + ) + ] + payload = {"xxxx-yyyy": fake_signers} fake_targets_md = pretend.stub( signed=pretend.stub( delegations=None, @@ -449,7 +498,7 @@ def test_delegate_targets_roles(self, tuf_repository): rolename="xxxx-yyyy", role_metadata=fake_targets_md, role_expires=fake_time, - key_rolename=None, + signers=None, store=True, ) ] @@ -463,6 +512,12 @@ def test_delegate_targets_roles(self, tuf_repository): def test_bump_role_version(self, tuf_repository): fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1) fake_new_time = datetime.datetime(2022, 6, 16, 9, 5, 1) + fake_signers = [ + pretend.stub( + key_dict={"keyid": "fake_id"}, + sign=pretend.call_recorder(lambda *a: "key1"), + ) + ] initial_version = 1983 fake_role_metadata = pretend.stub( signed=pretend.stub(expires=fake_time, version=initial_version), @@ -470,15 +525,14 @@ def test_bump_role_version(self, tuf_repository): ) tuf_repository.key_backend = pretend.stub( - get=pretend.call_recorder(lambda role: [{"key": "key_data"}]) + get=pretend.call_recorder(lambda role: fake_signers) ) result = tuf_repository.bump_role_version( - "fake_role", fake_role_metadata, fake_new_time + "fake_role", fake_role_metadata, fake_new_time, fake_signers ) assert result.signed.version == initial_version + 1 assert result.signed.expires == fake_new_time - assert tuf_repository.key_backend.get.calls == [pretend.call("fake_role")] def test_bump_role_version_store_true(self, tuf_repository): fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1) @@ -488,17 +542,19 @@ def test_bump_role_version_store_true(self, tuf_repository): signed=pretend.stub(expires=fake_time, version=initial_version), sign=lambda *a, **kw: None, ) + fake_signers = [ + pretend.stub( + key_dict={"keyid": "fake_id"}, + sign=pretend.call_recorder(lambda *a: "key1"), + ) + ] - tuf_repository.key_backend = pretend.stub( - get=pretend.call_recorder(lambda role: [{"key": "key_data"}]) - ) tuf_repository._store = pretend.call_recorder(lambda rolename, role_md: None) result = tuf_repository.bump_role_version( - "fake_role", fake_role_metadata, fake_new_time, store=True + "fake_role", fake_role_metadata, fake_new_time, fake_signers, store=True ) assert result.signed.version == initial_version + 1 assert result.signed.expires == fake_new_time - assert tuf_repository.key_backend.get.calls == [pretend.call("fake_role")] assert tuf_repository._store.calls == [ pretend.call("fake_role", fake_role_metadata) ] @@ -511,17 +567,18 @@ def test_bump_role_version_with_key_rolename(self, tuf_repository): signed=pretend.stub(expires=fake_time, version=initial_version), sign=lambda *a, **kw: None, ) - - tuf_repository.key_backend = pretend.stub( - get=pretend.call_recorder(lambda role: [{"key": "key_data"}]) - ) + fake_signers = [ + pretend.stub( + key_dict={"keyid": "fake_id"}, + sign=pretend.call_recorder(lambda *a: "key1"), + ) + ] result = tuf_repository.bump_role_version( - "fake_role", fake_role_metadata, fake_new_time, "key_role_name" + "fake_role", fake_role_metadata, fake_new_time, fake_signers ) assert result.signed.version == initial_version + 1 assert result.signed.expires == fake_new_time - assert tuf_repository.key_backend.get.calls == [pretend.call("key_role_name")] def test_bump_timestamp_version(self, tuf_repository): fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1) diff --git a/tests/unit/tuf/test_services.py b/tests/unit/tuf/test_services.py index c110b46b411b..8508d7bd2d29 100644 --- a/tests/unit/tuf/test_services.py +++ b/tests/unit/tuf/test_services.py @@ -61,7 +61,7 @@ def test_get(self, db_request, monkeypatch): root_keyid = service.get("root") - assert root_keyid == [expected_priv_key_dict] + assert root_keyid[0].key_dict == expected_priv_key_dict class TestLocalStorageService: @@ -420,10 +420,14 @@ def test_init_repository_already_initialized(self, db_request, monkeypatch): def test_init_targets_delegation(self, db_request, monkeypatch): fake_storage = pretend.stub() - fake_key_storage = pretend.stub( - get=pretend.call_recorder( - lambda role: [{"keyid": "key1"}, {"keyid": "key2"}] + fake_signers = [ + pretend.stub( + key_dict={"keyid": "fake_id"}, + sign=pretend.call_recorder(lambda *a: "key1"), ) + ] + fake_key_storage = pretend.stub( + get=pretend.call_recorder(lambda role: fake_signers) ) fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1) @@ -463,12 +467,12 @@ def test_init_targets_delegation(self, db_request, monkeypatch): assert sorted(["targets", "bins"]) == sorted(list(call_args.keys())) assert len(call_args["targets"]) == 1 assert type(call_args["targets"][0][0]) == services.DelegatedRole - assert call_args["targets"][0][1] == [{"keyid": "key1"}, {"keyid": "key2"}] + assert call_args["targets"][0][1][0].key_dict == {"keyid": "fake_id"} assert ( len(call_args["bins"]) == 16384 ) # PEP458 https://peps.python.org/pep-0458/#metadata-scalability assert type(call_args["bins"][0][0]) == services.DelegatedRole - assert call_args["bins"][0][1] == [{"keyid": "key1"}, {"keyid": "key2"}] + assert call_args["bins"][0][1][0].key_dict == {"keyid": "fake_id"} # 1 target + # PEP458 https://peps.python.org/pep-0458/#metadata-scalability assert len(fake_metadata_repository._set_expiration_for_role.calls) == 16385 @@ -558,8 +562,14 @@ def test_bump_snapshot_specific_snapshot_metadata(self, db_request, monkeypatch) def test_bump_bin_n_roles(self, db_request, monkeypatch): fake_storage = pretend.stub() + fake_signers = [ + pretend.stub( + key_dict={"keyid": "fake_id"}, + sign=pretend.call_recorder(lambda *a: "key1"), + ) + ] fake_key_storage = pretend.stub( - get=pretend.call_recorder(lambda role: "fake_key") + get=pretend.call_recorder(lambda role: fake_signers) ) fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1) @@ -592,6 +602,7 @@ def test_bump_bin_n_roles(self, db_request, monkeypatch): ), timestamp_bump_version=pretend.call_recorder(lambda *a, **kw: None), _set_expiration_for_role=pretend.call_recorder(lambda *a: fake_datetime), + _key_storage_backend=pretend.call_recorder(lambda *a: fake_signers), ) monkeypatch.setattr( "warehouse.tuf.services.MetadataRepository", diff --git a/warehouse/tuf/repository.py b/warehouse/tuf/repository.py index 94cb119ac746..12ea1adf872f 100644 --- a/warehouse/tuf/repository.py +++ b/warehouse/tuf/repository.py @@ -14,7 +14,7 @@ from typing import Any, Dict, List, Optional, Tuple from securesystemslib.exceptions import StorageError # type: ignore -from securesystemslib.signer import SSlibSigner # type: ignore +from securesystemslib.signer import Signer # type: ignore from tuf.api.metadata import ( SPECIFICATION_VERSION, TOP_LEVEL_ROLE_NAMES, @@ -77,7 +77,7 @@ def _set_expiration_for_role(self, role_name): def _create_delegated_targets_roles( self, delegator_metadata: Metadata, - delegatees: List[Tuple[DelegatedRole, List[Dict[str, Any]], datetime]], + delegatees: List[Tuple[DelegatedRole, List[Signer], datetime]], snapshot_metadata: Optional[Metadata[Snapshot]] = None, ) -> Metadata[Snapshot]: """ @@ -86,24 +86,23 @@ def _create_delegated_targets_roles( if snapshot_metadata is None: snapshot_metadata = self.load_role(Snapshot.type) - for delegatee, keys, expiration in delegatees: + for delegatee, signers, expiration in delegatees: + targets = Targets(1, SPEC_VERSION, expiration, {}, None) + role_metadata = Metadata(targets, {}) if delegator_metadata.signed.delegations is None: delegator_metadata.signed.delegations = Delegations( - {key["keyid"]: Key.from_securesystemslib_key(key) for key in keys}, + {signer.key_dict["keyid"]: signer for signer in signers}, {delegatee.name: delegatee}, ) else: delegator_metadata.signed.delegations.roles[delegatee.name] = delegatee - targets = Targets(1, SPEC_VERSION, expiration, {}, None) - role_metadata = Metadata(targets, {}) - - for key in keys: + for signer in signers: delegator_metadata.signed.add_key( - delegatee.name, Key.from_securesystemslib_key(key) + delegatee.name, Key.from_securesystemslib_key(signer.key_dict) ) - role_metadata.sign(SSlibSigner(key), append=True) + role_metadata.sign(signer, append=True) self._store(delegatee.name, role_metadata) @@ -132,13 +131,13 @@ def _store(self, rolename: str, metadata: Metadata) -> None: metadata.to_file(filename, JSONSerializer(), self.storage_backend) def initialize( - self, keys: Dict[str, List[Dict[str, Any]]], store: Optional[bool] + self, role_signers: Dict[str, Signer], store: Optional[bool] ) -> Dict[str, Metadata]: """ Initializes metadata repository with basic top-level role metadata. Args: - keys: per-role keys for signing. + signer: per-role ``securesystemslib.signer.Signer``. store: Indicates whether metadata should be written to storage. Raises: @@ -184,26 +183,20 @@ def initialize( 1, SPEC_VERSION, self._set_expiration_for_role(Root.type), {}, roles, True ) + root_metadata = Metadata(root, {}) + top_level_roles_metadata[Root.type] = root_metadata + # Sign all top level roles metadata - signers = dict() for role in TOP_LEVEL_ROLE_NAMES: - role_keys = keys[role] - if self.settings[f"tuf.{role}.threshold"] > len(role_keys): + if self.settings[f"tuf.{role}.threshold"] > len(role_signers[role]): raise ValueError( f"Role {role} has missing Key(s) " f"to match to defined threshold " f"{self.settings[f'tuf.{role}.threshold']}." ) - for key in role_keys: - root.add_key(role, Key.from_securesystemslib_key(key)) - - signers[role] = {key["keyid"]: SSlibSigner(key) for key in role_keys} - - root_metadata = Metadata(root, {}) - top_level_roles_metadata[Root.type] = root_metadata - for role in signers: - for signer in signers[role].values(): + for signer in role_signers[role]: + root.add_key(role, Key.from_securesystemslib_key(signer.key_dict)) top_level_roles_metadata[role].sign(signer, append=True) if store: @@ -225,17 +218,17 @@ def load_role(self, rolename: str) -> Metadata: def delegate_targets_roles( self, - payload: Dict[str, List[Tuple[DelegatedRole, List[Dict[str, Any]], datetime]]], + payload: Dict[str, List[Tuple[DelegatedRole, List[Signer], datetime]]], ) -> Metadata[Snapshot]: """ - Performs targets delegation for delegator-to-delegates items in passed payload. + Performs targets delegation for delegator-to-delegatees items in passed payload. Creates new basic delegate metadata, configures delegation in delegator metadata and bumps its version, and updates snapshot metadata accordingly. Args: - payload: Dictionary of delegator role names as keys and lists of - per-delegate info to populate the delegate metadata. + payload: Dictionary of delegator as Tuple containing ``DelegatedRole``, + list of signers and ``datetime`` expiration. Raises: FileExistsError: Delegate metadata already exists. @@ -256,7 +249,7 @@ def delegate_targets_roles( rolename=delegator, role_metadata=delegator_metadata, role_expires=self._set_expiration_for_role(delegator), - key_rolename=None, + signers=self.key_backend.get(delegator), store=True, ) snapshot_metadata = self.snapshot_update_meta( @@ -270,7 +263,7 @@ def bump_role_version( rolename: str, role_metadata: Metadata, role_expires: datetime, - key_rolename: Optional[str] = None, + signers: List[Signer], store: Optional[bool] = False, ) -> Metadata: """ @@ -280,22 +273,17 @@ def bump_role_version( rolename: Used to associate signing key and (optionally) store metadata. role_metadata: Role metadata to be bumped. role_expires: New role expiration date. - key_rolename: Used to associate a signing key by a name other than rolename. + signers: List of ``Signers``. store: Indicates whether metadata should be written to storage. Returns: Updated metadata ``tuf.api.metadata.Metadata`` """ - if key_rolename: - key_rolename = key_rolename - else: - key_rolename = rolename role_metadata.signed.expires = role_expires role_metadata.signed.version += 1 - key_rolename_keys = self.key_backend.get(key_rolename) - for key in key_rolename_keys: - role_metadata.sign(SSlibSigner(key), append=True) + for signer in signers: + role_metadata.sign(signer, append=True) if store: self._store(rolename, role_metadata) @@ -325,9 +313,9 @@ def timestamp_bump_version( Timestamp.type ) timestamp_metadata.signed.snapshot_meta = MetaFile(version=snapshot_version) - timestamp_keys = self.key_backend.get(Timestamp.type) - for key in timestamp_keys: - timestamp_metadata.sign(SSlibSigner(key), append=True) + timestamp_signers = self.key_backend.get(Timestamp.type) + for signer in timestamp_signers: + timestamp_metadata.sign(signer, append=True) if store: self._store(Timestamp.type, timestamp_metadata) @@ -358,9 +346,9 @@ def snapshot_bump_version( snapshot_metadata.signed.version += 1 snapshot_metadata.signed.expires = self._set_expiration_for_role(Snapshot.type) - snapshot_keys = self.key_backend.get(Snapshot.type) - for key in snapshot_keys: - snapshot_metadata.sign(SSlibSigner(key), append=True) + snapshot_signers = self.key_backend.get(Snapshot.type) + for signer in snapshot_signers: + snapshot_metadata.sign(signer, append=True) if store is True: self._store(Snapshot.type, snapshot_metadata) @@ -398,7 +386,7 @@ def snapshot_update_meta( def add_targets( self, payload: Dict[str, List[TargetFile]], - key_rolename: str, + target_rolename_signer: str, ) -> Metadata[Snapshot]: """ Adds target files info to targets metadata and updates snapshot. @@ -413,7 +401,7 @@ def add_targets( Args: payload: Dictionary of targets role names as keys and lists of target file info objects. - key_rolename: Targets metadata signing key in key storage. + target_rolename_signer: Targets metadata name in key storage. Returns: Updated snapshot metadata @@ -427,16 +415,16 @@ def add_targets( role_metadata.signed.targets[target.path] = target role_metadata.signed.version += 1 - role_keys = self.key_backend.get(key_rolename) - for key in role_keys: - role_metadata.sign(SSlibSigner(key), append=True) + role_signers = self.key_backend.get(target_rolename_signer) + for signer in role_signers: + role_metadata.sign(signer, append=True) self._store(rolename, role_metadata) role_metadata = self.bump_role_version( rolename=rolename, role_metadata=role_metadata, role_expires=role_metadata.signed.expires, - key_rolename=key_rolename, + signers=role_signers, store=True, ) snapshot_metadata = self.snapshot_update_meta( diff --git a/warehouse/tuf/services.py b/warehouse/tuf/services.py index 7f0f7bb83eb0..6b7225ec3f8b 100644 --- a/warehouse/tuf/services.py +++ b/warehouse/tuf/services.py @@ -22,6 +22,7 @@ from securesystemslib.interface import ( # type: ignore import_ed25519_privatekey_from_file, ) +from securesystemslib.signer import SSlibSigner # type: ignore from zope.interface import implementer from warehouse.config import Environment @@ -64,13 +65,16 @@ def create_service(cls, context, request): def get(self, rolename): """ - Returns Key objects for passed TUF role name from configured TUF key path. + Returns a list of ``securesystemslib.signer.Signer`` objects for passed + TUF role name from configured TUF key path. """ privkey_path = os.path.join(self._key_path, "tufkeys", f"{rolename}*") role_keys = glob.glob(privkey_path) keys_sslib = [ - import_ed25519_privatekey_from_file( - key, self._request.registry.settings[f"tuf.{rolename}.secret"] + SSlibSigner( + import_ed25519_privatekey_from_file( + key, self._request.registry.settings[f"tuf.{rolename}.secret"] + ) ) for key in role_keys if "pub" not in key @@ -196,11 +200,11 @@ def init_dev_repository(self): if metadata_repository.is_initialized: raise FileExistsError("TUF Metadata Repository files already exists.") - keys = dict() + role_signers = dict() for role in TOP_LEVEL_ROLE_NAMES: - keys[role] = self._key_storage_backend.get(role) + role_signers[role] = self._key_storage_backend.get(role) - metadata_repository.initialize(keys, store=True) + metadata_repository.initialize(role_signers, store=True) def init_targets_delegation(self): """ @@ -222,19 +226,19 @@ def init_targets_delegation(self): ) # Top-level 'targets' role delegates trust for all target files to 'bins' role. - keys = self._key_storage_backend.get(Role.BINS.value) + signers = self._key_storage_backend.get(Role.BINS.value) delegate_roles_payload = dict() delegate_roles_payload["targets"] = list() delegate_roles_payload["targets"].append( ( DelegatedRole( Role.BINS.value, - [key["keyid"] for key in keys], + [signer.key_dict["keyid"] for signer in signers], self._request.registry.settings[f"tuf.{Role.BINS.value}.threshold"], False, paths=["*/*", "*/*/*/*"], ), - keys, + signers, metadata_repository._set_expiration_for_role(Role.BIN_N.value), ) ) @@ -242,8 +246,8 @@ def init_targets_delegation(self): # The 'bins' role delegates trust for target files to 'bin-n' roles based on # target file path hash prefixes. delegate_roles_payload[Role.BINS.value] = list() - keys = self._key_storage_backend.get(Role.BIN_N.value) - key_ids = [key["keyid"] for key in keys] + signers = self._key_storage_backend.get(Role.BIN_N.value) + key_ids = [signer.key_dict["keyid"] for signer in signers] for bin_n_name, bin_n_hash_prefixes in hash_bins.generate(): delegate_roles_payload[Role.BINS.value].append( ( @@ -256,7 +260,7 @@ def init_targets_delegation(self): False, path_hash_prefixes=bin_n_hash_prefixes, ), - keys, + signers, metadata_repository._set_expiration_for_role(Role.BIN_N.value), ) ) @@ -325,7 +329,7 @@ def bump_bin_n_roles(self): role_expires=metadata_repository._set_expiration_for_role( Role.BINS.value ), - key_rolename=Role.BIN_N.value, + signers=self._key_storage_backend.get(Role.BIN_N.value), store=True, )