Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[5.2] Remove need to release jobs that have been reserved too long #13833

Merged
merged 1 commit into from
Jul 2, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 26 additions & 46 deletions src/Illuminate/Queue/DatabaseQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@

use DateTime;
use Carbon\Carbon;
use Illuminate\Support\Collection;
use Illuminate\Database\Connection;
use Illuminate\Queue\Jobs\DatabaseJob;
use Illuminate\Database\Query\Expression;
use Illuminate\Contracts\Queue\Queue as QueueContract;

class DatabaseQueue extends Queue implements QueueContract
Expand Down Expand Up @@ -161,14 +159,10 @@ public function pop($queue = null)
{
$queue = $this->getQueue($queue);

if (! is_null($this->expire)) {
$this->releaseJobsThatHaveBeenReservedTooLong($queue);
}

$this->database->beginTransaction();

if ($job = $this->getNextAvailableJob($queue)) {
$this->markJobAsReserved($job->id);
$job = $this->markJobAsReserved($job);

$this->database->commit();

Expand All @@ -180,38 +174,6 @@ public function pop($queue = null)
$this->database->commit();
}

/**
* Release the jobs that have been reserved for too long.
*
* @param string $queue
* @return void
*/
protected function releaseJobsThatHaveBeenReservedTooLong($queue)
{
if (random_int(1, 10) < 10) {
return;
}

$this->database->beginTransaction();

$stale = $this->database->table($this->table)
->lockForUpdate()
->where('queue', $this->getQueue($queue))
->where('reserved', 1)
->where('reserved_at', '<=', Carbon::now()->subSeconds($this->expire)->getTimestamp())
->get();

$this->database->table($this->table)
->whereIn('id', Collection::make($stale)->pluck('id')->all())
->update([
'reserved' => 0,
'reserved_at' => null,
'attempts' => new Expression('attempts + 1'),
]);

$this->database->commit();
}

/**
* Get the next available job for the queue.
*
Expand All @@ -223,8 +185,18 @@ protected function getNextAvailableJob($queue)
$job = $this->database->table($this->table)
->lockForUpdate()
->where('queue', $this->getQueue($queue))
->where('reserved', 0)
->where('available_at', '<=', $this->getTime())
->where(function ($query) {
// Where not reserved.
$query->where(function ($query) {
$query->where('reserved', 0);
$query->where('available_at', '<=', $this->getTime());
});
// Or where reserved and reservation has expired.
$query->orWhere(function ($query) {
$query->where('reserved', 1);
$query->where('reserved_at', '<=', Carbon::now()->subSeconds($this->expire)->getTimestamp());
});
})
->orderBy('id', 'asc')
->first();

Expand All @@ -234,14 +206,22 @@ protected function getNextAvailableJob($queue)
/**
* Mark the given job ID as reserved.
*
* @param string $id
* @return void
* @param \stdClass $job
* @return \stdClass
*/
protected function markJobAsReserved($id)
protected function markJobAsReserved($job)
{
$this->database->table($this->table)->where('id', $id)->update([
'reserved' => 1, 'reserved_at' => $this->getTime(),
$job->reserved = 1;
$job->reserved_at = $this->getTime();
$job->attempts = ++$job->attempts;

$this->database->table($this->table)->where('id', $job->id)->update([
'reserved' => $job->reserved,
'reserved_at' => $job->reserved_at,
'attempts' => $job->attempts,
]);

return $job;
}

/**
Expand Down
1 change: 0 additions & 1 deletion src/Illuminate/Queue/Jobs/DatabaseJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public function __construct(Container $container, DatabaseQueue $database, $job,
$this->queue = $queue;
$this->database = $database;
$this->container = $container;
$this->job->attempts = $this->job->attempts + 1;
}

/**
Expand Down
218 changes: 218 additions & 0 deletions tests/Queue/QueueDatabaseQueueIntegrationTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
<?php

use Illuminate\Database\Capsule\Manager as DB;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Database\Eloquent\Model as Eloquent;
use \Illuminate\Queue\DatabaseQueue;
use Carbon\Carbon;
use Illuminate\Container\Container;

class QueueDatabaseQueueIntegrationTest extends PHPUnit_Framework_TestCase
{
/**
* @var DatabaseQueue The queue instance.
*/
protected $queue;

/**
* @var string The jobs table name.
*/
protected $table;

/**
* @var Container The IOC container.
*/
protected $container;

public function setUp()
{
$db = new DB;

$db->addConnection([
'driver' => 'sqlite',
'database' => ':memory:',
]);

$db->bootEloquent();

$db->setAsGlobal();

$this->table = 'jobs';

$this->queue = new DatabaseQueue($this->connection(), $this->table);

$this->container = $this->getMock(Container::class);

$this->queue->setContainer($this->container);

$this->createSchema();
}

/**
* Setup the database schema.
*
* @return void
*/
public function createSchema()
{
$this->schema()->create($this->table, function (Blueprint $table) {
$table->bigIncrements('id');
$table->string('queue');
$table->longText('payload');
$table->tinyInteger('attempts')->unsigned();
$table->tinyInteger('reserved')->unsigned();
$table->unsignedInteger('reserved_at')->nullable();
$table->unsignedInteger('available_at');
$table->unsignedInteger('created_at');
$table->index(['queue', 'reserved', 'reserved_at']);
});
}

/**
* Get a database connection instance.
*
* @return \Illuminate\Database\Connection
*/
protected function connection()
{
return Eloquent::getConnectionResolver()->connection();
}

/**
* Get a schema builder instance.
*
* @return Illuminate\Database\Schema\Builder
*/
protected function schema()
{
return $this->connection()->getSchemaBuilder();
}

/**
* Tear down the database schema.
*
* @return void
*/
public function tearDown()
{
$this->schema()->drop('jobs');
}

/**
* Test that jobs that are not reserved and have an available_at value less then now, are popped.
*/
public function testAvailableAndUnReservedJobsArePopped()
{
$this->connection()
->table('jobs')
->insert([
'id' => 1,
'queue' => $mock_queue_name = 'mock_queue_name',
'payload' => 'mock_payload',
'attempts' => 0,
'reserved' => 0,
'reserved_at' => null,
'available_at' => Carbon::now()->subSeconds(1)->getTimestamp(),
'created_at' => Carbon::now()->getTimestamp(),
]);

$popped_job = $this->queue->pop($mock_queue_name);

$this->assertNotNull($popped_job);
}

/**
* Test that when jobs are popped, the attempts attribute is incremented.
*/
public function testPoppedJobsIncrementAttempts()
{
$job = [
'id' => 1,
'queue' => 'mock_queue_name',
'payload' => 'mock_payload',
'attempts' => 0,
'reserved' => 0,
'reserved_at' => null,
'available_at' => Carbon::now()->subSeconds(1)->getTimestamp(),
'created_at' => Carbon::now()->getTimestamp(),
];

$this->connection()->table('jobs')->insert($job);

$popped_job = $this->queue->pop($job['queue']);

$database_record = $this->connection()->table('jobs')->find($job['id']);

$this->assertEquals(1, $database_record->attempts, 'Job attempts not updated in the database!');
$this->assertEquals(1, $popped_job->attempts(), 'The "attempts" attribute of the Job object was not updated by pop!');
}

/**
* Test that jobs that are not reserved and have an available_at value in the future, are not popped.
*/
public function testUnavailableJobsAreNotPopped()
{
$this->connection()
->table('jobs')
->insert([
'id' => 1,
'queue' => $mock_queue_name = 'mock_queue_name',
'payload' => 'mock_payload',
'attempts' => 0,
'reserved' => 0,
'reserved_at' => null,
'available_at' => Carbon::now()->addSeconds(60)->getTimestamp(),
'created_at' => Carbon::now()->getTimestamp(),
]);

$popped_job = $this->queue->pop($mock_queue_name);

$this->assertNull($popped_job);
}

/**
* Test that jobs that are reserved and have expired are popped.
*/
public function testThatReservedAndExpiredJobsArePopped()
{
$this->connection()
->table('jobs')
->insert([
'id' => 1,
'queue' => $mock_queue_name = 'mock_queue_name',
'payload' => 'mock_payload',
'attempts' => 0,
'reserved' => 1,
'reserved_at' => Carbon::now()->subDay()->getTimestamp(),
'available_at' => Carbon::now()->addDay()->getTimestamp(),
'created_at' => Carbon::now()->getTimestamp(),
]);

$popped_job = $this->queue->pop($mock_queue_name);

$this->assertNotNull($popped_job);
}

/**
* Test that jobs that are reserved and not expired and available are not popped.
*/
public function testThatReservedJobsAreNotPopped()
{
$this->connection()
->table('jobs')
->insert([
'id' => 1,
'queue' => $mock_queue_name = 'mock_queue_name',
'payload' => 'mock_payload',
'attempts' => 0,
'reserved' => 1,
'reserved_at' => Carbon::now()->addDay()->getTimestamp(),
'available_at' => Carbon::now()->subDay()->getTimestamp(),
'created_at' => Carbon::now()->getTimestamp(),
]);

$popped_job = $this->queue->pop($mock_queue_name);

$this->assertNull($popped_job);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use Mockery as m;

class QueueDatabaseQueueTest extends PHPUnit_Framework_TestCase
class QueueDatabaseQueueUnitTest extends PHPUnit_Framework_TestCase
{
public function tearDown()
{
Expand Down