Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Fix review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
  • Loading branch information
codesome committed Sep 21, 2018
1 parent 9de3926 commit ab14c9c
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 13 deletions.
2 changes: 1 addition & 1 deletion block.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ type BlockMetaCompaction struct {
// ULIDs of all source head blocks that went into the block.
Sources []ulid.ULID `json:"sources,omitempty"`
// As we dont write empty blocks, we need this to mark
// if the block is deletable if in compaction this block
// if the block is deletable if during compaction this block
// resulted in an empty block.
Deletable bool `json:"deletable,omitempty"`
// Short descriptions of the direct blocks that were used to create
Expand Down
3 changes: 3 additions & 0 deletions compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type Compactor interface {

// Compact runs compaction against the provided directories. Must
// only be called concurrently with results of Plan().
// Compaction resulting in an empty block are not written to the disk
// and marks all parents as deletable in its meta data.
Compact(dest string, dirs ...string) (ulid.ULID, error)
}

Expand Down Expand Up @@ -493,6 +495,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
return errors.Wrap(err, "close index writer")
}

// Populated block is empty, so cleanup and exit.
if meta.Stats.NumSamples == 0 {
if err := os.RemoveAll(tmp); err != nil {
return errors.Wrap(err, "remove tmp folder after empty block failed")
Expand Down
14 changes: 8 additions & 6 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,16 +383,18 @@ func (db *DB) compact() (changes bool, err error) {
if err := db.reload(); err != nil {
return changes, errors.Wrap(err, "reload blocks")
}
// After this is fixed https://github.com/prometheus/tsdb/issues/309,
// if the compaction of head did not create any new blocks (possible
// if the samples were deleted in that range), and if number of
// blocks are 0, db.reload() does not truncate head. Hence MinTime
// of head is not set properly. So only for that situation, we need
// to call db.head.Truncate(..) manually.
db.mtx.RLock()
l := len(db.blocks)
db.mtx.RUnlock()
if l == 0 {
// After this was fixed https://github.com/prometheus/tsdb/issues/309,
// if the compaction of head did not create any new blocks (if all samples
// were deleted), it is possible to have 0 blocks after compaction.
//
// db.reload() doesn't truncate the head when the block count is zero.
//
// In this case we have 0 blocks because all data has been deleted
// so the head needs to be reset.
err = db.head.Truncate(maxt)
if err != nil {
return changes, errors.Wrap(err, "head truncate failed (in compact)")
Expand Down
37 changes: 31 additions & 6 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1302,7 +1302,7 @@ func TestInitializeHeadTimestamp(t *testing.T) {
})
}

func TestNoEmptyBlock(t *testing.T) {
func TestNoEmptyBlocks(t *testing.T) {
db, close := openTestDB(t, nil)
defer close()
defer db.Close()
Expand All @@ -1317,7 +1317,7 @@ func TestNoEmptyBlock(t *testing.T) {
label := labels.FromStrings("foo", "bar")

app := db.Appender()
for i := int64(0); i < 3; i++ {
for i := int64(0); i < 6; i++ {
_, err := app.Add(label, i*blockRange, 0)
testutil.Ok(t, err)
_, err = app.Add(label, i*blockRange+1000, 0)
Expand All @@ -1326,15 +1326,18 @@ func TestNoEmptyBlock(t *testing.T) {
err = app.Commit()
testutil.Ok(t, err)

oldHeadMinT := db.head.MinTime()
testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar")))
_, err = db.compact()
testutil.Ok(t, err)
// Making sure that head was modified.
testutil.Assert(t, oldHeadMinT < db.head.MinTime(), "Head was not changed after compaction.")
// No blocks created.
testutil.Equals(t, 0, len(db.blocks))

// Test no blocks remaining after small samples are deleted from disk.
// Test no blocks remaining after deleting all samples from disk.
app = db.Appender()
for i := int64(3); i < 20; i++ {
for i := int64(7); i < 25; i++ {
for j := int64(0); j < 10; j++ {
_, err := app.Add(label, i*blockRange+j, 0)
testutil.Ok(t, err)
Expand All @@ -1345,17 +1348,39 @@ func TestNoEmptyBlock(t *testing.T) {

_, err = db.compact()
testutil.Ok(t, err)
testutil.Assert(t, len(db.blocks) > 0, "No blocks created")
testutil.Assert(t, len(db.blocks) > 0, "No blocks created when compacting with >0 samples")

testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, labels.NewEqualMatcher("foo", "bar")))

// Mimicking small part of compaction.
// Mimicking Plan() of compactor and getting list
// of all block directories to pass for compaction.
plan := []string{}
for _, b := range db.blocks {
plan = append(plan, b.Dir())
}

// Blocks are not set deletable before compaction.
for _, b := range db.blocks {
meta, err := readMetaFile(b.dir)
testutil.Ok(t, err)
testutil.Assert(t, !meta.Compaction.Deletable, "Block was marked deletable before compaction")
}

// No new blocks are created by Compact, and marks all old blocks as deletable.
oldLen := len(db.blocks)
_, err = db.compactor.Compact(db.dir, plan...)
testutil.Ok(t, err)
// Number of blocks are the same.
testutil.Equals(t, oldLen, len(db.blocks))

// Marked as deletable.
for _, b := range db.blocks {
meta, err := readMetaFile(b.dir)
testutil.Ok(t, err)
testutil.Assert(t, meta.Compaction.Deletable, "Block was not marked deletable after compaction")
}

// Deletes the deletable blocks.
testutil.Ok(t, db.reload())
// All samples are deleted. No blocks should be remeianing after compact.
testutil.Equals(t, 0, len(db.blocks))
Expand Down

0 comments on commit ab14c9c

Please sign in to comment.