From f34d616db4813a6c4f78623f9f83b49d5d37f81d Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Thu, 30 Jun 2022 12:12:40 -0300 Subject: [PATCH 1/2] feat(test): add mock clock API --- .travis.yml | 2 +- flow_test.go | 45 +++++++++++++++++++++++++++++++++ go.mod | 4 ++- sweeper.go | 70 ++++++++++++++++++++++++++++++++++++++++++++-------- 4 files changed, 109 insertions(+), 12 deletions(-) diff --git a/.travis.yml b/.travis.yml index a156d3e..263a72b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,7 @@ os: language: go go: - - 1.13.x + - 1.15.x env: global: diff --git a/flow_test.go b/flow_test.go index fb25828..06cdfaa 100644 --- a/flow_test.go +++ b/flow_test.go @@ -6,6 +6,8 @@ import ( "sync/atomic" "testing" "time" + + "github.com/benbjohnson/clock" ) func TestBasic(t *testing.T) { @@ -169,6 +171,49 @@ func TestUnregister(t *testing.T) { wg.Wait() } +func TestMockClock(t *testing.T) { + mockClock := useMockClock() + defer RestoreClock() + + m := new(Meter) + for i := 0; i < 300; i++ { + m.Mark(1000) + mockClock.Add(40 * time.Millisecond) + } + actual := m.Snapshot() + checkApproxEq(t, actual.Rate, 25000, 100) + + for i := 0; i < 200; i++ { + m.Mark(200) + mockClock.Add(40 * time.Millisecond) + } + + // Adjusts + actual = m.Snapshot() + checkApproxEq(t, actual.Rate, 5000, 20) + + // Let it settle. + mockClock.Add(2 * time.Second) + + // get the right total + actual = m.Snapshot() + if actual.Total != 340000 { + t.Errorf("expected total %d, got %d", 340000, actual.Total) + } +} + +func checkApproxEq(t *testing.T, rate float64, expected, err int) { + if !approxEq(rate, float64(expected), float64(err)) { + t.Errorf("expected rate %d (±%d), got %d", expected, err, int(rate)) + } +} + func approxEq(a, b, err float64) bool { return math.Abs(a-b) < err } + +func useMockClock() *clock.Mock { + mockClock := clock.NewMock() + SetClock(mockClock) + return mockClock +} diff --git a/go.mod b/go.mod index e9b50d4..41ee9a2 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,5 @@ module github.com/libp2p/go-flow-metrics -go 1.12 +go 1.15 + +require github.com/benbjohnson/clock v1.3.0 diff --git a/sweeper.go b/sweeper.go index ceed13f..560c88d 100644 --- a/sweeper.go +++ b/sweeper.go @@ -1,10 +1,13 @@ package flow import ( + "context" "math" "sync" "sync/atomic" "time" + + "github.com/benbjohnson/clock" ) // IdleRate the rate at which we declare a meter idle (and stop tracking it @@ -20,26 +23,71 @@ var alpha = 1 - math.Exp(-1.0) // The global sweeper. var globalSweeper sweeper +// SetClock puts a global clock in place for testing purposes only. +// It should be called only once and before using any of the Meter or +// MeterRegistry APIs of this package and should be restored with +// RestoreClock after the test. +func SetClock(clock clock.Clock) { + globalSweeper.stop() + globalSweeper = sweeper{} + globalSweeper.clock = clock +} + +func RestoreClock() { + globalSweeper.stop() + globalSweeper = sweeper{} +} + type sweeper struct { sweepOnce sync.Once + ctx context.Context + cancel context.CancelFunc + stopped chan struct{} snapshotMu sync.RWMutex meters []*Meter activeMeters int - lastUpdateTime time.Time + lastUpdateTime time.Time + // Consumed each time a Meter has been updated (`Mark`ed). registerChannel chan *Meter + + clock clock.Clock } func (sw *sweeper) start() { sw.registerChannel = make(chan *Meter, 16) - go sw.run() + if sw.clock == nil { + sw.clock = clock.New() + } + var ctx context.Context + ctx, sw.cancel = context.WithCancel(context.Background()) + sw.stopped = make(chan struct{}) + go sw.run(ctx) +} + +func (sw *sweeper) stop() { + if sw.cancel != nil { + sw.cancel() + <-sw.stopped + } } -func (sw *sweeper) run() { - for m := range sw.registerChannel { - sw.register(m) - sw.runActive() +func (sw *sweeper) run(ctx context.Context) { + defer close(sw.stopped) + for { + select { + case <-ctx.Done(): + return + default: + } + select { + case m := <-sw.registerChannel: + sw.register(m) + sw.runActive(ctx) + case <-ctx.Done(): + return + } } } @@ -52,11 +100,11 @@ func (sw *sweeper) register(m *Meter) { sw.meters = append(sw.meters, m) } -func (sw *sweeper) runActive() { - ticker := time.NewTicker(time.Second) +func (sw *sweeper) runActive(ctx context.Context) { + ticker := sw.clock.Ticker(time.Second) defer ticker.Stop() - sw.lastUpdateTime = time.Now() + sw.lastUpdateTime = sw.clock.Now() for len(sw.meters) > 0 { // Scale back allocation. if len(sw.meters)*2 < cap(sw.meters) { @@ -70,6 +118,8 @@ func (sw *sweeper) runActive() { sw.update() case m := <-sw.registerChannel: sw.register(m) + case <-ctx.Done(): + return } } sw.meters = nil @@ -80,7 +130,7 @@ func (sw *sweeper) update() { sw.snapshotMu.Lock() defer sw.snapshotMu.Unlock() - now := time.Now() + now := sw.clock.Now() tdiff := now.Sub(sw.lastUpdateTime) if tdiff <= 0 { return From 3eb9d2a5d759c555d98098d36a6b815d3e31aaff Mon Sep 17 00:00:00 2001 From: Lucas Molas Date: Fri, 8 Jul 2022 14:54:36 -0300 Subject: [PATCH 2/2] Update sweeper.go Co-authored-by: Steven Allen --- sweeper.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/sweeper.go b/sweeper.go index 560c88d..9f60539 100644 --- a/sweeper.go +++ b/sweeper.go @@ -75,12 +75,7 @@ func (sw *sweeper) stop() { func (sw *sweeper) run(ctx context.Context) { defer close(sw.stopped) - for { - select { - case <-ctx.Done(): - return - default: - } + for ctx.Err() == nil { select { case m := <-sw.registerChannel: sw.register(m)