Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
network/syncer: initial commit
Browse files Browse the repository at this point in the history
add handlers.go file, syncer stream maintainance logic

copy subscription diff functionality from old syncer

network/syncer: add stream req handler

network/syncer: add StreamInfoReq handler

network/syncer: create initial StreamInfo message exchange

remove peer
  • Loading branch information
acud committed Jun 20, 2019
1 parent 8afb316 commit 6815d14
Show file tree
Hide file tree
Showing 6 changed files with 714 additions and 0 deletions.
19 changes: 19 additions & 0 deletions network/syncer/handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2019 The Swarm Authors
// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.

package syncer

//StreamInfoReq
153 changes: 153 additions & 0 deletions network/syncer/peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright 2019 The Swarm Authors
// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.

package syncer

import (
"context"
"errors"
"fmt"

"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/network"
)

// ErrMaxPeerServers will be returned if peer server limit is reached.
// It will be sent in the SubscribeErrorMsg.
var ErrMaxPeerServers = errors.New("max peer servers")

// Peer is the Peer extension for the streaming protocol
type Peer struct {
*network.BzzPeer
streamCursors map[uint]uint //key: bin, value: session cursor
syncer *SwarmSyncer

quit chan struct{}
}

// NewPeer is the constructor for Peer
func NewPeer(peer *network.BzzPeer, s *SwarmSyncer) *Peer {
p := &Peer{
BzzPeer: peer,
streamCursors: make(map[uint]uint),
syncer: s,
quit: make(chan struct{}),
}
return p
}

func (p *Peer) Left() {
close(p.quit)
}

// HandleMsg is the message handler that delegates incoming messages
func (p *Peer) HandleMsg(ctx context.Context, msg interface{}) error {
switch msg := msg.(type) {
case *StreamInfoReq:
go p.handleStreamInfoReq(ctx, msg)
case *StreamInfoRes:
go p.handleStreamInfoRes(ctx, msg)

default:
return fmt.Errorf("unknown message type: %T", msg)
}
return nil
}

func (p *Peer) handleStreamInfoRes(ctx context.Context, msg *StreamInfoRes) {
log.Debug("handleStreamInfoRes", "msg", msg)
}
func (p *Peer) handleStreamInfoReq(ctx context.Context, msg *StreamInfoReq) {
log.Debug("handleStreamInfoReq", "msg", msg)
streamRes := StreamInfoRes{}

for _, v := range msg.Streams {
streamCursor, err := p.syncer.netStore.LastPullSubscriptionBinID(uint8(v))
if err != nil {
log.Error("error getting last bin id", "bin", v)
}
descriptor := StreamDescriptor{
Name: "SYNC",
Cursor: streamCursor,
Bounded: false,
}
streamRes.Streams = append(streamRes.Streams, descriptor)
}
if err := p.Send(ctx, streamRes); err != nil {
log.Error("failed to send StreamInfoRes to client", "requested bins", msg.Streams)
}
}

// syncSubscriptionsDiff calculates to which proximity order bins a peer
// (with po peerPO) needs to be subscribed after kademlia neighbourhood depth
// change from prevDepth to newDepth. Max argument limits the number of
// proximity order bins. Returned values are slices of integers which represent
// proximity order bins, the first one to which additional subscriptions need to
// be requested and the second one which subscriptions need to be quit. Argument
// prevDepth with value less then 0 represents no previous depth, used for
// initial syncing subscriptions.
func syncSubscriptionsDiff(peerPO, prevDepth, newDepth, max int) (subBins, quitBins []uint) {
newStart, newEnd := syncBins(peerPO, newDepth, max)
if prevDepth < 0 {
// no previous depth, return the complete range
// for subscriptions requests and nothing for quitting
return intRange(newStart, newEnd), nil
}

prevStart, prevEnd := syncBins(peerPO, prevDepth, max)

if newStart < prevStart {
subBins = append(subBins, intRange(newStart, prevStart)...)
}

if prevStart < newStart {
quitBins = append(quitBins, intRange(prevStart, newStart)...)
}

if newEnd < prevEnd {
quitBins = append(quitBins, intRange(newEnd, prevEnd)...)
}

if prevEnd < newEnd {
subBins = append(subBins, intRange(prevEnd, newEnd)...)
}

return subBins, quitBins
}

// syncBins returns the range to which proximity order bins syncing
// subscriptions need to be requested, based on peer proximity and
// kademlia neighbourhood depth. Returned range is [start,end), inclusive for
// start and exclusive for end.
func syncBins(peerPO, depth, max int) (start, end int) {
if peerPO < depth {
// subscribe only to peerPO bin if it is not
// in the nearest neighbourhood
return peerPO, peerPO + 1
}
// subscribe from depth to max bin if the peer
// is in the nearest neighbourhood
return depth, max + 1
}

// intRange returns the slice of integers [start,end). The start
// is inclusive and the end is not.
func intRange(start, end int) (r []uint) {
for i := start; i < end; i++ {
r = append(r, uint(i))
}
return r
}
116 changes: 116 additions & 0 deletions network/syncer/peer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright 2019 The Swarm Authors
// This file is part of the Swarm library.
//
// The Swarm library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The Swarm library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.

package syncer

import (
"fmt"
"testing"

"github.com/ethersphere/swarm/network"
)

// TestSyncSubscriptionsDiff validates the output of syncSubscriptionsDiff
// function for various arguments.
func TestSyncSubscriptionsDiff(t *testing.T) {
max := network.NewKadParams().MaxProxDisplay
for _, tc := range []struct {
po, prevDepth, newDepth int
subBins, quitBins []int
}{
{
po: 0, prevDepth: -1, newDepth: 0,
subBins: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
},
{
po: 1, prevDepth: -1, newDepth: 0,
subBins: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
},
{
po: 2, prevDepth: -1, newDepth: 0,
subBins: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
},
{
po: 0, prevDepth: -1, newDepth: 1,
subBins: []int{0},
},
{
po: 1, prevDepth: -1, newDepth: 1,
subBins: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
},
{
po: 2, prevDepth: -1, newDepth: 2,
subBins: []int{2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
},
{
po: 3, prevDepth: -1, newDepth: 2,
subBins: []int{2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
},
{
po: 1, prevDepth: -1, newDepth: 2,
subBins: []int{1},
},
{
po: 0, prevDepth: 0, newDepth: 0, // 0-16 -> 0-16
},
{
po: 1, prevDepth: 0, newDepth: 0, // 0-16 -> 0-16
},
{
po: 0, prevDepth: 0, newDepth: 1, // 0-16 -> 0
quitBins: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
},
{
po: 0, prevDepth: 0, newDepth: 2, // 0-16 -> 0
quitBins: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
},
{
po: 1, prevDepth: 0, newDepth: 1, // 0-16 -> 1-16
quitBins: []int{0},
},
{
po: 1, prevDepth: 1, newDepth: 0, // 1-16 -> 0-16
subBins: []int{0},
},
{
po: 4, prevDepth: 0, newDepth: 1, // 0-16 -> 1-16
quitBins: []int{0},
},
{
po: 4, prevDepth: 0, newDepth: 4, // 0-16 -> 4-16
quitBins: []int{0, 1, 2, 3},
},
{
po: 4, prevDepth: 0, newDepth: 5, // 0-16 -> 4
quitBins: []int{0, 1, 2, 3, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
},
{
po: 4, prevDepth: 5, newDepth: 0, // 4 -> 0-16
subBins: []int{0, 1, 2, 3, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
},
{
po: 4, prevDepth: 5, newDepth: 6, // 4 -> 4
},
} {
subBins, quitBins := syncSubscriptionsDiff(tc.po, tc.prevDepth, tc.newDepth, max)
if fmt.Sprint(subBins) != fmt.Sprint(tc.subBins) {
t.Errorf("po: %v, prevDepth: %v, newDepth: %v: got subBins %v, want %v", tc.po, tc.prevDepth, tc.newDepth, subBins, tc.subBins)
}
if fmt.Sprint(quitBins) != fmt.Sprint(tc.quitBins) {
t.Errorf("po: %v, prevDepth: %v, newDepth: %v: got quitBins %v, want %v", tc.po, tc.prevDepth, tc.newDepth, quitBins, tc.quitBins)
}
}
}
Loading

0 comments on commit 6815d14

Please sign in to comment.