Skip to content

Commit

Permalink
some metrics for monitoring futures
Browse files Browse the repository at this point in the history
  • Loading branch information
stefantalpalaru committed Mar 13, 2020
1 parent 7ed9f14 commit 1c1e0e3
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 4 deletions.
3 changes: 2 additions & 1 deletion chronos.nimble
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ skipDirs = @["tests"]
### Dependencies

requires "nim > 0.19.4",
"bearssl"
"bearssl",
"metrics"

task test, "Run all tests":
var commands = [
Expand Down
8 changes: 7 additions & 1 deletion chronos/asyncfutures2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type
Pending, Finished, Cancelled, Failed

FutureBase* = ref object of RootObj ## Untyped future.
location: array[2, ptr SrcLoc]
location*: array[2, ptr SrcLoc]
callbacks: Deque[AsyncCallback]
cancelcb*: CallbackFunc
child*: FutureBase
Expand Down Expand Up @@ -63,13 +63,16 @@ type
var currentID* {.threadvar.}: int
currentID = 0

declareCounter chronos_new_future, "new Future being created"

template setupFutureBase(loc: ptr SrcLoc) =
new(result)
result.state = FutureState.Pending
result.stackTrace = getStackTrace()
result.id = currentID
result.location[LocCreateIndex] = loc
currentID.inc()
chronos_new_future.inc()

## ZAH: As far as I undestand `fromProc` is just a debugging helper.
## It would be more efficient if it's represented as a simple statically
Expand Down Expand Up @@ -284,6 +287,9 @@ proc addCallback*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) =
## Adds the callbacks proc to be called when the future completes.
##
## If future has already completed then ``cb`` will be called immediately.
{.gcsafe.}:
if future.location[0] != nil:
callbacksByFuture.inc($future.location[0])
doAssert(not isNil(cb))
if future.finished():
callSoon(cb, udata)
Expand Down
46 changes: 44 additions & 2 deletions chronos/asyncloop.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ include "system/inclrtl"

import os, tables, strutils, heapqueue, lists, options, nativesockets, net,
deques
import timer
import metrics
import ./timer, ./srcloc

export Port, SocketFlag
export timer

var callbacksByFuture* = initCountTable[string]()

#{.injectStmt: newGcInvariant().}

## AsyncDispatch
Expand Down Expand Up @@ -253,8 +256,12 @@ template processTimersGetTimeout(loop, timeout: untyped) =
if len(loop.callbacks) != 0:
timeout = 0

declareCounter chronos_loop_timers, "loop timers"
declareGauge chronos_loop_timers_queue, "loop timers queue"

template processTimers(loop: untyped) =
var curTime = Moment.now()
chronos_loop_timers_queue.set(loop.timers.len.int64)
while loop.timers.len > 0:
if loop.timers[0].deleted:
discard loop.timers.pop()
Expand All @@ -263,6 +270,9 @@ template processTimers(loop: untyped) =
if curTime < loop.timers[0].finishAt:
break
loop.callbacks.addLast(loop.timers.pop().function)
chronos_loop_timers.inc()

declareCounter chronos_processed_callbacks, "total number of processed callbacks"

template processCallbacks(loop: untyped) =
var count = len(loop.callbacks)
Expand All @@ -276,6 +286,7 @@ template processCallbacks(loop: untyped) =
let callable = loop.callbacks.popFirst()
if not isNil(callable.function):
callable.function(callable.udata)
chronos_processed_callbacks.inc(count.int64)

when defined(windows) or defined(nimdoc):
type
Expand Down Expand Up @@ -685,12 +696,17 @@ elif unixPlatform:
let loop = getGlobalDispatcher()
loop.selector.unregister(sigfd)

declareCounter chronos_poll_ticks, "Chronos event loop ticks"
declareCounter chronos_poll_events, "Chronos poll events", ["event"]
declareCounter chronos_future_callbacks, "Future callbacks", ["location"]
proc poll*() =
## Perform single asynchronous step.
let loop = getGlobalDispatcher()
var curTime = Moment.now()
var curTimeout = 0

chronos_poll_ticks.inc()

when ioselSupportedPlatform:
let customSet = {Event.Timer, Event.Signal, Event.Process,
Event.Vnode}
Expand All @@ -703,6 +719,8 @@ elif unixPlatform:
for i in 0..<count:
let fd = loop.keys[i].fd
let events = loop.keys[i].events
for event in events:
chronos_poll_events.inc(labelValues = [$event])

withData(loop.selector, fd, adata) do:
if Event.Read in events or events == {Event.Error}:
Expand All @@ -729,6 +747,28 @@ elif unixPlatform:
# poll() call.
loop.processCallbacks()

# Wait until we have a decent amount of data and pick the most frequently
# seen futures.
const
ticksBetweenChecks = 50
minimumCallbacksPerCheck = 1000
maximumPicksPerCheck = 5

if chronos_poll_ticks.value.int64 mod ticksBetweenChecks == 0:
var sum = 0
for val in callbacksByFuture.values:
sum += val

if sum >= minimumCallbacksPerCheck:
callbacksByFuture.sort()
var i = 0
for futureLocation, val in callbacksByFuture:
if i == maximumPicksPerCheck:
break
chronos_future_callbacks.inc(val.int64, labelValues = [futureLocation])
i.inc()
callbacksByFuture.clear()

else:
proc initAPI() = discard
proc globalInit() = discard
Expand Down Expand Up @@ -786,7 +826,9 @@ include asyncfutures2
proc sleepAsync*(duration: Duration): Future[void] =
## Suspends the execution of the current async procedure for the next
## ``duration`` time.
var retFuture = newFuture[void]("chronos.sleepAsync(Duration)")

# It won't compile with a string argument.
var retFuture = newFuture[void](getSrcLocation("chronos.sleepAsync(chronos.timer.Duration)"))
let moment = Moment.fromNow(duration)
var timer: TimerCallback

Expand Down

0 comments on commit 1c1e0e3

Please sign in to comment.