Skip to content

Commit

Permalink
Complete channels that were cleaning up when we restart (#81)
Browse files Browse the repository at this point in the history
* finish channels that were cleaning up

* peers should connect after linking
  • Loading branch information
aarshkshah1992 committed Sep 24, 2020
1 parent ad68178 commit a1b705a
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 1 deletion.
4 changes: 4 additions & 0 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ func (c *Channels) Restart(chid datatransfer.ChannelID) error {
return c.send(chid, datatransfer.Restart)
}

func (c *Channels) CompleteCleanupOnRestart(chid datatransfer.ChannelID) error {
return c.send(chid, datatransfer.CompleteCleanupOnRestart)
}

func (c *Channels) DataSent(chid datatransfer.ChannelID, cid cid.Cid, delta uint64) error {
return c.send(chid, datatransfer.DataSent, delta, cid)
}
Expand Down
21 changes: 21 additions & 0 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ var ChannelEvents = fsm.Events{
From(datatransfer.Cancelling).To(datatransfer.Cancelled).
From(datatransfer.Failing).To(datatransfer.Failed).
From(datatransfer.Completing).To(datatransfer.Completed),

// will kickoff state handlers for channels that were cleaning up
fsm.Event(datatransfer.CompleteCleanupOnRestart).FromAny().ToNoChange(),
}

// ChannelStateEntryFuncs are handlers called as we enter different states
Expand All @@ -119,6 +122,13 @@ func cleanupConnection(ctx fsm.Context, env ChannelEnvironment, channel internal
return ctx.Trigger(datatransfer.CleanupComplete)
}

// CleanupStates are the penultimate states for a channel
var CleanupStates = []fsm.StateKey{
datatransfer.Cancelling,
datatransfer.Completing,
datatransfer.Failing,
}

// ChannelFinalityStates are the final states for a channel
var ChannelFinalityStates = []fsm.StateKey{
datatransfer.Cancelled,
Expand All @@ -136,3 +146,14 @@ func IsChannelTerminated(st datatransfer.Status) bool {

return false
}

// IsChannelCleaningUp returns true if channel was being cleaned up and finished
func IsChannelCleaningUp(st datatransfer.Status) bool {
for _, s := range CleanupStates {
if s == st {
return true
}
}

return false
}
7 changes: 7 additions & 0 deletions channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,13 @@ func TestIsChannelTerminated(t *testing.T) {
require.False(t, channels.IsChannelTerminated(datatransfer.Ongoing))
}

func TestIsChannelCleaningUp(t *testing.T) {
require.True(t, channels.IsChannelCleaningUp(datatransfer.Cancelling))
require.True(t, channels.IsChannelCleaningUp(datatransfer.Failing))
require.True(t, channels.IsChannelCleaningUp(datatransfer.Completing))
require.False(t, channels.IsChannelCleaningUp(datatransfer.Cancelled))
}

type event struct {
event datatransfer.Event
state datatransfer.ChannelState
Expand Down
5 changes: 5 additions & 0 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ const (

// Complete is emitted when a data transfer is complete
Complete

// CompleteCleanupOnRestart is emitted when a data transfer channel is restarted to signal
// that channels that were cleaning up should finish cleanup
CompleteCleanupOnRestart
)

// Events are human readable names for data transfer events
Expand All @@ -85,6 +89,7 @@ var Events = map[EventCode]string{
ResponderCompletes: "ResponderCompletes",
BeginFinalizing: "BeginFinalizing",
Complete: "Complete",
CompleteCleanupOnRestart: "CompleteCleanupOnRestart",
}

// Event is a struct containing information about a data transfer event
Expand Down
5 changes: 5 additions & 0 deletions impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,11 @@ func (m *manager) RestartDataTransferChannel(ctx context.Context, chid datatrans
return nil
}

// if channel is is cleanup state, finish it
if channels.IsChannelCleaningUp(channel.Status()) {
return m.channels.CompleteCleanupOnRestart(channel.ChannelID())
}

// initiate restart
chType := m.channelDataTransferType(channel)
switch chType {
Expand Down
2 changes: 1 addition & 1 deletion impl/restart_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestRestartPush(t *testing.T) {
// Connect the peers and restart
require.NoError(t, rh.gsData.Mn.LinkAll())
// let linking take effect
time.Sleep(500 * time.Millisecond)
time.Sleep(1 * time.Second)
conn, err := rh.gsData.Mn.ConnectPeers(rh.peer1, rh.peer2)
require.NoError(t, err)
require.NotNil(t, conn)
Expand Down

0 comments on commit a1b705a

Please sign in to comment.