-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
f75fbb0
commit de15f1b
Showing
16 changed files
with
186 additions
and
0 deletions.
There are no files selected for viewing
File renamed without changes.
Binary file not shown.
File renamed without changes.
Binary file added
BIN
+166 Bytes
src/crypto_pkg/attacks/block_ciphers/__pycache__/__init__.cpython-38.pyc
Binary file not shown.
Binary file added
BIN
+1.47 KB
src/crypto_pkg/attacks/block_ciphers/__pycache__/utils.cpython-38.pyc
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
from typing import Dict, Tuple, Union | ||
|
||
from Crypto.Cipher import AES | ||
|
||
from crypto_pkg.attacks.block_ciphers.utils import Text, prepare_key | ||
|
||
|
||
class DoubleAESAttack: | ||
|
||
@staticmethod | ||
def encrypt(key: Text, plain_text: Text) -> Text: | ||
cipher = AES.new(key.ascii_hex, AES.MODE_ECB) | ||
cipher_text = cipher.encrypt(plain_text.ascii_hex) | ||
return Text(text=cipher_text) | ||
|
||
@staticmethod | ||
def decrypt(key: Text, cipher_text: Text) -> Text: | ||
cipher = AES.new(key.ascii_hex, AES.MODE_ECB) | ||
plain_text = cipher.decrypt(cipher_text.ascii_hex) | ||
return Text(text=plain_text) | ||
|
||
@classmethod | ||
def lookup_table_computation(cls, plain_text: str) -> Dict[int, int]: | ||
p = Text(text=bytes.fromhex(plain_text)) | ||
out = {cls.encrypt(key=prepare_key(i), plain_text=p).integer: i for i in range(1, 2 ** 24)} | ||
return out | ||
|
||
@classmethod | ||
def search_match(cls, cipher_text: str, lookup_table: Dict[int, int]) -> Union[Tuple[Text, Text], None]: | ||
c = Text(text=bytes.fromhex(cipher_text)) | ||
|
||
for i in range(2 ** 24): | ||
key = prepare_key(i) | ||
m = cls.decrypt(key=key, cipher_text=c) | ||
if lookup_table.get(m.integer): | ||
return prepare_key(lookup_table.get(m.integer)), key | ||
|
||
@classmethod | ||
def attack(cls, plain_text: str, cipher_text: str): | ||
|
||
look_up_table = cls.lookup_table_computation(plain_text=plain_text) | ||
keys = cls.search_match(cipher_text=cipher_text, lookup_table=look_up_table) | ||
return keys | ||
|
||
|
||
if __name__ == '__main__': | ||
|
||
''' Example ''' | ||
|
||
pt = '2355502c48059b15f70ddf4938b3b97e' | ||
ct = '5d64800bce91edda9c3bad2956be5b12' | ||
ks = DoubleAESAttack.attack(plain_text=pt, cipher_text=ct) | ||
if ks: | ||
print("\nKeys found:") | ||
print(f"\tk1: 0x{ks[0].hex}") | ||
print(f"\tk2: 0x{ks[1].hex}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
import logging | ||
from logging import getLogger | ||
from multiprocessing import Pool | ||
import random | ||
|
||
from crypto_pkg.attacks.block_ciphers.utils import prepare_key | ||
from crypto_pkg.ciphers.symmetric.aes import CustomAES, array_to_matrix, get_array_from_state | ||
|
||
logging.basicConfig(level=logging.DEBUG) | ||
|
||
_log = getLogger(__name__) | ||
_log.setLevel(logging.DEBUG) | ||
|
||
|
||
class ModifiedAES(CustomAES): | ||
|
||
def aes_round_trans(self, plain_text, round_key=None, last=False): | ||
state_matrix = array_to_matrix(plain_text) | ||
# subbytes transformation | ||
s = self.aes_sub_bytes(state_matrix) | ||
# mixcolumns transformation | ||
if last: | ||
s_k = self.aes_add_round_key(s, round_key) | ||
return get_array_from_state(s_k) | ||
else: | ||
c = self.aes_mix_columns(s) | ||
s_k = self.aes_add_round_key(c, round_key) | ||
return get_array_from_state(s_k) | ||
|
||
def encrypt(self, plain_text, key): | ||
ks = array_to_matrix(key) | ||
state = array_to_matrix(plain_text) | ||
s_k = self.aes_add_round_key(state, ks) | ||
|
||
pn = get_array_from_state(s_k) | ||
for i in range(1, 10): | ||
tmp = self.aes_round_trans(plain_text=pn, round_key=ks) | ||
pn = tmp | ||
pn = self.aes_round_trans(plain_text=pn, round_key=ks, last=True) | ||
return pn | ||
|
||
def attack_section(self, plain_text, cipher_block_ref, init_pos, section_n=0): | ||
for i in range(2 ** 32): | ||
key = prepare_key(i, max_key=init_pos) | ||
k_block = [int(item, 16) for item in [key.hex[i * 2:i * 2 + 2] for i in range(len(key.hex))] if item != ''] | ||
c = self.encrypt(key=k_block, plain_text=plain_text) | ||
c_by_block = [c[i * 4:i * 4 + 4] for i in range(len(c))] | ||
if c_by_block[section_n] == cipher_block_ref[section_n]: | ||
_log.info(f"key guess for block {section_n}: {key.hex}") | ||
return key | ||
|
||
def attack(self, plain_text: str, cipher_text: str): | ||
p_int_list = [int(item, 16) for item in [plain_text[i * 2:i * 2 + 2] for i in range(len(plain_text))] if item != ''] | ||
c_int_list = [int(item, 16) for item in [cipher_text[i * 2:i * 2 + 2] for i in range(len(cipher_text))] if item != ''] | ||
|
||
c_by_block_ref = [c_int_list[i * 4:i * 4 + 4] for i in range(len(c))] | ||
args = ( | ||
[p_int_list, c_by_block_ref, 32, 0], | ||
[p_int_list, c_by_block_ref, 64, 1], | ||
[p_int_list, c_by_block_ref, 96, 2], | ||
[p_int_list, c_by_block_ref, 128, 3] | ||
) | ||
|
||
_log.debug("Run attack on sub-blocks in parallel") | ||
with Pool() as pool: | ||
res = pool.starmap(self.attack_section, args) | ||
_log.debug(f"Parallel execution terminated with keys guesses {res}") | ||
r = [int(item.hex, 16) for item in res] | ||
out = r[0] ^ r[1] ^ r[2] ^ r[3] | ||
_log.info(f"128bits key guess: {out}") | ||
return out | ||
|
||
|
||
if __name__ == '__main__': | ||
''' Example ''' | ||
|
||
# ---- Generation of the plain text - cipher text pair | ||
# Choose the key | ||
to_find_key = '00000001000000100000000000000a01' | ||
# Choose a random plain text | ||
pt = format(random.getrandbits(128), 'x') | ||
# Prepare plain text and key for encryption | ||
p = [int(item, 16) for item in [pt[i * 2:i * 2 + 2] for i in range(len(pt))] if item != ''] | ||
k = [int(item, 16) for item in [to_find_key[i * 2:i * 2 + 2] for i in range(len(to_find_key))] if item != ''] | ||
# Generate cipher text | ||
aes = ModifiedAES() | ||
c = aes.encrypt(key=k, plain_text=p) | ||
ct = bytes(c).hex() | ||
|
||
# ---- Run the attack | ||
_log.debug(f"Run the attack with plain-text {p} and cipher-text {p}") | ||
aes = ModifiedAES() | ||
result = aes.attack(plain_text=pt, cipher_text=ct) | ||
assert result == int(to_find_key, 16) | ||
print(f"\nSuccess: key {to_find_key} recovered") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
class Text: | ||
""" | ||
Number object whose attributes are | ||
ascii_hex: Hexadecimal ASCII representation | ||
hex: string with Hexadecimal base representation | ||
integer: integer number | ||
""" | ||
|
||
def __init__(self, text): | ||
self.ascii_hex = text | ||
self.hex = text.hex() | ||
self.integer = int(text.hex(), 16) | ||
|
||
|
||
def prepare_key(key: int, max_key=24) -> Text: | ||
""" | ||
Converts and integer to usable key. Appends 128-max_kex 0 bits after the integer and pads 0 bits before the number | ||
to satisfy the required number of bits. Then it converts to hexadecimal and instantiate a Text object | ||
Args: | ||
key: integer number | ||
max_key: after max_key bits, a sequence of 0 bits starts | ||
Returns: | ||
Text object corresponding to the prepared key. | ||
""" | ||
try: | ||
k = bin(key) | ||
k_n = k[2:] + (128 - max_key) * '0' | ||
n_k = format(int(k_n, 2), 'x') | ||
n_k2 = n_k.rjust(2 * 16, '0') | ||
k_n = bytes.fromhex(n_k2) | ||
|
||
except Exception as exc: | ||
raise exc | ||
return Text(text=k_n) |
Empty file.
Empty file.
File renamed without changes.
Empty file.
Empty file.
File renamed without changes.
Empty file.
Empty file.