Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MB-61421: Prevent deletion of segments scheduled for copy (#2032) #2046

Merged
merged 3 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
test:
strategy:
matrix:
go-version: [1.14.x, 1.15.x, 1.16.x, 1.17.x, 1.18.x]
go-version: [1.16.x, 1.17.x, 1.18.x]
platform: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.platform }}
steps:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.13
require (
github.com/RoaringBitmap/roaring v0.9.4
github.com/bits-and-blooms/bitset v1.2.0
github.com/blevesearch/bleve_index_api v1.0.2
github.com/blevesearch/bleve_index_api v1.0.3-0.20240624205006-07f7b7930fd5
github.com/blevesearch/geo v0.1.12-0.20220606102651-aab42add3121
github.com/blevesearch/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/blevesearch/go-porterstemmer v1.0.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5
github.com/bits-and-blooms/bitset v1.2.0 h1:Kn4yilvwNtMACtf1eYDlG8H77R07mZSPbMjLyS07ChA=
github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
github.com/blevesearch/bleve_index_api v1.0.1/go.mod h1:fiwKS0xLEm+gBRgv5mumf0dhgFr2mDgZah1pqv1c1M4=
github.com/blevesearch/bleve_index_api v1.0.2 h1:rO736FwEPMVY1mGi7d4n7CgBB3+tB7uYN7QTjR+Ij+s=
github.com/blevesearch/bleve_index_api v1.0.2/go.mod h1:fiwKS0xLEm+gBRgv5mumf0dhgFr2mDgZah1pqv1c1M4=
github.com/blevesearch/bleve_index_api v1.0.3-0.20240624205006-07f7b7930fd5 h1:AG1xQCQKv8dqODzsCc5v1bnxOumAcLeHDHKPiVTGhqE=
github.com/blevesearch/bleve_index_api v1.0.3-0.20240624205006-07f7b7930fd5/go.mod h1:fiwKS0xLEm+gBRgv5mumf0dhgFr2mDgZah1pqv1c1M4=
github.com/blevesearch/geo v0.1.12-0.20220606102651-aab42add3121 h1:5uNzC0Mn/8aCGbSJA6T8ZCjrKW8MKsZKQYBDowmeV/g=
github.com/blevesearch/geo v0.1.12-0.20220606102651-aab42add3121/go.mod h1:8z6udmXe8Ek8uuX4qOIWKb50vY/OQ1SG+XhL5FrcHOU=
github.com/blevesearch/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:kDy+zgJFJJoJYBvdfBSiZYBbdsUL0XcjHYWezpQBGPA=
Expand Down
6 changes: 3 additions & 3 deletions index/scorch/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,8 +529,8 @@ func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
}
}

var filenames []string
newSegmentPaths := make(map[uint64]string)
filenames := make([]string, 0, len(snapshot.segment))
newSegmentPaths := make(map[uint64]string, len(snapshot.segment))

// first ensure that each segment in this snapshot has been persisted
for _, segmentSnapshot := range snapshot.segment {
Expand Down Expand Up @@ -982,7 +982,7 @@ func (s *Scorch) removeOldZapFiles() error {
for _, finfo := range currFileInfos {
fname := finfo.Name()
if filepath.Ext(fname) == ".zap" {
if _, exists := liveFileNames[fname]; !exists && !s.ineligibleForRemoval[fname] {
if _, exists := liveFileNames[fname]; !exists && !s.ineligibleForRemoval[fname] && (s.copyScheduled[fname] <= 0) {
err := os.Remove(s.path + string(os.PathSeparator) + fname)
if err != nil {
log.Printf("got err removing file: %s, err: %v", fname, err)
Expand Down
40 changes: 40 additions & 0 deletions index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -50,13 +51,20 @@ type Scorch struct {
unsafeBatch bool

rootLock sync.RWMutex

root *IndexSnapshot // holds 1 ref-count on the root
rootPersisted []chan error // closed when root is persisted
persistedCallbacks []index.BatchCallback
nextSnapshotEpoch uint64
eligibleForRemoval []uint64 // Index snapshot epochs that are safe to GC.
ineligibleForRemoval map[string]bool // Filenames that should not be GC'ed yet.

// keeps track of segments scheduled for online copy/backup operation. Each segment's filename maps to
// the count of copy schedules. Segments with non-zero counts are protected from removal by the cleanup
// operation. Counts decrement upon successful copy, allowing removal of segments with zero or absent counts.
// must be accessed within the rootLock as it is accessed by the asynchronous cleanup routine.
copyScheduled map[string]int

numSnapshotsToKeep int
closeCh chan struct{}
introductions chan *segmentIntroduction
Expand Down Expand Up @@ -110,6 +118,7 @@ func NewScorch(storeName string,
ineligibleForRemoval: map[string]bool{},
forceMergeRequestCh: make(chan *mergerCtrl, 1),
segPlugin: defaultSegmentPlugin,
copyScheduled: map[string]int{},
}

forcedSegmentType, forcedSegmentVersion, err := configForceSegmentTypeVersion(config)
Expand Down Expand Up @@ -715,3 +724,34 @@ func parseToInteger(i interface{}) (int, error) {
return 0, fmt.Errorf("expects int or float64 value")
}
}

// CopyReader returns a low-level accessor for index data, ensuring persisted segments
// remain on disk for backup, preventing race conditions with the persister/merger cleanup.
// Close the reader after backup to allow segment removal by the persister/merger.
func (s *Scorch) CopyReader() index.CopyReader {
s.rootLock.Lock()
rv := s.root
if rv != nil {
rv.AddRef()
var fileName string
// schedule a backup for all the segments from the root. Note that the
// both the unpersisted and persisted segments are scheduled for backup.
// because during the backup, the unpersisted segments may get persisted and
// hence we need to protect both the unpersisted and persisted segments from removal
// by the cleanup routine during the online backup
for _, seg := range rv.segment {
if perSeg, ok := seg.segment.(segment.PersistedSegment); ok {
// segment is persisted
fileName = filepath.Base(perSeg.Path())
} else {
// segment is not persisted
// the name of the segment file that is generated if the
// the segment is persisted in the future.
fileName = zapFileName(seg.id)
}
rv.parent.copyScheduled[fileName]++
}
}
s.rootLock.Unlock()
return rv
}
23 changes: 23 additions & 0 deletions index/scorch/snapshot_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,3 +843,26 @@ func (i *IndexSnapshot) GetSpatialAnalyzerPlugin(typ string) (
}
return rv, nil
}

func (is *IndexSnapshot) CloseCopyReader() error {
// first unmark the segments that were marked for backup by this index snapshot
is.parent.rootLock.Lock()
for _, seg := range is.segment {
var fileName string
if perSeg, ok := seg.segment.(segment.PersistedSegment); ok {
// segment is persisted
fileName = filepath.Base(perSeg.Path())
} else {
// segment is not persisted
// the name of the segment file that is generated if the
// the segment is persisted in the future.
fileName = zapFileName(seg.id)
}
if is.parent.copyScheduled[fileName]--; is.parent.copyScheduled[fileName] <= 0 {
delete(is.parent.copyScheduled, fileName)
}
}
is.parent.rootLock.Unlock()
// close the index snapshot normally
return is.Close()
}
3 changes: 3 additions & 0 deletions index/upsidedown/store/boltdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !darwin || !arm64
// +build !darwin !arm64

package boltdb

import (
Expand Down
21 changes: 11 additions & 10 deletions index_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,22 +921,23 @@ func (i *indexImpl) CopyTo(d index.Directory) (err error) {
return ErrorIndexClosed
}

indexReader, err := i.i.Reader()
if err != nil {
return err
copyIndex, ok := i.i.(index.CopyIndex)
if !ok {
return fmt.Errorf("index implementation does not support copy reader")
}

copyReader := copyIndex.CopyReader()
if copyReader == nil {
return fmt.Errorf("index's copyReader is nil")
}

defer func() {
if cerr := indexReader.Close(); err == nil && cerr != nil {
if cerr := copyReader.CloseCopyReader(); err == nil && cerr != nil {
err = cerr
}
}()

irc, ok := indexReader.(IndexCopyable)
if !ok {
return fmt.Errorf("index implementation does not support copy")
}

err = irc.CopyTo(d)
err = copyReader.CopyTo(d)
if err != nil {
return fmt.Errorf("error copying index metadata: %v", err)
}
Expand Down
Loading