Skip to content

Commit

Permalink
call the connection gater when accepting connections and after crypto…
Browse files Browse the repository at this point in the history
… handshake (#55)
  • Loading branch information
aarshkshah1992 committed May 15, 2020
1 parent 59da2c1 commit bf33d1e
Show file tree
Hide file tree
Showing 5 changed files with 274 additions and 105 deletions.
60 changes: 60 additions & 0 deletions p2p/net/upgrader/gater_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package stream_test

import (
"sync"

"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/control"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

ma "github.com/multiformats/go-multiaddr"
)

type testGater struct {
sync.Mutex

blockAccept, blockSecured bool
}

var _ connmgr.ConnectionGater = (*testGater)(nil)

func (t *testGater) BlockAccept(block bool) {
t.Lock()
defer t.Unlock()

t.blockAccept = block
}

func (t *testGater) BlockSecured(block bool) {
t.Lock()
defer t.Unlock()

t.blockSecured = block
}

func (t *testGater) InterceptPeerDial(p peer.ID) (allow bool) {
panic("not implemented")
}

func (t *testGater) InterceptAddrDial(id peer.ID, multiaddr ma.Multiaddr) (allow bool) {
panic("not implemented")
}

func (t *testGater) InterceptAccept(multiaddrs network.ConnMultiaddrs) (allow bool) {
t.Lock()
defer t.Unlock()

return !t.blockAccept
}

func (t *testGater) InterceptSecured(direction network.Direction, id peer.ID, multiaddrs network.ConnMultiaddrs) (allow bool) {
t.Lock()
defer t.Unlock()

return !t.blockSecured
}

func (t *testGater) InterceptUpgraded(conn network.Conn) (allow bool, reason control.DisconnectReason) {
panic("not implemented")
}
11 changes: 11 additions & 0 deletions p2p/net/upgrader/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ func (l *listener) handleIncoming() {
return
}

// gate the connection if applicable
if l.upgrader.ConnGater != nil && !l.upgrader.ConnGater.InterceptAccept(maconn) {
log.Debugf("gater blocked incoming connection on local addr %s from %s",
maconn.LocalMultiaddr(), maconn.RemoteMultiaddr())

if err := maconn.Close(); err != nil {
log.Warnf("failed to incoming connection rejected by gater; err: %s", err)
}
continue
}

// The go routine below calls Release when the context is
// canceled so there's no need to wait on it here.
l.threshold.Wait()
Expand Down
135 changes: 46 additions & 89 deletions p2p/net/upgrader/listener_test.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
package stream_test

import (
"context"
"errors"
"io"
"net"
"sync"
"testing"
"time"

"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/sec/insecure"
"github.com/libp2p/go-libp2p-core/transport"

mplex "github.com/libp2p/go-libp2p-mplex"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"

Expand All @@ -23,94 +18,10 @@ import (
"github.com/stretchr/testify/require"
)

// negotiatingMuxer sets up a new mplex connection
// It makes sure that this happens at the same time for client and server.
type negotiatingMuxer struct{}

func (m *negotiatingMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
var err error
// run a fake muxer negotiation
if isServer {
_, err = c.Write([]byte("setup"))
} else {
_, err = c.Read(make([]byte, 5))
}
if err != nil {
return nil, err
}
return mplex.DefaultTransport.NewConn(c, isServer)
}

// blockingMuxer blocks the muxer negotiation until the contain chan is closed
type blockingMuxer struct {
unblock chan struct{}
}

var _ mux.Multiplexer = &blockingMuxer{}

func newBlockingMuxer() *blockingMuxer {
return &blockingMuxer{unblock: make(chan struct{})}
}

func (m *blockingMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
<-m.unblock
return (&negotiatingMuxer{}).NewConn(c, isServer)
}

func (m *blockingMuxer) Unblock() {
close(m.unblock)
}

// errorMuxer is a muxer that errors while setting up
type errorMuxer struct{}

var _ mux.Multiplexer = &errorMuxer{}

func (m *errorMuxer) NewConn(c net.Conn, isServer bool) (mux.MuxedConn, error) {
return nil, errors.New("mux error")
}

var (
defaultUpgrader = &st.Upgrader{
Secure: insecure.New(peer.ID(1)),
Muxer: &negotiatingMuxer{},
}
)

func init() {
transport.AcceptTimeout = 1 * time.Hour
}

func testConn(t *testing.T, clientConn, serverConn transport.CapableConn) {
t.Helper()
require := require.New(t)

cstr, err := clientConn.OpenStream()
require.NoError(err)

_, err = cstr.Write([]byte("foobar"))
require.NoError(err)

sstr, err := serverConn.AcceptStream()
require.NoError(err)

b := make([]byte, 6)
_, err = sstr.Read(b)
require.NoError(err)
require.Equal([]byte("foobar"), b)
}

func dial(t *testing.T, upgrader *st.Upgrader, raddr ma.Multiaddr, p peer.ID) (transport.CapableConn, error) {
t.Helper()

macon, err := manet.Dial(raddr)
if err != nil {
return nil, err
}

return upgrader.UpgradeOutbound(context.Background(), nil, macon, p)
}

func createListener(t *testing.T, upgrader *st.Upgrader) transport.Listener {
t.Helper()
require := require.New(t)
Expand Down Expand Up @@ -385,3 +296,49 @@ func TestAcceptQueueBacklogged(t *testing.T) {

require.Eventually(func() bool { return len(errCh) == st.AcceptQueueLength+1 }, 2*time.Second, 100*time.Millisecond)
}

func TestListenerConnectionGater(t *testing.T) {
require := require.New(t)

testGater := &testGater{}
upgrader := *defaultUpgrader
upgrader.ConnGater = testGater

ln := createListener(t, &upgrader)
defer ln.Close()

// no gating.
conn, err := dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(0))
require.NoError(err)
require.False(conn.IsClosed())
_ = conn.Close()

// rejecting after handshake.
testGater.BlockSecured(true)
testGater.BlockAccept(false)
conn, err = dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(0))
require.Error(err)
require.Nil(conn)

// rejecting on accept will trigger first.
testGater.BlockSecured(true)
testGater.BlockAccept(true)
conn, err = dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(0))
require.Error(err)
require.Nil(conn)

// rejecting only on acceptance.
testGater.BlockSecured(false)
testGater.BlockAccept(true)
conn, err = dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(0))
require.Error(err)
require.Nil(conn)

// back to normal
testGater.BlockSecured(false)
testGater.BlockAccept(false)
conn, err = dial(t, defaultUpgrader, ln.Multiaddr(), peer.ID(0))
require.NoError(err)
require.False(conn.IsClosed())
_ = conn.Close()
}
38 changes: 22 additions & 16 deletions p2p/net/upgrader/upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@ import (
"context"
"errors"
"fmt"
"github.com/libp2p/go-libp2p-core/network"
"net"

"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/peer"
ipnet "github.com/libp2p/go-libp2p-core/pnet"
"github.com/libp2p/go-libp2p-core/sec"
"github.com/libp2p/go-libp2p-core/transport"
"github.com/libp2p/go-libp2p-pnet"

filter "github.com/libp2p/go-maddr-filter"
manet "github.com/multiformats/go-multiaddr-net"
)

Expand All @@ -27,10 +28,10 @@ var AcceptQueueLength = 16
// Upgrader is a multistream upgrader that can upgrade an underlying connection
// to a full transport connection (secure and multiplexed).
type Upgrader struct {
PSK ipnet.PSK
Secure sec.SecureTransport
Muxer mux.Multiplexer
Filters *filter.Filters
PSK ipnet.PSK
Secure sec.SecureTransport
Muxer mux.Multiplexer
ConnGater connmgr.ConnectionGater
}

// UpgradeListener upgrades the passed multiaddr-net listener into a full libp2p-transport listener.
Expand All @@ -55,22 +56,16 @@ func (u *Upgrader) UpgradeOutbound(ctx context.Context, t transport.Transport, m
if p == "" {
return nil, ErrNilPeer
}
return u.upgrade(ctx, t, maconn, p)
return u.upgrade(ctx, t, maconn, p, network.DirOutbound)
}

// UpgradeInbound upgrades the given inbound multiaddr-net connection into a
// full libp2p-transport connection.
func (u *Upgrader) UpgradeInbound(ctx context.Context, t transport.Transport, maconn manet.Conn) (transport.CapableConn, error) {
return u.upgrade(ctx, t, maconn, "")
return u.upgrade(ctx, t, maconn, "", network.DirInbound)
}

func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn manet.Conn, p peer.ID) (transport.CapableConn, error) {
if u.Filters != nil && u.Filters.AddrBlocked(maconn.RemoteMultiaddr()) {
log.Debugf("blocked connection from %s", maconn.RemoteMultiaddr())
maconn.Close()
return nil, fmt.Errorf("blocked connection from %s", maconn.RemoteMultiaddr())
}

func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn manet.Conn, p peer.ID, dir network.Direction) (transport.CapableConn, error) {
var conn net.Conn = maconn
if u.PSK != nil {
pconn, err := pnet.NewProtectedConn(u.PSK, conn)
Expand All @@ -91,18 +86,29 @@ func (u *Upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
return nil, fmt.Errorf("failed to negotiate security protocol: %s", err)
}

// call the connection gater, if one is registered.
if u.ConnGater != nil && !u.ConnGater.InterceptSecured(dir, sconn.RemotePeer(), maconn) {
if err := maconn.Close(); err != nil {
log.Errorf("failed to close connection with peer %s and addr %s; err: %s",
p.Pretty(), maconn.RemoteMultiaddr(), err)
}
return nil, fmt.Errorf("gater rejected connection with peer %s and addr %s with direction %d",
sconn.RemotePeer().Pretty(), maconn.RemoteMultiaddr(), dir)
}

smconn, err := u.setupMuxer(ctx, sconn, p)
if err != nil {
sconn.Close()
return nil, fmt.Errorf("failed to negotiate stream multiplexer: %s", err)
}

return &transportConn{
tc := &transportConn{
MuxedConn: smconn,
ConnMultiaddrs: maconn,
ConnSecurity: sconn,
transport: t,
}, nil
}
return tc, nil
}

func (u *Upgrader) setupSecurity(ctx context.Context, conn net.Conn, p peer.ID) (sec.SecureConn, error) {
Expand Down
Loading

0 comments on commit bf33d1e

Please sign in to comment.