diff --git a/CHANGELOG.md b/CHANGELOG.md index 63f9780896..aea72a57ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5814](https://github.com/thanos-io/thanos/pull/5814) Store: Add metric `thanos_bucket_store_postings_size_bytes` that shows the distribution of how many postings (in bytes) were needed for each Series() call in Thanos Store. Useful for determining limits. - [#5801](https://github.com/thanos-io/thanos/pull/5801) Store: add a new limiter `--store.grpc.downloaded-bytes-limit` that limits the number of bytes downloaded in each Series/LabelNames/LabelValues call. Use `thanos_bucket_store_postings_size_bytes` for determining the limits. +- [#5839](https://github.com/thanos-io/thanos/pull/5839) Receive: Add parameter `--tsdb.out-of-order.time-window` to set the time window for experimental out-of-order samples ingestion. Disabled by default (set to 0s). Please note that if you enable this option and use the compactor, you must also set the compactor's `--compact.enable-vertical-compaction` flag; otherwise you risk a compactor halt. - [#5836](https://github.com/thanos-io/thanos/pull/5836) Receive: Add hidden flag `tsdb.memory-snapshot-on-shutdown` to enable experimental TSDB feature to snapshot on shutdown. This is intended to speed up receiver restart. 
### Changed diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 05c38507a7..7474d33a63 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -80,6 +80,8 @@ func registerReceive(app *extkingpin.App) { MinBlockDuration: int64(time.Duration(*conf.tsdbMinBlockDuration) / time.Millisecond), MaxBlockDuration: int64(time.Duration(*conf.tsdbMaxBlockDuration) / time.Millisecond), RetentionDuration: int64(time.Duration(*conf.retention) / time.Millisecond), + OutOfOrderTimeWindow: int64(time.Duration(*conf.tsdbOutOfOrderTimeWindow) / time.Millisecond), + OutOfOrderCapMax: conf.tsdbOutOfOrderCapMax, NoLockfile: conf.noLockFile, WALCompression: conf.walCompression, MaxExemplars: conf.tsdbMaxExemplars, @@ -775,6 +777,8 @@ type receiveConfig struct { tsdbMinBlockDuration *model.Duration tsdbMaxBlockDuration *model.Duration + tsdbOutOfOrderTimeWindow *model.Duration + tsdbOutOfOrderCapMax int64 tsdbAllowOverlappingBlocks bool tsdbMaxExemplars int64 tsdbWriteQueueSize int64 @@ -861,6 +865,15 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { rc.tsdbMaxBlockDuration = extkingpin.ModelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden()) + rc.tsdbOutOfOrderTimeWindow = extkingpin.ModelDuration(cmd.Flag("tsdb.out-of-order.time-window", + "[EXPERIMENTAL] Configures the allowed time window for ingestion of out-of-order samples. Disabled (0s) by default. "+ + "Please note if you enable this option and you use compactor, make sure you have the --compact.enable-vertical-compaction flag enabled, otherwise you might risk compactor halt.", + ).Default("0s").Hidden()) + + cmd.Flag("tsdb.out-of-order.cap-max", + "[EXPERIMENTAL] Configures the maximum capacity for out-of-order chunks (in samples). 
If set to <=0, default value 32 is assumed.", + ).Default("0").Int64Var(&rc.tsdbOutOfOrderCapMax) + cmd.Flag("tsdb.allow-overlapping-blocks", "Allow overlapping blocks, which in turn enables vertical compaction and vertical query merge. Does not do anything, enabled all the time.").Default("false").BoolVar(&rc.tsdbAllowOverlappingBlocks) cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").BoolVar(&rc.walCompression) diff --git a/docs/components/receive.md b/docs/components/receive.md index fbf4f9a4db..87dc38bf4e 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -356,6 +356,10 @@ Flags: --tsdb.no-lockfile Do not create lockfile in TSDB data directory. In any case, the lockfiles will be deleted on next startup. + --tsdb.out-of-order.cap-max=0 + [EXPERIMENTAL] Configures the maximum capacity + for out-of-order chunks (in samples). If set to + <=0, default value 32 is assumed. --tsdb.path="./data" Data directory of TSDB. --tsdb.retention=15d How long to retain raw samples on local storage. 0d - disables this retention. 
diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index 31b735710f..dbd3751536 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -50,6 +50,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR numSamplesOutOfOrder = 0 numSamplesDuplicates = 0 numSamplesOutOfBounds = 0 + numSamplesTooOld = 0 numExemplarsOutOfOrder = 0 numExemplarsDuplicate = 0 @@ -120,6 +121,9 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR case storage.ErrOutOfBounds: numSamplesOutOfBounds++ level.Debug(tLogger).Log("msg", "Out of bounds metric", "lset", lset, "value", s.Value, "timestamp", s.Timestamp) + case storage.ErrTooOldSample: + numSamplesTooOld++ + level.Debug(tLogger).Log("msg", "Sample is too old", "lset", lset, "value", s.Value, "timestamp", s.Timestamp) default: if err != nil { level.Debug(tLogger).Log("msg", "Error ingesting sample", "err", err) @@ -185,6 +189,10 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR level.Warn(tLogger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "numDropped", numSamplesOutOfBounds) errs.Add(errors.Wrapf(storage.ErrOutOfBounds, "add %d samples", numSamplesOutOfBounds)) } + if numSamplesTooOld > 0 { + level.Warn(tLogger).Log("msg", "Error on ingesting samples that are outside of the allowed out-of-order time window", "numDropped", numSamplesTooOld) + errs.Add(errors.Wrapf(storage.ErrTooOldSample, "add %d samples", numSamplesTooOld)) + } if numExemplarsOutOfOrder > 0 { level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order exemplars", "numDropped", numExemplarsOutOfOrder)