diff --git a/HOWITWORKS.md b/HOWITWORKS.md index ec85fa37..3aa18b4c 100644 --- a/HOWITWORKS.md +++ b/HOWITWORKS.md @@ -101,47 +101,50 @@ How do the workers process the queues? 8. `Resque_Job->fail()` returns control to the worker (still in `Resque_Worker::work()`) without a value * Job - 1. The job calls `Resque_Worker->perform()` with the `Resque_Job` as its + 1. `Resque_Job_PID` is created, registering the PID of the actual process + doing the job. + 2. The job calls `Resque_Worker->perform()` with the `Resque_Job` as its only argument. - 2. `Resque_Worker->perform()` sets up a `try...catch` block so it can + 3. `Resque_Worker->perform()` sets up a `try...catch` block so it can properly handle exceptions by marking jobs as failed (by calling `Resque_Job->fail()`, as above) - 3. Inside the `try...catch`, `Resque_Worker->perform()` triggers an + 4. Inside the `try...catch`, `Resque_Worker->perform()` triggers an `afterFork` event - 4. Still inside the `try...catch`, `Resque_Worker->perform()` calls + 5. Still inside the `try...catch`, `Resque_Worker->perform()` calls `Resque_Job->perform()` with no arguments - 5. `Resque_Job->perform()` calls `Resque_Job->getInstance()` with no + 6. `Resque_Job->perform()` calls `Resque_Job->getInstance()` with no arguments - 6. If `Resque_Job->getInstance()` has already been called, it returns the + 7. If `Resque_Job->getInstance()` has already been called, it returns the existing instance; otherwise: - 7. `Resque_Job->getInstance()` checks that the job's class (type) exists + 8. `Resque_Job->getInstance()` checks that the job's class (type) exists and has a `perform()` method; if not, in either case, it throws an exception which will be caught by `Resque_Worker->perform()` - 8. `Resque_Job->getInstance()` creates an instance of the job's class, and + 9. `Resque_Job->getInstance()` creates an instance of the job's class, and initializes it with a reference to the `Resque_Job` itself, the job's arguments (which it gets by calling `Resque_Job->getArguments()`, which in turn simply returns the value of `args[0]`, or an empty array if no arguments were passed), and the queue name - 9. `Resque_Job->getInstance()` returns control, along with the job class + 10. `Resque_Job->getInstance()` returns control, along with the job class instance, to `Resque_Job->perform()` - 10. `Resque_Job->perform()` sets up its own `try...catch` block to handle + 11. `Resque_Job->perform()` sets up its own `try...catch` block to handle `Resque_Job_DontPerform` exceptions; any other exceptions are passed up to `Resque_Worker->perform()` - 11. `Resque_Job->perform()` triggers a `beforePerform` event - 12. `Resque_Job->perform()` calls `setUp()` on the instance, if it exists - 13. `Resque_Job->perform()` calls `perform()` on the instance - 14. `Resque_Job->perform()` calls `tearDown()` on the instance, if it + 12. `Resque_Job->perform()` triggers a `beforePerform` event + 13. `Resque_Job->perform()` calls `setUp()` on the instance, if it exists + 14. `Resque_Job->perform()` calls `perform()` on the instance + 15. `Resque_Job->perform()` calls `tearDown()` on the instance, if it exists - 15. `Resque_Job->perform()` triggers an `afterPerform` event - 16. The `try...catch` block ends, suppressing `Resque_Job_DontPerform` + 16. `Resque_Job->perform()` triggers an `afterPerform` event + 17. The `try...catch` block ends, suppressing `Resque_Job_DontPerform` exceptions by returning control, and the value `FALSE`, to `Resque_Worker->perform()`; any other situation returns the value `TRUE` along with control, instead - 17. The `try...catch` block in `Resque_Worker->perform()` ends - 18. `Resque_Worker->perform()` updates the job status from `RUNNING` to + 18. The `try...catch` block in `Resque_Worker->perform()` ends + 19. `Resque_Worker->perform()` updates the job status from `RUNNING` to `COMPLETE`, then returns control, with no value, to the worker (again still in `Resque_Worker::work()`) - 19. `Resque_Worker::work()` calls `exit(0)` to terminate the job process + 20. `Resque_Job_PID()` is removed, the forked process will terminate soon + 21. `Resque_Worker::work()` calls `exit(0)` to terminate the job process cleanly * SPECIAL CASE: Non-forking OS (Windows) 1. Same as the job above, except it doesn't call `exit(0)` when done diff --git a/README.md b/README.md index cd8f83fe..d00c59d4 100644 --- a/README.md +++ b/README.md @@ -193,6 +193,19 @@ or failed, and are then automatically expired. A status can also forcefully be expired by calling the `stop()` method on a status class. +### Obtaining job PID ### + +You can obtain the PID of the actual process doing the work through `Resque_Job_PID`. On a forking OS this will be the +PID of the forked process. + +CAUTION: on a non-forking OS, the PID returned will be of the worker itself. + +```php +echo Resque_Job_PID::get($token); +``` + +Function returns `0` if the `perform` hasn't started yet, or if it has already ended. + ## Workers ## Workers work in the exact same way as the Ruby workers. For complete diff --git a/lib/Resque/Job/PID.php b/lib/Resque/Job/PID.php new file mode 100644 index 00000000..f72710dc --- /dev/null +++ b/lib/Resque/Job/PID.php @@ -0,0 +1,43 @@ + + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Job_PID +{ + /** + * Create a new PID tracker item for the supplied job ID. + * + * @param string $id The ID of the job to track the PID of. + */ + public static function create($id) + { + Resque::redis()->set('job:' . $id . ':pid', (string)getmypid()); + } + + /** + * Fetch the PID for the process actually executing the job. + * + * @param string $id The ID of the job to get the PID of. + * + * @return int PID of the process doing the job (on non-forking OS, PID of the worker, otherwise forked PID). + */ + public static function get($id) + { + return (int)Resque::redis()->get('job:' . $id . ':pid'); + } + + /** + * Remove the PID tracker for the job. + * + * @param string $id The ID of the job to remove the tracker from. + */ + public static function del($id) + { + Resque::redis()->del('job:' . $id . ':pid'); + } +} + diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 04714c1c..9acce78f 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -199,7 +199,17 @@ public function work($interval = Resque::DEFAULT_INTERVAL, $blocking = false) $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); $this->updateProcLine($status); $this->logger->log(Psr\Log\LogLevel::INFO, $status); + + if(!empty($job->payload['id'])) { + Resque_Job_PID::create($job->payload['id']); + } + $this->perform($job); + + if(!empty($job->payload['id'])) { + Resque_Job_PID::del($job->payload['id']); + } + if ($this->child === 0) { exit(0); } @@ -394,6 +404,13 @@ public function shutdownNow() $this->killChild(); } + /** + * @return int Child process PID. + */ + public function getChildPID() { + return $this->child; + } + /** * Kill a forked child job immediately. The job it is processing will not * be completed. diff --git a/test/Resque/Tests/JobPIDTest.php b/test/Resque/Tests/JobPIDTest.php new file mode 100644 index 00000000..43aa6ee3 --- /dev/null +++ b/test/Resque/Tests/JobPIDTest.php @@ -0,0 +1,47 @@ + + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_Tests_JobPIDTest extends Resque_Tests_TestCase +{ + /** + * @var \Resque_Worker + */ + protected $worker; + + public function setUp() + { + parent::setUp(); + + // Register a worker to test with + $this->worker = new Resque_Worker('jobs'); + $this->worker->setLogger(new Resque_Log()); + } + + public function testQueuedJobDoesNotReturnPID() + { + $token = Resque::enqueue('jobs', 'Test_Job', null, true); + $this->assertEquals(0, Resque_Job_PID::get($token)); + } + + public function testRunningJobReturnsPID() + { + // Cannot use InProgress_Job on non-forking OS. + if(!function_exists('pcntl_fork')) return; + + $token = Resque::enqueue('jobs', 'InProgress_Job', null, true); + $this->worker->work(0); + $this->assertNotEquals(0, Resque_Job_PID::get($token)); + } + + public function testFinishedJobDoesNotReturnPID() + { + $token = Resque::enqueue('jobs', 'Test_Job', null, true); + $this->worker->work(0); + $this->assertEquals(0, Resque_Job_PID::get($token)); + } +} diff --git a/test/bootstrap.php b/test/bootstrap.php index a4b68377..d28137f1 100644 --- a/test/bootstrap.php +++ b/test/bootstrap.php @@ -109,6 +109,24 @@ public function perform() } } +/** + * This job exits the forked worker process, which simulates the job being (forever) in progress, + * so that we can verify the state of the system for "running jobs". Does not work on a non-forking OS. + * + * CAUTION Use this test job only with Worker::work, i.e. only when you actually trigger the fork in tests. + */ +class InProgress_Job +{ + public function perform() + { + if(!function_exists('pcntl_fork')) { + // We can't lose the worker on a non-forking OS. + throw new Failing_Job_Exception('Do not use InProgress_Job for tests on non-forking OS!'); + } + exit(0); + } +} + class Test_Job_Without_Perform_Method {