Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prioritize task loading persistence requests #3217

Merged
merged 9 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions common/persistence/client/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,10 @@ func FactoryProvider(
) Factory {
var requestRatelimiter quotas.RequestRateLimiter
if params.PersistenceMaxQPS != nil && params.PersistenceMaxQPS() > 0 {
rateFn := func() float64 { return float64(params.PersistenceMaxQPS()) }

if params.PriorityRateLimiting != nil && params.PriorityRateLimiting() {
requestRatelimiter = NewPriorityRateLimiter(rateFn)
requestRatelimiter = NewPriorityRateLimiter(params.PersistenceMaxQPS)
} else {
requestRatelimiter = NewNoopPriorityRateLimiter(rateFn)
requestRatelimiter = NewNoopPriorityRateLimiter(params.PersistenceMaxQPS)
}
}

Expand Down
35 changes: 28 additions & 7 deletions common/persistence/client/quotas.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,26 +32,45 @@ import (
var (
CallerTypePriority = map[string]int{
headers.CallerTypeAPI: 0,
headers.CallerTypeBackground: 1,
headers.CallerTypeBackground: 2,
}

APIPriorityOverride = map[string]int{
"GetOrCreateShard": 0,
"UpdateShard": 0,

// this is a preprequisite for checkpoint queue process progress
// This is a preprequisite for checkpoint queue process progress
"RangeCompleteHistoryTasks": 0,

// Task resource isolation assumes task can always be loaded.
// When one namespace has high load, all task processing goroutines
// may be busy and consumes all persistence request tokens, preventing
// tasks for other namespaces to be loaded. So give task loading a higher
// priority than other background requests.
// NOTE: we also don't want task loading to consume all persistence request tokens,
// and blocks all other operations. This is done by limiting the total rps allow
// for this priority.
// TODO: exclude certain task type from this override, like replication.
"GetHistoryTasks": 1,
}

RequestPrioritiesOrdered = []int{0, 1}
RequestPrioritiesOrdered = []int{0, 1, 2}

PriorityRatePercentage = map[int]float64{
0: 1.0,
1: 0.8,
yycptt marked this conversation as resolved.
Show resolved Hide resolved
2: 1.0,
}
)

func NewPriorityRateLimiter(
rateFn quotas.RateFn,
maxQps PersistenceMaxQps,
) quotas.RequestRateLimiter {
rateLimiters := make(map[int]quotas.RateLimiter)
for priority := range RequestPrioritiesOrdered {
rateLimiters[priority] = quotas.NewDefaultOutgoingRateLimiter(rateFn)
rateLimiters[priority] = quotas.NewDefaultOutgoingRateLimiter(
func() float64 { return float64(maxQps()) * PriorityRatePercentage[priority] },
)
}

return quotas.NewPriorityRateLimiter(
Expand All @@ -72,14 +91,16 @@ func NewPriorityRateLimiter(
}

func NewNoopPriorityRateLimiter(
rateFn quotas.RateFn,
maxQps PersistenceMaxQps,
) quotas.RequestRateLimiter {
priority := RequestPrioritiesOrdered[0]

return quotas.NewPriorityRateLimiter(
func(_ quotas.Request) int { return priority },
map[int]quotas.RateLimiter{
priority: quotas.NewDefaultOutgoingRateLimiter(rateFn),
priority: quotas.NewDefaultOutgoingRateLimiter(
func() float64 { return float64(maxQps()) },
),
},
)
}
9 changes: 9 additions & 0 deletions common/persistence/client/quotas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ func (s *quotasSuite) TestAPIPriorityOverrideMapping() {
}
}

func (s *quotasSuite) TestPriorityRatePercentageMapping() {
for _, priority := range RequestPrioritiesOrdered {
percentage, ok := PriorityRatePercentage[priority]
s.True(ok)
s.LessOrEqual(percentage, 1.0)
s.GreaterOrEqual(percentage, 0.0)
}
}

func (s *quotasSuite) TestRequestPrioritiesOrdered() {
for idx := range RequestPrioritiesOrdered[1:] {
s.True(RequestPrioritiesOrdered[idx] < RequestPrioritiesOrdered[idx+1])
Expand Down