Skip to content

Commit

Permalink
fix(coroutines): added global exclusive single coroutine access to ea…
Browse files Browse the repository at this point in the history
…ch container operation, so no deadlocks occurence is possible
  • Loading branch information
Rastusik committed Feb 6, 2023
1 parent 80d2c60 commit 6d0611b
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 41 deletions.
22 changes: 20 additions & 2 deletions src/Bridge/Doctrine/DoctrineProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,22 @@ public function process(ContainerBuilder $container, Proxifier $proxifier): void
throw new \UnexpectedValueException('Cannot obtain array of doctrine connections.');
}

$this->prepareConnectionsForProxification($container, $connectionSvcIds);

foreach ($entityManagers as $emName => $emSvcId) {
$emDef = $container->findDefinition($emSvcId);
$proxifier->proxifyService($emSvcId);
$tagParams = [];
$limit = $this->getLimitFromEntityManagerConnection($container, $emDef);

if (null !== $limit) {
$tagParams['limit'] = $limit;
}

$emDef->addTag(ContainerConstants::TAG_STATEFUL_SERVICE, $tagParams);
$this->overrideEmConfigurator($container, $emDef);
$this->decorateRepositoryFactory($container, $emName, $emSvcId);
}

$this->prepareConnectionsForProxification($container, $connectionSvcIds);
$this->fixDebugDataHolderResetter($container, $proxifier);
}

Expand Down Expand Up @@ -108,6 +116,16 @@ private function prepareConnectionsForProxification(ContainerBuilder $container,
}
}

private function getLimitFromEntityManagerConnection(ContainerBuilder $container, Definition $emDef): ?int
{
/** @vat Reference $connRef */
$connRef = $emDef->getArgument(0);
$connDef = $container->findDefinition((string) $connRef);
$statefulSvcTag = $connDef->getTag(ContainerConstants::TAG_STATEFUL_SERVICE);

return $statefulSvcTag && isset($statefulSvcTag[0]['limit']) ? $statefulSvcTag[0]['limit'] : null;
}

private function fixDebugDataHolderResetter(ContainerBuilder $container, Proxifier $proxifier): void
{
if (!$container->has('doctrine.debug_data_holder')) {
Expand Down
4 changes: 2 additions & 2 deletions src/Bridge/Symfony/Bundle/Resources/config/services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ services:
factory: ['K911\Swoole\Component\Locking\CoroutineLocking', 'init']

swoole_bundle.unmanaged_factory_first_time.locking:
class: K911\Swoole\Component\Locking\FirstTimeOnlyLocking
factory: [ 'K911\Swoole\Component\Locking\FirstTimeOnlyLocking', 'init' ]
class: K911\Swoole\Component\Locking\CoroutineLocking
factory: [ 'K911\Swoole\Component\Locking\CoroutineLocking', 'init' ]

K911\Swoole\Bridge\Symfony\Container\Proxy\Instantiator:
arguments:
Expand Down
15 changes: 10 additions & 5 deletions src/Bridge/Symfony/Container/BlockingContainer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,25 @@

namespace K911\Swoole\Bridge\Symfony\Container;

use K911\Swoole\Component\Locking\CoroutineLocking;
use K911\Swoole\Component\Locking\Locking;
use K911\Swoole\Component\Locking\ContainerLocking;
use Symfony\Component\DependencyInjection\Container;
use Symfony\Component\DependencyInjection\ParameterBag\ParameterBagInterface;

class BlockingContainer extends Container
{
protected static Locking $locking;
protected static ContainerLocking $locking;

protected static string $buildContainerNs = '';

public function __construct(ParameterBagInterface $parameterBag = null)
{
self::$locking = CoroutineLocking::init();
$locking = ContainerLocking::init();

if (!$locking instanceof ContainerLocking) {
throw new \UnexpectedValueException(sprintf('Invalid locking class: %s', get_class($locking)));
}

self::$locking = $locking;

parent::__construct($parameterBag);
}
Expand All @@ -27,7 +32,7 @@ public function __construct(ParameterBagInterface $parameterBag = null)
*/
public function get(string $id, int $invalidBehavior = self::EXCEPTION_ON_INVALID_REFERENCE): ?object
{
$lock = self::$locking->acquire($id);
$lock = self::$locking->acquireContainerLock();

try {
$service = parent::get($id, $invalidBehavior);
Expand Down
69 changes: 55 additions & 14 deletions src/Bridge/Symfony/Container/ContainerModifier.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ public static function overrideDoInExtension(string $containerDir, string $fileT
$refl->addMethod('doOverridden', $reflDo->getClosure());

$reflDo->redefine(function ($container, $lazyLoad = true) {
$lockName = get_called_class().'::DO_'.($lazyLoad ? 'lazy' : '');
$lock = self::$locking->acquire($lockName);
$lock = self::$locking->acquireContainerLock();

try {
$return = self::doOverridden($container, $lazyLoad);
Expand Down Expand Up @@ -109,7 +108,7 @@ private static function overrideCreateProxy(BlockingContainer $container, Reflec
$createProxyRefl = $reflContainer->getMethod('createProxy');
$reflContainer->addMethod('createProxyOverridden', $createProxyRefl->getClosure($container));
$createProxyRefl->redefine(function ($class, \Closure $factory) {
$lock = self::$locking->acquire($class);
$lock = self::$locking->acquireContainerLock();

try {
$return = $this->createProxyOverridden($class, $factory);
Expand Down Expand Up @@ -139,7 +138,7 @@ private static function overrideOriginalContainerLoad(BlockingContainer $contain
$loadRefl = $reflContainer->getMethod('load');
$reflContainer->addMethod('loadOverridden', $loadRefl->getClosure($container));
$loadRefl->redefine(function (string $file) {
$lock = self::$locking->acquire($file);
$lock = self::$locking->acquireContainerLock();

try {
$return = $this->loadOverridden($file);
Expand All @@ -156,7 +155,7 @@ private static function overrideGeneratedLoad(BlockingContainer $container, Refl
$loadRefl = $reflContainer->getMethod('load');
$reflContainer->addMethod('loadOverridden', $loadRefl->getClosure($container));
$loadRefl->redefine(function ($file, $lazyLoad = true) {
$lock = self::$locking->acquire($file);
$lock = self::$locking->acquireContainerLock();

try {
$fileToLoad = $file;
Expand Down Expand Up @@ -196,6 +195,7 @@ private static function overrideGeneratedContainer(ReflectionClass $reflContaine
}

$containerSource = file_get_contents($containerFile);
$codeExtractor = new ContainerSourceCodeExtractor($containerSource);
$overriddenSource = str_replace('class '.$containerClass, 'class '.$overriddenClass, $containerSource);

// dump opcache.blacklist_filename
Expand All @@ -216,7 +216,7 @@ private static function overrideGeneratedContainer(ReflectionClass $reflContaine
continue;
}

$methodsCodes[] = self::generateOverriddenGetter($method);
$methodsCodes[] = self::generateOverriddenGetter($method, $codeExtractor);
}

$namespace = $reflContainer->getNamespaceName();
Expand All @@ -228,6 +228,8 @@ private static function overrideGeneratedContainer(ReflectionClass $reflContaine
class $containerClass extends $overriddenClass
{
private \$lazyInitializedShared = [];
$methodsCode
}
EOF;
Expand All @@ -237,16 +239,37 @@ class $containerClass extends $overriddenClass
$fs->dumpFile($containerFile, $newContainerSource);
}

private static function generateOverriddenGetter(ReflectionMethod $method): string
private static function generateOverriddenGetter(ReflectionMethod $method, ContainerSourceCodeExtractor $extractor): ?string
{
$methodName = $method->getName();
$internals = $extractor->getContainerInternalsForMethod($method);

if (isset($internals['type']) && 'factories' === $internals['type']) {
$internals = [];
}

return $method->getNumberOfParameters() > 0 ?
self::generateLazyGetter($methodName) : self::generateCasualGetter($methodName);
self::generateLazyGetter($methodName, $internals) : self::generateCasualGetter($methodName, $internals);
}

private static function generateLazyGetter(string $methodName): string
private static function generateLazyGetter(string $methodName, array $internals): string
{
$sharedCheck = PHP_EOL;

if (!empty($internals)) {
$arrayKey = "['{$internals['key']}']".(isset($internals['key2']) ? "['{$internals['key2']}']" : '');
$sharedCheck = <<<EOF
if (isset(\$this->{$internals['type']}{$arrayKey})) {
if (\$lazyLoad) {
return \$this->{$internals['type']}{$arrayKey};
} elseif (\$this->{$internals['type']}{$arrayKey}->isProxyInitialized() && isset(\$this->lazyInitializedShared['$methodName'])) {
return \$this->lazyInitializedShared['$methodName'];
}
}
EOF;
}

return <<<EOF
protected function $methodName(\$lazyLoad = true) {
// this might be a weird SF container bug or idk... but SF container keeps calling this factory method
Expand All @@ -255,10 +278,14 @@ protected function $methodName(\$lazyLoad = true) {
\$lazyLoad = true;
}
\$lock = self::\$locking->acquire('$methodName'.'_'.(\$lazyLoad ? 'lazy' : ''));
{$sharedCheck}
try {
\$lock = self::\$locking->acquireContainerLock();
{$sharedCheck}
\$return = parent::{$methodName}(\$lazyLoad);
if (!\$lazyLoad) \$this->lazyInitializedShared['$methodName'] = \$return;
} finally {
\$lock->release();
}
Expand All @@ -268,13 +295,27 @@ protected function $methodName(\$lazyLoad = true) {
EOF;
}

private static function generateCasualGetter(string $methodName): string
private static function generateCasualGetter(string $methodName, array $internals): string
{
$sharedCheck = PHP_EOL;

if (!empty($internals)) {
$arrayKey = "['{$internals['key']}']".(isset($internals['key2']) ? "['{$internals['key2']}']" : '');
$sharedCheck = <<<EOF
if (isset(\$this->{$internals['type']}{$arrayKey})) {
return \$this->{$internals['type']}{$arrayKey};
}
EOF;
}

return <<<EOF
protected function $methodName() {
\$lock = self::\$locking->acquire('$methodName');
{$sharedCheck}
try {
\$lock = self::\$locking->acquireContainerLock();
{$sharedCheck}
\$return = parent::{$methodName}();
} finally {
\$lock->release();
Expand Down
41 changes: 41 additions & 0 deletions src/Bridge/Symfony/Container/ContainerSourceCodeExtractor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

declare(strict_types=1);

namespace K911\Swoole\Bridge\Symfony\Container;

use ZEngine\Reflection\ReflectionMethod;

final class ContainerSourceCodeExtractor
{
private array $sourceCode;

public function __construct(string $sourceCode)
{
$this->sourceCode = explode(PHP_EOL, $sourceCode);
}

public function getContainerInternalsForMethod(ReflectionMethod $method): array
{
$code = $this->getMethodCode($method);

if (!preg_match(
'/return \\$this->(?P<type>[a-z]+)\[\'(?P<key>[^\']+)\'\](\[\'(?P<key2>[^\']+)\'\])? \=/',
$code,
$matches
)) {
return [];
}

return $matches;
}

public function getMethodCode(ReflectionMethod $method): string
{
$startLine = $method->getStartLine() - 1; // it's actually - 1, otherwise you wont get the function() block
$endLine = $method->getEndLine();
$length = $endLine - $startLine;

return implode(PHP_EOL, array_slice($this->sourceCode, $startLine, $length));
}
}
5 changes: 0 additions & 5 deletions src/Bridge/Symfony/HttpKernel/CoroutineKernelPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace K911\Swoole\Bridge\Symfony\HttpKernel;

use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpKernel\KernelInterface;

final class CoroutineKernelPool implements KernelPoolInterface
Expand All @@ -24,10 +23,6 @@ public function __construct(KernelInterface $kernel)
public function boot(): void
{
$this->kernel->boot();
// this will boot the http kernel before the start of swoole web workers, which means that
// routers etc. will be initialized before getting into coroutine context
// without this there are concurrency problems while loading the application
$this->kernel->handle(new Request());
}

public function get(): KernelInterface
Expand Down
29 changes: 29 additions & 0 deletions src/Component/Locking/ContainerLocking.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

declare(strict_types=1);

namespace K911\Swoole\Component\Locking;

final class ContainerLocking extends CoroutineLocking
{
private const LOCK_KEY = 'EXCLUSIVE_CONTAINER_LOCK';

public function acquire(string $key): Lock
{
throw new \RuntimeException('This lock is not supposed to have variable lock keys.');
}

public function acquireContainerLock(): Lock
{
return parent::acquire(self::LOCK_KEY);
}

public static function init(?Locking $locking = null): Locking
{
if (null === $locking) {
$locking = new ContainerLocking();
}

return $locking;
}
}
12 changes: 5 additions & 7 deletions src/Component/Locking/CoroutineLocking.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

namespace K911\Swoole\Component\Locking;

final class CoroutineLocking implements Locking
class CoroutineLocking implements Locking
{
private Store $store;

private function __construct()
protected function __construct()
{
$this->store = new Store();
}
Expand All @@ -17,15 +17,13 @@ public function acquire(string $key): Lock
{
$cid = \Co::getCid();

// wait 0.01 ms if the container is already resolving the requested service
// wait 0.001 ms if the container is already resolving the requested service
// coroutine hook for usleep should switch context to other coroutine, while waiting
while ($this->store->has($key) && $this->store->get($key) !== $cid) {
usleep(10);
usleep(1);
}

$this->store->save($key, $cid);

return new CoroutineLock($key, $this->store);
return $this->store->save($key, $cid);
}

public static function init(?Locking $locking = null): Locking
Expand Down
8 changes: 5 additions & 3 deletions src/Component/Locking/FirstTimeOnlyLocking.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ private function __construct(Locking $wrapped)
public function acquire(string $key): Lock
{
if (!$this->store->has($key)) {
$this->store->save($key, FirstTimeOnlyLock::LOCKED);

return FirstTimeOnlyLock::locked($key, $this->store, $this->wrapped->acquire($key));
return $this->store->save(
$key,
FirstTimeOnlyLock::LOCKED,
FirstTimeOnlyLock::locked($key, $this->store, $this->wrapped->acquire($key))
);
}

if (FirstTimeOnlyLock::RELEASED === $this->store->get($key)) {
Expand Down
Loading

0 comments on commit 6d0611b

Please sign in to comment.