From 63b61080b1383f8570526f85caf09dfea24c146d Mon Sep 17 00:00:00 2001 From: Hamid Alaei V Date: Sat, 2 Dec 2017 20:32:39 +0330 Subject: [PATCH] blocking pop from redis queues --- .../Queue/Connectors/RedisConnector.php | 3 +- src/Illuminate/Queue/RedisQueue.php | 50 ++++++++++++++++++- 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/src/Illuminate/Queue/Connectors/RedisConnector.php b/src/Illuminate/Queue/Connectors/RedisConnector.php index ebac5a2bfa31..ffac75170db6 100644 --- a/src/Illuminate/Queue/Connectors/RedisConnector.php +++ b/src/Illuminate/Queue/Connectors/RedisConnector.php @@ -45,7 +45,8 @@ public function connect(array $config) return new RedisQueue( $this->redis, $config['queue'], $config['connection'] ?? $this->connection, - $config['retry_after'] ?? 60 + $config['retry_after'] ?? 60, + $config['block_for'] ?? 0 ); } } diff --git a/src/Illuminate/Queue/RedisQueue.php b/src/Illuminate/Queue/RedisQueue.php index b0bb6401f8b1..6ef43b635844 100644 --- a/src/Illuminate/Queue/RedisQueue.php +++ b/src/Illuminate/Queue/RedisQueue.php @@ -37,6 +37,13 @@ class RedisQueue extends Queue implements QueueContract */ protected $retryAfter = 60; + /** + * The maximum number of seconds to block for a job. + * + * @var int + */ + private $blockFor = 0; + /** * Create a new Redis queue instance. * @@ -44,14 +51,16 @@ class RedisQueue extends Queue implements QueueContract * @param string $default * @param string $connection * @param int $retryAfter + * @param int $blockFor * @return void */ - public function __construct(Redis $redis, $default = 'default', $connection = null, $retryAfter = 60) + public function __construct(Redis $redis, $default = 'default', $connection = null, $retryAfter = 60, $blockFor = 0) { $this->redis = $redis; $this->default = $default; $this->connection = $connection; $this->retryAfter = $retryAfter; + $this->blockFor = $blockFor; } /** @@ -199,6 +208,45 @@ public function migrateExpiredJobs($from, $to) * @return array */ protected function retrieveNextJob($queue) + { + if ($this->blockFor >= 1) { + return $this->blockingPop($queue); + } + + return $this->nonBlockingPop($queue); + } + + /** + * Retrieve the next job by blocking-pop. + * + * @param string $queue + * @return array + */ + protected function blockingPop($queue) + { + $rawBody = $this->getConnection()->blpop($queue, $this->blockFor); + + if (! is_null($rawBody)) { + $payload = json_decode($rawBody[1], true); + $payload['attempts']++; + $reserved = json_encode($payload); + $this->getConnection()->zadd($queue.':reserved', [ + $reserved => $this->availableAt($this->retryAfter), + ]); + + return [$rawBody[1], $reserved]; + } + + return [null, null]; + } + + /** + * Retrieve the next job by Lua script. + * + * @param string $queue + * @return mixed + */ + protected function nonBlockingPop($queue) { return $this->getConnection()->eval( LuaScripts::pop(), 2, $queue, $queue.':reserved',