Make SafeDelete faster for large blocks; TSDB block repair case #1056

Merged · Jul 8, 2019 · 29 commits
f0dfe27 bucket verify: repair out of order labels (jjneely, Mar 22, 2019)
f09d3f5 verify repair: correctly order series in the index on rewrite (jjneely, Mar 25, 2019)
230bc7b Fix the TSDB block safe-delete function (jjneely, Mar 28, 2019)
3f0d6cd verify repair: fix duplicate chunk detection (jjneely, Apr 1, 2019)
ed78bde PR feedback: use errors.Errorf() instead of fmt.Errorf() (jjneely, Apr 12, 2019)
7fcf793 Allow safe-delete to use previously downloaded TSDB block (jjneely, Apr 15, 2019)
2f179ff Use errors.New() (jjneely, Apr 15, 2019)
9ca0c05 Merge branch 'jjneely/fix-repair-for-out-of-order-labels' into jjneel… (jjneely, Apr 15, 2019)
fd572aa Repair duplicate series in a TSDB block (jjneely, Apr 15, 2019)
24173d3 Liberally comment this for loop (jjneely, Apr 15, 2019)
0d39253 Take advantage of sort.Interface (jjneely, Apr 17, 2019)
5a7fd5d Merge branch 'jjneely/fix-repair-for-out-of-order-labels' into jjneel… (jjneely, Apr 17, 2019)
bfd44f1 PR Feedback: Comments are full sentences. (jjneely, Apr 18, 2019)
d1274ef Merge branch 'jjneely/fix-repair-for-out-of-order-labels' into jjneel… (jjneely, Apr 23, 2019)
0a01898 Merge branch 'jjneely/safe-delete' into jjneely/safe-delete-pr (jjneely, Apr 23, 2019)
bf62c5c PR Feedback (jjneely, Apr 29, 2019)
e296f8f PR Feedback: Handle other possible Stat() errors (jjneely, May 13, 2019)
768ec93 PR Feedback: Add TODO for future metric of dropped series in block (jjneely, May 13, 2019)
c46ff1c PR feedback: Make SafeDelete() more explicit (jjneely, May 13, 2019)
b625a95 PR Feedback: refactor SafeDelete (jjneely, May 17, 2019)
380672b Use lset.String() rather than fmt.Sprintf() (jjneely, Jun 14, 2019)
5599139 SafeDelete(): Make comments match function behavior (jjneely, Jun 14, 2019)
e6ad6a0 PR feedback: comments in pkg/verifier/safe_delete.go (jjneely, Jun 25, 2019)
649df4d PR feedback: comments in pkg/verifier/safe_delete.go (jjneely, Jun 25, 2019)
9e26cfd PR Feedback: comments in pkg/block/index.go (jjneely, Jun 25, 2019)
31f7dd6 Update pkg/verifier/safe_delete.go (jjneely, Jun 25, 2019)
f6f3443 PR feedback: split SafeDelete() into multiple functions (jjneely, Jun 27, 2019)
37d495a PR Feedback: Log the upstream PR with this TODO (jjneely, Jun 27, 2019)
b852c34 PR Feedback: clean up function signatures (jjneely, Jul 2, 2019)
24 changes: 23 additions & 1 deletion pkg/block/index.go
Original file line number Diff line number Diff line change
@@ -495,12 +495,18 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceT
resmeta.Stats = tsdb.BlockStats{} // reset stats
resmeta.Thanos.Source = source // update source

if err := rewrite(indexr, chunkr, indexw, chunkw, &resmeta, ignoreChkFns); err != nil {
if err := rewrite(logger, indexr, chunkr, indexw, chunkw, &resmeta, ignoreChkFns); err != nil {
return resid, errors.Wrap(err, "rewrite block")
}
if err := metadata.Write(logger, resdir, &resmeta); err != nil {
return resid, err
}
// TSDB may rewrite metadata in bdir.
// TODO: This is not needed in newer TSDB code. See
// https://github.com/prometheus/tsdb/pull/637
if err := metadata.Write(logger, bdir, meta); err != nil {
return resid, err
}
return resid, nil
}

@@ -595,6 +601,7 @@ type seriesRepair struct {
// rewrite writes all data from the readers back into the writers while cleaning
// up mis-ordered and duplicated chunks.
func rewrite(
logger log.Logger,
indexr tsdb.IndexReader, chunkr tsdb.ChunkReader,
indexw tsdb.IndexWriter, chunkw tsdb.ChunkWriter,
meta *metadata.Meta,
@@ -665,8 +672,22 @@ func rewrite(
return labels.Compare(series[i].lset, series[j].lset) < 0
})

lastSet := labels.Labels{}
// Build a new TSDB block.
for _, s := range series {
// The TSDB library will throw an error if we add a series with
---- review thread ----
Member: Can we remove double spaces in this comment?
Author: Old habits die hard... fixed.
Member: Can I ask where this habit is from? Sorry for my ignorance, super curious (:
Author: Lol... search Google for "double space after period" and you'll learn more than you bargained for. Like, my age. :-)
Member: TIL, never heard of this either. But actually the rule could apply here, since code formatting mostly uses monospaced fonts. :)
---- end review thread ----
// identical labels as the last series. This means that we have
// discovered a duplicate time series in the old block. We drop
// all duplicate series preserving the first one.
// TODO: Add metric to count dropped series if repair becomes a daemon
// rather than a batch job.
if labels.Compare(lastSet, s.lset) == 0 {
---- review thread ----
Author: The series are sorted by their label sets, so two identical label sets will be consecutive in the slice. If the label set we have now is the same as the previous one, we have discovered a duplicate series, which the TSDB code would raise an error on, so we simply drop it.
Member: Let's add a comment on the implied behavior here. I believe you when you say you've seen this behavior, but it worries me, as something went terribly wrong wherever these blocks were produced. We should at least document that the heuristic here is that on collision we keep the first series found and drop consecutive ones.
Author: Done. Thank you!
Member: Sorry, one more thing. As this shouldn't generally happen, let's add a metric so we can monitor when it does happen.
Author: The repair process is a batch job only, which makes metrics less useful. I had some similar comments in #964. Perhaps I should leave a comment if that were to change?
Member: Yes, in that case let's put a TODO in there.
Author: TODO added.
---- end review thread ----
level.Warn(logger).Log("msg",
"dropping duplicate series in tsdb block found",
"labelset", s.lset.String(),
)
continue
}
if err := chunkw.WriteChunks(s.chks...); err != nil {
return errors.Wrap(err, "write chunks")
}
@@ -691,6 +712,7 @@ func rewrite(
}
postings.Add(i, s.lset)
i++
lastSet = s.lset
}

s := make([]string, 0, 256)
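The dedup loop in `rewrite` depends on the earlier sort: identical label sets become adjacent, so comparing each series against only the previous one is enough to find every duplicate while keeping the first occurrence. A standalone sketch of the same heuristic, using plain strings in place of `labels.Labels` (the name `dropConsecutiveDuplicates` is illustrative, not from the PR):

```go
package main

import (
	"fmt"
	"sort"
)

// dropConsecutiveDuplicates keeps the first occurrence of each label set and
// drops later identical ones. It sorts first, which is the invariant the
// PR's rewrite() relies on: identical label sets become adjacent, so one
// comparison against the previous element finds every duplicate.
func dropConsecutiveDuplicates(series []string) []string {
	sort.Strings(series)
	out := series[:0] // reuse the backing array
	last := ""
	for _, s := range series {
		if len(out) > 0 && s == last {
			// Duplicate of the previous series: drop it, keeping the first.
			continue
		}
		out = append(out, s)
		last = s
	}
	return out
}

func main() {
	got := dropConsecutiveDuplicates([]string{
		`{job="a"}`, `{job="b"}`, `{job="a"}`,
	})
	fmt.Println(got) // prints: [{job="a"} {job="b"}]
}
```

The real code compares `labels.Compare(lastSet, s.lset) == 0` instead of string equality, but the shape is the same: one pass, one saved "previous" value, no extra allocation.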
2 changes: 1 addition & 1 deletion pkg/verifier/duplicated_compaction.go
@@ -80,7 +80,7 @@ func DuplicatedCompactionIssue(ctx context.Context, logger log.Logger, bkt objst
}

for i, id := range toKill {
if err := SafeDelete(ctx, logger, bkt, backupBkt, id); err != nil {
if err := BackupAndDelete(ctx, logger, bkt, backupBkt, id); err != nil {
return err
}
level.Info(logger).Log("msg", "Removed duplicated block", "id", id, "to-be-removed", len(toKill)-(i+1), "removed", i+1, "issue", DuplicatedCompactionIssueID)
2 changes: 1 addition & 1 deletion pkg/verifier/index_issue.go
@@ -117,7 +117,7 @@ func IndexIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bac
}

level.Info(logger).Log("msg", "safe deleting broken block", "id", id, "issue", IndexIssueID)
if err := SafeDelete(ctx, logger, bkt, backupBkt, id); err != nil {
if err := BackupAndDeleteDownloaded(ctx, logger, filepath.Join(tmpdir, id.String()), bkt, backupBkt, id); err != nil {
return errors.Wrapf(err, "safe deleting old block %s failed", id)
}
level.Info(logger).Log("msg", "all good, continuing", "id", id, "issue", IndexIssueID)
91 changes: 76 additions & 15 deletions pkg/verifier/safe_delete.go
@@ -2,6 +2,7 @@ package verifier

import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
@@ -14,29 +15,35 @@ import (
"github.com/pkg/errors"
)

// SafeDelete moves block to backup bucket and if succeeded, removes it from source bucket.
// It returns error if block dir already exists in backup bucket (blocks should be immutable) or any
// of the operation fails.
func SafeDelete(ctx context.Context, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, id ulid.ULID) error {
// TSDBBlockExistsInBucket checks to see if a given TSDB block ID exists in a
// bucket. If so, true is returned. An error is returned on failure and in
// such case the boolean result has no meaning.
func TSDBBlockExistsInBucket(ctx context.Context, bkt objstore.Bucket, id ulid.ULID) (bool, error) {
foundDir := false
err := backupBkt.Iter(ctx, id.String(), func(name string) error {
err := bkt.Iter(ctx, id.String(), func(name string) error {
foundDir = true
return nil
})

return foundDir, err
}

// BackupAndDelete moves a TSDB block to a backup bucket and on success removes
// it from the source bucket. It returns error if block dir already exists in
// the backup bucket (blocks should be immutable) or if any of the operations
// fail.
func BackupAndDelete(ctx context.Context, logger log.Logger, bkt, backupBkt objstore.Bucket, id ulid.ULID) error {
// Does this TSDB block exist in backupBkt already?
found, err := TSDBBlockExistsInBucket(ctx, backupBkt, id)
if err != nil {
return err
}

if foundDir {
if found {
return errors.Errorf("%s dir seems to exists in backup bucket. Remove this block manually if you are sure it is safe to do", id)
}

tempdir, err := ioutil.TempDir("", "safe-delete")
if err != nil {
return err
}
dir := filepath.Join(tempdir, id.String())
err = os.Mkdir(dir, 0755)
// Create a tempdir to locally store TSDB block.
tempdir, err := ioutil.TempDir("", fmt.Sprintf("safe-delete-%s", id.String()))
if err != nil {
return err
}
@@ -46,18 +53,72 @@
}
}()

// Download the TSDB block.
dir := filepath.Join(tempdir, id.String())
if err := block.Download(ctx, logger, bkt, id, dir); err != nil {
return errors.Wrap(err, "download from source")
}

if err := block.Upload(ctx, logger, backupBkt, dir); err != nil {
return errors.Wrap(err, "upload to backup")
// Backup the block.
if err := backupDownloaded(ctx, logger, dir, backupBkt, id); err != nil {
return err
}

// Block uploaded, so we are ok to remove from src bucket.
level.Info(logger).Log("msg", "Deleting block", "id", id.String())
if err := block.Delete(ctx, bkt, id); err != nil {
return errors.Wrap(err, "delete from source")
}

return nil
}

// BackupAndDeleteDownloaded works much like BackupAndDelete in that it will
// move a TSDB block from a bucket to a backup bucket. The bdir parameter
// points to the location on disk where the TSDB block was previously
// downloaded allowing this function to avoid downloading the TSDB block from
// the source bucket again. An error is returned if any operation fails.
func BackupAndDeleteDownloaded(ctx context.Context, logger log.Logger, bdir string, bkt, backupBkt objstore.Bucket, id ulid.ULID) error {
// Does this TSDB block exist in backupBkt already?
found, err := TSDBBlockExistsInBucket(ctx, backupBkt, id)
if err != nil {
return err
}
if found {
return errors.Errorf("%s dir seems to exists in backup bucket. Remove this block manually if you are sure it is safe to do", id)
}

// Backup the block.
if err := backupDownloaded(ctx, logger, bdir, backupBkt, id); err != nil {
return err
}

// Block uploaded, so we are ok to remove from src bucket.
level.Info(logger).Log("msg", "Deleting block", "id", id.String())
if err := block.Delete(ctx, bkt, id); err != nil {
return errors.Wrap(err, "delete from source")
}

return nil
}

// backupDownloaded is a helper function that uploads a TSDB block
// found on disk to the given bucket. An error is returned if any operation
// fails.
func backupDownloaded(ctx context.Context, logger log.Logger, bdir string, backupBkt objstore.Bucket, id ulid.ULID) error {
// Safety checks.
if _, err := os.Stat(filepath.Join(bdir, "meta.json")); err != nil {
// If there is any error stat'ing meta.json inside the TSDB block
// then declare the existing block as bad and refuse to upload it.
// TODO: Make this check more robust.
return errors.Wrap(err, "existing tsdb block is invalid")
}

// Upload the on disk TSDB block.
level.Info(logger).Log("msg", "Uploading block to backup bucket", "id", id.String())
if err := block.Upload(ctx, logger, backupBkt, bdir); err != nil {
return errors.Wrap(err, "upload to backup")
}

return nil
}
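The refactored flow can be modeled end to end with a toy in-memory bucket. This is a sketch only: `bucket`, `existsInBucket`, and `backupAndDelete` are illustrative stand-ins for `objstore.Bucket`, `TSDBBlockExistsInBucket`, and `BackupAndDelete`, and real buckets are remote object stores rather than maps. What it preserves is the ordering that makes the operation safe: refuse if the backup already holds the block, copy everything first, and delete from the source only after the backup succeeded.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// bucket is a toy stand-in for objstore.Bucket: object name -> contents.
type bucket map[string]string

// existsInBucket mirrors TSDBBlockExistsInBucket: any object under the
// block ID's directory prefix means the block is already present.
func existsInBucket(b bucket, id string) bool {
	for name := range b {
		if strings.HasPrefix(name, id+"/") {
			return true
		}
	}
	return false
}

// backupAndDelete mirrors BackupAndDelete's ordering: refuse when the
// backup already holds the block (blocks are immutable), copy every
// object first, and delete from the source only after the copy succeeded.
func backupAndDelete(src, backup bucket, id string) error {
	if existsInBucket(backup, id) {
		return errors.New(id + " already exists in backup bucket")
	}
	for name, data := range src {
		if strings.HasPrefix(name, id+"/") {
			backup[name] = data
		}
	}
	// Only now is it safe to remove the block from the source.
	for name := range src {
		if strings.HasPrefix(name, id+"/") {
			delete(src, name)
		}
	}
	return nil
}

func main() {
	src := bucket{"01ABC/meta.json": "{}", "01ABC/index": "idx"}
	backup := bucket{}
	if err := backupAndDelete(src, backup, "01ABC"); err != nil {
		panic(err)
	}
	fmt.Println(len(src), len(backup)) // prints: 0 2
}
```

`BackupAndDeleteDownloaded` follows the same ordering but skips the download step, which is the speedup this PR is after: the verifier already has the block on disk, so it uploads that copy instead of fetching it from the source bucket again.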