Skip to content

Commit

Permalink
Support int typed aggregations (#696)
Browse files Browse the repository at this point in the history
and refactor supporting stats classes.
  • Loading branch information
colincadams authored and c24t committed Jul 17, 2019
1 parent e6a1b9e commit 0d00458
Show file tree
Hide file tree
Showing 15 changed files with 294 additions and 436 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- Fix exporting int-valued stats with sum and lastvalue aggregations
([#696](https://github.com/census-instrumentation/opencensus-python/pull/696))
- Fix cloud format propagator to use decimal span_id encoding instead of hex
([#719](https://github.com/census-instrumentation/opencensus-python/pull/719))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def to_metric(self, desc, tag_values, agg_data):
return metric

elif isinstance(agg_data,
aggregation_data_module.SumAggregationDataFloat):
aggregation_data_module.SumAggregationData):
metric = UnknownMetricFamily(name=metric_name,
documentation=metric_description,
labels=label_keys)
Expand Down
18 changes: 11 additions & 7 deletions contrib/opencensus-ext-prometheus/tests/test_prometheus_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.

from datetime import datetime
import copy
import mock
import unittest

Expand Down Expand Up @@ -140,7 +139,8 @@ def test_collector_to_metric_count(self):
collector.register_view(view)
desc = collector.registered_views[list(REGISTERED_VIEW)[0]]
metric = collector.to_metric(
desc=desc, tag_values=[None], agg_data=agg.aggregation_data)
desc=desc, tag_values=[None],
agg_data=agg.new_aggregation_data(VIDEO_SIZE_MEASURE))

self.assertEqual(desc['name'], metric.name)
self.assertEqual(desc['documentation'], metric.documentation)
Expand All @@ -158,7 +158,8 @@ def test_collector_to_metric_sum(self):
collector.register_view(view)
desc = collector.registered_views[list(REGISTERED_VIEW)[0]]
metric = collector.to_metric(
desc=desc, tag_values=[None], agg_data=agg.aggregation_data)
desc=desc, tag_values=[None],
agg_data=agg.new_aggregation_data(VIDEO_SIZE_MEASURE))

self.assertEqual(desc['name'], metric.name)
self.assertEqual(desc['documentation'], metric.documentation)
Expand All @@ -176,7 +177,8 @@ def test_collector_to_metric_last_value(self):
collector.register_view(view)
desc = collector.registered_views[list(REGISTERED_VIEW)[0]]
metric = collector.to_metric(
desc=desc, tag_values=[None], agg_data=agg.aggregation_data)
desc=desc, tag_values=[None],
agg_data=agg.new_aggregation_data(VIDEO_SIZE_MEASURE))

self.assertEqual(desc['name'], metric.name)
self.assertEqual(desc['documentation'], metric.documentation)
Expand All @@ -189,7 +191,8 @@ def test_collector_to_metric_histogram(self):
collector = prometheus.Collector(options=options)
collector.register_view(VIDEO_SIZE_VIEW)
desc = collector.registered_views[list(REGISTERED_VIEW)[0]]
distribution = copy.deepcopy(VIDEO_SIZE_DISTRIBUTION.aggregation_data)
distribution = VIDEO_SIZE_DISTRIBUTION.new_aggregation_data(
VIDEO_SIZE_MEASURE)
distribution.add_sample(280.0 * MiB, None, None)
metric = collector.to_metric(
desc=desc,
Expand Down Expand Up @@ -243,7 +246,7 @@ def test_collector_collect(self):
metric = collector.to_metric(
desc=desc,
tag_values=[tag_value_module.TagValue("value")],
agg_data=agg.aggregation_data)
agg_data=agg.new_aggregation_data(VIDEO_SIZE_MEASURE))

self.assertEqual(desc['name'], metric.name)
self.assertEqual(desc['documentation'], metric.documentation)
Expand All @@ -262,7 +265,8 @@ def test_collector_collect_with_none_label_value(self):
collector.register_view(view)
desc = collector.registered_views['test3_new_view']
metric = collector.to_metric(
desc=desc, tag_values=[None], agg_data=agg.aggregation_data)
desc=desc, tag_values=[None],
agg_data=agg.new_aggregation_data(VIDEO_SIZE_MEASURE))

self.assertEqual(1, len(metric.samples))
sample = metric.samples[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1275,8 +1275,7 @@ def test_create_timeseries_disjoint_tags(self, monitoring_resoure_mock):

def test_create_timeseries_from_distribution(self):
"""Check for explicit 0-bound bucket for SD export."""
agg = aggregation_module.DistributionAggregation(
aggregation_type=aggregation_module.Type.DISTRIBUTION)
agg = aggregation_module.DistributionAggregation()

view = view_module.View(
name="example.org/test_view",
Expand Down Expand Up @@ -1328,8 +1327,7 @@ def test_create_timeseries_multiple_tags(self):
create_time_series_list should return a time series for each set of
values in the tag value aggregation map.
"""
agg = aggregation_module.CountAggregation(
aggregation_type=aggregation_module.Type.COUNT)
agg = aggregation_module.CountAggregation()

view = view_module.View(
name="example.org/test_view",
Expand Down Expand Up @@ -1375,12 +1373,10 @@ def test_create_timeseries_invalid_aggregation(self):
v_data = mock.Mock(spec=view_data_module.ViewData)
v_data.view.name = "example.org/base_view"
v_data.view.columns = [tag_key_module.TagKey('base_key')]
v_data.view.aggregation.aggregation_type = \
aggregation_module.Type.NONE
v_data.start_time = TEST_TIME_STR
v_data.end_time = TEST_TIME_STR

base_data = aggregation_data_module.BaseAggregationData(10)
base_data = None
v_data.tag_value_aggregation_data_map = {
(None,): base_data,
}
Expand Down
191 changes: 73 additions & 118 deletions opencensus/stats/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,125 +14,78 @@

import logging

from opencensus.stats import bucket_boundaries
from opencensus.stats import aggregation_data
from opencensus.stats import measure as measure_module
from opencensus.metrics.export.metric_descriptor import MetricDescriptorType


logger = logging.getLogger(__name__)


class Type(object):
""" The type of aggregation function used on a View.
Attributes:
NONE (int): The aggregation type of the view is 'unknown'.
SUM (int): The aggregation type of the view is 'sum'.
COUNT (int): The aggregation type of the view is 'count'.
DISTRIBUTION (int): The aggregation type of the view is 'distribution'.
LASTVALUE (int): The aggregation type of the view is 'lastvalue'.
"""
NONE = 0
SUM = 1
COUNT = 2
DISTRIBUTION = 3
LASTVALUE = 4


class BaseAggregation(object):
"""Aggregation describes how the data collected is aggregated by type of
aggregation and buckets
:type buckets: list(:class: '~opencensus.stats.bucket_boundaries.
BucketBoundaries')
:param buckets: list of endpoints if the aggregation represents a
distribution
:type aggregation_type: :class:`~opencensus.stats.aggregation.Type`
:param aggregation_type: represents the type of this aggregation
"""
def __init__(self, buckets=None, aggregation_type=Type.NONE):
self._aggregation_type = aggregation_type
self._buckets = buckets or []

@property
def aggregation_type(self):
"""The aggregation type of the current aggregation"""
return self._aggregation_type

@property
def buckets(self):
"""The buckets of the current aggregation"""
return self._buckets


class SumAggregation(BaseAggregation):
"""Sum Aggregation escribes that data collected and aggregated with this
class SumAggregation(object):
"""Sum Aggregation describes that data collected and aggregated with this
method will be summed
:type sum: int or float
:param sum: the sum of the data collected and aggregated
:type aggregation_type: :class:`~opencensus.stats.aggregation.Type`
:param aggregation_type: represents the type of this aggregation
:param sum: the initial sum to be used in the aggregation
"""
def __init__(self, sum=None, aggregation_type=Type.SUM):
super(SumAggregation, self).__init__(aggregation_type=aggregation_type)
self._sum = aggregation_data.SumAggregationDataFloat(
sum_data=float(sum or 0))
self.aggregation_data = self._sum

@property
def sum(self):
"""The sum of the current aggregation"""
return self._sum
def __init__(self, sum=None):
self._initial_sum = sum or 0

def new_aggregation_data(self, measure):
"""Get a new AggregationData for this aggregation."""
value_type = MetricDescriptorType.to_type_class(
self.get_metric_type(measure))
return aggregation_data.SumAggregationData(
value_type=value_type, sum_data=self._initial_sum)

@staticmethod
def get_metric_type(measure):
"""Get the MetricDescriptorType for the metric produced by this
aggregation and measure.
"""
if isinstance(measure, measure_module.MeasureInt):
return MetricDescriptorType.CUMULATIVE_INT64
if isinstance(measure, measure_module.MeasureFloat):
return MetricDescriptorType.CUMULATIVE_DOUBLE
raise ValueError


class CountAggregation(BaseAggregation):
class CountAggregation(object):
"""Describes that the data collected and aggregated with this method will
be turned into a count value
:type count: int
:param count: represents the count of this aggregation
:type aggregation_type: :class:`~opencensus.stats.aggregation.Type`
:param aggregation_type: represents the type of this aggregation
:param count: the initial count to be used in the aggregation
"""
def __init__(self, count=0, aggregation_type=Type.COUNT):
super(CountAggregation, self).__init__(
aggregation_type=aggregation_type)
self._count = aggregation_data.CountAggregationData(count)
self.aggregation_data = self._count
def __init__(self, count=0):
self._initial_count = count

@property
def count(self):
"""The count of the current aggregation"""
return self._count
def new_aggregation_data(self, measure=None):
"""Get a new AggregationData for this aggregation."""
return aggregation_data.CountAggregationData(self._initial_count)

@staticmethod
def get_metric_type(measure):
"""Get the MetricDescriptorType for the metric produced by this
aggregation and measure.
"""
return MetricDescriptorType.CUMULATIVE_INT64

class DistributionAggregation(BaseAggregation):

class DistributionAggregation(object):
"""Distribution Aggregation indicates that the desired aggregation is a
histogram distribution
:type boundaries: list(:class:'~opencensus.stats.bucket_boundaries.
BucketBoundaries')
:param boundaries: the bucket endpoints
:type distribution: histogram
:param distribution: histogram of the values of the population
:type aggregation_type: :class:`~opencensus.stats.aggregation.Type`
:param aggregation_type: represents the type of this aggregation
"""

def __init__(self,
boundaries=None,
distribution=None,
aggregation_type=Type.DISTRIBUTION):
def __init__(self, boundaries=None):
if boundaries:
if not all(boundaries[ii] < boundaries[ii + 1]
for ii in range(len(boundaries) - 1)):
Expand All @@ -147,44 +100,46 @@ def __init__(self,
ii)
boundaries = boundaries[ii:]

super(DistributionAggregation, self).__init__(
buckets=boundaries, aggregation_type=aggregation_type)
self._boundaries = bucket_boundaries.BucketBoundaries(boundaries)
self._distribution = distribution or {}
self.aggregation_data = aggregation_data.DistributionAggregationData(
0, 0, 0, None, boundaries)
self._boundaries = boundaries

@property
def boundaries(self):
"""The boundaries of the current aggregation"""
return self._boundaries
def new_aggregation_data(self, measure=None):
"""Get a new AggregationData for this aggregation."""
return aggregation_data.DistributionAggregationData(
0, 0, 0, None, self._boundaries)

@property
def distribution(self):
"""The distribution of the current aggregation"""
return self._distribution
@staticmethod
def get_metric_type(measure):
"""Get the MetricDescriptorType for the metric produced by this
aggregation and measure.
"""
return MetricDescriptorType.CUMULATIVE_DISTRIBUTION


class LastValueAggregation(BaseAggregation):
class LastValueAggregation(object):
"""Describes that the data collected with this method will
overwrite the last recorded value
:type value: long
:param value: represents the value of this aggregation
:type aggregation_type: :class:`~opencensus.stats.aggregation.Type`
:param aggregation_type: represents the type of this aggregation
:param count: the initial value to be used in the aggregation
"""
def __init__(self, value=0, aggregation_type=Type.LASTVALUE):
super(LastValueAggregation, self).__init__(
aggregation_type=aggregation_type)
self.aggregation_data = aggregation_data.LastValueAggregationData(
value=value)
self._value = value

@property
def value(self):
"""The current recorded value
def __init__(self, value=0):
self._initial_value = value

def new_aggregation_data(self, measure):
"""Get a new AggregationData for this aggregation."""
value_type = MetricDescriptorType.to_type_class(
self.get_metric_type(measure))
return aggregation_data.LastValueAggregationData(
value=self._initial_value, value_type=value_type)

@staticmethod
def get_metric_type(measure):
"""Get the MetricDescriptorType for the metric produced by this
aggregation and measure.
"""
return self._value
if isinstance(measure, measure_module.MeasureInt):
return MetricDescriptorType.GAUGE_INT64
if isinstance(measure, measure_module.MeasureFloat):
return MetricDescriptorType.GAUGE_DOUBLE
raise ValueError
Loading

0 comments on commit 0d00458

Please sign in to comment.