Skip to content

Commit

Permalink
blocking pop from redis queues
Browse files Browse the repository at this point in the history
  • Loading branch information
halaei committed Dec 2, 2017
1 parent 64ff115 commit 63b6108
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 2 deletions.
3 changes: 2 additions & 1 deletion src/Illuminate/Queue/Connectors/RedisConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public function connect(array $config)
return new RedisQueue(
$this->redis, $config['queue'],
$config['connection'] ?? $this->connection,
$config['retry_after'] ?? 60
$config['retry_after'] ?? 60,
$config['block_for'] ?? 0
);
}
}
50 changes: 49 additions & 1 deletion src/Illuminate/Queue/RedisQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,30 @@ class RedisQueue extends Queue implements QueueContract
*/
protected $retryAfter = 60;

/**
* The maximum number of seconds to block for a job.
*
* @var int
*/
private $blockFor = 0;

/**
* Create a new Redis queue instance.
*
* @param \Illuminate\Contracts\Redis\Factory $redis
* @param string $default
* @param string $connection
* @param int $retryAfter
* @param int $blockFor
* @return void
*/
public function __construct(Redis $redis, $default = 'default', $connection = null, $retryAfter = 60)
public function __construct(Redis $redis, $default = 'default', $connection = null, $retryAfter = 60, $blockFor = 0)
{
$this->redis = $redis;
$this->default = $default;
$this->connection = $connection;
$this->retryAfter = $retryAfter;
$this->blockFor = $blockFor;
}

/**
Expand Down Expand Up @@ -199,6 +208,45 @@ public function migrateExpiredJobs($from, $to)
* @return array
*/
protected function retrieveNextJob($queue)
{
if ($this->blockFor >= 1) {
return $this->blockingPop($queue);
}

return $this->nonBlockingPop($queue);
}

/**
* Retrieve the next job by blocking-pop.
*
* @param string $queue
* @return array
*/
protected function blockingPop($queue)
{
$rawBody = $this->getConnection()->blpop($queue, $this->blockFor);

if (! is_null($rawBody)) {
$payload = json_decode($rawBody[1], true);
$payload['attempts']++;
$reserved = json_encode($payload);
$this->getConnection()->zadd($queue.':reserved', [
$reserved => $this->availableAt($this->retryAfter),
]);

return [$rawBody[1], $reserved];
}

return [null, null];
}

/**
* Retrieve the next job by Lua script.
*
* @param string $queue
* @return mixed
*/
protected function nonBlockingPop($queue)
{
return $this->getConnection()->eval(
LuaScripts::pop(), 2, $queue, $queue.':reserved',
Expand Down

0 comments on commit 63b6108

Please sign in to comment.