From eaec514ca0022ed0776554e2a267d8249fa254ea Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 26 Dec 2015 17:13:54 -0500 Subject: [PATCH] b*1 to tsm1 shard converter --- .gitignore | 4 + build.py | 1 + cmd/influx_tsm/README.md | 50 ++++ cmd/influx_tsm/b1/reader.go | 238 +++++++++++++++++ cmd/influx_tsm/bz1/reader.go | 342 ++++++++++++++++++++++++ cmd/influx_tsm/converter.go | 89 ++++++ cmd/influx_tsm/main.go | 337 +++++++++++++++++++++++ cmd/influx_tsm/tsdb/codec.go | 293 ++++++++++++++++++++ cmd/influx_tsm/tsdb/database.go | 231 ++++++++++++++++ cmd/influx_tsm/tsdb/internal/meta.pb.go | 122 +++++++++ cmd/influx_tsm/tsdb/types.go | 111 ++++++++ cmd/influx_tsm/tsdb/values.go | 139 ++++++++++ package.sh | 1 + 13 files changed, 1958 insertions(+) create mode 100644 cmd/influx_tsm/README.md create mode 100644 cmd/influx_tsm/b1/reader.go create mode 100644 cmd/influx_tsm/bz1/reader.go create mode 100644 cmd/influx_tsm/converter.go create mode 100644 cmd/influx_tsm/main.go create mode 100644 cmd/influx_tsm/tsdb/codec.go create mode 100644 cmd/influx_tsm/tsdb/database.go create mode 100644 cmd/influx_tsm/tsdb/internal/meta.pb.go create mode 100644 cmd/influx_tsm/tsdb/types.go create mode 100644 cmd/influx_tsm/tsdb/values.go diff --git a/.gitignore b/.gitignore index 50c31e055c8..8c550039725 100644 --- a/.gitignore +++ b/.gitignore @@ -18,6 +18,10 @@ cmd/influxd/version.go # executables +influx_tsm +**/influx_tsm +!**/influx_tsm/ + influx_stress **/influx_stress !**/influx_stress/ diff --git a/build.py b/build.py index f360fd0b97d..6ac1bb3caea 100755 --- a/build.py +++ b/build.py @@ -78,6 +78,7 @@ 'influxd' : './cmd/influxd/main.go', 'influx_stress' : './cmd/influx_stress/influx_stress.go', 'influx_inspect' : './cmd/influx_inspect/*.go', + 'influx_tsm' : './cmd/influx_tsm/main.go', } supported_builds = { diff --git a/cmd/influx_tsm/README.md b/cmd/influx_tsm/README.md new file mode 100644 index 00000000000..45d0991a2d3 --- /dev/null +++ b/cmd/influx_tsm/README.md @@ -0,0 +1,50 @@ +# Converting b1 and bz1 shards to tsm1 +`influx_tsm` is a tool for converting b1 and bz1 shards to tsm1 format. Converting shards to tsm1 format results in a very significant reduction in disk usage, and significantly improved write-throughput, when writing data into those shards. + +Conversion can be controlled on a database-by-database basis. By default a database is backed up before it is converted, allowing you to roll back any changes. Because of the backup process, ensure the host system has at least as much free disk space as the disk space consumed by the _data_ directory of your InfluxDB system. + +The tool automatically ignores tsm1 shards, and can be run idempotently on any database. + +Conversion is an offline process, and the InfluxDB system must be stopped during conversion. However the conversion process reads and writes shards directly on disk and should be fast. + +## Steps +Follow these steps to perform a conversion. + +* Identify the databases you wish to convert. You can convert one or more databases at a time. By default all databases are converted. +* Decide on parallel operation. By default the conversion operation peforms each operation in a serial manner. This minimizes load on the host system performing the conversion, but also takes the most time. If you wish to minimize the time conversion takes, enable parallel mode. Conversion will then perform as many operations as possible in parallel, but the process may place significant load on the host system (CPU, disk, and RAM, usage will all increase). +* Stop all write-traffic to your InfluxDB system. +* Restart the InfluxDB service and wait until all WAL data is flushed to disk -- this has completed when the system responds to queries. This is to ensure all data is present in shards. +* Stop the InfluxDB service. It should not be restarted until conversion is complete. +* Run conversion tool. +* Restart node and ensure data looks correct. +* If everything looks OK, you may then wish to remove or archive the backed-up databases. This is not required for a correctly functioning InfluxDB system, since the backed-up databases will be simply ignored by the system. Backed-up databases are suffixed with the extension `.bak`. +* Restart write traffic. + +## Example session +Below is an example session, showing a database being converted. + +``` +$ influx_tsm -parallel ~/.influxdb/data/ + +b1 and bz1 shard conversion. +----------------------------------- +Data directory is: /home/bob/.influxdb/data/ +Databases specified: all +Parallel mode enabled: yes +Database backups enabled: yes +1 shard(s) detected, 1 non-TSM shards detected. + +Database Retention Path Engine Size +_internal monitor /home/bob/.influxdb/data/_internal/monitor/1 bz1 262144 + +These shards will be converted. Proceed? y/N: y +Conversion starting.... +Database _internal backed up. +Conversion of /home/bob/.influxdb/data/_internal/monitor/1 successful (27.485037ms) + +$ rm -r /home/bob/.influxdb/data/_internal.bak # After confirming converted data looks good. +``` +Note that the tool first lists the shards that will be converted, before asking for confirmation. You can abort the conversion process at this step if you just wish to see what would be converted, or if the list of shards does not look correct. + +## Rolling back a conversion +If you wish to rollback a conversion check the databases in your _data_ directory. For every backed-up database remove the non-backed up version and then rename the backup so that it no longer has the extention `.bak`. Then restart your InfluxDB system. diff --git a/cmd/influx_tsm/b1/reader.go b/cmd/influx_tsm/b1/reader.go new file mode 100644 index 00000000000..03a43992d07 --- /dev/null +++ b/cmd/influx_tsm/b1/reader.go @@ -0,0 +1,238 @@ +package b1 + +import ( + "encoding/binary" + "sort" + "time" + + "github.com/boltdb/bolt" + "github.com/influxdb/influxdb/cmd/influx_tsm/tsdb" + "github.com/influxdb/influxdb/tsdb/engine/tsm1" +) + +const DefaultChunkSize = 1000 + +// Reader is used to read all data from a b1 shard. +type Reader struct { + path string + db *bolt.DB + tx *bolt.Tx + + cursors []*cursor + currCursor int + + keyBuf string + valuesBuf []tsm1.Value + + series map[string]*tsdb.Series + fields map[string]*tsdb.MeasurementFields + codecs map[string]*tsdb.FieldCodec + + ChunkSize int +} + +// NewReader returns a reader for the b1 shard at path. +func NewReader(path string) *Reader { + return &Reader{ + path: path, + series: make(map[string]*tsdb.Series), + fields: make(map[string]*tsdb.MeasurementFields), + codecs: make(map[string]*tsdb.FieldCodec), + } +} + +// Open opens the reader. +func (r *Reader) Open() error { + // Open underlying storage. + db, err := bolt.Open(r.path, 0666, &bolt.Options{Timeout: 1 * time.Second}) + if err != nil { + return err + } + r.db = db + + // Load fields. + if err := r.db.View(func(tx *bolt.Tx) error { + meta := tx.Bucket([]byte("fields")) + c := meta.Cursor() + + for k, v := c.First(); k != nil; k, v = c.Next() { + mf := &tsdb.MeasurementFields{} + if err := mf.UnmarshalBinary(v); err != nil { + return err + } + r.fields[string(k)] = mf + r.codecs[string(k)] = tsdb.NewFieldCodec(mf.Fields) + } + return nil + }); err != nil { + return err + } + + // Load series + if err := r.db.View(func(tx *bolt.Tx) error { + meta := tx.Bucket([]byte("series")) + c := meta.Cursor() + + for k, v := c.First(); k != nil; k, v = c.Next() { + series := &tsdb.Series{} + if err := series.UnmarshalBinary(v); err != nil { + return err + } + r.series[string(k)] = series + } + return nil + }); err != nil { + return err + } + + // Create cursor for each field of each series. + r.tx, err = r.db.Begin(false) + if err != nil { + return err + } + for s, _ := range r.series { + if err != nil { + return err + } + + measurement := tsdb.MeasurementFromSeriesKey(s) + for _, f := range r.fields[tsdb.MeasurementFromSeriesKey(s)].Fields { + c := newCursor(r.tx, s, f.Name, r.codecs[measurement]) + c.SeekTo(0) + r.cursors = append(r.cursors, c) + } + } + sort.Sort(cursors(r.cursors)) + + return nil +} + +// Next returns whether any data remains to be read. It must be called before +// the next call to Read(). +func (r *Reader) Next() bool { + for { + if r.currCursor == len(r.cursors) { + // All cursors drained. No more data remains. + return false + } + + cc := r.cursors[r.currCursor] + k, v := cc.Next() + if k == -1 { + // Go to next cursor and try again. + r.currCursor++ + if len(r.valuesBuf) == 0 { + // The previous cursor had no data. Instead of returning + // just go immediately to the next cursor. + continue + } + // There is some data available. Indicate that it should be read. + return true + } + + r.keyBuf = tsm1.SeriesFieldKey(cc.series, cc.field) + r.valuesBuf = append(r.valuesBuf, tsdb.ConvertToValue(k, v)) + if len(r.valuesBuf) == r.ChunkSize { + return true + } + } + +} + +// Read returns the next chunk of data in the shard, converted to tsm1 values. Data is +// emitted completely for every field, in every series, before the next field is processed. +// Data from Read() adheres to the requirements for writing to tsm1 shards +func (r *Reader) Read() (string, []tsm1.Value, error) { + defer func() { + r.valuesBuf = nil + }() + + return r.keyBuf, r.valuesBuf, nil +} + +// Close closes the reader. +func (r *Reader) Close() error { + return r.tx.Rollback() +} + +// cursor provides ordered iteration across a series. +type cursor struct { + // Bolt cursor and readahead buffer. + cursor *bolt.Cursor + keyBuf int64 + valBuf interface{} + + series string + field string + dec *tsdb.FieldCodec +} + +// Cursor returns an iterator for a key over a single field. +func newCursor(tx *bolt.Tx, series string, field string, dec *tsdb.FieldCodec) *cursor { + cur := &cursor{ + keyBuf: -2, + series: series, + field: field, + dec: dec, + } + + // Retrieve series bucket. + b := tx.Bucket([]byte(series)) + if b != nil { + cur.cursor = b.Cursor() + } + + return cur +} + +// Seek moves the cursor to a position. +func (c cursor) SeekTo(seek int64) { + k, v := c.cursor.Seek(u64tob(uint64(seek))) + c.keyBuf, c.valBuf = tsdb.DecodeKeyValue(c.field, c.dec, k, v) +} + +// Next returns the next key/value pair from the cursor. +func (c *cursor) Next() (key int64, value interface{}) { + for { + k, v := func() (int64, interface{}) { + if c.keyBuf != -2 { + k, v := c.keyBuf, c.valBuf + c.keyBuf = -2 + return k, v + } + + k, v := c.cursor.Next() + if k == nil { + return -1, nil + } + return tsdb.DecodeKeyValue(c.field, c.dec, k, v) + }() + + if k != -1 && v == nil { + // There is a point in the series at the next timestamp, + // but not for this cursor's field. Go to the next point. + continue + } + return k, v + } +} + +// Sort b1 cursors in correct order for writing to TSM files. + +type cursors []*cursor + +func (a cursors) Len() int { return len(a) } +func (a cursors) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a cursors) Less(i, j int) bool { + return tsm1.SeriesFieldKey(a[i].series, a[i].field) < tsm1.SeriesFieldKey(a[j].series, a[j].field) +} + +// u64tob converts a uint64 into an 8-byte slice. +func u64tob(v uint64) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, v) + return b +} + +// btou64 converts an 8-byte slice to a uint64. +func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) } diff --git a/cmd/influx_tsm/bz1/reader.go b/cmd/influx_tsm/bz1/reader.go new file mode 100644 index 00000000000..e088cb73100 --- /dev/null +++ b/cmd/influx_tsm/bz1/reader.go @@ -0,0 +1,342 @@ +package bz1 + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "fmt" + "sort" + "time" + + "github.com/boltdb/bolt" + "github.com/golang/snappy" + "github.com/influxdb/influxdb/cmd/influx_tsm/tsdb" + tsm "github.com/influxdb/influxdb/tsdb/engine/tsm1" +) + +const DefaultChunkSize = 1000 + +// Reader is used to read all data from a bz1 shard. +type Reader struct { + path string + db *bolt.DB + tx *bolt.Tx + + cursors []*cursor + currCursor int + + keyBuf string + valuesBuf []tsm.Value + + series map[string]*tsdb.Series + fields map[string]*tsdb.MeasurementFields + codecs map[string]*tsdb.FieldCodec + + ChunkSize int +} + +// NewReader returns a reader for the bz1 shard at path. +func NewReader(path string) *Reader { + return &Reader{ + path: path, + series: make(map[string]*tsdb.Series), + fields: make(map[string]*tsdb.MeasurementFields), + codecs: make(map[string]*tsdb.FieldCodec), + ChunkSize: DefaultChunkSize, + } +} + +// Open opens the reader. +func (r *Reader) Open() error { + // Open underlying storage. + db, err := bolt.Open(r.path, 0666, &bolt.Options{Timeout: 1 * time.Second}) + if err != nil { + return err + } + r.db = db + + if err := r.db.View(func(tx *bolt.Tx) error { + var data []byte + + meta := tx.Bucket([]byte("meta")) + if meta == nil { + // No data in this shard. + return nil + } + buf := meta.Get([]byte("series")) + if buf == nil { + // No data in this shard. + return nil + } + data, err = snappy.Decode(nil, buf) + if err != nil { + return err + } + if err := json.Unmarshal(data, &r.series); err != nil { + return err + } + + buf = meta.Get([]byte("fields")) + if buf == nil { + // No data in this shard. + return nil + } + + data, err = snappy.Decode(nil, buf) + if err != nil { + return err + } + if err := json.Unmarshal(data, &r.fields); err != nil { + return err + } + return nil + }); err != nil { + return err + } + + // Build the codec for each measurement. + for k, v := range r.fields { + r.codecs[k] = tsdb.NewFieldCodec(v.Fields) + } + + // Create cursor for each field of each series. + r.tx, err = r.db.Begin(false) + if err != nil { + return err + } + for s, _ := range r.series { + if err != nil { + return err + } + + measurement := tsdb.MeasurementFromSeriesKey(s) + for _, f := range r.fields[tsdb.MeasurementFromSeriesKey(s)].Fields { + c := newCursor(r.tx, s, f.Name, r.codecs[measurement]) + c.SeekTo(0) + r.cursors = append(r.cursors, c) + } + } + sort.Sort(cursors(r.cursors)) + + return nil +} + +// Next returns whether there is any more data to be read. +func (r *Reader) Next() bool { + for { + if r.currCursor == len(r.cursors) { + // All cursors drained. No more data remains. + return false + } + + cc := r.cursors[r.currCursor] + k, v := cc.Next() + if k == -1 { + // Go to next cursor and try again. + r.currCursor++ + if len(r.valuesBuf) == 0 { + // The previous cursor had no data. Instead of returning + // just go immediately to the next cursor. + continue + } + // There is some data available. Indicate that it should be read. + return true + } + + r.keyBuf = tsm.SeriesFieldKey(cc.series, cc.field) + r.valuesBuf = append(r.valuesBuf, tsdb.ConvertToValue(k, v)) + if len(r.valuesBuf) == r.ChunkSize { + return true + } + } +} + +// Read returns the next chunk of data in the shard, converted to tsm1 values. Data is +// emitted completely for every field, in every series, before the next field is processed. +// Data from Read() adheres to the requirements for writing to tsm1 shards +func (r *Reader) Read() (string, []tsm.Value, error) { + defer func() { + r.valuesBuf = nil + }() + + return r.keyBuf, r.valuesBuf, nil +} + +// Close closes the reader. +func (r *Reader) Close() error { + return r.tx.Rollback() +} + +// cursor provides ordered iteration across a series. +type cursor struct { + cursor *bolt.Cursor + buf []byte // uncompressed buffer + off int // buffer offset + fieldIndices []int + index int + + series string + field string + dec *tsdb.FieldCodec + + keyBuf int64 + valBuf interface{} +} + +// newCursor returns an instance of a bz1 cursor. +func newCursor(tx *bolt.Tx, series string, field string, dec *tsdb.FieldCodec) *cursor { + + // Retrieve points bucket. Ignore if there is no bucket. + b := tx.Bucket([]byte("points")).Bucket([]byte(series)) + if b == nil { + return nil + } + + return &cursor{ + cursor: b.Cursor(), + series: series, + field: field, + dec: dec, + keyBuf: -2, + } +} + +// Seek moves the cursor to a position. +func (c *cursor) SeekTo(seek int64) { + seekBytes := u64tob(uint64(seek)) + + // Move cursor to appropriate block and set to buffer. + k, v := c.cursor.Seek(seekBytes) + if v == nil { // get the last block, it might have this time + _, v = c.cursor.Last() + } else if seek < int64(btou64(k)) { // the seek key is less than this block, go back one and check + _, v = c.cursor.Prev() + + // if the previous block max time is less than the seek value, reset to where we were originally + if v == nil || seek > int64(btou64(v[0:8])) { + _, v = c.cursor.Seek(seekBytes) + } + } + c.setBuf(v) + + // Read current block up to seek position. + c.seekBuf(seekBytes) + + // Return current entry. + c.keyBuf, c.valBuf = c.read() +} + +// seekBuf moves the cursor to a position within the current buffer. +func (c *cursor) seekBuf(seek []byte) (key, value []byte) { + for { + // Slice off the current entry. + buf := c.buf[c.off:] + + // Exit if current entry's timestamp is on or after the seek. + if len(buf) == 0 { + return + } + + if bytes.Compare(buf[0:8], seek) != -1 { + return + } + + c.off += entryHeaderSize + entryDataSize(buf) + } +} + +// Next returns the next key/value pair from the cursor. If there are no values +// remaining, -1 is returned. +func (c *cursor) Next() (int64, interface{}) { + for { + k, v := func() (int64, interface{}) { + if c.keyBuf != -2 { + k, v := c.keyBuf, c.valBuf + c.keyBuf = -2 + return k, v + } + + // Ignore if there is no buffer. + if len(c.buf) == 0 { + return -1, nil + } + + // Move forward to next entry. + c.off += entryHeaderSize + entryDataSize(c.buf[c.off:]) + + // If no items left then read first item from next block. + if c.off >= len(c.buf) { + _, v := c.cursor.Next() + c.setBuf(v) + } + + return c.read() + }() + + if k != -1 && v == nil { + // There is a point in the series at the next timestamp, + // but not for this cursor's field. Go to the next point. + continue + } + return k, v + } +} + +// setBuf saves a compressed block to the buffer. +func (c *cursor) setBuf(block []byte) { + // Clear if the block is empty. + if len(block) == 0 { + c.buf, c.off, c.fieldIndices, c.index = c.buf[0:0], 0, c.fieldIndices[0:0], 0 + return + } + + // Otherwise decode block into buffer. + // Skip over the first 8 bytes since they are the max timestamp. + buf, err := snappy.Decode(nil, block[8:]) + if err != nil { + c.buf = c.buf[0:0] + fmt.Printf("block decode error: %s\n", err) + } + + c.buf, c.off = buf, 0 +} + +// read reads the current key and value from the current block. +func (c *cursor) read() (key int64, value interface{}) { + // Return nil if the offset is at the end of the buffer. + if c.off >= len(c.buf) { + return -1, nil + } + + // Otherwise read the current entry. + buf := c.buf[c.off:] + dataSize := entryDataSize(buf) + + return tsdb.DecodeKeyValue(c.field, c.dec, buf[0:8], buf[entryHeaderSize:entryHeaderSize+dataSize]) +} + +// Sort bz1 cursors in correct order for writing to TSM files. + +type cursors []*cursor + +func (a cursors) Len() int { return len(a) } +func (a cursors) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a cursors) Less(i, j int) bool { + return tsm.SeriesFieldKey(a[i].series, a[i].field) < tsm.SeriesFieldKey(a[j].series, a[j].field) +} + +// btou64 converts an 8-byte slice into an uint64. +func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) } + +// u64tob converts a uint64 into an 8-byte slice. +func u64tob(v uint64) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, v) + return b +} + +// entryHeaderSize is the number of bytes required for the header. +const entryHeaderSize = 8 + 4 + +// entryDataSize returns the size of an entry's data field, in bytes. +func entryDataSize(v []byte) int { return int(binary.BigEndian.Uint32(v[8:12])) } diff --git a/cmd/influx_tsm/converter.go b/cmd/influx_tsm/converter.go new file mode 100644 index 00000000000..995d5783849 --- /dev/null +++ b/cmd/influx_tsm/converter.go @@ -0,0 +1,89 @@ +package main + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/influxdb/influxdb/tsdb/engine/tsm1" +) + +type KeyIterator interface { + Next() bool + Read() (string, []tsm1.Value, error) +} + +// Converter encapsulates the logic for converting b*1 shards to tsm1 shards. +type Converter struct { + path string + maxTSMFileSize uint32 + generation int +} + +// NewConverter returns a new instance of the Converter. +func NewConverter(path string, sz uint32) *Converter { + return &Converter{ + path: path, + maxTSMFileSize: sz, + } +} + +// Process writes the data provided by iter to a tsm1 shard. +func (c *Converter) Process(iter KeyIterator) error { + // Ensure the tsm1 directory exists. + if err := os.MkdirAll(c.path, 0777); err != nil { + return err + } + + w, err := c.nextTSMWriter() + if err != nil { + return err + } + defer w.Close() + + // Iterate until no more data remains. + for iter.Next() { + k, v, err := iter.Read() + if err != nil { + return err + } + w.Write(k, v) + + // If we have a max file size configured and we're over it, start a new TSM file. + if w.Size() > c.maxTSMFileSize { + if err := w.WriteIndex(); err != nil { + return err + } + if err := w.Close(); err != nil { + return err + } + + w, err = c.nextTSMWriter() + if err != nil { + return err + } + } + } + + // All done! + return w.WriteIndex() +} + +// nextTSMWriter returns the next TSMWriter for the Converter. +func (c *Converter) nextTSMWriter() (tsm1.TSMWriter, error) { + c.generation++ + fileName := filepath.Join(c.path, fmt.Sprintf("%09d-%09d.%s", c.generation, 0, tsm1.TSMFileExtension)) + + fd, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + return nil, err + } + + // Create the writer for the new TSM file. + w, err := tsm1.NewTSMWriter(fd) + if err != nil { + return nil, err + } + + return w, nil +} diff --git a/cmd/influx_tsm/main.go b/cmd/influx_tsm/main.go new file mode 100644 index 00000000000..c2450c8c5b8 --- /dev/null +++ b/cmd/influx_tsm/main.go @@ -0,0 +1,337 @@ +package main + +import ( + "bufio" + "flag" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "sort" + "strings" + "sync" + "text/tabwriter" + "time" + + "github.com/influxdb/influxdb/cmd/influx_tsm/b1" + "github.com/influxdb/influxdb/cmd/influx_tsm/bz1" + "github.com/influxdb/influxdb/cmd/influx_tsm/tsdb" +) + +type ShardReader interface { + KeyIterator + Open() error + Close() error +} + +const ( + backupExt = "bak" + tsmExt = "tsm" +) + +var description = fmt.Sprintf(` +Convert a database from b1 or bz1 format to tsm1 format. + +This tool will make backup any directory before conversion. It +is up to the end-user to delete the backup on the disk. Backups are +named by suffixing the database name with '.%s'. The backups will +be ignored by the system since they are not registered with the cluster. + +To restore a backup, delete the tsm version, rename the backup and +restart the node.`, backupExt) + +var dataPath string +var ds string +var tsmSz uint64 +var parallel bool +var disBack bool + +const maxTSMSz = 1 * 1024 * 1024 * 1024 + +func init() { + flag.StringVar(&ds, "dbs", "", "Comma-delimited list of databases to convert. Default is to convert all") + flag.Uint64Var(&tsmSz, "sz", maxTSMSz, "Maximum size of individual TSM files.") + flag.BoolVar(¶llel, "parallel", false, "Perform parallel conversion.") + flag.BoolVar(&disBack, "nobackup", false, "Disable database backups. Not recommended.") + flag.Usage = func() { + fmt.Fprintf(os.Stderr, "Usage: %s [options] \n", os.Args[0]) + fmt.Fprintf(os.Stderr, "%s\n\n", description) + flag.PrintDefaults() + fmt.Fprintf(os.Stderr, "\n") + } +} + +func main() { + pg := NewParallelGroup(1) + + flag.Parse() + if len(flag.Args()) < 1 { + fmt.Fprintf(os.Stderr, "No data directory specified\n") + os.Exit(1) + } + dataPath = flag.Args()[0] + + if tsmSz > maxTSMSz { + fmt.Fprintf(os.Stderr, "Maximum TSM file size is %d\n", maxTSMSz) + os.Exit(1) + } + + // Check if specific directories were requested. + reqDs := strings.Split(ds, ",") + if len(reqDs) == 1 && reqDs[0] == "" { + reqDs = nil + } + + // Determine the list of databases + dbs, err := ioutil.ReadDir(dataPath) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to access data directory at %s: %s\n", dataPath, err.Error()) + os.Exit(1) + } + fmt.Println() // Cleanly separate output from start of program. + + // Dump summary of what is about to happen. + fmt.Println("b1 and bz1 shard conversion.") + fmt.Println("-----------------------------------") + fmt.Println("Data directory is: ", dataPath) + fmt.Println("Databases specified: ", allDBs(reqDs)) + fmt.Println("Database backups enabled:", yesno(!disBack)) + fmt.Println("Parallel mode enabled: ", yesno(parallel)) + fmt.Println() + + // Get the list of shards for conversion. + var shards []*tsdb.ShardInfo + for _, db := range dbs { + if strings.HasSuffix(db.Name(), backupExt) { + fmt.Printf("Skipping %s as it looks like a backup.\n", db.Name()) + continue + } + + d := tsdb.NewDatabase(filepath.Join(dataPath, db.Name())) + shs, err := d.Shards() + if err != nil { + fmt.Fprintf(os.Stderr, "failed to access shards for database %s: %s\n", d.Name(), err.Error()) + os.Exit(1) + } + shards = append(shards, shs...) + } + sort.Sort(tsdb.ShardInfos(shards)) + usl := len(shards) + shards = tsdb.ShardInfos(shards).FilterFormat(tsdb.TSM1).ExclusiveDatabases(reqDs) + sl := len(shards) + + // Anything to convert? + fmt.Printf("\n%d shard(s) detected, %d non-TSM shards detected.\n", usl, sl) + if len(shards) == 0 { + fmt.Printf("Nothing to do.\n") + os.Exit(0) + } + + // Display list of convertible shards. + fmt.Println() + w := new(tabwriter.Writer) + w.Init(os.Stdout, 0, 8, 1, '\t', 0) + fmt.Fprintln(w, "Database\tRetention\tPath\tEngine\tSize") + for _, si := range shards { + fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%d\n", si.Database, si.RetentionPolicy, si.FullPath(dataPath), si.FormatAsString(), si.Size) + } + w.Flush() + + // Get confirmation from user. + fmt.Printf("\nThese shards will be converted. Proceed? y/N: ") + liner := bufio.NewReader(os.Stdin) + yn, err := liner.ReadString('\n') + if err != nil { + fmt.Fprintf(os.Stderr, "failed to read response: %s", err.Error()) + os.Exit(1) + } + yn = strings.TrimRight(strings.ToLower(yn), "\n") + if yn != "y" { + fmt.Println("Conversion aborted.") + os.Exit(1) + } + fmt.Println("Conversion starting....") + + // Backup each directory. + if !disBack { + databases := tsdb.ShardInfos(shards).Databases() + if parallel { + pg = NewParallelGroup(len(databases)) + } + for _, db := range databases { + pg.Request() + go func(db string) { + defer pg.Release() + + start := time.Now() + err := backupDatabase(filepath.Join(dataPath, db)) + if err != nil { + fmt.Fprintf(os.Stderr, "Backup of database %s failed: %s\n", db, err.Error()) + os.Exit(1) + } + fmt.Printf("Database %s backed up (%v)\n", db, time.Now().Sub(start)) + }(db) + } + pg.Wait() + } else { + fmt.Println("Database backup disabled.") + } + + // Convert each shard. + if parallel { + pg = NewParallelGroup(len(shards)) + } + for _, si := range shards { + pg.Request() + go func(si *tsdb.ShardInfo) { + defer pg.Release() + + start := time.Now() + if err := convertShard(si); err != nil { + fmt.Fprintf(os.Stderr, "Failed to convert %s: %s\n", si.FullPath(dataPath), err.Error()) + os.Exit(1) + } + fmt.Printf("Conversion of %s successful (%s)\n", si.FullPath(dataPath), time.Now().Sub(start)) + }(si) + } + pg.Wait() +} + +// backupDatabase backs up the database at src. +func backupDatabase(src string) error { + dest := filepath.Join(src + "." + backupExt) + if _, err := os.Stat(dest); !os.IsNotExist(err) { + return fmt.Errorf("backup of %s already exists", src) + } + return copyDir(dest, src) +} + +// copyDir copies the directory at src to dest. If dest does not exist it +// will be created. It is up to the caller to ensure the paths don't overlap. +func copyDir(dest, src string) error { + copyFile := func(path string, info os.FileInfo, err error) error { + // Strip the src from the path and replace with dest. + toPath := strings.Replace(path, src, dest, 1) + + // Copy it. + if info.IsDir() { + if err := os.MkdirAll(toPath, info.Mode()); err != nil { + return err + } + } else { + err := func() error { + in, err := os.Open(path) + if err != nil { + return err + } + defer in.Close() + + out, err := os.OpenFile(toPath, os.O_CREATE|os.O_WRONLY, info.Mode()) + if err != nil { + return err + } + defer out.Close() + + _, err = io.Copy(out, in) + return err + }() + if err != nil { + return err + } + } + return nil + } + + return filepath.Walk(src, copyFile) +} + +// convertShard converts the shard in-place. +func convertShard(si *tsdb.ShardInfo) error { + src := si.FullPath(dataPath) + dst := fmt.Sprintf("%s.%s", src, tsmExt) + + var reader ShardReader + switch si.Format { + case tsdb.BZ1: + reader = bz1.NewReader(src) + case tsdb.B1: + reader = b1.NewReader(src) + default: + return fmt.Errorf("Unsupported shard format: %s", si.FormatAsString()) + } + defer reader.Close() + + // Open the shard, and create a converter. + if err := reader.Open(); err != nil { + return fmt.Errorf("Failed to open %s for conversion: %s", src, err.Error()) + } + converter := NewConverter(dst, uint32(tsmSz)) + + // Perform the conversion. + if err := converter.Process(reader); err != nil { + return fmt.Errorf("Conversion of %s failed: %s", src, err.Error()) + } + + // Delete source shard, and rename new tsm1 shard. + if err := reader.Close(); err != nil { + return fmt.Errorf("Conversion of %s failed due to close: %s", src, err.Error()) + } + + if err := os.RemoveAll(si.FullPath(dataPath)); err != nil { + return fmt.Errorf("Deletion of %s failed: %s", src, err.Error()) + } + if err := os.Rename(dst, src); err != nil { + return fmt.Errorf("Rename of %s to %s failed: %s", dst, src, err.Error()) + } + + return nil +} + +// ParallelGroup allows the maximum parrallelism of a set of operations to be controlled. +type ParallelGroup struct { + c chan struct{} + wg sync.WaitGroup +} + +// NewParallelGroup returns a group which allows n operations to run in parallel. A value of 0 +// means no operations will ever run. +func NewParallelGroup(n int) *ParallelGroup { + return &ParallelGroup{ + c: make(chan struct{}, n), + } +} + +// Request requests permission to start an operation. It will block unless and until +// the parallel requirements would not be violated. +func (p *ParallelGroup) Request() { + p.wg.Add(1) + p.c <- struct{}{} +} + +// Release informs the group that a previoulsy requested operation has completed. +func (p *ParallelGroup) Release() { + <-p.c + p.wg.Done() +} + +// Wait blocks until the ParallelGroup has no unreleased operations. +func (p *ParallelGroup) Wait() { + p.wg.Wait() +} + +// yesno returns "yes" for true, "no" for false. +func yesno(b bool) string { + if b { + return "yes" + } + return "no" +} + +// allDBs returns "all" if all databases are requested for conversion. +func allDBs(dbs []string) string { + if dbs == nil { + return "all" + } + return fmt.Sprintf("%v", dbs) +} diff --git a/cmd/influx_tsm/tsdb/codec.go b/cmd/influx_tsm/tsdb/codec.go new file mode 100644 index 00000000000..86311f998a2 --- /dev/null +++ b/cmd/influx_tsm/tsdb/codec.go @@ -0,0 +1,293 @@ +package tsdb + +import ( + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "math" + + "github.com/influxdb/influxdb/influxql" +) + +const maxStringLength = 64 * 1024 + +var ( + // ErrFieldNotFound is returned when a field cannot be found. + ErrFieldNotFound = errors.New("field not found") + + // ErrFieldUnmappedID is returned when the system is presented, during decode, with a field ID + // there is no mapping for. + ErrFieldUnmappedID = errors.New("field ID not mapped") +) + +// FieldCodec provides encoding and decoding functionality for the fields of a given +// Measurement. +type FieldCodec struct { + fieldsByID map[uint8]*Field + fieldsByName map[string]*Field +} + +// NewFieldCodec returns a FieldCodec for the given Measurement. Must be called with +// a RLock that protects the Measurement. +func NewFieldCodec(fields map[string]*Field) *FieldCodec { + fieldsByID := make(map[uint8]*Field, len(fields)) + fieldsByName := make(map[string]*Field, len(fields)) + for _, f := range fields { + fieldsByID[f.ID] = f + fieldsByName[f.Name] = f + } + return &FieldCodec{fieldsByID: fieldsByID, fieldsByName: fieldsByName} +} + +// EncodeFields converts a map of values with string keys to a byte slice of field +// IDs and values. +// +// If a field exists in the codec, but its type is different, an error is returned. If +// a field is not present in the codec, the system panics. +func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error) { + // Allocate byte slice + b := make([]byte, 0, 10) + + for k, v := range values { + field := f.fieldsByName[k] + if field == nil { + panic(fmt.Sprintf("field does not exist for %s", k)) + } else if influxql.InspectDataType(v) != field.Type { + return nil, fmt.Errorf("field \"%s\" is type %T, mapped as type %s", k, v, field.Type) + } + + var buf []byte + + switch field.Type { + case influxql.Float: + value := v.(float64) + buf = make([]byte, 9) + binary.BigEndian.PutUint64(buf[1:9], math.Float64bits(value)) + case influxql.Integer: + var value uint64 + switch v.(type) { + case int: + value = uint64(v.(int)) + case int32: + value = uint64(v.(int32)) + case int64: + value = uint64(v.(int64)) + default: + panic(fmt.Sprintf("invalid integer type: %T", v)) + } + buf = make([]byte, 9) + binary.BigEndian.PutUint64(buf[1:9], value) + case influxql.Boolean: + value := v.(bool) + + // Only 1 byte need for a boolean. + buf = make([]byte, 2) + if value { + buf[1] = byte(1) + } + case influxql.String: + value := v.(string) + if len(value) > maxStringLength { + value = value[:maxStringLength] + } + // Make a buffer for field ID (1 bytes), the string length (2 bytes), and the string. + buf = make([]byte, len(value)+3) + + // Set the string length, then copy the string itself. + binary.BigEndian.PutUint16(buf[1:3], uint16(len(value))) + for i, c := range []byte(value) { + buf[i+3] = byte(c) + } + default: + panic(fmt.Sprintf("unsupported value type during encode fields: %T", v)) + } + + // Always set the field ID as the leading byte. + buf[0] = field.ID + + // Append temp buffer to the end. + b = append(b, buf...) + } + + return b, nil +} + +// FieldIDByName returns the ID for the given field. +func (f *FieldCodec) FieldIDByName(s string) (uint8, error) { + fi := f.fieldsByName[s] + if fi == nil { + return 0, ErrFieldNotFound + } + return fi.ID, nil +} + +// DecodeFields decodes a byte slice into a set of field ids and values. +func (f *FieldCodec) DecodeFields(b []byte) (map[uint8]interface{}, error) { + if len(b) == 0 { + return nil, nil + } + + // Create a map to hold the decoded data. + values := make(map[uint8]interface{}, 0) + + for { + if len(b) < 1 { + // No more bytes. + break + } + + // First byte is the field identifier. + fieldID := b[0] + field := f.fieldsByID[fieldID] + if field == nil { + // See note in DecodeByID() regarding field-mapping failures. + return nil, ErrFieldUnmappedID + } + + var value interface{} + switch field.Type { + case influxql.Float: + value = math.Float64frombits(binary.BigEndian.Uint64(b[1:9])) + // Move bytes forward. + b = b[9:] + case influxql.Integer: + value = int64(binary.BigEndian.Uint64(b[1:9])) + // Move bytes forward. + b = b[9:] + case influxql.Boolean: + if b[1] == 1 { + value = true + } else { + value = false + } + // Move bytes forward. + b = b[2:] + case influxql.String: + size := binary.BigEndian.Uint16(b[1:3]) + value = string(b[3 : size+3]) + // Move bytes forward. + b = b[size+3:] + default: + panic(fmt.Sprintf("unsupported value type during decode fields: %T", f.fieldsByID[fieldID])) + } + + values[fieldID] = value + } + + return values, nil +} + +// DecodeFieldsWithNames decodes a byte slice into a set of field names and values +func (f *FieldCodec) DecodeFieldsWithNames(b []byte) (map[string]interface{}, error) { + fields, err := f.DecodeFields(b) + if err != nil { + return nil, err + } + m := make(map[string]interface{}) + for id, v := range fields { + field := f.fieldsByID[id] + if field != nil { + m[field.Name] = v + } + } + return m, nil +} + +// DecodeByID scans a byte slice for a field with the given ID, converts it to its +// expected type, and return that value. +func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error) { + if len(b) == 0 { + return 0, ErrFieldNotFound + } + + for { + if len(b) < 1 { + // No more bytes. + break + } + field, ok := f.fieldsByID[b[0]] + if !ok { + // This can happen, though is very unlikely. If this node receives encoded data, to be written + // to disk, and is queried for that data before its metastore is updated, there will be no field + // mapping for the data during decode. All this can happen because data is encoded by the node + // that first received the write request, not the node that actually writes the data to disk. + // So if this happens, the read must be aborted. + return 0, ErrFieldUnmappedID + } + + var value interface{} + switch field.Type { + case influxql.Float: + // Move bytes forward. + value = math.Float64frombits(binary.BigEndian.Uint64(b[1:9])) + b = b[9:] + case influxql.Integer: + value = int64(binary.BigEndian.Uint64(b[1:9])) + b = b[9:] + case influxql.Boolean: + if b[1] == 1 { + value = true + } else { + value = false + } + // Move bytes forward. + b = b[2:] + case influxql.String: + size := binary.BigEndian.Uint16(b[1:3]) + value = string(b[3 : 3+size]) + // Move bytes forward. + b = b[size+3:] + default: + panic(fmt.Sprintf("unsupported value type during decode by id: %T", field.Type)) + } + + if field.ID == targetID { + return value, nil + } + } + + return 0, ErrFieldNotFound +} + +// DecodeByName scans a byte slice for a field with the given name, converts it to its +// expected type, and return that value. +func (f *FieldCodec) DecodeByName(name string, b []byte) (interface{}, error) { + fi := f.FieldByName(name) + if fi == nil { + return 0, ErrFieldNotFound + } + return f.DecodeByID(fi.ID, b) +} + +func (f *FieldCodec) Fields() (a []*Field) { + for _, f := range f.fieldsByID { + a = append(a, f) + } + return +} + +// FieldByName returns the field by its name. It will return a nil if not found +func (f *FieldCodec) FieldByName(name string) *Field { + return f.fieldsByName[name] +} + +// mustMarshal encodes a value to JSON. +// This will panic if an error occurs. This should only be used internally when +// an invalid marshal will cause corruption and a panic is appropriate. +func mustMarshalJSON(v interface{}) []byte { + b, err := json.Marshal(v) + if err != nil { + panic("marshal: " + err.Error()) + } + return b +} + +// mustUnmarshalJSON decodes a value from JSON. +// This will panic if an error occurs. This should only be used internally when +// an invalid unmarshal will cause corruption and a panic is appropriate. +func mustUnmarshalJSON(b []byte, v interface{}) { + if err := json.Unmarshal(b, v); err != nil { + panic("unmarshal: " + err.Error()) + } +} diff --git a/cmd/influx_tsm/tsdb/database.go b/cmd/influx_tsm/tsdb/database.go new file mode 100644 index 00000000000..44b94604644 --- /dev/null +++ b/cmd/influx_tsm/tsdb/database.go @@ -0,0 +1,231 @@ +package tsdb + +import ( + "fmt" + "os" + "path" + "path/filepath" + "sort" + "time" + + "github.com/boltdb/bolt" + "github.com/influxdb/influxdb/pkg/slices" +) + +const ( + B1 = iota + BZ1 + TSM1 +) + +type EngineFormat int + +// String returns the string format of the engine. +func (e EngineFormat) String() string { + switch e { + case TSM1: + return "tsm1" + case B1: + return "b1" + case BZ1: + return "bz1" + default: + panic("unrecognized shard engine format") + } +} + +// ShardInfo is the description of a shard on disk. +type ShardInfo struct { + Database string + RetentionPolicy string + Path string + Format EngineFormat + Size int64 +} + +// FormatAsString returns the format of the shard as a string. +func (s *ShardInfo) FormatAsString() string { + return s.Format.String() +} + +// FullPath returns the full path to the shard, given the data directory root. +func (s *ShardInfo) FullPath(dataPath string) string { + return filepath.Join(dataPath, s.Database, s.RetentionPolicy, s.Path) +} + +type ShardInfos []*ShardInfo + +func (s ShardInfos) Len() int { return len(s) } +func (s ShardInfos) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s ShardInfos) Less(i, j int) bool { + if s[i].Database == s[j].Database { + if s[i].RetentionPolicy == s[j].RetentionPolicy { + return s[i].Path < s[i].Path + } else { + return s[i].RetentionPolicy < s[j].RetentionPolicy + } + } + return s[i].Database < s[j].Database +} + +// Databases returns the sorted unique set of databases for the shards. +func (s ShardInfos) Databases() []string { + dbm := make(map[string]bool) + for _, ss := range s { + dbm[ss.Database] = true + } + + var dbs []string + for k, _ := range dbm { + dbs = append(dbs, k) + } + sort.Strings(dbs) + return dbs +} + +// FilterFormat returns a copy of the ShardInfos, with shards of the given +// format removed. +func (s ShardInfos) FilterFormat(fmt EngineFormat) ShardInfos { + var a ShardInfos + for _, si := range s { + if si.Format != fmt { + a = append(a, si) + } + } + return a +} + +// ExclusiveDatabases returns a copy of the ShardInfo, with shards associated +// with the given databases present. If the given set is empty, all databases +// are returned. +func (s ShardInfos) ExclusiveDatabases(exc []string) ShardInfos { + var a ShardInfos + + // Empty set? Return everything. + if len(exc) == 0 { + a = make(ShardInfos, len(s)) + copy(a, s) + return a + } + + for _, si := range s { + if slices.Exists(exc, si.Database) { + a = append(a, si) + } + } + return a +} + +// Database represents an entire database on disk. +type Database struct { + path string +} + +// NewDatabase creates a database instance using data at path. +func NewDatabase(path string) *Database { + return &Database{path: path} +} + +// Name returns the name of the database. +func (d *Database) Name() string { + return path.Base(d.path) +} + +// Path returns the path to the database. +func (d *Database) Path() string { + return d.path +} + +// Shards returns information for every shard in the database. +func (d *Database) Shards() ([]*ShardInfo, error) { + fd, err := os.Open(d.path) + if err != nil { + return nil, err + } + + // Get each retention policy. + rps, err := fd.Readdirnames(-1) + if err != nil { + return nil, err + } + + // Process each retention policy. + var shardInfos []*ShardInfo + for _, rp := range rps { + rpfd, err := os.Open(filepath.Join(d.path, rp)) + if err != nil { + return nil, err + } + + // Process each shard + shards, err := rpfd.Readdirnames(-1) + for _, sh := range shards { + fmt, sz, err := shardFormat(filepath.Join(d.path, rp, sh)) + if err != nil { + return nil, err + } + + si := &ShardInfo{ + Database: d.Name(), + RetentionPolicy: path.Base(rp), + Path: sh, + Format: fmt, + Size: sz, + } + shardInfos = append(shardInfos, si) + } + } + + sort.Sort(ShardInfos(shardInfos)) + return shardInfos, nil +} + +// shardFormat returns the format and size on disk of the shard at path. +func shardFormat(path string) (EngineFormat, int64, error) { + // If it's a directory then it's a tsm1 engine + f, err := os.Open(path) + if err != nil { + return 0, 0, err + } + fi, err := f.Stat() + f.Close() + if err != nil { + return 0, 0, err + } + if fi.Mode().IsDir() { + return TSM1, fi.Size(), nil + } + + // It must be a BoltDB-based engine. + db, err := bolt.Open(path, 0666, &bolt.Options{Timeout: 1 * time.Second}) + if err != nil { + return 0, 0, err + } + defer db.Close() + + var format EngineFormat + err = db.View(func(tx *bolt.Tx) error { + // Retrieve the meta bucket. + b := tx.Bucket([]byte("meta")) + + // If no format is specified then it must be an original b1 database. + if b == nil { + format = B1 + return nil + } + + // There is an actual format indicator. + switch string(b.Get([]byte("format"))) { + case "b1", "v1": + format = B1 + case "bz1": + format = BZ1 + default: + return fmt.Errorf("unrecognized engine format: %s", string(b.Get([]byte("format")))) + } + + return nil + }) + + return format, fi.Size(), err +} diff --git a/cmd/influx_tsm/tsdb/internal/meta.pb.go b/cmd/influx_tsm/tsdb/internal/meta.pb.go new file mode 100644 index 00000000000..c580f4dba61 --- /dev/null +++ b/cmd/influx_tsm/tsdb/internal/meta.pb.go @@ -0,0 +1,122 @@ +// Code generated by protoc-gen-gogo. +// source: internal/meta.proto +// DO NOT EDIT! + +/* +Package internal is a generated protocol buffer package. + +It is generated from these files: + internal/meta.proto + +It has these top-level messages: + Series + Tag + MeasurementFields + Field +*/ +package internal + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +type Series struct { + Key *string `protobuf:"bytes,1,req,name=Key" json:"Key,omitempty"` + Tags []*Tag `protobuf:"bytes,2,rep,name=Tags" json:"Tags,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Series) Reset() { *m = Series{} } +func (m *Series) String() string { return proto.CompactTextString(m) } +func (*Series) ProtoMessage() {} + +func (m *Series) GetKey() string { + if m != nil && m.Key != nil { + return *m.Key + } + return "" +} + +func (m *Series) GetTags() []*Tag { + if m != nil { + return m.Tags + } + return nil +} + +type Tag struct { + Key *string `protobuf:"bytes,1,req,name=Key" json:"Key,omitempty"` + Value *string `protobuf:"bytes,2,req,name=Value" json:"Value,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Tag) Reset() { *m = Tag{} } +func (m *Tag) String() string { return proto.CompactTextString(m) } +func (*Tag) ProtoMessage() {} + +func (m *Tag) GetKey() string { + if m != nil && m.Key != nil { + return *m.Key + } + return "" +} + +func (m *Tag) GetValue() string { + if m != nil && m.Value != nil { + return *m.Value + } + return "" +} + +type MeasurementFields struct { + Fields []*Field `protobuf:"bytes,1,rep,name=Fields" json:"Fields,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *MeasurementFields) Reset() { *m = MeasurementFields{} } +func (m *MeasurementFields) String() string { return proto.CompactTextString(m) } +func (*MeasurementFields) ProtoMessage() {} + +func (m *MeasurementFields) GetFields() []*Field { + if m != nil { + return m.Fields + } + return nil +} + +type Field struct { + ID *int32 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"` + Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"` + Type *int32 `protobuf:"varint,3,req,name=Type" json:"Type,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Field) Reset() { *m = Field{} } +func (m *Field) String() string { return proto.CompactTextString(m) } +func (*Field) ProtoMessage() {} + +func (m *Field) GetID() int32 { + if m != nil && m.ID != nil { + return *m.ID + } + return 0 +} + +func (m *Field) GetName() string { + if m != nil && m.Name != nil { + return *m.Name + } + return "" +} + +func (m *Field) GetType() int32 { + if m != nil && m.Type != nil { + return *m.Type + } + return 0 +} diff --git a/cmd/influx_tsm/tsdb/types.go b/cmd/influx_tsm/tsdb/types.go new file mode 100644 index 00000000000..56f33ade057 --- /dev/null +++ b/cmd/influx_tsm/tsdb/types.go @@ -0,0 +1,111 @@ +package tsdb + +import ( + "encoding/binary" + "strings" + + "github.com/influxdb/influxdb/cmd/influx_tsm/tsdb/internal" + "github.com/influxdb/influxdb/influxql" + + "github.com/gogo/protobuf/proto" +) + +// Cursor represents an iterator over a series. +type Cursor interface { + SeekTo(seek int64) (key int64, value interface{}) + Next() (key int64, value interface{}) +} + +// Field represents an encoded field. +type Field struct { + ID uint8 `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Type influxql.DataType `json:"type,omitempty"` +} + +// MeasurementFields is a mapping from measurements to its fields. +type MeasurementFields struct { + Fields map[string]*Field `json:"fields"` + Codec *FieldCodec +} + +// MarshalBinary encodes the object to a binary format. +func (m *MeasurementFields) MarshalBinary() ([]byte, error) { + var pb internal.MeasurementFields + for _, f := range m.Fields { + id := int32(f.ID) + name := f.Name + t := int32(f.Type) + pb.Fields = append(pb.Fields, &internal.Field{ID: &id, Name: &name, Type: &t}) + } + return proto.Marshal(&pb) +} + +// UnmarshalBinary decodes the object from a binary format. +func (m *MeasurementFields) UnmarshalBinary(buf []byte) error { + var pb internal.MeasurementFields + if err := proto.Unmarshal(buf, &pb); err != nil { + return err + } + m.Fields = make(map[string]*Field) + for _, f := range pb.Fields { + m.Fields[f.GetName()] = &Field{ID: uint8(f.GetID()), Name: f.GetName(), Type: influxql.DataType(f.GetType())} + } + return nil +} + +// Series represents a series in the shard. +type Series struct { + Key string + Tags map[string]string +} + +// MarshalBinary encodes the object to a binary format. +func (s *Series) MarshalBinary() ([]byte, error) { + var pb internal.Series + pb.Key = &s.Key + for k, v := range s.Tags { + key := k + value := v + pb.Tags = append(pb.Tags, &internal.Tag{Key: &key, Value: &value}) + } + return proto.Marshal(&pb) +} + +// UnmarshalBinary decodes the object from a binary format. +func (s *Series) UnmarshalBinary(buf []byte) error { + var pb internal.Series + if err := proto.Unmarshal(buf, &pb); err != nil { + return err + } + s.Key = pb.GetKey() + s.Tags = make(map[string]string) + for _, t := range pb.Tags { + s.Tags[t.GetKey()] = t.GetValue() + } + return nil +} + +// MeasurementFromSeriesKey returns the Measurement name for a given series. +func MeasurementFromSeriesKey(key string) string { + idx := strings.Index(key, ",") + if idx == -1 { + return key + } + return key[:strings.Index(key, ",")] +} + +// DecodeKeyValue decodes the key and value from bytes. +func DecodeKeyValue(field string, dec *FieldCodec, k, v []byte) (int64, interface{}) { + // Convert key to a timestamp. + key := int64(btou64(k[0:8])) + + decValue, err := dec.DecodeByName(field, v) + if err != nil { + return key, nil + } + return key, decValue +} + +// btou64 converts an 8-byte slice into an uint64. +func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) } diff --git a/cmd/influx_tsm/tsdb/values.go b/cmd/influx_tsm/tsdb/values.go new file mode 100644 index 00000000000..ec5a5b2589b --- /dev/null +++ b/cmd/influx_tsm/tsdb/values.go @@ -0,0 +1,139 @@ +package tsdb + +import ( + "fmt" + "time" + + tsm "github.com/influxdb/influxdb/tsdb/engine/tsm1" +) + +type FloatValue struct { + T time.Time + V float64 +} + +func (f *FloatValue) Time() time.Time { + return f.T +} + +func (f *FloatValue) UnixNano() int64 { + return f.T.UnixNano() +} + +func (f *FloatValue) Value() interface{} { + return f.V +} + +func (f *FloatValue) Size() int { + return 16 +} + +func (f *FloatValue) String() string { + return fmt.Sprintf("%v %v", f.Time(), f.Value()) +} + +type BoolValue struct { + T time.Time + V bool +} + +func (b *BoolValue) Time() time.Time { + return b.T +} + +func (b *BoolValue) Size() int { + return 9 +} + +func (b *BoolValue) UnixNano() int64 { + return b.T.UnixNano() +} + +func (b *BoolValue) Value() interface{} { + return b.V +} + +func (f *BoolValue) String() string { + return fmt.Sprintf("%v %v", f.Time(), f.Value()) +} + +type Int64Value struct { + T time.Time + V int64 +} + +func (v *Int64Value) Time() time.Time { + return v.T +} + +func (v *Int64Value) Value() interface{} { + return v.V +} + +func (v *Int64Value) UnixNano() int64 { + return v.T.UnixNano() +} + +func (v *Int64Value) Size() int { + return 16 +} + +func (f *Int64Value) String() string { + return fmt.Sprintf("%v %v", f.Time(), f.Value()) +} + +type StringValue struct { + T time.Time + V string +} + +func (v *StringValue) Time() time.Time { + return v.T +} + +func (v *StringValue) Value() interface{} { + return v.V +} + +func (v *StringValue) UnixNano() int64 { + return v.T.UnixNano() +} + +func (v *StringValue) Size() int { + return 8 + len(v.V) +} + +func (f *StringValue) String() string { + return fmt.Sprintf("%v %v", f.Time(), f.Value()) +} + +func ConvertToValue(k int64, v interface{}) tsm.Value { + var value tsm.Value + + switch v := v.(type) { + case int64: + value = &Int64Value{ + T: time.Unix(0, k), + V: v, + } + case float64: + value = &FloatValue{ + T: time.Unix(0, k), + V: v, + } + case bool: + value = &BoolValue{ + T: time.Unix(0, k), + V: v, + } + case string: + value = &StringValue{ + T: time.Unix(0, k), + V: v, + } + default: + panic(fmt.Sprintf("value type %T unsupported for conversion", v)) + } + + return value +} diff --git a/package.sh b/package.sh index ede3d33796b..531e792df97 100755 --- a/package.sh +++ b/package.sh @@ -73,6 +73,7 @@ BINS=( influxd influx influx_stress + influx_tsm influx_inspect )