Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable the ability for buffering and aggregation to work at the same time #851

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions datadog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ def initialize(
api_host=None, # type: Optional[str]
statsd_host=None, # type: Optional[str]
statsd_port=None, # type: Optional[int]
statsd_disable_aggregator=True, # type: bool
statsd_disable_aggregation=True, # type: bool
statsd_disable_buffering=True, # type: bool
statsd_aggregation_flush_interval=2, # type: float
statsd_aggregation_flush_interval=0.3, # type: float
statsd_use_default_route=False, # type: bool
statsd_socket_path=None, # type: Optional[str]
statsd_namespace=None, # type: Optional[str]
Expand Down Expand Up @@ -78,12 +78,13 @@ def initialize(
(default: True).
:type statsd_disable_buffering: boolean

:param statsd_disable_aggregator: Enable/disable statsd client aggregation support
:param statsd_disable_aggregation: Enable/disable statsd client aggregation support
(default: True).
:type statsd_disable_aggregator: boolean
:type statsd_disable_aggregation: boolean

:param statsd_aggregation_flush_interval: Sets the flush interval for aggregation
(default: 2 seconds)
:param statsd_aggregation_flush_interval: If aggregation is enabled, set the flush interval for
aggregation/buffering
(default: 0.3 seconds)
:type statsd_aggregation_flush_interval: float

:param statsd_use_default_route: Dynamically set the statsd host to the default route
Expand Down Expand Up @@ -138,7 +139,7 @@ def initialize(
if statsd_constant_tags:
statsd.constant_tags += statsd_constant_tags

if statsd_disable_aggregator:
if statsd_disable_aggregation:
statsd.disable_aggregation()
else:
statsd.enable_aggregation(statsd_aggregation_flush_interval)
Expand Down
158 changes: 55 additions & 103 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,9 @@
DEFAULT_PORT = 8125

# Buffering-related values (in seconds)
DEFAULT_BUFFERING_FLUSH_INTERVAL = 0.3
DEFAULT_FLUSH_INTERVAL = 0.3
MIN_FLUSH_INTERVAL = 0.0001

# Aggregation-related values (in seconds)
DEFAULT_AGGREGATION_FLUSH_INTERVAL = 2
# Env var to enable/disable sending the container ID field
ORIGIN_DETECTION_ENABLED = "DD_ORIGIN_DETECTION_ENABLED"

Expand Down Expand Up @@ -147,8 +145,8 @@ def __init__(
host=DEFAULT_HOST, # type: Text
port=DEFAULT_PORT, # type: int
max_buffer_size=None, # type: None
flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL, # type: float
disable_aggregating=True, # type: bool
flush_interval=DEFAULT_FLUSH_INTERVAL, # type: float
disable_aggregation=True, # type: bool
disable_buffering=True, # type: bool
namespace=None, # type: Optional[Text]
constant_tags=None, # type: Optional[List[str]]
Expand Down Expand Up @@ -238,8 +236,8 @@ def __init__(
it overrides the default value.
:type flush_interval: float

:disable_aggregating: If true, metrics (Count, Gauge, Set) are no longered aggregated by the client
:type disable_aggregating: bool
:disable_aggregation: If true, metrics (Count, Gauge, Set) are no longer aggregated by the client
:type disable_aggregation: bool

:disable_buffering: If set, metrics are no longer buffered by the client and
all data is sent synchronously to the server
Expand Down Expand Up @@ -447,34 +445,24 @@ def __init__(
self._config_lock = RLock()

self._disable_buffering = disable_buffering
self._disable_aggregating = disable_aggregating
self._disable_aggregation = disable_aggregation

self._flush_interval = flush_interval
self._flush_thread = None
self._flush_thread_stop = threading.Event()
self.aggregator = Aggregator()
# Indicates if the process is about to fork, so we shouldn't start any new threads yet.
self._forking = False
# Currently, we do not allow both aggregation and buffering, we may revisit this in the future
if self._disable_buffering and self._disable_aggregating:
self._send = self._send_to_server
log.debug("Statsd buffering and aggregation is disabled")
elif self._disable_aggregating:
# Start the flush thread if buffering is enabled and the interval is above
# a reasonable range. This both prevents thrashing and allow us to use "0.0"
# as a value for disabling the automatic flush timer as well.

if not self._disable_buffering:
self._send = self._send_to_buffer
self._start_flush_thread(
self._flush_interval,
self.flush_buffered_metrics,
)
else:
self._send = self._send_to_server
self._disable_buffering = True
self._start_flush_thread(
self._flush_interval,
self.flush_aggregated_metrics,
)

if not self._disable_aggregation or not self._disable_buffering:
self._start_flush_thread()
else:
log.debug("Statsd buffering and aggregation is disabled")

self._queue = None
self._sender_thread = None
Expand Down Expand Up @@ -551,30 +539,14 @@ def enable_telemetry(self):
self._telemetry = True

# Note: Invocations of this method should be thread-safe
def _start_flush_thread(
self,
flush_interval,
flush_function,
):
if (self._disable_buffering or not self._disable_aggregating) and flush_function == self.flush_buffered_metrics:
log.debug("Statsd periodic buffer flush is disabled")
def _start_flush_thread(self):
if self._disable_aggregation and self.disable_buffering:
log.debug("Statsd periodic buffer and aggregation flush is disabled")
return
if (
self._disable_aggregating
and flush_function == self.flush_aggregated_metrics
):
log.debug("Statsd periodic aggregating flush is disabled")
return

flush_type = ""
if self._disable_buffering:
flush_type = "aggregation"
else:
flush_type = "buffering"

if flush_interval <= MIN_FLUSH_INTERVAL:
if self._flush_interval <= MIN_FLUSH_INTERVAL:
log.debug(
"the set flush interval for %s is less then the minimum", flush_type
"the set flush interval is less then the minimum"
)
return

Expand All @@ -587,30 +559,31 @@ def _start_flush_thread(
def _flush_thread_loop(self, flush_interval):
while not self._flush_thread_stop.is_set():
time.sleep(flush_interval)
flush_function()

if not self._disable_aggregation:
self.flush_aggregated_metrics()
if not self._disable_buffering:
self.flush_buffered_metrics()
self._flush_thread = threading.Thread(
name="{}_flush_thread".format(self.__class__.__name__),
target=_flush_thread_loop,
args=(self, flush_interval,),
args=(self, self._flush_interval,),
)
self._flush_thread.daemon = True
self._flush_thread.start()
log.debug(
"Statsd %s flush thread registered with period of %s",
flush_type,
flush_interval,
"Statsd flush thread registered with period of %s",
self._flush_interval,
)

# Note: Invocations of this method should be thread-safe
def _stop_flush_thread(self):
if not self._flush_thread:
return
try:
if self._disable_aggregating:
self.flush_buffered_metrics()
else:
if not self._disable_aggregation:
self.flush_aggregated_metrics()
if not self.disable_buffering:
self.flush_buffered_metrics()
finally:
pass

Expand Down Expand Up @@ -645,43 +618,40 @@ def disable_buffering(self, is_disabled):

self._disable_buffering = is_disabled

# If buffering has been disabled, flush and kill the background thread
# If buffering (and aggregation) has been disabled, flush and kill the background thread
# otherwise start up the flushing thread and enable the buffering.
if is_disabled:
self._send = self._send_to_server
self._stop_flush_thread()
if self._disable_aggregation and self.disable_buffering:
self._stop_flush_thread()
log.debug("Statsd buffering is disabled")
else:
self._send = self._send_to_buffer
self._start_flush_thread(
self._flush_interval,
self.flush_buffered_metrics,
)
self._start_flush_thread()

def disable_aggregation(self):
with self._config_lock:
# If the toggle didn't change anything, this method is a noop
if self._disable_aggregating:
if self._disable_aggregation:
return

self._disable_aggregating = True
self._disable_aggregation = True

# If aggregation has been disabled, flush and kill the background thread
# If aggregation and buffering have both been disabled, flush and kill the background thread
# otherwise start up the flushing thread and enable aggregation.
self._stop_flush_thread()
if self._disable_aggregation and self.disable_buffering:
self._stop_flush_thread()
log.debug("Statsd aggregation is disabled")

def enable_aggregation(self, aggregation_flush_interval=DEFAULT_AGGREGATION_FLUSH_INTERVAL):
def enable_aggregation(self, flush_interval=DEFAULT_FLUSH_INTERVAL):
with self._config_lock:
if not self._disable_aggregating:
if not self._disable_aggregation:
return
self._disable_aggregating = False
self._flush_interval = aggregation_flush_interval
self._send = self._send_to_server
self._start_flush_thread(
self._flush_interval,
self.flush_aggregated_metrics,
)
self._disable_aggregation = False
self._flush_interval = flush_interval
if self._disable_buffering:
self._send = self._send_to_server
self._start_flush_thread()

@staticmethod
def resolve_host(host, use_default_route):
Expand Down Expand Up @@ -867,7 +837,7 @@ def gauge(
>>> statsd.gauge("users.online", 123)
>>> statsd.gauge("active.connections", 1001, tags=["protocol:http"])
"""
if self._disable_aggregating:
if self._disable_aggregation:
self._report(metric, "g", value, tags, sample_rate)
else:
self.aggregator.gauge(metric, value, tags, sample_rate)
Expand All @@ -890,7 +860,7 @@ def gauge_with_timestamp(
>>> statsd.gauge("users.online", 123, 1713804588)
>>> statsd.gauge("active.connections", 1001, 1713804588, tags=["protocol:http"])
"""
if self._disable_aggregating:
if self._disable_aggregation:
self._report(metric, "g", value, tags, sample_rate, timestamp)
else:
self.aggregator.gauge(metric, value, tags, sample_rate, timestamp)
Expand All @@ -908,7 +878,7 @@ def count(

>>> statsd.count("page.views", 123)
"""
if self._disable_aggregating:
if self._disable_aggregation:
self._report(metric, "c", value, tags, sample_rate)
else:
self.aggregator.count(metric, value, tags, sample_rate)
Expand All @@ -930,7 +900,7 @@ def count_with_timestamp(

>>> statsd.count("files.transferred", 124, timestamp=1713804588)
"""
if self._disable_aggregating:
if self._disable_aggregation:
self._report(metric, "c", value, tags, sample_rate, timestamp)
else:
self.aggregator.count(metric, value, tags, sample_rate, timestamp)
Expand All @@ -949,7 +919,7 @@ def increment(
>>> statsd.increment("page.views")
>>> statsd.increment("files.transferred", 124)
"""
if self._disable_aggregating:
if self._disable_aggregation:
self._report(metric, "c", value, tags, sample_rate)
else:
self.aggregator.count(metric, value, tags, sample_rate)
Expand All @@ -969,7 +939,7 @@ def decrement(
>>> statsd.decrement("active.connections", 2)
"""
metric_value = -value if value else value
if self._disable_aggregating:
if self._disable_aggregation:
self._report(metric, "c", metric_value, tags, sample_rate)
else:
self.aggregator.count(metric, metric_value, tags, sample_rate)
Expand Down Expand Up @@ -1080,7 +1050,7 @@ def set(self, metric, value, tags=None, sample_rate=None):

>>> statsd.set("visitors.uniques", 999)
"""
if self._disable_aggregating:
if self._disable_aggregation:
self._report(metric, "s", value, tags, sample_rate)
else:
self.aggregator.set(metric, value, tags, sample_rate)
Expand Down Expand Up @@ -1533,16 +1503,7 @@ def pre_fork(self):

def post_fork_parent(self):
"""Restore the client state after a fork in the parent process."""
if self._disable_aggregating:
self._start_flush_thread(
self._flush_interval,
self.flush_buffered_metrics,
)
else:
self._start_flush_thread(
self._flush_interval,
self.flush_aggregated_metrics,
)
self._start_flush_thread()
self._start_sender_thread()
self._config_lock.release()

Expand All @@ -1565,16 +1526,7 @@ def post_fork_child(self):
self.close_socket()

with self._config_lock:
if self._disable_aggregating:
self._start_flush_thread(
self._flush_interval,
self.flush_buffered_metrics,
)
else:
self._start_flush_thread(
self._flush_interval,
self.flush_aggregated_metrics,
)
self._start_flush_thread()
self._start_sender_thread()

def stop(self):
Expand All @@ -1587,9 +1539,9 @@ def stop(self):

self.disable_background_sender()
self._disable_buffering = True
self._disable_aggregating = True
self.flush_buffered_metrics()
self._disable_aggregation = True
self.flush_aggregated_metrics()
self.flush_buffered_metrics()
self.close_socket()


Expand Down
Loading
Loading