Skip to content

Commit

Permalink
fix #2733: add tests for CQ service
Browse files Browse the repository at this point in the history
  • Loading branch information
dgnorton committed Jun 5, 2015
1 parent 077159a commit e0838cb
Show file tree
Hide file tree
Showing 2 changed files with 330 additions and 17 deletions.
59 changes: 42 additions & 17 deletions services/continuous_querier/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package continuous_querier

import (
"errors"
"fmt"
"log"
"os"
"strings"
Expand All @@ -19,11 +20,27 @@ const (
NoChunkingSize = 0
)

// queryExecutor is an internal interface to make testing easier.
type queryExecutor interface {
ExecuteQuery(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error)
}

// metaStore is an internal interface to make testing easier.
type metaStore interface {
IsLeader() bool
Databases() ([]meta.DatabaseInfo, error)
}

// pointsWriter is an internal interface to make testing easier.
type pointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
}

// Service manages continuous query execution.
type Service struct {
MetaStore *meta.Store
QueryExecutor *tsdb.QueryExecutor
PointsWriter *cluster.PointsWriter
MetaStore metaStore
QueryExecutor queryExecutor
PointsWriter pointsWriter
Config *Config
RunInterval time.Duration
Logger *log.Logger
Expand All @@ -50,13 +67,9 @@ func (s *Service) Open() error {
return nil
}

if s.MetaStore == nil {
panic("MetaStore is nil")
} else if s.QueryExecutor == nil {
panic("QueryExecutor is nil")
} else if s.PointsWriter == nil {
panic("PointsWriter is nil")
}
assert(s.MetaStore != nil, "MetaStore is nil")
assert(s.QueryExecutor != nil, "QueryExecutor is nil")
assert(s.PointsWriter != nil, "PointsWriter is nil")

s.stop = make(chan struct{})
s.wg = &sync.WaitGroup{}
Expand Down Expand Up @@ -132,7 +145,10 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti

// See if this query needs to be run.
computeNoMoreThan := time.Duration(s.Config.ComputeNoMoreThan)
if !cq.shouldRunContinuousQuery(s.Config.ComputeRunsPerInterval, computeNoMoreThan) {
run, err := cq.shouldRunContinuousQuery(s.Config.ComputeRunsPerInterval, computeNoMoreThan)
if err != nil {
return err
} else if !run {
return nil
}

Expand Down Expand Up @@ -160,6 +176,7 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
// Do the actual processing of the query & writing of results.
if err := s.runContinuousQueryAndWriteResult(cq); err != nil {
s.Logger.Printf("error: %s. running: %s\n", err, cq.q.String())
return err
}

recomputeNoOlderThan := time.Duration(s.Config.RecomputeNoOlderThan)
Expand Down Expand Up @@ -237,6 +254,7 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
// Write the request.
if err := s.PointsWriter.WritePoints(req); err != nil {
s.Logger.Println(err)
return err
}
}
}
Expand Down Expand Up @@ -298,7 +316,7 @@ func NewContinuousQuery(database string, cqi *meta.ContinuousQueryInfo) (*Contin
}

q, ok := stmt.(*influxql.SelectStatement)
if !ok {
if !ok || q.Target == nil || q.Target.Measurement == nil {
return nil, errors.New("query isn't a valid continuous query")
}

Expand All @@ -314,16 +332,16 @@ func NewContinuousQuery(database string, cqi *meta.ContinuousQueryInfo) (*Contin
// shouldRunContinuousQuery returns true if the CQ should be schedule to run. It will use the
// lastRunTime of the CQ and the rules for when to run set through the config to determine
// if this CQ should be run
func (cq *ContinuousQuery) shouldRunContinuousQuery(runsPerInterval int, noMoreThan time.Duration) bool {
func (cq *ContinuousQuery) shouldRunContinuousQuery(runsPerInterval int, noMoreThan time.Duration) (bool, error) {
// if it's not aggregated we don't run it
if cq.q.IsRawQuery {
return false
return false, errors.New("continuous queries must be aggregate queries")
}

// since it's aggregated we need to figure how often it should be run
interval, err := cq.q.GroupByInterval()
if err != nil {
return false
return false, err
}

// determine how often we should run this continuous query.
Expand All @@ -336,8 +354,15 @@ func (cq *ContinuousQuery) shouldRunContinuousQuery(runsPerInterval int, noMoreT

// if we've passed the amount of time since the last run, do it up
if cq.LastRun.Add(computeEvery).UnixNano() <= time.Now().UnixNano() {
return true
return true, nil
}

return false
return false, nil
}

// assert will panic with a given formatted message if the given condition is false.
func assert(condition bool, msg string, v ...interface{}) {
if !condition {
panic(fmt.Sprintf("assert failed: "+msg, v...))
}
}
Loading

0 comments on commit e0838cb

Please sign in to comment.