Skip to content

Commit

Permalink
cmd/thanos/receive: reduce WAL replays at startup
Browse files Browse the repository at this point in the history
Every time thanos receive is started, it has to replay the WAL three
times, namely:
1. open the TSDB;
2. close the TSDB; open the ReadOnly TSDB and Flush; and
3. open the TSDB

These WAL replays can take a very long time if the WAL has lots of data.
With the fix from thanos-io#1654, the third replay will be instantaneous because
the WAL will be empty. That still leaves two potentially long WAL
replays. We can cut this down to just one long replay if we do the
following operations instead:
1. with a closed TSDB, open the ReadOnly TSDB and Flush; and
2. open the TSDB

Now, the second step will be a fast replay because the WAL is empty,
leaving just one potentially expensive WAL replay.

This commit eliminates the explicit opening of the writable TSDB during
startup, and instead opens it after the WAL has been flushed via the read-only TSDB.

Signed-off-by: Lucas Servén Marín <lserven@gmail.com>
  • Loading branch information
squat committed Nov 5, 2019
1 parent 27a578d commit 6c82afa
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 23 deletions.
17 changes: 6 additions & 11 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,30 +230,22 @@ func runReceive(
defer close(dbReady)
defer close(uploadC)

// Before actually starting, we need to make sure the
// WAL is flushed. The WAL is flushed after the
// hashring is loaded.
db := receive.NewFlushableStorage(
dataDir,
log.With(logger, "component", "tsdb"),
reg,
tsdbCfg,
)

// Before actually starting, we need to make sure the
// WAL is flushed. The WAL is flushed after the
// hashring ring is loaded.
if err := db.Open(); err != nil {
return errors.Wrap(err, "opening storage")
}

// Before quitting, ensure the WAL is flushed and the DB is closed.
defer func() {
if err := db.Flush(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to flush storage")
return
}
if err := db.Close(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to close storage")
return
}
}()

for {
Expand All @@ -267,6 +259,9 @@ func runReceive(
if err := db.Flush(); err != nil {
return errors.Wrap(err, "flushing storage")
}
if err := db.Open(); err != nil {
return errors.Wrap(err, "opening storage")
}
if upload {
uploadC <- struct{}{}
<-uploadDone
Expand Down
16 changes: 4 additions & 12 deletions pkg/receive/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,32 +69,24 @@ func (f *FlushableStorage) open() error {
}

// Flush temporarily stops the storage and flushes the WAL to blocks.
// Note: this operation leaves the storage in the same state it was in.
// Note: this operation leaves the storage closed.
func (f *FlushableStorage) Flush() error {
f.mu.Lock()
defer f.mu.Unlock()
var reopen bool
if !f.stopped {
if err := f.DB.Close(); err != nil {
return errors.Wrap(err, "stopping storage")
}
f.stopped = true
reopen = true
}
ro, err := promtsdb.OpenDBReadOnly(f.Dir(), f.l)
ro, err := promtsdb.OpenDBReadOnly(f.path, f.l)
if err != nil {
return errors.Wrap(err, "opening read-only DB")
}
if err := ro.FlushWAL(f.Dir()); err != nil {
if err := ro.FlushWAL(f.path); err != nil {
return errors.Wrap(err, "flushing WAL")
}
if err := os.RemoveAll(filepath.Join(f.Dir(), "wal")); err != nil {
return errors.Wrap(err, "removing stale WAL")
}
if reopen {
return errors.Wrap(f.open(), "re-starting storage")
}
return nil
return errors.Wrap(os.RemoveAll(filepath.Join(f.path, "wal")), "removing stale WAL")
}

// Close stops the storage.
Expand Down

0 comments on commit 6c82afa

Please sign in to comment.