Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add status support #16

Merged
merged 11 commits into from
Jan 2, 2018
22 changes: 21 additions & 1 deletion src/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,18 @@ class Pool implements ArrayAccess
/** @var \Spatie\Async\ParallelProcess[] */
protected $failed = [];

/** @var \Spatie\Async\ParallelProcess[] */
protected $timeouts = [];

protected $results = [];

protected $status;

public function __construct()
{
$this->registerListener();

$this->status = new PoolStatus($this);
}

/**
Expand Down Expand Up @@ -157,7 +164,7 @@ public function markAsTimedOut(ParallelProcess $process)

unset($this->inProgress[$process->getPid()]);

$this->failed[$process->getPid()] = $process;
$this->timeouts[$process->getPid()] = $process;

$this->notify();
}
Expand Down Expand Up @@ -211,6 +218,19 @@ public function getFailed(): array
return $this->failed;
}

/**
* @return \Spatie\Async\ParallelProcess[]
*/
public function getTimeouts(): array
{
return $this->timeouts;
}

public function status(): PoolStatus
{
return $this->status;
}

protected function registerListener()
{
pcntl_async_signals(true);
Expand Down
52 changes: 52 additions & 0 deletions src/PoolStatus.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?php

namespace Spatie\Async;

use Spatie\Async\Output\SerializableException;

class PoolStatus
{
protected $pool;

public function __construct(Pool $pool)
{
$this->pool = $pool;
}

public function __toString(): string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

{
return $this->lines(
$this->summaryToString(),
$this->failedToString()
);
}

protected function lines(string ...$lines): string
{
return implode(PHP_EOL, $lines);
}

protected function summaryToString(): string
{
$finished = $this->pool->getFinished();
$failed = $this->pool->getFailed();
$timeouts = $this->pool->getTimeouts();

return 'finished: '.count($finished)
.' - failed: '.count($failed)
.' - timeout: '.count($timeouts);
}

protected function failedToString(): string
{
return (string) array_reduce($this->pool->getFailed(), function ($currentStatus, ParallelProcess $process) {
$output = $process->getErrorOutput();

if ($output instanceof SerializableException) {
$output = get_class($output->asThrowable()).': '.$output->asThrowable()->getMessage();
}

return $this->lines((string) $currentStatus, "{$process->getPid()} failed with {$output}");
});
}
}
60 changes: 60 additions & 0 deletions tests/PoolStatusTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?php

namespace Spatie\Async;

use Exception;
use Spatie\Async\Tests\MyTask;
use PHPUnit\Framework\TestCase;

class PoolStatusTest extends TestCase
{
/** @test */
public function it_can_show_a_textual_status()
{
$pool = Pool::create();

$pool->add(new MyTask());

$this->assertContains('finished: 0', (string) $pool->status());

await($pool);

$this->assertContains('finished: 1', (string) $pool->status());
}

/** @test */
public function it_can_show_a_textual_failed_status()
{
$pool = Pool::create();

foreach (range(1, 5) as $i) {
$pool->add(function () {
throw new Exception('Test');
})->catch(function () {
// Do nothing
});
}

$pool->wait();

$this->assertContains('finished: 0', (string) $pool->status());
$this->assertContains('failed: 5', (string) $pool->status());
$this->assertContains('failed with Exception: Test', (string) $pool->status());
}

/** @test */
public function it_can_show_timeout_status()
{
$pool = Pool::create()->timeout(0);

foreach (range(1, 5) as $i) {
$pool->add(function () {
sleep(1000);
});
}

$pool->wait();

$this->assertContains('timeout: 5', (string) $pool->status());
}
}
14 changes: 7 additions & 7 deletions tests/PoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public function it_can_run_processes_in_parallel()

$executionTime = $endTime - $startTime;

$this->assertLessThan(0.2, $executionTime, "Execution time was {$executionTime}, expected less than 0.2.");
$this->assertLessThan(0.2, $executionTime, "Execution time was {$executionTime}, expected less than 0.2.\n".(string) $pool->status());
}

/** @test */
Expand All @@ -59,7 +59,7 @@ public function it_can_handle_success()

$pool->wait();

$this->assertEquals(10, $counter);
$this->assertEquals(10, $counter, (string) $pool->status());
}

/** @test */
Expand All @@ -80,7 +80,7 @@ public function it_can_handle_timeout()

$pool->wait();

$this->assertEquals(5, $counter);
$this->assertEquals(5, $counter, (string) $pool->status());
}

/** @test */
Expand All @@ -98,7 +98,7 @@ public function it_can_handle_exceptions()

$pool->wait();

$this->assertCount(5, $pool->getFailed());
$this->assertCount(5, $pool->getFailed(), (string) $pool->status());
}

/** @test */
Expand All @@ -121,8 +121,8 @@ public function it_can_handle_a_maximum_of_concurrent_processes()

$executionTime = $endTime - $startTime;

$this->assertGreaterThanOrEqual(2, $executionTime, "Execution time was {$executionTime}, expected more than 2.");
$this->assertCount(3, $pool->getFinished());
$this->assertGreaterThanOrEqual(2, $executionTime, "Execution time was {$executionTime}, expected more than 2.\n".(string) $pool->status());
$this->assertCount(3, $pool->getFinished(), (string) $pool->status());
}

/** @test */
Expand All @@ -144,7 +144,7 @@ public function it_works_with_helper_functions()

await($pool);

$this->assertEquals(10, $counter);
$this->assertEquals(10, $counter, (string) $pool->status());
}

/** @test */
Expand Down