Skip to content

Commit

Permalink
feat(coroutines): added configurable doctrine connections limit per s…
Browse files Browse the repository at this point in the history
…woole process
  • Loading branch information
Rastusik committed Feb 6, 2023
1 parent 22e2e9c commit d5365d3
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 21 deletions.
8 changes: 8 additions & 0 deletions docs/configuration-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,17 @@ swoole:
compile_processors:
- class: ProcessorClass1
priority: 10
config: [] # all data will be propagated to the processor constructor, this attribute is not needed
- ProcessorClass2 # default priority
# register classes implementing the CompileProcessor interface
# check the section below about coroutines usage
# configuration options for doctrine processor - set instance limits for each connection type, or global limit
doctrine_processor_config:
# max connections in each swoole process for each doctrine connections, default is max_service_instances
global_limit: 10
limits:
# connection with name 'default' will have max 9 instances per swoole process, if not set, default is global_limit
default: 9
```
## Additional info for coroutines usage
Expand Down
61 changes: 56 additions & 5 deletions src/Bridge/Doctrine/DoctrineProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,27 @@

use K911\Swoole\Bridge\Symfony\Bundle\DependencyInjection\CompilerPass\StatefulServices\CompileProcessor;
use K911\Swoole\Bridge\Symfony\Bundle\DependencyInjection\CompilerPass\StatefulServices\Proxifier;
use K911\Swoole\Bridge\Symfony\Bundle\DependencyInjection\ContainerConstants;
use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Definition;
use Symfony\Component\DependencyInjection\Reference;

final class DoctrineProcessor implements CompileProcessor
{
/**
* @var array{global_limit?: int, limits?: array<string, int>}
*/
private array $config;

/**
* @param array{global_limit?: int, limits?: array<string, int>} $config
*/
public function __construct(array $config = [])
{
$this->config = $config;
}

public function process(ContainerBuilder $container, Proxifier $proxifier): void
{
/** @var array<string,string> $bundles */
Expand All @@ -28,15 +42,16 @@ public function process(ContainerBuilder $container, Proxifier $proxifier): void
throw new \UnexpectedValueException('Cannot obtain array of entity managers.');
}

$connectionSvcIds = [];
$connectionSvcIds = $container->getParameter('doctrine.connections');

if (!\is_array($connectionSvcIds)) {
throw new \UnexpectedValueException('Cannot obtain array of doctrine connections.');
}

foreach ($entityManagers as $emName => $emSvcId) {
$emDef = $container->findDefinition($emSvcId);
$proxifier->proxifyService($emSvcId);
$this->overrideEmConfigurator($container, $emDef);
$connRef = $emDef->getArgument(0);
$connSvcId = (string) $connRef;
$connectionSvcIds[$connSvcId] = $connSvcId;
$this->decorateRepositoryFactory($container, $emName, $emSvcId);
}

Expand All @@ -61,7 +76,21 @@ private function proxifyConnections(
Proxifier $proxifier,
array $connectionSvcIds
): void {
foreach ($connectionSvcIds as $connectionSvcId) {
foreach ($connectionSvcIds as $connectionName => $connectionSvcId) {
$limit = $this->getConnectionLimit($connectionName);

if (!$limit) {
$limit = $this->getGlobalConnectionLimit();
}

$connectionDef = $container->findDefinition($connectionSvcId);

if ($limit) {
$connectionDef->addTag(ContainerConstants::TAG_STATEFUL_SERVICE, ['limit' => $limit]);

continue;
}

$proxifier->proxifyService($connectionSvcId);
}
}
Expand Down Expand Up @@ -115,4 +144,26 @@ private function decorateRepositoryFactory(ContainerBuilder $container, string $
}
$configuratorDef->setMethodCalls($methodCalls);
}

private function getGlobalConnectionLimit(): ?int
{
if (!isset($this->config['global_limit'])) {
return null;
}

return $this->config['global_limit'];
}

private function getConnectionLimit(string $connectionName): ?int
{
if (!isset($this->config['limits'])) {
return null;
}

if (!isset($this->config['limits'][$connectionName])) {
return null;
}

return (int) $this->config['limits'][$connectionName];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ final class StatefulServicesPass implements CompilerPassInterface
* @var array<array{class: class-string<CompileProcessor>, priority: int}>
*/
private const COMPILE_PROCESSORS = [
[
DoctrineProcessor::class => [
'class' => DoctrineProcessor::class,
'priority' => 0,
],
[
MonologProcessor::class => [
'class' => MonologProcessor::class,
'priority' => 0,
],
Expand Down Expand Up @@ -73,36 +73,56 @@ public function process(ContainerBuilder $container): void

private function runCompileProcessors(ContainerBuilder $container, Proxifier $proxifier): void
{
/** @var array<array{class: class-string<CompileProcessor>, priority: int}> $compileProcessors */
$compileProcessors = $container->getParameter(ContainerConstants::PARAM_COROUTINES_COMPILE_PROCESSORS);

if (!is_array($compileProcessors)) {
throw new \UnexpectedValueException('Invalid compiler processors provided');
}

$compileProcessors = array_merge(self::COMPILE_PROCESSORS, $compileProcessors);
/** @var null|array<string, mixed> $doctrineConfig */
$doctrineConfig = $container->hasParameter(ContainerConstants::PARAM_COROUTINES_DOCTRINE_COMPILE_PROCESSOR_CONFIG) ?
$container->getParameter(ContainerConstants::PARAM_COROUTINES_DOCTRINE_COMPILE_PROCESSOR_CONFIG) : null;

$defaultProcessors = self::COMPILE_PROCESSORS;

if (null !== $doctrineConfig) {
$defaultProcessors[DoctrineProcessor::class]['config'] = $doctrineConfig;
}

/** @var array<array{class: class-string<CompileProcessor>, priority: int}> $compileProcessors */
$compileProcessors = array_merge(array_values($defaultProcessors), $compileProcessors);

/**
* @var callable(
* array<int, array<class-string<CompileProcessor>>>,
* array{class: class-string<CompileProcessor>, priority: int}
* ): array<int, array<class-string<CompileProcessor>>> $reducer
* array<int, array<array{class: class-string<CompileProcessor>, config?: array<string, mixed>}>>,
* array{class: class-string<CompileProcessor>, priority?: int, config?: array<string, mixed>}
* ): array<int, array<array{class: class-string<CompileProcessor>, config?: array<string, mixed>}>> $reducer
*/
$reducer = static function (array $processors, array $processorConfig): array {
$processors[$processorConfig['priority']][] = $processorConfig['class'];
$priority = $processorConfig['priority'] ?? 0;
$processors[$priority][] = $processorConfig;

return $processors;
};
/** @var array<int, array{class: class-string<CompileProcessor>, priority: int}> $compileProcessors */

$compileProcessors = array_reduce(
$compileProcessors,
$reducer,
[]
);
/**
* @var array<int, array{
* class: class-string<CompileProcessor>,
* priority?: int,
* config?: array<string, mixed>
* }> $compileProcessors
*/
$compileProcessors = array_merge(...array_reverse($compileProcessors));

foreach ($compileProcessors as $processorClass) {
foreach ($compileProcessors as $processorConfig) {
/** @var CompileProcessor $processor */
$processor = new $processorClass();
$processor = isset($processorConfig['config']) ?
new $processorConfig['class']($processorConfig['config']) : new $processorConfig['class']();
$processor->process($container, $proxifier);
}
}
Expand Down
23 changes: 23 additions & 0 deletions src/Bridge/Symfony/Bundle/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,29 @@ public function getConfigTreeBuilder(): TreeBuilder
->integerNode('priority')
->defaultValue(0)
->end()
->arrayNode('config')
->ignoreExtraKeys()
->variablePrototype()->end()
->beforeNormalization()
->castToArray()
->end()
->end()
->end()
->end()
->end()
->arrayNode('doctrine_processor_config')
->children()
->integerNode('global_limit')
->defaultNull()
->min(1)
->max(200)
->end()
->arrayNode('limits')
->ignoreExtraKeys()
->integerPrototype()->end()
->beforeNormalization()
->castToArray()
->end()
->end()
->end()
->end()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ interface ContainerConstants

public const PARAM_COROUTINES_COMPILE_PROCESSORS = 'swoole_bundle.coroutines_support.compile_processors';

public const PARAM_COROUTINES_DOCTRINE_COMPILE_PROCESSOR_CONFIG =
'swoole_bundle.coroutines_support.doctrine_compile_processor.config';

public const PARAM_CACHE_FOLDER = 'swoole_bundle';

public const TAG_STATEFUL_SERVICE = 'swoole_bundle.stateful_service';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ private function configurePlatform(array $config, ?int $maxConcurrency, Containe
->addTag(ContainerConstants::TAG_STABILITY_CHECKER)
;

if (isset($coroutineSettings['doctrine_processor_config'])) {
$container->setParameter(
ContainerConstants::PARAM_COROUTINES_DOCTRINE_COMPILE_PROCESSOR_CONFIG,
$coroutineSettings['doctrine_processor_config']
);
}

return $swooleSettings;
}

Expand Down
2 changes: 2 additions & 0 deletions tests/Feature/SwooleServerCoroutinesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public function testCoroutinesWithDebugOn(): void
$this->assertStringContainsString('Service2 limit is 10.', $response['body']);
$this->assertStringContainsString('TmpRepo was proxified.', $response['body']);
$this->assertStringContainsString('TmpRepo limit is 15.', $response['body']);
$this->assertStringContainsString('Connection limit is 12.', $response['body']);

if (false !== strpos($response['body'], 'Check was true')) {
++$trueChecks;
Expand Down Expand Up @@ -145,6 +146,7 @@ public function testCoroutinesWithDebugOff(): void
$this->assertStringContainsString('Service2 limit is 10.', $response['body']);
$this->assertStringContainsString('TmpRepo was proxified.', $response['body']);
$this->assertStringContainsString('TmpRepo limit is 15.', $response['body']);
$this->assertStringContainsString('Connection limit is 12.', $response['body']);

if (false !== strpos($response['body'], 'Check was true')) {
++$trueChecks;
Expand Down
20 changes: 16 additions & 4 deletions tests/Fixtures/Symfony/TestBundle/Controller/SleepController.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace K911\Swoole\Tests\Fixtures\Symfony\TestBundle\Controller;

use Doctrine\DBAL\Connection;
use K911\Swoole\Bridge\Symfony\Container\ServicePool\BaseServicePool;
use K911\Swoole\Tests\Fixtures\Symfony\TestBundle\Service\DefaultDummyService;
use K911\Swoole\Tests\Fixtures\Symfony\TestBundle\Service\DummyService;
Expand All @@ -27,18 +28,22 @@ final class SleepController

private DummyService $dummyService;

private Connection $connection;

public function __construct(
SleepingCounter $sleepingCounter,
SleepingCounterChecker $checker,
ShouldBeProxified $shouldBeProxified,
ShouldBeProxified2 $shouldBeProxified2,
DummyService $dummyService
DummyService $dummyService,
Connection $connection
) {
$this->sleepingCounter = $sleepingCounter;
$this->checker = $checker;
$this->shouldBeProxified = $shouldBeProxified;
$this->shouldBeProxified2 = $shouldBeProxified2;
$this->dummyService = $dummyService;
$this->connection = $connection;
}

/**
Expand Down Expand Up @@ -79,15 +84,22 @@ public function index()
$isProxified3 = $tmpRepo instanceof VirtualProxyInterface ? 'was' : 'WAS NOT';
$initializer2 = $tmpRepo->getProxyInitializer();
$rf2 = new \ReflectionFunction($initializer2);
$servicePool2 = $rf2->getStaticVariables()['servicePool'];
$limit2 = $limitProperty->getValue($servicePool2);
$connServicePool = $rf2->getStaticVariables()['servicePool'];
$limit2 = $limitProperty->getValue($connServicePool);

/** @phpstan-ignore-next-line */
$connInitializer = $this->connection->getProxyInitializer();
$rf3 = new \ReflectionFunction($connInitializer);
$connServicePool = $rf3->getStaticVariables()['servicePool'];
$connlimit = $limitProperty->getValue($connServicePool);

return new Response(
"<html><body>Sleep was fine. Count was {$counter}. Check was {$check}. "
."Checks: {$checks}. "
."Service {$isProxified} proxified. Service2 {$isProxified2} proxified. "
."Service2 limit is {$limit}. TmpRepo {$isProxified3} proxified. "
."TmpRepo limit is {$limit2}.</body></html>"
."TmpRepo limit is {$limit2}. "
."Connection limit is {$connlimit}.</body></html>"
);
}
}
5 changes: 4 additions & 1 deletion tests/Fixtures/Symfony/app/config/coroutines/swoole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ swoole:
coroutines:
enabled: true
max_concurrency: 30
# max_service_instances: 50
max_service_instances: 20
stateful_services:
- K911\Swoole\Tests\Fixtures\Symfony\TestBundle\Service\ShouldBeProxified
compile_processors:
- class: K911\Swoole\Tests\Fixtures\Symfony\TestBundle\DependencyInjection\CompilerPass\SleepingCounterCompileProcessor
priority: 10
doctrine_processor_config:
limits:
default: 12

services:
_defaults:
Expand Down
5 changes: 5 additions & 0 deletions tests/Fixtures/Symfony/app/config/services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ services:
exclude: '../../TestBundle/Controller/ReplacedContentTestController.php'
tags: ['controller.service_arguments']

K911\Swoole\Tests\Fixtures\Symfony\TestBundle\Controller\SleepController:
arguments:
$connection: '@doctrine.dbal.default_connection'
tags: ['controller.service_arguments']

Ramsey\Uuid\UuidFactoryInterface: '@Ramsey\Uuid\UuidFactory'

Ramsey\Uuid\UuidFactory:
Expand Down

0 comments on commit d5365d3

Please sign in to comment.