Skip to content

Commit

Permalink
fix: ignore empty index error deleting last measurement (influxdata#2…
Browse files Browse the repository at this point in the history
…5037)

An empty index is appropriate when deleting the last
measurement.  Also clean up error handling, avoid
duplicate calls to Close.

closes influxdata#9929
  • Loading branch information
davidby-influx authored and chengshiwen committed Aug 28, 2024
1 parent 045a119 commit a9eefa3
Showing 1 changed file with 80 additions and 51 deletions.
131 changes: 80 additions & 51 deletions cmd/influx_inspect/deletetsm/deletetsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package deletetsm

import (
"errors"
"flag"
"fmt"
"io"
Expand All @@ -10,6 +11,7 @@ import (
"time"

"github.com/influxdata/influxdb/models"
errors2 "github.com/influxdata/influxdb/pkg/errors"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)

Expand Down Expand Up @@ -68,77 +70,104 @@ func (cmd *Command) Run(args ...string) (err error) {
return nil
}

func (cmd *Command) process(path string) error {
// Open TSM reader.
input, err := os.Open(path)
if err != nil {
return err
}
defer input.Close()

r, err := tsm1.NewTSMReader(input)
if err != nil {
return fmt.Errorf("unable to read %s: %s", path, err)
}
defer r.Close()

func (cmd *Command) process(path string) (retErr error) {
// Remove previous temporary files.
outputPath := path + ".rewriting.tmp"
if err := os.RemoveAll(outputPath); err != nil {
return err
} else if err := os.RemoveAll(outputPath + ".idx.tmp"); err != nil {
return err
}

// Create TSMWriter to temporary location.
output, err := os.Create(outputPath)
// Open TSM reader.
input, err := os.Open(path)
if err != nil {
return err
}
defer output.Close()

w, err := tsm1.NewTSMWriter(output)
r, err := tsm1.NewTSMReader(input)
if err != nil {
return err
// close the input file on error creating the TSMReader
_ = input.Close()
return fmt.Errorf("unable to read %s: %w", path, err)
}
defer w.Close()
// Nested function to ensure all the deferred close operations happen before final deletion or rename
size, err := func() (size uint32, fRetErr error) {
// This will close the input file
defer errors2.Capture(&retErr, r.Close)()

if err := os.RemoveAll(outputPath); err != nil {
return 0, err
} else if err := os.RemoveAll(outputPath + ".idx.tmp"); err != nil {
return 0, err
}

// Iterate over the input blocks.
itr := r.BlockIterator()
for itr.Next() {
// Read key & time range.
key, minTime, maxTime, _, _, block, err := itr.Read()
// Create TSMWriter to temporary location.
output, err := os.Create(outputPath)
if err != nil {
return err
return 0, err
}

// Skip block if this is the measurement and time range we are deleting.
series, _ := tsm1.SeriesAndFieldFromCompositeKey(key)
measurement, tags := models.ParseKey(series)
if string(measurement) == cmd.measurement || (cmd.sanitize && !models.ValidKeyTokens(measurement, tags)) {
log.Printf("deleting block: %s (%s-%s) sz=%d",
key,
time.Unix(0, minTime).UTC().Format(time.RFC3339Nano),
time.Unix(0, maxTime).UTC().Format(time.RFC3339Nano),
len(block),
)
continue
w, err := tsm1.NewTSMWriter(output)
if err != nil {
// close the output file on error creating the TSMWriter
_ = output.Close()
return 0, fmt.Errorf("unable to write %s: %w", outputPath, err)
}

if err := w.WriteBlock(key, minTime, maxTime, block); err != nil {
return err
// This will close the output file
defer errors2.Capture(&fRetErr, w.Close)()

// Iterate over the input blocks.
itr := r.BlockIterator()
blockWritten := false
for itr.Next() {
// Read key & time range.
key, minTime, maxTime, _, _, block, err := itr.Read()
if err != nil {
return 0, err
}

// Skip block if this is the measurement and time range we are deleting.
series, _ := tsm1.SeriesAndFieldFromCompositeKey(key)
measurement, tags := models.ParseKey(series)
if string(measurement) == cmd.measurement || (cmd.sanitize && !models.ValidKeyTokens(measurement, tags)) {
log.Printf("deleting block: %s (%s-%s) sz=%d",
key,
time.Unix(0, minTime).UTC().Format(time.RFC3339Nano),
time.Unix(0, maxTime).UTC().Format(time.RFC3339Nano),
len(block),
)
continue
}

if err := w.WriteBlock(key, minTime, maxTime, block); err != nil {
return 0, err
}
blockWritten = true
}
}

// Write index & close.
if err := w.WriteIndex(); err != nil {
return err
} else if err := w.Close(); err != nil {
// Write index & close.
// It is okay to have no index values if no block was written
if err := w.WriteIndex(); err != nil && !(blockWritten || errors.Is(err, tsm1.ErrNoValues)) {
return 0, err
} else {
return w.Size(), nil
}
}()
if err != nil {
return err
}

// Replace original file with new file.
return os.Rename(outputPath, path)
if size > 0 {
// Replace original file with new file.
return os.Rename(outputPath, path)
} else {
// Empty TSM file of size == 0, remove it
if err = os.RemoveAll(path); err != nil {
err = fmt.Errorf("cannot remove %s: %w", path, err)
}
if err2 := os.RemoveAll(outputPath); err2 != nil && err == nil {
return fmt.Errorf("cannot remove temporary file %s: %w", outputPath, err)
} else {
return err
}
}
}

func (cmd *Command) printUsage() {
Expand Down

0 comments on commit a9eefa3

Please sign in to comment.