From beb0d735ed5b906d7d4cd687bc5ff4c5e0b5c279 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Sat, 8 Sep 2018 08:28:45 +0530 Subject: [PATCH 01/21] Dont write empty blocks Signed-off-by: Ganesh Vernekar --- compact.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/compact.go b/compact.go index 1da13005..31d7e1e5 100644 --- a/compact.go +++ b/compact.go @@ -471,10 +471,6 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe return errors.Wrap(err, "write compaction") } - if err = writeMetaFile(tmp, meta); err != nil { - return errors.Wrap(err, "write merged meta") - } - if err = chunkw.Close(); err != nil { return errors.Wrap(err, "close chunk writer") } @@ -482,6 +478,17 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe return errors.Wrap(err, "close index writer") } + if meta.Stats.NumSamples == 0 { + if err := os.RemoveAll(tmp); err != nil { + return errors.Wrap(err, "remove tmp folder after empty block failed") + } + return nil + } + + if err = writeMetaFile(tmp, meta); err != nil { + return errors.Wrap(err, "write merged meta") + } + // Create an empty tombstones file. if err := writeTombstoneFile(tmp, NewMemTombstones()); err != nil { return errors.Wrap(err, "write new tombstones file") From bb76c82a3d7941065ed7c79f8280671ec8916a5e Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 12 Sep 2018 11:32:48 +0530 Subject: [PATCH 02/21] Added unit test TestNoEmptyBlock Signed-off-by: Ganesh Vernekar --- db_test.go | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/db_test.go b/db_test.go index 9c175118..c7477e35 100644 --- a/db_test.go +++ b/db_test.go @@ -1301,3 +1301,53 @@ func TestInitializeHeadTimestamp(t *testing.T) { testutil.Equals(t, int64(15000), db.head.MaxTime()) }) } + +func TestNoEmptyBlock(t *testing.T) { + db, close := openTestDB(t, nil) + defer close() + defer db.Close() + + // Test no blocks after compact with empty head. + _, err := db.compact() + testutil.Ok(t, err) + testutil.Equals(t, 0, len(db.blocks)) + + blockRange := DefaultOptions.BlockRanges[0] + label := labels.FromStrings("foo", "bar") + + app := db.Appender() + for i := int64(0); i < 3; i++ { + _, err := app.Add(label, i*blockRange, 0) + testutil.Ok(t, err) + _, err = app.Add(label, i*blockRange+1000, 0) + testutil.Ok(t, err) + } + err = app.Commit() + testutil.Ok(t, err) + + // Test no blocks after deleting all samples from head. + testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar"))) + _, err = db.compact() + testutil.Ok(t, err) + testutil.Equals(t, 0, len(db.blocks)) + + app = db.Appender() + for i := int64(3); i < 6; i++ { + _, err := app.Add(label, i*blockRange, 0) + testutil.Ok(t, err) + _, err = app.Add(label, i*blockRange+1000, 0) + testutil.Ok(t, err) + } + err = app.Commit() + testutil.Ok(t, err) + + _, err = db.compact() + testutil.Ok(t, err) + testutil.Equals(t, 1, len(db.blocks)) + + // No blocks after deleting all samples from disk. + testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar"))) + _, err = db.compact() + testutil.Ok(t, err) + testutil.Equals(t, 0, len(db.blocks)) +} From c4edbccf6afa820d622e0442b2392ea9667f077f Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Tue, 18 Sep 2018 19:41:08 +0530 Subject: [PATCH 03/21] Fix infinite loop while compacting head. When number of blocks becomes 0, head is not truncated in reload(). This fixes it. Signed-off-by: Ganesh Vernekar --- db.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/db.go b/db.go index e6a0a74b..452d58e3 100644 --- a/db.go +++ b/db.go @@ -383,6 +383,22 @@ func (db *DB) compact() (changes bool, err error) { if err := db.reload(); err != nil { return changes, errors.Wrap(err, "reload blocks") } + // After this is fixed https://github.com/prometheus/tsdb/issues/309, + // if the compaction of head did not create any new blocks (possible + // if the samples were deleted in that range), and if number of + // blocks are 0, db.reload() does not truncate head. Hence MinTime + // of head is not set properly. So only for that situation, we need + // to call db.head.Truncate(..) manually. + db.mtx.RLock() + l := len(db.blocks) + db.mtx.RUnlock() + if l == 0 { + err = db.head.Truncate(maxt) + if err != nil { + return changes, errors.Wrap(err, "head truncate failed (in compact)") + + } + } runtime.GC() } From 9de39263d5b0c0e242679b365081f021e35f980d Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 19 Sep 2018 12:40:27 +0530 Subject: [PATCH 04/21] Fix deletion of old blocks after no block is written. With previous design if a block is not created from parent blocks, the parent block gets undeleted. This adds `Deletable` field in `BlockMetaCompaction` which helps find such parent blocks. Signed-off-by: Ganesh Vernekar --- block.go | 4 ++++ compact.go | 31 +++++++++++++++++++++++-------- db.go | 3 +++ db_test.go | 27 ++++++++++++++++++--------- 4 files changed, 48 insertions(+), 17 deletions(-) diff --git a/block.go b/block.go index 1a45fb97..677b93c6 100644 --- a/block.go +++ b/block.go @@ -178,6 +178,10 @@ type BlockMetaCompaction struct { Level int `json:"level"` // ULIDs of all source head blocks that went into the block. Sources []ulid.ULID `json:"sources,omitempty"` + // As we dont write empty blocks, we need this to mark + // if the block is deletable if in compaction this block + // resulted in an empty block. + Deletable bool `json:"deletable,omitempty"` // Short descriptions of the direct blocks that were used to create // this block. Parents []BlockDesc `json:"parents,omitempty"` diff --git a/compact.go b/compact.go index 31d7e1e5..82737eb0 100644 --- a/compact.go +++ b/compact.go @@ -349,14 +349,29 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID, meta := compactBlockMetas(uid, metas...) err = c.write(dest, meta, blocks...) if err == nil { - level.Info(c.logger).Log( - "msg", "compact blocks", - "count", len(blocks), - "mint", meta.MinTime, - "maxt", meta.MaxTime, - "ulid", meta.ULID, - "sources", fmt.Sprintf("%v", uids), - ) + + if meta.Stats.NumSamples == 0 { + level.Info(c.logger).Log( + "msg", "compact blocks [resulted in empty block]", + "count", len(blocks), + "sources", fmt.Sprintf("%v", uids), + ) + for _, b := range bs { + b.meta.Compaction.Deletable = true + writeMetaFile(b.dir, &b.meta) + } + uid = ulid.ULID{} + } else { + level.Info(c.logger).Log( + "msg", "compact blocks", + "count", len(blocks), + "mint", meta.MinTime, + "maxt", meta.MaxTime, + "ulid", meta.ULID, + "sources", fmt.Sprintf("%v", uids), + ) + } + return uid, nil } diff --git a/db.go b/db.go index 452d58e3..c5c79cc1 100644 --- a/db.go +++ b/db.go @@ -484,6 +484,9 @@ func (db *DB) reload() (err error) { corrupted[ulid] = err continue } + if meta.Compaction.Deletable { + deleteable[meta.ULID] = struct{}{} + } if db.beyondRetention(meta) { deleteable[meta.ULID] = struct{}{} continue diff --git a/db_test.go b/db_test.go index c7477e35..f3c3cef6 100644 --- a/db_test.go +++ b/db_test.go @@ -1312,6 +1312,7 @@ func TestNoEmptyBlock(t *testing.T) { testutil.Ok(t, err) testutil.Equals(t, 0, len(db.blocks)) + // Test no blocks after deleting all samples from head. blockRange := DefaultOptions.BlockRanges[0] label := labels.FromStrings("foo", "bar") @@ -1325,29 +1326,37 @@ func TestNoEmptyBlock(t *testing.T) { err = app.Commit() testutil.Ok(t, err) - // Test no blocks after deleting all samples from head. testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar"))) _, err = db.compact() testutil.Ok(t, err) + // No blocks created. testutil.Equals(t, 0, len(db.blocks)) + // Test no blocks remaining after small samples are deleted from disk. app = db.Appender() - for i := int64(3); i < 6; i++ { - _, err := app.Add(label, i*blockRange, 0) - testutil.Ok(t, err) - _, err = app.Add(label, i*blockRange+1000, 0) - testutil.Ok(t, err) + for i := int64(3); i < 20; i++ { + for j := int64(0); j < 10; j++ { + _, err := app.Add(label, i*blockRange+j, 0) + testutil.Ok(t, err) + } } err = app.Commit() testutil.Ok(t, err) _, err = db.compact() testutil.Ok(t, err) - testutil.Equals(t, 1, len(db.blocks)) + testutil.Assert(t, len(db.blocks) > 0, "No blocks created") - // No blocks after deleting all samples from disk. testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar"))) - _, err = db.compact() + + // Mimicking small part of compaction. + plan := []string{} + for _, b := range db.blocks { + plan = append(plan, b.Dir()) + } + _, err = db.compactor.Compact(db.dir, plan...) testutil.Ok(t, err) + testutil.Ok(t, db.reload()) + // All samples are deleted. No blocks should be remeianing after compact. testutil.Equals(t, 0, len(db.blocks)) } From ab14c9c85ea534dd7a224a4e5bfbcea3fde8b1a8 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Thu, 20 Sep 2018 00:25:59 +0530 Subject: [PATCH 05/21] Fix review comments Signed-off-by: Ganesh Vernekar --- block.go | 2 +- compact.go | 3 +++ db.go | 14 ++++++++------ db_test.go | 37 +++++++++++++++++++++++++++++++------ 4 files changed, 43 insertions(+), 13 deletions(-) diff --git a/block.go b/block.go index 677b93c6..232f96e3 100644 --- a/block.go +++ b/block.go @@ -179,7 +179,7 @@ type BlockMetaCompaction struct { // ULIDs of all source head blocks that went into the block. Sources []ulid.ULID `json:"sources,omitempty"` // As we dont write empty blocks, we need this to mark - // if the block is deletable if in compaction this block + // if the block is deletable if during compaction this block // resulted in an empty block. Deletable bool `json:"deletable,omitempty"` // Short descriptions of the direct blocks that were used to create diff --git a/compact.go b/compact.go index 82737eb0..d4a2dd30 100644 --- a/compact.go +++ b/compact.go @@ -59,6 +59,8 @@ type Compactor interface { // Compact runs compaction against the provided directories. Must // only be called concurrently with results of Plan(). + // Compaction resulting in an empty block are not written to the disk + // and marks all parents as deletable in its meta data. Compact(dest string, dirs ...string) (ulid.ULID, error) } @@ -493,6 +495,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe return errors.Wrap(err, "close index writer") } + // Populated block is empty, so cleanup and exit. if meta.Stats.NumSamples == 0 { if err := os.RemoveAll(tmp); err != nil { return errors.Wrap(err, "remove tmp folder after empty block failed") diff --git a/db.go b/db.go index c5c79cc1..66407a2b 100644 --- a/db.go +++ b/db.go @@ -383,16 +383,18 @@ func (db *DB) compact() (changes bool, err error) { if err := db.reload(); err != nil { return changes, errors.Wrap(err, "reload blocks") } - // After this is fixed https://github.com/prometheus/tsdb/issues/309, - // if the compaction of head did not create any new blocks (possible - // if the samples were deleted in that range), and if number of - // blocks are 0, db.reload() does not truncate head. Hence MinTime - // of head is not set properly. So only for that situation, we need - // to call db.head.Truncate(..) manually. db.mtx.RLock() l := len(db.blocks) db.mtx.RUnlock() if l == 0 { + // After this was fixed https://github.com/prometheus/tsdb/issues/309, + // if the compaction of head did not create any new blocks (if all samples + // were deleted), it is possible to have 0 blocks after compaction. + // + // db.reload() doesn't truncate the head when the block count is zero. + // + // In this case we have 0 blocks because all data has been deleted + // so the head needs to be reset. err = db.head.Truncate(maxt) if err != nil { return changes, errors.Wrap(err, "head truncate failed (in compact)") diff --git a/db_test.go b/db_test.go index f3c3cef6..83e30dcf 100644 --- a/db_test.go +++ b/db_test.go @@ -1302,7 +1302,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { }) } -func TestNoEmptyBlock(t *testing.T) { +func TestNoEmptyBlocks(t *testing.T) { db, close := openTestDB(t, nil) defer close() defer db.Close() @@ -1317,7 +1317,7 @@ func TestNoEmptyBlock(t *testing.T) { label := labels.FromStrings("foo", "bar") app := db.Appender() - for i := int64(0); i < 3; i++ { + for i := int64(0); i < 6; i++ { _, err := app.Add(label, i*blockRange, 0) testutil.Ok(t, err) _, err = app.Add(label, i*blockRange+1000, 0) @@ -1326,15 +1326,18 @@ func TestNoEmptyBlock(t *testing.T) { err = app.Commit() testutil.Ok(t, err) + oldHeadMinT := db.head.MinTime() testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar"))) _, err = db.compact() testutil.Ok(t, err) + // Making sure that head was modified. + testutil.Assert(t, oldHeadMinT < db.head.MinTime(), "Head was not changed after compaction.") // No blocks created. testutil.Equals(t, 0, len(db.blocks)) - // Test no blocks remaining after small samples are deleted from disk. + // Test no blocks remaining after deleting all samples from disk. app = db.Appender() - for i := int64(3); i < 20; i++ { + for i := int64(7); i < 25; i++ { for j := int64(0); j < 10; j++ { _, err := app.Add(label, i*blockRange+j, 0) testutil.Ok(t, err) @@ -1345,17 +1348,39 @@ func TestNoEmptyBlock(t *testing.T) { _, err = db.compact() testutil.Ok(t, err) - testutil.Assert(t, len(db.blocks) > 0, "No blocks created") + testutil.Assert(t, len(db.blocks) > 0, "No blocks created when compacting with >0 samples") testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar"))) - // Mimicking small part of compaction. + // Mimicking Plan() of compactor and getting list + // of all block directories to pass for compaction. plan := []string{} for _, b := range db.blocks { plan = append(plan, b.Dir()) } + + // Blocks are not set deletable before compaction. + for _, b := range db.blocks { + meta, err := readMetaFile(b.dir) + testutil.Ok(t, err) + testutil.Assert(t, !meta.Compaction.Deletable, "Block was marked deletable before compaction") + } + + // No new blocks are created by Compact, and marks all old blocks as deletable. + oldLen := len(db.blocks) _, err = db.compactor.Compact(db.dir, plan...) testutil.Ok(t, err) + // Number of blocks are the same. + testutil.Equals(t, oldLen, len(db.blocks)) + + // Marked as deletable. + for _, b := range db.blocks { + meta, err := readMetaFile(b.dir) + testutil.Ok(t, err) + testutil.Assert(t, meta.Compaction.Deletable, "Block was not marked deletable after compaction") + } + + // Deletes the deletable blocks. testutil.Ok(t, db.reload()) // All samples are deleted. No blocks should be remeianing after compact. testutil.Equals(t, 0, len(db.blocks)) From 897dd33ca1b5a67adfc51e8368a230d8c5dbd300 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Fri, 28 Sep 2018 18:18:24 +0530 Subject: [PATCH 06/21] Fixed infinite loop for head compaction Signed-off-by: Ganesh Vernekar --- compact.go | 4 ++++ db.go | 24 +++++++++--------------- db_test.go | 30 ++++++++++++++++++------------ 3 files changed, 31 insertions(+), 27 deletions(-) diff --git a/compact.go b/compact.go index 6dc55295..87f1bb3b 100644 --- a/compact.go +++ b/compact.go @@ -412,6 +412,10 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p return uid, err } + if meta.Stats.NumSamples == 0 { + return ulid.ULID{}, nil + } + level.Info(c.logger).Log("msg", "write block", "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID) return uid, nil } diff --git a/db.go b/db.go index 0c1ac82d..caa3c8e9 100644 --- a/db.go +++ b/db.go @@ -401,7 +401,8 @@ func (db *DB) compact() (err error) { // from the block interval here. maxt: maxt - 1, } - if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil { + uid, err := db.compactor.Write(db.dir, head, mint, maxt, nil) + if err != nil { return errors.Wrap(err, "persist head block") } @@ -410,22 +411,15 @@ func (db *DB) compact() (err error) { if err := db.reload(); err != nil { return errors.Wrap(err, "reload blocks") } - db.mtx.RLock() - l := len(db.blocks) - db.mtx.RUnlock() - if l == 0 { + if (uid == ulid.ULID{}) { // No block created. // After this was fixed https://github.com/prometheus/tsdb/issues/309, - // if the compaction of head did not create any new blocks (if all samples - // were deleted), it is possible to have 0 blocks after compaction. - // - // db.reload() doesn't truncate the head when the block count is zero. - // - // In this case we have 0 blocks because all data has been deleted - // so the head needs to be reset. - err = db.head.Truncate(maxt) - if err != nil { + // Case 1: It is possible to have 0 blocks after compaction. + // db.reload() doesn't truncate the head when the block count is zero. + // Case 2: If there were blocks on disk, head will be truncated to a + // wrong value based on old blocks. + // Hence we need to truncate manually. + if err = db.head.Truncate(maxt); err != nil { return errors.Wrap(err, "head truncate failed (in compact)") - } } runtime.GC() diff --git a/db_test.go b/db_test.go index c3da91c6..f6220fbc 100644 --- a/db_test.go +++ b/db_test.go @@ -1309,8 +1309,7 @@ func TestNoEmptyBlocks(t *testing.T) { defer db.Close() // Test no blocks after compact with empty head. - err := db.compact() - testutil.Ok(t, err) + testutil.Ok(t, db.compact()) testutil.Equals(t, 0, len(db.blocks)) // Test no blocks after deleting all samples from head. @@ -1324,19 +1323,16 @@ func TestNoEmptyBlocks(t *testing.T) { _, err = app.Add(label, i*blockRange+1000, 0) testutil.Ok(t, err) } - err = app.Commit() - testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) oldHeadMinT := db.head.MinTime() testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar"))) - err = db.compact() - testutil.Ok(t, err) + testutil.Ok(t, db.compact()) // Making sure that head was modified. testutil.Assert(t, oldHeadMinT < db.head.MinTime(), "Head was not changed after compaction.") // No blocks created. testutil.Equals(t, 0, len(db.blocks)) - // Test no blocks remaining after deleting all samples from disk. app = db.Appender() for i := int64(7); i < 25; i++ { for j := int64(0); j < 10; j++ { @@ -1344,13 +1340,23 @@ func TestNoEmptyBlocks(t *testing.T) { testutil.Ok(t, err) } } - err = app.Commit() - testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) - err = db.compact() - testutil.Ok(t, err) + testutil.Ok(t, db.compact()) testutil.Assert(t, len(db.blocks) > 0, "No blocks created when compacting with >0 samples") + // When no new block is created from head, and there are some blocks on disk, + // compaction should not run into infinite loop (was seen during development). + app = db.Appender() + for i := int64(26); i < 30; i++ { + _, err := app.Add(label, i*blockRange, 0) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) + testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.compact()) + + // Test no blocks remaining after deleting all samples from disk. testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar"))) // Mimicking Plan() of compactor and getting list @@ -1369,7 +1375,7 @@ func TestNoEmptyBlocks(t *testing.T) { // No new blocks are created by Compact, and marks all old blocks as deletable. oldLen := len(db.blocks) - _, err = db.compactor.Compact(db.dir, plan...) + _, err := db.compactor.Compact(db.dir, plan...) testutil.Ok(t, err) // Number of blocks are the same. testutil.Equals(t, oldLen, len(db.blocks)) From 0faafecddbd06c11f4898535a26061cfb4dfb2ec Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 3 Oct 2018 18:32:17 +0530 Subject: [PATCH 07/21] Test fix attempt Signed-off-by: Ganesh Vernekar --- db_test.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/db_test.go b/db_test.go index f6220fbc..765edfa6 100644 --- a/db_test.go +++ b/db_test.go @@ -1325,11 +1325,12 @@ func TestNoEmptyBlocks(t *testing.T) { } testutil.Ok(t, app.Commit()) - oldHeadMinT := db.head.MinTime() testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar"))) - testutil.Ok(t, db.compact()) - // Making sure that head was modified. - testutil.Assert(t, oldHeadMinT < db.head.MinTime(), "Head was not changed after compaction.") + uid, err := db.compactor.Write(db.dir, db.head, db.head.MinTime(), db.head.MaxTime(), nil) + testutil.Ok(t, err) + testutil.Equals(t, ulid.ULID{}, uid) + testutil.Ok(t, db.reload()) + testutil.Ok(t, db.head.Truncate(db.head.MaxTime())) // No blocks created. testutil.Equals(t, 0, len(db.blocks)) @@ -1375,7 +1376,7 @@ func TestNoEmptyBlocks(t *testing.T) { // No new blocks are created by Compact, and marks all old blocks as deletable. oldLen := len(db.blocks) - _, err := db.compactor.Compact(db.dir, plan...) + _, err = db.compactor.Compact(db.dir, plan...) testutil.Ok(t, err) // Number of blocks are the same. testutil.Equals(t, oldLen, len(db.blocks)) From 38a2c6b6bd454443896a401c236fdea8383441d5 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Thu, 22 Nov 2018 10:10:05 +0530 Subject: [PATCH 08/21] Updated tests Signed-off-by: Ganesh Vernekar --- db_test.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/db_test.go b/db_test.go index af5b35d7..fd424f9a 100644 --- a/db_test.go +++ b/db_test.go @@ -1307,6 +1307,7 @@ func TestNoEmptyBlocks(t *testing.T) { db, close := openTestDB(t, nil) defer close() defer db.Close() + db.DisableCompactions() // Test no blocks after compact with empty head. testutil.Ok(t, db.compact()) @@ -1348,6 +1349,7 @@ func TestNoEmptyBlocks(t *testing.T) { // When no new block is created from head, and there are some blocks on disk, // compaction should not run into infinite loop (was seen during development). + oldBlocks := db.Blocks() app = db.Appender() for i := int64(26); i < 30; i++ { _, err := app.Add(label, i*blockRange, 0) @@ -1356,6 +1358,7 @@ func TestNoEmptyBlocks(t *testing.T) { testutil.Ok(t, app.Commit()) testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar"))) testutil.Ok(t, db.compact()) + testutil.Equals(t, oldBlocks, db.Blocks()) // Test no blocks remaining after deleting all samples from disk. testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar"))) @@ -1371,15 +1374,15 @@ func TestNoEmptyBlocks(t *testing.T) { for _, b := range db.Blocks() { meta, err := readMetaFile(b.dir) testutil.Ok(t, err) - testutil.Assert(t, !meta.Compaction.Deletable, "Block was marked deletable before compaction") + testutil.Assert(t, meta.Compaction.Deletable == false, "Block was marked deletable before compaction") } // No new blocks are created by Compact, and marks all old blocks as deletable. - oldLen := len(db.Blocks()) + oldBlocks = db.Blocks() _, err = db.compactor.Compact(db.dir, plan, db.Blocks()) testutil.Ok(t, err) - // Number of blocks are the same. - testutil.Equals(t, oldLen, len(db.Blocks())) + // Blocks are the same. + testutil.Equals(t, oldBlocks, db.Blocks()) // Marked as deletable. for _, b := range db.Blocks() { @@ -1390,7 +1393,7 @@ func TestNoEmptyBlocks(t *testing.T) { // Deletes the deletable blocks. testutil.Ok(t, db.reload()) - // All samples are deleted. No blocks should be remeianing after compact. + // All samples are deleted. No blocks should be remaining after compact. testutil.Equals(t, 0, len(db.Blocks())) } From 061971bc02be4e5546188d2918ef6148f70244f0 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Fri, 23 Nov 2018 16:42:31 +0530 Subject: [PATCH 09/21] Fix review comments * Handle error for writeMetaFile * Comments in Compactor interface Signed-off-by: Ganesh Vernekar --- compact.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/compact.go b/compact.go index 61a70085..0a2ca6a3 100644 --- a/compact.go +++ b/compact.go @@ -55,12 +55,17 @@ type Compactor interface { Plan(dir string) ([]string, error) // Write persists a Block into a directory. + // No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}. Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) // Compact runs compaction against the provided directories. Must // only be called concurrently with results of Plan(). // Can optionally pass a list of already open blocks, // to avoid having to reopen them. + // When resulting Block has 0 samples + // * No block is written. + // * The source dirs are marked Deletable. + // * Returns empty ulid.ULID{}. Compact(dest string, dirs []string, open []*Block) (ulid.ULID, error) } @@ -367,17 +372,22 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u err = c.write(dest, meta, blocks...) if err == nil { if meta.Stats.NumSamples == 0 { + for _, b := range bs { + b.meta.Compaction.Deletable = true + if err = writeMetaFile(b.dir, &b.meta); err != nil { + level.Error(c.logger).Log( + "msg", "Failed to write 'Deletable' to meta file after compaction", + "ulid", b.meta.ULID, + ) + } + } + uid = ulid.ULID{} level.Info(c.logger).Log( "msg", "compact blocks [resulted in empty block]", "count", len(blocks), "sources", fmt.Sprintf("%v", uids), "duration", time.Since(start), ) - for _, b := range bs { - b.meta.Compaction.Deletable = true - writeMetaFile(b.dir, &b.meta) - } - uid = ulid.ULID{} } else { level.Info(c.logger).Log( "msg", "compact blocks", From 2378d2b6fbfb7fb14b8df5632450781a5b863737 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Sat, 1 Dec 2018 23:33:00 +0530 Subject: [PATCH 10/21] Updated tests and added CHANGELOG entry Signed-off-by: Ganesh Vernekar --- CHANGELOG.md | 1 + db_test.go | 20 ++++++++++++++++---- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 11f704b2..1bbb9538 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,3 +3,4 @@ - `LastCheckpoint` used to return just the segment name and now it returns the full relative path. - `NewSegmentsRangeReader` can now read over miltiple wal ranges by using the new `SegmentRange` struct. - `CorruptionErr` now also exposes the Segment `Dir` which is added when displaying any errors. + - \[CHANGE\] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374) \ No newline at end of file diff --git a/db_test.go b/db_test.go index 64ff04f1..137b790f 100644 --- a/db_test.go +++ b/db_test.go @@ -1317,7 +1317,10 @@ func TestNoEmptyBlocks(t *testing.T) { // Test no blocks after compact with empty head. testutil.Ok(t, db.compact()) - testutil.Equals(t, 0, len(db.Blocks())) + bb, err := blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(bb)) + testutil.Equals(t, 0, len(bb)) // Test no blocks after deleting all samples from head. blockRange := DefaultOptions.BlockRanges[0] @@ -1339,7 +1342,10 @@ func TestNoEmptyBlocks(t *testing.T) { testutil.Ok(t, db.reload()) testutil.Ok(t, db.head.Truncate(db.head.MaxTime())) // No blocks created. - testutil.Equals(t, 0, len(db.Blocks())) + bb, err = blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(bb)) + testutil.Equals(t, 0, len(bb)) app = db.Appender() for i := int64(7); i < 25; i++ { @@ -1351,7 +1357,10 @@ func TestNoEmptyBlocks(t *testing.T) { testutil.Ok(t, app.Commit()) testutil.Ok(t, db.compact()) - testutil.Assert(t, len(db.Blocks()) > 0, "No blocks created when compacting with >0 samples") + bb, err = blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(bb)) + testutil.Assert(t, len(bb) > 0, "No blocks created when compacting with >0 samples") // When no new block is created from head, and there are some blocks on disk, // compaction should not run into infinite loop (was seen during development). @@ -1400,7 +1409,10 @@ func TestNoEmptyBlocks(t *testing.T) { // Deletes the deletable blocks. testutil.Ok(t, db.reload()) // All samples are deleted. No blocks should be remaining after compact. - testutil.Equals(t, 0, len(db.Blocks())) + bb, err = blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(bb)) + testutil.Equals(t, 0, len(bb)) } func TestDB_LabelNames(t *testing.T) { From e0bb7579061ae394782c7a0670484fb42c122c48 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 16 Jan 2019 14:52:00 +0200 Subject: [PATCH 11/21] rebased Signed-off-by: Krasi Georgiev --- CHANGELOG.md | 1 + db.go | 30 +++++++++++------------------- 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a4447168..820dfb85 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## master / unreleased - [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed. + - [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374) - [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change: - added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total` - new public interface `SizeReader: Size() int64` diff --git a/db.go b/db.go index 375e3489..1c6099cf 100644 --- a/db.go +++ b/db.go @@ -427,16 +427,11 @@ func (db *DB) compact() (err error) { if err := db.reload(); err != nil { return errors.Wrap(err, "reload blocks") } - if (uid == ulid.ULID{}) { // No block created. - // After this was fixed https://github.com/prometheus/tsdb/issues/309, - // Case 1: It is possible to have 0 blocks after compaction. - // db.reload() doesn't truncate the head when the block count is zero. - // Case 2: If there were blocks on disk, head will be truncated to a - // wrong value based on old blocks. - // Hence we need to truncate manually. - if err = db.head.Truncate(maxt); err != nil { - return errors.Wrap(err, "head truncate failed (in compact)") - } + // Compaction resulted in an empty block. + // Head truncating during db.reload() depends on the persisted blocks and + // in this case no new block will be persisted so manually truncate the head. + if (uid == ulid.ULID{}) { + return errors.Wrap(db.head.Truncate(maxt), "head truncate failed (in compact)") } runtime.GC() } @@ -521,14 +516,6 @@ func (db *DB) reload() (err error) { deletable[block.Meta().ULID] = block continue } -<<<<<<< HEAD - if meta.Compaction.Deletable { - deleteable[meta.ULID] = struct{}{} - } - for _, b := range meta.Compaction.Parents { - deleteable[b.ULID] = struct{}{} - } -======= bb = append(bb, block) blocksSize += block.Size() @@ -541,7 +528,6 @@ func (db *DB) reload() (err error) { }) if err := validateBlockSequence(loadable); err != nil { return errors.Wrap(err, "invalid block sequence") ->>>>>>> upstream/master } // Swap new blocks first for subsequently created readers to be seen. @@ -609,6 +595,12 @@ func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block { return blocks[i].Meta().MaxTime > blocks[j].Meta().MaxTime }) + for _, block := range blocks { + if block.Meta().Compaction.Deletable { + deletable[block.Meta().ULID] = block + } + } + for ulid, block := range db.beyondTimeRetention(blocks) { deletable[ulid] = block } From 0eed036edb6de3a8a4597a18330631de86d12b91 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 16 Jan 2019 20:40:03 +0530 Subject: [PATCH 12/21] Revert returning after empty block from head Signed-off-by: Ganesh Vernekar --- db.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/db.go b/db.go index 1c6099cf..1349dfbd 100644 --- a/db.go +++ b/db.go @@ -427,11 +427,13 @@ func (db *DB) compact() (err error) { if err := db.reload(); err != nil { return errors.Wrap(err, "reload blocks") } - // Compaction resulted in an empty block. - // Head truncating during db.reload() depends on the persisted blocks and - // in this case no new block will be persisted so manually truncate the head. if (uid == ulid.ULID{}) { - return errors.Wrap(db.head.Truncate(maxt), "head truncate failed (in compact)") + // Compaction resulted in an empty block. + // Head truncating during db.reload() depends on the persisted blocks and + // in this case no new block will be persisted so manually truncate the head. + if err = db.head.Truncate(maxt); err != nil { + return errors.Wrap(err, "head truncate failed (in compact)") + } } runtime.GC() } From 271b98f53a0b62d9f356d3e4ccaedad9d2d8e537 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 16 Jan 2019 21:21:57 +0530 Subject: [PATCH 13/21] Fix review comments Signed-off-by: Ganesh Vernekar --- db_test.go | 53 +++++++++++++++++++---------------------------------- 1 file changed, 19 insertions(+), 34 deletions(-) diff --git a/db_test.go b/db_test.go index 7ffe52d1..8d632104 100644 --- a/db_test.go +++ b/db_test.go @@ -1378,24 +1378,22 @@ func TestNoEmptyBlocks(t *testing.T) { testutil.Equals(t, 0, len(bb)) // Test no blocks after deleting all samples from head. - blockRange := DefaultOptions.BlockRanges[0] - label := labels.FromStrings("foo", "bar") + rangeToTriggercompaction := DefaultOptions.BlockRanges[0]/2*3 + 1 + defaultLabel := labels.FromStrings("foo", "bar") + defaultMatcher := labels.NewEqualMatcher(defaultLabel[0].Name, defaultLabel[0].Value) app := db.Appender() for i := int64(0); i < 6; i++ { - _, err := app.Add(label, i*blockRange, 0) + _, err := app.Add(defaultLabel, i*rangeToTriggercompaction, 0) testutil.Ok(t, err) - _, err = app.Add(label, i*blockRange+1000, 0) + _, err = app.Add(defaultLabel, i*rangeToTriggercompaction+1000, 0) testutil.Ok(t, err) } testutil.Ok(t, app.Commit()) - testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar"))) - uid, err := db.compactor.Write(db.dir, db.head, db.head.MinTime(), db.head.MaxTime(), nil) - testutil.Ok(t, err) - testutil.Equals(t, ulid.ULID{}, uid) - testutil.Ok(t, db.reload()) - testutil.Ok(t, db.head.Truncate(db.head.MaxTime())) + testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) + testutil.Ok(t, db.compact()) + // No blocks created. bb, err = blockDirs(db.Dir()) testutil.Ok(t, err) @@ -1405,7 +1403,7 @@ func TestNoEmptyBlocks(t *testing.T) { app = db.Appender() for i := int64(7); i < 25; i++ { for j := int64(0); j < 10; j++ { - _, err := app.Add(label, i*blockRange+j, 0) + _, err := app.Add(defaultLabel, i*rangeToTriggercompaction+j, 0) testutil.Ok(t, err) } } @@ -1422,16 +1420,16 @@ func TestNoEmptyBlocks(t *testing.T) { oldBlocks := db.Blocks() app = db.Appender() for i := int64(26); i < 30; i++ { - _, err := app.Add(label, i*blockRange, 0) + _, err := app.Add(defaultLabel, i*rangeToTriggercompaction, 0) testutil.Ok(t, err) } testutil.Ok(t, app.Commit()) - testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) testutil.Ok(t, db.compact()) testutil.Equals(t, oldBlocks, db.Blocks()) // Test no blocks remaining after deleting all samples from disk. - testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) // Mimicking Plan() of compactor and getting list // of all block directories to pass for compaction. @@ -1440,13 +1438,6 @@ func TestNoEmptyBlocks(t *testing.T) { plan = append(plan, b.Dir()) } - // Blocks are not set deletable before compaction. - for _, b := range db.Blocks() { - meta, err := readMetaFile(b.dir) - testutil.Ok(t, err) - testutil.Assert(t, meta.Compaction.Deletable == false, "Block was marked deletable before compaction") - } - // No new blocks are created by Compact, and marks all old blocks as deletable. oldBlocks = db.Blocks() _, err = db.compactor.Compact(db.dir, plan, db.Blocks()) @@ -1454,13 +1445,6 @@ func TestNoEmptyBlocks(t *testing.T) { // Blocks are the same. testutil.Equals(t, oldBlocks, db.Blocks()) - // Marked as deletable. - for _, b := range db.Blocks() { - meta, err := readMetaFile(b.dir) - testutil.Ok(t, err) - testutil.Assert(t, meta.Compaction.Deletable, "Block was not marked deletable after compaction") - } - // Deletes the deletable blocks. testutil.Ok(t, db.reload()) // All samples are deleted. No blocks should be remaining after compact. @@ -1574,12 +1558,13 @@ func TestCorrectNumTombstones(t *testing.T) { defer db.Close() blockRange := DefaultOptions.BlockRanges[0] - label := labels.FromStrings("foo", "bar") + defaultLabel := labels.FromStrings("foo", "bar") + defaultMatcher := labels.NewEqualMatcher(defaultLabel[0].Name, defaultLabel[0].Value) app := db.Appender() for i := int64(0); i < 3; i++ { for j := int64(0); j < 15; j++ { - _, err := app.Add(label, i*blockRange+j, 0) + _, err := app.Add(defaultLabel, i*blockRange+j, 0) testutil.Ok(t, err) } } @@ -1589,17 +1574,17 @@ func TestCorrectNumTombstones(t *testing.T) { testutil.Ok(t, err) testutil.Equals(t, 1, len(db.blocks)) - testutil.Ok(t, db.Delete(0, 1, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.Delete(0, 1, defaultMatcher)) testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones) // {0, 1} and {2, 3} are merged to form 1 tombstone. - testutil.Ok(t, db.Delete(2, 3, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.Delete(2, 3, defaultMatcher)) testutil.Equals(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones) - testutil.Ok(t, db.Delete(5, 6, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.Delete(5, 6, defaultMatcher)) testutil.Equals(t, uint64(2), db.blocks[0].meta.Stats.NumTombstones) - testutil.Ok(t, db.Delete(9, 11, labels.NewEqualMatcher("foo", "bar"))) + testutil.Ok(t, db.Delete(9, 11, defaultMatcher)) testutil.Equals(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones) } From a6a1779c8d1779d7c10760bfa119d4e636578082 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 16 Jan 2019 21:42:49 +0530 Subject: [PATCH 14/21] Dont create empty blocks in tests Signed-off-by: Ganesh Vernekar --- block_test.go | 3 +-- db_test.go | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/block_test.go b/block_test.go index 789aebaa..724ab378 100644 --- a/block_test.go +++ b/block_test.go @@ -45,7 +45,7 @@ func TestSetCompactionFailed(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(tmpdir) - blockDir := createBlock(t, tmpdir, 0, 0, 0) + blockDir := createBlock(t, tmpdir, 1, 0, 0) b, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) testutil.Equals(t, false, b.meta.Compaction.Failed) @@ -91,6 +91,5 @@ func createBlock(tb testing.TB, dir string, nSeries int, mint, maxt int64) strin ulid, err := compactor.Write(dir, head, head.MinTime(), head.MaxTime(), nil) testutil.Ok(tb, err) - return filepath.Join(dir, ulid.String()) } diff --git a/db_test.go b/db_test.go index 8d632104..70abfc25 100644 --- a/db_test.go +++ b/db_test.go @@ -836,7 +836,7 @@ func TestTombstoneCleanFail(t *testing.T) { // totalBlocks should be >=2 so we have enough blocks to trigger compaction failure. totalBlocks := 2 for i := 0; i < totalBlocks; i++ { - blockDir := createBlock(t, db.Dir(), 0, 0, 0) + blockDir := createBlock(t, db.Dir(), 1, 0, 0) block, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) // Add some some fake tombstones to trigger the compaction. @@ -880,7 +880,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6 return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") } - block, err := OpenBlock(nil, createBlock(c.t, dest, 0, 0, 0), nil) + block, err := OpenBlock(nil, createBlock(c.t, dest, 1, 0, 0), nil) testutil.Ok(c.t, err) testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere. c.blocks = append(c.blocks, block) From 2b509e7c886c232563e4d3dac064e8330dc63d13 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Thu, 17 Jan 2019 17:12:33 +0530 Subject: [PATCH 15/21] nits Signed-off-by: Ganesh Vernekar --- compact.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/compact.go b/compact.go index 5f1c4358..5ec07167 100644 --- a/compact.go +++ b/compact.go @@ -383,7 +383,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u } uid = ulid.ULID{} level.Info(c.logger).Log( - "msg", "compact blocks [resulted in empty block]", + "msg", "compact blocks resulted in empty block", "count", len(blocks), "sources", fmt.Sprintf("%v", uids), "duration", time.Since(start), From ad9e3ed58f683277aeec66308228201e6e4b7bcb Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Thu, 17 Jan 2019 15:40:45 +0200 Subject: [PATCH 16/21] simplified test Signed-off-by: Krasi Georgiev --- compact.go | 7 ++- db_test.go | 155 ++++++++++++++++++++++++++++------------------------- 2 files changed, 86 insertions(+), 76 deletions(-) diff --git a/compact.go b/compact.go index 5f1c4358..ef615adb 100644 --- a/compact.go +++ b/compact.go @@ -195,15 +195,14 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) { for i := len(dms) - 1; i >= 0; i-- { meta := dms[i].meta if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] { - break + continue } - if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 { - return []string{dms[i].dir}, nil + res = append(res, dms[i].dir) } } - return nil, nil + return res, nil } // selectDirs returns the dir metas that should be compacted into a single new block. diff --git a/db_test.go b/db_test.go index 70abfc25..bbe53ad7 100644 --- a/db_test.go +++ b/db_test.go @@ -1365,93 +1365,104 @@ func TestInitializeHeadTimestamp(t *testing.T) { } func TestNoEmptyBlocks(t *testing.T) { - db, close := openTestDB(t, nil) + db, close := openTestDB(t, &Options{ + BlockRanges: []int64{100}, + }) defer close() defer db.Close() db.DisableCompactions() - // Test no blocks after compact with empty head. - testutil.Ok(t, db.compact()) - bb, err := blockDirs(db.Dir()) - testutil.Ok(t, err) - testutil.Equals(t, len(db.Blocks()), len(bb)) - testutil.Equals(t, 0, len(bb)) - - // Test no blocks after deleting all samples from head. - rangeToTriggercompaction := DefaultOptions.BlockRanges[0]/2*3 + 1 + rangeToTriggercompaction := db.opts.BlockRanges[0] / 2 * 3 defaultLabel := labels.FromStrings("foo", "bar") - defaultMatcher := labels.NewEqualMatcher(defaultLabel[0].Name, defaultLabel[0].Value) + defaultMatcher := labels.NewMustRegexpMatcher("", ".*") - app := db.Appender() - for i := int64(0); i < 6; i++ { - _, err := app.Add(defaultLabel, i*rangeToTriggercompaction, 0) + t.Run("Test no blocks after compact with empty head.", func(t *testing.T) { + testutil.Ok(t, db.compact()) + actBlocks, err := blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(actBlocks)) + testutil.Equals(t, 0, len(actBlocks)) + }) + + t.Run("Test no blocks after deleting all samples from head.", func(t *testing.T) { + app := db.Appender() + _, err := app.Add(defaultLabel, 1, 0) testutil.Ok(t, err) - _, err = app.Add(defaultLabel, i*rangeToTriggercompaction+1000, 0) + _, err = app.Add(defaultLabel, 2, 0) testutil.Ok(t, err) - } - testutil.Ok(t, app.Commit()) + _, err = app.Add(defaultLabel, 3+rangeToTriggercompaction, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) + testutil.Equals(t, 0, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "initial compaction count should be zero") + testutil.Ok(t, db.compact()) + testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here") - testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) - testutil.Ok(t, db.compact()) + actBlocks, err := blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(actBlocks)) + testutil.Equals(t, 0, len(actBlocks)) + + app = db.Appender() + _, err = app.Add(defaultLabel, 1, 0) + testutil.Assert(t, err == ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed") + currentTime := db.Head().MaxTime() + _, err = app.Add(defaultLabel, currentTime, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, currentTime+1, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) - // No blocks created. - bb, err = blockDirs(db.Dir()) - testutil.Ok(t, err) - testutil.Equals(t, len(db.Blocks()), len(bb)) - testutil.Equals(t, 0, len(bb)) + testutil.Ok(t, db.compact()) + testutil.Equals(t, 2, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here") + actBlocks, err = blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, len(db.Blocks()), len(actBlocks)) + testutil.Assert(t, len(actBlocks) == 1, "No blocks created when compacting with >0 samples") + }) - app = db.Appender() - for i := int64(7); i < 25; i++ { - for j := int64(0); j < 10; j++ { - _, err := app.Add(defaultLabel, i*rangeToTriggercompaction+j, 0) - testutil.Ok(t, err) + t.Run(`When no new block is created from head, and there are some blocks on disk + compaction should not run into infinite loop (was seen during development).`, func(t *testing.T) { + oldBlocks := db.Blocks() + app := db.Appender() + currentTime := db.Head().MaxTime() + _, err := app.Add(defaultLabel, currentTime, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, currentTime+1, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0) // ????????????? + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) + testutil.Ok(t, db.compact()) + testutil.Equals(t, 3, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here") + testutil.Equals(t, oldBlocks, db.Blocks()) + }) + + t.Run("Test no blocks remaining after deleting all samples from disk.", func(t *testing.T) { + currentTime := db.Head().MaxTime() + blocks := []*BlockMeta{ + {MinTime: currentTime, MaxTime: currentTime + db.opts.BlockRanges[0]}, // + {MinTime: currentTime + 100, MaxTime: currentTime + 100 + db.opts.BlockRanges[0]}, + } + for _, m := range blocks { + createBlock(t, db.Dir(), 10, m.MinTime, m.MaxTime) } - } - testutil.Ok(t, app.Commit()) - testutil.Ok(t, db.compact()) - bb, err = blockDirs(db.Dir()) - testutil.Ok(t, err) - testutil.Equals(t, len(db.Blocks()), len(bb)) - testutil.Assert(t, len(bb) > 0, "No blocks created when compacting with >0 samples") + oldBlocks := db.Blocks() + testutil.Ok(t, db.reload()) // Reload the db to register the new blocks. + testutil.Equals(t, len(blocks)+len(oldBlocks), len(db.Blocks())) // Ensure all blocks are registered. + testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) + testutil.Ok(t, db.compact()) + testutil.Equals(t, 4, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here as all blocks have tombstones") - // When no new block is created from head, and there are some blocks on disk, - // compaction should not run into infinite loop (was seen during development). - oldBlocks := db.Blocks() - app = db.Appender() - for i := int64(26); i < 30; i++ { - _, err := app.Add(defaultLabel, i*rangeToTriggercompaction, 0) + actBlocks, err := blockDirs(db.Dir()) testutil.Ok(t, err) - } - testutil.Ok(t, app.Commit()) - testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) - testutil.Ok(t, db.compact()) - testutil.Equals(t, oldBlocks, db.Blocks()) - - // Test no blocks remaining after deleting all samples from disk. - testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) - - // Mimicking Plan() of compactor and getting list - // of all block directories to pass for compaction. - plan := []string{} - for _, b := range db.Blocks() { - plan = append(plan, b.Dir()) - } - - // No new blocks are created by Compact, and marks all old blocks as deletable. - oldBlocks = db.Blocks() - _, err = db.compactor.Compact(db.dir, plan, db.Blocks()) - testutil.Ok(t, err) - // Blocks are the same. - testutil.Equals(t, oldBlocks, db.Blocks()) - - // Deletes the deletable blocks. - testutil.Ok(t, db.reload()) - // All samples are deleted. No blocks should be remaining after compact. - bb, err = blockDirs(db.Dir()) - testutil.Ok(t, err) - testutil.Equals(t, len(db.Blocks()), len(bb)) - testutil.Equals(t, 0, len(bb)) + testutil.Equals(t, len(db.Blocks()), len(actBlocks)) + testutil.Equals(t, 1, len(actBlocks), "All samples are deleted. Only the most recent block should remain after compaction.") + }) } func TestDB_LabelNames(t *testing.T) { From 02dc73225320d0d86ada86b2023705157270409c Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Thu, 17 Jan 2019 15:51:11 +0200 Subject: [PATCH 17/21] solved the mistery for double compaction Signed-off-by: Krasi Georgiev --- db_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/db_test.go b/db_test.go index 7388d202..0eff540f 100644 --- a/db_test.go +++ b/db_test.go @@ -1372,7 +1372,7 @@ func TestNoEmptyBlocks(t *testing.T) { defer db.Close() db.DisableCompactions() - rangeToTriggercompaction := db.opts.BlockRanges[0] / 2 * 3 + rangeToTriggercompaction := db.opts.BlockRanges[0]/2*3 - 1 defaultLabel := labels.FromStrings("foo", "bar") defaultMatcher := labels.NewMustRegexpMatcher("", ".*") @@ -1432,7 +1432,7 @@ func TestNoEmptyBlocks(t *testing.T) { testutil.Ok(t, err) _, err = app.Add(defaultLabel, currentTime+1, 0) testutil.Ok(t, err) - _, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction-3, 0) // ????????????? + _, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) From 162fa2995d67470a861859afcfea7faab6fb2c7c Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Thu, 17 Jan 2019 15:55:47 +0200 Subject: [PATCH 18/21] nit Signed-off-by: Krasi Georgiev --- db_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db_test.go b/db_test.go index 0eff540f..7f74eab5 100644 --- a/db_test.go +++ b/db_test.go @@ -1382,6 +1382,7 @@ func TestNoEmptyBlocks(t *testing.T) { testutil.Ok(t, err) testutil.Equals(t, len(db.Blocks()), len(actBlocks)) testutil.Equals(t, 0, len(actBlocks)) + testutil.Equals(t, 0, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "no compaction should be triggered here") }) t.Run("Test no blocks after deleting all samples from head.", func(t *testing.T) { @@ -1394,7 +1395,6 @@ func TestNoEmptyBlocks(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, app.Commit()) testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) - testutil.Equals(t, 0, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "initial compaction count should be zero") testutil.Ok(t, db.compact()) testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here") From 9f24cde60e3a462077b2ae85dfc85e16f0b83636 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Thu, 17 Jan 2019 15:58:27 +0200 Subject: [PATCH 19/21] less samples Signed-off-by: Krasi Georgiev --- db_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db_test.go b/db_test.go index 7f74eab5..790c9248 100644 --- a/db_test.go +++ b/db_test.go @@ -1448,7 +1448,7 @@ func TestNoEmptyBlocks(t *testing.T) { {MinTime: currentTime + 100, MaxTime: currentTime + 100 + db.opts.BlockRanges[0]}, } for _, m := range blocks { - createBlock(t, db.Dir(), 10, m.MinTime, m.MaxTime) + createBlock(t, db.Dir(), 2, m.MinTime, m.MaxTime) } oldBlocks := db.Blocks() From 45bde4c2c1bb3deb22df44c06fe2ffbc52251421 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Thu, 17 Jan 2019 20:00:33 +0530 Subject: [PATCH 20/21] Nits Signed-off-by: Ganesh Vernekar --- db_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/db_test.go b/db_test.go index 790c9248..e7d0d338 100644 --- a/db_test.go +++ b/db_test.go @@ -1406,6 +1406,8 @@ func TestNoEmptyBlocks(t *testing.T) { app = db.Appender() _, err = app.Add(defaultLabel, 1, 0) testutil.Assert(t, err == ErrOutOfBounds, "the head should be truncated so no samples in the past should be allowed") + + // Adding new blocks. currentTime := db.Head().MaxTime() _, err = app.Add(defaultLabel, currentTime, 0) testutil.Ok(t, err) @@ -1444,7 +1446,7 @@ func TestNoEmptyBlocks(t *testing.T) { t.Run("Test no blocks remaining after deleting all samples from disk.", func(t *testing.T) { currentTime := db.Head().MaxTime() blocks := []*BlockMeta{ - {MinTime: currentTime, MaxTime: currentTime + db.opts.BlockRanges[0]}, // + {MinTime: currentTime, MaxTime: currentTime + db.opts.BlockRanges[0]}, {MinTime: currentTime + 100, MaxTime: currentTime + 100 + db.opts.BlockRanges[0]}, } for _, m := range blocks { From 6b87a564109d8be692bef35a996fbf1980b51f3f Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 18 Jan 2019 10:20:32 +0200 Subject: [PATCH 21/21] revert some changes Signed-off-by: Krasi Georgiev --- compact.go | 6 +++--- db_test.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/compact.go b/compact.go index 138de0ab..5d8155f5 100644 --- a/compact.go +++ b/compact.go @@ -195,14 +195,14 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) { for i := len(dms) - 1; i >= 0; i-- { meta := dms[i].meta if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] { - continue + break } if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 { - res = append(res, dms[i].dir) + return []string{dms[i].dir}, nil } } - return res, nil + return nil, nil } // selectDirs returns the dir metas that should be compacted into a single new block. diff --git a/db_test.go b/db_test.go index e7d0d338..21de405e 100644 --- a/db_test.go +++ b/db_test.go @@ -1458,7 +1458,7 @@ func TestNoEmptyBlocks(t *testing.T) { testutil.Equals(t, len(blocks)+len(oldBlocks), len(db.Blocks())) // Ensure all blocks are registered. testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) testutil.Ok(t, db.compact()) - testutil.Equals(t, 4, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here as all blocks have tombstones") + testutil.Equals(t, 5, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran)), "compaction should have been triggered here once for each block that have tombstones") actBlocks, err := blockDirs(db.Dir()) testutil.Ok(t, err)