Skip to content

Commit

Permalink
enhancements to job status tracking
Browse files Browse the repository at this point in the history
 - add helper methods to support fetching started/updated timestamps
 - do not discard `started` timestamp on update
 - simplify job id mapping
 - sanity check `update()` to accept valid statuses only
  • Loading branch information
Aeon committed Nov 8, 2016
1 parent cf187fa commit b22a5e3
Showing 1 changed file with 74 additions and 16 deletions.
90 changes: 74 additions & 16 deletions lib/Resque/Job/Status.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,14 @@ class Resque_Job_Status
*/
public function __construct($id)
{
$this->id = $id;
$this->id = self::generateId($id);
}

/**
* generate job id consistently
*/
private static function generateId($id) {
return 'job:' . $id . ':status';
}

/**
Expand All @@ -53,9 +60,9 @@ public static function create($id)
$statusPacket = array(
'status' => self::STATUS_WAITING,
'updated' => time(),
'started' => time(),
'started' => time()
);
Resque::redis()->set('job:' . $id . ':status', json_encode($statusPacket));
Resque::redis()->set(self::generateId($id), json_encode($statusPacket));
}

/**
Expand All @@ -70,7 +77,7 @@ public function isTracking()
return false;
}

if(!Resque::redis()->exists((string)$this)) {
if(!Resque::redis()->exists($this->id)) {
$this->isTracking = false;
return false;
}
Expand All @@ -86,19 +93,26 @@ public function isTracking()
*/
public function update($status)
{
$status = (int)$status;

if(!$this->isTracking()) {
return;
}

if($status < 1 || $status > 4) {
return;
}

$statusPacket = array(
'status' => $status,
'updated' => time(),
'started' => $this->fetch('started')
);
Resque::redis()->set((string)$this, json_encode($statusPacket));
Resque::redis()->set($this->id, json_encode($statusPacket));

// Expire the status for completed jobs after 24 hours
if(in_array($status, self::$completeStatuses)) {
Resque::redis()->expire((string)$this, 86400);
Resque::redis()->expire($this->id, 86400);
}
}

Expand All @@ -110,24 +124,68 @@ public function update($status)
*/
public function get()
{
if(!$this->isTracking()) {
return false;
}
return $this->status();
}

$statusPacket = json_decode(Resque::redis()->get((string)$this), true);
if(!$statusPacket) {
return false;
}
/**
* Fetch the status for the job being monitored.
*
* @return mixed False if the status is not being monitored, otherwise the status as
* as an integer, based on the Resque_Job_Status constants.
*/
public function status()
{
return $this->fetch('status');
}

/**
* Fetch the updated timestamp for the job being monitored.
*
* @return mixed False if the status is not being monitored, otherwise the updated timestamp
*/
public function updated()
{
return $this->fetch('updated');
}

return $statusPacket['status'];
/**
* Fetch the started timestamp for the job being monitored.
*
* @return mixed False if the status is not being monitored, otherwise the created timestamp
*/
public function started()
{
return $this->fetch('started');
}

/**
* Fetch the status packet for the job being monitored.
* @param optional string $field The field to get from the status packet
*
* @return mixed False if the status is not being monitored, otherwise the status packet array or the individual field
*/
private function fetch($field = false)
{
$statusPacket = Resque::redis()->get($this->id);
if($statusPacket) {
$statusPacket = json_decode($statusPacket, true);
if($field) {
if(isset($statusPacket[$field])) {
return (int)$statusPacket[$field];
}
} else {
return $statusPacket;
}
}
return false;
}

/**
* Stop tracking the status of a job.
*/
public function stop()
{
Resque::redis()->del((string)$this);
Resque::redis()->del($this->id);
}

/**
Expand All @@ -137,6 +195,6 @@ public function stop()
*/
public function __toString()
{
return 'job:' . $this->id . ':status';
return $this->id;
}
}

0 comments on commit b22a5e3

Please sign in to comment.