Skip to content
This repository has been archived by the owner on Aug 25, 2022. It is now read-only.

Add Socket.IO v2 support #152

Merged
merged 4 commits into from
May 9, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public function initialize($keepAlive = false)
/**
* Reads a message from the socket
*
* @return MessageInterface Message read from the socket
* @return string Message read from the socket
*/
public function read()
{
Expand All @@ -89,6 +89,9 @@ public function read()
/**
* Emits a message through the engine
*
* @param string $event
* @param array $args
*
* @return $this
*/
public function emit($event, array $args)
Expand Down
5 changes: 4 additions & 1 deletion src/Engine/AbstractSocketIO.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace ElephantIO\Engine;

use DomainException;
use ElephantIO\Engine\SocketIO\Session;
use RuntimeException;

use Psr\Log\LoggerInterface;
Expand All @@ -37,7 +38,7 @@ abstract class AbstractSocketIO implements EngineInterface
/** @var array cookies received during handshake */
protected $cookies = [];

/** @var string[] Session information */
/** @var Session Session information */
protected $session;

/** @var mixed[] Array of options for the engine */
Expand Down Expand Up @@ -190,6 +191,8 @@ public function getName()
/**
* Parse an url into parts we may expect
*
* @param string $url
*
* @return string[] information on the given URL
*/
protected function parseUrl($url)
Expand Down
7 changes: 6 additions & 1 deletion src/Engine/SocketIO/Session.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ public function __construct($id, $interval, $timeout, array $upgrades)
'interval' => $interval];
}

/** The property should not be modified, hence the private accessibility on them */
/**
* The property should not be modified, hence the private accessibility on them
*
* @param string $prop
* @return mixed
*/
public function __get($prop)
{
static $list = ['id', 'upgrades'];
Expand Down
2 changes: 1 addition & 1 deletion src/Engine/SocketIO/Version0X.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public function connect()
$this->stream = stream_socket_client($host, $errors[0], $errors[1], $this->options['timeout'], STREAM_CLIENT_CONNECT, stream_context_create($this->context));

if (!is_resource($this->stream)) {
throw new SocketException($error[0], $error[1]);
throw new SocketException($errors[0], $errors[1]);
}

stream_set_timeout($this->stream, $this->options['timeout']);
Expand Down
238 changes: 238 additions & 0 deletions src/Engine/SocketIO/Version2X.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
<?php
/**
* This file is part of the Elephant.io package
*
* For the full copyright and license information, please view the LICENSE file
* that was distributed with this source code.
*
* @copyright Wisembly
* @license http://www.opensource.org/licenses/MIT-License MIT License
*/

namespace ElephantIO\Engine\SocketIO;

use DomainException;
use InvalidArgumentException;
use UnexpectedValueException;

use Psr\Log\LoggerInterface;

use ElephantIO\EngineInterface;
use ElephantIO\Payload\Encoder;
use ElephantIO\Engine\AbstractSocketIO;

use ElephantIO\Exception\SocketException;
use ElephantIO\Exception\UnsupportedTransportException;
use ElephantIO\Exception\ServerConnectionFailureException;

/**
* Implements the dialog with Socket.IO version 1.x
*
* Based on the work of Mathieu Lallemand (@lalmat)
*
* @author Baptiste Clavié <baptiste@wisembly.com>
* @link https://tools.ietf.org/html/rfc6455#section-5.2 Websocket's RFC
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the comment is not right

class Version2X extends AbstractSocketIO
{
const TRANSPORT_POLLING = 'polling';
const TRANSPORT_WEBSOCKET = 'websocket';

/** {@inheritDoc} */
public function connect()
{
if (is_resource($this->stream)) {
return;
}

$this->handshake();

$errors = [null, null];
$host = sprintf('%s:%d', $this->url['host'], $this->url['port']);

if (true === $this->url['secured']) {
$host = 'ssl://' . $host;
}

$this->stream = stream_socket_client($host, $errors[0], $errors[1], $this->options['timeout'], STREAM_CLIENT_CONNECT, stream_context_create($this->context));

if (!is_resource($this->stream)) {
throw new SocketException($errors[0], $errors[1]);
}

stream_set_timeout($this->stream, $this->options['timeout']);

$this->upgradeTransport();
}

/** {@inheritDoc} */
public function close()
{
if (!is_resource($this->stream)) {
return;
}

$this->write(EngineInterface::CLOSE);

fclose($this->stream);
$this->stream = null;
$this->session = null;
$this->cookies = [];
}

/** {@inheritDoc} */
public function emit($event, array $args)
{
$namespace = $this->namespace;

if ('' !== $namespace) {
$namespace .= ',';
}

return $this->write(EngineInterface::MESSAGE, static::EVENT . $namespace . json_encode([$event, $args]));
}

/** {@inheritDoc} */
public function of($namespace) {
parent::of($namespace);

$this->write(EngineInterface::MESSAGE, static::CONNECT . $namespace);
}

/** {@inheritDoc} */
public function write($code, $message = null)
{
if (!is_resource($this->stream)) {
return;
}

if (!is_int($code) || 0 > $code || 6 < $code) {
throw new InvalidArgumentException('Wrong message type when trying to write on the socket');
}

$payload = new Encoder($code . $message, Encoder::OPCODE_TEXT, true);
$bytes = fwrite($this->stream, (string) $payload);

// wait a little bit of time after this message was sent
usleep((int) $this->options['wait']);

return $bytes;
}

/** {@inheritDoc} */
public function getName()
{
return 'SocketIO Version 1.X';
}

/** {@inheritDoc} */
protected function getDefaultOptions()
{
$defaults = parent::getDefaultOptions();

$defaults['version'] = 3;
$defaults['use_b64'] = false;
$defaults['transport'] = static::TRANSPORT_POLLING;

return $defaults;
}

/** Does the handshake with the Socket.io server and populates the `session` value object */
protected function handshake()
{
if (null !== $this->session) {
return;
}

$query = ['use_b64' => $this->options['use_b64'],
'EIO' => $this->options['version'],
'transport' => $this->options['transport']];

if (isset($this->url['query'])) {
$query = array_replace($query, $this->url['query']);
}

$context = $this->context;

if (!isset($context[$this->url['secured'] ? 'ssl' : 'http'])) {
$context[$this->url['secured'] ? 'ssl' : 'http'] = [];
}

$context[$this->url['secured'] ? 'ssl' : 'http']['timeout'] = (float) $this->options['timeout'];

$url = sprintf('%s://%s:%d/%s/?%s', $this->url['scheme'], $this->url['host'], $this->url['port'], trim($this->url['path'], '/'), http_build_query($query));
$result = @file_get_contents($url, false, stream_context_create($context));

if (false === $result) {
throw new ServerConnectionFailureException;
}

$open_curly_at = strpos($result, '{');
$todecode = substr($result, $open_curly_at, strrpos($result, '}')-$open_curly_at+1);
$decoded = json_decode($todecode, true);

if (!in_array('websocket', $decoded['upgrades'])) {
throw new UnsupportedTransportException('websocket');
}

$cookies = [];
foreach ($http_response_header as $header) {
if (preg_match('/^Set-Cookie:\s*([^;]*)/i', $header, $matches)) {
$cookies[] = $matches[1];
}
}
$this->cookies = $cookies;

$this->session = new Session($decoded['sid'], $decoded['pingInterval'], $decoded['pingTimeout'], $decoded['upgrades']);
}

/** Upgrades the transport to WebSocket */
private function upgradeTransport()
{
$query = ['sid' => $this->session->id,
'EIO' => $this->options['version'],
'transport' => static::TRANSPORT_WEBSOCKET];

$url = sprintf('/%s/?%s', trim($this->url['path'], '/'), http_build_query($query));
$key = base64_encode(random_bytes(16));

$origin = '*';
$headers = isset($this->context['headers']) ? (array) $this->context['headers'] : [] ;

foreach ($headers as $header) {
$matches = [];

if (preg_match('`^Origin:\s*(.+?)$`', $header, $matches)) {
$origin = $matches[1];
break;
}
}

$request = "GET {$url} HTTP/1.1\r\n"
. "Host: {$this->url['host']}:{$this->url['port']}\r\n"
. "Upgrade: websocket\r\n"
. "Connection: Upgrade\r\n"
. "Sec-WebSocket-Key: {$key}\r\n"
. "Sec-WebSocket-Version: 13\r\n"
. "Origin: {$origin}\r\n";

if (!empty($this->cookies)) {
$request .= "Cookie: " . implode('; ', $this->cookies) . "\r\n";
}

$request .= "\r\n";

fwrite($this->stream, $request);
$result = fread($this->stream, 12);

if ('HTTP/1.1 101' !== $result) {
throw new UnexpectedValueException(sprintf('The server returned an unexpected value. Expected "HTTP/1.1 101", had "%s"', $result));
}

// cleaning up the stream
while ('' !== trim(fgets($this->stream)));

$this->write(EngineInterface::UPGRADE);
}
}

3 changes: 2 additions & 1 deletion src/Payload/Encoder.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@
class Encoder extends AbstractPayload
{
private $data;
/** @var string */
private $payload;

/**
* @param string $data data to encode
* @param integer $opcode OpCode to use (one of AbstractPayload's constant)
* @param integer $opCode OpCode to use (one of AbstractPayload's constant)
* @param bool $mask Should we use a mask ?
*/
public function __construct($data, $opCode, $mask)
Expand Down