From c69de24f1e14d8fc8f056699f483c235a5a11598 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Sun, 19 Nov 2023 05:17:32 +0200 Subject: [PATCH] compact: hook nodownsamplemarkfilter into filters chain (#6893) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We have a NoDownsampleMarkFilter that we were not using before in the compactor for some reason. Hook it into the filters chain if downsampling is enabled and then trim matching ULIDs from the downsampling process. Add a test to cover this scenario. Fixes https://github.com/thanos-io/thanos/issues/6179. Signed-off-by: Giedrius Statkevičius --- cmd/thanos/compact.go | 61 ++++++++++++++++++++++------ cmd/thanos/downsample.go | 2 +- pkg/compact/downsample/downsample.go | 4 +- test/e2e/compact_test.go | 46 +++++++++++++++++++++ 4 files changed, 97 insertions(+), 16 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 7ab7e056398..b813a0d4648 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -234,6 +234,7 @@ func runCompact( ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, deleteDelay/2, conf.blockMetaFetchConcurrency) duplicateBlocksFilter := block.NewDeduplicateFilter(conf.blockMetaFetchConcurrency) noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(logger, insBkt, conf.blockMetaFetchConcurrency) + noDownsampleMarkerFilter := downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, conf.blockMetaFetchConcurrency) labelShardedMetaFilter := block.NewLabelShardedMetaFilter(relabelConfig) consistencyDelayMetaFilter := block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)) timePartitionMetaFilter := block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime) @@ -260,18 +261,21 @@ func runCompact( sy *compact.Syncer ) { + filters := []block.MetadataFilter{ + timePartitionMetaFilter, + 
labelShardedMetaFilter, + consistencyDelayMetaFilter, + ignoreDeletionMarkFilter, + block.NewReplicaLabelRemover(logger, conf.dedupReplicaLabels), + duplicateBlocksFilter, + noCompactMarkerFilter, + } + if !conf.disableDownsampling { + filters = append(filters, noDownsampleMarkerFilter) + } // Make sure all compactor meta syncs are done through Syncer.SyncMeta for readability. cf := baseMetaFetcher.NewMetaFetcher( - extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{ - timePartitionMetaFilter, - labelShardedMetaFilter, - consistencyDelayMetaFilter, - ignoreDeletionMarkFilter, - block.NewReplicaLabelRemover(logger, conf.dedupReplicaLabels), - duplicateBlocksFilter, - noCompactMarkerFilter, - }, - ) + extprom.WrapRegistererWithPrefix("thanos_", reg), filters) cf.UpdateOnChange(func(blocks []metadata.Meta, err error) { api.SetLoaded(blocks, err) }) @@ -436,12 +440,30 @@ func runCompact( return errors.Wrap(err, "sync before first pass of downsampling") } - for _, meta := range sy.Metas() { + filteredMetas := sy.Metas() + noDownsampleBlocks := noDownsampleMarkerFilter.NoDownsampleMarkedBlocks() + for ul := range noDownsampleBlocks { + delete(filteredMetas, ul) + } + + for _, meta := range filteredMetas { groupKey := meta.Thanos.GroupKey() downsampleMetrics.downsamples.WithLabelValues(groupKey) downsampleMetrics.downsampleFailures.WithLabelValues(groupKey) } - if err := downsampleBucket(ctx, logger, downsampleMetrics, insBkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, conf.blockFilesConcurrency, metadata.HashFunc(conf.hashFunc), conf.acceptMalformedIndex); err != nil { + + if err := downsampleBucket( + ctx, + logger, + downsampleMetrics, + insBkt, + filteredMetas, + downsamplingDir, + conf.downsampleConcurrency, + conf.blockFilesConcurrency, + metadata.HashFunc(conf.hashFunc), + conf.acceptMalformedIndex, + ); err != nil { return errors.Wrap(err, "first pass of downsampling failed") } @@ -449,9 +471,22 @@ func runCompact( if err := 
sy.SyncMetas(ctx); err != nil { return errors.Wrap(err, "sync before second pass of downsampling") } - if err := downsampleBucket(ctx, logger, downsampleMetrics, insBkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, conf.blockFilesConcurrency, metadata.HashFunc(conf.hashFunc), conf.acceptMalformedIndex); err != nil { + + if err := downsampleBucket( + ctx, + logger, + downsampleMetrics, + insBkt, + filteredMetas, + downsamplingDir, + conf.downsampleConcurrency, + conf.blockFilesConcurrency, + metadata.HashFunc(conf.hashFunc), + conf.acceptMalformedIndex, + ); err != nil { return errors.Wrap(err, "second pass of downsampling failed") } + level.Info(logger).Log("msg", "downsampling iterations done") } else { level.Info(logger).Log("msg", "downsampling was explicitly disabled") diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index 99132705d84..9a3418c1712 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -92,7 +92,7 @@ func RunDownsample( // While fetching blocks, filter out blocks that were marked for no downsample. metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{ block.NewDeduplicateFilter(block.FetcherConcurrency), - downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt), + downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, block.FetcherConcurrency), }) if err != nil { return errors.Wrap(err, "create meta fetcher") diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index 829cc8d4565..14963602e64 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -808,11 +808,11 @@ type GatherNoDownsampleMarkFilter struct { } // NewGatherNoDownsampleMarkFilter creates GatherNoDownsampleMarkFilter. 
-func NewGatherNoDownsampleMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader) *GatherNoDownsampleMarkFilter { +func NewGatherNoDownsampleMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader, concurrency int) *GatherNoDownsampleMarkFilter { return &GatherNoDownsampleMarkFilter{ logger: logger, bkt: bkt, - concurrency: 1, + concurrency: concurrency, } } diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go index eddeebdf683..e7fc6a03339 100644 --- a/test/e2e/compact_test.go +++ b/test/e2e/compact_test.go @@ -878,3 +878,49 @@ func ensureGETStatusCode(t testing.TB, code int, url string) { testutil.Ok(t, err) testutil.Equals(t, code, r.StatusCode) } + +func TestCompactorDownsampleIgnoresMarked(t *testing.T) { + now, err := time.Parse(time.RFC3339, "2020-03-24T08:00:00Z") + testutil.Ok(t, err) + + logger := log.NewLogfmtLogger(os.Stderr) + e, err := e2e.NewDockerEnvironment("downsample-mrkd") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + dir := filepath.Join(e.SharedDir(), "tmp") + testutil.Ok(t, os.MkdirAll(dir, os.ModePerm)) + + const bucket = "compact-test" + m := e2edb.NewMinio(e, "minio", bucket, e2edb.WithMinioTLS()) + testutil.Ok(t, e2e.StartAndWaitReady(m)) + + bktCfg := e2ethanos.NewS3Config(bucket, m.Endpoint("http"), m.Dir()) + bkt, err := s3.NewBucketWithConfig(logger, bktCfg, "test") + testutil.Ok(t, err) + + downsampledBase := blockDesc{ + series: []labels.Labels{ + labels.FromStrings("z", "1", "b", "2"), + labels.FromStrings("z", "1", "b", "5"), + }, + extLset: labels.FromStrings("case", "block-about-to-be-downsampled"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(10 * 24 * time.Hour)), + } + // New block that is marked no-downsample below, so the compactor is expected to skip it.
+ justAfterConsistencyDelay := 30 * time.Minute + + downsampledRawID, err := downsampledBase.Create(context.Background(), dir, justAfterConsistencyDelay, metadata.NoneFunc, 1200) + testutil.Ok(t, err) + testutil.Ok(t, objstore.UploadDir(context.Background(), logger, bkt, path.Join(dir, downsampledRawID.String()), downsampledRawID.String())) + testutil.Ok(t, block.MarkForNoDownsample(context.Background(), logger, bkt, downsampledRawID, metadata.ManualNoDownsampleReason, "why not", promauto.With(nil).NewCounter(prometheus.CounterOpts{}))) + + c := e2ethanos.NewCompactorBuilder(e, "working").Init(client.BucketConfig{ + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.Dir()), + }, nil) + testutil.Ok(t, e2e.StartAndWaitReady(c)) + testutil.NotOk(t, c.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{"thanos_compact_downsample_total"}, e2emon.WaitMissingMetrics())) + +}