Skip to content

Commit

Permalink
Merge pull request #175 from synolia/feature/add-retry
Browse files Browse the repository at this point in the history
Add retry on product model import
  • Loading branch information
TheGrimmChester authored Dec 26, 2023
2 parents 5e61e7a + da1b329 commit 5609a7e
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
use Sylius\Component\Resource\Repository\RepositoryInterface;
use Synolia\SyliusAkeneoPlugin\Builder\Attribute\ProductAttributeValueValueBuilder;
use Synolia\SyliusAkeneoPlugin\Component\Attribute\AttributeType\AssetAttributeType;
use Synolia\SyliusAkeneoPlugin\Exceptions\Attribute\MissingLocaleTranslationException;
use Synolia\SyliusAkeneoPlugin\Exceptions\Attribute\MissingLocaleTranslationOrScopeException;
use Synolia\SyliusAkeneoPlugin\Exceptions\Attribute\MissingScopeException;
use Synolia\SyliusAkeneoPlugin\Exceptions\Attribute\TranslationNotFoundException;
use Synolia\SyliusAkeneoPlugin\Provider\AkeneoAttributeDataProviderInterface;
use Synolia\SyliusAkeneoPlugin\Provider\SyliusAkeneoLocaleCodeProvider;
use Synolia\SyliusAkeneoPlugin\Transformer\AkeneoAttributeToSyliusAttributeTransformerInterface;
Expand Down Expand Up @@ -74,18 +78,34 @@ public function process(string $attributeCode, array $context = []): void
/** @var AttributeInterface $attribute */
$attribute = $this->productAttributeRepository->findOneBy(['code' => $transformedAttributeCode]);

foreach ($this->syliusAkeneoLocaleCodeProvider->getUsedLocalesOnBothPlatforms() as $syliusAkeneo) {
$this->setAttributeTranslation(
$context['model'],
$attribute,
$context['data'],
$syliusAkeneo,
$attributeCode,
$context['scope'],
);
foreach ($this->syliusAkeneoLocaleCodeProvider->getUsedLocalesOnBothPlatforms() as $syliusLocale) {
try {
$this->setAttributeTranslation(
$context['model'],
$attribute,
$context['data'],
$syliusLocale,
$attributeCode,
$context['scope'],
);
} catch (MissingLocaleTranslationException | MissingLocaleTranslationOrScopeException|MissingScopeException|TranslationNotFoundException $error) {
$this->logger->warning('Attribute translation error', [
'attribute_code' => $attributeCode,
'sylius_locale' => $syliusLocale,
'context' => $context,
'error' => $error->getMessage(),
'trace' => $error->getTraceAsString(),
]);
}
}
}

/**
* @throws MissingLocaleTranslationOrScopeException
* @throws MissingLocaleTranslationException
* @throws MissingScopeException
* @throws TranslationNotFoundException
*/
private function setAttributeTranslation(
ProductInterface $product,
AttributeInterface $attribute,
Expand Down
1 change: 0 additions & 1 deletion src/Processor/ProductGroup/ProductGroupProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ private function createGroupForCodeAndFamily(
$productGroup->setModel($code);
$productGroup->setFamily($family);
$productGroup->setFamilyVariant($familyVariant);
$this->entityManager->persist($productGroup);

return;
}
Expand Down
6 changes: 6 additions & 0 deletions src/Resources/config/services.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
parameters:
env(SYNOLIA_AKENEO_MAX_RETRY_COUNT): 3
env(SYNOLIA_AKENEO_RETRY_WAIT_TIME): 5000

services:
_defaults:
autowire: true
autoconfigure: true
public: false
bind:
$projectDir: '%kernel.project_dir%'
$maxRetryCount: '%env(int:SYNOLIA_AKENEO_MAX_RETRY_COUNT)%'
$retryWaitTime: '%env(int:SYNOLIA_AKENEO_RETRY_WAIT_TIME)%'

Synolia\SyliusAkeneoPlugin\:
resource: '../../*'
Expand Down
5 changes: 4 additions & 1 deletion src/Task/Product/ProcessProductsTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public function __invoke(PipelinePayloadInterface $payload): PipelinePayloadInte
$this->process($payload);
}

$this->processManager->waitForAllProcesses();
$this->processManager->startAll();

return $payload;
}
Expand All @@ -114,6 +114,9 @@ private function handleProducts(
int &$count = 0,
array &$ids = [],
): void {
$this->processManager->setInstantProcessing($payload->getProcessAsSoonAsPossible());
$this->processManager->setNumberOfParallelProcesses($payload->getMaxRunningProcessQueueSize());

while (
($page instanceof Page && $page->hasNextPage()) ||
($page instanceof Page && !$page->hasPreviousPage()) ||
Expand Down
89 changes: 58 additions & 31 deletions src/Task/ProductModel/BatchProductModelTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Synolia\SyliusAkeneoPlugin\Task\ProductModel;

use Doctrine\DBAL\Exception;
use Doctrine\DBAL\Result;
use Doctrine\ORM\EntityManagerInterface;
use Doctrine\ORM\ORMInvalidArgumentException;
use Doctrine\Persistence\ManagerRegistry;
Expand Down Expand Up @@ -41,6 +40,9 @@ public function __construct(
private IsProductProcessableCheckerInterface $isProductProcessableChecker,
private ProductGroupProcessor $productGroupProcessor,
private ManagerRegistry $managerRegistry,
private int $maxRetryCount,
private int $retryWaitTime,
private int $retryCount = 0,
) {
parent::__construct($entityManager);
}
Expand All @@ -52,64 +54,89 @@ public function __construct(
*/
public function __invoke(PipelinePayloadInterface $payload): PipelinePayloadInterface
{
if ($this->retryCount === $this->maxRetryCount) {
return $payload;
}

$this->logger->debug(self::class);
$this->type = $payload->getType();
$this->logger->notice(Messages::createOrUpdate($this->type));

$query = $this->getSelectStatement($payload);
/** @var Result $queryResult */
$queryResult = $query->executeQuery();

while ($results = $queryResult->fetchAllAssociative()) {
foreach ($results as $result) {
$isSuccess = false;

/** @var array $resource */
$resource = json_decode($result['values'], true);

$this->handleProductGroup($resource);

try {
$this->dispatcher->dispatch(new BeforeProcessingProductEvent($resource));

$this->entityManager->beginTransaction();

if ($this->isProductProcessableChecker->check($resource)) {
$product = $this->process($resource);
$this->dispatcher->dispatch(new AfterProcessingProductEvent($resource, $product));
do {
try {
$this->handleProductModel($resource);
$isSuccess = true;
} catch (ORMInvalidArgumentException $ormInvalidArgumentException) {
++$this->retryCount;
usleep($this->retryWaitTime);

$this->logger->error('Retrying import', [
'product' => $result,
'retry_count' => $this->retryCount,
'error' => $ormInvalidArgumentException->getMessage(),
]);

$this->entityManager = $this->getNewEntityManager();
} catch (\Throwable $throwable) {
++$this->retryCount;
usleep($this->retryWaitTime);

$this->logger->error('Error importing product', [
'message' => $throwable->getMessage(),
'trace' => $throwable->getTraceAsString(),
]);

$this->entityManager = $this->getNewEntityManager();
}
} while (false === $isSuccess && $this->retryCount < $this->maxRetryCount);

$this->entityManager->flush();
$this->entityManager->commit();

unset($resource, $product);
$this->removeEntry($payload, (int) $result['id']);
} catch (\Throwable $throwable) {
$this->entityManager->rollback();
$this->logger->warning($throwable->getMessage());
$this->removeEntry($payload, (int) $result['id']);
}
unset($resource);
$this->removeEntry($payload, (int) $result['id']);
$this->retryCount = 0;
}
}

return $payload;
}

private function handleProductModel(array $resource): void
{
$this->handleProductGroup($resource);
$this->dispatcher->dispatch(new BeforeProcessingProductEvent($resource));

if (!$this->isProductProcessableChecker->check($resource)) {
return;
}

$product = $this->process($resource);
$this->dispatcher->dispatch(new AfterProcessingProductEvent($resource, $product));
$this->entityManager->flush();
}

private function handleProductGroup(array $resource): void
{
try {
$this->entityManager->beginTransaction();

$this->productGroupProcessor->process($resource);

$this->entityManager->flush();
$this->entityManager->commit();
} catch (ORMInvalidArgumentException) {
if ($this->entityManager->getConnection()->isTransactionActive()) {
$this->entityManager->rollback();
}

} catch (ORMInvalidArgumentException $ormInvalidArgumentException) {
if (!$this->entityManager->isOpen()) {
$this->logger->warning('Recreating entity manager');
$this->entityManager = $this->getNewEntityManager();
}

++$this->retryCount;

throw $ormInvalidArgumentException;
}
}

Expand Down

0 comments on commit 5609a7e

Please sign in to comment.