Skip to content

Commit

Permalink
Merge pull request #17 from raulk/txn
Browse files Browse the repository at this point in the history
Add transactional support to leveldb datastore.
  • Loading branch information
raulk committed Sep 27, 2018
2 parents b71f76e + bdf88d6 commit 13c8c18
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .gx/lastpubver
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.1.2: QmPnXsHj9W8WpDDwj2iogRcnVL6d5ANtK9SAJLgKpeBMq8
1.2.0: QmcxDvw8NnJsfdEcfrypwHkLeVxZY2rT8iiWsUuBnw93gb
88 changes: 64 additions & 24 deletions datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,21 @@ import (
"github.com/jbenet/goprocess"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/util"
)

type datastore struct {
*accessor
DB *leveldb.DB
path string
}

var _ ds.Datastore = (*datastore)(nil)
var _ ds.TxnDatastore = (*datastore)(nil)

// Options is an alias of syndtr/goleveldb/opt.Options which might be extended
// in the future.
type Options opt.Options
Expand Down Expand Up @@ -49,21 +54,34 @@ func NewDatastore(path string, opts *Options) (*datastore, error) {
}

return &datastore{
DB: db,
path: path,
accessor: &accessor{ldb: db},
DB: db,
path: path,
}, nil
}

// Returns ErrInvalidType if value is not of type []byte.
// An extraction of the common interface between LevelDB Transactions and the DB itself.
//
// Note: using sync = false.
// see http://godoc.org/github.com/syndtr/goleveldb/leveldb/opt#WriteOptions
func (d *datastore) Put(key ds.Key, value []byte) (err error) {
return d.DB.Put(key.Bytes(), value, nil)
// It allows to plug in either inside the `accessor`.
type levelDbOps interface {
Put(key, value []byte, wo *opt.WriteOptions) error
Get(key []byte, ro *opt.ReadOptions) (value []byte, err error)
Has(key []byte, ro *opt.ReadOptions) (ret bool, err error)
Delete(key []byte, wo *opt.WriteOptions) error
NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator
}

// Datastore operations using either the DB or a transaction as the backend.
type accessor struct {
ldb levelDbOps
}

func (a *accessor) Put(key ds.Key, value []byte) (err error) {
return a.ldb.Put(key.Bytes(), value, nil)
}

func (d *datastore) Get(key ds.Key) (value []byte, err error) {
val, err := d.DB.Get(key.Bytes(), nil)
func (a *accessor) Get(key ds.Key) (value []byte, err error) {
val, err := a.ldb.Get(key.Bytes(), nil)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, ds.ErrNotFound
Expand All @@ -73,40 +91,40 @@ func (d *datastore) Get(key ds.Key) (value []byte, err error) {
return val, nil
}

func (d *datastore) Has(key ds.Key) (exists bool, err error) {
return d.DB.Has(key.Bytes(), nil)
func (a *accessor) Has(key ds.Key) (exists bool, err error) {
return a.ldb.Has(key.Bytes(), nil)
}

func (d *datastore) Delete(key ds.Key) (err error) {
func (a *accessor) Delete(key ds.Key) (err error) {
// leveldb Delete will not return an error if the key doesn't
// exist (see https://github.com/syndtr/goleveldb/issues/109),
// so check that the key exists first and if not return an
// error
exists, err := d.DB.Has(key.Bytes(), nil)
exists, err := a.ldb.Has(key.Bytes(), nil)
if !exists {
return ds.ErrNotFound
} else if err != nil {
return err
}
return d.DB.Delete(key.Bytes(), nil)
return a.ldb.Delete(key.Bytes(), nil)
}

func (d *datastore) Query(q dsq.Query) (dsq.Results, error) {
return d.QueryNew(q)
func (a *accessor) Query(q dsq.Query) (dsq.Results, error) {
return a.queryNew(q)
}

func (d *datastore) QueryNew(q dsq.Query) (dsq.Results, error) {
func (a *accessor) queryNew(q dsq.Query) (dsq.Results, error) {
if len(q.Filters) > 0 ||
len(q.Orders) > 0 ||
q.Limit > 0 ||
q.Offset > 0 {
return d.QueryOrig(q)
return a.queryOrig(q)
}
var rnge *util.Range
if q.Prefix != "" {
rnge = util.BytesPrefix([]byte(q.Prefix))
}
i := d.DB.NewIterator(rnge, nil)
i := a.ldb.NewIterator(rnge, nil)
return dsq.ResultsFromIterator(q, dsq.Iterator{
Next: func() (dsq.Result, bool) {
ok := i.Next()
Expand All @@ -130,7 +148,7 @@ func (d *datastore) QueryNew(q dsq.Query) (dsq.Results, error) {
}), nil
}

func (d *datastore) QueryOrig(q dsq.Query) (dsq.Results, error) {
func (a *accessor) queryOrig(q dsq.Query) (dsq.Results, error) {
// we can use multiple iterators concurrently. see:
// https://godoc.org/github.com/syndtr/goleveldb/leveldb#DB.NewIterator
// advance the iterator only if the reader reads
Expand All @@ -140,7 +158,7 @@ func (d *datastore) QueryOrig(q dsq.Query) (dsq.Results, error) {
// that resources should be reclaimed.
qrb := dsq.NewResultBuilder(q)
qrb.Process.Go(func(worker goprocess.Process) {
d.runQuery(worker, qrb)
a.runQuery(worker, qrb)
})

// go wait on the worker (without signaling close)
Expand All @@ -157,13 +175,12 @@ func (d *datastore) QueryOrig(q dsq.Query) (dsq.Results, error) {
return qr, nil
}

func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) {

func (a *accessor) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) {
var rnge *util.Range
if qrb.Query.Prefix != "" {
rnge = util.BytesPrefix([]byte(qrb.Query.Prefix))
}
i := d.DB.NewIterator(rnge, nil)
i := a.ldb.NewIterator(rnge, nil)
defer i.Release()

// advance iterator for offset
Expand Down Expand Up @@ -261,3 +278,26 @@ func (b *leveldbBatch) Delete(key ds.Key) error {
b.b.Delete(key.Bytes())
return nil
}

// A leveldb transaction embedding the accessor backed by the transaction.
type transaction struct {
*accessor
tx *leveldb.Transaction
}

func (t *transaction) Commit() error {
return t.tx.Commit()
}

func (t *transaction) Discard() {
t.tx.Discard()
}

func (d *datastore) NewTransaction(readOnly bool) (ds.Txn, error) {
tx, err := d.DB.OpenTransaction()
if err != nil {
return nil, err
}
accessor := &accessor{tx}
return &transaction{accessor, tx}, nil
}
97 changes: 97 additions & 0 deletions ds_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package leveldb

import (
"bytes"
"fmt"
"io/ioutil"
"os"
"testing"
Expand Down Expand Up @@ -223,3 +225,98 @@ func TestDiskUsageInMem(t *testing.T) {
t.Fatal("inmem dbs have 0 disk usage")
}
}

func TestTransactionCommit(t *testing.T) {
key := ds.NewKey("/test/key1")

d, done := newDS(t)
defer done()

txn, err := d.NewTransaction(false)
if err != nil {
t.Fatal(err)
}
defer txn.Discard()

if err := txn.Put(key, []byte("hello")); err != nil {
t.Fatal(err)
}
if val, err := d.Get(key); err != ds.ErrNotFound {
t.Fatalf("expected ErrNotFound, got err: %v, value: %v", err, val)
}
if err := txn.Commit(); err != nil {
t.Fatal(err)
}
if val, err := d.Get(key); err != nil || !bytes.Equal(val, []byte("hello")) {
t.Fatalf("expected entry present after commit, got err: %v, value: %v", err, val)
}
}

func TestTransactionDiscard(t *testing.T) {
key := ds.NewKey("/test/key1")

d, done := newDS(t)
defer done()

txn, err := d.NewTransaction(false)
if err != nil {
t.Fatal(err)
}
defer txn.Discard()

if err := txn.Put(key, []byte("hello")); err != nil {
t.Fatal(err)
}
if val, err := d.Get(key); err != ds.ErrNotFound {
t.Fatalf("expected ErrNotFound, got err: %v, value: %v", err, val)
}
if txn.Discard(); err != nil {
t.Fatal(err)
}
if val, err := d.Get(key); err != ds.ErrNotFound {
t.Fatalf("expected ErrNotFound, got err: %v, value: %v", err, val)
}
}

func TestTransactionManyOperations(t *testing.T) {
keys := []ds.Key{ds.NewKey("/test/key1"), ds.NewKey("/test/key2"), ds.NewKey("/test/key3"), ds.NewKey("/test/key4"), ds.NewKey("/test/key5")}

d, done := newDS(t)
defer done()

txn, err := d.NewTransaction(false)
if err != nil {
t.Fatal(err)
}
defer txn.Discard()

// Insert all entries.
for i := 0; i < 5; i++ {
if err := txn.Put(keys[i], []byte(fmt.Sprintf("hello%d", i))); err != nil {
t.Fatal(err)
}
}

// Remove the third entry.
if err := txn.Delete(keys[2]); err != nil {
t.Fatal(err)
}

// Check existences.
if has, err := txn.Has(keys[1]); err != nil || !has {
t.Fatalf("expected key[1] to be present, err: %v, has: %v", err, has)
}
if has, err := txn.Has(keys[2]); err != nil || has {
t.Fatalf("expected key[2] to be absent, err: %v, has: %v", err, has)
}

var res dsq.Results
if res, err = txn.Query(dsq.Query{Prefix: "/test"}); err != nil {
t.Fatalf("query failed, err: %v", err)
}
if entries, err := res.Rest(); err != nil || len(entries) != 4 {
t.Fatalf("query failed or contained unexpected number of entries, err: %v, results: %v", err, entries)
}

txn.Discard()
}
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@
},
{
"author": "jbenet",
"hash": "QmUyz7JTJzgegC6tiJrfby3mPhzcdswVtG4x58TQ6pq8jV",
"hash": "QmbQshXLNcCPRUGZv4sBGxnZNAHREA6MKeomkwihNXPZWP",
"name": "go-datastore",
"version": "3.2.0"
"version": "3.3.0"
}
],
"gxVersion": "0.8.0",
"language": "go",
"license": "",
"name": "go-ds-leveldb",
"releaseCmd": "git commit -a -m \"gx publish $VERSION\"",
"version": "1.1.2"
"version": "1.2.0"
}

0 comments on commit 13c8c18

Please sign in to comment.