diff --git a/README.md b/README.md index c832ba1..58098ea 100644 --- a/README.md +++ b/README.md @@ -72,7 +72,12 @@ That will now run the server - **Scalable** - Designed for Horizontal Scalability. - **WAMP Basic Profile Features** - This project implements most of the basic profile features in WAMP v2. - **Websocket Transport** - Currently the project only implements websocket transport. -- **JSON Serializer** - JSON serializer is the only available, messagepack and CBOR implementation still on development. +- **Message Serializer** - Accepts JSON and MessagePack. + +### Message Serializer + +- JSON +- MessagePack ## Basic Profile Feature Support @@ -114,7 +119,6 @@ The current version does not support Advance Profile Features ## TODOs -- [ ] Implement MessagePack Serializer https://wamp-proto.org/wamp_bp_latest_ietf.html#name-serializers - [ ] Implement CBOR Serializer https://wamp-proto.org/wamp_bp_latest_ietf.html#name-serializers - [ ] Implement Advance Profile - [ ] Remove Dependencies from Thruway Common diff --git a/composer.json b/composer.json index cb5f3e7..ed69de2 100644 --- a/composer.json +++ b/composer.json @@ -19,7 +19,8 @@ "voryx/thruway": "^0.4.2", "symfony/event-dispatcher": "^2.8", "octamp/client": "^1.3", - "symfony/dotenv": "^7.1" + "symfony/dotenv": "^7.1", + "rybakit/msgpack": "^0.9.1" }, "require-dev": { "openswoole/ide-helper": "^22.1" diff --git a/src/Helper/SerializerHelper.php b/src/Helper/SerializerHelper.php new file mode 100644 index 0000000..20e7022 --- /dev/null +++ b/src/Helper/SerializerHelper.php @@ -0,0 +1,43 @@ +getMessageParts()); + $data = json_decode($data, true); + + return MessagePack::pack($data); + } + + public function deserialize(string $serializedData): Message + { + $data = MessagePack::unpack($serializedData); + + return Message::createMessageFromArray($data); + } + + public function protocolName(): string + { + return 'wamp.2.msgpack'; + } + + public function opcode(): int + { + return Server::WEBSOCKET_OPCODE_BINARY; + } +} \ No newline at end of file diff --git a/src/Serializer/SerializerInterface.php b/src/Serializer/SerializerInterface.php new file mode 100644 index 0000000..d9765ee --- /dev/null +++ b/src/Serializer/SerializerInterface.php @@ -0,0 +1,12 @@ + $session->isTrusted(), 'transportClass' => get_class($session->getTransport()), 'serializerClass' => get_class($session->getTransport()->getSerializer()), + 'websocketProtocol' => $session->getTransport()->getSerializer()->protocolName(), 'serverId' => $session->getServerId(), ]; diff --git a/src/Session/SessionStorage.php b/src/Session/SessionStorage.php index f77ceec..20b6303 100644 --- a/src/Session/SessionStorage.php +++ b/src/Session/SessionStorage.php @@ -3,12 +3,13 @@ namespace Octamp\Wamp\Session; use Octamp\Server\Connection\ConnectionStorage; +use Octamp\Wamp\Helper\SerializerHelper; use Octamp\Wamp\Realm\RealmManager; +use Octamp\Wamp\Serializer\JsonSerializer; use Octamp\Wamp\Session\Adapter\AdapterInterface; use Octamp\Wamp\Transport\AbstractTransport; use Octamp\Wamp\Transport\DummyTransport; use Octamp\Wamp\Transport\OctampTransport; -use Thruway\Serializer\JsonSerializer; class SessionStorage { @@ -58,12 +59,23 @@ public function createFromArray(array $data): ?Session $transportClass = $data['transportClass']; $serializerClass = $data['serializerClass'] ?? null; + $websocketProtocol = $data['websocketProtocol'] ?? null; /** @var OctampTransport $transport */ $transport = new $transportClass($connection); - if ($serializerClass === null) { - $transport->setSerializer(new JsonSerializer()); - } else { - $transport->setSerializer(new $serializerClass()); + + if ($websocketProtocol !== null) { + $serializer = SerializerHelper::getSerializer($websocketProtocol); + if ($serializer !== null) { + $transport->setSerializer($serializer); + } + } + + if ($transport->getSerializer() === null) { + if ($serializerClass === null) { + $transport->setSerializer(new JsonSerializer()); + } else { + $transport->setSerializer(new $serializerClass()); + } } $realm = $this->realmManager->getRealm($data['realm']); diff --git a/src/Transport/AbstractTransport.php b/src/Transport/AbstractTransport.php index cd3a1f4..72c29ab 100644 --- a/src/Transport/AbstractTransport.php +++ b/src/Transport/AbstractTransport.php @@ -4,13 +4,13 @@ use Octamp\Client\Promise\Promise; use Octamp\Server\Connection\Connection; +use Octamp\Wamp\Serializer\WampMessageSerializerInterface; use OpenSwoole\WebSocket\Frame; use Thruway\Exception\PingNotSupportedException; -use Thruway\Serializer\SerializerInterface; abstract class AbstractTransport { - protected ?SerializerInterface $serializer = null; + protected ?WampMessageSerializerInterface $serializer = null; /* * @var boolean @@ -49,12 +49,12 @@ public function setTrusted(bool $trusted): void $this->trusted = $trusted; } - public function setSerializer(SerializerInterface $serializer): void + public function setSerializer(WampMessageSerializerInterface $serializer): void { $this->serializer = $serializer; } - public function getSerializer(): SerializerInterface + public function getSerializer(): ?WampMessageSerializerInterface { return $this->serializer; } diff --git a/src/Transport/OctampTransport.php b/src/Transport/OctampTransport.php index 59b9e37..05d2072 100644 --- a/src/Transport/OctampTransport.php +++ b/src/Transport/OctampTransport.php @@ -25,7 +25,7 @@ public function getTransportDetails(): array public function sendMessage(Message $msg): void { - $this->connection->send($this->getSerializer()->serialize($msg)); + $this->connection->send($this->getSerializer()->serialize($msg), $this->getSerializer()->opcode()); } public function close(): void diff --git a/src/Transport/OctampTransportProvider.php b/src/Transport/OctampTransportProvider.php index ffe2a19..45ae7bd 100644 --- a/src/Transport/OctampTransportProvider.php +++ b/src/Transport/OctampTransportProvider.php @@ -9,6 +9,8 @@ use Octamp\Wamp\Config\TransportProviderConfig; use Octamp\Wamp\Event\ConnectionOpenEvent; use Octamp\Wamp\Peers\Router; +use OpenSwoole\Http\Request; +use OpenSwoole\Http\Response; use OpenSwoole\WebSocket\Frame; use OpenSwoole\WebSocket\Server as WebsocketServer; @@ -24,13 +26,51 @@ public function __construct(private readonly TransportProviderConfig $config) { $this->websocketServer = Server::createWebsocketServer($this->config->host, $this->config->port, [ 'worker_num' => $this->config->workerNum, - 'websocket_subprotocol' => 'wamp.2.json', 'open_websocket_close_frame' => true, 'open_websocket_ping_frame' => true, 'open_websocket_pong_frame' => true, // "enable_reuse_port" => true, ]); $this->server = new Server($this->websocketServer); + $this->websocketServer->on('handshake', function (Request $request, Response $response) + { + $secWebSocketKey = $request->header['sec-websocket-key']; + $patten = '#^[+/0-9A-Za-z]{21}[AQgw]==$#'; + + // At this stage if the socket request does not meet custom requirements, you can ->end() it here and return false... + + // Websocket handshake connection algorithm verification + if (0 === preg_match($patten, $secWebSocketKey) || 16 !== strlen(base64_decode($secWebSocketKey))) + { + $response->end(); + return false; + } + + $key = base64_encode(sha1($request->header['sec-websocket-key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true)); + + $headers = [ + 'Upgrade' => 'websocket', + 'Connection' => 'Upgrade', + 'Sec-WebSocket-Accept' => $key, + 'Sec-WebSocket-Version' => '13', + ]; + + foreach($headers as $key => $val) + { + $response->header($key, $val); + } + + $this->server->dispatch('handshake', $request, $response); + + $response->status(101); + $response->end(); + + $this->getWebsocketServer()->defer(function () use ($request) { + call_user_func($this->getWebsocketServer()->getCallback('Open'), $this->getWebsocketServer(), $request); + }); + + return true; + }); } public function start(): void diff --git a/src/Wamp.php b/src/Wamp.php index b544f4d..051cdff 100644 --- a/src/Wamp.php +++ b/src/Wamp.php @@ -9,16 +9,21 @@ use Octamp\Wamp\Adapter\AdapterInterface; use Octamp\Wamp\Config\TransportProviderConfig; use Octamp\Wamp\Helper\IDHelper; +use Octamp\Wamp\Helper\SerializerHelper; use Octamp\Wamp\Peers\Router; use Octamp\Wamp\Realm\RealmManager; use Octamp\Wamp\Roles\Broker; use Octamp\Wamp\Roles\Dealer; +use Octamp\Wamp\Serializer\JsonSerializer; +use Octamp\Wamp\Serializer\MessagePackSerializer; +use Octamp\Wamp\Serializer\WampMessageSerializerInterface; use Octamp\Wamp\Session\SessionStorage; use Octamp\Wamp\Transport\OctampTransport; use Octamp\Wamp\Transport\OctampTransportProvider; use Octamp\Wamp\Transport\TransportProviderInterface; +use OpenSwoole\Http\Request; +use OpenSwoole\Http\Response; use OpenSwoole\WebSocket\Frame; -use Thruway\Serializer\JsonSerializer; class Wamp { @@ -52,7 +57,6 @@ public function init(): void $transportProvider->getServer()->on('afterStart', function (Server $server) { $this->serverId = $server->getServerId(); - $sessionAdapter = new \Octamp\Wamp\Session\Adapter\RedisAdapter($this->adapter); IDHelper::setAdapter($this->adapter); IDHelper::setSessionAdapter($sessionAdapter); @@ -70,9 +74,30 @@ public function init(): void $this->realmManager->addRealm($realm); }); + $transportProvider->getServer()->on('handshake', function (Server $server, Request $request, Response $response) { + $protocolString = $request->header['sec-websocket-protocol'] ?? ''; + if ($protocolString === '') { + return; + } + + $protocols = array_map(function ($protocol) { + return trim($protocol); + }, explode(',', $protocolString)); + + $protocol = SerializerHelper::getFirstSupportedProtocols($protocols); + if ($protocol !== null) { + $response->header('Sec-Websocket-Protocol', $protocol); + } + }); + $transportProvider->getServer()->on('open', function (Server $server, Connection $connection) { $transport = new OctampTransport($connection); - $transport->setSerializer(new JsonSerializer()); + $serializer = $this->getSerializer($connection->getRequest()); + if ($serializer === null) { + $connection->close(); + return; + } + $transport->setSerializer($serializer); $session = $this->realmManager->generateSession($transport); $this->realmManager->saveSession($session); }); @@ -105,7 +130,6 @@ public function init(): void return; } - $session = null; $retry = 0; $maxRetry = 10; @@ -126,7 +150,7 @@ public function init(): void if ($frame->opcode === \OpenSwoole\WebSocket\Server::WEBSOCKET_OPCODE_PONG) { $session->getTransport()->onPong($frame); - } elseif ($frame->opcode === \OpenSwoole\WebSocket\Server::WEBSOCKET_OPCODE_TEXT) { + } elseif ($frame->opcode === \OpenSwoole\WebSocket\Server::WEBSOCKET_OPCODE_TEXT || $frame->opcode === \OpenSwoole\WebSocket\Server::WEBSOCKET_OPCODE_BINARY) { $message = $session->getTransport()->getSerializer()->deserialize($frame->data); $this->realmManager->dispatch($session, $message); } @@ -137,4 +161,23 @@ public function run(): void { $this->transportProviders[0]->start(); } + + protected function getSerializer(Request $request): ?WampMessageSerializerInterface + { + $protocolString = $request->header['sec-websocket-protocol'] ?? ''; + if ($protocolString === '') { + return null; + } + + $protocols = array_map(function ($protocol) { + return trim($protocol); + }, explode(',', $protocolString)); + + $protocolName = SerializerHelper::getFirstSupportedProtocols($protocols); + if ($protocolName === null) { + return null; + } + + return SerializerHelper::getSerializer($protocolName); + } } \ No newline at end of file