diff --git a/CHANGELOG.md b/CHANGELOG.md index 211b42c86d..4b5d91cbea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5470](https://github.com/thanos-io/thanos/pull/5470) Receive: Implement exposing TSDB stats for all tenants - [#5493](https://github.com/thanos-io/thanos/pull/5493) Compact: Added `--compact.blocks-fetch-concurrency` allowing to configure number of go routines for download blocks during compactions. - [#5527](https://github.com/thanos-io/thanos/pull/5527) Receive: Add per request limits for remote write. +- [#5520](https://github.com/thanos-io/thanos/pull/5520) Receive: Meta-monitoring based active series limiting ### Changed diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 8f6bb0beb7..eb8b5dae16 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -6,6 +6,7 @@ package main import ( "context" "io/ioutil" + "net/url" "os" "path" "strings" @@ -185,6 +186,9 @@ func runReceive( return errors.Wrap(err, "parse relabel configuration") } + // Impose active series limit only if Receiver is in Router or RouterIngestor mode, and config is provided. + seriesLimitSupported := (receiveMode == receive.RouterOnly || receiveMode == receive.RouterIngestor) && conf.maxPerTenantLimit != 0 + dbs := receive.NewMultiTSDB( conf.dataDir, logger, @@ -214,6 +218,11 @@ func runReceive( DialOpts: dialOpts, ForwardTimeout: time.Duration(*conf.forwardTimeout), TSDBStats: dbs, + SeriesLimitSupported: seriesLimitSupported, + MaxPerTenantLimit: conf.maxPerTenantLimit, + MetaMonitoringUrl: conf.metaMonitoringUrl, + MetaMonitoringHttpClient: conf.metaMonitoringHttpClient, + MetaMonitoringLimitQuery: conf.metaMonitoringLimitQuery, WriteSeriesLimit: conf.writeSeriesLimit, WriteSamplesLimit: conf.writeSamplesLimit, WriteRequestSizeLimit: conf.writeRequestSizeLimit, @@ -297,6 +306,23 @@ func runReceive( ) } + if seriesLimitSupported { + level.Info(logger).Log("msg", "setting up periodic (every 15s) meta-monitoring query for limiting cache") + { + ctx, cancel := context.WithCancel(context.Background()) + g.Add(func() error { + return runutil.Repeat(15*time.Second, ctx.Done(), func() error { + if err := webHandler.ActiveSeriesLimit.QueryMetaMonitoring(ctx, log.With(logger, "component", "receive-meta-monitoring")); err != nil { + level.Error(logger).Log("msg", "failed to query meta-monitoring", "err", err.Error()) + } + return nil + }) + }, func(err error) { + cancel() + }) + } + } + level.Debug(logger).Log("msg", "setting up periodic tenant pruning") { ctx, cancel := context.WithCancel(context.Background()) @@ -733,6 +759,11 @@ type receiveConfig struct { rwClientServerCA string rwClientServerName string + maxPerTenantLimit uint64 + metaMonitoringLimitQuery string + metaMonitoringUrl *url.URL + metaMonitoringHttpClient *extflag.PathOrContent + dataDir string labelStrs []string @@ -831,6 +862,14 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64Var(&rc.replicationFactor) + cmd.Flag("receive.tenant-limits.max-head-series", "The total number of active (head) series that a tenant is allowed to have within a Receive topology. 
For more details, refer to: https://thanos.io/tip/components/receive.md/#limiting").Hidden().Uint64Var(&rc.maxPerTenantLimit) + + cmd.Flag("receive.tenant-limits.meta-monitoring-url", "Meta-monitoring URL which is compatible with Prometheus Query API for active series limiting.").Hidden().URLVar(&rc.metaMonitoringUrl) + + cmd.Flag("receive.tenant-limits.meta-monitoring-query", "PromQL query to execute against meta-monitoring, to get the current number of active series for each tenant across Receive replicas.").Default("sum(prometheus_tsdb_head_series) by (tenant)").Hidden().StringVar(&rc.metaMonitoringLimitQuery) + + rc.metaMonitoringHttpClient = extflag.RegisterPathOrContent(cmd, "receive.tenant-limits.meta-monitoring-client", "YAML file or string with HTTP client configs for meta-monitoring.", extflag.WithHidden()) + rc.forwardTimeout = extkingpin.ModelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden()) rc.relabelConfigPath = extflag.RegisterPathOrContent(cmd, "receive.relabel-config", "YAML file that contains relabeling configuration.", extflag.WithEnvSubstitution()) diff --git a/docs/components/receive.md b/docs/components/receive.md index 035b264429..c3c96d13e7 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -115,6 +115,23 @@ The available request gates in Thanos Receive can be configured with the followi By default all gates are disabled. +## Active Series Limiting (experimental) + +Thanos Receive, in Router or RouterIngestor mode, supports limiting the active (head) series of tenants to maintain the system's stability. It uses any Prometheus Query API compatible meta-monitoring solution that consumes the metrics exposed by all receivers in the Thanos system. Such a query endpoint allows getting the number of active series per tenant (at most a scrape interval old), which is then compared with the configured limit before ingesting any tenant's remote write request. If a tenant has gone above the limit, its remote write requests fail fully. + +Every Receive Router/RouterIngestor node queries meta-monitoring for the active series of all tenants every 15 seconds and caches the results in a map. This cached result is used to limit all incoming remote write requests. + +To use the feature, one should specify the following (hidden) flags: +- `--receive.tenant-limits.max-head-series`: Specifies the total number of active (head) series for any tenant, across all replicas (including data replication), allowed by Thanos Receive. +- `--receive.tenant-limits.meta-monitoring-url`: Specifies the Prometheus Query API compatible meta-monitoring endpoint. +- `--receive.tenant-limits.meta-monitoring-query`: Optional flag to specify the PromQL query to execute against meta-monitoring. +- `--receive.tenant-limits.meta-monitoring-client`: Optional YAML file/string specifying the HTTP client config for meta-monitoring. + +NOTE: +- It is possible that Receive ingests more active series than the specified limit, as it relies on meta-monitoring, which may not always have the latest data on the current number of active series of a tenant. +- Thanos Receive performs best-effort limiting. In case meta-monitoring is down/unreachable, Thanos Receive will not impose limits and will only log errors about meta-monitoring being unreachable. The same applies when one of the receivers cannot be scraped. +- Support for configuring different limits for different tenants is planned for the future. 
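To make the meta-monitoring contract above more concrete, here is a minimal sketch (not part of this PR) of what the default query `sum(prometheus_tsdb_head_series) by (tenant)` returns from any Prometheus Query API compatible endpoint, and how such a result folds into a per-tenant map like the one Receive caches every 15 seconds. The endpoint address is hypothetical; auth/TLS (which Receive configures via `--receive.tenant-limits.meta-monitoring-client`) and error handling are intentionally omitted. The actual implementation lives in the `pkg/receive/handler.go` changes further down and uses Thanos' `promclient` and `httpconfig` packages.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"strconv"
)

// instantQueryResponse models the subset of the Prometheus instant query
// response (/api/v1/query) needed here: a vector of samples, each carrying
// a label set and a [timestamp, "value"] pair.
type instantQueryResponse struct {
	Status string `json:"status"`
	Data   struct {
		Result []struct {
			Metric map[string]string `json:"metric"`
			Value  [2]interface{}    `json:"value"`
		} `json:"result"`
	} `json:"data"`
}

func main() {
	// Hypothetical meta-monitoring Prometheus that scrapes all receivers.
	base := "http://meta-monitoring:9090"
	q := url.Values{"query": []string{`sum(prometheus_tsdb_head_series) by (tenant)`}}

	resp, err := http.Get(base + "/api/v1/query?" + q.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var res instantQueryResponse
	if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
		panic(err)
	}

	// Fold the vector into tenant -> current active (head) series, analogous
	// to the cached map Receive compares against the configured limit.
	tenantSeries := map[string]float64{}
	for _, s := range res.Data.Result {
		v, _ := strconv.ParseFloat(s.Value[1].(string), 64)
		tenantSeries[s.Metric["tenant"]] = v
	}
	fmt.Println(tenantSeries)
}
```

With a vector such as `{tenant="exceed-tenant"} 40`, the resulting map entry is what gets compared against `--receive.tenant-limits.max-head-series` before a tenant's remote write request is ingested.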
+ ## Flags ```$ mdox-exec="thanos receive --help" diff --git a/go.mod b/go.mod index 02d725b201..1062dd4097 100644 --- a/go.mod +++ b/go.mod @@ -15,9 +15,9 @@ require ( github.com/chromedp/chromedp v0.8.2 github.com/davecgh/go-spew v1.1.1 github.com/dustin/go-humanize v1.0.0 - github.com/efficientgo/e2e v0.12.1 + github.com/efficientgo/e2e v0.12.2-0.20220714084440-2f5240d8c363 github.com/efficientgo/tools/core v0.0.0-20220225185207-fe763185946b - github.com/efficientgo/tools/extkingpin v0.0.0-20220225185207-fe763185946b + github.com/efficientgo/tools/extkingpin v0.0.0-20220801101838-3312908f6a9d github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb github.com/fatih/structtag v1.2.0 github.com/felixge/fgprof v0.9.2 diff --git a/go.sum b/go.sum index c2281cb8ba..70a4587c01 100644 --- a/go.sum +++ b/go.sum @@ -296,13 +296,14 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ= github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q= -github.com/efficientgo/e2e v0.12.1 h1:ZYNTf09ptlba0I3ZStYaF7gCbevWdalriiX7usOSiFM= github.com/efficientgo/e2e v0.12.1/go.mod h1:xDHUyIqAWyVWU29Lf+BaZoavW7xAbDEvTwHWWI/3bhk= +github.com/efficientgo/e2e v0.12.2-0.20220714084440-2f5240d8c363 h1:Nw7SeBNMBrX3s0BbDlAWuGhEEDcKLteMsMmPThj4sxQ= +github.com/efficientgo/e2e v0.12.2-0.20220714084440-2f5240d8c363/go.mod h1:0Jrqcog5+GlJkbC8ulPkgyRZwq+GsvjUlNt+B2swzJ8= github.com/efficientgo/tools/core v0.0.0-20210129205121-421d0828c9a6/go.mod h1:OmVcnJopJL8d3X3sSXTiypGoUSgFq1aDGmlrdi9dn/M= github.com/efficientgo/tools/core v0.0.0-20220225185207-fe763185946b h1:ZHiD4/yE4idlbqvAO6iYCOYRzOMRpxkW+FKasRA3tsQ= github.com/efficientgo/tools/core v0.0.0-20220225185207-fe763185946b/go.mod h1:OmVcnJopJL8d3X3sSXTiypGoUSgFq1aDGmlrdi9dn/M= -github.com/efficientgo/tools/extkingpin v0.0.0-20220225185207-fe763185946b h1:rFV4ZGoCKjhOyc4vjrzuCsi9BbrxMJvwmtceN0iR4Zc= -github.com/efficientgo/tools/extkingpin v0.0.0-20220225185207-fe763185946b/go.mod h1:ZV0utlglOczUWv3ih2AbqPSoLoFzdplUYxwV62eZi6Q= +github.com/efficientgo/tools/extkingpin v0.0.0-20220801101838-3312908f6a9d h1:WZV/mrUyKS9w9r+Jdw+zq/tdGAb5LwB+H37EkMLhEMA= +github.com/efficientgo/tools/extkingpin v0.0.0-20220801101838-3312908f6a9d/go.mod h1:ZV0utlglOczUWv3ih2AbqPSoLoFzdplUYxwV62eZi6Q= github.com/elastic/go-sysinfo v1.1.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= github.com/elastic/go-sysinfo v1.8.1 h1:4Yhj+HdV6WjbCRgGdZpPJ8lZQlXZLKDAeIkmQ/VRvi4= github.com/elastic/go-sysinfo v1.8.1/go.mod h1:JfllUnzoQV/JRYymbH3dO1yggI3mV2oTKSXsDHM+uIM= @@ -1011,6 +1012,7 @@ github.com/prometheus/common v0.29.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+ github.com/prometheus/common v0.30.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= github.com/prometheus/common v0.35.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA= +github.com/prometheus/common v0.36.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA= github.com/prometheus/common v0.37.0 h1:ccBbHCgIiT9uSoFY0vX8H3zsNR5eLt17/RQLUvn8pXE= github.com/prometheus/common v0.37.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA= github.com/prometheus/common/assets v0.2.0/go.mod h1:D17UVUE12bHbim7HzwUvtqm6gwBEaDQ0F+hIGbFbccI= diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 
487fc5032e..09446fb34a 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -12,16 +12,20 @@ import ( stdlog "log" "net" "net/http" + "net/url" "sort" "strconv" "sync" "time" + extflag "github.com/efficientgo/tools/extkingpin" "github.com/thanos-io/thanos/pkg/api" statusapi "github.com/thanos-io/thanos/pkg/api/status" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/gate" + "github.com/thanos-io/thanos/pkg/httpconfig" "github.com/thanos-io/thanos/pkg/logging" + "github.com/thanos-io/thanos/pkg/promclient" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -101,12 +105,23 @@ type Options struct { ForwardTimeout time.Duration RelabelConfigs []*relabel.Config TSDBStats TSDBStats + SeriesLimitSupported bool + MaxPerTenantLimit uint64 + MetaMonitoringUrl *url.URL + MetaMonitoringHttpClient *extflag.PathOrContent + MetaMonitoringLimitQuery string WriteSeriesLimit int64 WriteSamplesLimit int64 WriteRequestSizeLimit int64 WriteRequestConcurrencyLimit int } +// activeSeriesLimiter encompasses active series limiting logic. +type activeSeriesLimiter interface { + QueryMetaMonitoring(context.Context, log.Logger) error + isUnderLimit(string, log.Logger) (bool, error) +} + // Handler serves a Prometheus remote write receiving HTTP endpoint. type Handler struct { logger log.Logger @@ -115,12 +130,13 @@ type Handler struct { options *Options listener net.Listener - mtx sync.RWMutex - hashring Hashring - peers *peerGroup - expBackoff backoff.Backoff - peerStates map[string]*retryState - receiverMode ReceiverMode + mtx sync.RWMutex + hashring Hashring + peers *peerGroup + expBackoff backoff.Backoff + peerStates map[string]*retryState + receiverMode ReceiverMode + ActiveSeriesLimit activeSeriesLimiter forwardRequests *prometheus.CounterVec replications *prometheus.CounterVec @@ -219,6 +235,11 @@ func NewHandler(logger log.Logger, o *Options) *Handler { h.replicationFactor.Set(1) } + h.ActiveSeriesLimit = NewNopSeriesLimit() + if h.options.SeriesLimitSupported { + h.ActiveSeriesLimit = NewActiveSeriesLimit(h.options, registerer, h.receiverMode, logger) + } + ins := extpromhttp.NewNopInstrumentationMiddleware() if o.Registry != nil { ins = extpromhttp.NewTenantInstrumentationMiddleware( @@ -431,6 +452,17 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { defer h.writeGate.Done() + under, err := h.ActiveSeriesLimit.isUnderLimit(tenant, tLogger) + if err != nil { + level.Error(tLogger).Log("msg", "error while limiting", "err", err.Error()) + } + + // Fail request fully if tenant has exceeded set limit. + if !under { + http.Error(w, "tenant is above active series limit", http.StatusTooManyRequests) + return + } + // ioutil.ReadAll dynamically adjust the byte slice for read data, starting from 512B. // Since this is receive hot path, grow upfront saving allocations and CPU time. compressed := bytes.Buffer{} @@ -534,6 +566,141 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { h.writeSamplesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(totalSamples)) } +// activeSeriesLimit implements activeSeriesLimiter interface. 
+type activeSeriesLimit struct { + mtx sync.RWMutex + limit uint64 + tenantCurrentSeriesMap map[string]float64 + + metaMonitoringURL *url.URL + metaMonitoringClient *http.Client + metaMonitoringQuery string + + configuredTenantLimit prometheus.Gauge + limitedRequests *prometheus.CounterVec + metaMonitoringErr prometheus.Counter +} + +func NewActiveSeriesLimit(o *Options, registerer prometheus.Registerer, r ReceiverMode, logger log.Logger) *activeSeriesLimit { + limit := &activeSeriesLimit{ + limit: o.MaxPerTenantLimit, + metaMonitoringURL: o.MetaMonitoringUrl, + metaMonitoringQuery: o.MetaMonitoringLimitQuery, + configuredTenantLimit: promauto.With(registerer).NewGauge( + prometheus.GaugeOpts{ + Name: "thanos_receive_tenant_head_series_limit", + Help: "The configured limit for active (head) series of tenants.", + }, + ), + limitedRequests: promauto.With(registerer).NewCounterVec( + prometheus.CounterOpts{ + Name: "thanos_receive_head_series_limited_requests_total", + Help: "The total number of remote write requests that have been dropped due to active series limiting.", + }, []string{"tenant"}, + ), + metaMonitoringErr: promauto.With(registerer).NewCounter( + prometheus.CounterOpts{ + Name: "thanos_receive_metamonitoring_failed_queries_total", + Help: "The total number of meta-monitoring queries that failed while limiting.", + }, + ), + } + + limit.configuredTenantLimit.Set(float64(o.MaxPerTenantLimit)) + limit.tenantCurrentSeriesMap = map[string]float64{} + + // Use specified HTTPConfig to make requests to meta-monitoring. + httpConfContentYaml, err := o.MetaMonitoringHttpClient.Content() + if err != nil { + level.Error(logger).Log("msg", "getting http client config", "err", err.Error()) + } + + httpClientConfig, err := httpconfig.NewClientConfigFromYAML(httpConfContentYaml) + if err != nil { + level.Error(logger).Log("msg", "parsing http config YAML", "err", err.Error()) + } + + limit.metaMonitoringClient, err = httpconfig.NewHTTPClient(*httpClientConfig, "meta-mon-for-limit") + if err != nil { + level.Error(logger).Log("msg", "improper http client config", "err", err.Error()) + } + + return limit +} + +// QueryMetaMonitoring queries any Prometheus Query API compatible meta-monitoring +// solution with the configured query for getting current active (head) series of all tenants. +// It then populates tenantCurrentSeries map with result. +func (a *activeSeriesLimit) QueryMetaMonitoring(ctx context.Context, logger log.Logger) error { + c := promclient.NewWithTracingClient(logger, a.metaMonitoringClient, httpconfig.ThanosUserAgent) + + vectorRes, _, err := c.QueryInstant(ctx, a.metaMonitoringURL, a.metaMonitoringQuery, time.Now(), promclient.QueryOptions{}) + if err != nil { + a.metaMonitoringErr.Inc() + return err + } + + level.Debug(logger).Log("msg", "successfully queried meta-monitoring", "vectors", len(vectorRes)) + + a.mtx.Lock() + defer a.mtx.Unlock() + // Construct map of tenant name and current HEAD series. + for _, e := range vectorRes { + for k, v := range e.Metric { + if k == "tenant" { + a.tenantCurrentSeriesMap[string(v)] = float64(e.Value) + level.Debug(logger).Log("msg", "tenant value queried", "tenant", string(v), "value", e.Value) + } + } + } + + return nil +} + +// isUnderLimit ensures that the current number of active series for a tenant does not exceed given limit. +// It does so in a best-effort way, i.e, in case meta-monitoring is unreachable, it does not impose limits. +// TODO(saswatamcode): Add capability to configure different limits for different tenants. 
+func (a *activeSeriesLimit) isUnderLimit(tenant string, logger log.Logger) (bool, error) { + a.mtx.RLock() + defer a.mtx.RUnlock() + if a.limit == 0 || a.metaMonitoringURL.Host == "" { + return true, nil + } + + // In such limiting flow, we ingest the first remote write request + // and then check meta-monitoring metric to ascertain current active + // series. As such metric is updated in intervals, it is possible + // that Receive ingests more series than the limit, before detecting that + // a tenant has exceeded the set limits. + v, ok := a.tenantCurrentSeriesMap[tenant] + if !ok { + return true, errors.New("tenant not in current series map") + } + + if v >= float64(a.limit) { + level.Error(logger).Log("msg", "tenant above limit", "currentSeries", v, "limit", a.limit) + a.limitedRequests.WithLabelValues(tenant).Inc() + return false, nil + } + + return true, nil +} + +// nopSeriesLimit implements activeSeriesLimiter interface as no-op. +type nopSeriesLimit struct{} + +func NewNopSeriesLimit() *nopSeriesLimit { + return &nopSeriesLimit{} +} + +func (a *nopSeriesLimit) QueryMetaMonitoring(_ context.Context, _ log.Logger) error { + return nil +} + +func (a *nopSeriesLimit) isUnderLimit(_ string, _ log.Logger) (bool, error) { + return true, nil +} + // forward accepts a write request, batches its time series by // corresponding endpoint, and forwards them in parallel to the // correct endpoint. Requests destined for the local node are written diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index d39055ee51..7877f3dd14 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -168,6 +168,45 @@ func NewPrometheusWithSidecarCustomImage(e e2e.Environment, name, promConfig, we return prom, sidecar } +type AvalancheOptions struct { + MetricCount string + SeriesCount string + MetricInterval string + SeriesInterval string + ValueInterval string + + RemoteURL string + RemoteWriteInterval string + RemoteBatchSize string + RemoteRequestCount string + + TenantID string +} + +func NewAvalanche(e e2e.Environment, name string, o AvalancheOptions) e2e.InstrumentedRunnable { + f := e2e.NewInstrumentedRunnable(e, name).WithPorts(map[string]int{"http": 9001}, "http").Future() + + args := e2e.BuildArgs(map[string]string{ + "--metric-count": o.MetricCount, + "--series-count": o.SeriesCount, + "--remote-url": o.RemoteURL, + "--remote-write-interval": o.RemoteWriteInterval, + "--remote-batch-size": o.RemoteBatchSize, + "--remote-requests-count": o.RemoteRequestCount, + "--value-interval": o.ValueInterval, + "--metric-interval": o.MetricInterval, + "--series-interval": o.SeriesInterval, + "--remote-tenant-header": "THANOS-TENANT", + "--remote-tenant": o.TenantID, + }) + + // Using this particular image as https://github.com/prometheus-community/avalanche/pull/25 is not merged yet. 
+ return f.Init(wrapWithDefaults(e2e.StartOptions{ + Image: "quay.io/observatorium/avalanche:make-tenant-header-configurable-2021-10-07-0a2cbf5", + Command: e2e.NewCommandWithoutEntrypoint("avalanche", args...), + })) +} + type QuerierBuilder struct { name string routePrefix string @@ -364,12 +403,15 @@ type ReceiveBuilder struct { f e2e.FutureInstrumentedRunnable - maxExemplars int - ingestion bool - hashringConfigs []receive.HashringConfig - relabelConfigs []*relabel.Config - replication int - image string + maxExemplars int + ingestion bool + limit int + metamonitoring string + metamonitoringQuery string + hashringConfigs []receive.HashringConfig + relabelConfigs []*relabel.Config + replication int + image string } func NewReceiveBuilder(e e2e.Environment, name string) *ReceiveBuilder { @@ -411,6 +453,15 @@ func (r *ReceiveBuilder) WithRelabelConfigs(relabelConfigs []*relabel.Config) *R return r } +func (r *ReceiveBuilder) WithValidationEnabled(limit int, metamonitoring string, query ...string) *ReceiveBuilder { + r.limit = limit + r.metamonitoring = metamonitoring + if len(query) > 0 { + r.metamonitoringQuery = query[0] + } + return r +} + // Init creates a Thanos Receive instance. // If ingestion is enabled it will be configured for ingesting samples. // If routing is configured (i.e. hashring configuration is provided) it routes samples to other receivers. @@ -437,6 +488,14 @@ func (r *ReceiveBuilder) Init() e2e.InstrumentedRunnable { args["--receive.local-endpoint"] = r.InternalEndpoint("grpc") } + if r.limit != 0 && r.metamonitoring != "" { + args["--receive.tenant-limits.max-head-series"] = fmt.Sprintf("%v", r.limit) + args["--receive.tenant-limits.meta-monitoring-url"] = r.metamonitoring + if r.metamonitoringQuery != "" { + args["--receive.tenant-limits.meta-monitoring-query"] = r.metamonitoringQuery + } + } + if err := os.MkdirAll(filepath.Join(r.Dir(), "data"), 0750); err != nil { return e2e.NewErrInstrumentedRunnable(r.Name(), errors.Wrap(err, "create receive dir")) } diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index c996501f2a..07ada51990 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -892,6 +892,43 @@ func instantQuery(t testing.TB, ctx context.Context, addr string, q func() strin return result } +func queryWaitAndAssert(t *testing.T, ctx context.Context, addr string, q func() string, ts func() time.Time, opts promclient.QueryOptions, expected model.Vector) { + t.Helper() + + fmt.Println("queryWaitAndAssert: Waiting for", len(expected), "results for query", q()) + var result model.Vector + + logger := log.NewLogfmtLogger(os.Stdout) + logger = log.With(logger, "ts", log.DefaultTimestampUTC) + testutil.Ok(t, runutil.RetryWithLog(logger, 5*time.Second, ctx.Done(), func() error { + res, warnings, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+addr), q(), ts(), opts) + if err != nil { + return err + } + + if len(warnings) > 0 { + return errors.Errorf("unexpected warnings %s", warnings) + } + + if len(res) != len(expected) { + return errors.Errorf("unexpected result size, expected %d; result %d: %v", len(expected), len(res), res) + } + result = res + sortResults(result) + for _, r := range result { + r.Timestamp = 0 // Does not matter for us. 
+ } + + // Retry if not expected result + if reflect.DeepEqual(expected, result) { + return nil + } + return errors.New("series are different") + })) + + testutil.Equals(t, expected, result) +} + func queryAndAssertSeries(t *testing.T, ctx context.Context, addr string, q func() string, ts func() time.Time, opts promclient.QueryOptions, expected []model.Metric) { t.Helper() diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index 53d1ebeeab..9f66f4efbb 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -12,6 +12,9 @@ import ( "time" "github.com/efficientgo/e2e" + e2edb "github.com/efficientgo/e2e/db" + e2emonitoring "github.com/efficientgo/e2e/monitoring" + "github.com/efficientgo/tools/core/pkg/backoff" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/relabel" @@ -602,4 +605,183 @@ func TestReceive(t *testing.T) { }, }) }) + + t.Run("multitenant_active_series_limiting", func(t *testing.T) { + + /* + The multitenant_active_series_limiting suite configures a hashring with + two avalanche writers and dedicated meta-monitoring. + + ┌──────────┐ ┌──────────┐ + │ │ │ │ + │Avalanche │ │Avalanche │ + │ │ │ │ + │ │ │ │ + └──────────┴──────────┐ ┌──────────┴──────────┘ + │ │ + ┌─▼─────▼──┐ + │ │ + │Router ├────────────────► Meta-monitoring + │Ingestor │ + │ │ + └──▲─┬──▲──┘ + │ │ │ + ┌──────────┐ │ │ │ ┌──────────┐ + │ │ │ │ │ │ │ + │Router ◄───────┘ │ └────────►Router │ + │Ingestor │ │ │Ingestor │ + │ ◄─────────┼───────────► │ + └────┬─────┘ │ └────┬─────┘ + │ │ │ + │ ┌────▼─────┐ │ + │ │ │ │ + └──────────► Query ◄──────────┘ + │ │ + │ │ + └──────────┘ + + NB: Made with asciiflow.com - you can copy & paste the above there to modify. + */ + + t.Parallel() + e, err := e2e.NewDockerEnvironment("e2e_multitenant_active_series_limiting") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + // This can be treated as the meta-monitoring service. + meta, err := e2emonitoring.Start(e) + testutil.Ok(t, err) + + // Setup 3 RouterIngestors with a limit of 10 active series. + ingestor1 := e2ethanos.NewReceiveBuilder(e, "i1").WithIngestionEnabled() + ingestor2 := e2ethanos.NewReceiveBuilder(e, "i2").WithIngestionEnabled() + ingestor3 := e2ethanos.NewReceiveBuilder(e, "i3").WithIngestionEnabled() + + h := receive.HashringConfig{ + Endpoints: []string{ + ingestor1.InternalEndpoint("grpc"), + ingestor2.InternalEndpoint("grpc"), + ingestor3.InternalEndpoint("grpc"), + }, + } + + i1Runnable := ingestor1.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).Init() + i2Runnable := ingestor2.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).Init() + i3Runnable := ingestor3.WithRouting(1, h).WithValidationEnabled(10, "http://"+meta.GetMonitoringRunnable().InternalEndpoint(e2edb.AccessPortName)).Init() + + testutil.Ok(t, e2e.StartAndWaitReady(i1Runnable, i2Runnable, i3Runnable)) + + querier := e2ethanos.NewQuerierBuilder(e, "1", ingestor1.InternalEndpoint("grpc"), ingestor2.InternalEndpoint("grpc"), ingestor3.InternalEndpoint("grpc")).Init() + testutil.Ok(t, e2e.StartAndWaitReady(querier)) + + testutil.Ok(t, querier.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics())) + + // We run two avalanches, one tenant which exceeds the limit, and one tenant which remains under it. 
+ + // Avalanche in this configuration, would send 5 requests each with 10 new timeseries. + // One request always fails due to TSDB not being ready for new tenant. + // So without limiting we end up with 40 timeseries and 40 samples. + avalanche1 := e2ethanos.NewAvalanche(e, "avalanche-1", + e2ethanos.AvalancheOptions{ + MetricCount: "10", + SeriesCount: "1", + MetricInterval: "30", + SeriesInterval: "3600", + ValueInterval: "3600", + + RemoteURL: e2ethanos.RemoteWriteEndpoint(ingestor1.InternalEndpoint("remote-write")), + RemoteWriteInterval: "30s", + RemoteBatchSize: "10", + RemoteRequestCount: "5", + + TenantID: "exceed-tenant", + }) + + // Avalanche in this configuration, would send 5 requests each with 5 of the same timeseries. + // One request always fails due to TSDB not being ready for new tenant. + // So we end up with 5 timeseries, 20 samples. + avalanche2 := e2ethanos.NewAvalanche(e, "avalanche-2", + e2ethanos.AvalancheOptions{ + MetricCount: "5", + SeriesCount: "1", + MetricInterval: "3600", + SeriesInterval: "3600", + ValueInterval: "3600", + + RemoteURL: e2ethanos.RemoteWriteEndpoint(ingestor1.InternalEndpoint("remote-write")), + RemoteWriteInterval: "30s", + RemoteBatchSize: "5", + RemoteRequestCount: "5", + + TenantID: "under-tenant", + }) + + testutil.Ok(t, e2e.StartAndWaitReady(avalanche1, avalanche2)) + + // Here, 3/5 requests are failed due to limiting, as one request fails due to TSDB readiness and we ingest one initial request. + // 3 limited requests belong to the exceed-tenant. + testutil.Ok(t, i1Runnable.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_receive_head_series_limited_requests_total"}, e2e.WithWaitBackoff(&backoff.Config{Min: 1 * time.Second, Max: 10 * time.Minute, MaxRetries: 200}), e2e.WaitMissingMetrics())) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + t.Cleanup(cancel) + + // Here for exceed-tenant we go above limit by 10, which results in 0 value. + queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { + return "sum(prometheus_tsdb_head_series{tenant=\"exceed-tenant\"}) - on() thanos_receive_tenant_head_series_limit{instance=\"e2e_multitenant_active_series_limiting-receive-i1:8080\", job=\"receive-i1\"}" + }, time.Now, promclient.QueryOptions{ + Deduplicate: true, + }, model.Vector{ + &model.Sample{ + Metric: model.Metric{}, + Value: model.SampleValue(0), + }, + }) + + // For under-tenant we stay at -5, as we have only pushed 5 series. + queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { + return "sum(prometheus_tsdb_head_series{tenant=\"under-tenant\"}) - on() thanos_receive_tenant_head_series_limit{instance=\"e2e_multitenant_active_series_limiting-receive-i1:8080\", job=\"receive-i1\"}" + }, time.Now, promclient.QueryOptions{ + Deduplicate: true, + }, model.Vector{ + &model.Sample{ + Metric: model.Metric{}, + Value: model.SampleValue(-5), + }, + }) + + // Query meta-monitoring solution to assert that only 10 timeseries have been ingested for exceed-tenant. + queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { return "sum(prometheus_tsdb_head_series{tenant=\"exceed-tenant\"})" }, time.Now, promclient.QueryOptions{ + Deduplicate: true, + }, model.Vector{ + &model.Sample{ + Metric: model.Metric{}, + Value: model.SampleValue(10), + }, + }) + + // Query meta-monitoring solution to assert that only 5 timeseries have been ingested for under-tenant. 
+ queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { return "sum(prometheus_tsdb_head_series{tenant=\"under-tenant\"})" }, time.Now, promclient.QueryOptions{ + Deduplicate: true, + }, model.Vector{ + &model.Sample{ + Metric: model.Metric{}, + Value: model.SampleValue(5), + }, + }) + + // Query meta-monitoring solution to assert that 3 requests were limited for exceed-tenant and none for under-tenant. + queryWaitAndAssert(t, ctx, meta.GetMonitoringRunnable().Endpoint(e2edb.AccessPortName), func() string { return "thanos_receive_head_series_limited_requests_total" }, time.Now, promclient.QueryOptions{ + Deduplicate: true, + }, model.Vector{ + &model.Sample{ + Metric: model.Metric{ + "__name__": "thanos_receive_head_series_limited_requests_total", + "instance": "e2e_multitenant_active_series_limiting-receive-i1:8080", + "job": "receive-i1", + "tenant": "exceed-tenant", + }, + Value: model.SampleValue(3), + }, + }) + }) }
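As a closing aside (not part of this PR), a small unit-test-style sketch of the `isUnderLimit` semantics the e2e test above exercises end to end: tenants at or above the configured limit are rejected, tenants below it pass, and tenants not yet present in the cached map pass with an error, since limiting is best-effort. The test name and the hand-constructed `activeSeriesLimit` literal are illustrative and assume the file sits in `package receive` next to `handler.go`.

```go
package receive

import (
	"net/url"
	"testing"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Illustrative only: exercises isUnderLimit against a pre-populated cache,
// the way QueryMetaMonitoring would fill it from meta-monitoring.
func TestActiveSeriesLimitIsUnderLimitSketch(t *testing.T) {
	a := &activeSeriesLimit{
		limit:             10,
		metaMonitoringURL: &url.URL{Scheme: "http", Host: "meta-monitoring:9090"},
		tenantCurrentSeriesMap: map[string]float64{
			"under-tenant":  5,
			"exceed-tenant": 10,
		},
		limitedRequests: promauto.With(prometheus.NewRegistry()).NewCounterVec(
			prometheus.CounterOpts{Name: "test_limited_requests_total", Help: "Test only."},
			[]string{"tenant"},
		),
	}
	logger := log.NewNopLogger()

	if under, err := a.isUnderLimit("under-tenant", logger); !under || err != nil {
		t.Fatalf("expected under-tenant to be under the limit, got %v, %v", under, err)
	}
	// At or above the limit the request must be rejected.
	if under, _ := a.isUnderLimit("exceed-tenant", logger); under {
		t.Fatal("expected exceed-tenant to be limited")
	}
	// Unknown tenants are let through (best effort), but an error is surfaced.
	if under, err := a.isUnderLimit("new-tenant", logger); !under || err == nil {
		t.Fatalf("expected unknown tenant to pass with an error, got %v, %v", under, err)
	}
}
```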