Skip to content

Commit cc7e359

Browse files
committed
wip
1 parent 3a951e1 commit cc7e359

File tree

9 files changed

+174
-46
lines changed

9 files changed

+174
-46
lines changed

p2p/net/simconn/router.go

+20-6
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package simconn
22

33
import (
44
"errors"
5+
"fmt"
56
"net"
67
"sync"
78
"time"
@@ -73,21 +74,28 @@ func (f *simpleNodeFirewall) IsPacketInAllowed(p Packet) bool {
7374
}
7475

7576
type SimpleFirewallRouter struct {
76-
nodes map[net.Addr]*simpleNodeFirewall
77+
nodes map[string]*simpleNodeFirewall
78+
}
79+
80+
func (r *SimpleFirewallRouter) String() string {
81+
return fmt.Sprintf("%+v", r.nodes)
7782
}
7883

7984
func (r *SimpleFirewallRouter) SendPacket(deadline time.Time, p Packet) error {
80-
toNode, exists := r.nodes[p.To]
85+
toNode, exists := r.nodes[p.To.String()]
8186
if !exists {
8287
return errors.New("unknown destination")
8388
}
8489

8590
// Record that this node is sending a packet to the destination
86-
fromNode, exists := r.nodes[p.From]
91+
fromNode, exists := r.nodes[p.From.String()]
8792
if !exists {
8893
return errors.New("unknown source")
8994
}
9095
fromNode.mu.Lock()
96+
if fromNode.packetsOutTo == nil {
97+
fromNode.packetsOutTo = make(map[string]struct{})
98+
}
9199
fromNode.packetsOutTo[p.To.String()] = struct{}{}
92100
fromNode.mu.Unlock()
93101

@@ -100,21 +108,27 @@ func (r *SimpleFirewallRouter) SendPacket(deadline time.Time, p Packet) error {
100108
}
101109

102110
func (r *SimpleFirewallRouter) AddNode(addr net.Addr, conn *SimConn) {
103-
r.nodes[addr] = &simpleNodeFirewall{
111+
if r.nodes == nil {
112+
r.nodes = make(map[string]*simpleNodeFirewall)
113+
}
114+
r.nodes[addr.String()] = &simpleNodeFirewall{
104115
packetsOutTo: make(map[string]struct{}),
105116
node: conn,
106117
}
107118
}
108119

109120
func (r *SimpleFirewallRouter) AddPublicNode(addr net.Addr, conn *SimConn) {
110-
r.nodes[addr] = &simpleNodeFirewall{
121+
r.nodes[addr.String()] = &simpleNodeFirewall{
111122
public: true,
112123
node: conn,
113124
}
114125
}
115126

116127
func (r *SimpleFirewallRouter) RemoveNode(addr net.Addr) {
117-
delete(r.nodes, addr)
128+
if r.nodes == nil {
129+
return
130+
}
131+
delete(r.nodes, addr.String())
118132
}
119133

120134
var _ Router = &SimpleFirewallRouter{}

p2p/net/simconn/simconn_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ func TestSimConnDeadlinesWithLatency(t *testing.T) {
253253

254254
func TestSimpleHolePunch(t *testing.T) {
255255
router := &SimpleFirewallRouter{
256-
nodes: make(map[net.Addr]*simpleNodeFirewall),
256+
nodes: make(map[string]*simpleNodeFirewall),
257257
}
258258

259259
// Create two peers

p2p/protocol/holepunch/holepunch_test.go

+96-26
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ package holepunch_test
22

33
import (
44
"context"
5+
"fmt"
56
"net"
6-
"slices"
77
"sync"
8+
"sync/atomic"
89
"testing"
910
"time"
1011

@@ -14,12 +15,14 @@ import (
1415
"github.com/libp2p/go-libp2p/core/network"
1516
"github.com/libp2p/go-libp2p/core/peer"
1617
"github.com/libp2p/go-libp2p/core/peerstore"
18+
"github.com/libp2p/go-libp2p/p2p/net/simconn"
1719
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
1820
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
1921
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
2022
holepunch_pb "github.com/libp2p/go-libp2p/p2p/protocol/holepunch/pb"
2123
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
22-
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
24+
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
25+
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
2326

2427
"github.com/libp2p/go-msgio/pbio"
2528
ma "github.com/multiformats/go-multiaddr"
@@ -130,15 +133,34 @@ func TestDirectDialWorks(t *testing.T) {
130133
}
131134

132135
func TestEndToEndSimConnect(t *testing.T) {
133-
t.Skip("This test is broken. It is hard to do an end-to-end test without mocking the separate networks that holepunching is supposed to connect. It only worked previously because one of the hosts was able to learn about a non-holepunched direct connection via identify.")
134-
135136
h1tr := &mockEventTracer{}
136137
h2tr := &mockEventTracer{}
137138
h1, h2, relay, _ := makeRelayedHosts(t, []holepunch.Option{holepunch.WithTracer(h1tr)}, []holepunch.Option{holepunch.WithTracer(h2tr)}, true)
138139
defer h1.Close()
139140
defer h2.Close()
140141
defer relay.Close()
141142

143+
// time.Sleep(100 * time.Millisecond)
144+
p1 := ping.NewPingService(h1)
145+
require.NoError(t, h1.Connect(context.Background(), peer.AddrInfo{
146+
ID: relay.ID(),
147+
Addrs: relay.Addrs(),
148+
}))
149+
// var raddr ma.Multiaddr
150+
// for _, a := range h2.Addrs() {
151+
// if _, err := a.ValueForProtocol(ma.P_CIRCUIT); err == nil {
152+
// raddr = a
153+
// break
154+
// }
155+
// }
156+
// require.NoError(t, h1.Connect(context.Background(), peer.AddrInfo{
157+
// ID: h2.ID(),
158+
// Addrs: []ma.Multiaddr{raddr},
159+
// }))
160+
res := p1.Ping(network.WithAllowLimitedConn(context.Background(), "test"), h2.ID())
161+
result := <-res
162+
require.NoError(t, result.Error)
163+
142164
// wait till a direct connection is complete
143165
ensureDirectConn(t, h1, h2)
144166
// ensure no hole-punching streams are open on either side
@@ -147,14 +169,15 @@ func TestEndToEndSimConnect(t *testing.T) {
147169
require.Eventually(t,
148170
func() bool {
149171
h2Events = h2tr.getEvents()
150-
return len(h2Events) == 3
172+
return len(h2Events) == 4
151173
},
152174
time.Second,
153-
10*time.Millisecond,
175+
100*time.Millisecond,
154176
)
155-
require.Equal(t, holepunch.StartHolePunchEvtT, h2Events[0].Type)
156-
require.Equal(t, holepunch.HolePunchAttemptEvtT, h2Events[1].Type)
157-
require.Equal(t, holepunch.EndHolePunchEvtT, h2Events[2].Type)
177+
require.Equal(t, holepunch.DirectDialEvtT, h2Events[0].Type)
178+
require.Equal(t, holepunch.StartHolePunchEvtT, h2Events[1].Type)
179+
require.Equal(t, holepunch.HolePunchAttemptEvtT, h2Events[2].Type)
180+
require.Equal(t, holepunch.EndHolePunchEvtT, h2Events[3].Type)
158181

159182
h1Events := h1tr.getEvents()
160183
// We don't really expect a hole-punched connection to be established in this test,
@@ -230,7 +253,7 @@ func TestFailuresOnInitiator(t *testing.T) {
230253
opts = append(opts, holepunch.WithAddrFilter(f))
231254
}
232255

233-
hps := addHolePunchService(t, h2, opts...)
256+
hps := addHolePunchService(t, h2, nil, opts...)
234257
// wait until the hole punching protocol has actually started
235258
require.Eventually(t, func() bool {
236259
protos, _ := h2.Peerstore().SupportsProtocols(h1.ID(), holepunch.Protocol)
@@ -409,7 +432,7 @@ func ensureDirectConn(t *testing.T, h1, h2 host.Host) {
409432
}, 5*time.Second, 50*time.Millisecond)
410433
}
411434

412-
func mkHostWithStaticAutoRelay(t *testing.T, relay host.Host) host.Host {
435+
func mkHostWithStaticAutoRelay(t *testing.T, ipAddr string, port int, relay host.Host, router *simconn.SimpleFirewallRouter) host.Host {
413436
if race.WithRace() {
414437
t.Skip("modifying manet.Private4 is racy")
415438
}
@@ -418,16 +441,13 @@ func mkHostWithStaticAutoRelay(t *testing.T, relay host.Host) host.Host {
418441
Addrs: relay.Addrs(),
419442
}
420443

421-
cpy := manet.Private4
422-
manet.Private4 = []*net.IPNet{}
423-
defer func() { manet.Private4 = cpy }()
424-
425444
h, err := libp2p.New(
426-
libp2p.ListenAddrs(ma.StringCast("/ip4/127.0.0.1/tcp/0")),
445+
libp2p.ListenAddrs(ma.StringCast(fmt.Sprintf("/ip4/%s/udp/%d/quic-v1", ipAddr, port))),
427446
libp2p.EnableRelay(),
428447
libp2p.EnableAutoRelayWithStaticRelays([]peer.AddrInfo{pi}),
429448
libp2p.ForceReachabilityPrivate(),
430449
libp2p.ResourceManager(&network.NullResourceManager{}),
450+
quicReuseOpts(false, router),
431451
)
432452
require.NoError(t, err)
433453

@@ -443,23 +463,58 @@ func mkHostWithStaticAutoRelay(t *testing.T, relay host.Host) host.Host {
443463
return h
444464
}
445465

466+
var lastPort atomic.Uint32
467+
468+
type MockSourceIPSelector struct {
469+
ip atomic.Pointer[net.IP]
470+
}
471+
472+
func (m *MockSourceIPSelector) PreferredSourceIPForDestination(dst *net.UDPAddr) (net.IP, error) {
473+
return *m.ip.Load(), nil
474+
}
475+
476+
func quicReuseOpts(isPublic bool, router *simconn.SimpleFirewallRouter) libp2p.Option {
477+
m := &MockSourceIPSelector{}
478+
return libp2p.QUICReuse(
479+
quicreuse.NewConnManager,
480+
quicreuse.CustomSourceIPSelector(func() (quicreuse.SourceIPSelector, error) {
481+
return m, nil
482+
}),
483+
quicreuse.CustomListenUDP(func(network string, address *net.UDPAddr) (net.PacketConn, error) {
484+
m.ip.Store(&address.IP)
485+
if address.Port == 0 {
486+
address.Port = int(lastPort.Add(1))
487+
}
488+
c := simconn.NewSimConn(address, router)
489+
if isPublic {
490+
router.AddPublicNode(address, c)
491+
} else {
492+
router.AddNode(address, c)
493+
}
494+
return c, nil
495+
}))
496+
}
497+
446498
func makeRelayedHosts(t *testing.T, h1opt, h2opt []holepunch.Option, addHolePuncher bool) (h1, h2, relay host.Host, hps *holepunch.Service) {
447499
t.Helper()
448-
h1, _ = mkHostWithHolePunchSvc(t, h1opt...)
500+
router := &simconn.SimpleFirewallRouter{}
501+
h1, _ = mkHostWithHolePunchSvc2(t, "2.0.0.1", 8001, router, h1opt...)
449502
var err error
503+
450504
relay, err = libp2p.New(
451-
libp2p.ListenAddrs(ma.StringCast("/ip4/127.0.0.1/tcp/0")),
505+
libp2p.ListenAddrs(ma.StringCast("/ip4/1.2.0.1/udp/8000/quic-v1")),
452506
libp2p.DisableRelay(),
453507
libp2p.ResourceManager(&network.NullResourceManager{}),
508+
quicReuseOpts(true, router),
454509
)
455510
require.NoError(t, err)
456511
_, err = relayv2.New(relay)
457512
require.NoError(t, err)
458513

459514
// make sure the relay service is started and advertised by Identify
460515
h, err := libp2p.New(
461-
libp2p.NoListenAddrs,
462-
libp2p.Transport(tcp.NewTCPTransport),
516+
libp2p.ListenAddrs(ma.StringCast("/ip4/1.2.0.2/udp/8000/quic-v1")),
517+
quicReuseOpts(false, router),
463518
libp2p.DisableRelay(),
464519
)
465520
require.NoError(t, err)
@@ -470,9 +525,9 @@ func makeRelayedHosts(t *testing.T, h1opt, h2opt []holepunch.Option, addHolePunc
470525
return err == nil && len(supported) > 0
471526
}, 3*time.Second, 100*time.Millisecond)
472527

473-
h2 = mkHostWithStaticAutoRelay(t, relay)
528+
h2 = mkHostWithStaticAutoRelay(t, "2.0.0.2", 8002, relay, router)
474529
if addHolePuncher {
475-
hps = addHolePunchService(t, h2, h2opt...)
530+
hps = addHolePunchService(t, h2, []ma.Multiaddr{ma.StringCast("/ip4/2.0.0.2/udp/8002/quic-v1")}, h2opt...)
476531
}
477532

478533
// h2 has a relay addr
@@ -492,12 +547,14 @@ func makeRelayedHosts(t *testing.T, h1opt, h2opt []holepunch.Option, addHolePunc
492547
return
493548
}
494549

495-
func addHolePunchService(t *testing.T, h host.Host, opts ...holepunch.Option) *holepunch.Service {
550+
func addHolePunchService(t *testing.T, h host.Host, extraAddrs []ma.Multiaddr, opts ...holepunch.Option) *holepunch.Service {
496551
t.Helper()
497552
hps, err := holepunch.NewService(h, newMockIDService(t, h), func() []ma.Multiaddr {
498553
addrs := h.Addrs()
499-
addrs = slices.DeleteFunc(addrs, func(a ma.Multiaddr) bool { return !manet.IsPublicAddr(a) })
500-
return append(addrs, ma.StringCast("/ip4/1.2.3.4/tcp/1234"))
554+
addrs = append(addrs, extraAddrs...)
555+
return addrs
556+
// addrs = slices.DeleteFunc(addrs, func(a ma.Multiaddr) bool { return !manet.IsPublicAddr(a) })
557+
// return append(addrs, ma.StringCast("/ip4/1.2.3.4/tcp/1234"))
501558
}, opts...)
502559
require.NoError(t, err)
503560
return hps
@@ -511,6 +568,19 @@ func mkHostWithHolePunchSvc(t *testing.T, opts ...holepunch.Option) (host.Host,
511568
libp2p.ResourceManager(&network.NullResourceManager{}),
512569
)
513570
require.NoError(t, err)
514-
hps := addHolePunchService(t, h, opts...)
571+
hps := addHolePunchService(t, h, nil, opts...)
572+
return h, hps
573+
}
574+
575+
func mkHostWithHolePunchSvc2(t *testing.T, ipAddr string, port int, router *simconn.SimpleFirewallRouter, opts ...holepunch.Option) (host.Host, *holepunch.Service) {
576+
t.Helper()
577+
h, err := libp2p.New(
578+
libp2p.ListenAddrs(ma.StringCast(fmt.Sprintf("/ip4/%s/udp/%d/quic-v1", ipAddr, port))),
579+
libp2p.ForceReachabilityPrivate(),
580+
libp2p.ResourceManager(&network.NullResourceManager{}),
581+
quicReuseOpts(false, router),
582+
)
583+
require.NoError(t, err)
584+
hps := addHolePunchService(t, h, nil, opts...)
515585
return h, hps
516586
}

p2p/protocol/holepunch/holepuncher.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ func (hp *holePuncher) beginDirectConnect(p peer.ID) error {
9191
// It first attempts a direct dial (if we have a public address of that peer), and then
9292
// coordinates a hole punch over the given relay connection.
9393
func (hp *holePuncher) DirectConnect(p peer.ID) error {
94+
log.Debugw("beginDirectConnect", "host", hp.host.ID(), "peer", p)
9495
if err := hp.beginDirectConnect(p); err != nil {
9596
return err
9697
}
@@ -107,14 +108,17 @@ func (hp *holePuncher) DirectConnect(p peer.ID) error {
107108
func (hp *holePuncher) directConnect(rp peer.ID) error {
108109
// short-circuit check to see if we already have a direct connection
109110
if getDirectConnection(hp.host, rp) != nil {
111+
log.Debugw("already connected", "host", hp.host.ID(), "peer", rp)
110112
return nil
111113
}
114+
115+
log.Debugw("attempting direct dial", "host", hp.host.ID(), "peer", rp, "addrs", hp.host.Peerstore().Addrs(rp))
112116
// short-circuit hole punching if a direct dial works.
113117
// attempt a direct connection ONLY if we have a public address for the remote peer
114118
for _, a := range hp.host.Peerstore().Addrs(rp) {
115119
if !isRelayAddress(a) && manet.IsPublicAddr(a) {
116120
forceDirectConnCtx := network.WithForceDirectDial(hp.ctx, "hole-punching")
117-
dialCtx, cancel := context.WithTimeout(forceDirectConnCtx, dialTimeout)
121+
dialCtx, cancel := context.WithTimeout(forceDirectConnCtx, 100*time.Millisecond)
118122

119123
tstart := time.Now()
120124
// This dials *all* addresses, public and private, from the peerstore.
@@ -186,6 +190,7 @@ func (hp *holePuncher) initiateHolePunch(rp peer.ID) ([]ma.Multiaddr, []ma.Multi
186190
return nil, nil, 0, fmt.Errorf("failed to open hole-punching stream: %w", err)
187191
}
188192
defer str.Close()
193+
log.Debugf("initiateHolePunch: %s, %s", str.Conn().RemotePeer(), str.Conn().RemoteMultiaddr())
189194

190195
addr, obsAddr, rtt, err := hp.initiateHolePunchImpl(str)
191196
if err != nil {

p2p/protocol/holepunch/svc.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func NewService(h host.Host, ids identify.IDService, listenAddrs func() []ma.Mul
102102
func (s *Service) waitForPublicAddr() {
103103
defer s.refCount.Done()
104104

105-
log.Debug("waiting until we have at least one public address", "peer", s.host.ID())
105+
log.Debugw("waiting until we have at least one public address", "peer", s.host.ID())
106106

107107
// TODO: We should have an event here that fires when identify discovers a new
108108
// address.
@@ -114,7 +114,7 @@ func (s *Service) waitForPublicAddr() {
114114
defer t.Stop()
115115
for {
116116
if len(s.listenAddrs()) > 0 {
117-
log.Debug("Host now has a public address. Starting holepunch protocol.")
117+
log.Debugf("Host %s now has a public address. Starting holepunch protocol.", s.host.ID())
118118
s.host.SetStreamHandler(Protocol, s.handleNewStream)
119119
break
120120
}
@@ -185,12 +185,14 @@ func (s *Service) incomingHolePunch(str network.Stream) (rtt time.Duration, remo
185185

186186
str.SetDeadline(time.Now().Add(StreamTimeout))
187187

188+
log.Debugf("incomingHolePunch reading messsage")
188189
if err := rd.ReadMsg(msg); err != nil {
189190
return 0, nil, nil, fmt.Errorf("failed to read message from initiator: %w", err)
190191
}
191192
if t := msg.GetType(); t != pb.HolePunch_CONNECT {
192193
return 0, nil, nil, fmt.Errorf("expected CONNECT message from initiator but got %d", t)
193194
}
195+
log.Debugf("incomingHolePunch read CONNECT message")
194196

195197
obsDial := removeRelayAddrs(addrsFromBytes(msg.ObsAddrs))
196198
if s.filter != nil {
@@ -223,6 +225,8 @@ func (s *Service) incomingHolePunch(str network.Stream) (rtt time.Duration, remo
223225
}
224226

225227
func (s *Service) handleNewStream(str network.Stream) {
228+
log.Debugf("handleNewStream: %s, %s", str.Conn().RemotePeer(), str.Conn().RemoteMultiaddr())
229+
226230
// Check directionality of the underlying connection.
227231
// Peer A receives an inbound connection from peer B.
228232
// Peer A opens a new hole punch stream to peer B.

0 commit comments

Comments
 (0)