This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Merge remote-tracking branch 'upstream/master' into dont-write-empty-blocks

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
codesome committed Sep 21, 2018
2 parents ab14c9c + d38516b commit 7623f3e
Showing 17 changed files with 501 additions and 355 deletions.
38 changes: 31 additions & 7 deletions block.go
@@ -15,6 +15,7 @@
package tsdb

import (
"encoding/binary"
"encoding/json"
"io/ioutil"
"os"
@@ -252,6 +253,10 @@ type Block struct {
dir string
meta BlockMeta

// Symbol Table Size in bytes.
// We maintain this variable to avoid recalculation every time.
symbolTableSize uint64

chunkr ChunkReader
indexr IndexReader
tombstones TombstoneReader
@@ -279,12 +284,23 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
return nil, err
}

// Calculating symbol table size.
tmp := make([]byte, 8)
symTblSize := uint64(0)
for _, v := range ir.SymbolTable() {
// Size of varint length of the symbol.
symTblSize += uint64(binary.PutUvarint(tmp, uint64(len(v))))
// Size of the symbol.
symTblSize += uint64(len(v))
}

pb := &Block{
dir: dir,
meta: *meta,
chunkr: cr,
indexr: ir,
tombstones: tr,
dir: dir,
meta: *meta,
chunkr: cr,
indexr: ir,
tombstones: tr,
symbolTableSize: symTblSize,
}
return pb, nil
}
@@ -354,6 +370,11 @@ func (pb *Block) Tombstones() (TombstoneReader, error) {
return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil
}

// GetSymbolTableSize returns the Symbol Table Size in the index of this block.
func (pb *Block) GetSymbolTableSize() uint64 {
return pb.symbolTableSize
}

func (pb *Block) setCompactionFailed() error {
pb.meta.Compaction.Failed = true
return writeMetaFile(pb.dir, &pb.meta)
@@ -487,10 +508,13 @@ Outer:
func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) {
numStones := 0

pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
if err := pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
numStones += len(ivs)
return nil
})
}); err != nil {
// This should never happen, as the iteration function only returns nil.
panic(err)
}
if numStones == 0 {
return nil, nil
}
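The symbol-table size computed in OpenBlock above adds, for each symbol, the size of its uvarint-encoded length prefix plus the symbol bytes themselves. A minimal standalone sketch of that accounting, with illustrative names and example symbols (not code from this repository):

package main

import (
	"encoding/binary"
	"fmt"
)

// symbolTableSize mirrors the accounting above: for every symbol, add the
// size of its uvarint length prefix and the size of the symbol itself.
func symbolTableSize(symbols []string) uint64 {
	buf := make([]byte, binary.MaxVarintLen64)
	size := uint64(0)
	for _, s := range symbols {
		size += uint64(binary.PutUvarint(buf, uint64(len(s))))
		size += uint64(len(s))
	}
	return size
}

func main() {
	// Three short symbols: 18 bytes of data plus one length byte each.
	fmt.Println(symbolTableSize([]string{"__name__", "up", "instance"})) // 21
}

OpenBlock stores the result in symbolTableSize so GetSymbolTableSize can return it without rescanning the index.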
9 changes: 9 additions & 0 deletions checkpoint.go
@@ -109,6 +109,10 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo
stats := &CheckpointStats{}

var sr io.Reader
// We close everything explicitly because Windows needs files to be
// closed before being deleted. But we also have defer so that we close
// files if there is an error somewhere.
var closers []io.Closer
{
lastFn, k, err := LastCheckpoint(w.Dir())
if err != nil && err != ErrNotFound {
@@ -126,6 +130,7 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo
return nil, errors.Wrap(err, "open last checkpoint")
}
defer last.Close()
closers = append(closers, last)
sr = last
}

@@ -134,6 +139,7 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo
return nil, errors.Wrap(err, "create segment reader")
}
defer segsr.Close()
closers = append(closers, segsr)

if sr != nil {
sr = io.MultiReader(sr, segsr)
@@ -263,6 +269,9 @@ func Checkpoint(logger log.Logger, w *wal.WAL, m, n int, keep func(id uint64) bo
if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
return nil, errors.Wrap(err, "rename checkpoint directory")
}
if err := closeAll(closers...); err != nil {
return stats, errors.Wrap(err, "close opened files")
}
if err := w.Truncate(n + 1); err != nil {
// If truncating fails, we'll just try again at the next checkpoint.
// Leftover segments will just be ignored in the future if there's a checkpoint
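The checkpoint code above closes its readers through a closeAll helper defined elsewhere in the repository. Assuming typical close-everything semantics, a minimal sketch of such a helper might look like the following; the name closeAllSketch and the first-error behaviour are assumptions, not the repository's implementation:

package main

import (
	"fmt"
	"io"
)

// closeAllSketch closes every closer and returns the first error encountered,
// so that close failures surface after the checkpoint has been written.
func closeAllSketch(cs ...io.Closer) error {
	var firstErr error
	for _, c := range cs {
		if err := c.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

// nopCloser is a trivial closer used only to exercise the helper here.
type nopCloser struct{}

func (nopCloser) Close() error { return nil }

func main() {
	if err := closeAllSketch(nopCloser{}, nopCloser{}); err != nil {
		fmt.Println("close opened files:", err)
	}
}

In the diff above the helper is called only after fileutil.Replace succeeds, so a failed Close surfaces as a checkpoint error instead of being hidden by the deferred calls.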
23 changes: 17 additions & 6 deletions cmd/tsdb/main.go
@@ -145,7 +145,9 @@ func (b *writeBenchmark) run() {
if err := b.storage.Close(); err != nil {
exitWithError(err)
}
b.stopProfiling()
if err := b.stopProfiling(); err != nil {
exitWithError(err)
}
})
}

@@ -248,7 +250,9 @@ func (b *writeBenchmark) startProfiling() {
if err != nil {
exitWithError(fmt.Errorf("bench: could not create cpu profile: %v", err))
}
pprof.StartCPUProfile(b.cpuprof)
if err := pprof.StartCPUProfile(b.cpuprof); err != nil {
exitWithError(fmt.Errorf("bench: could not start CPU profile: %v", err))
}

// Start memory profiling.
b.memprof, err = os.Create(filepath.Join(b.outPath, "mem.prof"))
@@ -271,29 +275,36 @@ func (b *writeBenchmark) startProfiling() {
runtime.SetMutexProfileFraction(20)
}

func (b *writeBenchmark) stopProfiling() {
func (b *writeBenchmark) stopProfiling() error {
if b.cpuprof != nil {
pprof.StopCPUProfile()
b.cpuprof.Close()
b.cpuprof = nil
}
if b.memprof != nil {
pprof.Lookup("heap").WriteTo(b.memprof, 0)
if err := pprof.Lookup("heap").WriteTo(b.memprof, 0); err != nil {
return fmt.Errorf("error writing mem profile: %v", err)
}
b.memprof.Close()
b.memprof = nil
}
if b.blockprof != nil {
pprof.Lookup("block").WriteTo(b.blockprof, 0)
if err := pprof.Lookup("block").WriteTo(b.blockprof, 0); err != nil {
return fmt.Errorf("error writing block profile: %v", err)
}
b.blockprof.Close()
b.blockprof = nil
runtime.SetBlockProfileRate(0)
}
if b.mtxprof != nil {
pprof.Lookup("mutex").WriteTo(b.mtxprof, 0)
if err := pprof.Lookup("mutex").WriteTo(b.mtxprof, 0); err != nil {
return fmt.Errorf("error writing mutex profile: %v", err)
}
b.mtxprof.Close()
b.mtxprof = nil
runtime.SetMutexProfileFraction(0)
}
return nil
}

func measureTime(stage string, f func()) time.Duration {
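The benchmark changes above check the errors returned by pprof.StartCPUProfile and pprof.Lookup(...).WriteTo instead of discarding them. A minimal standalone sketch of the same pattern; the helper name and output path are illustrative:

package main

import (
	"fmt"
	"os"
	"runtime/pprof"
)

// writeHeapProfile writes the current heap profile to path and reports
// create, write, and close errors instead of silently dropping them.
func writeHeapProfile(path string) error {
	f, err := os.Create(path)
	if err != nil {
		return fmt.Errorf("create profile file: %v", err)
	}
	if err := pprof.Lookup("heap").WriteTo(f, 0); err != nil {
		f.Close()
		return fmt.Errorf("error writing mem profile: %v", err)
	}
	return f.Close()
}

func main() {
	if err := writeHeapProfile("mem.prof"); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}

In the benchmark these errors are surfaced through exitWithError rather than being ignored.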
15 changes: 11 additions & 4 deletions compact.go
@@ -99,7 +99,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
Buckets: prometheus.ExponentialBuckets(1, 2, 10),
})
m.chunkSize = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "prometheus_tsdb_compaction_chunk_size",
Name: "prometheus_tsdb_compaction_chunk_size_bytes",
Help: "Final size of chunks on their first compaction",
Buckets: prometheus.ExponentialBuckets(32, 1.5, 12),
})
@@ -109,7 +109,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
Buckets: prometheus.ExponentialBuckets(4, 1.5, 12),
})
m.chunkRange = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "prometheus_tsdb_compaction_chunk_range",
Name: "prometheus_tsdb_compaction_chunk_range_seconds",
Help: "Final time range of chunks on their first compaction",
Buckets: prometheus.ExponentialBuckets(100, 4, 10),
})
@@ -469,6 +469,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
if err != nil {
return errors.Wrap(err, "open chunk writer")
}
defer chunkw.Close()
// Record written chunk sizes on level 1 compactions.
if meta.Compaction.Level == 1 {
chunkw = &instrumentedChunkWriter{
@@ -483,11 +484,15 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
if err != nil {
return errors.Wrap(err, "open index writer")
}
defer indexw.Close()

if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
return errors.Wrap(err, "write compaction")
}

// We are explicitly closing them here to check for errors even
// though they are covered by defer. This is because on Windows
// you cannot delete these files unless they are closed, and the defer
// is there to make sure they are closed if the function exits due to an error above.
if err = chunkw.Close(); err != nil {
return errors.Wrap(err, "close chunk writer")
}
@@ -651,7 +656,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}

for _, chk := range chks {
c.chunkPool.Put(chk.Chunk)
if err := c.chunkPool.Put(chk.Chunk); err != nil {
return errors.Wrap(err, "put chunk")
}
}

for _, l := range lset {
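The two renames above add base-unit suffixes (_bytes, _seconds) to the histogram names so the unit is part of the metric name itself. A minimal sketch of declaring and registering a histogram named this way; the example_ metric name is illustrative:

package main

import (
	"github.com/prometheus/client_golang/prometheus"
)

// The metric name ends in _bytes so consumers know the unit without
// consulting the help text or the source.
var chunkSizeBytes = prometheus.NewHistogram(prometheus.HistogramOpts{
	Name:    "example_compaction_chunk_size_bytes",
	Help:    "Final size of chunks on their first compaction",
	Buckets: prometheus.ExponentialBuckets(32, 1.5, 12),
})

func main() {
	prometheus.MustRegister(chunkSizeBytes)
	chunkSizeBytes.Observe(256) // one 256-byte chunk
}

This matches names already present in the diff, such as prometheus_tsdb_tombstone_cleanup_seconds and prometheus_tsdb_symbol_table_size_bytes.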
54 changes: 40 additions & 14 deletions db.go
@@ -119,11 +119,13 @@ type DB struct {

type dbMetrics struct {
loadedBlocks prometheus.GaugeFunc
symbolTableSize prometheus.GaugeFunc
reloads prometheus.Counter
reloadsFailed prometheus.Counter
compactionsTriggered prometheus.Counter
cutoffs prometheus.Counter
cutoffsFailed prometheus.Counter
startTime prometheus.GaugeFunc
tombCleanTimer prometheus.Histogram
}

@@ -138,6 +140,19 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
defer db.mtx.RUnlock()
return float64(len(db.blocks))
})
m.symbolTableSize = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_symbol_table_size_bytes",
Help: "Size of symbol table on disk (in bytes)",
}, func() float64 {
db.mtx.RLock()
blocks := db.blocks[:]
db.mtx.RUnlock()
symTblSize := uint64(0)
for _, b := range blocks {
symTblSize += b.GetSymbolTableSize()
}
return float64(symTblSize)
})
m.reloads = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_reloads_total",
Help: "Number of times the database reloaded block data from disk.",
@@ -158,6 +173,17 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
Name: "prometheus_tsdb_retention_cutoffs_failures_total",
Help: "Number of times the database failed to cut off block data from disk.",
})
m.startTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "prometheus_tsdb_lowest_timestamp",
Help: "Lowest timestamp value stored in the database.",
}, func() float64 {
db.mtx.RLock()
defer db.mtx.RUnlock()
if len(db.blocks) == 0 {
return float64(db.head.minTime)
}
return float64(db.blocks[0].meta.MinTime)
})
m.tombCleanTimer = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "prometheus_tsdb_tombstone_cleanup_seconds",
Help: "The time taken to recompact blocks to remove tombstones.",
@@ -166,11 +192,13 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
if r != nil {
r.MustRegister(
m.loadedBlocks,
m.symbolTableSize,
m.reloads,
m.reloadsFailed,
m.cutoffs,
m.cutoffsFailed,
m.compactionsTriggered,
m.startTime,
m.tombCleanTimer,
)
}
@@ -192,7 +220,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
if err := repairBadIndexVersion(l, dir); err != nil {
return nil, err
}
// Migrate old WAL.
// Migrate old WAL if one exists.
if err := MigrateWAL(l, filepath.Join(dir, "wal")); err != nil {
return nil, errors.Wrap(err, "migrate WAL")
}
@@ -272,7 +300,7 @@ func (db *DB) run() {
case <-db.compactc:
db.metrics.compactionsTriggered.Inc()

_, err := db.compact()
err := db.compact()
if err != nil {
level.Error(db.logger).Log("msg", "compaction failed", "err", err)
backoff = exponential(backoff, 1*time.Second, 1*time.Minute)
@@ -338,20 +366,20 @@ func (a dbAppender) Commit() error {
// this is sufficient to reliably delete old data.
// Old blocks are only deleted on reload based on the new block's parent information.
// See DB.reload documentation for further information.
func (db *DB) compact() (changes bool, err error) {
func (db *DB) compact() (err error) {
db.cmtx.Lock()
defer db.cmtx.Unlock()

if !db.compactionsEnabled {
return false, nil
return nil
}

// Check whether we have pending head blocks that are ready to be persisted.
// They have the highest priority.
for {
select {
case <-db.stopc:
return changes, nil
return nil
default:
}
// The head has a compactable range if 1.5 level 0 ranges are between the oldest
@@ -374,14 +402,13 @@ func (db *DB) compact() (changes bool, err error) {
maxt: maxt - 1,
}
if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil {
return changes, errors.Wrap(err, "persist head block")
return errors.Wrap(err, "persist head block")
}
changes = true

runtime.GC()

if err := db.reload(); err != nil {
return changes, errors.Wrap(err, "reload blocks")
return errors.Wrap(err, "reload blocks")
}
db.mtx.RLock()
l := len(db.blocks)
@@ -408,31 +435,30 @@ func (db *DB) compact() (changes bool, err error) {
for {
plan, err := db.compactor.Plan(db.dir)
if err != nil {
return changes, errors.Wrap(err, "plan compaction")
return errors.Wrap(err, "plan compaction")
}
if len(plan) == 0 {
break
}

select {
case <-db.stopc:
return changes, nil
return nil
default:
}

if _, err := db.compactor.Compact(db.dir, plan...); err != nil {
return changes, errors.Wrapf(err, "compact %s", plan)
return errors.Wrapf(err, "compact %s", plan)
}
changes = true
runtime.GC()

if err := db.reload(); err != nil {
return changes, errors.Wrap(err, "reload blocks")
return errors.Wrap(err, "reload blocks")
}
runtime.GC()
}

return changes, nil
return nil
}

func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
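The new prometheus_tsdb_symbol_table_size_bytes metric above is a GaugeFunc that takes the DB read lock and sums the per-block symbol table sizes on each scrape. A minimal sketch of that lazy-gauge pattern with a stand-in type (here the lock is simply held while summing); store, sizes, and the example_ metric name are illustrative:

package main

import (
	"sync"

	"github.com/prometheus/client_golang/prometheus"
)

// store stands in for the DB above: a read-write mutex guarding the data
// that the gauge reads on each scrape.
type store struct {
	mtx   sync.RWMutex
	sizes []uint64 // per-block symbol table sizes in bytes
}

// newSymbolTableSizeGauge mirrors the pattern above: the value is computed
// inside the callback, under a read lock, whenever the gauge is collected.
func newSymbolTableSizeGauge(s *store) prometheus.GaugeFunc {
	return prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name: "example_symbol_table_size_bytes",
		Help: "Size of symbol table on disk (in bytes)",
	}, func() float64 {
		s.mtx.RLock()
		defer s.mtx.RUnlock()
		total := uint64(0)
		for _, sz := range s.sizes {
			total += sz
		}
		return float64(total)
	})
}

func main() {
	s := &store{sizes: []uint64{1024, 2048}}
	prometheus.MustRegister(newSymbolTableSizeGauge(s))
}

Because the value is computed in the callback, the metric stays current as blocks are added or removed, with no explicit Set calls.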