Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ngclient: use Metadata.verify_delegate #1509

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 7 additions & 59 deletions tuf/ngclient/_internal/trusted_metadata_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,47 +71,11 @@
from typing import Dict, Iterator, Optional

from tuf import exceptions
from tuf.api.metadata import Metadata, Root, Targets
from tuf.api.metadata import Metadata
from tuf.api.serialization import DeserializationError

logger = logging.getLogger(__name__)

# This is a placeholder until ...
# TODO issue 1306: implement this in Metadata API
def verify_with_threshold(
delegator: Metadata, role_name: str, unverified: Metadata
) -> bool:
"""Verify 'unverified' with keys and threshold defined in delegator"""
role = None
keys = {}
if isinstance(delegator.signed, Root):
keys = delegator.signed.keys
role = delegator.signed.roles.get(role_name)
elif isinstance(delegator.signed, Targets):
if delegator.signed.delegations:
keys = delegator.signed.delegations.keys
# role names are unique: first match is enough
roles = delegator.signed.delegations.roles
role = next((r for r in roles if r.name == role_name), None)
else:
raise ValueError("Call is valid only on delegator metadata")

if role is None:
raise ValueError(f"Delegated role {role_name} not found")

# verify that delegate is signed by correct threshold of unique keys
unique_keys = set()
for keyid in role.keyids:
key = keys[keyid]
try:
key.verify_signature(unverified)
unique_keys.add(key.keyval["public"])
except Exception as e: # pylint: disable=broad-except
# TODO specify the Exceptions (see issue #1351)
logger.info("verify failed: %s", e)

return len(unique_keys) >= role.threshold


class TrustedMetadataSet(abc.Mapping):
"""Internal class to keep track of trusted metadata in Updater
Expand Down Expand Up @@ -207,20 +171,14 @@ def update_root(self, data: bytes):

if self.root is not None:
# We are not loading initial trusted root: verify the new one
if not verify_with_threshold(self.root, "root", new_root):
raise exceptions.UnsignedMetadataError(
"New root is not signed by root", new_root.signed
)
self.root.verify_delegate("root", new_root)

if new_root.signed.version != self.root.signed.version + 1:
raise exceptions.ReplayedMetadataError(
"root", new_root.signed.version, self.root.signed.version
)

if not verify_with_threshold(new_root, "root", new_root):
raise exceptions.UnsignedMetadataError(
"New root is not signed by itself", new_root.signed
)
new_root.verify_delegate("root", new_root)

self._trusted_set["root"] = new_root
logger.debug("Updated root")
Expand Down Expand Up @@ -270,10 +228,7 @@ def update_timestamp(self, data: bytes):
f"Expected 'timestamp', got '{new_timestamp.signed.type}'"
)

if not verify_with_threshold(self.root, "timestamp", new_timestamp):
raise exceptions.UnsignedMetadataError(
"New timestamp is not signed by root", new_timestamp.signed
)
self.root.verify_delegate("timestamp", new_timestamp)

# If an existing trusted timestamp is updated,
# check for a rollback attack
Expand Down Expand Up @@ -339,10 +294,7 @@ def update_snapshot(self, data: bytes):
f"Expected 'snapshot', got '{new_snapshot.signed.type}'"
)

if not verify_with_threshold(self.root, "snapshot", new_snapshot):
raise exceptions.UnsignedMetadataError(
"New snapshot is not signed by root", new_snapshot.signed
)
self.root.verify_delegate("snapshot", new_snapshot)

if (
new_snapshot.signed.version
Expand Down Expand Up @@ -408,7 +360,7 @@ def update_delegated_targets(
if self.snapshot is None:
raise RuntimeError("Cannot load targets before snapshot")

delegator = self.get(delegator_name)
delegator: Optional[Metadata] = self.get(delegator_name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we annotating this as Optional so that mypy can understand that None is a valid value for delegator? Why wasn't this required before?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, mypy was not complaining about this. I did it to help the code editor figure it out. I can separate it in another commit or explain it in the commit message if it looks weird in between the other changes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does look a bit out of place in this commit, though it's not an objectionable change by itself.

if delegator is None:
raise RuntimeError("Cannot load targets before delegator")

Expand Down Expand Up @@ -438,11 +390,7 @@ def update_delegated_targets(
f"Expected 'targets', got '{new_delegate.signed.type}'"
)

if not verify_with_threshold(delegator, role_name, new_delegate):
raise exceptions.UnsignedMetadataError(
f"New {role_name} is not signed by {delegator_name}",
new_delegate,
)
delegator.verify_delegate(role_name, new_delegate)

if new_delegate.signed.version != meta.version:
raise exceptions.BadVersionNumberError(
Expand Down