From 092bc3fd2dd3995664693a58755c7e10cc3fed69 Mon Sep 17 00:00:00 2001
From: David Norton
Date: Wed, 3 Jun 2015 18:39:37 -0400
Subject: [PATCH] fix #2733: hook CQs back in

---
 client/influxdb.go                     |   2 +-
 cmd/influxd/run/server.go              |   9 +
 services/continuous_querier/service.go | 342 +++++++++++++++++++++++++
 3 files changed, 352 insertions(+), 1 deletion(-)

diff --git a/client/influxdb.go b/client/influxdb.go
index f2e56b8077d..31134559bf3 100644
--- a/client/influxdb.go
+++ b/client/influxdb.go
@@ -137,7 +137,7 @@ func (c *Client) Write(bp BatchPoints) (*Response, error) {
 	if err != nil {
 		return nil, err
 	}
-	req.Header.Set("Content-Type", "application/json")
+	req.Header.Set("Content-Type", "")
 	req.Header.Set("User-Agent", c.userAgent)
 	if c.username != "" {
 		req.SetBasicAuth(c.username, c.password)
diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go
index 1de35960728..e2010f12bf8 100644
--- a/cmd/influxd/run/server.go
+++ b/cmd/influxd/run/server.go
@@ -8,6 +8,7 @@ import (
 	"github.com/influxdb/influxdb/meta"
 	"github.com/influxdb/influxdb/services/admin"
 	"github.com/influxdb/influxdb/services/collectd"
+	"github.com/influxdb/influxdb/services/continuous_querier"
 	"github.com/influxdb/influxdb/services/graphite"
 	"github.com/influxdb/influxdb/services/httpd"
 	"github.com/influxdb/influxdb/services/opentsdb"
@@ -65,6 +66,7 @@ func NewServer(c *Config) *Server {
 	s.appendOpenTSDBService(c.OpenTSDB)
 	s.appendUDPService(c.UDP)
 	s.appendRetentionPolicyService(c.Retention)
+	s.appendContinuousQueryService(c.ContinuousQuery)
 	for _, g := range c.Graphites {
 		s.appendGraphiteService(g)
 	}
@@ -136,6 +138,13 @@ func (s *Server) appendUDPService(c udp.Config) {
 		return
 	}
 	srv := udp.NewService(c)
+	srv.Server.PointsWriter = s.PointsWriter
+}
+
+func (s *Server) appendContinuousQueryService(c continuous_querier.Config) {
+	srv := continuous_querier.NewService(c)
+	srv.MetaStore = s.MetaStore
+	srv.QueryExecutor = s.QueryExecutor
 	srv.PointsWriter = s.PointsWriter
 	s.Services = append(s.Services, srv)
 }
diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go
index 488744f6816..2ac000ab168 100644
--- a/services/continuous_querier/service.go
+++ b/services/continuous_querier/service.go
@@ -1 +1,343 @@
 package continuous_querier
+
+import (
+	"errors"
+	"log"
+	"os"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/influxdb/influxdb/cluster"
+	"github.com/influxdb/influxdb/influxql"
+	"github.com/influxdb/influxdb/meta"
+	"github.com/influxdb/influxdb/tsdb"
+)
+
+const (
+	// When planning a select statement, passing zero tells it not to chunk results. Only applies to raw queries.
+	NoChunkingSize = 0
+)
+
+// Service manages continuous query execution.
+type Service struct {
+	MetaStore     *meta.Store
+	QueryExecutor *tsdb.QueryExecutor
+	PointsWriter  *cluster.PointsWriter
+	Config        *Config
+	RunInterval   time.Duration
+	Logger        *log.Logger
+	// lastRuns maps CQ name to last time it was run.
+	lastRuns map[string]time.Time
+	stop     chan struct{}
+	wg       *sync.WaitGroup
+}
+
+// NewService returns a new instance of Service.
+func NewService(c Config) *Service {
+	s := &Service{
+		Config:      &c,
+		RunInterval: time.Second,
+		Logger:      log.New(os.Stderr, "[continuous_querier] ", log.LstdFlags),
+		lastRuns:    map[string]time.Time{},
+	}
+	return s
+}
+
+// Open starts the service.
+func (s *Service) Open() error {
+	if s.stop != nil {
+		return nil
+	}
+
+	if s.MetaStore == nil {
+		panic("MetaStore is nil")
+	} else if s.QueryExecutor == nil {
+		panic("QueryExecutor is nil")
+	} else if s.PointsWriter == nil {
+		panic("PointsWriter is nil")
+	}
+
+	s.stop = make(chan struct{})
+	s.wg = &sync.WaitGroup{}
+	s.wg.Add(1)
+	go s.backgroundLoop()
+	return nil
+}
+
+// Close stops the service.
+func (s *Service) Close() error {
+	if s.stop == nil {
+		return nil
+	}
+	close(s.stop)
+	s.wg.Wait()
+	s.wg = nil
+	s.stop = nil
+	return nil
+}
+
+// backgroundLoop runs on a goroutine and periodically executes CQs.
+func (s *Service) backgroundLoop() {
+	defer s.wg.Done()
+	for {
+		select {
+		case <-s.stop:
+			return
+		case <-time.After(s.RunInterval):
+			if s.MetaStore.IsLeader() {
+				s.runContinuousQueries()
+			}
+		}
+	}
+}
+
+// runContinuousQueries gets CQs from the meta store and runs them.
+func (s *Service) runContinuousQueries() {
+	// Get list of all databases.
+	dbs, err := s.MetaStore.Databases()
+	if err != nil {
+		s.Logger.Printf("error getting databases: %s\n", err)
+		return
+	}
+	// Loop through all databases executing CQs.
+	for _, db := range dbs {
+		// TODO: distribute across nodes
+		for _, cq := range db.ContinuousQueries {
+			if err := s.ExecuteContinuousQuery(&db, &cq); err != nil {
+				s.Logger.Println(err)
+			}
+		}
+	}
+}
+
+// ExecuteContinuousQuery executes a single CQ.
+func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo) error {
+	// TODO: re-enable stats
+	//s.stats.Inc("continuousQueryExecuted")
+
+	// Local wrapper / helper.
+	cq, err := NewContinuousQuery(dbi.Name, cqi)
+	if err != nil {
+		return err
+	}
+
+	// Get the last time this CQ was run from the service's cache.
+	cq.LastRun = s.lastRuns[cqi.Name]
+
+	// Set the retention policy to default if it wasn't specified in the query.
+	if cq.intoRP() == "" {
+		cq.setIntoRP(dbi.DefaultRetentionPolicy)
+	}
+
+	// See if this query needs to be run.
+	computeNoMoreThan := time.Duration(s.Config.ComputeNoMoreThan)
+	if !cq.shouldRunContinuousQuery(s.Config.ComputeRunsPerInterval, computeNoMoreThan) {
+		return nil
+	}
+
+	// We're about to run the query so store the time.
+	now := time.Now()
+	cq.LastRun = now
+	s.lastRuns[cqi.Name] = now
+
+	// Get the group by interval.
+	interval, err := cq.q.GroupByInterval()
+	if err != nil || interval == 0 {
+		return nil
+	}
+
+	// Calculate and set the time range for the query.
+	startTime := now.Round(interval)
+	if startTime.UnixNano() > now.UnixNano() {
+		startTime = startTime.Add(-interval)
+	}
+
+	if err := cq.q.SetTimeRange(startTime, startTime.Add(interval)); err != nil {
+		s.Logger.Printf("error setting time range: %s\n", err)
+	}
+
+	// Do the actual processing of the query & writing of results.
+	if err := s.runContinuousQueryAndWriteResult(cq); err != nil {
+		s.Logger.Printf("error: %s. running: %s\n", err, cq.q.String())
+	}
+
+	recomputeNoOlderThan := time.Duration(s.Config.RecomputeNoOlderThan)
+
+	for i := 0; i < s.Config.RecomputePreviousN; i++ {
+		// if we're already further past the previous window than we're going to look back, stop
+		if now.Sub(startTime) > recomputeNoOlderThan {
+			return nil
+		}
+		newStartTime := startTime.Add(-interval)
+
+		if err := cq.q.SetTimeRange(newStartTime, startTime); err != nil {
+			s.Logger.Printf("error setting time range: %s\n", err)
+		}
+
+		if err := s.runContinuousQueryAndWriteResult(cq); err != nil {
+			s.Logger.Printf("error during recompute previous: %s. running: %s\n", err, cq.q.String())
+		}
+
+		startTime = newStartTime
+	}
+	return nil
+}
+
+// runContinuousQueryAndWriteResult will run the query against the cluster and write the results back in.
+func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
+	// Wrap the CQ's inner SELECT statement in a Query for the QueryExecutor.
+	q := &influxql.Query{
+		Statements: influxql.Statements{cq.q},
+	}
+
+	// Execute the SELECT.
+	ch, err := s.QueryExecutor.ExecuteQuery(q, cq.Database, NoChunkingSize)
+	if err != nil {
+		return err
+	}
+
+	// Read all rows from the result channel.
+	for result := range ch {
+		if result.Err != nil {
+			return result.Err
+		}
+
+		for _, row := range result.Series {
+			// Convert the result row to points.
+			points, err := s.convertRowToPoints(cq.intoMeasurement(), row)
+			if err != nil {
+				s.Logger.Println(err)
+				continue
+			}
+
+			if len(points) == 0 {
+				continue
+			}
+
+			// If the points have any nil values, can't write.
+			// This happens if the CQ is created and running before data is written to the measurement.
+			for _, p := range points {
+				fields := p.Fields()
+				for _, v := range fields {
+					if v == nil {
+						return nil
+					}
+				}
+			}
+
+			// Create a write request for the points.
+			req := &cluster.WritePointsRequest{
+				Database:         cq.intoDB(),
+				RetentionPolicy:  cq.intoRP(),
+				ConsistencyLevel: cluster.ConsistencyLevelAny,
+				Points:           points,
+			}
+
+			// Write the request.
+			if err := s.PointsWriter.WritePoints(req); err != nil {
+				s.Logger.Println(err)
+			}
+		}
+	}
+
+	return nil
+}
+
+// convertRowToPoints will convert a query result Row into Points that can be written back in.
+// Used for continuous and INTO queries.
+func (s *Service) convertRowToPoints(measurementName string, row *influxql.Row) ([]tsdb.Point, error) {
+	// figure out which parts of the result are the time and which are the fields
+	timeIndex := -1
+	fieldIndexes := make(map[string]int)
+	for i, c := range row.Columns {
+		if c == "time" {
+			timeIndex = i
+		} else {
+			fieldIndexes[c] = i
+		}
+	}
+
+	if timeIndex == -1 {
+		return nil, errors.New("error finding time index in result")
+	}
+
+	points := make([]tsdb.Point, 0, len(row.Values))
+	for _, v := range row.Values {
+		vals := make(map[string]interface{})
+		for fieldName, fieldIndex := range fieldIndexes {
+			vals[fieldName] = v[fieldIndex]
+		}
+
+		p := tsdb.NewPoint(measurementName, row.Tags, vals, v[timeIndex].(time.Time))
+
+		points = append(points, p)
+	}
+
+	return points, nil
+}
+
+// ContinuousQuery is a local wrapper / helper around continuous queries.
+type ContinuousQuery struct {
+	Database string
+	Info     *meta.ContinuousQueryInfo
+	LastRun  time.Time
+	q        *influxql.SelectStatement
+}
+
+func (cq *ContinuousQuery) intoDB() string          { return cq.q.Target.Measurement.Database }
+func (cq *ContinuousQuery) intoRP() string          { return cq.q.Target.Measurement.RetentionPolicy }
+func (cq *ContinuousQuery) setIntoRP(rp string)     { cq.q.Target.Measurement.RetentionPolicy = rp }
+func (cq *ContinuousQuery) intoMeasurement() string { return cq.q.Target.Measurement.Name }
+
+// NewContinuousQuery returns a ContinuousQuery object with its influxql.SelectStatement parsed from cqi.Query.
+func NewContinuousQuery(database string, cqi *meta.ContinuousQueryInfo) (*ContinuousQuery, error) {
+	stmt, err := influxql.NewParser(strings.NewReader(cqi.Query)).ParseStatement()
+	if err != nil {
+		return nil, err
+	}
+
+	q, ok := stmt.(*influxql.SelectStatement)
+	if !ok {
+		return nil, errors.New("query isn't a valid continuous query")
+	}
+
+	cquery := &ContinuousQuery{
+		Database: database,
+		Info:     cqi,
+		q:        q,
+	}
+
+	return cquery, nil
+}
+
+// shouldRunContinuousQuery returns true if the CQ should be scheduled to run. It uses the
+// CQ's last run time and the rules for when to run set through the config to determine
+// whether this CQ should be run now.
+func (cq *ContinuousQuery) shouldRunContinuousQuery(runsPerInterval int, noMoreThan time.Duration) bool {
+	// if it's not aggregated we don't run it
+	if cq.q.IsRawQuery {
+		return false
+	}
+
+	// since it's aggregated we need to figure out how often it should be run
+	interval, err := cq.q.GroupByInterval()
+	if err != nil {
+		return false
+	}
+
+	// determine how often we should run this continuous query.
+	// group by time / the number of times to compute
+	computeEvery := time.Duration(interval.Nanoseconds()/int64(runsPerInterval)) * time.Nanosecond
+	// make sure we're running no more frequently than the setting in the config
+	if computeEvery < noMoreThan {
+		computeEvery = noMoreThan
+	}
+
+	// if enough time has passed since the last run, run it again
+	if cq.LastRun.Add(computeEvery).UnixNano() <= time.Now().UnixNano() {
+		return true
+	}
+
+	return false
+}
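-- 
Below is a minimal sketch (not part of the patch; placed after the signature
delimiter so git am still applies the diff above) of the wiring that
appendContinuousQueryService performs, for trying the new service outside of
influxd. It uses only identifiers visible in this diff. The zero-value Config
literal is an illustrative placeholder (in the server the config comes from
the parsed TOML as c.ContinuousQuery), and the three dependencies are assumed
to be the server's own, already opened. Note that Open panics if any
dependency is nil, and ExecuteContinuousQuery divides by
Config.ComputeRunsPerInterval, so that field must be non-zero in practice.

	package main

	import (
		"github.com/influxdb/influxdb/cluster"
		"github.com/influxdb/influxdb/meta"
		"github.com/influxdb/influxdb/services/continuous_querier"
		"github.com/influxdb/influxdb/tsdb"
	)

	func main() {
		// Placeholder dependencies; in influxd these are constructed and
		// opened by cmd/influxd/run.NewServer before services are appended.
		var (
			metaStore     *meta.Store
			queryExecutor *tsdb.QueryExecutor
			pointsWriter  *cluster.PointsWriter
		)

		// Mirror appendContinuousQueryService: construct the service,
		// then inject its dependencies.
		srv := continuous_querier.NewService(continuous_querier.Config{})
		srv.MetaStore = metaStore
		srv.QueryExecutor = queryExecutor
		srv.PointsWriter = pointsWriter

		// With real, opened dependencies this would start the background
		// loop that runs CQs once per RunInterval on the cluster leader:
		//
		//	if err := srv.Open(); err != nil {
		//		return
		//	}
		//	defer srv.Close()
	}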