Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gush::Client now delegates the ActiveJob.perform_later call to Gush::Job #117

Merged
merged 1 commit into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,37 @@ class NotifyWorkflow < Gush::Workflow
end
```

### Customization of ActiveJob enqueueing

There might be a case when you want to customize enqueing a job with more than just the above two options (`queue` and `wait`).

To pass additional options to `ActiveJob.set`, override `Job#worker_options`, e.g.:

```ruby

class ScheduledJob < Gush::Job

def worker_options
super.merge(wait_until: Time.at(params[:start_at]))
end

end
```

Or to entirely customize the ActiveJob integration, override `Job#enqueue_worker!`, e.g.:

```ruby

class SynchronousJob < Gush::Job

def enqueue_worker!(options = {})
Gush::Worker.perform_now(workflow_id, name)
end

end
```


## Command line interface (CLI)

### Checking status
Expand Down
11 changes: 3 additions & 8 deletions lib/gush/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,9 @@ def expire_job(workflow_id, job, ttl=nil)
def enqueue_job(workflow_id, job)
job.enqueue!
persist_job(workflow_id, job)
queue = job.queue || configuration.namespace
wait = job.wait

if wait.present?
Gush::Worker.set(queue: queue, wait: wait).perform_later(*[workflow_id, job.name])
else
Gush::Worker.set(queue: queue).perform_later(*[workflow_id, job.name])
end

options = { queue: configuration.namespace }.merge(job.worker_options)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The overriding method approach definitely comes together well. It makes for a concise and flexible interface to the Job class. If I put on my horn-rimmed-dark-tinted critic spectacles (and squint pensively) it does read a little odd to have to reach into the #worker_options method to merge back in to the #enqueue_worker! method on the same worker object.

This is partly semantic, and you could clear it up maybe with a #default_worker_options intermediary, or something similar, but I am not sure it is worth the additional layer without actually trying it.

To get anywhere with another approach would probably have to sacrifice some of the simplicity of this approach, so if the maintainers like it then so do I!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review and comments!

I originally considered this alternative:

class Gush::Client
  def enqueue_job(workflow_id, job)
    ...
    job.enqueue_worker!(queue: configuration.namespace)
  end
end

class Gush::Job
  def enqueue_worker!(default_options = {})
    options = default_options.merge(worker_options)
    Gush::Worker.set(options).perform_later(workflow_id, name)
  end
end

but I think it's preferable to have Job#enqueue_worker! receive everything it needs without having to do additional work to get options, and allow the extra line of complexity to be in the Client class, since that's not meant to be overridden.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you already tried the default semantic and have good reason to avoid it, thanks for the reply!

job.enqueue_worker!(options)
end

private
Expand Down
8 changes: 8 additions & 0 deletions lib/gush/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ def enqueue!
@failed_at = nil
end

def enqueue_worker!(options = {})
Gush::Worker.set(options).perform_later(workflow_id, name)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reads great! Simple and clean abstraction. It feels like no matter what else happens around it this should be the goal for the interface to the Worker class. Kudos!

end

def worker_options
{ queue: queue, wait: wait }.compact
end

def finish!
@finished_at = current_timestamp
end
Expand Down
39 changes: 39 additions & 0 deletions spec/gush/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,45 @@
end
end

describe "#enqueue_worker!" do
it "enqueues the job using Gush::Worker" do
job = described_class.new(name: "a-job", workflow_id: 123)

expect {
job.enqueue_worker!
}.to change{ActiveJob::Base.queue_adapter.enqueued_jobs.size}.from(0).to(1)
end

it "handles ActiveJob.set options" do
freeze_time = Time.utc(2023, 01, 21, 14, 36, 0)

travel_to freeze_time do
job = described_class.new(name: "a-job", workflow_id: 123)
job.enqueue_worker!(wait_until: freeze_time + 5.minutes)
expect(Gush::Worker).to have_a_job_enqueued_at(123, job_with_id(job.class.name), 5.minutes)
end
end
end

describe "#worker_options" do
it "returns a blank options hash by default" do
job = described_class.new
expect(job.worker_options).to eq({})
end

it "returns a hash with the queue setting" do
job = described_class.new
job.queue = 'my-queue'
expect(job.worker_options).to eq({ queue: 'my-queue' })
end

it "returns a hash with the wait setting" do
job = described_class.new
job.wait = 123
expect(job.worker_options).to eq({ wait: 123 })
end
end

describe "#start!" do
it "resets flags and marks as running" do
job = described_class.new(name: "a-job")
Expand Down