Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p/net/swarm: dial once at a time #549

Merged
merged 4 commits into from
Jan 13, 2015
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions p2p/net/swarm/simul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,49 @@ import (
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)

func TestSimultDials(t *testing.T) {
// t.Skip("skipping for another test")

ctx := context.Background()
swarms := makeSwarms(ctx, t, 2)

// connect everyone
{
var wg sync.WaitGroup
connect := func(s *Swarm, dst peer.ID, addr ma.Multiaddr) {
// copy for other peer
log.Debugf("TestSimultOpen: connecting: %s --> %s (%s)", s.local, dst, addr)
s.peers.AddAddress(dst, addr)
if _, err := s.Dial(ctx, dst); err != nil {
t.Fatal("error swarm dialing to peer", err)
}
wg.Done()
}

log.Info("Connecting swarms simultaneously.")
for i := 0; i < 10; i++ { // connect 10x for each.
wg.Add(2)
go connect(swarms[0], swarms[1].local, swarms[1].ListenAddresses()[0])
go connect(swarms[1], swarms[0].local, swarms[0].ListenAddresses()[0])
}
wg.Wait()
}

// should still just have 1, at most 2 connections :)
c01l := len(swarms[0].ConnectionsToPeer(swarms[1].local))
if c01l > 2 {
t.Error("0->1 has", c01l)
}
c10l := len(swarms[1].ConnectionsToPeer(swarms[0].local))
if c10l > 2 {
t.Error("1->0 has", c10l)
}

for _, s := range swarms {
s.Close()
}
}

func TestSimultOpen(t *testing.T) {
// t.Skip("skipping for another test")

Expand Down
9 changes: 5 additions & 4 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Swarm struct {
local peer.ID
peers peer.Peerstore
connh ConnHandler
dsync dialsync

cg ctxgroup.ContextGroup
}
Expand All @@ -49,10 +50,10 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
}

s := &Swarm{
swarm: ps.NewSwarm(PSTransport),
local: local,
peers: peers,
cg: ctxgroup.WithContext(ctx),
swarm: ps.NewSwarm(PSTransport),
local: local,
peers: peers,
cg: ctxgroup.WithContext(ctx),
}

// configure Swarm
Expand Down
122 changes: 116 additions & 6 deletions p2p/net/swarm/swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package swarm
import (
"errors"
"fmt"
"sync"

conn "github.com/jbenet/go-ipfs/p2p/net/conn"
addrutil "github.com/jbenet/go-ipfs/p2p/net/swarm/addr"
Expand All @@ -13,24 +14,133 @@ import (
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)

// dialAttempts governs how many times a goroutine will try to dial a given peer.
const dialAttempts = 3

// dialsync is a small object that helps manage ongoing dials.
// this way, if we receive many simultaneous dial requests, one
// can do its thing, while the rest wait.
//
// this interface is so would-be dialers can just:
//
// for {
// c := findConnectionToPeer(peer)
// if c != nil {
// return c
// }
//
// // ok, no connections. should we dial?
// if ok, wait := dialsync.Lock(peer); !ok {
// <-wait // can optionally wait
// continue
// }
// defer dialsync.Unlock(peer)
//
// c := actuallyDial(peer)
// return c
// }
//
type dialsync struct {
// ongoing is a map of tickets for the current peers being dialed.
// this way, we dont kick off N dials simultaneously.
ongoing map[peer.ID]chan struct{}
lock sync.Mutex
}

// Lock governs the beginning of a dial attempt.
// If there are no ongoing dials, it returns true, and the client is now
// scheduled to dial. Every other goroutine that calls startDial -- with
//the same dst -- will block until client is done. The client MUST call
// ds.Unlock(p) when it is done, to unblock the other callers.
// The client is not reponsible for achieving a successful dial, only for
// reporting the end of the attempt (calling ds.Unlock(p)).
//
// see the example below `dialsync`
func (ds *dialsync) Lock(dst peer.ID) (bool, chan struct{}) {
ds.lock.Lock()
if ds.ongoing == nil { // init if not ready
ds.ongoing = make(map[peer.ID]chan struct{})
}
wait, found := ds.ongoing[dst]
if !found {
ds.ongoing[dst] = make(chan struct{})
}
ds.lock.Unlock()

if found {
return false, wait
}

// ok! you're signed up to dial!
return true, nil
}

// Unlock releases waiters to a dial attempt. see Lock.
// if Unlock(p) is called without calling Lock(p) first, Unlock panics.
func (ds *dialsync) Unlock(dst peer.ID) {
ds.lock.Lock()
wait, found := ds.ongoing[dst]
if !found {
panic("called dialDone with no ongoing dials to peer: " + dst.Pretty())
}
delete(ds.ongoing, dst) // remove ongoing dial
close(wait) // release everyone else
ds.lock.Unlock()
}

// Dial connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
// the connection will happen over. Swarm can use whichever it choses.
// This allows us to use various transport protocols, do NAT traversal/relay,
// etc. to achive connection.
func (s *Swarm) Dial(ctx context.Context, p peer.ID) (*Conn, error) {

if p == s.local {
return nil, errors.New("Attempted connection to self!")
}

// check if we already have an open connection first
cs := s.ConnectionsToPeer(p)
for _, c := range cs {
if c != nil { // dump out the first one we find
return c, nil
// this loop is here because dials take time, and we should not be dialing
// the same peer concurrently (silly waste). Additonally, it's structured
// to check s.ConnectionsToPeer(p) _first_, and _between_ attempts because we
// may have received an incoming connection! if so, we no longer must dial.
//
// During the dial attempts, we may be doing the dialing. if not, we wait.
var err error
var conn *Conn
for i := 0; i < dialAttempts; i++ {
// check if we already have an open connection first
cs := s.ConnectionsToPeer(p)
for _, conn = range cs {
if conn != nil { // dump out the first one we find. (TODO pick better)
return conn, nil
}
}

// check if there's an ongoing dial to this peer
if ok, wait := s.dsync.Lock(p); !ok {
select {
case <-wait: // wait for that dial to finish.
continue // and see if it worked (loop), OR we got an incoming dial.
case <-ctx.Done(): // or we may have to bail...
return nil, ctx.Err()
}
}

// ok, we have been charged to dial! let's do it.
conn, err = s.dial(ctx, p)
s.dsync.Unlock(p)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't immediately clear that the conn is added to the swarm's list of conns in the dial method, so I thought this lock might've been released too early.

However, I read through and saw that swarm add conn occurs in dialConnSetup.

(nested state changes are somewhat hard to follow. It's nice to have lots of pure code (and to isolate code containing the side-effects, and to keep the side-effects as close to the surface as possible))

but the code looks correct. LGTM

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't immediately clear that the conn is added to the swarm's list of conns in the dial method, so I thought this lock might've been released too early.

However, I read through and saw that swarm add conn occurs in dialConnSetup.

(nested state changes are somewhat hard to follow. It's nice to have lots of pure code (and to isolate code containing the side-effects, and to keep the side-effects as close to the surface as possible))

but the code looks correct. LGTM

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, i was annoyed by this, and other warts. it isn't easy to lift up, without carrying a lot of logic into this part, which I wanted to keep very light-- only focused on the sync attempts (turns out dialing is hard!). I added a note 8e888b1 which may clear it up for the reader.

if err != nil {
continue // ok, we failed. try again. (if loop is done, our error is output)
}
return conn, nil
}
return nil, err
}

// dial is the actual swarm's dial logic, gated by Dial.
func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
if p == s.local {
return nil, errors.New("Attempted connection to self!")
}

sk := s.peers.PrivKey(s.local)
Expand Down