From d13188b054a6df49fc5988c068fe064a5b3287d5 Mon Sep 17 00:00:00 2001 From: Matt Fenelon Date: Tue, 19 Mar 2024 11:51:49 +0000 Subject: [PATCH 1/5] Add concurrency_pool_max_size configuration option This option allows to limit the maximum number of resources that can be sideloaded concurrently. With a properly configured connection pool, this ensures that the activerecord's connection pool is not exhausted by the sideloading process. Closes #469 --- lib/graphiti/configuration.rb | 3 +++ lib/graphiti/scope.rb | 13 ++++++++++++- spec/configuration_spec.rb | 15 +++++++++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/lib/graphiti/configuration.rb b/lib/graphiti/configuration.rb index be7b4f19..78d2fdf4 100644 --- a/lib/graphiti/configuration.rb +++ b/lib/graphiti/configuration.rb @@ -7,6 +7,8 @@ class Configuration # @return [Boolean] Concurrently fetch sideloads? # Defaults to false OR if classes are cached (Rails-only) attr_accessor :concurrency + # @return [Integer] Maximum number of threads to use when fetching sideloads concurrently + attr_accessor :concurrency_pool_max_size attr_accessor :respond_to attr_accessor :context_for_endpoint @@ -26,6 +28,7 @@ class Configuration def initialize @raise_on_missing_sideload = true @concurrency = false + @concurrency_pool_max_size = 4 @respond_to = [:json, :jsonapi, :xml] @links_on_demand = false @pagination_links_on_demand = false diff --git a/lib/graphiti/scope.rb b/lib/graphiti/scope.rb index af1f6ed0..be853768 100644 --- a/lib/graphiti/scope.rb +++ b/lib/graphiti/scope.rb @@ -2,6 +2,17 @@ module Graphiti class Scope attr_accessor :object, :unpaginated_object attr_reader :pagination + + def self.thread_pool_executor + concurrency = Graphiti.config.concurrency_pool_max_size || 4 + @thread_pool_executor ||= Concurrent::ThreadPoolExecutor.new( + min_threads: 0, + max_threads: concurrency, + max_queue: concurrency * 4, + fallback_policy: :caller_runs + ) + end + def initialize(object, resource, query, opts = {}) @object = object @resource = resource @@ -49,7 +60,7 @@ def resolve_sideloads(results) @resource.adapter.close if concurrent } if concurrent - promises << Concurrent::Promise.execute(&resolve_sideload) + promises << Concurrent::Promise.execute(executor: self.class.thread_pool_executor, &resolve_sideload) else resolve_sideload.call end diff --git a/spec/configuration_spec.rb b/spec/configuration_spec.rb index 22acf1f4..0fc0abb8 100644 --- a/spec/configuration_spec.rb +++ b/spec/configuration_spec.rb @@ -150,6 +150,21 @@ end end + describe "#concurrency_pool_max_size" do + include_context "with config", :concurrency_pool_max_size + + it "defaults" do + expect(Graphiti.config.concurrency_pool_max_size).to eq(4) + end + + it "is overridable" do + Graphiti.configure do |c| + c.concurrency_pool_max_size = 1 + end + expect(Graphiti.config.concurrency_pool_max_size).to eq(1) + end + end + describe "#raise_on_missing_sideload" do include_context "with config", :raise_on_missing_sideload From 664956b069993a1e4f9e3a3f1522b03d9f7853eb Mon Sep 17 00:00:00 2001 From: Matt Fenelon Date: Wed, 20 Mar 2024 09:48:13 +0000 Subject: [PATCH 2/5] Reword config setting to be clearer --- lib/graphiti/configuration.rb | 16 ++++++++++++++-- lib/graphiti/scope.rb | 2 +- spec/configuration_spec.rb | 10 +++++----- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/lib/graphiti/configuration.rb b/lib/graphiti/configuration.rb index 78d2fdf4..4fc2a5fe 100644 --- a/lib/graphiti/configuration.rb +++ b/lib/graphiti/configuration.rb @@ -7,8 +7,20 @@ class Configuration # @return [Boolean] Concurrently fetch sideloads? # Defaults to false OR if classes are cached (Rails-only) attr_accessor :concurrency + + # This number must be considered in accordance with the database + # connection pool size configured in `database.yml`. The connection + # pool should be large enough to accommodate both the foreground + # threads (ie. web server or job worker threads) and background + # threads. For each process, Graphiti will create one global + # executor that uses this many threads to sideload resources + # asynchronously. Thus, the pool size should be at least + # `thread_count + concurrency_max_threads + 1`. For example, if your + # web server has a maximum of 3 threads, and + # `concurrency_max_threads` is set to 4, then your pool size should + # be at least 8. # @return [Integer] Maximum number of threads to use when fetching sideloads concurrently - attr_accessor :concurrency_pool_max_size + attr_accessor :concurrency_max_threads attr_accessor :respond_to attr_accessor :context_for_endpoint @@ -28,7 +40,7 @@ class Configuration def initialize @raise_on_missing_sideload = true @concurrency = false - @concurrency_pool_max_size = 4 + @concurrency_max_threads = 4 @respond_to = [:json, :jsonapi, :xml] @links_on_demand = false @pagination_links_on_demand = false diff --git a/lib/graphiti/scope.rb b/lib/graphiti/scope.rb index be853768..41101f9f 100644 --- a/lib/graphiti/scope.rb +++ b/lib/graphiti/scope.rb @@ -4,7 +4,7 @@ class Scope attr_reader :pagination def self.thread_pool_executor - concurrency = Graphiti.config.concurrency_pool_max_size || 4 + concurrency = Graphiti.config.concurrency_max_threads || 4 @thread_pool_executor ||= Concurrent::ThreadPoolExecutor.new( min_threads: 0, max_threads: concurrency, diff --git a/spec/configuration_spec.rb b/spec/configuration_spec.rb index 0fc0abb8..87fceb49 100644 --- a/spec/configuration_spec.rb +++ b/spec/configuration_spec.rb @@ -150,18 +150,18 @@ end end - describe "#concurrency_pool_max_size" do - include_context "with config", :concurrency_pool_max_size + describe '#concurrency_max_threads' do + include_context "with config", :concurrency_max_threads it "defaults" do - expect(Graphiti.config.concurrency_pool_max_size).to eq(4) + expect(Graphiti.config.concurrency_max_threads).to eq(4) end it "is overridable" do Graphiti.configure do |c| - c.concurrency_pool_max_size = 1 + c.concurrency_max_threads = 1 end - expect(Graphiti.config.concurrency_pool_max_size).to eq(1) + expect(Graphiti.config.concurrency_max_threads).to eq(1) end end From 7208a0e904ff1b1e3a227dc4cdbb5ceee617c50d Mon Sep 17 00:00:00 2001 From: Matt Fenelon Date: Wed, 20 Mar 2024 10:32:28 +0000 Subject: [PATCH 3/5] Add thread safe executor initiator --- lib/graphiti/scope.rb | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/lib/graphiti/scope.rb b/lib/graphiti/scope.rb index 41101f9f..3e03dfa6 100644 --- a/lib/graphiti/scope.rb +++ b/lib/graphiti/scope.rb @@ -3,14 +3,20 @@ class Scope attr_accessor :object, :unpaginated_object attr_reader :pagination + @thread_pool_executor_mutex = Mutex.new + def self.thread_pool_executor + return @thread_pool_executor if @thread_pool_executor + concurrency = Graphiti.config.concurrency_max_threads || 4 - @thread_pool_executor ||= Concurrent::ThreadPoolExecutor.new( - min_threads: 0, - max_threads: concurrency, - max_queue: concurrency * 4, - fallback_policy: :caller_runs - ) + @thread_pool_executor_mutex.synchronize do + @thread_pool_executor ||= Concurrent::ThreadPoolExecutor.new( + min_threads: 0, + max_threads: concurrency, + max_queue: concurrency * 4, + fallback_policy: :caller_runs + ) + end end def initialize(object, resource, query, opts = {}) @@ -19,20 +25,20 @@ def initialize(object, resource, query, opts = {}) @query = query @opts = opts - @object = @resource.around_scoping(@object, @query.hash) { |scope| + @object = @resource.around_scoping(@object, @query.hash) do |scope| apply_scoping(scope, opts) - } + end end def resolve if @query.zero_results? [] else - resolved = broadcast_data { |payload| + resolved = broadcast_data do |payload| @object = @resource.before_resolve(@object, @query) payload[:results] = @resource.resolve(@object) payload[:results] - } + end resolved.compact! assign_serializer(resolved) yield resolved if block_given? From 9e28bcecdf66f200bc82a45ee237a3d59f2e6fc5 Mon Sep 17 00:00:00 2001 From: Matt Fenelon Date: Wed, 20 Mar 2024 11:33:30 +0000 Subject: [PATCH 4/5] Drop unintended syntax changes --- lib/graphiti/scope.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/graphiti/scope.rb b/lib/graphiti/scope.rb index 3e03dfa6..fcb0731f 100644 --- a/lib/graphiti/scope.rb +++ b/lib/graphiti/scope.rb @@ -25,20 +25,20 @@ def initialize(object, resource, query, opts = {}) @query = query @opts = opts - @object = @resource.around_scoping(@object, @query.hash) do |scope| + @object = @resource.around_scoping(@object, @query.hash) { |scope| apply_scoping(scope, opts) - end + } end def resolve if @query.zero_results? [] else - resolved = broadcast_data do |payload| + resolved = broadcast_data { |payload| @object = @resource.before_resolve(@object, @query) payload[:results] = @resource.resolve(@object) payload[:results] - end + } resolved.compact! assign_serializer(resolved) yield resolved if block_given? From 9dfa0dc566cfddacaa0104f0251ce817a7c59c26 Mon Sep 17 00:00:00 2001 From: Jeff Keen Date: Wed, 20 Mar 2024 12:42:20 -0500 Subject: [PATCH 5/5] Run standardrb to fix whitespace/linting issue --- lib/graphiti/configuration.rb | 2 +- spec/configuration_spec.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/graphiti/configuration.rb b/lib/graphiti/configuration.rb index 4fc2a5fe..d1a69098 100644 --- a/lib/graphiti/configuration.rb +++ b/lib/graphiti/configuration.rb @@ -7,7 +7,7 @@ class Configuration # @return [Boolean] Concurrently fetch sideloads? # Defaults to false OR if classes are cached (Rails-only) attr_accessor :concurrency - + # This number must be considered in accordance with the database # connection pool size configured in `database.yml`. The connection # pool should be large enough to accommodate both the foreground diff --git a/spec/configuration_spec.rb b/spec/configuration_spec.rb index 87fceb49..42013fbe 100644 --- a/spec/configuration_spec.rb +++ b/spec/configuration_spec.rb @@ -150,7 +150,7 @@ end end - describe '#concurrency_max_threads' do + describe "#concurrency_max_threads" do include_context "with config", :concurrency_max_threads it "defaults" do