diff --git a/CHANGELOG.md b/CHANGELOG.md index 07db3d78df9..44ec992a1a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ - [#4721](https://github.com/influxdb/influxdb/pull/4721): Export tsdb.InterfaceValues - [#4681](https://github.com/influxdb/influxdb/pull/4681): Increase default buffer size for collectd and graphite listeners - [#4659](https://github.com/influxdb/influxdb/pull/4659): Support IF EXISTS for DROP DATABASE +- [#4685](https://github.com/influxdb/influxdb/pull/4685): Automatically promote a node to raft peer if DROP SERVER results in removing a raft peer. ### Bugfixes - [#4715](https://github.com/influxdb/influxdb/pull/4715): Fix panic during Raft-close. Fix [issue #4707](https://github.com/influxdb/influxdb/issues/4707). Thanks @oiooj diff --git a/etc/config.sample.toml b/etc/config.sample.toml index 92016e30243..848a0647aae 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -33,6 +33,14 @@ reporting-disabled = false heartbeat-timeout = "1s" leader-lease-timeout = "500ms" commit-timeout = "50ms" + cluster-tracing = false + + # If enabled, when a Raft cluster loses a peer due to a `DROP SERVER` command, + # the leader will automatically ask a non-raft node to promote itself to a raft + # peer. This only happens if there is a non-raft node available to promote. + # This setting only affects the local node, so to ensure it operates correctly, set + # it in the config of every node. + raft-promotion-enabled = true ### ### [data] diff --git a/meta/config.go b/meta/config.go index 6b292db2fbc..62b9b4cdab2 100644 --- a/meta/config.go +++ b/meta/config.go @@ -24,31 +24,36 @@ const ( // DefaultCommitTimeout is the default commit timeout for the store. DefaultCommitTimeout = 50 * time.Millisecond + + // DefaultRaftPromotionEnabled is the default for automatically promoting a node to a raft peer when needed. + DefaultRaftPromotionEnabled = true ) // Config represents the meta configuration. type Config struct { - Dir string `toml:"dir"` - Hostname string `toml:"hostname"` - BindAddress string `toml:"bind-address"` - Peers []string `toml:"-"` - RetentionAutoCreate bool `toml:"retention-autocreate"` - ElectionTimeout toml.Duration `toml:"election-timeout"` - HeartbeatTimeout toml.Duration `toml:"heartbeat-timeout"` - LeaderLeaseTimeout toml.Duration `toml:"leader-lease-timeout"` - CommitTimeout toml.Duration `toml:"commit-timeout"` - ClusterTracing bool `toml:"cluster-tracing"` + Dir string `toml:"dir"` + Hostname string `toml:"hostname"` + BindAddress string `toml:"bind-address"` + Peers []string `toml:"-"` + RetentionAutoCreate bool `toml:"retention-autocreate"` + ElectionTimeout toml.Duration `toml:"election-timeout"` + HeartbeatTimeout toml.Duration `toml:"heartbeat-timeout"` + LeaderLeaseTimeout toml.Duration `toml:"leader-lease-timeout"` + CommitTimeout toml.Duration `toml:"commit-timeout"` + ClusterTracing bool `toml:"cluster-tracing"` + RaftPromotionEnabled bool `toml:"raft-promotion-enabled"` } // NewConfig builds a new configuration with default values.
func NewConfig() *Config { return &Config{ - Hostname: DefaultHostname, - BindAddress: DefaultBindAddress, - RetentionAutoCreate: true, - ElectionTimeout: toml.Duration(DefaultElectionTimeout), - HeartbeatTimeout: toml.Duration(DefaultHeartbeatTimeout), - LeaderLeaseTimeout: toml.Duration(DefaultLeaderLeaseTimeout), - CommitTimeout: toml.Duration(DefaultCommitTimeout), + Hostname: DefaultHostname, + BindAddress: DefaultBindAddress, + RetentionAutoCreate: true, + ElectionTimeout: toml.Duration(DefaultElectionTimeout), + HeartbeatTimeout: toml.Duration(DefaultHeartbeatTimeout), + LeaderLeaseTimeout: toml.Duration(DefaultLeaderLeaseTimeout), + CommitTimeout: toml.Duration(DefaultCommitTimeout), + RaftPromotionEnabled: DefaultRaftPromotionEnabled, } } diff --git a/meta/config_test.go b/meta/config_test.go index dc4337e5e31..8032ebb7463 100644 --- a/meta/config_test.go +++ b/meta/config_test.go @@ -17,6 +17,7 @@ election-timeout = "10s" heartbeat-timeout = "20s" leader-lease-timeout = "30h" commit-timeout = "40m" +raft-promotion-enabled = false `, &c); err != nil { t.Fatal(err) } @@ -32,5 +33,7 @@ commit-timeout = "40m" t.Fatalf("unexpected leader lease timeout: %v", c.LeaderLeaseTimeout) } else if time.Duration(c.CommitTimeout) != 40*time.Minute { t.Fatalf("unexpected commit timeout: %v", c.CommitTimeout) + } else if c.RaftPromotionEnabled { + t.Fatalf("unexpected raft promotion enabled: %v", c.RaftPromotionEnabled) } } diff --git a/meta/data.go b/meta/data.go index fdf92c00294..0d934c561ed 100644 --- a/meta/data.go +++ b/meta/data.go @@ -751,6 +751,13 @@ func (ni *NodeInfo) unmarshal(pb *internal.NodeInfo) { ni.Host = pb.GetHost() } +// NodeInfos is a slice of NodeInfo used for sorting +type NodeInfos []NodeInfo + +func (n NodeInfos) Len() int { return len(n) } +func (n NodeInfos) Swap(i, j int) { n[i], n[j] = n[j], n[i] } +func (n NodeInfos) Less(i, j int) bool { return n[i].ID < n[j].ID } + // DatabaseInfo represents information about a database in the system. type DatabaseInfo struct { Name string diff --git a/meta/internal/meta.pb.go b/meta/internal/meta.pb.go index 13f02fc293b..f4d23e5781a 100644 --- a/meta/internal/meta.pb.go +++ b/meta/internal/meta.pb.go @@ -50,33 +50,40 @@ It has these top-level messages: FetchDataResponse JoinRequest JoinResponse + PromoteRaftRequest + PromoteRaftResponse */ package internal import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" import math "math" // Reference imports to suppress errors if they are not otherwise used. 
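Stepping back to the meta/config.go change above: raft-promotion-enabled defaults to on through DefaultRaftPromotionEnabled and can be switched off per node in the [meta] section. A minimal sketch of how the default and an explicit TOML override interact, assuming the BurntSushi toml decoder that config_test.go already uses:

package main

import (
	"fmt"
	"log"

	"github.com/BurntSushi/toml"
	"github.com/influxdb/influxdb/meta"
)

func main() {
	// NewConfig fills in RaftPromotionEnabled from DefaultRaftPromotionEnabled (true).
	c := meta.NewConfig()
	fmt.Println("default:", c.RaftPromotionEnabled) // true

	// An explicit `raft-promotion-enabled = false` in the [meta] section wins.
	if _, err := toml.Decode(`raft-promotion-enabled = false`, c); err != nil {
		log.Fatal(err)
	}
	fmt.Println("after decode:", c.RaftPromotionEnabled) // false
}

Because the setting only affects the local node, it has to be set consistently in every node's config for the cluster to self-heal predictably.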
var _ = proto.Marshal +var _ = fmt.Errorf var _ = math.Inf type RPCType int32 const ( - RPCType_Error RPCType = 1 - RPCType_FetchData RPCType = 2 - RPCType_Join RPCType = 3 + RPCType_Error RPCType = 1 + RPCType_FetchData RPCType = 2 + RPCType_Join RPCType = 3 + RPCType_PromoteRaft RPCType = 4 ) var RPCType_name = map[int32]string{ 1: "Error", 2: "FetchData", 3: "Join", + 4: "PromoteRaft", } var RPCType_value = map[string]int32{ - "Error": 1, - "FetchData": 2, - "Join": 3, + "Error": 1, + "FetchData": 2, + "Join": 3, + "PromoteRaft": 4, } func (x RPCType) Enum() *RPCType { @@ -190,15 +197,15 @@ func (x *Command_Type) UnmarshalJSON(data []byte) error { } type Data struct { - Term *uint64 `protobuf:"varint,1,req" json:"Term,omitempty"` - Index *uint64 `protobuf:"varint,2,req" json:"Index,omitempty"` - ClusterID *uint64 `protobuf:"varint,3,req" json:"ClusterID,omitempty"` - Nodes []*NodeInfo `protobuf:"bytes,4,rep" json:"Nodes,omitempty"` - Databases []*DatabaseInfo `protobuf:"bytes,5,rep" json:"Databases,omitempty"` - Users []*UserInfo `protobuf:"bytes,6,rep" json:"Users,omitempty"` - MaxNodeID *uint64 `protobuf:"varint,7,req" json:"MaxNodeID,omitempty"` - MaxShardGroupID *uint64 `protobuf:"varint,8,req" json:"MaxShardGroupID,omitempty"` - MaxShardID *uint64 `protobuf:"varint,9,req" json:"MaxShardID,omitempty"` + Term *uint64 `protobuf:"varint,1,req,name=Term" json:"Term,omitempty"` + Index *uint64 `protobuf:"varint,2,req,name=Index" json:"Index,omitempty"` + ClusterID *uint64 `protobuf:"varint,3,req,name=ClusterID" json:"ClusterID,omitempty"` + Nodes []*NodeInfo `protobuf:"bytes,4,rep,name=Nodes" json:"Nodes,omitempty"` + Databases []*DatabaseInfo `protobuf:"bytes,5,rep,name=Databases" json:"Databases,omitempty"` + Users []*UserInfo `protobuf:"bytes,6,rep,name=Users" json:"Users,omitempty"` + MaxNodeID *uint64 `protobuf:"varint,7,req,name=MaxNodeID" json:"MaxNodeID,omitempty"` + MaxShardGroupID *uint64 `protobuf:"varint,8,req,name=MaxShardGroupID" json:"MaxShardGroupID,omitempty"` + MaxShardID *uint64 `protobuf:"varint,9,req,name=MaxShardID" json:"MaxShardID,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -270,8 +277,8 @@ func (m *Data) GetMaxShardID() uint64 { } type NodeInfo struct { - ID *uint64 `protobuf:"varint,1,req" json:"ID,omitempty"` - Host *string `protobuf:"bytes,2,req" json:"Host,omitempty"` + ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"` + Host *string `protobuf:"bytes,2,req,name=Host" json:"Host,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -294,10 +301,10 @@ func (m *NodeInfo) GetHost() string { } type DatabaseInfo struct { - Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` - DefaultRetentionPolicy *string `protobuf:"bytes,2,req" json:"DefaultRetentionPolicy,omitempty"` - RetentionPolicies []*RetentionPolicyInfo `protobuf:"bytes,3,rep" json:"RetentionPolicies,omitempty"` - ContinuousQueries []*ContinuousQueryInfo `protobuf:"bytes,4,rep" json:"ContinuousQueries,omitempty"` + Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` + DefaultRetentionPolicy *string `protobuf:"bytes,2,req,name=DefaultRetentionPolicy" json:"DefaultRetentionPolicy,omitempty"` + RetentionPolicies []*RetentionPolicyInfo `protobuf:"bytes,3,rep,name=RetentionPolicies" json:"RetentionPolicies,omitempty"` + ContinuousQueries []*ContinuousQueryInfo `protobuf:"bytes,4,rep,name=ContinuousQueries" json:"ContinuousQueries,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -334,12 +341,12 @@ func (m *DatabaseInfo) GetContinuousQueries() 
[]*ContinuousQueryInfo { } type RetentionPolicyInfo struct { - Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` - Duration *int64 `protobuf:"varint,2,req" json:"Duration,omitempty"` - ShardGroupDuration *int64 `protobuf:"varint,3,req" json:"ShardGroupDuration,omitempty"` - ReplicaN *uint32 `protobuf:"varint,4,req" json:"ReplicaN,omitempty"` - ShardGroups []*ShardGroupInfo `protobuf:"bytes,5,rep" json:"ShardGroups,omitempty"` - Subscriptions []*SubscriptionInfo `protobuf:"bytes,6,rep" json:"Subscriptions,omitempty"` + Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` + Duration *int64 `protobuf:"varint,2,req,name=Duration" json:"Duration,omitempty"` + ShardGroupDuration *int64 `protobuf:"varint,3,req,name=ShardGroupDuration" json:"ShardGroupDuration,omitempty"` + ReplicaN *uint32 `protobuf:"varint,4,req,name=ReplicaN" json:"ReplicaN,omitempty"` + ShardGroups []*ShardGroupInfo `protobuf:"bytes,5,rep,name=ShardGroups" json:"ShardGroups,omitempty"` + Subscriptions []*SubscriptionInfo `protobuf:"bytes,6,rep,name=Subscriptions" json:"Subscriptions,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -390,11 +397,11 @@ func (m *RetentionPolicyInfo) GetSubscriptions() []*SubscriptionInfo { } type ShardGroupInfo struct { - ID *uint64 `protobuf:"varint,1,req" json:"ID,omitempty"` - StartTime *int64 `protobuf:"varint,2,req" json:"StartTime,omitempty"` - EndTime *int64 `protobuf:"varint,3,req" json:"EndTime,omitempty"` - DeletedAt *int64 `protobuf:"varint,4,req" json:"DeletedAt,omitempty"` - Shards []*ShardInfo `protobuf:"bytes,5,rep" json:"Shards,omitempty"` + ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"` + StartTime *int64 `protobuf:"varint,2,req,name=StartTime" json:"StartTime,omitempty"` + EndTime *int64 `protobuf:"varint,3,req,name=EndTime" json:"EndTime,omitempty"` + DeletedAt *int64 `protobuf:"varint,4,req,name=DeletedAt" json:"DeletedAt,omitempty"` + Shards []*ShardInfo `protobuf:"bytes,5,rep,name=Shards" json:"Shards,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -438,9 +445,9 @@ func (m *ShardGroupInfo) GetShards() []*ShardInfo { } type ShardInfo struct { - ID *uint64 `protobuf:"varint,1,req" json:"ID,omitempty"` - OwnerIDs []uint64 `protobuf:"varint,2,rep" json:"OwnerIDs,omitempty"` - Owners []*ShardOwner `protobuf:"bytes,3,rep" json:"Owners,omitempty"` + ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"` + OwnerIDs []uint64 `protobuf:"varint,2,rep,name=OwnerIDs" json:"OwnerIDs,omitempty"` + Owners []*ShardOwner `protobuf:"bytes,3,rep,name=Owners" json:"Owners,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -470,9 +477,9 @@ func (m *ShardInfo) GetOwners() []*ShardOwner { } type SubscriptionInfo struct { - Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` - Mode *string `protobuf:"bytes,2,req" json:"Mode,omitempty"` - Destinations []string `protobuf:"bytes,3,rep" json:"Destinations,omitempty"` + Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` + Mode *string `protobuf:"bytes,2,req,name=Mode" json:"Mode,omitempty"` + Destinations []string `protobuf:"bytes,3,rep,name=Destinations" json:"Destinations,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -502,7 +509,7 @@ func (m *SubscriptionInfo) GetDestinations() []string { } type ShardOwner struct { - NodeID *uint64 `protobuf:"varint,1,req" json:"NodeID,omitempty"` + NodeID *uint64 `protobuf:"varint,1,req,name=NodeID" json:"NodeID,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -518,8 +525,8 @@ func (m *ShardOwner) GetNodeID() uint64 { } 
type ContinuousQueryInfo struct { - Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` - Query *string `protobuf:"bytes,2,req" json:"Query,omitempty"` + Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` + Query *string `protobuf:"bytes,2,req,name=Query" json:"Query,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -542,10 +549,10 @@ func (m *ContinuousQueryInfo) GetQuery() string { } type UserInfo struct { - Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` - Hash *string `protobuf:"bytes,2,req" json:"Hash,omitempty"` - Admin *bool `protobuf:"varint,3,req" json:"Admin,omitempty"` - Privileges []*UserPrivilege `protobuf:"bytes,4,rep" json:"Privileges,omitempty"` + Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` + Hash *string `protobuf:"bytes,2,req,name=Hash" json:"Hash,omitempty"` + Admin *bool `protobuf:"varint,3,req,name=Admin" json:"Admin,omitempty"` + Privileges []*UserPrivilege `protobuf:"bytes,4,rep,name=Privileges" json:"Privileges,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -582,8 +589,8 @@ func (m *UserInfo) GetPrivileges() []*UserPrivilege { } type UserPrivilege struct { - Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` - Privilege *int32 `protobuf:"varint,2,req" json:"Privilege,omitempty"` + Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"` + Privilege *int32 `protobuf:"varint,2,req,name=Privilege" json:"Privilege,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -637,8 +644,8 @@ func (m *Command) GetType() Command_Type { } type CreateNodeCommand struct { - Host *string `protobuf:"bytes,1,req" json:"Host,omitempty"` - Rand *uint64 `protobuf:"varint,2,req" json:"Rand,omitempty"` + Host *string `protobuf:"bytes,1,req,name=Host" json:"Host,omitempty"` + Rand *uint64 `protobuf:"varint,2,req,name=Rand" json:"Rand,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -669,8 +676,8 @@ var E_CreateNodeCommand_Command = &proto.ExtensionDesc{ } type DeleteNodeCommand struct { - ID *uint64 `protobuf:"varint,1,req" json:"ID,omitempty"` - Force *bool `protobuf:"varint,2,req" json:"Force,omitempty"` + ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"` + Force *bool `protobuf:"varint,2,req,name=Force" json:"Force,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -701,7 +708,7 @@ var E_DeleteNodeCommand_Command = &proto.ExtensionDesc{ } type CreateDatabaseCommand struct { - Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` + Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -725,7 +732,7 @@ var E_CreateDatabaseCommand_Command = &proto.ExtensionDesc{ } type DropDatabaseCommand struct { - Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` + Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -749,8 +756,8 @@ var E_DropDatabaseCommand_Command = &proto.ExtensionDesc{ } type CreateRetentionPolicyCommand struct { - Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` - RetentionPolicy *RetentionPolicyInfo `protobuf:"bytes,2,req" json:"RetentionPolicy,omitempty"` + Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"` + RetentionPolicy *RetentionPolicyInfo `protobuf:"bytes,2,req,name=RetentionPolicy" json:"RetentionPolicy,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -781,8 +788,8 @@ var E_CreateRetentionPolicyCommand_Command = &proto.ExtensionDesc{ } type DropRetentionPolicyCommand 
struct { - Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` - Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"` + Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"` + Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -813,8 +820,8 @@ var E_DropRetentionPolicyCommand_Command = &proto.ExtensionDesc{ } type SetDefaultRetentionPolicyCommand struct { - Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` - Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"` + Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"` + Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -845,11 +852,11 @@ var E_SetDefaultRetentionPolicyCommand_Command = &proto.ExtensionDesc{ } type UpdateRetentionPolicyCommand struct { - Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` - Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"` - NewName *string `protobuf:"bytes,3,opt" json:"NewName,omitempty"` - Duration *int64 `protobuf:"varint,4,opt" json:"Duration,omitempty"` - ReplicaN *uint32 `protobuf:"varint,5,opt" json:"ReplicaN,omitempty"` + Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"` + Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"` + NewName *string `protobuf:"bytes,3,opt,name=NewName" json:"NewName,omitempty"` + Duration *int64 `protobuf:"varint,4,opt,name=Duration" json:"Duration,omitempty"` + ReplicaN *uint32 `protobuf:"varint,5,opt,name=ReplicaN" json:"ReplicaN,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -901,9 +908,9 @@ var E_UpdateRetentionPolicyCommand_Command = &proto.ExtensionDesc{ } type CreateShardGroupCommand struct { - Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` - Policy *string `protobuf:"bytes,2,req" json:"Policy,omitempty"` - Timestamp *int64 `protobuf:"varint,3,req" json:"Timestamp,omitempty"` + Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"` + Policy *string `protobuf:"bytes,2,req,name=Policy" json:"Policy,omitempty"` + Timestamp *int64 `protobuf:"varint,3,req,name=Timestamp" json:"Timestamp,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -941,9 +948,9 @@ var E_CreateShardGroupCommand_Command = &proto.ExtensionDesc{ } type DeleteShardGroupCommand struct { - Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` - Policy *string `protobuf:"bytes,2,req" json:"Policy,omitempty"` - ShardGroupID *uint64 `protobuf:"varint,3,req" json:"ShardGroupID,omitempty"` + Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"` + Policy *string `protobuf:"bytes,2,req,name=Policy" json:"Policy,omitempty"` + ShardGroupID *uint64 `protobuf:"varint,3,req,name=ShardGroupID" json:"ShardGroupID,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -981,9 +988,9 @@ var E_DeleteShardGroupCommand_Command = &proto.ExtensionDesc{ } type CreateContinuousQueryCommand struct { - Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` - Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"` - Query *string `protobuf:"bytes,3,req" json:"Query,omitempty"` + Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"` + Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"` + Query *string `protobuf:"bytes,3,req,name=Query" json:"Query,omitempty"` XXX_unrecognized []byte 
`json:"-"` } @@ -1021,8 +1028,8 @@ var E_CreateContinuousQueryCommand_Command = &proto.ExtensionDesc{ } type DropContinuousQueryCommand struct { - Database *string `protobuf:"bytes,1,req" json:"Database,omitempty"` - Name *string `protobuf:"bytes,2,req" json:"Name,omitempty"` + Database *string `protobuf:"bytes,1,req,name=Database" json:"Database,omitempty"` + Name *string `protobuf:"bytes,2,req,name=Name" json:"Name,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1053,9 +1060,9 @@ var E_DropContinuousQueryCommand_Command = &proto.ExtensionDesc{ } type CreateUserCommand struct { - Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` - Hash *string `protobuf:"bytes,2,req" json:"Hash,omitempty"` - Admin *bool `protobuf:"varint,3,req" json:"Admin,omitempty"` + Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` + Hash *string `protobuf:"bytes,2,req,name=Hash" json:"Hash,omitempty"` + Admin *bool `protobuf:"varint,3,req,name=Admin" json:"Admin,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1093,7 +1100,7 @@ var E_CreateUserCommand_Command = &proto.ExtensionDesc{ } type DropUserCommand struct { - Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` + Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1117,8 +1124,8 @@ var E_DropUserCommand_Command = &proto.ExtensionDesc{ } type UpdateUserCommand struct { - Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` - Hash *string `protobuf:"bytes,2,req" json:"Hash,omitempty"` + Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` + Hash *string `protobuf:"bytes,2,req,name=Hash" json:"Hash,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1149,9 +1156,9 @@ var E_UpdateUserCommand_Command = &proto.ExtensionDesc{ } type SetPrivilegeCommand struct { - Username *string `protobuf:"bytes,1,req" json:"Username,omitempty"` - Database *string `protobuf:"bytes,2,req" json:"Database,omitempty"` - Privilege *int32 `protobuf:"varint,3,req" json:"Privilege,omitempty"` + Username *string `protobuf:"bytes,1,req,name=Username" json:"Username,omitempty"` + Database *string `protobuf:"bytes,2,req,name=Database" json:"Database,omitempty"` + Privilege *int32 `protobuf:"varint,3,req,name=Privilege" json:"Privilege,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1189,7 +1196,7 @@ var E_SetPrivilegeCommand_Command = &proto.ExtensionDesc{ } type SetDataCommand struct { - Data *Data `protobuf:"bytes,1,req" json:"Data,omitempty"` + Data *Data `protobuf:"bytes,1,req,name=Data" json:"Data,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1213,8 +1220,8 @@ var E_SetDataCommand_Command = &proto.ExtensionDesc{ } type SetAdminPrivilegeCommand struct { - Username *string `protobuf:"bytes,1,req" json:"Username,omitempty"` - Admin *bool `protobuf:"varint,2,req" json:"Admin,omitempty"` + Username *string `protobuf:"bytes,1,req,name=Username" json:"Username,omitempty"` + Admin *bool `protobuf:"varint,2,req,name=Admin" json:"Admin,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1245,8 +1252,8 @@ var E_SetAdminPrivilegeCommand_Command = &proto.ExtensionDesc{ } type UpdateNodeCommand struct { - ID *uint64 `protobuf:"varint,1,req" json:"ID,omitempty"` - Host *string `protobuf:"bytes,2,req" json:"Host,omitempty"` + ID *uint64 `protobuf:"varint,1,req,name=ID" json:"ID,omitempty"` + Host *string `protobuf:"bytes,2,req,name=Host" json:"Host,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1277,11 +1284,11 @@ var E_UpdateNodeCommand_Command = 
&proto.ExtensionDesc{ } type CreateSubscriptionCommand struct { - Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` - Database *string `protobuf:"bytes,2,req" json:"Database,omitempty"` - RetentionPolicy *string `protobuf:"bytes,3,req" json:"RetentionPolicy,omitempty"` - Mode *string `protobuf:"bytes,4,req" json:"Mode,omitempty"` - Destinations []string `protobuf:"bytes,5,rep" json:"Destinations,omitempty"` + Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` + Database *string `protobuf:"bytes,2,req,name=Database" json:"Database,omitempty"` + RetentionPolicy *string `protobuf:"bytes,3,req,name=RetentionPolicy" json:"RetentionPolicy,omitempty"` + Mode *string `protobuf:"bytes,4,req,name=Mode" json:"Mode,omitempty"` + Destinations []string `protobuf:"bytes,5,rep,name=Destinations" json:"Destinations,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1333,9 +1340,9 @@ var E_CreateSubscriptionCommand_Command = &proto.ExtensionDesc{ } type DropSubscriptionCommand struct { - Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` - Database *string `protobuf:"bytes,2,req" json:"Database,omitempty"` - RetentionPolicy *string `protobuf:"bytes,3,req" json:"RetentionPolicy,omitempty"` + Name *string `protobuf:"bytes,1,req,name=Name" json:"Name,omitempty"` + Database *string `protobuf:"bytes,2,req,name=Database" json:"Database,omitempty"` + RetentionPolicy *string `protobuf:"bytes,3,req,name=RetentionPolicy" json:"RetentionPolicy,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1405,9 +1412,9 @@ var E_RemovePeerCommand_Command = &proto.ExtensionDesc{ } type Response struct { - OK *bool `protobuf:"varint,1,req" json:"OK,omitempty"` - Error *string `protobuf:"bytes,2,opt" json:"Error,omitempty"` - Index *uint64 `protobuf:"varint,3,opt" json:"Index,omitempty"` + OK *bool `protobuf:"varint,1,req,name=OK" json:"OK,omitempty"` + Error *string `protobuf:"bytes,2,opt,name=Error" json:"Error,omitempty"` + Index *uint64 `protobuf:"varint,3,opt,name=Index" json:"Index,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1437,8 +1444,8 @@ func (m *Response) GetIndex() uint64 { } type ResponseHeader struct { - OK *bool `protobuf:"varint,1,req" json:"OK,omitempty"` - Error *string `protobuf:"bytes,2,opt" json:"Error,omitempty"` + OK *bool `protobuf:"varint,1,req,name=OK" json:"OK,omitempty"` + Error *string `protobuf:"bytes,2,opt,name=Error" json:"Error,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1461,7 +1468,7 @@ func (m *ResponseHeader) GetError() string { } type ErrorResponse struct { - Header *ResponseHeader `protobuf:"bytes,1,req" json:"Header,omitempty"` + Header *ResponseHeader `protobuf:"bytes,1,req,name=Header" json:"Header,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1477,9 +1484,9 @@ func (m *ErrorResponse) GetHeader() *ResponseHeader { } type FetchDataRequest struct { - Index *uint64 `protobuf:"varint,1,req" json:"Index,omitempty"` - Term *uint64 `protobuf:"varint,2,req" json:"Term,omitempty"` - Blocking *bool `protobuf:"varint,3,opt,def=0" json:"Blocking,omitempty"` + Index *uint64 `protobuf:"varint,1,req,name=Index" json:"Index,omitempty"` + Term *uint64 `protobuf:"varint,2,req,name=Term" json:"Term,omitempty"` + Blocking *bool `protobuf:"varint,3,opt,name=Blocking,def=0" json:"Blocking,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1511,10 +1518,10 @@ func (m *FetchDataRequest) GetBlocking() bool { } type FetchDataResponse struct { - Header *ResponseHeader `protobuf:"bytes,1,req" json:"Header,omitempty"` - Index *uint64 
`protobuf:"varint,2,req" json:"Index,omitempty"` - Term *uint64 `protobuf:"varint,3,req" json:"Term,omitempty"` - Data []byte `protobuf:"bytes,4,opt" json:"Data,omitempty"` + Header *ResponseHeader `protobuf:"bytes,1,req,name=Header" json:"Header,omitempty"` + Index *uint64 `protobuf:"varint,2,req,name=Index" json:"Index,omitempty"` + Term *uint64 `protobuf:"varint,3,req,name=Term" json:"Term,omitempty"` + Data []byte `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1551,7 +1558,7 @@ func (m *FetchDataResponse) GetData() []byte { } type JoinRequest struct { - Addr *string `protobuf:"bytes,1,req" json:"Addr,omitempty"` + Addr *string `protobuf:"bytes,1,req,name=Addr" json:"Addr,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -1567,15 +1574,11 @@ func (m *JoinRequest) GetAddr() string { } type JoinResponse struct { - Header *ResponseHeader `protobuf:"bytes,1,req" json:"Header,omitempty"` - // Indicates that this node should take part in the raft cluster. - EnableRaft *bool `protobuf:"varint,2,opt" json:"EnableRaft,omitempty"` - // The addresses of raft peers to use if joining as a raft member. If not joining - // as a raft member, these are the nodes running raft. - RaftNodes []string `protobuf:"bytes,3,rep" json:"RaftNodes,omitempty"` - // The node ID assigned to the requesting node. - NodeID *uint64 `protobuf:"varint,4,opt" json:"NodeID,omitempty"` - XXX_unrecognized []byte `json:"-"` + Header *ResponseHeader `protobuf:"bytes,1,req,name=Header" json:"Header,omitempty"` + EnableRaft *bool `protobuf:"varint,2,opt,name=EnableRaft" json:"EnableRaft,omitempty"` + RaftNodes []string `protobuf:"bytes,3,rep,name=RaftNodes" json:"RaftNodes,omitempty"` + NodeID *uint64 `protobuf:"varint,4,opt,name=NodeID" json:"NodeID,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *JoinResponse) Reset() { *m = JoinResponse{} } @@ -1610,6 +1613,54 @@ func (m *JoinResponse) GetNodeID() uint64 { return 0 } +type PromoteRaftRequest struct { + Addr *string `protobuf:"bytes,1,req,name=Addr" json:"Addr,omitempty"` + RaftNodes []string `protobuf:"bytes,2,rep,name=RaftNodes" json:"RaftNodes,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *PromoteRaftRequest) Reset() { *m = PromoteRaftRequest{} } +func (m *PromoteRaftRequest) String() string { return proto.CompactTextString(m) } +func (*PromoteRaftRequest) ProtoMessage() {} + +func (m *PromoteRaftRequest) GetAddr() string { + if m != nil && m.Addr != nil { + return *m.Addr + } + return "" +} + +func (m *PromoteRaftRequest) GetRaftNodes() []string { + if m != nil { + return m.RaftNodes + } + return nil +} + +type PromoteRaftResponse struct { + Header *ResponseHeader `protobuf:"bytes,1,req,name=Header" json:"Header,omitempty"` + Success *bool `protobuf:"varint,2,opt,name=Success" json:"Success,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *PromoteRaftResponse) Reset() { *m = PromoteRaftResponse{} } +func (m *PromoteRaftResponse) String() string { return proto.CompactTextString(m) } +func (*PromoteRaftResponse) ProtoMessage() {} + +func (m *PromoteRaftResponse) GetHeader() *ResponseHeader { + if m != nil { + return m.Header + } + return nil +} + +func (m *PromoteRaftResponse) GetSuccess() bool { + if m != nil && m.Success != nil { + return *m.Success + } + return false +} + func init() { proto.RegisterEnum("internal.RPCType", RPCType_name, RPCType_value) proto.RegisterEnum("internal.Command_Type", Command_Type_name, Command_Type_value) diff --git a/meta/internal/meta.proto 
b/meta/internal/meta.proto index 8605187d8a0..2308d6a3dff 100644 --- a/meta/internal/meta.proto +++ b/meta/internal/meta.proto @@ -322,6 +322,7 @@ enum RPCType { Error = 1; FetchData = 2; Join = 3; + PromoteRaft = 4; } message ResponseHeader { @@ -363,3 +364,14 @@ message JoinResponse { // The node ID assigned to the requesting node. optional uint64 NodeID = 4; } + +message PromoteRaftRequest { + required string Addr = 1; + repeated string RaftNodes = 2; +} + +message PromoteRaftResponse { + required ResponseHeader Header = 1; + + optional bool Success = 2; +} diff --git a/meta/rpc.go b/meta/rpc.go index 13696ceab66..e2edab792a5 100644 --- a/meta/rpc.go +++ b/meta/rpc.go @@ -28,9 +28,11 @@ type rpc struct { store interface { cachedData() *Data + enableLocalRaft() error IsLeader() bool Leader() string Peers() ([]string, error) + SetPeers(addrs []string) error AddPeer(host string) error CreateNode(host string) (*NodeInfo, error) NodeByHost(host string) (*NodeInfo, error) @@ -51,7 +53,7 @@ type Reply interface { } // proxyLeader proxies the connection to the current raft leader -func (r *rpc) proxyLeader(conn *net.TCPConn) { +func (r *rpc) proxyLeader(conn *net.TCPConn, buf []byte) { if r.store.Leader() == "" { r.sendError(conn, "no leader detected during proxyLeader") return @@ -65,6 +67,8 @@ func (r *rpc) proxyLeader(conn *net.TCPConn) { defer leaderConn.Close() leaderConn.Write([]byte{MuxRPCHeader}) + // re-write the original message to the leader + leaderConn.Write(buf) if err := proxy(leaderConn.(*net.TCPConn), conn); err != nil { r.sendError(conn, fmt.Sprintf("leader proxy error: %v", err)) } @@ -78,66 +82,25 @@ func (r *rpc) handleRPCConn(conn net.Conn) { // in the cluster. r.traceCluster("rpc connection from: %v", conn.RemoteAddr()) - if !r.store.IsLeader() { - r.proxyLeader(conn.(*net.TCPConn)) + // Read and execute request. + typ, buf, err := r.readMessage(conn) + // Handle unexpected RPC errors + if err != nil { + r.sendError(conn, err.Error()) return } - // Read and execute request. - typ, resp, err := func() (internal.RPCType, proto.Message, error) { - // Read request size. - var sz uint64 - if err := binary.Read(conn, binary.BigEndian, &sz); err != nil { - return internal.RPCType_Error, nil, fmt.Errorf("read size: %s", err) - } - - if sz == 0 { - return 0, nil, fmt.Errorf("invalid message size: %d", sz) - } - - if sz >= MaxMessageSize { - return 0, nil, fmt.Errorf("max message size of %d exceeded: %d", MaxMessageSize, sz) - } - - // Read request. 
- buf := make([]byte, sz) - if _, err := io.ReadFull(conn, buf); err != nil { - return internal.RPCType_Error, nil, fmt.Errorf("read request: %s", err) - } - - // Determine the RPC type - rpcType := internal.RPCType(btou64(buf[0:8])) - buf = buf[8:] + if !r.store.IsLeader() && typ != internal.RPCType_PromoteRaft { + r.proxyLeader(conn.(*net.TCPConn), pack(typ, buf)) + return + } - r.traceCluster("recv %v request on: %v", rpcType, conn.RemoteAddr()) - switch rpcType { - case internal.RPCType_FetchData: - var req internal.FetchDataRequest - if err := proto.Unmarshal(buf, &req); err != nil { - return internal.RPCType_Error, nil, fmt.Errorf("fetch request unmarshal: %v", err) - } - resp, err := r.handleFetchData(&req) - return rpcType, resp, err - case internal.RPCType_Join: - var req internal.JoinRequest - if err := proto.Unmarshal(buf, &req); err != nil { - return internal.RPCType_Error, nil, fmt.Errorf("join request unmarshal: %v", err) - } - resp, err := r.handleJoinRequest(&req) - return rpcType, resp, err - default: - return internal.RPCType_Error, nil, fmt.Errorf("unknown rpc type:%v", rpcType) - } - }() + typ, resp, err := r.executeMessage(conn, typ, buf) // Handle unexpected RPC errors if err != nil { - resp = &internal.ErrorResponse{ - Header: &internal.ResponseHeader{ - OK: proto.Bool(false), - }, - } - typ = internal.RPCType_Error + r.sendError(conn, err.Error()) + return } // Set the status header and error message @@ -151,6 +114,63 @@ func (r *rpc) handleRPCConn(conn net.Conn) { r.sendResponse(conn, typ, resp) } +func (r *rpc) readMessage(conn net.Conn) (internal.RPCType, []byte, error) { + // Read request size. + var sz uint64 + if err := binary.Read(conn, binary.BigEndian, &sz); err != nil { + return internal.RPCType_Error, nil, fmt.Errorf("read size: %s", err) + } + + if sz == 0 { + return internal.RPCType_Error, nil, fmt.Errorf("invalid message size: %d", sz) + } + + if sz >= MaxMessageSize { + return internal.RPCType_Error, nil, fmt.Errorf("max message size of %d exceeded: %d", MaxMessageSize, sz) + } + + // Read request. 
+ buf := make([]byte, sz) + if _, err := io.ReadFull(conn, buf); err != nil { + return internal.RPCType_Error, nil, fmt.Errorf("read request: %s", err) + } + + // Determine the RPC type + rpcType := internal.RPCType(btou64(buf[0:8])) + buf = buf[8:] + + r.traceCluster("recv %v request on: %v", rpcType, conn.RemoteAddr()) + return rpcType, buf, nil +} + +func (r *rpc) executeMessage(conn net.Conn, rpcType internal.RPCType, buf []byte) (internal.RPCType, proto.Message, error) { + switch rpcType { + case internal.RPCType_FetchData: + var req internal.FetchDataRequest + if err := proto.Unmarshal(buf, &req); err != nil { + return internal.RPCType_Error, nil, fmt.Errorf("fetch request unmarshal: %v", err) + } + resp, err := r.handleFetchData(&req) + return rpcType, resp, err + case internal.RPCType_Join: + var req internal.JoinRequest + if err := proto.Unmarshal(buf, &req); err != nil { + return internal.RPCType_Error, nil, fmt.Errorf("join request unmarshal: %v", err) + } + resp, err := r.handleJoinRequest(&req) + return rpcType, resp, err + case internal.RPCType_PromoteRaft: + var req internal.PromoteRaftRequest + if err := proto.Unmarshal(buf, &req); err != nil { + return internal.RPCType_Error, nil, fmt.Errorf("promote to raft request unmarshal: %v", err) + } + resp, err := r.handlePromoteRaftRequest(&req) + return rpcType, resp, err + default: + return internal.RPCType_Error, nil, fmt.Errorf("unknown rpc type:%v", rpcType) + } +} + func (r *rpc) sendResponse(conn net.Conn, typ internal.RPCType, resp proto.Message) { // Marshal the response back to a protobuf buf, err := proto.Marshal(resp) @@ -160,7 +180,7 @@ func (r *rpc) sendResponse(conn net.Conn, typ internal.RPCType, resp proto.Messa } // Encode response back to connection. - if _, err := conn.Write(r.pack(typ, buf)); err != nil { + if _, err := conn.Write(pack(typ, buf)); err != nil { r.logger.Printf("unable to write rpc response: %s", err) } } @@ -271,12 +291,39 @@ func (r *rpc) handleJoinRequest(req *internal.JoinRequest) (*internal.JoinRespon RaftNodes: peers, NodeID: proto.Uint64(nodeID), }, err +} + +func (r *rpc) handlePromoteRaftRequest(req *internal.PromoteRaftRequest) (*internal.PromoteRaftResponse, error) { + r.traceCluster("promote raft request from: %v", *req.Addr) + + // Need to set the local store peers to match what we are about to join + if err := r.store.SetPeers(req.RaftNodes); err != nil { + return nil, err + } + + if err := r.store.enableLocalRaft(); err != nil { + return nil, err + } + + if !contains(req.RaftNodes, *req.Addr) { + req.RaftNodes = append(req.RaftNodes, *req.Addr) + } + + if err := r.store.SetPeers(req.RaftNodes); err != nil { + return nil, err + } + return &internal.PromoteRaftResponse{ + Header: &internal.ResponseHeader{ + OK: proto.Bool(true), + }, + Success: proto.Bool(true), + }, nil } // pack returns a TLV style byte slice encoding the size of the payload, the RPC type // and the RPC data -func (r *rpc) pack(typ internal.RPCType, b []byte) []byte { +func pack(typ internal.RPCType, b []byte) []byte { buf := u64tob(uint64(len(b)) + 8) buf = append(buf, u64tob(uint64(typ))...) buf = append(buf, b...) 
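The readMessage/pack pair above defines the framing every meta RPC now shares: an 8-byte big-endian size (payload length plus the 8-byte type word), an 8-byte big-endian RPCType, then the marshaled protobuf. Below is a standalone sketch of framing and unframing the new PromoteRaftRequest; the hostnames are made up, and it assumes the file lives under github.com/influxdb/influxdb so the internal package is importable:

package main

import (
	"encoding/binary"
	"fmt"
	"log"

	"github.com/gogo/protobuf/proto"
	"github.com/influxdb/influxdb/meta/internal"
)

// frame mirrors pack(): the size field holds the payload length plus 8 bytes for the type word.
func frame(typ internal.RPCType, payload []byte) []byte {
	buf := make([]byte, 16+len(payload))
	binary.BigEndian.PutUint64(buf[0:8], uint64(len(payload))+8)
	binary.BigEndian.PutUint64(buf[8:16], uint64(typ))
	copy(buf[16:], payload)
	return buf
}

func main() {
	req := &internal.PromoteRaftRequest{
		Addr:      proto.String("node4:8088"), // hypothetical cluster address
		RaftNodes: []string{"node1:8088", "node2:8088", "node3:8088"},
	}
	payload, err := proto.Marshal(req)
	if err != nil {
		log.Fatal(err)
	}
	msg := frame(internal.RPCType_PromoteRaft, payload)

	// Unframe the way readMessage does: size first, then type, then the protobuf body.
	sz := binary.BigEndian.Uint64(msg[0:8])
	typ := internal.RPCType(binary.BigEndian.Uint64(msg[8:16]))
	body := msg[16 : 8+sz]

	var out internal.PromoteRaftRequest
	if err := proto.Unmarshal(body, &out); err != nil {
		log.Fatal(err)
	}
	fmt.Println(typ, out.GetAddr(), out.GetRaftNodes())
}

Splitting the read from the execution is also what lets a non-leader re-frame the same bytes with pack and forward them verbatim when proxying a request to the leader.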
@@ -353,6 +400,29 @@ func (r *rpc) join(localAddr, remoteAddr string) (*JoinResult, error) { } } +// enableRaft asks the node at addr to promote itself to a raft peer, passing +// peers as the set of raft nodes it should join +func (r *rpc) enableRaft(addr string, peers []string) error { + req := &internal.PromoteRaftRequest{ + Addr: proto.String(addr), + RaftNodes: peers, + } + + resp, err := r.call(addr, req) + if err != nil { + return err + } + + switch t := resp.(type) { + case *internal.PromoteRaftResponse: + return nil + case *internal.ErrorResponse: + return fmt.Errorf("rpc failed: %s", t.GetHeader().GetError()) + default: + return fmt.Errorf("rpc failed: unknown response type: %v", t.String()) + } +} + // call sends an encoded request to the remote leader and returns // an encoded response value. func (r *rpc) call(dest string, req proto.Message) (proto.Message, error) { @@ -363,6 +433,8 @@ func (r *rpc) call(dest string, req proto.Message) (proto.Message, error) { rpcType = internal.RPCType_Join case *internal.FetchDataRequest: rpcType = internal.RPCType_FetchData + case *internal.PromoteRaftRequest: + rpcType = internal.RPCType_PromoteRaft default: return nil, fmt.Errorf("unknown rpc request type: %v", t) } @@ -386,7 +458,7 @@ func (r *rpc) call(dest string, req proto.Message) (proto.Message, error) { } // Write request size & bytes. - if _, err := conn.Write(r.pack(rpcType, b)); err != nil { + if _, err := conn.Write(pack(rpcType, b)); err != nil { return nil, fmt.Errorf("write %v rpc: %s", rpcType, err) } @@ -419,6 +491,8 @@ func (r *rpc) call(dest string, req proto.Message) (proto.Message, error) { resp = &internal.FetchDataResponse{} case internal.RPCType_Error: resp = &internal.ErrorResponse{} + case internal.RPCType_PromoteRaft: + resp = &internal.PromoteRaftResponse{} default: return nil, fmt.Errorf("unknown rpc response type: %v", rpcType) } diff --git a/meta/rpc_test.go b/meta/rpc_test.go index 40f3540affb..76e77f86774 100644 --- a/meta/rpc_test.go +++ b/meta/rpc_test.go @@ -240,3 +240,9 @@ func (f *fakeStore) WaitForDataChanged() error { <-f.blockChan return nil } +func (f *fakeStore) enableLocalRaft() error { + return nil +} +func (f *fakeStore) SetPeers(addrs []string) error { + return nil +} diff --git a/meta/state.go b/meta/state.go index 2612302c30f..77c941b6760 100644 --- a/meta/state.go +++ b/meta/state.go @@ -35,6 +35,7 @@ type raftState interface { lastIndex() uint64 apply(b []byte) error snapshot() error + isLocal() bool } // localRaft is a consensus strategy that uses a local raft implementation for @@ -114,14 +115,15 @@ func (r *localRaft) open() error { config.ElectionTimeout = s.ElectionTimeout config.LeaderLeaseTimeout = s.LeaderLeaseTimeout config.CommitTimeout = s.CommitTimeout + // Since we never call `removePeer`, this is safe. + // If we decide to call removePeer in the future, we will have to re-evaluate how to handle it. + config.ShutdownOnRemove = false // If no peers are set in the config or there is one and we are it, then start as a single server. if len(s.peers) <= 1 { config.EnableSingleNode = true // Ensure we can always become the leader config.DisableBootstrapAfterElect = false - // Don't shutdown raft automatically if we renamed our hostname back to a previous name - config.ShutdownOnRemove = false } // Build raft layer to multiplex listener.
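enableRaft above is the leader's client half of a promotion; handlePromoteRaftRequest is the server half, and the decision of which node to promote is made by promoteNodeToPeer in meta/store.go further down. Here is a condensed, standalone sketch of that selection rule, with simplified stand-ins for meta.NodeInfo and made-up hosts:

package main

import (
	"fmt"
	"sort"
)

type nodeInfo struct {
	ID   uint64
	Host string
}

// byID mirrors the NodeInfos sort added in meta/data.go: ascending node ID.
type byID []nodeInfo

func (n byID) Len() int           { return len(n) }
func (n byID) Swap(i, j int)      { n[i], n[j] = n[j], n[i] }
func (n byID) Less(i, j int) bool { return n[i].ID < n[j].ID }

func contains(s []string, v string) bool {
	for _, x := range s {
		if x == v {
			return true
		}
	}
	return false
}

// pickPromotionCandidate returns the lowest-ID node that is not already a raft
// peer, or nil when the cluster already has three raft peers or has no
// candidate to promote.
func pickPromotionCandidate(peers []string, nodes []nodeInfo) *nodeInfo {
	var nonraft byID
	for _, n := range nodes {
		if !contains(peers, n.Host) {
			nonraft = append(nonraft, n)
		}
	}
	if len(peers) >= 3 || len(nonraft) == 0 {
		return nil
	}
	sort.Sort(nonraft) // deterministic: always promote the lowest node ID
	return &nonraft[0]
}

func main() {
	peers := []string{"node1:8088", "node2:8088"} // one raft slot open after a DROP SERVER
	nodes := []nodeInfo{
		{ID: 1, Host: "node1:8088"}, {ID: 2, Host: "node2:8088"},
		{ID: 4, Host: "node4:8088"}, {ID: 3, Host: "node3:8088"},
	}
	if c := pickPromotionCandidate(peers, nodes); c != nil {
		fmt.Printf("promote node %d (%s), new peer set %v\n", c.ID, c.Host, append(peers, c.Host))
	}
}

The real method also holds the store lock, only runs on the current leader, and calls AddPeer plus rpc.enableRaft once it has a candidate; it is driven by monitorPeerHealth on a one-second ticker, as shown in the meta/store.go hunk that follows.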
@@ -152,7 +154,7 @@ // is difficult to resolve automatically because we need to have all the raft peers agree on the current members // of the cluster before we can change them. if len(peers) > 0 && !raft.PeerContained(peers, s.RemoteAddr.String()) { - s.Logger.Printf("%v is not in the list of raft peers. Please update %v/peers.json on all raft nodes to have the same contents.", s.RemoteAddr.String(), s.Path()) + s.Logger.Printf("%s is not in the list of raft peers. Please update %v/peers.json on all raft nodes to have the same contents.", s.RemoteAddr.String(), s.Path()) return fmt.Errorf("peers out of sync: %v not in %v", s.RemoteAddr.String(), peers) } @@ -350,6 +352,10 @@ func (r *localRaft) isLeader() bool { return r.raft.State() == raft.Leader } +func (r *localRaft) isLocal() bool { + return true +} + // remoteRaft is a consensus strategy that uses a remote raft cluster for // consensus operations. type remoteRaft struct { @@ -468,6 +474,10 @@ func (r *remoteRaft) isLeader() bool { return false } +func (r *remoteRaft) isLocal() bool { + return false +} + func (r *remoteRaft) lastIndex() uint64 { return r.store.cachedData().Index } diff --git a/meta/store.go b/meta/store.go index 4915affdb87..be2ac0a2aea 100644 --- a/meta/store.go +++ b/meta/store.go @@ -14,6 +14,7 @@ import ( "net" "os" "path/filepath" + "sort" "strconv" "strings" "sync" @@ -123,6 +124,10 @@ type Store struct { // Returns an error if the password is invalid or a hash cannot be generated. hashPassword HashPasswordFn + + // raftPromotionEnabled determines if non-raft nodes should be automatically + // promoted to raft peers to self-heal the raft cluster + raftPromotionEnabled bool + Logger *log.Logger } @@ -145,6 +150,7 @@ func NewStore(c *Config) *Store { clusterTracingEnabled: c.ClusterTracing, retentionAutoCreate: c.RetentionAutoCreate, + raftPromotionEnabled: c.RaftPromotionEnabled, HeartbeatTimeout: time.Duration(c.HeartbeatTimeout), ElectionTimeout: time.Duration(c.ElectionTimeout), @@ -255,7 +261,17 @@ func (s *Store) Open() error { // Wait for a leader to be elected so we know the raft log is loaded // and up to date <-s.ready - return s.WaitForLeader(0) + if err := s.WaitForLeader(0); err != nil { + return err + } + + if s.raftPromotionEnabled { + s.wg.Add(1) + s.Logger.Printf("started raft peer health monitoring for node %d", s.NodeID()) + go s.monitorPeerHealth() + } + + return nil } // syncNodeInfo continuously tries to update the current nodes hostname @@ -415,6 +431,77 @@ func (s *Store) changeState(state raftState) error { return nil } +// monitorPeerHealth periodically checks if we have a node that can be promoted to a +// raft peer to fill any missing slots. +// This function runs in a separate goroutine. +func (s *Store) monitorPeerHealth() { + defer s.wg.Done() + + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + // Wait for the next tick or for the store to close.
+ select { + case <-ticker.C: + case <-s.closing: + return + } + if err := s.promoteNodeToPeer(); err != nil { + s.Logger.Printf("error promoting node to raft peer: %s", err) + } + } +} + +func (s *Store) promoteNodeToPeer() error { + // Only do this if we are the leader + + if !s.IsLeader() { + return nil + } + + s.mu.Lock() + defer s.mu.Unlock() + + peers, err := s.raftState.peers() + if err != nil { + return err + } + + nodes := s.data.Nodes + var nonraft NodeInfos + for _, n := range nodes { + if contains(peers, n.Host) { + continue + } + nonraft = append(nonraft, n) + } + + // Check to see if any action is required or possible + if len(peers) >= 3 || len(nonraft) == 0 { + return nil + } + + // Sort the non-raft nodes + sort.Sort(nonraft) + + // Pick the lowest node ID for a deterministic outcome + n := nonraft[0] + // Add the candidate as a raft peer on the leader first + if err := s.AddPeer(n.Host); err != nil { + return fmt.Errorf("unable to add raft peer %s on leader: %s", n.Host, err) + } + + // Add the new node to the peers list + peers = append(peers, n.Host) + if err := s.rpc.enableRaft(n.Host, peers); err != nil { + return fmt.Errorf("error notifying raft peer: %s", err) + } + s.Logger.Printf("promoted nodeID %d, host %s to raft peer", n.ID, n.Host) + + return nil +} + // openRaft initializes the raft store. func (s *Store) openRaft() error { return s.raftState.open() @@ -604,6 +691,13 @@ func (s *Store) IsLeader() bool { return s.raftState.isLeader() } +// IsLocal returns true if the store is currently participating in local raft. +func (s *Store) IsLocal() bool { + s.mu.RLock() + defer s.mu.RUnlock() + return s.raftState.isLocal() +} + // Leader returns what the store thinks is the current leader. An empty // string indicates no leader exists. func (s *Store) Leader() string { diff --git a/meta/store_test.go b/meta/store_test.go index 6c7244ea1de..7a56ec3038b 100644 --- a/meta/store_test.go +++ b/meta/store_test.go @@ -1056,6 +1056,79 @@ func TestCluster_Restart(t *testing.T) { wg.Wait() } +// Ensure a multi-node cluster can start with its first three members as raft nodes, then add a fourth non-raft node. +// Remove one of the raft nodes and ensure the fourth node is promoted to a raft peer. +func TestCluster_ReplaceRaft(t *testing.T) { + t.Parallel() + // Start a single node. + c := MustOpenCluster(1) + defer c.Close() + + // Check that the node becomes leader. + if s := c.Leader(); s == nil { + t.Fatal("no leader found") + } + + // Add 2 more nodes.
+ for i := 0; i < 2; i++ { + if err := c.Join(); err != nil { + t.Fatalf("failed to join cluster: %v", err) + } + } + + // Sleep to let the new nodes join the raft cluster + time.Sleep(time.Second) + + // ensure we have 3 raft nodes + for _, s := range c.Stores { + if !s.IsLocal() { + t.Fatalf("node %d is not a local raft instance.", s.NodeID()) + } + } + + // ensure all the nodes see the same metastore data + assertDatabaseReplicated(t, c) + + // Add another node + if err := c.Join(); err != nil { + t.Fatalf("failed to join cluster: %v", err) + } + + var leader, follower *Store + + // Find the leader and a raft follower to remove + for _, s := range c.Stores { + if s.IsLeader() { + leader = s + } + // Find any follower to remove + if !s.IsLeader() && s.IsLocal() { + follower = s + } + if leader != nil && follower != nil { + break + } + } + + // drop the node + if err := leader.DeleteNode(follower.NodeID(), true); err != nil { + t.Fatal(err) + } + if err := c.Remove(follower.NodeID()); err != nil { + t.Fatal(err) + } + + // Sleep to let the remaining non-raft node be promoted to a raft peer + time.Sleep(1 * time.Second) + + // ensure we have 3 raft nodes + for _, s := range c.Stores { + if !s.IsLocal() { + t.Fatalf("node %d is not a local raft instance.", s.NodeID()) + } + } +} + // Store is a test wrapper for meta.Store. type Store struct { *meta.Store @@ -1149,13 +1222,14 @@ func (s *Store) Close() error { // NewConfig returns the default test configuration. func NewConfig(path string) *meta.Config { return &meta.Config{ - Dir: path, - Hostname: "localhost", - BindAddress: "127.0.0.1:0", - HeartbeatTimeout: toml.Duration(500 * time.Millisecond), - ElectionTimeout: toml.Duration(500 * time.Millisecond), - LeaderLeaseTimeout: toml.Duration(500 * time.Millisecond), - CommitTimeout: toml.Duration(5 * time.Millisecond), + Dir: path, + Hostname: "localhost", + BindAddress: "127.0.0.1:0", + HeartbeatTimeout: toml.Duration(500 * time.Millisecond), + ElectionTimeout: toml.Duration(500 * time.Millisecond), + LeaderLeaseTimeout: toml.Duration(500 * time.Millisecond), + CommitTimeout: toml.Duration(5 * time.Millisecond), + RaftPromotionEnabled: true, } } @@ -1210,6 +1284,17 @@ func (c *Cluster) Join() error { return nil } +func (c *Cluster) Remove(nodeID uint64) error { + for i, s := range c.Stores { + if s.NodeID() == nodeID { + // This could hang for a variety of reasons, so don't wait for it + go s.Close() + c.Stores = append(c.Stores[:i], c.Stores[i+1:]...) + } + } + return nil +} + // Open opens and initializes all stores in the cluster. func (c *Cluster) Open() error { if err := func() error {