diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs index b21c1f4a0a..f643713dcf 100644 --- a/.git-blame-ignore-revs +++ b/.git-blame-ignore-revs @@ -5,3 +5,5 @@ 6ec972f4dbb880bf0c7a11809e6c1ba194c9784c # upgrade code style to use f-strings using flynt 313b80f27f525441c449593a3aeaf38389f63c13 +# upgrade typing annotations using fix-future-annotations +b5324820b299b1fe7da0608f0cc8ec47f58b1e40 diff --git a/locust/argument_parser.py b/locust/argument_parser.py index 68373cb263..2c8f950f14 100644 --- a/locust/argument_parser.py +++ b/locust/argument_parser.py @@ -1,8 +1,10 @@ +from __future__ import annotations + import os import platform import sys import textwrap -from typing import Dict, List, NamedTuple, Optional, Any +from typing import NamedTuple, Any import configargparse import locust @@ -38,11 +40,11 @@ def add_argument(self, *args, **kwargs) -> configargparse.Action: return action @property - def args_included_in_web_ui(self) -> Dict[str, configargparse.Action]: + def args_included_in_web_ui(self) -> dict[str, configargparse.Action]: return {a.dest: a for a in self._actions if hasattr(a, "include_in_web_ui") and a.include_in_web_ui} @property - def secret_args_included_in_web_ui(self) -> Dict[str, configargparse.Action]: + def secret_args_included_in_web_ui(self) -> dict[str, configargparse.Action]: return { a.dest: a for a in self._actions @@ -91,7 +93,7 @@ def find_locustfile(locustfile): # Implicit 'return None' if nothing was found -def find_locustfiles(locustfiles: List[str], is_directory: bool) -> List[str]: +def find_locustfiles(locustfiles: list[str], is_directory: bool) -> list[str]: """ Returns a list of relative file paths for the Locustfile Picker. 
If is_directory is True, locustfiles is expected to have a single index which is a directory that will be searched for @@ -176,7 +178,7 @@ def get_empty_argument_parser(add_help=True, default_config_files=DEFAULT_CONFIG return parser -def parse_locustfile_option(args=None) -> List[str]: +def parse_locustfile_option(args=None) -> list[str]: """ Construct a command line parser that is only used to parse the -f argument so that we can import the test scripts in case any of them adds additional command line arguments to the @@ -667,10 +669,10 @@ class UIExtraArgOptions(NamedTuple): default_value: str is_secret: bool help_text: str - choices: Optional[List[str]] = None + choices: list[str] | None = None -def ui_extra_args_dict(args=None) -> Dict[str, Dict[str, Any]]: +def ui_extra_args_dict(args=None) -> dict[str, dict[str, Any]]: """Get all the UI visible arguments""" locust_args = default_args_dict() @@ -691,7 +693,7 @@ def ui_extra_args_dict(args=None) -> Dict[str, Dict[str, Any]]: return extra_args -def locustfile_is_directory(locustfiles: List[str]) -> bool: +def locustfile_is_directory(locustfiles: list[str]) -> bool: """ If a user passes in a locustfile without a file extension and there is a directory with the same name, this function defaults to using the file and will raise a warning. diff --git a/locust/clients.py b/locust/clients.py index 460ff6ad2b..0bfff066fe 100644 --- a/locust/clients.py +++ b/locust/clients.py @@ -2,7 +2,7 @@ import re import time from contextlib import contextmanager -from typing import Generator, Optional +from typing import Generator from urllib.parse import urlparse, urlunparse import requests @@ -48,7 +48,7 @@ class HttpSession(requests.Session): and then mark it as successful even if the response code was not (i.e 500 or 404). 
""" - def __init__(self, base_url, request_event, user, *args, pool_manager: Optional[PoolManager] = None, **kwargs): + def __init__(self, base_url, request_event, user, *args, pool_manager: PoolManager | None = None, **kwargs): super().__init__(*args, **kwargs) self.base_url = base_url @@ -57,7 +57,7 @@ def __init__(self, base_url, request_event, user, *args, pool_manager: Optional[ # User can group name, or use the group context manager to gather performance statistics under a specific name # This is an alternative to passing in the "name" parameter to the requests function - self.request_name: Optional[str] = None + self.request_name: str | None = None # Check for basic authentication parsed_url = urlparse(self.base_url) @@ -301,7 +301,7 @@ def failure(self, exc): class LocustHttpAdapter(HTTPAdapter): - def __init__(self, pool_manager: Optional[PoolManager], *args, **kwargs): + def __init__(self, pool_manager: PoolManager | None, *args, **kwargs): self.poolmanager = pool_manager super().__init__(*args, **kwargs) @@ -323,5 +323,5 @@ def _failure(self): ) -Response.success = _success -Response.failure = _failure +Response.success = _success # type: ignore[attr-defined] +Response.failure = _failure # type: ignore[attr-defined] diff --git a/locust/contrib/fasthttp.py b/locust/contrib/fasthttp.py index 754dcee7ac..dd70dcb055 100644 --- a/locust/contrib/fasthttp.py +++ b/locust/contrib/fasthttp.py @@ -10,7 +10,7 @@ from ssl import SSLError import time import traceback -from typing import Callable, Optional, Tuple, Dict, Any, Generator, cast +from typing import Callable, Any, Generator, cast from http.cookiejar import CookieJar @@ -79,10 +79,10 @@ def __init__( self, environment: Environment, base_url: str, - user: Optional[User], + user: User | None, insecure=True, - client_pool: Optional[HTTPClientPool] = None, - ssl_context_factory: Optional[Callable] = None, + client_pool: HTTPClientPool | None = None, + ssl_context_factory: Callable | None = None, **kwargs, ): 
self.environment = environment @@ -313,7 +313,7 @@ class FastHttpUser(User): insecure: bool = True """Parameter passed to FastHttpSession. Default True, meaning no SSL verification.""" - default_headers: Optional[dict] = None + default_headers: dict | None = None """Parameter passed to FastHttpSession. Adds the listed headers to every request.""" concurrency: int = 10 @@ -321,10 +321,10 @@ class FastHttpUser(User): Note that setting this value has no effect when custom client_pool was given, and you need to spawn a your own gevent pool to use it (as Users only have one greenlet). See test_fasthttp.py / test_client_pool_concurrency for an example.""" - client_pool: Optional[HTTPClientPool] = None + client_pool: HTTPClientPool | None = None """HTTP client pool to use. If not given, a new pool is created per single user.""" - ssl_context_factory: Optional[Callable] = None + ssl_context_factory: Callable | None = None """A callable that return a SSLContext for overriding the default context created by the FastHttpSession.""" abstract = True @@ -360,7 +360,7 @@ def __init__(self, environment): @contextmanager def rest( - self, method, url, headers: Optional[dict] = None, **kwargs + self, method, url, headers: dict | None = None, **kwargs ) -> Generator[RestResponseContextManager, None, None]: """ A wrapper for self.client.request that: @@ -423,36 +423,36 @@ def rest_(self, method, url, name=None, **kwargs) -> Generator[RestResponseConte class FastRequest(CompatRequest): - payload: Optional[str] = None + payload: str | None = None @property - def body(self) -> Optional[str]: + def body(self) -> str | None: return self.payload class FastResponse(CompatResponse): - headers: Optional[Headers] = None + headers: Headers | None = None """Dict like object containing the response headers""" - _response: Optional[HTTPSocketPoolResponse] = None + _response: HTTPSocketPoolResponse | None = None - encoding: Optional[str] = None + encoding: str | None = None """In some cases setting 
the encoding explicitly is needed. If so, do it before calling .text""" - request: Optional[FastRequest] = None + request: FastRequest | None = None def __init__( self, ghc_response: HTTPSocketPoolResponse, - request: Optional[FastRequest] = None, - sent_request: Optional[str] = None, + request: FastRequest | None = None, + sent_request: str | None = None, ): super().__init__(ghc_response, request, sent_request) self.request = request @property - def text(self) -> Optional[str]: + def text(self) -> str | None: """ Returns the text content of the response as a decoded string """ @@ -472,7 +472,7 @@ def text(self) -> Optional[str]: return str(self.content, str(self.encoding), errors="replace") @property - def url(self) -> Optional[str]: + def url(self) -> str | None: """ Get "response" URL, which is the same as the request URL. This is a small deviation from HttpSession, which gets the final (possibly redirected) URL. """ @@ -527,11 +527,11 @@ class ErrorResponse: that doesn't have a real Response object attached. E.g. 
a socket error or similar """ - headers: Optional[Headers] = None + headers: Headers | None = None content = None status_code = 0 - error: Optional[Exception] = None - text: Optional[str] = None + error: Exception | None = None + text: str | None = None request: CompatRequest def __init__(self, url: str, request: CompatRequest): @@ -547,7 +547,7 @@ class LocustUserAgent(UserAgent): request_type = FastRequest valid_response_codes = frozenset([200, 201, 202, 203, 204, 205, 206, 207, 208, 226, 301, 302, 303, 304, 307]) - def __init__(self, client_pool: Optional[HTTPClientPool] = None, **kwargs): + def __init__(self, client_pool: HTTPClientPool | None = None, **kwargs): super().__init__(**kwargs) if client_pool is not None: diff --git a/locust/debug.py b/locust/debug.py index 3afc560a4b..e411589c62 100644 --- a/locust/debug.py +++ b/locust/debug.py @@ -1,10 +1,11 @@ +from __future__ import annotations + from datetime import datetime, timezone import os import inspect import locust import locust.log from locust import User, argument_parser -from typing import Type, Optional from locust.env import Environment from locust.exception import CatchResponseError, RescheduleTask @@ -94,16 +95,16 @@ def on_request( print() -_env: Optional[Environment] = None # minimal Environment for debugging +_env: Environment | None = None # minimal Environment for debugging def run_single_user( - user_class: Type[User], + user_class: type[User], include_length=False, include_time=False, include_context=False, include_payload=False, - loglevel: Optional[str] = "WARNING", + loglevel: str | None = "WARNING", ): """ Runs a single User. Useful when you want to run a debugger. 
diff --git a/locust/dispatch.py b/locust/dispatch.py index fb744f5afb..dda49d46f4 100644 --- a/locust/dispatch.py +++ b/locust/dispatch.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from collections import defaultdict import contextlib import itertools @@ -5,7 +7,7 @@ import time from collections.abc import Iterator from operator import attrgetter -from typing import Dict, Generator, List, TYPE_CHECKING, Optional, Tuple, Type, Set +from typing import Generator, TYPE_CHECKING import gevent @@ -49,7 +51,7 @@ class UsersDispatcher(Iterator): from 10 to 100. """ - def __init__(self, worker_nodes: List["WorkerNode"], user_classes: List[Type[User]]): + def __init__(self, worker_nodes: list[WorkerNode], user_classes: list[type[User]]): """ :param worker_nodes: List of worker nodes :param user_classes: The user classes @@ -79,16 +81,16 @@ def __init__(self, worker_nodes: List["WorkerNode"], user_classes: List[Type[Use self._current_user_count = self.get_current_user_count() - self._dispatcher_generator: Generator[Dict[str, Dict[str, int]], None, None] = None + self._dispatcher_generator: Generator[dict[str, dict[str, int]], None, None] = None self._user_generator = self._user_gen() self._worker_node_generator = itertools.cycle(self._worker_nodes) # To keep track of how long it takes for each dispatch iteration to compute - self._dispatch_iteration_durations: List[float] = [] + self._dispatch_iteration_durations: list[float] = [] - self._active_users: List[Tuple[WorkerNode, str]] = [] + self._active_users: list[tuple[WorkerNode, str]] = [] # TODO: Test that attribute is set when dispatching and unset when done dispatching self._dispatch_in_progress = False @@ -108,10 +110,10 @@ def dispatch_in_progress(self): return self._dispatch_in_progress @property - def dispatch_iteration_durations(self) -> List[float]: + def dispatch_iteration_durations(self) -> list[float]: return self._dispatch_iteration_durations - def __next__(self) -> Dict[str, Dict[str, int]]: + def 
__next__(self) -> dict[str, dict[str, int]]: users_on_workers = next(self._dispatcher_generator) # TODO: Is this necessary to copy the users_on_workers if we know # it won't be mutated by external code? @@ -131,7 +133,7 @@ def _sort_workers(self): # Sort again, first by index within host, to ensure Users get started evenly across hosts self._worker_nodes = sorted(self._worker_nodes, key=lambda worker: (worker._index_within_host, worker.id)) - def _dispatcher(self) -> Generator[Dict[str, Dict[str, int]], None, None]: + def _dispatcher(self) -> Generator[dict[str, dict[str, int]], None, None]: self._dispatch_in_progress = True if self._rebalance: @@ -164,7 +166,7 @@ def _dispatcher(self) -> Generator[Dict[str, Dict[str, int]], None, None]: self._dispatch_in_progress = False - def new_dispatch(self, target_user_count: int, spawn_rate: float, user_classes: Optional[List] = None) -> None: + def new_dispatch(self, target_user_count: int, spawn_rate: float, user_classes: list | None = None) -> None: """ Initialize a new dispatch cycle. @@ -194,7 +196,7 @@ def new_dispatch(self, target_user_count: int, spawn_rate: float, user_classes: self._dispatch_iteration_durations.clear() - def add_worker(self, worker_node: "WorkerNode") -> None: + def add_worker(self, worker_node: WorkerNode) -> None: """ This method is to be called when a new worker connects to the master. When a new worker is added, the users dispatcher will flag that a rebalance is required @@ -207,7 +209,7 @@ def add_worker(self, worker_node: "WorkerNode") -> None: self._sort_workers() self._prepare_rebalance() - def remove_worker(self, worker_node: "WorkerNode") -> None: + def remove_worker(self, worker_node: WorkerNode) -> None: """ This method is similar to the above `add_worker`. When a worker disconnects (because of e.g. 
network failure, worker failure, etc.), this method will ensure that the next @@ -268,7 +270,7 @@ def _wait_between_dispatch_iteration_context(self) -> Generator[None, None, None sleep_duration = max(0.0, self._wait_between_dispatch - delta) gevent.sleep(sleep_duration) - def _add_users_on_workers(self) -> Dict[str, Dict[str, int]]: + def _add_users_on_workers(self) -> dict[str, dict[str, int]]: """Add users on the workers until the target number of users is reached for the current dispatch iteration :return: The users that we want to run on the workers @@ -290,7 +292,7 @@ def _add_users_on_workers(self) -> Dict[str, Dict[str, int]]: return self._users_on_workers - def _remove_users_from_workers(self) -> Dict[str, Dict[str, int]]: + def _remove_users_from_workers(self) -> dict[str, dict[str, int]]: """Remove users from the workers until the target number of users is reached for the current dispatch iteration :return: The users that we want to run on the workers @@ -318,8 +320,8 @@ def _get_user_current_count(self, user: str) -> int: def _distribute_users( self, target_user_count: int - ) -> Tuple[ - Dict[str, Dict[str, int]], Generator[Optional[str], None, None], itertools.cycle, List[Tuple["WorkerNode", str]] + ) -> tuple[ + dict[str, dict[str, int]], Generator[str | None, None, None], itertools.cycle, list[tuple[WorkerNode, str]] ]: """ This function might take some time to complete if the `target_user_count` is a big number. A big number @@ -349,7 +351,7 @@ def _distribute_users( return users_on_workers, user_gen, worker_gen, active_users - def _user_gen(self) -> Generator[Optional[str], None, None]: + def _user_gen(self) -> Generator[str | None, None, None]: """ This method generates users according to their weights using a smooth weighted round-robin algorithm implemented by https://github.com/linnik/roundrobin. @@ -361,7 +363,7 @@ def _user_gen(self) -> Generator[Optional[str], None, None]: less accurate during ramp-up/down. 
""" - def infinite_cycle_gen(users: List[Tuple[Type[User], int]]) -> itertools.cycle: + def infinite_cycle_gen(users: list[tuple[type[User], int]]) -> itertools.cycle: if not users: return itertools.cycle([None]) @@ -401,9 +403,9 @@ def infinite_cycle_gen(users: List[Tuple[Type[User], int]]) -> itertools.cycle: if self._try_dispatch_fixed: self._try_dispatch_fixed = False current_fixed_users_count = {u: self._get_user_current_count(u) for u in fixed_users} - spawned_classes: Set[str] = set() + spawned_classes: set[str] = set() while len(spawned_classes) != len(fixed_users): - user_name: Optional[str] = next(cycle_fixed_gen) + user_name: str | None = next(cycle_fixed_gen) if not user_name: break @@ -422,7 +424,7 @@ def infinite_cycle_gen(users: List[Tuple[Type[User], int]]) -> itertools.cycle: yield next(cycle_weighted_gen) @staticmethod - def _fast_users_on_workers_copy(users_on_workers: Dict[str, Dict[str, int]]) -> Dict[str, Dict[str, int]]: + def _fast_users_on_workers_copy(users_on_workers: dict[str, dict[str, int]]) -> dict[str, dict[str, int]]: """deepcopy is too slow, so we use this custom copy function. 
The implementation was profiled and compared to other implementations such as dict-comprehensions diff --git a/locust/env.py b/locust/env.py index ee69017303..999775d939 100644 --- a/locust/env.py +++ b/locust/env.py @@ -1,12 +1,7 @@ +from __future__ import annotations + from operator import methodcaller -from typing import ( - Callable, - Dict, - List, - Type, - TypeVar, - Optional, -) +from typing import Callable, TypeVar from configargparse import Namespace @@ -27,27 +22,27 @@ class Environment: def __init__( self, *, - user_classes: Optional[List[Type[User]]] = None, - shape_class: Optional[LoadTestShape] = None, - tags: Optional[List[str]] = None, - locustfile: Optional[str] = None, - exclude_tags: Optional[List[str]] = None, - events: Optional[Events] = None, - host: Optional[str] = None, + user_classes: list[type[User]] | None = None, + shape_class: LoadTestShape | None = None, + tags: list[str] | None = None, + locustfile: str | None = None, + exclude_tags: list[str] | None = None, + events: Events | None = None, + host: str | None = None, reset_stats=False, - stop_timeout: Optional[float] = None, + stop_timeout: float | None = None, catch_exceptions=True, - parsed_options: Optional[Namespace] = None, - available_user_classes: Optional[Dict[str, User]] = None, - available_shape_classes: Optional[Dict[str, LoadTestShape]] = None, + parsed_options: Namespace | None = None, + available_user_classes: dict[str, User] | None = None, + available_shape_classes: dict[str, LoadTestShape] | None = None, ): - self.runner: Optional[Runner] = None + self.runner: Runner | None = None """Reference to the :class:`Runner ` instance""" - self.web_ui: Optional[WebUI] = None + self.web_ui: WebUI | None = None """Reference to the WebUI instance""" - self.process_exit_code: Optional[int] = None + self.process_exit_code: int | None = None """ If set it'll be the exit code of the Locust process """ @@ -63,7 +58,7 @@ def __init__( self.locustfile = locustfile """Filename (not path) 
of locustfile""" - self.user_classes: List[Type[User]] = user_classes or [] + self.user_classes: list[type[User]] = user_classes or [] """User classes that the runner will run""" self.shape_class = shape_class """A shape class to control the shape of the load test""" @@ -105,7 +100,7 @@ def __init__( def _create_runner( self, - runner_class: Type[RunnerType], + runner_class: type[RunnerType], *args, **kwargs, ) -> RunnerType: @@ -160,9 +155,9 @@ def create_web_ui( host="", port=8089, web_login: bool = False, - tls_cert: Optional[str] = None, - tls_key: Optional[str] = None, - stats_csv_writer: Optional[StatsCSV] = None, + tls_cert: str | None = None, + tls_key: str | None = None, + stats_csv_writer: StatsCSV | None = None, delayed_start=False, userclass_picker_is_active=False, modern_ui=False, @@ -244,7 +239,7 @@ def assign_equal_weights(self) -> None: """ for u in self.user_classes: u.weight = 1 - user_tasks: List[TaskSet | Callable] = [] + user_tasks: list[TaskSet | Callable] = [] tasks_frontier = u.tasks while len(tasks_frontier) != 0: t = tasks_frontier.pop() @@ -274,5 +269,5 @@ def _validate_shape_class_instance(self): ) @property - def user_classes_by_name(self) -> Dict[str, Type[User]]: + def user_classes_by_name(self) -> dict[str, type[User]]: return {u.__name__: u for u in self.user_classes} diff --git a/locust/event.py b/locust/event.py index d2889b1294..b1a7a719d3 100644 --- a/locust/event.py +++ b/locust/event.py @@ -1,8 +1,9 @@ +from __future__ import annotations import logging from . 
import log import traceback from contextlib import contextmanager -from typing import Generator, Any, Dict +from typing import Generator, Any import time from .exception import StopUser, RescheduleTask, RescheduleTaskImmediately, InterruptTaskSet @@ -52,7 +53,7 @@ def fire(self, *, reverse=False, **kwargs): @contextmanager def measure( self, request_type: str, name: str, response_length: int = 0, context=None - ) -> Generator[Dict[str, Any], None, None]: + ) -> Generator[dict[str, Any], None, None]: """Convenience method for firing the event with automatically calculated response time and automatically marking the request as failed if an exception is raised (this is really only useful for the *request* event) Example usage (in a task): @@ -230,11 +231,11 @@ class Events: """ def __init__(self): - # For backwarde compatibility use also values of class attributes + # For backward compatibility use also values of class attributes for name, value in vars(type(self)).items(): - if value == EventHook: - setattr(self, name, value()) + if value == EventHook: + setattr(self, name, value()) for name, value in self.__annotations__.items(): - if value == EventHook: - setattr(self, name, value()) + if value == "EventHook": + setattr(self, name, EventHook()) diff --git a/locust/input_events.py b/locust/input_events.py index 1e02f77811..eafcf08079 100644 --- a/locust/input_events.py +++ b/locust/input_events.py @@ -1,4 +1,6 @@ -from typing import Dict, Callable +from __future__ import annotations + +from typing import Callable import gevent import logging @@ -88,7 +90,7 @@ def get_poller(): return UnixKeyPoller() -def input_listener(key_to_func_map: Dict[str, Callable]): +def input_listener(key_to_func_map: dict[str, Callable]): def input_listener_func(): try: poller = get_poller() diff --git a/locust/main.py b/locust/main.py index 705df0aa4c..5a96410d75 100644 --- a/locust/main.py +++ b/locust/main.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import errno
import logging import os @@ -8,7 +10,6 @@ import inspect import gevent import locust -from typing import Dict from . import log from .argument_parser import parse_locustfile_option, parse_options from .env import Environment @@ -83,7 +84,7 @@ def main(): locustfile = locustfiles[0] if locustfiles_length == 1 else None # Importing Locustfile(s) - setting available UserClasses and ShapeClasses to choose from in UI - user_classes: Dict[str, locust.User] = {} + user_classes: dict[str, locust.User] = {} available_user_classes = {} available_shape_classes = {} shape_class = None diff --git a/locust/rpc/protocol.py b/locust/rpc/protocol.py index 36b29fb30e..1da46fd902 100644 --- a/locust/rpc/protocol.py +++ b/locust/rpc/protocol.py @@ -1,6 +1,7 @@ +from __future__ import annotations + import msgpack import datetime -from typing import Type try: from bson import ObjectId # type: ignore diff --git a/locust/runners.py b/locust/runners.py index e0e6e69d8c..42d96e765f 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import functools import json import logging @@ -17,15 +19,9 @@ from types import TracebackType from typing import ( TYPE_CHECKING, - Dict, Iterator, - List, NoReturn, ValuesView, - Set, - Optional, - Tuple, - Type, Any, cast, Callable, @@ -87,7 +83,7 @@ class ExceptionDict(TypedDict): count: int msg: str traceback: str - nodes: Set[str] + nodes: set[str] class Runner: @@ -101,29 +97,29 @@ class Runner: desired type. 
""" - def __init__(self, environment: "Environment") -> None: + def __init__(self, environment: Environment) -> None: self.environment = environment self.user_greenlets = Group() self.greenlet = Group() self.state = STATE_INIT - self.spawning_greenlet: Optional[gevent.Greenlet] = None - self.shape_greenlet: Optional[gevent.Greenlet] = None - self.shape_last_tick: Tuple[int, float] | Tuple[int, float, Optional[List[Type[User]]]] | None = None + self.spawning_greenlet: gevent.Greenlet | None = None + self.shape_greenlet: gevent.Greenlet | None = None + self.shape_last_tick: tuple[int, float] | tuple[int, float, list[type[User]] | None] | None = None self.current_cpu_usage: int = 0 self.cpu_warning_emitted: bool = False self.worker_cpu_warning_emitted: bool = False self.current_memory_usage: int = 0 self.greenlet.spawn(self.monitor_cpu_and_memory).link_exception(greenlet_exception_handler) - self.exceptions: Dict[int, ExceptionDict] = {} + self.exceptions: dict[int, ExceptionDict] = {} # Because of the way the ramp-up/ramp-down is implemented, target_user_classes_count # is only updated at the end of the ramp-up/ramp-down. # See https://github.com/locustio/locust/issues/1883#issuecomment-919239824 for context. - self.target_user_classes_count: Dict[str, int] = {} + self.target_user_classes_count: dict[str, int] = {} # target_user_count is set before the ramp-up/ramp-down occurs. 
self.target_user_count: int = 0 - self.custom_messages: Dict[str, Callable] = {} + self.custom_messages: dict[str, Callable] = {} - self._users_dispatcher: Optional[UsersDispatcher] = None + self._users_dispatcher: UsersDispatcher | None = None # set up event listeners for recording requests def on_request(request_type, name, response_time, response_length, exception=None, **_kwargs): @@ -134,7 +130,7 @@ def on_request(request_type, name, response_time, response_length, exception=Non self.environment.events.request.add_listener(on_request) self.connection_broken = False - self.final_user_classes_count: Dict[str, int] = {} # just for the ratio report, fills before runner stops + self.final_user_classes_count: dict[str, int] = {} # just for the ratio report, fills before runner stops # register listener that resets stats when spawning is complete def on_spawning_complete(user_count: int) -> None: @@ -151,11 +147,11 @@ def __del__(self) -> None: self.greenlet.kill(block=False) @property - def user_classes(self) -> List[Type[User]]: + def user_classes(self) -> list[type[User]]: return self.environment.user_classes @property - def user_classes_by_name(self) -> Dict[str, Type[User]]: + def user_classes_by_name(self) -> dict[str, type[User]]: return self.environment.user_classes_by_name @property @@ -163,7 +159,7 @@ def stats(self) -> RequestStats: return self.environment.stats @property - def errors(self) -> Dict[str, StatsError]: + def errors(self) -> dict[str, StatsError]: return self.stats.errors @property @@ -174,7 +170,7 @@ def user_count(self) -> int: return len(self.user_greenlets) @property - def user_classes_count(self) -> Dict[str, int]: + def user_classes_count(self) -> dict[str, int]: """ :returns: Number of currently running users for each user class """ @@ -214,7 +210,7 @@ def cpu_log_warning(self) -> bool: ) return self.cpu_warning_emitted - def spawn_users(self, user_classes_spawn_count: Dict[str, int], wait: bool = False): + def spawn_users(self, 
user_classes_spawn_count: dict[str, int], wait: bool = False): if self.state == STATE_INIT or self.state == STATE_STOPPED: self.update_state(STATE_SPAWNING) @@ -223,9 +219,9 @@ def spawn_users(self, user_classes_spawn_count: Dict[str, int], wait: bool = Fal % (json.dumps(user_classes_spawn_count), json.dumps(self.user_classes_count)) ) - def spawn(user_class: str, spawn_count: int) -> List[User]: + def spawn(user_class: str, spawn_count: int) -> list[User]: n = 0 - new_users: List[User] = [] + new_users: list[User] = [] while n < spawn_count: new_user = self.user_classes_by_name[user_class](self.environment) new_user.start(self.user_greenlets) @@ -236,7 +232,7 @@ def spawn(user_class: str, spawn_count: int) -> List[User]: logger.debug("All users of class %s spawned" % user_class) return new_users - new_users: List[User] = [] + new_users: list[User] = [] for user_class, spawn_count in user_classes_spawn_count.items(): new_users += spawn(user_class, spawn_count) @@ -245,7 +241,7 @@ def spawn(user_class: str, spawn_count: int) -> List[User]: logger.info("All users stopped\n") return new_users - def stop_users(self, user_classes_stop_count: Dict[str, int]) -> None: + def stop_users(self, user_classes_stop_count: dict[str, int]) -> None: async_calls_to_stop = Group() stop_group = Group() @@ -253,7 +249,7 @@ def stop_users(self, user_classes_stop_count: Dict[str, int]) -> None: if self.user_classes_count[user_class] == 0: continue - to_stop: List[greenlet.greenlet] = [] + to_stop: list[greenlet.greenlet] = [] for user_greenlet in self.user_greenlets: if len(to_stop) == stop_count: break @@ -313,12 +309,12 @@ def monitor_cpu_and_memory(self) -> NoReturn: @abstractmethod def start( - self, user_count: int, spawn_rate: float, wait: bool = False, user_classes: Optional[List[Type[User]]] = None + self, user_count: int, spawn_rate: float, wait: bool = False, user_classes: list[type[User]] | None = None ) -> None: ... 
@abstractmethod - def send_message(self, msg_type: str, data: Optional[Any] = None, client_id: Optional[str] = None) -> None: + def send_message(self, msg_type: str, data: Any | None = None, client_id: str | None = None) -> None: ... def start_shape(self) -> None: @@ -462,9 +458,7 @@ def on_user_error(user_instance, exception, tb): self.environment.events.user_error.add_listener(on_user_error) - def _start( - self, user_count: int, spawn_rate: float, wait: bool = False, user_classes: Optional[list] = None - ) -> None: + def _start(self, user_count: int, spawn_rate: float, wait: bool = False, user_classes: list | None = None) -> None: """ Start running a load test @@ -507,8 +501,8 @@ def _start( try: for dispatched_users in self._users_dispatcher: - user_classes_spawn_count: Dict[str, int] = {} - user_classes_stop_count: Dict[str, int] = {} + user_classes_spawn_count: dict[str, int] = {} + user_classes_stop_count: dict[str, int] = {} user_classes_count = dispatched_users[self._local_worker_node.id] logger.debug("Ramping to %s" % _format_user_classes_count_for_log(user_classes_count)) for user_class_name, user_class_count in user_classes_count.items(): @@ -546,7 +540,7 @@ def _start( self.environment.events.spawning_complete.fire(user_count=sum(self.target_user_classes_count.values())) def start( - self, user_count: int, spawn_rate: float, wait: bool = False, user_classes: Optional[List[Type[User]]] = None + self, user_count: int, spawn_rate: float, wait: bool = False, user_classes: list[type[User]] | None = None ) -> None: if spawn_rate > 100: logger.warning( @@ -566,7 +560,7 @@ def stop(self) -> None: return super().stop() - def send_message(self, msg_type: str, data: Optional[Any] = None, client_id: Optional[str] = None) -> None: + def send_message(self, msg_type: str, data: Any | None = None, client_id: str | None = None) -> None: """ Emulates internodal messaging by calling registered listeners @@ -597,7 +591,7 @@ def __init__(self, id: str, state=STATE_INIT, 
heartbeat_liveness=HEARTBEAT_LIVEN self.cpu_warning_emitted = False self.memory_usage: int = 0 # The reported users running on the worker - self.user_classes_count: Dict[str, int] = {} + self.user_classes_count: dict[str, int] = {} @property def user_count(self) -> int: @@ -606,9 +600,9 @@ def user_count(self) -> int: class WorkerNodes(MutableMapping): def __init__(self): - self._worker_nodes: Dict[str, WorkerNode] = {} + self._worker_nodes: dict[str, WorkerNode] = {} - def get_by_state(self, state) -> List[WorkerNode]: + def get_by_state(self, state) -> list[WorkerNode]: return [c for c in self.values() if c.state == state] @property @@ -616,19 +610,19 @@ def all(self) -> ValuesView[WorkerNode]: return self.values() @property - def ready(self) -> List[WorkerNode]: + def ready(self) -> list[WorkerNode]: return self.get_by_state(STATE_INIT) @property - def spawning(self) -> List[WorkerNode]: + def spawning(self) -> list[WorkerNode]: return self.get_by_state(STATE_SPAWNING) @property - def running(self) -> List[WorkerNode]: + def running(self) -> list[WorkerNode]: return self.get_by_state(STATE_RUNNING) @property - def missing(self) -> List[WorkerNode]: + def missing(self) -> list[WorkerNode]: return self.get_by_state(STATE_MISSING) def __setitem__(self, k: str, v: WorkerNode) -> None: @@ -687,13 +681,13 @@ def __init__(self, environment, master_bind_host, master_bind_port): else: raise - self._users_dispatcher: Optional[UsersDispatcher] = None + self._users_dispatcher: UsersDispatcher | None = None self.greenlet.spawn(self.heartbeat_worker).link_exception(greenlet_exception_handler) self.greenlet.spawn(self.client_listener).link_exception(greenlet_exception_handler) # listener that gathers info on how many users the worker has spawned - def on_worker_report(client_id: str, data: Dict[str, Any]) -> None: + def on_worker_report(client_id: str, data: dict[str, Any]) -> None: if client_id not in self.clients: logger.info("Discarded report from unrecognized worker %s", 
client_id) return @@ -702,7 +696,7 @@ def on_worker_report(client_id: str, data: Dict[str, Any]) -> None: self.environment.events.worker_report.add_listener(on_worker_report) # register listener that sends quit message to worker nodes - def on_quitting(environment: "Environment", **kw): + def on_quitting(environment: Environment, **kw): self.quit() self.environment.events.quitting.add_listener(on_quitting) @@ -737,7 +731,7 @@ def cpu_log_warning(self) -> bool: return warning_emitted def start( - self, user_count: int, spawn_rate: float, wait=False, user_classes: Optional[List[Type[User]]] = None + self, user_count: int, spawn_rate: float, wait=False, user_classes: list[type[User]] | None = None ) -> None: self.spawning_completed = False @@ -1128,14 +1122,14 @@ def worker_count(self) -> int: return len(self.clients.ready) + len(self.clients.spawning) + len(self.clients.running) @property - def reported_user_classes_count(self) -> Dict[str, int]: - reported_user_classes_count: Dict[str, int] = defaultdict(lambda: 0) + def reported_user_classes_count(self) -> dict[str, int]: + reported_user_classes_count: dict[str, int] = defaultdict(lambda: 0) for client in self.clients.ready + self.clients.spawning + self.clients.running: for name, count in client.user_classes_count.items(): reported_user_classes_count[name] += count return reported_user_classes_count - def send_message(self, msg_type: str, data: Optional[Dict[str, Any]] = None, client_id: Optional[str] = None): + def send_message(self, msg_type: str, data: dict[str, Any] | None = None, client_id: str | None = None): """ Sends a message to attached worker node(s) @@ -1165,7 +1159,7 @@ class WorkerRunner(DistributedRunner): # the worker index is set on ACK, if master provided it (masters <= 2.10.2 do not provide it) worker_index = -1 - def __init__(self, environment: "Environment", master_host: str, master_port: int) -> None: + def __init__(self, environment: Environment, master_host: str, master_port: int) -> None: 
""" :param environment: Environment instance :param master_host: Host/IP to use for connection to the master @@ -1174,14 +1168,14 @@ def __init__(self, environment: "Environment", master_host: str, master_port: in super().__init__(environment) self.retry = 0 self.connected = False - self.last_heartbeat_timestamp: Optional[float] = None + self.last_heartbeat_timestamp: float | None = None self.connection_event = Event() self.worker_state = STATE_INIT self.client_id = socket.gethostname() + "_" + uuid4().hex self.master_host = master_host self.master_port = master_port self.worker_cpu_warning_emitted = False - self._users_dispatcher: Optional[UsersDispatcher] = None + self._users_dispatcher: UsersDispatcher | None = None self.client = rpc.Client(master_host, master_port, self.client_id) self.greenlet.spawn(self.worker).link_exception(greenlet_exception_handler) self.connect_to_master() @@ -1204,14 +1198,14 @@ def on_spawning_complete(user_count: int) -> None: self.environment.events.spawning_complete.add_listener(on_spawning_complete) # register listener that adds the current number of spawned users to the report that is sent to the master node - def on_report_to_master(client_id: str, data: Dict[str, Any]): + def on_report_to_master(client_id: str, data: dict[str, Any]): data["user_classes_count"] = self.user_classes_count data["user_count"] = self.user_count self.environment.events.report_to_master.add_listener(on_report_to_master) # register listener that sends quit message to master - def on_quitting(environment: "Environment", **kw) -> None: + def on_quitting(environment: Environment, **kw) -> None: self.client.send(Message("quit", None, self.client_id)) self.environment.events.quitting.add_listener(on_quitting) @@ -1224,11 +1218,11 @@ def on_user_error(user_instance: User, exception: Exception, tb: TracebackType) self.environment.events.user_error.add_listener(on_user_error) def start( - self, user_count: int, spawn_rate: float, wait: bool = False, 
user_classes: Optional[List[Type[User]]] = None + self, user_count: int, spawn_rate: float, wait: bool = False, user_classes: list[type[User]] | None = None ) -> None: raise NotImplementedError("use start_worker") - def start_worker(self, user_classes_count: Dict[str, int], **kwargs) -> None: + def start_worker(self, user_classes_count: dict[str, int], **kwargs) -> None: """ Start running a load test as a worker @@ -1241,8 +1235,8 @@ def start_worker(self, user_classes_count: Dict[str, int], **kwargs) -> None: if self.environment.host: user_class.host = self.environment.host - user_classes_spawn_count: Dict[str, int] = {} - user_classes_stop_count: Dict[str, int] = {} + user_classes_spawn_count: dict[str, int] = {} + user_classes_stop_count: dict[str, int] = {} for user_class_name, user_class_count in user_classes_count.items(): if self.user_classes_count[user_class_name] > user_class_count: @@ -1378,9 +1372,7 @@ def stats_reporter(self) -> NoReturn: logger.error(f"Temporary connection lost to master server: {e}, will retry later.") gevent.sleep(WORKER_REPORT_INTERVAL) - def send_message( - self, msg_type: str, data: Optional[Dict[str, Any]] = None, client_id: Optional[str] = None - ) -> None: + def send_message(self, msg_type: str, data: dict[str, Any] | None = None, client_id: str | None = None) -> None: """ Sends a message to master node @@ -1392,7 +1384,7 @@ def send_message( self.client.send(Message(msg_type, data, self.client_id)) def _send_stats(self) -> None: - data: Dict[str, Any] = {} + data: dict[str, Any] = {} self.environment.events.report_to_master.fire(client_id=self.client_id, data=data) self.client.send(Message("stats", data, self.client_id)) @@ -1419,14 +1411,14 @@ def connect_to_master(self): self.connected = True -def _format_user_classes_count_for_log(user_classes_count: Dict[str, int]) -> str: +def _format_user_classes_count_for_log(user_classes_count: dict[str, int]) -> str: return "{} ({} total users)".format( 
json.dumps(dict(sorted(user_classes_count.items(), key=itemgetter(0)))), sum(user_classes_count.values()), ) -def _aggregate_dispatched_users(d: Dict[str, Dict[str, int]]) -> Dict[str, int]: +def _aggregate_dispatched_users(d: dict[str, dict[str, int]]) -> dict[str, int]: # TODO: Test it user_classes = list(next(iter(d.values())).keys()) return {u: sum(d[u] for d in d.values()) for u in user_classes} diff --git a/locust/shape.py b/locust/shape.py index 77417d13a9..264e6afc3a 100644 --- a/locust/shape.py +++ b/locust/shape.py @@ -1,6 +1,6 @@ from __future__ import annotations import time -from typing import ClassVar, Optional, Tuple, List, Type +from typing import ClassVar from abc import ABCMeta, abstractmethod from . import User @@ -23,7 +23,7 @@ class LoadTestShape(metaclass=LoadTestShapeMeta): Base class for custom load shapes. """ - runner: Optional[Runner] = None + runner: Runner | None = None """Reference to the :class:`Runner ` instance""" abstract: ClassVar[bool] = True @@ -52,7 +52,7 @@ def get_current_user_count(self): return self.runner.user_count @abstractmethod - def tick(self) -> Tuple[int, float] | Tuple[int, float, Optional[List[Type[User]]]] | None: + def tick(self) -> tuple[int, float] | tuple[int, float, list[type[User]] | None] | None: """ Returns a tuple with 2 elements to control the running load test: diff --git a/locust/stats.py b/locust/stats.py index 38ce9721b1..4cb2e3c0dd 100644 --- a/locust/stats.py +++ b/locust/stats.py @@ -18,12 +18,8 @@ from typing import ( TYPE_CHECKING, Any, - Dict, Iterable, NoReturn, - Tuple, - List, - Optional, OrderedDict as OrderedDictType, Callable, TypeVar, @@ -66,18 +62,18 @@ class StatsBaseDict(TypedDict): class StatsEntryDict(StatsBaseDict): - last_request_timestamp: Optional[float] + last_request_timestamp: float | None start_time: float num_requests: int num_none_requests: int num_failures: int total_response_time: int max_response_time: int - min_response_time: Optional[int] + min_response_time: int | 
None total_content_length: int - response_times: Dict[int, int] - num_reqs_per_sec: Dict[int, int] - num_fail_per_sec: Dict[int, int] + response_times: dict[int, int] + num_reqs_per_sec: dict[int, int] + num_fail_per_sec: dict[int, int] class StatsErrorDict(StatsBaseDict): @@ -93,7 +89,7 @@ class StatsHolder(Protocol): S = TypeVar("S", bound=StatsHolder) -def resize_handler(signum: int, frame: Optional[FrameType]): +def resize_handler(signum: int, frame: FrameType | None): global STATS_NAME_WIDTH if STATS_AUTORESIZE: try: @@ -139,7 +135,7 @@ class RequestStatsAdditionError(Exception): pass -def get_readable_percentiles(percentile_list: List[float]) -> List[str]: +def get_readable_percentiles(percentile_list: list[float]) -> list[str]: """ Converts a list of percentiles from 0-1 fraction to 0%-100% view for using in console & csv reporting :param percentile_list: The list of percentiles in range 0-1 @@ -151,7 +147,7 @@ def get_readable_percentiles(percentile_list: List[float]) -> List[str]: ] -def calculate_response_time_percentile(response_times: Dict[int, int], num_requests: int, percent: float) -> int: +def calculate_response_time_percentile(response_times: dict[int, int], num_requests: int, percent: float) -> int: """ Get the response time that a certain number of percent of the requests finished within. Arguments: @@ -172,7 +168,7 @@ def calculate_response_time_percentile(response_times: Dict[int, int], num_reque return 0 -def diff_response_time_dicts(latest: Dict[int, int], old: Dict[int, int]) -> Dict[int, int]: +def diff_response_time_dicts(latest: dict[int, int], old: dict[int, int]) -> dict[int, int]: """ Returns the delta between two {response_times:request_count} dicts. @@ -212,8 +208,8 @@ def __init__(self, use_response_times_cache=True): is not needed. 
""" self.use_response_times_cache = use_response_times_cache - self.entries: Dict[Tuple[str, str], StatsEntry] = EntriesDict(self) - self.errors: Dict[str, StatsError] = {} + self.entries: dict[tuple[str, str], StatsEntry] = EntriesDict(self) + self.errors: dict[str, StatsError] = {} self.total = StatsEntry(self, "Aggregated", None, use_response_times_cache=self.use_response_times_cache) self.history = [] @@ -253,7 +249,7 @@ def log_error(self, method: str, name: str, error: Exception | str | None) -> No self.errors[key] = entry entry.occurred() - def get(self, name: str, method: str) -> "StatsEntry": + def get(self, name: str, method: str) -> StatsEntry: """ Retrieve a StatsEntry instance by name and method """ @@ -278,12 +274,12 @@ def clear_all(self) -> None: self.errors = {} self.history = [] - def serialize_stats(self) -> List["StatsEntryDict"]: + def serialize_stats(self) -> list[StatsEntryDict]: return [ e.get_stripped_report() for e in self.entries.values() if not (e.num_requests == 0 and e.num_failures == 0) ] - def serialize_errors(self) -> Dict[str, "StatsErrorDict"]: + def serialize_errors(self) -> dict[str, StatsErrorDict]: return {k: e.serialize() for k, e in self.errors.items()} @@ -292,7 +288,7 @@ class StatsEntry: Represents a single stats entry (name and method) """ - def __init__(self, stats: Optional[RequestStats], name: str, method: str, use_response_times_cache: bool = False): + def __init__(self, stats: RequestStats | None, name: str, method: str, use_response_times_cache: bool = False): self.stats = stats self.name = name """ Name (URL) of this stats entry """ @@ -313,15 +309,15 @@ def __init__(self, stats: Optional[RequestStats], name: str, method: str, use_re """ Number of failed request """ self.total_response_time: int = 0 """ Total sum of the response times """ - self.min_response_time: Optional[int] = None + self.min_response_time: int | None = None """ Minimum response time """ self.max_response_time: int = 0 """ Maximum response time 
""" - self.num_reqs_per_sec: Dict[int, int] = {} + self.num_reqs_per_sec: dict[int, int] = {} """ A {second => request_count} dict that holds the number of requests made per second """ - self.num_fail_per_sec: Dict[int, int] = {} + self.num_fail_per_sec: dict[int, int] = {} """ A (second => failure_count) dict that hold the number of failures per second """ - self.response_times: Dict[int, int] = {} + self.response_times: dict[int, int] = {} """ A {response_time => count} dict that holds the response time distribution of all the requests. @@ -331,7 +327,7 @@ def __init__(self, stats: Optional[RequestStats], name: str, method: str, use_re This dict is used to calculate the median and percentile response times. """ - self.response_times_cache: Optional[OrderedDictType[int, CachedResponseTimes]] = None + self.response_times_cache: OrderedDictType[int, CachedResponseTimes] | None = None """ If use_response_times_cache is set to True, this will be a {timestamp => CachedResponseTimes()} OrderedDict that holds a copy of the response_times dict for each of the last 20 seconds. 
@@ -340,7 +336,7 @@ def __init__(self, stats: Optional[RequestStats], name: str, method: str, use_re """ The sum of the content length of all the responses for this entry """ self.start_time: float = 0.0 """ Time of the first request for this entry """ - self.last_request_timestamp: Optional[float] = None + self.last_request_timestamp: float | None = None """ Time of the last request for this entry """ self.reset() @@ -456,7 +452,7 @@ def current_rps(self) -> float: return 0 slice_start_time = max(int(self.stats.last_request_timestamp) - 12, int(self.stats.start_time or 0)) - reqs: List[int | float] = [ + reqs: list[int | float] = [ self.num_reqs_per_sec.get(t, 0) for t in range(slice_start_time, int(self.stats.last_request_timestamp) - 2) ] return avg(reqs) @@ -497,7 +493,7 @@ def avg_content_length(self): except ZeroDivisionError: return 0 - def extend(self, other: "StatsEntry") -> None: + def extend(self, other: StatsEntry) -> None: """ Extend the data from the current StatsEntry with the stats from another StatsEntry instance. @@ -545,7 +541,7 @@ def serialize(self) -> StatsEntryDict: return cast(StatsEntryDict, {key: getattr(self, key, None) for key in StatsEntryDict.__annotations__.keys()}) @classmethod - def unserialize(cls, data: StatsEntryDict) -> "StatsEntry": + def unserialize(cls, data: StatsEntryDict) -> StatsEntry: """Return the unserialzed version of the specified dict""" obj = cls(None, data["name"], data["method"]) valid_keys = StatsEntryDict.__annotations__.keys() @@ -608,7 +604,7 @@ def get_response_time_percentile(self, percent: float) -> int: """ return calculate_response_time_percentile(self.response_times, self.num_requests, percent) - def get_current_response_time_percentile(self, percent: float) -> Optional[int]: + def get_current_response_time_percentile(self, percent: float) -> int | None: """ Calculate the *current* response time for a certain percentile. 
We use a sliding window of (approximately) the last 10 seconds (specified by CURRENT_RESPONSE_TIME_PERCENTILE_WINDOW) @@ -626,13 +622,13 @@ def get_current_response_time_percentile(self, percent: float) -> Optional[int]: # when trying to fetch the cached response_times. We construct this list in such a way # that it's ordered by preference by starting to add t-10, then t-11, t-9, t-12, t-8, # and so on - acceptable_timestamps: List[int] = [] + acceptable_timestamps: list[int] = [] acceptable_timestamps.append(t - CURRENT_RESPONSE_TIME_PERCENTILE_WINDOW) for i in range(1, 9): acceptable_timestamps.append(t - CURRENT_RESPONSE_TIME_PERCENTILE_WINDOW - i) acceptable_timestamps.append(t - CURRENT_RESPONSE_TIME_PERCENTILE_WINDOW + i) - cached: Optional[CachedResponseTimes] = None + cached: CachedResponseTimes | None = None if self.response_times_cache is not None: for ts in acceptable_timestamps: if ts in self.response_times_cache: @@ -748,7 +744,7 @@ def to_name(self) -> str: return f"{self.method} {self.name}: {unwrapped_error}" def serialize(self) -> StatsErrorDict: - def _getattr(obj: "StatsError", key: str, default: Optional[Any]) -> Optional[Any]: + def _getattr(obj: StatsError, key: str, default: Any | None) -> Any | None: value = getattr(obj, key, default) if key in ["error"]: @@ -759,7 +755,7 @@ def _getattr(obj: "StatsError", key: str, default: Optional[Any]) -> Optional[An return cast(StatsErrorDict, {key: _getattr(self, key, None) for key in StatsErrorDict.__annotations__.keys()}) @classmethod - def unserialize(cls, data: StatsErrorDict) -> "StatsError": + def unserialize(cls, data: StatsErrorDict) -> StatsError: return cls(data["method"], data["name"], data["error"], data["occurrences"]) def to_dict(self, escape_string_values=False): @@ -771,11 +767,11 @@ def to_dict(self, escape_string_values=False): } -def avg(values: List[float | int]) -> float: +def avg(values: list[float | int]) -> float: return sum(values, 0.0) / max(len(values), 1) -def 
median_from_dict(total: int, count: Dict[int, int]) -> int: +def median_from_dict(total: int, count: dict[int, int]) -> int: """ total is the number of requests made count is a dict {response_time: count} @@ -790,13 +786,13 @@ def median_from_dict(total: int, count: Dict[int, int]) -> int: def setup_distributed_stats_event_listeners(events: Events, stats: RequestStats) -> None: - def on_report_to_master(client_id: str, data: Dict[str, Any]) -> None: + def on_report_to_master(client_id: str, data: dict[str, Any]) -> None: data["stats"] = stats.serialize_stats() data["stats_total"] = stats.total.get_stripped_report() data["errors"] = stats.serialize_errors() stats.errors = {} - def on_worker_report(client_id: str, data: Dict[str, Any]) -> None: + def on_worker_report(client_id: str, data: dict[str, Any]) -> None: for stats_data in data["stats"]: entry = StatsEntry.unserialize(stats_data) request_key = (entry.name, entry.method) @@ -826,7 +822,7 @@ def print_stats_json(stats: RequestStats) -> None: print(json.dumps(stats.serialize_stats(), indent=4)) -def get_stats_summary(stats: RequestStats, current=True) -> List[str]: +def get_stats_summary(stats: RequestStats, current=True) -> list[str]: """ stats summary will be returned as list of string """ @@ -852,7 +848,7 @@ def print_percentile_stats(stats: RequestStats) -> None: console_logger.info("") -def get_percentile_stats_summary(stats: RequestStats) -> List[str]: +def get_percentile_stats_summary(stats: RequestStats) -> list[str]: """ Percentile stats summary will be returned as list of string """ @@ -886,7 +882,7 @@ def print_error_report(stats: RequestStats) -> None: console_logger.info(line) -def get_error_report_summary(stats) -> List[str]: +def get_error_report_summary(stats) -> list[str]: summary = ["Error report"] summary.append("%-18s %-100s" % ("# occurrences", "Error")) separator = f'{"-" * 18}|{"-" * ((80 + STATS_NAME_WIDTH) - 19)}' @@ -907,11 +903,11 @@ def stats_printer_func() -> None: return 
stats_printer_func -def sort_stats(stats: Dict[Any, S]) -> List[S]: +def sort_stats(stats: dict[Any, S]) -> list[S]: return [stats[key] for key in sorted(stats.keys())] -def stats_history(runner: "Runner") -> None: +def stats_history(runner: Runner) -> None: """Save current stats info to history for charts of report.""" while True: stats = runner.stats @@ -943,7 +939,7 @@ def stats_history(runner: "Runner") -> None: class StatsCSV: """Write statistics to csv_writer stream.""" - def __init__(self, environment: "Environment", percentiles_to_report: List[float]) -> None: + def __init__(self, environment: Environment, percentiles_to_report: list[float]) -> None: self.environment = environment self.percentiles_to_report = percentiles_to_report @@ -977,7 +973,7 @@ def __init__(self, environment: "Environment", percentiles_to_report: List[float "Nodes", ] - def _percentile_fields(self, stats_entry: StatsEntry, use_current: bool = False) -> List[str] | List[int]: + def _percentile_fields(self, stats_entry: StatsEntry, use_current: bool = False) -> list[str] | list[int]: if not stats_entry.num_requests: return self.percentiles_na elif use_current: @@ -1045,8 +1041,8 @@ class StatsCSVFileWriter(StatsCSV): def __init__( self, - environment: "Environment", - percentiles_to_report: List[float], + environment: Environment, + percentiles_to_report: list[float], base_filepath: str, full_history: bool = False, ): @@ -1142,7 +1138,7 @@ def _stats_history_data_rows(self, csv_writer: CSVWriter, now: float) -> None: stats = self.environment.stats timestamp = int(now) - stats_entries: List[StatsEntry] = [] + stats_entries: list[StatsEntry] = [] if self.full_history: stats_entries = sort_stats(stats.entries) diff --git a/locust/test/mock_logging.py b/locust/test/mock_logging.py index 618f9a8505..756c479c51 100644 --- a/locust/test/mock_logging.py +++ b/locust/test/mock_logging.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import logging from typing import List, Union, Dict @@ 
-7,11 +9,11 @@ class MockedLoggingHandler(logging.Handler): - debug: List[LogMessage] = [] - warning: List[LogMessage] = [] - info: List[LogMessage] = [] - error: List[LogMessage] = [] - critical: List[LogMessage] = [] + debug: list[LogMessage] = [] + warning: list[LogMessage] = [] + info: list[LogMessage] = [] + error: list[LogMessage] = [] + critical: list[LogMessage] = [] def emit(self, record): if record.exc_info: diff --git a/locust/test/test_dispatch.py b/locust/test/test_dispatch.py index 8525c00224..abd5a80329 100644 --- a/locust/test/test_dispatch.py +++ b/locust/test/test_dispatch.py @@ -1,7 +1,8 @@ +from __future__ import annotations + import time import unittest from operator import attrgetter -from typing import Dict, List, Tuple, Type from locust import User from locust.dispatch import UsersDispatcher @@ -3372,7 +3373,7 @@ class User3(User): class TestRampUpUsersFromZeroWithFixed(unittest.TestCase): class RampUpCase: - def __init__(self, fixed_counts: Tuple[int], weights: Tuple[int], target_user_count: int): + def __init__(self, fixed_counts: tuple[int], weights: tuple[int], target_user_count: int): self.fixed_counts = fixed_counts self.weights = weights self.target_user_count = target_user_count @@ -3382,7 +3383,7 @@ def __str__(self): self.fixed_counts, self.weights, self.target_user_count ) - def case_handler(self, cases: List[RampUpCase], expected: List[Dict[str, int]], user_classes: List[Type[User]]): + def case_handler(self, cases: list[RampUpCase], expected: list[dict[str, int]], user_classes: list[type[User]]): self.assertEqual(len(cases), len(expected)) for case_num in range(len(cases)): @@ -4092,14 +4093,14 @@ class User3(User): self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 6) -def _aggregate_dispatched_users(d: Dict[str, Dict[str, int]]) -> Dict[str, int]: +def _aggregate_dispatched_users(d: dict[str, dict[str, int]]) -> dict[str, int]: user_classes = list(next(iter(d.values())).keys()) return {u: sum(d[u] 
for d in d.values()) for u in user_classes} -def _user_count(d: Dict[str, Dict[str, int]]) -> int: +def _user_count(d: dict[str, dict[str, int]]) -> int: return sum(map(sum, map(dict.values, d.values()))) # type: ignore -def _user_count_on_worker(d: Dict[str, Dict[str, int]], worker_node_id: str) -> int: +def _user_count_on_worker(d: dict[str, dict[str, int]], worker_node_id: str) -> int: return sum(d[worker_node_id].values()) diff --git a/locust/test/test_web.py b/locust/test/test_web.py index 0a434ffb0a..c5ca786b13 100644 --- a/locust/test/test_web.py +++ b/locust/test/test_web.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import copy import csv import json @@ -8,7 +10,6 @@ import logging from io import StringIO from tempfile import NamedTemporaryFile, TemporaryDirectory -from typing import List, Optional, Tuple, Type import gevent import requests diff --git a/locust/user/inspectuser.py b/locust/user/inspectuser.py index e1b817a848..4c06107e19 100644 --- a/locust/user/inspectuser.py +++ b/locust/user/inspectuser.py @@ -1,7 +1,8 @@ +from __future__ import annotations + from collections import defaultdict import inspect from json import dumps -from typing import List, Type, Dict from .task import TaskSet from .users import User @@ -49,11 +50,11 @@ def _print_task_ratio(x, level=0): _print_task_ratio(v["tasks"], level + 1) -def get_ratio(user_classes: List[Type[User]], user_spawned: Dict[str, int], total: bool) -> Dict[str, Dict[str, float]]: +def get_ratio(user_classes: list[type[User]], user_spawned: dict[str, int], total: bool) -> dict[str, dict[str, float]]: user_count = sum(user_spawned.values()) or 1 - ratio_percent: Dict[Type[User], float] = {u: user_spawned.get(u.__name__, 0) / user_count for u in user_classes} + ratio_percent: dict[type[User], float] = {u: user_spawned.get(u.__name__, 0) / user_count for u in user_classes} - task_dict: Dict[str, Dict[str, float]] = {} + task_dict: dict[str, dict[str, float]] = {} for u, r in 
ratio_percent.items(): d = {"ratio": r} d["tasks"] = _get_task_ratio(u.tasks, total, r) diff --git a/locust/user/task.py b/locust/user/task.py index 56d68848fe..d7d55aa2c3 100644 --- a/locust/user/task.py +++ b/locust/user/task.py @@ -6,13 +6,9 @@ from typing import ( TYPE_CHECKING, Callable, - List, TypeVar, - Optional, Type, overload, - Dict, - Set, Protocol, final, runtime_checkable, @@ -34,7 +30,7 @@ @runtime_checkable class TaskHolder(Protocol[TaskT]): - tasks: List[TaskT] + tasks: list[TaskT] @overload @@ -170,10 +166,10 @@ def get_tasks_from_base_classes(bases, class_dict): def filter_tasks_by_tags( - task_holder: Type[TaskHolder], - tags: Optional[Set[str]] = None, - exclude_tags: Optional[Set[str]] = None, - checked: Optional[Dict[TaskT, bool]] = None, + task_holder: type[TaskHolder], + tags: set[str] | None = None, + exclude_tags: set[str] | None = None, + checked: dict[TaskT, bool] | None = None, ): """ Function used by Environment to recursively remove any tasks/TaskSets from a TaskSet/User that @@ -238,7 +234,7 @@ class TaskSet(metaclass=TaskSetMeta): will then continue in the first TaskSet). """ - tasks: List[TaskSet | Callable] = [] + tasks: list[TaskSet | Callable] = [] """ Collection of python callables and/or TaskSet classes that the User(s) will run. @@ -253,7 +249,7 @@ class ForumPage(TaskSet): tasks = {ThreadPage:15, write_post:1} """ - min_wait: Optional[float] = None + min_wait: float | None = None """ Deprecated: Use wait_time instead. Minimum waiting time between the execution of user tasks. Can be used to override @@ -261,7 +257,7 @@ class ForumPage(TaskSet): TaskSet. """ - max_wait: Optional[float] = None + max_wait: float | None = None """ Deprecated: Use wait_time instead. Maximum waiting time between the execution of user tasks. Can be used to override @@ -277,11 +273,11 @@ class ForumPage(TaskSet): if not set on the TaskSet. 
""" - _user: "User" - _parent: "User" + _user: User + _parent: User - def __init__(self, parent: "User") -> None: - self._task_queue: List[Callable] = [] + def __init__(self, parent: User) -> None: + self._task_queue: list[Callable] = [] self._time_start = time() if isinstance(parent, TaskSet): @@ -301,7 +297,7 @@ def __init__(self, parent: "User") -> None: self._cp_last_run = time() # used by constant_pacing wait_time @property - def user(self) -> "User": + def user(self) -> User: """:py:class:`User ` instance that this TaskSet was created by""" return self._user diff --git a/locust/user/users.py b/locust/user/users.py index ebc2d52dcc..0c23763c8d 100644 --- a/locust/user/users.py +++ b/locust/user/users.py @@ -1,5 +1,5 @@ from __future__ import annotations -from typing import Callable, Dict, List, Optional, final +from typing import Callable, final import time from gevent import GreenletExit, greenlet @@ -59,7 +59,7 @@ class by using the :py:func:`@task decorator ` on methods, or by se :py:class:`HttpUser ` class. """ - host: Optional[str] = None + host: str | None = None """Base hostname to swarm. i.e: http://127.0.0.1:1234""" min_wait = None @@ -89,7 +89,7 @@ class MyUser(User): Method that returns the time between the execution of locust tasks in milliseconds """ - tasks: List[TaskSet | Callable] = [] + tasks: list[TaskSet | Callable] = [] """ Collection of python callables and/or TaskSet classes that the Locust user(s) will run. @@ -216,7 +216,7 @@ def group(self): def greenlet(self): return self._greenlet - def context(self) -> Dict: + def context(self) -> dict: """ Adds the returned value (a dict) to the context for :ref:`request event `. Override this in your User class to customize the context. 
@@ -244,7 +244,7 @@ class by using the :py:func:`@task decorator ` on methods, or by se abstract = True """If abstract is True, the class is meant to be subclassed, and users will not choose this locust during a test""" - pool_manager: Optional[PoolManager] = None + pool_manager: PoolManager | None = None """Connection pool manager to use. If not given, a new manager is created per single user.""" def __init__(self, *args, **kwargs): diff --git a/locust/util/load_locustfile.py b/locust/util/load_locustfile.py index f719786d2a..fe9036fbc1 100644 --- a/locust/util/load_locustfile.py +++ b/locust/util/load_locustfile.py @@ -1,8 +1,9 @@ +from __future__ import annotations + import importlib import inspect import os import sys -from typing import Dict, List, Optional, Tuple from ..shape import LoadTestShape from ..user import User @@ -21,7 +22,7 @@ def is_shape_class(item): return bool(inspect.isclass(item) and issubclass(item, LoadTestShape) and not getattr(item, "abstract", True)) -def load_locustfile(path) -> Tuple[Optional[str], Dict[str, User], List[LoadTestShape]]: +def load_locustfile(path) -> tuple[str | None, dict[str, User], list[LoadTestShape]]: """ Import given locustfile path and return (docstring, callables). diff --git a/locust/web.py b/locust/web.py index e2a779372b..de2f93dbd0 100644 --- a/locust/web.py +++ b/locust/web.py @@ -9,7 +9,7 @@ from json import dumps from itertools import chain from time import time -from typing import TYPE_CHECKING, Optional, Any, Dict, List +from typing import TYPE_CHECKING, Any import gevent from flask import ( @@ -55,7 +55,7 @@ class WebUI: in :attr:`environment.stats ` """ - app: Optional[Flask] = None + app: Flask | None = None """ Reference to the :class:`flask.Flask` app. Can be used to add additional web routes and customize the Flask app in other various ways. 
Example:: @@ -67,30 +67,30 @@ def my_custom_route(): return "your IP is: %s" % request.remote_addr """ - greenlet: Optional[gevent.Greenlet] = None + greenlet: gevent.Greenlet | None = None """ Greenlet of the running web server """ - server: Optional[pywsgi.WSGIServer] = None + server: pywsgi.WSGIServer | None = None """Reference to the :class:`pyqsgi.WSGIServer` instance""" - template_args: Dict[str, Any] + template_args: dict[str, Any] """Arguments used to render index.html for the web UI. Must be used with custom templates extending index.html.""" - auth_args: Dict[str, Any] + auth_args: dict[str, Any] """Arguments used to render auth.html for the web UI auth page. Must be used when configuring auth""" def __init__( self, - environment: "Environment", + environment: Environment, host: str, port: int, web_login: bool = False, - tls_cert: Optional[str] = None, - tls_key: Optional[str] = None, - stats_csv_writer: Optional[StatsCSV] = None, + tls_cert: str | None = None, + tls_key: str | None = None, + stats_csv_writer: StatsCSV | None = None, delayed_start=False, userclass_picker_is_active=False, modern_ui=False, @@ -126,8 +126,8 @@ def __init__( root_path = os.path.dirname(os.path.abspath(__file__)) app.root_path = root_path self.webui_build_path = os.path.join(root_path, "webui", "dist") - self.greenlet: Optional[gevent.Greenlet] = None - self._swarm_greenlet: Optional[gevent.Greenlet] = None + self.greenlet: gevent.Greenlet | None = None + self._swarm_greenlet: gevent.Greenlet | None = None self.template_args = {} self.auth_args = {} @@ -370,8 +370,8 @@ def failures_stats_csv() -> Response: @self.auth_required_if_enabled @memoize(timeout=DEFAULT_CACHE_TIME, dynamic_timeout=True) def request_stats() -> Response: - stats: List[Dict[str, Any]] = [] - errors: List[StatsErrorDict] = [] + stats: list[dict[str, Any]] = [] + errors: list[StatsErrorDict] = [] if environment.runner is None: report = { @@ -481,9 +481,9 @@ def exceptions_csv() -> Response: 
@app.route("/tasks") @self.auth_required_if_enabled - def tasks() -> Dict[str, Dict[str, Dict[str, float]]]: + def tasks() -> dict[str, dict[str, dict[str, float]]]: runner = self.environment.runner - user_spawned: Dict[str, int] + user_spawned: dict[str, int] if runner is None: user_spawned = {} else: diff --git a/pyproject.toml b/pyproject.toml index d7fa311787..814ccb57c4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -83,10 +83,7 @@ locust = "locust.main:main" [tool.mypy] # missing type stubs ignore_missing_imports = true - -[[tool.mypy.overrides]] -module = ["requests.*"] -ignore_missing_imports = true +python_version = "3.8" [[tool.mypy.overrides]] module = ["locust.dispatch"] diff --git a/tox.ini b/tox.ini index 5316c450ad..b2de982a5d 100644 --- a/tox.ini +++ b/tox.ini @@ -49,5 +49,7 @@ deps = flake8==6.0.0 commands = flake8 . --count --show-source --statistics [testenv:mypy] -deps = mypy==1.7.1 +deps = + mypy==1.8.0 + types-requests commands = mypy locust/