From 13be5ba8b36caa8026fc78853bc1e91813123469 Mon Sep 17 00:00:00 2001 From: Jordan Hall Date: Thu, 6 Feb 2020 00:13:42 +0000 Subject: [PATCH 1/7] Add ability to stop a pool early --- README.md | 26 ++++++++++++++++++++++++++ src/Pool.php | 12 ++++++++++++ 2 files changed, 38 insertions(+) diff --git a/README.md b/README.md index 3b9fdcd1..dd118f59 100644 --- a/README.md +++ b/README.md @@ -134,6 +134,32 @@ $pool }); ``` +### Stopping a pool + +If you need to stop a pool, because thte task it was performing is complete, you can use the +`$pool->stop()` method. This will prevent the pool from starting any additional processes. + +```php +use Spatie\Async\Pool; + +$pool = Pool::create(); + +// Generate 10k processes +for ($i = 0; $i < 10000; $i++) { + $pool->add(function () use ($pool) { + // If one of them randomly picks 100, end the pool early. + if (rand(0, 100) === 100) { + $pool->stop(); + } + }); +} + +$pool->wait(); +``` + +Note that a pool will be rendered useless after being stopped, and a new pool should be +created if needed. + ### Working with tasks Besides using closures, you can also work with a `Task`. diff --git a/src/Pool.php b/src/Pool.php index 043f5cec..9f540072 100644 --- a/src/Pool.php +++ b/src/Pool.php @@ -4,6 +4,7 @@ use ArrayAccess; use InvalidArgumentException; +use Spatie\Async\Output\ParallelError; use Spatie\Async\Process\ParallelProcess; use Spatie\Async\Process\Runnable; use Spatie\Async\Process\SynchronousProcess; @@ -37,6 +38,8 @@ class Pool implements ArrayAccess protected $status; + protected $stopped = false; + public function __construct() { if (static::isSupported()) { @@ -162,6 +165,10 @@ public function putInQueue(Runnable $process) public function putInProgress(Runnable $process) { + if ($this->stopped) { + return; + } + if ($process instanceof ParallelProcess) { $process->getProcess()->setTimeout($this->timeout); } @@ -301,4 +308,9 @@ protected function registerListener() } }); } + + public function stop() + { + $this->stopped = true; + } } From 16ab3ed62fbf5040281c64ac6d09d4c38db3b452 Mon Sep 17 00:00:00 2001 From: Jordan Hall Date: Thu, 6 Feb 2020 00:17:02 +0000 Subject: [PATCH 2/7] Remove unused import --- src/Pool.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Pool.php b/src/Pool.php index 9f540072..4af835ca 100644 --- a/src/Pool.php +++ b/src/Pool.php @@ -4,7 +4,6 @@ use ArrayAccess; use InvalidArgumentException; -use Spatie\Async\Output\ParallelError; use Spatie\Async\Process\ParallelProcess; use Spatie\Async\Process\Runnable; use Spatie\Async\Process\SynchronousProcess; From 4edf1472e33dddd5f398eb1739dcead51af87828 Mon Sep 17 00:00:00 2001 From: Jordan Hall Date: Thu, 6 Feb 2020 00:19:48 +0000 Subject: [PATCH 3/7] Update pool stopping docs --- README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index dd118f59..647a6506 100644 --- a/README.md +++ b/README.md @@ -136,8 +136,9 @@ $pool ### Stopping a pool -If you need to stop a pool, because thte task it was performing is complete, you can use the -`$pool->stop()` method. This will prevent the pool from starting any additional processes. +If you need to stop a pool early, because the task it was performing has been completed by one +of the child processes, you can use the `$pool->stop()` method. This will prevent the +pool from starting any additional processes. ```php use Spatie\Async\Pool; From b58a51b1704057d92df0df16e572f2f8a149620b Mon Sep 17 00:00:00 2001 From: Jordan Hall Date: Thu, 6 Feb 2020 11:58:01 +0000 Subject: [PATCH 4/7] Correct example in readme, and add unit test --- README.md | 11 +++++++---- tests/PoolTest.php | 26 ++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 647a6506..a5b8f77c 100644 --- a/README.md +++ b/README.md @@ -145,11 +145,14 @@ use Spatie\Async\Pool; $pool = Pool::create(); -// Generate 10k processes -for ($i = 0; $i < 10000; $i++) { - $pool->add(function () use ($pool) { +// Generate 10k processes generating random numbers +for($i = 0; $i < 10000; $i++) { + $pool->add(function() use ($i) { + return rand(0, 100); + + })->then(function($output) use ($pool) { // If one of them randomly picks 100, end the pool early. - if (rand(0, 100) === 100) { + if ($output === 100) { $pool->stop(); } }); diff --git a/tests/PoolTest.php b/tests/PoolTest.php index b0002189..763acdc2 100644 --- a/tests/PoolTest.php +++ b/tests/PoolTest.php @@ -288,4 +288,30 @@ public function it_takes_an_intermediate_callback() $this->assertTrue($isIntermediateCallbackCalled); } + + /** @test */ + public function it_can_be_stopped_early() + { + $pool = Pool::create(); + + $maxProcesses = 10000; + $completedProcessesCount = 0; + + for($i = 0; $i < $maxProcesses; $i++) { + $pool->add(function() use ($i) { + return rand(0, 100); + + })->then(function($output) use ($pool, &$completedProcessesCount) { + $completedProcessesCount++; + + if ($output === 100) { + $pool->stop(); + } + }); + } + + $pool->wait(); + + $this->assertLessThan($maxProcesses, $completedProcessesCount); + } } From c0a850d075f250fe9fa5ddfd4a879f42afea8981 Mon Sep 17 00:00:00 2001 From: Jordan Hall Date: Thu, 6 Feb 2020 11:59:11 +0000 Subject: [PATCH 5/7] Style fixes --- tests/PoolTest.php | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/PoolTest.php b/tests/PoolTest.php index 763acdc2..da163eae 100644 --- a/tests/PoolTest.php +++ b/tests/PoolTest.php @@ -297,11 +297,10 @@ public function it_can_be_stopped_early() $maxProcesses = 10000; $completedProcessesCount = 0; - for($i = 0; $i < $maxProcesses; $i++) { - $pool->add(function() use ($i) { + for ($i = 0; $i < $maxProcesses; $i++) { + $pool->add(function () use ($i) { return rand(0, 100); - - })->then(function($output) use ($pool, &$completedProcessesCount) { + })->then(function ($output) use ($pool, &$completedProcessesCount) { $completedProcessesCount++; if ($output === 100) { From cd382264ffa124c2b31d1fa5b00f568ef4512277 Mon Sep 17 00:00:00 2001 From: Jordan Hall Date: Thu, 13 Feb 2020 21:33:17 +0000 Subject: [PATCH 6/7] Make stopping pool early test more predictable --- tests/PoolTest.php | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/tests/PoolTest.php b/tests/PoolTest.php index da163eae..b7735af3 100644 --- a/tests/PoolTest.php +++ b/tests/PoolTest.php @@ -292,18 +292,20 @@ public function it_takes_an_intermediate_callback() /** @test */ public function it_can_be_stopped_early() { - $pool = Pool::create(); + $concurrency = 20; + + $pool = Pool::create()->concurrency($concurrency); $maxProcesses = 10000; $completedProcessesCount = 0; for ($i = 0; $i < $maxProcesses; $i++) { $pool->add(function () use ($i) { - return rand(0, 100); - })->then(function ($output) use ($pool, &$completedProcessesCount) { + return $i; + })->then(function ($output) use ($pool, &$completedProcessesCount, $concurrency) { $completedProcessesCount++; - if ($output === 100) { + if ($output === $concurrency / 4) { $pool->stop(); } }); @@ -311,6 +313,11 @@ public function it_can_be_stopped_early() $pool->wait(); - $this->assertLessThan($maxProcesses, $completedProcessesCount); + /** + * Because we are stopping the pool early (during the first set of processes created), we expect + * the number of completed processes to be within 1 and 2 times the defined concurrency. + */ + $this->assertGreaterThanOrEqual($concurrency * 1, $completedProcessesCount); + $this->assertLessThanOrEqual($concurrency * 2, $completedProcessesCount); } } From 14708af6df2f61f65bf2f77e7e6d7d4bed9ae07d Mon Sep 17 00:00:00 2001 From: Jordan Hall Date: Thu, 13 Feb 2020 21:41:21 +0000 Subject: [PATCH 7/7] Fix reliability of stopping pool early test --- tests/PoolTest.php | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/PoolTest.php b/tests/PoolTest.php index b7735af3..f76d4025 100644 --- a/tests/PoolTest.php +++ b/tests/PoolTest.php @@ -293,6 +293,7 @@ public function it_takes_an_intermediate_callback() public function it_can_be_stopped_early() { $concurrency = 20; + $stoppingPoint = $concurrency / 5; $pool = Pool::create()->concurrency($concurrency); @@ -302,10 +303,10 @@ public function it_can_be_stopped_early() for ($i = 0; $i < $maxProcesses; $i++) { $pool->add(function () use ($i) { return $i; - })->then(function ($output) use ($pool, &$completedProcessesCount, $concurrency) { + })->then(function ($output) use ($pool, &$completedProcessesCount, $stoppingPoint) { $completedProcessesCount++; - if ($output === $concurrency / 4) { + if ($output === $stoppingPoint) { $pool->stop(); } }); @@ -315,9 +316,9 @@ public function it_can_be_stopped_early() /** * Because we are stopping the pool early (during the first set of processes created), we expect - * the number of completed processes to be within 1 and 2 times the defined concurrency. + * the number of completed processes to be less than 2 times the defined concurrency. */ - $this->assertGreaterThanOrEqual($concurrency * 1, $completedProcessesCount); + $this->assertGreaterThanOrEqual($stoppingPoint, $completedProcessesCount); $this->assertLessThanOrEqual($concurrency * 2, $completedProcessesCount); } }