diff --git a/.gx/lastpubver b/.gx/lastpubver index d07ed80..f3a4c6b 100644 --- a/.gx/lastpubver +++ b/.gx/lastpubver @@ -1 +1 @@ -1.5.0: QmciWFTxK1t1PNyPdoTsv8wtj3DWim2qRFjaUEcFWCnV8D +1.6.1: QmUCfrikzKVGAfpE31RPwPd32fu1DYxSG7HTGCadba5Wza diff --git a/datastore.go b/datastore.go index 3ad870f..3555da4 100644 --- a/datastore.go +++ b/datastore.go @@ -3,6 +3,7 @@ package badger import ( "fmt" "strings" + "time" osh "github.com/Kubuxu/go-os-helper" badger "github.com/dgraph-io/badger" @@ -18,6 +19,12 @@ type Datastore struct { gcDiscardRatio float64 } +// Implements the datastore.Txn interface, enabling transaction support for +// the badger Datastore. +type txn struct { + txn *badger.Txn +} + // Options are the badger datastore options, reexported here for convenience. type Options struct { gcDiscardRatio float64 @@ -68,24 +75,123 @@ func NewDatastore(path string, options *Options) (*Datastore, error) { }, nil } +// NewTransaction starts a new transaction. The resulting transaction object +// can be mutated without incurring changes to the underlying Datastore until +// the transaction is Committed. +func (d *Datastore) NewTransaction(readOnly bool) ds.Txn { + return &txn{d.DB.NewTransaction(!readOnly)} +} + func (d *Datastore) Put(key ds.Key, value []byte) error { - txn := d.DB.NewTransaction(true) + txn := d.NewTransaction(false) defer txn.Discard() - err := txn.Set(key.Bytes(), value) - if err != nil { + if err := txn.Put(key, value); err != nil { + return err + } + + return txn.Commit() +} + +func (d *Datastore) PutWithTTL(key ds.Key, value []byte, ttl time.Duration) error { + txn := d.NewTransaction(false).(*txn) + defer txn.Discard() + + if err := txn.PutWithTTL(key, value, ttl); err != nil { + return err + } + + return txn.Commit() +} + +func (d *Datastore) SetTTL(key ds.Key, ttl time.Duration) error { + txn := d.NewTransaction(false).(*txn) + defer txn.Discard() + + if err := txn.SetTTL(key, ttl); err != nil { return err } - //TODO: Setting callback may potentially make this faster - return txn.Commit(nil) + return txn.Commit() } func (d *Datastore) Get(key ds.Key) (value []byte, err error) { - txn := d.DB.NewTransaction(false) + txn := d.NewTransaction(true) + defer txn.Discard() + + return txn.Get(key) +} + +func (d *Datastore) Has(key ds.Key) (bool, error) { + txn := d.NewTransaction(true) + defer txn.Discard() + + return txn.Has(key) +} + +func (d *Datastore) Delete(key ds.Key) error { + txn := d.NewTransaction(false) defer txn.Discard() - item, err := txn.Get(key.Bytes()) + err := txn.Delete(key) + if err != nil { + return err + } + + return txn.Commit() +} + +func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) { + txn := d.NewTransaction(true) + defer txn.Discard() + + return txn.Query(q) +} + +// DiskUsage implements the PersistentDatastore interface. +// It returns the sum of lsm and value log files sizes in bytes. +func (d *Datastore) DiskUsage() (uint64, error) { + lsm, vlog := d.DB.Size() + return uint64(lsm + vlog), nil +} + +func (d *Datastore) Close() error { + return d.DB.Close() +} + +func (d *Datastore) IsThreadSafe() {} + +func (d *Datastore) Batch() (ds.Batch, error) { + return d.NewTransaction(false), nil +} + +func (d *Datastore) CollectGarbage() error { + err := d.DB.RunValueLogGC(d.gcDiscardRatio) + if err == badger.ErrNoRewrite { + err = nil + } + return err +} + +func (t *txn) Put(key ds.Key, value []byte) error { + return t.txn.Set(key.Bytes(), value) +} + +func (t *txn) PutWithTTL(key ds.Key, value []byte, ttl time.Duration) error { + return t.txn.SetWithTTL(key.Bytes(), value, ttl) +} + +func (t *txn) SetTTL(key ds.Key, ttl time.Duration) error { + data, err := t.Get(key) + if err != nil { + return err + } + + return t.PutWithTTL(key, data, ttl) +} + +func (t *txn) Get(key ds.Key) ([]byte, error) { + item, err := t.txn.Get(key.Bytes()) if err == badger.ErrKeyNotFound { err = ds.ErrNotFound } @@ -103,42 +209,28 @@ func (d *Datastore) Get(key ds.Key) (value []byte, err error) { return out, nil } -func (d *Datastore) Has(key ds.Key) (bool, error) { - txn := d.DB.NewTransaction(false) - defer txn.Discard() - _, err := txn.Get(key.Bytes()) - if err == badger.ErrKeyNotFound { - return false, nil - } - if err != nil { - return false, err - } - - return true, nil -} +func (t *txn) Has(key ds.Key) (bool, error) { + _, err := t.Get(key) -func (d *Datastore) Delete(key ds.Key) error { - txn := d.DB.NewTransaction(true) - defer txn.Discard() - err := txn.Delete(key.Bytes()) - if err != nil { - return err + if err == nil { + return true, nil + } else if err == ds.ErrNotFound { + return false, nil } - //TODO: callback may potentially make this faster - return txn.Commit(nil) + return false, err } -func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) { - return d.QueryNew(q) +func (t *txn) Delete(key ds.Key) error { + return t.txn.Delete(key.Bytes()) } -func (d *Datastore) QueryNew(q dsq.Query) (dsq.Results, error) { +func (t *txn) Query(q dsq.Query) (dsq.Results, error) { prefix := []byte(q.Prefix) opt := badger.DefaultIteratorOptions opt.PrefetchValues = !q.KeysOnly - txn := d.DB.NewTransaction(false) + txn := t.txn it := txn.NewIterator(opt) it.Seek([]byte(q.Prefix)) @@ -151,7 +243,6 @@ func (d *Datastore) QueryNew(q dsq.Query) (dsq.Results, error) { qrb := dsq.NewResultBuilder(q) qrb.Process.Go(func(worker goprocess.Process) { - defer txn.Discard() defer it.Close() for sent := 0; it.ValidForPrefix(prefix); sent++ { @@ -204,56 +295,10 @@ func (d *Datastore) QueryNew(q dsq.Query) (dsq.Results, error) { return qr, nil } -// DiskUsage implements the PersistentDatastore interface. -// It returns the sum of lsm and value log files sizes in bytes. -func (d *Datastore) DiskUsage() (uint64, error) { - lsm, vlog := d.DB.Size() - return uint64(lsm + vlog), nil +func (t *txn) Commit() error { + return t.txn.Commit(nil) } -func (d *Datastore) Close() error { - return d.DB.Close() -} - -func (d *Datastore) IsThreadSafe() {} - -type badgerBatch struct { - db *badger.DB - txn *badger.Txn -} - -func (d *Datastore) Batch() (ds.Batch, error) { - return &badgerBatch{ - db: d.DB, - txn: d.DB.NewTransaction(true), - }, nil -} - -func (b *badgerBatch) Put(key ds.Key, value []byte) error { - err := b.txn.Set(key.Bytes(), value) - if err != nil { - b.txn.Discard() - } - return err -} - -func (b *badgerBatch) Delete(key ds.Key) error { - err := b.txn.Delete(key.Bytes()) - if err != nil { - b.txn.Discard() - } - return err -} - -func (b *badgerBatch) Commit() error { - //TODO: Setting callback may potentially make this faster - return b.txn.Commit(nil) -} - -func (d *Datastore) CollectGarbage() error { - err := d.DB.RunValueLogGC(d.gcDiscardRatio) - if err == badger.ErrNoRewrite { - err = nil - } - return err +func (t *txn) Discard() { + t.txn.Discard() } diff --git a/ds_test.go b/ds_test.go index 0eb400b..f78db82 100644 --- a/ds_test.go +++ b/ds_test.go @@ -8,6 +8,7 @@ import ( "os" "sort" "testing" + "time" ds "github.com/ipfs/go-datastore" dsq "github.com/ipfs/go-datastore/query" @@ -520,3 +521,154 @@ func TestDiskUsage(t *testing.T) { } d.Close() } + +func TestTxnDiscard(t *testing.T) { + d, err := NewDatastore("/tmp/testing_badger_du", nil) + defer os.RemoveAll("/tmp/testing_badger_du") + if err != nil { + t.Fatal(err) + } + + txn := d.NewTransaction(false) + key := ds.NewKey("/test/thing") + if err := txn.Put(key, []byte{1, 2, 3}); err != nil { + t.Fatal(err) + } + txn.Discard() + has, err := d.Has(key) + if err != nil { + t.Fatal(err) + } + if has { + t.Fatal("key written in aborted transaction still exists") + } + + d.Close() +} + +func TestTxnCommit(t *testing.T) { + d, err := NewDatastore("/tmp/testing_badger_du", nil) + defer os.RemoveAll("/tmp/testing_badger_du") + if err != nil { + t.Fatal(err) + } + + txn := d.NewTransaction(false) + key := ds.NewKey("/test/thing") + if err := txn.Put(key, []byte{1, 2, 3}); err != nil { + t.Fatal(err) + } + txn.Commit() + has, err := d.Has(key) + if err != nil { + t.Fatal(err) + } + if !has { + t.Fatal("key written in committed transaction does not exist") + } + + d.Close() +} + +func TestTxnBatch(t *testing.T) { + d, err := NewDatastore("/tmp/testing_badger_du", nil) + defer os.RemoveAll("/tmp/testing_badger_du") + if err != nil { + t.Fatal(err) + } + + txn := d.NewTransaction(false) + + data := make(map[ds.Key][]byte) + for i := 0; i < 10; i++ { + key := ds.NewKey(fmt.Sprintf("/test/%d", i)) + bytes := make([]byte, 16) + _, err := rand.Read(bytes) + if err != nil { + t.Fatal(err) + } + data[key] = bytes + + err = txn.Put(key, bytes) + if err != nil { + t.Fatal(err) + } + } + err = txn.Commit() + if err != nil { + t.Fatal(err) + } + + for key, bytes := range data { + retrieved, err := d.Get(key) + if err != nil { + t.Fatal(err) + } + if len(retrieved) != len(bytes) { + t.Fatal("bytes stored different length from bytes generated") + } + for i, b := range retrieved { + if bytes[i] != b { + t.Fatal("bytes stored different content from bytes generated") + } + } + } + + d.Close() +} + +func TestTTL(t *testing.T) { + d, err := NewDatastore("/tmp/testing_badger_du", nil) + defer os.RemoveAll("/tmp/testing_badger_du") + if err != nil { + t.Fatal(err) + } + + txn := d.NewTransaction(false) + + data := make(map[ds.Key][]byte) + for i := 0; i < 10; i++ { + key := ds.NewKey(fmt.Sprintf("/test/%d", i)) + bytes := make([]byte, 16) + _, err := rand.Read(bytes) + if err != nil { + t.Fatal(err) + } + data[key] = bytes + } + + // write data + for key, bytes := range data { + err = txn.(ds.TTLDatastore).PutWithTTL(key, bytes, time.Second) + if err != nil { + t.Fatal(err) + } + } + err = txn.Commit() + if err != nil { + t.Fatal(err) + } + + txn = d.NewTransaction(true) + for key := range data { + _, err := txn.Get(key) + if err != nil { + t.Fatal(err) + } + } + txn.Discard() + + time.Sleep(time.Second) + + for key := range data { + has, err := d.Has(key) + if err != nil { + t.Fatal(err) + } + if has { + t.Fatal("record with ttl did not expire") + } + } + + d.Close() +} diff --git a/package.json b/package.json index e1e1f45..f6eaa23 100644 --- a/package.json +++ b/package.json @@ -15,9 +15,9 @@ }, { "author": "jbenet", - "hash": "QmVG5gxteQNEMhrS8prJSmU2C9rebtFuTd3SYZ5kE3YZ5k", + "hash": "QmSpg1CvpXQQow5ernt1gNBXaXV6yxyNqi7XoeerWfzB5w", "name": "go-datastore", - "version": "3.0.0" + "version": "3.1.0" }, { "author": "dgraph-io", @@ -37,6 +37,6 @@ "license": "", "name": "go-ds-badger", "releaseCmd": "git commit -a -m \"gx publish $VERSION\"", - "version": "1.5.0" + "version": "1.6.1" }