introduce enabled/recording settings (elastic#790)
* introduce enabled/recording settings

see elastic/apm#92 (comment)

Co-authored-by: Colton Myers <colton.myers@gmail.com>
beniwohli and basepi committed Sep 14, 2021
1 parent 2358582 commit 009a7c7
Showing 17 changed files with 199 additions and 73 deletions.
31 changes: 31 additions & 0 deletions docs/configuration.asciidoc
@@ -112,6 +112,37 @@ Your service name must only contain characters from the ASCII alphabet, numbers,
The URL for your APM Server.
The URL must be fully qualified, including protocol (`http` or `https`) and port.

[float]
[[config-enabled]]
=== `enabled`

[options="header"]
|============
| Environment | Django/Flask | Default
| `ELASTIC_APM_ENABLED` | `ENABLED` | `true`
|============

Enable or disable the agent.
When set to `false`, the agent does not collect any data and does not start any background threads.
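For example, a minimal sketch of disabling the agent for a plain Python service
(the inline-dict form of the `Client` constructor is used here, and the service name is only a placeholder):

[source,python]
----
import elasticapm

# With ENABLED=False the client object can still be created and used,
# but it collects no data and starts no background threads.
client = elasticapm.Client({"SERVICE_NAME": "billing-service", "ENABLED": False})
----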


[float]
[[config-recording]]
=== `recording`

<<dynamic-configuration, image:./images/dynamic-config.svg[] >>

[options="header"]
|============
| Environment | Django/Flask | Default
| `ELASTIC_APM_RECORDING` | `RECORDING` | `true`
|============

Enable or disable recording of events.
When set to `false`, the Python agent does not send any events to the APM Server
and instrumentation overhead is minimized,
but the agent still polls the server for configuration changes.
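As a rough sketch of the difference from `enabled` (again with a placeholder service name),
recording can be switched off while the agent itself stays active:

[source,python]
----
import elasticapm

# With RECORDING=False the agent keeps its background threads and keeps
# polling the APM Server for central configuration, but sends no events.
client = elasticapm.Client({"SERVICE_NAME": "billing-service", "RECORDING": False})
----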


[float]
[[config-transport-class]]
22 changes: 12 additions & 10 deletions elasticapm/base.py
@@ -187,9 +187,7 @@ def __init__(self, config=None, **inline):
)
self.include_paths_re = stacks.get_path_regex(self.config.include_paths) if self.config.include_paths else None
self.exclude_paths_re = stacks.get_path_regex(self.config.exclude_paths) if self.config.exclude_paths else None
self._metrics = MetricsRegistry(
self.config.metrics_interval / 1000.0, self.queue, ignore_patterns=self.config.disable_metrics
)
self._metrics = MetricsRegistry(self)
for path in self.config.metrics_sets:
self._metrics.register(path)
if self.config.breakdown_metrics:
@@ -200,8 +198,8 @@ def __init__(self, config=None, **inline):
self._thread_managers["config"] = self.config
else:
self._config_updater = None

self.start_threads()
if config.enabled:
self.start_threads()

def start_threads(self):
with self._thread_starter_lock:
@@ -210,7 +208,7 @@ def start_threads(self):
self.logger.debug("Detected PID change from %r to %r, starting threads", self._pid, current_pid)
for manager_type, manager in self._thread_managers.items():
self.logger.debug("Starting %s thread", manager_type)
manager.start_thread()
manager.start_thread(pid=current_pid)
self._pid = current_pid

def get_handler(self, name):
@@ -220,6 +218,8 @@ def capture(self, event_type, date=None, context=None, custom=None, stack=None,
"""
Captures and processes an event and pipes it off to Client.send.
"""
if not self.config.is_recording:
return
if event_type == "Exception":
# never gather log stack for exceptions
stack = False
@@ -274,7 +274,8 @@ def begin_transaction(self, transaction_type, trace_parent=None, start=None):
:param start: override the start timestamp, mostly useful for testing
:return: the started transaction object
"""
return self.tracer.begin_transaction(transaction_type, trace_parent=trace_parent, start=start)
if self.config.is_recording:
return self.tracer.begin_transaction(transaction_type, trace_parent=trace_parent, start=start)

def end_transaction(self, name=None, result="", duration=None):
"""
@@ -289,9 +290,10 @@ def end_transaction(self, name=None, result="", duration=None):
return transaction

def close(self):
with self._thread_starter_lock:
for _manager_type, manager in self._thread_managers.items():
manager.stop_thread()
if self.config.enabled:
with self._thread_starter_lock:
for _, manager in self._thread_managers.items():
manager.stop_thread()

def get_service_info(self):
if self._service_info:
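A rough sketch of how the changes to `elasticapm/base.py` look from the caller's side; the client below is
constructed with placeholder settings, and no APM Server needs to be reachable for the assertions themselves:

import elasticapm

# RECORDING=False: background threads still start and central config is still
# polled, but capture() returns early and begin_transaction() yields nothing.
client = elasticapm.Client({"SERVICE_NAME": "demo", "RECORDING": False})
assert client.begin_transaction("request") is None
client.close()

# ENABLED=False: start_threads() and close() skip thread management entirely,
# and is_recording is False regardless of the RECORDING flag.
disabled = elasticapm.Client({"SERVICE_NAME": "demo", "ENABLED": False})
assert disabled.begin_transaction("request") is None
disabled.close()
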
24 changes: 22 additions & 2 deletions elasticapm/conf/__init__.py
@@ -337,20 +337,38 @@ class Config(_ConfigBase):
framework_version = _ConfigValue("FRAMEWORK_VERSION", default=None)
global_labels = _DictConfigValue("GLOBAL_LABELS", default=None)
disable_send = _BoolConfigValue("DISABLE_SEND", default=False)
enabled = _BoolConfigValue("ENABLED", default=True)
recording = _BoolConfigValue("RECORDING", default=True)
instrument = _BoolConfigValue("INSTRUMENT", default=True)
enable_distributed_tracing = _BoolConfigValue("ENABLE_DISTRIBUTED_TRACING", default=True)
capture_headers = _BoolConfigValue("CAPTURE_HEADERS", default=True)
django_transaction_name_from_route = _BoolConfigValue("DJANGO_TRANSACTION_NAME_FROM_ROUTE", default=False)
disable_log_record_factory = _BoolConfigValue("DISABLE_LOG_RECORD_FACTORY", default=False)
use_elastic_traceparent_header = _BoolConfigValue("USE_ELASTIC_TRACEPARENT_HEADER", default=True)

@property
def is_recording(self):
if not self.enabled:
return False
else:
return self.recording


class VersionedConfig(ThreadManager):
"""
A thin layer around Config that provides versioning
"""

__slots__ = ("_config", "_version", "_first_config", "_first_version", "_lock", "transport", "_update_thread")
__slots__ = (
"_config",
"_version",
"_first_config",
"_first_version",
"_lock",
"transport",
"_update_thread",
"pid",
)

def __init__(self, config_object, version, transport=None):
"""
@@ -363,6 +381,7 @@ def __init__(self, config_object, version, transport=None):
self.transport = transport
self._lock = threading.Lock()
self._update_thread = None
super(VersionedConfig, self).__init__()

def update(self, version, **config):
"""
@@ -436,11 +455,12 @@ def update_config(self):

return next_run

def start_thread(self):
def start_thread(self, pid=None):
self._update_thread = IntervalTimer(
self.update_config, 1, "eapm conf updater", daemon=True, evaluate_function_interval=True
)
self._update_thread.start()
super(VersionedConfig, self).start_thread(pid=pid)

def stop_thread(self):
if self._update_thread:
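A small sketch of the `is_recording` property added above; it assumes `Config` accepts a plain dict of the
documented keys, with all other settings left at their defaults:

from elasticapm.conf import Config

# is_recording is effectively the logical AND of ENABLED and RECORDING:
# a disabled agent never records, whatever the recording flag says.
assert Config({"ENABLED": True, "RECORDING": True}).is_recording is True
assert Config({"ENABLED": True, "RECORDING": False}).is_recording is False
assert Config({"ENABLED": False, "RECORDING": True}).is_recording is False
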
2 changes: 1 addition & 1 deletion elasticapm/contrib/aiohttp/__init__.py
@@ -52,5 +52,5 @@ def install_tracing(self, app, client):
from elasticapm.contrib.aiohttp.middleware import tracing_middleware

app.middlewares.insert(0, tracing_middleware(app))
if client.config.instrument:
if client.config.instrument and client.config.enabled:
elasticapm.instrument()
2 changes: 1 addition & 1 deletion elasticapm/contrib/django/apps.py
@@ -65,7 +65,7 @@ def ready(self):
if self.client.config.autoinsert_django_middleware:
self.insert_middleware(django_settings)
register_handlers(self.client)
if self.client.config.instrument:
if self.client.config.instrument and self.client.config.enabled:
instrument(self.client)
else:
self.client.logger.debug("Skipping instrumentation. INSTRUMENT is set to False.")
3 changes: 2 additions & 1 deletion elasticapm/contrib/flask/__init__.py
@@ -30,6 +30,7 @@


from __future__ import absolute_import

import logging

import flask
@@ -145,7 +146,7 @@ def init_app(self, app, **defaults):
pass

# Instrument to get spans
if self.client.config.instrument:
if self.client.config.instrument and self.client.config.enabled:
elasticapm.instrumentation.control.instrument()

signals.request_started.connect(self.request_started, sender=app)
2 changes: 1 addition & 1 deletion elasticapm/contrib/opentracing/tracer.py
@@ -53,7 +53,7 @@ def __init__(self, client_instance=None, config=None, scope_manager=None):
"Usage of other scope managers will lead to unpredictable results."
)
self._scope_manager = scope_manager or ThreadLocalScopeManager()
if self._agent.config.instrument:
if self._agent.config.instrument and self._agent.config.enabled:
instrument()

def start_active_span(
6 changes: 5 additions & 1 deletion elasticapm/contrib/tornado/__init__.py
@@ -63,5 +63,9 @@ def __init__(self, app, client=None, **config):
app.elasticapm_client = client

# Don't instrument if debug=True in tornado, unless client.config.debug is True
if (not self.app.settings.get("debug") or client.config.debug) and client.config.instrument:
if (
(not self.app.settings.get("debug") or client.config.debug)
and client.config.instrument
and client.config.enabled
):
elasticapm.instrument()
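
The framework integrations above (aiohttp, Django, Flask, OpenTracing, Tornado) all gain the same guard. As a
standalone sketch of that pattern, with a hypothetical, already-constructed `client`:

import elasticapm

def maybe_instrument(client):
    # Skip instrumentation when the agent is disabled, so a disabled agent
    # adds no overhead at all, not even patched libraries.
    if client.config.instrument and client.config.enabled:
        elasticapm.instrument()
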
38 changes: 22 additions & 16 deletions elasticapm/metrics/base_metrics.py
@@ -44,20 +44,18 @@


class MetricsRegistry(ThreadManager):
def __init__(self, collect_interval, queue_func, tags=None, ignore_patterns=None):
def __init__(self, client, tags=None):
"""
Creates a new metric registry
:param collect_interval: the interval to collect metrics from registered metric sets
:param queue_func: the function to call with the collected metrics
:param client: client instance
:param tags:
"""
self._collect_interval = collect_interval
self._queue_func = queue_func
self.client = client
self._metricsets = {}
self._tags = tags or {}
self._collect_timer = None
self._ignore_patterns = ignore_patterns or ()
super(MetricsRegistry, self).__init__()

def register(self, class_path):
"""
@@ -84,16 +82,18 @@ def collect(self):
Collect metrics from all registered metric sets and queues them for sending
:return:
"""
logger.debug("Collecting metrics")
if self.client.config.is_recording:
logger.debug("Collecting metrics")

for name, metricset in compat.iteritems(self._metricsets):
for data in metricset.collect():
self._queue_func(constants.METRICSET, data)
for _, metricset in compat.iteritems(self._metricsets):
for data in metricset.collect():
self.client.queue(constants.METRICSET, data)

def start_thread(self):
if self._collect_interval:
def start_thread(self, pid=None):
super(MetricsRegistry, self).start_thread(pid=pid)
if self.client.config.metrics_interval:
self._collect_timer = IntervalTimer(
self.collect, self._collect_interval, name="eapm metrics collect timer", daemon=True
self.collect, self.collect_interval, name="eapm metrics collect timer", daemon=True
)
logger.debug("Starting metrics collect timer")
self._collect_timer.start()
@@ -104,6 +104,14 @@ def stop_thread(self):
self._collect_timer.cancel()
self._collect_timer = None

@property
def collect_interval(self):
return self.client.config.metrics_interval / 1000.0

@property
def ignore_patterns(self):
return self.client.config.disable_metrics or []


class MetricsSet(object):
def __init__(self, registry):
@@ -159,9 +167,7 @@ def _metric(self, container, metric_class, name, reset_on_collect, labels):
key = (name, labels)
with self._lock:
if key not in container:
if self._registry._ignore_patterns and any(
pattern.match(name) for pattern in self._registry._ignore_patterns
):
if any(pattern.match(name) for pattern in self._registry.ignore_patterns):
metric = noop_metric
elif len(self._gauges) + len(self._counters) + len(self._timers) >= DISTINCT_LABEL_LIMIT:
if not self._label_limit_logged:
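A brief sketch of why the registry now holds only a client reference and exposes `collect_interval` and
`ignore_patterns` as properties: both are re-read from `client.config` on every access, so central-config updates
to `metrics_interval` or `disable_metrics` take effect without rebuilding the registry. The client below uses
placeholder settings:

import elasticapm
from elasticapm.metrics.base_metrics import MetricsRegistry

# The registry derives its settings from the live config on each access.
client = elasticapm.Client({"SERVICE_NAME": "demo", "RECORDING": False})
registry = MetricsRegistry(client)
assert registry.collect_interval == client.config.metrics_interval / 1000.0
assert registry.ignore_patterns == (client.config.disable_metrics or [])
client.close()
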
9 changes: 5 additions & 4 deletions elasticapm/transport/base.py
@@ -94,6 +94,7 @@ def __init__(
self._flushed = threading.Event()
self._closed = False
self._processors = processors if processors is not None else []
super(Transport, self).__init__()

@property
def _max_flush_time(self):
@@ -228,13 +229,13 @@ def _flush(self, buffer):
except Exception as e:
self.handle_transport_fail(e)

def start_thread(self):
current_pid = os.getpid()
if (not self._thread or current_pid != self._thread.pid) and not self._closed:
def start_thread(self, pid=None):
super(Transport, self).start_thread(pid=pid)
if (not self._thread or self.pid != self._thread.pid) and not self._closed:
try:
self._thread = threading.Thread(target=self._process_queue, name="eapm event processor thread")
self._thread.daemon = True
self._thread.pid = current_pid
self._thread.pid = self.pid
self._thread.start()
except RuntimeError:
pass
17 changes: 14 additions & 3 deletions elasticapm/utils/threading.py
@@ -30,6 +30,7 @@

from __future__ import absolute_import

import os
import threading
from timeit import default_timer

@@ -47,7 +48,7 @@ def __init__(
"""
:param function: the function to run
:param interval: the interval in-between invocations of the function
:param interval: the interval in-between invocations of the function, in milliseconds
:param name: name of the thread
:param args: arguments to call the function with
:param kwargs: keyword arguments to call the function with
@@ -88,8 +89,18 @@ def cancel(self):


class ThreadManager(object):
def start_thread(self):
raise NotImplementedError()
def __init__(self):
self.pid = None

def start_thread(self, pid=None):
if not pid:
pid = os.getpid()
self.pid = pid

def stop_thread(self):
raise NotImplementedError()

def is_started(self, current_pid=None):
if not current_pid:
current_pid = os.getpid()
return self.pid == current_pid
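
A rough sketch of how a subclass is expected to use the new pid bookkeeping; the `Flusher` class and its worker
thread are hypothetical, only the `ThreadManager` hooks come from the file above:

import threading

from elasticapm.utils.threading import ThreadManager


class Flusher(ThreadManager):
    """Hypothetical manager that flushes a buffer on a background thread."""

    def __init__(self):
        self._thread = None
        super(Flusher, self).__init__()

    def start_thread(self, pid=None):
        # Remember which process started the thread; after a fork the stored
        # pid no longer matches os.getpid(), so is_started() reports False in
        # the child and the thread can be restarted there.
        super(Flusher, self).start_thread(pid=pid)
        self._thread = threading.Thread(target=self._flush_loop, name="flusher")
        self._thread.daemon = True
        self._thread.start()

    def stop_thread(self):
        self._thread = None

    def _flush_loop(self):
        pass  # the real flushing work would go here


manager = Flusher()
manager.start_thread()
assert manager.is_started()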