Skip to content

Commit

Permalink
Merge pull request #1437 from sechkova/hash-verification
Browse files Browse the repository at this point in the history
Add hash and length verification to MetaFile and TargetFile
  • Loading branch information
Jussi Kukkonen authored Jun 16, 2021
2 parents 51c26b7 + dcdd332 commit 39ed706
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 16 deletions.
68 changes: 64 additions & 4 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from tests import utils

import tuf.exceptions
from tuf import exceptions
from tuf.api.metadata import (
Metadata,
Root,
Expand Down Expand Up @@ -178,7 +178,7 @@ def test_sign_verify(self):
self.assertEqual(len(metadata_obj.signatures), 1)
# ... which is valid for the correct key.
targets_key.verify_signature(metadata_obj)
with self.assertRaises(tuf.exceptions.UnsignedMetadataError):
with self.assertRaises(exceptions.UnsignedMetadataError):
snapshot_key.verify_signature(metadata_obj)

sslib_signer = SSlibSigner(self.keystore['snapshot'])
Expand All @@ -197,7 +197,7 @@ def test_sign_verify(self):
self.assertEqual(len(metadata_obj.signatures), 1)
# ... valid for that key.
timestamp_key.verify_signature(metadata_obj)
with self.assertRaises(tuf.exceptions.UnsignedMetadataError):
with self.assertRaises(exceptions.UnsignedMetadataError):
targets_key.verify_signature(metadata_obj)


Expand Down Expand Up @@ -286,7 +286,6 @@ def test_targetfile_class(self):
targetfile_obj = TargetFile.from_dict(copy.copy(data))
self.assertEqual(targetfile_obj.to_dict(), data)


def test_metadata_snapshot(self):
snapshot_path = os.path.join(
self.repo_dir, 'metadata', 'snapshot.json')
Expand Down Expand Up @@ -358,6 +357,7 @@ def test_metadata_timestamp(self):
timestamp_test = Timestamp.from_dict(test_dict)
self.assertEqual(timestamp_dict['signed'], timestamp_test.to_dict())


def test_key_class(self):
keys = {
"59a4df8af818e9ed7abe0764c0b47b4240952aa0d179b5b78346c470ac30278d":{
Expand Down Expand Up @@ -644,6 +644,66 @@ def test_support_for_unrecognized_fields(self):
metadata_obj.signed.to_dict(), metadata_obj2.signed.to_dict()
)

def test_length_and_hash_validation(self):
    """Exercise MetaFile and TargetFile length/hash verification against
    real repository files, including mismatch and optional-field cases.
    """

    # Test metadata files' hash and length verification.
    # Use timestamp to get a MetaFile object and snapshot
    # for untrusted metadata file to verify.
    timestamp_path = os.path.join(
        self.repo_dir, 'metadata', 'timestamp.json')
    timestamp = Metadata.from_file(timestamp_path)
    snapshot_metafile = timestamp.signed.meta["snapshot.json"]

    snapshot_path = os.path.join(
        self.repo_dir, 'metadata', 'snapshot.json')

    with open(snapshot_path, "rb") as file:
        # Verification must accept a file object ...
        snapshot_metafile.verify_length_and_hashes(file)
        file.seek(0)
        data = file.read()
        # ... as well as raw bytes.
        snapshot_metafile.verify_length_and_hashes(data)

    # Failure cases: a wrong length and a wrong hash must each raise.
    # ('data' is bytes, so the file may be closed by now.)
    expected_length = snapshot_metafile.length
    snapshot_metafile.length = 2345
    self.assertRaises(exceptions.LengthOrHashMismatchError,
        snapshot_metafile.verify_length_and_hashes, data)

    snapshot_metafile.length = expected_length
    snapshot_metafile.hashes = {'sha256': 'incorrecthash'}
    self.assertRaises(exceptions.LengthOrHashMismatchError,
        snapshot_metafile.verify_length_and_hashes, data)

    # Length and hashes are optional for MetaFile: with neither set,
    # verification must succeed without checking anything.
    snapshot_metafile.length = None
    snapshot_metafile.hashes = None
    snapshot_metafile.verify_length_and_hashes(data)

    # Test target files' hash and length verification
    targets_path = os.path.join(
        self.repo_dir, 'metadata', 'targets.json')
    targets = Metadata.from_file(targets_path)
    file1_targetfile = targets.signed.targets['file1.txt']
    filepath = os.path.join(
        self.repo_dir, 'targets', 'file1.txt')

    with open(filepath, "rb") as file1:
        file1_targetfile.verify_length_and_hashes(file1)

        # NOTE: the failure-case checks must stay inside the 'with'
        # block -- 'file1' is a file object and would be closed (and
        # unusable for verification) once the block is exited.
        expected_length = file1_targetfile.length
        file1_targetfile.length = 2345
        self.assertRaises(exceptions.LengthOrHashMismatchError,
            file1_targetfile.verify_length_and_hashes, file1)

        file1_targetfile.length = expected_length
        file1_targetfile.hashes = {'sha256': 'incorrecthash'}
        self.assertRaises(exceptions.LengthOrHashMismatchError,
            file1_targetfile.verify_length_and_hashes, file1)


# Run unit test.
if __name__ == '__main__':
Expand Down
119 changes: 110 additions & 9 deletions tuf/api/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,24 @@
"""
import abc
import io
import tempfile
from collections import OrderedDict
from datetime import datetime, timedelta
from typing import Any, ClassVar, Dict, List, Mapping, Optional, Tuple, Type
from typing import (
Any,
BinaryIO,
ClassVar,
Dict,
List,
Mapping,
Optional,
Tuple,
Type,
Union,
)

from securesystemslib import hash as sslib_hash
from securesystemslib import keys as sslib_keys
from securesystemslib.signer import Signature, Signer
from securesystemslib.storage import FilesystemBackend, StorageBackendInterface
Expand Down Expand Up @@ -644,7 +657,53 @@ def remove_key(self, role: str, keyid: str) -> None:
del self.keys[keyid]


class MetaFile:
class BaseFile:
    """A base class of MetaFile and TargetFile.

    Encapsulates common static methods for length and hash verification.
    """

    @staticmethod
    def _verify_hashes(
        data: Union[bytes, BinaryIO], expected_hashes: Dict[str, str]
    ) -> None:
        """Verifies that the hash of 'data' matches 'expected_hashes'.

        Args:
            data: File object or its content in bytes.
            expected_hashes: Mapping of hash algorithm name to the
                expected hex digest.

        Raises:
            LengthOrHashMismatchError: An observed digest does not match
                its expected value.
        """
        is_bytes = isinstance(data, bytes)
        for algo, exp_hash in expected_hashes.items():
            if is_bytes:
                digest_object = sslib_hash.digest(algo)
                digest_object.update(data)
            else:
                # if data is not bytes, assume it is a file object
                digest_object = sslib_hash.digest_fileobject(data, algo)

            observed_hash = digest_object.hexdigest()
            if observed_hash != exp_hash:
                # Fix: the original adjacent f-strings lacked a separating
                # space, producing "...does not matchexpected hash ...".
                raise exceptions.LengthOrHashMismatchError(
                    f"Observed hash {observed_hash} does not match "
                    f"expected hash {exp_hash}"
                )

    @staticmethod
    def _verify_length(
        data: Union[bytes, BinaryIO], expected_length: int
    ) -> None:
        """Verifies that the length of 'data' matches 'expected_length'.

        Args:
            data: File object or its content in bytes.  NOTE: for a file
                object the position is left at end-of-file afterwards.
            expected_length: Expected size in bytes.

        Raises:
            LengthOrHashMismatchError: Observed length differs from
                'expected_length'.
        """
        if isinstance(data, bytes):
            observed_length = len(data)
        else:
            # if data is not bytes, assume it is a file object
            data.seek(0, io.SEEK_END)
            observed_length = data.tell()

        if observed_length != expected_length:
            # Fix: the original adjacent f-strings lacked a separating
            # space, producing "...does not matchexpected length ...".
            raise exceptions.LengthOrHashMismatchError(
                f"Observed length {observed_length} does not match "
                f"expected length {expected_length}"
            )


class MetaFile(BaseFile):
"""A container with information about a particular metadata file.
Attributes:
Expand Down Expand Up @@ -682,6 +741,13 @@ def from_dict(cls, meta_dict: Dict[str, Any]) -> "MetaFile":
version = meta_dict.pop("version")
length = meta_dict.pop("length", None)
hashes = meta_dict.pop("hashes", None)

# Do some basic input validation
if version <= 0:
raise ValueError(f"Metafile version must be > 0, got {version}")
if length is not None and length <= 0:
raise ValueError(f"Metafile length must be > 0, got {length}")

# All fields left in the meta_dict are unrecognized.
return cls(version, length, hashes, meta_dict)

Expand All @@ -700,6 +766,22 @@ def to_dict(self) -> Dict[str, Any]:

return res_dict

def verify_length_and_hashes(self, data: Union[bytes, BinaryIO]):
    """Verifies that the length and hashes of "data" match expected
    values.

    Both checks are optional for metadata files: each runs only when the
    corresponding attribute is actually set.

    Args:
        data: File object or its content in bytes.

    Raises:
        LengthOrHashMismatchError: Calculated length or hashes do not
            match expected values.
    """
    expected_length = self.length
    if expected_length is not None:
        self._verify_length(data, expected_length)

    # A None value or an empty dictionary both mean "no hashes to check".
    expected_hashes = self.hashes
    if expected_hashes:
        self._verify_hashes(data, expected_hashes)


class Timestamp(Signed):
"""A container for the signed part of timestamp metadata.
Expand Down Expand Up @@ -927,7 +1009,7 @@ def to_dict(self) -> Dict[str, Any]:
}


class TargetFile:
class TargetFile(BaseFile):
"""A container with information about a particular target file.
Attributes:
Expand All @@ -945,12 +1027,6 @@ class TargetFile:
"""

@property
def custom(self):
if self.unrecognized_fields is None:
return None
return self.unrecognized_fields.get("custom", None)

def __init__(
self,
length: int,
Expand All @@ -961,11 +1037,24 @@ def __init__(
self.hashes = hashes
self.unrecognized_fields = unrecognized_fields or {}

@property
def custom(self):
    """The opaque "custom" entry from the unrecognized fields, or None
    when it is absent.
    """
    fields = self.unrecognized_fields
    return None if fields is None else fields.get("custom", None)

@classmethod
def from_dict(cls, target_dict: Dict[str, Any]) -> "TargetFile":
    """Creates TargetFile object from its dict representation.

    'length' and 'hashes' are required; whatever remains in
    'target_dict' afterwards is carried along as unrecognized fields.
    """
    file_length = target_dict.pop("length")
    file_hashes = target_dict.pop("hashes")

    # Basic sanity checks on the two required fields.
    if file_length <= 0:
        raise ValueError(
            f"Targetfile length must be > 0, got {file_length}")
    if not file_hashes:
        raise ValueError("Missing targetfile hashes")

    # All fields left in the target_dict are unrecognized.
    return cls(file_length, file_hashes, target_dict)

Expand All @@ -977,6 +1066,18 @@ def to_dict(self) -> Dict[str, Any]:
**self.unrecognized_fields,
}

def verify_length_and_hashes(self, data: Union[bytes, BinaryIO]):
    """Verifies that the length and hashes of "data" match expected
    values.

    Unlike MetaFile, both attributes are required here (enforced by
    ``from_dict``), so both checks always run.

    Args:
        data: File object or its content in bytes.

    Raises:
        LengthOrHashMismatchError: Calculated length or hashes do not
            match expected values.
    """
    # Length first: a cheap check before any digest computation.
    self._verify_length(data, self.length)
    self._verify_hashes(data, self.hashes)


class Targets(Signed):
"""A container for the signed part of targets metadata.
Expand Down
5 changes: 2 additions & 3 deletions tuf/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ def __repr__(self):
class UnsupportedAlgorithmError(Error):
"""Indicate an error while trying to identify a user-specified algorithm."""

class LengthOrHashMismatchError(Error):
    """Indicate an error while checking the length or hash values of an object."""

class BadHashError(Error):
"""Indicate an error while checking the value of a hash object."""
Expand All @@ -88,9 +90,6 @@ def __repr__(self):
# self.__class__.__name__ + '(' + repr(self.expected_hash) + ', ' +
# repr(self.observed_hash) + ')')




class BadVersionNumberError(Error):
"""Indicate an error for metadata that contains an invalid version number."""

Expand Down

0 comments on commit 39ed706

Please sign in to comment.