Skip to content

Commit

Permalink
feat: Store log entries
Browse files Browse the repository at this point in the history
Store log entries.

Closes #1240

Signed-off-by: Sandra Vrtikapa <sandra.vrtikapa@securekey.com>
  • Loading branch information
sandrask committed Apr 20, 2022
1 parent 61890ac commit b175913
Show file tree
Hide file tree
Showing 10 changed files with 983 additions and 59 deletions.
25 changes: 25 additions & 0 deletions cmd/orb-server/startcmd/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ const (
defaultVCTLogMonitoringInterval = 10 * time.Second
defaultVCTLogMonitoringMaxTreeSize = 50000
defaultVCTLogMonitoringGetEntriesRange = 1000
defaultVCTLogEntriesStoreEnabled = false
defaultAnchorStatusMonitoringInterval = 5 * time.Second
defaultAnchorStatusInProcessGracePeriod = 30 * time.Second
mqDefaultMaxConnectionSubscriptions = 1000
Expand Down Expand Up @@ -187,6 +188,12 @@ const (
"Defaults to 1000 if not set. Has to be less or equal than 1000 due to VCT limitation." +
commonEnvVarUsageText + vctLogMonitoringGetEntriesRangeEnvKey

vctLogEntriesStoreEnabledFlagName = "vct-log-entries-store-enabled"
vctLogEntriesStoreEnabledEnvKey = "VCT_LOG_ENTRIES_STORE_ENABLED"
vctLogEntriesStoreEnabledFlagUsage = "Enables storing of log entries during log monitoring. " +
"Defaults to false if not set. " +
commonEnvVarUsageText + vctLogEntriesStoreEnabledEnvKey

anchorStatusMonitoringIntervalFlagName = "anchor-status-monitoring-interval"
anchorStatusMonitoringIntervalEnvKey = "ANCHOR_STATUS_MONITORING_INTERVAL"
anchorStatusMonitoringIntervalFlagUsage = "The interval in which 'in-process' anchors are monitored to ensure that they will be witnessed(completed) as per policy." +
Expand Down Expand Up @@ -710,6 +717,7 @@ type orbParameters struct {
vctLogMonitoringInterval time.Duration
vctLogMonitoringTreeSize uint64
vctLogMonitoringGetEntriesRange int
vctLogEntriesStoreEnabled bool
anchorStatusMonitoringInterval time.Duration
anchorStatusInProcessGracePeriod time.Duration
apClientCacheSize int
Expand Down Expand Up @@ -1298,6 +1306,21 @@ func getOrbParameters(cmd *cobra.Command) (*orbParameters, error) {
vctLogMonitoringGetEntriesRange = int(getEntriesRange)
}

enableLogEntriesStoreStr, err := cmdutils.GetUserSetVarFromString(cmd, vctLogEntriesStoreEnabledFlagName, vctLogEntriesStoreEnabledEnvKey, true)
if err != nil {
return nil, err
}

vctLogEntriesStoreEnabled := defaultVCTLogEntriesStoreEnabled
if enableLogEntriesStoreStr != "" {
enable, parseErr := strconv.ParseBool(enableLogEntriesStoreStr)
if parseErr != nil {
return nil, fmt.Errorf("invalid value for %s: %s", vctLogEntriesStoreEnabledFlagName, parseErr)
}

vctLogEntriesStoreEnabled = enable
}

anchorStatusMonitoringInterval, err := getDuration(cmd, anchorStatusMonitoringIntervalFlagName, anchorStatusMonitoringIntervalEnvKey,
defaultAnchorStatusMonitoringInterval)
if err != nil {
Expand Down Expand Up @@ -1398,6 +1421,7 @@ func getOrbParameters(cmd *cobra.Command) (*orbParameters, error) {
vctLogMonitoringInterval: vctLogMonitoringInterval,
vctLogMonitoringTreeSize: vctLogMonitoringMaxTreeSize,
vctLogMonitoringGetEntriesRange: vctLogMonitoringGetEntriesRange,
vctLogEntriesStoreEnabled: vctLogEntriesStoreEnabled,
anchorStatusMonitoringInterval: anchorStatusMonitoringInterval,
anchorStatusInProcessGracePeriod: anchorStatusInProcessGracePeriod,
witnessPolicyCacheExpiration: witnessPolicyCacheExpiration,
Expand Down Expand Up @@ -2017,6 +2041,7 @@ func createFlags(startCmd *cobra.Command) {
startCmd.Flags().StringP(vctLogMonitoringIntervalFlagName, "", "", vctLogMonitoringIntervalFlagUsage)
startCmd.Flags().StringP(vctLogMonitoringMaxTreeSizeFlagName, "", "", vctLogMonitoringMaxTreeSizeFlagUsage)
startCmd.Flags().StringP(vctLogMonitoringGetEntriesRangeFlagName, "", "", vctLogMonitoringGetEntriesRangeFlagUsage)
startCmd.Flags().StringP(vctLogEntriesStoreEnabledFlagName, "", "", vctLogEntriesStoreEnabledFlagUsage)
startCmd.Flags().StringP(anchorStatusMonitoringIntervalFlagName, "", "", anchorStatusMonitoringIntervalFlagUsage)
startCmd.Flags().StringP(anchorStatusInProcessGracePeriodFlagName, "", "", anchorStatusInProcessGracePeriodFlagUsage)
startCmd.Flags().StringP(witnessPolicyCacheExpirationFlagName, "", "", witnessPolicyCacheExpirationFlagUsage)
Expand Down
13 changes: 13 additions & 0 deletions cmd/orb-server/startcmd/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,19 @@ func TestStartCmdWithMissingArg(t *testing.T) {
require.Contains(t, err.Error(), "vct-log-monitoring-get-entries-range: strconv.ParseUint: parsing \"xxx\": invalid syntax")
})

t.Run("VCT log monitoring - log entries store enabled", func(t *testing.T) {
restoreEnv := setEnv(t, vctLogEntriesStoreEnabledEnvKey, "xxx")
defer restoreEnv()

startCmd := GetStartCmd()

startCmd.SetArgs(getTestArgs("localhost:8081", "local", "false", databaseTypeMemOption, ""))

err := startCmd.Execute()
require.Error(t, err)
require.Contains(t, err.Error(), "vct-log-entries-store-enabled: strconv.ParseBool: parsing \"xxx\": invalid syntax")
})

t.Run("anchor status monitoring interval", func(t *testing.T) {
restoreEnv := setEnv(t, anchorStatusMonitoringIntervalEnvKey, "xxx")
defer restoreEnv()
Expand Down
19 changes: 17 additions & 2 deletions cmd/orb-server/startcmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ import (
casstore "github.com/trustbloc/orb/pkg/store/cas"
didanchorstore "github.com/trustbloc/orb/pkg/store/didanchor"
"github.com/trustbloc/orb/pkg/store/expiry"
"github.com/trustbloc/orb/pkg/store/logentry"
"github.com/trustbloc/orb/pkg/store/logmonitor"
opstore "github.com/trustbloc/orb/pkg/store/operation"
unpublishedopstore "github.com/trustbloc/orb/pkg/store/operation/unpublished"
Expand Down Expand Up @@ -773,9 +774,23 @@ func startOrbServices(parameters *orbParameters) error {
return fmt.Errorf("failed to create log monitor store: %w", err)
}

logMonitoringSvc, err := logmonitoring.New(logMonitorStore, httpClient, parameters.requestTokens,
logMonitoringOpts := []logmonitoring.Option{
logmonitoring.WithMaxTreeSize(parameters.vctLogMonitoringTreeSize),
logmonitoring.WithMaxGetEntriesRange(parameters.vctLogMonitoringGetEntriesRange))
logmonitoring.WithMaxGetEntriesRange(parameters.vctLogMonitoringGetEntriesRange),
}

if parameters.vctLogEntriesStoreEnabled {
logEntryStore, err := logentry.New(storeProviders.provider)
if err != nil {
return fmt.Errorf("failed to create log entries store: %w", err)
}

logMonitoringOpts = append(logMonitoringOpts,
logmonitoring.WithLogEntriesStore(logEntryStore))
}

logMonitoringSvc, err := logmonitoring.New(logMonitorStore, httpClient, parameters.requestTokens,
logMonitoringOpts...)
if err != nil {
return fmt.Errorf("new VCT consistency monitoring service: %w", err)
}
Expand Down
116 changes: 66 additions & 50 deletions pkg/activitypub/service/vct/logmonitoring/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@ import (
"net/http"
"sync"

"github.com/google/trillian/merkle/logverifier"
"github.com/hyperledger/aries-framework-go/pkg/crypto/tinkcrypto"
"github.com/hyperledger/aries-framework-go/pkg/kms/localkms"
"github.com/transparency-dev/merkle/compact"
"github.com/transparency-dev/merkle/rfc6962"
"github.com/trustbloc/edge-core/pkg/log"
"github.com/trustbloc/vct/pkg/client/vct"
"github.com/trustbloc/vct/pkg/controller/command"

"github.com/trustbloc/orb/pkg/activitypub/service/vct/logmonitoring/verifier"
orberrors "github.com/trustbloc/orb/pkg/errors"
"github.com/trustbloc/orb/pkg/store/logmonitor"
)
Expand Down Expand Up @@ -52,6 +50,15 @@ type logMonitorStore interface {
Update(log *logmonitor.LogMonitor) error
}

type logEntryStore interface {
StoreLogEntries(log string, start, end uint64, entries []command.LeafEntry) error
}

type logVerifier interface {
VerifyConsistencyProof(snapshot1, snapshot2 int64, root1, root2 []byte, proof [][]byte) error
GetRootHashFromEntries(entries []*command.LeafEntry) ([]byte, error)
}

/* Monitors watch logs and check that they behave correctly.
In order to do this, it should follow these steps for each log:
1. Fetch the current STH.
Expand All @@ -75,7 +82,9 @@ type logMonitorStore interface {
// Client implements periodical monitoring of VCT consistency
// as per https://datatracker.ietf.org/doc/html/rfc6962#section-5.3.
type Client struct {
store logMonitorStore
logVerifier logVerifier
monitorStore logMonitorStore
entryStore logEntryStore
http httpClient
requestTokens map[string]string
maxTreeSize uint64
Expand All @@ -99,10 +108,19 @@ func WithMaxGetEntriesRange(max int) Option {
}
}

// WithLogEntriesStore sets optional implementation of log entries store (default is noop store).
func WithLogEntriesStore(s logEntryStore) Option {
return func(opts *Client) {
opts.entryStore = s
}
}

// New returns new client for monitoring VCT log consistency.
func New(store logMonitorStore, httpClient httpClient, requestTokens map[string]string, opts ...Option) (*Client, error) { //nolint:lll
client := &Client{
store: store,
logVerifier: verifier.New(),
monitorStore: store,
entryStore: &noopLogEntryStore{},
http: httpClient,
requestTokens: requestTokens,
maxTreeSize: defaultMaxTreeSize,
Expand All @@ -122,7 +140,7 @@ func (c *Client) checkVCTConsistency(logMonitor *logmonitor.LogMonitor) error {

storedSTH := logMonitor.STH

// creates new client based on domain
// creates new client based on log
vctClient := vct.New(logMonitor.Log, vct.WithHTTPClient(c.http),
vct.WithAuthWriteToken(c.requestTokens[vctWriteTokenKey]),
vct.WithAuthReadToken(c.requestTokens[vctReadTokenKey]))
Expand Down Expand Up @@ -154,8 +172,8 @@ func (c *Client) checkVCTConsistency(logMonitor *logmonitor.LogMonitor) error {
logMonitor.STH = sth
logMonitor.PubKey = pubKey

// store the latest checked STH for domain; set processing flag to false
err = c.store.Update(logMonitor)
// store the latest checked STH for log; set processing flag to false
err = c.monitorStore.Update(logMonitor)
if err != nil {
return fmt.Errorf("failed to store STH: %w", err)
}
Expand Down Expand Up @@ -198,7 +216,7 @@ func (c *Client) verifySTH(logURL string, storedSTH, sth *command.GetSTHResponse
return nil
}

err = verifySTHConsistency(logURL, storedSTH, sth, vctClient)
err = c.verifySTHConsistency(logURL, storedSTH, sth, vctClient)
if err != nil {
return fmt.Errorf("failed to verify STH consistency: %w", err)
}
Expand All @@ -208,19 +226,19 @@ func (c *Client) verifySTH(logURL string, storedSTH, sth *command.GetSTHResponse
return nil
}

func (c *Client) verifySTHTree(domain string, sth *command.GetSTHResponse, vctClient *vct.Client) error {
logger.Debugf("log[%s]: get STH tree[%d] and verify consistency", domain, sth.TreeSize)
func (c *Client) verifySTHTree(logURL string, sth *command.GetSTHResponse, vctClient *vct.Client) error {
logger.Debugf("log[%s]: get STH tree[%d] and verify consistency", logURL, sth.TreeSize)

entries, err := getEntries(domain, vctClient, sth.TreeSize, c.maxGetEntriesRange)
entries, err := c.getAllEntries(logURL, vctClient, sth.TreeSize, c.maxGetEntriesRange)
if err != nil {
return fmt.Errorf("failed to get all entries: %w", err)
}

logger.Debugf("log[%s]: get all entries[%d] for tree size[%d]", domain, len(entries), sth.TreeSize)
logger.Debugf("log[%s]: get all entries[%d] for tree size[%d]", logURL, len(entries), sth.TreeSize)

// Confirm that the tree made from the fetched entries produces the
// same hash as that in the STH.
root, err := getRootHashFromEntries(entries)
root, err := c.logVerifier.GetRootHashFromEntries(entries)
if err != nil {
return fmt.Errorf("failed to get root hash from entries: %w", err)
}
Expand All @@ -229,29 +247,34 @@ func (c *Client) verifySTHTree(domain string, sth *command.GetSTHResponse, vctCl
return fmt.Errorf("different root hash results from merkle tree building: %s and sth %s", root, sth.SHA256RootHash)
}

logger.Debugf("log[%s]: merkle tree hash from all entries matches latest STH", domain)
logger.Debugf("log[%s]: merkle tree hash from all entries matches latest STH", logURL)

return nil
}

func getEntries(domain string, vctClient *vct.Client,
treeSize uint64, maxEntriesPerRequest int) ([]*command.LeafEntry, error) {
func (c *Client) getEntries(logURL string, vctClient *vct.Client,
start, end uint64, maxEntriesPerRequest int) ([]*command.LeafEntry, error) {
var allEntries []*command.LeafEntry

attempts := int(treeSize-1) / maxEntriesPerRequest
attempts := int(end) / maxEntriesPerRequest

// fetch all the entries in the tree corresponding to the STH
// VCT: get-entries allow maximum 1000 entries to be returned
for i := 0; i <= attempts; i++ {
start := uint64(i * maxEntriesPerRequest)
end := min(uint64((i+1)*maxEntriesPerRequest-1), treeSize-1)
attemptStart := start + uint64(i*maxEntriesPerRequest)
attemptEnd := min(uint64((i+1)*maxEntriesPerRequest-1), end)

entries, err := vctClient.GetEntries(context.Background(), start, end)
entries, err := vctClient.GetEntries(context.Background(), attemptStart, attemptEnd)
if err != nil {
return nil, fmt.Errorf("failed to get entries for range[%d-%d]: %w", start, end, err)
return nil, fmt.Errorf("failed to get entries for range[%d-%d]: %w", attemptStart, attemptEnd, err)
}

logger.Debugf("domain[%s] fetched entries from %d to %d", domain, start, end)
logger.Debugf("log[%s] fetched entries from %d to %d", logURL, attemptStart, attemptEnd)

err = c.entryStore.StoreLogEntries(logURL, attemptStart, attemptEnd, entries.Entries)
if err != nil {
return nil, fmt.Errorf("failed to store entries for range[%d-%d]: %w", attemptStart, attemptEnd, err)
}

for i := range entries.Entries {
allEntries = append(allEntries, &entries.Entries[i])
Expand All @@ -261,25 +284,9 @@ func getEntries(domain string, vctClient *vct.Client,
return allEntries, nil
}

func getRootHashFromEntries(entries []*command.LeafEntry) ([]byte, error) {
hasher := rfc6962.DefaultHasher
fact := compact.RangeFactory{Hash: hasher.HashChildren}
cr := fact.NewEmptyRange(0)

// We don't simply iterate the map, as we need to preserve the leaves order.
for _, entry := range entries {
err := cr.Append(hasher.HashLeaf(entry.LeafInput), nil)
if err != nil {
return nil, err
}
}

root, err := cr.GetRootHash(nil)
if err != nil {
return nil, fmt.Errorf("failed to compute compact range root: %w", err)
}

return root, nil
func (c *Client) getAllEntries(logURL string, vctClient *vct.Client,
treeSize uint64, maxEntriesPerRequest int) ([]*command.LeafEntry, error) {
return c.getEntries(logURL, vctClient, 0, treeSize-1, maxEntriesPerRequest)
}

func min(a, b uint64) uint64 {
Expand All @@ -290,33 +297,36 @@ func min(a, b uint64) uint64 {
return b
}

func verifySTHConsistency(domain string, storedSTH, sth *command.GetSTHResponse, vctClient *vct.Client) error {
func (c *Client) verifySTHConsistency(logURL string, storedSTH, sth *command.GetSTHResponse, vctClient *vct.Client) error { //nolint:lll
if storedSTH.TreeSize == 0 {
// any tree is consistent with tree size of zero - nothing to do
logger.Debugf("log[%s]: STH stored tree size is zero - nothing to do for STH consistency", domain)
logger.Debugf("log[%s]: STH stored tree size is zero - nothing to do for STH consistency", logURL)

return nil
}

logger.Debugf("log[%s]: get STH consistency for stored[%d] and latest[%d]",
domain, storedSTH.TreeSize, sth.TreeSize)
logURL, storedSTH.TreeSize, sth.TreeSize)

sthConsistency, err := vctClient.GetSTHConsistency(context.Background(), storedSTH.TreeSize, sth.TreeSize)
if err != nil {
return fmt.Errorf("get STH consistency: %w", err)
}

logger.Debugf("log[%s]: found %d consistencies in STH consistency response",
domain, len(sthConsistency.Consistency))

logVerifier := logverifier.New(rfc6962.DefaultHasher)
logURL, len(sthConsistency.Consistency))

err = logVerifier.VerifyConsistencyProof(int64(storedSTH.TreeSize), int64(sth.TreeSize),
err = c.logVerifier.VerifyConsistencyProof(int64(storedSTH.TreeSize), int64(sth.TreeSize),
storedSTH.SHA256RootHash, sth.SHA256RootHash, sthConsistency.Consistency)
if err != nil {
return fmt.Errorf("verify consistency proof: %w", err)
}

_, err = c.getEntries(logURL, vctClient, storedSTH.TreeSize, sth.TreeSize-1, c.maxGetEntriesRange)
if err != nil {
return fmt.Errorf("get entries between trees: %w", err)
}

return nil
}

Expand Down Expand Up @@ -375,7 +385,7 @@ func verifySTHSignature(sth *command.GetSTHResponse, pubKey []byte) error {
func (c *Client) MonitorLogs() {
logger.Debugf("start log monitoring...")

logs, err := c.store.GetActiveLogs()
logs, err := c.monitorStore.GetActiveLogs()
if err != nil {
if errors.Is(err, orberrors.ErrContentNotFound) {
logger.Debugf("no active log monitors found - nothing to do")
Expand Down Expand Up @@ -409,3 +419,9 @@ func (c *Client) processLog(logMonitor *logmonitor.LogMonitor) {
logger.Errorf("[%s] failed to check VCT consistency: %s", logMonitor.Log, err.Error())
}
}

type noopLogEntryStore struct{}

func (s *noopLogEntryStore) StoreLogEntries(logURL string, start, end uint64, entries []command.LeafEntry) error {
return nil
}
Loading

0 comments on commit b175913

Please sign in to comment.