This repository has been archived by the owner on May 26, 2022. It is now read-only.

WIP Dialer v2: Pluggable and composable dialer #88

Closed
wants to merge 18 commits into from
Changes from 2 commits
67 changes: 67 additions & 0 deletions dial/addr_resolver.go
@@ -0,0 +1,67 @@
package dial

import (
addrutil "github.com/libp2p/go-addr-util"
ma "github.com/multiformats/go-multiaddr"
)

type AddrResolver struct {
sFilters []func(ma.Multiaddr) bool
dFilters []func(req *Request) func(ma.Multiaddr) bool
}

func DefaultStaticFilters() []func(ma.Multiaddr) bool {
return []func(ma.Multiaddr) bool{
addrutil.AddrOverNonLocalIP,
}
}

func DefaultDynamicFilters() []func(req *Request) func(ma.Multiaddr) bool {
excludeOurAddrs := func(req *Request) func(ma.Multiaddr) bool {
lisAddrs, _ := req.net.InterfaceListenAddresses()
var ourAddrs []ma.Multiaddr
for _, addr := range lisAddrs {
protos := addr.Protocols()
if len(protos) == 2 && (protos[0].Code == ma.P_IP4 || protos[0].Code == ma.P_IP6) {
// we're only sure about filtering out /ip4 and /ip6 addresses, so far
ourAddrs = append(ourAddrs, addr)
}
}
return addrutil.SubtractFilter(ourAddrs...)
}

return []func(req *Request) func(ma.Multiaddr) bool{
excludeOurAddrs,
}
}

type AddrFilterFactory func(req *Request) []func(ma.Multiaddr) bool

var _ RequestPreparer = (*AddrResolver)(nil)

func NewAddrResolver(staticFilters []func(ma.Multiaddr) bool, dynamicFilters []func(req *Request) func(ma.Multiaddr) bool) RequestPreparer {
Contributor:
can you make some type definitions for the funcs? The declaration is almost comical.

Contributor:
The main culprit is the filter function signature; a type MultiaddrFilter func(ma.Multiaddr) bool would go a long way towards simplifying.

return &AddrResolver{
sFilters: staticFilters,
dFilters: dynamicFilters,
}
}

func (m *AddrResolver) Prepare(req *Request) {
req.addrs = req.net.Peerstore().Addrs(req.id)
if len(req.addrs) == 0 {
return
}

// apply the static filters.
req.addrs = addrutil.FilterAddrs(req.addrs, m.sFilters...)
if len(m.dFilters) == 0 {
return
}

// apply the dynamic filters.
var dFilters = make([]func(multiaddr ma.Multiaddr) bool, 0, len(m.dFilters))
for _, df := range m.dFilters {
dFilters = append(dFilters, df(req))
}
req.addrs = addrutil.FilterAddrs(req.addrs, dFilters...)
}
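Following up on the review thread above, a minimal sketch of the suggested type aliases; MultiaddrFilter is the reviewer's proposed name, while RequestBoundFilter is a hypothetical name for the dynamic variant and is not part of this diff:

```go
// Hypothetical sketch, not part of this diff.
package dial

import ma "github.com/multiformats/go-multiaddr"

// MultiaddrFilter is a static address filter.
type MultiaddrFilter func(ma.Multiaddr) bool

// RequestBoundFilter builds a MultiaddrFilter from a dial Request, e.g. to
// subtract our own listen addresses (hypothetical name).
type RequestBoundFilter func(req *Request) MultiaddrFilter

// With these aliases, the constructor above could read:
//
//   func NewAddrResolver(static []MultiaddrFilter, dynamic []RequestBoundFilter) RequestPreparer
```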
147 changes: 147 additions & 0 deletions dial/backoff.go
@@ -0,0 +1,147 @@
package dial

import (
"errors"
"sync"
"time"

logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-peer"
)

var log = logging.Logger("swarm")

// ErrDialBackoff is returned by the backoff code when a given peer has
// been dialed too frequently
var ErrDialBackoff = errors.New("dial backoff")

// BackoffBase is the base amount of time to backoff (default: 5s).
var BackoffBase = time.Second * 5

// BackoffCoef is the backoff coefficient (default: 1s).
var BackoffCoef = time.Second

// BackoffMax is the maximum backoff time (default: 5m).
var BackoffMax = time.Minute * 5

// Backoff is a struct used to avoid over-dialing the same, dead peers.
// Whenever we totally time out on a peer (all three attempts), we add them
// to dialbackoff. Then, whenever goroutines would _wait_ (dialsync), they
// check dialbackoff. If it's there, they don't wait and exit promptly with
// an error. (the single goroutine that is actually dialing continues to
// dial). If a dial is successful, the peer is removed from backoff.
// Example:
//
// for {
// if ok, wait := dialsync.Lock(p); !ok {
// if backoff.Backoff(p) {
// return errDialFailed
// }
// <-wait
// continue
// }
// defer dialsync.Unlock(p)
// c, err := actuallyDial(p)
// if err != nil {
// dialbackoff.AddBackoff(p)
// continue
// }
// dialbackoff.Clear(p)
// }
//

// Backoff is a type for tracking peer dial backoffs.
//
// * It's safe to use its zero value.
// * It's thread-safe.
// * It's *not* safe to move this type after using it.
type Backoff struct {
entries map[peer.ID]*backoffPeer
lock sync.RWMutex
}

func NewBackoff() RequestPreparer {
return &Backoff{}
}

func (db *Backoff) Prepare(req *Request) {
req.AddCallback(func() {
if req.err != nil {
db.AddBackoff(req.id)
return
}
db.ClearBackoff(req.id)
})

// if this peer has been backed off, complete the dial immediately
if !db.Backoff(req.id) {
return
}
log.Event(req.ctx, "swarmDialBackoff", req.id)
req.Complete(nil, ErrDialBackoff)
}

var _ RequestPreparer = (*Backoff)(nil)

type backoffPeer struct {
tries int
until time.Time
}

func (db *Backoff) init() {
if db.entries == nil {
db.entries = make(map[peer.ID]*backoffPeer)
}
}

// Backoff returns whether the client should backoff from dialing
// peer p
func (db *Backoff) Backoff(p peer.ID) (backoff bool) {
db.lock.Lock()
defer db.lock.Unlock()
db.init()
bp, found := db.entries[p]
if found && time.Now().Before(bp.until) {
return true
}
return false
}

// AddBackoff lets other nodes know that we've entered backoff with
Contributor:
lets other nodes know? That doesn't seem right.

Member Author:
agree it's confusing; copy-pasted from the original implementation though:

// AddBackoff lets other nodes know that we've entered backoff with

While we are here, @whyrusleeping... the Swarm currently exposes the Backoff via an accessor that's not part of the Network interface:

func (s *Swarm) Backoff() *DialBackoff {

I think it's there mostly for testing purposes, or do we want to allow other components to fiddle with the backoff logic? With it now being just another RequestPreparer, I'm reluctant to expose it like this.

// peer p, so dialers should not wait unnecessarily. We still will
// attempt to dial with one goroutine, in case we get through.
//
// Backoff is not exponential, it's quadratic and computed according to the
// following formula:
//
// BackoffBase + BackoffCoef * PriorBackoffs^2
//
// Where PriorBackoffs is the number of previous backoffs.
func (db *Backoff) AddBackoff(p peer.ID) {
db.lock.Lock()
defer db.lock.Unlock()
db.init()
bp, ok := db.entries[p]
if !ok {
db.entries[p] = &backoffPeer{
tries: 1,
until: time.Now().Add(BackoffBase),
}
return
}

backoffTime := BackoffBase + BackoffCoef*time.Duration(bp.tries*bp.tries)
if backoffTime > BackoffMax {
backoffTime = BackoffMax
}
bp.until = time.Now().Add(backoffTime)
bp.tries++
}

// ClearBackoff removes a backoff record. Clients should call this after a
// successful Dial.
func (db *Backoff) ClearBackoff(p peer.ID) {
db.lock.Lock()
defer db.lock.Unlock()
db.init()
delete(db.entries, p)
}
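For reference, a small standalone sketch (not part of this diff) that prints the schedule the formula above produces with the default BackoffBase (5s), BackoffCoef (1s) and BackoffMax (5m):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	base, coef, max := 5*time.Second, time.Second, 5*time.Minute

	// BackoffBase + BackoffCoef * PriorBackoffs^2, capped at BackoffMax.
	for _, prior := range []int{0, 1, 2, 5, 10, 20} {
		d := base + coef*time.Duration(prior*prior)
		if d > max {
			d = max
		}
		fmt.Printf("%2d prior backoffs -> wait %v\n", prior, d)
	}
	// Prints: 5s, 6s, 9s, 30s, 1m45s, 5m0s (the last capped at BackoffMax).
}
```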
78 changes: 78 additions & 0 deletions dial/executor.go
@@ -0,0 +1,78 @@
package dial

import (
"context"
"errors"
"fmt"
)

// ErrNoTransport is returned when we don't know a transport for the
// given multiaddr.
var ErrNoTransport = errors.New("no transport for protocol")

type executor struct {
resolver TransportResolverFn

localCloseCh chan struct{}
}

var _ Executor = (*executor)(nil)

func NewExecutor(resolver TransportResolverFn) Executor {
return &executor{
resolver: resolver,
localCloseCh: make(chan struct{}),
}
}

func (e *executor) Start(ctx context.Context, dialCh <-chan *Job) {
for {
select {
case j := <-dialCh:
go e.processDial(j)
case <-ctx.Done():
return
case <-e.localCloseCh:
return
}
}
}

func (e *executor) Close() error {
close(e.localCloseCh)
return nil
}

func (e *executor) processDial(job *Job) {
defer func() {
job.completeCh <- job
}()

addr, id := job.addr, job.req.id

log.Debugf("%s swarm dialing %s %s", job.req.net.LocalPeer(), id, addr)

tpt := e.resolver(addr)
if tpt == nil {
job.err = ErrNoTransport
return
}

tconn, err := tpt.Dial(job.req.ctx, addr, id)
if err != nil {
err = fmt.Errorf("%s --> %s dial attempt failed: %s", job.req.net.LocalPeer(), id, job.err)
job.Complete(tconn, err)
return
}

// Trust the transport? Yeah... right.
if tconn.RemotePeer() != id {
tconn.Close()
err = fmt.Errorf("BUG in transport %T: tried to dial %s, dialed %s", id, job.tconn.RemotePeer(), tpt)
log.Error(err)
job.Complete(nil, err)
return
}

job.Complete(tconn, err)
}
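A sketch of one way a TransportResolverFn could be supplied to NewExecutor; the table-based lookup and the helper name are assumptions for illustration, not part of this PR:

```go
package dial

import (
	tpt "github.com/libp2p/go-libp2p-transport"
	ma "github.com/multiformats/go-multiaddr"
)

// resolverFromTable returns a TransportResolverFn backed by a static map of
// multiaddr protocol code -> transport (e.g. ma.P_TCP -> a TCP transport).
// Hypothetical helper, for illustration only.
func resolverFromTable(table map[int]tpt.Transport) TransportResolverFn {
	return func(a ma.Multiaddr) tpt.Transport {
		for _, p := range a.Protocols() {
			if t, ok := table[p.Code]; ok {
				return t
			}
		}
		// Returning nil makes the executor fail the job with ErrNoTransport.
		return nil
	}
}
```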
78 changes: 78 additions & 0 deletions dial/interfaces.go
@@ -0,0 +1,78 @@
package dial

import (
"context"
"io"

"github.com/libp2p/go-libp2p-net"
"github.com/libp2p/go-libp2p-peer"
tpt "github.com/libp2p/go-libp2p-transport"
ma "github.com/multiformats/go-multiaddr"
)

// A request preparer can perform operations on a dial Request before it is sent to the Planner.
// Examples include validation, de-duplication, back-off, etc.
//
// A RequestPreparer may cancel the dial preemptively in error or in success, by calling Complete() on the Request.
type RequestPreparer interface {
Prepare(req *Request)
Contributor:
Any reason not to have this return a new, modified request for easier debuggability? Also, where is Request defined? I can't find it...

Edit: GitHub was glitching.

}

// A job preparer can perform operations on a dial Job as soon as it is emitted from the Planner, and before
// it is sent to the Throttler. Examples include timeout setting, address rewriting, dial filtering, etc.
type JobPreparer interface {
Prepare(job *Job)
}
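As an illustration of the JobPreparer hook (no concrete JobPreparer appears in the files shown here), a hypothetical address-rewriting preparer; a writable job.addr field is an assumption based on its use in executor.go:

```go
package dial

import ma "github.com/multiformats/go-multiaddr"

// addrRewriter is a hypothetical JobPreparer: it rewrites each job's target
// address through a user-supplied function before the job reaches the
// throttler (e.g. to map relay or DNS addresses). It assumes job.addr is
// writable at this stage.
type addrRewriter struct {
	rewrite func(ma.Multiaddr) ma.Multiaddr
}

var _ JobPreparer = (*addrRewriter)(nil)

func (r *addrRewriter) Prepare(job *Job) {
	job.addr = r.rewrite(job.addr)
}
```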

// Dial planners take a Request (populated with multiaddrs) and emit dial jobs on dialCh for the addresses
// they want dialed. The pipeline will call the planner once, as well as every time a dial job completes.
//
// For more information on the choreography, read the docs on Next().
type Planner interface {

// Next requests the planner to send a new dialJobs on dialCh, if appropriate.
//
// When planning starts, Next is invoked with a nil last parameter.
//
// Next is then subsequently invoked on every completed dial, providing a slice of dialed jobs and the
// last job to complete. With these two elements, in conjunction with any state that may be tracked, the Planner
// can take decisions about what to dial next, or to finish planning altogether.
//
// When the planner is satisfied and has no more dials to request, it must signal so by closing
// the dialCh channel.
Next(req *Request, dialed dialJobs, last *Job, dialCh chan dialJobs) error
Member:
How about:

  1. Breaking this into two steps: Planner -> Plan.
  2. Letting the plan track the state instead of feeding it back in every time.
type Planner interface {
  Plan(req *Request) (Plan, error)
}
type Plan interface {
  Complete(j *Job) error // may need to pass in a connection?
  Next() (*Job, error) // or maybe a channel?
}

Not sure about the actual interface definitions, I just think breaking this up will help.

Member Author:
Agree, good point!

Contributor:
Seconding this; I think this general structure highlights the flow as well.

}

// A throttler is a goroutine that applies a throttling process to dial jobs requested by the Planner.
type Throttler interface {
io.Closer

// Start spawns the goroutine that is in charge of throttling. It receives planned jobs via inCh and emits
// jobs to execute on dialCh. The throttler can apply any logic in-between: it may throttle jobs based on
// system resources, time, inflight dials, network conditions, fail rate, etc.
Start(ctx context.Context, inCh <-chan *Job, dialCh chan<- *Job)
Member:
Any reason not to have:

Start(ctx context.Context, inCh <-chan *Job) <-chan *Job

Member Author:
Since these are buffered channels, I preferred the pipeline to be responsible for them.

}
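To make the contract concrete, a minimal Throttler sketch that throttles purely by time, one of the cases mentioned in the interface docs; it is illustrative only and not part of this diff:

```go
package dial

import (
	"context"
	"time"
)

// rateThrottler is a minimal Throttler sketch: it releases at most one job
// per interval, i.e. it throttles purely on time. Real implementations would
// more likely bound in-flight dials per peer and per file-descriptor budget.
type rateThrottler struct {
	interval time.Duration
	closeCh  chan struct{}
}

var _ Throttler = (*rateThrottler)(nil)

func (t *rateThrottler) Start(ctx context.Context, inCh <-chan *Job, dialCh chan<- *Job) {
	tick := time.NewTicker(t.interval)
	defer tick.Stop()
	for {
		select {
		case j, ok := <-inCh:
			if !ok {
				return
			}
			select {
			case <-tick.C: // wait for the next slot before releasing the job
			case <-ctx.Done():
				return
			}
			dialCh <- j
		case <-ctx.Done():
			return
		case <-t.closeCh:
			return
		}
	}
}

func (t *rateThrottler) Close() error {
	close(t.closeCh)
	return nil
}
```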

// An executor is a goroutine responsible for ultimately carrying out the network dial to an addr.
type Executor interface {
io.Closer

// Start spawns the goroutine responsible for executing network dials. Jobs sent to dialCh have already
// been subjected to the throttler. Once a dial finishes, the Executor must send the completed job to
// completeCh, where it'll be received by the pipeline.
//
// In terms of concurrency, the Executor should behave like a dispatcher, in turn spawning individual
// goroutines, or maintaining a finite set of child workers, to carry out the dials.
// The Executor must never block.
Start(ctx context.Context, dialCh <-chan *Job)
}

// Once the Planner is satisfied with the result of the dials, and all inflight dials have finished executing,
// the Selector picks the optimal successful connection to return to the consumer. The Pipeline takes care of
// closing unselected successful connections.
type Selector interface {
Select(successful dialJobs) (tpt.Conn, error)
Member:
So, the planner needs to be able to determine when to finish anyways. I'd be surprised if it didn't have enough information to perform the selection as well. What's the motivation?

Member Author:
The Selector will be used in two cases:

  1. The Planner explicitly continues dialling upon a successful dial, with the expectation of finding a better connection.
  2. The Planner is a first-wins one, but it scheduled multiple dials in parallel, and more than 1 came back positive.

In (1), I do agree that the selection heuristic should be part of the Planner by design; if it's deliberately seeking a better something, it must know what that something is. With (2), I'm not so sure it's part of the Planner to resolve that conflict; we may have different selection strategies going forward: by transport, by latency (do a PING on all connections), by packet loss, by traceroute, etc.

}
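A sketch of the "first wins" case from the discussion above; it assumes dialJobs is a slice of *Job and that a successful Job exposes its connection as tconn (as referenced in executor.go), neither of which is confirmed by the files shown:

```go
package dial

import (
	"errors"

	tpt "github.com/libp2p/go-libp2p-transport"
)

// firstSelector picks the connection of the first successful job ("first
// wins"). Assumes dialJobs is a slice of *Job with a tconn field holding the
// established connection.
type firstSelector struct{}

var _ Selector = (*firstSelector)(nil)

func (firstSelector) Select(successful dialJobs) (tpt.Conn, error) {
	if len(successful) == 0 {
		return nil, errors.New("no successful dials to select from")
	}
	return successful[0].tconn, nil
}
```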

type TransportResolverFn func(a ma.Multiaddr) tpt.Transport
type BestConnFn func(p peer.ID) inet.Conn