Skip to content

Commit

Permalink
Merge pull request #1306 from locustio/locust-start-stop-refactor
Browse files Browse the repository at this point in the history
Decouple Runner and Locust code by introducing Locust.start and Locust.stop methods
  • Loading branch information
heyman authored Apr 3, 2020
2 parents 479aef9 + 07d1ee6 commit 2a0a6ef
Show file tree
Hide file tree
Showing 4 changed files with 296 additions and 56 deletions.
114 changes: 89 additions & 25 deletions locust/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
from .clients import HttpSession
from .exception import (InterruptTaskSet, LocustError, RescheduleTask,
RescheduleTaskImmediately, StopLocust, MissingWaitTimeError)
from .runners import STATE_CLEANUP, LOCUST_STATE_RUNNING, LOCUST_STATE_STOPPING, LOCUST_STATE_WAITING
from .util import deprecation


logger = logging.getLogger(__name__)


LOCUST_STATE_RUNNING, LOCUST_STATE_WAITING, LOCUST_STATE_STOPPING = ["running", "waiting", "stopping"]


def task(weight=1):
"""
Used as a convenience decorator to be able to declare tasks for a Locust or a TaskSet
Expand Down Expand Up @@ -253,13 +255,24 @@ def _set_setup_flag(cls):
def _set_teardown_flag(cls):
cls._teardown_is_set = True

def on_start(self):
"""
Hook for end-user scripts for running code when a Locust user starts running
"""
pass

def on_stop(self):
"""
Hook for end-user scripts for running code when a Locust user stops running
"""
pass

def run(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs

try:
if hasattr(self, "on_start"):
self.on_start()
self.on_start()
except InterruptTaskSet as e:
if e.reschedule:
raise RescheduleTaskImmediately(e.reschedule).with_traceback(sys.exc_info()[2])
Expand All @@ -272,27 +285,22 @@ def run(self, *args, **kwargs):
self.schedule_task(self.get_next_task())

try:
if self.locust._state == LOCUST_STATE_STOPPING:
raise GreenletExit()
self._check_stop_condition()
self.execute_next_task()
if self.locust._state == LOCUST_STATE_STOPPING:
raise GreenletExit()
except RescheduleTaskImmediately:
if self.locust._state == LOCUST_STATE_STOPPING:
raise GreenletExit()
pass
except RescheduleTask:
self.wait()
else:
self.wait()
except InterruptTaskSet as e:
self.on_stop()
if e.reschedule:
raise RescheduleTaskImmediately(e.reschedule) from e
else:
raise RescheduleTask(e.reschedule) from e
except StopLocust:
raise
except GreenletExit:
except (StopLocust, GreenletExit):
self.on_stop()
raise
except Exception as e:
self.locust.environment.events.locust_error.fire(locust_instance=self, exception=e, tb=sys.exc_info()[2])
Expand Down Expand Up @@ -361,13 +369,19 @@ class Tasks(TaskSet):
))

def wait(self):
self._check_stop_condition()
self.locust._state = LOCUST_STATE_WAITING
self._sleep(self.wait_time())
self._check_stop_condition()
self.locust._state = LOCUST_STATE_RUNNING

def _sleep(self, seconds):
gevent.sleep(seconds)

def _check_stop_condition(self):
if self.locust._state == LOCUST_STATE_STOPPING:
raise StopLocust()

def interrupt(self, reschedule=True):
"""
Interrupt the TaskSet and hand over execution control back to the parent TaskSet.
Expand Down Expand Up @@ -527,7 +541,8 @@ class ForumPage(TaskSet):
_setup_has_run = False # Internal state to see if we have already run
_teardown_is_set = False # Internal state to see if we have already run
_lock = gevent.lock.Semaphore() # Lock to make sure setup is only run once
_state = False
_state = None
_greenlet = None

def __init__(self, environment):
super(Locust, self).__init__()
Expand Down Expand Up @@ -556,21 +571,70 @@ def _set_setup_flag(cls):
def _set_teardown_flag(cls):
cls._teardown_is_set = True

def run(self, runner=None):
def on_start(self):
"""
Hook for end-user scripts for running code when a Locust user starts running
"""
pass

def on_stop(self):
"""
Hook for end-user scripts for running code when a Locust user stops running
"""
pass

def run(self):
self._state = LOCUST_STATE_RUNNING
task_set_instance = DefaultTaskSet(self)
try:
if hasattr(self, "on_start"):
self.on_start()
# run the task_set on_start method, if it has one
self.on_start()

task_set_instance.run()
except StopLocust:
pass
except GreenletExit as e:
if runner:
runner.state = STATE_CLEANUP
# Run the task_set on_stop method, if it has one
if hasattr(task_set_instance, "on_stop"):
task_set_instance.on_stop()
raise # Maybe something relies on this except being raised?
except (GreenletExit, StopLocust) as e:
# run the on_stop method, if it has one
self.on_stop()

def start(self, gevent_group):
"""
Start a greenlet that runs this locust instance.
*Arguments*:
* gevent_group: gevent.pool.Group instance where the greenlet will be spawned.
Returns the spawned greenlet.
"""
def run_locust(user):
"""
Main function for Locust user greenlet. It's important that this function takes the locust
instance as argument, since we use greenlet_instance.args[0] to retrieve a reference to the
locust instance.
"""
user.run()
self._greenlet = gevent_group.spawn(run_locust, self)
return self._greenlet

def stop(self, gevent_group, force=False):
"""
Stop the locust user greenlet that exists in the gevent_group.
This method is not meant to be called from within of the Locust's greenlet.
*Arguments*:
* gevent_group: gevent.pool.Group instance where the greenlet will be spawned.
* force: If False (the default) the stopping is done gracefully by setting the state to LOCUST_STATE_STOPPING
which will make the Locust instance stop once any currently running task is complete and on_stop
methods are called. If force is True the greenlet will be killed immediately.
Returns True if the greenlet was killed immediately, otherwise False
"""
if force or self._state == LOCUST_STATE_WAITING:
gevent_group.killone(self._greenlet)
return True
elif self._state == LOCUST_STATE_RUNNING:
self._state = LOCUST_STATE_STOPPING
return False


class HttpLocust(Locust):
Expand Down
50 changes: 20 additions & 30 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import gevent
import psutil
from gevent import GreenletExit
from gevent.pool import Group

from .rpc import Message, rpc
Expand All @@ -24,7 +23,6 @@
HEARTBEAT_INTERVAL = 1
HEARTBEAT_LIVENESS = 3

LOCUST_STATE_RUNNING, LOCUST_STATE_WAITING, LOCUST_STATE_STOPPING = ["running", "waiting", "stopping"]

class LocustRunner(object):
def __init__(self, environment, locust_classes):
Expand Down Expand Up @@ -135,15 +133,10 @@ def hatch():
self.environment.events.hatch_complete.fire(user_count=len(self.locusts))
return

locust = bucket.pop(random.randint(0, len(bucket)-1))
occurrence_count[locust.__name__] += 1
new_locust = locust(self.environment)
def start_locust(_):
try:
new_locust.run(runner=self)
except GreenletExit:
pass
self.locusts.spawn(start_locust, new_locust)
locust_class = bucket.pop(random.randint(0, len(bucket)-1))
occurrence_count[locust_class.__name__] += 1
new_locust = locust_class(self.environment)
new_locust.start(self.locusts)
if len(self.locusts) % 10 == 0:
logger.debug("%i locusts hatched" % len(self.locusts))
if bucket:
Expand All @@ -161,36 +154,32 @@ def kill_locusts(self, kill_count):
bucket = self.weight_locusts(kill_count)
kill_count = len(bucket)
logger.info("Killing %i locusts" % kill_count)
dying = []
to_kill = []
for g in self.locusts:
for l in bucket:
if l == type(g.args[0]):
dying.append(g)
user = g.args[0]
if l == type(user):
to_kill.append(user)
bucket.remove(l)
break
self.kill_locust_greenlets(dying)
self.kill_locust_instances(to_kill)
self.environment.events.hatch_complete.fire(user_count=self.user_count)

def kill_locust_greenlets(self, greenlets):
"""
Kill running locust greenlets. If environment.stop_timeout is set, we try to stop the
Locust users gracefully
"""

def kill_locust_instances(self, users):
if self.environment.stop_timeout:
dying = Group()
for g in greenlets:
locust = g.args[0]
if locust._state == LOCUST_STATE_WAITING:
self.locusts.killone(g)
else:
locust._state = LOCUST_STATE_STOPPING
dying.add(g)
for user in users:
if not user.stop(self.locusts, force=False):
# Locust.stop() returns False if the greenlet was not killed, so we'll need
# to add it's greenlet to our dying Group so we can wait for it to finish it's task
dying.add(user._greenlet)
if not dying.join(timeout=self.environment.stop_timeout):
logger.info("Not all locusts finished their tasks & terminated in %s seconds. Killing them..." % self.environment.stop_timeout)
dying.kill(block=True)
else:
for g in greenlets:
self.locusts.killone(g)
for user in users:
user.stop(self.locusts, force=True)

def monitor_cpu(self):
process = psutil.Process()
Expand Down Expand Up @@ -252,10 +241,11 @@ def stepload_worker(self, hatch_rate, step_clients_growth, step_duration):
gevent.sleep(step_duration)

def stop(self):
self.state = STATE_CLEANUP
# if we are currently hatching locusts we need to kill the hatching greenlet first
if self.hatching_greenlet and not self.hatching_greenlet.ready():
self.hatching_greenlet.kill(block=True)
self.kill_locust_greenlets([g for g in self.locusts])
self.kill_locust_instances([g.args[0] for g in self.locusts])
self.state = STATE_STOPPED
self.cpu_log_warning()
self.environment.events.locust_stop_hatching.fire()
Expand Down
Loading

0 comments on commit 2a0a6ef

Please sign in to comment.