diff --git a/CHANGELOG.md b/CHANGELOG.md index ba2bed39d6c..3eb9828ee6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ - [#4262](https://github.com/influxdb/influxdb/pull/4262): Allow configuration of UDP retention policy - [#4265](https://github.com/influxdb/influxdb/pull/4265): Add statistics for Hinted-Handoff - [#4284](https://github.com/influxdb/influxdb/pull/4284): Add exponential backoff for hinted-handoff failures +- [#4310](https://github.com/influxdb/influxdb/pull/4310): Support dropping non-Raft nodes. Work mostly by @corylanou ### Bugfixes - [#4166](https://github.com/influxdb/influxdb/pull/4166): Fix parser error on invalid SHOW diff --git a/cluster/internal/data.pb.go b/cluster/internal/data.pb.go index 4fa3f34b89b..f9546390378 100644 --- a/cluster/internal/data.pb.go +++ b/cluster/internal/data.pb.go @@ -17,15 +17,17 @@ It has these top-level messages: package internal import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" import math "math" // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal +var _ = fmt.Errorf var _ = math.Inf type WriteShardRequest struct { - ShardID *uint64 `protobuf:"varint,1,req" json:"ShardID,omitempty"` - Points [][]byte `protobuf:"bytes,2,rep" json:"Points,omitempty"` + ShardID *uint64 `protobuf:"varint,1,req,name=ShardID" json:"ShardID,omitempty"` + Points [][]byte `protobuf:"bytes,2,rep,name=Points" json:"Points,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -48,8 +50,8 @@ func (m *WriteShardRequest) GetPoints() [][]byte { } type WriteShardResponse struct { - Code *int32 `protobuf:"varint,1,req" json:"Code,omitempty"` - Message *string `protobuf:"bytes,2,opt" json:"Message,omitempty"` + Code *int32 `protobuf:"varint,1,req,name=Code" json:"Code,omitempty"` + Message *string `protobuf:"bytes,2,opt,name=Message" json:"Message,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -72,9 +74,9 @@ func (m *WriteShardResponse) GetMessage() string { } type MapShardRequest struct { - ShardID *uint64 `protobuf:"varint,1,req" json:"ShardID,omitempty"` - Query *string `protobuf:"bytes,2,req" json:"Query,omitempty"` - ChunkSize *int32 `protobuf:"varint,3,req" json:"ChunkSize,omitempty"` + ShardID *uint64 `protobuf:"varint,1,req,name=ShardID" json:"ShardID,omitempty"` + Query *string `protobuf:"bytes,2,req,name=Query" json:"Query,omitempty"` + ChunkSize *int32 `protobuf:"varint,3,req,name=ChunkSize" json:"ChunkSize,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -104,11 +106,11 @@ func (m *MapShardRequest) GetChunkSize() int32 { } type MapShardResponse struct { - Code *int32 `protobuf:"varint,1,req" json:"Code,omitempty"` - Message *string `protobuf:"bytes,2,opt" json:"Message,omitempty"` - Data []byte `protobuf:"bytes,3,opt" json:"Data,omitempty"` - TagSets []string `protobuf:"bytes,4,rep" json:"TagSets,omitempty"` - Fields []string `protobuf:"bytes,5,rep" json:"Fields,omitempty"` + Code *int32 `protobuf:"varint,1,req,name=Code" json:"Code,omitempty"` + Message *string `protobuf:"bytes,2,opt,name=Message" json:"Message,omitempty"` + Data []byte `protobuf:"bytes,3,opt,name=Data" json:"Data,omitempty"` + TagSets []string `protobuf:"bytes,4,rep,name=TagSets" json:"TagSets,omitempty"` + Fields []string `protobuf:"bytes,5,rep,name=Fields" json:"Fields,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -150,6 +152,3 @@ func (m *MapShardResponse) GetFields() []string { } return nil } - -func init() { -} diff --git a/influxql/ast.go b/influxql/ast.go index 0b058166479..7d7f77f1991 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -92,6 +92,7 @@ func (*DropDatabaseStatement) node() {} func (*DropMeasurementStatement) node() {} func (*DropRetentionPolicyStatement) node() {} func (*DropSeriesStatement) node() {} +func (*DropServerStatement) node() {} func (*DropUserStatement) node() {} func (*GrantStatement) node() {} func (*GrantAdminStatement) node() {} @@ -198,6 +199,7 @@ func (*DropDatabaseStatement) stmt() {} func (*DropMeasurementStatement) stmt() {} func (*DropRetentionPolicyStatement) stmt() {} func (*DropSeriesStatement) stmt() {} +func (*DropServerStatement) stmt() {} func (*DropUserStatement) stmt() {} func (*GrantStatement) stmt() {} func (*GrantAdminStatement) stmt() {} @@ -1824,6 +1826,30 @@ func (s DropSeriesStatement) RequiredPrivileges() ExecutionPrivileges { return ExecutionPrivileges{{Admin: false, Name: "", Privilege: WritePrivilege}} } +// DropServerStatement represents a command for removing a server from the cluster. +type DropServerStatement struct { + // ID of the node to be dropped. + NodeID uint64 + // Force will force the server to drop even it it means losing data + Force bool +} + +// String returns a string representation of the drop series statement. +func (s *DropServerStatement) String() string { + var buf bytes.Buffer + _, _ = buf.WriteString("DROP SERVER ") + _, _ = buf.WriteString(strconv.FormatUint(s.NodeID, 10)) + if s.Force { + _, _ = buf.WriteString(" FORCE") + } + return buf.String() +} + +// RequiredPrivileges returns the privilege required to execute a DropServerStatement. +func (s *DropServerStatement) RequiredPrivileges() ExecutionPrivileges { + return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}} +} + // ShowContinuousQueriesStatement represents a command for listing continuous queries. type ShowContinuousQueriesStatement struct{} diff --git a/influxql/parser.go b/influxql/parser.go index 7809689d850..f7b801dcb7d 100644 --- a/influxql/parser.go +++ b/influxql/parser.go @@ -208,6 +208,8 @@ func (p *Parser) parseDropStatement() (Statement, error) { return p.parseDropRetentionPolicyStatement() } else if tok == USER { return p.parseDropUserStatement() + } else if tok == SERVER { + return p.parseDropServerStatement() } return nil, newParseError(tokstr(tok, lit), []string{"SERIES", "CONTINUOUS", "MEASUREMENT"}, pos) @@ -311,8 +313,8 @@ func (p *Parser) parseCreateRetentionPolicyStatement() (*CreateRetentionPolicySt // Parse optional DEFAULT token. if tok, pos, lit = p.scanIgnoreWhitespace(); tok == DEFAULT { stmt.Default = true - } else { - p.unscan() + } else if tok != EOF && tok != SEMICOLON { + return nil, newParseError(tokstr(tok, lit), []string{"DEFAULT"}, pos) } return stmt, nil @@ -1178,6 +1180,27 @@ func (p *Parser) parseDropSeriesStatement() (*DropSeriesStatement, error) { return stmt, nil } +// parseDropServerStatement parses a string and returns a DropServerStatement. +// This function assumes the "DROP SERVER" tokens have already been consumed. +func (p *Parser) parseDropServerStatement() (*DropServerStatement, error) { + s := &DropServerStatement{} + var err error + + // Parse the server's ID. + if s.NodeID, err = p.parseUInt64(); err != nil { + return nil, err + } + + // Parse optional FORCE token. + if tok, pos, lit := p.scanIgnoreWhitespace(); tok == FORCE { + s.Force = true + } else if tok != EOF && tok != SEMICOLON { + return nil, newParseError(tokstr(tok, lit), []string{"FORCE"}, pos) + } + + return s, nil +} + // parseShowContinuousQueriesStatement parses a string and returns a ShowContinuousQueriesStatement. // This function assumes the "SHOW CONTINUOUS" tokens have already been consumed. func (p *Parser) parseShowContinuousQueriesStatement() (*ShowContinuousQueriesStatement, error) { diff --git a/influxql/parser_test.go b/influxql/parser_test.go index da14145c6f7..47f43e554ea 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -1013,6 +1013,16 @@ func TestParser_ParseStatement(t *testing.T) { }, }, + // DROP SERVER statement + { + s: `DROP SERVER 123`, + stmt: &influxql.DropServerStatement{NodeID: 123}, + }, + { + s: `DROP SERVER 123 FORCE`, + stmt: &influxql.DropServerStatement{NodeID: 123, Force: true}, + }, + // SHOW CONTINUOUS QUERIES statement { s: `SHOW CONTINUOUS QUERIES`, @@ -1453,15 +1463,15 @@ func TestParser_ParseStatement(t *testing.T) { {s: `SELECT top() FROM myseries`, err: `invalid number of arguments for top, expected at least 2, got 0`}, {s: `SELECT top(field1) FROM myseries`, err: `invalid number of arguments for top, expected at least 2, got 1`}, {s: `SELECT top(field1,foo) FROM myseries`, err: `expected integer as last argument in top(), found foo`}, - {s: `SELECT top(field1,host,server,foo) FROM myseries`, err: `expected integer as last argument in top(), found foo`}, - {s: `SELECT top(field1,5,server,2) FROM myseries`, err: `only fields or tags are allowed in top(), found 5.000`}, - {s: `SELECT top(field1,max(foo),server,2) FROM myseries`, err: `only fields or tags are allowed in top(), found max(foo)`}, + {s: `SELECT top(field1,host,'server',foo) FROM myseries`, err: `expected integer as last argument in top(), found foo`}, + {s: `SELECT top(field1,5,'server',2) FROM myseries`, err: `only fields or tags are allowed in top(), found 5.000`}, + {s: `SELECT top(field1,max(foo),'server',2) FROM myseries`, err: `only fields or tags are allowed in top(), found max(foo)`}, {s: `SELECT bottom() FROM myseries`, err: `invalid number of arguments for bottom, expected at least 2, got 0`}, {s: `SELECT bottom(field1) FROM myseries`, err: `invalid number of arguments for bottom, expected at least 2, got 1`}, {s: `SELECT bottom(field1,foo) FROM myseries`, err: `expected integer as last argument in bottom(), found foo`}, - {s: `SELECT bottom(field1,host,server,foo) FROM myseries`, err: `expected integer as last argument in bottom(), found foo`}, - {s: `SELECT bottom(field1,5,server,2) FROM myseries`, err: `only fields or tags are allowed in bottom(), found 5.000`}, - {s: `SELECT bottom(field1,max(foo),server,2) FROM myseries`, err: `only fields or tags are allowed in bottom(), found max(foo)`}, + {s: `SELECT bottom(field1,host,'server',foo) FROM myseries`, err: `expected integer as last argument in bottom(), found foo`}, + {s: `SELECT bottom(field1,5,'server',2) FROM myseries`, err: `only fields or tags are allowed in bottom(), found 5.000`}, + {s: `SELECT bottom(field1,max(foo),'server',2) FROM myseries`, err: `only fields or tags are allowed in bottom(), found max(foo)`}, {s: `SELECT percentile() FROM myseries`, err: `invalid number of arguments for percentile, expected 2, got 0`}, {s: `SELECT percentile(field1) FROM myseries`, err: `invalid number of arguments for percentile, expected 2, got 1`}, {s: `SELECT percentile(field1, foo) FROM myseries`, err: `expected float argument in percentile()`}, @@ -1515,6 +1525,9 @@ func TestParser_ParseStatement(t *testing.T) { {s: `DROP SERIES`, err: `found EOF, expected FROM, WHERE at line 1, char 13`}, {s: `DROP SERIES FROM`, err: `found EOF, expected identifier at line 1, char 18`}, {s: `DROP SERIES FROM src WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`}, + {s: `DROP SERVER`, err: `found EOF, expected number at line 1, char 13`}, + {s: `DROP SERVER abc`, err: `found abc, expected number at line 1, char 13`}, + {s: `DROP SERVER 1 1`, err: `found 1, expected FORCE at line 1, char 15`}, {s: `SHOW CONTINUOUS`, err: `found EOF, expected QUERIES at line 1, char 17`}, {s: `SHOW RETENTION`, err: `found EOF, expected POLICIES at line 1, char 16`}, {s: `SHOW RETENTION ON`, err: `found ON, expected POLICIES at line 1, char 16`}, @@ -1624,6 +1637,7 @@ func TestParser_ParseStatement(t *testing.T) { {s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 3.14`, err: `number must be an integer at line 1, char 67`}, {s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 0`, err: `invalid value 0: must be 1 <= n <= 2147483647 at line 1, char 67`}, {s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION bad`, err: `found bad, expected number at line 1, char 67`}, + {s: `CREATE RETENTION POLICY policy1 ON testdb DURATION 1h REPLICATION 1 foo`, err: `found foo, expected DEFAULT at line 1, char 69`}, {s: `ALTER`, err: `found EOF, expected RETENTION at line 1, char 7`}, {s: `ALTER RETENTION`, err: `found EOF, expected POLICY at line 1, char 17`}, {s: `ALTER RETENTION POLICY`, err: `found EOF, expected identifier at line 1, char 24`}, diff --git a/influxql/scanner_test.go b/influxql/scanner_test.go index b365a1b559a..277848ced26 100644 --- a/influxql/scanner_test.go +++ b/influxql/scanner_test.go @@ -154,6 +154,8 @@ func TestScanner_Scan(t *testing.T) { {s: `REVOKE`, tok: influxql.REVOKE}, {s: `SELECT`, tok: influxql.SELECT}, {s: `SERIES`, tok: influxql.SERIES}, + {s: `SERVER`, tok: influxql.SERVER}, + {s: `SERVERS`, tok: influxql.SERVERS}, {s: `TAG`, tok: influxql.TAG}, {s: `TO`, tok: influxql.TO}, {s: `USER`, tok: influxql.USER}, diff --git a/influxql/token.go b/influxql/token.go index 795c7b169e8..fe521f4ebaf 100644 --- a/influxql/token.go +++ b/influxql/token.go @@ -77,6 +77,7 @@ const ( EXPLAIN FIELD FOR + FORCE FROM GRANT GRANTS @@ -108,6 +109,7 @@ const ( REVOKE SELECT SERIES + SERVER SERVERS SET SHOW @@ -187,6 +189,7 @@ var tokens = [...]string{ EXPLAIN: "EXPLAIN", FIELD: "FIELD", FOR: "FOR", + FORCE: "FORCE", FROM: "FROM", GRANT: "GRANT", GRANTS: "GRANTS", @@ -218,6 +221,7 @@ var tokens = [...]string{ REVOKE: "REVOKE", SELECT: "SELECT", SERIES: "SERIES", + SERVER: "SERVER", SERVERS: "SERVERS", SET: "SET", SHOW: "SHOW", diff --git a/meta/data.go b/meta/data.go index 71c278eeb1b..9257e61cb82 100644 --- a/meta/data.go +++ b/meta/data.go @@ -74,14 +74,71 @@ func (data *Data) CreateNode(host string) error { } // DeleteNode removes a node from the metadata. -func (data *Data) DeleteNode(id uint64) error { - for i := range data.Nodes { - if data.Nodes[i].ID == id { - data.Nodes = append(data.Nodes[:i], data.Nodes[i+1:]...) - return nil +func (data *Data) DeleteNode(id uint64, force bool) error { + // Node has to be larger than 0 to be real + if id == 0 { + return ErrNodeIDRequired + } + // Is this a valid node? + nodeInfo := data.Node(id) + if nodeInfo == nil { + return ErrNodeNotFound + } + + // Am I the only node? If so, nothing to do + if len(data.Nodes) == 1 { + return ErrNodeUnableToDropFinalNode + } + + // Determine if there are any any non-replicated nodes and force was not specified + if !force { + for _, d := range data.Databases { + for _, rp := range d.RetentionPolicies { + // ignore replicated retention policies + if rp.ReplicaN > 1 { + continue + } + for _, sg := range rp.ShardGroups { + for _, s := range sg.Shards { + if s.OwnedBy(id) && len(s.Owners) == 1 { + return ErrShardNotReplicated + } + } + } + } + } + } + + // Remove node id from all shard infos + for di, d := range data.Databases { + for ri, rp := range d.RetentionPolicies { + for sgi, sg := range rp.ShardGroups { + for si, s := range sg.Shards { + if s.OwnedBy(id) { + var owners []ShardOwner + for _, o := range s.Owners { + if o.NodeID != id { + owners = append(owners, o) + } + } + data.Databases[di].RetentionPolicies[ri].ShardGroups[sgi].Shards[si].Owners = owners + } + } + } } } - return ErrNodeNotFound + + // Remove this node from the in memory nodes + var nodes []NodeInfo + for _, n := range data.Nodes { + if n.ID == id { + continue + } + nodes = append(nodes, n) + } + data.Nodes = nodes + + return nil } // Database returns a database by name. diff --git a/meta/data_test.go b/meta/data_test.go index d26eb946701..8b7fcc6abf0 100644 --- a/meta/data_test.go +++ b/meta/data_test.go @@ -26,7 +26,7 @@ func TestData_CreateNode(t *testing.T) { } // Ensure a node can be removed. -func TestData_DeleteNode(t *testing.T) { +func TestData_DeleteNode_Basic(t *testing.T) { var data meta.Data if err := data.CreateNode("host0"); err != nil { t.Fatal(err) @@ -36,7 +36,7 @@ func TestData_DeleteNode(t *testing.T) { t.Fatal(err) } - if err := data.DeleteNode(1); err != nil { + if err := data.DeleteNode(1, false); err != nil { t.Fatal(err) } else if len(data.Nodes) != 2 { t.Fatalf("unexpected node count: %d", len(data.Nodes)) @@ -47,6 +47,49 @@ func TestData_DeleteNode(t *testing.T) { } } +// Ensure a node can be removed with shard info in play +func TestData_DeleteNode_Shards(t *testing.T) { + var data meta.Data + if err := data.CreateNode("host0"); err != nil { + t.Fatal(err) + } else if err = data.CreateNode("host1"); err != nil { + t.Fatal(err) + } else if err := data.CreateNode("host2"); err != nil { + t.Fatal(err) + } else if err := data.CreateNode("host3"); err != nil { + t.Fatal(err) + } + + if err := data.CreateDatabase("mydb"); err != nil { + t.Fatal(err) + } + + rpi := &meta.RetentionPolicyInfo{ + Name: "myrp", + ReplicaN: 3, + } + if err := data.CreateRetentionPolicy("mydb", rpi); err != nil { + t.Fatal(err) + } + if err := data.CreateShardGroup("mydb", "myrp", time.Now()); err != nil { + t.Fatal(err) + } + if len(data.Databases[0].RetentionPolicies[0].ShardGroups[0].Shards[0].Owners) != 3 { + t.Fatal("wrong number of shard owners") + } + if err := data.DeleteNode(2, false); err != nil { + t.Fatal(err) + } + if got, exp := len(data.Databases[0].RetentionPolicies[0].ShardGroups[0].Shards[0].Owners), 2; exp != got { + t.Fatalf("wrong number of shard owners, got %d, exp %d", got, exp) + } + for _, s := range data.Databases[0].RetentionPolicies[0].ShardGroups[0].Shards { + if s.OwnedBy(2) { + t.Fatal("shard still owned by delted node") + } + } +} + // Ensure a database can be created. func TestData_CreateDatabase(t *testing.T) { var data meta.Data diff --git a/meta/errors.go b/meta/errors.go index 6c726af6347..213b102f4eb 100644 --- a/meta/errors.go +++ b/meta/errors.go @@ -26,6 +26,16 @@ var ( // ErrNodesRequired is returned when at least one node is required for an operation. // This occurs when creating a shard group. ErrNodesRequired = newError("at least one node required") + + // ErrNodeIDRequired is returned when using a zero node id. + ErrNodeIDRequired = newError("node id must be greater than 0") + + // ErrNodeUnableToDropSingleNode is returned if the node being dropped is the last + // node in the cluster + ErrNodeUnableToDropFinalNode = newError("unable to drop the final node in a cluster") + + // ErrNodeRaft is returned when attempting an operation prohibted for a Raft-node. + ErrNodeRaft = newError("node is a Raft node") ) var ( @@ -73,6 +83,10 @@ var ( // ErrShardGroupNotFound is returned when mutating a shard group that doesn't exist. ErrShardGroupNotFound = newError("shard group not found") + + // ErrShardNotReplicated is returned if the node requested to be dropped has + // the last copy of a shard present and the force keyword was not used + ErrShardNotReplicated = newError("shard not replicated") ) var ( diff --git a/meta/internal/meta.pb.go b/meta/internal/meta.pb.go index 4d6752d85c0..285ccb2e417 100644 --- a/meta/internal/meta.pb.go +++ b/meta/internal/meta.pb.go @@ -50,10 +50,12 @@ It has these top-level messages: package internal import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" import math "math" // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal +var _ = fmt.Errorf var _ = math.Inf type RPCType int32 @@ -177,15 +179,15 @@ func (x *Command_Type) UnmarshalJSON(data []byte) error { } type Data struct { - Term *uint64 `protobuf:"varint,1,req" json:"Term,omitempty"` - Index *uint64 `protobuf:"varint,2,req" json:"Index,omitempty"` - ClusterID *uint64 `protobuf:"varint,3,req" json:"ClusterID,omitempty"` - Nodes []*NodeInfo `protobuf:"bytes,4,rep" json:"Nodes,omitempty"` - Databases []*DatabaseInfo `protobuf:"bytes,5,rep" json:"Databases,omitempty"` - Users []*UserInfo `protobuf:"bytes,6,rep" json:"Users,omitempty"` - MaxNodeID *uint64 `protobuf:"varint,7,req" json:"MaxNodeID,omitempty"` - MaxShardGroupID *uint64 `protobuf:"varint,8,req" json:"MaxShardGroupID,omitempty"` - MaxShardID *uint64 `protobuf:"varint,9,req" json:"MaxShardID,omitempty"` + Term *uint64 `protobuf:"varint,1,req,name=Term" json:"Term,omitempty"` + Index *uint64 `protobuf:"varint,2,req,name=Index" json:"Index,omitempty"` + ClusterID *uint64 `protobuf:"varint,3,req,name=ClusterID" json:"ClusterID,omitempty"` + Nodes []*NodeInfo `protobuf:"bytes,4,rep,name=Nodes" json:"Nodes,omitempty"` + Databases []*DatabaseInfo `protobuf:"bytes,5,rep,name=Databases" json:"Databases,omitempty"` + Users []*UserInfo `protobuf:"bytes,6,rep,name=Users" json:"Users,omitempty"` + MaxNodeID *uint64 `protobuf:"varint,7,req,name=MaxNodeID" json:"MaxNodeID,omitempty"` + MaxShardGroupID *uint64 `protobuf:"varint,8,req,name=MaxShardGroupID" json:"MaxShardGroupID,omitempty"` + MaxShardID *uint64 `protobuf:"varint,9,req,name=MaxShardID" json:"MaxShardID,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -257,8 +259,8 @@ func (m *Data) GetMaxShardID() uint64 { } type NodeInfo struct { - ID *uint64 `protobuf:"varint,1,req" json:"ID,omitempty"` - Host *string `protobuf:"bytes,2,req" json:"Host,omitempty"` + ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"` + Host *string `protobuf:"bytes,2,req,name=Host" json:"Host,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -281,10 +283,10 @@ func (m *NodeInfo) GetHost() string { } type DatabaseInfo struct { - Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` - DefaultRetentionPolicy *string `protobuf:"bytes,2,req" json:"DefaultRetentionPolicy,omitempty"` - RetentionPolicies []*RetentionPolicyInfo `protobuf:"bytes,3,rep" json:"RetentionPolicies,omitempty"` - ContinuousQueries []*ContinuousQueryInfo `protobuf:"bytes,4,rep" json:"ContinuousQueries,omitempty"` + Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` + DefaultRetentionPolicy *string `protobuf:"bytes,2,req,name=DefaultRetentionPolicy" json:"DefaultRetentionPolicy,omitempty"` + RetentionPolicies []*RetentionPolicyInfo `protobuf:"bytes,3,rep,name=RetentionPolicies" json:"RetentionPolicies,omitempty"` + ContinuousQueries []*ContinuousQueryInfo `protobuf:"bytes,4,rep,name=ContinuousQueries" json:"ContinuousQueries,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -321,11 +323,11 @@ func (m *DatabaseInfo) GetContinuousQueries() []*ContinuousQueryInfo { } type RetentionPolicyInfo struct { - Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` - Duration *int64 `protobuf:"varint,2,req" json:"Duration,omitempty"` - ShardGroupDuration *int64 `protobuf:"varint,3,req" json:"ShardGroupDuration,omitempty"` - ReplicaN *uint32 `protobuf:"varint,4,req" json:"ReplicaN,omitempty"` - ShardGroups []*ShardGroupInfo `protobuf:"bytes,5,rep" json:"ShardGroups,omitempty"` + Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` + Duration *int64 `protobuf:"varint,2,req,name=Duration" json:"Duration,omitempty"` + ShardGroupDuration *int64 `protobuf:"varint,3,req,name=ShardGroupDuration" json:"ShardGroupDuration,omitempty"` + ReplicaN *uint32 `protobuf:"varint,4,req,name=ReplicaN" json:"ReplicaN,omitempty"` + ShardGroups []*ShardGroupInfo `protobuf:"bytes,5,rep,name=ShardGroups" json:"ShardGroups,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -369,11 +371,11 @@ func (m *RetentionPolicyInfo) GetShardGroups() []*ShardGroupInfo { } type ShardGroupInfo struct { - ID *uint64 `protobuf:"varint,1,req" json:"ID,omitempty"` - StartTime *int64 `protobuf:"varint,2,req" json:"StartTime,omitempty"` - EndTime *int64 `protobuf:"varint,3,req" json:"EndTime,omitempty"` - DeletedAt *int64 `protobuf:"varint,4,req" json:"DeletedAt,omitempty"` - Shards []*ShardInfo `protobuf:"bytes,5,rep" json:"Shards,omitempty"` + ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"` + StartTime *int64 `protobuf:"varint,2,req,name=StartTime" json:"StartTime,omitempty"` + EndTime *int64 `protobuf:"varint,3,req,name=EndTime" json:"EndTime,omitempty"` + DeletedAt *int64 `protobuf:"varint,4,req,name=DeletedAt" json:"DeletedAt,omitempty"` + Shards []*ShardInfo `protobuf:"bytes,5,rep,name=Shards" json:"Shards,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -417,9 +419,9 @@ func (m *ShardGroupInfo) GetShards() []*ShardInfo { } type ShardInfo struct { - ID *uint64 `protobuf:"varint,1,req" json:"ID,omitempty"` - OwnerIDs []uint64 `protobuf:"varint,2,rep" json:"OwnerIDs,omitempty"` - Owners []*ShardOwner `protobuf:"bytes,3,rep" json:"Owners,omitempty"` + ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"` + OwnerIDs []uint64 `protobuf:"varint,2,rep,name=OwnerIDs" json:"OwnerIDs,omitempty"` + Owners []*ShardOwner `protobuf:"bytes,3,rep,name=Owners" json:"Owners,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -449,7 +451,7 @@ func (m *ShardInfo) GetOwners() []*ShardOwner { } type ShardOwner struct { - NodeID *uint64 `protobuf:"varint,1,req" json:"NodeID,omitempty"` + NodeID *uint64 `protobuf:"varint,1,req,name=NodeID" json:"NodeID,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -465,8 +467,8 @@ func (m *ShardOwner) GetNodeID() uint64 { } type ContinuousQueryInfo struct { - Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` - Query *string `protobuf:"bytes,2,req" json:"Query,omitempty"` + Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` + Query *string `protobuf:"bytes,2,req,name=Query" json:"Query,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -489,10 +491,10 @@ func (m *ContinuousQueryInfo) GetQuery() string { } type UserInfo struct { - Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` - Hash *string `protobuf:"bytes,2,req" json:"Hash,omitempty"` - Admin *bool `protobuf:"varint,3,req" json:"Admin,omitempty"` - Privileges []*UserPrivilege `protobuf:"bytes,4,rep" json:"Privileges,omitempty"` + Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` + Hash *string `protobuf:"bytes,2,req,name=Hash" json:"Hash,omitempty"` + Admin *bool `protobuf:"varint,3,req,name=Admin" json:"Admin,omitempty"` + Privileges []*UserPrivilege `protobuf:"bytes,4,rep,name=Privileges" json:"Privileges,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -529,8 +531,8 @@ func (m *UserInfo) GetPrivileges() []*UserPrivilege { } type UserPrivilege struct { - Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` - Privilege *int32 `protobuf:"varint,2,req" json:"Privilege,omitempty"` + Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"` + Privilege *int32 `protobuf:"varint,2,req,name=Privilege" json:"Privilege,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -584,8 +586,8 @@ func (m *Command) GetType() Command_Type { } type CreateNodeCommand struct { - Host *string `protobuf:"bytes,1,req" json:"Host,omitempty"` - Rand *uint64 `protobuf:"varint,2,req" json:"Rand,omitempty"` + Host *string `protobuf:"bytes,1,req,name=Host" json:"Host,omitempty"` + Rand *uint64 `protobuf:"varint,2,req,name=Rand" json:"Rand,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -616,7 +618,8 @@ var E_CreateNodeCommand_Command = &proto.ExtensionDesc{ } type DeleteNodeCommand struct { - ID *uint64 `protobuf:"varint,1,req" json:"ID,omitempty"` + ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"` + Force *bool `protobuf:"varint,2,req,name=Force" json:"Force,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -631,6 +634,13 @@ func (m *DeleteNodeCommand) GetID() uint64 { return 0 } +func (m *DeleteNodeCommand) GetForce() bool { + if m != nil && m.Force != nil { + return *m.Force + } + return false +} + var E_DeleteNodeCommand_Command = &proto.ExtensionDesc{ ExtendedType: (*Command)(nil), ExtensionType: (*DeleteNodeCommand)(nil), @@ -640,7 +650,7 @@ var E_DeleteNodeCommand_Command = &proto.ExtensionDesc{ } type CreateDatabaseCommand struct { - Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` + Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -664,7 +674,7 @@ var E_CreateDatabaseCommand_Command = &proto.ExtensionDesc{ } type DropDatabaseCommand struct { - Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` + Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -688,8 +698,8 @@ var E_DropDatabaseCommand_Command = &proto.ExtensionDesc{ } type CreateRetentionPolicyCommand struct { - Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` - RetentionPolicy *RetentionPolicyInfo `protobuf:"bytes,2,req" json:"RetentionPolicy,omitempty"` + Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"` + RetentionPolicy *RetentionPolicyInfo `protobuf:"bytes,2,req,name=RetentionPolicy" json:"RetentionPolicy,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -720,8 +730,8 @@ var E_CreateRetentionPolicyCommand_Command = &proto.ExtensionDesc{ } type DropRetentionPolicyCommand struct { - Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` - Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"` + Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"` + Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -752,8 +762,8 @@ var E_DropRetentionPolicyCommand_Command = &proto.ExtensionDesc{ } type SetDefaultRetentionPolicyCommand struct { - Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` - Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"` + Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"` + Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -784,11 +794,11 @@ var E_SetDefaultRetentionPolicyCommand_Command = &proto.ExtensionDesc{ } type UpdateRetentionPolicyCommand struct { - Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` - Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"` - NewName *string `protobuf:"bytes,3,opt" json:"NewName,omitempty"` - Duration *int64 `protobuf:"varint,4,opt" json:"Duration,omitempty"` - ReplicaN *uint32 `protobuf:"varint,5,opt" json:"ReplicaN,omitempty"` + Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"` + Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"` + NewName *string `protobuf:"bytes,3,opt,name=NewName" json:"NewName,omitempty"` + Duration *int64 `protobuf:"varint,4,opt,name=Duration" json:"Duration,omitempty"` + ReplicaN *uint32 `protobuf:"varint,5,opt,name=ReplicaN" json:"ReplicaN,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -840,9 +850,9 @@ var E_UpdateRetentionPolicyCommand_Command = &proto.ExtensionDesc{ } type CreateShardGroupCommand struct { - Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` - Policy *string `protobuf:"bytes,2,req" json:"Policy,omitempty"` - Timestamp *int64 `protobuf:"varint,3,req" json:"Timestamp,omitempty"` + Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"` + Policy *string `protobuf:"bytes,2,req,name=Policy" json:"Policy,omitempty"` + Timestamp *int64 `protobuf:"varint,3,req,name=Timestamp" json:"Timestamp,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -880,9 +890,9 @@ var E_CreateShardGroupCommand_Command = &proto.ExtensionDesc{ } type DeleteShardGroupCommand struct { - Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` - Policy *string `protobuf:"bytes,2,req" json:"Policy,omitempty"` - ShardGroupID *uint64 `protobuf:"varint,3,req" json:"ShardGroupID,omitempty"` + Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"` + Policy *string `protobuf:"bytes,2,req,name=Policy" json:"Policy,omitempty"` + ShardGroupID *uint64 `protobuf:"varint,3,req,name=ShardGroupID" json:"ShardGroupID,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -920,9 +930,9 @@ var E_DeleteShardGroupCommand_Command = &proto.ExtensionDesc{ } type CreateContinuousQueryCommand struct { - Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` - Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"` - Query *string `protobuf:"bytes,3,req" json:"Query,omitempty"` + Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"` + Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"` + Query *string `protobuf:"bytes,3,req,name=Query" json:"Query,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -960,8 +970,8 @@ var E_CreateContinuousQueryCommand_Command = &proto.ExtensionDesc{ } type DropContinuousQueryCommand struct { - Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` - Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"` + Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"` + Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -992,9 +1002,9 @@ var E_DropContinuousQueryCommand_Command = &proto.ExtensionDesc{ } type CreateUserCommand struct { - Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` - Hash *string `protobuf:"bytes,2,req" json:"Hash,omitempty"` - Admin *bool `protobuf:"varint,3,req" json:"Admin,omitempty"` + Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` + Hash *string `protobuf:"bytes,2,req,name=Hash" json:"Hash,omitempty"` + Admin *bool `protobuf:"varint,3,req,name=Admin" json:"Admin,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1032,7 +1042,7 @@ var E_CreateUserCommand_Command = &proto.ExtensionDesc{ } type DropUserCommand struct { - Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` + Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1056,8 +1066,8 @@ var E_DropUserCommand_Command = &proto.ExtensionDesc{ } type UpdateUserCommand struct { - Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` - Hash *string `protobuf:"bytes,2,req" json:"Hash,omitempty"` + Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` + Hash *string `protobuf:"bytes,2,req,name=Hash" json:"Hash,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1088,9 +1098,9 @@ var E_UpdateUserCommand_Command = &proto.ExtensionDesc{ } type SetPrivilegeCommand struct { - Username *string `protobuf:"bytes,1,req" json:"Username,omitempty"` - Database *string `protobuf:"bytes,2,req" json:"Database,omitempty"` - Privilege *int32 `protobuf:"varint,3,req" json:"Privilege,omitempty"` + Username *string `protobuf:"bytes,1,req,name=Username" json:"Username,omitempty"` + Database *string `protobuf:"bytes,2,req,name=Database" json:"Database,omitempty"` + Privilege *int32 `protobuf:"varint,3,req,name=Privilege" json:"Privilege,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1128,7 +1138,7 @@ var E_SetPrivilegeCommand_Command = &proto.ExtensionDesc{ } type SetDataCommand struct { - Data *Data `protobuf:"bytes,1,req" json:"Data,omitempty"` + Data *Data `protobuf:"bytes,1,req,name=Data" json:"Data,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1152,8 +1162,8 @@ var E_SetDataCommand_Command = &proto.ExtensionDesc{ } type SetAdminPrivilegeCommand struct { - Username *string `protobuf:"bytes,1,req" json:"Username,omitempty"` - Admin *bool `protobuf:"varint,2,req" json:"Admin,omitempty"` + Username *string `protobuf:"bytes,1,req,name=Username" json:"Username,omitempty"` + Admin *bool `protobuf:"varint,2,req,name=Admin" json:"Admin,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1184,8 +1194,8 @@ var E_SetAdminPrivilegeCommand_Command = &proto.ExtensionDesc{ } type UpdateNodeCommand struct { - ID *uint64 `protobuf:"varint,1,req" json:"ID,omitempty"` - Host *string `protobuf:"bytes,2,req" json:"Host,omitempty"` + ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"` + Host *string `protobuf:"bytes,2,req,name=Host" json:"Host,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1216,9 +1226,9 @@ var E_UpdateNodeCommand_Command = &proto.ExtensionDesc{ } type Response struct { - OK *bool `protobuf:"varint,1,req" json:"OK,omitempty"` - Error *string `protobuf:"bytes,2,opt" json:"Error,omitempty"` - Index *uint64 `protobuf:"varint,3,opt" json:"Index,omitempty"` + OK *bool `protobuf:"varint,1,req,name=OK" json:"OK,omitempty"` + Error *string `protobuf:"bytes,2,opt,name=Error" json:"Error,omitempty"` + Index *uint64 `protobuf:"varint,3,opt,name=Index" json:"Index,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1248,8 +1258,8 @@ func (m *Response) GetIndex() uint64 { } type ResponseHeader struct { - OK *bool `protobuf:"varint,1,req" json:"OK,omitempty"` - Error *string `protobuf:"bytes,2,opt" json:"Error,omitempty"` + OK *bool `protobuf:"varint,1,req,name=OK" json:"OK,omitempty"` + Error *string `protobuf:"bytes,2,opt,name=Error" json:"Error,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1272,7 +1282,7 @@ func (m *ResponseHeader) GetError() string { } type ErrorResponse struct { - Header *ResponseHeader `protobuf:"bytes,1,req" json:"Header,omitempty"` + Header *ResponseHeader `protobuf:"bytes,1,req,name=Header" json:"Header,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1288,9 +1298,9 @@ func (m *ErrorResponse) GetHeader() *ResponseHeader { } type FetchDataRequest struct { - Index *uint64 `protobuf:"varint,1,req" json:"Index,omitempty"` - Term *uint64 `protobuf:"varint,2,req" json:"Term,omitempty"` - Blocking *bool `protobuf:"varint,3,opt,def=0" json:"Blocking,omitempty"` + Index *uint64 `protobuf:"varint,1,req,name=Index" json:"Index,omitempty"` + Term *uint64 `protobuf:"varint,2,req,name=Term" json:"Term,omitempty"` + Blocking *bool `protobuf:"varint,3,opt,name=Blocking,def=0" json:"Blocking,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1322,10 +1332,10 @@ func (m *FetchDataRequest) GetBlocking() bool { } type FetchDataResponse struct { - Header *ResponseHeader `protobuf:"bytes,1,req" json:"Header,omitempty"` - Index *uint64 `protobuf:"varint,2,req" json:"Index,omitempty"` - Term *uint64 `protobuf:"varint,3,req" json:"Term,omitempty"` - Data []byte `protobuf:"bytes,4,opt" json:"Data,omitempty"` + Header *ResponseHeader `protobuf:"bytes,1,req,name=Header" json:"Header,omitempty"` + Index *uint64 `protobuf:"varint,2,req,name=Index" json:"Index,omitempty"` + Term *uint64 `protobuf:"varint,3,req,name=Term" json:"Term,omitempty"` + Data []byte `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1362,7 +1372,7 @@ func (m *FetchDataResponse) GetData() []byte { } type JoinRequest struct { - Addr *string `protobuf:"bytes,1,req" json:"Addr,omitempty"` + Addr *string `protobuf:"bytes,1,req,name=Addr" json:"Addr,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1378,14 +1388,14 @@ func (m *JoinRequest) GetAddr() string { } type JoinResponse struct { - Header *ResponseHeader `protobuf:"bytes,1,req" json:"Header,omitempty"` + Header *ResponseHeader `protobuf:"bytes,1,req,name=Header" json:"Header,omitempty"` // Indicates that this node should take part in the raft cluster. - EnableRaft *bool `protobuf:"varint,2,opt" json:"EnableRaft,omitempty"` + EnableRaft *bool `protobuf:"varint,2,opt,name=EnableRaft" json:"EnableRaft,omitempty"` // The addresses of raft peers to use if joining as a raft member. If not joining // as a raft member, these are the nodes running raft. - RaftNodes []string `protobuf:"bytes,3,rep" json:"RaftNodes,omitempty"` + RaftNodes []string `protobuf:"bytes,3,rep,name=RaftNodes" json:"RaftNodes,omitempty"` // The node ID assigned to the requesting node. - NodeID *uint64 `protobuf:"varint,4,opt" json:"NodeID,omitempty"` + NodeID *uint64 `protobuf:"varint,4,opt,name=NodeID" json:"NodeID,omitempty"` XXX_unrecognized []byte `json:"-"` } diff --git a/meta/internal/meta.proto b/meta/internal/meta.proto index 31114722570..1dc1d76fd00 100644 --- a/meta/internal/meta.proto +++ b/meta/internal/meta.proto @@ -123,6 +123,7 @@ message DeleteNodeCommand { optional DeleteNodeCommand command = 102; } required uint64 ID = 1; + required bool Force = 2; } message CreateDatabaseCommand { diff --git a/meta/statement_executor.go b/meta/statement_executor.go index 97944b971c2..fadbd1e21c8 100644 --- a/meta/statement_executor.go +++ b/meta/statement_executor.go @@ -13,9 +13,12 @@ import ( // StatementExecutor translates InfluxQL queries to meta store methods. type StatementExecutor struct { Store interface { + Node(id uint64) (ni *NodeInfo, err error) Nodes() ([]NodeInfo, error) Peers() ([]string, error) + Leader() string + DeleteNode(nodeID uint64, force bool) error Database(name string) (*DatabaseInfo, error) Databases() ([]DatabaseInfo, error) CreateDatabase(name string) (*DatabaseInfo, error) @@ -88,6 +91,8 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement) *influxql. return e.executeShowShardsStatement(stmt) case *influxql.ShowStatsStatement: return e.executeShowStatsStatement(stmt) + case *influxql.DropServerStatement: + return e.executeDropServerStatement(stmt) default: panic(fmt.Sprintf("unsupported statement type: %T", stmt)) } @@ -142,13 +147,33 @@ func (e *StatementExecutor) executeShowServersStatement(q *influxql.ShowServersS return &influxql.Result{Err: err} } - row := &models.Row{Columns: []string{"id", "cluster_addr", "raft"}} + leader := e.Store.Leader() + + row := &models.Row{Columns: []string{"id", "cluster_addr", "raft", "raft-leader"}} for _, ni := range nis { - row.Values = append(row.Values, []interface{}{ni.ID, ni.Host, contains(peers, ni.Host)}) + row.Values = append(row.Values, []interface{}{ni.ID, ni.Host, contains(peers, ni.Host), leader == ni.Host}) } return &influxql.Result{Series: []*models.Row{row}} } +func (e *StatementExecutor) executeDropServerStatement(q *influxql.DropServerStatement) *influxql.Result { + // Dropping only non-Raft nodes supported. + peers, err := e.Store.Peers() + if err != nil { + return &influxql.Result{Err: err} + } + ni, err := e.Store.Node(q.NodeID) + if err != nil { + return &influxql.Result{Err: err} + } + if contains(peers, ni.Host) { + return &influxql.Result{Err: ErrNodeRaft} + } + + err = e.Store.DeleteNode(q.NodeID, q.Force) + return &influxql.Result{Err: err} +} + func (e *StatementExecutor) executeCreateUserStatement(q *influxql.CreateUserStatement) *influxql.Result { _, err := e.Store.CreateUser(q.Name, q.Password, q.Admin) return &influxql.Result{Err: err} diff --git a/meta/statement_executor_test.go b/meta/statement_executor_test.go index f6bd72ec893..6e76b12655e 100644 --- a/meta/statement_executor_test.go +++ b/meta/statement_executor_test.go @@ -125,15 +125,18 @@ func TestStatementExecutor_ExecuteStatement_ShowServers(t *testing.T) { e.Store.PeersFn = func() ([]string, error) { return []string{"node0"}, nil } + e.Store.LeaderFn = func() string { + return "node0" + } if res := e.ExecuteStatement(influxql.MustParseStatement(`SHOW SERVERS`)); res.Err != nil { t.Fatal(res.Err) } else if !reflect.DeepEqual(res.Series, models.Rows{ { - Columns: []string{"id", "cluster_addr", "raft"}, + Columns: []string{"id", "cluster_addr", "raft", "raft-leader"}, Values: [][]interface{}{ - {uint64(1), "node0", true}, - {uint64(2), "node1", false}, + {uint64(1), "node0", true, true}, + {uint64(2), "node1", false, false}, }, }, }) { @@ -141,6 +144,35 @@ func TestStatementExecutor_ExecuteStatement_ShowServers(t *testing.T) { } } +// Ensure a DROP SERVER statement can be executed. +func TestStatementExecutor_ExecuteStatement_DropServer(t *testing.T) { + e := NewStatementExecutor() + e.Store.NodeFn = func(id uint64) (*meta.NodeInfo, error) { + return &meta.NodeInfo{ + ID: 1, Host: "node1", + }, nil + } + + // Ensure Raft nodes cannot be dropped. + e.Store.PeersFn = func() ([]string, error) { + return []string{"node1"}, nil + } + if res := e.ExecuteStatement(influxql.MustParseStatement(`DROP SERVER 1`)); res.Err != meta.ErrNodeRaft { + t.Fatalf("unexpected error: %s", res.Err) + } + + // Ensure non-Raft nodes can be dropped. + e.Store.PeersFn = func() ([]string, error) { + return []string{"node2"}, nil + } + e.Store.DeleteNodeFn = func(id uint64, force bool) error { + return nil + } + if res := e.ExecuteStatement(influxql.MustParseStatement(`DROP SERVER 1`)); res.Err != nil { + t.Fatalf("unexpected error: %s", res.Err) + } +} + // Ensure a SHOW SERVERS statement returns errors from the store. func TestStatementExecutor_ExecuteStatement_ShowServers_Err(t *testing.T) { e := NewStatementExecutor() @@ -832,12 +864,15 @@ func NewStatementExecutor() *StatementExecutor { // StatementExecutorStore represents a mock implementation of StatementExecutor.Store. type StatementExecutorStore struct { + NodeFn func(id uint64) (*meta.NodeInfo, error) NodesFn func() ([]meta.NodeInfo, error) PeersFn func() ([]string, error) + LeaderFn func() string DatabaseFn func(name string) (*meta.DatabaseInfo, error) DatabasesFn func() ([]meta.DatabaseInfo, error) CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error) DropDatabaseFn func(name string) error + DeleteNodeFn func(nodeID uint64, force bool) error DefaultRetentionPolicyFn func(database string) (*meta.RetentionPolicyInfo, error) CreateRetentionPolicyFn func(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) UpdateRetentionPolicyFn func(database, name string, rpu *meta.RetentionPolicyUpdate) error @@ -856,6 +891,10 @@ type StatementExecutorStore struct { DropContinuousQueryFn func(database, name string) error } +func (s *StatementExecutorStore) Node(id uint64) (*meta.NodeInfo, error) { + return s.NodeFn(id) +} + func (s *StatementExecutorStore) Nodes() ([]meta.NodeInfo, error) { return s.NodesFn() } @@ -864,6 +903,17 @@ func (s *StatementExecutorStore) Peers() ([]string, error) { return s.PeersFn() } +func (s *StatementExecutorStore) Leader() string { + if s.LeaderFn != nil { + return s.LeaderFn() + } + return "" +} + +func (s *StatementExecutorStore) DeleteNode(nodeID uint64, force bool) error { + return s.DeleteNodeFn(nodeID, force) +} + func (s *StatementExecutorStore) Database(name string) (*meta.DatabaseInfo, error) { return s.DatabaseFn(name) } diff --git a/meta/store.go b/meta/store.go index ddc7bff4797..1d8da401c78 100644 --- a/meta/store.go +++ b/meta/store.go @@ -823,10 +823,16 @@ func (s *Store) UpdateNode(id uint64, host string) (*NodeInfo, error) { } // DeleteNode removes a node from the metastore by id. -func (s *Store) DeleteNode(id uint64) error { +func (s *Store) DeleteNode(id uint64, force bool) error { + ni := s.data.Node(id) + if ni == nil { + return ErrNodeNotFound + } + return s.exec(internal.Command_DeleteNodeCommand, internal.E_DeleteNodeCommand_Command, &internal.DeleteNodeCommand{ - ID: proto.Uint64(id), + ID: proto.Uint64(id), + Force: proto.Bool(force), }, ) } @@ -1706,11 +1712,14 @@ func (fsm *storeFSM) applyDeleteNodeCommand(cmd *internal.Command) interface{} { // Copy data and update. other := fsm.data.Clone() - if err := other.DeleteNode(v.GetID()); err != nil { + if err := other.DeleteNode(v.GetID(), v.GetForce()); err != nil { return err } fsm.data = other + id := v.GetID() + fsm.Logger.Printf("node '%d' removed", id) + return nil } diff --git a/meta/store_test.go b/meta/store_test.go index f8f6c7bce6b..ae8d9bf1f8e 100644 --- a/meta/store_test.go +++ b/meta/store_test.go @@ -151,7 +151,7 @@ func TestStore_DeleteNode(t *testing.T) { } // Remove second node. - if err := s.DeleteNode(3); err != nil { + if err := s.DeleteNode(3, false); err != nil { t.Fatal(err) } @@ -173,7 +173,7 @@ func TestStore_DeleteNode_ErrNodeNotFound(t *testing.T) { s := MustOpenStore() defer s.Close() - if err := s.DeleteNode(2); err != meta.ErrNodeNotFound { + if err := s.DeleteNode(2, false); err != meta.ErrNodeNotFound { t.Fatalf("unexpected error: %s", err) } } diff --git a/services/copier/internal/internal.pb.go b/services/copier/internal/internal.pb.go index 81732a93b5d..68caa62fb6d 100644 --- a/services/copier/internal/internal.pb.go +++ b/services/copier/internal/internal.pb.go @@ -15,14 +15,16 @@ It has these top-level messages: package internal import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" import math "math" // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal +var _ = fmt.Errorf var _ = math.Inf type Request struct { - ShardID *uint64 `protobuf:"varint,1,req" json:"ShardID,omitempty"` + ShardID *uint64 `protobuf:"varint,1,req,name=ShardID" json:"ShardID,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -38,7 +40,7 @@ func (m *Request) GetShardID() uint64 { } type Response struct { - Error *string `protobuf:"bytes,1,opt" json:"Error,omitempty"` + Error *string `protobuf:"bytes,1,opt,name=Error" json:"Error,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -52,6 +54,3 @@ func (m *Response) GetError() string { } return "" } - -func init() { -} diff --git a/tsdb/internal/meta.pb.go b/tsdb/internal/meta.pb.go index cbe051393df..c580f4dba61 100644 --- a/tsdb/internal/meta.pb.go +++ b/tsdb/internal/meta.pb.go @@ -17,15 +17,17 @@ It has these top-level messages: package internal import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" import math "math" // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal +var _ = fmt.Errorf var _ = math.Inf type Series struct { - Key *string `protobuf:"bytes,1,req" json:"Key,omitempty"` - Tags []*Tag `protobuf:"bytes,2,rep" json:"Tags,omitempty"` + Key *string `protobuf:"bytes,1,req,name=Key" json:"Key,omitempty"` + Tags []*Tag `protobuf:"bytes,2,rep,name=Tags" json:"Tags,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -48,8 +50,8 @@ func (m *Series) GetTags() []*Tag { } type Tag struct { - Key *string `protobuf:"bytes,1,req" json:"Key,omitempty"` - Value *string `protobuf:"bytes,2,req" json:"Value,omitempty"` + Key *string `protobuf:"bytes,1,req,name=Key" json:"Key,omitempty"` + Value *string `protobuf:"bytes,2,req,name=Value" json:"Value,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -72,7 +74,7 @@ func (m *Tag) GetValue() string { } type MeasurementFields struct { - Fields []*Field `protobuf:"bytes,1,rep" json:"Fields,omitempty"` + Fields []*Field `protobuf:"bytes,1,rep,name=Fields" json:"Fields,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -88,9 +90,9 @@ func (m *MeasurementFields) GetFields() []*Field { } type Field struct { - ID *int32 `protobuf:"varint,1,req" json:"ID,omitempty"` - Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"` - Type *int32 `protobuf:"varint,3,req" json:"Type,omitempty"` + ID *int32 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"` + Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"` + Type *int32 `protobuf:"varint,3,req,name=Type" json:"Type,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -118,6 +120,3 @@ func (m *Field) GetType() int32 { } return 0 } - -func init() { -}