From 53c525f0edf2eacf0cd796dd794fc4e08adc3a93 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 15 Mar 2022 09:43:18 +0200 Subject: [PATCH 1/2] improve resource manager integration - add opt-in env var to control instantation, until we are comfortable with testing to enble by default. - adjust default limits if the connection manager high mark is higher than the default inbound conn limit. --- node/builder.go | 3 +- node/modules/lp2p/rcmgr.go | 127 ++++++++++++++++++++++++++----------- 2 files changed, 91 insertions(+), 39 deletions(-) diff --git a/node/builder.go b/node/builder.go index acda5a07a80..6386f78a608 100644 --- a/node/builder.go +++ b/node/builder.go @@ -222,7 +222,7 @@ var LibP2P = Options( Override(ConnGaterKey, lp2p.ConnGaterOption), // Services (resource management) - Override(new(network.ResourceManager), lp2p.ResourceManager), + Override(new(network.ResourceManager), lp2p.ResourceManager(200)), Override(ResourceManagerKey, lp2p.ResourceManagerOption), ) @@ -282,6 +282,7 @@ func ConfigCommon(cfg *config.Common, enableLibp2pNode bool) Option { cfg.Libp2p.ConnMgrHigh, time.Duration(cfg.Libp2p.ConnMgrGrace), cfg.Libp2p.ProtectedPeers)), + Override(new(network.ResourceManager), lp2p.ResourceManager(cfg.Libp2p.ConnMgrHigh)), Override(new(*pubsub.PubSub), lp2p.GossipSub), Override(new(*config.Pubsub), &cfg.Pubsub), diff --git a/node/modules/lp2p/rcmgr.go b/node/modules/lp2p/rcmgr.go index 0bc4dd6b215..24bea465ff7 100644 --- a/node/modules/lp2p/rcmgr.go +++ b/node/modules/lp2p/rcmgr.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math/bits" "os" "path/filepath" @@ -15,6 +16,8 @@ import ( "github.com/libp2p/go-libp2p-core/protocol" rcmgr "github.com/libp2p/go-libp2p-resource-manager" + logging "github.com/ipfs/go-log/v2" + "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node/repo" @@ -22,55 +25,103 @@ import ( "go.opencensus.io/tag" ) -func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) { - var limiter *rcmgr.BasicLimiter - var opts []rcmgr.Option - - repoPath := repo.Path() +func ResourceManager(connMgrHi uint) func(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) { + return func(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) { + envvar := os.Getenv("LOTUS_RCMGR") + if envvar == "" || envvar == "0" { + // TODO opt-in for now -- flip this to enabled by default once we are comfortable with testing + log.Info("libp2p resource manager is disabled") + return network.NullResourceManager, nil + } - // create limiter -- parse $repo/limits.json if exists - limitsFile := filepath.Join(repoPath, "limits.json") - limitsIn, err := os.Open(limitsFile) - switch { - case err == nil: - defer limitsIn.Close() //nolint:errcheck - limiter, err = rcmgr.NewDefaultLimiterFromJSON(limitsIn) - if err != nil { - return nil, fmt.Errorf("error parsing limit file: %w", err) + log.Info("libp2p resource manager is enabled") + // enable debug logs for rcmgr + logging.SetLogLevel("rcmgr", "debug") + + // Adjust default limits + // - give it more memory, up to 4G, min of 1G + // - if maxconns are too high, adjust Conn/FD/Stream limits + defaultLimits := rcmgr.DefaultLimits.WithSystemMemory(.125, 1<<30, 4<<30) + maxconns := int(connMgrHi) + if maxconns > defaultLimits.SystemBaseLimit.ConnsInbound { + defaultLimits.SystemBaseLimit.ConnsInbound = logScale(maxconns) + defaultLimits.SystemBaseLimit.ConnsOutbound = logScale(maxconns) + defaultLimits.SystemBaseLimit.Conns = logScale(2 * maxconns) + + defaultLimits.SystemBaseLimit.StreamsInbound = logScale(16 * maxconns) + defaultLimits.SystemBaseLimit.StreamsOutbound = logScale(64 * maxconns) + defaultLimits.SystemBaseLimit.Streams = logScale(64 * maxconns) + + if maxconns > defaultLimits.SystemBaseLimit.FD { + defaultLimits.SystemBaseLimit.FD = logScale(maxconns) + } + + defaultLimits.ServiceBaseLimit.StreamsInbound = logScale(8 * maxconns) + defaultLimits.ServiceBaseLimit.StreamsOutbound = logScale(32 * maxconns) + defaultLimits.ServiceBaseLimit.Streams = logScale(32 * maxconns) + + defaultLimits.ProtocolBaseLimit.StreamsInbound = logScale(8 * maxconns) + defaultLimits.ProtocolBaseLimit.StreamsOutbound = logScale(32 * maxconns) + defaultLimits.ProtocolBaseLimit.Streams = logScale(32 * maxconns) + + log.Info("adjusted default resource manager limits") } - case errors.Is(err, os.ErrNotExist): - limiter = rcmgr.NewDefaultLimiter() + // initialize + var limiter *rcmgr.BasicLimiter + var opts []rcmgr.Option - default: - return nil, err - } + repoPath := repo.Path() - // TODO: also set appropriate default limits for lotus protocols - libp2p.SetDefaultServiceLimits(limiter) + // create limiter -- parse $repo/limits.json if exists + limitsFile := filepath.Join(repoPath, "limits.json") + limitsIn, err := os.Open(limitsFile) + switch { + case err == nil: + defer limitsIn.Close() //nolint:errcheck + limiter, err = rcmgr.NewLimiterFromJSON(limitsIn, defaultLimits) + if err != nil { + return nil, fmt.Errorf("error parsing limit file: %w", err) + } - opts = append(opts, rcmgr.WithMetrics(rcmgrMetrics{})) + case errors.Is(err, os.ErrNotExist): + limiter = rcmgr.NewStaticLimiter(defaultLimits) - if os.Getenv("LOTUS_DEBUG_RCMGR") != "" { - debugPath := filepath.Join(repoPath, "debug") - if err := os.MkdirAll(debugPath, 0755); err != nil { - return nil, fmt.Errorf("error creating debug directory: %w", err) + default: + return nil, err } - traceFile := filepath.Join(debugPath, "rcmgr.json.gz") - opts = append(opts, rcmgr.WithTrace(traceFile)) - } - mgr, err := rcmgr.NewResourceManager(limiter, opts...) - if err != nil { - return nil, fmt.Errorf("error creating resource manager: %w", err) - } + // TODO: also set appropriate default limits for lotus protocols + libp2p.SetDefaultServiceLimits(limiter) + + opts = append(opts, rcmgr.WithMetrics(rcmgrMetrics{})) - lc.Append(fx.Hook{ - OnStop: func(_ context.Context) error { - return mgr.Close() - }}) + if os.Getenv("LOTUS_DEBUG_RCMGR") != "" { + debugPath := filepath.Join(repoPath, "debug") + if err := os.MkdirAll(debugPath, 0755); err != nil { + return nil, fmt.Errorf("error creating debug directory: %w", err) + } + traceFile := filepath.Join(debugPath, "rcmgr.json.gz") + opts = append(opts, rcmgr.WithTrace(traceFile)) + } + + mgr, err := rcmgr.NewResourceManager(limiter, opts...) + if err != nil { + return nil, fmt.Errorf("error creating resource manager: %w", err) + } + + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + return mgr.Close() + }}) + + return mgr, nil + } +} - return mgr, nil +func logScale(val int) int { + bitlen := bits.Len(uint(val)) + return 1 << bitlen } func ResourceManagerOption(mgr network.ResourceManager) Libp2pOpts { From 9fcafff847173961bd294e11670de24b0163f0e6 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 15 Mar 2022 09:59:34 +0200 Subject: [PATCH 2/2] adjust conns to 2x maxconns for duplicate connections --- node/modules/lp2p/rcmgr.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/node/modules/lp2p/rcmgr.go b/node/modules/lp2p/rcmgr.go index 24bea465ff7..d0906fd8fc3 100644 --- a/node/modules/lp2p/rcmgr.go +++ b/node/modules/lp2p/rcmgr.go @@ -43,17 +43,18 @@ func ResourceManager(connMgrHi uint) func(lc fx.Lifecycle, repo repo.LockedRepo) // - if maxconns are too high, adjust Conn/FD/Stream limits defaultLimits := rcmgr.DefaultLimits.WithSystemMemory(.125, 1<<30, 4<<30) maxconns := int(connMgrHi) - if maxconns > defaultLimits.SystemBaseLimit.ConnsInbound { - defaultLimits.SystemBaseLimit.ConnsInbound = logScale(maxconns) - defaultLimits.SystemBaseLimit.ConnsOutbound = logScale(maxconns) - defaultLimits.SystemBaseLimit.Conns = logScale(2 * maxconns) + if 2*maxconns > defaultLimits.SystemBaseLimit.ConnsInbound { + // adjust conns to 2x to allow for two conns per peer (TCP+QUIC) + defaultLimits.SystemBaseLimit.ConnsInbound = logScale(2 * maxconns) + defaultLimits.SystemBaseLimit.ConnsOutbound = logScale(2 * maxconns) + defaultLimits.SystemBaseLimit.Conns = logScale(4 * maxconns) defaultLimits.SystemBaseLimit.StreamsInbound = logScale(16 * maxconns) defaultLimits.SystemBaseLimit.StreamsOutbound = logScale(64 * maxconns) defaultLimits.SystemBaseLimit.Streams = logScale(64 * maxconns) - if maxconns > defaultLimits.SystemBaseLimit.FD { - defaultLimits.SystemBaseLimit.FD = logScale(maxconns) + if 2*maxconns > defaultLimits.SystemBaseLimit.FD { + defaultLimits.SystemBaseLimit.FD = logScale(2 * maxconns) } defaultLimits.ServiceBaseLimit.StreamsInbound = logScale(8 * maxconns)