diff --git a/README.md b/README.md index 3b9fdcd1..a5b8f77c 100644 --- a/README.md +++ b/README.md @@ -134,6 +134,36 @@ $pool }); ``` +### Stopping a pool + +If you need to stop a pool early, because the task it was performing has been completed by one +of the child processes, you can use the `$pool->stop()` method. This will prevent the +pool from starting any additional processes. + +```php +use Spatie\Async\Pool; + +$pool = Pool::create(); + +// Generate 10k processes generating random numbers +for($i = 0; $i < 10000; $i++) { + $pool->add(function() use ($i) { + return rand(0, 100); + + })->then(function($output) use ($pool) { + // If one of them randomly picks 100, end the pool early. + if ($output === 100) { + $pool->stop(); + } + }); +} + +$pool->wait(); +``` + +Note that a pool will be rendered useless after being stopped, and a new pool should be +created if needed. + ### Working with tasks Besides using closures, you can also work with a `Task`. diff --git a/src/Pool.php b/src/Pool.php index 043f5cec..4af835ca 100644 --- a/src/Pool.php +++ b/src/Pool.php @@ -37,6 +37,8 @@ class Pool implements ArrayAccess protected $status; + protected $stopped = false; + public function __construct() { if (static::isSupported()) { @@ -162,6 +164,10 @@ public function putInQueue(Runnable $process) public function putInProgress(Runnable $process) { + if ($this->stopped) { + return; + } + if ($process instanceof ParallelProcess) { $process->getProcess()->setTimeout($this->timeout); } @@ -301,4 +307,9 @@ protected function registerListener() } }); } + + public function stop() + { + $this->stopped = true; + } } diff --git a/tests/PoolTest.php b/tests/PoolTest.php index b0002189..f76d4025 100644 --- a/tests/PoolTest.php +++ b/tests/PoolTest.php @@ -288,4 +288,37 @@ public function it_takes_an_intermediate_callback() $this->assertTrue($isIntermediateCallbackCalled); } + + /** @test */ + public function it_can_be_stopped_early() + { + $concurrency = 20; + $stoppingPoint = $concurrency / 5; + + $pool = Pool::create()->concurrency($concurrency); + + $maxProcesses = 10000; + $completedProcessesCount = 0; + + for ($i = 0; $i < $maxProcesses; $i++) { + $pool->add(function () use ($i) { + return $i; + })->then(function ($output) use ($pool, &$completedProcessesCount, $stoppingPoint) { + $completedProcessesCount++; + + if ($output === $stoppingPoint) { + $pool->stop(); + } + }); + } + + $pool->wait(); + + /** + * Because we are stopping the pool early (during the first set of processes created), we expect + * the number of completed processes to be less than 2 times the defined concurrency. + */ + $this->assertGreaterThanOrEqual($stoppingPoint, $completedProcessesCount); + $this->assertLessThanOrEqual($concurrency * 2, $completedProcessesCount); + } }