Skip to content

Commit

Permalink
DROP SHARD
Browse files Browse the repository at this point in the history
  • Loading branch information
e-dard committed Mar 14, 2016
1 parent ad8605d commit 166d8d9
Show file tree
Hide file tree
Showing 13 changed files with 305 additions and 157 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

### Features

- [#6001](https://github.com/influxdata/influxdb/pull/6001): Add DROP SHARD support.

### Bugfixes

## v0.11.0 [unreleased]
Expand Down
1 change: 1 addition & 0 deletions cluster/meta_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type MetaClient interface {
CreateUser(name, password string, admin bool) (*meta.UserInfo, error)
Database(name string) (*meta.DatabaseInfo, error)
Databases() ([]meta.DatabaseInfo, error)
DropShard(id uint64) error
DropContinuousQuery(database, name string) error
DropDatabase(name string) error
DropRetentionPolicy(database, name string) error
Expand Down
4 changes: 4 additions & 0 deletions cluster/meta_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type MetaClient struct {
DropDatabaseFn func(name string) error
DropRetentionPolicyFn func(database, name string) error
DropSubscriptionFn func(database, rp, name string) error
DropShardFn func(id uint64) error
DropUserFn func(name string) error
MetaNodesFn func() ([]meta.NodeInfo, error)
RetentionPolicyFn func(database, name string) (rpi *meta.RetentionPolicyInfo, err error)
Expand Down Expand Up @@ -54,6 +55,9 @@ func (c *MetaClient) CreateDatabaseWithRetentionPolicy(name string, rpi *meta.Re
func (c *MetaClient) CreateRetentionPolicy(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) {
return c.CreateRetentionPolicyFn(database, rpi)
}
func (c *MetaClient) DropShard(id uint64) error {
return c.DropShardFn(id)
}

func (c *MetaClient) CreateSubscription(database, rp, name, mode string, destinations []string) error {
return c.CreateSubscriptionFn(database, rp, name, mode, destinations)
Expand Down
11 changes: 11 additions & 0 deletions cluster/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,16 @@ func (e *QueryExecutor) executeDropSeriesStatement(stmt *influxql.DropSeriesStat
return e.TSDBStore.DeleteSeries(database, stmt.Sources, stmt.Condition)
}

func (e *QueryExecutor) executeDropShardStatement(stmt *influxql.DropShardStatement) error {
// Remove the shard reference from the Meta Store.
if err := e.MetaClient.DropShard(stmt.ID); err != nil {
return err
}

// Locally delete the shard.
return e.TSDBStore.DeleteShard(stmt.ID)
}

func (e *QueryExecutor) executeDropRetentionPolicyStatement(stmt *influxql.DropRetentionPolicyStatement) error {
if err := e.MetaClient.DropRetentionPolicy(stmt.Database, stmt.Name); err != nil {
return err
Expand Down Expand Up @@ -877,6 +887,7 @@ type TSDBStore interface {
DeleteMeasurement(database, name string) error
DeleteRetentionPolicy(database, name string) error
DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error
DeleteShard(id uint64) error
ExecuteShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error)
ExecuteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error)
ExpandSources(sources influxql.Sources) (influxql.Sources, error)
Expand Down
5 changes: 5 additions & 0 deletions cluster/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ type TSDBStore struct {
DeleteDatabaseFn func(name string) error
DeleteMeasurementFn func(database, name string) error
DeleteRetentionPolicyFn func(database, name string) error
DeleteShardFn func(id uint64) error
DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr) error
ExecuteShowFieldKeysStatementFn func(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error)
ExecuteShowTagValuesStatementFn func(stmt *influxql.ShowTagValuesStatement, database string) (models.Rows, error)
Expand Down Expand Up @@ -154,6 +155,10 @@ func (s *TSDBStore) DeleteRetentionPolicy(database, name string) error {
return s.DeleteRetentionPolicyFn(database, name)
}

func (s *TSDBStore) DeleteShard(id uint64) error {
return s.DeleteShardFn(id)
}

func (s *TSDBStore) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error {
return s.DeleteSeriesFn(database, sources, condition)
}
Expand Down
2 changes: 2 additions & 0 deletions cluster/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ func (s *Service) executeStatement(stmt influxql.Statement, database string) err
return s.TSDBStore.DeleteSeries(database, t.Sources, t.Condition)
case *influxql.DropRetentionPolicyStatement:
return s.TSDBStore.DeleteRetentionPolicy(database, t.Name)
case *influxql.DropShardStatement:
return s.TSDBStore.DeleteShard(t.ID)
default:
return fmt.Errorf("%q should not be executed across a cluster", stmt.String())
}
Expand Down
26 changes: 26 additions & 0 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func (*DropMeasurementStatement) node() {}
func (*DropRetentionPolicyStatement) node() {}
func (*DropSeriesStatement) node() {}
func (*DropServerStatement) node() {}
func (*DropShardStatement) node() {}
func (*DropSubscriptionStatement) node() {}
func (*DropUserStatement) node() {}
func (*GrantStatement) node() {}
Expand Down Expand Up @@ -229,6 +230,7 @@ func (*ShowSeriesStatement) stmt() {}
func (*ShowShardGroupsStatement) stmt() {}
func (*ShowShardsStatement) stmt() {}
func (*ShowStatsStatement) stmt() {}
func (*DropShardStatement) stmt() {}
func (*ShowSubscriptionsStatement) stmt() {}
func (*ShowDiagnosticsStatement) stmt() {}
func (*ShowTagKeysStatement) stmt() {}
Expand Down Expand Up @@ -2134,6 +2136,30 @@ func (s *DropServerStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}}
}

// DropShardStatement represents a command for removing a shard from
// the node.
type DropShardStatement struct {
// ID of the shard to be dropped.
ID uint64

// Meta indicates if the server being dropped is a meta or data node
Meta bool
}

// String returns a string representation of the drop series statement.
func (s *DropShardStatement) String() string {
var buf bytes.Buffer
buf.WriteString("DROP SHARD ")
buf.WriteString(strconv.FormatUint(s.ID, 10))
return buf.String()
}

// RequiredPrivileges returns the privilege required to execute a
// DropShardStatement.
func (s *DropShardStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Admin: true, Name: "", Privilege: AllPrivileges}}
}

// ShowContinuousQueriesStatement represents a command for listing continuous queries.
type ShowContinuousQueriesStatement struct{}

Expand Down
45 changes: 31 additions & 14 deletions influxql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,28 +214,31 @@ func (p *Parser) parseCreateStatement() (Statement, error) {
// This function assumes the DROP token has already been consumed.
func (p *Parser) parseDropStatement() (Statement, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
if tok == SERIES {
return p.parseDropSeriesStatement()
} else if tok == MEASUREMENT {
return p.parseDropMeasurementStatement()
} else if tok == CONTINUOUS {
switch tok {
case CONTINUOUS:
return p.parseDropContinuousQueryStatement()
} else if tok == DATABASE {
case DATA, META:
return p.parseDropServerStatement(tok)
case DATABASE:
return p.parseDropDatabaseStatement()
} else if tok == RETENTION {
case MEASUREMENT:
return p.parseDropMeasurementStatement()
case RETENTION:
if tok, pos, lit := p.scanIgnoreWhitespace(); tok != POLICY {
return nil, newParseError(tokstr(tok, lit), []string{"POLICY"}, pos)
}
return p.parseDropRetentionPolicyStatement()
} else if tok == USER {
return p.parseDropUserStatement()
} else if tok == META || tok == DATA {
return p.parseDropServerStatement(tok)
} else if tok == SUBSCRIPTION {
case SERIES:
return p.parseDropSeriesStatement()
case SHARD:
return p.parseDropShardStatement()
case SUBSCRIPTION:
return p.parseDropSubscriptionStatement()
case USER:
return p.parseDropUserStatement()
default:
return nil, newParseError(tokstr(tok, lit), []string{"CONTINUOUS", "DATA", "MEASUREMENT", "META", "RETENTION", "SERIES", "SHARD", "SUBSCRIPTION", "USER"}, pos)
}

return nil, newParseError(tokstr(tok, lit), []string{"SERIES", "CONTINUOUS", "MEASUREMENT", "SERVER", "SUBSCRIPTION"}, pos)
}

// parseAlterStatement parses a string and returns an alter statement.
Expand Down Expand Up @@ -1316,6 +1319,20 @@ func (p *Parser) parseDropSeriesStatement() (*DropSeriesStatement, error) {
return stmt, nil
}

// parseDropShardStatement parses a string and returns a
// DropShardStatement. This function assumes the "DROP SHARD" tokens
// have already been consumed.
func (p *Parser) parseDropShardStatement() (*DropShardStatement, error) {
var err error
stmt := &DropShardStatement{}

// Parse the ID of the shard to be dropped.
if stmt.ID, err = p.parseUInt64(); err != nil {
return nil, err
}
return stmt, nil
}

// parseDropServerStatement parses a string and returns a DropServerStatement.
// This function assumes the "DROP <META|DATA>" tokens have already been consumed.
func (p *Parser) parseDropServerStatement(tok Token) (*DropServerStatement, error) {
Expand Down
2 changes: 1 addition & 1 deletion influxql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1755,7 +1755,7 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `CREATE CONTINUOUS QUERY`, err: `found EOF, expected identifier at line 1, char 25`},
{s: `CREATE CONTINUOUS QUERY cq ON db RESAMPLE FOR 5s BEGIN SELECT mean(value) INTO cpu_mean FROM cpu GROUP BY time(10s) END`, err: `FOR duration must be >= GROUP BY time duration: must be a minimum of 10s, got 5s`},
{s: `CREATE CONTINUOUS QUERY cq ON db RESAMPLE EVERY 10s FOR 5s BEGIN SELECT mean(value) INTO cpu_mean FROM cpu GROUP BY time(5s) END`, err: `FOR duration must be >= GROUP BY time duration: must be a minimum of 10s, got 5s`},
{s: `DROP FOO`, err: `found FOO, expected SERIES, CONTINUOUS, MEASUREMENT, SERVER, SUBSCRIPTION at line 1, char 6`},
{s: `DROP FOO`, err: `found FOO, expected CONTINUOUS, DATA, MEASUREMENT, META, RETENTION, SERIES, SHARD, SUBSCRIPTION, USER at line 1, char 6`},
{s: `CREATE FOO`, err: `found FOO, expected CONTINUOUS, DATABASE, USER, RETENTION, SUBSCRIPTION at line 1, char 8`},
{s: `CREATE DATABASE`, err: `found EOF, expected identifier at line 1, char 17`},
{s: `CREATE DATABASE "testdb" WITH`, err: `found EOF, expected DURATION, REPLICATION, NAME at line 1, char 31`},
Expand Down
10 changes: 10 additions & 0 deletions services/meta/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,16 @@ func (c *Client) ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Tim
return a, nil
}

// DropShard deletes a shard by ID.
func (c *Client) DropShard(id uint64) error {
c.mu.Lock()
defer c.mu.Unlock()

data := c.cacheData.Clone()
data.DropShard(id)
return c.commit(data)
}

// CreateShardGroup creates a shard group on a database and policy for a given timestamp.
func (c *Client) CreateShardGroup(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {
c.mu.Lock()
Expand Down
32 changes: 32 additions & 0 deletions services/meta/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,38 @@ func (data *Data) SetDefaultRetentionPolicy(database, name string) error {
return nil
}

// DropShard removes a shard by ID.
//
// DropShard won't return an error if the shard can't be found, which
// allows the command to be re-run in the case that the meta store
// succeeds but a data node fails.
func (data *Data) DropShard(id uint64) {
found := -1
for dbidx, dbi := range data.Databases {
for rpidx, rpi := range dbi.RetentionPolicies {
for sgidx, sg := range rpi.ShardGroups {
for sidx, s := range sg.Shards {
if s.ID == id {
found = sidx
break
}
}

if found > -1 {
shards := sg.Shards
data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].Shards = append(shards[:found], shards[found+1:]...)

if len(shards) == 1 {
// We just deleted the last shard in the shard group.
data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].DeletedAt = time.Now()
}
return
}
}
}
}
}

// ShardGroups returns a list of all shard groups on a database and policy.
func (data *Data) ShardGroups(database, policy string) ([]ShardGroupInfo, error) {
// Find retention policy.
Expand Down
30 changes: 30 additions & 0 deletions services/meta/internal/meta.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 166d8d9

Please sign in to comment.