Skip to content

Commit

Permalink
Parallel and synchronized test execution for loadscraper.TestScrape (#…
Browse files Browse the repository at this point in the history
…28877)

**Description:** 
This is not a pretty refactor but I believe this is proper way to fix
the test.

I was running into `0 is not lower than 0` error when `Processor Queue
Length` on my system was properly reported as 0.
I added a special case not to fail in this case.

Then the main fix here is making test cases run in parallel as
concurrent as possible.
Windows is special case when it comes to reporting CPU metrics as it
uses sampling. This sampling is happening every 5 seconds.
While sampling it loads current load from counter and does 1/5/15
minutes average.
As tests are running sequentially, this current load can be different. 

To work around this each test case is started in a goroutine while main
test goroutine waits for all of them to start.
Each subtest waits for signal to start processing. This signal comes
from main goroutine.
This way we achieve as close execution as possible. However there's no
guarantee but it should be way more stable than it was.
To make test even more realistic, I changed sequencing frequency to 500
milliseconds in a test and let 3 seconds for scraper to do some actual
work and not evaluate on a single scrape.

I ran more than 1200 consecutive executions without failure on my
machine.

**Link to tracking Issue:** #10030 

**Testing:** Unit test

**Documentation:**
  • Loading branch information
michalpristas authored Mar 30, 2024
1 parent 66df53c commit 4a71e9d
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ package loadscraper // import "github.com/open-telemetry/opentelemetry-collector

import (
"context"
"time"

"github.com/shirou/gopsutil/v3/load"
"go.uber.org/zap"
)

// no sampling performed on non windows environments, nothing to do
func setSamplingFrequency(_ time.Duration) {}

// unix based systems sample & compute load averages in the kernel, so nothing to do here
func startSampling(_ context.Context, _ *zap.Logger) error {
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"context"
"errors"
"runtime"
"sync"
"testing"
"time"

"github.com/shirou/gopsutil/v3/load"
"github.com/stretchr/testify/assert"
Expand All @@ -27,13 +29,7 @@ const (
bootTime = 100
)

// Skips test without applying unused rule
var skip = func(t *testing.T, why string) {
t.Skip(why)
}

func TestScrape(t *testing.T) {
skip(t, "Flaky test. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/10030")
type testCase struct {
name string
bootTimeFunc func(context.Context) (uint64, error)
Expand Down Expand Up @@ -70,60 +66,96 @@ func TestScrape(t *testing.T) {
},
}
results := make(map[string]pmetric.MetricSlice)

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
scraper := newLoadScraper(context.Background(), receivertest.NewNopCreateSettings(), test.config)
if test.loadFunc != nil {
scraper.load = test.loadFunc
// triggers scraping to start
startChan := make(chan bool)
// used to lock results map in order to avoid concurrent map writes
resultsMapLock := sync.Mutex{}

testFn := func(t *testing.T, test testCase) {
// wait for messurement to start
<-startChan

scraper := newLoadScraper(context.Background(), receivertest.NewNopCreateSettings(), test.config)
if test.loadFunc != nil {
scraper.load = test.loadFunc
}
if test.bootTimeFunc != nil {
scraper.bootTime = test.bootTimeFunc
}

err := scraper.start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err, "Failed to initialize load scraper: %v", err)
defer func() { assert.NoError(t, scraper.shutdown(context.Background())) }()
if runtime.GOOS == "windows" {
// let it sample
<-time.After(3 * time.Second)
}

md, err := scraper.scrape(context.Background())
if test.expectedErr != "" {
assert.EqualError(t, err, test.expectedErr)

isPartial := scrapererror.IsPartialScrapeError(err)
assert.True(t, isPartial)
if isPartial {
var scraperErr scrapererror.PartialScrapeError
require.ErrorAs(t, err, &scraperErr)
assert.Equal(t, metricsLen, scraperErr.Failed)
}
if test.bootTimeFunc != nil {
scraper.bootTime = test.bootTimeFunc
}

err := scraper.start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err, "Failed to initialize load scraper: %v", err)
defer func() { assert.NoError(t, scraper.shutdown(context.Background())) }()

md, err := scraper.scrape(context.Background())
if test.expectedErr != "" {
assert.EqualError(t, err, test.expectedErr)

isPartial := scrapererror.IsPartialScrapeError(err)
assert.True(t, isPartial)
if isPartial {
var scraperErr scrapererror.PartialScrapeError
require.ErrorAs(t, err, &scraperErr)
assert.Equal(t, metricsLen, scraperErr.Failed)
}

return
}
require.NoError(t, err, "Failed to scrape metrics: %v", err)
return
}
require.NoError(t, err, "Failed to scrape metrics: %v", err)

if test.bootTimeFunc != nil {
actualBootTime, err := scraper.bootTime(context.Background())
assert.NoError(t, err)
assert.Equal(t, uint64(bootTime), actualBootTime)
}
// expect 3 metrics
assert.Equal(t, 3, md.MetricCount())

metrics := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics()
// expect a single datapoint for 1m, 5m & 15m load metrics
assertMetricHasSingleDatapoint(t, metrics.At(0), "system.cpu.load_average.15m")
assertMetricHasSingleDatapoint(t, metrics.At(1), "system.cpu.load_average.1m")
assertMetricHasSingleDatapoint(t, metrics.At(2), "system.cpu.load_average.5m")

internal.AssertSameTimeStampForAllMetrics(t, metrics)

// save metrics for additional tests if flag is enabled
if test.saveMetrics {
resultsMapLock.Lock()
results[test.name] = metrics
resultsMapLock.Unlock()
}
}

if test.bootTimeFunc != nil {
actualBootTime, err := scraper.bootTime(context.Background())
assert.NoError(t, err)
assert.Equal(t, uint64(bootTime), actualBootTime)
}
// expect 3 metrics
assert.Equal(t, 3, md.MetricCount())
// used to wait for each test to start to make sure they are all sampling at the same time
var startWg sync.WaitGroup
startWg.Add(len(testCases))

metrics := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics()
// expect a single datapoint for 1m, 5m & 15m load metrics
assertMetricHasSingleDatapoint(t, metrics.At(0), "system.cpu.load_average.15m")
assertMetricHasSingleDatapoint(t, metrics.At(1), "system.cpu.load_average.1m")
assertMetricHasSingleDatapoint(t, metrics.At(2), "system.cpu.load_average.5m")
// used to wait for each test to finish
var waitWg sync.WaitGroup
waitWg.Add(len(testCases))

internal.AssertSameTimeStampForAllMetrics(t, metrics)
setSamplingFrequency(500 * time.Millisecond)

// save metrics for additional tests if flag is enabled
if test.saveMetrics {
results[test.name] = metrics
}
})
for _, test := range testCases {
go func(t *testing.T, test testCase) {
startWg.Done()
testFn(t, test)
waitWg.Done()
}(t, test)
}

// wait for test goroutines to start
startWg.Wait()
// trigger tests
close(startChan)
// wait for tests to finish
waitWg.Wait()

// Additional test for average per CPU
numCPU := runtime.NumCPU()
for i := 0; i < results[testStandard].Len(); i++ {
Expand All @@ -139,6 +171,10 @@ func assertMetricHasSingleDatapoint(t *testing.T, metric pmetric.Metric, expecte
func assertCompareAveragePerCPU(t *testing.T, average pmetric.Metric, standard pmetric.Metric, numCPU int) {
valAverage := average.Gauge().DataPoints().At(0).DoubleValue()
valStandard := standard.Gauge().DataPoints().At(0).DoubleValue()
if valAverage == 0 && valStandard == 0 {
// nothing to compare, queue is empty
return
}
if numCPU == 1 {
// For hardware with only 1 cpu, results must be very close
assert.InDelta(t, valAverage, valStandard, 0.1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ type sampler struct {
lock sync.RWMutex
}

func setSamplingFrequency(freq time.Duration) {
samplingFrequency = freq
}

func startSampling(_ context.Context, logger *zap.Logger) error {
startupLock.Lock()
defer startupLock.Unlock()
Expand Down Expand Up @@ -92,6 +96,7 @@ func (sw *sampler) startSamplingTicker() {
ticker := time.NewTicker(samplingFrequency)
defer ticker.Stop()

sw.sampleLoad()
for {
select {
case <-ticker.C:
Expand Down

0 comments on commit 4a71e9d

Please sign in to comment.