Skip to content

Commit

Permalink
Merge pull request #1467 from mrjana/networkdb
Browse files Browse the repository at this point in the history
Purge stale nodes with same prefix and IP
  • Loading branch information
Santhosh Manohar committed Sep 23, 2016
2 parents 378d556 + 520a4c5 commit bf3d9cc
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 18 deletions.
14 changes: 7 additions & 7 deletions networkdb/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,6 @@ func (nDB *NetworkDB) reconnectNode() {
}
nDB.RUnlock()

// Update all the local state to a new time to force update on
// the node we are trying to rejoin, just in case that node
// has these in leaving/deleting state still. This is
// facilitate fast convergence after recovering from a gossip
// failure.
nDB.updateLocalStateTime()

node := nodes[randomOffset(len(nodes))]
addr := net.UDPAddr{IP: node.Addr, Port: int(node.Port)}

Expand All @@ -256,6 +249,13 @@ func (nDB *NetworkDB) reconnectNode() {
return
}

// Update all the local table state to a new time to force an
// update on the node we are trying to rejoin, just in case that
// node still has these entries in the deleting state. This is to
// facilitate fast convergence after recovering from a gossip
// failure.
nDB.updateLocalTableTime()

logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name)
nDB.bulkSync([]string{node.Name}, true)
}
Expand Down
44 changes: 35 additions & 9 deletions networkdb/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package networkdb
import (
"fmt"
"net"
"strings"
"time"

"github.com/Sirupsen/logrus"
Expand Down Expand Up @@ -31,24 +32,44 @@ func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
return nil
}

delete(nDB.failedNodes, n.Name)
delete(nodes, n.Name)
return n
}
}

return nil
}

func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
// Update our local clock if the received messages has newer
// time.
nDB.networkClock.Witness(nEvent.LTime)
// purgeSameNode drops every entry from the failed, left and active node
// maps whose key starts with the same prefix as n.Name (the portion of
// the name before the first "-") and whose address equals n.Addr.
//
// The NetworkDB write lock is held for the duration of the purge.
func (nDB *NetworkDB) purgeSameNode(n *node) {
	nDB.Lock()
	defer nDB.Unlock()

	prefix := strings.Split(n.Name, "-")[0]
	tables := []map[string]*node{
		nDB.failedNodes,
		nDB.leftNodes,
		nDB.nodes,
	}

	for _, table := range tables {
		for name, candidate := range table {
			if !strings.HasPrefix(name, prefix) {
				continue
			}
			if !n.Addr.Equal(candidate.Addr) {
				continue
			}
			// Removing the entry currently being visited is well
			// defined while ranging over a Go map.
			delete(table, name)
		}
	}
}

func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
n := nDB.checkAndGetNode(nEvent)
if n == nil {
return false
}

nDB.purgeSameNode(n)
n.ltime = nEvent.LTime

switch nEvent.Type {
Expand Down Expand Up @@ -357,6 +378,15 @@ func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
}

func (d *delegate) LocalState(join bool) []byte {
if join {
// Update all the local node/network state to a new time to
// force an update on the node we are trying to rejoin, just in
// case that node still has these in the leaving state. This is to
// facilitate fast convergence after recovering from a gossip
// failure.
d.nDB.updateLocalNetworkTime()
}

d.nDB.RLock()
defer d.nDB.RUnlock()

Expand Down Expand Up @@ -408,10 +438,6 @@ func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
return
}

if pp.LTime > 0 {
d.nDB.networkClock.Witness(pp.LTime)
}

nodeEvent := &NodeEvent{
LTime: pp.LTime,
NodeName: pp.NodeName,
Expand Down
9 changes: 7 additions & 2 deletions networkdb/networkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,16 +524,21 @@ func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
return networks
}

func (nDB *NetworkDB) updateLocalStateTime() {
// updateLocalNetworkTime advances the network clock once and stamps the
// resulting time onto every network recorded under the local node name
// (nDB.config.NodeName). The NetworkDB write lock is held throughout.
func (nDB *NetworkDB) updateLocalNetworkTime() {
	nDB.Lock()
	defer nDB.Unlock()

	// A single tick is shared by all local networks.
	stamp := nDB.networkClock.Increment()
	for _, localNetwork := range nDB.networks[nDB.config.NodeName] {
		localNetwork.ltime = stamp
	}
}

func (nDB *NetworkDB) updateLocalTableTime() {
nDB.Lock()
defer nDB.Unlock()

ltime = nDB.tableClock.Increment()
ltime := nDB.tableClock.Increment()
nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
entry := v.(*entry)
if entry.node != nDB.config.NodeName {
Expand Down

0 comments on commit bf3d9cc

Please sign in to comment.