Skip to content

Commit

Permalink
feat:z85b encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
JarbasAl committed Jan 3, 2025
1 parent e11ca29 commit 2b34304
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 23 deletions.
73 changes: 50 additions & 23 deletions hivemind_bus_client/encryption.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import enum
import json
from binascii import hexlify, unhexlify
from typing import Union, Optional, Dict, Any, Literal, List
from typing import Union, Optional, Dict, Any, Literal, List, Callable

import pybase64
from hivemind_bus_client.z85b import Z85B
from Cryptodome.Cipher import AES, ChaCha20_Poly1305
from cpuinfo import get_cpu_info

from hivemind_bus_client.exceptions import EncryptionKeyError, DecryptionKeyError, InvalidCipher, InvalidKeySize
from hivemind_bus_client.exceptions import EncryptionKeyError, DecryptionKeyError, InvalidEncoding, InvalidCipher, \
InvalidKeySize

# Cipher-specific constants
AES_KEY_SIZES = [16, 24, 32] # poorman_handshake generates 32 bit secrets
Expand Down Expand Up @@ -39,8 +41,31 @@ class SupportedEncodings(str, enum.Enum):
Ciphers output binary data, and JSON needs to transmit that data as plaintext.
The supported encodings include Base64 and Hex encoding.
"""
JSON_Z85B = "JSON-Z85B" # JSON text output with Z85B encoding
JSON_B64 = "JSON-B64" # JSON text output with Base64 encoding
JSON_HEX = "JSON-HEX" # JSON text output with Hex encoding (LEGACY SUPPORT)
JSON_HEX = "JSON-HEX" # JSON text output with Hex encoding


def get_encoder(encoding: SupportedEncodings) -> Callable[[bytes], bytes]:
encoding = _norm_encoding(encoding)
if encoding == SupportedEncodings.JSON_B64:
return pybase64.b64encode
if encoding == SupportedEncodings.JSON_HEX:
return hexlify
if encoding == SupportedEncodings.JSON_Z85B:
return Z85B.encode
raise InvalidEncoding(f"Invalid encoding: {encoding}")


def get_decoder(encoding: SupportedEncodings) -> Callable[[bytes], bytes]:
encoding = _norm_encoding(encoding)
if encoding == SupportedEncodings.JSON_B64:
return pybase64.b64decode
if encoding == SupportedEncodings.JSON_HEX:
return unhexlify
if encoding == SupportedEncodings.JSON_Z85B:
return Z85B.decode
raise InvalidEncoding(f"Invalid encoding: {encoding}")


class SupportedCiphers(str, enum.Enum):
Expand Down Expand Up @@ -112,7 +137,7 @@ def _norm_encoding(encoding: Union[SupportedEncodings, str]) -> SupportedEncodin
This function takes either an encoding string or an enum member and ensures it
is converted to the corresponding enum member of SupportedEncodings. If the input
is invalid, an InvalidCipher exception is raised.
is invalid, an InvalidEncoding exception is raised.
Args:
encoding (Union[SupportedEncodings, str]): The encoding to normalize, either a string or an enum member.
Expand All @@ -121,7 +146,7 @@ def _norm_encoding(encoding: Union[SupportedEncodings, str]) -> SupportedEncodin
SupportedEncodings: The corresponding enum member of SupportedEncodings.
Raises:
InvalidCipher: If the encoding is invalid.
InvalidEncoding: If the encoding is invalid.
"""
if isinstance(encoding, SupportedEncodings):
return encoding # If already an enum member, just return it
Expand All @@ -131,14 +156,14 @@ def _norm_encoding(encoding: Union[SupportedEncodings, str]) -> SupportedEncodin
if member.value == encoding:
return member

raise InvalidCipher(f"Invalid JSON encoding: {encoding}")
raise InvalidEncoding(f"Invalid JSON encoding: {encoding}")


def encrypt_as_json(
key: Union[str, bytes],
plaintext: Union[str, Dict[str, Any]],
cipher: Union[SupportedCiphers, str] = SupportedCiphers.AES_GCM,
encoding: Union[SupportedEncodings, str] = SupportedEncodings.JSON_B64
key: Union[str, bytes],
plaintext: Union[str, Dict[str, Any]],
cipher: Union[SupportedCiphers, str] = SupportedCiphers.AES_GCM,
encoding: Union[SupportedEncodings, str] = SupportedEncodings.JSON_B64
) -> str:
"""
Encrypts the given data and outputs it as a JSON string.
Expand All @@ -155,7 +180,8 @@ def encrypt_as_json(
str: A JSON string containing the encrypted data, nonce, and tag.
Raises:
InvalidCipher: If an unsupported cipher or encoding is provided.
InvalidCipher: If an unsupported cipher is provided.
InvalidEncoding: If an unsupported encoding is provided.
"""

cipher = _norm_cipher(cipher)
Expand Down Expand Up @@ -187,7 +213,7 @@ def encrypt_as_json(
)

# Choose encoder based on the encoding
encoder = pybase64.b64encode if encoding == SupportedEncodings.JSON_B64 else hexlify
encoder = get_encoder(encoding)

# Return the JSON-encoded ciphertext, tag, and nonce
return json.dumps({
Expand All @@ -197,47 +223,48 @@ def encrypt_as_json(
})


def decrypt_from_json(key: Union[str, bytes], ciphertextjson: Union[str, bytes],
def decrypt_from_json(key: Union[str, bytes], ciphertext_json: Union[str, bytes],
cipher: Union[SupportedCiphers, str] = SupportedCiphers.AES_GCM,
encoding: Union[SupportedEncodings, str] = SupportedEncodings.JSON_B64) -> str:
"""
Decrypts data from a JSON string.
Args:
key (Union[str, bytes]): The decryption key, up to 16 bytes. Longer keys will be truncated.
ciphertextjson (Union[str, bytes]): The encrypted data as a JSON string or bytes.
ciphertext_json (Union[str, bytes]): The encrypted data as a JSON string or bytes.
cipher (SupportedEncodings): The cipher used for encryption.
Returns:
str: The decrypted plaintext data.
Raises:
InvalidCipher: If an unsupported cipher is provided.
InvalidEncoding: If an unsupported encoding is provided.
DecryptionKeyError: If decryption fails due to an invalid key or corrupted data.
"""
cipher = _norm_cipher(cipher)
encoding = _norm_encoding(encoding)

if isinstance(ciphertextjson, str):
ciphertextjson = json.loads(ciphertextjson)
if isinstance(ciphertext_json, str):
ciphertext_json = json.loads(ciphertext_json)

decoder = pybase64.b64decode if encoding == SupportedEncodings.JSON_B64 else unhexlify
decoder = get_decoder(encoding)

ciphertext = decoder(ciphertextjson["ciphertext"])
ciphertext: bytes = decoder(ciphertext_json["ciphertext"])

if "tag" not in ciphertextjson: # web crypto compatibility
if "tag" not in ciphertext_json: # web crypto compatibility
if cipher in AES_CIPHERS:
ciphertext, tag = ciphertext[:-AES_TAG_SIZE], ciphertext[-AES_TAG_SIZE:]
else:
ciphertext, tag = ciphertext[:-CHACHA20_TAG_SIZE], ciphertext[-CHACHA20_TAG_SIZE:]
else:
tag = decoder(ciphertextjson["tag"])
nonce = decoder(ciphertextjson["nonce"])
tag = decoder(ciphertext_json["tag"])
nonce = decoder(ciphertext_json["nonce"])

try:
ciphertext = decrypt_bin(key=key,
ciphertext=nonce + ciphertext + tag,
cipher=cipher)
ciphertext=nonce + ciphertext + tag,
cipher=cipher)
return ciphertext.decode("utf-8")
except InvalidKeySize as e:
raise e
Expand Down
4 changes: 4 additions & 0 deletions hivemind_bus_client/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ class InvalidCipher(HiveMindException):
"""unknown encryption scheme requested"""


class InvalidEncoding(HiveMindException):
"""unknown encoding scheme requested"""


class InvalidKeySize(HiveMindException):
""" Encryption Key size does not obey specification"""

Expand Down
108 changes: 108 additions & 0 deletions hivemind_bus_client/z85b.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""
Python implementation of Z85b 85-bit encoding.
Z85b is a variation of ZMQ RFC 32 Z85 85-bit encoding with the following differences:
1. Little-endian encoding (to facilitate alignment with lower byte indices).
2. No requirement for a multiple of 4/5 length.
3. `decode_z85b()` eliminates whitespace from the input.
4. `decode_z85b()` raises a clear exception if invalid characters are encountered.
This file is a derivative work of z85.py from pyzmq.
Copyright (c) 2013 Brian Granger, Min Ragan-Kelley
Distributed under the terms of the New BSD License.
"""

from typing import Union
import re
import struct



class Z85DecodeError(Exception):
"""Exception raised for errors in decoding Z85b."""
pass


class Z85B:
# Z85CHARS is the base 85 symbol table
Z85CHARS = bytearray(b"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.-:+=^!/*?&<>()[]{}@%$#")

# Z85MAP maps integers in [0, 84] to the appropriate character in Z85CHARS
Z85MAP = {char: idx for idx, char in enumerate(Z85CHARS)}

# Powers of 85 for encoding/decoding
_85s = [85 ** i for i in range(5)]

# Padding lengths for encoding and decoding
_E_PADDING = [0, 3, 2, 1]
_D_PADDING = [0, 4, 3, 2, 1]


@classmethod
def encode(cls, rawbytes: Union[str, bytes]) -> bytes:
"""
Encode raw bytes into Z85b format.
Args:
rawbytes (Union[str, bytes]): Input data to encode.
Returns:
bytes: Z85b-encoded bytes.
"""
rawbytes = bytearray(rawbytes) if isinstance(rawbytes, (bytes, str)) else rawbytes
padding = cls._E_PADDING[len(rawbytes) % 4]
rawbytes += b'\x00' * padding
nvalues = len(rawbytes) // 4

# Pack the raw bytes into little-endian 32-bit integers
values = struct.unpack(f'<{nvalues}I', rawbytes)
encoded = bytearray()

for value in values:
for offset in cls._85s:
encoded.append(cls.Z85CHARS[(value // offset) % 85])

# Remove padding characters from the encoded output
if padding:
encoded = encoded[:-padding]
return bytes(encoded)

@classmethod
def decode(cls, z85bytes: Union[str, bytes]) -> bytes:
"""
Decode Z85b-encoded bytes into raw bytes.
Args:
z85bytes (Union[str, bytes]): Z85b-encoded data.
Returns:
bytes: Decoded raw bytes.
Raises:
Z85DecodeError: If invalid characters are encountered during decoding.
"""
# Normalize input by removing whitespace
z85bytes = bytearray(re.sub(rb'\s+', b'', z85bytes if isinstance(z85bytes, bytes) else z85bytes.encode()))
padding = cls._D_PADDING[len(z85bytes) % 5]
nvalues = (len(z85bytes) + padding) // 5

values = []
for i in range(0, len(z85bytes), 5):
value = 0
for j, offset in enumerate(cls._85s):
try:
value += cls.Z85MAP[z85bytes[i + j]] * offset
except IndexError:
break # End of input reached
except KeyError as e:
raise Z85DecodeError(f"Invalid byte code: {e.args[0]!r}")
values.append(value)

# Unpack the values back into raw bytes
decoded = struct.pack(f'<{nvalues}I', *values)

# Remove padding from the decoded output
if padding:
decoded = decoded[:-padding]
return decoded

0 comments on commit 2b34304

Please sign in to comment.