diff --git a/go.mod b/go.mod index 9b6baf4..7c9df02 100644 --- a/go.mod +++ b/go.mod @@ -8,14 +8,14 @@ require ( go.opentelemetry.io/otel v1.16.0 go.opentelemetry.io/otel/trace v1.16.0 go.uber.org/multierr v1.5.0 - golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 - gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 + golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c ) require ( github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/kr/pretty v0.2.0 // indirect + github.com/kr/pretty v0.2.1 // indirect github.com/kr/text v0.1.0 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect go.uber.org/atomic v1.6.0 // indirect diff --git a/go.sum b/go.sum index 7db547d..8c5f541 100644 --- a/go.sum +++ b/go.sum @@ -21,8 +21,8 @@ github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0 github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= -github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -61,11 +61,12 @@ golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgw golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools 
v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= diff --git a/keytransform/txndatastore.go b/keytransform/txndatastore.go new file mode 100644 index 0000000..f11d3df --- /dev/null +++ b/keytransform/txndatastore.go @@ -0,0 +1,415 @@ +package keytransform + +import ( + "context" + + ds "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" +) + +// WrapTxnDatastore wraps a given datastore with a KeyTransform function. +// The resulting wrapped datastore will use the transform on all TxnDatastore +// operations. 
+func WrapTxnDatastore(child ds.TxnDatastore, t KeyTransform) *TxnDatastore { + if t == nil { + panic("t (KeyTransform) is nil") + } + + if child == nil { + panic("child (ds.TxnDatastore) is nil") + } + + return &TxnDatastore{child: child, KeyTransform: t} +} + +// TxnDatastore keeps a KeyTransform function +type TxnDatastore struct { + child ds.TxnDatastore + + KeyTransform +} + +var _ ds.Datastore = (*TxnDatastore)(nil) +var _ ds.Batching = (*TxnDatastore)(nil) +var _ ds.Shim = (*TxnDatastore)(nil) +var _ ds.PersistentDatastore = (*TxnDatastore)(nil) +var _ ds.CheckedDatastore = (*TxnDatastore)(nil) +var _ ds.ScrubbedDatastore = (*TxnDatastore)(nil) +var _ ds.GCDatastore = (*TxnDatastore)(nil) +var _ ds.TxnDatastore = (*TxnDatastore)(nil) + +// Children implements ds.Shim +func (d *TxnDatastore) Children() []ds.Datastore { + return []ds.Datastore{d.child} +} + +// Put stores the given value, transforming the key first. +func (d *TxnDatastore) Put(ctx context.Context, key ds.Key, value []byte) (err error) { + return d.child.Put(ctx, d.ConvertKey(key), value) +} + +// Sync implements Datastore.Sync +func (d *TxnDatastore) Sync(ctx context.Context, prefix ds.Key) error { + return d.child.Sync(ctx, d.ConvertKey(prefix)) +} + +// Get returns the value for given key, transforming the key first. +func (d *TxnDatastore) Get(ctx context.Context, key ds.Key) (value []byte, err error) { + return d.child.Get(ctx, d.ConvertKey(key)) +} + +// Has returns whether the datastore has a value for a given key, transforming +// the key first. +func (d *TxnDatastore) Has(ctx context.Context, key ds.Key) (exists bool, err error) { + return d.child.Has(ctx, d.ConvertKey(key)) +} + +// GetSize returns the size of the value named by the given key, transforming +// the key first. 
func (d *TxnDatastore) GetSize(ctx context.Context, key ds.Key) (size int, err error) {
	return d.child.GetSize(ctx, d.ConvertKey(key))
}

// Delete removes the value for given key
func (d *TxnDatastore) Delete(ctx context.Context, key ds.Key) (err error) {
	return d.child.Delete(ctx, d.ConvertKey(key))
}

// Query implements Query, inverting keys on the way back out.
// The query is split so the child datastore handles as much work as possible
// (see prepareQuery); anything it cannot handle under the key transform is
// applied naively on top of the child's results.
func (d *TxnDatastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) {
	nq, cq := d.prepareQuery(q)

	cqr, err := d.child.Query(ctx, cq)
	if err != nil {
		return nil, err
	}

	qr := dsq.ResultsFromIterator(q, dsq.Iterator{
		Next: func() (dsq.Result, bool) {
			r, ok := cqr.NextSync()
			if !ok {
				return r, false
			}
			// Map the child's keys back into the caller's key space.
			if r.Error == nil {
				r.Entry.Key = d.InvertKey(ds.RawKey(r.Entry.Key)).String()
			}
			return r, true
		},
		Close: func() error {
			return cqr.Close()
		},
	})
	return dsq.NaiveQueryApply(nq, qr), nil
}

// Split the query into a child query and a naive query. That way, we can make
// the child datastore do as much work as possible.
func (d *TxnDatastore) prepareQuery(q dsq.Query) (naive, child dsq.Query) {

	// First, put everything in the child query. Then, start taking things
	// out.
	child = q

	// Always let the child handle the key prefix.
	child.Prefix = d.ConvertKey(ds.NewKey(child.Prefix)).String()

	// Check if the key transform is order-preserving so we can use the
	// child datastore's built-in ordering.
	orderPreserving := false
	switch d.KeyTransform.(type) {
	case PrefixTransform, *PrefixTransform:
		orderPreserving = true
	}

	// Try to let the child handle ordering.
orders:
	for i, o := range child.Orders {
		switch o.(type) {
		case dsq.OrderByValue, *dsq.OrderByValue,
			dsq.OrderByValueDescending, *dsq.OrderByValueDescending:
			// Key doesn't matter.
			continue
		case dsq.OrderByKey, *dsq.OrderByKey,
			dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending:
			// if the key transform preserves order, we can delegate
			// to the child datastore.
			if orderPreserving {
				// When sorting, we compare with the first
				// Order, then, if equal, we compare with the
				// second Order, etc. However, keys are _unique_
				// so we'll never apply any additional orders
				// after ordering by key.
				child.Orders = child.Orders[:i+1]
				break orders
			}
		}

		// Can't handle this order under transform, punt it to a naive
		// ordering. Offset/limit must also move to the naive side,
		// because they only make sense after the final ordering.
		naive.Orders = q.Orders
		child.Orders = nil
		naive.Offset = q.Offset
		child.Offset = 0
		naive.Limit = q.Limit
		child.Limit = 0
		break
	}

	// Try to let the child handle the filters.

	// don't modify the original filters.
	child.Filters = append([]dsq.Filter(nil), child.Filters...)

	for i, f := range child.Filters {
		switch f := f.(type) {
		case dsq.FilterValueCompare, *dsq.FilterValueCompare:
			// Values are not transformed; the child can apply these as-is.
			continue
		case dsq.FilterKeyCompare:
			// Rewrite the filter's key into the child's key space.
			child.Filters[i] = dsq.FilterKeyCompare{
				Op:  f.Op,
				Key: d.ConvertKey(ds.NewKey(f.Key)).String(),
			}
			continue
		case *dsq.FilterKeyCompare:
			child.Filters[i] = &dsq.FilterKeyCompare{
				Op:  f.Op,
				Key: d.ConvertKey(ds.NewKey(f.Key)).String(),
			}
			continue
		case dsq.FilterKeyPrefix:
			child.Filters[i] = dsq.FilterKeyPrefix{
				Prefix: d.ConvertKey(ds.NewKey(f.Prefix)).String(),
			}
			continue
		case *dsq.FilterKeyPrefix:
			child.Filters[i] = &dsq.FilterKeyPrefix{
				Prefix: d.ConvertKey(ds.NewKey(f.Prefix)).String(),
			}
			continue
		}

		// Not a known filter, defer to the naive implementation.
		// Offset/limit must follow the filters to the naive side.
		naive.Filters = q.Filters
		child.Filters = nil
		naive.Offset = q.Offset
		child.Offset = 0
		naive.Limit = q.Limit
		child.Limit = 0
		break
	}
	return
}

// Close closes the underlying child datastore.
func (d *TxnDatastore) Close() error {
	return d.child.Close()
}

// DiskUsage implements the ds.PersistentDatastore interface.
+func (d *TxnDatastore) DiskUsage(ctx context.Context) (uint64, error) { + return ds.DiskUsage(ctx, d.child) +} + +func (d *TxnDatastore) Batch(ctx context.Context) (ds.Batch, error) { + bds, ok := d.child.(ds.Batching) + if !ok { + return nil, ds.ErrBatchUnsupported + } + + childbatch, err := bds.Batch(ctx) + if err != nil { + return nil, err + } + return &transformBatch{ + dst: childbatch, + f: d.ConvertKey, + }, nil +} + +func (d *TxnDatastore) Check(ctx context.Context) error { + if c, ok := d.child.(ds.CheckedDatastore); ok { + return c.Check(ctx) + } + return nil +} + +func (d *TxnDatastore) Scrub(ctx context.Context) error { + if c, ok := d.child.(ds.ScrubbedDatastore); ok { + return c.Scrub(ctx) + } + return nil +} + +func (d *TxnDatastore) CollectGarbage(ctx context.Context) error { + if c, ok := d.child.(ds.GCDatastore); ok { + return c.CollectGarbage(ctx) + } + return nil +} + +func (d *TxnDatastore) NewTransaction(ctx context.Context, readOnly bool) (ds.Txn, error) { + childTxn, err := d.child.NewTransaction(ctx, readOnly) + if err != nil { + return nil, err + } + return &txnWrapper{child: childTxn, KeyTransform: d.KeyTransform}, nil +} + +type txnWrapper struct { + child ds.Txn + + KeyTransform +} + +var _ ds.Txn = (*txnWrapper)(nil) + +func (t *txnWrapper) Get(ctx context.Context, key ds.Key) (value []byte, err error) { + return t.child.Get(ctx, t.ConvertKey(key)) +} + +func (t *txnWrapper) Has(ctx context.Context, key ds.Key) (exists bool, err error) { + return t.child.Has(ctx, t.ConvertKey(key)) +} + +func (t *txnWrapper) GetSize(ctx context.Context, key ds.Key) (size int, err error) { + return t.child.GetSize(ctx, t.ConvertKey(key)) +} + +func (t *txnWrapper) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) { + nq, cq := t.prepareQuery(q) + + cqr, err := t.child.Query(ctx, cq) + if err != nil { + return nil, err + } + + qr := dsq.ResultsFromIterator(q, dsq.Iterator{ + Next: func() (dsq.Result, bool) { + r, ok := cqr.NextSync() + if !ok 
{ + return r, false + } + if r.Error == nil { + r.Entry.Key = t.InvertKey(ds.RawKey(r.Entry.Key)).String() + } + return r, true + }, + Close: func() error { + return cqr.Close() + }, + }) + return dsq.NaiveQueryApply(nq, qr), nil +} + +// Split the query into a child query and a naive query. That way, we can make +// the child datastore do as much work as possible. +func (t *txnWrapper) prepareQuery(q dsq.Query) (naive, child dsq.Query) { + + // First, put everything in the child query. Then, start taking things + // out. + child = q + + // Always let the child handle the key prefix. + child.Prefix = t.ConvertKey(ds.NewKey(child.Prefix)).String() + + // Check if the key transform is order-preserving so we can use the + // child datastore's built-in ordering. + orderPreserving := false + switch t.KeyTransform.(type) { + case PrefixTransform, *PrefixTransform: + orderPreserving = true + } + + // Try to let the child handle ordering. +orders: + for i, o := range child.Orders { + switch o.(type) { + case dsq.OrderByValue, *dsq.OrderByValue, + dsq.OrderByValueDescending, *dsq.OrderByValueDescending: + // Key doesn't matter. + continue + case dsq.OrderByKey, *dsq.OrderByKey, + dsq.OrderByKeyDescending, *dsq.OrderByKeyDescending: + // if the key transform preserves order, we can delegate + // to the child datastore. + if orderPreserving { + // When sorting, we compare with the first + // Order, then, if equal, we compare with the + // second Order, etc. However, keys are _unique_ + // so we'll never apply any additional orders + // after ordering by key. + child.Orders = child.Orders[:i+1] + break orders + } + } + + // Can't handle this order under transform, punt it to a naive + // ordering. + naive.Orders = q.Orders + child.Orders = nil + naive.Offset = q.Offset + child.Offset = 0 + naive.Limit = q.Limit + child.Limit = 0 + break + } + + // Try to let the child handle the filters. + + // don't modify the original filters. 
+ child.Filters = append([]dsq.Filter(nil), child.Filters...) + + for i, f := range child.Filters { + switch f := f.(type) { + case dsq.FilterValueCompare, *dsq.FilterValueCompare: + continue + case dsq.FilterKeyCompare: + child.Filters[i] = dsq.FilterKeyCompare{ + Op: f.Op, + Key: t.ConvertKey(ds.NewKey(f.Key)).String(), + } + continue + case *dsq.FilterKeyCompare: + child.Filters[i] = &dsq.FilterKeyCompare{ + Op: f.Op, + Key: t.ConvertKey(ds.NewKey(f.Key)).String(), + } + continue + case dsq.FilterKeyPrefix: + child.Filters[i] = dsq.FilterKeyPrefix{ + Prefix: t.ConvertKey(ds.NewKey(f.Prefix)).String(), + } + continue + case *dsq.FilterKeyPrefix: + child.Filters[i] = &dsq.FilterKeyPrefix{ + Prefix: t.ConvertKey(ds.NewKey(f.Prefix)).String(), + } + continue + } + + // Not a known filter, defer to the naive implementation. + naive.Filters = q.Filters + child.Filters = nil + naive.Offset = q.Offset + child.Offset = 0 + naive.Limit = q.Limit + child.Limit = 0 + break + } + return +} + +func (t txnWrapper) Put(ctx context.Context, key ds.Key, value []byte) error { + return t.child.Put(ctx, t.ConvertKey(key), value) +} + +func (t txnWrapper) Delete(ctx context.Context, key ds.Key) error { + return t.child.Delete(ctx, t.ConvertKey(key)) +} + +func (t txnWrapper) Commit(ctx context.Context) error { + return t.child.Commit(ctx) +} + +func (t txnWrapper) Discard(ctx context.Context) { + t.child.Discard(ctx) +} diff --git a/keytransform/txndatastore_test.go b/keytransform/txndatastore_test.go new file mode 100644 index 0000000..4fdbc69 --- /dev/null +++ b/keytransform/txndatastore_test.go @@ -0,0 +1,175 @@ +package keytransform_test + +import ( + "bytes" + "context" + "fmt" + "sort" + + . 
"gopkg.in/check.v1" + + ds "github.com/ipfs/go-datastore" + kt "github.com/ipfs/go-datastore/keytransform" + dsq "github.com/ipfs/go-datastore/query" + dstest "github.com/ipfs/go-datastore/test" +) + +var _ = Suite(&DSSuite{}) + +func (ks *DSSuite) TestWrapTxnDatastoreBasic(c *C) { + ctx := context.Background() + ms := ds.NewMapDatastore() + mpds := dstest.NewTestTxnDatastore(ms, true) + + kt.WrapTxnDatastore(mpds, pair) + ktds := kt.WrapTxnDatastore(mpds, pair) + ktdsTx, err := ktds.NewTransaction(ctx, false) + c.Check(err, Equals, nil) + + keys := strsToKeys([]string{ + "foo", + "foo/bar", + "foo/bar/baz", + "foo/barb", + "foo/bar/bazb", + "foo/bar/baz/barb", + }) + + for _, k := range keys { + err := ktdsTx.Put(ctx, k, []byte(k.String())) + c.Check(err, Equals, nil) + } + + for _, k := range keys { + // underlying mapstore can only see committed results + _, err := ms.Get(ctx, k) + c.Check(err, Equals, ds.ErrNotFound) + + _, err = ms.Get(ctx, ds.NewKey("abc").Child(k)) + c.Check(err, Equals, ds.ErrNotFound) + + v1, err := ktdsTx.Get(ctx, k) + c.Check(err, Equals, nil) + c.Check(bytes.Equal(v1, []byte(k.String())), Equals, true) + + // underlying TxnDatastore can only see committed results + _, err = mpds.Get(ctx, ds.NewKey("abc").Child(k)) + c.Check(err, Equals, ds.ErrNotFound) + } + + run := func(d ds.Read, q dsq.Query) []ds.Key { + r, err := d.Query(ctx, q) + c.Check(err, Equals, nil) + + e, err := r.Rest() + c.Check(err, Equals, nil) + + return ds.EntryKeys(e) + } + + listA := run(mpds, dsq.Query{}) + listB := run(ktdsTx, dsq.Query{}) + if len(listA) == len(listB) { + c.Errorf("TxnDatastore and WrappedTxDatastore should not have equal Query results pre-commit") + } + + if err := ktds.Check(ctx); err != dstest.ErrTest { + c.Errorf("Unexpected Check() error: %s", err) + } + + if err := ktds.CollectGarbage(ctx); err != dstest.ErrTest { + c.Errorf("Unexpected CollectGarbage() error: %s", err) + } + + if err := ktds.Scrub(ctx); err != dstest.ErrTest { + 
c.Errorf("Unexpected Scrub() error: %s", err) + } + + // Commit wrapped tx and compare + err = ktdsTx.Commit(ctx) + c.Check(err, Equals, nil) + + for _, k := range keys { + // results should be committed to the underlying mapstore + _, err = ms.Get(ctx, k) + c.Check(err, Equals, ds.ErrNotFound) + + v0, err := ms.Get(ctx, ds.NewKey("abc").Child(k)) + c.Check(err, Equals, nil) + c.Check(bytes.Equal(v0, []byte(k.String())), Equals, true) + + v1, err := ktdsTx.Get(ctx, k) + c.Check(err, Equals, nil) + c.Check(bytes.Equal(v1, []byte(k.String())), Equals, true) + + // results should be committed to the wrapped TxnDatastore + v2, err := mpds.Get(ctx, ds.NewKey("abc").Child(k)) + c.Check(err, Equals, nil) + c.Check(bytes.Equal(v2, []byte(k.String())), Equals, true) + } + + listA = run(mpds, dsq.Query{}) + listB = run(ktdsTx, dsq.Query{}) + listC := run(ms, dsq.Query{}) + c.Check(len(listA), Equals, len(listB)) + c.Check(len(listA), Equals, len(listC)) + + // sort them cause yeah. + sort.Sort(ds.KeySlice(listA)) + sort.Sort(ds.KeySlice(listB)) + sort.Sort(ds.KeySlice(listC)) + + for i, kA := range listA { + kB := listB[i] + kC := listC[i] + c.Check(pair.Invert(kA), Equals, kB) + c.Check(kA, Equals, pair.Convert(kB)) + c.Check(kC, Equals, kA) + } + + c.Log("listA: ", listA) + c.Log("listB: ", listB) + c.Log("listC: ", listC) + + // Create a new tx and add some uncommitted values to + ktdsTx, err = ktds.NewTransaction(ctx, false) + c.Check(err, Equals, nil) + + unCommittedKeys := strsToKeys([]string{ + "foo", + "foo/bar", + "foo/bar/baz", + }) + unCommittedKeysMap := make(map[ds.Key][]byte) + for i, k := range unCommittedKeys { + unCommittedKeysMap[k] = []byte(fmt.Sprintf("overwrite value %d", i)) + } + for k, val := range unCommittedKeysMap { + err := ktdsTx.Put(ctx, k, val) + c.Check(err, Equals, nil) + } + + for _, k := range keys { + // underlying mapstore will have only the committed results + _, err = ms.Get(ctx, k) + c.Check(err, Equals, ds.ErrNotFound) + + v0, err := 
ms.Get(ctx, ds.NewKey("abc").Child(k)) + c.Check(err, Equals, nil) + c.Check(bytes.Equal(v0, []byte(k.String())), Equals, true) + + // tx will return a mixture of the pending results and committed results + v1, err := ktdsTx.Get(ctx, k) + c.Check(err, Equals, nil) + if val, ok := unCommittedKeysMap[k]; ok { + c.Check(bytes.Equal(v1, val), Equals, true) + } else { + c.Check(bytes.Equal(v1, []byte(k.String())), Equals, true) + } + + // underlying TxnDatastore will have only the committed results + v2, err := mpds.Get(ctx, ds.NewKey("abc").Child(k)) + c.Check(err, Equals, nil) + c.Check(bytes.Equal(v2, []byte(k.String())), Equals, true) + } +} diff --git a/namespace/namespace.go b/namespace/namespace.go index 1913fb7..a528047 100644 --- a/namespace/namespace.go +++ b/namespace/namespace.go @@ -24,3 +24,11 @@ func Wrap(child ds.Datastore, prefix ds.Key) *ktds.Datastore { return ktds.Wrap(child, PrefixTransform(prefix)) } + +func WrapTxnDatastore(child ds.TxnDatastore, prefix ds.Key) *ktds.TxnDatastore { + if child == nil { + panic("child (ds.TxnDatastore) is nil") + } + + return ktds.WrapTxnDatastore(child, PrefixTransform(prefix)) +} diff --git a/test/test_util.go b/test/test_util.go index 7ac91cc..be7eb8f 100644 --- a/test/test_util.go +++ b/test/test_util.go @@ -9,6 +9,7 @@ import ( "testing" dstore "github.com/ipfs/go-datastore" + dsq "github.com/ipfs/go-datastore/query" ) var ErrTest = errors.New("test error") @@ -187,3 +188,165 @@ func (d *testDatastore) CollectGarbage(_ context.Context) error { } return nil } + +var _ dstore.TxnDatastore = (*testTxnDatastore)(nil) + +type testTxnDatastore struct { + testErrors bool + + *dstore.MapDatastore +} + +func NewTestTxnDatastore(ms *dstore.MapDatastore, testErrors bool) *testTxnDatastore { + if ms == nil { + ms = dstore.NewMapDatastore() + } + return &testTxnDatastore{ + testErrors: testErrors, + MapDatastore: ms, + } +} + +func (t *testTxnDatastore) Check(_ context.Context) error { + if t.testErrors { + return ErrTest + 
} + return nil +} + +func (t *testTxnDatastore) Scrub(_ context.Context) error { + if t.testErrors { + return ErrTest + } + return nil +} + +func (t *testTxnDatastore) CollectGarbage(_ context.Context) error { + if t.testErrors { + return ErrTest + } + return nil +} + +func (t *testTxnDatastore) NewTransaction(ctx context.Context, readOnly bool) (dstore.Txn, error) { + return newTestTx(t.testErrors, t.MapDatastore), nil +} + +var _ dstore.Txn = (*testTxn)(nil) + +type testTxn struct { + dirty map[dstore.Key][]byte + committed *dstore.MapDatastore +} + +func newTestTx(testTxErrors bool, committed *dstore.MapDatastore) *testTxn { + return &testTxn{ + dirty: make(map[dstore.Key][]byte), + committed: committed, + } +} + +// It is unclear from the dstore.Txn interface definition whether reads should happen from the dirty or committed or both +// It says that operations will not be applied until Commit() is called, but this doesn't really make sense for the Read +// operations as their interface is not designed for returning results asynchronously (except Query). +// For this test datastore, we simply Read from both dirty and committed entries with dirty values overshadowing committed values. + +// NOTE: looking at go-ds-badger2, it looks like Get, Has, and GetSize only read from the dirty (uncommitted badger txn), +// whereas Query considers both the dirty transaction and the underlying committed datastore. 
+ +func (t *testTxn) Get(ctx context.Context, key dstore.Key) ([]byte, error) { + if val, ok := t.dirty[key]; ok { + return val, nil + } + return t.committed.Get(ctx, key) +} + +func (t *testTxn) Has(ctx context.Context, key dstore.Key) (bool, error) { + if _, ok := t.dirty[key]; ok { + return true, nil + } + + return t.committed.Has(ctx, key) +} + +func (t *testTxn) GetSize(ctx context.Context, key dstore.Key) (int, error) { + if val, ok := t.dirty[key]; ok { + return len(val), nil + } + + return t.committed.GetSize(ctx, key) +} + +func (t *testTxn) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) { + // not entirely sure if Query is *supposed* to access both uncommitted and committed data, but if so I think this + // is the simplest way of handling it and the overhead should be fine for testing purposes + transientStore := dstore.NewMapDatastore() + transientBatch, err := transientStore.Batch(ctx) + if err != nil { + return nil, err + } + + // move committed results into the transientStore + committedResults, err := t.committed.Query(ctx, q) + if err != nil { + return nil, err + } + defer func() { + committedResults.Close() + }() + + for { + res, ok := committedResults.NextSync() + if !ok { + break + } + if res.Error != nil { + return nil, res.Error + } + key := dstore.RawKey(res.Key) + if err := transientBatch.Put(ctx, key, res.Value); err != nil { + return nil, err + } + } + // overwrite transientStore with the dirty results so we can query the union of them + for k, v := range t.dirty { + if err := transientBatch.Put(ctx, k, v); err != nil { + return nil, err + } + } + + // commit the transientStore batch + if err := transientBatch.Commit(ctx); err != nil { + return nil, err + } + + // apply the query to the transient store, return its results + return transientStore.Query(ctx, q) +} + +func (t *testTxn) Put(ctx context.Context, key dstore.Key, value []byte) error { + t.dirty[key] = value + return nil +} + +func (t *testTxn) Delete(ctx 
context.Context, key dstore.Key) error { + delete(t.dirty, key) + return t.committed.Delete(ctx, key) +} + +func (t *testTxn) Commit(ctx context.Context) error { + batch, err := t.committed.Batch(ctx) + if err != nil { + return err + } + for k, v := range t.dirty { + if err := batch.Put(ctx, k, v); err != nil { + return err + } + } + return batch.Commit(ctx) +} + +func (t *testTxn) Discard(ctx context.Context) { + t.dirty = make(map[dstore.Key][]byte) +}