Skip to content

Commit

Permalink
Merge pull request #1561 from DataDog/yann/is-jmx-enabled-2
Browse files Browse the repository at this point in the history
[jmxfetch] fix windows bootloop 🐛
  • Loading branch information
yannmh committed Apr 21, 2015
2 parents 0999057 + 06ac2f6 commit e1c2fc9
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 62 deletions.
13 changes: 12 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
Changes
=======

# 5.3.1 / 04-21-2015
**Windows only**

### Details
https://github.com/DataDog/dd-agent/compare/5.3.0...5.3.1

### Changes
* [BUGFIX] JMXFetch: Fix bootloop issue when no JMX integration is set. See [#1561][]


# 5.3.0 / 04-16-2015
### Details
https://github.com/DataDog/dd-agent/compare/5.2.2...5.3.0
Expand Down Expand Up @@ -1558,6 +1568,7 @@ If you use ganglia, you want this version.
[#1511]: https://github.com/DataDog/dd-agent/issues/1511
[#1512]: https://github.com/DataDog/dd-agent/issues/1512
[#1518]: https://github.com/DataDog/dd-agent/issues/1518
[#1561]: https://github.com/DataDog/dd-agent/issues/1561
[@AirbornePorcine]: https://github.com/AirbornePorcine
[@CaptTofu]: https://github.com/CaptTofu
[@Osterjour]: https://github.com/Osterjour
Expand Down Expand Up @@ -1613,4 +1624,4 @@ If you use ganglia, you want this version.
[@stefan-mees]: https://github.com/stefan-mees
[@takus]: https://github.com/takus
[@tomduckering]: https://github.com/tomduckering
[@walkeran]: https://github.com/walkeran
[@walkeran]: https://github.com/walkeran
3 changes: 2 additions & 1 deletion checks/check_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,8 @@ def to_dict(self):
status_info['checks'][cs.name] = {'instances': {}}
if cs.init_failed_error:
status_info['checks'][cs.name]['init_failed'] = True
status_info['checks'][cs.name]['traceback'] = cs.init_failed_traceback
status_info['checks'][cs.name]['traceback'] = \
cs.init_failed_traceback or cs.init_failed_error
else:
status_info['checks'][cs.name] = {'instances': {}}
status_info['checks'][cs.name]['init_failed'] = False
Expand Down
2 changes: 1 addition & 1 deletion config.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import yaml

# CONSTANTS
AGENT_VERSION = "5.3.0"
AGENT_VERSION = "5.4.0"
DATADOG_CONF = "datadog.conf"
DEFAULT_CHECK_FREQUENCY = 15 # seconds
LOGGING_MAX_BYTES = 5 * 1024 * 1024
Expand Down
86 changes: 62 additions & 24 deletions jmxfetch.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# std
import os
import logging
import glob
import logging
import os
import subprocess
import time
import sys
import signal
import time

# datadog
from util import get_os, yLoader, yDumper
Expand Down Expand Up @@ -68,6 +68,7 @@ def __init__(self, confd_path, agentConfig):
self.check_frequency = DEFAULT_CHECK_FREQUENCY

self.jmx_process = None
self.jmx_checks = None

def terminate(self):
self.jmx_process.terminate()
Expand All @@ -77,35 +78,63 @@ def _handle_sigterm(self, signum, frame):
log.debug("Caught sigterm. Stopping subprocess.")
self.jmx_process.terminate()

def initialize(self):
# Gracefully exit on sigterm.
signal.signal(signal.SIGTERM, self._handle_sigterm)
def register_signal_handlers(self):
"""
Enable SIGTERM and SIGINT handlers
"""
try:
# Gracefully exit on sigterm
signal.signal(signal.SIGTERM, self._handle_sigterm)

# Handle Keyboard Interrupt
signal.signal(signal.SIGINT, self._handle_sigterm)
# Handle Keyboard Interrupt
signal.signal(signal.SIGINT, self._handle_sigterm)

self.run()
except ValueError:
log.exception("Unable to register signal handlers.")

def configure(self, check_list=None):
"""
Instantiate JMXFetch parameters.
"""
self.jmx_checks, self.invalid_checks, self.java_bin_path, self.java_options, self.tools_jar_path = \
self.get_configuration(check_list)

def should_run(self):
"""
Should JMXFetch run ?
"""
return self.jmx_checks is not None and self.jmx_checks != []

def run(self, command=None, check_list=None, reporter=None):

if check_list or self.jmx_checks is None:
# (Re)set/(re)configure JMXFetch parameters when `check_list` is specified or
# no configuration was found
self.configure(check_list)

def run(self, command=None, checks_list=None, reporter=None):
try:
command = command or JMX_COLLECT_COMMAND
jmx_checks, invalid_checks, java_bin_path, java_options, tools_jar_path = \
self._should_run(checks_list)
if len(invalid_checks) > 0:

if len(self.invalid_checks) > 0:
try:
self._write_status_file(invalid_checks)
self._write_status_file(self.invalid_checks)
except Exception:
log.exception("Error while writing JMX status file")

if len(jmx_checks) > 0:
self._start(
java_bin_path, java_options, jmx_checks, command, reporter, tools_jar_path)
return True
if len(self.jmx_checks) > 0:
return self._start(self.java_bin_path, self.java_options, self.jmx_checks,
command, reporter, self.tools_jar_path)
else:
# We're exiting purposefully, so exit with zero (supervisor's expected
# code). HACK: Sleep a little bit so supervisor thinks we've started cleanly
# and thus can exit cleanly.
time.sleep(4)
log.info("No valid JMX integration was found. Exiting ...")
except Exception:
log.exception("Error while initiating JMXFetch")
raise

def _should_run(self, checks_list):
def get_configuration(self, checks_list=None):
"""
Return a tuple (jmx_checks, invalid_checks, java_bin_path, java_options)
Expand Down Expand Up @@ -171,7 +200,6 @@ def _should_run(self, checks_list):
return (jmx_checks, invalid_checks, java_bin_path, java_options, tools_jar_path)

def _start(self, path_to_java, java_run_opts, jmx_checks, command, reporter, tools_jar_path):

statsd_port = self.agentConfig.get('dogstatsd_port', "8125")
if reporter is None:
reporter = "statsd:%s" % str(statsd_port)
Expand Down Expand Up @@ -219,12 +247,24 @@ def _start(self, path_to_java, java_run_opts, jmx_checks, command, reporter, too
log.info("Running %s" % " ".join(subprocess_args))
jmx_process = subprocess.Popen(subprocess_args, close_fds=True)
self.jmx_process = jmx_process

# Register SIGINT and SIGTERM signal handlers
self.register_signal_handlers()

# Wait for JMXFetch to return
jmx_process.wait()

return jmx_process.returncode

except OSError:
log.exception("Couldn't launch JMXTerm. Is java in your PATH?")
java_path_msg = "Couldn't launch JMXTerm. Is Java in your PATH ?"
log.exception(java_path_msg)
invalid_checks = {}
for check in jmx_checks:
check_name = check.split('.')[0]
check_name = check_name.encode('ascii', 'ignore')
invalid_checks[check_name] = java_path_msg
self._write_status_file(invalid_checks)
raise
except Exception:
log.exception("Couldn't launch JMXFetch")
Expand Down Expand Up @@ -361,9 +401,7 @@ def main(config_path=None):
confd_path, agentConfig = init(config_path)

jmx = JMXFetch(confd_path, agentConfig)
jmx.initialize()

return 0
return jmx.run()

if __name__ == '__main__':
sys.exit(main())
2 changes: 1 addition & 1 deletion packaging/datadog-agent/source/supervisord.conf
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ command=python agent/jmxfetch.py
stdout_logfile=supervisord/logs/jmxfetch.log
redirect_stderr=true
priority=999
startsecs=0
startsecs=3

[group:datadog-agent]
programs=forwarder,collector,dogstatsd,jmxfetch
2 changes: 1 addition & 1 deletion packaging/supervisor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ stdout_logfile=NONE
stderr_logfile=NONE
redirect_stderr=true
priority=999
startsecs=0
startsecs=3

[group:datadog-agent]
programs=forwarder,collector,dogstatsd,jmxfetch
2 changes: 1 addition & 1 deletion supervisord.dev.conf
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ command=python jmxfetch.py
stdout_logfile=jmxfetch.log
redirect_stderr=true
priority=999
startsecs=0
startsecs=3

[group:datadog-agent]
programs=forwarder,collector,dogstatsd,jmxfetch
92 changes: 60 additions & 32 deletions win32/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
log = logging.getLogger(__name__)

SERVICE_SLEEP_INTERVAL = 1
MAX_FAILED_HEARTBEATS = 8 # runs of collector
MAX_FAILED_HEARTBEATS = 8 # runs of collector


class AgentSvc(win32serviceutil.ServiceFramework):
_svc_name_ = "DatadogAgent"
Expand Down Expand Up @@ -63,14 +64,18 @@ def __init__(self, args):
self._max_failed_heartbeats = \
MAX_FAILED_HEARTBEATS * agentConfig['check_freq'] / SERVICE_SLEEP_INTERVAL

# Watch JMXFetch restarts
self._MAX_JMXFETCH_RESTARTS = 3
self._count_jmxfetch_restarts = 0

# Keep a list of running processes so we can start/end as needed.
# Processes will start started in order and stopped in reverse order.
self.procs = {
'forwarder': DDForwarder(config, self.hostname),
'collector': DDAgent(agentConfig, self.hostname,
heartbeat=self._collector_send_heartbeat),
'dogstatsd': DogstatsdProcess(config, self.hostname),
'jmxfetch': JMXFetchProcess(config, self.hostname),
'forwarder': ProcessWatchDog("forwarder", DDForwarder(config, self.hostname)),
'collector': ProcessWatchDog("collector", DDAgent(agentConfig, self.hostname,
heartbeat=self._collector_send_heartbeat)),
'dogstatsd': ProcessWatchDog("dogstatsd", DogstatsdProcess(config, self.hostname)),
'jmxfetch': ProcessWatchDog("jmxfetch", JMXFetchProcess(config, self.hostname), 3),
}

def SvcStop(self):
Expand Down Expand Up @@ -99,9 +104,9 @@ def SvcDoRun(self):
while self.running:
# Restart any processes that might have died.
for name, proc in self.procs.iteritems():
if not proc.is_alive() and proc.is_enabled:
servicemanager.LogInfoMsg("%s has died. Restarting..." % proc.name)
self._restart_proc(name)
if not proc.is_alive() and proc.is_enabled():
servicemanager.LogInfoMsg("%s has died. Restarting..." % name)
proc.restart()

self._check_collector_blocked()

Expand All @@ -117,25 +122,49 @@ def _check_collector_blocked(self):
if self._collector_failed_heartbeats > self._max_failed_heartbeats:
servicemanager.LogInfoMsg(
"%s was unresponsive for too long. Restarting..." % 'collector')
self._restart_proc('collector')
self.procs['collector'].restart()
self._collector_failed_heartbeats = 0

def _restart_proc(self, proc_name):

class ProcessWatchDog(object):
"""
Monitor the attached process.
Restarts when it exits until the limit set is reached.
"""
def __init__(self, name, process, max_restarts=5):
self._name = name
self._process = process
self._count_restarts = 0
self._MAX_RESTARTS = max_restarts

def start(self):
return self._process.start()

def terminate(self):
return self._process.terminate()

def is_alive(self):
return self._process.is_alive()

def is_enabled(self):
return self._process.is_enabled

def restart(self):
self._count_restarts += 1
if self._count_restarts >= self._MAX_RESTARTS:
servicemanager.LogInfoMsg(
"%s reached the limit of restarts. Not restarting..." % self._name)
self._process.is_enabled = False
return

# Make a new proc instances because multiprocessing
# won't let you call .start() twice on the same instance.
old_proc = self.procs[proc_name]
if proc_name == 'collector':
new_proc = old_proc.__class__(
old_proc.config, self.hostname, heartbeat=self._collector_send_heartbeat)
else:
new_proc = old_proc.__class__(old_proc.config, self.hostname)
if self._process.is_alive():
self._process.terminate()

if old_proc.is_alive():
old_proc.terminate()
del self.procs[proc_name]
self._process = self._process.__class__(self._process.config, self._process.hostname)
self._process.start()

new_proc.start()
self.procs[proc_name] = new_proc

class DDAgent(multiprocessing.Process):
def __init__(self, agentConfig, hostname, heartbeat=None):
Expand Down Expand Up @@ -240,25 +269,24 @@ class JMXFetchProcess(multiprocessing.Process):
def __init__(self, agentConfig, hostname):
multiprocessing.Process.__init__(self, name='jmxfetch')
self.config = agentConfig
self.is_enabled = True
self.hostname = hostname

osname = get_os()
try:
osname = get_os()
confd_path = get_confd_path(osname)
except PathNotFound, e:
log.error("No conf.d folder found at '%s' or in the directory where"
"the Agent is currently deployed.\n" % e.args[0])
self.jmx_daemon = JMXFetch(confd_path, agentConfig)
self.jmx_daemon.configure()
self.is_enabled = self.jmx_daemon.should_run()

self.jmx_daemon = JMXFetch(confd_path, agentConfig)
except PathNotFound:
self.is_enabled = False

def run(self):
log.debug("Windows Service - Starting JMXFetch")
self.jmx_daemon.run()
if self.is_enabled:
self.jmx_daemon.run()

def stop(self):
log.debug("Windows Service - Stopping JMXFetch")
self.jmx_daemon.terminate()
pass


if __name__ == '__main__':
Expand Down

0 comments on commit e1c2fc9

Please sign in to comment.