Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added sync option to job configuration to run job synchronously / immediately #112

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,20 @@ class NotifyWorkflow < Gush::Workflow
end
```

### Synchronous jobs

There might be a case when you want some jobs to complete synchronously, immediately after the call to `Workflow#start!`. For example, some jobs might complete very quickly and it can be helpful to show the results of those jobs in the response to some web request that started the workflow. Any job with the `sync: true` option is not sent to the queuing adapter but directly executed by blocking the execution of others until it’s finished.

```ruby
class SimpleWorkflow < Gush::Workflow
def configure
run PrepareJob, sync: true
run DownloadJob
end
end
```


## Command line interface (CLI)

### Checking status
Expand Down
7 changes: 5 additions & 2 deletions lib/gush/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,11 @@ def enqueue_job(workflow_id, job)
persist_job(workflow_id, job)
queue = job.queue || configuration.namespace
wait = job.wait

if wait.present?
sync = job.sync

if sync
Gush::Worker.perform_now(*[workflow_id, job.name])
elsif wait.present?
Gush::Worker.set(queue: queue, wait: wait).perform_later(*[workflow_id, job.name])
else
Gush::Worker.set(queue: queue).perform_later(*[workflow_id, job.name])
Expand Down
4 changes: 3 additions & 1 deletion lib/gush/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module Gush
class Job
attr_accessor :workflow_id, :incoming, :outgoing, :params,
:finished_at, :failed_at, :started_at, :enqueued_at, :payloads,
:klass, :queue, :wait
:klass, :sync, :queue, :wait
attr_reader :id, :klass, :output_payload, :params

def initialize(opts = {})
Expand All @@ -14,6 +14,7 @@ def as_json
{
id: id,
klass: klass.to_s,
sync: sync,
queue: queue,
incoming: incoming,
outgoing: outgoing,
Expand Down Expand Up @@ -126,6 +127,7 @@ def assign_variables(opts)
@klass = opts[:klass] || self.class
@output_payload = opts[:output_payload]
@workflow_id = opts[:workflow_id]
@sync = opts[:sync] || false
@queue = opts[:queue]
@wait = opts[:wait]
end
Expand Down
1 change: 1 addition & 0 deletions lib/gush/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def run(klass, opts = {})
id: client.next_free_job_id(id, klass.to_s),
params: opts.fetch(:params, {}),
queue: opts[:queue],
sync: opts[:sync],
wait: opts[:wait]
})

Expand Down
11 changes: 11 additions & 0 deletions spec/gush/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@
end

describe "#start_workflow" do
context "when the job is configured as synchronous" do
it "performs the job immediately" do
workflow = SyncTestWorkflow.create

expect(Gush::Worker).to receive(:perform_now).with(workflow.id, job_with_id("Prepare"))
expect_any_instance_of(ActiveJob::ConfiguredJob).to receive(:perform_later).with(workflow.id, job_with_id('FetchFirstJob'))

client.start_workflow(workflow)
end
end

context "when there is wait parameter configured" do
let(:freeze_time) { Time.utc(2023, 01, 21, 14, 36, 0) }

Expand Down
5 changes: 4 additions & 1 deletion spec/gush/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
finished_at: 123,
enqueued_at: 120,
params: {},
sync: false,
queue: nil,
output_payload: nil,
workflow_id: 123
Expand All @@ -101,7 +102,8 @@
failed_at: 123,
finished_at: 122,
started_at: 55,
enqueued_at: 444
enqueued_at: 444,
sync: true
}
)

Expand All @@ -118,6 +120,7 @@
expect(job.finished_at).to eq(122)
expect(job.started_at).to eq(55)
expect(job.enqueued_at).to eq(444)
expect(job.sync).to eq(true)
end
end
end
7 changes: 7 additions & 0 deletions spec/gush/workflow_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,13 @@ def configure(*args)
expect(flow.jobs.first.wait).to eq (5.seconds)
end

it "allows passing sync param to the job" do
flow = Gush::Workflow.new
flow.run(Gush::Job, sync: true)
flow.save
expect(flow.jobs.first.sync).to eq (true)
end

context "when graph is empty" do
it "adds new job with the given class as a node" do
flow = Gush::Workflow.new
Expand Down
7 changes: 7 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ def configure
end
end

class SyncTestWorkflow < Gush::Workflow
def configure
run Prepare, sync: true
run FetchFirstJob
end
end

REDIS_URL = ENV["REDIS_URL"] || "redis://localhost:6379/12"

module GushHelpers
Expand Down