Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
remove unused context in Swarm.dialWorkerLoop
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Aug 23, 2021
1 parent fa91592 commit 137a238
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 71 deletions.
5 changes: 2 additions & 3 deletions dial_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

// DialWorerFunc is used by DialSync to spawn a new dial worker
type dialWorkerFunc func(context.Context, peer.ID, <-chan dialRequest) error
type dialWorkerFunc func(peer.ID, <-chan dialRequest) error

// newDialSync constructs a new DialSync
func newDialSync(worker dialWorkerFunc) *DialSync {
Expand Down Expand Up @@ -94,8 +94,7 @@ func (ds *DialSync) getActiveDial(p peer.ID) (*activeDial, error) {
ds: ds,
}

err := ds.dialWorker(adctx, p, actd.reqch)
if err != nil {
if err := ds.dialWorker(p, actd.reqch); err != nil {
cancel()
return nil, err
}
Expand Down
94 changes: 29 additions & 65 deletions dial_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,58 +15,37 @@ func getMockDialFunc() (dialWorkerFunc, func(), context.Context, <-chan struct{}
dfcalls := make(chan struct{}, 512) // buffer it large enough that we won't care
dialctx, cancel := context.WithCancel(context.Background())
ch := make(chan struct{})
f := func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error {
f := func(p peer.ID, reqch <-chan dialRequest) error {
defer cancel()
dfcalls <- struct{}{}
go func() {
defer cancel()
for {
select {
case req, ok := <-reqch:
if !ok {
return
}

select {
case <-ch:
req.resch <- dialResponse{conn: new(Conn)}
case <-ctx.Done():
req.resch <- dialResponse{err: ctx.Err()}
return
}
case <-ctx.Done():
return
}
for req := range reqch {
<-ch
req.resch <- dialResponse{conn: new(Conn)}
}
}()
return nil
}

o := new(sync.Once)

return f, func() { o.Do(func() { close(ch) }) }, dialctx, dfcalls
var once sync.Once
return f, func() { once.Do(func() { close(ch) }) }, dialctx, dfcalls
}

func TestBasicDialSync(t *testing.T) {
df, done, _, callsch := getMockDialFunc()

dsync := newDialSync(df)

p := peer.ID("testpeer")

ctx := context.Background()

finished := make(chan struct{})
finished := make(chan struct{}, 2)
go func() {
_, err := dsync.DialLock(ctx, p)
if err != nil {
if _, err := dsync.DialLock(context.Background(), p); err != nil {
t.Error(err)
}
finished <- struct{}{}
}()

go func() {
_, err := dsync.DialLock(ctx, p)
if err != nil {
if _, err := dsync.DialLock(context.Background(), p); err != nil {
t.Error(err)
}
finished <- struct{}{}
Expand Down Expand Up @@ -139,30 +118,26 @@ func TestDialSyncAllCancel(t *testing.T) {
df, done, dctx, _ := getMockDialFunc()

dsync := newDialSync(df)

p := peer.ID("testpeer")

ctx1, cancel1 := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())

finished := make(chan struct{})
go func() {
_, err := dsync.DialLock(ctx1, p)
if err != ctx1.Err() {
if _, err := dsync.DialLock(ctx, p); err != ctx.Err() {
t.Error("should have gotten context error")
}
finished <- struct{}{}
}()

// Add a second dialwait in so two actors are waiting on the same dial
go func() {
_, err := dsync.DialLock(ctx1, p)
if err != ctx1.Err() {
if _, err := dsync.DialLock(ctx, p); err != ctx.Err() {
t.Error("should have gotten context error")
}
finished <- struct{}{}
}()

cancel1()
cancel()
for i := 0; i < 2; i++ {
select {
case <-finished:
Expand All @@ -180,33 +155,27 @@ func TestDialSyncAllCancel(t *testing.T) {

// should be able to successfully dial that peer again
done()
_, err := dsync.DialLock(context.Background(), p)
if err != nil {
if _, err := dsync.DialLock(context.Background(), p); err != nil {
t.Fatal(err)
}
}

func TestFailFirst(t *testing.T) {
var count int32
f := func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error {
f := func(p peer.ID, reqch <-chan dialRequest) error {
go func() {
for {
select {
case req, ok := <-reqch:
if !ok {
return
}

if atomic.LoadInt32(&count) > 0 {
req.resch <- dialResponse{conn: new(Conn)}
} else {
req.resch <- dialResponse{err: fmt.Errorf("gophers ate the modem")}
}
atomic.AddInt32(&count, 1)

case <-ctx.Done():
req, ok := <-reqch
if !ok {
return
}

if atomic.LoadInt32(&count) > 0 {
req.resch <- dialResponse{conn: new(Conn)}
} else {
req.resch <- dialResponse{err: fmt.Errorf("gophers ate the modem")}
}
atomic.AddInt32(&count, 1)
}
}()
return nil
Expand Down Expand Up @@ -235,19 +204,14 @@ func TestFailFirst(t *testing.T) {
}

func TestStressActiveDial(t *testing.T) {
ds := newDialSync(func(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error {
ds := newDialSync(func(p peer.ID, reqch <-chan dialRequest) error {
go func() {
for {
select {
case req, ok := <-reqch:
if !ok {
return
}

req.resch <- dialResponse{}
case <-ctx.Done():
req, ok := <-reqch
if !ok {
return
}
req.resch <- dialResponse{}
}
}()
return nil
Expand Down
6 changes: 3 additions & 3 deletions swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,16 +295,16 @@ type dialResponse struct {
}

// startDialWorker starts an active dial goroutine that synchronizes and executes concurrent dials
func (s *Swarm) startDialWorker(ctx context.Context, p peer.ID, reqch <-chan dialRequest) error {
func (s *Swarm) startDialWorker(p peer.ID, reqch <-chan dialRequest) error {
if p == s.local {
return ErrDialToSelf
}

go s.dialWorkerLoop(ctx, p, reqch)
go s.dialWorkerLoop(p, reqch)
return nil
}

func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan dialRequest) {
func (s *Swarm) dialWorkerLoop(p peer.ID, reqch <-chan dialRequest) {
defer s.limiter.clearAllPeerDials(p)

type pendRequest struct {
Expand Down

0 comments on commit 137a238

Please sign in to comment.