Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Receive: Add parameter to set out-of-order time window #5839

Merged
merged 7 commits into from
Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

- [#5814](https://github.com/thanos-io/thanos/pull/5814) Store: Add metric `thanos_bucket_store_postings_size_bytes` that shows the distribution of how many postings (in bytes) were needed for each Series() call in Thanos Store. Useful for determining limits.
- [#5801](https://github.com/thanos-io/thanos/pull/5801) Store: add a new limiter `--store.grpc.downloaded-bytes-limit` that limits the number of bytes downloaded in each Series/LabelNames/LabelValues call. Use `thanos_bucket_store_postings_size_bytes` for determining the limits.
- [#5839](https://github.com/thanos-io/thanos/pull/5839) Receive: Add parameter `--tsdb.out-of-order.time-window` to set the time window for experimental out-of-order sample ingestion. Disabled by default (set to 0s). Please note that if you enable this option and you also use the compactor, make sure you set the `--enable-vertical-compaction` flag; otherwise you risk a compactor halt.
- [#5836](https://github.com/thanos-io/thanos/pull/5836) Receive: Add hidden flag `--tsdb.memory-snapshot-on-shutdown` to enable the experimental TSDB feature that takes a memory snapshot on shutdown. This is intended to speed up receiver restarts.

### Changed
Expand Down
13 changes: 13 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ func registerReceive(app *extkingpin.App) {
MinBlockDuration: int64(time.Duration(*conf.tsdbMinBlockDuration) / time.Millisecond),
MaxBlockDuration: int64(time.Duration(*conf.tsdbMaxBlockDuration) / time.Millisecond),
RetentionDuration: int64(time.Duration(*conf.retention) / time.Millisecond),
OutOfOrderTimeWindow: int64(time.Duration(*conf.tsdbOutOfOrderTimeWindow) / time.Millisecond),
OutOfOrderCapMax: conf.tsdbOutOfOrderCapMax,
NoLockfile: conf.noLockFile,
WALCompression: conf.walCompression,
MaxExemplars: conf.tsdbMaxExemplars,
Expand Down Expand Up @@ -775,6 +777,8 @@ type receiveConfig struct {

tsdbMinBlockDuration *model.Duration
tsdbMaxBlockDuration *model.Duration
tsdbOutOfOrderTimeWindow *model.Duration
tsdbOutOfOrderCapMax int64
tsdbAllowOverlappingBlocks bool
tsdbMaxExemplars int64
tsdbWriteQueueSize int64
Expand Down Expand Up @@ -861,6 +865,15 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

rc.tsdbMaxBlockDuration = extkingpin.ModelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden())

rc.tsdbOutOfOrderTimeWindow = extkingpin.ModelDuration(cmd.Flag("tsdb.out-of-order.time-window",
"[EXPERIMENTAL] Configures the allowed time window for ingestion of out-of-order samples. Disabled (0s) by default"+
"Please note if you enable this option and you use compactor, make sure you have the --enable-vertical-compaction flag enabled, otherwise you might risk compactor halt.",
).Default("0s").Hidden())

cmd.Flag("tsdb.out-of-order.cap-max",
"[EXPERIMENTAL] Configures the maximum capacity for out-of-order chunks (in samples). If set to <=0, default value 32 is assumed.",
).Default("0").Int64Var(&rc.tsdbOutOfOrderCapMax)

cmd.Flag("tsdb.allow-overlapping-blocks", "Allow overlapping blocks, which in turn enables vertical compaction and vertical query merge. Does not do anything, enabled all the time.").Default("false").BoolVar(&rc.tsdbAllowOverlappingBlocks)

cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").BoolVar(&rc.walCompression)
Expand Down
4 changes: 4 additions & 0 deletions docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,10 @@ Flags:
--tsdb.no-lockfile Do not create lockfile in TSDB data directory.
In any case, the lockfiles will be deleted on
next startup.
--tsdb.out-of-order.cap-max=0
[EXPERIMENTAL] Configures the maximum capacity
for out-of-order chunks (in samples). If set to
<=0, default value 32 is assumed.
--tsdb.path="./data" Data directory of TSDB.
--tsdb.retention=15d How long to retain raw samples on local
storage. 0d - disables this retention.
Expand Down
8 changes: 8 additions & 0 deletions pkg/receive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
numSamplesOutOfOrder = 0
numSamplesDuplicates = 0
numSamplesOutOfBounds = 0
numSamplesTooOld = 0

numExemplarsOutOfOrder = 0
numExemplarsDuplicate = 0
Expand Down Expand Up @@ -120,6 +121,9 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
case storage.ErrOutOfBounds:
numSamplesOutOfBounds++
level.Debug(tLogger).Log("msg", "Out of bounds metric", "lset", lset, "value", s.Value, "timestamp", s.Timestamp)
case storage.ErrTooOldSample:
numSamplesTooOld++
level.Debug(tLogger).Log("msg", "Sample is too old", "lset", lset, "value", s.Value, "timestamp", s.Timestamp)
default:
if err != nil {
level.Debug(tLogger).Log("msg", "Error ingesting sample", "err", err)
Expand Down Expand Up @@ -185,6 +189,10 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
level.Warn(tLogger).Log("msg", "Error on ingesting samples that are too old or are too far into the future", "numDropped", numSamplesOutOfBounds)
errs.Add(errors.Wrapf(storage.ErrOutOfBounds, "add %d samples", numSamplesOutOfBounds))
}
if numSamplesTooOld > 0 {
level.Warn(tLogger).Log("msg", "Error on ingesting samples that are outside of the allowed out-of-order time window", "numDropped", numSamplesTooOld)
errs.Add(errors.Wrapf(storage.ErrTooOldSample, "add %d samples", numSamplesTooOld))
}

if numExemplarsOutOfOrder > 0 {
level.Warn(tLogger).Log("msg", "Error on ingesting out-of-order exemplars", "numDropped", numExemplarsOutOfOrder)
Expand Down