Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graceful shutdown of action runner. #5428

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions conf/st2.conf.sample
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ enable = True
[actionrunner]
# Internal pool size for dispatcher used by regular actions.
actions_pool_size = 60
# How long to wait for process (in seconds) to exit after receiving shutdown signal.
exit_still_active_check = 300
# This will enable the graceful shutdown and wait for ongoing requests to complete until exit_timeout.
graceful_shutdown = True
# location of the logging.conf file
logging = /etc/st2/logging.actionrunner.conf
# List of pip options to be passed to "pip install" command when installing pack dependencies into pack virtual environment.
Expand All @@ -18,6 +22,8 @@ pip_opts = # comma separated list allowed here.
python_binary = /usr/bin/python
# Default log level to use for Python runner actions. Can be overriden on invocation basis using "log_level" runner parameter.
python_runner_log_level = DEBUG
# Time interval between subsequent queries to check running executions.
still_active_check_interval = 2
# True to store and stream action output (stdout and stderr) in real-time.
stream_output = True
# Buffer size to use for real time action output streaming. 0 means unbuffered 1 means line buffered, -1 means system default, which usually means fully buffered and any other positive value means use a buffer of (approximately) that size
Expand Down
30 changes: 30 additions & 0 deletions st2actions/st2actions/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,24 @@
import sys
import traceback

from tooz.coordination import GroupNotCreated
from oslo_config import cfg

from st2actions.container.base import RunnerContainer
from st2common import log as logging
from st2common.constants import action as action_constants
from st2common.exceptions.actionrunner import ActionRunnerException
from st2common.exceptions.db import StackStormDBObjectNotFoundError
from st2common.models.db.liveaction import LiveActionDB
from st2common.persistence.execution import ActionExecution
from st2common.services import coordination
from st2common.services import executions
from st2common.services import workflows as wf_svc
from st2common.transport.consumers import MessageHandler
from st2common.transport.consumers import ActionsQueueConsumer
from st2common.transport import utils as transport_utils
from st2common.util import action_db as action_utils
from st2common.util import concurrency
from st2common.util import system_info
from st2common.transport import queues

Expand Down Expand Up @@ -134,7 +139,32 @@ def process(self, liveaction):

def shutdown(self):
super(ActionExecutionDispatcher, self).shutdown()

if cfg.CONF.actionrunner.graceful_shutdown:

coordinator = coordination.get_coordinator()
member_ids = []
service = "actionrunner"
exit_timeout = cfg.CONF.actionrunner.exit_still_active_check
sleep_delay = cfg.CONF.actionrunner.still_active_check_interval
timeout = 0

while timeout < exit_timeout and self._running_liveactions:
try:
member_ids = list(
coordinator.get_members(service.encode("utf-8")).get()
)
except GroupNotCreated:
pass

# Check if there are other runners in service registry
if not member_ids:
break
timeout += sleep_delay
concurrency.sleep(sleep_delay)

# Abandon running executions if incomplete

while self._running_liveactions:
liveaction_id = self._running_liveactions.pop()
try:
Expand Down
201 changes: 201 additions & 0 deletions st2actions/tests/unit/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from st2common.persistence.execution import ActionExecution
from st2common.persistence.liveaction import LiveAction
from st2common.services import executions
from st2common.services import coordination
from st2common.util import date as date_utils
from st2common.bootstrap import runnersregistrar as runners_registrar
from local_runner.local_shell_command_runner import LocalShellCommandRunner
Expand Down Expand Up @@ -116,6 +117,9 @@ def test_non_utf8_action_result_string(self):
)

def test_worker_shutdown(self):
cfg.CONF.set_override(
name="graceful_shutdown", override=False, group="actionrunner"
)
action_worker = actions_worker.get_worker()
temp_file = None

Expand Down Expand Up @@ -164,3 +168,200 @@ def test_worker_shutdown(self):
# _run_action but will not result in KeyError because the discard method is used to
# to remove the liveaction from _running_liveactions.
runner_thread.wait()

@mock.patch.object(
coordination.NoOpDriver,
"get_members",
mock.MagicMock(return_value=coordination.NoOpAsyncResult("member-1")),
)
def test_worker_graceful_shutdown_with_multiple_runners(self):
cfg.CONF.set_override(
name="graceful_shutdown", override=True, group="actionrunner"
)
action_worker = actions_worker.get_worker()
temp_file = None

# Create a temporary file that is deleted when the file is closed and then set up an
# action to wait for this file to be deleted. This allows this test to run the action
# over a separate thread, run the shutdown sequence on the main thread, and then let
# the local runner to exit gracefully and allow _run_action to finish execution.
with tempfile.NamedTemporaryFile() as fp:
temp_file = fp.name
self.assertIsNotNone(temp_file)
self.assertTrue(os.path.isfile(temp_file))

# Launch the action execution in a separate thread.
params = {"cmd": "while [ -e '%s' ]; do sleep 0.1; done" % temp_file}
liveaction_db = self._get_liveaction_model(
WorkerTestCase.local_action_db, params
)
liveaction_db = LiveAction.add_or_update(liveaction_db)
executions.create_execution_object(liveaction_db)
runner_thread = eventlet.spawn(action_worker._run_action, liveaction_db)

# Wait for the worker up to 10s to add the liveaction to _running_liveactions.
for i in range(0, int(10 / 0.1)):
eventlet.sleep(0.1)
if len(action_worker._running_liveactions) > 0:
break

self.assertEqual(len(action_worker._running_liveactions), 1)

# Shutdown the worker to trigger the abandon process.
shutdown_thread = eventlet.spawn(action_worker.shutdown)

# Make sure the temporary file has been deleted.
self.assertFalse(os.path.isfile(temp_file))

# Wait for the worker up to 10s to remove the liveaction from _running_liveactions.
for i in range(0, int(10 / 0.1)):
eventlet.sleep(0.1)
if len(action_worker._running_liveactions) < 1:
break
liveaction_db = LiveAction.get_by_id(liveaction_db.id)

# Verify that _running_liveactions is empty and the liveaction is succeeded.
self.assertEqual(len(action_worker._running_liveactions), 0)
self.assertEqual(
liveaction_db.status,
action_constants.LIVEACTION_STATUS_SUCCEEDED,
str(liveaction_db),
)

# Wait for the local runner to complete. This will activate the finally block in
# _run_action but will not result in KeyError because the discard method is used to
# to remove the liveaction from _running_liveactions.
runner_thread.wait()
shutdown_thread.kill()

def test_worker_graceful_shutdown_with_single_runner(self):
cfg.CONF.set_override(
name="graceful_shutdown", override=True, group="actionrunner"
)
action_worker = actions_worker.get_worker()
temp_file = None

# Create a temporary file that is deleted when the file is closed and then set up an
# action to wait for this file to be deleted. This allows this test to run the action
# over a separate thread, run the shutdown sequence on the main thread, and then let
# the local runner to exit gracefully and allow _run_action to finish execution.
with tempfile.NamedTemporaryFile() as fp:
temp_file = fp.name
self.assertIsNotNone(temp_file)
self.assertTrue(os.path.isfile(temp_file))

# Launch the action execution in a separate thread.
params = {"cmd": "while [ -e '%s' ]; do sleep 0.1; done" % temp_file}
liveaction_db = self._get_liveaction_model(
WorkerTestCase.local_action_db, params
)
liveaction_db = LiveAction.add_or_update(liveaction_db)
executions.create_execution_object(liveaction_db)
runner_thread = eventlet.spawn(action_worker._run_action, liveaction_db)

# Wait for the worker up to 10s to add the liveaction to _running_liveactions.
for i in range(0, int(10 / 0.1)):
eventlet.sleep(0.1)
if len(action_worker._running_liveactions) > 0:
break

self.assertEqual(len(action_worker._running_liveactions), 1)

# Shutdown the worker to trigger the abandon process.
shutdown_thread = eventlet.spawn(action_worker.shutdown)
# Wait for action runner shutdown sequence to complete
eventlet.sleep(5)

# Make sure the temporary file has been deleted.
self.assertFalse(os.path.isfile(temp_file))

# Wait for the worker up to 10s to remove the liveaction from _running_liveactions.
for i in range(0, int(10 / 0.1)):
eventlet.sleep(0.1)
if len(action_worker._running_liveactions) < 1:
break
liveaction_db = LiveAction.get_by_id(liveaction_db.id)

# Verify that _running_liveactions is empty and the liveaction is abandoned.
self.assertEqual(len(action_worker._running_liveactions), 0)
self.assertEqual(
liveaction_db.status,
action_constants.LIVEACTION_STATUS_ABANDONED,
str(liveaction_db),
)

# Wait for the local runner to complete. This will activate the finally block in
# _run_action but will not result in KeyError because the discard method is used to
# to remove the liveaction from _running_liveactions.
runner_thread.wait()
shutdown_thread.kill()

@mock.patch.object(
coordination.NoOpDriver,
"get_members",
mock.MagicMock(return_value=coordination.NoOpAsyncResult("member-1")),
)
def test_worker_graceful_shutdown_exit_timeout(self):
cfg.CONF.set_override(
name="graceful_shutdown", override=True, group="actionrunner"
)
cfg.CONF.set_override(
name="exit_still_active_check", override=5, group="actionrunner"
)
action_worker = actions_worker.get_worker()
temp_file = None

# Create a temporary file that is deleted when the file is closed and then set up an
# action to wait for this file to be deleted. This allows this test to run the action
# over a separate thread, run the shutdown sequence on the main thread, and then let
# the local runner to exit gracefully and allow _run_action to finish execution.
with tempfile.NamedTemporaryFile() as fp:
temp_file = fp.name
self.assertIsNotNone(temp_file)
self.assertTrue(os.path.isfile(temp_file))

# Launch the action execution in a separate thread.
params = {"cmd": "while [ -e '%s' ]; do sleep 0.1; done" % temp_file}
liveaction_db = self._get_liveaction_model(
WorkerTestCase.local_action_db, params
)
liveaction_db = LiveAction.add_or_update(liveaction_db)
executions.create_execution_object(liveaction_db)
runner_thread = eventlet.spawn(action_worker._run_action, liveaction_db)

# Wait for the worker up to 10s to add the liveaction to _running_liveactions.
for i in range(0, int(10 / 0.1)):
eventlet.sleep(0.1)
if len(action_worker._running_liveactions) > 0:
break

self.assertEqual(len(action_worker._running_liveactions), 1)

# Shutdown the worker to trigger the abandon process.
shutdown_thread = eventlet.spawn(action_worker.shutdown)
# Continue the excution for 5+ seconds to ensure timeout occurs.
eventlet.sleep(6)

# Make sure the temporary file has been deleted.
self.assertFalse(os.path.isfile(temp_file))

# Wait for the worker up to 10s to remove the liveaction from _running_liveactions.
for i in range(0, int(10 / 0.1)):
eventlet.sleep(0.1)
if len(action_worker._running_liveactions) < 1:
break
liveaction_db = LiveAction.get_by_id(liveaction_db.id)

# Verify that _running_liveactions is empty and the liveaction is abandoned.
self.assertEqual(len(action_worker._running_liveactions), 0)
self.assertEqual(
liveaction_db.status,
action_constants.LIVEACTION_STATUS_ABANDONED,
str(liveaction_db),
)

# Wait for the local runner to complete. This will activate the finally block in
# _run_action but will not result in KeyError because the discard method is used to
# to remove the liveaction from _running_liveactions.
runner_thread.wait()
shutdown_thread.kill()
22 changes: 22 additions & 0 deletions st2common/st2common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,28 @@ def register_opts(ignore_errors=False):
dispatcher_pool_opts, group="actionrunner", ignore_errors=ignore_errors
)

graceful_shutdown_opts = [
cfg.BoolOpt(
"graceful_shutdown",
default=True,
help="This will enable the graceful shutdown and wait for ongoing requests to complete until exit_timeout.",
),
cfg.IntOpt(
"exit_still_active_check",
default=300,
help="How long to wait for process (in seconds) to exit after receiving shutdown signal.",
),
cfg.IntOpt(
"still_active_check_interval",
default=2,
help="Time interval between subsequent queries to check running executions.",
),
]

do_register_opts(
graceful_shutdown_opts, group="actionrunner", ignore_errors=ignore_errors
)

ssh_runner_opts = [
cfg.StrOpt(
"remote_dir",
Expand Down