Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

sync pinner on every pin operation #13

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 52 additions & 7 deletions dspinner/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,17 @@ func New(ctx context.Context, dstore ds.Datastore, dserv ipld.DAGService) (ipfsp
}

// Pin the given node, optionally recursive
func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error {
func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) (perr error) {
log.Infof("pinning cid %v, recurse=%v", node.Cid().String(), recurse)
var found bool

// flush if the pin was not already present and it was successfully added to the stores
defer func() {
if perr == nil && !found {
perr = p.Flush(ctx)
}
}()

err := p.dserv.Add(ctx, node)
if err != nil {
return err
Expand All @@ -177,7 +187,7 @@ func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error {
defer p.lock.Unlock()

if recurse {
found, err := p.cidRIndex.HasAny(ctx, cidKey)
found, err = p.cidRIndex.HasAny(ctx, cidKey)
if err != nil {
return err
}
Expand Down Expand Up @@ -221,7 +231,7 @@ func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error {
return err
}
} else {
found, err := p.cidRIndex.HasAny(ctx, cidKey)
found, err = p.cidRIndex.HasAny(ctx, cidKey)
if err != nil {
return err
}
Expand Down Expand Up @@ -318,7 +328,17 @@ func (p *pinner) removePin(ctx context.Context, pp *pin) error {
}

// Unpin a given key
func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) error {
func (p *pinner) Unpin(ctx context.Context, c cid.Cid, recursive bool) (perr error) {
log.Infof("unpinning cid %v, recurse=%v", c.String(), recursive)

// Flush after a removal. Occasionally, removal is a nop and flush is unnecessary;
// however in the interest of simplicity, we still flush even in these rare cases.
defer func() {
if perr == nil {
perr = p.Flush(ctx)
}
}()
petar marked this conversation as resolved.
Show resolved Hide resolved

cidKey := c.KeyString()

p.lock.Lock()
Expand Down Expand Up @@ -533,7 +553,10 @@ func (p *pinner) CheckIfPinned(ctx context.Context, cids ...cid.Cid) ([]ipfspinn
// Use with care! If used improperly, garbage collection may not
// be successful.
func (p *pinner) RemovePinWithMode(c cid.Cid, mode ipfspinner.Mode) {
mstr, _ := ipfspinner.ModeToString(mode)
log.Infof("unpinning cid %v with mode %v", c.String(), mstr)
ctx := context.TODO()

// Check cache to see if CID is pinned
switch mode {
case ipfspinner.Direct, ipfspinner.Recursive:
Expand Down Expand Up @@ -773,7 +796,16 @@ func (p *pinner) InternalPins(ctx context.Context) ([]cid.Cid, error) {
// to pinning the new one and unpinning the old one.
//
// TODO: This will not work when multiple pins are supported
func (p *pinner) Update(ctx context.Context, from, to cid.Cid, unpin bool) error {
func (p *pinner) Update(ctx context.Context, from, to cid.Cid, unpin bool) (perr error) {
log.Infof("updating from cid %v to cid %v, unpin=%v", from.String(), to.String(), unpin)

// Flush to disk after updating the stores to reduce exposure to dirty state.
defer func() {
if perr == nil {
perr = p.Flush(ctx)
}
}()

p.lock.Lock()
defer p.lock.Unlock()

Expand Down Expand Up @@ -827,6 +859,8 @@ func (p *pinner) Update(ctx context.Context, from, to cid.Cid, unpin bool) error

// Flush encodes and writes pinner keysets to the datastore
func (p *pinner) Flush(ctx context.Context) error {
log.Infof("flushing pinner")

p.lock.Lock()
defer p.lock.Unlock()

Expand All @@ -849,8 +883,19 @@ func (p *pinner) Flush(ctx context.Context) error {
// PinWithMode allows the user to have fine grained control over pin
// counts
func (p *pinner) PinWithMode(c cid.Cid, mode ipfspinner.Mode) {
mstr, _ := ipfspinner.ModeToString(mode)
log.Infof("pin cid %v with mode %v", c.String(), mstr)

ctx := context.TODO()

var perr error
// Flush to disk after pinning to reduce exposure to dirty state.
defer func() {
if perr == nil {
perr = p.Flush(ctx)
}
}()

p.lock.Lock()
defer p.lock.Unlock()

Expand All @@ -868,8 +913,8 @@ func (p *pinner) PinWithMode(c cid.Cid, mode ipfspinner.Mode) {
panic("unrecognized pin mode")
}

_, err := p.addPin(ctx, c, mode, "")
if err != nil {
_, perr = p.addPin(ctx, c, mode, "")
if perr != nil {
return
}
}
Expand Down
13 changes: 13 additions & 0 deletions dspinner/pin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,9 @@ func TestLoadDirty(t *testing.T) {
cidRIndex.DeleteKey(ctx, cidAKey)
cidRIndex.Add(ctx, cidBKey, "not-a-pin-id")

// Force dirty, since Pin syncs automatically
p.(*pinner).setDirty(ctx, true)

// Verify dirty
data, err := dstore.Get(dirtyKey)
if err != nil {
Expand Down Expand Up @@ -847,6 +850,10 @@ func makeNodes(count int, dserv ipld.DAGService) []ipld.Node {
}

func pinNodes(nodes []ipld.Node, p ipfspin.Pinner, recursive bool) {
pinNodesSyncEvery(nodes, p, recursive, false)
}

func pinNodesSyncEvery(nodes []ipld.Node, p ipfspin.Pinner, recursive bool, syncEvery bool) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var err error
Expand All @@ -856,6 +863,12 @@ func pinNodes(nodes []ipld.Node, p ipfspin.Pinner, recursive bool) {
if err != nil {
panic(err)
}
if syncEvery {
err = p.Flush(ctx)
if err != nil {
panic(err)
}
}
}
err = p.Flush(ctx)
if err != nil {
Expand Down
77 changes: 77 additions & 0 deletions dspinner/sync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package dspinner

import (
"context"
"testing"

bs "github.com/ipfs/go-blockservice"
ds "github.com/ipfs/go-datastore"
bds "github.com/ipfs/go-ds-badger"
lds "github.com/ipfs/go-ds-leveldb"
blockstore "github.com/ipfs/go-ipfs-blockstore"
offline "github.com/ipfs/go-ipfs-exchange-offline"
ipld "github.com/ipfs/go-ipld-format"
mdag "github.com/ipfs/go-merkledag"
)

func makeStoreLevelDB(dir string) (ds.Datastore, ipld.DAGService) {
ldstore, err := lds.NewDatastore(dir, nil)
if err != nil {
panic(err)
}
// dstore := &batchWrap{ldstore}
dstore := ldstore
bstore := blockstore.NewBlockstore(dstore)
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)
return dstore, dserv
}

func makeStoreBadger(dir string) (ds.Datastore, ipld.DAGService) {
bdstore, err := bds.NewDatastore(dir, nil)
if err != nil {
panic(err)
}
dstore := &batchWrap{bdstore}
bstore := blockstore.NewBlockstore(dstore)
bserv := bs.New(bstore, offline.Exchange(bstore))
dserv := mdag.NewDAGService(bserv)
return dstore, dserv
}

func benchSyncEvery(b *testing.B, N int, every bool, dstore ds.Datastore, dserv ipld.DAGService) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pinner, err := New(ctx, dstore, dserv)
if err != nil {
panic(err.Error())
}

nodes := makeNodes(N, dserv)
pinNodesSyncEvery(nodes, pinner, true, every)
}

func BenchmarkSyncOnceBadger(b *testing.B) {
dstoreB1, dservB1 := makeStoreBadger("b-once")
benchSyncEvery(b, b.N, false, dstoreB1, dservB1)
dstoreB1.Close()
}

func BenchmarkSyncEveryBadger(b *testing.B) {
dstoreB2, dservB2 := makeStoreBadger("b-every")
benchSyncEvery(b, b.N, true, dstoreB2, dservB2)
dstoreB2.Close()
}

func BenchmarkSyncOnceLevelDB(b *testing.B) {
dstoreL1, dservL1 := makeStoreLevelDB("l-once")
benchSyncEvery(b, b.N, false, dstoreL1, dservL1)
dstoreL1.Close()
}

func BenchmarkSyncEveryLevelDB(b *testing.B) {
dstoreL2, dservL2 := makeStoreLevelDB("l-every")
benchSyncEvery(b, b.N, true, dstoreL2, dservL2)
dstoreL2.Close()
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/ipfs/go-blockservice v0.1.4
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ds-badger v0.2.6
github.com/ipfs/go-ds-leveldb v0.4.2
github.com/ipfs/go-ipfs-blockstore v0.1.4
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
Expand Down
Loading