Skip to content

Commit

Permalink
Working on Redis queue.
Browse files Browse the repository at this point in the history
  • Loading branch information
taylorotwell committed Dec 29, 2016
1 parent 8da5a32 commit 7bb15cf
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 50 deletions.
34 changes: 22 additions & 12 deletions src/Illuminate/Queue/LuaScripts.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@

class LuaScripts
{
/**
* Get the Lua script for computing the size of queue.
*
* @return string
*/
public static function size()
{
return <<<'LUA'
return redis.call('llen', KEYS[1]) + redis.call('zcard', KEYS[2]) + redis.call('zcard', KEYS[3])
LUA;
}

/**
* Get the Lua script for popping the next job off of the queue.
*
Expand Down Expand Up @@ -41,31 +53,29 @@ public static function release()
/**
* Get the Lua script to migrate expired jobs back onto the queue.
*
* KEYS[1] - The queue we are removing jobs from, for example: queues:foo:reserved
* KEYS[2] - The queue we are moving jobs to, for example: queues:foo
* ARGV[1] - The current UNIX timestamp
*
* @return string
*/
public static function migrateExpiredJobs()
{
return <<<'LUA'
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])
-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate jobs onto the destination queue very safely.
if(next(val) ~= nil) then
redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
for i = 1, #val, 100 do
redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
end
end
return true
LUA;
}
/**
* Get the Lua script for computing the size of queue.
*
* @return string
*/
public static function size()
{
return <<<'LUA'
return redis.call('llen', KEYS[1]) + redis.call('zcard', KEYS[2]) + redis.call('zcard', KEYS[3])
return true
LUA;
}
}
66 changes: 34 additions & 32 deletions src/Illuminate/Queue/RedisQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ public function size($queue = null)
{
$queue = $this->getQueue($queue);

return $this->getConnection()->eval(LuaScripts::size(), 3, $queue, $queue.':delayed', $queue.':reserved');
return $this->getConnection()->eval(
LuaScripts::size(), 3, $queue, $queue.':delayed', $queue.':reserved'
);
}

/**
Expand Down Expand Up @@ -110,12 +112,28 @@ public function later($delay, $job, $data = '', $queue = null)
$payload = $this->createPayload($job, $data);

$this->getConnection()->zadd(
$this->getQueue($queue).':delayed', $this->currentTime() + $this->secondsUntil($delay), $payload
$this->getQueue($queue).':delayed', $this->availableAt($delay), $payload
);

return Arr::get(json_decode($payload, true), 'id');
}

/**
* Create a payload string from the given job and data.
*
* @param string $job
* @param mixed $data
* @param string $queue
* @return string
*/
protected function createPayloadArray($job, $data = '', $queue = null)
{
return array_merge(parent::createPayloadArray($job, $data, $queue), [
'id' => $this->getRandomId(),
'attempts' => 1,
]);
}

/**
* Pop the next job off of the queue.
*
Expand Down Expand Up @@ -143,6 +161,20 @@ public function pop($queue = null)
}
}

/**
* Migrate the delayed jobs that are ready to the regular queue.
*
* @param string $from
* @param string $to
* @return void
*/
public function migrateExpiredJobs($from, $to)
{
$this->getConnection()->eval(
LuaScripts::migrateExpiredJobs(), 2, $from, $to, $this->currentTime()
);
}

/**
* Delete a reserved job from the queue.
*
Expand Down Expand Up @@ -173,36 +205,6 @@ public function deleteAndRelease($queue, $job, $delay)
);
}

/**
* Migrate the delayed jobs that are ready to the regular queue.
*
* @param string $from
* @param string $to
* @return void
*/
public function migrateExpiredJobs($from, $to)
{
$this->getConnection()->eval(
LuaScripts::migrateExpiredJobs(), 2, $from, $to, $this->currentTime()
);
}

/**
* Create a payload string from the given job and data.
*
* @param string $job
* @param mixed $data
* @param string $queue
* @return string
*/
protected function createPayloadArray($job, $data = '', $queue = null)
{
return array_merge(parent::createPayloadArray($job, $data, $queue), [
'id' => $this->getRandomId(),
'attempts' => 1,
]);
}

/**
* Get a random ID string.
*
Expand Down
10 changes: 4 additions & 6 deletions tests/Queue/QueueRedisQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ public function testPushProperlyPushesJobOntoRedis()

public function testDelayedPushProperlyPushesJobOntoRedis()
{
$queue = $this->getMockBuilder('Illuminate\Queue\RedisQueue')->setMethods(['secondsUntil', 'currentTime', 'getRandomId'])->setConstructorArgs([$redis = m::mock('Illuminate\Contracts\Redis\Factory'), 'default'])->getMock();
$queue = $this->getMockBuilder('Illuminate\Queue\RedisQueue')->setMethods(['availableAt', 'getRandomId'])->setConstructorArgs([$redis = m::mock('Illuminate\Contracts\Redis\Factory'), 'default'])->getMock();
$queue->expects($this->once())->method('getRandomId')->will($this->returnValue('foo'));
$queue->expects($this->once())->method('secondsUntil')->with(1)->will($this->returnValue(1));
$queue->expects($this->once())->method('currentTime')->will($this->returnValue(1));
$queue->expects($this->once())->method('availableAt')->with(1)->will($this->returnValue(2));

$redis->shouldReceive('connection')->once()->andReturn($redis);
$redis->shouldReceive('zadd')->once()->with(
Expand All @@ -41,10 +40,9 @@ public function testDelayedPushProperlyPushesJobOntoRedis()
public function testDelayedPushWithDateTimeProperlyPushesJobOntoRedis()
{
$date = Carbon\Carbon::now();
$queue = $this->getMockBuilder('Illuminate\Queue\RedisQueue')->setMethods(['secondsUntil', 'currentTime', 'getRandomId'])->setConstructorArgs([$redis = m::mock('Illuminate\Contracts\Redis\Factory'), 'default'])->getMock();
$queue = $this->getMockBuilder('Illuminate\Queue\RedisQueue')->setMethods(['availableAt', 'getRandomId'])->setConstructorArgs([$redis = m::mock('Illuminate\Contracts\Redis\Factory'), 'default'])->getMock();
$queue->expects($this->once())->method('getRandomId')->will($this->returnValue('foo'));
$queue->expects($this->once())->method('secondsUntil')->with($date)->will($this->returnValue(1));
$queue->expects($this->once())->method('currentTime')->will($this->returnValue(1));
$queue->expects($this->once())->method('availableAt')->with($date)->will($this->returnValue(2));

$redis->shouldReceive('connection')->once()->andReturn($redis);
$redis->shouldReceive('zadd')->once()->with(
Expand Down

0 comments on commit 7bb15cf

Please sign in to comment.