Skip to content

Commit

Permalink
feat: Add retriving log entries
Browse files Browse the repository at this point in the history
Implement get log entries for log entry store. Initially it is only used to test storing log entries.

Closes #1256

Signed-off-by: Sandra Vrtikapa <sandra.vrtikapa@securekey.com>
  • Loading branch information
sandrask committed Apr 22, 2022
1 parent 1b9365a commit a4fe2d9
Show file tree
Hide file tree
Showing 2 changed files with 234 additions and 11 deletions.
108 changes: 97 additions & 11 deletions pkg/store/logentry/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package logentry

import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
Expand All @@ -25,25 +26,23 @@ const (

logTagName = "Log"
indexTagName = "Index"

defaultPageSize = 500
)

var logger = log.New("log-entry-store")

// New creates db implementation of log entries.
func New(provider storage.Provider) (*Store, error) {
store, err := provider.OpenStore(nameSpace)
if err != nil {
return nil, fmt.Errorf("failed to open log entry store: %w", err)
}
// ErrDataNotFound is returned when data is not found.
var ErrDataNotFound = errors.New("data not found")

return &Store{
store: store,
}, nil
}
// Option is an option for log entry store.
type Option func(opts *Store)

// Store is db implementation of log entry store.
type Store struct {
store storage.Store

pageSize int
}

// LogEntry consists of index with log and leaf entry.
Expand All @@ -52,6 +51,35 @@ type LogEntry struct {
LeafEntry command.LeafEntry
}

// EntryIterator defines the query results iterator for log entry queries.
type EntryIterator interface {
// TotalItems returns the total number of items as a result of the query.
TotalItems() (int, error)
// Next returns the next log entry or an ErrNotFound error if there are no more items.
Next() (*command.LeafEntry, error)
// Close closes the iterator.
Close() error
}

// New creates db implementation of log entries.
func New(provider storage.Provider, opts ...Option) (*Store, error) {
store, err := provider.OpenStore(nameSpace)
if err != nil {
return nil, fmt.Errorf("failed to open log entry store: %w", err)
}

logEntryStore := &Store{
pageSize: defaultPageSize,
store: store,
}

for _, opt := range opts {
opt(logEntryStore)
}

return logEntryStore, nil
}

// StoreLogEntries stores log entries.
func (s *Store) StoreLogEntries(logURL string, start, end uint64, entries []command.LeafEntry) error {
if len(entries) == 0 {
Expand Down Expand Up @@ -88,7 +116,7 @@ func (s *Store) StoreLogEntries(logURL string, start, end uint64, entries []comm

logTag := storage.Tag{
Name: logTagName,
Value: logURL,
Value: base64.RawURLEncoding.EncodeToString([]byte(logURL)),
}

op := storage.Operation{
Expand All @@ -109,3 +137,61 @@ func (s *Store) StoreLogEntries(logURL string, start, end uint64, entries []comm

return nil
}

// GetLogEntries retrieves log entries.
func (s *Store) GetLogEntries(logURL string) (EntryIterator, error) {
if logURL == "" {
return nil, errors.New("missing log URL")
}

query := fmt.Sprintf("%s:%s", logTagName, base64.RawURLEncoding.EncodeToString([]byte(logURL)))

iterator, err := s.store.Query(query,
storage.WithSortOrder(&storage.SortOptions{
Order: storage.SortAscending,
TagName: indexTagName,
}),
storage.WithPageSize(s.pageSize))
if err != nil {
return nil, orberrors.NewTransient(fmt.Errorf("failed to query log entry store: %w", err))
}

return &entryIterator{ariesIterator: iterator}, nil
}

type entryIterator struct {
ariesIterator storage.Iterator
}

func (e *entryIterator) TotalItems() (int, error) {
return e.ariesIterator.TotalItems()
}

func (e *entryIterator) Next() (*command.LeafEntry, error) {
exists, err := e.ariesIterator.Next()
if err != nil {
return nil, orberrors.NewTransient(fmt.Errorf("failed to determine if there are more results: %w", err))
}

if exists {
entryBytes, err := e.ariesIterator.Value()
if err != nil {
return nil, orberrors.NewTransient(fmt.Errorf("failed to get value: %w", err))
}

var entry LogEntry

err = json.Unmarshal(entryBytes, &entry)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal entry bytes: %w", err)
}

return &entry.LeafEntry, nil
}

return nil, ErrDataNotFound
}

func (e *entryIterator) Close() error {
return e.ariesIterator.Close()
}
137 changes: 137 additions & 0 deletions pkg/store/logentry/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@ SPDX-License-Identifier: Apache-2.0
package logentry

import (
"bytes"
"fmt"
"testing"
"time"

"github.com/hyperledger/aries-framework-go-ext/component/storage/mongodb"
"github.com/hyperledger/aries-framework-go/component/storageutil/mem"
"github.com/hyperledger/aries-framework-go/component/storageutil/mock"
"github.com/stretchr/testify/require"
"github.com/trustbloc/vct/pkg/controller/command"

"github.com/trustbloc/orb/pkg/internal/testutil/mongodbtestutil"
"github.com/trustbloc/orb/pkg/store/mocks"
)

Expand Down Expand Up @@ -138,3 +143,135 @@ func TestStore_StoreLogEntries(t *testing.T) {
require.Contains(t, err.Error(), "batch error")
})
}

func TestStore_GetLogEntries(t *testing.T) {
t.Run("success - one entry", func(t *testing.T) {
mongoDBConnString, stopMongo := mongodbtestutil.StartMongoDB(t)
defer stopMongo()

mongoDBProvider, err := mongodb.NewProvider(mongoDBConnString)
require.NoError(t, err)

s, err := New(mongoDBProvider)
require.NoError(t, err)

testLeafInput := []byte("leafInput")

entries := []command.LeafEntry{{
LeafInput: testLeafInput,
}}

err = s.StoreLogEntries(logURL, 0, 0, entries)
require.NoError(t, err)

iter, err := s.GetLogEntries(logURL)
require.NoError(t, err)

n, err := iter.TotalItems()
require.NoError(t, err)
require.Equal(t, 1, n)

entry, err := iter.Next()
require.NoError(t, err)
require.True(t, bytes.Equal(testLeafInput, entry.LeafInput))

err = iter.Close()
require.NoError(t, err)
})

t.Run("success - multiple entries", func(t *testing.T) {
mongoDBConnString, stopMongo := mongodbtestutil.StartMongoDB(t)
defer stopMongo()

mongoDBProvider, err := mongodb.NewProvider(mongoDBConnString)
require.NoError(t, err)

s, err := New(mongoDBProvider)
require.NoError(t, err)

test0 := []byte("leafInput-0")
test1 := []byte("leafInput-1")

entries := []command.LeafEntry{
{
LeafInput: test0,
},
{
LeafInput: test1,
},
}

err = s.StoreLogEntries(logURL, 0, 1, entries)
require.NoError(t, err)

time.Sleep(time.Second)

iter, err := s.GetLogEntries(logURL)
require.NoError(t, err)

n, err := iter.TotalItems()
require.NoError(t, err)
require.Equal(t, 2, n)

entry, err := iter.Next()
require.NoError(t, err)
require.True(t, bytes.Equal(test0, entry.LeafInput))

entry, err = iter.Next()
require.NoError(t, err)
require.True(t, bytes.Equal(test1, entry.LeafInput))
})

t.Run("error - no entries", func(t *testing.T) {
mongoDBConnString, stopMongo := mongodbtestutil.StartMongoDB(t)
defer stopMongo()

mongoDBProvider, err := mongodb.NewProvider(mongoDBConnString)
require.NoError(t, err)

s, err := New(mongoDBProvider)
require.NoError(t, err)

iter, err := s.GetLogEntries(logURL)
require.NoError(t, err)

n, err := iter.TotalItems()
require.NoError(t, err)
require.Equal(t, 0, n)

entry, err := iter.Next()
require.Error(t, err)
require.Nil(t, entry)
require.Contains(t, err.Error(), "data not found")
})

t.Run("error - empty log URL", func(t *testing.T) {
provider := mem.NewProvider()

s, err := New(provider)
require.NoError(t, err)

iter, err := s.GetLogEntries("")
require.Error(t, err)
require.Nil(t, iter)
require.Contains(t, err.Error(), "missing log URL")
})
}

func TestEntryIterator(t *testing.T) {
t.Run("error - next fails", func(t *testing.T) {
iterator := entryIterator{ariesIterator: &mock.Iterator{ErrNext: fmt.Errorf("next error")}}

entry, err := iterator.Next()
require.EqualError(t, err, "failed to determine if there are more results: next error")
require.Nil(t, entry)

iterator = entryIterator{ariesIterator: &mock.Iterator{
NextReturn: true, ErrValue: fmt.Errorf("value error"),
}}

entry, err = iterator.Next()
require.EqualError(t, err, "failed to get value: value error")
require.Nil(t, entry)
})
}

0 comments on commit a4fe2d9

Please sign in to comment.