Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel and synchronized test execution for loadscraper.TestScrape #28877

Merged
merged 17 commits into from
Mar 30, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@ package loadscraper // import "github.com/open-telemetry/opentelemetry-collector

import (
"context"
"time"

"github.com/shirou/gopsutil/v3/load"
"go.uber.org/zap"
)

// no sampling performed on non windows environments, nothing to do
func setSamplingFrequency(_ time.Duration) {}

// unix based systems sample & compute load averages in the kernel, so nothing to do here
func startSampling(_ context.Context, _ *zap.Logger) error {
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"context"
"errors"
"runtime"
"sync"
"testing"
"time"

"github.com/shirou/gopsutil/v3/load"
"github.com/stretchr/testify/assert"
Expand All @@ -27,13 +29,7 @@ const (
bootTime = 100
)

// Skips test without applying unused rule
var skip = func(t *testing.T, why string) {
t.Skip(why)
}

func TestScrape(t *testing.T) {
skip(t, "Flaky test. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/10030")
type testCase struct {
name string
bootTimeFunc func(context.Context) (uint64, error)
Expand Down Expand Up @@ -70,60 +66,96 @@ func TestScrape(t *testing.T) {
},
}
results := make(map[string]pmetric.MetricSlice)

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
scraper := newLoadScraper(context.Background(), receivertest.NewNopCreateSettings(), test.config)
if test.loadFunc != nil {
scraper.load = test.loadFunc
// triggers scraping to start
startChan := make(chan bool)
// used to lock results map in order to avoid concurrent map writes
resultsMapLock := sync.Mutex{}

testFn := func(t *testing.T, test testCase) {
// wait for messurement to start
<-startChan

scraper := newLoadScraper(context.Background(), receivertest.NewNopCreateSettings(), test.config)
if test.loadFunc != nil {
scraper.load = test.loadFunc
}
if test.bootTimeFunc != nil {
scraper.bootTime = test.bootTimeFunc
}

err := scraper.start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err, "Failed to initialize load scraper: %v", err)
defer func() { assert.NoError(t, scraper.shutdown(context.Background())) }()
if runtime.GOOS == "windows" {
// let it sample
<-time.After(3 * time.Second)
}

md, err := scraper.scrape(context.Background())
if test.expectedErr != "" {
assert.EqualError(t, err, test.expectedErr)

isPartial := scrapererror.IsPartialScrapeError(err)
assert.True(t, isPartial)
if isPartial {
var scraperErr scrapererror.PartialScrapeError
require.ErrorAs(t, err, &scraperErr)
assert.Equal(t, metricsLen, scraperErr.Failed)
}
if test.bootTimeFunc != nil {
scraper.bootTime = test.bootTimeFunc
}

err := scraper.start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err, "Failed to initialize load scraper: %v", err)
defer func() { assert.NoError(t, scraper.shutdown(context.Background())) }()

md, err := scraper.scrape(context.Background())
if test.expectedErr != "" {
assert.EqualError(t, err, test.expectedErr)

isPartial := scrapererror.IsPartialScrapeError(err)
assert.True(t, isPartial)
if isPartial {
var scraperErr scrapererror.PartialScrapeError
require.ErrorAs(t, err, &scraperErr)
assert.Equal(t, metricsLen, scraperErr.Failed)
}

return
}
require.NoError(t, err, "Failed to scrape metrics: %v", err)
return
}
require.NoError(t, err, "Failed to scrape metrics: %v", err)

if test.bootTimeFunc != nil {
actualBootTime, err := scraper.bootTime(context.Background())
assert.Nil(t, err)
assert.Equal(t, uint64(bootTime), actualBootTime)
}
// expect 3 metrics
assert.Equal(t, 3, md.MetricCount())

metrics := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics()
// expect a single datapoint for 1m, 5m & 15m load metrics
assertMetricHasSingleDatapoint(t, metrics.At(0), "system.cpu.load_average.15m")
assertMetricHasSingleDatapoint(t, metrics.At(1), "system.cpu.load_average.1m")
assertMetricHasSingleDatapoint(t, metrics.At(2), "system.cpu.load_average.5m")

internal.AssertSameTimeStampForAllMetrics(t, metrics)

// save metrics for additional tests if flag is enabled
if test.saveMetrics {
resultsMapLock.Lock()
results[test.name] = metrics
resultsMapLock.Unlock()
}
}

if test.bootTimeFunc != nil {
actualBootTime, err := scraper.bootTime(context.Background())
assert.Nil(t, err)
assert.Equal(t, uint64(bootTime), actualBootTime)
}
// expect 3 metrics
assert.Equal(t, 3, md.MetricCount())
// used to wait for each test to start to make sure they are all sampling at the same time
var startWg sync.WaitGroup
startWg.Add(len(testCases))

metrics := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics()
// expect a single datapoint for 1m, 5m & 15m load metrics
assertMetricHasSingleDatapoint(t, metrics.At(0), "system.cpu.load_average.15m")
assertMetricHasSingleDatapoint(t, metrics.At(1), "system.cpu.load_average.1m")
assertMetricHasSingleDatapoint(t, metrics.At(2), "system.cpu.load_average.5m")
// used to wait for each test to finish
var waitWg sync.WaitGroup
waitWg.Add(len(testCases))

internal.AssertSameTimeStampForAllMetrics(t, metrics)
setSamplingFrequency(500 * time.Millisecond)

// save metrics for additional tests if flag is enabled
if test.saveMetrics {
results[test.name] = metrics
}
})
for _, test := range testCases {
go func(t *testing.T, test testCase) {
startWg.Done()
testFn(t, test)
waitWg.Done()
}(t, test)
}

// wait for test goroutines to start
startWg.Wait()
// trigger tests
close(startChan)
// wait for tests to finish
waitWg.Wait()

// Additional test for average per CPU
numCPU := runtime.NumCPU()
for i := 0; i < results[testStandard].Len(); i++ {
Expand All @@ -139,6 +171,10 @@ func assertMetricHasSingleDatapoint(t *testing.T, metric pmetric.Metric, expecte
func assertCompareAveragePerCPU(t *testing.T, average pmetric.Metric, standard pmetric.Metric, numCPU int) {
valAverage := average.Gauge().DataPoints().At(0).DoubleValue()
valStandard := standard.Gauge().DataPoints().At(0).DoubleValue()
if valAverage == 0 && valStandard == 0 {
// nothing to compare, queue is empty
return
}
if numCPU == 1 {
// For hardware with only 1 cpu, results must be very close
assert.InDelta(t, valAverage, valStandard, 0.1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type sampler struct {
lock sync.RWMutex
}

func setSamplingFrequency(freq time.Duration) {
samplingFrequency = freq
}

func startSampling(_ context.Context, logger *zap.Logger) error {
startupLock.Lock()
defer startupLock.Unlock()
Expand Down Expand Up @@ -93,6 +97,7 @@ func (sw *sampler) startSamplingTicker() {
ticker := time.NewTicker(samplingFrequency)
defer ticker.Stop()

sw.sampleLoad()
for {
select {
case <-ticker.C:
Expand Down
Loading