b*1 to tsm1 shard converter
otoolep committed Dec 29, 2015
1 parent 86f433b commit eaec514
Showing 13 changed files with 1,958 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -18,6 +18,10 @@ cmd/influxd/version.go

# executables

influx_tsm
**/influx_tsm
!**/influx_tsm/

influx_stress
**/influx_stress
!**/influx_stress/
1 change: 1 addition & 0 deletions build.py
@@ -78,6 +78,7 @@
'influxd' : './cmd/influxd/main.go',
'influx_stress' : './cmd/influx_stress/influx_stress.go',
'influx_inspect' : './cmd/influx_inspect/*.go',
'influx_tsm' : './cmd/influx_tsm/main.go',
}

supported_builds = {
50 changes: 50 additions & 0 deletions cmd/influx_tsm/README.md
@@ -0,0 +1,50 @@
# Converting b1 and bz1 shards to tsm1
`influx_tsm` is a tool for converting b1 and bz1 shards to the tsm1 format. Converting shards to tsm1 format results in a significant reduction in disk usage and significantly improved write throughput when writing data into those shards.

Conversion can be controlled on a database-by-database basis. By default a database is backed up before it is converted, allowing you to roll back any changes. Because of the backup process, ensure the host system has at least as much free disk space as the disk space consumed by the _data_ directory of your InfluxDB system.
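For example, you can compare the space consumed by the _data_ directory with the free space on its filesystem before starting (a sketch using standard Unix tools, assuming the default `~/.influxdb/data` location):

```
$ du -sh ~/.influxdb/data   # space consumed by the data directory
$ df -h ~/.influxdb         # free space remaining on that filesystem
```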

The tool automatically ignores tsm1 shards, and can be run idempotently on any database.

Conversion is an offline process, and the InfluxDB system must be stopped during conversion. However, the conversion process reads and writes shards directly on disk, so it should be fast.

## Steps
Follow these steps to perform a conversion.

* Identify the databases you wish to convert. You can convert one or more databases at a time. By default, all databases are converted.
* Decide whether to use parallel operation. By default, the conversion tool performs each operation serially. This minimizes load on the host system performing the conversion, but also takes the most time. If you wish to minimize the time conversion takes, enable parallel mode. Conversion will then perform as many operations as possible in parallel, but the process may place significant load on the host system (CPU, disk, and RAM usage will all increase).
* Stop all write-traffic to your InfluxDB system.
* Restart the InfluxDB service and wait until all WAL data is flushed to disk -- this has completed when the system responds to queries. This is to ensure all data is present in shards.
* Stop the InfluxDB service. It should not be restarted until conversion is complete.
* Run the conversion tool.
* Restart the node and ensure the data looks correct.
* If everything looks OK, you may then wish to remove or archive the backed-up databases. This is not required for a correctly functioning InfluxDB system, since the backed-up databases are simply ignored. Backed-up databases are suffixed with the extension `.bak`.
* Restart write traffic.

## Example session
Below is an example session, showing a database being converted.

```
$ influx_tsm -parallel ~/.influxdb/data/
b1 and bz1 shard conversion.
-----------------------------------
Data directory is: /home/bob/.influxdb/data/
Databases specified: all
Parallel mode enabled: yes
Database backups enabled: yes
1 shard(s) detected, 1 non-TSM shards detected.
Database Retention Path Engine Size
_internal monitor /home/bob/.influxdb/data/_internal/monitor/1 bz1 262144
These shards will be converted. Proceed? y/N: y
Conversion starting....
Database _internal backed up.
Conversion of /home/bob/.influxdb/data/_internal/monitor/1 successful (27.485037ms)
$ rm -r /home/bob/.influxdb/data/_internal.bak # After confirming converted data looks good.
```
Note that the tool first lists the shards that will be converted, before asking for confirmation. You can abort the conversion process at this step if you just wish to see what would be converted, or if the list of shards does not look correct.

## Rolling back a conversion
If you wish to roll back a conversion, check the databases in your _data_ directory. For every backed-up database, remove the non-backed-up version and then rename the backup so that it no longer has the extension `.bak`. Then restart your InfluxDB system.
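For example, to roll back the `_internal` database converted in the session above (a hypothetical sketch assuming the same paths):

```
$ cd ~/.influxdb/data
$ rm -r _internal             # remove the converted database
$ mv _internal.bak _internal  # restore the backup, dropping the .bak extension
```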
238 changes: 238 additions & 0 deletions cmd/influx_tsm/b1/reader.go
@@ -0,0 +1,238 @@
package b1

import (
	"encoding/binary"
	"sort"
	"time"

	"github.com/boltdb/bolt"
	"github.com/influxdb/influxdb/cmd/influx_tsm/tsdb"
	"github.com/influxdb/influxdb/tsdb/engine/tsm1"
)

// DefaultChunkSize is the default number of values to read per chunk.
const DefaultChunkSize = 1000

// Reader is used to read all data from a b1 shard.
type Reader struct {
	path string
	db   *bolt.DB
	tx   *bolt.Tx

	cursors    []*cursor
	currCursor int

	keyBuf    string
	valuesBuf []tsm1.Value

	series map[string]*tsdb.Series
	fields map[string]*tsdb.MeasurementFields
	codecs map[string]*tsdb.FieldCodec

	ChunkSize int
}

// NewReader returns a reader for the b1 shard at path.
func NewReader(path string) *Reader {
	return &Reader{
		path:   path,
		series: make(map[string]*tsdb.Series),
		fields: make(map[string]*tsdb.MeasurementFields),
		codecs: make(map[string]*tsdb.FieldCodec),
	}
}

// Open opens the reader.
func (r *Reader) Open() error {
	// Open underlying storage.
	db, err := bolt.Open(r.path, 0666, &bolt.Options{Timeout: 1 * time.Second})
	if err != nil {
		return err
	}
	r.db = db

	// Load fields.
	if err := r.db.View(func(tx *bolt.Tx) error {
		meta := tx.Bucket([]byte("fields"))
		c := meta.Cursor()

		for k, v := c.First(); k != nil; k, v = c.Next() {
			mf := &tsdb.MeasurementFields{}
			if err := mf.UnmarshalBinary(v); err != nil {
				return err
			}
			r.fields[string(k)] = mf
			r.codecs[string(k)] = tsdb.NewFieldCodec(mf.Fields)
		}
		return nil
	}); err != nil {
		return err
	}

	// Load series.
	if err := r.db.View(func(tx *bolt.Tx) error {
		meta := tx.Bucket([]byte("series"))
		c := meta.Cursor()

		for k, v := c.First(); k != nil; k, v = c.Next() {
			series := &tsdb.Series{}
			if err := series.UnmarshalBinary(v); err != nil {
				return err
			}
			r.series[string(k)] = series
		}
		return nil
	}); err != nil {
		return err
	}

	// Create a cursor for each field of each series.
	r.tx, err = r.db.Begin(false)
	if err != nil {
		return err
	}
	for s := range r.series {
		measurement := tsdb.MeasurementFromSeriesKey(s)
		for _, f := range r.fields[measurement].Fields {
			c := newCursor(r.tx, s, f.Name, r.codecs[measurement])
			c.SeekTo(0)
			r.cursors = append(r.cursors, c)
		}
	}
	sort.Sort(cursors(r.cursors))

	return nil
}

// Next returns whether any data remains to be read. It must be called before
// the next call to Read().
func (r *Reader) Next() bool {
	for {
		if r.currCursor == len(r.cursors) {
			// All cursors drained. No more data remains.
			return false
		}

		cc := r.cursors[r.currCursor]
		k, v := cc.Next()
		if k == -1 {
			// This cursor is drained. Go to the next cursor and try again.
			r.currCursor++
			if len(r.valuesBuf) == 0 {
				// The previous cursor had no data. Instead of returning,
				// just go immediately to the next cursor.
				continue
			}
			// There is some data available. Indicate that it should be read.
			return true
		}

		r.keyBuf = tsm1.SeriesFieldKey(cc.series, cc.field)
		r.valuesBuf = append(r.valuesBuf, tsdb.ConvertToValue(k, v))
		if len(r.valuesBuf) == r.ChunkSize {
			return true
		}
	}
}

// Read returns the next chunk of data in the shard, converted to tsm1 values. Data is
// emitted completely for every field, in every series, before the next field is processed.
// Data from Read() adheres to the requirements for writing to tsm1 shards.
func (r *Reader) Read() (string, []tsm1.Value, error) {
	defer func() {
		r.valuesBuf = nil
	}()

	return r.keyBuf, r.valuesBuf, nil
}

// Close closes the reader.
func (r *Reader) Close() error {
	return r.tx.Rollback()
}

// cursor provides ordered iteration across a series.
type cursor struct {
	// Bolt cursor and readahead buffer.
	cursor *bolt.Cursor
	keyBuf int64
	valBuf interface{}

	series string
	field  string
	dec    *tsdb.FieldCodec
}

// newCursor returns an iterator for a key over a single field.
func newCursor(tx *bolt.Tx, series string, field string, dec *tsdb.FieldCodec) *cursor {
	cur := &cursor{
		keyBuf: -2,
		series: series,
		field:  field,
		dec:    dec,
	}

	// Retrieve the series bucket.
	b := tx.Bucket([]byte(series))
	if b != nil {
		cur.cursor = b.Cursor()
	}

	return cur
}

// SeekTo moves the cursor to the given position. The receiver must be a
// pointer, otherwise the buffered key/value is set on a copy and discarded.
func (c *cursor) SeekTo(seek int64) {
	k, v := c.cursor.Seek(u64tob(uint64(seek)))
	c.keyBuf, c.valBuf = tsdb.DecodeKeyValue(c.field, c.dec, k, v)
}

// Next returns the next key/value pair from the cursor.
func (c *cursor) Next() (key int64, value interface{}) {
	for {
		k, v := func() (int64, interface{}) {
			if c.keyBuf != -2 {
				// Return the key/value buffered by the last seek.
				k, v := c.keyBuf, c.valBuf
				c.keyBuf = -2
				return k, v
			}

			k, v := c.cursor.Next()
			if k == nil {
				return -1, nil
			}
			return tsdb.DecodeKeyValue(c.field, c.dec, k, v)
		}()

		if k != -1 && v == nil {
			// There is a point in the series at the next timestamp,
			// but not for this cursor's field. Go to the next point.
			continue
		}
		return k, v
	}
}

// cursors sorts b1 cursors in the correct order for writing to TSM files.
type cursors []*cursor

func (a cursors) Len() int      { return len(a) }
func (a cursors) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a cursors) Less(i, j int) bool {
	return tsm1.SeriesFieldKey(a[i].series, a[i].field) < tsm1.SeriesFieldKey(a[j].series, a[j].field)
}

// u64tob converts a uint64 into an 8-byte slice.
func u64tob(v uint64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, v)
	return b
}

// btou64 converts an 8-byte slice to a uint64.
func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) }
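
For reference, a minimal sketch of how a caller might drive this Reader using the Next/Read protocol described in the comments above. The shard path is hypothetical, ChunkSize is set explicitly since NewReader leaves it zero, and the tsm1-writing step is elided in favor of printing each chunk:

```go
package main

import (
	"fmt"
	"log"

	"github.com/influxdb/influxdb/cmd/influx_tsm/b1"
)

func main() {
	// Hypothetical b1 shard path; the real tool discovers shards by
	// scanning the data directory.
	r := b1.NewReader("/home/bob/.influxdb/data/_internal/monitor/1")
	r.ChunkSize = b1.DefaultChunkSize

	if err := r.Open(); err != nil {
		log.Fatal(err)
	}
	defer r.Close()

	// Next must be called before each Read. Each Read returns up to
	// ChunkSize values for a single series/field key, in the order
	// required for writing to tsm1 shards.
	for r.Next() {
		key, values, err := r.Read()
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%s: %d values\n", key, len(values))
	}
}
```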