Skip to content

Commit

Permalink
Merge branch 'master' into test/restart-block-resend
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Jan 29, 2021
2 parents 7607b16 + 745720a commit 81ab709
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 71 deletions.
9 changes: 6 additions & 3 deletions channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,15 @@ func TestChannels(t *testing.T) {

err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0], 50)
require.NoError(t, err)
_ = checkEvent(ctx, t, received, datatransfer.DataReceivedProgress)
state = checkEvent(ctx, t, received, datatransfer.DataReceived)
require.Equal(t, uint64(50), state.Received())
require.Equal(t, uint64(0), state.Sent())
require.Equal(t, []cid.Cid{cids[0]}, state.ReceivedCids())

err = channelList.DataSent(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1], 100)
require.NoError(t, err)
_ = checkEvent(ctx, t, received, datatransfer.DataSentProgress)
state = checkEvent(ctx, t, received, datatransfer.DataSent)
require.Equal(t, uint64(50), state.Received())
require.Equal(t, uint64(100), state.Sent())
Expand All @@ -167,6 +169,7 @@ func TestChannels(t *testing.T) {

err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1], 50)
require.NoError(t, err)
_ = checkEvent(ctx, t, received, datatransfer.DataReceivedProgress)
state = checkEvent(ctx, t, received, datatransfer.DataReceived)
require.Equal(t, uint64(100), state.Received())
require.Equal(t, uint64(100), state.Sent())
Expand All @@ -176,14 +179,14 @@ func TestChannels(t *testing.T) {
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.DataSent)
require.Equal(t, uint64(100), state.Received())
require.Equal(t, uint64(125), state.Sent())
require.Equal(t, uint64(100), state.Sent())
require.Equal(t, []cid.Cid{cids[0], cids[1]}, state.ReceivedCids())

err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0], 50)
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.DataReceived)
require.Equal(t, uint64(150), state.Received())
require.Equal(t, uint64(125), state.Sent())
require.Equal(t, uint64(100), state.Received())
require.Equal(t, uint64(100), state.Sent())
require.Equal(t, []cid.Cid{cids[0], cids[1], cids[0]}, state.ReceivedCids())
})

Expand Down
3 changes: 1 addition & 2 deletions cidsets/cidsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ package cidsets
import (
"sync"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"

"github.com/ipfs/go-cid"
)

// SetID is a unique ID for a CID set
Expand Down
4 changes: 2 additions & 2 deletions cidsets/cidsets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package cidsets
import (
"testing"

ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-data-transfer/testutil"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
)

func TestCIDSetManager(t *testing.T) {
Expand Down
13 changes: 12 additions & 1 deletion events.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,22 @@ const (
// that channels that were cleaning up should finish cleanup
CompleteCleanupOnRestart

// DataQueued is emmited is read and queued for sending to the remote peer
// DataQueued is emitted when data is read and queued for sending to the remote peer
DataQueued

// DataQueuedProgress is emitted the first time a block is queued for
// sending to the remote peer. It is used to measure progress of how much
// of the total data has been queued.
DataQueuedProgress

// DataSentProgress is emitted the first time a block is sent to the remote
// peer. It is used to measure progress of how much of the total data has
// been sent.
DataSentProgress

// DataReceivedProgress is emitted the first time a block is received from
// the remote peer. It is used to measure progress of how much of the total
// data has been received.
DataReceivedProgress
)

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ds-badger v0.2.3
github.com/ipfs/go-graphsync v0.5.2
github.com/ipfs/go-graphsync v0.6.0
github.com/ipfs/go-ipfs-blockstore v1.0.1
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-graphsync v0.5.2 h1:USD+daaSC+7pLHCxROThSaF6SF7WYXF03sjrta0rCfA=
github.com/ipfs/go-graphsync v0.5.2/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
github.com/ipfs/go-graphsync v0.6.0 h1:x6UvDUGA7wjaKNqx5Vbo7FGT8aJ5ryYA0dMQ5jN3dF0=
github.com/ipfs/go-graphsync v0.6.0/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
github.com/ipfs/go-ipfs-blockstore v0.1.4 h1:2SGI6U1B44aODevza8Rde3+dY30Pb+lbcObe1LETxOQ=
Expand Down
6 changes: 3 additions & 3 deletions impl/initiating_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestDataTransferInitiating(t *testing.T) {
},
},
"Disconnected request resumes": {
expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.Disconnected, datatransfer.DataReceived},
expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.Disconnected, datatransfer.DataReceivedProgress, datatransfer.DataReceived},
options: []DataTransferOption{ChannelRemoveTimeout(10 * time.Millisecond)},
verify: func(t *testing.T, h *harness) {
channelID, err := h.dt.OpenPullDataChannel(h.ctx, h.peers[1], h.voucher, h.baseCid, h.stor)
Expand All @@ -137,7 +137,7 @@ func TestDataTransferInitiating(t *testing.T) {
},
},
"Disconnected request resumes, push": {
expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.Accept, datatransfer.ResumeResponder, datatransfer.Disconnected, datatransfer.DataSent},
expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.Accept, datatransfer.ResumeResponder, datatransfer.Disconnected, datatransfer.DataSentProgress, datatransfer.DataSent},
options: []DataTransferOption{ChannelRemoveTimeout(10 * time.Millisecond)},
verify: func(t *testing.T, h *harness) {
channelID, err := h.dt.OpenPullDataChannel(h.ctx, h.peers[1], h.voucher, h.baseCid, h.stor)
Expand Down Expand Up @@ -429,7 +429,7 @@ func TestDataTransferRestartInitiating(t *testing.T) {
verify func(t *testing.T, h *harness)
}{
"RestartDataTransferChannel: Manager Peer Create Pull Restart works": {
expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.DataReceived, datatransfer.DataReceived},
expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.DataReceivedProgress, datatransfer.DataReceived, datatransfer.DataReceivedProgress, datatransfer.DataReceived},
verify: func(t *testing.T, h *harness) {
// open a pull channel
channelID, err := h.dt.OpenPullDataChannel(h.ctx, h.peers[1], h.voucher, h.baseCid, h.stor)
Expand Down
38 changes: 24 additions & 14 deletions impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ func TestPauseAndResume(t *testing.T) {
}
for testCase, isPull := range testCases {
t.Run(testCase, func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
Expand All @@ -893,8 +893,8 @@ func TestPauseAndResume(t *testing.T) {
finished := make(chan struct{}, 2)
errChan := make(chan struct{}, 2)
opened := make(chan struct{}, 2)
sent := make(chan uint64, 21)
received := make(chan uint64, 21)
sent := make(chan uint64, 100)
received := make(chan uint64, 100)
pauseInitiator := make(chan struct{}, 2)
resumeInitiator := make(chan struct{}, 2)
pauseResponder := make(chan struct{}, 2)
Expand Down Expand Up @@ -1186,7 +1186,7 @@ func (fgsr *fakeGraphSyncReceiver) ReceiveMessage(ctx context.Context, sender pe
}
}

func (fgsr *fakeGraphSyncReceiver) ReceiveError(_ error) {
func (fgsr *fakeGraphSyncReceiver) ReceiveError(_ peer.ID, _ error) {
}
func (fgsr *fakeGraphSyncReceiver) Connected(p peer.ID) {
}
Expand Down Expand Up @@ -1262,8 +1262,10 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) {
Name: extension.ExtensionDataTransfer1_1,
Data: extData,
})
gsmessage := gsmsg.New()
gsmessage.AddRequest(request)
builder := gsmsg.NewBuilder(0)
builder.AddRequest(request)
gsmessage, err := builder.Build()
require.NoError(t, err)
require.NoError(t, gsData.GsNet2.SendMessage(ctx, host1.ID(), gsmessage))

status := gsr.consumeResponses(ctx, t)
Expand All @@ -1282,8 +1284,10 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) {
Name: extension.ExtensionDataTransfer1_1,
Data: extData,
})
gsmessage := gsmsg.New()
gsmessage.AddRequest(request)
builder := gsmsg.NewBuilder(0)
builder.AddRequest(request)
gsmessage, err := builder.Build()
require.NoError(t, err)
require.NoError(t, gsData.GsNet2.SendMessage(ctx, host1.ID(), gsmessage))

status := gsr.consumeResponses(ctx, t)
Expand Down Expand Up @@ -1337,8 +1341,10 @@ func TestResponseHookWhenExtensionNotFound(t *testing.T) {
}

request := gsmsg.NewRequest(graphsync.RequestID(rand.Int31()), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()))
gsmessage := gsmsg.New()
gsmessage.AddRequest(request)
builder := gsmsg.NewBuilder(0)
builder.AddRequest(request)
gsmessage, err := builder.Build()
require.NoError(t, err)
require.NoError(t, gsData.GsNet2.SendMessage(ctx, host1.ID(), gsmessage))

status := gsr.consumeResponses(ctx, t)
Expand Down Expand Up @@ -1376,8 +1382,10 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) {
})

// initiator requests data over graphsync network
gsmessage := gsmsg.New()
gsmessage.AddRequest(gsRequest)
builder := gsmsg.NewBuilder(0)
builder.AddRequest(gsRequest)
gsmessage, err := builder.Build()
require.NoError(t, err)
require.NoError(t, gsData.GsNet1.SendMessage(ctx, gsData.Host2.ID(), gsmessage))
status := gsr.consumeResponses(ctx, t)
require.False(t, gsmsg.IsTerminalFailureCode(status))
Expand All @@ -1403,8 +1411,10 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) {
Name: extension.ExtensionDataTransfer1_1,
Data: extData,
})
gsmessage := gsmsg.New()
gsmessage.AddRequest(request)
builder := gsmsg.NewBuilder(0)
builder.AddRequest(request)
gsmessage, err := builder.Build()
require.NoError(t, err)

// non-initiator requests data over graphsync network, but should not get it
// because there was no previous request
Expand Down
9 changes: 7 additions & 2 deletions impl/responding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ func TestDataTransferResponding(t *testing.T) {
datatransfer.Open,
datatransfer.NewVoucherResult,
datatransfer.Accept,
datatransfer.DataReceivedProgress,
datatransfer.DataReceived,
datatransfer.NewVoucherResult,
datatransfer.PauseResponder,
Expand Down Expand Up @@ -315,6 +316,7 @@ func TestDataTransferResponding(t *testing.T) {
datatransfer.Open,
datatransfer.NewVoucherResult,
datatransfer.Accept,
datatransfer.DataReceivedProgress,
datatransfer.DataReceived,
datatransfer.NewVoucherResult,
},
Expand Down Expand Up @@ -350,6 +352,7 @@ func TestDataTransferResponding(t *testing.T) {
datatransfer.Open,
datatransfer.NewVoucherResult,
datatransfer.Accept,
datatransfer.DataReceivedProgress,
datatransfer.DataReceived,
datatransfer.NewVoucherResult,
datatransfer.PauseResponder,
Expand Down Expand Up @@ -399,6 +402,7 @@ func TestDataTransferResponding(t *testing.T) {
datatransfer.Open,
datatransfer.NewVoucherResult,
datatransfer.Accept,
datatransfer.DataQueuedProgress,
datatransfer.DataQueued,
datatransfer.NewVoucherResult,
datatransfer.PauseResponder,
Expand Down Expand Up @@ -631,7 +635,8 @@ func TestDataTransferRestartResponding(t *testing.T) {
},
"receiving a push restart request validates and opens a channel for pull": {
expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.NewVoucherResult, datatransfer.Accept,
datatransfer.DataReceived, datatransfer.DataReceived, datatransfer.NewVoucherResult, datatransfer.Restart},
datatransfer.DataReceivedProgress, datatransfer.DataReceived, datatransfer.DataReceivedProgress, datatransfer.DataReceived,
datatransfer.NewVoucherResult, datatransfer.Restart},
configureValidator: func(sv *testutil.StubbedValidator) {
sv.ExpectSuccessPush()
sv.StubResult(testutil.NewFakeDTType())
Expand Down Expand Up @@ -846,7 +851,7 @@ func TestDataTransferRestartResponding(t *testing.T) {
},
},
"ReceiveRestartExistingChannelRequest: Reopen Pull Channel": {
expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.DataReceived, datatransfer.DataReceived},
expectedEvents: []datatransfer.EventCode{datatransfer.Open, datatransfer.DataReceivedProgress, datatransfer.DataReceived, datatransfer.DataReceivedProgress, datatransfer.DataReceived},
configureValidator: func(sv *testutil.StubbedValidator) {
},
verify: func(t *testing.T, h *receiverHarness) {
Expand Down
47 changes: 28 additions & 19 deletions testutil/fakegraphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,25 +94,26 @@ type PersistenceOption struct {

// FakeGraphSync implements a GraphExchange but does nothing
type FakeGraphSync struct {
requests chan ReceivedGraphSyncRequest // records calls to fakeGraphSync.Request
pauseRequests chan PauseRequest
resumeRequests chan ResumeRequest
pauseResponses chan PauseResponse
resumeResponses chan ResumeResponse
cancelResponses chan CancelResponse
persistenceOptionsLk sync.RWMutex
persistenceOptions map[string]PersistenceOption
leaveRequestsOpen bool
OutgoingRequestHook graphsync.OnOutgoingRequestHook
IncomingBlockHook graphsync.OnIncomingBlockHook
OutgoingBlockHook graphsync.OnOutgoingBlockHook
IncomingRequestHook graphsync.OnIncomingRequestHook
CompletedResponseListener graphsync.OnResponseCompletedListener
RequestUpdatedHook graphsync.OnRequestUpdatedHook
IncomingResponseHook graphsync.OnIncomingResponseHook
RequestorCancelledListener graphsync.OnRequestorCancelledListener
BlockSentListener graphsync.OnBlockSentListener
NetworkErrorListener graphsync.OnNetworkErrorListener
requests chan ReceivedGraphSyncRequest // records calls to fakeGraphSync.Request
pauseRequests chan PauseRequest
resumeRequests chan ResumeRequest
pauseResponses chan PauseResponse
resumeResponses chan ResumeResponse
cancelResponses chan CancelResponse
persistenceOptionsLk sync.RWMutex
persistenceOptions map[string]PersistenceOption
leaveRequestsOpen bool
OutgoingRequestHook graphsync.OnOutgoingRequestHook
IncomingBlockHook graphsync.OnIncomingBlockHook
OutgoingBlockHook graphsync.OnOutgoingBlockHook
IncomingRequestHook graphsync.OnIncomingRequestHook
CompletedResponseListener graphsync.OnResponseCompletedListener
RequestUpdatedHook graphsync.OnRequestUpdatedHook
IncomingResponseHook graphsync.OnIncomingResponseHook
RequestorCancelledListener graphsync.OnRequestorCancelledListener
BlockSentListener graphsync.OnBlockSentListener
NetworkErrorListener graphsync.OnNetworkErrorListener
ReceiverNetworkErrorListener graphsync.OnReceiverNetworkErrorListener
}

// NewFakeGraphSync returns a new fake graphsync implementation
Expand Down Expand Up @@ -387,6 +388,14 @@ func (fgs *FakeGraphSync) RegisterNetworkErrorListener(listener graphsync.OnNetw
}
}

// RegisterNetworkErrorListener adds a listener on the responder as blocks go out
func (fgs *FakeGraphSync) RegisterReceiverNetworkErrorListener(listener graphsync.OnReceiverNetworkErrorListener) graphsync.UnregisterHookFunc {
fgs.ReceiverNetworkErrorListener = listener
return func() {
fgs.ReceiverNetworkErrorListener = nil
}
}

var _ graphsync.GraphExchange = &FakeGraphSync{}

type fakeBlkData struct {
Expand Down
Loading

0 comments on commit 81ab709

Please sign in to comment.