Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p, swarm: fix node up races by granular locking #18976

Merged
merged 12 commits into from
Feb 18, 2019
Merged
2 changes: 1 addition & 1 deletion p2p/simulations/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func ControlEvent(v interface{}) *Event {
func (e *Event) String() string {
switch e.Type {
case EventTypeNode:
return fmt.Sprintf("<node-event> id: %s up: %t", e.Node.ID().TerminalString(), e.Node.Up)
return fmt.Sprintf("<node-event> id: %s up: %t", e.Node.ID().TerminalString(), e.Node.Up())
case EventTypeConn:
return fmt.Sprintf("<conn-event> nodes: %s->%s up: %t", e.Conn.One.TerminalString(), e.Conn.Other.TerminalString(), e.Conn.Up)
case EventTypeMsg:
Expand Down
18 changes: 10 additions & 8 deletions p2p/simulations/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,14 +421,15 @@ type expectEvents struct {
}

func (t *expectEvents) nodeEvent(id string, up bool) *Event {
node := Node{
Config: &adapters.NodeConfig{
ID: enode.HexID(id),
},
up: up,
}
return &Event{
Type: EventTypeNode,
Node: &Node{
Config: &adapters.NodeConfig{
ID: enode.HexID(id),
},
Up: up,
},
Node: &node,
}
}

Expand Down Expand Up @@ -480,6 +481,7 @@ loop:
}

func (t *expectEvents) expect(events ...*Event) {
t.Helper()
timeout := time.After(10 * time.Second)
i := 0
for {
Expand All @@ -501,8 +503,8 @@ func (t *expectEvents) expect(events ...*Event) {
if event.Node.ID() != expected.Node.ID() {
t.Fatalf("expected node event %d to have id %q, got %q", i, expected.Node.ID().TerminalString(), event.Node.ID().TerminalString())
}
if event.Node.Up != expected.Node.Up {
t.Fatalf("expected node event %d to have up=%t, got up=%t", i, expected.Node.Up, event.Node.Up)
if event.Node.Up() != expected.Node.Up() {
t.Fatalf("expected node event %d to have up=%t, got up=%t", i, expected.Node.Up(), event.Node.Up())
}

case EventTypeConn:
Expand Down
9 changes: 5 additions & 4 deletions p2p/simulations/mocker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,12 @@ func TestMocker(t *testing.T) {
for {
select {
case event := <-events:
//if the event is a node Up event only
if event.Node != nil && event.Node.Up {
if isNodeUp(event) {
//add the correspondent node ID to the map
nodemap[event.Node.Config.ID] = true
//this means all nodes got a nodeUp event, so we can continue the test
if len(nodemap) == nodeCount {
nodesComplete = true
//wait for 3s as the mocker will need time to connect the nodes
//time.Sleep( 3 *time.Second)
}
} else if event.Conn != nil && nodesComplete {
connCount += 1
Expand Down Expand Up @@ -169,3 +166,7 @@ func TestMocker(t *testing.T) {
t.Fatalf("Expected empty list of nodes, got: %d", len(nodesInfo))
}
}

func isNodeUp(event *Event) bool {
return event.Node != nil && event.Node.Up()
}
84 changes: 53 additions & 31 deletions p2p/simulations/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (net *Network) Config() *NetworkConfig {
// StartAll starts all nodes in the network
func (net *Network) StartAll() error {
for _, node := range net.Nodes {
if node.Up {
if node.Up() {
continue
}
if err := net.Start(node.ID()); err != nil {
Expand All @@ -149,7 +149,7 @@ func (net *Network) StartAll() error {
// StopAll stops all nodes in the network
func (net *Network) StopAll() error {
for _, node := range net.Nodes {
if !node.Up {
if !node.Up() {
continue
}
if err := net.Stop(node.ID()); err != nil {
Expand All @@ -168,27 +168,23 @@ func (net *Network) Start(id enode.ID) error {
// snapshots
func (net *Network) startWithSnapshots(id enode.ID, snapshots map[string][]byte) error {
net.lock.Lock()
defer net.lock.Unlock()

node := net.getNode(id)
if node == nil {
net.lock.Unlock()
return fmt.Errorf("node %v does not exist", id)
}
if node.Up {
net.lock.Unlock()
if node.Up() {
return fmt.Errorf("node %v already up", id)
}
log.Trace("Starting node", "id", id, "adapter", net.nodeAdapter.Name())
if err := node.Start(snapshots); err != nil {
net.lock.Unlock()
log.Warn("Node startup failed", "id", id, "err", err)
return err
}
node.Up = true
node.SetUp(true)
log.Info("Started node", "id", id)
ev := NewEvent(node)
net.lock.Unlock()

net.events.Send(ev)

// subscribe to peer events
Expand Down Expand Up @@ -219,7 +215,7 @@ func (net *Network) watchPeerEvents(id enode.ID, events chan *p2p.PeerEvent, sub
if node == nil {
return
}
node.Up = false
node.SetUp(false)
ev := NewEvent(node)
net.events.Send(ev)
}()
Expand Down Expand Up @@ -258,29 +254,23 @@ func (net *Network) watchPeerEvents(id enode.ID, events chan *p2p.PeerEvent, sub
// Stop stops the node with the given ID
func (net *Network) Stop(id enode.ID) error {
net.lock.Lock()
defer net.lock.Unlock()
node := net.getNode(id)
if node == nil {
net.lock.Unlock()
return fmt.Errorf("node %v does not exist", id)
}
if !node.Up {
net.lock.Unlock()
if !node.Up() {
return fmt.Errorf("node %v already down", id)
}
node.Up = false
net.lock.Unlock()
node.SetUp(false)

err := node.Stop()
if err != nil {
net.lock.Lock()
node.Up = true
net.lock.Unlock()
node.SetUp(true)
return err
}
log.Info("Stopped node", "id", id, "err", err)
net.lock.Lock()
ev := ControlEvent(node)
net.lock.Unlock()
net.events.Send(ev)
return nil
}
Expand Down Expand Up @@ -430,7 +420,7 @@ func (net *Network) GetRandomUpNode(excludeIDs ...enode.ID) *Node {

func (net *Network) getUpNodeIDs() (ids []enode.ID) {
for _, node := range net.Nodes {
if node.Up {
if node.Up() {
ids = append(ids, node.ID())
}
}
Expand All @@ -446,7 +436,7 @@ func (net *Network) GetRandomDownNode(excludeIDs ...enode.ID) *Node {

func (net *Network) getDownNodeIDs() (ids []enode.ID) {
for _, node := range net.GetNodes() {
if !node.Up {
if !node.Up() {
ids = append(ids, node.ID())
}
}
Expand Down Expand Up @@ -595,8 +585,21 @@ type Node struct {
// Config if the config used to created the node
Config *adapters.NodeConfig `json:"config"`

// Up tracks whether or not the node is running
Up bool `json:"up"`
// up tracks whether or not the node is running
up bool
zelig marked this conversation as resolved.
Show resolved Hide resolved
upMu sync.RWMutex
}

func (n *Node) Up() bool {
n.upMu.RLock()
frncmx marked this conversation as resolved.
Show resolved Hide resolved
defer n.upMu.RUnlock()
return n.up
}

func (n *Node) SetUp(up bool) {
n.upMu.Lock()
defer n.upMu.Unlock()
n.up = up
}

// ID returns the ID of the node
Expand Down Expand Up @@ -630,10 +633,29 @@ func (n *Node) MarshalJSON() ([]byte, error) {
}{
Info: n.NodeInfo(),
Config: n.Config,
Up: n.Up,
Up: n.Up(),
})
}

// UnmarshalJSON implements json.Unmarshaler interface so that we don't lose
// Node.up status. IMPORTANT: The implementation is incomplete; we lose
acud marked this conversation as resolved.
Show resolved Hide resolved
// p2p.NodeInfo.
func (n *Node) UnmarshalJSON(raw []byte) error {
// TODO: How should we turn back NodeInfo into n.Node?
// Ticket: https://github.com/ethersphere/go-ethereum/issues/1177
node := struct {
Config *adapters.NodeConfig `json:"config,omitempty"`
Up bool `json:"up"`
}{}
if err := json.Unmarshal(raw, &node); err != nil {
return err
}

n.SetUp(node.Up)
n.Config = node.Config
return nil
}

// Conn represents a connection between two nodes in the network
type Conn struct {
// One is the node which initiated the connection
Expand All @@ -653,10 +675,10 @@ type Conn struct {

// nodesUp returns whether both nodes are currently up
func (c *Conn) nodesUp() error {
if !c.one.Up {
if !c.one.Up() {
return fmt.Errorf("one %v is not up", c.One)
}
if !c.other.Up {
if !c.other.Up() {
return fmt.Errorf("other %v is not up", c.Other)
}
return nil
Expand Down Expand Up @@ -728,7 +750,7 @@ func (net *Network) snapshot(addServices []string, removeServices []string) (*Sn
}
for i, node := range net.Nodes {
snap.Nodes[i] = NodeSnapshot{Node: *node}
if !node.Up {
if !node.Up() {
continue
}
snapshots, err := node.Snapshots()
Expand Down Expand Up @@ -783,7 +805,7 @@ func (net *Network) Load(snap *Snapshot) error {
if _, err := net.NewNodeWithConfig(n.Node.Config); err != nil {
return err
}
if !n.Node.Up {
if !n.Node.Up() {
continue
}
if err := net.startWithSnapshots(n.Node.Config.ID, n.Snapshots); err != nil {
Expand Down Expand Up @@ -855,7 +877,7 @@ func (net *Network) Load(snap *Snapshot) error {
// Start connecting.
for _, conn := range snap.Conns {

if !net.GetNode(conn.One).Up || !net.GetNode(conn.Other).Up {
if !net.GetNode(conn.One).Up() || !net.GetNode(conn.Other).Up() {
//in this case, at least one of the nodes of a connection is not up,
//so it would result in the snapshot `Load` to fail
continue
Expand Down Expand Up @@ -909,7 +931,7 @@ func (net *Network) executeControlEvent(event *Event) {
}

func (net *Network) executeNodeEvent(e *Event) error {
if !e.Node.Up {
if !e.Node.Up() {
return net.Stop(e.Node.ID())
}

Expand Down
Loading