From 9961608ef0369efff20a82a98232f162b20f6f3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 28 Sep 2018 14:04:52 +0100 Subject: [PATCH 1/3] datastore-backed impls of KeyBook and PeerMetadata. --- p2p/host/peerstore/interface.go | 7 ++ p2p/host/peerstore/pstoreds/addr_book.go | 55 +++------- p2p/host/peerstore/pstoreds/ds_test.go | 18 +++- p2p/host/peerstore/pstoreds/keybook.go | 128 +++++++++++++++++++++++ p2p/host/peerstore/pstoreds/metadata.go | 65 ++++++++++++ p2p/host/peerstore/pstoreds/peerstore.go | 58 +++++++++- p2p/host/peerstore/pstoremem/metadata.go | 3 +- 7 files changed, 288 insertions(+), 46 deletions(-) create mode 100644 p2p/host/peerstore/pstoreds/keybook.go create mode 100644 p2p/host/peerstore/pstoreds/metadata.go diff --git a/p2p/host/peerstore/interface.go b/p2p/host/peerstore/interface.go index 068211c846..25d2c08578 100644 --- a/p2p/host/peerstore/interface.go +++ b/p2p/host/peerstore/interface.go @@ -68,6 +68,13 @@ type Peerstore interface { Peers() peer.IDSlice } +// PeerMetadata can handle values of any type. Serializing values is +// up to the implementation. Dynamic type introspection may not be +// supported, in which case explicitly enlisting types in the +// serializer may be required. +// +// Refer to the docs of the underlying implementation for more +// information. type PeerMetadata interface { // Get/Put is a simple registry for other peer-related key/value pairs. // if we find something we use often, it should become its own set of diff --git a/p2p/host/peerstore/pstoreds/addr_book.go b/p2p/host/peerstore/pstoreds/addr_book.go index df486217cc..ad37529a5c 100644 --- a/p2p/host/peerstore/pstoreds/addr_book.go +++ b/p2p/host/peerstore/pstoreds/addr_book.go @@ -7,13 +7,15 @@ import ( "time" lru "github.com/hashicorp/golang-lru" + ds "github.com/ipfs/go-datastore" query "github.com/ipfs/go-datastore/query" logging "github.com/ipfs/go-log" - peer "github.com/libp2p/go-libp2p-peer" + ma "github.com/multiformats/go-multiaddr" mh "github.com/multiformats/go-multihash" + peer "github.com/libp2p/go-libp2p-peer" pstore "github.com/libp2p/go-libp2p-peerstore" pstoremem "github.com/libp2p/go-libp2p-peerstore/pstoremem" ) @@ -28,6 +30,10 @@ var ( ErrTTLDatastore = errors.New("datastore must provide TTL support") ) +// Peer addresses are stored under the following db key pattern: +// /peers/addr// +var abBase = ds.NewKey("/peers/addrs") + var _ pstore.AddrBook = (*dsAddrBook)(nil) // dsAddrBook is an address book backed by a Datastore with both an @@ -101,7 +107,7 @@ func keysAndAddrs(p peer.ID, addrs []ma.Multiaddr) ([]ds.Key, []ma.Multiaddr, er var ( keys = make([]ds.Key, len(addrs)) clean = make([]ma.Multiaddr, len(addrs)) - parentKey = ds.NewKey(peer.IDB58Encode(p)) + parentKey = abBase.ChildString(peer.IDB58Encode(p)) i = 0 ) @@ -287,7 +293,7 @@ func (mgr *dsAddrBook) UpdateAddrs(p peer.ID, oldTTL time.Duration, newTTL time. func (mgr *dsAddrBook) dbUpdateTTL(p peer.ID, oldTTL time.Duration, newTTL time.Duration) error { var ( - prefix = ds.NewKey(p.Pretty()) + prefix = abBase.ChildString(peer.IDB58Encode(p)) q = query.Query{Prefix: prefix.String(), KeysOnly: false} results query.Results err error @@ -330,7 +336,7 @@ func (mgr *dsAddrBook) dbUpdateTTL(p peer.ID, oldTTL time.Duration, newTTL time. // Addrs returns all of the non-expired addresses for a given peer. func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr { var ( - prefix = ds.NewKey(p.Pretty()) + prefix = abBase.ChildString(peer.IDB58Encode(p)) q = query.Query{Prefix: prefix.String(), KeysOnly: false, ReturnExpirations: true} results query.Results err error @@ -385,42 +391,11 @@ func (mgr *dsAddrBook) Addrs(p peer.ID) []ma.Multiaddr { // Peers returns all of the peer IDs for which the AddrBook has addresses. func (mgr *dsAddrBook) PeersWithAddrs() peer.IDSlice { - var ( - q = query.Query{KeysOnly: true} - results query.Results - err error - ) - - txn, err := mgr.ds.NewTransaction(true) + ids, err := uniquePeerIds(mgr.ds, abBase, func(result query.Result) string { + return ds.RawKey(result.Key).Parent().Name() + }) if err != nil { - log.Error(err) - return peer.IDSlice{} - } - defer txn.Discard() - - if results, err = txn.Query(q); err != nil { - log.Error(err) - return peer.IDSlice{} - } - - defer results.Close() - - idset := make(map[string]struct{}) - for result := range results.Next() { - key := ds.RawKey(result.Key) - idset[key.Parent().Name()] = struct{}{} - } - - if len(idset) == 0 { - return peer.IDSlice{} - } - - ids := make(peer.IDSlice, len(idset)) - i := 0 - for id := range idset { - pid, _ := peer.IDB58Decode(id) - ids[i] = pid - i++ + log.Errorf("error while retrieving peers with addresses: %v", err) } return ids } @@ -436,7 +411,7 @@ func (mgr *dsAddrBook) AddrStream(ctx context.Context, p peer.ID) <-chan ma.Mult func (mgr *dsAddrBook) ClearAddrs(p peer.ID) { var ( err error - prefix = ds.NewKey(p.Pretty()) + prefix = abBase.ChildString(peer.IDB58Encode(p)) deleteFn func() error ) diff --git a/p2p/host/peerstore/pstoreds/ds_test.go b/p2p/host/peerstore/pstoreds/ds_test.go index 253fd8b5d3..2eb3f995d4 100644 --- a/p2p/host/peerstore/pstoreds/ds_test.go +++ b/p2p/host/peerstore/pstoreds/ds_test.go @@ -143,6 +143,10 @@ func TestBadgerDsAddrBook(t *testing.T) { }) } +func TestBadgerDsKeyBook(t *testing.T) { + pt.TestKeyBook(t, keyBookFactory(t, DefaultOpts())) +} + func BenchmarkBadgerDsPeerstore(b *testing.B) { caching := DefaultOpts() caching.CacheSize = 1024 @@ -159,15 +163,15 @@ func badgerStore(t testing.TB) (ds.TxnDatastore, func()) { if err != nil { t.Fatal(err) } - ds, err := badger.NewDatastore(dataPath, nil) + store, err := badger.NewDatastore(dataPath, nil) if err != nil { t.Fatal(err) } closer := func() { - ds.Close() + store.Close() os.RemoveAll(dataPath) } - return ds, closer + return store, closer } func peerstoreFactory(tb testing.TB, opts Options) pt.PeerstoreFactory { @@ -195,3 +199,11 @@ func addressBookFactory(tb testing.TB, opts Options) pt.AddrBookFactory { return ab, closeFunc } } + +func keyBookFactory(tb testing.TB, opts Options) pt.KeyBookFactory { + return func() (pstore.KeyBook, func()) { + store, closeFunc := badgerStore(tb) + kb, _ := NewKeyBook(context.Background(), store, opts) + return kb, closeFunc + } +} diff --git a/p2p/host/peerstore/pstoreds/keybook.go b/p2p/host/peerstore/pstoreds/keybook.go new file mode 100644 index 0000000000..4801e6b338 --- /dev/null +++ b/p2p/host/peerstore/pstoreds/keybook.go @@ -0,0 +1,128 @@ +package pstoreds + +import ( + "context" + "errors" + + ds "github.com/ipfs/go-datastore" + query "github.com/ipfs/go-datastore/query" + + ic "github.com/libp2p/go-libp2p-crypto" + peer "github.com/libp2p/go-libp2p-peer" + pstore "github.com/libp2p/go-libp2p-peerstore" +) + +// Public and private keys are stored under the following db key pattern: +// /peers/keys//{pub, priv} +var ( + kbBase = ds.NewKey("/peers/keys") + pubSuffix = ds.NewKey("/pub") + privSuffix = ds.NewKey("/priv") +) + +type dsKeyBook struct { + ds ds.TxnDatastore +} + +var _ pstore.KeyBook = (*dsKeyBook)(nil) + +func NewKeyBook(_ context.Context, store ds.TxnDatastore, _ Options) (pstore.KeyBook, error) { + return &dsKeyBook{store}, nil +} + +func (kb *dsKeyBook) PubKey(p peer.ID) ic.PubKey { + key := kbBase.ChildString(peer.IDB58Encode(p)).Child(pubSuffix) + + var pk ic.PubKey + if value, err := kb.ds.Get(key); err == nil { + pk, err = ic.UnmarshalPublicKey(value) + if err != nil { + log.Errorf("error when unmarshalling pubkey from datastore for peer %s: %s\n", p.Pretty(), err) + } + } else if err == ds.ErrNotFound { + pk, err = p.ExtractPublicKey() + if err != nil { + log.Errorf("error when extracting pubkey from peer ID for peer %s: %s\n", p.Pretty(), err) + return nil + } + pkb, err := pk.Bytes() + if err != nil { + log.Errorf("error when turning extracted pubkey into bytes for peer %s: %s\n", p.Pretty(), err) + return nil + } + err = kb.ds.Put(key, pkb) + if err != nil { + log.Errorf("error when adding extracted pubkey to peerstore for peer %s: %s\n", p.Pretty(), err) + return nil + } + } else { + log.Errorf("error when fetching pubkey from datastore for peer %s: %s\n", p.Pretty(), err) + } + + return pk +} + +func (kb *dsKeyBook) AddPubKey(p peer.ID, pk ic.PubKey) error { + // check it's correct. + if !p.MatchesPublicKey(pk) { + return errors.New("peer ID does not match public key") + } + + key := kbBase.ChildString(peer.IDB58Encode(p)).Child(pubSuffix) + val, err := pk.Bytes() + if err != nil { + log.Errorf("error while converting pubkey byte string for peer %s: %s\n", p.Pretty(), err) + return err + } + err = kb.ds.Put(key, val) + if err != nil { + log.Errorf("error while updating pubkey in datastore for peer %s: %s\n", p.Pretty(), err) + } + return err +} + +func (kb *dsKeyBook) PrivKey(p peer.ID) ic.PrivKey { + key := kbBase.ChildString(peer.IDB58Encode(p)).Child(privSuffix) + value, err := kb.ds.Get(key) + if err != nil { + log.Errorf("error while fetching privkey from datastore for peer %s: %s\n", p.Pretty(), err) + return nil + } + sk, err := ic.UnmarshalPrivateKey(value) + if err != nil { + return nil + } + return sk +} + +func (kb *dsKeyBook) AddPrivKey(p peer.ID, sk ic.PrivKey) error { + if sk == nil { + return errors.New("private key is nil") + } + // check it's correct. + if !p.MatchesPrivateKey(sk) { + return errors.New("peer ID does not match private key") + } + + key := kbBase.ChildString(peer.IDB58Encode(p)).Child(privSuffix) + val, err := sk.Bytes() + if err != nil { + log.Errorf("error while converting privkey byte string for peer %s: %s\n", p.Pretty(), err) + return err + } + err = kb.ds.Put(key, val) + if err != nil { + log.Errorf("error while updating privkey in datastore for peer %s: %s\n", p.Pretty(), err) + } + return err +} + +func (kb *dsKeyBook) PeersWithKeys() peer.IDSlice { + ids, err := uniquePeerIds(kb.ds, kbBase, func(result query.Result) string { + return ds.RawKey(result.Key).Parent().Name() + }) + if err != nil { + log.Errorf("error while retrieving peers with keys: %v", err) + } + return ids +} diff --git a/p2p/host/peerstore/pstoreds/metadata.go b/p2p/host/peerstore/pstoreds/metadata.go new file mode 100644 index 0000000000..de21434d6e --- /dev/null +++ b/p2p/host/peerstore/pstoreds/metadata.go @@ -0,0 +1,65 @@ +package pstoreds + +import ( + "bytes" + "context" + "encoding/gob" + + ds "github.com/ipfs/go-datastore" + + pool "github.com/libp2p/go-buffer-pool" + peer "github.com/libp2p/go-libp2p-peer" + pstore "github.com/libp2p/go-libp2p-peerstore" +) + +// Metadata is stored under the following db key pattern: +// /peers/metadata// +var pmBase = ds.NewKey("/peers/metadata") + +type dsPeerMetadata struct { + ds ds.Datastore +} + +var _ pstore.PeerMetadata = (*dsPeerMetadata)(nil) + +func init() { + // Gob registers basic types by default. + // + // Register complex types used by the peerstore itself. + gob.Register(make(map[string]struct{})) +} + +// NewPeerMetadata creates a metadata store backed by a persistent db. It uses gob for serialisation. +// +// See `init()` to learn which types are registered by default. Modules wishing to store +// values of other types will need to `gob.Register()` them explicitly, or else callers +// will receive runtime errors. +func NewPeerMetadata(_ context.Context, store ds.Datastore, _ Options) (pstore.PeerMetadata, error) { + return &dsPeerMetadata{store}, nil +} + +func (pm *dsPeerMetadata) Get(p peer.ID, key string) (interface{}, error) { + k := pmBase.ChildString(peer.IDB58Encode(p)).ChildString(key) + value, err := pm.ds.Get(k) + if err != nil { + if err == ds.ErrNotFound { + err = pstore.ErrNotFound + } + return nil, err + } + + var res interface{} + if err := gob.NewDecoder(bytes.NewReader(value)).Decode(&res); err != nil { + return nil, err + } + return res, nil +} + +func (pm *dsPeerMetadata) Put(p peer.ID, key string, val interface{}) error { + k := pmBase.ChildString(peer.IDB58Encode(p)).ChildString(key) + var buf pool.Buffer + if err := gob.NewEncoder(&buf).Encode(&val); err != nil { + return err + } + return pm.ds.Put(k, buf.Bytes()) +} diff --git a/p2p/host/peerstore/pstoreds/peerstore.go b/p2p/host/peerstore/pstoreds/peerstore.go index a3b4d3f174..fc3b6d70e1 100644 --- a/p2p/host/peerstore/pstoreds/peerstore.go +++ b/p2p/host/peerstore/pstoreds/peerstore.go @@ -5,9 +5,10 @@ import ( "time" ds "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/query" + "github.com/libp2p/go-libp2p-peer" pstore "github.com/libp2p/go-libp2p-peerstore" - pstoremem "github.com/libp2p/go-libp2p-peerstore/pstoremem" ) // Configuration object for the peerstore. @@ -42,6 +43,59 @@ func NewPeerstore(ctx context.Context, store ds.TxnDatastore, opts Options) (pst return nil, err } - ps := pstore.NewPeerstore(pstoremem.NewKeyBook(), addrBook, pstoremem.NewPeerMetadata()) + keyBook, err := NewKeyBook(ctx, store, opts) + if err != nil { + return nil, err + } + + peerMetadata, err := NewPeerMetadata(ctx, store, opts) + if err != nil { + return nil, err + } + + ps := pstore.NewPeerstore(keyBook, addrBook, peerMetadata) return ps, nil } + +// uniquePeerIds extracts and returns unique peer IDs from database keys. +func uniquePeerIds(ds ds.TxnDatastore, prefix ds.Key, extractor func(result query.Result) string) (peer.IDSlice, error) { + var ( + q = query.Query{Prefix: prefix.String(), KeysOnly: true} + results query.Results + err error + ) + + txn, err := ds.NewTransaction(true) + if err != nil { + return peer.IDSlice{}, err + } + defer txn.Discard() + + if results, err = txn.Query(q); err != nil { + log.Error(err) + return peer.IDSlice{}, err + } + + defer results.Close() + + idset := make(map[string]struct{}) + for result := range results.Next() { + k := extractor(result) + idset[k] = struct{}{} + //key := ds.RawKey(result.Key) + //idset[key.Parent().Name()] = struct{}{} + } + + if len(idset) == 0 { + return peer.IDSlice{}, nil + } + + ids := make(peer.IDSlice, len(idset)) + i := 0 + for id := range idset { + pid, _ := peer.IDB58Decode(id) + ids[i] = pid + i++ + } + return ids, nil +} diff --git a/p2p/host/peerstore/pstoremem/metadata.go b/p2p/host/peerstore/pstoremem/metadata.go index 8b58256c16..ba57dd7fbf 100644 --- a/p2p/host/peerstore/pstoremem/metadata.go +++ b/p2p/host/peerstore/pstoremem/metadata.go @@ -10,11 +10,12 @@ import ( type memoryPeerMetadata struct { // store other data, like versions //ds ds.ThreadSafeDatastore - // TODO: use a datastore for this ds map[string]interface{} dslock sync.Mutex } +var _ pstore.PeerMetadata = (*memoryPeerMetadata)(nil) + func NewPeerMetadata() pstore.PeerMetadata { return &memoryPeerMetadata{ ds: make(map[string]interface{}), From 0531abfbccae5d7685afbbeaa28ff6f7f2566cfb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 28 Sep 2018 14:18:05 +0100 Subject: [PATCH 2/3] add explicit import aliases. --- p2p/host/peerstore/pstoreds/peerstore.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/host/peerstore/pstoreds/peerstore.go b/p2p/host/peerstore/pstoreds/peerstore.go index fc3b6d70e1..34b150649e 100644 --- a/p2p/host/peerstore/pstoreds/peerstore.go +++ b/p2p/host/peerstore/pstoreds/peerstore.go @@ -5,9 +5,9 @@ import ( "time" ds "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/query" - "github.com/libp2p/go-libp2p-peer" + query "github.com/ipfs/go-datastore/query" + peer "github.com/libp2p/go-libp2p-peer" pstore "github.com/libp2p/go-libp2p-peerstore" ) From 78cfb8ba63731b0b218ed53a32a567b0be5af6a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 28 Sep 2018 18:47:41 +0100 Subject: [PATCH 3/3] review comments. --- p2p/host/peerstore/pstoreds/peerstore.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/p2p/host/peerstore/pstoreds/peerstore.go b/p2p/host/peerstore/pstoreds/peerstore.go index 34b150649e..cc3c6f88ab 100644 --- a/p2p/host/peerstore/pstoreds/peerstore.go +++ b/p2p/host/peerstore/pstoreds/peerstore.go @@ -67,13 +67,13 @@ func uniquePeerIds(ds ds.TxnDatastore, prefix ds.Key, extractor func(result quer txn, err := ds.NewTransaction(true) if err != nil { - return peer.IDSlice{}, err + return nil, err } defer txn.Discard() if results, err = txn.Query(q); err != nil { log.Error(err) - return peer.IDSlice{}, err + return nil, err } defer results.Close() @@ -82,8 +82,6 @@ func uniquePeerIds(ds ds.TxnDatastore, prefix ds.Key, extractor func(result quer for result := range results.Next() { k := extractor(result) idset[k] = struct{}{} - //key := ds.RawKey(result.Key) - //idset[key.Parent().Name()] = struct{}{} } if len(idset) == 0 {