Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
reorg code; harden; godocs.
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk committed May 7, 2019
1 parent dcd2e48 commit 8ef0c2f
Show file tree
Hide file tree
Showing 23 changed files with 865 additions and 656 deletions.
27 changes: 17 additions & 10 deletions dial/addr_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
ma "github.com/multiformats/go-multiaddr"
)

// AddrFilterFn is a predicate that validates if a multiaddr should be attempted or not.
type AddrFilterFn = func(addr ma.Multiaddr) bool

// defaultFilters contains factory methods to generate default filters based on a Network.
var defaultFilters = struct {
preventSelfDial func(network inet.Network) AddrFilterFn
preventIPv6LinkLocalDial func(network inet.Network) AddrFilterFn
Expand All @@ -30,23 +32,28 @@ var defaultFilters = struct {
},
}

// DefaultAddrFilters returns the following address filters, recommended defaults:
//
// 1. preventSelfDial: aborts dial requests to ourselves.
// 2. preventIPv6LinkLocalDials: aborts dial requests to link local addresses.
func DefaultAddrFilters(network inet.Network) (res []AddrFilterFn) {
res = append(res,
defaultFilters.preventSelfDial(network),
defaultFilters.preventIPv6LinkLocalDial(network))
return res
}

type pstoreAddrResolver struct {
network inet.Network
filters []AddrFilterFn
}

var _ AddressResolver = (*pstoreAddrResolver)(nil)

func NewPeerstoreAddressResolver(network inet.Network, useDefaultFilters bool, filters ...AddrFilterFn) AddressResolver {
var fs []AddrFilterFn
if useDefaultFilters {
fs = append(fs,
defaultFilters.preventSelfDial(network),
defaultFilters.preventIPv6LinkLocalDial(network),
)
}
fs = append(fs, filters...)
return &pstoreAddrResolver{network: network, filters: fs}
// NewPeerstoreAddressResolver returns an AddressResolver that fetches known addresses from the peerstore,
// running no external discovery process.
func NewPeerstoreAddressResolver(network inet.Network, filters ...AddrFilterFn) AddressResolver {
return &pstoreAddrResolver{network: network, filters: filters}
}

func (par *pstoreAddrResolver) Resolve(req *Request) (known []ma.Multiaddr, more <-chan []ma.Multiaddr, err error) {
Expand Down
132 changes: 62 additions & 70 deletions dial/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,53 +8,59 @@ import (
"github.com/libp2p/go-libp2p-peer"
)

// BackoffBase is the base amount of time to backoff (default: 5s).
var BackoffBase = time.Second * 5

// BackoffCoef is the backoff coefficient (default: 1s).
var BackoffCoef = time.Second

// BackoffMax is the maximum backoff time (default: 5m).
var BackoffMax = time.Minute * 5

// Backoff is a struct used to avoid over-dialing the same, dead peers.
// Whenever we totally time out on a peer (all three attempts), we add them
// to dialbackoff. Then, whenevers goroutines would _wait_ (dialsync), they
// check dialbackoff. If it's there, they don't wait and exit promptly with
// an error. (the single goroutine that is actually dialing continues to
// dial). If a dial is successful, the peer is removed from backoff.
// Example:
//
// for {
// if ok, wait := dialsync.Lock(p); !ok {
// if backoff.Backoff(p) {
// return errDialFailed
// }
// <-wait
// continue
// }
// defer dialsync.Unlock(p)
// c, err := actuallyDial(p)
// if err != nil {
// dialbackoff.AddBackoff(p)
// continue
// }
// dialbackoff.Clear(p)
// }
//
var (
DefaultBackoffBase = time.Second * 5
DefaultBackoffCoef = time.Second
DefaultBackoffMax = time.Minute * 5
)

type BackoffConfig struct {
// BackoffBase is the base amount of time to backoff.
BackoffBase time.Duration
// BackoffCoef is the backoff coefficient.
BackoffCoef time.Duration
// BackoffMax is the maximum backoff time.
BackoffMax time.Duration
}

func DefaultBackoffConfig() *BackoffConfig {
return &BackoffConfig{
BackoffBase: DefaultBackoffBase,
BackoffCoef: DefaultBackoffCoef,
BackoffMax: DefaultBackoffMax,
}
}

type backoffPeer struct {
tries int
until time.Time
}

// DialBackoff is a type for tracking peer dial backoffs.
//
// * It's safe to use its zero value.
// * It's thread-safe.
// * It's *not* safe to move this type after using.
type Backoff struct {
entries map[peer.ID]*backoffPeer
config *BackoffConfig

lock sync.RWMutex
entries map[peer.ID]*backoffPeer
}

func NewBackoff() Preparer {
var _ Preparer = (*Backoff)(nil)

// NewBackoff creates a Preparer used to avoid over-dialing the same, dead peers.
//
// It acts like a circuit-breaker, tracking failed dial requests and denying subsequent
// attempts if they occur during the backoff period.
//
// The backoff period starts with config.BackoffBase. When the next dial attempt is made
// after the backoff expires, we use that dial as a probe. If it succeeds, we clear the
// backoff entry. If it fails, we boost the duration of the existing entry according to
// the following quadratic formula:
//
// BackoffBase + (BakoffCoef * PriorBackoffCount^2)
//
// Where PriorBackoffCount is the number of previous backoffs.
func NewBackoff(config *BackoffConfig) Preparer {
return &Backoff{
config: config,
entries: make(map[peer.ID]*backoffPeer),
}
}
Expand All @@ -71,23 +77,6 @@ func (b *Backoff) Prepare(req *Request) error {
return nil
}

func (b *Backoff) requestCallback(req *Request) {
if _, err := req.Result(); err != nil && err != context.Canceled {
req.Debugf("backing off")
b.AddBackoff(req.PeerID())
} else if err == nil {
req.Debugf("clearing backoffs")
b.ClearBackoff(req.PeerID())
}
}

type backoffPeer struct {
tries int
until time.Time
}

var _ Preparer = (*Backoff)(nil)

// Backoff returns whether the client should backoff from dialing peer p
func (b *Backoff) Backoff(p peer.ID) (backoff bool) {
b.lock.Lock()
Expand All @@ -102,13 +91,6 @@ func (b *Backoff) Backoff(p peer.ID) (backoff bool) {

// AddBackoff adds a new backoff entry for this peer, or boosts the backoff
// period if an entry already exists.
//
// Backoff is not exponential, it's quadratic and computed according to the
// following formula:
//
// BackoffBase + BakoffCoef * PriorBackoffs^2
//
// Where PriorBackoffs is the number of previous backoffs.
func (b *Backoff) AddBackoff(p peer.ID) {
b.lock.Lock()
defer b.lock.Unlock()
Expand All @@ -117,23 +99,33 @@ func (b *Backoff) AddBackoff(p peer.ID) {
if !ok {
b.entries[p] = &backoffPeer{
tries: 1,
until: time.Now().Add(BackoffBase),
until: time.Now().Add(b.config.BackoffBase),
}
return
}

backoffTime := BackoffBase + BackoffCoef*time.Duration(bp.tries*bp.tries)
if backoffTime > BackoffMax {
backoffTime = BackoffMax
backoffTime := b.config.BackoffBase + b.config.BackoffCoef*time.Duration(bp.tries*bp.tries)
if backoffTime > b.config.BackoffMax {
backoffTime = b.config.BackoffMax
}
bp.until = time.Now().Add(backoffTime)
bp.tries++
}

// Clear removes a backoff record. Clients should call this after a successful dial.
// Clear removes a backoff record.
func (b *Backoff) ClearBackoff(p peer.ID) {
b.lock.Lock()
defer b.lock.Unlock()

delete(b.entries, p)
}

func (b *Backoff) requestCallback(req *Request) {
if _, err := req.Result(); err != nil && err != context.Canceled {
req.Debugf("backing off")
b.AddBackoff(req.PeerID())
} else if err == nil {
req.Debugf("clearing backoffs")
b.ClearBackoff(req.PeerID())
}
}
93 changes: 91 additions & 2 deletions dial/common.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,94 @@
package dial

import logging "github.com/ipfs/go-log"
import (
"context"
"fmt"
"sync"
)

var log = logging.Logger("swarm/dialer")
// Status represents the status of a Request or a Job. It is a bit array where each possible
// status is represented by a bit, to enable efficient mask evaluation.
type Status uint32

const (
// StatusInflight indicates that a request or job is currently making progress.
StatusInflight Status = 1 << iota
// StatusBlock indicates that a request or job is currently blocked, possibly as a
// result of throttling or guarding.
StatusBlocked
// StatusCompleting indicates that a request or job has completed and its
// callbacks are currently firing.
StatusCompleting
// StatusComplete indicates that a request or job has fully completed.
StatusComplete
)

// internCallbackNames interns callback names, via the internedCallbackName function.
var internCallbackNames = make(map[string]string)

// Assert panics if the current status does not adhere to the specified bit mask.
func (s *Status) Assert(mask Status) {
if *s&mask == 0 {
// it may be worth decoding the mask to a friendlier format.
panic(fmt.Sprintf("illegal state %s; mask: %b", s, mask))
}
}

func (s Status) String() string {
switch s {
case StatusComplete:
return "Status(Complete)"
case StatusInflight:
return "Status(Inflight)"
case StatusCompleting:
return "Status(Completing)"
case StatusBlocked:
return "Status(Blocked)"
default:
return fmt.Sprintf("Status(%d)", s)
}
}

// internedCallbackName retrieves the interned string corresponding to this callback name.
func internedCallbackName(name string) string {
if n, ok := internCallbackNames[name]; ok {
return n
}
internCallbackNames[name] = name
return name
}

// contextHolder is a mixin that adds context tracking and mutation capabilities to another struct.
type contextHolder struct {
clk sync.RWMutex
ctx context.Context
cancels []context.CancelFunc
}

// UpdateContext updates the context and cancel functions atomically.
func (ch *contextHolder) UpdateContext(mutator func(orig context.Context) (context.Context, context.CancelFunc)) {
ch.clk.Lock()
defer ch.clk.Unlock()

ctx, cancel := mutator(ch.ctx)
ch.ctx = ctx
ch.cancels = append(ch.cancels, cancel)
}

// Context returns the context in a thread-safe manner.
func (ch *contextHolder) Context() context.Context {
ch.clk.RLock()
defer ch.clk.RUnlock()

return ch.ctx
}

// FireCancels invokes all cancel functions in the inverse order they were added.
func (ch *contextHolder) FireCancels() {
ch.clk.RLock()
defer ch.clk.RUnlock()

for i := len(ch.cancels) - 1; i >= 0; i-- {
ch.cancels[i]()
}
}
27 changes: 27 additions & 0 deletions dial/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Package dial contains the logic to establish outbound connections to other peers. In go lingo, that process is
// called "dialing".
//
// The central component of this package is the dial Pipeline. A Pipeline assembles a set of modular components,
// each with a clearly-delimited responsibility, into a dialing engine. Consumers can replace, delete, add components
// to customize the dialing engine's behavior.
//
// The Pipeline comprises five components. We provide brief descriptions below. For more detail, refer to the
// respective godocs of each interface:
//
// - Preparer: runs preparatory actions prior to the dial execution. Actions may include: deduplicating, populating
// timeouts, circuit breaking, validating the peer ID, etc.
// - AddressResolver: populates the set of addresses for a peer, either from the peerstore and/or from other sources,
// such as a discovery process.
// - Planner: schedules dials in time, responding to events from the environment, such as new addresses discovered,
// or dial jobs completing.
// - Throttler: throttles dials based on resource usage or other factors.
// - Executor: actually carries out the network dials.
//
// This package provides basic implementations of all five dialer components, as well as a default Pipeline suitable
// for simple host constructions. See the godocs on NewDefaultPipeline for details of the composition of the
// default Pipeline. Note that the user can customize the Pipeline using the methods in that struct.
//
// These five components deal with two main entities: the Request and the Job. A Request represents a caller request
// to dial a peer, identified by a peer ID. Along the execution of the Pipeline, the Request will translate into
// one or many dial Jobs, each of which targets a multiaddr where that peer ID is presumably listening.
package dial
Loading

0 comments on commit 8ef0c2f

Please sign in to comment.