Skip to content

Commit

Permalink
feat: Optimized stores for VCT monitoring
Browse files Browse the repository at this point in the history
The DB, monitoring, was renamed to proof-monitor and the following stores were converted to use the optimized storage interface:

- log-monitor
- proof-monitor
- log-entry

closes #1326

Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
  • Loading branch information
bstasyszyn committed Jun 10, 2022
1 parent 882cac7 commit 517f82b
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 113 deletions.
49 changes: 31 additions & 18 deletions pkg/store/logentry/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ import (
"github.com/trustbloc/vct/pkg/controller/command"

orberrors "github.com/trustbloc/orb/pkg/errors"
"github.com/trustbloc/orb/pkg/store"
)

const (
nameSpace = "log-entry"

logTagName = "Log"
indexTagName = "Index"
statusTagName = "Status"
logTagName = "logUrl"
indexTagName = "index"
statusTagName = "status"

defaultPageSize = 500
)
Expand Down Expand Up @@ -60,8 +61,10 @@ type Store struct {

// LogEntry consists of index with log and leaf entry.
type LogEntry struct {
Index int
LeafEntry command.LeafEntry
Index int `json:"index"`
LeafEntry command.LeafEntry `json:"leafEntry"`
LogURL string `json:"logUrl"`
Status EntryStatus `json:"status"`
}

// EntryIterator defines the query results iterator for log entry queries.
Expand All @@ -76,21 +79,16 @@ type EntryIterator interface {

// New creates db implementation of log entries.
func New(provider storage.Provider, opts ...Option) (*Store, error) {
store, err := provider.OpenStore(nameSpace)
s, err := store.Open(provider, nameSpace,
store.NewTagGroup(logTagName, indexTagName, statusTagName),
)
if err != nil {
return nil, fmt.Errorf("failed to open log entry store: %w", err)
}

err = provider.SetStoreConfig(nameSpace, storage.StoreConfiguration{
TagNames: []string{logTagName, indexTagName, statusTagName},
})
if err != nil {
return nil, fmt.Errorf("failed to set store configuration: %w", err)
}

logEntryStore := &Store{
pageSize: defaultPageSize,
store: store,
store: s,
}

for _, opt := range opts {
Expand Down Expand Up @@ -122,6 +120,8 @@ func (s *Store) StoreLogEntries(logURL string, start, end uint64, entries []comm
logEntry := &LogEntry{
Index: index,
LeafEntry: entry,
LogURL: base64.RawURLEncoding.EncodeToString([]byte(logURL)),
Status: EntryStatusSuccess,
}

logEntryBytes, err := json.Marshal(logEntry)
Expand All @@ -131,7 +131,7 @@ func (s *Store) StoreLogEntries(logURL string, start, end uint64, entries []comm

indexTag := storage.Tag{
Name: indexTagName,
Value: strconv.Itoa(index),
Value: strconv.Itoa(logEntry.Index),
}

statusTag := storage.Tag{
Expand All @@ -141,7 +141,7 @@ func (s *Store) StoreLogEntries(logURL string, start, end uint64, entries []comm

logTag := storage.Tag{
Name: logTagName,
Value: base64.RawURLEncoding.EncodeToString([]byte(logURL)),
Value: logEntry.LogURL,
}

op := storage.Operation{
Expand All @@ -153,8 +153,7 @@ func (s *Store) StoreLogEntries(logURL string, start, end uint64, entries []comm
operations[i] = op
}

err := s.store.Batch(operations)
if err != nil {
if err := s.store.Batch(operations); err != nil {
return orberrors.NewTransient(fmt.Errorf("failed to add entries for log: %w", err))
}

Expand Down Expand Up @@ -216,6 +215,20 @@ func (s *Store) FailLogEntriesFrom(logURL string, start uint64) error { // nolin
return orberrors.NewTransient(fmt.Errorf("failed to get key: %w", err))
}

var entry LogEntry

err = json.Unmarshal(entryBytes, &entry)
if err != nil {
return fmt.Errorf("failed to unmarshal entry bytes: %w", err)
}

entry.Status = EntryStatusFailed

entryBytes, err = json.Marshal(entry)
if err != nil {
return fmt.Errorf("failed to marshal entry bytes: %w", err)
}

op := storage.Operation{
Key: key,
Value: entryBytes,
Expand Down
3 changes: 2 additions & 1 deletion pkg/store/logentry/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestNew(t *testing.T) {

s, err := New(provider)
require.Error(t, err)
require.Contains(t, err.Error(), "failed to open log entry store: open store error")
require.Contains(t, err.Error(), "open store error")
require.Nil(t, s)
})
}
Expand Down Expand Up @@ -660,6 +660,7 @@ func TestStore_FailLogEntriesFrom(t *testing.T) {
iterator := &mocks.Iterator{}
iterator.NextReturnsOnCall(0, true, nil)
iterator.NextReturnsOnCall(1, false, fmt.Errorf("iterator second next() error"))
iterator.ValueReturns([]byte(`{}`), nil)

store := &mocks.Store{}
store.QueryReturns(iterator, nil)
Expand Down
73 changes: 35 additions & 38 deletions pkg/store/logmonitor/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,41 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"

"github.com/hyperledger/aries-framework-go/spi/storage"
"github.com/trustbloc/edge-core/pkg/log"
"github.com/trustbloc/vct/pkg/controller/command"

orberrors "github.com/trustbloc/orb/pkg/errors"
"github.com/trustbloc/orb/pkg/store"
)

const (
namespace = "log-monitor"

activeIndex = "active"
statusIndex = "status"
)

type status = string

const (
statusActive status = "active"
statusInactive status = "inactive"
)

var logger = log.New("log-monitor-store")

// New returns new instance of log monitor store.
func New(provider storage.Provider) (*Store, error) {
store, err := provider.OpenStore(namespace)
s, err := store.Open(provider, namespace,
store.NewTagGroup(statusIndex),
)
if err != nil {
return nil, fmt.Errorf("failed to open log monitor store: %w", err)
}

err = provider.SetStoreConfig(namespace, storage.StoreConfiguration{TagNames: []string{activeIndex}})
if err != nil {
return nil, fmt.Errorf("failed to set store configuration: %w", err)
}

return &Store{
store: store,
store: s,
marshal: json.Marshal,
unmarshal: json.Unmarshal,
}, nil
Expand All @@ -55,11 +59,11 @@ type Store struct {

// LogMonitor provides information about log monitor.
type LogMonitor struct {
Log string `json:"log_url"`
STH *command.GetSTHResponse `json:"sth_response"`
PubKey []byte `json:"pub_key"`
Log string `json:"logUrl"`
STH *command.GetSTHResponse `json:"sthResponse"`
PubKey []byte `json:"pubKey"`

Active bool `json:"active"`
Status status `json:"status"`
}

// Activate stores a log to be monitored. If it already exists active flag will be set to true.
Expand All @@ -74,14 +78,14 @@ func (s *Store) Activate(logURL string) error {
// create new log monitor
rec = &LogMonitor{
Log: logURL,
Active: true,
Status: statusActive,
}
} else {
return orberrors.NewTransientf("failed to get log monitor record: %w", err)
}
}

rec.Active = true
rec.Status = statusActive

recBytes, err := s.marshal(rec)
if err != nil {
Expand All @@ -91,8 +95,8 @@ func (s *Store) Activate(logURL string) error {
logger.Debugf("storing log monitor: %s", string(recBytes))

indexTag := storage.Tag{
Name: activeIndex,
Value: "true",
Name: statusIndex,
Value: statusActive,
}

if e := s.store.Put(logURL, recBytes, indexTag); e != nil {
Expand All @@ -117,7 +121,7 @@ func (s *Store) Deactivate(logURL string) error {
return orberrors.NewTransientf("failed to deactivate log[%s] monitor: %w", logURL, err)
}

rec.Active = false
rec.Status = statusInactive

recBytes, err := s.marshal(rec)
if err != nil {
Expand All @@ -127,8 +131,8 @@ func (s *Store) Deactivate(logURL string) error {
logger.Debugf("deactivating log monitor: %s", logURL)

indexTag := storage.Tag{
Name: activeIndex,
Value: "false",
Name: statusIndex,
Value: statusInactive,
}

if e := s.store.Put(logURL, recBytes, indexTag); e != nil {
Expand Down Expand Up @@ -173,8 +177,8 @@ func (s *Store) Update(logMonitor *LogMonitor) error {
logger.Debugf("updating log monitor: %s", string(recBytes))

indexTag := storage.Tag{
Name: activeIndex,
Value: strconv.FormatBool(logMonitor.Active),
Name: statusIndex,
Value: logMonitor.Status,
}

if e := s.store.Put(logMonitor.Log, recBytes, indexTag); e != nil {
Expand All @@ -197,32 +201,25 @@ func (s *Store) Delete(logURL string) error {

// GetActiveLogs retrieves all active log monitors.
func (s *Store) GetActiveLogs() ([]*LogMonitor, error) {
return s.getLogs(true)
return s.getLogs(statusActive)
}

// GetInactiveLogs retrieves all inactive log monitors.
func (s *Store) GetInactiveLogs() ([]*LogMonitor, error) {
return s.getLogs(false)
return s.getLogs(statusInactive)
}

func (s *Store) getLogs(active bool) ([]*LogMonitor, error) {
var err error

label := "active"
if !active {
label = "inactive"
}

query := fmt.Sprintf("%s:%t", activeIndex, active)
func (s *Store) getLogs(status status) ([]*LogMonitor, error) {
query := fmt.Sprintf("%s:%s", statusIndex, status)

iter, err := s.store.Query(query)
if err != nil {
return nil, fmt.Errorf("failed to get '%s' log monitors, query[%s]: %w", label, query, err)
return nil, fmt.Errorf("failed to get '%s' log monitors, query[%s]: %w", status, query, err)
}

ok, err := iter.Next()
if err != nil {
return nil, fmt.Errorf("iterator error for get '%s' log monitors: %w", label, err)
return nil, fmt.Errorf("iterator error for get '%s' log monitors: %w", status, err)
}

if !ok {
Expand All @@ -234,7 +231,7 @@ func (s *Store) getLogs(active bool) ([]*LogMonitor, error) {
for ok {
value, err := iter.Value()
if err != nil {
return nil, fmt.Errorf("failed to get iterator value for '%s' log monitors: %w", label, err)
return nil, fmt.Errorf("failed to get iterator value for '%s' log monitors: %w", status, err)
}

var logMonitor LogMonitor
Expand All @@ -248,11 +245,11 @@ func (s *Store) getLogs(active bool) ([]*LogMonitor, error) {

ok, err = iter.Next()
if err != nil {
return nil, fmt.Errorf("iterator error for '%s' log monitors: %w", label, err)
return nil, fmt.Errorf("iterator error for '%s' log monitors: %w", status, err)
}
}

logger.Debugf("get '%s' log monitors: %+v", label, logMonitors)
logger.Debugf("get '%s' log monitors: %+v", status, logMonitors)

return logMonitors, nil
}
Loading

0 comments on commit 517f82b

Please sign in to comment.