-
-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
p2p/net/swarm: dial once at a time #549
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ package swarm | |
import ( | ||
"errors" | ||
"fmt" | ||
"sync" | ||
|
||
conn "github.com/jbenet/go-ipfs/p2p/net/conn" | ||
addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr" | ||
|
@@ -13,24 +14,133 @@ import ( | |
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" | ||
) | ||
|
||
// dialAttempts governs how many times a goroutine will try to dial a given peer. | ||
const dialAttempts = 3 | ||
|
||
// dialsync is a small object that helps manage ongoing dials. | ||
// this way, if we receive many simultaneous dial requests, one | ||
// can do its thing, while the rest wait. | ||
// | ||
// this interface is so would-be dialers can just: | ||
// | ||
// for { | ||
// c := findConnectionToPeer(peer) | ||
// if c != nil { | ||
// return c | ||
// } | ||
// | ||
// // ok, no connections. should we dial? | ||
// if ok, wait := dialsync.Lock(peer); !ok { | ||
// <-wait // can optionally wait | ||
// continue | ||
// } | ||
// defer dialsync.Unlock(peer) | ||
// | ||
// c := actuallyDial(peer) | ||
// return c | ||
// } | ||
// | ||
type dialsync struct { | ||
// ongoing is a map of tickets for the current peers being dialed. | ||
// this way, we dont kick off N dials simultaneously. | ||
ongoing map[peer.ID]chan struct{} | ||
lock sync.Mutex | ||
} | ||
|
||
// Lock governs the beginning of a dial attempt. | ||
// If there are no ongoing dials, it returns true, and the client is now | ||
// scheduled to dial. Every other goroutine that calls startDial -- with | ||
//the same dst -- will block until client is done. The client MUST call | ||
// ds.Unlock(p) when it is done, to unblock the other callers. | ||
// The client is not reponsible for achieving a successful dial, only for | ||
// reporting the end of the attempt (calling ds.Unlock(p)). | ||
// | ||
// see the example below `dialsync` | ||
func (ds *dialsync) Lock(dst peer.ID) (bool, chan struct{}) { | ||
ds.lock.Lock() | ||
if ds.ongoing == nil { // init if not ready | ||
ds.ongoing = make(map[peer.ID]chan struct{}) | ||
} | ||
wait, found := ds.ongoing[dst] | ||
if !found { | ||
ds.ongoing[dst] = make(chan struct{}) | ||
} | ||
ds.lock.Unlock() | ||
|
||
if found { | ||
return false, wait | ||
} | ||
|
||
// ok! you're signed up to dial! | ||
return true, nil | ||
} | ||
|
||
// Unlock releases waiters to a dial attempt. see Lock. | ||
// if Unlock(p) is called without calling Lock(p) first, Unlock panics. | ||
func (ds *dialsync) Unlock(dst peer.ID) { | ||
ds.lock.Lock() | ||
wait, found := ds.ongoing[dst] | ||
if !found { | ||
panic("called dialDone with no ongoing dials to peer: " + dst.Pretty()) | ||
} | ||
delete(ds.ongoing, dst) // remove ongoing dial | ||
close(wait) // release everyone else | ||
ds.lock.Unlock() | ||
} | ||
|
||
// Dial connects to a peer. | ||
// | ||
// The idea is that the client of Swarm does not need to know what network | ||
// the connection will happen over. Swarm can use whichever it choses. | ||
// This allows us to use various transport protocols, do NAT traversal/relay, | ||
// etc. to achive connection. | ||
func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) { | ||
|
||
if p == s.local { | ||
return nil, errors.New("Attempted connection to self!") | ||
} | ||
|
||
// check if we already have an open connection first | ||
cs := s.ConnectionsToPeer(p) | ||
for _, c := range cs { | ||
if c != nil { // dump out the first one we find | ||
return c, nil | ||
// this loop is here because dials take time, and we should not be dialing | ||
// the same peer concurrently (silly waste). Additonally, it's structured | ||
// to check s.ConnectionsToPeer(p) _first_, and _between_ attempts because we | ||
// may have received an incoming connection! if so, we no longer must dial. | ||
// | ||
// During the dial attempts, we may be doing the dialing. if not, we wait. | ||
var err error | ||
var conn *Conn | ||
for i := 0; i < dialAttempts; i++ { | ||
// check if we already have an open connection first | ||
cs := s.ConnectionsToPeer(p) | ||
for _, conn = range cs { | ||
if conn != nil { // dump out the first one we find. (TODO pick better) | ||
return conn, nil | ||
} | ||
} | ||
|
||
// check if there's an ongoing dial to this peer | ||
if ok, wait := s.dsync.Lock(p); !ok { | ||
select { | ||
case <-wait: // wait for that dial to finish. | ||
continue // and see if it worked (loop), OR we got an incoming dial. | ||
case <-ctx.Done(): // or we may have to bail... | ||
return nil, ctx.Err() | ||
} | ||
} | ||
|
||
// ok, we have been charged to dial! let's do it. | ||
conn, err = s.dial(ctx, p) | ||
s.dsync.Unlock(p) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It wasn't immediately clear that the conn is added to the swarm's list of conns in the dial method, so I thought this lock might've been released too early. However, I read through and saw that swarm add conn occurs in dialConnSetup. (nested state changes are somewhat hard to follow. It's nice to have lots of pure code (and to isolate code containing the side-effects, and to keep the side-effects as close to the surface as possible)) but the code looks correct. LGTM There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed, i was annoyed by this, and other warts. it isn't easy to lift up, without carrying a lot of logic into this part, which I wanted to keep very light-- only focused on the sync attempts (turns out dialing is hard!). I added a note 8e888b1 which may clear it up for the reader. |
||
if err != nil { | ||
continue // ok, we failed. try again. (if loop is done, our error is output) | ||
} | ||
return conn, nil | ||
} | ||
return nil, err | ||
} | ||
|
||
// dial is the actual swarm's dial logic, gated by Dial. | ||
func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { | ||
if p == s.local { | ||
return nil, errors.New("Attempted connection to self!") | ||
} | ||
|
||
sk := s.peers.PrivKey(s.local) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It wasn't immediately clear that the `conn` is added to the swarm's list of conns in the `dial` method, so I thought this lock might've been released too early.
However, I read through and saw that swarm add conn occurs in dialConnSetup.
(nested state changes are somewhat hard to follow. It's nice to have lots of pure code (and to isolate code containing the side-effects, and to keep the side-effects as close to the surface as possible))
but the code looks correct. LGTM