diff --git a/cmd/influx_inspect/deletetsm/deletetsm.go b/cmd/influx_inspect/deletetsm/deletetsm.go index 77083c34ccb..3104b66f54b 100644 --- a/cmd/influx_inspect/deletetsm/deletetsm.go +++ b/cmd/influx_inspect/deletetsm/deletetsm.go @@ -2,6 +2,7 @@ package deletetsm import ( + "errors" "flag" "fmt" "io" @@ -10,6 +11,7 @@ import ( "time" "github.com/influxdata/influxdb/models" + errors2 "github.com/influxdata/influxdb/pkg/errors" "github.com/influxdata/influxdb/tsdb/engine/tsm1" ) @@ -68,77 +70,104 @@ func (cmd *Command) Run(args ...string) (err error) { return nil } -func (cmd *Command) process(path string) error { - // Open TSM reader. - input, err := os.Open(path) - if err != nil { - return err - } - defer input.Close() - - r, err := tsm1.NewTSMReader(input) - if err != nil { - return fmt.Errorf("unable to read %s: %s", path, err) - } - defer r.Close() - +func (cmd *Command) process(path string) (retErr error) { // Remove previous temporary files. outputPath := path + ".rewriting.tmp" - if err := os.RemoveAll(outputPath); err != nil { - return err - } else if err := os.RemoveAll(outputPath + ".idx.tmp"); err != nil { - return err - } - // Create TSMWriter to temporary location. - output, err := os.Create(outputPath) + // Open TSM reader. + input, err := os.Open(path) if err != nil { return err } - defer output.Close() - w, err := tsm1.NewTSMWriter(output) + r, err := tsm1.NewTSMReader(input) if err != nil { - return err + // close the input file on error creating the TSMReader + _ = input.Close() + return fmt.Errorf("unable to read %s: %w", path, err) } - defer w.Close() + // Nested function to ensure all the deferred close operations happen before final deletion or rename + size, err := func() (size uint32, fRetErr error) { + // This will close the input file + defer errors2.Capture(&retErr, r.Close)() + + if err := os.RemoveAll(outputPath); err != nil { + return 0, err + } else if err := os.RemoveAll(outputPath + ".idx.tmp"); err != nil { + return 0, err + } - // Iterate over the input blocks. - itr := r.BlockIterator() - for itr.Next() { - // Read key & time range. - key, minTime, maxTime, _, _, block, err := itr.Read() + // Create TSMWriter to temporary location. + output, err := os.Create(outputPath) if err != nil { - return err + return 0, err } - // Skip block if this is the measurement and time range we are deleting. - series, _ := tsm1.SeriesAndFieldFromCompositeKey(key) - measurement, tags := models.ParseKey(series) - if string(measurement) == cmd.measurement || (cmd.sanitize && !models.ValidKeyTokens(measurement, tags)) { - log.Printf("deleting block: %s (%s-%s) sz=%d", - key, - time.Unix(0, minTime).UTC().Format(time.RFC3339Nano), - time.Unix(0, maxTime).UTC().Format(time.RFC3339Nano), - len(block), - ) - continue + w, err := tsm1.NewTSMWriter(output) + if err != nil { + // close the output file on error creating the TSMWriter + _ = output.Close() + return 0, fmt.Errorf("unable to write %s: %w", outputPath, err) } - if err := w.WriteBlock(key, minTime, maxTime, block); err != nil { - return err + // This will close the output file + defer errors2.Capture(&fRetErr, w.Close)() + + // Iterate over the input blocks. + itr := r.BlockIterator() + blockWritten := false + for itr.Next() { + // Read key & time range. + key, minTime, maxTime, _, _, block, err := itr.Read() + if err != nil { + return 0, err + } + + // Skip block if this is the measurement and time range we are deleting. + series, _ := tsm1.SeriesAndFieldFromCompositeKey(key) + measurement, tags := models.ParseKey(series) + if string(measurement) == cmd.measurement || (cmd.sanitize && !models.ValidKeyTokens(measurement, tags)) { + log.Printf("deleting block: %s (%s-%s) sz=%d", + key, + time.Unix(0, minTime).UTC().Format(time.RFC3339Nano), + time.Unix(0, maxTime).UTC().Format(time.RFC3339Nano), + len(block), + ) + continue + } + + if err := w.WriteBlock(key, minTime, maxTime, block); err != nil { + return 0, err + } + blockWritten = true } - } - // Write index & close. - if err := w.WriteIndex(); err != nil { - return err - } else if err := w.Close(); err != nil { + // Write index & close. + // It is okay to have no index values if no block was written + if err := w.WriteIndex(); err != nil && !(blockWritten || errors.Is(err, tsm1.ErrNoValues)) { + return 0, err + } else { + return w.Size(), nil + } + }() + if err != nil { return err } - // Replace original file with new file. - return os.Rename(outputPath, path) + if size > 0 { + // Replace original file with new file. + return os.Rename(outputPath, path) + } else { + // Empty TSM file of size == 0, remove it + if err = os.RemoveAll(path); err != nil { + err = fmt.Errorf("cannot remove %s: %w", path, err) + } + if err2 := os.RemoveAll(outputPath); err2 != nil && err == nil { + return fmt.Errorf("cannot remove temporary file %s: %w", outputPath, err) + } else { + return err + } + } } func (cmd *Command) printUsage() {