From 2b343043d0b1f41c5d5737a32662a440da3f8192 Mon Sep 17 00:00:00 2001 From: miro Date: Fri, 3 Jan 2025 01:13:02 +0000 Subject: [PATCH] feat:z85b encoding --- hivemind_bus_client/encryption.py | 73 +++++++++++++------- hivemind_bus_client/exceptions.py | 4 ++ hivemind_bus_client/z85b.py | 108 ++++++++++++++++++++++++++++++ 3 files changed, 162 insertions(+), 23 deletions(-) create mode 100644 hivemind_bus_client/z85b.py diff --git a/hivemind_bus_client/encryption.py b/hivemind_bus_client/encryption.py index b6d265a..4b08866 100644 --- a/hivemind_bus_client/encryption.py +++ b/hivemind_bus_client/encryption.py @@ -1,13 +1,15 @@ import enum import json from binascii import hexlify, unhexlify -from typing import Union, Optional, Dict, Any, Literal, List +from typing import Union, Optional, Dict, Any, Literal, List, Callable import pybase64 +from hivemind_bus_client.z85b import Z85B from Cryptodome.Cipher import AES, ChaCha20_Poly1305 from cpuinfo import get_cpu_info -from hivemind_bus_client.exceptions import EncryptionKeyError, DecryptionKeyError, InvalidCipher, InvalidKeySize +from hivemind_bus_client.exceptions import EncryptionKeyError, DecryptionKeyError, InvalidEncoding, InvalidCipher, \ + InvalidKeySize # Cipher-specific constants AES_KEY_SIZES = [16, 24, 32] # poorman_handshake generates 32 bit secrets @@ -39,8 +41,31 @@ class SupportedEncodings(str, enum.Enum): Ciphers output binary data, and JSON needs to transmit that data as plaintext. The supported encodings include Base64 and Hex encoding. """ + JSON_Z85B = "JSON-Z85B" # JSON text output with Z85B encoding JSON_B64 = "JSON-B64" # JSON text output with Base64 encoding - JSON_HEX = "JSON-HEX" # JSON text output with Hex encoding (LEGACY SUPPORT) + JSON_HEX = "JSON-HEX" # JSON text output with Hex encoding + + +def get_encoder(encoding: SupportedEncodings) -> Callable[[bytes], bytes]: + encoding = _norm_encoding(encoding) + if encoding == SupportedEncodings.JSON_B64: + return pybase64.b64encode + if encoding == SupportedEncodings.JSON_HEX: + return hexlify + if encoding == SupportedEncodings.JSON_Z85B: + return Z85B.encode + raise InvalidEncoding(f"Invalid encoding: {encoding}") + + +def get_decoder(encoding: SupportedEncodings) -> Callable[[bytes], bytes]: + encoding = _norm_encoding(encoding) + if encoding == SupportedEncodings.JSON_B64: + return pybase64.b64decode + if encoding == SupportedEncodings.JSON_HEX: + return unhexlify + if encoding == SupportedEncodings.JSON_Z85B: + return Z85B.decode + raise InvalidEncoding(f"Invalid encoding: {encoding}") class SupportedCiphers(str, enum.Enum): @@ -112,7 +137,7 @@ def _norm_encoding(encoding: Union[SupportedEncodings, str]) -> SupportedEncodin This function takes either an encoding string or an enum member and ensures it is converted to the corresponding enum member of SupportedEncodings. If the input - is invalid, an InvalidCipher exception is raised. + is invalid, an InvalidEncoding exception is raised. Args: encoding (Union[SupportedEncodings, str]): The encoding to normalize, either a string or an enum member. @@ -121,7 +146,7 @@ def _norm_encoding(encoding: Union[SupportedEncodings, str]) -> SupportedEncodin SupportedEncodings: The corresponding enum member of SupportedEncodings. Raises: - InvalidCipher: If the encoding is invalid. + InvalidEncoding: If the encoding is invalid. """ if isinstance(encoding, SupportedEncodings): return encoding # If already an enum member, just return it @@ -131,14 +156,14 @@ def _norm_encoding(encoding: Union[SupportedEncodings, str]) -> SupportedEncodin if member.value == encoding: return member - raise InvalidCipher(f"Invalid JSON encoding: {encoding}") + raise InvalidEncoding(f"Invalid JSON encoding: {encoding}") def encrypt_as_json( - key: Union[str, bytes], - plaintext: Union[str, Dict[str, Any]], - cipher: Union[SupportedCiphers, str] = SupportedCiphers.AES_GCM, - encoding: Union[SupportedEncodings, str] = SupportedEncodings.JSON_B64 + key: Union[str, bytes], + plaintext: Union[str, Dict[str, Any]], + cipher: Union[SupportedCiphers, str] = SupportedCiphers.AES_GCM, + encoding: Union[SupportedEncodings, str] = SupportedEncodings.JSON_B64 ) -> str: """ Encrypts the given data and outputs it as a JSON string. @@ -155,7 +180,8 @@ def encrypt_as_json( str: A JSON string containing the encrypted data, nonce, and tag. Raises: - InvalidCipher: If an unsupported cipher or encoding is provided. + InvalidCipher: If an unsupported cipher is provided. + InvalidEncoding: If an unsupported encoding is provided. """ cipher = _norm_cipher(cipher) @@ -187,7 +213,7 @@ def encrypt_as_json( ) # Choose encoder based on the encoding - encoder = pybase64.b64encode if encoding == SupportedEncodings.JSON_B64 else hexlify + encoder = get_encoder(encoding) # Return the JSON-encoded ciphertext, tag, and nonce return json.dumps({ @@ -197,7 +223,7 @@ def encrypt_as_json( }) -def decrypt_from_json(key: Union[str, bytes], ciphertextjson: Union[str, bytes], +def decrypt_from_json(key: Union[str, bytes], ciphertext_json: Union[str, bytes], cipher: Union[SupportedCiphers, str] = SupportedCiphers.AES_GCM, encoding: Union[SupportedEncodings, str] = SupportedEncodings.JSON_B64) -> str: """ @@ -205,7 +231,7 @@ def decrypt_from_json(key: Union[str, bytes], ciphertextjson: Union[str, bytes], Args: key (Union[str, bytes]): The decryption key, up to 16 bytes. Longer keys will be truncated. - ciphertextjson (Union[str, bytes]): The encrypted data as a JSON string or bytes. + ciphertext_json (Union[str, bytes]): The encrypted data as a JSON string or bytes. cipher (SupportedEncodings): The cipher used for encryption. Returns: @@ -213,31 +239,32 @@ def decrypt_from_json(key: Union[str, bytes], ciphertextjson: Union[str, bytes], Raises: InvalidCipher: If an unsupported cipher is provided. + InvalidEncoding: If an unsupported encoding is provided. DecryptionKeyError: If decryption fails due to an invalid key or corrupted data. """ cipher = _norm_cipher(cipher) encoding = _norm_encoding(encoding) - if isinstance(ciphertextjson, str): - ciphertextjson = json.loads(ciphertextjson) + if isinstance(ciphertext_json, str): + ciphertext_json = json.loads(ciphertext_json) - decoder = pybase64.b64decode if encoding == SupportedEncodings.JSON_B64 else unhexlify + decoder = get_decoder(encoding) - ciphertext = decoder(ciphertextjson["ciphertext"]) + ciphertext: bytes = decoder(ciphertext_json["ciphertext"]) - if "tag" not in ciphertextjson: # web crypto compatibility + if "tag" not in ciphertext_json: # web crypto compatibility if cipher in AES_CIPHERS: ciphertext, tag = ciphertext[:-AES_TAG_SIZE], ciphertext[-AES_TAG_SIZE:] else: ciphertext, tag = ciphertext[:-CHACHA20_TAG_SIZE], ciphertext[-CHACHA20_TAG_SIZE:] else: - tag = decoder(ciphertextjson["tag"]) - nonce = decoder(ciphertextjson["nonce"]) + tag = decoder(ciphertext_json["tag"]) + nonce = decoder(ciphertext_json["nonce"]) try: ciphertext = decrypt_bin(key=key, - ciphertext=nonce + ciphertext + tag, - cipher=cipher) + ciphertext=nonce + ciphertext + tag, + cipher=cipher) return ciphertext.decode("utf-8") except InvalidKeySize as e: raise e diff --git a/hivemind_bus_client/exceptions.py b/hivemind_bus_client/exceptions.py index 98d7942..bd7bb05 100644 --- a/hivemind_bus_client/exceptions.py +++ b/hivemind_bus_client/exceptions.py @@ -15,6 +15,10 @@ class InvalidCipher(HiveMindException): """unknown encryption scheme requested""" +class InvalidEncoding(HiveMindException): + """unknown encoding scheme requested""" + + class InvalidKeySize(HiveMindException): """ Encryption Key size does not obey specification""" diff --git a/hivemind_bus_client/z85b.py b/hivemind_bus_client/z85b.py new file mode 100644 index 0000000..ced977f --- /dev/null +++ b/hivemind_bus_client/z85b.py @@ -0,0 +1,108 @@ +""" +Python implementation of Z85b 85-bit encoding. + +Z85b is a variation of ZMQ RFC 32 Z85 85-bit encoding with the following differences: +1. Little-endian encoding (to facilitate alignment with lower byte indices). +2. No requirement for a multiple of 4/5 length. +3. `decode_z85b()` eliminates whitespace from the input. +4. `decode_z85b()` raises a clear exception if invalid characters are encountered. + +This file is a derivative work of z85.py from pyzmq. + +Copyright (c) 2013 Brian Granger, Min Ragan-Kelley +Distributed under the terms of the New BSD License. +""" + +from typing import Union +import re +import struct + + + +class Z85DecodeError(Exception): + """Exception raised for errors in decoding Z85b.""" + pass + + +class Z85B: + # Z85CHARS is the base 85 symbol table + Z85CHARS = bytearray(b"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.-:+=^!/*?&<>()[]{}@%$#") + + # Z85MAP maps integers in [0, 84] to the appropriate character in Z85CHARS + Z85MAP = {char: idx for idx, char in enumerate(Z85CHARS)} + + # Powers of 85 for encoding/decoding + _85s = [85 ** i for i in range(5)] + + # Padding lengths for encoding and decoding + _E_PADDING = [0, 3, 2, 1] + _D_PADDING = [0, 4, 3, 2, 1] + + + @classmethod + def encode(cls, rawbytes: Union[str, bytes]) -> bytes: + """ + Encode raw bytes into Z85b format. + + Args: + rawbytes (Union[str, bytes]): Input data to encode. + + Returns: + bytes: Z85b-encoded bytes. + """ + rawbytes = bytearray(rawbytes) if isinstance(rawbytes, (bytes, str)) else rawbytes + padding = cls._E_PADDING[len(rawbytes) % 4] + rawbytes += b'\x00' * padding + nvalues = len(rawbytes) // 4 + + # Pack the raw bytes into little-endian 32-bit integers + values = struct.unpack(f'<{nvalues}I', rawbytes) + encoded = bytearray() + + for value in values: + for offset in cls._85s: + encoded.append(cls.Z85CHARS[(value // offset) % 85]) + + # Remove padding characters from the encoded output + if padding: + encoded = encoded[:-padding] + return bytes(encoded) + + @classmethod + def decode(cls, z85bytes: Union[str, bytes]) -> bytes: + """ + Decode Z85b-encoded bytes into raw bytes. + + Args: + z85bytes (Union[str, bytes]): Z85b-encoded data. + + Returns: + bytes: Decoded raw bytes. + + Raises: + Z85DecodeError: If invalid characters are encountered during decoding. + """ + # Normalize input by removing whitespace + z85bytes = bytearray(re.sub(rb'\s+', b'', z85bytes if isinstance(z85bytes, bytes) else z85bytes.encode())) + padding = cls._D_PADDING[len(z85bytes) % 5] + nvalues = (len(z85bytes) + padding) // 5 + + values = [] + for i in range(0, len(z85bytes), 5): + value = 0 + for j, offset in enumerate(cls._85s): + try: + value += cls.Z85MAP[z85bytes[i + j]] * offset + except IndexError: + break # End of input reached + except KeyError as e: + raise Z85DecodeError(f"Invalid byte code: {e.args[0]!r}") + values.append(value) + + # Unpack the values back into raw bytes + decoded = struct.pack(f'<{nvalues}I', *values) + + # Remove padding from the decoded output + if padding: + decoded = decoded[:-padding] + return decoded \ No newline at end of file