Skip to content

Commit

Permalink
feat: add progressive call
Browse files Browse the repository at this point in the history
  • Loading branch information
cydrickn committed Jul 22, 2024
1 parent b1e518e commit 7df77f2
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 49 deletions.
34 changes: 20 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,20 +133,23 @@ That will now run the server

### RPC Features

| Feature | Status |
|----------------------------------|---------|
| Progressive Call Results | ✗ |
| Progressive Call Invocations | ✗ |
| Call Timeout | ✗ |
| Call Canceling | ✗ |
| Caller Identification | ✗ |
| Call Trustlevels | ✗ |
| Registration Meta API | ✗ |
| Pattern-based Registration | ✗ |
| Shared Registration | ✗ |
| Sharded Registration | ✗ |
| Registration Revocation | ✗ |
| (Interface) Procedure Reflection | ✗ |
| Feature | Status |
|------------------------------------------------|---------|
| Progressive Call Results | ✓ |
| Ignoring Requests for Progressive Call Results | ✗ |
| Progressive Call Results with Timeout | ✗ |
| Progressive Call Invocations | ✗ |
| Call Timeout | ✗ |
| Call Canceling | ✗ |
| Call Re-Routing | ✗ |
| Caller Identification | ✗ |
| Call Trustlevels | ✗ |
| Registration Meta API | ✗ |
| Pattern-based Registration | ✗ |
| Shared Registration | ✓ |
| Sharded Registration | ✗ |
| Registration Revocation | ✗ |
| (Interface) Procedure Reflection | ✗ |


### PubSub Features
Expand All @@ -161,6 +164,9 @@ That will now run the server
| Pattern-based Subscription | ✓ |
| Sharded Subscription | ✗ |
| Event History | ✗ |
| Event Retention | ✗ |
| Subscription Revocation | ✗ |
| Session Testament | ✗ |
| (Interface) Topic Reflection | ✗ |

### Others
Expand Down
29 changes: 4 additions & 25 deletions src/Registration/Call.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ class Call
*/
private ?CancelMessage $cancelMessage = null;

/**
* @var boolean
*/
private bool $isProgressive = false;

/**
* @var Registration
*/
Expand Down Expand Up @@ -286,22 +281,16 @@ public function getInvocationMessage(): InvocationMessage
}
}

// TODO: check to see if callee supports progressive call
$callOptions = $this->getCallMessage()->getOptions();
$isProgressive = false;
if (is_object($callOptions) && isset($callOptions->receive_progress) && $callOptions->receive_progress) {
$calleeSupportReceiveProgress = $this->calleeSession->hasFeature('callee', 'progressive_call_results');
if ($calleeSupportReceiveProgress && $this->isProgressive()) {
$details = array_merge($details, ["receive_progress" => true]);
$isProgressive = true;
}

// if nothing was added to details - change ot stdClass so it will serialize correctly
if (count($details) == 0) {
$details = new \stdClass();
}
$invocationMessage->setDetails($details);

$this->setIsProgressive($isProgressive);

$this->setInvocationMessage($invocationMessage);
}

Expand All @@ -318,24 +307,14 @@ public function setInvocationMessage(?InvocationMessage $invocationMessage): voi
$this->invocationMessage = $invocationMessage;
}

/**
* update state is progressive
*
* @param boolean $isProgressive
*/
public function setIsProgressive(bool $isProgressive): void
{
$this->isProgressive = $isProgressive;
}

/**
* Get state is progressive
*
* @return boolean
*/
public function getIsProgressive(): bool
{
return $this->isProgressive;
return $this->isProgressive();
}

/**
Expand All @@ -345,7 +324,7 @@ public function getIsProgressive(): bool
*/
public function isProgressive(): bool
{
return $this->isProgressive;
return $this->callMessage?->getOptions()?->receive_progress ?? false;
}

/**
Expand Down
9 changes: 5 additions & 4 deletions src/Registration/Registration.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
class Registration
{

private string $id;
private string|int $id;

private Session $session;

Expand Down Expand Up @@ -69,9 +69,9 @@ class Registration
private int $maxSimultaneousCalls;

/**
* @var int
* @var float
*/
private int $invocationAverageTime;
private float $invocationAverageTime;

/**
* @var null|\DateTime
Expand Down Expand Up @@ -235,6 +235,7 @@ public function processCall(Call $call): void
'calleeTransportId' => $call->getCalleeSession()->getTransportId(),
'invocationId' => $invocationMessage->getRequestId(),
'registrationId' => $invocationMessage->getRegistrationId(),
'isProgressive' => $call->isProgressive(),
'hasResponse' => false,
'hasSentResult' => false,
]);
Expand Down Expand Up @@ -336,7 +337,7 @@ public function errorAllPendingCalls(): void
}
}

public function setId(int $id): void
public function setId(string|int $id): void
{
$this->id = $id;
}
Expand Down
26 changes: 21 additions & 5 deletions src/Roles/Dealer.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ public function onCallMessage(Session $session, CallMessage $message): void

public function onYieldMessage(Session $session, YieldMessage $message): void
{
$details = new \stdClass();

$invocationKey = Registration::generateKeyForInvocation('*', $session->getSessionId(), '*', $message->getRequestId());
$invocationDetails = $this->adapter->findOne($invocationKey);

Expand All @@ -76,15 +74,29 @@ public function onYieldMessage(Session $session, YieldMessage $message): void
$message->getRequestId()
);

$callerSession = $this->sessionStorage->getSessionUsingTransportId($invocationDetails['callTransportId']);

$isProgress = $message->getOptions()?->progress ?? false;
$callIsProgressive = $invocationDetails['isProgressive'] ?? false;
if ($isProgress && $callIsProgressive && $callerSession->hasFeature('caller', 'progressive_call_results')) {
$resultMessage = new ResultMessage(
(int) $invocationDetails['callRequestId'],
['progress' => true],
$message->getArguments(),
$message->getArgumentsKw()
);
$callerSession?->sendMessage($resultMessage);
return;
}

$this->adapter->setField($invocationKey, 'hasResponse', true);
$resultMessage = new ResultMessage(
(int) $invocationDetails['callRequestId'],
$details,
[],
$message->getArguments(),
$message->getArgumentsKw()
);

$callerSession = $this->sessionStorage->getSessionUsingTransportId($invocationDetails['callTransportId']);
$callerSession?->sendMessage($resultMessage);
$this->adapter->setField($invocationKey, 'hasSentResult', true);

Expand Down Expand Up @@ -285,6 +297,10 @@ public function getName(): string

public function getFeatures(): object
{
return new \stdClass();
$features = new \stdClass();
$features->shared_registration = true;
$features->progressive_call_results = true;

return $features;
}
}
1 change: 1 addition & 0 deletions src/Session/Adapter/RedisAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public function saveSession(Session $session): void
'serializerClass' => get_class($session->getTransport()->getSerializer()),
'websocketProtocol' => $session->getTransport()->getSerializer()->protocolName(),
'serverId' => $session->getServerId(),
'helloMessage' => $session->getHelloMessage()->getMessageParts(),
];

$this->adapter->set($this->getKeyBySession($session), $details);
Expand Down
10 changes: 10 additions & 0 deletions src/Session/Session.php
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,16 @@ public function getRoleFeatures(): array
return $this->getHelloMessage()->getDetails()?->roles ?? [];
}

public function hasFeature(string $role, string $feature): bool
{
$roles = $this->getRoleFeatures();
if (!isset($roles[$role])) {
return false;
}

return $roles[$role]->features->{$feature} ?? false;
}

public function incPendingCallCount(): int
{
return $this->pendingCallCount++;
Expand Down
8 changes: 8 additions & 0 deletions src/Session/SessionStorage.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Octamp\Wamp\Transport\AbstractTransport;
use Octamp\Wamp\Transport\DummyTransport;
use Octamp\Wamp\Transport\OctampTransport;
use Thruway\Message\HelloMessage;

class SessionStorage
{
Expand Down Expand Up @@ -92,6 +93,13 @@ public function createFromArray(array $data): ?Session
$session->setRealm($realm);
$session->setTrusted($data['trusted']);

if(isset($data['helloMessage'])) {
$helloMessage = HelloMessage::createMessageFromArray($data['helloMessage']);
if ($helloMessage instanceof HelloMessage) {
$session->setHelloMessage($helloMessage);
}
}

return $session;
}

Expand Down
11 changes: 10 additions & 1 deletion src/Wamp.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use Octamp\Wamp\Serializer\JsonSerializer;
use Octamp\Wamp\Serializer\MessagePackSerializer;
use Octamp\Wamp\Serializer\WampMessageSerializerInterface;
use Octamp\Wamp\Session\Session;
use Octamp\Wamp\Session\SessionStorage;
use Octamp\Wamp\Transport\OctampTransport;
use Octamp\Wamp\Transport\OctampTransportProvider;
Expand All @@ -31,6 +32,7 @@
use OpenSwoole\WebSocket\Frame;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Thruway\Message\Message;

class Wamp
{
Expand Down Expand Up @@ -102,8 +104,15 @@ public function init(): void
$realm = $this->realmManager->createRealm('realm1', $router);
$realm->setConnection($connection);
$realm->getMetaSession();

$this->realmManager->addRealm($realm);

$this->adapter->subscribe('forward:message', function (string $serverId, string $transportId, string $data) use ($sessionStorage) {
if ($this->serverId === $serverId) {
$session = $sessionStorage->getSessionUsingTransportId($transportId);
$message = Message::createMessageFromArray(json_decode($data));
$this->realmManager->dispatch($session, $message);
}
});
});

$transportProvider->getServer()->on('handshake', function (Server $server, Request $request, Response $response) {
Expand Down

0 comments on commit 7df77f2

Please sign in to comment.