diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ffbc92..4e208b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## 0.10.3 - TBD + +* Support input password string encoded with the `surrogatepass` error option + * This allows the caller to provide a password for a gMSA or machine account that could contain invalid surrogate pairs for both NTLM and Kerberos auth. + ## 0.10.2 - 2023-10-04 * Another rename of the `sspi` package dependency to `sspilib` diff --git a/src/spnego/_gss.py b/src/spnego/_gss.py index 33da624..380277e 100644 --- a/src/spnego/_gss.py +++ b/src/spnego/_gss.py @@ -145,16 +145,16 @@ def _get_gssapi_credential( if isinstance(cred, KerberosKeytab): username = cred.principal or "" - password = cred.keytab + password = to_bytes(cred.keytab) is_keytab = True else: username = cred.username - password = cred.password + password = _encode_kerb_password(cred.password) is_keytab = False raw_cred = _kinit( to_bytes(username), - to_bytes(password), + password, forwardable=forwardable, is_keytab=is_keytab, ) @@ -293,6 +293,40 @@ def _gss_acquire_cred_from_ccache( return gssapi_creds +def _encode_kerb_password( + value: str, +) -> bytes: + """Encode string to use for Kerberos passwords. + + Encodes the input string to use with Kerberos functions as a password. This + is a special encoding method to ensure that any invalid surrogate chars are + encoded as the replacement char U+FFFD. This is needed when dealing with + randomly generated passwords like gMSA or machine accounts. The raw UTF-16 + bytes can be encoded in a string with the following: + + b"...".decode("utf-16-le", errors="surrogatepass") + + The invalid surrogate pairs in the UTF-16 byte sequence will be preserved + in the str value allowing this function to replace it as needed. This + means the value can be used with both NTLM and Kerberos authentication with + the same value. + + Args: + value: The string to encode to bytes. + + Returns: + bytes: The encoded string value. + """ + b_data = [] + for c in value: + try: + b_data.append(c.encode("utf-8", errors="strict")) + except UnicodeEncodeError: + b_data.append(b"\xEF\xBF\xBD") + + return b"".join(b_data) + + class GSSAPIProxy(ContextProxy): """GSSAPI proxy class for GSSAPI on Linux. @@ -313,7 +347,6 @@ def __init__( options: NegotiateOptions = NegotiateOptions.none, **kwargs: typing.Any, ) -> None: - if not HAS_GSSAPI: raise ImportError("GSSAPIProxy requires the Python gssapi library: %s" % GSSAPI_IMP_ERR) diff --git a/src/spnego/_ntlm_raw/crypto.py b/src/spnego/_ntlm_raw/crypto.py index fe1135a..b16d50b 100644 --- a/src/spnego/_ntlm_raw/crypto.py +++ b/src/spnego/_ntlm_raw/crypto.py @@ -331,7 +331,7 @@ def lmowfv1(password: str) -> bytes: # Fix the password to upper case and pad the length to exactly 14 bytes. While it is true LM only authentication # will fail if the password exceeds 14 bytes typically it is used in conjunction with the NTv1 hash which has no # such restrictions. - b_password = password.upper().encode("utf-8").ljust(14, b"\x00")[:14] + b_password = password.upper().encode("utf-8", errors="surrogatepass").ljust(14, b"\x00")[:14] b_hash = io.BytesIO() for start, end in [(0, 7), (7, 14)]: @@ -366,7 +366,7 @@ def ntowfv1(password: str) -> bytes: if is_ntlm_hash(password): return base64.b16decode(password.split(":")[1].upper()) - return md4(password.encode("utf-16-le")) + return md4(password.encode("utf-16-le", errors="surrogatepass")) def ntowfv2(username: str, nt_hash: bytes, domain_name: typing.Optional[str]) -> bytes: diff --git a/src/spnego/_version.py b/src/spnego/_version.py index e86d169..300b5f3 100644 --- a/src/spnego/_version.py +++ b/src/spnego/_version.py @@ -1,4 +1,4 @@ # Copyright: (c) 2020, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) -__version__ = "0.10.2" +__version__ = "0.10.3" diff --git a/tests/test_gss.py b/tests/test_gss.py index 7718f98..294bf71 100644 --- a/tests/test_gss.py +++ b/tests/test_gss.py @@ -175,3 +175,27 @@ def test_gssapi_no_valid_acceptor_cred(): with pytest.raises(InvalidCredentialError, match="No applicable credentials available"): spnego._gss.GSSAPIProxy(cred, protocol=protocol, usage="accept") + + +@pytest.mark.parametrize( + "value, expected", + [ + ("", b""), + ("foo", b"foo"), + ("café", b"caf\xC3\xA9"), + ( + b"\xDD\xBA\xE2\xD9\x12\x53".decode("utf-16-le", errors="surrogatepass"), + b"\xEB\xAB\x9D\xEF\xBF\xBD\xE5\x8C\x92", + ), + ( + b"\xDD\xBA\xE2\xD9\x12\x53".decode("utf-16-le", errors="replace"), + b"\xEB\xAB\x9D\xEF\xBF\xBD\xE5\x8C\x92", + ), + ], +) +def test_encode_password( + value: str, + expected: bytes, +) -> None: + actual = spnego._gss._encode_kerb_password(value) + assert actual == expected diff --git a/tests/test_ntlm.py b/tests/test_ntlm.py index 8dc5a00..88fb2f4 100644 --- a/tests/test_ntlm.py +++ b/tests/test_ntlm.py @@ -4,6 +4,7 @@ import base64 import os +import pathlib import re import socket @@ -14,6 +15,7 @@ import spnego.channel_bindings import spnego.iov from spnego._credential import CredentialCache +from spnego._ntlm_raw.crypto import md4 from spnego._ntlm_raw.messages import ( Authenticate, AvId, @@ -716,6 +718,72 @@ def test_ntlm_no_nt_v1_allowed(ntlm_cred, monkeypatch): s.step(auth) +def test_ntlm_with_invalid_surrogate_pair_pass( + monkeypatch: pytest.MonkeyPatch, + tmp_path: pathlib.Path, +) -> None: + username = "user" + + # This is a password for a gMSA account used as a test. It is technically + # a UTF-16-LE string value but contains invalid surrogate pairs which is + # important for testing. + b_gmsa_password = ( + b"\x91\x45\xC9\xD1\x1D\x74\xA9\xE2" + b"\x27\x5A\x4C\xBE\x13\xC1\xE2\xF4" + b"\x89\x94\x49\x5E\x01\x60\xDD\xBA" + b"\xE2\xD9\x12\x53\xF0\xEB\x96\x38" + b"\xB3\x8B\xD2\x17\xC9\xCC\x9B\xB6" + b"\xC7\xF0\xCC\x8F\xEB\x75\x03\x77" + b"\x30\xD3\xE2\x6C\xE6\x00\x04\x39" + b"\xF5\x5F\xD9\xA5\xD8\xEF\xB3\x9F" + b"\xDE\x4A\xB5\xC1\x51\xC0\x44\x3B" + b"\x66\xC6\xF4\x68\x8D\xE1\x78\xBE" + b"\x3D\x35\x34\xC7\x4A\x91\x6D\x7B" + b"\x3C\xE0\x6E\x1C\xE9\xA9\x96\x6E" + b"\xDA\x09\x6A\x39\x1A\x2E\x5F\xD2" + b"\x92\x86\x46\x3B\x8B\x9A\xD3\xCE" + b"\xED\x83\x03\x2A\x33\x0D\xBC\x06" + b"\x91\xC6\x0C\xB1\x69\x5D\x2D\x59" + b"\xE0\x66\x18\x99\x00\xD1\x5A\x55" + b"\x85\xA3\xA8\x23\x0E\xCC\x16\x08" + b"\xF9\xE1\x9D\xF7\x09\x24\x66\xB3" + b"\x56\x6D\xC8\x2B\x4D\x33\x7F\x1A" + b"\xED\x69\x24\x09\xB1\x0C\xD6\x51" + b"\xBB\x62\xD9\x82\xD4\xA6\x1D\x91" + b"\x6F\xC4\xB2\xB0\x45\x9A\x40\x5A" + b"\xEC\x81\x71\xA1\x48\xB3\x52\x37" + b"\x26\x72\x98\x01\x22\x31\xF2\xD8" + b"\xD4\x83\x7B\xF3\xCA\xD5\x81\x24" + b"\xDC\xA9\xC2\xBF\x6D\x8E\x87\x7D" + b"\x24\x87\x49\x6C\x46\xE6\x67\x8B" + b"\x10\x69\x00\x04\xCA\x17\x4B\xC8" + b"\x04\x33\x69\x06\x61\x57\xB9\xC7" + b"\x3B\xFC\x0A\xCD\x35\xCE\x61\xB9" + b"\x87\x3B\xFF\x3A\x2D\x55\x67\xF6" + ) + + tmp_creds = tmp_path / "ntlm.cred" + with open(tmp_creds, mode="w") as fd: + nt_hash = base64.b16encode(md4(b_gmsa_password)).decode() + fd.write(f"{username}:1:00000000000000000000000000000000:{nt_hash}:[U]:LCT-1589398321") + + monkeypatch.setenv("NTLM_USER_FILE", str(tmp_creds.absolute())) + + c = spnego.client( + username, + b_gmsa_password.decode("utf-16-le", errors="surrogatepass"), + hostname=socket.gethostname(), + options=spnego.NegotiateOptions.use_ntlm, + protocol="ntlm", + ) + s = spnego.server(options=spnego.NegotiateOptions.use_ntlm, protocol="ntlm") + + s.step(c.step(s.step(c.step()))) + + assert c.complete + assert s.complete + + @pytest.mark.parametrize( "client_opt", [