Copy forward concurrency key value when retrying a job, rather than regenerating it #622
@rgaufman that's very strange. I tried a quick reproduction locally and it seemed to work as expected. If you stop and delete all of those jobs, does it work properly?
My guess is that maybe there's a race condition where multiple threads are adding the items to the queue? Maybe on retry it should check the constraint and, if it's violated, remove the job instead of retrying endlessly?
Hmmm. You might see something if you read over the implementation: https://github.com/bensheldon/good_job/blob/main/lib/good_job/active_job_extensions/concurrency.rb You could also put an irb/pry breakpoint in there locally and see what concurrency key is being generated and why the logic is or isn't being entered. Also, can you share the rest of your GoodJob configuration? Maybe there's an interaction with job preservation/deletion. This also makes me think I should surface some of GoodJob's record attributes in the UI, like the concurrency key that's stored in the database, so it would show up in these screenshots.
How do I extract the concurrency keys through irb? My config is:
I will have a read through the implementation to see if I spot anything; thank you for pointing me in the right direction.
Even from the console, if I run the same job with the same params, they get added:
If you're in the context of an ActiveJob instance, you can call the concurrency key generator directly. You can see the GoodJob records that are stored in the database with:

good_job = GoodJob::Job.order(created_at: :desc).first
good_job.concurrency_key #=> should spit out the key that's stored in the database
@rgaufman it sure looks like the concurrency limit isn't being respected for you. This is what I see in my local dev console when I try to reproduce it, which is what I expect to see; I'm not able to reproduce what you're seeing:

[12] pry(main)> GoodJob::Execution.delete_all
GoodJob::Execution Destroy (1.4ms) DELETE FROM "good_jobs"
=> 0
[13] pry(main)> HeartbeatJob.perform_later(full: nil, started: nil)
GoodJob::Lockable Advisory Lock (1.3ms) SELECT pg_advisory_lock(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint)::text AS locked [["key", "[{\"full\":null,\"started\":null}]"]]
CONCURRENCY_KEY IS: [{"full":null,"started":null}]
(0.8ms) SELECT COUNT(*) FROM "good_jobs" WHERE "good_jobs"."concurrency_key" = $1 AND "good_jobs"."finished_at" IS NULL [["concurrency_key", "[{\"full\":null,\"started\":null}]"]]
TRANSACTION (0.3ms) BEGIN
GoodJob::Execution Create (1.2ms) INSERT INTO "good_jobs" ("queue_name", "priority", "serialized_params", "created_at", "updated_at", "active_job_id", "concurrency_key") VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING "id" [["queue_name", "default"], ["priority", 0], ["serialized_params", "{\"job_class\":\"HeartbeatJob\",\"job_id\":\"62268b92-5e8a-4309-b5a2-a5d2d855c272\",\"provider_job_id\":null,\"queue_name\":\"default\",\"priority\":null,\"arguments\":[{\"full\":null,\"started\":null,\"_aj_ruby2_keywords\":[\"full\",\"started\"]}],\"executions\":0,\"exception_executions\":{},\"locale\":\"en\",\"timezone\":\"UTC\",\"enqueued_at\":\"2022-07-09T15:48:58Z\"}"], ["created_at", "2022-07-09 15:48:58.003698"], ["updated_at", "2022-07-09 15:48:58.003698"], ["active_job_id", "62268b92-5e8a-4309-b5a2-a5d2d855c272"], ["concurrency_key", "[{\"full\":null,\"started\":null}]"]]
TRANSACTION (2.2ms) COMMIT
SQL (0.5ms) NOTIFY good_job, '{"queue_name":"default"}'
GoodJob::Lockable Advisory Unlock (0.3ms) SELECT pg_advisory_unlock(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS unlocked [["key", "[{\"full\":null,\"started\":null}]"]]
Enqueued HeartbeatJob (Job ID: 62268b92-5e8a-4309-b5a2-a5d2d855c272) to GoodJob(default) with arguments: {:full=>nil, :started=>nil}
=> #<HeartbeatJob:0x000000012398def0
@arguments=[{:full=>nil, :started=>nil}],
@exception_executions={},
@executions=0,
@job_id="62268b92-5e8a-4309-b5a2-a5d2d855c272",
@priority=nil,
@provider_job_id="60a2b354-00fc-4593-a92b-3ca0e27e78ca",
@queue_name="default",
@timezone="UTC">
[14] pry(main)> puts GoodJob::Job.order(created_at: :desc).select {|j| j.job_class == "HeartbeatJob" }.map(&:concurrency_key)
GoodJob::Job Load (0.6ms) SELECT "good_jobs".* FROM "good_jobs" WHERE "good_jobs"."retried_good_job_id" IS NULL ORDER BY "good_jobs"."created_at" DESC
[{"full":null,"started":null}]
=> nil
[15] pry(main)> HeartbeatJob.perform_later(full: nil, started: nil)
GoodJob::Lockable Advisory Lock (1.3ms) SELECT pg_advisory_lock(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint)::text AS locked [["key", "[{\"full\":null,\"started\":null}]"]]
CONCURRENCY_KEY IS: [{"full":null,"started":null}]
(0.9ms) SELECT COUNT(*) FROM "good_jobs" WHERE "good_jobs"."concurrency_key" = $1 AND "good_jobs"."finished_at" IS NULL [["concurrency_key", "[{\"full\":null,\"started\":null}]"]]
GoodJob::Lockable Advisory Unlock (0.2ms) SELECT pg_advisory_unlock(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS unlocked [["key", "[{\"full\":null,\"started\":null}]"]]
Enqueued HeartbeatJob (Job ID: bde44ed2-b7f2-4b13-b689-f5e10323d40c) to GoodJob(default) with arguments: {:full=>nil, :started=>nil}
=> false
[16] pry(main)> puts GoodJob::Job.order(created_at: :desc).select {|j| j.job_class == "HeartbeatJob" }.map(&:concurrency_key)
GoodJob::Job Load (1.4ms) SELECT "good_jobs".* FROM "good_jobs" WHERE "good_jobs"."retried_good_job_id" IS NULL ORDER BY "good_jobs"."created_at" DESC
[{"full":null,"started":null}]
=> nil
[17] pry(main)>

I noticed that you don't have a …
Hmm:
Running again with debug logging, I see:
This is on Postgres 14.4, maybe a bug with the database?
I tried changing my job to:
But there's no difference; it still queues the jobs, ignoring the total_limit. This is on Ubuntu 20.04 with PostgreSQL 14.4-1.pgdg20.04+1. However, on my Mac with 14.4 it works as expected. But why...
Weird that you're only seeing it on Ubuntu. Can you drop a breakpoint in there and then see at that spot what it thinks the current count is? Otherwise we should probably hop on a Zoom and debug this together.
Argh. When I run this from the development environment it works correctly, so something is happening in the production environment; trying to figure out what! I don't have pry in the production environment, so I'm changing production.rb line by line to figure out what's breaking this!
The problem goes away when I set config.eager_load = false, hmm.
I added some puts statements and I'm seeing this:
Also:
It doesn't make any sense; the key is correct, and if I run it in the console, it shows:
But for some reason it thinks enqueue_concurrency is 1.
With verbose logging, running from concurrency.rb:
Running manually from the same console:
Full output: $ bin/rails c
Loading production environment (Rails 7.0.3)
irb(main):001:0> key = '[{"full":null,"started":null}]'; GoodJob::Execution.where(concurrency_key: key).count
2022-07-09 17:35:23.258418 D [1702861:27980 (irb):1] (2.295ms) ActiveRecord -- GoodJob::Execution Count -- { :sql => "SELECT COUNT(*) FROM \"good_jobs\" WHERE \"good_jobs\".\"concurrency_key\" = $1", :binds => { :concurrency_key => "[{\"full\":null,\"started\":null}]" }, :allocations => 47, :cached => nil }
=> 13
irb(main):002:0> HeartbeatJob.perform_later(started: nil, full: nil)
2022-07-09 17:35:25.224150 D [1702861:27980 (irb):2] (1.428ms) ActiveRecord -- GoodJob::Lockable Advisory Lock -- { :sql => "SELECT pg_advisory_lock(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint)::text AS locked", :binds => { :key => "[{\"started\":null,\"full\":null}]" }, :allocations => 10, :cached => nil }
2022-07-09 17:35:25.228129 D [1702861:27980 (irb):2] (0.991ms) ActiveRecord -- GoodJob::Execution Count -- { :sql => "SELECT COUNT(*) FROM \"good_jobs\" WHERE \"good_jobs\".\"concurrency_key\" = $1 AND \"good_jobs\".\"finished_at\" IS NULL", :binds => { :concurrency_key => "[{\"started\":null,\"full\":null}]" }, :allocations => 10, :cached => nil }
2022-07-09 17:35:25.230881 D [1702861:27980 (irb):2] (0.935ms) ActiveRecord -- GoodJob::Execution Count -- { :sql => "SELECT COUNT(*) FROM \"good_jobs\" WHERE \"good_jobs\".\"concurrency_key\" = $1", :binds => { :concurrency_key => "[{\"started\":null,\"full\":null}]" }, :allocations => 46, :cached => nil }
manual: 1
key:[{"started":null,"full":null}]
enqueue_limit:
enqueue_concurrency:1
total_limit:1
2022-07-09 17:35:25.232758 D [1702861:27980 (irb):2] (0.493ms) ActiveRecord -- GoodJob::Lockable Advisory Unlock -- { :sql => "SELECT pg_advisory_unlock(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS unlocked", :binds => { :key => "[{\"started\":null,\"full\":null}]" }, :allocations => 10, :cached => nil }
2022-07-09 17:35:25.233949 I [1702861:27980 subscriber.rb:149] Rails -- Enqueued HeartbeatJob (Job ID: 873c4e2c-aad8-414c-951e-6434363548ec) to GoodJob(default) -- { :event_name => "enqueue.active_job", :adapter => "GoodJob", :queue => "default", :job_class => "HeartbeatJob", :job_id => "873c4e2c-aad8-414c-951e-6434363548ec", :provider_job_id => nil, :arguments => "[\n {\n \"started\": null,\n \"full\": null\n }\n]" }
=> false
Hmmmmmm. That is really strange 🙆‍♀️❓❓❓ A stretch, but are you setting active_record_parent_class anywhere? Reading those query logs, the exact same COUNT SQL is returning very different results.
Very bizarre indeed! I am not setting active_record_parent_class, and it doesn't seem like any dependencies are either:
It's also strange that it's triggered by eager loading but works with it disabled, hmm.
Noticed something: the concurrency key you're generating initially has the arguments serialized in a different order than the one generated later. Could you try setting the key deterministically? e.g.

good_job_control_concurrency_with(total_limit: 1, key: -> { arguments.map { |item| item.sort.to_h }.to_json })
Yes, that fixes it!! Now I wonder why it's sorted one way vs. another depending on where it's called from; very strange!
Now interestingly, if I do arguments.to_s, the key is then '[{"full":null,"started":null}]', but this also adds duplicates. I wonder if there is a cleaner/simpler way to get uniqueness based on arguments.
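The underlying Ruby behavior can be shown in isolation: hashes serialize in insertion order, so two equal hashes can still produce different JSON strings, which is exactly how a re-serialized job can end up with a different key. A minimal demonstration:

```ruby
require "json"

a = { full: nil, started: nil }
b = { started: nil, full: nil }

a == b  # => true; the hashes are equal...

# ...but they serialize in insertion order, producing different strings:
a.to_json  # => '{"full":null,"started":null}'
b.to_json  # => '{"started":null,"full":null}'

# Sorting the keys first yields a stable, order-independent representation:
a.sort.to_h.to_json == b.sort.to_h.to_json  # => true
```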
Wait, something weird is going on, the concurrency_key is always [{"full":null,"started":null}] - even when I change my key to:
Which generates:
Oh, I think because good_job is running, it's overwriting my concurrency_key after the job is queued?? Maybe that is causing some issues, and why I can't reproduce it in development?
Yes! It seems the job is added with the correct concurrency key, but the running good_job thread then changes it to something else. But why? (Your sorting suggestion above works and fixes it, but I'm wondering if there's anything cleaner/simpler.) In some of my tasks I now added: OpenSSL::Digest.new('md5').base64digest(arguments.map { |item| item.sort.to_h }.to_json). This is making me a bit uneasy; it doesn't feel like the Ruby way :)
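For reference, that digest workaround can be wrapped in a small helper. This is only a sketch of the commenter's approach; stable_key is an illustrative name, not a GoodJob API:

```ruby
require "json"
require "openssl"

# Build a short, order-independent key from job arguments by sorting each
# keyword-argument hash before serializing and hashing it. Because the keys
# are sorted first, kwarg reordering during (de)serialization can't change
# the digest.
def stable_key(arguments)
  OpenSSL::Digest.new("md5").base64digest(
    arguments.map { |item| item.sort.to_h }.to_json
  )
end

# The same arguments in either order produce the same key:
stable_key([{ full: nil, started: nil }]) == stable_key([{ started: nil, full: nil }])  # => true
```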
This is the current working command:
Yikes lol
Whew! 😰 I'm so glad we now understand what happened and we have a workaround. I think the kwargs are getting reordered when the job is deserialized, resulting in one concurrency key on the initial enqueue and a different one when the job is retried. I'm going to classify this as a bug. I think the ideal behavior is that the concurrency key is generated only once at the initial enqueue, and then that value is copied forward through subsequent retries. Thank you so much for working through this with me!
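The proposed "generate once, copy forward" behavior can be sketched abstractly. The names below are illustrative only, not GoodJob's actual internals:

```ruby
# On first enqueue there is no stored key, so the generator block runs; on a
# retry the stored value is reused and the (potentially order-sensitive)
# generator is skipped entirely.
def resolve_concurrency_key(stored_key, &generator)
  stored_key || generator.call
end

# First enqueue: no stored key yet, so the generator produces one.
first = resolve_concurrency_key(nil) { %([{"full":null,"started":null}]) }

# Retry: kwargs may have been reordered by deserialization, but the generator
# never runs because the stored key is copied forward.
retried = resolve_concurrency_key(first) { %([{"started":null,"full":null}]) }

first == retried  # => true; the retry keeps the original key
```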
Perfect, thank you, that sounds like a good solution!
This is fantastic and solved my problem, thank you for actioning this so quickly!
Yay! 🎉 This has been released in GoodJob v3.0.2.
Hi there,
I have this in my job:
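(The original snippet was lost in the thread; based on the configuration discussed later, it was presumably something along these lines. This is a reconstruction, not the exact original:)

```ruby
class HeartbeatJob < ApplicationJob
  include GoodJob::ActiveJobExtensions::Concurrency

  # Allow at most one HeartbeatJob, enqueued or running, per set of arguments.
  good_job_control_concurrency_with(total_limit: 1)

  def perform(full: nil, started: nil)
    # ...
  end
end
```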
My understanding is that there should never be more than one job with these arguments; however, I see this:
When I expand the arguments, they are all the same, here is an example:
Any ideas?