Skip to content

Commit

Permalink
Add support for custom storage backends
Browse files Browse the repository at this point in the history
  • Loading branch information
akadusei committed Aug 2, 2024
1 parent b5986ef commit 057650d
Show file tree
Hide file tree
Showing 39 changed files with 708 additions and 790 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased] -

### Added
- Add support for custom storage backends
- Add `Mel::Job::Template#run` abstract method

### Changed
- Rename `Mel::Carbon::DeliverLaterStrategy` to `DeliverLater`
- Remove `src/worker.cr`

## [0.20.0] - 2024-04-16

Expand Down
62 changes: 41 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

In *Mel*, a scheduled job is called a *task*. A single job may be scheduled in multiple ways, yielding multiple tasks from the same job.

*Mel* schedules all tasks in *Redis*, as a set of task `id`s sorted by their times of next run. For recurring tasks, the next run is scheduled in *Redis* right after the current run completes.
Mel schedules all tasks in the chosen storage backend as a set of task `id`s sorted by their times of next run. For recurring tasks, the next run is scheduled right after the current run completes.

This makes *Redis* the *source of truth* for schedules, allowing to easily scale out *Mel* to multiple instances (called *workers*), or replace or stop workers without losing schedules.
This makes the storage backend the *source of truth* for schedules, allowing to easily scale out *Mel* to multiple instances (called *workers*), or replace or stop workers without losing schedules.

*Mel* supports *bulk scheduling* of jobs as a single atomic unit. There's also support for *sequential scheduling* to track a series of jobs and perform some action after they are all complete.

Expand All @@ -26,11 +26,13 @@ This makes *Redis* the *source of truth* for schedules, allowing to easily scale
dependencies:
mel:
github: GrottoPress/mel
#redis: # Uncomment if using the Redis backend
# github: jgaskins/redis
```

1. Run `shards update`

1. Require and configure *Mel*:
1. Require and configure *Mel* in your app (we'll configure workers later):

```crystal
# ->>> src/app/config.cr
Expand All @@ -43,17 +45,36 @@ This makes *Redis* the *source of truth* for schedules, allowing to easily scale
Mel.configure do |settings|
settings.error_handler = ->(error : Exception) { puts error.message }
settings.redis_url = "redis://localhost:6379/0?initial_pool_size=5&max_idle_pool_size=10"
settings.redis_key_prefix = "mel"
settings.timezone = Time::Location.load("Africa/Accra")
end
Log.setup(Mel.log.source, :info, Log::IOBackend.new)
Redis::Connection::LOG.level = :info
# Redis::Connection::LOG.level = :info # Uncomment if using the Redis backend
# ...
```

- Using the Redis backend

```crystal
# ->>> src/app/config.cr
# ...
require "mel/redis"
Mel.configure do |settings|
# ...
settings.store = Mel::Redis.new(
"redis://localhost:6379/0",
namespace: "mel"
)
# ...
end
# ...
```
## Usage
1. Define job:
Expand Down Expand Up @@ -93,13 +114,13 @@ This makes *Redis* the *source of truth* for schedules, allowing to easily scale
end
# Called in the main fiber before enqueueing the task in
# Redis.
# the store.
def before_enqueue
# ...
end
# Called in the main fiber after enqueueing the task in
# Redis. `success` is `true` only if the enqueue succeeded.
# the store. `success` is `true` only if the enqueue succeeded.
def after_enqueue(success)
if success
# ...
Expand Down Expand Up @@ -188,7 +209,7 @@ This makes *Redis* the *source of truth* for schedules, allowing to easily scale
```crystal
# ->>> src/worker.cr
require "mel/worker"
require "mel"
require "./app/**"
Expand Down Expand Up @@ -369,7 +390,7 @@ Dynamic task IDs may be OK for *triggered* jobs (jobs triggered by some kind of

However, there may be jobs that are scheduled unconditionally when your app starts (*global* jobs). For example, sending invoices at the beginning of every month. You should specify unique **static** IDs for such tasks.

Otherwise, every time the app (re)starts, jobs are scheduled again, each time with a different set of IDs. *Redis* would accept the new schedules because the IDs are different, resulting in duplicated scheduling of the same jobs.
Otherwise, every time the app (re)starts, jobs are scheduled again, each time with a different set of IDs. The store would accept the new schedules because the IDs are different, resulting in duplicated scheduling of the same jobs.

This is particularly important if you run multiple instances of your app. Hardcoding IDs for *global* jobs means that all instances hold the same IDs, so cannot reschedule a job that has already been scheduled by another instance.

Expand Down Expand Up @@ -418,10 +439,9 @@ struct SendAllEmails
# Pushes all jobs atomically, at the end of the block.
#
# There's also `redis#pipeline(&)`, if you do not need the atomicity.
redis.multi do |redis|
# Pass `redis` to `.run_*`.
@users.each { |user| SendEmail.run(redis: redis, user: user) }
transaction do |store|
# Pass `store` to `.run_*`.
@users.each { |user| SendEmail.run(store: store, user: user) }
end
end
Expand Down Expand Up @@ -557,9 +577,9 @@ struct SomeJob
def after_run(success)
return @progress.fail unless success
redis.multi do |redis|
SomeStep.run(redis: redis, progress: @progress)
@progress.move(50, redis) # <= Move to 50%
transaction do |store|
SomeStep.run(store: store, progress: @progress)
@progress.move(50, store) # <= Move to 50%
end
end
Expand All @@ -574,9 +594,9 @@ struct SomeJob
def after_run(success)
return @progress.fail unless success
redis.multi do |redis|
SomeOtherStep.run(redis: redis, progress: @progress)
@progress.move(80, redis) # <= Move to 80%
transaction do |store|
SomeOtherStep.run(store: store, progress: @progress)
@progress.move(80, store) # <= Move to 80%
end
end
end
Expand Down Expand Up @@ -647,7 +667,7 @@ end

A *Mel* worker waits for all running tasks to complete before exiting, if it received a `Signal::INT` or a `Signal::TERM`, or if you called `Mel.stop` somewhere in your code. This means jobs are never lost mid-flight.

Jobs are not lost even if there is a force shutdown of the worker process, since *Mel* does not delete a task from *Redis* until it is complete. The worker can pick off where it left off when it comes back online.
Jobs are not lost even if there is a force shutdown of the worker process, since *Mel* does not delete a task from the store until it is complete. The worker can pick off where it left off when it comes back online.

*Mel* relies on the `worker_id` setting to achieve this. Each worker, therefore, must set a *unique*, *static* integer ID, so it knows which *pending* tasks it owns.

Expand Down
14 changes: 7 additions & 7 deletions benchmark/main.cr
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
require "benchmark"

require "../src/spec"
require "../src/redis"

ITERATIONS = ENV["ITERATIONS"]?.try(&.to_i) || 100_000

Mel.configure do |settings|
settings.batch_size = ENV["BATCH_SIZE"]?.try(&.to_i) || 10_000
settings.poll_interval = 1.microsecond
settings.redis_key_prefix = "melbench"
settings.redis_url = ENV["REDIS_URL"]
settings.store = Mel::Redis.new(ENV["REDIS_URL"], "melbench")
settings.worker_id = ENV["WORKER_ID"].to_i
end

Expand All @@ -37,17 +37,17 @@ struct DoNothing
end

Benchmark.bm do |job|
Mel::Task::Query.truncate
Mel.settings.store.try(&.truncate)

job.report("Sequential schedule #{ITERATIONS} jobs") do
ITERATIONS.times { DoNothing.run(retries: 0) }
end

Mel::Task::Query.truncate
Mel.settings.store.try(&.truncate)

job.report("Bulk schedule #{ITERATIONS} jobs") do
Mel.redis.multi do |redis|
ITERATIONS.times { DoNothing.run(redis: redis, retries: 0) }
Mel.transaction do |store|
ITERATIONS.times { DoNothing.run(store: store, retries: 0) }
end
end

Expand All @@ -56,5 +56,5 @@ Benchmark.bm do |job|

job.report("Run #{ITERATIONS} scheduled jobs") { Mel.start_and_stop(batches) }

Mel::Task::Query.truncate
Mel.settings.store.try(&.truncate)
end
6 changes: 3 additions & 3 deletions shard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ dependencies:
pond:
github: GrottoPress/pond
version: ~> 2.0
redis:
github: jgaskins/redis
version: ~> 0.8.0

development_dependencies:
ameba:
Expand All @@ -28,6 +25,9 @@ development_dependencies:
carbon:
github: luckyframework/carbon
version: ~> 0.2.0
redis:
github: jgaskins/redis
version: ~> 0.8.0
timecop:
github: crystal-community/timecop.cr
version: ~> 0.5.0
38 changes: 0 additions & 38 deletions spec/mel/progress/query_spec.cr

This file was deleted.

37 changes: 0 additions & 37 deletions spec/mel/task/query_spec.cr

This file was deleted.

23 changes: 4 additions & 19 deletions spec/mel/task_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ describe Mel::Task do

Timecop.travel(10.hours.from_now) do
Mel.start_and_stop
Mel::Task.find_lte(Time.local, -1).should be_nil
Mel::Task.find_due(Time.local, -1).should be_nil
end
end
end
Expand Down Expand Up @@ -205,22 +205,7 @@ describe Mel::Task do
end
end

describe ".find_lt" do
it "returns all tasks whose times are past due" do
address = "user@domain.tld"
later = 2.hour.from_now

SendEmailJob.run(address: address)
SendEmailJob.run_in(1.hour, address: address)
SendEmailJob.run_at(later, address: address)

Mel::InstantTask.find(-1).try(&.size).should eq(3)
Mel::InstantTask.find_lt(later).try(&.size).should eq(2)
Mel::InstantTask.find_lt(later, 1).try(&.size).should eq(1)
end
end

describe ".find_lte" do
describe ".find_due" do
it "returns all tasks whose times are either due or past due" do
address = "user@domain.tld"
later = 2.hours.from_now
Expand All @@ -230,8 +215,8 @@ describe Mel::Task do
SendEmailJob.run_at(4.hours.from_now, address: address)

Mel::InstantTask.find(-1).try(&.size).should eq(3)
Mel::InstantTask.find_lte(later).try(&.size).should eq(2)
Mel::InstantTask.find_lte(later, 1).try(&.size).should eq(1)
Mel::InstantTask.find_due(later).try(&.size).should eq(2)
Mel::InstantTask.find_due(later, 1).try(&.size).should eq(1)
end
end
end
5 changes: 5 additions & 0 deletions spec/setup/email.cr
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
BaseEmail.configure do |settings|
settings.adapter = Carbon::DevAdapter.new
settings.deliver_later_strategy = Mel::Carbon::DeliverLater.new
end

Spec.before_each { Carbon::DevAdapter.reset }
Loading

0 comments on commit 057650d

Please sign in to comment.