Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Engines support on Minions! #29245

Merged
merged 3 commits into from
Dec 1, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions salt/cli/daemons.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,15 @@ class Minion(parsers.MinionOptionParser): # pylint: disable=no-init
'''
Create a minion server
'''

def _handle_signals(self, signum, sigframe): # pylint: disable=unused-argument
# escalate signal to the process manager processes
self.minion.process_manager.stop_restarting()
self.minion.process_manager.send_signal_to_processes(signum)
# kill any remaining processes
self.minion.process_manager.kill_children()
super(Minion, self)._handle_signals(signum, sigframe)

# pylint: disable=no-member
def prepare(self):
'''
Expand Down
4 changes: 2 additions & 2 deletions salt/engines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import salt
import salt.loader
import salt.utils
from salt.utils.process import MultiprocessingProcess
from salt.utils.process import SignalHandlingMultiprocessingProcess

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -63,7 +63,7 @@ def start_engines(opts, proc_mgr):
)


class Engine(MultiprocessingProcess):
class Engine(SignalHandlingMultiprocessingProcess):
'''
Execute the given engine in a new process
'''
Expand Down
45 changes: 27 additions & 18 deletions salt/minion.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import salt.crypt
import salt.loader
import salt.beacons
import salt.engines
import salt.payload
import salt.syspaths
import salt.utils
Expand All @@ -98,7 +99,9 @@
from salt.executors import FUNCTION_EXECUTORS
from salt.utils.debug import enable_sigusr1_handler
from salt.utils.event import tagify
from salt.utils.process import default_signals, SignalHandlingMultiprocessingProcess
from salt.utils.process import (default_signals,
SignalHandlingMultiprocessingProcess,
ProcessManager)
from salt.exceptions import (
CommandExecutionError,
CommandNotFoundError,
Expand Down Expand Up @@ -724,6 +727,29 @@ def __init__(self, opts, timeout=60, safe=True, loaded_base_name=None, io_loop=N
if not salt.utils.is_proxy():
self.opts['grains'] = salt.loader.grains(opts)

log.info('Creating minion process manager')
self.process_manager = ProcessManager(name='MinionProcessManager')
self.io_loop.spawn_callback(self.process_manager.run, async=True)
self.io_loop.spawn_callback(salt.engines.start_engines, self.opts, self.process_manager)

# Install the SIGINT/SIGTERM handlers if not done so far
if signal.getsignal(signal.SIGINT) is signal.SIG_DFL:
# No custom signal handling was added, install our own
signal.signal(signal.SIGINT, self._handle_signals)

if signal.getsignal(signal.SIGTERM) is signal.SIG_DFL:
# No custom signal handling was added, install our own
signal.signal(signal.SIGINT, self._handle_signals)

def _handle_signals(self, signum, sigframe): # pylint: disable=unused-argument
self._running = False
# escalate the signals to the process manager
self.process_manager.stop_restarting()
self.process_manager.send_signal_to_processes(signum)
# kill any remaining processes
self.process_manager.kill_children()
exit(0)

def sync_connect_master(self, timeout=None):
'''
Block until we are connected to a master
Expand Down Expand Up @@ -1527,14 +1553,6 @@ def environ_setenv(self, tag, data):
import salt.modules.environ as mod_environ
return mod_environ.setenv(environ, false_unsets, clear_all)

def clean_die(self, signum, frame):
'''
Python does not handle the SIGTERM cleanly, if it is signaled exit
the minion process cleanly
'''
self._running = False
exit(0)

def _pre_tune(self):
'''
Set the minion running flag and issue the appropriate warnings if
Expand Down Expand Up @@ -1713,11 +1731,6 @@ def tune_in(self, start=True):
'''
self._pre_tune()

# Properly exit if a SIGTERM is signalled
if signal.getsignal(signal.SIGTERM) is signal.SIG_DFL:
# No SIGTERM installed, install ours
signal.signal(signal.SIGTERM, self.clean_die)

# start up the event publisher, so we can see events during startup
self.event_publisher = salt.utils.event.AsyncEventPublisher(
self.opts,
Expand Down Expand Up @@ -1936,10 +1949,6 @@ def tune_in(self, start=True):
'''
Lock onto the publisher. This is the main event loop for the syndic
'''
# Properly exit if a SIGTERM is signalled
if signal.getsignal(signal.SIGTERM) is signal.SIG_DFL:
# No SIGTERM installed, install ours
signal.signal(signal.SIGTERM, self.clean_die)
log.debug('Syndic \'{0}\' trying to tune in'.format(self.opts['id']))

if start:
Expand Down
12 changes: 9 additions & 3 deletions salt/utils/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
# Import 3rd-party libs
import salt.ext.six as six
from salt.ext.six.moves import queue, range # pylint: disable=import-error,redefined-builtin
from tornado import gen

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -344,10 +345,12 @@ def send_signal_to_processes(self, signal):
for pid in self._process_map:
os.kill(pid, signal)

def run(self):
@gen.coroutine
def run(self, async=False):
'''
Load and start all available api modules
'''
log.debug('Process Manager starting!')
salt.utils.appendproctitle(self.name)

# make sure to kill the subprocesses if the parent is killed
Expand All @@ -359,18 +362,21 @@ def run(self):
signal.signal(signal.SIGINT, self.kill_children)

while True:
log.debug('Process manager iteration')
try:
# in case someone died while we were waiting...
self.check_children()

if not salt.utils.is_windows():
if not salt.utils.is_windows() and not async:
pid, exit_status = os.wait()
if pid not in self._process_map:
log.debug('Process of pid {0} died, not a known'
' process, will not restart'.format(pid))
continue
self.restart_process(pid)
else:
elif async is True:
yield gen.sleep(10)
elif async is False:
# os.wait() is not supported on Windows.
time.sleep(10)
# OSError is raised if a signal handler is called (SIGTERM) during os.wait
Expand Down