From e20fb54f5ce53240ab5686b532434a956151ba3b Mon Sep 17 00:00:00 2001 From: Sebastien Awwad Date: Mon, 25 Feb 2019 12:42:26 -0500 Subject: [PATCH] <~> DO NOT MERGE: status freeze to switch tracks saving changes pending commit here, to switch tracks from ASN.1 support itself to finally resolve #660, which is causing problems here. Signed-off-by: Sebastien Awwad --- requirements.txt | 9 +- tests/aggregate_tests.py | 1 + tests/test_asn1_convert.py | 62 ++- tests/test_sig.py | 175 +++---- tuf/client/updater.py | 31 +- tuf/developer_tool.py | 9 +- tuf/encoding/asn1_convert.py | 22 +- tuf/encoding/asn1_metadata_definitions.py | 27 +- tuf/encoding/metadata_definitions.asn1 | 6 +- tuf/encoding/util.py | 607 ++++++++++++++++++++++ tuf/exceptions.py | 3 +- tuf/formats.py | 6 + tuf/repository_lib.py | 29 +- tuf/repository_tool.py | 7 +- tuf/sig.py | 598 +++++++++++++++------ 15 files changed, 1283 insertions(+), 309 deletions(-) create mode 100644 tuf/encoding/util.py diff --git a/requirements.txt b/requirements.txt index a6dc9a4a11..df02f2a092 100644 --- a/requirements.txt +++ b/requirements.txt @@ -129,9 +129,12 @@ pynacl==1.3.0 \ requests==2.21.0 \ --hash=sha256:502a824f31acdacb3a35b6690b5fbf0bc41d63a24a45c4004352b0242707598e \ --hash=sha256:7bf2a778576d825600030a110f3c0e3e8edc51dfaafe1c146e39a2027784957b -securesystemslib==0.11.3 \ - --hash=sha256:368ef6f6cc40d3636e271485c7adb21c53c22200bab44a2fe8af62886a01c3d5 \ - --hash=sha256:cbd1f7f1af2f2921be33b9fd17384705f5f4147d3a8b5d95b33ec3ce2213f176 +# Temporary. Switch to main-branch git dependency when the PR is merged, then +# switch back to a normal PyPI dependency when a release is made. +git+https://github.com/secure-systems-lab/securesystemslib.git@separate_data_serialization_from_signing +# securesystemslib==0.11.3 \ +# --hash=sha256:368ef6f6cc40d3636e271485c7adb21c53c22200bab44a2fe8af62886a01c3d5 \ +# --hash=sha256:cbd1f7f1af2f2921be33b9fd17384705f5f4147d3a8b5d95b33ec3ce2213f176 six==1.12.0 \ --hash=sha256:3350809f0555b11f552448330d0b52d5f24c91a322ea4a15ef22629740f3761c \ --hash=sha256:d16a0141ec1a18405cd4ce8b4613101da75da0e9a7aec5bdd4fa804d0e0eba73 diff --git a/tests/aggregate_tests.py b/tests/aggregate_tests.py index 2654bee6a8..ce301d9bef 100755 --- a/tests/aggregate_tests.py +++ b/tests/aggregate_tests.py @@ -59,6 +59,7 @@ # configured to use. Note also that this TUF implementation does not support # any Python versions <2.7 or any Python3 versions <3.4. VERSION_SPECIFIC_TESTS = { + 'test_asn1_convert': {'major': 42, 'minor': 42}, # skip me for now. DEBUG ONLY 'test_proxy_use': {'major': 2, 'minor': 7}} # Run test only if Python2.7 # Further example: # 'test_abc': {'major': 2} # Run test only if Python2 diff --git a/tests/test_asn1_convert.py b/tests/test_asn1_convert.py index ab795cbad1..b204e094aa 100644 --- a/tests/test_asn1_convert.py +++ b/tests/test_asn1_convert.py @@ -28,6 +28,7 @@ import unittest unittest.util._MAX_LENGTH=20000 # DEBUG import os +import os.path import logging import binascii # for bytes to hex # Dependency Imports @@ -377,7 +378,7 @@ def test_key_conversion(self): - def test_signed_portion_of_root_conversion(self): + def test_root_conversion(self): root = { 'signatures': [ { @@ -437,22 +438,25 @@ def test_signed_portion_of_root_conversion(self): # Test by calling the general to_asn1 and from_asn1 calls that will call # the helper functions. # 'signed' subsection first. 
- self.conversion_check( + data_asn1, data_der = self.conversion_check( data=root['signed'], datatype=asn1_defs.RootMetadata, expected_der=root_data_expected_der) + write_test_data_file('root_signed_portion_only.der', data_der) + # Then the whole thing. - self.conversion_check( + data_asn1, data_der = self.conversion_check( data=root, datatype=asn1_defs.RootEnvelope, expected_der=root_signable_expected_der) + write_test_data_file('root_envelope.der', data_der) - def test_signed_portion_of_timestamp_conversion(self): + def test_timestamp_conversion(self): timestamp = { 'signatures': [ { @@ -486,22 +490,27 @@ def test_signed_portion_of_timestamp_conversion(self): # Test by calling the general to_asn1 and from_asn1 calls that will call # the helper functions. # 'signed' subsection first. - self.conversion_check( + data_asn1, data_der = self.conversion_check( data=timestamp['signed'], datatype=asn1_defs.TimestampMetadata, expected_der=timestamp_data_expected_der) + write_test_data_file('timestamp_signed_portion_only.der', data_der) + + # Then the whole thing. - self.conversion_check( + data_asn1, data_der = self.conversion_check( data=timestamp, datatype=asn1_defs.TimestampEnvelope, expected_der=timestamp_signable_expected_der) + write_test_data_file('timestamp_envelope.der', data_der) - def test_signed_portion_of_snapshot_conversion(self): + + def test_snapshot_conversion(self): snapshot = { 'signatures': [ { @@ -536,17 +545,23 @@ def test_signed_portion_of_snapshot_conversion(self): # Test by calling the general to_asn1 and from_asn1 calls that will call # the helper functions. # 'signed' subsection first. - self.conversion_check( + data_asn1, data_der = self.conversion_check( data=snapshot['signed'], datatype=asn1_defs.SnapshotMetadata, expected_der=snapshot_data_expected_der) + write_test_data_file('snapshot_signed_contents.der', data_der) + + # Then the whole thing. - self.conversion_check( + data_asn1, data_der = self.conversion_check( data=snapshot, datatype=asn1_defs.SnapshotEnvelope, expected_der=snapshot_signable_expected_der) + write_test_data_file('snapshot_envelope.der', data_der) + + @@ -623,17 +638,20 @@ def test_targets_conversion(self): # Test by calling the general to_asn1 and from_asn1 calls that will call # the helper functions. # 'signed' subsection first. - self.conversion_check( + data_asn1, data_der = self.conversion_check( data=targets['signed'], datatype=asn1_defs.TargetsMetadata, expected_der=targets_data_expected_der) + write_test_data_file('targets_w_delegation_and_targets_signed_contents.der', data_der) + # Then the whole thing. - self.conversion_check( + data_asn1, data_der = self.conversion_check( data=targets, datatype=asn1_defs.TargetsEnvelope, expected_der=targets_signable_expected_der) + write_test_data_file('targets_w_delegation_and_targets_full_envelope.der', data_der) @@ -789,6 +807,28 @@ def assert_asn1_obj_equivalent(self, obj1, obj2): + + +def write_test_data_file(fname, data_der): + + if not os.path.exists('der_test_data'): + os.mkdir('der_test_data') + + # Lazy protection. + assert '/' not in fname + assert '\\' not in fname + assert '~' not in fname + assert '..' not in fname + fullpath = os.path.abspath(os.path.join('der_test_data', fname)) + assert fullpath.startswith(os.getcwd()) + + with open(fullpath, 'wb') as fobj: + fobj.write(data_der) + + + + + # Run unit test. 
if __name__ == '__main__': unittest.main() diff --git a/tests/test_sig.py b/tests/test_sig.py index 60a14c36da..1650a96ecb 100755 --- a/tests/test_sig.py +++ b/tests/test_sig.py @@ -31,6 +31,7 @@ import unittest import logging +import copy import tuf import tuf.log @@ -39,6 +40,7 @@ import tuf.roledb import tuf.sig import tuf.exceptions +import tuf.encoding.util import securesystemslib import securesystemslib.keys @@ -51,6 +53,31 @@ KEYS.append(securesystemslib.keys.generate_rsa_key(2048)) +tuf.DEBUG = False # TODO: <~> REMOVE THIS! This was for a particular test. + +# An example of a piece of signable metadata that has no signatures yet. +SIGNABLE_TIMESTAMP = { + "signatures": [ + # a valid signature, for reference: + # {"keyid": "8a1c4a3ac2d515dec982ba9910c5fd79b91ae57f625b9cff25d06bf0a61c1758", "sig": "7dddbfe94d6d80253433551700ea6dfe4171a33f1227a07830e951900b8325d67c3dce6410b9cf55abefa3dfca0b57814a4965c2d6ee60bb0336755cd0557e03"} + ], + "signed": { + "_type": "timestamp", + "expires": "2030-01-01T00:00:00Z", + "meta": { + "snapshot.json": { + "hashes": { + "sha256": "6990b6586ed545387c6a51db62173b903a5dff46b17b1bc3fe1e6ca0d0844f2f" + }, + "length": 554, + "version": 1 + } + }, + "spec_version": "1.0", + "version": 1 + } +} + class TestSig(unittest.TestCase): def setUp(self): @@ -62,9 +89,10 @@ def tearDown(self): def test_get_signature_status_no_role(self): - signable = {'signed': 'test', 'signatures': []} + signable = copy.deepcopy(SIGNABLE_TIMESTAMP) # A valid, but empty signature status. + signable['signatures'] = [] sig_status = tuf.sig.get_signature_status(signable) self.assertTrue(securesystemslib.formats.SIGNATURESTATUS_SCHEMA.matches(sig_status)) @@ -83,7 +111,7 @@ def test_get_signature_status_no_role(self): # when doing the following action. Here we know 'signable' # has only one signature so it's okay. signable['signatures'].append(securesystemslib.keys.create_signature( - KEYS[0], signable['signed'])) + KEYS[0], tuf.encoding.util.serialize(signable['signed']))) tuf.keydb.add_key(KEYS[0]) @@ -91,20 +119,22 @@ def test_get_signature_status_no_role(self): self.assertRaises(securesystemslib.exceptions.FormatError, tuf.sig.get_signature_status, signable, 1) - # Not allowed to call verify() without having specified a role. - args = (signable, None) - self.assertRaises(securesystemslib.exceptions.Error, tuf.sig.verify, *args) + # Not allowed to call verify_signable() without having specified a role. + with self.assertRaises(securesystemslib.exceptions.Error): + tuf.sig.verify_signable(signable, None) # Done. Let's remove the added key(s) from the key database. tuf.keydb.remove_key(KEYS[0]['keyid']) def test_get_signature_status_bad_sig(self): - signable = {'signed' : 'test', 'signatures' : []} + signable = copy.deepcopy(SIGNABLE_TIMESTAMP) signable['signatures'].append(securesystemslib.keys.create_signature( - KEYS[0], signable['signed'])) - signable['signed'] += 'signature no longer matches signed data' + KEYS[0], tuf.encoding.util.serialize(signable['signed']))) + + # Alter the metadata so that the signature over it is no longer correct. + signable['signed']['version'] += 1 tuf.keydb.add_key(KEYS[0]) threshold = 1 @@ -121,7 +151,7 @@ def test_get_signature_status_bad_sig(self): self.assertEqual([], sig_status['untrusted_sigs']) self.assertEqual([], sig_status['unknown_signing_schemes']) - self.assertFalse(tuf.sig.verify(signable, 'Root')) + self.assertFalse(tuf.sig.verify_signable(signable, 'Root')) # Done. Let's remove the added key(s) from the key database. 
tuf.keydb.remove_key(KEYS[0]['keyid']) @@ -130,10 +160,10 @@ def test_get_signature_status_bad_sig(self): def test_get_signature_status_unknown_signing_scheme(self): - signable = {'signed' : 'test', 'signatures' : []} + signable = copy.deepcopy(SIGNABLE_TIMESTAMP) signable['signatures'].append(securesystemslib.keys.create_signature( - KEYS[0], signable['signed'])) + KEYS[0], tuf.encoding.util.serialize(signable['signed']))) valid_scheme = KEYS[0]['scheme'] KEYS[0]['scheme'] = 'unknown_signing_scheme' @@ -153,7 +183,7 @@ def test_get_signature_status_unknown_signing_scheme(self): self.assertEqual([KEYS[0]['keyid']], sig_status['unknown_signing_schemes']) - self.assertFalse(tuf.sig.verify(signable, 'root')) + self.assertFalse(tuf.sig.verify_signable(signable, 'root')) # Done. Let's remove the added key(s) from the key database. KEYS[0]['scheme'] = valid_scheme @@ -163,10 +193,10 @@ def test_get_signature_status_unknown_signing_scheme(self): def test_get_signature_status_single_key(self): - signable = {'signed' : 'test', 'signatures' : []} + signable = copy.deepcopy(SIGNABLE_TIMESTAMP) signable['signatures'].append(securesystemslib.keys.create_signature( - KEYS[0], signable['signed'])) + KEYS[0], tuf.encoding.util.serialize(signable['signed']))) threshold = 1 roleinfo = tuf.formats.make_role_metadata( @@ -184,16 +214,23 @@ def test_get_signature_status_single_key(self): self.assertEqual([], sig_status['untrusted_sigs']) self.assertEqual([], sig_status['unknown_signing_schemes']) - self.assertTrue(tuf.sig.verify(signable, 'Root')) + self.assertTrue(tuf.sig.verify_signable(signable, 'Root')) - # Test for an unknown signature when 'role' is left unspecified. + # <~> (remove this comment and add to commit summary) + # The old behavior was wrong, I think. The key is known ----- + # If get_signature_status is not provided authorized keyids and threshold, + # and is also not provided a role to use to determine what keyids and + # threshold are authorized, then we expect any good signature to come back + # as untrustworthy, and any bad signature to come back as a bad signature. + # tuf.DEBUG = True # TODO: <~> Remove this. sig_status = tuf.sig.get_signature_status(signable) + # tuf.DEBUG = False self.assertEqual(0, sig_status['threshold']) self.assertEqual([], sig_status['good_sigs']) self.assertEqual([], sig_status['bad_sigs']) - self.assertEqual([KEYS[0]['keyid']], sig_status['unknown_sigs']) - self.assertEqual([], sig_status['untrusted_sigs']) + self.assertEqual([], sig_status['unknown_sigs']) + self.assertEqual([KEYS[0]['keyid']], sig_status['untrusted_sigs']) self.assertEqual([], sig_status['unknown_signing_schemes']) # Done. Let's remove the added key(s) from the key database. @@ -203,10 +240,10 @@ def test_get_signature_status_single_key(self): def test_get_signature_status_below_threshold(self): - signable = {'signed' : 'test', 'signatures' : []} + signable = copy.deepcopy(SIGNABLE_TIMESTAMP) signable['signatures'].append(securesystemslib.keys.create_signature( - KEYS[0], signable['signed'])) + KEYS[0], tuf.encoding.util.serialize(signable['signed']))) tuf.keydb.add_key(KEYS[0]) threshold = 2 @@ -224,7 +261,7 @@ def test_get_signature_status_below_threshold(self): self.assertEqual([], sig_status['untrusted_sigs']) self.assertEqual([], sig_status['unknown_signing_schemes']) - self.assertFalse(tuf.sig.verify(signable, 'Root')) + self.assertFalse(tuf.sig.verify_signable(signable, 'Root')) # Done. Let's remove the added key(s) from the key database. 
tuf.keydb.remove_key(KEYS[0]['keyid']) @@ -234,13 +271,13 @@ def test_get_signature_status_below_threshold(self): def test_get_signature_status_below_threshold_unrecognized_sigs(self): - signable = {'signed' : 'test', 'signatures' : []} + signable = copy.deepcopy(SIGNABLE_TIMESTAMP) # Two keys sign it, but only one of them will be trusted. signable['signatures'].append(securesystemslib.keys.create_signature( - KEYS[0], signable['signed'])) + KEYS[0], tuf.encoding.util.serialize(signable['signed']))) signable['signatures'].append(securesystemslib.keys.create_signature( - KEYS[2], signable['signed'])) + KEYS[2], tuf.encoding.util.serialize(signable['signed']))) tuf.keydb.add_key(KEYS[0]) tuf.keydb.add_key(KEYS[1]) @@ -259,7 +296,7 @@ def test_get_signature_status_below_threshold_unrecognized_sigs(self): self.assertEqual([], sig_status['untrusted_sigs']) self.assertEqual([], sig_status['unknown_signing_schemes']) - self.assertFalse(tuf.sig.verify(signable, 'Root')) + self.assertFalse(tuf.sig.verify_signable(signable, 'Root')) # Done. Let's remove the added key(s) from the key database. tuf.keydb.remove_key(KEYS[0]['keyid']) @@ -270,14 +307,14 @@ def test_get_signature_status_below_threshold_unrecognized_sigs(self): def test_get_signature_status_below_threshold_unauthorized_sigs(self): - signable = {'signed' : 'test', 'signatures' : []} + signable = copy.deepcopy(SIGNABLE_TIMESTAMP) # Two keys sign it, but one of them is only trusted for a different # role. signable['signatures'].append(securesystemslib.keys.create_signature( - KEYS[0], signable['signed'])) + KEYS[0], tuf.encoding.util.serialize(signable['signed']))) signable['signatures'].append(securesystemslib.keys.create_signature( - KEYS[1], signable['signed'])) + KEYS[1], tuf.encoding.util.serialize(signable['signed']))) tuf.keydb.add_key(KEYS[0]) tuf.keydb.add_key(KEYS[1]) @@ -298,7 +335,7 @@ def test_get_signature_status_below_threshold_unauthorized_sigs(self): self.assertEqual([KEYS[1]['keyid']], sig_status['untrusted_sigs']) self.assertEqual([], sig_status['unknown_signing_schemes']) - self.assertFalse(tuf.sig.verify(signable, 'Root')) + self.assertFalse(tuf.sig.verify_signable(signable, 'Root')) self.assertRaises(tuf.exceptions.UnknownRoleError, tuf.sig.get_signature_status, signable, 'unknown_role') @@ -314,18 +351,18 @@ def test_get_signature_status_below_threshold_unauthorized_sigs(self): def test_check_signatures_no_role(self): - signable = {'signed' : 'test', 'signatures' : []} + signable = copy.deepcopy(SIGNABLE_TIMESTAMP) signable['signatures'].append(securesystemslib.keys.create_signature( - KEYS[0], signable['signed'])) + KEYS[0], tuf.encoding.util.serialize(signable['signed']))) tuf.keydb.add_key(KEYS[0]) # No specific role we're considering. It's invalid to use the - # function tuf.sig.verify() without a role specified because - # tuf.sig.verify() is checking trust, as well. - args = (signable, None) - self.assertRaises(securesystemslib.exceptions.Error, tuf.sig.verify, *args) + # function tuf.sig.verify_signable() without a role specified because + # tuf.sig.verify_signable() is checking trust, as well. + with self.assertRaises(securesystemslib.exceptions.Error): + tuf.sig.verify_signable(signable, None) # Done. Let's remove the added key(s) from the key database. 
tuf.keydb.remove_key(KEYS[0]['keyid']) @@ -333,9 +370,9 @@ def test_check_signatures_no_role(self): def test_verify_single_key(self): - signable = {'signed' : 'test', 'signatures' : []} + signable = copy.deepcopy(SIGNABLE_TIMESTAMP) signable['signatures'].append(securesystemslib.keys.create_signature( - KEYS[0], signable['signed'])) + KEYS[0], tuf.encoding.util.serialize(signable['signed']))) tuf.keydb.add_key(KEYS[0]) threshold = 1 @@ -343,9 +380,9 @@ def test_verify_single_key(self): [KEYS[0]['keyid']], threshold) tuf.roledb.add_role('Root', roleinfo) - # This will call verify() and return True if 'signable' is valid, + # This will call verify_signable() and return True if 'signable' is valid, # False otherwise. - self.assertTrue(tuf.sig.verify(signable, 'Root')) + self.assertTrue(tuf.sig.verify_signable(signable, 'Root')) # Done. Let's remove the added key(s) from the key database. tuf.keydb.remove_key(KEYS[0]['keyid']) @@ -355,13 +392,13 @@ def test_verify_single_key(self): def test_verify_unrecognized_sig(self): - signable = {'signed' : 'test', 'signatures' : []} + signable = copy.deepcopy(SIGNABLE_TIMESTAMP) # Two keys sign it, but only one of them will be trusted. signable['signatures'].append(securesystemslib.keys.create_signature( - KEYS[0], signable['signed'])) + KEYS[0], tuf.encoding.util.serialize(signable['signed']))) signable['signatures'].append(securesystemslib.keys.create_signature( - KEYS[2], signable['signed'])) + KEYS[2], tuf.encoding.util.serialize(signable['signed']))) tuf.keydb.add_key(KEYS[0]) tuf.keydb.add_key(KEYS[1]) @@ -370,7 +407,7 @@ def test_verify_unrecognized_sig(self): [KEYS[0]['keyid'], KEYS[1]['keyid']], threshold) tuf.roledb.add_role('Root', roleinfo) - self.assertFalse(tuf.sig.verify(signable, 'Root')) + self.assertFalse(tuf.sig.verify_signable(signable, 'Root')) # Done. Let's remove the added key(s) from the key database. tuf.keydb.remove_key(KEYS[0]['keyid']) @@ -381,63 +418,19 @@ def test_verify_unrecognized_sig(self): - def test_generate_rsa_signature(self): - signable = {'signed' : 'test', 'signatures' : []} - - signable['signatures'].append(securesystemslib.keys.create_signature( - KEYS[0], signable['signed'])) - - self.assertEqual(1, len(signable['signatures'])) - signature = signable['signatures'][0] - self.assertEqual(KEYS[0]['keyid'], signature['keyid']) - - returned_signature = tuf.sig.generate_rsa_signature(signable['signed'], KEYS[0]) - self.assertTrue(securesystemslib.formats.SIGNATURE_SCHEMA.matches(returned_signature)) - - signable['signatures'].append(securesystemslib.keys.create_signature( - KEYS[1], signable['signed'])) - - self.assertEqual(2, len(signable['signatures'])) - signature = signable['signatures'][1] - self.assertEqual(KEYS[1]['keyid'], signature['keyid']) - - - - def test_may_need_new_keys(self): - # One untrusted key in 'signable'. - signable = {'signed' : 'test', 'signatures' : []} - - signable['signatures'].append(securesystemslib.keys.create_signature( - KEYS[0], signable['signed'])) - - tuf.keydb.add_key(KEYS[1]) - threshold = 1 - roleinfo = tuf.formats.make_role_metadata( - [KEYS[1]['keyid']], threshold) - tuf.roledb.add_role('Root', roleinfo) - - sig_status = tuf.sig.get_signature_status(signable, 'Root') - - self.assertTrue(tuf.sig.may_need_new_keys(sig_status)) - - - # Done. Let's remove the added key(s) from the key database. - tuf.keydb.remove_key(KEYS[1]['keyid']) - - # Remove the roles. 
- tuf.roledb.remove_role('Root') def test_signable_has_invalid_format(self): - # get_signature_status() and verify() validate 'signable' before continuing. + # get_signature_status() and verify_signable() verify 'signable' before + # continuing. # 'signable' must be of the form: {'signed': , 'signatures': [{}]}. # Object types are checked as well. - signable = {'not_signed' : 'test', 'signatures' : []} + signable = {'not_signed' : {'test'}, 'signatures' : []} args = (signable['not_signed'], KEYS[0]) self.assertRaises(securesystemslib.exceptions.FormatError, tuf.sig.get_signature_status, *args) # 'signatures' value must be a list. Let's try a dict. - signable = {'signed' : 'test', 'signatures' : {}} + signable = {'signed' : {'type': 'some_role'}, 'signatures' : {}} args = (signable['signed'], KEYS[0]) self.assertRaises(securesystemslib.exceptions.FormatError, tuf.sig.get_signature_status, *args) diff --git a/tuf/client/updater.py b/tuf/client/updater.py index a7ffaaa971..06652f2c8a 100755 --- a/tuf/client/updater.py +++ b/tuf/client/updater.py @@ -139,6 +139,7 @@ import tuf.roledb import tuf.sig import tuf.exceptions +import tuf.encoding.util import securesystemslib.hash import securesystemslib.keys @@ -833,17 +834,23 @@ def _load_metadata_from_file(self, metadata_set, metadata_role): # Load the file. The loaded object should conform to # 'tuf.formats.SIGNABLE_SCHEMA'. try: - metadata_signable = securesystemslib.util.load_json_file( + metadata_signable = tuf.encoding.util.deserialize_file( metadata_filepath) # Although the metadata file may exist locally, it may not # be a valid json file. On the next refresh cycle, it will be # updated as required. If Root if cannot be loaded from disk # successfully, an exception should be raised by the caller. - except securesystemslib.exceptions.Error: + except (securesystemslib.exceptions.Error, tuf.exceptions.Error): return - tuf.formats.check_signable_object_format(metadata_signable) + # JUST DEBUGGING. Get rid of the try/except again afterwards. + try: + tuf.formats.check_signable_object_format(metadata_signable) + except securesystemslib.exceptions.FormatError: + import pdb; pdb.set_trace() + print('debugging') + # Extract the 'signed' role object from 'metadata_signable'. metadata_object = metadata_signable['signed'] @@ -1134,8 +1141,8 @@ def _update_root_metadata(self, current_root_metadata): latest_root_metadata_file = self._get_metadata_file( 'root', 'root.json', DEFAULT_ROOT_UPPERLENGTH, None) - latest_root_metadata = securesystemslib.util.load_json_string( - latest_root_metadata_file.read().decode('utf-8')) + latest_root_metadata = tuf.encoding.util.deserialize( + latest_root_metadata_file.read()) next_version = current_root_metadata['version'] + 1 @@ -1402,10 +1409,10 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object, None. """ - metadata = metadata_file_object.read().decode('utf-8') + metadata = metadata_file_object.read() try: - metadata_signable = securesystemslib.util.load_json_string(metadata) + metadata_signable = tuf.encoding.util.deserialize(metadata) except Exception as exception: raise tuf.exceptions.InvalidMetadataJSONError(exception) @@ -1423,7 +1430,7 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object, # metadata. # Verify the signature on the downloaded metadata object. 
- valid = tuf.sig.verify(metadata_signable, metadata_role, + valid = tuf.sig.verify_signable(metadata_signable, metadata_role, self.repository_name) if not valid: @@ -1487,8 +1494,7 @@ def _get_metadata_file(self, metadata_role, remote_filename, # Verify 'file_object' according to the callable function. # 'file_object' is also verified if decompressed above (i.e., the # uncompressed version). - metadata_signable = \ - securesystemslib.util.load_json_string(file_object.read().decode('utf-8')) + metadata_signable = tuf.encoding.util.deserialize(file_object.read()) # Determine if the specification version number is supported. It is # assumed that "spec_version" is in (major.minor.fix) format, (for @@ -1570,7 +1576,8 @@ def _verify_root_chain_link(self, rolename, current_root_metadata, current_root_role = current_root_metadata['roles'][rolename] # Verify next metadata with current keys/threshold - valid = tuf.sig.verify(next_root_metadata, rolename, self.repository_name, + valid = tuf.sig.verify_signable( + next_root_metadata, rolename, self.repository_name, current_root_role['threshold'], current_root_role['keyids']) if not valid: @@ -1766,7 +1773,7 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None): # Note that the 'move' method comes from securesystemslib.util's TempFile class. # 'metadata_file_object' is an instance of securesystemslib.util.TempFile. metadata_signable = \ - securesystemslib.util.load_json_string(metadata_file_object.read().decode('utf-8')) + tuf.encoding.util.deserialize(metadata_file_object.read()) metadata_file_object.move(current_filepath) diff --git a/tuf/developer_tool.py b/tuf/developer_tool.py index 613906fc4c..a478325979 100755 --- a/tuf/developer_tool.py +++ b/tuf/developer_tool.py @@ -46,6 +46,7 @@ import tuf.log import tuf.repository_lib as repo_lib import tuf.repository_tool +import tuf.encoding.util import securesystemslib import securesystemslib.util @@ -470,7 +471,7 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial, # non-partial write() else: - if tuf.sig.verify(signable, rolename, repository_name): + if tuf.sig.verify_signable(signable, rolename, repository_name): metadata['version'] = metadata['version'] + 1 signable = repo_lib.sign_metadata(metadata, roleinfo['signing_keyids'], metadata_filename, repository_name) @@ -478,7 +479,7 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial, # Write the metadata to file if contains a threshold of signatures. signable['signatures'].extend(roleinfo['signatures']) - if tuf.sig.verify(signable, rolename, repository_name) or write_partial: + if tuf.sig.verify_signable(signable, rolename, repository_name) or write_partial: repo_lib._remove_invalid_and_duplicate_signatures(signable, repository_name) filename = repo_lib.write_metadata_file(signable, metadata_filename, metadata['version'], False) @@ -832,7 +833,7 @@ def load_project(project_directory, prefix='', new_targets_location=None, # Load the project's metadata. 
targets_metadata_path = os.path.join(project_directory, metadata_directory, project_filename) - signable = securesystemslib.util.load_json_file(targets_metadata_path) + signable = tuf.encoding.util.deserialize_file(targets_metadata_path) tuf.formats.check_signable_object_format(signable) targets_metadata = signable['signed'] @@ -898,7 +899,7 @@ def load_project(project_directory, prefix='', new_targets_location=None, continue signable = None - signable = securesystemslib.util.load_json_file(metadata_path) + signable = tuf.encoding.util.deserialize_file(metadata_path) # Strip the prefix from the local working copy, it will be added again # when the targets metadata is written to disk. diff --git a/tuf/encoding/asn1_convert.py b/tuf/encoding/asn1_convert.py index c3c6f09b42..23ff2684ed 100644 --- a/tuf/encoding/asn1_convert.py +++ b/tuf/encoding/asn1_convert.py @@ -35,8 +35,12 @@ import tuf.exceptions -# DEBUG ONLY; remove. -DEBUG_MODE = True +# DEBUG ONLY; remove. These messages are low level and frequent enough that +# they're not even appropriate for logger.debug(), so I have this here until +# I'm satisfied with the behavior of this module. +# TODO: Add a few logger messages at higher level (success and failure of +# conversions, probably at the end to_asn1 and from_asn1.) +DEBUG_MODE = False recursion_level = -1 def debug(msg): if DEBUG_MODE: @@ -664,6 +668,10 @@ def _structlike_dict_to_asn1(data, datatype): 'to convert subcomponent named "' + element_name_asn1 + '", of type ' + str(element_type)) + if element_type is None: + import pdb; pdb.set_trace() + print('debugging') + element_asn1 = to_asn1(data[element_name_py], element_type) asn1_obj[element_name_asn1] = element_asn1 @@ -742,6 +750,13 @@ def from_asn1(asn1_obj): and convert to ASN.1 and back, you will get a dict back that uses strings for those indices instead. There are probably a few quirky edge cases like this to keep in mind. + + Cases addressed here: asn1_obj is: + Void + Primitive + List-like to be converted to list + List-like to be converted to dict + Struct-like """ debug('from_asn1() called to convert from ' + str(type(asn1_obj)) + '. asn1crypto data: ' + str(asn1_obj)) @@ -817,6 +832,9 @@ def from_asn1(asn1_obj): elif is_structlike_datatype(type(asn1_obj)): + # In the case of converting from a Sequence/Set, we convert back to a dict, + # assuming that the structure of the dict should be the same as the + # structure of the ASN.1 object. debug('Converting to struct-like dict from ' + str(type(asn1_obj))) data = _structlike_dict_from_asn1(asn1_obj) debug( diff --git a/tuf/encoding/asn1_metadata_definitions.py b/tuf/encoding/asn1_metadata_definitions.py index 297033ccf2..16384c1f76 100644 --- a/tuf/encoding/asn1_metadata_definitions.py +++ b/tuf/encoding/asn1_metadata_definitions.py @@ -551,7 +551,24 @@ class TargetsEnvelope(ac.Sequence): - +class AnyMetadata(ac.Choice): + _alternatives = [ + ('root', RootMetadata), + ('timestamp', TimestampMetadata), + ('snapshot', SnapshotMetadata), + ('targets', TargetsMetadata)] + +class AnyEnvelope(ac.Choice): + _alternatives = [ + ('root', RootEnvelope), + ('timestamp', TimestampEnvelope), + ('snapshot', SnapshotEnvelope), + ('targets', TargetsEnvelope)] + +class SomeType(ac.Choice): + _alternatives = [ + ('int', ac.Integer), + ('oct', ac.OctetString)] # # Or we could define the following, instead of the above four Envelope classes. 
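A minimal sketch of how the SomeType Choice defined above behaves, assuming asn1crypto's usual Choice interface (name/value construction, dump(), load(), and the name/native properties); the concrete values here are illustrative only:

    import asn1crypto.core as ac

    # Same definition as in the hunk above: a CHOICE between an integer and an octet string.
    class SomeType(ac.Choice):
        _alternatives = [
            ('int', ac.Integer),
            ('oct', ac.OctetString)]

    # Construct by naming the chosen alternative; asn1crypto wraps the raw value.
    choice = SomeType(name='int', value=1)
    der = choice.dump()          # DER encoding of the chosen alternative
    parsed = SomeType.load(der)  # parse the DER bytes back into a SomeType object
    assert parsed.name == 'int'  # which alternative was chosen
    assert parsed.native == 1    # native Python value of the chosen object
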
@@ -567,3 +584,11 @@ class TargetsEnvelope(ac.Sequence): # _fields = [ # ('signatures', Signatures), # ('signed', AnyMetadata)] + + +KNOWN_TYPE_STRING_MAPPINGS = { + 'root': RootMetadata, + 'timestamp': TimestampMetadata, + 'snapshot': SnapshotMetadata, + 'targets': TargetsMetadata} + diff --git a/tuf/encoding/metadata_definitions.asn1 b/tuf/encoding/metadata_definitions.asn1 index e639230ae8..763b837ac6 100644 --- a/tuf/encoding/metadata_definitions.asn1 +++ b/tuf/encoding/metadata_definitions.asn1 @@ -228,12 +228,12 @@ TUFMetadataDefinitions DEFINITIONS AUTOMATIC TAGS ::= BEGIN expires VisibleString, -- date&time, UTC, as a UNIX timestamp version IntegerNatural, - meta SET OF RoleInfo -- unordered set + meta SET OF RoleVersion -- unordered set -- What these should probably be called instead: - -- role-infos SET OF RoleInfo + -- role-versions SET OF RoleVersion } - RoleInfo ::= SEQUENCE { + RoleVersion ::= SEQUENCE { filename VisibleString, version IntegerNatural -- Older versions of TUF used a hash here instead of a version number. diff --git a/tuf/encoding/util.py b/tuf/encoding/util.py new file mode 100644 index 0000000000..6be98ea056 --- /dev/null +++ b/tuf/encoding/util.py @@ -0,0 +1,607 @@ +#!/usr/bin/env python + +""" + + util.py + + + See LICENSE-MIT OR LICENSE for licensing information. + + + tuf.encoding.util performs serialization and deserialization of JSON and + ASN.1/DER, using existing functions in securesystemslib.util for JSON and + asn1crypto for ASN.1. + + Provides: + serialize() + deserialize_der() + deserialize() + deserialize_der_file() + deserialize_file() +""" + +# Support some Python 3 style and functionality in Python 2 (example: print()) +from __future__ import print_function +from __future__ import absolute_import +from __future__ import division +from __future__ import unicode_literals + +import logging + +import six +import asn1crypto.core + +import securesystemslib +import securesystemslib.formats +import securesystemslib.util +import tuf +import tuf.formats +import tuf.encoding.asn1_convert as asn1_convert +import tuf.encoding.asn1_metadata_definitions as asn1_defs + +# See 'log.py' to learn how logging is handled in TUF. +logger = logging.getLogger('tuf_util') + + +def serialize(obj): + """ + + Encode an asn1crypto object or JSON-compatible dictionary as bytes, in the + serialized form of that object: + If obj is an asn1crypto object, it is converted to ASN.1/DER bytes. + If obj is a dictionary, it is converted into UTF-8-encoded JSON bytes. + + Wrapper for 'securesystemslib.formats.encode_canonical()' and the + asn1crypto method 'dump()'. + + + obj + an asn1crypto object or JSON-compatible dictionary. + # TODO: Consider defining what makes a dict JSON-compatible somewhere and + # referencing that here.... + + + tuf.exceptions.Error if both attempts to deserialize from JSON and DER fail. + + + If given JSON data, returns a dictionary. + Otherwise, if given ASN.1/DER data, returns an asn1crypto object. + """ + + if isinstance(obj, asn1crypto.core.Asn1Value): + # If the given object is an asn1crypto object, then it has a dump() method + # that returns the serialized DER bytes that represent the ASN.1 object. + return obj.dump() + + elif isinstance(obj, dict): + # If the given object is instead a dictionary, assume it is a dictionary + # that can be converted to canonicalized JSON and encoded as UTF-8. 
+ return securesystemslib.formats.encode_canonical(obj).encode('utf-8') + + else: + raise tuf.exceptions.FormatError( + 'Received an object that appears to be neither an asn1crypto object ' + 'nor a dictionary.') + +# def serialize_json(dictionary): +# return securesystemslib.formats.encode_canonical(dictionary).encode('utf-8') + + + + + +def deserialize(data, convert=True): + """ + + bytes encoding JSON or ASN.1/DER -> dictionary or asn1crypto object + + + Wrapper for deserialize_json and deserialize_der. Also see docstrings there. + + Deserializes the given bytes of ASN.1/DER or JSON/UTF-8. + Produces an asn1crypto object (from ASN.1) or dictionary object (from JSON). + + Tries JSON UTF-8 first. If that fails, tries ASN.1 DER. + + + data + bytes. Data in either ASN.1/DER or JSON/UTF8. + + + tuf.exceptions.Error if both attempts to deserialize from JSON and DER fail. + + + If given JSON data, returns a dictionary. + Otherwise, if given ASN.1/DER data, returns an asn1crypto object. + + """ + exception_msgs = [] + + + # Try JSON first. + try: + deserialized = deserialize_json(data) + + except tuf.exceptions.InvalidMetadataJSONError as e: + exception_msgs.append(str(e)) + + else: + return deserialized + + + # Try ASN.1 second. + try: + deserialized = deserialize_der(data) + + except Exception as e: + # TODO: <~> Refine expected errors. Catch only expected DER errors...? + exception_msgs.append(str(e)) + # TODO: Create or choose a better error class for the below. + raise tuf.exceptions.Error( + 'Unable to deserialize given data as JSON in UTF-8 or as ASN.1 in DER. ' + 'Exceptions follow: ' + str(exception_msgs)) + + else: + return deserialized + + + + + +def deserialize_file(filepath, convert=True): + """ + + Deserialize a JSON or ASN.1/DER object from a file containing the object. + Tries JSON first, and if that fails, tries ASN.1. + + Wrapper for deserialize_json_file and deserialize_der_file. Also see + docstrings there. + + Produces an asn1crypto object (from ASN.1) or dictionary object (from JSON). + + + filepath + The path of a file containing data in either ASN.1/DER or JSON/UTF8. + + convert + boolean, optional. If True, converts ASN.1/DER data into JSON-compatible + dictionaries, the old TUF internal format, matching the specification. + Note that if this is done and there are signatures in the given data, + those signatures will still be signed over whatever format they were + signed over, and you should make sure to check them over the right format. + + # TODO: Consider marking signatures here, if the file given has a + # 'signatures' element at the top level, and elements under it, + # by adding an 'over_der' field to each signature, and adding a + # tuf.formats.SIGNATURE_SCHEMA that takes over for + # securesystemslib.formats.SIGNATURE_SCHEMA and includes an optional + # element 'over_der'. All uses of + # securesystemslib.formats.SIGNATURE_SCHEMA in TUF should then be + # switched. + + + tuf.exceptions.FormatError: + if 'filepath' is improperly formatted. + + securesystemslib.exceptions.FormatError: + if 'filepath' is improperly formatted according to securesystemslib but + not according to tuf (unexpected, may occur if code changes). + + tuf.exceptions.Error: + if 'filepath' cannot be deserialized to a Python object. + + IOError: + if file manipulation fails due to IO errors. + + tuf.exceptions.Error: + if both attempts to deserialize from JSON and DER fail + + + If given JSON data, returns a dictionary. + Otherwise, if given ASN.1/DER data, returns an asn1crypto object. 
+ """ + + # Making sure that the format of 'filepath' is a path string. + # tuf.FormatError is raised on incorrect format. + securesystemslib.formats.PATH_SCHEMA.check_match(filepath) + + exception_msgs = [] + + deserialized = None + + # Try JSON first. (Quite a bit less work while blind.) + try: + deserialized = deserialize_json_file(filepath) # securesystemslib.util.load_json_file(filepath) + + except securesystemslib.exceptions.Error as e: + exception_msgs.append(str(e)) + + + if deserialized is None: + # Try ASN.1/DER second. + try: + deserialized = deserialize_der_file(filepath) + + except (tuf.exceptions.Error, securesystemslib.exceptions.Error) as e: + exception_msgs.append(str(e)) + raise tuf.exceptions.Error( + 'Unable to deserialize given data as JSON in UTF-8 or as ASN.1 in ' + 'DER. Exceptions follow: ' + str(exception_msgs)) + + + logger.debug('Successfully read data from filepath ' + str(filepath)) + return deserialized + + + + + +def deserialize_json_file(filepath): + """ + + Read in a utf-8-encoded JSON file and return a dictionary object with the + parsed JSON data. + + Currently just uses securesystemslib.util.load_json_file. + + + filepath: + Path of DER file. + + + tuf.exceptions.Error: + if 'filepath' cannot be deserialized to a Python object. + + IOError: + if file manipulation fails due to IO errors. + + tuf.exceptions.Error: + if the contents of filepath cannot be deserialized to a Python object. + + + None. + + + An asn1crypto object deserialized from the DER data in the file whose path + was provided, + """ + return securesystemslib.util.load_json_file(filepath) + + + + + +def deserialize_der_file(filepath): + """ + + Read in an ASN.1/DER file and return an asn1crypto object containing the + translated contents of the DER file. + + + filepath: + Path of DER file. + + + tuf.exceptions.Error: + if 'filepath' cannot be deserialized to a Python object. + + IOError: + if file manipulation fails due to IO errors. + + tuf.exceptions.Error: + if the contents of filepath cannot be deserialized to a Python object. + + + None. + + + An asn1crypto object deserialized from the DER data in the file whose path + was provided, + + # NO: Trying something different + # # in TUF's standard format, conforming to + # # tuf.formats.SIGNABLE_SCHEMA, where the 'signed' entry matches + # # tuf.formats.ANYROLE_SCHEMA (though conversion of the Mirrors role is not + # # supported). + # # The signatures contained in the returned dictionary (the 'signatures' + # # entry), if any, will have been unchanged. If, for example, the signatures + # # were over a DER object, they will remain that way, even though the 'signed' + # # portion will no longer be in DER. + """ + + # Making sure that the format of 'filepath' is a path string. + # tuf.FormatError is raised on incorrect format. + securesystemslib.formats.PATH_SCHEMA.check_match(filepath) + + logger.debug('Reading file ' + str(filepath)) + with open(filepath, 'rb') as fobj: + data = fobj.read() + + # Decode the DER into an abstract asn1crypto ASN.1 representation of its data, + + + # NO: trying something new. + # # then convert that into a basic Python dictionary representation of the + # # data within. + + return deserialize_der(data) + + + + + +def deserialize_json(data): + """ + + Deserializes the given bytes of utf-8-encoded JSON into a dictionary. + + + data + bytes. JSON data encoded as utf-8. + + + tuf.exceptions.InvalidMetadataJSONError + if unable to decode data as utf-8, or unable to parse resulting string + as valid JSON. 
+ + + Deserialized object, as a dictionary. + """ + + # TODO: Format check on data. + + try: + deserialized = securesystemslib.util.load_json_string(data.decode('utf-8')) + + except ( + securesystemslib.exceptions.InvalidMetadataJSONError, # never raised? + securesystemslib.exceptions.Error, # takes the place of the former + UnicodeDecodeError) as e: # if not valid utf-8 + # raise tuf.exceptions.InvalidMetadataJSONError('Cannot parse as JSON+utf8.') from e # Python3-only + raise tuf.exceptions.InvalidMetadataJSONError(str(e)) + + # NOTE: Unit testing should try "\xfc\xa1\xa1\xa1\xa1\xa1", which is not + # valid utf-8, but is valid octet string. + + else: + return deserialized + + + + + +def deserialize_der(data, datatype=None): + """ + + Deserializes the given bytes of ASN.1/DER into an asn1crypto object. + + Can be called without the datatype of the object to be deserialized known, + but will attempt to guess several types in order to avoid returning the + result of a blind conversion (a conversion that does not know the expected + datatype). + + See docstring of tuf.encoding.asn1_convert.asn1_from_der() to have the + difference explained. + + If datatype is None, attempts to avoid blind conversion by trying to + interpret the given data as, first, role metadata, then, second, a signing + envelope around role metadata. + + If both fail, returns the results of the blind conversion. + + This function will validate its output: whatever deserialized data it + produces will only be returned if serializing that data produces the + original bytes (variable 'data'). + + + data + bytes. Data in ASN.1/DER format. + + datatype (optional) + A subclass of asn1crypto.core.Asn1Value, the type of data expected to + be returned. + + + tuf.exceptions.ASN1ConversionError + if a deserialized object is produced, but that object does not produce + the original bytes ('data') when serialized again. + + asn1crypto errors or tuf.exceptions.ASN1ConversionError + if DER deserialization fails otherwise. + # TODO: delineate the above errors? + + + Deserialized object, as an asn1crypto object. + """ + + # TODO: Format check on data. + + deserialized = None + + # If we were told the datatype, then convert expecting that type. + if datatype is not None: + + if not issubclass(datatype, asn1crypto.core.Asn1Value): + raise tuf.exceptions.FormatError( + 'Received a datatype that was not an asn1crypto class.') + + deserialized = asn1_convert.asn1_from_der(data, datatype) + + + else: # datatype is None, so we must be clever + + # ABANDONED STRATEGY 1: No: do not just blind load. + # # If we were NOT told the datatype, get creative. Attempt a blind conversion, + # # not knowing what datatype the encoded object is (the data definition). See + # # asn1_from_der docstring for the differences. + # deserialized = asn1_convert.asn1_from_der(data) + + # Given the result of the blind conversion, attempt to deduce the type from + # the converted data in a few ways.... + + # (It would be nice to look for a '_type' field in the data or a '_type' + # field under the object in a 'signed' field in the data, but we can't, + # because, after a blind conversion, we don't have field names.) + # # if '_type' in deserialized: + # # datatype = interpret_datatype(deserialized['_type'].native) + # # else if 'signed' in deserialized and '_type' in deserialized['signed']: + # # datatype = interpret_datatype(deserialized['signed']['_type'].native) + + + # ABANDONED STRATEGY 2: No. 
Do not try creating an additional level of + # Choice on top of the objects. This has to be part of + # the original DER we're now loading. Try AnyEnvelope + # and AnyMetadata. These are guaranteed to have a + # '_type' field somewhere that defines their metadata + # type. + # asn1_obj = None + # datatype_str = None + # is_envelope = None + # + # try: + # asn1_obj = asn1_convert.asn1_from_der(data, asn1_defs.AnyEnvelope) + # datatype_str = asn1_obj.native['signed']['_type'] + # is_envelope = True + # + # except: + # # TODO: Refine the expected exceptions from the above. + # # It looks like ValueError, btw, when unexpected structures are encountered. + # pass + # + # # Note that we check for the success of the parsing of datatype_str as well, + # # since asn1crypto will still provide an object in some circumstances if the + # # parsing failed to produce a coherent object. Trying to use asn1_obj.native + # # or asn1_obj.debug() is the easiest way to check. + # # I think this is related to a kind of lazy parsing in which some checks are + # # skipped for speed.... + # if asn1_obj is None or datatype_str is None or is_envelope is None: + # try: + # asn1_obj = asn1_convert.asn1_from_der(data, asn1_defs.AnyMetadata) + # datatype_str = asn1_obj.native['_type'] + # is_envelope = False + # + # except: + # # TODO: Refine the expected exceptions from the above. + # pass + # + # # If neither of those succeeded, give up and return the results of a blind + # # conversion. + # if asn1_obj is None or datatype_str is None or is_envelope is None: + # return asn1_convert.asn1_from_der(data) + # + # + # # If one of those succeeded, then asn1_obj is now either a Choice object that + # # is an AnyMetadata or an AnyEnvelope, and datatype_str contains all we need + # # to re-parse the object with the correct subclasses and field names. + # datatype = _interpret_datatype(datatype_str, is_envelope) + # return asn1_convert.asn1_from_der(data, datatype) + + + # Strategy 3: Just try every role type metadata definition individually.... + # This is presumably quite slow, so they're in order of likely access. + for datatype in [ + asn1_defs.RootEnvelope, + asn1_defs.TimestampEnvelope, + asn1_defs.SnapshotEnvelope, + asn1_defs.TargetsEnvelope, + asn1_defs.RootMetadata, + asn1_defs.TimestampMetadata, + asn1_defs.SnapshotMetadata, + asn1_defs.TargetsMetadata]: + + try: + deserialized = asn1_convert.asn1_from_der(data, datatype) + + except (tuf.exceptions.ASN1ConversionError, ValueError): + # Note that asn1crypto often raises ValueError if parsing fails. + continue + + + # If NONE of those succeeded, then give up and return the results of a blind + # conversion. + if deserialized is None: + logger.debug( + 'Failed to interpret ASN.1/DER as role metadata. Converting into ' + 'generic asn1crypto object (no field data or subclass data).') + deserialized = asn1_convert.asn1_from_der(data) + + + # Regardless of how we produced the deserialized object, we must now do + # consistency checking, as asn1crypto is a little bit too happy to produce + # something when the data doesn't actually make sense. + # Our primary expectation is that if we try to serialize the data again, we + # get the same thing we loaded. + der_sanity_check(deserialized, data) + + # If it worked, return the object.... 
+ logger.debug('Successfully interpreted ASN.1/DER as ' + str(datatype)) + return deserialized + + + + + +def der_sanity_check(asn1_obj, expected_der_bytes): + """ + Raises tuf.exceptions.ASN1ConversionError if the given asn1_obj does not + serialize to produce the expected DER bytes. + Intended as helper function for deserialize_der(). + """ + # First, force some lazy loading to complete. This also sometimes raises + # errors if the object is malformed. + try: + asn1_obj.contents + except Exception: + raise tuf.exceptions.ASN1ConversionError( + 'Attempted deserialization of ASN.1/DER data resulted in an asn1crypto ' + 'object which was not as expected (would not serialize back to the ' + 'same data.') + + if asn1_obj.dump() != expected_der_bytes: + raise tuf.exceptions.ASN1ConversionError( + 'Attempted deserialization of ASN.1/DER data resulted in an asn1crypto ' + 'object which was not as expected (would not serialize back to the ' + 'same data.') + + + + + +# This was used for Abandoned Strategy 2 in deserialize_der. +# def _interpret_datatype(datatype_str, is_envelope): +# """ +# Converts role type string to a type of asn1crypto object for that role type. +# +# e.g. 'root' to type tuf.encoding.asn1_metadata_definitions.RootMetadata +# """ +# datatype_str = datatype_str.lower() +# +# if datatype_str == 'root': +# if is_envelope: +# return tuf.encoding.asn1_metadata_definitions.RootEnvelope +# else: +# return tuf.encoding.asn1_metadata_definitions.RootMetadata +# +# elif datatype_str == 'timestamp': +# if is_envelope: +# return tuf.encoding.asn1_metadata_definitions.TimestampEnvelope +# else: +# return tuf.encoding.asn1_metadata_definitions.TimestampMetadata +# +# elif datatype_str == 'snapshot': +# if is_envelope: +# return tuf.encoding.asn1_metadata_definitions.SnapshotEnvelope +# else: +# return tuf.encoding.asn1_metadata_definitions.SnapshotMetadata +# +# elif datatype_str == 'targets': +# if is_envelope: +# return tuf.encoding.asn1_metadata_definitions.TargetsEnvelope +# else: +# return tuf.encoding.asn1_metadata_definitions.TargetsMetadata +# +# else: +# # TODO: Consider a different exception class. UnknownRoleError is used +# # pretty differently in other parts of the code. +# raise tuf.exceptions.UnknownRoleError( +# 'Given type string, "' + datatype_str + '" matches no known datatype.') diff --git a/tuf/exceptions.py b/tuf/exceptions.py index 7bb8b04d2b..41dd686103 100755 --- a/tuf/exceptions.py +++ b/tuf/exceptions.py @@ -138,7 +138,7 @@ def __str__(self): class UnknownMethodError(CryptoError): - """Indicate that a user-specified cryptograpthic method is unknown.""" + """Indicate that a user-specified cryptographic method is unknown.""" class UnsupportedLibraryError(Error): @@ -264,4 +264,3 @@ class ASN1ConversionError(Error): defined in the specification) to the ASN.1 format this implementation allows, or vice versa. """ - pass diff --git a/tuf/formats.py b/tuf/formats.py index e6eadbe45e..aed19574fe 100755 --- a/tuf/formats.py +++ b/tuf/formats.py @@ -110,6 +110,12 @@ # A string representing a role's name. ROLENAME_SCHEMA = SCHEMA.AnyString() +TOPLEVEL_ROLENAME_SCHEMA = SCHEMA.OneOf([ + SCHEMA.String('root'), SCHEMA.String('timestamp'), + SCHEMA.String('snapshot'), SCHEMA.String('targets'), + SCHEMA.String('Root'), SCHEMA.String('Timestamp'), + SCHEMA.String('Snapshot'), SCHEMA.String('Targets')]) + # Role object in {'keyids': [keydids..], 'name': 'ABC', 'threshold': 1, # 'paths':[filepaths..]} format. 
ROLE_SCHEMA = SCHEMA.Object( diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index a2b56567fe..f27ee4f6a9 100755 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -47,6 +47,7 @@ import tuf.sig import tuf.log import tuf.settings +import tuf.encoding.util import securesystemslib import securesystemslib.interface @@ -187,7 +188,7 @@ def _generate_and_write_metadata(rolename, metadata_filename, def should_write(): # Root must be signed by its previous keys and threshold. if rolename == 'root' and len(previous_keyids) > 0: - if not tuf.sig.verify(signable, rolename, repository_name, + if not tuf.sig.verify_signable(signable, rolename, repository_name, previous_threshold, previous_keyids): return False @@ -195,7 +196,7 @@ def should_write(): logger.debug('Root is signed by a threshold of its previous keyids.') # In the normal case, we should write metadata if the threshold is met. - return tuf.sig.verify(signable, rolename, repository_name, + return tuf.sig.verify_signable(signable, rolename, repository_name, roleinfo['threshold'], roleinfo['signing_keyids']) @@ -248,14 +249,15 @@ def should_write(): def _metadata_is_partially_loaded(rolename, signable, repository_name): """ Non-public function that determines whether 'rolename' is loaded with - at least zero good signatures, but an insufficient threshold (which means - 'rolename' was written to disk with repository.write_partial()). A repository - maintainer may write partial metadata without including a valid signature. - Howerver, the final repository.write() must include a threshold number of + insufficient good signatures to meet its signing threshold. 'rolename' may + have been written to disk with repository.write_partial(), which allows a + role to be written even if it does not have enough signatures to be trusted. + (A repository maintainer may write partial metadata without including a valid + signature, or enough valid signatures.) + However, the final repository.write() must include a threshold number of signatures. - If 'rolename' is found to be partially loaded, mark it as partially loaded in - its 'tuf.roledb' roleinfo. This function exists to assist in deciding whether + This function exists to assist in deciding whether a role's version number should be incremented when write() or write_parital() is called. Return True if 'rolename' was partially loaded, False otherwise. """ @@ -264,8 +266,7 @@ def _metadata_is_partially_loaded(rolename, signable, repository_name): # bad, untrusted, unknown, etc. status = tuf.sig.get_signature_status(signable, rolename, repository_name) - if len(status['good_sigs']) < status['threshold'] and \ - len(status['good_sigs']) >= 0: + if len(status['good_sigs']) < status['threshold']: return True else: @@ -542,7 +543,7 @@ def _load_top_level_metadata(repository, top_level_filenames, repository_name): if os.path.exists(root_filename): # Initialize the key and role metadata of the top-level roles. - signable = securesystemslib.util.load_json_file(root_filename) + signable = tuf.encoding.util.deserialize_file(root_filename) tuf.formats.check_signable_object_format(signable) root_metadata = signable['signed'] tuf.keydb.create_keydb_from_root_metadata(root_metadata, repository_name) @@ -585,7 +586,7 @@ def _load_top_level_metadata(repository, top_level_filenames, repository_name): # Load 'timestamp.json'. A Timestamp role file without a version number is # always written. 
if os.path.exists(timestamp_filename): - signable = securesystemslib.util.load_json_file(timestamp_filename) + signable = tuf.encoding.util.deserialize_file(timestamp_filename) timestamp_metadata = signable['signed'] for signature in signable['signatures']: repository.timestamp.add_signature(signature, mark_role_as_dirty=False) @@ -622,7 +623,7 @@ def _load_top_level_metadata(repository, top_level_filenames, repository_name): str(snapshot_version) + '.' + basename + METADATA_EXTENSION) if os.path.exists(snapshot_filename): - signable = securesystemslib.util.load_json_file(snapshot_filename) + signable = tuf.encoding.util.deserialize_file(snapshot_filename) tuf.formats.check_signable_object_format(signable) snapshot_metadata = signable['signed'] @@ -657,7 +658,7 @@ def _load_top_level_metadata(repository, top_level_filenames, repository_name): targets_filename = os.path.join(dirname, str(targets_version) + '.' + basename) if os.path.exists(targets_filename): - signable = securesystemslib.util.load_json_file(targets_filename) + signable = tuf.encoding.util.deserialize_file(targets_filename) tuf.formats.check_signable_object_format(signable) targets_metadata = signable['signed'] diff --git a/tuf/repository_tool.py b/tuf/repository_tool.py index 9f5a4158a5..58466826eb 100755 --- a/tuf/repository_tool.py +++ b/tuf/repository_tool.py @@ -47,6 +47,7 @@ import tuf.log import tuf.exceptions import tuf.repository_lib as repo_lib +import tuf.encoding.util import securesystemslib.keys import securesystemslib.formats @@ -2976,7 +2977,7 @@ def load_repository(repository_directory, repository_name='default'): signable = None try: - signable = securesystemslib.util.load_json_file(metadata_path) + signable = tuf.encoding.util.deserialize_file(metadata_path) except (securesystemslib.exceptions.Error, ValueError, IOError): logger.debug('Tried to load metadata with invalid JSON' @@ -3081,7 +3082,7 @@ def dump_signable_metadata(metadata_filepath): # Are the argument properly formatted? securesystemslib.formats.PATH_SCHEMA.check_match(metadata_filepath) - signable = securesystemslib.util.load_json_file(metadata_filepath) + signable = tuf.encoding.util.deserialize_file(metadata_filepath) # Is 'signable' a valid metadata file? tuf.formats.SIGNABLE_SCHEMA.check_match(signable) @@ -3137,7 +3138,7 @@ def append_signature(signature, metadata_filepath): securesystemslib.formats.SIGNATURE_SCHEMA.check_match(signature) securesystemslib.formats.PATH_SCHEMA.check_match(metadata_filepath) - signable = securesystemslib.util.load_json_file(metadata_filepath) + signable = tuf.encoding.util.deserialize_file(metadata_filepath) # Is 'signable' a valid metadata file? tuf.formats.SIGNABLE_SCHEMA.check_match(signable) diff --git a/tuf/sig.py b/tuf/sig.py index 3caf68b97e..4151c465de 100755 --- a/tuf/sig.py +++ b/tuf/sig.py @@ -17,27 +17,70 @@ See LICENSE-MIT OR LICENSE for licensing information. - Survivable key compromise is one feature of a secure update system - incorporated into TUF's design. Responsibility separation through - the use of multiple roles, multi-signature trust, and explicit and - implicit key revocation are some of the mechanisms employed towards - this goal of survivability. These mechanisms can all be seen in - play by the functions available in this module. - - The signed metadata files utilized by TUF to download target files - securely are used and represented here as the 'signable' object. 
-  More precisely, the signature structures contained within these metadata
-  files are packaged into 'signable' dictionaries. This module makes it
-  possible to capture the states of these signatures by organizing the
-  keys into different categories. As keys are added and removed, the
-  system must securely and efficiently verify the status of these signatures.
-  For instance, a bunch of keys have recently expired. How many valid keys
-  are now available to the Snapshot role? This question can be answered by
-  get_signature_status(), which will return a full 'status report' of these
-  'signable' dicts. This module also provides a convenient verify() function
-  that will determine if a role still has a sufficient number of valid keys.
-  If a caller needs to update the signatures of a 'signable' object, there
-  is also a function for that.
+  sig provides a higher-level signature handling interface for tuf.updater,
+  tuf.repository_lib, and tuf.developer_tool. Lower-level functionality used
+  here comes primarily from securesystemslib, tuf.roledb, and tuf.keydb.
+
+  sig also helps isolate signature-over-encoding issues from the rest of TUF.
+  Signatures should be made and verified over the serialized form of metadata,
+  which may or may not be JSON. If signatures over ASN.1/DER metadata need to
+  be handled, that is abstracted away here.
+
+
+  NOTE that EVERY function in this module abstracts away serialization format,
+  attempting to handle metadata in the form of BOTH ASN.1 (asn1crypto objects
+  of classes defined in tuf.encoding.asn1_metadata_definitions) AND
+  JSON-compatible dictionaries (matching tuf.formats.ANYROLE_SCHEMA).
+
+  These are provided from lowest to highest level:
+
+
+  HELPER FUNCTIONS:
+
+    _is_top_level_role()
+      True if the given rolename is a top-level role's name (root, targets,
+      etc.)
+
+    check_is_serializable_role_metadata()
+      makes sure that the given data is serializable TUF role metadata in
+      either a JSON-compatible dictionary or an asn1crypto ASN.1 object.
+
+
+  SINGLE SIGNATURE MANIPULATION:
+
+    create_signature_over_metadata()
+      given key and data, wraps securesystemslib.keys.create_signature(),
+      creating a signature over given TUF role metadata, which it first
+      canonicalizes and serializes, handling either ASN.1 or JSON-compatible
+      formats.
+
+    verify_signature_over_metadata()
+      given key, signature, and data, wraps
+      securesystemslib.keys.verify_signature(), verifying a signature over
+      given TUF role metadata by a given key. It first canonicalizes and
+      serializes the role metadata, handling either ASN.1 or JSON-compatible
+      formats.
+
+
+  FULL METADATA VERIFICATION:
+
+    get_signature_status()
+      Analyzes the signatures included in given role metadata, taking
+      arguments that convey the expected keyids and threshold for those
+      signatures (either directly or in the form of a rolename to look up in
+      roledb), and produces a report of the validity of the signatures
+      provided in the metadata, indicating whether or not they correctly sign
+      the given metadata and whether each signature is from an authorized key.
+
+    verify_signable()
+      Verifies a full piece of role metadata, returning True if the given role
+      metadata is verified (signed by at least enough correct signatures from
+      authorized keys to meet the threshold expected for this metadata) and
+      False otherwise. It uses get_signature_status() to glean the status of
+      each signature.
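+
+  Example (illustrative only; 'signing_key' and 'role_dict' below are
+  hypothetical values, not names defined in this module):
+
+    import tuf.sig
+
+    # 'signing_key' is a key dict that includes private key material;
+    # 'role_dict' is role metadata matching tuf.formats.ANYROLE_SCHEMA.
+    signature = tuf.sig.create_signature_over_metadata(signing_key, role_dict)
+    assert tuf.sig.verify_signature_over_metadata(signing_key, signature, role_dict)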
+ """ # Help with Python 3 compatibility, where the print statement is a function, an @@ -56,6 +99,7 @@ import tuf.formats import securesystemslib +import securesystemslib.keys # See 'log.py' to learn how logging is handled in TUF. logger = logging.getLogger('tuf.sig') @@ -66,9 +110,228 @@ iso8601_logger.disabled = True +def _is_top_level_role(rolename): + tuf.formats.ROLENAME_SCHEMA.check_match(rolename) + return rolename.lower() in ['root', 'timestamp', 'snapshot', 'targets'] + + +def check_is_serializable_role_metadata(data): + """ + # TODO: write good docstring + + raises an appropriate error if the provided data is neither permitted format + for TUF metadata: + - JSON-compatible role dictionary conforming to tuf.formats.ANYROLE_SCHEMA + - asn1crypto object, instance of one of the four role types defined in + tuf.encoding.asn1_definitions (e.g TargetsMetadata). + """ + + if isinstance(data, dict): + # Assume JSON-compatible metadata conforming to TUF specification. + tuf.formats.ANYROLE_SCHEMA.check_match(data) + + elif isinstance(data, asn1core.Sequence): + # Assume ASN.1 metadata conforming to tuf.encoding.asn1_metadata_definitions + if not (isinstance(data, asn1defs.TargetsMetadata) + or isinstance(data, asn1defs.RootMetadata) + or isinstance(data, asn1defs.TimestampMetadata) + or isinstance(data, asn1defs.SnapshotMetadata)): + raise tuf.exceptions.FormatError('Unrecognized ASN1 metadata object.') + + + else: + raise tuf.exceptions.FormatError( + 'Unrecognized metadata object. Expecting dictionary or asn1crypto ' + 'object. Received object of type: ' + str(type(data)) + ', with ' + 'value: ' + repr(data)) + + + + + +def create_signature_over_metadata( + key, data): + """ + + Given a public key and data (JSON-compatible dictionary or asn1crypto ASN1 + object), create a signature using that key over a canonical, serialized + form of the given data. + + Higher level function that wraps securesystemslib.keys.create_signature, + and works specifically with metadata in the JSON-compatible metadata format + from the TUF specification or an ASN.1 format defined by + tuf.encoding.asn1_definitions. + + + key: + A dictionary representing a public key and its properties, conforming to + securesystemslib.formats.PUBLIC_KEY_SCHEMA. + + For example, if 'key' is an RSA key, it has the form: + {'keytype': 'rsa', + 'keyid': 'f30a0870d026980100c0573bd557394f8c1bbd6...', + 'keyid_hash_algorithms': ['sha256', 'sha512'], + 'keyval': {'public': '-----BEGIN RSA PUBLIC KEY----- ...'}}# PEM format + + data: + Data object over which a signature will be produced. + + Acceptable formats are: + + - ASN.1 metadata: + an asn1crypto object, specifically an instance of one of these + classes defined in tuf.encoding.asn1_metadata_definitions: + RootMetadata, TimestampMetadata, SnapshotMetadata, TargetsMetadata. + ASN.1 metadata will be serialized into to bytes as ASN.1/DER + (Distinguished Encoding Rules) for signature checks. + + - JSON-compatible standard TUF-internal metadata: + a dictionary conforming to one of these schemas from tuf.formats: + ROOT_SCHEMA, TARGETS_SCHEMA, TIMESTAMP_SCHEMA, SNAPSHOT_SCHEMA. + This is the usual metadata format defined in the TUF specification. + JSON-compatible metadata will be serialized to bytes encoding + canonical JSON for signature checks. + + (Note: While this function is intended to create signatures over + these metadata types, it can technically be used more broadly with + any dictionary that can be canonicalized to JSON or any serializable + asn1crypto object. 
+      Please be careful with such use, support for which may change.)
+
+
+    tuf.FormatError, raised if 'key' is improperly formatted, or if data does
+    not seem to match one of the expected formats.
+
+    tuf.UnsupportedLibraryError, if an unsupported or unavailable library is
+    detected.
+
+    # TODO: Determine the likely types of errors asn1crypto will raise. It
+    # doesn't look like they have the error classes I'd expect.
+
+
+    signature:
+      The signature dictionary produced by one of the key generation
+      functions, conforming to securesystemslib.formats.SIGNATURE_SCHEMA.
+
+      For example:
+        {'keyid': 'f30a0870d026980100c0573bd557394f8c1bbd6...',
+         'sig': 'abcdef0123456...'}.
+  """
+
+  securesystemslib.formats.ANYKEY_SCHEMA.check_match(key)
+
+  # Validate format of data and serialize data. Note that
+  # tuf.encoding.util.serialize() only checks to make sure the data is a
+  # JSON-compatible dict or any asn1crypto value that can be serialized, while
+  # check_is_serializable_role_metadata() checks to make sure the metadata is
+  # specifically TUF role metadata (of either type) that can be serialized.
+  check_is_serializable_role_metadata(data)
+  serialized_data = tuf.encoding.util.serialize(data)
+
+  # All's well and the data is serialized. Generate the signature over it.
+  return securesystemslib.keys.create_signature(key, serialized_data)
+
+
+
+
+
+def verify_signature_over_metadata(
+    key, signature, data):
+  """
+
+    Determine whether the given signature is a valid signature by key over
+    the given data. securesystemslib.keys.verify_signature() will use the
+    public key found in 'key', the 'sig' objects contained in 'signature',
+    along with 'data', to complete the verification.
+
+    Higher level function that wraps securesystemslib.keys.verify_signature,
+    and works specifically with metadata in the JSON-compatible metadata
+    format from the TUF specification or an ASN.1 format defined by
+    tuf.encoding.asn1_metadata_definitions.
+
+
+    key:
+      A dictionary representing a public key and its properties, conforming to
+      securesystemslib.formats.PUBLIC_KEY_SCHEMA.
+
+      For example, if 'key' is an RSA key, it has the form:
+        {'keytype': 'rsa',
+         'keyid': 'f30a0870d026980100c0573bd557394f8c1bbd6...',
+         'keyid_hash_algorithms': ['sha256', 'sha512'],
+         'keyval': {'public': '-----BEGIN RSA PUBLIC KEY----- ...'}} # PEM format
+
+    signature:
+      The signature dictionary produced by one of the key generation
+      functions, conforming to securesystemslib.formats.SIGNATURE_SCHEMA.
+
+      For example:
+        {'keyid': 'f30a0870d026980100c0573bd557394f8c1bbd6...',
+         'sig': 'abcdef0123456...'}.
+
+    data:
+      Data object over which the validity of the provided signature will be
+      checked.
+
+      Acceptable formats are:
+
+      - ASN.1 metadata:
+        an asn1crypto object, specifically an instance of one of these
+        classes defined in tuf.encoding.asn1_metadata_definitions:
+        RootMetadata, TimestampMetadata, SnapshotMetadata, TargetsMetadata.
+        ASN.1 metadata will be serialized into bytes as ASN.1/DER
+        (Distinguished Encoding Rules) for signature checks.
+
+      - JSON-compatible standard TUF-internal metadata:
+        a dictionary conforming to one of these schemas from tuf.formats:
+        ROOT_SCHEMA, TARGETS_SCHEMA, TIMESTAMP_SCHEMA, SNAPSHOT_SCHEMA.
+        This is the usual metadata format defined in the TUF specification.
+        JSON-compatible metadata will be serialized to bytes encoding
+        canonical JSON for signature checks.
+ + (Note: While this function is intended to verify signatures over + these metadata types, it can technically be used more broadly with + any dictionary that can be canonicalized to JSON or any serializable + asn1crypto object. Please be careful with such use, support for + which may change.) + + + tuf.FormatError, raised if either 'key' or 'signature' are improperly + formatted, or if data does not seem to match one of the expected formats. + + tuf.UnsupportedLibraryError, if an unsupported or unavailable library is + detected. + + # TODO: Determine the likely types of errors asn1crypto will raise. It + # doesn't look like they have the error classes I'd expect. + + + Boolean. True if the signature is valid, False otherwise. + """ + + securesystemslib.formats.ANYKEY_SCHEMA.check_match(key) + securesystemslib.formats.SIGNATURE_SCHEMA.check_match(signature) + + # Validate format of data and serialize data. Note that + # tuf.encoding.util.serialize() only checks to make sure the data is a + # JSON-compatible dict or any asn1crypto value that can be serialized, while + # check_is_serializable_role_metadata() checks to make sure the metadata is + # specifically TUF role metadata (of either type) that can be serialized. + check_is_serializable_role_metadata(data) + serialized_data = tuf.encoding.util.serialize(data) + + # All's well and the data is serialized. Check the signature over it. + return securesystemslib.keys.verify_signature(key, signature, serialized_data) + + + + + def get_signature_status(signable, role=None, repository_name='default', threshold=None, keyids=None): """ + # TODO: should probably be called get_status_of_signatures, plural? + Return a dictionary representing the status of the signatures listed in 'signable'. Given an object conformant to SIGNABLE_SCHEMA, a set of public @@ -77,23 +340,38 @@ def get_signature_status(signable, role=None, repository_name='default', the signatures in 'signable' and enumerate all the keys that are valid, invalid, unrecognized, or unauthorized. + Top-level roles (root, snapshot, timestamp, targets) have unambiguous + signature expectations: the expected keyids and threshold come only from + trusted root metadata. Therefore, if optional args threshold and keyids + are not provided, the expected values can be taken from trusted root + metadata in tuf.roledb. Delegated targets roles, on the other hand, may be + the objects of multiple different delegations from different roles that can + each have different keyid and threshold expectations, so it is not possible + to deduce these without knowing the delegating role of interest. Please + always provide threshold and keyids if providing a role that isn't a + top-level role. + + # TODO: After Issue #660 is fixed, update the above. + # Replace "Please always provide..." with: + # "If 'role' is not a top-level role but a delegated targets role, 'keyids' + # and 'threshold' MUST be provided." + - signable: - A dictionary containing a list of signatures and a 'signed' identifier. - signable = {'signed': 'signer', - 'signatures': [{'keyid': keyid, - 'sig': sig}]} - Conformant to tuf.formats.SIGNABLE_SCHEMA. + signable: + A metadata dictionary conformant to tuf.formats.SIGNABLE_SCHEMA. + For example: + {'signed': {...}, + 'signatures': [{'keyid': '1234ef...', 'sig': 'abcd1234...'}]} role: - TUF role (e.g., 'root', 'targets', 'snapshot'). + TUF role (e.g., 'root', 'targets', 'some_delegated_project'). 
threshold: Rather than reference the role's threshold as set in tuf.roledb.py, use the given 'threshold' to calculate the signature status of 'signable'. 'threshold' is an integer value that sets the role's threshold value, or - the miminum number of signatures needed for metadata to be considered + the minimum number of signatures needed for metadata to be considered fully signed. keyids: @@ -102,17 +380,28 @@ def get_signature_status(signable, role=None, repository_name='default', in tuf.roledb.py for 'role'. + securesystemslib.exceptions.FormatError, if 'signable' does not have the correct format. tuf.exceptions.UnknownRoleError, if 'role' is not recognized. + tuf.exceptions.Error, if the optional arguments keyids and threshold are + partially provided -- i.e. one is provided and one is not. (They must + both be provided or both not be provided.) + + # TODO: After Issue #660 is fixed, add the following: + # tuf.exceptions.Error, if role is not a top-level role and keyids and + # threshold are not provided. + None. A dictionary representing the status of the signatures in 'signable'. Conformant to tuf.formats.SIGNATURESTATUS_SCHEMA. + Includes threshold, good_sigs, bad_sigs, unknown_sigs, untrusted_sigs, + and unknown_signing_schemes. """ # Do the arguments have the correct format? This check will ensure that @@ -122,33 +411,93 @@ def get_signature_status(signable, role=None, repository_name='default', tuf.formats.SIGNABLE_SCHEMA.check_match(signable) securesystemslib.formats.NAME_SCHEMA.check_match(repository_name) + # Argument sanity: we must either be given both the authorized keyids + # and the threshold, or neither. Receiving just one or the other makes no + # sense. + if (threshold is None) != (keyids is None): + raise tuf.exceptions.Error( + 'Incoherent optional arguments: we must receive either both expected ' + 'keyids and threshold, or neither.') + + # Argument sanity: We need either keyids&threshold or role. + if keyids is None and role is None: + logger.warning( + 'Given no information to use to validate signatures -- neither the ' + 'expected keys and threshold, nor a role from which to derive them. ' + 'Signature report will be of very limited use.') + # raise tuf.exceptions.Error( + # 'Invalid arguments: no keyids or threshold provided, and no ' # update after #660 is fixed, to: ', and no top-level ' + # 'role provided from which to deduce them.') + + # Argument sanity: role has the right format, if provided. if role is not None: + assert threshold is None and keyids is None, 'Not possible; mistake in this function!' # TODO: consider removing after debug tuf.formats.ROLENAME_SCHEMA.check_match(role) - - if threshold is not None: - securesystemslib.formats.THRESHOLD_SCHEMA.check_match(threshold) - + # The following code must be used when it is time to fix #660.... + # if not _is_top_level_role(role): # implicit -- and (threshold is None or keyids is None): + # raise tuf.exceptions.Error( + # # See github.com/theupdateframework/tuf/issues/660 + # 'Unable to determine keyids and threshold to expect from delegated ' + # 'targets role, "' + role + '"; when called for a delegated targets ' + # 'role, sig.get_signature_status() must be told which keyids and ' + # 'threshold should be used to validate the role. A delegated role ' + # 'rolename need never be provided as argument.') + + # Argument sanity: keyids and threshold have the right format, if provided. 
if keyids is not None: securesystemslib.formats.KEYIDS_SCHEMA.check_match(keyids) + assert threshold is not None, 'Not possible; mistake in this function!' # TODO: consider removing after testing + assert role is None, 'Not possible: mistake in this function!' # TODO: consider removing after testing + securesystemslib.formats.THRESHOLD_SCHEMA.check_match(threshold) + + + # Determine which keyids and threshold should be used to verify this + # metadata. Either they are provided as arguments, or, if not, we will try + # to check the roledb ourselves to see if the expected keyids and threshold + # for this role (****) are known there. (This only works for the four + # top-level roles. See TUF Issue #660 on GitHub.) # TODO: <~> Review this section! + if keyids is None: + # Redundant argument sanity check + assert threshold is None, 'Not possible; mistake in this function!' + + + if role is None: + # We can only reach this spot if no role information AND no keyids were + # given to this function, in which case our return data is QUITE limited, + # but we can still check to see if a given signature is correct (though + # not if that key is authorized to sign). + keyids = [] + + else: + # Note that if the role is not known, tuf.exceptions.UnknownRoleError + # is raised here. + keyids = tuf.roledb.get_role_keyids(role, repository_name) + threshold = tuf.roledb.get_role_threshold( + role, repository_name=repository_name) + - # The signature status dictionary returned. + # The signature status dictionary we will return. signature_status = {} - # The fields of the signature_status dict, where each field stores keyids. A - # description of each field: + # The fields of the signature_status dict, where each field is a list of + # keyids. A description of each field: # - # good_sigs = keys confirmed to have produced 'sig' using 'signed', which are - # associated with 'role'; + # good_sigs = keyids confirmed to have produced 'sig' over 'signed', + # which are associated with 'role'. # - # bad_sigs = negation of good_sigs; + # bad_sigs = keyids for which a signature is included that is not a + # valid signature using the key indicated over 'signed'. # - # unknown_sigs = keys not found in the 'keydb' database; + # unknown_sigs = unknown keyids: keyids from signatures for which the keyid + # has no entry in the 'keydb' database. # - # untrusted_sigs = keys that are not in the list of keyids associated with - # 'role'; + # untrusted_sigs = untrusted keyids: keyids from signatures whose keyids + # correspond to known keys, but which are not authorized to + # sign this metadata (according to keyids arg or rolename + # lookup in roledb). # - # unknown_signing_scheme = signing schemes specified in keys that are - # unsupported; + # unknown_signing_scheme = keyids from signatures that list a signing scheme + # that is not supported. good_sigs = [] bad_sigs = [] unknown_sigs = [] @@ -165,7 +514,9 @@ def get_signature_status(signable, role=None, repository_name='default', for signature in signatures: keyid = signature['keyid'] - # Does the signature use an unrecognized key? + # Try to find the public key corresponding to the keyid (fingerprint) + # listed in the signature, so that we can actually verify the signature. + # If we can't find it, note this as an unknown key, and skip to the next. 
     try:
       key = tuf.keydb.get_key(keyid, repository_name)

@@ -173,40 +524,37 @@ def get_signature_status(signable, role=None, repository_name='default',
       unknown_sigs.append(keyid)
       continue

-    # Does the signature use an unknown/unsupported signing scheme?
+    # Now try verifying the signature (whether it's over canonical JSON + utf-8
+    # or over ASN.1/DER).
+    # If the signature uses an unknown/unsupported signing scheme and cannot be
+    # verified, note that and skip to the next signature.
+    # TODO: Make sure that verify_signature_over_metadata will actually raise
+    # this unsupported algorithm error appropriately.
     try:
-      valid_sig = securesystemslib.keys.verify_signature(key, signature, signed)
-
+      valid_sig = verify_signature_over_metadata(key, signature, signed)
     except securesystemslib.exceptions.UnsupportedAlgorithmError:
       unknown_signing_schemes.append(keyid)
       continue

-    # We are now dealing with either a trusted or untrusted key...
-    if valid_sig:
-      if role is not None:
-
-        # Is this an unauthorized key? (a keyid associated with 'role')
-        # Note that if the role is not known, tuf.exceptions.UnknownRoleError
-        # is raised here.
-        if keyids is None:
-          keyids = tuf.roledb.get_role_keyids(role, repository_name)
+    # We know the key, we support the signing scheme, and
+    # verify_signature_over_metadata completed; its boolean return value tells
+    # us whether the signature is a valid signature by the key the signature
+    # mentions, over the data provided.
+    # We now ascertain whether or not this known key is one trusted to sign
+    # this particular metadata.

-        if keyid not in keyids:
-          untrusted_sigs.append(keyid)
-          continue
-
-      # This is an unset role, thus an unknown signature.
+    if valid_sig:
+      # Is this an authorized key? (a keyid associated with 'role')
+      if keyid in keyids:
+        good_sigs.append(keyid)       # good sig from right key
       else:
-        unknown_sigs.append(keyid)
-        continue
-
-      # Identify good/authorized key.
-      good_sigs.append(keyid)
+        untrusted_sigs.append(keyid)  # good sig from wrong key

     else:
-      # This is a bad signature for a trusted key.
+      # The signature is not even valid for the key the signature says it's
+      # using.
       bad_sigs.append(keyid)

+
   # Retrieve the threshold value for 'role'. Raise
   # securesystemslib.exceptions.UnknownRoleError if we were given an invalid
   # role.
@@ -237,7 +585,7 @@ def get_signature_status(signable, role=None, repository_name='default',



-def verify(signable, role, repository_name='default', threshold=None,
+def verify_signable(signable, role, repository_name='default', threshold=None,
     keyids=None):
   """
@@ -246,6 +594,17 @@ def verify(signable, role, repository_name='default', threshold=None,
     associated with 'role'. 'signable' must conform to SIGNABLE_SCHEMA and
     'role' must not equal 'None' or be less than zero.

+    Top-level roles (root, snapshot, timestamp, targets) have unambiguous
+    signature expectations: the expected keyids and threshold come only from
+    trusted root metadata. Therefore, if optional args threshold and keyids
+    are not provided, the expected values can be taken from trusted root
+    metadata in tuf.roledb. Delegated targets roles, on the other hand, may be
+    the objects of multiple different delegations from different roles that
+    can each have different keyid and threshold expectations, so it is not
+    possible to deduce these without knowing the delegating role of interest;
+    therefore, if 'role' is not a top-level role but a delegated targets role,
+    'keyids' and 'threshold' MUST be provided.
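+
+    For illustration only (the variable names here are hypothetical, not
+    defined in this module), verifying a delegated targets role looks like:
+
+      verified = tuf.sig.verify_signable(
+          delegated_signable, 'some_delegated_project', repository_name,
+          threshold=expected_threshold, keyids=expected_keyids)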
+ signable: A dictionary containing a list of signatures and a 'signed' identifier. @@ -274,6 +633,9 @@ def verify(signable, role, repository_name='default', threshold=None, securesystemslib.exceptions.Error, if an invalid threshold is encountered. + tuf.exceptions.Error, if role is not a top-level role and keyids and + threshold are not provided. + tuf.sig.get_signature_status() called. Any exceptions thrown by get_signature_status() will be caught here and re-raised. @@ -284,13 +646,15 @@ def verify(signable, role, repository_name='default', threshold=None, """ tuf.formats.SIGNABLE_SCHEMA.check_match(signable) - tuf.formats.ROLENAME_SCHEMA.check_match(role) - securesystemslib.formats.NAME_SCHEMA.check_match(repository_name) + + # The other arguments are checked by the get_signature_status call. # Retrieve the signature status. tuf.sig.get_signature_status() raises: # securesystemslib.exceptions.UnknownRoleError # securesystemslib.exceptions.FormatError. 'threshold' and 'keyids' are also # validated. + # tuf.exceptions.Error if the role is a delegated targets role but keyids and + # threshold are not provided. status = get_signature_status(signable, role, repository_name, threshold, keyids) # Retrieve the role's threshold and the authorized keys of 'status' @@ -305,95 +669,3 @@ def verify(signable, role, repository_name='default', threshold=None, raise securesystemslib.exceptions.Error("Invalid threshold: " + repr(threshold)) return len(good_sigs) >= threshold - - - - - -def may_need_new_keys(signature_status): - """ - - Return true iff downloading a new set of keys might tip this - signature status over to valid. This is determined by checking - if either the number of unknown or untrused keys is > 0. - - - signature_status: - The dictionary returned by tuf.sig.get_signature_status(). - - - securesystemslib.exceptions.FormatError, if 'signature_status does not have - the correct format. - - - None. - - - Boolean. - """ - - # Does 'signature_status' have the correct format? - # This check will ensure 'signature_status' has the appropriate number - # of objects and object types, and that all dict keys are properly named. - # Raise 'securesystemslib.exceptions.FormatError' if the check fails. - securesystemslib.formats.SIGNATURESTATUS_SCHEMA.check_match(signature_status) - - unknown = signature_status['unknown_sigs'] - untrusted = signature_status['untrusted_sigs'] - - return len(unknown) or len(untrusted) - - - - - -def generate_rsa_signature(signed, rsakey_dict): - """ - - Generate a new signature dict presumably to be added to the 'signatures' - field of 'signable'. The 'signable' dict is of the form: - - {'signed': 'signer', - 'signatures': [{'keyid': keyid, - 'method': 'evp', - 'sig': sig}]} - - The 'signed' argument is needed here for the signing process. - The 'rsakey_dict' argument is used to generate 'keyid', 'method', and 'sig'. - - The caller should ensure the returned signature is not already in - 'signable'. - - - signed: - The data used by 'securesystemslib.keys.create_signature()' to generate - signatures. It is stored in the 'signed' field of 'signable'. - - rsakey_dict: - The RSA key, a 'securesystemslib.formats.RSAKEY_SCHEMA' dictionary. - Used here to produce 'keyid', 'method', and 'sig'. - - - securesystemslib.exceptions.FormatError, if 'rsakey_dict' does not have the - correct format. - - TypeError, if a private key is not defined for 'rsakey_dict'. - - - None. - - - Signature dictionary conformant to securesystemslib.formats.SIGNATURE_SCHEMA. 
- Has the form: - {'keyid': keyid, 'method': 'evp', 'sig': sig} - """ - - # We need 'signed' in canonical JSON format to generate - # the 'method' and 'sig' fields of the signature. - signed = securesystemslib.formats.encode_canonical(signed) - - # Generate the RSA signature. - # Raises securesystemslib.exceptions.FormatError and TypeError. - signature = securesystemslib.keys.create_signature(rsakey_dict, signed) - - return signature
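The tuf.encoding.util module relied on above is added by this patch, but its hunks are not shown here. The following is only a rough sketch, under the assumption that serialize() dispatches on the metadata type in the way the sig.py docstrings describe (canonical JSON for dictionaries, DER for asn1crypto objects); it is not the actual implementation, and deserialize_file() is sketched for the JSON case only.

    import json

    import asn1crypto.core
    import securesystemslib.formats


    def serialize(data):
      # Assumed dispatch: JSON-compatible dictionaries are rendered as
      # canonical JSON and encoded as UTF-8 bytes; asn1crypto objects are
      # dumped as DER bytes.
      if isinstance(data, dict):
        return securesystemslib.formats.encode_canonical(data).encode('utf-8')
      elif isinstance(data, asn1crypto.core.Asn1Value):
        return data.dump()
      else:
        raise ValueError('Cannot serialize object of type ' + str(type(data)))


    def deserialize_file(filepath):
      # Assumed dispatch: '.json' metadata is parsed as JSON. DER metadata
      # would need to be parsed against the expected role's asn1crypto class,
      # which this sketch does not attempt.
      if filepath.endswith('.json'):
        with open(filepath) as fobj:
          return json.load(fobj)
      raise NotImplementedError('DER deserialization is not sketched here.')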