Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blockstore on all dagstore cids #116

Merged
merged 23 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
70b915e
blockstore on all dagstore cids
aarshkshah1992 Jan 21, 2022
72f676f
double caching
aarshkshah1992 Jan 21, 2022
38d0ceb
key by mh
aarshkshah1992 Jan 21, 2022
bc3cae8
ensure we close shard accessors
aarshkshah1992 Jan 27, 2022
15ec8e3
better logging
aarshkshah1992 Jan 27, 2022
fc7ddff
Merge branch 'master' into feat/blockstore
aarshkshah1992 Mar 29, 2022
bdfb1ec
ready for review
aarshkshah1992 Mar 29, 2022
388f51f
Apply suggestions from code review
aarshkshah1992 Mar 29, 2022
8d9286a
changes as per review
aarshkshah1992 Mar 29, 2022
8387a9e
thread safe
aarshkshah1992 Mar 29, 2022
7bd999d
better docs
aarshkshah1992 Mar 29, 2022
5139685
remove redundant param
aarshkshah1992 Mar 30, 2022
a8c50b8
Merge branch 'master' into feat/blockstore
hannahhoward Aug 18, 2022
8027d20
chore(deps): upgrade deps
hannahhoward Aug 18, 2022
aba4e75
refactor(indexbs): use dagstore.Interface
hannahhoward Aug 18, 2022
5fd6621
feat: support GetSize for index backed blockstore
dirkmc Sep 5, 2022
0b07b8f
refactor: simplify locking in index-backed blockstore
dirkmc Sep 12, 2022
74fd751
refactor: ref-count blockstore acquires so as to close exactly once
dirkmc Sep 13, 2022
2a000e2
Merge pull request #142 from filecoin-project/feat/blockstore-ref-count
dirkmc Sep 15, 2022
194d8cf
feat: index-backed blockstore - synchronize acquires
dirkmc Sep 16, 2022
5c8caac
refactor: use global context for Acquireshard
dirkmc Sep 19, 2022
173a32a
Merge pull request #144 from filecoin-project/feat/ibs-acquire-sync
dirkmc Sep 19, 2022
4e73a53
Merge branch 'master' of github.com:filecoin-project/dagstore into fe…
dirkmc Sep 30, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ module github.com/filecoin-project/dagstore
go 1.16

require (
github.com/hashicorp/golang-lru v0.5.4
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-cid v0.1.0
github.com/ipfs/go-datastore v0.5.0
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-ipfs-blockstore v1.1.2
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-log/v2 v2.3.0
github.com/ipld/go-car/v2 v2.1.1
Expand Down
232 changes: 232 additions & 0 deletions indexbs/indexbacked_bs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
package indexbs

import (
"context"
"errors"
"fmt"
"sync"

"github.com/filecoin-project/dagstore"
blocks "github.com/ipfs/go-block-format"
logging "github.com/ipfs/go-log/v2"

"github.com/filecoin-project/dagstore/shard"
lru "github.com/hashicorp/golang-lru"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
)

var logbs = logging.Logger("dagstore-all-readblockstore")

var ErrBlockNotFound = errors.New("block not found")

var _ blockstore.Blockstore = (*IndexBackedBlockstore)(nil)

// ErrNoShardSelected means that the shard selection function rejected all of the given shards.
var ErrNoShardSelected = errors.New("no shard selected")

// ShardSelectorF helps select a shard to fetch a cid from if the given cid is present in multiple shards.
// It should return `ErrNoShardSelected` if none of the given shard is selected.
type ShardSelectorF func(c cid.Cid, shards []shard.Key) (shard.Key, error)

type accessorWithBlockstore struct {
sa *dagstore.ShardAccessor
bs dagstore.ReadBlockstore
}

// IndexBackedBlockstore is a read only blockstore over all cids across all shards in the dagstore.
type IndexBackedBlockstore struct {
d *dagstore.DAGStore
shardSelectF ShardSelectorF

bsStripedLocks [256]sync.Mutex
blockstoreCache *lru.Cache // caches the blockstore for a given shard for shard read affinity i.e. further reads will likely be from the same shard. Maps (shard key -> blockstore).
}

func NewIndexBackedBlockstore(d *dagstore.DAGStore, shardSelector ShardSelectorF, maxCacheSize int, maxBlocks int) (blockstore.Blockstore, error) {
// instantiate the blockstore cache
bslru, err := lru.NewWithEvict(maxCacheSize, func(_ interface{}, val interface{}) {
// ensure we close the blockstore for a shard when it's evicted from the cache so dagstore can gc it.
abs := val.(*accessorWithBlockstore)
abs.sa.Close()
})
if err != nil {
return nil, fmt.Errorf("failed to create lru cache for read only blockstores")
}

return &IndexBackedBlockstore{
d: d,
shardSelectF: shardSelector,
blockstoreCache: bslru,
}, nil
}

func (ro *IndexBackedBlockstore) Get(ctx context.Context, c cid.Cid) (b blocks.Block, finalErr error) {
logbs.Debugw("Get called", "cid", c)
defer func() {
if finalErr != nil {
logbs.Debugw("Get: got error", "cid", c, "error", finalErr)
}
}()

mhash := c.Hash()

// fetch all the shards containing the multihash
shards, err := ro.d.ShardsContainingMultihash(ctx, mhash)
if err != nil {
return nil, fmt.Errorf("failed to fetch shards containing the block: %w", err)
}
if len(shards) == 0 {
return nil, ErrBlockNotFound
}

// do we have a cached blockstore for a shard containing the required block ? If yes, serve the block from that blockstore
for _, sk := range shards {
lk := &ro.bsStripedLocks[shardKeyToStriped(sk)]
lk.Lock()

blk, err := ro.readFromBSCacheUnlocked(ctx, c, sk)
if err == nil && blk != nil {
logbs.Debugw("Get: returning from block store cache", "cid", c)

lk.Unlock()
return blk, nil
}

lk.Unlock()
}

// ---- we don't have a cached blockstore for a shard that can serve the block -> let's build one.

// select a valid shard that can serve the retrieval
sk, err := ro.shardSelectF(c, shards)
if err != nil && err == ErrNoShardSelected {
return nil, ErrBlockNotFound
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
}
if err != nil {
return nil, fmt.Errorf("failed to run shard selection function: %w", err)
}

lk := &ro.bsStripedLocks[shardKeyToStriped(sk)]
lk.Lock()
defer lk.Unlock()

// see if we have blockstore in the cache we can serve the retrieval from as the previous code in this critical section
// could have added a blockstore to the cache for the given shard key.
blk, err := ro.readFromBSCacheUnlocked(ctx, c, sk)
if err == nil && blk != nil {
return blk, nil
}

// load blockstore for the selected shard and try to serve the cid from that blockstore.
resch := make(chan dagstore.ShardResult, 1)
if err := ro.d.AcquireShard(ctx, sk, resch, dagstore.AcquireOpts{}); err != nil {
return nil, fmt.Errorf("failed to acquire shard %s: %w", sk, err)
}
var res dagstore.ShardResult
select {
case <-ctx.Done():
return nil, ctx.Err()
case res = <-resch:
if res.Error != nil {
return nil, fmt.Errorf("failed to acquire shard %s: %w", sk, res.Error)
}
}

sa := res.Accessor
bs, err := sa.Blockstore()
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, fmt.Errorf("failed to load read only blockstore for shard %s: %w", sk, err)
}

blk, err = bs.Get(ctx, c)
if err != nil {
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
return nil, fmt.Errorf("failed to get block: %w", err)
}

// update the block cache and the blockstore cache
ro.blockstoreCache.Add(sk, &accessorWithBlockstore{sa, bs})

logbs.Debugw("Get: returning after creating new blockstore", "cid", c)
return blk, nil
}

func (ro *IndexBackedBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
logbs.Debugw("Has called", "cid", c)

// if there is a shard that can serve the retrieval for the given cid, we have the requested cid
// and has should return true.
shards, err := ro.d.ShardsContainingMultihash(ctx, c.Hash())
if err != nil {
logbs.Debugw("Has error", "cid", c, "err", err)
return false, nil
}
if len(shards) == 0 {
logbs.Debugw("Has: returning false no error", "cid", c)
return false, nil
}

_, err = ro.shardSelectF(c, shards)
if err != nil && err == ErrNoShardSelected {
logbs.Debugw("Has error", "cid", c, "err", err)
return false, nil
}
if err != nil {
logbs.Debugw("Has error", "cid", c, "err", err)
return false, fmt.Errorf("failed to run shard selection function: %w", err)
}

logbs.Debugw("Has: returning true", "cid", c)
return true, nil
}

func (ro *IndexBackedBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
logbs.Debugw("GetSize called", "cid", c)

blk, err := ro.Get(ctx, c)
if err != nil {
logbs.Debugw("GetSize error", "cid", c, "err", err)
return 0, fmt.Errorf("failed to get block: %w", err)
}

logbs.Debugw("GetSize success", "cid", c)
return len(blk.RawData()), nil
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
}

func (ro *IndexBackedBlockstore) readFromBSCacheUnlocked(ctx context.Context, c cid.Cid, sk shard.Key) (blocks.Block, error) {
val, ok := ro.blockstoreCache.Get(sk)
if !ok {
return nil, ErrBlockNotFound
}

rbs := val.(*accessorWithBlockstore).bs
blk, err := rbs.Get(ctx, c)
if err != nil {
// we know that the cid we want to lookup belongs to a shard with key `sk` and
// so if we fail to get the corresponding block from the blockstore for that shards, something has gone wrong
// and we should remove the blockstore for that shard from our cache.
ro.blockstoreCache.Remove(sk)
return nil, err
}
aarshkshah1992 marked this conversation as resolved.
Show resolved Hide resolved

return blk, nil
}

func shardKeyToStriped(sk shard.Key) byte {
return sk.String()[len(sk.String())-1]
}

// --- UNSUPPORTED BLOCKSTORE METHODS -------
func (ro *IndexBackedBlockstore) DeleteBlock(context.Context, cid.Cid) error {
return errors.New("unsupported operation DeleteBlock")
}
func (ro *IndexBackedBlockstore) HashOnRead(_ bool) {}
func (ro *IndexBackedBlockstore) Put(context.Context, blocks.Block) error {
return errors.New("unsupported operation Put")
}
func (ro *IndexBackedBlockstore) PutMany(context.Context, []blocks.Block) error {
return errors.New("unsupported operation PutMany")
}
func (ro *IndexBackedBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, errors.New("unsupported operation AllKeysChan")
}
137 changes: 137 additions & 0 deletions indexbs/indexbacked_bs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package indexbs

import (
"context"
"errors"
"testing"

"golang.org/x/sync/errgroup"

"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/testdata"

"github.com/multiformats/go-multihash"

"github.com/filecoin-project/dagstore/shard"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/require"
)

var noOpSelector = func(c cid.Cid, shards []shard.Key) (shard.Key, error) {
return shards[0], nil
}

var carv2mnt = &mount.FSMount{FS: testdata.FS, Path: testdata.FSPathCarV2}

func TestReadOnlyBs(t *testing.T) {
ctx := context.Background()
store := dssync.MutexWrap(datastore.NewMapDatastore())
dagst, err := dagstore.NewDAGStore(dagstore.Config{
MountRegistry: testRegistry(t),
TransientsDir: t.TempDir(),
Datastore: store,
})
require.NoError(t, err)

err = dagst.Start(context.Background())
require.NoError(t, err)

// register a shard
ch := make(chan dagstore.ShardResult, 1)
sk := shard.KeyFromString("test1")
err = dagst.RegisterShard(context.Background(), sk, carv2mnt, ch, dagstore.RegisterOpts{})
require.NoError(t, err)
res := <-ch
require.NoError(t, res.Error)

rbs, err := NewIndexBackedBlockstore(dagst, noOpSelector, 10, 10)
require.NoError(t, err)

// iterate over the CARV2 Index for the given CARv2 file and ensure the readonly blockstore
// works for each of those cids
it, err := dagst.GetIterableIndex(sk)
require.NoError(t, err)

var errg errgroup.Group

it.ForEach(func(mh multihash.Multihash, _ uint64) error {

mhs := mh
errg.Go(func() error {
c := cid.NewCidV1(cid.Raw, mhs)

// Has
has, err := rbs.Has(ctx, c)
if err != nil {
return err
}
if !has {
return errors.New("has should be true")
}

// Get
blk, err := rbs.Get(ctx, c)
if err != nil {
return err
}
if blk == nil {
return errors.New("block should not be empty")
}

// GetSize
_, err = rbs.GetSize(ctx, c)
if err != nil {
return err
}

// ensure cids match
if blk.Cid() != c {
return errors.New("cid mismatch")
}
return nil

})

return nil
})

require.NoError(t, errg.Wait())

// ------------------------------------------
// Now test with a shard selector that rejects everything and ensure we always see errors
fss := func(c cid.Cid, shards []shard.Key) (shard.Key, error) {
return shard.Key{}, errors.New("rejected")
}

rbs, err = NewIndexBackedBlockstore(dagst, fss, 10, 10)
require.NoError(t, err)
it.ForEach(func(mh multihash.Multihash, u uint64) error {
c := cid.NewCidV1(cid.Raw, mh)

has, err := rbs.Has(ctx, c)
require.Error(t, err)
require.False(t, has)

blk, err := rbs.Get(ctx, c)
require.Error(t, err)
require.Empty(t, blk)

sz, err := rbs.GetSize(ctx, c)
require.Error(t, err)
require.EqualValues(t, 0, sz)

return nil
})
}

func testRegistry(t *testing.T) *mount.Registry {
r := mount.NewRegistry()
err := r.Register("fs", &mount.FSMount{FS: testdata.FS})
require.NoError(t, err)
err = r.Register("counting", new(mount.Counting))
require.NoError(t, err)
return r
}