Skip to content

Commit

Permalink
rate limiting and selectivity of autonat svc
Browse files Browse the repository at this point in the history
* limits addresses for a peer (at most 4 chosen) - fix #39
* clears addresses before dialing back - fix #38
* global rate limit of 30 responses per (1 - 1.25 min) - fix #36
* only dial back on the source IP - fix #32
  • Loading branch information
willscott committed Feb 13, 2020
1 parent 9ad01ee commit b952c16
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 9 deletions.
59 changes: 50 additions & 9 deletions p2p/host/autonat/svc.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package autonat

import (
"bytes"
"context"
"math/rand"
"net"
"sync"
"time"

Expand All @@ -24,8 +27,11 @@ const P_CIRCUIT = 290
var (
AutoNATServiceDialTimeout = 15 * time.Second
AutoNATServiceResetInterval = 1 * time.Minute
AutoNATServiceResetJitter = 15 * time.Second

AutoNATServiceThrottle = 3
AutoNATServiceThrottle = 3
AutoNATGlobalThrottle = 30
AutoNATMaxPeerAddresses = 4
)

// AutoNATService provides NAT autodetection services to other peers
Expand All @@ -34,8 +40,9 @@ type AutoNATService struct {
dialer host.Host

// rate limiter
mx sync.Mutex
reqs map[peer.ID]int
mx sync.Mutex
reqs map[peer.ID]int
globalReqs int
}

// NewAutoNATService creates a new AutoNATService instance attached to a host
Expand Down Expand Up @@ -96,6 +103,21 @@ func (as *AutoNATService) handleStream(s network.Stream) {
}
}

// Optimistically extract the net.IP host from a multiaddress.
func addrToIP(addr ma.Multiaddr) net.IP {
if v4, err := addr.ValueForProtocol(ma.P_IP4); err == nil {
if c, err := ma.NewComponent(ma.ProtocolWithCode(ma.P_IP4).Name, v4); err == nil {
return net.IP(c.RawValue())
}
}
if v6, err := addr.ValueForProtocol(ma.P_IP6); err == nil {
if c, err := ma.NewComponent(ma.ProtocolWithCode(ma.P_IP6).Name, v6); err == nil {
return net.IP(c.RawValue())
}
}
return nil
}

func (as *AutoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Message_PeerInfo) *pb.Message_DialResponse {
if mpi == nil {
return newDialResponseError(pb.Message_E_BAD_REQUEST, "missing peer info")
Expand All @@ -113,13 +135,15 @@ func (as *AutoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Me
}
}

addrs := make([]ma.Multiaddr, 0)
addrs := make([]ma.Multiaddr, 0, AutoNATMaxPeerAddresses)
seen := make(map[string]struct{})

// add observed addr to the list of addresses to dial
var obsHost net.IP
if !as.skipDial(obsaddr) {
addrs = append(addrs, obsaddr)
seen[obsaddr.String()] = struct{}{}
obsHost = addrToIP(obsaddr)
}

for _, maddr := range mpi.GetAddrs() {
Expand All @@ -129,10 +153,22 @@ func (as *AutoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Me
continue
}

if len(addrs) >= AutoNATMaxPeerAddresses {
continue
}

if as.skipDial(addr) {
continue
}

if err != nil {
log.Debugf("Unexpected public, non-IP multiaddr: %s", err)
continue
}
if !bytes.Equal(obsHost, addrToIP(addr)) {
continue
}

str := addr.String()
_, ok := seen[str]
if ok {
Expand Down Expand Up @@ -169,16 +205,19 @@ func (as *AutoNATService) doDial(pi peer.AddrInfo) *pb.Message_DialResponse {
// rate limit check
as.mx.Lock()
count := as.reqs[pi.ID]
if count >= AutoNATServiceThrottle {
if count >= AutoNATServiceThrottle || as.globalReqs >= AutoNATGlobalThrottle {
as.mx.Unlock()
return newDialResponseError(pb.Message_E_DIAL_REFUSED, "too many dials")
}
as.reqs[pi.ID] = count + 1
as.globalReqs++
as.mx.Unlock()

ctx, cancel := context.WithTimeout(as.ctx, AutoNATServiceDialTimeout)
defer cancel()

as.dialer.Peerstore().ClearAddrs(pi.ID)

err := as.dialer.Connect(ctx, pi)
if err != nil {
log.Debugf("error dialing %s: %s", pi.ID.Pretty(), err.Error())
Expand All @@ -200,16 +239,18 @@ func (as *AutoNATService) doDial(pi peer.AddrInfo) *pb.Message_DialResponse {
}

func (as *AutoNATService) resetRateLimiter() {
ticker := time.NewTicker(AutoNATServiceResetInterval)
defer ticker.Stop()
timer := time.NewTimer(AutoNATServiceResetInterval)
defer timer.Stop()

for {
select {
case <-ticker.C:
case <-timer.C:
as.mx.Lock()
as.reqs = make(map[peer.ID]int)
as.globalReqs = 0
as.mx.Unlock()

jitter := rand.Float32() * float32(AutoNATServiceResetJitter)
timer.Reset(AutoNATServiceResetInterval + time.Duration(int64(jitter)))
case <-as.ctx.Done():
return
}
Expand Down
30 changes: 30 additions & 0 deletions p2p/host/autonat/svc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"

autonat "github.com/libp2p/go-libp2p-autonat"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)

Expand Down Expand Up @@ -101,6 +102,8 @@ func TestAutoNATServiceDialRateLimiter(t *testing.T) {
AutoNATServiceThrottle = 1
save4 := manet.Private4
manet.Private4 = []*net.IPNet{}
save5 := AutoNATServiceResetJitter
AutoNATServiceResetJitter = 0 * time.Second

hs, _ := makeAutoNATService(ctx, t)
hc, ac := makeAutoNATClient(ctx, t)
Expand Down Expand Up @@ -131,4 +134,31 @@ func TestAutoNATServiceDialRateLimiter(t *testing.T) {
AutoNATServiceResetInterval = save2
AutoNATServiceThrottle = save3
manet.Private4 = save4
AutoNATServiceResetJitter = save5
}

func TestAddrToIP(t *testing.T) {
addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/0")
if !addrToIP(addr).Equal(net.IPv4(127, 0, 0, 1)) {
t.Fatal("addrToIP of ipv4 localhost incorrect!")
}

addr, _ = ma.NewMultiaddr("/ip4/192.168.0.1/tcp/6")
if !addrToIP(addr).Equal(net.IPv4(192, 168, 0, 1)) {
t.Fatal("addrToIP of ipv4 incorrect!")
}

addr, _ = ma.NewMultiaddr("/ip6/::ffff:127.0.0.1/tcp/111")
if !addrToIP(addr).Equal(net.ParseIP("::ffff:127.0.0.1")) {
t.Fatal("addrToIP of ipv6 incorrect!")
}
addr, _ = ma.NewMultiaddr("/ip6zone/eth0/ip6/fe80::1")
if !addrToIP(addr).Equal(net.ParseIP("fe80::1")) {
t.Fatal("addrToIP of ip6zone incorrect!")
}

addr, _ = ma.NewMultiaddr("/unix/a/b/c/d")
if addrToIP(addr) != nil {
t.Fatal("invalid addrToIP populates")
}
}

0 comments on commit b952c16

Please sign in to comment.