diff --git a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_others.go b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_others.go index 8219672a2342..a7f0e86d0dba 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_others.go +++ b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_others.go @@ -7,11 +7,15 @@ package loadscraper // import "github.com/open-telemetry/opentelemetry-collector import ( "context" + "time" "github.com/shirou/gopsutil/v3/load" "go.uber.org/zap" ) +// no sampling performed on non windows environments, nothing to do +func setSamplingFrequency(_ time.Duration) {} + // unix based systems sample & compute load averages in the kernel, so nothing to do here func startSampling(_ context.Context, _ *zap.Logger) error { return nil diff --git a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_test.go b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_test.go index cdd1643e43f4..6cfd353fa26d 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_test.go @@ -7,7 +7,9 @@ import ( "context" "errors" "runtime" + "sync" "testing" + "time" "github.com/shirou/gopsutil/v3/load" "github.com/stretchr/testify/assert" @@ -27,13 +29,7 @@ const ( bootTime = 100 ) -// Skips test without applying unused rule -var skip = func(t *testing.T, why string) { - t.Skip(why) -} - func TestScrape(t *testing.T) { - skip(t, "Flaky test. 
See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/10030") type testCase struct { name string bootTimeFunc func(context.Context) (uint64, error) @@ -70,60 +66,96 @@ func TestScrape(t *testing.T) { }, } results := make(map[string]pmetric.MetricSlice) - - for _, test := range testCases { - t.Run(test.name, func(t *testing.T) { - scraper := newLoadScraper(context.Background(), receivertest.NewNopCreateSettings(), test.config) - if test.loadFunc != nil { - scraper.load = test.loadFunc + // triggers scraping to start + startChan := make(chan bool) + // used to lock results map in order to avoid concurrent map writes + resultsMapLock := sync.Mutex{} + + testFn := func(t *testing.T, test testCase) { + // wait for measurement to start + <-startChan + + scraper := newLoadScraper(context.Background(), receivertest.NewNopCreateSettings(), test.config) + if test.loadFunc != nil { + scraper.load = test.loadFunc + } + if test.bootTimeFunc != nil { + scraper.bootTime = test.bootTimeFunc + } + + err := scraper.start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err, "Failed to initialize load scraper: %v", err) + defer func() { assert.NoError(t, scraper.shutdown(context.Background())) }() + if runtime.GOOS == "windows" { + // let it sample + <-time.After(3 * time.Second) + } + + md, err := scraper.scrape(context.Background()) + if test.expectedErr != "" { + assert.EqualError(t, err, test.expectedErr) + + isPartial := scrapererror.IsPartialScrapeError(err) + assert.True(t, isPartial) + if isPartial { + var scraperErr scrapererror.PartialScrapeError + require.ErrorAs(t, err, &scraperErr) + assert.Equal(t, metricsLen, scraperErr.Failed) } - if test.bootTimeFunc != nil { - scraper.bootTime = test.bootTimeFunc - } - - err := scraper.start(context.Background(), componenttest.NewNopHost()) - require.NoError(t, err, "Failed to initialize load scraper: %v", err) - defer func() { assert.NoError(t, scraper.shutdown(context.Background())) }() 
- - md, err := scraper.scrape(context.Background()) - if test.expectedErr != "" { - assert.EqualError(t, err, test.expectedErr) - - isPartial := scrapererror.IsPartialScrapeError(err) - assert.True(t, isPartial) - if isPartial { - var scraperErr scrapererror.PartialScrapeError - require.ErrorAs(t, err, &scraperErr) - assert.Equal(t, metricsLen, scraperErr.Failed) - } - return - } - require.NoError(t, err, "Failed to scrape metrics: %v", err) + return + } + require.NoError(t, err, "Failed to scrape metrics: %v", err) + + if test.bootTimeFunc != nil { + actualBootTime, err := scraper.bootTime(context.Background()) + assert.NoError(t, err) + assert.Equal(t, uint64(bootTime), actualBootTime) + } + // expect 3 metrics + assert.Equal(t, 3, md.MetricCount()) + + metrics := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + // expect a single datapoint for 1m, 5m & 15m load metrics + assertMetricHasSingleDatapoint(t, metrics.At(0), "system.cpu.load_average.15m") + assertMetricHasSingleDatapoint(t, metrics.At(1), "system.cpu.load_average.1m") + assertMetricHasSingleDatapoint(t, metrics.At(2), "system.cpu.load_average.5m") + + internal.AssertSameTimeStampForAllMetrics(t, metrics) + + // save metrics for additional tests if flag is enabled + if test.saveMetrics { + resultsMapLock.Lock() + results[test.name] = metrics + resultsMapLock.Unlock() + } + } - if test.bootTimeFunc != nil { - actualBootTime, err := scraper.bootTime(context.Background()) - assert.NoError(t, err) - assert.Equal(t, uint64(bootTime), actualBootTime) - } - // expect 3 metrics - assert.Equal(t, 3, md.MetricCount()) + // used to wait for each test to start to make sure they are all sampling at the same time + var startWg sync.WaitGroup + startWg.Add(len(testCases)) - metrics := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() - // expect a single datapoint for 1m, 5m & 15m load metrics - assertMetricHasSingleDatapoint(t, metrics.At(0), "system.cpu.load_average.15m") - 
assertMetricHasSingleDatapoint(t, metrics.At(1), "system.cpu.load_average.1m") - assertMetricHasSingleDatapoint(t, metrics.At(2), "system.cpu.load_average.5m") + // used to wait for each test to finish + var waitWg sync.WaitGroup + waitWg.Add(len(testCases)) - internal.AssertSameTimeStampForAllMetrics(t, metrics) + setSamplingFrequency(500 * time.Millisecond) - // save metrics for additional tests if flag is enabled - if test.saveMetrics { - results[test.name] = metrics - } - }) + for _, test := range testCases { + go func(t *testing.T, test testCase) { + startWg.Done() + testFn(t, test) + waitWg.Done() + }(t, test) } + // wait for test goroutines to start + startWg.Wait() + // trigger tests + close(startChan) + // wait for tests to finish + waitWg.Wait() + // Additional test for average per CPU numCPU := runtime.NumCPU() for i := 0; i < results[testStandard].Len(); i++ { @@ -139,6 +171,10 @@ func assertMetricHasSingleDatapoint(t *testing.T, metric pmetric.Metric, expecte func assertCompareAveragePerCPU(t *testing.T, average pmetric.Metric, standard pmetric.Metric, numCPU int) { valAverage := average.Gauge().DataPoints().At(0).DoubleValue() valStandard := standard.Gauge().DataPoints().At(0).DoubleValue() + if valAverage == 0 && valStandard == 0 { + // nothing to compare, queue is empty + return + } if numCPU == 1 { // For hardware with only 1 cpu, results must be very close assert.InDelta(t, valAverage, valStandard, 0.1) diff --git a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_windows.go b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_windows.go index 50d439efdadb..14153906ae15 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_windows.go +++ b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_windows.go @@ -50,6 +50,10 @@ type sampler struct { lock sync.RWMutex } +func setSamplingFrequency(freq time.Duration) { + samplingFrequency = freq +} + func 
startSampling(_ context.Context, logger *zap.Logger) error { startupLock.Lock() defer startupLock.Unlock() @@ -92,6 +96,7 @@ func (sw *sampler) startSamplingTicker() { ticker := time.NewTicker(samplingFrequency) defer ticker.Stop() + sw.sampleLoad() for { select { case <-ticker.C: