Skip to content

Commit

Permalink
Wire up DROP CONTINUOUS QUERY
Browse files Browse the repository at this point in the history
  • Loading branch information
toddboom committed Mar 25, 2015
1 parent b164b63 commit a7694c4
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 6 deletions.
6 changes: 6 additions & 0 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (

// Continuous Query messages
createContinuousQueryMessageType = messaging.MessageType(0x70)
dropContinuousQueryMessageType = messaging.MessageType(0x71)

// Write series data messages (per-topic)
writeRawSeriesMessageType = messaging.MessageType(0x80)
Expand Down Expand Up @@ -206,3 +207,8 @@ type dropSeriesCommand struct {
type createContinuousQueryCommand struct {
Query string `json:"query"`
}

type dropContinuousQueryCommand struct {
Name string `json:"name"`
Database string `json:"database"`
}
3 changes: 3 additions & 0 deletions influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ var (

// ErrContinuousQueryExists is returned when creating a duplicate continuous query.
ErrContinuousQueryExists = errors.New("continuous query already exists")

// ErrContinuousQueryNotFound is returned when dropping a nonexistent continuous query.
ErrContinuousQueryNotFound = errors.New("continuous query not found")
)

// ErrAuthorize represents an authorization error.
Expand Down
3 changes: 2 additions & 1 deletion influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1271,7 +1271,8 @@ func (s *CreateContinuousQueryStatement) RequiredPrivileges() ExecutionPrivilege

// DropContinuousQueryStatement represents a command for removing a continuous query.
type DropContinuousQueryStatement struct {
Name string
Name string
Database string
}

// String returns a string representation of the statement.
Expand Down
15 changes: 13 additions & 2 deletions influxql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -1212,11 +1212,22 @@ func (p *Parser) parseDropContinuousQueryStatement() (*DropContinuousQueryStatem
}

// Read the id of the query to drop.
lit, err := p.parseIdent()
ident, err := p.parseIdent()
if err != nil {
return nil, err
}
stmt.Name = lit
stmt.Name = ident

// Expect an "ON" keyword.
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != ON {
return nil, newParseError(tokstr(tok, lit), []string{"ON"}, pos)
}

// Read the name of the database to remove the query from.
if ident, err = p.parseIdent(); err != nil {
return nil, err
}
stmt.Database = ident

return stmt, nil
}
Expand Down
6 changes: 4 additions & 2 deletions influxql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,8 +552,8 @@ func TestParser_ParseStatement(t *testing.T) {

// DROP CONTINUOUS QUERY statement
{
s: `DROP CONTINUOUS QUERY myquery`,
stmt: &influxql.DropContinuousQueryStatement{Name: "myquery"},
s: `DROP CONTINUOUS QUERY myquery ON foo`,
stmt: &influxql.DropContinuousQueryStatement{Name: "myquery", Database: "foo"},
},

// DROP DATABASE statement
Expand Down Expand Up @@ -810,6 +810,8 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `SHOW STATS ON`, err: `found EOF, expected string at line 1, char 15`},
{s: `DROP CONTINUOUS`, err: `found EOF, expected QUERY at line 1, char 17`},
{s: `DROP CONTINUOUS QUERY`, err: `found EOF, expected identifier at line 1, char 23`},
{s: `DROP CONTINUOUS QUERY myquery`, err: `found EOF, expected ON at line 1, char 31`},
{s: `DROP CONTINUOUS QUERY myquery ON`, err: `found EOF, expected identifier at line 1, char 34`},
{s: `CREATE CONTINUOUS`, err: `found EOF, expected QUERY at line 1, char 19`},
{s: `CREATE CONTINUOUS QUERY`, err: `found EOF, expected identifier at line 1, char 25`},
{s: `DROP FOO`, err: `found FOO, expected SERIES, CONTINUOUS, MEASUREMENT at line 1, char 6`},
Expand Down
53 changes: 52 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3030,6 +3030,17 @@ func (s *Server) CreateContinuousQuery(q *influxql.CreateContinuousQueryStatemen
return err
}

func (s *Server) executeDropContinuousQueryStatement(q *influxql.DropContinuousQueryStatement, user *User) *Result {
return &Result{Err: s.DropContinuousQuery(q)}
}

// DropContinuousQuery dropsoa continuous query.
func (s *Server) DropContinuousQuery(q *influxql.DropContinuousQueryStatement) error {
c := &dropContinuousQueryCommand{Name: q.Name, Database: q.Database}
_, err := s.broadcast(dropContinuousQueryMessageType, c)
return err
}

// ContinuousQueries returns a list of all continuous queries.
func (s *Server) ContinuousQueries(database string) []*ContinuousQuery {
s.mu.RLock()
Expand Down Expand Up @@ -3257,6 +3268,8 @@ func (s *Server) processor(conn MessagingConn, done chan struct{}) {
err = s.applySetPrivilege(m)
case createContinuousQueryMessageType:
err = s.applyCreateContinuousQueryCommand(m)
case dropContinuousQueryMessageType:
err = s.applyDropContinuousQueryCommand(m)
case dropSeriesMessageType:
err = s.applyDropSeries(m)
case writeRawSeriesMessageType:
Expand Down Expand Up @@ -3546,7 +3559,7 @@ func NewContinuousQuery(q string) (*ContinuousQuery, error) {

cq, ok := stmt.(*influxql.CreateContinuousQueryStatement)
if !ok {
return nil, errors.New("query isn't a continuous query")
return nil, errors.New("query isn't a valie continuous query")
}

cquery := &ContinuousQuery{
Expand Down Expand Up @@ -3616,6 +3629,44 @@ func (s *Server) applyCreateContinuousQueryCommand(m *messaging.Message) error {
return nil
}

// applyDropContinuousQueryCommand removes the continuous query from the database object and saves it to the metastore
func (s *Server) applyDropContinuousQueryCommand(m *messaging.Message) error {
var c dropContinuousQueryCommand

mustUnmarshalJSON(m.Data, &c)

// retrieve the database and ensure that it exists
db := s.databases[c.Database]
if db == nil {
return ErrDatabaseNotFound
}

// loop through continuous queries and find the match
cqIndex := -1
for n, continuousQuery := range db.continuousQueries {
if continuousQuery.cq.Name == c.Name {
cqIndex = n
break
}
}

if cqIndex == -1 {
return ErrContinuousQueryNotFound
}

// delete the relevant continuous query
copy(db.continuousQueries[cqIndex:], db.continuousQueries[cqIndex+1:])
db.continuousQueries[len(db.continuousQueries)-1] = nil
db.continuousQueries = db.continuousQueries[:len(db.continuousQueries)-1]

// persist to metastore
s.meta.mustUpdate(m.Index, func(tx *metatx) error {
return tx.saveDatabase(db)
})

return nil
}

// RunContinuousQueries will run any continuous queries that are due to run and write the
// results back into the database
func (s *Server) RunContinuousQueries() error {
Expand Down
57 changes: 57 additions & 0 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1644,6 +1644,63 @@ func TestServer_CreateContinuousQuery_ErrInfinteLoop(t *testing.T) {
t.Skip("pending")
}

func TestServer_DropContinuousQuery(t *testing.T) {
c := test.NewMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()

// Create the "foo" database.
if err := s.CreateDatabase("foo"); err != nil {
t.Fatal(err)
}
if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar", Duration: time.Hour}); err != nil {
t.Fatal(err)
}
s.SetDefaultRetentionPolicy("foo", "bar")

// create and check
q := "CREATE CONTINUOUS QUERY myquery ON foo BEGIN SELECT count() INTO measure1 FROM myseries GROUP BY time(10m) END"
stmt, err := influxql.NewParser(strings.NewReader(q)).ParseStatement()
if err != nil {
t.Fatalf("error parsing query %s", err.Error())
}
ccq := stmt.(*influxql.CreateContinuousQueryStatement)
if err := s.CreateContinuousQuery(ccq); err != nil {
t.Fatalf("error creating continuous query %s", err.Error())
}

queries := s.ContinuousQueries("foo")
cqObj, _ := influxdb.NewContinuousQuery(q)
expected := []*influxdb.ContinuousQuery{cqObj}
if mustMarshalJSON(expected) != mustMarshalJSON(queries) {
t.Fatalf("query not saved:\n\texp: %s\n\tgot: %s", mustMarshalJSON(expected), mustMarshalJSON(queries))
}
s.Restart()

// check again
queries = s.ContinuousQueries("foo")
if !reflect.DeepEqual(queries, expected) {
t.Fatalf("query not saved:\n\texp: %s\ngot: %s", mustMarshalJSON(expected), mustMarshalJSON(queries))
}

// drop and check
q = "DROP CONTINUOUS QUERY myquery ON foo"
stmt, err = influxql.NewParser(strings.NewReader(q)).ParseStatement()
if err != nil {
t.Fatalf("error parsing query %s", err.Error())
}
dcq := stmt.(*influxql.DropContinuousQueryStatement)
if err := s.DropContinuousQuery(dcq); err != nil {
t.Fatalf("error dropping continuous query %s", err.Error())
}

queries = s.ContinuousQueries("foo")
if len(queries) != 0 {
t.Fatalf("continuous query didn't get dropped")
}
}

// Ensure
func TestServer_RunContinuousQueries(t *testing.T) {
t.Skip()
Expand Down

0 comments on commit a7694c4

Please sign in to comment.