From 6109651443233cd35e05420078db84cd3130044b Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Wed, 19 Jan 2022 13:51:21 -0800 Subject: [PATCH] Revert "Accept missing blocks without failing data transfer (#291)" (#294) This reverts commit b442027c85df762bb74538ceb34ff6b79d1cf590. (still contains tracing improvements and module dep updates) Co-authored-by: Jiaying Wang <42981373+jennijuju@users.noreply.github.com> --- channelmonitor/channelmonitor_test.go | 4 - channels/channel_state.go | 8 -- channels/channels.go | 5 - channels/channels_fsm.go | 29 ----- channels/channels_test.go | 30 ----- channels/internal/internalchannel.go | 3 - channels/internal/internalchannel_cbor_gen.go | 55 +------- events.go | 9 -- impl/events.go | 4 - impl/integration_test.go | 118 ------------------ statuses.go | 5 - testutil/test_partial_tree.go | 92 -------------- transport.go | 4 - transport/graphsync/graphsync.go | 11 +- transport/graphsync/graphsync_test.go | 49 +------- types.go | 5 - 16 files changed, 3 insertions(+), 428 deletions(-) delete mode 100644 testutil/test_partial_tree.go diff --git a/channelmonitor/channelmonitor_test.go b/channelmonitor/channelmonitor_test.go index cd07bbea..bb4857f0 100644 --- a/channelmonitor/channelmonitor_test.go +++ b/channelmonitor/channelmonitor_test.go @@ -614,10 +614,6 @@ func (m *mockChannelState) ReceivedCidsTotal() int64 { panic("implement me") } -func (m *mockChannelState) MissingCids() []cid.Cid { - panic("implement me") -} - func (m *mockChannelState) QueuedCidsTotal() int64 { panic("implement me") } diff --git a/channels/channel_state.go b/channels/channel_state.go index 9305d4d4..bf89b85a 100644 --- a/channels/channel_state.go +++ b/channels/channel_state.go @@ -61,9 +61,6 @@ type channelState struct { // stages tracks the timeline of events related to a data transfer, for // traceability purposes. stages *datatransfer.ChannelStages - - // missingCids are the set of CIDS that were missing and skipped over in the data transfer - missingCids []cid.Cid } // EmptyChannelState is the zero value for channel state, meaning not present @@ -195,10 +192,6 @@ func (c channelState) OtherPeer() peer.ID { return c.sender } -func (c channelState) MissingCids() []cid.Cid { - return c.missingCids -} - // Stages returns the current ChannelStages object, or an empty object. // It is unsafe for the caller to modify the return value, and changes may not // be persisted. It should be treated as immutable. @@ -237,7 +230,6 @@ func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByT voucherResultDecoder: voucherResultDecoder, voucherDecoder: voucherDecoder, stages: c.Stages, - missingCids: c.MissingCids, } } diff --git a/channels/channels.go b/channels/channels.go index b459dfd9..dd2ad45d 100644 --- a/channels/channels.go +++ b/channels/channels.go @@ -354,11 +354,6 @@ func (c *Channels) ReceiveDataError(chid datatransfer.ChannelID, err error) erro return c.send(chid, datatransfer.ReceiveDataError, err) } -// CIDMissing indicates the sender is missing a section of the graph in the response -func (c *Channels) CIDMissing(chid datatransfer.ChannelID, cid cid.Cid) error { - return c.send(chid, datatransfer.CIDMissing, cid) -} - // HasChannel returns true if the given channel id is being tracked func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error) { return c.stateMachines.Has(chid) diff --git a/channels/channels_fsm.go b/channels/channels_fsm.go index 712d6dd6..01138435 100644 --- a/channels/channels_fsm.go +++ b/channels/channels_fsm.go @@ -1,7 +1,6 @@ package channels import ( - "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" cbg "github.com/whyrusleeping/cbor-gen" @@ -110,24 +109,6 @@ var ChannelEvents = fsm.Events{ chst.AddLog("") return nil }), - - fsm.Event(datatransfer.CIDMissing).FromMany(transferringStates...).ToJustRecord(). - Action(func(chst *internal.ChannelState, missing cid.Cid) error { - // TODO: find a more efficient way to do this - var found bool - for _, c := range chst.MissingCids { - if c.Equals(missing) { - found = true - break - } - } - if !found { - chst.MissingCids = append(chst.MissingCids, missing) - } - chst.AddLog("") - return nil - }), - fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error { chst.Message = err.Error() chst.AddLog("data transfer disconnected: %s", chst.Message) @@ -250,12 +231,6 @@ var ChannelEvents = fsm.Events{ return nil }), - fsm.Event(datatransfer.CleanupCompletePartial). - From(datatransfer.Completing).To(datatransfer.PartiallyCompleted).Action(func(chst *internal.ChannelState) error { - chst.AddLog("") - return nil - }), - // will kickoff state handlers for channels that were cleaning up fsm.Event(datatransfer.CompleteCleanupOnRestart).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error { chst.AddLog("") @@ -278,9 +253,6 @@ func cleanupConnection(ctx fsm.Context, env ChannelEnvironment, channel internal } env.CleanupChannel(datatransfer.ChannelID{ID: channel.TransferID, Initiator: channel.Initiator, Responder: channel.Responder}) env.Unprotect(otherParty, datatransfer.ChannelID{ID: channel.TransferID, Initiator: channel.Initiator, Responder: channel.Responder}.String()) - if channel.Status == datatransfer.Completing && len(channel.MissingCids) > 0 { - return ctx.Trigger(datatransfer.CleanupCompletePartial) - } return ctx.Trigger(datatransfer.CleanupComplete) } @@ -296,7 +268,6 @@ var ChannelFinalityStates = []fsm.StateKey{ datatransfer.Cancelled, datatransfer.Completed, datatransfer.Failed, - datatransfer.PartiallyCompleted, } // IsChannelTerminated returns true if the channel is in a finality state diff --git a/channels/channels_test.go b/channels/channels_test.go index 1121a3d3..520c06a6 100644 --- a/channels/channels_test.go +++ b/channels/channels_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" dss "github.com/ipfs/go-datastore/sync" basicnode "github.com/ipld/go-ipld-prime/node/basic" @@ -222,35 +221,6 @@ func TestChannels(t *testing.T) { require.Equal(t, uint64(100), state.Sent()) }) - t.Run("missing cids", func(t *testing.T) { - ds := dss.MutexWrap(datastore.NewMapDatastore()) - - channelList, err := channels.New(ds, notifier, decoderByType, decoderByType, &fakeEnv{}, peers[0]) - require.NoError(t, err) - err = channelList.Start(ctx) - require.NoError(t, err) - - _, err = channelList.CreateNew(peers[0], tid1, cids[0], selector, fv1, peers[0], peers[0], peers[1]) - require.NoError(t, err) - state := checkEvent(ctx, t, received, datatransfer.Open) - require.Equal(t, datatransfer.Requested, state.Status()) - - err = channelList.CIDMissing(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0]) - require.NoError(t, err) - state = checkEvent(ctx, t, received, datatransfer.CIDMissing) - require.Equal(t, []cid.Cid{cids[0]}, state.MissingCids()) - - err = channelList.CIDMissing(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1]) - require.NoError(t, err) - state = checkEvent(ctx, t, received, datatransfer.CIDMissing) - require.Equal(t, []cid.Cid{cids[0], cids[1]}, state.MissingCids()) - - err = channelList.CIDMissing(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0]) - require.NoError(t, err) - state = checkEvent(ctx, t, received, datatransfer.CIDMissing) - require.Equal(t, []cid.Cid{cids[0], cids[1]}, state.MissingCids()) - }) - t.Run("pause/resume", func(t *testing.T) { state, err := channelList.GetByID(ctx, datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}) require.NoError(t, err) diff --git a/channels/internal/internalchannel.go b/channels/internal/internalchannel.go index f233d673..f6cf916b 100644 --- a/channels/internal/internalchannel.go +++ b/channels/internal/internalchannel.go @@ -73,9 +73,6 @@ type ChannelState struct { // // EXPERIMENTAL; subject to change. Stages *datatransfer.ChannelStages - - // MissingCids are the set of CIDS that were missing and skipped over in the data transfer - MissingCids []cid.Cid } // AddLog takes an fmt string with arguments, and adds the formatted string to diff --git a/channels/internal/internalchannel_cbor_gen.go b/channels/internal/internalchannel_cbor_gen.go index 84536672..5a6cfb19 100644 --- a/channels/internal/internalchannel_cbor_gen.go +++ b/channels/internal/internalchannel_cbor_gen.go @@ -23,7 +23,7 @@ func (t *ChannelState) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{181}); err != nil { + if _, err := w.Write([]byte{180}); err != nil { return err } @@ -426,31 +426,6 @@ func (t *ChannelState) MarshalCBOR(w io.Writer) error { if err := t.Stages.MarshalCBOR(w); err != nil { return err } - - // t.MissingCids ([]cid.Cid) (slice) - if len("MissingCids") > cbg.MaxLength { - return xerrors.Errorf("Value in field \"MissingCids\" was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("MissingCids"))); err != nil { - return err - } - if _, err := io.WriteString(w, string("MissingCids")); err != nil { - return err - } - - if len(t.MissingCids) > cbg.MaxLength { - return xerrors.Errorf("Slice value in field t.MissingCids was too long") - } - - if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajArray, uint64(len(t.MissingCids))); err != nil { - return err - } - for _, v := range t.MissingCids { - if err := cbg.WriteCidBuf(scratch, w, v); err != nil { - return xerrors.Errorf("failed writing cid field t.MissingCids: %w", err) - } - } return nil } @@ -825,34 +800,6 @@ func (t *ChannelState) UnmarshalCBOR(r io.Reader) error { } } - // t.MissingCids ([]cid.Cid) (slice) - case "MissingCids": - - maj, extra, err = cbg.CborReadHeaderBuf(br, scratch) - if err != nil { - return err - } - - if extra > cbg.MaxLength { - return fmt.Errorf("t.MissingCids: array too large (%d)", extra) - } - - if maj != cbg.MajArray { - return fmt.Errorf("expected cbor array") - } - - if extra > 0 { - t.MissingCids = make([]cid.Cid, extra) - } - - for i := 0; i < int(extra); i++ { - - c, err := cbg.ReadCid(br) - if err != nil { - return xerrors.Errorf("reading cid field t.MissingCids failed: %w", err) - } - t.MissingCids[i] = c - } default: // Field doesn't exist on this type, so ignore it diff --git a/events.go b/events.go index 8ae3c5be..664579c4 100644 --- a/events.go +++ b/events.go @@ -110,13 +110,6 @@ const ( // Opened is fired when a request for data is sent from this node to a peer Opened - - // CIDMissing is fired when the sender is missing a section of the graph in the response - CIDMissing - - // CleanupCompletePartial causes a completing request to transition to a PartiallyCompleted state - // rather than a full Completed state - CleanupCompletePartial ) // Events are human readable names for data transfer events @@ -151,8 +144,6 @@ var Events = map[EventCode]string{ ReceiveDataError: "ReceiveDataError", TransferRequestQueued: "TransferRequestQueued", RequestCancelled: "RequestCancelled", - CIDMissing: "CIDMissing", - CleanupCompletePartial: "CleanupCompletePartial", } // Event is a struct containing information about a data transfer event diff --git a/impl/events.go b/impl/events.go index 442b8e77..2c6860fb 100644 --- a/impl/events.go +++ b/impl/events.go @@ -337,10 +337,6 @@ func (m *manager) OnContextAugment(chid datatransfer.ChannelID) func(context.Con } } -func (m *manager) OnLinkMissing(chid datatransfer.ChannelID, link ipld.Link) error { - return m.channels.CIDMissing(chid, link.(cidlink.Link).Cid) -} - func (m *manager) receiveRestartRequest(chid datatransfer.ChannelID, incoming datatransfer.Request) (datatransfer.Response, error) { log.Infof("channel %s: received restart request", chid) diff --git a/impl/integration_test.go b/impl/integration_test.go index f470155f..158827f8 100644 --- a/impl/integration_test.go +++ b/impl/integration_test.go @@ -274,124 +274,6 @@ func TestRoundTrip(t *testing.T) { } // } -func TestRoundTripMissingBlocks(t *testing.T) { - ctx := context.Background() - testCases := map[string]struct { - isPull bool - }{ - "roundtrip for push requests": {}, - "roundtrip for pull requests": { - isPull: true, - }, - } - for testCase, data := range testCases { - t.Run(testCase, func(t *testing.T) { - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - - gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil) - host1 := gsData.Host1 // initiator, data sender - host2 := gsData.Host2 // data recipient - - tp1 := gsData.SetupGSTransportHost1() - tp2 := gsData.SetupGSTransportHost2() - - dt1, err := NewDataTransfer(gsData.DtDs1, gsData.DtNet1, tp1) - require.NoError(t, err) - testutil.StartAndWaitForReady(ctx, t, dt1) - dt2, err := NewDataTransfer(gsData.DtDs2, gsData.DtNet2, tp2) - require.NoError(t, err) - testutil.StartAndWaitForReady(ctx, t, dt2) - - finished := make(chan struct{}, 2) - errChan := make(chan struct{}, 2) - opened := make(chan struct{}, 2) - sent := make(chan uint64, 21) - received := make(chan uint64, 21) - var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) { - if event.Code == datatransfer.DataQueued { - if channelState.Queued() > 0 { - sent <- channelState.Queued() - } - } - - if event.Code == datatransfer.DataReceived { - if channelState.Received() > 0 { - received <- channelState.Received() - } - } - - if channelState.Status() == datatransfer.Completed || channelState.Status() == datatransfer.PartiallyCompleted { - finished <- struct{}{} - } - if event.Code == datatransfer.Error { - fmt.Println(channelState.Message()) - errChan <- struct{}{} - } - if event.Code == datatransfer.Open { - opened <- struct{}{} - } - } - dt1.SubscribeToEvents(subscriber) - dt2.SubscribeToEvents(subscriber) - voucher := testutil.FakeDTType{Data: "applesauce"} - sv := testutil.NewStubbedValidator() - - partialTree := testutil.NewPartialTree(t, gsData.Bs1) - var chid datatransfer.ChannelID - if data.isPull { - sv.ExpectSuccessPull() - require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv)) - chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), &voucher, partialTree.PresentRootLink.(cidlink.Link).Cid, gsData.AllSelector) - } else { - sv.ExpectSuccessPush() - require.NoError(t, dt2.RegisterVoucherType(&testutil.FakeDTType{}, sv)) - chid, err = dt1.OpenPushDataChannel(ctx, host2.ID(), &voucher, partialTree.PresentRootLink.(cidlink.Link).Cid, gsData.AllSelector) - } - require.NoError(t, err) - opens := 0 - completes := 0 - sentIncrements := make([]uint64, 0, 21) - receivedIncrements := make([]uint64, 0, 21) - for opens < 2 || completes < 2 { - select { - case <-ctx.Done(): - t.Fatal("Did not complete successful data transfer") - case <-finished: - completes++ - case <-opened: - opens++ - case sentIncrement := <-sent: - sentIncrements = append(sentIncrements, sentIncrement) - case receivedIncrement := <-received: - receivedIncrements = append(receivedIncrements, receivedIncrement) - case <-errChan: - t.Fatal("received error on data transfer") - } - } - require.Equal(t, sentIncrements, receivedIncrements) - if data.isPull { - assert.Equal(t, chid.Initiator, host2.ID()) - } else { - assert.Equal(t, chid.Initiator, host1.ID()) - } - cs, err := dt2.ChannelState(ctx, chid) - require.NoError(t, err) - require.Equal(t, cs.Status(), datatransfer.PartiallyCompleted) - missingCids := cs.MissingCids() - require.Len(t, missingCids, 2) - require.Contains(t, missingCids, partialTree.MissingLeafLink.(cidlink.Link).Cid) - require.Contains(t, missingCids, partialTree.MissingMiddleLink.(cidlink.Link).Cid) - require.NotContains(t, missingCids, partialTree.PresentMiddleLink.(cidlink.Link).Cid) - require.NotContains(t, missingCids, partialTree.PresentRootLink.(cidlink.Link).Cid) - - // The missing leaf is not included cause it's hidden completely underneath another missing link - require.NotContains(t, missingCids, partialTree.HiddenMissingLeafLink.(cidlink.Link).Cid) - }) - - } // -} - func TestMultipleRoundTripMultipleStores(t *testing.T) { ctx := context.Background() testCases := map[string]struct { diff --git a/statuses.go b/statuses.go index 3fa405e1..6a4c89be 100644 --- a/statuses.go +++ b/statuses.go @@ -58,10 +58,6 @@ const ( // ChannelNotFoundError means the searched for data transfer does not exist ChannelNotFoundError - - // PartiallyCompleted means the data transfer completed without significant error, but the remote - // peer only sent a portion of the requested DAG rather than the whole thing - PartiallyCompleted ) // Statuses are human readable names for data transfer states @@ -84,5 +80,4 @@ var Statuses = map[Status]string{ ResponderFinalizing: "ResponderFinalizing", ResponderFinalizingTransferFinished: "ResponderFinalizingTransferFinished", ChannelNotFoundError: "ChannelNotFoundError", - PartiallyCompleted: "PartiallyCompleted", } diff --git a/testutil/test_partial_tree.go b/testutil/test_partial_tree.go deleted file mode 100644 index 407b4609..00000000 --- a/testutil/test_partial_tree.go +++ /dev/null @@ -1,92 +0,0 @@ -package testutil - -import ( - "context" - "testing" - - "github.com/ipfs/go-cid" - "github.com/ipfs/go-graphsync/storeutil" - blockstore "github.com/ipfs/go-ipfs-blockstore" - "github.com/ipld/go-ipld-prime" - "github.com/stretchr/testify/require" - - // to register multicodec - _ "github.com/ipld/go-ipld-prime/codec/dagjson" - "github.com/ipld/go-ipld-prime/fluent" - "github.com/ipld/go-ipld-prime/linking" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" - "github.com/ipld/go-ipld-prime/node/basicnode" -) - -// TestPartialTree is a set of IPLD Data that forms a tree spread across some blocks -// with a serialized in memory representation -type TestPartialTree struct { - MissingLeaf ipld.Node - MissingLeafLink ipld.Link - HiddenMissingLeaf ipld.Node - HiddenMissingLeafLink ipld.Link - PresentMiddle ipld.Node - PresentMiddleLink ipld.Link - MissingMiddle ipld.Node - MissingMiddleLink ipld.Link - PresentRoot ipld.Node - PresentRootLink ipld.Link -} - -// NewPartialTree returns a fake tree of nodes, spread over five blocks, then -// removes some of the blocks from the store -func NewPartialTree(t *testing.T, bs blockstore.Blockstore) TestPartialTree { - lsys := storeutil.LinkSystemForBlockstore(bs) - encode := func(n ipld.Node) (ipld.Node, ipld.Link) { - lnk, err := lsys.Store(linking.LinkContext{}, cidlink.LinkPrototype{Prefix: cid.Prefix{ - Version: 1, - Codec: 0x0129, - MhType: 0x13, - MhLength: 4, - }}, n) - require.NoError(t, err) - return n, lnk - } - - var ( - missingLeaf, missingLeafLink = encode(basicnode.NewString("alpha")) - hiddenMissingLeaf, hiddenMissingLeafLink = encode(basicnode.NewString("beta")) - presentMiddle, presentMiddleLink = encode(fluent.MustBuildMap(basicnode.Prototype.Map, 3, func(na fluent.MapAssembler) { - na.AssembleEntry("foo").AssignBool(true) - na.AssembleEntry("bar").AssignBool(false) - na.AssembleEntry("nested").CreateMap(2, func(na fluent.MapAssembler) { - na.AssembleEntry("alink").AssignLink(missingLeafLink) - na.AssembleEntry("nonlink").AssignString("zoo") - }) - })) - missingMiddle, missingMiddleLink = encode(fluent.MustBuildList(basicnode.Prototype.List, 4, func(na fluent.ListAssembler) { - na.AssembleValue().AssignLink(missingLeafLink) - na.AssembleValue().AssignLink(missingLeafLink) - na.AssembleValue().AssignLink(hiddenMissingLeafLink) - na.AssembleValue().AssignLink(missingLeafLink) - })) - rootNode, rootNodeLnk = encode(fluent.MustBuildMap(basicnode.Prototype.Map, 4, func(na fluent.MapAssembler) { - na.AssembleEntry("plain").AssignString("olde string") - na.AssembleEntry("linkedString").AssignLink(missingLeafLink) - na.AssembleEntry("linkedMap").AssignLink(presentMiddleLink) - na.AssembleEntry("linkedList").AssignLink(missingMiddleLink) - })) - ) - - require.NoError(t, bs.DeleteBlock(context.TODO(), missingLeafLink.(cidlink.Link).Cid)) - require.NoError(t, bs.DeleteBlock(context.TODO(), hiddenMissingLeafLink.(cidlink.Link).Cid)) - require.NoError(t, bs.DeleteBlock(context.TODO(), missingMiddleLink.(cidlink.Link).Cid)) - - return TestPartialTree{ - MissingLeaf: missingLeaf, - MissingLeafLink: missingLeafLink, - HiddenMissingLeaf: hiddenMissingLeaf, - HiddenMissingLeafLink: hiddenMissingLeafLink, - PresentMiddle: presentMiddle, - PresentMiddleLink: presentMiddleLink, - MissingMiddle: missingMiddle, - MissingMiddleLink: missingMiddleLink, - PresentRoot: rootNode, - PresentRootLink: rootNodeLnk, - } -} diff --git a/transport.go b/transport.go index 8719a9f9..6d0b99fa 100644 --- a/transport.go +++ b/transport.go @@ -73,10 +73,6 @@ type EventsHandler interface { // OnContextAugment allows the transport to attach data transfer tracing information // to its local context, in order to create a hierarchical trace OnContextAugment(chid ChannelID) func(context.Context) context.Context - - // OnLinkMissing tells data transfer the sending party was missing the given CID so it was - // not traversed on the receiving side - OnLinkMissing(chid ChannelID, link ipld.Link) error } /* diff --git a/transport/graphsync/graphsync.go b/transport/graphsync/graphsync.go index 3a17d1d5..f95a7cb4 100644 --- a/transport/graphsync/graphsync.go +++ b/transport/graphsync/graphsync.go @@ -183,15 +183,6 @@ func (t *Transport) consumeResponses(req *gsReq) error { log.Infof("channel %s: finished consuming graphsync response channel", req.channelID) for err := range req.errChan { - // graphsync returns a stream of errors, where some can simply report the remote - // peer was missing a block - // here, we check for that type of error and record this missing block, rather than - // fail the transfer - var remoteMissingBlockErr graphsync.RemoteMissingBlockErr - if errors.As(err, &remoteMissingBlockErr) { - t.events.OnLinkMissing(req.channelID, remoteMissingBlockErr.Link) - continue - } lastError = err } log.Infof("channel %s: finished consuming graphsync error channel", req.channelID) @@ -706,7 +697,7 @@ func (t *Transport) gsCompletedResponseListener(p peer.ID, request graphsync.Req } var completeErr error - if status != graphsync.RequestCompletedFull && status != graphsync.RequestCompletedPartial { + if status != graphsync.RequestCompletedFull { statusStr := gsResponseStatusCodeString(status) completeErr = xerrors.Errorf("graphsync response to peer %s did not complete: response status code %s", p, statusStr) } diff --git a/transport/graphsync/graphsync_test.go b/transport/graphsync/graphsync_test.go index a907875b..c9b0edc7 100644 --- a/transport/graphsync/graphsync_test.go +++ b/transport/graphsync/graphsync_test.go @@ -502,7 +502,7 @@ func TestManager(t *testing.T) { "recognized incoming request will record unsuccessful request completion": { responseConfig: gsResponseConfig{ - status: graphsync.RequestFailedUnknown, + status: graphsync.RequestCompletedPartial, }, action: func(gsData *harness) { gsData.incomingRequestHook() @@ -709,40 +709,6 @@ func TestManager(t *testing.T) { require.True(t, events.OnReceiveDataErrorCalled) }, }, - "open channel sends missing Cids": { - action: func(gsData *harness) { - stor, _ := gsData.outgoing.Selector() - gsData.fgs.LeaveRequestsOpen() - go gsData.outgoingRequestHook() - _ = gsData.transport.OpenChannel( - gsData.ctx, - gsData.other, - datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.other, Initiator: gsData.self}, - cidlink.Link{Cid: gsData.outgoing.BaseCid()}, - stor, - nil, - gsData.outgoing) - }, - check: func(t *testing.T, events *fakeEvents, gsData *harness) { - requestReceived := gsData.fgs.AssertRequestReceived(gsData.ctx, t) - close(requestReceived.ResponseChan) - cid := testutil.GenerateCids(1)[0] - requestReceived.ResponseErrChan <- graphsync.RemoteMissingBlockErr{ - Link: cidlink.Link{Cid: cid}, - } - close(requestReceived.ResponseErrChan) - - require.Eventually(t, func() bool { - return events.OnLinkMissingCalled == true - }, 2*time.Second, 100*time.Millisecond) - require.Equal(t, events.OnLinkMissingLink, cidlink.Link{Cid: cid}) - - require.Eventually(t, func() bool { - return events.OnChannelCompletedCalled == true - }, 2*time.Second, 100*time.Millisecond) - require.True(t, events.ChannelCompletedSuccess) - }, - }, "open channel adds block count to the DoNotSendFirstBlocks extension for v1.2 protocol": { action: func(gsData *harness) { cids := testutil.GenerateCids(2) @@ -1148,9 +1114,6 @@ type fakeEvents struct { OnReceiveDataErrorCalled bool OnReceiveDataErrorChannelID datatransfer.ChannelID OnContextAugmentFunc func(context.Context) context.Context - OnLinkMissingCalled bool - OnLinkMissingLink ipld.Link - OnLinkMissingError error TransferQueuedCalled bool TransferQueuedChannelID datatransfer.ChannelID @@ -1241,12 +1204,6 @@ func (fe *fakeEvents) OnContextAugment(chid datatransfer.ChannelID) func(context return fe.OnContextAugmentFunc } -func (fe *fakeEvents) OnLinkMissing(chid datatransfer.ChannelID, link ipld.Link) error { - fe.OnLinkMissingCalled = true - fe.OnLinkMissingLink = link - return fe.OnLinkMissingError -} - type harness struct { outgoing datatransfer.Request incoming datatransfer.Response @@ -1510,7 +1467,3 @@ func (m *mockChannelState) LastVoucherResult() datatransfer.VoucherResult { func (m *mockChannelState) Stages() *datatransfer.ChannelStages { panic("implement me") } - -func (m *mockChannelState) MissingCids() []cid.Cid { - panic("implement me") -} diff --git a/types.go b/types.go index c3fab186..cd970e0d 100644 --- a/types.go +++ b/types.go @@ -144,11 +144,6 @@ type ChannelState interface { // Queued returns the number of bytes read from the node and queued for sending Queued() uint64 - // MissingCids returns a set of CIDS that were missing and skipped over in the data transfer - // Note: because we were unable to traverse these CIDs, there may be additional CIDs - // in the DAGs these CIDs were at the root of we also missed but are not aware of - MissingCids() []cid.Cid - // Stages returns the timeline of events this data transfer has gone through, // for observability purposes. //