Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Warn if CPU usage is too high (>90%) #1161 #1236

Merged
merged 6 commits into from
Jan 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion locust/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,8 @@ def sig_term_handler():
logger.info("Starting Locust %s" % version)
main_greenlet.join()
code = 0
if len(runners.locust_runner.errors) or len(runners.locust_runner.exceptions):
lr = runners.locust_runner
if len(lr.errors) or len(lr.exceptions) or lr.cpu_log_warning():
code = options.exit_code_on_error
shutdown(code=code)
except KeyboardInterrupt as e:
Expand Down
64 changes: 52 additions & 12 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from time import time

import gevent
import psutil
import six
from gevent import GreenletExit
from gevent.pool import Group
Expand All @@ -25,6 +26,7 @@

STATE_INIT, STATE_HATCHING, STATE_RUNNING, STATE_CLEANUP, STATE_STOPPING, STATE_STOPPED, STATE_MISSING = ["ready", "hatching", "running", "cleanup", "stopping", "stopped", "missing"]
SLAVE_REPORT_INTERVAL = 3.0
CPU_MONITOR_INTERVAL = 5.0

LOCUST_STATE_RUNNING, LOCUST_STATE_WAITING, LOCUST_STATE_STOPPING = ["running", "waiting", "stopping"]

Expand All @@ -36,10 +38,13 @@ def __init__(self, locust_classes, options):
self.num_clients = options.num_clients
self.host = options.host
self.locusts = Group()
self.greenlet = self.locusts
self.greenlet = Group()
self.state = STATE_INIT
self.hatching_greenlet = None
self.stepload_greenlet = None
self.current_cpu_usage = 0
self.cpu_warning_emitted = False
self.greenlet.spawn(self.monitor_cpu)
self.exceptions = {}
self.stats = global_stats
self.step_load = options.step_load
Expand All @@ -51,6 +56,10 @@ def on_hatch_complete(user_count):
logger.info("Resetting stats\n")
self.stats.reset_all()
events.hatch_complete += on_hatch_complete

def __del__(self):
# don't leave any stray greenlets if runner is removed
self.greenlet.kill(block=False)

@property
def request_stats(self):
Expand All @@ -64,6 +73,13 @@ def errors(self):
def user_count(self):
return len(self.locusts)

def cpu_log_warning(self):
"""Called at the end of the test to repeat the warning & return the status"""
if self.cpu_warning_emitted:
logger.warning("Loadgen CPU usage was too high at some point during the test! See https://docs.locust.io/en/stable/running-locust-distributed.html for how to distribute the load over multiple CPU cores or machines")
return True
return False

def weight_locusts(self, amount):
"""
Distributes the amount of locusts for each WebLocust-class according to it's weight
Expand Down Expand Up @@ -180,11 +196,22 @@ def kill_locust_greenlets(self, greenlets):
else:
for g in greenlets:
self.locusts.killone(g)


def monitor_cpu(self):
process = psutil.Process()
while True:
self.current_cpu_usage = process.cpu_percent()
if self.current_cpu_usage > 90 and not self.cpu_warning_emitted:
logging.warning("Loadgen CPU usage above 90%! This may constrain your throughput and may even give inconsistent response time measurements! See https://docs.locust.io/en/stable/running-locust-distributed.html for how to distribute the load over multiple CPU cores or machines")
self.cpu_warning_emitted = True
gevent.sleep(CPU_MONITOR_INTERVAL)

def start_hatching(self, locust_count=None, hatch_rate=None, wait=False):
if self.state != STATE_RUNNING and self.state != STATE_HATCHING:
self.stats.clear_all()
self.exceptions = {}
self.cpu_warning_emitted = False
self.slave_cpu_warning_emitted = False
events.locust_start_hatching.fire()

# Dynamically changing the locust count
Expand Down Expand Up @@ -224,7 +251,7 @@ def start_stepload(self, locust_count, hatch_rate, step_locust_count, step_durat
self.stepload_greenlet.kill()
logger.info("Start a new swarming in Step Load mode: total locust count of %d, hatch rate of %d, step locust count of %d, step duration of %d " % (locust_count, hatch_rate, step_locust_count, step_duration))
self.state = STATE_INIT
self.stepload_greenlet = gevent.spawn(self.stepload_worker)
self.stepload_greenlet = self.greenlet.spawn(self.stepload_worker)
self.stepload_greenlet.link_exception(callback=self.noop)

def stepload_worker(self):
Expand Down Expand Up @@ -274,8 +301,8 @@ def on_locust_error(locust_instance, exception, tb):
def start_hatching(self, locust_count=None, hatch_rate=None, wait=False):
if hatch_rate > 100:
logger.warning("Your selected hatch rate is very high (>100), and this is known to sometimes cause issues. Do you really need to ramp up that fast?")
self.hatching_greenlet = gevent.spawn(lambda: super(LocalLocustRunner, self).start_hatching(locust_count, hatch_rate, wait=wait))
self.greenlet = self.hatching_greenlet
self.hatching_greenlet = self.greenlet.spawn(lambda: super(LocalLocustRunner, self).start_hatching(locust_count, hatch_rate, wait=wait))


class DistributedLocustRunner(LocustRunner):
def __init__(self, locust_classes, options):
Expand All @@ -293,10 +320,13 @@ def __init__(self, id, state=STATE_INIT, heartbeat_liveness=3):
self.state = state
self.user_count = 0
self.heartbeat = heartbeat_liveness
self.cpu_usage = 0
self.cpu_warning_emitted = False

class MasterLocustRunner(DistributedLocustRunner):
def __init__(self, *args, **kwargs):
super(MasterLocustRunner, self).__init__(*args, **kwargs)
self.slave_cpu_warning_emitted = False

class SlaveNodesDict(dict):
def get_by_state(self, state):
Expand All @@ -320,7 +350,6 @@ def running(self):

self.clients = SlaveNodesDict()
self.server = rpc.Server(self.master_bind_host, self.master_bind_port)
self.greenlet = Group()
self.greenlet.spawn(self.heartbeat_worker).link_exception(callback=self.noop)
self.greenlet.spawn(self.client_listener).link_exception(callback=self.noop)

Expand All @@ -342,6 +371,13 @@ def on_quitting():
def user_count(self):
return sum([c.user_count for c in six.itervalues(self.clients)])

def cpu_log_warning(self):
warning_emitted = LocustRunner.cpu_log_warning(self)
if self.slave_cpu_warning_emitted:
logger.warning("CPU usage threshold was exceeded on slaves during the test!")
warning_emitted = True
return warning_emitted

def start_hatching(self, locust_count, hatch_rate):
num_slaves = len(self.clients.ready) + len(self.clients.running) + len(self.clients.hatching)
if not num_slaves:
Expand Down Expand Up @@ -423,8 +459,14 @@ def client_listener(self):
logger.info("Removing %s client from running clients" % (msg.node_id))
elif msg.type == "heartbeat":
if msg.node_id in self.clients:
self.clients[msg.node_id].heartbeat = self.heartbeat_liveness
self.clients[msg.node_id].state = msg.data['state']
c = self.clients[msg.node_id]
c.heartbeat = self.heartbeat_liveness
c.state = msg.data['state']
c.cpu_usage = msg.data['current_cpu_usage']
if not c.cpu_warning_emitted and c.cpu_usage > 90:
self.slave_cpu_warning_emitted = True # used to fail the test in the end
c.cpu_warning_emitted = True # used to suppress logging for this node
logger.warning("Slave %s exceeded cpu threshold (will only log this once per slave)" % (msg.node_id))
elif msg.type == "stats":
events.slave_report.fire(client_id=msg.node_id, data=msg.data)
elif msg.type == "hatching":
Expand Down Expand Up @@ -455,8 +497,6 @@ def __init__(self, *args, **kwargs):
self.client_id = socket.gethostname() + "_" + uuid4().hex

self.client = rpc.Client(self.master_host, self.master_port, self.client_id)
self.greenlet = Group()

self.greenlet.spawn(self.heartbeat).link_exception(callback=self.noop)
self.greenlet.spawn(self.worker).link_exception(callback=self.noop)
self.client.send(Message("client_ready", None, self.client_id))
Expand Down Expand Up @@ -487,7 +527,7 @@ def on_locust_error(locust_instance, exception, tb):

def heartbeat(self):
while True:
self.client.send(Message('heartbeat', {'state': self.slave_state}, self.client_id))
self.client.send(Message('heartbeat', {'state': self.slave_state, 'current_cpu_usage': self.current_cpu_usage}, self.client_id))
gevent.sleep(self.heartbeat_interval)

def worker(self):
Expand All @@ -501,7 +541,7 @@ def worker(self):
#self.num_clients = job["num_clients"]
self.host = job["host"]
self.options.stop_timeout = job["stop_timeout"]
self.hatching_greenlet = gevent.spawn(lambda: self.start_hatching(locust_count=job["num_clients"], hatch_rate=job["hatch_rate"]))
self.hatching_greenlet = self.greenlet.spawn(lambda: self.start_hatching(locust_count=job["num_clients"], hatch_rate=job["hatch_rate"]))
elif msg.type == "stop":
self.stop()
self.client.send(Message("client_stopped", None, self.client_id))
Expand Down
24 changes: 22 additions & 2 deletions locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from gevent.queue import Queue

import mock
from locust import events
from locust import events, runners
from locust.core import Locust, TaskSet, task
from locust.exception import LocustError
from locust.rpc import Message
Expand Down Expand Up @@ -84,6 +84,26 @@ def assert_locust_class_distribution(self, expected_distribution, classes):
),
)

def test_cpu_warning(self):
_monitor_interval = runners.CPU_MONITOR_INTERVAL
runners.CPU_MONITOR_INTERVAL = 2.0
try:
class CpuLocust(Locust):
wait_time = constant(0)
class task_set(TaskSet):
@task
def cpu_task(self):
for i in range(1000000):
_ = 3 / 2
runner = LocalLocustRunner([CpuLocust], mocked_options())
self.assertFalse(runner.cpu_warning_emitted)
runner.spawn_locusts(1, wait=False)
sleep(2.5)
runner.quit()
self.assertTrue(runner.cpu_warning_emitted)
finally:
runners.CPU_MONITOR_INTERVAL = _monitor_interval

def test_weight_locusts(self):
maxDiff = 2048
class BaseLocust(Locust):
Expand Down Expand Up @@ -368,7 +388,7 @@ class MyTestLocust(Locust):

try:
runner.start_hatching(0, 1, wait=True)
runner.greenlet.join()
runner.hatching_greenlet.join()
except gevent.Timeout:
self.fail("Got Timeout exception. A locust seems to have been spawned, even though 0 was specified.")
finally:
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"pyzmq>=16.0.2",
"geventhttpclient-wheels==1.3.1.dev2",
"ConfigArgParse==0.15.1",
"psutil==5.6.7",
],
test_suite="locust.test",
tests_require=['mock'],
Expand Down