Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(test): add mock clock API #18

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ os:
language: go

go:
- 1.13.x
- 1.15.x

env:
global:
Expand Down
45 changes: 45 additions & 0 deletions flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync/atomic"
"testing"
"time"

"github.com/benbjohnson/clock"
)

func TestBasic(t *testing.T) {
Expand Down Expand Up @@ -169,6 +171,49 @@ func TestUnregister(t *testing.T) {
wg.Wait()
}

func TestMockClock(t *testing.T) {
mockClock := useMockClock()
defer RestoreClock()

m := new(Meter)
for i := 0; i < 300; i++ {
m.Mark(1000)
mockClock.Add(40 * time.Millisecond)
}
actual := m.Snapshot()
checkApproxEq(t, actual.Rate, 25000, 100)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I've said this before, but there's no need for approximations. We're using a mock clock, so timing is precise, and so should be the measurement.


for i := 0; i < 200; i++ {
m.Mark(200)
mockClock.Add(40 * time.Millisecond)
}

// Adjusts
actual = m.Snapshot()
checkApproxEq(t, actual.Rate, 5000, 20)

// Let it settle.
mockClock.Add(2 * time.Second)

// get the right total
actual = m.Snapshot()
if actual.Total != 340000 {
t.Errorf("expected total %d, got %d", 340000, actual.Total)
}
}

func checkApproxEq(t *testing.T, rate float64, expected, err int) {
schomatis marked this conversation as resolved.
Show resolved Hide resolved
if !approxEq(rate, float64(expected), float64(err)) {
t.Errorf("expected rate %d (±%d), got %d", expected, err, int(rate))
}
}

func approxEq(a, b, err float64) bool {
return math.Abs(a-b) < err
}

func useMockClock() *clock.Mock {
mockClock := clock.NewMock()
SetClock(mockClock)
return mockClock
}
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
module github.com/libp2p/go-flow-metrics

go 1.12
go 1.15

require github.com/benbjohnson/clock v1.3.0
70 changes: 60 additions & 10 deletions sweeper.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package flow

import (
"context"
"math"
"sync"
"sync/atomic"
"time"

"github.com/benbjohnson/clock"
)

// IdleRate the rate at which we declare a meter idle (and stop tracking it
Expand All @@ -20,26 +23,71 @@ var alpha = 1 - math.Exp(-1.0)
// The global sweeper.
var globalSweeper sweeper

// SetClock puts a global clock in place for testing purposes only.
// It should be called only once and before using any of the Meter or
// MeterRegistry APIs of this package and should be restored with
// RestoreClock after the test.
func SetClock(clock clock.Clock) {
globalSweeper.stop()
globalSweeper = sweeper{}
globalSweeper.clock = clock
}

func RestoreClock() {
globalSweeper.stop()
globalSweeper = sweeper{}
}
Comment on lines +26 to +39
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are testing functions and don't need to be exported.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea would be to use this in libp2p/go-libp2p-core#267 so I think we still want to export this, or something similar to it.


type sweeper struct {
sweepOnce sync.Once
ctx context.Context
cancel context.CancelFunc
stopped chan struct{}

snapshotMu sync.RWMutex
meters []*Meter
activeMeters int

lastUpdateTime time.Time
lastUpdateTime time.Time
// Consumed each time a Meter has been updated (`Mark`ed).
registerChannel chan *Meter

clock clock.Clock
}

func (sw *sweeper) start() {
sw.registerChannel = make(chan *Meter, 16)
go sw.run()
if sw.clock == nil {
sw.clock = clock.New()
}
var ctx context.Context
ctx, sw.cancel = context.WithCancel(context.Background())
sw.stopped = make(chan struct{})
go sw.run(ctx)
}

func (sw *sweeper) stop() {
if sw.cancel != nil {
sw.cancel()
<-sw.stopped
}
}

func (sw *sweeper) run() {
for m := range sw.registerChannel {
sw.register(m)
sw.runActive()
func (sw *sweeper) run(ctx context.Context) {
defer close(sw.stopped)
for {
select {
case <-ctx.Done():
return
default:
}
schomatis marked this conversation as resolved.
Show resolved Hide resolved
select {
case m := <-sw.registerChannel:
sw.register(m)
sw.runActive(ctx)
case <-ctx.Done():
return
}
}
}

Expand All @@ -52,11 +100,11 @@ func (sw *sweeper) register(m *Meter) {
sw.meters = append(sw.meters, m)
}

func (sw *sweeper) runActive() {
ticker := time.NewTicker(time.Second)
func (sw *sweeper) runActive(ctx context.Context) {
ticker := sw.clock.Ticker(time.Second)
defer ticker.Stop()

sw.lastUpdateTime = time.Now()
sw.lastUpdateTime = sw.clock.Now()
for len(sw.meters) > 0 {
// Scale back allocation.
if len(sw.meters)*2 < cap(sw.meters) {
Expand All @@ -70,6 +118,8 @@ func (sw *sweeper) runActive() {
sw.update()
case m := <-sw.registerChannel:
sw.register(m)
case <-ctx.Done():
return
}
}
sw.meters = nil
Expand All @@ -80,7 +130,7 @@ func (sw *sweeper) update() {
sw.snapshotMu.Lock()
defer sw.snapshotMu.Unlock()

now := time.Now()
now := sw.clock.Now()
tdiff := now.Sub(sw.lastUpdateTime)
if tdiff <= 0 {
return
Expand Down