diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..df55cd7 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,20 @@ +root = true + +[*] +indent_style = space +indent_size = 4 +end_of_line = lf +charset = utf-8 +trim_trailing_whitespace = true +insert_final_newline = false + +[*.{vue,js,scss}] +charset = utf-8 +indent_style = space +indent_size = 2 +end_of_line = lf +insert_final_newline = true +trim_trailing_whitespace = true + +[*.md] +trim_trailing_whitespace = false \ No newline at end of file diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..9af3157 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,11 @@ +* text=auto + +/tests export-ignore +.gitattributes export-ignore +.gitignore export-ignore +.scrutinizer.yml export-ignore +.travis.yml export-ignore +phpunit.php export-ignore +phpunit.xml.dist export-ignore +phpunit.xml export-ignore +.php_cs export-ignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..497c4e2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +.idea +*.DS_Store +/vendor +/coverage +sftp-config.json +composer.lock +.subsplit +.php_cs.cache diff --git a/.php_cs b/.php_cs new file mode 100644 index 0000000..370423c --- /dev/null +++ b/.php_cs @@ -0,0 +1,27 @@ + + +This source file is subject to the MIT license that is bundled. +EOF; + +return PhpCsFixer\Config::create() + ->setRiskyAllowed(true) + ->setRules(array( + '@Symfony' => true, + 'header_comment' => array('header' => $header), + 'array_syntax' => array('syntax' => 'short'), + 'ordered_imports' => true, + 'no_useless_else' => true, + 'no_useless_return' => true, + 'php_unit_construct' => true, + 'php_unit_strict' => true, + )) + ->setFinder( + PhpCsFixer\Finder::create() + ->exclude('vendor') + ->in(__DIR__) + ) +; \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..e5ef4e4 --- /dev/null +++ b/README.md @@ -0,0 +1,28 @@ +

websocket-client

+ +

A PHP WebSocket client..

+ + +## Installing + +```shell +$ composer require overtrue/websocket-client -vvv +``` + +## Usage + +TODO + +## Contributing + +You can contribute in one of three ways: + +1. File bug reports using the [issue tracker](https://github.com/overtrue/websocket-client/issues). +2. Answer questions or fix bugs on the [issue tracker](https://github.com/overtrue/websocket-client/issues). +3. Contribute new features or update the wiki. + +_The code contribution process is not very formal. You just need to make sure that you follow the PSR-0, PSR-1, and PSR-2 coding guidelines. Any new code contributions must be accompanied by unit tests where applicable._ + +## License + +MIT \ No newline at end of file diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..fe49fca --- /dev/null +++ b/composer.json @@ -0,0 +1,19 @@ +{ + "name": "overtrue/websocket", + "description": "A PHP implementation of WebSocket.", + "license": "MIT", + "authors": [ + { + "name": "overtrue", + "email": "anzhengchao@gmail.com" + } + ], + "require": { + "php": "^7.1.3" + }, + "autoload": { + "psr-4": { + "Overtrue\\WebSocket\\": "src" + } + } +} diff --git a/index.php b/index.php new file mode 100644 index 0000000..fa4272d --- /dev/null +++ b/index.php @@ -0,0 +1,30 @@ + + * + * This source file is subject to the MIT license that is bundled. + */ + +include __DIR__.'/vendor/autoload.php'; + +// +//use WebSocket\Client; +// +//$client = new Client("ws://echo.websocket.org/"); +//$client->send("Hello WebSocket.org!"); +// +//echo $client->receive(); // Will output 'Hello WebSocket.org!' + +//-------------------------------- + +use Overtrue\WebSocket\Client; + +$socket = new Client('ws://echo.websocket.org/'); + +$socket->send('Hello WebSocket.org!'); +$socket->send('Hello world!'); + +var_dump($socket->receive()); diff --git a/phpunit.xml.dist b/phpunit.xml.dist new file mode 100644 index 0000000..e47284c --- /dev/null +++ b/phpunit.xml.dist @@ -0,0 +1,21 @@ + + + + + ./tests/ + + + + + src/ + + + diff --git a/src/Client.php b/src/Client.php new file mode 100644 index 0000000..c27e0d1 --- /dev/null +++ b/src/Client.php @@ -0,0 +1,163 @@ + + * + * This source file is subject to the MIT license that is bundled. + */ + +namespace Overtrue\WebSocket; + +use Overtrue\WebSocket\Exceptions\ConnectionException; +use Overtrue\WebSocket\Exceptions\InvalidUriException; + +/** + * Class Client. + */ +class Client extends WebSocket +{ + /** + * @var string + */ + protected $uri; + + /** + * Client constructor. + * + * @param string $uri + * @param array $options + */ + public function __construct(string $uri, array $options = []) + { + if (false === \strpos($uri, '://')) { + $uri = 'ws://'.$uri; + } elseif (0 !== \strpos($uri, 'ws://') && 0 !== \strpos($uri, 'wss://')) { + return new InvalidUriException(\sprintf('Given URI "%s" is invalid', $uri)); + } + + $this->uri = $uri; + $this->options = \array_merge($this->options, $options); + } + + /** + * @param string $payload + * @param string $opcode + * @param bool $masked + * + * @throws \Overtrue\WebSocket\Exceptions\ConnectionException + */ + public function send(string $payload, string $opcode = 'text', bool $masked = true) + { + if (!$this->connected) { + $this->connect(); + } + + parent::send($payload, $opcode, $masked); + } + + /** + * @param bool $try + * + * @return bool|string|null + * + * @throws \Overtrue\WebSocket\Exceptions\ConnectionException + * @throws \Overtrue\WebSocket\Exceptions\InvalidOpcodeException + */ + public function receive(bool $try = false) + { + if (!$this->connected) { + $this->connect(); + } + + return parent::receive($try); + } + + /** + * @throws \Overtrue\WebSocket\Exceptions\ConnectionException + */ + public function connect() + { + $segments = \parse_url($this->uri); + $scheme = 'wss' === $segments['scheme'] ? 'ssl' : 'tcp'; + $segments['port'] = $segments['port'] ?? ('wss' === $segments['scheme'] ? 443 : 80); + $url = \sprintf('%s://%s:%s', $scheme, $segments['host'], $segments['port']); + + $this->socket = @\stream_socket_client($url, $errno, $errorMessage, $this->options['timeout'], \STREAM_CLIENT_CONNECT); + + if (!$this->socket) { + throw new ConnectionException(\sprintf('Unable to connect to socket "%s": [%s]%s', $this->uri, $errno, $errorMessage)); + } + + stream_set_timeout($this->socket, $this->options['timeout']); + + $this->performHandshake(); + + stream_set_blocking($this->socket, false); + + $this->connected = true; + } + + /** + * @throws \Overtrue\WebSocket\Exceptions\ConnectionException + */ + protected function performHandshake() + { + $key = base64_encode(\substr(md5(time().mt_rand(0, 100)), 0, 16)); + $segments = array_merge([ + 'path' => '/', + 'query' => '', + 'query' => '', + 'fragment' => '', + 'user' => '', + 'pass' => '', + ], \parse_url($this->uri)); + + $segments['port'] = $segments['port'] ?? ('wss' === $segments['scheme'] ? 443 : 80); + $pathWithQuery = $segments['path']; + + if (!empty($segments['query'])) { + $pathWithQuery .= '?'.$segments['query']; + } + + if (!empty($segments['fragment'])) { + $pathWithQuery .= '#'.$segments['fragment']; + } + + $headers = [ + "GET {$pathWithQuery} HTTP/1.1", + "Host: {$segments['host']}:{$segments['port']}", + 'User-Agent: websocket-client-php', + 'Upgrade: websocket', + 'Connection: Upgrade', + "Sec-WebSocket-Key: {$key}", + 'Sec-WebSocket-Version: 13', + "\r\n", + ]; + + if (!empty($this->options['origin'])) { + $headers[] = "Sec-WebSocket-Origin: {$this->options['origin']}"; + } + + if ($segments['user'] || $segments['pass']) { + $headers['Authorization'] = 'Basic '.base64_encode($segments['user'].':'.$segments['pass']); + } + + if (isset($this->options['headers'])) { + $headers = array_merge($headers, $this->options['headers']); + } + + @\fwrite($this->socket, \join("\r\n", $headers)); + + $response = \stream_socket_recvfrom($this->socket, 1024); + + preg_match('#Sec-WebSocket-Accept:\s(.*)$#mU', $response, $matches); + + if ($matches) { + if (trim($matches[1]) !== base64_encode(pack('H*', sha1($key.self::KEY_SALT)))) { + throw new ConnectionException(\sprintf('Unable to upgrade to socket "%s"', $this->uri)); + } + } + } +} diff --git a/src/Exceptions/BadRequestException.php b/src/Exceptions/BadRequestException.php new file mode 100644 index 0000000..85e4217 --- /dev/null +++ b/src/Exceptions/BadRequestException.php @@ -0,0 +1,18 @@ + + * + * This source file is subject to the MIT license that is bundled. + */ + +namespace Overtrue\WebSocket\Exceptions; + +/** + * Class BadRequestException. + */ +class BadRequestException extends Exception +{ +} diff --git a/src/Exceptions/ConnectionException.php b/src/Exceptions/ConnectionException.php new file mode 100644 index 0000000..1fd640e --- /dev/null +++ b/src/Exceptions/ConnectionException.php @@ -0,0 +1,15 @@ + + * + * This source file is subject to the MIT license that is bundled. + */ + +namespace Overtrue\WebSocket\Exceptions; + +class ConnectionException extends Exception +{ +} diff --git a/src/Exceptions/Exception.php b/src/Exceptions/Exception.php new file mode 100644 index 0000000..b36bdcd --- /dev/null +++ b/src/Exceptions/Exception.php @@ -0,0 +1,18 @@ + + * + * This source file is subject to the MIT license that is bundled. + */ + +namespace Overtrue\WebSocket\Exceptions; + +/** + * Class Exception. + */ +class Exception extends \Exception +{ +} diff --git a/src/Exceptions/InvalidOpcodeException.php b/src/Exceptions/InvalidOpcodeException.php new file mode 100644 index 0000000..344c74c --- /dev/null +++ b/src/Exceptions/InvalidOpcodeException.php @@ -0,0 +1,15 @@ + + * + * This source file is subject to the MIT license that is bundled. + */ + +namespace Overtrue\WebSocket\Exceptions; + +class InvalidOpcodeException extends Exception +{ +} diff --git a/src/Exceptions/InvalidUriException.php b/src/Exceptions/InvalidUriException.php new file mode 100644 index 0000000..e846ed2 --- /dev/null +++ b/src/Exceptions/InvalidUriException.php @@ -0,0 +1,15 @@ + + * + * This source file is subject to the MIT license that is bundled. + */ + +namespace Overtrue\WebSocket\Exceptions; + +class InvalidUriException extends Exception +{ +} diff --git a/src/Server.php b/src/Server.php new file mode 100644 index 0000000..400b329 --- /dev/null +++ b/src/Server.php @@ -0,0 +1,130 @@ + + * + * This source file is subject to the MIT license that is bundled. + */ + +namespace Overtrue\WebSocket; + +use Overtrue\WebSocket\Exceptions\BadRequestException; +use Overtrue\WebSocket\Exceptions\ConnectionException; + +/** + * Class Server. + */ +class Server extends WebSocket +{ + /** + * @var resource + */ + protected $listening; + + /** + * @var array + */ + protected $request; + + /** + * Server constructor. + * + * @param array $options + * + * @throws \Overtrue\WebSocket\Exceptions\ConnectionException + */ + public function __construct(array $options = []) + { + $this->options = \array_merge($this->options, $options); + + $port = $this->options['port'] ?? 8000; + + do { + $this->listening = @stream_socket_server("tcp://0.0.0.0:$port", $errno, $message); + } while (false === $this->listening && $this->port++ < 10000); + + if (!$this->listening) { + throw new ConnectionException('No valid port to listen.'); + } + } + + /** + * @param string $header + * + * @return string|null + */ + public function getHeader(string $header) + { + foreach ($this->request as $row) { + if (false !== stripos($row, $header)) { + list($name, $value) = explode(':', $row); + + return trim($value); + } + } + + return null; + } + + /** + * @return bool|resource + * + * @throws \Overtrue\WebSocket\Exceptions\BadRequestException + */ + public function accept() + { + if (empty($this->options['timeout'])) { + $this->socket = stream_socket_accept($this->listening); + } else { + $this->socket = stream_socket_accept($this->listening, $this->options['timeout']); + stream_set_timeout($this->socket, $this->options['timeout']); + } + + $this->performHandshake(); + + stream_set_blocking($this->socket, false); + + return $this->socket; + } + + /** + * @throws \Overtrue\WebSocket\Exceptions\BadRequestException + * @throws \Overtrue\WebSocket\Exceptions\ConnectionException + */ + protected function performHandshake() + { + $body = ''; + do { + $buffer = stream_get_line($this->socket, 1024, "\r\n"); + $body .= $buffer."\n"; + $metadata = stream_get_meta_data($this->socket); + } while (!feof($this->socket) && $metadata['unread_bytes'] > 0); + + if (!preg_match('/GET (.*) HTTP\//mUi', $body, $matches)) { + throw new BadRequestException('Invalid Request headers.'); + } + + $this->request = explode("\n", $body); + + if (!preg_match('#Sec-WebSocket-Key:\s(.*)$#mUi', $body, $matches)) { + throw new BadRequestException('No key found in upgrade request'); + } + + $key = trim($matches[1]); + + // @todo Validate key length and base 64... + $responseKey = base64_encode(pack('H*', sha1($key.self::KEY_SALT))); + + $headers = [ + 'HTTP/1.1 101 Switching Protocols', + 'Upgrade: websocket', + 'Connection: Upgrade', + "Sec-WebSocket-Accept: $responseKey", + "\r\n", + ]; + + $this->write(\join("\r\n", $headers)); + } +} diff --git a/src/WebSocket.php b/src/WebSocket.php new file mode 100644 index 0000000..bd380cb --- /dev/null +++ b/src/WebSocket.php @@ -0,0 +1,547 @@ + + * + * This source file is subject to the MIT license that is bundled. + */ + +namespace Overtrue\WebSocket; + +use Overtrue\WebSocket\Exceptions\ConnectionException; +use Overtrue\WebSocket\Exceptions\InvalidOpcodeException; + +/** + * Class WebSocket. + * + * (c) Fredrik Liljegren + * + * This class is based of Textalk/websocket-php: + * https://github.com/Textalk/websocket-php/blob/master/lib/Base.php + */ +class WebSocket +{ + /** + * @var resource + */ + protected $socket; + + /** + * @var bool + */ + protected $connected = false; + + /** + * @var bool + */ + protected $closing = false; + + /** + * @var string + */ + protected $lastOpcode = null; + + /** + * @var int + */ + protected $closeStatus = null; + + /** + * @var string + */ + protected $hugePayload = ''; + + /** + * @var string + */ + protected $socketBuffer = ''; + + /** + * @var string + */ + protected $unparsedFragment = ''; + + /** + * @var array + */ + protected $options = [ + 'timeout' => 5, + 'fragment_size' => 4096, + ]; + + const OPCODES = [ + 'continuation' => 0, + 'text' => 1, + 'binary' => 2, + 'close' => 8, + 'ping' => 9, + 'pong' => 10, + ]; + + const FIRST_BYTE_MASK = 0b10001111; + const SECOND_BYTE_MASK = 0b11111111; + + const FINAL_BIT = 0b10000000; + const OPCODE_MASK = 0b00001111; + + const MASKED_BIT = 0b10000000; + const PAYLOAD_LENGTH_MASK = 0b01111111; + + const PAYLOAD_LENGTH_16BIT = 0b01111110; + const PAYLOAD_LENGTH_64BIT = 0b01111111; + + const KEY_SALT = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'; + + public function getLastOpcode() + { + return $this->lastOpcode; + } + + public function getCloseStatus() + { + return $this->closeStatus; + } + + /** + * @return bool + */ + public function isConnected() + { + return $this->connected; + } + + /** + * @param int $timeout + */ + public function setTimeout(int $timeout) + { + $this->options['timeout'] = $timeout; + + if ($this->socket && 'stream' === get_resource_type($this->socket)) { + stream_set_timeout($this->socket, $timeout); + } + } + + /** + * @param int $size + * + * @return $this + */ + public function setFragmentSize(int $size) + { + $this->options['fragment_size'] = $size; + + return $this; + } + + /** + * @return int + */ + public function getFragmentSize() + { + return $this->options['fragment_size']; + } + + /** + * @param string $payload + * @param string $opcode + * @param bool $masked + * + * @throws \Overtrue\WebSocket\Exceptions\ConnectionException + * @throws \Overtrue\WebSocket\Exceptions\InvalidOpcodeException + */ + public function send(string $payload, string $opcode = 'text', bool $masked = true) + { + if (!in_array($opcode, array_keys(self::OPCODES))) { + throw new InvalidOpcodeException("Invalid opcode '$opcode'. Try 'text' or 'binary'."); + } + + // record the length of the payload + $payloadLength = strlen($payload); + + $fragmentCursor = 0; + + // while we have data to send + while ($payloadLength > $fragmentCursor) { + // get a fragment of the payload + $subPayload = substr($payload, $fragmentCursor, $this->options['fragment_size']); + + // advance the cursor + $fragmentCursor += $this->options['fragment_size']; + + // is this the final fragment to send? + $final = $payloadLength <= $fragmentCursor; + + // send the fragment + $this->sendFragment($subPayload, $opcode, $final, $masked); + + // all fragments after the first will be marked a continuation + $opcode = 'continuation'; + } + } + + /** + * @param string $payload + * @param string $opcode + * @param bool $final + * @param bool $masked + * + * @throws \Overtrue\WebSocket\Exceptions\ConnectionException + */ + protected function sendFragment(string $payload, string $opcode, bool $final, bool $masked) + { + $frame = [0, 0]; + + // Set final bit + $frame[0] |= self::FINAL_BIT * (bool) $final; + // Set correct opcode + $frame[0] |= self::OPCODE_MASK & self::OPCODES[$opcode]; + // Reset reserved bytes + $frame[0] &= self::FIRST_BYTE_MASK; + + // 7 bits of payload length... + $payloadLength = strlen($payload); + if ($payloadLength > 65535) { + $opcodeLength = self::PAYLOAD_LENGTH_64BIT; + array_push($frame, pack('J', $payloadLength)); + } elseif ($payloadLength > 125) { + $opcodeLength = self::PAYLOAD_LENGTH_16BIT; + array_push($frame, pack('n', $payloadLength)); + } else { + $opcodeLength = $payloadLength; + } + + // Set masked mode + $frame[1] |= self::MASKED_BIT * (bool) $masked; + $frame[1] |= self::PAYLOAD_LENGTH_MASK & $opcodeLength; + + // Handle masking + if ($masked) { + // generate a random mask: + $mask = ''; + for ($i = 0; $i < 4; ++$i) { + $mask .= chr(rand(0, 255)); + } + array_push($frame, $mask); + + for ($i = 0; $i < $payloadLength; ++$i) { + $payload[$i] = $payload[$i] ^ $mask[$i & 3]; + } + } + + // Append payload to frame + array_push($frame, $payload); + + $this->write($frame); + } + + /** + * @param bool $try + * + * @return bool|string|null + * + * @throws \Overtrue\WebSocket\Exceptions\ConnectionException + * @throws \Overtrue\WebSocket\Exceptions\InvalidOpcodeException + */ + public function receive(bool $try = false) + { + $response = null; + + do { + $response = $this->receiveFragment(); + } while (is_null($response) && !$try); + + return $response; + } + + /** + * @return string|null + * + * @throws \Overtrue\WebSocket\Exceptions\ConnectionException + */ + protected function receiveFragmentHeader() + { + $minSize = 2; + $minRemain = $minSize - strlen($this->unparsedFragment); + + if ($this->willBlock($minRemain)) { + return null; + } + + $this->unparsedFragment .= $this->read($minRemain); + + $payloadLength = ord($this->unparsedFragment[1]) & 127; // Bits 1-7 in byte 1 + + switch ($payloadLength) { + default: + return $this->unparsedFragment; + case self::PAYLOAD_LENGTH_16BIT: + $extraHeaderBytes = 2; + break; + case self::PAYLOAD_LENGTH_64BIT: + $extraHeaderBytes = 8; + break; + } + + $extraRemain = $minSize + $extraHeaderBytes - strlen($this->unparsedFragment); + + if ($this->willBlock($extraRemain)) { + return null; + } + + $this->unparsedFragment .= $this->read($extraRemain); + + return $this->unparsedFragment; + } + + /** + * @return bool|string|null + * + * @throws \Overtrue\WebSocket\Exceptions\ConnectionException + * @throws \Overtrue\WebSocket\Exceptions\InvalidOpcodeException + */ + protected function receiveFragment() + { + $data = $this->receiveFragmentHeader(); + + // Buffer not ready for header + if (null === $data) { + return null; + } + + // Is this the final fragment? // Bit 0 in byte 0 + /// @todo Handle huge payloads with multiple fragments. + $final = ord($data[0]) & self::FINAL_BIT; + + // Should be zero + $rsv = ord($data[0]) & ~self::FIRST_BYTE_MASK; + + if (0 !== $rsv) { + throw new ConnectionException('Reserved bits should be zero'); + } + + // Parse opcode + $opcodeId = ord($data[0]) & self::OPCODE_MASK; + $opcodes = array_flip(self::OPCODES); + + if (!array_key_exists($opcodeId, $opcodes)) { + throw new ConnectionException("Bad opcode in websocket frame: $opcodeId"); + } + + $opcode = $opcodes[$opcodeId]; + + // record the opcode if we are not receiving a continuation fragment + if ('continuation' !== $opcode) { + $this->lastOpcode = $opcode; + } + + // Masking? + $mask = ord($data[1]) & self::MASKED_BIT; + + $payload = ''; + + // Payload length + $payloadLength = ord($data[1]) & self::PAYLOAD_LENGTH_MASK; + + if ($payloadLength > 125) { + // 126: 'n' means big-endian 16-bit unsigned int + // 127: 'J' means big-endian 64-bit unsigned int + $unpackMode = self::PAYLOAD_LENGTH_16BIT === $payloadLength ? 'n' : 'J'; + $unpacked = unpack($unpackMode, substr($data, 2)); + $payloadLength = current($unpacked); + } + + // Try again later when fragment is downloaded + if ($this->willBlock($mask * 4 + $payloadLength)) { + return null; + } + + // Enter fragment reading state + $this->unparsedFragment = ''; + + // Get masking key. + if ($mask) { + $maskingKey = $this->read(4); + } + + // Get the actual payload, if any (might not be for e.g. close frames. + if ($payloadLength > 0) { + $data = $this->read($payloadLength); + + if ($mask) { + // Unmask payload. + for ($i = 0; $i < $payloadLength; ++$i) { + $data[$i] = $data[$i] ^ $maskingKey[$i & 3]; + } + } + + $payload = $data; + } + + if ('close' === $opcode) { + // Get the close status. + if ($payloadLength >= 2) { + $status = current(unpack('n', $payload)); // read 16-bit short + + $this->closeStatus = $status; + $payload = substr($payload, 2); + $statusBin = $payload[0].$payload[1]; + + if (!$this->closing) { + $this->send($statusBin.'Close acknowledged: '.$status, 'close', true); // Respond. + } + } + + if ($this->closing) { + $this->closing = false; // A close response, all done. + } + + // And close the socket. + fclose($this->socket); + $this->connected = false; + } + + // if this is not the last fragment, then we need to save the payload + if (!$final) { + $this->hugePayload .= $payload; + + return null; + } elseif ($this->hugePayload) { // this is the last fragment, and we are processing a hugePayload + // sp we need to retreive the whole payload + $payload = $this->hugePayload .= $payload; + $this->hugePayload = ''; + } + + return $payload; + } + + /** + * Tell the socket to close. + * + * @param int $status http://tools.ietf.org/html/rfc6455#section-7.4 + * @param string $message a closing message, max 125 bytes + * + * @return bool|string|null + * + * @throws \Overtrue\WebSocket\Exceptions\InvalidOpcodeException + */ + public function close($status = 1000, $message = 'ttfn') + { + $this->send(pack('n', $status).$message, 'close', true); + + $this->closing = true; + + return $this->receive(); // Receiving a close frame will close the socket now. + } + + /** + * @param array|string $data + * + * @throws \Overtrue\WebSocket\Exceptions\ConnectionException + */ + protected function write($data) + { + // Array contains binary data and split-ed bytes + if (is_array($data)) { + foreach ($data as $part) { + $this->write($part); + } + + return; + } + + // If it is not binary data, then it is byte + if (!is_string($data)) { + $data = pack('C', $data); + } + + $written = fwrite($this->socket, $data); + + if ($written < strlen($data)) { + throw new ConnectionException( + "Could only write $written out of ".strlen($data).' bytes.' + ); + } + } + + /** + * @param int $length + * + * @return bool|string + * + * @throws \Overtrue\WebSocket\Exceptions\ConnectionException + */ + protected function read(int $length) + { + $data = &$this->socketBuffer; + + while (strlen($data) < $length) { + $buffer = fread($this->socket, $length - strlen($data)); + + if (false === $buffer) { + $metadata = stream_get_meta_data($this->socket); + throw new ConnectionException( + 'Broken frame, read '.strlen($data).' of stated ' + .$length.' bytes. Stream state: ' + .json_encode($metadata) + ); + } + + if ('' === $buffer) { + $metadata = stream_get_meta_data($this->socket); + throw new ConnectionException( + 'Empty read; connection dead? Stream state: '.json_encode($metadata) + ); + } + $data .= $buffer; + } + + $return = substr($data, 0, $length); + $data = substr($data, $length); + + return $return; + } + + /** + * @param int $length + * + * @return bool + */ + protected function bufferize(int $length) + { + while (1) { + $bufferLength = strlen($this->socketBuffer); + $remain = $length - $bufferLength; + + if ($remain <= 0) { + return true; + } + + $fetched = fread($this->socket, $remain); + + if (false === $fetched) { + break; + } + + if (0 == strlen($fetched)) { + break; + } + + $this->socketBuffer .= $fetched; + } + + return false; + } + + /** + * @param int $length + * + * @return bool + */ + protected function willBlock(int $length) + { + return !$this->bufferize($length); + } +} diff --git a/tests/.gitkeep b/tests/.gitkeep new file mode 100644 index 0000000..e69de29