-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
processor_test.go
108 lines (86 loc) · 3.5 KB
/
processor_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package intervalprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor"
import (
"context"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/processor/processortest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
)
// TestAggregation runs the interval processor against golden-file test cases.
// For each case it feeds testdata/<name>/input.yaml through ConsumeMetrics,
// forces an export, and checks three outputs against golden files:
//  1. next.yaml   — metrics passed straight through by ConsumeMetrics,
//  2. output.yaml — metrics aggregated and flushed by exportMetrics,
//  3. an empty pmetric.Metrics from a second exportMetrics call.
func TestAggregation(t *testing.T) {
	t.Parallel()

	testCases := []struct {
		name        string
		passThrough bool // when true, gauges and summaries bypass aggregation
	}{
		{name: "basic_aggregation"},
		{name: "histograms_are_aggregated"},
		{name: "exp_histograms_are_aggregated"},
		{name: "gauges_are_aggregated"},
		{name: "summaries_are_aggregated"},
		{name: "all_delta_metrics_are_passed_through"},  // Deltas are passed through even when aggregation is enabled
		{name: "non_monotonic_sums_are_passed_through"}, // Non-monotonic sums are passed through even when aggregation is enabled
		{name: "gauges_are_passed_through", passThrough: true},
		{name: "summaries_are_passed_through", passThrough: true},
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Build the config inside the subtest so each case owns its own
			// instance; a shared outer variable would race if subtests ever
			// run in parallel.
			config := &Config{Interval: time.Second, PassThrough: PassThrough{Gauge: tc.passThrough, Summary: tc.passThrough}}

			// next stores the results of the filter metric processor
			next := &consumertest.MetricsSink{}
			factory := NewFactory()
			mgp, err := factory.CreateMetrics(
				ctx,
				processortest.NewNopSettings(),
				config,
				next,
			)
			require.NoError(t, err)

			dir := filepath.Join("testdata", tc.name)
			md, err := golden.ReadMetrics(filepath.Join(dir, "input.yaml"))
			require.NoError(t, err)

			// Test that ConsumeMetrics works
			err = mgp.ConsumeMetrics(ctx, md)
			require.NoError(t, err)

			require.IsType(t, &Processor{}, mgp)
			processor := mgp.(*Processor)

			// Pretend we hit the interval timer and call export
			processor.exportMetrics()

			// All the lookup tables should now be empty
			require.Empty(t, processor.rmLookup)
			require.Empty(t, processor.smLookup)
			require.Empty(t, processor.mLookup)
			require.Empty(t, processor.numberLookup)
			require.Empty(t, processor.histogramLookup)
			require.Empty(t, processor.expHistogramLookup)
			require.Empty(t, processor.summaryLookup)

			// Exporting again should return nothing
			processor.exportMetrics()

			// Next should have gotten three data sets:
			// 1. Anything left over from ConsumeMetrics()
			// 2. Anything exported from exportMetrics()
			// 3. An empty entry for the second call to exportMetrics()
			allMetrics := next.AllMetrics()
			require.Len(t, allMetrics, 3)

			nextData := allMetrics[0]
			exportData := allMetrics[1]
			secondExportData := allMetrics[2]

			expectedNextData, err := golden.ReadMetrics(filepath.Join(dir, "next.yaml"))
			require.NoError(t, err)
			require.NoError(t, pmetrictest.CompareMetrics(expectedNextData, nextData))

			expectedExportData, err := golden.ReadMetrics(filepath.Join(dir, "output.yaml"))
			require.NoError(t, err)
			require.NoError(t, pmetrictest.CompareMetrics(expectedExportData, exportData))

			require.NoError(t, pmetrictest.CompareMetrics(pmetric.NewMetrics(), secondExportData), "the second export data should be empty")
		})
	}
}