Skip to content

Commit

Permalink
Merge Tests cleanup work (#92)
Browse files Browse the repository at this point in the history
* cleanup of restart PR
  • Loading branch information
aarshkshah1992 committed Sep 30, 2020
1 parent 1e77fff commit 8241f6b
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 1 deletion.
4 changes: 4 additions & 0 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ func (c *Channels) Restart(chid datatransfer.ChannelID) error {
return c.send(chid, datatransfer.Restart)
}

func (c *Channels) CompleteCleanupOnRestart(chid datatransfer.ChannelID) error {
return c.send(chid, datatransfer.CompleteCleanupOnRestart)
}

func (c *Channels) DataSent(chid datatransfer.ChannelID, cid cid.Cid, delta uint64) error {
return c.send(chid, datatransfer.DataSent, delta, cid)
}
Expand Down
21 changes: 21 additions & 0 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ var ChannelEvents = fsm.Events{
From(datatransfer.Cancelling).To(datatransfer.Cancelled).
From(datatransfer.Failing).To(datatransfer.Failed).
From(datatransfer.Completing).To(datatransfer.Completed),

// will kickoff state handlers for channels that were cleaning up
fsm.Event(datatransfer.CompleteCleanupOnRestart).FromAny().ToNoChange(),
}

// ChannelStateEntryFuncs are handlers called as we enter different states
Expand All @@ -121,6 +124,13 @@ func cleanupConnection(ctx fsm.Context, env ChannelEnvironment, channel internal
return ctx.Trigger(datatransfer.CleanupComplete)
}

// CleanupStates are the penultimate states for a channel
var CleanupStates = []fsm.StateKey{
datatransfer.Cancelling,
datatransfer.Completing,
datatransfer.Failing,
}

// ChannelFinalityStates are the final states for a channel
var ChannelFinalityStates = []fsm.StateKey{
datatransfer.Cancelled,
Expand All @@ -138,3 +148,14 @@ func IsChannelTerminated(st datatransfer.Status) bool {

return false
}

// IsChannelCleaningUp returns true if channel was being cleaned up and finished
func IsChannelCleaningUp(st datatransfer.Status) bool {
for _, s := range CleanupStates {
if s == st {
return true
}
}

return false
}
26 changes: 26 additions & 0 deletions channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,25 @@ func TestChannels(t *testing.T) {
state = checkEvent(ctx, t, received, datatransfer.Disconnected)
require.Equal(t, datatransfer.PeerDisconnected, state.Status())
})

t.Run("test self peer and other peer", func(t *testing.T) {
peers := testutil.GeneratePeers(3)
// sender is self peer
chid, err := channelList.CreateNew(peers[1], tid1, cids[0], selector, fv1, peers[1], peers[1], peers[2])
require.NoError(t, err)
ch, err := channelList.GetByID(context.Background(), chid)
require.NoError(t, err)
require.Equal(t, peers[1], ch.SelfPeer())
require.Equal(t, peers[2], ch.OtherPeer())

// recipient is self peer
chid, err = channelList.CreateNew(peers[2], datatransfer.TransferID(1001), cids[0], selector, fv1, peers[1], peers[2], peers[1])
require.NoError(t, err)
ch, err = channelList.GetByID(context.Background(), chid)
require.NoError(t, err)
require.Equal(t, peers[2], ch.SelfPeer())
require.Equal(t, peers[1], ch.OtherPeer())
})
}

func TestIsChannelTerminated(t *testing.T) {
Expand All @@ -312,6 +331,13 @@ func TestIsChannelTerminated(t *testing.T) {
require.False(t, channels.IsChannelTerminated(datatransfer.Ongoing))
}

func TestIsChannelCleaningUp(t *testing.T) {
require.True(t, channels.IsChannelCleaningUp(datatransfer.Cancelling))
require.True(t, channels.IsChannelCleaningUp(datatransfer.Failing))
require.True(t, channels.IsChannelCleaningUp(datatransfer.Completing))
require.False(t, channels.IsChannelCleaningUp(datatransfer.Cancelled))
}

type event struct {
event datatransfer.Event
state datatransfer.ChannelState
Expand Down
5 changes: 5 additions & 0 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ const (

// Complete is emitted when a data transfer is complete
Complete

// CompleteCleanupOnRestart is emitted when a data transfer channel is restarted to signal
// that channels that were cleaning up should finish cleanup
CompleteCleanupOnRestart
)

// Events are human readable names for data transfer events
Expand All @@ -88,6 +92,7 @@ var Events = map[EventCode]string{
ResponderCompletes: "ResponderCompletes",
BeginFinalizing: "BeginFinalizing",
Complete: "Complete",
CompleteCleanupOnRestart: "CompleteCleanupOnRestart",
}

// Event is a struct containing information about a data transfer event
Expand Down
5 changes: 5 additions & 0 deletions impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,11 @@ func (m *manager) RestartDataTransferChannel(ctx context.Context, chid datatrans
return nil
}

// if channel is is cleanup state, finish it
if channels.IsChannelCleaningUp(channel.Status()) {
return m.channels.CompleteCleanupOnRestart(channel.ChannelID())
}

// initiate restart
chType := m.channelDataTransferType(channel)
switch chType {
Expand Down
4 changes: 3 additions & 1 deletion impl/restart_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func TestRestartPush(t *testing.T) {
// Connect the peers and restart
require.NoError(t, rh.gsData.Mn.LinkAll())
// let linking take effect
time.Sleep(500 * time.Millisecond)
time.Sleep(1 * time.Second)
conn, err := rh.gsData.Mn.ConnectPeers(rh.peer1, rh.peer2)
require.NoError(t, err)
require.NotNil(t, conn)
Expand Down Expand Up @@ -374,6 +374,8 @@ func TestRestartPull(t *testing.T) {
require.NoError(t, rh.gsData.Mn.LinkAll())
// let linking take effect
time.Sleep(500 * time.Millisecond)
// let linking take effect
time.Sleep(500 * time.Millisecond)
conn, err := rh.gsData.Mn.ConnectPeers(rh.peer1, rh.peer2)
require.NoError(t, err)
require.NotNil(t, conn)
Expand Down

0 comments on commit 8241f6b

Please sign in to comment.