Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Stop monitoring anchor once witness list is exhausted #1561

Merged
merged 1 commit into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/orb-server/startcmd/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ const (
anchorStatusInProcessGracePeriodFlagName = "anchor-status-in-process-grace-period"
anchorStatusInProcessGracePeriodEnvKey = "ANCHOR_STATUS_IN_PROCESS_GRACE_PERIOD"
anchorStatusInProcessGracePeriodFlagUsage = "The period in which witnesses will not be re-selected for 'in-process' anchors." +
"Defaults to 1m if not set. " +
"Defaults to 30s if not set. " +
commonEnvVarUsageText + anchorStatusInProcessGracePeriodEnvKey

externalEndpointFlagName = "external-endpoint"
Expand Down
6 changes: 4 additions & 2 deletions pkg/activitypub/service/activityhandler/inboxhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,8 @@ func (h *Inbox) HandleAnnounceActivity(ctx context.Context, source *url.URL, ann
}

func (h *Inbox) handleOfferActivity(ctx context.Context, offer *vocab.ActivityType) error {
h.logger.Debug("Handling 'Offer' activity", logfields.WithActivityID(offer.ID()))
h.logger.Info("Handling 'Offer' activity", logfields.WithActivityID(offer.ID()),
logfields.WithActorIRI(offer.Actor()))

anchorLink, err := h.validateAndUnmarshalOfferActivity(offer)
if err != nil {
Expand Down Expand Up @@ -570,7 +571,8 @@ func (h *Inbox) handleOfferActivity(ctx context.Context, offer *vocab.ActivityTy
}

func (h *Inbox) handleAcceptOfferActivity(ctx context.Context, accept, offer *vocab.ActivityType) error {
h.logger.Debug("Handling 'Accept' offer activity", logfields.WithActivityID(accept.ID()))
h.logger.Info("Handling 'Accept' offer activity", logfields.WithActivityID(accept.ID()),
logfields.WithActorIRI(accept.Actor()))

err := h.validateAcceptOfferActivity(accept)
if err != nil {
Expand Down
25 changes: 18 additions & 7 deletions pkg/anchor/handler/proof/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package proof
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"reflect"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/trustbloc/orb/pkg/anchor/vcpubsub"
proofapi "github.com/trustbloc/orb/pkg/anchor/witness/proof"
"github.com/trustbloc/orb/pkg/datauri"
orberrors "github.com/trustbloc/orb/pkg/errors"
"github.com/trustbloc/orb/pkg/linkset"
"github.com/trustbloc/orb/pkg/pubsub/spi"
"github.com/trustbloc/orb/pkg/vcsigner"
Expand Down Expand Up @@ -147,7 +149,9 @@ func (h *WitnessProofHandler) HandleProof(ctx context.Context, witness *url.URL,

status, err := h.StatusStore.GetStatus(anchor)
if err != nil {
return fmt.Errorf("failed to get status for anchor [%s]: %w", anchor, err)
if !errors.Is(err, orberrors.ErrContentNotFound) {
return fmt.Errorf("failed to get status for anchor [%s]: %w", anchor, err)
}
}

if status == proofapi.AnchorIndexStatusCompleted {
Expand Down Expand Up @@ -208,7 +212,8 @@ func (h *WitnessProofHandler) handleWitnessPolicy(ctx context.Context, anchorLin

// Witness policy has been satisfied so add witness proofs to anchor, set 'complete' status for anchor
// publish witnessed anchor to batch writer channel for further processing
logger.Info("Witness policy has been satisfied for anchor", logfields.WithAnchorURIString(anchorID))
logger.Info("Witness policy has been satisfied for anchor", logfields.WithAnchorURIString(anchorID),
logfields.WithVerifiableCredentialID(vc.ID))

vc, err = addProofs(vc, witnessProofs)
if err != nil {
Expand All @@ -217,11 +222,14 @@ func (h *WitnessProofHandler) handleWitnessPolicy(ctx context.Context, anchorLin

status, err := h.StatusStore.GetStatus(anchorID)
if err != nil {
return fmt.Errorf("failed to get status for anchor [%s]: %w", anchorID, err)
if !errors.Is(err, orberrors.ErrContentNotFound) {
return fmt.Errorf("failed to get status for anchor [%s]: %w", anchorID, err)
}
}

if status == proofapi.AnchorIndexStatusCompleted {
logger.Info("Anchor status has already been marked as completed for", logfields.WithAnchorURIString(anchorID))
logger.Info("Anchor status has already been marked as completed for", logfields.WithAnchorURIString(anchorID),
logfields.WithVerifiableCredentialID(vc.ID))

return nil
}
Expand All @@ -230,7 +238,8 @@ func (h *WitnessProofHandler) handleWitnessPolicy(ctx context.Context, anchorLin
// then this handler would be invoked on another server instance. So, we want the status to remain in-process,
// otherwise the handler on the other instance would not publish the VC because it would think that is has
// already been processed.
logger.Debug("Publishing anchor", logfields.WithAnchorURIString(anchorID))
logger.Debug("Publishing anchor", logfields.WithAnchorURIString(anchorID),
logfields.WithVerifiableCredentialID(vc.ID))

vcBytes, err := canonicalizer.MarshalCanonical(vc)
if err != nil {
Expand All @@ -254,11 +263,13 @@ func (h *WitnessProofHandler) handleWitnessPolicy(ctx context.Context, anchorLin
return fmt.Errorf("publish credential[%s]: %w", anchorID, err)
}

logger.Debug("Setting anchor status to completed", logfields.WithAnchorURIString(anchorID))
logger.Info("Setting anchor status to completed", logfields.WithAnchorURIString(anchorID),
logfields.WithVerifiableCredentialID(vc.ID))

err = h.StatusStore.AddStatus(anchorID, proofapi.AnchorIndexStatusCompleted)
if err != nil {
return fmt.Errorf("failed to change status to 'completed' for anchor [%s]: %w", anchorID, err)
return fmt.Errorf("failed to change status to 'completed' for anchor [%s], VC [%s]: %w",
anchorID, vc.ID, err)
}

if vc.Issued != nil {
Expand Down
5 changes: 2 additions & 3 deletions pkg/anchor/handler/proof/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ func TestWitnessProofHandler(t *testing.T) {
err = proofHandler.HandleProof(context.Background(), witness1IRI, al.Anchor().String(), expiryTime, []byte(witnessProofJSONWebSignature))
require.Error(t, err)
require.Contains(t, err.Error(), fmt.Sprintf(
"failed to change status to 'completed' for anchor [%s]: add status error", al.Anchor().String()))
"failed to change status to 'completed' for anchor [%s]", al.Anchor().String()))
})

t.Run("status already completed", func(t *testing.T) {
Expand Down Expand Up @@ -731,8 +731,7 @@ func TestWitnessProofHandler(t *testing.T) {
proofHandler := New(providers, ps, datauri.MediaTypeDataURIGzipBase64, defaultClockSkew)

err = proofHandler.HandleProof(context.Background(), witness1IRI, al.Anchor().String(), expiryTime, []byte(witnessProofJSONWebSignature))
require.Error(t, err)
require.Contains(t, err.Error(), "status not found for anchor [hl:uEiABbKSeh3rb4MOjS1Era2_62bBPwP9EytPSg5tIkNYiSQ]")
require.NoError(t, err)
})

t.Run("error - store error", func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/anchor/witness/policy/inspector/inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (c *Inspector) postOfferActivity(ctx context.Context, anchorLink *linkset.L
return fmt.Errorf("failed to post additional offer for anchor[%s]: %w", anchorLink.Anchor(), err)
}

logger.Info("Created additional 'Offer' activity for anchor", logfields.WithAnchorURI(anchorLink.Anchor()),
logger.Info("Posted 'Offer' activity to additional witnesses", logfields.WithAnchorURI(anchorLink.Anchor()),
logfields.WithActivityID(activityID), logfields.WithWitnessURIs(witnessesIRI...))

return nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/anchor/witness/policy/selector/random/selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/trustbloc/orb/pkg/anchor/witness/proof"
orberrors "github.com/trustbloc/orb/pkg/errors"
)

// New returns new random selector.
Expand All @@ -29,7 +30,8 @@ func (s *Selector) Select(witnesses []*proof.Witness, n int) ([]*proof.Witness,
l := len(witnesses)

if n > l {
return nil, fmt.Errorf("unable to select %d witnesses from witness array of length %d", n, len(witnesses))
return nil, fmt.Errorf("unable to select %d witnesses from witness array of length %d: %w",
n, len(witnesses), orberrors.ErrWitnessesNotFound)
}

if n == l {
Expand Down
3 changes: 3 additions & 0 deletions pkg/anchor/witness/policy/selector/random/selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ SPDX-License-Identifier: Apache-2.0
package random

import (
"errors"
"testing"

"github.com/stretchr/testify/require"

"github.com/trustbloc/orb/pkg/anchor/witness/proof"
orberrors "github.com/trustbloc/orb/pkg/errors"
)

func TestNew(t *testing.T) {
Expand Down Expand Up @@ -51,6 +53,7 @@ func TestSelect(t *testing.T) {
selected, err := s.Select(nil, 2)
require.Error(t, err)
require.Empty(t, selected)
require.True(t, errors.Is(err, orberrors.ErrWitnessesNotFound))
require.Contains(t, err.Error(), "unable to select 2 witnesses from witness array of length 0")
})
}
4 changes: 2 additions & 2 deletions pkg/anchor/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,8 +652,8 @@ func (c *Writer) postOfferActivity(ctx context.Context, anchorLink *linkset.Link
return fmt.Errorf("store witnesses: %w", err)
}

logger.Debug("Created 'Offer' activity for anchor", logfields.WithAnchorURI(anchorLink.Anchor()),
logfields.WithActivityID(activityID))
logger.Info("Posted 'Offer' activity to witnesses", logfields.WithAnchorURI(anchorLink.Anchor()),
logfields.WithActivityID(activityID), logfields.WithWitnessURIs(selectedWitnessesIRIs...))

if len(selectedWitnessesIRIs) == 1 {
// The Offer was posted only to the public IRI. This means that it will be persisted
Expand Down
7 changes: 5 additions & 2 deletions pkg/document/updatehandler/decorator/decorator.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ type metricsProvider interface {
}

// Decorate will validate local state against anchor origin for update/recover/deactivate.
func (d *OperationDecorator) Decorate(op *operation.Operation) (*operation.Operation, error) { //nolint:cyclop
//
//nolint:funlen
func (d *OperationDecorator) Decorate(op *operation.Operation) (*operation.Operation, error) {
startTime := time.Now()

defer func() {
Expand Down Expand Up @@ -132,7 +134,8 @@ func (d *OperationDecorator) Decorate(op *operation.Operation) (*operation.Opera

anchorOriginResponse, err := d.resolveDocumentFromAnchorOrigin(ctx, canonicalID, localAnchorOrigin)
if err != nil {
logger.Debugc(ctx, "Failed to resolve document from anchor origin", logfields.WithDID(canonicalID), log.WithError(err))
logger.Warnc(ctx, "Failed to resolve document from anchor origin. The local document will be used.",
logfields.WithDID(canonicalID), log.WithError(err))

return op, nil
}
Expand Down
51 changes: 39 additions & 12 deletions pkg/store/anchorstatus/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,12 @@ func (s *Store) AddStatus(anchorID string, status proof.AnchorIndexStatus) error
}

if status == proof.AnchorIndexStatusCompleted {
logger.Info("Anchor has completed processing", logfields.WithAnchorURIString(anchorID))

delErr := s.deleteInProcessStatus(anchorID)
if delErr != nil {
// no need to stop processing for this
logger.Debug("Failed to delete in-process statuses after receiving complete status",
logger.Warn("Failed to delete in-process statuses after receiving complete status",
log.WithError(delErr))
}
}
Expand Down Expand Up @@ -276,7 +278,7 @@ func (s *Store) GetStatus(anchorID string) (proof.AnchorIndexStatus, error) {
}

if !ok {
return "", fmt.Errorf("status not found for anchor [%s]", anchorID)
return "", fmt.Errorf("status not found for anchor [%s]: %w", anchorID, orberrors.ErrContentNotFound)
}

var status proof.AnchorIndexStatus
Expand Down Expand Up @@ -352,12 +354,7 @@ func (s *Store) CheckInProcessAnchors() {

e = s.processIndex(status.AnchorID)
if e != nil {
if errors.Is(e, orberrors.ErrWitnessesNotFound) {
// This is not a critical error. Log it as info.
logger.Info("Failed to process anchor index", log.WithError(e))
} else {
logger.Error("Failed to process anchor index", log.WithError(e))
}
logger.Error("Failed to process anchor index", log.WithError(e))
}

more, e = iterator.Next()
Expand All @@ -381,20 +378,50 @@ func (s *Store) processIndex(encodedAnchorID string) error {

status, err := s.GetStatus(anchorID)
if err != nil {
return fmt.Errorf("failed to get status for anchorID[%s]: %w", anchorID, err)
if !errors.Is(err, orberrors.ErrContentNotFound) {
return fmt.Errorf("failed to get status for anchorID[%s]: %w", anchorID, err)
}

logger.Info("Status not found for anchor. No further processing will be performed for this anchor.",
logfields.WithAnchorURIString(anchorID))

return nil
}

if status == proof.AnchorIndexStatusCompleted {
// already completed - nothing to do
logger.Info("Anchor status is already set to completed. No processing required.",
logfields.WithAnchorURIString(anchorID))

// Delete all in-process status records
err = s.deleteInProcessStatus(anchorID)
if err != nil {
logger.Warn("Error deleting in process anchor status", log.WithError(err),
logfields.WithAnchorURIString(anchorID))
}

return nil
}

err = s.policyHandler.CheckPolicy(anchorID)
if err != nil {
return fmt.Errorf("failed to re-evaluate policy for anchorID[%s]: %w", anchorID, err)
if !errors.Is(err, orberrors.ErrWitnessesNotFound) {
return fmt.Errorf("failed to re-evaluate policy for anchorID[%s]: %w", anchorID, err)
}

logger.Info("No additional witnesses found for anchor. No further processing will be performed for this anchor.",
logfields.WithAnchorURIString(anchorID), log.WithError(err))

// Delete all in-process status records
err = s.deleteInProcessStatus(anchorID)
if err != nil {
logger.Warn("Error deleting in process anchor status", log.WithError(err),
logfields.WithAnchorURIString(anchorID))
}

return nil
}

logger.Debug("Successfully re-evaluated policy for anchor", logfields.WithAnchorURIString(anchorID))
logger.Info("Successfully re-evaluated policy for anchor", logfields.WithAnchorURIString(anchorID))

return nil
}
Expand Down
35 changes: 32 additions & 3 deletions pkg/store/anchorstatus/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func TestStore_Get(t *testing.T) {
status, err := s.GetStatus(vcID)
require.Error(t, err)
require.Empty(t, status)
require.True(t, errors.Is(err, orberrors.ErrContentNotFound))
require.Contains(t, err.Error(), "not found")
})

Expand Down Expand Up @@ -291,6 +292,12 @@ func TestStore_CheckInProcessAnchors(t *testing.T) {
WithCheckStatusAfterTime(time.Second))
require.NoError(t, err)

err = s.AddStatus(vcID, proof.AnchorIndexStatusCompleted)
require.NoError(t, err)

err = s.AddStatus(vcID, proof.AnchorIndexStatusInProcess)
require.NoError(t, err)

err = s.AddStatus(vcID, proof.AnchorIndexStatusInProcess)
require.NoError(t, err)

Expand Down Expand Up @@ -581,7 +588,7 @@ func TestStore_processIndex(t *testing.T) {
require.Contains(t, err.Error(), "failed to re-evaluate policy for anchorID[vcID]: policy error")
})

t.Run("error - status not found", func(t *testing.T) {
t.Run("ignore - status not found", func(t *testing.T) {
mongoDBConnString, stopMongo := mongodbtestutil.StartMongoDB(t)
defer stopMongo()

Expand All @@ -593,8 +600,30 @@ func TestStore_processIndex(t *testing.T) {
require.NoError(t, err)

err = s.processIndex(encoder.EncodeToString([]byte(vcID)))
require.Error(t, err)
require.Contains(t, err.Error(), "failed to get status for anchorID[vcID]")
require.NoError(t, err)
})

t.Run("ignore - status completed", func(t *testing.T) {
mongoDBConnString, stopMongo := mongodbtestutil.StartMongoDB(t)
defer stopMongo()

mongoDBProvider, err := mongodb.NewProvider(mongoDBConnString)
require.NoError(t, err)

s, err := New(mongoDBProvider, testutil.GetExpiryService(t), maxWitnessDelayTime)
require.NoError(t, err)

err = s.AddStatus(vcID, proof.AnchorIndexStatusCompleted)
require.NoError(t, err)

err = s.AddStatus(vcID, proof.AnchorIndexStatusInProcess)
require.NoError(t, err)

err = s.AddStatus(vcID, proof.AnchorIndexStatusInProcess)
require.NoError(t, err)

err = s.processIndex(encoder.EncodeToString([]byte(vcID)))
require.NoError(t, err)
})
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/vct/logmonitoring/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ func min(a, b uint64) uint64 {

func (c *Client) verifySTHConsistency(logURL string, storedSTH, sth *command.GetSTHResponse, vctClient *vct.Client) error {
if storedSTH.TreeSize > 0 {
logger.Debug("Getting STH consistency for stored[%d] and latest[%d]",
logger.Debug("Getting STH consistency for stored and latest",
logfields.WithLogURLString(logURL), zap.Uint64("stored-size", storedSTH.TreeSize),
logfields.WithSizeUint64(sth.TreeSize))

Expand All @@ -489,7 +489,7 @@ func (c *Client) verifySTHConsistency(logURL string, storedSTH, sth *command.Get
return fmt.Errorf("get STH consistency: %w", err)
}

logger.Debug("Found %d consistencies in STH consistency response", logfields.WithLogURLString(logURL),
logger.Debug("Found consistencies in STH consistency response", logfields.WithLogURLString(logURL),
zap.Int("consistency-size", len(sthConsistency.Consistency)))

err = c.logVerifier.VerifyConsistencyProof(int64(storedSTH.TreeSize), int64(sth.TreeSize),
Expand Down