From c8022beb11b573d730ea5f16f47a21f5a6f38af6 Mon Sep 17 00:00:00 2001 From: Michael Bromley Date: Fri, 2 Feb 2024 16:04:56 +0100 Subject: [PATCH] feat(core): Add cancellation handling to built-in jobs Relates to #1127, relates to #2650. This commit handles the case of a job being cancelled for those built-in jobs that can be long-running. --- .../indexer/indexer.controller.ts | 26 +++++++++++-------- .../indexer/search-index.service.ts | 9 ++++--- .../service/services/collection.service.ts | 4 +++ 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/packages/core/src/plugin/default-search-plugin/indexer/indexer.controller.ts b/packages/core/src/plugin/default-search-plugin/indexer/indexer.controller.ts index 01882ce196..dc004d9b7a 100644 --- a/packages/core/src/plugin/default-search-plugin/indexer/indexer.controller.ts +++ b/packages/core/src/plugin/default-search-plugin/indexer/indexer.controller.ts @@ -1,10 +1,9 @@ import { Inject, Injectable } from '@nestjs/common'; -import { LanguageCode } from '@vendure/common/lib/generated-types'; +import { JobState, LanguageCode } from '@vendure/common/lib/generated-types'; import { ID } from '@vendure/common/lib/shared-types'; import { unique } from '@vendure/common/lib/unique'; import { Observable } from 'rxjs'; import { In, IsNull } from 'typeorm'; -import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils'; import { RequestContext } from '../../../api/common/request-context'; import { RequestContextCacheService } from '../../../cache/request-context-cache.service'; @@ -15,8 +14,9 @@ import { ConfigService } from '../../../config/config.service'; import { Logger } from '../../../config/logger/vendure-logger'; import { TransactionalConnection } from '../../../connection/transactional-connection'; import { FacetValue } from '../../../entity/facet-value/facet-value.entity'; -import { Product } from '../../../entity/product/product.entity'; import { ProductVariant } from '../../../entity/product-variant/product-variant.entity'; +import { Product } from '../../../entity/product/product.entity'; +import { Job } from '../../../job-queue/index'; import { ProductPriceApplicator } from '../../../service/helpers/product-price-applicator/product-price-applicator'; import { ProductVariantService } from '../../../service/services/product-variant.service'; import { PLUGIN_INIT_OPTIONS } from '../constants'; @@ -24,12 +24,12 @@ import { SearchIndexItem } from '../entities/search-index-item.entity'; import { DefaultSearchPluginInitOptions, ProductChannelMessageData, - ReindexMessageData, ReindexMessageResponse, UpdateAssetMessageData, + UpdateIndexQueueJobData, UpdateProductMessageData, UpdateVariantMessageData, - UpdateVariantsByIdMessageData, + UpdateVariantsByIdJobData, VariantChannelMessageData, } from '../types'; @@ -62,7 +62,8 @@ export class IndexerController { @Inject(PLUGIN_INIT_OPTIONS) private options: DefaultSearchPluginInitOptions, ) {} - reindex({ ctx: rawContext }: ReindexMessageData): Observable { + reindex(job: Job): Observable { + const { ctx: rawContext } = job.data; const ctx = MutableRequestContext.deserialize(rawContext); return asyncObservable(async observer => { const timeStart = Date.now(); @@ -77,8 +78,10 @@ export class IndexerController { Logger.verbose('Deleted existing index items', workerLoggerCtx); for (let i = 0; i < batches; i++) { + if (job.state === JobState.CANCELLED) { + throw new Error('reindex job was cancelled'); + } Logger.verbose(`Processing batch ${i + 1} of ${batches}`, workerLoggerCtx); - const variants = await qb .take(BATCH_SIZE) .skip(i * BATCH_SIZE) @@ -100,10 +103,8 @@ export class IndexerController { }); } - updateVariantsById({ - ctx: rawContext, - ids, - }: UpdateVariantsByIdMessageData): Observable { + updateVariantsById(job: Job): Observable { + const { ctx: rawContext, ids } = job.data; const ctx = MutableRequestContext.deserialize(rawContext); return asyncObservable(async observer => { @@ -113,6 +114,9 @@ export class IndexerController { Logger.verbose(`Updating ${ids.length} variants...`); for (let i = 0; i < batches; i++) { + if (job.state === JobState.CANCELLED) { + throw new Error('updateVariantsById job was cancelled'); + } const begin = i * BATCH_SIZE; const end = begin + BATCH_SIZE; Logger.verbose(`Updating ids from index ${begin} to ${end}`); diff --git a/packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts b/packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts index f5e3859c04..06f80cde20 100644 --- a/packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts +++ b/packages/core/src/plugin/default-search-plugin/indexer/search-index.service.ts @@ -11,7 +11,7 @@ import { ProductVariant } from '../../../entity/product-variant/product-variant. import { Job } from '../../../job-queue/job'; import { JobQueue } from '../../../job-queue/job-queue'; import { JobQueueService } from '../../../job-queue/job-queue.service'; -import { ReindexMessageResponse, UpdateIndexQueueJobData } from '../types'; +import { ReindexMessageResponse, UpdateIndexQueueJobData, UpdateVariantsByIdJobData } from '../types'; import { IndexerController } from './indexer.controller'; @@ -32,7 +32,7 @@ export class SearchIndexService implements OnApplicationBootstrap { switch (data.type) { case 'reindex': Logger.verbose('sending ReindexMessage'); - return this.jobWithProgress(job, this.indexerController.reindex(data)); + return this.jobWithProgress(job, this.indexerController.reindex(job)); case 'update-product': return this.indexerController.updateProduct(data); case 'update-variants': @@ -42,7 +42,10 @@ export class SearchIndexService implements OnApplicationBootstrap { case 'delete-variant': return this.indexerController.deleteVariant(data); case 'update-variants-by-id': - return this.jobWithProgress(job, this.indexerController.updateVariantsById(data)); + return this.jobWithProgress( + job, + this.indexerController.updateVariantsById(job as Job), + ); case 'update-asset': return this.indexerController.updateAsset(data); case 'delete-asset': diff --git a/packages/core/src/service/services/collection.service.ts b/packages/core/src/service/services/collection.service.ts index 3c9c153423..59af590d41 100644 --- a/packages/core/src/service/services/collection.service.ts +++ b/packages/core/src/service/services/collection.service.ts @@ -6,6 +6,7 @@ import { CreateCollectionInput, DeletionResponse, DeletionResult, + JobState, MoveCollectionInput, Permission, PreviewCollectionVariantsInput, @@ -114,6 +115,9 @@ export class CollectionService implements OnModuleInit { Logger.verbose(`Processing ${job.data.collectionIds.length} Collections`); let completed = 0; for (const collectionId of job.data.collectionIds) { + if (job.state === JobState.CANCELLED) { + throw new Error(`Job was cancelled`); + } let collection: Collection | undefined; try { collection = await this.connection.getEntityOrThrow(ctx, Collection, collectionId, {