Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better Tombstone handling in Iter #21

Merged
merged 3 commits into from
Feb 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 17 additions & 50 deletions bench_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package honu_test

import (
"fmt"
"io/ioutil"
"math/rand"
"os"
Expand All @@ -22,26 +21,26 @@ var (

func setupLevelDB(t testing.TB) (*leveldb.DB, string) {
// Create a new leveldb database in a temporary directory
tmpDir, err := ioutil.TempDir("", "honuldb-*")
tmpDir, err := ioutil.TempDir("", "leveldb-*")
require.NoError(t, err)

// Open a leveldb database directly without honu wrapper
db, err := leveldb.OpenFile(tmpDir, nil)
require.NoError(t, err)
if err != nil && tmpDir != "" {
fmt.Println(tmpDir)
os.RemoveAll(tmpDir)
}
require.NoError(t, err)

t.Cleanup(func() {
db.Close()
os.RemoveAll(tmpDir)
})

return db, tmpDir
}

func BenchmarkHonuGet(b *testing.B) {
db, tmpDir := setupHonuDB(b)

// Cleanup when we're done with the test
// NOTE: defers are evaluated in FIFO order, so this ensures the db is closed first then the directory deleted
defer os.RemoveAll(tmpDir)
defer db.Close()
db, _ := setupHonuDB(b)

// Create a key and value
key := []byte("foo")
Expand All @@ -63,11 +62,7 @@ func BenchmarkHonuGet(b *testing.B) {
}

func BenchmarkLevelDBGet(b *testing.B) {
db, tmpDir := setupLevelDB(b)

// Cleanup when we're done with the test
defer os.RemoveAll(tmpDir)
defer db.Close()
db, _ := setupLevelDB(b)

// Create a key and value
key := []byte("foo")
Expand All @@ -88,11 +83,7 @@ func BenchmarkLevelDBGet(b *testing.B) {
}

func BenchmarkHonuPut(b *testing.B) {
db, tmpDir := setupHonuDB(b)

// Cleanup when we're done with the test
defer os.RemoveAll(tmpDir)
defer db.Close()
db, _ := setupHonuDB(b)

// Create a key and value
key := []byte("foo")
Expand All @@ -110,11 +101,7 @@ func BenchmarkHonuPut(b *testing.B) {
}

func BenchmarkLevelDBPut(b *testing.B) {
db, tmpDir := setupLevelDB(b)

// Cleanup when we're done with the test
defer os.RemoveAll(tmpDir)
defer db.Close()
db, _ := setupLevelDB(b)

// Create a key and value
key := []byte("foo")
Expand All @@ -132,11 +119,7 @@ func BenchmarkLevelDBPut(b *testing.B) {
}

func BenchmarkHonuDelete(b *testing.B) {
db, tmpDir := setupHonuDB(b)

// Cleanup when we're done with the test
defer os.RemoveAll(tmpDir)
defer db.Close()
db, _ := setupHonuDB(b)

// Create a key and value
key := []byte("foo")
Expand All @@ -158,11 +141,7 @@ func BenchmarkHonuDelete(b *testing.B) {
}

func BenchmarkLevelDBDelete(b *testing.B) {
db, tmpDir := setupLevelDB(b)

// Cleanup when we're done with the test
defer os.RemoveAll(tmpDir)
defer db.Close()
db, _ := setupLevelDB(b)

// Create a key and value
key := []byte("foo")
Expand All @@ -183,11 +162,7 @@ func BenchmarkLevelDBDelete(b *testing.B) {
}

func BenchmarkHonuIter(b *testing.B) {
db, tmpDir := setupHonuDB(b)

// Cleanup when we're done with the test
defer os.RemoveAll(tmpDir)
defer db.Close()
db, _ := setupHonuDB(b)

// Create a key and value
for _, key := range []string{"aa", "bb", "cc", "dd", "ee", "ff", "gg", "hh", "ii", "jj"} {
Expand Down Expand Up @@ -219,11 +194,7 @@ func BenchmarkHonuIter(b *testing.B) {
}

func BenchmarkLevelDBIter(b *testing.B) {
db, tmpDir := setupLevelDB(b)

// Cleanup when we're done with the test
defer os.RemoveAll(tmpDir)
defer db.Close()
db, _ := setupLevelDB(b)

// Create a key and value
for _, key := range []string{"aa", "bb", "cc", "dd", "ee", "ff", "gg", "hh", "ii", "jj"} {
Expand Down Expand Up @@ -253,11 +224,7 @@ func BenchmarkLevelDBIter(b *testing.B) {
}

func BenchmarkHonuObject(b *testing.B) {
db, tmpDir := setupHonuDB(b)

// Cleanup when we're done with the test
defer os.RemoveAll(tmpDir)
defer db.Close()
db, _ := setupHonuDB(b)

// Create a key and value
key := []byte("foo")
Expand Down
62 changes: 47 additions & 15 deletions engines/leveldb/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,90 @@ package leveldb

import (
"bytes"
"fmt"

honuiter "github.com/rotationalio/honu/iterator"
pb "github.com/rotationalio/honu/object"
opts "github.com/rotationalio/honu/options"
"github.com/syndtr/goleveldb/leveldb/iterator"
"google.golang.org/protobuf/proto"
)

// NewLevelDBIterator creates a new iterator that wraps a leveldb Iterator with object
// management access and Honu-specific serialization.
func NewLevelDBIterator(iter iterator.Iterator, namespace string) honuiter.Iterator {
return &ldbIterator{ldb: iter, namespace: namespace}
func NewLevelDBIterator(iter iterator.Iterator, options *opts.Options) honuiter.Iterator {
return &ldbIterator{ldb: iter, options: options}
}

// Wraps the underlying leveldb iterator to provide object management access.
type ldbIterator struct {
ldb iterator.Iterator
namespace string
ldb iterator.Iterator
options *opts.Options
}

// Type check for the ldbIterator
var _ honuiter.Iterator = &ldbIterator{}

func (i *ldbIterator) Next() bool { return i.ldb.Next() }
func (i *ldbIterator) Prev() bool { return i.ldb.Prev() }
func (i *ldbIterator) Error() error { return i.ldb.Error() }
func (i *ldbIterator) Release() { i.ldb.Release() }

func (i *ldbIterator) Next() bool {
if ok := i.ldb.Next(); !ok {
return false
}

// If we aren't including Tombstones, we need to check if the next version is a
// tombstone before we know if we have a next value or not.
if !i.options.Tombstones {
if obj, err := i.Object(); err != nil || obj.Tombstone() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we ignoring the error here or does it get captured somewhere else?

return i.Next()
}
}
return true
}

func (i *ldbIterator) Prev() bool {
if ok := i.ldb.Prev(); !ok {
return false
}

// If we aren't including Tombstones, we need to check if the next version is a
// tombstone before we know if we have a next value or not.
if !i.options.Tombstones {
if obj, err := i.Object(); err != nil || obj.Tombstone() {
return i.Prev()
}
}
return true
}

func (i *ldbIterator) Seek(key []byte) bool {
// NOTE: no need to do tombstone checking in Seek because Next will be called.
// We need to prefix the seek with the correct namespace
key = prepend(i.namespace, key)
if i.options.Namespace != "" {
key = prepend(i.options.Namespace, key)
}
return i.ldb.Seek(key)
}

func (i *ldbIterator) Key() []byte {
// Fetch the key then split the namespace from the key
// Note that because the namespace itself might have colons in it, we
// strip off the namespace prefix then remove any preceding colons.
key := i.ldb.Key()
parts := bytes.SplitN(key, nssep, 2)
if len(parts) == 2 {
return parts[1]
if i.options.Namespace != "" {
prefix := prepend(i.options.Namespace, nil)
return bytes.TrimPrefix(key, prefix)
}
return key
}

func (i *ldbIterator) Value() []byte {
obj, err := i.Object()
if err != nil {
fmt.Println(err)
// NOTE: if err is not nil, it's up to the caller to get the error from Object
return nil
} else {
return obj.Data
}
return obj.Data
}

func (i *ldbIterator) Object() (obj *pb.Object, err error) {
Expand All @@ -65,5 +97,5 @@ func (i *ldbIterator) Object() (obj *pb.Object, err error) {
}

func (i *ldbIterator) Namespace() string {
return i.namespace
return i.options.Namespace
}
2 changes: 1 addition & 1 deletion engines/leveldb/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (db *LevelDBEngine) Iter(prefix []byte, options *opts.Options) (i iterator.
if len(prefix) > 0 {
slice = util.BytesPrefix(prefix)
}
return NewLevelDBIterator(db.ldb.NewIterator(slice, options.LevelDBRead), options.Namespace), nil
return NewLevelDBIterator(db.ldb.NewIterator(slice, options.LevelDBRead), options), nil
}

var nssep = []byte("::")
Expand Down
58 changes: 29 additions & 29 deletions engines/leveldb/leveldb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ import (
"google.golang.org/protobuf/proto"
)

// a test set of key/value pairs used to evaluate iteration
// note because :: is the namespace separator in leveldb, we want to ensure that keys
// with colons are correctly iterated on.
var pairs = [][]string{
{"aa", "first"},
{"ab", "second"},
{"ba", "third"},
{"bb", "fourth"},
{"bc", "fifth"},
{"b::a", "third"},
{"b::b", "fourth"},
{"b::c", "fifth"},
{"ca", "sixth"},
{"cb", "seventh"},
}
Expand All @@ -44,11 +47,18 @@ func setupLevelDBEngine(t testing.TB) (_ *leveldb.LevelDBEngine, path string) {
os.RemoveAll(tempDir)
}
require.NoError(t, err)

// Add a cleanup function to ensure the fixture is deleted after tests
t.Cleanup(func() {
// Teardown after finishing the test
engine.Close()
os.RemoveAll(tempDir)
})

return engine, tempDir
}

// Creates an options.Options struct with namespace set and returns
// a pointer to it.
// Creates an options.Options struct with namespace set and returns a pointer to it.
func namespaceOpts(namespace string, t *testing.T) *options.Options {
opts, err := options.New(options.WithNamespace(namespace))
require.NoError(t, err)
Expand Down Expand Up @@ -78,18 +88,14 @@ func checkDelete(ldbStore engine.Store, opts *options.Options, key []byte, t *te
require.Empty(t, value)
}

func TestLeveldbEngine(t *testing.T) {
func TestLevelDBEngine(t *testing.T) {
// Setup a levelDB Engine.
ldbEngine, ldbPath := setupLevelDBEngine(t)
require.Equal(t, "leveldb", ldbEngine.Engine())

// Ensure the db was created.
require.DirExists(t, ldbPath)

// Teardown after finishing the test.
defer os.RemoveAll(ldbPath)
defer ldbEngine.Close()

// Use a constant key to ensure namespaces
// are working correctly.
key := []byte("foo")
Expand All @@ -111,12 +117,8 @@ func TestLeveldbEngine(t *testing.T) {
}
}

func TestLeveldbTransactions(t *testing.T) {
ldbEngine, ldbPath := setupLevelDBEngine(t)

// Teardown after finishing the test
defer os.RemoveAll(ldbPath)
defer ldbEngine.Close()
func TestLevelDBTransactions(t *testing.T) {
ldbEngine, _ := setupLevelDBEngine(t)

// Use a constant key to ensure namespaces
// are working correctly.
Expand Down Expand Up @@ -155,20 +157,9 @@ func TestLeveldbTransactions(t *testing.T) {
}

func TestLevelDBIter(t *testing.T) {
ldbEngine, ldbPath := setupLevelDBEngine(t)

// Teardown after finishing the test
defer os.RemoveAll(ldbPath)
defer ldbEngine.Close()
ldbEngine, _ := setupLevelDBEngine(t)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding the cleanup func, it makes the teardown code a lot neater.


for _, namespace := range testNamespaces {
// TODO: figure out what to do with this testcase.
// Iter currently grabs the namespace by splitting
// on :: and grabbing the first string, so it only
// grabs "namespace".
if namespace == "namespace::with::colons" {
continue
}
// Add data to the database to iterate over.
opts := namespaceOpts(namespace, t)

Expand Down Expand Up @@ -223,7 +214,16 @@ func addIterPairsToDB(ldbStore engine.Store, opts *options.Options, pairs [][]st
obj := &pb.Object{
Key: key,
Namespace: opts.Namespace,
Data: value,
Version: &pb.Version{
Pid: 1,
Version: 1,
Region: "testing",
Parent: nil,
Tombstone: false,
},
Region: "testing",
Owner: "testing",
Data: value,
}

data, err := proto.Marshal(obj)
Expand Down
Loading