Skip to content

Commit

Permalink
feat(core): Add cancellation handling to built-in jobs
Browse files Browse the repository at this point in the history
Relates to #1127, relates to #2650. This commit handles the case of a job
being cancelled for those built-in jobs that can be long-running.
  • Loading branch information
michaelbromley committed Feb 2, 2024
1 parent cba069b commit c8022be
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { Inject, Injectable } from '@nestjs/common';
import { LanguageCode } from '@vendure/common/lib/generated-types';
import { JobState, LanguageCode } from '@vendure/common/lib/generated-types';
import { ID } from '@vendure/common/lib/shared-types';
import { unique } from '@vendure/common/lib/unique';
import { Observable } from 'rxjs';
import { In, IsNull } from 'typeorm';
import { FindOptionsUtils } from 'typeorm/find-options/FindOptionsUtils';

import { RequestContext } from '../../../api/common/request-context';
import { RequestContextCacheService } from '../../../cache/request-context-cache.service';
Expand All @@ -15,21 +14,22 @@ import { ConfigService } from '../../../config/config.service';
import { Logger } from '../../../config/logger/vendure-logger';
import { TransactionalConnection } from '../../../connection/transactional-connection';
import { FacetValue } from '../../../entity/facet-value/facet-value.entity';
import { Product } from '../../../entity/product/product.entity';
import { ProductVariant } from '../../../entity/product-variant/product-variant.entity';
import { Product } from '../../../entity/product/product.entity';
import { Job } from '../../../job-queue/index';
import { ProductPriceApplicator } from '../../../service/helpers/product-price-applicator/product-price-applicator';
import { ProductVariantService } from '../../../service/services/product-variant.service';
import { PLUGIN_INIT_OPTIONS } from '../constants';
import { SearchIndexItem } from '../entities/search-index-item.entity';
import {
DefaultSearchPluginInitOptions,
ProductChannelMessageData,
ReindexMessageData,
ReindexMessageResponse,
UpdateAssetMessageData,
UpdateIndexQueueJobData,
UpdateProductMessageData,
UpdateVariantMessageData,
UpdateVariantsByIdMessageData,
UpdateVariantsByIdJobData,
VariantChannelMessageData,
} from '../types';

Expand Down Expand Up @@ -62,7 +62,8 @@ export class IndexerController {
@Inject(PLUGIN_INIT_OPTIONS) private options: DefaultSearchPluginInitOptions,
) {}

reindex({ ctx: rawContext }: ReindexMessageData): Observable<ReindexMessageResponse> {
reindex(job: Job<UpdateIndexQueueJobData>): Observable<ReindexMessageResponse> {
const { ctx: rawContext } = job.data;
const ctx = MutableRequestContext.deserialize(rawContext);
return asyncObservable(async observer => {
const timeStart = Date.now();
Expand All @@ -77,8 +78,10 @@ export class IndexerController {
Logger.verbose('Deleted existing index items', workerLoggerCtx);

for (let i = 0; i < batches; i++) {
if (job.state === JobState.CANCELLED) {
throw new Error('reindex job was cancelled');
}
Logger.verbose(`Processing batch ${i + 1} of ${batches}`, workerLoggerCtx);

const variants = await qb
.take(BATCH_SIZE)
.skip(i * BATCH_SIZE)
Expand All @@ -100,10 +103,8 @@ export class IndexerController {
});
}

updateVariantsById({
ctx: rawContext,
ids,
}: UpdateVariantsByIdMessageData): Observable<ReindexMessageResponse> {
updateVariantsById(job: Job<UpdateVariantsByIdJobData>): Observable<ReindexMessageResponse> {
const { ctx: rawContext, ids } = job.data;
const ctx = MutableRequestContext.deserialize(rawContext);

return asyncObservable(async observer => {
Expand All @@ -113,6 +114,9 @@ export class IndexerController {
Logger.verbose(`Updating ${ids.length} variants...`);

for (let i = 0; i < batches; i++) {
if (job.state === JobState.CANCELLED) {
throw new Error('updateVariantsById job was cancelled');
}
const begin = i * BATCH_SIZE;
const end = begin + BATCH_SIZE;
Logger.verbose(`Updating ids from index ${begin} to ${end}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { ProductVariant } from '../../../entity/product-variant/product-variant.
import { Job } from '../../../job-queue/job';
import { JobQueue } from '../../../job-queue/job-queue';
import { JobQueueService } from '../../../job-queue/job-queue.service';
import { ReindexMessageResponse, UpdateIndexQueueJobData } from '../types';
import { ReindexMessageResponse, UpdateIndexQueueJobData, UpdateVariantsByIdJobData } from '../types';

import { IndexerController } from './indexer.controller';

Expand All @@ -32,7 +32,7 @@ export class SearchIndexService implements OnApplicationBootstrap {
switch (data.type) {
case 'reindex':
Logger.verbose('sending ReindexMessage');
return this.jobWithProgress(job, this.indexerController.reindex(data));
return this.jobWithProgress(job, this.indexerController.reindex(job));
case 'update-product':
return this.indexerController.updateProduct(data);
case 'update-variants':
Expand All @@ -42,7 +42,10 @@ export class SearchIndexService implements OnApplicationBootstrap {
case 'delete-variant':
return this.indexerController.deleteVariant(data);
case 'update-variants-by-id':
return this.jobWithProgress(job, this.indexerController.updateVariantsById(data));
return this.jobWithProgress(
job,
this.indexerController.updateVariantsById(job as Job<UpdateVariantsByIdJobData>),
);
case 'update-asset':
return this.indexerController.updateAsset(data);
case 'delete-asset':
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/service/services/collection.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
CreateCollectionInput,
DeletionResponse,
DeletionResult,
JobState,
MoveCollectionInput,
Permission,
PreviewCollectionVariantsInput,
Expand Down Expand Up @@ -114,6 +115,9 @@ export class CollectionService implements OnModuleInit {
Logger.verbose(`Processing ${job.data.collectionIds.length} Collections`);
let completed = 0;
for (const collectionId of job.data.collectionIds) {
if (job.state === JobState.CANCELLED) {
throw new Error(`Job was cancelled`);
}
let collection: Collection | undefined;
try {
collection = await this.connection.getEntityOrThrow(ctx, Collection, collectionId, {
Expand Down

0 comments on commit c8022be

Please sign in to comment.