Skip to content

Commit

Permalink
Merge pull request #6484 from benbjohnson/delete-series
Browse files Browse the repository at this point in the history
Add DELETE query support
  • Loading branch information
benbjohnson committed Apr 27, 2016
2 parents 51b1af4 + f7af787 commit 0157968
Show file tree
Hide file tree
Showing 12 changed files with 280 additions and 43 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
- [#3558](https://github.com/influxdata/influxdb/issues/3558): Support field math inside a WHERE clause.
- [#6429](https://github.com/influxdata/influxdb/issues/6429): Log slow queries if they pass a configurable threshold.
- [#4675](https://github.com/influxdata/influxdb/issues/4675): Allow derivative() function to be used with ORDER BY desc.
- [#6483](https://github.com/influxdata/influxdb/pull/6483): Delete series support for TSM
- [#6483](https://github.com/influxdata/influxdb/pull/6483): Delete series support for TSM
- [#6484](https://github.com/influxdata/influxdb/pull/6484): Query language support for DELETE


### Bugfixes

Expand Down
4 changes: 3 additions & 1 deletion cluster/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,14 @@ func (s *Service) processExecuteStatementRequest(buf []byte) error {

func (s *Service) executeStatement(stmt influxql.Statement, database string) error {
switch t := stmt.(type) {
case *influxql.DeleteSeriesStatement:
return s.TSDBStore.DeleteSeries(database, t.Sources, t.Condition, false)
case *influxql.DropDatabaseStatement:
return s.TSDBStore.DeleteDatabase(t.Name)
case *influxql.DropMeasurementStatement:
return s.TSDBStore.DeleteMeasurement(database, t.Name)
case *influxql.DropSeriesStatement:
return s.TSDBStore.DeleteSeries(database, t.Sources, t.Condition)
return s.TSDBStore.DeleteSeries(database, t.Sources, t.Condition, true)
case *influxql.DropRetentionPolicyStatement:
return s.TSDBStore.DeleteRetentionPolicy(database, t.Name)
case *influxql.DropShardStatement:
Expand Down
20 changes: 18 additions & 2 deletions cluster/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influ
err = e.executeCreateSubscriptionStatement(stmt)
case *influxql.CreateUserStatement:
err = e.executeCreateUserStatement(stmt)
case *influxql.DeleteSeriesStatement:
err = e.executeDeleteSeriesStatement(stmt, ctx.Database)
case *influxql.DropContinuousQueryStatement:
err = e.executeDropContinuousQueryStatement(stmt)
case *influxql.DropDatabaseStatement:
Expand Down Expand Up @@ -197,6 +199,20 @@ func (e *StatementExecutor) executeCreateUserStatement(q *influxql.CreateUserSta
return err
}

func (e *StatementExecutor) executeDeleteSeriesStatement(stmt *influxql.DeleteSeriesStatement, database string) error {
if dbi, err := e.MetaClient.Database(database); err != nil {
return err
} else if dbi == nil {
return influxql.ErrDatabaseNotFound(database)
}

// Convert "now()" to current time.
stmt.Condition = influxql.Reduce(stmt.Condition, &influxql.NowValuer{Now: time.Now().UTC()})

// Locally delete the series.
return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition, false)
}

func (e *StatementExecutor) executeDropContinuousQueryStatement(q *influxql.DropContinuousQueryStatement) error {
return e.MetaClient.DropContinuousQuery(q.Database, q.Name)
}
Expand Down Expand Up @@ -238,7 +254,7 @@ func (e *StatementExecutor) executeDropSeriesStatement(stmt *influxql.DropSeries
}

// Locally drop the series.
return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition)
return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition, true)
}

func (e *StatementExecutor) executeDropShardStatement(stmt *influxql.DropShardStatement) error {
Expand Down Expand Up @@ -819,7 +835,7 @@ type TSDBStore interface {
DeleteDatabase(name string) error
DeleteMeasurement(database, name string) error
DeleteRetentionPolicy(database, name string) error
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr, dropMeta bool) error
DeleteShard(id uint64) error
IteratorCreator(shards []meta.ShardInfo) (influxql.IteratorCreator, error)
}
Expand Down
6 changes: 3 additions & 3 deletions cluster/statement_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ type TSDBStore struct {
DeleteMeasurementFn func(database, name string) error
DeleteRetentionPolicyFn func(database, name string) error
DeleteShardFn func(id uint64) error
DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr) error
DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr, dropMeta bool) error
ShardIteratorCreatorFn func(id uint64) influxql.IteratorCreator
}

Expand Down Expand Up @@ -242,8 +242,8 @@ func (s *TSDBStore) DeleteShard(id uint64) error {
return s.DeleteShardFn(id)
}

func (s *TSDBStore) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error {
return s.DeleteSeriesFn(database, sources, condition)
func (s *TSDBStore) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr, dropMeta bool) error {
return s.DeleteSeriesFn(database, sources, condition, dropMeta)
}

func (s *TSDBStore) IteratorCreator(shards []meta.ShardInfo) (influxql.IteratorCreator, error) {
Expand Down
46 changes: 45 additions & 1 deletion cmd/influxd/run/server_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,50 @@ func init() {
},
}

tests["delete_series"] = Test{
db: "db0",
rp: "rp0",
writes: Writes{
&Write{data: fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano())},
&Write{data: fmt.Sprintf(`cpu,host=serverA,region=uswest val=100 %d`, mustParseTime(time.RFC3339Nano, "2000-01-02T00:00:00Z").UnixNano())},
&Write{data: fmt.Sprintf(`cpu,host=serverA,region=uswest val=200 %d`, mustParseTime(time.RFC3339Nano, "2000-01-03T00:00:00Z").UnixNano())},
&Write{db: "db1", data: fmt.Sprintf(`cpu,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano())},
},
queries: []*Query{
&Query{
name: "Show series is present",
command: `SHOW SERIES`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "Delete series",
command: `DELETE FROM cpu WHERE time < '2000-01-03T00:00:00Z'`,
exp: `{"results":[{}]}`,
params: url.Values{"db": []string{"db0"}},
once: true,
},
&Query{
name: "Show series still exists",
command: `SHOW SERIES`,
exp: `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "Make sure last point still exists",
command: `SELECT * FROM cpu`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-03T00:00:00Z","serverA","uswest",200]]}]}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
name: "Make sure data wasn't deleted from other database.",
command: `SELECT * FROM cpu`,
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
params: url.Values{"db": []string{"db1"}},
},
},
}

tests["drop_and_recreate_series"] = Test{
db: "db0",
rp: "rp0",
Expand Down Expand Up @@ -302,7 +346,7 @@ func init() {
&Query{
name: "Drop series with WHERE field should error",
command: `DROP SERIES FROM c WHERE val > 50.0`,
exp: `{"results":[{"error":"DROP SERIES doesn't support fields in WHERE clause"}]}`,
exp: `{"results":[{"error":"fields not supported in WHERE clause during deletion"}]}`,
params: url.Values{"db": []string{"db0"}},
},
&Query{
Expand Down
32 changes: 32 additions & 0 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,38 @@ func TestServer_Query_DropDatabaseIsolated(t *testing.T) {
}
}

func TestServer_Query_DeleteSeries(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
defer s.Close()

test := tests.load(t, "delete_series")

if err := s.CreateDatabaseAndRetentionPolicy(test.database(), newRetentionPolicyInfo(test.retentionPolicy(), 1, 0)); err != nil {
t.Fatal(err)
}
if err := s.MetaClient.SetDefaultRetentionPolicy(test.database(), test.retentionPolicy()); err != nil {
t.Fatal(err)
}

for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}

func TestServer_Query_DropAndRecreateSeries(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
Expand Down
14 changes: 14 additions & 0 deletions influxql/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,20 @@ CREATE USER jdoe WITH PASSWORD '1337password';
CREATE USER jdoe WITH PASSWORD '1337password' WITH ALL PRIVILEGES;
```

### DELETE SERIES

```
delete_series_stmt = "DELETE SERIES" ( from_clause | where_clause | from_clause where_clause ) .
```

#### Example:

```sql
DELETE FROM cpu
DELETE FROM cpu WHERE time < '2000-01-01T00:00:00Z'
DELETE WHERE time < '2000-01-01T00:00:00Z'
```

### DROP CONTINUOUS QUERY

```
Expand Down
37 changes: 37 additions & 0 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (*CreateRetentionPolicyStatement) node() {}
func (*CreateSubscriptionStatement) node() {}
func (*CreateUserStatement) node() {}
func (*Distinct) node() {}
func (*DeleteSeriesStatement) node() {}
func (*DeleteStatement) node() {}
func (*DropContinuousQueryStatement) node() {}
func (*DropDatabaseStatement) node() {}
Expand Down Expand Up @@ -209,6 +210,7 @@ func (*CreateDatabaseStatement) stmt() {}
func (*CreateRetentionPolicyStatement) stmt() {}
func (*CreateSubscriptionStatement) stmt() {}
func (*CreateUserStatement) stmt() {}
func (*DeleteSeriesStatement) stmt() {}
func (*DeleteStatement) stmt() {}
func (*DropContinuousQueryStatement) stmt() {}
func (*DropDatabaseStatement) stmt() {}
Expand Down Expand Up @@ -2164,6 +2166,37 @@ func (s DropSeriesStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: false, Name: "", Privilege: WritePrivilege}}
}

// DeleteSeriesStatement represents a command for deleting all or part of a series from a database.
type DeleteSeriesStatement struct {
// Data source that fields are extracted from (optional)
Sources Sources

// An expression evaluated on data point (optional)
Condition Expr
}

// String returns a string representation of the delete series statement.
func (s *DeleteSeriesStatement) String() string {
var buf bytes.Buffer
buf.WriteString("DELETE")

if s.Sources != nil {
buf.WriteString(" FROM ")
buf.WriteString(s.Sources.String())
}
if s.Condition != nil {
buf.WriteString(" WHERE ")
buf.WriteString(s.Condition.String())
}

return buf.String()
}

// RequiredPrivileges returns the privilege required to execute a DeleteSeriesStatement.
func (s DeleteSeriesStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: false, Name: "", Privilege: WritePrivilege}}
}

// DropShardStatement represents a command for removing a shard from
// the node.
type DropShardStatement struct {
Expand Down Expand Up @@ -3468,6 +3501,10 @@ func Walk(v Visitor, node Node) {
Walk(v, c)
}

case *DeleteSeriesStatement:
Walk(v, n.Sources)
Walk(v, n.Condition)

case *DropSeriesStatement:
Walk(v, n.Sources)
Walk(v, n.Condition)
Expand Down
53 changes: 27 additions & 26 deletions influxql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,33 +993,34 @@ func (p *Parser) parseTarget(tr targetRequirement) (*Target, error) {
return t, nil
}

// parseDeleteStatement parses a delete string and returns a DeleteStatement.
// parseDeleteStatement parses a string and returns a delete statement.
// This function assumes the DELETE token has already been consumed.
func (p *Parser) parseDeleteStatement() (*DeleteStatement, error) {
// TODO remove and do not skip test once we wire up DELETE FROM.
// See issues https://github.com/influxdata/influxdb/issues/1647
// and https://github.com/influxdata/influxdb/issues/4404
return nil, errors.New("DELETE FROM is currently not supported. Use DROP SERIES or DROP MEASUREMENT instead")
//stmt := &DeleteStatement{}

//// Parse source
//if tok, pos, lit := p.scanIgnoreWhitespace(); tok != FROM {
// return nil, newParseError(tokstr(tok, lit), []string{"FROM"}, pos)
//}
//source, err := p.parseSource()
//if err != nil {
// return nil, err
//}
//stmt.Source = source

//// Parse condition: "WHERE EXPR".
//condition, err := p.parseCondition()
//if err != nil {
// return nil, err
//}
//stmt.Condition = condition

//return stmt, nil
func (p *Parser) parseDeleteStatement() (Statement, error) {
stmt := &DeleteSeriesStatement{}
var err error

tok, pos, lit := p.scanIgnoreWhitespace()

if tok == FROM {
// Parse source.
if stmt.Sources, err = p.parseSources(); err != nil {
return nil, err
}
} else {
p.unscan()
}

// Parse condition: "WHERE EXPR".
if stmt.Condition, err = p.parseCondition(); err != nil {
return nil, err
}

// If they didn't provide a FROM or a WHERE, this query is invalid
if stmt.Condition == nil && stmt.Sources == nil {
return nil, newParseError(tokstr(tok, lit), []string{"FROM", "WHERE"}, pos)
}

return stmt, nil
}

// parseShowSeriesStatement parses a string and returns a ShowSeriesStatement.
Expand Down
Loading

0 comments on commit 0157968

Please sign in to comment.