Skip to content

Commit

Permalink
Update to context datastores
Browse files Browse the repository at this point in the history
  • Loading branch information
arajasek committed Dec 11, 2021
1 parent 13d0b5e commit 6715bb5
Show file tree
Hide file tree
Showing 12 changed files with 641 additions and 178 deletions.
3 changes: 2 additions & 1 deletion v2/bench_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package car_test

import (
"context"
"io"
"math/rand"
"os"
Expand Down Expand Up @@ -139,7 +140,7 @@ func generateRandomCarV2File(b *testing.B, path string, minTotalBlockSize int) {
}

blk := merkledag.NewRawNode(buf)
if err := bs.Put(blk); err != nil {
if err := bs.Put(context.TODO(), blk); err != nil {
b.Fatal(err)
}
totalBlockSize += size
Expand Down
3 changes: 2 additions & 1 deletion v2/blockstore/bench_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package blockstore_test

import (
"context"
"io"
mathrand "math/rand"
"os"
Expand Down Expand Up @@ -65,7 +66,7 @@ func BenchmarkOpenReadOnlyV1(b *testing.B) {
}

for _, c := range shuffledCIDs {
_, err := bs.Get(c)
_, err := bs.Get(context.TODO(), c)
if err != nil {
b.Fatal(err)
}
Expand Down
16 changes: 10 additions & 6 deletions v2/blockstore/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"time"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -57,7 +58,7 @@ func ExampleOpenReadOnly() {
cancel()
break
}
size, err := robs.GetSize(k)
size, err := robs.GetSize(context.TODO(), k)
if err != nil {
panic(err)
}
Expand All @@ -78,6 +79,9 @@ func ExampleOpenReadOnly() {

// ExampleOpenReadWrite creates a read-write blockstore and puts
func ExampleOpenReadWrite() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

thisBlock := merkledag.NewRawNode([]byte("fish")).Block
thatBlock := merkledag.NewRawNode([]byte("lobster")).Block
andTheOtherBlock := merkledag.NewRawNode([]byte("barreleye")).Block
Expand All @@ -96,13 +100,13 @@ func ExampleOpenReadWrite() {

// Put all blocks onto the blockstore.
blocks := []blocks.Block{thisBlock, thatBlock}
if err := rwbs.PutMany(blocks); err != nil {
if err := rwbs.PutMany(ctx, blocks); err != nil {
panic(err)
}
fmt.Printf("Successfully wrote %v blocks into the blockstore.\n", len(blocks))

// Any blocks put can be read back using the same blockstore instance.
block, err := rwbs.Get(thatBlock.Cid())
block, err := rwbs.Get(ctx, thatBlock.Cid())
if err != nil {
panic(err)
}
Expand All @@ -122,21 +126,21 @@ func ExampleOpenReadWrite() {
}

// Put another block, appending it to the set of blocks that are written previously.
if err := resumedRwbos.Put(andTheOtherBlock); err != nil {
if err := resumedRwbos.Put(ctx, andTheOtherBlock); err != nil {
panic(err)
}

// Read back the the block put before resumption.
// Blocks previously put are present.
block, err = resumedRwbos.Get(thatBlock.Cid())
block, err = resumedRwbos.Get(ctx, thatBlock.Cid())
if err != nil {
panic(err)
}
fmt.Printf("Resumed blockstore contains blocks put previously with raw value of `%v`.\n", string(block.RawData()))

// Put an additional block to the CAR.
// Blocks put after resumption are also present.
block, err = resumedRwbos.Get(andTheOtherBlock.Cid())
block, err = resumedRwbos.Get(ctx, andTheOtherBlock.Cid())
if err != nil {
panic(err)
}
Expand Down
12 changes: 6 additions & 6 deletions v2/blockstore/readonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,13 @@ func (b *ReadOnly) readBlock(idx int64) (cid.Cid, []byte, error) {
}

// DeleteBlock is unsupported and always errors.
func (b *ReadOnly) DeleteBlock(_ cid.Cid) error {
func (b *ReadOnly) DeleteBlock(_ context.Context, _ cid.Cid) error {
return errReadOnly
}

// Has indicates if the store contains a block that corresponds to the given key.
// This function always returns true for any given key with multihash.IDENTITY code.
func (b *ReadOnly) Has(key cid.Cid) (bool, error) {
func (b *ReadOnly) Has(ctx context.Context, key cid.Cid) (bool, error) {
// Check if the given CID has multihash.IDENTITY code
// Note, we do this without locking, since there is no shared information to lock for in order to perform the check.
if _, ok, err := isIdentity(key); err != nil {
Expand Down Expand Up @@ -241,7 +241,7 @@ func (b *ReadOnly) Has(key cid.Cid) (bool, error) {

// Get gets a block corresponding to the given key.
// This API will always return true if the given key has multihash.IDENTITY code.
func (b *ReadOnly) Get(key cid.Cid) (blocks.Block, error) {
func (b *ReadOnly) Get(ctx context.Context, key cid.Cid) (blocks.Block, error) {
// Check if the given CID has multihash.IDENTITY code
// Note, we do this without locking, since there is no shared information to lock for in order to perform the check.
if digest, ok, err := isIdentity(key); err != nil {
Expand Down Expand Up @@ -293,7 +293,7 @@ func (b *ReadOnly) Get(key cid.Cid) (blocks.Block, error) {
}

// GetSize gets the size of an item corresponding to the given key.
func (b *ReadOnly) GetSize(key cid.Cid) (int, error) {
func (b *ReadOnly) GetSize(ctx context.Context, key cid.Cid) (int, error) {
// Check if the given CID has multihash.IDENTITY code
// Note, we do this without locking, since there is no shared information to lock for in order to perform the check.
if digest, ok, err := isIdentity(key); err != nil {
Expand Down Expand Up @@ -361,12 +361,12 @@ func isIdentity(key cid.Cid) (digest []byte, ok bool, err error) {
}

// Put is not supported and always returns an error.
func (b *ReadOnly) Put(blocks.Block) error {
func (b *ReadOnly) Put(context.Context, blocks.Block) error {
return errReadOnly
}

// PutMany is not supported and always returns an error.
func (b *ReadOnly) PutMany([]blocks.Block) error {
func (b *ReadOnly) PutMany(context.Context, []blocks.Block) error {
return errReadOnly
}

Expand Down
30 changes: 16 additions & 14 deletions v2/blockstore/readonly_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestReadOnlyGetReturnsBlockstoreNotFoundWhenCidDoesNotExist(t *testing.T) {
nonExistingKey := merkledag.NewRawNode([]byte("lobstermuncher")).Block.Cid()

// Assert blockstore API returns blockstore.ErrNotFound
gotBlock, err := subject.Get(nonExistingKey)
gotBlock, err := subject.Get(context.TODO(), nonExistingKey)
require.Equal(t, blockstore.ErrNotFound, err)
require.Nil(t, gotBlock)
}
Expand Down Expand Up @@ -58,6 +58,7 @@ func TestReadOnly(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.TODO()
subject, err := OpenReadOnly(tt.v1OrV2path, tt.opts...)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, subject.Close()) })
Expand Down Expand Up @@ -87,25 +88,25 @@ func TestReadOnly(t *testing.T) {
wantCids = append(wantCids, key)

// Assert blockstore contains key.
has, err := subject.Has(key)
has, err := subject.Has(ctx, key)
require.NoError(t, err)
require.True(t, has)

// Assert size matches block raw data length.
gotSize, err := subject.GetSize(key)
gotSize, err := subject.GetSize(ctx, key)
wantSize := len(wantBlock.RawData())
require.NoError(t, err)
require.Equal(t, wantSize, gotSize)

// Assert block itself matches v1 payload block.
gotBlock, err := subject.Get(key)
gotBlock, err := subject.Get(ctx, key)
require.NoError(t, err)
require.Equal(t, wantBlock, gotBlock)

// Assert write operations error
require.Error(t, subject.Put(wantBlock))
require.Error(t, subject.PutMany([]blocks.Block{wantBlock}))
require.Error(t, subject.DeleteBlock(key))
require.Error(t, subject.Put(ctx, wantBlock))
require.Error(t, subject.PutMany(ctx, []blocks.Block{wantBlock}))
require.Error(t, subject.DeleteBlock(ctx, key))
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
Expand Down Expand Up @@ -239,15 +240,16 @@ func newV1Reader(r io.Reader, zeroLenSectionAsEOF bool) (*carv1.CarReader, error

func TestReadOnlyErrorAfterClose(t *testing.T) {
bs, err := OpenReadOnly("../testdata/sample-v1.car")
ctx := context.TODO()
require.NoError(t, err)

roots, err := bs.Roots()
require.NoError(t, err)
_, err = bs.Has(roots[0])
_, err = bs.Has(ctx, roots[0])
require.NoError(t, err)
_, err = bs.Get(roots[0])
_, err = bs.Get(ctx, roots[0])
require.NoError(t, err)
_, err = bs.GetSize(roots[0])
_, err = bs.GetSize(ctx, roots[0])
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -259,11 +261,11 @@ func TestReadOnlyErrorAfterClose(t *testing.T) {

_, err = bs.Roots()
require.Error(t, err)
_, err = bs.Has(roots[0])
_, err = bs.Has(ctx, roots[0])
require.Error(t, err)
_, err = bs.Get(roots[0])
_, err = bs.Get(ctx, roots[0])
require.Error(t, err)
_, err = bs.GetSize(roots[0])
_, err = bs.GetSize(ctx, roots[0])
require.Error(t, err)
_, err = bs.AllKeysChan(ctx)
require.Error(t, err)
Expand Down Expand Up @@ -293,7 +295,7 @@ func TestNewReadOnly_CarV1WithoutIndexWorksAsExpected(t *testing.T) {
require.NoError(t, err)

// Require that the block is found via ReadOnly API and contetns are as expected.
gotBlock, err := subject.Get(wantBlock.Cid())
gotBlock, err := subject.Get(context.TODO(), wantBlock.Cid())
require.NoError(t, err)
require.Equal(t, wantBlock, gotBlock)
}
20 changes: 10 additions & 10 deletions v2/blockstore/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,14 +279,14 @@ func (b *ReadWrite) unfinalize() error {
}

// Put puts a given block to the underlying datastore
func (b *ReadWrite) Put(blk blocks.Block) error {
func (b *ReadWrite) Put(ctx context.Context, blk blocks.Block) error {
// PutMany already checks b.ronly.closed.
return b.PutMany([]blocks.Block{blk})
return b.PutMany(ctx, []blocks.Block{blk})
}

// PutMany puts a slice of blocks at the same time using batching
// capabilities of the underlying datastore whenever possible.
func (b *ReadWrite) PutMany(blks []blocks.Block) error {
func (b *ReadWrite) PutMany(ctx context.Context, blks []blocks.Block) error {
b.ronly.mu.Lock()
defer b.ronly.mu.Unlock()

Expand Down Expand Up @@ -393,19 +393,19 @@ func (b *ReadWrite) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return b.ronly.AllKeysChan(ctx)
}

func (b *ReadWrite) Has(key cid.Cid) (bool, error) {
return b.ronly.Has(key)
func (b *ReadWrite) Has(ctx context.Context, key cid.Cid) (bool, error) {
return b.ronly.Has(ctx, key)
}

func (b *ReadWrite) Get(key cid.Cid) (blocks.Block, error) {
return b.ronly.Get(key)
func (b *ReadWrite) Get(ctx context.Context, key cid.Cid) (blocks.Block, error) {
return b.ronly.Get(ctx, key)
}

func (b *ReadWrite) GetSize(key cid.Cid) (int, error) {
return b.ronly.GetSize(key)
func (b *ReadWrite) GetSize(ctx context.Context, key cid.Cid) (int, error) {
return b.ronly.GetSize(ctx, key)
}

func (b *ReadWrite) DeleteBlock(_ cid.Cid) error {
func (b *ReadWrite) DeleteBlock(_ context.Context, _ cid.Cid) error {
return fmt.Errorf("ReadWrite blockstore does not support deleting blocks")
}

Expand Down
Loading

0 comments on commit 6715bb5

Please sign in to comment.