From ad21f8e9a9648a6829a9192cdd64e277684765bd Mon Sep 17 00:00:00 2001 From: Thomas Barker Date: Thu, 6 Jan 2022 15:15:37 +0000 Subject: [PATCH] Add support for histogram to metrics exporter. (#258) BUG=210164184 --- exporter/collector/metricsexporter.go | 76 +++++++++++++++++++++- exporter/collector/metricsexporter_test.go | 64 ++++++++++++++++++ 2 files changed, 138 insertions(+), 2 deletions(-) diff --git a/exporter/collector/metricsexporter.go b/exporter/collector/metricsexporter.go index e8b20d5f7..75490e844 100644 --- a/exporter/collector/metricsexporter.go +++ b/exporter/collector/metricsexporter.go @@ -265,6 +265,13 @@ func (m *metricMapper) metricToTimeSeries( ts := m.summaryPointToTimeSeries(resource, extraLabels, metric, summary, points.At(i)) timeSeries = append(timeSeries, ts...) } + case pdata.MetricDataTypeHistogram: + hist := metric.Histogram() + points := hist.DataPoints() + for i := 0; i < points.Len(); i++ { + ts := m.histogramToTimeSeries(resource, extraLabels, metric, hist, points.At(i)) + timeSeries = append(timeSeries, ts) + } case pdata.MetricDataTypeExponentialHistogram: eh := metric.ExponentialHistogram() points := eh.DataPoints() @@ -413,8 +420,39 @@ func (m *metricMapper) exemplars(exs pdata.ExemplarSlice) []*distribution.Distri return exemplars } +// histogramPoint maps a histogram data point into a GCM point. +func (m *metricMapper) histogramPoint(point pdata.HistogramDataPoint) *monitoringpb.TypedValue { + counts := make([]int64, len(point.BucketCounts())) + for i, v := range point.BucketCounts() { + counts[i] = int64(v) + } + + mean := float64(0) + if point.Count() > 0 { // Avoid divide-by-zero + mean = float64(point.Sum() / float64(point.Count())) + } + + return &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DistributionValue{ + DistributionValue: &distribution.Distribution{ + Count: int64(point.Count()), + Mean: mean, + BucketCounts: counts, + BucketOptions: &distribution.Distribution_BucketOptions{ + Options: &distribution.Distribution_BucketOptions_ExplicitBuckets{ + ExplicitBuckets: &distribution.Distribution_BucketOptions_Explicit{ + Bounds: point.ExplicitBounds(), + }, + }, + }, + Exemplars: m.exemplars(point.Exemplars()), + }, + }, + } +} + // Maps an exponential distribution into a GCM point. -func (m *metricMapper) exponentialPoint(point pdata.ExponentialHistogramDataPoint) *monitoringpb.TypedValue { +func (m *metricMapper) exponentialHistogramPoint(point pdata.ExponentialHistogramDataPoint) *monitoringpb.TypedValue { // First calculate underflow bucket with all negatives + zeros. underflow := point.ZeroCount() for _, v := range point.Negative().BucketCounts() { @@ -464,6 +502,40 @@ func (m *metricMapper) exponentialPoint(point pdata.ExponentialHistogramDataPoin } } +func (m *metricMapper) histogramToTimeSeries( + resource *monitoredrespb.MonitoredResource, + extraLabels labels, + metric pdata.Metric, + _ pdata.Histogram, + point pdata.HistogramDataPoint, +) *monitoringpb.TimeSeries { + // We treat deltas as cumulatives w/ resets. + metricKind := metricpb.MetricDescriptor_CUMULATIVE + startTime := timestamppb.New(point.StartTimestamp().AsTime()) + endTime := timestamppb.New(point.Timestamp().AsTime()) + value := m.histogramPoint(point) + return &monitoringpb.TimeSeries{ + Resource: resource, + Unit: metric.Unit(), + MetricKind: metricKind, + ValueType: metricpb.MetricDescriptor_DISTRIBUTION, + Points: []*monitoringpb.Point{{ + Interval: &monitoringpb.TimeInterval{ + StartTime: startTime, + EndTime: endTime, + }, + Value: value, + }}, + Metric: &metricpb.Metric{ + Type: m.metricNameToType(metric.Name()), + Labels: mergeLabels( + attributesToLabels(point.Attributes()), + extraLabels, + ), + }, + } +} + func (m *metricMapper) exponentialHistogramToTimeSeries( resource *monitoredrespb.MonitoredResource, extraLabels labels, @@ -475,7 +547,7 @@ func (m *metricMapper) exponentialHistogramToTimeSeries( metricKind := metricpb.MetricDescriptor_CUMULATIVE startTime := timestamppb.New(point.StartTimestamp().AsTime()) endTime := timestamppb.New(point.Timestamp().AsTime()) - value := m.exponentialPoint(point) + value := m.exponentialHistogramPoint(point) return &monitoringpb.TimeSeries{ Resource: resource, Unit: metric.Unit(), diff --git a/exporter/collector/metricsexporter_test.go b/exporter/collector/metricsexporter_test.go index 674094ba4..d26946665 100644 --- a/exporter/collector/metricsexporter_test.go +++ b/exporter/collector/metricsexporter_test.go @@ -95,6 +95,70 @@ func TestMergeLabels(t *testing.T) { ) } +func TestHistogramPointToTimeSeries(t *testing.T) { + cfg := createDefaultConfig() + cfg.ProjectID = "myproject" + mapper := metricMapper{cfg: cfg} + mr := &monitoredrespb.MonitoredResource{} + metric := pdata.NewMetric() + metric.SetName("myhist") + metric.SetDataType(pdata.MetricDataTypeHistogram) + unit := "1" + metric.SetUnit(unit) + hist := metric.Histogram() + point := hist.DataPoints().AppendEmpty() + end := start.Add(time.Hour) + point.SetStartTimestamp(pdata.NewTimestampFromTime(start)) + point.SetTimestamp(pdata.NewTimestampFromTime(end)) + point.SetBucketCounts([]uint64{1, 2, 3, 4, 5}) + point.SetCount(15) + point.SetSum(42) + point.SetExplicitBounds([]float64{10, 20, 30, 40}) + exemplar := point.Exemplars().AppendEmpty() + exemplar.SetDoubleVal(2) + exemplar.SetTimestamp(pdata.NewTimestampFromTime(end)) + exemplar.SetTraceID(pdata.NewTraceID([16]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6})) + exemplar.SetSpanID(pdata.NewSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7})) + exemplar.FilteredAttributes().InsertString("test", "extra") + + ts := mapper.histogramToTimeSeries(mr, labels{}, metric, hist, point) + // Verify aspects + assert.Equal(t, metricpb.MetricDescriptor_CUMULATIVE, ts.MetricKind) + assert.Equal(t, metricpb.MetricDescriptor_DISTRIBUTION, ts.ValueType) + assert.Equal(t, unit, ts.Unit) + assert.Same(t, mr, ts.Resource) + + assert.Equal(t, "workload.googleapis.com/myhist", ts.Metric.Type) + assert.Equal(t, map[string]string{}, ts.Metric.Labels) + + assert.Nil(t, ts.Metadata) + + assert.Len(t, ts.Points, 1) + assert.Equal(t, &monitoringpb.TimeInterval{ + StartTime: timestamppb.New(start), + EndTime: timestamppb.New(end), + }, ts.Points[0].Interval) + hdp := ts.Points[0].Value.GetDistributionValue() + assert.Equal(t, int64(15), hdp.Count) + assert.ElementsMatch(t, []int64{1, 2, 3, 4, 5}, hdp.BucketCounts) + assert.Equal(t, float64(2.8), hdp.Mean) + assert.Equal(t, []float64{10, 20, 30, 40}, hdp.BucketOptions.GetExplicitBuckets().Bounds) + assert.Len(t, hdp.Exemplars, 1) + ex := hdp.Exemplars[0] + assert.Equal(t, float64(2), ex.Value) + assert.Equal(t, timestamppb.New(end), ex.Timestamp) + // We should see trace + dropped labels + assert.Len(t, ex.Attachments, 2) + spanctx := &monitoringpb.SpanContext{} + err := ex.Attachments[0].UnmarshalTo(spanctx) + assert.Nil(t, err) + assert.Equal(t, "projects/myproject/traces/00010203040506070809010203040506/spans/0001020304050607", spanctx.SpanName) + dropped := &monitoringpb.DroppedLabels{} + err = ex.Attachments[1].UnmarshalTo(dropped) + assert.Nil(t, err) + assert.Equal(t, map[string]string{"test": "extra"}, dropped.Label) +} + func TestExponentialHistogramPointToTimeSeries(t *testing.T) { cfg := createDefaultConfig() cfg.ProjectID = "myproject"