Skip to content

Commit

Permalink
5.5 database queue - transactions wrapped in closures, retry attempts
Browse files Browse the repository at this point in the history
- fixes #7046
  • Loading branch information
ph4r05 committed Dec 12, 2017
1 parent 242d438 commit 88802fa
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 17 deletions.
3 changes: 2 additions & 1 deletion src/Illuminate/Queue/Connectors/DatabaseConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public function connect(array $config)
$this->connections->connection($config['connection'] ?? null),
$config['table'],
$config['queue'],
$config['retry_after'] ?? 60
$config['retry_after'] ?? 60,
$config['transaction_attempts'] ?? 1
);
}
}
38 changes: 22 additions & 16 deletions src/Illuminate/Queue/DatabaseQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ class DatabaseQueue extends Queue implements QueueContract
*/
protected $retryAfter = 60;

/**
* The transaction retry counter for job select & job delete.
*
* @var int
*/
protected $transactionAttempts = 1;

/**
* Create a new database queue instance.
*
Expand All @@ -47,12 +54,13 @@ class DatabaseQueue extends Queue implements QueueContract
* @param int $retryAfter
* @return void
*/
public function __construct(Connection $database, $table, $default = 'default', $retryAfter = 60)
public function __construct(Connection $database, $table, $default = 'default', $retryAfter = 60, $transactionAttempts = 1)
{
$this->table = $table;
$this->default = $default;
$this->database = $database;
$this->retryAfter = $retryAfter;
$this->transactionAttempts = $transactionAttempts;
}

/**
Expand Down Expand Up @@ -184,18 +192,19 @@ protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts
*
* @param string $queue
* @return \Illuminate\Contracts\Queue\Job|null
* @throws \Exception|\Throwable
*/
public function pop($queue = null)
{
$queue = $this->getQueue($queue);

$this->database->beginTransaction();

if ($job = $this->getNextAvailableJob($queue)) {
return $this->marshalJob($queue, $job);
}
return $this->database->transaction(function () use ($queue) {
if ($job = $this->getNextAvailableJob($queue)) {
return $this->marshalJob($queue, $job);
}

$this->database->commit();
return null;
}, $this->transactionAttempts);
}

/**
Expand Down Expand Up @@ -259,8 +268,6 @@ protected function marshalJob($queue, $job)
{
$job = $this->markJobAsReserved($job);

$this->database->commit();

return new DatabaseJob(
$this->container, $this, $job, $this->connectionName, $queue
);
Expand Down Expand Up @@ -288,16 +295,15 @@ protected function markJobAsReserved($job)
* @param string $queue
* @param string $id
* @return void
* @throws \Exception|\Throwable
*/
public function deleteReserved($queue, $id)
{
$this->database->beginTransaction();

if ($this->database->table($this->table)->lockForUpdate()->find($id)) {
$this->database->table($this->table)->where('id', $id)->delete();
}

$this->database->commit();
$this->database->transaction(function () use ($queue, $id) {
if ($this->database->table($this->table)->lockForUpdate()->find($id)) {
$this->database->table($this->table)->where('id', $id)->delete();
}
}, $this->transactionAttempts);
}

/**
Expand Down

0 comments on commit 88802fa

Please sign in to comment.