Revamp Docker check, more metrics, less bugs, support of CoreOS #1043
@@ -16,10 +16,10 @@
 DEFAULT_MAX_CONTAINERS = 20
 EVENT_TYPE = SOURCE_TYPE_NAME = 'docker'
 
-LXC_METRICS = [
+CGROUP_METRICS = [
     {
         "cgroup": "memory",
-        "file": "%s/%s/memory.stat",
+        "file": "memory.stat",
         "metrics": {
             "active_anon": ("docker.mem.active_anon", "gauge"),
             "active_file": ("docker.mem.active_file", "gauge"),
@@ -53,7 +53,7 @@
     },
     {
         "cgroup": "cpuacct",
-        "file": "%s/%s/cpuacct.stat",
+        "file": "cpuacct.stat",
         "metrics": {
             "user": ("docker.cpu.user", "rate"),
             "system": ("docker.cpu.system", "rate"),
@@ -72,6 +72,7 @@
 
 SOCKET_TIMEOUT = 5
 
+
 class UnixHTTPConnection(httplib.HTTPConnection, object):
     """Class used in conjuction with UnixSocketHandler to make urllib2
     compatible with Unix sockets."""
@@ -110,71 +111,65 @@ def unix_open(self, req):
 
 
 class Docker(AgentCheck):
-    def __init__(self, *args, **kwargs):
-        super(Docker, self).__init__(*args, **kwargs)
+    def __init__(self, name, init_config, agentConfig):
+        AgentCheck.__init__(self, name, init_config, agentConfig)
         self._mountpoints = {}
-        self.cgroup_path_prefix = None # Depending on the version
-        for metric in LXC_METRICS:
-            self._mountpoints[metric["cgroup"]] = self._find_cgroup(metric["cgroup"])
-        self._path_prefix = None
+        docker_root = init_config.get('docker_root', '/')
+        for metric in CGROUP_METRICS:
+            self._mountpoints[metric["cgroup"]] = self._find_cgroup(metric["cgroup"], docker_root)
         self._last_event_collection_ts = defaultdict(lambda: None)
         self.url_opener = urllib2.build_opener(UnixSocketHandler())
         self.should_get_size = True
-
-    @property
-    def path_prefix(self):
-        if self._path_prefix is None:
-            metric = LXC_METRICS[0]
-            mountpoint = self._mountpoints[metric["cgroup"]]
-            stat_file_lxc = os.path.join(mountpoint, "lxc")
-            stat_file_docker = os.path.join(mountpoint, "docker")
-
-            if os.path.exists(stat_file_lxc):
-                self._path_prefix = "lxc"
-            elif os.path.exists(stat_file_docker):
-                self._path_prefix = "docker"
-            else:
-                raise Exception("Cannot find Docker cgroup file. If you are using Docker 0.9 or 0.10, it is a known bug in Docker fixed in Docker 0.11")
-        return self._path_prefix
+        self._cgroup_filename_pattern = None
+
+    def _find_cgroup_filename_pattern(self):
+        if self._mountpoints:
+            # We try with different cgroups so that it works even if only one is properly working
+            for mountpoint in self._mountpoints.values():
+                stat_file_path_lxc = os.path.join(mountpoint, "lxc")
+                stat_file_path_docker = os.path.join(mountpoint, "docker")
+                stat_file_path_coreos = os.path.join(mountpoint, "system.slice")
+
+                if os.path.exists(stat_file_path_lxc):
+                    return os.path.join('%(mountpoint)s/lxc/%(id)s/%(file)s')
+                elif os.path.exists(stat_file_path_docker):
+                    return os.path.join('%(mountpoint)s/docker/%(id)s/%(file)s')
+                elif os.path.exists(stat_file_path_coreos):
+                    return os.path.join('%(mountpoint)s/system.slice/docker-%(id)s.scope/%(file)s')
+
+        raise Exception("Cannot find Docker cgroup directory. Be sure your system is supported.")
+
+    def _get_cgroup_file(self, cgroup, container_id, filename):
+        # This can't be initialized at startup because cgroups may not be mounted
+        if not self._cgroup_filename_pattern:
+            self._cgroup_filename_pattern = self._find_cgroup_filename_pattern()
+
+        return self._cgroup_filename_pattern % (dict(
+            mountpoint=self._mountpoints[cgroup],
+            id=container_id,
+            file=filename,
+        ))
 
     def check(self, instance):
-        tags = instance.get("tags") or []
-        skipped_cgroup = 0
-
         try:
             self._process_events(self._get_events(instance))
-        except socket.timeout:
+        except (socket.timeout, urllib2.URLError):
             self.warning('Timeout during socket connection. Events will be missing.')
 
-        if self.should_get_size:
-            try:
-                containers = self._get_containers(instance, with_size=True)
-            except socket.timeout:
-                # Probably because of: https://github.com/DataDog/dd-agent/issues/963
-                # Then we should stop trying to get size info
-                self.log.info('Cannot get container size because of API timeout. Turn size flag off.')
-                self.should_get_size = False
-
-        if not self.should_get_size:
-            containers = self._get_containers(instance, with_size=False)
-
-        if not containers:
-            containers = []
-            self.warning("No containers are running.")
-            return
-
-        self.gauge("docker.containers.running", len(containers))
+        self._count_images(instance)
+        containers = self._get_and_count_containers(instance)
 
         max_containers = instance.get('max_containers', DEFAULT_MAX_CONTAINERS)
 
         if not instance.get("exclude") or not instance.get("include"):
             if len(containers) > max_containers:
-                self.warning("Too many containers to collect. Please refine the containers to collect by editing the configuration file. Truncating to %s containers" % max_containers)
+                self.warning("Too many containers to collect. Please refine the containers to collect"
+                             "by editing the configuration file. Truncating to %s containers" % max_containers)
                 containers = containers[:max_containers]
 
         collected_containers = 0
         for container in containers:
-            container_tags = list(tags)
+            container_tags = instance.get("tags", [])
             for name in container["Names"]:
                 container_tags.append(self._make_tag("name", name.lstrip("/")))
             for key in DOCKER_TAGS:
@@ -187,27 +182,21 @@ def check(self, instance):
             for key, (dd_key, metric_type) in DOCKER_METRICS.items():
                 if key in container:
                     getattr(self, metric_type)(dd_key, int(container[key]), tags=container_tags)
-            for metric in LXC_METRICS:
-                mountpoint = self._mountpoints[metric["cgroup"]]
-                stat_file = os.path.join(mountpoint, metric["file"] % (self.path_prefix, container["Id"]))
+            for cgroup in CGROUP_METRICS:
+                stat_file = self._get_cgroup_file(cgroup["cgroup"], container['Id'], cgroup['file'])
                 stats = self._parse_cgroup_file(stat_file)
                 if stats:
-                    for key, (dd_key, metric_type) in metric["metrics"].items():
-                        if key.startswith("total_") and not instance.get("collect_total"):
+                    for key, (dd_key, metric_type) in cgroup['metrics'].items():
+                        if key.startswith('total_') and not instance.get('collect_total'):
                             continue
                         if key in stats:
                             getattr(self, metric_type)(dd_key, int(stats[key]), tags=container_tags)
-                else:
-                    skipped_cgroup += 1
 
             collected_containers += 1
             if collected_containers >= max_containers:
                 self.warning("Too many containers are matching the current configuration. Some containers will not be collected. Please refine your configuration")
                 break
 
-        if skipped_cgroup and skipped_cgroup == collected_containers * len(LXC_METRICS):
-            raise IOError("We were unable to open cgroup files. If you are using Docker 0.9 or 0.10, it is a known bug in Docker fixed in Docker 0.11")
-
     def _process_events(self, events):
         for ev in events:
             self.log.debug("Creating event for %s" % ev)
@@ -220,6 +209,37 @@ def _process_events(self, events):
                 'event_object': ev['from'],
             })
 
+    def _count_images(self, instance):
+        tags = instance.get("tags", [])
+        active_images = len(self._get_images(instance, get_all=False))
+        all_images = len(self._get_images(instance, get_all=True))
+
+        self.gauge("docker.images.available", active_images, tags=tags)
+        self.gauge("docker.images.intermediate", (all_images - active_images), tags=tags)
+
+    def _get_and_count_containers(self, instance):
+        tags = instance.get("tags", [])
+
+        try:
+            containers = self._get_containers(instance, with_size=self.should_get_size)
+        except (socket.timeout, urllib2.URLError):
+            # Probably because of: https://github.com/DataDog/dd-agent/issues/963
+            # Then we should stop trying to get size info
+            self.log.info("Cannot get container size because of API timeout. Stop collecting it.")
+            self.should_get_size = False
+            containers = self._get_containers(instance, with_size=self.should_get_size)
+
+        if not containers:
+            containers = []
+            self.warning("No containers are running.")
+            return
+
+        stopped_containers_count = len(self._get_containers(instance, get_all=True)) - len(containers)
Review comment: Can't we get all of them at once and then filter the running ones from the stopped ones, so we query the API just once?

Reply: If only... But we can't, it is not in the API: https://docs.docker.com/reference/api/docker_remote_api_v1.13/#list-containers That's why I have to do this dirty trick. (A sketch of this two-call workaround follows the hunk below.)
+        self.gauge("docker.containers.running", len(containers), tags=tags)
+        self.gauge("docker.containers.stopped", stopped_containers_count, tags=tags)
+
+        return containers
+
 
     def _make_tag(self, key, value):
         return "%s:%s" % (key.lower(), value.strip())
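For reference, here is a minimal, self-contained sketch of the two-call workaround discussed in the review thread above: /containers/json is queried once for running containers and once with all=1, and the stopped count is the difference. The TCP endpoint and the list_containers helper are illustrative assumptions; the check itself goes through its Unix-socket opener and its _get_json helper.

```python
# Illustrative sketch only -- not part of the check. The Docker remote API's
# /containers/json endpoint returns either running containers (default) or
# every container (all=1), so two requests are needed to count both sets.
# The TCP endpoint below is an assumption; the check uses a Unix socket opener.
import json
import urllib
import urllib2

DOCKER_API = "http://localhost:4243"  # hypothetical endpoint


def list_containers(get_all=False):
    query = urllib.urlencode({'all': int(get_all)})
    url = "%s/containers/json?%s" % (DOCKER_API, query)
    return json.loads(urllib2.urlopen(url).read())

running = list_containers()                     # running containers only
everything = list_containers(get_all=True)      # running + stopped
stopped_count = len(everything) - len(running)  # same trick as the diff above
```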
@@ -238,9 +258,14 @@ def _is_tag_included(tag):
                 return True
         return False
 
-    def _get_containers(self, instance, with_size=True):
-        """Gets the list of running containers in Docker."""
-        return self._get_json("%(url)s/containers/json" % instance, params={'size': with_size})
+    def _get_containers(self, instance, with_size=False, get_all=False):
+        """Gets the list of running/all containers in Docker."""
+        return self._get_json("%(url)s/containers/json" % instance, params={'size': with_size, 'all': get_all})
Review comment: What's the first Docker version that supports the "all" parameter?

Reply: It has always existed.
 
+    def _get_images(self, instance, with_size=True, get_all=False):
+        """Gets the list of images in Docker."""
+        return self._get_json("%(url)s/images/json" % instance, params={'all': get_all})
+
     def _get_events(self, instance):
         """Get the list of events """
@@ -275,35 +300,35 @@ def _get_json(self, uri, params=None, multi=False):
 
         return json.loads(response)
 
-    def _find_cgroup(self, hierarchy):
+    def _find_cgroup(self, hierarchy, docker_root):
         """Finds the mount point for a specified cgroup hierarchy. Works with
         old style and new style mounts."""
         try:
-            fp = open("/proc/mounts")
+            fp = open(os.path.join(docker_root, "/proc/mounts"))
             mounts = map(lambda x: x.split(), fp.read().splitlines())
         finally:
             fp.close()
         cgroup_mounts = filter(lambda x: x[2] == "cgroup", mounts)
+        if len(cgroup_mounts) == 0:
+            raise Exception("Can't find mounted cgroups. If you run the Agent inside a container,"
+                            " please refer to the documentation.")
         # Old cgroup style
         if len(cgroup_mounts) == 1:
-            return cgroup_mounts[0][1]
+            return os.path.join(docker_root, cgroup_mounts[0][1])
         for _, mountpoint, _, opts, _, _ in cgroup_mounts:
             if hierarchy in opts:
-                return mountpoint
+                return os.path.join(docker_root, mountpoint)
 
-    def _parse_cgroup_file(self, file_):
+    def _parse_cgroup_file(self, stat_file):
         """Parses a cgroup pseudo file for key/values."""
         fp = None
-        self.log.debug("Opening file: %s" % file_)
+        self.log.debug("Opening file: %s" % stat_file)
         try:
-            try:
-                fp = open(file_)
-                return dict(map(lambda x: x.split(), fp.read().splitlines()))
-            except IOError:
-                # Can be because of Docker 0.9/0.10 bug or because the container got stopped
-                # Count this kind of exception, if it happens to often it is because of the bug
-                self.log.info("Can't open %s. Metrics for this container are skipped." % file_)
-                return None
+            fp = open(stat_file)
+            return dict(map(lambda x: x.split(), fp.read().splitlines()))
+        except IOError:
+            # Can be because the container got stopped
+            self.log.info("Can't open %s. Metrics for this container are skipped." % stat_file)
         finally:
             if fp is not None:
                 fp.close()
Review comment: Will the default behavior work for existing users that don't have this new parameter set?

Reply: Yeah, this parameter is useful when the Agent runs inside a container. When we do that, we can mount part of the host filesystem into a directory, which should be this docker_root. Then we use it as the root for our Docker daemon's files. By default it is /, which means that we keep the normal behavior.
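To make the reply above concrete, here is a minimal sketch of how the docker_root fallback behaves; the '/host' mount point is a hypothetical example, not something defined by this PR.

```python
# Minimal sketch of the docker_root fallback discussed above.
# Existing setups have no 'docker_root' key in init_config, so the check falls
# back to '/' and keeps the previous behavior. An Agent running inside a
# container can instead point it at the mounted host filesystem.
init_config = {}                            # existing users: key not set
print init_config.get('docker_root', '/')   # -> '/', old behavior preserved

init_config = {'docker_root': '/host'}      # containerized Agent; '/host' is a
print init_config.get('docker_root', '/')   # hypothetical mount point -> '/host'
```

In the diff, this value is then passed along as _find_cgroup(metric["cgroup"], docker_root), so cgroup mount points and /proc/mounts are looked up under the mounted host filesystem rather than the container's own.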