Skip to content

Commit

Permalink
Apply old PR 135
Browse files Browse the repository at this point in the history
  • Loading branch information
danhunsaker committed Dec 11, 2018
1 parent f87b002 commit e52ee4c
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 47 deletions.
5 changes: 3 additions & 2 deletions lib/Resque.php
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,11 @@ public static function size($queue)
* @param string $class The name of the class that contains the code to execute the job.
* @param array $args Any optional arguments that should be passed when the job is executed.
* @param boolean $trackStatus Set to true to be able to monitor the status of a job.
* @param string $prefix The prefix needs to be set for the status key
*
* @return string|boolean Job ID when the job was created, false if creation was cancelled due to beforeEnqueue
*/
public static function enqueue($queue, $class, $args = null, $trackStatus = false)
public static function enqueue($queue, $class, $args = null, $trackStatus = false, $prefix = "")
{
$id = Resque::generateJobId();
$hookParams = array(
Expand All @@ -232,7 +233,7 @@ public static function enqueue($queue, $class, $args = null, $trackStatus = fals
return false;
}

Resque_Job::create($queue, $class, $args, $trackStatus, $id);
Resque_Job::create($queue, $class, $args, $trackStatus, $id, $prefix);
Resque_Event::trigger('afterEnqueue', $hookParams);

return $id;
Expand Down
20 changes: 11 additions & 9 deletions lib/Resque/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,12 @@ public function __construct($queue, $payload)
* @param array $args Any optional arguments that should be passed when the job is executed.
* @param boolean $monitor Set to true to be able to monitor the status of a job.
* @param string $id Unique identifier for tracking the job. Generated if not supplied.
* @param string $prefix The prefix needs to be set for the status key
*
* @return string
* @throws \InvalidArgumentException
*/
public static function create($queue, $class, $args = null, $monitor = false, $id = null)
public static function create($queue, $class, $args = null, $monitor = false, $id = null, $prefix = "")
{
if (is_null($id)) {
$id = Resque::generateJobId();
Expand All @@ -69,14 +70,15 @@ public static function create($queue, $class, $args = null, $monitor = false, $i
);
}
Resque::push($queue, array(
'class' => $class,
'args' => array($args),
'id' => $id,
'class' => $class,
'args' => array($args),
'id' => $id,
'prefix' => $prefix,
'queue_time' => microtime(true),
));

if($monitor) {
Resque_Job_Status::create($id);
Resque_Job_Status::create($id, $prefix);
}

return $id;
Expand Down Expand Up @@ -129,7 +131,7 @@ public function updateStatus($status, $result = null)
return;
}

$statusInstance = new Resque_Job_Status($this->payload['id']);
$statusInstance = new Resque_Job_Status($this->payload['id'], $this->payload['prefix']);
$statusInstance->update($status, $result);
}

Expand All @@ -140,7 +142,7 @@ public function updateStatus($status, $result = null)
*/
public function getStatus()
{
$status = new Resque_Job_Status($this->payload['id']);
$status = new Resque_Job_Status($this->payload['id'], $this->payload['prefix']);
return $status->get();
}

Expand Down Expand Up @@ -237,13 +239,13 @@ public function fail($exception)
*/
public function recreate()
{
$status = new Resque_Job_Status($this->payload['id']);
$status = new Resque_Job_Status($this->payload['id'], $this->payload['prefix']);
$monitor = false;
if($status->isTracking()) {
$monitor = true;
}

return self::create($this->queue, $this->payload['class'], $this->getArguments(), $monitor);
return self::create($this->queue, $this->payload['class'], $this->getArguments(), $monitor, $this->payload['prefix']);
}

/**
Expand Down
71 changes: 35 additions & 36 deletions lib/Resque/Job/Status.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ class Resque_Job_Status
const STATUS_FAILED = 3;
const STATUS_COMPLETE = 4;

/**
* @var string The prefix of the job status id.
*/
private $prefix;

/**
* @var string The ID of the job this status class refers back to.
*/
Expand All @@ -37,9 +42,10 @@ class Resque_Job_Status
*
* @param string $id The ID of the job to manage the status for.
*/
public function __construct($id)
public function __construct($id, $prefix = '')
{
$this->id = $id;
$this->prefix = empty($prefix) ? '' : "${prefix}_";
}

/**
Expand All @@ -48,14 +54,18 @@ public function __construct($id)
*
* @param string $id The ID of the job to monitor the status of.
*/
public static function create($id)
public static function create($id, $prefix = "")
{
$status = new self($id, $prefix);
$statusPacket = array(
'status' => self::STATUS_WAITING,
'status' => self::STATUS_WAITING,
'updated' => time(),
'started' => time(),
'result' => null,
);
Resque::redis()->set('job:' . $id . ':status', json_encode($statusPacket));
Resque::redis()->set((string) $status, json_encode($statusPacket));

return $status;
}

/**
Expand Down Expand Up @@ -91,9 +101,10 @@ public function update($status, $result = null)
}

$statusPacket = array(
'status' => $status,
'status' => $status,
'updated' => time(),
'result' => $result,
'started' => $this->getValue('started'),
'result' => $result,
);
Resque::redis()->set((string)$this, json_encode($statusPacket));

Expand All @@ -104,12 +115,12 @@ public function update($status, $result = null)
}

/**
* Fetch the status for the job being monitored.
* Fetch a value from the status packet for the job being monitored.
*
* @return mixed False if the status is not being monitored, otherwise the status
* as an integer, based on the Resque_Job_Status constants.
* @return mixed False if the status is not being monitored, otherwise the
* requested value from the status packet.
*/
public function get()
protected function getValue($value = null)
{
if(!$this->isTracking()) {
return false;
Expand All @@ -120,7 +131,18 @@ public function get()
return false;
}

return $statusPacket['status'];
return empty($value) ? $statusPacket : $statusPacket[$value];
}

/**
* Fetch the status for the job being monitored.
*
* @return mixed False if the status is not being monitored, otherwise the status
* as an integer, based on the Resque_Job_Status constants.
*/
public function get()
{
return $this->getValue('status');
}

/**
Expand All @@ -131,32 +153,9 @@ public function get()
*/
public function getResult()
{
if(!$this->isTracking()) {
return false;
}

$statusPacket = json_decode(Resque::redis()->get((string)$this), true);
if(!$statusPacket) {
return false;
}

return $statusPacket['result'];
return $this->getValue('result');
}

/**
* Delete the job monitoring from the queue
*
* @return boolean|int
*/
public function del()
{
if(!$this->isTracking()) {
return false;
}

return Resque::redis()->del((string)$this);
}

/**
* Stop tracking the status of a job.
*/
Expand All @@ -172,6 +171,6 @@ public function stop()
*/
public function __toString()
{
return 'job:' . $this->id . ':status';
return 'job:' . $this->prefix . $this->id . ':status';
}
}

0 comments on commit e52ee4c

Please sign in to comment.