Skip to content

Commit

Permalink
fix: Don't process anchor if it has already been processed
Browse files Browse the repository at this point in the history
Just before posting an anchor to the message queue, check if the anchor has already been processed. This greatly reduces the number of duplicate anchors sent to the queue and will improve overall performance.

closes trustbloc#1565

Signed-off-by: Bob Stasyszyn <Bob.Stasyszyn@securekey.com>
  • Loading branch information
bstasyszyn committed May 29, 2023
1 parent 23f5800 commit e555fc2
Show file tree
Hide file tree
Showing 8 changed files with 322 additions and 48 deletions.
2 changes: 1 addition & 1 deletion pkg/activitypub/httpsig/signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewSigner(cfg SignerConfig, cr crypto, km keyManager, keyID string) *Signer
func (s *Signer) SignRequest(pubKeyID string, req *http.Request) error {
req.Header.Add(dateHeader, date())

logger.Debug("Signing request for %s. Public key ID [%s]. Headers: %s", logfields.WithRequestURLString(req.RequestURI),
logger.Debug("Signing request", logfields.WithRequestURLString(req.RequestURI),
logfields.WithKeyID(pubKeyID), logfields.WithRequestHeaders(req.Header))

if err := s.signer().Sign(pubKeyID, req); err != nil {
Expand Down
98 changes: 71 additions & 27 deletions pkg/anchor/handler/credential/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,27 @@ func New(anchorPublisher anchorPublisher, casResolver casResolver,
func (h *AnchorEventHandler) HandleAnchorEvent(ctx context.Context, actor, anchorRef, source *url.URL,
anchorEvent *vocab.AnchorEventType,
) error {
logger.Debug("Received request for anchor", logfields.WithActorIRI(actor), logfields.WithAnchorEventURI(anchorRef))
logger.Debugc(ctx, "Received request for anchor", logfields.WithActorIRI(actor), logfields.WithAnchorURI(anchorRef))

ok, err := h.isAnchorProcessed(anchorRef)
if err != nil {
return fmt.Errorf("is anchor processed [%s]: %w", anchorRef, err)
}

if ok {
logger.Infoc(ctx, "Anchor was already processed or processing is pending", logfields.WithAnchorURI(anchorRef))

return nil
}

var anchorLinksetBytes []byte

if anchorEvent != nil {
var err error
var e error

anchorLinksetBytes, err = canonicalizer.MarshalCanonical(anchorEvent.Object().Document())
if err != nil {
return fmt.Errorf("marshal anchor linkset: %w", err)
anchorLinksetBytes, e = canonicalizer.MarshalCanonical(anchorEvent.Object().Document())
if e != nil {
return fmt.Errorf("marshal anchor linkset: %w", e)
}
}

Expand Down Expand Up @@ -130,7 +141,7 @@ func (h *AnchorEventHandler) HandleAnchorEvent(ctx context.Context, actor, ancho
attributedTo = actor.String()
}

logger.Info("Processing anchor", logfields.WithAnchorEventURI(anchorRef))
logger.Infoc(ctx, "Processing anchor", logfields.WithAnchorURI(anchorRef))

var alternateSources []string

Expand Down Expand Up @@ -185,7 +196,37 @@ func (h *AnchorEventHandler) processAnchorEvent(ctx context.Context, anchorInfo
return fmt.Errorf("validate credential subject for anchor [%s]: %w", anchorLink.Anchor(), err)
}

return h.anchorPublisher.PublishAnchor(ctx, anchorInfo.AnchorInfo)
hl, err := url.Parse(anchorInfo.Hashlink)
if err != nil {
return fmt.Errorf("parse anchor hashlink [%s]: %w", anchorInfo.Hashlink, err)
}

// Check again if the anchor was processed. This further limits race conditions, especially
// when many anchors with the same parent are being processed concurrently.
processed, err := h.isAnchorProcessed(hl)
if err != nil {
return fmt.Errorf("is anchor processed [%s]: %w", hl, err)
}

if processed {
logger.Infoc(ctx, "Anchor was already processed or processing is pending.", logfields.WithAnchorURI(hl))

return nil
}

logger.Debugc(ctx, "Storing pending anchor link", logfields.WithAnchorURI(hl))

err = h.anchorLinkStore.PutPendingLinks([]*url.URL{hl})
if err != nil {
return fmt.Errorf("store pending anchor link: %w", err)
}

err = h.anchorPublisher.PublishAnchor(ctx, anchorInfo.AnchorInfo)
if err != nil {
return fmt.Errorf("publish anchor %s: %w", hl, err)
}

return nil
}

// ensureParentAnchorsAreProcessed checks all ancestors (parents, grandparents, etc.) of the given anchor event
Expand All @@ -196,21 +237,23 @@ func (h *AnchorEventHandler) ensureParentAnchorsAreProcessed(ctx context.Context
return fmt.Errorf("get unprocessed parent anchors for [%s]: %w", anchorRef, err)
}

if len(unprocessedParents) > 0 {
logger.Infoc(ctx, "Processing parents of anchor", logfields.WithTotal(len(unprocessedParents)),
logfields.WithAnchorURI(anchorRef), logfields.WithParents(unprocessedParents.HashLinks()))
if len(unprocessedParents) == 0 {
return nil
}

logger.Infoc(ctx, "Processing parents of anchor", logfields.WithTotal(len(unprocessedParents)),
logfields.WithAnchorURI(anchorRef), logfields.WithParents(unprocessedParents.HashLinks()))

spanCtx, span := h.tracer.Start(ctx, "process parent anchors",
trace.WithAttributes(tracing.AnchorEventURIAttribute(anchorRef.String())))
defer span.End()
spanCtx, span := h.tracer.Start(ctx, "process parent anchors",
trace.WithAttributes(tracing.AnchorEventURIAttribute(anchorRef.String())))
defer span.End()

for _, parentAnchorInfo := range unprocessedParents {
logger.Infoc(spanCtx, "Processing parent", logfields.WithAnchorURI(anchorRef), logfields.WithParent(parentAnchorInfo.Hashlink))
for _, parentAnchorInfo := range unprocessedParents {
logger.Debugc(spanCtx, "Processing parent", logfields.WithAnchorURI(anchorRef), logfields.WithParent(parentAnchorInfo.Hashlink))

err = h.processAnchorEvent(spanCtx, parentAnchorInfo)
if err != nil {
return fmt.Errorf("process anchor [%s]: %w", parentAnchorInfo.Hashlink, err)
}
err = h.processAnchorEvent(spanCtx, parentAnchorInfo)
if err != nil {
return fmt.Errorf("process anchor [%s]: %w", parentAnchorInfo.Hashlink, err)
}
}

Expand Down Expand Up @@ -263,12 +306,7 @@ func (h *AnchorEventHandler) getUnprocessedParentAnchors(hl string, anchorLink *
}

logger.Debug("Adding parent of anchor event to the unprocessed list",
logfields.WithAnchorEventURIString(hl), logfields.WithParentURI(parentHL))

err = h.anchorLinkStore.PutPendingLinks([]*url.URL{parentHL})
if err != nil {
return nil, fmt.Errorf("store pending parent anchor %s: %w", parentHL, err)
}
logfields.WithAnchorURIString(hl), logfields.WithParentURI(parentHL))

// Add the parent to the head of the list since it needs to be processed first.
unprocessed = append([]*anchorInfo{info}, unprocessed...)
Expand All @@ -294,7 +332,7 @@ func (h *AnchorEventHandler) getUnprocessedParentAnchor(hl string, parentHL *url
}

if isProcessed {
logger.Debug("Parent of anchor was already processed",
logger.Debug("Parent of anchor was already processed or processing is pending",
logfields.WithAnchorURIString(hl), logfields.WithParentURI(parentHL))

return true, nil, nil
Expand Down Expand Up @@ -361,7 +399,13 @@ func (h *AnchorEventHandler) isAnchorProcessed(hl *url.URL) (bool, error) {
return false, fmt.Errorf("get anchor event: %w", err)
}

return len(links) > 0, nil
for _, link := range links {
if link.String() == hl.String() {
return true, nil
}
}

return false, nil
}

type anchorInfo struct {
Expand Down
106 changes: 98 additions & 8 deletions pkg/anchor/handler/credential/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,25 +292,94 @@ func TestGetUnprocessedParentAnchorEvents(t *testing.T) {
}

func TestAnchorEventHandler_processAnchorEvent(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}
t.Run("success", func(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}

handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)
handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)

t.Run("success", func(t *testing.T) {
anchorLinkset := &linkset.Linkset{}
require.NoError(t, json.Unmarshal([]byte(sampleGrandparentAnchorLinkset), anchorLinkset))

err := handler.processAnchorEvent(context.Background(), &anchorInfo{
AnchorInfo: &info.AnchorInfo{},
ls, err := anchorLinkset.Link().Original().Linkset()
require.NoError(t, err)

err = handler.processAnchorEvent(context.Background(), &anchorInfo{
AnchorInfo: &info.AnchorInfo{
Hashlink: ls.Link().Anchor().String(),
},
anchorLink: anchorLinkset.Link(),
})
require.NoError(t, err)
})

t.Run("already processed -> success", func(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}

handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)

anchorLinkset := &linkset.Linkset{}
require.NoError(t, json.Unmarshal([]byte(sampleGrandparentAnchorLinkset), anchorLinkset))

ls, err := anchorLinkset.Link().Original().Linkset()
require.NoError(t, err)

anchorRef := ls.Link().Anchor()

anchorLinkStore.GetProcessedAndPendingLinksReturns([]*url.URL{anchorRef}, nil)

err = handler.processAnchorEvent(context.Background(), &anchorInfo{
AnchorInfo: &info.AnchorInfo{
Hashlink: anchorRef.String(),
},
anchorLink: anchorLinkset.Link(),
})
require.NoError(t, err)
})

t.Run("is processed -> error", func(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}

handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)

anchorLinkset := &linkset.Linkset{}
require.NoError(t, json.Unmarshal([]byte(sampleGrandparentAnchorLinkset), anchorLinkset))

ls, err := anchorLinkset.Link().Original().Linkset()
require.NoError(t, err)

anchorRef := ls.Link().Anchor()

errExpected := errors.New("injected query error")

anchorLinkStore.GetProcessedAndPendingLinksReturns(nil, errExpected)

err = handler.processAnchorEvent(context.Background(), &anchorInfo{
AnchorInfo: &info.AnchorInfo{
Hashlink: anchorRef.String(),
},
anchorLink: anchorLinkset.Link(),
})
require.Error(t, err)
require.Contains(t, err.Error(), errExpected.Error())
})

t.Run("no replies -> error", func(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}

handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)

anchorLinkset := &linkset.Linkset{}
require.NoError(t, json.Unmarshal([]byte(anchorLinksetNoReplies), anchorLinkset))

Expand All @@ -323,6 +392,13 @@ func TestAnchorEventHandler_processAnchorEvent(t *testing.T) {
})

t.Run("invalid original content -> error", func(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}

handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)

anchorLinkset := &linkset.Linkset{}
require.NoError(t, json.Unmarshal([]byte(anchorLinksetInvalidContent), anchorLinkset))

Expand All @@ -335,6 +411,13 @@ func TestAnchorEventHandler_processAnchorEvent(t *testing.T) {
})

t.Run("unsupported profile -> error", func(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}

handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)

anchorLinkset := &linkset.Linkset{}
require.NoError(t, json.Unmarshal([]byte(anchorLinksetUnsupportedProfile), anchorLinkset))

Expand All @@ -347,6 +430,13 @@ func TestAnchorEventHandler_processAnchorEvent(t *testing.T) {
})

t.Run("invalid anchor credential -> error", func(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}

handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)

anchorLinkset := &linkset.Linkset{}
require.NoError(t, json.Unmarshal([]byte(anchorLinksetInvalidVC), anchorLinkset))

Expand Down
Loading

0 comments on commit e555fc2

Please sign in to comment.