Skip to content

Commit

Permalink
Bulk enqueue: Disable que_job_notify trigger in bulk_enqueue for perf…
Browse files Browse the repository at this point in the history
…ormance

Implementation is based on [a tip from @jasoncodes](#340 (comment))!

Co-Authored-By: Andrew Colbeck <andrewcolbeck@gmail.com>
  • Loading branch information
ZimbiX and AndrewColbeck committed Jul 22, 2022
1 parent f573970 commit 9bc68cb
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 5 deletions.
11 changes: 7 additions & 4 deletions lib/que/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,13 @@ def bulk_enqueue(args_and_kwargs_array, job_options: {})
_run_attrs(attrs)
else
values_array =
Que.execute(
:bulk_insert_jobs,
attrs.values_at(:queue, :priority, :run_at, :job_class, :args, :data),
)
Que.transaction do
Que.execute('SET LOCAL que.skip_notify TO true')
Que.execute(
:bulk_insert_jobs,
attrs.values_at(:queue, :priority, :run_at, :job_class, :args, :data),
)
end
values_array.map(&method(:new))
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/que/migrations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module Que
module Migrations
# In order to ship a schema change, add the relevant up and down sql files
# to the migrations directory, and bump the version here.
CURRENT_VERSION = 6
CURRENT_VERSION = 7

class << self
def migrate!(version:)
Expand Down
5 changes: 5 additions & 0 deletions lib/que/migrations/7/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
DROP TRIGGER que_job_notify ON que_jobs;
CREATE TRIGGER que_job_notify
AFTER INSERT ON que_jobs
FOR EACH ROW
EXECUTE PROCEDURE public.que_job_notify();
6 changes: 6 additions & 0 deletions lib/que/migrations/7/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
DROP TRIGGER que_job_notify ON que_jobs;
CREATE TRIGGER que_job_notify
AFTER INSERT ON que_jobs
FOR EACH ROW
WHEN (NOT coalesce(current_setting('que.skip_notify', true), '') = 'true')
EXECUTE FUNCTION public.que_job_notify();
14 changes: 14 additions & 0 deletions spec/que/migrations.work_job_trigger_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,18 @@ def listen_connection
assert_nil conn.wait_for_notify(0.01)
end
end

it "should not notify a locker when using bulk_enqueue" do
listen_connection do |conn|
DB[:que_lockers].insert(locker_attrs)
conn.async_exec "LISTEN que_listener_1"
Que::Job.bulk_enqueue(
[
{ args: ['job1_arg1'], kwargs: { job1_kwarg1: 'x' } },
{ args: ['job2_arg1'], kwargs: { job2_kwarg1: 'x' } },
],
)
assert_nil conn.wait_for_notify(0.01)
end
end
end

0 comments on commit 9bc68cb

Please sign in to comment.