From 520a4c52b8d1702afcff648be1aecb22104b1d33 Mon Sep 17 00:00:00 2001 From: Jana Radhakrishnan Date: Fri, 23 Sep 2016 14:42:17 -0700 Subject: [PATCH] Purge stale nodes with same prefix and IP Since the node name randomization fix, we need to make sure that we purge the old node with the same prefix and same IP from the nodes database if it is still present. Otherwise the stale entry causes unnecessary reconnect attempts. Also added a change to avoid unnecessary updates of the local Lamport time and only do it if we are ready to do a push-pull on a join. A join should happen only when the node is bootstrapped or when trying to reconnect with a failed node. Signed-off-by: Jana Radhakrishnan --- networkdb/cluster.go | 14 +++++++------- networkdb/delegate.go | 44 +++++++++++++++++++++++++++++++++--------- networkdb/networkdb.go | 9 +++++++-- 3 files changed, 49 insertions(+), 18 deletions(-) diff --git a/networkdb/cluster.go b/networkdb/cluster.go index 9d9ea163d6..3b624c9a27 100644 --- a/networkdb/cluster.go +++ b/networkdb/cluster.go @@ -237,13 +237,6 @@ func (nDB *NetworkDB) reconnectNode() { } nDB.RUnlock() - // Update all the local state to a new time to force update on - // the node we are trying to rejoin, just in case that node - // has these in leaving/deleting state still. This is - // facilitate fast convergence after recovering from a gossip - // failure. - nDB.updateLocalStateTime() - node := nodes[randomOffset(len(nodes))] addr := net.UDPAddr{IP: node.Addr, Port: int(node.Port)} @@ -256,6 +249,13 @@ func (nDB *NetworkDB) reconnectNode() { return } + // Update all the local table state to a new time to + // force update on the node we are trying to rejoin, just in + // case that node has these in deleting state still. This is + // facilitate fast convergence after recovering from a gossip + // failure.
+ nDB.updateLocalTableTime() + logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name) nDB.bulkSync([]string{node.Name}, true) } diff --git a/networkdb/delegate.go b/networkdb/delegate.go index 3e96384465..eb8d18557d 100644 --- a/networkdb/delegate.go +++ b/networkdb/delegate.go @@ -3,6 +3,7 @@ package networkdb import ( "fmt" "net" + "strings" "time" "github.com/Sirupsen/logrus" @@ -31,7 +32,7 @@ func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node { return nil } - delete(nDB.failedNodes, n.Name) + delete(nodes, n.Name) return n } } @@ -39,16 +40,36 @@ func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node { return nil } -func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool { - // Update our local clock if the received messages has newer - // time. - nDB.networkClock.Witness(nEvent.LTime) +func (nDB *NetworkDB) purgeSameNode(n *node) { + nDB.Lock() + defer nDB.Unlock() + prefix := strings.Split(n.Name, "-")[0] + for _, nodes := range []map[string]*node{ + nDB.failedNodes, + nDB.leftNodes, + nDB.nodes, + } { + var nodeNames []string + for name, node := range nodes { + if strings.HasPrefix(name, prefix) && n.Addr.Equal(node.Addr) { + nodeNames = append(nodeNames, name) + } + } + + for _, name := range nodeNames { + delete(nodes, name) + } + } +} + +func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool { n := nDB.checkAndGetNode(nEvent) if n == nil { return false } + nDB.purgeSameNode(n) n.ltime = nEvent.LTime switch nEvent.Type { @@ -357,6 +378,15 @@ func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { } func (d *delegate) LocalState(join bool) []byte { + if join { + // Update all the local node/network state to a new time to + // force update on the node we are trying to rejoin, just in + // case that node has these in leaving state still. This is + // facilitate fast convergence after recovering from a gossip + // failure. 
+ d.nDB.updateLocalNetworkTime() + } + d.nDB.RLock() defer d.nDB.RUnlock() @@ -408,10 +438,6 @@ func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) { return } - if pp.LTime > 0 { - d.nDB.networkClock.Witness(pp.LTime) - } - nodeEvent := &NodeEvent{ LTime: pp.LTime, NodeName: pp.NodeName, diff --git a/networkdb/networkdb.go b/networkdb/networkdb.go index 1502d7300e..a8c942c9cc 100644 --- a/networkdb/networkdb.go +++ b/networkdb/networkdb.go @@ -524,7 +524,7 @@ func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string { return networks } -func (nDB *NetworkDB) updateLocalStateTime() { +func (nDB *NetworkDB) updateLocalNetworkTime() { nDB.Lock() defer nDB.Unlock() @@ -532,8 +532,13 @@ func (nDB *NetworkDB) updateLocalStateTime() { for _, n := range nDB.networks[nDB.config.NodeName] { n.ltime = ltime } +} + +func (nDB *NetworkDB) updateLocalTableTime() { + nDB.Lock() + defer nDB.Unlock() - ltime = nDB.tableClock.Increment() + ltime := nDB.tableClock.Increment() nDB.indexes[byTable].Walk(func(path string, v interface{}) bool { entry := v.(*entry) if entry.node != nDB.config.NodeName {