-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for TSI shard streaming and shard size #8491
Changes from all commits
12a2ff7
67c67ae
368420c
abae36f
38e0dd6
041a383
b10249a
81976bc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,11 @@ import ( | |
"go.uber.org/zap" | ||
) | ||
|
||
const ( | ||
// The extension used to describe temporary snapshot files. | ||
TmpTSMFileExtension = "tmp" | ||
) | ||
|
||
// TSMFile represents an on-disk TSM file. | ||
type TSMFile interface { | ||
// Path returns the underlying file path for the TSMFile. If the file | ||
|
@@ -433,8 +438,9 @@ func (f *FileStore) Open() error { | |
if err != nil { | ||
return err | ||
} | ||
ext := fmt.Sprintf(".%s", TmpTSMFileExtension) | ||
for _, fi := range tmpfiles { | ||
if fi.IsDir() && strings.HasSuffix(fi.Name(), ".tmp") { | ||
if fi.IsDir() && strings.HasSuffix(fi.Name(), ext) { | ||
ss := strings.Split(filepath.Base(fi.Name()), ".") | ||
if len(ss) == 2 { | ||
if i, err := strconv.Atoi(ss[0]); err != nil { | ||
|
@@ -635,16 +641,20 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF | |
f.mu.RUnlock() | ||
|
||
updated := make([]TSMFile, 0, len(newFiles)) | ||
tsmTmpExt := fmt.Sprintf("%s.%s", TSMFileExtension, TmpTSMFileExtension) | ||
|
||
// Rename all the new files to make them live on restart | ||
for _, file := range newFiles { | ||
var newName = file | ||
if strings.HasSuffix(file, ".tmp") { | ||
if strings.HasSuffix(file, tsmTmpExt) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "." is missing as well. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. defined on 574 |
||
// The new TSM files have a tmp extension. First rename them. | ||
newName = file[:len(file)-4] | ||
if err := os.Rename(file, newName); err != nil { | ||
return err | ||
} | ||
} else if !strings.HasSuffix(file, TSMFileExtension) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That check happens in the In the Without this |
||
// This isn't a .tsm or .tsm.tmp file. | ||
continue | ||
} | ||
|
||
fd, err := os.Open(newName) | ||
|
@@ -701,7 +711,7 @@ func (f *FileStore) replace(oldFiles, newFiles []string, updatedFn func(r []TSMF | |
deletes = append(deletes, file.Path()) | ||
|
||
// Rename the TSM file used by this reader | ||
tempPath := file.Path() + ".tmp" | ||
tempPath := fmt.Sprintf("%s.%s", file.Path(), TmpTSMFileExtension) | ||
if err := file.Rename(tempPath); err != nil { | ||
return err | ||
} | ||
|
@@ -958,7 +968,7 @@ func (f *FileStore) CreateSnapshot() (string, error) { | |
defer f.mu.RUnlock() | ||
|
||
// get a tmp directory name | ||
tmpPath := fmt.Sprintf("%s/%d.tmp", f.dir, f.currentTempDirID) | ||
tmpPath := fmt.Sprintf("%s/%d.%s", f.dir, f.currentTempDirID, TmpTSMFileExtension) | ||
err := os.Mkdir(tmpPath, 0777) | ||
if err != nil { | ||
return "", err | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2434,7 +2434,7 @@ func TestFileStore_Replace(t *testing.T) { | |
} | ||
|
||
// Replace requires assumes new files have a .tmp extension | ||
replacement := files[2] + ".tmp" | ||
replacement := fmt.Sprintf("%s.%s", files[2], tsm1.TmpTSMFileExtension) | ||
os.Rename(files[2], replacement) | ||
|
||
fs := tsm1.NewFileStore(dir) | ||
|
@@ -2663,9 +2663,9 @@ func TestFileStore_Stats(t *testing.T) { | |
"mem": []tsm1.Value{tsm1.NewValue(0, 1.0)}, | ||
}) | ||
|
||
replacement := files[2] + "-foo" + ".tmp" // Assumes new files have a .tmp extension | ||
replacement := fmt.Sprintf("%s.%s.%s", files[2], tsm1.TmpTSMFileExtension, tsm1.TSMFileExtension) // Assumes new files have a .tmp extension | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should Tmp extension be at the end? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @benbjohnson I changed the implementation to check for either |
||
if err := os.Rename(newFile, replacement); err != nil { | ||
|
||
t.Fatalf("rename: %v", err) | ||
} | ||
// Replace 3 w/ 1 | ||
if err := fs.Replace(files, []string{replacement}); err != nil { | ||
|
@@ -2675,7 +2675,7 @@ func TestFileStore_Stats(t *testing.T) { | |
var found bool | ||
stats = fs.Stats() | ||
for _, stat := range stats { | ||
if strings.HasSuffix(stat.Path, "-foo") { | ||
if strings.HasSuffix(stat.Path, fmt.Sprintf("%s.%s.%s", tsm1.TSMFileExtension, tsm1.TmpTSMFileExtension, tsm1.TSMFileExtension)) { | ||
found = true | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is missing the "." now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See a couple of lines up