diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 3fcdad8..b7b6944 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -15,6 +15,7 @@ Submodules Subpackages ciphersuites prepending +subconstruct subconstructs versa prf diff --git a/tls/_common/_constructs.py b/tls/_common/_constructs.py index 75ddd24..c20ff5d 100644 --- a/tls/_common/_constructs.py +++ b/tls/_common/_constructs.py @@ -10,6 +10,8 @@ import six +from tls.exceptions import TLSValidationException + class _UBInt24(construct.Adapter): def _encode(self, obj, context): @@ -211,6 +213,39 @@ def EnumSwitch(type_field, type_enum, value_field, value_choices, # noqa default=default)) +class TLSExprValidator(construct.Validator): + """ + Like :py:class:`construct.ExprValidator`, but raises a + :py:class:`tls.exceptions.TLSValidationException` on validation failure. + + This is necessary because any ConstructError signifies the end of + subconstruct repetition to Range, which in turn breaks use with + ``TLSPrefixedArray``. + """ + def __init__(self, subcon, validator): + super(TLSExprValidator, self).__init__(subcon) + self._validate = validator + + def _decode(self, obj, context): + if not self._validate(obj, context): + raise TLSValidationException("object failed validation", obj) + return obj + + +def TLSOneOf(subcon, valids): # noqa + """ + Validates that the object is one of the listed values, both during parsing + and building. Like :py:meth:`construct.OneOf`, but raises a + :py:class:`tls.exceptions.TLSValidationException` instead of a + ``ConstructError`` subclass on mismatch. + + This is necessary because any ConstructError signifies the end of + subconstruct repetition to Range, which in turn breaks use with + ``TLSPrefixedArray``. + """ + return TLSExprValidator(subcon, lambda obj, ctx: obj in valids) + + class SizeAtLeast(construct.Validator): """ A :py:class:`construct.adapter.Validator` that validates a diff --git a/tls/_common/enums.py b/tls/_common/enums.py index ea36d1e..acba44c 100644 --- a/tls/_common/enums.py +++ b/tls/_common/enums.py @@ -45,6 +45,8 @@ class HandshakeType(Enum): CERTIFICATE_VERIFY = 15 CLIENT_KEY_EXCHANGE = 16 FINISHED = 20 + CERTIFICATE_URL = 21 + CERTIFICATE_STATUS = 22 class ContentType(Enum): @@ -85,6 +87,10 @@ class AlertDescription(Enum): USER_CANCELED = 90 NO_RENEGOTIATION = 100 UNSUPPORTED_EXTENSION = 110 + CERTIFICATE_UNOBTAINABLE = 111 + UNRECOGNIZED_NAME = 112 + BAD_CERTIFICATE_STATUS_RESPONSE = 113 + BAD_CERTIFICATE_HASH_VALUE = 114 class ExtensionType(Enum): @@ -128,3 +134,8 @@ class CompressionMethod(Enum): class NameType(Enum): HOST_NAME = 0 + + +class CertChainType(Enum): + INDIVIDUAL_CERTS = 0 + PKIPATH = 1 diff --git a/tls/_common/test/test_constructs.py b/tls/_common/test/test_constructs.py index 514710f..7ea717d 100644 --- a/tls/_common/test/test_constructs.py +++ b/tls/_common/test/test_constructs.py @@ -12,12 +12,13 @@ import pytest -from tls._common._constructs import (BytesAdapter, EnumClass, - EnumSwitch, Opaque, - PrefixedBytes, SizeAtLeast, +from tls._common._constructs import (BytesAdapter, EnumClass, EnumSwitch, + Opaque, PrefixedBytes, SizeAtLeast, SizeAtMost, SizeWithin, - TLSPrefixedArray, UBInt24, - _UBInt24) + TLSExprValidator, TLSOneOf, + TLSPrefixedArray, UBInt24, _UBInt24) + +from tls.exceptions import TLSValidationException @pytest.mark.parametrize("byte,number", [ @@ -94,6 +95,111 @@ def test_decode_passes_value_through(self, bytes_adapted, value): assert bytes_adapted._decode(value, context=object()) is value +class TestTLSExprValidator(object): + """ + Tests for :py:class:`tls._common._constructs.TLSExprValidator`. + """ + @pytest.fixture + def data_class(self): + """ + A :py:func:`construct.macros.UBInt8` construct that requires the + input value to be equal to 6. + """ + return TLSExprValidator(UBInt8('input_byte'), + lambda obj, ctx: obj == 6) + + def test_parse_invalid(self, data_class): + """ + :py:class:`tls.common._constructs.TLSExprValidator` raises a + ``TLSValidationException`` when parsing a value that does not + evaluate to the provided expression. + """ + with pytest.raises(TLSValidationException): + data_class.parse(b'\xff') + + def test_parse_valid(self, data_class): + """ + :py:class:`tls.common._constructs.TLSExprValidator` parses a value + that evaluates to the provided expression. + """ + assert data_class.parse(b'\x06') == 6 + + def test_build_invalid(self, data_class): + """ + :py:class:`tls.common._constructs.TLSExprValidator` raises a + ``TLSValidationException`` when serializing a value that does not + evaluate to the provided expression. + """ + with pytest.raises(TLSValidationException): + data_class.build(2) + + def test_build_valid(self, data_class): + """ + :py:class:`tls.common._construct.TLSExprValidator` successfully + serializes a value into bytes when it evaluates to the provided + expression. + """ + assert data_class.build(6) == b'\x06' + + +class TestTLSOneOf(object): + """ + Tests for :py:meth:`tls._common._constructs.TLSOneOf`. + """ + + @pytest.fixture + def data_class(self): + """ + A :py:func:`construct.macros.UBInt8` construct that requires the + input value to be equal to one of 1, 3, or 5. + """ + return TLSOneOf(UBInt8('input'), + [1, 3, 5]) + + def test_parse_invalid(self, data_class): + """ + :py:meth:`tls.common._constructs.TLSOneOf` raises a + ``TLSValidationException`` when parsing a value that is not one of + the values in the provided list. + """ + with pytest.raises(TLSValidationException): + data_class.parse(b'\xff') + + @pytest.mark.parametrize('input_bytes,parsed_output', [ + (b'\x01', 1), + (b'\x03', 3), + (b'\x05', 5), + ]) + def test_parse_valid(self, data_class, input_bytes, parsed_output): + """ + :py:meth:`tls.common._constructs.TLSOneOf` parses a value that + equals one of the values in the provided list. + """ + assert data_class.parse(input_bytes) == parsed_output + + def test_build_invalid(self, data_class): + """ + :py:meth:`tls.common._constructs.TLSOneOf` raises a + ``TLSValidationException`` when serializing a value that is not one + of the values in the provided list. + """ + with pytest.raises(TLSValidationException): + data_class.build(2) + + @pytest.mark.parametrize('input,built_bytes', [ + (1, b'\x01'), + (3, b'\x03'), + (5, b'\x05'), + ]) + def test_build_valid(self, data_class, input, built_bytes): + """ + :py:meth:`tls.common._construct.TLSOneOf` successfully serializes a + value into bytes when it evaluates to one of the values in the + provided list. + """ + assert data_class.build(input) == built_bytes + + @pytest.mark.parametrize("bytestring,encoded", [ (b"", b"\x00" + b""), (b"some value", b"\x0A" + b"some value"), diff --git a/tls/_constructs.py b/tls/_constructs.py index 4da2757..e5bc8ed 100644 --- a/tls/_constructs.py +++ b/tls/_constructs.py @@ -6,14 +6,16 @@ from functools import partial -from construct import (Array, Bytes, Pass, Struct, Switch, UBInt16, UBInt32, +from construct import (Array, Bytes, Pass, Struct, + Switch, UBInt16, UBInt32, UBInt8) from tls._common import enums from tls._common._constructs import (EnumClass, EnumSwitch, Opaque, PrefixedBytes, SizeAtLeast, SizeAtMost, - SizeWithin, TLSPrefixedArray, UBInt24) + SizeWithin, TLSOneOf, TLSPrefixedArray, + UBInt24) from tls.ciphersuites import CipherSuites @@ -84,6 +86,11 @@ ServerNameList = TLSPrefixedArray("server_name_list", ServerName) +ClientCertificateURL = Struct( + "client_certificate_url", + # The "extension_data" field of this extension SHALL be empty. +) + SignatureAndHashAlgorithm = Struct( "algorithms", EnumClass(UBInt8("hash"), enums.HashAlgorithm), @@ -107,6 +114,9 @@ enums.ExtensionType.SIGNATURE_ALGORITHMS: Opaque( SupportedSignatureAlgorithms ), + enums.ExtensionType.CLIENT_CERTIFICATE_URL: Opaque( + ClientCertificateURL + ), }, default=Pass, ) @@ -195,3 +205,18 @@ UBInt8("level"), UBInt8("description"), ) + +URLAndHash = Struct( + "url_and_hash", + SizeWithin(UBInt16("length"), + min_size=1, max_size=2 ** 16 - 1), + Bytes("url", lambda ctx: ctx.length), + TLSOneOf(UBInt8('padding'), [1]), + Bytes("sha1_hash", 20), +) + +CertificateURL = Struct( + "CertificateURL", + EnumClass(UBInt8("type"), enums.CertChainType), + TLSPrefixedArray("url_and_hash_list", URLAndHash), +) diff --git a/tls/exceptions.py b/tls/exceptions.py index 417cbe4..41f71a0 100644 --- a/tls/exceptions.py +++ b/tls/exceptions.py @@ -5,9 +5,21 @@ from __future__ import absolute_import, division, print_function -class UnsupportedCipherException(Exception): +class TLSException(Exception): + """ + This is the root exception from which all other exceptions inherit. + Lower-level parsing code raises very specific exceptions that higher-level + code can catch with this exception. + """ + + +class UnsupportedCipherException(TLSException): + pass + + +class UnsupportedExtensionException(TLSException): pass -class UnsupportedExtensionException(Exception): +class TLSValidationException(TLSException): pass diff --git a/tls/message.py b/tls/message.py index c82470c..3722b7e 100644 --- a/tls/message.py +++ b/tls/message.py @@ -204,6 +204,59 @@ def from_bytes(cls, bytes): ) +@attr.s +class URLAndHash(object): + """ + An object representing a URLAndHash struct. + """ + url = attr.ib() + padding = attr.ib() + sha1_hash = attr.ib() + + +@attr.s +class CertificateURL(object): + """ + An object representing a CertificateURL struct. + """ + type = attr.ib() + url_and_hash_list = attr.ib() + + def as_bytes(self): + return _constructs.CertificateURL.build(Container( + type=self.type, + url_and_hash_list=ListContainer( + Container( + length=len(url_and_hash.url), + url=url_and_hash.url, + padding=url_and_hash.padding, + sha1_hash=url_and_hash.sha1_hash, + ) + for url_and_hash in self.url_and_hash_list + ) + )) + + @classmethod + def from_bytes(cls, bytes): + """ + Parse a ``CertificateURL`` struct. + + :param bytes: the bytes representing the input. + :return: CertificateURL object. + """ + construct = _constructs.CertificateURL.parse(bytes) + return cls( + type=construct.type, + url_and_hash_list=[ + URLAndHash( + url=url_and_hash.url, + padding=url_and_hash.padding, + sha1_hash=url_and_hash.sha1_hash, + ) + for url_and_hash in construct.url_and_hash_list], + ) + + @attr.s class Finished(object): verify_data = attr.ib() @@ -230,7 +283,8 @@ def as_bytes(self): enums.HandshakeType.CERTIFICATE_REQUEST, enums.HandshakeType.HELLO_REQUEST, enums.HandshakeType.SERVER_HELLO_DONE, - enums.HandshakeType.FINISHED + enums.HandshakeType.FINISHED, + enums.HandshakeType.CERTIFICATE_URL, ]: _body_as_bytes = self.body.as_bytes() else: @@ -271,6 +325,8 @@ def _get_handshake_message(msg_type, body): CertificateRequest.from_bytes, # 15: parse_certificate_verify, # 16: parse_client_key_exchange, + enums.HandshakeType.CERTIFICATE_URL: CertificateURL.from_bytes, + # 22: parse certificate_status, } try: @@ -282,7 +338,8 @@ def _get_handshake_message(msg_type, body): return Finished(verify_data=body) elif msg_type in [enums.HandshakeType.SERVER_KEY_EXCHANGE, enums.HandshakeType.CERTIFICATE_VERIFY, - enums.HandshakeType.CLIENT_KEY_EXCHANGE]: + enums.HandshakeType.CLIENT_KEY_EXCHANGE, + enums.HandshakeType.CERTIFICATE_STATUS]: raise NotImplementedError else: return _handshake_message_parser[msg_type](body) diff --git a/tls/test/test_hello_message.py b/tls/test/test_hello_message.py index 1fa7862..8a424c9 100644 --- a/tls/test/test_hello_message.py +++ b/tls/test/test_hello_message.py @@ -4,6 +4,8 @@ from __future__ import absolute_import, division, print_function +from construct import Container + from construct.adapters import ValidationError import pytest @@ -104,6 +106,17 @@ class TestClientHello(object): b'\x00\x12' ) + server_name_extension_data + client_certificate_url_extension = ( + b'\x00\x02' # Extension Type: Server Certificate Type + b'\x00\x00' # Length + b'' # Data + ) + + client_hello_packet_with_client_certificate_url_extension = ( + common_client_hello_data + + b'\x00\x04' + ) + client_certificate_url_extension + def test_resumption_no_extensions(self): """ :func:`parse_client_hello` returns an instance of @@ -219,6 +232,30 @@ def test_hello_from_bytes_with_unsupported_extension(self): client_hello_packet ) + def test_parse_client_certificate_url_extension(self): + """ + :py:func:`tls.hello_message.ClientHello` parses a packet with + CLIENT_CERTIFICATE_URL extension. + """ + record = ClientHello.from_bytes( + self.client_hello_packet_with_client_certificate_url_extension + ) + assert len(record.extensions) == 1 + assert (record.extensions[0].type == + enums.ExtensionType.CLIENT_CERTIFICATE_URL) + assert record.extensions[0].data == Container() + + def test_as_bytes_client_certificate_url_extension(self): + """ + :py:func:`tls.hello_message.ClientHello` serializes a message + containing the CLIENT_CERTIFICATE_URL extension. + """ + record = ClientHello.from_bytes( + self.client_hello_packet_with_client_certificate_url_extension + ) + assert (record.as_bytes() == + self.client_hello_packet_with_client_certificate_url_extension) + def test_as_bytes_unsupported_extension(self): """ :func:`ClientHello.as_bytes` fails to serialize a message that diff --git a/tls/test/test_message.py b/tls/test/test_message.py index c58b93d..5b21292 100644 --- a/tls/test/test_message.py +++ b/tls/test/test_message.py @@ -10,12 +10,14 @@ from tls._common import enums +from tls.exceptions import TLSValidationException + from tls.hello_message import ClientHello, ProtocolVersion, ServerHello from tls.message import (ASN1Cert, Certificate, CertificateRequest, - Finished, Handshake, HelloRequest, - PreMasterSecret, ServerDHParams, - ServerHelloDone) + CertificateURL, Finished, Handshake, HelloRequest, + PreMasterSecret, ServerDHParams, ServerHelloDone, + URLAndHash) class TestCertificateRequestParsing(object): @@ -283,6 +285,76 @@ def test_as_bytes_too_long(self): certificate.as_bytes() +class TestCertificateURLParsing(object): + """ + Tests for parsing of :py:class:`tls.message.CertificateURL` messages. + """ + url_and_hash_list_bytes = ( + b'\x00\x10' # url length + b'cert.example.com' # url + b'\x01' # padding + b'abcdefghijklmnopqrst' # SHA1Hash[20] + ) + + certificate_url_packet = ( + b'\x00' # CertChainType + b'\x00\x27' # url_and_hash_list length + ) + url_and_hash_list_bytes + + def test_parse_certificate_url(self): + """ + :py:meth:`tls.message.CertificateURL.from_bytes` parses a valid + packet. + """ + record = CertificateURL.from_bytes(self.certificate_url_packet) + assert isinstance(record, CertificateURL) + assert record.type == enums.CertChainType.INDIVIDUAL_CERTS + assert len(record.url_and_hash_list) == 1 + assert record.url_and_hash_list[0].url == b'cert.example.com' + assert record.url_and_hash_list[0].padding == 1 + assert record.url_and_hash_list[0].sha1_hash == b'abcdefghijklmnopqrst' + + def test_incorrect_padding_parsing(self): + """ + :py:meth:`tls._constructs.URLAndHash.parse` rejects a packet + whose ``padding`` is not 1. + """ + bad_padding_bytes = ( + b'\x00\x10' # url length + b'cert.example.com' # url + b'\x03' # padding + b'abcdefghijklmnopqrst' # SHA1Hash[20] + ) + + certificate_url_packet = ( + b'\x00' # CertChainType + b'\x00\x27' # url_and_hash_list length + ) + bad_padding_bytes + + with pytest.raises(TLSValidationException) as exc_info: + CertificateURL.from_bytes(certificate_url_packet) + + assert exc_info.value.args == ('object failed validation', 3) + + def test_as_bytes(self): + """ + :py:meth:`tls.message.CertificateUrl.as_bytes` returns a valid + packet. + """ + record = CertificateURL.from_bytes(self.certificate_url_packet) + assert record.as_bytes() == self.certificate_url_packet + + def test_as_bytes_with_bad_padding(self): + """ + :py:meth:`tls.message.CertificateURL.as_bytes` fails to serialize a + record whose ``padding`` is not 1. + """ + record = CertificateURL.from_bytes(self.certificate_url_packet) + record.url_and_hash_list[0].padding = 5 + with pytest.raises(TLSValidationException): + record.as_bytes() + + class TestHandshakeStructParsing(object): """ Tests for parsing of :py:class:`tls.messages.Handshake` structs. @@ -387,6 +459,20 @@ class TestHandshakeStructParsing(object): b'some-encrypted-bytes' ) + certificate_url_packet = ( + b'\x00' # CertChainType + b'\x00\x27' # url_and_hash_list length + b'\x00\x10' # url length + b'cert.example.com' # url + b'\x01' # padding + b'abcdefghijklmnopqrst' # SHA1Hash[20] + ) + + certificate_url_handshake_packet = ( + b'\x15' + b'\x00\x00\x2a' + ) + certificate_url_packet + def test_parse_client_hello_in_handshake(self): record = Handshake.from_bytes(self.client_hello_handshake_packet) assert isinstance(record, Handshake) @@ -477,3 +563,29 @@ def test_as_bytes_server_hello_done(self): def test_as_bytes_finished(self): record = Handshake.from_bytes(self.finished_handshake) assert record.as_bytes() == self.finished_handshake + + def test_from_bytes_certificate_url(self): + """ + :py:class:`tls.messages.Handshake` parses a valid packet with a + ``CertificateURL`` message. + """ + record = Handshake.from_bytes(self.certificate_url_handshake_packet) + assert isinstance(record, Handshake) + assert record.msg_type == enums.HandshakeType.CERTIFICATE_URL + assert record.length == 42 + assert isinstance(record.body, CertificateURL) + assert record.body.type == enums.CertChainType.INDIVIDUAL_CERTS + assert len(record.body.url_and_hash_list) == 1 + assert isinstance(record.body.url_and_hash_list[0], URLAndHash) + assert record.body.url_and_hash_list[0].url == b'cert.example.com' + assert record.body.url_and_hash_list[0].padding == 1 + assert (record.body.url_and_hash_list[0].sha1_hash == + b'abcdefghijklmnopqrst') + + def test_as_bytes_certificate_url(self): + """ + :py:meth:`tls.message.Handshake.as_bytes` returns a valid packet when + the body contains a ``CertificateURL`` message. + """ + record = Handshake.from_bytes(self.certificate_url_handshake_packet) + assert record.as_bytes() == self.certificate_url_handshake_packet