diff --git a/benchmarks/benchmark_test.go b/benchmarks/benchmark_test.go new file mode 100644 index 00000000..25eeada2 --- /dev/null +++ b/benchmarks/benchmark_test.go @@ -0,0 +1,207 @@ +package graphsync_test + +import ( + "bytes" + "context" + "crypto/rand" + "fmt" + "io/ioutil" + "os" + "runtime" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/ipfs/go-blockservice" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-graphsync/benchmarks/testinstance" + tn "github.com/ipfs/go-graphsync/benchmarks/testnet" + blockstore "github.com/ipfs/go-ipfs-blockstore" + chunker "github.com/ipfs/go-ipfs-chunker" + delay "github.com/ipfs/go-ipfs-delay" + offline "github.com/ipfs/go-ipfs-exchange-offline" + files "github.com/ipfs/go-ipfs-files" + ipldformat "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-merkledag" + "github.com/ipfs/go-unixfs/importer/balanced" + ihelper "github.com/ipfs/go-unixfs/importer/helpers" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + basicnode "github.com/ipld/go-ipld-prime/node/basic" + ipldselector "github.com/ipld/go-ipld-prime/traversal/selector" + "github.com/ipld/go-ipld-prime/traversal/selector/builder" + "github.com/stretchr/testify/require" +) + +const stdBlockSize = 8000 + +type runStats struct { + Time time.Duration + Name string +} + +var benchmarkLog []runStats + +func BenchmarkRoundtripSuccess(b *testing.B) { + ctx := context.Background() + tdm, err := newTempDirMaker(b) + require.NoError(b, err) + b.Run("test-20-10000", func(b *testing.B) { + subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(10000), tdm) + }) +} + +func subtestDistributeAndFetch(ctx context.Context, b *testing.B, numnodes int, d delay.D, bstoreLatency time.Duration, df distFunc, tdm *tempDirMaker) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + net := tn.VirtualNetwork(d) + ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm) + instances, err := ig.Instances(numnodes + b.N) + require.NoError(b, err) + destCids := df(ctx, b, instances[:numnodes]) + // Set the blockstore latency on seed nodes + if bstoreLatency > 0 { + for _, i := range instances { + i.SetBlockstoreLatency(bstoreLatency) + } + } + ssb := builder.NewSelectorSpecBuilder(basicnode.Style.Any) + + allSelector := ssb.ExploreRecursive(ipldselector.RecursionLimitNone(), + ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node() + + runtime.GC() + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + fetcher := instances[i+numnodes] + var wg sync.WaitGroup + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + require.NoError(b, err) + start := time.Now() + for j := 0; j < numnodes; j++ { + instance := instances[j] + _, errChan := fetcher.Exchange.Request(ctx, instance.Peer, cidlink.Link{Cid: destCids[j]}, allSelector) + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case err, ok := <-errChan: + if !ok { + return + } + b.Fatalf("received error on request: %s", err.Error()) + } + } + }() + } + wg.Wait() + result := runStats{ + Time: time.Since(start), + Name: b.Name(), + } + benchmarkLog = append(benchmarkLog, result) + cancel() + fetcher.Close() + } + testinstance.Close(instances) + ig.Close() + +} + +type distFunc func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid + +const unixfsChunkSize uint64 = 1 << 10 +const unixfsLinksPerLevel = 1024 + +func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Blockstore, size uint64) cid.Cid { + + data := make([]byte, size) + _, err := rand.Read(data) + require.NoError(b, err) + buf := bytes.NewReader(data) + file := files.NewReaderFile(buf) + + dagService := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) + + // import to UnixFS + bufferedDS := ipldformat.NewBufferedDAG(ctx, dagService) + + params := ihelper.DagBuilderParams{ + Maxlinks: unixfsLinksPerLevel, + RawLeaves: true, + CidBuilder: nil, + Dagserv: bufferedDS, + } + + db, err := params.New(chunker.NewSizeSplitter(file, int64(unixfsChunkSize))) + require.NoError(b, err, "unable to setup dag builder") + + nd, err := balanced.Layout(db) + require.NoError(b, err, "unable to create unix fs node") + + err = bufferedDS.Commit() + require.NoError(b, err, "unable to commit unix fs node") + + return nd.Cid() +} + +func allFilesUniformSize(size uint64) distFunc { + return func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid { + cids := make([]cid.Cid, 0, len(provs)) + for _, prov := range provs { + c := loadRandomUnixFxFile(ctx, b, prov.BlockStore, size) + cids = append(cids, c) + } + return cids + } +} + +type tempDirMaker struct { + tdm string + tempDirSeq int32 + b *testing.B +} + +var tempDirReplacer struct { + sync.Once + r *strings.Replacer +} + +// Cribbed from https://github.com/golang/go/blob/master/src/testing/testing.go#L890 +// and modified as needed due to https://github.com/golang/go/issues/41062 +func newTempDirMaker(b *testing.B) (*tempDirMaker, error) { + c := &tempDirMaker{} + // ioutil.TempDir doesn't like path separators in its pattern, + // so mangle the name to accommodate subtests. + tempDirReplacer.Do(func() { + tempDirReplacer.r = strings.NewReplacer("/", "_", "\\", "_", ":", "_") + }) + pattern := tempDirReplacer.r.Replace(b.Name()) + + var err error + c.tdm, err = ioutil.TempDir("", pattern) + if err != nil { + return nil, err + } + b.Cleanup(func() { + if err := os.RemoveAll(c.tdm); err != nil { + b.Errorf("TempDir RemoveAll cleanup: %v", err) + } + }) + return c, nil +} + +func (tdm *tempDirMaker) TempDir() string { + seq := atomic.AddInt32(&tdm.tempDirSeq, 1) + dir := fmt.Sprintf("%s%c%03d", tdm.tdm, os.PathSeparator, seq) + if err := os.Mkdir(dir, 0777); err != nil { + tdm.b.Fatalf("TempDir: %v", err) + } + return dir +} diff --git a/benchmarks/testinstance/testinstance.go b/benchmarks/testinstance/testinstance.go new file mode 100644 index 00000000..c0d4e8eb --- /dev/null +++ b/benchmarks/testinstance/testinstance.go @@ -0,0 +1,169 @@ +package testinstance + +import ( + "context" + "time" + + "github.com/ipfs/go-datastore" + ds "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/delayed" + ds_sync "github.com/ipfs/go-datastore/sync" + graphsync "github.com/ipfs/go-graphsync" + tn "github.com/ipfs/go-graphsync/benchmarks/testnet" + gsimpl "github.com/ipfs/go-graphsync/impl" + gsnet "github.com/ipfs/go-graphsync/network" + "github.com/ipfs/go-graphsync/storeutil" + blockstore "github.com/ipfs/go-ipfs-blockstore" + delay "github.com/ipfs/go-ipfs-delay" + "github.com/ipld/go-ipld-prime" + peer "github.com/libp2p/go-libp2p-core/peer" + p2ptestutil "github.com/libp2p/go-libp2p-netutil" + tnet "github.com/libp2p/go-libp2p-testing/net" +) + +// TempDirGenerator is any interface that can generate temporary directories +type TempDirGenerator interface { + TempDir() string +} + +// NewTestInstanceGenerator generates a new InstanceGenerator for the given +// testnet +func NewTestInstanceGenerator(ctx context.Context, net tn.Network, gsOptions []gsimpl.Option, tempDirGenerator TempDirGenerator) InstanceGenerator { + ctx, cancel := context.WithCancel(ctx) + return InstanceGenerator{ + net: net, + seq: 0, + ctx: ctx, // TODO take ctx as param to Next, Instances + cancel: cancel, + gsOptions: gsOptions, + tempDirGenerator: tempDirGenerator, + } +} + +// InstanceGenerator generates new test instances of bitswap+dependencies +type InstanceGenerator struct { + seq int + net tn.Network + ctx context.Context + cancel context.CancelFunc + gsOptions []gsimpl.Option + tempDirGenerator TempDirGenerator +} + +// Close closes the clobal context, shutting down all test instances +func (g *InstanceGenerator) Close() error { + g.cancel() + return nil // for Closer interface +} + +// Next generates a new instance of graphsync + dependencies +func (g *InstanceGenerator) Next() (Instance, error) { + g.seq++ + p, err := p2ptestutil.RandTestBogusIdentity() + if err != nil { + return Instance{}, err + } + return NewInstance(g.ctx, g.net, p, g.gsOptions, g.tempDirGenerator.TempDir()) +} + +// Instances creates N test instances of bitswap + dependencies and connects +// them to each other +func (g *InstanceGenerator) Instances(n int) ([]Instance, error) { + var instances []Instance + for j := 0; j < n; j++ { + inst, err := g.Next() + if err != nil { + return nil, err + } + instances = append(instances, inst) + } + ConnectInstances(instances) + return instances, nil +} + +// ConnectInstances connects the given instances to each other +func ConnectInstances(instances []Instance) { + for i, inst := range instances { + for j := i + 1; j < len(instances); j++ { + oinst := instances[j] + err := inst.Adapter.ConnectTo(context.Background(), oinst.Peer) + if err != nil { + panic(err.Error()) + } + } + } +} + +// Close closes multiple instances at once +func Close(instances []Instance) error { + for _, i := range instances { + if err := i.Close(); err != nil { + return err + } + } + return nil +} + +// Instance is a test instance of bitswap + dependencies for integration testing +type Instance struct { + Peer peer.ID + Loader ipld.Loader + Storer ipld.Storer + Exchange graphsync.GraphExchange + BlockStore blockstore.Blockstore + Adapter gsnet.GraphSyncNetwork + blockstoreDelay delay.D + ds datastore.Batching +} + +// Close closes the associated datastore +func (i *Instance) Close() error { + return i.ds.Close() +} + +// Blockstore returns the block store for this test instance +func (i *Instance) Blockstore() blockstore.Blockstore { + return i.BlockStore +} + +// SetBlockstoreLatency customizes the artificial delay on receiving blocks +// from a blockstore test instance. +func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration { + return i.blockstoreDelay.Set(t) +} + +// NewInstance creates a test bitswap instance. +// +// NB: It's easy make mistakes by providing the same peer ID to two different +// instances. To safeguard, use the InstanceGenerator to generate instances. It's +// just a much better idea. +func NewInstance(ctx context.Context, net tn.Network, p tnet.Identity, gsOptions []gsimpl.Option, tempDir string) (Instance, error) { + bsdelay := delay.Fixed(0) + + adapter := net.Adapter(p) + dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay)) + bstore, err := blockstore.CachedBlockstore(ctx, + blockstore.NewBlockstore(dstore), + blockstore.DefaultCacheOpts()) + if err != nil { + return Instance{}, err + } + + loader := storeutil.LoaderForBlockstore(bstore) + storer := storeutil.StorerForBlockstore(bstore) + gs := gsimpl.New(ctx, adapter, loader, storer, gsOptions...) + gs.RegisterIncomingRequestHook(func(p peer.ID, request graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { + hookActions.ValidateRequest() + }) + + return Instance{ + Adapter: adapter, + Peer: p.ID(), + Exchange: gs, + Loader: loader, + Storer: storer, + BlockStore: bstore, + blockstoreDelay: bsdelay, + ds: dstore, + }, nil +} diff --git a/benchmarks/testnet/interface.go b/benchmarks/testnet/interface.go new file mode 100644 index 00000000..ab65debe --- /dev/null +++ b/benchmarks/testnet/interface.go @@ -0,0 +1,16 @@ +package testnet + +import ( + gsnet "github.com/ipfs/go-graphsync/network" + + "github.com/libp2p/go-libp2p-core/peer" + tnet "github.com/libp2p/go-libp2p-testing/net" +) + +// Network is an interface for generating graphsync network interfaces +// based on a test network. +type Network interface { + Adapter(tnet.Identity) gsnet.GraphSyncNetwork + HasPeer(peer.ID) bool +} + diff --git a/benchmarks/testnet/internet_latency_delay_generator.go b/benchmarks/testnet/internet_latency_delay_generator.go new file mode 100644 index 00000000..eca20941 --- /dev/null +++ b/benchmarks/testnet/internet_latency_delay_generator.go @@ -0,0 +1,63 @@ +package testnet + +import ( + "math/rand" + "time" + + delay "github.com/ipfs/go-ipfs-delay" +) + +var sharedRNG = rand.New(rand.NewSource(time.Now().UnixNano())) + +// InternetLatencyDelayGenerator generates three clusters of delays, +// typical of the type of peers you would encounter on the interenet. +// Given a base delay time T, the wait time generated will be either: +// 1. A normalized distribution around the base time +// 2. A normalized distribution around the base time plus a "medium" delay +// 3. A normalized distribution around the base time plus a "large" delay +// The size of the medium & large delays are determined when the generator +// is constructed, as well as the relative percentages with which delays fall +// into each of the three different clusters, and the standard deviation for +// the normalized distribution. +// This can be used to generate a number of scenarios typical of latency +// distribution among peers on the internet. +func InternetLatencyDelayGenerator( + mediumDelay time.Duration, + largeDelay time.Duration, + percentMedium float64, + percentLarge float64, + std time.Duration, + rng *rand.Rand) delay.Generator { + if rng == nil { + rng = sharedRNG + } + + return &internetLatencyDelayGenerator{ + mediumDelay: mediumDelay, + largeDelay: largeDelay, + percentLarge: percentLarge, + percentMedium: percentMedium, + std: std, + rng: rng, + } +} + +type internetLatencyDelayGenerator struct { + mediumDelay time.Duration + largeDelay time.Duration + percentLarge float64 + percentMedium float64 + std time.Duration + rng *rand.Rand +} + +func (d *internetLatencyDelayGenerator) NextWaitTime(t time.Duration) time.Duration { + clusterDistribution := d.rng.Float64() + baseDelay := time.Duration(d.rng.NormFloat64()*float64(d.std)) + t + if clusterDistribution < d.percentLarge { + return baseDelay + d.largeDelay + } else if clusterDistribution < d.percentMedium+d.percentLarge { + return baseDelay + d.mediumDelay + } + return baseDelay +} diff --git a/benchmarks/testnet/internet_latency_delay_generator_test.go b/benchmarks/testnet/internet_latency_delay_generator_test.go new file mode 100644 index 00000000..ec6a44f0 --- /dev/null +++ b/benchmarks/testnet/internet_latency_delay_generator_test.go @@ -0,0 +1,62 @@ +package testnet_test + +import ( + "math" + "math/rand" + "testing" + "time" + + "github.com/ipfs/go-graphsync/benchmarks/testnet" + "github.com/stretchr/testify/require" +) + +const testSeed = 99 + +func TestInternetLatencyDelayNextWaitTimeDistribution(t *testing.T) { + initialValue := 1000 * time.Millisecond + deviation := 100 * time.Millisecond + mediumDelay := 1000 * time.Millisecond + largeDelay := 3000 * time.Millisecond + percentMedium := 0.2 + percentLarge := 0.4 + buckets := make(map[string]int) + internetLatencyDistributionDelay := testnet.InternetLatencyDelayGenerator( + mediumDelay, + largeDelay, + percentMedium, + percentLarge, + deviation, + rand.New(rand.NewSource(testSeed))) + + buckets["fast"] = 0 + buckets["medium"] = 0 + buckets["slow"] = 0 + buckets["outside_1_deviation"] = 0 + + // strategy here is rather than mock randomness, just use enough samples to + // get approximately the distribution you'd expect + for i := 0; i < 10000; i++ { + next := internetLatencyDistributionDelay.NextWaitTime(initialValue) + if math.Abs((next - initialValue).Seconds()) <= deviation.Seconds() { + buckets["fast"]++ + } else if math.Abs((next - initialValue - mediumDelay).Seconds()) <= deviation.Seconds() { + buckets["medium"]++ + } else if math.Abs((next - initialValue - largeDelay).Seconds()) <= deviation.Seconds() { + buckets["slow"]++ + } else { + buckets["outside_1_deviation"]++ + } + } + totalInOneDeviation := float64(10000 - buckets["outside_1_deviation"]) + oneDeviationPercentage := totalInOneDeviation / 10000 + fastPercentageResult := float64(buckets["fast"]) / totalInOneDeviation + mediumPercentageResult := float64(buckets["medium"]) / totalInOneDeviation + slowPercentageResult := float64(buckets["slow"]) / totalInOneDeviation + + // see 68-95-99 rule for normal distributions + require.LessOrEqual(t, math.Abs(oneDeviationPercentage-0.6827), 0.1, "should distribute values normally based on standard deviation") + + require.LessOrEqual(t, math.Abs(fastPercentageResult+percentMedium+percentLarge-1), 0.1, "should have correct percentage of values distributed around fast delay time") + require.LessOrEqual(t, math.Abs(mediumPercentageResult-percentMedium), 0.1, "should have correct percentage of values distributed around medium delay time") + require.LessOrEqual(t, math.Abs(slowPercentageResult-percentLarge), 0.1, "should have correct percentage of values distributed around slow delay time") +} diff --git a/benchmarks/testnet/network_test.go b/benchmarks/testnet/network_test.go new file mode 100644 index 00000000..c414b341 --- /dev/null +++ b/benchmarks/testnet/network_test.go @@ -0,0 +1,101 @@ +package testnet_test + +import ( + "context" + "sync" + "testing" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-graphsync/benchmarks/testnet" + gsmsg "github.com/ipfs/go-graphsync/message" + gsnet "github.com/ipfs/go-graphsync/network" + delay "github.com/ipfs/go-ipfs-delay" + + "github.com/libp2p/go-libp2p-core/peer" + tnet "github.com/libp2p/go-libp2p-testing/net" +) + +func TestSendMessageAsyncButWaitForResponse(t *testing.T) { + net := testnet.VirtualNetwork(delay.Fixed(0)) + responderPeer := tnet.RandIdentityOrFatal(t) + waiter := net.Adapter(tnet.RandIdentityOrFatal(t)) + responder := net.Adapter(responderPeer) + + var wg sync.WaitGroup + + wg.Add(1) + + expectedStr := "received async" + + responder.SetDelegate(lambda(func( + ctx context.Context, + fromWaiter peer.ID, + msgFromWaiter gsmsg.GraphSyncMessage) { + + msgToWaiter := gsmsg.New() + msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr))) + err := waiter.SendMessage(ctx, fromWaiter, msgToWaiter) + if err != nil { + t.Error(err) + } + })) + + waiter.SetDelegate(lambda(func( + ctx context.Context, + fromResponder peer.ID, + msgFromResponder gsmsg.GraphSyncMessage) { + + // TODO assert that this came from the correct peer and that the message contents are as expected + ok := false + for _, b := range msgFromResponder.Blocks() { + if string(b.RawData()) == expectedStr { + wg.Done() + ok = true + } + } + + if !ok { + t.Fatal("Message not received from the responder") + } + })) + + messageSentAsync := gsmsg.New() + messageSentAsync.AddBlock(blocks.NewBlock([]byte("data"))) + errSending := waiter.SendMessage( + context.Background(), responderPeer.ID(), messageSentAsync) + if errSending != nil { + t.Fatal(errSending) + } + + wg.Wait() // until waiter delegate function is executed +} + +type receiverFunc func(ctx context.Context, p peer.ID, + incoming gsmsg.GraphSyncMessage) + +// lambda returns a Receiver instance given a receiver function +func lambda(f receiverFunc) gsnet.Receiver { + return &lambdaImpl{ + f: f, + } +} + +type lambdaImpl struct { + f func(ctx context.Context, p peer.ID, incoming gsmsg.GraphSyncMessage) +} + +func (lam *lambdaImpl) ReceiveMessage(ctx context.Context, + p peer.ID, incoming gsmsg.GraphSyncMessage) { + lam.f(ctx, p, incoming) +} + +func (lam *lambdaImpl) ReceiveError(err error) { + // TODO log error +} + +func (lam *lambdaImpl) Connected(p peer.ID) { + // TODO +} +func (lam *lambdaImpl) Disconnected(peer.ID) { + // TODO +} diff --git a/benchmarks/testnet/rate_limit_generators.go b/benchmarks/testnet/rate_limit_generators.go new file mode 100644 index 00000000..1fe3fcc6 --- /dev/null +++ b/benchmarks/testnet/rate_limit_generators.go @@ -0,0 +1,42 @@ +package testnet + +import ( + "math/rand" +) + +type fixedRateLimitGenerator struct { + rateLimit float64 +} + +// FixedRateLimitGenerator returns a rate limit generatoe that always generates +// the specified rate limit in bytes/sec. +func FixedRateLimitGenerator(rateLimit float64) RateLimitGenerator { + return &fixedRateLimitGenerator{rateLimit} +} + +func (rateLimitGenerator *fixedRateLimitGenerator) NextRateLimit() float64 { + return rateLimitGenerator.rateLimit +} + +type variableRateLimitGenerator struct { + rateLimit float64 + std float64 + rng *rand.Rand +} + +// VariableRateLimitGenerator makes rate limites that following a normal distribution. +func VariableRateLimitGenerator(rateLimit float64, std float64, rng *rand.Rand) RateLimitGenerator { + if rng == nil { + rng = sharedRNG + } + + return &variableRateLimitGenerator{ + std: std, + rng: rng, + rateLimit: rateLimit, + } +} + +func (rateLimitGenerator *variableRateLimitGenerator) NextRateLimit() float64 { + return rateLimitGenerator.rng.NormFloat64()*rateLimitGenerator.std + rateLimitGenerator.rateLimit +} diff --git a/benchmarks/testnet/virtual.go b/benchmarks/testnet/virtual.go new file mode 100644 index 00000000..47555df5 --- /dev/null +++ b/benchmarks/testnet/virtual.go @@ -0,0 +1,307 @@ +package testnet + +import ( + "context" + "errors" + "sort" + "sync" + "time" + + gsmsg "github.com/ipfs/go-graphsync/message" + gsnet "github.com/ipfs/go-graphsync/network" + + delay "github.com/ipfs/go-ipfs-delay" + mockrouting "github.com/ipfs/go-ipfs-routing/mock" + + "github.com/libp2p/go-libp2p-core/peer" + tnet "github.com/libp2p/go-libp2p-testing/net" + mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" +) + +// VirtualNetwork generates a new testnet instance - a fake network that +// is used to simulate sending messages. +func VirtualNetwork(d delay.D) Network { + return &network{ + latencies: make(map[peer.ID]map[peer.ID]time.Duration), + clients: make(map[peer.ID]*receiverQueue), + delay: d, + isRateLimited: false, + rateLimitGenerator: nil, + conns: make(map[string]struct{}), + } +} + +// RateLimitGenerator is an interface for generating rate limits across peers +type RateLimitGenerator interface { + NextRateLimit() float64 +} + +// RateLimitedVirtualNetwork generates a testnet instance where nodes are rate +// limited in the upload/download speed. +func RateLimitedVirtualNetwork(rs mockrouting.Server, d delay.D, rateLimitGenerator RateLimitGenerator) Network { + return &network{ + latencies: make(map[peer.ID]map[peer.ID]time.Duration), + rateLimiters: make(map[peer.ID]map[peer.ID]*mocknet.RateLimiter), + clients: make(map[peer.ID]*receiverQueue), + delay: d, + isRateLimited: true, + rateLimitGenerator: rateLimitGenerator, + conns: make(map[string]struct{}), + } +} + +type network struct { + mu sync.Mutex + latencies map[peer.ID]map[peer.ID]time.Duration + rateLimiters map[peer.ID]map[peer.ID]*mocknet.RateLimiter + clients map[peer.ID]*receiverQueue + delay delay.D + isRateLimited bool + rateLimitGenerator RateLimitGenerator + conns map[string]struct{} +} + +type message struct { + from peer.ID + msg gsmsg.GraphSyncMessage + shouldSend time.Time +} + +// receiverQueue queues up a set of messages to be sent, and sends them *in +// order* with their delays respected as much as sending them in order allows +// for +type receiverQueue struct { + receiver *networkClient + queue []*message + active bool + lk sync.Mutex +} + +func (n *network) Adapter(p tnet.Identity) gsnet.GraphSyncNetwork { + n.mu.Lock() + defer n.mu.Unlock() + + client := &networkClient{ + local: p.ID(), + network: n, + } + n.clients[p.ID()] = &receiverQueue{receiver: client} + return client +} + +func (n *network) HasPeer(p peer.ID) bool { + n.mu.Lock() + defer n.mu.Unlock() + + _, found := n.clients[p] + return found +} + +// TODO should this be completely asynchronous? +// TODO what does the network layer do with errors received from services? +func (n *network) SendMessage( + ctx context.Context, + from peer.ID, + to peer.ID, + mes gsmsg.GraphSyncMessage) error { + + mes = mes.Clone() + + n.mu.Lock() + defer n.mu.Unlock() + + latencies, ok := n.latencies[from] + if !ok { + latencies = make(map[peer.ID]time.Duration) + n.latencies[from] = latencies + } + + latency, ok := latencies[to] + if !ok { + latency = n.delay.NextWaitTime() + latencies[to] = latency + } + + var bandwidthDelay time.Duration + if n.isRateLimited { + rateLimiters, ok := n.rateLimiters[from] + if !ok { + rateLimiters = make(map[peer.ID]*mocknet.RateLimiter) + n.rateLimiters[from] = rateLimiters + } + + rateLimiter, ok := rateLimiters[to] + if !ok { + rateLimiter = mocknet.NewRateLimiter(n.rateLimitGenerator.NextRateLimit()) + rateLimiters[to] = rateLimiter + } + + pbMsg, err := mes.ToProto() + if err != nil { + return err + } + size := pbMsg.Size() + bandwidthDelay = rateLimiter.Limit(size) + } else { + bandwidthDelay = 0 + } + + receiver, ok := n.clients[to] + if !ok { + return errors.New("cannot locate peer on network") + } + + // nb: terminate the context since the context wouldn't actually be passed + // over the network in a real scenario + + msg := &message{ + from: from, + msg: mes, + shouldSend: time.Now().Add(latency).Add(bandwidthDelay), + } + receiver.enqueue(msg) + + return nil +} + +type networkClient struct { + local peer.ID + gsnet.Receiver + network *network +} + +func (nc *networkClient) SendMessage( + ctx context.Context, + to peer.ID, + message gsmsg.GraphSyncMessage) error { + if err := nc.network.SendMessage(ctx, nc.local, to, message); err != nil { + return err + } + return nil +} + +type messagePasser struct { + net *networkClient + target peer.ID + local peer.ID + ctx context.Context +} + +func (mp *messagePasser) SendMsg(ctx context.Context, m gsmsg.GraphSyncMessage) error { + return mp.net.SendMessage(ctx, mp.target, m) +} + +func (mp *messagePasser) Close() error { + return nil +} + +func (mp *messagePasser) Reset() error { + return nil +} + +func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (gsnet.MessageSender, error) { + return &messagePasser{ + net: nc, + target: p, + local: nc.local, + ctx: ctx, + }, nil +} + +func (nc *networkClient) SetDelegate(r gsnet.Receiver) { + nc.Receiver = r +} + +func (nc *networkClient) ConnectTo(_ context.Context, p peer.ID) error { + nc.network.mu.Lock() + otherClient, ok := nc.network.clients[p] + if !ok { + nc.network.mu.Unlock() + return errors.New("no such peer in network") + } + + tag := tagForPeers(nc.local, p) + if _, ok := nc.network.conns[tag]; ok { + nc.network.mu.Unlock() + // log.Warning("ALREADY CONNECTED TO PEER (is this a reconnect? test lib needs fixing)") + return nil + } + nc.network.conns[tag] = struct{}{} + nc.network.mu.Unlock() + + otherClient.receiver.Connected(nc.local) + nc.Receiver.Connected(p) + return nil +} + +func (nc *networkClient) DisconnectFrom(_ context.Context, p peer.ID) error { + nc.network.mu.Lock() + defer nc.network.mu.Unlock() + + otherClient, ok := nc.network.clients[p] + if !ok { + return errors.New("no such peer in network") + } + + tag := tagForPeers(nc.local, p) + if _, ok := nc.network.conns[tag]; !ok { + // Already disconnected + return nil + } + delete(nc.network.conns, tag) + + otherClient.receiver.Disconnected(nc.local) + nc.Receiver.Disconnected(p) + return nil +} + +func (rq *receiverQueue) enqueue(m *message) { + rq.lk.Lock() + defer rq.lk.Unlock() + rq.queue = append(rq.queue, m) + if !rq.active { + rq.active = true + go rq.process() + } +} + +func (rq *receiverQueue) Swap(i, j int) { + rq.queue[i], rq.queue[j] = rq.queue[j], rq.queue[i] +} + +func (rq *receiverQueue) Len() int { + return len(rq.queue) +} + +func (rq *receiverQueue) Less(i, j int) bool { + return rq.queue[i].shouldSend.UnixNano() < rq.queue[j].shouldSend.UnixNano() +} + +func (rq *receiverQueue) process() { + for { + rq.lk.Lock() + sort.Sort(rq) + if len(rq.queue) == 0 { + rq.active = false + rq.lk.Unlock() + return + } + m := rq.queue[0] + if time.Until(m.shouldSend).Seconds() < 0.1 { + rq.queue = rq.queue[1:] + rq.lk.Unlock() + time.Sleep(time.Until(m.shouldSend)) + rq.receiver.ReceiveMessage(context.TODO(), m.from, m.msg) + } else { + rq.lk.Unlock() + time.Sleep(100 * time.Millisecond) + } + } +} + +func tagForPeers(a, b peer.ID) string { + if a < b { + return string(a + b) + } + return string(b + a) +} diff --git a/go.mod b/go.mod index fb603f14..d09ed700 100644 --- a/go.mod +++ b/go.mod @@ -14,21 +14,26 @@ require ( github.com/ipfs/go-ipfs-blockstore v0.1.4 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.5 + github.com/ipfs/go-ipfs-delay v0.0.1 github.com/ipfs/go-ipfs-exchange-offline v0.0.1 github.com/ipfs/go-ipfs-files v0.0.8 + github.com/ipfs/go-ipfs-routing v0.1.0 github.com/ipfs/go-ipfs-util v0.0.1 github.com/ipfs/go-ipld-cbor v0.0.4 // indirect github.com/ipfs/go-ipld-format v0.2.0 - github.com/ipfs/go-log v1.0.2 + github.com/ipfs/go-log v1.0.3 github.com/ipfs/go-merkledag v0.3.1 github.com/ipfs/go-peertaskqueue v0.2.0 github.com/ipfs/go-unixfs v0.2.4 - github.com/ipld/go-ipld-prime v0.0.2-0.20200428162820-8b59dc292b8e - github.com/ipld/go-ipld-prime-proto v0.0.0-20200428191222-c1ffdadc01e1 + github.com/ipld/go-ipld-prime v0.0.4-0.20200828224805-5ff8c8b0b6ef + github.com/ipld/go-ipld-prime-proto v0.0.0-20200828231332-ae0aea07222b github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c + github.com/jbenet/goprocess v0.1.4 // indirect github.com/libp2p/go-libp2p v0.6.0 github.com/libp2p/go-libp2p-core v0.5.0 + github.com/libp2p/go-libp2p-netutil v0.1.0 github.com/libp2p/go-libp2p-record v0.1.1 // indirect + github.com/libp2p/go-libp2p-testing v0.1.1 github.com/multiformats/go-multiaddr v0.2.1 github.com/multiformats/go-multihash v0.0.13 github.com/polydawn/refmt v0.0.0-20190809202753-05966cbd336a // indirect @@ -37,6 +42,5 @@ require ( github.com/stretchr/testify v1.5.1 github.com/whyrusleeping/cbor-gen v0.0.0-20200402171437-3d27c146c105 // indirect go.uber.org/multierr v1.4.0 // indirect - golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 // indirect - golang.org/x/tools v0.0.0-20191216173652-a0e659d51361 // indirect + golang.org/x/tools v0.0.0-20200827010519-17fd2f27a9e3 // indirect ) diff --git a/go.sum b/go.sum index daf4e10c..ff3f281d 100644 --- a/go.sum +++ b/go.sum @@ -45,9 +45,12 @@ github.com/davidlazar/go-crypto v0.0.0-20170701192655-dcfb0a7ac018 h1:6xT9KW8zLC github.com/davidlazar/go-crypto v0.0.0-20170701192655-dcfb0a7ac018/go.mod h1:rQYf4tfk5sSwFsnDg3qYaBxSjsD9S8+59vW0dKUgme4= github.com/dgraph-io/badger v1.5.5-0.20190226225317-8115aed38f8f/go.mod h1:VZxzAIRPHRVNRKRo6AXrX9BJegn6il06VMTZVJYCIjQ= github.com/dgraph-io/badger v1.6.0-rc1/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= +github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo= github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= github.com/dgryski/go-farm v0.0.0-20190104051053-3adb47b1fb0f/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -198,8 +201,12 @@ github.com/ipfs/go-log v0.0.1 h1:9XTUN/rW64BCG1YhPK9Hoy3q8nr4gOmHHBpgFdfw6Lc= github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM= github.com/ipfs/go-log v1.0.2 h1:s19ZwJxH8rPWzypjcDpqPLIyV7BnbLqvpli3iZoqYK0= github.com/ipfs/go-log v1.0.2/go.mod h1:1MNjMxe0u6xvJZgeqbJ8vdo2TKaGwZ1a0Bpza+sr2Sk= +github.com/ipfs/go-log v1.0.3 h1:Gg7SUYSZ7BrqaKMwM+hRgcAkKv4QLfzP4XPQt5Sx/OI= +github.com/ipfs/go-log v1.0.3/go.mod h1:OsLySYkwIbiSUR/yBTdv1qPtcE4FW3WPWk/ewz9Ru+A= github.com/ipfs/go-log/v2 v2.0.2 h1:xguurydRdfKMJjKyxNXNU8lYP0VZH1NUwJRwUorjuEw= github.com/ipfs/go-log/v2 v2.0.2/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0= +github.com/ipfs/go-log/v2 v2.0.3 h1:Q2gXcBoCALyLN/pUQlz1qgu0x3uFV6FzP9oXhpfyJpc= +github.com/ipfs/go-log/v2 v2.0.3/go.mod h1:O7P1lJt27vWHhOwQmcFEvlmo49ry2VY2+JfBWFaa9+0= github.com/ipfs/go-merkledag v0.2.3 h1:aMdkK9G1hEeNvn3VXfiEMLY0iJnbiQQUHnM0HFJREsE= github.com/ipfs/go-merkledag v0.2.3/go.mod h1:SQiXrtSts3KGNmgOzMICy5c0POOpUNQLvB3ClKnBAlk= github.com/ipfs/go-merkledag v0.3.1 h1:3UqWINBEr3/N+r6OwgFXAddDP/8zpQX/8J7IGVOCqRQ= @@ -217,8 +224,10 @@ github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2 github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0= github.com/ipld/go-ipld-prime v0.0.2-0.20200428162820-8b59dc292b8e h1:ZISbJlM0urTANR9KRfRaqlBmyOj5uUtxs2r4Up9IXsA= github.com/ipld/go-ipld-prime v0.0.2-0.20200428162820-8b59dc292b8e/go.mod h1:uVIwe/u0H4VdKv3kaN1ck7uCb6yD9cFLS9/ELyXbsw8= -github.com/ipld/go-ipld-prime-proto v0.0.0-20200428191222-c1ffdadc01e1 h1:K1Ysr7kgIlo7YQkPqdkA6H7BVdIugvuAz7OQUTJxLdE= -github.com/ipld/go-ipld-prime-proto v0.0.0-20200428191222-c1ffdadc01e1/go.mod h1:OAV6xBmuTLsPZ+epzKkPB1e25FHk/vCtyatkdHcArLs= +github.com/ipld/go-ipld-prime v0.0.4-0.20200828224805-5ff8c8b0b6ef h1:/yPelt/0CuzZsmRkYzBBnJ499JnAOGaIaAXHujx96ic= +github.com/ipld/go-ipld-prime v0.0.4-0.20200828224805-5ff8c8b0b6ef/go.mod h1:uVIwe/u0H4VdKv3kaN1ck7uCb6yD9cFLS9/ELyXbsw8= +github.com/ipld/go-ipld-prime-proto v0.0.0-20200828231332-ae0aea07222b h1:ZtlW6pubN17TDaStlxgrwEXXwwUfJaXu9RobwczXato= +github.com/ipld/go-ipld-prime-proto v0.0.0-20200828231332-ae0aea07222b/go.mod h1:OAV6xBmuTLsPZ+epzKkPB1e25FHk/vCtyatkdHcArLs= github.com/jackpal/gateway v1.0.5 h1:qzXWUJfuMdlLMtt0a3Dgt+xkWQiA5itDEITVJtuSwMc= github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= github.com/jackpal/go-nat-pmp v1.0.1 h1:i0LektDkO1QlrTm/cSuP+PyBCDnYvjPLGl4LdWEMiaA= @@ -234,6 +243,8 @@ github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8 h1:bspPhN+oKYFk5f github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY= github.com/jbenet/goprocess v0.1.3 h1:YKyIEECS/XvcfHtBzxtjBBbWK+MbvA6dG8ASiqwvr10= github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= +github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= +github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= @@ -569,6 +580,7 @@ github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 h1: github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7/go.mod h1:X2c0RVCI1eSUFI8eLcY3c0423ykwiUdxLJtkDvruhjI= github.com/x-cray/logrus-prefixed-formatter v0.5.2/go.mod h1:2duySbKsL6M18s5GU7VPsoEPHyzalCE06qoARUCeBBE= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.1 h1:8dP3SGL7MPB94crU3bEPplMPe83FI4EouesJUeFHv50= go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA= @@ -607,6 +619,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200117160349-530e935923ad/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d h1:1ZiEyfaQIg3Qh0EoqpwAakHVhecoE5wlSg5GjnafJGw= golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -614,7 +628,8 @@ golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHl golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= -golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -629,13 +644,14 @@ golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR golang.org/x/net v0.0.0-20190611141213-3f473d35a33a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 h1:Ao/3l156eZf2AW5wK8a7/smtodRU+gha3+BeqJ69lRk= -golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200625001655-4c5254603344 h1:vGXIOMxbNfDTk/aXCmfdLgkrSV+Z2tcbze+pEc3v5W4= +golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -659,6 +675,8 @@ golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= @@ -674,8 +692,9 @@ golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20191216173652-a0e659d51361 h1:RIIXAeV6GvDBuADKumTODatUqANFZ+5BPMnzsy4hulY= -golang.org/x/tools v0.0.0-20191216173652-a0e659d51361/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200827010519-17fd2f27a9e3 h1:r3P/5xOq/dK1991B65Oy6E1fRF/2d/fSYZJ/fXGVfJc= +golang.org/x/tools v0.0.0-20200827010519-17fd2f27a9e3/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/message/message.go b/message/message.go index d8c4d6f3..63bf71df 100644 --- a/message/message.go +++ b/message/message.go @@ -58,6 +58,8 @@ type GraphSyncMessage interface { Exportable Loggable() map[string]interface{} + + Clone() GraphSyncMessage } // Exportable is an interface that can serialize to a protobuf @@ -339,6 +341,20 @@ func (gsm *graphSyncMessage) Loggable() map[string]interface{} { } } +func (gsm *graphSyncMessage) Clone() GraphSyncMessage { + clone := newMsg() + for id, request := range gsm.requests { + clone.requests[id] = request + } + for id, response := range gsm.responses { + clone.responses[id] = response + } + for cid, block := range gsm.blocks { + clone.blocks[cid] = block + } + return clone +} + // ID Returns the request ID for this Request func (gsr GraphSyncRequest) ID() graphsync.RequestID { return gsr.id } diff --git a/requestmanager/executor/executor.go b/requestmanager/executor/executor.go index 8ce4cb4e..2e0cb7ee 100644 --- a/requestmanager/executor/executor.go +++ b/requestmanager/executor/executor.go @@ -132,7 +132,7 @@ func (re *requestExecutor) traverse() error { if err != nil { return err } - err = traverser.Advance(bytes.NewReader(result.Data)) + err = traverser.Advance(bytes.NewBuffer(result.Data)) if err != nil { return err } @@ -220,7 +220,7 @@ func (re *requestExecutor) processResult(traverser ipldutil.Traverser, link ipld if err != nil { return err } - err = traverser.Advance(bytes.NewReader(result.Data)) + err = traverser.Advance(bytes.NewBuffer(result.Data)) if err != nil { return err } diff --git a/responsemanager/runtraversal/runtraversal.go b/responsemanager/runtraversal/runtraversal.go index 45bc69c8..fc30f062 100644 --- a/responsemanager/runtraversal/runtraversal.go +++ b/responsemanager/runtraversal/runtraversal.go @@ -33,13 +33,16 @@ func RunTraversal( if err != nil { traverser.Error(traversal.SkipMe{}) } else { - var blockBuffer bytes.Buffer - _, err = io.Copy(&blockBuffer, result) + blockBuffer, ok := result.(*bytes.Buffer) + if !ok { + blockBuffer = new(bytes.Buffer) + _, err = io.Copy(blockBuffer, result) + } if err != nil { traverser.Error(err) } else { data = blockBuffer.Bytes() - err = traverser.Advance(&blockBuffer) + err = traverser.Advance(blockBuffer) if err != nil { return err } diff --git a/storeutil/storeutil.go b/storeutil/storeutil.go index f438c878..57d5e1a4 100644 --- a/storeutil/storeutil.go +++ b/storeutil/storeutil.go @@ -23,7 +23,7 @@ func LoaderForBlockstore(bs bstore.Blockstore) ipld.Loader { if err != nil { return nil, err } - return bytes.NewReader(block.RawData()), nil + return bytes.NewBuffer(block.RawData()), nil } } diff --git a/testutil/channelassertions.go b/testutil/channelassertions.go index 502ad819..58c84e66 100644 --- a/testutil/channelassertions.go +++ b/testutil/channelassertions.go @@ -3,20 +3,19 @@ package testutil import ( "context" "reflect" - "testing" "github.com/stretchr/testify/require" ) // AssertReceive verifies that a channel returns a value before the given context closes, and writes into // into out, which should be a pointer to the value type -func AssertReceive(ctx context.Context, t *testing.T, channel interface{}, out interface{}, errorMessage string) { +func AssertReceive(ctx context.Context, t TestingT, channel interface{}, out interface{}, errorMessage string) { AssertReceiveFirst(t, channel, out, errorMessage, ctx.Done()) } // AssertReceiveFirst verifies that a channel returns a value on the specified channel before the other channels, // and writes the value into out, which should be a pointer to the value type -func AssertReceiveFirst(t *testing.T, channel interface{}, out interface{}, errorMessage string, incorrectChannels ...interface{}) { +func AssertReceiveFirst(t TestingT, channel interface{}, out interface{}, errorMessage string, incorrectChannels ...interface{}) { chanValue := reflect.ValueOf(channel) outValue := reflect.ValueOf(out) require.Equal(t, reflect.Chan, chanValue.Kind(), "incorrect argument: should pass channel to read from") @@ -45,12 +44,12 @@ func AssertReceiveFirst(t *testing.T, channel interface{}, out interface{}, erro } // AssertDoesReceive verifies that a channel returns some value before the given context closes -func AssertDoesReceive(ctx context.Context, t *testing.T, channel interface{}, errorMessage string) { +func AssertDoesReceive(ctx context.Context, t TestingT, channel interface{}, errorMessage string) { AssertDoesReceiveFirst(t, channel, errorMessage, ctx.Done()) } // AssertDoesReceiveFirst asserts that the given channel receives a value before any of the other channels specified -func AssertDoesReceiveFirst(t *testing.T, channel interface{}, errorMessage string, incorrectChannels ...interface{}) { +func AssertDoesReceiveFirst(t TestingT, channel interface{}, errorMessage string, incorrectChannels ...interface{}) { chanValue := reflect.ValueOf(channel) require.Equal(t, reflect.Chan, chanValue.Kind(), "incorrect argument: should pass channel to read from") require.Contains(t, []reflect.ChanDir{reflect.BothDir, reflect.RecvDir}, chanValue.Type().ChanDir(), "incorrect argument: should pass a receiving channel") @@ -74,7 +73,7 @@ func AssertDoesReceiveFirst(t *testing.T, channel interface{}, errorMessage stri } // AssertChannelEmpty verifies that a channel has no value currently -func AssertChannelEmpty(t *testing.T, channel interface{}, errorMessage string) { +func AssertChannelEmpty(t TestingT, channel interface{}, errorMessage string) { chanValue := reflect.ValueOf(channel) require.Equal(t, reflect.Chan, chanValue.Kind(), "incorrect argument: should pass channel to read from") require.Contains(t, []reflect.ChanDir{reflect.BothDir, reflect.RecvDir}, chanValue.Type().ChanDir(), "incorrect argument: should pass a receiving channel") @@ -91,7 +90,7 @@ func AssertChannelEmpty(t *testing.T, channel interface{}, errorMessage string) } // AssertSends attempts to send the given input value to the given channel before the given context closes -func AssertSends(ctx context.Context, t *testing.T, channel interface{}, in interface{}, errorMessage string) { +func AssertSends(ctx context.Context, t TestingT, channel interface{}, in interface{}, errorMessage string) { chanValue := reflect.ValueOf(channel) inValue := reflect.ValueOf(in) require.Equal(t, reflect.Chan, chanValue.Kind(), "incorrect argument: should pass channel to send to") diff --git a/testutil/testchain.go b/testutil/testchain.go index c26690a2..f625f166 100644 --- a/testutil/testchain.go +++ b/testutil/testchain.go @@ -3,7 +3,6 @@ package testutil import ( "context" "io/ioutil" - "testing" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" @@ -18,12 +17,20 @@ import ( "github.com/ipfs/go-graphsync" ) +// TestingT covers the interface methods we need from either *testing.T or +// *testing.B +type TestingT interface { + Errorf(format string, args ...interface{}) + FailNow() + Fatal(args ...interface{}) +} + const blockChainTraversedNodesPerBlock = 2 // TestBlockChain is a simulated data structure similar to a blockchain // which graphsync is uniquely suited for type TestBlockChain struct { - t *testing.T + t TestingT blockChainLength int loader ipld.Loader GenisisNode ipld.Node @@ -87,7 +94,7 @@ func createBlock(parents []ipld.Link, size uint64) (ipld.Node, error) { // SetupBlockChain creates a new test block chain with the given height func SetupBlockChain( ctx context.Context, - t *testing.T, + t TestingT, loader ipld.Loader, storer ipld.Storer, size uint64, diff --git a/testutil/testutil.go b/testutil/testutil.go index 3ca91359..baf20b08 100644 --- a/testutil/testutil.go +++ b/testutil/testutil.go @@ -3,6 +3,7 @@ package testutil import ( "bytes" "context" + "fmt" "math/rand" "testing" @@ -61,7 +62,7 @@ func GeneratePeers(n int) []peer.ID { peerIds := make([]peer.ID, 0, n) for i := 0; i < n; i++ { peerSeq++ - p := peer.ID(peerSeq) + p := peer.ID(fmt.Sprint(peerSeq)) peerIds = append(peerIds, p) } return peerIds @@ -78,12 +79,12 @@ func ContainsPeer(peers []peer.ID, p peer.ID) bool { } // AssertContainsPeer will fail a test if the peer is not in the given peer list -func AssertContainsPeer(t *testing.T, peers []peer.ID, p peer.ID) { +func AssertContainsPeer(t TestingT, peers []peer.ID, p peer.ID) { require.True(t, ContainsPeer(peers, p), "given peer should be in list") } // RefuteContainsPeer will fail a test if the peer is in the given peer list -func RefuteContainsPeer(t *testing.T, peers []peer.ID, p peer.ID) { +func RefuteContainsPeer(t TestingT, peers []peer.ID, p peer.ID) { require.False(t, ContainsPeer(peers, p), "given peer should not be in list") } @@ -103,18 +104,18 @@ func ContainsBlock(blks []blocks.Block, block blocks.Block) bool { } // AssertContainsBlock will fail a test if the block is not in the given block list -func AssertContainsBlock(t *testing.T, blks []blocks.Block, block blocks.Block) { +func AssertContainsBlock(t TestingT, blks []blocks.Block, block blocks.Block) { require.True(t, ContainsBlock(blks, block), "given block should be in list") } // RefuteContainsBlock will fail a test if the block is in the given block list -func RefuteContainsBlock(t *testing.T, blks []blocks.Block, block blocks.Block) { +func RefuteContainsBlock(t TestingT, blks []blocks.Block, block blocks.Block) { require.False(t, ContainsBlock(blks, block), "given block should not be in list") } // CollectResponses is just a utility to convert a graphsync response progress // channel into an array. -func CollectResponses(ctx context.Context, t *testing.T, responseChan <-chan graphsync.ResponseProgress) []graphsync.ResponseProgress { +func CollectResponses(ctx context.Context, t TestingT, responseChan <-chan graphsync.ResponseProgress) []graphsync.ResponseProgress { var collectedBlocks []graphsync.ResponseProgress for { select { @@ -147,7 +148,7 @@ func CollectErrors(ctx context.Context, t *testing.T, errChan <-chan error) []er // ReadNResponses does a partial read from a ResponseProgress channel -- up // to n values -func ReadNResponses(ctx context.Context, t *testing.T, responseChan <-chan graphsync.ResponseProgress, count int) []graphsync.ResponseProgress { +func ReadNResponses(ctx context.Context, t TestingT, responseChan <-chan graphsync.ResponseProgress, count int) []graphsync.ResponseProgress { var returnedBlocks []graphsync.ResponseProgress for i := 0; i < count; i++ { select { @@ -162,7 +163,7 @@ func ReadNResponses(ctx context.Context, t *testing.T, responseChan <-chan graph // VerifySingleTerminalError verifies that exactly one error was sent over a channel // and then the channel was closed. -func VerifySingleTerminalError(ctx context.Context, t *testing.T, errChan <-chan error) { +func VerifySingleTerminalError(ctx context.Context, t TestingT, errChan <-chan error) { var err error AssertReceive(ctx, t, errChan, &err, "should receive an error") select { @@ -174,7 +175,7 @@ func VerifySingleTerminalError(ctx context.Context, t *testing.T, errChan <-chan } // VerifyHasErrors verifies that at least one error was sent over a channel -func VerifyHasErrors(ctx context.Context, t *testing.T, errChan <-chan error) { +func VerifyHasErrors(ctx context.Context, t TestingT, errChan <-chan error) { errCount := 0 for { select { @@ -192,7 +193,7 @@ func VerifyHasErrors(ctx context.Context, t *testing.T, errChan <-chan error) { // VerifyEmptyErrors verifies that no errors were sent over a channel before // it was closed -func VerifyEmptyErrors(ctx context.Context, t *testing.T, errChan <-chan error) { +func VerifyEmptyErrors(ctx context.Context, t TestingT, errChan <-chan error) { for { select { case _, ok := <-errChan: @@ -208,7 +209,7 @@ func VerifyEmptyErrors(ctx context.Context, t *testing.T, errChan <-chan error) // VerifyEmptyResponse verifies that no response progress happened before the // channel was closed. -func VerifyEmptyResponse(ctx context.Context, t *testing.T, responseChan <-chan graphsync.ResponseProgress) { +func VerifyEmptyResponse(ctx context.Context, t TestingT, responseChan <-chan graphsync.ResponseProgress) { for { select { case _, ok := <-responseChan: