This repository has been archived by the owner on Jan 17, 2022. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 47
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(messenger): Add Symfony Messenger integration (#56)
Asynchronously dispatch messages using Symfony Messanger API and Swoole inter-process communication (Swoole task) without serialization. Notice: Symfony messenger `messenger:consume-messages` command is not supported. Dispatched messages are handled in task worker processes of Swoole server. Usage: 1. Install `symfony/messenger` package via composer 2. Enable task workers inside configuration Example: ```yaml # config/packages/swoole.yaml swoole: http_server: ... settings: ... task_worker_count: auto ``` 3. Configure swoole messenger transport Example: ```yaml # config/packages/messenger.yaml framework: messenger: transports: swoole: swoole://task routing: '*': swoole ``` 4. (optional) Follow official [symfony messenger guide](https://symfony.com/doc/current/messenger.html) to define message object and its handler Relates to #4
- Loading branch information
Showing
35 changed files
with
1,018 additions
and
61 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
15 changes: 15 additions & 0 deletions
15
src/Bridge/Symfony/Messenger/Exception/ReceiverNotAvailableException.php
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace K911\Swoole\Bridge\Symfony\Messenger\Exception; | ||
|
||
use Symfony\Component\Messenger\Exception\TransportException; | ||
|
||
final class ReceiverNotAvailableException extends TransportException | ||
{ | ||
public static function make(): self | ||
{ | ||
throw new self('Swoole Server Task transport does not implement Receiver interface methods. Messages sent via Swoole Server Task transport are dispatched inside task worker processes.'); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace K911\Swoole\Bridge\Symfony\Messenger; | ||
|
||
use K911\Swoole\Bridge\Symfony\Messenger\Exception\ReceiverNotAvailableException; | ||
use Symfony\Component\Messenger\Envelope; | ||
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; | ||
|
||
final class SwooleServerTaskReceiver implements ReceiverInterface | ||
{ | ||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function get(): iterable | ||
{ | ||
throw ReceiverNotAvailableException::make(); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function ack(Envelope $envelope): void | ||
{ | ||
throw ReceiverNotAvailableException::make(); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function reject(Envelope $envelope): void | ||
{ | ||
throw ReceiverNotAvailableException::make(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace K911\Swoole\Bridge\Symfony\Messenger; | ||
|
||
use K911\Swoole\Server\HttpServer; | ||
use Symfony\Component\Messenger\Envelope; | ||
use Symfony\Component\Messenger\Stamp\ReceivedStamp; | ||
use Symfony\Component\Messenger\Stamp\SentStamp; | ||
use Symfony\Component\Messenger\Transport\Sender\SenderInterface; | ||
|
||
final class SwooleServerTaskSender implements SenderInterface | ||
{ | ||
private $httpServer; | ||
|
||
public function __construct(HttpServer $httpServer) | ||
{ | ||
$this->httpServer = $httpServer; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function send(Envelope $envelope): Envelope | ||
{ | ||
/** @var SentStamp|null $sentStamp */ | ||
$sentStamp = $envelope->last(SentStamp::class); | ||
$alias = null === $sentStamp ? 'swoole-task' : $sentStamp->getSenderAlias() ?? $sentStamp->getSenderClass(); | ||
|
||
$this->httpServer->dispatchTask($envelope->with(new ReceivedStamp($alias))); | ||
|
||
return $envelope; | ||
} | ||
} |
Oops, something went wrong.