Skip to content

Commit

Permalink
fix: use default HTTP routers when FullRT DHT client is used (#9841)
Browse files Browse the repository at this point in the history
  • Loading branch information
guseggert authored and hacdias committed May 3, 2023
1 parent 1b09975 commit 3b33b04
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 53 deletions.
14 changes: 2 additions & 12 deletions cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,19 +413,9 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
case routingOptionSupernodeKwd:
return errors.New("supernode routing was never fully implemented and has been removed")
case routingOptionDefaultKwd, routingOptionAutoKwd:
ncfg.Routing = libp2p.ConstructDefaultRouting(
cfg.Identity.PeerID,
cfg.Addresses.Swarm,
cfg.Identity.PrivKey,
libp2p.DHTOption,
)
ncfg.Routing = libp2p.ConstructDefaultRouting(cfg, libp2p.DHTOption)
case routingOptionAutoClientKwd:
ncfg.Routing = libp2p.ConstructDefaultRouting(
cfg.Identity.PeerID,
cfg.Addresses.Swarm,
cfg.Identity.PrivKey,
libp2p.DHTClientOption,
)
ncfg.Routing = libp2p.ConstructDefaultRouting(cfg, libp2p.DHTClientOption)
case routingOptionDHTClientKwd:
ncfg.Routing = libp2p.DHTClientOption
case routingOptionDHTKwd:
Expand Down
2 changes: 1 addition & 1 deletion core/node/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part
fx.Provide(libp2p.Routing),
fx.Provide(libp2p.ContentRouting),

fx.Provide(libp2p.BaseRouting(cfg.Experimental.AcceleratedDHTClient)),
fx.Provide(libp2p.BaseRouting(cfg)),
maybeProvide(libp2p.PubsubRouter, bcfg.getOpt("ipnsps")),

maybeProvide(libp2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics),
Expand Down
47 changes: 29 additions & 18 deletions core/node/libp2p/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,35 +63,34 @@ type processInitialRoutingOut struct {

type AddrInfoChan chan peer.AddrInfo

func BaseRouting(experimentalDHTClient bool) interface{} {
func BaseRouting(cfg *config.Config) interface{} {
return func(lc fx.Lifecycle, in processInitialRoutingIn) (out processInitialRoutingOut, err error) {
var dr *ddht.DHT
var dualDHT *ddht.DHT
if dht, ok := in.Router.(*ddht.DHT); ok {
dr = dht
dualDHT = dht

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return dr.Close()
return dualDHT.Close()
},
})
}

if pr, ok := in.Router.(routinghelpers.ComposableRouter); ok {
for _, r := range pr.Routers() {
if cr, ok := in.Router.(routinghelpers.ComposableRouter); ok {
for _, r := range cr.Routers() {
if dht, ok := r.(*ddht.DHT); ok {
dr = dht
dualDHT = dht
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return dr.Close()
return dualDHT.Close()
},
})

break
}
}
}

if dr != nil && experimentalDHTClient {
if dualDHT != nil && cfg.Experimental.AcceleratedDHTClient {
cfg, err := in.Repo.Config()
if err != nil {
return out, err
Expand All @@ -101,7 +100,7 @@ func BaseRouting(experimentalDHTClient bool) interface{} {
return out, err
}

expClient, err := fullrt.NewFullRT(in.Host,
fullRTClient, err := fullrt.NewFullRT(in.Host,
dht.DefaultPrefix,
fullrt.DHTOption(
dht.Validator(in.Validator),
Expand All @@ -116,18 +115,30 @@ func BaseRouting(experimentalDHTClient bool) interface{} {

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return expClient.Close()
return fullRTClient.Close()
},
})

// we want to also use the default HTTP routers, so wrap the FullRT client
// in a parallel router that calls them in parallel
httpRouters, err := constructDefaultHTTPRouters(cfg)
if err != nil {
return out, err
}
routers := []*routinghelpers.ParallelRouter{
{Router: fullRTClient},
}
routers = append(routers, httpRouters...)
router := routinghelpers.NewComposableParallel(routers)

return processInitialRoutingOut{
Router: Router{
Routing: expClient,
Priority: 1000,
Routing: router,
},
DHT: dr,
DHTClient: expClient,
ContentRouter: expClient,
DHT: dualDHT,
DHTClient: fullRTClient,
ContentRouter: fullRTClient,
}, nil
}

Expand All @@ -136,8 +147,8 @@ func BaseRouting(experimentalDHTClient bool) interface{} {
Priority: 1000,
Routing: in.Router,
},
DHT: dr,
DHTClient: dr,
DHT: dualDHT,
DHTClient: dualDHT,
ContentRouter: in.Router,
}, nil
}
Expand Down
55 changes: 33 additions & 22 deletions core/node/libp2p/routingopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,35 @@ func init() {
}
}

func constructDefaultHTTPRouters(cfg *config.Config) ([]*routinghelpers.ParallelRouter, error) {
var routers []*routinghelpers.ParallelRouter
// Append HTTP routers for additional speed
for _, endpoint := range defaultHTTPRouters {
httpRouter, err := irouting.ConstructHTTPRouter(endpoint, cfg.Identity.PeerID, cfg.Addresses.Swarm, cfg.Identity.PrivKey)
if err != nil {
return nil, err
}

r := &irouting.Composer{
GetValueRouter: routinghelpers.Null{},
PutValueRouter: routinghelpers.Null{},
ProvideRouter: routinghelpers.Null{}, // modify this when indexers supports provide
FindPeersRouter: routinghelpers.Null{},
FindProvidersRouter: httpRouter,
}

routers = append(routers, &routinghelpers.ParallelRouter{
Router: r,
IgnoreError: true, // https://github.com/ipfs/kubo/pull/9475#discussion_r1042507387
Timeout: 15 * time.Second, // 5x server value from https://github.com/ipfs/kubo/pull/9475#discussion_r1042428529
ExecuteAfter: 0,
})
}
return routers, nil
}

// ConstructDefaultRouting returns routers used when Routing.Type is unset or set to "auto"
func ConstructDefaultRouting(peerID string, addrs []string, privKey string, routingOpt RoutingOption) func(
func ConstructDefaultRouting(cfg *config.Config, routingOpt RoutingOption) func(
ctx context.Context,
host host.Host,
dstore datastore.Batching,
Expand Down Expand Up @@ -68,29 +95,13 @@ func ConstructDefaultRouting(peerID string, addrs []string, privKey string, rout
ExecuteAfter: 0,
})

// Append HTTP routers for additional speed
for _, endpoint := range defaultHTTPRouters {
httpRouter, err := irouting.ConstructHTTPRouter(endpoint, peerID, addrs, privKey)
if err != nil {
return nil, err
}

r := &irouting.Composer{
GetValueRouter: routinghelpers.Null{},
PutValueRouter: routinghelpers.Null{},
ProvideRouter: routinghelpers.Null{}, // modify this when indexers supports provide
FindPeersRouter: routinghelpers.Null{},
FindProvidersRouter: httpRouter,
}

routers = append(routers, &routinghelpers.ParallelRouter{
Router: r,
IgnoreError: true, // https://github.com/ipfs/kubo/pull/9475#discussion_r1042507387
Timeout: 15 * time.Second, // 5x server value from https://github.com/ipfs/kubo/pull/9475#discussion_r1042428529
ExecuteAfter: 0,
})
httpRouters, err := constructDefaultHTTPRouters(cfg)
if err != nil {
return nil, err
}

routers = append(routers, httpRouters...)

routing := routinghelpers.NewComposableParallel(routers)
return routing, nil
}
Expand Down

0 comments on commit 3b33b04

Please sign in to comment.