diff --git a/.github/workflows/ci_linting.yml b/.github/workflows/ci_linting.yml index 9a076690f7e..62ece4c3fd3 100644 --- a/.github/workflows/ci_linting.yml +++ b/.github/workflows/ci_linting.yml @@ -77,11 +77,16 @@ jobs: steps: - name: checkout uses: actions/checkout@v3 - - name: pep8 exp - uses: lrstewart/autopep8_action@python-latest + - name: Run autopep8 + id: autopep8 + uses: peter-evans/autopep8@v2 with: - dry: true - checkpath: ./tests/integrationv2/*.py + args: --diff --exit-code . + - name: Check exit code + if: steps.autopep8.outputs.exit-code != 0 + run: | + echo "Run 'autopep8 --in-place .' to fix" + exit 1 clang-format: runs-on: ubuntu-latest steps: diff --git a/tests/integrationv2/.pep8 b/.pep8 similarity index 74% rename from tests/integrationv2/.pep8 rename to .pep8 index df41f1a15e9..ae8b67bcec6 100644 --- a/tests/integrationv2/.pep8 +++ b/.pep8 @@ -1,4 +1,3 @@ [pep8] max_line_length = 120 -in-place = true recursive = true diff --git a/scram/aes_scram.py b/scram/aes_scram.py index 30426e82bfc..c64112dd3a2 100755 --- a/scram/aes_scram.py +++ b/scram/aes_scram.py @@ -1,244 +1,247 @@ -# SCRAM mode Python script -from Crypto.Cipher import AES -from Crypto import Random -rndfile = Random.new() -import hashlib -import hmac -import sys - -DEBUG_ENABLED = True - -# When reading/writing byte strings, the first (aka left-most) byte is the Most Significant Byte (aka Big-Endian) -# (Eg "0x0001", 0x00 is the MSB and 0x01 is the LSB, meaning 0x0001 == 1) -ENDIANNESS = 'big' - -# Convert Integer value to byte string -def byteStr(val, numBytes): - return val.to_bytes(numBytes, ENDIANNESS) - -# Debug Print Byte String to Standard Out -def debugByteStr(debugStr, byteStrVal): - if DEBUG_ENABLED: - print(debugStr + ": 0x" + byteStrVal.hex().upper()) - -# Debug Print Integer value to Standard Out -def debugInt(debugStr, intVal): - if DEBUG_ENABLED: - print(debugStr + ": " + str(intVal)) - -# Generate a random Key -def scram_generate_key(): - # Generate Random 32 Byte Key - K = rndfile.read(32) - debugByteStr("K", K) - - return K - -def scram_encrypt(K, N, A, M, F): - """ - SCRAM Encryption - - Parameters: - K: Key - N: Nonce - A: Additional Authenticated Data - M: Plaintext Message - F: Frame Size - - Returns: - C: Ciphertext - X: Excrypted R and Padding Len - Tag: Authentication Tag - """ - # Generate a random 32-byte value R - R = rndfile.read(32) - - # Prepare the Padding. We append 0x00 bytes to the end up to the next frame size. - M_LEN = len(M) - PADDING_LEN = 0 - - if (F > 0): - PADDING_LEN = (F - M_LEN) % F - - PADDING_STR = byteStr(0x0, PADDING_LEN) - PADDING_LEN_STR = byteStr(PADDING_LEN, 2) - PADDED_MSG = M + PADDING_STR - - debugInt("len(M)", M_LEN) - debugInt("PADDING_LEN", PADDING_LEN) - debugByteStr("PADDING_STR", PADDING_STR) - debugByteStr("PADDING_LEN_STR", PADDING_LEN_STR) - debugByteStr("PADDED_MSG", PADDED_MSG) - - # Derive Message encryption key (KE) - # S1 = N || 0x00 0x00 0x00 0x1 || 0^{8} || 0^{8} || 0^{16} || R - S1 = N + byteStr(0x01, 4) + byteStr(0x0, 8) + byteStr(0x0, 8) + byteStr(0x0, 16) + R - U1 = hmac.new(K, S1, hashlib.sha512).digest() - KE = U1[0:32] - - # AES_CTR encrypt PADDED_MSG with Nonce N and Key KE - C = AES.new(key=KE, mode=AES.MODE_CTR, nonce=N).encrypt(PADDED_MSG) - - # Derive MAC Key (KM) used to with GMAC to generate T - # S2 = N || 0x00 0x00 0x00 0x2 || 0^{8} || 0^{8} || 0^{16} || 0^{32} - S2 = N + byteStr(0x02, 4) + byteStr(0x0, 8) + byteStr(0x0, 8) + byteStr(0x0, 16) + byteStr(0x0, 32) - U2 = hmac.new(K, S2, hashlib.sha512).digest() - KM = U2[0:32] - - # GMAC the string A || C , using the GMAC key KM and nonce N - T = AES.new(key=KM, mode=AES.MODE_GCM, nonce=N).update(A + C).digest() - - # Derive a one-time pad (U3) from T - # S3 = N || 0x00 0x00 0x00 0x3 || 0^{8} || 0^{8} || T || 0^{32} - S3 = N + byteStr(0x03, 4) + byteStr(0x0, 8) + byteStr(0x0, 8) + T + byteStr(0x0, 32) - U3 = hmac.new(K, S3, hashlib.sha512).digest() - - # Encrypt R and PaddingLen with one-time pad U3 - Y1 = bytes(a ^ b for (a,b) in zip (U3[0:32], R)) - Y0 = bytes(a ^ b for (a,b) in zip (U3[32:34], PADDING_LEN_STR)) - X = Y1 + Y0 - - # Authenticate (Tag) T and R - # S4 = N || 0x00 0x00 0x00 0x4 || A_LEN_STR || M_LEN_STR || T || R - S4 = N + byteStr(0x04, 4) + byteStr(len(A), 8) + byteStr(M_LEN, 8) + T + R - U4 = hmac.new(K, S4, hashlib.sha512).digest() - - # Truncate to 16 bytes tag - Tag = U4[0:16] - - debugByteStr("S1", S1) - debugByteStr("S2", S2) - debugByteStr("S3", S3) - debugByteStr("S4", S4) - debugByteStr("U1", U1) - debugByteStr("U2", U2) - debugByteStr("U3", U3) - debugByteStr("U4", U4) - debugByteStr("Y0", Y0) - debugByteStr("Y1", Y1) - debugByteStr("T", T) - debugByteStr("KE", KE) - debugByteStr("KM", KM) - debugInt("len(C)", len(C)) - debugByteStr("C", C) - debugByteStr("X", X) - debugByteStr("Tag", Tag) - - return C, X, Tag - - -def scram_decrypt(K, N, A, C, X, Tag): - """ - SCRAM Decryption - - Parameters: - K: Key - N: Nonce - A: Additional Authenticated Data - C: Ciphertext - X: Encrypted Random value R and Padding Length - Tag: Tag - - Returns: - M_calculated: The decrypted Message - """ - - # Derive MAC key (KM) - # S2 = N || 0x00 0x00 0x00 0x2 || 0^{8} || 0^{8} || 0^{16} || 0^{32} - S2_calculated = N + byteStr(0x02, 4) + byteStr(0x0, 8) + byteStr(0x0, 8) + byteStr(0x0, 16) + byteStr(0x0, 32) - U2_calculated = hmac.new(K, S2_calculated, hashlib.sha512).digest() - KM_calculated = U2_calculated[0:32] - - # Derive T - # T = GMAC (N, A||C, null) - T_calculated = AES.new(key=KM_calculated, mode=AES.MODE_GCM, nonce=N).update(A + C).digest() - - # Derive one-time pad U3 from T_calculated, - # S3 = N || 0x00 0x00 0x00 0x3 || 0^{8} || 0^{8} || T || 0^{32} - S3_calculated = N + byteStr(0x03, 4) + byteStr(0x0, 8) + byteStr(0x0, 8) + T_calculated + byteStr(0x0, 32) - U3_calculated = hmac.new(K, S3_calculated, hashlib.sha512).digest() - - # Decrypt R and PADDING_LEN, by xor'ing X and U3 - R_calculated = bytes(a ^ b for (a,b) in zip (U3_calculated[0:32], X[0:32])) - PADDING_LEN_STR_calculated = bytes(a ^ b for (a,b) in zip (U3_calculated[32:34], X[32:34])) - - # Derive Message and Padding Lengths - PADDING_LEN_calculated = int.from_bytes(PADDING_LEN_STR_calculated, ENDIANNESS) - M_LEN_calculated = len(C) - PADDING_LEN_calculated - - # Authenticate R - # S4 = N || 0x00 0x00 0x00 0x4 || A_LEN_STR || M_LEN_STR || T || R - S4_calculated = N + byteStr(0x04, 4) + byteStr(len(A), 8) + byteStr(M_LEN_calculated, 8) + T_calculated + R_calculated - U4_calculated = hmac.new(K, S4_calculated, hashlib.sha512).digest() - Tag_calculated = U4_calculated[0:16] - - if(Tag == Tag_calculated): - print ("PASSED: Authentication") - else: - print ("FAILED: Authentication") - return None - - # Now that Ciphertext and other parameters are authenticated, we can decrypt Ciphertext to get Plaintext - # Derive Message Encryption key (KE) - # S1 = N || 0x00 0x00 0x00 0x1 || 0^{8} || 0^{8} || 0^{16} || R - S1_calculated = N + byteStr(0x01, 4) + byteStr(0x0, 8) + byteStr(0x0, 8) + byteStr(0x0, 16) + R_calculated - U1_calculated = hmac.new(K, S1_calculated, hashlib.sha512).digest() - KE_calculated = U1_calculated[0:32] - - # Decrypt Ciphertext - PADDED_MSG_calculated = AES.new(key=KE_calculated, mode=AES.MODE_CTR, nonce=N).decrypt(C) - - # Strip off padding bytes - M_calculated = PADDED_MSG_calculated[0:M_LEN_calculated] - - if DEBUG_ENABLED: - print("\nDecryption Debug Info: ") - debugByteStr("S1_calculated", S1_calculated) - debugByteStr("S2_calculated", S2_calculated) - debugByteStr("S3_calculated", S3_calculated) - debugByteStr("S4_calculated", S4_calculated) - debugByteStr("U1_calculated", U1_calculated) - debugByteStr("U2_calculated", U2_calculated) - debugByteStr("U3_calculated", U3_calculated) - debugByteStr("U4_calculated", U4_calculated) - debugByteStr("T_calculated", T_calculated) - debugByteStr("R_calculated", R_calculated) - debugByteStr("KE_calculated", KE_calculated) - debugByteStr("KM_calculated", KM_calculated) - debugByteStr("PADDED_MSG_calculated", PADDED_MSG_calculated) - debugByteStr("M_calculated", M_calculated) - - return M_calculated - - -def main(argv): - # Generate Random 28 Byte Message - M = rndfile.read(28) - debugByteStr("M", M) - - # Generate Random 28 Byte Additional Authenticated Data - A = rndfile.read(28) - debugByteStr("A", A) - - # Generate Random 12 Byte Key - N = rndfile.read(12) - - # Frame Size. Messages will be padded up to the next Frame size before being encrypted. - F = 32 - debugInt("F", F) - - K = scram_generate_key() - - C, X, Tag = scram_encrypt(K, N, A, M, F) - - M_calculated = scram_decrypt(K, N, A, C, X, Tag) - - if(M != M_calculated): - print ("FAILED: Decryption") - else: - print("PASSED: Decryption") - - return - -if __name__ == "__main__": - sys.exit(main(sys.argv[1:])) +# SCRAM mode Python script +import sys +import hmac +import hashlib +from Crypto.Cipher import AES +from Crypto import Random +rndfile = Random.new() + +DEBUG_ENABLED = True + +# When reading/writing byte strings, the first (aka left-most) byte is the Most Significant Byte (aka Big-Endian) +# (Eg "0x0001", 0x00 is the MSB and 0x01 is the LSB, meaning 0x0001 == 1) +ENDIANNESS = 'big' + + +def byteStr(val, numBytes): + return val.to_bytes(numBytes, ENDIANNESS) + + +def debugByteStr(debugStr, byteStrVal): + if DEBUG_ENABLED: + print(debugStr + ": 0x" + byteStrVal.hex().upper()) + + +def debugInt(debugStr, intVal): + if DEBUG_ENABLED: + print(debugStr + ": " + str(intVal)) + + +def scram_generate_key(): + # Generate Random 32 Byte Key + K = rndfile.read(32) + debugByteStr("K", K) + + return K + + +def scram_encrypt(K, N, A, M, F): + """ + SCRAM Encryption + + Parameters: + K: Key + N: Nonce + A: Additional Authenticated Data + M: Plaintext Message + F: Frame Size + + Returns: + C: Ciphertext + X: Excrypted R and Padding Len + Tag: Authentication Tag + """ + # Generate a random 32-byte value R + R = rndfile.read(32) + + # Prepare the Padding. We append 0x00 bytes to the end up to the next frame size. + M_LEN = len(M) + PADDING_LEN = 0 + + if (F > 0): + PADDING_LEN = (F - M_LEN) % F + + PADDING_STR = byteStr(0x0, PADDING_LEN) + PADDING_LEN_STR = byteStr(PADDING_LEN, 2) + PADDED_MSG = M + PADDING_STR + + debugInt("len(M)", M_LEN) + debugInt("PADDING_LEN", PADDING_LEN) + debugByteStr("PADDING_STR", PADDING_STR) + debugByteStr("PADDING_LEN_STR", PADDING_LEN_STR) + debugByteStr("PADDED_MSG", PADDED_MSG) + + # Derive Message encryption key (KE) + # S1 = N || 0x00 0x00 0x00 0x1 || 0^{8} || 0^{8} || 0^{16} || R + S1 = N + byteStr(0x01, 4) + byteStr(0x0, 8) + byteStr(0x0, 8) + byteStr(0x0, 16) + R + U1 = hmac.new(K, S1, hashlib.sha512).digest() + KE = U1[0:32] + + # AES_CTR encrypt PADDED_MSG with Nonce N and Key KE + C = AES.new(key=KE, mode=AES.MODE_CTR, nonce=N).encrypt(PADDED_MSG) + + # Derive MAC Key (KM) used to with GMAC to generate T + # S2 = N || 0x00 0x00 0x00 0x2 || 0^{8} || 0^{8} || 0^{16} || 0^{32} + S2 = N + byteStr(0x02, 4) + byteStr(0x0, 8) + byteStr(0x0, 8) + byteStr(0x0, 16) + byteStr(0x0, 32) + U2 = hmac.new(K, S2, hashlib.sha512).digest() + KM = U2[0:32] + + # GMAC the string A || C , using the GMAC key KM and nonce N + T = AES.new(key=KM, mode=AES.MODE_GCM, nonce=N).update(A + C).digest() + + # Derive a one-time pad (U3) from T + # S3 = N || 0x00 0x00 0x00 0x3 || 0^{8} || 0^{8} || T || 0^{32} + S3 = N + byteStr(0x03, 4) + byteStr(0x0, 8) + byteStr(0x0, 8) + T + byteStr(0x0, 32) + U3 = hmac.new(K, S3, hashlib.sha512).digest() + + # Encrypt R and PaddingLen with one-time pad U3 + Y1 = bytes(a ^ b for (a, b) in zip(U3[0:32], R)) + Y0 = bytes(a ^ b for (a, b) in zip(U3[32:34], PADDING_LEN_STR)) + X = Y1 + Y0 + + # Authenticate (Tag) T and R + # S4 = N || 0x00 0x00 0x00 0x4 || A_LEN_STR || M_LEN_STR || T || R + S4 = N + byteStr(0x04, 4) + byteStr(len(A), 8) + byteStr(M_LEN, 8) + T + R + U4 = hmac.new(K, S4, hashlib.sha512).digest() + + # Truncate to 16 bytes tag + Tag = U4[0:16] + + debugByteStr("S1", S1) + debugByteStr("S2", S2) + debugByteStr("S3", S3) + debugByteStr("S4", S4) + debugByteStr("U1", U1) + debugByteStr("U2", U2) + debugByteStr("U3", U3) + debugByteStr("U4", U4) + debugByteStr("Y0", Y0) + debugByteStr("Y1", Y1) + debugByteStr("T", T) + debugByteStr("KE", KE) + debugByteStr("KM", KM) + debugInt("len(C)", len(C)) + debugByteStr("C", C) + debugByteStr("X", X) + debugByteStr("Tag", Tag) + + return C, X, Tag + + +def scram_decrypt(K, N, A, C, X, Tag): + """ + SCRAM Decryption + + Parameters: + K: Key + N: Nonce + A: Additional Authenticated Data + C: Ciphertext + X: Encrypted Random value R and Padding Length + Tag: Tag + + Returns: + M_calculated: The decrypted Message + """ + + # Derive MAC key (KM) + # S2 = N || 0x00 0x00 0x00 0x2 || 0^{8} || 0^{8} || 0^{16} || 0^{32} + S2_calculated = N + byteStr(0x02, 4) + byteStr(0x0, 8) + byteStr(0x0, 8) + byteStr(0x0, 16) + byteStr(0x0, 32) + U2_calculated = hmac.new(K, S2_calculated, hashlib.sha512).digest() + KM_calculated = U2_calculated[0:32] + + # Derive T + # T = GMAC (N, A||C, null) + T_calculated = AES.new(key=KM_calculated, mode=AES.MODE_GCM, nonce=N).update(A + C).digest() + + # Derive one-time pad U3 from T_calculated, + # S3 = N || 0x00 0x00 0x00 0x3 || 0^{8} || 0^{8} || T || 0^{32} + S3_calculated = N + byteStr(0x03, 4) + byteStr(0x0, 8) + byteStr(0x0, 8) + T_calculated + byteStr(0x0, 32) + U3_calculated = hmac.new(K, S3_calculated, hashlib.sha512).digest() + + # Decrypt R and PADDING_LEN, by xor'ing X and U3 + R_calculated = bytes(a ^ b for (a, b) in zip(U3_calculated[0:32], X[0:32])) + PADDING_LEN_STR_calculated = bytes(a ^ b for (a, b) in zip(U3_calculated[32:34], X[32:34])) + + # Derive Message and Padding Lengths + PADDING_LEN_calculated = int.from_bytes(PADDING_LEN_STR_calculated, ENDIANNESS) + M_LEN_calculated = len(C) - PADDING_LEN_calculated + + # Authenticate R + # S4 = N || 0x00 0x00 0x00 0x4 || A_LEN_STR || M_LEN_STR || T || R + S4_calculated = N + byteStr(0x04, 4) + byteStr(len(A), 8) + \ + byteStr(M_LEN_calculated, 8) + T_calculated + R_calculated + U4_calculated = hmac.new(K, S4_calculated, hashlib.sha512).digest() + Tag_calculated = U4_calculated[0:16] + + if (Tag == Tag_calculated): + print("PASSED: Authentication") + else: + print("FAILED: Authentication") + return None + + # Now that Ciphertext and other parameters are authenticated, we can decrypt Ciphertext to get Plaintext + # Derive Message Encryption key (KE) + # S1 = N || 0x00 0x00 0x00 0x1 || 0^{8} || 0^{8} || 0^{16} || R + S1_calculated = N + byteStr(0x01, 4) + byteStr(0x0, 8) + byteStr(0x0, 8) + byteStr(0x0, 16) + R_calculated + U1_calculated = hmac.new(K, S1_calculated, hashlib.sha512).digest() + KE_calculated = U1_calculated[0:32] + + # Decrypt Ciphertext + PADDED_MSG_calculated = AES.new(key=KE_calculated, mode=AES.MODE_CTR, nonce=N).decrypt(C) + + # Strip off padding bytes + M_calculated = PADDED_MSG_calculated[0:M_LEN_calculated] + + if DEBUG_ENABLED: + print("\nDecryption Debug Info: ") + debugByteStr("S1_calculated", S1_calculated) + debugByteStr("S2_calculated", S2_calculated) + debugByteStr("S3_calculated", S3_calculated) + debugByteStr("S4_calculated", S4_calculated) + debugByteStr("U1_calculated", U1_calculated) + debugByteStr("U2_calculated", U2_calculated) + debugByteStr("U3_calculated", U3_calculated) + debugByteStr("U4_calculated", U4_calculated) + debugByteStr("T_calculated", T_calculated) + debugByteStr("R_calculated", R_calculated) + debugByteStr("KE_calculated", KE_calculated) + debugByteStr("KM_calculated", KM_calculated) + debugByteStr("PADDED_MSG_calculated", PADDED_MSG_calculated) + debugByteStr("M_calculated", M_calculated) + + return M_calculated + + +def main(argv): + # Generate Random 28 Byte Message + M = rndfile.read(28) + debugByteStr("M", M) + + # Generate Random 28 Byte Additional Authenticated Data + A = rndfile.read(28) + debugByteStr("A", A) + + # Generate Random 12 Byte Key + N = rndfile.read(12) + + # Frame Size. Messages will be padded up to the next Frame size before being encrypted. + F = 32 + debugInt("F", F) + + K = scram_generate_key() + + C, X, Tag = scram_encrypt(K, N, A, M, F) + + M_calculated = scram_decrypt(K, N, A, C, X, Tag) + + if (M != M_calculated): + print("FAILED: Decryption") + else: + print("PASSED: Decryption") + + return + + +if __name__ == "__main__": + sys.exit(main(sys.argv[1:])) diff --git a/scripts/s2n_safety_macros.py b/scripts/s2n_safety_macros.py index 157b6170dd3..fbd5a5d977b 100644 --- a/scripts/s2n_safety_macros.py +++ b/scripts/s2n_safety_macros.py @@ -44,46 +44,46 @@ """ POSIX = dict( - name = "POSIX", - is_ok = "(result) > S2N_FAILURE", - ok = "S2N_SUCCESS", - error = "S2N_FAILURE", - ret = "int", - ret_doc = "`int` (POSIX error signal)", - expect_ok = "EXPECT_SUCCESS", - expect_err = "EXPECT_FAILURE_WITH_ERRNO", + name="POSIX", + is_ok="(result) > S2N_FAILURE", + ok="S2N_SUCCESS", + error="S2N_FAILURE", + ret="int", + ret_doc="`int` (POSIX error signal)", + expect_ok="EXPECT_SUCCESS", + expect_err="EXPECT_FAILURE_WITH_ERRNO", ) PTR = dict( - name = "PTR", - is_ok = "(result) != NULL", - ok = '"ok"', - error = "NULL", - ret = "const char*", - ret_doc = "a pointer", - expect_ok = "EXPECT_NOT_NULL", - expect_err = "EXPECT_NULL_WITH_ERRNO", + name="PTR", + is_ok="(result) != NULL", + ok='"ok"', + error="NULL", + ret="const char*", + ret_doc="a pointer", + expect_ok="EXPECT_NOT_NULL", + expect_err="EXPECT_NULL_WITH_ERRNO", ) RESULT = dict( - name = "RESULT", - is_ok = "s2n_result_is_ok(result)", - ok = "S2N_RESULT_OK", - error = "S2N_RESULT_ERROR", - ret = "s2n_result", - ret_doc = "`S2N_RESULT`", - expect_ok = "EXPECT_OK", - expect_err = "EXPECT_ERROR_WITH_ERRNO", + name="RESULT", + is_ok="s2n_result_is_ok(result)", + ok="S2N_RESULT_OK", + error="S2N_RESULT_ERROR", + ret="s2n_result", + ret_doc="`S2N_RESULT`", + expect_ok="EXPECT_OK", + expect_err="EXPECT_ERROR_WITH_ERRNO", ) DEFAULT = dict( - name = "", - is_ok = RESULT['is_ok'], - ok = RESULT['ok'], - error = RESULT['error'], - ret = RESULT['ret'], - expect_ok = RESULT['expect_ok'], - expect_err = RESULT['expect_err'], + name="", + is_ok=RESULT['is_ok'], + ok=RESULT['ok'], + error=RESULT['error'], + ret=RESULT['ret'], + expect_ok=RESULT['expect_ok'], + expect_err=RESULT['expect_err'], ) # TODO add DEFAULT and remove RESULT once all PR branches are up-to-date @@ -91,10 +91,11 @@ max_prefix_len = max(map(lambda c: len(c['name']), CONTEXTS)) + def cmp_check(op): return '__S2N_ENSURE((a) ' + op + ' (b), {bail}(S2N_ERR_SAFETY))' - ## TODO ensure type compatibility + # TODO ensure type compatibility # return '''\\ # do {{ \\ # static_assert(__builtin_types_compatible_p(__typeof(a), __typeof(b)), "types do not match"); \\ @@ -104,52 +105,53 @@ def cmp_check(op): # }} while(0) # ''' + MACROS = { 'BAIL(error)': dict( - doc = 'Sets the global `s2n_errno` to `error` and returns with an `{error}`', - impl = 'do {{ _S2N_ERROR((error)); __S2N_ENSURE_CHECKED_RETURN({error}); }} while (0)', - harness = ''' + doc='Sets the global `s2n_errno` to `error` and returns with an `{error}`', + impl='do {{ _S2N_ERROR((error)); __S2N_ENSURE_CHECKED_RETURN({error}); }} while (0)', + harness=''' static {ret} {bail}_harness() {{ {bail}(S2N_ERR_SAFETY); return {ok}; }} ''', - tests = [ + tests=[ '{expect_err}({bail}_harness(), S2N_ERR_SAFETY);' ], ), 'ENSURE(condition, error)': dict( - doc = 'Ensures the `condition` is `true`, otherwise the function will `{bail}` with `error`', - impl = '__S2N_ENSURE((condition), {bail}(error))', - harness = ''' + doc='Ensures the `condition` is `true`, otherwise the function will `{bail}` with `error`', + impl='__S2N_ENSURE((condition), {bail}(error))', + harness=''' static {ret} {prefix}ENSURE_harness(bool is_ok) {{ {prefix}ENSURE(is_ok, S2N_ERR_SAFETY); return {ok}; }} ''', - tests = [ + tests=[ '{expect_ok}({prefix}ENSURE_harness(true));', '{expect_err}({prefix}ENSURE_harness(false), S2N_ERR_SAFETY);' ], ), 'DEBUG_ENSURE(condition, error)': dict( - doc = ''' + doc=''' Ensures the `condition` is `true`, otherwise the function will `{bail}` with `error` NOTE: The condition will _only_ be checked when the code is compiled in debug mode. In release mode, the check is removed. ''', - impl = '__S2N_ENSURE_DEBUG((condition), {bail}(error))', - harness = ''' + impl='__S2N_ENSURE_DEBUG((condition), {bail}(error))', + harness=''' static {ret} {prefix}DEBUG_ENSURE_harness(bool is_ok) {{ {prefix}DEBUG_ENSURE(is_ok, S2N_ERR_SAFETY); return {ok}; }} ''', - tests = [ + tests=[ '{expect_ok}({prefix}DEBUG_ENSURE_harness(true));', '#ifdef NDEBUG', '{expect_ok}({prefix}DEBUG_ENSURE_harness(false));', @@ -159,30 +161,30 @@ def cmp_check(op): ], ), 'ENSURE_OK(result, error)': dict( - doc = ''' + doc=''' Ensures `{is_ok}`, otherwise the function will `{bail}` with `error` This can be useful for overriding the global `s2n_errno` ''', - impl = '__S2N_ENSURE({is_ok}, {bail}(error))', - harness = ''' + impl='__S2N_ENSURE({is_ok}, {bail}(error))', + harness=''' static {ret} {prefix}ENSURE_OK_harness(bool is_ok) {{ {prefix}ENSURE_OK({prefix}ENSURE_harness(is_ok), S2N_ERR_IO); return {ok}; }} ''', - tests = [ + tests=[ '{expect_ok}({prefix}ENSURE_OK_harness(true));', '{expect_err}({prefix}ENSURE_OK_harness(false), S2N_ERR_IO);' ], ), 'ENSURE_GTE(a, b)': dict( - doc = ''' + doc=''' Ensures `a` is greater than or equal to `b`, otherwise the function will `{bail}` with a `S2N_ERR_SAFETY` error ''', - impl = cmp_check('>='), - harness = ''' + impl=cmp_check('>='), + harness=''' static {ret} {prefix}ENSURE_GTE_harness_uint32(uint32_t a, uint32_t b) {{ {prefix}ENSURE_GTE(a, b); @@ -199,7 +201,7 @@ def cmp_check(op): return {ok}; }} ''', - tests = [ + tests=[ '{expect_ok}({prefix}ENSURE_GTE_harness_uint32(0, 0));', '{expect_ok}({prefix}ENSURE_GTE_harness_uint32(1, 0));', '{expect_err}({prefix}ENSURE_GTE_harness_uint32(0, 1), S2N_ERR_SAFETY);', @@ -209,11 +211,11 @@ def cmp_check(op): ], ), 'ENSURE_LTE(a, b)': dict( - doc = ''' + doc=''' Ensures `a` is less than or equal to `b`, otherwise the function will `{bail}` with a `S2N_ERR_SAFETY` error ''', - impl = cmp_check('<='), - harness = ''' + impl=cmp_check('<='), + harness=''' static {ret} {prefix}ENSURE_LTE_harness_uint32(uint32_t a, uint32_t b) {{ {prefix}ENSURE_LTE(a, b); @@ -230,7 +232,7 @@ def cmp_check(op): return {ok}; }} ''', - tests = [ + tests=[ '{expect_ok}({prefix}ENSURE_LTE_harness_uint32(0, 0));', '{expect_ok}({prefix}ENSURE_LTE_harness_uint32(0, 1));', '{expect_err}({prefix}ENSURE_LTE_harness_uint32(1, 0), S2N_ERR_SAFETY);', @@ -240,11 +242,11 @@ def cmp_check(op): ], ), 'ENSURE_GT(a, b)': dict( - doc = ''' + doc=''' Ensures `a` is greater than `b`, otherwise the function will `{bail}` with a `S2N_ERR_SAFETY` error ''', - impl = cmp_check('>'), - harness = ''' + impl=cmp_check('>'), + harness=''' static {ret} {prefix}ENSURE_GT_harness_uint32(uint32_t a, uint32_t b) {{ {prefix}ENSURE_GT(a, b); @@ -261,7 +263,7 @@ def cmp_check(op): return {ok}; }} ''', - tests = [ + tests=[ '{expect_err}({prefix}ENSURE_GT_harness_uint32(0, 0), S2N_ERR_SAFETY);', '{expect_ok}({prefix}ENSURE_GT_harness_uint32(1, 0));', '{expect_err}({prefix}ENSURE_GT_harness_uint32(0, 1), S2N_ERR_SAFETY);', @@ -271,11 +273,11 @@ def cmp_check(op): ], ), 'ENSURE_LT(a, b)': dict( - doc = ''' + doc=''' Ensures `a` is less than `b`, otherwise the function will `{bail}` with a `S2N_ERR_SAFETY` error ''', - impl = cmp_check('<'), - harness = ''' + impl=cmp_check('<'), + harness=''' static {ret} {prefix}ENSURE_LT_harness_uint32(uint32_t a, uint32_t b) {{ {prefix}ENSURE_LT(a, b); @@ -292,7 +294,7 @@ def cmp_check(op): return {ok}; }} ''', - tests = [ + tests=[ '{expect_err}({prefix}ENSURE_LT_harness_uint32(0, 0), S2N_ERR_SAFETY);', '{expect_ok}({prefix}ENSURE_LT_harness_uint32(0, 1));', '{expect_err}({prefix}ENSURE_LT_harness_uint32(1, 0), S2N_ERR_SAFETY);', @@ -302,11 +304,11 @@ def cmp_check(op): ], ), 'ENSURE_EQ(a, b)': dict( - doc = ''' + doc=''' Ensures `a` is equal to `b`, otherwise the function will `{bail}` with a `S2N_ERR_SAFETY` error ''', - impl = cmp_check('=='), - harness = ''' + impl=cmp_check('=='), + harness=''' static {ret} {prefix}ENSURE_EQ_harness_uint32(uint32_t a, uint32_t b) {{ {prefix}ENSURE_EQ(a, b); @@ -321,7 +323,7 @@ def cmp_check(op): return {ok}; }} ''', - tests = [ + tests=[ '{expect_ok}({prefix}ENSURE_EQ_harness_uint32(0, 0));', '{expect_err}({prefix}ENSURE_EQ_harness_uint32(1, 0), S2N_ERR_SAFETY);', '{expect_ok}({prefix}ENSURE_EQ_harness_int32(-1, -1));', @@ -329,11 +331,11 @@ def cmp_check(op): ], ), 'ENSURE_NE(a, b)': dict( - doc = ''' + doc=''' Ensures `a` is not equal to `b`, otherwise the function will `{bail}` with a `S2N_ERR_SAFETY` error ''', - impl = cmp_check('!='), - harness = ''' + impl=cmp_check('!='), + harness=''' static {ret} {prefix}ENSURE_NE_harness_uint32(uint32_t a, uint32_t b) {{ {prefix}ENSURE_NE(a, b); @@ -348,7 +350,7 @@ def cmp_check(op): return {ok}; }} ''', - tests = [ + tests=[ '{expect_ok}({prefix}ENSURE_NE_harness_uint32(1, 0));', '{expect_err}({prefix}ENSURE_NE_harness_uint32(0, 0), S2N_ERR_SAFETY);', '{expect_ok}({prefix}ENSURE_NE_harness_int32(-2, -1));', @@ -356,8 +358,8 @@ def cmp_check(op): ], ), 'ENSURE_INCLUSIVE_RANGE(min, n, max)': dict( - doc = 'Ensures `min <= n <= max`, otherwise the function will `{bail}` with `S2N_ERR_SAFETY`', - impl = ''' \\ + doc='Ensures `min <= n <= max`, otherwise the function will `{bail}` with `S2N_ERR_SAFETY`', + impl=''' \\ do {{ \\ __typeof(n) __tmp_n = ( n ); \\ __typeof(n) __tmp_min = ( min ); \\ @@ -365,7 +367,7 @@ def cmp_check(op): {prefix}ENSURE_GTE(__tmp_n, __tmp_min); \\ {prefix}ENSURE_LTE(__tmp_n, __tmp_max); \\ }} while(0)''', - harness = ''' + harness=''' static {ret} {prefix}ENSURE_INCLUSIVE_RANGE_harness_uint32(uint32_t a, uint32_t b, uint32_t c) {{ {prefix}ENSURE_INCLUSIVE_RANGE(a, b, c); @@ -378,7 +380,7 @@ def cmp_check(op): return {ok}; }} ''', - tests = [ + tests=[ '{expect_err}({prefix}ENSURE_INCLUSIVE_RANGE_harness_uint32(1, 0, 2), S2N_ERR_SAFETY);', '{expect_ok}({prefix}ENSURE_INCLUSIVE_RANGE_harness_uint32(1, 1, 2));', '{expect_ok}({prefix}ENSURE_INCLUSIVE_RANGE_harness_uint32(1, 2, 2));', @@ -391,8 +393,8 @@ def cmp_check(op): ], ), 'ENSURE_EXCLUSIVE_RANGE(min, n, max)': dict( - doc = 'Ensures `min < n < max`, otherwise the function will `{bail}` with `S2N_ERR_SAFETY`', - impl = ''' \\ + doc='Ensures `min < n < max`, otherwise the function will `{bail}` with `S2N_ERR_SAFETY`', + impl=''' \\ do {{ \\ __typeof(n) __tmp_n = ( n ); \\ __typeof(n) __tmp_min = ( min ); \\ @@ -400,7 +402,7 @@ def cmp_check(op): {prefix}ENSURE_GT(__tmp_n, __tmp_min); \\ {prefix}ENSURE_LT(__tmp_n, __tmp_max); \\ }} while(0)''', - harness = ''' + harness=''' static {ret} {prefix}ENSURE_EXCLUSIVE_RANGE_harness_uint32(uint32_t a, uint32_t b, uint32_t c) {{ {prefix}ENSURE_EXCLUSIVE_RANGE(a, b, c); @@ -413,7 +415,7 @@ def cmp_check(op): return {ok}; }} ''', - tests = [ + tests=[ '{expect_err}({prefix}ENSURE_EXCLUSIVE_RANGE_harness_uint32(1, 0, 3), S2N_ERR_SAFETY);', '{expect_err}({prefix}ENSURE_EXCLUSIVE_RANGE_harness_uint32(1, 1, 3), S2N_ERR_SAFETY);', '{expect_ok}({prefix}ENSURE_EXCLUSIVE_RANGE_harness_uint32(1, 2, 3));', @@ -428,32 +430,32 @@ def cmp_check(op): ], ), 'ENSURE_REF(x)': dict( - doc = 'Ensures `x` is a readable reference, otherwise the function will `{bail}` with `S2N_ERR_NULL`', - impl = '__S2N_ENSURE(S2N_OBJECT_PTR_IS_READABLE(x), {bail}(S2N_ERR_NULL))', - harness = ''' + doc='Ensures `x` is a readable reference, otherwise the function will `{bail}` with `S2N_ERR_NULL`', + impl='__S2N_ENSURE(S2N_OBJECT_PTR_IS_READABLE(x), {bail}(S2N_ERR_NULL))', + harness=''' static {ret} {prefix}ENSURE_REF_harness(const char* str) {{ {prefix}ENSURE_REF(str); return {ok}; }} ''', - tests = [ + tests=[ '{expect_ok}({prefix}ENSURE_REF_harness(""));', '{expect_ok}({prefix}ENSURE_REF_harness("ok"));', '{expect_err}({prefix}ENSURE_REF_harness(NULL), S2N_ERR_NULL);', ], ), 'ENSURE_MUT(x)': dict( - doc = 'Ensures `x` is a mutable reference, otherwise the function will `{bail}` with `S2N_ERR_NULL`', - impl = '__S2N_ENSURE(S2N_OBJECT_PTR_IS_WRITABLE(x), {bail}(S2N_ERR_NULL))', - harness = ''' + doc='Ensures `x` is a mutable reference, otherwise the function will `{bail}` with `S2N_ERR_NULL`', + impl='__S2N_ENSURE(S2N_OBJECT_PTR_IS_WRITABLE(x), {bail}(S2N_ERR_NULL))', + harness=''' static {ret} {prefix}ENSURE_MUT_harness(uint32_t* v) {{ {prefix}ENSURE_MUT(v); return {ok}; }} ''', - tests = [ + tests=[ 'uint32_t {prefix}ensure_mut_test = 0;', '{expect_ok}({prefix}ENSURE_MUT_harness(&{prefix}ensure_mut_test));', '{prefix}ensure_mut_test = 1;', @@ -462,15 +464,15 @@ def cmp_check(op): ], ), 'PRECONDITION(result)': dict( - doc = ''' + doc=''' Ensures the `result` is `S2N_RESULT_OK`, otherwise the function will return an error signal `{prefix}PRECONDITION` should be used at the beginning of a function to make assertions about the provided arguments. By default, it is functionally equivalent to `{prefix}GUARD_RESULT(result)` but can be altered by a testing environment to provide additional guarantees. ''', - impl = '{prefix}GUARD_RESULT(__S2N_ENSURE_PRECONDITION((result)))', - harness = ''' + impl='{prefix}GUARD_RESULT(__S2N_ENSURE_PRECONDITION((result)))', + harness=''' static S2N_RESULT {prefix}PRECONDITION_harness_check(bool is_ok) {{ RESULT_ENSURE(is_ok, S2N_ERR_SAFETY); @@ -483,13 +485,13 @@ def cmp_check(op): return {ok}; }} ''', - tests = [ + tests=[ '{expect_ok}({prefix}PRECONDITION_harness({prefix}PRECONDITION_harness_check(true)));', '{expect_err}({prefix}PRECONDITION_harness({prefix}PRECONDITION_harness_check(false)), S2N_ERR_SAFETY);', ], ), 'POSTCONDITION(result)': dict( - doc = ''' + doc=''' Ensures the `result` is `S2N_RESULT_OK`, otherwise the function will return an error signal NOTE: The condition will _only_ be checked when the code is compiled in debug mode. @@ -500,8 +502,8 @@ def cmp_check(op): In production builds, it becomes a no-op. This can also be altered by a testing environment to provide additional guarantees. ''', - impl = '{prefix}GUARD_RESULT(__S2N_ENSURE_POSTCONDITION((result)))', - harness = ''' + impl='{prefix}GUARD_RESULT(__S2N_ENSURE_POSTCONDITION((result)))', + harness=''' static S2N_RESULT {prefix}POSTCONDITION_harness_check(bool is_ok) {{ RESULT_ENSURE(is_ok, S2N_ERR_SAFETY); @@ -514,7 +516,7 @@ def cmp_check(op): return {ok}; }} ''', - tests = [ + tests=[ '{expect_ok}({prefix}POSTCONDITION_harness({prefix}POSTCONDITION_harness_check(true)));', '#ifdef NDEBUG', '{expect_ok}({prefix}POSTCONDITION_harness({prefix}POSTCONDITION_harness_check(false)));', @@ -524,7 +526,7 @@ def cmp_check(op): ], ), 'CHECKED_MEMCPY(destination, source, len)': dict( - doc = ''' + doc=''' Performs a safer memcpy. The following checks are performed: @@ -537,15 +539,15 @@ def cmp_check(op): * The size of the data pointed to by both the `destination` and `source` parameters, shall be at least `len` bytes. ''', - impl = '__S2N_ENSURE_SAFE_MEMCPY((destination), (source), (len), {prefix}ENSURE_REF)', - harness = ''' + impl='__S2N_ENSURE_SAFE_MEMCPY((destination), (source), (len), {prefix}ENSURE_REF)', + harness=''' static {ret} {prefix}CHECKED_MEMCPY_harness(uint32_t* dest, uint32_t* source, size_t len) {{ {prefix}CHECKED_MEMCPY(dest, source, len); return {ok}; }} ''', - tests = [ + tests=[ 'uint32_t {prefix}_checked_memcpy_dest = 1;', 'uint32_t {prefix}_checked_memcpy_source = 2;', '{expect_ok}({prefix}CHECKED_MEMCPY_harness(&{prefix}_checked_memcpy_dest, &{prefix}_checked_memcpy_source, 0));', @@ -557,7 +559,7 @@ def cmp_check(op): ], ), 'CHECKED_MEMSET(destination, value, len)': dict( - doc = ''' + doc=''' Performs a safer memset The following checks are performed: @@ -569,15 +571,15 @@ def cmp_check(op): * The size of the data pointed to by the `destination` parameter shall be at least `len` bytes. ''', - impl = '__S2N_ENSURE_SAFE_MEMSET((destination), (value), (len), {prefix}ENSURE_REF)', - harness = ''' + impl='__S2N_ENSURE_SAFE_MEMSET((destination), (value), (len), {prefix}ENSURE_REF)', + harness=''' static {ret} {prefix}CHECKED_MEMSET_harness(uint32_t* dest, uint8_t value, size_t len) {{ {prefix}CHECKED_MEMSET(dest, value, len); return {ok}; }} ''', - tests = [ + tests=[ 'uint32_t {prefix}_checked_memset_dest = 1;', '{expect_ok}({prefix}CHECKED_MEMSET_harness(&{prefix}_checked_memset_dest, 0x42, 0));', 'EXPECT_EQUAL({prefix}_checked_memset_dest, 1);', @@ -587,31 +589,31 @@ def cmp_check(op): ], ), 'GUARD(result)': dict( - doc = 'Ensures `{is_ok}`, otherwise the function will return `{error}`', - impl = '__S2N_ENSURE({is_ok}, __S2N_ENSURE_CHECKED_RETURN({error}))', - harness = ''' + doc='Ensures `{is_ok}`, otherwise the function will return `{error}`', + impl='__S2N_ENSURE({is_ok}, __S2N_ENSURE_CHECKED_RETURN({error}))', + harness=''' static {ret} {prefix}GUARD_harness({ret} result) {{ {prefix}GUARD(result); return {ok}; }} ''', - tests = [ + tests=[ '{expect_ok}({prefix}GUARD_harness({prefix}ENSURE_harness(true)));', '{expect_err}({prefix}GUARD_harness({prefix}ENSURE_harness(false)), S2N_ERR_SAFETY);', ], ), 'GUARD_OSSL(result, error)': dict( - doc = 'Ensures `result == _OSSL_SUCCESS`, otherwise the function will `{bail}` with `error`', - impl = '__S2N_ENSURE((result) == _OSSL_SUCCESS, {bail}(error))', - harness = ''' + doc='Ensures `result == _OSSL_SUCCESS`, otherwise the function will `{bail}` with `error`', + impl='__S2N_ENSURE((result) == _OSSL_SUCCESS, {bail}(error))', + harness=''' static {ret} {prefix}GUARD_OSSL_harness(int result, int error) {{ {prefix}GUARD_OSSL(result, error); return {ok}; }} ''', - tests = [ + tests=[ '{expect_ok}({prefix}GUARD_OSSL_harness(1, S2N_ERR_SAFETY));', '{expect_err}({prefix}GUARD_OSSL_harness(0, S2N_ERR_SAFETY), S2N_ERR_SAFETY);', ], @@ -620,6 +622,7 @@ def cmp_check(op): max_macro_len = max(map(len, MACROS.keys())) + 8 + def push_macro(args): macro_indent = ' ' * (max_macro_len - len(args['macro'])) @@ -643,6 +646,7 @@ def push_macro(args): return h + for context in CONTEXTS: # initialize contexts if len(context['name']) > 0: @@ -666,6 +670,7 @@ def push_macro(args): checks = [] deprecation_message = "DEPRECATED: all methods (except those in s2n.h) should return s2n_result." + def push_doc(args): args['doc'] = textwrap.dedent(args['doc']).format_map(args).strip() @@ -676,6 +681,7 @@ def push_doc(args): """).format_map(args) + for context in CONTEXTS: docs += textwrap.dedent(""" ## Macros for functions that return {ret_doc} @@ -712,7 +718,7 @@ def push_doc(args): doc = (deprecation_message + "\n\n" + doc) if other == context: - continue; + continue impl = '__S2N_ENSURE({is_ok}, __S2N_ENSURE_CHECKED_RETURN({error}))' args = { @@ -729,6 +735,7 @@ def push_doc(args): docs += push_doc(args) header += push_macro(args) + def cleanup(contents): # Remove any unnecessary generated "X_GUARD_X"s, like "RESULT_GUARD_RESULT" for context in CONTEXTS: @@ -737,11 +744,13 @@ def cleanup(contents): contents = contents.replace(x_guard_x, x_guard) return contents + def write(f, contents): contents = cleanup(contents) with open(f, "w") as header_file: header_file.write(contents) + write("utils/s2n_safety_macros.h", header) test = copyright + ''' @@ -781,4 +790,3 @@ def write(f, contents): write("tests/unit/s2n_safety_macros_test.c", test) write("docs/SAFETY-MACROS.md", docs) - diff --git a/tests/cbmc/proofs/lib/summarize.py b/tests/cbmc/proofs/lib/summarize.py index 50dbcc33ce4..ab3b526bc2c 100644 --- a/tests/cbmc/proofs/lib/summarize.py +++ b/tests/cbmc/proofs/lib/summarize.py @@ -139,5 +139,5 @@ def print_proof_results(out_file): logging.basicConfig(format="%(levelname)s: %(message)s") try: print_proof_results(args.run_file) - except Exception as ex: # pylint: disable=broad-except + except Exception as ex: # pylint: disable=broad-except logging.critical("Could not print results. Exception: %s", str(ex)) diff --git a/tests/cbmc/proofs/run-cbmc-proofs.py b/tests/cbmc/proofs/run-cbmc-proofs.py index e0f6454c146..9556be47078 100755 --- a/tests/cbmc/proofs/run-cbmc-proofs.py +++ b/tests/cbmc/proofs/run-cbmc-proofs.py @@ -237,6 +237,7 @@ def run_build(litani, jobs, fail_on_proof_failure, summarize): logging.error("One or more proofs failed") sys.exit(10) + def get_litani_path(proof_root): cmd = [ "make", @@ -301,7 +302,7 @@ def should_enable_pools(litani_caps, args): return "pools" in litani_caps -async def configure_proof_dirs( # pylint: disable=too-many-arguments +async def configure_proof_dirs( # pylint: disable=too-many-arguments queue, counter, proof_uids, enable_pools, enable_memory_profiling, report_target, debug): while True: print_counter(counter) @@ -350,7 +351,7 @@ def add_tool_version_job(): sys.exit(1) -async def main(): # pylint: disable=too-many-locals +async def main(): # pylint: disable=too-many-locals args = get_args() set_up_logging(args.verbose) diff --git a/tests/sidetrail/bin/bpl_trace_to_c.py b/tests/sidetrail/bin/bpl_trace_to_c.py index e730ec1a7df..bbeed38754c 100755 --- a/tests/sidetrail/bin/bpl_trace_to_c.py +++ b/tests/sidetrail/bin/bpl_trace_to_c.py @@ -7,6 +7,7 @@ assert len(args) == 2, "usage is " print args + def make_bpl_c_mapping(bpl_filename): linenum = 0 bpl_c_mapping = {} @@ -26,13 +27,14 @@ def make_bpl_c_mapping(bpl_filename): return bpl_c_mapping + def convert_trace_to_c(trace_filename, bpl_c_mapping): with open(trace_filename) as tracefile: for line in tracefile: matchObj = re.match(r'.*\.bpl\((\d+),\d+\).*', line) if matchObj: bpl_line = int(matchObj.group(1)) - if bpl_line in bpl_c_mapping: + if bpl_line in bpl_c_mapping: mapping = bpl_c_mapping[bpl_line] if len(mapping) > 0: print mapping[0] @@ -43,5 +45,6 @@ def convert_trace_to_c(trace_filename, bpl_c_mapping): print "\t\t", matchObj.group(0), matchObj.group(1) + bpl_c_mapping = make_bpl_c_mapping(args[0]) convert_trace_to_c(args[1], bpl_c_mapping) diff --git a/tests/unit/kats/generate_pq_hybrid_tls13_handshake_kats.py b/tests/unit/kats/generate_pq_hybrid_tls13_handshake_kats.py index 85e860e7175..f7608aa2ab2 100755 --- a/tests/unit/kats/generate_pq_hybrid_tls13_handshake_kats.py +++ b/tests/unit/kats/generate_pq_hybrid_tls13_handshake_kats.py @@ -210,9 +210,11 @@ }, ] + def hkdf_extract(key: bytes, info: bytes, hash_alg: str): return hmac.new(key, info, hash_alg).digest() + def hkdf_expand_label(key: bytes, label: str, context: bytes, hash_alg: str): label_arr = [0, hashlib.new(hash_alg).digest_size, len("tls13 ") + len(label)] for c in "tls13 ": @@ -228,6 +230,7 @@ def hkdf_expand_label(key: bytes, label: str, context: bytes, hash_alg: str): return hkdf_extract(key, bytearray(label_arr) + bytes([1]), hash_alg) + def compute_secrets(input_vector: dict): shared_secret = bytes.fromhex(input_vector["ec_shared_secret"] + input_vector["pq_shared_secret"]) hash_alg = input_vector["cipher_suite"].split("_")[-1].lower() @@ -241,11 +244,14 @@ def compute_secrets(input_vector: dict): secrets = {"early_secret": hkdf_extract(zeros, zeros, hash_alg)} secrets["derived_secret"] = hkdf_expand_label(secrets["early_secret"], "derived", empty_hash, hash_alg) secrets["handshake_secret"] = hkdf_extract(secrets["derived_secret"], shared_secret, hash_alg) - secrets["client_traffic_secret"] = hkdf_expand_label(secrets["handshake_secret"], "c hs traffic", transcript_hash, hash_alg) - secrets["server_traffic_secret"] = hkdf_expand_label(secrets["handshake_secret"], "s hs traffic", transcript_hash, hash_alg) + secrets["client_traffic_secret"] = hkdf_expand_label( + secrets["handshake_secret"], "c hs traffic", transcript_hash, hash_alg) + secrets["server_traffic_secret"] = hkdf_expand_label( + secrets["handshake_secret"], "s hs traffic", transcript_hash, hash_alg) return secrets + def main(): output = "" @@ -262,5 +268,6 @@ def main(): print(output) + if __name__ == '__main__': main()