From 914f052d44b8d0f8dc4fd5ce82ac27cb23942157 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 18 Aug 2014 20:38:44 -0700 Subject: [PATCH] change providers map and lock over to an agent based approach for managing providers --- dht.go | 60 ++++++++----------------------------- providers.go | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+), 48 deletions(-) create mode 100644 providers.go diff --git a/dht.go b/dht.go index e548a272d..4c6da064b 100644 --- a/dht.go +++ b/dht.go @@ -36,9 +36,7 @@ type IpfsDHT struct { datastore ds.Datastore dslock sync.Mutex - // Map keys to peers that can provide their value - providers map[u.Key][]*providerInfo - providerLock sync.RWMutex + providers *ProviderManager // Signal to shutdown dht shutdown chan struct{} @@ -59,7 +57,7 @@ func NewDHT(p *peer.Peer, net swarm.Network) *IpfsDHT { dht.network = net dht.datastore = ds.NewMapDatastore() dht.self = p - dht.providers = make(map[u.Key][]*providerInfo) + dht.providers = NewProviderManager() dht.shutdown = make(chan struct{}) dht.routingTables = make([]*kb.RoutingTable, 3) @@ -102,7 +100,6 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) { func (dht *IpfsDHT) handleMessages() { u.DOut("Begin message handling routine") - checkTimeouts := time.NewTicker(time.Minute * 5) ch := dht.network.GetChan() for { select { @@ -146,34 +143,18 @@ func (dht *IpfsDHT) handleMessages() { dht.handlePing(mes.Peer, pmes) case PBDHTMessage_DIAGNOSTIC: dht.handleDiagnostic(mes.Peer, pmes) + default: + u.PErr("Recieved invalid message type") } case err := <-ch.Errors: u.PErr("dht err: %s\n", err) case <-dht.shutdown: - checkTimeouts.Stop() return - case <-checkTimeouts.C: - // Time to collect some garbage! - dht.cleanExpiredProviders() } } } -func (dht *IpfsDHT) cleanExpiredProviders() { - dht.providerLock.Lock() - for k, parr := range dht.providers { - var cleaned []*providerInfo - for _, v := range parr { - if time.Since(v.Creation) < time.Hour { - cleaned = append(cleaned, v) - } - } - dht.providers[k] = cleaned - } - dht.providerLock.Unlock() -} - func (dht *IpfsDHT) putValueToNetwork(p *peer.Peer, key string, value []byte) error { pmes := Message{ Type: PBDHTMessage_PUT_VALUE, @@ -202,14 +183,10 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) { resp.Value = iVal.([]byte) } else if err == ds.ErrNotFound { // Check if we know any providers for the requested value - dht.providerLock.RLock() - provs, ok := dht.providers[u.Key(pmes.GetKey())] - dht.providerLock.RUnlock() - if ok && len(provs) > 0 { + provs := dht.providers.GetProviders(u.Key(pmes.GetKey())) + if len(provs) > 0 { u.DOut("handleGetValue returning %d provider[s]\n", len(provs)) - for _, prov := range provs { - resp.Peers = append(resp.Peers, prov.Value) - } + resp.Peers = provs resp.Success = true } else { // No providers? @@ -313,9 +290,7 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) { Response: true, } - dht.providerLock.RLock() - providers := dht.providers[u.Key(pmes.GetKey())] - dht.providerLock.RUnlock() + providers := dht.providers.GetProviders(u.Key(pmes.GetKey())) if providers == nil || len(providers) == 0 { level := 0 if len(pmes.GetValue()) > 0 { @@ -329,9 +304,7 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *PBDHTMessage) { resp.Peers = []*peer.Peer{closer} } } else { - for _, prov := range providers { - resp.Peers = append(resp.Peers, prov.Value) - } + resp.Peers = providers resp.Success = true } @@ -345,9 +318,8 @@ type providerInfo struct { } func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *PBDHTMessage) { - //TODO: need to implement TTLs on providers key := u.Key(pmes.GetKey()) - dht.addProviderEntry(key, p) + dht.providers.AddProvider(key, p) } // Halt stops all communications from this peer and shut down @@ -356,14 +328,6 @@ func (dht *IpfsDHT) Halt() { dht.network.Close() } -func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) { - u.DOut("Adding %s as provider for '%s'\n", p.Key().Pretty(), key) - dht.providerLock.Lock() - provs := dht.providers[key] - dht.providers[key] = append(provs, &providerInfo{time.Now(), p}) - dht.providerLock.Unlock() -} - // NOTE: not yet finished, low priority func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) { seq := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10) @@ -514,7 +478,7 @@ func (dht *IpfsDHT) getFromPeerList(key u.Key, timeout time.Duration, u.DErr("getFromPeers error: %s\n", err) continue } - dht.addProviderEntry(key, p) + dht.providers.AddProvider(key, p) // Make sure it was a successful get if pmes.GetSuccess() && pmes.Value != nil { @@ -656,7 +620,7 @@ func (dht *IpfsDHT) addPeerList(key u.Key, peers []*PBDHTMessage_PBPeer) []*peer continue } } - dht.addProviderEntry(key, p) + dht.providers.AddProvider(key, p) provArr = append(provArr, p) } return provArr diff --git a/providers.go b/providers.go new file mode 100644 index 000000000..3dc3b7b05 --- /dev/null +++ b/providers.go @@ -0,0 +1,83 @@ +package dht + +import ( + "time" + + u "github.com/jbenet/go-ipfs/util" + peer "github.com/jbenet/go-ipfs/peer" +) + +type ProviderManager struct { + providers map[u.Key][]*providerInfo + newprovs chan *addProv + getprovs chan *getProv + halt chan struct{} +} + +type addProv struct { + k u.Key + val *peer.Peer +} + +type getProv struct { + k u.Key + resp chan []*peer.Peer +} + +func NewProviderManager() *ProviderManager { + pm := new(ProviderManager) + pm.getprovs = make(chan *getProv) + pm.newprovs = make(chan *addProv) + pm.providers = make(map[u.Key][]*providerInfo) + pm.halt = make(chan struct{}) + go pm.run() + return pm +} + +func (pm *ProviderManager) run() { + tick := time.NewTicker(time.Hour) + for { + select { + case np := <-pm.newprovs: + pi := new(providerInfo) + pi.Creation = time.Now() + pi.Value = np.val + arr := pm.providers[np.k] + pm.providers[np.k] = append(arr, pi) + case gp := <-pm.getprovs: + var parr []*peer.Peer + provs := pm.providers[gp.k] + for _, p := range provs { + parr = append(parr, p.Value) + } + gp.resp <- parr + case <-tick.C: + for k, provs := range pm.providers { + var filtered []*providerInfo + for _, p := range provs { + if time.Now().Sub(p.Creation) < time.Hour * 24 { + filtered = append(filtered, p) + } + } + pm.providers[k] = filtered + } + case <-pm.halt: + return + } + } +} + +func (pm *ProviderManager) AddProvider(k u.Key, val *peer.Peer) { + pm.newprovs <- &addProv{ + k: k, + val: val, + } +} + +func (pm *ProviderManager) GetProviders(k u.Key) []*peer.Peer { + gp := new(getProv) + gp.k = k + gp.resp = make(chan []*peer.Peer) + pm.getprovs <- gp + return <-gp.resp +}