From 8241f6b53d9dd9b7d3f44f3fcea78aa4ec826d30 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Wed, 30 Sep 2020 10:46:22 +0530 Subject: [PATCH] Merge Tests cleanup work (#92) * cleanup of restart PR --- channels/channels.go | 4 ++++ channels/channels_fsm.go | 21 +++++++++++++++++++++ channels/channels_test.go | 26 ++++++++++++++++++++++++++ events.go | 5 +++++ impl/impl.go | 5 +++++ impl/restart_integration_test.go | 4 +++- 6 files changed, 64 insertions(+), 1 deletion(-) diff --git a/channels/channels.go b/channels/channels.go index 6adadfdc..8ddbb3ff 100644 --- a/channels/channels.go +++ b/channels/channels.go @@ -168,6 +168,10 @@ func (c *Channels) Restart(chid datatransfer.ChannelID) error { return c.send(chid, datatransfer.Restart) } +func (c *Channels) CompleteCleanupOnRestart(chid datatransfer.ChannelID) error { + return c.send(chid, datatransfer.CompleteCleanupOnRestart) +} + func (c *Channels) DataSent(chid datatransfer.ChannelID, cid cid.Cid, delta uint64) error { return c.send(chid, datatransfer.DataSent, delta, cid) } diff --git a/channels/channels_fsm.go b/channels/channels_fsm.go index edc202d1..e29cecf4 100644 --- a/channels/channels_fsm.go +++ b/channels/channels_fsm.go @@ -101,6 +101,9 @@ var ChannelEvents = fsm.Events{ From(datatransfer.Cancelling).To(datatransfer.Cancelled). From(datatransfer.Failing).To(datatransfer.Failed). From(datatransfer.Completing).To(datatransfer.Completed), + + // will kickoff state handlers for channels that were cleaning up + fsm.Event(datatransfer.CompleteCleanupOnRestart).FromAny().ToNoChange(), } // ChannelStateEntryFuncs are handlers called as we enter different states @@ -121,6 +124,13 @@ func cleanupConnection(ctx fsm.Context, env ChannelEnvironment, channel internal return ctx.Trigger(datatransfer.CleanupComplete) } +// CleanupStates are the penultimate states for a channel +var CleanupStates = []fsm.StateKey{ + datatransfer.Cancelling, + datatransfer.Completing, + datatransfer.Failing, +} + // ChannelFinalityStates are the final states for a channel var ChannelFinalityStates = []fsm.StateKey{ datatransfer.Cancelled, @@ -138,3 +148,14 @@ func IsChannelTerminated(st datatransfer.Status) bool { return false } + +// IsChannelCleaningUp returns true if channel was being cleaned up and finished +func IsChannelCleaningUp(st datatransfer.Status) bool { + for _, s := range CleanupStates { + if s == st { + return true + } + } + + return false +} diff --git a/channels/channels_test.go b/channels/channels_test.go index aebc1b82..32d22e79 100644 --- a/channels/channels_test.go +++ b/channels/channels_test.go @@ -304,6 +304,25 @@ func TestChannels(t *testing.T) { state = checkEvent(ctx, t, received, datatransfer.Disconnected) require.Equal(t, datatransfer.PeerDisconnected, state.Status()) }) + + t.Run("test self peer and other peer", func(t *testing.T) { + peers := testutil.GeneratePeers(3) + // sender is self peer + chid, err := channelList.CreateNew(peers[1], tid1, cids[0], selector, fv1, peers[1], peers[1], peers[2]) + require.NoError(t, err) + ch, err := channelList.GetByID(context.Background(), chid) + require.NoError(t, err) + require.Equal(t, peers[1], ch.SelfPeer()) + require.Equal(t, peers[2], ch.OtherPeer()) + + // recipient is self peer + chid, err = channelList.CreateNew(peers[2], datatransfer.TransferID(1001), cids[0], selector, fv1, peers[1], peers[2], peers[1]) + require.NoError(t, err) + ch, err = channelList.GetByID(context.Background(), chid) + require.NoError(t, err) + require.Equal(t, peers[2], ch.SelfPeer()) + require.Equal(t, peers[1], ch.OtherPeer()) + }) } func TestIsChannelTerminated(t *testing.T) { @@ -312,6 +331,13 @@ func TestIsChannelTerminated(t *testing.T) { require.False(t, channels.IsChannelTerminated(datatransfer.Ongoing)) } +func TestIsChannelCleaningUp(t *testing.T) { + require.True(t, channels.IsChannelCleaningUp(datatransfer.Cancelling)) + require.True(t, channels.IsChannelCleaningUp(datatransfer.Failing)) + require.True(t, channels.IsChannelCleaningUp(datatransfer.Completing)) + require.False(t, channels.IsChannelCleaningUp(datatransfer.Cancelled)) +} + type event struct { event datatransfer.Event state datatransfer.ChannelState diff --git a/events.go b/events.go index 118f3724..6841bcb6 100644 --- a/events.go +++ b/events.go @@ -66,6 +66,10 @@ const ( // Complete is emitted when a data transfer is complete Complete + + // CompleteCleanupOnRestart is emitted when a data transfer channel is restarted to signal + // that channels that were cleaning up should finish cleanup + CompleteCleanupOnRestart ) // Events are human readable names for data transfer events @@ -88,6 +92,7 @@ var Events = map[EventCode]string{ ResponderCompletes: "ResponderCompletes", BeginFinalizing: "BeginFinalizing", Complete: "Complete", + CompleteCleanupOnRestart: "CompleteCleanupOnRestart", } // Event is a struct containing information about a data transfer event diff --git a/impl/impl.go b/impl/impl.go index eb489a25..5e6003fb 100644 --- a/impl/impl.go +++ b/impl/impl.go @@ -328,6 +328,11 @@ func (m *manager) RestartDataTransferChannel(ctx context.Context, chid datatrans return nil } + // if channel is is cleanup state, finish it + if channels.IsChannelCleaningUp(channel.Status()) { + return m.channels.CompleteCleanupOnRestart(channel.ChannelID()) + } + // initiate restart chType := m.channelDataTransferType(channel) switch chType { diff --git a/impl/restart_integration_test.go b/impl/restart_integration_test.go index eeb041df..30bafed2 100644 --- a/impl/restart_integration_test.go +++ b/impl/restart_integration_test.go @@ -181,7 +181,7 @@ func TestRestartPush(t *testing.T) { // Connect the peers and restart require.NoError(t, rh.gsData.Mn.LinkAll()) // let linking take effect - time.Sleep(500 * time.Millisecond) + time.Sleep(1 * time.Second) conn, err := rh.gsData.Mn.ConnectPeers(rh.peer1, rh.peer2) require.NoError(t, err) require.NotNil(t, conn) @@ -374,6 +374,8 @@ func TestRestartPull(t *testing.T) { require.NoError(t, rh.gsData.Mn.LinkAll()) // let linking take effect time.Sleep(500 * time.Millisecond) + // let linking take effect + time.Sleep(500 * time.Millisecond) conn, err := rh.gsData.Mn.ConnectPeers(rh.peer1, rh.peer2) require.NoError(t, err) require.NotNil(t, conn)