diff --git a/graphsync.go b/graphsync.go index 83aa8202..6b4d40e9 100644 --- a/graphsync.go +++ b/graphsync.go @@ -236,6 +236,12 @@ type RequestUpdatedHookActions interface { UnpauseResponse() } +// ResponseCompletedHookActions are actions that can be taken in response completed hook to add a +// final extension on a response +type ResponseCompletedHookActions interface { + SendExtensionData(ExtensionData) +} + // OnIncomingRequestHook is a hook that runs each time a new request is received. // It receives the peer that sent the request and all data about the request. // It receives an interface for customizing the response to this request @@ -272,8 +278,8 @@ type OnOutgoingBlockHook func(p peer.ID, request RequestData, block BlockData, h // It receives an interface to taking further action on the response type OnRequestUpdatedHook func(p peer.ID, request RequestData, updateRequest RequestData, hookActions RequestUpdatedHookActions) -// OnResponseCompletedListener provides a way to listen for when responder has finished serving a response -type OnResponseCompletedListener func(p peer.ID, request RequestData, status ResponseStatusCode) +// OnResponseCompletedHook provides a way to listen for when responder has finished serving a response +type OnResponseCompletedHook func(p peer.ID, request RequestData, status ResponseStatusCode, hookActions ResponseCompletedHookActions) // OnRequestorCancelledListener provides a way to listen for responses the requestor canncels type OnRequestorCancelledListener func(p peer.ID, request RequestData) @@ -307,8 +313,8 @@ type GraphExchange interface { // RegisterRequestUpdatedHook adds a hook that runs every time an update to a request is received RegisterRequestUpdatedHook(hook OnRequestUpdatedHook) UnregisterHookFunc - // RegisterCompletedResponseListener adds a listener on the responder for completed responses - RegisterCompletedResponseListener(listener OnResponseCompletedListener) UnregisterHookFunc + // RegisterCompletedResponseHook adds a hook on the responder for completed responses + RegisterCompletedResponseHook(hook OnResponseCompletedHook) UnregisterHookFunc // RegisterRequestorCancelledListener adds a listener on the responder for // responses cancelled by the requestor diff --git a/impl/graphsync.go b/impl/graphsync.go index bff7070f..b684d8ef 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -41,7 +41,7 @@ type GraphSync struct { incomingRequestHooks *responderhooks.IncomingRequestHooks outgoingBlockHooks *responderhooks.OutgoingBlockHooks requestUpdatedHooks *responderhooks.RequestUpdatedHooks - completedResponseListeners *responderhooks.CompletedResponseListeners + completedResponseHooks *responderhooks.CompletedResponseHooks requestorCancelledListeners *responderhooks.RequestorCancelledListeners incomingResponseHooks *requestorhooks.IncomingResponseHooks outgoingRequestHooks *requestorhooks.OutgoingRequestHooks @@ -88,9 +88,9 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions) outgoingBlockHooks := responderhooks.NewBlockHooks() requestUpdatedHooks := responderhooks.NewUpdateHooks() - completedResponseListeners := responderhooks.NewCompletedResponseListeners() + completedResponseHooks := responderhooks.NewCompletedResponseHooks() requestorCancelledListeners := responderhooks.NewRequestorCancelledListeners() - responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners) + responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseHooks, requestorCancelledListeners) unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth)) graphSync := &GraphSync{ network: network, @@ -103,7 +103,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, incomingRequestHooks: incomingRequestHooks, outgoingBlockHooks: outgoingBlockHooks, requestUpdatedHooks: requestUpdatedHooks, - completedResponseListeners: completedResponseListeners, + completedResponseHooks: completedResponseHooks, requestorCancelledListeners: requestorCancelledListeners, incomingResponseHooks: incomingResponseHooks, outgoingRequestHooks: outgoingRequestHooks, @@ -170,9 +170,9 @@ func (gs *GraphSync) RegisterRequestUpdatedHook(hook graphsync.OnRequestUpdatedH return gs.requestUpdatedHooks.Register(hook) } -// RegisterCompletedResponseListener adds a listener on the responder for completed responses -func (gs *GraphSync) RegisterCompletedResponseListener(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc { - return gs.completedResponseListeners.Register(listener) +// RegisterCompletedResponseHook adds a hook on the responder for completed responses +func (gs *GraphSync) RegisterCompletedResponseHook(hook graphsync.OnResponseCompletedHook) graphsync.UnregisterHookFunc { + return gs.completedResponseHooks.Register(hook) } // RegisterIncomingBlockHook adds a hook that runs when a block is received and validated (put in block store) diff --git a/impl/graphsync_test.go b/impl/graphsync_test.go index 80d8a47e..bc0a3390 100644 --- a/impl/graphsync_test.go +++ b/impl/graphsync_test.go @@ -191,14 +191,14 @@ func TestGraphsyncRoundTrip(t *testing.T) { // initialize graphsync on second node to response to requests responder := td.GraphSyncHost2() - var receivedResponseData []byte + var receivedResponseData [][]byte var receivedRequestData []byte requestor.RegisterIncomingResponseHook( func(p peer.ID, responseData graphsync.ResponseData, hookActions graphsync.IncomingResponseHookActions) { data, has := responseData.Extension(td.extensionName) if has { - receivedResponseData = data + receivedResponseData = append(receivedResponseData, data) } }) @@ -213,7 +213,8 @@ func TestGraphsyncRoundTrip(t *testing.T) { }) finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1) - responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) { + responder.RegisterCompletedResponseHook(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode, hookActions graphsync.ResponseCompletedHookActions) { + hookActions.SendExtensionData(td.extensionFinal) select { case finalResponseStatusChan <- status: default: @@ -227,9 +228,11 @@ func TestGraphsyncRoundTrip(t *testing.T) { // verify extension roundtrip require.Equal(t, td.extensionData, receivedRequestData, "did not receive correct extension request data") - require.Equal(t, td.extensionResponseData, receivedResponseData, "did not receive correct extension response data") + require.Len(t, receivedResponseData, 2) + require.Equal(t, td.extensionResponseData, receivedResponseData[0], "did not receive correct extension response data") + require.Equal(t, td.extensionFinalData, receivedResponseData[1], "did not receive correct extension response data") - // verify listener + // verify completed hook var finalResponseStatus graphsync.ResponseStatusCode testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status") require.Equal(t, graphsync.RequestCompletedFull, finalResponseStatus) @@ -256,7 +259,7 @@ func TestGraphsyncRoundTripPartial(t *testing.T) { responder := td.GraphSyncHost2() finalResponseStatusChan := make(chan graphsync.ResponseStatusCode, 1) - responder.RegisterCompletedResponseListener(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) { + responder.RegisterCompletedResponseHook(func(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode, hookActions graphsync.ResponseCompletedHookActions) { select { case finalResponseStatusChan <- status: default: @@ -278,7 +281,7 @@ func TestGraphsyncRoundTripPartial(t *testing.T) { require.Equal(t, tree.MiddleMapBlock.RawData(), td.blockStore1[tree.MiddleMapNodeLnk]) require.Equal(t, tree.RootBlock.RawData(), td.blockStore1[tree.RootNodeLnk]) - // verify listener + // verify completed hook var finalResponseStatus graphsync.ResponseStatusCode testutil.AssertReceive(ctx, t, finalResponseStatusChan, &finalResponseStatus, "should receive status") require.Equal(t, graphsync.RequestCompletedPartial, finalResponseStatus) @@ -820,6 +823,8 @@ type gsTestData struct { extensionResponse graphsync.ExtensionData extensionUpdateData []byte extensionUpdate graphsync.ExtensionData + extensionFinalData []byte + extensionFinal graphsync.ExtensionData } func newGsTestData(ctx context.Context, t *testing.T) *gsTestData { @@ -857,7 +862,11 @@ func newGsTestData(ctx context.Context, t *testing.T) *gsTestData { Name: td.extensionName, Data: td.extensionUpdateData, } - + td.extensionFinalData = testutil.RandomBytes(100) + td.extensionFinal = graphsync.ExtensionData{ + Name: td.extensionName, + Data: td.extensionFinalData, + } return td } diff --git a/responsemanager/hooks/listeners.go b/responsemanager/hooks/completehooks.go similarity index 55% rename from responsemanager/hooks/listeners.go rename to responsemanager/hooks/completehooks.go index 72cf58b3..9c7c57b5 100644 --- a/responsemanager/hooks/listeners.go +++ b/responsemanager/hooks/completehooks.go @@ -6,8 +6,8 @@ import ( peer "github.com/libp2p/go-libp2p-core/peer" ) -// CompletedResponseListeners is a set of listeners for completed responses -type CompletedResponseListeners struct { +// CompletedResponseHooks is a set of hooks for completed responses +type CompletedResponseHooks struct { pubSub *pubsub.PubSub } @@ -15,28 +15,50 @@ type internalCompletedResponseEvent struct { p peer.ID request graphsync.RequestData status graphsync.ResponseStatusCode + cha *completeHookActions } func completedResponseDispatcher(event pubsub.Event, subscriberFn pubsub.SubscriberFn) error { ie := event.(internalCompletedResponseEvent) - listener := subscriberFn.(graphsync.OnResponseCompletedListener) - listener(ie.p, ie.request, ie.status) + hook := subscriberFn.(graphsync.OnResponseCompletedHook) + hook(ie.p, ie.request, ie.status, ie.cha) return nil } -// NewCompletedResponseListeners returns a new list of completed response listeners -func NewCompletedResponseListeners() *CompletedResponseListeners { - return &CompletedResponseListeners{pubSub: pubsub.New(completedResponseDispatcher)} +// NewCompletedResponseHooks returns a new list of completed response hooks +func NewCompletedResponseHooks() *CompletedResponseHooks { + return &CompletedResponseHooks{pubSub: pubsub.New(completedResponseDispatcher)} } -// Register registers an listener for completed responses -func (crl *CompletedResponseListeners) Register(listener graphsync.OnResponseCompletedListener) graphsync.UnregisterHookFunc { - return graphsync.UnregisterHookFunc(crl.pubSub.Subscribe(listener)) +// Register registers an hook for completed responses +func (crl *CompletedResponseHooks) Register(hook graphsync.OnResponseCompletedHook) graphsync.UnregisterHookFunc { + return graphsync.UnregisterHookFunc(crl.pubSub.Subscribe(hook)) +} + +// ProcessCompleteHooks runs notifies all completed hooks that a response has completed +func (crl *CompletedResponseHooks) ProcessCompleteHooks(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) CompleteResult { + ha := &completeHookActions{} + _ = crl.pubSub.Publish(internalCompletedResponseEvent{p, request, status, ha}) + return ha.result() +} + +// CompleteResult is the outcome of running complete response hooks +type CompleteResult struct { + Extensions []graphsync.ExtensionData +} + +type completeHookActions struct { + extensions []graphsync.ExtensionData +} + +func (ha *completeHookActions) result() CompleteResult { + return CompleteResult{ + Extensions: ha.extensions, + } } -// NotifyCompletedListeners runs notifies all completed listeners that a response has completed -func (crl *CompletedResponseListeners) NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) { - _ = crl.pubSub.Publish(internalCompletedResponseEvent{p, request, status}) +func (ha *completeHookActions) SendExtensionData(ext graphsync.ExtensionData) { + ha.extensions = append(ha.extensions, ext) } // RequestorCancelledListeners is a set of listeners for when requestors cancel diff --git a/responsemanager/hooks/hooks_test.go b/responsemanager/hooks/hooks_test.go index 273a4f45..586c9483 100644 --- a/responsemanager/hooks/hooks_test.go +++ b/responsemanager/hooks/hooks_test.go @@ -385,3 +385,60 @@ func TestUpdateHookProcessing(t *testing.T) { }) } } + +func TestCompleteHookProcessing(t *testing.T) { + extensionData := testutil.RandomBytes(100) + extensionName := graphsync.ExtensionName("AppleSauce/McGee") + extension := graphsync.ExtensionData{ + Name: extensionName, + Data: extensionData, + } + extensionResponseData := testutil.RandomBytes(100) + extensionResponse := graphsync.ExtensionData{ + Name: extensionName, + Data: extensionResponseData, + } + + root := testutil.GenerateCids(1)[0] + requestID := graphsync.RequestID(rand.Int31()) + ssb := builder.NewSelectorSpecBuilder(basicnode.Style.Any) + request := gsmsg.NewRequest(requestID, root, ssb.Matcher().Node(), graphsync.Priority(0), extension) + status := graphsync.RequestCompletedFull + p := testutil.GeneratePeers(1)[0] + testCases := map[string]struct { + configure func(t *testing.T, completedHooks *hooks.CompletedResponseHooks) + assert func(t *testing.T, result hooks.CompleteResult) + }{ + "no hooks": { + assert: func(t *testing.T, result hooks.CompleteResult) { + require.Empty(t, result.Extensions) + }, + }, + "send extension data": { + configure: func(t *testing.T, completedHooks *hooks.CompletedResponseHooks) { + completedHooks.Register(func(p peer.ID, requestData graphsync.RequestData, status graphsync.ResponseStatusCode, hookActions graphsync.ResponseCompletedHookActions) { + _, found := requestData.Extension(extensionName) + if found { + hookActions.SendExtensionData(extensionResponse) + } + }) + }, + assert: func(t *testing.T, result hooks.CompleteResult) { + require.Len(t, result.Extensions, 1) + require.Contains(t, result.Extensions, extensionResponse) + }, + }, + } + for testCase, data := range testCases { + t.Run(testCase, func(t *testing.T) { + completedHooks := hooks.NewCompletedResponseHooks() + if data.configure != nil { + data.configure(t, completedHooks) + } + result := completedHooks.ProcessCompleteHooks(p, request, status) + if data.assert != nil { + data.assert(t, result) + } + }) + } +} diff --git a/responsemanager/queryexecutor.go b/responsemanager/queryexecutor.go index 2b859d11..bf7fc3d0 100644 --- a/responsemanager/queryexecutor.go +++ b/responsemanager/queryexecutor.go @@ -26,7 +26,7 @@ type queryExecutor struct { requestHooks RequestHooks blockHooks BlockHooks updateHooks UpdateHooks - completedListeners CompletedListeners + completedHooks CompletedHooks cancelledListeners CancelledListeners peerManager PeerManager loader ipld.Loader @@ -71,13 +71,6 @@ func (qe *queryExecutor) processQueriesWorker() { continue } status, err := qe.executeTask(key, taskData) - _, isPaused := err.(hooks.ErrPaused) - isCancelled := err != nil && isContextErr(err) - if isCancelled { - qe.cancelledListeners.NotifyCancelledListeners(key.p, taskData.request) - } else if !isPaused { - qe.completedListeners.NotifyCompletedListeners(key.p, taskData.request, status) - } select { case qe.messages <- &finishTaskRequest{key, status, err}: case <-qe.ctx.Done(): @@ -207,23 +200,41 @@ func (qe *queryExecutor) executeQuery( }) return err }) - if err != nil { - _, isPaused := err.(hooks.ErrPaused) - if isPaused { - return graphsync.RequestPaused, err - } + + var status graphsync.ResponseStatusCode + _ = peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error { + status = qe.closeRequest(transaction, err) if isContextErr(err) { - peerResponseSender.FinishWithCancel(request.ID()) - return graphsync.RequestCancelled, err - } - if err == errCancelledByCommand { - peerResponseSender.FinishWithError(request.ID(), graphsync.RequestCancelled) - return graphsync.RequestCancelled, err + qe.cancelledListeners.NotifyCancelledListeners(p, request) + } else if status != graphsync.RequestPaused { + result := qe.completedHooks.ProcessCompleteHooks(p, request, status) + for _, extension := range result.Extensions { + transaction.SendExtensionData(extension) + } } - peerResponseSender.FinishWithError(request.ID(), graphsync.RequestFailedUnknown) - return graphsync.RequestFailedUnknown, err + return nil + }) + return status, err +} + +func (qe *queryExecutor) closeRequest(peerResponseSender peerresponsemanager.PeerResponseTransactionSender, err error) graphsync.ResponseStatusCode { + _, isPaused := err.(hooks.ErrPaused) + if isPaused { + return graphsync.RequestPaused + } + if isContextErr(err) { + peerResponseSender.FinishWithCancel() + return graphsync.RequestCancelled + } + if err == errCancelledByCommand { + peerResponseSender.FinishWithError(graphsync.RequestCancelled) + return graphsync.RequestCancelled + } + if err != nil { + peerResponseSender.FinishWithError(graphsync.RequestFailedUnknown) + return graphsync.RequestFailedUnknown } - return peerResponseSender.FinishRequest(request.ID()), nil + return peerResponseSender.FinishRequest() } func (qe *queryExecutor) checkForUpdates( @@ -268,5 +279,5 @@ func (qe *queryExecutor) checkForUpdates( func isContextErr(err error) bool { // TODO: Match with errors.Is when https://github.com/ipld/go-ipld-prime/issues/58 is resolved - return strings.Contains(err.Error(), ipldutil.ContextCancelError{}.Error()) + return err != nil && strings.Contains(err.Error(), ipldutil.ContextCancelError{}.Error()) } diff --git a/responsemanager/responsemanager.go b/responsemanager/responsemanager.go index 46fd1568..ccdbd3ac 100644 --- a/responsemanager/responsemanager.go +++ b/responsemanager/responsemanager.go @@ -81,9 +81,9 @@ type UpdateHooks interface { ProcessUpdateHooks(p peer.ID, request graphsync.RequestData, update graphsync.RequestData) hooks.UpdateResult } -// CompletedListeners is an interface for notifying listeners that responses are complete -type CompletedListeners interface { - NotifyCompletedListeners(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) +// CompletedHooks is an interface for processing complete hooks +type CompletedHooks interface { + ProcessCompleteHooks(p peer.ID, request graphsync.RequestData, status graphsync.ResponseStatusCode) hooks.CompleteResult } // CancelledListeners is an interface for notifying listeners that requestor cancelled @@ -109,7 +109,7 @@ type ResponseManager struct { queryQueue QueryQueue updateHooks UpdateHooks cancelledListeners CancelledListeners - completedListeners CompletedListeners + completedHooks CompletedHooks messages chan responseManagerMessage workSignal chan struct{} qe *queryExecutor @@ -125,7 +125,7 @@ func New(ctx context.Context, requestHooks RequestHooks, blockHooks BlockHooks, updateHooks UpdateHooks, - completedListeners CompletedListeners, + completedHooks CompletedHooks, cancelledListeners CancelledListeners, ) *ResponseManager { ctx, cancelFn := context.WithCancel(ctx) @@ -135,7 +135,7 @@ func New(ctx context.Context, requestHooks: requestHooks, blockHooks: blockHooks, updateHooks: updateHooks, - completedListeners: completedListeners, + completedHooks: completedHooks, cancelledListeners: cancelledListeners, peerManager: peerManager, loader: loader, @@ -151,7 +151,7 @@ func New(ctx context.Context, peerManager: peerManager, queryQueue: queryQueue, updateHooks: updateHooks, - completedListeners: completedListeners, + completedHooks: completedHooks, cancelledListeners: cancelledListeners, messages: messages, workSignal: workSignal, @@ -368,13 +368,19 @@ func (rm *ResponseManager) cancelRequest(p peer.ID, requestID graphsync.RequestI if response.isPaused { peerResponseSender := rm.peerManager.SenderForPeer(key.p) - if selfCancel { - rm.completedListeners.NotifyCompletedListeners(p, response.request, graphsync.RequestCancelled) - peerResponseSender.FinishWithError(requestID, graphsync.RequestCancelled) - } else { - rm.cancelledListeners.NotifyCancelledListeners(p, response.request) - peerResponseSender.FinishWithCancel(requestID) - } + _ = peerResponseSender.Transaction(requestID, func(transaction peerresponsemanager.PeerResponseTransactionSender) error { + if selfCancel { + result := rm.completedHooks.ProcessCompleteHooks(p, response.request, graphsync.RequestCancelled) + for _, extension := range result.Extensions { + transaction.SendExtensionData(extension) + } + transaction.FinishWithError(graphsync.RequestCancelled) + } else { + rm.cancelledListeners.NotifyCancelledListeners(p, response.request) + transaction.FinishWithCancel() + } + return nil + }) delete(rm.inProgressResponses, key) response.cancelFn() return nil diff --git a/responsemanager/responsemanager_test.go b/responsemanager/responsemanager_test.go index 393d3dc6..09c55e96 100644 --- a/responsemanager/responsemanager_test.go +++ b/responsemanager/responsemanager_test.go @@ -210,7 +210,7 @@ func TestIncomingQuery(t *testing.T) { defer td.cancel() blks := td.blockChain.AllBlocks() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) td.requestHooks.Register(selectorvalidator.SelectorValidator(100)) responseManager.Startup() @@ -231,7 +231,7 @@ func TestCancellationQueryInProgress(t *testing.T) { td := newTestData(t) defer td.cancel() blks := td.blockChain.AllBlocks() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) td.requestHooks.Register(selectorvalidator.SelectorValidator(100)) cancelledListenerCalled := make(chan struct{}, 1) td.cancelledListeners.Register(func(p peer.ID, request graphsync.RequestData) { @@ -284,7 +284,7 @@ func TestCancellationViaCommand(t *testing.T) { td := newTestData(t) defer td.cancel() blks := td.blockChain.AllBlocks() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) td.requestHooks.Register(selectorvalidator.SelectorValidator(100)) responseManager.Startup() responseManager.ProcessRequests(td.ctx, td.p, td.requests) @@ -328,7 +328,7 @@ func TestEarlyCancellation(t *testing.T) { td := newTestData(t) defer td.cancel() td.queryQueue.popWait.Add(1) - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) responseManager.Startup() responseManager.ProcessRequests(td.ctx, td.p, td.requests) @@ -352,7 +352,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("on its own, should fail validation", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) responseManager.Startup() responseManager.ProcessRequests(td.ctx, td.p, td.requests) var lastRequest completedRequest @@ -363,7 +363,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("if non validating hook succeeds, does not pass validation", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.SendExtensionData(td.extensionResponse) @@ -380,7 +380,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("if validating hook succeeds, should pass validation", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -398,7 +398,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("if any hook fails, should fail", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -419,7 +419,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("hooks can be unregistered", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) responseManager.Startup() unregister := td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -449,7 +449,7 @@ func TestValidationAndExtensions(t *testing.T) { defer td.cancel() obs := make(map[ipld.Link][]byte) oloader, _ := testutil.NewTestStore(obs) - responseManager := New(td.ctx, oloader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, oloader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) responseManager.Startup() // add validating hook -- so the request SHOULD succeed td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { @@ -483,7 +483,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("hooks can alter the node builder chooser", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) responseManager.Startup() customChooserCallCount := 0 @@ -525,7 +525,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("do-not-send-cids extension", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -558,7 +558,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("test pause/resume", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -580,7 +580,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("can send extension data", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -602,7 +602,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("can send errors", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -619,7 +619,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("can pause/unpause", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -654,7 +654,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("can pause/unpause externally", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -693,7 +693,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("can pause/unpause", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -732,7 +732,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("when unpaused", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -769,7 +769,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("when paused", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -814,7 +814,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("when unpaused", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -848,7 +848,7 @@ func TestValidationAndExtensions(t *testing.T) { t.Run("when paused", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() @@ -891,22 +891,26 @@ func TestValidationAndExtensions(t *testing.T) { }) }) - t.Run("final response status listeners", func(t *testing.T) { + t.Run("final response status hook", func(t *testing.T) { td := newTestData(t) defer td.cancel() - responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedListeners, td.cancelledListeners) + responseManager := New(td.ctx, td.loader, td.peerManager, td.queryQueue, td.requestHooks, td.blockHooks, td.updateHooks, td.completedHooks, td.cancelledListeners) responseManager.Startup() td.requestHooks.Register(func(p peer.ID, requestData graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) { hookActions.ValidateRequest() }) statusChan := make(chan graphsync.ResponseStatusCode, 1) - td.completedListeners.Register(func(p peer.ID, requestData graphsync.RequestData, status graphsync.ResponseStatusCode) { + td.completedHooks.Register(func(p peer.ID, requestData graphsync.RequestData, status graphsync.ResponseStatusCode, hookActions graphsync.ResponseCompletedHookActions) { + hookActions.SendExtensionData(td.extensionResponse) select { case statusChan <- status: default: } }) responseManager.ProcessRequests(td.ctx, td.p, td.requests) + var receivedExtension sentExtension + testutil.AssertReceive(td.ctx, t, td.sentExtensions, &receivedExtension, "should send extension response") + require.Equal(t, td.extensionResponse, receivedExtension.extension, "incorrect extension response sent") var lastRequest completedRequest testutil.AssertReceive(td.ctx, t, td.completedRequestChan, &lastRequest, "should complete request") require.True(t, gsmsg.IsTerminalSuccessCode(lastRequest.result), "request should succeed") @@ -947,7 +951,7 @@ type testData struct { requestHooks *hooks.IncomingRequestHooks blockHooks *hooks.OutgoingBlockHooks updateHooks *hooks.RequestUpdatedHooks - completedListeners *hooks.CompletedResponseListeners + completedHooks *hooks.CompletedResponseHooks cancelledListeners *hooks.RequestorCancelledListeners } @@ -1006,7 +1010,7 @@ func newTestData(t *testing.T) testData { td.requestHooks = hooks.NewRequestHooks(td.peristenceOptions) td.blockHooks = hooks.NewBlockHooks() td.updateHooks = hooks.NewUpdateHooks() - td.completedListeners = hooks.NewCompletedResponseListeners() + td.completedHooks = hooks.NewCompletedResponseHooks() td.cancelledListeners = hooks.NewRequestorCancelledListeners() return td }