Skip to content

Commit

Permalink
fix #2733: more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dgnorton committed Jun 5, 2015
1 parent e0838cb commit 8699aa8
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 41 deletions.
18 changes: 16 additions & 2 deletions services/continuous_querier/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ type Service struct {
PointsWriter pointsWriter
Config *Config
RunInterval time.Duration
Logger *log.Logger
// RunCh can be used by clients to signal service to run CQs.
RunCh chan struct{}
Logger *log.Logger
// lastRuns maps CQ name to last time it was run.
lastRuns map[string]time.Time
stop chan struct{}
Expand All @@ -55,6 +57,7 @@ func NewService(c Config) *Service {
s := &Service{
Config: &c,
RunInterval: time.Second,
RunCh: make(chan struct{}),
Logger: log.New(os.Stderr, "[continuous_querier] ", log.LstdFlags),
lastRuns: map[string]time.Time{},
}
Expand Down Expand Up @@ -97,6 +100,10 @@ func (s *Service) backgroundLoop() {
select {
case <-s.stop:
return
case <-s.RunCh:
if s.MetaStore.IsLeader() {
s.runContinuousQueries()
}
case <-time.After(s.RunInterval):
if s.MetaStore.IsLeader() {
s.runContinuousQueries()
Expand Down Expand Up @@ -159,11 +166,14 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti

// Get the group by interval.
interval, err := cq.q.GroupByInterval()
if err != nil || interval == 0 {
if err != nil {
return err
} else if interval == 0 {
return nil
}

// Calculate and set the time range for the query.
fmt.Printf("interval = %v\n", interval)
startTime := now.Round(interval)
if startTime.UnixNano() > now.UnixNano() {
startTime = startTime.Add(-interval)
Expand All @@ -183,17 +193,20 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti

for i := 0; i < s.Config.RecomputePreviousN; i++ {
// if we're already more time past the previous window than we're going to look back, stop
fmt.Printf("now - start = %v, recomputeNoOlderThan = %v\n", now.Sub(startTime), recomputeNoOlderThan)
if now.Sub(startTime) > recomputeNoOlderThan {
return nil
}
newStartTime := startTime.Add(-interval)

if err := cq.q.SetTimeRange(newStartTime, startTime); err != nil {
s.Logger.Printf("error setting time range: %s\n", err)
return err
}

if err := s.runContinuousQueryAndWriteResult(cq); err != nil {
s.Logger.Printf("error during recompute previous: %s. running: %s\n", err, cq.q.String())
return err
}

startTime = newStartTime
Expand All @@ -203,6 +216,7 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti

// runContinuousQueryAndWriteResult will run the query against the cluster and write the results back in
func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
fmt.Printf("runContinuousQueryAndWriteResult(): %s\n", cq.q.String())
// Wrap the CQ's inner SELECT statement in a Query for the QueryExecutor.
q := &influxql.Query{
Statements: influxql.Statements{cq.q},
Expand Down
152 changes: 113 additions & 39 deletions services/continuous_querier/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var (

// Test closing never opened, open, open already open, close, and close already closed.
func TestOpenAndClose(t *testing.T) {
s := NewTestService()
s := NewTestService(t)

if err := s.Close(); err != nil {
t.Error(err)
Expand All @@ -38,12 +38,12 @@ func TestOpenAndClose(t *testing.T) {

// Test ExecuteContinuousQuery happy path.
func TestExecuteContinuousQuery_HappyPath(t *testing.T) {
s := NewTestService()
s := NewTestService(t)
dbis, _ := s.MetaStore.Databases()
dbi := dbis[0]
cqi := dbi.ContinuousQueries[0]

pointCnt := 1000
pointCnt := 100
qe := s.QueryExecutor.(*QueryExecutor)
qe.Results = []*influxql.Result{genResult(1, pointCnt)}

Expand All @@ -63,32 +63,84 @@ func TestExecuteContinuousQuery_HappyPath(t *testing.T) {

// Test the service happy path.
func TestService_HappyPath(t *testing.T) {
s := NewTestService()
s := NewTestService(t)

pointCnt := 1000
pointCnt := 100
qe := s.QueryExecutor.(*QueryExecutor)
qe.Results = []*influxql.Result{genResult(1, pointCnt)}

wg := &sync.WaitGroup{}
wg.Add(1)
done := make(chan struct{}, 5)
defer close(done)
pw := s.PointsWriter.(*PointsWriter)
gotCnt := -1
pw.WritePointsFn = func(p *cluster.WritePointsRequest) error {
defer wg.Done()
if len(p.Points) != pointCnt {
return fmt.Errorf("exp = %d, got = %d", pointCnt, len(p.Points))
}
gotCnt = len(p.Points)
done <- struct{}{}
return nil
}

s.Open()
// Test will timeout if the query doesn't process.
wg.Wait()
if err := wait(done, time.Second); err != nil {
t.Error(err)
} else if gotCnt != pointCnt {
t.Errorf("exp = %d, got = %d", pointCnt, gotCnt)
}
s.Close()
}

// Test service when not the cluster leader (CQs shouldn't run).
func TestService_NotLeader(t *testing.T) {
s := NewTestService(t)
// Set RunInterval high so we can test triggering with the RunCh below.
s.RunInterval = 10 * time.Second
s.MetaStore.(*MetaStore).Leader = false

done := make(chan struct{})
qe := s.QueryExecutor.(*QueryExecutor)
// Set a callback for ExecuteQuery. Shouldn't get called because we're not the leader.
qe.ExecuteQueryFn = func(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error) {
done <- struct{}{}
return nil, unexpectedErr
}

s.Open()
// Trigger service to run CQs.
s.RunCh <- struct{}{}
// Expect timeout error because ExecuteQuery callback wasn't called.
if err := wait(done, 100*time.Millisecond); err == nil {
t.Error(err)
}
s.Close()
}

// Test service behavior when meta store fails to get databases.
func TestService_MetaStoreFailsToGetDatabases(t *testing.T) {
s := NewTestService(t)
// Set RunInterval high so we can test triggering with the RunCh below.
s.RunInterval = 10 * time.Second
s.MetaStore.(*MetaStore).Err = expectedErr

done := make(chan struct{})
qe := s.QueryExecutor.(*QueryExecutor)
// Set ExecuteQuery callback, which shouldn't get called because of meta store failure.
qe.ExecuteQueryFn = func(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error) {
done <- struct{}{}
return nil, unexpectedErr
}

s.Open()
// Trigger service to run CQs.
s.RunCh <- struct{}{}
// Expect timeout error because ExecuteQuery callback wasn't called.
if err := wait(done, 100*time.Millisecond); err == nil {
t.Error(err)
}
s.Close()
}

// Test ExecuteContinuousQuery with invalid queries.
func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) {
s := NewTestService()
s := NewTestService(t)
dbis, _ := s.MetaStore.Databases()
dbi := dbis[0]
cqi := dbi.ContinuousQueries[0]
Expand All @@ -107,7 +159,7 @@ func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) {
}

// Group by requires aggregate.
cqi.Query = `SELECT value INTO other_value FROM cpu WHERE time > now() - 1d GROUP BY time(1h)`
cqi.Query = `SELECT value INTO other_value FROM cpu WHERE time > now() - 1h GROUP BY time(1s)`
err = s.ExecuteContinuousQuery(&dbi, &cqi)
if err == nil {
t.Error("expected error but got nil")
Expand All @@ -116,7 +168,7 @@ func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) {

// Test ExecuteContinuousQuery when QueryExecutor returns an error.
func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) {
s := NewTestService()
s := NewTestService(t)
qe := s.QueryExecutor.(*QueryExecutor)
qe.Err = expectedErr

Expand All @@ -131,15 +183,16 @@ func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) {
}

// NewTestService returns a new *Service with default mock object members.
func NewTestService() *Service {
func NewTestService(t *testing.T) *Service {
s := NewService(NewConfig())
s.MetaStore = NewMetaStore()
s.QueryExecutor = NewQueryExecutor()
s.PointsWriter = NewPointsWriter()
s.MetaStore = NewMetaStore(t)
s.QueryExecutor = NewQueryExecutor(t)
s.PointsWriter = NewPointsWriter(t)
s.RunInterval = time.Millisecond

// Set Logger to write to dev/null so stdout isn't polluted.
null, _ := os.Open(os.DevNull)
s.Logger = log.New(null, "", 0)
//null, _ := os.Open(os.DevNull)
s.Logger = log.New(os.Stdout, "", 0)

return s
}
Expand All @@ -149,10 +202,11 @@ type MetaStore struct {
Leader bool
DatabaseInfos []meta.DatabaseInfo
Err error
t *testing.T
}

// NewMetaStore returns a *MetaStore.
func NewMetaStore() *MetaStore {
func NewMetaStore(t *testing.T) *MetaStore {
return &MetaStore{
Leader: true,
DatabaseInfos: []meta.DatabaseInfo{
Expand All @@ -162,11 +216,12 @@ func NewMetaStore() *MetaStore {
ContinuousQueries: []meta.ContinuousQueryInfo{
{
Name: "cq",
Query: `SELECT count(cpu) INTO cpu_count FROM cpu WHERE time > now() - 1d GROUP BY time(1h)`,
Query: `SELECT count(cpu) INTO cpu_count FROM cpu WHERE time > now() - 1h GROUP BY time(1s)`,
},
},
},
},
t: t,
}
}

Expand All @@ -178,30 +233,31 @@ func (ms *MetaStore) Databases() ([]meta.DatabaseInfo, error) { return ms.Databa

// QueryExecutor is a mock query executor.
type QueryExecutor struct {
ExecuteQueryFn func(query *influxql.Query, database string, chunkSize int) error
ExecuteQueryFn func(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error)
Results []*influxql.Result
ResultInterval time.Duration
ResultCh chan *influxql.Result
Err error
ErrAfterResult int
StopRespondingAfter int
Wg *sync.WaitGroup
t *testing.T
}

// NewQueryExecutor returns a *QueryExecutor.
func NewQueryExecutor() *QueryExecutor {
func NewQueryExecutor(t *testing.T) *QueryExecutor {
return &QueryExecutor{
ResultCh: make(chan *influxql.Result, 0),
ErrAfterResult: -1,
StopRespondingAfter: -1,
t: t,
}
}

// ExecuteQuery returns a channel that the caller can read query results from.
func (qe *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error) {
// Call the callback if set.

// If the test set a callback, call it.
if qe.ExecuteQueryFn != nil {
if err := qe.ExecuteQueryFn(query, database, chunkSize); err != nil {
if _, err := qe.ExecuteQueryFn(query, database, chunkSize); err != nil {
return nil, err
}
}
Expand All @@ -211,45 +267,54 @@ func (qe *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, ch
return nil, qe.Err
}

ch := make(chan *influxql.Result)
qe.Wg = &sync.WaitGroup{}
qe.Wg.Add(1)

// Start a go routine to send results and / or error.
go func() {
defer qe.Wg.Done()
defer func() { qe.t.Log("ExecuteQuery(): go routine exited"); qe.Wg.Done() }()
n := 0
for i, r := range qe.Results {
if i == qe.ErrAfterResult {
qe.ResultCh <- &influxql.Result{Err: qe.Err}
close(qe.ResultCh)
if i == qe.ErrAfterResult-1 {
qe.t.Logf("ExecuteQuery(): ErrAfterResult %d", qe.ErrAfterResult-1)
ch <- &influxql.Result{Err: qe.Err}
close(ch)
return
} else if i == qe.StopRespondingAfter {
qe.t.Log("ExecuteQuery(): StopRespondingAfter")
return
}

qe.ResultCh <- r
ch <- r
n++
time.Sleep(qe.ResultInterval)
}
close(qe.ResultCh)
qe.t.Logf("ExecuteQuery(): all (%d) results sent", n)
close(ch)
}()

return qe.ResultCh, nil
return ch, nil
}

// PointsWriter is a mock points writer.
type PointsWriter struct {
WritePointsFn func(p *cluster.WritePointsRequest) error
Err error
PointsPerSecond int
t *testing.T
}

// NewPointsWriter returns a new *PointsWriter.
func NewPointsWriter() *PointsWriter {
func NewPointsWriter(t *testing.T) *PointsWriter {
return &PointsWriter{
PointsPerSecond: 25000,
t: t,
}
}

// WritePoints mocks writing points.
func (pw *PointsWriter) WritePoints(p *cluster.WritePointsRequest) error {
// If the test set a callback, call it.
if pw.WritePointsFn != nil {
if err := pw.WritePointsFn(p); err != nil {
return err
Expand Down Expand Up @@ -286,3 +351,12 @@ func genResult(rowCnt, valCnt int) *influxql.Result {
Series: rows,
}
}

func wait(c chan struct{}, d time.Duration) (err error) {
select {
case <-c:
case <-time.After(d):
err = errors.New("timed out")
}
return
}

0 comments on commit 8699aa8

Please sign in to comment.