This repository has been archived by the owner on May 26, 2022. It is now read-only.

WIP Dialer v2: Pluggable and composable dialer #88

Closed
wants to merge 18 commits

Conversation

raulk
Member

@raulk raulk commented Oct 23, 2018

Heavy work in progress. Early preview. Incubating. Unstable. Embryonic. And any other way to say this is early WIP. ;-)

Some tests pass, some fail. Code needs to be polished, docs are scarce, and the public API (for configuring the pipeline and its components) needs attention and feedback. However, I do feel good enough about this design and PoC to start soliciting feedback.

NOTE: This PR is comprehensive in terms of docs because it aspires to become a spec once this work is merged, and to facilitate communicating the design decisions I've taken.

Rationale

As libp2p moves forward as the networking stack of choice for decentralised applications, each use case may require different behaviour from the dialer: not only in terms of configuration parameters (e.g. throttling), but also in terms of behaviour (how addresses are selected, which addresses are dialed first, how successful connections are selected, etc.).

With the current dialer we are unable to seamlessly do the following without special-casing:

  • Filter addresses based on our NAT status.
  • Adjust dialing strategy if we are serving private/public networks.
  • Adjust throttling policy if we are a relay node, a DHT booster or some other kind of special node.
  • Resolve peer IDs into multiaddrs – that's why we had to create the RoutedHost abstraction. With a composable/pluggable dialer, we could inject a processor to resolve addresses from the DHT.

Design

There are five phases to dialing: PREPARE, PLAN, THROTTLE, EXECUTE, SELECT. Conceptual model of dialer v2:

  • Pipeline: this is the skeleton of the dialer, onto which components attach. It handles the process of dialing a peer as a sequence of actions, and returns the result as either an inet.Conn or an error.
  • (Dial) Request: a request to dial a peer ID.
  • (Dial) Job: a job to dial a single multiaddr, stemming from a Dial Request.
  • Components: logical pieces of the dialing logic. Some components deal with Requests and others deal with Jobs. There are different kinds of components.
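To make this model concrete, here is a rough, illustrative sketch of how the phases map onto component interfaces. Placeholder types stand in for peer.ID, ma.Multiaddr and inet.Conn, and the Executor signature is an assumption; the authoritative definitions live in interfaces.go.

package dial

import "context"

// Placeholder types; illustrative stand-ins only.
type PeerID string    // stands in for peer.ID
type Multiaddr string // stands in for ma.Multiaddr
type Conn interface{} // stands in for inet.Conn / tpt.Conn

// Request: a request to dial a peer ID.
type Request struct {
	ctx   context.Context
	id    PeerID
	addrs []Multiaddr // populated by RequestPreparers
}

// Job: a dial to a single multiaddr, stemming from a Request.
type Job struct {
	req  *Request
	addr Multiaddr
	conn Conn
	err  error
}

type dialJobs []*Job

// PREPARE
type RequestPreparer interface{ Prepare(req *Request) }

// PLAN
type Planner interface {
	Next(req *Request, dialed dialJobs, last *Job, dialCh chan dialJobs) error
}

// Job preparation, between planning and throttling.
type JobPreparer interface{ Prepare(j *Job) }

// THROTTLE
type Throttler interface {
	Start(ctx context.Context, inCh <-chan *Job, dialCh chan<- *Job)
}

// EXECUTE (signature assumed for illustration)
type Executor interface {
	Start(ctx context.Context, dialCh <-chan *Job, completeCh chan<- *Job)
}

// SELECT
type Selector interface{ Select(successful dialJobs) (Conn, error) }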

Components

RequestPreparer component

Prepares a Request before it is planned. A RequestPreparer can abort the request and return a result immediately.

Implementations include:

  • Validator: validates the peer ID, that we're not dialing ourselves, and that we're not already connected to the peer. It can abort the request and return an error or an existing connection.
  • Backoff: returns an error if the peer is backed off.
  • Sync/dedup: deduplicates dial requests. If a dial request to a peer ID is underway, the new request will wait, and will return the same result as the first.
  • AddressResolver: fetches multiaddrs from the Peerstore, and applies filtering logic (configurable) to inject the set of addresses to dial in the Request.
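As a rough illustration of the contract (stand-in types; Complete/IsComplete mirror the Request API used elsewhere in this PR, but the exact shapes here are assumptions), a backoff-style preparer could short-circuit the pipeline like this:

package dial

import "errors"

var ErrDialBackoff = errors.New("dial backoff")

type PeerID string

// Request sketch: Complete short-circuits the pipeline with a final result.
type Request struct {
	id       PeerID
	conn     interface{}
	err      error
	complete bool
}

func (r *Request) Complete(conn interface{}, err error) {
	r.conn, r.err, r.complete = conn, err, true
}

func (r *Request) IsComplete() bool { return r.complete }

// backoffPreparer aborts requests for peers that are currently backed off.
type backoffPreparer struct {
	backedOff map[PeerID]bool
}

func (b *backoffPreparer) Prepare(req *Request) {
	if b.backedOff[req.id] {
		req.Complete(nil, ErrDialBackoff)
	}
}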

Planner component

Dial planners take a Request (populated with multiaddrs) and emit dial Jobs on a channel (wired to the throttler) for the addresses they want dialed.

The pipeline will call the planner once at the beginning, to retrieve the initial set of dials to perform, as well as every time a dial job completes (providing a slice of all dials and the last dial), to give the planner a chance to react and emit new dial requests.

When the Planner is satisfied with the result (e.g. when a dial has succeeded), it closes the dial channel to signal no more work to do.

Currently we have a SingleBurstPlanner implementation, but this model is flexible enough to create anything we can imagine, e.g. prioritise certain addrs, group dials together, dial one by one until at least two dials succeed or we have no more dials, etc.
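For illustration, a single-burst planner under the Next contract could look roughly like this (stand-in types; assuming the dialed slice holds completed jobs; the actual SingleBurstPlanner may differ):

package dial

// Stand-in types for the sketch.
type Job struct {
	addr string      // stands in for ma.Multiaddr
	conn interface{} // non-nil when the dial succeeded
	err  error
}

type dialJobs []*Job

type Request struct{ addrs []string }

type singleBurstPlanner struct{}

// Next emits one job per address on the first call (last == nil); as results
// come back, it closes dialCh once a dial succeeded or all dials completed.
func (singleBurstPlanner) Next(req *Request, dialed dialJobs, last *Job, dialCh chan dialJobs) error {
	if last == nil {
		var jobs dialJobs
		for _, addr := range req.addrs {
			jobs = append(jobs, &Job{addr: addr})
		}
		dialCh <- jobs
		return nil
	}
	if last.conn != nil || len(dialed) == len(req.addrs) {
		close(dialCh) // satisfied: signal no more work to do
	}
	return nil
}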

JobPreparer component

Prepares jobs emitted by the planner before they are dialed, e.g. by setting protocol-specific timeouts.

Throttler component

A goroutine that applies a throttling process to dial jobs requested by the Planner. Once spawned, it receives planned jobs via an input channel, and emits jobs to execute on an output channel. The throttler can apply any logic in-between: it may throttle jobs based on system resources, time, inflight dials, network conditions, fail rate, etc.

Currently, we have a default throttler that applies limits based on file descriptors and active dials per peer.
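A bare-bones sketch under that contract, assuming the Job.AddCallback hook introduced in this PR and a simple semaphore instead of the real FD/per-peer token accounting:

package dial

import "context"

type Job struct {
	peer      string
	callbacks []func()
}

// AddCallback registers a hook the pipeline runs when the job completes.
func (j *Job) AddCallback(cb func()) { j.callbacks = append(j.callbacks, cb) }

type throttler struct {
	maxConcurrent int
}

func (t *throttler) Start(ctx context.Context, inCh <-chan *Job, dialCh chan<- *Job) {
	sem := make(chan struct{}, t.maxConcurrent) // global inflight-dial limit
	for {
		select {
		case j, ok := <-inCh:
			if !ok {
				return
			}
			select {
			case sem <- struct{}{}: // acquire a token, or block (throttle)
			case <-ctx.Done():
				return
			}
			j.AddCallback(func() { <-sem }) // release the token on completion
			dialCh <- j
		case <-ctx.Done():
			return
		}
	}
}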

Executor component

A goroutine responsible for ultimately carrying out the network dial to an addr. Once spawned, it receives dial jobs (from the throttler). It must act like a dispatcher, in turn spawning individual goroutines for each dial, or maintaining a finite set of child workers, to carry out the dials. The Executor must never block.

When a dial completes, it is sent to a channel where it is received by the pipeline, which informs the Planner of the result.
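A minimal sketch of such an executor, assuming a hypothetical dialFn and completion channel (the real signature may differ):

package dial

import (
	"context"
	"net"
)

type Job struct {
	ctx  context.Context
	addr string
	conn net.Conn
	err  error
}

type executor struct {
	// dialFn stands in for the transport-specific dial.
	dialFn func(ctx context.Context, addr string) (net.Conn, error)
}

// Start dispatches each incoming job to its own goroutine and never blocks on
// the dial itself; completed jobs are reported back to the pipeline.
func (e *executor) Start(ctx context.Context, dialCh <-chan *Job, completeCh chan<- *Job) {
	for {
		select {
		case j, ok := <-dialCh:
			if !ok {
				return
			}
			go func(j *Job) {
				j.conn, j.err = e.dialFn(j.ctx, j.addr)
				completeCh <- j // the pipeline relays the result to the Planner
			}(j)
		case <-ctx.Done():
			return
		}
	}
}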

Selector component

Once the Executor has completed all dial jobs planned by the Planner, the Selector is called with a list of successful dials/connections. It can use any logic to decide which dial to keep, e.g. the one with the lowest latency, the first one, a random one, the one with the least relay hops, etc.

The Pipeline will take care of closing all unselected dials.
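For illustration only (stand-in types; not the PR's concrete selectors), a trivial "first successful connection" selector could look like this; a lowest-latency or fewest-relay-hops selector would implement the same interface:

package dial

import (
	"errors"
	"net"
)

type Job struct {
	conn net.Conn
	err  error
}

type dialJobs []*Job

var ErrNoSuccessfulDials = errors.New("no successful dials")

type firstSelector struct{}

// Select returns the first successful connection; the Pipeline closes the rest.
func (firstSelector) Select(successful dialJobs) (net.Conn, error) {
	for _, j := range successful {
		if j.err == nil && j.conn != nil {
			return j.conn, nil
		}
	}
	return nil, ErrNoSuccessfulDials
}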

Tips for review

  1. Start at interfaces.go to get a good overview of the interfaces.
  2. Continue with pipeline.go to understand how the Pipeline works and how the pieces fit together.
  3. Continue with other files in the dial package.
  4. Finish with the (few) changes done on the top-level package to understand how the Pipeline is integrated in the Swarm.

TODO

  • Make old tests pass (fix regressions).
  • Write a lot of new tests.
  • Review concurrency edge cases (I'd love a second, third, etc. set of eyes to find possible gaps in the concurrency logic; the throttler and executor use buffered channels whose sizes are statically configured for now).
  • Review the Pipeline and components API => make it easy to configure custom pipelines, as well as to take a pipeline blueprint (e.g. the default pipeline) and derive a modified version by replacing, adding or deleting components.
  • Interceptor component => called in-between other components to attach logging, tracing, performance measuring, etc. logic.
  • Support runtime mutations of the Pipeline. Support swapping components as the status of the host changes, e.g. from NAT Private to NAT Public.
  • Implement address filter and Planner to support AutoNAT/Autorelay. See #88 (comment).
  • Allow for cancelling inflight dial jobs (e.g. from the Planner).

@vyzo
Contributor

vyzo commented Oct 23, 2018

I don't think we should swap components based on NAT status, but rather have a single component that handles both the NATed and un-NATed cases; the issue is deciding what to do when faced with a NAT on the dial target.

@vyzo
Contributor

vyzo commented Oct 23, 2018

For completeness, this is the critical case we discussed during hack week:

  • when faced with a public/private address set, determine whether to dial the private addresses based on the public addr relative to our own public addr. If our public addr matches the target's public addr, then we should dial those private addrs; otherwise skip them.
  • when faced with multiple relay addrs, group them by relay (so that addrs for the same relay are tried together) and issue a delay before starting dials for different relays.

The new dialer must be able to handle this by default.

@raulk
Member Author

raulk commented Oct 24, 2018

@vyzo thanks for writing those down. Implementing those behaviours would make a good test of the modularity principles of the design being introduced here. I'll add a point in the TODO checklist.

when faced with a public/private address set, determine whether to dial the private addresses based on the public addr relative to our own public addr. If our public addr matches the target's public addr, then we should dial those private addrs; otherwise skip them.

These would be a dynamic filter in the AddressResolver RequestPreparer. A dynamic filter has access to the Network (via the dial Request) and can use our own address in the filtering logic.
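Very roughly, and with all names assumed for illustration, such a dynamic filter could close over the Request like this:

package dial

import "strings"

type Multiaddr = string // stand-in for ma.Multiaddr

type Request struct {
	addrs     []Multiaddr
	ourPublic Multiaddr // in the real code this would come from the Network on the Request
}

// isPrivate is a gross simplification; real code would use manet.IsPrivateAddr.
func isPrivate(a Multiaddr) bool {
	return strings.HasPrefix(a, "/ip4/10.") || strings.HasPrefix(a, "/ip4/192.168.")
}

// sameNetworkFilter keeps the target's private addrs only when its public addr
// matches our own public addr (compared naively here for brevity).
func sameNetworkFilter(req *Request) func(Multiaddr) bool {
	var targetPublic Multiaddr
	for _, a := range req.addrs {
		if !isPrivate(a) {
			targetPublic = a
			break
		}
	}
	samePublic := targetPublic != "" && targetPublic == req.ourPublic
	return func(a Multiaddr) bool {
		if isPrivate(a) {
			return samePublic
		}
		return true
	}
}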

when faced with multiple relay addrs, group them by relay (so that addrs for the same relay are tried together) and issue a delay before starting dials for different delays.

This would be a Planner implementation ;-) The planner can emit an initial set of dial jobs (first relay group). It then gets called back with the result of the dials, at which point it can emit more dials or it can stop the planning if it's satisfied.

This is a reactive model, but if we really want a proactive one, the Planner can launch a goroutine to drip dial jobs based on a timer or delay schedule. Regardless, it'll get called back each time a dial completes.
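As a sketch of that reactive shape (stand-in types; assuming the dialed slice holds completed jobs), a relay-grouping planner could emit one group per round:

package dial

type Job struct {
	addr string
	conn interface{} // non-nil on success
}

type dialJobs []*Job

// relayGroupPlanner emits dial jobs one relay group at a time. A real
// implementation could also insert a delay between groups via a timer, as
// described above.
type relayGroupPlanner struct {
	groups  [][]string // addrs grouped by relay, computed when planning starts
	next    int        // index of the next group to emit
	emitted int        // total jobs emitted so far
}

func (p *relayGroupPlanner) Next(req interface{}, dialed dialJobs, last *Job, dialCh chan dialJobs) error {
	if last != nil && last.conn != nil {
		close(dialCh) // satisfied: a dial succeeded
		return nil
	}
	// Only move to the next group once every job emitted so far has completed.
	if last != nil && len(dialed) < p.emitted {
		return nil
	}
	if p.next >= len(p.groups) {
		close(dialCh) // no groups left to try
		return nil
	}
	var jobs dialJobs
	for _, addr := range p.groups[p.next] {
		jobs = append(jobs, &Job{addr: addr})
	}
	p.next++
	p.emitted += len(jobs)
	dialCh <- jobs
	return nil
}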

@Stebalien
Member

👍


By pipeline, I assume that all stages are actually run in parallel, right?

fetches multiaddrs from the Peerstore, and applies filtering logic (configurable) to inject the set of addresses to dial in the

Specifically this. We need to be able to both dial and find new addresses in parallel.

The Executor must never block.
...
or maintaining a finite set of child workers

These two are in contention. However, I agree that the executor shouldn't block (the throttler should handle limiting jobs).

When a dial completes, it is sent to a channel where it is received by the pipeline, who informs the Planner of the result.

How does the throttler learn about these results? I think we'll need a way to attach listeners/callbacks to individual dial jobs.

return false
}

// AddBackoff lets other nodes know that we've entered backoff with
Contributor

lets other nodes know? that doesn't seem right

Member Author

agree it's confusing; copy-pasted from the original implementation though:

// AddBackoff lets other nodes know that we've entered backoff with

While we are here, @whyrusleeping... the Swarm currently exposes the Backoff via an accessor that's not part of the Network interface:

func (s *Swarm) Backoff() *DialBackoff {

I think it's there mostly for testing purposes, or do we want to allow other components to fiddle with the backoff logic? With it now being just another RequestPreparer, I'm reluctant to expose it like this.

dial/pipeline.go Outdated
id peer.ID

// Addresses to be populated by RequestPreparers.
addrs []ma.Multiaddr
Member

We're going to need to be able to feed addresses as we get them.

Member Author

I did grapple with the idea of making this a chan or a slice, and settled on a slice because I thought the Planner needs to know all addresses before it can make sound planning decisions. However, that comes at a time cost: you can't start dialing suboptimal addresses while you fetch better ones in parallel.

I had originally thought about an AddressResolver component, with several implementations:

  • DefaultAddressResolver => fetches from the peerstore.
  • DHTAddressResolver => queries the DHT if there are no hits in the peerstore, or if they are stale.

I'll move back in that direction, as it provides a better encapsulation for the chan variant.

//
// When the planner is satisfied and has no more dials to request, it must signal so by closing
// the dialCh channel.
Next(req *Request, dialed dialJobs, last *Job, dialCh chan dialJobs) error
Member

How about:

  1. Breaking this into two steps: Planner -> Plan.
  2. Letting the plan track the state instead of feeding it back in every time.
type Planner interface {
  Plan(req *Request) (Plan, error)
}
type Plan interface {
  Complete(j *Job) error // may need to pass in a connection?
  Next() (*Job, error) // or maybe a channel?
}

Not sure about the actual interface definitions, I just think breaking this up will help.

Member Author

Agree, good point!

Contributor

seconding this, i think this general structure highlights the FLOW as well

// Start spawns the goroutine that is in charge of throttling. It receives planned jobs via inCh and emits
// jobs to execute on dialCh. The throttler can apply any logic in-between: it may throttle jobs based on
// system resources, time, inflight dials, network conditions, fail rate, etc.
Start(ctx context.Context, inCh <-chan *Job, dialCh chan<- *Job)
Member

Any reason not to have:

Start(ctx context.Context, inCh <-chan *Job) <-chan *Job

Member Author

Since these are buffered channels, I preferred the pipeline to be responsible for them.

// the Selector picks the optimal successful connection to return to the consumer. The Pipeline takes care of
// closing unselected successful connections.
type Selector interface {
Select(successful dialJobs) (tpt.Conn, error)
Member

So, the planner needs to be able to determine when to finish anyways. I'd be surprised if it didn't have enough information to perform the selection as well. What's the motivation?

Member Author

The Selector will be used in two cases:

  1. The Planner explicitly continues dialling upon a successful dial, with the expectation of finding a better connection.
  2. The Planner is a first-wins one, but it scheduled multiple dials in parallel, and more than one came back positive.

In (1), I do agree that the selection heuristic should be part of the Planner by design; if it's deliberately seeking a better something, it must know what that something is. With (2), I'm not so sure it's part of the Planner to resolve that conflict; we may have different selection strategies going forward: by transport, by latency (do a PING on all connections), by packet loss, by traceroute, etc.

dial/pipeline.go Outdated
}
}

func (j *Job) AddCallback(cb func()) {
Member

👍

dial/pipeline.go Outdated

reqPreparers []RequestPreparer
planner Planner
throttler Throttler
Member

Personally, I'd allow multiple of these. Throttlers can easily throttle a single job type, passing the rest on to the next throttler.

Member Author

Hmmm... would there be a case to have multiple Planners as well? Rather than modelling the cardinality here, we could provide Composite* wrappers that from the outside behave the same, with their constructors accepting a number of Planners, Throttlers, etc.

We could even kill the n-cardinality on the {Request/Job}Preparers, in favour of composites. WDYT?

dial/pipeline.go Outdated
addConnFn AddConnFn
}

func (p *Pipeline) Component(name string, comp interface{}) error {
Member

We can do this, or we could just have separate RegisterPlanner, etc... functions. What was the motivation?

One potential motivation is that this version allows us to use the same component multiple times (e.g., we can have a single component implement the preparer and the planner).

Member Author

Absolutely. This choice is simplistic and controversial on purpose to incite dialogue. I haven't landed on a pipeline configuration API I feel good about yet.

I'm leaning towards providing functions to manage {Request,Job}Preparers positionally, e.g. AddLast, AddFirst, AddAfter, AddBefore, Replace (hence name argument).

I also think the API should make it possible to change components at runtime (some components embed io.Closer to that end).

dial/pipeline.go Outdated
}

func (p *Pipeline) Component(name string, comp interface{}) error {
switch comp.(type) {
Member

Nit: you can use switch c := comp.(type) { and then use c to avoid the casts.
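For reference, a self-contained sketch of that idiom (the component types are illustrative):

package dial

import "fmt"

type Planner interface{ isPlanner() }
type Throttler interface{ isThrottler() }

type Pipeline struct {
	planner   Planner
	throttler Throttler
}

// Component binds the concrete value via `switch c := comp.(type)`, so no
// extra type assertions are needed in each case.
func (p *Pipeline) Component(name string, comp interface{}) error {
	switch c := comp.(type) {
	case Planner:
		p.planner = c
	case Throttler:
		p.throttler = c
	default:
		return fmt.Errorf("%s: unknown component type %T", name, comp)
	}
	return nil
}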

Member Author

Thanks! I knew this was too clumsy.

dial/pipeline.go Outdated
pipeline := &Pipeline{
ctx: ctx,
net: net,
addConnFn: addConnFn,
Member

do we need this callback or is there some way we can completely separate this from the network? That is, it would be nice if the pipeline could be completely modular and return connections to the network instead of feeding them in like this.

Member

Especially because this makes it hard for users to define custom pipelines.

Member Author

Actually, you're right. The pipeline could simply return a connection, and the Swarm could add it locally. I like that much better.


var _ RequestPreparer = (*validator)(nil)

func NewAddrResolver(staticFilters []func(ma.Multiaddr) bool, dynamicFilters []func(req *Request) func(ma.Multiaddr) bool) RequestPreparer {
Contributor

can you make some type definitions for the funcs? The declaration is almost comical.

Contributor

The main culprit is the inline func types; a type MultiaddrFilter func(ma.Multiaddr) bool would go a long way towards simplifying.
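A sketch of how named types could clean up the constructor (everything beyond MultiaddrFilter itself is illustrative):

package dial

type Multiaddr = string // stand-in for ma.Multiaddr

type Request struct{ addrs []Multiaddr }

type RequestPreparer interface{ Prepare(req *Request) }

// The suggested named function types.
type MultiaddrFilter func(Multiaddr) bool
type DynamicFilter func(req *Request) MultiaddrFilter

type addrResolver struct {
	static  []MultiaddrFilter
	dynamic []DynamicFilter
}

func (r *addrResolver) Prepare(req *Request) { /* filtering elided */ }

// NewAddrResolver now reads far better than the original declaration.
func NewAddrResolver(static []MultiaddrFilter, dynamic []DynamicFilter) RequestPreparer {
	return &addrResolver{static: static, dynamic: dynamic}
}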

dial/backoff.go Outdated
@jacobheun

👍 on the overall design.

For failure states, it looks like if no connection can be created the pipeline will ultimately return an error. If multiple jobs are executed in the pipeline, and various points in the pipeline can result in a returned error, what information is the pipeline providing for the consuming application to make informed decisions? This is a request we've seen pretty frequently on the js side of things and have been working to improve the clarity of those errors.

@magik6k magik6k self-requested a review October 31, 2018 16:14
@upperwal

upperwal commented Nov 9, 2018

Good work @raulk.

when faced with a public/private address set, determine whether to dial the private addresses based on the public addr relative to our own public addr. If our public addr matches the target's public addr, then we should dial those private addrs; otherwise skip them.

These would be a dynamic filter in the AddressResolver RequestPreparer. A dynamic filter has access to the Network (via the dial Request) and can use our own address in the filtering logic.

  1. Can you help me understand how dynamic filters can address this? You will need all the multiaddrs of a peer to make the decision based on the topology. If I get one multiaddr at a time (and in an unknown order, too), I won't be able to choose whether to drop it or dial it.

I think a better place would be the planner where you have access to all the multiaddrs of a peer.

var jobs dialJobs
for _, maddr := range req.addrs {
jobs = append(jobs, NewDialJob(req.ctx, req, maddr))
}

  2. DialJobs run in parallel, which is good, but we can still end up with multiple connections to a single peer. I think your Selector solves this problem by picking one connection (the first, or based on some heuristic), but this could be a problem when scaling. Imagine a node trying to connect to thousands of nodes on startup; dial requests will be three times that (public, private and local).

We could maybe keep dialling the same peer sequential (trying one address at a time and, if it fails, trying the next one) and dialling different peers concurrent (each new peer yields a new goroutine).

  3. The Selector is a good place to resolve issues related to public/private IPs. Let a peer connect on all available addresses and return the connection that was established first, along with the Connected notification. If we later receive a better connection (private or local) to the same peer, silently swap the connection without letting the upper layer know that this is a new connection. A different notification may be raised, but not the Connected notification.

@bigs
Contributor

bigs commented Nov 9, 2018

ight here we go 🍽

Contributor

@bigs bigs left a comment

this is a really great start. code is generally structured well and is infinitely more readable/grokable. excited to see this move fwd.

//
// A Preparer may cancel the dial preemptively in error or in success, by calling Complete() on the Request.
type Preparer interface {
Prepare(req *Request)
Contributor

any reason to not have this return a new modified request for easier debuggability? also where is Request defined? i can't find it...

edit github was glitchin


dial/pipeline.go Outdated
return pipeline
}

func (p *Pipeline) Start(ctx context.Context) {
Contributor

maybe close existing channels and cancel existing contexts if present

dial/pipeline.go Outdated
req := NewDialRequest(ctx, p.net, id)

// Prepare the dial.
if p.preparer.Prepare(req); req.IsComplete() {
Contributor

places like this are where i think copying the req would make code more clear

dial/pipeline.go Outdated
}

if len(req.addrs) == 0 {
return nil, errors.New("no addresses to dial")
Contributor

let's factor this error into a const so people can compare against it

dial/pipeline.go Outdated
PlanExecute:
for {
select {
case jobs, more := <-planCh:
Contributor

by convention may be worth changing more to ok? don't feel too strongly about it but it did trip me up

dial/planner.go Outdated
// ErrNoSuccessfulDials is returned by Select() when all dials failed.
var ErrNoSuccessfulDials = errors.New("no successful dials")

type singleBurstPlanner struct{}
Contributor

docstring

// Process any token releases to avoid those channels getting backed up.
select {
case id := <-t.peerTokenFreed:
waitlist := t.waitingOnPeerLimit[id]
Contributor

it looks like we have a case where jobs waiting on the per-peer limit can then bypass the FD limits since we don't check here

Member Author

I don't think this is possible, because when we get a new job, we check the peer limit first, then the FD limit (by virtue of the implicit ordering of the && here). If a job fails the peer limit, we queue until we get a slot. Then we check the FD limit before actually doing the dial.

}
continue ThrottleLoop

case <-t.fdTokenFreed:
Contributor

same, this can bypass per-peer limits

return nil
}

func (t *throttler) throttleFd(j *Job) (throttle bool) {
Contributor

maybe we condense these two throttle funcs into one that also accepts an optional current state so it can transition as necessary

@Stebalien
Member

@raulk is this blocked on review (trying to keep it out of PR review purgatory)?

@raulk
Member Author

raulk commented Jan 10, 2019

@Stebalien, thanks for checking in. This PR is awaiting further changes from me. Will be done soon and will ping you all for another round of reviews. Sorry for the slowness; a few things jumped the queue.

@Stebalien
Member

Take your time, I just wanted to make sure it wasn't stuck on something.

@raulk
Member Author

raulk commented May 7, 2019

Deprecated in favour of #122; thanks a lot for your comments. I've considered them in the new iteration.

@raulk raulk closed this May 7, 2019
@ghost ghost removed the status/in-progress In progress label May 7, 2019