From 559d84473147f5e78a66df4b3a36c37f83e17ed9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Voron?= Date: Mon, 12 Feb 2024 17:54:10 +0100 Subject: [PATCH] Add bcrypt hasher --- pwdlib/_hash.py | 2 +- pwdlib/hashers/argon2.py | 6 ++-- pwdlib/hashers/base.py | 10 ++++-- pwdlib/hashers/bcrypt.py | 58 +++++++++++++++++++++++++++++++ pyproject.toml | 4 +++ tests/hashers/test_bcrypt.py | 67 ++++++++++++++++++++++++++++++++++++ tests/test_hash.py | 38 ++++++++++++++++++-- 7 files changed, 176 insertions(+), 9 deletions(-) create mode 100644 pwdlib/hashers/bcrypt.py create mode 100644 tests/hashers/test_bcrypt.py diff --git a/pwdlib/_hash.py b/pwdlib/_hash.py index 93c2776..eaa2400 100644 --- a/pwdlib/_hash.py +++ b/pwdlib/_hash.py @@ -36,6 +36,6 @@ def verify_and_update( else: updated_hash: typing.Union[str, None] = None if hasher != self.current_hasher or hasher.check_needs_rehash(hash): - updated_hash = hasher.hash(password) + updated_hash = self.current_hasher.hash(password) return True, updated_hash raise exceptions.UnknownHashError(hash) diff --git a/pwdlib/hashers/argon2.py b/pwdlib/hashers/argon2.py index acdf1d5..f1bc593 100644 --- a/pwdlib/hashers/argon2.py +++ b/pwdlib/hashers/argon2.py @@ -8,7 +8,7 @@ raise HasherNotAvailable("argon2") from e -from .base import HasherProtocol, ensure_str_hash +from .base import HasherProtocol, ensure_str class Argon2Hasher(HasherProtocol): @@ -17,7 +17,7 @@ def __init__(self) -> None: @classmethod def identify(cls, hash: typing.Union[str, bytes]) -> bool: - return ensure_str_hash(hash).startswith("$argon2id$") + return ensure_str(hash).startswith("$argon2id$") def hash( self, @@ -39,4 +39,4 @@ def verify( return False def check_needs_rehash(self, hash: typing.Union[str, bytes]) -> bool: - return self._hasher.check_needs_rehash(ensure_str_hash(hash)) + return self._hasher.check_needs_rehash(ensure_str(hash)) diff --git a/pwdlib/hashers/base.py b/pwdlib/hashers/base.py index 8d6b151..cd21167 100644 --- a/pwdlib/hashers/base.py +++ b/pwdlib/hashers/base.py @@ -1,8 +1,12 @@ import typing -def ensure_str_hash(hash: typing.Union[str, bytes]) -> str: - return hash.decode("ascii") if isinstance(hash, bytes) else typing.cast(str, hash) +def ensure_str(v: typing.Union[str, bytes]) -> str: + return v.decode("utf-8") if isinstance(v, bytes) else typing.cast(str, v) + + +def ensure_bytes(v: typing.Union[str, bytes]) -> bytes: + return v.encode("utf-8") if isinstance(v, str) else v class HasherProtocol(typing.Protocol): @@ -27,4 +31,4 @@ def check_needs_rehash(self, hash: typing.Union[str, bytes]) -> bool: ... -__all__ = ["HasherProtocol", "ensure_str_hash"] +__all__ = ["HasherProtocol", "ensure_str"] diff --git a/pwdlib/hashers/bcrypt.py b/pwdlib/hashers/bcrypt.py new file mode 100644 index 0000000..d60a4bc --- /dev/null +++ b/pwdlib/hashers/bcrypt.py @@ -0,0 +1,58 @@ +import re +import typing + +try: + import bcrypt +except ImportError as e: + from ..exceptions import HasherNotAvailable + + raise HasherNotAvailable("bcrypt") from e + +from .base import HasherProtocol, ensure_bytes, ensure_str + +_IDENTIFY_REGEX = ( + r"^\$(?P2[abxy])\$(?P\d{2})" + r"\$(?P[A-Za-z0-9+/.]{22})(?P[A-Za-z0-9+/.]{31})$" +) + + +def _match_regex_hash( + hash: typing.Union[str, bytes], +) -> typing.Optional[typing.Match[str]]: + return re.match(_IDENTIFY_REGEX, ensure_str(hash)) + + +class BcryptHasher(HasherProtocol): + def __init__( + self, rounds: int = 12, prefix: typing.Literal["2a", "2b"] = "2b" + ) -> None: + self.rounds = rounds + self.prefix = prefix.encode("utf-8") + + @classmethod + def identify(cls, hash: typing.Union[str, bytes]) -> bool: + return _match_regex_hash(hash) is not None + + def hash( + self, + password: typing.Union[str, bytes], + *, + salt: typing.Union[bytes, None] = None, + ) -> str: + if salt is None: + salt = bcrypt.gensalt(self.rounds, self.prefix) + return ensure_str(bcrypt.hashpw(ensure_bytes(password), salt)) + + def verify( + self, hash: typing.Union[str, bytes], password: typing.Union[str, bytes] + ) -> bool: + return bcrypt.checkpw(ensure_bytes(password), ensure_bytes(hash)) + + def check_needs_rehash(self, hash: typing.Union[str, bytes]) -> bool: + _hash_match = _match_regex_hash(hash) + if _hash_match is None: + return True + + return int(_hash_match.group("rounds")) != self.rounds or _hash_match.group( + "prefix" + ) != self.prefix.decode("utf-8") diff --git a/pyproject.toml b/pyproject.toml index 9f93f30..a3cbda2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ path = "pwdlib/__init__.py" python = "3.8" features = [ "argon2", + "bcrypt", ] dependencies = [ "mypy", @@ -82,6 +83,9 @@ dependencies = [ argon2 = [ "argon2-cffi ==23.1.0", ] +bcrypt = [ + "bcrypt ==4.1.2", +] [project.urls] Documentation = "https://frankie567.github.io/pwdlib/" diff --git a/tests/hashers/test_bcrypt.py b/tests/hashers/test_bcrypt.py new file mode 100644 index 0000000..fdda49f --- /dev/null +++ b/tests/hashers/test_bcrypt.py @@ -0,0 +1,67 @@ +import typing + +import pytest + +from pwdlib.hashers.bcrypt import BcryptHasher + +_PASSWORD = "herminetincture" + +_HASHER = BcryptHasher() +_HASH_STR = _HASHER.hash(_PASSWORD) +_HASH_BYTES = _HASH_STR.encode("ascii") + + +@pytest.fixture +def bcrypt_hasher() -> BcryptHasher: + return BcryptHasher() + + +@pytest.mark.parametrize( + "hash,result", + [ + (_HASH_STR, True), + (_HASH_BYTES, True), + ("INVALID_HASH", False), + (b"INVALID_HASH", False), + ], +) +def test_identify(hash: typing.Union[str, bytes], result: bool) -> None: + assert BcryptHasher.identify(hash) == result + + +def test_hash(bcrypt_hasher: BcryptHasher) -> None: + hash = bcrypt_hasher.hash("herminetincture") + assert isinstance(hash, str) + + +@pytest.mark.parametrize( + "hash,password,result", + [ + (_HASH_STR, _PASSWORD, True), + (_HASH_BYTES, _PASSWORD, True), + (_HASH_STR, "INVALID_PASSWORD", False), + (_HASH_BYTES, "INVALID_PASSWORD", False), + ], +) +def test_verify( + hash: typing.Union[str, bytes], + password: str, + result: bool, + bcrypt_hasher: BcryptHasher, +) -> None: + assert bcrypt_hasher.verify(hash, password) == result + + +def test_check_needs_rehash(bcrypt_hasher: BcryptHasher) -> None: + assert not bcrypt_hasher.check_needs_rehash(_HASH_STR) + assert not bcrypt_hasher.check_needs_rehash(_HASH_BYTES) + assert bcrypt_hasher.check_needs_rehash("INVALID_HASH") + assert bcrypt_hasher.check_needs_rehash(b"INVALID_HASH") + + bcrypt_hasher_different_rounds = BcryptHasher(rounds=10) + hash = bcrypt_hasher_different_rounds.hash("herminetincture") + assert bcrypt_hasher.check_needs_rehash(hash) + + bcrypt_hasher_different_prefix = BcryptHasher(prefix="2a") + hash = bcrypt_hasher_different_prefix.hash("herminetincture") + assert bcrypt_hasher.check_needs_rehash(hash) diff --git a/tests/test_hash.py b/tests/test_hash.py index 44d5fa6..307781f 100644 --- a/tests/test_hash.py +++ b/tests/test_hash.py @@ -4,16 +4,20 @@ from pwdlib import PasswordHash, exceptions from pwdlib.hashers.argon2 import Argon2Hasher +from pwdlib.hashers.bcrypt import BcryptHasher _PASSWORD = "herminetincture" _ARGON2_HASHER = Argon2Hasher() _ARGON2_HASH_STR = _ARGON2_HASHER.hash(_PASSWORD) +_BCRYPT_HASHER = BcryptHasher() +_BCRYPT_HASH_STR = _BCRYPT_HASHER.hash(_PASSWORD) + @pytest.fixture def password_hash() -> PasswordHash: - return PasswordHash((Argon2Hasher(),)) + return PasswordHash((Argon2Hasher(), BcryptHasher())) def test_hash(password_hash: PasswordHash) -> None: @@ -27,6 +31,8 @@ def test_hash(password_hash: PasswordHash) -> None: [ (_ARGON2_HASH_STR, _PASSWORD, True), (_ARGON2_HASH_STR, "INVALID_PASSWORD", False), + (_BCRYPT_HASH_STR, _PASSWORD, True), + (_BCRYPT_HASH_STR, "INVALID_PASSWORD", False), ], ) def test_verify( @@ -40,4 +46,32 @@ def test_verify( def test_verify_unknown_hash(password_hash: PasswordHash) -> None: with pytest.raises(exceptions.UnknownHashError): - assert password_hash.verify("INVALID_HASH", _PASSWORD) + password_hash.verify("INVALID_HASH", _PASSWORD) + + +@pytest.mark.parametrize( + "hash,password,result,has_updated_hash", + [ + (_ARGON2_HASH_STR, _PASSWORD, True, False), + (_ARGON2_HASH_STR, "INVALID_PASSWORD", False, False), + (_BCRYPT_HASH_STR, _PASSWORD, True, True), + (_BCRYPT_HASH_STR, "INVALID_PASSWORD", False, False), + ], +) +def test_verify_and_update( + hash: typing.Union[str, bytes], + password: str, + result: bool, + has_updated_hash: bool, + password_hash: PasswordHash, +) -> None: + valid, updated_hash = password_hash.verify_and_update(hash, password) + assert valid == result + assert updated_hash is not None if has_updated_hash else updated_hash is None + if updated_hash is not None: + assert password_hash.current_hasher.identify(updated_hash) + + +def test_verify_and_update_unknown_hash(password_hash: PasswordHash) -> None: + with pytest.raises(exceptions.UnknownHashError): + password_hash.verify_and_update("INVALID_HASH", _PASSWORD)