diff --git a/CHANGELOG.md b/CHANGELOG.md index 824bb8a22..73fcea2e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -110,6 +110,47 @@ On startup Kapacitor will detect the change and recreate the subscriptions in In >NOTE: While HTTP itself is a TCP transport such that packet loss shouldn't be an issue, if Kapacitor starts to slow down for whatever reason, InfluxDB will drop the subscription writes to Kapacitor. In order to know if subscription writes are being dropped you should monitor the measurement `_internal.monitor.subscriber` for the field `writeFailures`. +#### Holt-Winters Forecasting + +This release contains an new Holt Winters InfluxQL function. + +With this forecasting method one can now define an alert based off forecasted future values. + +For example, the following TICKscript will take the last 30 days of disk usage stats and using holt-winters forecast the next 7 days. +If the forecasted value crosses a threshold an alert is triggered. + +The result is now Kapacitor will alert you 7 days in advance of a disk filling up. +This assumes a slow growth but by changing the vars in the script you could check for shorter growth intervals. + +``` +// The interval on which to aggregate the disk usage +var growth_interval = 1d +// The number of `growth_interval`s to forecast into the future +var forecast_count = 7 +// The amount of historical data to use for the fit +var history = 30d + +// The critical threshold on used_percent +var threshold = 90.0 + +batch + |query(''' + SELECT max(used_percent) as used_percent + FROM "telegraf"."default"."disk" +''') + .period(history) + .every(growth_interval) + .align() + .groupBy(time(growth_interval), *) + |holtWinters('used_percent', forecast_count, 0, growth_interval) + .as('used_percent') + |max('used_percent') + .as('used_percent') + |alert() + // Trigger alert if the forecasted disk usage is greater than threshold + .crit(lambda: "used_percent" > threshold) +``` + ### Features @@ -124,6 +165,7 @@ In order to know if subscription writes are being dropped you should monitor the - [#416](https://github.com/influxdata/kapacitor/issues/416): Track ingress counts by database, retention policy, and measurement. Expose stats via cli. - [#586](https://github.com/influxdata/kapacitor/pull/586): Add spread stateful function. thanks @upccup! - [#600](https://github.com/influxdata/kapacitor/pull/600): Add close http response after handler laert post, thanks @jsvisa! +- [#606](https://github.com/influxdata/kapacitor/pull/606): Add Holt-Winters forecasting method. ### Bugfixes diff --git a/pipeline/influxql.go b/pipeline/influxql.go index ece5d2a54..6dc3a9e9e 100644 --- a/pipeline/influxql.go +++ b/pipeline/influxql.go @@ -401,3 +401,28 @@ func (n *chainnode) Elapsed(field string, unit time.Duration) *InfluxQLNode { n.linkChild(i) return i } + +// Compute the holt-winters forecast of a data set. +func (n *chainnode) HoltWinters(field string, h, m int64, interval time.Duration) *InfluxQLNode { + return n.holtWinters(field, h, m, interval, false) +} + +// Compute the holt-winters forecast of a data set. +func (n *chainnode) HoltWintersWithFit(field string, h, m int64, interval time.Duration) *InfluxQLNode { + return n.holtWinters(field, h, m, interval, true) +} + +func (n *chainnode) holtWinters(field string, h, m int64, interval time.Duration, includeFitData bool) *InfluxQLNode { + i := newInfluxQLNode("holt_winters", field, n.Provides(), BatchEdge, ReduceCreater{ + CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { + fn := influxql.NewFloatHoltWintersReducer(int(h), int(m), includeFitData, interval) + return fn, fn + }, + CreateIntegerFloatReducer: func() (influxql.IntegerPointAggregator, influxql.FloatPointEmitter) { + fn := influxql.NewFloatHoltWintersReducer(int(h), int(m), includeFitData, interval) + return fn, fn + }, + }) + n.linkChild(i) + return i +} diff --git a/vendor.yml b/vendor.yml index ccfc737cc..f3b9f59b3 100644 --- a/vendor.yml +++ b/vendor.yml @@ -8,7 +8,7 @@ vendors: - path: github.com/davecgh/go-spew rev: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d - path: github.com/dustin/go-humanize - rev: 88e58c26e9fe8ac578a0d76a68e32838acf17a8d + rev: 499693e27ee0d14ffab67c31ad065fdb3d34ea75 - path: github.com/gogo/protobuf rev: 7883e1468d48d969e1c3ce4bcde89b6a7dd4adc4 - path: github.com/golang/protobuf @@ -16,7 +16,7 @@ vendors: - path: github.com/gorhill/cronexpr rev: f0984319b44273e83de132089ae42b1810f4933b - path: github.com/influxdata/influxdb - rev: 6e0c5698c1dc33064a1aeb59d0caf8fa88c2edb8 + rev: 128b07e352a324c90b48d386cad4efb75f56a0d0 - path: github.com/influxdata/wlog rev: 7c63b0a71ef8300adc255344d275e10e5c3a71ec - path: github.com/influxdb/usage-client @@ -42,7 +42,7 @@ vendors: - path: github.com/stretchr/testify rev: 8d64eb7173c7753d6419fd4a9caf057398611364 - path: github.com/twinj/uuid - rev: 1ec75364d0b109a59493653451144657b8b0698d + rev: 5a4b9dcb2a5e9eaba079cd853d275582fc764505 - path: golang.org/x/crypto rev: 5bcd134fee4dd1475da17714aac19c0aa0142e2f - path: golang.org/x/sys diff --git a/vendor/github.com/dustin/go-humanize/README.markdown b/vendor/github.com/dustin/go-humanize/README.markdown index 5fcdfa419..23dfee0ac 100644 --- a/vendor/github.com/dustin/go-humanize/README.markdown +++ b/vendor/github.com/dustin/go-humanize/README.markdown @@ -1,4 +1,4 @@ -# Humane Units +# Humane Units [![Build Status](https://travis-ci.org/dustin/go-humanize.svg?branch=master)](https://travis-ci.org/dustin/go-humanize) [![GoDoc](https://godoc.org/github.com/dustin/go-humanize?status.svg)](https://godoc.org/github.com/dustin/go-humanize) Just a few functions for helping humanize times and sizes. diff --git a/vendor/github.com/dustin/go-humanize/comma_go15.go b/vendor/github.com/dustin/go-humanize/commaf.go similarity index 97% rename from vendor/github.com/dustin/go-humanize/comma_go15.go rename to vendor/github.com/dustin/go-humanize/commaf.go index 9d27cdc3d..620690dec 100644 --- a/vendor/github.com/dustin/go-humanize/comma_go15.go +++ b/vendor/github.com/dustin/go-humanize/commaf.go @@ -1,4 +1,4 @@ -// +build go1.5 +// +build go1.6 package humanize diff --git a/vendor/github.com/influxdata/influxdb/influxql/functions.go b/vendor/github.com/influxdata/influxdb/influxql/functions.go index 271cbfd85..c9f5af952 100644 --- a/vendor/github.com/influxdata/influxdb/influxql/functions.go +++ b/vendor/github.com/influxdata/influxdb/influxql/functions.go @@ -322,14 +322,6 @@ func (r *IntegerMovingAverageReducer) Emit() []FloatPoint { // 1. Using the series the initial values are calculated using a SSE. // 2. The series is forecasted into the future using the iterative relations. type FloatHoltWintersReducer struct { - // Smoothing parameters - alpha, - beta, - gamma float64 - - // Dampening parameter - phi float64 - // Season period m int seasonal bool @@ -355,11 +347,19 @@ type FloatHoltWintersReducer struct { } const ( - defaultAlpha = 0.5 - defaultBeta = 0.5 - defaultGamma = 0.5 - defaultPhi = 0.5 - defaultEpsilon = 1.0e-4 + // Arbitrary weight for initializing some intial guesses. + // This should be in the range [0,1] + hwWeight = 0.5 + // Epsilon value for the minimization process + hwDefaultEpsilon = 1.0e-4 + // Define a grid of initial guesses for the parameters: alpha, beta, gamma, and phi. + // Keep in mind that this grid is N^4 so we should keep N small + // The starting lower guess + hwGuessLower = 0.3 + // The upper bound on the grid + hwGuessUpper = 1.0 + // The step between guesses + hwGuessStep = 0.4 ) // NewFloatHoltWintersReducer creates a new FloatHoltWintersReducer. @@ -369,10 +369,6 @@ func NewFloatHoltWintersReducer(h, m int, includeFitData bool, interval time.Dur seasonal = false } return &FloatHoltWintersReducer{ - alpha: defaultAlpha, - beta: defaultBeta, - gamma: defaultGamma, - phi: defaultPhi, h: h, m: m, seasonal: seasonal, @@ -380,7 +376,7 @@ func NewFloatHoltWintersReducer(h, m int, includeFitData bool, interval time.Dur interval: int64(interval), halfInterval: int64(interval) / 2, optim: neldermead.New(), - epsilon: defaultEpsilon, + epsilon: hwDefaultEpsilon, } } @@ -442,15 +438,9 @@ func (r *FloatHoltWintersReducer) Emit() []FloatPoint { r.y = append(r.y, p.Value) } - // Smoothing parameters - alpha, beta, gamma := r.alpha, r.beta, r.gamma - // Seasonality m := r.m - // Dampening paramter - phi := r.phi - // Starting guesses // NOTE: Since these values are guesses // in the cases where we were missing data, @@ -464,7 +454,7 @@ func (r *FloatHoltWintersReducer) Emit() []FloatPoint { } } } else { - l_0 += alpha * r.y[0] + l_0 += hwWeight * r.y[0] } b_0 := 0.0 @@ -476,7 +466,7 @@ func (r *FloatHoltWintersReducer) Emit() []FloatPoint { } } else { if !math.IsNaN(r.y[1]) { - b_0 = beta * (r.y[1] - r.y[0]) + b_0 = hwWeight * (r.y[1] - r.y[0]) } } @@ -493,10 +483,6 @@ func (r *FloatHoltWintersReducer) Emit() []FloatPoint { } parameters := make([]float64, 6+len(s)) - parameters[0] = alpha - parameters[1] = beta - parameters[2] = gamma - parameters[3] = phi parameters[4] = l_0 parameters[5] = b_0 o := len(parameters) - len(s) @@ -505,28 +491,51 @@ func (r *FloatHoltWintersReducer) Emit() []FloatPoint { } // Determine best fit for the various parameters - _, params := r.optim.Optimize(r.sse, parameters, r.epsilon, 1, r.constrain) + minSSE := math.Inf(1) + var bestParams []float64 + for alpha := hwGuessLower; alpha < hwGuessUpper; alpha += hwGuessStep { + for beta := hwGuessLower; beta < hwGuessUpper; beta += hwGuessStep { + for gamma := hwGuessLower; gamma < hwGuessUpper; gamma += hwGuessStep { + for phi := hwGuessLower; phi < hwGuessUpper; phi += hwGuessStep { + parameters[0] = alpha + parameters[1] = beta + parameters[2] = gamma + parameters[3] = phi + sse, params := r.optim.Optimize(r.sse, parameters, r.epsilon, 1) + if sse < minSSE || bestParams == nil { + minSSE = sse + bestParams = params + } + } + } + } + } // Forecast - forecasted := r.forecast(r.h, params) + forecasted := r.forecast(r.h, bestParams) var points []FloatPoint if r.includeFitData { - points = make([]FloatPoint, len(forecasted)) + start := r.points[0].Time + points = make([]FloatPoint, 0, len(forecasted)) for i, v := range forecasted { - t := start + r.interval*(int64(i)) - points[i] = FloatPoint{ - Value: v, - Time: t, + if !math.IsNaN(v) { + t := start + r.interval*(int64(i)) + points = append(points, FloatPoint{ + Value: v, + Time: t, + }) } } } else { - points = make([]FloatPoint, r.h) - forecasted := r.forecast(r.h, params) + stop := r.points[len(r.points)-1].Time + points = make([]FloatPoint, 0, r.h) for i, v := range forecasted[len(r.y):] { - t := stop + r.interval*(int64(i)+1) - points[i] = FloatPoint{ - Value: v, - Time: t, + if !math.IsNaN(v) { + t := stop + r.interval*(int64(i)+1) + points = append(points, FloatPoint{ + Value: v, + Time: t, + }) } } } @@ -546,6 +555,9 @@ func (r *FloatHoltWintersReducer) next(alpha, beta, gamma, phi, phi_h, y_t, l_tp // Forecast the data h points into the future. func (r *FloatHoltWintersReducer) forecast(h int, params []float64) []float64 { + // Constrain parameters + r.constrain(params) + y_t := r.y[0] phi := params[3] @@ -575,7 +587,7 @@ func (r *FloatHoltWintersReducer) forecast(h int, params []float64) []float64 { stm, stmh := 1.0, 1.0 for t := 1; t < l+h; t++ { if r.seasonal { - hm = (t - 1) % m + hm = t % m stm = seasonals[(t-m+so)%m] stmh = seasonals[(t-m+hm+so)%m] } @@ -594,8 +606,8 @@ func (r *FloatHoltWintersReducer) forecast(h int, params []float64) []float64 { phi_h += math.Pow(phi, float64(t)) if r.seasonal { - so++ seasonals[(t+so)%m] = s_t + so++ } forecasted[t] = y_t @@ -611,8 +623,13 @@ func (r *FloatHoltWintersReducer) sse(params []float64) float64 { // Skip missing values since we cannot use them to compute an error. if !math.IsNaN(r.y[i]) { // Compute error - diff := forecasted[i] - r.y[i] - sse += diff * diff + if math.IsNaN(forecasted[i]) { + // Penalize forecasted NaNs + return math.Inf(1) + } else { + diff := forecasted[i] - r.y[i] + sse += diff * diff + } } } return sse diff --git a/vendor/github.com/influxdata/influxdb/influxql/iterator.go b/vendor/github.com/influxdata/influxdb/influxql/iterator.go index 4ee2f75c7..92dcb1f69 100644 --- a/vendor/github.com/influxdata/influxdb/influxql/iterator.go +++ b/vendor/github.com/influxdata/influxdb/influxql/iterator.go @@ -112,6 +112,38 @@ func (a Iterators) cast() interface{} { return a } +// Merge combines all iterators into a single iterator. +// A sorted merge iterator or a merge iterator can be used based on opt. +func (a Iterators) Merge(opt IteratorOptions) (Iterator, error) { + // Merge into a single iterator. + if opt.MergeSorted() { + itr := NewSortedMergeIterator(a, opt) + if itr != nil && opt.InterruptCh != nil { + itr = NewInterruptIterator(itr, opt.InterruptCh) + } + return itr, nil + } + + itr := NewMergeIterator(a, opt) + if itr == nil { + return nil, nil + } + + if opt.Expr != nil { + if expr, ok := opt.Expr.(*Call); ok && expr.Name == "count" { + opt.Expr = &Call{ + Name: "sum", + Args: expr.Args, + } + } + } + + if opt.InterruptCh != nil { + itr = NewInterruptIterator(itr, opt.InterruptCh) + } + return NewCallIterator(itr, opt) +} + // NewMergeIterator returns an iterator to merge itrs into one. // Inputs must either be merge iterators or only contain a single name/tag in // sorted order. The iterator will output all points by window, name/tag, then @@ -578,32 +610,7 @@ func (a IteratorCreators) CreateIterator(opt IteratorOptions) (Iterator, error) return nil, err } - // Merge into a single iterator. - if opt.MergeSorted() { - itr := NewSortedMergeIterator(itrs, opt) - if itr != nil && opt.InterruptCh != nil { - itr = NewInterruptIterator(itr, opt.InterruptCh) - } - return itr, nil - } - - itr := NewMergeIterator(itrs, opt) - if itr != nil { - if opt.Expr != nil { - if expr, ok := opt.Expr.(*Call); ok && expr.Name == "count" { - opt.Expr = &Call{ - Name: "sum", - Args: expr.Args, - } - } - } - - if opt.InterruptCh != nil { - itr = NewInterruptIterator(itr, opt.InterruptCh) - } - return NewCallIterator(itr, opt) - } - return nil, nil + return Iterators(itrs).Merge(opt) } // FieldDimensions returns unique fields and dimensions from multiple iterator creators. @@ -1259,7 +1266,11 @@ func (itr *floatFastDedupeIterator) Next() (*FloatPoint, error) { } // If the point has already been output then move to the next point. - key := fastDedupeKey{p.Name, p.Aux[0]} + key := fastDedupeKey{name: p.Name} + key.values[0] = p.Aux[0] + if len(p.Aux) > 1 { + key.values[1] = p.Aux[1] + } if _, ok := itr.m[key]; ok { continue } @@ -1271,8 +1282,8 @@ func (itr *floatFastDedupeIterator) Next() (*FloatPoint, error) { } type fastDedupeKey struct { - name string - value interface{} + name string + values [2]interface{} } type reverseStringSlice []string diff --git a/vendor/github.com/influxdata/influxdb/influxql/neldermead/neldermead.go b/vendor/github.com/influxdata/influxdb/influxql/neldermead/neldermead.go index e270cea55..6b6829289 100644 --- a/vendor/github.com/influxdata/influxdb/influxql/neldermead/neldermead.go +++ b/vendor/github.com/influxdata/influxdb/influxql/neldermead/neldermead.go @@ -38,7 +38,6 @@ func (o *Optimizer) Optimize( start []float64, epsilon, scale float64, - constrain func([]float64), ) (float64, []float64) { n := len(start) @@ -83,10 +82,6 @@ func (o *Optimizer) Optimize( } } - if constrain != nil { - constrain(v[n]) - } - // find the initial function values for j := 0; j <= n; j++ { f[j] = objfunc(v[j]) @@ -129,9 +124,6 @@ func (o *Optimizer) Optimize( for i := 0; i <= n-1; i++ { vr[i] = vm[i] + o.Alpha*(vm[i]-v[vg][i]) } - if constrain != nil { - constrain(vr) - } // value of function at reflection point fr := objfunc(vr) @@ -148,9 +140,6 @@ func (o *Optimizer) Optimize( for i := 0; i <= n-1; i++ { ve[i] = vm[i] + o.Gamma*(vr[i]-vm[i]) } - if constrain != nil { - constrain(ve) - } // value of function at expansion point fe := objfunc(ve) @@ -186,10 +175,6 @@ func (o *Optimizer) Optimize( } } - if constrain != nil { - constrain(vc) - } - // value of function at contraction point fc := objfunc(vc) @@ -210,17 +195,7 @@ func (o *Optimizer) Optimize( } } } - - if constrain != nil { - constrain(v[vg]) - } - f[vg] = objfunc(v[vg]) - - if constrain != nil { - constrain(v[vh]) - } - f[vh] = objfunc(v[vh]) } } diff --git a/vendor/github.com/influxdata/influxdb/influxql/select.go b/vendor/github.com/influxdata/influxdb/influxql/select.go index b7dffe000..843dbac30 100644 --- a/vendor/github.com/influxdata/influxdb/influxql/select.go +++ b/vendor/github.com/influxdata/influxdb/influxql/select.go @@ -104,9 +104,13 @@ func buildAuxIterators(fields Fields, ic IteratorCreator, opt IteratorOptions) ( // Filter out duplicate rows, if required. if opt.Dedupe { - // If there is no group by and it's a single field then fast dedupe. - if itr, ok := input.(FloatIterator); ok && len(fields) == 1 && len(opt.Dimensions) == 0 { - input = newFloatFastDedupeIterator(itr) + // If there is no group by and it is a float iterator, see if we can use a fast dedupe. + if itr, ok := input.(FloatIterator); ok && len(opt.Dimensions) == 0 { + if sz := len(fields); sz > 0 && sz < 3 { + input = newFloatFastDedupeIterator(itr) + } else { + input = NewDedupeIterator(itr) + } } else { input = NewDedupeIterator(input) } diff --git a/vendor/github.com/twinj/uuid/README.md b/vendor/github.com/twinj/uuid/README.md index 21d7f1459..890806a16 100644 --- a/vendor/github.com/twinj/uuid/README.md +++ b/vendor/github.com/twinj/uuid/README.md @@ -1,11 +1,11 @@ Go UUID implementation ======================== -[![Build Status]()](https://ci.appveyor.com/api/projects/status/github/twinj/uuid?branch=master&svg=true) +[![Build Status](https://ci.appveyor.com/api/projects/status/github/twinj/uuid?branch=master&svg=true)](https://ci.appveyor.com/project/twinj/uuid) [![Build Status](https://travis-ci.org/twinj/uuid.png?branch=master)](https://travis-ci.org/twinj/uuid) [![GoDoc](http://godoc.org/github.com/twinj/uuid?status.png)](http://godoc.org/github.com/twinj/uuid) -This package provides RFC 4122 compliant UUIDs. +This package provides RFC 4122 and DCE 1.1 compliant UUIDs. It will generate the following: * Version 1: based on a Timestamp and MAC address as Node id @@ -15,33 +15,32 @@ It will generate the following: * Version 5: based on SHA-1 hash * Your own implementations -Functions NewV1, NewV3, NewV4, NewV5, New, NewHex and Parse() for generating versions 1, 3, 4 -and 5 UUIDs are as specified in [RFC 4122](http://www.ietf.org/rfc/rfc4122.txt). +Functions NewV1, NewV2, NewV3, NewV4, NewV5, New, NewHex and Parse() for generating version 1, 2, 3, 4 +and 5 UUIDs # Requirements -Go 1.6, 1.5, 1.4, 1.3, 1.2. +Any supported version of Go. # Design considerations * Ensure UUIDs are unique across a use case - Proper test coverage has determined that the UUID timestamp spinner works correctly -* the generator should work on all app servers. + Proper test coverage has determined that the UUID timestamp spinner works correctly, cross multiple clock resolutions +* Generator should work on all app servers. No Os locking threads or file system dependant storage. Saver interface exists for the user to provide their own Saver implementations for V1 and V2 UUIDs. The interface could theoretically be applied to your own UUID implementation. Have provided a saver which works on a standard OS environment. - New implementations for app servers which stop -* UUIDs +* Allow user implementations # Future considerations -* allows open ended user implementations -* allow open ended generation of UUID via various node id's +* length and format should not be an issue +* # Recent Changes -* Improved file system Saver interface, breaking changes, however. +* Improved file system Saver interface, breaking changes. To use a saver make sure you pass it in via the uuid.SetupSaver(Saver) method before a UUID is generated, so as to take affect. * Removed use of OS Thread locking and runtime package requirement * Changed String() output to CleanHyphen to match the canonical standard @@ -64,7 +63,7 @@ Use the `go` tool: See [documentation and examples](http://godoc.org/github.com/twinj/uuid) for more information. - uuid.SetupSaver(config) + uuid.SetupSaver(...) u1 := uuid.NewV1() @@ -83,17 +82,20 @@ for more information. uuid.SwitchFormat(uuid.BracketHyphen) +## Links + +[RFC 4122](http://www.ietf.org/rfc/rfc4122.txt) +[DCE 1.1: Authentication and Security Services](http://pubs.opengroup.org/onlinepubs/9629399/apdxa.htm) + ## Copyright This is a derivative work -Orginal version from +Original version from Copyright (C) 2011 by Krzysztof Kowalik . See [COPYING](https://github.com/nu7hatch/gouuid/tree/master/COPYING) file for details. -Also see: Algorithm details in [RFC 4122](http://www.ietf.org/rfc/rfc4122.txt). - Copyright (C) 2014 twinj@github.com See [LICENSE](https://github.com/twinj/uuid/tree/master/LICENSE) file for details. diff --git a/vendor/github.com/twinj/uuid/appveyor.yml b/vendor/github.com/twinj/uuid/appveyor.yml index 8d1287ccc..d4ff307fb 100644 --- a/vendor/github.com/twinj/uuid/appveyor.yml +++ b/vendor/github.com/twinj/uuid/appveyor.yml @@ -16,13 +16,12 @@ install: - set PATH=%GOPATH%\bin;c:\go\bin;%PATH% - go version - go env + - go get -t -v ./... # to run your custom scripts instead of automatic MSBuild build_script: - go vet ./... - gofmt -s -l . -# - golint github.com/twinj/uuid... -# - deadcode - go test -short -v - go test -short -race -v