Transactional inbox/outbox message queuing.
This Rails engine can be mounted in any PostgreSQL-backed Rails app.
The motivating use-case involves event-streaming with Kafka.
Kafka guarantees that messages on the same topic and partition will be read in the same order as written, but we need to ensure we're writing them in the correct order, regardless of any concurrent processes we may be running.
Similarly, once we've read a message from the stream, we'd like to hand off the message processing to an asynchronous job while ensuring we process messages of a given type and key in the same order as read.
The inbox/outbox pattern (see also article) ensures message delivery.
Message ordering is preserved using advisory locks as a synchronization mechanism.
Add to the application's Gemfile:
gem "coil"
gem "schema_version_cache"
Install engine and migrations:
$ bundle install
$ bundle exec rails coil:install:migrations db:migrate
(NOTE: Also run the above commands when upgrading, as newer versions may introduce additional migrations.)
Register periodic jobs:
# config/initializers/sidekiq.rb
Sidekiq.configure_server do |config|
# ...
config.periodic do |mgr|
mgr.register("*/10 * * * *", "Coil::Inbox::MessagesPeriodicJob")
mgr.register("5-59/10 * * * *", "Coil::Outbox::MessagesPeriodicJob")
mgr.register("7-59/20 * * * *", "Coil::Inbox::MessagesCleanupJob")
mgr.register("12-59/20 * * * *", "Coil::Outbox::MessagesCleanupJob")
end
end
(NOTE: The cleanup jobs delete already-processed messages once their retention
period has passed. Retention periods can be configured using
Coil.inbox_retention_period
and Coil.outbox_retention_period
.)
Filter retryable errors out of alerting, e.g. airbrake:
# config/initializers/airbrake.rb
Airbrake.add_filter do |notice|
exception = notice.stash[:exception]
notice.ignore! if exception.is_a?(Coil::TransactionalMessagesJob::RetryableError)
end
Set up schema version cache as described here
Define a message type and corresponding job:
# app/models/inbox/foo_message.rb
class Inbox::FooMessage < Coil::Inbox::Message
def job_class
Inbox::FooMessagesJob
end
end
# app/jobs/inbox/foo_messages_job.rb
class Inbox::FooMessagesJob < Coil::TransactionalMessagesJob
private
# Put message processing logic in this method.
def process(message)
# For example...
uuid = message.key
val = message.value.deep_symbolize_keys
Foo.do_stuff(uuid:, potato: val[:potato])
end
def message_class
Inbox::FooMessage
end
end
(The test-suite contains a working example with type-annotations: message, job)
(For advanced use-cases, you can also define an around_process
job method.
See example)
Receive messages from Kafka:
# app/consumers/my_consumer.rb
class MyConsumer < Racecar::Consumer
FOO = "com.example.service.foo".freeze
subscribes_to FOO
def process(message)
key = AvroMessaging.decode(message.key)
decoded = AvroMessaging.decode_message(message.value)
value = decoded.message.deep_symbolize_keys
schema_id = decoded.schema_id
case message.topic
when FOO
Receivers::FooReceiver.receive(key:, value:, schema_id:)
end
end
end
# app/lib/receivers/foo_receiver.rb
module Receivers::FooReceiver
VALUE_SCHEMA_SUBJECT = "com.example.service.Foo_value"
def self.receive(key:, value:, schema_id:)
schema_version = AvroVersionCache.get_version_number(
subject: VALUE_SCHEMA_SUBJECT,
schema_id:
)
Inbox::FooMessage.create!(
key:,
value:,
metadata: {
value_schema_subject: VALUE_SCHEMA_SUBJECT,
value_schema_version: schema_version,
value_schema_id: schema_id
}
)
end
end
Define a message type and corresponding job:
# app/models/outbox/bar_message.rb
class Outbox::BarMessage < Coil::Outbox::Message
VALUE_SCHEMA_SUBJECT = "com.example.Bar_value"
def job_class
Outbox::BarMessagesJob
end
end
# app/jobs/outbox/bar_messages_job.rb
class Outbox::BarMessagesJob < Coil::TransactionalMessagesJob
private
# Attach schema metadata to message
def pre_process(message)
value_schema_subject = Outbox::BarMessage::VALUE_SCHEMA_SUBJECT
value_schema_id = AvroVersionCache.get_current_id(subject: value_schema_subject)
value_schema_version = AvroVersionCache.get_version_number(
subject: value_schema_subject,
schema_id: value_schema_id
)
metadata = {
value_schema_subject:,
value_schema_id:,
value_schema_version:
}.merge(message.metadata)
message.update!(metadata:)
end
# Write message to Kafka
def process(message)
BarEvent.new(message).produce_async
end
def message_class
Outbox::BarMessage
end
end
(The test-suite contains a working example with type-annotations: message, job)
Write to the outbox:
bar = Bar.first
turnips = bar.count_turnips
Outbox::BarMessage.create!(key: bar.uuid, value: {turnips:})
The inbox and outbox operations described above automatically preserve message ordering by sequentializing the creation and processing of messages with a given type and key.
You can access these synchronization mechanisms directly if necessary:
# If we want to treat turnip-harvesting and message creation as one operation
# and ensure that concurrent attempts to run that operation on the same Bar will
# be run sequentially:
Outbox::BarMessage.locking_persistence_queue(keys: [bar.uuid]) do
bar.harvest_turnips
bar.replant
turnips = bar.count_turnips
Outbox::BarMessage.create!(key: bar.uuid, value: {turnips:})
end
# More generally, we can run an action while holding advisory locks on a list of
# keys in some arbitrary keyspace:
queue_type = "FOOD_PREP"
message_type = "SOUP"
ingredients = ["lentils", "tomato"]
Coil::QueueLocking.locking(queue_type:, message_type:, message_keys: ingredients) do
Chef.make_soup(ingredients)
end
# The `locking` call above will wait until it's able to obtain the requested
# lock. If we'd rather abort the operation than wait for the lock:
Coil::QueueLocking.locking(queue_type:, message_type:, message_keys: ingredients, wait: false) do
Chef.make_soup(ingredients)
rescue Coil::QueueLocking::LockWaitTimeout
puts("Looks like someone else is already on it")
end
To adjust the configurable settings used within your application, create an
initializer at config/initializers/coil.rb
with the following content, then
uncomment and adjust the settings you wish to change:
# Coil.sidekiq_queue = "default"
# Coil.inbox_retention_period = 12.weeks
# Coil.outbox_retention_period = 12.weeks
Install development dependencies:
$ bundle
Install pre-commit hook:
$ bin/install-pre-commit
Setup database:
$ bin/rails db:setup
$ bin/rails db:migrate
Run test-suite:
$ bin/ci-test
Run linter:
$ bin/lint
Regenerate type info for DSLs (e.g. after adding a db migration):
$ bin/tapioca dsl --app-root=spec/dummy
Regenerate type info for gems (e.g. after adding a gem):
$ bin/tapioca gem
Coil's type annotations are declared in rbi/coil.rbi
to facilitate typechecking
by Rails apps that use this engine along with Sorbet. Keeping these annotations
in a separate file avoids foisting a Sorbet runtime dependency on any app that
uses our engine.