From 9f6a39d9d839d973b0b95fc450c33491f116f477 Mon Sep 17 00:00:00 2001 From: Jean-Baptiste PIN Date: Mon, 5 Feb 2018 14:53:29 +0100 Subject: [PATCH 1/4] added support for seek in iterator and IteratorWithRange on storage --- iterator.go | 5 ++++ mock/storage.go | 11 ++++++++ storage/append.go | 4 +++ storage/iterator.go | 4 +++ storage/memory.go | 48 +++++++++++++++++++++++++++++----- storage/multi_iterator.go | 13 +++++++++ storage/multi_iterator_test.go | 15 ++++++++++- storage/null.go | 8 ++++++ storage/storage.go | 23 ++++++++++++++++ storage/storage_test.go | 7 +++++ 10 files changed, 130 insertions(+), 8 deletions(-) diff --git a/iterator.go b/iterator.go index 48023674..84d4fe69 100644 --- a/iterator.go +++ b/iterator.go @@ -9,6 +9,7 @@ type Iterator interface { Key() string Value() (interface{}, error) Release() + Seek(key []byte) bool } type iterator struct { @@ -41,3 +42,7 @@ func (i *iterator) Value() (interface{}, error) { func (i *iterator) Release() { i.iter.Release() } + +func (i *iterator) Seek(key []byte) bool { + return i.iter.Seek(key) +} diff --git a/mock/storage.go b/mock/storage.go index 6866e6fb..00e98956 100644 --- a/mock/storage.go +++ b/mock/storage.go @@ -93,6 +93,17 @@ func (_mr *_MockStorageRecorder) Iterator() *gomock.Call { return _mr.mock.ctrl.RecordCall(_mr.mock, "Iterator") } +func (_m *MockStorage) IteratorWithRange(start, limit []byte) (storage.Iterator, error) { + ret := _m.ctrl.Call(_m, "Iterator") + ret0, _ := ret[0].(storage.Iterator) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockStorageRecorder) IteratorWithRange() *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "IteratorWithRange") +} + func (_m *MockStorage) MarkRecovered() error { ret := _m.ctrl.Call(_m, "MarkRecovered") ret0, _ := ret[0].(error) diff --git a/storage/append.go b/storage/append.go index 967c4a88..d5ae23a1 100644 --- a/storage/append.go +++ b/storage/append.go @@ -75,6 +75,10 @@ func (f *file) Iterator() (Iterator, error) { return new(NullIter), nil } +func (f *file) IteratorWithRange(start, limit []byte) (Iterator, error) { + return new(NullIter), nil +} + func (f *file) Open() error { return nil } diff --git a/storage/iterator.go b/storage/iterator.go index d741cc3b..df185e41 100644 --- a/storage/iterator.go +++ b/storage/iterator.go @@ -43,3 +43,7 @@ func (i *iterator) Release() { i.iter.Release() i.snap.Release() } + +func (i *iterator) Seek(key []byte) bool { + return i.iter.Seek(key) +} diff --git a/storage/memory.go b/storage/memory.go index 1d16f8b9..f60e440b 100644 --- a/storage/memory.go +++ b/storage/memory.go @@ -1,6 +1,11 @@ package storage -import "fmt" +import ( + "bytes" + "fmt" + + "github.com/syndtr/goleveldb/leveldb/util" +) type memiter struct { current int @@ -41,13 +46,19 @@ func (i *memiter) Release() { i.current = len(i.keys) } -func (m *memory) Iterator() (Iterator, error) { - keys := make([]string, 0, len(m.storage)) - for k := range m.storage { - keys = append(keys, k) +func (i *memiter) Seek(key []byte) bool { + seek := make(map[string][]byte) + keys := []string{} + for k, v := range i.storage { + if bytes.ContainsAny(key, k) { + keys = append(keys, k) + seek[k] = v + } } - - return &memiter{-1, keys, m.storage}, nil + i.current = -1 + i.storage = seek + i.keys = keys + return !i.exhausted() } type memory struct { @@ -87,6 +98,29 @@ func (m *memory) Delete(key string) error { return nil } +func (m *memory) Iterator() (Iterator, error) { + keys := make([]string, 0, len(m.storage)) + for k := range m.storage { + keys = append(keys, k) + } + + return &memiter{-1, keys, m.storage}, nil +} + +func (m *memory) IteratorWithRange(start, limit []byte) (Iterator, error) { + keys := []string{} // using slice as keys has an unknown size + if len(limit) == 0 { + limit = util.BytesPrefix(start).Limit + } + for k := range m.storage { + if bytes.Compare([]byte(k), start) > -1 && bytes.Compare([]byte(k), limit) < 1 { + keys = append(keys, k) + } + } + + return &memiter{-1, keys, m.storage}, nil +} + func (m *memory) MarkRecovered() error { return nil } diff --git a/storage/multi_iterator.go b/storage/multi_iterator.go index 182002b9..2a72e196 100644 --- a/storage/multi_iterator.go +++ b/storage/multi_iterator.go @@ -39,3 +39,16 @@ func (m *multiIterator) Release() { m.current = 0 m.iters = []Iterator{&NullIter{}} } + +func (m *multiIterator) Seek(key []byte) bool { + m.current = 0 + iters := []Iterator{} + ok := false + for i := range m.iters { + if m.iters[i].Seek(key) { + iters = append(iters, m.iters[i]) + ok = true + } + } + return ok +} diff --git a/storage/multi_iterator_test.go b/storage/multi_iterator_test.go index 85187e56..8bd34299 100644 --- a/storage/multi_iterator_test.go +++ b/storage/multi_iterator_test.go @@ -39,6 +39,19 @@ func TestMultiIterator(t *testing.T) { ensure.DeepEqual(t, expected[string(iter.Key())], string(val)) count++ } - ensure.DeepEqual(t, count, len(expected)) + + k := []byte("storage-0") + iter = NewMultiIterator(iters) + ensure.True(t, iter.Seek(k), "seek return false should return true") + ensure.True(t, iter.Next(), "Iterator should have a value") + ensure.DeepEqual(t, iter.Key(), k, "key mismatch") + + total := 1 + for iter.Next() { + _, err := iter.Value() + ensure.Nil(t, err) + total++ + } + ensure.DeepEqual(t, total, 3, "not enough element found in iter seek") } diff --git a/storage/null.go b/storage/null.go index aae7a971..6c4f800d 100644 --- a/storage/null.go +++ b/storage/null.go @@ -56,6 +56,11 @@ func (n *Null) Iterator() (Iterator, error) { return new(NullIter), nil } +// IteratorWithRange returns an Iterator that is immediately exhausted. +func (n *Null) IteratorWithRange(start, limit []byte) (Iterator, error) { + return new(NullIter), nil +} + // Open does nothing and doesn't error. func (n *Null) Open() error { return nil @@ -86,3 +91,6 @@ func (ni *NullIter) Value() ([]byte, error) { // Release does nothing. func (ni *NullIter) Release() {} + +// Seek do nothing +func (ni *NullIter) Seek(key []byte) bool { return false } diff --git a/storage/storage.go b/storage/storage.go index c9f534b7..7dd270f2 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -26,6 +26,8 @@ type Iterator interface { // Release releases the iterator. After release, the iterator is not usable // anymore. Release() + // Seek for a key in the iterator + Seek(key []byte) bool } // Storage abstracts the interface for a persistent local storage @@ -37,6 +39,7 @@ type Storage interface { SetOffset(value int64) error GetOffset(defValue int64) (int64, error) Iterator() (Iterator, error) + IteratorWithRange(start, limit []byte) (Iterator, error) MarkRecovered() error Recovered() bool Open() error @@ -89,6 +92,26 @@ func (s *storage) Iterator() (Iterator, error) { }, nil } +// Iterator returns an iterator that traverses over a snapshot of the storage. +func (s *storage) IteratorWithRange(start, limit []byte) (Iterator, error) { + snap, err := s.db.GetSnapshot() + if err != nil { + return nil, err + } + + if limit != nil && len(limit) > 0 { + return &iterator{ + iter: s.store.NewIterator(&util.Range{Start: start, Limit: limit}, nil), + snap: snap, + }, nil + } + return &iterator{ + iter: s.store.NewIterator(util.BytesPrefix(start), nil), + snap: snap, + }, nil + +} + func (s *storage) Has(key string) (bool, error) { return s.store.Has([]byte(key), nil) } diff --git a/storage/storage_test.go b/storage/storage_test.go index 222f8d0c..a41434ef 100644 --- a/storage/storage_test.go +++ b/storage/storage_test.go @@ -77,6 +77,13 @@ func TestMemIter(t *testing.T) { ensure.True(t, val == nil, "exhausted iterator should return nil value, returned %s", val) ensure.DeepEqual(t, found, kv, "found doesn't match kv, iterator probably didn't return all values") + + k := []byte("key-1") + iter, err = storage.IteratorWithRange(k, nil) + + ensure.True(t, iter.Next(), "next should return true after a IteratorWithRange") + ensure.DeepEqual(t, iter.Key(), k, "the first matching key in IteratorWithRange is not corresponding to the value") + } func TestGetHas(t *testing.T) { From d3201bcb2fae1b51d34e6de8b843cd457da46318 Mon Sep 17 00:00:00 2001 From: Jean-Baptiste PIN Date: Mon, 5 Feb 2018 15:09:32 +0100 Subject: [PATCH 2/4] Added IteratorWithRange on View --- view.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/view.go b/view.go index fa669faf..76e8159c 100644 --- a/view.go +++ b/view.go @@ -271,6 +271,29 @@ func (v *View) Iterator() (Iterator, error) { }, nil } +// IteratorWithRange returns an iterator that iterates over the state of the View. This iterator is build using the range. +func (v *View) IteratorWithRange(start, limit []byte) (Iterator, error) { + iters := make([]storage.Iterator, 0, len(v.partitions)) + for i := range v.partitions { + iter, err := v.partitions[i].st.IteratorWithRange(start, limit) + if err != nil { + // release already opened iterators + for i := range iters { + iters[i].Release() + } + + return nil, fmt.Errorf("error opening partition iterator: %v", err) + } + + iters = append(iters, iter) + } + + return &iterator{ + iter: storage.NewMultiIterator(iters), + codec: v.opts.tableCodec, + }, nil +} + // Evict removes the given key only from the local cache. In order to delete a // key from Kafka and other Views, context.Delete should be used on a Processor. func (v *View) Evict(key string) error { From 105829064c1b6f74ff508af7a38911b46dda0d51 Mon Sep 17 00:00:00 2001 From: Jean-Baptiste PIN Date: Mon, 19 Feb 2018 15:50:00 +0100 Subject: [PATCH 3/4] Update iterator.go Change iterator Seek method to take string and not []byte --- iterator.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/iterator.go b/iterator.go index 84d4fe69..b49396fc 100644 --- a/iterator.go +++ b/iterator.go @@ -9,7 +9,7 @@ type Iterator interface { Key() string Value() (interface{}, error) Release() - Seek(key []byte) bool + Seek(key string) bool } type iterator struct { @@ -43,6 +43,6 @@ func (i *iterator) Release() { i.iter.Release() } -func (i *iterator) Seek(key []byte) bool { - return i.iter.Seek(key) +func (i *iterator) Seek(key string) bool { + return i.iter.Seek([]byte(key)) } From 0e9e0eb89d88f93946761f71bba8751fcfe67089 Mon Sep 17 00:00:00 2001 From: Jean-Baptiste PIN Date: Mon, 19 Feb 2018 15:57:06 +0100 Subject: [PATCH 4/4] Update view.go something like that --- view.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/view.go b/view.go index 76e8159c..039ca1c2 100644 --- a/view.go +++ b/view.go @@ -272,10 +272,10 @@ func (v *View) Iterator() (Iterator, error) { } // IteratorWithRange returns an iterator that iterates over the state of the View. This iterator is build using the range. -func (v *View) IteratorWithRange(start, limit []byte) (Iterator, error) { +func (v *View) IteratorWithRange(start, limit string) (Iterator, error) { iters := make([]storage.Iterator, 0, len(v.partitions)) for i := range v.partitions { - iter, err := v.partitions[i].st.IteratorWithRange(start, limit) + iter, err := v.partitions[i].st.IteratorWithRange([]byte(start), []byte(limit)) if err != nil { // release already opened iterators for i := range iters {