Skip to content

Commit

Permalink
Merge pull request #27 from bigs/feat/txdatastore
Browse files Browse the repository at this point in the history
Implement new TxDatastore and Txn interfaces
  • Loading branch information
bigs authored Aug 14, 2018
2 parents f293acf + fffc073 commit f3dd075
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 87 deletions.
2 changes: 1 addition & 1 deletion .gx/lastpubver
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.5.0: QmciWFTxK1t1PNyPdoTsv8wtj3DWim2qRFjaUEcFWCnV8D
1.6.1: QmUCfrikzKVGAfpE31RPwPd32fu1DYxSG7HTGCadba5Wza
211 changes: 128 additions & 83 deletions datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package badger
import (
"fmt"
"strings"
"time"

osh "github.com/Kubuxu/go-os-helper"
badger "github.com/dgraph-io/badger"
Expand All @@ -18,6 +19,12 @@ type Datastore struct {
gcDiscardRatio float64
}

// Implements the datastore.Txn interface, enabling transaction support for
// the badger Datastore.
type txn struct {
txn *badger.Txn
}

// Options are the badger datastore options, reexported here for convenience.
type Options struct {
gcDiscardRatio float64
Expand Down Expand Up @@ -68,24 +75,123 @@ func NewDatastore(path string, options *Options) (*Datastore, error) {
}, nil
}

// NewTransaction starts a new transaction. The resulting transaction object
// can be mutated without incurring changes to the underlying Datastore until
// the transaction is Committed.
func (d *Datastore) NewTransaction(readOnly bool) ds.Txn {
return &txn{d.DB.NewTransaction(!readOnly)}
}

func (d *Datastore) Put(key ds.Key, value []byte) error {
txn := d.DB.NewTransaction(true)
txn := d.NewTransaction(false)
defer txn.Discard()

err := txn.Set(key.Bytes(), value)
if err != nil {
if err := txn.Put(key, value); err != nil {
return err
}

return txn.Commit()
}

func (d *Datastore) PutWithTTL(key ds.Key, value []byte, ttl time.Duration) error {
txn := d.NewTransaction(false).(*txn)
defer txn.Discard()

if err := txn.PutWithTTL(key, value, ttl); err != nil {
return err
}

return txn.Commit()
}

func (d *Datastore) SetTTL(key ds.Key, ttl time.Duration) error {
txn := d.NewTransaction(false).(*txn)
defer txn.Discard()

if err := txn.SetTTL(key, ttl); err != nil {
return err
}

//TODO: Setting callback may potentially make this faster
return txn.Commit(nil)
return txn.Commit()
}

func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
txn := d.DB.NewTransaction(false)
txn := d.NewTransaction(true)
defer txn.Discard()

return txn.Get(key)
}

func (d *Datastore) Has(key ds.Key) (bool, error) {
txn := d.NewTransaction(true)
defer txn.Discard()

return txn.Has(key)
}

func (d *Datastore) Delete(key ds.Key) error {
txn := d.NewTransaction(false)
defer txn.Discard()

item, err := txn.Get(key.Bytes())
err := txn.Delete(key)
if err != nil {
return err
}

return txn.Commit()
}

func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
txn := d.NewTransaction(true)
defer txn.Discard()

return txn.Query(q)
}

// DiskUsage implements the PersistentDatastore interface.
// It returns the sum of lsm and value log files sizes in bytes.
func (d *Datastore) DiskUsage() (uint64, error) {
lsm, vlog := d.DB.Size()
return uint64(lsm + vlog), nil
}

func (d *Datastore) Close() error {
return d.DB.Close()
}

func (d *Datastore) IsThreadSafe() {}

func (d *Datastore) Batch() (ds.Batch, error) {
return d.NewTransaction(false), nil
}

func (d *Datastore) CollectGarbage() error {
err := d.DB.RunValueLogGC(d.gcDiscardRatio)
if err == badger.ErrNoRewrite {
err = nil
}
return err
}

func (t *txn) Put(key ds.Key, value []byte) error {
return t.txn.Set(key.Bytes(), value)
}

func (t *txn) PutWithTTL(key ds.Key, value []byte, ttl time.Duration) error {
return t.txn.SetWithTTL(key.Bytes(), value, ttl)
}

func (t *txn) SetTTL(key ds.Key, ttl time.Duration) error {
data, err := t.Get(key)
if err != nil {
return err
}

return t.PutWithTTL(key, data, ttl)
}

func (t *txn) Get(key ds.Key) ([]byte, error) {
item, err := t.txn.Get(key.Bytes())
if err == badger.ErrKeyNotFound {
err = ds.ErrNotFound
}
Expand All @@ -103,42 +209,28 @@ func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
return out, nil
}

func (d *Datastore) Has(key ds.Key) (bool, error) {
txn := d.DB.NewTransaction(false)
defer txn.Discard()
_, err := txn.Get(key.Bytes())
if err == badger.ErrKeyNotFound {
return false, nil
}
if err != nil {
return false, err
}

return true, nil
}
func (t *txn) Has(key ds.Key) (bool, error) {
_, err := t.Get(key)

func (d *Datastore) Delete(key ds.Key) error {
txn := d.DB.NewTransaction(true)
defer txn.Discard()
err := txn.Delete(key.Bytes())
if err != nil {
return err
if err == nil {
return true, nil
} else if err == ds.ErrNotFound {
return false, nil
}

//TODO: callback may potentially make this faster
return txn.Commit(nil)
return false, err
}

func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
return d.QueryNew(q)
func (t *txn) Delete(key ds.Key) error {
return t.txn.Delete(key.Bytes())
}

func (d *Datastore) QueryNew(q dsq.Query) (dsq.Results, error) {
func (t *txn) Query(q dsq.Query) (dsq.Results, error) {
prefix := []byte(q.Prefix)
opt := badger.DefaultIteratorOptions
opt.PrefetchValues = !q.KeysOnly

txn := d.DB.NewTransaction(false)
txn := t.txn

it := txn.NewIterator(opt)
it.Seek([]byte(q.Prefix))
Expand All @@ -151,7 +243,6 @@ func (d *Datastore) QueryNew(q dsq.Query) (dsq.Results, error) {
qrb := dsq.NewResultBuilder(q)

qrb.Process.Go(func(worker goprocess.Process) {
defer txn.Discard()
defer it.Close()

for sent := 0; it.ValidForPrefix(prefix); sent++ {
Expand Down Expand Up @@ -204,56 +295,10 @@ func (d *Datastore) QueryNew(q dsq.Query) (dsq.Results, error) {
return qr, nil
}

// DiskUsage implements the PersistentDatastore interface.
// It returns the sum of lsm and value log files sizes in bytes.
func (d *Datastore) DiskUsage() (uint64, error) {
lsm, vlog := d.DB.Size()
return uint64(lsm + vlog), nil
func (t *txn) Commit() error {
return t.txn.Commit(nil)
}

func (d *Datastore) Close() error {
return d.DB.Close()
}

func (d *Datastore) IsThreadSafe() {}

type badgerBatch struct {
db *badger.DB
txn *badger.Txn
}

func (d *Datastore) Batch() (ds.Batch, error) {
return &badgerBatch{
db: d.DB,
txn: d.DB.NewTransaction(true),
}, nil
}

func (b *badgerBatch) Put(key ds.Key, value []byte) error {
err := b.txn.Set(key.Bytes(), value)
if err != nil {
b.txn.Discard()
}
return err
}

func (b *badgerBatch) Delete(key ds.Key) error {
err := b.txn.Delete(key.Bytes())
if err != nil {
b.txn.Discard()
}
return err
}

func (b *badgerBatch) Commit() error {
//TODO: Setting callback may potentially make this faster
return b.txn.Commit(nil)
}

func (d *Datastore) CollectGarbage() error {
err := d.DB.RunValueLogGC(d.gcDiscardRatio)
if err == badger.ErrNoRewrite {
err = nil
}
return err
func (t *txn) Discard() {
t.txn.Discard()
}
Loading

0 comments on commit f3dd075

Please sign in to comment.