-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make SafeDelete faster for large blocks; TSDB block repair case #1056
Changes from 15 commits
f0dfe27
f09d3f5
230bc7b
3f0d6cd
ed78bde
7fcf793
2f179ff
9ca0c05
fd572aa
24173d3
0d39253
5a7fd5d
bfd44f1
d1274ef
0a01898
bf62c5c
e296f8f
768ec93
c46ff1c
b625a95
380672b
5599139
e6ad6a0
649df4d
9e26cfd
31f7dd6
f6f3443
37d495a
b852c34
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -495,12 +495,16 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceT | |
resmeta.Stats = tsdb.BlockStats{} // reset stats | ||
resmeta.Thanos.Source = source // update source | ||
|
||
if err := rewrite(indexr, chunkr, indexw, chunkw, &resmeta, ignoreChkFns); err != nil { | ||
if err := rewrite(logger, indexr, chunkr, indexw, chunkw, &resmeta, ignoreChkFns); err != nil { | ||
return resid, errors.Wrap(err, "rewrite block") | ||
} | ||
if err := metadata.Write(logger, resdir, &resmeta); err != nil { | ||
return resid, err | ||
} | ||
// TSDB may rewrite metadata in bdir | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also missing trailing period in comment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
if err := metadata.Write(logger, bdir, meta); err != nil { | ||
return resid, err | ||
} | ||
return resid, nil | ||
} | ||
|
||
|
@@ -595,6 +599,7 @@ type seriesRepair struct { | |
// rewrite writes all data from the readers back into the writers while cleaning | ||
// up mis-ordered and duplicated chunks. | ||
func rewrite( | ||
logger log.Logger, | ||
indexr tsdb.IndexReader, chunkr tsdb.ChunkReader, | ||
indexw tsdb.IndexWriter, chunkw tsdb.ChunkWriter, | ||
meta *metadata.Meta, | ||
|
@@ -665,8 +670,16 @@ func rewrite( | |
return labels.Compare(series[i].lset, series[j].lset) < 0 | ||
}) | ||
|
||
lastSet := labels.Labels{} | ||
// Build a new TSDB block. | ||
for _, s := range series { | ||
if labels.Compare(lastSet, s.lset) == 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The series are sorted based on their labelSet. What happens if two labelSets are identical? For one they will be consecutive in the slice. So if the labelSet we have now is the same as the previous labelSet we have a duplicate series which the TSDB code will produce an error on. So we just drop that series. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add a comment on the implied behavior here. I believe you when you say you've seen this behavior, but it worries me, as something went terribly wrong wherever these blocks were produced. We should at least document that the heuristic here is that on collision we keep the first series found and drop consecutive ones. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. Thank you! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry one more thing. As this shouldn't generally happen, let's add a metric so we can monitor when it does happen. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The repair process batch job only which makes metrics less useful. I had some similar comments in #964 Perhaps I should leave a comment if that were to change? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes let's in that case put a TODO in there There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO added. |
||
level.Warn(logger).Log("msg", | ||
"dropping duplicate series in tsdb block found", | ||
"labelset", fmt.Sprintf("%s", s.lset), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed. |
||
) | ||
continue | ||
} | ||
if err := chunkw.WriteChunks(s.chks...); err != nil { | ||
return errors.Wrap(err, "write chunks") | ||
} | ||
|
@@ -691,6 +704,7 @@ func rewrite( | |
} | ||
postings.Add(i, s.lset) | ||
i++ | ||
lastSet = s.lset | ||
} | ||
|
||
s := make([]string, 0, 256) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ package verifier | |
|
||
import ( | ||
"context" | ||
"fmt" | ||
"io/ioutil" | ||
"os" | ||
"path/filepath" | ||
|
@@ -14,10 +15,13 @@ import ( | |
"github.com/pkg/errors" | ||
) | ||
|
||
// SafeDelete moves block to backup bucket and if succeeded, removes it from source bucket. | ||
// It returns error if block dir already exists in backup bucket (blocks should be immutable) or any | ||
// of the operation fails. | ||
func SafeDelete(ctx context.Context, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, id ulid.ULID) error { | ||
// SafeDelete moves block to backup bucket and if succeeded, removes it from | ||
// source bucket. It returns error if block dir already exists in backup | ||
// bucket (blocks should be immutable) or any of the operation fails. If | ||
// the block has already been downloaded it must exist in tempdir, else it | ||
// will be downloaded to be copied to the backup bucket. tempdir set to the | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd prefer not to hide this behavior in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry I meant let's pass two parameters to explicitly describe the intention There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. WIP There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, committed a patch for makes this explicit. I've tested the repair process with my collection of broken blocks I've started keeping and had the behavior expected. (No repeated downloads.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Had thanos-compact cut a 450GiB TSDB block today which I referenced in #1152 which is part of the motivation for this. |
||
// zero value forces download. | ||
func SafeDelete(ctx context.Context, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, id ulid.ULID, tempdir string) error { | ||
foundDir := false | ||
err := backupBkt.Iter(ctx, id.String(), func(name string) error { | ||
foundDir = true | ||
|
@@ -31,23 +35,33 @@ func SafeDelete(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bac | |
return errors.Errorf("%s dir seems to exists in backup bucket. Remove this block manually if you are sure it is safe to do", id) | ||
} | ||
|
||
tempdir, err := ioutil.TempDir("", "safe-delete") | ||
if err != nil { | ||
return err | ||
if tempdir == "" { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we already have the block, don't download it again. Blocks can be large. 30GiB or close to it in my environment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe I wasn't clear, I would expect that when There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, refactored this a bit. It will return an error if you ask it to use an existing tempdir and do not provide one, and the logic is a little simpler. |
||
// tempdir unspecified, create one | ||
tempdir, err = ioutil.TempDir("", fmt.Sprintf("safe-delete-%s", id.String())) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
dir := filepath.Join(tempdir, id.String()) | ||
err = os.Mkdir(dir, 0755) | ||
if err != nil { | ||
return err | ||
} | ||
defer func() { | ||
if err := os.RemoveAll(tempdir); err != nil { | ||
level.Warn(logger).Log("msg", "failed to delete dir", "dir", tempdir, "err", err) | ||
|
||
if _, err := os.Stat(dir); err != nil { | ||
// TSDB block not already present, download it | ||
err = os.Mkdir(dir, 0755) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should only be done if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed, thanks! |
||
if err != nil { | ||
return err | ||
} | ||
}() | ||
defer func() { | ||
if err := os.RemoveAll(tempdir); err != nil { | ||
level.Warn(logger).Log("msg", "failed to delete dir", "dir", tempdir, "err", err) | ||
} | ||
}() | ||
|
||
if err := block.Download(ctx, logger, bkt, id, dir); err != nil { | ||
return errors.Wrap(err, "download from source") | ||
if err := block.Download(ctx, logger, bkt, id, dir); err != nil { | ||
return errors.Wrap(err, "download from source") | ||
} | ||
} else { | ||
level.Info(logger).Log("msg", "using previously downloaded tsdb block", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In what kind of situation would we end up in this path? And if the answer is a (half or maybe even fully) finished previous download, then how do we prevent that the directory is there but the content is not? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My blocks (and I'm sure others) easily grow above 30GiB when compacted. So if I need to repair block A, Thanos will download block A, create a repaired block B, upload block B. So at this point, we've created, successfully, a new block from A and that block B now passes the At this point we call This process can be time consuming (and perhaps expensive). So if blocks are immutable, and the repair process verifies that A is a legitimate block (if malformed) we should be able to skip the redundant download of A and backup the existing downloaded copy. |
||
"id", id.String()) | ||
} | ||
|
||
if err := block.Upload(ctx, logger, backupBkt, dir); err != nil { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The TSDB code handily rewrites the block metadata even though we don't write or change read block here. This nukes the extra metadata Thanos uses. Write it back.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this is also a bug that you are trying to fix? Perhaps we should add it to the changelog and/or the PR messsage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a bug, this is part of the initial patches to be able to reuse downloaded blocks. The TSDB library isn't aware of the additional meta data Thanos provides and was removing it. Therefore, when a previously downloaded block is reused it would have incomplete metadata without this change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this is being changed now: prometheus-junkyard/tsdb#637 so this won't be needed TBH (once we use latest TSDB).
Also this is true what @GiedriusS said - splitting PRs into focused changes helps a lot. However this change relies on this fix. I would be more explicit in description. This will help us to understand this early on (:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add TODO to update TSDB and remove this as it will be not needed in newest TSDB. For now we need this, thanks for fixing this 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Todo added, thanks!