Skip to content

Commit

Permalink
Introduce Resque_Job_PID so a process PID can be obtained.
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz committed Mar 31, 2017
1 parent cf187fa commit c38f047
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 23 deletions.
41 changes: 22 additions & 19 deletions HOWITWORKS.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,47 +101,50 @@ How do the workers process the queues?
8. `Resque_Job->fail()` returns control to the worker (still in
`Resque_Worker::work()`) without a value
* Job
1. The job calls `Resque_Worker->perform()` with the `Resque_Job` as its
1. `Resque_Job_PID` is created, registering the PID of the actual process
doing the job.
2. The job calls `Resque_Worker->perform()` with the `Resque_Job` as its
only argument.
2. `Resque_Worker->perform()` sets up a `try...catch` block so it can
3. `Resque_Worker->perform()` sets up a `try...catch` block so it can
properly handle exceptions by marking jobs as failed (by calling
`Resque_Job->fail()`, as above)
3. Inside the `try...catch`, `Resque_Worker->perform()` triggers an
4. Inside the `try...catch`, `Resque_Worker->perform()` triggers an
`afterFork` event
4. Still inside the `try...catch`, `Resque_Worker->perform()` calls
5. Still inside the `try...catch`, `Resque_Worker->perform()` calls
`Resque_Job->perform()` with no arguments
5. `Resque_Job->perform()` calls `Resque_Job->getInstance()` with no
6. `Resque_Job->perform()` calls `Resque_Job->getInstance()` with no
arguments
6. If `Resque_Job->getInstance()` has already been called, it returns the
7. If `Resque_Job->getInstance()` has already been called, it returns the
existing instance; otherwise:
7. `Resque_Job->getInstance()` checks that the job's class (type) exists
8. `Resque_Job->getInstance()` checks that the job's class (type) exists
and has a `perform()` method; if not, in either case, it throws an
exception which will be caught by `Resque_Worker->perform()`
8. `Resque_Job->getInstance()` creates an instance of the job's class, and
9. `Resque_Job->getInstance()` creates an instance of the job's class, and
initializes it with a reference to the `Resque_Job` itself, the job's
arguments (which it gets by calling `Resque_Job->getArguments()`, which
in turn simply returns the value of `args[0]`, or an empty array if no
arguments were passed), and the queue name
9. `Resque_Job->getInstance()` returns control, along with the job class
10. `Resque_Job->getInstance()` returns control, along with the job class
instance, to `Resque_Job->perform()`
10. `Resque_Job->perform()` sets up its own `try...catch` block to handle
11. `Resque_Job->perform()` sets up its own `try...catch` block to handle
`Resque_Job_DontPerform` exceptions; any other exceptions are passed
up to `Resque_Worker->perform()`
11. `Resque_Job->perform()` triggers a `beforePerform` event
12. `Resque_Job->perform()` calls `setUp()` on the instance, if it exists
13. `Resque_Job->perform()` calls `perform()` on the instance
14. `Resque_Job->perform()` calls `tearDown()` on the instance, if it
12. `Resque_Job->perform()` triggers a `beforePerform` event
13. `Resque_Job->perform()` calls `setUp()` on the instance, if it exists
14. `Resque_Job->perform()` calls `perform()` on the instance
15. `Resque_Job->perform()` calls `tearDown()` on the instance, if it
exists
15. `Resque_Job->perform()` triggers an `afterPerform` event
16. The `try...catch` block ends, suppressing `Resque_Job_DontPerform`
16. `Resque_Job->perform()` triggers an `afterPerform` event
17. The `try...catch` block ends, suppressing `Resque_Job_DontPerform`
exceptions by returning control, and the value `FALSE`, to
`Resque_Worker->perform()`; any other situation returns the value
`TRUE` along with control, instead
17. The `try...catch` block in `Resque_Worker->perform()` ends
18. `Resque_Worker->perform()` updates the job status from `RUNNING` to
18. The `try...catch` block in `Resque_Worker->perform()` ends
19. `Resque_Worker->perform()` updates the job status from `RUNNING` to
`COMPLETE`, then returns control, with no value, to the worker (again
still in `Resque_Worker::work()`)
19. `Resque_Worker::work()` calls `exit(0)` to terminate the job process
20. `Resque_Job_PID()` is removed, the forked process will terminate soon
21. `Resque_Worker::work()` calls `exit(0)` to terminate the job process
cleanly
* SPECIAL CASE: Non-forking OS (Windows)
1. Same as the job above, except it doesn't call `exit(0)` when done
Expand Down
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,19 @@ or failed, and are then automatically expired. A status can also
forcefully be expired by calling the `stop()` method on a status
class.

### Obtaining job PID ###

You can obtain the PID of the actual process doing the work through `Resque_Job_PID`. On a forking OS this will be the
PID of the forked process.

CAUTION: on a non-forking OS, the PID returned will be of the worker itself.

```php
echo Resque_Job_PID::get($token);
```

Function returns `0` if the `perform` hasn't started yet, or if it has already ended.

## Workers ##

Workers work in the exact same way as the Ruby workers. For complete
Expand Down
43 changes: 43 additions & 0 deletions lib/Resque/Job/PID.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php
/**
* PID tracker for the forked worker job.
*
* @package Resque/Job
* @author Chris Boulton <chris@bigcommerce.com>
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Job_PID
{
/**
* Create a new PID tracker item for the supplied job ID.
*
* @param string $id The ID of the job to track the PID of.
*/
public static function create($id)
{
Resque::redis()->set('job:' . $id . ':pid', (string)getmypid());
}

/**
* Fetch the PID for the process actually executing the job.
*
* @param string $id The ID of the job to get the PID of.
*
* @return int PID of the process doing the job (on non-forking OS, PID of the worker, otherwise forked PID).
*/
public static function get($id)
{
return (int)Resque::redis()->get('job:' . $id . ':pid');
}

/**
* Remove the PID tracker for the job.
*
* @param string $id The ID of the job to remove the tracker from.
*/
public static function del($id)
{
Resque::redis()->del('job:' . $id . ':pid');
}
}

17 changes: 17 additions & 0 deletions lib/Resque/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,17 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false)
$status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T');
$this->updateProcLine($status);
$this->logger->log(Psr\Log\LogLevel::INFO, $status);

if(!empty($job->payload['id'])) {
Resque_Job_PID::create($job->payload['id']);
}

$this->perform($job);

if(!empty($job->payload['id'])) {
Resque_Job_PID::del($job->payload['id']);
}

if ($this->child === 0) {
exit(0);
}
Expand Down Expand Up @@ -394,6 +404,13 @@ public function shutdownNow()
$this->killChild();
}

/**
* @return int Child process PID.
*/
public function getChildPID() {
return $this->child;
}

/**
* Kill a forked child job immediately. The job it is processing will not
* be completed.
Expand Down
47 changes: 47 additions & 0 deletions test/Resque/Tests/JobPIDTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php
/**
* Resque_Job_PID tests.
*
* @package Resque/Tests
* @author Chris Boulton <chris@bigcommerce.com>
* @license http://www.opensource.org/licenses/mit-license.php
*/
class Resque_Tests_JobPIDTest extends Resque_Tests_TestCase
{
/**
* @var \Resque_Worker
*/
protected $worker;

public function setUp()
{
parent::setUp();

// Register a worker to test with
$this->worker = new Resque_Worker('jobs');
$this->worker->setLogger(new Resque_Log());
}

public function testQueuedJobDoesNotReturnPID()
{
$token = Resque::enqueue('jobs', 'Test_Job', null, true);
$this->assertEquals(0, Resque_Job_PID::get($token));
}

public function testRunningJobReturnsPID()
{
// Cannot use InProgress_Job on non-forking OS.
if(!function_exists('pcntl_fork')) return;

$token = Resque::enqueue('jobs', 'InProgress_Job', null, true);
$this->worker->work(0);
$this->assertNotEquals(0, Resque_Job_PID::get($token));
}

public function testFinishedJobDoesNotReturnPID()
{
$token = Resque::enqueue('jobs', 'Test_Job', null, true);
$this->worker->work(0);
$this->assertEquals(0, Resque_Job_PID::get($token));
}
}
8 changes: 4 additions & 4 deletions test/Resque/Tests/JobStatusTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
*/
class Resque_Tests_JobStatusTest extends Resque_Tests_TestCase
{
/**
* @var \Resque_Worker
*/
protected $worker;
/**
* @var \Resque_Worker
*/
protected $worker;

public function setUp()
{
Expand Down
18 changes: 18 additions & 0 deletions test/bootstrap.php
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,24 @@ public function perform()
}
}

/**
* This job exits the forked worker process, which simulates the job being (forever) in progress,
* so that we can verify the state of the system for "running jobs". Does not work on a non-forking OS.
*
* CAUTION Use this test job only with Worker::work, i.e. only when you actually trigger the fork in tests.
*/
class InProgress_Job
{
public function perform()
{
if(!function_exists('pcntl_fork')) {
// We can't lose the worker on a non-forking OS.
throw new Failing_Job_Exception('Do not use InProgress_Job for tests on non-forking OS!');
}
exit(0);
}
}

class Test_Job_Without_Perform_Method
{

Expand Down

0 comments on commit c38f047

Please sign in to comment.