Skip to content

Commit

Permalink
sdk/metrics: implement MinMaxSumCount aggregator
Browse files Browse the repository at this point in the history
This aggregator is the default aggregator for measure metrics and
keeps the minimum, maximum, sum and count of those measures.
  • Loading branch information
mauriciovasquezbernal committed Feb 14, 2020
1 parent f4b38b3 commit 18029b4
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import abc
from collections import namedtuple


class Aggregator(abc.ABC):
Expand Down Expand Up @@ -56,3 +57,52 @@ def take_checkpoint(self):

def merge(self, other):
self.checkpoint += other.checkpoint


class MinMaxSumCountAggregator(Aggregator):
"""Agregator for Measure metrics that keeps min, max, sum and count."""

_TYPE = namedtuple("minmaxsumcount", "min max sum count")

@classmethod
def _min(cls, val1, val2):
if val1 is None and val2 is None:
return None
return min(val1 or val2, val2 or val1)

@classmethod
def _max(cls, val1, val2):
if val1 is None and val2 is None:
return None
return max(val1 or val2, val2 or val1)

@classmethod
def _sum(cls, val1, val2):
if val1 is None and val2 is None:
return None
return (val1 or 0) + (val2 or 0)

def __init__(self):
super().__init__()
self.current = self._TYPE(None, None, None, 0)
self.checkpoint = self._TYPE(None, None, None, 0)

def update(self, value):
self.current = self._TYPE(
self._min(self.current.min, value),
self._max(self.current.max, value),
self._sum(self.current.sum, value),
self.current.count + 1,
)

def take_checkpoint(self):
self.checkpoint = self.current
self.current = self._TYPE(None, None, None, 0)

def merge(self, other):
self.checkpoint = self._TYPE(
self._min(self.checkpoint.min, other.checkpoint.min),
self._max(self.checkpoint.max, other.checkpoint.max),
self._sum(self.checkpoint.sum, other.checkpoint.sum),
self.checkpoint.count + other.checkpoint.count,
)
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
import abc
from typing import Sequence, Type

from opentelemetry.metrics import Counter, MetricT
from opentelemetry.metrics import Counter, Measure, MetricT
from opentelemetry.sdk.metrics.export import MetricRecord
from opentelemetry.sdk.metrics.export.aggregate import (
Aggregator,
CounterAggregator,
MinMaxSumCountAggregator,
)


Expand Down Expand Up @@ -47,6 +48,8 @@ def aggregator_for(self, metric_type: Type[MetricT]) -> Aggregator:
# pylint:disable=R0201
if issubclass(metric_type, Counter):
return CounterAggregator()
if issubclass(metric_type, Measure):
return MinMaxSumCountAggregator()
# TODO: Add other aggregators
return CounterAggregator()

Expand Down
89 changes: 83 additions & 6 deletions opentelemetry-sdk/tests/metrics/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
ConsoleMetricsExporter,
MetricRecord,
)
from opentelemetry.sdk.metrics.export.aggregate import CounterAggregator
from opentelemetry.sdk.metrics.export.aggregate import (
CounterAggregator,
MinMaxSumCountAggregator,
)
from opentelemetry.sdk.metrics.export.batcher import UngroupedBatcher
from opentelemetry.sdk.metrics.export.controller import PushController

Expand Down Expand Up @@ -218,22 +221,21 @@ def test_ungrouped_batcher_process_not_stateful(self):
)


class TestAggregator(unittest.TestCase):
# TODO: test other aggregators once implemented
def test_counter_update(self):
class TestCounterAggregator(unittest.TestCase):
def test_update(self):
counter = CounterAggregator()
counter.update(1.0)
counter.update(2.0)
self.assertEqual(counter.current, 3.0)

def test_counter_checkpoint(self):
def test_checkpoint(self):
counter = CounterAggregator()
counter.update(2.0)
counter.take_checkpoint()
self.assertEqual(counter.current, 0)
self.assertEqual(counter.checkpoint, 2.0)

def test_counter_merge(self):
def test_merge(self):
counter = CounterAggregator()
counter2 = CounterAggregator()
counter.checkpoint = 1.0
Expand All @@ -242,6 +244,81 @@ def test_counter_merge(self):
self.assertEqual(counter.checkpoint, 4.0)


class TestMinMaxSumCountAggregator(unittest.TestCase):
def test_update(self):
mmsc = MinMaxSumCountAggregator()
# test current values without any update
self.assertEqual(
mmsc.current, (None, None, None, 0),
)

# call update with some values
values = (3, 50, 3, 97)
for val in values:
mmsc.update(val)

self.assertEqual(
mmsc.current, (min(values), max(values), sum(values), len(values)),
)

def test_checkpoint(self):
mmsc = MinMaxSumCountAggregator()

# take checkpoint wihtout any update
mmsc.take_checkpoint()
self.assertEqual(
mmsc.checkpoint, (None, None, None, 0),
)

# call update with some values
values = (3, 50, 3, 97)
for val in values:
mmsc.update(val)

mmsc.take_checkpoint()
self.assertEqual(
mmsc.checkpoint,
(min(values), max(values), sum(values), len(values)),
)

self.assertEqual(
mmsc.current, (None, None, None, 0),
)

def test_merge(self):
mmsc1 = MinMaxSumCountAggregator()
mmsc2 = MinMaxSumCountAggregator()

checkpoint1 = MinMaxSumCountAggregator._TYPE(3, 150, 101, 3)
checkpoint2 = MinMaxSumCountAggregator._TYPE(1, 33, 44, 2)

mmsc1.checkpoint = checkpoint1
mmsc2.checkpoint = checkpoint2

mmsc1.merge(mmsc2)

self.assertEqual(
mmsc1.checkpoint,
(
min(checkpoint1.min, checkpoint2.min),
max(checkpoint1.max, checkpoint2.max),
checkpoint1.sum + checkpoint2.sum,
checkpoint1.count + checkpoint2.count,
),
)

def test_merge_with_empty(self):
mmsc1 = MinMaxSumCountAggregator()
mmsc2 = MinMaxSumCountAggregator()

checkpoint1 = MinMaxSumCountAggregator._TYPE(3, 150, 101, 3)
mmsc1.checkpoint = checkpoint1

mmsc1.merge(mmsc2)

self.assertEqual(mmsc1.checkpoint, checkpoint1)


class TestController(unittest.TestCase):
def test_push_controller(self):
meter = mock.Mock()
Expand Down
16 changes: 11 additions & 5 deletions opentelemetry-sdk/tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ def test_record_batch_multiple(self):
meter.record_batch(label_set, record_tuples)
self.assertEqual(counter.get_handle(label_set).aggregator.current, 1.0)
self.assertEqual(gauge.get_handle(label_set).aggregator.current, 5.0)
# TODO: Fix when aggregator implemented for measure
self.assertEqual(measure.get_handle(label_set).aggregator.current, 3.0)
self.assertEqual(
measure.get_handle(label_set).aggregator.current,
(3.0, 3.0, 3.0, 1),
)

def test_record_batch_exists(self):
meter = metrics.Meter()
Expand Down Expand Up @@ -195,9 +197,13 @@ def test_record(self):
kvp = {"key": "value"}
label_set = meter.get_label_set(kvp)
handle = metric.get_handle(label_set)
metric.record(3, label_set)
# TODO: Fix once other aggregators implemented
self.assertEqual(handle.aggregator.current, 3)
values = (37, 42, 7)
for val in values:
metric.record(val, label_set)
self.assertEqual(
handle.aggregator.current,
(min(values), max(values), sum(values), len(values)),
)


class TestCounterHandle(unittest.TestCase):
Expand Down

0 comments on commit 18029b4

Please sign in to comment.