diff --git a/CHANGELOG.md b/CHANGELOG.md index fbf510f47b5..5d771ee7123 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ The stress tool `influx_stress` will be removed in a subsequent release. We reco - [#7396](https://github.com/influxdata/influxdb/issues/7396): CLI should use spaces for alignment, not tabs. - [#7615](https://github.com/influxdata/influxdb/issues/7615): Reject invalid subscription urls @allenpetersen +- [#7741](https://github.com/influxdata/influxdb/pull/7741): Fix string quoting and significantly improve performance of `influx_inspect export`. ## v1.1.1 [unreleased] diff --git a/cmd/influx_inspect/export/export.go b/cmd/influx_inspect/export/export.go index 012fcc222d0..9a928094e58 100644 --- a/cmd/influx_inspect/export/export.go +++ b/cmd/influx_inspect/export/export.go @@ -1,16 +1,16 @@ package export import ( + "bufio" "compress/gzip" "flag" "fmt" "io" - "log" "math" "os" - "path" "path/filepath" "sort" + "strconv" "strings" "sync" "time" @@ -62,8 +62,8 @@ func (cmd *Command) Run(args ...string) error { fs.StringVar(&cmd.out, "out", os.Getenv("HOME")+"/.influxdb/export", "Destination file to export to") fs.StringVar(&cmd.database, "database", "", "Optional: the database to export") fs.StringVar(&cmd.retentionPolicy, "retention", "", "Optional: the retention policy to export (requires -database)") - fs.StringVar(&start, "start", "", "Optional: the start time to export") - fs.StringVar(&end, "end", "", "Optional: the end time to export") + fs.StringVar(&start, "start", "", "Optional: the start time to export (RFC3339 format)") + fs.StringVar(&end, "end", "", "Optional: the end time to export (RFC3339 format)") fs.BoolVar(&cmd.compress, "compress", false, "Compress the output") fs.SetOutput(cmd.Stdout) @@ -106,7 +106,6 @@ func (cmd *Command) Run(args ...string) error { } func (cmd *Command) validate() error { - // validate args if cmd.retentionPolicy != "" && cmd.database == "" { return fmt.Errorf("must specify a db") } @@ -123,94 
+122,90 @@ func (cmd *Command) export() error { if err := cmd.walkWALFiles(); err != nil { return err } - return cmd.writeFiles() + return cmd.write() } func (cmd *Command) walkTSMFiles() error { - err := filepath.Walk(cmd.dataDir, func(dir string, f os.FileInfo, err error) error { + return filepath.Walk(cmd.dataDir, func(path string, f os.FileInfo, err error) error { if err != nil { return err } // check to see if this is a tsm file - ext := fmt.Sprintf(".%s", tsm1.TSMFileExtension) - if filepath.Ext(dir) != ext { + if filepath.Ext(path) != "."+tsm1.TSMFileExtension { return nil } - relPath, _ := filepath.Rel(cmd.dataDir, dir) + relPath, err := filepath.Rel(cmd.dataDir, path) + if err != nil { + return err + } dirs := strings.Split(relPath, string(byte(os.PathSeparator))) if len(dirs) < 2 { - return fmt.Errorf("invalid directory structure for %s", dir) + return fmt.Errorf("invalid directory structure for %s", path) } if dirs[0] == cmd.database || cmd.database == "" { if dirs[1] == cmd.retentionPolicy || cmd.retentionPolicy == "" { key := filepath.Join(dirs[0], dirs[1]) - files := cmd.tsmFiles[key] - if files == nil { - files = []string{} - } cmd.manifest[key] = struct{}{} - cmd.tsmFiles[key] = append(files, dir) + cmd.tsmFiles[key] = append(cmd.tsmFiles[key], path) } } return nil }) - if err != nil { - return err - } - return nil } func (cmd *Command) walkWALFiles() error { - err := filepath.Walk(cmd.walDir, func(dir string, f os.FileInfo, err error) error { + return filepath.Walk(cmd.walDir, func(path string, f os.FileInfo, err error) error { if err != nil { return err } // check to see if this is a wal file - prefix := tsm1.WALFilePrefix - ext := fmt.Sprintf(".%s", tsm1.WALFileExtension) - _, fileName := path.Split(dir) - if filepath.Ext(dir) != ext || !strings.HasPrefix(fileName, prefix) { + fileName := filepath.Base(path) + if filepath.Ext(path) != "."+tsm1.WALFileExtension || !strings.HasPrefix(fileName, tsm1.WALFilePrefix) { return nil } - relPath, _ := 
filepath.Rel(cmd.walDir, dir) + relPath, err := filepath.Rel(cmd.walDir, path) + if err != nil { + return err + } dirs := strings.Split(relPath, string(byte(os.PathSeparator))) if len(dirs) < 2 { - return fmt.Errorf("invalid directory structure for %s", dir) + return fmt.Errorf("invalid directory structure for %s", path) } if dirs[0] == cmd.database || cmd.database == "" { if dirs[1] == cmd.retentionPolicy || cmd.retentionPolicy == "" { key := filepath.Join(dirs[0], dirs[1]) - files := cmd.walFiles[key] - if files == nil { - files = []string{} - } cmd.manifest[key] = struct{}{} - cmd.walFiles[key] = append(files, dir) + cmd.walFiles[key] = append(cmd.walFiles[key], path) } } return nil }) - if err != nil { - return err - } - return nil } -func (cmd *Command) writeFiles() error { +func (cmd *Command) write() error { // open our output file and create an output buffer - var w io.WriteCloser - w, err := os.Create(cmd.out) + f, err := os.Create(cmd.out) if err != nil { return err } - defer w.Close() + defer f.Close() + + // Because calling (*os.File).Write is relatively expensive, + // and we don't *need* to sync to disk on every written line of export, + // use a sized buffered writer so that we only sync the file every megabyte. 
+ bw := bufio.NewWriterSize(f, 1024*1024) + defer bw.Flush() + + var w io.Writer = bw + if cmd.compress { - w = gzip.NewWriter(w) - defer w.Close() + gzw := gzip.NewWriter(w) + defer gzw.Close() + w = gzw } s, e := time.Unix(0, cmd.startTime).Format(time.RFC3339), time.Unix(0, cmd.endTime).Format(time.RFC3339) @@ -219,175 +214,191 @@ func (cmd *Command) writeFiles() error { // Write out all the DDL fmt.Fprintln(w, "# DDL") for key := range cmd.manifest { - keys := strings.Split(key, string(byte(os.PathSeparator))) + keys := strings.Split(key, string(os.PathSeparator)) db, rp := influxql.QuoteIdent(keys[0]), influxql.QuoteIdent(keys[1]) fmt.Fprintf(w, "CREATE DATABASE %s WITH NAME %s\n", db, rp) } fmt.Fprintln(w, "# DML") for key := range cmd.manifest { - keys := strings.Split(key, string(byte(os.PathSeparator))) + keys := strings.Split(key, string(os.PathSeparator)) fmt.Fprintf(w, "# CONTEXT-DATABASE:%s\n", keys[0]) fmt.Fprintf(w, "# CONTEXT-RETENTION-POLICY:%s\n", keys[1]) if files, ok := cmd.tsmFiles[key]; ok { - fmt.Printf("writing out tsm file data for %s...", key) + fmt.Fprintf(cmd.Stdout, "writing out tsm file data for %s...", key) if err := cmd.writeTsmFiles(w, files); err != nil { return err } - fmt.Println("complete.") + fmt.Fprintln(cmd.Stdout, "complete.") } if _, ok := cmd.walFiles[key]; ok { - fmt.Printf("writing out wal file data for %s...", key) + fmt.Fprintf(cmd.Stdout, "writing out wal file data for %s...", key) if err := cmd.writeWALFiles(w, cmd.walFiles[key], key); err != nil { return err } - fmt.Println("complete.") + fmt.Fprintln(cmd.Stdout, "complete.") } } return nil } -func (cmd *Command) writeTsmFiles(w io.WriteCloser, files []string) error { +func (cmd *Command) writeTsmFiles(w io.Writer, files []string) error { fmt.Fprintln(w, "# writing tsm data") // we need to make sure we write the same order that the files were written sort.Strings(files) - // use a function here to close the files in the defers and not let them accumulate in the loop 
- write := func(f string) error { - file, err := os.OpenFile(f, os.O_RDONLY, 0600) - if err != nil { - return fmt.Errorf("%v", err) - } - defer file.Close() - reader, err := tsm1.NewTSMReader(file) - if err != nil { - log.Printf("unable to read %s, skipping\n", f) - return nil + for _, f := range files { + if err := cmd.exportTSMFile(f, w); err != nil { + return err } - defer reader.Close() + } - if sgStart, sgEnd := reader.TimeRange(); sgStart > cmd.endTime || sgEnd < cmd.startTime { - return nil - } + return nil +} - for i := 0; i < reader.KeyCount(); i++ { - var pairs string - key, typ := reader.KeyAt(i) - values, _ := reader.ReadAll(string(key)) - measurement, field := tsm1.SeriesAndFieldFromCompositeKey(key) - // measurements are stored escaped, field names are not - field = escape.String(field) - - for _, value := range values { - if (value.UnixNano() < cmd.startTime) || (value.UnixNano() > cmd.endTime) { - continue - } +func (cmd *Command) exportTSMFile(tsmFilePath string, w io.Writer) error { + f, err := os.Open(tsmFilePath) + if err != nil { + return err + } + defer f.Close() - switch typ { - case tsm1.BlockFloat64: - pairs = field + "=" + fmt.Sprintf("%v", value.Value()) - case tsm1.BlockInteger: - pairs = field + "=" + fmt.Sprintf("%vi", value.Value()) - case tsm1.BlockBoolean: - pairs = field + "=" + fmt.Sprintf("%v", value.Value()) - case tsm1.BlockString: - pairs = field + "=" + fmt.Sprintf("%q", models.EscapeStringField(fmt.Sprintf("%s", value.Value()))) - default: - pairs = field + "=" + fmt.Sprintf("%v", value.Value()) - } + r, err := tsm1.NewTSMReader(f) + if err != nil { + fmt.Fprintf(cmd.Stderr, "unable to read %s, skipping: %s\n", tsmFilePath, err.Error()) + return nil + } + defer r.Close() - fmt.Fprintln(w, string(measurement), pairs, value.UnixNano()) - } - } + if sgStart, sgEnd := r.TimeRange(); sgStart > cmd.endTime || sgEnd < cmd.startTime { return nil } - for _, f := range files { - if err := write(f); err != nil { + for i := 0; i < 
r.KeyCount(); i++ { + key, _ := r.KeyAt(i) + values, err := r.ReadAll(string(key)) + if err != nil { + fmt.Fprintf(cmd.Stderr, "unable to read key %q in %s, skipping: %s\n", string(key), tsmFilePath, err.Error()) + continue + } + measurement, field := tsm1.SeriesAndFieldFromCompositeKey(key) + field = escape.String(field) + + if err := cmd.writeValues(w, measurement, field, values); err != nil { + // An error from writeValues indicates an IO error, which should be returned. return err } } - return nil } -func (cmd *Command) writeWALFiles(w io.WriteCloser, files []string, key string) error { +func (cmd *Command) writeWALFiles(w io.Writer, files []string, key string) error { fmt.Fprintln(w, "# writing wal data") // we need to make sure we write the same order that the wal received the data sort.Strings(files) var once sync.Once - warn := func() { - msg := fmt.Sprintf(`WARNING: detected deletes in wal file. - Some series for %q may be brought back by replaying this data. - To resolve, you can either let the shard snapshot prior to exporting the data - or manually editing the exported file. - `, key) - fmt.Fprintln(cmd.Stderr, msg) + warnDelete := func() { + once.Do(func() { + msg := fmt.Sprintf(`WARNING: detected deletes in wal file. +Some series for %q may be brought back by replaying this data. +To resolve, you can either let the shard snapshot prior to exporting the data +or manually editing the exported file. + `, key) + fmt.Fprintln(cmd.Stderr, msg) + }) + } + + for _, f := range files { + if err := cmd.exportWALFile(f, w, warnDelete); err != nil { + return err + } + } + + return nil +} + +// exportWALFile reads every WAL entry from the file at walFilePath and exports it to w. 
+func (cmd *Command) exportWALFile(walFilePath string, w io.Writer, warnDelete func()) error { + f, err := os.Open(walFilePath) + if err != nil { + return err } + defer f.Close() + + r := tsm1.NewWALSegmentReader(f) + defer r.Close() - // use a function here to close the files in the defers and not let them accumulate in the loop - write := func(f string) error { - file, err := os.OpenFile(f, os.O_RDONLY, 0600) + for r.Next() { + entry, err := r.Read() if err != nil { - return fmt.Errorf("%v", err) + n := r.Count() + fmt.Fprintf(cmd.Stderr, "file %s corrupt at position %d", walFilePath, n) + break } - defer file.Close() - - reader := tsm1.NewWALSegmentReader(file) - defer reader.Close() - for reader.Next() { - entry, err := reader.Read() - if err != nil { - n := reader.Count() - fmt.Fprintf(os.Stderr, "file %s corrupt at position %d", file.Name(), n) - break - } - switch t := entry.(type) { - case *tsm1.DeleteWALEntry: - once.Do(warn) - continue - case *tsm1.DeleteRangeWALEntry: - once.Do(warn) - continue - case *tsm1.WriteWALEntry: - var pairs string - - for key, values := range t.Values { - measurement, field := tsm1.SeriesAndFieldFromCompositeKey([]byte(key)) - // measurements are stored escaped, field names are not - field = escape.String(field) - - for _, value := range values { - if (value.UnixNano() < cmd.startTime) || (value.UnixNano() > cmd.endTime) { - continue - } - - switch value.Value().(type) { - case float64: - pairs = field + "=" + fmt.Sprintf("%v", value.Value()) - case int64: - pairs = field + "=" + fmt.Sprintf("%vi", value.Value()) - case bool: - pairs = field + "=" + fmt.Sprintf("%v", value.Value()) - case string: - pairs = field + "=" + fmt.Sprintf("%q", models.EscapeStringField(fmt.Sprintf("%s", value.Value()))) - default: - pairs = field + "=" + fmt.Sprintf("%v", value.Value()) - } - fmt.Fprintln(w, string(measurement), pairs, value.UnixNano()) - } + switch t := entry.(type) { + case *tsm1.DeleteWALEntry, *tsm1.DeleteRangeWALEntry: + 
warnDelete() + continue + case *tsm1.WriteWALEntry: + for key, values := range t.Values { + measurement, field := tsm1.SeriesAndFieldFromCompositeKey([]byte(key)) + // measurements are stored escaped, field names are not + field = escape.String(field) + + if err := cmd.writeValues(w, measurement, field, values); err != nil { + // An error from writeValues indicates an IO error, which should be returned. + return err } } } - return nil } + return nil +} - for _, f := range files { - if err := write(f); err != nil { +// writeValues writes every value in values to w, using the given series key and field name. +// If any call to w.Write fails, that error is returned. +func (cmd *Command) writeValues(w io.Writer, seriesKey []byte, field string, values []tsm1.Value) error { + buf := []byte(string(seriesKey) + " " + field + "=") + prefixLen := len(buf) + + for _, value := range values { + ts := value.UnixNano() + if (ts < cmd.startTime) || (ts > cmd.endTime) { + continue + } + + // Re-slice buf to be "seriesKey field=". + buf = buf[:prefixLen] + + // Append the correct representation of the value. + switch v := value.Value().(type) { + case float64: + buf = strconv.AppendFloat(buf, v, 'g', -1, 64) + case int64: + buf = strconv.AppendInt(buf, v, 10) + buf = append(buf, 'i') + case bool: + buf = strconv.AppendBool(buf, v) + case string: + buf = append(buf, '"') + buf = append(buf, models.EscapeStringField(v)...) + buf = append(buf, '"') + default: + // This shouldn't be possible, but we'll format it anyway. + buf = append(buf, fmt.Sprintf("%v", v)...) + } + + // Now buf has "seriesKey field=value". + // Append the timestamp and a newline, then write it. + buf = append(buf, ' ') + buf = strconv.AppendInt(buf, ts, 10) + buf = append(buf, '\n') + if _, err := w.Write(buf); err != nil { + // Underlying IO error needs to be returned. 
return err } } diff --git a/cmd/influx_inspect/export/export_test.go b/cmd/influx_inspect/export/export_test.go index 4786944d08c..d1a6598dcb8 100644 --- a/cmd/influx_inspect/export/export_test.go +++ b/cmd/influx_inspect/export/export_test.go @@ -1,3 +1,337 @@ -package export_test +package export -// #TODO: write some tests +import ( + "bytes" + "fmt" + "io/ioutil" + "math" + "math/rand" + "os" + "sort" + "strconv" + "strings" + "testing" + + "github.com/golang/snappy" + "github.com/influxdata/influxdb/tsdb/engine/tsm1" +) + +type corpus map[string][]tsm1.Value + +var ( + basicCorpus = corpus{ + tsm1.SeriesFieldKey("floats,k=f", "f"): []tsm1.Value{ + tsm1.NewValue(1, float64(1.5)), + tsm1.NewValue(2, float64(3)), + }, + tsm1.SeriesFieldKey("ints,k=i", "i"): []tsm1.Value{ + tsm1.NewValue(10, int64(15)), + tsm1.NewValue(20, int64(30)), + }, + tsm1.SeriesFieldKey("bools,k=b", "b"): []tsm1.Value{ + tsm1.NewValue(100, true), + tsm1.NewValue(200, false), + }, + tsm1.SeriesFieldKey("strings,k=s", "s"): []tsm1.Value{ + tsm1.NewValue(1000, "1k"), + tsm1.NewValue(2000, "2k"), + }, + } + + basicCorpusExpLines = []string{ + "floats,k=f f=1.5 1", + "floats,k=f f=3 2", + "ints,k=i i=15i 10", + "ints,k=i i=30i 20", + "bools,k=b b=true 100", + "bools,k=b b=false 200", + `strings,k=s s="1k" 1000`, + `strings,k=s s="2k" 2000`, + } + + escapeStringCorpus = corpus{ + tsm1.SeriesFieldKey("t", "s"): []tsm1.Value{ + tsm1.NewValue(1, `1. "quotes"`), + tsm1.NewValue(2, `2. back\slash`), + tsm1.NewValue(3, `3. bs\q"`), + }, + } + + escCorpusExpLines = []string{ + `t s="1. \"quotes\"" 1`, + `t s="2. back\\slash" 2`, + `t s="3. 
bs\\q\"" 3`, + } +) + +func Test_exportWALFile(t *testing.T) { + for _, c := range []struct { + corpus corpus + lines []string + }{ + {corpus: basicCorpus, lines: basicCorpusExpLines}, + {corpus: escapeStringCorpus, lines: escCorpusExpLines}, + } { + walFile := writeCorpusToWALFile(c.corpus) + defer os.Remove(walFile.Name()) + + var out bytes.Buffer + if err := newCommand().exportWALFile(walFile.Name(), &out, func() {}); err != nil { + t.Fatal(err) + } + + lines := strings.Split(out.String(), "\n") + for _, exp := range c.lines { + found := false + for _, l := range lines { + if exp == l { + found = true + break + } + } + + if !found { + t.Fatalf("expected line %q to be in exported output:\n%s", exp, out.String()) + } + } + } +} + +func Test_exportTSMFile(t *testing.T) { + for _, c := range []struct { + corpus corpus + lines []string + }{ + {corpus: basicCorpus, lines: basicCorpusExpLines}, + {corpus: escapeStringCorpus, lines: escCorpusExpLines}, + } { + tsmFile := writeCorpusToTSMFile(c.corpus) + defer os.Remove(tsmFile.Name()) + + var out bytes.Buffer + if err := newCommand().exportTSMFile(tsmFile.Name(), &out); err != nil { + t.Fatal(err) + } + + lines := strings.Split(out.String(), "\n") + for _, exp := range c.lines { + found := false + for _, l := range lines { + if exp == l { + found = true + break + } + } + + if !found { + t.Fatalf("expected line %q to be in exported output:\n%s", exp, out.String()) + } + } + } +} + +var sink interface{} + +func benchmarkExportTSM(c corpus, b *testing.B) { + // Garbage collection is relatively likely to happen during export, so track allocations. 
+ b.ReportAllocs() + + f := writeCorpusToTSMFile(c) + defer os.Remove(f.Name()) + + cmd := newCommand() + var out bytes.Buffer + b.ResetTimer() + b.StartTimer() + for i := 0; i < b.N; i++ { + if err := cmd.exportTSMFile(f.Name(), &out); err != nil { + b.Fatal(err) + } + + sink = out.Bytes() + out.Reset() + } +} + +func BenchmarkExportTSMFloats_100s_250vps(b *testing.B) { + benchmarkExportTSM(makeFloatsCorpus(100, 250), b) +} + +func BenchmarkExportTSMInts_100s_250vps(b *testing.B) { + benchmarkExportTSM(makeIntsCorpus(100, 250), b) +} + +func BenchmarkExportTSMBools_100s_250vps(b *testing.B) { + benchmarkExportTSM(makeBoolsCorpus(100, 250), b) +} + +func BenchmarkExportTSMStrings_100s_250vps(b *testing.B) { + benchmarkExportTSM(makeStringsCorpus(100, 250), b) +} + +func benchmarkExportWAL(c corpus, b *testing.B) { + // Garbage collection is relatively likely to happen during export, so track allocations. + b.ReportAllocs() + + f := writeCorpusToWALFile(c) + defer os.Remove(f.Name()) + + cmd := newCommand() + var out bytes.Buffer + b.ResetTimer() + b.StartTimer() + for i := 0; i < b.N; i++ { + if err := cmd.exportWALFile(f.Name(), &out, func() {}); err != nil { + b.Fatal(err) + } + + sink = out.Bytes() + out.Reset() + } +} + +func BenchmarkExportWALFloats_100s_250vps(b *testing.B) { + benchmarkExportWAL(makeFloatsCorpus(100, 250), b) +} + +func BenchmarkExportWALInts_100s_250vps(b *testing.B) { + benchmarkExportWAL(makeIntsCorpus(100, 250), b) +} + +func BenchmarkExportWALBools_100s_250vps(b *testing.B) { + benchmarkExportWAL(makeBoolsCorpus(100, 250), b) +} + +func BenchmarkExportWALStrings_100s_250vps(b *testing.B) { + benchmarkExportWAL(makeStringsCorpus(100, 250), b) +} + +// newCommand returns a command that discards its output and that accepts all timestamps. 
+func newCommand() *Command { + return &Command{ + Stderr: ioutil.Discard, + Stdout: ioutil.Discard, + startTime: math.MinInt64, + endTime: math.MaxInt64, + } +} + +// makeCorpus returns a new corpus filled with values generated by fn. +// The RNG passed to fn is seeded with numSeries * numValuesPerSeries, for predictable output. +func makeCorpus(numSeries, numValuesPerSeries int, fn func(*rand.Rand) interface{}) corpus { + rng := rand.New(rand.NewSource(int64(numSeries) * int64(numValuesPerSeries))) + var unixNano int64 + corpus := make(corpus, numSeries) + for i := 0; i < numSeries; i++ { + vals := make([]tsm1.Value, numValuesPerSeries) + for j := 0; j < numValuesPerSeries; j++ { + vals[j] = tsm1.NewValue(unixNano, fn(rng)) + unixNano++ + } + + k := fmt.Sprintf("m,t=%d", i) + corpus[tsm1.SeriesFieldKey(k, "x")] = vals + } + + return corpus +} + +func makeFloatsCorpus(numSeries, numFloatsPerSeries int) corpus { + return makeCorpus(numSeries, numFloatsPerSeries, func(rng *rand.Rand) interface{} { + return rng.Float64() + }) +} + +func makeIntsCorpus(numSeries, numIntsPerSeries int) corpus { + return makeCorpus(numSeries, numIntsPerSeries, func(rng *rand.Rand) interface{} { + // This will only return positive integers. That's probably okay. + return rng.Int63() + }) +} + +func makeBoolsCorpus(numSeries, numBoolsPerSeries int) corpus { + return makeCorpus(numSeries, numBoolsPerSeries, func(rng *rand.Rand) interface{} { + return rand.Int63n(2) == 1 + }) +} + +func makeStringsCorpus(numSeries, numStringsPerSeries int) corpus { + return makeCorpus(numSeries, numStringsPerSeries, func(rng *rand.Rand) interface{} { + // The string will randomly have 2-6 parts + parts := make([]string, rand.Intn(4)+2) + + for i := range parts { + // Each part is a random base36-encoded number + parts[i] = strconv.FormatInt(rand.Int63(), 36) + } + + // Join the individual parts with underscores. 
+ return strings.Join(parts, "_") + }) +} + +// writeCorpusToWALFile writes the given corpus as a WAL file, and returns a handle to that file. +// It is the caller's responsibility to remove the returned temp file. +// writeCorpusToWALFile will panic on any error that occurs. +func writeCorpusToWALFile(c corpus) *os.File { + walFile, err := ioutil.TempFile("", "export_test_corpus_wal") + if err != nil { + panic(err) + } + + e := &tsm1.WriteWALEntry{Values: c} + b, err := e.Encode(nil) + if err != nil { + panic(err) + } + + w := tsm1.NewWALSegmentWriter(walFile) + if err := w.Write(e.Type(), snappy.Encode(nil, b)); err != nil { + panic(err) + } + + // (*tsm1.WALSegmentWriter).sync isn't exported, but it only Syncs the file anyway. + if err := walFile.Sync(); err != nil { + panic(err) + } + + return walFile +} + +// writeCorpusToTSMFile writes the given corpus as a TSM file, and returns a handle to that file. +// It is the caller's responsibility to remove the returned temp file. +// writeCorpusToTSMFile will panic on any error that occurs. +func writeCorpusToTSMFile(c corpus) *os.File { + tsmFile, err := ioutil.TempFile("", "export_test_corpus_tsm") + if err != nil { + panic(err) + } + + w, err := tsm1.NewTSMWriter(tsmFile) + if err != nil { + panic(err) + } + + // Write the series in alphabetical order so that each test run is comparable, + // given an identical corpus. 
+ keys := make([]string, 0, len(c)) + for k := range c { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + if err := w.Write(k, c[k]); err != nil { + panic(err) + } + } + + if err := w.WriteIndex(); err != nil { + panic(err) + } + + if err := w.Close(); err != nil { + panic(err) + } + + return tsmFile +} diff --git a/models/points.go b/models/points.go index 6d083f7e7d1..24e4637734f 100644 --- a/models/points.go +++ b/models/points.go @@ -1095,34 +1095,17 @@ func unescapeTag(in []byte) []byte { return in } +// escapeStringFieldReplacer replaces double quotes and backslashes +// with the same character preceded by a backslash. +// As of Go 1.7 this benchmarked better in allocations and CPU time +// compared to iterating through a string byte-by-byte and appending to a new byte slice, +// calling strings.Replace twice, and better than (*Regex).ReplaceAllString. +var escapeStringFieldReplacer = strings.NewReplacer(`"`, `\"`, `\`, `\\`) + // EscapeStringField returns a copy of in with any double quotes or // backslashes with escaped values func EscapeStringField(in string) string { - var out []byte - i := 0 - for { - if i >= len(in) { - break - } - // escape double-quotes - if in[i] == '\\' { - out = append(out, '\\') - out = append(out, '\\') - i++ - continue - } - // escape double-quotes - if in[i] == '"' { - out = append(out, '\\') - out = append(out, '"') - i++ - continue - } - out = append(out, in[i]) - i++ - - } - return string(out) + return escapeStringFieldReplacer.Replace(in) } // unescapeStringField returns a copy of in with any escaped double-quotes diff --git a/models/points_test.go b/models/points_test.go index 0652964696c..d1db8a4ee44 100644 --- a/models/points_test.go +++ b/models/points_test.go @@ -26,6 +26,8 @@ var ( } maxFloat64 = strconv.FormatFloat(math.MaxFloat64, 'f', 1, 64) minFloat64 = strconv.FormatFloat(-math.MaxFloat64, 'f', 1, 64) + + sink interface{} ) func TestMarshal(t *testing.T) { @@ -2168,3 +2170,61 @@ func 
TestPoint_FieldIterator_Delete_Twice(t *testing.T) { t.Fatalf("Delete failed, got %#v, exp %#v", got, exp) } } + +func TestEscapeStringField(t *testing.T) { + cases := []struct { + in string + expOut string + }{ + {in: "abcdefg", expOut: "abcdefg"}, + {in: `one double quote " .`, expOut: `one double quote \" .`}, + {in: `quote " then backslash \ .`, expOut: `quote \" then backslash \\ .`}, + {in: `backslash \ then quote " .`, expOut: `backslash \\ then quote \" .`}, + } + + for _, c := range cases { + // Escapes as expected. + got := models.EscapeStringField(c.in) + if got != c.expOut { + t.Errorf("unexpected result from EscapeStringField(%s)\ngot [%s]\nexp [%s]\n", c.in, got, c.expOut) + continue + } + + pointLine := fmt.Sprintf(`t s="%s"`, got) + test(t, pointLine, NewTestPoint( + "t", + models.NewTags(nil), + models.Fields{"s": c.in}, + time.Unix(0, 0), + )) + } +} + +func BenchmarkEscapeStringField_Plain(b *testing.B) { + s := "nothing special" + for i := 0; i < b.N; i++ { + sink = models.EscapeStringField(s) + } +} + +func BenchmarkEscapeString_Quotes(b *testing.B) { + s := `Hello, "world"` + for i := 0; i < b.N; i++ { + sink = models.EscapeStringField(s) + } +} + +func BenchmarkEscapeString_Backslashes(b *testing.B) { + s := `C:\windows\system32` + for i := 0; i < b.N; i++ { + sink = models.EscapeStringField(s) + } +} + +func BenchmarkEscapeString_QuotesAndBackslashes(b *testing.B) { + s1 := `a quote " then backslash \ .` + s2 := `a backslash \ then quote " .` + for i := 0; i < b.N; i++ { + sink = [...]string{models.EscapeStringField(s1), models.EscapeStringField(s2)} + } +}