Skip to content

Commit

Permalink
Assign priority before splitting the query
Browse files Browse the repository at this point in the history
Signed-off-by: Justin Jung <jungjust@amazon.com>
  • Loading branch information
justinjung04 committed Nov 28, 2023
1 parent 4001fe4 commit aa38a8c
Show file tree
Hide file tree
Showing 24 changed files with 533 additions and 481 deletions.
16 changes: 8 additions & 8 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -5050,8 +5050,8 @@ otel:
# Priority level. Must be a unique value.
[priority: <int> | default = 0]
# Number of reserved queriers to handle priorities higher or equal to this value
# only. Value between 0 and 1 will be used as a percentage.
# Number of reserved queriers to handle priorities higher or equal to the
# priority level. Value between 0 and 1 will be used as a percentage.
[reserved_queriers: <float> | default = 0]
# List of query attributes to assign the priority.
Expand All @@ -5061,14 +5061,14 @@ otel:
### `QueryAttribute`

```yaml
# Query string regex.
[regex: <string> | default = ".*"]
# Query string regex. If set to empty string, it will not match anything.
[regex: <string> | default = ""]
# Query start time.
[start_time: <duration> | default = 0s]
# Query start time. If set to 0, the start time won't be checked.
[start_time: <int> | default = 0]
# Query end time.
[end_time: <duration> | default = 0s]
# Query end time. If set to 0, the end time won't be checked.
[end_time: <int> | default = 0]
```

### `DisabledRuleGroup`
Expand Down
9 changes: 4 additions & 5 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,8 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Body = io.NopCloser(&buf)
}

r.Header.Get("test")
startTime := time.Now()
// get config
// assign priority
// embed it to the http request, header?
// extract Decode to here, to make sure all requests pass here
// log the priority as well
resp, err := f.roundTripper.RoundTrip(r)
queryResponseTime := time.Since(startTime)

Expand Down Expand Up @@ -348,6 +344,9 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
if ua := r.Header.Get("User-Agent"); len(ua) > 0 {
logMessage = append(logMessage, "user_agent", ua)
}
if queryPriority := r.Header.Get(util.QueryPriorityHeaderKey); len(queryPriority) > 0 {
logMessage = append(logMessage, "priority", queryPriority)
}

if error != nil {
s, ok := status.FromError(error)
Expand Down
20 changes: 2 additions & 18 deletions pkg/frontend/transport/roundtripper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,14 @@ import (
"context"
"io"
"net/http"
"net/url"
"strings"
"time"

"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/httpgrpc/server"
)

// GrpcRoundTripper is similar to http.RoundTripper, but works with HTTP requests converted to protobuf messages.
type GrpcRoundTripper interface {
RoundTripGRPC(context.Context, *httpgrpc.HTTPRequest, url.Values, time.Time) (*httpgrpc.HTTPResponse, error)
RoundTripGRPC(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error)
}

func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper) http.RoundTripper {
Expand All @@ -42,20 +39,7 @@ func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, er
return nil, err
}

var (
resp *httpgrpc.HTTPResponse
reqValues url.Values
ts time.Time
)

if strings.HasSuffix(r.URL.Path, "/query") || strings.HasSuffix(r.URL.Path, "/query_range") {
if err = r.ParseForm(); err == nil {
reqValues = r.Form
ts = time.Now()
}
}

resp, err = a.roundTripper.RoundTripGRPC(r.Context(), req, reqValues, ts)
resp, err := a.roundTripper.RoundTripGRPC(r.Context(), req)
if err != nil {
return nil, err
}
Expand Down
27 changes: 8 additions & 19 deletions pkg/frontend/v1/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"flag"
"fmt"
"net/http"
"net/url"
"strconv"
"time"

"github.com/go-kit/log"
Expand All @@ -23,7 +23,6 @@ import (
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/httpgrpcutil"
util_query "github.com/cortexproject/cortex/pkg/util/query"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/validation"
)
Expand Down Expand Up @@ -98,11 +97,15 @@ type request struct {
request *httpgrpc.HTTPRequest
err chan error
response chan *httpgrpc.HTTPResponse
priority int64
}

func (r request) Priority() int64 {
return r.priority
priority, err := strconv.ParseInt(httpgrpcutil.GetHeader(*r.request, util.QueryPriorityHeaderKey), 10, 64)
if err != nil {
return 0
}

return priority
}

// New creates a new frontend. Frontend implements service, and must be started and stopped.
Expand Down Expand Up @@ -181,7 +184,7 @@ func (f *Frontend) cleanupInactiveUserMetrics(user string) {
}

// RoundTripGRPC round trips a proto (instead of a HTTP request).
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest, reqParams url.Values, ts time.Time) (*httpgrpc.HTTPResponse, error) {
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
// Propagate trace context in gRPC too - this will be ignored if using HTTP.
tracer, span := opentracing.GlobalTracer(), opentracing.SpanFromContext(ctx)
if tracer != nil && span != nil {
Expand All @@ -192,12 +195,6 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest,
}
}

tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, err
}
userID := tenant.JoinTenantIDs(tenantIDs)

return f.retry.Do(ctx, func() (*httpgrpc.HTTPResponse, error) {
request := request{
request: req,
Expand All @@ -210,14 +207,6 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest,
response: make(chan *httpgrpc.HTTPResponse, 1),
}

if reqParams != nil {
queryPriority := f.limits.QueryPriority(userID)

if queryPriority.Enabled {
request.priority = util_query.GetPriority(reqParams, ts, queryPriority)
}
}

if err := f.queueRequest(ctx, &request); err != nil {
return nil, err
}
Expand Down
13 changes: 1 addition & 12 deletions pkg/frontend/v2/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"math/rand"
"net/http"
"net/url"
"sync"
"time"

Expand All @@ -28,7 +27,6 @@ import (
"github.com/cortexproject/cortex/pkg/util/grpcclient"
"github.com/cortexproject/cortex/pkg/util/httpgrpcutil"
util_log "github.com/cortexproject/cortex/pkg/util/log"
util_query "github.com/cortexproject/cortex/pkg/util/query"
"github.com/cortexproject/cortex/pkg/util/services"
)

Expand Down Expand Up @@ -89,7 +87,6 @@ type frontendRequest struct {
request *httpgrpc.HTTPRequest
userID string
statsEnabled bool
priority int64

cancel context.CancelFunc

Expand Down Expand Up @@ -170,7 +167,7 @@ func (f *Frontend) stopping(_ error) error {
}

// RoundTripGRPC round trips a proto (instead of a HTTP request).
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest, reqParams url.Values, ts time.Time) (*httpgrpc.HTTPResponse, error) {
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
if s := f.State(); s != services.Running {
return nil, fmt.Errorf("frontend not running: %v", s)
}
Expand Down Expand Up @@ -210,14 +207,6 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest,
retryOnTooManyOutstandingRequests: f.cfg.RetryOnTooManyOutstandingRequests && f.schedulerWorkers.getWorkersCount() > 1,
}

if reqParams != nil {
queryPriority := f.limits.QueryPriority(userID)

if queryPriority.Enabled {
freq.priority = util_query.GetPriority(reqParams, ts, queryPriority)
}
}

f.requests.put(freq)
defer f.requests.delete(freq.queryID)

Expand Down
1 change: 0 additions & 1 deletion pkg/frontend/v2/frontend_scheduler_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
HttpRequest: req.request,
FrontendAddress: w.frontendAddr,
StatsEnabled: req.statsEnabled,
Priority: req.priority,
})

if err != nil {
Expand Down
13 changes: 6 additions & 7 deletions pkg/frontend/v2/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package v2
import (
"context"
"net"
"net/url"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -112,7 +111,7 @@ func TestFrontendBasicWorkflow(t *testing.T) {
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
}, 0)

resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{}, url.Values{}, time.Now())
resp, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{})
require.NoError(t, err)
require.Equal(t, int32(200), resp.Code)
require.Equal(t, []byte(body), resp.Body)
Expand Down Expand Up @@ -142,7 +141,7 @@ func TestFrontendRetryRequest(t *testing.T) {
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
}, 3)

res, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{}, url.Values{}, time.Now())
res, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{})
require.NoError(t, err)
require.Equal(t, int32(200), res.Code)
}
Expand All @@ -169,7 +168,7 @@ func TestFrontendRetryEnqueue(t *testing.T) {
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.OK}
}, 0)

_, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{}, url.Values{}, time.Now())
_, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), userID), &httpgrpc.HTTPRequest{})
require.NoError(t, err)
}

Expand All @@ -178,7 +177,7 @@ func TestFrontendEnqueueFailure(t *testing.T) {
return &schedulerpb.SchedulerToFrontend{Status: schedulerpb.SHUTTING_DOWN}
}, 0)

_, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), &httpgrpc.HTTPRequest{}, url.Values{}, time.Now())
_, err := f.RoundTripGRPC(user.InjectOrgID(context.Background(), "test"), &httpgrpc.HTTPRequest{})
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "failed to enqueue request"))
}
Expand All @@ -189,7 +188,7 @@ func TestFrontendCancellation(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()

resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{}, url.Values{}, time.Now())
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{})
require.EqualError(t, err, context.DeadlineExceeded.Error())
require.Nil(t, resp)

Expand Down Expand Up @@ -238,7 +237,7 @@ func TestFrontendFailedCancellation(t *testing.T) {
}()

// send request
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{}, url.Values{}, time.Now())
resp, err := f.RoundTripGRPC(user.InjectOrgID(ctx, "test"), &httpgrpc.HTTPRequest{})
require.EqualError(t, err, context.Canceled.Error())
require.Nil(t, resp)

Expand Down
18 changes: 2 additions & 16 deletions pkg/querier/tripperware/instantquery/instant_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@ import (
"bytes"
"context"
"fmt"
"github.com/cortexproject/cortex/pkg/util"
"io"
"net/http"
"net/url"
"sort"
"strconv"
"strings"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
Expand All @@ -26,7 +25,6 @@ import (
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
)

Expand Down Expand Up @@ -132,7 +130,7 @@ func (resp *PrometheusInstantQueryResponse) HTTPHeaders() map[string][]string {
func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (tripperware.Request, error) {
result := PrometheusRequest{Headers: map[string][]string{}}
var err error
result.Time, err = parseTimeParam(r, "time", c.now().Unix())
result.Time, err = util.ParseTimeParam(r, "time", c.now().Unix())
if err != nil {
return nil, decorateWithParamName(err, "time")
}
Expand Down Expand Up @@ -630,15 +628,3 @@ func (s *PrometheusInstantQueryData) MarshalJSON() ([]byte, error) {
return s.Result.GetRawBytes(), nil
}
}

func parseTimeParam(r *http.Request, paramName string, defaultValue int64) (int64, error) {
val := r.FormValue(paramName)
if val == "" {
val = strconv.FormatInt(defaultValue, 10)
}
result, err := util.ParseTime(val)
if err != nil {
return 0, errors.Wrapf(err, "Invalid time value for '%s'", paramName)
}
return result, nil
}
9 changes: 8 additions & 1 deletion pkg/querier/tripperware/limits.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package tripperware

import "time"
import (
"time"

"github.com/cortexproject/cortex/pkg/util/validation"
)

// Limits allows us to specify per-tenant runtime limits on the behavior of
// the query handling code.
Expand All @@ -21,4 +25,7 @@ type Limits interface {

// QueryVerticalShardSize returns the maximum number of queriers that can handle requests for this user.
QueryVerticalShardSize(userID string) int

// QueryPriority returns the query priority config for the tenant, including different priorities and their attributes.
QueryPriority(userID string) validation.QueryPriority
}
Loading

0 comments on commit aa38a8c

Please sign in to comment.