Skip to content

Commit

Permalink
All changes to date including pause requests & start paused, along wi…
Browse files Browse the repository at this point in the history
…th new adds for cleanups and checking of execution (#75)

* WIP

* feat(graphsync): pause/unpause requests

Allow graphsync requests to be paused and unpaused via request cancelling and DoNotSendCIDs
extension

* fix(requestmanager): refactor executor

remove extraneous allocation of closure functions

* feat(graphsync): support external request pauses

allow pausing requests imperatively via PauseRequest function

* fix(lint): fix lint errors

* feat(responsemanager): start requests paused

add the ability for a hook to pause a response right when it's first received

* feat(responsemanager): improve cancellation UX

provide a mechanism for responders to learn a requestor cancelled a request and for requestors to
learn a request was cancelled

* feat(requestmanager): process request cancelled status

process the responder returning a request cancelled error code and also support sentinel errors

* feat(executor): refactor to remove loader

remove loader, also only fire restart request as needed

* fix(asyncloader): load requests synchronously when possible

* fix(responsemanager): fix external pause

Fix external pauses missing a block

* fix(responsemanager): do not delay complete listener

Run complete listener in same thread as response processing, making it less susceptable to
interruption via cancel

* fix(responsemanager): fix context check

fix checking for context cancellation errors based off of the way ipld-prime does not wrap errors

* fix(responsemanager): more precise cancel

make cancels only get recorded if actual blocks are not sent -- otherwise the request is considered
complete -- and the complete hook always runs

* fix(requestmanager): handle non processed pauses

Handler the case where a pause is requested but never actually takes place

* refactor(responsemanager): handle cancels, correctly this time

Properly handle cancels for both paused and unpaused states

* fix(errors): remove regex cause it appears to be very slow

* fix(traverser): fix race condition for shutdown

make sure that the traverser is finished in the request executor

* fix(deps): mod tidy

* fix(executor): add back network error
  • Loading branch information
hannahhoward committed Jul 8, 2020
1 parent 0e23085 commit caa872f
Show file tree
Hide file tree
Showing 26 changed files with 1,922 additions and 661 deletions.
53 changes: 53 additions & 0 deletions graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,45 @@ const (
RequestFailedLegal = ResponseStatusCode(33)
// RequestFailedContentNotFound means the respondent does not have the content.
RequestFailedContentNotFound = ResponseStatusCode(34)
// RequestCancelled means the responder was processing the request but decided to top, for whatever reason
RequestCancelled = ResponseStatusCode(35)
)

// RequestFailedBusyErr is an error message received on the error channel when the peer is busy
type RequestFailedBusyErr struct{}

func (e RequestFailedBusyErr) Error() string {
return "Request Failed - Peer Is Busy"
}

// RequestFailedContentNotFoundErr is an error message received on the error channel when the content is not found
type RequestFailedContentNotFoundErr struct{}

func (e RequestFailedContentNotFoundErr) Error() string {
return "Request Failed - Content Not Found"
}

// RequestFailedLegalErr is an error message received on the error channel when the request fails for legal reasons
type RequestFailedLegalErr struct{}

func (e RequestFailedLegalErr) Error() string {
return "Request Failed - For Legal Reasons"
}

// RequestFailedUnknownErr is an error message received on the error channel when the request fails for unknown reasons
type RequestFailedUnknownErr struct{}

func (e RequestFailedUnknownErr) Error() string {
return "Request Failed - Unknown Reason"
}

// RequestCancelledErr is an error message received on the error channel that indicates the responder cancelled a request
type RequestCancelledErr struct{}

func (e RequestCancelledErr) Error() string {
return "Request Failed - Responder Cancelled"
}

var (
// ErrExtensionAlreadyRegistered means a user extension can be registered only once
ErrExtensionAlreadyRegistered = errors.New("extension already registered")
Expand Down Expand Up @@ -158,6 +195,7 @@ type IncomingRequestHookActions interface {
UseLinkTargetNodeStyleChooser(traversal.LinkTargetNodeStyleChooser)
TerminateWithError(error)
ValidateRequest()
PauseResponse()
}

// OutgoingBlockHookActions are actions that an outgoing block hook can take to
Expand Down Expand Up @@ -187,6 +225,7 @@ type IncomingResponseHookActions interface {
type IncomingBlockHookActions interface {
TerminateWithError(error)
UpdateRequestWithExtensions(...ExtensionData)
PauseRequest()
}

// RequestUpdatedHookActions are actions that can be taken in a request updated hook to
Expand Down Expand Up @@ -236,6 +275,9 @@ type OnRequestUpdatedHook func(p peer.ID, request RequestData, updateRequest Req
// OnResponseCompletedListener provides a way to listen for when responder has finished serving a response
type OnResponseCompletedListener func(p peer.ID, request RequestData, status ResponseStatusCode)

// OnRequestorCancelledListener provides a way to listen for responses the requestor canncels
type OnRequestorCancelledListener func(p peer.ID, request RequestData)

// UnregisterHookFunc is a function call to unregister a hook that was previously registered
type UnregisterHookFunc func()

Expand Down Expand Up @@ -268,6 +310,17 @@ type GraphExchange interface {
// RegisterCompletedResponseListener adds a listener on the responder for completed responses
RegisterCompletedResponseListener(listener OnResponseCompletedListener) UnregisterHookFunc

// RegisterRequestorCancelledListener adds a listener on the responder for
// responses cancelled by the requestor
RegisterRequestorCancelledListener(listener OnRequestorCancelledListener) UnregisterHookFunc

// UnpauseRequest unpauses a request that was paused in a block hook based request ID
// Can also send extensions with unpause
UnpauseRequest(RequestID, ...ExtensionData) error

// PauseRequest pauses an in progress request (may take 1 or more blocks to process)
PauseRequest(RequestID) error

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
// Can also send extensions with unpause
UnpauseResponse(peer.ID, RequestID, ...ExtensionData) error
Expand Down
102 changes: 61 additions & 41 deletions impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,27 @@ const maxRecursionDepth = 100
// GraphSync is an instance of a GraphSync exchange that implements
// the graphsync protocol.
type GraphSync struct {
network gsnet.GraphSyncNetwork
loader ipld.Loader
storer ipld.Storer
requestManager *requestmanager.RequestManager
responseManager *responsemanager.ResponseManager
asyncLoader *asyncloader.AsyncLoader
peerResponseManager *peerresponsemanager.PeerResponseManager
peerTaskQueue *peertaskqueue.PeerTaskQueue
peerManager *peermanager.PeerMessageManager
incomingRequestHooks *responderhooks.IncomingRequestHooks
outgoingBlockHooks *responderhooks.OutgoingBlockHooks
requestUpdatedHooks *responderhooks.RequestUpdatedHooks
completedResponseListeners *responderhooks.CompletedResponseListeners
incomingResponseHooks *requestorhooks.IncomingResponseHooks
outgoingRequestHooks *requestorhooks.OutgoingRequestHooks
incomingBlockHooks *requestorhooks.IncomingBlockHooks
persistenceOptions *persistenceoptions.PersistenceOptions
ctx context.Context
cancel context.CancelFunc
unregisterDefaultValidator graphsync.UnregisterHookFunc
network gsnet.GraphSyncNetwork
loader ipld.Loader
storer ipld.Storer
requestManager *requestmanager.RequestManager
responseManager *responsemanager.ResponseManager
asyncLoader *asyncloader.AsyncLoader
peerResponseManager *peerresponsemanager.PeerResponseManager
peerTaskQueue *peertaskqueue.PeerTaskQueue
peerManager *peermanager.PeerMessageManager
incomingRequestHooks *responderhooks.IncomingRequestHooks
outgoingBlockHooks *responderhooks.OutgoingBlockHooks
requestUpdatedHooks *responderhooks.RequestUpdatedHooks
completedResponseListeners *responderhooks.CompletedResponseListeners
requestorCancelledListeners *responderhooks.RequestorCancelledListeners
incomingResponseHooks *requestorhooks.IncomingResponseHooks
outgoingRequestHooks *requestorhooks.OutgoingRequestHooks
incomingBlockHooks *requestorhooks.IncomingBlockHooks
persistenceOptions *persistenceoptions.PersistenceOptions
ctx context.Context
cancel context.CancelFunc
unregisterDefaultValidator graphsync.UnregisterHookFunc
}

// Option defines the functional option type that can be used to configure
Expand Down Expand Up @@ -88,29 +89,31 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
outgoingBlockHooks := responderhooks.NewBlockHooks()
requestUpdatedHooks := responderhooks.NewUpdateHooks()
completedResponseListeners := responderhooks.NewCompletedResponseListeners()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners)
requestorCancelledListeners := responderhooks.NewRequestorCancelledListeners()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners)
unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
graphSync := &GraphSync{
network: network,
loader: loader,
storer: storer,
asyncLoader: asyncLoader,
requestManager: requestManager,
peerManager: peerManager,
persistenceOptions: persistenceOptions,
incomingRequestHooks: incomingRequestHooks,
outgoingBlockHooks: outgoingBlockHooks,
requestUpdatedHooks: requestUpdatedHooks,
completedResponseListeners: completedResponseListeners,
incomingResponseHooks: incomingResponseHooks,
outgoingRequestHooks: outgoingRequestHooks,
incomingBlockHooks: incomingBlockHooks,
peerTaskQueue: peerTaskQueue,
peerResponseManager: peerResponseManager,
responseManager: responseManager,
ctx: ctx,
cancel: cancel,
unregisterDefaultValidator: unregisterDefaultValidator,
network: network,
loader: loader,
storer: storer,
asyncLoader: asyncLoader,
requestManager: requestManager,
peerManager: peerManager,
persistenceOptions: persistenceOptions,
incomingRequestHooks: incomingRequestHooks,
outgoingBlockHooks: outgoingBlockHooks,
requestUpdatedHooks: requestUpdatedHooks,
completedResponseListeners: completedResponseListeners,
requestorCancelledListeners: requestorCancelledListeners,
incomingResponseHooks: incomingResponseHooks,
outgoingRequestHooks: outgoingRequestHooks,
incomingBlockHooks: incomingBlockHooks,
peerTaskQueue: peerTaskQueue,
peerResponseManager: peerResponseManager,
responseManager: responseManager,
ctx: ctx,
cancel: cancel,
unregisterDefaultValidator: unregisterDefaultValidator,
}

for _, option := range options {
Expand Down Expand Up @@ -177,6 +180,23 @@ func (gs *GraphSync) RegisterIncomingBlockHook(hook graphsync.OnIncomingBlockHoo
return gs.incomingBlockHooks.Register(hook)
}

// RegisterRequestorCancelledListener adds a listener on the responder for
// responses cancelled by the requestor
func (gs *GraphSync) RegisterRequestorCancelledListener(listener graphsync.OnRequestorCancelledListener) graphsync.UnregisterHookFunc {
return gs.requestorCancelledListeners.Register(listener)
}

// UnpauseRequest unpauses a request that was paused in a block hook based request ID
// Can also send extensions with unpause
func (gs *GraphSync) UnpauseRequest(requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
return gs.requestManager.UnpauseRequest(requestID, extensions...)
}

// PauseRequest pauses an in progress request (may take 1 or more blocks to process)
func (gs *GraphSync) PauseRequest(requestID graphsync.RequestID) error {
return gs.requestManager.PauseRequest(requestID)
}

// UnpauseResponse unpauses a response that was paused in a block hook based on peer ID and request ID
func (gs *GraphSync) UnpauseResponse(p peer.ID, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
return gs.responseManager.UnpauseResponse(p, requestID, extensions...)
Expand Down
46 changes: 46 additions & 0 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,52 @@ func TestPauseResume(t *testing.T) {
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")

}
func TestPauseResumeRequest(t *testing.T) {
// create network
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
td := newGsTestData(ctx, t)

// initialize graphsync on first node to make requests
requestor := td.GraphSyncHost1()

// setup receiving peer to just record message coming in
blockChainLength := 100
blockSize := 100
blockChain := testutil.SetupBlockChain(ctx, t, td.loader2, td.storer2, uint64(blockSize), blockChainLength)

// initialize graphsync on second node to response to requests
_ = td.GraphSyncHost2()

stopPoint := 50
blocksReceived := 0
requestIDChan := make(chan graphsync.RequestID, 1)
requestor.RegisterIncomingBlockHook(func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
select {
case requestIDChan <- responseData.RequestID():
default:
}
blocksReceived++
if blocksReceived == stopPoint {
hookActions.PauseRequest()
}
})

progressChan, errChan := requestor.Request(ctx, td.host2.ID(), blockChain.TipLink, blockChain.Selector(), td.extension)

blockChain.VerifyResponseRange(ctx, progressChan, 0, stopPoint-1)
timer := time.NewTimer(100 * time.Millisecond)
testutil.AssertDoesReceiveFirst(t, timer.C, "should pause request", progressChan)

requestID := <-requestIDChan
err := requestor.UnpauseRequest(requestID, td.extensionUpdate)
require.NoError(t, err)

blockChain.VerifyRemainder(ctx, progressChan, stopPoint-1)
testutil.VerifyEmptyErrors(ctx, t, errChan)
require.Len(t, td.blockStore1, blockChainLength, "did not store all blocks")
}

func TestPauseResumeViaUpdate(t *testing.T) {
// create network
Expand Down
38 changes: 32 additions & 6 deletions ipldutil/traverser.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ import (

var defaultVisitor traversal.AdvVisitFn = func(traversal.Progress, ipld.Node, traversal.VisitReason) error { return nil }

// ContextCancelError is a sentinel that indicates the passed in context
// was cancelled
type ContextCancelError struct{}

func (cp ContextCancelError) Error() string {
return "Context cancelled"
}

// TraversalBuilder defines parameters for an iterative traversal
type TraversalBuilder struct {
Root ipld.Link
Expand All @@ -31,6 +39,8 @@ type Traverser interface {
Advance(reader io.Reader) error
// Error errors the traversal by returning the given error as the result of the next IPLD load
Error(err error)
// Shutdown cancels the traversal
Shutdown(ctx context.Context)
}

type state struct {
Expand All @@ -47,16 +57,20 @@ type nextResponse struct {

// Start initiates the traversal (run in a go routine because the regular
// selector traversal expects a call back)
func (tb TraversalBuilder) Start(ctx context.Context) Traverser {
func (tb TraversalBuilder) Start(parentCtx context.Context) Traverser {
ctx, cancel := context.WithCancel(parentCtx)
t := &traverser{
parentCtx: parentCtx,
ctx: ctx,
cancel: cancel,
root: tb.Root,
selector: tb.Selector,
visitor: defaultVisitor,
chooser: defaultChooser,
awaitRequest: make(chan struct{}, 1),
stateChan: make(chan state, 1),
responses: make(chan nextResponse),
stopped: make(chan struct{}),
}
if tb.Visitor != nil {
t.visitor = tb.Visitor
Expand All @@ -71,7 +85,9 @@ func (tb TraversalBuilder) Start(ctx context.Context) Traverser {
// traverser is a class to perform a selector traversal that stops every time a new block is loaded
// and waits for manual input (in the form of advance or error)
type traverser struct {
parentCtx context.Context
ctx context.Context
cancel func()
root ipld.Link
selector ipld.Node
visitor traversal.AdvVisitFn
Expand All @@ -83,6 +99,7 @@ type traverser struct {
awaitRequest chan struct{}
stateChan chan state
responses chan nextResponse
stopped chan struct{}
}

func (t *traverser) checkState() {
Expand All @@ -91,7 +108,7 @@ func (t *traverser) checkState() {
select {
case <-t.ctx.Done():
t.isDone = true
t.completionErr = errors.New("Context cancelled")
t.completionErr = ContextCancelError{}
case newState := <-t.stateChan:
t.isDone = newState.isDone
t.completionErr = newState.completionErr
Expand All @@ -116,15 +133,16 @@ func (t *traverser) start() {
case t.awaitRequest <- struct{}{}:
}
go func() {
defer close(t.stopped)
loader := func(lnk ipld.Link, lnkCtx ipld.LinkContext) (io.Reader, error) {
select {
case <-t.ctx.Done():
return nil, errors.New("Context cancelled")
return nil, ContextCancelError{}
case t.stateChan <- state{false, nil, lnk, lnkCtx}:
}
select {
case <-t.ctx.Done():
return nil, errors.New("Context cancelled")
return nil, ContextCancelError{}
case response := <-t.responses:
return response.input, response.err
}
Expand Down Expand Up @@ -158,6 +176,14 @@ func (t *traverser) start() {
}()
}

func (t *traverser) Shutdown(ctx context.Context) {
t.cancel()
select {
case <-ctx.Done():
case <-t.stopped:
}
}

// IsComplete returns true if a traversal is complete
func (t *traverser) IsComplete() (bool, error) {
t.checkState()
Expand All @@ -179,12 +205,12 @@ func (t *traverser) Advance(reader io.Reader) error {
}
select {
case <-t.ctx.Done():
return errors.New("context cancelled")
return ContextCancelError{}
case t.awaitRequest <- struct{}{}:
}
select {
case <-t.ctx.Done():
return errors.New("context cancelled")
return ContextCancelError{}
case t.responses <- nextResponse{reader, nil}:
}
return nil
Expand Down
Loading

0 comments on commit caa872f

Please sign in to comment.