Skip to content

Commit

Permalink
feat(coroutines): changed SF resetting mechanism so only needed servi…
Browse files Browse the repository at this point in the history
…ces get reset in first usage
  • Loading branch information
Rastusik committed Feb 6, 2023
1 parent c7b31ed commit 7a568c7
Show file tree
Hide file tree
Showing 25 changed files with 540 additions and 87 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
"phpunit/php-code-coverage": "^9.1.0",
"phpunit/phpcov": "^8.0",
"phpunit/phpunit": "^9.5",
"pixelfederation/doctrine-resettable-em-bundle": "^6.2",
"pixelfederation/doctrine-resettable-em-bundle": "dev-pinging_rework",
"pixelfederation/openswoole-blackfire": "^1.0",
"pixelfederation/z-engine": "~7.4|~8.0|~8.1",
"ramsey/uuid": "^4.1",
Expand Down
20 changes: 11 additions & 9 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 44 additions & 0 deletions docs/configuration-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,50 @@ in multiple coroutine contexts:
e.g. in runtime (the point is to proxify those services in runtime)
- `swoole_bundle.stability_checker` - use this tag to mark a service as a stability checker

### Resetters

This bundle overrides Symfony resetting mechanism (`kernel.reset` tag), because the original mechanism is not well suited
for concurrent request processing. The main problem is that Symfony service resetter resets each already initialised
service on each request, which would lock a service instance of each service for each coroutine serving a request.
But not all of the services may be needed to be available for the request. Such behaviour might produce unnecessary
slowdowns while handling requests, since there are srvice instance limits implemented into the bundle. That means
that requests that don't need specific services would need to wait for other requests (which may or may not need
the same service instances as the waiting/blocked request), until they release the specific service instance.

This behaviour is problematic, so this bundle activates the resetting mechanism on each service instance only when
it is used for the first time in the request (while assigning it for concrete coroutine in the service pool). When
the request does not need the specific service, it won't get assigned to the coroutine. This helps the app to serve requests
without unnecessary blocking, e.g. when there are liveliness probes in the app that do not touch any database,
there is no need to assign a database connection for them. Since database connections (and maybe some other services)
are expensive resources, they will be used on demand, not always.

By default, this bundle automatically extracts all the resetter methods from Symfony service resetter and assigns
them to the service pools for each resettable service. That means, there is no need to do anything special in the app.

Sometimes there are special cases that emerge for using Symfony with coroutines turned on, like pinging DBAL connections
before the first query on each request (because the connections may be already closed, btw this bundle already has
a solution for this, using connection pingers
from [pixelfederation/doctrine-resettable-em-bundle](https://github.com/pixelfederation/doctrine-resettable-em-bundle)).

For special cases like this, you can implement a custom service resetter. The resetter has to implement
the `K911\Swoole\Bridge\Symfony\Container\Resetter` interface and has to be registered in the SF container
as a service. After that, you can configure any stateful service to use the resetter just by adding the resetter
service id to the stateful service tag like this:

```yaml
services:
my_custom_resetter:
class: My\Custom\ResetterClass
some_stateful_service:
class: My\Stateful\ServiceClass
tags: [{ name: swoole_bundle.stateful_service, resetter: my_custom_resetter}]
some_unmanaged_factory:
class: My\Unmanaged\FactoryClass2
tags: [{ name: swoole_bundle.unmanaged_factory, resetter: my_custom_resetter}]
```

### Stability checkers

Stability checkers are services services which make run-time checks for paired stateful services. They check,
Expand Down
32 changes: 32 additions & 0 deletions src/Bridge/Doctrine/DBAL/ConnectionKeepAliveResetter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

declare(strict_types=1);

namespace K911\Swoole\Bridge\Doctrine\DBAL;
use Doctrine\DBAL\Connection;
use K911\Swoole\Bridge\Symfony\Container\Resetter;
use PixelFederation\DoctrineResettableEmBundle\DBAL\Connection\AliveKeeper;

final class ConnectionKeepAliveResetter implements Resetter
{
private AliveKeeper $aliveKeeper;

private string $connectionName;

public function __construct(AliveKeeper $aliveKeeper, string $connectionName)
{
$this->aliveKeeper = $aliveKeeper;
$this->connectionName = $connectionName;
}

public function reset(object $service): void
{
if (!$service instanceof Connection) {
throw new \UnexpectedValueException(
\sprintf('Unexpected class instance: %s ', \get_class($service))
);
}

$this->aliveKeeper->keepAlive($service, $this->connectionName);
}
}
46 changes: 37 additions & 9 deletions src/Bridge/Doctrine/DoctrineProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

namespace K911\Swoole\Bridge\Doctrine;

use K911\Swoole\Bridge\Doctrine\DBAL\ConnectionKeepAliveResetter;
use K911\Swoole\Bridge\Symfony\Bundle\DependencyInjection\CompilerPass\StatefulServices\CompileProcessor;
use K911\Swoole\Bridge\Symfony\Bundle\DependencyInjection\CompilerPass\StatefulServices\Proxifier;
use K911\Swoole\Bridge\Symfony\Bundle\DependencyInjection\ContainerConstants;
use PixelFederation\DoctrineResettableEmBundle\DBAL\Connection\PlatformAliveKeeper;
use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Definition;
Expand Down Expand Up @@ -55,7 +57,7 @@ public function process(ContainerBuilder $container, Proxifier $proxifier): void
$this->decorateRepositoryFactory($container, $emName, $emSvcId);
}

$this->proxifyConnections($container, $proxifier, $connectionSvcIds);
$this->prepareConnectionsForProxification($container, $connectionSvcIds);
$this->fixDebugDataHolderResetter($container, $proxifier);
}

Expand All @@ -71,11 +73,12 @@ private function overrideEmConfigurator(ContainerBuilder $container, Definition
$emDef->setConfigurator([new Reference($newConfiguratorDefSvcId), 'configure']);
}

private function proxifyConnections(
ContainerBuilder $container,
Proxifier $proxifier,
array $connectionSvcIds
): void {
private function prepareConnectionsForProxification(ContainerBuilder $container, array $connectionSvcIds): void
{
$dbalAliveKeeperDef = $container->findDefinition(PlatformAliveKeeper::class);
$aliveKeepers = $dbalAliveKeeperDef->getArgument(1);
$dbalAliveKeeperDef->setArgument(1, []);

foreach ($connectionSvcIds as $connectionName => $connectionSvcId) {
$limit = $this->getConnectionLimit($connectionName);

Expand All @@ -84,14 +87,21 @@ private function proxifyConnections(
}

$connectionDef = $container->findDefinition($connectionSvcId);
$tagParams = [];

if ($limit) {
$connectionDef->addTag(ContainerConstants::TAG_STATEFUL_SERVICE, ['limit' => $limit]);
$tagParams['limit'] = $limit;
}

continue;
if (isset($aliveKeepers[$connectionName])) {
$tagParams['resetter'] = $this->tryToCreateKeepAliveResetter(
$container,
$connectionName,
$aliveKeepers[$connectionName]
);
}

$proxifier->proxifyService($connectionSvcId);
$connectionDef->addTag(ContainerConstants::TAG_STATEFUL_SERVICE, $tagParams);
}
}

Expand Down Expand Up @@ -166,4 +176,22 @@ private function getConnectionLimit(string $connectionName): ?int

return (int) $this->config['limits'][$connectionName];
}

private function tryToCreateKeepAliveResetter(
ContainerBuilder $container,
string $connectionName,
Reference $aliveKeeperRef
): string {
$resetterSvcId = sprintf(
'swoole_bundle.coroutines_support.doctrine.connection_resetter.%s',
$connectionName
);
$resetterDef = new Definition();
$resetterDef->setClass(ConnectionKeepAliveResetter::class);
$resetterDef->setArgument(0, $aliveKeeperRef);
$resetterDef->setArgument(1, $connectionName);
$container->setDefinition($resetterSvcId, $resetterDef);

return $resetterSvcId;
}
}
4 changes: 4 additions & 0 deletions src/Bridge/Doctrine/ORM/EntityManagerStabilityChecker.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ public function isStable(object $service): bool
throw new \UnexpectedValueException(\sprintf('Invalid service - expected %s, got %s', EntityManager::class, \get_class($service)));
}

if ($service->isOpen()) {
$service->clear(); // clear the em when not used any more in context, so RAM can be freed early
}

return $service->isOpen();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ public function process(ContainerBuilder $container): void
* 6) coroutine 2 uses the not resetted service with state remembered from the other coroutine
*
* the instantiation on first reset is forced by using the RUNTIME_EXCEPTION_ON_INVALID_REFERENCE in service reference
*
* all this is only happening for resetters that are global and which have not been changed to service pool resetters
*/
private function makeResettableServicesActive(ContainerBuilder $container): void
{
$resetterCompilerPass = new ResettableServicePass();
$resetterCompilerPass->process($container);
$resetterDef = $container->findDefinition('services_resetter');

if ($resetterDef->hasTag('kernel.reset')) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use K911\Swoole\Bridge\Symfony\Bundle\DependencyInjection\ContainerConstants;
use K911\Swoole\Bridge\Symfony\Container\Proxy\Instantiator;
use K911\Swoole\Bridge\Symfony\Container\ServicePool\DiServicePool;
use K911\Swoole\Bridge\Symfony\Container\SimpleResetter;
use K911\Swoole\Bridge\Symfony\Container\StabilityChecker;
use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
use Symfony\Component\DependencyInjection\ContainerBuilder;
Expand Down Expand Up @@ -48,7 +49,7 @@ public function __construct(
$this->stabilityCheckers = array_merge(self::DEFAULT_STABILITY_CHECKERS, $stabilityCheckers);
}

public function proxifyService(string $serviceId): void
public function proxifyService(string $serviceId, ?string $externalResetter = null): void
{
if (!$this->container->has($serviceId)) {
throw new \RuntimeException(sprintf('Service missing: %s', $serviceId));
Expand All @@ -64,12 +65,12 @@ public function proxifyService(string $serviceId): void
}

if (!$tags->hasDecoratedStatefulServiceTag()) {
$this->doProxifyService($serviceId, $serviceDef);
$this->doProxifyService($serviceId, $serviceDef, $externalResetter);

return;
}

$this->doProxifyDecoratedService($serviceId, $serviceDef);
$this->doProxifyDecoratedService($serviceId, $serviceDef, $externalResetter);
}

/**
Expand All @@ -80,14 +81,14 @@ public function getProxifiedServicePoolRefs(): array
return $this->proxifiedServicePoolRefs;
}

private function doProxifyService(string $serviceId, Definition $serviceDef): void
private function doProxifyService(string $serviceId, Definition $serviceDef, ?string $externalResetter = null): void
{
if (!$this->container->has($serviceId)) {
throw new \RuntimeException(sprintf('Service missing: %s', $serviceId));
}

$wrappedServiceId = sprintf('%s.swoole_coop.wrapped', $serviceId);
$svcPoolDef = $this->prepareServicePool($wrappedServiceId, $serviceDef);
$svcPoolDef = $this->prepareServicePool($wrappedServiceId, $serviceDef, $externalResetter);
$svcPoolServiceId = sprintf('%s.swoole_coop.service_pool', $serviceId);
$proxyDef = $this->prepareProxy($svcPoolServiceId, $serviceDef);
$this->prepareProxifiedService($serviceDef);
Expand All @@ -100,7 +101,7 @@ private function doProxifyService(string $serviceId, Definition $serviceDef): vo
$this->proxifiedServicePoolRefs[] = new Reference($svcPoolServiceId);
}

private function doProxifyDecoratedService(string $serviceId, Definition $serviceDef): void
private function doProxifyDecoratedService(string $serviceId, Definition $serviceDef, ?string $externalResetter = null): void
{
if (null === $serviceDef->innerServiceId) {
throw new \UnexpectedValueException(sprintf('Inner service id missing for service %s', $serviceId));
Expand All @@ -112,7 +113,7 @@ private function doProxifyDecoratedService(string $serviceId, Definition $servic
$decoratedServiceDef = $this->container->findDefinition($decoratedServiceId);

if ($this->isProxyfiable($decoratedServiceId, $decoratedServiceDef)) {
$this->doProxifyService($decoratedServiceId, $decoratedServiceDef);
$this->doProxifyService($decoratedServiceId, $decoratedServiceDef, $externalResetter);

return;
}
Expand All @@ -128,8 +129,11 @@ private function prepareProxifiedService(Definition $serviceDef): void
$serviceDef->setShared(false);
}

private function prepareServicePool(string $wrappedServiceId, Definition $serviceDef): Definition
{
private function prepareServicePool(
string $wrappedServiceId,
Definition $serviceDef,
?string $externalResetter = null
): Definition {
$svcPoolDef = new Definition(DiServicePool::class);
$svcPoolDef->setShared(true);
$svcPoolDef->setArgument(0, $wrappedServiceId);
Expand All @@ -140,20 +144,39 @@ private function prepareServicePool(string $wrappedServiceId, Definition $servic
$serviceClass = $serviceDef->getClass();
$serviceTags = new Tags($serviceClass, $serviceDef->getTags());
$serviceTag = $serviceTags->findStatefulServiceTag();
$customResetter = null;

if (null !== $serviceTag && null !== $serviceTag->getLimit()) {
$instanceLimit = $serviceTag->getLimit();
$customResetter = $serviceTag->getResetter();
}

$svcPoolDef->setArgument(3, $instanceLimit);
$svcPoolDef->setArgument(4, null);

$resetterDefOrRef = null;

if (null !== $customResetter) {
$resetterDefOrRef = new Reference($customResetter);
}

if (null === $resetterDefOrRef && null !== $externalResetter) {
$resetterDefOrRef = new Definition();
$resetterDefOrRef->setClass(SimpleResetter::class);
$resetterDefOrRef->setArgument(0, $externalResetter);
}

if ($resetterDefOrRef) {
$svcPoolDef->setArgument(4, $resetterDefOrRef);
}

if (!isset($this->stabilityCheckers[$serviceClass])) {
return $svcPoolDef;
}

$checkerSvcId = $this->stabilityCheckers[$serviceClass];
$this->container->findDefinition($checkerSvcId);
$svcPoolDef->setArgument(4, new Reference($checkerSvcId));
$svcPoolDef->setArgument(5, new Reference($checkerSvcId));

return $svcPoolDef;
}
Expand Down
Loading

0 comments on commit 7a568c7

Please sign in to comment.