Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support sharing connection pools between users #2059

Merged
merged 4 commits into from
Mar 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/increase-performance.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ FastHttpUser class
--------------------

.. autoclass:: locust.contrib.fasthttp.FastHttpUser
:members: network_timeout, connection_timeout, max_redirects, max_retries, insecure
:members: network_timeout, connection_timeout, max_redirects, max_retries, insecure, concurrency, client_pool


FastHttpSession class
Expand Down
22 changes: 22 additions & 0 deletions docs/writing-a-locustfile.rst
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,28 @@ requests.Session's trust_env attribute to ``False``. If you don't want this you
``locust_instance.client.trust_env`` to ``True``. For further details, refer to the
`documentation of requests <https://requests.readthedocs.io/en/master/api/#requests.Session.trust_env>`_.

Connection pooling
------------------

As every :py:class:`HttpUser <locust.HttpUser>` creates new :py:class:`HttpSession <locust.clients.HttpSession>`,
every user instance has it's own connection pools. This is similar to how real users would interact with a web server.

However, if you want to share connections among all users, you can use a single pool manager. To do this, set
:py:attr:`pool_manager <locust.HttpUser.pool_manager>` class attribute to an instance of :py:class:`urllib3.PoolManager`.

.. code-block:: python

from locust import HttpUser
from urllib3 import PoolManager

class MyUser(HttpUser):

# All users will be limited to 10 concurrent connections at most.
pool_manager = PoolManager(maxsize=10, block=True)

For more configuration options, refer to the
`urllib3 documentation <https://urllib3.readthedocs.io/en/stable/reference/urllib3.poolmanager.html>`_.

TaskSets
================================
TaskSets is a way to structure tests of hierarchical web sites/systems. You can :ref:`read more about it here <tasksets>`
Expand Down
27 changes: 20 additions & 7 deletions locust/clients.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import re
import time
from contextlib import contextmanager
from typing import Generator, Optional, Union
from urllib.parse import urlparse, urlunparse

import requests
from requests import Request, Response
from requests.adapters import HTTPAdapter
from requests.auth import HTTPBasicAuth
from requests.exceptions import InvalidSchema, InvalidURL, MissingSchema, RequestException
from contextlib import contextmanager

from urllib.parse import urlparse, urlunparse
from urllib3 import PoolManager

from .exception import CatchResponseError, LocustError, ResponseError

from typing import Union, Optional, Generator

absolute_http_url_regexp = re.compile(r"^https?://", re.I)


Expand Down Expand Up @@ -47,7 +47,7 @@ class HttpSession(requests.Session):
and then mark it as successful even if the response code was not (i.e 500 or 404).
"""

def __init__(self, base_url, request_event, user, *args, **kwargs):
def __init__(self, base_url, request_event, user, pool_manager: Optional[PoolManager] = None, *args, **kwargs):
super().__init__(*args, **kwargs)

self.base_url = base_url
Expand All @@ -56,7 +56,7 @@ def __init__(self, base_url, request_event, user, *args, **kwargs):

# User can group name, or use the group context manager to gather performance statistics under a specific name
# This is an alternative to passing in the "name" parameter to the requests function
self.request_name = None
self.request_name: Optional[str] = None

# Check for basic authentication
parsed_url = urlparse(self.base_url)
Expand All @@ -72,6 +72,9 @@ def __init__(self, base_url, request_event, user, *args, **kwargs):
# configure requests to use basic auth
self.auth = HTTPBasicAuth(parsed_url.username, parsed_url.password)

self.mount("https://", LocustHttpAdapter(pool_manager=pool_manager))
self.mount("http://", LocustHttpAdapter(pool_manager=pool_manager))

def _build_url(self, path):
"""prepend url with hostname unless it's already an absolute URL"""
if absolute_http_url_regexp.match(path):
Expand Down Expand Up @@ -299,6 +302,16 @@ def failure(self, exc):
self._manual_result = exc


class LocustHttpAdapter(HTTPAdapter):
def __init__(self, pool_manager: Optional[PoolManager], *args, **kwargs):
self.poolmanager = pool_manager
super().__init__(*args, **kwargs)

def init_poolmanager(self, *args, **kwargs):
if self.poolmanager is None:
super().init_poolmanager(*args, **kwargs)


# Monkey patch Response class to give some guidance
def _success(self):
raise LocustError(
Expand Down
24 changes: 21 additions & 3 deletions locust/contrib/fasthttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import gevent
from gevent.timeout import Timeout
from geventhttpclient._parser import HTTPParseError
from geventhttpclient.client import HTTPClientPool
from geventhttpclient.useragent import UserAgent, CompatRequest, CompatResponse, ConnectionError
from geventhttpclient.response import HTTPConnectionClosed

Expand Down Expand Up @@ -65,7 +66,15 @@ def insecure_ssl_context_factory():
class FastHttpSession:
auth_header = None

def __init__(self, environment: Environment, base_url: str, user: "FastHttpUser", insecure=True, **kwargs):
def __init__(
self,
environment: Environment,
base_url: str,
user: "FastHttpUser",
insecure=True,
client_pool: Optional[HTTPClientPool] = None,
**kwargs,
):
self.environment = environment
self.base_url = base_url
self.cookiejar = CookieJar()
Expand All @@ -78,6 +87,7 @@ def __init__(self, environment: Environment, base_url: str, user: "FastHttpUser"
cookiejar=self.cookiejar,
ssl_context_factory=ssl_context_factory,
insecure=insecure,
client_pool=client_pool,
**kwargs,
)

Expand Down Expand Up @@ -292,7 +302,11 @@ class by using the :py:func:`@task decorator <locust.task>` on the methods, or b
"""Parameter passed to FastHttpSession. Default True, meaning no SSL verification."""

concurrency: int = 1
"""Parameter passed to FastHttpSession. Describes number of concurrent requests allowed by the FastHttpSession. Default 1."""
"""Parameter passed to FastHttpSession. Describes number of concurrent requests allowed by the FastHttpSession. Default 1.
Note that setting this value has no effect when custom client_pool was given."""

client_pool: Optional[HTTPClientPool] = None
"""HTTP client pool to use. If not given, a new pool is created per single user."""

abstract = True
"""Dont register this as a User class that can be run by itself"""
Expand All @@ -316,6 +330,7 @@ def __init__(self, environment):
insecure=self.insecure,
concurrency=self.concurrency,
user=self,
client_pool=self.client_pool,
)
"""
Instance of HttpSession that is created upon instantiation of User.
Expand Down Expand Up @@ -401,9 +416,12 @@ class LocustUserAgent(UserAgent):
response_type = FastResponse
valid_response_codes = frozenset([200, 201, 202, 203, 204, 205, 206, 207, 208, 226, 301, 302, 303, 307])

def __init__(self, **kwargs):
def __init__(self, client_pool: Optional[HTTPClientPool] = None, **kwargs):
super().__init__(**kwargs)

if client_pool is not None:
self.clientpool = client_pool

def _urlopen(self, request):
"""Override _urlopen() in order to make it use the response_type attribute"""
client = self.clientpool.get_client(request.url_split)
Expand Down
39 changes: 39 additions & 0 deletions locust/test/test_fasthttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import time
from tempfile import NamedTemporaryFile

from geventhttpclient.client import HTTPClientPool

from locust.user import task, TaskSet
from locust.contrib.fasthttp import FastHttpSession
from locust import FastHttpUser
Expand Down Expand Up @@ -451,6 +453,43 @@ class MyUnauthorizedUser(FastHttpUser):
self.assertEqual(401, locust.client.get("/basic_auth").status_code)
self.assertEqual(401, unauthorized.client.get("/basic_auth").status_code)

def test_shared_client_pool(self):
shared_client_pool = HTTPClientPool(concurrency=1)

class MyUserA(FastHttpUser):
host = "http://127.0.0.1:%i" % self.port
client_pool = shared_client_pool

class MyUserB(FastHttpUser):
host = "http://127.0.0.1:%i" % self.port
client_pool = shared_client_pool

user_a = MyUserA(self.environment)
user_b = MyUserB(self.environment)

user_a.client.get("/ultra_fast")
user_b.client.get("/ultra_fast")
user_b.client.get("/ultra_fast")
user_a.client.get("/ultra_fast")

self.assertEqual(1, self.connections_count)
self.assertEqual(4, self.requests_count)

def test_client_pool_per_user_instance(self):
class MyUser(FastHttpUser):
host = "http://127.0.0.1:%i" % self.port

user_a = MyUser(self.environment)
user_b = MyUser(self.environment)

user_a.client.get("/ultra_fast")
user_b.client.get("/ultra_fast")
user_b.client.get("/ultra_fast")
user_a.client.get("/ultra_fast")

self.assertEqual(2, self.connections_count)
self.assertEqual(4, self.requests_count)


class TestFastHttpCatchResponse(WebserverTestCase):
def setUp(self):
Expand Down
44 changes: 43 additions & 1 deletion locust/test/test_users.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import unittest

from locust import User
from urllib3 import PoolManager

from locust import User, HttpUser
from locust.test.testcases import WebserverTestCase


class TestUserClass(unittest.TestCase):
Expand All @@ -25,3 +28,42 @@ class MyFunctionScopedUser(User):

class MyModuleScopedUser(User):
pass


class TestHttpUserWithWebserver(WebserverTestCase):
def test_shared_pool_manager(self):
shared_pool_manager = PoolManager(maxsize=1, block=True)

class MyUserA(HttpUser):
host = "http://127.0.0.1:%i" % self.port
pool_manager = shared_pool_manager

class MyUserB(HttpUser):
host = "http://127.0.0.1:%i" % self.port
pool_manager = shared_pool_manager

user_a = MyUserA(self.environment)
user_b = MyUserB(self.environment)

user_a.client.get("/ultra_fast")
user_b.client.get("/ultra_fast")
user_b.client.get("/ultra_fast")
user_a.client.get("/ultra_fast")

self.assertEqual(1, self.connections_count)
self.assertEqual(4, self.requests_count)

def test_pool_manager_per_user_instance(self):
class MyUser(HttpUser):
host = "http://127.0.0.1:%i" % self.port

user_a = MyUser(self.environment)
user_b = MyUser(self.environment)

user_a.client.get("/ultra_fast")
user_b.client.get("/ultra_fast")
user_b.client.get("/ultra_fast")
user_a.client.get("/ultra_fast")

self.assertEqual(2, self.connections_count)
self.assertEqual(4, self.requests_count)
20 changes: 19 additions & 1 deletion locust/test/testcases.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,25 @@ class WebserverTestCase(LocustTestCase):

def setUp(self):
super().setUp()
self._web_server = gevent.pywsgi.WSGIServer(("127.0.0.1", 0), app, log=None)

self.connections_count = 0
self.requests_count = 0

class CountingWSGIHandler(gevent.pywsgi.WSGIHandler):
def handle(this):
self.connections_count += 1
super().handle()

def log_request(this):
self.requests_count += 1
super().log_request()

self._web_server = gevent.pywsgi.WSGIServer(
("127.0.0.1", 0),
app,
log=None,
handler_class=CountingWSGIHandler,
)
gevent.spawn(lambda: self._web_server.serve_forever())
gevent.sleep(0.01)
self.port = self._web_server.server_port
Expand Down
20 changes: 14 additions & 6 deletions locust/user/users.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
from locust.user.wait_time import constant
from typing import Callable, Dict, List, Optional, Union
from typing_extensions import final

from gevent import GreenletExit, greenlet
from gevent.pool import Group
from typing_extensions import final
from urllib3 import PoolManager

from locust.clients import HttpSession
from locust.exception import LocustError, StopUser
from locust.user.wait_time import constant
from locust.util import deprecation

from .task import (
TaskSet,
DefaultTaskSet,
get_tasks_from_base_classes,
LOCUST_STATE_RUNNING,
LOCUST_STATE_WAITING,
LOCUST_STATE_STOPPING,
LOCUST_STATE_WAITING,
DefaultTaskSet,
TaskSet,
get_tasks_from_base_classes,
)


Expand Down Expand Up @@ -226,6 +230,9 @@ class by using the :py:func:`@task decorator <locust.task>` on methods, or by se
abstract = True
"""If abstract is True, the class is meant to be subclassed, and users will not choose this locust during a test"""

pool_manager: Optional[PoolManager] = None
"""Connection pool manager to use. If not given, a new manager is created per single user."""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.host is None:
Expand All @@ -237,6 +244,7 @@ def __init__(self, *args, **kwargs):
base_url=self.host,
request_event=self.environment.events.request,
user=self,
pool_manager=self.pool_manager,
)
session.trust_env = False
self.client = session
Expand Down