Skip to content

Commit

Permalink
feat(connecteventmanager): block Connected() until accepted (#435)
Browse files Browse the repository at this point in the history
* feat(connecteventmanager): block Connected() until accepted

Ref: #432

Minimal attempt at solving #432

* fix(connecteventmanager): less complex channel signalling

* fix(connecteventmanager): handle change queue edge cases and closure

* fix(connecteventmanager): add test to confirm sync Connected() call flow

changelog: put the 435 fix in the right version

fix(connecteventmanager): clean up tests for new synchronous flow
  • Loading branch information
rvagg authored and gammazero committed May 17, 2024
1 parent 76648b7 commit 894484c
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 45 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ The following emojis are used to highlight certain changes:

### Fixed

- Address a Bitswap findpeers / connect race condition that can prevent peer communication ([#435](https://github.com/ipfs/boxo/issues/435))

### Security

## [v0.19.0]
Expand Down
102 changes: 67 additions & 35 deletions bitswap/network/connecteventmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,25 @@ type connectEventManager struct {
cond sync.Cond
peers map[peer.ID]*peerState

changeQueue []peer.ID
changeQueue []change
stop bool
done chan struct{}
}

type change struct {
pid peer.ID
handled chan struct{}
}

type peerState struct {
newState, curState state
pending bool
}

type waitFn func()

func waitNoop() {}

func newConnectEventManager(connListeners ...ConnectionListener) *connectEventManager {
evtManager := &connectEventManager{
connListeners: connListeners,
Expand Down Expand Up @@ -66,7 +75,16 @@ func (c *connectEventManager) getState(p peer.ID) state {
}
}

func (c *connectEventManager) setState(p peer.ID, newState state) {
func (c *connectEventManager) makeWaitFunc(handled chan struct{}) waitFn {
return func() {
select {
case <-handled:
case <-c.done:
}
}
}

func (c *connectEventManager) setState(p peer.ID, newState state) waitFn {
state, ok := c.peers[p]
if !ok {
state = new(peerState)
Expand All @@ -75,9 +93,20 @@ func (c *connectEventManager) setState(p peer.ID, newState state) {
state.newState = newState
if !state.pending && state.newState != state.curState {
state.pending = true
c.changeQueue = append(c.changeQueue, p)
change := change{p, make(chan struct{})}
c.changeQueue = append(c.changeQueue, change)
c.cond.Broadcast()
return c.makeWaitFunc(change.handled)
} else if state.pending {
// Find the change in the queue and return a wait function for it
for _, change := range c.changeQueue {
if change.pid == p {
return c.makeWaitFunc(change.handled)
}
}
log.Error("a peer was marked as change pending but not found in the change queue")
}
return waitNoop
}

// Waits for a change to be enqueued, or for the event manager to be stopped. Returns false if the
Expand All @@ -95,67 +124,70 @@ func (c *connectEventManager) worker() {
defer close(c.done)

for c.waitChange() {
pid := c.changeQueue[0]
c.changeQueue[0] = peer.ID("") // free the peer ID (slicing won't do that)
pch := c.changeQueue[0]
c.changeQueue[0] = change{} // free the resources (slicing won't do that)
c.changeQueue = c.changeQueue[1:]

state, ok := c.peers[pid]
state, ok := c.peers[pch.pid]
// If we've disconnected and forgotten, continue.
if !ok {
// This shouldn't be possible because _this_ thread is responsible for
// removing peers from this map, and we shouldn't get duplicate entries in
// the change queue.
log.Error("a change was enqueued for a peer we're not tracking")
close(pch.handled)
continue
}

// Record the fact that this "state" is no longer in the queue.
state.pending = false

// Then, if there's nothing to do, continue.
if state.curState == state.newState {
continue
}

// Or record the state update, then apply it.
oldState := state.curState
state.curState = state.newState

switch state.newState {
case stateDisconnected:
delete(c.peers, pid)
fallthrough
case stateUnresponsive:
// Only trigger a disconnect event if the peer was responsive.
// We could be transitioning from unresponsive to disconnected.
if oldState == stateResponsive {
// Is there anything to do?
if state.curState != state.newState {
// Record the state update, then apply it.
oldState := state.curState
state.curState = state.newState

switch state.newState {
case stateDisconnected:
delete(c.peers, pch.pid)
fallthrough
case stateUnresponsive:
// Only trigger a disconnect event if the peer was responsive.
// We could be transitioning from unresponsive to disconnected.
if oldState == stateResponsive {
c.lk.Unlock()
for _, v := range c.connListeners {
v.PeerDisconnected(pch.pid)
}
c.lk.Lock()
}
case stateResponsive:
c.lk.Unlock()
for _, v := range c.connListeners {
v.PeerDisconnected(pid)
v.PeerConnected(pch.pid)
}
c.lk.Lock()
}
case stateResponsive:
c.lk.Unlock()
for _, v := range c.connListeners {
v.PeerConnected(pid)
}
c.lk.Lock()
}

// Record the fact that this "state" is no longer in the queue.
state.pending = false
// Signal that we've handled the state change
close(pch.handled)
}
}

// Called whenever we receive a new connection. May be called many times.
func (c *connectEventManager) Connected(p peer.ID) {
c.lk.Lock()
defer c.lk.Unlock()

// !responsive -> responsive

if c.getState(p) == stateResponsive {
c.lk.Unlock()
return
}
c.setState(p, stateResponsive)
wait := c.setState(p, stateResponsive)
c.lk.Unlock()
wait()
}

// Called when we drop the final connection to a peer.
Expand Down
69 changes: 59 additions & 10 deletions bitswap/network/connecteventmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ type mockConnEvent struct {

type mockConnListener struct {
sync.Mutex
events []mockConnEvent
events []mockConnEvent
peerConnectedCb func(p peer.ID)
}

func newMockConnListener() *mockConnListener {
Expand All @@ -28,6 +29,9 @@ func (cl *mockConnListener) PeerConnected(p peer.ID) {
cl.Lock()
defer cl.Unlock()
cl.events = append(cl.events, mockConnEvent{connected: true, peer: p})
if cl.peerConnectedCb != nil {
cl.peerConnectedCb(p)
}
}

func (cl *mockConnListener) PeerDisconnected(p peer.ID) {
Expand Down Expand Up @@ -61,24 +65,21 @@ func TestConnectEventManagerConnectDisconnect(t *testing.T) {
connected: true,
})

// Flush the event queue.
wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)

// Block up the event loop.
connListener.Lock()
cem.Connected(peers[1])
expectedEvents = append(expectedEvents, mockConnEvent{
peer: peers[1],
connected: true,
})
require.Equal(t, expectedEvents, connListener.events)

// We don't expect this to show up.
cem.Disconnected(peers[0])
cem.Connected(peers[0])

connListener.Unlock()

expectedEvents = append(expectedEvents, mockConnEvent{
peer: peers[0],
connected: false,
})
// Flush the event queue.
wait(t, cem)
require.Equal(t, expectedEvents, connListener.events)
}
Expand Down Expand Up @@ -166,3 +167,51 @@ func TestConnectEventManagerDisconnectAfterMarkUnresponsive(t *testing.T) {
require.Empty(t, cem.peers) // all disconnected
require.Equal(t, expectedEvents, connListener.events)
}

func TestConnectEventManagerConnectFlowSynchronous(t *testing.T) {
connListener := newMockConnListener()
actionsCh := make(chan string)
connListener.peerConnectedCb = func(p peer.ID) {
actionsCh <- "PeerConnected:" + p.String()
time.Sleep(time.Millisecond * 50)
}

peers := testutil.GeneratePeers(2)
cem := newConnectEventManager(connListener)
cem.Start()
t.Cleanup(cem.Stop)

go func() {
actionsCh <- "Connected:" + peers[0].String()
cem.Connected(peers[0])
actionsCh <- "Done:" + peers[0].String()
actionsCh <- "Connected:" + peers[1].String()
cem.Connected(peers[1])
actionsCh <- "Done:" + peers[1].String()
close(actionsCh)
}()

// We expect Done to be sent _after_ PeerConnected, which demonstrates the
// call to Connected() blocks until PeerConnected() returns.
gotActions := make([]string, 0, 3)
for event := range actionsCh {
gotActions = append(gotActions, event)
}
expectedActions := []string{
"Connected:" + peers[0].String(),
"PeerConnected:" + peers[0].String(),
"Done:" + peers[0].String(),
"Connected:" + peers[1].String(),
"PeerConnected:" + peers[1].String(),
"Done:" + peers[1].String(),
}
require.Equal(t, expectedActions, gotActions)

// Flush the event queue.
wait(t, cem)
expectedEvents := []mockConnEvent{
{peer: peers[0], connected: true},
{peer: peers[1], connected: true},
}
require.Equal(t, expectedEvents, connListener.events)
}

0 comments on commit 894484c

Please sign in to comment.