Skip to content

Commit

Permalink
Make SafeDelete faster for large blocks; TSDB block repair case (#1056)
Browse files Browse the repository at this point in the history
* bucket verify: repair out of order labels

* verify repair: correctly order series in the index on rewrite

When we have label sets that are not in the correct order, fixing that
changes the order of the series in the index.  So the index must be
rewritten in that new order.  This makes this repair tool take up a
bunch more memory, but produces blocks that verify correctly.

* Fix the TSDB block safe-delete function

The directory name must be the block ID name exactly to verify.  A temp
directory or random name will not work here.

* verify repair: fix duplicate chunk detection

Pointer/reference logic error was eliminating all chunks for a series in
a given TSDB block that wasn't the first chunk.  Chunks are now
referenced correctly via pointers.

* PR feedback: use errors.Errorf() instead of fmt.Errorf()

* Allow safe-delete to use previously downloaded TSDB block

If we have just downloaded a TSDB block for repair, we create a brand
new block that is fixed.  The original block can be used as the data we
upload in SafeDelete() to save time/bandwidth rather than re-download it
from the storage bucket.

* Use errors.New()

Some linters catch errors.Errorf() as its not really part of the errors
package.

* Repair duplicate series in a TSDB block

Where duplicate series means two series with an identical label set.
This code just drops any duplicate series that is encountered after the
first.  No merging or anything fancy.

* Liberally comment this for loop

We're comparing items by pointers, using Go's range variables is
misleading here and we need not fall into the same trap.

* Take advantage of sort.Interface

This prevents us from having to re-implement label sorting.

* PR Feedback: Comments are full sentences.

* PR Feedback

* PR Feedback: Handle other possible Stat() errors

* PR Feedback: Add TODO for future metric of dropped series in block

* PR feedback: Make SafeDelete() more explicit

When we want to SafeDelete an already downloaded TSDB block, be very
explicit about that use case.

* PR Feedback: refactor SafeDelete

Be extra explicit about using existing tempdirs.  Added note about
future improvements for verifying if we have an intact tsdb block.

* Use lset.String() rather than fmt.Sprintf()

* SafeDelete(): Make comments match function behavior

The tempdir value is ignored if useExistingTempdir is false.

* PR feedback: comments in pkg/verifier/safe_delete.go

Co-Authored-By: Bartek Płotka <bwplotka@gmail.com>

* PR feedback: comments in pkg/verifier/safe_delete.go

Co-Authored-By: Bartek Płotka <bwplotka@gmail.com>

* PR Feedback: comments in pkg/block/index.go

* Update pkg/verifier/safe_delete.go

Co-Authored-By: Bartek Płotka <bwplotka@gmail.com>

* PR feedback: split SafeDelete() into multiple functions

* PR Feedback: Log the upstream PR with this TODO

* PR Feedback: clean up function signatures
  • Loading branch information
jjneely authored and bwplotka committed Jul 8, 2019
1 parent 0e46dd9 commit 63257cf
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 18 deletions.
24 changes: 23 additions & 1 deletion pkg/block/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,12 +503,18 @@ func Repair(logger log.Logger, dir string, id ulid.ULID, source metadata.SourceT
resmeta.Stats = tsdb.BlockStats{} // reset stats
resmeta.Thanos.Source = source // update source

if err := rewrite(indexr, chunkr, indexw, chunkw, &resmeta, ignoreChkFns); err != nil {
if err := rewrite(logger, indexr, chunkr, indexw, chunkw, &resmeta, ignoreChkFns); err != nil {
return resid, errors.Wrap(err, "rewrite block")
}
if err := metadata.Write(logger, resdir, &resmeta); err != nil {
return resid, err
}
// TSDB may rewrite metadata in bdir.
// TODO: This is not needed in newer TSDB code. See
// https://github.com/prometheus/tsdb/pull/637
if err := metadata.Write(logger, bdir, meta); err != nil {
return resid, err
}
return resid, nil
}

Expand Down Expand Up @@ -603,6 +609,7 @@ type seriesRepair struct {
// rewrite writes all data from the readers back into the writers while cleaning
// up mis-ordered and duplicated chunks.
func rewrite(
logger log.Logger,
indexr tsdb.IndexReader, chunkr tsdb.ChunkReader,
indexw tsdb.IndexWriter, chunkw tsdb.ChunkWriter,
meta *metadata.Meta,
Expand Down Expand Up @@ -673,8 +680,22 @@ func rewrite(
return labels.Compare(series[i].lset, series[j].lset) < 0
})

lastSet := labels.Labels{}
// Build a new TSDB block.
for _, s := range series {
// The TSDB library will throw an error if we add a series with
// identical labels as the last series. This means that we have
// discovered a duplicate time series in the old block. We drop
// all duplicate series preserving the first one.
// TODO: Add metric to count dropped series if repair becomes a daemon
// rather than a batch job.
if labels.Compare(lastSet, s.lset) == 0 {
level.Warn(logger).Log("msg",
"dropping duplicate series in tsdb block found",
"labelset", s.lset.String(),
)
continue
}
if err := chunkw.WriteChunks(s.chks...); err != nil {
return errors.Wrap(err, "write chunks")
}
Expand All @@ -699,6 +720,7 @@ func rewrite(
}
postings.Add(i, s.lset)
i++
lastSet = s.lset
}

s := make([]string, 0, 256)
Expand Down
2 changes: 1 addition & 1 deletion pkg/verifier/duplicated_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func DuplicatedCompactionIssue(ctx context.Context, logger log.Logger, bkt objst
}

for i, id := range toKill {
if err := SafeDelete(ctx, logger, bkt, backupBkt, id); err != nil {
if err := BackupAndDelete(ctx, logger, bkt, backupBkt, id); err != nil {
return err
}
level.Info(logger).Log("msg", "Removed duplicated block", "id", id, "to-be-removed", len(toKill)-(i+1), "removed", i+1, "issue", DuplicatedCompactionIssueID)
Expand Down
2 changes: 1 addition & 1 deletion pkg/verifier/index_issue.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func IndexIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bac
}

level.Info(logger).Log("msg", "safe deleting broken block", "id", id, "issue", IndexIssueID)
if err := SafeDelete(ctx, logger, bkt, backupBkt, id); err != nil {
if err := BackupAndDeleteDownloaded(ctx, logger, filepath.Join(tmpdir, id.String()), bkt, backupBkt, id); err != nil {
return errors.Wrapf(err, "safe deleting old block %s failed", id)
}
level.Info(logger).Log("msg", "all good, continuing", "id", id, "issue", IndexIssueID)
Expand Down
91 changes: 76 additions & 15 deletions pkg/verifier/safe_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package verifier

import (
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
Expand All @@ -14,29 +15,35 @@ import (
"github.com/pkg/errors"
)

// SafeDelete moves block to backup bucket and if succeeded, removes it from source bucket.
// It returns error if block dir already exists in backup bucket (blocks should be immutable) or any
// of the operation fails.
func SafeDelete(ctx context.Context, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, id ulid.ULID) error {
// TSDBBlockExistsInBucket checks to see if a given TSDB block ID exists in a
// bucket. If so, true is returned. An error is returned on failure and in
// such case the boolean result has no meaning.
func TSDBBlockExistsInBucket(ctx context.Context, bkt objstore.Bucket, id ulid.ULID) (bool, error) {
foundDir := false
err := backupBkt.Iter(ctx, id.String(), func(name string) error {
err := bkt.Iter(ctx, id.String(), func(name string) error {
foundDir = true
return nil
})

return foundDir, err
}

// BackupAndDelete moves a TSDB block to a backup bucket and on success removes
// it from the source bucket. It returns error if block dir already exists in
// the backup bucket (blocks should be immutable) or if any of the operations
// fail.
func BackupAndDelete(ctx context.Context, logger log.Logger, bkt, backupBkt objstore.Bucket, id ulid.ULID) error {
// Does this TSDB block exist in backupBkt already?
found, err := TSDBBlockExistsInBucket(ctx, backupBkt, id)
if err != nil {
return err
}

if foundDir {
if found {
return errors.Errorf("%s dir seems to exists in backup bucket. Remove this block manually if you are sure it is safe to do", id)
}

tempdir, err := ioutil.TempDir("", "safe-delete")
if err != nil {
return err
}
dir := filepath.Join(tempdir, id.String())
err = os.Mkdir(dir, 0755)
// Create a tempdir to locally store TSDB block.
tempdir, err := ioutil.TempDir("", fmt.Sprintf("safe-delete-%s", id.String()))
if err != nil {
return err
}
Expand All @@ -46,18 +53,72 @@ func SafeDelete(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bac
}
}()

// Download the TSDB block.
dir := filepath.Join(tempdir, id.String())
if err := block.Download(ctx, logger, bkt, id, dir); err != nil {
return errors.Wrap(err, "download from source")
}

if err := block.Upload(ctx, logger, backupBkt, dir); err != nil {
return errors.Wrap(err, "upload to backup")
// Backup the block.
if err := backupDownloaded(ctx, logger, dir, backupBkt, id); err != nil {
return err
}

// Block uploaded, so we are ok to remove from src bucket.
level.Info(logger).Log("msg", "Deleting block", "id", id.String())
if err := block.Delete(ctx, bkt, id); err != nil {
return errors.Wrap(err, "delete from source")
}

return nil
}

// BackupAndDeleteDownloaded works much like BackupAndDelete in that it will
// move a TSDB block from a bucket to a backup bucket. The bdir parameter
// points to the location on disk where the TSDB block was previously
// downloaded allowing this function to avoid downloading the TSDB block from
// the source bucket again. An error is returned if any operation fails.
func BackupAndDeleteDownloaded(ctx context.Context, logger log.Logger, bdir string, bkt, backupBkt objstore.Bucket, id ulid.ULID) error {
// Does this TSDB block exist in backupBkt already?
found, err := TSDBBlockExistsInBucket(ctx, backupBkt, id)
if err != nil {
return err
}
if found {
return errors.Errorf("%s dir seems to exists in backup bucket. Remove this block manually if you are sure it is safe to do", id)
}

// Backup the block.
if err := backupDownloaded(ctx, logger, bdir, backupBkt, id); err != nil {
return err
}

// Block uploaded, so we are ok to remove from src bucket.
level.Info(logger).Log("msg", "Deleting block", "id", id.String())
if err := block.Delete(ctx, bkt, id); err != nil {
return errors.Wrap(err, "delete from source")
}

return nil
}

// backupDownloaded is a helper function that uploads a TSDB block
// found on disk to the given bucket. An error is returned if any operation
// fails.
func backupDownloaded(ctx context.Context, logger log.Logger, bdir string, backupBkt objstore.Bucket, id ulid.ULID) error {
// Safety checks.
if _, err := os.Stat(filepath.Join(bdir, "meta.json")); err != nil {
// If there is any error stat'ing meta.json inside the TSDB block
// then declare the existing block as bad and refuse to upload it.
// TODO: Make this check more robust.
return errors.Wrap(err, "existing tsdb block is invalid")
}

// Upload the on disk TSDB block.
level.Info(logger).Log("msg", "Uploading block to backup bucket", "id", id.String())
if err := block.Upload(ctx, logger, backupBkt, bdir); err != nil {
return errors.Wrap(err, "upload to backup")
}

return nil
}

0 comments on commit 63257cf

Please sign in to comment.