From 9ff05867ec6e5251f2279db66be228ba8cfd7758 Mon Sep 17 00:00:00 2001 From: Gus Eggert Date: Mon, 16 May 2022 15:47:10 -0400 Subject: [PATCH] feat: log when resource manager limits are exceeded This periodically logs how many times Resource Manager limits were exceeded. If they aren't exceeded, then nothing is logged. The log levels are at ERROR log level so that they are shown by default. The motivation is so that users know when they have exceeded resource manager limits. To find what is exceeding the limits, they'll need to turn on debug logging and inspect the errors being logged. This could collect the specific limits being reached, but that's more complicated to implement and could result in much longer log messages. --- core/commands/swarm.go | 2 +- core/node/libp2p/rcmgr.go | 15 ++- core/node/libp2p/rcmgr_logging.go | 160 +++++++++++++++++++++++++ core/node/libp2p/rcmgr_logging_test.go | 58 +++++++++ go.mod | 7 +- 5 files changed, 236 insertions(+), 6 deletions(-) create mode 100644 core/node/libp2p/rcmgr_logging.go create mode 100644 core/node/libp2p/rcmgr_logging_test.go diff --git a/core/commands/swarm.go b/core/commands/swarm.go index d6a3e8d696d..f904ed67e19 100644 --- a/core/commands/swarm.go +++ b/core/commands/swarm.go @@ -409,7 +409,7 @@ Changes made via command line are persisted in the Swarm.ResourceMgr.Limits fiel return errors.New("expected a JSON file") } if err := json.NewDecoder(file).Decode(&newLimit); err != nil { - return errors.New("failed to decode JSON as ResourceMgrScopeConfig") + return fmt.Errorf("decoding JSON as ResourceMgrScopeConfig: %w", err) } return libp2p.NetSetLimit(node.ResourceManager, node.Repo, scope, newLimit) } diff --git a/core/node/libp2p/rcmgr.go b/core/node/libp2p/rcmgr.go index 28d05a131b4..4d4b29a564d 100644 --- a/core/node/libp2p/rcmgr.go +++ b/core/node/libp2p/rcmgr.go @@ -7,9 +7,11 @@ import ( "path/filepath" "strings" + "github.com/benbjohnson/clock" config "github.com/ipfs/go-ipfs/config" + "github.com/ipfs/go-ipfs/core/node/helpers" "github.com/ipfs/go-ipfs/repo" - + logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -24,8 +26,8 @@ const NetLimitTraceFilename = "rcmgr.json.gz" var NoResourceMgrError = fmt.Errorf("missing ResourceMgr: make sure the daemon is running with Swarm.ResourceMgr.Enabled") -func ResourceManager(cfg config.SwarmConfig) func(fx.Lifecycle, repo.Repo) (network.ResourceManager, Libp2pOpts, error) { - return func(lc fx.Lifecycle, repo repo.Repo) (network.ResourceManager, Libp2pOpts, error) { +func ResourceManager(cfg config.SwarmConfig) interface{} { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo) (network.ResourceManager, Libp2pOpts, error) { var manager network.ResourceManager var opts Libp2pOpts @@ -72,6 +74,13 @@ func ResourceManager(cfg config.SwarmConfig) func(fx.Lifecycle, repo.Repo) (netw if err != nil { return nil, opts, fmt.Errorf("creating libp2p resource manager: %w", err) } + lrm := &loggingResourceManager{ + clock: clock.New(), + logger: &logging.Logger("resourcemanager").SugaredLogger, + delegate: manager, + } + lrm.start(helpers.LifecycleCtx(mctx, lc)) + manager = lrm } else { log.Debug("libp2p resource manager is disabled") manager = network.NullResourceManager diff --git a/core/node/libp2p/rcmgr_logging.go b/core/node/libp2p/rcmgr_logging.go new file mode 100644 index 00000000000..06d22c71b8c --- /dev/null +++ b/core/node/libp2p/rcmgr_logging.go @@ -0,0 +1,160 @@ +package libp2p + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/benbjohnson/clock" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" + rcmgr "github.com/libp2p/go-libp2p-resource-manager" + "go.uber.org/zap" +) + +type loggingResourceManager struct { + clock clock.Clock + logger *zap.SugaredLogger + delegate network.ResourceManager + logInterval time.Duration + + mut sync.Mutex + limitExceededErrs uint64 +} + +type loggingScope struct { + logger *zap.SugaredLogger + delegate network.ResourceScope + countErrs func(error) +} + +var _ network.ResourceManager = (*loggingResourceManager)(nil) + +func (n *loggingResourceManager) start(ctx context.Context) { + logInterval := n.logInterval + if logInterval == 0 { + logInterval = 10 * time.Second + } + ticker := n.clock.Ticker(logInterval) + go func() { + defer ticker.Stop() + for { + select { + case <-ticker.C: + n.mut.Lock() + errs := n.limitExceededErrs + n.limitExceededErrs = 0 + n.mut.Unlock() + if errs != 0 { + n.logger.Warnf("Resource limits were exceeded %d times, consider inspecting logs and raising the resource manager limits.", errs) + } + case <-ctx.Done(): + return + } + } + }() +} + +func (n *loggingResourceManager) countErrs(err error) { + if errors.Is(err, network.ErrResourceLimitExceeded) { + n.mut.Lock() + n.limitExceededErrs++ + n.mut.Unlock() + } +} + +func (n *loggingResourceManager) ViewSystem(f func(network.ResourceScope) error) error { + return n.delegate.ViewSystem(f) +} +func (n *loggingResourceManager) ViewTransient(f func(network.ResourceScope) error) error { + return n.delegate.ViewTransient(func(s network.ResourceScope) error { + return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs}) + }) +} +func (n *loggingResourceManager) ViewService(svc string, f func(network.ServiceScope) error) error { + return n.delegate.ViewService(svc, func(s network.ServiceScope) error { + return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs}) + }) +} +func (n *loggingResourceManager) ViewProtocol(p protocol.ID, f func(network.ProtocolScope) error) error { + return n.delegate.ViewProtocol(p, func(s network.ProtocolScope) error { + return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs}) + }) +} +func (n *loggingResourceManager) ViewPeer(p peer.ID, f func(network.PeerScope) error) error { + return n.delegate.ViewPeer(p, func(s network.PeerScope) error { + return f(&loggingScope{logger: n.logger, delegate: s, countErrs: n.countErrs}) + }) +} +func (n *loggingResourceManager) OpenConnection(dir network.Direction, usefd bool) (network.ConnManagementScope, error) { + connMgmtScope, err := n.delegate.OpenConnection(dir, usefd) + n.countErrs(err) + return connMgmtScope, err +} +func (n *loggingResourceManager) OpenStream(p peer.ID, dir network.Direction) (network.StreamManagementScope, error) { + connMgmtScope, err := n.delegate.OpenStream(p, dir) + n.countErrs(err) + return connMgmtScope, err +} +func (n *loggingResourceManager) Close() error { + return n.delegate.Close() +} + +func (s *loggingScope) ReserveMemory(size int, prio uint8) error { + err := s.delegate.ReserveMemory(size, prio) + s.countErrs(err) + return err +} +func (s *loggingScope) ReleaseMemory(size int) { + s.delegate.ReleaseMemory(size) +} +func (s *loggingScope) Stat() network.ScopeStat { + return s.delegate.Stat() +} +func (s *loggingScope) BeginSpan() (network.ResourceScopeSpan, error) { + return s.delegate.BeginSpan() +} +func (s *loggingScope) Done() { + s.delegate.(network.ResourceScopeSpan).Done() +} +func (s *loggingScope) Name() string { + return s.delegate.(network.ServiceScope).Name() +} +func (s *loggingScope) Protocol() protocol.ID { + return s.delegate.(network.ProtocolScope).Protocol() +} +func (s *loggingScope) Peer() peer.ID { + return s.delegate.(network.PeerScope).Peer() +} +func (s *loggingScope) PeerScope() network.PeerScope { + return s.delegate.(network.PeerScope) +} +func (s *loggingScope) SetPeer(p peer.ID) error { + err := s.delegate.(network.ConnManagementScope).SetPeer(p) + s.countErrs(err) + return err +} +func (s *loggingScope) ProtocolScope() network.ProtocolScope { + return s.delegate.(network.ProtocolScope) +} +func (s *loggingScope) SetProtocol(proto protocol.ID) error { + err := s.delegate.(network.StreamManagementScope).SetProtocol(proto) + s.countErrs(err) + return err +} +func (s *loggingScope) ServiceScope() network.ServiceScope { + return s.delegate.(network.ServiceScope) +} +func (s *loggingScope) SetService(srv string) error { + err := s.delegate.(network.StreamManagementScope).SetService(srv) + s.countErrs(err) + return err +} +func (s *loggingScope) Limit() rcmgr.Limit { + return s.delegate.(rcmgr.ResourceScopeLimiter).Limit() +} +func (s *loggingScope) SetLimit(limit rcmgr.Limit) { + s.delegate.(rcmgr.ResourceScopeLimiter).SetLimit(limit) +} diff --git a/core/node/libp2p/rcmgr_logging_test.go b/core/node/libp2p/rcmgr_logging_test.go new file mode 100644 index 00000000000..72f34b80885 --- /dev/null +++ b/core/node/libp2p/rcmgr_logging_test.go @@ -0,0 +1,58 @@ +package libp2p + +import ( + "context" + "testing" + "time" + + "github.com/benbjohnson/clock" + "github.com/libp2p/go-libp2p-core/network" + rcmgr "github.com/libp2p/go-libp2p-resource-manager" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" +) + +func TestLoggingResourceManager(t *testing.T) { + clock := clock.NewMock() + limiter := rcmgr.NewDefaultLimiter() + limiter.SystemLimits = limiter.SystemLimits.WithConnLimit(1, 1, 1) + rm, err := rcmgr.NewResourceManager(limiter) + if err != nil { + t.Fatal(err) + } + + oCore, oLogs := observer.New(zap.WarnLevel) + oLogger := zap.New(oCore) + lrm := &loggingResourceManager{ + clock: clock, + logger: oLogger.Sugar(), + delegate: rm, + logInterval: 1 * time.Second, + } + + // 2 of these should result in resource limit exceeded errors and subsequent log messages + for i := 0; i < 3; i++ { + _, _ = lrm.OpenConnection(network.DirInbound, false) + } + + // run the logger which will write an entry for those errors + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + lrm.start(ctx) + clock.Add(3 * time.Second) + + timer := time.NewTimer(1 * time.Second) + for { + select { + case <-timer.C: + t.Fatalf("expected logs never arrived") + default: + if oLogs.Len() == 0 { + continue + } + require.Equal(t, "Resource limits were exceeded 2 times, consider inspecting logs and raising the resource manager limits.", oLogs.All()[0].Message) + return + } + } +} diff --git a/go.mod b/go.mod index c2b77d793fe..b7946841b27 100644 --- a/go.mod +++ b/go.mod @@ -125,13 +125,17 @@ require ( golang.org/x/sys v0.0.0-20220412211240-33da011f77ad ) +require ( + github.com/benbjohnson/clock v1.3.0 + github.com/ipfs/go-log/v2 v2.5.1 +) + require ( github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect github.com/Kubuxu/go-os-helper v0.0.1 // indirect github.com/Stebalien/go-bitfield v0.0.1 // indirect github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a // indirect github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 // indirect - github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/btcsuite/btcd v0.22.0-beta // indirect github.com/cenkalti/backoff v2.2.1+incompatible // indirect @@ -172,7 +176,6 @@ require ( github.com/ipfs/go-ipfs-delay v0.0.1 // indirect github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect github.com/ipfs/go-ipfs-pq v0.0.2 // indirect - github.com/ipfs/go-log/v2 v2.5.1 // indirect github.com/ipfs/go-peertaskqueue v0.7.1 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/klauspost/compress v1.15.1 // indirect