Fix: simplify TestTenantPusher (#979)
CI is failing consistently with this test. As best I can tell, it might be because some data is shared between tests, and the test might be time-sensitive.
mem authored Nov 6, 2024
1 parent 346a3a0 commit ae46ff3
Showing 1 changed file with 90 additions and 143 deletions.
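The failure mode the commit message describes, package-level state shared by parallel subtests, is worth seeing in miniature before reading the diff. The sketch below is illustrative only: fakeClient and push are hypothetical names, not code from this repository, and the fixture is simplified down to a string slice.

package telemetry_test

import (
    "sync"
    "testing"
)

// fakeClient is a hypothetical stand-in for a shared mock client.
type fakeClient struct {
    mu  sync.Mutex
    got []string
}

func (c *fakeClient) push(msg string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.got = append(c.got, msg)
}

// shared mimics a package-level client: every parallel subtest
// mutates the same instance, so assertions about its contents are racy.
var shared = &fakeClient{}

func TestSharedStateIsFlaky(t *testing.T) {
    for _, msg := range []string{"a", "b"} {
        msg := msg // capture for parallel subtests (pre-Go 1.22)
        t.Run(msg, func(t *testing.T) {
            t.Parallel()
            shared.push(msg)
            // Asserting len(shared.got) here depends on how many
            // sibling subtests have already pushed: flaky.
        })
    }
}

func TestIsolatedStateIsDeterministic(t *testing.T) {
    for _, msg := range []string{"a", "b"} {
        msg := msg
        t.Run(msg, func(t *testing.T) {
            t.Parallel()
            c := &fakeClient{} // fresh instance per subtest
            c.push(msg)
            if len(c.got) != 1 {
                t.Fatalf("want 1 pushed message, got %d", len(c.got))
            }
        })
    }
}

The diff that follows applies the isolated shape: a setupTest helper hands each subtest its own client, ticker, metrics, and pusher, so nothing leaks between parallel runs.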
internal/telemetry/region_pusher_test.go (90 additions, 143 deletions)
@@ -203,27 +203,10 @@ func getTestDataset(idx int) testData {
     }
 }
 
-var m = RegionMetrics{
-    pushRequestsActive:   prom.NewGauge(prom.GaugeOpts{}),
-    pushRequestsDuration: prom.NewHistogram(prom.HistogramOpts{}),
-    pushRequestsTotal:    prom.NewCounter(prom.CounterOpts{}),
-    pushRequestsError:    prom.NewCounter(prom.CounterOpts{}),
-    addExecutionDuration: prom.NewHistogram(prom.HistogramOpts{}),
-}
-
 func TestTenantPusher(t *testing.T) {
-    var (
-        // This time span is passed to the tenant constructor, but it's ignored
-        // because we are overriding the ticker with one that we can control
-        timeSpan = 1 * time.Second
-
-        logger = zerolog.Nop()
-
-        // Because the push happens in a separate goroutine, we use a waitgroup
-        // to wait for the mock push client to finish before verifying the data
-        wg         = &sync.WaitGroup{}
-        testClient = &testTelemetryClient{wg: wg}
+    t.Parallel()
 
+    var (
         testPushRespOK = testPushResp{
             tr: &sm.PushTelemetryResponse{
                 Status: &sm.Status{Code: sm.StatusCode_OK},
@@ -242,231 +225,147 @@ func TestTenantPusher(t *testing.T) {
         }
     )
 
-    // tickAndWait will tick the ticker once, so the push
-    // process starts, and wait for the push client to finish
-    tickAndWait := func(ticker *testTicker) {
-        wg.Add(1)
-        defer wg.Wait()
-        ticker.c <- time.Now()
-    }
-
-    // waitForShutdown will cancel the context passed to the
-    // tenant pusher and wait for it to finish its work
-    shutdownAndWait := func(cancel context.CancelFunc) {
-        defer wg.Wait()
-        // The pusher will send the current accumulated
-        // data before exiting
-        wg.Add(1)
-
-        cancel()
-    }
-
-    resetTestClient := func() {
-        testClient = &testTelemetryClient{wg: wg}
-    }
-
     t.Run("should send telemetry data once", func(t *testing.T) {
-        ctx, cancel := context.WithCancel(context.Background())
-
-        t.Cleanup(func() {
-            shutdownAndWait(cancel)
-            resetTestClient()
-        })
+        t.Parallel()
 
-        ticker := &testTicker{
-            c: make(chan time.Time),
-        }
-        var opt withTicker = ticker
+        td, tc, pusher, _ := setupTest(context.Background())
 
-        pusher := NewRegionPusher(ctx, timeSpan, testClient, logger, instance, regionID, m, opt)
+        t.Cleanup(td.shutdownAndWait)
 
         // Add some executions
         addExecutions(pusher, getTestDataset(0).executions)
 
         // Set mock response for client
-        testClient.rr = testPushRespOK
+        tc.rr = testPushRespOK
 
         // Tick
-        tickAndWait(ticker)
+        td.tickAndWait()
 
         // Verify sent data
-        testClient.assert(t, getTestDataset(0).message)
+        tc.assert(t, getTestDataset(0).message)
     })

t.Run("should retry sending data once", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Parallel()

t.Cleanup(func() {
shutdownAndWait(cancel)
resetTestClient()
})
td, tc, pusher, _ := setupTest(context.Background())

ticker := &testTicker{
c: make(chan time.Time),
}
var opt withTicker = ticker

pusher := NewRegionPusher(ctx, timeSpan, testClient, logger, instance, regionID, m, opt)
t.Cleanup(td.shutdownAndWait)

// Add some executions
addExecutions(pusher, getTestDataset(0).executions)

// Set mock response for client
testClient.rr = testPushRespKO
tc.rr = testPushRespKO

// Tick twice, one for initial push and one for retry
tickAndWait(ticker)
tickAndWait(ticker)
td.tickAndWait()
td.tickAndWait()

// Verify sent data
testClient.assert(t, getTestDataset(0).message, getTestDataset(0).message)
tc.assert(t, getTestDataset(0).message, getTestDataset(0).message)
})

t.Run("should retry and send more data", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Parallel()

t.Cleanup(func() {
shutdownAndWait(cancel)
resetTestClient()
})

ticker := &testTicker{
c: make(chan time.Time),
}
var opt withTicker = ticker
td, tc, pusher, _ := setupTest(context.Background())

pusher := NewRegionPusher(ctx, timeSpan, testClient, logger, instance, regionID, m, opt)
t.Cleanup(td.shutdownAndWait)

// Add some executions
addExecutions(pusher, getTestDataset(0).executions)

// Set KO mock response for client and tick once
testClient.rr = testPushRespKO
tickAndWait(ticker)
tc.rr = testPushRespKO
td.tickAndWait()

// Send more executions
addExecutions(pusher, getTestDataset(1).executions)

// Set OK mock response for client and tick again
testClient.rr = testPushRespOK
tickAndWait(ticker)
tc.rr = testPushRespOK
td.tickAndWait()

// Verify sent data
testClient.assert(t,
tc.assert(t,
getTestDataset(0).message, // First tick message
getTestDataset(1).message, // First message retry with newly accumulated data
)
})

t.Run("should push on context done", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Parallel()

ticker := &testTicker{
c: make(chan time.Time),
}
var opt withTicker = ticker

pusher := NewRegionPusher(ctx, timeSpan, testClient, logger, instance, regionID, m, opt)
td, tc, pusher, _ := setupTest(context.Background())

// Add some executions
addExecutions(pusher, getTestDataset(0).executions)

// Set mock response for client
testClient.rr = testPushRespKO
tc.rr = testPushRespKO

// Tick once, which should make the push fail
tickAndWait(ticker)
td.tickAndWait()

// Verify sent data
testClient.assert(t, getTestDataset(0).message)
tc.assert(t, getTestDataset(0).message)

// Send more executions
addExecutions(pusher, getTestDataset(1).executions)

// Cancel the context
// Which should make the pusher send
// the currently accumulated data
shutdownAndWait(cancel)
td.shutdownAndWait()

// Verify sent data on exit
testClient.assert(t, getTestDataset(1).message)
tc.assert(t, getTestDataset(1).message)
})

t.Run("should report push error", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

t.Cleanup(func() {
shutdownAndWait(cancel)
resetTestClient()
})
t.Parallel()

ticker := &testTicker{
c: make(chan time.Time),
}
var opt withTicker = ticker
td, tc, pusher, metrics := setupTest(context.Background())

metrics := RegionMetrics{
pushRequestsActive: prom.NewGauge(prom.GaugeOpts{}),
pushRequestsDuration: prom.NewHistogram(prom.HistogramOpts{}),
pushRequestsTotal: prom.NewCounter(prom.CounterOpts{}),
pushRequestsError: prom.NewCounter(prom.CounterOpts{}),
addExecutionDuration: prom.NewHistogram(prom.HistogramOpts{}),
}
t.Cleanup(td.shutdownAndWait)

// Setup test client to return err on push
testClient.rr.err = errors.New("test error")

pusher := NewRegionPusher(ctx, timeSpan, testClient, logger, instance, regionID, metrics, opt)
tc.rr.err = errors.New("test error")

// Add some executions
addExecutions(pusher, getTestDataset(0).executions)

// Tick once, which should make the push fail
tickAndWait(ticker)
td.tickAndWait()

// Verify sent data
testClient.assert(t, getTestDataset(0).message)
tc.assert(t, getTestDataset(0).message)

// Verify error metric
errsMetric := getMetricFromCollector(t, metrics.pushRequestsError)
require.Equal(t, 1, int(*errsMetric.Counter.Value))
})

t.Run("should report push error on unexpected status", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Parallel()

t.Cleanup(func() {
shutdownAndWait(cancel)
resetTestClient()
})
td, tc, pusher, metrics := setupTest(context.Background())

ticker := &testTicker{
c: make(chan time.Time),
}
var opt withTicker = ticker

metrics := RegionMetrics{
pushRequestsActive: prom.NewGauge(prom.GaugeOpts{}),
pushRequestsDuration: prom.NewHistogram(prom.HistogramOpts{}),
pushRequestsTotal: prom.NewCounter(prom.CounterOpts{}),
pushRequestsError: prom.NewCounter(prom.CounterOpts{}),
addExecutionDuration: prom.NewHistogram(prom.HistogramOpts{}),
}

pusher := NewRegionPusher(ctx, timeSpan, testClient, logger, instance, regionID, metrics, opt)
t.Cleanup(td.shutdownAndWait)

// Add some executions
addExecutions(pusher, getTestDataset(0).executions)

// Set mock response for client
// with unexpected status code
testClient.rr = testPushRespKO
tc.rr = testPushRespKO

// Tick once, which should make the push fail
tickAndWait(ticker)
td.tickAndWait()

// Verify sent data
testClient.assert(t, getTestDataset(0).message)
tc.assert(t, getTestDataset(0).message)

// Verify error metric
errsMetric := getMetricFromCollector(t, metrics.pushRequestsError)
@@ -491,6 +390,54 @@ type testPushResp struct {
     err error
 }
 
+func setupTest(ctx context.Context) (*testDriver, *testTelemetryClient, *RegionPusher, RegionMetrics) {
+    var (
+        wg              = &sync.WaitGroup{}
+        testCtx, cancel = context.WithCancel(ctx)
+        ticker          = &testTicker{c: make(chan time.Time)}
+        td              = testDriver{wg: wg, cancel: cancel, ticker: ticker}
+        tc              = &testTelemetryClient{wg: wg}
+        logger          = zerolog.Nop()
+        metrics         = RegionMetrics{
+            pushRequestsActive:   prom.NewGauge(prom.GaugeOpts{}),
+            pushRequestsDuration: prom.NewHistogram(prom.HistogramOpts{}),
+            pushRequestsTotal:    prom.NewCounter(prom.CounterOpts{}),
+            pushRequestsError:    prom.NewCounter(prom.CounterOpts{}),
+            addExecutionDuration: prom.NewHistogram(prom.HistogramOpts{}),
+        }
+        opt withTicker = ticker
+    )
+
+    pusher := NewRegionPusher(testCtx, 1*time.Second, tc, logger, instance, regionID, metrics, opt)
+
+    return &td, tc, pusher, metrics
+}
+
+type testDriver struct {
+    wg     *sync.WaitGroup
+    cancel context.CancelFunc
+    ticker *testTicker
+}
+
+// tickAndWait will tick the ticker once, so the push
+// process starts, and wait for the push client to finish
+func (td *testDriver) tickAndWait() {
+    td.wg.Add(1)
+    defer td.wg.Wait()
+    td.ticker.c <- time.Now()
+}
+
+// shutdownAndWait will cancel the context passed to the
+// tenant pusher and wait for it to finish its work
+func (td *testDriver) shutdownAndWait() {
+    defer td.wg.Wait()
+    // The pusher will send the current accumulated
+    // data before exiting
+    td.wg.Add(1)
+
+    td.cancel()
+}
+
 type testTelemetryClient struct {
     mu sync.Mutex
     wg *sync.WaitGroup
