diff --git a/CHANGELOG.md b/CHANGELOG.md index 5975d8b2..1238644b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## master / unreleased - [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed. + - [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374) - [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change: - added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total` - new public interface `SizeReader: Size() int64` diff --git a/block.go b/block.go index 837f4a76..42e11d95 100644 --- a/block.go +++ b/block.go @@ -191,6 +191,9 @@ type BlockMetaCompaction struct { Level int `json:"level"` // ULIDs of all source head blocks that went into the block. Sources []ulid.ULID `json:"sources,omitempty"` + // Indicates that during compaction it resulted in a block without any samples + // so it should be deleted on the next reload. + Deletable bool `json:"deletable,omitempty"` // Short descriptions of the direct blocks that were used to create // this block. Parents []BlockDesc `json:"parents,omitempty"` diff --git a/block_test.go b/block_test.go index 789aebaa..724ab378 100644 --- a/block_test.go +++ b/block_test.go @@ -45,7 +45,7 @@ func TestSetCompactionFailed(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(tmpdir) - blockDir := createBlock(t, tmpdir, 0, 0, 0) + blockDir := createBlock(t, tmpdir, 1, 0, 0) b, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) testutil.Equals(t, false, b.meta.Compaction.Failed) @@ -91,6 +91,5 @@ func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) strin ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime(), nil) testutil.Ok(tb, err) - return filepath.Join(dir, ulid.String()) } diff --git a/compact.go b/compact.go index 49d4e586..5d8155f5 100644 --- a/compact.go +++ b/compact.go @@ -55,12 +55,17 @@ type Compactor interface { Plan(dir string) ([]string, error) // Write persists a Block into a directory. + // No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}. Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) // Compact runs compaction against the provided directories. Must // only be called concurrently with results of Plan(). // Can optionally pass a list of already open blocks, // to avoid having to reopen them. + // When resulting Block has 0 samples + // * No block is written. + // * The source dirs are marked Deletable. + // * Returns empty ulid.ULID{}. Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error) } @@ -186,13 +191,12 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) { return res, nil } - // Compact any blocks that have >5% tombstones. + // Compact any blocks with big enough time range that have >5% tombstones. for i := len(dms) - 1; i >= 0; i-- { meta := dms[i].meta if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] { break } - if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 { return []string{dms[i].dir}, nil } @@ -366,15 +370,34 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u meta := compactBlockMetas(uid, metas...) err = c.write(dest, meta, blocks...) if err == nil { - level.Info(c.logger).Log( - "msg", "compact blocks", - "count", len(blocks), - "mint", meta.MinTime, - "maxt", meta.MaxTime, - "ulid", meta.ULID, - "sources", fmt.Sprintf("%v", uids), - "duration", time.Since(start), - ) + if meta.Stats.NumSamples == 0 { + for _, b := range bs { + b.meta.Compaction.Deletable = true + if err = writeMetaFile(b.dir, &b.meta); err != nil { + level.Error(c.logger).Log( + "msg", "Failed to write 'Deletable' to meta file after compaction", + "ulid", b.meta.ULID, + ) + } + } + uid = ulid.ULID{} + level.Info(c.logger).Log( + "msg", "compact blocks resulted in empty block", + "count", len(blocks), + "sources", fmt.Sprintf("%v", uids), + "duration", time.Since(start), + ) + } else { + level.Info(c.logger).Log( + "msg", "compact blocks", + "count", len(blocks), + "mint", meta.MinTime, + "maxt", meta.MaxTime, + "ulid", meta.ULID, + "sources", fmt.Sprintf("%v", uids), + "duration", time.Since(start), + ) + } return uid, nil } @@ -413,6 +436,10 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p return uid, err } + if meta.Stats.NumSamples == 0 { + return ulid.ULID{}, nil + } + level.Info(c.logger).Log("msg", "write block", "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID) return uid, nil } @@ -490,11 +517,6 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil { return errors.Wrap(err, "write compaction") } - - if err = writeMetaFile(tmp, meta); err != nil { - return errors.Wrap(err, "write merged meta") - } - // We are explicitly closing them here to check for error even // though these are covered under defer. This is because in Windows, // you cannot delete these unless they are closed and the defer is to @@ -506,6 +528,18 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe return errors.Wrap(err, "close index writer") } + // Populated block is empty, so cleanup and exit. + if meta.Stats.NumSamples == 0 { + if err := os.RemoveAll(tmp); err != nil { + return errors.Wrap(err, "remove tmp folder after empty block failed") + } + return nil + } + + if err = writeMetaFile(tmp, meta); err != nil { + return errors.Wrap(err, "write merged meta") + } + // Create an empty tombstones file. if err := writeTombstoneFile(tmp, newMemTombstones()); err != nil { return errors.Wrap(err, "write new tombstones file") diff --git a/db.go b/db.go index 63070043..1349dfbd 100644 --- a/db.go +++ b/db.go @@ -417,7 +417,8 @@ func (db *DB) compact() (err error) { // from the block interval here. maxt: maxt - 1, } - if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil { + uid, err := db.compactor.Write(db.dir, head, mint, maxt, nil) + if err != nil { return errors.Wrap(err, "persist head block") } @@ -426,6 +427,14 @@ func (db *DB) compact() (err error) { if err := db.reload(); err != nil { return errors.Wrap(err, "reload blocks") } + if (uid == ulid.ULID{}) { + // Compaction resulted in an empty block. + // Head truncating during db.reload() depends on the persisted blocks and + // in this case no new block will be persisted so manually truncate the head. + if err = db.head.Truncate(maxt); err != nil { + return errors.Wrap(err, "head truncate failed (in compact)") + } + } runtime.GC() } @@ -588,6 +597,12 @@ func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block { return blocks[i].Meta().MaxTime > blocks[j].Meta().MaxTime }) + for _, block := range blocks { + if block.Meta().Compaction.Deletable { + deletable[block.Meta().ULID] = block + } + } + for ulid, block := range db.beyondTimeRetention(blocks) { deletable[ulid] = block } diff --git a/db_test.go b/db_test.go index 2beef0c5..21de405e 100644 --- a/db_test.go +++ b/db_test.go @@ -836,7 +836,7 @@ func TestTombstoneCleanFail(t *testing.T) { // totalBlocks should be >=2 so we have enough blocks to trigger compaction failure. totalBlocks := 2 for i := 0; i < totalBlocks; i++ { - blockDir := createBlock(t, db.Dir(), 0, 0, 0) + blockDir := createBlock(t, db.Dir(), 1, 0, 0) block, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) // Add some some fake tombstones to trigger the compaction. @@ -880,7 +880,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6 return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") } - block, err := OpenBlock(nil, createBlock(c.t, dest, 0, 0, 0), nil) + block, err := OpenBlock(nil, createBlock(c.t, dest, 1, 0, 0), nil) testutil.Ok(c.t, err) testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere. c.blocks = append(c.blocks, block) @@ -1364,6 +1364,109 @@ func TestInitializeHeadTimestamp(t *testing.T) { }) } +func TestNoEmptyBlocks(t *testing.T) { + db, close := openTestDB(t, &Options{ + BlockRanges: []int64{100}, + }) + defer close() + defer db.Close() + db.DisableCompactions() + + rangeToTriggercompaction := db.opts.BlockRanges[0]/2*3 - 1 + defaultLabel := labels.FromStrings("foo", "bar") + defaultMatcher := labels.NewMustRegexpMatcher("", ".*") + + t.Run("Test no blocks after compact with empty head.", func(t *testing.T) { + testutil.Ok(t, db.compact()) + actBlocks, err := blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(actBlocks)) + testutil.Equals(t, 0, len(actBlocks)) + testutil.Equals(t, 0, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "no compaction should be triggered here") + }) + + t.Run("Test no blocks after deleting all samples from head.", func(t *testing.T) { + app := db.Appender() + _, err := app.Add(defaultLabel, 1, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, 2, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, 3+rangeToTriggercompaction, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) + testutil.Ok(t, db.compact()) + testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here") + + actBlocks, err := blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(actBlocks)) + testutil.Equals(t, 0, len(actBlocks)) + + app = db.Appender() + _, err = app.Add(defaultLabel, 1, 0) + testutil.Assert(t, err == ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed") + + // Adding new blocks. + currentTime := db.Head().MaxTime() + _, err = app.Add(defaultLabel, currentTime, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, currentTime+1, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + + testutil.Ok(t, db.compact()) + testutil.Equals(t, 2, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here") + actBlocks, err = blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(actBlocks)) + testutil.Assert(t, len(actBlocks) == 1, "No blocks created when compacting with >0 samples") + }) + + t.Run(`When no new block is created from head, and there are some blocks on disk + compaction should not run into infinite loop (was seen during development).`, func(t *testing.T) { + oldBlocks := db.Blocks() + app := db.Appender() + currentTime := db.Head().MaxTime() + _, err := app.Add(defaultLabel, currentTime, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, currentTime+1, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) + testutil.Ok(t, db.compact()) + testutil.Equals(t, 3, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here") + testutil.Equals(t, oldBlocks, db.Blocks()) + }) + + t.Run("Test no blocks remaining after deleting all samples from disk.", func(t *testing.T) { + currentTime := db.Head().MaxTime() + blocks := []*BlockMeta{ + {MinTime: currentTime, MaxTime: currentTime + db.opts.BlockRanges[0]}, + {MinTime: currentTime + 100, MaxTime: currentTime + 100 + db.opts.BlockRanges[0]}, + } + for _, m := range blocks { + createBlock(t, db.Dir(), 2, m.MinTime, m.MaxTime) + } + + oldBlocks := db.Blocks() + testutil.Ok(t, db.reload()) // Reload the db to register the new blocks. + testutil.Equals(t, len(blocks)+len(oldBlocks), len(db.Blocks())) // Ensure all blocks are registered. + testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) + testutil.Ok(t, db.compact()) + testutil.Equals(t, 5, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here once for each block that have tombstones") + + actBlocks, err := blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(actBlocks)) + testutil.Equals(t, 1, len(actBlocks), "All samples are deleted. Only the most recent block should remain after compaction.") + }) +} + func TestDB_LabelNames(t *testing.T) { tests := []struct { // Add 'sampleLabels1' -> Test Head -> Compact -> Test Disk -> @@ -1468,12 +1571,13 @@ func TestCorrectNumTombstones(t *testing.T) { defer db.Close() blockRange := DefaultOptions.BlockRanges[0] - label := labels.FromStrings("foo", "bar") + defaultLabel := labels.FromStrings("foo", "bar") + defaultMatcher := labels.NewEqualMatcher(defaultLabel[0].Name, defaultLabel[0].Value) app := db.Appender() for i := int64(0); i < 3; i++ { for j := int64(0); j < 15; j++ { - _, err := app.Add(label, i*blockRange+j, 0) + _, err := app.Add(defaultLabel, i*blockRange+j, 0) testutil.Ok(t, err) } } @@ -1483,17 +1587,17 @@ func TestCorrectNumTombstones(t *testing.T) { testutil.Ok(t, err) testutil.Equals(t, 1, len(db.blocks)) - testutil.Ok(t, db.Delete(0, 1, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.Delete(0, 1, defaultMatcher)) testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones) // {0, 1} and {2, 3} are merged to form 1 tombstone. - testutil.Ok(t, db.Delete(2, 3, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.Delete(2, 3, defaultMatcher)) testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones) - testutil.Ok(t, db.Delete(5, 6, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.Delete(5, 6, defaultMatcher)) testutil.Equals(t, uint64(2), db.blocks[0].meta.Stats.NumTombstones) - testutil.Ok(t, db.Delete(9, 11, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.Delete(9, 11, defaultMatcher)) testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones) }