diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go
index 69cccf91261..45f157a7449 100644
--- a/services/continuous_querier/service.go
+++ b/services/continuous_querier/service.go
@@ -43,7 +43,9 @@ type Service struct {
 	PointsWriter  pointsWriter
 	Config        *Config
 	RunInterval   time.Duration
-	Logger        *log.Logger
+	// RunCh can be used by clients to signal service to run CQs.
+	RunCh  chan struct{}
+	Logger *log.Logger
 	// lastRuns maps CQ name to last time it was run.
 	lastRuns map[string]time.Time
 	stop     chan struct{}
@@ -55,6 +57,7 @@ func NewService(c Config) *Service {
 	s := &Service{
 		Config:      &c,
 		RunInterval: time.Second,
+		RunCh:       make(chan struct{}),
 		Logger:      log.New(os.Stderr, "[continuous_querier] ", log.LstdFlags),
 		lastRuns:    map[string]time.Time{},
 	}
@@ -97,6 +100,10 @@ func (s *Service) backgroundLoop() {
 		select {
 		case <-s.stop:
 			return
+		case <-s.RunCh:
+			if s.MetaStore.IsLeader() {
+				s.runContinuousQueries()
+			}
 		case <-time.After(s.RunInterval):
 			if s.MetaStore.IsLeader() {
 				s.runContinuousQueries()
@@ -159,11 +166,13 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
 
 	// Get the group by interval.
 	interval, err := cq.q.GroupByInterval()
-	if err != nil || interval == 0 {
+	if err != nil {
+		return err
+	} else if interval == 0 {
 		return nil
 	}
 
 	// Calculate and set the time range for the query.
 	startTime := now.Round(interval)
 	if startTime.UnixNano() > now.UnixNano() {
 		startTime = startTime.Add(-interval)
@@ -190,10 +199,12 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
 
 		if err := cq.q.SetTimeRange(newStartTime, startTime); err != nil {
 			s.Logger.Printf("error setting time range: %s\n", err)
+			return err
 		}
 
 		if err := s.runContinuousQueryAndWriteResult(cq); err != nil {
 			s.Logger.Printf("error during recompute previous: %s. running: %s\n", err, cq.q.String())
+			return err
 		}
 
 		startTime = newStartTime
diff --git a/services/continuous_querier/service_test.go b/services/continuous_querier/service_test.go
index fdc8a2f9732..f7e5053c604 100644
--- a/services/continuous_querier/service_test.go
+++ b/services/continuous_querier/service_test.go
@@ -21,7 +21,7 @@ var (
 
 // Test closing never opened, open, open already open, close, and close already closed.
func TestOpenAndClose(t *testing.T) { - s := NewTestService() + s := NewTestService(t) if err := s.Close(); err != nil { t.Error(err) @@ -38,12 +38,12 @@ func TestOpenAndClose(t *testing.T) { // Test ExecuteContinuousQuery happy path. func TestExecuteContinuousQuery_HappyPath(t *testing.T) { - s := NewTestService() + s := NewTestService(t) dbis, _ := s.MetaStore.Databases() dbi := dbis[0] cqi := dbi.ContinuousQueries[0] - pointCnt := 1000 + pointCnt := 100 qe := s.QueryExecutor.(*QueryExecutor) qe.Results = []*influxql.Result{genResult(1, pointCnt)} @@ -63,32 +63,84 @@ func TestExecuteContinuousQuery_HappyPath(t *testing.T) { // Test the service happy path. func TestService_HappyPath(t *testing.T) { - s := NewTestService() + s := NewTestService(t) - pointCnt := 1000 + pointCnt := 100 qe := s.QueryExecutor.(*QueryExecutor) qe.Results = []*influxql.Result{genResult(1, pointCnt)} - wg := &sync.WaitGroup{} - wg.Add(1) + done := make(chan struct{}, 5) + defer close(done) pw := s.PointsWriter.(*PointsWriter) + gotCnt := -1 pw.WritePointsFn = func(p *cluster.WritePointsRequest) error { - defer wg.Done() - if len(p.Points) != pointCnt { - return fmt.Errorf("exp = %d, got = %d", pointCnt, len(p.Points)) - } + gotCnt = len(p.Points) + done <- struct{}{} return nil } s.Open() - // Test will timeout if the query doesn't process. - wg.Wait() + if err := wait(done, time.Second); err != nil { + t.Error(err) + } else if gotCnt != pointCnt { + t.Errorf("exp = %d, got = %d", pointCnt, gotCnt) + } + s.Close() +} + +// Test service when not the cluster leader (CQs shouldn't run). +func TestService_NotLeader(t *testing.T) { + s := NewTestService(t) + // Set RunInterval high so we can test triggering with the RunCh below. + s.RunInterval = 10 * time.Second + s.MetaStore.(*MetaStore).Leader = false + + done := make(chan struct{}) + qe := s.QueryExecutor.(*QueryExecutor) + // Set a callback for ExecuteQuery. Shouldn't get called because we're not the leader. 
+	qe.ExecuteQueryFn = func(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error) {
+		done <- struct{}{}
+		return nil, unexpectedErr
+	}
+
+	s.Open()
+	// Trigger service to run CQs.
+	s.RunCh <- struct{}{}
+	// Expect timeout error because ExecuteQuery callback wasn't called.
+	if err := wait(done, 100*time.Millisecond); err == nil {
+		t.Error("expected wait to time out, but ExecuteQuery was called")
+	}
+	s.Close()
+}
+
+// Test service behavior when meta store fails to get databases.
+func TestService_MetaStoreFailsToGetDatabases(t *testing.T) {
+	s := NewTestService(t)
+	// Set RunInterval high so we can test triggering with the RunCh below.
+	s.RunInterval = 10 * time.Second
+	s.MetaStore.(*MetaStore).Err = expectedErr
+
+	done := make(chan struct{})
+	qe := s.QueryExecutor.(*QueryExecutor)
+	// Set ExecuteQuery callback, which shouldn't get called because of meta store failure.
+	qe.ExecuteQueryFn = func(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error) {
+		done <- struct{}{}
+		return nil, unexpectedErr
+	}
+
+	s.Open()
+	// Trigger service to run CQs.
+	s.RunCh <- struct{}{}
+	// Expect timeout error because ExecuteQuery callback wasn't called.
+	if err := wait(done, 100*time.Millisecond); err == nil {
+		t.Error("expected wait to time out, but ExecuteQuery was called")
+	}
 	s.Close()
 }
 
 // Test ExecuteContinuousQuery with invalid queries.
 func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) {
-	s := NewTestService()
+	s := NewTestService(t)
 	dbis, _ := s.MetaStore.Databases()
 	dbi := dbis[0]
 	cqi := dbi.ContinuousQueries[0]
@@ -107,7 +159,7 @@ func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) {
 	}
 
 	// Group by requires aggregate.
-	cqi.Query = `SELECT value INTO other_value FROM cpu WHERE time > now() - 1d GROUP BY time(1h)`
+	cqi.Query = `SELECT value INTO other_value FROM cpu WHERE time > now() - 1h GROUP BY time(1s)`
 	err = s.ExecuteContinuousQuery(&dbi, &cqi)
 	if err == nil {
 		t.Error("expected error but got nil")
@@ -116,7 +168,7 @@ func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) {
 
 // Test ExecuteContinuousQuery when QueryExecutor returns an error.
 func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) {
-	s := NewTestService()
+	s := NewTestService(t)
 	qe := s.QueryExecutor.(*QueryExecutor)
 	qe.Err = expectedErr
 
@@ -131,15 +183,16 @@ func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) {
 }
 
 // NewTestService returns a new *Service with default mock object members.
-func NewTestService() *Service {
+func NewTestService(t *testing.T) *Service {
 	s := NewService(NewConfig())
-	s.MetaStore = NewMetaStore()
-	s.QueryExecutor = NewQueryExecutor()
-	s.PointsWriter = NewPointsWriter()
+	s.MetaStore = NewMetaStore(t)
+	s.QueryExecutor = NewQueryExecutor(t)
+	s.PointsWriter = NewPointsWriter(t)
+	s.RunInterval = time.Millisecond
 
 	// Set Logger to write to dev/null so stdout isn't polluted.
 	null, _ := os.Open(os.DevNull)
 	s.Logger = log.New(null, "", 0)
 
 	return s
 }
@@ -149,10 +202,11 @@ type MetaStore struct {
 	Leader        bool
 	DatabaseInfos []meta.DatabaseInfo
 	Err           error
+	t             *testing.T
 }
 
 // NewMetaStore returns a *MetaStore.
-func NewMetaStore() *MetaStore { +func NewMetaStore(t *testing.T) *MetaStore { return &MetaStore{ Leader: true, DatabaseInfos: []meta.DatabaseInfo{ @@ -162,11 +216,12 @@ func NewMetaStore() *MetaStore { ContinuousQueries: []meta.ContinuousQueryInfo{ { Name: "cq", - Query: `SELECT count(cpu) INTO cpu_count FROM cpu WHERE time > now() - 1d GROUP BY time(1h)`, + Query: `SELECT count(cpu) INTO cpu_count FROM cpu WHERE time > now() - 1h GROUP BY time(1s)`, }, }, }, }, + t: t, } } @@ -178,30 +233,31 @@ func (ms *MetaStore) Databases() ([]meta.DatabaseInfo, error) { return ms.Databa // QueryExecutor is a mock query executor. type QueryExecutor struct { - ExecuteQueryFn func(query *influxql.Query, database string, chunkSize int) error + ExecuteQueryFn func(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error) Results []*influxql.Result ResultInterval time.Duration - ResultCh chan *influxql.Result Err error ErrAfterResult int StopRespondingAfter int Wg *sync.WaitGroup + t *testing.T } // NewQueryExecutor returns a *QueryExecutor. -func NewQueryExecutor() *QueryExecutor { +func NewQueryExecutor(t *testing.T) *QueryExecutor { return &QueryExecutor{ - ResultCh: make(chan *influxql.Result, 0), ErrAfterResult: -1, StopRespondingAfter: -1, + t: t, } } // ExecuteQuery returns a channel that the caller can read query results from. func (qe *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error) { - // Call the callback if set. + + // If the test set a callback, call it. 
if qe.ExecuteQueryFn != nil { - if err := qe.ExecuteQueryFn(query, database, chunkSize); err != nil { + if _, err := qe.ExecuteQueryFn(query, database, chunkSize); err != nil { return nil, err } } @@ -211,28 +267,33 @@ func (qe *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, ch return nil, qe.Err } + ch := make(chan *influxql.Result) qe.Wg = &sync.WaitGroup{} qe.Wg.Add(1) // Start a go routine to send results and / or error. go func() { - defer qe.Wg.Done() + defer func() { qe.t.Log("ExecuteQuery(): go routine exited"); qe.Wg.Done() }() + n := 0 for i, r := range qe.Results { - if i == qe.ErrAfterResult { - qe.ResultCh <- &influxql.Result{Err: qe.Err} - close(qe.ResultCh) + if i == qe.ErrAfterResult-1 { + qe.t.Logf("ExecuteQuery(): ErrAfterResult %d", qe.ErrAfterResult-1) + ch <- &influxql.Result{Err: qe.Err} + close(ch) return } else if i == qe.StopRespondingAfter { + qe.t.Log("ExecuteQuery(): StopRespondingAfter") return } - - qe.ResultCh <- r + ch <- r + n++ time.Sleep(qe.ResultInterval) } - close(qe.ResultCh) + qe.t.Logf("ExecuteQuery(): all (%d) results sent", n) + close(ch) }() - return qe.ResultCh, nil + return ch, nil } // PointsWriter is a mock points writer. @@ -240,16 +301,20 @@ type PointsWriter struct { WritePointsFn func(p *cluster.WritePointsRequest) error Err error PointsPerSecond int + t *testing.T } // NewPointsWriter returns a new *PointsWriter. -func NewPointsWriter() *PointsWriter { +func NewPointsWriter(t *testing.T) *PointsWriter { return &PointsWriter{ PointsPerSecond: 25000, + t: t, } } +// WritePoints mocks writing points. func (pw *PointsWriter) WritePoints(p *cluster.WritePointsRequest) error { + // If the test set a callback, call it. 
if pw.WritePointsFn != nil { if err := pw.WritePointsFn(p); err != nil { return err @@ -286,3 +351,12 @@ func genResult(rowCnt, valCnt int) *influxql.Result { Series: rows, } } + +func wait(c chan struct{}, d time.Duration) (err error) { + select { + case <-c: + case <-time.After(d): + err = errors.New("timed out") + } + return +}