diff --git a/jmxfetch.py b/jmxfetch.py index f029f621ac..87f62d9f64 100644 --- a/jmxfetch.py +++ b/jmxfetch.py @@ -1,11 +1,11 @@ # std -import os -import logging import glob +import logging +import os import subprocess -import time import sys import signal +import time # datadog from util import get_os, yLoader, yDumper @@ -68,6 +68,7 @@ def __init__(self, confd_path, agentConfig): self.check_frequency = DEFAULT_CHECK_FREQUENCY self.jmx_process = None + self.jmx_checks = None def terminate(self): self.jmx_process.terminate() @@ -77,35 +78,59 @@ def _handle_sigterm(self, signum, frame): log.debug("Caught sigterm. Stopping subprocess.") self.jmx_process.terminate() - def initialize(self): - # Gracefully exit on sigterm. + def register_signal_handlers(self): + """ + Enable SIGTERM and SIGINT handlers + """ + # Gracefully exit on sigterm signal.signal(signal.SIGTERM, self._handle_sigterm) # Handle Keyboard Interrupt signal.signal(signal.SIGINT, self._handle_sigterm) - self.run() + def configure(self, check_list=None): + """ + Instantiate JMXFetch parameters. + """ + self.jmx_checks, self.invalid_checks, self.java_bin_path, self.java_options, self.tools_jar_path = \ + self.get_configuration(check_list) + + def should_run(self): + """ + Should JMXFetch run ? + """ + return self.jmx_checks is not None and self.jmx_checks != [] + + def run(self, command=None, check_list=None, reporter=None): + + if check_list or self.jmx_checks is None: + # (Re)set/(re)configure JMXFetch parameters when `check_list` is specified or + # no configuration was found + self.configure(check_list) - def run(self, command=None, checks_list=None, reporter=None): try: command = command or JMX_COLLECT_COMMAND - jmx_checks, invalid_checks, java_bin_path, java_options, tools_jar_path = \ - self._should_run(checks_list) - if len(invalid_checks) > 0: + + if len(self.invalid_checks) > 0: try: - self._write_status_file(invalid_checks) + self._write_status_file(self.invalid_checks) except Exception: log.exception("Error while writing JMX status file") - if len(jmx_checks) > 0: - self._start( - java_bin_path, java_options, jmx_checks, command, reporter, tools_jar_path) - return True + if len(self.jmx_checks) > 0: + return self._start(self.java_bin_path, self.java_options, self.jmx_checks, + command, reporter, self.tools_jar_path) + else: + # We're exiting purposefully, so exit with zero (supervisor's expected + # code). HACK: Sleep a little bit so supervisor thinks we've started cleanly + # and thus can exit cleanly. + time.sleep(4) + log.info("No valid JMX integration was found. Exiting ...") except Exception: log.exception("Error while initiating JMXFetch") raise - def _should_run(self, checks_list): + def get_configuration(self, checks_list=None): """ Return a tuple (jmx_checks, invalid_checks, java_bin_path, java_options) @@ -172,6 +197,9 @@ def _should_run(self, checks_list): def _start(self, path_to_java, java_run_opts, jmx_checks, command, reporter, tools_jar_path): + # Register SIGINT and SIGTERM signal handlers + self.register_signal_handlers() + statsd_port = self.agentConfig.get('dogstatsd_port', "8125") if reporter is None: reporter = "statsd:%s" % str(statsd_port) @@ -361,9 +389,7 @@ def main(config_path=None): confd_path, agentConfig = init(config_path) jmx = JMXFetch(confd_path, agentConfig) - jmx.initialize() - - return 0 + return jmx.run() if __name__ == '__main__': sys.exit(main()) diff --git a/packaging/datadog-agent/source/supervisord.conf b/packaging/datadog-agent/source/supervisord.conf index 4af9d763e4..97b2a6f019 100644 --- a/packaging/datadog-agent/source/supervisord.conf +++ b/packaging/datadog-agent/source/supervisord.conf @@ -45,7 +45,7 @@ command=python agent/jmxfetch.py stdout_logfile=supervisord/logs/jmxfetch.log redirect_stderr=true priority=999 -startsecs=0 +startsecs=3 [group:datadog-agent] programs=forwarder,collector,dogstatsd,jmxfetch diff --git a/packaging/supervisor.conf b/packaging/supervisor.conf index 3ccbafd374..27764127ad 100644 --- a/packaging/supervisor.conf +++ b/packaging/supervisor.conf @@ -53,7 +53,7 @@ stdout_logfile=NONE stderr_logfile=NONE redirect_stderr=true priority=999 -startsecs=0 +startsecs=3 [group:datadog-agent] programs=forwarder,collector,dogstatsd,jmxfetch diff --git a/supervisord.dev.conf b/supervisord.dev.conf index 60167ee215..10e63db24d 100644 --- a/supervisord.dev.conf +++ b/supervisord.dev.conf @@ -43,7 +43,7 @@ command=python jmxfetch.py stdout_logfile=jmxfetch.log redirect_stderr=true priority=999 -startsecs=0 +startsecs=3 [group:datadog-agent] programs=forwarder,collector,dogstatsd,jmxfetch diff --git a/win32/agent.py b/win32/agent.py index 1236008e2f..2c0d4fb051 100644 --- a/win32/agent.py +++ b/win32/agent.py @@ -63,6 +63,10 @@ def __init__(self, args): self._max_failed_heartbeats = \ MAX_FAILED_HEARTBEATS * agentConfig['check_freq'] / SERVICE_SLEEP_INTERVAL + # Watch JMXFetch restarts + self._MAX_JMXFETCH_RESTARTS = 3 + self._count_jmxfetch_restarts = 0 + # Keep a list of running processes so we can start/end as needed. # Processes will start started in order and stopped in reverse order. self.procs = { @@ -124,6 +128,15 @@ def _restart_proc(self, proc_name): # Make a new proc instances because multiprocessing # won't let you call .start() twice on the same instance. old_proc = self.procs[proc_name] + + # Watch JMXFetch restarts + if proc_name == 'jmxfetch': + self._count_jmxfetch_restarts += 1 + if self._count_jmxfetch_restarts >= self._MAX_JMXFETCH_RESTARTS: + servicemanager.LogInfoMsg( + "JMXFetch reached the limit of restarts. Restarting for the last time...") + old_proc.is_enabled = False + if proc_name == 'collector': new_proc = old_proc.__class__( old_proc.config, self.hostname, heartbeat=self._collector_send_heartbeat) @@ -240,7 +253,6 @@ class JMXFetchProcess(multiprocessing.Process): def __init__(self, agentConfig, hostname): multiprocessing.Process.__init__(self, name='jmxfetch') self.config = agentConfig - self.is_enabled = True self.hostname = hostname osname = get_os() @@ -251,14 +263,21 @@ def __init__(self, agentConfig, hostname): "the Agent is currently deployed.\n" % e.args[0]) self.jmx_daemon = JMXFetch(confd_path, agentConfig) + self.jmx_daemon.configure() + self.is_enabled = self.jmx_daemon.should_run() def run(self): - log.debug("Windows Service - Starting JMXFetch") - self.jmx_daemon.run() + from config import initialize_logging; initialize_logging('windows_jmxfetch') + if self.is_enabled: + log.debug("Windows Service - Starting JMXFetch") + self.jmx_daemon.run() + else: + log.info("No JMXFetch integration found, not starting it.") def stop(self): - log.debug("Windows Service - Stopping JMXFetch") - self.jmx_daemon.terminate() + if self.is_enabled: + log.debug("Windows Service - Stopping JMXFetch") + self.jmx_daemon.terminate() if __name__ == '__main__':