Skip to content

Commit

Permalink
database queue: job pesimistic -> optimistic locking
Browse files Browse the repository at this point in the history
  • Loading branch information
tillkruss authored and ph4r05 committed Dec 12, 2017
1 parent 242d438 commit 60fcddd
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 18 deletions.
1 change: 1 addition & 0 deletions src/Illuminate/Queue/Console/stubs/jobs.stub
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class Create{{tableClassName}}Table extends Migration
$table->unsignedInteger('reserved_at')->nullable();
$table->unsignedInteger('available_at');
$table->unsignedInteger('created_at');
$table->unsignedInteger('version');
});
}

Expand Down
60 changes: 42 additions & 18 deletions src/Illuminate/Queue/DatabaseQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Illuminate\Queue;

use Illuminate\Database\Query\Expression;
use Illuminate\Support\Carbon;
use Illuminate\Database\Connection;
use Illuminate\Queue\Jobs\DatabaseJob;
Expand Down Expand Up @@ -176,6 +177,7 @@ protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts
'available_at' => $availableAt,
'created_at' => $this->currentTime(),
'payload' => $payload,
'version' => 0
];
}

Expand All @@ -189,13 +191,31 @@ public function pop($queue = null)
{
$queue = $this->getQueue($queue);

$this->database->beginTransaction();

if ($job = $this->getNextAvailableJob($queue)) {
return $this->marshalJob($queue, $job);
}

$this->database->commit();
// Pops one job of the queue or return null if there is no job to process.
//
// In order to preserve job ordering we have to pick the first available job.
// Workers compete for the first available job in the queue.
//
// Load the first available job and try to claim it.
// During the competition it may happen another worker claims the job before we do
// which can be easily handled and detected with optimistic locking.
//
// In that case we try to load another job
// because there are apparently some more jobs in the database and pop() is supposed
// to return such job if there is one or return null if there are no jobs so worker
// can sleep(). Thus we have to attempt to claim jobs until there are some.
$job = null;
do {
if ($job = $this->getNextAvailableJob($queue)) {

// job is not null, try to claim it
$jobClaimed = $this->marshalJob($queue, $job);
if (!empty($jobClaimed)) {
// job was successfully claimed, return it.
return $jobClaimed;
}
}
} while($job);
}

/**
Expand All @@ -207,7 +227,6 @@ public function pop($queue = null)
protected function getNextAvailableJob($queue)
{
$job = $this->database->table($this->table)
->lockForUpdate()
->where('queue', $this->getQueue($queue))
->where(function ($query) {
$this->isAvailable($query);
Expand Down Expand Up @@ -258,8 +277,9 @@ protected function isReservedButExpired($query)
protected function marshalJob($queue, $job)
{
$job = $this->markJobAsReserved($job);

$this->database->commit();
if (empty($job)){
return null;
}

return new DatabaseJob(
$this->container, $this, $job, $this->connectionName, $queue
Expand All @@ -274,12 +294,16 @@ protected function marshalJob($queue, $job)
*/
protected function markJobAsReserved($job)
{
$this->database->table($this->table)->where('id', $job->id)->update([
$affected = $this->database->table($this->table)
->where('id', $job->id)
->where('version', $job->version)
->update([
'reserved_at' => $job->touch(),
'attempts' => $job->increment(),
'version' => new Expression('version + 1'),
]);

return $job;
return $affected ? $job : null;
}

/**
Expand All @@ -291,13 +315,13 @@ protected function markJobAsReserved($job)
*/
public function deleteReserved($queue, $id)
{
$this->database->beginTransaction();

if ($this->database->table($this->table)->lockForUpdate()->find($id)) {
$this->database->table($this->table)->where('id', $id)->delete();
$job = $this->database->table($this->table)->find($id);

This comment has been minimized.

Copy link
@ph4r05

ph4r05 Dec 12, 2017

Owner

We might not need to load the job at all.

I've decided to preserve the original code and just fix the transaction management. But I am wondering what is the reason to lock a job for update by id when the only action then is delete by id.

I cannot see the reason why not just do delete by id directly in the transaction. Deleting directly would be faster and I think it is functionally equivalent. Just wondering (not needed to change it in this PR). Any thoughts? Maybe in 5.6?

if ($job){
$this->database->table($this->table)
->where('id', $id)
->where('version', $job->version)
->delete();
}

$this->database->commit();
}

/**
Expand Down

0 comments on commit 60fcddd

Please sign in to comment.