From 5d92b4d2bd7614cd37bca2fc4bd381cd39bad336 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Thu, 20 Feb 2020 15:34:19 -0800 Subject: [PATCH] prevent closing concurrently with other operations. Add a `sync.RWMutex` protecting the Close operation. Follows the same pattern as in go-ds-badger. Address #29 / https://github.com/ipfs/go-ipfs/issues/6880 --- datastore.go | 43 +++++++++++++++++++++++++++++++++++++------ ds_test.go | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 6 deletions(-) diff --git a/datastore.go b/datastore.go index 6b9974b..2609b41 100644 --- a/datastore.go +++ b/datastore.go @@ -3,6 +3,7 @@ package leveldb import ( "os" "path/filepath" + "sync" ds "github.com/ipfs/go-datastore" dsq "github.com/ipfs/go-datastore/query" @@ -52,11 +53,12 @@ func NewDatastore(path string, opts *Options) (*Datastore, error) { return nil, err } - return &Datastore{ - accessor: &accessor{ldb: db, syncWrites: true}, + ds := Datastore{ + accessor: &accessor{ldb: db, syncWrites: true, closeLk: new(sync.RWMutex)}, DB: db, path: path, - }, nil + } + return &ds, nil } // An extraction of the common interface between LevelDB Transactions and the DB itself. @@ -74,9 +76,12 @@ type levelDbOps interface { type accessor struct { ldb levelDbOps syncWrites bool + closeLk *sync.RWMutex } func (a *accessor) Put(key ds.Key, value []byte) (err error) { + a.closeLk.RLock() + defer a.closeLk.RUnlock() return a.ldb.Put(key.Bytes(), value, &opt.WriteOptions{Sync: a.syncWrites}) } @@ -85,6 +90,8 @@ func (a *accessor) Sync(prefix ds.Key) error { } func (a *accessor) Get(key ds.Key) (value []byte, err error) { + a.closeLk.RLock() + defer a.closeLk.RUnlock() val, err := a.ldb.Get(key.Bytes(), nil) if err != nil { if err == leveldb.ErrNotFound { @@ -96,18 +103,24 @@ func (a *accessor) Get(key ds.Key) (value []byte, err error) { } func (a *accessor) Has(key ds.Key) (exists bool, err error) { + a.closeLk.RLock() + defer a.closeLk.RUnlock() return a.ldb.Has(key.Bytes(), nil) } -func (d *accessor) GetSize(key ds.Key) (size int, err error) { - return ds.GetBackedSize(d, key) +func (a *accessor) GetSize(key ds.Key) (size int, err error) { + return ds.GetBackedSize(a, key) } func (a *accessor) Delete(key ds.Key) (err error) { + a.closeLk.RLock() + defer a.closeLk.RUnlock() return a.ldb.Delete(key.Bytes(), &opt.WriteOptions{Sync: a.syncWrites}) } func (a *accessor) Query(q dsq.Query) (dsq.Results, error) { + a.closeLk.RLock() + defer a.closeLk.RUnlock() var rnge *util.Range // make a copy of the query for the fallback naive query implementation. @@ -135,6 +148,8 @@ func (a *accessor) Query(q dsq.Query) (dsq.Results, error) { } r := dsq.ResultsFromIterator(q, dsq.Iterator{ Next: func() (dsq.Result, bool) { + a.closeLk.RLock() + defer a.closeLk.RUnlock() if !next() { return dsq.Result{}, false } @@ -149,6 +164,8 @@ func (a *accessor) Query(q dsq.Query) (dsq.Results, error) { return dsq.Result{Entry: e}, true }, Close: func() error { + a.closeLk.RLock() + defer a.closeLk.RUnlock() i.Release() return nil }, @@ -159,6 +176,8 @@ func (a *accessor) Query(q dsq.Query) (dsq.Results, error) { // DiskUsage returns the current disk size used by this levelDB. // For in-mem datastores, it will return 0. func (d *Datastore) DiskUsage() (uint64, error) { + d.closeLk.RLock() + defer d.closeLk.RUnlock() if d.path == "" { // in-mem return 0, nil } @@ -182,12 +201,15 @@ func (d *Datastore) DiskUsage() (uint64, error) { // LevelDB needs to be closed. func (d *Datastore) Close() (err error) { + d.closeLk.Lock() + defer d.closeLk.Unlock() return d.DB.Close() } type leveldbBatch struct { b *leveldb.Batch db *leveldb.DB + closeLk *sync.RWMutex syncWrites bool } @@ -195,6 +217,7 @@ func (d *Datastore) Batch() (ds.Batch, error) { return &leveldbBatch{ b: new(leveldb.Batch), db: d.DB, + closeLk: d.closeLk, syncWrites: d.syncWrites, }, nil } @@ -205,6 +228,8 @@ func (b *leveldbBatch) Put(key ds.Key, value []byte) error { } func (b *leveldbBatch) Commit() error { + b.closeLk.RLock() + defer b.closeLk.RUnlock() return b.db.Write(b.b, &opt.WriteOptions{Sync: b.syncWrites}) } @@ -220,18 +245,24 @@ type transaction struct { } func (t *transaction) Commit() error { + t.closeLk.RLock() + defer t.closeLk.RUnlock() return t.tx.Commit() } func (t *transaction) Discard() { + t.closeLk.RLock() + defer t.closeLk.RUnlock() t.tx.Discard() } func (d *Datastore) NewTransaction(readOnly bool) (ds.Txn, error) { + d.closeLk.RLock() + defer d.closeLk.RUnlock() tx, err := d.DB.OpenTransaction() if err != nil { return nil, err } - accessor := &accessor{ldb: tx, syncWrites: false} + accessor := &accessor{ldb: tx, syncWrites: false, closeLk: d.closeLk} return &transaction{accessor, tx}, nil } diff --git a/ds_test.go b/ds_test.go index 1f642ac..9a57eac 100644 --- a/ds_test.go +++ b/ds_test.go @@ -147,6 +147,44 @@ func TestQueryRespectsProcess(t *testing.T) { addTestCases(t, d, testcases) } +func TestCloseRace(t *testing.T) { + d, close := newDS(t) + for n := 0; n < 100; n++ { + d.Put(ds.NewKey(fmt.Sprintf("%d", n)), []byte(fmt.Sprintf("test%d", n))) + } + + tx, _ := d.NewTransaction(false) + tx.Put(ds.NewKey("txnversion"), []byte("bump")) + + closeCh := make(chan interface{}) + + go func() { + close() + closeCh <- nil + }() + for k := range testcases { + tx.Get(ds.NewKey(k)) + } + tx.Commit() + <-closeCh +} + +func TestCloseSafety(t *testing.T) { + d, close := newDS(t) + addTestCases(t, d, testcases) + + tx, _ := d.NewTransaction(false) + err := tx.Put(ds.NewKey("test"), []byte("test")) + if err != nil { + t.Error("Failed to put in a txn.") + } + close() + err = tx.Commit() + if err == nil { + t.Error("committing after close should fail.") + } +} + func TestQueryRespectsProcessMem(t *testing.T) { d := newDSMem(t) addTestCases(t, d, testcases)