Copy forward concurrency key value when retrying a job, rather than regenerating it #622

Closed
rgaufman opened this issue Jun 21, 2022 · 32 comments · Fixed by #657
Labels
bug Something isn't working

Comments

@rgaufman

Hi there,

I have this in my job:

class HeartbeatJob < ApplicationJob
  good_job_control_concurrency_with(total_limit: 1, key: -> { arguments.to_json })
  queue_as :default

  def perform(started: false, full: false)
    ...

My understanding is there should never be more than 1 job with these arguments, however I see this:

Screenshot 2022-06-21 at 17 50 38

When I expand the arguments, they are all the same, here is an example:

Screenshot 2022-06-21 at 17 50 43

Any ideas?

@bensheldon
Owner

@rgaufman that's very strange. I tried to do a quick reproduction locally and it seemed like total_limit was meeting my expectations.

If you stop and delete all of those jobs, does it work properly?

@rgaufman
Author

rgaufman commented Jul 9, 2022

I stopped and deleted all jobs, I upgraded good_job to 3.0.1 but I'm still seeing this behaviour, not sure what to do next. I am running in async mode:

Screenshot 2022-07-09 at 13 20 00

@rgaufman
Author

rgaufman commented Jul 9, 2022

My guess is maybe there is a race condition where multiple threads are adding the items to the queue? - maybe on retry, it should check the constraint and if so remove the job instead of retrying endlessly?

@bensheldon
Owner

Hmmm. You might see something if you read over the implementation:

https://github.com/bensheldon/good_job/blob/main/lib/good_job/active_job_extensions/concurrency.rb

Also, it would help if you could put an irb/pry breakpoint in there locally and see what concurrency key is being generated and why the logic is or isn't being entered.

Also, can you share your other GoodJob configuration? Maybe there's an interaction with job preservation/deletion.

This also makes me think I should surface some of GoodJob's record attributes in the UI like the concurrency key that's stored in the database so it would show up in these screenshots.

@rgaufman
Author

rgaufman commented Jul 9, 2022

How do I extract the concurrency keys through irb?

My config is:

    config.good_job = {
      preserve_job_records: false,
      cleanup_discarded_jobs: true,
      retry_on_unhandled_error: false,
      on_thread_error: ->(exception) { Airbrake.notify(exception) },
      execution_mode: :async,
      queues: 'snapshots:8;cloud_copy:3;filesystem:4;status:4;*',
      max_threads: 12,
      max_cache: 1000, # Number of jobs to keep in memory, 10k = ~20MB
      poll_interval: 1,
      shutdown_timeout: 30,
      enable_cron: true,
      logger: Rails.logger
    }

I will have a read through the implementation to see if I spot anything, thank you for pointing me in the right direction.

@rgaufman
Author

rgaufman commented Jul 9, 2022

Even from the console, if I run the same job with the same params, they get added:

irb(main):001:0> HeartbeatJob.perform_later(started: nil, full: nil)
=> #<HeartbeatJob:0x00007fbca0656558 @arguments=[{:started=>nil, :full=>nil}], @exception_executions={}, @executions=0, @job_id="e9416ce8-69bc-4092-b8b0-1cfc845215c6", @priority=nil, @provider_job_id="93aaa86d-da35-433b-a424-e6fd9d99a59f", @queue_name="default", @successfully_enqueued=true, @timezone="UTC">
irb(main):002:0> HeartbeatJob.perform_later(started: nil, full: nil)
=> #<HeartbeatJob:0x00007fbca0305f48 @arguments=[{:started=>nil, :full=>nil}], @exception_executions={}, @executions=0, @job_id="8d5cb9b6-35b4-40a2-9dc8-2639789ebad5", @priority=nil, @provider_job_id="103bcde6-6e4c-48f2-be12-7118c766e35c", @queue_name="default", @successfully_enqueued=true, @timezone="UTC">
irb(main):003:0> HeartbeatJob.perform_later(started: nil, full: nil)
=> #<HeartbeatJob:0x00007fbca533fec8 @arguments=[{:started=>nil, :full=>nil}], @exception_executions={}, @executions=0, @job_id="a07ec457-402c-4393-ab16-96d1d2accf06", @priority=nil, @provider_job_id="073d2f5f-d4f6-4dc0-859a-e3374f6c5160", @queue_name="default", @successfully_enqueued=true, @timezone="UTC">
irb(main):004:0> HeartbeatJob.perform_later(started: nil, full: nil)
=> #<HeartbeatJob:0x00007fbca43c6758 @arguments=[{:started=>nil, :full=>nil}], @exception_executions={}, @executions=0, @job_id="a4d1a2a9-ce54-4ed3-bab2-2761e2fab8da", @priority=nil, @provider_job_id="52eb42e9-1940-41fb-b5c8-7170e75dc0de", @queue_name="default", @successfully_enqueued=true, @timezone="UTC">
irb(main):005:0> HeartbeatJob.perform_later(started: nil, full: nil)
=> #<HeartbeatJob:0x00007fbca42cb678 @arguments=[{:started=>nil, :full=>nil}], @exception_executions={}, @executions=0, @job_id="9969ab43-4f58-4d58-a133-5bcf5260880a", @priority=nil, @provider_job_id="3529c9ab-e6f5-40f6-a70d-686515ac3b83", @queue_name="default", @successfully_enqueued=true, @timezone="UTC">
irb(main):006:0> HeartbeatJob.perform_later(started: nil, full: nil)
=> #<HeartbeatJob:0x00007fbca42093e8 @arguments=[{:started=>nil, :full=>nil}], @exception_executions={}, @executions=0, @job_id="e8bac2cb-9878-4dd2-ac8b-5353f5aecdf0", @priority=nil, @provider_job_id="b5563660-6534-45c1-a250-b7d73491ef7e", @queue_name="default", @successfully_enqueued=true, @timezone="UTC">

I can see those jobs in the dashboard:
Screenshot 2022-07-09 at 16 19 05

@rgaufman
Author

rgaufman commented Jul 9, 2022

Picking 2 of those at random, these are the details the dashboard shows:

Screenshot 2022-07-09 at 16 20 26

Screenshot 2022-07-09 at 16 20 36

@bensheldon
Owner

> How do I extract the concurrency keys through irb?

If you're in the context of an ActiveJob instance, you can call good_job_concurrency_key

You can see the GoodJob records that are stored in the database with GoodJob::Job.find(active_job_id). GoodJob::Job is an ActiveRecord object, so you can call where/first/etc. scopes/queries on it, as well as access the record-attributes, e.g.

good_job = GoodJob::Job.order(created_at: :desc).first
good_job.concurrency_key #=> should spit out the key that's stored in the database

@rgaufman
Author

rgaufman commented Jul 9, 2022

irb(main):016:0> puts GoodJob::Job.order(created_at: :desc).select {|j| j.job_class == "HeartbeatJob" }.map(&:concurrency_key)
[{"full":null,"started":null}]
[{"full":null,"started":null}]
[{"full":null,"started":null}]
irb(main):017:0> HeartbeatJob.perform_later(started: nil, full: nil)
=> #<HeartbeatJob:0x00007fbca0798628 @arguments=[{:started=>nil, :full=>nil}], @exception_executions={}, @executions=0, @job_id="117350c4-3e67-4d07-945d-343339ce28f7", @priority=nil, @provider_job_id="2309f4e9-80fc-4c1c-a0bf-24a02efeb802", @queue_name="default", @successfully_enqueued=true, @timezone="UTC">
irb(main):018:0> puts GoodJob::Job.order(created_at: :desc).select {|j| j.job_class == "HeartbeatJob" }.map(&:concurrency_key)
[{"full":null,"started":null}]
[{"full":null,"started":null}]
[{"full":null,"started":null}]
[{"full":null,"started":null}]

@bensheldon
Owner

@rgaufman it sure looks like the concurrency limit isn't being respected. I would expect perform_later to be rejected when you run it in the console.
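
As a rough mental model of the rejection expected here, the check can be pictured as "count the unfinished jobs that share this key, and refuse the enqueue once the limit is reached". The sketch below is an in-memory toy, not GoodJob's implementation: `ToyQueue` and its methods are hypothetical names invented for illustration.

```ruby
# Toy stand-in for a job table with a per-key total limit.
# ToyQueue is hypothetical and only illustrates the counting logic.
class ToyQueue
  def initialize(total_limit:)
    @total_limit = total_limit
    @unfinished = Hash.new(0) # concurrency key => number of unfinished jobs
  end

  # Returns true if the job was accepted, false if the limit rejected it.
  def enqueue(concurrency_key)
    return false if @unfinished[concurrency_key] >= @total_limit

    @unfinished[concurrency_key] += 1
    true
  end

  # Marks one job with this key as finished, freeing a slot.
  def finish(concurrency_key)
    @unfinished[concurrency_key] -= 1 if @unfinished[concurrency_key].positive?
  end
end

queue = ToyQueue.new(total_limit: 1)
key = '[{"full":null,"started":null}]'

puts queue.enqueue(key) # first job is accepted
puts queue.enqueue(key) # second job with the same key is rejected
```

Note that the check is an exact string comparison on the key, so two keys that differ by even one character are counted separately.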

This is what I see in my local dev console when I try to reproduce it. Which is what I expect to see, and I'm not able to reproduce what you're seeing:

[12] pry(main)> GoodJob::Execution.delete_all
  GoodJob::Execution Destroy (1.4ms)  DELETE FROM "good_jobs"
=> 0
[13] pry(main)> HeartbeatJob.perform_later(full: nil, started: nil)
  GoodJob::Lockable Advisory Lock (1.3ms)  SELECT pg_advisory_lock(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint)::text AS locked  [["key", "[{\"full\":null,\"started\":null}]"]]
CONCURRENCY_KEY IS: [{"full":null,"started":null}]
   (0.8ms)  SELECT COUNT(*) FROM "good_jobs" WHERE "good_jobs"."concurrency_key" = $1 AND "good_jobs"."finished_at" IS NULL  [["concurrency_key", "[{\"full\":null,\"started\":null}]"]]
  TRANSACTION (0.3ms)  BEGIN
  GoodJob::Execution Create (1.2ms)  INSERT INTO "good_jobs" ("queue_name", "priority", "serialized_params", "created_at", "updated_at", "active_job_id", "concurrency_key") VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING "id"  [["queue_name", "default"], ["priority", 0], ["serialized_params", "{\"job_class\":\"HeartbeatJob\",\"job_id\":\"62268b92-5e8a-4309-b5a2-a5d2d855c272\",\"provider_job_id\":null,\"queue_name\":\"default\",\"priority\":null,\"arguments\":[{\"full\":null,\"started\":null,\"_aj_ruby2_keywords\":[\"full\",\"started\"]}],\"executions\":0,\"exception_executions\":{},\"locale\":\"en\",\"timezone\":\"UTC\",\"enqueued_at\":\"2022-07-09T15:48:58Z\"}"], ["created_at", "2022-07-09 15:48:58.003698"], ["updated_at", "2022-07-09 15:48:58.003698"], ["active_job_id", "62268b92-5e8a-4309-b5a2-a5d2d855c272"], ["concurrency_key", "[{\"full\":null,\"started\":null}]"]]
  TRANSACTION (2.2ms)  COMMIT
  SQL (0.5ms)  NOTIFY good_job, '{"queue_name":"default"}'
  GoodJob::Lockable Advisory Unlock (0.3ms)  SELECT pg_advisory_unlock(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS unlocked  [["key", "[{\"full\":null,\"started\":null}]"]]
Enqueued HeartbeatJob (Job ID: 62268b92-5e8a-4309-b5a2-a5d2d855c272) to GoodJob(default) with arguments: {:full=>nil, :started=>nil}
=> #<HeartbeatJob:0x000000012398def0
 @arguments=[{:full=>nil, :started=>nil}],
 @exception_executions={},
 @executions=0,
 @job_id="62268b92-5e8a-4309-b5a2-a5d2d855c272",
 @priority=nil,
 @provider_job_id="60a2b354-00fc-4593-a92b-3ca0e27e78ca",
 @queue_name="default",
 @timezone="UTC">
[14] pry(main)> puts GoodJob::Job.order(created_at: :desc).select {|j| j.job_class == "HeartbeatJob" }.map(&:concurrency_key)

  GoodJob::Job Load (0.6ms)  SELECT "good_jobs".* FROM "good_jobs" WHERE "good_jobs"."retried_good_job_id" IS NULL ORDER BY "good_jobs"."created_at" DESC
[{"full":null,"started":null}]
=> nil
[15] pry(main)> HeartbeatJob.perform_later(full: nil, started: nil)
  GoodJob::Lockable Advisory Lock (1.3ms)  SELECT pg_advisory_lock(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint)::text AS locked  [["key", "[{\"full\":null,\"started\":null}]"]]
CONCURRENCY_KEY IS: [{"full":null,"started":null}]
   (0.9ms)  SELECT COUNT(*) FROM "good_jobs" WHERE "good_jobs"."concurrency_key" = $1 AND "good_jobs"."finished_at" IS NULL  [["concurrency_key", "[{\"full\":null,\"started\":null}]"]]
  GoodJob::Lockable Advisory Unlock (0.2ms)  SELECT pg_advisory_unlock(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS unlocked  [["key", "[{\"full\":null,\"started\":null}]"]]
Enqueued HeartbeatJob (Job ID: bde44ed2-b7f2-4b13-b689-f5e10323d40c) to GoodJob(default) with arguments: {:full=>nil, :started=>nil}
=> false
[16] pry(main)> puts GoodJob::Job.order(created_at: :desc).select {|j| j.job_class == "HeartbeatJob" }.map(&:concurrency_key)
  GoodJob::Job Load (1.4ms)  SELECT "good_jobs".* FROM "good_jobs" WHERE "good_jobs"."retried_good_job_id" IS NULL ORDER BY "good_jobs"."created_at" DESC
[{"full":null,"started":null}]
=> nil
[17] pry(main)>

I noticed that you don't have an include GoodJob::ActiveJobExtensions::Concurrency in your HeartbeatJob. Could you share your ApplicationJob? Maybe something is intercepting the around_enqueue. This is a mystery to me.
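
For readers following the SQL in the logs above: the advisory lock is taken on a 64-bit integer derived from the MD5 of the concurrency key string. The expression can be reproduced in plain Ruby as a sketch. The helper name `advisory_lock_id` is made up for this example; only the SQL expression itself comes from the logs.

```ruby
require "digest"

# Ruby equivalent of the SQL expression seen in the logs:
#   ('x' || substr(md5(key), 1, 16))::bit(64)::bigint
# advisory_lock_id is a hypothetical helper name.
def advisory_lock_id(key)
  hex = Digest::MD5.hexdigest(key)[0, 16] # first 16 hex chars = 64 bits
  unsigned = hex.to_i(16)
  # Postgres bigint is signed, so reinterpret the top bit as the sign.
  unsigned >= 2**63 ? unsigned - 2**64 : unsigned
end

full_first    = '[{"full":null,"started":null}]'
started_first = '[{"started":null,"full":null}]'

puts advisory_lock_id(full_first)
puts advisory_lock_id(started_first)
```

Because the lock id is derived from the exact key string, the same arguments serialized in a different order map to a different lock.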

@rgaufman
Author

rgaufman commented Jul 9, 2022

Hmm:

class ApplicationJob < ActiveJob::Base
  include GoodJob::ActiveJobExtensions::Concurrency
end

Running again with debug logging, I see:

irb(main):001:0> HeartbeatJob.perform_later(started: nil, full: nil)
2022-07-09 16:58:29.592996 D [1674374:27980 (irb):1] (0.972ms) ActiveRecord -- GoodJob::Lockable Advisory Lock -- { :sql => "SELECT pg_advisory_lock(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint)::text AS locked", :binds => { :key => "[{\"started\":null,\"full\":null}]" }, :allocations => 9, :cached => nil }
2022-07-09 16:58:29.609951 D [1674374:27980 (irb):1] (3.012ms) ActiveRecord -- GoodJob::Execution Count -- { :sql => "SELECT COUNT(*) FROM \"good_jobs\" WHERE \"good_jobs\".\"concurrency_key\" = $1 AND \"good_jobs\".\"finished_at\" IS NULL", :binds => { :concurrency_key => "[{\"started\":null,\"full\":null}]" }, :allocations => 48, :cached => nil }
2022-07-09 16:58:29.638540 D [1674374:27980 (irb):1] (0.315ms) ActiveRecord -- TRANSACTION -- { :sql => "BEGIN", :allocations => 55, :cached => nil }
2022-07-09 16:58:29.641600 D [1674374:27980 (irb):1] (1.124ms) ActiveRecord -- GoodJob::Execution Create -- { :sql => "INSERT INTO \"good_jobs\" (\"queue_name\", \"priority\", \"serialized_params\", \"scheduled_at\", \"performed_at\", \"finished_at\", \"error\", \"created_at\", \"updated_at\", \"active_job_id\", \"concurrency_key\", \"cron_key\", \"retried_good_job_id\", \"cron_at\") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) RETURNING \"id\"", :binds => { :queue_name => "default", :priority => 0, :serialized_params => "{\"job_class\":\"HeartbeatJob\",\"job_id\":\"81e5fabf-805f-428f-84e1-fae1d5bd1bb2\",\"provider_job_id\":null,\"queue_name\":\"default\",\"priority\":null,\"arguments\":[{\"started\":null,\"full\":null,\"_aj_ruby2_keywords\":[\"started\",\"full\"]}],\"executions\":0,\"exception_executions\":{},\"locale\":\"en\",\"timezone\":\"UTC\",\"enqueued_at\":\"2022-07-09T15:58:29Z\"}", :scheduled_at => nil, :performed_at => nil, :finished_at => nil, :error => nil, :created_at => "2022-07-09 15:58:29.636800", :updated_at => "2022-07-09 15:58:29.636800", :active_job_id => "81e5fabf-805f-428f-84e1-fae1d5bd1bb2", :concurrency_key => "[{\"started\":null,\"full\":null}]", :cron_key => nil, :retried_good_job_id => nil, :cron_at => nil }, :allocations => 33, :cached => nil }
2022-07-09 16:58:29.644797 D [1674374:27980 (irb):1] (0.829ms) ActiveRecord -- TRANSACTION -- { :sql => "COMMIT", :allocations => 8, :cached => nil }
2022-07-09 16:58:29.646296 D [1674374:27980 (irb):1] (0.284ms) ActiveRecord -- SQL -- { :sql => "NOTIFY good_job, '{\"queue_name\":\"default\"}'", :allocations => 8, :cached => nil }
2022-07-09 16:58:29.647697 D [1674374:27980 (irb):1] (0.484ms) ActiveRecord -- GoodJob::Lockable Advisory Unlock -- { :sql => "SELECT pg_advisory_unlock(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS unlocked", :binds => { :key => "[{\"started\":null,\"full\":null}]" }, :allocations => 8, :cached => nil }
2022-07-09 16:58:29.649006 I [1674374:27980 subscriber.rb:149] Rails -- Enqueued HeartbeatJob (Job ID: 81e5fabf-805f-428f-84e1-fae1d5bd1bb2) to GoodJob(default) -- { :event_name => "enqueue.active_job", :adapter => "GoodJob", :queue => "default", :job_class => "HeartbeatJob", :job_id => "81e5fabf-805f-428f-84e1-fae1d5bd1bb2", :provider_job_id => "3cd931c9-0225-40cb-bf76-0880428d9236", :arguments => "[\n  {\n    \"started\": null,\n    \"full\": null\n  }\n]" }
=> #<HeartbeatJob:0x00007efec4e3adb8 @arguments=[{:started=>nil, :full=>nil}], @exception_executions={}, @executions=0, @job_id="81e5fabf-805f-428f-84e1-fae1d5bd1bb2", @priority=nil, @provider_job_id="3cd931c9-0225-40cb-bf76-0880428d9236", @queue_name="default", @successfully_enqueued=true, @timezone="UTC">
irb(main):002:0> HeartbeatJob.perform_later(started: nil, full: nil)
2022-07-09 16:58:30.803454 D [1674374:27980 (irb):2] (0.696ms) ActiveRecord -- GoodJob::Lockable Advisory Lock -- { :sql => "SELECT pg_advisory_lock(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint)::text AS locked", :binds => { :key => "[{\"started\":null,\"full\":null}]" }, :allocations => 28, :cached => nil }
2022-07-09 16:58:30.806409 D [1674374:27980 (irb):2] (0.661ms) ActiveRecord -- GoodJob::Execution Count -- { :sql => "SELECT COUNT(*) FROM \"good_jobs\" WHERE \"good_jobs\".\"concurrency_key\" = $1 AND \"good_jobs\".\"finished_at\" IS NULL", :binds => { :concurrency_key => "[{\"started\":null,\"full\":null}]" }, :allocations => 27, :cached => nil }
2022-07-09 16:58:30.809560 D [1674374:27980 (irb):2] (0.294ms) ActiveRecord -- TRANSACTION -- { :sql => "BEGIN", :allocations => 9, :cached => nil }
2022-07-09 16:58:30.813599 D [1674374:27980 (irb):2] (1.369ms) ActiveRecord -- GoodJob::Execution Create -- { :sql => "INSERT INTO \"good_jobs\" (\"queue_name\", \"priority\", \"serialized_params\", \"scheduled_at\", \"performed_at\", \"finished_at\", \"error\", \"created_at\", \"updated_at\", \"active_job_id\", \"concurrency_key\", \"cron_key\", \"retried_good_job_id\", \"cron_at\") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) RETURNING \"id\"", :binds => { :queue_name => "default", :priority => 0, :serialized_params => "{\"job_class\":\"HeartbeatJob\",\"job_id\":\"a61e0787-00d2-42cf-af44-c64b67a50917\",\"provider_job_id\":null,\"queue_name\":\"default\",\"priority\":null,\"arguments\":[{\"started\":null,\"full\":null,\"_aj_ruby2_keywords\":[\"started\",\"full\"]}],\"executions\":0,\"exception_executions\":{},\"locale\":\"en\",\"timezone\":\"UTC\",\"enqueued_at\":\"2022-07-09T15:58:30Z\"}", :scheduled_at => nil, :performed_at => nil, :finished_at => nil, :error => nil, :created_at => "2022-07-09 15:58:30.808216", :updated_at => "2022-07-09 15:58:30.808216", :active_job_id => "a61e0787-00d2-42cf-af44-c64b67a50917", :concurrency_key => "[{\"started\":null,\"full\":null}]", :cron_key => nil, :retried_good_job_id => nil, :cron_at => nil }, :allocations => 49, :cached => nil }
2022-07-09 16:58:30.817848 D [1674374:27980 (irb):2] (1.294ms) ActiveRecord -- TRANSACTION -- { :sql => "COMMIT", :allocations => 27, :cached => nil }
2022-07-09 16:58:30.820978 D [1674374:27980 (irb):2] (0.816ms) ActiveRecord -- SQL -- { :sql => "NOTIFY good_job, '{\"queue_name\":\"default\"}'", :allocations => 9, :cached => nil }
2022-07-09 16:58:30.824722 D [1674374:27980 (irb):2] (1.790ms) ActiveRecord -- GoodJob::Lockable Advisory Unlock -- { :sql => "SELECT pg_advisory_unlock(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS unlocked", :binds => { :key => "[{\"started\":null,\"full\":null}]" }, :allocations => 28, :cached => nil }
2022-07-09 16:58:30.826282 I [1674374:27980 subscriber.rb:149] Rails -- Enqueued HeartbeatJob (Job ID: a61e0787-00d2-42cf-af44-c64b67a50917) to GoodJob(default) -- { :event_name => "enqueue.active_job", :adapter => "GoodJob", :queue => "default", :job_class => "HeartbeatJob", :job_id => "a61e0787-00d2-42cf-af44-c64b67a50917", :provider_job_id => "2f9fcc47-d40d-480e-b6c9-f2bf09be4774", :arguments => "[\n  {\n    \"started\": null,\n    \"full\": null\n  }\n]" }
=> #<HeartbeatJob:0x00007efec462dd78 @arguments=[{:started=>nil, :full=>nil}], @exception_executions={}, @executions=0, @job_id="a61e0787-00d2-42cf-af44-c64b67a50917", @priority=nil, @provider_job_id="2f9fcc47-d40d-480e-b6c9-f2bf09be4774", @queue_name="default", @successfully_enqueued=true, @timezone="UTC">
irb(main):003:0> puts GoodJob::Job.order(created_at: :desc).select {|j| j.job_class == "HeartbeatJob" }.map(&:concurrency_key)
2022-07-09 16:58:52.574328 D [1674374:27980 (irb):3] (4.958ms) ActiveRecord -- GoodJob::Job Load -- { :sql => "SELECT \"good_jobs\".* FROM \"good_jobs\" WHERE \"good_jobs\".\"retried_good_job_id\" IS NULL ORDER BY \"good_jobs\".\"created_at\" DESC", :allocations => 69, :cached => nil }
[{"full":null,"started":null}]
[{"full":null,"started":null}]
[{"full":null,"started":null}]
=> nil

This is on Postgres 14.4, maybe a bug with the database?

@rgaufman
Author

rgaufman commented Jul 9, 2022

I tried changing my job to:

class HeartbeatJob < ActiveJob::Base
  include GoodJob::ActiveJobExtensions::Concurrency
  good_job_control_concurrency_with(total_limit: 1, key: -> { arguments.to_json })
  queue_as :default

  def perform(started: false, full: false)
    sleep 60
  end
end

But there's no difference, it still queues the jobs, ignoring the total_limit.

This is on Ubuntu 20.04 with Postgresql 14.4-1.pgdg20.04+1

However on my Mac with 14.4 it is working as expected, but why...

@bensheldon
Owner

Weird that you're only seeing it on Ubuntu. Can you drop a binding.irb in right here:

...and then see at that spot what it thinks the current enqueue_concurrency, enqueue_limit and total_limit is?

Otherwise we should probably hop on a zoom and debug this together.

@rgaufman
Author

rgaufman commented Jul 9, 2022

Argh, so when I run this from the development environment, it is working correctly, something is happening in the production environment, trying to figure out what!

I don't have pry in the production environment. Trying to change production.rb line by line to figure out what's breaking this!

@rgaufman
Author

rgaufman commented Jul 9, 2022

The problem goes away when I do config.eager_load = false, hmm

@rgaufman
Author

rgaufman commented Jul 9, 2022

I added some puts statements and I'm seeing this:

irb(main):004:0> HeartbeatJob.perform_later(started: nil, full: nil)
enqueue_limit:
enqueue_concurrency:1
total_limit:1
2022-07-09 17:22:03.517712 I [1699280:27980 subscriber.rb:149] Rails -- Enqueued HeartbeatJob (Job ID: 428c4d67-03bc-418f-89dc-04107cc9c372) to GoodJob(default) -- { :event_name => "enqueue.active_job", :adapter => "GoodJob", :queue => "default", :job_class => "HeartbeatJob", :job_id => "428c4d67-03bc-418f-89dc-04107cc9c372", :provider_job_id => nil, :arguments => "[\n  {\n    \"started\": null,\n    \"full\": null\n  }\n]" }
=> false
irb(main):005:0> HeartbeatJob.perform_later(started: nil, full: nil)
enqueue_limit:
enqueue_concurrency:1
total_limit:1
2022-07-09 17:22:03.980024 I [1699280:27980 subscriber.rb:149] Rails -- Enqueued HeartbeatJob (Job ID: 7c319b22-da88-455e-bf51-84dc2e2b631f) to GoodJob(default) -- { :event_name => "enqueue.active_job", :adapter => "GoodJob", :queue => "default", :job_class => "HeartbeatJob", :job_id => "7c319b22-da88-455e-bf51-84dc2e2b631f", :provider_job_id => nil, :arguments => "[\n  {\n    \"started\": null,\n    \"full\": null\n  }\n]" }

Also:

irb(main):004:0> GoodJob::Execution.where(concurrency_key: '[{"full":null,"started":null}]').count
=> 13

@rgaufman
Author

rgaufman commented Jul 9, 2022

It doesn't make any sense, the key is correct and if I run in the console, it shows:

irb(main):002:0> key = '[{"full":null,"started":null}]'
=> "[{\"full\":null,\"started\":null}]"
irb(main):003:0> GoodJob::Execution.where(concurrency_key: key).unfinished.advisory_unlocked.count
=> 13
irb(main):004:0> GoodJob::Execution.where(concurrency_key: key).unfinished.count
=> 13

But for some reason, it thinks enqueue_concurrency:1

@rgaufman
Author

rgaufman commented Jul 9, 2022

With verbose logging, running from concurrency.rb:

GoodJob::Execution.where(concurrency_key: key).count
2022-07-09 17:30:11.241796 D [1701512:27980 (irb):3] (1.361ms) ActiveRecord -- GoodJob::Execution Count -- { :sql => "SELECT COUNT(*) FROM \"good_jobs\" WHERE \"good_jobs\".\"concurrency_key\" = $1", :binds => { :concurrency_key => "[{\"started\":null,\"full\":null}]" }, :allocations => 46, :cached => nil }
Results => 1

Running manually from the same console:

irb(main):010:0> GoodJob::Execution.where(concurrency_key: key).count
2022-07-09 17:32:00.625427 D [1701512:27980 (irb):10] (1.414ms) ActiveRecord -- GoodJob::Execution Count -- { :sql => "SELECT COUNT(*) FROM \"good_jobs\" WHERE \"good_jobs\".\"concurrency_key\" = $1", :binds => { :concurrency_key => "[{\"full\":null,\"started\":null}]" }, :allocations => 46, :cached => nil }
=> 13

@rgaufman
Author

rgaufman commented Jul 9, 2022

Full output:

$ bin/rails c
Loading production environment (Rails 7.0.3)
irb(main):001:0> key = '[{"full":null,"started":null}]'; GoodJob::Execution.where(concurrency_key: key).count
2022-07-09 17:35:23.258418 D [1702861:27980 (irb):1] (2.295ms) ActiveRecord -- GoodJob::Execution Count -- { :sql => "SELECT COUNT(*) FROM \"good_jobs\" WHERE \"good_jobs\".\"concurrency_key\" = $1", :binds => { :concurrency_key => "[{\"full\":null,\"started\":null}]" }, :allocations => 47, :cached => nil }
=> 13
irb(main):002:0> HeartbeatJob.perform_later(started: nil, full: nil)
2022-07-09 17:35:25.224150 D [1702861:27980 (irb):2] (1.428ms) ActiveRecord -- GoodJob::Lockable Advisory Lock -- { :sql => "SELECT pg_advisory_lock(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint)::text AS locked", :binds => { :key => "[{\"started\":null,\"full\":null}]" }, :allocations => 10, :cached => nil }
2022-07-09 17:35:25.228129 D [1702861:27980 (irb):2] (0.991ms) ActiveRecord -- GoodJob::Execution Count -- { :sql => "SELECT COUNT(*) FROM \"good_jobs\" WHERE \"good_jobs\".\"concurrency_key\" = $1 AND \"good_jobs\".\"finished_at\" IS NULL", :binds => { :concurrency_key => "[{\"started\":null,\"full\":null}]" }, :allocations => 10, :cached => nil }
2022-07-09 17:35:25.230881 D [1702861:27980 (irb):2] (0.935ms) ActiveRecord -- GoodJob::Execution Count -- { :sql => "SELECT COUNT(*) FROM \"good_jobs\" WHERE \"good_jobs\".\"concurrency_key\" = $1", :binds => { :concurrency_key => "[{\"started\":null,\"full\":null}]" }, :allocations => 46, :cached => nil }
manual: 1
key:[{"started":null,"full":null}]
enqueue_limit:
enqueue_concurrency:1
total_limit:1
2022-07-09 17:35:25.232758 D [1702861:27980 (irb):2] (0.493ms) ActiveRecord -- GoodJob::Lockable Advisory Unlock -- { :sql => "SELECT pg_advisory_unlock(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS unlocked", :binds => { :key => "[{\"started\":null,\"full\":null}]" }, :allocations => 10, :cached => nil }
2022-07-09 17:35:25.233949 I [1702861:27980 subscriber.rb:149] Rails -- Enqueued HeartbeatJob (Job ID: 873c4e2c-aad8-414c-951e-6434363548ec) to GoodJob(default) -- { :event_name => "enqueue.active_job", :adapter => "GoodJob", :queue => "default", :job_class => "HeartbeatJob", :job_id => "873c4e2c-aad8-414c-951e-6434363548ec", :provider_job_id => nil, :arguments => "[\n  {\n    \"started\": null,\n    \"full\": null\n  }\n]" }
=> false

@bensheldon
Owner

Hmmmmmm. That is really strange 🙆‍♀️❓❓❓

A stretch, but are you setting GoodJob.active_record_parent_class maybe? That's the only thing I could imagine that would result in it like querying an entirely different database.

Reading those query logs, the exact same COUNT SQL is returning very different results.

@rgaufman
Author

rgaufman commented Jul 9, 2022

Very bizarre indeed! - I am not setting active_record_parent_class and it doesn't seem like any dependencies are either:

deployer@TetherBox-ad27131193d0fa71a629:~/timeagent$ find . -type f | xargs grep active_record_parent_class
./vendor/bundle/ruby/3.1.0/gems/good_job-3.0.1/CHANGELOG.md:- v2.0: Generators support multiple databases: `--database` option, `migrations_paths`, custom `GoodJob.active_record_parent_class` [\#354](https://github.com/bensheldon/good_job/pull/354) ([bensheldon](https://github.com/bensheldon))
./vendor/bundle/ruby/3.1.0/gems/good_job-3.0.1/README.md:- **`GoodJob.active_record_parent_class`** (string) The ActiveRecord parent class inherited by GoodJob's ActiveRecord model `GoodJob::Job` (defaults to `"ActiveRecord::Base"`). Configure this when using [multiple databases with ActiveRecord](https://guides.rubyonrails.org/active_record_multiple_databases.html) or when other custom configuration is necessary for the ActiveRecord model to connect to the Postgres database. _The value must be a String to avoid premature initialization of ActiveRecord._
./vendor/bundle/ruby/3.1.0/gems/good_job-3.0.1/README.md:GoodJob.active_record_parent_class = "ApplicationRecord"
./vendor/bundle/ruby/3.1.0/gems/good_job-3.0.1/README.md:    GoodJob.active_record_parent_class = "ApplicationDirectRecord"
./vendor/bundle/ruby/3.1.0/gems/good_job-3.0.1/lib/models/good_job/base_record.rb:  # Parent class can be configured with +GoodJob.active_record_parent_class+.
./vendor/bundle/ruby/3.1.0/gems/good_job-3.0.1/lib/models/good_job/base_record.rb:  class BaseRecord < Object.const_get(GoodJob.active_record_parent_class)
./vendor/bundle/ruby/3.1.0/gems/good_job-3.0.1/lib/good_job.rb:  # @!attribute [rw] active_record_parent_class
./vendor/bundle/ruby/3.1.0/gems/good_job-3.0.1/lib/good_job.rb:  #     GoodJob.active_record_parent_class = "CustomApplicationRecord"
./vendor/bundle/ruby/3.1.0/gems/good_job-3.0.1/lib/good_job.rb:  mattr_accessor :active_record_parent_class, default: "ActiveRecord::Base"

It's also strange that this only happens with eager_load enabled and works with it disabled, hmm

@bensheldon
Owner

bensheldon commented Jul 9, 2022

Noticed something: the concurrency key you're using initially is [full...started], whereas the concurrency key that is being used in the other query is [started...full]. Maybe the kwarg keys are sorted one way on insert, but a different way on query.

Could you try generating the key deterministically, e.g.

good_job_control_concurrency_with(total_limit: 1, key: -> { arguments.map { |item| item.sort.to_h }.to_json })
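
To see why the ordering matters, here is a small standalone sketch in plain Ruby (no GoodJob involved). Ruby hashes remember insertion order and `to_json` follows it, so two hashes that compare equal can still serialize to different strings. The helper names `naive_key` and `sorted_key` are made up for this example.

```ruby
require "json"

# Key built straight from the arguments: depends on hash insertion order.
def naive_key(arguments)
  arguments.to_json
end

# Key built after sorting each hash's pairs: independent of insertion order.
# Like the suggestion above, this assumes every argument is a Hash.
def sorted_key(arguments)
  arguments.map { |item| item.sort.to_h }.to_json
end

full_first    = [{ full: nil, started: nil }]
started_first = [{ started: nil, full: nil }]

puts full_first == started_first                        # Ruby treats them as equal
puts naive_key(full_first)
puts naive_key(started_first)
puts sorted_key(full_first) == sorted_key(started_first)
```

The two `naive_key` strings differ only in key order, which is enough for a string-equality lookup on the concurrency key to miss existing jobs.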

@rgaufman
Author

rgaufman commented Jul 9, 2022

Yes, that fixes it!! - now I wonder why it's sorting it 1 way vs another way depending on where it's getting called, very strange!

@rgaufman
Author

rgaufman commented Jul 9, 2022

Now interestingly, if I do arguments.to_s, the key is then: '[{"full":null,"started":null}]' - but this also adds duplicates. I wonder if there is a cleaner/simpler way to get uniqueness based on the arguments.

@rgaufman
Author

rgaufman commented Jul 9, 2022

Wait, something weird is going on, the concurrency_key is always [{"full":null,"started":null}] - even when I change my key to:

  good_job_control_concurrency_with(total_limit: 1, key: -> { OpenSSL::Digest.new('md5').base64digest(arguments.to_s) })

Which generates:

2022-07-09 18:02:15.268968 D [1840454:27980 (irb):6] (0.464ms) ActiveRecord -- GoodJob::Lockable Advisory Unlock -- { :sql => "SELECT pg_advisory_unlock(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint) AS unlocked", :binds => { :key => "9e++lYwVRnlsnCqZ9rhj5w==" }, :allocations => 10, :cached => nil }

@rgaufman
Author

rgaufman commented Jul 9, 2022

Oh, I think because good_job is running, it's overwriting my concurrency_key after the job is queued?? - Maybe that is causing some issues and why I can't reproduce in development?

@rgaufman
Author

rgaufman commented Jul 9, 2022

Yes! - It seems the job is added with the correct concurrency key, but the running good_job thread is then changing it to something else. But why? (your sorting suggestion above works and fixes it, but I'm wondering if there's anything cleaner/simpler)

In some of my tasks I now added: OpenSSL::Digest.new('md5').base64digest(arguments.map { |item| item.sort.to_h }.to_json) - this is making me a bit uneasy, doesn't feel like the Ruby way :)

@rgaufman
Author

rgaufman commented Jul 9, 2022

This is the current working command:

  good_job_control_concurrency_with(total_limit: 1, key: -> { OpenSSL::Digest.new('md5').base64digest(arguments.map { |item| item.is_a?(String) ? item : item.sort.to_h }.to_json) })

Yikes lol
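
One way to make the one-liner above easier to read is to pull the normalization into a small helper. This is only a sketch: `normalize_for_key` and `stable_concurrency_key` are hypothetical names, and unlike the one-liner it also recurses into nested hashes and arrays and treats symbol and string keys as the same key, which is an assumption that may not suit every job.

```ruby
require "json"
require "digest"

# Hypothetical helper: recursively rebuild hashes with their keys
# stringified and sorted, so insertion order no longer matters.
def normalize_for_key(value)
  case value
  when Hash
    value
      .sort_by { |k, _| k.to_s }
      .map { |k, v| [k.to_s, normalize_for_key(v)] }
      .to_h
  when Array
    value.map { |item| normalize_for_key(item) }
  else
    value # strings, numbers, nil, booleans pass through unchanged
  end
end

# Hypothetical helper: an order-independent digest of the job arguments.
def stable_concurrency_key(arguments)
  Digest::MD5.base64digest(JSON.generate(normalize_for_key(arguments)))
end

puts stable_concurrency_key([{ full: nil, started: nil }]) ==
     stable_concurrency_key([{ started: nil, full: nil }])
```

With a helper like this defined somewhere the job class can call it, the declaration would shrink to good_job_control_concurrency_with(total_limit: 1, key: -> { stable_concurrency_key(arguments) }).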

@bensheldon
Owner

whew! 😰 I'm so glad we now understand what happened and we have a workaround. I think the kwargs are getting reordered when the job is deserialized, resulting in one concurrency key on the initial enqueue, and a different concurrency key when the job is retried.

I'm going to classify this as a bug. I think the ideal behavior is that the concurrency value is generated only once at the initial enqueue, and then that value is copied forward through subsequent retries.

Thank you so much for working through this with me!
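
The copy-forward behavior described above can be illustrated with a toy model in plain Ruby. Everything here is hypothetical (`ToyJob`, `reload_arguments`, and both retry methods), and reversing the hash is just an artificial stand-in for whatever reordering happens during deserialization. It is not how GoodJob implements retries.

```ruby
require "json"

# Toy job record: the arguments plus the key stored at enqueue time.
ToyJob = Struct.new(:arguments, :concurrency_key, keyword_init: true)

# Same shape of key proc as in the original report: arguments.to_json
KEY_PROC = ->(arguments) { arguments.to_json }

def enqueue(arguments)
  ToyJob.new(arguments: arguments, concurrency_key: KEY_PROC.call(arguments))
end

# Artificial stand-in for deserialization returning kwargs in another order.
def reload_arguments(job)
  job.arguments.map { |item| item.is_a?(Hash) ? item.to_a.reverse.to_h : item }
end

# Problematic variant: the key is recomputed from the reloaded arguments.
def retry_regenerating(job)
  arguments = reload_arguments(job)
  ToyJob.new(arguments: arguments, concurrency_key: KEY_PROC.call(arguments))
end

# Proposed variant: the stored key is copied forward unchanged.
def retry_copying_forward(job)
  ToyJob.new(arguments: reload_arguments(job), concurrency_key: job.concurrency_key)
end

original = enqueue([{ full: nil, started: nil }])
puts original.concurrency_key
puts retry_regenerating(original).concurrency_key
puts retry_copying_forward(original).concurrency_key
```

In this model the regenerated key drifts away from the one stored at enqueue time, while the copied key stays identical across retries no matter how the arguments come back.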

@rgaufman
Author

rgaufman commented Jul 9, 2022

Perfect, thank you, that sounds like a good solution!

@bensheldon bensheldon added the bug Something isn't working label Jul 9, 2022
@bensheldon bensheldon changed the title Jobs are queued despite good_job_control_concurrency_with total_limit being set Copy forward cron_key value when retrying a job, rather than regenerating it Jul 9, 2022
@bensheldon bensheldon changed the title Copy forward cron_key value when retrying a job, rather than regenerating it Copy forward concurrency key value when retrying a job, rather than regenerating it Jul 9, 2022
@rgaufman
Author

This is fantastic and solved my problem, thank you for actioning this so quickly!

@bensheldon
Owner

Yay! 🎉 This has been released in GoodJob v3.0.2
