From 03968427e35e2f5fa29cf7ad22f2fecf241e6b1f Mon Sep 17 00:00:00 2001 From: HighGoal1991 Date: Mon, 18 Feb 2019 07:38:14 +0100 Subject: [PATCH] p2p, swarm: fix node up races by granular locking (#18976) * swarm/network: DRY out repeated giga comment I not necessarily agree with the way we wait for event propagation. But I truly disagree with having duplicated giga comments. * p2p/simulations: encapsulate Node.Up field so we avoid data races The Node.Up field was accessed concurrently without "proper" locking. There was a lock on Network and that was used sometimes to access the field. Other times the locking was missed and we had a data race. For example: https://github.com/ethereum/go-ethereum/pull/18464 The case above was solved, but there were still intermittent/hard to reproduce races. So let's solve the issue permanently. resolves: ethersphere/go-ethereum#1146 * p2p/simulations: fix unmarshal of simulations.Node Making Node.Up field private in 13292ee897e345045fbfab3bda23a77589a271c1 broke TestHTTPNetwork and TestHTTPSnapshot. Because the default UnmarshalJSON does not handle unexported fields. Important: The fix is partial and not proper to my taste. But I cut scope as I think the fix may require a change to the current serialization format. New ticket: https://github.com/ethersphere/go-ethereum/issues/1177 * p2p/simulations: Add a sanity test case for Node.Config UnmarshalJSON * p2p/simulations: revert back to defer Unlock() pattern for Network It's a good patten to call `defer Unlock()` right after `Lock()` so (new) error cases won't miss to unlock. Let's get back to that pattern. The patten was abandoned in 85a79b3ad3c5863f8612d25c246bcfad339f36b7, while fixing a data race. That data race does not exist anymore, since the Node.Up field got hidden behind its own lock. * p2p/simulations: consistent naming for test providers Node.UnmarshalJSON * p2p/simulations: remove JSON annotation from private fields of Node As unexported fields are not serialized. * p2p/simulations: fix deadlock in Network.GetRandomDownNode() Problem: GetRandomDownNode() locks -> getDownNodeIDs() -> GetNodes() tries to lock -> deadlock On Network type, unexported functions must assume that `net.lock` is already acquired and should not call exported functions which might try to lock again. * p2p/simulations: ensure method conformity for Network Connect* methods were moved to p2p/simulations.Network from swarm/network/simulation. However these new methods did not follow the pattern of Network methods, i.e., all exported method locks the whole Network either for read or write. * p2p/simulations: fix deadlock during network shutdown `TestDiscoveryPersistenceSimulationSimAdapter` often got into deadlock. The execution was stuck on two locks, i.e, `Kademlia.lock` and `p2p/simulations.Network.lock`. Usually the test got stuck once in each 20 executions with high confidence. `Kademlia` was stuck in `Kademlia.EachAddr()` and `Network` in `Network.Stop()`. Solution: in `Network.Stop()` `net.lock` must be released before calling `node.Stop()` as stopping a node (somehow - I did not find the exact code path) causes `Network.InitConn()` to be called from `Kademlia.SuggestPeer()` and that blocks on `net.lock`. Related ticket: https://github.com/ethersphere/go-ethereum/issues/1223 * swarm/state: simplify if statement in DBStore.Put() * p2p/simulations: remove faulty godoc from private function The comment started with the wrong method name. The method is simple and self explanatory. Also, it's private. => Let's just remove the comment. (cherry picked from commit 50b872bf05b8644f14b9bea340092ced6968dd59) --- p2p/simulations/connect.go | 43 ++++-- p2p/simulations/events.go | 2 +- p2p/simulations/http_test.go | 18 ++- p2p/simulations/mocker_test.go | 9 +- p2p/simulations/network.go | 155 ++++++++++++++------ p2p/simulations/network_test.go | 135 +++++++++++++++++ swarm/network/simulation/node.go | 4 +- swarm/network/simulation/node_test.go | 63 +++----- swarm/network/simulation/service.go | 2 +- swarm/network/simulation/simulation_test.go | 4 +- swarm/network/simulations/overlay_test.go | 2 +- swarm/state/dbstore.go | 9 +- 12 files changed, 323 insertions(+), 123 deletions(-) diff --git a/p2p/simulations/connect.go b/p2p/simulations/connect.go index bb7e7999a..ede96b34c 100644 --- a/p2p/simulations/connect.go +++ b/p2p/simulations/connect.go @@ -32,6 +32,9 @@ var ( // It is useful when constructing a chain network topology // when Network adds and removes nodes dynamically. func (net *Network) ConnectToLastNode(id enode.ID) (err error) { + net.lock.Lock() + defer net.lock.Unlock() + ids := net.getUpNodeIDs() l := len(ids) if l < 2 { @@ -41,29 +44,35 @@ func (net *Network) ConnectToLastNode(id enode.ID) (err error) { if last == id { last = ids[l-2] } - return net.connect(last, id) + return net.connectNotConnected(last, id) } // ConnectToRandomNode connects the node with provided NodeID // to a random node that is up. func (net *Network) ConnectToRandomNode(id enode.ID) (err error) { - selected := net.GetRandomUpNode(id) + net.lock.Lock() + defer net.lock.Unlock() + + selected := net.getRandomUpNode(id) if selected == nil { return ErrNodeNotFound } - return net.connect(selected.ID(), id) + return net.connectNotConnected(selected.ID(), id) } // ConnectNodesFull connects all nodes one to another. // It provides a complete connectivity in the network // which should be rarely needed. func (net *Network) ConnectNodesFull(ids []enode.ID) (err error) { + net.lock.Lock() + defer net.lock.Unlock() + if ids == nil { ids = net.getUpNodeIDs() } for i, lid := range ids { for _, rid := range ids[i+1:] { - if err = net.connect(lid, rid); err != nil { + if err = net.connectNotConnected(lid, rid); err != nil { return err } } @@ -74,12 +83,19 @@ func (net *Network) ConnectNodesFull(ids []enode.ID) (err error) { // ConnectNodesChain connects all nodes in a chain topology. // If ids argument is nil, all nodes that are up will be connected. func (net *Network) ConnectNodesChain(ids []enode.ID) (err error) { + net.lock.Lock() + defer net.lock.Unlock() + + return net.connectNodesChain(ids) +} + +func (net *Network) connectNodesChain(ids []enode.ID) (err error) { if ids == nil { ids = net.getUpNodeIDs() } l := len(ids) for i := 0; i < l-1; i++ { - if err := net.connect(ids[i], ids[i+1]); err != nil { + if err := net.connectNotConnected(ids[i], ids[i+1]); err != nil { return err } } @@ -89,6 +105,9 @@ func (net *Network) ConnectNodesChain(ids []enode.ID) (err error) { // ConnectNodesRing connects all nodes in a ring topology. // If ids argument is nil, all nodes that are up will be connected. func (net *Network) ConnectNodesRing(ids []enode.ID) (err error) { + net.lock.Lock() + defer net.lock.Unlock() + if ids == nil { ids = net.getUpNodeIDs() } @@ -96,15 +115,18 @@ func (net *Network) ConnectNodesRing(ids []enode.ID) (err error) { if l < 2 { return nil } - if err := net.ConnectNodesChain(ids); err != nil { + if err := net.connectNodesChain(ids); err != nil { return err } - return net.connect(ids[l-1], ids[0]) + return net.connectNotConnected(ids[l-1], ids[0]) } // ConnectNodesStar connects all nodes into a star topology // If ids argument is nil, all nodes that are up will be connected. func (net *Network) ConnectNodesStar(ids []enode.ID, center enode.ID) (err error) { + net.lock.Lock() + defer net.lock.Unlock() + if ids == nil { ids = net.getUpNodeIDs() } @@ -112,16 +134,15 @@ func (net *Network) ConnectNodesStar(ids []enode.ID, center enode.ID) (err error if center == id { continue } - if err := net.connect(center, id); err != nil { + if err := net.connectNotConnected(center, id); err != nil { return err } } return nil } -// connect connects two nodes but ignores already connected error. -func (net *Network) connect(oneID, otherID enode.ID) error { - return ignoreAlreadyConnectedErr(net.Connect(oneID, otherID)) +func (net *Network) connectNotConnected(oneID, otherID enode.ID) error { + return ignoreAlreadyConnectedErr(net.connect(oneID, otherID)) } func ignoreAlreadyConnectedErr(err error) error { diff --git a/p2p/simulations/events.go b/p2p/simulations/events.go index 9b2a990e0..984c2e088 100644 --- a/p2p/simulations/events.go +++ b/p2p/simulations/events.go @@ -100,7 +100,7 @@ func ControlEvent(v interface{}) *Event { func (e *Event) String() string { switch e.Type { case EventTypeNode: - return fmt.Sprintf(" id: %s up: %t", e.Node.ID().TerminalString(), e.Node.Up) + return fmt.Sprintf(" id: %s up: %t", e.Node.ID().TerminalString(), e.Node.Up()) case EventTypeConn: return fmt.Sprintf(" nodes: %s->%s up: %t", e.Conn.One.TerminalString(), e.Conn.Other.TerminalString(), e.Conn.Up) case EventTypeMsg: diff --git a/p2p/simulations/http_test.go b/p2p/simulations/http_test.go index c0a5acb3d..ed43c0ed7 100644 --- a/p2p/simulations/http_test.go +++ b/p2p/simulations/http_test.go @@ -421,14 +421,15 @@ type expectEvents struct { } func (t *expectEvents) nodeEvent(id string, up bool) *Event { + node := Node{ + Config: &adapters.NodeConfig{ + ID: enode.HexID(id), + }, + up: up, + } return &Event{ Type: EventTypeNode, - Node: &Node{ - Config: &adapters.NodeConfig{ - ID: enode.HexID(id), - }, - Up: up, - }, + Node: &node, } } @@ -480,6 +481,7 @@ loop: } func (t *expectEvents) expect(events ...*Event) { + t.Helper() timeout := time.After(10 * time.Second) i := 0 for { @@ -501,8 +503,8 @@ func (t *expectEvents) expect(events ...*Event) { if event.Node.ID() != expected.Node.ID() { t.Fatalf("expected node event %d to have id %q, got %q", i, expected.Node.ID().TerminalString(), event.Node.ID().TerminalString()) } - if event.Node.Up != expected.Node.Up { - t.Fatalf("expected node event %d to have up=%t, got up=%t", i, expected.Node.Up, event.Node.Up) + if event.Node.Up() != expected.Node.Up() { + t.Fatalf("expected node event %d to have up=%t, got up=%t", i, expected.Node.Up(), event.Node.Up()) } case EventTypeConn: diff --git a/p2p/simulations/mocker_test.go b/p2p/simulations/mocker_test.go index 192be1732..069040257 100644 --- a/p2p/simulations/mocker_test.go +++ b/p2p/simulations/mocker_test.go @@ -90,15 +90,12 @@ func TestMocker(t *testing.T) { for { select { case event := <-events: - //if the event is a node Up event only - if event.Node != nil && event.Node.Up { + if isNodeUp(event) { //add the correspondent node ID to the map nodemap[event.Node.Config.ID] = true //this means all nodes got a nodeUp event, so we can continue the test if len(nodemap) == nodeCount { nodesComplete = true - //wait for 3s as the mocker will need time to connect the nodes - //time.Sleep( 3 *time.Second) } } else if event.Conn != nil && nodesComplete { connCount += 1 @@ -169,3 +166,7 @@ func TestMocker(t *testing.T) { t.Fatalf("Expected empty list of nodes, got: %d", len(nodesInfo)) } } + +func isNodeUp(event *Event) bool { + return event.Node != nil && event.Node.Up() +} diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index a0e621b88..2049a5108 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -136,7 +136,7 @@ func (net *Network) Config() *NetworkConfig { // StartAll starts all nodes in the network func (net *Network) StartAll() error { for _, node := range net.Nodes { - if node.Up { + if node.Up() { continue } if err := net.Start(node.ID()); err != nil { @@ -149,7 +149,7 @@ func (net *Network) StartAll() error { // StopAll stops all nodes in the network func (net *Network) StopAll() error { for _, node := range net.Nodes { - if !node.Up { + if !node.Up() { continue } if err := net.Stop(node.ID()); err != nil { @@ -168,27 +168,23 @@ func (net *Network) Start(id enode.ID) error { // snapshots func (net *Network) startWithSnapshots(id enode.ID, snapshots map[string][]byte) error { net.lock.Lock() + defer net.lock.Unlock() node := net.getNode(id) if node == nil { - net.lock.Unlock() return fmt.Errorf("node %v does not exist", id) } - if node.Up { - net.lock.Unlock() + if node.Up() { return fmt.Errorf("node %v already up", id) } log.Trace("Starting node", "id", id, "adapter", net.nodeAdapter.Name()) if err := node.Start(snapshots); err != nil { - net.lock.Unlock() log.Warn("Node startup failed", "id", id, "err", err) return err } - node.Up = true + node.SetUp(true) log.Info("Started node", "id", id) ev := NewEvent(node) - net.lock.Unlock() - net.events.Send(ev) // subscribe to peer events @@ -219,7 +215,7 @@ func (net *Network) watchPeerEvents(id enode.ID, events chan *p2p.PeerEvent, sub if node == nil { return } - node.Up = false + node.SetUp(false) ev := NewEvent(node) net.events.Send(ev) }() @@ -257,30 +253,42 @@ func (net *Network) watchPeerEvents(id enode.ID, events chan *p2p.PeerEvent, sub // Stop stops the node with the given ID func (net *Network) Stop(id enode.ID) error { - net.lock.Lock() - node := net.getNode(id) - if node == nil { - net.lock.Unlock() - return fmt.Errorf("node %v does not exist", id) - } - if !node.Up { - net.lock.Unlock() - return fmt.Errorf("node %v already down", id) + // IMPORTANT: node.Stop() must NOT be called under net.lock as + // node.Reachable() closure has a reference to the network and + // calls net.InitConn() what also locks the network. => DEADLOCK + // That holds until the following ticket is not resolved: + + var err error + + node, err := func() (*Node, error) { + net.lock.Lock() + defer net.lock.Unlock() + + node := net.getNode(id) + if node == nil { + return nil, fmt.Errorf("node %v does not exist", id) + } + if !node.Up() { + return nil, fmt.Errorf("node %v already down", id) + } + node.SetUp(false) + return node, nil + }() + if err != nil { + return err } - node.Up = false - net.lock.Unlock() - err := node.Stop() + err = node.Stop() // must be called without net.lock + + net.lock.Lock() + defer net.lock.Unlock() + if err != nil { - net.lock.Lock() - node.Up = true - net.lock.Unlock() + node.SetUp(true) return err } log.Info("Stopped node", "id", id, "err", err) - net.lock.Lock() ev := ControlEvent(node) - net.lock.Unlock() net.events.Send(ev) return nil } @@ -288,8 +296,14 @@ func (net *Network) Stop(id enode.ID) error { // Connect connects two nodes together by calling the "admin_addPeer" RPC // method on the "one" node so that it connects to the "other" node func (net *Network) Connect(oneID, otherID enode.ID) error { + net.lock.Lock() + defer net.lock.Unlock() + return net.connect(oneID, otherID) +} + +func (net *Network) connect(oneID, otherID enode.ID) error { log.Debug("Connecting nodes with addPeer", "id", oneID, "other", otherID) - conn, err := net.InitConn(oneID, otherID) + conn, err := net.initConn(oneID, otherID) if err != nil { return err } @@ -387,6 +401,14 @@ func (net *Network) GetNode(id enode.ID) *Node { return net.getNode(id) } +func (net *Network) getNode(id enode.ID) *Node { + i, found := net.nodeMap[id] + if !found { + return nil + } + return net.Nodes[i] +} + // GetNode gets the node with the given name, returning nil if the node does // not exist func (net *Network) GetNodeByName(name string) *Node { @@ -409,28 +431,29 @@ func (net *Network) GetNodes() (nodes []*Node) { net.lock.RLock() defer net.lock.RUnlock() - nodes = append(nodes, net.Nodes...) - return nodes + return net.getNodes() } -func (net *Network) getNode(id enode.ID) *Node { - i, found := net.nodeMap[id] - if !found { - return nil - } - return net.Nodes[i] +func (net *Network) getNodes() (nodes []*Node) { + nodes = append(nodes, net.Nodes...) + return nodes } // GetRandomUpNode returns a random node on the network, which is running. func (net *Network) GetRandomUpNode(excludeIDs ...enode.ID) *Node { net.lock.RLock() defer net.lock.RUnlock() + return net.getRandomUpNode(excludeIDs...) +} + +// GetRandomUpNode returns a random node on the network, which is running. +func (net *Network) getRandomUpNode(excludeIDs ...enode.ID) *Node { return net.getRandomNode(net.getUpNodeIDs(), excludeIDs) } func (net *Network) getUpNodeIDs() (ids []enode.ID) { for _, node := range net.Nodes { - if node.Up { + if node.Up() { ids = append(ids, node.ID()) } } @@ -445,8 +468,8 @@ func (net *Network) GetRandomDownNode(excludeIDs ...enode.ID) *Node { } func (net *Network) getDownNodeIDs() (ids []enode.ID) { - for _, node := range net.GetNodes() { - if !node.Up { + for _, node := range net.getNodes() { + if !node.Up() { ids = append(ids, node.ID()) } } @@ -538,6 +561,10 @@ func (net *Network) getConn(oneID, otherID enode.ID) *Conn { func (net *Network) InitConn(oneID, otherID enode.ID) (*Conn, error) { net.lock.Lock() defer net.lock.Unlock() + return net.initConn(oneID, otherID) +} + +func (net *Network) initConn(oneID, otherID enode.ID) (*Conn, error) { if oneID == otherID { return nil, fmt.Errorf("refusing to connect to self %v", oneID) } @@ -595,8 +622,21 @@ type Node struct { // Config if the config used to created the node Config *adapters.NodeConfig `json:"config"` - // Up tracks whether or not the node is running - Up bool `json:"up"` + // up tracks whether or not the node is running + up bool + upMu sync.RWMutex +} + +func (n *Node) Up() bool { + n.upMu.RLock() + defer n.upMu.RUnlock() + return n.up +} + +func (n *Node) SetUp(up bool) { + n.upMu.Lock() + defer n.upMu.Unlock() + n.up = up } // ID returns the ID of the node @@ -630,10 +670,29 @@ func (n *Node) MarshalJSON() ([]byte, error) { }{ Info: n.NodeInfo(), Config: n.Config, - Up: n.Up, + Up: n.Up(), }) } +// UnmarshalJSON implements json.Unmarshaler interface so that we don't lose +// Node.up status. IMPORTANT: The implementation is incomplete; we lose +// p2p.NodeInfo. +func (n *Node) UnmarshalJSON(raw []byte) error { + // TODO: How should we turn back NodeInfo into n.Node? + // Ticket: https://github.com/ethersphere/go-ethereum/issues/1177 + node := struct { + Config *adapters.NodeConfig `json:"config,omitempty"` + Up bool `json:"up"` + }{} + if err := json.Unmarshal(raw, &node); err != nil { + return err + } + + n.SetUp(node.Up) + n.Config = node.Config + return nil +} + // Conn represents a connection between two nodes in the network type Conn struct { // One is the node which initiated the connection @@ -653,10 +712,10 @@ type Conn struct { // nodesUp returns whether both nodes are currently up func (c *Conn) nodesUp() error { - if !c.one.Up { + if !c.one.Up() { return fmt.Errorf("one %v is not up", c.One) } - if !c.other.Up { + if !c.other.Up() { return fmt.Errorf("other %v is not up", c.Other) } return nil @@ -728,7 +787,7 @@ func (net *Network) snapshot(addServices []string, removeServices []string) (*Sn } for i, node := range net.Nodes { snap.Nodes[i] = NodeSnapshot{Node: *node} - if !node.Up { + if !node.Up() { continue } snapshots, err := node.Snapshots() @@ -783,7 +842,7 @@ func (net *Network) Load(snap *Snapshot) error { if _, err := net.NewNodeWithConfig(n.Node.Config); err != nil { return err } - if !n.Node.Up { + if !n.Node.Up() { continue } if err := net.startWithSnapshots(n.Node.Config.ID, n.Snapshots); err != nil { @@ -855,7 +914,7 @@ func (net *Network) Load(snap *Snapshot) error { // Start connecting. for _, conn := range snap.Conns { - if !net.GetNode(conn.One).Up || !net.GetNode(conn.Other).Up { + if !net.GetNode(conn.One).Up() || !net.GetNode(conn.Other).Up() { //in this case, at least one of the nodes of a connection is not up, //so it would result in the snapshot `Load` to fail continue @@ -909,7 +968,7 @@ func (net *Network) executeControlEvent(event *Event) { } func (net *Network) executeNodeEvent(e *Event) error { - if !e.Node.Up { + if !e.Node.Up() { return net.Stop(e.Node.ID()) } diff --git a/p2p/simulations/network_test.go b/p2p/simulations/network_test.go index b7852addb..8b644ffb0 100644 --- a/p2p/simulations/network_test.go +++ b/p2p/simulations/network_test.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "fmt" + "reflect" "strconv" "strings" "testing" @@ -485,3 +486,137 @@ func benchmarkMinimalServiceTmp(b *testing.B) { } } } + +func TestNode_UnmarshalJSON(t *testing.T) { + t.Run( + "test unmarshal of Node up field", + func(t *testing.T) { + runNodeUnmarshalJSON(t, casesNodeUnmarshalJSONUpField()) + }, + ) + t.Run( + "test unmarshal of Node Config field", + func(t *testing.T) { + runNodeUnmarshalJSON(t, casesNodeUnmarshalJSONConfigField()) + }, + ) +} + +func runNodeUnmarshalJSON(t *testing.T, tests []nodeUnmarshalTestCase) { + t.Helper() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var got Node + if err := got.UnmarshalJSON([]byte(tt.marshaled)); err != nil { + expectErrorMessageToContain(t, err, tt.wantErr) + } + expectNodeEquality(t, got, tt.want) + }) + } +} + +type nodeUnmarshalTestCase struct { + name string + marshaled string + want Node + wantErr string +} + +func expectErrorMessageToContain(t *testing.T, got error, want string) { + t.Helper() + if got == nil && want == "" { + return + } + + if got == nil && want != "" { + t.Errorf("error was expected, got: nil, want: %v", want) + return + } + + if !strings.Contains(got.Error(), want) { + t.Errorf( + "unexpected error message, got %v, want: %v", + want, + got, + ) + } +} + +func expectNodeEquality(t *testing.T, got Node, want Node) { + t.Helper() + if !reflect.DeepEqual(got, want) { + t.Errorf("Node.UnmarshalJSON() = %v, want %v", got, want) + } +} + +func casesNodeUnmarshalJSONUpField() []nodeUnmarshalTestCase { + return []nodeUnmarshalTestCase{ + { + name: "empty json", + marshaled: "{}", + want: Node{ + up: false, + }, + }, + { + name: "a stopped node", + marshaled: "{\"up\": false}", + want: Node{ + up: false, + }, + }, + { + name: "a running node", + marshaled: "{\"up\": true}", + want: Node{ + up: true, + }, + }, + { + name: "invalid JSON value on valid key", + marshaled: "{\"up\": foo}", + wantErr: "invalid character", + }, + { + name: "invalid JSON key and value", + marshaled: "{foo: bar}", + wantErr: "invalid character", + }, + { + name: "bool value expected but got something else (string)", + marshaled: "{\"up\": \"true\"}", + wantErr: "cannot unmarshal string into Go struct", + }, + } +} + +func casesNodeUnmarshalJSONConfigField() []nodeUnmarshalTestCase { + // Don't do a big fuss around testing, as adapters.NodeConfig should + // handle it's own serialization. Just do a sanity check. + return []nodeUnmarshalTestCase{ + { + name: "Config field is omitted", + marshaled: "{}", + want: Node{ + Config: nil, + }, + }, + { + name: "Config field is nil", + marshaled: "{\"config\": nil}", + want: Node{ + Config: nil, + }, + }, + { + name: "a non default Config field", + marshaled: "{\"config\":{\"name\":\"node_ecdd0\",\"port\":44665}}", + want: Node{ + Config: &adapters.NodeConfig{ + Name: "node_ecdd0", + Port: 44665, + }, + }, + }, + } +} diff --git a/swarm/network/simulation/node.go b/swarm/network/simulation/node.go index 08eb83524..24afe51a4 100644 --- a/swarm/network/simulation/node.go +++ b/swarm/network/simulation/node.go @@ -44,7 +44,7 @@ func (s *Simulation) NodeIDs() (ids []enode.ID) { func (s *Simulation) UpNodeIDs() (ids []enode.ID) { nodes := s.Net.GetNodes() for _, node := range nodes { - if node.Up { + if node.Up() { ids = append(ids, node.ID()) } } @@ -55,7 +55,7 @@ func (s *Simulation) UpNodeIDs() (ids []enode.ID) { func (s *Simulation) DownNodeIDs() (ids []enode.ID) { nodes := s.Net.GetNodes() for _, node := range nodes { - if !node.Up { + if !node.Up() { ids = append(ids, node.ID()) } } diff --git a/swarm/network/simulation/node_test.go b/swarm/network/simulation/node_test.go index dc9189c91..bae5afb26 100644 --- a/swarm/network/simulation/node_test.go +++ b/swarm/network/simulation/node_test.go @@ -54,7 +54,7 @@ func TestUpDownNodeIDs(t *testing.T) { gotIDs = sim.UpNodeIDs() for _, id := range gotIDs { - if !sim.Net.GetNode(id).Up { + if !sim.Net.GetNode(id).Up() { t.Errorf("node %s should not be down", id) } } @@ -66,7 +66,7 @@ func TestUpDownNodeIDs(t *testing.T) { gotIDs = sim.DownNodeIDs() for _, id := range gotIDs { - if sim.Net.GetNode(id).Up { + if sim.Net.GetNode(id).Up() { t.Errorf("node %s should not be up", id) } } @@ -112,7 +112,7 @@ func TestAddNode(t *testing.T) { t.Fatal("node not found") } - if !n.Up { + if !n.Up() { t.Error("node not started") } } @@ -327,7 +327,7 @@ func TestStartStopNode(t *testing.T) { if n == nil { t.Fatal("node not found") } - if !n.Up { + if !n.Up() { t.Error("node not started") } @@ -335,26 +335,17 @@ func TestStartStopNode(t *testing.T) { if err != nil { t.Fatal(err) } - if n.Up { + if n.Up() { t.Error("node not stopped") } - // Sleep here to ensure that Network.watchPeerEvents defer function - // has set the `node.Up = false` before we start the node again. - // p2p/simulations/network.go:215 - // - // The same node is stopped and started again, and upon start - // watchPeerEvents is started in a goroutine. If the node is stopped - // and then very quickly started, that goroutine may be scheduled later - // then start and force `node.Up = false` in its defer function. - // This will make this test unreliable. - time.Sleep(time.Second) + waitForPeerEventPropagation() err = sim.StartNode(id) if err != nil { t.Fatal(err) } - if !n.Up { + if !n.Up() { t.Error("node not started") } } @@ -377,7 +368,7 @@ func TestStartStopRandomNode(t *testing.T) { if n == nil { t.Fatal("node not found") } - if n.Up { + if n.Up() { t.Error("node not stopped") } @@ -386,16 +377,7 @@ func TestStartStopRandomNode(t *testing.T) { t.Fatal(err) } - // Sleep here to ensure that Network.watchPeerEvents defer function - // has set the `node.Up = false` before we start the node again. - // p2p/simulations/network.go:215 - // - // The same node is stopped and started again, and upon start - // watchPeerEvents is started in a goroutine. If the node is stopped - // and then very quickly started, that goroutine may be scheduled later - // then start and force `node.Up = false` in its defer function. - // This will make this test unreliable. - time.Sleep(time.Second) + waitForPeerEventPropagation() idStarted, err := sim.StartRandomNode() if err != nil { @@ -426,21 +408,12 @@ func TestStartStopRandomNodes(t *testing.T) { if n == nil { t.Fatal("node not found") } - if n.Up { + if n.Up() { t.Error("node not stopped") } } - // Sleep here to ensure that Network.watchPeerEvents defer function - // has set the `node.Up = false` before we start the node again. - // p2p/simulations/network.go:215 - // - // The same node is stopped and started again, and upon start - // watchPeerEvents is started in a goroutine. If the node is stopped - // and then very quickly started, that goroutine may be scheduled later - // then start and force `node.Up = false` in its defer function. - // This will make this test unreliable. - time.Sleep(time.Second) + waitForPeerEventPropagation() ids, err = sim.StartRandomNodes(2) if err != nil { @@ -452,8 +425,20 @@ func TestStartStopRandomNodes(t *testing.T) { if n == nil { t.Fatal("node not found") } - if !n.Up { + if !n.Up() { t.Error("node not started") } } } + +func waitForPeerEventPropagation() { + // Sleep here to ensure that Network.watchPeerEvents defer function + // has set the `node.Up() = false` before we start the node again. + // + // The same node is stopped and started again, and upon start + // watchPeerEvents is started in a goroutine. If the node is stopped + // and then very quickly started, that goroutine may be scheduled later + // then start and force `node.Up() = false` in its defer function. + // This will make this test unreliable. + time.Sleep(1 * time.Second) +} diff --git a/swarm/network/simulation/service.go b/swarm/network/simulation/service.go index 7dd4dc6d8..0ac8149a9 100644 --- a/swarm/network/simulation/service.go +++ b/swarm/network/simulation/service.go @@ -52,7 +52,7 @@ func (s *Simulation) Services(name string) (services map[enode.ID]node.Service) nodes := s.Net.GetNodes() services = make(map[enode.ID]node.Service) for _, node := range nodes { - if !node.Up { + if !node.Up() { continue } simNode, ok := node.Node.(*adapters.SimNode) diff --git a/swarm/network/simulation/simulation_test.go b/swarm/network/simulation/simulation_test.go index f837f9382..1d0338f59 100644 --- a/swarm/network/simulation/simulation_test.go +++ b/swarm/network/simulation/simulation_test.go @@ -124,7 +124,7 @@ func TestClose(t *testing.T) { var upNodeCount int for _, n := range sim.Net.GetNodes() { - if n.Up { + if n.Up() { upNodeCount++ } } @@ -140,7 +140,7 @@ func TestClose(t *testing.T) { upNodeCount = 0 for _, n := range sim.Net.GetNodes() { - if n.Up { + if n.Up() { upNodeCount++ } } diff --git a/swarm/network/simulations/overlay_test.go b/swarm/network/simulations/overlay_test.go index 05d403173..41ed5ed26 100644 --- a/swarm/network/simulations/overlay_test.go +++ b/swarm/network/simulations/overlay_test.go @@ -179,7 +179,7 @@ func watchSimEvents(net *simulations.Network, ctx context.Context, trigger chan case ev := <-events: //only catch node up events if ev.Type == simulations.EventTypeNode { - if ev.Node.Up { + if ev.Node.Up() { log.Debug("got node up event", "event", ev, "node", ev.Node.Config.ID) select { case trigger <- ev.Node.Config.ID: diff --git a/swarm/state/dbstore.go b/swarm/state/dbstore.go index 147e34b23..1b541e785 100644 --- a/swarm/state/dbstore.go +++ b/swarm/state/dbstore.go @@ -88,18 +88,15 @@ func (s *DBStore) Get(key string, i interface{}) (err error) { // Put stores an object that implements Binary for a specific key. func (s *DBStore) Put(key string, i interface{}) (err error) { var bytes []byte - - marshaler, ok := i.(encoding.BinaryMarshaler) - if !ok { - if bytes, err = json.Marshal(i); err != nil { + if marshaler, ok := i.(encoding.BinaryMarshaler); ok { + if bytes, err = marshaler.MarshalBinary(); err != nil { return err } } else { - if bytes, err = marshaler.MarshalBinary(); err != nil { + if bytes, err = json.Marshal(i); err != nil { return err } } - return s.db.Put([]byte(key), bytes, nil) }