Skip to content

Commit

Permalink
Update regex to be compiled upon unmarshal
Browse files Browse the repository at this point in the history
Signed-off-by: Justin Jung <jungjust@amazon.com>
  • Loading branch information
justinjung04 committed Nov 28, 2023
1 parent 8086aee commit 584c6f9
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 142 deletions.
5 changes: 5 additions & 0 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

startTime := time.Now()
// get config
// assign priority
// embed it to the http request, header?
// extract Decode to here, to make sure all requests pass here
// log the priority as well
resp, err := f.roundTripper.RoundTrip(r)
queryResponseTime := time.Since(startTime)

Expand Down
28 changes: 1 addition & 27 deletions pkg/frontend/v1/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"fmt"
"net/http"
"net/url"
"reflect"
"sync"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -81,13 +79,6 @@ type Frontend struct {
requestQueue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService

// Used to check whether query priority config has changed
queryPriority map[string]validation.QueryPriority
queryPriorityMtx map[string]*sync.RWMutex

// Populate and reuse compiled regex until query priority config changes
compiledQueryPriority map[string]validation.QueryPriority

// Subservices manager.
subservices *services.Manager
subservicesWatcher *services.FailureWatcher
Expand Down Expand Up @@ -223,24 +214,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest,
queryPriority := f.limits.QueryPriority(userID)

if queryPriority.Enabled {
if _, exists := f.queryPriorityMtx[userID]; !exists {
f.queryPriorityMtx[userID] = &sync.RWMutex{}
}

f.queryPriorityMtx[userID].RLock()
queryPriorityChanged := !reflect.DeepEqual(f.queryPriority[userID], queryPriority)
f.queryPriorityMtx[userID].RUnlock()

if queryPriorityChanged {
f.queryPriorityMtx[userID].Lock()
f.queryPriority[userID] = queryPriority
f.compiledQueryPriority[userID] = util_query.GetCompileQueryPriority(queryPriority)
f.queryPriorityMtx[userID].Unlock()
}

f.queryPriorityMtx[userID].RLock()
request.priority = util_query.GetPriority(reqParams, ts, f.compiledQueryPriority[userID])
f.queryPriorityMtx[userID].Unlock()
request.priority = util_query.GetPriority(reqParams, ts, queryPriority)
}
}

Expand Down
28 changes: 1 addition & 27 deletions pkg/frontend/v2/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"math/rand"
"net/http"
"net/url"
"reflect"
"sync"
"time"

Expand All @@ -31,7 +30,6 @@ import (
util_log "github.com/cortexproject/cortex/pkg/util/log"
util_query "github.com/cortexproject/cortex/pkg/util/query"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/validation"
)

// Config for a Frontend.
Expand Down Expand Up @@ -76,13 +74,6 @@ type Frontend struct {

lastQueryID atomic.Uint64

// Used to check whether query priority config has changed
queryPriority map[string]validation.QueryPriority
queryPriorityMtx map[string]*sync.RWMutex

// Populate and reuse compiled regex until query priority config changes
compiledQueryPriority map[string]validation.QueryPriority

// frontend workers will read from this channel, and send request to scheduler.
requestsCh chan *frontendRequest

Expand Down Expand Up @@ -223,24 +214,7 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest,
queryPriority := f.limits.QueryPriority(userID)

if queryPriority.Enabled {
if _, exists := f.queryPriorityMtx[userID]; !exists {
f.queryPriorityMtx[userID] = &sync.RWMutex{}
}

f.queryPriorityMtx[userID].RLock()
queryPriorityChanged := !reflect.DeepEqual(f.queryPriority[userID], queryPriority)
f.queryPriorityMtx[userID].RUnlock()

if queryPriorityChanged {
f.queryPriorityMtx[userID].Lock()
f.queryPriority[userID] = queryPriority
f.compiledQueryPriority[userID] = util_query.GetCompileQueryPriority(queryPriority)
f.queryPriorityMtx[userID].Unlock()
}

f.queryPriorityMtx[userID].RLock()
freq.priority = util_query.GetPriority(reqParams, ts, f.compiledQueryPriority[userID])
f.queryPriorityMtx[userID].Unlock()
freq.priority = util_query.GetPriority(reqParams, ts, queryPriority)
}
}

Expand Down
18 changes: 0 additions & 18 deletions pkg/util/query/priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,12 @@ package query

import (
"net/url"
"regexp"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
)

func GetCompileQueryPriority(queryPriority validation.QueryPriority) validation.QueryPriority {
compiledQueryPriority := queryPriority
for i, priority := range compiledQueryPriority.Priorities {
for j, attribute := range priority.QueryAttributes {
compiledRegex, err := regexp.Compile(attribute.Regex)
if err != nil {
continue
}

attribute.CompiledRegex = compiledRegex
compiledQueryPriority.Priorities[i].QueryAttributes[j] = attribute
}
}

return compiledQueryPriority
}

func GetPriority(requestParams url.Values, now time.Time, queryPriority validation.QueryPriority) int64 {
queryParam := requestParams.Get("query")
timeParam := requestParams.Get("time")
Expand Down
112 changes: 83 additions & 29 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (
)

var errMaxGlobalSeriesPerUserValidation = errors.New("The ingester.max-global-series-per-user limit is unsupported if distributor.shard-by-all-labels is disabled")
var errDuplicateQueryPriorities = errors.New("There is a duplicate entry of priorities. Make sure they are all unique, including the default priority")
var errDuplicateQueryPriorities = errors.New("duplicate entry of priorities found. Make sure they are all unique, including the default priority")
var errCompilingQueryPriorityRegex = errors.New("error compiling query priority regex")

// Supported values for enum limits
const (
Expand Down Expand Up @@ -49,22 +50,22 @@ type DisabledRuleGroup struct {
type DisabledRuleGroups []DisabledRuleGroup

type QueryPriority struct {
Enabled bool `yaml:"enabled" doc:"nocli|description=Whether queries are assigned with priorities.|default=false"`
DefaultPriority int64 `yaml:"default_priority" doc:"nocli|description=Priority assigned to all queries by default. Must be a unique value. Use this as a baseline to make certain queries higher/lower priority.|default=0"`
Priorities []PriorityDef `yaml:"priorities" doc:"nocli|description=List of priority definitions."`
Enabled bool `yaml:"enabled" json:"enabled" doc:"nocli|description=Whether queries are assigned with priorities.|default=false"`
DefaultPriority int64 `yaml:"default_priority" json:"default_priority" doc:"nocli|description=Priority assigned to all queries by default. Must be a unique value. Use this as a baseline to make certain queries higher/lower priority.|default=0"`
Priorities []PriorityDef `yaml:"priorities" json:"priorities" doc:"nocli|description=List of priority definitions."`
}

type PriorityDef struct {
Priority int64 `yaml:"priority" doc:"nocli|description=Priority level. Must be a unique value.|default=0"`
ReservedQueriers float64 `yaml:"reserved_queriers" doc:"nocli|description=Number of reserved queriers to handle priorities higher or equal to this value only. Value between 0 and 1 will be used as a percentage.|default=0"`
QueryAttributes []QueryAttribute `yaml:"query_attributes" doc:"nocli|description=List of query attributes to assign the priority."`
Priority int64 `yaml:"priority" json:"priority" doc:"nocli|description=Priority level. Must be a unique value.|default=0"`
ReservedQueriers float64 `yaml:"reserved_queriers" json:"reserved_queriers" doc:"nocli|description=Number of reserved queriers to handle priorities higher or equal to this value only. Value between 0 and 1 will be used as a percentage.|default=0"`
QueryAttributes []QueryAttribute `yaml:"query_attributes" json:"query_attributes" doc:"nocli|description=List of query attributes to assign the priority."`
}

type QueryAttribute struct {
Regex string `yaml:"regex" doc:"nocli|description=Query string regex.|default=.*"`
CompiledRegex *regexp.Regexp `yaml:"-" doc:"nocli"`
StartTime time.Duration `yaml:"start_time" doc:"nocli|description=Query start time.|default=0s"`
EndTime time.Duration `yaml:"end_time" doc:"nocli|description=Query end time.|default=0s"`
Regex string `yaml:"regex" json:"regex" doc:"nocli|description=Query string regex.|default=.*"`
StartTime time.Duration `yaml:"start_time" json:"start_time" doc:"nocli|description=Query start time.|default=0s"`
EndTime time.Duration `yaml:"end_time" json:"end_time" doc:"nocli|description=Query end time.|default=0s"`
CompiledRegex *regexp.Regexp
}

// Limits describe all the limits for users; can be used to describe global default
Expand Down Expand Up @@ -122,8 +123,10 @@ type Limits struct {
QueryVerticalShardSize int `yaml:"query_vertical_shard_size" json:"query_vertical_shard_size" doc:"hidden"`

// Query Frontend / Scheduler enforced limits.
MaxOutstandingPerTenant int `yaml:"max_outstanding_requests_per_tenant" json:"max_outstanding_requests_per_tenant"`
QueryPriority QueryPriority `yaml:"query_priority" json:"query_priority" doc:"nocli|description=Configuration for query priority."`
MaxOutstandingPerTenant int `yaml:"max_outstanding_requests_per_tenant" json:"max_outstanding_requests_per_tenant"`
QueryPriority QueryPriority `yaml:"query_priority" json:"query_priority" doc:"nocli|description=Configuration for query priority."`
queryPriorityRegexHash string
queryPriorityCompiledRegex map[string]*regexp.Regexp

// Ruler defaults and limits.
RulerEvaluationDelay model.Duration `yaml:"ruler_evaluation_delay_duration" json:"ruler_evaluation_delay_duration"`
Expand Down Expand Up @@ -250,20 +253,6 @@ func (l *Limits) Validate(shardByAllLabels bool) error {
return errMaxGlobalSeriesPerUserValidation
}

// If query priority is enabled, do not allow duplicate priority values
if l.QueryPriority.Enabled {
queryPriority := l.QueryPriority
prioritySet := map[int64]struct{}{}
prioritySet[queryPriority.DefaultPriority] = struct{}{}
for _, priority := range queryPriority.Priorities {
if _, exists := prioritySet[priority.Priority]; exists {
return errDuplicateQueryPriorities
}

prioritySet[priority.Priority] = struct{}{}
}
}

return nil
}

Expand All @@ -280,7 +269,15 @@ func (l *Limits) UnmarshalYAML(unmarshal func(interface{}) error) error {
l.copyNotificationIntegrationLimits(defaultLimits.NotificationRateLimitPerIntegration)
}
type plain Limits
return unmarshal((*plain)(l))
if err := unmarshal((*plain)(l)); err != nil {
return err
}

if err := l.compileQueryPriorityRegex(); err != nil {
return err
}

return nil
}

// UnmarshalJSON implements the json.Unmarshaler interface.
Expand All @@ -298,7 +295,15 @@ func (l *Limits) UnmarshalJSON(data []byte) error {
dec := json.NewDecoder(bytes.NewReader(data))
dec.DisallowUnknownFields()

return dec.Decode((*plain)(l))
if err := dec.Decode((*plain)(l)); err != nil {
return err
}

if err := l.compileQueryPriorityRegex(); err != nil {
return err
}

return nil
}

func (l *Limits) copyNotificationIntegrationLimits(defaults NotificationRateLimitMap) {
Expand All @@ -308,6 +313,55 @@ func (l *Limits) copyNotificationIntegrationLimits(defaults NotificationRateLimi
}
}

func (l *Limits) hasQueryPriorityRegexChanged() bool {
var newHash string
for _, priority := range l.QueryPriority.Priorities {
for _, attribute := range priority.QueryAttributes {
newHash += attribute.Regex
}
}
if newHash != l.queryPriorityRegexHash {
l.queryPriorityRegexHash = newHash
return true
}
return false
}

func (l *Limits) compileQueryPriorityRegex() error {
if l.QueryPriority.Enabled {
hasQueryPriorityRegexChanged := l.hasQueryPriorityRegexChanged()
prioritySet := map[int64]struct{}{}
newQueryPriorityCompiledRegex := map[string]*regexp.Regexp{}

for i, priority := range l.QueryPriority.Priorities {
// Check for duplicate priority entry
if _, exists := prioritySet[priority.Priority]; exists {
return errDuplicateQueryPriorities
}
prioritySet[priority.Priority] = struct{}{}

for j, attribute := range priority.QueryAttributes {
if hasQueryPriorityRegexChanged {
compiledRegex, err := regexp.Compile(attribute.Regex)
if err != nil {
return errors.Join(errCompilingQueryPriorityRegex, err)
}
newQueryPriorityCompiledRegex[attribute.Regex] = compiledRegex
l.QueryPriority.Priorities[i].QueryAttributes[j].CompiledRegex = compiledRegex
} else {
l.QueryPriority.Priorities[i].QueryAttributes[j].CompiledRegex = l.queryPriorityCompiledRegex[attribute.Regex]
}
}
}

if hasQueryPriorityRegexChanged {
l.queryPriorityCompiledRegex = newQueryPriorityCompiledRegex
}
}

return nil
}

// When we load YAML from disk, we want the various per-customer limits
// to default to any values specified on the command line, not default
// command line values. This global contains those values. I (Tom) cannot
Expand Down
Loading

0 comments on commit 584c6f9

Please sign in to comment.