From 1108e757478b6aade7c31a0ad9ed22c87940977d Mon Sep 17 00:00:00 2001 From: SeinopSys Date: Tue, 9 May 2017 20:24:29 +0200 Subject: [PATCH 1/4] Add Socket.IO v2 support --- src/Client.php | 5 +- src/Engine/AbstractSocketIO.php | 5 +- src/Engine/SocketIO/Session.php | 7 +- src/Engine/SocketIO/Version0X.php | 2 +- src/Engine/SocketIO/Version2X.php | 238 ++++++++++++++++++++++++++++++ src/Payload/Encoder.php | 3 +- 6 files changed, 255 insertions(+), 5 deletions(-) create mode 100644 src/Engine/SocketIO/Version2X.php diff --git a/src/Client.php b/src/Client.php index 9f09541..be694a7 100644 --- a/src/Client.php +++ b/src/Client.php @@ -78,7 +78,7 @@ public function initialize($keepAlive = false) /** * Reads a message from the socket * - * @return MessageInterface Message read from the socket + * @return string Message read from the socket */ public function read() { @@ -89,6 +89,9 @@ public function read() /** * Emits a message through the engine * + * @param string $event + * @param array $args + * * @return $this */ public function emit($event, array $args) diff --git a/src/Engine/AbstractSocketIO.php b/src/Engine/AbstractSocketIO.php index 0ad6483..7120e7b 100644 --- a/src/Engine/AbstractSocketIO.php +++ b/src/Engine/AbstractSocketIO.php @@ -12,6 +12,7 @@ namespace ElephantIO\Engine; use DomainException; +use ElephantIO\Engine\SocketIO\Session; use RuntimeException; use Psr\Log\LoggerInterface; @@ -37,7 +38,7 @@ abstract class AbstractSocketIO implements EngineInterface /** @var array cookies received during handshake */ protected $cookies = []; - /** @var string[] Session information */ + /** @var Session Session information */ protected $session; /** @var mixed[] Array of options for the engine */ @@ -190,6 +191,8 @@ public function getName() /** * Parse an url into parts we may expect * + * @param string $url + * * @return string[] information on the given URL */ protected function parseUrl($url) diff --git a/src/Engine/SocketIO/Session.php b/src/Engine/SocketIO/Session.php index 27ac94e..0c5b668 100644 --- a/src/Engine/SocketIO/Session.php +++ b/src/Engine/SocketIO/Session.php @@ -42,7 +42,12 @@ public function __construct($id, $interval, $timeout, array $upgrades) 'interval' => $interval]; } - /** The property should not be modified, hence the private accessibility on them */ + /** + * The property should not be modified, hence the private accessibility on them + * + * @param string $prop + * @return mixed + */ public function __get($prop) { static $list = ['id', 'upgrades']; diff --git a/src/Engine/SocketIO/Version0X.php b/src/Engine/SocketIO/Version0X.php index 8d75f80..8b388c3 100644 --- a/src/Engine/SocketIO/Version0X.php +++ b/src/Engine/SocketIO/Version0X.php @@ -68,7 +68,7 @@ public function connect() $this->stream = stream_socket_client($host, $errors[0], $errors[1], $this->options['timeout'], STREAM_CLIENT_CONNECT, stream_context_create($this->context)); if (!is_resource($this->stream)) { - throw new SocketException($error[0], $error[1]); + throw new SocketException($errors[0], $errors[1]); } stream_set_timeout($this->stream, $this->options['timeout']); diff --git a/src/Engine/SocketIO/Version2X.php b/src/Engine/SocketIO/Version2X.php new file mode 100644 index 0000000..db1f9de --- /dev/null +++ b/src/Engine/SocketIO/Version2X.php @@ -0,0 +1,238 @@ + + * @link https://tools.ietf.org/html/rfc6455#section-5.2 Websocket's RFC + */ +class Version2X extends AbstractSocketIO +{ + const TRANSPORT_POLLING = 'polling'; + const TRANSPORT_WEBSOCKET = 'websocket'; + + /** {@inheritDoc} */ + public function connect() + { + if (is_resource($this->stream)) { + return; + } + + $this->handshake(); + + $errors = [null, null]; + $host = sprintf('%s:%d', $this->url['host'], $this->url['port']); + + if (true === $this->url['secured']) { + $host = 'ssl://' . $host; + } + + $this->stream = stream_socket_client($host, $errors[0], $errors[1], $this->options['timeout'], STREAM_CLIENT_CONNECT, stream_context_create($this->context)); + + if (!is_resource($this->stream)) { + throw new SocketException($errors[0], $errors[1]); + } + + stream_set_timeout($this->stream, $this->options['timeout']); + + $this->upgradeTransport(); + } + + /** {@inheritDoc} */ + public function close() + { + if (!is_resource($this->stream)) { + return; + } + + $this->write(EngineInterface::CLOSE); + + fclose($this->stream); + $this->stream = null; + $this->session = null; + $this->cookies = []; + } + + /** {@inheritDoc} */ + public function emit($event, array $args) + { + $namespace = $this->namespace; + + if ('' !== $namespace) { + $namespace .= ','; + } + + return $this->write(EngineInterface::MESSAGE, static::EVENT . $namespace . json_encode([$event, $args])); + } + + /** {@inheritDoc} */ + public function of($namespace) { + parent::of($namespace); + + $this->write(EngineInterface::MESSAGE, static::CONNECT . $namespace); + } + + /** {@inheritDoc} */ + public function write($code, $message = null) + { + if (!is_resource($this->stream)) { + return; + } + + if (!is_int($code) || 0 > $code || 6 < $code) { + throw new InvalidArgumentException('Wrong message type when trying to write on the socket'); + } + + $payload = new Encoder($code . $message, Encoder::OPCODE_TEXT, true); + $bytes = fwrite($this->stream, (string) $payload); + + // wait a little bit of time after this message was sent + usleep((int) $this->options['wait']); + + return $bytes; + } + + /** {@inheritDoc} */ + public function getName() + { + return 'SocketIO Version 1.X'; + } + + /** {@inheritDoc} */ + protected function getDefaultOptions() + { + $defaults = parent::getDefaultOptions(); + + $defaults['version'] = 3; + $defaults['use_b64'] = false; + $defaults['transport'] = static::TRANSPORT_POLLING; + + return $defaults; + } + + /** Does the handshake with the Socket.io server and populates the `session` value object */ + protected function handshake() + { + if (null !== $this->session) { + return; + } + + $query = ['use_b64' => $this->options['use_b64'], + 'EIO' => $this->options['version'], + 'transport' => $this->options['transport']]; + + if (isset($this->url['query'])) { + $query = array_replace($query, $this->url['query']); + } + + $context = $this->context; + + if (!isset($context[$this->url['secured'] ? 'ssl' : 'http'])) { + $context[$this->url['secured'] ? 'ssl' : 'http'] = []; + } + + $context[$this->url['secured'] ? 'ssl' : 'http']['timeout'] = (float) $this->options['timeout']; + + $url = sprintf('%s://%s:%d/%s/?%s', $this->url['scheme'], $this->url['host'], $this->url['port'], trim($this->url['path'], '/'), http_build_query($query)); + $result = @file_get_contents($url, false, stream_context_create($context)); + + if (false === $result) { + throw new ServerConnectionFailureException; + } + + $open_curly_at = strpos($result, '{'); + $todecode = substr($result, $open_curly_at, strrpos($result, '}')-$open_curly_at+1); + $decoded = json_decode($todecode, true); + + if (!in_array('websocket', $decoded['upgrades'])) { + throw new UnsupportedTransportException('websocket'); + } + + $cookies = []; + foreach ($http_response_header as $header) { + if (preg_match('/^Set-Cookie:\s*([^;]*)/i', $header, $matches)) { + $cookies[] = $matches[1]; + } + } + $this->cookies = $cookies; + + $this->session = new Session($decoded['sid'], $decoded['pingInterval'], $decoded['pingTimeout'], $decoded['upgrades']); + } + + /** Upgrades the transport to WebSocket */ + private function upgradeTransport() + { + $query = ['sid' => $this->session->id, + 'EIO' => $this->options['version'], + 'transport' => static::TRANSPORT_WEBSOCKET]; + + $url = sprintf('/%s/?%s', trim($this->url['path'], '/'), http_build_query($query)); + $key = base64_encode(random_bytes(16)); + + $origin = '*'; + $headers = isset($this->context['headers']) ? (array) $this->context['headers'] : [] ; + + foreach ($headers as $header) { + $matches = []; + + if (preg_match('`^Origin:\s*(.+?)$`', $header, $matches)) { + $origin = $matches[1]; + break; + } + } + + $request = "GET {$url} HTTP/1.1\r\n" + . "Host: {$this->url['host']}:{$this->url['port']}\r\n" + . "Upgrade: websocket\r\n" + . "Connection: Upgrade\r\n" + . "Sec-WebSocket-Key: {$key}\r\n" + . "Sec-WebSocket-Version: 13\r\n" + . "Origin: {$origin}\r\n"; + + if (!empty($this->cookies)) { + $request .= "Cookie: " . implode('; ', $this->cookies) . "\r\n"; + } + + $request .= "\r\n"; + + fwrite($this->stream, $request); + $result = fread($this->stream, 12); + + if ('HTTP/1.1 101' !== $result) { + throw new UnexpectedValueException(sprintf('The server returned an unexpected value. Expected "HTTP/1.1 101", had "%s"', $result)); + } + + // cleaning up the stream + while ('' !== trim(fgets($this->stream))); + + $this->write(EngineInterface::UPGRADE); + } +} + diff --git a/src/Payload/Encoder.php b/src/Payload/Encoder.php index 2eb7725..4a51d56 100644 --- a/src/Payload/Encoder.php +++ b/src/Payload/Encoder.php @@ -25,11 +25,12 @@ class Encoder extends AbstractPayload { private $data; + /** @var string */ private $payload; /** * @param string $data data to encode - * @param integer $opcode OpCode to use (one of AbstractPayload's constant) + * @param integer $opCode OpCode to use (one of AbstractPayload's constant) * @param bool $mask Should we use a mask ? */ public function __construct($data, $opCode, $mask) From f3aa107852a7ecc64e9653ef48ae0b67c4f427cb Mon Sep 17 00:00:00 2001 From: SeinopSys Date: Tue, 9 May 2017 23:25:20 +0200 Subject: [PATCH 2/4] Addressing concerns --- src/Engine/SocketIO/Version1X.php | 11 +- src/Engine/SocketIO/Version2X.php | 191 +----------------------------- 2 files changed, 11 insertions(+), 191 deletions(-) diff --git a/src/Engine/SocketIO/Version1X.php b/src/Engine/SocketIO/Version1X.php index 7783650..a4c844e 100644 --- a/src/Engine/SocketIO/Version1X.php +++ b/src/Engine/SocketIO/Version1X.php @@ -167,7 +167,9 @@ protected function handshake() throw new ServerConnectionFailureException; } - $decoded = json_decode(substr($result, strpos($result, '{')), true); + $open_curly_at = strpos($result, '{'); + $todecode = substr($result, $open_curly_at, strrpos($result, '}')-$open_curly_at+1); + $decoded = json_decode($todecode, true); if (!in_array('websocket', $decoded['upgrades'])) { throw new UnsupportedTransportException('websocket'); @@ -193,7 +195,7 @@ private function upgradeTransport() 'transport' => static::TRANSPORT_WEBSOCKET]; $url = sprintf('/%s/?%s', trim($this->url['path'], '/'), http_build_query($query)); - $key = base64_encode(sha1(uniqid(mt_rand(), true), true)); + $key = base64_encode(random_bytes(20)); $origin = '*'; $headers = isset($this->context['headers']) ? (array) $this->context['headers'] : [] ; @@ -208,7 +210,7 @@ private function upgradeTransport() } $request = "GET {$url} HTTP/1.1\r\n" - . "Host: {$this->url['host']}\r\n" + . "Host: {$this->url['host']}:{$this->url['port']}\r\n" . "Upgrade: WebSocket\r\n" . "Connection: Upgrade\r\n" . "Sec-WebSocket-Key: {$key}\r\n" @@ -234,7 +236,8 @@ private function upgradeTransport() $this->write(EngineInterface::UPGRADE); //remove message '40' from buffer, emmiting by socket.io after receiving EngineInterface::UPGRADE - $this->read(); + if ($this->options['version'] === 2) + $this->read(); } } diff --git a/src/Engine/SocketIO/Version2X.php b/src/Engine/SocketIO/Version2X.php index db1f9de..a3372b3 100644 --- a/src/Engine/SocketIO/Version2X.php +++ b/src/Engine/SocketIO/Version2X.php @@ -26,103 +26,20 @@ use ElephantIO\Exception\ServerConnectionFailureException; /** - * Implements the dialog with Socket.IO version 1.x + * Implements the dialog with Socket.IO version 2.x * * Based on the work of Mathieu Lallemand (@lalmat) * * @author Baptiste ClaviƩ * @link https://tools.ietf.org/html/rfc6455#section-5.2 Websocket's RFC */ -class Version2X extends AbstractSocketIO +class Version2X extends Version1X { - const TRANSPORT_POLLING = 'polling'; - const TRANSPORT_WEBSOCKET = 'websocket'; - - /** {@inheritDoc} */ - public function connect() - { - if (is_resource($this->stream)) { - return; - } - - $this->handshake(); - - $errors = [null, null]; - $host = sprintf('%s:%d', $this->url['host'], $this->url['port']); - - if (true === $this->url['secured']) { - $host = 'ssl://' . $host; - } - - $this->stream = stream_socket_client($host, $errors[0], $errors[1], $this->options['timeout'], STREAM_CLIENT_CONNECT, stream_context_create($this->context)); - - if (!is_resource($this->stream)) { - throw new SocketException($errors[0], $errors[1]); - } - - stream_set_timeout($this->stream, $this->options['timeout']); - - $this->upgradeTransport(); - } - - /** {@inheritDoc} */ - public function close() - { - if (!is_resource($this->stream)) { - return; - } - - $this->write(EngineInterface::CLOSE); - - fclose($this->stream); - $this->stream = null; - $this->session = null; - $this->cookies = []; - } - - /** {@inheritDoc} */ - public function emit($event, array $args) - { - $namespace = $this->namespace; - - if ('' !== $namespace) { - $namespace .= ','; - } - - return $this->write(EngineInterface::MESSAGE, static::EVENT . $namespace . json_encode([$event, $args])); - } - - /** {@inheritDoc} */ - public function of($namespace) { - parent::of($namespace); - - $this->write(EngineInterface::MESSAGE, static::CONNECT . $namespace); - } - - /** {@inheritDoc} */ - public function write($code, $message = null) - { - if (!is_resource($this->stream)) { - return; - } - - if (!is_int($code) || 0 > $code || 6 < $code) { - throw new InvalidArgumentException('Wrong message type when trying to write on the socket'); - } - - $payload = new Encoder($code . $message, Encoder::OPCODE_TEXT, true); - $bytes = fwrite($this->stream, (string) $payload); - - // wait a little bit of time after this message was sent - usleep((int) $this->options['wait']); - - return $bytes; - } /** {@inheritDoc} */ public function getName() { - return 'SocketIO Version 1.X'; + return 'SocketIO Version 2.X'; } /** {@inheritDoc} */ @@ -130,109 +47,9 @@ protected function getDefaultOptions() { $defaults = parent::getDefaultOptions(); - $defaults['version'] = 3; - $defaults['use_b64'] = false; - $defaults['transport'] = static::TRANSPORT_POLLING; + $defaults['version'] = 3; return $defaults; } - - /** Does the handshake with the Socket.io server and populates the `session` value object */ - protected function handshake() - { - if (null !== $this->session) { - return; - } - - $query = ['use_b64' => $this->options['use_b64'], - 'EIO' => $this->options['version'], - 'transport' => $this->options['transport']]; - - if (isset($this->url['query'])) { - $query = array_replace($query, $this->url['query']); - } - - $context = $this->context; - - if (!isset($context[$this->url['secured'] ? 'ssl' : 'http'])) { - $context[$this->url['secured'] ? 'ssl' : 'http'] = []; - } - - $context[$this->url['secured'] ? 'ssl' : 'http']['timeout'] = (float) $this->options['timeout']; - - $url = sprintf('%s://%s:%d/%s/?%s', $this->url['scheme'], $this->url['host'], $this->url['port'], trim($this->url['path'], '/'), http_build_query($query)); - $result = @file_get_contents($url, false, stream_context_create($context)); - - if (false === $result) { - throw new ServerConnectionFailureException; - } - - $open_curly_at = strpos($result, '{'); - $todecode = substr($result, $open_curly_at, strrpos($result, '}')-$open_curly_at+1); - $decoded = json_decode($todecode, true); - - if (!in_array('websocket', $decoded['upgrades'])) { - throw new UnsupportedTransportException('websocket'); - } - - $cookies = []; - foreach ($http_response_header as $header) { - if (preg_match('/^Set-Cookie:\s*([^;]*)/i', $header, $matches)) { - $cookies[] = $matches[1]; - } - } - $this->cookies = $cookies; - - $this->session = new Session($decoded['sid'], $decoded['pingInterval'], $decoded['pingTimeout'], $decoded['upgrades']); - } - - /** Upgrades the transport to WebSocket */ - private function upgradeTransport() - { - $query = ['sid' => $this->session->id, - 'EIO' => $this->options['version'], - 'transport' => static::TRANSPORT_WEBSOCKET]; - - $url = sprintf('/%s/?%s', trim($this->url['path'], '/'), http_build_query($query)); - $key = base64_encode(random_bytes(16)); - - $origin = '*'; - $headers = isset($this->context['headers']) ? (array) $this->context['headers'] : [] ; - - foreach ($headers as $header) { - $matches = []; - - if (preg_match('`^Origin:\s*(.+?)$`', $header, $matches)) { - $origin = $matches[1]; - break; - } - } - - $request = "GET {$url} HTTP/1.1\r\n" - . "Host: {$this->url['host']}:{$this->url['port']}\r\n" - . "Upgrade: websocket\r\n" - . "Connection: Upgrade\r\n" - . "Sec-WebSocket-Key: {$key}\r\n" - . "Sec-WebSocket-Version: 13\r\n" - . "Origin: {$origin}\r\n"; - - if (!empty($this->cookies)) { - $request .= "Cookie: " . implode('; ', $this->cookies) . "\r\n"; - } - - $request .= "\r\n"; - - fwrite($this->stream, $request); - $result = fread($this->stream, 12); - - if ('HTTP/1.1 101' !== $result) { - throw new UnexpectedValueException(sprintf('The server returned an unexpected value. Expected "HTTP/1.1 101", had "%s"', $result)); - } - - // cleaning up the stream - while ('' !== trim(fgets($this->stream))); - - $this->write(EngineInterface::UPGRADE); - } } From 584d708deaa17bd847039cfcfbcf456306216101 Mon Sep 17 00:00:00 2001 From: SeinopSys Date: Tue, 9 May 2017 23:54:12 +0200 Subject: [PATCH 3/4] Addressing concerns --- src/Engine/SocketIO/Version1X.php | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/Engine/SocketIO/Version1X.php b/src/Engine/SocketIO/Version1X.php index a4c844e..ae5a5c7 100644 --- a/src/Engine/SocketIO/Version1X.php +++ b/src/Engine/SocketIO/Version1X.php @@ -187,15 +187,17 @@ protected function handshake() } /** Upgrades the transport to WebSocket */ - private function upgradeTransport() + protected function upgradeTransport() { $query = ['sid' => $this->session->id, 'EIO' => $this->options['version'], - 'use_b64' => $this->options['use_b64'], 'transport' => static::TRANSPORT_WEBSOCKET]; + if ($this->options['version'] === 2) + $query['use_b64'] = $this->options['use_b64']; + $url = sprintf('/%s/?%s', trim($this->url['path'], '/'), http_build_query($query)); - $key = base64_encode(random_bytes(20)); + $key = base64_encode(openssl_random_pseudo_bytes(16)); $origin = '*'; $headers = isset($this->context['headers']) ? (array) $this->context['headers'] : [] ; From f3b71e6b2feb73e21f4cbd719eb0cbb5139d97ce Mon Sep 17 00:00:00 2001 From: SeinopSys Date: Wed, 10 May 2017 00:04:18 +0200 Subject: [PATCH 4/4] Addressing concerns --- src/Engine/SocketIO/Version1X.php | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/Engine/SocketIO/Version1X.php b/src/Engine/SocketIO/Version1X.php index ae5a5c7..680fe46 100644 --- a/src/Engine/SocketIO/Version1X.php +++ b/src/Engine/SocketIO/Version1X.php @@ -167,8 +167,8 @@ protected function handshake() throw new ServerConnectionFailureException; } - $open_curly_at = strpos($result, '{'); - $todecode = substr($result, $open_curly_at, strrpos($result, '}')-$open_curly_at+1); + $open_curly_at = strpos($result, '{'); + $todecode = substr($result, $open_curly_at, strrpos($result, '}')-$open_curly_at+1); $decoded = json_decode($todecode, true); if (!in_array('websocket', $decoded['upgrades'])) { @@ -186,18 +186,27 @@ protected function handshake() $this->session = new Session($decoded['sid'], $decoded['pingInterval'], $decoded['pingTimeout'], $decoded['upgrades']); } - /** Upgrades the transport to WebSocket */ + /** + * Upgrades the transport to WebSocket + * + * FYI: + * Version "2" is used for the EIO param by socket.io v1 + * Version "3" is used by socket.io v2 + */ protected function upgradeTransport() { $query = ['sid' => $this->session->id, 'EIO' => $this->options['version'], 'transport' => static::TRANSPORT_WEBSOCKET]; - if ($this->options['version'] === 2) - $query['use_b64'] = $this->options['use_b64']; + if ($this->options['version'] === 2) + $query['use_b64'] = $this->options['use_b64']; $url = sprintf('/%s/?%s', trim($this->url['path'], '/'), http_build_query($query)); - $key = base64_encode(openssl_random_pseudo_bytes(16)); + $hash = sha1(uniqid(mt_rand(), true), true); + if ($this->options['version'] !== 2) + $hash = substr($hash, 0, 16); + $key = base64_encode($hash); $origin = '*'; $headers = isset($this->context['headers']) ? (array) $this->context['headers'] : [] ;