Skip to content
This repository has been archived by the owner on Jan 17, 2022. It is now read-only.

Commit

Permalink
feat(messenger): Add Symfony Messenger integration (#56)
Browse files Browse the repository at this point in the history
Asynchronously dispatch messages using Symfony Messanger API and Swoole inter-process communication (Swoole task) without serialization.

Notice: Symfony messenger `messenger:consume-messages` command is not supported. Dispatched messages are handled in task worker processes of Swoole server.

Usage:

1. Install `symfony/messenger` package via composer
2. Enable task workers inside configuration

    Example:

    ```yaml
    # config/packages/swoole.yaml
    swoole:
        http_server:
            ...
            settings:
                ...
                task_worker_count: auto
    ```
3. Configure swoole messenger transport

   Example:

   ```yaml
   # config/packages/messenger.yaml
   framework:
       messenger:
           transports:
               swoole: swoole://task
           routing:
               '*': swoole
   ```
4. (optional) Follow official [symfony messenger guide](https://symfony.com/doc/current/messenger.html) to define message object and its handler

Relates to #4
  • Loading branch information
k911 authored Jun 7, 2019
1 parent 427dd2b commit d136313
Show file tree
Hide file tree
Showing 35 changed files with 1,018 additions and 61 deletions.
27 changes: 14 additions & 13 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
}],
"require": {
"php": "^7.2",
"ext-swoole": "^4",
"ext-swoole": "^4.3.4",
"beberlei/assert": "^3.0",
"symfony/config": "^4.2",
"symfony/console": "^4.2",
"symfony/process": "^4.2",
"symfony/dependency-injection": "^4.2",
"symfony/http-foundation": "^4.2",
"symfony/http-kernel": "^4.2"
"symfony/config": "^4.3.1",
"symfony/console": "^4.3.1",
"symfony/process": "^4.3.1",
"symfony/dependency-injection": "^4.3.1",
"symfony/http-foundation": "^4.3.1",
"symfony/http-kernel": "^4.3.1"
},
"require-dev": {
"doctrine/annotations": "^1.6",
Expand All @@ -41,13 +41,14 @@
"phpunit/phpcov": "^6.0",
"phpunit/phpunit": "^8.0.4",
"swoole/ide-helper": "^4.3",
"symfony/debug": "^4.2",
"symfony/framework-bundle": "^4.2.4",
"symfony/monolog-bridge": "^4.2",
"symfony/debug": "^4.3.1",
"symfony/framework-bundle": "^4.3.1",
"symfony/messenger": "^4.3.1",
"symfony/monolog-bridge": "^4.3.1",
"symfony/monolog-bundle": "^3.3",
"symfony/twig-bundle": "^4.2",
"symfony/var-dumper": "^4.2",
"symfony/yaml": "^4.2"
"symfony/twig-bundle": "^4.3.1",
"symfony/var-dumper": "^4.3.1",
"symfony/yaml": "^4.3.1"
},
"suggest": {
"ext-inotify": "To enable HMR.",
Expand Down
82 changes: 78 additions & 4 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/Bridge/Symfony/Bundle/Command/ServerStatusCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,11 @@ private function showMetrics(SymfonyStyle $io, array $metrics): void
['Active connections', $server['connection_num'], '1'],
['Accepted connections', $server['accept_count'], '1'],
['Closed connections', $server['close_count'], '1'],
['All workers', $workers, '1'],
['Active workers', $activeWorkers, '1'],
['Idle workers', $idleWorkers, '1'],
['Running coroutines', $server['coroutine_num'], '1'],
['Tasks in queue', $server['tasking_num'], '1'],
]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ public function getConfigTreeBuilder(): TreeBuilder
->integerNode('reactor_count')
->min(1)
->end()
->scalarNode('task_worker_count')
->defaultNull()
->end()
->end()
->end() // settings
->end()
Expand Down
22 changes: 22 additions & 0 deletions src/Bridge/Symfony/Bundle/DependencyInjection/SwooleExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,19 @@
use K911\Swoole\Bridge\Symfony\HttpFoundation\RequestFactoryInterface;
use K911\Swoole\Bridge\Symfony\HttpFoundation\TrustAllProxiesRequestHandler;
use K911\Swoole\Bridge\Symfony\HttpKernel\DebugHttpKernelRequestHandler;
use K911\Swoole\Bridge\Symfony\Messenger\SwooleServerTaskTransportFactory;
use K911\Swoole\Bridge\Symfony\Messenger\SwooleServerTaskTransportHandler;
use K911\Swoole\Server\Config\Socket;
use K911\Swoole\Server\Config\Sockets;
use K911\Swoole\Server\Configurator\ConfiguratorInterface;
use K911\Swoole\Server\HttpServer;
use K911\Swoole\Server\HttpServerConfiguration;
use K911\Swoole\Server\RequestHandler\AdvancedStaticFilesServer;
use K911\Swoole\Server\RequestHandler\RequestHandlerInterface;
use K911\Swoole\Server\Runtime\BootableInterface;
use K911\Swoole\Server\Runtime\HMR\HotModuleReloaderInterface;
use K911\Swoole\Server\Runtime\HMR\InotifyHMR;
use K911\Swoole\Server\TaskHandler\TaskHandlerInterface;
use K911\Swoole\Server\WorkerHandler\HMRWorkerStartHandler;
use K911\Swoole\Server\WorkerHandler\WorkerStartHandlerInterface;
use Symfony\Component\Config\FileLocator;
Expand All @@ -28,6 +32,8 @@
use Symfony\Component\DependencyInjection\Loader\YamlFileLoader;
use Symfony\Component\DependencyInjection\Reference;
use Symfony\Component\HttpKernel\DependencyInjection\Extension;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;

final class SwooleExtension extends Extension implements PrependExtensionInterface
{
Expand Down Expand Up @@ -58,6 +64,10 @@ public function load(array $configs, ContainerBuilder $container): void
$config = $this->processConfiguration($configuration, $configs);

$this->registerHttpServer($config['http_server'], $container);

if (\interface_exists(TransportFactoryInterface::class)) {
$this->registerSwooleServerTransportConfiguration($container);
}
}

/**
Expand All @@ -78,6 +88,18 @@ private function registerHttpServer(array $config, ContainerBuilder $container):
$this->registerHttpServerConfiguration($config, $container);
}

private function registerSwooleServerTransportConfiguration(ContainerBuilder $container): void
{
$container->register(SwooleServerTaskTransportFactory::class)
->addTag('messenger.transport_factory')
->addArgument(new Reference(HttpServer::class));

$container->register(SwooleServerTaskTransportHandler::class)
->addArgument(new Reference(MessageBusInterface::class))
->addArgument(new Reference(SwooleServerTaskTransportHandler::class.'.inner'))
->setDecoratedService(TaskHandlerInterface::class, null, -10);
}

private function registerHttpServerConfiguration(array $config, ContainerBuilder $container): void
{
[
Expand Down
10 changes: 10 additions & 0 deletions src/Bridge/Symfony/Bundle/Resources/config/services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ services:
'K911\Swoole\Server\LifecycleHandler\ServerManagerStopHandlerInterface':
class: K911\Swoole\Server\LifecycleHandler\NoOpServerManagerStopHandler

'K911\Swoole\Server\TaskHandler\TaskHandlerInterface':
class: K911\Swoole\Server\TaskHandler\NoOpTaskHandler

'K911\Swoole\Server\TaskHandler\TaskFinishedHandlerInterface':
class: K911\Swoole\Server\TaskHandler\NoOpTaskFinishedHandler

'K911\Swoole\Server\Api\ApiServerClientFactory':

'K911\Swoole\Server\Api\ApiServerClient':
Expand Down Expand Up @@ -78,6 +84,10 @@ services:

'K911\Swoole\Server\Configurator\WithWorkerStartHandler':

'K911\Swoole\Server\Configurator\WithTaskHandler':

'K911\Swoole\Server\Configurator\WithTaskFinishedHandler':

'K911\Swoole\Server\Configurator\CallableChainConfiguratorFactory':

'K911\Swoole\Server\Api\WithApiServerConfiguration':
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace K911\Swoole\Bridge\Symfony\Messenger\Exception;

use Symfony\Component\Messenger\Exception\TransportException;

final class ReceiverNotAvailableException extends TransportException
{
public static function make(): self
{
throw new self('Swoole Server Task transport does not implement Receiver interface methods. Messages sent via Swoole Server Task transport are dispatched inside task worker processes.');
}
}
36 changes: 36 additions & 0 deletions src/Bridge/Symfony/Messenger/SwooleServerTaskReceiver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

declare(strict_types=1);

namespace K911\Swoole\Bridge\Symfony\Messenger;

use K911\Swoole\Bridge\Symfony\Messenger\Exception\ReceiverNotAvailableException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;

final class SwooleServerTaskReceiver implements ReceiverInterface
{
/**
* {@inheritdoc}
*/
public function get(): iterable
{
throw ReceiverNotAvailableException::make();
}

/**
* {@inheritdoc}
*/
public function ack(Envelope $envelope): void
{
throw ReceiverNotAvailableException::make();
}

/**
* {@inheritdoc}
*/
public function reject(Envelope $envelope): void
{
throw ReceiverNotAvailableException::make();
}
}
35 changes: 35 additions & 0 deletions src/Bridge/Symfony/Messenger/SwooleServerTaskSender.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

declare(strict_types=1);

namespace K911\Swoole\Bridge\Symfony\Messenger;

use K911\Swoole\Server\HttpServer;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;

final class SwooleServerTaskSender implements SenderInterface
{
private $httpServer;

public function __construct(HttpServer $httpServer)
{
$this->httpServer = $httpServer;
}

/**
* {@inheritdoc}
*/
public function send(Envelope $envelope): Envelope
{
/** @var SentStamp|null $sentStamp */
$sentStamp = $envelope->last(SentStamp::class);
$alias = null === $sentStamp ? 'swoole-task' : $sentStamp->getSenderAlias() ?? $sentStamp->getSenderClass();

$this->httpServer->dispatchTask($envelope->with(new ReceivedStamp($alias)));

return $envelope;
}
}
Loading

0 comments on commit d136313

Please sign in to comment.