Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
add user-defined conn select alg and user-defined dest select alg
Browse files Browse the repository at this point in the history
  • Loading branch information
lnykww committed Mar 19, 2019
1 parent e20fb5e commit 8c1eaf0
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 8 deletions.
42 changes: 42 additions & 0 deletions swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
pstore "github.com/libp2p/go-libp2p-peerstore"
transport "github.com/libp2p/go-libp2p-transport"
filter "github.com/libp2p/go-maddr-filter"
ma "github.com/multiformats/go-multiaddr"
mafilter "github.com/whyrusleeping/multiaddr-filter"
)

Expand Down Expand Up @@ -81,6 +82,9 @@ type Swarm struct {
// filters for addresses that shouldnt be dialed (or accepted)
Filters *filter.Filters

bestConn BestConn
bestDest BestDest

proc goprocess.Process
ctx context.Context
bwc metrics.Reporter
Expand Down Expand Up @@ -274,6 +278,16 @@ func (s *Swarm) StreamHandler() inet.StreamHandler {
return handler
}

// SetBestConn set the BestConn interface
func (s *Swarm) SetBestConn(bc BestConn) {
s.bestConn = bc
}

// SetBestDest set the BestDest interface
func (s *Swarm) SetBestDest(bd BestDest) {
s.bestDest = bd
}

// NewStream creates a new stream on any available connection to peer, dialing
// if necessary.
func (s *Swarm) NewStream(ctx context.Context, p peer.ID) (inet.Stream, error) {
Expand Down Expand Up @@ -359,6 +373,34 @@ func (s *Swarm) bestConnToPeer(p peer.ID) *Conn {
return best
}

// Wrapper for BestConn Interface
func (s *Swarm) bestConnToPeerWrapper(p peer.ID) *Conn {
if s.bestConn == nil {
return s.bestConnToPeer(p)
}
s.conns.RLock()
defer s.conns.RUnlock()
return s.bestConn.BestConn(p, s.conns.m[p])
}

// Wrapper for BestConnFallback Interface
func (s *Swarm) bestConnToPeerFallbackWrapper(p peer.ID) *Conn {
if s.bestConn == nil {
return s.bestConnToPeer(p)
}
s.conns.RLock()
defer s.conns.RUnlock()
return s.bestConn.BestConnFallback(p, s.conns.m[p])
}

// Wrapper for BestDest Interface
func (s *Swarm) bestDestSelectWrapper(id peer.ID, addrs []ma.Multiaddr) []ma.Multiaddr {
if s.bestDest == nil {
return addrs
}
return s.bestDest.BestDestSelect(id, addrs)
}

// Connectedness returns our "connectedness" state with the given peer.
//
// To check if we have an open connection, use `s.Connectedness(p) ==
Expand Down
4 changes: 4 additions & 0 deletions swarm_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func (c *Conn) Close() error {
return c.err
}

func (c *Conn) IsClosed() bool {
return c.conn.IsClosed()
}

func (c *Conn) doClose() {
c.swarm.removeConn(c)

Expand Down
21 changes: 13 additions & 8 deletions swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) {
defer log.EventBegin(ctx, "swarmDialAttemptSync", p).Done()

// check if we already have an open connection first
conn := s.bestConnToPeer(p)
conn := s.bestConnToPeerWrapper(p)
if conn != nil {
return conn, nil
}
Expand Down Expand Up @@ -226,7 +226,7 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) {
// Short circuit.
// By the time we take the dial lock, we may already *have* a connection
// to the peer.
c := s.bestConnToPeer(p)
c := s.bestConnToPeerWrapper(p)
if c != nil {
return c, nil
}
Expand All @@ -239,7 +239,7 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) {

conn, err := s.dial(ctx, p)
if err != nil {
conn = s.bestConnToPeer(p)
conn = s.bestConnToPeerFallbackWrapper(p)
if conn != nil {
// Hm? What error?
// Could have canceled the dial because we received a
Expand Down Expand Up @@ -289,14 +289,19 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
the improved rate limiter, while maintaining the outward behaviour
that we previously had (halting a dial when we run out of addrs)
*/
peerAddrs := s.peers.Addrs(p)
if len(peerAddrs) == 0 {
return nil, errors.New("no addresses")
}
goodAddrs := s.filterKnownUndialables(peerAddrs)
goodAddrs := s.filterKnownUndialables(s.peers.Addrs(p))

if len(goodAddrs) == 0 {
return nil, errors.New("no good addresses")
}

if s.bestDest != nil {
// Select the best address to peer.
bestAddrs := s.bestDestSelectWrapper(p, goodAddrs)
if len(bestAddrs) != 0 {
goodAddrs = bestAddrs
}
}
goodAddrsChan := make(chan ma.Multiaddr, len(goodAddrs))
for _, a := range goodAddrs {
goodAddrsChan <- a
Expand Down

0 comments on commit 8c1eaf0

Please sign in to comment.