Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm/network/stream: remove dead code #19650

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions swarm/network/stream/delivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,9 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
{ //to which the peer responds with offered hashes
Code: 1,
Msg: &OfferedHashesMsg{
HandoverProof: nil,
Hashes: nil,
From: 0,
To: 0,
Hashes: nil,
From: 0,
To: 0,
},
Peer: node.ID(),
},
Expand Down
8 changes: 2 additions & 6 deletions swarm/network/stream/intervals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,6 @@ func (c *testExternalClient) NeedData(ctx context.Context, hash []byte) func(con
return wait
}

func (c *testExternalClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
return nil
}

func (c *testExternalClient) Close() {}

type testExternalServer struct {
Expand All @@ -343,15 +339,15 @@ func (s *testExternalServer) SessionIndex() (uint64, error) {
return s.sessionAt, nil
}

func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, error) {
if to > s.maxKeys {
to = s.maxKeys
}
b := make([]byte, HashSize*(to-from+1))
for i := from; i <= to; i++ {
s.keyFunc(b[(i-from)*HashSize:(i-from+1)*HashSize], i)
}
return b, from, to, nil, nil
return b, from, to, nil
}

func (s *testExternalServer) GetData(context.Context, []byte) ([]byte, error) {
Expand Down
15 changes: 4 additions & 11 deletions swarm/network/stream/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,9 @@ func (p *Peer) handleQuitMsg(req *QuitMsg) error {
// OfferedHashesMsg is the protocol msg for offering to hand over a
// stream section
type OfferedHashesMsg struct {
Stream Stream // name of Stream
From, To uint64 // peer and db-specific entry count
Hashes []byte // stream of hashes (128)
*HandoverProof // HandoverProof
Stream Stream // name of Stream
From, To uint64 // peer and db-specific entry count
Hashes []byte // stream of hashes (128)
}

// String pretty prints OfferedHashesMsg
Expand Down Expand Up @@ -265,7 +264,7 @@ func (p *Peer) handleOfferedHashesMsg(ctx context.Context, req *OfferedHashesMsg
}
}
select {
case c.next <- c.batchDone(p, req, hashes):
case c.next <- c.AddInterval(req.From, req.To):
case <-c.quit:
log.Debug("client.handleOfferedHashesMsg() quit")
case <-ctx.Done():
Expand Down Expand Up @@ -385,12 +384,6 @@ type Handover struct {
Root []byte // Root hash for indexed segment inclusion proofs
}

// HandoverProof represents a signed statement that the upstream peer handed over the stream section
type HandoverProof struct {
Sig []byte // Sign(Hash(Serialisation(Handover)))
*Handover
}

// Takeover represents a statement that downstream peer took over (stored all data)
nonsense marked this conversation as resolved.
Show resolved Hide resolved
// handed over
type Takeover Handover
Expand Down
16 changes: 5 additions & 11 deletions swarm/network/stream/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,26 +183,20 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {

defer metrics.GetOrRegisterResettingTimer("send.offered.hashes", nil).UpdateSince(time.Now())

hashes, from, to, proof, err := s.setNextBatch(f, t)
hashes, from, to, err := s.setNextBatch(f, t)
if err != nil {
return err
}
// true only when quitting
if len(hashes) == 0 {
return nil
}
if proof == nil {
proof = &HandoverProof{
Handover: &Handover{},
}
}
s.currentBatch = hashes
msg := &OfferedHashesMsg{
HandoverProof: proof,
Hashes: hashes,
From: from,
To: to,
Stream: s.stream,
Hashes: hashes,
From: from,
To: to,
Stream: s.stream,
}
log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to)
ctx = context.WithValue(ctx, "stream_send_tag", "send.offered.hashes")
Expand Down
25 changes: 3 additions & 22 deletions swarm/network/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ type server struct {
// setNextBatch adjusts passed interval based on session index and whether
Copy link
Member Author

@nonsense nonsense May 31, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@justelad I think we should extract this logic and have it unit tested. Basically the functionality which adjusts the input intervals and uses the adjusted intervals is not clear and not tested, which is weird, as there are very little input parameters to it:

  1. from
  2. to
  3. sessionIndex
  4. stream type (live or history)

This should have been unit tested, at least for the purpose of documentation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function should not even exist IMO. not sure why we would ever want to override a value. e.g. when syncing is live and from is 0... but i think that with the first offered hashes msg the client already gets the actual session index from the server with the from range specified on the message. a lot of the code here is not needed, some of it also should return errors directly (which it doesn't)

// stream is live or history. It calls Server SetNextBatch with adjusted
// interval and returns batch hashes and their interval.
func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, error) {
if s.stream.Live {
if from == 0 {
from = s.sessionIndex
Expand All @@ -484,7 +484,7 @@ func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, *Handove
}
} else {
if (to < from && to != 0) || from > s.sessionIndex {
return nil, 0, 0, nil, nil
return nil, 0, 0, nil
}
if to == 0 || to > s.sessionIndex {
to = s.sessionIndex
Expand All @@ -500,7 +500,7 @@ type Server interface {
// Based on this index, live and history stream intervals
// will be adjusted before calling SetNextBatch.
SessionIndex() (uint64, error)
SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)
SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, err error)
GetData(context.Context, []byte) ([]byte, error)
Close()
}
Expand Down Expand Up @@ -544,7 +544,6 @@ func (c *client) NextInterval() (start, end uint64, err error) {
// Client interface for incoming peer Streamer
type Client interface {
NeedData(context.Context, []byte) func(context.Context) error
BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error)
Close()
}

Expand Down Expand Up @@ -574,24 +573,6 @@ func (c *client) nextBatch(from uint64) (nextFrom uint64, nextTo uint64) {
return
}

func (c *client) batchDone(p *Peer, req *OfferedHashesMsg, hashes []byte) error {
if tf := c.BatchDone(req.Stream, req.From, hashes, req.Root); tf != nil {
tp, err := tf()
if err != nil {
return err
}

if err := p.Send(context.TODO(), tp); err != nil {
return err
}
if c.to > 0 && tp.Takeover.End >= c.to {
return p.streamer.Unsubscribe(p.Peer.ID(), req.Stream)
}
return nil
}
return c.AddInterval(req.From, req.To)
}

func (c *client) close() {
select {
case <-c.quit:
Expand Down
63 changes: 3 additions & 60 deletions swarm/network/stream/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ type testClient struct {
t string
wait0 chan bool
wait2 chan bool
batchDone chan bool
receivedHashes map[string][]byte
}

Expand All @@ -90,7 +89,6 @@ func newTestClient(t string) *testClient {
t: t,
wait0: make(chan bool),
wait2: make(chan bool),
batchDone: make(chan bool),
receivedHashes: make(map[string][]byte),
}
}
Expand All @@ -111,11 +109,6 @@ func (self *testClient) NeedData(ctx context.Context, hash []byte) func(context.
return nil
}

func (self *testClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) {
close(self.batchDone)
return nil
}

func (self *testClient) Close() {}

type testServer struct {
Expand All @@ -134,8 +127,8 @@ func (s *testServer) SessionIndex() (uint64, error) {
return s.sessionIndex, nil
}

func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
return make([]byte, HashSize), from + 1, to + 1, nil, nil
func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, error) {
return make([]byte, HashSize), from + 1, to + 1, nil
}

func (self *testServer) GetData(context.Context, []byte) ([]byte, error) {
Expand Down Expand Up @@ -186,9 +179,6 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
{
Code: 1,
Msg: &OfferedHashesMsg{
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: hashes,
From: 5,
To: 8,
Expand Down Expand Up @@ -271,9 +261,6 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: stream,
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: make([]byte, HashSize),
From: 6,
To: 9,
Expand Down Expand Up @@ -337,9 +324,6 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: stream,
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: make([]byte, HashSize),
From: 1,
To: 0,
Expand Down Expand Up @@ -448,9 +432,6 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: NewStream("foo", "", false),
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: make([]byte, HashSize),
From: 6,
To: 9,
Expand All @@ -461,9 +442,6 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: stream,
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
From: 11,
To: 0,
Hashes: make([]byte, HashSize),
Expand Down Expand Up @@ -521,9 +499,6 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
{
Code: 1,
Msg: &OfferedHashesMsg{
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: corruptHashes,
From: 5,
To: 8,
Expand Down Expand Up @@ -586,9 +561,6 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
{
Code: 1,
Msg: &OfferedHashesMsg{
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: hashes,
From: 5,
To: 8,
Expand Down Expand Up @@ -620,26 +592,9 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {

close(tc.wait0)

timeout := time.NewTimer(100 * time.Millisecond)
defer timeout.Stop()

select {
case <-tc.batchDone:
t.Fatal("batch done early")
case <-timeout.C:
}
time.Sleep(100 * time.Millisecond)

close(tc.wait2)

timeout2 := time.NewTimer(10000 * time.Millisecond)
defer timeout2.Stop()

select {
case <-tc.batchDone:
case <-timeout2.C:
t.Fatal("timeout waiting batchdone call")
}

}

func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
Expand Down Expand Up @@ -694,9 +649,6 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: NewStream("foo", "", false),
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: make([]byte, HashSize),
From: 6,
To: 9,
Expand All @@ -707,9 +659,6 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: stream,
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
From: 11,
To: 0,
Hashes: make([]byte, HashSize),
Expand Down Expand Up @@ -811,9 +760,6 @@ func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: stream,
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: make([]byte, HashSize),
From: 1,
To: 0,
Expand Down Expand Up @@ -915,9 +861,6 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: stream,
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: make([]byte, HashSize),
From: 1,
To: 0,
Expand Down
15 changes: 3 additions & 12 deletions swarm/network/stream/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
// chunk addresses. If at least one chunk is added to the batch and no new chunks
// are added in batchTimeout period, the batch will be returned. This function
// will block until new chunks are received from localstore pull subscription.
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, error) {
batchStart := time.Now()
descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to)
defer stop()
Expand Down Expand Up @@ -131,7 +131,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
if err != nil {
metrics.GetOrRegisterCounter("syncer.set-next-batch.set-sync-err", nil).Inc(1)
log.Debug("syncer pull subscription - err setting chunk as synced", "correlateId", s.correlateId, "err", err)
return nil, 0, 0, nil, err
return nil, 0, 0, err
}
batchSize++
if batchStartID == nil {
Expand Down Expand Up @@ -171,7 +171,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
// if batch start id is not set, return 0
batchStartID = new(uint64)
}
return batch, *batchStartID, batchEndID, nil, nil
return batch, *batchStartID, batchEndID, nil
}

// SwarmSyncerClient
Expand Down Expand Up @@ -203,15 +203,6 @@ func (s *SwarmSyncerClient) NeedData(ctx context.Context, key []byte) (wait func
return s.netStore.FetchFunc(ctx, key)
}

// BatchDone
func (s *SwarmSyncerClient) BatchDone(stream Stream, from uint64, hashes []byte, root []byte) func() (*TakeoverProof, error) {
// TODO: reenable this with putter/getter refactored code
// if s.chunker != nil {
// return func() (*TakeoverProof, error) { return s.TakeoverProof(stream, from, hashes, root) }
// }
return nil
}

func (s *SwarmSyncerClient) Close() {}

// base for parsing and formating sync bin key
Expand Down