Skip to content

Commit

Permalink
Merge pull request #179 from influxdb/fix-177-drop-series
Browse files Browse the repository at this point in the history
fix #177. support drop series in the query lang
  • Loading branch information
pauldix committed Jan 13, 2014
2 parents 8963604 + 3e216e1 commit 2a08f41
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 23 deletions.
8 changes: 8 additions & 0 deletions src/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ func (self *QueryEngine) RunQuery(user common.User, database string, queryString
continue
}

if query.DropSeriesQuery != nil {
err := self.coordinator.DropSeries(user, database, query.DropSeriesQuery.GetTableName())
if err != nil {
return err
}
continue
}

selectQuery := query.SelectQuery
if isAggregateQuery(selectQuery) {
return self.executeCountQueryWithGroupBy(user, database, selectQuery, localOnly, yield)
Expand Down
34 changes: 21 additions & 13 deletions src/integration/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,23 +307,31 @@ func (self *ServerSuite) TestDropDatabase(c *C) {
}

func (self *ServerSuite) TestDropSeries(c *C) {
self.serverProcesses[0].Post("/db?u=root&p=root", `{"name": "drop_series", "replicationFactor": 3}`, c)
self.serverProcesses[0].Post("/db/drop_series/users?u=root&p=root", `{"name": "paul", "password": "pass"}`, c)
data := `[{
for i := 0; i < 2; i++ {
self.serverProcesses[0].Post("/db?u=root&p=root", `{"name": "drop_series", "replicationFactor": 3}`, c)
self.serverProcesses[0].Post("/db/drop_series/users?u=root&p=root", `{"name": "paul", "password": "pass"}`, c)
data := `[{
"name": "cluster_query",
"columns": ["val1"],
"points": [[1]]
}]`
self.serverProcesses[0].Post("/db/drop_series/series?u=paul&p=pass", data, c)
time.Sleep(time.Second)
resp := self.serverProcesses[0].Request("DELETE", "/db/drop_series/series/cluster_query?u=root&p=root", "", c)
c.Assert(resp.StatusCode, Equals, http.StatusNoContent)
time.Sleep(time.Second)
for _, s := range self.serverProcesses {
fmt.Printf("Running query against: %d\n", s.apiPort)
collection := s.Query("drop_series", "select * from cluster_query", true, c)
c.Assert(collection.GetSeries("cluster_query", c).Points, HasLen, 0)
c.Assert(collection.GetSeries("cluster_query", c).Columns, DeepEquals, []string{"time", "sequence_number"})
self.serverProcesses[0].Post("/db/drop_series/series?u=paul&p=pass", data, c)
time.Sleep(time.Second)
if i == 0 {
fmt.Printf("Using the http api\n")
resp := self.serverProcesses[0].Request("DELETE", "/db/drop_series/series/cluster_query?u=root&p=root", "", c)
c.Assert(resp.StatusCode, Equals, http.StatusNoContent)
} else {
fmt.Printf("Using the drop series\n")
self.serverProcesses[0].Query("drop_series", "drop series cluster_query", false, c)
}
time.Sleep(time.Second)
for _, s := range self.serverProcesses {
fmt.Printf("Running query against: %d\n", s.apiPort)
collection := s.Query("drop_series", "select * from cluster_query", true, c)
c.Assert(collection.GetSeries("cluster_query", c).Points, HasLen, 0)
c.Assert(collection.GetSeries("cluster_query", c).Columns, DeepEquals, []string{"time", "sequence_number"})
}
}
}

Expand Down
11 changes: 11 additions & 0 deletions src/parser/frees.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ free_delete_query (delete_query *q)
}
}

void
free_drop_series_query (drop_series_query *q)
{
free_value(q->name);
}

void
close_query (query *q)
{
Expand All @@ -127,6 +133,11 @@ close_query (query *q)
free(q->select_query);
}

if (q->drop_series_query) {
free_drop_series_query(q->drop_series_query);
free(q->drop_series_query);
}

if (q->delete_query) {
free_delete_query(q->delete_query);
free(q->delete_query);
Expand Down
42 changes: 34 additions & 8 deletions src/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,23 @@ type SelectQuery struct {

type ListQuery struct{}

type DropSeriesQuery struct {
tableName string
}

func (self *DropSeriesQuery) GetTableName() string {
return self.tableName
}

type DeleteQuery struct {
SelectDeleteCommonQuery
}

type Query struct {
SelectQuery *SelectQuery
DeleteQuery *DeleteQuery
ListQuery *ListQuery
SelectQuery *SelectQuery
DeleteQuery *DeleteQuery
ListQuery *ListQuery
DropSeriesQuery *DropSeriesQuery
}

func (self *Query) GetQueryString() string {
Expand Down Expand Up @@ -426,20 +435,37 @@ func ParseQuery(query string) ([]*Query, error) {

if q.list_series_query != 0 {
return []*Query{&Query{ListQuery: &ListQuery{}}}, nil
}

if q.select_query != nil {
} else if q.select_query != nil {
selectQuery, err := parseSelectQuery(query, q.select_query)
if err != nil {
return nil, err
}
return []*Query{&Query{SelectQuery: selectQuery}}, nil
} else if q.delete_query != nil {
deleteQuery, err := parseDeleteQuery(query, q.delete_query)
if err != nil {
return nil, err
}
return []*Query{&Query{DeleteQuery: deleteQuery}}, nil
} else if q.drop_series_query != nil {
dropSeriesQuery, err := parseDropSeriesQuery(query, q.drop_series_query)
if err != nil {
return nil, err
}
return []*Query{&Query{DropSeriesQuery: dropSeriesQuery}}, nil
}
deleteQuery, err := parseDeleteQuery(query, q.delete_query)
return nil, fmt.Errorf("Unknown query type encountered")
}

func parseDropSeriesQuery(queryStirng string, dropSeriesQuery *C.drop_series_query) (*DropSeriesQuery, error) {
name, err := GetValue(dropSeriesQuery.name)
if err != nil {
return nil, err
}
return []*Query{&Query{DeleteQuery: deleteQuery}}, nil

return &DropSeriesQuery{
tableName: name.Name,
}, nil
}

func parseSelectDeleteCommonQuery(queryString string, fromClause *C.from_clause, whereCondition *C.condition) (SelectDeleteCommonQuery, error) {
Expand Down
16 changes: 16 additions & 0 deletions src/parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,22 @@ func (self *QueryParserSuite) TestParseDeleteQueryWithEndTime(c *C) {
c.Assert(q.GetEndTime(), Equals, time.Unix(1389040522, 0).UTC())
}

func (self *QueryParserSuite) TestParseDropSeries(c *C) {
query := "drop series foobar"
queries, err := ParseQuery(query)
c.Assert(err, IsNil)

c.Assert(queries, HasLen, 1)

_q := queries[0]

c.Assert(_q.DropSeriesQuery, NotNil)

q := _q.DropSeriesQuery

c.Assert(q.GetTableName(), Equals, "foobar")
}

func (self *QueryParserSuite) TestGetQueryStringWithTimeCondition(c *C) {
now := time.Now().Round(time.Minute).UTC()
micros := common.TimeToMicroseconds(now)
Expand Down
1 change: 1 addition & 0 deletions src/parser/query.lex
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ static int yycolumn = 1;
"as" { return AS; }
"select" { return SELECT; }
"delete" { return DELETE; }
"drop series" { return DROP_SERIES; }
"limit" { BEGIN(INITIAL); return LIMIT; }
"order" { BEGIN(INITIAL); return ORDER; }
"asc" { return ASC; }
Expand Down
19 changes: 17 additions & 2 deletions src/parser/query.yacc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ value *create_expression_value(char *operator, size_t size, ...) {
query* query;
select_query* select_query;
delete_query* delete_query;
drop_series_query* drop_series_query;
groupby_clause* groupby_clause;
struct {
int limit;
Expand All @@ -69,7 +70,7 @@ value *create_expression_value(char *operator, size_t size, ...) {
%lex-param {void *scanner}

// define types of tokens (terminals)
%token SELECT DELETE FROM WHERE EQUAL GROUP BY LIMIT ORDER ASC DESC MERGE INNER JOIN AS LIST SERIES
%token SELECT DELETE FROM WHERE EQUAL GROUP BY LIMIT ORDER ASC DESC MERGE INNER JOIN AS LIST SERIES DROP_SERIES
%token <string> STRING_VALUE INT_VALUE FLOAT_VALUE TABLE_NAME SIMPLE_NAME REGEX_OP
%token <string> NEGATION_REGEX_OP REGEX_STRING INSENSITIVE_REGEX_STRING DURATION

Expand All @@ -96,6 +97,7 @@ value *create_expression_value(char *operator, size_t size, ...) {
%type <limit_and_order> LIMIT_AND_ORDER_CLAUSES
%type <query> QUERY
%type <delete_query> DELETE_QUERY
%type <drop_series_query> DROP_SERIES_QUERY
%type <select_query> SELECT_QUERY

// the initial token
Expand Down Expand Up @@ -150,6 +152,12 @@ QUERY:
$$ = calloc(1, sizeof(query));
$$->list_series_query = TRUE;
}
|
DROP_SERIES_QUERY
{
$$ = calloc(1, sizeof(query));
$$->drop_series_query = $1;
}

DELETE_QUERY:
DELETE FROM_CLAUSE WHERE_CLAUSE
Expand All @@ -159,6 +167,13 @@ DELETE_QUERY:
$$->where_condition = $3;
}

DROP_SERIES_QUERY:
DROP_SERIES SIMPLE_NAME_VALUE
{
$$ = malloc(sizeof(drop_series_query));
$$->name = $2;
}

SELECT_QUERY:
SELECT COLUMN_NAMES FROM_CLAUSE GROUP_BY_CLAUSE WHERE_CLAUSE LIMIT_AND_ORDER_CLAUSES
{
Expand Down Expand Up @@ -511,7 +526,7 @@ void yy_delete_buffer(void *, void *);
query
parse_query(char *const query_s)
{
query q = {NULL, NULL, FALSE, NULL};
query q = {NULL, NULL, NULL, FALSE, NULL};
void *scanner;
yylex_init(&scanner);
#ifdef DEBUG
Expand Down
5 changes: 5 additions & 0 deletions src/parser/query_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,14 @@ typedef struct {
error *error;
} delete_query;

typedef struct {
value *name;
} drop_series_query;

typedef struct {
select_query *select_query;
delete_query *delete_query;
drop_series_query *drop_series_query;
char list_series_query;
error *error;
} query;
Expand Down
3 changes: 3 additions & 0 deletions src/parser/test_memory_leaks.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ int main(int argc, char **argv) {
q = parse_query("select email from users.events where email in ('jvshahid@gmail.com')");
close_query(&q);
q = parse_query("drop series foobar");
close_query(&q);
return 0;
}
EOF
Expand Down

0 comments on commit 2a08f41

Please sign in to comment.