Skip to content

Commit

Permalink
Support Windows by using temporary network socket for process I/O
Browse files Browse the repository at this point in the history
  • Loading branch information
clue committed Apr 29, 2019
1 parent c957d83 commit 3e1032e
Show file tree
Hide file tree
Showing 5 changed files with 309 additions and 34 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ This method returns a promise that will resolve with a `DatabaseInterface` on
success or will reject with an `Exception` on error. The SQLite extension
is inherently blocking, so this method will spawn an SQLite worker process
to run all SQLite commands and queries in a separate process without
blocking the main process.
blocking the main process. On Windows, it uses a temporary network socket
for this communication, on all other platforms it communicates over
standard process I/O pipes.

```php
$factory->open('users.db')->then(function (DatabaseInterface $db) {
Expand Down
40 changes: 36 additions & 4 deletions res/sqlite-worker.php
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
<?php

// This child worker process will be started by the main process to start communication over process pipe I/O
//
// Communication happens via newline-delimited JSON-RPC messages, see:
// $ php res/sqlite-worker.php
// < {"id":0,"method":"open","params":["test.db"]}
// > {"id":0,"result":true}
//
// Or via socket connection (used for Windows, which does not support non-blocking process pipe I/O)
// $ nc localhost 8080
// $ php res/sqlite-worker.php localhost:8080

use Clue\React\NDJson\Decoder;
use Clue\React\NDJson\Encoder;
use React\EventLoop\Factory;
use React\Stream\DuplexResourceStream;
use React\Stream\ReadableResourceStream;
use React\Stream\ThroughStream;
use React\Stream\WritableResourceStream;
use Clue\React\NDJson\Decoder;
use Clue\React\NDJson\Encoder;

if (file_exists(__DIR__ . '/../vendor/autoload.php')) {
// local project development, go from /res to /vendor
Expand All @@ -15,8 +28,27 @@
}

$loop = Factory::create();
$in = new Decoder(new ReadableResourceStream(\STDIN, $loop));
$out = new Encoder(new WritableResourceStream(\STDOUT, $loop));

if (isset($_SERVER['argv'][1])) {
// socket address given, so try to connect through socket (Windows)
$socket = stream_socket_client($_SERVER['argv'][1]);
$stream = new DuplexResourceStream($socket, $loop);

// pipe input through a wrapper stream so that an error on the input stream
// will not immediately close the output stream without a chance to report
// this error through the output stream.
$through = new ThroughStream();
$stream->on('data', function ($data) use ($through) {
$through->write($data);
});

$in = new Decoder($through);
$out = new Encoder($stream);
} else {
// no socket address given, use process I/O pipes
$in = new Decoder(new ReadableResourceStream(\STDIN, $loop));
$out = new Encoder(new WritableResourceStream(\STDOUT, $loop));
}

// report error when input is invalid NDJSON
$in->on('error', function (Exception $e) use ($out) {
Expand Down
95 changes: 94 additions & 1 deletion src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@
use React\ChildProcess\Process;
use React\EventLoop\LoopInterface;
use Clue\React\SQLite\Io\ProcessIoDatabase;
use React\Stream\DuplexResourceStream;
use React\Promise\Deferred;
use React\Stream\ThroughStream;

class Factory
{
private $loop;

private $useSocket;

/**
* The `Factory` is responsible for opening your [`DatabaseInterface`](#databaseinterface) instance.
* It also registers everything with the main [`EventLoop`](https://github.com/reactphp/event-loop#usage).
Expand All @@ -24,6 +29,9 @@ class Factory
public function __construct(LoopInterface $loop)
{
$this->loop = $loop;

// use socket I/O for Windows only, use faster process pipes everywhere else
$this->useSocket = DIRECTORY_SEPARATOR === '\\';
}

/**
Expand All @@ -33,7 +41,9 @@ public function __construct(LoopInterface $loop)
* success or will reject with an `Exception` on error. The SQLite extension
* is inherently blocking, so this method will spawn an SQLite worker process
* to run all SQLite commands and queries in a separate process without
* blocking the main process.
* blocking the main process. On Windows, it uses a temporary network socket
* for this communication, on all other platforms it communicates over
* standard process I/O pipes.
*
* ```php
* $factory->open('users.db')->then(function (DatabaseInterface $db) {
Expand Down Expand Up @@ -62,6 +72,11 @@ public function __construct(LoopInterface $loop)
* @return PromiseInterface<DatabaseInterface> Resolves with DatabaseInterface instance or rejects with Exception
*/
public function open($filename, $flags = null)
{
return $this->useSocket ? $this->openSocketIo($filename, $flags) : $this->openProcessIo($filename, $flags);
}

private function openProcessIo($filename, $flags = null)
{
$command = 'exec ' . \escapeshellarg(\PHP_BINARY) . ' ' . \escapeshellarg(__DIR__ . '/../res/sqlite-worker.php');

Expand Down Expand Up @@ -121,4 +136,82 @@ public function open($filename, $flags = null)
throw $e;
});
}

private function openSocketIo($filename, $flags = null)
{
$command = \escapeshellarg(\PHP_BINARY) . ' ' . \escapeshellarg(__DIR__ . '/../res/sqlite-worker.php');

// launch process without default STDIO pipes
$null = \DIRECTORY_SEPARATOR === '\\' ? 'nul' : '/dev/null';
$pipes = array(
array('file', $null, 'r'),
array('file', $null, 'w'),
STDERR // array('file', $null, 'w'),
);

// start temporary socket on random address
$server = @stream_socket_server('tcp://127.0.0.1:0', $errno, $errstr);
if ($server === false) {
return \React\Promise\reject(
new \RuntimeException('Unable to start temporary socket I/O server: ' . $errstr, $errno)
);
}

// pass random server address to child process to connect back to parent process
stream_set_blocking($server, false);
$command .= ' ' . stream_socket_get_name($server, false);

$process = new Process($command, null, null, $pipes);
$process->start($this->loop);

$deferred = new Deferred(function () use ($process, $server) {
$this->loop->removeReadStream($server);
fclose($server);
$process->terminate();

throw new \RuntimeException('Opening database cancelled');
});

// time out after a few seconds if we don't receive a connection
$timeout = $this->loop->addTimer(5.0, function () use ($server, $deferred, $process) {
$this->loop->removeReadStream($server);
fclose($server);
$process->terminate();

$deferred->reject(new \RuntimeException('No connection detected'));
});

$this->loop->addReadStream($server, function () use ($server, $timeout, $filename, $flags, $deferred, $process) {
// accept once connection on server socket and stop server socket
$this->loop->cancelTimer($timeout);
$peer = stream_socket_accept($server, 0);
$this->loop->removeReadStream($server);
fclose($server);

// use this one connection as fake process I/O streams
$connection = new DuplexResourceStream($peer, $this->loop, -1);
$process->stdin = $process->stdout = $connection;
$connection->on('close', function () use ($process) {
$process->terminate();
});
$process->on('exit', function () use ($connection) {
$connection->close();
});

$db = new ProcessIoDatabase($process);
$args = array($filename);
if ($flags !== null) {
$args[] = $flags;
}

$db->send('open', $args)->then(function () use ($deferred, $db) {
$deferred->resolve($db);
}, function ($e) use ($deferred, $db) {
$db->close();
$deferred->reject($e);
});
});

return $deferred->promise();
}
}
8 changes: 6 additions & 2 deletions src/Io/ProcessIoDatabase.php
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ public function quit()
{
$promise = $this->send('close', array());

$this->process->stdin->end();
if ($this->process->stdin === $this->process->stdout) {
$promise->then(function () { $this->process->stdin->close(); });
} else {
$this->process->stdin->end();
}

return $promise;
}
Expand Down Expand Up @@ -120,7 +124,7 @@ public function close()
/** @internal */
public function send($method, array $params)
{
if (!$this->process->stdin->isWritable()) {
if ($this->closed || !$this->process->stdin->isWritable()) {
return \React\Promise\reject(new \RuntimeException('Database closed'));
}

Expand Down
Loading

0 comments on commit 3e1032e

Please sign in to comment.