-
Notifications
You must be signed in to change notification settings - Fork 11.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[5.5] database queue - transactions management fix #22394
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -184,18 +184,24 @@ protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts | |
* | ||
* @param string $queue | ||
* @return \Illuminate\Contracts\Queue\Job|null | ||
* @throws \Exception|\Throwable | ||
*/ | ||
public function pop($queue = null) | ||
{ | ||
$queue = $this->getQueue($queue); | ||
|
||
$this->database->beginTransaction(); | ||
try { | ||
$this->database->beginTransaction(); | ||
|
||
if ($job = $this->getNextAvailableJob($queue)) { | ||
return $this->marshalJob($queue, $job); | ||
} | ||
if ($job = $this->getNextAvailableJob($queue)) { | ||
return $this->marshalJob($queue, $job); | ||
} | ||
|
||
$this->database->commit(); | ||
$this->database->commit(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just noticed @sisve comment. If we want to keep the call to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. marshalJob only executes if we've found a job, we still need to call commit() here to release any locks acquired by getNextAvailableJob There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm. If The commit here terminates the transaction correctly if there are no jobs in DB. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. True that. A little confusing. Would be nice to get this merged into 5.5 and another PR that clean this up targeting 5.6 (after this gets merged). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @deleugpn thanks that would be great I think. |
||
} catch (\Throwable $ex) { | ||
$this->database->rollBack(); | ||
throw $ex; | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -288,16 +294,15 @@ protected function markJobAsReserved($job) | |
* @param string $queue | ||
* @param string $id | ||
* @return void | ||
* @throws \Exception|\Throwable | ||
*/ | ||
public function deleteReserved($queue, $id) | ||
{ | ||
$this->database->beginTransaction(); | ||
|
||
if ($this->database->table($this->table)->lockForUpdate()->find($id)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've decided to preserve the original code and just fix the transaction management. But I am wondering what is the reason to lock a job for update by I cannot see the reason why not just do delete by There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
In this scenario, without the lock I think you run an incredibly small chance of Worker B picking the job right before Worker A deletes it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @deleugpn Thanks for the example! Hmm. If the retry is 10 second the job is eligible for re-run (is considered expired) after 10 seconds worker A claimed it even if the job is still running on worker A. This could be a problem of the timing and probably should not happen. Usually its not desired to process the same job twice and concurrently. I guess one should set retry time so that it is greater than any job that can potentially run. If I get it right, this preemption can happen even with locking - before A execution flow reaches On the other hand, imagine the the worker B took the job that is still being processed by A. A will reach There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK lets wrap this up so we can merge and maybe discuss later for 5.6 patch. Thanks! |
||
$this->database->table($this->table)->where('id', $id)->delete(); | ||
} | ||
|
||
$this->database->commit(); | ||
$this->database->transaction(function () use ($queue, $id) { | ||
if ($this->database->table($this->table)->lockForUpdate()->find($id)) { | ||
$this->database->table($this->table)->where('id', $id)->delete(); | ||
} | ||
}); | ||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
return
statement will prevent the$this->database->commit();
from ever executing. However, there's also acommit
call insidemarshalJob()
. That could lead to confusion. Can we remove the commit call frommarshalJob
and avoid this early return with a temporarily variable?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we discussed removing
commit
with @sisve - he correctly pointed out that it might be a breaking change as themarshalJob()
does not commit the transaction anymore (if any thirdparty code happen to rely on that).This breaking change could be added to 5.6 branch and I would rather then wrap the whole transaction in the closure to make it cleaner.