Commit 914f052

change providers map and lock over to an agent based approach for managing providers

whyrusleeping authored and jbenet committed Aug 21, 2014
1 parent 334ea87 commit 914f052
Showing 2 changed files with 95 additions and 48 deletions.
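
The heart of the change: rather than guarding a shared providers map with a sync.RWMutex, the DHT hands the map to a single goroutine (an agent) that owns it exclusively, and callers submit reads and writes as messages over channels. Below is a minimal, self-contained sketch of that pattern, with generic names rather than the go-ipfs API; the real version is the ProviderManager added in providers.go further down:

package main

import "fmt"

// getReq asks the agent for the values stored under a key.
type getReq struct {
	key  string
	resp chan []string
}

// agent owns the map; callers never touch it directly.
type agent struct {
	adds chan [2]string // {key, value} pairs to record
	gets chan getReq    // read requests
}

func newAgent() *agent {
	a := &agent{adds: make(chan [2]string), gets: make(chan getReq)}
	go a.run()
	return a
}

// run is the only goroutine that reads or writes the map,
// so no locking is needed anywhere.
func (a *agent) run() {
	data := make(map[string][]string)
	for {
		select {
		case kv := <-a.adds:
			data[kv[0]] = append(data[kv[0]], kv[1])
		case req := <-a.gets:
			req.resp <- data[req.key]
		}
	}
}

func (a *agent) Add(k, v string) { a.adds <- [2]string{k, v} }

func (a *agent) Get(k string) []string {
	req := getReq{key: k, resp: make(chan []string)}
	a.gets <- req
	return <-req.resp
}

func main() {
	a := newAgent()
	a.Add("block", "peerA")
	a.Add("block", "peerB")
	fmt.Println(a.Get("block")) // [peerA peerB]
}

Since run is the only goroutine that ever touches data, there is nothing to lock and no ordering to get wrong; the channels serialize all access.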
60 changes: 12 additions & 48 deletions dht.go
@@ -36,9 +36,7 @@ type IpfsDHT struct {
 	datastore ds.Datastore
 	dslock    sync.Mutex
 
-	// Map keys to peers that can provide their value
-	providers    map[u.Key][]*providerInfo
-	providerLock sync.RWMutex
+	providers *ProviderManager
 
 	// Signal to shutdown dht
 	shutdown chan struct{}
@@ -59,7 +57,7 @@ func NewDHT(p *peer.Peer, net swarm.Network) *IpfsDHT {
 	dht.network = net
 	dht.datastore = ds.NewMapDatastore()
 	dht.self = p
-	dht.providers = make(map[u.Key][]*providerInfo)
+	dht.providers = NewProviderManager()
 	dht.shutdown = make(chan struct{})
 
 	dht.routingTables = make([]*kb.RoutingTable, 3)
@@ -102,7 +100,6 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {
 func (dht *IpfsDHT) handleMessages() {
 	u.DOut("Begin message handling routine")
 
-	checkTimeouts := time.NewTicker(time.Minute * 5)
 	ch := dht.network.GetChan()
 	for {
 		select {
@@ -146,34 +143,18 @@ func (dht *IpfsDHT) handleMessages() {
 				dht.handlePing(mes.Peer, pmes)
 			case PBDHTMessage_DIAGNOSTIC:
 				dht.handleDiagnostic(mes.Peer, pmes)
+			default:
+				u.PErr("Received invalid message type")
 			}
 
 		case err := <-ch.Errors:
 			u.PErr("dht err: %s\n", err)
 		case <-dht.shutdown:
-			checkTimeouts.Stop()
 			return
-		case <-checkTimeouts.C:
-			// Time to collect some garbage!
-			dht.cleanExpiredProviders()
 		}
 	}
 }
 
-func (dht *IpfsDHT) cleanExpiredProviders() {
-	dht.providerLock.Lock()
-	for k, parr := range dht.providers {
-		var cleaned []*providerInfo
-		for _, v := range parr {
-			if time.Since(v.Creation) < time.Hour {
-				cleaned = append(cleaned, v)
-			}
-		}
-		dht.providers[k] = cleaned
-	}
-	dht.providerLock.Unlock()
-}
-
 func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) error {
 	pmes := Message{
 		Type: PBDHTMessage_PUT_VALUE,
@@ -202,14 +183,10 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) {
 		resp.Value = iVal.([]byte)
 	} else if err == ds.ErrNotFound {
 		// Check if we know any providers for the requested value
-		dht.providerLock.RLock()
-		provs, ok := dht.providers[u.Key(pmes.GetKey())]
-		dht.providerLock.RUnlock()
-		if ok && len(provs) > 0 {
+		provs := dht.providers.GetProviders(u.Key(pmes.GetKey()))
+		if len(provs) > 0 {
 			u.DOut("handleGetValue returning %d provider[s]\n", len(provs))
-			for _, prov := range provs {
-				resp.Peers = append(resp.Peers, prov.Value)
-			}
+			resp.Peers = provs
 			resp.Success = true
 		} else {
 			// No providers?
@@ -313,9 +290,7 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) {
 		Response: true,
 	}
 
-	dht.providerLock.RLock()
-	providers := dht.providers[u.Key(pmes.GetKey())]
-	dht.providerLock.RUnlock()
+	providers := dht.providers.GetProviders(u.Key(pmes.GetKey()))
 	if providers == nil || len(providers) == 0 {
 		level := 0
 		if len(pmes.GetValue()) > 0 {
@@ -329,9 +304,7 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) {
 			resp.Peers = []*peer.Peer{closer}
 		}
 	} else {
-		for _, prov := range providers {
-			resp.Peers = append(resp.Peers, prov.Value)
-		}
+		resp.Peers = providers
 		resp.Success = true
 	}
 
@@ -345,9 +318,8 @@ type providerInfo struct {
 }
 
 func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *PBDHTMessage) {
-	//TODO: need to implement TTLs on providers
 	key := u.Key(pmes.GetKey())
-	dht.addProviderEntry(key, p)
+	dht.providers.AddProvider(key, p)
 }
 
 // Halt stops all communications from this peer and shuts down
@@ -356,14 +328,6 @@ func (dht *IpfsDHT) Halt() {
 	dht.network.Close()
 }
 
-func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) {
-	u.DOut("Adding %s as provider for '%s'\n", p.Key().Pretty(), key)
-	dht.providerLock.Lock()
-	provs := dht.providers[key]
-	dht.providers[key] = append(provs, &providerInfo{time.Now(), p})
-	dht.providerLock.Unlock()
-}
-
 // NOTE: not yet finished, low priority
 func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) {
 	seq := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
@@ -514,7 +478,7 @@ func (dht *IpfsDHT) getFromPeerList(key u.Key, timeout time.Duration,
 			u.DErr("getFromPeers error: %s\n", err)
 			continue
 		}
-		dht.addProviderEntry(key, p)
+		dht.providers.AddProvider(key, p)
 
 		// Make sure it was a successful get
 		if pmes.GetSuccess() && pmes.Value != nil {
@@ -656,7 +620,7 @@ func (dht *IpfsDHT) addPeerList(key u.Key, peers []*PBDHTMessage_PBPeer) []*peer.Peer {
 				continue
 			}
 		}
-		dht.addProviderEntry(key, p)
+		dht.providers.AddProvider(key, p)
 		provArr = append(provArr, p)
 	}
 	return provArr
83 changes: 83 additions & 0 deletions providers.go
@@ -0,0 +1,83 @@
+package dht
+
+import (
+	"time"
+
+	peer "github.com/jbenet/go-ipfs/peer"
+	u "github.com/jbenet/go-ipfs/util"
+)
+
+type ProviderManager struct {
+	providers map[u.Key][]*providerInfo
+	newprovs  chan *addProv
+	getprovs  chan *getProv
+	halt      chan struct{}
+}
+
+type addProv struct {
+	k   u.Key
+	val *peer.Peer
+}
+
+type getProv struct {
+	k    u.Key
+	resp chan []*peer.Peer
+}
+
+func NewProviderManager() *ProviderManager {
+	pm := new(ProviderManager)
+	pm.getprovs = make(chan *getProv)
+	pm.newprovs = make(chan *addProv)
+	pm.providers = make(map[u.Key][]*providerInfo)
+	pm.halt = make(chan struct{})
+	go pm.run()
+	return pm
+}
+
+func (pm *ProviderManager) run() {
+	tick := time.NewTicker(time.Hour)
+	for {
+		select {
+		case np := <-pm.newprovs:
+			pi := new(providerInfo)
+			pi.Creation = time.Now()
+			pi.Value = np.val
+			arr := pm.providers[np.k]
+			pm.providers[np.k] = append(arr, pi)
+		case gp := <-pm.getprovs:
+			var parr []*peer.Peer
+			provs := pm.providers[gp.k]
+			for _, p := range provs {
+				parr = append(parr, p.Value)
+			}
+			gp.resp <- parr
+		case <-tick.C:
+			for k, provs := range pm.providers {
+				var filtered []*providerInfo
+				for _, p := range provs {
+					if time.Since(p.Creation) < time.Hour*24 {
+						filtered = append(filtered, p)
+					}
+				}
+				pm.providers[k] = filtered
+			}
+		case <-pm.halt:
+			return
+		}
+	}
+}
+
+func (pm *ProviderManager) AddProvider(k u.Key, val *peer.Peer) {
+	pm.newprovs <- &addProv{
+		k:   k,
+		val: val,
+	}
+}
+
+func (pm *ProviderManager) GetProviders(k u.Key) []*peer.Peer {
+	gp := new(getProv)
+	gp.k = k
+	gp.resp = make(chan []*peer.Peer)
+	pm.getprovs <- gp
+	return <-gp.resp
+}
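
For orientation, a hedged usage sketch showing how the dht.go call sites above drive the manager; exampleUsage and the literal key are hypothetical, and somePeer stands in for a *peer.Peer obtained elsewhere (e.g. from the swarm):

// Hypothetical usage sketch; not part of this commit.
func exampleUsage(somePeer *peer.Peer) {
	pm := NewProviderManager()

	// Sends on pm.newprovs; the run loop performs the map write.
	pm.AddProvider(u.Key("example-key"), somePeer)

	// Round-trips through pm.getprovs, so the read is serialized
	// with all writes by the same goroutine.
	provs := pm.GetProviders(u.Key("example-key"))
	u.DOut("found %d providers\n", len(provs))
}

Both methods block until the run loop services them, so callers on any goroutine observe a consistent map without taking a lock. Note that the halt channel gives run an exit path, though nothing in this commit sends on it yet, and the hourly ticker that expires records older than 24 hours is never stopped.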
