diff --git a/CHANGELOG.md b/CHANGELOG.md index 49d809667..c038a7cf1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ See the API docs for more details. - [#980](https://github.com/influxdata/kapacitor/pull/980): Upgrade to using go 1.7 - [#957](https://github.com/influxdata/kapacitor/issues/957): Add API endpoints for testing service integrations. - [#958](https://github.com/influxdata/kapacitor/issues/958): Add support for Slack icon emojis and custom usernames. +- [#991](https://github.com/influxdata/kapacitor/pull/991): Bring Kapacitor up to parity with available InfluxQL functions in 1.1 ### Bugfixes diff --git a/influxdb/client.go b/influxdb/client.go index 0d5ff08e7..ec5c44de2 100644 --- a/influxdb/client.go +++ b/influxdb/client.go @@ -510,7 +510,7 @@ type Point struct { // Returns byte array of a line protocol representation of the point func (p Point) Bytes(precision string) []byte { - key := imodels.MakeKey([]byte(p.Name), imodels.Tags(p.Tags)) + key := imodels.MakeKey([]byte(p.Name), imodels.NewTags(p.Tags)) fields := imodels.Fields(p.Fields).MarshalBinary() kl := len(key) fl := len(fields) diff --git a/integrations/batcher_test.go b/integrations/batcher_test.go index e972b8d78..97cddc305 100644 --- a/integrations/batcher_test.go +++ b/integrations/batcher_test.go @@ -374,6 +374,54 @@ batch testBatcherWithOutput(t, "TestBatch_MovingAverage", script, 21*time.Second, er, false) } +func TestBatch_CumulativeSum(t *testing.T) { + + var script = ` +batch + |query(''' + SELECT "value" + FROM "telegraf"."default".packets +''') + .period(10s) + .every(10s) + |cumulativeSum('value') + |httpOut('TestBatch_CumulativeSum') +` + + er := kapacitor.Result{ + Series: imodels.Rows{ + { + Name: "packets", + Tags: nil, + Columns: []string{"time", "cumulativeSum"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + 0.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 12, 0, time.UTC), + 10.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 14, 0, time.UTC), + 30.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 16, 0, time.UTC), + 60.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 18, 0, time.UTC), + 100.0, + }, + }, + }, + }, + } + + testBatcherWithOutput(t, "TestBatch_CumulativeSum", script, 31*time.Second, er, false) +} func TestBatch_SimpleMR(t *testing.T) { diff --git a/integrations/data/TestBatch_CumulativeSum.0.brpl b/integrations/data/TestBatch_CumulativeSum.0.brpl new file mode 100644 index 000000000..dd83801e2 --- /dev/null +++ b/integrations/data/TestBatch_CumulativeSum.0.brpl @@ -0,0 +1,50 @@ +{ + "name":"packets", + "points":[ + { + "fields":{"value":1000}, + "time":"2015-10-18T00:00:00Z" + }, + { + "fields":{"value":1005}, + "time":"2015-10-18T00:00:02Z" + }, + { + "fields":{"value":1008}, + "time":"2015-10-18T00:00:04Z" + }, + { + "fields":{"value":1009}, + "time":"2015-10-18T00:00:06Z" + }, + { + "fields":{"value":1004}, + "time":"2015-10-18T00:00:08Z" + } + ] +} +{ + "name":"packets", + "points":[ + { + "fields":{"value":0}, + "time":"2015-10-18T00:00:10Z" + }, + { + "fields":{"value":10}, + "time":"2015-10-18T00:00:12Z" + }, + { + "fields":{"value":20}, + "time":"2015-10-18T00:00:14Z" + }, + { + "fields":{"value":30}, + "time":"2015-10-18T00:00:16Z" + }, + { + "fields":{"value":40}, + "time":"2015-10-18T00:00:18Z" + } + ] +} diff --git a/integrations/data/TestStream_CumulativeSum.srpl b/integrations/data/TestStream_CumulativeSum.srpl new file mode 100644 index 000000000..3284e88c6 --- /dev/null +++ b/integrations/data/TestStream_CumulativeSum.srpl @@ -0,0 +1,33 @@ 
+dbname +rpname +packets value=00 0000000000 +dbname +rpname +packets value=00 0000000001 +dbname +rpname +packets value=01 0000000002 +dbname +rpname +packets value=02 0000000003 +dbname +rpname +packets value=03 0000000004 +dbname +rpname +packets value=04 0000000005 +dbname +rpname +packets value=05 0000000006 +dbname +rpname +packets value=06 0000000007 +dbname +rpname +packets value=07 0000000008 +dbname +rpname +packets value=08 0000000009 +dbname +rpname +packets value=09 0000000010 diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index 2f852eb1c..5d9193ec0 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -457,6 +457,72 @@ stream testStreamerWithOutput(t, "TestStream_MovingAverage", script, 16*time.Second, er, false, nil) } +func TestStream_CumulativeSum(t *testing.T) { + var script = ` +stream + |from() + .measurement('packets') + |cumulativeSum('value') + |window() + .period(10s) + .every(10s) + |httpOut('TestStream_CumulativeSum') +` + er := kapacitor.Result{ + Series: imodels.Rows{ + { + Name: "packets", + Tags: nil, + Columns: []string{"time", "cumulativeSum"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), + 0.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC), + 0.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC), + 1.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC), + 3.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), + 6.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC), + 10.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC), + 15.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 7, 0, time.UTC), + 21.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC), + 28.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 9, 0, time.UTC), + 36.0, + }, + }, + }, + }, + } + + testStreamerWithOutput(t, "TestStream_CumulativeSum", script, 13*time.Second, er, false, nil) +} + func TestStream_WindowMissing(t *testing.T) { var script = ` @@ -7087,7 +7153,7 @@ stream if len(p.Tags()) != 1 { t.Errorf("got %v exp %v", len(p.Tags()), 1) } - if got, exp := p.Tags()["key"], "value"; got != exp { + if got, exp := p.Tags().GetString("key"), "value"; got != exp { t.Errorf("got %s exp %s", got, exp) } tm := time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC) diff --git a/metaclient.go b/metaclient.go index 70cf4141c..4282f08bd 100644 --- a/metaclient.go +++ b/metaclient.go @@ -15,10 +15,10 @@ func (m *NoopMetaClient) WaitForLeader(d time.Duration) error { func (m *NoopMetaClient) CreateDatabase(name string) (*meta.DatabaseInfo, error) { return nil, nil } -func (m *NoopMetaClient) CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicyInfo) (*meta.DatabaseInfo, error) { +func (m *NoopMetaClient) CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) { return nil, nil } -func (m *NoopMetaClient) CreateRetentionPolicy(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) { +func (m *NoopMetaClient) CreateRetentionPolicy(database string, rpi *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) { return nil, nil } func (m *NoopMetaClient) Database(name string) *meta.DatabaseInfo { diff --git a/models/point.go b/models/point.go index c3c5d65e3..d125e7455 100644 --- a/models/point.go +++ b/models/point.go @@ -166,7 +166,7 @@ func ToGroupID(name string, tags map[string]string, dims Dimensions) GroupID { // Returns byte array of a line protocol representation of the point func (p Point) 
Bytes(precision string) []byte { - key := models.MakeKey([]byte(p.Name), models.Tags(p.Tags)) + key := models.MakeKey([]byte(p.Name), models.NewTags(p.Tags)) fields := models.Fields(p.Fields).MarshalBinary() kl := len(key) fl := len(fields) diff --git a/pipeline/influxql.go b/pipeline/influxql.go index 6770c2907..962cbdab8 100644 --- a/pipeline/influxql.go +++ b/pipeline/influxql.go @@ -480,3 +480,21 @@ func (n *chainnode) holtWinters(field string, h, m int64, interval time.Duration n.linkChild(i) return i } + +// Compute a cumulative sum of each point that is received. +// A point is emitted for every point collected. +func (n *chainnode) CumulativeSum(field string) *InfluxQLNode { + i := newInfluxQLNode("cumulativeSum", field, n.Provides(), n.Provides(), ReduceCreater{ + CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { + fn := influxql.NewFloatCumulativeSumReducer() + return fn, fn + }, + CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { + fn := influxql.NewIntegerCumulativeSumReducer() + return fn, fn + }, + IsStreamTransformation: true, + }) + n.linkChild(i) + return i +} diff --git a/replay.go b/replay.go index 149495cfd..2648a87db 100644 --- a/replay.go +++ b/replay.go @@ -98,7 +98,7 @@ func readPointsFromIO(data io.ReadCloser, points chan<- models.Point, precision RetentionPolicy: rp, Name: mp.Name(), Group: models.NilGroup, - Tags: models.Tags(mp.Tags()), + Tags: models.Tags(mp.Tags().Map()), Fields: models.Fields(mp.Fields()), Time: mp.Time().UTC(), } diff --git a/task_master.go b/task_master.go index 071995317..4a037333f 100644 --- a/task_master.go +++ b/task_master.go @@ -556,7 +556,7 @@ func (tm *TaskMaster) WritePoints(database, retentionPolicy string, consistencyL RetentionPolicy: retentionPolicy, Name: mp.Name(), Group: models.NilGroup, - Tags: models.Tags(mp.Tags()), + Tags: models.Tags(mp.Tags().Map()), Fields: models.Fields(mp.Fields()), Time: mp.Time(), } diff --git a/vendor.list b/vendor.list index 0e5d97508..96842cc5d 100644 --- a/vendor.list +++ b/vendor.list @@ -6,7 +6,7 @@ github.com/dustin/go-humanize github.com/gogo/protobuf github.com/golang/protobuf github.com/gorhill/cronexpr -github.com/influxdata/influxdb 1.0 +github.com/influxdata/influxdb master github.com/influxdata/usage-client github.com/kimor79/gollectd github.com/mattn/go-runewidth diff --git a/vendor/github.com/influxdata/influxdb/.gitignore b/vendor/github.com/influxdata/influxdb/.gitignore index e234f391f..6c13a672d 100644 --- a/vendor/github.com/influxdata/influxdb/.gitignore +++ b/vendor/github.com/influxdata/influxdb/.gitignore @@ -1,23 +1,21 @@ +# Keep editor-specific, non-project specific ignore rules in global .gitignore: +# https://help.github.com/articles/ignoring-files/#create-a-global-gitignore + *~ src/ config.json /bin/ -TAGS - -# vim temp files -*.swp - -*.test /query/a.out* -.DS_Store # ignore generated files. 
cmd/influxd/version.go # executables +*.test + influx_tsm **/influx_tsm !**/influx_tsm/ @@ -69,15 +67,6 @@ config.toml # test data files integration/migration_data/ -# goide project files -.idea - -# goconvey config files -*.goconvey - -// Ingnore SourceGraph directory -.srclib-store/ - # man outputs man/*.xml man/*.1 diff --git a/vendor/github.com/influxdata/influxdb/.gitrepo b/vendor/github.com/influxdata/influxdb/.gitrepo index b587b2473..3c1fa8618 100644 --- a/vendor/github.com/influxdata/influxdb/.gitrepo +++ b/vendor/github.com/influxdata/influxdb/.gitrepo @@ -5,7 +5,7 @@ ; [subrepo] remote = https://github.com/influxdata/influxdb.git - branch = 1.0 - commit = 5130cd703bbf5876c0cbc6273e01c2404b289dd6 - parent = b382f601e381065c27b4bcf62fb15cc348bf5777 + branch = master + commit = af72d9b0e4ebe95be30e89b160f43eabaf0529ed + parent = 7d5d8bc93906a5d9956ada6f6e89d62b8ca2b399 cmdver = 0.3.0 diff --git a/vendor/github.com/influxdata/influxdb/CHANGELOG.md b/vendor/github.com/influxdata/influxdb/CHANGELOG.md index cf44327eb..1badcec41 100644 --- a/vendor/github.com/influxdata/influxdb/CHANGELOG.md +++ b/vendor/github.com/influxdata/influxdb/CHANGELOG.md @@ -1,4 +1,91 @@ -## v1.0.0 [unreleased] +## v1.1.0 [unreleased] + +### Release Notes + +### Configuration Changes + +The following configuration settings in the `[data]` section may need to be changed before upgrading to `1.1.0` from prior versions. + +* `max-values-per-tag` was added with a default of 100,000, but can be disabled by setting it to `0`. Existing measurements with tags that exceed this limit will continue to load, but writes that would cause the tag cardinality to increase will be dropped and a `partial write` error will be returned to the caller. This limit can be used to prevent high cardinality tag values from being written to a measurement. +* `cache-max-memory-size` has been increased from `524288000` to `1048576000`. This setting is the maximum amount of RAM, in bytes, a shard cache can use before it rejects writes with an error. Setting this value to `0` disables the limit. +* `cache-snapshot-write-cold-duration` has been decreased from `1h` to `10m`. This setting determines how long values will stay in the shard cache while the shard is cold for writes. +* `compact-full-write-cold-duration` has been decreased from `24h` to `4h`. The shorter duration allows cold shards to be compacted to an optimal state more quickly. + +### Features + +- [#7415](https://github.com/influxdata/influxdb/pull/7415): Add sample function to query language. +- [#7403](https://github.com/influxdata/influxdb/pull/7403): Add `fill(linear)` to query language. +- [#7120](https://github.com/influxdata/influxdb/issues/7120): Add additional statistics to query executor. +- [#7135](https://github.com/influxdata/influxdb/pull/7135): Support enabling HTTP service over unix domain socket. Thanks @oiooj +- [#3634](https://github.com/influxdata/influxdb/issues/3634): Support mixed duration units. +- [#7099](https://github.com/influxdata/influxdb/pull/7099): Implement text/csv content encoding for the response writer. +- [#6992](https://github.com/influxdata/influxdb/issues/6992): Support tools for running async queries. +- [#7136](https://github.com/influxdata/influxdb/pull/7136): Update jwt-go dependency to version 3. +- [#6962](https://github.com/influxdata/influxdb/issues/6962): Support ON and use default database for SHOW commands.
+- [#7268](https://github.com/influxdata/influxdb/pull/7268): More man pages for the other tools we package and compress man pages fully. +- [#7305](https://github.com/influxdata/influxdb/pull/7305): UDP Client: Split large points. Thanks @vlasad +- [#7115](https://github.com/influxdata/influxdb/issues/7115): Feature request: `influx inspect -export` should dump WAL files. +- [#7388](https://github.com/influxdata/influxdb/pull/7388): Implement cumulative_sum() function. +- [#7441](https://github.com/influxdata/influxdb/pull/7441): Speed up shutdown by closing shards concurrently. +- [#7146](https://github.com/influxdata/influxdb/issues/7146): Add max-values-per-tag to limit high tag cardinality data +- [#5955](https://github.com/influxdata/influxdb/issues/5955): Make regex work on field and dimension keys in SELECT clause. +- [#7470](https://github.com/influxdata/influxdb/pull/7470): Reduce map allocations when computing the TagSet of a measurement. +- [#6894](https://github.com/influxdata/influxdb/issues/6894): Support `INFLUX_USERNAME` and `INFLUX_PASSWORD` for setting username/password in the CLI. +- [#6896](https://github.com/influxdata/influxdb/issues/6896): Correctly read in input from a non-interactive stream for the CLI. +- [#7463](https://github.com/influxdata/influxdb/pull/7463): Make input plugin services open/close idempotent. +- [#7473](https://github.com/influxdata/influxdb/pull/7473): Align binary math expression streams by time. +- [#7281](https://github.com/influxdata/influxdb/pull/7281): Add stats for active compactions, compaction errors. +- [#7496](https://github.com/influxdata/influxdb/pull/7496): Filter out series within shards that do not have data for that series. +- [#7480](https://github.com/influxdata/influxdb/pull/7480): Improve compaction planning performance by caching tsm file stats. +- [#7320](https://github.com/influxdata/influxdb/issues/7320): Update defaults in config for latest best practices +- [#7495](https://github.com/influxdata/influxdb/pull/7495): Rewrite regexes of the form host = /^server-a$/ to host = 'server-a', to take advantage of the tsdb index. + +### Bugfixes + +- [#7392](https://github.com/influxdata/influxdb/pull/7392): Enable https subscriptions to work with custom CA certificates. +- [#1834](https://github.com/influxdata/influxdb/issues/1834): Drop time when used as a tag or field key. +- [#7152](https://github.com/influxdata/influxdb/issues/7152): Decrement number of measurements only once when deleting the last series from a measurement. +- [#7177](https://github.com/influxdata/influxdb/issues/7177): Fix base64 encoding issue with /debug/vars stats. +- [#7196](https://github.com/influxdata/influxdb/issues/7196): Fix mmap dereferencing, fixes #7183, #7180 +- [#7013](https://github.com/influxdata/influxdb/issues/7013): Fix the dollar sign so it properly handles reserved keywords. +- [#7297](https://github.com/influxdata/influxdb/issues/7297): Use consistent column output from the CLI for column formatted responses. +- [#7231](https://github.com/influxdata/influxdb/issues/7231): Duplicate parsing bug in ALTER RETENTION POLICY. +- [#7285](https://github.com/influxdata/influxdb/issues/7285): Correctly use password-type field in Admin UI. Thanks @dandv! 
+- [#2792](https://github.com/influxdata/influxdb/issues/2792): Exceeding max retention policy duration gives incorrect error message +- [#7226](https://github.com/influxdata/influxdb/issues/7226): Fix database locked up when deleting shards +- [#7382](https://github.com/influxdata/influxdb/issues/7382): Shard stats include wal path tag so disk bytes make more sense. +- [#7385](https://github.com/influxdata/influxdb/pull/7385): Reduce query planning allocations +- [#7436](https://github.com/influxdata/influxdb/issues/7436): Remove accidentally added string support for the stddev call. +- [#7161](https://github.com/influxdata/influxdb/issues/7161): Drop measurement causes cache max memory exceeded error. +- [#7334](https://github.com/influxdata/influxdb/issues/7334): Panic with unread show series iterators during drop database +- [#7482](https://github.com/influxdata/influxdb/issues/7482): Fix issue where point would be written to wrong shard. +- [#7431](https://github.com/influxdata/influxdb/issues/7431): Remove /data/process_continuous_queries endpoint. +- [#7053](https://github.com/influxdata/influxdb/issues/7053): Delete statement returns an error when retention policy or database is specified + +## v1.0.2 [2016-10-05] + +### Bugfixes + +- [#7150](https://github.com/influxdata/influxdb/issues/7150): Do not automatically reset the shard duration when using ALTER RETENTION POLICY +- [#5878](https://github.com/influxdata/influxdb/issues/5878): Ensure correct shard groups created when retention policy has been altered. +- [#7391](https://github.com/influxdata/influxdb/issues/7391): Fix RLE integer decoding producing negative numbers +- [#7335](https://github.com/influxdata/influxdb/pull/7335): Avoid stat syscall when planning compactions +- [#7330](https://github.com/influxdata/influxdb/issues/7330): Subscription data loss under high write load + +## v1.0.1 [2016-09-26] + +### Bugfixes + +- [#7271](https://github.com/influxdata/influxdb/issues/7271): Fixing typo within example configuration file. Thanks @andyfeller! +- [#7270](https://github.com/influxdata/influxdb/issues/7270): Implement time math for lazy time literals. +- [#7272](https://github.com/influxdata/influxdb/issues/7272): Report cmdline and memstats in /debug/vars. +- [#7299](https://github.com/influxdata/influxdb/issues/7299): Ensure fieldsCreated stat available in shard measurement. +- [#6846](https://github.com/influxdata/influxdb/issues/6846): Read an invalid JSON response as an error in the influx client. +- [#7110](https://github.com/influxdata/influxdb/issues/7110): Skip past points at the same time in derivative call within a merged series. +- [#7226](https://github.com/influxdata/influxdb/issues/7226): Fix database locked up when deleting shards +- [#7315](https://github.com/influxdata/influxdb/issues/7315): Prevent users from manually using system queries since incorrect use would result in a panic. + +## v1.0.0 [2016-09-08] ### Release Notes @@ -9,6 +96,7 @@ * Support for config options `[collectd]` and `[opentsdb]` has been removed; use `[[collectd]]` and `[[opentsdb]]` instead. * Config option `data-logging-enabled` within the `[data]` section has been renamed to `trace-logging-enabled`, and defaults to `false`. * The keywords `IF`, `EXISTS`, and `NOT` were removed for this release. This means you no longer need to specify `IF NOT EXISTS` for `DROP DATABASE` or `IF EXISTS` for `CREATE DATABASE`. If these are specified, a query parse error is returned.
+* The Shard `writePointsFail` stat has been renamed to `writePointsErr` for consistency with other stats. With this release the systemd configuration files for InfluxDB will use the system configured default for logging and will no longer write files to `/var/log/influxdb` by default. On most systems, the logs will be directed to the systemd journal and can be accessed by `journalctl -u influxdb.service`. Consult the systemd journald documentation for configuring journald. @@ -48,9 +136,11 @@ With this release the systemd configuration files for InfluxDB will use the syst - [#7011](https://github.com/influxdata/influxdb/issues/7011): Create man pages for commands. - [#7050](https://github.com/influxdata/influxdb/pull/7050): Update go package library dependencies. - [#5750](https://github.com/influxdata/influxdb/issues/5750): Support wildcards in aggregate functions. -- [#7605](https://github.com/influxdata/influxdb/issues/7605): Remove IF EXISTS/IF NOT EXISTS from influxql language. +- [#7065](https://github.com/influxdata/influxdb/issues/7065): Remove IF EXISTS/IF NOT EXISTS from influxql language. - [#7095](https://github.com/influxdata/influxdb/pull/7095): Add MaxSeriesPerDatabase config setting. - [#7199](https://github.com/influxdata/influxdb/pull/7199): Add mode function. Thanks @agaurav. +- [#7194](https://github.com/influxdata/influxdb/issues/7194): Support negative timestamps for the query engine. +- [#7172](https://github.com/influxdata/influxdb/pull/7172): Write path stats ### Bugfixes @@ -119,11 +209,16 @@ With this release the systemd configuration files for InfluxDB will use the syst - [#7084](https://github.com/influxdata/influxdb/pull/7084): Tombstone memory improvements - [#6543](https://github.com/influxdata/influxdb/issues/6543): Fix parseFill to check for fill ident before attempting to parse an expression. - [#7032](https://github.com/influxdata/influxdb/pull/7032): Copy tags in influx_stress to avoid a concurrent write panic on a map. -- [#7107](https://github.com/influxdata/influxdb/pull/7107): Limit shard concurrency - [#7028](https://github.com/influxdata/influxdb/pull/7028): Do not run continuous queries that have no time span. - [#7025](https://github.com/influxdata/influxdb/issues/7025): Move the CQ interval by the group by offset. - [#7125](https://github.com/influxdata/influxdb/pull/7125): Ensure gzip writer is closed in influx_inspect export - [#7127](https://github.com/influxdata/influxdb/pull/7127): Concurrent series limit +- [#7119](https://github.com/influxdata/influxdb/pull/7119): Fix CREATE DATABASE when dealing with default values. +- [#7218](https://github.com/influxdata/influxdb/issues/7218): Fix alter retention policy when all options are used. +- [#7225](https://github.com/influxdata/influxdb/issues/7225): runtime: goroutine stack exceeds 1000000000-byte limit +- [#7240](https://github.com/influxdata/influxdb/issues/7240): Allow blank lines in the line protocol input. +- [#7243](https://github.com/influxdata/influxdb/issues/7243): Optimize queries that compare a tag value to an empty string.
+- [#7074](https://github.com/influxdata/influxdb/issues/7074): Continuous full compactions ## v0.13.0 [2016-05-12] @@ -497,7 +593,7 @@ There are breaking changes in this release: - Scripts are now located in `/usr/lib/influxdb/scripts` (previously `/opt/influxdb`) ### Features -- [#4098](https://github.com/influxdata/influxdb/pull/4702): Support 'history' command at CLI +- [#4702](https://github.com/influxdata/influxdb/pull/4702): Support 'history' command at CLI - [#4098](https://github.com/influxdata/influxdb/issues/4098): Enable `golint` on the code base - uuid subpackage - [#4141](https://github.com/influxdata/influxdb/pull/4141): Control whether each query should be logged - [#4065](https://github.com/influxdata/influxdb/pull/4065): Added precision support in cmd client. Thanks @sbouchex diff --git a/vendor/github.com/influxdata/influxdb/DOCKER.md b/vendor/github.com/influxdata/influxdb/DOCKER.md index 567768d2c..6fa2196b8 100644 --- a/vendor/github.com/influxdata/influxdb/DOCKER.md +++ b/vendor/github.com/influxdata/influxdb/DOCKER.md @@ -11,12 +11,12 @@ To build a docker image for InfluxDB from your current checkout, run the followi $ ./build-docker.sh ``` -This script uses the `golang:1.5` image to build a fully static binary of `influxd` and then adds it to a minimal `scratch` image. +This script uses the `golang:1.7.1` image to build a fully static binary of `influxd` and then adds it to a minimal `scratch` image. To build the image using a different version of go: ``` -$ GO_VER=1.4.2 ./build-docker.sh +$ GO_VER=1.7.1 ./build-docker.sh ``` Available version can be found [here](https://hub.docker.com/_/golang/). diff --git a/vendor/github.com/influxdata/influxdb/Dockerfile_build_ubuntu32 b/vendor/github.com/influxdata/influxdb/Dockerfile_build_ubuntu32 index 5f556923e..058897796 100644 --- a/vendor/github.com/influxdata/influxdb/Dockerfile_build_ubuntu32 +++ b/vendor/github.com/influxdata/influxdb/Dockerfile_build_ubuntu32 @@ -1,4 +1,4 @@ -FROM 32bit/ubuntu:14.04 +FROM ioft/i386-ubuntu:14.04 RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \ python-software-properties \ @@ -18,7 +18,7 @@ RUN gem install fpm # Install go ENV GOPATH /root/go -ENV GO_VERSION 1.6.2 +ENV GO_VERSION 1.7.1 ENV GO_ARCH 386 RUN wget https://storage.googleapis.com/golang/go${GO_VERSION}.linux-${GO_ARCH}.tar.gz; \ tar -C /usr/local/ -xf /go${GO_VERSION}.linux-${GO_ARCH}.tar.gz ; \ diff --git a/vendor/github.com/influxdata/influxdb/Dockerfile_build_ubuntu64 b/vendor/github.com/influxdata/influxdb/Dockerfile_build_ubuntu64 index 1be43b236..638ecef63 100644 --- a/vendor/github.com/influxdata/influxdb/Dockerfile_build_ubuntu64 +++ b/vendor/github.com/influxdata/influxdb/Dockerfile_build_ubuntu64 @@ -21,7 +21,7 @@ RUN gem install fpm # Install go ENV GOPATH /root/go -ENV GO_VERSION 1.6.2 +ENV GO_VERSION 1.7.1 ENV GO_ARCH amd64 RUN wget https://storage.googleapis.com/golang/go${GO_VERSION}.linux-${GO_ARCH}.tar.gz; \ tar -C /usr/local/ -xf /go${GO_VERSION}.linux-${GO_ARCH}.tar.gz ; \ diff --git a/vendor/github.com/influxdata/influxdb/Dockerfile_build_ubuntu64_git b/vendor/github.com/influxdata/influxdb/Dockerfile_build_ubuntu64_git index 0fd7ecfdc..08716183e 100644 --- a/vendor/github.com/influxdata/influxdb/Dockerfile_build_ubuntu64_git +++ b/vendor/github.com/influxdata/influxdb/Dockerfile_build_ubuntu64_git @@ -26,7 +26,7 @@ VOLUME $PROJECT_DIR # Install go -ENV GO_VERSION 1.6.2 +ENV GO_VERSION 1.7.1 ENV GO_ARCH amd64 RUN wget 
https://storage.googleapis.com/golang/go${GO_VERSION}.linux-${GO_ARCH}.tar.gz; \ tar -C /usr/local/ -xf /go${GO_VERSION}.linux-${GO_ARCH}.tar.gz ; \ diff --git a/vendor/github.com/influxdata/influxdb/Godeps b/vendor/github.com/influxdata/influxdb/Godeps index 733189b01..e6a929bb2 100644 --- a/vendor/github.com/influxdata/influxdb/Godeps +++ b/vendor/github.com/influxdata/influxdb/Godeps @@ -3,13 +3,13 @@ github.com/BurntSushi/toml 99064174e013895bbd9b025c31100bd1d9b590ca github.com/bmizerany/pat c068ca2f0aacee5ac3681d68e4d0a003b7d1fd2c github.com/boltdb/bolt 5cc10bbbc5c141029940133bb33c9e969512a698 github.com/davecgh/go-spew 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d -github.com/dgrijalva/jwt-go 9b486c879bab3fde556ce8c27d9a2bb05d5b2c60 +github.com/dgrijalva/jwt-go 63734eae1ef55eaac06fdc0f312615f2e321e273 github.com/dgryski/go-bits 2ad8d707cc05b1815ce6ff2543bb5e8d8f9298ef github.com/dgryski/go-bitstream 7d46cd22db7004f0cceb6f7975824b560cf0e486 -github.com/gogo/protobuf 6abcf94fd4c97dcb423fdafd42fe9f96ca7e421b +github.com/gogo/protobuf 0394392b81058a7f972029451f06e528bb18ba50 github.com/golang/snappy d9eb7a3d35ec988b8585d4a0068e462c27d28380 -github.com/influxdata/usage-client 475977e68d79883d9c8d67131c84e4241523f452 -github.com/jwilder/encoding ac74639f65b2180a2e5eb5ff197f0c122441aed0 +github.com/influxdata/usage-client 6d3895376368aa52a3a81d2a16e90f0f52371967 +github.com/jwilder/encoding 4dada27c33277820fe35c7ee71ed34fbc9477d00 github.com/kimor79/gollectd 61d0deeb4ffcc167b2a1baa8efd72365692811bc github.com/paulbellamy/ratecounter 5a11f585a31379765c190c033b6ad39956584447 github.com/peterh/liner 8975875355a81d612fafb9f5a6037bdcc2d9b073 diff --git a/vendor/github.com/influxdata/influxdb/TODO.md b/vendor/github.com/influxdata/influxdb/TODO.md new file mode 100644 index 000000000..56b5294b4 --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/TODO.md @@ -0,0 +1,9 @@ +# TODO + +## v2 + +TODO list for v2. Here is a list of things we want to add to v1, but can't because they would be a breaking change. + +- [#1834](https://github.com/influxdata/influxdb/issues/1834): Disallow using time as a tag key or field key. +- [#2124](https://github.com/influxdata/influxdb/issues/2124): Prohibit writes with precision, but without an explicit timestamp. +- [#4461](https://github.com/influxdata/influxdb/issues/4461): Change default time boundaries. diff --git a/vendor/github.com/influxdata/influxdb/appveyor.yml b/vendor/github.com/influxdata/influxdb/appveyor.yml index 6e26c9554..05f77efbc 100644 --- a/vendor/github.com/influxdata/influxdb/appveyor.yml +++ b/vendor/github.com/influxdata/influxdb/appveyor.yml @@ -12,13 +12,13 @@ clone_folder: c:\gopath\src\github.com\influxdata\influxdb # Environment variables environment: - #AppVeyor has go 1.6 as default go environment - GOROOT: C:\go + GOROOT: C:\go17 GOPATH: C:\gopath # Scripts that run after cloning repository install: - set PATH=%GOROOT%\bin;%GOPATH%\bin;%PATH% + - rmdir c:\go /s /q - echo %PATH% - echo %GOPATH% - cd C:\gopath\src\github.com\influxdata\influxdb @@ -28,10 +28,10 @@ install: - cd C:\gopath\src\github.com\influxdata\influxdb - gdm restore -# To run your custom scripts instead of automatic MSBuild +# To run your custom scripts instead of automatic MSBuild build_script: - go get -t -v ./... - go test -race -v ./... 
- + # To disable deployment deploy: off diff --git a/vendor/github.com/influxdata/influxdb/build-docker.sh b/vendor/github.com/influxdata/influxdb/build-docker.sh index 54f6db0c1..75a4b5a34 100755 --- a/vendor/github.com/influxdata/influxdb/build-docker.sh +++ b/vendor/github.com/influxdata/influxdb/build-docker.sh @@ -2,7 +2,7 @@ set -e -x -GO_VER=${GO_VER:-1.4.3} +GO_VER=${GO_VER:-1.7.1} docker run -it -v "${GOPATH}":/gopath -v "$(pwd)":/app -e "GOPATH=/gopath" -w /app golang:$GO_VER sh -c 'CGO_ENABLED=0 go build -a --installsuffix cgo --ldflags="-s" -o influxd ./cmd/influxd' diff --git a/vendor/github.com/influxdata/influxdb/build.py b/vendor/github.com/influxdata/influxdb/build.py index 5d95ab198..97055bf3e 100755 --- a/vendor/github.com/influxdata/influxdb/build.py +++ b/vendor/github.com/influxdata/influxdb/build.py @@ -49,7 +49,7 @@ DESCRIPTION = "Distributed time-series database." prereqs = [ 'git', 'go' ] -go_vet_command = "go tool vet -example=false ./" +go_vet_command = "go tool vet ./" optional_prereqs = [ 'fpm', 'rpmbuild', 'gpg' ] fpm_common_args = "-f -s dir --log error \ @@ -154,7 +154,7 @@ def package_man_files(build_root): run("make -C man/ clean install DESTDIR={}/usr".format(build_root)) for path, dir, files in os.walk(os.path.join(build_root, MAN_DIR[1:])): for f in files: - run("gzip {}".format(os.path.join(path, f))) + run("gzip -9n {}".format(os.path.join(path, f))) def run_generate(): """Run 'go generate' to rebuild any static assets. diff --git a/vendor/github.com/influxdata/influxdb/circle.yml b/vendor/github.com/influxdata/influxdb/circle.yml index a298c392e..613e5e841 100644 --- a/vendor/github.com/influxdata/influxdb/circle.yml +++ b/vendor/github.com/influxdata/influxdb/circle.yml @@ -2,7 +2,7 @@ machine: services: - docker environment: - GODIST: "go1.6.2.linux-amd64.tar.gz" + GODIST: "go1.7.1.linux-amd64.tar.gz" post: - mkdir -p download - test -e download/$GODIST || curl -o download/$GODIST https://storage.googleapis.com/golang/$GODIST diff --git a/vendor/github.com/influxdata/influxdb/client/README.md b/vendor/github.com/influxdata/influxdb/client/README.md index e11eaee93..e55c43657 100644 --- a/vendor/github.com/influxdata/influxdb/client/README.md +++ b/vendor/github.com/influxdata/influxdb/client/README.md @@ -255,6 +255,28 @@ func WriteUDP() { } ``` +### Point Splitting + +The UDP client now supports splitting single points that exceed the configured +payload size. The logic for processing each point is listed here, starting with +an empty payload. + +1. If adding the point to the current (non-empty) payload would exceed the + configured size, send the current payload. Otherwise, add it to the current + payload. +1. If the point is smaller than the configured size, add it to the payload. +1. If the point has no timestamp, just try to send the entire point as a single + UDP payload, and process the next point. +1. Since the point has a timestamp, re-use the existing measurement name, + tagset, and timestamp and create multiple new points by splitting up the + fields. The per-point length will be kept close to the configured size, + staying under it if possible. This does mean that one large field, maybe a + long string, could be sent as a larger-than-configured payload. + +The above logic attempts to respect configured payload sizes, but not sacrifice +any data integrity. Points without a timestamp can't be split, as that may +cause fields to have differing timestamps when processed by the server. 
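To make the splitting rules above concrete, here is a minimal usage sketch of the UDP client as exposed by the `client/v2` package in this diff (`NewUDPClient`, `NewBatchPoints`, `NewPoint`, `Write` all appear in this change); the listener address and payload size are illustrative only:

```go
package main

import (
	"log"
	"time"

	client "github.com/influxdata/influxdb/client/v2"
)

func main() {
	// PayloadSize caps each UDP datagram; a point whose line-protocol
	// representation exceeds it is split by field, per the rules above.
	c, err := client.NewUDPClient(client.UDPConfig{
		Addr:        "localhost:8089", // illustrative UDP listener address
		PayloadSize: 512,              // bytes; 0 falls back to UDPPayloadSize
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	bp, _ := client.NewBatchPoints(client.BatchPointsConfig{Precision: "s"})

	// The timestamp matters: only timestamped points can be split safely.
	pt, err := client.NewPoint(
		"cpu",
		map[string]string{"host": "server-a"},
		map[string]interface{}{"user": 42.0, "system": 7.1},
		time.Now(),
	)
	if err != nil {
		log.Fatal(err)
	}
	bp.AddPoint(pt)

	// Write packs points into payload-sized datagrams, splitting any
	// single oversized point across several datagrams by field.
	if err := c.Write(bp); err != nil {
		log.Fatal(err)
	}
}
```

Note that for the UDP client, `Ping` returns immediately with a zero duration and `Query` returns an error, as implemented in `udp.go` later in this diff.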
+ ## Go Docs Please refer to diff --git a/vendor/github.com/influxdata/influxdb/client/influxdb.go b/vendor/github.com/influxdata/influxdb/client/influxdb.go index 90695b9ed..623276dd5 100644 --- a/vendor/github.com/influxdata/influxdb/client/influxdb.go +++ b/vendor/github.com/influxdata/influxdb/client/influxdb.go @@ -498,17 +498,36 @@ func (r *Response) Error() error { return nil } +// duplexReader reads responses and writes it to another writer while +// satisfying the reader interface. +type duplexReader struct { + r io.Reader + w io.Writer +} + +func (r *duplexReader) Read(p []byte) (n int, err error) { + n, err = r.r.Read(p) + if err == nil { + r.w.Write(p[:n]) + } + return n, err +} + // ChunkedResponse represents a response from the server that // uses chunking to stream the output. type ChunkedResponse struct { - dec *json.Decoder + dec *json.Decoder + duplex *duplexReader + buf bytes.Buffer } // NewChunkedResponse reads a stream and produces responses from the stream. func NewChunkedResponse(r io.Reader) *ChunkedResponse { - dec := json.NewDecoder(r) - dec.UseNumber() - return &ChunkedResponse{dec: dec} + resp := &ChunkedResponse{} + resp.duplex = &duplexReader{r: r, w: &resp.buf} + resp.dec = json.NewDecoder(resp.duplex) + resp.dec.UseNumber() + return resp } // NextResponse reads the next line of the stream and returns a response. @@ -518,8 +537,13 @@ func (r *ChunkedResponse) NextResponse() (*Response, error) { if err == io.EOF { return nil, nil } - return nil, err + // A decoding error happened. This probably means the server crashed + // and sent a last-ditch error message to us. Ensure we have read the + // entirety of the connection to get any remaining error text. + io.Copy(ioutil.Discard, r.duplex) + return nil, errors.New(strings.TrimSpace(r.buf.String())) } + r.buf.Reset() return &response, nil } @@ -562,7 +586,7 @@ func (p *Point) MarshalJSON() ([]byte, error) { // MarshalString renders string representation of a Point with specified // precision. The default precision is nanoseconds. func (p *Point) MarshalString() string { - pt, err := models.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time) + pt, err := models.NewPoint(p.Measurement, models.NewTags(p.Tags), p.Fields, p.Time) if err != nil { return "# ERROR: " + err.Error() + " " + p.Measurement } diff --git a/vendor/github.com/influxdata/influxdb/client/v2/client.go b/vendor/github.com/influxdata/influxdb/client/v2/client.go index 1ed78e315..becafd734 100644 --- a/vendor/github.com/influxdata/influxdb/client/v2/client.go +++ b/vendor/github.com/influxdata/influxdb/client/v2/client.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "io/ioutil" - "net" "net/http" "net/url" "time" @@ -15,12 +14,6 @@ import ( "github.com/influxdata/influxdb/models" ) -// UDPPayloadSize is a reasonable default payload size for UDP packets that -// could be travelling over the internet. -const ( - UDPPayloadSize = 512 -) - // HTTPConfig is the config data needed to create an HTTP Client type HTTPConfig struct { // Addr should be of the form "http://host:port" @@ -48,17 +41,6 @@ type HTTPConfig struct { TLSConfig *tls.Config } -// UDPConfig is the config data needed to create a UDP Client -type UDPConfig struct { - // Addr should be of the form "host:port" - // or "[ipv6-host%zone]:port". - Addr string - - // PayloadSize is the maximum size of a UDP client message, optional - // Tune this based on your network. Defaults to UDPBufferSize. 
- PayloadSize int -} - // BatchPointsConfig is the config data needed to create an instance of the BatchPoints struct type BatchPointsConfig struct { // Precision is the write precision of the points, defaults to "ns" @@ -76,7 +58,8 @@ type BatchPointsConfig struct { // Client is a client interface for writing & querying the database type Client interface { - // Ping checks that status of cluster + // Ping checks that status of cluster, and will always return 0 time and no + // error for UDP clients Ping(timeout time.Duration) (time.Duration, string, error) // Write takes a BatchPoints object and writes all Points to InfluxDB. @@ -177,42 +160,6 @@ func (c *client) Close() error { return nil } -// NewUDPClient returns a client interface for writing to an InfluxDB UDP -// service from the given config. -func NewUDPClient(conf UDPConfig) (Client, error) { - var udpAddr *net.UDPAddr - udpAddr, err := net.ResolveUDPAddr("udp", conf.Addr) - if err != nil { - return nil, err - } - - conn, err := net.DialUDP("udp", nil, udpAddr) - if err != nil { - return nil, err - } - - payloadSize := conf.PayloadSize - if payloadSize == 0 { - payloadSize = UDPPayloadSize - } - - return &udpclient{ - conn: conn, - payloadSize: payloadSize, - }, nil -} - -// Ping will check to see if the server is up with an optional timeout on waiting for leader. -// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred. -func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) { - return 0, "", nil -} - -// Close releases the udpclient's resources. -func (uc *udpclient) Close() error { - return uc.conn.Close() -} - // client is safe for concurrent use as the fields are all read-only // once the client is instantiated. type client struct { @@ -226,11 +173,6 @@ type client struct { transport *http.Transport } -type udpclient struct { - conn *net.UDPConn - payloadSize int -} - // BatchPoints is an interface into a batched grouping of points to write into // InfluxDB together. BatchPoints is NOT thread-safe, you must create a separate // batch for each goroutine. 
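Since `BatchPoints` is documented here as not thread-safe while the `client` itself is safe for concurrent use, one workable pattern is a fresh batch per goroutine with only the `Client` shared. The following is a sketch under those assumptions, not code from this diff; the database name and the `writeConcurrently` helper are hypothetical:

```go
package main

import (
	"log"
	"sync"

	client "github.com/influxdata/influxdb/client/v2"
)

// writeConcurrently shares one Client (safe for concurrent use) across
// goroutines, but gives each goroutine its own BatchPoints, which is not
// thread-safe.
func writeConcurrently(c client.Client, series [][]*client.Point) error {
	var wg sync.WaitGroup
	errs := make(chan error, len(series))

	for _, pts := range series {
		wg.Add(1)
		go func(pts []*client.Point) {
			defer wg.Done()
			// A fresh batch per goroutine avoids racing on shared state.
			bp, err := client.NewBatchPoints(client.BatchPointsConfig{
				Database:  "mydb", // hypothetical database name
				Precision: "s",
			})
			if err != nil {
				errs <- err
				return
			}
			for _, p := range pts {
				bp.AddPoint(p)
			}
			errs <- c.Write(bp)
		}(pts)
	}

	wg.Wait()
	close(errs)

	for err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	c, err := client.NewHTTPClient(client.HTTPConfig{Addr: "http://localhost:8086"})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	if err := writeConcurrently(c, nil); err != nil {
		log.Fatal(err)
	}
}
```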
@@ -356,7 +298,7 @@ func NewPoint( T = t[0] } - pt, err := models.NewPoint(name, tags, fields, T) + pt, err := models.NewPoint(name, models.NewTags(tags), fields, T) if err != nil { return nil, err } @@ -382,7 +324,7 @@ func (p *Point) Name() string { // Tags returns the tags associated with the point func (p *Point) Tags() map[string]string { - return p.pt.Tags() + return p.pt.Tags().Map() } // Time return the timestamp for the point @@ -405,31 +347,6 @@ func NewPointFrom(pt models.Point) *Point { return &Point{pt: pt} } -func (uc *udpclient) Write(bp BatchPoints) error { - var b bytes.Buffer - var d time.Duration - d, _ = time.ParseDuration("1" + bp.Precision()) - - for _, p := range bp.Points() { - pointstring := p.pt.RoundedString(d) + "\n" - - // Write and reset the buffer if we reach the max size - if b.Len()+len(pointstring) >= uc.payloadSize { - if _, err := uc.conn.Write(b.Bytes()); err != nil { - return err - } - b.Reset() - } - - if _, err := b.WriteString(pointstring); err != nil { - return err - } - } - - _, err := uc.conn.Write(b.Bytes()) - return err -} - func (c *client) Write(bp BatchPoints) error { var b bytes.Buffer @@ -532,10 +449,6 @@ type Result struct { Err string `json:"error,omitempty"` } -func (uc *udpclient) Query(q Query) (*Response, error) { - return nil, fmt.Errorf("Querying via UDP is not supported") -} - // Query sends a command to the server and returns the Response func (c *client) Query(q Query) (*Response, error) { u := c.url diff --git a/vendor/github.com/influxdata/influxdb/client/v2/client_test.go b/vendor/github.com/influxdata/influxdb/client/v2/client_test.go index d25bf9c59..30ee09df4 100644 --- a/vendor/github.com/influxdata/influxdb/client/v2/client_test.go +++ b/vendor/github.com/influxdata/influxdb/client/v2/client_test.go @@ -72,6 +72,68 @@ func TestUDPClient_BadAddr(t *testing.T) { } } +func TestUDPClient_Batches(t *testing.T) { + var logger writeLogger + var cl udpclient + + cl.conn = &logger + cl.payloadSize = 20 // should allow for two points per batch + + // expected point should look like this: "cpu a=1i" + fields := map[string]interface{}{"a": 1} + + p, _ := NewPoint("cpu", nil, fields, time.Time{}) + + bp, _ := NewBatchPoints(BatchPointsConfig{}) + + for i := 0; i < 9; i++ { + bp.AddPoint(p) + } + + if err := cl.Write(bp); err != nil { + t.Fatalf("Unexpected error during Write: %v", err) + } + + if len(logger.writes) != 5 { + t.Errorf("Mismatched write count: got %v, exp %v", len(logger.writes), 5) + } +} + +func TestUDPClient_Split(t *testing.T) { + var logger writeLogger + var cl udpclient + + cl.conn = &logger + cl.payloadSize = 1 // force one field per point + + fields := map[string]interface{}{"a": 1, "b": 2, "c": 3, "d": 4} + + p, _ := NewPoint("cpu", nil, fields, time.Unix(1, 0)) + + bp, _ := NewBatchPoints(BatchPointsConfig{}) + + bp.AddPoint(p) + + if err := cl.Write(bp); err != nil { + t.Fatalf("Unexpected error during Write: %v", err) + } + + if len(logger.writes) != len(fields) { + t.Errorf("Mismatched write count: got %v, exp %v", len(logger.writes), len(fields)) + } +} + +type writeLogger struct { + writes [][]byte +} + +func (w *writeLogger) Write(b []byte) (int, error) { + w.writes = append(w.writes, append([]byte(nil), b...)) + return len(b), nil +} + +func (w *writeLogger) Close() error { return nil } + func TestClient_Query(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var data Response diff --git a/vendor/github.com/influxdata/influxdb/client/v2/udp.go 
b/vendor/github.com/influxdata/influxdb/client/v2/udp.go new file mode 100644 index 000000000..aff07d0f2 --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/client/v2/udp.go @@ -0,0 +1,112 @@ +package client + +import ( + "fmt" + "io" + "net" + "time" +) + +const ( + // UDPPayloadSize is a reasonable default payload size for UDP packets that + // could be travelling over the internet. + UDPPayloadSize = 512 +) + +// UDPConfig is the config data needed to create a UDP Client +type UDPConfig struct { + // Addr should be of the form "host:port" + // or "[ipv6-host%zone]:port". + Addr string + + // PayloadSize is the maximum size of a UDP client message, optional + // Tune this based on your network. Defaults to UDPPayloadSize. + PayloadSize int +} + +// NewUDPClient returns a client interface for writing to an InfluxDB UDP +// service from the given config. +func NewUDPClient(conf UDPConfig) (Client, error) { + var udpAddr *net.UDPAddr + udpAddr, err := net.ResolveUDPAddr("udp", conf.Addr) + if err != nil { + return nil, err + } + + conn, err := net.DialUDP("udp", nil, udpAddr) + if err != nil { + return nil, err + } + + payloadSize := conf.PayloadSize + if payloadSize == 0 { + payloadSize = UDPPayloadSize + } + + return &udpclient{ + conn: conn, + payloadSize: payloadSize, + }, nil +} + +// Close releases the udpclient's resources. +func (uc *udpclient) Close() error { + return uc.conn.Close() +} + +type udpclient struct { + conn io.WriteCloser + payloadSize int +} + +func (uc *udpclient) Write(bp BatchPoints) error { + var b = make([]byte, 0, uc.payloadSize) // initial buffer size, it will grow as needed + var d, _ = time.ParseDuration("1" + bp.Precision()) + + var delayedError error + + var checkBuffer = func(n int) { + if len(b) > 0 && len(b)+n > uc.payloadSize { + if _, err := uc.conn.Write(b); err != nil { + delayedError = err + } + b = b[:0] + } + } + + for _, p := range bp.Points() { + p.pt.Round(d) + pointSize := p.pt.StringSize() + 1 // include newline in size + //point := p.pt.RoundedString(d) + "\n" + + checkBuffer(pointSize) + + if p.Time().IsZero() || pointSize <= uc.payloadSize { + b = p.pt.AppendString(b) + b = append(b, '\n') + continue + } + + points := p.pt.Split(uc.payloadSize - 1) // account for newline character + for _, sp := range points { + checkBuffer(sp.StringSize() + 1) + b = sp.AppendString(b) + b = append(b, '\n') + } + } + + if len(b) > 0 { + if _, err := uc.conn.Write(b); err != nil { + return err + } + } + return delayedError +} + +func (uc *udpclient) Query(q Query) (*Response, error) { + return nil, fmt.Errorf("Querying via UDP is not supported") +} + +func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) { + return 0, "", nil +} diff --git a/vendor/github.com/influxdata/influxdb/cmd/influx/cli/cli.go b/vendor/github.com/influxdata/influxdb/cmd/influx/cli/cli.go index 409fb3299..a9e044b0b 100644 --- a/vendor/github.com/influxdata/influxdb/cmd/influx/cli/cli.go +++ b/vendor/github.com/influxdata/influxdb/cmd/influx/cli/cli.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "net" "net/url" "os" @@ -18,6 +19,8 @@ import ( "syscall" "text/tabwriter" + "golang.org/x/crypto/ssh/terminal" + "github.com/influxdata/influxdb/client" "github.com/influxdata/influxdb/importer/v8" "github.com/influxdata/influxdb/models" @@ -58,6 +61,7 @@ type CommandLine struct { Chunked bool Quit chan struct{} IgnoreSignals bool // Ignore signals normally caught by this process (used primarily for testing) + ForceTTY bool // Force the CLI to act 
as if it were connected to a TTY osSignals chan os.Signal historyFilePath string } @@ -73,6 +77,22 @@ func New(version string) *CommandLine { // Run executes the CLI func (c *CommandLine) Run() error { + // If we are not running in an interactive terminal, read stdin completely + // and execute a query. Do not allow meta commands. + if !c.ForceTTY && !terminal.IsTerminal(int(os.Stdin.Fd())) { + if err := c.Connect(""); err != nil { + return fmt.Errorf( + "Failed to connect to %s\nPlease check your connection settings and ensure 'influxd' is running.", + c.Client.Addr()) + } + + cmd, err := ioutil.ReadAll(os.Stdin) + if err != nil { + return err + } + return c.ExecuteQuery(string(cmd)) + } + if !c.IgnoreSignals { // register OS signals for graceful termination signal.Notify(c.osSignals, syscall.SIGINT, syscall.SIGTERM) @@ -102,6 +122,15 @@ func (c *CommandLine) Run() error { } } + // Read environment variables for username/password. + if c.Username == "" { + c.Username = os.Getenv("INFLUX_USERNAME") + } + // If we prompted for a password, always use the entered password. + if !promptForPassword && c.Password == "" { + c.Password = os.Getenv("INFLUX_PASSWORD") + } + if err := c.Connect(""); err != nil { return fmt.Errorf( "Failed to connect to %s\nPlease check your connection settings and ensure 'influxd' is running.", @@ -200,7 +229,7 @@ func (c *CommandLine) mainLoop() error { c.exit() return e } - if err := c.ParseCommand(l); err != ErrBlankCommand { + if err := c.ParseCommand(l); err != ErrBlankCommand && !strings.HasPrefix(strings.TrimSpace(l), "auth") { c.Line.AppendHistory(l) c.saveHistory() } @@ -570,7 +599,8 @@ func (c *CommandLine) DatabaseToken() (string, error) { if err != nil { return "", err } - if response.Error() != nil || len((*response).Results[0].Series) == 0 { + + if response.Error() != nil || len(response.Results) == 0 || len(response.Results[0].Series) == 0 { return "", nil } @@ -645,7 +675,7 @@ func (c *CommandLine) writeColumns(response *client.Response, w io.Writer) { // formatResults will behave differently if you are formatting for columns or csv func (c *CommandLine) formatResults(result client.Result, separator string) []string { rows := []string{} - // Create a tabbed writer for each result a they won't always line up + // Create a tabbed writer for each result as they won't always line up for i, row := range result.Series { // gather tags tags := []string{} @@ -676,15 +706,11 @@ func (c *CommandLine) formatResults(result client.Result, separator string) []st rows = append(rows, "") } - // If we are column format, we break out the name/tag to seperate lines + // If we are column format, we break out the name/tag to separate lines if c.Format == "column" { if row.Name != "" { n := fmt.Sprintf("name: %s", row.Name) rows = append(rows, n) - if len(tags) == 0 { - l := strings.Repeat("-", len(n)) - rows = append(rows, l) - } } if len(tags) > 0 { t := fmt.Sprintf("tags: %s", (strings.Join(tags, ", "))) @@ -694,8 +720,8 @@ func (c *CommandLine) formatResults(result client.Result, separator string) []st rows = append(rows, strings.Join(columnNames, separator)) - // if format is column, break tags to their own line/format - if c.Format == "column" && len(tags) > 0 { + // if format is column, write dashes under each column + if c.Format == "column" { lines := []string{} for _, columnName := range columnNames { lines = append(lines, strings.Repeat("-", len(columnName))) @@ -719,7 +745,7 @@ func (c *CommandLine) formatResults(result client.Result, separator string) []st } 
rows = append(rows, strings.Join(values, separator)) } - // Outout a line separator if in column format + // Output a line separator if in column format if c.Format == "column" { rows = append(rows, "") } @@ -745,7 +771,7 @@ func interfaceToString(v interface{}) string { // Settings prints current settings func (c *CommandLine) Settings() { w := new(tabwriter.Writer) - w.Init(os.Stdout, 0, 8, 1, '\t', 0) + w.Init(os.Stdout, 0, 1, 1, '\t', 0) if c.Port > 0 { fmt.Fprintf(w, "Host\t%s:%d\n", c.Host, c.Port) } else { diff --git a/vendor/github.com/influxdata/influxdb/cmd/influx/cli/cli_test.go b/vendor/github.com/influxdata/influxdb/cmd/influx/cli/cli_test.go index c2c7edae7..2e1816acd 100644 --- a/vendor/github.com/influxdata/influxdb/cmd/influx/cli/cli_test.go +++ b/vendor/github.com/influxdata/influxdb/cmd/influx/cli/cli_test.go @@ -48,6 +48,7 @@ func TestRunCLI(t *testing.T) { c.Host = h c.Port, _ = strconv.Atoi(p) c.IgnoreSignals = true + c.ForceTTY = true go func() { close(c.Quit) }() @@ -69,6 +70,7 @@ func TestRunCLI_ExecuteInsert(t *testing.T) { c.Precision = "ms" c.Execute = "INSERT sensor,floor=1 value=2" c.IgnoreSignals = true + c.ForceTTY = true if err := c.Run(); err != nil { t.Fatalf("Run failed with error: %s", err) } diff --git a/vendor/github.com/influxdata/influxdb/cmd/influx_inspect/README.md b/vendor/github.com/influxdata/influxdb/cmd/influx_inspect/README.md index 31453652e..2954fc120 100644 --- a/vendor/github.com/influxdata/influxdb/cmd/influx_inspect/README.md +++ b/vendor/github.com/influxdata/influxdb/cmd/influx_inspect/README.md @@ -38,26 +38,37 @@ Only display index and block data match this key substring. Exports all tsm files to line protocol. This output file can be imported via the [influx](https://github.com/influxdata/influxdb/tree/master/importer#running-the-import-command) command. -#### `-dir` string -Root storage path. +#### `-datadir` string +Data storage path. -`default` = "$HOME/.influxdb" +`default` = "$HOME/.influxdb/data" + +#### `-waldir` string +WAL storage path. + +`default` = "$HOME/.influxdb/wal" #### `-out` string Destination file to export to `default` = "$HOME/.influxdb/export" -#### `-db` string (optional) +#### `-database` string (optional) Database to export. `default` = "" -#### `-rp` string (optional) +#### `-retention` string (optional) Retention policy to export. `default` = "" +#### `-start` string (optional) +Optional. The time range to start at. + +#### `-end` string (optional) +Optional. The time range to end at. + #### `-compress` bool (optional) Compress the output. diff --git a/vendor/github.com/influxdata/influxdb/cmd/influx_inspect/tsm.go b/vendor/github.com/influxdata/influxdb/cmd/influx_inspect/dumptsm/dumptsm.go similarity index 69% rename from vendor/github.com/influxdata/influxdb/cmd/influx_inspect/tsm.go rename to vendor/github.com/influxdata/influxdb/cmd/influx_inspect/dumptsm/dumptsm.go index 5fea93aed..424df2a63 100644 --- a/vendor/github.com/influxdata/influxdb/cmd/influx_inspect/tsm.go +++ b/vendor/github.com/influxdata/influxdb/cmd/influx_inspect/dumptsm/dumptsm.go @@ -1,8 +1,10 @@ -package main +package dumptsm import ( "encoding/binary" + "flag" "fmt" + "io" "os" "strconv" "strings" @@ -12,85 +14,71 @@ import ( "github.com/influxdata/influxdb/tsdb/engine/tsm1" ) -type tsdmDumpOpts struct { +// Command represents the program execution for "influxd dumptsm". +type Command struct { + // Standard input/output, overridden for testing. 
+ Stderr io.Writer + Stdout io.Writer + dumpIndex bool dumpBlocks bool + dumpAll bool filterKey string path string } -type blockStats struct { - min, max int - counts [][]int -} - -func (b *blockStats) inc(typ int, enc byte) { - for len(b.counts) <= typ { - b.counts = append(b.counts, []int{}) - } - for len(b.counts[typ]) <= int(enc) { - b.counts[typ] = append(b.counts[typ], 0) +// NewCommand returns a new instance of Command. +func NewCommand() *Command { + return &Command{ + Stderr: os.Stderr, + Stdout: os.Stdout, } - b.counts[typ][enc]++ } -func (b *blockStats) size(sz int) { - if b.min == 0 || sz < b.min { - b.min = sz - } - if b.min == 0 || sz > b.max { - b.max = sz - } -} +// Run executes the command. +func (cmd *Command) Run(args ...string) error { + fs := flag.NewFlagSet("file", flag.ExitOnError) + fs.BoolVar(&cmd.dumpIndex, "index", false, "Dump raw index data") + fs.BoolVar(&cmd.dumpBlocks, "blocks", false, "Dump raw block data") + fs.BoolVar(&cmd.dumpAll, "all", false, "Dump all data. Caution: This may print a lot of information") + fs.StringVar(&cmd.filterKey, "filter-key", "", "Only display index and block data match this key substring") -var ( - fieldType = []string{ - "timestamp", "float", "int", "bool", "string", - } - blockTypes = []string{ - "float64", "int64", "bool", "string", - } - timeEnc = []string{ - "none", "s8b", "rle", - } - floatEnc = []string{ - "none", "gor", - } - intEnc = []string{ - "none", "s8b", "rle", - } - boolEnc = []string{ - "none", "bp", - } - stringEnc = []string{ - "none", "snpy", + fs.SetOutput(cmd.Stdout) + fs.Usage = cmd.printUsage + + if err := fs.Parse(args); err != nil { + return err } - encDescs = [][]string{ - timeEnc, floatEnc, intEnc, boolEnc, stringEnc, + + if fs.Arg(0) == "" { + fmt.Printf("TSM file not specified\n\n") + fs.Usage() + return nil } -) + cmd.path = fs.Args()[0] + cmd.dumpBlocks = cmd.dumpBlocks || cmd.dumpAll || cmd.filterKey != "" + cmd.dumpIndex = cmd.dumpIndex || cmd.dumpAll || cmd.filterKey != "" + return cmd.dump() +} -func cmdDumpTsm1(opts *tsdmDumpOpts) { +func (cmd *Command) dump() error { var errors []error - f, err := os.Open(opts.path) + f, err := os.Open(cmd.path) if err != nil { - println(err.Error()) - os.Exit(1) + return err } // Get the file size stat, err := f.Stat() if err != nil { - println(err.Error()) - os.Exit(1) + return err } b := make([]byte, 8) r, err := tsm1.NewTSMReader(f) if err != nil { - println("Error opening TSM files: ", err.Error()) - os.Exit(1) + return fmt.Errorf("Error opening TSM files: %s", err.Error()) } defer r.Close() @@ -100,7 +88,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) { blockStats := &blockStats{} println("Summary:") - fmt.Printf(" File: %s\n", opts.path) + fmt.Printf(" File: %s\n", cmd.path) fmt.Printf(" Time Range: %s - %s\n", time.Unix(0, minTime).UTC().Format(time.RFC3339Nano), time.Unix(0, maxTime).UTC().Format(time.RFC3339Nano), @@ -110,9 +98,9 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) { fmt.Printf(" File Size: %d\n", stat.Size()) println() - tw := tabwriter.NewWriter(os.Stdout, 8, 8, 1, '\t', 0) + tw := tabwriter.NewWriter(cmd.Stdout, 8, 8, 1, '\t', 0) - if opts.dumpIndex { + if cmd.dumpIndex { println("Index:") tw.Flush() println() @@ -121,9 +109,9 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) { var pos int for i := 0; i < keyCount; i++ { key, _ := r.KeyAt(i) - for _, e := range r.Entries(key) { + for _, e := range r.Entries(string(key)) { pos++ - split := strings.Split(key, "#!~#") + split := strings.Split(string(key), "#!~#") // We dont' know know if we have fields so use 
an informative default var measurement, field string = "UNKNOWN", "UNKNOWN" @@ -132,7 +120,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) { measurement = split[0] field = split[1] - if opts.filterKey != "" && !strings.Contains(key, opts.filterKey) { + if cmd.filterKey != "" && !strings.Contains(string(key), cmd.filterKey) { continue } fmt.Fprintln(tw, " "+strings.Join([]string{ @@ -149,7 +137,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) { } } - tw = tabwriter.NewWriter(os.Stdout, 8, 8, 1, '\t', 0) + tw = tabwriter.NewWriter(cmd.Stdout, 8, 8, 1, '\t', 0) fmt.Fprintln(tw, " "+strings.Join([]string{"Blk", "Chk", "Ofs", "Len", "Type", "Min Time", "Points", "Enc [T/V]", "Len [T/V]"}, "\t")) // Starting at 5 because the magic number is 4 bytes + 1 byte version @@ -160,7 +148,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) { // Start at the beginning and read every block for j := 0; j < keyCount; j++ { key, _ := r.KeyAt(j) - for _, e := range r.Entries(key) { + for _, e := range r.Entries(string(key)) { f.Seek(int64(e.Offset), 0) f.Read(b[:4]) @@ -172,7 +160,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) { blockSize += int64(e.Size) - if opts.filterKey != "" && !strings.Contains(key, opts.filterKey) { + if cmd.filterKey != "" && !strings.Contains(string(key), cmd.filterKey) { i += blockSize blockCount++ continue @@ -185,8 +173,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) { var v []tsm1.Value v, err := tsm1.DecodeBlock(buf, v) if err != nil { - fmt.Printf("error: %v\n", err.Error()) - os.Exit(1) + return err } startTime := time.Unix(0, v[0].UnixNano()) @@ -210,7 +197,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) { blockStats.inc(int(blockType+1), values[0]>>4) blockStats.size(len(buf)) - if opts.dumpBlocks { + if cmd.dumpBlocks { fmt.Fprintln(tw, " "+strings.Join([]string{ strconv.FormatInt(blockCount, 10), strconv.FormatUint(uint64(chksum), 10), @@ -229,7 +216,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) { } } - if opts.dumpBlocks { + if cmd.dumpBlocks { println("Blocks:") tw.Flush() println() @@ -271,5 +258,77 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) { fmt.Printf(" * %v\n", err) } println() + return fmt.Errorf("error count %d", len(errors)) + } + return nil +} + +// printUsage prints the usage message to STDERR. +func (cmd *Command) printUsage() { + usage := `Dumps low-level details about tsm1 files. 
+
+Usage: influx_inspect dumptsm [flags] <path>
+
+    -index
+            Dump raw index data
+    -blocks
+            Dump raw block data
+    -all
+            Dump all data. Caution: This may print a lot of information
+    -filter-key <name>
+            Only display index and block data matching this key substring
+`
+
+	fmt.Fprintf(cmd.Stdout, usage)
+}
+
+var (
+	fieldType = []string{
+		"timestamp", "float", "int", "bool", "string",
+	}
+	blockTypes = []string{
+		"float64", "int64", "bool", "string",
+	}
+	timeEnc = []string{
+		"none", "s8b", "rle",
+	}
+	floatEnc = []string{
+		"none", "gor",
+	}
+	intEnc = []string{
+		"none", "s8b", "rle",
+	}
+	boolEnc = []string{
+		"none", "bp",
+	}
+	stringEnc = []string{
+		"none", "snpy",
+	}
+	encDescs = [][]string{
+		timeEnc, floatEnc, intEnc, boolEnc, stringEnc,
+	}
+)
+
+type blockStats struct {
+	min, max int
+	counts   [][]int
+}
+
+func (b *blockStats) inc(typ int, enc byte) {
+	for len(b.counts) <= typ {
+		b.counts = append(b.counts, []int{})
+	}
+	for len(b.counts[typ]) <= int(enc) {
+		b.counts[typ] = append(b.counts[typ], 0)
+	}
+	b.counts[typ][enc]++
+}
+
+func (b *blockStats) size(sz int) {
+	if b.min == 0 || sz < b.min {
+		b.min = sz
+	}
+	if b.min == 0 || sz > b.max {
+		b.max = sz
+	}
+}
diff --git a/vendor/github.com/influxdata/influxdb/cmd/influx_inspect/dumptsm/dumptsm_test.go b/vendor/github.com/influxdata/influxdb/cmd/influx_inspect/dumptsm/dumptsm_test.go
new file mode 100644
index 000000000..6a01a65e2
--- /dev/null
+++ b/vendor/github.com/influxdata/influxdb/cmd/influx_inspect/dumptsm/dumptsm_test.go
@@ -0,0 +1,3 @@
+package dumptsm_test
+
+// TODO: write some tests
diff --git a/vendor/github.com/influxdata/influxdb/cmd/influx_inspect/export.go b/vendor/github.com/influxdata/influxdb/cmd/influx_inspect/export.go
deleted file mode 100644
index df8114a7e..000000000
--- a/vendor/github.com/influxdata/influxdb/cmd/influx_inspect/export.go
+++ /dev/null
@@ -1,163 +0,0 @@
-package main
-
-import (
-	"compress/gzip"
-	"fmt"
-	"io"
-	"log"
-	"os"
-	"path/filepath"
-	"strings"
-
-	"github.com/influxdata/influxdb/models"
-	"github.com/influxdata/influxdb/tsdb/engine/tsm1"
-)
-
-type cmdExport struct {
-	path            string
-	out             string
-	db              string
-	retentionPolicy string
-	compress        bool
-
-	ext   string
-	files map[string][]string
-}
-
-func newCmdExport(path, out, db, retentionPolicy string, compress bool) *cmdExport {
-	return &cmdExport{
-		path:            filepath.Join(path, "data"),
-		out:             out,
-		db:              db,
-		compress:        compress,
-		ext:             fmt.Sprintf(".%s", tsm1.TSMFileExtension),
-		retentionPolicy: retentionPolicy,
-		files:           make(map[string][]string),
-	}
-}
-
-func (c *cmdExport) validate() error {
-	// validate args
-	if c.retentionPolicy != "" && c.db == "" {
-		return fmt.Errorf("must specify a db")
-	}
-	return nil
-}
-
-func (c *cmdExport) run() error {
-	if err := c.validate(); err != nil {
-		return err
-	}
-
-	return c.export()
-}
-
-func (c *cmdExport) export() error {
-	if err := c.walkFiles(); err != nil {
-		return err
-	}
-	return c.writeFiles()
-}
-
-func (c *cmdExport) walkFiles() error {
-	err := filepath.Walk(c.path, func(path string, f os.FileInfo, err error) error {
-		if err != nil {
-			return err
-		}
-		if filepath.Ext(path) == c.ext {
-			//files = append(files, path)
-			relPath, _ := filepath.Rel(c.path, path)
-			dirs := strings.Split(relPath, string(byte(os.PathSeparator)))
-			if dirs[0] == c.db || c.db == "" {
-				if dirs[1] == c.retentionPolicy || c.retentionPolicy == "" {
-					key := filepath.Join(dirs[0], dirs[1])
-					files := c.files[key]
-					if files == nil {
-						files = []string{}
-					}
-					c.files[key] = append(files, path)
-				}
-			}
-		}
-		return nil
-	})
-	if err != nil {
-		log.Fatal(err)
-	}
-	return nil
-}
-
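For reference, the encoding tables and blockStats above work together: the dump loop reads the high nibble of a block's first byte as the encoding id (the `values[0]>>4` expression earlier), tallies it per block type, and the encDescs-style tables translate the pair back to a name. A small runnable sketch under those assumptions, where encNames and tally are illustrative stand-ins:

```go
package main

import "fmt"

// encNames mirrors the encDescs tables: row = block type
// (timestamp, float, int, bool, string), column = encoding id.
var encNames = [][]string{
	{"none", "s8b", "rle"},
	{"none", "gor"},
	{"none", "s8b", "rle"},
	{"none", "bp"},
	{"none", "snpy"},
}

// tally mirrors blockStats.inc: counts[type][encoding] grows on demand.
type tally struct{ counts [][]int }

func (t *tally) inc(typ int, enc byte) {
	for len(t.counts) <= typ {
		t.counts = append(t.counts, []int{})
	}
	for len(t.counts[typ]) <= int(enc) {
		t.counts[typ] = append(t.counts[typ], 0)
	}
	t.counts[typ][enc]++
}

func main() {
	// As in the dump loop, the high nibble of a block's first byte selects
	// the encoding; 0x20 is a hypothetical header byte meaning encoding 2.
	var first byte = 0x20
	ts := &tally{}
	ts.inc(0, first>>4)                              // type 0 = timestamps
	fmt.Println(encNames[0][first>>4], ts.counts[0]) // rle [0 0 1]
}
```

-func (c *cmdExport) writeFiles() error {
-	// open our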
output file and create an output buffer - var w io.WriteCloser - w, err := os.Create(c.out) - if err != nil { - return err - } - defer w.Close() - if c.compress { - w = gzip.NewWriter(w) - defer w.Close() - } - - // Write out all the DDL - fmt.Fprintln(w, "# DDL") - for key, _ := range c.files { - keys := strings.Split(key, string(byte(os.PathSeparator))) - fmt.Fprintf(w, "CREATE DATABASE %s\n", keys[0]) - fmt.Fprintf(w, "CREATE RETENTION POLICY %s ON %s DURATION inf REPLICATION 1\n", keys[1], keys[0]) - } - - fmt.Fprintln(w, "# DML") - for key, files := range c.files { - keys := strings.Split(key, string(byte(os.PathSeparator))) - fmt.Fprintf(w, "# CONTEXT-DATABASE:%s\n", keys[0]) - fmt.Fprintf(w, "# CONTEXT-RETENTION-POLICY:%s\n", keys[1]) - for _, f := range files { - // use an anonymous function here to close the files in the defers and not let them - // accumulate in the loop - if err := func(f string) error { - file, err := os.OpenFile(f, os.O_RDONLY, 0600) - if err != nil { - return fmt.Errorf("%v", err) - } - defer file.Close() - reader, err := tsm1.NewTSMReader(file) - if err != nil { - log.Printf("unable to read %s, skipping\n", f) - return nil - } - defer reader.Close() - - for i := 0; i < reader.KeyCount(); i++ { - var pairs string - key, typ := reader.KeyAt(i) - values, _ := reader.ReadAll(key) - measurement, field := tsm1.SeriesAndFieldFromCompositeKey(key) - - for _, value := range values { - switch typ { - case tsm1.BlockFloat64: - pairs = field + "=" + fmt.Sprintf("%v", value.Value()) - case tsm1.BlockInteger: - pairs = field + "=" + fmt.Sprintf("%vi", value.Value()) - case tsm1.BlockBoolean: - pairs = field + "=" + fmt.Sprintf("%v", value.Value()) - case tsm1.BlockString: - pairs = field + "=" + fmt.Sprintf("%q", models.EscapeStringField(fmt.Sprintf("%s", value.Value()))) - default: - pairs = field + "=" + fmt.Sprintf("%v", value.Value()) - } - - fmt.Fprintln(w, measurement, pairs, value.UnixNano()) - } - } - return nil - }(f); err != nil { - return err - } - } - _ = key - } - return nil -} diff --git a/vendor/github.com/influxdata/influxdb/cmd/influx_inspect/export/export.go b/vendor/github.com/influxdata/influxdb/cmd/influx_inspect/export/export.go new file mode 100644 index 000000000..0ca444656 --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/cmd/influx_inspect/export/export.go @@ -0,0 +1,417 @@ +package export + +import ( + "compress/gzip" + "flag" + "fmt" + "io" + "log" + "math" + "os" + "path" + "path/filepath" + "sort" + "strings" + "sync" + "time" + + "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/tsdb/engine/tsm1" +) + +// Command represents the program execution for "influx_inspect export". +type Command struct { + // Standard input/output, overridden for testing. + Stderr io.Writer + Stdout io.Writer + + dataDir string + walDir string + out string + database string + retentionPolicy string + startTime int64 + endTime int64 + compress bool + + manifest map[string]struct{} + tsmFiles map[string][]string + walFiles map[string][]string +} + +// NewCommand returns a new instance of Command. +func NewCommand() *Command { + return &Command{ + Stderr: os.Stderr, + Stdout: os.Stdout, + + manifest: make(map[string]struct{}), + tsmFiles: make(map[string][]string), + walFiles: make(map[string][]string), + } +} + +// Run executes the command. 
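Both rewritten commands follow the same shape: a Command struct with injectable Stdout/Stderr writers and a Run(args ...string) error entry point, so tests can capture output and assert on returned errors instead of intercepting os.Exit. A minimal sketch of that pattern, where demoCmd and its -out flag are illustrative rather than the vendored API:

```go
package main

import (
	"bytes"
	"flag"
	"fmt"
	"io"
	"os"
)

// demoCmd sketches the command pattern: writers are injectable for tests
// and Run returns an error instead of exiting the process.
type demoCmd struct {
	Stderr io.Writer
	Stdout io.Writer
	out    string
}

func (c *demoCmd) Run(args ...string) error {
	fs := flag.NewFlagSet("demo", flag.ContinueOnError)
	fs.StringVar(&c.out, "out", "export", "Destination file")
	fs.SetOutput(c.Stdout)
	if err := fs.Parse(args); err != nil {
		return err
	}
	fmt.Fprintf(c.Stdout, "would export to %s\n", c.out)
	return nil
}

func main() {
	// In production wire up os.Stdout; in a test, a bytes.Buffer works too.
	var buf bytes.Buffer
	cmd := &demoCmd{Stderr: os.Stderr, Stdout: &buf}
	if err := cmd.Run("-out", "/tmp/export.lp"); err != nil {
		fmt.Fprintln(cmd.Stderr, err)
		os.Exit(1)
	}
	fmt.Print(buf.String())
}
```

Note the sketch uses flag.ContinueOnError so a parse failure surfaces as an error; the vendored commands use flag.ExitOnError.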
+func (cmd *Command) Run(args ...string) error { + var start, end string + fs := flag.NewFlagSet("export", flag.ExitOnError) + fs.StringVar(&cmd.dataDir, "datadir", os.Getenv("HOME")+"/.influxdb/data", "Data storage path. [$HOME/.influxdb/data]") + fs.StringVar(&cmd.walDir, "waldir", os.Getenv("HOME")+"/.influxdb/wal", "Wal storage path. [$HOME/.influxdb/wal]") + fs.StringVar(&cmd.out, "out", os.Getenv("HOME")+"/.influxdb/export", "Destination file to export to") + fs.StringVar(&cmd.database, "database", "", "Optional: the database to export") + fs.StringVar(&cmd.retentionPolicy, "retention", "", "Optional: the retention policy to export (requires db parameter to be specified)") + fs.StringVar(&start, "start", "", "Optional: the start time to export") + fs.StringVar(&end, "end", "", "Optional: the end time to export") + fs.BoolVar(&cmd.compress, "compress", false, "Compress the output") + + fs.SetOutput(cmd.Stdout) + fs.Usage = cmd.printUsage + + if err := fs.Parse(args); err != nil { + return err + } + + // set defaults + if start != "" { + s, err := time.Parse(time.RFC3339, start) + if err != nil { + return err + } + cmd.startTime = s.UnixNano() + } else { + cmd.startTime = math.MinInt64 + } + if end != "" { + e, err := time.Parse(time.RFC3339, end) + if err != nil { + return err + } + cmd.endTime = e.UnixNano() + } else { + // set end time to max if it is not set. + cmd.endTime = math.MaxInt64 + } + + if err := cmd.validate(); err != nil { + return err + } + + return cmd.export() +} + +func (cmd *Command) validate() error { + // validate args + if cmd.retentionPolicy != "" && cmd.database == "" { + return fmt.Errorf("must specify a db") + } + if cmd.startTime != 0 && cmd.endTime != 0 && cmd.endTime < cmd.startTime { + return fmt.Errorf("end time before start time") + } + return nil +} + +func (cmd *Command) export() error { + if err := cmd.walkTSMFiles(); err != nil { + return err + } + if err := cmd.walkWALFiles(); err != nil { + return err + } + return cmd.writeFiles() +} + +func (cmd *Command) walkTSMFiles() error { + err := filepath.Walk(cmd.dataDir, func(dir string, f os.FileInfo, err error) error { + if err != nil { + return err + } + + // check to see if this is a tsm file + ext := fmt.Sprintf(".%s", tsm1.TSMFileExtension) + if filepath.Ext(dir) != ext { + return nil + } + + relPath, _ := filepath.Rel(cmd.dataDir, dir) + dirs := strings.Split(relPath, string(byte(os.PathSeparator))) + if len(dirs) < 2 { + return fmt.Errorf("invalid directory structure for %s", dir) + } + if dirs[0] == cmd.database || cmd.database == "" { + if dirs[1] == cmd.retentionPolicy || cmd.retentionPolicy == "" { + key := filepath.Join(dirs[0], dirs[1]) + files := cmd.tsmFiles[key] + if files == nil { + files = []string{} + } + cmd.manifest[key] = struct{}{} + cmd.tsmFiles[key] = append(files, dir) + } + } + return nil + }) + if err != nil { + return err + } + return nil +} + +func (cmd *Command) walkWALFiles() error { + err := filepath.Walk(cmd.walDir, func(dir string, f os.FileInfo, err error) error { + if err != nil { + return err + } + + // check to see if this is a wal file + prefix := tsm1.WALFilePrefix + ext := fmt.Sprintf(".%s", tsm1.WALFileExtension) + _, fileName := path.Split(dir) + if filepath.Ext(dir) != ext || !strings.HasPrefix(fileName, prefix) { + return nil + } + + relPath, _ := filepath.Rel(cmd.walDir, dir) + dirs := strings.Split(relPath, string(byte(os.PathSeparator))) + if len(dirs) < 2 { + return fmt.Errorf("invalid directory structure for %s", dir) + } + if dirs[0] == cmd.database 
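The Run method above turns the optional -start/-end RFC3339 flags into nanosecond bounds, defaulting to math.MinInt64 and math.MaxInt64 so an unset bound matches everything; the file walkers then only have to compare UnixNano values. A runnable sketch of just that conversion, where timeBounds is an illustrative helper name:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// timeBounds mirrors the flag handling: an empty start or end string leaves
// the corresponding bound wide open (MinInt64/MaxInt64 nanoseconds).
func timeBounds(start, end string) (int64, int64, error) {
	startNs, endNs := int64(math.MinInt64), int64(math.MaxInt64)
	if start != "" {
		s, err := time.Parse(time.RFC3339, start)
		if err != nil {
			return 0, 0, err
		}
		startNs = s.UnixNano()
	}
	if end != "" {
		e, err := time.Parse(time.RFC3339, end)
		if err != nil {
			return 0, 0, err
		}
		endNs = e.UnixNano()
	}
	return startNs, endNs, nil
}

func main() {
	s, e, err := timeBounds("2015-10-18T00:00:00Z", "")
	fmt.Println(s, e, err) // 1445126400000000000 9223372036854775807 <nil>
}
```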
|| cmd.database == "" { + if dirs[1] == cmd.retentionPolicy || cmd.retentionPolicy == "" { + key := filepath.Join(dirs[0], dirs[1]) + files := cmd.walFiles[key] + if files == nil { + files = []string{} + } + cmd.manifest[key] = struct{}{} + cmd.walFiles[key] = append(files, dir) + } + } + return nil + }) + if err != nil { + return err + } + return nil +} + +func (cmd *Command) writeFiles() error { + // open our output file and create an output buffer + var w io.WriteCloser + w, err := os.Create(cmd.out) + if err != nil { + return err + } + defer w.Close() + if cmd.compress { + w = gzip.NewWriter(w) + defer w.Close() + } + + s, e := time.Unix(0, cmd.startTime).Format(time.RFC3339), time.Unix(0, cmd.endTime).Format(time.RFC3339) + fmt.Fprintf(w, "# INFLUXDB EXPORT: %s - %s\n", s, e) + + // Write out all the DDL + fmt.Fprintln(w, "# DDL") + for key := range cmd.manifest { + keys := strings.Split(key, string(byte(os.PathSeparator))) + db, rp := influxql.QuoteIdent(keys[0]), influxql.QuoteIdent(keys[1]) + fmt.Fprintf(w, "CREATE DATABASE %s WITH NAME %s\n", db, rp) + } + + fmt.Fprintln(w, "# DML") + for key := range cmd.manifest { + keys := strings.Split(key, string(byte(os.PathSeparator))) + fmt.Fprintf(w, "# CONTEXT-DATABASE:%s\n", keys[0]) + fmt.Fprintf(w, "# CONTEXT-RETENTION-POLICY:%s\n", keys[1]) + if files, ok := cmd.tsmFiles[key]; ok { + fmt.Printf("writing out tsm file data for %s...", key) + if err := cmd.writeTsmFiles(w, files); err != nil { + return err + } + fmt.Println("complete.") + } + if _, ok := cmd.walFiles[key]; ok { + fmt.Printf("writing out wal file data for %s...", key) + if err := cmd.writeWALFiles(w, cmd.walFiles[key], key); err != nil { + return err + } + fmt.Println("complete.") + } + } + return nil +} + +func (cmd *Command) writeTsmFiles(w io.WriteCloser, files []string) error { + fmt.Fprintln(w, "# writing tsm data") + + // we need to make sure we write the same order that the files were written + sort.Strings(files) + + // use a function here to close the files in the defers and not let them accumulate in the loop + write := func(f string) error { + file, err := os.OpenFile(f, os.O_RDONLY, 0600) + if err != nil { + return fmt.Errorf("%v", err) + } + defer file.Close() + reader, err := tsm1.NewTSMReader(file) + if err != nil { + log.Printf("unable to read %s, skipping\n", f) + return nil + } + defer reader.Close() + + if sgStart, sgEnd := reader.TimeRange(); sgStart > cmd.endTime || sgEnd < cmd.startTime { + return nil + } + + for i := 0; i < reader.KeyCount(); i++ { + var pairs string + key, typ := reader.KeyAt(i) + values, _ := reader.ReadAll(string(key)) + measurement, field := tsm1.SeriesAndFieldFromCompositeKey(key) + + for _, value := range values { + if (value.UnixNano() < cmd.startTime) || (value.UnixNano() > cmd.endTime) { + continue + } + + switch typ { + case tsm1.BlockFloat64: + pairs = field + "=" + fmt.Sprintf("%v", value.Value()) + case tsm1.BlockInteger: + pairs = field + "=" + fmt.Sprintf("%vi", value.Value()) + case tsm1.BlockBoolean: + pairs = field + "=" + fmt.Sprintf("%v", value.Value()) + case tsm1.BlockString: + pairs = field + "=" + fmt.Sprintf("%q", models.EscapeStringField(fmt.Sprintf("%s", value.Value()))) + default: + pairs = field + "=" + fmt.Sprintf("%v", value.Value()) + } + + fmt.Fprintln(w, string(measurement), pairs, value.UnixNano()) + } + } + return nil + } + + for _, f := range files { + if err := write(f); err != nil { + return err + } + } + + return nil +} + +func (cmd *Command) writeWALFiles(w io.WriteCloser, files []string, 
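The writeTsmFiles logic above renders each value as line protocol, switching on the block type: integers get the "i" suffix, strings are escaped then quoted, and floats and bools use the default formatting. A self-contained sketch of that switch, where escapeStringField is a simplified stand-in for models.EscapeStringField:

```go
package main

import (
	"fmt"
	"strings"
)

// escapeStringField is a simplified stand-in for models.EscapeStringField:
// it escapes backslashes and double quotes in a string field value.
func escapeStringField(in string) string {
	return strings.NewReplacer(`\`, `\\`, `"`, `\"`).Replace(in)
}

// formatField mirrors the type switch: int64 values get the "i" suffix,
// strings are escaped and quoted, everything else prints with %v.
func formatField(field string, v interface{}) string {
	switch v := v.(type) {
	case int64:
		return fmt.Sprintf("%s=%vi", field, v)
	case string:
		return fmt.Sprintf("%s=%q", field, escapeStringField(v))
	default: // float64, bool, ...
		return fmt.Sprintf("%s=%v", field, v)
	}
}

func main() {
	// Prints: packets value=1000i 1445126400000000000
	fmt.Println("packets", formatField("value", int64(1000)), int64(1445126400000000000))
}
```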
key string) error { + fmt.Fprintln(w, "# writing wal data") + + // we need to make sure we write the same order that the wal received the data + sort.Strings(files) + + var once sync.Once + warn := func() { + msg := fmt.Sprintf(`WARNING: detected deletes in wal file. + Some series for %q may be brought back by replaying this data. + To resolve, you can either let the shard snapshot prior to exporting the data + or manually editing the exported file. + `, key) + fmt.Fprintln(cmd.Stderr, msg) + } + + // use a function here to close the files in the defers and not let them accumulate in the loop + write := func(f string) error { + file, err := os.OpenFile(f, os.O_RDONLY, 0600) + if err != nil { + return fmt.Errorf("%v", err) + } + defer file.Close() + + reader := tsm1.NewWALSegmentReader(file) + defer reader.Close() + for reader.Next() { + entry, err := reader.Read() + if err != nil { + n := reader.Count() + fmt.Fprintf(os.Stderr, "file %s corrupt at position %d", file.Name(), n) + break + } + + switch t := entry.(type) { + case *tsm1.DeleteWALEntry: + once.Do(warn) + continue + case *tsm1.DeleteRangeWALEntry: + once.Do(warn) + continue + case *tsm1.WriteWALEntry: + var pairs string + + for key, values := range t.Values { + measurement, field := tsm1.SeriesAndFieldFromCompositeKey([]byte(key)) + + for _, value := range values { + if (value.UnixNano() < cmd.startTime) || (value.UnixNano() > cmd.endTime) { + continue + } + + switch value.Value().(type) { + case float64: + pairs = field + "=" + fmt.Sprintf("%v", value.Value()) + case int64: + pairs = field + "=" + fmt.Sprintf("%vi", value.Value()) + case bool: + pairs = field + "=" + fmt.Sprintf("%v", value.Value()) + case string: + pairs = field + "=" + fmt.Sprintf("%q", models.EscapeStringField(fmt.Sprintf("%s", value.Value()))) + default: + pairs = field + "=" + fmt.Sprintf("%v", value.Value()) + } + fmt.Fprintln(w, string(measurement), pairs, value.UnixNano()) + } + } + } + } + return nil + } + + for _, f := range files { + if err := write(f); err != nil { + return err + } + } + + return nil +} + +// printUsage prints the usage message to STDERR. +func (cmd *Command) printUsage() { + usage := fmt.Sprintf(`Exports TSM files into InfluxDB line protocol format. + +Usage: influx_inspect export [flags] + + -datadir + Data storage path + Defaults to "%[1]s/.influxdb/data". + -waldir + WAL storage path + Defaults to "%[1]s/.influxdb/wal". + -out + Destination file to export to. + Defaults to "%[1]s/.influxdb/export". + -database + Optional. Database to export. + -retention + Optional. the retention policy to export (requires db parameter to be specified). + -start-time
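The sync.Once/warn pairing in writeWALFiles above guarantees the delete warning prints at most once per exported shard, no matter how many delete or delete-range entries the WAL contains; every once.Do(warn) after the first is a no-op. A runnable sketch of that pattern with hypothetical entry names:

```go
package main

import (
	"fmt"
	"os"
	"sync"
)

func main() {
	var once sync.Once
	warn := func() {
		fmt.Fprintln(os.Stderr, "WARNING: detected deletes in wal file.")
	}

	// Hypothetical WAL entry kinds standing in for *tsm1.WriteWALEntry,
	// *tsm1.DeleteWALEntry, and *tsm1.DeleteRangeWALEntry.
	entries := []string{"write", "delete", "write", "delete-range"}
	for _, e := range entries {
		if e == "delete" || e == "delete-range" {
			once.Do(warn) // fires only for the first delete entry
			continue
		}
		fmt.Println("exporting", e)
	}
}
```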