Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better Tombstone handling in Iter #21

Merged
merged 3 commits into from
Feb 9, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 47 additions & 15 deletions engines/leveldb/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,90 @@ package leveldb

import (
"bytes"
"fmt"

honuiter "github.com/rotationalio/honu/iterator"
pb "github.com/rotationalio/honu/object"
opts "github.com/rotationalio/honu/options"
"github.com/syndtr/goleveldb/leveldb/iterator"
"google.golang.org/protobuf/proto"
)

// NewLevelDBIterator creates a new iterator that wraps a leveldb Iterator with object
// management access and Honu-specific serialization.
func NewLevelDBIterator(iter iterator.Iterator, namespace string) honuiter.Iterator {
return &ldbIterator{ldb: iter, namespace: namespace}
func NewLevelDBIterator(iter iterator.Iterator, options *opts.Options) honuiter.Iterator {
return &ldbIterator{ldb: iter, options: options}
}

// Wraps the underlying leveldb iterator to provide object management access.
type ldbIterator struct {
ldb iterator.Iterator
namespace string
ldb iterator.Iterator
options *opts.Options
}

// Type check for the ldbIterator
var _ honuiter.Iterator = &ldbIterator{}

func (i *ldbIterator) Next() bool { return i.ldb.Next() }
func (i *ldbIterator) Prev() bool { return i.ldb.Prev() }
func (i *ldbIterator) Error() error { return i.ldb.Error() }
func (i *ldbIterator) Release() { i.ldb.Release() }

func (i *ldbIterator) Next() bool {
if ok := i.ldb.Next(); !ok {
return false
}

// If we aren't including Tombstones, we need to check if the next version is a
// tombstone before we know if we have a next value or not.
if !i.options.Tombstones {
if obj, err := i.Object(); err != nil || obj.Tombstone() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we ignoring the error here or does it get captured somewhere else?

return i.Next()
}
}
return true
}

func (i *ldbIterator) Prev() bool {
if ok := i.ldb.Prev(); !ok {
return false
}

// If we aren't including Tombstones, we need to check if the next version is a
// tombstone before we know if we have a next value or not.
if !i.options.Tombstones {
if obj, err := i.Object(); err != nil || obj.Tombstone() {
return i.Prev()
}
}
return true
}

func (i *ldbIterator) Seek(key []byte) bool {
// NOTE: no need to do tombstone checking in Seek because Next will be called.
// We need to prefix the seek with the correct namespace
key = prepend(i.namespace, key)
if i.options.Namespace != "" {
key = prepend(i.options.Namespace, key)
}
return i.ldb.Seek(key)
}

func (i *ldbIterator) Key() []byte {
// Fetch the key then split the namespace from the key
// Note that because the namespace itself might have colons in it, we
// strip off the namespace prefix then remove any preceding colons.
key := i.ldb.Key()
parts := bytes.SplitN(key, nssep, 2)
if len(parts) == 2 {
return parts[1]
if i.options.Namespace != "" {
prefix := prepend(i.options.Namespace, nil)
return bytes.TrimPrefix(key, prefix)
}
return key
}

func (i *ldbIterator) Value() []byte {
obj, err := i.Object()
if err != nil {
fmt.Println(err)
// NOTE: if err is not nil, it's up to the caller to get the error from Object
return nil
} else {
return obj.Data
}
return obj.Data
}

func (i *ldbIterator) Object() (obj *pb.Object, err error) {
Expand All @@ -65,5 +97,5 @@ func (i *ldbIterator) Object() (obj *pb.Object, err error) {
}

func (i *ldbIterator) Namespace() string {
return i.namespace
return i.options.Namespace
}
2 changes: 1 addition & 1 deletion engines/leveldb/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (db *LevelDBEngine) Iter(prefix []byte, options *opts.Options) (i iterator.
if len(prefix) > 0 {
slice = util.BytesPrefix(prefix)
}
return NewLevelDBIterator(db.ldb.NewIterator(slice, options.LevelDBRead), options.Namespace), nil
return NewLevelDBIterator(db.ldb.NewIterator(slice, options.LevelDBRead), options), nil
}

var nssep = []byte("::")
Expand Down
60 changes: 31 additions & 29 deletions engines/leveldb/leveldb_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package leveldb_test

import (
"fmt"
"io/ioutil"
"os"
"testing"
Expand All @@ -15,12 +16,15 @@ import (
"google.golang.org/protobuf/proto"
)

// a test set of key/value pairs used to evaluate iteration
// note because :: is the namespace separator in leveldb, we want to ensure that keys
// with colons are correctly iterated on.
var pairs = [][]string{
{"aa", "first"},
{"ab", "second"},
{"ba", "third"},
{"bb", "fourth"},
{"bc", "fifth"},
{"b::a", "third"},
{"b::b", "fourth"},
{"b::c", "fifth"},
{"ca", "sixth"},
{"cb", "seventh"},
}
Expand All @@ -44,11 +48,19 @@ func setupLevelDBEngine(t testing.TB) (_ *leveldb.LevelDBEngine, path string) {
os.RemoveAll(tempDir)
}
require.NoError(t, err)

// Add a cleanup function to ensure the fixture is deleted after tests
t.Cleanup(func() {
// Teardown after finishing the test
engine.Close()
os.RemoveAll(tempDir)
fmt.Printf("cleaned up %s\n", tempDir)
bbengfort marked this conversation as resolved.
Show resolved Hide resolved
})

return engine, tempDir
}

// Creates an options.Options struct with namespace set and returns
// a pointer to it.
// Creates an options.Options struct with namespace set and returns a pointer to it.
func namespaceOpts(namespace string, t *testing.T) *options.Options {
opts, err := options.New(options.WithNamespace(namespace))
require.NoError(t, err)
Expand Down Expand Up @@ -78,18 +90,14 @@ func checkDelete(ldbStore engine.Store, opts *options.Options, key []byte, t *te
require.Empty(t, value)
}

func TestLeveldbEngine(t *testing.T) {
func TestLevelDBEngine(t *testing.T) {
// Setup a levelDB Engine.
ldbEngine, ldbPath := setupLevelDBEngine(t)
require.Equal(t, "leveldb", ldbEngine.Engine())

// Ensure the db was created.
require.DirExists(t, ldbPath)

// Teardown after finishing the test.
defer os.RemoveAll(ldbPath)
defer ldbEngine.Close()

// Use a constant key to ensure namespaces
// are working correctly.
key := []byte("foo")
Expand All @@ -111,12 +119,8 @@ func TestLeveldbEngine(t *testing.T) {
}
}

func TestLeveldbTransactions(t *testing.T) {
ldbEngine, ldbPath := setupLevelDBEngine(t)

// Teardown after finishing the test
defer os.RemoveAll(ldbPath)
defer ldbEngine.Close()
func TestLevelDBTransactions(t *testing.T) {
ldbEngine, _ := setupLevelDBEngine(t)

// Use a constant key to ensure namespaces
// are working correctly.
Expand Down Expand Up @@ -155,20 +159,9 @@ func TestLeveldbTransactions(t *testing.T) {
}

func TestLevelDBIter(t *testing.T) {
ldbEngine, ldbPath := setupLevelDBEngine(t)

// Teardown after finishing the test
defer os.RemoveAll(ldbPath)
defer ldbEngine.Close()
ldbEngine, _ := setupLevelDBEngine(t)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding the cleanup func, it makes the teardown code a lot neater.


for _, namespace := range testNamespaces {
// TODO: figure out what to do with this testcase.
// Iter currently grabs the namespace by splitting
// on :: and grabbing the first string, so it only
// grabs "namespace".
if namespace == "namespace::with::colons" {
continue
}
// Add data to the database to iterate over.
opts := namespaceOpts(namespace, t)

Expand Down Expand Up @@ -223,7 +216,16 @@ func addIterPairsToDB(ldbStore engine.Store, opts *options.Options, pairs [][]st
obj := &pb.Object{
Key: key,
Namespace: opts.Namespace,
Data: value,
Version: &pb.Version{
Pid: 1,
Version: 1,
Region: "testing",
Parent: nil,
Tombstone: false,
},
Region: "testing",
Owner: "testing",
Data: value,
}

data, err := proto.Marshal(obj)
Expand Down
Loading