Skip to content

Commit

Permalink
feat: implement message pack serializer
Browse files Browse the repository at this point in the history
  • Loading branch information
cydrickn committed Jun 24, 2024
1 parent 5979dd7 commit 84b8482
Show file tree
Hide file tree
Showing 14 changed files with 263 additions and 19 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@ That will now run the server
- **Scalable** - Designed for Horizontal Scalability.
- **WAMP Basic Profile Features** - This project implements most of the basic profile features in WAMP v2.
- **Websocket Transport** - Currently the project only implements websocket transport.
- **JSON Serializer** - JSON serializer is the only available, messagepack and CBOR implementation still on development.
- **Message Serializer** - Accepts JSON and MessagePack.

### Message Serializer

- JSON
- MessagePack

## Basic Profile Feature Support

Expand Down Expand Up @@ -114,7 +119,6 @@ The current version does not support Advance Profile Features

## TODOs

- [ ] Implement MessagePack Serializer https://wamp-proto.org/wamp_bp_latest_ietf.html#name-serializers
- [ ] Implement CBOR Serializer https://wamp-proto.org/wamp_bp_latest_ietf.html#name-serializers
- [ ] Implement Advance Profile
- [ ] Remove Dependencies from Thruway Common
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
"voryx/thruway": "^0.4.2",
"symfony/event-dispatcher": "^2.8",
"octamp/client": "^1.3",
"symfony/dotenv": "^7.1"
"symfony/dotenv": "^7.1",
"rybakit/msgpack": "^0.9.1"
},
"require-dev": {
"openswoole/ide-helper": "^22.1"
Expand Down
43 changes: 43 additions & 0 deletions src/Helper/SerializerHelper.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

namespace Octamp\Wamp\Helper;

use Octamp\Wamp\Serializer\JsonSerializer;
use Octamp\Wamp\Serializer\MessagePackSerializer;
use Octamp\Wamp\Serializer\WampMessageSerializerInterface;

class SerializerHelper
{
public static function getSerializer(string $protocol): ?WampMessageSerializerInterface
{
if ($protocol === 'wamp.2.json') {
return new JsonSerializer();
}

if ($protocol === 'wamp.2.msgpack') {
return new MessagePackSerializer();
}

return null;
}

public static function supportedProtocols(): array
{
return [
'wamp.2.msgpack',
'wamp.2.json',
];
}

public static function getFirstSupportedProtocols(array $userProtocols): ?string
{
$supportedProtocols = static::supportedProtocols();
foreach ($supportedProtocols as $protocol) {
if (in_array($protocol, $userProtocols)) {
return trim($protocol);
}
}

return null;
}
}
8 changes: 8 additions & 0 deletions src/Serializer/DeserializationException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php

namespace Octamp\Wamp\Serializer;

class DeserializationException extends \Exception
{

}
33 changes: 33 additions & 0 deletions src/Serializer/JsonSerializer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

namespace Octamp\Wamp\Serializer;

use OpenSwoole\WebSocket\Server;
use Thruway\Message\Message;

class JsonSerializer implements WampMessageSerializerInterface
{
public function serialize(Message $msg): string
{
return json_encode($msg);
}

public function deserialize(string $serializedData): Message
{
if (null === ($data = @json_decode($serializedData))) {
throw new DeserializationException("Error decoding json \"" . $serializedData . "\"");
}

return Message::createMessageFromArray($data);
}

public function protocolName(): string
{
return 'wamp.2.json';
}

public function opcode(): int
{
return Server::WEBSOCKET_OPCODE_TEXT;
}
}
37 changes: 37 additions & 0 deletions src/Serializer/MessagePackSerializer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

namespace Octamp\Wamp\Serializer;

use MessagePack\MessagePack;
use MessagePack\Packer;
use OpenSwoole\WebSocket\Server;
use Thruway\Message\Message;

class MessagePackSerializer implements WampMessageSerializerInterface
{

public function serialize(Message $msg): string
{
$data = json_encode($msg->getMessageParts());
$data = json_decode($data, true);

return MessagePack::pack($data);
}

public function deserialize(string $serializedData): Message
{
$data = MessagePack::unpack($serializedData);

return Message::createMessageFromArray($data);
}

public function protocolName(): string
{
return 'wamp.2.msgpack';
}

public function opcode(): int
{
return Server::WEBSOCKET_OPCODE_BINARY;
}
}
12 changes: 12 additions & 0 deletions src/Serializer/SerializerInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

namespace Octamp\Wamp\Serializer;

use Thruway\Message\Message;

interface SerializerInterface
{
public function serialize(Message $msg): string;

public function deserialize(string $serializedData): Message;
}
10 changes: 10 additions & 0 deletions src/Serializer/WampMessageSerializerInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

namespace Octamp\Wamp\Serializer;

interface WampMessageSerializerInterface extends SerializerInterface
{
public function protocolName(): string;

public function opcode(): int;
}
1 change: 1 addition & 0 deletions src/Session/Adapter/RedisAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public function saveSession(Session $session): void
'trusted' => $session->isTrusted(),
'transportClass' => get_class($session->getTransport()),
'serializerClass' => get_class($session->getTransport()->getSerializer()),
'websocketProtocol' => $session->getTransport()->getSerializer()->protocolName(),
'serverId' => $session->getServerId(),
];

Expand Down
22 changes: 17 additions & 5 deletions src/Session/SessionStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
namespace Octamp\Wamp\Session;

use Octamp\Server\Connection\ConnectionStorage;
use Octamp\Wamp\Helper\SerializerHelper;
use Octamp\Wamp\Realm\RealmManager;
use Octamp\Wamp\Serializer\JsonSerializer;
use Octamp\Wamp\Session\Adapter\AdapterInterface;
use Octamp\Wamp\Transport\AbstractTransport;
use Octamp\Wamp\Transport\DummyTransport;
use Octamp\Wamp\Transport\OctampTransport;
use Thruway\Serializer\JsonSerializer;

class SessionStorage
{
Expand Down Expand Up @@ -58,12 +59,23 @@ public function createFromArray(array $data): ?Session

$transportClass = $data['transportClass'];
$serializerClass = $data['serializerClass'] ?? null;
$websocketProtocol = $data['websocketProtocol'] ?? null;
/** @var OctampTransport $transport */
$transport = new $transportClass($connection);
if ($serializerClass === null) {
$transport->setSerializer(new JsonSerializer());
} else {
$transport->setSerializer(new $serializerClass());

if ($websocketProtocol !== null) {
$serializer = SerializerHelper::getSerializer($websocketProtocol);
if ($serializer !== null) {
$transport->setSerializer($serializer);
}
}

if ($transport->getSerializer() === null) {
if ($serializerClass === null) {
$transport->setSerializer(new JsonSerializer());
} else {
$transport->setSerializer(new $serializerClass());
}
}

$realm = $this->realmManager->getRealm($data['realm']);
Expand Down
8 changes: 4 additions & 4 deletions src/Transport/AbstractTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

use Octamp\Client\Promise\Promise;
use Octamp\Server\Connection\Connection;
use Octamp\Wamp\Serializer\WampMessageSerializerInterface;
use OpenSwoole\WebSocket\Frame;
use Thruway\Exception\PingNotSupportedException;
use Thruway\Serializer\SerializerInterface;

abstract class AbstractTransport
{
protected ?SerializerInterface $serializer = null;
protected ?WampMessageSerializerInterface $serializer = null;

/*
* @var boolean
Expand Down Expand Up @@ -49,12 +49,12 @@ public function setTrusted(bool $trusted): void
$this->trusted = $trusted;
}

public function setSerializer(SerializerInterface $serializer): void
public function setSerializer(WampMessageSerializerInterface $serializer): void
{
$this->serializer = $serializer;
}

public function getSerializer(): SerializerInterface
public function getSerializer(): ?WampMessageSerializerInterface
{
return $this->serializer;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Transport/OctampTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public function getTransportDetails(): array

public function sendMessage(Message $msg): void
{
$this->connection->send($this->getSerializer()->serialize($msg));
$this->connection->send($this->getSerializer()->serialize($msg), $this->getSerializer()->opcode());
}

public function close(): void
Expand Down
42 changes: 41 additions & 1 deletion src/Transport/OctampTransportProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
use Octamp\Wamp\Config\TransportProviderConfig;
use Octamp\Wamp\Event\ConnectionOpenEvent;
use Octamp\Wamp\Peers\Router;
use OpenSwoole\Http\Request;
use OpenSwoole\Http\Response;
use OpenSwoole\WebSocket\Frame;
use OpenSwoole\WebSocket\Server as WebsocketServer;

Expand All @@ -24,13 +26,51 @@ public function __construct(private readonly TransportProviderConfig $config)
{
$this->websocketServer = Server::createWebsocketServer($this->config->host, $this->config->port, [
'worker_num' => $this->config->workerNum,
'websocket_subprotocol' => 'wamp.2.json',
'open_websocket_close_frame' => true,
'open_websocket_ping_frame' => true,
'open_websocket_pong_frame' => true,
// "enable_reuse_port" => true,
]);
$this->server = new Server($this->websocketServer);
$this->websocketServer->on('handshake', function (Request $request, Response $response)
{
$secWebSocketKey = $request->header['sec-websocket-key'];
$patten = '#^[+/0-9A-Za-z]{21}[AQgw]==$#';

// At this stage if the socket request does not meet custom requirements, you can ->end() it here and return false...

// Websocket handshake connection algorithm verification
if (0 === preg_match($patten, $secWebSocketKey) || 16 !== strlen(base64_decode($secWebSocketKey)))
{
$response->end();
return false;
}

$key = base64_encode(sha1($request->header['sec-websocket-key'] . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true));

$headers = [
'Upgrade' => 'websocket',
'Connection' => 'Upgrade',
'Sec-WebSocket-Accept' => $key,
'Sec-WebSocket-Version' => '13',
];

foreach($headers as $key => $val)
{
$response->header($key, $val);
}

$this->server->dispatch('handshake', $request, $response);

$response->status(101);
$response->end();

$this->getWebsocketServer()->defer(function () use ($request) {
call_user_func($this->getWebsocketServer()->getCallback('Open'), $this->getWebsocketServer(), $request);
});

return true;
});
}

public function start(): void
Expand Down
Loading

0 comments on commit 84b8482

Please sign in to comment.