Skip to content

Commit

Permalink
Add benchmark tests
Browse files Browse the repository at this point in the history
Signed-off-by: Justin Jung <jungjust@amazon.com>
  • Loading branch information
justinjung04 committed Nov 28, 2023
1 parent ab4b9c7 commit 8086aee
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 8 deletions.
8 changes: 2 additions & 6 deletions pkg/scheduler/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,10 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers fl
return errors.New("no queue found")
}

metricLabels := prometheus.Labels{
"user": userID,
"priority": priority,
}
q.totalRequests.With(metricLabels).Inc()
q.totalRequests.WithLabelValues(userID, priority).Inc()

if queue.length() >= maxOutstandingRequests {
q.discardedRequests.With(metricLabels).Inc()
q.discardedRequests.WithLabelValues(userID, priority).Inc()
return ErrTooManyRequests
}

Expand Down
103 changes: 101 additions & 2 deletions pkg/scheduler/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func BenchmarkQueueRequest(b *testing.B) {

for n := 0; n < b.N; n++ {
q := NewRequestQueue(maxOutstandingPerTenant, 0,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user", "priority", "type"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user", "priority"}),
MockLimits{MaxOutstanding: 100},
nil,
)
Expand Down Expand Up @@ -115,6 +115,105 @@ func BenchmarkQueueRequest(b *testing.B) {
}
}

func BenchmarkGetNextRequestPriorityQueue(b *testing.B) {
const maxOutstandingPerTenant = 2
const numTenants = 50
const queriers = 5

queues := make([]*RequestQueue, 0, b.N)

for n := 0; n < b.N; n++ {
queue := NewRequestQueue(maxOutstandingPerTenant, 0,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user", "priority", "type"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user", "priority"}),
MockLimits{MaxOutstanding: 100, QueryPriorityVal: validation.QueryPriority{Enabled: true}},
nil,
)
queues = append(queues, queue)

for ix := 0; ix < queriers; ix++ {
queue.RegisterQuerierConnection(fmt.Sprintf("querier-%d", ix))
}

for i := 0; i < maxOutstandingPerTenant; i++ {
for j := 0; j < numTenants; j++ {
userID := strconv.Itoa(j)

err := queue.EnqueueRequest(userID, MockRequest{priority: int64(i)}, 0, nil)
if err != nil {
b.Fatal(err)
}
}
}
}

ctx := context.Background()
b.ResetTimer()

for i := 0; i < b.N; i++ {
idx := FirstUser()
for j := 0; j < maxOutstandingPerTenant*numTenants; j++ {
querier := ""
b:
// Find querier with at least one request to avoid blocking in getNextRequestForQuerier.
for _, q := range queues[i].queues.userQueues {
for qid := range q.queriers {
querier = qid
break b
}
}

_, nidx, err := queues[i].GetNextRequestForQuerier(ctx, idx, querier)
if err != nil {
b.Fatal(err)
}
idx = nidx
}
}
}

func BenchmarkQueueRequestPriorityQueue(b *testing.B) {
const maxOutstandingPerTenant = 2
const numTenants = 50
const queriers = 5

queues := make([]*RequestQueue, 0, b.N)
users := make([]string, 0, numTenants)
requests := make([]MockRequest, 0, numTenants)

for n := 0; n < b.N; n++ {
q := NewRequestQueue(maxOutstandingPerTenant, 0,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user", "priority", "type"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user", "priority"}),
MockLimits{MaxOutstanding: 100, QueryPriorityVal: validation.QueryPriority{Enabled: true}},
nil,
)

for ix := 0; ix < queriers; ix++ {
q.RegisterQuerierConnection(fmt.Sprintf("querier-%d", ix))
}

queues = append(queues, q)

for j := 0; j < numTenants; j++ {
requests = append(requests, MockRequest{id: fmt.Sprintf("%d-%d", n, j), priority: int64(j)})
users = append(users, strconv.Itoa(j))
}
}

b.ResetTimer()
for n := 0; n < b.N; n++ {
for i := 0; i < maxOutstandingPerTenant; i++ {
for j := 0; j < numTenants; j++ {
err := queues[n].EnqueueRequest(users[j], requests[j], 0, nil)
if err != nil {
b.Fatal(err)
}
}
}
}
}

func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBecauseQuerierHasBeenForgotten(t *testing.T) {
const forgetDelay = 3 * time.Second

Expand Down

0 comments on commit 8086aee

Please sign in to comment.