From 9bc8ca502687f29761b9eb78f70db6e3c3f0a09e Mon Sep 17 00:00:00 2001 From: Taylor Otwell Date: Thu, 29 Dec 2016 08:50:55 -0600 Subject: [PATCH] Extract job delay calculations. --- src/Illuminate/Contracts/Queue/Job.php | 32 ++++++------- src/Illuminate/Queue/BeanstalkdQueue.php | 2 +- src/Illuminate/Queue/CalculatesDelays.php | 32 +++++++++++++ src/Illuminate/Queue/DatabaseQueue.php | 6 +-- src/Illuminate/Queue/Jobs/BeanstalkdJob.php | 24 +++++----- src/Illuminate/Queue/Jobs/Job.php | 52 ++++----------------- src/Illuminate/Queue/Queue.php | 27 +---------- src/Illuminate/Queue/RedisQueue.php | 8 ++-- src/Illuminate/Queue/SqsQueue.php | 8 +--- tests/Queue/QueueDatabaseQueueUnitTest.php | 12 ++--- tests/Queue/QueueRedisQueueTest.php | 12 ++--- tests/Queue/QueueSqsQueueTest.php | 8 ++-- 12 files changed, 96 insertions(+), 127 deletions(-) create mode 100644 src/Illuminate/Queue/CalculatesDelays.php diff --git a/src/Illuminate/Contracts/Queue/Job.php b/src/Illuminate/Contracts/Queue/Job.php index 84e688100c5a..2875187b3cf3 100644 --- a/src/Illuminate/Contracts/Queue/Job.php +++ b/src/Illuminate/Contracts/Queue/Job.php @@ -11,6 +11,14 @@ interface Job */ public function fire(); + /** + * Release the job back into the queue. + * + * @param int $delay + * @return void + */ + public function release($delay = 0); + /** * Delete the job from the queue. * @@ -25,14 +33,6 @@ public function delete(); */ public function isDeleted(); - /** - * Release the job back into the queue. - * - * @param int $delay - * @return void - */ - public function release($delay = 0); - /** * Determine if the job has been deleted or released. * @@ -47,6 +47,14 @@ public function isDeletedOrReleased(); */ public function attempts(); + /** + * Process an exception that caused the job to fail. + * + * @param \Throwable $e + * @return void + */ + public function failed($e); + /** * Get the name of the queued job class. * @@ -63,14 +71,6 @@ public function getName(); */ public function resolveName(); - /** - * Process an exception that caused the job to fail. - * - * @param \Throwable $e - * @return void - */ - public function failed($e); - /** * Get the name of the connection the job belongs to. * diff --git a/src/Illuminate/Queue/BeanstalkdQueue.php b/src/Illuminate/Queue/BeanstalkdQueue.php index 44b36803ddde..b34e0d7e050b 100755 --- a/src/Illuminate/Queue/BeanstalkdQueue.php +++ b/src/Illuminate/Queue/BeanstalkdQueue.php @@ -102,7 +102,7 @@ public function later($delay, $job, $data = '', $queue = null) return $pheanstalk->put( $this->createPayload($job, $data), Pheanstalk::DEFAULT_PRIORITY, - $this->getSeconds($delay), + $this->secondsUntil($delay), $this->timeToRun ); } diff --git a/src/Illuminate/Queue/CalculatesDelays.php b/src/Illuminate/Queue/CalculatesDelays.php new file mode 100644 index 000000000000..5d05135833f2 --- /dev/null +++ b/src/Illuminate/Queue/CalculatesDelays.php @@ -0,0 +1,32 @@ +getTimestamp() - $this->currentTime()) + : (int) $delay; + } + + /** + * Get the current system time as a UNIX timestamp. + * + * @return int + */ + protected function currentTime() + { + return Carbon::now()->getTimestamp(); + } +} diff --git a/src/Illuminate/Queue/DatabaseQueue.php b/src/Illuminate/Queue/DatabaseQueue.php index d6298db5b482..b516b50c7dac 100644 --- a/src/Illuminate/Queue/DatabaseQueue.php +++ b/src/Illuminate/Queue/DatabaseQueue.php @@ -218,7 +218,7 @@ protected function isAvailable($query) { $query->where(function ($query) { $query->whereNull('reserved_at'); - $query->where('available_at', '<=', $this->getTime()); + $query->where('available_at', '<=', $this->currentTime()); }); } @@ -246,7 +246,7 @@ protected function isReservedButExpired($query) protected function markJobAsReserved($job) { $job->attempts = $job->attempts + 1; - $job->reserved_at = $this->getTime(); + $job->reserved_at = $this->currentTime(); $this->database->table($this->table)->where('id', $job->id)->update([ 'reserved_at' => $job->reserved_at, @@ -303,7 +303,7 @@ protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts 'attempts' => $attempts, 'reserved_at' => null, 'available_at' => $availableAt, - 'created_at' => $this->getTime(), + 'created_at' => $this->currentTime(), 'payload' => $payload, ]; } diff --git a/src/Illuminate/Queue/Jobs/BeanstalkdJob.php b/src/Illuminate/Queue/Jobs/BeanstalkdJob.php index ce6d3cbd6a41..198f2fb4ec84 100755 --- a/src/Illuminate/Queue/Jobs/BeanstalkdJob.php +++ b/src/Illuminate/Queue/Jobs/BeanstalkdJob.php @@ -50,18 +50,6 @@ public function getRawBody() return $this->job->getData(); } - /** - * Delete the job from the queue. - * - * @return void - */ - public function delete() - { - parent::delete(); - - $this->pheanstalk->delete($this->job); - } - /** * Release the job back into the queue. * @@ -89,6 +77,18 @@ public function bury() $this->pheanstalk->bury($this->job); } + /** + * Delete the job from the queue. + * + * @return void + */ + public function delete() + { + parent::delete(); + + $this->pheanstalk->delete($this->job); + } + /** * Get the number of times the job has been attempted. * diff --git a/src/Illuminate/Queue/Jobs/Job.php b/src/Illuminate/Queue/Jobs/Job.php index 10b9f622ff27..81261b770e8f 100755 --- a/src/Illuminate/Queue/Jobs/Job.php +++ b/src/Illuminate/Queue/Jobs/Job.php @@ -5,9 +5,12 @@ use DateTime; use Carbon\Carbon; use Illuminate\Support\Arr; +use Illuminate\Queue\CalculatesDelays; abstract class Job { + use CalculatesDelays; + /** * The job handler instance. * @@ -22,18 +25,6 @@ abstract class Job */ protected $container; - /** - * The name of the connection the job belongs to. - */ - protected $connectionName; - - /** - * The name of the queue the job belongs to. - * - * @var string - */ - protected $queue; - /** * Indicates if the job has been deleted. * @@ -49,18 +40,16 @@ abstract class Job protected $released = false; /** - * Get the number of times the job has been attempted. - * - * @return int + * The name of the connection the job belongs to. */ - abstract public function attempts(); + protected $connectionName; /** - * Get the raw body string for the job. + * The name of the queue the job belongs to. * - * @return string + * @var string */ - abstract public function getRawBody(); + protected $queue; /** * Fire the job. @@ -172,31 +161,6 @@ protected function parseJob($job) return count($segments) > 1 ? $segments : [$segments[0], 'fire']; } - /** - * Calculate the number of seconds with the given delay. - * - * @param \DateTime|int $delay - * @return int - */ - protected function getSeconds($delay) - { - if ($delay instanceof DateTime) { - return max(0, $delay->getTimestamp() - $this->getTime()); - } - - return (int) $delay; - } - - /** - * Get the current system time. - * - * @return int - */ - protected function getTime() - { - return Carbon::now()->getTimestamp(); - } - /** * Get the name of the queued job class. * diff --git a/src/Illuminate/Queue/Queue.php b/src/Illuminate/Queue/Queue.php index 5b91ba90200a..f5ee3df5a0da 100755 --- a/src/Illuminate/Queue/Queue.php +++ b/src/Illuminate/Queue/Queue.php @@ -10,6 +10,8 @@ abstract class Queue { + use CalculatesDelays; + /** * The IoC container instance. * @@ -134,31 +136,6 @@ protected function setMeta($payload, $key, $value) return $payload; } - /** - * Calculate the number of seconds with the given delay. - * - * @param \DateTime|int $delay - * @return int - */ - protected function getSeconds($delay) - { - if ($delay instanceof DateTime) { - return max(0, $delay->getTimestamp() - $this->getTime()); - } - - return (int) $delay; - } - - /** - * Get the current UNIX timestamp. - * - * @return int - */ - protected function getTime() - { - return Carbon::now()->getTimestamp(); - } - /** * Set the IoC container instance. * diff --git a/src/Illuminate/Queue/RedisQueue.php b/src/Illuminate/Queue/RedisQueue.php index cee02757e5e8..1992878bc31b 100644 --- a/src/Illuminate/Queue/RedisQueue.php +++ b/src/Illuminate/Queue/RedisQueue.php @@ -110,7 +110,7 @@ public function later($delay, $job, $data = '', $queue = null) $payload = $this->createPayload($job, $data); $this->getConnection()->zadd( - $this->getQueue($queue).':delayed', $this->getTime() + $this->getSeconds($delay), $payload + $this->getQueue($queue).':delayed', $this->currentTime() + $this->secondsUntil($delay), $payload ); return Arr::get(json_decode($payload, true), 'id'); @@ -135,7 +135,7 @@ public function pop($queue = null) } list($job, $reserved) = $this->getConnection()->eval( - LuaScripts::pop(), 2, $queue, $queue.':reserved', $this->getTime() + $this->expire + LuaScripts::pop(), 2, $queue, $queue.':reserved', $this->currentTime() + $this->expire ); if ($reserved) { @@ -169,7 +169,7 @@ public function deleteAndRelease($queue, $job, $delay) $this->getConnection()->eval( LuaScripts::release(), 2, $queue.':delayed', $queue.':reserved', - $job, $this->getTime() + $delay + $job, $this->currentTime() + $delay ); } @@ -183,7 +183,7 @@ public function deleteAndRelease($queue, $job, $delay) public function migrateExpiredJobs($from, $to) { $this->getConnection()->eval( - LuaScripts::migrateExpiredJobs(), 2, $from, $to, $this->getTime() + LuaScripts::migrateExpiredJobs(), 2, $from, $to, $this->currentTime() ); } diff --git a/src/Illuminate/Queue/SqsQueue.php b/src/Illuminate/Queue/SqsQueue.php index f7add815ce2e..f2fc34f2bad1 100755 --- a/src/Illuminate/Queue/SqsQueue.php +++ b/src/Illuminate/Queue/SqsQueue.php @@ -98,14 +98,10 @@ public function pushRaw($payload, $queue = null, array $options = []) */ public function later($delay, $job, $data = '', $queue = null) { - $payload = $this->createPayload($job, $data); - - $delay = $this->getSeconds($delay); - return $this->sqs->sendMessage([ 'QueueUrl' => $this->getQueue($queue), - 'MessageBody' => $payload, - 'DelaySeconds' => $delay, + 'MessageBody' => $this->createPayload($job, $data), + 'DelaySeconds' => $this->secondsUntil($delay), ])->get('MessageId'); } diff --git a/tests/Queue/QueueDatabaseQueueUnitTest.php b/tests/Queue/QueueDatabaseQueueUnitTest.php index 5dbe7bd23481..8e9ce36e5e26 100644 --- a/tests/Queue/QueueDatabaseQueueUnitTest.php +++ b/tests/Queue/QueueDatabaseQueueUnitTest.php @@ -11,8 +11,8 @@ public function tearDown() public function testPushProperlyPushesJobOntoDatabase() { - $queue = $this->getMockBuilder('Illuminate\Queue\DatabaseQueue')->setMethods(['getTime'])->setConstructorArgs([$database = m::mock('Illuminate\Database\Connection'), 'table', 'default'])->getMock(); - $queue->expects($this->any())->method('getTime')->will($this->returnValue('time')); + $queue = $this->getMockBuilder('Illuminate\Queue\DatabaseQueue')->setMethods(['currentTime'])->setConstructorArgs([$database = m::mock('Illuminate\Database\Connection'), 'table', 'default'])->getMock(); + $queue->expects($this->any())->method('currentTime')->will($this->returnValue('time')); $database->shouldReceive('table')->with('table')->andReturn($query = m::mock('StdClass')); $query->shouldReceive('insertGetId')->once()->andReturnUsing(function ($array) { $this->assertEquals('default', $array['queue']); @@ -29,10 +29,10 @@ public function testDelayedPushProperlyPushesJobOntoDatabase() { $queue = $this->getMockBuilder( 'Illuminate\Queue\DatabaseQueue')->setMethods( - ['getTime'])->setConstructorArgs( + ['currentTime'])->setConstructorArgs( [$database = m::mock('Illuminate\Database\Connection'), 'table', 'default'] )->getMock(); - $queue->expects($this->any())->method('getTime')->will($this->returnValue('time')); + $queue->expects($this->any())->method('currentTime')->will($this->returnValue('time')); $database->shouldReceive('table')->with('table')->andReturn($query = m::mock('StdClass')); $query->shouldReceive('insertGetId')->once()->andReturnUsing(function ($array) { $this->assertEquals('default', $array['queue']); @@ -93,8 +93,8 @@ public function testFailureToCreatePayloadAfterAddingMeta() public function testBulkBatchPushesOntoDatabase() { $database = m::mock('Illuminate\Database\Connection'); - $queue = $this->getMockBuilder('Illuminate\Queue\DatabaseQueue')->setMethods(['getTime', 'getAvailableAt'])->setConstructorArgs([$database, 'table', 'default'])->getMock(); - $queue->expects($this->any())->method('getTime')->will($this->returnValue('created')); + $queue = $this->getMockBuilder('Illuminate\Queue\DatabaseQueue')->setMethods(['currentTime', 'getAvailableAt'])->setConstructorArgs([$database, 'table', 'default'])->getMock(); + $queue->expects($this->any())->method('currentTime')->will($this->returnValue('created')); $queue->expects($this->any())->method('getAvailableAt')->will($this->returnValue('available')); $database->shouldReceive('table')->with('table')->andReturn($query = m::mock('StdClass')); $query->shouldReceive('insert')->once()->andReturnUsing(function ($records) { diff --git a/tests/Queue/QueueRedisQueueTest.php b/tests/Queue/QueueRedisQueueTest.php index b2039d31ab4f..e66f7e8a6ef7 100644 --- a/tests/Queue/QueueRedisQueueTest.php +++ b/tests/Queue/QueueRedisQueueTest.php @@ -22,10 +22,10 @@ public function testPushProperlyPushesJobOntoRedis() public function testDelayedPushProperlyPushesJobOntoRedis() { - $queue = $this->getMockBuilder('Illuminate\Queue\RedisQueue')->setMethods(['getSeconds', 'getTime', 'getRandomId'])->setConstructorArgs([$redis = m::mock('Illuminate\Contracts\Redis\Factory'), 'default'])->getMock(); + $queue = $this->getMockBuilder('Illuminate\Queue\RedisQueue')->setMethods(['secondsUntil', 'currentTime', 'getRandomId'])->setConstructorArgs([$redis = m::mock('Illuminate\Contracts\Redis\Factory'), 'default'])->getMock(); $queue->expects($this->once())->method('getRandomId')->will($this->returnValue('foo')); - $queue->expects($this->once())->method('getSeconds')->with(1)->will($this->returnValue(1)); - $queue->expects($this->once())->method('getTime')->will($this->returnValue(1)); + $queue->expects($this->once())->method('secondsUntil')->with(1)->will($this->returnValue(1)); + $queue->expects($this->once())->method('currentTime')->will($this->returnValue(1)); $redis->shouldReceive('connection')->once()->andReturn($redis); $redis->shouldReceive('zadd')->once()->with( @@ -41,10 +41,10 @@ public function testDelayedPushProperlyPushesJobOntoRedis() public function testDelayedPushWithDateTimeProperlyPushesJobOntoRedis() { $date = Carbon\Carbon::now(); - $queue = $this->getMockBuilder('Illuminate\Queue\RedisQueue')->setMethods(['getSeconds', 'getTime', 'getRandomId'])->setConstructorArgs([$redis = m::mock('Illuminate\Contracts\Redis\Factory'), 'default'])->getMock(); + $queue = $this->getMockBuilder('Illuminate\Queue\RedisQueue')->setMethods(['secondsUntil', 'currentTime', 'getRandomId'])->setConstructorArgs([$redis = m::mock('Illuminate\Contracts\Redis\Factory'), 'default'])->getMock(); $queue->expects($this->once())->method('getRandomId')->will($this->returnValue('foo')); - $queue->expects($this->once())->method('getSeconds')->with($date)->will($this->returnValue(1)); - $queue->expects($this->once())->method('getTime')->will($this->returnValue(1)); + $queue->expects($this->once())->method('secondsUntil')->with($date)->will($this->returnValue(1)); + $queue->expects($this->once())->method('currentTime')->will($this->returnValue(1)); $redis->shouldReceive('connection')->once()->andReturn($redis); $redis->shouldReceive('zadd')->once()->with( diff --git a/tests/Queue/QueueSqsQueueTest.php b/tests/Queue/QueueSqsQueueTest.php index 7cc67d7035d9..35efd8122d43 100755 --- a/tests/Queue/QueueSqsQueueTest.php +++ b/tests/Queue/QueueSqsQueueTest.php @@ -58,9 +58,9 @@ public function testPopProperlyPopsJobOffOfSqs() public function testDelayedPushWithDateTimeProperlyPushesJobOntoSqs() { $now = Carbon\Carbon::now(); - $queue = $this->getMockBuilder('Illuminate\Queue\SqsQueue')->setMethods(['createPayload', 'getSeconds', 'getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock(); + $queue = $this->getMockBuilder('Illuminate\Queue\SqsQueue')->setMethods(['createPayload', 'secondsUntil', 'getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock(); $queue->expects($this->once())->method('createPayload')->with($this->mockedJob, $this->mockedData)->will($this->returnValue($this->mockedPayload)); - $queue->expects($this->once())->method('getSeconds')->with($now)->will($this->returnValue(5)); + $queue->expects($this->once())->method('secondsUntil')->with($now)->will($this->returnValue(5)); $queue->expects($this->once())->method('getQueue')->with($this->queueName)->will($this->returnValue($this->queueUrl)); $this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload, 'DelaySeconds' => 5])->andReturn($this->mockedSendMessageResponseModel); $id = $queue->later($now->addSeconds(5), $this->mockedJob, $this->mockedData, $this->queueName); @@ -69,9 +69,9 @@ public function testDelayedPushWithDateTimeProperlyPushesJobOntoSqs() public function testDelayedPushProperlyPushesJobOntoSqs() { - $queue = $this->getMockBuilder('Illuminate\Queue\SqsQueue')->setMethods(['createPayload', 'getSeconds', 'getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock(); + $queue = $this->getMockBuilder('Illuminate\Queue\SqsQueue')->setMethods(['createPayload', 'secondsUntil', 'getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock(); $queue->expects($this->once())->method('createPayload')->with($this->mockedJob, $this->mockedData)->will($this->returnValue($this->mockedPayload)); - $queue->expects($this->once())->method('getSeconds')->with($this->mockedDelay)->will($this->returnValue($this->mockedDelay)); + $queue->expects($this->once())->method('secondsUntil')->with($this->mockedDelay)->will($this->returnValue($this->mockedDelay)); $queue->expects($this->once())->method('getQueue')->with($this->queueName)->will($this->returnValue($this->queueUrl)); $this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload, 'DelaySeconds' => $this->mockedDelay])->andReturn($this->mockedSendMessageResponseModel); $id = $queue->later($this->mockedDelay, $this->mockedJob, $this->mockedData, $this->queueName);