Skip to content

Commit

Permalink
feat(test): add mock clock API
Browse files Browse the repository at this point in the history
  • Loading branch information
schomatis committed Jul 5, 2022
1 parent e6db3c8 commit 9725a73
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ os:
language: go

go:
- 1.13.x
- 1.15.x

env:
global:
Expand Down
45 changes: 45 additions & 0 deletions flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync/atomic"
"testing"
"time"

"github.com/benbjohnson/clock"
)

func TestBasic(t *testing.T) {
Expand Down Expand Up @@ -169,6 +171,49 @@ func TestUnregister(t *testing.T) {
wg.Wait()
}

func TestMockClock(t *testing.T) {
mockClock := useMockClock()
defer RestoreClock()

m := new(Meter)
for i := 0; i < 300; i++ {
m.Mark(1000)
mockClock.Add(40 * time.Millisecond)
}
actual := m.Snapshot()
checkApproxEq(t, actual.Rate, 25000, 100)

for i := 0; i < 200; i++ {
m.Mark(200)
mockClock.Add(40 * time.Millisecond)
}

// Adjusts
actual = m.Snapshot()
checkApproxEq(t, actual.Rate, 5000, 20)

// Let it settle.
mockClock.Add(2 * time.Second)

// get the right total
actual = m.Snapshot()
if actual.Total != 340000 {
t.Errorf("expected total %d, got %d", 340000, actual.Total)
}
}

func checkApproxEq(t *testing.T, rate float64, expected, err int) {
if !approxEq(rate, float64(expected), float64(err)) {
t.Errorf("expected rate %d (±%d), got %d", expected, err, int(rate))
}
}

func approxEq(a, b, err float64) bool {
return math.Abs(a-b) < err
}

func useMockClock() *clock.Mock {
mockClock := clock.NewMock()
SetClock(mockClock)
return mockClock
}
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
module github.com/libp2p/go-flow-metrics

go 1.12
go 1.15

require github.com/benbjohnson/clock v1.3.0
73 changes: 63 additions & 10 deletions sweeper.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package flow

import (
"context"
"math"
"sync"
"sync/atomic"
"time"

"github.com/benbjohnson/clock"
)

// IdleRate the rate at which we declare a meter idle (and stop tracking it
Expand All @@ -20,26 +23,74 @@ var alpha = 1 - math.Exp(-1.0)
// The global sweeper.
var globalSweeper sweeper

// SetClock puts a global clock in place for testing purposes only.
// It should be called only once and before using any of the Meter or
// MeterRegistry APIs of this package and should be restored with
// RestoreClock after the test.
func SetClock(clock clock.Clock) {
if globalSweeper.cancel != nil {
globalSweeper.cancel()

}
globalSweeper = sweeper{}
globalSweeper.clock = clock
}

func RestoreClock() {
globalSweeper.stop()
globalSweeper = sweeper{}
}

type sweeper struct {
sweepOnce sync.Once
ctx context.Context
cancel context.CancelFunc
stopped chan struct{}

snapshotMu sync.RWMutex
meters []*Meter
activeMeters int

lastUpdateTime time.Time
lastUpdateTime time.Time
// Consumed each time a Meter has been updated (`Mark`ed).
registerChannel chan *Meter

clock clock.Clock
}

func (sw *sweeper) start() {
sw.registerChannel = make(chan *Meter, 16)
go sw.run()
if sw.clock == nil {
sw.clock = clock.New()
}
var ctx context.Context
ctx, sw.cancel = context.WithCancel(context.Background())
sw.stopped = make(chan struct{})
go sw.run(ctx)
}

func (sw *sweeper) run() {
for m := range sw.registerChannel {
sw.register(m)
sw.runActive()
func (sw *sweeper) stop() {
if sw.cancel != nil {
sw.cancel()
<-sw.stopped
}
}

func (sw *sweeper) run(ctx context.Context) {
defer close(sw.stopped)
for {
select {
case <-ctx.Done():
return
default:
}
select {
case m := <-sw.registerChannel:
sw.register(m)
sw.runActive(ctx)
case <-ctx.Done():
return
}
}
}

Expand All @@ -52,11 +103,11 @@ func (sw *sweeper) register(m *Meter) {
sw.meters = append(sw.meters, m)
}

func (sw *sweeper) runActive() {
ticker := time.NewTicker(time.Second)
func (sw *sweeper) runActive(ctx context.Context) {
ticker := sw.clock.Ticker(time.Second)
defer ticker.Stop()

sw.lastUpdateTime = time.Now()
sw.lastUpdateTime = sw.clock.Now()
for len(sw.meters) > 0 {
// Scale back allocation.
if len(sw.meters)*2 < cap(sw.meters) {
Expand All @@ -70,6 +121,8 @@ func (sw *sweeper) runActive() {
sw.update()
case m := <-sw.registerChannel:
sw.register(m)
case <-ctx.Done():
return
}
}
sw.meters = nil
Expand All @@ -80,7 +133,7 @@ func (sw *sweeper) update() {
sw.snapshotMu.Lock()
defer sw.snapshotMu.Unlock()

now := time.Now()
now := sw.clock.Now()
tdiff := now.Sub(sw.lastUpdateTime)
if tdiff <= 0 {
return
Expand Down

0 comments on commit 9725a73

Please sign in to comment.