Merge pull request #340 from greensync/bulk-enqueue
Support bulk enqueuing of jobs which differ only in args/kwargs
ZimbiX authored Aug 25, 2022
2 parents 3d452f2 + 297c6cc commit f0dab0b
Showing 14 changed files with 974 additions and 16 deletions.
24 changes: 24 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

<!-- MarkdownTOC autolink=true -->

- [Unreleased](#unreleased)
- [2.0.0 \(2022-08-25\)](#200-2022-08-25)
- [1.4.1 \(2022-07-24\)](#141-2022-07-24)
- [2.0.0.beta1 \(2022-03-24\)](#200beta1-2022-03-24)
@@ -54,8 +55,31 @@

<!-- /MarkdownTOC -->

## Unreleased

- **Added**:
+ Added bulk enqueue interface for performance when enqueuing a large number of jobs at once - [docs](docs#enqueueing-jobs-in-bulk).
- **Deprecated**:
+ Deprecated `que_state_notify` trigger (`que_state` notification channel / `job_change` notification message). See [#372](https://github.com/que-rb/que/issues/372). We plan to remove this in a future release - let us know on the issue if you desire otherwise.

This release contains a database migration. You will need to migrate Que to the latest database schema version (7). For example, on ActiveRecord and Rails 6:

```ruby
class UpdateQueTablesToVersion7 < ActiveRecord::Migration[6.0]
  def up
    Que.migrate!(version: 7)
  end

  def down
    Que.migrate!(version: 6)
  end
end
```

## 2.0.0 (2022-08-25)

**Important: Do not upgrade straight to Que 2.** You will need to first update to the latest 1.x version, apply the Que database schema migration, and deploy, before you can safely begin the process of upgrading to Que 2. See the [2.0.0.beta1 changelog entry](#200beta1-2022-03-24) for details.

See beta 2.0.0.beta1, plus:

- **Fixed**:
1 change: 1 addition & 0 deletions Gemfile
@@ -20,6 +20,7 @@ group :test do
gem 'minitest', '~> 5.10.1'
gem 'minitest-profile', '0.0.2'
gem 'minitest-hooks', '1.4.0'
gem 'minitest-fail-fast', '0.1.0'

gem 'm'

28 changes: 26 additions & 2 deletions docs/README.md
@@ -53,6 +53,7 @@
- [Defining Middleware For Jobs](#defining-middleware-for-jobs)
- [Defining Middleware For SQL statements](#defining-middleware-for-sql-statements)
- [Vacuuming](#vacuuming)
- [Enqueueing jobs in bulk](#enqueueing-jobs-in-bulk)
- [Expired jobs](#expired-jobs)
- [Finished jobs](#finished-jobs)

@@ -836,6 +837,30 @@ class ManualVacuumJob < CronJob
end
```

## Enqueueing jobs in bulk

If you need to enqueue a large number of jobs at once, enqueueing each one separately (and running the notify trigger for each) can become a performance bottleneck. To mitigate this, there is a bulk enqueue interface:

```ruby
Que.bulk_enqueue do
  MyJob.enqueue(user_id: 1)
  MyJob.enqueue(user_id: 2)
  # ...
end
```

The jobs are only actually enqueued at the end of the block, at which point they are inserted into the database in one big query.

Limitations:

- ActiveJob is not supported
- All jobs must use the same job class
- All jobs must use the same `job_options` (`job_options` must be provided to `.bulk_enqueue` instead of `.enqueue`)
- The `que_attrs` of a job instance returned from `.enqueue` is empty (`{}`)
- The notify trigger is not run by default, so jobs will only be picked up by a worker upon its next poll

If you still want the notify trigger to run for each job, use `Que.bulk_enqueue(notify: true) { ... }`.

## Expired jobs

Expired jobs hang around in the `que_jobs` table. If necessary, you can get an expired job to run again by clearing the `error_count` and `expired_at` columns, e.g.:
@@ -850,8 +875,7 @@ If you prefer to leave finished jobs in the database for a while, to performantl

```sql
BEGIN;
- ALTER TABLE que_jobs DISABLE TRIGGER que_state_notify;
+ SET LOCAL que.skip_notify TO true;
DELETE FROM que_jobs WHERE finished_at < (select now() - interval '7 days');
- ALTER TABLE que_jobs ENABLE TRIGGER que_state_notify;
COMMIT;
```
2 changes: 1 addition & 1 deletion lib/que.rb
@@ -69,7 +69,7 @@ class << self

# Copy some commonly-used methods here, for convenience.
def_delegators :pool, :execute, :checkout, :in_transaction?
- def_delegators Job, :enqueue, :run_synchronously, :run_synchronously=
+ def_delegators Job, :enqueue, :bulk_enqueue, :run_synchronously, :run_synchronously=
def_delegators Migrations, :db_version, :migrate!

# Global configuration logic.
134 changes: 122 additions & 12 deletions lib/que/job.rb
@@ -27,6 +27,26 @@ class Job
RETURNING *
}

SQL[:bulk_insert_jobs] =
%{
WITH args_and_kwargs as (
SELECT * from json_to_recordset(coalesce($5, '[{args:{},kwargs:{}}]')::json) as x(args jsonb, kwargs jsonb)
)
INSERT INTO public.que_jobs
(queue, priority, run_at, job_class, args, kwargs, data, job_schema_version)
SELECT
coalesce($1, 'default')::text,
coalesce($2, 100)::smallint,
coalesce($3, now())::timestamptz,
$4::text,
args_and_kwargs.args,
args_and_kwargs.kwargs,
coalesce($6, '{}')::jsonb,
#{Que.job_schema_version}
FROM args_and_kwargs
RETURNING *
}
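
Reviewer note: a minimal sketch of the shape of the `$5` parameter that `json_to_recordset` expands into one row per job. `JSON.generate` stands in for `Que.serialize_json` here (an assumption for illustration), and `jobs_attrs` is made-up sample data, not taken from the commit.

```ruby
require 'json'

# Hypothetical sample of what .enqueue buffers inside a bulk_enqueue block.
jobs_attrs = [
  { job_class: 'MyJob', args: [], kwargs: { user_id: 1 } },
  { job_class: 'MyJob', args: [], kwargs: { user_id: 2 } },
]

# Keep only the per-job parts, mirroring attrs.slice(:args, :kwargs) in the diff.
args_and_kwargs_array = jobs_attrs.map { |attrs| attrs.slice(:args, :kwargs) }

# JSON.generate is a stand-in for Que.serialize_json (assumption).
payload = JSON.generate(args_and_kwargs_array)
puts payload
```

Each element of the array becomes one inserted row, while queue, priority, run_at, job class and data are shared by every row in the statement.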

attr_reader :que_attrs
attr_accessor :que_error, :que_resolved

@@ -78,30 +98,120 @@ def enqueue(*args)
queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue,
priority: job_options[:priority] || resolve_que_setting(:priority),
run_at: job_options[:run_at] || resolve_que_setting(:run_at),
- args: Que.serialize_json(args),
- kwargs: Que.serialize_json(kwargs),
- data: job_options[:tags] ? Que.serialize_json(tags: job_options[:tags]) : "{}",
+ args: args,
+ kwargs: kwargs,
+ data: job_options[:tags] ? { tags: job_options[:tags] } : {},
job_class: \
job_options[:job_class] || name ||
raise(Error, "Can't enqueue an anonymous subclass of Que::Job"),
}

- if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously)
- attrs[:args] = Que.deserialize_json(attrs[:args])
- attrs[:kwargs] = Que.deserialize_json(attrs[:kwargs])
- attrs[:data] = Que.deserialize_json(attrs[:data])
+ if Thread.current[:que_jobs_to_bulk_insert]
+ if self.name == 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper'
+ raise Que::Error, "Que.bulk_enqueue does not support ActiveJob."
+ end
+
+ raise Que::Error, "When using .bulk_enqueue, job_options must be passed to that method rather than .enqueue" unless job_options == {}
+
+ Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs] << attrs
+ new({})
+ elsif attrs[:run_at].nil? && resolve_que_setting(:run_synchronously)
+ attrs.merge!(
+ args: Que.deserialize_json(Que.serialize_json(attrs[:args])),
+ kwargs: Que.deserialize_json(Que.serialize_json(attrs[:kwargs])),
+ data: Que.deserialize_json(Que.serialize_json(attrs[:data])),
+ )
_run_attrs(attrs)
else
- values =
- Que.execute(
- :insert_job,
- attrs.values_at(:queue, :priority, :run_at, :job_class, :args, :kwargs, :data),
- ).first
+ attrs.merge!(
+ args: Que.serialize_json(attrs[:args]),
+ kwargs: Que.serialize_json(attrs[:kwargs]),
+ data: Que.serialize_json(attrs[:data]),
+ )
+ values = Que.execute(
+ :insert_job,
+ attrs.values_at(:queue, :priority, :run_at, :job_class, :args, :kwargs, :data),
+ ).first
new(values)
end
end
ruby2_keywords(:enqueue) if respond_to?(:ruby2_keywords, true)

def bulk_enqueue(job_options: {}, notify: false)
raise Que::Error, "Can't nest .bulk_enqueue" unless Thread.current[:que_jobs_to_bulk_insert].nil?
Thread.current[:que_jobs_to_bulk_insert] = { jobs_attrs: [], job_options: job_options }
yield
jobs_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs]
job_options = Thread.current[:que_jobs_to_bulk_insert][:job_options]
return [] if jobs_attrs.empty?
raise Que::Error, "When using .bulk_enqueue, all jobs enqueued must be of the same job class" unless jobs_attrs.map { |attrs| attrs[:job_class] }.uniq.one?
args_and_kwargs_array = jobs_attrs.map { |attrs| attrs.slice(:args, :kwargs) }
klass = job_options[:job_class] ? Que::Job : Que.constantize(jobs_attrs.first[:job_class])
klass._bulk_enqueue_insert(args_and_kwargs_array, job_options: job_options, notify: notify)
ensure
Thread.current[:que_jobs_to_bulk_insert] = nil
end
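
Reviewer note: the buffering in `bulk_enqueue` relies on a thread-local slot that `.enqueue` checks, plus an `ensure` that always clears it. The toy below sketches that pattern in isolation; `ToyBulk`, its methods and its return symbols are hypothetical and do not touch a database.

```ruby
# Toy stand-in for the thread-local buffering pattern; not Que's actual code.
module ToyBulk
  KEY = :toy_jobs_to_bulk_insert

  def self.enqueue(**kwargs)
    buffer = Thread.current[KEY]
    if buffer
      buffer << kwargs        # deferred: collected for one later insert
      :buffered
    else
      :inserted_immediately   # outside a bulk block, an insert would happen now
    end
  end

  def self.bulk
    raise ArgumentError, "can't nest bulk blocks" unless Thread.current[KEY].nil?
    Thread.current[KEY] = []
    yield
    Thread.current[KEY].dup   # a real implementation would insert these in one query
  ensure
    Thread.current[KEY] = nil # always reset, even if the block raised
  end
end

collected = ToyBulk.bulk do
  ToyBulk.enqueue(user_id: 1)
  ToyBulk.enqueue(user_id: 2)
end
p collected
```

Because the slot is per thread, a bulk block on one thread does not capture jobs enqueued concurrently on another thread.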

def _bulk_enqueue_insert(args_and_kwargs_array, job_options: {}, notify:)
raise 'Unexpected bulk args format' if !args_and_kwargs_array.is_a?(Array) || !args_and_kwargs_array.all? { |a| a.is_a?(Hash) }

if job_options[:tags]
if job_options[:tags].length > MAXIMUM_TAGS_COUNT
raise Que::Error, "Can't enqueue a job with more than #{MAXIMUM_TAGS_COUNT} tags! (passed #{job_options[:tags].length})"
end

job_options[:tags].each do |tag|
if tag.length > MAXIMUM_TAG_LENGTH
raise Que::Error, "Can't enqueue a job with a tag longer than 100 characters! (\"#{tag}\")"
end
end
end

args_and_kwargs_array = args_and_kwargs_array.map do |args_and_kwargs|
args_and_kwargs.merge(
args: args_and_kwargs.fetch(:args, []),
kwargs: args_and_kwargs.fetch(:kwargs, {}),
)
end

attrs = {
queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue,
priority: job_options[:priority] || resolve_que_setting(:priority),
run_at: job_options[:run_at] || resolve_que_setting(:run_at),
args_and_kwargs_array: args_and_kwargs_array,
data: job_options[:tags] ? { tags: job_options[:tags] } : {},
job_class: \
job_options[:job_class] || name ||
raise(Error, "Can't enqueue an anonymous subclass of Que::Job"),
}

if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously)
args_and_kwargs_array = Que.deserialize_json(Que.serialize_json(attrs.delete(:args_and_kwargs_array)))
args_and_kwargs_array.map do |args_and_kwargs|
_run_attrs(
attrs.merge(
args: args_and_kwargs.fetch(:args),
kwargs: args_and_kwargs.fetch(:kwargs),
),
)
end
else
attrs.merge!(
args_and_kwargs_array: Que.serialize_json(attrs[:args_and_kwargs_array]),
data: Que.serialize_json(attrs[:data]),
)
values_array =
Que.transaction do
Que.execute('SET LOCAL que.skip_notify TO true') unless notify
Que.execute(
:bulk_insert_jobs,
attrs.values_at(:queue, :priority, :run_at, :job_class, :args_and_kwargs_array, :data),
)
end
values_array.map(&method(:new))
end
end
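
Reviewer note: in the `run_synchronously` branch the arguments are serialized and immediately deserialized so that a synchronous run sees the same values it would after a database round trip. A minimal sketch of the effect, using plain `JSON.generate` and `JSON.parse` as stand-ins for `Que.serialize_json` and `Que.deserialize_json` (an assumption; Que's own helpers may treat hash keys differently):

```ruby
require 'json'

# Sample arguments containing Symbols, which JSON cannot represent as such.
attrs = { args: [:welcome], kwargs: { user_id: 1, role: :admin } }

# Stand-in round trip (assumption: default JSON options).
round_tripped = JSON.parse(JSON.generate(attrs))

p round_tripped
```

Symbol values come back as Strings, which is the kind of difference the round trip exposes before a job ever reaches a worker.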

def run(*args)
# Make sure things behave the same as they would have with a round-trip
# to the DB.
2 changes: 1 addition & 1 deletion lib/que/migrations.rb
@@ -4,7 +4,7 @@ module Que
module Migrations
# In order to ship a schema change, add the relevant up and down sql files
# to the migrations directory, and bump the version here.
- CURRENT_VERSION = 6
+ CURRENT_VERSION = 7

class << self
def migrate!(version:)
5 changes: 5 additions & 0 deletions lib/que/migrations/7/down.sql
@@ -0,0 +1,5 @@
DROP TRIGGER que_job_notify ON que_jobs;
CREATE TRIGGER que_job_notify
AFTER INSERT ON que_jobs
FOR EACH ROW
EXECUTE PROCEDURE public.que_job_notify();
13 changes: 13 additions & 0 deletions lib/que/migrations/7/up.sql
@@ -0,0 +1,13 @@
DROP TRIGGER que_job_notify ON que_jobs;
CREATE TRIGGER que_job_notify
AFTER INSERT ON que_jobs
FOR EACH ROW
WHEN (NOT coalesce(current_setting('que.skip_notify', true), '') = 'true')
EXECUTE PROCEDURE public.que_job_notify();

DROP TRIGGER que_state_notify ON que_jobs;
CREATE TRIGGER que_state_notify
AFTER INSERT OR UPDATE OR DELETE ON que_jobs
FOR EACH ROW
WHEN (NOT coalesce(current_setting('que.skip_notify', true), '') = 'true')
EXECUTE PROCEDURE public.que_state_notify();
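
Reviewer note: the `WHEN` clause on both triggers reads as "fire unless `que.skip_notify` is exactly `'true'`". A small Ruby model of that condition, where `notify_fires?` is a hypothetical helper and `nil` represents a setting that has never been set in the session:

```ruby
# Models: NOT coalesce(current_setting('que.skip_notify', true), '') = 'true'
# In SQL, NOT applies to the whole comparison, so the trigger fires for
# every value except the exact string 'true'.
def notify_fires?(setting)
  (setting || '') != 'true'
end

[nil, '', 'false', 'true'].each do |value|
  puts "#{value.inspect} -> #{notify_fires?(value)}"
end
```

Only the exact string `'true'` suppresses the notification, so sessions that never set the option keep the pre-migration behaviour.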
1 change: 1 addition & 0 deletions scripts/test
@@ -3,3 +3,4 @@
set -Eeuo pipefail

bundle exec rake spec "$@"
USE_RAILS=true bundle exec rake spec "$@"
25 changes: 25 additions & 0 deletions spec/que/active_job/extensions_spec.rb
@@ -16,6 +16,7 @@ def perform(*args, **kwargs)
after do
Object.send :remove_const, :TestJobClass
$args = nil
$kwargs = nil
end

def execute(&perform_later_block)
@@ -171,5 +172,29 @@ def perform(*args)
assert_equal error, notified_error
end
end

describe 'with bulk_enqueue' do
describe 'ActiveJobClass.perform_later' do
it "is not supported" do
assert_raises_with_message(
Que::Error,
/Que\.bulk_enqueue does not support ActiveJob\./
) do
Que.bulk_enqueue { TestJobClass.perform_later(1, 2) }
end
end
end

describe 'active_job#enqueue' do
it "is not supported" do
assert_raises_with_message(
Que::Error,
/Que\.bulk_enqueue does not support ActiveJob\./
) do
Que.bulk_enqueue { TestJobClass.new.enqueue }
end
end
end
end
end
end