Skip to content

Commit

Permalink
Add new query inflight request on ingester (#6081)
Browse files Browse the repository at this point in the history
* Add new query infligh request on ingester

Signed-off-by: Daniel Deluiggi <ddeluigg@amazon.com>

* Fix test naming

Signed-off-by: Daniel Deluiggi <ddeluigg@amazon.com>

* Add test for specif err interface

Signed-off-by: Daniel Deluiggi <ddeluigg@amazon.com>

---------

Signed-off-by: Daniel Deluiggi <ddeluigg@amazon.com>
  • Loading branch information
danielblando authored Jul 18, 2024
1 parent 1ba4bca commit d2a0b02
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* [FEATURE] Ingester: Experimental: Enable native histogram ingestion via `-blocks-storage.tsdb.enable-native-histograms` flag. #5986
* [FEATURE] Query Frontend: Added a query rejection mechanism to block resource-intensive queries. #6005
* [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071
* [FEATURE] Ingester: Add `ingester.instance-limits.max-inflight-query-requests` to allow limiting ingester concurrent queries. #6081
* [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987
* [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892
* [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2961,6 +2961,11 @@ instance_limits:
# CLI flag: -ingester.instance-limits.max-inflight-push-requests
[max_inflight_push_requests: <int> | default = 0]
# Max inflight query requests that this ingester can handle (across all
# tenants). Additional requests will be rejected. 0 = unlimited.
# CLI flag: -ingester.instance-limits.max-inflight-query-requests
[max_inflight_query_requests: <int> | default = 0]
# Comma-separated list of metric names, for which
# -ingester.max-series-per-metric and -ingester.max-global-series-per-metric
# limits will be ignored. Does not affect max-series-per-user or
Expand Down
48 changes: 32 additions & 16 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.Int64Var(&cfg.DefaultLimits.MaxInMemoryTenants, "ingester.instance-limits.max-tenants", 0, "Max users that this ingester can hold. Requests from additional users will be rejected. This limit only works when using blocks engine. 0 = unlimited.")
f.Int64Var(&cfg.DefaultLimits.MaxInMemorySeries, "ingester.instance-limits.max-series", 0, "Max series that this ingester can hold (across all tenants). Requests to create additional series will be rejected. This limit only works when using blocks engine. 0 = unlimited.")
f.Int64Var(&cfg.DefaultLimits.MaxInflightPushRequests, "ingester.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this ingester can handle (across all tenants). Additional requests will be rejected. 0 = unlimited.")
f.Int64Var(&cfg.DefaultLimits.MaxInflightQueryRequests, "ingester.instance-limits.max-inflight-query-requests", 0, "Max inflight query requests that this ingester can handle (across all tenants). Additional requests will be rejected. 0 = unlimited.")

f.StringVar(&cfg.IgnoreSeriesLimitForMetricNames, "ingester.ignore-series-limit-for-metric-names", "", "Comma-separated list of metric names, for which -ingester.max-series-per-metric and -ingester.max-global-series-per-metric limits will be ignored. Does not affect max-series-per-user or max-global-series-per-metric limits.")

Expand Down Expand Up @@ -1401,9 +1402,6 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
return nil, err
}

c := i.trackInflightQueryRequest()
defer c()

userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
Expand All @@ -1426,8 +1424,15 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
return nil, err
}

// We will report *this* request in the error too.
c, err := i.trackInflightQueryRequest()
if err != nil {
return nil, err
}

// It's not required to sort series from a single ingester because series are sorted by the Exemplar Storage before returning from Select.
res, err := q.Select(from, through, matchers...)
c()
if err != nil {
return nil, err
}
Expand All @@ -1452,17 +1457,13 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery

// LabelValues returns all label values that are associated with a given label name.
func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) {
c := i.trackInflightQueryRequest()
defer c()
resp, cleanup, err := i.labelsValuesCommon(ctx, req)
defer cleanup()
return resp, err
}

// LabelValuesStream returns all label values that are associated with a given label name.
func (i *Ingester) LabelValuesStream(req *client.LabelValuesRequest, stream client.Ingester_LabelValuesStreamServer) error {
c := i.trackInflightQueryRequest()
defer c()
resp, cleanup, err := i.labelsValuesCommon(stream.Context(), req)
defer cleanup()

Expand Down Expand Up @@ -1525,6 +1526,11 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
q.Close()
}

c, err := i.trackInflightQueryRequest()
if err != nil {
return nil, cleanup, err
}
defer c()
vals, _, err := q.LabelValues(ctx, labelName, matchers...)
if err != nil {
return nil, cleanup, err
Expand All @@ -1537,17 +1543,13 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu

// LabelNames return all the label names.
func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) {
c := i.trackInflightQueryRequest()
defer c()
resp, cleanup, err := i.labelNamesCommon(ctx, req)
defer cleanup()
return resp, err
}

// LabelNamesStream return all the label names.
func (i *Ingester) LabelNamesStream(req *client.LabelNamesRequest, stream client.Ingester_LabelNamesStreamServer) error {
c := i.trackInflightQueryRequest()
defer c()
resp, cleanup, err := i.labelNamesCommon(stream.Context(), req)
defer cleanup()

Expand Down Expand Up @@ -1605,6 +1607,11 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
q.Close()
}

c, err := i.trackInflightQueryRequest()
if err != nil {
return nil, cleanup, err
}
defer c()
names, _, err := q.LabelNames(ctx)
if err != nil {
return nil, cleanup, err
Expand Down Expand Up @@ -1831,9 +1838,6 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
return err
}

c := i.trackInflightQueryRequest()
defer c()

spanlog, ctx := spanlogger.New(stream.Context(), "QueryStream")
defer spanlog.Finish()

Expand Down Expand Up @@ -1879,11 +1883,18 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
return nil
}

func (i *Ingester) trackInflightQueryRequest() func() {
func (i *Ingester) trackInflightQueryRequest() (func(), error) {
gl := i.getInstanceLimits()
if gl != nil && gl.MaxInflightQueryRequests > 0 {
if i.inflightQueryRequests.Load() >= gl.MaxInflightQueryRequests {
return nil, errTooManyInflightQueryRequests
}
}

i.maxInflightQueryRequests.Track(i.inflightQueryRequests.Inc())
return func() {
i.inflightQueryRequests.Dec()
}
}, nil
}

// queryStreamChunks streams metrics from a TSDB. This implements the client.IngesterServer interface
Expand All @@ -1894,8 +1905,13 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
}
defer q.Close()

c, err := i.trackInflightQueryRequest()
if err != nil {
return 0, 0, 0, err
}
// It's not required to return sorted series because series are sorted by the Cortex querier.
ss := q.Select(ctx, false, nil, matchers...)
c()
if ss.Err() != nil {
return 0, 0, 0, ss.Err()
}
Expand Down
86 changes: 86 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2279,6 +2279,34 @@ func Test_Ingester_LabelValues(t *testing.T) {
}
}

func Test_Ingester_LabelValue_MaxInflightQueryRequest(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.DefaultLimits.MaxInflightQueryRequests = 1
i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Wait until it's ACTIVE
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
return i.lifecycler.GetState()
})

i.inflightQueryRequests.Add(1)

// Mock request
ctx := user.InjectOrgID(context.Background(), "test")

wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000)
_, err = i.Push(ctx, wreq)
require.NoError(t, err)

rreq := &client.LabelValuesRequest{}
_, err = i.LabelValues(ctx, rreq)
require.Error(t, err)
require.Equal(t, err, errTooManyInflightQueryRequests)
}

func Test_Ingester_Query(t *testing.T) {
series := []struct {
lbls labels.Labels
Expand Down Expand Up @@ -2409,6 +2437,36 @@ func Test_Ingester_Query(t *testing.T) {
})
}
}

func Test_Ingester_Query_MaxInflightQueryRequest(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.DefaultLimits.MaxInflightQueryRequests = 1
i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Wait until it's ACTIVE
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
return i.lifecycler.GetState()
})

i.inflightQueryRequests.Add(1)

// Mock request
ctx := user.InjectOrgID(context.Background(), "test")

wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000)
_, err = i.Push(ctx, wreq)
require.NoError(t, err)

rreq := &client.QueryRequest{}
s := &mockQueryStreamServer{ctx: ctx}
err = i.QueryStream(rreq, s)
require.Error(t, err)
require.Equal(t, err, errTooManyInflightQueryRequests)
}

func TestIngester_Query_ShouldNotCreateTSDBIfDoesNotExists(t *testing.T) {
i, err := prepareIngesterWithBlocksStorage(t, defaultIngesterTestConfig(t), prometheus.NewRegistry())
require.NoError(t, err)
Expand Down Expand Up @@ -4949,6 +5007,34 @@ func TestIngester_MaxExemplarsFallBack(t *testing.T) {
require.Equal(t, maxExemplars, int64(5))
}

func Test_Ingester_QueryExemplar_MaxInflightQueryRequest(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.DefaultLimits.MaxInflightQueryRequests = 1
i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Wait until it's ACTIVE
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
return i.lifecycler.GetState()
})

i.inflightQueryRequests.Add(1)

// Mock request
ctx := user.InjectOrgID(context.Background(), "test")

wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000)
_, err = i.Push(ctx, wreq)
require.NoError(t, err)

rreq := &client.ExemplarQueryRequest{}
_, err = i.QueryExemplars(ctx, rreq)
require.Error(t, err)
require.Equal(t, err, errTooManyInflightQueryRequests)
}

func generateSamplesForLabel(l labels.Labels, count int) *cortexpb.WriteRequest {
var lbls = make([]labels.Labels, 0, count)
var samples = make([]cortexpb.Sample, 0, count)
Expand Down
10 changes: 6 additions & 4 deletions pkg/ingester/instance_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ var (
errMaxUsersLimitReached = errors.New("cannot create TSDB: ingesters's max tenants limit reached")
errMaxSeriesLimitReached = errors.New("cannot add series: ingesters's max series limit reached")
errTooManyInflightPushRequests = errors.New("cannot push: too many inflight push requests in ingester")
errTooManyInflightQueryRequests = errors.New("cannot push: too many inflight query requests in ingester")
)

// InstanceLimits describes limits used by ingester. Reaching any of these will result in Push method to return
// (internal) error.
type InstanceLimits struct {
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
MaxInMemoryTenants int64 `yaml:"max_tenants"`
MaxInMemorySeries int64 `yaml:"max_series"`
MaxInflightPushRequests int64 `yaml:"max_inflight_push_requests"`
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
MaxInMemoryTenants int64 `yaml:"max_tenants"`
MaxInMemorySeries int64 `yaml:"max_series"`
MaxInflightPushRequests int64 `yaml:"max_inflight_push_requests"`
MaxInflightQueryRequests int64 `yaml:"max_inflight_query_requests"`
}

// Sets default limit values for unmarshalling.
Expand Down
14 changes: 8 additions & 6 deletions pkg/ingester/instance_limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (

func TestInstanceLimitsUnmarshal(t *testing.T) {
defaultInstanceLimits = &InstanceLimits{
MaxIngestionRate: 10,
MaxInMemoryTenants: 20,
MaxInMemorySeries: 30,
MaxInflightPushRequests: 40,
MaxIngestionRate: 10,
MaxInMemoryTenants: 20,
MaxInMemorySeries: 30,
MaxInflightPushRequests: 40,
MaxInflightQueryRequests: 50,
}

l := InstanceLimits{}
Expand All @@ -24,6 +25,7 @@ max_tenants: 50000
require.NoError(t, yaml.UnmarshalStrict([]byte(input), &l))
require.Equal(t, float64(125.678), l.MaxIngestionRate)
require.Equal(t, int64(50000), l.MaxInMemoryTenants)
require.Equal(t, int64(30), l.MaxInMemorySeries) // default value
require.Equal(t, int64(40), l.MaxInflightPushRequests) // default value
require.Equal(t, int64(30), l.MaxInMemorySeries) // default value
require.Equal(t, int64(40), l.MaxInflightPushRequests) // default value
require.Equal(t, int64(50), l.MaxInflightQueryRequests) // default value
}

0 comments on commit d2a0b02

Please sign in to comment.