Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm: remove unnecessary reqno for pending request tracking #2460

Merged
merged 3 commits into from
Aug 16, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 25 additions & 49 deletions p2p/net/swarm/dial_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ type addrDial struct {
conn *Conn
// err is the err on dialing the address
err error
// requests is the list of pendRequests interested in this dial
// the value in the slice is the request number assigned to this request by the dialWorker
requests []int
// dialed indicates whether we have triggered the dial to the address
dialed bool
// createdAt is the time this struct was created
Expand All @@ -79,13 +76,9 @@ type dialWorker struct {
peer peer.ID
// reqch is used to send dial requests to the worker. close reqch to end the worker loop
reqch <-chan dialRequest
// reqno is the request number used to track different dialRequests for a peer.
// Each incoming request is assigned a reqno. This reqno is used in pendingRequests and in
// addrDial objects in trackedDials to track this request
reqno int
// pendingRequests maps reqno to the pendRequest object for a dialRequest
pendingRequests map[int]*pendRequest
// trackedDials tracks dials to the peers addresses. An entry here is used to ensure that
// pendingRequests is the set of pendingRequests
pendingRequests map[*pendRequest]struct{}
// trackedDials tracks dials to the peer's addresses. An entry here is used to ensure that
// we dial an address at most once
trackedDials map[string]*addrDial
// resch is used to receive response for dials to the peers addresses.
Expand All @@ -106,7 +99,7 @@ func newDialWorker(s *Swarm, p peer.ID, reqch <-chan dialRequest, cl Clock) *dia
s: s,
peer: p,
reqch: reqch,
pendingRequests: make(map[int]*pendRequest),
pendingRequests: make(map[*pendRequest]struct{}),
trackedDials: make(map[string]*addrDial),
resch: make(chan dialResult),
cl: cl,
Expand Down Expand Up @@ -237,10 +230,8 @@ loop:
continue loop
}

// The request has some pending or new dials. We assign this request a request number.
// This value of w.reqno is used to track this request in all the structures
w.reqno++
w.pendingRequests[w.reqno] = pr
// The request has some pending or new dials
w.pendingRequests[pr] = struct{}{}

for _, ad := range tojoin {
if !ad.dialed {
Expand All @@ -258,7 +249,6 @@ loop:
}
}
// add the request to the addrDial
ad.requests = append(ad.requests, w.reqno)
}

if len(todial) > 0 {
Expand All @@ -268,7 +258,6 @@ loop:
w.trackedDials[string(a.Bytes())] = &addrDial{
addr: a,
ctx: req.ctx,
requests: []int{w.reqno},
createdAt: now,
}
dq.Add(network.AddrDelay{Addr: a, Delay: addrDelay[string(a.Bytes())]})
Expand Down Expand Up @@ -333,20 +322,14 @@ loop:
continue loop
}

// request succeeded, respond to all pending requests
for _, reqno := range ad.requests {
pr, ok := w.pendingRequests[reqno]
if !ok {
// some other dial for this request succeeded before this one
continue
for pr := range w.pendingRequests {
if _, ok := pr.addrs[string(ad.addr.Bytes())]; ok {
pr.req.resch <- dialResponse{conn: conn}
delete(w.pendingRequests, pr)
}
pr.req.resch <- dialResponse{conn: conn}
delete(w.pendingRequests, reqno)
}

ad.conn = conn
ad.requests = nil

if !w.connected {
w.connected = true
if w.s.metricsTracer != nil {
Expand Down Expand Up @@ -380,33 +363,26 @@ loop:
// dispatches an error to a specific addr dial
func (w *dialWorker) dispatchError(ad *addrDial, err error) {
ad.err = err
for _, reqno := range ad.requests {
pr, ok := w.pendingRequests[reqno]
if !ok {
// some other dial for this request succeeded before this one
continue
}

for pr := range w.pendingRequests {
// accumulate the error
pr.err.recordErr(ad.addr, err)

delete(pr.addrs, string(ad.addr.Bytes()))
if len(pr.addrs) == 0 {
// all addrs have erred, dispatch dial error
// but first do a last one check in case an acceptable connection has landed from
// a simultaneous dial that started later and added new acceptable addrs
c, _ := w.s.bestAcceptableConnToPeer(pr.req.ctx, w.peer)
if c != nil {
pr.req.resch <- dialResponse{conn: c}
} else {
pr.req.resch <- dialResponse{err: pr.err}
if _, ok := pr.addrs[string(ad.addr.Bytes())]; ok {
pr.err.recordErr(ad.addr, err)
delete(pr.addrs, string(ad.addr.Bytes()))
if len(pr.addrs) == 0 {
// all addrs have erred, dispatch dial error
// but first do a last one check in case an acceptable connection has landed from
// a simultaneous dial that started later and added new acceptable addrs
c, _ := w.s.bestAcceptableConnToPeer(pr.req.ctx, w.peer)
if c != nil {
pr.req.resch <- dialResponse{conn: c}
} else {
pr.req.resch <- dialResponse{err: pr.err}
}
delete(w.pendingRequests, pr)
}
delete(w.pendingRequests, reqno)
}
}

ad.requests = nil

// if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests.
// this is necessary to support active listen scenarios, where a new dial comes in while
// another dial is in progress, and needs to do a direct connection without inhibitions from
Expand Down