Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

refactor block size calculation #637

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 63 additions & 28 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strconv"
"sync"

"github.com/go-kit/kit/log"
Expand Down Expand Up @@ -179,11 +180,14 @@ type BlockMeta struct {

// BlockStats contains stats about contents of a block.
type BlockStats struct {
NumSamples uint64 `json:"numSamples,omitempty"`
NumSeries uint64 `json:"numSeries,omitempty"`
NumChunks uint64 `json:"numChunks,omitempty"`
NumTombstones uint64 `json:"numTombstones,omitempty"`
NumBytes int64 `json:"numBytes,omitempty"`
NumSamples uint64 `json:"numSamples,omitempty"`
NumSeries uint64 `json:"numSeries,omitempty"`
NumChunks uint64 `json:"numChunks,omitempty"`
NumTombstones uint64 `json:"numTombstones,omitempty"`
NumBytesChunks int64 `json:"numBytesChunks"`
NumBytesIndex int64 `json:"numBytesIndex"`
NumBytesMeta int64 `json:"numBytesMeta"`
NumBytesTombstone int64 `json:"numBytesTombstone"`
}

// BlockDesc describes a block by ULID and time range.
Expand Down Expand Up @@ -234,6 +238,12 @@ func readMetaFile(dir string) (*BlockMeta, error) {
func writeMetaFile(logger log.Logger, dir string, meta *BlockMeta) error {
meta.Version = 1

// The meta file is within a block so
// its size needs to be calculated and set upfront.
if err := setMetaFileSize(meta); err != nil {
return err
}

// Make any changes to the file appear atomic.
path := filepath.Join(dir, metaFilename)
tmp := path + ".tmp"
Expand All @@ -248,16 +258,21 @@ func writeMetaFile(logger log.Logger, dir string, meta *BlockMeta) error {
return err
}

enc := json.NewEncoder(f)
enc.SetIndent("", "\t")
jsonMeta, err := json.MarshalIndent(meta, "", "\t")
if err != nil {
return err
}

var merr tsdb_errors.MultiError
if merr.Add(enc.Encode(meta)); merr.Err() != nil {
if _, err = f.Write(jsonMeta); err != nil {
merr.Add(err)
merr.Add(f.Close())
return merr.Err()
}

// Force the kernel to persist the file on disk to avoid data loss if the host crashes.
if merr.Add(f.Sync()); merr.Err() != nil {
if err := f.Sync(); err != nil {
merr.Add(err)
merr.Add(f.Close())
return merr.Err()
}
Expand All @@ -267,6 +282,19 @@ func writeMetaFile(logger log.Logger, dir string, meta *BlockMeta) error {
return fileutil.Replace(tmp, path)
}

// setMetaFileSize calculates and sets the serialized size of the meta file
// in meta.Stats.NumBytesMeta.
//
// The recorded size is itself part of the serialized file, so updating it
// can change the number of digits it occupies (e.g. 99 -> 100) and thereby
// the final file length. A single digit-width adjustment is not enough at
// those boundaries, so iterate until the recorded size and the implied
// serialized length agree. The sequence is monotonically non-decreasing and
// bounded, so this converges in a couple of iterations.
func setMetaFileSize(meta *BlockMeta) error {
	for {
		jsonMeta, err := json.MarshalIndent(meta, "", "\t")
		if err != nil {
			return err
		}

		// Adjust for the digit-width difference between the currently
		// recorded size and the size about to be recorded.
		chars := len(strconv.Itoa(len(jsonMeta))) - len(strconv.Itoa(int(meta.Stats.NumBytesMeta)))
		size := int64(len(jsonMeta) + chars)
		if size == meta.Stats.NumBytesMeta {
			return nil
		}
		meta.Stats.NumBytesMeta = size
	}
}

// Block represents a directory of time series data covering a continuous time range.
type Block struct {
mtx sync.RWMutex
Expand Down Expand Up @@ -325,13 +353,26 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er
}
closers = append(closers, tr)

// TODO refactor to set this at block creation time as
// that would be the logical place for a block size to be calculated.
bs := blockSize(cr, ir, tsr)
meta.Stats.NumBytes = bs
err = writeMetaFile(logger, dir, meta)
if err != nil {
level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err)
// Calculate the block size.
// This is done at read and not when creating a block to avoid blocks returning a zero size.
// For example opening a block created by an older library version wouldn't include the size information.
{
// Rewrite the meta only when the sizes have changed.
// This check is to allow opening the db in read only mode without
// any meta file rewrites when the block sizes haven't changed.
if meta.Stats.NumBytesChunks != cr.Size() ||
meta.Stats.NumBytesIndex != ir.Size() ||
meta.Stats.NumBytesTombstone != tsr.Size() {

meta.Stats.NumBytesChunks = cr.Size()
meta.Stats.NumBytesIndex = ir.Size()
meta.Stats.NumBytesTombstone = tsr.Size()

err = writeMetaFile(logger, dir, meta)
if err != nil {
return nil, err
}
}
}

pb = &Block{
Expand All @@ -346,16 +387,6 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er
return pb, nil
}

// blockSize returns the combined size of the given readers,
// skipping any that are nil.
func blockSize(rr ...SizeReader) int64 {
	var sum int64
	for _, sr := range rr {
		if sr == nil {
			continue
		}
		sum += sr.Size()
	}
	return sum
}

// Close closes the on-disk block. It blocks as long as there are readers reading from the block.
func (pb *Block) Close() error {
pb.mtx.Lock()
Expand Down Expand Up @@ -390,7 +421,9 @@ func (pb *Block) MinTime() int64 { return pb.meta.MinTime }
func (pb *Block) MaxTime() int64 { return pb.meta.MaxTime }

// Size returns the number of bytes that the block takes up.
func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes }
func (pb *Block) Size() int64 {
return pb.meta.Stats.NumBytesChunks + pb.meta.Stats.NumBytesIndex + pb.meta.Stats.NumBytesTombstone + pb.meta.Stats.NumBytesMeta
}

// ErrClosing is returned when a block is in the process of being closed.
var ErrClosing = errors.New("block is closing")
Expand Down Expand Up @@ -561,9 +594,11 @@ Outer:
pb.tombstones = stones
pb.meta.Stats.NumTombstones = pb.tombstones.Total()

if err := writeTombstoneFile(pb.logger, pb.dir, pb.tombstones); err != nil {
n, err := writeTombstoneFile(pb.logger, pb.dir, pb.tombstones)
if err != nil {
return err
}
pb.meta.Stats.NumBytesTombstone = n
return writeMetaFile(pb.logger, pb.dir, &pb.meta)
}

Expand Down
55 changes: 55 additions & 0 deletions block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil"
"github.com/prometheus/tsdb/tsdbutil"
)
Expand Down Expand Up @@ -149,6 +150,60 @@ func TestCorruptedChunk(t *testing.T) {
}
}

// TestBlockSize ensures that the block size is calculated correctly.
// It compares Block.Size() against the actual on-disk directory size
// after creation, after a delete (tombstones grow the block), and after
// a compaction (deleted data is dropped, shrinking the block).
func TestBlockSize(t *testing.T) {
	tmpdir, err := ioutil.TempDir("", "test_blockSize")
	testutil.Ok(t, err)
	defer func() {
		testutil.Ok(t, os.RemoveAll(tmpdir))
	}()

	var (
		blockInit    *Block
		expSizeInit  int64
		blockDirInit string
	)

	// Create a block and compare the reported size vs actual disk size.
	{
		blockDirInit = createBlock(t, tmpdir, genSeries(10, 1, 1, 100))
		blockInit, err = OpenBlock(nil, blockDirInit, nil)
		testutil.Ok(t, err)
		defer func() {
			testutil.Ok(t, blockInit.Close())
		}()
		expSizeInit = blockInit.Size()
		actSizeInit, err := testutil.DirSize(blockInit.Dir())
		testutil.Ok(t, err)
		testutil.Equals(t, expSizeInit, actSizeInit)
	}

	// Delete some series and check the sizes again.
	{
		// Deleting writes tombstones, so the reported size should grow
		// and must still match what is on disk.
		testutil.Ok(t, blockInit.Delete(1, 10, labels.NewMustRegexpMatcher("", ".*")))
		expAfterDelete := blockInit.Size()
		testutil.Assert(t, expAfterDelete > expSizeInit, "after a delete the block size should be bigger as the tombstone file should grow %v > %v", expAfterDelete, expSizeInit)
		actAfterDelete, err := testutil.DirSize(blockDirInit)
		testutil.Ok(t, err)
		testutil.Equals(t, expAfterDelete, actAfterDelete, "after a delete reported block size doesn't match actual disk size")

		// Compacting drops the deleted data, so both the actual and the
		// reported size should shrink and remain in sync.
		c, err := NewLeveledCompactor(context.Background(), nil, log.NewNopLogger(), []int64{0}, nil)
		testutil.Ok(t, err)
		blockDirAfterCompact, err := c.Compact(tmpdir, []string{blockInit.Dir()}, nil)
		testutil.Ok(t, err)
		blockAfterCompact, err := OpenBlock(nil, filepath.Join(tmpdir, blockDirAfterCompact.String()), nil)
		testutil.Ok(t, err)
		defer func() {
			testutil.Ok(t, blockAfterCompact.Close())
		}()
		expAfterCompact := blockAfterCompact.Size()
		actAfterCompact, err := testutil.DirSize(blockAfterCompact.Dir())
		testutil.Ok(t, err)
		testutil.Assert(t, actAfterDelete > actAfterCompact, "after a delete and compaction the block size should be smaller %v,%v", actAfterDelete, actAfterCompact)
		testutil.Equals(t, expAfterCompact, actAfterCompact, "after a delete and compaction reported block size doesn't match actual disk size")
	}

}

// createBlock creates a block with given set of series and returns its dir.
func createBlock(tb testing.TB, dir string, series []Series) string {
head := createHead(tb, series)
Expand Down
2 changes: 1 addition & 1 deletion compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
}

// Create an empty tombstones file.
if err := writeTombstoneFile(c.logger, tmp, newMemTombstones()); err != nil {
if _, err := writeTombstoneFile(c.logger, tmp, newMemTombstones()); err != nil {
return errors.Wrap(err, "write new tombstones file")
}

Expand Down
23 changes: 4 additions & 19 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,6 @@ func TestDB_reloadOrder(t *testing.T) {

testutil.Ok(t, db.reload())
blocks := db.Blocks()
for _, b := range blocks {
b.meta.Stats.NumBytes = 0
}
testutil.Equals(t, 3, len(blocks))
testutil.Equals(t, metas[1].MinTime, blocks[0].Meta().MinTime)
testutil.Equals(t, metas[1].MaxTime, blocks[0].Meta().MaxTime)
Expand Down Expand Up @@ -1060,7 +1057,8 @@ func TestSizeRetention(t *testing.T) {
testutil.Ok(t, db.reload()) // Reload the db to register the new db size.
testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered.
expSize := int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the the actual internal metrics.
actSize := dbDiskSize(db.Dir())
actSize, err := testutil.DirSize(db.Dir())
testutil.Ok(t, err)
testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size")

// Decrease the max bytes limit so that a delete is triggered.
Expand All @@ -1074,7 +1072,8 @@ func TestSizeRetention(t *testing.T) {
actBlocks := db.Blocks()
expSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes))
actRetentCount := int(prom_testutil.ToFloat64(db.metrics.sizeRetentionCount))
actSize = dbDiskSize(db.Dir())
actSize, err = testutil.DirSize(db.Dir())
testutil.Ok(t, err)

testutil.Equals(t, 1, actRetentCount, "metric retention count mismatch")
testutil.Equals(t, actSize, expSize, "metric db size doesn't match actual disk size")
Expand All @@ -1085,20 +1084,6 @@ func TestSizeRetention(t *testing.T) {

}

// dbDiskSize returns the combined on-disk size of the chunk, index and
// tombstone files under dir. The walk is best-effort: unreadable entries
// are skipped rather than failing the size calculation.
func dbDiskSize(dir string) int64 {
	var statSize int64
	filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
		// On a walk error info may be nil; skip the entry instead of
		// dereferencing it (the original callback ignored err and would
		// panic here).
		if err != nil || info == nil {
			return nil
		}
		// Include only index, tombstone and chunks.
		if filepath.Dir(path) == chunkDir(filepath.Dir(filepath.Dir(path))) ||
			info.Name() == indexFilename ||
			info.Name() == tombstoneFilename {
			statSize += info.Size()
		}
		return nil
	})
	return statSize
}

func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) {
db, delete := openTestDB(t, nil)
defer func() {
Expand Down
36 changes: 20 additions & 16 deletions tombstones.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,15 @@ type TombstoneReader interface {
Close() error
}

func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) error {
func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) (int64, error) {
path := filepath.Join(dir, tombstoneFilename)
tmp := path + ".tmp"
hash := newCRC32()
var size int

f, err := os.Create(tmp)
if err != nil {
return err
return 0, err
}
defer func() {
if f != nil {
Expand All @@ -79,10 +80,11 @@ func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) error
// Write the meta.
buf.PutBE32(MagicTombstone)
buf.PutByte(tombstoneFormatV1)
_, err = f.Write(buf.Get())
n, err := f.Write(buf.Get())
if err != nil {
return err
return 0, err
}
size += n

mw := io.MultiWriter(f, hash)

Expand All @@ -94,32 +96,34 @@ func writeTombstoneFile(logger log.Logger, dir string, tr TombstoneReader) error
buf.PutVarint64(iv.Mint)
buf.PutVarint64(iv.Maxt)

_, err = mw.Write(buf.Get())
n, err = mw.Write(buf.Get())
if err != nil {
return err
}
size += n
}
return nil
}); err != nil {
return fmt.Errorf("error writing tombstones: %v", err)
return 0, fmt.Errorf("error writing tombstones: %v", err)
}

_, err = f.Write(hash.Sum(nil))
n, err = f.Write(hash.Sum(nil))
if err != nil {
return err
return 0, err
}
size += n

var merr tsdb_errors.MultiError
if merr.Add(f.Sync()); merr.Err() != nil {
merr.Add(f.Close())
return merr.Err()
return 0, merr.Err()
}

if err = f.Close(); err != nil {
return err
return 0, err
}
f = nil
return fileutil.Replace(tmp, path)
return int64(size), fileutil.Replace(tmp, path)
}

// Stone holds the information on the posting and time-range
Expand All @@ -132,7 +136,7 @@ type Stone struct {
func readTombstones(dir string) (TombstoneReader, SizeReader, error) {
b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
if os.IsNotExist(err) {
return newMemTombstones(), nil, nil
return newMemTombstones(), &TombstoneFile{size: 0}, nil
} else if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -230,6 +234,10 @@ func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) {
}
}

// Close releases the resources of the reader; an in-memory tombstone set
// holds none, so this is a no-op that always succeeds.
func (*memTombstones) Close() error {
	return nil
}

// TombstoneFile holds information about the tombstone file.
type TombstoneFile struct {
size int64
Expand All @@ -240,10 +248,6 @@ func (t *TombstoneFile) Size() int64 {
return t.size
}

func (*memTombstones) Close() error {
return nil
}

// Interval represents a single time-interval.
type Interval struct {
Mint, Maxt int64
Expand Down
Loading