Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blockstore on all dagstore cids #116

Merged
merged 23 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
70b915e
blockstore on all dagstore cids
aarshkshah1992 Jan 21, 2022
72f676f
double caching
aarshkshah1992 Jan 21, 2022
38d0ceb
key by mh
aarshkshah1992 Jan 21, 2022
bc3cae8
ensure we close shard accessors
aarshkshah1992 Jan 27, 2022
15ec8e3
better logging
aarshkshah1992 Jan 27, 2022
fc7ddff
Merge branch 'master' into feat/blockstore
aarshkshah1992 Mar 29, 2022
bdfb1ec
ready for review
aarshkshah1992 Mar 29, 2022
388f51f
Apply suggestions from code review
aarshkshah1992 Mar 29, 2022
8d9286a
changes as per review
aarshkshah1992 Mar 29, 2022
8387a9e
thread safe
aarshkshah1992 Mar 29, 2022
7bd999d
better docs
aarshkshah1992 Mar 29, 2022
5139685
remove redundant param
aarshkshah1992 Mar 30, 2022
a8c50b8
Merge branch 'master' into feat/blockstore
hannahhoward Aug 18, 2022
8027d20
chore(deps): upgrade deps
hannahhoward Aug 18, 2022
aba4e75
refactor(indexbs): use dagstore.Interface
hannahhoward Aug 18, 2022
5fd6621
feat: support GetSize for index backed blockstore
dirkmc Sep 5, 2022
0b07b8f
refactor: simplify locking in index-backed blockstore
dirkmc Sep 12, 2022
74fd751
refactor: ref-count blockstore acquires so as to close exactly once
dirkmc Sep 13, 2022
2a000e2
Merge pull request #142 from filecoin-project/feat/blockstore-ref-count
dirkmc Sep 15, 2022
194d8cf
feat: index-backed blockstore - synchronize acquires
dirkmc Sep 16, 2022
5c8caac
refactor: use global context for Acquireshard
dirkmc Sep 19, 2022
173a32a
Merge pull request #144 from filecoin-project/feat/ibs-acquire-sync
dirkmc Sep 19, 2022
4e73a53
Merge branch 'master' of github.com:filecoin-project/dagstore into feat/blockstore
dirkmc Sep 30, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ module github.com/filecoin-project/dagstore
go 1.16

require (
github.com/hashicorp/golang-lru v0.5.4
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-cid v0.1.0
github.com/ipfs/go-datastore v0.5.0
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-ipfs-blockstore v1.1.2
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-log/v2 v2.3.0
github.com/ipld/go-car/v2 v2.1.1
Expand Down
3 changes: 3 additions & 0 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package dagstore
import (
"context"

blockstore "github.com/ipfs/go-ipfs-blockstore"

carindex "github.com/ipld/go-car/v2/index"
mh "github.com/multiformats/go-multihash"

Expand All @@ -24,4 +26,5 @@ type Interface interface {
ShardsContainingMultihash(ctx context.Context, h mh.Multihash) ([]shard.Key, error)
GC(ctx context.Context) (*GCResult, error)
Close() error
AllShardsReadBlockstore(shardSelector ShardSelectorF, maxBSCachesize int, maxBlkCachesize int) (blockstore.Blockstore, error)
}
207 changes: 207 additions & 0 deletions readonly_bs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package dagstore

import (
"context"
"errors"
"fmt"

logging "github.com/ipfs/go-log/v2"

"github.com/filecoin-project/dagstore/shard"
lru "github.com/hashicorp/golang-lru"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
)

// logbs is the logger used by the read-only blockstore that spans all shards.
var logbs = logging.Logger("dagstore-all-readblockstore")

// Compile-time assertion that *AllShardsReadBlockstore implements blockstore.Blockstore.
var _ blockstore.Blockstore = (*AllShardsReadBlockstore)(nil)

// ShardSelectorF helps select a shard to fetch a cid from if the given cid is present in multiple shards.
//
// It receives the requested cid and the keys of the shards whose index contains
// the cid's multihash, and returns the key of the shard to read from.
// Returning an error rejects the request: Get fails with that error and Has
// reports false together with the error.
type ShardSelectorF func(c cid.Cid, shards []shard.Key) (shard.Key, error)

// accessorWithBlockstore pairs an acquired shard accessor with the read-only
// blockstore obtained from it, so that the accessor can be closed when the
// blockstore is evicted from the cache.
type accessorWithBlockstore struct {
// sa is the shard accessor; it is closed when the cache entry is evicted.
sa *ShardAccessor
// bs is the blockstore returned by sa.Blockstore().
bs ReadBlockstore
}

// AllShardsReadBlockstore is a read only blockstore over all cids across all shards in the dagstore.
//
// Reads are answered, in order, from the block cache, from an already cached
// shard blockstore, and finally from a shard that is acquired on demand.
type AllShardsReadBlockstore struct {
// d is the dagstore whose shards are looked up and acquired to serve reads.
d *DAGStore
// shardSelectF chooses which shard to acquire when no cached blockstore can serve the block.
shardSelectF ShardSelectorF

// caches the blockstore for a given shard for shard read affinity i.e. further reads will likely be from the same shard.
// shard key -> *accessorWithBlockstore (shard accessor plus its read only blockstore)
blockstoreCache *lru.Cache

// caches the blocks themselves -> can be scaled by using a redis/memcache etc distributed cache.
// multihash (its String() form) -> block
blockCache *lru.Cache
}

func (d *DAGStore) AllShardsReadBlockstore(shardSelector ShardSelectorF, maxCacheSize int, maxBlocks int) (blockstore.Blockstore, error) {
// instantiate the blockstore cache
bslru, err := lru.NewWithEvict(maxCacheSize, func(_ interface{}, val interface{}) {
// ensure we close the blockstore for a shard when it's evicted from the cache so dagstore can gc it.
abs := val.(*accessorWithBlockstore)
abs.sa.Close()
})
if err != nil {
return nil, fmt.Errorf("failed to create lru cache for read only blockstores")
}

// instantiate the block cache
blkLru, err := lru.New(maxBlocks)
if err != nil {
return nil, fmt.Errorf("failed to create lru cache for blocks: %w", err)
}
dirkmc marked this conversation as resolved.
Show resolved Hide resolved

return &AllShardsReadBlockstore{
d: d,
shardSelectF: shardSelector,
blockstoreCache: bslru,
blockCache: blkLru,
}, nil
}

func (ro *AllShardsReadBlockstore) Get(ctx context.Context, c cid.Cid) (b blocks.Block, finalErr error) {
logbs.Debugw("bitswap Get called", "cid", c)
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
defer func() {
if finalErr != nil {
logbs.Debugw("bitswap Get: got error", "cid", c, "error", finalErr)
}
}()

mhash := c.Hash()
// do we have the block cached ?
if val, ok := ro.blockCache.Get(mhash.String()); ok {
logbs.Debugw("bitswap Get: returning from block cache", "cid", c)
return val.(blocks.Block), nil
}

// fetch all the shards containing the multihash
shards, err := ro.d.ShardsContainingMultihash(ctx, mhash)
if err != nil {
return nil, fmt.Errorf("failed to fetch shards containing the block: %w", err)
}
if len(shards) == 0 {
return nil, errors.New("no shards contain the requested block")
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
}

// do we have a cached blockstore for a shard containing the required block ? If yes, serve the block from that blockstore
for _, sk := range shards {
// a valid cache hit here updates the priority of the shard's blockstore in the LRU cache.
val, ok := ro.blockstoreCache.Get(sk)
if !ok {
continue
}

rbs := val.(*accessorWithBlockstore).bs
blk, err := rbs.Get(ctx, c)
if err != nil {
ro.blockstoreCache.Remove(sk)
continue
}
dirkmc marked this conversation as resolved.
Show resolved Hide resolved

// add the block to the block cache
logbs.Debugw("bitswap Get: returning from block store cache", "cid", c)
ro.blockCache.Add(mhash.String(), blk)
return blk, nil
}

// ---- we don't have a cached blockstore for a shard that can serve the block -> let's build one.

// select a valid shard that can serve the retrieval
sk, err := ro.shardSelectF(c, shards)
if err != nil {
return nil, fmt.Errorf("failed to select a shard: %w", err)
}

// load blockstore for the selected shard and try to serve the cid from that blockstore.
resch := make(chan ShardResult, 1)
if err := ro.d.AcquireShard(ctx, sk, resch, AcquireOpts{}); err != nil {
return nil, fmt.Errorf("failed to acquire shard %s: %w", sk, err)
}
var res ShardResult
select {
case <-ctx.Done():
return nil, ctx.Err()
case res = <-resch:
if res.Error != nil {
return nil, fmt.Errorf("failed to acquire shard %s: %w", sk, res.Error)
}
}

sa := res.Accessor
bs, err := sa.Blockstore()
if err != nil {
return nil, fmt.Errorf("failed to load read only blockstore for shard %s: %w", sk, err)
}

blk, err := bs.Get(ctx, c)
if err != nil {
return nil, fmt.Errorf("failed to get block: %w", err)
}

// update the block cache and the blockstore cache
ro.blockstoreCache.Add(sk, &accessorWithBlockstore{sa, bs})
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
ro.blockCache.Add(mhash.String(), blk)

logbs.Debugw("bitswap Get: returning after creating new blockstore", "cid", c)
return blk, nil
}

func (ro *AllShardsReadBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
logbs.Debugw("bitswap Has called", "cid", c)

// if there is a shard that can serve the retrieval for the given cid, we have the requested cid
// and has should return true.
shards, err := ro.d.ShardsContainingMultihash(ctx, c.Hash())
if err != nil {
logbs.Debugw("bitswap Has error", "cid", c, "err", err)
return false, fmt.Errorf("failed to fetch shards containing the multihash %w", err)
}
if len(shards) == 0 {
logbs.Debugw("bitswap Has: returning false no error", "cid", c)
return false, nil
}

_, err = ro.shardSelectF(c, shards)
if err != nil {
logbs.Debugw("bitswap Has error", "cid", c, "err", err)
return false, fmt.Errorf("failed to select a shard: %w", err)
}
dirkmc marked this conversation as resolved.
Show resolved Hide resolved

logbs.Debugw("bitswap Has: returning true", "cid", c)
return true, nil
}

// GetSize returns the length in bytes of the raw data of the block identified by c.
//
// The block is fetched through Get; if that fails, 0 and a wrapped error are returned.
func (ro *AllShardsReadBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
	logbs.Debugw("bitswap GetSize called", "cid", c)

	block, getErr := ro.Get(ctx, c)
	if getErr != nil {
		logbs.Debugw("bitswap GetSize error", "cid", c, "err", getErr)
		return 0, fmt.Errorf("failed to get block: %w", getErr)
	}

	logbs.Debugw("bitswap GetSize success", "cid", c)
	size := len(block.RawData())
	return size, nil
}

// --- UNSUPPORTED BLOCKSTORE METHODS -------

// DeleteBlock is not supported by this read-only blockstore and always returns an error.
func (ro *AllShardsReadBlockstore) DeleteBlock(context.Context, cid.Cid) error {
	return errors.New("unsupported operation DeleteBlock")
}

// HashOnRead is a no-op.
func (ro *AllShardsReadBlockstore) HashOnRead(bool) {}

// Put is not supported by this read-only blockstore and always returns an error.
func (ro *AllShardsReadBlockstore) Put(context.Context, blocks.Block) error {
	return errors.New("unsupported operation Put")
}

// PutMany is not supported by this read-only blockstore and always returns an error.
func (ro *AllShardsReadBlockstore) PutMany(context.Context, []blocks.Block) error {
	return errors.New("unsupported operation PutMany")
}

// AllKeysChan is not supported by this blockstore and always returns a nil channel and an error.
func (ro *AllShardsReadBlockstore) AllKeysChan(context.Context) (<-chan cid.Cid, error) {
	return nil, errors.New("unsupported operation AllKeysChan")
}
90 changes: 90 additions & 0 deletions readonly_bs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package dagstore

import (
"context"
"errors"
"testing"

"github.com/multiformats/go-multihash"

"github.com/filecoin-project/dagstore/shard"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/require"
)

// noOpSelector is a shard selector that ignores the cid and always picks the
// first candidate shard. It indexes element 0 directly, so it must only be
// called with a non-empty slice.
var noOpSelector = func(_ cid.Cid, candidates []shard.Key) (shard.Key, error) {
	first := candidates[0]
	return first, nil
}

// TestReadOnlyBs registers two shards backed by the same CARv2 file and checks
// that the all-shards read-only blockstore answers Has, Get and GetSize for
// every multihash in the shard's index. It then repeats the calls with a shard
// selector that rejects everything and checks that each call fails.
func TestReadOnlyBs(t *testing.T) {
	ctx := context.Background()
	store := dssync.MutexWrap(datastore.NewMapDatastore())
	dagst, err := NewDAGStore(Config{
		MountRegistry: testRegistry(t),
		TransientsDir: t.TempDir(),
		Datastore:     store,
	})
	require.NoError(t, err)

	err = dagst.Start(ctx)
	require.NoError(t, err)

	// two shards containing the same cid
	keys := registerShards(t, dagst, 2, carv2mnt, RegisterOpts{})

	rbs, err := dagst.AllShardsReadBlockstore(noOpSelector, 10, 10)
	require.NoError(t, err)

	// iterate over the CARV2 Index for the given CARv2 file and ensure the readonly blockstore
	// works for each of those cids
	it, err := dagst.GetIterableIndex(keys[0])
	require.NoError(t, err)

	// the iteration error is checked so that a failing index walk cannot pass silently.
	err = it.ForEach(func(mh multihash.Multihash, _ uint64) error {
		c := cid.NewCidV1(cid.Raw, mh)

		has, err := rbs.Has(ctx, c)
		require.NoError(t, err)
		require.True(t, has)

		blk, err := rbs.Get(ctx, c)
		require.NoError(t, err)
		require.NotEmpty(t, blk)

		sz, err := rbs.GetSize(ctx, c)
		require.NoError(t, err)
		require.EqualValues(t, len(blk.RawData()), sz)

		require.EqualValues(t, c, blk.Cid())
		return nil
	})
	require.NoError(t, err)

	// ------------------------------------------
	// Now test with a shard selector that rejects everything and ensure we always see errors
	fss := func(c cid.Cid, shards []shard.Key) (shard.Key, error) {
		return shard.Key{}, errors.New("rejected")
	}

	rbs, err = dagst.AllShardsReadBlockstore(fss, 10, 10)
	require.NoError(t, err)

	err = it.ForEach(func(mh multihash.Multihash, _ uint64) error {
		c := cid.NewCidV1(cid.Raw, mh)

		has, err := rbs.Has(ctx, c)
		require.Error(t, err)
		require.False(t, has)

		blk, err := rbs.Get(ctx, c)
		require.Error(t, err)
		require.Empty(t, blk)

		sz, err := rbs.GetSize(ctx, c)
		require.Error(t, err)
		require.EqualValues(t, 0, sz)

		return nil
	})
	require.NoError(t, err)
}