diff --git a/lib/que/adapters/active_record_with_lock.rb b/lib/que/adapters/active_record_with_lock.rb index e2556ab..6d7dd35 100644 --- a/lib/que/adapters/active_record_with_lock.rb +++ b/lib/que/adapters/active_record_with_lock.rb @@ -1,29 +1,34 @@ # frozen_string_literal: true -# https://github.com/que-rb/que/blob/80d6067861a41766c3adb7e29b230ce93d94c8a4/lib/que/active_job/extensions.rb module Que module Adapters class ActiveRecordWithLock < Que::Adapters::ActiveRecord - LOCK_PREFIX = ENV["QUE_LOCK_PREFIX"] || 1111 # this is a random number - def initialize(job_connection_pool:, lock_record:) + def initialize(job_connection_pool:, lock_connection_pool:) @job_connection_pool = job_connection_pool - @lock_record = lock_record + @lock_connection_pool = lock_connection_pool super end + def checkout + checkout_lock_database_connection do + checkout_activerecord_adapter { |conn| yield conn.raw_connection } + end + rescue *AR_UNAVAILABLE_CONNECTION_ERRORS => e + raise UnavailableConnection, e + rescue ::ActiveRecord::StatementInvalid => e + raise e unless AR_UNAVAILABLE_CONNECTION_ERRORS.include?(e.cause.class) + + # ActiveRecord::StatementInvalid is one of the most generic exceptions AR can + # raise, so we catch it and only handle the specific nested exceptions. + raise UnavailableConnection, e.cause + end + def checkout_activerecord_adapter(&block) @job_connection_pool.with_connection(&block) end - def lock_database_connection - if Thread.current[:db_connection] - return Thread.current[:db_connection] if Thread.current[:db_connection].active? - end - # We are storing this in thread variable here to make sure - # same connection is used to acquire and release the advisory locks. - # Advisory lock will not be released if any other connection from the - # pool tries to release the lock - Thread.current[:db_connection] = @lock_record.connection + def checkout_lock_database_connection(&block) + @lock_connection_pool.with_connection(&block) end def execute(command, params = []) @@ -52,26 +57,23 @@ def lock_job_with_lock_database(queue, cursor) result end - def cleanup! - @job_connection_pool.release_connection - @lock_record.remove_connection - end - def pg_try_advisory_lock?(job_id) - lock_variable = "#{LOCK_PREFIX}#{job_id}".to_i - lock_database_connection.execute( - "SELECT pg_try_advisory_lock(#{lock_variable})", - ).try(:first)&.fetch("pg_try_advisory_lock") + checkout_lock_database_connection do |conn| + conn.execute( + "SELECT pg_try_advisory_lock(#{job_id})", + ).try(:first)&.fetch("pg_try_advisory_lock") + end end def unlock_job(job_id) - lock_variable = "#{LOCK_PREFIX}#{job_id}".to_i # If for any reason the connection that is used to get this advisory lock # is corrupted, the lock on this job_id would already be released when the # connection holding the lock goes bad. # Now, if a new connection tries to release the non existing lock this would just no op # by returning false and return a warning "WARNING: you don't own a lock of type ExclusiveLock" - lock_database_connection.execute("SELECT pg_advisory_unlock(#{lock_variable})") + checkout_lock_database_connection do |conn| + conn.execute("SELECT pg_advisory_unlock(#{job_id})") + end end end end diff --git a/spec/active_record_with_lock_spec_helper.rb b/spec/active_record_with_lock_spec_helper.rb index 384230f..3c10fd0 100644 --- a/spec/active_record_with_lock_spec_helper.rb +++ b/spec/active_record_with_lock_spec_helper.rb @@ -4,11 +4,11 @@ class LockDatabaseRecord < ActiveRecord::Base establish_connection( adapter: "postgresql", host: ENV.fetch("LOCK_PGHOST", "localhost"), - user: ENV.fetch("LOCK_PGUSER", "ubuntu"), + user: ENV.fetch("LOCK_PGUSER", "postgres"), password: ENV.fetch("LOCK_PGPASSWORD", "password"), database: ENV.fetch("LOCK_PGDATABASE", "lock-test"), port: ENV.fetch("LOCK_PGPORT", 5434), - pool: 6, + pool: 5, ) end @@ -25,6 +25,6 @@ class JobRecord < ActiveRecord::Base def active_record_with_lock_adapter_connection Que::Adapters::ActiveRecordWithLock.new( job_connection_pool: JobRecord.connection_pool, - lock_record: LockDatabaseRecord, + lock_connection_pool: LockDatabaseRecord.connection_pool, ) end