From 3dfe73967ed004b5b31d148cdc2ae59993ad4f3c Mon Sep 17 00:00:00 2001 From: Denis Kolegov Date: Tue, 16 Aug 2022 18:26:55 +0300 Subject: [PATCH 1/4] Improve libp2p transport --- pkg/net/libp2p/libp2ptransport.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/net/libp2p/libp2ptransport.go b/pkg/net/libp2p/libp2ptransport.go index 21dd1768a..38c118a16 100644 --- a/pkg/net/libp2p/libp2ptransport.go +++ b/pkg/net/libp2p/libp2ptransport.go @@ -27,8 +27,8 @@ import ( ) const ( - ID = "/mir/0.0.1" - defaultMaxTimeout = 300 * time.Millisecond + ProtocolID = "/mir/0.0.1" + defaultMaxTimeout = 200 * time.Millisecond PermanentAddrTTL = math.MaxInt64 - iota ) @@ -70,7 +70,7 @@ func (t *Transport) EventsOut() <-chan *events.EventList { func (t *Transport) Start() error { t.logger.Log(logging.LevelDebug, fmt.Sprintf("node %s handler starting on %v", t.ownID, t.host.Addrs())) - t.host.SetStreamHandler(ID, t.mirHandler) + t.host.SetStreamHandler(ProtocolID, t.mirHandler) return nil } @@ -95,7 +95,7 @@ func (t *Transport) Stop() { t.logger.Log(logging.LevelDebug, "Closed connection", "to", id) } - t.host.RemoveStreamHandler(ID) + t.host.RemoveStreamHandler(ProtocolID) if err := t.host.Close(); err != nil { t.logger.Log(logging.LevelError, fmt.Sprintf("Could not close libp2p %v: %v", t.ownID, err)) @@ -171,10 +171,13 @@ func (t *Transport) connectToNode(ctx context.Context, addr multiaddr.Multiaddr) t.host.Peerstore().AddAddrs(info.ID, info.Addrs, PermanentAddrTTL) - s, err := t.openStream(ctx, info.ID) + ctx, cancel := context.WithTimeout(ctx, defaultMaxTimeout) + defer cancel() + + // s, err := t.openStream(ctx, info.ID) + s, err := t.host.NewStream(ctx, info.ID, ProtocolID) if err != nil { - t.logger.Log(logging.LevelError, fmt.Sprintf("couldn't open stream: %v", err)) - return nil, fmt.Errorf("couldn't open stream to %v: %w", addr, err) + return nil, fmt.Errorf("failed to open new stream to node %v: %w", addr, err) } return s, nil @@ -183,7 +186,7 @@ func (t *Transport) connectToNode(ctx context.Context, addr multiaddr.Multiaddr) func (t *Transport) openStream(ctx context.Context, p peer.ID) (network.Stream, error) { for { sctx, cancel := context.WithTimeout(ctx, defaultMaxTimeout) - s, err := t.host.NewStream(sctx, p, ID) + s, err := t.host.NewStream(sctx, p, ProtocolID) cancel() if err == nil { From 405d56a06d47fe387621dabe27950c7f5f6c299b Mon Sep 17 00:00:00 2001 From: Denis Kolegov Date: Tue, 16 Aug 2022 22:17:33 +0300 Subject: [PATCH 2/4] back to the original one --- pkg/net/libp2p/libp2ptransport.go | 33 ++++++++++++++++--------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/pkg/net/libp2p/libp2ptransport.go b/pkg/net/libp2p/libp2ptransport.go index 38c118a16..1ac0bd7df 100644 --- a/pkg/net/libp2p/libp2ptransport.go +++ b/pkg/net/libp2p/libp2ptransport.go @@ -27,9 +27,11 @@ import ( ) const ( - ProtocolID = "/mir/0.0.1" - defaultMaxTimeout = 200 * time.Millisecond - PermanentAddrTTL = math.MaxInt64 - iota + ProtocolID = "/mir/0.0.1" + maxConnectingTimeout = 200 * time.Millisecond + retryTimeout = 2 * time.Second + retryAttempts = 20 + PermanentAddrTTL = math.MaxInt64 - iota ) type TransportMessage struct { @@ -171,11 +173,7 @@ func (t *Transport) connectToNode(ctx context.Context, addr multiaddr.Multiaddr) t.host.Peerstore().AddAddrs(info.ID, info.Addrs, PermanentAddrTTL) - ctx, cancel := context.WithTimeout(ctx, defaultMaxTimeout) - defer cancel() - - // s, err := t.openStream(ctx, info.ID) - s, err := t.host.NewStream(ctx, info.ID, ProtocolID) + s, err := t.openStream(ctx, info.ID) if err != nil { return nil, fmt.Errorf("failed to open new stream to node %v: %w", addr, err) } @@ -184,19 +182,21 @@ func (t *Transport) connectToNode(ctx context.Context, addr multiaddr.Multiaddr) } func (t *Transport) openStream(ctx context.Context, p peer.ID) (network.Stream, error) { - for { - sctx, cancel := context.WithTimeout(ctx, defaultMaxTimeout) - s, err := t.host.NewStream(sctx, p, ProtocolID) + var streamErr error + for i := 0; i < retryAttempts; i++ { + sctx, cancel := context.WithTimeout(ctx, maxConnectingTimeout) + + s, streamErr := t.host.NewStream(sctx, p, ProtocolID) cancel() - if err == nil { + if streamErr == nil { return s, nil } - t.logger.Log(logging.LevelError, fmt.Sprintf("failed to open stream: %v", err)) - - delay := time.NewTimer(defaultMaxTimeout) + t.logger.Log( + logging.LevelError, fmt.Sprintf("failed to open stream to %s, retry in %d sec", p, retryTimeout)) + delay := time.NewTimer(retryTimeout) select { case <-delay.C: continue @@ -204,9 +204,10 @@ func (t *Transport) openStream(ctx context.Context, p peer.ID) (network.Stream, if !delay.Stop() { <-delay.C } - return nil, fmt.Errorf("context closed") + return nil, fmt.Errorf("libp2p opening stream: context closed") } } + return nil, fmt.Errorf("failed to open stream to %s: %w", p, streamErr) } func (t *Transport) Send(dest types.NodeID, payload *messagepb.Message) error { From b1a63ea2ecb9501c49a69593d005505a5926fe1c Mon Sep 17 00:00:00 2001 From: Denis Kolegov Date: Wed, 17 Aug 2022 11:43:30 +0300 Subject: [PATCH 3/4] report an error after 2 failed connections --- pkg/net/libp2p/libp2ptransport.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/net/libp2p/libp2ptransport.go b/pkg/net/libp2p/libp2ptransport.go index 1ac0bd7df..afb644cbc 100644 --- a/pkg/net/libp2p/libp2ptransport.go +++ b/pkg/net/libp2p/libp2ptransport.go @@ -31,6 +31,7 @@ const ( maxConnectingTimeout = 200 * time.Millisecond retryTimeout = 2 * time.Second retryAttempts = 20 + nonErrorAttempts = 2 PermanentAddrTTL = math.MaxInt64 - iota ) @@ -193,8 +194,10 @@ func (t *Transport) openStream(ctx context.Context, p peer.ID) (network.Stream, return s, nil } - t.logger.Log( - logging.LevelError, fmt.Sprintf("failed to open stream to %s, retry in %d sec", p, retryTimeout)) + if i >= nonErrorAttempts { + t.logger.Log( + logging.LevelError, fmt.Sprintf("failed to open stream to %s, retry in %d sec", p, retryTimeout)) + } delay := time.NewTimer(retryTimeout) select { From dd3153447a84da894f5a30fef77b74be928bd32e Mon Sep 17 00:00:00 2001 From: Denis Kolegov Date: Wed, 17 Aug 2022 11:46:30 +0300 Subject: [PATCH 4/4] print info message --- pkg/net/libp2p/libp2ptransport.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/net/libp2p/libp2ptransport.go b/pkg/net/libp2p/libp2ptransport.go index afb644cbc..a8eeb3e7e 100644 --- a/pkg/net/libp2p/libp2ptransport.go +++ b/pkg/net/libp2p/libp2ptransport.go @@ -197,6 +197,9 @@ func (t *Transport) openStream(ctx context.Context, p peer.ID) (network.Stream, if i >= nonErrorAttempts { t.logger.Log( logging.LevelError, fmt.Sprintf("failed to open stream to %s, retry in %d sec", p, retryTimeout)) + } else { + t.logger.Log( + logging.LevelInfo, fmt.Sprintf("failed to open stream to %s, retry in %d sec", p, retryTimeout)) } delay := time.NewTimer(retryTimeout)