From 0bc10c4d7f35dbd5421cb7acd647b4ee8d8ca24a Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 31 Aug 2020 07:50:43 +0300 Subject: [PATCH] enable batch execution (#1965) WIP: When `batch` is set to true for a given subschemaConfig, batches all delegated root fields into a combined request passed to the executor. Moreover, batches all requests to a given subschema into the minimum number of requests, collecting queries and mutations separately, preserving operation order. Distributes properly pathed errors to the originating requests. Adapted from Gatsby query batcher by @vladar. Caveats: * Uses a Dataloader under the hood, which is created anew upon each request -- relies on a unique context argument per request to make this happen! * Does not pass `info` argument to the batched executor call, disabling any executors that used extensions annotated onto info. TODO: - Add testing! - Extensions support should be added by a custom option? Related: https://github.com/gatsbyjs/gatsby/pull/22347#issuecomment-609727851 https://github.com/ardatan/graphql-tools/issues/1710#issuecomment-652355934 https://github.com/ardatan/graphql-tools/issues/1959#issuecomment-683336594 https://github.com/ardatan/graphql-tools/issues/1954 --- packages/delegate/package.json | 3 +- packages/delegate/src/delegateToSchema.ts | 7 +- packages/delegate/src/getBatchingExecutor.ts | 71 +++++ packages/delegate/src/memoize.ts | 40 +++ packages/delegate/src/mergeRequests.ts | 273 ++++++++++++++++++ packages/delegate/src/prefix.ts | 13 + packages/delegate/src/splitResult.ts | 63 ++++ packages/delegate/src/types.ts | 1 + .../delegate/tests/batchExecution.test.ts | 61 ++++ 9 files changed, 530 insertions(+), 2 deletions(-) create mode 100644 packages/delegate/src/getBatchingExecutor.ts create mode 100644 packages/delegate/src/mergeRequests.ts create mode 100644 packages/delegate/src/prefix.ts create mode 100644 packages/delegate/src/splitResult.ts create mode 100644 packages/delegate/tests/batchExecution.test.ts diff --git a/packages/delegate/package.json b/packages/delegate/package.json index 1138cf5289d..610dac95ba9 100644 --- a/packages/delegate/package.json +++ b/packages/delegate/package.json @@ -21,6 +21,7 @@ "@graphql-tools/schema": "6.0.18", "@graphql-tools/utils": "6.0.18", "@ardatan/aggregate-error": "0.0.1", + "dataloader": "2.0.0", "is-promise": "4.0.0", "tslib": "~2.0.0" }, @@ -28,4 +29,4 @@ "access": "public", "directory": "dist" } -} \ No newline at end of file +} diff --git a/packages/delegate/src/delegateToSchema.ts b/packages/delegate/src/delegateToSchema.ts index 4f07a2fd3f8..f329485c852 100644 --- a/packages/delegate/src/delegateToSchema.ts +++ b/packages/delegate/src/delegateToSchema.ts @@ -31,6 +31,7 @@ import { createRequestFromInfo, getDelegatingOperation } from './createRequest'; import { Transformer } from './Transformer'; import AggregateError from '@ardatan/aggregate-error'; +import { getBatchingExecutor } from './getBatchingExecutor'; export function delegateToSchema(options: IDelegateToSchemaOptions | GraphQLSchema): any { if (isSchema(options)) { @@ -165,9 +166,13 @@ export function delegateRequest({ } if (targetOperation === 'query' || targetOperation === 'mutation') { - const executor = + let executor = subschemaConfig?.executor || createDefaultExecutor(targetSchema, subschemaConfig?.rootValue || targetRootValue); + if (subschemaConfig?.batch) { + executor = getBatchingExecutor(context, subschemaConfig, executor); + } + const executionResult = executor({ document: processedRequest.document, variables: processedRequest.variables, diff --git a/packages/delegate/src/getBatchingExecutor.ts b/packages/delegate/src/getBatchingExecutor.ts new file mode 100644 index 00000000000..4857428e5b3 --- /dev/null +++ b/packages/delegate/src/getBatchingExecutor.ts @@ -0,0 +1,71 @@ +import { getOperationAST } from 'graphql'; + +import isPromise from 'is-promise'; + +import DataLoader from 'dataloader'; + +import { ExecutionResult, Request } from '@graphql-tools/utils'; + +import { SubschemaConfig, ExecutionParams } from './types'; +import { memoize2of3 } from './memoize'; +import { mergeRequests } from './mergeRequests'; +import { splitResult } from './splitResult'; + +export const getBatchingExecutor = memoize2of3(function ( + _context: Record, + _subschemaConfig: SubschemaConfig, + executor: ({ document, context, variables, info }: ExecutionParams) => ExecutionResult | Promise +) { + const loader = new DataLoader(createLoadFn(executor)); + return (request: Request) => loader.load(request); +}); + +function createLoadFn( + executor: ({ document, context, variables, info }: ExecutionParams) => ExecutionResult | Promise +) { + return async (requests: Array): Promise> => { + const requestBatches: Array> = []; + let index = 0; + const request = requests[index]; + let currentBatch: Array = [request]; + requestBatches.push(currentBatch); + const operationType = getOperationAST(request.document).operation; + while (++index < requests.length) { + const currentOperationType = getOperationAST(requests[index].document).operation; + if (operationType === currentOperationType) { + currentBatch.push(requests[index]); + } else { + currentBatch = [requests[index]]; + requestBatches.push(currentBatch); + } + } + + let containsPromises = false; + const executionResults: Array> = []; + requestBatches.forEach(requestBatch => { + const mergedRequest = mergeRequests(requestBatch); + const executionResult = executor(mergedRequest); + + if (isPromise(executionResult)) { + containsPromises = true; + } + executionResults.push(executionResult); + }); + + if (containsPromises) { + return Promise.all(executionResults).then(resultBatches => { + let results: Array = []; + resultBatches.forEach((resultBatch, index) => { + results = results.concat(splitResult(resultBatch, requestBatches[index].length)); + }); + return results; + }); + } + + let results: Array = []; + (executionResults as Array).forEach((resultBatch, index) => { + results = results.concat(splitResult(resultBatch, requestBatches[index].length)); + }); + return results; + }; +} diff --git a/packages/delegate/src/memoize.ts b/packages/delegate/src/memoize.ts index 05e6d3804a1..e32108a812c 100644 --- a/packages/delegate/src/memoize.ts +++ b/packages/delegate/src/memoize.ts @@ -211,3 +211,43 @@ export function memoize2, T2 extends Record, + T2 extends Record, + T3 extends any, + R extends any +>(fn: (A1: T1, A2: T2, A3: T3) => R): (A1: T1, A2: T2, A3: T3) => R { + let cache1: WeakMap>; + + function memoized(a1: T1, a2: T2, a3: T3) { + if (!cache1) { + cache1 = new WeakMap(); + const cache2: WeakMap = new WeakMap(); + cache1.set(a1, cache2); + const newValue = fn(a1, a2, a3); + cache2.set(a2, newValue); + return newValue; + } + + let cache2 = cache1.get(a1); + if (!cache2) { + cache2 = new WeakMap(); + cache1.set(a1, cache2); + const newValue = fn(a1, a2, a3); + cache2.set(a2, newValue); + return newValue; + } + + const cachedValue = cache2.get(a2); + if (cachedValue === undefined) { + const newValue = fn(a1, a2, a3); + cache2.set(a2, newValue); + return newValue; + } + + return cachedValue; + } + + return memoized; +} diff --git a/packages/delegate/src/mergeRequests.ts b/packages/delegate/src/mergeRequests.ts new file mode 100644 index 00000000000..df410335478 --- /dev/null +++ b/packages/delegate/src/mergeRequests.ts @@ -0,0 +1,273 @@ +// adapted from https://github.com/gatsbyjs/gatsby/blob/master/packages/gatsby-source-graphql/src/batching/merge-queries.js + +import { + visit, + Kind, + DefinitionNode, + OperationDefinitionNode, + DocumentNode, + FragmentDefinitionNode, + VariableDefinitionNode, + SelectionNode, + FragmentSpreadNode, + VariableNode, + VisitorKeyMap, + ASTKindToNode, + InlineFragmentNode, + FieldNode, +} from 'graphql'; + +import { Request } from '@graphql-tools/utils'; + +import { createPrefix } from './prefix'; + +/** + * Merge multiple queries into a single query in such a way that query results + * can be split and transformed as if they were obtained by running original queries. + * + * Merging algorithm involves several transformations: + * 1. Replace top-level fragment spreads with inline fragments (... on Query {}) + * 2. Add unique aliases to all top-level query fields (including those on inline fragments) + * 3. Prefix all variable definitions and variable usages + * 4. Prefix names (and spreads) of fragments + * + * i.e transform: + * [ + * `query Foo($id: ID!) { foo, bar(id: $id), ...FooQuery } + * fragment FooQuery on Query { baz }`, + * + * `query Bar($id: ID!) { foo: baz, bar(id: $id), ... on Query { baz } }` + * ] + * to: + * query ( + * $graphqlTools1_id: ID! + * $graphqlTools2_id: ID! + * ) { + * graphqlTools1_foo: foo, + * graphqlTools1_bar: bar(id: $graphqlTools1_id) + * ... on Query { + * graphqlTools1__baz: baz + * } + * graphqlTools1__foo: baz + * graphqlTools1__bar: bar(id: $graphqlTools1__id) + * ... on Query { + * graphqlTools1__baz: baz + * } + * } + */ +export function mergeRequests(requests: Array): Request { + const mergedVariables: Record = Object.create(null); + const mergedVariableDefinitions: Array = []; + const mergedSelections: Array = []; + const mergedFragmentDefinitions: Array = []; + + requests.forEach((request, index) => { + const prefixedRequest = prefixRequestParts(createPrefix(index), request); + + prefixedRequest.document.definitions.forEach(def => { + if (isOperationDefinition(def)) { + mergedSelections.push(...def.selectionSet.selections); + mergedVariableDefinitions.push(...(def.variableDefinitions ?? [])); + } + if (isFragmentDefinition(def)) { + mergedFragmentDefinitions.push(def); + } + }); + Object.assign(mergedVariables, request.variables); + }); + + const mergedOperationDefinition: OperationDefinitionNode = { + kind: Kind.OPERATION_DEFINITION, + operation: `query`, + variableDefinitions: mergedVariableDefinitions, + selectionSet: { + kind: Kind.SELECTION_SET, + selections: mergedSelections, + }, + }; + + return { + document: { + kind: Kind.DOCUMENT, + definitions: [mergedOperationDefinition, ...mergedFragmentDefinitions], + }, + variables: mergedVariables, + }; +} + +function prefixRequestParts(prefix: string, request: Request): Request { + let document = aliasTopLevelFields(prefix, request.document); + const variableNames = Object.keys(request.variables); + + if (variableNames.length === 0) { + return { ...request, document }; + } + + document = visit(document, { + [Kind.VARIABLE]: (node: VariableNode) => prefixNodeName(node, prefix), + [Kind.FRAGMENT_DEFINITION]: (node: FragmentDefinitionNode) => prefixNodeName(node, prefix), + [Kind.FRAGMENT_SPREAD]: (node: FragmentSpreadNode) => prefixNodeName(node, prefix), + }); + + const prefixedVariables = variableNames.reduce((acc, name) => { + acc[prefix + name] = request.variables[name]; + return acc; + }, Object.create(null)); + + return { + document, + variables: prefixedVariables, + }; +} + +/** + * Adds prefixed aliases to top-level fields of the query. + * + * @see aliasFieldsInSelection for implementation details + */ +function aliasTopLevelFields(prefix: string, document: DocumentNode): DocumentNode { + const transformer = { + [Kind.OPERATION_DEFINITION]: (def: OperationDefinitionNode) => { + const { selections } = def.selectionSet; + return { + ...def, + selectionSet: { + ...def.selectionSet, + selections: aliasFieldsInSelection(prefix, selections, document), + }, + }; + }, + }; + return visit(document, transformer, ({ [Kind.DOCUMENT]: [`definitions`] } as unknown) as VisitorKeyMap< + ASTKindToNode + >); +} + +/** + * Add aliases to fields of the selection, including top-level fields of inline fragments. + * Fragment spreads are converted to inline fragments and their top-level fields are also aliased. + * + * Note that this method is shallow. It adds aliases only to the top-level fields and doesn't + * descend to field sub-selections. + * + * For example, transforms: + * { + * foo + * ... on Query { foo } + * ...FragmentWithBarField + * } + * To: + * { + * graphqlTools1_foo: foo + * ... on Query { graphqlTools1_foo: foo } + * ... on Query { graphqlTools1_bar: bar } + * } + */ +function aliasFieldsInSelection( + prefix: string, + selections: ReadonlyArray, + document: DocumentNode +): Array { + return selections.map(selection => { + switch (selection.kind) { + case Kind.INLINE_FRAGMENT: + return aliasFieldsInInlineFragment(prefix, selection, document); + case Kind.FRAGMENT_SPREAD: { + const inlineFragment = inlineFragmentSpread(selection, document); + return aliasFieldsInInlineFragment(prefix, inlineFragment, document); + } + case Kind.FIELD: + default: + return aliasField(selection, prefix); + } + }); +} + +/** + * Add aliases to top-level fields of the inline fragment. + * Returns new inline fragment node. + * + * For Example, transforms: + * ... on Query { foo, ... on Query { bar: foo } } + * To + * ... on Query { graphqlTools1_foo: foo, ... on Query { graphqlTools1_bar: foo } } + */ +function aliasFieldsInInlineFragment( + prefix: string, + fragment: InlineFragmentNode, + document: DocumentNode +): InlineFragmentNode { + const { selections } = fragment.selectionSet; + return { + ...fragment, + selectionSet: { + ...fragment.selectionSet, + selections: aliasFieldsInSelection(prefix, selections, document), + }, + }; +} + +/** + * Replaces fragment spread with inline fragment + * + * Example: + * query { ...Spread } + * fragment Spread on Query { bar } + * + * Transforms to: + * query { ... on Query { bar } } + */ +function inlineFragmentSpread(spread: FragmentSpreadNode, document: DocumentNode): InlineFragmentNode { + const fragment = document.definitions.find( + def => isFragmentDefinition(def) && def.name.value === spread.name.value + ) as FragmentDefinitionNode; + if (!fragment) { + throw new Error(`Fragment ${spread.name.value} does not exist`); + } + const { typeCondition, selectionSet } = fragment; + return { + kind: Kind.INLINE_FRAGMENT, + typeCondition, + selectionSet, + directives: spread.directives, + }; +} + +function prefixNodeName( + namedNode: T, + prefix: string +): T { + return { + ...namedNode, + name: { + ...namedNode.name, + value: prefix + namedNode.name.value, + }, + }; +} + +/** + * Returns a new FieldNode with prefixed alias + * + * Example. Given prefix === "graphqlTools1_" transforms: + * { foo } -> { graphqlTools1_foo: foo } + * { foo: bar } -> { graphqlTools1_foo: bar } + */ +function aliasField(field: FieldNode, aliasPrefix: string): FieldNode { + const aliasNode = field.alias ? field.alias : field.name; + return { + ...field, + alias: { + ...aliasNode, + value: aliasPrefix + aliasNode.value, + }, + }; +} + +function isOperationDefinition(def: DefinitionNode): def is OperationDefinitionNode { + return def.kind === Kind.OPERATION_DEFINITION; +} + +function isFragmentDefinition(def: DefinitionNode): def is FragmentDefinitionNode { + return def.kind === Kind.FRAGMENT_DEFINITION; +} diff --git a/packages/delegate/src/prefix.ts b/packages/delegate/src/prefix.ts new file mode 100644 index 00000000000..73cb9d52323 --- /dev/null +++ b/packages/delegate/src/prefix.ts @@ -0,0 +1,13 @@ +// adapted from https://github.com/gatsbyjs/gatsby/blob/master/packages/gatsby-source-graphql/src/batching/merge-queries.js + +export function createPrefix(index: number): string { + return `graphqlTools${index}_`; +} + +export function parseKey(prefixedKey: string): { index: number; originalKey: string } { + const match = /^graphqlTools([\d]+)_(.*)$/.exec(prefixedKey); + if (match && match.length === 3 && !isNaN(Number(match[1])) && match[2]) { + return { index: Number(match[1]), originalKey: match[2] }; + } + return null; +} diff --git a/packages/delegate/src/splitResult.ts b/packages/delegate/src/splitResult.ts new file mode 100644 index 00000000000..fd2682a8e60 --- /dev/null +++ b/packages/delegate/src/splitResult.ts @@ -0,0 +1,63 @@ +// adapted from https://github.com/gatsbyjs/gatsby/blob/master/packages/gatsby-source-graphql/src/batching/merge-queries.js + +import { ExecutionResult, GraphQLError } from 'graphql'; + +import { relocatedError } from '@graphql-tools/utils'; + +import { parseKey } from './prefix'; + +/** + * Split and transform result of the query produced by the `merge` function + */ +export function splitResult(mergedResult: ExecutionResult, numResults: number): Array { + const splitResults: Array = []; + for (let i = 0; i < numResults; i++) { + splitResults.push({}); + } + + const data = mergedResult.data; + if (data) { + Object.keys(data).forEach(prefixedKey => { + const { index, originalKey } = parseKey(prefixedKey); + if (!splitResults[index].data) { + splitResults[index].data = { [originalKey]: data[prefixedKey] }; + } else { + splitResults[index].data[originalKey] = data[prefixedKey]; + } + }); + } + + const errors = mergedResult.errors; + if (errors) { + const newErrors: Record> = Object.create(null); + errors.forEach(error => { + if (error.path) { + const parsedKey = parseKey(error.path[0] as string); + if (parsedKey) { + const { index, originalKey } = parsedKey; + const newError = relocatedError(error, [originalKey, ...error.path.slice(1)]); + if (!newErrors[index]) { + newErrors[index] = [newError]; + } else { + newErrors[index].push(newError); + } + return; + } + } + + splitResults.forEach((_splitResult, index) => { + if (!newErrors[index]) { + newErrors[index] = [error]; + } else { + newErrors[index].push(error); + } + }); + }); + + Object.keys(newErrors).forEach(index => { + splitResults[index].errors = newErrors[index]; + }); + } + + return splitResults; +} diff --git a/packages/delegate/src/types.ts b/packages/delegate/src/types.ts index 5374ae8fcfd..f203ec8e441 100644 --- a/packages/delegate/src/types.ts +++ b/packages/delegate/src/types.ts @@ -151,6 +151,7 @@ export interface SubschemaConfig { createProxyingResolver?: CreateProxyingResolverFn; transforms?: Array; merge?: Record; + batch?: boolean; } export interface MergedTypeConfig { diff --git a/packages/delegate/tests/batchExecution.test.ts b/packages/delegate/tests/batchExecution.test.ts new file mode 100644 index 00000000000..5eb8c079f46 --- /dev/null +++ b/packages/delegate/tests/batchExecution.test.ts @@ -0,0 +1,61 @@ +import { graphql, execute, ExecutionResult } from 'graphql'; + +import { makeExecutableSchema } from '@graphql-tools/schema'; +import { delegateToSchema, SubschemaConfig, ExecutionParams, SyncExecutor } from '../src'; + +describe('batch execution', () => { + it('should batch', async () => { + const innerSchema = makeExecutableSchema({ + typeDefs: ` + type Query { + field1: String + field2: String + } + `, + resolvers: { + Query: { + field1: () => 'test1', + field2: () => 'test2', + }, + }, + }); + + let executions = 0; + + const innerSubschemaConfig: SubschemaConfig = { + schema: innerSchema, + batch: true, + executor: ((params: ExecutionParams): ExecutionResult => { + executions++; + return execute(innerSchema, params.document, undefined, params.context, params.variables) as ExecutionResult; + }) as SyncExecutor + } + + const outerSchema = makeExecutableSchema({ + typeDefs: ` + type Query { + field1: String + field2: String + } + `, + resolvers: { + Query: { + field1: (_parent, _args, context, info) => delegateToSchema({ schema: innerSubschemaConfig, context, info }), + field2: (_parent, _args, context, info) => delegateToSchema({ schema: innerSubschemaConfig, context, info }), + }, + }, + }); + + const expectedResult = { + data: { + field1: 'test1', + field2: 'test2', + }, + }; + + const result = await graphql(outerSchema, '{ field1 field2 }', undefined, {}); + + expect(result).toEqual(expectedResult); + expect(executions).toEqual(1); + }); +});