Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

common/prque: generic priority queue 9000 #26290

Merged
merged 7 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 38 additions & 40 deletions common/prque/lazyqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/ethereum/go-ethereum/common/mclock"
"golang.org/x/exp/constraints"
)

// LazyQueue is a priority queue data structure where priorities can change over
Expand All @@ -32,31 +33,31 @@ import (
//
// If the upper estimate is exceeded then Update should be called for that item.
// A global Refresh function should also be called periodically.
type LazyQueue struct {
type LazyQueue[P constraints.Ordered, V any] struct {
clock mclock.Clock
// Items are stored in one of two internal queues ordered by estimated max
// priority until the next and the next-after-next refresh. Update and Refresh
// always places items in queue[1].
queue [2]*sstack
popQueue *sstack
queue [2]*sstack[P, V]
popQueue *sstack[P, V]
period time.Duration
maxUntil mclock.AbsTime
indexOffset int
setIndex SetIndexCallback
priority PriorityCallback
maxPriority MaxPriorityCallback
setIndex SetIndexCallback[V]
priority PriorityCallback[P, V]
maxPriority MaxPriorityCallback[P, V]
lastRefresh1, lastRefresh2 mclock.AbsTime
}

type (
PriorityCallback func(data interface{}) int64 // actual priority callback
MaxPriorityCallback func(data interface{}, until mclock.AbsTime) int64 // estimated maximum priority callback
PriorityCallback[P constraints.Ordered, V any] func(data V) P // actual priority callback
MaxPriorityCallback[P constraints.Ordered, V any] func(data V, until mclock.AbsTime) P // estimated maximum priority callback
)

// NewLazyQueue creates a new lazy queue
func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPriority MaxPriorityCallback, clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue {
q := &LazyQueue{
popQueue: newSstack(nil, false),
func NewLazyQueue[P constraints.Ordered, V any](setIndex SetIndexCallback[V], priority PriorityCallback[P, V], maxPriority MaxPriorityCallback[P, V], clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue[P, V] {
q := &LazyQueue[P, V]{
popQueue: newSstack[P, V](nil),
setIndex: setIndex,
priority: priority,
maxPriority: maxPriority,
Expand All @@ -71,13 +72,13 @@ func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPrior
}

// Reset clears the contents of the queue
func (q *LazyQueue) Reset() {
q.queue[0] = newSstack(q.setIndex0, false)
q.queue[1] = newSstack(q.setIndex1, false)
func (q *LazyQueue[P, V]) Reset() {
q.queue[0] = newSstack[P, V](q.setIndex0)
q.queue[1] = newSstack[P, V](q.setIndex1)
}

// Refresh performs queue re-evaluation if necessary
func (q *LazyQueue) Refresh() {
func (q *LazyQueue[P, V]) Refresh() {
now := q.clock.Now()
for time.Duration(now-q.lastRefresh2) >= q.period*2 {
q.refresh(now)
Expand All @@ -87,33 +88,33 @@ func (q *LazyQueue) Refresh() {
}

// refresh re-evaluates items in the older queue and swaps the two queues
func (q *LazyQueue) refresh(now mclock.AbsTime) {
func (q *LazyQueue[P, V]) refresh(now mclock.AbsTime) {
q.maxUntil = now.Add(q.period)
for q.queue[0].Len() != 0 {
q.Push(heap.Pop(q.queue[0]).(*item).value)
q.Push(heap.Pop(q.queue[0]).(*item[P, V]).value)
}
q.queue[0], q.queue[1] = q.queue[1], q.queue[0]
q.indexOffset = 1 - q.indexOffset
q.maxUntil = q.maxUntil.Add(q.period)
}

// Push adds an item to the queue
func (q *LazyQueue) Push(data interface{}) {
heap.Push(q.queue[1], &item{data, q.maxPriority(data, q.maxUntil)})
func (q *LazyQueue[P, V]) Push(data V) {
heap.Push(q.queue[1], &item[P, V]{data, q.maxPriority(data, q.maxUntil)})
}

// Update updates the upper priority estimate for the item with the given queue index
func (q *LazyQueue) Update(index int) {
func (q *LazyQueue[P, V]) Update(index int) {
q.Push(q.Remove(index))
}

// Pop removes and returns the item with the greatest actual priority
func (q *LazyQueue) Pop() (interface{}, int64) {
func (q *LazyQueue[P, V]) Pop() (V, P) {
var (
resData interface{}
resPri int64
resData V
resPri P
)
q.MultiPop(func(data interface{}, priority int64) bool {
q.MultiPop(func(data V, priority P) bool {
resData = data
resPri = priority
return false
Expand All @@ -123,7 +124,7 @@ func (q *LazyQueue) Pop() (interface{}, int64) {

// peekIndex returns the index of the internal queue where the item with the
// highest estimated priority is or -1 if both are empty
func (q *LazyQueue) peekIndex() int {
func (q *LazyQueue[P, V]) peekIndex() int {
if q.queue[0].Len() != 0 {
if q.queue[1].Len() != 0 && q.queue[1].blocks[0][0].priority > q.queue[0].blocks[0][0].priority {
return 1
Expand All @@ -139,17 +140,17 @@ func (q *LazyQueue) peekIndex() int {
// MultiPop pops multiple items from the queue and is more efficient than calling
// Pop multiple times. Popped items are passed to the callback. MultiPop returns
// when the callback returns false or there are no more items to pop.
func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) bool) {
func (q *LazyQueue[P, V]) MultiPop(callback func(data V, priority P) bool) {
nextIndex := q.peekIndex()
for nextIndex != -1 {
data := heap.Pop(q.queue[nextIndex]).(*item).value
heap.Push(q.popQueue, &item{data, q.priority(data)})
data := heap.Pop(q.queue[nextIndex]).(*item[P, V]).value
heap.Push(q.popQueue, &item[P, V]{data, q.priority(data)})
nextIndex = q.peekIndex()
for q.popQueue.Len() != 0 && (nextIndex == -1 || q.queue[nextIndex].blocks[0][0].priority < q.popQueue.blocks[0][0].priority) {
i := heap.Pop(q.popQueue).(*item)
i := heap.Pop(q.popQueue).(*item[P, V])
if !callback(i.value, i.priority) {
for q.popQueue.Len() != 0 {
q.Push(heap.Pop(q.popQueue).(*item).value)
q.Push(heap.Pop(q.popQueue).(*item[P, V]).value)
}
return
}
Expand All @@ -159,31 +160,28 @@ func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) boo
}

// PopItem pops the item from the queue only, dropping the associated priority value.
func (q *LazyQueue) PopItem() interface{} {
func (q *LazyQueue[P, V]) PopItem() V {
i, _ := q.Pop()
return i
}

// Remove removes the item with the given index.
func (q *LazyQueue) Remove(index int) interface{} {
if index < 0 {
return nil
}
return heap.Remove(q.queue[index&1^q.indexOffset], index>>1).(*item).value
func (q *LazyQueue[P, V]) Remove(index int) V {
return heap.Remove(q.queue[index&1^q.indexOffset], index>>1).(*item[P, V]).value
}

// Empty checks whether the priority queue is empty.
func (q *LazyQueue) Empty() bool {
func (q *LazyQueue[P, V]) Empty() bool {
return q.queue[0].Len() == 0 && q.queue[1].Len() == 0
}

// Size returns the number of items in the priority queue.
func (q *LazyQueue) Size() int {
func (q *LazyQueue[P, V]) Size() int {
return q.queue[0].Len() + q.queue[1].Len()
}

// setIndex0 translates internal queue item index to the virtual index space of LazyQueue
func (q *LazyQueue) setIndex0(data interface{}, index int) {
func (q *LazyQueue[P, V]) setIndex0(data V, index int) {
if index == -1 {
q.setIndex(data, -1)
} else {
Expand All @@ -192,6 +190,6 @@ func (q *LazyQueue) setIndex0(data interface{}, index int) {
}

// setIndex1 translates internal queue item index to the virtual index space of LazyQueue
func (q *LazyQueue) setIndex1(data interface{}, index int) {
func (q *LazyQueue[P, V]) setIndex1(data V, index int) {
q.setIndex(data, index+index+1)
}
44 changes: 19 additions & 25 deletions common/prque/prque.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,65 +19,59 @@ package prque

import (
"container/heap"

"golang.org/x/exp/constraints"
)

// Priority queue data structure.
type Prque struct {
cont *sstack
type Prque[P constraints.Ordered, V any] struct {
cont *sstack[P, V]
}

// New creates a new priority queue.
func New(setIndex SetIndexCallback) *Prque {
return &Prque{newSstack(setIndex, false)}
}

// NewWrapAround creates a new priority queue with wrap-around priority handling.
func NewWrapAround(setIndex SetIndexCallback) *Prque {
return &Prque{newSstack(setIndex, true)}
func New[P constraints.Ordered, V any](setIndex SetIndexCallback[V]) *Prque[P, V] {
return &Prque[P, V]{newSstack[P, V](setIndex)}
}

// Pushes a value with a given priority into the queue, expanding if necessary.
func (p *Prque) Push(data interface{}, priority int64) {
heap.Push(p.cont, &item{data, priority})
func (p *Prque[P, V]) Push(data V, priority P) {
heap.Push(p.cont, &item[P, V]{data, priority})
}

// Peek returns the value with the greatest priority but does not pop it off.
func (p *Prque) Peek() (interface{}, int64) {
func (p *Prque[P, V]) Peek() (V, P) {
item := p.cont.blocks[0][0]
return item.value, item.priority
}

// Pops the value with the greatest priority off the stack and returns it.
// Currently no shrinking is done.
func (p *Prque) Pop() (interface{}, int64) {
item := heap.Pop(p.cont).(*item)
func (p *Prque[P, V]) Pop() (V, P) {
item := heap.Pop(p.cont).(*item[P, V])
return item.value, item.priority
}

// Pops only the item from the queue, dropping the associated priority value.
func (p *Prque) PopItem() interface{} {
return heap.Pop(p.cont).(*item).value
func (p *Prque[P, V]) PopItem() V {
return heap.Pop(p.cont).(*item[P, V]).value
}

// Remove removes the element with the given index.
func (p *Prque) Remove(i int) interface{} {
if i < 0 {
return nil
}
return heap.Remove(p.cont, i)
func (p *Prque[P, V]) Remove(i int) V {
return heap.Remove(p.cont, i).(*item[P, V]).value
}

// Checks whether the priority queue is empty.
func (p *Prque) Empty() bool {
func (p *Prque[P, V]) Empty() bool {
return p.cont.Len() == 0
}

// Returns the number of element in the priority queue.
func (p *Prque) Size() int {
func (p *Prque[P, V]) Size() int {
return p.cont.Len()
}

// Clears the contents of the priority queue.
func (p *Prque) Reset() {
*p = *New(p.cont.setIndex)
func (p *Prque[P, V]) Reset() {
*p = *New[P, V](p.cont.setIndex)
}
27 changes: 15 additions & 12 deletions common/prque/prque_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,24 @@ func TestPrque(t *testing.T) {
for i := 0; i < size; i++ {
data[i] = rand.Int()
}
queue := New(nil)
queue := New[int, int](nil)

for rep := 0; rep < 2; rep++ {
// Fill a priority queue with the above data
for i := 0; i < size; i++ {
queue.Push(data[i], int64(prio[i]))
queue.Push(data[i], prio[i])
if queue.Size() != i+1 {
t.Errorf("queue size mismatch: have %v, want %v.", queue.Size(), i+1)
}
}
// Create a map the values to the priorities for easier verification
dict := make(map[int64]int)
dict := make(map[int]int)
for i := 0; i < size; i++ {
dict[int64(prio[i])] = data[i]
dict[prio[i]] = data[i]
}

// Pop out the elements in priority order and verify them
prevPrio := int64(size + 1)
prevPrio := size + 1
for !queue.Empty() {
val, prio := queue.Pop()
if prio > prevPrio {
Expand All @@ -59,22 +61,23 @@ func TestReset(t *testing.T) {
for i := 0; i < size; i++ {
data[i] = rand.Int()
}
queue := New(nil)
queue := New[int, int](nil)

for rep := 0; rep < 2; rep++ {
// Fill a priority queue with the above data
for i := 0; i < size; i++ {
queue.Push(data[i], int64(prio[i]))
queue.Push(data[i], prio[i])
if queue.Size() != i+1 {
t.Errorf("queue size mismatch: have %v, want %v.", queue.Size(), i+1)
}
}
// Create a map the values to the priorities for easier verification
dict := make(map[int64]int)
dict := make(map[int]int)
for i := 0; i < size; i++ {
dict[int64(prio[i])] = data[i]
dict[prio[i]] = data[i]
}
// Pop out half the elements in priority order and verify them
prevPrio := int64(size + 1)
prevPrio := size + 1
for i := 0; i < size/2; i++ {
val, prio := queue.Pop()
if prio > prevPrio {
Expand Down Expand Up @@ -104,7 +107,7 @@ func BenchmarkPush(b *testing.B) {
}
// Execute the benchmark
b.ResetTimer()
queue := New(nil)
queue := New[int64, int](nil)
for i := 0; i < len(data); i++ {
queue.Push(data[i], prio[i])
}
Expand All @@ -118,7 +121,7 @@ func BenchmarkPop(b *testing.B) {
data[i] = rand.Int()
prio[i] = rand.Int63()
}
queue := New(nil)
queue := New[int64, int](nil)
for i := 0; i < len(data); i++ {
queue.Push(data[i], prio[i])
}
Expand Down
Loading