Skip to content

Commit

Permalink
backward compatibility of restart (#96)
Browse files Browse the repository at this point in the history
* backward compatibility of restart

* changes and tests

* more tests

* better error handling for restarts
  • Loading branch information
aarshkshah1992 authored and hannahhoward committed Oct 11, 2020
1 parent 778b3f8 commit b80beae
Show file tree
Hide file tree
Showing 27 changed files with 1,571 additions and 424 deletions.
2 changes: 2 additions & 0 deletions impl/initiating_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ func TestDataTransferInitiating(t *testing.T) {
},
}
for testCase, verify := range testCases {

// test for new protocol -> new protocol
t.Run(testCase, func(t *testing.T) {
h := &harness{}
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
Expand Down
285 changes: 151 additions & 134 deletions impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -41,6 +42,20 @@ import (

const loremFile = "lorem.txt"

// nil means use the default protocols
// tests data transfer for the following protocol combinations:
// default protocol -> default protocols
// old protocol -> default protocols
// default protocols -> old protocol
var protocolsForTest = map[string]struct {
host1Protocols []protocol.ID
host2Protocols []protocol.ID
}{
"(new -> new)": {nil, nil},
"(old -> new, old)": {[]protocol.ID{datatransfer.ProtocolDataTransfer1_0}, nil},
"(new, old -> old)": {nil, []protocol.ID{datatransfer.ProtocolDataTransfer1_0}},
}

func TestRoundTrip(t *testing.T) {
ctx := context.Background()
testCases := map[string]struct {
Expand Down Expand Up @@ -77,145 +92,147 @@ func TestRoundTrip(t *testing.T) {
},
}
for testCase, data := range testCases {
t.Run(testCase, func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
for pname, ps := range protocolsForTest {
t.Run(testCase+pname, func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

gsData := testutil.NewGraphsyncTestingData(ctx, t)
host1 := gsData.Host1 // initiator, data sender
host2 := gsData.Host2 // data recipient
gsData := testutil.NewGraphsyncTestingData(ctx, t, ps.host1Protocols, ps.host2Protocols)
host1 := gsData.Host1 // initiator, data sender
host2 := gsData.Host2 // data recipient

tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()
tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

dt1, err := NewDataTransfer(gsData.DtDs1, gsData.DtNet1, tp1, gsData.StoredCounter1)
require.NoError(t, err)
err = dt1.Start(ctx)
require.NoError(t, err)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.DtNet2, tp2, gsData.StoredCounter2)
require.NoError(t, err)
err = dt2.Start(ctx)
require.NoError(t, err)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.DtNet1, tp1, gsData.StoredCounter1)
require.NoError(t, err)
err = dt1.Start(ctx)
require.NoError(t, err)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.DtNet2, tp2, gsData.StoredCounter2)
require.NoError(t, err)
err = dt2.Start(ctx)
require.NoError(t, err)

finished := make(chan struct{}, 2)
errChan := make(chan struct{}, 2)
opened := make(chan struct{}, 2)
sent := make(chan uint64, 21)
received := make(chan uint64, 21)
var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) {
if event.Code == datatransfer.DataSent {
if channelState.Sent() > 0 {
sent <- channelState.Sent()
finished := make(chan struct{}, 2)
errChan := make(chan struct{}, 2)
opened := make(chan struct{}, 2)
sent := make(chan uint64, 21)
received := make(chan uint64, 21)
var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) {
if event.Code == datatransfer.DataSent {
if channelState.Sent() > 0 {
sent <- channelState.Sent()
}
}
}

if event.Code == datatransfer.DataReceived {
if channelState.Received() > 0 {
received <- channelState.Received()
if event.Code == datatransfer.DataReceived {
if channelState.Received() > 0 {
received <- channelState.Received()
}
}
}

if channelState.Status() == datatransfer.Completed {
finished <- struct{}{}
if channelState.Status() == datatransfer.Completed {
finished <- struct{}{}
}
if event.Code == datatransfer.Error {
errChan <- struct{}{}
}
if event.Code == datatransfer.Open {
opened <- struct{}{}
}
}
if event.Code == datatransfer.Error {
errChan <- struct{}{}
dt1.SubscribeToEvents(subscriber)
dt2.SubscribeToEvents(subscriber)
voucher := testutil.FakeDTType{Data: "applesauce"}
sv := testutil.NewStubbedValidator()

var sourceDagService ipldformat.DAGService
if data.customSourceStore {
ds := dss.MutexWrap(datastore.NewMapDatastore())
bs := bstore.NewBlockstore(namespace.Wrap(ds, datastore.NewKey("blockstore")))
loader := storeutil.LoaderForBlockstore(bs)
storer := storeutil.StorerForBlockstore(bs)
sourceDagService = merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
err := dt1.RegisterTransportConfigurer(&testutil.FakeDTType{}, func(channelID datatransfer.ChannelID, testVoucher datatransfer.Voucher, transport datatransfer.Transport) {
fv, ok := testVoucher.(*testutil.FakeDTType)
if ok && fv.Data == voucher.Data {
gsTransport, ok := transport.(*tp.Transport)
if ok {
err := gsTransport.UseStore(channelID, loader, storer)
require.NoError(t, err)
}
}
})
require.NoError(t, err)
} else {
sourceDagService = gsData.DagService1
}
if event.Code == datatransfer.Open {
opened <- struct{}{}
root, origBytes := testutil.LoadUnixFSFile(ctx, t, sourceDagService, loremFile)
rootCid := root.(cidlink.Link).Cid

var destDagService ipldformat.DAGService
if data.customTargetStore {
ds := dss.MutexWrap(datastore.NewMapDatastore())
bs := bstore.NewBlockstore(namespace.Wrap(ds, datastore.NewKey("blockstore")))
loader := storeutil.LoaderForBlockstore(bs)
storer := storeutil.StorerForBlockstore(bs)
destDagService = merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
err := dt2.RegisterTransportConfigurer(&testutil.FakeDTType{}, func(channelID datatransfer.ChannelID, testVoucher datatransfer.Voucher, transport datatransfer.Transport) {
fv, ok := testVoucher.(*testutil.FakeDTType)
if ok && fv.Data == voucher.Data {
gsTransport, ok := transport.(*tp.Transport)
if ok {
err := gsTransport.UseStore(channelID, loader, storer)
require.NoError(t, err)
}
}
})
require.NoError(t, err)
} else {
destDagService = gsData.DagService2
}
}
dt1.SubscribeToEvents(subscriber)
dt2.SubscribeToEvents(subscriber)
voucher := testutil.FakeDTType{Data: "applesauce"}
sv := testutil.NewStubbedValidator()

var sourceDagService ipldformat.DAGService
if data.customSourceStore {
ds := dss.MutexWrap(datastore.NewMapDatastore())
bs := bstore.NewBlockstore(namespace.Wrap(ds, datastore.NewKey("blockstore")))
loader := storeutil.LoaderForBlockstore(bs)
storer := storeutil.StorerForBlockstore(bs)
sourceDagService = merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
err := dt1.RegisterTransportConfigurer(&testutil.FakeDTType{}, func(channelID datatransfer.ChannelID, testVoucher datatransfer.Voucher, transport datatransfer.Transport) {
fv, ok := testVoucher.(*testutil.FakeDTType)
if ok && fv.Data == voucher.Data {
gsTransport, ok := transport.(*tp.Transport)
if ok {
err := gsTransport.UseStore(channelID, loader, storer)
require.NoError(t, err)
}
}
})
var chid datatransfer.ChannelID
if data.isPull {
sv.ExpectSuccessPull()
require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv))
chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), &voucher, rootCid, gsData.AllSelector)
} else {
sv.ExpectSuccessPush()
require.NoError(t, dt2.RegisterVoucherType(&testutil.FakeDTType{}, sv))
chid, err = dt1.OpenPushDataChannel(ctx, host2.ID(), &voucher, rootCid, gsData.AllSelector)
}
require.NoError(t, err)
} else {
sourceDagService = gsData.DagService1
}
root, origBytes := testutil.LoadUnixFSFile(ctx, t, sourceDagService, loremFile)
rootCid := root.(cidlink.Link).Cid

var destDagService ipldformat.DAGService
if data.customTargetStore {
ds := dss.MutexWrap(datastore.NewMapDatastore())
bs := bstore.NewBlockstore(namespace.Wrap(ds, datastore.NewKey("blockstore")))
loader := storeutil.LoaderForBlockstore(bs)
storer := storeutil.StorerForBlockstore(bs)
destDagService = merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
err := dt2.RegisterTransportConfigurer(&testutil.FakeDTType{}, func(channelID datatransfer.ChannelID, testVoucher datatransfer.Voucher, transport datatransfer.Transport) {
fv, ok := testVoucher.(*testutil.FakeDTType)
if ok && fv.Data == voucher.Data {
gsTransport, ok := transport.(*tp.Transport)
if ok {
err := gsTransport.UseStore(channelID, loader, storer)
require.NoError(t, err)
}
opens := 0
completes := 0
sentIncrements := make([]uint64, 0, 21)
receivedIncrements := make([]uint64, 0, 21)
for opens < 2 || completes < 2 || len(sentIncrements) < 21 || len(receivedIncrements) < 21 {
select {
case <-ctx.Done():
t.Fatal("Did not complete succcessful data transfer")
case <-finished:
completes++
case <-opened:
opens++
case sentIncrement := <-sent:
sentIncrements = append(sentIncrements, sentIncrement)
case receivedIncrement := <-received:
receivedIncrements = append(receivedIncrements, receivedIncrement)
case <-errChan:
t.Fatal("received error on data transfer")
}
})
require.NoError(t, err)
} else {
destDagService = gsData.DagService2
}

var chid datatransfer.ChannelID
if data.isPull {
sv.ExpectSuccessPull()
require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv))
chid, err = dt2.OpenPullDataChannel(ctx, host1.ID(), &voucher, rootCid, gsData.AllSelector)
} else {
sv.ExpectSuccessPush()
require.NoError(t, dt2.RegisterVoucherType(&testutil.FakeDTType{}, sv))
chid, err = dt1.OpenPushDataChannel(ctx, host2.ID(), &voucher, rootCid, gsData.AllSelector)
}
require.NoError(t, err)
opens := 0
completes := 0
sentIncrements := make([]uint64, 0, 21)
receivedIncrements := make([]uint64, 0, 21)
for opens < 2 || completes < 2 || len(sentIncrements) < 21 || len(receivedIncrements) < 21 {
select {
case <-ctx.Done():
t.Fatal("Did not complete succcessful data transfer")
case <-finished:
completes++
case <-opened:
opens++
case sentIncrement := <-sent:
sentIncrements = append(sentIncrements, sentIncrement)
case receivedIncrement := <-received:
receivedIncrements = append(receivedIncrements, receivedIncrement)
case <-errChan:
t.Fatal("received error on data transfer")
}
}
require.Equal(t, sentIncrements, receivedIncrements)
testutil.VerifyHasFile(ctx, t, destDagService, root, origBytes)
if data.isPull {
assert.Equal(t, chid.Initiator, host2.ID())
} else {
assert.Equal(t, chid.Initiator, host1.ID())
}
})
}
require.Equal(t, sentIncrements, receivedIncrements)
testutil.VerifyHasFile(ctx, t, destDagService, root, origBytes)
if data.isPull {
assert.Equal(t, chid.Initiator, host2.ID())
} else {
assert.Equal(t, chid.Initiator, host1.ID())
}
})
}
} //
}

func TestMultipleRoundTripMultipleStores(t *testing.T) {
Expand All @@ -237,7 +254,7 @@ func TestMultipleRoundTripMultipleStores(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

gsData := testutil.NewGraphsyncTestingData(ctx, t)
gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
host1 := gsData.Host1 // initiator, data sender
host2 := gsData.Host2 // data recipient

Expand Down Expand Up @@ -364,7 +381,7 @@ func TestManyReceiversAtOnce(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

gsData := testutil.NewGraphsyncTestingData(ctx, t)
gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
host1 := gsData.Host1 // initiator, data sender

tp1 := gsData.SetupGSTransportHost1()
Expand Down Expand Up @@ -499,7 +516,7 @@ func TestRoundTripCancelledRequest(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

gsData := testutil.NewGraphsyncTestingData(ctx, t)
gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
host1 := gsData.Host1 // initiator, data sender
host2 := gsData.Host2

Expand Down Expand Up @@ -642,7 +659,7 @@ func TestSimulatedRetrievalFlow(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 4*time.Second)
defer cancel()

gsData := testutil.NewGraphsyncTestingData(ctx, t)
gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
host1 := gsData.Host1 // initiator, data sender

root := gsData.LoadUnixFSFile(t, false)
Expand Down Expand Up @@ -771,7 +788,7 @@ func TestPauseAndResume(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

gsData := testutil.NewGraphsyncTestingData(ctx, t)
gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
host1 := gsData.Host1 // initiator, data sender
host2 := gsData.Host2 // data recipient

Expand Down Expand Up @@ -914,7 +931,7 @@ func TestUnrecognizedVoucherRoundTrip(t *testing.T) {
// ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
// defer cancel()

gsData := testutil.NewGraphsyncTestingData(ctx, t)
gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
host1 := gsData.Host1 // initiator, data sender
host2 := gsData.Host2 // data recipient

Expand Down Expand Up @@ -985,7 +1002,7 @@ func TestDataTransferSubscribing(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
gsData := testutil.NewGraphsyncTestingData(ctx, t)
gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
host2 := gsData.Host2

tp1 := gsData.SetupGSTransportHost1()
Expand Down Expand Up @@ -1115,7 +1132,7 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
gsData := testutil.NewGraphsyncTestingData(ctx, t)
gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
host1 := gsData.Host1 // initiator and data sender
host2 := gsData.Host2 // data recipient, makes graphsync request for data
voucher := testutil.NewFakeDTType()
Expand Down Expand Up @@ -1199,7 +1216,7 @@ func TestResponseHookWhenExtensionNotFound(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
gsData := testutil.NewGraphsyncTestingData(ctx, t)
gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
host1 := gsData.Host1 // initiator and data sender
host2 := gsData.Host2 // data recipient, makes graphsync request for data
voucher := testutil.FakeDTType{Data: "applesauce"}
Expand Down Expand Up @@ -1326,7 +1343,7 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

gsData := testutil.NewGraphsyncTestingData(ctx, t)
gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)

// setup receiving peer to just record message coming in
gsr := &fakeGraphSyncReceiver{
Expand Down
Loading

0 comments on commit b80beae

Please sign in to comment.