Skip to content

Commit

Permalink
rpc: Add admin_addTrustedPeer and admin_removeTrustedPeer. (ethereum#…
Browse files Browse the repository at this point in the history
  • Loading branch information
zsfelfoldi authored and wanwiset25 committed Oct 10, 2024
1 parent 00bbd97 commit 9799f86
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 10 deletions.
10 changes: 10 additions & 0 deletions internal/web3ext/web3ext.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,16 @@ web3._extend({
call: 'admin_removePeer',
params: 1
}),
new web3._extend.Method({
name: 'addTrustedPeer',
call: 'admin_addTrustedPeer',
params: 1
}),
new web3._extend.Method({
name: 'removeTrustedPeer',
call: 'admin_removeTrustedPeer',
params: 1
}),
new web3._extend.Method({
name: 'exportChain',
call: 'admin_exportChain',
Expand Down
33 changes: 32 additions & 1 deletion node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (api *PrivateAdminAPI) AddPeer(url string) (bool, error) {
return true, nil
}

// RemovePeer disconnects from a a remote node if the connection exists
// RemovePeer disconnects from a remote node if the connection exists
func (api *PrivateAdminAPI) RemovePeer(url string) (bool, error) {
// Make sure the server is running, fail otherwise
server := api.node.Server()
Expand All @@ -76,6 +76,37 @@ func (api *PrivateAdminAPI) RemovePeer(url string) (bool, error) {
return true, nil
}

// AddTrustedPeer allows a remote node to always connect, even if slots are full
func (api *PrivateAdminAPI) AddTrustedPeer(url string) (bool, error) {
// Make sure the server is running, fail otherwise
server := api.node.Server()
if server == nil {
return false, ErrNodeStopped
}
node, err := discover.ParseNode(url)
if err != nil {
return false, fmt.Errorf("invalid enode: %v", err)
}
server.AddTrustedPeer(node)
return true, nil
}

// RemoveTrustedPeer removes a remote node from the trusted peer set, but it
// does not disconnect it automatically.
func (api *PrivateAdminAPI) RemoveTrustedPeer(url string) (bool, error) {
// Make sure the server is running, fail otherwise
server := api.node.Server()
if server == nil {
return false, ErrNodeStopped
}
node, err := discover.ParseNode(url)
if err != nil {
return false, fmt.Errorf("invalid enode: %v", err)
}
server.RemoveTrustedPeer(node)
return true, nil
}

// PeerEvents creates an RPC subscription which receives peer events from the
// node's p2p.Server
func (api *PrivateAdminAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, error) {
Expand Down
2 changes: 1 addition & 1 deletion p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (p *Peer) String() string {

// Inbound returns true if the peer is an inbound connection
func (p *Peer) Inbound() bool {
return p.rw.flags&inboundConn != 0
return p.rw.is(inboundConn)
}

func newPeer(conn *conn, protocols []Protocol) *Peer {
Expand Down
64 changes: 58 additions & 6 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"net"
"sync"
"sync/atomic"
"time"

"github.com/XinFinOrg/XDPoSChain/common"
Expand Down Expand Up @@ -168,6 +169,8 @@ type Server struct {
quit chan struct{}
addstatic chan *discover.Node
removestatic chan *discover.Node
addtrusted chan *discover.Node
removetrusted chan *discover.Node
posthandshake chan *conn
addpeer chan *conn
delpeer chan peerDrop
Expand All @@ -184,7 +187,7 @@ type peerDrop struct {
requested bool // true if signaled by the peer
}

type connFlag int
type connFlag int32

const (
dynDialedConn connFlag = 1 << iota
Expand Down Expand Up @@ -249,7 +252,18 @@ func (f connFlag) String() string {
}

func (c *conn) is(f connFlag) bool {
return c.flags&f != 0
flags := connFlag(atomic.LoadInt32((*int32)(&c.flags)))
return flags&f != 0
}

func (c *conn) set(f connFlag, val bool) {
flags := connFlag(atomic.LoadInt32((*int32)(&c.flags)))
if val {
flags |= f
} else {
flags &= ^f
}
atomic.StoreInt32((*int32)(&c.flags), int32(flags))
}

// Peers returns all connected peers.
Expand Down Expand Up @@ -300,6 +314,23 @@ func (srv *Server) RemovePeer(node *discover.Node) {
}
}

// AddTrustedPeer adds the given node to a reserved whitelist which allows the
// node to always connect, even if the slot are full.
func (srv *Server) AddTrustedPeer(node *discover.Node) {
select {
case srv.addtrusted <- node:
case <-srv.quit:
}
}

// RemoveTrustedPeer removes the given node from the trusted peer set.
func (srv *Server) RemoveTrustedPeer(node *discover.Node) {
select {
case srv.removetrusted <- node:
case <-srv.quit:
}
}

// SubscribePeers subscribes the given channel to peer events
func (srv *Server) SubscribeEvents(ch chan *PeerEvent) event.Subscription {
return srv.peerFeed.Subscribe(ch)
Expand Down Expand Up @@ -410,6 +441,8 @@ func (srv *Server) Start() (err error) {
srv.posthandshake = make(chan *conn)
srv.addstatic = make(chan *discover.Node)
srv.removestatic = make(chan *discover.Node)
srv.addtrusted = make(chan *discover.Node)
srv.removetrusted = make(chan *discover.Node)
srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{})

Expand Down Expand Up @@ -546,8 +579,7 @@ func (srv *Server) run(dialstate dialer) {
queuedTasks []task // tasks that can't run yet
)
// Put trusted nodes into a map to speed up checks.
// Trusted peers are loaded on startup and cannot be
// modified while the server is running.
// Trusted peers are loaded on startup or added via AddTrustedPeer RPC.
for _, n := range srv.TrustedNodes {
trusted[n.ID] = true
}
Expand Down Expand Up @@ -599,12 +631,32 @@ running:
case n := <-srv.removestatic:
// This channel is used by RemovePeer to send a
// disconnect request to a peer and begin the
// stop keeping the node connected
srv.log.Debug("Removing static node", "node", n)
// stop keeping the node connected.
srv.log.Trace("Removing static node", "node", n)
dialstate.removeStatic(n)
if p, ok := peers[n.ID]; ok {
p.Disconnect(DiscRequested)
}
case n := <-srv.addtrusted:
// This channel is used by AddTrustedPeer to add an enode
// to the trusted node set.
srv.log.Trace("Adding trusted node", "node", n)
trusted[n.ID] = true
// Mark any already-connected peer as trusted
if p, ok := peers[n.ID]; ok {
p.rw.set(trustedConn, true)
}
case n := <-srv.removetrusted:
// This channel is used by RemoveTrustedPeer to remove an enode
// from the trusted node set.
srv.log.Trace("Removing trusted node", "node", n)
if _, ok := trusted[n.ID]; ok {
delete(trusted, n.ID)
}
// Unmark any already-connected peer as trusted
if p, ok := peers[n.ID]; ok {
p.rw.set(trustedConn, false)
}
case op := <-srv.peerOp:
// This channel is used by Peers and PeerCount.
op(peers)
Expand Down
110 changes: 108 additions & 2 deletions p2p/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ func TestServerDial(t *testing.T) {

// tell the server to connect
tcpAddr := listener.Addr().(*net.TCPAddr)
srv.AddPeer(&discover.Node{ID: remid, IP: tcpAddr.IP, TCP: uint16(tcpAddr.Port)})
node := &discover.Node{ID: remid, IP: tcpAddr.IP, TCP: uint16(tcpAddr.Port)}
srv.AddPeer(node)

select {
case conn := <-accepted:
Expand All @@ -169,6 +170,29 @@ func TestServerDial(t *testing.T) {
if !reflect.DeepEqual(peers, []*Peer{peer}) {
t.Errorf("Peers mismatch: got %v, want %v", peers, []*Peer{peer})
}

// Test AddTrustedPeer/RemoveTrustedPeer and changing Trusted flags
// Particularly for race conditions on changing the flag state.
if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
t.Errorf("peer is trusted prematurely: %v", peer)
}
done := make(chan bool)
go func() {
srv.AddTrustedPeer(node)
if peer := srv.Peers()[0]; !peer.Info().Network.Trusted {
t.Errorf("peer is not trusted after AddTrustedPeer: %v", peer)
}
srv.RemoveTrustedPeer(node)
if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer)
}
done <- true
}()
// Trigger potential race conditions
peer = srv.Peers()[0]
_ = peer.Inbound()
_ = peer.Info()
<-done
case <-time.After(1 * time.Second):
t.Error("server did not launch peer within one second")
}
Expand Down Expand Up @@ -365,7 +389,8 @@ func TestServerAtCap(t *testing.T) {
}
}
// Try inserting a non-trusted connection.
c := newconn(randomID())
anotherID := randomID()
c := newconn(anotherID)
if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers {
t.Error("wrong error for insert:", err)
}
Expand All @@ -378,6 +403,87 @@ func TestServerAtCap(t *testing.T) {
t.Error("Server did not set trusted flag")
}

// Remove from trusted set and try again
srv.RemoveTrustedPeer(&discover.Node{ID: trustedID})
c = newconn(trustedID)
if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers {
t.Error("wrong error for insert:", err)
}

// Add anotherID to trusted set and try again
srv.AddTrustedPeer(&discover.Node{ID: anotherID})
c = newconn(anotherID)
if err := srv.checkpoint(c, srv.posthandshake); err != nil {
t.Error("unexpected error for trusted conn @posthandshake:", err)
}
if !c.is(trustedConn) {
t.Error("Server did not set trusted flag")
}
}

func TestServerPeerLimits(t *testing.T) {
srvkey := newkey()

clientid := randomID()
clientnode := &discover.Node{ID: clientid}

var tp *setupTransport = &setupTransport{
id: clientid,
phs: &protoHandshake{
ID: clientid,
// Force "DiscUselessPeer" due to unmatching caps
// Caps: []Cap{discard.cap()},
},
}
var flags connFlag = dynDialedConn
var dialDest *discover.Node = &discover.Node{ID: clientid}

srv := &Server{
Config: Config{
PrivateKey: srvkey,
MaxPeers: 0,
NoDial: true,
Protocols: []Protocol{discard},
},
newTransport: func(fd net.Conn) transport { return tp },
log: log.New(),
}
if err := srv.Start(); err != nil {
t.Fatalf("couldn't start server: %v", err)
}
defer srv.Stop()

// Check that server is full (MaxPeers=0)
conn, _ := net.Pipe()
srv.SetupConn(conn, flags, dialDest)
if tp.closeErr != DiscTooManyPeers {
t.Errorf("unexpected close error: %q", tp.closeErr)
}
conn.Close()

srv.AddTrustedPeer(clientnode)

// Check that server allows a trusted peer despite being full.
conn, _ = net.Pipe()
srv.SetupConn(conn, flags, dialDest)
if tp.closeErr == DiscTooManyPeers {
t.Errorf("failed to bypass MaxPeers with trusted node: %q", tp.closeErr)
}

if tp.closeErr != DiscUselessPeer {
t.Errorf("unexpected close error: %q", tp.closeErr)
}
conn.Close()

srv.RemoveTrustedPeer(clientnode)

// Check that server is full again.
conn, _ = net.Pipe()
srv.SetupConn(conn, flags, dialDest)
if tp.closeErr != DiscTooManyPeers {
t.Errorf("unexpected close error: %q", tp.closeErr)
}
conn.Close()
}

func TestServerSetupConn(t *testing.T) {
Expand Down

0 comments on commit 9799f86

Please sign in to comment.