Skip to content

Commit

Permalink
Merge pull request #20033 from influxdata/DSB_influxdb_1.8_3238
Browse files Browse the repository at this point in the history
fix(tsm1): "snapshot in progress" error during backup
  • Loading branch information
davidby-influx authored Nov 17, 2020
2 parents c6f38a8 + 4406b97 commit 08f394f
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 28 deletions.
2 changes: 1 addition & 1 deletion tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Engine interface {

LoadMetadataIndex(shardID uint64, index Index) error

CreateSnapshot() (string, error)
CreateSnapshot(skipCacheOk bool) (string, error)
Backup(w io.Writer, basePath string, since time.Time) error
Export(w io.Writer, basePath string, start time.Time, end time.Time) error
Restore(r io.Reader, basePath string) error
Expand Down
40 changes: 20 additions & 20 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,26 +912,16 @@ func (e *Engine) Free() error {
// of the files in the archive. It will force a snapshot of the WAL first
// then perform the backup with a read lock against the file store. This means
// that new TSM files will not be able to be created in this shard while the
// backup is running. For shards that are still acively getting writes, this
// could cause the WAL to backup, increasing memory usage and evenutally rejecting writes.
// backup is running. For shards that are still actively getting writes, this
// could cause the WAL to backup, increasing memory usage and eventually rejecting writes.
func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error {
var err error
var path string
for i := 0; i < 3; i++ {
path, err = e.CreateSnapshot()
if err != nil {
switch err {
case ErrSnapshotInProgress:
backoff := time.Duration(math.Pow(32, float64(i))) * time.Millisecond
time.Sleep(backoff)
default:
return err
}
}
}
if err == ErrSnapshotInProgress {
e.logger.Warn("Snapshotter busy: Backup proceeding without snapshot contents.")
path, err = e.CreateSnapshot(true)
if err != nil {
return err
}

// Remove the temporary snapshot dir
defer func() {
if err := os.RemoveAll(path); err != nil {
Expand Down Expand Up @@ -998,7 +988,7 @@ func (e *Engine) timeStampFilterTarFile(start, end time.Time) func(f os.FileInfo
}

func (e *Engine) Export(w io.Writer, basePath string, start time.Time, end time.Time) error {
path, err := e.CreateSnapshot()
path, err := e.CreateSnapshot(false)
if err != nil {
return err
}
Expand Down Expand Up @@ -1958,9 +1948,19 @@ func (e *Engine) WriteSnapshot() (err error) {
}

// CreateSnapshot will create a temp directory that holds
// temporary hardlinks to the underylyng shard files.
func (e *Engine) CreateSnapshot() (string, error) {
if err := e.WriteSnapshot(); err != nil {
// temporary hardlinks to the underlying shard files.
// skipCacheOk controls whether it is permissible to fail writing out
// in-memory cache data when a previous snapshot is in progress.
func (e *Engine) CreateSnapshot(skipCacheOk bool) (string, error) {
err := e.WriteSnapshot()
for i := 0; (i < 3) && (err == ErrSnapshotInProgress); i += 1 {
backoff := time.Duration(math.Pow(32, float64(i))) * time.Millisecond
time.Sleep(backoff)
err = e.WriteSnapshot()
}
if (err == ErrSnapshotInProgress) && skipCacheOk {
e.logger.Warn("Snapshotter busy: proceeding without cache contents.")
} else if err != nil {
return "", err
}

Expand Down
116 changes: 116 additions & 0 deletions tsdb/engine/tsm1/engine_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package tsm1

import (
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/index/inmem"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"testing"
"time"
)

// TestEngine_ConcurrentShardSnapshots exercises Shard.CreateSnapshot while the
// engine's cache claims that a snapshot is already in progress. The in-progress
// state is simulated by setting the cache's snapshotting flag directly.
//
// With skipCacheOk=true the call must not surface ErrSnapshotInProgress; with
// skipCacheOk=false that error is tolerated. Any other error fails the test.
func TestEngine_ConcurrentShardSnapshots(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("Skipping on windows")
	}

	tmpDir, err := ioutil.TempDir("", "shard_test")
	if err != nil {
		t.Fatalf("error creating temporary directory: %s", err.Error())
	}
	defer os.RemoveAll(tmpDir)
	tmpShard := filepath.Join(tmpDir, "shard")
	tmpWal := filepath.Join(tmpDir, "wal")

	sfile := NewSeriesFile(tmpDir)
	defer sfile.Close()

	opts := tsdb.NewEngineOptions()
	// Reuse the WAL path handed to NewShard below so the two cannot drift apart.
	opts.Config.WALDir = tmpWal
	opts.InmemIndex = inmem.NewIndex(filepath.Base(tmpDir), sfile)
	opts.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{})

	sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile, opts)
	if err := sh.Open(); err != nil {
		t.Fatalf("error opening shard: %s", err.Error())
	}
	defer sh.Close()

	// Write some points so the shard has data before snapshotting.
	points := make([]models.Point, 0, 10000)
	for i := 0; i < cap(points); i++ {
		points = append(points, models.MustNewPoint(
			"cpu",
			models.NewTags(map[string]string{"host": "server"}),
			map[string]interface{}{"value": 1.0},
			time.Unix(int64(i), 0),
		))
	}
	if err = sh.WritePoints(points); err != nil {
		// Fatal, not Fatalf: the error text must not be interpreted as a
		// format string.
		t.Fatal(err)
	}

	engineInterface, err := sh.Engine()
	if err != nil {
		t.Fatalf("error retrieving shard.Engine(): %s", err.Error())
	}

	// Get the struct underlying the interface. Not a recommended practice.
	realEngineStruct, ok := (engineInterface).(*Engine)
	if !ok {
		// Report the test as skipped rather than silently passing.
		t.Skip("Engine type does not permit simulating Cache race conditions")
	}
	// Fake a race condition in snapshotting the cache. The deferred reset is
	// registered after the shard's Close, so it runs before the shard closes.
	realEngineStruct.Cache.snapshotting = true
	defer func() {
		realEngineStruct.Cache.snapshotting = false
	}()

	// snapshotFunc takes one snapshot and removes the resulting directory on
	// success. ErrSnapshotInProgress is only acceptable when the caller did
	// not allow the cache to be skipped.
	snapshotFunc := func(skipCacheOk bool) {
		f, err := sh.CreateSnapshot(skipCacheOk)
		switch {
		case err == nil:
			if err = os.RemoveAll(f); err != nil {
				t.Fatalf("Failed to clean up in TestEngine_ConcurrentShardSnapshots: %s", err.Error())
			}
		case err == ErrSnapshotInProgress:
			if skipCacheOk {
				t.Fatalf("failing to ignore this error,: %s", err.Error())
			}
		default:
			t.Fatalf("error creating shard snapshot: %s", err.Error())
		}
	}

	// Permit skipping cache in the snapshot.
	snapshotFunc(true)
	// Do not permit skipping the cache in the snapshot.
	snapshotFunc(false)
}

// NewSeriesFile creates a fresh "tsdb-series-file-" directory beneath tmpDir,
// opens a SeriesFile rooted there with a logger writing to stdout, and returns
// it. Any failure while creating the directory or opening the file panics.
func NewSeriesFile(tmpDir string) *tsdb.SeriesFile {
	seriesDir, mkErr := ioutil.TempDir(tmpDir, "tsdb-series-file-")
	if mkErr != nil {
		panic(mkErr)
	}

	sfile := tsdb.NewSeriesFile(seriesDir)
	sfile.Logger = logger.New(os.Stdout)

	openErr := sfile.Open()
	if openErr != nil {
		panic(openErr)
	}
	return sfile
}

// seriesIDSets is a plain slice of series ID sets that can be iterated
// through its ForEach method.
type seriesIDSets []*tsdb.SeriesIDSet

// ForEach invokes f once for every set in the slice, in slice order.
// It always returns nil.
func (a seriesIDSets) ForEach(f func(ids *tsdb.SeriesIDSet)) error {
	for i := 0; i < len(a); i++ {
		f(a[i])
	}
	return nil
}
4 changes: 2 additions & 2 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,12 +1177,12 @@ func (s *Shard) Import(r io.Reader, basePath string) error {

// CreateSnapshot will return a path to a temp directory
// containing hard links to the underlying shard files.
func (s *Shard) CreateSnapshot() (string, error) {
func (s *Shard) CreateSnapshot(skipCacheOk bool) (string, error) {
engine, err := s.Engine()
if err != nil {
return "", err
}
return engine.CreateSnapshot()
return engine.CreateSnapshot(skipCacheOk)
}

// ForEachMeasurementName iterates over each measurement in the shard.
Expand Down
4 changes: 2 additions & 2 deletions tsdb/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
}

_ = sh.WritePoints(points[:500])
if f, err := sh.CreateSnapshot(); err == nil {
if f, err := sh.CreateSnapshot(false); err == nil {
os.RemoveAll(f)
}

Expand All @@ -472,7 +472,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
}

_ = sh.WritePoints(points[500:])
if f, err := sh.CreateSnapshot(); err == nil {
if f, err := sh.CreateSnapshot(false); err == nil {
os.RemoveAll(f)
}
}
Expand Down
4 changes: 2 additions & 2 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,13 +675,13 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en

// CreateShardSnapshot will create a hard link to the underlying shard and return a path.
// The caller is responsible for cleaning up (removing) the file path returned.
func (s *Store) CreateShardSnapshot(id uint64) (string, error) {
func (s *Store) CreateShardSnapshot(id uint64, skipCacheOk bool) (string, error) {
sh := s.Shard(id)
if sh == nil {
return "", ErrShardNotFound
}

return sh.CreateSnapshot()
return sh.CreateSnapshot(skipCacheOk)
}

// SetShardEnabled enables or disables a shard for read and writes.
Expand Down
2 changes: 1 addition & 1 deletion tsdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ func TestStore_CreateShardSnapShot(t *testing.T) {
t.Fatalf("expected shard")
}

dir, e := s.CreateShardSnapshot(1)
dir, e := s.CreateShardSnapshot(1, false)
if e != nil {
t.Fatal(e)
}
Expand Down

0 comments on commit 08f394f

Please sign in to comment.