Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new query inflight request on ingester #6081

Merged
merged 3 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* [FEATURE] Ingester: Experimental: Enable native histogram ingestion via `-blocks-storage.tsdb.enable-native-histograms` flag. #5986
* [FEATURE] Query Frontend: Added a query rejection mechanism to block resource-intensive queries. #6005
* [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071
* [FEATURE] Ingester: Add `ingester.instance-limits.max-inflight-query-requests` to allow limiting ingester concurrent queries. #6081
* [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987
* [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892
* [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2961,6 +2961,11 @@ instance_limits:
# CLI flag: -ingester.instance-limits.max-inflight-push-requests
[max_inflight_push_requests: <int> | default = 0]

# Max inflight query requests that this ingester can handle (across all
# tenants). Additional requests will be rejected. 0 = unlimited.
# CLI flag: -ingester.instance-limits.max-inflight-query-requests
[max_inflight_query_requests: <int> | default = 0]

# Comma-separated list of metric names, for which
# -ingester.max-series-per-metric and -ingester.max-global-series-per-metric
# limits will be ignored. Does not affect max-series-per-user or
Expand Down
48 changes: 32 additions & 16 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.Int64Var(&cfg.DefaultLimits.MaxInMemoryTenants, "ingester.instance-limits.max-tenants", 0, "Max users that this ingester can hold. Requests from additional users will be rejected. This limit only works when using blocks engine. 0 = unlimited.")
f.Int64Var(&cfg.DefaultLimits.MaxInMemorySeries, "ingester.instance-limits.max-series", 0, "Max series that this ingester can hold (across all tenants). Requests to create additional series will be rejected. This limit only works when using blocks engine. 0 = unlimited.")
f.Int64Var(&cfg.DefaultLimits.MaxInflightPushRequests, "ingester.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this ingester can handle (across all tenants). Additional requests will be rejected. 0 = unlimited.")
f.Int64Var(&cfg.DefaultLimits.MaxInflightQueryRequests, "ingester.instance-limits.max-inflight-query-requests", 0, "Max inflight query requests that this ingester can handle (across all tenants). Additional requests will be rejected. 0 = unlimited.")

f.StringVar(&cfg.IgnoreSeriesLimitForMetricNames, "ingester.ignore-series-limit-for-metric-names", "", "Comma-separated list of metric names, for which -ingester.max-series-per-metric and -ingester.max-global-series-per-metric limits will be ignored. Does not affect max-series-per-user or max-global-series-per-metric limits.")

Expand Down Expand Up @@ -1401,9 +1402,6 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
return nil, err
}

c := i.trackInflightQueryRequest()
defer c()

userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
Expand All @@ -1426,8 +1424,15 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
return nil, err
}

// We will report *this* request in the error too.
c, err := i.trackInflightQueryRequest()
if err != nil {
return nil, err
}

// It's not required to sort series from a single ingester because series are sorted by the Exemplar Storage before returning from Select.
res, err := q.Select(from, through, matchers...)
c()
if err != nil {
return nil, err
}
Expand All @@ -1452,17 +1457,13 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery

// LabelValues returns all label values that are associated with a given label name.
func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) {
c := i.trackInflightQueryRequest()
defer c()
resp, cleanup, err := i.labelsValuesCommon(ctx, req)
defer cleanup()
return resp, err
}

// LabelValuesStream returns all label values that are associated with a given label name.
func (i *Ingester) LabelValuesStream(req *client.LabelValuesRequest, stream client.Ingester_LabelValuesStreamServer) error {
c := i.trackInflightQueryRequest()
defer c()
resp, cleanup, err := i.labelsValuesCommon(stream.Context(), req)
defer cleanup()

Expand Down Expand Up @@ -1525,6 +1526,11 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
q.Close()
}

c, err := i.trackInflightQueryRequest()
if err != nil {
return nil, cleanup, err
}
defer c()
vals, _, err := q.LabelValues(ctx, labelName, matchers...)
if err != nil {
return nil, cleanup, err
Expand All @@ -1537,17 +1543,13 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu

// LabelNames return all the label names.
func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) {
c := i.trackInflightQueryRequest()
defer c()
resp, cleanup, err := i.labelNamesCommon(ctx, req)
defer cleanup()
return resp, err
}

// LabelNamesStream return all the label names.
func (i *Ingester) LabelNamesStream(req *client.LabelNamesRequest, stream client.Ingester_LabelNamesStreamServer) error {
c := i.trackInflightQueryRequest()
defer c()
resp, cleanup, err := i.labelNamesCommon(stream.Context(), req)
defer cleanup()

Expand Down Expand Up @@ -1605,6 +1607,11 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
q.Close()
}

c, err := i.trackInflightQueryRequest()
if err != nil {
return nil, cleanup, err
}
defer c()
names, _, err := q.LabelNames(ctx)
if err != nil {
return nil, cleanup, err
Expand Down Expand Up @@ -1831,9 +1838,6 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
return err
}

c := i.trackInflightQueryRequest()
defer c()

spanlog, ctx := spanlogger.New(stream.Context(), "QueryStream")
defer spanlog.Finish()

Expand Down Expand Up @@ -1879,11 +1883,18 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
return nil
}

func (i *Ingester) trackInflightQueryRequest() func() {
func (i *Ingester) trackInflightQueryRequest() (func(), error) {
gl := i.getInstanceLimits()
if gl != nil && gl.MaxInflightQueryRequests > 0 {
if i.inflightQueryRequests.Load() >= gl.MaxInflightQueryRequests {
return nil, errTooManyInflightQueryRequests
}
}

i.maxInflightQueryRequests.Track(i.inflightQueryRequests.Inc())
return func() {
i.inflightQueryRequests.Dec()
}
}, nil
}

// queryStreamChunks streams metrics from a TSDB. This implements the client.IngesterServer interface
Expand All @@ -1894,8 +1905,13 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
}
defer q.Close()

c, err := i.trackInflightQueryRequest()
if err != nil {
return 0, 0, 0, err
}
// It's not required to return sorted series because series are sorted by the Cortex querier.
ss := q.Select(ctx, false, nil, matchers...)
c()
if ss.Err() != nil {
return 0, 0, 0, ss.Err()
}
Expand Down
83 changes: 83 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2279,6 +2279,33 @@ func Test_Ingester_LabelValues(t *testing.T) {
}
}

func Test_Ingester_LabelValue_MaxInflighQueryRequest(t *testing.T) {
danielblando marked this conversation as resolved.
Show resolved Hide resolved
cfg := defaultIngesterTestConfig(t)
cfg.DefaultLimits.MaxInflightQueryRequests = 1
i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Wait until it's ACTIVE
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
return i.lifecycler.GetState()
})

i.inflightQueryRequests.Add(1)

// Mock request
ctx := user.InjectOrgID(context.Background(), "test")

wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000)
_, err = i.Push(ctx, wreq)
require.NoError(t, err)

rreq := &client.LabelValuesRequest{}
_, err = i.LabelValues(ctx, rreq)
require.Error(t, err)
danielblando marked this conversation as resolved.
Show resolved Hide resolved
}

func Test_Ingester_Query(t *testing.T) {
series := []struct {
lbls labels.Labels
Expand Down Expand Up @@ -2409,6 +2436,35 @@ func Test_Ingester_Query(t *testing.T) {
})
}
}

func Test_Ingester_Query_MaxInflighQueryRequest(t *testing.T) {
danielblando marked this conversation as resolved.
Show resolved Hide resolved
cfg := defaultIngesterTestConfig(t)
cfg.DefaultLimits.MaxInflightQueryRequests = 1
i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Wait until it's ACTIVE
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
return i.lifecycler.GetState()
})

i.inflightQueryRequests.Add(1)

// Mock request
ctx := user.InjectOrgID(context.Background(), "test")

wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000)
_, err = i.Push(ctx, wreq)
require.NoError(t, err)

rreq := &client.QueryRequest{}
s := &mockQueryStreamServer{ctx: ctx}
err = i.QueryStream(rreq, s)
require.Error(t, err)
}

func TestIngester_Query_ShouldNotCreateTSDBIfDoesNotExists(t *testing.T) {
i, err := prepareIngesterWithBlocksStorage(t, defaultIngesterTestConfig(t), prometheus.NewRegistry())
require.NoError(t, err)
Expand Down Expand Up @@ -4949,6 +5005,33 @@ func TestIngester_MaxExemplarsFallBack(t *testing.T) {
require.Equal(t, maxExemplars, int64(5))
}

func Test_Ingester_QueryExemplar_MaxInflighQueryRequest(t *testing.T) {
danielblando marked this conversation as resolved.
Show resolved Hide resolved
cfg := defaultIngesterTestConfig(t)
cfg.DefaultLimits.MaxInflightQueryRequests = 1
i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Wait until it's ACTIVE
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
return i.lifecycler.GetState()
})

i.inflightQueryRequests.Add(1)

// Mock request
ctx := user.InjectOrgID(context.Background(), "test")

wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000)
_, err = i.Push(ctx, wreq)
require.NoError(t, err)

rreq := &client.ExemplarQueryRequest{}
_, err = i.QueryExemplars(ctx, rreq)
require.Error(t, err)
}

func generateSamplesForLabel(l labels.Labels, count int) *cortexpb.WriteRequest {
var lbls = make([]labels.Labels, 0, count)
var samples = make([]cortexpb.Sample, 0, count)
Expand Down
10 changes: 6 additions & 4 deletions pkg/ingester/instance_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ var (
errMaxUsersLimitReached = errors.New("cannot create TSDB: ingesters's max tenants limit reached")
errMaxSeriesLimitReached = errors.New("cannot add series: ingesters's max series limit reached")
errTooManyInflightPushRequests = errors.New("cannot push: too many inflight push requests in ingester")
errTooManyInflightQueryRequests = errors.New("cannot push: too many inflight query requests in ingester")
)

// InstanceLimits describes limits used by ingester. Reaching any of these will result in Push method to return
// (internal) error.
type InstanceLimits struct {
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
MaxInMemoryTenants int64 `yaml:"max_tenants"`
MaxInMemorySeries int64 `yaml:"max_series"`
MaxInflightPushRequests int64 `yaml:"max_inflight_push_requests"`
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
MaxInMemoryTenants int64 `yaml:"max_tenants"`
MaxInMemorySeries int64 `yaml:"max_series"`
MaxInflightPushRequests int64 `yaml:"max_inflight_push_requests"`
MaxInflightQueryRequests int64 `yaml:"max_inflight_query_requests"`
}

// Sets default limit values for unmarshalling.
Expand Down
14 changes: 8 additions & 6 deletions pkg/ingester/instance_limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (

func TestInstanceLimitsUnmarshal(t *testing.T) {
defaultInstanceLimits = &InstanceLimits{
MaxIngestionRate: 10,
MaxInMemoryTenants: 20,
MaxInMemorySeries: 30,
MaxInflightPushRequests: 40,
MaxIngestionRate: 10,
MaxInMemoryTenants: 20,
MaxInMemorySeries: 30,
MaxInflightPushRequests: 40,
MaxInflightQueryRequests: 50,
}

l := InstanceLimits{}
Expand All @@ -24,6 +25,7 @@ max_tenants: 50000
require.NoError(t, yaml.UnmarshalStrict([]byte(input), &l))
require.Equal(t, float64(125.678), l.MaxIngestionRate)
require.Equal(t, int64(50000), l.MaxInMemoryTenants)
require.Equal(t, int64(30), l.MaxInMemorySeries) // default value
require.Equal(t, int64(40), l.MaxInflightPushRequests) // default value
require.Equal(t, int64(30), l.MaxInMemorySeries) // default value
require.Equal(t, int64(40), l.MaxInflightPushRequests) // default value
require.Equal(t, int64(50), l.MaxInflightQueryRequests) // default value
}
Loading