Skip to content

Commit

Permalink
Fix code formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
aleksihakli committed May 16, 2022
1 parent fffb539 commit 621dfa6
Show file tree
Hide file tree
Showing 11 changed files with 109 additions and 35 deletions.
6 changes: 5 additions & 1 deletion axes/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ def initialize(cls):
mode = "blocking by username or IP"
else:
mode = "blocking by IP only"
log.info("AXES: BEGIN version %s, %s", get_distribution("django-axes").version, mode)
log.info(
"AXES: BEGIN version %s, %s",
get_distribution("django-axes").version,
mode,
)

def ready(self):
self.initialize()
12 changes: 9 additions & 3 deletions axes/attempts.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ def get_cool_off_threshold(attempt_time: Optional[datetime] = None) -> datetime:
return attempt_time - cool_off


def filter_user_attempts(request: HttpRequest, credentials: Optional[dict] = None) -> List[QuerySet]:
def filter_user_attempts(
request: HttpRequest, credentials: Optional[dict] = None
) -> List[QuerySet]:
"""
Return a list querysets of AccessAttempts that match the given request and credentials.
"""
Expand All @@ -45,7 +47,9 @@ def filter_user_attempts(request: HttpRequest, credentials: Optional[dict] = Non
return attempts_list


def get_user_attempts(request: HttpRequest, credentials: Optional[dict] = None) -> List[QuerySet]:
def get_user_attempts(
request: HttpRequest, credentials: Optional[dict] = None
) -> List[QuerySet]:
"""
Get list of querysets with valid user attempts that match the given request and credentials.
"""
Expand Down Expand Up @@ -84,7 +88,9 @@ def clean_expired_user_attempts(attempt_time: Optional[datetime] = None) -> int:
return count


def reset_user_attempts(request: HttpRequest, credentials: Optional[dict] = None) -> int:
def reset_user_attempts(
request: HttpRequest, credentials: Optional[dict] = None
) -> int:
"""
Reset all user attempts that match the given request and credentials.
"""
Expand Down
6 changes: 5 additions & 1 deletion axes/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ class AxesBackend(ModelBackend):

@toggleable
def authenticate(
self, request: HttpRequest, username: Optional[str] = None, password: Optional[str] = None, **kwargs: dict
self,
request: HttpRequest,
username: Optional[str] = None,
password: Optional[str] = None,
**kwargs: dict,
):
"""
Checks user lockout status and raises an exception if user is not allowed to log in.
Expand Down
8 changes: 6 additions & 2 deletions axes/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,13 @@

settings.AXES_DISABLE_ACCESS_LOG = getattr(settings, "AXES_DISABLE_ACCESS_LOG", False)

settings.AXES_ENABLE_ACCESS_FAILURE_LOG = getattr(settings, "AXES_ENABLE_ACCESS_FAILURE_LOG", False)
settings.AXES_ENABLE_ACCESS_FAILURE_LOG = getattr(
settings, "AXES_ENABLE_ACCESS_FAILURE_LOG", False
)

settings.AXES_ACCESS_FAILURE_LOG_PER_USER_LIMIT = getattr(settings, "AXES_ACCESS_FAILURE_LOG_PER_USER_LIMIT", 1000)
settings.AXES_ACCESS_FAILURE_LOG_PER_USER_LIMIT = getattr(
settings, "AXES_ACCESS_FAILURE_LOG_PER_USER_LIMIT", 1000
)

settings.AXES_HANDLER = getattr(
settings, "AXES_HANDLER", "axes.handlers.database.AxesDatabaseHandler"
Expand Down
6 changes: 4 additions & 2 deletions axes/handlers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,9 @@ def reset_failure_logs(self, *, age_days: Optional[int] = None) -> int:
"""
return 0


def remove_out_of_limit_failure_logs(self, *, username: str, limit: Optional[int] = None) -> int:
def remove_out_of_limit_failure_logs(
self, *, username: str, limit: Optional[int] = None
) -> int:
"""Remove access failure logs that are over
AXES_ACCESS_FAILURE_LOG_PER_USER_LIMIT for user username.
Expand All @@ -199,6 +200,7 @@ def remove_out_of_limit_failure_logs(self, *, username: str, limit: Optional[int
"""
return 0


class AxesHandler(AbstractAxesHandler, AxesBaseHandler):
"""
Signal bare handler implementation without any storage backend.
Expand Down
10 changes: 5 additions & 5 deletions axes/handlers/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,11 @@ def reset_failure_logs(self, *, age_days: Optional[int] = None) -> int:
return count

def remove_out_of_limit_failure_logs(
self,
*,
username: str,
limit: Optional[int] = settings.AXES_ACCESS_FAILURE_LOG_PER_USER_LIMIT) -> int:
self,
*,
username: str,
limit: Optional[int] = settings.AXES_ACCESS_FAILURE_LOG_PER_USER_LIMIT,
) -> int:
count = 0
failures = AccessFailureLog.objects.filter(username=username)
out_of_limit_failures_logs = failures.count() - limit
Expand Down Expand Up @@ -252,7 +253,6 @@ def user_login_failed(self, sender, credentials: dict, request=None, **kwargs):
)
self.remove_out_of_limit_failure_logs(username=username)


def user_logged_in(self, sender, request, user, **kwargs):
"""
When user logs in, update the AccessLog related to the user.
Expand Down
8 changes: 6 additions & 2 deletions axes/handlers/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,12 @@ def reset_failure_logs(cls, *, age_days: Optional[int] = None) -> int:
return cls.get_implementation().reset_failure_logs(age_days=age_days)

@classmethod
def remove_out_of_limit_failure_logs(cls, *, username: str, limit: Optional[int] = None) -> int:
return cls.get_implementation().remove_out_of_limit_failure_logs(username=username)
def remove_out_of_limit_failure_logs(
cls, *, username: str, limit: Optional[int] = None
) -> int:
return cls.get_implementation().remove_out_of_limit_failure_logs(
username=username
)

@staticmethod
def update_request(request):
Expand Down
15 changes: 11 additions & 4 deletions axes/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ def get_credentials(username: Optional[str] = None, **kwargs) -> dict:
return credentials


def get_client_username(request: HttpRequest, credentials: Optional[dict] = None) -> str:
def get_client_username(
request: HttpRequest, credentials: Optional[dict] = None
) -> str:
"""
Resolve client username from the given request or credentials if supplied.
Expand Down Expand Up @@ -220,7 +222,8 @@ def make_cache_key_list(filter_kwargs_list):


def get_client_cache_key(
request_or_attempt: Union[HttpRequest, AccessBase], credentials: Optional[dict] = None
request_or_attempt: Union[HttpRequest, AccessBase],
credentials: Optional[dict] = None,
) -> str:
"""
Build cache key name from request or AccessAttempt object.
Expand Down Expand Up @@ -362,7 +365,9 @@ def get_lockout_message() -> str:
return settings.AXES_PERMALOCK_MESSAGE


def get_lockout_response(request: HttpRequest, credentials: Optional[dict] = None) -> HttpResponse:
def get_lockout_response(
request: HttpRequest, credentials: Optional[dict] = None
) -> HttpResponse:
if settings.AXES_LOCKOUT_CALLABLE:
if callable(settings.AXES_LOCKOUT_CALLABLE):
return settings.AXES_LOCKOUT_CALLABLE( # pylint: disable=not-callable
Expand Down Expand Up @@ -477,7 +482,9 @@ def is_client_method_whitelisted(request: HttpRequest) -> bool:
return False


def is_user_attempt_whitelisted(request: HttpRequest, credentials: Optional[dict] = None) -> bool:
def is_user_attempt_whitelisted(
request: HttpRequest, credentials: Optional[dict] = None
) -> bool:
"""
Check if the given request or credentials refer to a whitelisted username.
Expand Down
63 changes: 51 additions & 12 deletions axes/migrations/0008_accessfailurelog.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,64 @@
class Migration(migrations.Migration):

dependencies = [
('axes', '0007_alter_accessattempt_unique_together'),
("axes", "0007_alter_accessattempt_unique_together"),
]

operations = [
migrations.CreateModel(
name='AccessFailureLog',
name="AccessFailureLog",
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('user_agent', models.CharField(db_index=True, max_length=255, verbose_name='User Agent')),
('ip_address', models.GenericIPAddressField(db_index=True, null=True, verbose_name='IP Address')),
('username', models.CharField(db_index=True, max_length=255, null=True, verbose_name='Username')),
('http_accept', models.CharField(max_length=1025, verbose_name='HTTP Accept')),
('path_info', models.CharField(max_length=255, verbose_name='Path')),
('attempt_time', models.DateTimeField(auto_now_add=True, verbose_name='Attempt Time')),
('locked_out', models.BooleanField(blank=True, default=False, verbose_name='Access lock out')),
(
"id",
models.AutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
(
"user_agent",
models.CharField(
db_index=True, max_length=255, verbose_name="User Agent"
),
),
(
"ip_address",
models.GenericIPAddressField(
db_index=True, null=True, verbose_name="IP Address"
),
),
(
"username",
models.CharField(
db_index=True,
max_length=255,
null=True,
verbose_name="Username",
),
),
(
"http_accept",
models.CharField(max_length=1025, verbose_name="HTTP Accept"),
),
("path_info", models.CharField(max_length=255, verbose_name="Path")),
(
"attempt_time",
models.DateTimeField(
auto_now_add=True, verbose_name="Attempt Time"
),
),
(
"locked_out",
models.BooleanField(
blank=True, default=False, verbose_name="Access lock out"
),
),
],
options={
'verbose_name': 'access failure',
'verbose_name_plural': 'access failures',
"verbose_name": "access failure",
"verbose_name_plural": "access failures",
},
),
]
6 changes: 4 additions & 2 deletions axes/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ class Meta:


class AccessFailureLog(AccessBase):
locked_out = models.BooleanField(_("Access lock out"), null=False, blank=True, default=False)
locked_out = models.BooleanField(
_("Access lock out"), null=False, blank=True, default=False
)

def __str__(self):
locked_out_str = ' locked out' if self.locked_out else ''
locked_out_str = " locked out" if self.locked_out else ""
return f"Failed access: user {self.username}{locked_out_str} on {self.attempt_time} from {self.ip_address}"

class Meta:
Expand Down
4 changes: 3 additions & 1 deletion axes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
log = getLogger(__name__)


def reset(ip: Optional[str] = None, username: Optional[str] = None, ip_or_username=False) -> int:
def reset(
ip: Optional[str] = None, username: Optional[str] = None, ip_or_username=False
) -> int:
"""
Reset records that match IP or username, and return the count of removed attempts.
Expand Down

0 comments on commit 621dfa6

Please sign in to comment.