Skip to content

Commit

Permalink
add nested with connection
Browse files Browse the repository at this point in the history
  • Loading branch information
ankithads committed Aug 8, 2024
1 parent 14acd43 commit 22eca7b
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 27 deletions.
50 changes: 26 additions & 24 deletions lib/que/adapters/active_record_with_lock.rb
Original file line number Diff line number Diff line change
@@ -1,29 +1,34 @@
# frozen_string_literal: true

# https://github.com/que-rb/que/blob/80d6067861a41766c3adb7e29b230ce93d94c8a4/lib/que/active_job/extensions.rb
module Que
module Adapters
class ActiveRecordWithLock < Que::Adapters::ActiveRecord
LOCK_PREFIX = ENV["QUE_LOCK_PREFIX"] || 1111 # this is a random number
def initialize(job_connection_pool:, lock_record:)
def initialize(job_connection_pool:, lock_connection_pool:)
@job_connection_pool = job_connection_pool
@lock_record = lock_record
@lock_connection_pool = lock_connection_pool
super
end

def checkout
checkout_lock_database_connection do
checkout_activerecord_adapter { |conn| yield conn.raw_connection }
end
rescue *AR_UNAVAILABLE_CONNECTION_ERRORS => e
raise UnavailableConnection, e
rescue ::ActiveRecord::StatementInvalid => e
raise e unless AR_UNAVAILABLE_CONNECTION_ERRORS.include?(e.cause.class)

# ActiveRecord::StatementInvalid is one of the most generic exceptions AR can
# raise, so we catch it and only handle the specific nested exceptions.
raise UnavailableConnection, e.cause
end

def checkout_activerecord_adapter(&block)
@job_connection_pool.with_connection(&block)
end

def lock_database_connection
if Thread.current[:db_connection]
return Thread.current[:db_connection] if Thread.current[:db_connection].active?
end
# We are storing this in thread variable here to make sure
# same connection is used to acquire and release the advisory locks.
# Advisory lock will not be released if any other connection from the
# pool tries to release the lock
Thread.current[:db_connection] = @lock_record.connection
def checkout_lock_database_connection(&block)
@lock_connection_pool.with_connection(&block)
end

def execute(command, params = [])
Expand Down Expand Up @@ -52,26 +57,23 @@ def lock_job_with_lock_database(queue, cursor)
result
end

def cleanup!
@job_connection_pool.release_connection
@lock_record.remove_connection
end

def pg_try_advisory_lock?(job_id)
lock_variable = "#{LOCK_PREFIX}#{job_id}".to_i
lock_database_connection.execute(
"SELECT pg_try_advisory_lock(#{lock_variable})",
).try(:first)&.fetch("pg_try_advisory_lock")
checkout_lock_database_connection do |conn|
conn.execute(
"SELECT pg_try_advisory_lock(#{job_id})",
).try(:first)&.fetch("pg_try_advisory_lock")
end
end

def unlock_job(job_id)
lock_variable = "#{LOCK_PREFIX}#{job_id}".to_i
# If for any reason the connection that is used to get this advisory lock
# is corrupted, the lock on this job_id would already be released when the
# connection holding the lock goes bad.
# Now, if a new connection tries to release the non existing lock this would just no op
# by returning false and return a warning "WARNING: you don't own a lock of type ExclusiveLock"
lock_database_connection.execute("SELECT pg_advisory_unlock(#{lock_variable})")
checkout_lock_database_connection do |conn|
conn.execute("SELECT pg_advisory_unlock(#{job_id})")
end
end
end
end
Expand Down
6 changes: 3 additions & 3 deletions spec/active_record_with_lock_spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ class LockDatabaseRecord < ActiveRecord::Base
establish_connection(
adapter: "postgresql",
host: ENV.fetch("LOCK_PGHOST", "localhost"),
user: ENV.fetch("LOCK_PGUSER", "ubuntu"),
user: ENV.fetch("LOCK_PGUSER", "postgres"),
password: ENV.fetch("LOCK_PGPASSWORD", "password"),
database: ENV.fetch("LOCK_PGDATABASE", "lock-test"),
port: ENV.fetch("LOCK_PGPORT", 5434),
pool: 6,
pool: 5,
)
end

Expand All @@ -25,6 +25,6 @@ class JobRecord < ActiveRecord::Base
def active_record_with_lock_adapter_connection
Que::Adapters::ActiveRecordWithLock.new(
job_connection_pool: JobRecord.connection_pool,
lock_record: LockDatabaseRecord,
lock_connection_pool: LockDatabaseRecord.connection_pool,
)
end

0 comments on commit 22eca7b

Please sign in to comment.