An simply uv_spawn
or proc-open
wrapper API to execute and manage a Pool of child-processes, achieving parallel/asynchronous PHP for Blocking I/O.
- Installation
- Usage
- Channels Transfer messages between Child and Parent
- Event hooks
- Parallel
- Parallel Configuration
- Behind the curtains
- Differences with original author's "Spatie/Async"
- How to integrate into your project/package
- Error handling
- Contributing
- License
This package uses features of libuv
, the PHP extension ext-uv of the Node.js library. It's uv_spawn
function is used to launch processes. The performance it a much better alternative to pcntl-extension, or the use of proc_open
. This package will fallback to use [symfony/process], if libuv
is not installed.
This package is part of our symplely/coroutine package for handling any blocking i/o process, that can not be handled by Coroutine natively.
To learn more about libuv features read the online tutorial book.
The terminology in versions 3x and above was changed to be inline with ext-parallel
extension usage, and to behave as a Thread
, but without many of that library extension's limitations.
The Channeled
and Future
classes are both designed in a way to be extend from to create your own implementation of a Parallel
based library. Currently libuv
will be required to get full benefits of the implementation.
composer require symplely/spawn
This package will use libuv features if available. Do one of the following to install.
For Debian like distributions, Ubuntu...
apt-get install libuv1-dev php-pear -y
For RedHat like distributions, CentOS...
yum install libuv-devel php-pear -y
Now have Pecl auto compile, install, and setup.
pecl channel-update pecl.php.net
pecl install uv-beta
For Windows there is good news, native async thru libuv
has arrived.
Windows builds for stable PHP versions are available from PECL.
Directly download latest from https://windows.php.net/downloads/pecl/releases/uv/
Extract libuv.dll
to same directory as PHP
binary executable, and extract php_uv.dll
to ext\
directory.
Enable extension php_sockets.dll
and php_uv.dll
in php.ini
cd C:\Php
Invoke-WebRequest "https://windows.php.net/downloads/pecl/releases/uv/0.2.4/php_uv-0.2.4-7.2-ts-vc15-x64.zip" -OutFile "php_uv-0.2.4.zip"
#Invoke-WebRequest "https://windows.php.net/downloads/pecl/releases/uv/0.2.4/php_uv-0.2.4-7.3-nts-vc15-x64.zip" -OutFile "php_uv-0.2.4.zip"
#Invoke-WebRequest "https://windows.php.net/downloads/pecl/releases/uv/0.2.4/php_uv-0.2.4-7.4-ts-vc15-x64.zip" -OutFile "php_uv-0.2.4.zip"
7z x -y php_uv-0.2.4.zip libuv.dll php_uv.dll
copy php_uv.dll ext\php_uv.dll
del php_uv.dll
del php_uv-0.2.4.zip
echo extension=php_sockets.dll >> php.ini
echo extension=php_uv.dll >> php.ini
include 'vendor/autoload.php';
use Async\Spawn\Spawn;
// Shows output by default and Channel instance is extracted from args.
$future = \parallel($function, ...$args)
// Shows output by default, turns on yield usage, can include additional file, and the Channel instance is extracted from args.
$future = \paralleling($function, $includeFile, ...$args)
// Or Does not show output by default and channel instance has to be explicitly passed in.
$future = \spawn($function, $timeout, $channel)
// Or Show output by default and channel instance has to be explicitly passed in.
$future = \spawning($function, $timeout, $channel)
// Or
$future = Spawn::create(function () use ($thing) {
// Do a thing
}, $timeout, $channel)
->then(function ($output) {
// Handle success
})->catch(function (\Throwable $exception) {
// Handle exception
});
// Wait for `Future` to terminate. Note this should only be executed for local testing only.
// Use "How to integrate into your project/package" section instead.
// Second option can be used to set to display child output, default is false
\spawn_run($future, true);
// Or same as
$future->displayOn()->run();
// Or
$future->run();
The feature has been completely redesigned to behave similar to PHP ext-parallel extension.
See the Channel page for real examples.
include 'vendor/autoload.php';
use Async\Spawn\Channeled as Channel;
$channel = Channel::make("io");
// Shows output by default and Channel instance is extracted for args.
$future = parallel(function ($channel) {
$channel = Channel::open($channel);
for ($count = 0; $count <= 10; $count++) {
$channel->send($count);
}
echo 'pingpangpong';
$channel->send(false);
return 'return whatever';
}, (string) $channel);
while (($value = $channel->recv()) !== false) {
var_dump($value);
}
echo \spawn_output($future); // pingpangpong
// Or
echo \spawn_result($future); // return whatever
// Or
echo $future->getResult(); // return whatever
When creating asynchronous processes, you'll get an instance of FutureInterface
returned.
You can add the following event callback hooks on a Future
process.
// Shows output by default and Channel instance is extracted for args.
$future = parallel($function, ...$args)
// Or
$future = spawn($function, $timeout, $channel)
// Or
$future = Spawn::create(function () {
// The second argument is optional, Defaults no timeout,
// it sets The maximum amount of time a process may take to finish in seconds
// The third is the Channel instance pass to Future subprocess.
return `whatever`|Object|Closure|; // `whatever` will be encoded, then decoded by parent.
}, int $timeout = 0 , $input = null)
->then(function ($result) {
// On success, `$result` is returned by the process.
})
->catch(function ($exception) {
// When an exception is thrown from within a process, it's caught and passed here.
})
->timeout(function () {
// When an timeout is reached, it's caught and passed here.
})
->progress(function ($type, $data) {
// Live progressing of output: `$type, $data` is returned by the Future process.
// $type is `ERR` for stderr, or `OUT` for stdout.
})
->signal($signal, function ($signal) {
// The process will be sent termination `signal` and stopped.
// When an signal is triggered, it's caught and passed here.
// This feature is only available using `libuv`.
});
->then(function ($result) {
// On success, `$result` is returned by the Future process or callable you passed.
//
}, function ($catchException) {
//
}, function ($progressOutput) {
//
}
);
// To turn on displaying of child output.
->displayOn();
// Stop displaying child output.
->displayOff();
// A `Future` process can be retried.
->restart();
// Wait for `Future` to terminate. Note this should only be executed for local testing only.
// Use "How to integrate into your project/package" section instead.
->run();
The Parallel class is used to manage a Pool of Future's
. The same Event hooks and Error handling are available.
include 'vendor/autoload.php';
use Async\Spawn\Parallel;
$parallel = new Parallel();
foreach ($things as $thing) {
// the second argument `optional`, can set the maximum amount of time a process may take to finish in seconds.
$parallel->add(function () use ($thing) {
// Do a thing
}, $optional)->then(function ($output) {
// Handle success
// On success, `$output` is returned by the process or callable you passed to the queue.
})->catch(function (\Throwable $exception) {
// Handle exception
// When an exception is thrown from within a process, it's caught and passed here.
});
}
// Wait for Parallel `Future` Pool to terminate. Note this should only be executed for local testing only.
// Use "How to integrate into your project/package" section instead.
$parallel->wait();
You're free to create as many parallel process pools as you want, each parallel pool has its own queue of processes it will handle.
A parallel pool is configurable by the developer:
use Async\Spawn\Parallel;
$parallel = (new Parallel())
// The maximum amount of processes which can run simultaneously.
->concurrency(20)
// Configure how long the loop should sleep before re-checking the process statuses in milliseconds.
->sleepTime(50000);
This package using uv_spawn
, and proc_open
as a fallback, to create and manage a pool of child processes in PHP. By creating child processes on the fly, we're able to execute PHP scripts in parallel. This parallelism can improve performance significantly when dealing with multiple Synchronous I/O tasks, which don't really need to wait for each other.
By giving these tasks a separate process to run on, the underlying operating system can take care of running them in parallel.
The Parallel
class provided by this package takes care of handling as many processes as you want by scheduling and running them when it's possible. When multiple processes are spawned, each can have a separate time to completion.
Waiting for all processes is done by using uv_run
, or basic child process polling
which will monitor until all processes are finished.
When a process is finished, its success event is triggered, which you can hook into with the ->then()
function.
When a process fails, an error event is triggered, which you can hook into with the ->catch()
function.
When a process times out, an timeout event is triggered, which you can hook into with the ->timeout()
function.
Then the iterations will update that process's status and move on.
This package differs from original author's spatie/async implementations:
- The
Runnable
class is Future with expanded capabilities. - The
Pool
class is Parallel with some features extracted into another class FutureHandler. - The
ParentRuntime
class is Spawn that can accept astring
command line action toexecute
, returns a Future. - The
async
function is spawn with additional spawning that will display any child process output. - Removed output limit, no timeout unless set per
Future
, added all Symfony Process features. - Not
Linux
orCLI
only, runs the same in Web environment under Windows and Apple macOS too. - Added a Event Loop library
libuv
support, it's now the main usage model, fallback toproc-open
Process if not installed. - Libuv allows more direct Channel message exchange, same is done with
proc-open
but is limited.
Todo: Move in all ext-parallel
like functionality from external Coroutine
library.
A previous PR of a fork was submitted addressing real Windows support.
When you include this library into your project, you can't execute functions/methods spawn_wait()
, spawn_run()
, wait()
or run()
directly. They are for mainly testing this library locally. You will need to adapt to or create a custom event loop routine.
The Parallel class has a getFutureHandler()
method that returns a FutureHandler instance.
The FutureHandler has two methods processing()
and isEmpty()
that you will need to call within your custom loop routine. These two calls are the same ones the wait()
method calls onto within a while
loop with additional sleepingTime()
.
The processing()
method will monitor/check the Future's state status and execute any appropriate event callback handler.
The FutureHandler class can accept/handle a custom Event Loop that has executeTask(event callback, future)
and isPcntl()
methods defined. The custom Event Loop object should be supplied to Parallel instantiation.
A basic setup to add to your Event Loop
use Async\Spawn\Parallel;
use Async\Spawn\FutureHandler;
use Async\Spawn\FutureInterface;
use Async\Spawn\ParallelInterface;
class setupLoop
{
/**
* @var Parallel
*/
protected $parallel;
/**
* @var FutureHandler
*/
protected $future = null;
public function __construct() {
$this->parallel = new Parallel($this);
$this->future = $this->parallel->getFutureHandler();
}
public function addFuture($callable, int $timeout = 0, bool $display = false, $channel = null): FutureInterface {
$future = $this->parallel->add($callable, $timeout, $channel);
return $display ? $future->displayOn() : $future;
}
public function getParallel(): ParallelInterface {
return $this->parallel;
}
/**
* Check for pending I/O events, signals, futures, streams/sockets/fd activity, timers or etc...
*/
protected function hasEvents(): bool {
return !$this->future->isEmpty() || !$this->ActionEventsCheckers->isEmpty();
}
public function runLoop() {
while ($this->hasEvents()) {
$this->future->processing();
if ($this->waitForAction());
$this->DoEventActions();
}
}
public function executeTask($event, $parameters = null) {
$this->DoEventActions($event, $parameters);
// Or just
// if (\is_callable($event))
// $event($parameters);
}
public function isPcntl(): bool {}
}
This library uses opis/closure package for closure/callable
serialization. For any function or class methods to be accessible in a Future
child process you must make changes to your composer.json
to insure it's picked up. The composer.json
file should contain a pointer to some file with functions you always need, and insure all new classes/namespaces are within added. You can't just make local named functions
or classes
on the fly and expect them to be available.
// composer.json
"autoload": {
"files": [
"Extra/functions.php"
],
"psr-4": {
"Name\\Space\\": ["Folder/"],
"Extra\\Name\\Spaces\\": ["Extra/"]
}
},
// functions.php
if (!\function_exists('___marker')) {
//
// All additional extra functions needed in a `Future` process...
//
function ___marker()
{
return true;
}
}
If an Exception
or Error
is thrown from within a child process, it can be caught per process by specifying a callback in the ->catch()
method.
If there's no error handler added, the error will be thrown in the parent process.
If the child process would unexpectedly stop without throwing an Throwable
, the output written to stderr
will be wrapped and thrown as Async\Spawn\SpawnError
in the parent process.
Contributions are encouraged and welcome; I am always happy to get feedback or pull requests on Github :) Create Github Issues for bugs and new features and comment on the ones you are interested in.
The MIT License (MIT). Please see License File for more information.