Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query priority #5605

Merged
merged 49 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
52d520b
Separate user queue channel into two - normalQueue and highPriorityQueue
justinjung04 Oct 16, 2023
646853a
Add ReservedHighPriorityQueriers
justinjung04 Oct 17, 2023
39639c4
Change default priority for all requests to low
justinjung04 Oct 17, 2023
38945ac
Update config description
justinjung04 Oct 17, 2023
d0f7e74
Lint
justinjung04 Oct 17, 2023
fec4a90
Fix test
justinjung04 Oct 17, 2023
ad046d0
More test fix
justinjung04 Oct 17, 2023
a88ad90
Add HighPriorityQueries confing + placeholder for IsHighPriorityQuery…
justinjung04 Oct 17, 2023
b050ac1
Nit
justinjung04 Oct 17, 2023
f767614
Lint
justinjung04 Oct 17, 2023
de06e37
Implement IsPriorityQuery
justinjung04 Oct 18, 2023
8b31b1c
Add changelog
justinjung04 Oct 18, 2023
d28aa33
Pass timestamp as param
justinjung04 Oct 18, 2023
daf269b
Parse form so that range query parameters are passed to the roundtrip
justinjung04 Oct 25, 2023
3b603ba
Address comments
justinjung04 Oct 29, 2023
393d9f0
Add CompiledRegex to HighPriorityQuery
justinjung04 Oct 31, 2023
ebc2c1e
Introduce numbered priority + change config structure
justinjung04 Nov 1, 2023
07723ce
Lint
justinjung04 Nov 1, 2023
488921c
Updated docs
justinjung04 Nov 1, 2023
d8b9799
Skip regex compile if it is match all
justinjung04 Nov 1, 2023
e6b9348
Updated GetPriority to handle new config structure
justinjung04 Nov 3, 2023
5eb2d73
Add priority queue to user queue
justinjung04 Nov 7, 2023
f064b3a
Nits + priority queue test added
justinjung04 Nov 8, 2023
e41e4c1
Create user request queue test
justinjung04 Nov 8, 2023
ca2cd1c
Add reserved querier logic
justinjung04 Nov 9, 2023
067cc83
Fix tests
justinjung04 Nov 9, 2023
e1b996f
Add priority label to request and queue metrics
justinjung04 Nov 9, 2023
2f7947c
Fix tests
justinjung04 Nov 9, 2023
f9df453
Fix tests
justinjung04 Nov 9, 2023
a510f77
Refactor
justinjung04 Nov 9, 2023
717d746
Add more tests
justinjung04 Nov 10, 2023
f3a9772
Lint
justinjung04 Nov 10, 2023
87c9a0e
Update doc
justinjung04 Nov 10, 2023
506d5c2
Improve time comparison when assigning priority
justinjung04 Nov 15, 2023
c0f268d
Make reserved querier to match exact priority + change query length g…
justinjung04 Nov 15, 2023
0b98773
Bug fix
justinjung04 Nov 15, 2023
3fffc3f
Add comments
justinjung04 Nov 15, 2023
2abeffb
Address comments
justinjung04 Nov 17, 2023
ab4b9c7
Make reserved querier to handle priorities higher or equal
justinjung04 Nov 21, 2023
8086aee
Add benchmark tests
justinjung04 Nov 21, 2023
584c6f9
Update regex to be compiled upon unmarshal
justinjung04 Nov 22, 2023
4001fe4
Make start and end time check to be skipped if not specified
justinjung04 Nov 22, 2023
aa38a8c
Assign priority before splitting the query
justinjung04 Nov 24, 2023
6c1813b
Attempt to fix tests
justinjung04 Nov 28, 2023
cdd2a0c
Address comments
justinjung04 Nov 29, 2023
3f347e4
Minor improvements
justinjung04 Nov 30, 2023
3ee0b0d
Rename query start end time
justinjung04 Nov 30, 2023
c0252f1
Improve tests
justinjung04 Nov 30, 2023
b6eb74d
Nit
justinjung04 Nov 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* [CHANGE] Store Gateway: Add a new fastcache based inmemory index cache. #5619
* [CHANGE] Index Cache: Multi level cache backfilling operation becomes async. Added `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` and `-blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size` configs and metric `cortex_store_multilevel_index_cache_backfill_dropped_items_total` for number of dropped items. #5661
* [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477
* [FEATURE] Query Frontend/Scheduler: Add query priority support. #5605
* [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638

## 1.16.0 2023-11-20
Expand Down
39 changes: 39 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3041,6 +3041,18 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -frontend.max-outstanding-requests-per-tenant
[max_outstanding_requests_per_tenant: <int> | default = 100]

# Configuration for query priority.
query_priority:
# Whether queries are assigned with priorities.
[enabled: <boolean> | default = false]

# Priority assigned to all queries by default. Must be a unique value. Use
# this as a baseline to make certain queries higher/lower priority.
[default_priority: <int> | default = 0]

# List of priority definitions.
[priorities: <list of PriorityDef> | default = []]

# Duration to delay the evaluation of rules to ensure the underlying metrics
# have been pushed to Cortex.
# CLI flag: -ruler.evaluation-delay-duration
Expand Down Expand Up @@ -5032,6 +5044,33 @@ otel:
[tls_insecure_skip_verify: <boolean> | default = false]
```

### `PriorityDef`

```yaml
# Priority level. Must be a unique value.
[priority: <int> | default = 0]

# Number of reserved queriers to handle priorities higher or equal to the
# priority level. Value between 0 and 1 will be used as a percentage.
[reserved_queriers: <float> | default = 0]

# List of query attributes to assign the priority.
[query_attributes: <list of QueryAttribute> | default = []]
```

### `QueryAttribute`

```yaml
# Query string regex. If set to empty string, it will not match anything.
[regex: <string> | default = ""]

# Query start time. If set to 0, the start time won't be checked.
[start_time: <int> | default = 0]

# Query end time. If set to 0, the end time won't be checked.
[end_time: <int> | default = 0]
```
justinjung04 marked this conversation as resolved.
Show resolved Hide resolved

### `DisabledRuleGroup`

```yaml
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func InitFrontend(cfg CombinedFrontendConfig, limits v1.Limits, grpcListenPort i
cfg.FrontendV2.Port = grpcListenPort
}

fr, err := v2.NewFrontend(cfg.FrontendV2, log, reg, retry)
fr, err := v2.NewFrontend(cfg.FrontendV2, limits, log, reg, retry)
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), nil, fr, err

default:
Expand Down
4 changes: 4 additions & 0 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Body = io.NopCloser(&buf)
}

r.Header.Get("test")
justinjung04 marked this conversation as resolved.
Show resolved Hide resolved
startTime := time.Now()
resp, err := f.roundTripper.RoundTrip(r)
queryResponseTime := time.Since(startTime)
Expand Down Expand Up @@ -343,6 +344,9 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
if ua := r.Header.Get("User-Agent"); len(ua) > 0 {
logMessage = append(logMessage, "user_agent", ua)
}
if queryPriority := r.Header.Get(util.QueryPriorityHeaderKey); len(queryPriority) > 0 {
justinjung04 marked this conversation as resolved.
Show resolved Hide resolved
logMessage = append(logMessage, "priority", queryPriority)
}

if error != nil {
s, ok := status.FromError(error)
Expand Down
29 changes: 24 additions & 5 deletions pkg/frontend/v1/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"net/http"
"strconv"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -51,14 +52,19 @@ type Limits interface {

// MockLimits implements the Limits interface. Used in tests only.
type MockLimits struct {
Queriers float64
Queriers float64
queryPriority validation.QueryPriority
queue.MockLimits
}

func (l MockLimits) MaxQueriersPerUser(_ string) float64 {
return l.Queriers
}

func (l MockLimits) QueryPriority(_ string) validation.QueryPriority {
return l.queryPriority
}

// Frontend queues HTTP requests, dispatches them to backends, and handles retries
// for requests which failed.
type Frontend struct {
Expand Down Expand Up @@ -93,6 +99,15 @@ type request struct {
response chan *httpgrpc.HTTPResponse
}

func (r request) Priority() int64 {
priority, err := strconv.ParseInt(httpgrpcutil.GetHeader(*r.request, util.QueryPriorityHeaderKey), 10, 64)
if err != nil {
return 0
}

return priority
}

// New creates a new frontend. Frontend implements service, and must be started and stopped.
func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer, retry *transport.Retry) (*Frontend, error) {
f := &Frontend{
Expand All @@ -103,11 +118,11 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
queueLength: promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_query_frontend_queue_length",
Help: "Number of queries in the queue.",
}, []string{"user"}),
}, []string{"user", "priority", "type"}),
discardedRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_frontend_discarded_requests_total",
Help: "Total number of query requests discarded.",
}, []string{"user"}),
}, []string{"user", "priority"}),
queueDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_query_frontend_queue_duration_seconds",
Help: "Time spend by requests queued.",
Expand Down Expand Up @@ -160,8 +175,12 @@ func (f *Frontend) stopping(_ error) error {
}

func (f *Frontend) cleanupInactiveUserMetrics(user string) {
f.queueLength.DeleteLabelValues(user)
f.discardedRequests.DeleteLabelValues(user)
f.queueLength.DeletePartialMatch(prometheus.Labels{
"user": user,
})
f.discardedRequests.DeletePartialMatch(prometheus.Labels{
"user": user,
})
}

// RoundTripGRPC round trips a proto (instead of a HTTP request).
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/v1/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func TestFrontendMetricsCleanup(t *testing.T) {
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_query_frontend_queue_length Number of queries in the queue.
# TYPE cortex_query_frontend_queue_length gauge
cortex_query_frontend_queue_length{user="1"} 0
cortex_query_frontend_queue_length{priority="0",type="fifo",user="1"} 0
`), "cortex_query_frontend_queue_length"))

fr.cleanupInactiveUserMetrics("1")
Expand Down
12 changes: 7 additions & 5 deletions pkg/frontend/v2/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/scheduler"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
Expand Down Expand Up @@ -64,10 +65,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
type Frontend struct {
services.Service

cfg Config
log log.Logger

retry *transport.Retry
cfg Config
log log.Logger
limits scheduler.Limits
retry *transport.Retry

lastQueryID atomic.Uint64

Expand Down Expand Up @@ -112,7 +113,7 @@ type enqueueResult struct {
}

// NewFrontend creates a new frontend.
func NewFrontend(cfg Config, log log.Logger, reg prometheus.Registerer, retry *transport.Retry) (*Frontend, error) {
func NewFrontend(cfg Config, limits scheduler.Limits, log log.Logger, reg prometheus.Registerer, retry *transport.Retry) (*Frontend, error) {
requestsCh := make(chan *frontendRequest)

schedulerWorkers, err := newFrontendSchedulerWorkers(cfg, fmt.Sprintf("%s:%d", cfg.Addr, cfg.Port), requestsCh, log)
Expand All @@ -122,6 +123,7 @@ func NewFrontend(cfg Config, log log.Logger, reg prometheus.Registerer, retry *t

f := &Frontend{
cfg: cfg,
limits: limits,
log: log,
requestsCh: requestsCh,
schedulerWorkers: schedulerWorkers,
Expand Down
3 changes: 2 additions & 1 deletion pkg/frontend/v2/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/scheduler/queue"
"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
Expand Down Expand Up @@ -48,7 +49,7 @@ func setupFrontend(t *testing.T, schedulerReplyFunc func(f *Frontend, msg *sched

//logger := log.NewLogfmtLogger(os.Stdout)
logger := log.NewNopLogger()
f, err := NewFrontend(cfg, logger, nil, transport.NewRetry(maxRetries, nil))
f, err := NewFrontend(cfg, queue.MockLimits{}, logger, nil, transport.NewRetry(maxRetries, nil))
require.NoError(t, err)

frontendv2pb.RegisterFrontendForQuerierServer(server, f)
Expand Down
18 changes: 2 additions & 16 deletions pkg/querier/tripperware/instantquery/instant_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@ import (
"bytes"
"context"
"fmt"
"github.com/cortexproject/cortex/pkg/util"
justinjung04 marked this conversation as resolved.
Show resolved Hide resolved
"io"
"net/http"
"net/url"
"sort"
"strconv"
"strings"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
Expand All @@ -26,7 +25,6 @@ import (
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
)

Expand Down Expand Up @@ -132,7 +130,7 @@ func (resp *PrometheusInstantQueryResponse) HTTPHeaders() map[string][]string {
func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (tripperware.Request, error) {
result := PrometheusRequest{Headers: map[string][]string{}}
var err error
result.Time, err = parseTimeParam(r, "time", c.now().Unix())
result.Time, err = util.ParseTimeParam(r, "time", c.now().Unix())
if err != nil {
return nil, decorateWithParamName(err, "time")
}
Expand Down Expand Up @@ -630,15 +628,3 @@ func (s *PrometheusInstantQueryData) MarshalJSON() ([]byte, error) {
return s.Result.GetRawBytes(), nil
}
}

func parseTimeParam(r *http.Request, paramName string, defaultValue int64) (int64, error) {
val := r.FormValue(paramName)
if val == "" {
val = strconv.FormatInt(defaultValue, 10)
}
result, err := util.ParseTime(val)
if err != nil {
return 0, errors.Wrapf(err, "Invalid time value for '%s'", paramName)
}
return result, nil
}
9 changes: 8 additions & 1 deletion pkg/querier/tripperware/limits.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package tripperware

import "time"
import (
"time"

"github.com/cortexproject/cortex/pkg/util/validation"
)

// Limits allows us to specify per-tenant runtime limits on the behavior of
// the query handling code.
Expand All @@ -21,4 +25,7 @@ type Limits interface {

// QueryVerticalShardSize returns the maximum number of queriers that can handle requests for this user.
QueryVerticalShardSize(userID string) int

// QueryPriority returns the query priority config for the tenant, including different priorities and their attributes.
QueryPriority(userID string) validation.QueryPriority
}
94 changes: 94 additions & 0 deletions pkg/querier/tripperware/priority.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package tripperware

import (
"net/http"
"strings"
"time"

"github.com/prometheus/prometheus/promql/parser"

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
)

func GetPriority(r *http.Request, userID string, limits Limits, now time.Time) (int64, error) {
isQuery := strings.HasSuffix(r.URL.Path, "/query")
isQueryRange := strings.HasSuffix(r.URL.Path, "/query_range")
queryPriority := limits.QueryPriority(userID)
query := r.FormValue("query")

if (!isQuery && !isQueryRange) || !queryPriority.Enabled || query == "" {
return 0, nil
}

expr, err := parser.ParseExpr(query)
if err != nil {
return 0, err
}

var startTime, endTime int64
if isQuery {
if t, err := util.ParseTimeParam(r, "time", now.Unix()); err == nil {
startTime = t
endTime = t
}
} else if isQueryRange {
if st, err := util.ParseTime(r.FormValue("start")); err == nil {
if et, err := util.ParseTime(r.FormValue("end")); err == nil {
startTime = st
endTime = et
}
}
}

es := &parser.EvalStmt{
Expr: expr,
Start: util.TimeFromMillis(startTime),
End: util.TimeFromMillis(endTime),
LookbackDelta: limits.MaxQueryLookback(userID), // this is available from querier flag.
justinjung04 marked this conversation as resolved.
Show resolved Hide resolved
}

minTime, maxTime := FindMinMaxTime(es)

for _, priority := range queryPriority.Priorities {
for _, attribute := range priority.QueryAttributes {
if attribute.Regex == "" || (attribute.CompiledRegex != nil && !attribute.CompiledRegex.MatchString(query)) {
continue
}

if isWithinTimeAttributes(attribute, now, minTime, maxTime) {
return priority.Priority, nil
}
}
}

return queryPriority.DefaultPriority, nil
}

func isWithinTimeAttributes(attribute validation.QueryAttribute, now time.Time, startTime, endTime int64) bool {
if attribute.StartTime == 0 && attribute.EndTime == 0 {
return true
}

if attribute.StartTime != 0 {
startTimeThreshold := now.Add(-1 * time.Duration(attribute.StartTime).Abs()).Truncate(time.Second).Unix()
if startTime < startTimeThreshold {
return false
}
}

if attribute.EndTime != 0 {
endTimeThreshold := now.Add(-1 * time.Duration(attribute.EndTime).Abs()).Add(1 * time.Second).Truncate(time.Second).Unix()
if endTime > endTimeThreshold {
return false
}
}

return true
}

func FindMinMaxTime(s *parser.EvalStmt) (int64, int64) {
// Placeholder until Prometheus is updated to >=0.48.0
// which includes https://github.com/prometheus/prometheus/commit/9e3df532d8294d4fe3284bde7bc96db336a33552
return s.Start.Unix(), s.End.Unix()
}
Loading
Loading