Skip to content

Commit

Permalink
implement priority rate limiter (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
vivek-ng authored Nov 22, 2020
1 parent fa25e71 commit c24be6e
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 10 deletions.
63 changes: 63 additions & 0 deletions priority/priorityRateLimiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package priority

import (
"container/heap"
"sync"

"github.com/vivek-ng/concurrency-limiter/queue"
)

type PriorityLimiter struct {
count int
limit int
mu sync.Mutex
waitList queue.PriorityQueue
}

func NewLimiter(limit int) *PriorityLimiter {
pq := make(queue.PriorityQueue, 0)
nl := &PriorityLimiter{
limit: limit,
waitList: pq,
}

heap.Init(&pq)
return nl
}

func (p *PriorityLimiter) Wait(priority int) {
ok, ch := p.proceed(priority)
if !ok {
<-ch
}
}

func (p *PriorityLimiter) proceed(priority int) (bool, chan struct{}) {
p.mu.Lock()
defer p.mu.Unlock()

if p.count < p.limit {
p.count++
return true, nil
}
ch := make(chan struct{})
w := &queue.Item{
Priority: priority,
Done: ch,
}
heap.Push(&p.waitList, w)
return false, ch
}

func (p *PriorityLimiter) Finish() {
p.mu.Lock()
defer p.mu.Unlock()
p.count -= 1
ele := heap.Pop(&p.waitList)
if ele == nil {
return
}
it := ele.(*queue.Item)
it.Done <- struct{}{}
close(it.Done)
}
36 changes: 36 additions & 0 deletions priority/priorityRateLimiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package priority

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/vivek-ng/concurrency-limiter/queue"
)

func TestPriorityLimiter(t *testing.T) {
nl := NewLimiter(3)
var wg sync.WaitGroup
wg.Add(5)

for i := 0; i < 5; i++ {
go func(pr int) {
defer wg.Done()
nl.Wait(pr)
}(i)
}
time.Sleep(200 * time.Millisecond)
assert.Equal(t, 2, nl.waitList.Len())
pVal := nl.waitList.Top()
pValItem := pVal.(queue.Item)
expectedVal1 := pValItem.Priority
nl.Finish()
pVal = nl.waitList.Top()
pValItem = pVal.(queue.Item)
expectedVal2 := pValItem.Priority
assert.Greater(t, expectedVal1, expectedVal2)
nl.Finish()
wg.Wait()
assert.Equal(t, 0, nl.waitList.Len())
}
19 changes: 14 additions & 5 deletions queue/priorityQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
)

type Item struct {
done chan struct{}
priority int
Done chan struct{}
Priority int
timeStamp int64
index int
}
Expand All @@ -17,10 +17,10 @@ type PriorityQueue []*Item
func (pq PriorityQueue) Len() int { return len(pq) }

func (pq PriorityQueue) Less(i, j int) bool {
if pq[i].priority == pq[j].priority {
if pq[i].Priority == pq[j].Priority {
return pq[i].timeStamp <= pq[j].timeStamp
}
return pq[i].priority > pq[j].priority
return pq[i].Priority > pq[j].Priority
}

func (pq PriorityQueue) Swap(i, j int) {
Expand All @@ -47,11 +47,20 @@ func (pq *PriorityQueue) Pop() interface{} {
return item
}

func (pq *PriorityQueue) Top() interface{} {
n := len(*pq)
if n == 0 {
return nil
}
ol := *pq
return *ol[0]
}

func makeTimestamp() int64 {
return time.Now().UnixNano() / int64(time.Millisecond)
}

func (pq *PriorityQueue) Update(item *Item, priority int) {
item.priority = priority
item.Priority = priority
heap.Fix(pq, item.index)
}
11 changes: 6 additions & 5 deletions queue/priorityQueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@ func TestPriorityQueue(t *testing.T) {
heap.Init(&pq)
for i := 0; i < 3; i++ {
item := &Item{
priority: i * 2,
done: make(chan struct{}),
Priority: i * 2,
Done: make(chan struct{}),
}
heap.Push(&pq, item)
}
expectedVals := []int{4, 2, 0}
actualVals := make([]int, 0)
for pq.Len() > 0 {
item := heap.Pop(&pq).(*Item)
actualVals = append(actualVals, item.priority)
topEle := pq.Top().(Item)
_ = heap.Pop(&pq).(*Item)
actualVals = append(actualVals, topEle.Priority)
}
assert.Equal(t, expectedVals, actualVals)
}
Expand All @@ -31,7 +32,7 @@ func TestPriorityQueue_SamePriority(t *testing.T) {

for i := 0; i < 3; i++ {
pq[i] = &Item{
priority: 1,
Priority: 1,
timeStamp: int64(i),
}
}
Expand Down

0 comments on commit c24be6e

Please sign in to comment.