Showing 4 changed files with 357 additions and 1 deletion.
@@ -1 +1,343 @@
package continuous_querier

import (
	"errors"
	"log"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/influxdb/influxdb/cluster"
	"github.com/influxdb/influxdb/influxql"
	"github.com/influxdb/influxdb/meta"
	"github.com/influxdb/influxdb/tsdb"
)

const (
	// When planning a SELECT statement, passing zero tells it not to chunk results. Only applies to raw queries.
	NoChunkingSize = 0
)

// Service manages continuous query execution.
type Service struct {
	MetaStore     *meta.Store
	QueryExecutor *tsdb.QueryExecutor
	PointsWriter  *cluster.PointsWriter
	Config        *Config
	RunInterval   time.Duration
	Logger        *log.Logger
	// lastRuns maps CQ name to the last time it was run.
	lastRuns map[string]time.Time
	stop     chan struct{}
	wg       *sync.WaitGroup
}

// NewService returns a new instance of Service.
func NewService(c Config) *Service {
	s := &Service{
		Config:      &c,
		RunInterval: time.Second,
		Logger:      log.New(os.Stderr, "[continuous_querier] ", log.LstdFlags),
		lastRuns:    map[string]time.Time{},
	}
	return s
}
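
// A minimal wiring sketch (illustrative only; assumes a NewConfig constructor in
// this package and that metaStore, queryExecutor, and pointsWriter have been
// initialized elsewhere):
//
//	svc := NewService(NewConfig())
//	svc.MetaStore = metaStore
//	svc.QueryExecutor = queryExecutor
//	svc.PointsWriter = pointsWriter
//	if err := svc.Open(); err != nil {
//		// handle the error
//	}
//	defer svc.Close()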

// Open starts the service.
func (s *Service) Open() error {
	if s.stop != nil {
		return nil
	}

	if s.MetaStore == nil {
		panic("MetaStore is nil")
	} else if s.QueryExecutor == nil {
		panic("QueryExecutor is nil")
	} else if s.PointsWriter == nil {
		panic("PointsWriter is nil")
	}

	s.stop = make(chan struct{})
	s.wg = &sync.WaitGroup{}
	s.wg.Add(1)
	go s.backgroundLoop()
	return nil
}

// Close stops the service.
func (s *Service) Close() error {
	if s.stop == nil {
		return nil
	}
	close(s.stop)
	s.wg.Wait()
	s.wg = nil
	s.stop = nil
	return nil
}

// backgroundLoop runs on a goroutine and periodically executes CQs.
func (s *Service) backgroundLoop() {
	defer s.wg.Done()
	for {
		select {
		case <-s.stop:
			return
		case <-time.After(s.RunInterval):
			if s.MetaStore.IsLeader() {
				s.runContinuousQueries()
			}
		}
	}
}

// runContinuousQueries gets CQs from the meta store and runs them.
func (s *Service) runContinuousQueries() {
	// Get a list of all databases.
	dbs, err := s.MetaStore.Databases()
	if err != nil {
		s.Logger.Printf("error getting databases: %s", err)
		return
	}
	// Loop through all databases, executing each database's CQs.
	for _, db := range dbs {
		// TODO: distribute across nodes
		for _, cq := range db.ContinuousQueries {
			if err := s.ExecuteContinuousQuery(&db, &cq); err != nil {
				s.Logger.Println(err)
			}
		}
	}
}

// ExecuteContinuousQuery executes a single CQ.
func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo) error {
	// TODO: re-enable stats
	//s.stats.Inc("continuousQueryExecuted")

	// Local wrapper / helper.
	cq, err := NewContinuousQuery(dbi.Name, cqi)
	if err != nil {
		return err
	}

	// Get the last time this CQ was run from the service's cache.
	cq.LastRun = s.lastRuns[cqi.Name]

	// Set the retention policy to the default if it wasn't specified in the query.
	if cq.intoRP() == "" {
		cq.setIntoRP(dbi.DefaultRetentionPolicy)
	}

	// See if this query needs to be run.
	computeNoMoreThan := time.Duration(s.Config.ComputeNoMoreThan)
	if !cq.shouldRunContinuousQuery(s.Config.ComputeRunsPerInterval, computeNoMoreThan) {
		return nil
	}

	// We're about to run the query, so store the current time as the last run time.
	now := time.Now()
	cq.LastRun = now
	s.lastRuns[cqi.Name] = now

	// Get the group by interval.
	interval, err := cq.q.GroupByInterval()
	if err != nil || interval == 0 {
		return nil
	}

	// Calculate and set the time range for the query.
	startTime := now.Round(interval)
	if startTime.UnixNano() > now.UnixNano() {
		startTime = startTime.Add(-interval)
	}
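
	// Example (illustrative): with a GROUP BY time(10m) interval and now = 09:07,
	// now.Round(interval) yields 09:10, which is in the future, so we back up one
	// interval to startTime = 09:00. The query below then covers [09:00, 09:10).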
	if err := cq.q.SetTimeRange(startTime, startTime.Add(interval)); err != nil {
		s.Logger.Printf("error setting time range: %s\n", err)
	}

	// Do the actual processing of the query & writing of results.
	if err := s.runContinuousQueryAndWriteResult(cq); err != nil {
		s.Logger.Printf("error: %s. running: %s\n", err, cq.q.String())
	}

	recomputeNoOlderThan := time.Duration(s.Config.RecomputeNoOlderThan)
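
	// Recompute up to RecomputePreviousN earlier windows, stopping once a window
	// is older than recomputeNoOlderThan. Example (illustrative): with a 10m
	// interval, RecomputePreviousN = 2, and startTime = 09:00, the loop re-runs
	// the CQ for [08:50, 09:00) and then [08:40, 08:50).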
	for i := 0; i < s.Config.RecomputePreviousN; i++ {
		// If the previous window is already older than we're willing to look back, stop.
		if now.Sub(startTime) > recomputeNoOlderThan {
			return nil
		}
		newStartTime := startTime.Add(-interval)

		if err := cq.q.SetTimeRange(newStartTime, startTime); err != nil {
			s.Logger.Printf("error setting time range: %s\n", err)
		}

		if err := s.runContinuousQueryAndWriteResult(cq); err != nil {
			s.Logger.Printf("error during recompute previous: %s. running: %s\n", err, cq.q.String())
		}

		startTime = newStartTime
	}
	return nil
}

// runContinuousQueryAndWriteResult will run the query against the cluster and write the results back in.
func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
	// Wrap the CQ's inner SELECT statement in a Query for the QueryExecutor.
	q := &influxql.Query{
		Statements: influxql.Statements{cq.q},
	}

	// Execute the SELECT.
	ch, err := s.QueryExecutor.ExecuteQuery(q, cq.Database, NoChunkingSize)
	if err != nil {
		return err
	}

	// Read all rows from the result channel.
	for result := range ch {
		if result.Err != nil {
			return result.Err
		}

		for _, row := range result.Series {
			// Convert the result row to points.
			points, err := s.convertRowToPoints(cq.intoMeasurement(), row)
			if err != nil {
				s.Logger.Println(err)
				continue
			}

			if len(points) == 0 {
				continue
			}

			// If the points have any nil values, we can't write them.
			// This happens if the CQ is created and running before data is written to the measurement.
			for _, p := range points {
				fields := p.Fields()
				for _, v := range fields {
					if v == nil {
						return nil
					}
				}
			}

			// Create a write request for the points.
			req := &cluster.WritePointsRequest{
				Database:         cq.intoDB(),
				RetentionPolicy:  cq.intoRP(),
				ConsistencyLevel: cluster.ConsistencyLevelAny,
				Points:           points,
			}

			// Write the request.
			if err := s.PointsWriter.WritePoints(req); err != nil {
				s.Logger.Println(err)
			}
		}
	}

	return nil
}

// convertRowToPoints will convert a query result Row into Points that can be written back in.
// Used for continuous and INTO queries.
func (s *Service) convertRowToPoints(measurementName string, row *influxql.Row) ([]tsdb.Point, error) {
	// Figure out which parts of the result are the time and which are the fields.
	timeIndex := -1
	fieldIndexes := make(map[string]int)
	for i, c := range row.Columns {
		if c == "time" {
			timeIndex = i
		} else {
			fieldIndexes[c] = i
		}
	}

	if timeIndex == -1 {
		return nil, errors.New("error finding time index in result")
	}

	points := make([]tsdb.Point, 0, len(row.Values))
	for _, v := range row.Values {
		vals := make(map[string]interface{})
		for fieldName, fieldIndex := range fieldIndexes {
			vals[fieldName] = v[fieldIndex]
		}

		p := tsdb.NewPoint(measurementName, row.Tags, vals, v[timeIndex].(time.Time))

		points = append(points, p)
	}

	return points, nil
}
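
// For example (illustrative): a row with Columns ["time", "mean"] and a value
// row [2015-06-01T00:00:00Z, 42.5] becomes a single point in the target
// measurement with the field mean=42.5 at that timestamp.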

// ContinuousQuery is a local wrapper / helper around continuous queries.
type ContinuousQuery struct {
	Database string
	Info     *meta.ContinuousQueryInfo
	LastRun  time.Time
	q        *influxql.SelectStatement
}

func (cq *ContinuousQuery) intoDB() string          { return cq.q.Target.Measurement.Database }
func (cq *ContinuousQuery) intoRP() string          { return cq.q.Target.Measurement.RetentionPolicy }
func (cq *ContinuousQuery) setIntoRP(rp string)     { cq.q.Target.Measurement.RetentionPolicy = rp }
func (cq *ContinuousQuery) intoMeasurement() string { return cq.q.Target.Measurement.Name }
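
// For a statement like SELECT mean(value) INTO "mydb"."myrp"."cpu_mean" ...
// (illustrative), intoDB returns "mydb", intoRP returns "myrp", and
// intoMeasurement returns "cpu_mean".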

// NewContinuousQuery returns a ContinuousQuery object wrapping the CQ's parsed influxql.SelectStatement.
func NewContinuousQuery(database string, cqi *meta.ContinuousQueryInfo) (*ContinuousQuery, error) {
	stmt, err := influxql.NewParser(strings.NewReader(cqi.Query)).ParseStatement()
	if err != nil {
		return nil, err
	}

	q, ok := stmt.(*influxql.SelectStatement)
	if !ok {
		return nil, errors.New("query isn't a valid continuous query")
	}

	cquery := &ContinuousQuery{
		Database: database,
		Info:     cqi,
		q:        q,
	}

	return cquery, nil
}

// shouldRunContinuousQuery returns true if the CQ should be scheduled to run. It uses the
// CQ's last run time and the scheduling rules set in the config to decide whether
// this CQ should run now.
func (cq *ContinuousQuery) shouldRunContinuousQuery(runsPerInterval int, noMoreThan time.Duration) bool {
	// If the query isn't aggregated, we don't run it.
	if cq.q.IsRawQuery {
		return false
	}

	// Since it's aggregated, we need to figure out how often it should run.
	interval, err := cq.q.GroupByInterval()
	if err != nil {
		return false
	}

	// Determine how often we should run this continuous query:
	// the GROUP BY time interval divided by the number of times to compute per interval.
	computeEvery := time.Duration(interval.Nanoseconds()/int64(runsPerInterval)) * time.Nanosecond
	// Make sure we're running no more frequently than the setting in the config.
	if computeEvery < noMoreThan {
		computeEvery = noMoreThan
	}
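
	// Example (illustrative): GROUP BY time(10m) with runsPerInterval = 5 gives
	// computeEvery = 2m; if noMoreThan = 5m in the config, computeEvery is raised to 5m.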
	// If enough time has passed since the last run, run it now.
	if cq.LastRun.Add(computeEvery).UnixNano() <= time.Now().UnixNano() {
		return true
	}

	return false
}
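
As a rough end-to-end sketch (the CQ name and query string below are illustrative, not from this commit): the background loop finds each database's CQs and calls ExecuteContinuousQuery, which wraps the stored SELECT, asks shouldRunContinuousQuery whether enough time has passed, pins the time range to the current GROUP BY window, and hands the statement to runContinuousQueryAndWriteResult, which writes the aggregated points back INTO the target measurement.

	// Illustrative: a stored CQ definition of the kind this service handles.
	cqi := &meta.ContinuousQueryInfo{
		Name:  "cq_cpu_mean",
		Query: `SELECT mean(value) INTO "mydb"."myrp"."cpu_mean" FROM cpu GROUP BY time(10m)`,
	}
	cq, err := NewContinuousQuery("mydb", cqi)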