Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement new TxDatastore and Txn interfaces #27

Merged
merged 12 commits into from
Aug 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gx/lastpubver
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.5.0: QmciWFTxK1t1PNyPdoTsv8wtj3DWim2qRFjaUEcFWCnV8D
1.6.1: QmUCfrikzKVGAfpE31RPwPd32fu1DYxSG7HTGCadba5Wza
211 changes: 128 additions & 83 deletions datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package badger
import (
"fmt"
"strings"
"time"

osh "github.com/Kubuxu/go-os-helper"
badger "github.com/dgraph-io/badger"
Expand All @@ -18,6 +19,12 @@ type Datastore struct {
gcDiscardRatio float64
}

// Implements the datastore.Txn interface, enabling transaction support for
// the badger Datastore.
type txn struct {
txn *badger.Txn
}

// Options are the badger datastore options, reexported here for convenience.
type Options struct {
gcDiscardRatio float64
Expand Down Expand Up @@ -68,24 +75,123 @@ func NewDatastore(path string, options *Options) (*Datastore, error) {
}, nil
}

// NewTransaction starts a new transaction. The resulting transaction object
// can be mutated without incurring changes to the underlying Datastore until
// the transaction is Committed.
func (d *Datastore) NewTransaction(readOnly bool) ds.Txn {
return &txn{d.DB.NewTransaction(!readOnly)}
}

func (d *Datastore) Put(key ds.Key, value []byte) error {
txn := d.DB.NewTransaction(true)
txn := d.NewTransaction(false)
defer txn.Discard()

err := txn.Set(key.Bytes(), value)
if err != nil {
if err := txn.Put(key, value); err != nil {
return err
}

return txn.Commit()
}

func (d *Datastore) PutWithTTL(key ds.Key, value []byte, ttl time.Duration) error {
txn := d.NewTransaction(false).(*txn)
defer txn.Discard()

if err := txn.PutWithTTL(key, value, ttl); err != nil {
return err
}

return txn.Commit()
}

func (d *Datastore) SetTTL(key ds.Key, ttl time.Duration) error {
txn := d.NewTransaction(false).(*txn)
defer txn.Discard()

if err := txn.SetTTL(key, ttl); err != nil {
return err
}

//TODO: Setting callback may potentially make this faster
return txn.Commit(nil)
return txn.Commit()
}

func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
txn := d.DB.NewTransaction(false)
txn := d.NewTransaction(true)
defer txn.Discard()

return txn.Get(key)
}

func (d *Datastore) Has(key ds.Key) (bool, error) {
txn := d.NewTransaction(true)
defer txn.Discard()

return txn.Has(key)
}

func (d *Datastore) Delete(key ds.Key) error {
txn := d.NewTransaction(false)
defer txn.Discard()

item, err := txn.Get(key.Bytes())
err := txn.Delete(key)
if err != nil {
return err
}

return txn.Commit()
}

func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
txn := d.NewTransaction(true)
defer txn.Discard()

return txn.Query(q)
}

// DiskUsage implements the PersistentDatastore interface.
// It returns the sum of lsm and value log files sizes in bytes.
func (d *Datastore) DiskUsage() (uint64, error) {
lsm, vlog := d.DB.Size()
return uint64(lsm + vlog), nil
}

func (d *Datastore) Close() error {
return d.DB.Close()
}

func (d *Datastore) IsThreadSafe() {}

func (d *Datastore) Batch() (ds.Batch, error) {
return d.NewTransaction(false), nil
}

func (d *Datastore) CollectGarbage() error {
err := d.DB.RunValueLogGC(d.gcDiscardRatio)
if err == badger.ErrNoRewrite {
err = nil
}
return err
}

func (t *txn) Put(key ds.Key, value []byte) error {
return t.txn.Set(key.Bytes(), value)
}

func (t *txn) PutWithTTL(key ds.Key, value []byte, ttl time.Duration) error {
return t.txn.SetWithTTL(key.Bytes(), value, ttl)
}

func (t *txn) SetTTL(key ds.Key, ttl time.Duration) error {
data, err := t.Get(key)
if err != nil {
return err
}

return t.PutWithTTL(key, data, ttl)
}

func (t *txn) Get(key ds.Key) ([]byte, error) {
item, err := t.txn.Get(key.Bytes())
if err == badger.ErrKeyNotFound {
err = ds.ErrNotFound
}
Expand All @@ -103,42 +209,28 @@ func (d *Datastore) Get(key ds.Key) (value []byte, err error) {
return out, nil
}

func (d *Datastore) Has(key ds.Key) (bool, error) {
txn := d.DB.NewTransaction(false)
defer txn.Discard()
_, err := txn.Get(key.Bytes())
if err == badger.ErrKeyNotFound {
return false, nil
}
if err != nil {
return false, err
}

return true, nil
}
func (t *txn) Has(key ds.Key) (bool, error) {
_, err := t.Get(key)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there still no way to do that touching vlogs / having to fetch the value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah you know what i can open a transaction that specifically doesn't pre-load values. i will change that. good catch


func (d *Datastore) Delete(key ds.Key) error {
txn := d.DB.NewTransaction(true)
defer txn.Discard()
err := txn.Delete(key.Bytes())
if err != nil {
return err
if err == nil {
return true, nil
} else if err == ds.ErrNotFound {
return false, nil
}

//TODO: callback may potentially make this faster
return txn.Commit(nil)
return false, err
}

func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
return d.QueryNew(q)
func (t *txn) Delete(key ds.Key) error {
return t.txn.Delete(key.Bytes())
}

func (d *Datastore) QueryNew(q dsq.Query) (dsq.Results, error) {
func (t *txn) Query(q dsq.Query) (dsq.Results, error) {
prefix := []byte(q.Prefix)
opt := badger.DefaultIteratorOptions
opt.PrefetchValues = !q.KeysOnly

txn := d.DB.NewTransaction(false)
txn := t.txn

it := txn.NewIterator(opt)
it.Seek([]byte(q.Prefix))
Expand All @@ -151,7 +243,6 @@ func (d *Datastore) QueryNew(q dsq.Query) (dsq.Results, error) {
qrb := dsq.NewResultBuilder(q)

qrb.Process.Go(func(worker goprocess.Process) {
defer txn.Discard()
defer it.Close()

for sent := 0; it.ValidForPrefix(prefix); sent++ {
Expand Down Expand Up @@ -204,56 +295,10 @@ func (d *Datastore) QueryNew(q dsq.Query) (dsq.Results, error) {
return qr, nil
}

// DiskUsage implements the PersistentDatastore interface.
// It returns the sum of lsm and value log files sizes in bytes.
func (d *Datastore) DiskUsage() (uint64, error) {
lsm, vlog := d.DB.Size()
return uint64(lsm + vlog), nil
func (t *txn) Commit() error {
return t.txn.Commit(nil)
}

func (d *Datastore) Close() error {
return d.DB.Close()
}

func (d *Datastore) IsThreadSafe() {}

type badgerBatch struct {
db *badger.DB
txn *badger.Txn
}

func (d *Datastore) Batch() (ds.Batch, error) {
return &badgerBatch{
db: d.DB,
txn: d.DB.NewTransaction(true),
}, nil
}

func (b *badgerBatch) Put(key ds.Key, value []byte) error {
err := b.txn.Set(key.Bytes(), value)
if err != nil {
b.txn.Discard()
}
return err
}

func (b *badgerBatch) Delete(key ds.Key) error {
err := b.txn.Delete(key.Bytes())
if err != nil {
b.txn.Discard()
}
return err
}

func (b *badgerBatch) Commit() error {
//TODO: Setting callback may potentially make this faster
return b.txn.Commit(nil)
}

func (d *Datastore) CollectGarbage() error {
err := d.DB.RunValueLogGC(d.gcDiscardRatio)
if err == badger.ErrNoRewrite {
err = nil
}
return err
func (t *txn) Discard() {
t.txn.Discard()
}
Loading