Add support for histogram to metrics exporter. (#258)
BUG=210164184
tbarker25 authored Jan 6, 2022
1 parent f57a56a commit ad21f8e
Showing 2 changed files with 138 additions and 2 deletions.
76 changes: 74 additions & 2 deletions exporter/collector/metricsexporter.go
@@ -265,6 +265,13 @@ func (m *metricMapper) metricToTimeSeries(
			ts := m.summaryPointToTimeSeries(resource, extraLabels, metric, summary, points.At(i))
			timeSeries = append(timeSeries, ts...)
		}
	case pdata.MetricDataTypeHistogram:
		hist := metric.Histogram()
		points := hist.DataPoints()
		for i := 0; i < points.Len(); i++ {
			ts := m.histogramToTimeSeries(resource, extraLabels, metric, hist, points.At(i))
			timeSeries = append(timeSeries, ts)
		}
	case pdata.MetricDataTypeExponentialHistogram:
		eh := metric.ExponentialHistogram()
		points := eh.DataPoints()
@@ -413,8 +420,39 @@ func (m *metricMapper) exemplars(exs pdata.ExemplarSlice) []*distribution.Distri
	return exemplars
}

// histogramPoint maps a histogram data point into a GCM point.
func (m *metricMapper) histogramPoint(point pdata.HistogramDataPoint) *monitoringpb.TypedValue {
	counts := make([]int64, len(point.BucketCounts()))
	for i, v := range point.BucketCounts() {
		counts[i] = int64(v)
	}

	mean := float64(0)
	if point.Count() > 0 { // Avoid divide-by-zero
		mean = float64(point.Sum() / float64(point.Count()))
	}

	return &monitoringpb.TypedValue{
		Value: &monitoringpb.TypedValue_DistributionValue{
			DistributionValue: &distribution.Distribution{
				Count: int64(point.Count()),
				Mean: mean,
				BucketCounts: counts,
				BucketOptions: &distribution.Distribution_BucketOptions{
					Options: &distribution.Distribution_BucketOptions_ExplicitBuckets{
						ExplicitBuckets: &distribution.Distribution_BucketOptions_Explicit{
							Bounds: point.ExplicitBounds(),
						},
					},
				},
				Exemplars: m.exemplars(point.Exemplars()),
			},
		},
	}
}

// Maps an exponential distribution into a GCM point.
-func (m *metricMapper) exponentialPoint(point pdata.ExponentialHistogramDataPoint) *monitoringpb.TypedValue {
+func (m *metricMapper) exponentialHistogramPoint(point pdata.ExponentialHistogramDataPoint) *monitoringpb.TypedValue {
	// First calculate underflow bucket with all negatives + zeros.
	underflow := point.ZeroCount()
	for _, v := range point.Negative().BucketCounts() {
@@ -464,6 +502,40 @@ func (m *metricMapper) exponentialPoint(point pdata.ExponentialHistogramDataPoin
	}
}

func (m *metricMapper) histogramToTimeSeries(
	resource *monitoredrespb.MonitoredResource,
	extraLabels labels,
	metric pdata.Metric,
	_ pdata.Histogram,
	point pdata.HistogramDataPoint,
) *monitoringpb.TimeSeries {
	// We treat deltas as cumulatives w/ resets.
	metricKind := metricpb.MetricDescriptor_CUMULATIVE
	startTime := timestamppb.New(point.StartTimestamp().AsTime())
	endTime := timestamppb.New(point.Timestamp().AsTime())
	value := m.histogramPoint(point)
	return &monitoringpb.TimeSeries{
		Resource: resource,
		Unit: metric.Unit(),
		MetricKind: metricKind,
		ValueType: metricpb.MetricDescriptor_DISTRIBUTION,
		Points: []*monitoringpb.Point{{
			Interval: &monitoringpb.TimeInterval{
				StartTime: startTime,
				EndTime: endTime,
			},
			Value: value,
		}},
		Metric: &metricpb.Metric{
			Type: m.metricNameToType(metric.Name()),
			Labels: mergeLabels(
				attributesToLabels(point.Attributes()),
				extraLabels,
			),
		},
	}
}

func (m *metricMapper) exponentialHistogramToTimeSeries(
	resource *monitoredrespb.MonitoredResource,
	extraLabels labels,
@@ -475,7 +547,7 @@ func (m *metricMapper) exponentialHistogramToTimeSeries(
	metricKind := metricpb.MetricDescriptor_CUMULATIVE
	startTime := timestamppb.New(point.StartTimestamp().AsTime())
	endTime := timestamppb.New(point.Timestamp().AsTime())
-	value := m.exponentialPoint(point)
+	value := m.exponentialHistogramPoint(point)
	return &monitoringpb.TimeSeries{
		Resource: resource,
		Unit: metric.Unit(),
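The core of the new mapping above is histogramPoint: it copies the OTel bucket counts into int64 values and derives the distribution mean from the point's sum and count, guarding against empty histograms. The following standalone sketch (not part of this commit; the package and helper name are illustrative only) reproduces that arithmetic so it can be checked in isolation:

// histogram_point_sketch.go — illustrative only; mirrors the sum/count -> mean
// and uint64 -> int64 conversion performed by histogramPoint in this commit.
package main

import "fmt"

// toDistributionStats converts an OTel-style histogram point (sum, count,
// per-bucket counts) into the mean and int64 bucket counts a GCM
// Distribution expects.
func toDistributionStats(sum float64, count uint64, bucketCounts []uint64) (float64, []int64) {
	counts := make([]int64, len(bucketCounts))
	for i, v := range bucketCounts {
		counts[i] = int64(v)
	}
	mean := 0.0
	if count > 0 { // avoid divide-by-zero for empty histograms
		mean = sum / float64(count)
	}
	return mean, counts
}

func main() {
	// Same values as TestHistogramPointToTimeSeries below: sum=42, count=15.
	mean, counts := toDistributionStats(42, 15, []uint64{1, 2, 3, 4, 5})
	fmt.Println(mean, counts) // 2.8 [1 2 3 4 5]
}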
64 changes: 64 additions & 0 deletions exporter/collector/metricsexporter_test.go
@@ -95,6 +95,70 @@ func TestMergeLabels(t *testing.T) {
	)
}

func TestHistogramPointToTimeSeries(t *testing.T) {
	cfg := createDefaultConfig()
	cfg.ProjectID = "myproject"
	mapper := metricMapper{cfg: cfg}
	mr := &monitoredrespb.MonitoredResource{}
	metric := pdata.NewMetric()
	metric.SetName("myhist")
	metric.SetDataType(pdata.MetricDataTypeHistogram)
	unit := "1"
	metric.SetUnit(unit)
	hist := metric.Histogram()
	point := hist.DataPoints().AppendEmpty()
	end := start.Add(time.Hour)
	point.SetStartTimestamp(pdata.NewTimestampFromTime(start))
	point.SetTimestamp(pdata.NewTimestampFromTime(end))
	point.SetBucketCounts([]uint64{1, 2, 3, 4, 5})
	point.SetCount(15)
	point.SetSum(42)
	point.SetExplicitBounds([]float64{10, 20, 30, 40})
	exemplar := point.Exemplars().AppendEmpty()
	exemplar.SetDoubleVal(2)
	exemplar.SetTimestamp(pdata.NewTimestampFromTime(end))
	exemplar.SetTraceID(pdata.NewTraceID([16]byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6}))
	exemplar.SetSpanID(pdata.NewSpanID([8]byte{0, 1, 2, 3, 4, 5, 6, 7}))
	exemplar.FilteredAttributes().InsertString("test", "extra")

	ts := mapper.histogramToTimeSeries(mr, labels{}, metric, hist, point)
	// Verify aspects
	assert.Equal(t, metricpb.MetricDescriptor_CUMULATIVE, ts.MetricKind)
	assert.Equal(t, metricpb.MetricDescriptor_DISTRIBUTION, ts.ValueType)
	assert.Equal(t, unit, ts.Unit)
	assert.Same(t, mr, ts.Resource)

	assert.Equal(t, "workload.googleapis.com/myhist", ts.Metric.Type)
	assert.Equal(t, map[string]string{}, ts.Metric.Labels)

	assert.Nil(t, ts.Metadata)

	assert.Len(t, ts.Points, 1)
	assert.Equal(t, &monitoringpb.TimeInterval{
		StartTime: timestamppb.New(start),
		EndTime: timestamppb.New(end),
	}, ts.Points[0].Interval)
	hdp := ts.Points[0].Value.GetDistributionValue()
	assert.Equal(t, int64(15), hdp.Count)
	assert.ElementsMatch(t, []int64{1, 2, 3, 4, 5}, hdp.BucketCounts)
	assert.Equal(t, float64(2.8), hdp.Mean)
	assert.Equal(t, []float64{10, 20, 30, 40}, hdp.BucketOptions.GetExplicitBuckets().Bounds)
	assert.Len(t, hdp.Exemplars, 1)
	ex := hdp.Exemplars[0]
	assert.Equal(t, float64(2), ex.Value)
	assert.Equal(t, timestamppb.New(end), ex.Timestamp)
	// We should see trace + dropped labels
	assert.Len(t, ex.Attachments, 2)
	spanctx := &monitoringpb.SpanContext{}
	err := ex.Attachments[0].UnmarshalTo(spanctx)
	assert.Nil(t, err)
	assert.Equal(t, "projects/myproject/traces/00010203040506070809010203040506/spans/0001020304050607", spanctx.SpanName)
	dropped := &monitoringpb.DroppedLabels{}
	err = ex.Attachments[1].UnmarshalTo(dropped)
	assert.Nil(t, err)
	assert.Equal(t, map[string]string{"test": "extra"}, dropped.Label)
}

func TestExponentialHistogramPointToTimeSeries(t *testing.T) {
	cfg := createDefaultConfig()
	cfg.ProjectID = "myproject"

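One detail the test exercises implicitly: the point carries five bucket counts but only four explicit bounds. The direct bucket-count copy in histogramPoint relies on the OTLP histogram rule that N explicit bounds come with N+1 bucket counts (underflow through overflow), which lines up with the bucket layout GCM uses for explicit-bucket distributions. A small sketch of that invariant check, using the same values as the test (the helper name is illustrative, not from this repository):

// bucket_layout_sketch.go — illustrative only; validates the N bounds / N+1
// counts relationship that the exporter's direct bucket-count copy assumes.
package main

import "fmt"

func validBucketLayout(bounds []float64, counts []uint64) bool {
	// Per the OTLP histogram data model, len(counts) == len(bounds) + 1.
	return len(counts) == len(bounds)+1
}

func main() {
	bounds := []float64{10, 20, 30, 40} // as in TestHistogramPointToTimeSeries
	counts := []uint64{1, 2, 3, 4, 5}   // underflow ... overflow buckets
	fmt.Println(validBucketLayout(bounds, counts)) // true
}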