diff --git a/p2p/host/autonat/autonat.go b/p2p/host/autonat/autonat.go index 9dc352c722..fc8c6763b4 100644 --- a/p2p/host/autonat/autonat.go +++ b/p2p/host/autonat/autonat.go @@ -31,8 +31,8 @@ type AmbientAutoNAT struct { ctxCancel context.CancelFunc // is closed when Close is called backgroundRunning chan struct{} // is closed when the background go routine exits - inboundConn chan network.Conn - observations chan network.Reachability + inboundConn chan network.Conn + dialResponses chan error // status is an autoNATResult reflecting current status. status atomic.Pointer[network.Reachability] // Reflects the confidence on of the NATStatus being private, as a single @@ -107,7 +107,7 @@ func New(h host.Host, options ...Option) (AutoNAT, error) { host: h, config: conf, inboundConn: make(chan network.Conn, 5), - observations: make(chan network.Reachability, 1), + dialResponses: make(chan error, 1), emitReachabilityChanged: emitReachabilityChanged, service: service, @@ -168,6 +168,7 @@ func (as *AmbientAutoNAT) background() { timer := time.NewTimer(delay) defer timer.Stop() timerRunning := true + retryProbe := false for { select { // new inbound connection. @@ -198,15 +199,20 @@ func (as *AmbientAutoNAT) background() { } // probe finished. - case result, ok := <-as.observations: + case err, ok := <-as.dialResponses: if !ok { return } - as.recordObservation(result) + if IsDialRefused(err) { + retryProbe = true + } else { + as.handleDialResponse(err) + } case <-timer.C: peer := as.getPeerToProbe() as.tryProbe(peer) timerRunning = false + retryProbe = false case <-as.ctx.Done(): return } @@ -215,7 +221,7 @@ func (as *AmbientAutoNAT) background() { if timerRunning && !timer.Stop() { <-timer.C } - timer.Reset(as.scheduleProbe()) + timer.Reset(as.scheduleProbe(retryProbe)) timerRunning = true } } @@ -230,10 +236,11 @@ func (as *AmbientAutoNAT) cleanupRecentProbes() { } // scheduleProbe calculates when the next probe should be scheduled for. -func (as *AmbientAutoNAT) scheduleProbe() time.Duration { +func (as *AmbientAutoNAT) scheduleProbe(retryProbe bool) time.Duration { // Our baseline is a probe every 'AutoNATRefreshInterval' // This is modulated by: - // * if we are in an unknown state, or have low confidence, that should drop to 'AutoNATRetryInterval' + // * if we are in an unknown state, have low confidence, or we want to retry because a probe was refused that + // should drop to 'AutoNATRetryInterval' // * recent inbound connections (implying continued connectivity) should decrease the retry when public // * recent inbound connections when not public mean we should try more actively to see if we're public. fixedNow := time.Now() @@ -249,7 +256,9 @@ func (as *AmbientAutoNAT) scheduleProbe() time.Duration { } if !as.lastProbe.IsZero() { untilNext := as.config.refreshInterval - if currentStatus == network.ReachabilityUnknown { + if retryProbe { + untilNext = as.config.retryInterval + } else if currentStatus == network.ReachabilityUnknown { untilNext = as.config.retryInterval } else if as.confidence < maxConfidence { untilNext = as.config.retryInterval @@ -269,8 +278,24 @@ func (as *AmbientAutoNAT) scheduleProbe() time.Duration { return nextProbe.Sub(fixedNow) } -// Update the current status based on an observed result. +// handleDialResponse updates the current status based on dial response. +func (as *AmbientAutoNAT) handleDialResponse(dialErr error) { + var observation network.Reachability + switch { + case dialErr == nil: + observation = network.ReachabilityPublic + case IsDialError(dialErr): + observation = network.ReachabilityPrivate + default: + observation = network.ReachabilityUnknown + } + + as.recordObservation(observation) +} + +// recordObservation updates NAT status and confidence func (as *AmbientAutoNAT) recordObservation(observation network.Reachability) { + currentStatus := *as.status.Load() if observation == network.ReachabilityPublic { @@ -359,21 +384,10 @@ func (as *AmbientAutoNAT) probe(pi *peer.AddrInfo) { defer cancel() err := cli.DialBack(ctx, pi.ID) - - var result network.Reachability - switch { - case err == nil: - log.Debugf("Dialback through %s successful", pi.ID.Pretty()) - result = network.ReachabilityPublic - case IsDialError(err): - log.Debugf("Dialback through %s failed", pi.ID.Pretty()) - result = network.ReachabilityPrivate - default: - result = network.ReachabilityUnknown - } + log.Debugf("Dialback through peer %s completed: err: %s", pi.ID, err) select { - case as.observations <- result: + case as.dialResponses <- err: case <-as.ctx.Done(): return } @@ -411,8 +425,7 @@ func (as *AmbientAutoNAT) getPeerToProbe() peer.ID { return "" } - shufflePeers(candidates) - return candidates[0] + return candidates[rand.Intn(len(candidates))] } func (as *AmbientAutoNAT) Close() error { @@ -424,13 +437,6 @@ func (as *AmbientAutoNAT) Close() error { return nil } -func shufflePeers(peers []peer.ID) { - for i := range peers { - j := rand.Intn(i + 1) - peers[i], peers[j] = peers[j], peers[i] - } -} - // Status returns the AutoNAT observed reachability status. func (s *StaticAutoNAT) Status() network.Reachability { return s.reachability diff --git a/p2p/host/autonat/autonat_test.go b/p2p/host/autonat/autonat_test.go index d173914814..617f6012af 100644 --- a/p2p/host/autonat/autonat_test.go +++ b/p2p/host/autonat/autonat_test.go @@ -42,6 +42,29 @@ func sayPrivateStreamHandler(t *testing.T) network.StreamHandler { } } +func makeAutoNATRefuseDialRequest(t *testing.T) host.Host { + h := bhost.NewBlankHost(swarmt.GenSwarm(t)) + h.SetStreamHandler(AutoNATProto, sayRefusedStreamHandler(t)) + return h +} + +func sayRefusedStreamHandler(t *testing.T) network.StreamHandler { + return func(s network.Stream) { + defer s.Close() + r := pbio.NewDelimitedReader(s, network.MessageSizeMax) + if err := r.ReadMsg(&pb.Message{}); err != nil { + t.Error(err) + return + } + w := pbio.NewDelimitedWriter(s) + res := pb.Message{ + Type: pb.Message_DIAL_RESPONSE.Enum(), + DialResponse: newDialResponseError(pb.Message_E_DIAL_REFUSED, "dial refused"), + } + w.WriteMsg(&res) + } +} + func makeAutoNATServicePublic(t *testing.T) host.Host { h := bhost.NewBlankHost(swarmt.GenSwarm(t)) h.SetStreamHandler(AutoNATProto, func(s network.Stream) { @@ -154,7 +177,7 @@ func TestAutoNATPublictoPrivate(t *testing.T) { // subscribe to AutoNat events s, err := hc.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{}) if err != nil { - t.Fatalf("failed to subscribe to event EvtLocalRoutabilityPublic, err=%s", err) + t.Fatalf("failed to subscribe to event EvtLocalReachabilityChanged, err=%s", err) } if status := an.Status(); status != network.ReachabilityUnknown { @@ -195,6 +218,36 @@ func TestAutoNATIncomingEvents(t *testing.T) { }, 500*time.Millisecond, 10*time.Millisecond, "Expected probe due to identification of autonat service") } +func TestAutoNATDialRefused(t *testing.T) { + hs := makeAutoNATServicePublic(t) + defer hs.Close() + hc, an := makeAutoNAT(t, hs) + defer hc.Close() + defer an.Close() + + // subscribe to AutoNat events + s, err := hc.EventBus().Subscribe(&event.EvtLocalReachabilityChanged{}) + if err != nil { + t.Fatalf("failed to subscribe to event EvtLocalReachabilityChanged, err=%s", err) + } + + if status := an.Status(); status != network.ReachabilityUnknown { + t.Fatalf("unexpected NAT status: %d", status) + } + + connect(t, hs, hc) + expectEvent(t, s, network.ReachabilityPublic, 10*time.Second) + + hs.SetStreamHandler(AutoNATProto, sayRefusedStreamHandler(t)) + hps := makeAutoNATRefuseDialRequest(t) + connect(t, hps, hc) + identifyAsServer(hps, hc) + + require.Never(t, func() bool { + return an.Status() != network.ReachabilityPublic + }, 3*time.Second, 1*time.Second, "Expected probe to not change reachability from public") +} + func TestAutoNATObservationRecording(t *testing.T) { hs := makeAutoNATServicePublic(t) defer hs.Close()