This gem contains ActiveJob adapters for Amazon Simple Queue Service (SQS).
Add this gem to your Rails project's Gemfile:
gem 'aws-activejob-sqs', '~> 1'
gem 'aws-sdk-rails', '~> 5'
Then run bundle install
.
This gem also brings in the following AWS gems:
aws-sdk-sqs
You will have to ensure that you provide credentials for the SDK to use. See the latest AWS SDK for Ruby Docs for details.
If you're running your Rails application on Amazon EC2, the AWS SDK will check Amazon EC2 instance metadata for credentials to load. Learn more: IAM Roles for Amazon EC2
To use SQS as your queuing backend, simply set the active_job.queue_adapter
to :sqs
.
# config/environments/production.rb
config.active_job.queue_adapter = :sqs
To use the non-blocking (async) adapter, set active_job.queue_adapter
to
:sqs_async
. If you have a lot of jobs to queue or you need to avoid the extra
latency from an SQS call in your request then consider using the async adapter.
When using the Async adapter, you may want to configure a
async_queue_error_handler
to handle errors that may occur when queuing jobs.
# config/environments/production.rb
config.active_job.queue_adapter = :sqs_async
You can also set the adapter for a single job:
class YourJob < ApplicationJob
self.queue_adapter = :sqs
#....
end
You also need to configure a mapping of ActiveJob queue names to SQS Queue URLs:
# config/aws_active_job_sqs.yml
backpressure: 5 # configure global options for poller
max_messages: 3
queues:
default:
url: 'https://my-queue-url.amazon.aws'
max_messages: 2 # queue specific values override global values
For a complete list of configuration options see the Aws::ActiveJob::SQS::Configuration documentation.
You can configure SQS Active Job either through the environment, yaml file or
through code in your config/<env>.rb
or initializers.
For file based configuration, you can use either
config/aws_active_job_sqs/<Rails.env>.yml
or config/aws_active_job_sqs.yml
.
You may specify the file used through the :config_file
option in code or the
AWS_ACTIVE_JOB_SQS_CONFIG_FILE
environment variable.
The yaml files support ERB.
To configure in code:
Aws::ActiveJob::SQS.configure do |config|
config.logger = Rails.logger
config.max_messages = 5
config.client = Aws::SQS::Client.new(region: 'us-east-1')
end
SQS Active Job loads global and queue specific values from your
environment. Global keys take the form of:
AWS_ACTIVE_JOB_SQS_<KEY_NAME>
and queue specific keys take the
form of: AWS_ACTIVE_JOB_SQS_<QUEUE_NAME>_<KEY_NAME>
.
<QUEUE_NAME> is case-insensitive and is always down cased. Configuring
non-snake case queues (containing upper case) through ENV is
not supported.
Example:
export AWS_ACTIVE_JOB_SQS_MAX_MESSAGES = 5
export AWS_ACTIVE_JOB_SQS_DEFAULT_URL = https://my-queue.aws
To queue a job, you can just use standard ActiveJob methods:
# To queue for immediate processing
YourJob.perform_later(args)
# or to schedule a job for a future time:
YourJob.set(wait: 1.minute).perform_later(args)
Note: Due to limitations in SQS, you cannot schedule jobs for later than 15 minutes in the future.
See the Rails ActiveJob Guide on Exceptions for background on how ActiveJob handles exceptions and retries.
In general - you should configure retries for your jobs using retry_on. When configured, ActiveJob will catch the exception and reschedule the job for re-execution after the configured delay. This will delete the original message from the SQS queue and requeue a new message.
By default, any StandardError
raised during job execution will leave the message
on the queue (retrying it later) and initiate shutdown for the poller
and it will attempt to finish executing any in progress jobs.
You can configure an error handler block for the ActiveJob SQS poller with
poller_error_handler
. Jobs that raise a StandardError
and that do
not handle that error via retry_on
or discard_on
will call the configured
poller_error_handler
with |exception, sqs_message|
.
You may re-raise the exception to terminate the poller. You may also choose
whether to delete the sqs_message or not. If the message is not explicitly
deleted then the message will be left on the queue and will be retried
following the SQS Queue's configured
retry and DLQ settings.
Retries provided by this mechanism are after any retries configured on the job
with retry_on
.
If you do not have a DLQ configured, the message will continue to be attempted until it reaches the queues retention period. In general, it is a best practice to configure a DLQ to store unprocessable jobs for troubleshooting and re-drive.
Configuring retry/redrive for all StandardErrors
:
Aws::ActiveJob::SQS.configure do |config|
config.poller_error_handler do |_exception, _sqs_message|
# no-op, don't delete the message or re-raise the exception
end
end
Configuring different behavior for different exceptions:
module MyErrorHandlers
HANDLE_ERRORS = proc do |exception, sqs_message|
case exception
when MyRetryableException
# no-op, don't delete message
when MyTerminalException
# delete the message, preventing retry
sqs_message.delete
else
# unhandled exceptions, re-raise the exception to terminate the poller
raise exception
end
end
end
Aws::ActiveJob::SQS.configure do |config|
config.poller_error_handler = MyErrorHandlers::HANDLE_ERRORS
end
When using the Async adapter, you may want to configure a
async_queue_error_handler
to handle errors that may occur when queuing jobs.
See
Aws::ActiveJob::SQS::Configuration
for documentation.
To start processing jobs, you need to start a separate process
(in additional to your Rails app) with the aws_active_job_sqs
executable script provided with this gem, eg:
bundle exec aws_active_job_sqs --queue default
.
You can poll for one or multiple queues using the --queue
argument(s).
You can specify multiple queues
in the arguments by passing--queue
multiple times
(eg --queue queue_1 --queue queue_2
) or for all configured queues by
providing no queue arguments. When multiple queues are specified, 1 thread
per queue is started to run the poller for that queue. All queues share a
single, common thread pool for executing jobs.
It is generally recommended to start one polling process per queue instead of running a single poller for all queues as this generally allows you to better manage the resources used.
RAILS_ENV=development bundle exec aws_active_job_sqs --queue default
To see a complete list of arguments use --help
.
You can kill the process at any time with CTRL+C
- the processor will attempt
to shutdown cleanly and will wait up to :shutdown_timeout
seconds for all
actively running jobs to finish before killing them.
Note: When running in production, its recommended that use a process supervisor such as foreman, systemd, upstart, daemontools, launchd, runit, etc.
By default the aws_active_job_sqs
script will require and boot rails
using your config/environment.rb
; however, you can start aws_active_job_sqs
with --no-rails
to run the poller without Rails. You can specify an
additional file that includes required Job/application definitions with
--require
. Example:
bundle exec aws_active_job_sqs --queue default --no-rails --require my_jobs.rb
Rather than managing the worker processes yourself, you can use Lambda with an SQS Trigger. With Lambda Container Image Support and the lambda handler provided with this gem, it's easy to use lambda to run ActiveJobs for your dockerized rails app (see below for some tips).
All you need to do is:
- include the aws_lambda_ric gem
- Push your image to ECR
- Create a lambda function from your image (see the lambda docs for details).
- Add an SQS Trigger for the queue(s) you want to process jobs from.
- Set the ENTRYPOINT to
/usr/local/bundle/bin/aws_lambda_ric
and the CMD toconfig/environment.Aws::ActiveJob::SQS.lambda_job_handler
- this will load Rails and then use the lambda handler. You can do this either as function config or in your Dockerfile.
There are a few limitations/requirements for lambda container images: the default lambda user must be able to read all the files and the image must be able to run on a read only file system. You may need to disable bootsnap, set a HOME env variable and set the logger to STDOUT (which lambda will record to cloudwatch for you).
You can use the RAILS_ENV to control environment. If you need to execute specific configuration in the lambda, you can create a ruby file and use it as your entrypoint:
# app.rb
# some custom config
require_relative 'config/environment' # load rails
# Rails.config.custom....
# Aws::ActiveJob::SQS.configure....
# no need to write a handler yourself here, as long as
# aws-sdk-rails is loaded, you can still use the
# Aws::ActiveJob::SQS::LambdaHandler.job_handler
# To use this file, set CMD: app.Aws::ActiveJob::SQS::LambdaHandler.job_handler
If the order in which your jobs executes is important, consider using a FIFO Queue. A FIFO queue ensures that messages are processed in the order they were sent (First-In-First-Out) and exactly-once processing (ensuring duplicates are never introduced into the queue). To use a fifo queue, simply set the queue url (which will end in ".fifo") in your config.
When using FIFO queues, jobs will NOT be processed concurrently by the poller
to ensure the correct ordering. Additionally, all jobs on a FIFO queue will be queued
synchronously, even if you have configured the sqs_async
adapter.
FIFO queues support Message deduplication ID, which is the token used for deduplication of sent messages. If a message with a particular message deduplication ID is sent successfully, any messages sent with the same message deduplication ID are accepted successfully but aren't delivered during the 5-minute deduplication interval.
If necessary, the deduplication key used to create the message deduplication ID can be customized:
Aws::ActiveJob::SQS.configure do |config|
config.excluded_deduplication_keys = [:job_class, :arguments]
end
# Or to set deduplication keys to exclude for a single job:
class YourJob < ApplicationJob
include Aws::ActiveJob::SQS::Deduplication
deduplicate_without :job_class, :arguments
#...
end
By default, the following keys are used for deduplication keys:
job_class, provider_job_id, queue_name, priority, arguments,
executions, exception_executions, locale, timezone, enqueued_at
Note that job_id
is NOT included in deduplication keys because it is unique
for each initialization of the job, and the run-once behavior must be guaranteed
for ActiveJob retries. Even without setting job_id
, it is implicitly excluded
from deduplication keys.
FIFO queues require a message group id to be provided for the job. It is determined by:
- Calling
message_group_id
on the job if it is defined - If
message_group_id
is not defined or the result isnil
, the default value will be used. You can optionally specify a custom value in your config as the default that will be used by all jobs.
AWS Activejob SQS is supported as an interrupt adaptor by job-iteration.