This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

refactor block size calculation #637

Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,7 @@
 ## master / unreleased
 - [FEATURE] Provide option to compress WAL records using Snappy. [#609](https://github.com/prometheus/tsdb/pull/609)
+- [BUGFIX] Re-calculate block size when calling `block.Delete`.
+- [CHANGE] The meta file `BlockStats` no longer holds size information. This is now dynamically calculated and kept in memory. It also includes the meta file size which was not included before.
 
 ## 0.8.0
 - [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic.
111 changes: 57 additions & 54 deletions block.go
@@ -151,12 +151,6 @@ type Appendable interface {
 	Appender() Appender
 }
 
-// SizeReader returns the size of the object in bytes.
-type SizeReader interface {
-	// Size returns the size in bytes.
-	Size() int64
-}
-
 // BlockMeta provides meta information about a block.
 type BlockMeta struct {
 	// Unique identifier for the block and its contents. Changes on compaction.
@@ -183,7 +177,6 @@ type BlockStats struct {
 	NumSeries     uint64 `json:"numSeries,omitempty"`
 	NumChunks     uint64 `json:"numChunks,omitempty"`
 	NumTombstones uint64 `json:"numTombstones,omitempty"`
-	NumBytes      int64  `json:"numBytes,omitempty"`
 }
 
 // BlockDesc describes a block by ULID and time range.
@@ -214,24 +207,24 @@ const metaFilename = "meta.json"
 
 func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }
 
-func readMetaFile(dir string) (*BlockMeta, error) {
+func readMetaFile(dir string) (*BlockMeta, int64, error) {
 	b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename))
 	if err != nil {
-		return nil, err
+		return nil, 0, err
 	}
 	var m BlockMeta
 
 	if err := json.Unmarshal(b, &m); err != nil {
-		return nil, err
+		return nil, 0, err
 	}
 	if m.Version != 1 {
-		return nil, errors.Errorf("unexpected meta file version %d", m.Version)
+		return nil, 0, errors.Errorf("unexpected meta file version %d", m.Version)
 	}
 
-	return &m, nil
+	return &m, int64(len(b)), nil
 }
 
-func writeMetaFile(logger log.Logger, dir string, meta *BlockMeta) error {
+func writeMetaFile(logger log.Logger, dir string, meta *BlockMeta) (int64, error) {
 	meta.Version = 1
 
 	// Make any changes to the file appear atomic.
@@ -245,26 +238,32 @@ func writeMetaFile(logger log.Logger, dir string, meta *BlockMeta) error {
 
 	f, err := os.Create(tmp)
 	if err != nil {
-		return err
+		return 0, err
 	}
 
-	enc := json.NewEncoder(f)
-	enc.SetIndent("", "\t")
+	jsonMeta, err := json.MarshalIndent(meta, "", "\t")
+	if err != nil {
+		return 0, err
+	}
 
 	var merr tsdb_errors.MultiError
-	if merr.Add(enc.Encode(meta)); merr.Err() != nil {
+	n, err := f.Write(jsonMeta)
+	if err != nil {
+		merr.Add(err)
 		merr.Add(f.Close())
-		return merr.Err()
+		return 0, merr.Err()
 	}
 
 	// Force the kernel to persist the file on disk to avoid data loss if the host crashes.
-	if merr.Add(f.Sync()); merr.Err() != nil {
+	if err := f.Sync(); err != nil {
+		merr.Add(err)
 		merr.Add(f.Close())
-		return merr.Err()
+		return 0, merr.Err()
 	}
 	if err := f.Close(); err != nil {
-		return err
+		return 0, err
 	}
-	return fileutil.Replace(tmp, path)
+	return int64(n), fileutil.Replace(tmp, path)
 }
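The tmp-file setup above is collapsed in the diff, but the shape of writeMetaFile is the classic write-temp, fsync, rename-over-target pattern, now returning the byte count so callers can cache it. A minimal, self-contained sketch of that pattern, using os.Rename in place of the repo's fileutil.Replace:

```go
package main

import (
	"io/ioutil"
	"os"
	"path/filepath"
)

// atomicWrite sketches the pattern writeMetaFile follows: write to a
// temp file, fsync, close, then rename over the target so readers never
// observe a partially written file. It returns the bytes written.
func atomicWrite(dir, name string, data []byte) (int64, error) {
	path := filepath.Join(dir, name)
	tmp := path + ".tmp"

	f, err := os.Create(tmp)
	if err != nil {
		return 0, err
	}
	n, err := f.Write(data)
	if err != nil {
		f.Close()
		return 0, err
	}
	// Force the kernel to persist the file before the rename.
	if err := f.Sync(); err != nil {
		f.Close()
		return 0, err
	}
	if err := f.Close(); err != nil {
		return 0, err
	}
	return int64(n), os.Rename(tmp, path)
}

func main() {
	dir, err := ioutil.TempDir("", "atomic")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	if _, err := atomicWrite(dir, "meta.json", []byte("{}\n")); err != nil {
		panic(err)
	}
}
```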

// Block represents a directory of time series data covering a continuous time range.
@@ -285,6 +284,11 @@ type Block struct {
 	tombstones TombstoneReader
 
 	logger log.Logger
+
+	numBytesChunks    int64
+	numBytesIndex     int64
+	numBytesTombstone int64
+	numBytesMeta      int64
 }

Review thread on the new numBytes fields:
- Contributor: uint64 and a separate struct maybe for those? Not a blocker.
- Author: All Size/Write interfaces I have seen so far use int or int64, so I prefer to keep it as is. Also, for db.opts.MaxBytes we use int64, where -1 means that size-based retention is disabled.
- Contributor: Ok, fine by me then.
- Contributor: Thanks for explaining.
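To make the author's point concrete, here is a minimal, hypothetical sketch (names are illustrative, not from this PR) of why a signed int64 fits: a negative limit serves as the "disabled" sentinel, which uint64 cannot express.

```go
package main

import "fmt"

// options mirrors the convention cited above: a signed byte limit where
// -1 means size-based retention is disabled. Illustrative only.
type options struct {
	MaxBytes int64
}

// exceedsRetention reports whether totalBytes is over the limit. A
// non-positive MaxBytes disables the check, which is the reason a
// signed type is preferred over uint64 here.
func exceedsRetention(totalBytes int64, o options) bool {
	return o.MaxBytes > 0 && totalBytes > o.MaxBytes
}

func main() {
	fmt.Println(exceedsRetention(1024, options{MaxBytes: -1}))  // false: disabled
	fmt.Println(exceedsRetention(1024, options{MaxBytes: 512})) // true: over the limit
}
```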

// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
@@ -302,7 +306,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) {
 			err = merr.Err()
 		}
 	}()
-	meta, err := readMetaFile(dir)
+	meta, sizeMeta, err := readMetaFile(dir)
 	if err != nil {
 		return nil, err
 	}
@@ -319,43 +323,28 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) {
 	}
 	closers = append(closers, ir)
 
-	tr, tsr, err := readTombstones(dir)
+	tr, sizeTomb, err := readTombstones(dir)
 	if err != nil {
 		return nil, err
 	}
 	closers = append(closers, tr)
 
-	// TODO refactor to set this at block creation time as
-	// that would be the logical place for a block size to be calculated.
-	bs := blockSize(cr, ir, tsr)
-	meta.Stats.NumBytes = bs
-	err = writeMetaFile(logger, dir, meta)
-	if err != nil {
-		level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err)
-	}
-
 	pb = &Block{
-		dir:             dir,
-		meta:            *meta,
-		chunkr:          cr,
-		indexr:          ir,
-		tombstones:      tr,
-		symbolTableSize: ir.SymbolTableSize(),
-		logger:          logger,
+		dir:               dir,
+		meta:              *meta,
+		chunkr:            cr,
+		indexr:            ir,
+		tombstones:        tr,
+		symbolTableSize:   ir.SymbolTableSize(),
+		logger:            logger,
+		numBytesChunks:    cr.Size(),
+		numBytesIndex:     ir.Size(),
+		numBytesTombstone: sizeTomb,
+		numBytesMeta:      sizeMeta,
 	}
 	return pb, nil
 }
 
-func blockSize(rr ...SizeReader) int64 {
-	var total int64
-	for _, r := range rr {
-		if r != nil {
-			total += r.Size()
-		}
-	}
-	return total
-}
-
 // Close closes the on-disk block. It blocks as long as there are readers reading from the block.
 func (pb *Block) Close() error {
 	pb.mtx.Lock()
@@ -390,7 +379,9 @@ func (pb *Block) MinTime() int64 { return pb.meta.MinTime }
 func (pb *Block) MaxTime() int64 { return pb.meta.MaxTime }
 
 // Size returns the number of bytes that the block takes up.
-func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes }
+func (pb *Block) Size() int64 {
+	return pb.numBytesChunks + pb.numBytesIndex + pb.numBytesTombstone + pb.numBytesMeta
+}
 
 // ErrClosing is returned when a block is in the process of being closed.
 var ErrClosing = errors.New("block is closing")
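The new Block.Size above simply sums the four in-memory counters, so callers no longer read meta.json to learn a block's size. As a hypothetical illustration (not part of this diff), a size-based retention pass over open blocks could look like this:

```go
package main

import "fmt"

// sizer matches Block's Size method; everything here is a sketch, not tsdb API.
type sizer interface {
	Size() int64
}

// fixedSize is a stand-in block with a known size for the demo below.
type fixedSize int64

func (f fixedSize) Size() int64 { return int64(f) }

// overLimit returns the blocks that push the running total past maxBytes,
// assuming blocks are ordered newest first. A non-positive maxBytes
// disables the check (the -1 convention discussed earlier).
func overLimit(blocks []sizer, maxBytes int64) []sizer {
	if maxBytes <= 0 {
		return nil
	}
	var total int64
	var out []sizer
	for _, b := range blocks {
		total += b.Size()
		if total > maxBytes {
			out = append(out, b) // every block past the limit is a deletion candidate
		}
	}
	return out
}

func main() {
	blocks := []sizer{fixedSize(400), fixedSize(400), fixedSize(400)}
	fmt.Println(len(overLimit(blocks, 1000))) // 1: only the oldest block crosses the limit
}
```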
@@ -437,7 +428,12 @@ func (pb *Block) GetSymbolTableSize() uint64 {
 
 func (pb *Block) setCompactionFailed() error {
 	pb.meta.Compaction.Failed = true
-	return writeMetaFile(pb.logger, pb.dir, &pb.meta)
+	n, err := writeMetaFile(pb.logger, pb.dir, &pb.meta)
+	if err != nil {
+		return err
+	}
+	pb.numBytesMeta = n
+	return nil
 }
 
 type blockIndexReader struct {

Review thread on pb.numBytesMeta = n:
- Contributor: Slight inconsistency with the code below: there we update the size only when there is no error, but here we do it no matter what. Can we do it only on no error here as well? That will avoid potential confusion.
- Author: Updated.
@@ -561,10 +557,17 @@ Outer:
 	pb.tombstones = stones
 	pb.meta.Stats.NumTombstones = pb.tombstones.Total()
 
-	if err := writeTombstoneFile(pb.logger, pb.dir, pb.tombstones); err != nil {
+	n, err := writeTombstoneFile(pb.logger, pb.dir, pb.tombstones)
+	if err != nil {
 		return err
 	}
-	return writeMetaFile(pb.logger, pb.dir, &pb.meta)
+	pb.numBytesTombstone = n
+	n, err = writeMetaFile(pb.logger, pb.dir, &pb.meta)
+	if err != nil {
+		return err
+	}
+	pb.numBytesMeta = n
+	return nil
 }
 
 // CleanTombstones will remove the tombstones and rewrite the block (only if there are any tombstones).
60 changes: 58 additions & 2 deletions block_test.go
@@ -26,6 +26,7 @@ import (
 
 	"github.com/go-kit/kit/log"
 	"github.com/prometheus/tsdb/chunks"
+	"github.com/prometheus/tsdb/labels"
 	"github.com/prometheus/tsdb/testutil"
 	"github.com/prometheus/tsdb/tsdbutil"
 )
@@ -40,9 +41,10 @@ func TestBlockMetaMustNeverBeVersion2(t *testing.T) {
 		testutil.Ok(t, os.RemoveAll(dir))
 	}()
 
-	testutil.Ok(t, writeMetaFile(log.NewNopLogger(), dir, &BlockMeta{}))
+	_, err = writeMetaFile(log.NewNopLogger(), dir, &BlockMeta{})
+	testutil.Ok(t, err)
 
-	meta, err := readMetaFile(dir)
+	meta, _, err := readMetaFile(dir)
 	testutil.Ok(t, err)
 	testutil.Assert(t, meta.Version != 2, "meta.json version must never be 2")
 }
@@ -149,6 +151,60 @@ func TestCorruptedChunk(t *testing.T) {
 	}
 }
 
+// TestBlockSize ensures that the block size is calculated correctly.
+func TestBlockSize(t *testing.T) {
+	tmpdir, err := ioutil.TempDir("", "test_blockSize")
+	testutil.Ok(t, err)
+	defer func() {
+		testutil.Ok(t, os.RemoveAll(tmpdir))
+	}()
+
+	var (
+		blockInit    *Block
+		expSizeInit  int64
+		blockDirInit string
+	)
+
+	// Create a block and compare the reported size vs actual disk size.
+	{
+		blockDirInit = createBlock(t, tmpdir, genSeries(10, 1, 1, 100))
+		blockInit, err = OpenBlock(nil, blockDirInit, nil)
+		testutil.Ok(t, err)
+		defer func() {
+			testutil.Ok(t, blockInit.Close())
+		}()
+		expSizeInit = blockInit.Size()
+		actSizeInit, err := testutil.DirSize(blockInit.Dir())
+		testutil.Ok(t, err)
+		testutil.Equals(t, expSizeInit, actSizeInit)
+	}
+
+	// Delete some series and check the sizes again.
+	{
+		testutil.Ok(t, blockInit.Delete(1, 10, labels.NewMustRegexpMatcher("", ".*")))
+		expAfterDelete := blockInit.Size()
+		testutil.Assert(t, expAfterDelete > expSizeInit, "after a delete the block size should be bigger as the tombstone file should grow %v > %v", expAfterDelete, expSizeInit)
+		actAfterDelete, err := testutil.DirSize(blockDirInit)
+		testutil.Ok(t, err)
+		testutil.Equals(t, expAfterDelete, actAfterDelete, "after a delete reported block size doesn't match actual disk size")
+
+		c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil)
+		testutil.Ok(t, err)
+		blockDirAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil)
+		testutil.Ok(t, err)
+		blockAfterCompact, err := OpenBlock(nil, filepath.Join(tmpdir, blockDirAfterCompact.String()), nil)
+		testutil.Ok(t, err)
+		defer func() {
+			testutil.Ok(t, blockAfterCompact.Close())
+		}()
+		expAfterCompact := blockAfterCompact.Size()
+		actAfterCompact, err := testutil.DirSize(blockAfterCompact.Dir())
+		testutil.Ok(t, err)
+		testutil.Assert(t, actAfterDelete > actAfterCompact, "after a delete and compaction the block size should be smaller %v,%v", actAfterDelete, actAfterCompact)
+		testutil.Equals(t, expAfterCompact, actAfterCompact, "after a delete and compaction reported block size doesn't match actual disk size")
+	}
+}
+
 // createBlock creates a block with given set of series and returns its dir.
 func createBlock(tb testing.TB, dir string, series []Series) string {
 	head := createHead(tb, series)
12 changes: 7 additions & 5 deletions compact.go
@@ -178,7 +178,7 @@ func (c *LeveledCompactor) Plan(dir string) ([]string, error) {
 
 	var dms []dirMeta
 	for _, dir := range dirs {
-		meta, err := readMetaFile(dir)
+		meta, _, err := readMetaFile(dir)
 		if err != nil {
 			return nil, err
 		}
@@ -380,7 +380,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
 	start := time.Now()
 
 	for _, d := range dirs {
-		meta, err := readMetaFile(d)
+		meta, _, err := readMetaFile(d)
 		if err != nil {
 			return uid, err
 		}
@@ -420,12 +420,14 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) {
 	if meta.Stats.NumSamples == 0 {
 		for _, b := range bs {
 			b.meta.Compaction.Deletable = true
-			if err = writeMetaFile(c.logger, b.dir, &b.meta); err != nil {
+			n, err := writeMetaFile(c.logger, b.dir, &b.meta)
+			if err != nil {
 				level.Error(c.logger).Log(
 					"msg", "Failed to write 'Deletable' to meta file after compaction",
 					"ulid", b.meta.ULID,
 				)
 			}
+			b.numBytesMeta = n
 		}
 		uid = ulid.ULID{}
 		level.Info(c.logger).Log(

Review thread on b.numBytesMeta = n:
- Contributor: Why do we set numBytesMeta on this block but ignore the other sizes later on?
- Author (@krasi-georgiev, Jun 24, 2019): This is actually setting numBytesMeta for the existing blocks. Existing blocks are already open, so we need to update their size information when it changes. Later on it creates new blocks, and those get their sizes calculated when they are opened at db.reload().
- Contributor: Good point, LGTM.
@@ -600,12 +602,12 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
 		return nil
 	}
 
-	if err = writeMetaFile(c.logger, tmp, meta); err != nil {
+	if _, err = writeMetaFile(c.logger, tmp, meta); err != nil {
 		return errors.Wrap(err, "write merged meta")
 	}
 
 	// Create an empty tombstones file.
-	if err := writeTombstoneFile(c.logger, tmp, newMemTombstones()); err != nil {
+	if _, err := writeTombstoneFile(c.logger, tmp, newMemTombstones()); err != nil {
 		return errors.Wrap(err, "write new tombstones file")
 	}
2 changes: 1 addition & 1 deletion db.go
@@ -629,7 +629,7 @@ func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
 
 	corrupted = make(map[ulid.ULID]error)
 	for _, dir := range dirs {
-		meta, err := readMetaFile(dir)
+		meta, _, err := readMetaFile(dir)
 		if err != nil {
 			level.Error(db.logger).Log("msg", "not a block dir", "dir", dir)
 			continue