Skip to content

Commit

Permalink
Extract job delay calculations.
Browse files Browse the repository at this point in the history
  • Loading branch information
taylorotwell committed Dec 29, 2016
1 parent 7ee233f commit 9bc8ca5
Show file tree
Hide file tree
Showing 12 changed files with 96 additions and 127 deletions.
32 changes: 16 additions & 16 deletions src/Illuminate/Contracts/Queue/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ interface Job
*/
public function fire();

/**
* Release the job back into the queue.
*
* @param int $delay
* @return void
*/
public function release($delay = 0);

/**
* Delete the job from the queue.
*
Expand All @@ -25,14 +33,6 @@ public function delete();
*/
public function isDeleted();

/**
* Release the job back into the queue.
*
* @param int $delay
* @return void
*/
public function release($delay = 0);

/**
* Determine if the job has been deleted or released.
*
Expand All @@ -47,6 +47,14 @@ public function isDeletedOrReleased();
*/
public function attempts();

/**
* Process an exception that caused the job to fail.
*
* @param \Throwable $e
* @return void
*/
public function failed($e);

/**
* Get the name of the queued job class.
*
Expand All @@ -63,14 +71,6 @@ public function getName();
*/
public function resolveName();

/**
* Process an exception that caused the job to fail.
*
* @param \Throwable $e
* @return void
*/
public function failed($e);

/**
* Get the name of the connection the job belongs to.
*
Expand Down
2 changes: 1 addition & 1 deletion src/Illuminate/Queue/BeanstalkdQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public function later($delay, $job, $data = '', $queue = null)
return $pheanstalk->put(
$this->createPayload($job, $data),
Pheanstalk::DEFAULT_PRIORITY,
$this->getSeconds($delay),
$this->secondsUntil($delay),
$this->timeToRun
);
}
Expand Down
32 changes: 32 additions & 0 deletions src/Illuminate/Queue/CalculatesDelays.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

namespace Illuminate\Queue;

use Carbon\Carbon;
use DateTimeInterface;

trait CalculatesDelays
{
/**
* Get the number of seconds until the given DateTime.
*
* @param \DateTimeInterface $delay
* @return int
*/
protected function secondsUntil($delay)
{
return $delay instanceof DateTimeInterface
? max(0, $delay->getTimestamp() - $this->currentTime())
: (int) $delay;
}

/**
* Get the current system time as a UNIX timestamp.
*
* @return int
*/
protected function currentTime()
{
return Carbon::now()->getTimestamp();
}
}
6 changes: 3 additions & 3 deletions src/Illuminate/Queue/DatabaseQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ protected function isAvailable($query)
{
$query->where(function ($query) {
$query->whereNull('reserved_at');
$query->where('available_at', '<=', $this->getTime());
$query->where('available_at', '<=', $this->currentTime());
});
}

Expand Down Expand Up @@ -246,7 +246,7 @@ protected function isReservedButExpired($query)
protected function markJobAsReserved($job)
{
$job->attempts = $job->attempts + 1;
$job->reserved_at = $this->getTime();
$job->reserved_at = $this->currentTime();

$this->database->table($this->table)->where('id', $job->id)->update([
'reserved_at' => $job->reserved_at,
Expand Down Expand Up @@ -303,7 +303,7 @@ protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts
'attempts' => $attempts,
'reserved_at' => null,
'available_at' => $availableAt,
'created_at' => $this->getTime(),
'created_at' => $this->currentTime(),
'payload' => $payload,
];
}
Expand Down
24 changes: 12 additions & 12 deletions src/Illuminate/Queue/Jobs/BeanstalkdJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,6 @@ public function getRawBody()
return $this->job->getData();
}

/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
parent::delete();

$this->pheanstalk->delete($this->job);
}

/**
* Release the job back into the queue.
*
Expand Down Expand Up @@ -89,6 +77,18 @@ public function bury()
$this->pheanstalk->bury($this->job);
}

/**
* Delete the job from the queue.
*
* @return void
*/
public function delete()
{
parent::delete();

$this->pheanstalk->delete($this->job);
}

/**
* Get the number of times the job has been attempted.
*
Expand Down
52 changes: 8 additions & 44 deletions src/Illuminate/Queue/Jobs/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
use DateTime;
use Carbon\Carbon;
use Illuminate\Support\Arr;
use Illuminate\Queue\CalculatesDelays;

abstract class Job
{
use CalculatesDelays;

/**
* The job handler instance.
*
Expand All @@ -22,18 +25,6 @@ abstract class Job
*/
protected $container;

/**
* The name of the connection the job belongs to.
*/
protected $connectionName;

/**
* The name of the queue the job belongs to.
*
* @var string
*/
protected $queue;

/**
* Indicates if the job has been deleted.
*
Expand All @@ -49,18 +40,16 @@ abstract class Job
protected $released = false;

/**
* Get the number of times the job has been attempted.
*
* @return int
* The name of the connection the job belongs to.
*/
abstract public function attempts();
protected $connectionName;

/**
* Get the raw body string for the job.
* The name of the queue the job belongs to.
*
* @return string
* @var string
*/
abstract public function getRawBody();
protected $queue;

/**
* Fire the job.
Expand Down Expand Up @@ -172,31 +161,6 @@ protected function parseJob($job)
return count($segments) > 1 ? $segments : [$segments[0], 'fire'];
}

/**
* Calculate the number of seconds with the given delay.
*
* @param \DateTime|int $delay
* @return int
*/
protected function getSeconds($delay)
{
if ($delay instanceof DateTime) {
return max(0, $delay->getTimestamp() - $this->getTime());
}

return (int) $delay;
}

/**
* Get the current system time.
*
* @return int
*/
protected function getTime()
{
return Carbon::now()->getTimestamp();
}

/**
* Get the name of the queued job class.
*
Expand Down
27 changes: 2 additions & 25 deletions src/Illuminate/Queue/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

abstract class Queue
{
use CalculatesDelays;

/**
* The IoC container instance.
*
Expand Down Expand Up @@ -134,31 +136,6 @@ protected function setMeta($payload, $key, $value)
return $payload;
}

/**
* Calculate the number of seconds with the given delay.
*
* @param \DateTime|int $delay
* @return int
*/
protected function getSeconds($delay)
{
if ($delay instanceof DateTime) {
return max(0, $delay->getTimestamp() - $this->getTime());
}

return (int) $delay;
}

/**
* Get the current UNIX timestamp.
*
* @return int
*/
protected function getTime()
{
return Carbon::now()->getTimestamp();
}

/**
* Set the IoC container instance.
*
Expand Down
8 changes: 4 additions & 4 deletions src/Illuminate/Queue/RedisQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public function later($delay, $job, $data = '', $queue = null)
$payload = $this->createPayload($job, $data);

$this->getConnection()->zadd(
$this->getQueue($queue).':delayed', $this->getTime() + $this->getSeconds($delay), $payload
$this->getQueue($queue).':delayed', $this->currentTime() + $this->secondsUntil($delay), $payload
);

return Arr::get(json_decode($payload, true), 'id');
Expand All @@ -135,7 +135,7 @@ public function pop($queue = null)
}

list($job, $reserved) = $this->getConnection()->eval(
LuaScripts::pop(), 2, $queue, $queue.':reserved', $this->getTime() + $this->expire
LuaScripts::pop(), 2, $queue, $queue.':reserved', $this->currentTime() + $this->expire
);

if ($reserved) {
Expand Down Expand Up @@ -169,7 +169,7 @@ public function deleteAndRelease($queue, $job, $delay)

$this->getConnection()->eval(
LuaScripts::release(), 2, $queue.':delayed', $queue.':reserved',
$job, $this->getTime() + $delay
$job, $this->currentTime() + $delay
);
}

Expand All @@ -183,7 +183,7 @@ public function deleteAndRelease($queue, $job, $delay)
public function migrateExpiredJobs($from, $to)
{
$this->getConnection()->eval(
LuaScripts::migrateExpiredJobs(), 2, $from, $to, $this->getTime()
LuaScripts::migrateExpiredJobs(), 2, $from, $to, $this->currentTime()
);
}

Expand Down
8 changes: 2 additions & 6 deletions src/Illuminate/Queue/SqsQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,10 @@ public function pushRaw($payload, $queue = null, array $options = [])
*/
public function later($delay, $job, $data = '', $queue = null)
{
$payload = $this->createPayload($job, $data);

$delay = $this->getSeconds($delay);

return $this->sqs->sendMessage([
'QueueUrl' => $this->getQueue($queue),
'MessageBody' => $payload,
'DelaySeconds' => $delay,
'MessageBody' => $this->createPayload($job, $data),
'DelaySeconds' => $this->secondsUntil($delay),
])->get('MessageId');
}

Expand Down
12 changes: 6 additions & 6 deletions tests/Queue/QueueDatabaseQueueUnitTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ public function tearDown()

public function testPushProperlyPushesJobOntoDatabase()
{
$queue = $this->getMockBuilder('Illuminate\Queue\DatabaseQueue')->setMethods(['getTime'])->setConstructorArgs([$database = m::mock('Illuminate\Database\Connection'), 'table', 'default'])->getMock();
$queue->expects($this->any())->method('getTime')->will($this->returnValue('time'));
$queue = $this->getMockBuilder('Illuminate\Queue\DatabaseQueue')->setMethods(['currentTime'])->setConstructorArgs([$database = m::mock('Illuminate\Database\Connection'), 'table', 'default'])->getMock();
$queue->expects($this->any())->method('currentTime')->will($this->returnValue('time'));
$database->shouldReceive('table')->with('table')->andReturn($query = m::mock('StdClass'));
$query->shouldReceive('insertGetId')->once()->andReturnUsing(function ($array) {
$this->assertEquals('default', $array['queue']);
Expand All @@ -29,10 +29,10 @@ public function testDelayedPushProperlyPushesJobOntoDatabase()
{
$queue = $this->getMockBuilder(
'Illuminate\Queue\DatabaseQueue')->setMethods(
['getTime'])->setConstructorArgs(
['currentTime'])->setConstructorArgs(
[$database = m::mock('Illuminate\Database\Connection'), 'table', 'default']
)->getMock();
$queue->expects($this->any())->method('getTime')->will($this->returnValue('time'));
$queue->expects($this->any())->method('currentTime')->will($this->returnValue('time'));
$database->shouldReceive('table')->with('table')->andReturn($query = m::mock('StdClass'));
$query->shouldReceive('insertGetId')->once()->andReturnUsing(function ($array) {
$this->assertEquals('default', $array['queue']);
Expand Down Expand Up @@ -93,8 +93,8 @@ public function testFailureToCreatePayloadAfterAddingMeta()
public function testBulkBatchPushesOntoDatabase()
{
$database = m::mock('Illuminate\Database\Connection');
$queue = $this->getMockBuilder('Illuminate\Queue\DatabaseQueue')->setMethods(['getTime', 'getAvailableAt'])->setConstructorArgs([$database, 'table', 'default'])->getMock();
$queue->expects($this->any())->method('getTime')->will($this->returnValue('created'));
$queue = $this->getMockBuilder('Illuminate\Queue\DatabaseQueue')->setMethods(['currentTime', 'getAvailableAt'])->setConstructorArgs([$database, 'table', 'default'])->getMock();
$queue->expects($this->any())->method('currentTime')->will($this->returnValue('created'));
$queue->expects($this->any())->method('getAvailableAt')->will($this->returnValue('available'));
$database->shouldReceive('table')->with('table')->andReturn($query = m::mock('StdClass'));
$query->shouldReceive('insert')->once()->andReturnUsing(function ($records) {
Expand Down
Loading

0 comments on commit 9bc8ca5

Please sign in to comment.