Skip to content
This repository was archived by the owner on Nov 22, 2023. It is now read-only.

Commit

Permalink
fix: index tx keys (#96)
Browse files Browse the repository at this point in the history
* add each entry in ref.keys map

* fix isLocal and related tests

* add keys in ref when missing

* add switch case

* use OpenPullDataChannel in handle request

* refactor utils.MapKeys

* add SetRef before OpenPullDataChannel

* add error logs

* add minor refactors & improvements

* add minor refactors & improvements

* fix MapKeys formatting

Co-authored-by: tchardin <tdotchardin@gmail.com>
  • Loading branch information
gallexis and tchardin authored Jun 8, 2021
1 parent 85f073c commit 93816c8
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 65 deletions.
14 changes: 12 additions & 2 deletions exchange/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,21 @@ func (e *Exchange) FindAndRetrieve(ctx context.Context, root cid.Cid) error {
if res.Err != nil {
return res.Err
}
return e.idx.SetRef(&DataRef{

keys, err := utils.MapKeys(ctx, root, tx.Store().Loader)
if err != nil {
return err
}

ref := &DataRef{
PayloadCID: root,
StoreID: tx.StoreID(),
PayloadSize: int64(res.Size),
})
Keys: keys.AsBytes(),
}

return e.idx.SetRef(ref)

case <-ctx.Done():
return ctx.Err()
}
Expand Down
10 changes: 10 additions & 0 deletions exchange/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,22 @@ type DataRef struct {
PayloadCID cid.Cid
PayloadSize int64
StoreID multistore.StoreID
Keys [][]byte
Freq int64
BucketID int64
// do not serialize
bucketNode *list.Element
}

func (d DataRef) Has(key string) bool {
for _, elt := range d.Keys {
if bytes.Compare(elt, []byte(key)) == 0 {
return true
}
}
return false
}

// IndexOption customizes the behavior of the index
type IndexOption func(*Index)

Expand Down
83 changes: 82 additions & 1 deletion exchange/index_cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 57 additions & 12 deletions exchange/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/myelnet/pop/internal/utils"
sel "github.com/myelnet/pop/selectors"
)

Expand Down Expand Up @@ -137,14 +138,6 @@ func NewReplication(h host.Host, idx *Index, dt datatransfer.Manager, rtv Routed
r.dt.RegisterTransportConfigurer(&Request{}, TransportConfigurer(r.idx, r, h.ID()))
r.emitter, _ = h.EventBus().Emitter(new(IndexEvt))

// TODO: clean this up
r.dt.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
if event.Code == datatransfer.Error && channelState.Recipient() == h.ID() {
// If transfers fail and we're the recipient we need to remove it from our index
r.idx.DropRef(channelState.BaseCID())
}
})

return r
}

Expand Down Expand Up @@ -333,27 +326,79 @@ func (r *Replication) handleRequest(s network.Stream) {
defer rs.Close()
req, err := rs.ReadRequest()
if err != nil {
fmt.Println("error when reading stream request :", err)
return
}

// Only the dispatch method is streamed directly at this time
switch req.Method {
case Dispatch:
// TODO: validate request
// Create a new store to receive our new blocks
// It will be automatically picked up in the TransportConfigurer
storeID := r.idx.ms.Next()
err = r.idx.SetRef(&DataRef{

_, err := r.idx.GetRef(req.PayloadCID)
if err == nil {
fmt.Printf("Payload CID %s already exists\n", req.PayloadCID.String())
return
}

ref := &DataRef{
PayloadCID: req.PayloadCID,
PayloadSize: int64(req.Size),
StoreID: storeID,
})
Keys: [][]byte{},
}

err = r.idx.SetRef(ref)
if err != nil {
return
fmt.Println("error when setting ref before OpenPullDataChannel :", err)
}
_, err = r.dt.OpenPullDataChannel(context.TODO(), p, &req, req.PayloadCID, sel.All())

ctx := context.Background()
chid, err := r.dt.OpenPullDataChannel(ctx, p, &req, req.PayloadCID, sel.All())
if err != nil {
fmt.Println("error when opening channel data channel :", err)
return
}

for {
state, err := r.dt.ChannelState(ctx, chid)
if err != nil {
fmt.Println("error when fetching channel state :", err)
return
}

switch state.Status() {
case datatransfer.Failed, datatransfer.Cancelled:
err = r.idx.DropRef(state.BaseCID())
if err != nil {
fmt.Println("error when droping ref :", err)
}
return

case datatransfer.Completed:
store, err := r.idx.ms.Get(storeID)
if err != nil {
fmt.Println("error when fetching store :", err)
return
}

keys, err := utils.MapKeys(ctx, ref.PayloadCID, store.Loader)
if err != nil {
fmt.Println("error when fetching keys :", err)
return
}
ref.Keys = keys.AsBytes()

err = r.idx.SetRef(ref)
if err != nil {
fmt.Println("error when setting ref :", err)
}
return
}
}
}
}

Expand Down
77 changes: 46 additions & 31 deletions exchange/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
peer "github.com/libp2p/go-libp2p-core/peer"
mh "github.com/multiformats/go-multihash"
"github.com/myelnet/pop/filecoin"
"github.com/myelnet/pop/internal/utils"
"github.com/myelnet/pop/retrieval"
"github.com/myelnet/pop/retrieval/deal"
)
Expand Down Expand Up @@ -269,10 +270,26 @@ func (tx *Tx) buildRoot() error {

// Ref returns the DataRef associated with this transaction
func (tx *Tx) Ref() *DataRef {
var keys [][]byte

if len(tx.entries) == 0 {
for k := range tx.entries {
keys = append(keys, []byte(k))
}
} else {
kl, err := utils.MapKeys(context.TODO(), tx.root, tx.Store().Loader)
if err != nil {
tx.Err = err
} else {
keys = kl.AsBytes()
}
}

return &DataRef{
PayloadCID: tx.root,
StoreID: tx.storeID,
PayloadSize: tx.size,
Keys: keys,
}
}

Expand All @@ -281,14 +298,27 @@ func (tx *Tx) Commit() error {
if tx.Err != nil {
return tx.Err
}
err := tx.index.SetRef(&DataRef{

ref := &DataRef{
PayloadCID: tx.root,
StoreID: tx.storeID,
PayloadSize: tx.size,
})
}

entries, err := tx.GetEntries()
if err != nil {
return err
}

for _, key := range entries {
ref.Keys = append(ref.Keys, []byte(key))
}

err = tx.index.SetRef(ref)
if err != nil {
return err
}

opts := DefaultDispatchOptions
if tx.cacheRF > 0 {
opts.RF = tx.cacheRF
Expand Down Expand Up @@ -329,15 +359,21 @@ func (tx *Tx) GetFile(k string) (files.Node, error) {
}

// IsLocal tells us if this node is storing the content of this transaction or if it needs to retrieve it
func (tx *Tx) IsLocal() bool {
// We have entries this means the content from this root is stored locally
if len(tx.entries) > 0 {
func (tx *Tx) IsLocal(key string) bool {
_, exists := tx.entries[key]
if exists {
return true
}
if _, err := tx.index.GetRef(tx.root); err != nil {

ref, err := tx.index.GetRef(tx.root)
if err != nil {
return false
}
return true
if ref != nil {
return ref.Has(key)
}

return false
}

// GetEntries retrieves all the entries associated with the root of this transaction
Expand All @@ -353,35 +389,14 @@ func (tx *Tx) GetEntries() ([]string, error) {
}
return entries, nil
}

if ref, err := tx.index.GetRef(tx.root); err == nil {
store, err := tx.ms.Get(ref.StoreID)
if err != nil {
return nil, err
}
lk := cidlink.Link{Cid: tx.root}
nb := basicnode.Prototype.Map.NewBuilder()
err = lk.Load(tx.ctx, ipld.LinkContext{}, nb, store.Loader)
if err != nil {
return nil, err
}
nd := nb.Build()
entries := make([]string, nd.Length())
it := nd.MapIterator()
i := 0
for !it.Done() {
k, _, err := it.Next()
// all succeed of fail
if err != nil {
return nil, err
}
key, err := k.AsString()
if err != nil {
return nil, err
}
entries[i] = key
i++
}
return entries, nil

return utils.MapKeys(tx.ctx, ref.PayloadCID, store.Loader)
}
return nil, fmt.Errorf("failed to get entried")
}
Expand Down
Loading

0 comments on commit 93816c8

Please sign in to comment.