Merge pull request #3312 from rmosolgo/dataloader-list-fix
Rebuild dataloader
Robert Mosolgo authored Feb 3, 2021
2 parents 6b17312 + 7e4f664 commit dafe3d2
Showing 10 changed files with 227 additions and 228 deletions.
10 changes: 2 additions & 8 deletions benchmark/batch_loading.rb
@@ -51,15 +51,13 @@ def team(name:)
end

query(Query)
use GraphQL::Execution::Interpreter
use GraphQL::Analysis::AST
use GraphQL::Batch
end

class GraphQLDataloaderSchema < GraphQL::Schema
class DataSource < GraphQL::Dataloader::Source
def initialize(column: :id)
@column = column
def initialize(options = {column: :id})
@column = options[:column]
end

def fetch(keys)
@@ -100,8 +98,6 @@ def team(name:)
end

query(Query)
use GraphQL::Execution::Interpreter
use GraphQL::Analysis::AST
use GraphQL::Dataloader
end

@@ -137,7 +133,5 @@ def team(name:)
end

query(Query)
use GraphQL::Execution::Interpreter
use GraphQL::Analysis::AST
end
end
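
A note on the `initialize` change above: the rebuilt dataloader instantiates sources with `source_class.new(*batch_parameters)`, forwarding batch parameters positionally, so the `column:` keyword argument was swapped for a positional options hash. A minimal sketch of the resulting pattern (the class name, the usage line, and the fabricated rows are illustrative, not part of the diff):

class ColumnSource < GraphQL::Dataloader::Source
  # Batch parameters arrive positionally (via Dataloader#with),
  # hence an options hash rather than a keyword argument.
  def initialize(options = {column: :id})
    @column = options[:column]
  end

  def fetch(keys)
    # Must return one result per key, in the same order.
    # A real source would query a database; this stub fabricates rows.
    keys.map { |key| { @column => key } }
  end
end

# Hypothetical usage: dataloader.with(ColumnSource, { column: :name }).load("Braves")
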
2 changes: 0 additions & 2 deletions benchmark/run.rb
@@ -122,8 +122,6 @@ def foos

class Schema < GraphQL::Schema
query QueryType
use GraphQL::Execution::Interpreter
use GraphQL::Analysis::AST
use GraphQL::Dataloader
end

194 changes: 102 additions & 92 deletions lib/graphql/dataloader.rb
@@ -27,36 +27,24 @@ def self.use(schema)
schema.dataloader_class = self
end

def initialize(multiplex_context)
@context = multiplex_context
def initialize
@source_cache = Hash.new { |h, source_class| h[source_class] = Hash.new { |h2, batch_parameters|
source = source_class.new(*batch_parameters)
source.setup(self)
h2[batch_parameters] = source
}
}
@waiting_fibers = []
@yielded_fibers = {}
@pending_jobs = []
end

# @return [Hash] the {Multiplex} context
attr_reader :context

# @api private
attr_reader :yielded_fibers

# Add some work to this dataloader to be scheduled later.
# @param block Some work to enqueue
# @return [void]
def enqueue(&block)
@waiting_fibers << Fiber.new {
begin
yield
rescue StandardError => exception
exception
end
}
nil
# Get a Source instance from this dataloader, for calling `.load(...)` or `.request(...)` on.
#
# @param source_class [Class<GraphQL::Dataloader::Source>]
# @param batch_parameters [Array<Object>]
# @return [GraphQL::Dataloader::Source] An instance of {source_class}, initialized with `*batch_parameters`,
# set up with this dataloader, and cached for the lifetime of this {Multiplex}.
def with(source_class, *batch_parameters)
@source_cache[source_class][batch_parameters]
end

# Tell the dataloader that this fiber is waiting for data.
@@ -69,33 +57,70 @@ def yield
nil
end

# @param path [Array<String, Integer>] A graphql response path
# @return [Boolean] True if the current Fiber has yielded once via Dataloader at {path}
def yielded?(path)
@yielded_fibers[Fiber.current] == path
# @api private Nothing to see here
def append_job(&job)
# Given a block, queue it up to be worked through when `#run` is called.
# (If the dataloader is already running, then a Fiber will pick this up later.)
@pending_jobs.push(job)
nil
end

# Run all Fibers until they're all done
#
# Each cycle works like this:
#
# - Run each pending execution fiber (`@waiting_fibers`),
# - Then run each pending Source, preparing more data for those fibers.
# - Run each pending Source _again_ (if one Source requested more data from another Source)
# - Continue until there are no pending sources
# - Repeat: run execution fibers again ...
#
# @return [void]
# @api private Move along, move along
def run
# Start executing Fibers. This will run until all the Fibers are done.
already_run_fibers = []
while (current_fiber = @waiting_fibers.pop)
# Run each execution fiber, enqueuing it in `already_run_fibers`
# if it's still `.alive?`.
# Any spin-off continuations will be enqueued in `@waiting_fibers` (via {#enqueue})
resume_fiber_and_enqueue_continuation(current_fiber, already_run_fibers)

if @waiting_fibers.empty?
# At a high level, the algorithm is:
#
# A) Inside Fibers, run jobs from the queue one-by-one
# - When one of the jobs yields to the dataloader (`Fiber.yield`), then that fiber will pause
# - In that case, if there are still pending jobs, a new Fiber will be created to run jobs
# - Continue until all jobs have been _started_ by a Fiber. (Any number of those Fibers may be waiting to be resumed, after their data is loaded)
# B) Once all known jobs have been run until they are complete or paused for data, run all pending data sources.
# - Similarly, create a Fiber to consume pending sources and tell them to load their data.
# - If one of those Fibers pauses, then create a new Fiber to continue working through remaining pending sources.
# - When a source causes another source to become pending, run the newly-pending source _first_, since it's a dependency of the previous one.
# C) After all pending sources have been completely loaded (there are no more pending sources), resume any Fibers that were waiting for data.
# - Those Fibers assume that source caches will have been populated with the data they were waiting for.
# - Those Fibers may request data from a source again, in which case they will yield and be added to a new pending fiber list.
# D) Once all pending fibers have been resumed once, return to `A` above.
#
# For whatever reason, the best implementation I could find was to order the steps `[D, A, B, C]`, with a special case for skipping `D`
# on the first pass. I just couldn't find a better way to write the loops in a way that was DRY and easy to read.
#
pending_fibers = []
next_fibers = []
first_pass = true

while first_pass || (f = pending_fibers.shift)
if first_pass
first_pass = false
else
# These fibers were previously waiting for sources to load data,
# resume them. (They might wait again, in which case, re-enqueue them.)
f.resume
if f.alive?
next_fibers << f
end
end

while @pending_jobs.any?
# Create a Fiber to consume jobs until one of the jobs yields
# or jobs run out
f = Fiber.new {
while (job = @pending_jobs.shift)
job.call
end
}
result = f.resume
if result.is_a?(StandardError)
raise result
end
# In this case, the job yielded. Queue it up to run again after
# we load whatever it's waiting for.
if f.alive?
next_fibers << f
end
end

if pending_fibers.empty?
# Now, run all Sources which have become pending _before_ resuming GraphQL execution.
# Sources might queue up other Sources, which is fine -- those will also run before resuming execution.
#
@@ -109,68 +134,45 @@ def run
end

if source_fiber_stack
# Use a stack with `.pop` here so that when a source causes another source to become pending,
# that newly-pending source will run _before_ the one that depends on it.
# (See below where the old fiber is pushed to the stack, then the new fiber is pushed on the stack.)
while (outer_source_fiber = source_fiber_stack.pop)
resume_fiber_and_enqueue_continuation(outer_source_fiber, source_fiber_stack)
result = outer_source_fiber.resume
if result.is_a?(StandardError)
raise result
end

if outer_source_fiber.alive?
source_fiber_stack << outer_source_fiber
end
# If this source caused more sources to become pending, run those before running this one again:
next_source_fiber = create_source_fiber
if next_source_fiber
source_fiber_stack << next_source_fiber
end
end
end

# We ran all the first round of execution fibers,
# and we ran all the pending sources.
# So pick up any paused execution fibers and repeat.
@waiting_fibers.concat(already_run_fibers)
already_run_fibers.clear
# Move newly-enqueued Fibers on to the list to be resumed.
# Clear out the list of next-round Fibers, so that
# any Fibers that pause can be put on it.
pending_fibers.concat(next_fibers)
next_fibers.clear
end
end
nil
end

# Get a Source instance from this dataloader, for calling `.load(...)` or `.request(...)` on.
#
# @param source_class [Class<GraphQL::Dataloader::Source>]
# @param batch_parameters [Array<Object>]
# @return [GraphQL::Dataloader::Source] An instance of {source_class}, initialized with `self, *batch_parameters`,
# and cached for the lifetime of this {Multiplex}.
def with(source_class, *batch_parameters)
@source_cache[source_class][batch_parameters]
if @pending_jobs.any?
raise "Invariant: #{@pending_jobs.size} pending jobs"
elsif pending_fibers.any?
raise "Invariant: #{pending_fibers.size} pending fibers"
elsif next_fibers.any?
raise "Invariant: #{next_fibers.size} next fibers"
end
nil
end

# @api private
attr_accessor :current_runtime

private

# Check if this fiber is still alive.
# If it is, and it should continue, then enqueue a continuation.
# If it is, re-enqueue it in `fiber_queue`.
# Otherwise, clean it up from @yielded_fibers.
# @return [void]
def resume_fiber_and_enqueue_continuation(fiber, fiber_stack)
result = fiber.resume
if result.is_a?(StandardError)
raise result
end

# This fiber yielded; there's more to do here.
# (If `#alive?` is false, then the fiber concluded without yielding.)
if fiber.alive?
if !@yielded_fibers.include?(fiber)
# This fiber hasn't yielded yet, we should enqueue a continuation fiber
@yielded_fibers[fiber] = current_runtime.progress_path
current_runtime.enqueue_selections_fiber
end
fiber_stack << fiber
else
# Keep this set clean so that fibers can be GC'ed during execution
@yielded_fibers.delete(fiber)
end
end

# If there are pending sources, return a fiber for running them.
# Otherwise, return `nil`.
#
@@ -187,6 +189,14 @@ def create_source_fiber
end

if pending_sources
# By passing the whole array into this Fiber, it's possible that we set ourselves up for a bunch of no-ops.
# For example, if you have sources `[a, b, c]`, and `a` is loaded, then `b` yields to wait for `d`, then
# the next fiber would be dispatched with `[c, d]`. It would fulfill `c`, then `d`, then eventually
# the previous fiber would start up again. `c` would no longer be pending, but it would still receive `.run_pending_keys`.
# That method is short-circuited since it isn't pending any more, but it's still a waste.
#
# This design could probably be improved by maintaining a `@pending_sources` queue which is shared by the fibers,
# similar to `@pending_jobs`. That way, when a fiber is resumed, it would never pick up work that was finished by a different fiber.
source_fiber = Fiber.new do
pending_sources.each(&:run_pending_keys)
end
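
Taken together, the rebuilt public surface is small: `append_job` queues work, `with` returns a cached Source instance, and `run` drives the fibers until everything settles. A sketch of driving it directly, outside of query execution (`UserSource` and its inline data are hypothetical):

class UserSource < GraphQL::Dataloader::Source
  def fetch(ids)
    # One batch for every id that accumulated while fibers were paused.
    ids.map { |id| { id: id, name: "User ##{id}" } }
  end
end

dataloader = GraphQL::Dataloader.new
results = {}

dataloader.append_job do
  # `load` yields this fiber; it resumes once `run` fulfills the batch.
  results[:first] = dataloader.with(UserSource).load(1)
end
dataloader.append_job do
  results[:second] = dataloader.with(UserSource).load(2)
end

dataloader.run
results[:first] # => { id: 1, name: "User #1" }, both ids fetched in one batch
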
10 changes: 5 additions & 5 deletions lib/graphql/dataloader/null_dataloader.rb
@@ -7,15 +7,15 @@ class Dataloader
# The Dataloader interface isn't public, but it enables
# simple internal code while leaving the option to use Dataloader.
class NullDataloader < Dataloader
def enqueue
yield
end

# These are all no-ops because code was
# executed synchronously.
def run; end
def yield; end
def yielded?(_path); false; end

def append_job
yield
nil
end
end
end
end
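
Under `NullDataloader`, `append_job` calls its block immediately and `run` and `yield` are no-ops, so nothing is ever deferred. A quick illustration (hypothetical, not part of the diff):

dl = GraphQL::Dataloader::NullDataloader.new
order = []
dl.append_job { order << :first }   # runs inline; no fiber is created
dl.append_job { order << :second }
dl.run                              # a no-op: everything already ran
order # => [:first, :second]
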
2 changes: 1 addition & 1 deletion lib/graphql/execution/interpreter.rb
@@ -95,7 +95,7 @@ def sync_lazies(query: nil, multiplex: nil)
end
final_values.compact!
tracer.trace("execute_query_lazy", {multiplex: multiplex, query: query}) do
Interpreter::Resolve.resolve_all(final_values)
Interpreter::Resolve.resolve_all(final_values, multiplex.dataloader)
end
queries.each do |query|
runtime = query.context.namespace(:interpreter)[:runtime]
41 changes: 22 additions & 19 deletions lib/graphql/execution/interpreter/resolve.rb
@@ -6,10 +6,9 @@ class Interpreter
module Resolve
# Continue field results in `results` until there's nothing else to continue.
# @return [void]
def self.resolve_all(results)
while results.any?
results = resolve(results)
end
def self.resolve_all(results, dataloader)
dataloader.append_job { resolve(results, dataloader) }
nil
end

# After getting `results` back from an interpreter evaluation,
@@ -24,33 +23,37 @@ def self.resolve_all(results)
# return {Lazy} instances if there's more work to be done,
# or return {Hash}/{Array} if the query should be continued.
#
# @param results [Array]
# @return [Array] Same size, filled with finished values
def self.resolve(results)
# @return [void]
def self.resolve(results, dataloader)
next_results = []

# Work through the queue until it's empty
while results.size > 0
while results.any?
result_value = results.shift

if result_value.is_a?(Lazy)
if result_value.is_a?(Hash)
results.concat(result_value.values)
next
elsif result_value.is_a?(Array)
results.concat(result_value)
next
elsif result_value.is_a?(Lazy)
result_value = result_value.value
end

if result_value.is_a?(Lazy)
# Since this field returned another lazy,
# add it to the same queue
results << result_value
elsif result_value.is_a?(Hash)
# This is part of the next level, add it
next_results.concat(result_value.values)
elsif result_value.is_a?(Array)
# This is part of the next level, add it
next_results.concat(result_value)
elsif result_value.is_a?(Hash) || result_value.is_a?(Array)
# Add these values in wholesale --
# they might be modified by later work in the dataloader.
next_results << result_value
end
end

next_results
if next_results.any?
dataloader.append_job { resolve(next_results, dataloader) }
end

nil
end
end
end
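
The signature change is the heart of this file: `resolve` used to return the next level of results for `resolve_all` to loop over, but now each pass re-enqueues the following one with `dataloader.append_job`, so `Dataloader#run` can flush newly-pending sources between passes. The shape of the pattern, reduced to its essentials (`unwrap` is a hypothetical stand-in for the Lazy/Hash/Array handling above):

def self.resolve(results, dataloader)
  next_results = []
  while (value = results.shift)
    next_results.concat(unwrap(value)) # hypothetical helper
  end
  if next_results.any?
    # This pass ends here; the next pass becomes a job that runs
    # only after any pending sources have loaded their data.
    dataloader.append_job { resolve(next_results, dataloader) }
  end
  nil
end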
