Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(connecteventmanager): block Connected() until accepted #435

Merged
merged 4 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ The following emojis are used to highlight certain changes:

- Removed mentions of unused ARC algorithm ([#336](https://github.com/ipfs/boxo/issues/366#issuecomment-1597253540))
- Handle `_redirects` file when `If-None-Match` header is present ([#412](https://github.com/ipfs/boxo/pull/412))
- Address a Bitswap findpeers / connect race condition that can prevent peer communication ([#435](https://github.com/ipfs/boxo/issues/435))

### Security

Expand Down
102 changes: 67 additions & 35 deletions bitswap/network/connecteventmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,25 @@ type connectEventManager struct {
cond sync.Cond
peers map[peer.ID]*peerState

changeQueue []peer.ID
changeQueue []change
stop bool
done chan struct{}
}

type change struct {
pid peer.ID
handled chan struct{}
}

type peerState struct {
newState, curState state
pending bool
}

type waitFn func()

func waitNoop() {}

func newConnectEventManager(connListeners ...ConnectionListener) *connectEventManager {
evtManager := &connectEventManager{
connListeners: connListeners,
Expand Down Expand Up @@ -66,7 +75,16 @@ func (c *connectEventManager) getState(p peer.ID) state {
}
}

func (c *connectEventManager) setState(p peer.ID, newState state) {
func (c *connectEventManager) makeWaitFunc(handled chan struct{}) waitFn {
return func() {
select {
case <-handled:
case <-c.done:
}
}
}

func (c *connectEventManager) setState(p peer.ID, newState state) waitFn {
state, ok := c.peers[p]
if !ok {
state = new(peerState)
Expand All @@ -75,9 +93,20 @@ func (c *connectEventManager) setState(p peer.ID, newState state) {
state.newState = newState
if !state.pending && state.newState != state.curState {
state.pending = true
c.changeQueue = append(c.changeQueue, p)
change := change{p, make(chan struct{})}
c.changeQueue = append(c.changeQueue, change)
c.cond.Broadcast()
return c.makeWaitFunc(change.handled)
} else if state.pending {
// Find the change in the queue and return a wait function for it
for _, change := range c.changeQueue {
if change.pid == p {
return c.makeWaitFunc(change.handled)
}
}
log.Error("a peer was marked as change pending but not found in the change queue")
}
return waitNoop
}

// Waits for a change to be enqueued, or for the event manager to be stopped. Returns false if the
Expand All @@ -95,67 +124,70 @@ func (c *connectEventManager) worker() {
defer close(c.done)

for c.waitChange() {
pid := c.changeQueue[0]
c.changeQueue[0] = peer.ID("") // free the peer ID (slicing won't do that)
pch := c.changeQueue[0]
c.changeQueue[0] = change{} // free the resources (slicing won't do that)
c.changeQueue = c.changeQueue[1:]

state, ok := c.peers[pid]
state, ok := c.peers[pch.pid]
// If we've disconnected and forgotten, continue.
if !ok {
// This shouldn't be possible because _this_ thread is responsible for
// removing peers from this map, and we shouldn't get duplicate entries in
// the change queue.
log.Error("a change was enqueued for a peer we're not tracking")
close(pch.handled)
continue
}

// Record the fact that this "state" is no longer in the queue.
state.pending = false

// Then, if there's nothing to do, continue.
if state.curState == state.newState {
continue
}

// Or record the state update, then apply it.
oldState := state.curState
state.curState = state.newState

switch state.newState {
case stateDisconnected:
delete(c.peers, pid)
fallthrough
case stateUnresponsive:
// Only trigger a disconnect event if the peer was responsive.
// We could be transitioning from unresponsive to disconnected.
if oldState == stateResponsive {
// Is there anything to do?
if state.curState != state.newState {
// Record the state update, then apply it.
oldState := state.curState
state.curState = state.newState

switch state.newState {
case stateDisconnected:
delete(c.peers, pch.pid)
fallthrough
case stateUnresponsive:
// Only trigger a disconnect event if the peer was responsive.
// We could be transitioning from unresponsive to disconnected.
if oldState == stateResponsive {
c.lk.Unlock()
for _, v := range c.connListeners {
v.PeerDisconnected(pch.pid)
}
c.lk.Lock()
}
case stateResponsive:
c.lk.Unlock()
for _, v := range c.connListeners {
v.PeerDisconnected(pid)
v.PeerConnected(pch.pid)
}
c.lk.Lock()
}
case stateResponsive:
c.lk.Unlock()
for _, v := range c.connListeners {
v.PeerConnected(pid)
}
c.lk.Lock()
}

// Record the fact that this "state" is no longer in the queue.
state.pending = false
// Signal that we've handled the state change
close(pch.handled)
}
}

// Called whenever we receive a new connection. May be called many times.
func (c *connectEventManager) Connected(p peer.ID) {
c.lk.Lock()
defer c.lk.Unlock()

// !responsive -> responsive

if c.getState(p) == stateResponsive {
c.lk.Unlock()
return
}
c.setState(p, stateResponsive)
wait := c.setState(p, stateResponsive)
c.lk.Unlock()
wait()
}

// Called when we drop the final connection to a peer.
Expand Down
54 changes: 53 additions & 1 deletion bitswap/network/connecteventmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ type mockConnEvent struct {

type mockConnListener struct {
sync.Mutex
events []mockConnEvent
events []mockConnEvent
peerConnectedCb func(p peer.ID)
}

func newMockConnListener() *mockConnListener {
Expand All @@ -29,6 +30,9 @@ func (cl *mockConnListener) PeerConnected(p peer.ID) {
cl.Lock()
defer cl.Unlock()
cl.events = append(cl.events, mockConnEvent{connected: true, peer: p})
if cl.peerConnectedCb != nil {
cl.peerConnectedCb(p)
}
}

func (cl *mockConnListener) PeerDisconnected(p peer.ID) {
Expand Down Expand Up @@ -173,3 +177,51 @@ func TestConnectEventManagerDisconnectAfterMarkUnresponsive(t *testing.T) {
require.Empty(t, cem.peers) // all disconnected
require.Equal(t, expectedEvents, connListener.events)
}

func TestConnectEventManagerConnectFlowSynchronous(t *testing.T) {
connListener := newMockConnListener()
actionsCh := make(chan string)
connListener.peerConnectedCb = func(p peer.ID) {
actionsCh <- "PeerConnected:" + p.String()
time.Sleep(time.Millisecond * 50)
}

peers := testutil.GeneratePeers(2)
cem := newConnectEventManager(connListener)
cem.Start()
t.Cleanup(cem.Stop)

go func() {
actionsCh <- "Connected:" + peers[0].String()
cem.Connected(peers[0])
actionsCh <- "Done:" + peers[0].String()
actionsCh <- "Connected:" + peers[1].String()
cem.Connected(peers[1])
actionsCh <- "Done:" + peers[1].String()
close(actionsCh)
}()

// We expect Done to be sent _after_ PeerConnected, which demonstrates the
// call to Connected() blocks until PeerConnected() returns.
gotActions := make([]string, 0, 3)
for event := range actionsCh {
gotActions = append(gotActions, event)
}
expectedActions := []string{
"Connected:" + peers[0].String(),
"PeerConnected:" + peers[0].String(),
"Done:" + peers[0].String(),
"Connected:" + peers[1].String(),
"PeerConnected:" + peers[1].String(),
"Done:" + peers[1].String(),
}
require.Equal(t, expectedActions, gotActions)

// Flush the event queue.
wait(t, cem)
expectedEvents := []mockConnEvent{
{peer: peers[0], connected: true},
{peer: peers[1], connected: true},
}
require.Equal(t, expectedEvents, connListener.events)
}