diff --git a/config/config.go b/config/config.go index a9a264158a..880efc9ee5 100644 --- a/config/config.go +++ b/config/config.go @@ -18,7 +18,6 @@ import ( "github.com/libp2p/go-libp2p-core/transport" "github.com/libp2p/go-libp2p-peerstore/pstoremem" - drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing" "github.com/libp2p/go-libp2p/p2p/host/autonat" "github.com/libp2p/go-libp2p/p2p/host/autorelay" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" @@ -101,8 +100,8 @@ type Config struct { Routing RoutingC EnableAutoRelay bool + AutoRelayOpts []autorelay.Option AutoNATConfig - StaticRelayOpt autorelay.StaticRelayOption EnableHolePunching bool HolePunchingOptions []holepunch.Option @@ -270,7 +269,7 @@ func (cfg *Config) NewNode() (host.Host, error) { } } - // Note: h.AddrsFactory may be changed by AutoRelay, but non-relay version is + // Note: h.AddrsFactory may be changed by relayFinder, but non-relay version is // used by AutoNAT below. var ar *autorelay.AutoRelay addrF := h.AddrsFactory @@ -280,22 +279,7 @@ func (cfg *Config) NewNode() (host.Host, error) { return nil, fmt.Errorf("cannot enable autorelay; relay is not enabled") } - var opts []autorelay.Option - if cfg.StaticRelayOpt != nil { - opts = append(opts, autorelay.Option(cfg.StaticRelayOpt)) - } else { - if router == nil { - h.Close() - return nil, fmt.Errorf("cannot enable autorelay; no routing for discovery") - } - crouter, ok := router.(routing.ContentRouting) - if !ok { - h.Close() - return nil, fmt.Errorf("cannot enable autorelay; no suitable routing for discovery") - } - opts = append(opts, autorelay.WithDiscoverer(drouting.NewRoutingDiscovery(crouter))) - } - ar, err = autorelay.NewAutoRelay(h, router, opts...) + ar, err = autorelay.NewAutoRelay(h, cfg.AutoRelayOpts...) if err != nil { return nil, err } diff --git a/options.go b/options.go index 6821f4ed4f..ce40fa5d34 100644 --- a/options.go +++ b/options.go @@ -250,15 +250,10 @@ func EnableRelayService(opts ...relayv2.Option) Option { // // This subsystem performs automatic address rewriting to advertise relay addresses when it // detects that the node is publicly unreachable (e.g. behind a NAT). -func EnableAutoRelay(opts ...autorelay.StaticRelayOption) Option { +func EnableAutoRelay(opts ...autorelay.Option) Option { return func(cfg *Config) error { - if len(opts) > 0 { - if len(opts) > 1 { - return errors.New("only expected a single static relay configuration option") - } - cfg.StaticRelayOpt = opts[0] - } cfg.EnableAutoRelay = true + cfg.AutoRelayOpts = opts return nil } } @@ -269,7 +264,7 @@ func EnableAutoRelay(opts ...autorelay.StaticRelayOption) Option { // Deprecated: pass an autorelay.WithStaticRelays option to EnableAutoRelay. func StaticRelays(relays []peer.AddrInfo) Option { return func(cfg *Config) error { - cfg.StaticRelayOpt = autorelay.WithStaticRelays(relays) + cfg.AutoRelayOpts = append(cfg.AutoRelayOpts, autorelay.WithStaticRelays(relays)) return nil } } diff --git a/p2p/host/autorelay/autorelay.go b/p2p/host/autorelay/autorelay.go index 7f81fa954b..bcf8432c8e 100644 --- a/p2p/host/autorelay/autorelay.go +++ b/p2p/host/autorelay/autorelay.go @@ -2,532 +2,145 @@ package autorelay import ( "context" - "errors" - "fmt" - "math/rand" "sync" - "sync/atomic" - "time" - "golang.org/x/sync/errgroup" + basic "github.com/libp2p/go-libp2p/p2p/host/basic" - "github.com/libp2p/go-libp2p-core/discovery" "github.com/libp2p/go-libp2p-core/event" + "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/routing" - - basic "github.com/libp2p/go-libp2p/p2p/host/basic" - relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay" - circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client" - circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" + logging "github.com/ipfs/go-log/v2" ma "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr/net" -) - -const ( - RelayRendezvous = "/libp2p/relay" - - rsvpRefreshInterval = time.Minute - rsvpExpirationSlack = 2 * time.Minute - - autorelayTag = "autorelay" - - protoIDv1 = string(relayv1.ProtoID) - protoIDv2 = string(circuitv2_proto.ProtoIDv2Hop) ) -var ( - DesiredRelays = 1 - - BootDelay = 20 * time.Second -) - -// DefaultRelays are the known PL-operated v1 relays; will be decommissioned in 2022. -var DefaultRelays = []string{ - "/ip4/147.75.80.110/tcp/4001/p2p/QmbFgm5zan8P6eWWmeyfncR5feYEMPbht5b1FW1C37aQ7y", - "/ip4/147.75.80.110/udp/4001/quic/p2p/QmbFgm5zan8P6eWWmeyfncR5feYEMPbht5b1FW1C37aQ7y", - "/ip4/147.75.195.153/tcp/4001/p2p/QmW9m57aiBDHAkKj9nmFSEn7ZqrcF1fZS4bipsTCHburei", - "/ip4/147.75.195.153/udp/4001/quic/p2p/QmW9m57aiBDHAkKj9nmFSEn7ZqrcF1fZS4bipsTCHburei", - "/ip4/147.75.70.221/tcp/4001/p2p/Qme8g49gm3q4Acp7xWBKg3nAa9fxZ1YmyDJdyGgoG6LsXh", - "/ip4/147.75.70.221/udp/4001/quic/p2p/Qme8g49gm3q4Acp7xWBKg3nAa9fxZ1YmyDJdyGgoG6LsXh", -} - -var defaultStaticRelays []peer.AddrInfo - -func init() { - for _, s := range DefaultRelays { - pi, err := peer.AddrInfoFromString(s) - if err != nil { - panic(fmt.Sprintf("failed to initialize default static relays: %s", err)) - } - defaultStaticRelays = append(defaultStaticRelays, *pi) - } -} - -type Option func(*AutoRelay) error -type StaticRelayOption Option - -func WithStaticRelays(static []peer.AddrInfo) StaticRelayOption { - return func(r *AutoRelay) error { - if len(r.static) > 0 { - return errors.New("can't set static relays, static relays already configured") - } - r.static = static - return nil - } -} - -func WithDefaultStaticRelays() StaticRelayOption { - return WithStaticRelays(defaultStaticRelays) -} +var log = logging.Logger("autorelay") -func WithDiscoverer(discover discovery.Discoverer) Option { - return func(r *AutoRelay) error { - r.discover = discover - return nil - } -} - -// AutoRelay is a Host that uses relays for connectivity when a NAT is detected. type AutoRelay struct { - host *basic.BasicHost - discover discovery.Discoverer - router routing.PeerRouting - addrsF basic.AddrsFactory - - static []peer.AddrInfo - refCount sync.WaitGroup + ctx context.Context ctxCancel context.CancelFunc - relayFound chan struct{} - findRelaysRunning int32 // to be used as an atomic + conf *config mx sync.Mutex - relays map[peer.ID]*circuitv2.Reservation // rsvp will be nil if it is a v1 relay status network.Reachability - cachedAddrs []ma.Multiaddr - cachedAddrsExpiry time.Time + relayFinder *relayFinder + + peerChanOut chan peer.AddrInfo // capacity 20 + + host host.Host + addrsF basic.AddrsFactory } -func NewAutoRelay(bhost *basic.BasicHost, router routing.PeerRouting, opts ...Option) (*AutoRelay, error) { - ctx, cancel := context.WithCancel(context.Background()) - ar := &AutoRelay{ - ctxCancel: cancel, - host: bhost, - router: router, - addrsF: bhost.AddrsFactory, - relays: make(map[peer.ID]*circuitv2.Reservation), - relayFound: make(chan struct{}, 1), - status: network.ReachabilityUnknown, +func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) { + r := &AutoRelay{ + host: bhost, + addrsF: bhost.AddrsFactory, + status: network.ReachabilityUnknown, } + conf := defaultConfig for _, opt := range opts { - if err := opt(ar); err != nil { + if err := opt(&conf); err != nil { return nil, err } } - bhost.AddrsFactory = ar.hostAddrs - ar.refCount.Add(1) - go ar.background(ctx) - return ar, nil -} + r.ctx, r.ctxCancel = context.WithCancel(context.Background()) + r.peerChanOut = make(chan peer.AddrInfo, conf.maxCandidates) + r.conf = &conf + r.relayFinder = newRelayFinder(bhost, r.peerChanOut, &conf) + bhost.AddrsFactory = r.hostAddrs -func (ar *AutoRelay) hostAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { - return ar.relayAddrs(ar.addrsF(addrs)) + r.refCount.Add(1) + go func() { + defer r.refCount.Done() + r.background() + }() + return r, nil } -func (ar *AutoRelay) background(ctx context.Context) { - defer ar.refCount.Done() - - subReachability, err := ar.host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged)) +func (r *AutoRelay) background() { + subReachability, err := r.host.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged)) if err != nil { - log.Error("failed to subscribe to the EvtLocalReachabilityChanged") + log.Debug("failed to subscribe to the EvtLocalReachabilityChanged") return } defer subReachability.Close() - subConnectedness, err := ar.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged)) - if err != nil { - log.Error("failed to subscribe to the EvtPeerConnectednessChanged") - return - } - defer subConnectedness.Close() - ticker := time.NewTicker(rsvpRefreshInterval) - defer ticker.Stop() + var peerChan <-chan peer.AddrInfo + if len(r.conf.staticRelays) == 0 { + peerChan = r.conf.peerChan + } else { + pc := make(chan peer.AddrInfo) + peerChan = pc + r.refCount.Add(1) + go func() { + defer r.refCount.Done() + for _, sr := range r.conf.staticRelays { + select { + case pc <- sr: + case <-r.ctx.Done(): + return + } + } + }() + } for { - // when true, we need to identify push - var push bool - select { - case ev, ok := <-subConnectedness.Out(): - if !ok { - return - } - evt := ev.(event.EvtPeerConnectednessChanged) - switch evt.Connectedness { - case network.Connected: - // If we just connect to one of our static relays, get a reservation immediately. - for _, pi := range ar.static { - if pi.ID == evt.Peer { - rsvp, ok := ar.tryRelay(ctx, pi) - if ok { - ar.mx.Lock() - ar.relays[pi.ID] = rsvp - ar.mx.Unlock() - } - push = true - break - } - } - case network.NotConnected: - ar.mx.Lock() - if ar.usingRelay(evt.Peer) { // we were disconnected from a relay - delete(ar.relays, evt.Peer) - push = true - } - ar.mx.Unlock() - } + case <-r.ctx.Done(): + return case ev, ok := <-subReachability.Out(): if !ok { return } + // TODO: push changed addresses evt := ev.(event.EvtLocalReachabilityChanged) - - if evt.Reachability == network.ReachabilityPrivate { - // findRelays is a long-lived task (runs up to 2.5 minutes) - // Make sure we only start it once. - if atomic.CompareAndSwapInt32(&ar.findRelaysRunning, 0, 1) { - go func() { - defer atomic.StoreInt32(&ar.findRelaysRunning, 0) - ar.findRelays(ctx) - }() + switch evt.Reachability { + case network.ReachabilityPrivate, network.ReachabilityUnknown: + if err := r.relayFinder.Start(); err != nil { + log.Error("failed to start relay finder") } + case network.ReachabilityPublic: + r.relayFinder.Stop() } - - ar.mx.Lock() - // if our reachability changed - if ar.status != evt.Reachability && evt.Reachability != network.ReachabilityUnknown { - push = true - } - ar.status = evt.Reachability - ar.mx.Unlock() - case <-ar.relayFound: - push = true - case now := <-ticker.C: - push = ar.refreshReservations(ctx, now) - case <-ctx.Done(): - return - } - - if push { - ar.mx.Lock() - ar.cachedAddrs = nil - ar.mx.Unlock() - ar.host.SignalAddressChange() - } - } -} - -func (ar *AutoRelay) refreshReservations(ctx context.Context, now time.Time) bool { - ar.mx.Lock() - if ar.status == network.ReachabilityPublic { - // we are public, forget about the relays, unprotect peers - for p := range ar.relays { - ar.host.ConnManager().Unprotect(p, autorelayTag) - delete(ar.relays, p) - } - - ar.mx.Unlock() - return true - } - - if len(ar.relays) == 0 { - ar.mx.Unlock() - return false - } - - // find reservations about to expire and refresh them in parallel - g := new(errgroup.Group) - for p, rsvp := range ar.relays { - if rsvp == nil { - // this is a circuitv1 relay, there is no reservation - continue - } - if now.Add(rsvpExpirationSlack).Before(rsvp.Expiration) { - continue - } - - p := p - g.Go(func() error { return ar.refreshRelayReservation(ctx, p) }) - } - ar.mx.Unlock() - - err := g.Wait() - return err != nil -} - -func (ar *AutoRelay) refreshRelayReservation(ctx context.Context, p peer.ID) error { - rsvp, err := circuitv2.Reserve(ctx, ar.host, peer.AddrInfo{ID: p}) - - ar.mx.Lock() - defer ar.mx.Unlock() - - if err != nil { - log.Debugf("failed to refresh relay slot reservation with %s: %s", p, err) - - delete(ar.relays, p) - // unprotect the connection - ar.host.ConnManager().Unprotect(p, autorelayTag) - } else { - log.Debugf("refreshed relay slot reservation with %s", p) - ar.relays[p] = rsvp - } - - return err -} - -func (ar *AutoRelay) findRelays(ctx context.Context) { - timer := time.NewTimer(30 * time.Second) - defer timer.Stop() - for retry := 0; retry < 5; retry++ { - if retry > 0 { - log.Debug("no relays connected; retrying in 30s") + r.mx.Lock() + r.status = evt.Reachability + r.mx.Unlock() + case pi := <-peerChan: select { - case <-timer.C: - case <-ctx.Done(): - return + case r.peerChanOut <- pi: // if there's space in the channel, great + default: + // no space left in the channel. Drop the oldest entry. + select { + case <-r.peerChanOut: + default: // The consumer might just have emptied the channel. Make sure we don't block in that case. + } + r.peerChanOut <- pi } } - - if foundAtLeastOneRelay := ar.findRelaysOnce(ctx); foundAtLeastOneRelay { - return - } - } -} - -func (ar *AutoRelay) findRelaysOnce(ctx context.Context) bool { - relays, err := ar.discoverRelays(ctx) - if err != nil { - log.Debugf("error discovering relays: %s", err) - return false - } - log.Debugf("discovered %d relays", len(relays)) - relays = ar.selectRelays(ctx, relays) - log.Debugf("selected %d relays", len(relays)) - - var found bool - for _, pi := range relays { - ar.mx.Lock() - relayInUse := ar.usingRelay(pi.ID) - ar.mx.Unlock() - if relayInUse { - continue - } - rsvp, ok := ar.tryRelay(ctx, pi) - if !ok { - continue - } - // make sure we're still connected. - if ar.host.Network().Connectedness(pi.ID) != network.Connected { - continue - } - found = true - ar.mx.Lock() - ar.relays[pi.ID] = rsvp - // protect the connection - ar.host.ConnManager().Protect(pi.ID, autorelayTag) - numRelays := len(ar.relays) - ar.mx.Unlock() - - if numRelays >= DesiredRelays { - break - } - } - if found { - ar.relayFound <- struct{}{} - return true - } - return false -} - -// usingRelay returns if we're currently using the given relay. -func (ar *AutoRelay) usingRelay(p peer.ID) bool { - _, ok := ar.relays[p] - return ok -} - -// addRelay adds the given relay to our set of relays. -// returns true when we add a new relay -func (ar *AutoRelay) tryRelay(ctx context.Context, pi peer.AddrInfo) (*circuitv2.Reservation, bool) { - if !ar.connect(ctx, pi) { - return nil, false - } - - protos, err := ar.host.Peerstore().SupportsProtocols(pi.ID, protoIDv1, protoIDv2) - if err != nil { - log.Debugf("error checking relay protocol support for peer %s: %s", pi.ID, err) - return nil, false - } - - var supportsv1, supportsv2 bool -protoLoop: - for _, proto := range protos { - switch proto { - case protoIDv1: - supportsv1 = true - case protoIDv2: - supportsv2 = true - break protoLoop - } - } - - switch { - case supportsv2: - rsvp, err := circuitv2.Reserve(ctx, ar.host, pi) - if err != nil { - log.Debugf("error reserving slot with %s: %s", pi.ID, err) - return nil, false - } - return rsvp, true - case supportsv1: - ok, err := relayv1.CanHop(ctx, ar.host, pi.ID) - if err != nil { - log.Debugf("error querying relay %s for v1 hop: %s", pi.ID, err) - return nil, false - } - return nil, ok - default: // supports neither, unusable relay. - return nil, false - } -} - -func (ar *AutoRelay) connect(ctx context.Context, pi peer.AddrInfo) bool { - ctx, cancel := context.WithTimeout(ctx, 60*time.Second) - defer cancel() - - if len(pi.Addrs) == 0 { - var err error - pi, err = ar.router.FindPeer(ctx, pi.ID) - if err != nil { - log.Debugf("error finding relay peer %s: %s", pi.ID, err.Error()) - return false - } - } - - err := ar.host.Connect(ctx, pi) - if err != nil { - log.Debugf("error connecting to relay %s: %s", pi.ID, err.Error()) - return false - } - - // wait for identify to complete in at least one conn so that we can check the supported protocols - conns := ar.host.Network().ConnsToPeer(pi.ID) - if len(conns) == 0 { - return false - } - - ready := make(chan struct{}, len(conns)) - for _, conn := range conns { - go func(conn network.Conn) { - select { - case <-ar.host.IDService().IdentifyWait(conn): - ready <- struct{}{} - case <-ctx.Done(): - } - }(conn) - } - - select { - case <-ready: - case <-ctx.Done(): - return false - } - - return true -} - -func (ar *AutoRelay) discoverRelays(ctx context.Context) ([]peer.AddrInfo, error) { - if len(ar.static) > 0 { - return ar.static, nil } - - ctx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - var ret []peer.AddrInfo - ch, err := ar.discover.FindPeers(ctx, RelayRendezvous, discovery.Limit(1000)) - if err != nil { - return nil, err - } - for p := range ch { - ret = append(ret, p) - } - return ret, nil } -func (ar *AutoRelay) selectRelays(ctx context.Context, pis []peer.AddrInfo) []peer.AddrInfo { - // TODO: better relay selection strategy; this just selects random relays, - // but we should probably use ping latency as the selection metric - rand.Shuffle(len(pis), func(i, j int) { - pis[i], pis[j] = pis[j], pis[i] - }) - return pis +func (r *AutoRelay) hostAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { + return r.relayAddrs(r.addrsF(addrs)) } -// This function is computes the NATed relay addrs when our status is private: -// - The public addrs are removed from the address set. -// - The non-public addrs are included verbatim so that peers behind the same NAT/firewall -// can still dial us directly. -// - On top of those, we add the relay-specific addrs for the relays to which we are -// connected. For each non-private relay addr, we encapsulate the p2p-circuit addr -// through which we can be dialed. -func (ar *AutoRelay) relayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { - ar.mx.Lock() - defer ar.mx.Unlock() +func (r *AutoRelay) relayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { + r.mx.Lock() + defer r.mx.Unlock() - if ar.status != network.ReachabilityPrivate { + if r.status != network.ReachabilityPrivate { return addrs } - - if ar.cachedAddrs != nil && time.Now().Before(ar.cachedAddrsExpiry) { - return ar.cachedAddrs - } - - raddrs := make([]ma.Multiaddr, 0, 4*len(ar.relays)+4) - - // only keep private addrs from the original addr set - for _, addr := range addrs { - if manet.IsPrivateAddr(addr) { - raddrs = append(raddrs, addr) - } - } - - // add relay specific addrs to the list - for p := range ar.relays { - addrs := cleanupAddressSet(ar.host.Peerstore().Addrs(p)) - - circuit, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s/p2p-circuit", p.Pretty())) - if err != nil { - panic(err) - } - - for _, addr := range addrs { - pub := addr.Encapsulate(circuit) - raddrs = append(raddrs, pub) - } - } - - ar.cachedAddrs = raddrs - ar.cachedAddrsExpiry = time.Now().Add(30 * time.Second) - - return raddrs + return r.relayFinder.relayAddrs(addrs) } -func (ar *AutoRelay) Close() error { - ar.ctxCancel() - ar.refCount.Wait() - return nil +func (r *AutoRelay) Close() error { + r.ctxCancel() + err := r.relayFinder.Stop() + r.refCount.Wait() + return err } diff --git a/p2p/host/autorelay/autorelay_test.go b/p2p/host/autorelay/autorelay_test.go index 1c836d720c..147630400f 100644 --- a/p2p/host/autorelay/autorelay_test.go +++ b/p2p/host/autorelay/autorelay_test.go @@ -1,132 +1,73 @@ package autorelay_test import ( - "context" - "net" "strings" - "sync" + "sync/atomic" "testing" "time" "github.com/libp2p/go-libp2p" - discovery "github.com/libp2p/go-libp2p/p2p/discovery/routing" "github.com/libp2p/go-libp2p/p2p/host/autorelay" relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay" + circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" - "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/routing" - "github.com/ipfs/go-cid" ma "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr/net" "github.com/stretchr/testify/require" ) -// test specific parameters -func init() { - autorelay.BootDelay = 1 * time.Second - autorelay.AdvertiseBootDelay = 100 * time.Millisecond -} - -// mock routing -type mockRoutingTable struct { - mx sync.Mutex - providers map[string]map[peer.ID]peer.AddrInfo - peers map[peer.ID]peer.AddrInfo -} - -func (t *mockRoutingTable) NumPeers() int { - t.mx.Lock() - defer t.mx.Unlock() - return len(t.peers) -} - -type mockRouting struct { - h host.Host - tab *mockRoutingTable -} - -func newMockRoutingTable() *mockRoutingTable { - return &mockRoutingTable{providers: make(map[string]map[peer.ID]peer.AddrInfo)} -} - -func newMockRouting(h host.Host, tab *mockRoutingTable) *mockRouting { - return &mockRouting{h: h, tab: tab} -} - -func (m *mockRouting) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) { - m.tab.mx.Lock() - defer m.tab.mx.Unlock() - pi, ok := m.tab.peers[p] - if !ok { - return peer.AddrInfo{}, routing.ErrNotFound - } - return pi, nil +func isRelayAddr(a ma.Multiaddr) (isRelay bool) { + ma.ForEach(a, func(c ma.Component) bool { + switch c.Protocol().Code { + case ma.P_CIRCUIT: + isRelay = true + return false + default: + return true + } + }) + return isRelay } -func (m *mockRouting) Provide(ctx context.Context, cid cid.Cid, bcast bool) error { - m.tab.mx.Lock() - defer m.tab.mx.Unlock() - - pmap, ok := m.tab.providers[cid.String()] - if !ok { - pmap = make(map[peer.ID]peer.AddrInfo) - m.tab.providers[cid.String()] = pmap - } - - pi := peer.AddrInfo{ID: m.h.ID(), Addrs: m.h.Addrs()} - pmap[m.h.ID()] = pi - if m.tab.peers == nil { - m.tab.peers = make(map[peer.ID]peer.AddrInfo) - } - m.tab.peers[m.h.ID()] = pi - - return nil +func newPrivateNode(t *testing.T, opts ...autorelay.Option) host.Host { + t.Helper() + h, err := libp2p.New( + libp2p.ForceReachabilityPrivate(), + libp2p.EnableAutoRelay(opts...), + ) + require.NoError(t, err) + return h } -func (m *mockRouting) FindProvidersAsync(ctx context.Context, cid cid.Cid, limit int) <-chan peer.AddrInfo { - ch := make(chan peer.AddrInfo) - go func() { - defer close(ch) - m.tab.mx.Lock() - defer m.tab.mx.Unlock() - - pmap, ok := m.tab.providers[cid.String()] - if !ok { - return - } - - for _, pi := range pmap { - select { - case ch <- pi: - case <-ctx.Done(): - return +func newRelay(t *testing.T) host.Host { + t.Helper() + h, err := libp2p.New( + libp2p.DisableRelay(), + libp2p.EnableRelayService(), + libp2p.ForceReachabilityPublic(), + libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr { + for i, addr := range addrs { + saddr := addr.String() + if strings.HasPrefix(saddr, "/ip4/127.0.0.1/") { + addrNoIP := strings.TrimPrefix(saddr, "/ip4/127.0.0.1") + addrs[i] = ma.StringCast("/dns4/localhost" + addrNoIP) + } } - } - }() - - return ch -} - -func connect(t *testing.T, a, b host.Host) { - pinfo := peer.AddrInfo{ID: a.ID(), Addrs: a.Addrs()} - require.NoError(t, b.Connect(context.Background(), pinfo)) + return addrs + }), + ) + require.NoError(t, err) + return h } -// and the actual test! -func TestAutoRelay(t *testing.T) { - private4 := manet.Private4 - t.Cleanup(func() { manet.Private4 = private4 }) - manet.Private4 = []*net.IPNet{} - - // this is the relay host - // announce dns addrs because filter out private addresses from relays, - // and we consider dns addresses "public". - relayHost, err := libp2p.New( +// creates a node that speaks the relay v2 protocol, but doesn't accept any reservations for the first workAfter tries +func newBrokenRelay(t *testing.T, workAfter int) host.Host { + t.Helper() + h, err := libp2p.New( libp2p.DisableRelay(), libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr { for i, addr := range addrs { @@ -137,87 +78,167 @@ func TestAutoRelay(t *testing.T) { } } return addrs - })) + }), + libp2p.EnableRelayService(), + ) require.NoError(t, err) - defer relayHost.Close() - - t.Run("with a circuitv1 relay", func(t *testing.T) { - r, err := relayv1.NewRelay(relayHost) - require.NoError(t, err) - defer r.Close() - testAutoRelay(t, relayHost) - }) - t.Run("testing autorelay with circuitv2 relay", func(t *testing.T) { - r, err := relayv2.New(relayHost) - require.NoError(t, err) - defer r.Close() - testAutoRelay(t, relayHost) + var n int32 + h.SetStreamHandler(circuitv2_proto.ProtoIDv2Hop, func(str network.Stream) { + str.Reset() + num := atomic.AddInt32(&n, 1) + if int(num) >= workAfter { + h.RemoveStreamHandler(circuitv2_proto.ProtoIDv2Hop) + r, err := relayv2.New(h) + require.NoError(t, err) + t.Cleanup(func() { r.Close() }) + } }) + return h } -func isRelayAddr(addr ma.Multiaddr) bool { - _, err := addr.ValueForProtocol(ma.P_CIRCUIT) - return err == nil +func TestSingleRelay(t *testing.T) { + const numPeers = 5 + peerChan := make(chan peer.AddrInfo) + done := make(chan struct{}) + go func() { + defer close(done) + for i := 0; i < numPeers; i++ { + r := newRelay(t) + t.Cleanup(func() { r.Close() }) + peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()} + } + }() + h := newPrivateNode(t, + autorelay.WithPeerSource(peerChan), + autorelay.WithMaxCandidates(1), + autorelay.WithNumRelays(99999), + autorelay.WithBootDelay(0), + ) + defer h.Close() + + require.Eventually(t, func() bool { + return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0 + }, 3*time.Second, 100*time.Millisecond) + <-done + // test that we don't add any more relays + require.Never(t, func() bool { + return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) != 1 + }, 200*time.Millisecond, 50*time.Millisecond) } -func testAutoRelay(t *testing.T, relayHost host.Host) { - mtab := newMockRoutingTable() - makeRouting := func(h host.Host) (*mockRouting, error) { - mr := newMockRouting(h, mtab) - return mr, nil - } - makePeerRouting := func(h host.Host) (routing.PeerRouting, error) { - return makeRouting(h) - } +func TestPreferRelayV2(t *testing.T) { + r := newRelay(t) + defer r.Close() + // The relay supports both v1 and v2. The v1 stream handler should never be called, + // if we prefer v2 relays. + r.SetStreamHandler(relayv1.ProtoID, func(str network.Stream) { + str.Reset() + t.Fatal("used relay v1") + }) + peerChan := make(chan peer.AddrInfo, 1) + peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()} + h := newPrivateNode(t, + autorelay.WithPeerSource(peerChan), + autorelay.WithMaxCandidates(1), + autorelay.WithNumRelays(99999), + autorelay.WithBootDelay(0), + ) + defer h.Close() + + require.Eventually(t, func() bool { + return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0 + }, 3*time.Second, 100*time.Millisecond) +} - // advertise the relay - relayRouting, err := makeRouting(relayHost) - require.NoError(t, err) - relayDiscovery := discovery.NewRoutingDiscovery(relayRouting) - autorelay.Advertise(context.Background(), relayDiscovery) - require.Eventually(t, func() bool { return mtab.NumPeers() > 0 }, time.Second, 10*time.Millisecond) +func TestWaitForCandidates(t *testing.T) { + peerChan := make(chan peer.AddrInfo) + h := newPrivateNode(t, + autorelay.WithPeerSource(peerChan), + autorelay.WithMinCandidates(2), + autorelay.WithNumRelays(1), + autorelay.WithBootDelay(time.Hour), + ) + defer h.Close() + + r1 := newRelay(t) + t.Cleanup(func() { r1.Close() }) + peerChan <- peer.AddrInfo{ID: r1.ID(), Addrs: r1.Addrs()} + + // make sure we don't add any relays yet + // We need to wait until we have at least 2 candidates before we connect. + require.Never(t, func() bool { + return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0 + }, 200*time.Millisecond, 50*time.Millisecond) + + r2 := newRelay(t) + t.Cleanup(func() { r2.Close() }) + peerChan <- peer.AddrInfo{ID: r2.ID(), Addrs: r2.Addrs()} + require.Eventually(t, func() bool { + return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0 + }, 3*time.Second, 100*time.Millisecond) +} - // the client hosts - h1, err := libp2p.New(libp2p.EnableRelay()) - require.NoError(t, err) - defer h1.Close() +func TestBackoff(t *testing.T) { + peerChan := make(chan peer.AddrInfo) + h := newPrivateNode(t, + autorelay.WithPeerSource(peerChan), + autorelay.WithNumRelays(1), + autorelay.WithBootDelay(0), + autorelay.WithBackoff(500*time.Millisecond), + ) + defer h.Close() + + r1 := newBrokenRelay(t, 1) + t.Cleanup(func() { r1.Close() }) + peerChan <- peer.AddrInfo{ID: r1.ID(), Addrs: r1.Addrs()} + + // make sure we don't add any relays yet + require.Never(t, func() bool { + return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0 + }, 400*time.Millisecond, 50*time.Millisecond) + + require.Eventually(t, func() bool { + return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0 + }, 400*time.Millisecond, 50*time.Millisecond) +} - h2, err := libp2p.New(libp2p.EnableRelay(), libp2p.EnableAutoRelay(), libp2p.Routing(makePeerRouting)) - require.NoError(t, err) - defer h2.Close() +func TestMaxBackoffs(t *testing.T) { + peerChan := make(chan peer.AddrInfo) + h := newPrivateNode(t, + autorelay.WithPeerSource(peerChan), + autorelay.WithNumRelays(1), + autorelay.WithBootDelay(0), + autorelay.WithBackoff(25*time.Millisecond), + autorelay.WithMaxAttempts(3), + ) + defer h.Close() + + r := newBrokenRelay(t, 4) + t.Cleanup(func() { r.Close() }) + peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()} + + // make sure we don't add any relays yet + require.Never(t, func() bool { + return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0 + }, 300*time.Millisecond, 50*time.Millisecond) +} - // verify that we don't advertise relay addrs initially - for _, addr := range h2.Addrs() { - if isRelayAddr(addr) { - t.Fatal("relay addr advertised before auto detection") - } +func TestStaticRelays(t *testing.T) { + const numRelays = 3 + var staticRelays []peer.AddrInfo + for i := 0; i < numRelays; i++ { + r := newRelay(t) + t.Cleanup(func() { r.Close() }) + staticRelays = append(staticRelays, peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}) } - // connect to AutoNAT, have it resolve to private. - connect(t, h1, h2) - privEmitter, _ := h2.EventBus().Emitter(new(event.EvtLocalReachabilityChanged)) - privEmitter.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPrivate}) - - hasRelayAddrs := func(t *testing.T, addrs []ma.Multiaddr) bool { - unspecificRelay := ma.StringCast("/p2p-circuit") - for _, addr := range addrs { - if addr.Equal(unspecificRelay) { - t.Fatal("unspecific relay addr advertised") - } - if isRelayAddr(addr) { - return true - } - } - return false - } - // Wait for detection to do its magic - require.Eventually(t, func() bool { return hasRelayAddrs(t, h2.Addrs()) }, 3*time.Second, 10*time.Millisecond) - // verify that we have pushed relay addrs to connected peers - require.Eventually(t, func() bool { return hasRelayAddrs(t, h1.Peerstore().Addrs(h2.ID())) }, time.Second, 10*time.Millisecond, "no relay addrs pushed") + h := newPrivateNode(t, + autorelay.WithStaticRelays(staticRelays), + autorelay.WithNumRelays(1), + ) + defer h.Close() - // verify that we can connect through the relay - h3, err := libp2p.New(libp2p.EnableRelay()) - require.NoError(t, err) - defer h3.Close() - require.NoError(t, h3.Connect(context.Background(), peer.AddrInfo{ID: h2.ID(), Addrs: ma.FilterAddrs(h2.Addrs(), isRelayAddr)})) + require.Eventually(t, func() bool { + return len(ma.FilterAddrs(h.Addrs(), isRelayAddr)) > 0 + }, 2*time.Second, 50*time.Millisecond) } diff --git a/p2p/host/autorelay/doc.go b/p2p/host/autorelay/doc.go deleted file mode 100644 index 4955dc5e1f..0000000000 --- a/p2p/host/autorelay/doc.go +++ /dev/null @@ -1,28 +0,0 @@ -/* -The relay package contains the components necessary to implement the "autorelay" -feature. - -Warning: the internal interfaces are unstable. - -System Components: -- A discovery service to discover public relays. -- An AutoNAT client used to determine if the node is behind a NAT/firewall. -- One or more autonat services, instances of `AutoNATServices`. These are used - by the autonat client. -- One or more relays, instances of `RelayHost`. -- The AutoRelay service. This is the service that actually: - -AutoNATService: https://github.com/libp2p/go-libp2p-autonat-svc -AutoNAT: https://github.com/libp2p/go-libp2p/p2p/host/autonat - -How it works: -- `AutoNATService` instances are instantiated in the bootstrappers (or other - well known publicly reachable hosts) -- `AutoRelay`s are constructed with `libp2p.New(libp2p.Routing(makeDHT))` - They passively discover autonat service instances and test dialability of - their listen address set through them. When the presence of NAT is detected, - they discover relays through the DHT, connect to some of them and begin - advertising relay addresses. The new set of addresses is propagated to - connected peers through the `identify/push` protocol. -*/ -package autorelay diff --git a/p2p/host/autorelay/log.go b/p2p/host/autorelay/log.go deleted file mode 100644 index 9c4e5ed52c..0000000000 --- a/p2p/host/autorelay/log.go +++ /dev/null @@ -1,7 +0,0 @@ -package autorelay - -import ( - logging "github.com/ipfs/go-log/v2" -) - -var log = logging.Logger("autorelay") diff --git a/p2p/host/autorelay/options.go b/p2p/host/autorelay/options.go new file mode 100644 index 0000000000..91be40084a --- /dev/null +++ b/p2p/host/autorelay/options.go @@ -0,0 +1,146 @@ +package autorelay + +import ( + "errors" + "fmt" + "time" + + "github.com/libp2p/go-libp2p-core/peer" +) + +type config struct { + peerChan <-chan peer.AddrInfo + staticRelays []peer.AddrInfo + // see WithMinCandidates + minCandidates int + // see WithMaxCandidates + maxCandidates int + // Delay until we obtain reservations with relays, if we have less than minCandidates candidates. + // See WithBootDelay. + bootDelay time.Duration + // backoff is the time we wait after failing to obtain a reservation with a candidate + backoff time.Duration + // If we fail to obtain a reservation more than maxAttempts, we stop trying. + maxAttempts int + // Number of relays we strive to obtain a reservation with. + desiredRelays int + setMinCandidates bool +} + +var defaultConfig = config{ + minCandidates: 4, + maxCandidates: 20, + bootDelay: 3 * time.Minute, + backoff: time.Hour, + maxAttempts: 3, + desiredRelays: 2, +} + +var errStaticRelaysMinCandidates = errors.New("cannot use WithMinCandidates and WithStaticRelays") + +// DefaultRelays are the known PL-operated v1 relays; will be decommissioned in 2022. +var DefaultRelays = []string{ + "/ip4/147.75.80.110/tcp/4001/p2p/QmbFgm5zan8P6eWWmeyfncR5feYEMPbht5b1FW1C37aQ7y", + "/ip4/147.75.80.110/udp/4001/quic/p2p/QmbFgm5zan8P6eWWmeyfncR5feYEMPbht5b1FW1C37aQ7y", + "/ip4/147.75.195.153/tcp/4001/p2p/QmW9m57aiBDHAkKj9nmFSEn7ZqrcF1fZS4bipsTCHburei", + "/ip4/147.75.195.153/udp/4001/quic/p2p/QmW9m57aiBDHAkKj9nmFSEn7ZqrcF1fZS4bipsTCHburei", + "/ip4/147.75.70.221/tcp/4001/p2p/Qme8g49gm3q4Acp7xWBKg3nAa9fxZ1YmyDJdyGgoG6LsXh", + "/ip4/147.75.70.221/udp/4001/quic/p2p/Qme8g49gm3q4Acp7xWBKg3nAa9fxZ1YmyDJdyGgoG6LsXh", +} + +var defaultStaticRelays []peer.AddrInfo + +func init() { + for _, s := range DefaultRelays { + pi, err := peer.AddrInfoFromString(s) + if err != nil { + panic(fmt.Sprintf("failed to initialize default static relays: %s", err)) + } + defaultStaticRelays = append(defaultStaticRelays, *pi) + } +} + +type Option func(*config) error + +func WithStaticRelays(static []peer.AddrInfo) Option { + return func(c *config) error { + if c.setMinCandidates { + return errStaticRelaysMinCandidates + } + if len(c.staticRelays) > 0 { + return errors.New("can't set static relays, static relays already configured") + } + c.minCandidates = len(static) + c.staticRelays = static + return nil + } +} + +func WithDefaultStaticRelays() Option { + return WithStaticRelays(defaultStaticRelays) +} + +func WithPeerSource(peerChan <-chan peer.AddrInfo) Option { + return func(c *config) error { + c.peerChan = peerChan + return nil + } +} + +// WithNumRelays sets the number of relays we strive to obtain reservations with. +func WithNumRelays(n int) Option { + return func(c *config) error { + c.desiredRelays = n + return nil + } +} + +// WithMaxCandidates sets the number of relay candidates that we buffer. +func WithMaxCandidates(n int) Option { + return func(c *config) error { + c.maxCandidates = n + return nil + } +} + +// WithMinCandidates sets the minimum number of relay candidates we collect before to get a reservation +// with any of them (unless we've been running for longer than the boot delay). +// This is to make sure that we don't just randomly connect to the first candidate that we discover. +func WithMinCandidates(n int) Option { + return func(c *config) error { + if len(c.staticRelays) > 0 { + return errStaticRelaysMinCandidates + } + c.minCandidates = n + c.setMinCandidates = true + return nil + } +} + +// WithBootDelay set the boot delay for finding relays. +// We won't attempt any reservation if we've have less than a minimum number of candidates. +// This prevents us to connect to the "first best" relay, and allows us to carefully select the relay. +// However, in case we haven't found enough relays after the boot delay, we use what we have. +func WithBootDelay(d time.Duration) Option { + return func(c *config) error { + c.bootDelay = d + return nil + } +} + +// WithBackoff sets the time we wait after failing to obtain a reservation with a candidate. +func WithBackoff(d time.Duration) Option { + return func(c *config) error { + c.backoff = d + return nil + } +} + +// WithMaxAttempts sets the number of times we attempt to obtain a reservation with a candidate. +// If we still fail to obtain a reservation, this candidate is dropped. +func WithMaxAttempts(n int) Option { + return func(c *config) error { + c.maxAttempts = n + return nil + } +} diff --git a/p2p/host/autorelay/relay.go b/p2p/host/autorelay/relay.go index 4d7fb0adee..db0d97ec01 100644 --- a/p2p/host/autorelay/relay.go +++ b/p2p/host/autorelay/relay.go @@ -1,55 +1,9 @@ package autorelay import ( - "context" - "time" - - "github.com/libp2p/go-libp2p-core/discovery" - ma "github.com/multiformats/go-multiaddr" ) -var ( - // this is purposefully long to require some node stability before advertising as a relay - AdvertiseBootDelay = 15 * time.Minute - AdvertiseTTL = 30 * time.Minute -) - -// Advertise advertises this node as a libp2p relay. -func Advertise(ctx context.Context, advertise discovery.Advertiser) { - go func() { - select { - case <-time.After(AdvertiseBootDelay): - go func() { - for { - ttl, err := advertise.Advertise(ctx, RelayRendezvous, discovery.TTL(AdvertiseTTL)) - if err != nil { - log.Debugf("Error advertising %s: %s", RelayRendezvous, err.Error()) - if ctx.Err() != nil { - return - } - - select { - case <-time.After(2 * time.Minute): - continue - case <-ctx.Done(): - return - } - } - - wait := 7 * ttl / 8 - select { - case <-time.After(wait): - case <-ctx.Done(): - return - } - } - }() - case <-ctx.Done(): - } - }() -} - // Filter filters out all relay addresses. func Filter(addrs []ma.Multiaddr) []ma.Multiaddr { raddrs := make([]ma.Multiaddr, 0, len(addrs)) diff --git a/p2p/host/autorelay/relay_finder.go b/p2p/host/autorelay/relay_finder.go new file mode 100644 index 0000000000..f803fcf7c9 --- /dev/null +++ b/p2p/host/autorelay/relay_finder.go @@ -0,0 +1,548 @@ +package autorelay + +import ( + "context" + "errors" + "fmt" + "math/rand" + "sync" + "time" + + "golang.org/x/sync/errgroup" + + basic "github.com/libp2p/go-libp2p/p2p/host/basic" + relayv1 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv1/relay" + circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client" + circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" + + "github.com/libp2p/go-libp2p-core/event" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +const ( + protoIDv1 = string(relayv1.ProtoID) + protoIDv2 = string(circuitv2_proto.ProtoIDv2Hop) +) + +// Terminology: +// Candidate: Once we connect to a node and it supports (v1 / v2) relay protocol, +// we call it a candidate, and consider using it as a relay. +// Relay: Out of the list of candidates, we select a relay to connect to. +// Currently, we just randomly select a candidate, but we can employ more sophisticated +// selection strategies here (e.g. by facotring in the RTT). + +const ( + rsvpRefreshInterval = time.Minute + rsvpExpirationSlack = 2 * time.Minute + + autorelayTag = "autorelay" +) + +type candidate struct { + added time.Time + supportsRelayV2 bool + ai peer.AddrInfo + numAttempts int +} + +type candidateOnBackoff struct { + candidate + nextConnAttempt time.Time +} + +// relayFinder is a Host that uses relays for connectivity when a NAT is detected. +type relayFinder struct { + bootTime time.Time + host *basic.BasicHost + + conf *config + + refCount sync.WaitGroup + ctxCancel context.CancelFunc + + peerChan <-chan peer.AddrInfo + + candidateFound chan struct{} // receives every time we find a new relay candidate + candidateMx sync.Mutex + candidates map[peer.ID]*candidate + candidatesOnBackoff []*candidateOnBackoff // this slice is always sorted by the nextConnAttempt time + + relayUpdated chan struct{} + + relayMx sync.Mutex + relays map[peer.ID]*circuitv2.Reservation // rsvp will be nil if it is a v1 relay + + cachedAddrs []ma.Multiaddr + cachedAddrsExpiry time.Time +} + +func newRelayFinder(host *basic.BasicHost, peerChan <-chan peer.AddrInfo, conf *config) *relayFinder { + r := &relayFinder{ + bootTime: time.Now(), + host: host, + conf: conf, + peerChan: peerChan, + candidates: make(map[peer.ID]*candidate), + candidateFound: make(chan struct{}, 1), + relays: make(map[peer.ID]*circuitv2.Reservation), + relayUpdated: make(chan struct{}, 1), + } + return r +} + +func (rf *relayFinder) background(ctx context.Context) { + rf.refCount.Add(1) + go func() { + defer rf.refCount.Done() + rf.findNodes(ctx) + }() + + subConnectedness, err := rf.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged)) + if err != nil { + log.Error("failed to subscribe to the EvtPeerConnectednessChanged") + return + } + defer subConnectedness.Close() + + bootDelayTimer := time.NewTimer(rf.conf.bootDelay) + defer bootDelayTimer.Stop() + refreshTicker := time.NewTicker(rsvpRefreshInterval) + defer refreshTicker.Stop() + backoffTicker := time.NewTicker(rf.conf.backoff / 5) + defer backoffTicker.Stop() + + for { + // when true, we need to identify push + var push bool + + select { + case ev, ok := <-subConnectedness.Out(): + if !ok { + return + } + evt := ev.(event.EvtPeerConnectednessChanged) + if evt.Connectedness != network.NotConnected { + continue + } + rf.relayMx.Lock() + if rf.usingRelay(evt.Peer) { // we were disconnected from a relay + log.Debugw("disconnected from relay", "id", evt.Peer) + delete(rf.relays, evt.Peer) + push = true + } + rf.relayMx.Unlock() + case <-rf.candidateFound: + rf.handleNewCandidate(ctx) + case <-bootDelayTimer.C: + rf.handleNewCandidate(ctx) + case <-rf.relayUpdated: + push = true + case now := <-refreshTicker.C: + push = rf.refreshReservations(ctx, now) + case now := <-backoffTicker.C: + rf.checkForCandidatesOnBackoff(now) + case <-ctx.Done(): + return + } + + if push { + rf.relayMx.Lock() + rf.cachedAddrs = nil + rf.relayMx.Unlock() + rf.host.SignalAddressChange() + } + } +} + +// findNodes accepts nodes from the channel and tests if they support relaying. +// It is run on both public and private nodes. +// It garbage collects old entries, so that nodes doesn't overflow. +// This makes sure that as soon as we need to find relay candidates, we have them available. +func (rf *relayFinder) findNodes(ctx context.Context) { + for { + select { + case pi := <-rf.peerChan: + log.Debugw("found node", "id", pi.ID) + rf.candidateMx.Lock() + numCandidates := len(rf.candidates) + rf.candidateMx.Unlock() + if numCandidates >= rf.conf.maxCandidates { + log.Debugw("skipping node. Already have enough candidates", "id", pi.ID, "num", numCandidates, "max", rf.conf.maxCandidates) + continue + } + rf.refCount.Add(1) + go func() { + defer rf.refCount.Done() + rf.handleNewNode(ctx, pi) + }() + case <-ctx.Done(): + return + } + } +} + +func (rf *relayFinder) notifyNewCandidate() { + select { + case rf.candidateFound <- struct{}{}: + default: + } +} + +// handleNewNode tests if a peer supports circuit v1 or v2. +// This method is only run on private nodes. +// If a peer does, it is added to the candidates map. +// Note that just supporting the protocol doesn't guarantee that we can also obtain a reservation. +func (rf *relayFinder) handleNewNode(ctx context.Context, pi peer.AddrInfo) { + rf.relayMx.Lock() + relayInUse := rf.usingRelay(pi.ID) + rf.relayMx.Unlock() + if relayInUse { + return + } + + ctx, cancel := context.WithTimeout(ctx, 20*time.Second) + defer cancel() + supportsV2, err := rf.tryNode(ctx, pi) + if err != nil { + log.Debugf("node %s not accepted as a candidate: %s", pi.ID, err) + return + } + rf.candidateMx.Lock() + if len(rf.candidates) > rf.conf.maxCandidates { + rf.candidateMx.Unlock() + return + } + log.Debugw("node supports relay protocol", "peer", pi.ID, "supports circuit v2", supportsV2) + rf.candidates[pi.ID] = &candidate{ai: pi, supportsRelayV2: supportsV2} + rf.candidateMx.Unlock() + + rf.notifyNewCandidate() +} + +// tryNode checks if a peer actually supports either circuit v1 or circuit v2. +// It does not modify any internal state. +func (rf *relayFinder) tryNode(ctx context.Context, pi peer.AddrInfo) (supportsRelayV2 bool, err error) { + if err := rf.host.Connect(ctx, pi); err != nil { + return false, fmt.Errorf("error connecting to relay %s: %w", pi.ID, err) + } + + conns := rf.host.Network().ConnsToPeer(pi.ID) + for _, conn := range conns { + if isRelayAddr(conn.RemoteMultiaddr()) { + return false, errors.New("not a public node") + } + } + + // wait for identify to complete in at least one conn so that we can check the supported protocols + ready := make(chan struct{}, 1) + for _, conn := range conns { + go func(conn network.Conn) { + select { + case <-rf.host.IDService().IdentifyWait(conn): + select { + case ready <- struct{}{}: + default: + } + case <-ctx.Done(): + } + }(conn) + } + + select { + case <-ready: + case <-ctx.Done(): + return false, ctx.Err() + } + + protos, err := rf.host.Peerstore().SupportsProtocols(pi.ID, protoIDv1, protoIDv2) + if err != nil { + return false, fmt.Errorf("error checking relay protocol support for peer %s: %w", pi.ID, err) + } + + // If the node speaks both, prefer circuit v2 + var supportsV1, supportsV2 bool + for _, proto := range protos { + switch proto { + case protoIDv1: + supportsV1 = true + case protoIDv2: + supportsV2 = true + } + } + if !supportsV1 && !supportsV2 { + return false, errors.New("doesn't speak circuit v1 or v2") + } + return supportsV2, nil +} + +func (rf *relayFinder) handleNewCandidate(ctx context.Context) { + rf.relayMx.Lock() + defer rf.relayMx.Unlock() + if len(rf.candidates) == 0 { + return + } + // We're already connected to our desired number of relays. Nothing to do here. + if len(rf.relays) == rf.conf.desiredRelays { + return + } + + if len(rf.conf.staticRelays) != 0 { + // make sure we read all static relays before continuing + if len(rf.peerChan) > 0 && len(rf.candidates) < rf.conf.minCandidates && time.Since(rf.bootTime) < rf.conf.bootDelay { + return + } + } else if len(rf.relays) == 0 && len(rf.candidates) < rf.conf.minCandidates && time.Since(rf.bootTime) < rf.conf.bootDelay { + // During the startup phase, we don't want to connect to the first candidate that we find. + // Instead, we wait until we've found at least minCandidates, and then select the best of those. + // However, if that takes too long (longer than bootDelay), we still go ahead. + return + } + + candidates := rf.selectCandidates() + + // We now iterate over the candidates, attempting (sequentially) to get reservations with them, until + // we reach the desired number of relays. + rf.refCount.Add(1) + go func() { + defer rf.refCount.Done() + + for _, cand := range candidates { + id := cand.ai.ID + var failed bool + var rsvp *circuitv2.Reservation + + // make sure we're still connected. + if rf.host.Network().Connectedness(id) != network.Connected { + if err := rf.host.Connect(ctx, cand.ai); err != nil { + log.Debugw("failed to reconnect", "peer", cand.ai, "error", err) + rf.candidateMx.Lock() + delete(rf.candidates, cand.ai.ID) + rf.candidateMx.Unlock() + continue + } + } + if cand.supportsRelayV2 { + var err error + rsvp, err = circuitv2.Reserve(ctx, rf.host, cand.ai) + if err != nil { + failed = true + log.Debugw("failed to reserve slot", "id", id, "error", err) + } + } else { + ok, err := relayv1.CanHop(ctx, rf.host, id) + if err != nil { + failed = true + log.Debugw("error querying relay for v1 hop", "id", id, "error", err) + } + if !ok { + failed = true + log.Debugw("relay can't hop", "id", id) + } + } + rf.candidateMx.Lock() + if failed { + cand.numAttempts++ + delete(rf.candidates, id) + // We failed to obtain a reservation for too many times. We give up. + if cand.numAttempts >= rf.conf.maxAttempts { + log.Debugw("failed to obtain a reservation with. Giving up.", "id", id, "num attempts", cand.numAttempts) + } else { + rf.moveCandidateToBackoff(cand) + } + rf.candidateMx.Unlock() + continue + } + rf.candidateMx.Unlock() + log.Debugw("adding new relay", "id", id) + rf.relayMx.Lock() + rf.relays[id] = rsvp + numRelays := len(rf.relays) + rf.relayMx.Unlock() + + rf.host.ConnManager().Protect(id, autorelayTag) // protect the connection + + select { + case rf.relayUpdated <- struct{}{}: + default: + } + if numRelays >= rf.conf.desiredRelays { + break + } + } + }() +} + +// must be called with mutex locked +func (rf *relayFinder) moveCandidateToBackoff(cand *candidate) { + if len(rf.candidatesOnBackoff) >= rf.conf.maxCandidates { + log.Debugw("already have enough candidates on backoff. Dropping.", "id", cand.ai.ID) + return + } + log.Debugw("moving candidate to backoff", "id", cand.ai.ID) + backoff := rf.conf.backoff * (1 << (cand.numAttempts - 1)) + // introduce a bit of jitter + backoff = (backoff * time.Duration(16+rand.Intn(8))) / time.Duration(20) + rf.candidatesOnBackoff = append(rf.candidatesOnBackoff, &candidateOnBackoff{ + candidate: *cand, + nextConnAttempt: time.Now().Add(backoff), + }) +} + +func (rf *relayFinder) checkForCandidatesOnBackoff(now time.Time) { + rf.candidateMx.Lock() + defer rf.candidateMx.Unlock() + + for _, cand := range rf.candidatesOnBackoff { + if cand.nextConnAttempt.After(now) { + break + } + if len(rf.candidates) >= rf.conf.maxCandidates { + // drop this candidate if we already have enough others + log.Debugw("cannot move backoff'ed candidate back. Already have enough candidates.", "id", cand.ai.ID) + } else { + log.Debugw("moving backoff'ed candidate back", "id", cand.ai.ID) + rf.candidates[cand.ai.ID] = &candidate{ + added: cand.added, + supportsRelayV2: cand.supportsRelayV2, + ai: cand.ai, + numAttempts: cand.numAttempts, + } + rf.notifyNewCandidate() + } + rf.candidatesOnBackoff = rf.candidatesOnBackoff[1:] + } +} + +func (rf *relayFinder) refreshReservations(ctx context.Context, now time.Time) bool { + rf.relayMx.Lock() + + // find reservations about to expire and refresh them in parallel + g := new(errgroup.Group) + for p, rsvp := range rf.relays { + if rsvp == nil { // this is a circuit v1 relay, there is no reservation + continue + } + if now.Add(rsvpExpirationSlack).Before(rsvp.Expiration) { + continue + } + + p := p + g.Go(func() error { return rf.refreshRelayReservation(ctx, p) }) + } + rf.relayMx.Unlock() + + err := g.Wait() + return err != nil +} + +func (rf *relayFinder) refreshRelayReservation(ctx context.Context, p peer.ID) error { + rsvp, err := circuitv2.Reserve(ctx, rf.host, peer.AddrInfo{ID: p}) + + rf.relayMx.Lock() + defer rf.relayMx.Unlock() + + if err != nil { + log.Debugw("failed to refresh relay slot reservation", "relay", p, "error", err) + + delete(rf.relays, p) + // unprotect the connection + rf.host.ConnManager().Unprotect(p, autorelayTag) + return err + } + + log.Debugw("refreshed relay slot reservation", "relay", p) + rf.relays[p] = rsvp + return nil +} + +// usingRelay returns if we're currently using the given relay. +func (rf *relayFinder) usingRelay(p peer.ID) bool { + _, ok := rf.relays[p] + return ok +} + +// selectCandidates returns an ordered slice of relay candidates. +// Callers should attempt to obtain reservations with the candidates in this order. +func (rf *relayFinder) selectCandidates() []*candidate { + rf.candidateMx.Lock() + var candidates []*candidate + for _, cand := range rf.candidates { + candidates = append(candidates, cand) + } + rf.candidateMx.Unlock() + + // TODO: better relay selection strategy; this just selects random relays, + // but we should probably use ping latency as the selection metric + rand.Shuffle(len(candidates), func(i, j int) { + candidates[i], candidates[j] = candidates[j], candidates[i] + }) + return candidates +} + +// This function is computes the NATed relay addrs when our status is private: +// - The public addrs are removed from the address set. +// - The non-public addrs are included verbatim so that peers behind the same NAT/firewall +// can still dial us directly. +// - On top of those, we add the relay-specific addrs for the relays to which we are +// connected. For each non-private relay addr, we encapsulate the p2p-circuit addr +// through which we can be dialed. +func (rf *relayFinder) relayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { + rf.relayMx.Lock() + defer rf.relayMx.Unlock() + + if rf.cachedAddrs != nil && time.Now().Before(rf.cachedAddrsExpiry) { + return rf.cachedAddrs + } + + raddrs := make([]ma.Multiaddr, 0, 4*len(rf.relays)+4) + + // only keep private addrs from the original addr set + for _, addr := range addrs { + if manet.IsPrivateAddr(addr) { + raddrs = append(raddrs, addr) + } + } + + // add relay specific addrs to the list + for p := range rf.relays { + addrs := cleanupAddressSet(rf.host.Peerstore().Addrs(p)) + + circuit := ma.StringCast(fmt.Sprintf("/p2p/%s/p2p-circuit", p.Pretty())) + for _, addr := range addrs { + pub := addr.Encapsulate(circuit) + raddrs = append(raddrs, pub) + } + } + + rf.cachedAddrs = raddrs + rf.cachedAddrsExpiry = time.Now().Add(30 * time.Second) + + return raddrs +} + +func (rf *relayFinder) Start() error { + if rf.ctxCancel != nil { + return errors.New("relayFinder already running") + } + log.Debug("starting relay finder") + ctx, cancel := context.WithCancel(context.Background()) + rf.ctxCancel = cancel + rf.refCount.Add(1) + go func() { + defer rf.refCount.Done() + rf.background(ctx) + }() + return nil +} + +func (rf *relayFinder) Stop() error { + log.Debug("stopping relay finder") + rf.ctxCancel() + rf.refCount.Wait() + rf.ctxCancel = nil + return nil +}