Skip to content

Commit

Permalink
common/prque: refactor LazyQueue (ethereum#21236)
Browse files Browse the repository at this point in the history
  • Loading branch information
gzliudan committed Dec 26, 2024
1 parent a4355be commit 29e1ab0
Showing 1 changed file with 36 additions and 20 deletions.
56 changes: 36 additions & 20 deletions common/prque/lazyqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,26 @@ import (
// LazyQueue is a priority queue data structure where priorities can change over
// time and are only evaluated on demand.
// Two callbacks are required:
// - priority evaluates the actual priority of an item
// - maxPriority gives an upper estimate for the priority in any moment between
// now and the given absolute time
// - priority evaluates the actual priority of an item
// - maxPriority gives an upper estimate for the priority in any moment between
// now and the given absolute time
//
// If the upper estimate is exceeded then Update should be called for that item.
// A global Refresh function should also be called periodically.
type LazyQueue struct {
clock mclock.Clock
// Items are stored in one of two internal queues ordered by estimated max
// priority until the next and the next-after-next refresh. Update and Refresh
// always places items in queue[1].
queue [2]*sstack
popQueue *sstack
period time.Duration
maxUntil mclock.AbsTime
indexOffset int
setIndex SetIndexCallback
priority PriorityCallback
maxPriority MaxPriorityCallback
queue [2]*sstack
popQueue *sstack
period time.Duration
maxUntil mclock.AbsTime
indexOffset int
setIndex SetIndexCallback
priority PriorityCallback
maxPriority MaxPriorityCallback
lastRefresh1, lastRefresh2 mclock.AbsTime
}

type (
Expand All @@ -54,14 +56,17 @@ type (
// NewLazyQueue creates a new lazy queue
func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPriority MaxPriorityCallback, clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue {
q := &LazyQueue{
popQueue: newSstack(nil),
setIndex: setIndex,
priority: priority,
maxPriority: maxPriority,
clock: clock,
period: refreshPeriod}
popQueue: newSstack(nil),
setIndex: setIndex,
priority: priority,
maxPriority: maxPriority,
clock: clock,
period: refreshPeriod,
lastRefresh1: clock.Now(),
lastRefresh2: clock.Now(),
}
q.Reset()
q.Refresh()
q.refresh(clock.Now())
return q
}

Expand All @@ -71,9 +76,19 @@ func (q *LazyQueue) Reset() {
q.queue[1] = newSstack(q.setIndex1)
}

// Refresh should be called at least with the frequency specified by the refreshPeriod parameter
// Refresh performs queue re-evaluation if necessary
func (q *LazyQueue) Refresh() {
q.maxUntil = q.clock.Now() + mclock.AbsTime(q.period)
now := q.clock.Now()
for time.Duration(now-q.lastRefresh2) >= q.period*2 {
q.refresh(now)
q.lastRefresh2 = q.lastRefresh1
q.lastRefresh1 = now
}
}

// refresh re-evaluates items in the older queue and swaps the two queues
func (q *LazyQueue) refresh(now mclock.AbsTime) {
q.maxUntil = now + mclock.AbsTime(q.period)
for q.queue[0].Len() != 0 {
q.Push(heap.Pop(q.queue[0]).(*item).value)
}
Expand Down Expand Up @@ -139,6 +154,7 @@ func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) boo
}
return
}
nextIndex = q.peekIndex() // re-check because callback is allowed to push items back
}
}
}
Expand Down

0 comments on commit 29e1ab0

Please sign in to comment.