Skip to content

Commit

Permalink
fix(network): close relay connection for public node (#891)
Browse files Browse the repository at this point in the history
  • Loading branch information
themantre authored Jan 1, 2024
1 parent 74dc6da commit 3963c43
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 37 deletions.
10 changes: 5 additions & 5 deletions network/gater.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (g *ConnectionGater) InterceptPeerDial(pid lp2ppeer.ID) bool {
defer g.lk.RUnlock()

if g.onConnectionLimit() {
g.logger.Debug("InterceptPeerDial rejected: many connections", "pid", pid)
g.logger.Info("InterceptPeerDial rejected: many connections", "pid", pid)
return false
}

Expand All @@ -70,13 +70,13 @@ func (g *ConnectionGater) InterceptAddrDial(pid lp2ppeer.ID, ma multiaddr.Multia
defer g.lk.RUnlock()

if g.onConnectionLimit() {
g.logger.Debug("InterceptAddrDial rejected: many connections", "pid", pid, "ma", ma.String())
g.logger.Info("InterceptAddrDial rejected: many connections", "pid", pid, "ma", ma.String())
return false
}

deny := g.filters.AddrBlocked(ma)
if deny {
g.logger.Debug("InterceptAddrDial rejected", "pid", pid, "ma", ma.String())
g.logger.Info("InterceptAddrDial rejected", "pid", pid, "ma", ma.String())
return false
}

Expand All @@ -88,13 +88,13 @@ func (g *ConnectionGater) InterceptAccept(cma lp2pnetwork.ConnMultiaddrs) bool {
defer g.lk.RUnlock()

if g.onConnectionLimit() {
g.logger.Debug("InterceptAccept rejected: many connections")
g.logger.Info("InterceptAccept rejected: many connections")
return false
}

deny := g.filters.AddrBlocked(cma.RemoteMultiaddr())
if deny {
g.logger.Debug("InterceptAccept rejected")
g.logger.Info("InterceptAccept rejected")
return false
}

Expand Down
5 changes: 0 additions & 5 deletions network/peermgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
lp2phost "github.com/libp2p/go-libp2p/core/host"
lp2pnet "github.com/libp2p/go-libp2p/core/network"
lp2ppeer "github.com/libp2p/go-libp2p/core/peer"
lp2pswarm "github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/multiformats/go-multiaddr"
"github.com/pactus-project/pactus/util/logger"
)
Expand Down Expand Up @@ -147,10 +146,6 @@ func (mgr *peerMgr) CheckConnectivity() {
continue
}

if swarm, ok := mgr.host.Network().(*lp2pswarm.Swarm); ok {
swarm.Backoff().Clear(ai.ID)
}

ConnectAsync(mgr.ctx, mgr.host, ai, mgr.logger)
}
}
Expand Down
55 changes: 28 additions & 27 deletions network/relay_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,19 @@ func newRelayService(ctx context.Context, host lp2phost.Host, conf *Config, log
}

func (rs *relayService) Start() {
if rs.conf.EnableRelay {
go func() {
ticker := time.NewTicker(60 * time.Second)
defer ticker.Stop()
go func() {
ticker := time.NewTicker(60 * time.Second)
defer ticker.Stop()

for {
select {
case <-rs.ctx.Done():
return
case <-ticker.C:
rs.checkConnectivity()
}
for {
select {
case <-rs.ctx.Done():
return
case <-ticker.C:
rs.checkConnectivity()
}
}()
}
}
}()
}

func (rs *relayService) Stop() {
Expand All @@ -56,9 +54,7 @@ func (rs *relayService) SetReachability(reachability lp2pnetwork.Reachability) {
rs.reachability = reachability
rs.lk.Unlock()

if rs.conf.EnableRelay {
rs.checkConnectivity()
}
rs.checkConnectivity()
}

func (rs *relayService) Reachability() lp2pnetwork.Reachability {
Expand All @@ -72,19 +68,24 @@ func (rs *relayService) checkConnectivity() {
rs.lk.Lock()
defer rs.lk.Unlock()

if rs.reachability != lp2pnetwork.ReachabilityPrivate {
return
}
net := rs.host.Network()
for _, ai := range rs.conf.RelayAddrInfos() {
if net.Connectedness(ai.ID) != lp2pnetwork.Connected {
rs.logger.Info("try connecting relay node", "addr", ai.Addrs)
err := ConnectSync(rs.ctx, rs.host, ai)
if err != nil {
rs.logger.Warn("unable to connect to relay node", "error", err, "addr", ai.Addrs)
} else {
rs.logger.Info("connect to relay node", "addr", ai.Addrs)
if rs.conf.EnableRelay &&
rs.reachability == lp2pnetwork.ReachabilityPrivate {
for _, ai := range rs.conf.RelayAddrInfos() {
if net.Connectedness(ai.ID) != lp2pnetwork.Connected {
rs.logger.Info("try connecting relay node", "addr", ai.Addrs)
err := ConnectSync(rs.ctx, rs.host, ai)
if err != nil {
rs.logger.Warn("unable to connect to relay node", "error", err, "addr", ai.Addrs)
} else {
rs.logger.Info("connect to relay node", "addr", ai.Addrs)
}
}
}
} else {
// It is public node or relay is disabled.
for _, ai := range rs.conf.RelayAddrInfos() {
_ = net.ClosePeer(ai.ID)
}
}
}
5 changes: 5 additions & 0 deletions network/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
lp2pnetwork "github.com/libp2p/go-libp2p/core/network"
lp2ppeer "github.com/libp2p/go-libp2p/core/peer"
lp2prcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
lp2pswarm "github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/multiformats/go-multiaddr"
"github.com/pactus-project/pactus/crypto/hash"
"github.com/pactus-project/pactus/util/logger"
Expand Down Expand Up @@ -86,6 +87,10 @@ func ConnectAsync(ctx context.Context, h lp2phost.Host, addrInfo lp2ppeer.AddrIn
}

func ConnectSync(ctx context.Context, h lp2phost.Host, addrInfo lp2ppeer.AddrInfo) error {
if swarm, ok := h.Network().(*lp2pswarm.Swarm); ok {
swarm.Backoff().Clear(addrInfo.ID)
}

return h.Connect(lp2pnetwork.WithDialPeerTimeout(ctx, 30*time.Second), addrInfo)
}

Expand Down

0 comments on commit 3963c43

Please sign in to comment.