Skip to content

Commit

Permalink
basic_host: close swarm on Close (#2916)
Browse files Browse the repository at this point in the history
Using the `BasicHost` constructor transfers the ownership of the swarm.
This is similar to how using `libp2p.New` transfers the ownership of
user provided config options like `ResourceManager`, all of which are
closed on `host.Close`
  • Loading branch information
sukunrt committed Aug 15, 2024
1 parent e276a6e commit 8a11b7c
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 30 deletions.
20 changes: 9 additions & 11 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func (cfg *Config) addTransports() ([]fx.Option, error) {
fxopts = append(fxopts, cfg.QUICReuse...)
} else {
fxopts = append(fxopts,
fx.Provide(func(key quic.StatelessResetKey, tokenGenerator quic.TokenGeneratorKey, _ *swarm.Swarm, lifecycle fx.Lifecycle) (*quicreuse.ConnManager, error) {
fx.Provide(func(key quic.StatelessResetKey, tokenGenerator quic.TokenGeneratorKey, lifecycle fx.Lifecycle) (*quicreuse.ConnManager, error) {
var opts []quicreuse.Option
if !cfg.DisableMetrics {
opts = append(opts, quicreuse.EnableMetrics(cfg.PrometheusRegisterer))
Expand Down Expand Up @@ -469,18 +469,17 @@ func (cfg *Config) NewNode() (host.Host, error) {
fx.Provide(func() event.Bus {
return eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer(eventbus.WithRegisterer(cfg.PrometheusRegisterer))))
}),
fx.Provide(func(eventBus event.Bus, lifecycle fx.Lifecycle) (*swarm.Swarm, error) {
sw, err := cfg.makeSwarm(eventBus, !cfg.DisableMetrics)
if err != nil {
return nil, err
}
lifecycle.Append(fx.StopHook(sw.Close))
return sw, nil
fx.Provide(func() crypto.PrivKey {
return cfg.PeerKey
}),
// Make sure the swarm constructor depends on the quicreuse.ConnManager.
// That way, the ConnManager will be started before the swarm, and more importantly,
// the swarm will be stopped before the ConnManager.
fx.Decorate(func(sw *swarm.Swarm, _ *quicreuse.ConnManager, lifecycle fx.Lifecycle) *swarm.Swarm {
fx.Provide(func(eventBus event.Bus, _ *quicreuse.ConnManager, lifecycle fx.Lifecycle) (*swarm.Swarm, error) {
sw, err := cfg.makeSwarm(eventBus, !cfg.DisableMetrics)
if err != nil {
return nil, err
}
lifecycle.Append(fx.Hook{
OnStart: func(context.Context) error {
// TODO: This method succeeds if listening on one address succeeds. We
Expand All @@ -491,14 +490,13 @@ func (cfg *Config) NewNode() (host.Host, error) {
return sw.Close()
},
})
return sw
return sw, nil
}),
fx.Provide(cfg.newBasicHost),
fx.Provide(func(bh *bhost.BasicHost) host.Host {
return bh
}),
fx.Provide(func(h *swarm.Swarm) peer.ID { return h.LocalPeer() }),
fx.Provide(func(h *swarm.Swarm) crypto.PrivKey { return h.Peerstore().PrivKey(h.LocalPeer()) }),
}
transportOpts, err := cfg.addTransports()
if err != nil {
Expand Down
28 changes: 10 additions & 18 deletions libp2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -429,24 +428,15 @@ func TestMain(m *testing.M) {
}

func TestDialCircuitAddrWithWrappedResourceManager(t *testing.T) {
relay, err := New(EnableRelayService())
relay, err := New(EnableRelayService(), ForceReachabilityPublic())
require.NoError(t, err)
defer relay.Close()

// Fake that the relay is publicly reachable
emitterForRelay, err := relay.EventBus().Emitter(&event.EvtLocalReachabilityChanged{})
require.NoError(t, err)
defer emitterForRelay.Close()
emitterForRelay.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPublic})

peerBehindRelay, err := New(EnableAutoRelayWithStaticRelays([]peer.AddrInfo{{ID: relay.ID(), Addrs: relay.Addrs()}}))
peerBehindRelay, err := New(
EnableAutoRelayWithStaticRelays([]peer.AddrInfo{{ID: relay.ID(), Addrs: relay.Addrs()}}),
ForceReachabilityPrivate())
require.NoError(t, err)
defer peerBehindRelay.Close()
// Emit an event to tell this peer it is private
emitterForPeerBehindRelay, err := peerBehindRelay.EventBus().Emitter(&event.EvtLocalReachabilityChanged{})
require.NoError(t, err)
defer emitterForPeerBehindRelay.Close()
emitterForPeerBehindRelay.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPrivate})

// Use a wrapped resource manager to test that the circuit dialing works
// with it. Look at the PR introducing this test for context
Expand All @@ -467,10 +457,12 @@ func TestDialCircuitAddrWithWrappedResourceManager(t *testing.T) {
)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
res := <-ping.Ping(ctx, h, peerBehindRelay.ID())
require.NoError(t, res.Error)
defer cancel()
require.Eventually(t, func() bool {
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
res := <-ping.Ping(ctx, h, peerBehindRelay.ID())
return res.Error == nil
}, 5*time.Second, 50*time.Millisecond)
}

func TestHostAddrsFactoryAddsCerthashes(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,10 @@ func (h *BasicHost) Close() error {
_ = h.emitters.evtLocalProtocolsUpdated.Close()
_ = h.emitters.evtLocalAddrsUpdated.Close()

if err := h.network.Close(); err != nil {
log.Errorf("swarm close failed: %v", err)
}

h.psManager.Close()
if h.Peerstore() != nil {
h.Peerstore().Close()
Expand Down
10 changes: 9 additions & 1 deletion p2p/host/basic/basic_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,15 @@ func TestMultipleClose(t *testing.T) {

require.NoError(t, h.Close())
require.NoError(t, h.Close())
require.NoError(t, h.Close())
h2, err := NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
defer h2.Close()
require.Error(t, h.Connect(context.Background(), peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}))
h.Network().Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
_, err = h.NewStream(context.Background(), h2.ID())
require.Error(t, err)
require.Empty(t, h.Addrs())
require.Empty(t, h.AllAddrs())
}

func TestSignedPeerRecordWithNoListenAddrs(t *testing.T) {
Expand Down

0 comments on commit 8a11b7c

Please sign in to comment.