Skip to content

Commit

Permalink
Added possibility to customize subscription resolver timeout value
Browse files Browse the repository at this point in the history
The previous value was hard-coded to 1 second. This is problematic for resolver that
takes more time than this to return a result.

When parsing the schema, it's not possible to pass a custom value for the subscription
resolver timeout.

Extracted from graph-gophers#317
  • Loading branch information
Matthieu Vachon committed Nov 11, 2020
1 parent 4c772c1 commit d77614a
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 16 deletions.
27 changes: 19 additions & 8 deletions graphql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"reflect"
"time"

"github.com/graph-gophers/graphql-go/errors"
"github.com/graph-gophers/graphql-go/internal/common"
Expand Down Expand Up @@ -64,13 +65,14 @@ type Schema struct {
schema *schema.Schema
res *resolvable.Schema

maxDepth int
maxParallelism int
tracer trace.Tracer
validationTracer trace.ValidationTracer
logger log.Logger
useStringDescriptions bool
disableIntrospection bool
maxDepth int
maxParallelism int
tracer trace.Tracer
validationTracer trace.ValidationTracer
logger log.Logger
useStringDescriptions bool
disableIntrospection bool
subscribeResolverTimeout time.Duration
}

// SchemaOpt is an option to pass to ParseSchema or MustParseSchema.
Expand Down Expand Up @@ -135,6 +137,15 @@ func DisableIntrospection() SchemaOpt {
}
}

// SubscribeResolverTimeout is an option to control the amount of time
// we allow for a single subscribe message resolver to complete it's job
// before it times out and returns an error to the subscriber.
func SubscribeResolverTimeout(timeout time.Duration) SchemaOpt {
return func(s *Schema) {
s.subscribeResolverTimeout = timeout
}
}

// Response represents a typical response of a GraphQL server. It may be encoded to JSON directly or
// it may be further processed to a custom response type, for example to include custom error data.
// Errors are intentionally serialized first based on the advice in https://github.com/facebook/graphql/commit/7b40390d48680b15cb93e02d46ac5eb249689876#diff-757cea6edf0288677a9eea4cfc801d87R107
Expand Down Expand Up @@ -190,7 +201,7 @@ func (s *Schema) exec(ctx context.Context, queryString string, operationName str

// Subscriptions are not valid in Exec. Use schema.Subscribe() instead.
if op.Type == query.Subscription {
return &Response{Errors: []*errors.QueryError{&errors.QueryError{Message: "graphql-ws protocol header is missing"}}}
return &Response{Errors: []*errors.QueryError{{Message: "graphql-ws protocol header is missing"}}}
}
if op.Type == query.Mutation {
if _, ok := s.schema.EntryPoints["mutation"]; !ok {
Expand Down
8 changes: 5 additions & 3 deletions internal/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"reflect"
"sync"
"time"

"github.com/graph-gophers/graphql-go/errors"
"github.com/graph-gophers/graphql-go/internal/common"
Expand All @@ -20,9 +21,10 @@ import (

type Request struct {
selected.Request
Limiter chan struct{}
Tracer trace.Tracer
Logger log.Logger
Limiter chan struct{}
Tracer trace.Tracer
Logger log.Logger
SubscribeResolverTimeout time.Duration
}

func (r *Request) handlePanic(ctx context.Context) {
Expand Down
8 changes: 6 additions & 2 deletions internal/exec/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,12 @@ func (r *Request) Subscribe(ctx context.Context, s *resolvable.Schema, op *query
}
var out bytes.Buffer
func() {
// TODO: configurable timeout
subCtx, cancel := context.WithTimeout(ctx, time.Second)
timeout := r.SubscribeResolverTimeout
if timeout == 0 {
timeout = time.Second
}

subCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

// resolve response
Expand Down
48 changes: 48 additions & 0 deletions subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"testing"
"time"

graphql "github.com/graph-gophers/graphql-go"
qerrors "github.com/graph-gophers/graphql-go/errors"
Expand Down Expand Up @@ -473,3 +474,50 @@ const schema = `
hello: String!
}
`

type subscriptionsCustomTimeout struct{}

type messageResolver struct{}

func (r messageResolver) Msg() string {
time.Sleep(5 * time.Millisecond)
return "failed!"
}

func (r *subscriptionsCustomTimeout) OnTimeout() <-chan *messageResolver {
c := make(chan *messageResolver)
go func() {
c <- &messageResolver{}
close(c)
}()

return c
}

func TestSchemaSubscribe_CustomResolverTimeout(t *testing.T) {
r := &struct {
*subscriptionsCustomTimeout
}{
subscriptionsCustomTimeout: &subscriptionsCustomTimeout{},
}
gqltesting.RunSubscribe(t, &gqltesting.TestSubscription{
Schema: graphql.MustParseSchema(`
type Query {}
type Subscription {
onTimeout : Message!
}
type Message {
msg: String!
}
`, r, graphql.SubscribeResolverTimeout(1*time.Millisecond)),
Query: `
subscription {
onTimeout { msg }
}
`,
ExpectedResults: []gqltesting.TestResponse{
{Errors: []*qerrors.QueryError{{Message: "context deadline exceeded"}}},
},
})
}
7 changes: 4 additions & 3 deletions subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ func (s *Schema) subscribe(ctx context.Context, queryString string, operationNam
Vars: variables,
Schema: s.schema,
},
Limiter: make(chan struct{}, s.maxParallelism),
Tracer: s.tracer,
Logger: s.logger,
Limiter: make(chan struct{}, s.maxParallelism),
Tracer: s.tracer,
Logger: s.logger,
SubscribeResolverTimeout: s.subscribeResolverTimeout,
}
varTypes := make(map[string]*introspection.Type)
for _, v := range op.Vars {
Expand Down

0 comments on commit d77614a

Please sign in to comment.