-
-
Notifications
You must be signed in to change notification settings - Fork 322
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Anti-patterns Guide #1257
base: main
Are you sure you want to change the base?
Anti-patterns Guide #1257
Conversation
job -> | ||
# Get workflow_id and job_name to create dependency | ||
workflow_id = Workflow.workflow_id(job) | ||
job_name = Workflow.job_name(job) | ||
|
||
# Add new job with dependency on previous job to ensure sequential processing | ||
Workflow.add( | ||
workflow_id, | ||
:process_webhook, | ||
MyApp.Workers.WebhookHandler.new(%{ | ||
"account_id" => account_id, | ||
"event" => event | ||
}), | ||
deps: [job_name] | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pretty sure this is hallucination as the workflow is never inserted into the database; will have to double check
|
||
### Problem | ||
|
||
Oban Pro's `recorded: true` option preserves job results for workflow coordination. However, some developers misuse this feature as a primary storage mechanism for domain data that should reside in dedicated database tables. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This sparked some discussion of "where is the line drawn at?" in my team.
I myself don't quite have a good rule of thumb in this instance, it's kinda of "how much does the entity matter?" question. What do you think?
```elixir | ||
defmodule MyApp.Workers.ExternalEntityImporter do | ||
use Oban.Pro.Workers.Workflow, queue: :imports | ||
|
||
@impl true | ||
def process(%Job{args: %{"external_id" => external_id}}) do | ||
with {:ok, response} <- MyApp.ExternalAPI.fetch_entity(external_id), | ||
# Store the entity in a dedicated database table | ||
{:ok, entity} <- MyApp.Entities.create_entity(%{ | ||
external_id: external_id, | ||
name: response.name, | ||
details: response.details | ||
}) do | ||
# Pass only the reference ID in the job result | ||
{:ok, %{entity_id: entity.id}} | ||
end | ||
end | ||
end | ||
|
||
defmodule MyApp.Workers.ExternalEntityProcessor do | ||
use Oban.Pro.Workers.Workflow, queue: :processing | ||
|
||
@impl true | ||
def process(%Job{args: %{"entity_id" => entity_id}}) do | ||
# Retrieve data from a proper database table | ||
with {:ok, entity} <- MyApp.Entities.get_entity(entity_id) do | ||
MyApp.Processor.process_entity(entity) | ||
end | ||
end | ||
end | ||
``` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Example did not use recorded: true
or fetch_recorded/2
```elixir | |
defmodule MyApp.Workers.ExternalEntityImporter do | |
use Oban.Pro.Workers.Workflow, queue: :imports | |
@impl true | |
def process(%Job{args: %{"external_id" => external_id}}) do | |
with {:ok, response} <- MyApp.ExternalAPI.fetch_entity(external_id), | |
# Store the entity in a dedicated database table | |
{:ok, entity} <- MyApp.Entities.create_entity(%{ | |
external_id: external_id, | |
name: response.name, | |
details: response.details | |
}) do | |
# Pass only the reference ID in the job result | |
{:ok, %{entity_id: entity.id}} | |
end | |
end | |
end | |
defmodule MyApp.Workers.ExternalEntityProcessor do | |
use Oban.Pro.Workers.Workflow, queue: :processing | |
@impl true | |
def process(%Job{args: %{"entity_id" => entity_id}}) do | |
# Retrieve data from a proper database table | |
with {:ok, entity} <- MyApp.Entities.get_entity(entity_id) do | |
MyApp.Processor.process_entity(entity) | |
end | |
end | |
end | |
``` | |
```elixir | |
defmodule MyApp.Workers.ExternalEntityImporter do | |
use Oban.Pro.Workers.Workflow, recorded: true, queue: :imports | |
@impl true | |
def process(%Job{args: %{"external_id" => external_id}}) do | |
with {:ok, response} <- MyApp.ExternalAPI.fetch_entity(external_id), | |
# Store the entity in a dedicated database table | |
{:ok, entity} <- MyApp.Entities.create_entity(%{ | |
external_id: external_id, | |
name: response.name, | |
details: response.details | |
}) do | |
# Pass only the reference ID in the job result | |
{:ok, %{entity_id: entity.id}} | |
end | |
end | |
end | |
defmodule MyApp.Workers.ExternalEntityProcessor do | |
use Oban.Pro.Workers.Workflow, queue: :processing | |
@impl true | |
def process(%Job{args: %{"entity_id" => entity_id}} = job) do | |
# Retrieve data from a proper database table | |
with {:ok, prev_results} <- fetch_recorded(job, "import_entity"), | |
{:ok, entity} <- MyApp.Entities.get_entity(prev_result["entity_id"]) do | |
# Use entity data from the database, but workflow coordination data from recorded | |
MyApp.Processor.process_entity(entity) | |
end | |
end | |
# Helper to fetch recorded results from upstream workflow jobs | |
defp fetch_recorded(job, job_name) do | |
case Oban.Pro.Workers.Workflow.fetch_recorded(job, names: [job_name]) do | |
{:ok, result} -> {:ok, result} | |
{:error, _} -> {:error, :missing_workflow_data} | |
end | |
end | |
end |
|
||
### Refactoring | ||
|
||
Use Chain Workers to ensure proper operation sequencing: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or Workflow
```elixir | ||
# Customize chain behavior for different error scenarios | ||
use Oban.Pro.Workers.Chain, | ||
queue: :external_service, | ||
max_attempts: 3, | ||
# Stop the entire chain if a job is discarded | ||
on_discarded: :halt, | ||
# Continue the chain if a job is cancelled | ||
on_cancelled: :continue | ||
``` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hallucination, will have to update with the proper values (:ignore
, :hold
)
defmodule MyApp.Schema.JobResult do | ||
use Ecto.Schema | ||
import Ecto.Changeset | ||
|
||
schema "job_results" do | ||
field :job_type, :string | ||
field :parameters, :map | ||
field :result, :map | ||
field :year, :integer | ||
field :month, :integer | ||
field :processed_at, :utc_datetime | ||
|
||
timestamps() | ||
end | ||
|
||
def changeset(record, attrs) do | ||
record | ||
|> cast(attrs, [:job_type, :parameters, :result, :year, :month, :processed_at]) | ||
|> validate_required([:job_type, :processed_at]) | ||
end | ||
end | ||
|
||
defmodule MyApp.JobResults do | ||
import Ecto.Query | ||
alias MyApp.Repo | ||
alias MyApp.Schema.JobResult | ||
|
||
def create(attrs) do | ||
%JobResult{} | ||
|> JobResult.changeset(attrs) | ||
|> Repo.insert() | ||
end | ||
|
||
def get_by_period(job_type, year, month) do | ||
JobResult | ||
|> where([r], r.job_type == ^job_type) | ||
|> where([r], r.year == ^year) | ||
|> where([r], r.month == ^month) | ||
|> Repo.all() | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the example is weak, the refactor also seems weak
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using a job result temporarily is fine, which is why there's a recorded
option in the first place. If the result is important in the long term then it should be written to a proper database entity, and certinly not a JobResult
.
|
||
The Smart Engine will dynamically adjust concurrency based on job execution patterns, but it works best when you start with thoughtfully designed queue structure. | ||
|
||
## Anti-Pattern 7: Insufficient Error Handling in Jobs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a new one, not in the issue. It arose from a discussion within my team where it was not clear the distinction on when to use {:discard, any()}
and when to use {:cancel, any()}
My take on this is that :discard
should be used either when attempts are exhausted or we faced a unrecoverable error. And :cancel
should be used when although not successful, it's not an error – kinda like a no-op
return
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Noted on Slack:
Regarding the
cancel
vsdiscard
discussion, the discard tuple is deprecated and undocumented now. Discard should always be because of exhausted retries, and cancel is always a manual action.
{:error, %{status: status}} when status >= 500 -> | ||
# Server error - worth retrying with backoff | ||
backoff = :math.pow(2, attempt) |> round() | ||
Logger.warn("User sync failed with server error: #{status}", args: args) | ||
{:snooze, backoff} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is a bad example on :snooze
. I use it when some resource the job needs is not ready yet, but it should be.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although that may lead to another anti pattern? Infinitely snoozing jobs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to use the backoff provided by Oban itself, either through the worker (backoff/1
) or the Oban.Backoff
module. This would lead to infinitely snoozing jobs, which is bad. Also, I wouldn't include arbitrary logger warnings in example code.
{:error, :not_found} -> | ||
# Don't retry when the user doesn't exist | ||
Logger.info("Cannot sync user #{user_id} - not found", args: args) | ||
{:discard, :user_not_found} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a good example of when to use cancel
? As the job became a no-op
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Definitely use {:cancel, :user_not_found}
rather than {:discard, _}
. And don't include logging.
- ✅ Consider uniqueness constraints for idempotency | ||
- ✅ Use structured error returns rather than exceptions | ||
|
||
## Best Practices for Oban Usage |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This whole section was the AI being too eager, I left it here in case you find it valuable. Though it should be in another guide and not in the anti-patterns one
Related to: #1253
Full disclaimer: I used AI to write the first draft. For the example I either pulled directly from my work's codebase and anonymized it, got from Oban's doc or let the AI cook up an example.
I added one more section than from the issue, as it's related to another thing I noticed from my team: not differentiating between discard and cancel returns.