Skip to content

Commit

Permalink
Merge pull request #106 from DivineOmega/feature/stop-pool
Browse files Browse the repository at this point in the history
Add ability to stop a pool early
  • Loading branch information
brendt authored Feb 14, 2020
2 parents 9ddad44 + 14708af commit 3af8fa2
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 0 deletions.
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,36 @@ $pool
});
```

### Stopping a pool

If you need to stop a pool early, because the task it was performing has been completed by one
of the child processes, you can use the `$pool->stop()` method. This will prevent the
pool from starting any additional processes.

```php
use Spatie\Async\Pool;

$pool = Pool::create();

// Generate 10k processes generating random numbers
for($i = 0; $i < 10000; $i++) {
$pool->add(function() use ($i) {
return rand(0, 100);

})->then(function($output) use ($pool) {
// If one of them randomly picks 100, end the pool early.
if ($output === 100) {
$pool->stop();
}
});
}

$pool->wait();
```

Note that a pool will be rendered useless after being stopped, and a new pool should be
created if needed.

### Working with tasks

Besides using closures, you can also work with a `Task`.
Expand Down
11 changes: 11 additions & 0 deletions src/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class Pool implements ArrayAccess

protected $status;

protected $stopped = false;

public function __construct()
{
if (static::isSupported()) {
Expand Down Expand Up @@ -162,6 +164,10 @@ public function putInQueue(Runnable $process)

public function putInProgress(Runnable $process)
{
if ($this->stopped) {
return;
}

if ($process instanceof ParallelProcess) {
$process->getProcess()->setTimeout($this->timeout);
}
Expand Down Expand Up @@ -301,4 +307,9 @@ protected function registerListener()
}
});
}

public function stop()
{
$this->stopped = true;
}
}
33 changes: 33 additions & 0 deletions tests/PoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -288,4 +288,37 @@ public function it_takes_an_intermediate_callback()

$this->assertTrue($isIntermediateCallbackCalled);
}

/** @test */
public function it_can_be_stopped_early()
{
$concurrency = 20;
$stoppingPoint = $concurrency / 5;

$pool = Pool::create()->concurrency($concurrency);

$maxProcesses = 10000;
$completedProcessesCount = 0;

for ($i = 0; $i < $maxProcesses; $i++) {
$pool->add(function () use ($i) {
return $i;
})->then(function ($output) use ($pool, &$completedProcessesCount, $stoppingPoint) {
$completedProcessesCount++;

if ($output === $stoppingPoint) {
$pool->stop();
}
});
}

$pool->wait();

/**
* Because we are stopping the pool early (during the first set of processes created), we expect
* the number of completed processes to be less than 2 times the defined concurrency.
*/
$this->assertGreaterThanOrEqual($stoppingPoint, $completedProcessesCount);
$this->assertLessThanOrEqual($concurrency * 2, $completedProcessesCount);
}
}

0 comments on commit 3af8fa2

Please sign in to comment.