implement kademlia
Stebalien committed May 3, 2019
1 parent 9e1a65c commit 317a78e
Showing 3 changed files with 198 additions and 119 deletions.
50 changes: 24 additions & 26 deletions lookup.go
@@ -10,6 +10,7 @@ import (
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
kb "github.com/libp2p/go-libp2p-kbucket"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
notif "github.com/libp2p/go-libp2p-routing/notifications"
)

@@ -62,10 +63,30 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {

out := make(chan peer.ID, KValue)

query := dht.newClosestPeersQuery(ctx, key, nil)

go func() {
defer close(out)
defer e.Done()
// run it!
res, err := query.Run(ctx, tablepeers)
if err != nil {
logger.Debugf("closestPeers query run error: %s", err)
}

for _, p := range res {
out <- p
}
}()

return out, nil
}
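
The rewritten GetClosestPeers no longer sorts and truncates results itself; that work moves into the query runner's finish phase (see query.go below). A minimal sketch of how a caller might drain the returned channel, assuming a ready *IpfsDHT (illustrative only, not part of this commit):

package example

import (
	"context"

	dht "github.com/libp2p/go-libp2p-kad-dht"
	peer "github.com/libp2p/go-libp2p-peer"
)

// collectClosest drains the channel returned by GetClosestPeers. The
// channel is closed when the query finishes, so ranging over it blocks
// until then and yields at most KValue peers.
func collectClosest(ctx context.Context, d *dht.IpfsDHT, key string) ([]peer.ID, error) {
	ch, err := d.GetClosestPeers(ctx, key)
	if err != nil {
		return nil, err
	}
	var closest []peer.ID
	for p := range ch {
		closest = append(closest, p)
	}
	return closest, ctx.Err()
}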

func (dht *IpfsDHT) newClosestPeersQuery(ctx context.Context, key string, finish finishFunc) *dhtQuery {
// since the query doesn't actually pass our context down
// we have to hack this here. whyrusleeping isn't a huge fan of goprocess
parent := ctx
query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
return dht.newQuery(key, func(ctx context.Context, p peer.ID) ([]*pstore.PeerInfo, error) {
// For DHT query command
notif.PublishQueryEvent(parent, &notif.QueryEvent{
Type: notif.SendingQuery,
@@ -86,29 +107,6 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan peer.ID, error) {
Responses: peers,
})

return &dhtQueryResult{closerPeers: peers}, nil
})

go func() {
defer close(out)
defer e.Done()
// run it!
res, err := query.Run(ctx, tablepeers)
if err != nil {
logger.Debugf("closestPeers query run error: %s", err)
}

if len(res) > 0 {
sorted := kb.SortClosestPeers(res, kb.ConvertKey(key))
if len(sorted) > KValue {
sorted = sorted[:KValue]
}

for _, p := range sorted {
out <- p
}
}
}()

return out, nil
return peers, nil
}, finish)
}
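
The new third parameter to newClosestPeersQuery is the finishFunc defined in query.go below. GetClosestPeers passes nil, so only the recursive phase runs. A hypothetical in-package caller that needs to act on each of the closest peers would pass a callback instead. In this fragment, storeWith and seedPeers are illustrative placeholders, not part of this commit:

// Hypothetical in-package caller (sketch): act on the closest peers
// once the lookup converges. storeWith is an illustrative placeholder.
query := dht.newClosestPeersQuery(ctx, key, func(ctx context.Context, p peer.ID) error {
	// Terminal step against one selected peer; a non-nil error marks p
	// as failed and the runner falls back to the next-closest candidate.
	return storeWith(ctx, p, key)
})
res, err := query.Run(ctx, seedPeers) // seedPeers: e.g. from the routing table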
177 changes: 138 additions & 39 deletions query.go
@@ -27,33 +27,35 @@ var maxQueryConcurrency = AlphaValue

type dhtQuery struct {
dht *IpfsDHT
key string // the key we're querying for
qfunc queryFunc // the function to execute per peer
concurrency int // the concurrency parameter
key string // the key we're querying for
rfunc recurseFunc // the function to execute per peer when recursing
ffunc finishFunc // the function to execute per peer when finishing
concurrency int // the concurrency parameter
}

type dhtQueryResult struct {
closerPeers []*pstore.PeerInfo // *
type dhtQueryFinishStep struct {
query *dhtQuery
seen, queried, failed []peer.ID
}

// constructs query
func (dht *IpfsDHT) newQuery(k string, f queryFunc) *dhtQuery {
func (dht *IpfsDHT) newQuery(k string, recurse recurseFunc, finish finishFunc) *dhtQuery {
return &dhtQuery{
key: k,
dht: dht,
qfunc: f,
rfunc: recurse,
ffunc: finish,
concurrency: maxQueryConcurrency,
}
}

// QueryFunc is a function that runs a particular query with a given peer.
// It returns either:
// - the value
// - a list of peers potentially better able to serve the query
// - an error
type queryFunc func(context.Context, peer.ID) (*dhtQueryResult, error)
// recurseFunc is a function that runs a particular query with a given peer (when recursing). It
// returns the set of peers to query next.
type recurseFunc func(context.Context, peer.ID) ([]*pstore.PeerInfo, error)

// finishFunc is a function that runs a particular query with a given peer (when finishing).
type finishFunc func(context.Context, peer.ID) error

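The old queryFunc bundled closer peers, values, and errors into one result; the new pair separates the recursion from the terminal step. A compile-time sketch of the two shapes, with placeholder bodies only:

// Sketch of the two callback shapes. recurse feeds the runner more
// candidates; finish performs the query's terminal work.
var _ recurseFunc = func(ctx context.Context, p peer.ID) ([]*pstore.PeerInfo, error) {
	// A real implementation asks p for peers closer to the key.
	return nil, nil
}
var _ finishFunc = func(ctx context.Context, p peer.ID) error {
	// A real implementation runs the operation (e.g. a put) against p.
	return nil
}
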
// Run runs the query at hand. pass in a list of peers to use first.
func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) ([]peer.ID, error) {
if len(peers) == 0 {
logger.Warning("Running query with no peers!")
@@ -66,17 +68,15 @@ func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) ([]peer.ID, error)
default:
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

runner := newQueryRunner(q)
return runner.Run(ctx, peers)
return newQueryRunner(q).run(ctx, peers)
}

type dhtQueryRunner struct {
query *dhtQuery // query to run
peersSeen *pset.PeerSet // all peers queried. prevent querying same peer 2x
kPeers *kpeerset.KPeerSet // k best peers queried.
peersSeen *pset.PeerSet // all peers seen.
peersQueried *pset.PeerSet // all peers successfully queried.
peersFailed *pset.PeerSet // all peers not successfully queried.
aPeers *kpeerset.KPeerSet // k best peers queried.
peersDialed *dialQueue // peers we have dialed to
peersToQuery *queue.ChanQueue // peers remaining to be queried
peersRemaining todoctr.Counter // peersToQuery + currently processing
Expand All @@ -98,7 +98,9 @@ func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
query: q,
peersRemaining: todoctr.NewSyncCounter(),
peersSeen: pset.New(),
kPeers: kpeerset.New(KValue, q.key),
peersQueried: pset.New(),
peersFailed: pset.New(),
aPeers: kpeerset.New(AlphaValue, q.key),
rateLimit: make(chan struct{}, q.concurrency),
peersToQuery: peersToQuery,
proc: proc,
@@ -117,10 +119,21 @@ func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
return r
}

func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) ([]peer.ID, error) {
func (r *dhtQueryRunner) run(ctx context.Context, peers []peer.ID) ([]peer.ID, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

r.log = logger
r.runCtx = ctx

err := r.recurse(ctx, peers)
if err != nil {
return nil, err
}
return r.finish(ctx)
}

func (r *dhtQueryRunner) recurse(ctx context.Context, peers []peer.ID) error {
// setup concurrency rate limiting
for i := 0; i < r.query.concurrency; i++ {
r.rateLimit <- struct{}{}
@@ -149,15 +162,15 @@ func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) ([]peer.ID, error) {
var err error
select {
case <-r.peersRemaining.Done():
// Cleanup workers.
r.proc.Close()
if r.kPeers.Len() == 0 {
if r.peersQueried.Size() == 0 {
err = ErrNoPeersQueried
}
case <-r.proc.Closed():
err = r.runCtx.Err()
}

return r.kPeers.Peers(), err
return err
}
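
recurse caps in-flight work with the rateLimit channel seeded above with q.concurrency (maxQueryConcurrency, i.e. AlphaValue) tokens: a worker must take a token before querying a peer and puts it back when done. A self-contained sketch of the same token pattern, with hypothetical names:

package example

import "sync"

// queryAll illustrates the recurse-phase rate limiting: a channel
// pre-filled with alpha tokens caps how many workers run at once.
func queryAll(peers []string, alpha int, query func(string)) {
	rateLimit := make(chan struct{}, alpha)
	for i := 0; i < alpha; i++ {
		rateLimit <- struct{}{}
	}

	var wg sync.WaitGroup
	for _, p := range peers {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			<-rateLimit // acquire a slot
			defer func() { rateLimit <- struct{}{} }() // release it
			query(p) // stand-in for dialing p and running rfunc
		}(p)
	}
	wg.Wait()
}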

func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) {
@@ -171,15 +184,15 @@ func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) {
return
}

if !r.kPeers.Check(next) {
return
}

notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
Type: notif.AddingPeer,
ID: next,
})

if !r.aPeers.Check(next) {
return
}

r.peersRemaining.Increment(1)
select {
case r.peersToQuery.EnqChan <- next:
@@ -219,7 +232,7 @@ func (r *dhtQueryRunner) spawnWorkers(proc process.Process) {
}

func (r *dhtQueryRunner) dialPeer(ctx context.Context, p peer.ID) error {
if !r.kPeers.Check(p) {
if !r.aPeers.Check(p) {
// Don't bother with this peer. We'll skip it in the query phase as well.
return nil
}
@@ -244,6 +257,7 @@ func (r *dhtQueryRunner) dialPeer(ctx context.Context, p peer.ID) error {
ID: p,
})

r.peersFailed.Add(p)
// This peer is dropping out of the race.
r.peersRemaining.Decrement(1)
return err
@@ -265,25 +279,27 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
r.rateLimit <- struct{}{}
}()

if !r.kPeers.Check(p) {
if !r.aPeers.Check(p) {
// Don't bother with this peer.
return
}

r.peersQueried.Add(p)

// finally, run the query against this peer
res, err := r.query.qfunc(ctx, p)
closerPeers, err := r.query.rfunc(ctx, p)

if err == nil {
// Make sure we only return DHT peers that actually respond to
// the query.
r.kPeers.Add(p)
r.aPeers.Add(p)
} else {
r.peersFailed.Add(p)
}

if err != nil {
logger.Debugf("ERROR worker for: %v %v", p, err)
} else if len(res.closerPeers) > 0 {
logger.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(res.closerPeers))
for _, next := range res.closerPeers {
} else if len(closerPeers) > 0 {
logger.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(closerPeers))
for _, next := range closerPeers {
if next.ID == r.query.dht.self { // don't add self.
logger.Debugf("PEERS CLOSER -- worker for: %v found self", p)
continue
@@ -298,3 +314,86 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
logger.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
}
}

func (r *dhtQueryRunner) finish(ctx context.Context) ([]peer.ID, error) {

// Get a sorted list of peers to query.
seen := r.peersSeen.Peers()
closest := make([]peer.ID, 0, len(seen))
for _, p := range seen {
if r.peersFailed.Contains(p) {
continue
}
closest = append(closest, p)
}
closest = kb.SortClosestPeers(closest, kb.ConvertKey(r.query.key))

// Query them.
bucket := make([]peer.ID, 0, KValue)
workQ := make(chan peer.ID)
resultQ := make(chan peer.ID, KValue)

requery := true
qfunc := r.query.ffunc
if qfunc == nil {
requery = false
qfunc = func(ctx context.Context, p peer.ID) error {
_, err := r.query.rfunc(ctx, p)
return err
}
}

var wg sync.WaitGroup
defer wg.Wait()

ctx, cancel := context.WithCancel(ctx)
defer cancel()

wg.Add(KValue)
for i := 0; i < KValue; i++ {
go func() {
defer wg.Done()
for p := range workQ {
if qfunc(ctx, p) == nil {
resultQ <- p
return
}
}
}()
}

go func() {
wg.Wait()
close(resultQ)
}()

// No need to handle the context, assuming the _user_ does in rfunc.

for len(bucket) < KValue && len(closest) > 0 {
if !requery && r.peersQueried.Contains(closest[0]) {
// no need to re-query this peer as we've already sent
// them this query.
bucket = append(bucket, closest[0])
closest = closest[1:]
continue
}

select {
case workQ <- closest[0]:
closest = closest[1:]
case successPeer, ok := <-resultQ:
if !ok {
return bucket, ctx.Err()
}
bucket = append(bucket, successPeer)
}
}

close(workQ)

for p := range resultQ {
bucket = append(bucket, p)
}

return bucket, ctx.Err()
}
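
The finish step above is a bounded worker pool: KValue workers pull candidates in closest-first order, each worker retires after its first success, and the loop stops once KValue peers have succeeded or the candidates run out. A self-contained miniature of the same pattern (simplified sketch; the real code also skips re-querying peers already contacted while recursing):

package example

import (
	"context"
	"sync"
)

// finishPool mirrors the finish step in miniature: k workers pull
// candidates from workQ, each retires after its first success, and the
// caller stops feeding work once k successes have been collected.
func finishPool(ctx context.Context, candidates []string, k int, try func(context.Context, string) error) []string {
	workQ := make(chan string)
	resultQ := make(chan string, k)

	var wg sync.WaitGroup
	wg.Add(k)
	for i := 0; i < k; i++ {
		go func() {
			defer wg.Done()
			for c := range workQ {
				if try(ctx, c) == nil {
					resultQ <- c
					return // one success per worker, as above
				}
			}
		}()
	}
	go func() {
		wg.Wait() // all workers done: k successes, or workQ closed
		close(resultQ)
	}()

	bucket := make([]string, 0, k)
	for len(bucket) < k && len(candidates) > 0 {
		select {
		case workQ <- candidates[0]:
			candidates = candidates[1:]
		case c, ok := <-resultQ:
			if !ok {
				return bucket // every worker failed out
			}
			bucket = append(bucket, c)
		}
	}

	close(workQ)
	for c := range resultQ {
		bucket = append(bucket, c)
	}
	return bucket
}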