Support joining nodes to an existing cluster #3372
Conversation
@@ -229,8 +229,8 @@ restore uses a snapshot of a data node to rebuild a cluster.

// Config represents a partial config for rebuilding the server.
type Config struct {
Meta meta.Config `toml:"meta"`
Data tsdb.Config `toml:"data"`
Meta *meta.Config `toml:"meta"`
Perhaps it will become apparent later, but why this change?
Needed to set a private `join` value on the config if it's specified via the command-line.
How does this interact with the cached MetaStore we have/used-to-have? Are they completely different paths? And serve different purposes?
@otoolep We still have the caching meta-store. The implementation differences are separated out into the `raftState` implementations (`localRaft` and `remoteRaft`).
node, err := func() (*NodeInfo, error) {
// attempt to create the node
node, err := r.store.CreateNode(*req.Addr)
// if it exists, return the exting node
Nit: exting -> existing
Took a second look, generally makes sense. I think @benbjohnson really needs to chime in as well though.
ClusterTracing bool `toml:"cluster-tracing"`

// The join command-line argument
join string
Why is this unexported?
It's set as a command-line arg. If it's public, `influxd config` lists it as a config option, which is not valid.
Can you do `toml:"-"`?
I'll try. But the `Config` still needs to be mutable.
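For reference, a minimal sketch of the two options being weighed here. The field names mirror the diff above, but `SetJoin` and `AltConfig` are hypothetical, and whether `influxd config` honors `toml:"-"` is exactly the open question:

```go
package meta

// Option 1 (current approach in this PR): keep the field unexported so it
// never shows up in the output of `influxd config`. The server's flag
// parsing can still set it through a setter in the same package.
type Config struct {
	ClusterTracing bool `toml:"cluster-tracing"`

	// The join command-line argument.
	join string
}

// SetJoin is a hypothetical setter that keeps Config mutable from the
// command-line handling code.
func (c *Config) SetJoin(addr string) { c.join = addr }

// Option 2 (the reviewer's suggestion): export the field but tag it with
// `toml:"-"` so TOML encoding/decoding skips it entirely.
type AltConfig struct {
	ClusterTracing bool   `toml:"cluster-tracing"`
	Join           string `toml:"-"`
}
```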
NodeByHost(host string) (*NodeInfo, error)
WaitForDataChanged() error
}
}
Why export `RPC` but not `tracingEnabled` or `store`? It seems like they could all be exported.
Ah right... I think I intended for all of this to be private at one point but some of it is public inadvertently. I'll fix it up and either make it all private or all public.
Can you explain a bit further how data is sharded after a new data node joins? One very common use case for doing this is an instance where your current cluster is running out of disk space. Is it at all possible to re-shard the data slowly over time to balance total disk utilization of the entire cluster?
@Jhors2 When new nodes are added, they become part of the pool of nodes that shards can be assigned to. Existing shards are not automatically rebalanced or moved. New shards (created when new shard groups are created) will be assigned to both new and existing nodes.
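A rough sketch of that behavior. `NodeInfo` and `ShardInfo` echo names from the meta package in the diff, but the assignment logic here is simplified and illustrative, not the PR's actual code:

```go
package meta

type NodeInfo struct{ ID uint64 }

type ShardInfo struct {
	ID       uint64
	OwnerIDs []uint64
}

// createShardGroup spreads the shards of a new shard group over whatever
// nodes exist at creation time. A node that joined before this call is
// eligible to own new shards; shards created earlier are left where they are.
func createShardGroup(nodes []NodeInfo, shardN, replicaN int, nextID uint64) []ShardInfo {
	shards := make([]ShardInfo, 0, shardN)
	for i := 0; i < shardN; i++ {
		s := ShardInfo{ID: nextID}
		nextID++
		for r := 0; r < replicaN; r++ {
			// Round-robin owners across the current node pool.
			s.OwnerIDs = append(s.OwnerIDs, nodes[(i*replicaN+r)%len(nodes)].ID)
		}
		shards = append(shards, s)
	}
	return shards
}
```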
Non-raft nodes need to be notified when the metastore changes. For example, a database could be dropped on node 1 (non-raft) and node 2 would not know. Since queries for that database would not be a cache miss, node 2 would never get updated. To propagate changes to non-raft nodes, each non-raft node maintains a blocking connection to a raft node that blocks until a metadata change occurs. When the change is triggered, the updated metadata is returned to the client and the client idempotently updates its local cache. It then reconnects and waits for another change. This is similar to watches in ZooKeeper or etcd. Since the blocking request is always recreated, it also serves as a polling mechanism that will retry another raft member if the current connection is lost.
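As a sketch of that loop (the function and its parameters are placeholders for illustration, not the PR's actual API):

```go
package meta

import "time"

// Data stands in for meta.Data; an index lets the client ignore stale updates.
type Data struct{ Index uint64 }

// watch is a sketch of the blocking notification loop described above.
// fetch is assumed to block on the given raft member until the metadata
// changes (or the connection drops); apply idempotently merges the result
// into the local cache.
func watch(raftAddrs []string, fetch func(addr string) (*Data, error), apply func(*Data), done <-chan struct{}) {
	i := 0
	for {
		select {
		case <-done:
			return
		default:
		}

		data, err := fetch(raftAddrs[i%len(raftAddrs)]) // blocks until a change or an error
		if err != nil {
			i++ // connection lost: try the next raft member
			time.Sleep(time.Second)
			continue
		}
		apply(data) // idempotent cache update; then reconnect and wait again
	}
}
```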
store.go is getting big.
Not used
Not needed since it was just used as a safeguard for seeing if we are the leader.
Will try each once until one succeeds
Useful for troubleshooting but too verbose for regular use.
Support joining nodes to an existing cluster
Overview
This PR adds support for joining a new node to an existing cluster. It implements some of the functionality in #2966.
The way it works is that an existing cluster with an established raft leader must be running. The new node should be started with `-join addr:port`, where `addr` is the hostname/IP of any existing member and `port` is the cluster port (default 8088). The new node will attempt to join the cluster and be assigned a node ID. Future shards that are created will be distributed across the cluster and sometimes on the new node.

Queries and writes can go to any node on any of the standard service ports. Queries are not addressed by this PR and are currently being worked on in other PRs.
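As a usage sketch, the flag wiring might look roughly like this; the `-join` flag name comes from this PR, but the surrounding code is illustrative only:

```go
package main

import (
	"flag"
	"log"
)

func main() {
	// -join takes the cluster address (default port 8088) of any existing member.
	join := flag.String("join", "", "host:port of an existing cluster member to join")
	flag.Parse()

	if *join != "" {
		log.Printf("attempting to join existing cluster via %s", *join)
		// The address would then be handed to the meta store config before
		// startup, e.g. something along the lines of cfg.Meta.SetJoin(*join)
		// (SetJoin is hypothetical; see the config discussion above).
	}
}
```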
Implementation Details
When there are more than 3 nodes in a cluster, new nodes will not take part in the raft cluster. All of the raft implementation is encapsulated in the `meta.Store`. To keep the `meta.Store` implementation from having to check whether it's part of the raft cluster or not, the raft details have been pulled out into a `raftState` implementation. The `meta.Store` now delegates raft-related calls to the `raftState`. There are two implementations of `raftState`: `localRaft` and `remoteRaft`. `localRaft` changes the behavior of the `meta.Store` to take part in the raft cluster. `remoteRaft` causes the `meta.Store` to operate using a local cache with remote calls to the raft cluster. In a subsequent PR, nodes will be able to be promoted to raft cluster members, and this state pattern will make it easier to change state dynamically.
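A sketch of that state pattern. The type names `raftState`, `localRaft`, and `remoteRaft` come from the PR, but the method set shown here is illustrative, not the actual interface:

```go
package meta

import "errors"

// raftState abstracts how the meta.Store talks to raft.
type raftState interface {
	open() error            // start this state
	apply(cmd []byte) error // submit a command to the cluster
	close() error
}

// localRaft participates in the raft cluster directly: commands are applied
// through this node's own raft log.
type localRaft struct{ /* raft node, log/peer stores, ... */ }

func (l *localRaft) open() error            { return nil }
func (l *localRaft) apply(cmd []byte) error { return nil }
func (l *localRaft) close() error           { return nil }

// remoteRaft serves reads from a locally cached meta.Data and forwards
// writes to a raft member over RPC.
type remoteRaft struct{ peers []string }

func (r *remoteRaft) open() error            { return nil }
func (r *remoteRaft) apply(cmd []byte) error { return errors.New("forwarding not shown in this sketch") }
func (r *remoteRaft) close() error           { return nil }
```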
When a node is started with join options, it initiates a `JoinRequest` RPC to an existing member. If the member is not the raft leader, the request is proxied to the current leader. The response to the `JoinRequest` indicates the node ID, whether the node should join the raft cluster (currently always `false`), and the current set of raft peers. If the node should join the raft cluster, it operates as before. If not, the node calls a `FetchMetaData` RPC to the raft cluster (auto-proxied to the leader) and then starts up.
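A sketch of that handshake from the joining node's side. The `JoinRequest`/`JoinResponse` shapes are assumptions based on this description, not the PR's exact types; the loop reflects the "will try each once until one succeeds" behavior noted in the review above:

```go
package meta

import "fmt"

// JoinRequest/JoinResponse approximate the RPC payloads described above.
type JoinRequest struct {
	Addr string // the joining node's advertised cluster address
}

type JoinResponse struct {
	NodeID     uint64   // ID assigned (or previously assigned) to this node
	RaftNodes  []string // current set of raft peers
	EnableRaft bool     // whether to take part in raft (currently always false)
}

// joinCluster tries each supplied peer once until one succeeds. A peer that
// isn't the raft leader is expected to proxy the request to the leader.
func joinCluster(call func(peer string, req *JoinRequest) (*JoinResponse, error), peers []string, myAddr string) (*JoinResponse, error) {
	var lastErr error
	for _, peer := range peers {
		resp, err := call(peer, &JoinRequest{Addr: myAddr})
		if err != nil {
			lastErr = err
			continue
		}
		// If EnableRaft were true the node would start raft as before;
		// otherwise it fetches a meta.Data snapshot via FetchMetaData
		// (auto-proxied to the leader) and starts up.
		return resp, nil
	}
	return nil, fmt.Errorf("unable to join cluster: %v", lastErr)
}
```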
Changes to the meta-store in the raft cluster need to be propagated to non-raft members. This is handled by blocking `FetchMetaData` calls initiated by each non-raft member. They maintain a blocking call that waits for a `meta.Data` change. When triggered, the updated `meta.Data` is returned and the non-raft member updates its local cache if needed. The blocking call is then repeated indefinitely. This is similar to how ZooKeeper/etcd watches work.

Not implemented / TODO
The following things are still not implemented: