Skip to content

Commit

Permalink
Normalize unreproducible ocgrpc self-obs metrics in test fixtures
Browse files Browse the repository at this point in the history
  • Loading branch information
aabmass committed Jan 19, 2022
1 parent 7e5a692 commit a9c04f8
Show file tree
Hide file tree
Showing 18 changed files with 3,894 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,13 @@ func main() {
require.NoError(t, testServerExporter.PushMetrics(ctx, metrics), "failed to export metrics to local test server")
require.NoError(t, testServerExporter.Shutdown(ctx))

selfObsMetrics, err := inMemoryOCExporter.Proto(ctx)
require.NoError(t, err)
fixture := &integrationtest.MetricExpectFixture{
CreateMetricDescriptorRequests: testServer.CreateMetricDescriptorRequests(),
CreateTimeSeriesRequests: testServer.CreateTimeSeriesRequests(),
CreateServiceTimeSeriesRequests: testServer.CreateServiceTimeSeriesRequests(),
SelfObservabilityMetrics: inMemoryOCExporter.Proto(),
SelfObservabilityMetrics: selfObsMetrics,
}
test.SaveRecordedFixtures(t, fixture)
}()
Expand Down
312 changes: 272 additions & 40 deletions exporter/collector/internal/integrationtest/fixtures.pb.go

Large diffs are not rendered by default.

27 changes: 25 additions & 2 deletions exporter/collector/internal/integrationtest/fixtures.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,31 @@ message MetricExpectFixture {
message SelfObservabilityMetric {
// metric name
string name = 1;
// stringified value
string val = 2;

// Copied from
// https://github.com/googleapis/googleapis/blob/ab1bf9a5cef888843afb9ef3b70b79587d9a033b/google/api/metric.proto
enum MetricKind {
METRIC_KIND_UNSPECIFIED = 0;
GAUGE = 1;
DELTA = 2;
CUMULATIVE = 3;
}
// The GCM MetricKind that the metric would create
MetricKind metric_kind = 2;

// A slimmed down explicit bucketed histogram
message Histogram {
double sum = 1;
int64 count = 2;
repeated double bucket_bounds = 3;
repeated int64 bucket_counts = 4;
}

oneof value {
int64 int64_value = 5;
double float64_value = 6;
Histogram histogram_value = 7;
};
// stringified labels
map<string, string> labels = 3;
}
68 changes: 57 additions & 11 deletions exporter/collector/internal/integrationtest/inmemoryocexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package integrationtest
import (
"context"
"fmt"
"log"
"sort"
"time"

Expand All @@ -28,6 +29,16 @@ import (
"github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/collector"
)

var (
ocTypeToMetricKind = map[metricdata.Type]SelfObservabilityMetric_MetricKind{
metricdata.TypeCumulativeInt64: SelfObservabilityMetric_CUMULATIVE,
metricdata.TypeCumulativeFloat64: SelfObservabilityMetric_CUMULATIVE,
metricdata.TypeGaugeInt64: SelfObservabilityMetric_GAUGE,
metricdata.TypeGaugeFloat64: SelfObservabilityMetric_GAUGE,
metricdata.TypeCumulativeDistribution: SelfObservabilityMetric_CUMULATIVE,
}
)

var _ metricexport.Exporter = (*InMemoryOCExporter)(nil)

// OC stats/metrics exporter used to capture self observability metrics
Expand All @@ -41,15 +52,18 @@ func (i *InMemoryOCExporter) ExportMetrics(ctx context.Context, data []*metricda
return nil
}

func (i *InMemoryOCExporter) Proto() []*SelfObservabilityMetric {
func (i *InMemoryOCExporter) Proto(ctx context.Context) ([]*SelfObservabilityMetric, error) {
// Hack to flush stats, see https://tinyurl.com/5hfcxzk2
view.SetReportingPeriod(time.Minute)
i.reader.ReadAndExport(i)
var data []*metricdata.Metric

ctx, cancel := context.WithTimeout(ctx, time.Millisecond*100)
defer cancel()

select {
case <-time.NewTimer(time.Millisecond * 100).C:
return nil
case <-ctx.Done():
return nil, ctx.Err()
case data = <-i.c:
}

Expand All @@ -60,20 +74,52 @@ func (i *InMemoryOCExporter) Proto() []*SelfObservabilityMetric {
for i := 0; i < len(d.Descriptor.LabelKeys); i++ {
labels[d.Descriptor.LabelKeys[i].Key] = ts.LabelValues[i].Value
}
metricKind, ok := ocTypeToMetricKind[d.Descriptor.Type]
if !ok {
return nil, fmt.Errorf("Got unsupported OC metric type %v", d.Descriptor.Type)
}

for _, p := range ts.Points {
selfObsMetrics = append(selfObsMetrics, &SelfObservabilityMetric{
Name: d.Descriptor.Name,
Val: fmt.Sprint(p.Value),
Labels: labels,
})
selfObsMetric := &SelfObservabilityMetric{
Name: d.Descriptor.Name,
MetricKind: metricKind,
Labels: labels,
}

switch value := p.Value.(type) {
case int64:
selfObsMetric.Value = &SelfObservabilityMetric_Int64Value{Int64Value: value}
case float64:
selfObsMetric.Value = &SelfObservabilityMetric_Float64Value{Float64Value: value}
case *metricdata.Distribution:
bucketCounts := []int64{}
for _, bucket := range value.Buckets {
bucketCounts = append(bucketCounts, bucket.Count)
}
selfObsMetric.Value = &SelfObservabilityMetric_HistogramValue{
HistogramValue: &SelfObservabilityMetric_Histogram{
BucketBounds: value.BucketOptions.Bounds,
BucketCounts: bucketCounts,
Count: value.Count,
Sum: value.Sum,
},
}
default:
// Probably don't care about any others so leaving them out for now
log.Printf("Can't handle OpenCensus metric data type %v, update the code", d.Descriptor.Type)
}

selfObsMetrics = append(selfObsMetrics, selfObsMetric)
}
}
}
sort.Slice(selfObsMetrics, func(i, j int) bool {
if selfObsMetrics[i].Name == selfObsMetrics[j].Name {
return fmt.Sprint(selfObsMetrics[i].Labels) < fmt.Sprint(selfObsMetrics[j].Labels)
}
return selfObsMetrics[i].Name < selfObsMetrics[j].Name
})
return selfObsMetrics
return selfObsMetrics, nil
}

// Shutdown unregisters the global OpenCensus views to reset state for the next test
Expand All @@ -88,9 +134,9 @@ func (i *InMemoryOCExporter) Shutdown(ctx context.Context) error {
func NewInMemoryOCViewExporter() (*InMemoryOCExporter, error) {
// Reset our views in case any tests ran before this
view.Unregister(collector.MetricViews()...)
view.Register(collector.MetricViews()...)
view.Unregister(ocgrpc.DefaultClientViews...)
// TODO: Register ocgrpc.DefaultClientViews to test them
view.Register(collector.MetricViews()...)
view.Register(ocgrpc.DefaultClientViews...)

return &InMemoryOCExporter{
c: make(chan []*metricdata.Metric, 1),
Expand Down
5 changes: 4 additions & 1 deletion exporter/collector/internal/integrationtest/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,15 @@ func TestMetrics(t *testing.T) {
startTime,
endTime,
)

selfObsMetrics, err := inMemoryOCExporter.Proto(ctx)
require.NoError(t, err)
diff := DiffProtos(
&MetricExpectFixture{
CreateTimeSeriesRequests: testServer.CreateTimeSeriesRequests(),
CreateMetricDescriptorRequests: testServer.CreateMetricDescriptorRequests(),
CreateServiceTimeSeriesRequests: testServer.CreateServiceTimeSeriesRequests(),
SelfObservabilityMetrics: inMemoryOCExporter.Proto(),
SelfObservabilityMetrics: selfObsMetrics,
},
expectFixture,
)
Expand Down
32 changes: 32 additions & 0 deletions exporter/collector/internal/integrationtest/testcase.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,17 @@ import (
"github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/collector"
)

var (
// selfObsMetricsToNormalize is the set of self-observability metrics which may not record
// the same value every time due to side effects. The values of these metrics get cleared
// and are not checked in the fixture. Their labels and types are still checked.
selfObsMetricsToNormalize = map[string]struct{}{
"grpc.io/client/roundtrip_latency": {},
"grpc.io/client/sent_bytes_per_rpc": {},
"grpc.io/client/received_bytes_per_rpc": {},
}
)

type MetricsTestCase struct {
// Name of the test case
Name string
Expand Down Expand Up @@ -200,6 +211,27 @@ func normalizeFixture(fixture *MetricExpectFixture) {
md.Name = ""
}
}

for _, som := range fixture.SelfObservabilityMetrics {
if _, ok := selfObsMetricsToNormalize[som.Name]; ok {
// zero out the specific value type
switch value := som.Value.(type) {
case *SelfObservabilityMetric_Int64Value:
value.Int64Value = 0
case *SelfObservabilityMetric_Float64Value:
value.Float64Value = 0
case *SelfObservabilityMetric_HistogramValue:
bucketCounts := value.HistogramValue.BucketCounts
for i := range bucketCounts {
bucketCounts[i] = 0
}
value.HistogramValue.Count = 0
value.HistogramValue.Sum = 0
default:
som.Value = nil
}
}
}
}

func (m *MetricsTestCase) SkipIfNeeded(t testing.TB) {
Expand Down
Loading

0 comments on commit a9c04f8

Please sign in to comment.