Skip to content

Commit

Permalink
Merge pull request #12 from spatie/task-support
Browse files Browse the repository at this point in the history
Add Task support
  • Loading branch information
brendt authored Jan 1, 2018
2 parents acce83d + 90f5e8a commit 48b0fa3
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 10 deletions.
29 changes: 28 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ foreach ($things as $thing) {
// Do a thing
})->then(function ($output) {
// Handle success
})->catch(function (Throwable $e) {
})->catch(function (Throwable $exception) {
// Handle exception
});
}
Expand Down Expand Up @@ -111,6 +111,33 @@ await($pool);
If an exception is thrown from within a child process, and not caught using the `->catch()` callback,
it will be thrown as `Spatie\Async\ParallelError` when calling `await()` or `$pool->wait()`.

### Working with tasks

Besides using closures, you can also work with a `Task`.
A `Task` is useful in situations where you need more setup work in the child process.
Because a child process is always bootstrapped from nothing, chances are you'll want to initialise eg. the dependency container before executing the task.
The `Task` class makes this easier to do.

```php
use Spatie\Async\Task;

class MyTask extends Task
{
public function configure()
{
// Setup eg. dependency container, load config,...
}

public function execute()
{
// Do the real work here.
}
}

// Add the task to the pool
$pool->add(new MyTask());
```

## Behind the curtains

When using this package, you're probably wondering what's happening underneath the surface.
Expand Down
8 changes: 5 additions & 3 deletions src/Runtime/ChildRuntime.php
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
<?php

use Spatie\Async\Runtime\ParentRuntime;

try {
$autoloader = $argv[1] ?? null;
$serializedClosure = base64_decode($argv[2] ?? '');
$serializedClosure = $argv[2] ?? null;

if (! $autoloader) {
throw new InvalidArgumentException('No autoloader provided in child process.');
Expand All @@ -18,9 +20,9 @@

require_once $autoloader;

$closure = Opis\Closure\unserialize($serializedClosure);
$task = ParentRuntime::decodeTask($serializedClosure);

$output = call_user_func($closure);
$output = call_user_func($task);

fwrite(STDOUT, base64_encode(serialize($output)));

Expand Down
32 changes: 28 additions & 4 deletions src/Runtime/ParentRuntime.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

namespace Spatie\Async\Runtime;

use Spatie\Async\Task;
use Spatie\Async\ParallelProcess;
use function Opis\Closure\serialize;
use Opis\Closure\SerializableClosure;
use function Opis\Closure\unserialize;
use Symfony\Component\Process\Process;

class ParentRuntime
Expand Down Expand Up @@ -39,21 +41,43 @@ public static function init(string $autoloader = null)
self::$isInitialised = true;
}

public static function createChildProcess(callable $callable): ParallelProcess
/**
* @param \Spatie\Async\Task|callable $task
*
* @return \Spatie\Async\ParallelProcess
*/
public static function createChildProcess($task): ParallelProcess
{
if (! self::$isInitialised) {
self::init();
}

$closure = new SerializableClosure($callable);

$process = new Process(implode(' ', [
'exec php',
self::$childProcessScript,
self::$autoloader,
base64_encode(serialize($closure)),
self::encodeTask($task),
]));

return ParallelProcess::create($process);
}

/**
* @param \Spatie\Async\Task|callable $task
*
* @return string
*/
public static function encodeTask($task): string
{
if (! $task instanceof Task) {
$task = new SerializableClosure($task);
}

return base64_encode(serialize($task));
}

public static function decodeTask(string $task)
{
return unserialize(base64_decode($task));
}
}
9 changes: 9 additions & 0 deletions src/Task.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,14 @@

abstract class Task
{
abstract public function configure();

abstract public function execute();

public function __invoke()
{
$this->configure();

return $this->execute();
}
}
9 changes: 7 additions & 2 deletions src/helpers.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
use Spatie\Async\Runtime\ParentRuntime;

if (! function_exists('async')) {
function async(callable $callable): ParallelProcess
/**
* @param \Spatie\Async\Task|callable $task
*
* @return \Spatie\Async\ParallelProcess
*/
function async($task): ParallelProcess
{
return ParentRuntime::createChildProcess($callable);
return ParentRuntime::createChildProcess($task);
}
}

Expand Down
20 changes: 20 additions & 0 deletions tests/MyTask.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

namespace Spatie\Async\Tests;

use Spatie\Async\Task;

class MyTask extends Task
{
protected $i = 0;

public function configure()
{
$this->i = 2;
}

public function execute()
{
return $this->i;
}
}
25 changes: 25 additions & 0 deletions tests/PoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Spatie\Async;

use Exception;
use Spatie\Async\Tests\MyTask;
use PHPUnit\Framework\TestCase;
use Spatie\Async\Tests\MyClass;

Expand Down Expand Up @@ -183,4 +184,28 @@ public function it_returns_all_the_output_as_an_array()
$this->assertCount(5, $result);
$this->assertEquals(10, array_sum($result));
}

/** @test */
public function it_can_work_with_tasks()
{
$pool = Pool::create();

$pool[] = async(new MyTask());

$results = await($pool);

$this->assertEquals(2, $results[0]);
}

/** @test */
public function it_can_accept_tasks_with_pool_add()
{
$pool = Pool::create();

$pool->add(new MyTask());

$results = await($pool);

$this->assertEquals(2, $results[0]);
}
}

0 comments on commit 48b0fa3

Please sign in to comment.