Skip to content

Commit

Permalink
Add retry on product model import
Browse files Browse the repository at this point in the history
  • Loading branch information
TheGrimmChester committed Nov 14, 2023
1 parent 9b0add0 commit c31b25e
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 24 deletions.
1 change: 0 additions & 1 deletion src/Processor/ProductGroup/ProductGroupProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ private function createGroupForCodeAndFamily(
$productGroup->setModel($code);
$productGroup->setFamily($family);
$productGroup->setFamilyVariant($familyVariant);
$this->entityManager->persist($productGroup);

return;
}
Expand Down
6 changes: 6 additions & 0 deletions src/Resources/config/services.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
parameters:
env(SYNOLIA_AKENEO_MAX_RETRY_COUNT): 30
env(SYNOLIA_AKENEO_RETRY_WAIT_TIME): 5000

services:
_defaults:
autowire: true
autoconfigure: true
public: false
bind:
$projectDir: '%kernel.project_dir%'
$maxRetryCount: '%env(int:SYNOLIA_AKENEO_MAX_RETRY_COUNT)%'
$retryWaitTime: '%env(int:SYNOLIA_AKENEO_RETRY_WAIT_TIME)%'

Synolia\SyliusAkeneoPlugin\:
resource: '../../*'
Expand Down
67 changes: 44 additions & 23 deletions src/Task/ProductModel/BatchProductModelTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Synolia\SyliusAkeneoPlugin\Task\ProductModel;

use Doctrine\DBAL\Exception;
use Doctrine\DBAL\Result;
use Doctrine\ORM\EntityManagerInterface;
use Doctrine\ORM\ORMInvalidArgumentException;
use Doctrine\Persistence\ManagerRegistry;
Expand Down Expand Up @@ -41,6 +40,9 @@ public function __construct(
private IsProductProcessableCheckerInterface $isProductProcessableChecker,
private ProductGroupProcessor $productGroupProcessor,
private ManagerRegistry $managerRegistry,
private int $maxRetryCount,
private int $retryWaitTime,
private int $retryCount = 0,
) {
parent::__construct($entityManager);
}
Expand All @@ -57,34 +59,39 @@ public function __invoke(PipelinePayloadInterface $payload): PipelinePayloadInte
$this->logger->notice(Messages::createOrUpdate($this->type));

$query = $this->getSelectStatement($payload);
/** @var Result $queryResult */
$queryResult = $query->executeQuery();

while ($results = $queryResult->fetchAll()) {
/** @var array $result */
foreach ($results as $result) {
/** @var array $resource */
$resource = json_decode($result['values'], true);

$this->handleProductGroup($resource);

try {
$this->dispatcher->dispatch(new BeforeProcessingProductEvent($resource));
$this->handleProductModel($result);

$this->entityManager->beginTransaction();
unset($resource);
$this->removeEntry($payload, (int) $result['id']);
} catch (ORMInvalidArgumentException $ORMInvalidArgumentException) {
if ($this->retryCount === $this->maxRetryCount) {
$this->retryCount = 0;
unset($resource);
$this->removeEntry($payload, (int) $result['id']);

if ($this->isProductProcessableChecker->check($resource)) {
$product = $this->process($resource);
$this->dispatcher->dispatch(new AfterProcessingProductEvent($resource, $product));
continue;
}

$this->entityManager->flush();
$this->entityManager->commit();
usleep($this->retryWaitTime);

unset($resource, $product);
$this->removeEntry($payload, (int) $result['id']);
$this->logger->error('Retrying import', [
'product' => $result,
'retry_count' => $this->retryCount,
'error' => $ORMInvalidArgumentException->getMessage(),
]);

return $this->__invoke($payload);
} catch (\Throwable $throwable) {
$this->entityManager->rollback();
$this->logger->warning($throwable->getMessage());
$this->logger->error($throwable->getMessage());
$this->removeEntry($payload, (int) $result['id']);
}
}
Expand All @@ -93,23 +100,37 @@ public function __invoke(PipelinePayloadInterface $payload): PipelinePayloadInte
return $payload;
}

private function handleProductModel(array $resource): void
{
$this->handleProductGroup($resource);
$this->dispatcher->dispatch(new BeforeProcessingProductEvent($resource));

if ($this->isProductProcessableChecker->check($resource)) {
$product = $this->process($resource);
$this->dispatcher->dispatch(new AfterProcessingProductEvent($resource, $product));
}

$this->entityManager->flush();
}

private function handleProductGroup(array $resource): void
{
try {
$this->entityManager->beginTransaction();

$this->productGroupProcessor->process($resource);

$this->entityManager->flush();
$this->entityManager->commit();
} catch (ORMInvalidArgumentException) {
if ($this->entityManager->getConnection()->isTransactionActive()) {
$this->entityManager->rollback();
}

$this->logger->info('Processed product group', [
'code' => $resource['parent'] ?? $resource['code'],
]);
} catch (ORMInvalidArgumentException $ORMInvalidArgumentException) {
if (!$this->entityManager->isOpen()) {
$this->logger->warning('Recreating entity manager');
$this->entityManager = $this->getNewEntityManager();
}

++$this->retryCount;

throw $ORMInvalidArgumentException;
}
}

Expand Down

0 comments on commit c31b25e

Please sign in to comment.