Skip to content

Commit

Permalink
Revert "Accept missing blocks without failing data transfer (#291)" (#…
Browse files Browse the repository at this point in the history
…294)

This reverts commit b442027.
(still contains tracing improvements and module dep updates)

Co-authored-by: Jiaying Wang <42981373+jennijuju@users.noreply.github.com>
  • Loading branch information
hannahhoward and jennijuju committed Jan 19, 2022
1 parent eb62ceb commit 6109651
Show file tree
Hide file tree
Showing 16 changed files with 3 additions and 428 deletions.
4 changes: 0 additions & 4 deletions channelmonitor/channelmonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,10 +614,6 @@ func (m *mockChannelState) ReceivedCidsTotal() int64 {
panic("implement me")
}

func (m *mockChannelState) MissingCids() []cid.Cid {
panic("implement me")
}

func (m *mockChannelState) QueuedCidsTotal() int64 {
panic("implement me")
}
Expand Down
8 changes: 0 additions & 8 deletions channels/channel_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ type channelState struct {
// stages tracks the timeline of events related to a data transfer, for
// traceability purposes.
stages *datatransfer.ChannelStages

// missingCids are the set of CIDS that were missing and skipped over in the data transfer
missingCids []cid.Cid
}

// EmptyChannelState is the zero value for channel state, meaning not present
Expand Down Expand Up @@ -195,10 +192,6 @@ func (c channelState) OtherPeer() peer.ID {
return c.sender
}

func (c channelState) MissingCids() []cid.Cid {
return c.missingCids
}

// Stages returns the current ChannelStages object, or an empty object.
// It is unsafe for the caller to modify the return value, and changes may not
// be persisted. It should be treated as immutable.
Expand Down Expand Up @@ -237,7 +230,6 @@ func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByT
voucherResultDecoder: voucherResultDecoder,
voucherDecoder: voucherDecoder,
stages: c.Stages,
missingCids: c.MissingCids,
}
}

Expand Down
5 changes: 0 additions & 5 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,6 @@ func (c *Channels) ReceiveDataError(chid datatransfer.ChannelID, err error) erro
return c.send(chid, datatransfer.ReceiveDataError, err)
}

// CIDMissing indicates the sender is missing a section of the graph in the response
func (c *Channels) CIDMissing(chid datatransfer.ChannelID, cid cid.Cid) error {
return c.send(chid, datatransfer.CIDMissing, cid)
}

// HasChannel returns true if the given channel id is being tracked
func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error) {
return c.stateMachines.Has(chid)
Expand Down
29 changes: 0 additions & 29 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package channels

import (
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
cbg "github.com/whyrusleeping/cbor-gen"

Expand Down Expand Up @@ -110,24 +109,6 @@ var ChannelEvents = fsm.Events{
chst.AddLog("")
return nil
}),

fsm.Event(datatransfer.CIDMissing).FromMany(transferringStates...).ToJustRecord().
Action(func(chst *internal.ChannelState, missing cid.Cid) error {
// TODO: find a more efficient way to do this
var found bool
for _, c := range chst.MissingCids {
if c.Equals(missing) {
found = true
break
}
}
if !found {
chst.MissingCids = append(chst.MissingCids, missing)
}
chst.AddLog("")
return nil
}),

fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
chst.Message = err.Error()
chst.AddLog("data transfer disconnected: %s", chst.Message)
Expand Down Expand Up @@ -250,12 +231,6 @@ var ChannelEvents = fsm.Events{
return nil
}),

fsm.Event(datatransfer.CleanupCompletePartial).
From(datatransfer.Completing).To(datatransfer.PartiallyCompleted).Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),

// will kickoff state handlers for channels that were cleaning up
fsm.Event(datatransfer.CompleteCleanupOnRestart).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
Expand All @@ -278,9 +253,6 @@ func cleanupConnection(ctx fsm.Context, env ChannelEnvironment, channel internal
}
env.CleanupChannel(datatransfer.ChannelID{ID: channel.TransferID, Initiator: channel.Initiator, Responder: channel.Responder})
env.Unprotect(otherParty, datatransfer.ChannelID{ID: channel.TransferID, Initiator: channel.Initiator, Responder: channel.Responder}.String())
if channel.Status == datatransfer.Completing && len(channel.MissingCids) > 0 {
return ctx.Trigger(datatransfer.CleanupCompletePartial)
}
return ctx.Trigger(datatransfer.CleanupComplete)
}

Expand All @@ -296,7 +268,6 @@ var ChannelFinalityStates = []fsm.StateKey{
datatransfer.Cancelled,
datatransfer.Completed,
datatransfer.Failed,
datatransfer.PartiallyCompleted,
}

// IsChannelTerminated returns true if the channel is in a finality state
Expand Down
30 changes: 0 additions & 30 deletions channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"testing"
"time"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
Expand Down Expand Up @@ -222,35 +221,6 @@ func TestChannels(t *testing.T) {
require.Equal(t, uint64(100), state.Sent())
})

t.Run("missing cids", func(t *testing.T) {
ds := dss.MutexWrap(datastore.NewMapDatastore())

channelList, err := channels.New(ds, notifier, decoderByType, decoderByType, &fakeEnv{}, peers[0])
require.NoError(t, err)
err = channelList.Start(ctx)
require.NoError(t, err)

_, err = channelList.CreateNew(peers[0], tid1, cids[0], selector, fv1, peers[0], peers[0], peers[1])
require.NoError(t, err)
state := checkEvent(ctx, t, received, datatransfer.Open)
require.Equal(t, datatransfer.Requested, state.Status())

err = channelList.CIDMissing(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0])
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.CIDMissing)
require.Equal(t, []cid.Cid{cids[0]}, state.MissingCids())

err = channelList.CIDMissing(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1])
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.CIDMissing)
require.Equal(t, []cid.Cid{cids[0], cids[1]}, state.MissingCids())

err = channelList.CIDMissing(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0])
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.CIDMissing)
require.Equal(t, []cid.Cid{cids[0], cids[1]}, state.MissingCids())
})

t.Run("pause/resume", func(t *testing.T) {
state, err := channelList.GetByID(ctx, datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1})
require.NoError(t, err)
Expand Down
3 changes: 0 additions & 3 deletions channels/internal/internalchannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ type ChannelState struct {
//
// EXPERIMENTAL; subject to change.
Stages *datatransfer.ChannelStages

// MissingCids are the set of CIDS that were missing and skipped over in the data transfer
MissingCids []cid.Cid
}

// AddLog takes an fmt string with arguments, and adds the formatted string to
Expand Down
55 changes: 1 addition & 54 deletions channels/internal/internalchannel_cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 0 additions & 9 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,6 @@ const (

// Opened is fired when a request for data is sent from this node to a peer
Opened

// CIDMissing is fired when the sender is missing a section of the graph in the response
CIDMissing

// CleanupCompletePartial causes a completing request to transition to a PartiallyCompleted state
// rather than a full Completed state
CleanupCompletePartial
)

// Events are human readable names for data transfer events
Expand Down Expand Up @@ -151,8 +144,6 @@ var Events = map[EventCode]string{
ReceiveDataError: "ReceiveDataError",
TransferRequestQueued: "TransferRequestQueued",
RequestCancelled: "RequestCancelled",
CIDMissing: "CIDMissing",
CleanupCompletePartial: "CleanupCompletePartial",
}

// Event is a struct containing information about a data transfer event
Expand Down
4 changes: 0 additions & 4 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,6 @@ func (m *manager) OnContextAugment(chid datatransfer.ChannelID) func(context.Con
}
}

func (m *manager) OnLinkMissing(chid datatransfer.ChannelID, link ipld.Link) error {
return m.channels.CIDMissing(chid, link.(cidlink.Link).Cid)
}

func (m *manager) receiveRestartRequest(chid datatransfer.ChannelID, incoming datatransfer.Request) (datatransfer.Response, error) {
log.Infof("channel %s: received restart request", chid)

Expand Down
Loading

0 comments on commit 6109651

Please sign in to comment.