Skip to content
This repository has been archived by the owner on Jan 17, 2022. It is now read-only.

feat(messenger): Add Symfony Messenger integration #56

Merged
merged 1 commit into from
Jun 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
}],
"require": {
"php": "^7.2",
"ext-swoole": "^4",
"ext-swoole": "^4.3.4",
"beberlei/assert": "^3.0",
"symfony/config": "^4.2",
"symfony/console": "^4.2",
"symfony/process": "^4.2",
"symfony/dependency-injection": "^4.2",
"symfony/http-foundation": "^4.2",
"symfony/http-kernel": "^4.2"
"symfony/config": "^4.3.1",
"symfony/console": "^4.3.1",
"symfony/process": "^4.3.1",
"symfony/dependency-injection": "^4.3.1",
"symfony/http-foundation": "^4.3.1",
"symfony/http-kernel": "^4.3.1"
},
"require-dev": {
"doctrine/annotations": "^1.6",
Expand All @@ -41,13 +41,14 @@
"phpunit/phpcov": "^6.0",
"phpunit/phpunit": "^8.0.4",
"swoole/ide-helper": "^4.3",
"symfony/debug": "^4.2",
"symfony/framework-bundle": "^4.2.4",
"symfony/monolog-bridge": "^4.2",
"symfony/debug": "^4.3.1",
"symfony/framework-bundle": "^4.3.1",
"symfony/messenger": "^4.3.1",
"symfony/monolog-bridge": "^4.3.1",
"symfony/monolog-bundle": "^3.3",
"symfony/twig-bundle": "^4.2",
"symfony/var-dumper": "^4.2",
"symfony/yaml": "^4.2"
"symfony/twig-bundle": "^4.3.1",
"symfony/var-dumper": "^4.3.1",
"symfony/yaml": "^4.3.1"
},
"suggest": {
"ext-inotify": "To enable HMR.",
Expand Down
82 changes: 78 additions & 4 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/Bridge/Symfony/Bundle/Command/ServerStatusCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,11 @@ private function showMetrics(SymfonyStyle $io, array $metrics): void
['Active connections', $server['connection_num'], '1'],
['Accepted connections', $server['accept_count'], '1'],
['Closed connections', $server['close_count'], '1'],
['All workers', $workers, '1'],
['Active workers', $activeWorkers, '1'],
['Idle workers', $idleWorkers, '1'],
['Running coroutines', $server['coroutine_num'], '1'],
['Tasks in queue', $server['tasking_num'], '1'],
]);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ public function getConfigTreeBuilder(): TreeBuilder
->integerNode('reactor_count')
->min(1)
->end()
->scalarNode('task_worker_count')
->defaultNull()
->end()
->end()
->end() // settings
->end()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,19 @@
use K911\Swoole\Bridge\Symfony\HttpFoundation\RequestFactoryInterface;
use K911\Swoole\Bridge\Symfony\HttpFoundation\TrustAllProxiesRequestHandler;
use K911\Swoole\Bridge\Symfony\HttpKernel\DebugHttpKernelRequestHandler;
use K911\Swoole\Bridge\Symfony\Messenger\SwooleServerTaskTransportFactory;
use K911\Swoole\Bridge\Symfony\Messenger\SwooleServerTaskTransportHandler;
use K911\Swoole\Server\Config\Socket;
use K911\Swoole\Server\Config\Sockets;
use K911\Swoole\Server\Configurator\ConfiguratorInterface;
use K911\Swoole\Server\HttpServer;
use K911\Swoole\Server\HttpServerConfiguration;
use K911\Swoole\Server\RequestHandler\AdvancedStaticFilesServer;
use K911\Swoole\Server\RequestHandler\RequestHandlerInterface;
use K911\Swoole\Server\Runtime\BootableInterface;
use K911\Swoole\Server\Runtime\HMR\HotModuleReloaderInterface;
use K911\Swoole\Server\Runtime\HMR\InotifyHMR;
use K911\Swoole\Server\TaskHandler\TaskHandlerInterface;
use K911\Swoole\Server\WorkerHandler\HMRWorkerStartHandler;
use K911\Swoole\Server\WorkerHandler\WorkerStartHandlerInterface;
use Symfony\Component\Config\FileLocator;
Expand All @@ -28,6 +32,8 @@
use Symfony\Component\DependencyInjection\Loader\YamlFileLoader;
use Symfony\Component\DependencyInjection\Reference;
use Symfony\Component\HttpKernel\DependencyInjection\Extension;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;

final class SwooleExtension extends Extension implements PrependExtensionInterface
{
Expand Down Expand Up @@ -58,6 +64,10 @@ public function load(array $configs, ContainerBuilder $container): void
$config = $this->processConfiguration($configuration, $configs);

$this->registerHttpServer($config['http_server'], $container);

if (\interface_exists(TransportFactoryInterface::class)) {
$this->registerSwooleServerTransportConfiguration($container);
}
}

/**
Expand All @@ -78,6 +88,18 @@ private function registerHttpServer(array $config, ContainerBuilder $container):
$this->registerHttpServerConfiguration($config, $container);
}

private function registerSwooleServerTransportConfiguration(ContainerBuilder $container): void
{
$container->register(SwooleServerTaskTransportFactory::class)
->addTag('messenger.transport_factory')
->addArgument(new Reference(HttpServer::class));

$container->register(SwooleServerTaskTransportHandler::class)
->addArgument(new Reference(MessageBusInterface::class))
->addArgument(new Reference(SwooleServerTaskTransportHandler::class.'.inner'))
->setDecoratedService(TaskHandlerInterface::class, null, -10);
}

private function registerHttpServerConfiguration(array $config, ContainerBuilder $container): void
{
[
Expand Down
10 changes: 10 additions & 0 deletions src/Bridge/Symfony/Bundle/Resources/config/services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ services:
'K911\Swoole\Server\LifecycleHandler\ServerManagerStopHandlerInterface':
class: K911\Swoole\Server\LifecycleHandler\NoOpServerManagerStopHandler

'K911\Swoole\Server\TaskHandler\TaskHandlerInterface':
class: K911\Swoole\Server\TaskHandler\NoOpTaskHandler

'K911\Swoole\Server\TaskHandler\TaskFinishedHandlerInterface':
class: K911\Swoole\Server\TaskHandler\NoOpTaskFinishedHandler

'K911\Swoole\Server\Api\ApiServerClientFactory':

'K911\Swoole\Server\Api\ApiServerClient':
Expand Down Expand Up @@ -78,6 +84,10 @@ services:

'K911\Swoole\Server\Configurator\WithWorkerStartHandler':

'K911\Swoole\Server\Configurator\WithTaskHandler':

'K911\Swoole\Server\Configurator\WithTaskFinishedHandler':

'K911\Swoole\Server\Configurator\CallableChainConfiguratorFactory':

'K911\Swoole\Server\Api\WithApiServerConfiguration':
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?php

declare(strict_types=1);

namespace K911\Swoole\Bridge\Symfony\Messenger\Exception;

use Symfony\Component\Messenger\Exception\TransportException;

final class ReceiverNotAvailableException extends TransportException
{
public static function make(): self
{
throw new self('Swoole Server Task transport does not implement Receiver interface methods. Messages sent via Swoole Server Task transport are dispatched inside task worker processes.');
}
}
36 changes: 36 additions & 0 deletions src/Bridge/Symfony/Messenger/SwooleServerTaskReceiver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

declare(strict_types=1);

namespace K911\Swoole\Bridge\Symfony\Messenger;

use K911\Swoole\Bridge\Symfony\Messenger\Exception\ReceiverNotAvailableException;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;

final class SwooleServerTaskReceiver implements ReceiverInterface
{
/**
* {@inheritdoc}
*/
public function get(): iterable
{
throw ReceiverNotAvailableException::make();
}

/**
* {@inheritdoc}
*/
public function ack(Envelope $envelope): void
{
throw ReceiverNotAvailableException::make();
}

/**
* {@inheritdoc}
*/
public function reject(Envelope $envelope): void
{
throw ReceiverNotAvailableException::make();
}
}
35 changes: 35 additions & 0 deletions src/Bridge/Symfony/Messenger/SwooleServerTaskSender.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

declare(strict_types=1);

namespace K911\Swoole\Bridge\Symfony\Messenger;

use K911\Swoole\Server\HttpServer;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;

final class SwooleServerTaskSender implements SenderInterface
{
private $httpServer;

public function __construct(HttpServer $httpServer)
{
$this->httpServer = $httpServer;
}

/**
* {@inheritdoc}
*/
public function send(Envelope $envelope): Envelope
{
/** @var SentStamp|null $sentStamp */
$sentStamp = $envelope->last(SentStamp::class);
$alias = null === $sentStamp ? 'swoole-task' : $sentStamp->getSenderAlias() ?? $sentStamp->getSenderClass();

$this->httpServer->dispatchTask($envelope->with(new ReceivedStamp($alias)));

return $envelope;
}
}
Loading