From 0b4a6f2b4ab754d445baa3910dfb4bd615deeaa3 Mon Sep 17 00:00:00 2001 From: Bob Stasyszyn Date: Wed, 17 May 2023 14:56:16 -0400 Subject: [PATCH] fix: Stop monitoring anchor once witness list is exhausted The anchor monitor process will remove all in-process anchor status records after all witnesses have been exhausted. This commit also adds more information to existing logs (such as anchorUri) and changes the level from DEBUG to INFO for some of the important logs so that issues may be more easily debugged. Signed-off-by: Bob Stasyszyn --- cmd/orb-server/startcmd/params.go | 2 +- .../service/activityhandler/inboxhandler.go | 6 ++- pkg/anchor/handler/proof/handler.go | 25 ++++++--- pkg/anchor/handler/proof/handler_test.go | 5 +- .../witness/policy/inspector/inspector.go | 2 +- .../policy/selector/random/selector.go | 4 +- .../policy/selector/random/selector_test.go | 3 ++ pkg/anchor/writer/writer.go | 4 +- .../updatehandler/decorator/decorator.go | 7 ++- pkg/store/anchorstatus/store.go | 51 ++++++++++++++----- pkg/store/anchorstatus/store_test.go | 35 +++++++++++-- pkg/vct/logmonitoring/monitor.go | 4 +- 12 files changed, 112 insertions(+), 36 deletions(-) diff --git a/cmd/orb-server/startcmd/params.go b/cmd/orb-server/startcmd/params.go index 2f8fda2c2..222e371a3 100644 --- a/cmd/orb-server/startcmd/params.go +++ b/cmd/orb-server/startcmd/params.go @@ -221,7 +221,7 @@ const ( anchorStatusInProcessGracePeriodFlagName = "anchor-status-in-process-grace-period" anchorStatusInProcessGracePeriodEnvKey = "ANCHOR_STATUS_IN_PROCESS_GRACE_PERIOD" anchorStatusInProcessGracePeriodFlagUsage = "The period in which witnesses will not be re-selected for 'in-process' anchors." + - "Defaults to 1m if not set. " + + "Defaults to 30s if not set. 
" + commonEnvVarUsageText + anchorStatusInProcessGracePeriodEnvKey externalEndpointFlagName = "external-endpoint" diff --git a/pkg/activitypub/service/activityhandler/inboxhandler.go b/pkg/activitypub/service/activityhandler/inboxhandler.go index ae80336c0..8199c0fc7 100644 --- a/pkg/activitypub/service/activityhandler/inboxhandler.go +++ b/pkg/activitypub/service/activityhandler/inboxhandler.go @@ -513,7 +513,8 @@ func (h *Inbox) HandleAnnounceActivity(ctx context.Context, source *url.URL, ann } func (h *Inbox) handleOfferActivity(ctx context.Context, offer *vocab.ActivityType) error { - h.logger.Debug("Handling 'Offer' activity", logfields.WithActivityID(offer.ID())) + h.logger.Info("Handling 'Offer' activity", logfields.WithActivityID(offer.ID()), + logfields.WithActorIRI(offer.Actor())) anchorLink, err := h.validateAndUnmarshalOfferActivity(offer) if err != nil { @@ -570,7 +571,8 @@ func (h *Inbox) handleOfferActivity(ctx context.Context, offer *vocab.ActivityTy } func (h *Inbox) handleAcceptOfferActivity(ctx context.Context, accept, offer *vocab.ActivityType) error { - h.logger.Debug("Handling 'Accept' offer activity", logfields.WithActivityID(accept.ID())) + h.logger.Info("Handling 'Accept' offer activity", logfields.WithActivityID(accept.ID()), + logfields.WithActorIRI(accept.Actor())) err := h.validateAcceptOfferActivity(accept) if err != nil { diff --git a/pkg/anchor/handler/proof/handler.go b/pkg/anchor/handler/proof/handler.go index 8c7d640a5..505dc4ca6 100644 --- a/pkg/anchor/handler/proof/handler.go +++ b/pkg/anchor/handler/proof/handler.go @@ -9,6 +9,7 @@ package proof import ( "context" "encoding/json" + "errors" "fmt" "net/url" "reflect" @@ -26,6 +27,7 @@ import ( "github.com/trustbloc/orb/pkg/anchor/vcpubsub" proofapi "github.com/trustbloc/orb/pkg/anchor/witness/proof" "github.com/trustbloc/orb/pkg/datauri" + orberrors "github.com/trustbloc/orb/pkg/errors" "github.com/trustbloc/orb/pkg/linkset" "github.com/trustbloc/orb/pkg/pubsub/spi" 
"github.com/trustbloc/orb/pkg/vcsigner" @@ -147,7 +149,9 @@ func (h *WitnessProofHandler) HandleProof(ctx context.Context, witness *url.URL, status, err := h.StatusStore.GetStatus(anchor) if err != nil { - return fmt.Errorf("failed to get status for anchor [%s]: %w", anchor, err) + if !errors.Is(err, orberrors.ErrContentNotFound) { + return fmt.Errorf("failed to get status for anchor [%s]: %w", anchor, err) + } } if status == proofapi.AnchorIndexStatusCompleted { @@ -208,7 +212,8 @@ func (h *WitnessProofHandler) handleWitnessPolicy(ctx context.Context, anchorLin // Witness policy has been satisfied so add witness proofs to anchor, set 'complete' status for anchor // publish witnessed anchor to batch writer channel for further processing - logger.Info("Witness policy has been satisfied for anchor", logfields.WithAnchorURIString(anchorID)) + logger.Info("Witness policy has been satisfied for anchor", logfields.WithAnchorURIString(anchorID), + logfields.WithVerifiableCredentialID(vc.ID)) vc, err = addProofs(vc, witnessProofs) if err != nil { @@ -217,11 +222,14 @@ func (h *WitnessProofHandler) handleWitnessPolicy(ctx context.Context, anchorLin status, err := h.StatusStore.GetStatus(anchorID) if err != nil { - return fmt.Errorf("failed to get status for anchor [%s]: %w", anchorID, err) + if !errors.Is(err, orberrors.ErrContentNotFound) { + return fmt.Errorf("failed to get status for anchor [%s]: %w", anchorID, err) + } } if status == proofapi.AnchorIndexStatusCompleted { - logger.Info("Anchor status has already been marked as completed for", logfields.WithAnchorURIString(anchorID)) + logger.Info("Anchor status has already been marked as completed for", logfields.WithAnchorURIString(anchorID), + logfields.WithVerifiableCredentialID(vc.ID)) return nil } @@ -230,7 +238,8 @@ func (h *WitnessProofHandler) handleWitnessPolicy(ctx context.Context, anchorLin // then this handler would be invoked on another server instance. 
So, we want the status to remain in-process, // otherwise the handler on the other instance would not publish the VC because it would think that is has // already been processed. - logger.Debug("Publishing anchor", logfields.WithAnchorURIString(anchorID)) + logger.Debug("Publishing anchor", logfields.WithAnchorURIString(anchorID), + logfields.WithVerifiableCredentialID(vc.ID)) vcBytes, err := canonicalizer.MarshalCanonical(vc) if err != nil { @@ -254,11 +263,13 @@ func (h *WitnessProofHandler) handleWitnessPolicy(ctx context.Context, anchorLin return fmt.Errorf("publish credential[%s]: %w", anchorID, err) } - logger.Debug("Setting anchor status to completed", logfields.WithAnchorURIString(anchorID)) + logger.Info("Setting anchor status to completed", logfields.WithAnchorURIString(anchorID), + logfields.WithVerifiableCredentialID(vc.ID)) err = h.StatusStore.AddStatus(anchorID, proofapi.AnchorIndexStatusCompleted) if err != nil { - return fmt.Errorf("failed to change status to 'completed' for anchor [%s]: %w", anchorID, err) + return fmt.Errorf("failed to change status to 'completed' for anchor [%s], VC [%s]: %w", + anchorID, vc.ID, err) } if vc.Issued != nil { diff --git a/pkg/anchor/handler/proof/handler_test.go b/pkg/anchor/handler/proof/handler_test.go index ee3158e59..6f4ec8ad6 100644 --- a/pkg/anchor/handler/proof/handler_test.go +++ b/pkg/anchor/handler/proof/handler_test.go @@ -615,7 +615,7 @@ func TestWitnessProofHandler(t *testing.T) { err = proofHandler.HandleProof(context.Background(), witness1IRI, al.Anchor().String(), expiryTime, []byte(witnessProofJSONWebSignature)) require.Error(t, err) require.Contains(t, err.Error(), fmt.Sprintf( - "failed to change status to 'completed' for anchor [%s]: add status error", al.Anchor().String())) + "failed to change status to 'completed' for anchor [%s]", al.Anchor().String())) }) t.Run("status already completed", func(t *testing.T) { @@ -731,8 +731,7 @@ func TestWitnessProofHandler(t *testing.T) { proofHandler := 
New(providers, ps, datauri.MediaTypeDataURIGzipBase64, defaultClockSkew) err = proofHandler.HandleProof(context.Background(), witness1IRI, al.Anchor().String(), expiryTime, []byte(witnessProofJSONWebSignature)) - require.Error(t, err) - require.Contains(t, err.Error(), "status not found for anchor [hl:uEiABbKSeh3rb4MOjS1Era2_62bBPwP9EytPSg5tIkNYiSQ]") + require.NoError(t, err) }) t.Run("error - store error", func(t *testing.T) { diff --git a/pkg/anchor/witness/policy/inspector/inspector.go b/pkg/anchor/witness/policy/inspector/inspector.go index b69251ce7..be0bd643e 100644 --- a/pkg/anchor/witness/policy/inspector/inspector.go +++ b/pkg/anchor/witness/policy/inspector/inspector.go @@ -127,7 +127,7 @@ func (c *Inspector) postOfferActivity(ctx context.Context, anchorLink *linkset.L return fmt.Errorf("failed to post additional offer for anchor[%s]: %w", anchorLink.Anchor(), err) } - logger.Info("Created additional 'Offer' activity for anchor", logfields.WithAnchorURI(anchorLink.Anchor()), + logger.Info("Posted 'Offer' activity to additional witnesses", logfields.WithAnchorURI(anchorLink.Anchor()), logfields.WithActivityID(activityID), logfields.WithWitnessURIs(witnessesIRI...)) return nil diff --git a/pkg/anchor/witness/policy/selector/random/selector.go b/pkg/anchor/witness/policy/selector/random/selector.go index 4af2225b7..851d3b8ae 100644 --- a/pkg/anchor/witness/policy/selector/random/selector.go +++ b/pkg/anchor/witness/policy/selector/random/selector.go @@ -12,6 +12,7 @@ import ( "time" "github.com/trustbloc/orb/pkg/anchor/witness/proof" + orberrors "github.com/trustbloc/orb/pkg/errors" ) // New returns new random selector. 
@@ -29,7 +30,8 @@ func (s *Selector) Select(witnesses []*proof.Witness, n int) ([]*proof.Witness, l := len(witnesses) if n > l { - return nil, fmt.Errorf("unable to select %d witnesses from witness array of length %d", n, len(witnesses)) + return nil, fmt.Errorf("unable to select %d witnesses from witness array of length %d: %w", + n, len(witnesses), orberrors.ErrWitnessesNotFound) } if n == l { diff --git a/pkg/anchor/witness/policy/selector/random/selector_test.go b/pkg/anchor/witness/policy/selector/random/selector_test.go index 79c2b8cb7..98f87d067 100644 --- a/pkg/anchor/witness/policy/selector/random/selector_test.go +++ b/pkg/anchor/witness/policy/selector/random/selector_test.go @@ -7,11 +7,13 @@ SPDX-License-Identifier: Apache-2.0 package random import ( + "errors" "testing" "github.com/stretchr/testify/require" "github.com/trustbloc/orb/pkg/anchor/witness/proof" + orberrors "github.com/trustbloc/orb/pkg/errors" ) func TestNew(t *testing.T) { @@ -51,6 +53,7 @@ func TestSelect(t *testing.T) { selected, err := s.Select(nil, 2) require.Error(t, err) require.Empty(t, selected) + require.True(t, errors.Is(err, orberrors.ErrWitnessesNotFound)) require.Contains(t, err.Error(), "unable to select 2 witnesses from witness array of length 0") }) } diff --git a/pkg/anchor/writer/writer.go b/pkg/anchor/writer/writer.go index d201ede57..0a01cd141 100644 --- a/pkg/anchor/writer/writer.go +++ b/pkg/anchor/writer/writer.go @@ -652,8 +652,8 @@ func (c *Writer) postOfferActivity(ctx context.Context, anchorLink *linkset.Link return fmt.Errorf("store witnesses: %w", err) } - logger.Debug("Created 'Offer' activity for anchor", logfields.WithAnchorURI(anchorLink.Anchor()), - logfields.WithActivityID(activityID)) + logger.Info("Posted 'Offer' activity to witnesses", logfields.WithAnchorURI(anchorLink.Anchor()), + logfields.WithActivityID(activityID), logfields.WithWitnessURIs(selectedWitnessesIRIs...)) if len(selectedWitnessesIRIs) == 1 { // The Offer was posted only to the 
public IRI. This means that it will be persisted diff --git a/pkg/document/updatehandler/decorator/decorator.go b/pkg/document/updatehandler/decorator/decorator.go index 719170221..c83bbdc41 100644 --- a/pkg/document/updatehandler/decorator/decorator.go +++ b/pkg/document/updatehandler/decorator/decorator.go @@ -77,7 +77,9 @@ type metricsProvider interface { } // Decorate will validate local state against anchor origin for update/recover/deactivate. -func (d *OperationDecorator) Decorate(op *operation.Operation) (*operation.Operation, error) { //nolint:cyclop +// +//nolint:funlen +func (d *OperationDecorator) Decorate(op *operation.Operation) (*operation.Operation, error) { startTime := time.Now() defer func() { @@ -132,7 +134,8 @@ func (d *OperationDecorator) Decorate(op *operation.Operation) (*operation.Opera anchorOriginResponse, err := d.resolveDocumentFromAnchorOrigin(ctx, canonicalID, localAnchorOrigin) if err != nil { - logger.Debugc(ctx, "Failed to resolve document from anchor origin", logfields.WithDID(canonicalID), log.WithError(err)) + logger.Warnc(ctx, "Failed to resolve document from anchor origin. 
The local document will be used.", + logfields.WithDID(canonicalID), log.WithError(err)) return op, nil } diff --git a/pkg/store/anchorstatus/store.go b/pkg/store/anchorstatus/store.go index 118a1e279..eac1f53c3 100644 --- a/pkg/store/anchorstatus/store.go +++ b/pkg/store/anchorstatus/store.go @@ -125,10 +125,12 @@ func (s *Store) AddStatus(anchorID string, status proof.AnchorIndexStatus) error } if status == proof.AnchorIndexStatusCompleted { + logger.Info("Anchor has completed processing", logfields.WithAnchorURIString(anchorID)) + delErr := s.deleteInProcessStatus(anchorID) if delErr != nil { // no need to stop processing for this - logger.Debug("Failed to delete in-process statuses after receiving complete status", + logger.Warn("Failed to delete in-process statuses after receiving complete status", log.WithError(delErr)) } } @@ -276,7 +278,7 @@ func (s *Store) GetStatus(anchorID string) (proof.AnchorIndexStatus, error) { } if !ok { - return "", fmt.Errorf("status not found for anchor [%s]", anchorID) + return "", fmt.Errorf("status not found for anchor [%s]: %w", anchorID, orberrors.ErrContentNotFound) } var status proof.AnchorIndexStatus @@ -352,12 +354,7 @@ func (s *Store) CheckInProcessAnchors() { e = s.processIndex(status.AnchorID) if e != nil { - if errors.Is(e, orberrors.ErrWitnessesNotFound) { - // This is not a critical error. Log it as info. 
- logger.Info("Failed to process anchor index", log.WithError(e)) - } else { - logger.Error("Failed to process anchor index", log.WithError(e)) - } + logger.Error("Failed to process anchor index", log.WithError(e)) } more, e = iterator.Next() @@ -381,20 +378,50 @@ func (s *Store) processIndex(encodedAnchorID string) error { status, err := s.GetStatus(anchorID) if err != nil { - return fmt.Errorf("failed to get status for anchorID[%s]: %w", anchorID, err) + if !errors.Is(err, orberrors.ErrContentNotFound) { + return fmt.Errorf("failed to get status for anchorID[%s]: %w", anchorID, err) + } + + logger.Info("Status not found for anchor. No further processing will be performed for this anchor.", + logfields.WithAnchorURIString(anchorID)) + + return nil } if status == proof.AnchorIndexStatusCompleted { - // already completed - nothing to do + logger.Info("Anchor status is already set to completed. No processing required.", + logfields.WithAnchorURIString(anchorID)) + + // Delete all in-process status records + err = s.deleteInProcessStatus(anchorID) + if err != nil { + logger.Warn("Error deleting in process anchor status", log.WithError(err), + logfields.WithAnchorURIString(anchorID)) + } + return nil } err = s.policyHandler.CheckPolicy(anchorID) if err != nil { - return fmt.Errorf("failed to re-evaluate policy for anchorID[%s]: %w", anchorID, err) + if !errors.Is(err, orberrors.ErrWitnessesNotFound) { + return fmt.Errorf("failed to re-evaluate policy for anchorID[%s]: %w", anchorID, err) + } + + logger.Info("No additional witnesses found for anchor. 
No further processing will be performed for this anchor.", + logfields.WithAnchorURIString(anchorID), log.WithError(err)) + + // Delete all in-process status records + err = s.deleteInProcessStatus(anchorID) + if err != nil { + logger.Warn("Error deleting in process anchor status", log.WithError(err), + logfields.WithAnchorURIString(anchorID)) + } + + return nil } - logger.Debug("Successfully re-evaluated policy for anchor", logfields.WithAnchorURIString(anchorID)) + logger.Info("Successfully re-evaluated policy for anchor", logfields.WithAnchorURIString(anchorID)) return nil } diff --git a/pkg/store/anchorstatus/store_test.go b/pkg/store/anchorstatus/store_test.go index 66559c996..2488e266d 100644 --- a/pkg/store/anchorstatus/store_test.go +++ b/pkg/store/anchorstatus/store_test.go @@ -166,6 +166,7 @@ func TestStore_Get(t *testing.T) { status, err := s.GetStatus(vcID) require.Error(t, err) require.Empty(t, status) + require.True(t, errors.Is(err, orberrors.ErrContentNotFound)) require.Contains(t, err.Error(), "not found") }) @@ -291,6 +292,12 @@ func TestStore_CheckInProcessAnchors(t *testing.T) { WithCheckStatusAfterTime(time.Second)) require.NoError(t, err) + err = s.AddStatus(vcID, proof.AnchorIndexStatusCompleted) + require.NoError(t, err) + + err = s.AddStatus(vcID, proof.AnchorIndexStatusInProcess) + require.NoError(t, err) + err = s.AddStatus(vcID, proof.AnchorIndexStatusInProcess) require.NoError(t, err) @@ -581,7 +588,7 @@ func TestStore_processIndex(t *testing.T) { require.Contains(t, err.Error(), "failed to re-evaluate policy for anchorID[vcID]: policy error") }) - t.Run("error - status not found", func(t *testing.T) { + t.Run("ignore - status not found", func(t *testing.T) { mongoDBConnString, stopMongo := mongodbtestutil.StartMongoDB(t) defer stopMongo() @@ -593,8 +600,30 @@ func TestStore_processIndex(t *testing.T) { require.NoError(t, err) err = s.processIndex(encoder.EncodeToString([]byte(vcID))) - require.Error(t, err) - require.Contains(t, 
err.Error(), "failed to get status for anchorID[vcID]") + require.NoError(t, err) + }) + + t.Run("ignore - status completed", func(t *testing.T) { + mongoDBConnString, stopMongo := mongodbtestutil.StartMongoDB(t) + defer stopMongo() + + mongoDBProvider, err := mongodb.NewProvider(mongoDBConnString) + require.NoError(t, err) + + s, err := New(mongoDBProvider, testutil.GetExpiryService(t), maxWitnessDelayTime) + require.NoError(t, err) + + err = s.AddStatus(vcID, proof.AnchorIndexStatusCompleted) + require.NoError(t, err) + + err = s.AddStatus(vcID, proof.AnchorIndexStatusInProcess) + require.NoError(t, err) + + err = s.AddStatus(vcID, proof.AnchorIndexStatusInProcess) + require.NoError(t, err) + + err = s.processIndex(encoder.EncodeToString([]byte(vcID))) + require.NoError(t, err) }) } diff --git a/pkg/vct/logmonitoring/monitor.go b/pkg/vct/logmonitoring/monitor.go index 3696c100b..079059ed0 100644 --- a/pkg/vct/logmonitoring/monitor.go +++ b/pkg/vct/logmonitoring/monitor.go @@ -480,7 +480,7 @@ func min(a, b uint64) uint64 { func (c *Client) verifySTHConsistency(logURL string, storedSTH, sth *command.GetSTHResponse, vctClient *vct.Client) error { if storedSTH.TreeSize > 0 { - logger.Debug("Getting STH consistency for stored[%d] and latest[%d]", + logger.Debug("Getting STH consistency for stored and latest", logfields.WithLogURLString(logURL), zap.Uint64("stored-size", storedSTH.TreeSize), logfields.WithSizeUint64(sth.TreeSize)) @@ -489,7 +489,7 @@ func (c *Client) verifySTHConsistency(logURL string, storedSTH, sth *command.Get return fmt.Errorf("get STH consistency: %w", err) } - logger.Debug("Found %d consistencies in STH consistency response", logfields.WithLogURLString(logURL), + logger.Debug("Found consistencies in STH consistency response", logfields.WithLogURLString(logURL), zap.Int("consistency-size", len(sthConsistency.Consistency))) err = c.logVerifier.VerifyConsistencyProof(int64(storedSTH.TreeSize), int64(sth.TreeSize),