From 878a33e9d9b0d28a7ecee7913c3e0c6e772e4fc6 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Fri, 3 Nov 2023 12:00:16 +0100 Subject: [PATCH 1/4] Parallel and synchronized test execution for loadscraper.TestScrape --- .../scraper/loadscraper/load_scraper_test.go | 136 +++++++++++------- .../loadscraper/load_scraper_windows.go | 1 + 2 files changed, 89 insertions(+), 48 deletions(-) diff --git a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_test.go b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_test.go index 00c41da24b7f..3c033328b68d 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_test.go @@ -7,7 +7,9 @@ import ( "context" "errors" "runtime" + "sync" "testing" + "time" "github.com/shirou/gopsutil/v3/load" "github.com/stretchr/testify/assert" @@ -33,7 +35,6 @@ var skip = func(t *testing.T, why string) { } func TestScrape(t *testing.T) { - skip(t, "Flaky test. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/10030") type testCase struct { name string bootTimeFunc func(context.Context) (uint64, error) @@ -70,60 +71,95 @@ func TestScrape(t *testing.T) { }, } results := make(map[string]pmetric.MetricSlice) - - for _, test := range testCases { - t.Run(test.name, func(t *testing.T) { - scraper := newLoadScraper(context.Background(), receivertest.NewNopCreateSettings(), test.config) - if test.loadFunc != nil { - scraper.load = test.loadFunc - } - if test.bootTimeFunc != nil { - scraper.bootTime = test.bootTimeFunc - } - - err := scraper.start(context.Background(), componenttest.NewNopHost()) - require.NoError(t, err, "Failed to initialize load scraper: %v", err) - defer func() { assert.NoError(t, scraper.shutdown(context.Background())) }() - - md, err := scraper.scrape(context.Background()) - if test.expectedErr != "" { - assert.EqualError(t, err, test.expectedErr) - - isPartial := scrapererror.IsPartialScrapeError(err) - assert.True(t, isPartial) - if isPartial { - var scraperErr scrapererror.PartialScrapeError - require.ErrorAs(t, err, &scraperErr) - assert.Equal(t, metricsLen, scraperErr.Failed) - } - - return + // triggers scraping to start + startChan := make(chan bool) + // used to lock results map in order to avoid concurrent map writes + resultsMapLock := sync.Mutex{} + + testFn := func(t *testing.T, test testCase){ + // wait for messurement to start + <-startChan + + scraper := newLoadScraper(context.Background(), receivertest.NewNopCreateSettings(), test.config) + if test.loadFunc != nil { + scraper.load = test.loadFunc + } + if test.bootTimeFunc != nil { + scraper.bootTime = test.bootTimeFunc + } + + err := scraper.start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err, "Failed to initialize load scraper: %v", err) + defer func() { assert.NoError(t, scraper.shutdown(context.Background())) }() + if runtime.GOOS == "windows" { + // let it sample + <-time.After(3*time.Second) + } + + md, err := scraper.scrape(context.Background()) + if test.expectedErr != "" { + assert.EqualError(t, err, test.expectedErr) + + isPartial := scrapererror.IsPartialScrapeError(err) + assert.True(t, isPartial) + if isPartial { + var scraperErr scrapererror.PartialScrapeError + require.ErrorAs(t, err, &scraperErr) + assert.Equal(t, metricsLen, scraperErr.Failed) } - require.NoError(t, err, "Failed to scrape metrics: %v", err) - if test.bootTimeFunc != nil { - actualBootTime, err := scraper.bootTime(context.Background()) - assert.Nil(t, err) - assert.Equal(t, uint64(bootTime), actualBootTime) - } - // expect 3 metrics - assert.Equal(t, 3, md.MetricCount()) + return + } + require.NoError(t, err, "Failed to scrape metrics: %v", err) + + if test.bootTimeFunc != nil { + actualBootTime, err := scraper.bootTime(context.Background()) + assert.Nil(t, err) + assert.Equal(t, uint64(bootTime), actualBootTime) + } + // expect 3 metrics + assert.Equal(t, 3, md.MetricCount()) + + metrics := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + // expect a single datapoint for 1m, 5m & 15m load metrics + assertMetricHasSingleDatapoint(t, metrics.At(0), "system.cpu.load_average.15m") + assertMetricHasSingleDatapoint(t, metrics.At(1), "system.cpu.load_average.1m") + assertMetricHasSingleDatapoint(t, metrics.At(2), "system.cpu.load_average.5m") + + internal.AssertSameTimeStampForAllMetrics(t, metrics) + + // save metrics for additional tests if flag is enabled + if test.saveMetrics { + resultsMapLock.Lock() + results[test.name] = metrics + resultsMapLock.Unlock() + } + } - metrics := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() - // expect a single datapoint for 1m, 5m & 15m load metrics - assertMetricHasSingleDatapoint(t, metrics.At(0), "system.cpu.load_average.15m") - assertMetricHasSingleDatapoint(t, metrics.At(1), "system.cpu.load_average.1m") - assertMetricHasSingleDatapoint(t, metrics.At(2), "system.cpu.load_average.5m") + // used to wait for each test to start to make sure they are all sampling at the same time + var startWg sync.WaitGroup + startWg.Add(len(testCases)) - internal.AssertSameTimeStampForAllMetrics(t, metrics) + // used to wait for each test to finish + var waitWg sync.WaitGroup + waitWg.Add(len(testCases)) - // save metrics for additional tests if flag is enabled - if test.saveMetrics { - results[test.name] = metrics - } - }) + for _, test := range testCases { + samplingFrequency = 500*time.Millisecond + go func(t *testing.T, test testCase){ + startWg.Done() + testFn(t, test) + waitWg.Done() + }(t, test) } + // wait for test goroutines to start + startWg.Wait() + // trigger tests + close(startChan) + // wait for tests to finish + waitWg.Wait() + // Additional test for average per CPU numCPU := runtime.NumCPU() for i := 0; i < results[testStandard].Len(); i++ { @@ -139,6 +175,10 @@ func assertMetricHasSingleDatapoint(t *testing.T, metric pmetric.Metric, expecte func assertCompareAveragePerCPU(t *testing.T, average pmetric.Metric, standard pmetric.Metric, numCPU int) { valAverage := average.Gauge().DataPoints().At(0).DoubleValue() valStandard := standard.Gauge().DataPoints().At(0).DoubleValue() + if valAverage == 0 && valStandard == 0 { + // nothing to compare, queue is empty + return + } if numCPU == 1 { // For hardware with only 1 cpu, results must be very close assert.InDelta(t, valAverage, valStandard, 0.1) diff --git a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_windows.go b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_windows.go index c0556569c3e6..865be23031df 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_windows.go +++ b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_windows.go @@ -93,6 +93,7 @@ func (sw *sampler) startSamplingTicker() { ticker := time.NewTicker(samplingFrequency) defer ticker.Stop() + sw.sampleLoad() for { select { case <-ticker.C: From f2cb1abf16fa5397ad1c499e697a4a911d137904 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Fri, 3 Nov 2023 12:07:23 +0100 Subject: [PATCH 2/4] Sampling frequency exposed to tests using private function --- .../scraper/loadscraper/load_scraper_others.go | 4 ++++ .../internal/scraper/loadscraper/load_scraper_test.go | 11 ++++++----- .../scraper/loadscraper/load_scraper_windows.go | 4 ++++ 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_others.go b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_others.go index 8f48ffe9028e..fb584fe88839 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_others.go +++ b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_others.go @@ -8,11 +8,15 @@ package loadscraper // import "github.com/open-telemetry/opentelemetry-collector import ( "context" + "time" "github.com/shirou/gopsutil/v3/load" "go.uber.org/zap" ) +// no sampling performed on non windows enviroments, nothing to do +func setSamplingFrequency(_ time.Duration) {} + // unix based systems sample & compute load averages in the kernel, so nothing to do here func startSampling(_ context.Context, _ *zap.Logger) error { return nil diff --git a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_test.go b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_test.go index 3c033328b68d..2bbc03459902 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_test.go @@ -75,8 +75,8 @@ func TestScrape(t *testing.T) { startChan := make(chan bool) // used to lock results map in order to avoid concurrent map writes resultsMapLock := sync.Mutex{} - - testFn := func(t *testing.T, test testCase){ + + testFn := func(t *testing.T, test testCase) { // wait for messurement to start <-startChan @@ -93,7 +93,7 @@ func TestScrape(t *testing.T) { defer func() { assert.NoError(t, scraper.shutdown(context.Background())) }() if runtime.GOOS == "windows" { // let it sample - <-time.After(3*time.Second) + <-time.After(3 * time.Second) } md, err := scraper.scrape(context.Background()) @@ -144,9 +144,10 @@ func TestScrape(t *testing.T) { var waitWg sync.WaitGroup waitWg.Add(len(testCases)) + setSamplingFrequency(500 * time.Millisecond) + for _, test := range testCases { - samplingFrequency = 500*time.Millisecond - go func(t *testing.T, test testCase){ + go func(t *testing.T, test testCase) { startWg.Done() testFn(t, test) waitWg.Done() diff --git a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_windows.go b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_windows.go index 865be23031df..1f0f6f11561a 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_windows.go +++ b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_windows.go @@ -51,6 +51,10 @@ type sampler struct { lock sync.RWMutex } +func setSamplingFrequency(freq time.Duration) { + samplingFrequency = freq +} + func startSampling(_ context.Context, logger *zap.Logger) error { startupLock.Lock() defer startupLock.Unlock() From 8550cda343a11a069445d8f1892e5bcac23e2686 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Fri, 3 Nov 2023 13:08:24 +0100 Subject: [PATCH 3/4] Fixed typos in comments --- .../internal/scraper/loadscraper/load_scraper_others.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_others.go b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_others.go index fb584fe88839..278e15f752c8 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_others.go +++ b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_others.go @@ -14,7 +14,7 @@ import ( "go.uber.org/zap" ) -// no sampling performed on non windows enviroments, nothing to do +// no sampling performed on non windows environments, nothing to do func setSamplingFrequency(_ time.Duration) {} // unix based systems sample & compute load averages in the kernel, so nothing to do here From ec5ce9d9e37faff2606fa76cbc1fda1d9127114d Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Fri, 3 Nov 2023 13:24:34 +0100 Subject: [PATCH 4/4] Removed unused code --- .../internal/scraper/loadscraper/load_scraper_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_test.go b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_test.go index 2bbc03459902..8dac83af8ee4 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_test.go @@ -29,11 +29,6 @@ const ( bootTime = 100 ) -// Skips test without applying unused rule -var skip = func(t *testing.T, why string) { - t.Skip(why) -} - func TestScrape(t *testing.T) { type testCase struct { name string