From e9f4d1aa60eb0289a871d004caa5399cc8434d52 Mon Sep 17 00:00:00 2001 From: Anton Evangelatov Date: Fri, 31 May 2019 12:39:34 +0200 Subject: [PATCH] swarm/network/stream: remove HandoverProof --- swarm/network/stream/delivery_test.go | 7 +++-- swarm/network/stream/intervals_test.go | 4 +-- swarm/network/stream/messages.go | 13 +++------ swarm/network/stream/peer.go | 16 ++++------- swarm/network/stream/stream.go | 6 ++--- swarm/network/stream/streamer_test.go | 37 ++------------------------ swarm/network/stream/syncer.go | 6 ++--- 7 files changed, 21 insertions(+), 68 deletions(-) diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index fc0f9d5dfc2a..32c6109abf1a 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -72,10 +72,9 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) { { //to which the peer responds with offered hashes Code: 1, Msg: &OfferedHashesMsg{ - HandoverProof: nil, - Hashes: nil, - From: 0, - To: 0, + Hashes: nil, + From: 0, + To: 0, }, Peer: node.ID(), }, diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index 02f012b2aead..421fb31bbfe3 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -339,7 +339,7 @@ func (s *testExternalServer) SessionIndex() (uint64, error) { return s.sessionAt, nil } -func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { +func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, error) { if to > s.maxKeys { to = s.maxKeys } @@ -347,7 +347,7 @@ func (s *testExternalServer) SetNextBatch(from uint64, to uint64) ([]byte, uint6 for i := from; i <= to; i++ { s.keyFunc(b[(i-from)*HashSize:(i-from+1)*HashSize], i) } - return b, from, to, nil, nil + return b, from, to, nil } func (s *testExternalServer) GetData(context.Context, []byte) ([]byte, error) { diff --git a/swarm/network/stream/messages.go b/swarm/network/stream/messages.go index 830bcae247ec..3ed4ce40b78a 100644 --- a/swarm/network/stream/messages.go +++ b/swarm/network/stream/messages.go @@ -183,10 +183,9 @@ func (p *Peer) handleQuitMsg(req *QuitMsg) error { // OfferedHashesMsg is the protocol msg for offering to hand over a // stream section type OfferedHashesMsg struct { - Stream Stream // name of Stream - From, To uint64 // peer and db-specific entry count - Hashes []byte // stream of hashes (128) - *HandoverProof // HandoverProof + Stream Stream // name of Stream + From, To uint64 // peer and db-specific entry count + Hashes []byte // stream of hashes (128) } // String pretty prints OfferedHashesMsg @@ -385,12 +384,6 @@ type Handover struct { Root []byte // Root hash for indexed segment inclusion proofs } -// HandoverProof represents a signed statement that the upstream peer handed over the stream section -type HandoverProof struct { - Sig []byte // Sign(Hash(Serialisation(Handover))) - *Handover -} - // Takeover represents a statement that downstream peer took over (stored all data) // handed over type Takeover Handover diff --git a/swarm/network/stream/peer.go b/swarm/network/stream/peer.go index 28fd06e4d510..900ee453358b 100644 --- a/swarm/network/stream/peer.go +++ b/swarm/network/stream/peer.go @@ -183,7 +183,7 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error { defer metrics.GetOrRegisterResettingTimer("send.offered.hashes", nil).UpdateSince(time.Now()) - hashes, from, to, proof, err := s.setNextBatch(f, t) + hashes, from, to, err := s.setNextBatch(f, t) if err != nil { return err } @@ -191,18 +191,12 @@ func (p *Peer) SendOfferedHashes(s *server, f, t uint64) error { if len(hashes) == 0 { return nil } - if proof == nil { - proof = &HandoverProof{ - Handover: &Handover{}, - } - } s.currentBatch = hashes msg := &OfferedHashesMsg{ - HandoverProof: proof, - Hashes: hashes, - From: from, - To: to, - Stream: s.stream, + Hashes: hashes, + From: from, + To: to, + Stream: s.stream, } log.Trace("Swarm syncer offer batch", "peer", p.ID(), "stream", s.stream, "len", len(hashes), "from", from, "to", to) ctx = context.WithValue(ctx, "stream_send_tag", "send.offered.hashes") diff --git a/swarm/network/stream/stream.go b/swarm/network/stream/stream.go index ee461b7b2776..38e4130465ea 100644 --- a/swarm/network/stream/stream.go +++ b/swarm/network/stream/stream.go @@ -474,7 +474,7 @@ type server struct { // setNextBatch adjusts passed interval based on session index and whether // stream is live or history. It calls Server SetNextBatch with adjusted // interval and returns batch hashes and their interval. -func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { +func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, error) { if s.stream.Live { if from == 0 { from = s.sessionIndex @@ -484,7 +484,7 @@ func (s *server) setNextBatch(from, to uint64) ([]byte, uint64, uint64, *Handove } } else { if (to < from && to != 0) || from > s.sessionIndex { - return nil, 0, 0, nil, nil + return nil, 0, 0, nil } if to == 0 || to > s.sessionIndex { to = s.sessionIndex @@ -500,7 +500,7 @@ type Server interface { // Based on this index, live and history stream intervals // will be adjusted before calling SetNextBatch. SessionIndex() (uint64, error) - SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, proof *HandoverProof, err error) + SetNextBatch(uint64, uint64) (hashes []byte, from uint64, to uint64, err error) GetData(context.Context, []byte) ([]byte, error) Close() } diff --git a/swarm/network/stream/streamer_test.go b/swarm/network/stream/streamer_test.go index 9602bc6da29a..b9b7f6e8df07 100644 --- a/swarm/network/stream/streamer_test.go +++ b/swarm/network/stream/streamer_test.go @@ -127,8 +127,8 @@ func (s *testServer) SessionIndex() (uint64, error) { return s.sessionIndex, nil } -func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { - return make([]byte, HashSize), from + 1, to + 1, nil, nil +func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, error) { + return make([]byte, HashSize), from + 1, to + 1, nil } func (self *testServer) GetData(context.Context, []byte) ([]byte, error) { @@ -179,9 +179,6 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { { Code: 1, Msg: &OfferedHashesMsg{ - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, Hashes: hashes, From: 5, To: 8, @@ -264,9 +261,6 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { Code: 1, Msg: &OfferedHashesMsg{ Stream: stream, - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, Hashes: make([]byte, HashSize), From: 6, To: 9, @@ -330,9 +324,6 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { Code: 1, Msg: &OfferedHashesMsg{ Stream: stream, - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, Hashes: make([]byte, HashSize), From: 1, To: 0, @@ -441,9 +432,6 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { Code: 1, Msg: &OfferedHashesMsg{ Stream: NewStream("foo", "", false), - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, Hashes: make([]byte, HashSize), From: 6, To: 9, @@ -454,9 +442,6 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { Code: 1, Msg: &OfferedHashesMsg{ Stream: stream, - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, From: 11, To: 0, Hashes: make([]byte, HashSize), @@ -514,9 +499,6 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) { { Code: 1, Msg: &OfferedHashesMsg{ - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, Hashes: corruptHashes, From: 5, To: 8, @@ -579,9 +561,6 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { { Code: 1, Msg: &OfferedHashesMsg{ - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, Hashes: hashes, From: 5, To: 8, @@ -670,9 +649,6 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { Code: 1, Msg: &OfferedHashesMsg{ Stream: NewStream("foo", "", false), - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, Hashes: make([]byte, HashSize), From: 6, To: 9, @@ -683,9 +659,6 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { Code: 1, Msg: &OfferedHashesMsg{ Stream: stream, - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, From: 11, To: 0, Hashes: make([]byte, HashSize), @@ -787,9 +760,6 @@ func TestMaxPeerServersWithUnsubscribe(t *testing.T) { Code: 1, Msg: &OfferedHashesMsg{ Stream: stream, - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, Hashes: make([]byte, HashSize), From: 1, To: 0, @@ -891,9 +861,6 @@ func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) { Code: 1, Msg: &OfferedHashesMsg{ Stream: stream, - HandoverProof: &HandoverProof{ - Handover: &Handover{}, - }, Hashes: make([]byte, HashSize), From: 1, To: 0, diff --git a/swarm/network/stream/syncer.go b/swarm/network/stream/syncer.go index e8359ef70ed8..2b29ee358f80 100644 --- a/swarm/network/stream/syncer.go +++ b/swarm/network/stream/syncer.go @@ -92,7 +92,7 @@ func (s *SwarmSyncerServer) SessionIndex() (uint64, error) { // chunk addresses. If at least one chunk is added to the batch and no new chunks // are added in batchTimeout period, the batch will be returned. This function // will block until new chunks are received from localstore pull subscription. -func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { +func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint64, error) { batchStart := time.Now() descriptors, stop := s.netStore.SubscribePull(context.Background(), s.po, from, to) defer stop() @@ -131,7 +131,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 if err != nil { metrics.GetOrRegisterCounter("syncer.set-next-batch.set-sync-err", nil).Inc(1) log.Debug("syncer pull subscription - err setting chunk as synced", "correlateId", s.correlateId, "err", err) - return nil, 0, 0, nil, err + return nil, 0, 0, err } batchSize++ if batchStartID == nil { @@ -171,7 +171,7 @@ func (s *SwarmSyncerServer) SetNextBatch(from, to uint64) ([]byte, uint64, uint6 // if batch start id is not set, return 0 batchStartID = new(uint64) } - return batch, *batchStartID, batchEndID, nil, nil + return batch, *batchStartID, batchEndID, nil } // SwarmSyncerClient