Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
document and improve pipeline API.
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk committed May 6, 2019
1 parent 24be5f4 commit dcd2e48
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 60 deletions.
35 changes: 35 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package swarm

import (
"github.com/libp2p/go-libp2p-swarm/dial"
)

type Option func(*Swarm) error

// WithDialPipeline injects a custom dial pipeline in this swarm.
//
// To modify behaviour from the default pipeline, use NewDefaultPipeline and the accessor methods on *dial.Pipeline:
//
// factory := func(s *swarm.Swarm) *dial.Pipeline {
// pipeline := s.NewDefaultPipeline()
//
// var planner dial.Planner
// pipeline.SetPlanner(planner)
//
// prep := pipeline.Preparer()
// seq := prep.(*dial.PreparerSeq)
//
// var newBackoff Preparer
// // replace the backokff preparer.
// seq.Replace("backoff", newBackoff)
//
// return pipeline
// }
//
// s := swarm.NewSwarm(ctx, pid, peerstore, swarm.WithDialPipeline(factory))
func WithDialPipeline(factory func(swarm *Swarm) *dial.Pipeline) Option {
return func(s *Swarm) error {
s.pipeline = factory(s)
return nil
}
}
62 changes: 7 additions & 55 deletions swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ import (
logging "github.com/ipfs/go-log"
"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-addr-util"
metrics "github.com/libp2p/go-libp2p-metrics"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
"github.com/libp2p/go-libp2p-swarm/dial"
transport "github.com/libp2p/go-libp2p-transport"
filter "github.com/libp2p/go-maddr-filter"
ma "github.com/multiformats/go-multiaddr"
mafilter "github.com/whyrusleeping/multiaddr-filter"
"golang.org/x/xerrors"
)

var (
Expand Down Expand Up @@ -100,10 +99,8 @@ type Swarm struct {
pipeline *dial.Pipeline
}

type SwarmOption func(*Swarm) error

// NewSwarm constructs a Swarm
func NewSwarm(ctx context.Context, local peer.ID, peers pstore.Peerstore, bwc metrics.Reporter, opts ...SwarmOption) *Swarm {
func NewSwarm(ctx context.Context, local peer.ID, peers pstore.Peerstore, bwc metrics.Reporter, opts ...Option) *Swarm {
s := &Swarm{
local: local,
peers: peers,
Expand All @@ -119,59 +116,21 @@ func NewSwarm(ctx context.Context, local peer.ID, peers pstore.Peerstore, bwc me
s.proc = goprocessctx.WithContextAndTeardown(ctx, s.teardown)
s.ctx = goprocessctx.OnClosingContext(s.proc)

// TODO: provide Options to customize the pipeline
for _, opt := range opts {
opt(s)
if err := opt(s); err != nil {
panic(xerrors.Errorf("error while applying swarm options: %w", err))
}
}

if s.pipeline == nil {
s.pipeline = s.defaultPipeline()
// apply the default pipeline if a custom one was not provided.
s.pipeline = s.NewDefaultPipeline()
}

s.pipeline.Start()
return s
}

func (s *Swarm) defaultPipeline() *dial.Pipeline {
p := dial.NewPipeline(s.ctx, s, func(tc transport.Conn) (conn inet.Conn, e error) {
return s.addConn(tc, inet.DirOutbound)
})

// preparers.
seq := new(dial.PreparerSeq)
seq.AddLast("validator", dial.NewValidator(s.LocalPeer()))
seq.AddLast("request_timeout", dial.NewRequestTimeout())
seq.AddLast("dedup", dial.NewDedup())
seq.AddLast("backoff", dial.NewBackoff())
p.SetPreparer(seq)

// dial address filters.
var filters []dial.AddrFilterFn

// do we have a transport for dialing this address?
filters = append(filters, func(addr ma.Multiaddr) bool {
t := s.TransportForDialing(addr)
return t != nil && t.CanDial(addr)
})

// is the address blocked?
filters = append(filters, (dial.AddrFilterFn)(addrutil.FilterNeg(s.Filters.AddrBlocked)))

// address resolver.
p.SetAddressResolver(dial.NewPeerstoreAddressResolver(s, true, filters...))

// throttler.
p.SetThrottler(dial.NewDefaultThrottler())

// planner.
p.SetPlanner(dial.NewImmediatePlanner())

// executor.
p.SetExecutor(dial.NewExecutor(s.TransportForDialing, dial.SetJobTimeout))

return p
}

// DialPeer connects to a peer.
//
// The idea is that the client of Swarm does not need to know what network
Expand All @@ -187,13 +146,6 @@ func (s *Swarm) DialPeer(ctx context.Context, p peer.ID) (inet.Conn, error) {
return s.pipeline.Dial(ctx, p)
}

func WithPipeline(pipeline *dial.Pipeline) SwarmOption {
return func(s *Swarm) error {
s.pipeline = pipeline
return nil
}
}

func (s *Swarm) teardown() error {
if err := s.pipeline.Close(); err != nil {
log.Errorf("error when shutting down the swarm dial pipeline: %s", err)
Expand Down
50 changes: 50 additions & 0 deletions swarm_pipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package swarm

import (
addrutil "github.com/libp2p/go-addr-util"
inet "github.com/libp2p/go-libp2p-net"
dial "github.com/libp2p/go-libp2p-swarm/dial"
transport "github.com/libp2p/go-libp2p-transport"

ma "github.com/multiformats/go-multiaddr"
)

func (s *Swarm) NewDefaultPipeline() *dial.Pipeline {
p := dial.NewPipeline(s.ctx, s, func(tc transport.Conn) (conn inet.Conn, e error) {
return s.addConn(tc, inet.DirOutbound)
})

// preparers.
seq := new(dial.PreparerSeq)
seq.AddLast("validator", dial.NewValidator(s.LocalPeer()))
seq.AddLast("timeout", dial.NewRequestTimeout())
seq.AddLast("dedup", dial.NewDedup())
seq.AddLast("backoff", dial.NewBackoff())
p.SetPreparer(seq)

// dial address filters.
var filters []dial.AddrFilterFn

// do we have a transport for dialing this address?
filters = append(filters, func(addr ma.Multiaddr) bool {
t := s.TransportForDialing(addr)
return t != nil && t.CanDial(addr)
})

// is the address blocked?
filters = append(filters, (dial.AddrFilterFn)(addrutil.FilterNeg(s.Filters.AddrBlocked)))

// address resolver.
p.SetAddressResolver(dial.NewPeerstoreAddressResolver(s, true, filters...))

// throttler.
p.SetThrottler(dial.NewDefaultThrottler())

// planner.
p.SetPlanner(dial.NewImmediatePlanner())

// executor.
p.SetExecutor(dial.NewExecutor(s.TransportForDialing, dial.SetJobTimeout))

return p
}
10 changes: 5 additions & 5 deletions swarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
ma "github.com/multiformats/go-multiaddr"

. "github.com/libp2p/go-libp2p-swarm"
. "github.com/libp2p/go-libp2p-swarm/testing"
stesting "github.com/libp2p/go-libp2p-swarm/testing"
)

var log = logging.Logger("swarm_test")
Expand Down Expand Up @@ -54,17 +54,17 @@ func EchoStreamHandler(stream inet.Stream) {
}

func makeDialOnlySwarm(ctx context.Context, t *testing.T) *Swarm {
swarm := GenSwarm(t, ctx, OptDialOnly)
swarm := stesting.GenSwarm(t, ctx, stesting.OptDialOnly)
swarm.SetStreamHandler(EchoStreamHandler)

return swarm
}

func makeSwarms(ctx context.Context, t *testing.T, num int, opts ...Option) []*Swarm {
func makeSwarms(ctx context.Context, t *testing.T, num int, opts ...stesting.Option) []*Swarm {
swarms := make([]*Swarm, 0, num)

for i := 0; i < num; i++ {
swarm := GenSwarm(t, ctx, opts...)
swarm := stesting.GenSwarm(t, ctx, opts...)
swarm.SetStreamHandler(EchoStreamHandler)
swarms = append(swarms, swarm)
}
Expand Down Expand Up @@ -102,7 +102,7 @@ func SubtestSwarm(t *testing.T, SwarmNum int, MsgNum int) {
// t.Skip("skipping for another test")

ctx := context.Background()
swarms := makeSwarms(ctx, t, SwarmNum, OptDisableReuseport)
swarms := makeSwarms(ctx, t, SwarmNum, stesting.OptDisableReuseport)

// connect everyone
connectSwarms(t, ctx, swarms)
Expand Down

0 comments on commit dcd2e48

Please sign in to comment.