Skip to content

Commit

Permalink
swarm/network/stream: remove HandoverProof
Browse files Browse the repository at this point in the history
  • Loading branch information
nonsense committed May 31, 2019
1 parent 654a51a commit e9f4d1a
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 68 deletions.
7 changes: 3 additions & 4 deletions swarm/network/stream/delivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,9 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
{ //to which the peer responds with offered hashes
Code: 1,
Msg: &OfferedHashesMsg{
HandoverProof: nil,
Hashes: nil,
From: 0,
To: 0,
Hashes: nil,
From: 0,
To: 0,
},
Peer: node.ID(),
},
Expand Down
4 changes: 2 additions & 2 deletions swarm/network/stream/intervals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,15 +339,15 @@ func (s *testExternalServer) SessionIndex() (uint64, error) {
return s.sessionAt, nil
}

func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, error) {
if to > s.maxKeys {
to = s.maxKeys
}
b := make([]byte, HashSize*(to-from+1))
for i := from; i <= to; i++ {
s.keyFunc(b[(i-from)*HashSize:(i-from+1)*HashSize], i)
}
return b, from, to, nil, nil
return b, from, to, nil
}

func (s *testExternalServer) GetData(context.Context, []byte) ([]byte, error) {
Expand Down
13 changes: 3 additions & 10 deletions swarm/network/stream/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,9 @@ func (p *Peer) handleQuitMsg(req *QuitMsg) error {
// OfferedHashesMsg is the protocol msg for offering to hand over a
// stream section
type OfferedHashesMsg struct {
Stream Stream // name of Stream
From, To uint64 // peer and db-specific entry count
Hashes []byte // stream of hashes (128)
*HandoverProof // HandoverProof
Stream Stream // name of Stream
From, To uint64 // peer and db-specific entry count
Hashes []byte // stream of hashes (128)
}

// String pretty prints OfferedHashesMsg
Expand Down Expand Up @@ -385,12 +384,6 @@ type Handover struct {
Root []byte // Root hash for indexed segment inclusion proofs
}

// HandoverProof represents a signed statement that the upstream peer handed over the stream section
type HandoverProof struct {
Sig []byte // Sign(Hash(Serialisation(Handover)))
*Handover
}

// Takeover represents a statement that downstream peer took over (stored all data)
// handed over
type Takeover Handover
Expand Down
16 changes: 5 additions & 11 deletions swarm/network/stream/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,26 +183,20 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error {

defer metrics.GetOrRegisterResettingTimer("send.offered.hashes", nil).UpdateSince(time.Now())

hashes, from, to, proof, err := s.setNextBatch(f, t)
hashes, from, to, err := s.setNextBatch(f, t)
if err != nil {
return err
}
// true only when quitting
if len(hashes) == 0 {
return nil
}
if proof == nil {
proof = &HandoverProof{
Handover: &Handover{},
}
}
s.currentBatch = hashes
msg := &OfferedHashesMsg{
HandoverProof: proof,
Hashes: hashes,
From: from,
To: to,
Stream: s.stream,
Hashes: hashes,
From: from,
To: to,
Stream: s.stream,
}
log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to)
ctx = context.WithValue(ctx, "stream_send_tag", "send.offered.hashes")
Expand Down
6 changes: 3 additions & 3 deletions swarm/network/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ type server struct {
// setNextBatch adjusts passed interval based on session index and whether
// stream is live or history. It calls Server SetNextBatch with adjusted
// interval and returns batch hashes and their interval.
func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, error) {
if s.stream.Live {
if from == 0 {
from = s.sessionIndex
Expand All @@ -484,7 +484,7 @@ func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, *Handove
}
} else {
if (to < from && to != 0) || from > s.sessionIndex {
return nil, 0, 0, nil, nil
return nil, 0, 0, nil
}
if to == 0 || to > s.sessionIndex {
to = s.sessionIndex
Expand All @@ -500,7 +500,7 @@ type Server interface {
// Based on this index, live and history stream intervals
// will be adjusted before calling SetNextBatch.
SessionIndex() (uint64, error)
SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error)
SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, err error)
GetData(context.Context, []byte) ([]byte, error)
Close()
}
Expand Down
37 changes: 2 additions & 35 deletions swarm/network/stream/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ func (s *testServer) SessionIndex() (uint64, error) {
return s.sessionIndex, nil
}

func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
return make([]byte, HashSize), from + 1, to + 1, nil, nil
func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, error) {
return make([]byte, HashSize), from + 1, to + 1, nil
}

func (self *testServer) GetData(context.Context, []byte) ([]byte, error) {
Expand Down Expand Up @@ -179,9 +179,6 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
{
Code: 1,
Msg: &OfferedHashesMsg{
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: hashes,
From: 5,
To: 8,
Expand Down Expand Up @@ -264,9 +261,6 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: stream,
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: make([]byte, HashSize),
From: 6,
To: 9,
Expand Down Expand Up @@ -330,9 +324,6 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: stream,
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: make([]byte, HashSize),
From: 1,
To: 0,
Expand Down Expand Up @@ -441,9 +432,6 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: NewStream("foo", "", false),
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: make([]byte, HashSize),
From: 6,
To: 9,
Expand All @@ -454,9 +442,6 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: stream,
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
From: 11,
To: 0,
Hashes: make([]byte, HashSize),
Expand Down Expand Up @@ -514,9 +499,6 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
{
Code: 1,
Msg: &OfferedHashesMsg{
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: corruptHashes,
From: 5,
To: 8,
Expand Down Expand Up @@ -579,9 +561,6 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
{
Code: 1,
Msg: &OfferedHashesMsg{
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: hashes,
From: 5,
To: 8,
Expand Down Expand Up @@ -670,9 +649,6 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: NewStream("foo", "", false),
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: make([]byte, HashSize),
From: 6,
To: 9,
Expand All @@ -683,9 +659,6 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: stream,
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
From: 11,
To: 0,
Hashes: make([]byte, HashSize),
Expand Down Expand Up @@ -787,9 +760,6 @@ func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: stream,
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: make([]byte, HashSize),
From: 1,
To: 0,
Expand Down Expand Up @@ -891,9 +861,6 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
Code: 1,
Msg: &OfferedHashesMsg{
Stream: stream,
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: make([]byte, HashSize),
From: 1,
To: 0,
Expand Down
6 changes: 3 additions & 3 deletions swarm/network/stream/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (s *SwarmSyncerServer) SessionIndex() (uint64, error) {
// chunk addresses. If at least one chunk is added to the batch and no new chunks
// are added in batchTimeout period, the batch will be returned. This function
// will block until new chunks are received from localstore pull subscription.
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) {
func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, error) {
batchStart := time.Now()
descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to)
defer stop()
Expand Down Expand Up @@ -131,7 +131,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
if err != nil {
metrics.GetOrRegisterCounter("syncer.set-next-batch.set-sync-err", nil).Inc(1)
log.Debug("syncer pull subscription - err setting chunk as synced", "correlateId", s.correlateId, "err", err)
return nil, 0, 0, nil, err
return nil, 0, 0, err
}
batchSize++
if batchStartID == nil {
Expand Down Expand Up @@ -171,7 +171,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6
// if batch start id is not set, return 0
batchStartID = new(uint64)
}
return batch, *batchStartID, batchEndID, nil, nil
return batch, *batchStartID, batchEndID, nil
}

// SwarmSyncerClient
Expand Down

0 comments on commit e9f4d1a

Please sign in to comment.