Skip to content

Commit

Permalink
Merge pull request ethereum#74 from ethersphere/network-testing-frame…
Browse files Browse the repository at this point in the history
…work-peer-events

p2p/simulations: Get peer events via RPC
  • Loading branch information
zelig authored Apr 29, 2017
2 parents 8419f11 + 751384d commit 5bc8776
Show file tree
Hide file tree
Showing 14 changed files with 378 additions and 137 deletions.
10 changes: 6 additions & 4 deletions p2p/adapters/docker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package adapters

import (
"crypto/ecdsa"
"fmt"
"io"
"io/ioutil"
Expand All @@ -22,7 +23,7 @@ type DockerNode struct {

// NewDockerNode creates a new DockerNode, building the docker image if
// necessary
func NewDockerNode(id *NodeId, service string) (*DockerNode, error) {
func NewDockerNode(id *NodeId, key *ecdsa.PrivateKey, service string) (*DockerNode, error) {
if runtime.GOOS != "linux" {
return nil, fmt.Errorf("NewDockerNode can only be used on Linux as it uses the current binary (which must be a Linux binary)")
}
Expand Down Expand Up @@ -51,6 +52,7 @@ func NewDockerNode(id *NodeId, service string) (*DockerNode, error) {
ID: id,
Service: service,
Config: &conf,
key: key,
},
}
node.newCmd = node.dockerCommand
Expand All @@ -60,13 +62,13 @@ func NewDockerNode(id *NodeId, service string) (*DockerNode, error) {
// dockerCommand returns a command which exec's the binary in a docker
// container.
//
// It uses a shell so that we can pass the _P2P_NODE_CONFIG environment
// variable to the container using the --env flag.
// It uses a shell so that we can pass the _P2P_NODE_CONFIG and _P2P_NODE_KEY
// environment variables to the container using the --env flag.
func (n *DockerNode) dockerCommand() *exec.Cmd {
return exec.Command(
"sh", "-c",
fmt.Sprintf(
`exec docker run --interactive --env _P2P_NODE_CONFIG="${_P2P_NODE_CONFIG}" %s p2p-node %s %s`,
`exec docker run --interactive --env _P2P_NODE_CONFIG="${_P2P_NODE_CONFIG}" --env _P2P_NODE_KEY="${_P2P_NODE_KEY}" %s p2p-node %s %s`,
dockerImage, n.Service, n.ID.String(),
),
)
Expand Down
110 changes: 102 additions & 8 deletions p2p/adapters/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package adapters

import (
"context"
"crypto/ecdsa"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
Expand All @@ -16,6 +18,7 @@ import (
"time"

"github.com/docker/docker/pkg/reexec"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
Expand Down Expand Up @@ -55,11 +58,12 @@ type ExecNode struct {

client *rpc.Client
newCmd func() *exec.Cmd
key *ecdsa.PrivateKey
}

// NewExecNode creates a new ExecNode which will run the given service using a
// sub-directory of the given baseDir
func NewExecNode(id *NodeId, service, baseDir string) (*ExecNode, error) {
func NewExecNode(id *NodeId, key *ecdsa.PrivateKey, service, baseDir string) (*ExecNode, error) {
if _, exists := serviceFuncs[service]; !exists {
return nil, fmt.Errorf("unknown node service %q", service)
}
Expand All @@ -82,6 +86,7 @@ func NewExecNode(id *NodeId, service, baseDir string) (*ExecNode, error) {
Service: service,
Dir: dir,
Config: &conf,
key: key,
}
node.newCmd = node.execCommand
return node, nil
Expand All @@ -99,9 +104,10 @@ func (n *ExecNode) Client() (*rpc.Client, error) {
return n.client, nil
}

// Start exec's the node passing the ID and service as command line arguments
// and the node config encoded as JSON in the _P2P_NODE_CONFIG environment
// variable
// Start exec's the node passing the ID and service as command line arguments,
// the node config encoded as JSON in the _P2P_NODE_CONFIG environment
// variable and the node's private key hex-endoded in the _P2P_NODE_KEY
// environment variable
func (n *ExecNode) Start() (err error) {
if n.Cmd != nil {
return errors.New("already started")
Expand All @@ -119,6 +125,9 @@ func (n *ExecNode) Start() (err error) {
return fmt.Errorf("error generating node config: %s", err)
}

// encode the private key
key := hex.EncodeToString(crypto.FromECDSA(n.key))

// create a net.Pipe for RPC communication over stdin / stdout
pipe1, pipe2 := net.Pipe()

Expand All @@ -127,7 +136,10 @@ func (n *ExecNode) Start() (err error) {
cmd.Stdin = pipe1
cmd.Stdout = pipe1
cmd.Stderr = os.Stderr
cmd.Env = append(os.Environ(), fmt.Sprintf("_P2P_NODE_CONFIG=%s", conf))
cmd.Env = append(os.Environ(),
fmt.Sprintf("_P2P_NODE_CONFIG=%s", conf),
fmt.Sprintf("_P2P_NODE_KEY=%s", key),
)
if err := cmd.Start(); err != nil {
return fmt.Errorf("error starting node: %s", err)
}
Expand Down Expand Up @@ -215,6 +227,17 @@ func execP2PNode() {
log.Crit("error decoding _P2P_NODE_CONFIG", "err", err)
}

// decode the private key
keyEnv := os.Getenv("_P2P_NODE_KEY")
if keyEnv == "" {
log.Crit("missing _P2P_NODE_KEY")
}
key, err := hex.DecodeString(keyEnv)
if err != nil {
log.Crit("error decoding _P2P_NODE_KEY", "err", err)
}
conf.P2P.PrivateKey = crypto.ToECDSA(key)

// initialize the service
serviceFunc, exists := serviceFuncs[serviceName]
if !exists {
Expand Down Expand Up @@ -268,10 +291,22 @@ func startP2PNode(conf *node.Config, service node.Service) (*node.Node, error) {
if err != nil {
return nil, err
}
constructor := func(ctx *node.ServiceContext) (node.Service, error) {
return service, nil

constructor := func(s node.Service) node.ServiceConstructor {
return func(ctx *node.ServiceContext) (node.Service, error) {
return s, nil
}
}

// register the peer events API
//
// TODO: move this to node.PrivateAdminAPI once the following is merged:
// https://github.com/ethereum/go-ethereum/pull/13885
if err := stack.Register(constructor(&PeerAPI{stack.Server})); err != nil {
return nil, err
}
if err := stack.Register(constructor); err != nil {

if err := stack.Register(constructor(service)); err != nil {
return nil, err
}
if err := stack.Start(); err != nil {
Expand All @@ -280,6 +315,65 @@ func startP2PNode(conf *node.Config, service node.Service) (*node.Node, error) {
return stack, nil
}

// PeerAPI is used to expose peer events under the "eth" RPC namespace.
//
// TODO: move this to node.PrivateAdminAPI and expose under the "admin"
// namespace once the following is merged:
// https://github.com/ethereum/go-ethereum/pull/13885
type PeerAPI struct {
server func() p2p.Server
}

func (p *PeerAPI) Protocols() []p2p.Protocol {
return nil
}

func (p *PeerAPI) APIs() []rpc.API {
return []rpc.API{{
Namespace: "eth",
Version: "1.0",
Service: p,
}}
}

func (p *PeerAPI) Start(p2p.Server) error {
return nil
}

func (p *PeerAPI) Stop() error {
return nil
}

// PeerEvents creates an RPC sunscription which receives peer events from the
// underlying p2p.Server
func (p *PeerAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}

rpcSub := notifier.CreateSubscription()

go func() {
events := make(chan *p2p.PeerEvent)
sub := p.server().SubscribePeers(events)
defer sub.Unsubscribe()

for {
select {
case event := <-events:
notifier.Notify(rpcSub.ID, event)
case <-rpcSub.Err():
return
case <-notifier.Closed():
return
}
}
}()

return rpcSub, nil
}

// stdioConn wraps os.Stdin / os.Stdout with a nop Close method so we can
// use them to handle RPC messages
type stdioConn struct {
Expand Down
45 changes: 29 additions & 16 deletions p2p/adapters/inproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"sync"

"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
Expand Down Expand Up @@ -53,13 +54,14 @@ type Network interface {

// SimNode is the network adapter that
type SimNode struct {
lock sync.RWMutex
Id *NodeId
network Network
service node.Service
peerMap map[discover.NodeID]int
peers []*Peer
client *rpc.Client
lock sync.RWMutex
Id *NodeId
network Network
service node.Service
peerMap map[discover.NodeID]int
peers []*Peer
peerFeed event.Feed
client *rpc.Client
}

func NewSimNode(id *NodeId, svc node.Service, n Network) *SimNode {
Expand Down Expand Up @@ -115,13 +117,20 @@ func (self *SimNode) startRPC() error {
return errors.New("RPC already started")
}

// add SimAdminAPI so that the network can call the AddPeer
// and RemovePeer RPC methods
apis := append(self.service.APIs(), rpc.API{
Namespace: "admin",
Version: "1.0",
Service: &SimAdminAPI{self},
})
// add SimAdminAPI and PeerAPI so that the network can call the
// AddPeer, RemovePeer and PeerEvents RPC methods
apis := append(self.service.APIs(), []rpc.API{
{
Namespace: "admin",
Version: "1.0",
Service: &SimAdminAPI{self},
},
{
Namespace: "eth",
Version: "1.0",
Service: &PeerAPI{func() p2p.Server { return self }},
},
}...)

// start the RPC handler
handler := rpc.NewServer()
Expand Down Expand Up @@ -219,6 +228,10 @@ func (self *SimNode) AddPeer(node *discover.Node) {
self.RunProtocol(na.(*SimNode), rw, rrw, peer)
}

func (self *SimNode) SubscribePeers(ch chan *p2p.PeerEvent) event.Subscription {
return self.peerFeed.Subscribe(ch)
}

func (self *SimNode) PeerCount() int {
self.lock.Lock()
defer self.lock.Unlock()
Expand All @@ -243,13 +256,13 @@ func (self *SimNode) RunProtocol(node *SimNode, rw, rrw p2p.MsgReadWriter, peer
log.Trace(fmt.Sprintf("protocol starting on peer %v (connection with %v)", self.Id, id))
p := p2p.NewPeer(id.NodeID, id.Label(), []p2p.Cap{})
go func() {
self.network.DidConnect(self.Id, id)
self.peerFeed.Send(&p2p.PeerEvent{Type: p2p.PeerEventTypeAdd, Peer: id.NodeID})
err := protocol.Run(p, rw)
<-peer.Readyc
self.RemovePeer(node.Node())
peer.Errc <- err
log.Trace(fmt.Sprintf("protocol quit on peer %v (connection with %v broken: %v)", self.Id, id, err))
self.network.DidDisconnect(self.Id, id)
self.peerFeed.Send(&p2p.PeerEvent{Type: p2p.PeerEventTypeDrop, Peer: id.NodeID})
}()
}

Expand Down
20 changes: 0 additions & 20 deletions p2p/adapters/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package adapters

import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/rpc"
Expand Down Expand Up @@ -79,22 +78,3 @@ type Reporter interface {
DidConnect(*NodeId, *NodeId) error
DidDisconnect(*NodeId, *NodeId) error
}

func RandomNodeId() *NodeId {
key, err := crypto.GenerateKey()
if err != nil {
panic("unable to generate key")
}
var id discover.NodeID
pubkey := crypto.FromECDSAPub(&key.PublicKey)
copy(id[:], pubkey[1:])
return &NodeId{id}
}

func RandomNodeIds(n int) []*NodeId {
var ids []*NodeId
for i := 0; i < n; i++ {
ids = append(ids, RandomNodeId())
}
return ids
}
20 changes: 20 additions & 0 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,26 @@ type protoHandshake struct {
Rest []rlp.RawValue `rlp:"tail"`
}

// PeerEventType is the type of peer events emitted by a p2p.Server
type PeerEventType string

const (
// PeerEventTypeAdd is the type of event emitted when a peer is added
// to a p2p.Server
PeerEventTypeAdd PeerEventType = "add"

// PeerEventTypeDrop is the type of event emitted when a peer is
// dropped from a p2p.Server
PeerEventTypeDrop PeerEventType = "drop"
)

// PeerEvent is an event emitted when peers are either added or dropped from
// a p2p.Server
type PeerEvent struct {
Type PeerEventType
Peer discover.NodeID
}

// Peer represents a connected remote node.
type Peer struct {
rw *conn
Expand Down
5 changes: 3 additions & 2 deletions p2p/protocols/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/adapters"
"github.com/ethereum/go-ethereum/p2p/simulations"
p2ptest "github.com/ethereum/go-ethereum/p2p/testing"
)

Expand Down Expand Up @@ -117,8 +118,8 @@ func newProtocol(pp *p2ptest.TestPeerPool) adapters.ProtoCall {
}

func protocolTester(t *testing.T, pp *p2ptest.TestPeerPool) *p2ptest.ProtocolTester {
id := adapters.RandomNodeId()
return p2ptest.NewProtocolTester(t, id, 2, newProtocol(pp))
conf := simulations.RandomNodeConfig()
return p2ptest.NewProtocolTester(t, conf.Id, 2, newProtocol(pp))
}

func protoHandshakeExchange(id *adapters.NodeId, proto *protoHandshake) []p2ptest.Exchange {
Expand Down
Loading

0 comments on commit 5bc8776

Please sign in to comment.