-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(test): add mock clock API #18
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,7 @@ os: | |
language: go | ||
|
||
go: | ||
- 1.13.x | ||
- 1.15.x | ||
|
||
env: | ||
global: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
module github.com/libp2p/go-flow-metrics | ||
|
||
go 1.12 | ||
go 1.15 | ||
|
||
require github.com/benbjohnson/clock v1.3.0 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,13 @@ | ||
package flow | ||
|
||
import ( | ||
"context" | ||
"math" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
|
||
"github.com/benbjohnson/clock" | ||
) | ||
|
||
// IdleRate the rate at which we declare a meter idle (and stop tracking it | ||
|
@@ -20,26 +23,71 @@ var alpha = 1 - math.Exp(-1.0) | |
// The global sweeper. | ||
var globalSweeper sweeper | ||
|
||
// SetClock puts a global clock in place for testing purposes only. | ||
// It should be called only once and before using any of the Meter or | ||
// MeterRegistry APIs of this package and should be restored with | ||
// RestoreClock after the test. | ||
func SetClock(clock clock.Clock) { | ||
globalSweeper.stop() | ||
globalSweeper = sweeper{} | ||
globalSweeper.clock = clock | ||
} | ||
|
||
func RestoreClock() { | ||
globalSweeper.stop() | ||
globalSweeper = sweeper{} | ||
} | ||
Comment on lines
+26
to
+39
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These are testing functions and don't need to be exported. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The idea would be to use this in libp2p/go-libp2p-core#267 so I think we still want to export this, or something similar to it. |
||
|
||
type sweeper struct { | ||
sweepOnce sync.Once | ||
ctx context.Context | ||
cancel context.CancelFunc | ||
stopped chan struct{} | ||
|
||
snapshotMu sync.RWMutex | ||
meters []*Meter | ||
activeMeters int | ||
|
||
lastUpdateTime time.Time | ||
lastUpdateTime time.Time | ||
// Consumed each time a Meter has been updated (`Mark`ed). | ||
registerChannel chan *Meter | ||
|
||
clock clock.Clock | ||
} | ||
|
||
func (sw *sweeper) start() { | ||
sw.registerChannel = make(chan *Meter, 16) | ||
go sw.run() | ||
if sw.clock == nil { | ||
sw.clock = clock.New() | ||
} | ||
var ctx context.Context | ||
ctx, sw.cancel = context.WithCancel(context.Background()) | ||
sw.stopped = make(chan struct{}) | ||
go sw.run(ctx) | ||
} | ||
|
||
func (sw *sweeper) stop() { | ||
if sw.cancel != nil { | ||
sw.cancel() | ||
<-sw.stopped | ||
} | ||
} | ||
|
||
func (sw *sweeper) run() { | ||
for m := range sw.registerChannel { | ||
sw.register(m) | ||
sw.runActive() | ||
func (sw *sweeper) run(ctx context.Context) { | ||
defer close(sw.stopped) | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
default: | ||
} | ||
schomatis marked this conversation as resolved.
Show resolved
Hide resolved
|
||
select { | ||
case m := <-sw.registerChannel: | ||
sw.register(m) | ||
sw.runActive(ctx) | ||
case <-ctx.Done(): | ||
return | ||
} | ||
} | ||
} | ||
|
||
|
@@ -52,11 +100,11 @@ func (sw *sweeper) register(m *Meter) { | |
sw.meters = append(sw.meters, m) | ||
} | ||
|
||
func (sw *sweeper) runActive() { | ||
ticker := time.NewTicker(time.Second) | ||
func (sw *sweeper) runActive(ctx context.Context) { | ||
ticker := sw.clock.Ticker(time.Second) | ||
defer ticker.Stop() | ||
|
||
sw.lastUpdateTime = time.Now() | ||
sw.lastUpdateTime = sw.clock.Now() | ||
for len(sw.meters) > 0 { | ||
// Scale back allocation. | ||
if len(sw.meters)*2 < cap(sw.meters) { | ||
|
@@ -70,6 +118,8 @@ func (sw *sweeper) runActive() { | |
sw.update() | ||
case m := <-sw.registerChannel: | ||
sw.register(m) | ||
case <-ctx.Done(): | ||
return | ||
} | ||
} | ||
sw.meters = nil | ||
|
@@ -80,7 +130,7 @@ func (sw *sweeper) update() { | |
sw.snapshotMu.Lock() | ||
defer sw.snapshotMu.Unlock() | ||
|
||
now := time.Now() | ||
now := sw.clock.Now() | ||
tdiff := now.Sub(sw.lastUpdateTime) | ||
if tdiff <= 0 { | ||
return | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I've said this before, but there's no need for approximations. We're using a mock clock, so timing is precise, and so should be the measurement.