Skip to content

Commit

Permalink
fix #2733: add endpoint to trigger CQ(s)
Browse files Browse the repository at this point in the history
  • Loading branch information
dgnorton committed Jun 5, 2015
1 parent 8699aa8 commit fb514f2
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 18 deletions.
10 changes: 9 additions & 1 deletion cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ func NewServer(c *Config) *Server {
// Append services.
s.appendClusterService(c.Cluster)
s.appendAdminService(c.Admin)
s.appendContinuousQueryService(c.ContinuousQuery)
s.appendHTTPDService(c.HTTPD)
s.appendCollectdService(c.Collectd)
s.appendOpenTSDBService(c.OpenTSDB)
s.appendUDPService(c.UDP)
s.appendRetentionPolicyService(c.Retention)
s.appendContinuousQueryService(c.ContinuousQuery)
for _, g := range c.Graphites {
s.appendGraphiteService(g)
}
Expand Down Expand Up @@ -106,6 +106,14 @@ func (s *Server) appendHTTPDService(c httpd.Config) {
srv.Handler.MetaStore = s.MetaStore
srv.Handler.QueryExecutor = s.QueryExecutor
srv.Handler.PointsWriter = s.PointsWriter

// If a ContinuousQuerier service has been started, attach it.
for _, srvc := range s.Services {
if cqsrvc, ok := srvc.(continuous_querier.ContinuousQuerier); ok {
srv.Handler.ContinuousQuerier = cqsrvc
}
}

s.Services = append(s.Services, srv)
}

Expand Down
52 changes: 49 additions & 3 deletions services/continuous_querier/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ const (
NoChunkingSize = 0
)

// ContinuousQuerier represents a service that executes continuous queries.
type ContinuousQuerier interface {
// Run executes the named query in the named database. Blank database or name matches all.
Run(database, name string) error
}

// queryExecutor is an internal interface to make testing easier.
type queryExecutor interface {
ExecuteQuery(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error)
Expand All @@ -29,6 +35,7 @@ type queryExecutor interface {
type metaStore interface {
IsLeader() bool
Databases() ([]meta.DatabaseInfo, error)
Database(name string) (*meta.DatabaseInfo, error)
}

// pointsWriter is an internal interface to make testing easier.
Expand Down Expand Up @@ -93,6 +100,48 @@ func (s *Service) Close() error {
return nil
}

// Run runs the specified continuous query, or all CQs if none is specified.
func (s *Service) Run(database, name string) error {
fmt.Printf("database = %v\n", database)
fmt.Printf("name = %v\n", name)
var dbs []meta.DatabaseInfo

if database != "" {
// Find the requested database.
db, err := s.MetaStore.Database(database)
if err != nil {
return err
} else if db == nil {
return tsdb.ErrDatabaseNotFound(database)
}
dbs = append(dbs, *db)
} else {
// Get all databases.
var err error
dbs, err = s.MetaStore.Databases()
if err != nil {
return err
}
}

// Loop through databases.
for _, db := range dbs {
// Loop through CQs in each DB executing the ones that match name.
for _, cq := range db.ContinuousQueries {
if name == "" || cq.Name == name {
// Reset the last run time for the CQ.
fmt.Printf("running = %v\n", name)
s.lastRuns[cq.Name] = time.Time{}
}
}
}

// Signal the background routine to run CQs.
s.RunCh <- struct{}{}

return nil
}

// backgroundLoop runs on a go routine and periodically executes CQs.
func (s *Service) backgroundLoop() {
defer s.wg.Done()
Expand Down Expand Up @@ -173,7 +222,6 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
}

// Calculate and set the time range for the query.
fmt.Printf("interval = %v\n", interval)
startTime := now.Round(interval)
if startTime.UnixNano() > now.UnixNano() {
startTime = startTime.Add(-interval)
Expand All @@ -193,7 +241,6 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti

for i := 0; i < s.Config.RecomputePreviousN; i++ {
// if we're already more time past the previous window than we're going to look back, stop
fmt.Printf("now - start = %v, recomputeNoOlderThan = %v\n", now.Sub(startTime), recomputeNoOlderThan)
if now.Sub(startTime) > recomputeNoOlderThan {
return nil
}
Expand All @@ -216,7 +263,6 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti

// runContinuousQueryAndWriteResult will run the query against the cluster and write the results back in
func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
fmt.Printf("runContinuousQueryAndWriteResult(): %s\n", cq.q.String())
// Wrap the CQ's inner SELECT statement in a Query for the QueryExecutor.
q := &influxql.Query{
Statements: influxql.Statements{cq.q},
Expand Down
146 changes: 132 additions & 14 deletions services/continuous_querier/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,59 @@ func TestService_HappyPath(t *testing.T) {
s.Close()
}

// Test Run method.
func TestService_Run(t *testing.T) {
s := NewTestService(t)

// Set RunInterval high so we can trigger using Run method.
s.RunInterval = 10 * time.Minute

// Only want one call to ExecuteQueryFn per CQ.
s.Config.RecomputePreviousN = 0

done := make(chan struct{})
expectCallCnt := 2
callCnt := 0

// Set a callback for ExecuteQuery.
qe := s.QueryExecutor.(*QueryExecutor)
qe.ExecuteQueryFn = func(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error) {
callCnt++
if callCnt >= expectCallCnt {
done <- struct{}{}
}
return nil, nil
}

s.Open()
// Trigger service to run all CQs.
s.Run("", "")
// Shouldn't time out.
if err := wait(done, 100*time.Millisecond); err != nil {
t.Error(err)
}
// This time it should timeout because ExecuteQuery should not get called again.
if err := wait(done, 100*time.Millisecond); err == nil {
t.Error("too many queries executed")
}
s.Close()

// Now test just one query.
expectCallCnt = 1
callCnt = 0
s.Open()
s.Run("db", "cq")
// Shouldn't time out.
if err := wait(done, 100*time.Millisecond); err != nil {
t.Error(err)
}
// This time it should timeout because ExecuteQuery should not get called again.
if err := wait(done, 100*time.Millisecond); err == nil {
t.Error("too many queries executed")
}
s.Close()
}

// Test service when not the cluster leader (CQs shouldn't run).
func TestService_NotLeader(t *testing.T) {
s := NewTestService(t)
Expand Down Expand Up @@ -185,7 +238,8 @@ func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) {
// NewTestService returns a new *Service with default mock object members.
func NewTestService(t *testing.T) *Service {
s := NewService(NewConfig())
s.MetaStore = NewMetaStore(t)
ms := NewMetaStore(t)
s.MetaStore = ms
s.QueryExecutor = NewQueryExecutor(t)
s.PointsWriter = NewPointsWriter(t)
s.RunInterval = time.Millisecond
Expand All @@ -194,6 +248,12 @@ func NewTestService(t *testing.T) *Service {
//null, _ := os.Open(os.DevNull)
s.Logger = log.New(os.Stdout, "", 0)

// Add a couple test databases and CQs.
ms.CreateDatabase("db", "rp")
ms.CreateContinuousQuery("db", "cq", `SELECT count(cpu) INTO cpu_count FROM cpu WHERE time > now() - 1h GROUP BY time(1s)`)
ms.CreateDatabase("db2", "default")
ms.CreateContinuousQuery("db2", "cq2", `SELECT mean(value) INTO cpu_mean FROM cpu WHERE time > now() - 10m GROUP BY time(1m))`)

return s
}

Expand All @@ -209,19 +269,7 @@ type MetaStore struct {
func NewMetaStore(t *testing.T) *MetaStore {
return &MetaStore{
Leader: true,
DatabaseInfos: []meta.DatabaseInfo{
{
Name: "db",
DefaultRetentionPolicy: "rp",
ContinuousQueries: []meta.ContinuousQueryInfo{
{
Name: "cq",
Query: `SELECT count(cpu) INTO cpu_count FROM cpu WHERE time > now() - 1h GROUP BY time(1s)`,
},
},
},
},
t: t,
t: t,
}
}

Expand All @@ -231,6 +279,70 @@ func (ms *MetaStore) IsLeader() bool { return ms.Leader }
// Databases returns a list of database info about each database in the cluster.
func (ms *MetaStore) Databases() ([]meta.DatabaseInfo, error) { return ms.DatabaseInfos, ms.Err }

// Database returns a single database by name.
func (ms *MetaStore) Database(name string) (*meta.DatabaseInfo, error) {
if ms.Err != nil {
return nil, ms.Err
}
for i := range ms.DatabaseInfos {
if ms.DatabaseInfos[i].Name == name {
return &ms.DatabaseInfos[i], nil
}
}
return nil, fmt.Errorf("database not found: %s", name)
}

// CreateDatabase adds a new database to the meta store.
func (ms *MetaStore) CreateDatabase(name, defaultRetentionPolicy string) error {
if ms.Err != nil {
return ms.Err
}

// See if the database already exists.
for _, dbi := range ms.DatabaseInfos {
if dbi.Name == name {
return fmt.Errorf("database already exists: %s", name)
}
}

// Create database.
ms.DatabaseInfos = append(ms.DatabaseInfos, meta.DatabaseInfo{
Name: name,
DefaultRetentionPolicy: defaultRetentionPolicy,
})

return nil
}

// CreateContinuousQuery adds a CQ to the meta store.
func (ms *MetaStore) CreateContinuousQuery(database, name, query string) error {
if ms.Err != nil {
return ms.Err
}

dbi, err := ms.Database(database)
if err != nil {
return err
} else if dbi == nil {
return fmt.Errorf("database not found: %s", database)
}

// See if CQ already exists.
for _, cqi := range dbi.ContinuousQueries {
if cqi.Name == name {
return fmt.Errorf("continuous query already exists: %s", name)
}
}

// Create a new CQ and store it.
dbi.ContinuousQueries = append(dbi.ContinuousQueries, meta.ContinuousQueryInfo{
Name: name,
Query: query,
})

return nil
}

// QueryExecutor is a mock query executor.
type QueryExecutor struct {
ExecuteQueryFn func(query *influxql.Query, database string, chunkSize int) (<-chan *influxql.Result, error)
Expand Down Expand Up @@ -360,3 +472,9 @@ func wait(c chan struct{}, d time.Duration) (err error) {
}
return
}

func check(err error) {
if err != nil {
panic(err)
}
}
30 changes: 30 additions & 0 deletions services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/services/continuous_querier"
"github.com/influxdb/influxdb/tsdb"
"github.com/influxdb/influxdb/uuid"
)
Expand Down Expand Up @@ -66,6 +67,8 @@ type Handler struct {
WritePoints(p *cluster.WritePointsRequest) error
}

ContinuousQuerier continuous_querier.ContinuousQuerier

Logger *log.Logger
loggingEnabled bool // Log every HTTP access.
WriteTrace bool // Detailed logging of write path
Expand Down Expand Up @@ -102,6 +105,10 @@ func NewHandler(requireAuthentication, loggingEnabled bool, version string) *Han
"ping-head",
"HEAD", "/ping", true, true, h.servePing,
},
route{ // Tell data node to run CQs that should be run
"process_continuous_queries",
"POST", "/data/process_continuous_queries", false, false, h.serveProcessContinuousQueries,
},
// route{
// "dump", // export all points in the given db.
// "GET", "/dump", true, true, h.serveDump,
Expand Down Expand Up @@ -159,6 +166,29 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.mux.ServeHTTP(w, r)
}

func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
// If the continuous query service isn't configured, return 404.
if h.ContinuousQuerier == nil {
w.WriteHeader(http.StatusNotImplemented)
return
}

q := r.URL.Query()

// Get the database name (blank means all databases).
db := q.Get("db")
// Get the name of the CQ to run (blank means run all).
name := q.Get("name")

// Pass the request to the CQ service.
if err := h.ContinuousQuerier.Run(db, name); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}

w.WriteHeader(http.StatusNoContent)
}

// serveQuery parses an incoming query and, if valid, executes the query.
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
q := r.URL.Query()
Expand Down

0 comments on commit fb514f2

Please sign in to comment.