diff --git a/config/config.go b/config/config.go index 47459b8b27..80157b307b 100644 --- a/config/config.go +++ b/config/config.go @@ -17,6 +17,7 @@ import ( "github.com/libp2p/go-libp2p-core/transport" "github.com/libp2p/go-libp2p-peerstore/pstoremem" + "github.com/libp2p/go-libp2p/p2p/host/autonat" "github.com/libp2p/go-libp2p/p2p/host/autorelay" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" routed "github.com/libp2p/go-libp2p/p2p/host/routed" @@ -24,7 +25,6 @@ import ( relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" "github.com/libp2p/go-libp2p/p2p/protocol/holepunch" - autonat "github.com/libp2p/go-libp2p-autonat" blankhost "github.com/libp2p/go-libp2p-blankhost" discovery "github.com/libp2p/go-libp2p-discovery" swarm "github.com/libp2p/go-libp2p-swarm" diff --git a/examples/go.sum b/examples/go.sum index 8e5faba875..82a1f75a80 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -433,8 +433,6 @@ github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS github.com/libp2p/go-libp2p-asn-util v0.0.0-20200825225859-85005c6cf052/go.mod h1:nRMRTab+kZuk0LnKZpxhOVH/ndsdr2Nr//Zltc/vwgo= github.com/libp2p/go-libp2p-asn-util v0.1.0 h1:rABPCO77SjdbJ/eJ/ynIo8vWICy1VEnL5JAxJbQLo1E= github.com/libp2p/go-libp2p-asn-util v0.1.0/go.mod h1:wu+AnM9Ii2KgO5jMmS1rz9dvzTdj8BXqsPR9HR0XB7I= -github.com/libp2p/go-libp2p-autonat v0.7.0 h1:rCP5s+A2dlhM1Xd66wurE0k7S7pPmM0D+FlqqSBXxks= -github.com/libp2p/go-libp2p-autonat v0.7.0/go.mod h1:uPvPn6J7cN+LCfFwW5tpOYvAz5NvPTc4iBamTV/WDMg= github.com/libp2p/go-libp2p-blankhost v0.2.0/go.mod h1:eduNKXGTioTuQAUcZ5epXi9vMl+t4d8ugUBRQ4SqaNQ= github.com/libp2p/go-libp2p-blankhost v0.3.0 h1:kTnLArltMabZlzY63pgGDA4kkUcLkBFSM98zBssn/IY= github.com/libp2p/go-libp2p-blankhost v0.3.0/go.mod h1:urPC+7U01nCGgJ3ZsV8jdwTp6Ji9ID0dMTvq+aJ+nZU= diff --git a/examples/ipfs-camp-2019/go.sum b/examples/ipfs-camp-2019/go.sum index 6a3123d2ac..d20014cda3 100644 --- a/examples/ipfs-camp-2019/go.sum +++ b/examples/ipfs-camp-2019/go.sum @@ -435,8 +435,6 @@ github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS github.com/libp2p/go-libp2p-asn-util v0.0.0-20200825225859-85005c6cf052/go.mod h1:nRMRTab+kZuk0LnKZpxhOVH/ndsdr2Nr//Zltc/vwgo= github.com/libp2p/go-libp2p-asn-util v0.1.0 h1:rABPCO77SjdbJ/eJ/ynIo8vWICy1VEnL5JAxJbQLo1E= github.com/libp2p/go-libp2p-asn-util v0.1.0/go.mod h1:wu+AnM9Ii2KgO5jMmS1rz9dvzTdj8BXqsPR9HR0XB7I= -github.com/libp2p/go-libp2p-autonat v0.7.0 h1:rCP5s+A2dlhM1Xd66wurE0k7S7pPmM0D+FlqqSBXxks= -github.com/libp2p/go-libp2p-autonat v0.7.0/go.mod h1:uPvPn6J7cN+LCfFwW5tpOYvAz5NvPTc4iBamTV/WDMg= github.com/libp2p/go-libp2p-blankhost v0.2.0/go.mod h1:eduNKXGTioTuQAUcZ5epXi9vMl+t4d8ugUBRQ4SqaNQ= github.com/libp2p/go-libp2p-blankhost v0.3.0 h1:kTnLArltMabZlzY63pgGDA4kkUcLkBFSM98zBssn/IY= github.com/libp2p/go-libp2p-blankhost v0.3.0/go.mod h1:urPC+7U01nCGgJ3ZsV8jdwTp6Ji9ID0dMTvq+aJ+nZU= diff --git a/examples/pubsub/chat/go.sum b/examples/pubsub/chat/go.sum index 360c46adcd..e0e2964be6 100644 --- a/examples/pubsub/chat/go.sum +++ b/examples/pubsub/chat/go.sum @@ -412,8 +412,6 @@ github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs= github.com/libp2p/go-libp2p-asn-util v0.1.0 h1:rABPCO77SjdbJ/eJ/ynIo8vWICy1VEnL5JAxJbQLo1E= github.com/libp2p/go-libp2p-asn-util v0.1.0/go.mod h1:wu+AnM9Ii2KgO5jMmS1rz9dvzTdj8BXqsPR9HR0XB7I= -github.com/libp2p/go-libp2p-autonat v0.7.0 h1:rCP5s+A2dlhM1Xd66wurE0k7S7pPmM0D+FlqqSBXxks= -github.com/libp2p/go-libp2p-autonat v0.7.0/go.mod h1:uPvPn6J7cN+LCfFwW5tpOYvAz5NvPTc4iBamTV/WDMg= github.com/libp2p/go-libp2p-blankhost v0.2.0/go.mod h1:eduNKXGTioTuQAUcZ5epXi9vMl+t4d8ugUBRQ4SqaNQ= github.com/libp2p/go-libp2p-blankhost v0.3.0 h1:kTnLArltMabZlzY63pgGDA4kkUcLkBFSM98zBssn/IY= github.com/libp2p/go-libp2p-blankhost v0.3.0/go.mod h1:urPC+7U01nCGgJ3ZsV8jdwTp6Ji9ID0dMTvq+aJ+nZU= diff --git a/go.mod b/go.mod index 0720c2fe38..8774c14383 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,6 @@ require ( github.com/libp2p/go-conn-security-multistream v0.3.0 github.com/libp2p/go-eventbus v0.2.1 github.com/libp2p/go-libp2p-asn-util v0.1.0 - github.com/libp2p/go-libp2p-autonat v0.7.0 github.com/libp2p/go-libp2p-blankhost v0.3.0 github.com/libp2p/go-libp2p-circuit v0.4.0 github.com/libp2p/go-libp2p-core v0.13.0 diff --git a/go.sum b/go.sum index ffe215fdd1..3cea6c6c54 100644 --- a/go.sum +++ b/go.sum @@ -406,8 +406,6 @@ github.com/libp2p/go-flow-metrics v0.0.3 h1:8tAs/hSdNvUiLgtlSy3mxwxWP4I9y/jlkPFT github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs= github.com/libp2p/go-libp2p-asn-util v0.1.0 h1:rABPCO77SjdbJ/eJ/ynIo8vWICy1VEnL5JAxJbQLo1E= github.com/libp2p/go-libp2p-asn-util v0.1.0/go.mod h1:wu+AnM9Ii2KgO5jMmS1rz9dvzTdj8BXqsPR9HR0XB7I= -github.com/libp2p/go-libp2p-autonat v0.7.0 h1:rCP5s+A2dlhM1Xd66wurE0k7S7pPmM0D+FlqqSBXxks= -github.com/libp2p/go-libp2p-autonat v0.7.0/go.mod h1:uPvPn6J7cN+LCfFwW5tpOYvAz5NvPTc4iBamTV/WDMg= github.com/libp2p/go-libp2p-blankhost v0.2.0/go.mod h1:eduNKXGTioTuQAUcZ5epXi9vMl+t4d8ugUBRQ4SqaNQ= github.com/libp2p/go-libp2p-blankhost v0.3.0 h1:kTnLArltMabZlzY63pgGDA4kkUcLkBFSM98zBssn/IY= github.com/libp2p/go-libp2p-blankhost v0.3.0/go.mod h1:urPC+7U01nCGgJ3ZsV8jdwTp6Ji9ID0dMTvq+aJ+nZU= diff --git a/p2p/host/autonat/autonat.go b/p2p/host/autonat/autonat.go new file mode 100644 index 0000000000..3439e49b58 --- /dev/null +++ b/p2p/host/autonat/autonat.go @@ -0,0 +1,468 @@ +package autonat + +import ( + "context" + "errors" + "math/rand" + "sync/atomic" + "time" + + "github.com/libp2p/go-eventbus" + "github.com/libp2p/go-libp2p-core/event" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + + logging "github.com/ipfs/go-log/v2" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +var log = logging.Logger("autonat") + +// AmbientAutoNAT is the implementation of ambient NAT autodiscovery +type AmbientAutoNAT struct { + host host.Host + + *config + + ctx context.Context + ctxCancel context.CancelFunc // is closed when Close is called + backgroundRunning chan struct{} // is closed when the background go routine exits + + inboundConn chan network.Conn + observations chan autoNATResult + // status is an autoNATResult reflecting current status. + status atomic.Value + // Reflects the confidence on of the NATStatus being private, as a single + // dialback may fail for reasons unrelated to NAT. + // If it is <3, then multiple autoNAT peers may be contacted for dialback + // If only a single autoNAT peer is known, then the confidence increases + // for each failure until it reaches 3. + confidence int + lastInbound time.Time + lastProbeTry time.Time + lastProbe time.Time + recentProbes map[peer.ID]time.Time + + service *autoNATService + + emitReachabilityChanged event.Emitter + subscriber event.Subscription +} + +// StaticAutoNAT is a simple AutoNAT implementation when a single NAT status is desired. +type StaticAutoNAT struct { + host host.Host + reachability network.Reachability + service *autoNATService +} + +type autoNATResult struct { + network.Reachability + address ma.Multiaddr +} + +// New creates a new NAT autodiscovery system attached to a host +func New(h host.Host, options ...Option) (AutoNAT, error) { + var err error + conf := new(config) + conf.host = h + conf.dialPolicy.host = h + + if err = defaults(conf); err != nil { + return nil, err + } + if conf.addressFunc == nil { + conf.addressFunc = h.Addrs + } + + for _, o := range options { + if err = o(conf); err != nil { + return nil, err + } + } + emitReachabilityChanged, _ := h.EventBus().Emitter(new(event.EvtLocalReachabilityChanged), eventbus.Stateful) + + var service *autoNATService + if (!conf.forceReachability || conf.reachability == network.ReachabilityPublic) && conf.dialer != nil { + service, err = newAutoNATService(conf) + if err != nil { + return nil, err + } + service.Enable() + } + + if conf.forceReachability { + emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: conf.reachability}) + + return &StaticAutoNAT{ + host: h, + reachability: conf.reachability, + service: service, + }, nil + } + + ctx, cancel := context.WithCancel(context.Background()) + as := &AmbientAutoNAT{ + ctx: ctx, + ctxCancel: cancel, + backgroundRunning: make(chan struct{}), + host: h, + config: conf, + inboundConn: make(chan network.Conn, 5), + observations: make(chan autoNATResult, 1), + + emitReachabilityChanged: emitReachabilityChanged, + service: service, + recentProbes: make(map[peer.ID]time.Time), + } + as.status.Store(autoNATResult{network.ReachabilityUnknown, nil}) + + subscriber, err := as.host.EventBus().Subscribe([]interface{}{new(event.EvtLocalAddressesUpdated), new(event.EvtPeerIdentificationCompleted)}) + if err != nil { + return nil, err + } + as.subscriber = subscriber + + h.Network().Notify(as) + go as.background() + + return as, nil +} + +// Status returns the AutoNAT observed reachability status. +func (as *AmbientAutoNAT) Status() network.Reachability { + s := as.status.Load().(autoNATResult) + return s.Reachability +} + +func (as *AmbientAutoNAT) emitStatus() { + status := as.status.Load().(autoNATResult) + as.emitReachabilityChanged.Emit(event.EvtLocalReachabilityChanged{Reachability: status.Reachability}) +} + +// PublicAddr returns the publicly connectable Multiaddr of this node if one is known. +func (as *AmbientAutoNAT) PublicAddr() (ma.Multiaddr, error) { + s := as.status.Load().(autoNATResult) + if s.Reachability != network.ReachabilityPublic { + return nil, errors.New("NAT status is not public") + } + + return s.address, nil +} + +func ipInList(candidate ma.Multiaddr, list []ma.Multiaddr) bool { + candidateIP, _ := manet.ToIP(candidate) + for _, i := range list { + if ip, err := manet.ToIP(i); err == nil && ip.Equal(candidateIP) { + return true + } + } + return false +} + +func (as *AmbientAutoNAT) background() { + defer close(as.backgroundRunning) + // wait a bit for the node to come online and establish some connections + // before starting autodetection + delay := as.config.bootDelay + + var lastAddrUpdated time.Time + subChan := as.subscriber.Out() + defer as.subscriber.Close() + defer as.emitReachabilityChanged.Close() + + timer := time.NewTimer(delay) + defer timer.Stop() + timerRunning := true + for { + select { + // new inbound connection. + case conn := <-as.inboundConn: + localAddrs := as.host.Addrs() + ca := as.status.Load().(autoNATResult) + if ca.address != nil { + localAddrs = append(localAddrs, ca.address) + } + if manet.IsPublicAddr(conn.RemoteMultiaddr()) && + !ipInList(conn.RemoteMultiaddr(), localAddrs) { + as.lastInbound = time.Now() + } + + case e := <-subChan: + switch e := e.(type) { + case event.EvtLocalAddressesUpdated: + if !lastAddrUpdated.Add(time.Second).After(time.Now()) { + lastAddrUpdated = time.Now() + if as.confidence > 1 { + as.confidence-- + } + } + case event.EvtPeerIdentificationCompleted: + if s, err := as.host.Peerstore().SupportsProtocols(e.Peer, AutoNATProto); err == nil && len(s) > 0 { + currentStatus := as.status.Load().(autoNATResult) + if currentStatus.Reachability == network.ReachabilityUnknown { + as.tryProbe(e.Peer) + } + } + default: + log.Errorf("unknown event type: %T", e) + } + + // probe finished. + case result, ok := <-as.observations: + if !ok { + return + } + as.recordObservation(result) + case <-timer.C: + peer := as.getPeerToProbe() + as.tryProbe(peer) + timerRunning = false + case <-as.ctx.Done(): + return + } + + // Drain the timer channel if it hasn't fired in preparation for Resetting it. + if timerRunning && !timer.Stop() { + <-timer.C + } + timer.Reset(as.scheduleProbe()) + timerRunning = true + } +} + +func (as *AmbientAutoNAT) cleanupRecentProbes() { + fixedNow := time.Now() + for k, v := range as.recentProbes { + if fixedNow.Sub(v) > as.throttlePeerPeriod { + delete(as.recentProbes, k) + } + } +} + +// scheduleProbe calculates when the next probe should be scheduled for. +func (as *AmbientAutoNAT) scheduleProbe() time.Duration { + // Our baseline is a probe every 'AutoNATRefreshInterval' + // This is modulated by: + // * if we are in an unknown state, or have low confidence, that should drop to 'AutoNATRetryInterval' + // * recent inbound connections (implying continued connectivity) should decrease the retry when public + // * recent inbound connections when not public mean we should try more actively to see if we're public. + fixedNow := time.Now() + currentStatus := as.status.Load().(autoNATResult) + + nextProbe := fixedNow + // Don't look for peers in the peer store more than once per second. + if !as.lastProbeTry.IsZero() { + backoff := as.lastProbeTry.Add(time.Second) + if backoff.After(nextProbe) { + nextProbe = backoff + } + } + if !as.lastProbe.IsZero() { + untilNext := as.config.refreshInterval + if currentStatus.Reachability == network.ReachabilityUnknown { + untilNext = as.config.retryInterval + } else if as.confidence < 3 { + untilNext = as.config.retryInterval + } else if currentStatus.Reachability == network.ReachabilityPublic && as.lastInbound.After(as.lastProbe) { + untilNext *= 2 + } else if currentStatus.Reachability != network.ReachabilityPublic && as.lastInbound.After(as.lastProbe) { + untilNext /= 5 + } + + if as.lastProbe.Add(untilNext).After(nextProbe) { + nextProbe = as.lastProbe.Add(untilNext) + } + } + + return nextProbe.Sub(fixedNow) +} + +// Update the current status based on an observed result. +func (as *AmbientAutoNAT) recordObservation(observation autoNATResult) { + currentStatus := as.status.Load().(autoNATResult) + if observation.Reachability == network.ReachabilityPublic { + log.Debugf("NAT status is public") + changed := false + if currentStatus.Reachability != network.ReachabilityPublic { + // we are flipping our NATStatus, so confidence drops to 0 + as.confidence = 0 + if as.service != nil { + as.service.Enable() + } + changed = true + } else if as.confidence < 3 { + as.confidence++ + } + if observation.address != nil { + if !changed && currentStatus.address != nil && !observation.address.Equal(currentStatus.address) { + as.confidence-- + } + if currentStatus.address == nil || !observation.address.Equal(currentStatus.address) { + changed = true + } + as.status.Store(observation) + } + if observation.address != nil && changed { + as.emitStatus() + } + } else if observation.Reachability == network.ReachabilityPrivate { + log.Debugf("NAT status is private") + if currentStatus.Reachability == network.ReachabilityPublic { + if as.confidence > 0 { + as.confidence-- + } else { + // we are flipping our NATStatus, so confidence drops to 0 + as.confidence = 0 + as.status.Store(observation) + if as.service != nil { + as.service.Disable() + } + as.emitStatus() + } + } else if as.confidence < 3 { + as.confidence++ + as.status.Store(observation) + if currentStatus.Reachability != network.ReachabilityPrivate { + as.emitStatus() + } + } + } else if as.confidence > 0 { + // don't just flip to unknown, reduce confidence first + as.confidence-- + } else { + log.Debugf("NAT status is unknown") + as.status.Store(autoNATResult{network.ReachabilityUnknown, nil}) + if currentStatus.Reachability != network.ReachabilityUnknown { + if as.service != nil { + as.service.Enable() + } + as.emitStatus() + } + } +} + +func (as *AmbientAutoNAT) tryProbe(p peer.ID) bool { + as.lastProbeTry = time.Now() + if p.Validate() != nil { + return false + } + + if lastTime, ok := as.recentProbes[p]; ok { + if time.Since(lastTime) < as.throttlePeerPeriod { + return false + } + } + as.cleanupRecentProbes() + + info := as.host.Peerstore().PeerInfo(p) + + if !as.config.dialPolicy.skipPeer(info.Addrs) { + as.recentProbes[p] = time.Now() + as.lastProbe = time.Now() + go as.probe(&info) + return true + } + return false +} + +func (as *AmbientAutoNAT) probe(pi *peer.AddrInfo) { + cli := NewAutoNATClient(as.host, as.config.addressFunc) + ctx, cancel := context.WithTimeout(as.ctx, as.config.requestTimeout) + defer cancel() + + a, err := cli.DialBack(ctx, pi.ID) + + var result autoNATResult + switch { + case err == nil: + log.Debugf("Dialback through %s successful; public address is %s", pi.ID.Pretty(), a.String()) + result.Reachability = network.ReachabilityPublic + result.address = a + case IsDialError(err): + log.Debugf("Dialback through %s failed", pi.ID.Pretty()) + result.Reachability = network.ReachabilityPrivate + default: + result.Reachability = network.ReachabilityUnknown + } + + select { + case as.observations <- result: + case <-as.ctx.Done(): + return + } +} + +func (as *AmbientAutoNAT) getPeerToProbe() peer.ID { + peers := as.host.Network().Peers() + if len(peers) == 0 { + return "" + } + + candidates := make([]peer.ID, 0, len(peers)) + + for _, p := range peers { + info := as.host.Peerstore().PeerInfo(p) + // Exclude peers which don't support the autonat protocol. + if proto, err := as.host.Peerstore().SupportsProtocols(p, AutoNATProto); len(proto) == 0 || err != nil { + continue + } + + // Exclude peers in backoff. + if lastTime, ok := as.recentProbes[p]; ok { + if time.Since(lastTime) < as.throttlePeerPeriod { + continue + } + } + + if as.config.dialPolicy.skipPeer(info.Addrs) { + continue + } + candidates = append(candidates, p) + } + + if len(candidates) == 0 { + return "" + } + + shufflePeers(candidates) + return candidates[0] +} + +func (as *AmbientAutoNAT) Close() error { + as.ctxCancel() + if as.service != nil { + as.service.Disable() + } + <-as.backgroundRunning + return nil +} + +func shufflePeers(peers []peer.ID) { + for i := range peers { + j := rand.Intn(i + 1) + peers[i], peers[j] = peers[j], peers[i] + } +} + +// Status returns the AutoNAT observed reachability status. +func (s *StaticAutoNAT) Status() network.Reachability { + return s.reachability +} + +// PublicAddr returns the publicly connectable Multiaddr of this node if one is known. +func (s *StaticAutoNAT) PublicAddr() (ma.Multiaddr, error) { + if s.reachability != network.ReachabilityPublic { + return nil, errors.New("NAT status is not public") + } + return nil, errors.New("no available address") +} + +func (s *StaticAutoNAT) Close() error { + if s.service != nil { + s.service.Disable() + } + return nil +} diff --git a/p2p/host/autonat/autonat_test.go b/p2p/host/autonat/autonat_test.go new file mode 100644 index 0000000000..bbf0a8c7cb --- /dev/null +++ b/p2p/host/autonat/autonat_test.go @@ -0,0 +1,297 @@ +package autonat + +import ( + "context" + "testing" + "time" + + pb "github.com/libp2p/go-libp2p/p2p/host/autonat/pb" + + "github.com/libp2p/go-libp2p-core/event" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + + bhost "github.com/libp2p/go-libp2p-blankhost" + swarmt "github.com/libp2p/go-libp2p-swarm/testing" + "github.com/libp2p/go-msgio/protoio" + ma "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" +) + +// these are mock service implementations for testing +func makeAutoNATServicePrivate(t *testing.T) host.Host { + h := bhost.NewBlankHost(swarmt.GenSwarm(t)) + h.SetStreamHandler(AutoNATProto, sayPrivateStreamHandler(t)) + return h +} + +func sayPrivateStreamHandler(t *testing.T) network.StreamHandler { + return func(s network.Stream) { + defer s.Close() + r := protoio.NewDelimitedReader(s, network.MessageSizeMax) + if err := r.ReadMsg(&pb.Message{}); err != nil { + t.Error(err) + return + } + w := protoio.NewDelimitedWriter(s) + res := pb.Message{ + Type: pb.Message_DIAL_RESPONSE.Enum(), + DialResponse: newDialResponseError(pb.Message_E_DIAL_ERROR, "no dialable addresses"), + } + w.WriteMsg(&res) + } +} + +func makeAutoNATServicePublic(t *testing.T) host.Host { + h := bhost.NewBlankHost(swarmt.GenSwarm(t)) + h.SetStreamHandler(AutoNATProto, func(s network.Stream) { + defer s.Close() + r := protoio.NewDelimitedReader(s, network.MessageSizeMax) + if err := r.ReadMsg(&pb.Message{}); err != nil { + t.Error(err) + return + } + w := protoio.NewDelimitedWriter(s) + res := pb.Message{ + Type: pb.Message_DIAL_RESPONSE.Enum(), + DialResponse: newDialResponseOK(s.Conn().RemoteMultiaddr()), + } + w.WriteMsg(&res) + }) + return h +} + +func makeAutoNAT(t *testing.T, ash host.Host) (host.Host, AutoNAT) { + h := bhost.NewBlankHost(swarmt.GenSwarm(t)) + h.Peerstore().AddAddrs(ash.ID(), ash.Addrs(), time.Minute) + h.Peerstore().AddProtocols(ash.ID(), AutoNATProto) + a, _ := New(h, WithSchedule(100*time.Millisecond, time.Second), WithoutStartupDelay()) + a.(*AmbientAutoNAT).config.dialPolicy.allowSelfDials = true + a.(*AmbientAutoNAT).config.throttlePeerPeriod = 100 * time.Millisecond + return h, a +} + +func identifyAsServer(server, recip host.Host) { + recip.Peerstore().AddAddrs(server.ID(), server.Addrs(), time.Minute) + recip.Peerstore().AddProtocols(server.ID(), AutoNATProto) + +} + +func connect(t *testing.T, a, b host.Host) { + pinfo := peer.AddrInfo{ID: a.ID(), Addrs: a.Addrs()} + err := b.Connect(context.Background(), pinfo) + if err != nil { + t.Fatal(err) + } +} + +func expectEvent(t *testing.T, s event.Subscription, expected network.Reachability) { + select { + case e := <-s.Out(): + ev, ok := e.(event.EvtLocalReachabilityChanged) + if !ok || ev.Reachability != expected { + t.Fatal("got wrong event type from the bus") + } + + case <-time.After(100 * time.Millisecond): + t.Fatal("failed to get the reachability event from the bus") + } +} + +// tests +func TestAutoNATPrivate(t *testing.T) { + hs := makeAutoNATServicePrivate(t) + defer hs.Close() + hc, an := makeAutoNAT(t, hs) + defer hc.Close() + defer an.Close() + + // subscribe to AutoNat events + s, err := hc.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{}) + if err != nil { + t.Fatalf("failed to subscribe to event EvtLocalReachabilityChanged, err=%s", err) + } + + status := an.Status() + if status != network.ReachabilityUnknown { + t.Fatalf("unexpected NAT status: %d", status) + } + + connect(t, hs, hc) + require.Eventually(t, + func() bool { return an.Status() == network.ReachabilityPrivate }, + 2*time.Second, + 25*time.Millisecond, + "expected NAT status to be private", + ) + expectEvent(t, s, network.ReachabilityPrivate) +} + +func TestAutoNATPublic(t *testing.T) { + hs := makeAutoNATServicePublic(t) + defer hs.Close() + hc, an := makeAutoNAT(t, hs) + defer hc.Close() + defer an.Close() + + // subscribe to AutoNat events + s, err := hc.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{}) + if err != nil { + t.Fatalf("failed to subscribe to event EvtLocalReachabilityChanged, err=%s", err) + } + + status := an.Status() + if status != network.ReachabilityUnknown { + t.Fatalf("unexpected NAT status: %d", status) + } + + connect(t, hs, hc) + require.Eventually(t, + func() bool { return an.Status() == network.ReachabilityPublic }, + 2*time.Second, + 25*time.Millisecond, + "expected NAT status to be public", + ) + + expectEvent(t, s, network.ReachabilityPublic) +} + +func TestAutoNATPublictoPrivate(t *testing.T) { + hs := makeAutoNATServicePublic(t) + defer hs.Close() + hc, an := makeAutoNAT(t, hs) + defer hc.Close() + defer an.Close() + + // subscribe to AutoNat events + s, err := hc.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{}) + if err != nil { + t.Fatalf("failed to subscribe to event EvtLocalRoutabilityPublic, err=%s", err) + } + + if status := an.Status(); status != network.ReachabilityUnknown { + t.Fatalf("unexpected NAT status: %d", status) + } + + connect(t, hs, hc) + require.Eventually(t, + func() bool { return an.Status() == network.ReachabilityPublic }, + 2*time.Second, + 25*time.Millisecond, + "expected NAT status to be public", + ) + expectEvent(t, s, network.ReachabilityPublic) + + hs.SetStreamHandler(AutoNATProto, sayPrivateStreamHandler(t)) + hps := makeAutoNATServicePrivate(t) + connect(t, hps, hc) + identifyAsServer(hps, hc) + + require.Eventually(t, + func() bool { return an.Status() == network.ReachabilityPrivate }, + 2*time.Second, + 25*time.Millisecond, + "expected NAT status to be private", + ) + expectEvent(t, s, network.ReachabilityPrivate) +} + +func TestAutoNATIncomingEvents(t *testing.T) { + hs := makeAutoNATServicePrivate(t) + defer hs.Close() + hc, ani := makeAutoNAT(t, hs) + defer hc.Close() + defer ani.Close() + an := ani.(*AmbientAutoNAT) + + status := an.Status() + if status != network.ReachabilityUnknown { + t.Fatalf("unexpected NAT status: %d", status) + } + + connect(t, hs, hc) + + em, _ := hc.EventBus().Emitter(&event.EvtPeerIdentificationCompleted{}) + em.Emit(event.EvtPeerIdentificationCompleted{Peer: hs.ID()}) + + require.Eventually(t, func() bool { + return an.Status() != network.ReachabilityUnknown + }, 500*time.Millisecond, 10*time.Millisecond, "Expected probe due to identification of autonat service") +} + +func TestAutoNATObservationRecording(t *testing.T) { + hs := makeAutoNATServicePublic(t) + defer hs.Close() + hc, ani := makeAutoNAT(t, hs) + defer hc.Close() + defer ani.Close() + an := ani.(*AmbientAutoNAT) + + s, err := hc.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{}) + if err != nil { + t.Fatalf("failed to subscribe to event EvtLocalRoutabilityPublic, err=%s", err) + } + + // pubic observation without address should be ignored. + an.recordObservation(autoNATResult{network.ReachabilityPublic, nil}) + if an.Status() != network.ReachabilityUnknown { + t.Fatalf("unexpected transition") + } + + select { + case <-s.Out(): + t.Fatal("not expecting a public reachability event") + default: + // expected + } + + addr, _ := ma.NewMultiaddr("/ip4/127.0.0.1/udp/1234") + an.recordObservation(autoNATResult{network.ReachabilityPublic, addr}) + if an.Status() != network.ReachabilityPublic { + t.Fatalf("failed to transition to public.") + } + + expectEvent(t, s, network.ReachabilityPublic) + + // a single recording should have confidence still at 0, and transition to private quickly. + an.recordObservation(autoNATResult{network.ReachabilityPrivate, nil}) + if an.Status() != network.ReachabilityPrivate { + t.Fatalf("failed to transition to private.") + } + + expectEvent(t, s, network.ReachabilityPrivate) + + // stronger public confidence should be harder to undo. + an.recordObservation(autoNATResult{network.ReachabilityPublic, addr}) + an.recordObservation(autoNATResult{network.ReachabilityPublic, addr}) + if an.Status() != network.ReachabilityPublic { + t.Fatalf("failed to transition to public.") + } + + expectEvent(t, s, network.ReachabilityPublic) + + an.recordObservation(autoNATResult{network.ReachabilityPrivate, nil}) + if an.Status() != network.ReachabilityPublic { + t.Fatalf("too-extreme private transition.") + } + +} + +func TestStaticNat(t *testing.T) { + _, cancel := context.WithCancel(context.Background()) + defer cancel() + + h := bhost.NewBlankHost(swarmt.GenSwarm(t)) + defer h.Close() + s, _ := h.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{}) + + nat, err := New(h, WithReachability(network.ReachabilityPrivate)) + if err != nil { + t.Fatal(err) + } + if nat.Status() != network.ReachabilityPrivate { + t.Fatalf("should be private") + } + expectEvent(t, s, network.ReachabilityPrivate) +} diff --git a/p2p/host/autonat/client.go b/p2p/host/autonat/client.go new file mode 100644 index 0000000000..47936d9018 --- /dev/null +++ b/p2p/host/autonat/client.go @@ -0,0 +1,103 @@ +package autonat + +import ( + "context" + "fmt" + "time" + + pb "github.com/libp2p/go-libp2p/p2p/host/autonat/pb" + + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-msgio/protoio" + + ma "github.com/multiformats/go-multiaddr" +) + +// NewAutoNATClient creates a fresh instance of an AutoNATClient +// If addrFunc is nil, h.Addrs will be used +func NewAutoNATClient(h host.Host, addrFunc AddrFunc) Client { + if addrFunc == nil { + addrFunc = h.Addrs + } + return &client{h: h, addrFunc: addrFunc} +} + +type client struct { + h host.Host + addrFunc AddrFunc +} + +// DialBack asks peer p to dial us back on all addresses returned by the addrFunc. +// It blocks until we've received a response from the peer. +func (c *client) DialBack(ctx context.Context, p peer.ID) (ma.Multiaddr, error) { + s, err := c.h.NewStream(ctx, p, AutoNATProto) + if err != nil { + return nil, err + } + s.SetDeadline(time.Now().Add(streamTimeout)) + // Might as well just reset the stream. Once we get to this point, we + // don't care about being nice. + defer s.Close() + + r := protoio.NewDelimitedReader(s, network.MessageSizeMax) + w := protoio.NewDelimitedWriter(s) + + req := newDialMessage(peer.AddrInfo{ID: c.h.ID(), Addrs: c.addrFunc()}) + if err := w.WriteMsg(req); err != nil { + s.Reset() + return nil, err + } + + var res pb.Message + if err := r.ReadMsg(&res); err != nil { + s.Reset() + return nil, err + } + if res.GetType() != pb.Message_DIAL_RESPONSE { + s.Reset() + return nil, fmt.Errorf("unexpected response: %s", res.GetType().String()) + } + + status := res.GetDialResponse().GetStatus() + switch status { + case pb.Message_OK: + addr := res.GetDialResponse().GetAddr() + return ma.NewMultiaddrBytes(addr) + default: + return nil, Error{Status: status, Text: res.GetDialResponse().GetStatusText()} + } +} + +// Error wraps errors signalled by AutoNAT services +type Error struct { + Status pb.Message_ResponseStatus + Text string +} + +func (e Error) Error() string { + return fmt.Sprintf("AutoNAT error: %s (%s)", e.Text, e.Status.String()) +} + +// IsDialError returns true if the error was due to a dial back failure +func (e Error) IsDialError() bool { + return e.Status == pb.Message_E_DIAL_ERROR +} + +// IsDialRefused returns true if the error was due to a refusal to dial back +func (e Error) IsDialRefused() bool { + return e.Status == pb.Message_E_DIAL_REFUSED +} + +// IsDialError returns true if the AutoNAT peer signalled an error dialing back +func IsDialError(e error) bool { + ae, ok := e.(Error) + return ok && ae.IsDialError() +} + +// IsDialRefused returns true if the AutoNAT peer signalled refusal to dial back +func IsDialRefused(e error) bool { + ae, ok := e.(Error) + return ok && ae.IsDialRefused() +} diff --git a/p2p/host/autonat/dialpolicy.go b/p2p/host/autonat/dialpolicy.go new file mode 100644 index 0000000000..653882cbad --- /dev/null +++ b/p2p/host/autonat/dialpolicy.go @@ -0,0 +1,94 @@ +package autonat + +import ( + "net" + + "github.com/libp2p/go-libp2p-core/host" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +type dialPolicy struct { + allowSelfDials bool + host host.Host +} + +// skipDial indicates that a multiaddress isn't worth attempted dialing. +// The same logic is used when the autonat client is considering if +// a remote peer is worth using as a server, and when the server is +// considering if a requested client is worth dialing back. +func (d *dialPolicy) skipDial(addr ma.Multiaddr) bool { + // skip relay addresses + _, err := addr.ValueForProtocol(ma.P_CIRCUIT) + if err == nil { + return true + } + + if d.allowSelfDials { + return false + } + + // skip private network (unroutable) addresses + if !manet.IsPublicAddr(addr) { + return true + } + candidateIP, err := manet.ToIP(addr) + if err != nil { + return true + } + + // Skip dialing addresses we believe are the local node's + for _, localAddr := range d.host.Addrs() { + localIP, err := manet.ToIP(localAddr) + if err != nil { + continue + } + if localIP.Equal(candidateIP) { + return true + } + } + + return false +} + +// skipPeer indicates that the collection of multiaddresses representing a peer +// isn't worth attempted dialing. If one of the addresses matches an address +// we believe is ours, we exclude the peer, even if there are other valid +// public addresses in the list. +func (d *dialPolicy) skipPeer(addrs []ma.Multiaddr) bool { + localAddrs := d.host.Addrs() + localHosts := make([]net.IP, 0) + for _, lAddr := range localAddrs { + if _, err := lAddr.ValueForProtocol(ma.P_CIRCUIT); err != nil && manet.IsPublicAddr(lAddr) { + lIP, err := manet.ToIP(lAddr) + if err != nil { + continue + } + localHosts = append(localHosts, lIP) + } + } + + // if a public IP of the peer is one of ours: skip the peer. + goodPublic := false + for _, addr := range addrs { + if _, err := addr.ValueForProtocol(ma.P_CIRCUIT); err != nil && manet.IsPublicAddr(addr) { + aIP, err := manet.ToIP(addr) + if err != nil { + continue + } + + for _, lIP := range localHosts { + if lIP.Equal(aIP) { + return true + } + } + goodPublic = true + } + } + + if d.allowSelfDials { + return false + } + + return !goodPublic +} diff --git a/p2p/host/autonat/dialpolicy_test.go b/p2p/host/autonat/dialpolicy_test.go new file mode 100644 index 0000000000..92436405ff --- /dev/null +++ b/p2p/host/autonat/dialpolicy_test.go @@ -0,0 +1,135 @@ +package autonat + +import ( + "context" + "errors" + "net" + "testing" + + blankhost "github.com/libp2p/go-libp2p-blankhost" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/transport" + swarmt "github.com/libp2p/go-libp2p-swarm/testing" + "github.com/multiformats/go-multiaddr" +) + +func makeMA(a string) multiaddr.Multiaddr { + addr, err := multiaddr.NewMultiaddr(a) + if err != nil { + panic(err) + } + return addr +} + +type mockT struct { + ctx context.Context + addr multiaddr.Multiaddr +} + +func (m *mockT) Dial(ctx context.Context, a multiaddr.Multiaddr, p peer.ID) (transport.CapableConn, error) { + return nil, nil +} +func (m *mockT) CanDial(_ multiaddr.Multiaddr) bool { return true } +func (m *mockT) Listen(a multiaddr.Multiaddr) (transport.Listener, error) { + return &mockL{m.ctx, m.addr}, nil +} +func (m *mockT) Protocols() []int { return []int{multiaddr.P_IP4} } +func (m *mockT) Proxy() bool { return false } +func (m *mockT) String() string { return "mock-tcp-ipv4" } + +type mockL struct { + ctx context.Context + addr multiaddr.Multiaddr +} + +func (l *mockL) Accept() (transport.CapableConn, error) { + <-l.ctx.Done() + return nil, errors.New("expected in mocked test") +} +func (l *mockL) Close() error { return nil } +func (l *mockL) Addr() net.Addr { return nil } +func (l *mockL) Multiaddr() multiaddr.Multiaddr { return l.addr } + +func TestSkipDial(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s := swarmt.GenSwarm(t) + d := dialPolicy{host: blankhost.NewBlankHost(s)} + if d.skipDial(makeMA("/ip4/8.8.8.8")) != false { + t.Fatal("failed dialing a valid public addr") + } + + if d.skipDial(makeMA("/ip6/2607:f8b0:400a::1")) != false { + t.Fatal("failed dialing a valid public addr") + } + + if d.skipDial(makeMA("/ip4/192.168.0.1")) != true { + t.Fatal("didn't skip dialing an internal addr") + } + + s.AddTransport(&mockT{ctx, makeMA("/ip4/8.8.8.8")}) + err := s.AddListenAddr(makeMA("/ip4/8.8.8.8")) + if err != nil { + t.Fatal(err) + } + if d.skipDial(makeMA("/ip4/8.8.8.8")) != true { + t.Fatal("failed dialing a valid host address") + } +} + +func TestSkipPeer(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s := swarmt.GenSwarm(t) + d := dialPolicy{host: blankhost.NewBlankHost(s)} + if d.skipPeer([]multiaddr.Multiaddr{makeMA("/ip4/8.8.8.8")}) != false { + t.Fatal("failed dialing a valid public addr") + } + if d.skipPeer([]multiaddr.Multiaddr{makeMA("/ip4/8.8.8.8"), makeMA("/ip4/192.168.0.1")}) != false { + t.Fatal("failed dialing a valid public addr") + } + if d.skipPeer([]multiaddr.Multiaddr{makeMA("/ip4/192.168.0.1")}) != true { + t.Fatal("succeeded with no public addr") + } + + s.AddTransport(&mockT{ctx, makeMA("/ip4/8.8.8.8")}) + err := s.AddListenAddr(makeMA("/ip4/8.8.8.8")) + if err != nil { + t.Fatal(err) + } + + if d.skipPeer([]multiaddr.Multiaddr{makeMA("/ip4/8.8.8.8"), makeMA("/ip4/192.168.0.1")}) != true { + t.Fatal("succeeded dialing host address") + } + if d.skipPeer([]multiaddr.Multiaddr{makeMA("/ip4/8.8.8.8"), makeMA("/ip4/9.9.9.9")}) != true { + t.Fatal("succeeded dialing host address when other public") + } + if d.skipPeer([]multiaddr.Multiaddr{makeMA("/ip4/9.9.9.9")}) != false { + t.Fatal("succeeded dialing host address when other public") + } +} + +func TestSkipLocalPeer(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s := swarmt.GenSwarm(t) + d := dialPolicy{host: blankhost.NewBlankHost(s)} + s.AddTransport(&mockT{ctx, makeMA("/ip4/192.168.0.1")}) + err := s.AddListenAddr(makeMA("/ip4/192.168.0.1")) + if err != nil { + t.Fatal(err) + } + + if d.skipPeer([]multiaddr.Multiaddr{makeMA("/ip4/8.8.8.8")}) != false { + t.Fatal("failed dialing a valid public addr") + } + if d.skipPeer([]multiaddr.Multiaddr{makeMA("/ip4/8.8.8.8"), makeMA("/ip4/192.168.0.1")}) != false { + t.Fatal("failed dialing a valid public addr") + } + if d.skipPeer([]multiaddr.Multiaddr{makeMA("/ip4/192.168.0.1")}) != true { + t.Fatal("succeeded with no public addr") + } +} diff --git a/p2p/host/autonat/interface.go b/p2p/host/autonat/interface.go new file mode 100644 index 0000000000..2551f2c5a4 --- /dev/null +++ b/p2p/host/autonat/interface.go @@ -0,0 +1,34 @@ +package autonat + +import ( + "context" + "io" + + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + + ma "github.com/multiformats/go-multiaddr" +) + +// AutoNAT is the interface for NAT autodiscovery +type AutoNAT interface { + // Status returns the current NAT status + Status() network.Reachability + // PublicAddr returns the public dial address when NAT status is public and an + // error otherwise + PublicAddr() (ma.Multiaddr, error) + io.Closer +} + +// Client is a stateless client interface to AutoNAT peers +type Client interface { + // DialBack requests from a peer providing AutoNAT services to test dial back + // and report the address on a successful connection. + DialBack(ctx context.Context, p peer.ID) (ma.Multiaddr, error) +} + +// AddrFunc is a function returning the candidate addresses for the local host. +type AddrFunc func() []ma.Multiaddr + +// Option is an Autonat option for configuration +type Option func(*config) error diff --git a/p2p/host/autonat/notify.go b/p2p/host/autonat/notify.go new file mode 100644 index 0000000000..c5811f160e --- /dev/null +++ b/p2p/host/autonat/notify.go @@ -0,0 +1,36 @@ +package autonat + +import ( + "github.com/libp2p/go-libp2p-core/network" + + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +var _ network.Notifiee = (*AmbientAutoNAT)(nil) + +// Listen is part of the network.Notifiee interface +func (as *AmbientAutoNAT) Listen(net network.Network, a ma.Multiaddr) {} + +// ListenClose is part of the network.Notifiee interface +func (as *AmbientAutoNAT) ListenClose(net network.Network, a ma.Multiaddr) {} + +// OpenedStream is part of the network.Notifiee interface +func (as *AmbientAutoNAT) OpenedStream(net network.Network, s network.Stream) {} + +// ClosedStream is part of the network.Notifiee interface +func (as *AmbientAutoNAT) ClosedStream(net network.Network, s network.Stream) {} + +// Connected is part of the network.Notifiee interface +func (as *AmbientAutoNAT) Connected(net network.Network, c network.Conn) { + if c.Stat().Direction == network.DirInbound && + manet.IsPublicAddr(c.RemoteMultiaddr()) { + select { + case as.inboundConn <- c: + default: + } + } +} + +// Disconnected is part of the network.Notifiee interface +func (as *AmbientAutoNAT) Disconnected(net network.Network, c network.Conn) {} diff --git a/p2p/host/autonat/options.go b/p2p/host/autonat/options.go new file mode 100644 index 0000000000..390533137b --- /dev/null +++ b/p2p/host/autonat/options.go @@ -0,0 +1,144 @@ +package autonat + +import ( + "errors" + "time" + + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" +) + +// config holds configurable options for the autonat subsystem. +type config struct { + host host.Host + + addressFunc AddrFunc + dialPolicy dialPolicy + dialer network.Network + forceReachability bool + reachability network.Reachability + + // client + bootDelay time.Duration + retryInterval time.Duration + refreshInterval time.Duration + requestTimeout time.Duration + throttlePeerPeriod time.Duration + + // server + dialTimeout time.Duration + maxPeerAddresses int + throttleGlobalMax int + throttlePeerMax int + throttleResetPeriod time.Duration + throttleResetJitter time.Duration +} + +var defaults = func(c *config) error { + c.bootDelay = 15 * time.Second + c.retryInterval = 90 * time.Second + c.refreshInterval = 15 * time.Minute + c.requestTimeout = 30 * time.Second + c.throttlePeerPeriod = 90 * time.Second + + c.dialTimeout = 15 * time.Second + c.maxPeerAddresses = 16 + c.throttleGlobalMax = 30 + c.throttlePeerMax = 3 + c.throttleResetPeriod = 1 * time.Minute + c.throttleResetJitter = 15 * time.Second + return nil +} + +// EnableService specifies that AutoNAT should be allowed to run a NAT service to help +// other peers determine their own NAT status. The provided Network should not be the +// default network/dialer of the host passed to `New`, as the NAT system will need to +// make parallel connections, and as such will modify both the associated peerstore +// and terminate connections of this dialer. The dialer provided +// should be compatible (TCP/UDP) however with the transports of the libp2p network. +func EnableService(dialer network.Network) Option { + return func(c *config) error { + if dialer == c.host.Network() || dialer.Peerstore() == c.host.Peerstore() { + return errors.New("dialer should not be that of the host") + } + c.dialer = dialer + return nil + } +} + +// WithReachability overrides autonat to simply report an over-ridden reachability +// status. +func WithReachability(reachability network.Reachability) Option { + return func(c *config) error { + c.forceReachability = true + c.reachability = reachability + return nil + } +} + +// UsingAddresses allows overriding which Addresses the AutoNAT client believes +// are "its own". Useful for testing, or for more exotic port-forwarding +// scenarios where the host may be listening on different ports than it wants +// to externally advertise or verify connectability on. +func UsingAddresses(addrFunc AddrFunc) Option { + return func(c *config) error { + if addrFunc == nil { + return errors.New("invalid address function supplied") + } + c.addressFunc = addrFunc + return nil + } +} + +// WithSchedule configures how agressively probes will be made to verify the +// address of the host. retryInterval indicates how often probes should be made +// when the host lacks confident about its address, while refresh interval +// is the schedule of periodic probes when the host believes it knows its +// steady-state reachability. +func WithSchedule(retryInterval, refreshInterval time.Duration) Option { + return func(c *config) error { + c.retryInterval = retryInterval + c.refreshInterval = refreshInterval + return nil + } +} + +// WithoutStartupDelay removes the initial delay the NAT subsystem typically +// uses as a buffer for ensuring that connectivity and guesses as to the hosts +// local interfaces have settled down during startup. +func WithoutStartupDelay() Option { + return func(c *config) error { + c.bootDelay = 1 + return nil + } +} + +// WithoutThrottling indicates that this autonat service should not place +// restrictions on how many peers it is willing to help when acting as +// a server. +func WithoutThrottling() Option { + return func(c *config) error { + c.throttleGlobalMax = 0 + return nil + } +} + +// WithThrottling specifies how many peers (`amount`) it is willing to help +// ever `interval` amount of time when acting as a server. +func WithThrottling(amount int, interval time.Duration) Option { + return func(c *config) error { + c.throttleGlobalMax = amount + c.throttleResetPeriod = interval + c.throttleResetJitter = interval / 4 + return nil + } +} + +// WithPeerThrottling specifies a limit for the maximum number of IP checks +// this node will provide to an individual peer in each `interval`. +func WithPeerThrottling(amount int) Option { + return func(c *config) error { + c.throttlePeerMax = amount + return nil + } +} diff --git a/p2p/host/autonat/pb/Makefile b/p2p/host/autonat/pb/Makefile new file mode 100644 index 0000000000..dd21e878f8 --- /dev/null +++ b/p2p/host/autonat/pb/Makefile @@ -0,0 +1,6 @@ +pbgos := $(patsubst %.proto,%.pb.go,$(wildcard *.proto)) + +all: $(pbgos) + +%.pb.go: %.proto + protoc --gogofast_out=. --proto_path=$(GOPATH)/src:. $< diff --git a/p2p/host/autonat/pb/autonat.pb.go b/p2p/host/autonat/pb/autonat.pb.go new file mode 100644 index 0000000000..a22b5e99e3 --- /dev/null +++ b/p2p/host/autonat/pb/autonat.pb.go @@ -0,0 +1,1246 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: autonat.proto + +package autonat_pb + +import ( + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + io "io" + math "math" + math_bits "math/bits" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type Message_MessageType int32 + +const ( + Message_DIAL Message_MessageType = 0 + Message_DIAL_RESPONSE Message_MessageType = 1 +) + +var Message_MessageType_name = map[int32]string{ + 0: "DIAL", + 1: "DIAL_RESPONSE", +} + +var Message_MessageType_value = map[string]int32{ + "DIAL": 0, + "DIAL_RESPONSE": 1, +} + +func (x Message_MessageType) Enum() *Message_MessageType { + p := new(Message_MessageType) + *p = x + return p +} + +func (x Message_MessageType) String() string { + return proto.EnumName(Message_MessageType_name, int32(x)) +} + +func (x *Message_MessageType) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(Message_MessageType_value, data, "Message_MessageType") + if err != nil { + return err + } + *x = Message_MessageType(value) + return nil +} + +func (Message_MessageType) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_a04e278ef61ac07a, []int{0, 0} +} + +type Message_ResponseStatus int32 + +const ( + Message_OK Message_ResponseStatus = 0 + Message_E_DIAL_ERROR Message_ResponseStatus = 100 + Message_E_DIAL_REFUSED Message_ResponseStatus = 101 + Message_E_BAD_REQUEST Message_ResponseStatus = 200 + Message_E_INTERNAL_ERROR Message_ResponseStatus = 300 +) + +var Message_ResponseStatus_name = map[int32]string{ + 0: "OK", + 100: "E_DIAL_ERROR", + 101: "E_DIAL_REFUSED", + 200: "E_BAD_REQUEST", + 300: "E_INTERNAL_ERROR", +} + +var Message_ResponseStatus_value = map[string]int32{ + "OK": 0, + "E_DIAL_ERROR": 100, + "E_DIAL_REFUSED": 101, + "E_BAD_REQUEST": 200, + "E_INTERNAL_ERROR": 300, +} + +func (x Message_ResponseStatus) Enum() *Message_ResponseStatus { + p := new(Message_ResponseStatus) + *p = x + return p +} + +func (x Message_ResponseStatus) String() string { + return proto.EnumName(Message_ResponseStatus_name, int32(x)) +} + +func (x *Message_ResponseStatus) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(Message_ResponseStatus_value, data, "Message_ResponseStatus") + if err != nil { + return err + } + *x = Message_ResponseStatus(value) + return nil +} + +func (Message_ResponseStatus) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_a04e278ef61ac07a, []int{0, 1} +} + +type Message struct { + Type *Message_MessageType `protobuf:"varint,1,opt,name=type,enum=autonat.pb.Message_MessageType" json:"type,omitempty"` + Dial *Message_Dial `protobuf:"bytes,2,opt,name=dial" json:"dial,omitempty"` + DialResponse *Message_DialResponse `protobuf:"bytes,3,opt,name=dialResponse" json:"dialResponse,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} +func (*Message) Descriptor() ([]byte, []int) { + return fileDescriptor_a04e278ef61ac07a, []int{0} +} +func (m *Message) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Message.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Message) XXX_Merge(src proto.Message) { + xxx_messageInfo_Message.Merge(m, src) +} +func (m *Message) XXX_Size() int { + return m.Size() +} +func (m *Message) XXX_DiscardUnknown() { + xxx_messageInfo_Message.DiscardUnknown(m) +} + +var xxx_messageInfo_Message proto.InternalMessageInfo + +func (m *Message) GetType() Message_MessageType { + if m != nil && m.Type != nil { + return *m.Type + } + return Message_DIAL +} + +func (m *Message) GetDial() *Message_Dial { + if m != nil { + return m.Dial + } + return nil +} + +func (m *Message) GetDialResponse() *Message_DialResponse { + if m != nil { + return m.DialResponse + } + return nil +} + +type Message_PeerInfo struct { + Id []byte `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"` + Addrs [][]byte `protobuf:"bytes,2,rep,name=addrs" json:"addrs,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Message_PeerInfo) Reset() { *m = Message_PeerInfo{} } +func (m *Message_PeerInfo) String() string { return proto.CompactTextString(m) } +func (*Message_PeerInfo) ProtoMessage() {} +func (*Message_PeerInfo) Descriptor() ([]byte, []int) { + return fileDescriptor_a04e278ef61ac07a, []int{0, 0} +} +func (m *Message_PeerInfo) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Message_PeerInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Message_PeerInfo.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Message_PeerInfo) XXX_Merge(src proto.Message) { + xxx_messageInfo_Message_PeerInfo.Merge(m, src) +} +func (m *Message_PeerInfo) XXX_Size() int { + return m.Size() +} +func (m *Message_PeerInfo) XXX_DiscardUnknown() { + xxx_messageInfo_Message_PeerInfo.DiscardUnknown(m) +} + +var xxx_messageInfo_Message_PeerInfo proto.InternalMessageInfo + +func (m *Message_PeerInfo) GetId() []byte { + if m != nil { + return m.Id + } + return nil +} + +func (m *Message_PeerInfo) GetAddrs() [][]byte { + if m != nil { + return m.Addrs + } + return nil +} + +type Message_Dial struct { + Peer *Message_PeerInfo `protobuf:"bytes,1,opt,name=peer" json:"peer,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Message_Dial) Reset() { *m = Message_Dial{} } +func (m *Message_Dial) String() string { return proto.CompactTextString(m) } +func (*Message_Dial) ProtoMessage() {} +func (*Message_Dial) Descriptor() ([]byte, []int) { + return fileDescriptor_a04e278ef61ac07a, []int{0, 1} +} +func (m *Message_Dial) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Message_Dial) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Message_Dial.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Message_Dial) XXX_Merge(src proto.Message) { + xxx_messageInfo_Message_Dial.Merge(m, src) +} +func (m *Message_Dial) XXX_Size() int { + return m.Size() +} +func (m *Message_Dial) XXX_DiscardUnknown() { + xxx_messageInfo_Message_Dial.DiscardUnknown(m) +} + +var xxx_messageInfo_Message_Dial proto.InternalMessageInfo + +func (m *Message_Dial) GetPeer() *Message_PeerInfo { + if m != nil { + return m.Peer + } + return nil +} + +type Message_DialResponse struct { + Status *Message_ResponseStatus `protobuf:"varint,1,opt,name=status,enum=autonat.pb.Message_ResponseStatus" json:"status,omitempty"` + StatusText *string `protobuf:"bytes,2,opt,name=statusText" json:"statusText,omitempty"` + Addr []byte `protobuf:"bytes,3,opt,name=addr" json:"addr,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Message_DialResponse) Reset() { *m = Message_DialResponse{} } +func (m *Message_DialResponse) String() string { return proto.CompactTextString(m) } +func (*Message_DialResponse) ProtoMessage() {} +func (*Message_DialResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_a04e278ef61ac07a, []int{0, 2} +} +func (m *Message_DialResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Message_DialResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Message_DialResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Message_DialResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_Message_DialResponse.Merge(m, src) +} +func (m *Message_DialResponse) XXX_Size() int { + return m.Size() +} +func (m *Message_DialResponse) XXX_DiscardUnknown() { + xxx_messageInfo_Message_DialResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_Message_DialResponse proto.InternalMessageInfo + +func (m *Message_DialResponse) GetStatus() Message_ResponseStatus { + if m != nil && m.Status != nil { + return *m.Status + } + return Message_OK +} + +func (m *Message_DialResponse) GetStatusText() string { + if m != nil && m.StatusText != nil { + return *m.StatusText + } + return "" +} + +func (m *Message_DialResponse) GetAddr() []byte { + if m != nil { + return m.Addr + } + return nil +} + +func init() { + proto.RegisterEnum("autonat.pb.Message_MessageType", Message_MessageType_name, Message_MessageType_value) + proto.RegisterEnum("autonat.pb.Message_ResponseStatus", Message_ResponseStatus_name, Message_ResponseStatus_value) + proto.RegisterType((*Message)(nil), "autonat.pb.Message") + proto.RegisterType((*Message_PeerInfo)(nil), "autonat.pb.Message.PeerInfo") + proto.RegisterType((*Message_Dial)(nil), "autonat.pb.Message.Dial") + proto.RegisterType((*Message_DialResponse)(nil), "autonat.pb.Message.DialResponse") +} + +func init() { proto.RegisterFile("autonat.proto", fileDescriptor_a04e278ef61ac07a) } + +var fileDescriptor_a04e278ef61ac07a = []byte{ + // 372 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x90, 0xcf, 0x8a, 0xda, 0x50, + 0x14, 0xc6, 0xbd, 0x31, 0xb5, 0xf6, 0x18, 0xc3, 0xed, 0xa1, 0x85, 0x20, 0x25, 0x0d, 0x59, 0x49, + 0x29, 0x22, 0x76, 0x53, 0xba, 0x53, 0x72, 0x0b, 0xd2, 0x56, 0xed, 0x49, 0x5c, 0x87, 0x94, 0xdc, + 0x0e, 0x01, 0x31, 0x21, 0x89, 0x30, 0x6e, 0xe6, 0x89, 0x66, 0x3b, 0xef, 0xe0, 0x72, 0x1e, 0x61, + 0xf0, 0x49, 0x86, 0x5c, 0xa3, 0xa3, 0xe0, 0xac, 0xce, 0x1f, 0x7e, 0xdf, 0x39, 0x1f, 0x1f, 0x74, + 0xa3, 0x4d, 0x99, 0xae, 0xa3, 0x72, 0x90, 0xe5, 0x69, 0x99, 0x22, 0x9c, 0xc6, 0x7f, 0xee, 0x83, + 0x0e, 0x6f, 0xff, 0xc8, 0xa2, 0x88, 0x6e, 0x24, 0x7e, 0x03, 0xbd, 0xdc, 0x66, 0xd2, 0x62, 0x0e, + 0xeb, 0x9b, 0xa3, 0xcf, 0x83, 0x17, 0x6c, 0x50, 0x23, 0xc7, 0x1a, 0x6c, 0x33, 0x49, 0x0a, 0xc6, + 0xaf, 0xa0, 0xc7, 0x49, 0xb4, 0xb2, 0x34, 0x87, 0xf5, 0x3b, 0x23, 0xeb, 0x9a, 0xc8, 0x4b, 0xa2, + 0x15, 0x29, 0x0a, 0x3d, 0x30, 0xaa, 0x4a, 0xb2, 0xc8, 0xd2, 0x75, 0x21, 0xad, 0xa6, 0x52, 0x39, + 0xaf, 0xaa, 0x6a, 0x8e, 0x2e, 0x54, 0xbd, 0x21, 0xb4, 0x17, 0x52, 0xe6, 0xd3, 0xf5, 0xff, 0x14, + 0x4d, 0xd0, 0x92, 0x58, 0x59, 0x36, 0x48, 0x4b, 0x62, 0xfc, 0x00, 0x6f, 0xa2, 0x38, 0xce, 0x0b, + 0x4b, 0x73, 0x9a, 0x7d, 0x83, 0x0e, 0x43, 0xef, 0x3b, 0xe8, 0xd5, 0x3d, 0x1c, 0x82, 0x9e, 0x49, + 0x99, 0x2b, 0xbe, 0x33, 0xfa, 0x74, 0xed, 0xef, 0xf1, 0x32, 0x29, 0xb2, 0x77, 0x07, 0xc6, 0xb9, + 0x13, 0xfc, 0x01, 0xad, 0xa2, 0x8c, 0xca, 0x4d, 0x51, 0xc7, 0xe4, 0x5e, 0xbb, 0x71, 0xa4, 0x7d, + 0x45, 0x52, 0xad, 0x40, 0x1b, 0xe0, 0xd0, 0x05, 0xf2, 0xb6, 0x54, 0x89, 0xbd, 0xa3, 0xb3, 0x0d, + 0x22, 0xe8, 0x95, 0x5d, 0x95, 0x8a, 0x41, 0xaa, 0x77, 0xbf, 0x40, 0xe7, 0x2c, 0x74, 0x6c, 0x83, + 0xee, 0x4d, 0xc7, 0xbf, 0x79, 0x03, 0xdf, 0x43, 0xb7, 0xea, 0x42, 0x12, 0xfe, 0x62, 0x3e, 0xf3, + 0x05, 0x67, 0x6e, 0x02, 0xe6, 0xe5, 0x67, 0x6c, 0x81, 0x36, 0xff, 0xc5, 0x1b, 0xc8, 0xc1, 0x10, + 0xa1, 0xc2, 0x05, 0xd1, 0x9c, 0x78, 0x8c, 0x08, 0x66, 0xbd, 0x21, 0xf1, 0x73, 0xe9, 0x0b, 0x8f, + 0x4b, 0x44, 0xe8, 0x8a, 0x70, 0x32, 0xf6, 0x42, 0x12, 0x7f, 0x97, 0xc2, 0x0f, 0xf8, 0x8e, 0xe1, + 0x47, 0xe0, 0x22, 0x9c, 0xce, 0x02, 0x41, 0xb3, 0x93, 0xfa, 0x5e, 0x9b, 0x18, 0xbb, 0xbd, 0xcd, + 0x1e, 0xf7, 0x36, 0x7b, 0xda, 0xdb, 0xec, 0x39, 0x00, 0x00, 0xff, 0xff, 0x8e, 0xe2, 0x93, 0x4e, + 0x61, 0x02, 0x00, 0x00, +} + +func (m *Message) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Message) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Message) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if m.DialResponse != nil { + { + size, err := m.DialResponse.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintAutonat(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + if m.Dial != nil { + { + size, err := m.Dial.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintAutonat(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + if m.Type != nil { + i = encodeVarintAutonat(dAtA, i, uint64(*m.Type)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *Message_PeerInfo) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Message_PeerInfo) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Message_PeerInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.Addrs) > 0 { + for iNdEx := len(m.Addrs) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.Addrs[iNdEx]) + copy(dAtA[i:], m.Addrs[iNdEx]) + i = encodeVarintAutonat(dAtA, i, uint64(len(m.Addrs[iNdEx]))) + i-- + dAtA[i] = 0x12 + } + } + if m.Id != nil { + i -= len(m.Id) + copy(dAtA[i:], m.Id) + i = encodeVarintAutonat(dAtA, i, uint64(len(m.Id))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *Message_Dial) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Message_Dial) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Message_Dial) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if m.Peer != nil { + { + size, err := m.Peer.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintAutonat(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *Message_DialResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Message_DialResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Message_DialResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if m.Addr != nil { + i -= len(m.Addr) + copy(dAtA[i:], m.Addr) + i = encodeVarintAutonat(dAtA, i, uint64(len(m.Addr))) + i-- + dAtA[i] = 0x1a + } + if m.StatusText != nil { + i -= len(*m.StatusText) + copy(dAtA[i:], *m.StatusText) + i = encodeVarintAutonat(dAtA, i, uint64(len(*m.StatusText))) + i-- + dAtA[i] = 0x12 + } + if m.Status != nil { + i = encodeVarintAutonat(dAtA, i, uint64(*m.Status)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func encodeVarintAutonat(dAtA []byte, offset int, v uint64) int { + offset -= sovAutonat(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *Message) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Type != nil { + n += 1 + sovAutonat(uint64(*m.Type)) + } + if m.Dial != nil { + l = m.Dial.Size() + n += 1 + l + sovAutonat(uint64(l)) + } + if m.DialResponse != nil { + l = m.DialResponse.Size() + n += 1 + l + sovAutonat(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *Message_PeerInfo) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Id != nil { + l = len(m.Id) + n += 1 + l + sovAutonat(uint64(l)) + } + if len(m.Addrs) > 0 { + for _, b := range m.Addrs { + l = len(b) + n += 1 + l + sovAutonat(uint64(l)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *Message_Dial) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Peer != nil { + l = m.Peer.Size() + n += 1 + l + sovAutonat(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *Message_DialResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Status != nil { + n += 1 + sovAutonat(uint64(*m.Status)) + } + if m.StatusText != nil { + l = len(*m.StatusText) + n += 1 + l + sovAutonat(uint64(l)) + } + if m.Addr != nil { + l = len(m.Addr) + n += 1 + l + sovAutonat(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovAutonat(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozAutonat(x uint64) (n int) { + return sovAutonat(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *Message) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAutonat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Message: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Message: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + var v Message_MessageType + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAutonat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= Message_MessageType(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Type = &v + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Dial", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAutonat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthAutonat + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthAutonat + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Dial == nil { + m.Dial = &Message_Dial{} + } + if err := m.Dial.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DialResponse", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAutonat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthAutonat + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthAutonat + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.DialResponse == nil { + m.DialResponse = &Message_DialResponse{} + } + if err := m.DialResponse.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipAutonat(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthAutonat + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthAutonat + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Message_PeerInfo) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAutonat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PeerInfo: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PeerInfo: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Id", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAutonat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthAutonat + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthAutonat + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Id = append(m.Id[:0], dAtA[iNdEx:postIndex]...) + if m.Id == nil { + m.Id = []byte{} + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Addrs", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAutonat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthAutonat + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthAutonat + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Addrs = append(m.Addrs, make([]byte, postIndex-iNdEx)) + copy(m.Addrs[len(m.Addrs)-1], dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipAutonat(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthAutonat + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthAutonat + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Message_Dial) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAutonat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Dial: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Dial: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Peer", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAutonat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthAutonat + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthAutonat + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Peer == nil { + m.Peer = &Message_PeerInfo{} + } + if err := m.Peer.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipAutonat(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthAutonat + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthAutonat + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Message_DialResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAutonat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DialResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DialResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Status", wireType) + } + var v Message_ResponseStatus + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAutonat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= Message_ResponseStatus(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Status = &v + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field StatusText", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAutonat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthAutonat + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthAutonat + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + s := string(dAtA[iNdEx:postIndex]) + m.StatusText = &s + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Addr", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAutonat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthAutonat + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthAutonat + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Addr = append(m.Addr[:0], dAtA[iNdEx:postIndex]...) + if m.Addr == nil { + m.Addr = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipAutonat(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthAutonat + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthAutonat + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipAutonat(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowAutonat + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowAutonat + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowAutonat + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthAutonat + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupAutonat + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthAutonat + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthAutonat = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowAutonat = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupAutonat = fmt.Errorf("proto: unexpected end of group") +) diff --git a/p2p/host/autonat/pb/autonat.proto b/p2p/host/autonat/pb/autonat.proto new file mode 100644 index 0000000000..777270a139 --- /dev/null +++ b/p2p/host/autonat/pb/autonat.proto @@ -0,0 +1,37 @@ +syntax = "proto2"; + +package autonat.pb; + +message Message { + enum MessageType { + DIAL = 0; + DIAL_RESPONSE = 1; + } + + enum ResponseStatus { + OK = 0; + E_DIAL_ERROR = 100; + E_DIAL_REFUSED = 101; + E_BAD_REQUEST = 200; + E_INTERNAL_ERROR = 300; + } + + message PeerInfo { + optional bytes id = 1; + repeated bytes addrs = 2; + } + + message Dial { + optional PeerInfo peer = 1; + } + + message DialResponse { + optional ResponseStatus status = 1; + optional string statusText = 2; + optional bytes addr = 3; + } + + optional MessageType type = 1; + optional Dial dial = 2; + optional DialResponse dialResponse = 3; +} diff --git a/p2p/host/autonat/proto.go b/p2p/host/autonat/proto.go new file mode 100644 index 0000000000..69ac587173 --- /dev/null +++ b/p2p/host/autonat/proto.go @@ -0,0 +1,40 @@ +package autonat + +import ( + pb "github.com/libp2p/go-libp2p/p2p/host/autonat/pb" + + "github.com/libp2p/go-libp2p-core/peer" + + ma "github.com/multiformats/go-multiaddr" +) + +// AutoNATProto identifies the autonat service protocol +const AutoNATProto = "/libp2p/autonat/1.0.0" + +func newDialMessage(pi peer.AddrInfo) *pb.Message { + msg := new(pb.Message) + msg.Type = pb.Message_DIAL.Enum() + msg.Dial = new(pb.Message_Dial) + msg.Dial.Peer = new(pb.Message_PeerInfo) + msg.Dial.Peer.Id = []byte(pi.ID) + msg.Dial.Peer.Addrs = make([][]byte, len(pi.Addrs)) + for i, addr := range pi.Addrs { + msg.Dial.Peer.Addrs[i] = addr.Bytes() + } + + return msg +} + +func newDialResponseOK(addr ma.Multiaddr) *pb.Message_DialResponse { + dr := new(pb.Message_DialResponse) + dr.Status = pb.Message_OK.Enum() + dr.Addr = addr.Bytes() + return dr +} + +func newDialResponseError(status pb.Message_ResponseStatus, text string) *pb.Message_DialResponse { + dr := new(pb.Message_DialResponse) + dr.Status = status.Enum() + dr.StatusText = &text + return dr +} diff --git a/p2p/host/autonat/svc.go b/p2p/host/autonat/svc.go new file mode 100644 index 0000000000..cf80226fe9 --- /dev/null +++ b/p2p/host/autonat/svc.go @@ -0,0 +1,262 @@ +package autonat + +import ( + "context" + "errors" + "math/rand" + "sync" + "time" + + pb "github.com/libp2p/go-libp2p/p2p/host/autonat/pb" + + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" + + "github.com/libp2p/go-msgio/protoio" + ma "github.com/multiformats/go-multiaddr" +) + +var streamTimeout = 60 * time.Second + +// AutoNATService provides NAT autodetection services to other peers +type autoNATService struct { + instanceLock sync.Mutex + instance context.CancelFunc + backgroundRunning chan struct{} // closed when background exits + + config *config + + // rate limiter + mx sync.Mutex + reqs map[peer.ID]int + globalReqs int +} + +// NewAutoNATService creates a new AutoNATService instance attached to a host +func newAutoNATService(c *config) (*autoNATService, error) { + if c.dialer == nil { + return nil, errors.New("cannot create NAT service without a network") + } + return &autoNATService{ + config: c, + reqs: make(map[peer.ID]int), + }, nil +} + +func (as *autoNATService) handleStream(s network.Stream) { + s.SetDeadline(time.Now().Add(streamTimeout)) + defer s.Close() + + pid := s.Conn().RemotePeer() + log.Debugf("New stream from %s", pid.Pretty()) + + r := protoio.NewDelimitedReader(s, network.MessageSizeMax) + w := protoio.NewDelimitedWriter(s) + + var req pb.Message + var res pb.Message + + err := r.ReadMsg(&req) + if err != nil { + log.Debugf("Error reading message from %s: %s", pid.Pretty(), err.Error()) + s.Reset() + return + } + + t := req.GetType() + if t != pb.Message_DIAL { + log.Debugf("Unexpected message from %s: %s (%d)", pid.Pretty(), t.String(), t) + s.Reset() + return + } + + dr := as.handleDial(pid, s.Conn().RemoteMultiaddr(), req.GetDial().GetPeer()) + res.Type = pb.Message_DIAL_RESPONSE.Enum() + res.DialResponse = dr + + err = w.WriteMsg(&res) + if err != nil { + log.Debugf("Error writing response to %s: %s", pid.Pretty(), err.Error()) + s.Reset() + return + } +} + +func (as *autoNATService) handleDial(p peer.ID, obsaddr ma.Multiaddr, mpi *pb.Message_PeerInfo) *pb.Message_DialResponse { + if mpi == nil { + return newDialResponseError(pb.Message_E_BAD_REQUEST, "missing peer info") + } + + mpid := mpi.GetId() + if mpid != nil { + mp, err := peer.IDFromBytes(mpid) + if err != nil { + return newDialResponseError(pb.Message_E_BAD_REQUEST, "bad peer id") + } + + if mp != p { + return newDialResponseError(pb.Message_E_BAD_REQUEST, "peer id mismatch") + } + } + + addrs := make([]ma.Multiaddr, 0, as.config.maxPeerAddresses) + seen := make(map[string]struct{}) + + // Don't even try to dial peers with blocked remote addresses. In order to dial a peer, we + // need to know their public IP address, and it needs to be different from our public IP + // address. + if as.config.dialPolicy.skipDial(obsaddr) { + return newDialResponseError(pb.Message_E_DIAL_ERROR, "refusing to dial peer with blocked observed address") + } + + // Determine the peer's IP address. + hostIP, _ := ma.SplitFirst(obsaddr) + switch hostIP.Protocol().Code { + case ma.P_IP4, ma.P_IP6: + default: + // This shouldn't be possible as we should skip all addresses that don't include + // public IP addresses. + return newDialResponseError(pb.Message_E_INTERNAL_ERROR, "expected an IP address") + } + + // add observed addr to the list of addresses to dial + addrs = append(addrs, obsaddr) + seen[obsaddr.String()] = struct{}{} + + for _, maddr := range mpi.GetAddrs() { + addr, err := ma.NewMultiaddrBytes(maddr) + if err != nil { + log.Debugf("Error parsing multiaddr: %s", err.Error()) + continue + } + + // For security reasons, we _only_ dial the observed IP address. + // Replace other IP addresses with the observed one so we can still try the + // requested ports/transports. + if ip, rest := ma.SplitFirst(addr); !ip.Equal(hostIP) { + // Make sure it's an IP address + switch ip.Protocol().Code { + case ma.P_IP4, ma.P_IP6: + default: + continue + } + addr = hostIP + if rest != nil { + addr = addr.Encapsulate(rest) + } + } + + // Make sure we're willing to dial the rest of the address (e.g., not a circuit + // address). + if as.config.dialPolicy.skipDial(addr) { + continue + } + + str := addr.String() + _, ok := seen[str] + if ok { + continue + } + + addrs = append(addrs, addr) + seen[str] = struct{}{} + + if len(addrs) >= as.config.maxPeerAddresses { + break + } + } + + if len(addrs) == 0 { + return newDialResponseError(pb.Message_E_DIAL_ERROR, "no dialable addresses") + } + + return as.doDial(peer.AddrInfo{ID: p, Addrs: addrs}) +} + +func (as *autoNATService) doDial(pi peer.AddrInfo) *pb.Message_DialResponse { + // rate limit check + as.mx.Lock() + count := as.reqs[pi.ID] + if count >= as.config.throttlePeerMax || (as.config.throttleGlobalMax > 0 && + as.globalReqs >= as.config.throttleGlobalMax) { + as.mx.Unlock() + return newDialResponseError(pb.Message_E_DIAL_REFUSED, "too many dials") + } + as.reqs[pi.ID] = count + 1 + as.globalReqs++ + as.mx.Unlock() + + ctx, cancel := context.WithTimeout(context.Background(), as.config.dialTimeout) + defer cancel() + + as.config.dialer.Peerstore().ClearAddrs(pi.ID) + + as.config.dialer.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL) + + defer func() { + as.config.dialer.Peerstore().ClearAddrs(pi.ID) + as.config.dialer.Peerstore().RemovePeer(pi.ID) + }() + + conn, err := as.config.dialer.DialPeer(ctx, pi.ID) + if err != nil { + log.Debugf("error dialing %s: %s", pi.ID.Pretty(), err.Error()) + // wait for the context to timeout to avoid leaking timing information + // this renders the service ineffective as a port scanner + <-ctx.Done() + return newDialResponseError(pb.Message_E_DIAL_ERROR, "dial failed") + } + + ra := conn.RemoteMultiaddr() + as.config.dialer.ClosePeer(pi.ID) + return newDialResponseOK(ra) +} + +// Enable the autoNAT service if it is not running. +func (as *autoNATService) Enable() { + as.instanceLock.Lock() + defer as.instanceLock.Unlock() + if as.instance != nil { + return + } + ctx, cancel := context.WithCancel(context.Background()) + as.instance = cancel + as.backgroundRunning = make(chan struct{}) + + go as.background(ctx) +} + +// Disable the autoNAT service if it is running. +func (as *autoNATService) Disable() { + as.instanceLock.Lock() + defer as.instanceLock.Unlock() + if as.instance != nil { + as.instance() + as.instance = nil + <-as.backgroundRunning + } +} + +func (as *autoNATService) background(ctx context.Context) { + defer close(as.backgroundRunning) + as.config.host.SetStreamHandler(AutoNATProto, as.handleStream) + + timer := time.NewTimer(as.config.throttleResetPeriod) + defer timer.Stop() + + for { + select { + case <-timer.C: + as.mx.Lock() + as.reqs = make(map[peer.ID]int) + as.globalReqs = 0 + as.mx.Unlock() + jitter := rand.Float32() * float32(as.config.throttleResetJitter) + timer.Reset(as.config.throttleResetPeriod + time.Duration(int64(jitter))) + case <-ctx.Done(): + as.config.host.RemoveStreamHandler(AutoNATProto) + return + } + } +} diff --git a/p2p/host/autonat/svc_test.go b/p2p/host/autonat/svc_test.go new file mode 100644 index 0000000000..8759a1fe8c --- /dev/null +++ b/p2p/host/autonat/svc_test.go @@ -0,0 +1,230 @@ +package autonat + +import ( + "context" + "os" + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/event" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + + bhost "github.com/libp2p/go-libp2p-blankhost" + swarmt "github.com/libp2p/go-libp2p-swarm/testing" + + ma "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" +) + +func makeAutoNATConfig(t *testing.T) *config { + h := bhost.NewBlankHost(swarmt.GenSwarm(t)) + dh := bhost.NewBlankHost(swarmt.GenSwarm(t)) + c := config{host: h, dialer: dh.Network()} + _ = defaults(&c) + c.forceReachability = true + c.dialPolicy.allowSelfDials = true + return &c +} + +func makeAutoNATService(t *testing.T, c *config) *autoNATService { + as, err := newAutoNATService(c) + if err != nil { + t.Fatal(err) + } + as.Enable() + + return as +} + +func makeAutoNATClient(t *testing.T) (host.Host, Client) { + h := bhost.NewBlankHost(swarmt.GenSwarm(t)) + cli := NewAutoNATClient(h, nil) + return h, cli +} + +// Note: these tests assume that the host has only private network addresses! +func TestAutoNATServiceDialError(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := makeAutoNATConfig(t) + defer c.host.Close() + defer c.dialer.Close() + + c.dialTimeout = 1 * time.Second + c.dialPolicy.allowSelfDials = false + _ = makeAutoNATService(t, c) + hc, ac := makeAutoNATClient(t) + defer hc.Close() + connect(t, c.host, hc) + + _, err := ac.DialBack(ctx, c.host.ID()) + if err == nil { + t.Fatal("Dial back succeeded unexpectedly!") + } + + if !IsDialError(err) { + t.Fatal(err) + } +} + +func TestAutoNATServiceDialSuccess(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := makeAutoNATConfig(t) + defer c.host.Close() + defer c.dialer.Close() + + _ = makeAutoNATService(t, c) + + hc, ac := makeAutoNATClient(t) + defer hc.Close() + connect(t, c.host, hc) + + _, err := ac.DialBack(ctx, c.host.ID()) + if err != nil { + t.Fatalf("Dial back failed: %s", err.Error()) + } +} + +func TestAutoNATServiceDialRateLimiter(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := makeAutoNATConfig(t) + defer c.host.Close() + defer c.dialer.Close() + + c.dialTimeout = 200 * time.Millisecond + c.throttleResetPeriod = 200 * time.Millisecond + c.throttleResetJitter = 0 + c.throttlePeerMax = 1 + _ = makeAutoNATService(t, c) + + hc, ac := makeAutoNATClient(t) + defer hc.Close() + connect(t, c.host, hc) + + _, err := ac.DialBack(ctx, c.host.ID()) + if err != nil { + t.Fatal(err) + } + + _, err = ac.DialBack(ctx, c.host.ID()) + if err == nil { + t.Fatal("Dial back succeeded unexpectedly!") + } + + if !IsDialRefused(err) { + t.Fatal(err) + } + + time.Sleep(400 * time.Millisecond) + + _, err = ac.DialBack(ctx, c.host.ID()) + if err != nil { + t.Fatal(err) + } +} + +func TestAutoNATServiceGlobalLimiter(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := makeAutoNATConfig(t) + defer c.host.Close() + defer c.dialer.Close() + + c.dialTimeout = time.Second + c.throttleResetPeriod = 10 * time.Second + c.throttleResetJitter = 0 + c.throttlePeerMax = 1 + c.throttleGlobalMax = 5 + _ = makeAutoNATService(t, c) + + hs := c.host + + for i := 0; i < 5; i++ { + hc, ac := makeAutoNATClient(t) + connect(t, hs, hc) + + _, err := ac.DialBack(ctx, hs.ID()) + if err != nil { + t.Fatal(err) + } + } + + hc, ac := makeAutoNATClient(t) + defer hc.Close() + connect(t, hs, hc) + _, err := ac.DialBack(ctx, hs.ID()) + if err == nil { + t.Fatal("Dial back succeeded unexpectedly!") + } + + if !IsDialRefused(err) { + t.Fatal(err) + } +} + +func TestAutoNATServiceRateLimitJitter(t *testing.T) { + c := makeAutoNATConfig(t) + defer c.host.Close() + defer c.dialer.Close() + + dur := 100 * time.Millisecond + if os.Getenv("CI") != "" { + dur = 200 * time.Millisecond + } + + c.throttleResetPeriod = dur + c.throttleResetJitter = dur + c.throttleGlobalMax = 1 + svc := makeAutoNATService(t, c) + svc.mx.Lock() + svc.globalReqs = 1 + svc.mx.Unlock() + + require.Eventually(t, func() bool { + svc.mx.Lock() + defer svc.mx.Unlock() + return svc.globalReqs == 0 + }, dur*5/2, 10*time.Millisecond, "reset of rate limitter occured slower than expected") +} + +func TestAutoNATServiceStartup(t *testing.T) { + h := bhost.NewBlankHost(swarmt.GenSwarm(t)) + defer h.Close() + dh := bhost.NewBlankHost(swarmt.GenSwarm(t)) + defer dh.Close() + an, err := New(h, EnableService(dh.Network())) + an.(*AmbientAutoNAT).config.dialPolicy.allowSelfDials = true + if err != nil { + t.Fatal(err) + } + + hc, ac := makeAutoNATClient(t) + connect(t, h, hc) + + _, err = ac.DialBack(context.Background(), h.ID()) + if err != nil { + t.Fatal("autonat service be active in unknown mode.") + } + + sub, _ := h.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged)) + + anc := an.(*AmbientAutoNAT) + anc.recordObservation(autoNATResult{Reachability: network.ReachabilityPublic, address: ma.StringCast("/ip4/127.0.0.1/tcp/1234")}) + + <-sub.Out() + + _, err = ac.DialBack(context.Background(), h.ID()) + if err != nil { + t.Fatalf("autonat should be active, was %v", err) + } + if an.Status() != network.ReachabilityPublic { + t.Fatalf("autonat should report public, but didn't") + } +} diff --git a/p2p/host/autonat/test/autonat_test.go b/p2p/host/autonat/test/autonat_test.go new file mode 100644 index 0000000000..46a592e732 --- /dev/null +++ b/p2p/host/autonat/test/autonat_test.go @@ -0,0 +1,48 @@ +// This separate testing package helps to resolve a circular dependency potentially +// being created between libp2p and libp2p-autonat +package autonattest + +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/event" + "github.com/libp2p/go-libp2p-core/network" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/p2p/host/autonat" + + "github.com/stretchr/testify/require" +) + +func TestAutonatRoundtrip(t *testing.T) { + t.Skip("this test doesn't work") + + // 3 hosts are used: [client] and [service + dialback dialer] + client, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"), libp2p.EnableNATService()) + require.NoError(t, err) + service, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")) + require.NoError(t, err) + dialback, err := libp2p.New(libp2p.NoListenAddrs) + require.NoError(t, err) + if _, err := autonat.New(service, autonat.EnableService(dialback.Network())); err != nil { + t.Fatal(err) + } + + client.Peerstore().AddAddrs(service.ID(), service.Addrs(), time.Hour) + require.NoError(t, client.Connect(context.Background(), service.Peerstore().PeerInfo(service.ID()))) + + cSub, err := client.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged)) + require.NoError(t, err) + defer cSub.Close() + + select { + case stat := <-cSub.Out(): + if stat == network.ReachabilityUnknown { + t.Fatalf("After status update, client did not know its status") + } + case <-time.After(30 * time.Second): + t.Fatal("sub timed out.") + } +} diff --git a/p2p/host/autonat/test/dummy.go b/p2p/host/autonat/test/dummy.go new file mode 100644 index 0000000000..c3597f8160 --- /dev/null +++ b/p2p/host/autonat/test/dummy.go @@ -0,0 +1,3 @@ +package autonattest + +// needed so that go test ./... doesn't error diff --git a/p2p/host/autorelay/doc.go b/p2p/host/autorelay/doc.go index 2e93c3e8b8..4955dc5e1f 100644 --- a/p2p/host/autorelay/doc.go +++ b/p2p/host/autorelay/doc.go @@ -13,7 +13,7 @@ System Components: - The AutoRelay service. This is the service that actually: AutoNATService: https://github.com/libp2p/go-libp2p-autonat-svc -AutoNAT: https://github.com/libp2p/go-libp2p-autonat +AutoNAT: https://github.com/libp2p/go-libp2p/p2p/host/autonat How it works: - `AutoNATService` instances are instantiated in the bootstrappers (or other diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index c50d65837d..4c3f2be6f5 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -9,6 +9,7 @@ import ( "sync" "time" + autonat "github.com/libp2p/go-libp2p/p2p/host/autonat" "github.com/libp2p/go-libp2p/p2p/host/pstoremanager" "github.com/libp2p/go-libp2p/p2p/host/relaysvc" relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" @@ -28,7 +29,6 @@ import ( addrutil "github.com/libp2p/go-addr-util" "github.com/libp2p/go-eventbus" - autonat "github.com/libp2p/go-libp2p-autonat" inat "github.com/libp2p/go-libp2p-nat" "github.com/libp2p/go-netroute" diff --git a/p2p/host/basic/basic_host_test.go b/p2p/host/basic/basic_host_test.go index 9d05dfa1b5..74671fc818 100644 --- a/p2p/host/basic/basic_host_test.go +++ b/p2p/host/basic/basic_host_test.go @@ -11,7 +11,6 @@ import ( "time" "github.com/libp2p/go-eventbus" - autonat "github.com/libp2p/go-libp2p-autonat" "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/helpers" "github.com/libp2p/go-libp2p-core/host" @@ -22,6 +21,7 @@ import ( "github.com/libp2p/go-libp2p-core/record" "github.com/libp2p/go-libp2p-core/test" swarmt "github.com/libp2p/go-libp2p-swarm/testing" + "github.com/libp2p/go-libp2p/p2p/host/autonat" "github.com/libp2p/go-libp2p/p2p/protocol/identify" ma "github.com/multiformats/go-multiaddr"