Skip to content

Commit

Permalink
Add memory storage backend
Browse files Browse the repository at this point in the history
  • Loading branch information
akadusei committed Aug 2, 2024
1 parent 057650d commit 1febae0
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added
- Add support for custom storage backends
- Add memory storage backend
- Add `Mel::Job::Template#run` abstract method

### Changed
Expand Down
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,28 @@ This makes the storage backend the *source of truth* for schedules, allowing to
# ...
```
- Using the Memory backend (Not for production use)
```crystal
# ->>> src/app/config.cr
# ...
require "mel"
Mel.configure do |settings|
# ...
settings.store = Mel::Memory.new
# ...
end
# ...
```
- Skip storage
You may disable storage altogether by setting `Mel.settings.store` to `nil` (This is the default).
## Usage
1. Define job:
Expand Down
2 changes: 1 addition & 1 deletion spec/setup/worker.cr
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Spec.around_each do |spec|
next spec.run
end

{Mel::Redis.new(ENV["REDIS_URL"])}.each do |store|
{Mel::Memory.new, Mel::Redis.new(ENV["REDIS_URL"])}.each do |store|
Mel.settings.store = store
tasks.call
spec.run
Expand Down
2 changes: 2 additions & 0 deletions spec/support/jobs/collect_jobs_job.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
JOBS = Atomic(Int32).new(0)

Spec.before_each { JOBS.lazy_set(0) }

struct CollectJobsJob
include Mel::Job

Expand Down
150 changes: 150 additions & 0 deletions src/mel/memory.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
require "./store"

module Mel
# DO NOT USE IN PRODUCTION:
#
# This is not very useful, unless the app and worker share memory.
# Besides, memory does not provide the persistence that Mel requires.
#
# You may use this for tests or demos.
class Memory
alias Progress = Hash(String, String)
alias Queue = Hash(String, Int64)
alias Tasks = Hash(String, String)

include Store

getter :mutex
getter :progress
getter :queue
getter :tasks

def initialize(
@queue = Queue.new,
@tasks = Tasks.new,
@progress = Progress.new,
@mutex = Mutex.new
)
end

def sorted_queue : Queue
queue.to_a.sort_by!(&.[1]).to_h
end

def find_due(
at time = Time.local,
count : Int = -1, *,
delete : Bool? = false
) : Array(String)?
return if count.zero?

query = ->do
queue = sorted_queue.select { |_, value| 0 <= value <= time.to_unix }
count(queue, count).keys
end

if delete.nil?
ids = lock do query.call.tap { |_ids| to_pending(_ids) } end
return find(ids, delete: false)
end

ids = lock { query.call }
find(ids, delete: delete)
end

def find_pending(count : Int, *, delete : Bool = false) : Array(String)?
return if count.zero?

queue = self.queue.select { |_, value| value == worker_score }
ids = count(queue, count).keys

find(ids, delete: delete)
end

def find(count : Int, *, delete : Bool? = false) : Array(String)?
return if count.zero?

query = ->do
queue = sorted_queue.select { |_, value| value >= 0 }
count(queue, count).keys
end

if delete.nil?
ids = lock do query.call.tap { |_ids| to_pending(_ids) } end
return find(ids, delete: false)
end

ids = lock { query.call }
find(ids, delete: delete)
end

def find(ids : Indexable, *, delete : Bool = false) : Array(String)?
return if ids.empty?

values = lock do
ids.compact_map { |id| delete ? @tasks.delete(id) : @tasks[id]? }
end

values unless values.empty?
end

def transaction(& : Transaction -> _)
yield Transaction.new(queue, tasks, progress)
end

def truncate
@tasks.clear
end

def get_progress(ids : Indexable) : Array(String)?
return if ids.empty?

values = ids.compact_map { |id| @progress[id]? }
values unless values.empty?
end

def truncate_progress
@progress.clear
end

private def count(queue, count)
count < 0 ? queue.to_h : queue.first(count).to_h
end

private def lock
@mutex.synchronize { yield }
end

private def to_pending(ids)
ids.each { |id| queue[id] = worker_score }
end

private def worker_score
-Mel.settings.worker_id.abs.to_i64
end

struct Transaction
include Store::Transaction

def initialize(@queue : Queue, @tasks : Tasks, @progress : Progress)
end

def create(task : Task)
@queue[task.id] ||= task.time.to_unix
@tasks[task.id] ||= task.to_json
end

def update(task : Task)
time = task.retry_time || task.time

@queue[task.id] = time.to_unix
@tasks[task.id] = task.to_json
end

def set_progress(id : String, value : Int, description : String)
report = Mel::Progress::Report.new(id, description, value)
@progress[id] = report.to_json
end
end
end
end

0 comments on commit 1febae0

Please sign in to comment.