From d67e1f883db487e68542217686f7d17600f2392b Mon Sep 17 00:00:00 2001 From: Rahul Rampure Date: Tue, 14 May 2024 22:06:13 +0530 Subject: [PATCH] MB-61421: Prevent deletion of segments scheduled for copy (#2032) - Use a modified index reader, CopyReader, to mark segments in the Scorch root for online copy/backup operations. This prevents their deletion by the asynchronous cleanup routine during the copy/backup process, thereby mitigating the race condition between the merger/persistor and the copy/backup routine. --------- Co-authored-by: Abhinav Dangeti --- go.mod | 2 +- go.sum | 4 ++-- index/scorch/persister.go | 6 ++--- index/scorch/scorch.go | 40 ++++++++++++++++++++++++++++++++++ index/scorch/snapshot_index.go | 23 +++++++++++++++++++ index_impl.go | 21 +++++++++--------- 6 files changed, 80 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index 875a50da9..9f20b76a9 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.13 require ( github.com/RoaringBitmap/roaring v0.9.4 github.com/bits-and-blooms/bitset v1.2.0 - github.com/blevesearch/bleve_index_api v1.0.2 + github.com/blevesearch/bleve_index_api v1.0.3-0.20240624205006-07f7b7930fd5 github.com/blevesearch/geo v0.1.12-0.20220606102651-aab42add3121 github.com/blevesearch/go-metrics v0.0.0-20201227073835-cf1acfcdf475 github.com/blevesearch/go-porterstemmer v1.0.3 diff --git a/go.sum b/go.sum index ed2907827..5fbd0a4da 100644 --- a/go.sum +++ b/go.sum @@ -5,8 +5,8 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5 github.com/bits-and-blooms/bitset v1.2.0 h1:Kn4yilvwNtMACtf1eYDlG8H77R07mZSPbMjLyS07ChA= github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= github.com/blevesearch/bleve_index_api v1.0.1/go.mod h1:fiwKS0xLEm+gBRgv5mumf0dhgFr2mDgZah1pqv1c1M4= -github.com/blevesearch/bleve_index_api v1.0.2 h1:rO736FwEPMVY1mGi7d4n7CgBB3+tB7uYN7QTjR+Ij+s= -github.com/blevesearch/bleve_index_api v1.0.2/go.mod 
h1:fiwKS0xLEm+gBRgv5mumf0dhgFr2mDgZah1pqv1c1M4= +github.com/blevesearch/bleve_index_api v1.0.3-0.20240624205006-07f7b7930fd5 h1:AG1xQCQKv8dqODzsCc5v1bnxOumAcLeHDHKPiVTGhqE= +github.com/blevesearch/bleve_index_api v1.0.3-0.20240624205006-07f7b7930fd5/go.mod h1:fiwKS0xLEm+gBRgv5mumf0dhgFr2mDgZah1pqv1c1M4= github.com/blevesearch/geo v0.1.12-0.20220606102651-aab42add3121 h1:5uNzC0Mn/8aCGbSJA6T8ZCjrKW8MKsZKQYBDowmeV/g= github.com/blevesearch/geo v0.1.12-0.20220606102651-aab42add3121/go.mod h1:8z6udmXe8Ek8uuX4qOIWKb50vY/OQ1SG+XhL5FrcHOU= github.com/blevesearch/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:kDy+zgJFJJoJYBvdfBSiZYBbdsUL0XcjHYWezpQBGPA= diff --git a/index/scorch/persister.go b/index/scorch/persister.go index 670f0f820..85908f5f5 100644 --- a/index/scorch/persister.go +++ b/index/scorch/persister.go @@ -529,8 +529,8 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string, } } - var filenames []string - newSegmentPaths := make(map[uint64]string) + filenames := make([]string, 0, len(snapshot.segment)) + newSegmentPaths := make(map[uint64]string, len(snapshot.segment)) // first ensure that each segment in this snapshot has been persisted for _, segmentSnapshot := range snapshot.segment { @@ -982,7 +982,7 @@ func (s *Scorch) removeOldZapFiles() error { for _, finfo := range currFileInfos { fname := finfo.Name() if filepath.Ext(fname) == ".zap" { - if _, exists := liveFileNames[fname]; !exists && !s.ineligibleForRemoval[fname] { + if _, exists := liveFileNames[fname]; !exists && !s.ineligibleForRemoval[fname] && (s.copyScheduled[fname] <= 0) { err := os.Remove(s.path + string(os.PathSeparator) + fname) if err != nil { log.Printf("got err removing file: %s, err: %v", fname, err) diff --git a/index/scorch/scorch.go b/index/scorch/scorch.go index 822093729..cd43f91bd 100644 --- a/index/scorch/scorch.go +++ b/index/scorch/scorch.go @@ -19,6 +19,7 @@ import ( "fmt" "io/ioutil" "os" + "path/filepath" "sync" "sync/atomic" "time" @@ -50,6 +51,7 @@ 
type Scorch struct { unsafeBatch bool rootLock sync.RWMutex + root *IndexSnapshot // holds 1 ref-count on the root rootPersisted []chan error // closed when root is persisted persistedCallbacks []index.BatchCallback @@ -57,6 +59,12 @@ type Scorch struct { eligibleForRemoval []uint64 // Index snapshot epochs that are safe to GC. ineligibleForRemoval map[string]bool // Filenames that should not be GC'ed yet. + // keeps track of segments scheduled for online copy/backup operation. Each segment's filename maps to + // the count of copy schedules. Segments with non-zero counts are protected from removal by the cleanup + // operation. Counts decrement upon successful copy, allowing removal of segments with zero or absent counts. + // must be accessed within the rootLock as it is accessed by the asynchronous cleanup routine. + copyScheduled map[string]int + numSnapshotsToKeep int closeCh chan struct{} introductions chan *segmentIntroduction @@ -110,6 +118,7 @@ func NewScorch(storeName string, ineligibleForRemoval: map[string]bool{}, forceMergeRequestCh: make(chan *mergerCtrl, 1), segPlugin: defaultSegmentPlugin, + copyScheduled: map[string]int{}, } forcedSegmentType, forcedSegmentVersion, err := configForceSegmentTypeVersion(config) @@ -715,3 +724,34 @@ func parseToInteger(i interface{}) (int, error) { return 0, fmt.Errorf("expects int or float64 value") } } + +// CopyReader returns a low-level accessor for index data, ensuring persisted segments +// remain on disk for backup, preventing race conditions with the persister/merger cleanup. +// Close the reader after backup to allow segment removal by the persister/merger. +func (s *Scorch) CopyReader() index.CopyReader { + s.rootLock.Lock() + rv := s.root + if rv != nil { + rv.AddRef() + var fileName string + // schedule a backup for all the segments from the root. Note that + // both the unpersisted and persisted segments are scheduled for backup, 
+ // because during the backup, the unpersisted segments may get persisted and + // hence we need to protect both the unpersisted and persisted segments from removal + // by the cleanup routine during the online backup + for _, seg := range rv.segment { + if perSeg, ok := seg.segment.(segment.PersistedSegment); ok { + // segment is persisted + fileName = filepath.Base(perSeg.Path()) + } else { + // segment is not persisted + // the name of the segment file that is generated if the + // segment is persisted in the future. + fileName = zapFileName(seg.id) + } + rv.parent.copyScheduled[fileName]++ + } + } + s.rootLock.Unlock() + return rv +} diff --git a/index/scorch/snapshot_index.go b/index/scorch/snapshot_index.go index a7ff7cf13..fd5413236 100644 --- a/index/scorch/snapshot_index.go +++ b/index/scorch/snapshot_index.go @@ -843,3 +843,26 @@ func (i *IndexSnapshot) GetSpatialAnalyzerPlugin(typ string) ( } return rv, nil } + +func (is *IndexSnapshot) CloseCopyReader() error { + // first unmark the segments that were marked for backup by this index snapshot + is.parent.rootLock.Lock() + for _, seg := range is.segment { + var fileName string + if perSeg, ok := seg.segment.(segment.PersistedSegment); ok { + // segment is persisted + fileName = filepath.Base(perSeg.Path()) + } else { + // segment is not persisted + // the name of the segment file that is generated if the + // segment is persisted in the future. 
+ fileName = zapFileName(seg.id) + } + if is.parent.copyScheduled[fileName]--; is.parent.copyScheduled[fileName] <= 0 { + delete(is.parent.copyScheduled, fileName) + } + } + is.parent.rootLock.Unlock() + // close the index snapshot normally + return is.Close() +} diff --git a/index_impl.go b/index_impl.go index 8a9cfd3b4..4ae15964f 100644 --- a/index_impl.go +++ b/index_impl.go @@ -921,22 +921,23 @@ func (i *indexImpl) CopyTo(d index.Directory) (err error) { return ErrorIndexClosed } - indexReader, err := i.i.Reader() - if err != nil { - return err + copyIndex, ok := i.i.(index.CopyIndex) + if !ok { + return fmt.Errorf("index implementation does not support copy reader") + } + + copyReader := copyIndex.CopyReader() + if copyReader == nil { + return fmt.Errorf("index's copyReader is nil") } + defer func() { - if cerr := indexReader.Close(); err == nil && cerr != nil { + if cerr := copyReader.CloseCopyReader(); err == nil && cerr != nil { err = cerr } }() - irc, ok := indexReader.(IndexCopyable) - if !ok { - return fmt.Errorf("index implementation does not support copy") - } - - err = irc.CopyTo(d) + err = copyReader.CopyTo(d) if err != nil { return fmt.Errorf("error copying index metadata: %v", err) }