From 7590ceae7abe32a8824e4a265f95fef2f9a6665f Mon Sep 17 00:00:00 2001 From: Stanislav Mamontov <46541650+stansv@users.noreply.github.com> Date: Wed, 21 Aug 2019 12:15:41 +0300 Subject: [PATCH] feat: get rid of Job3 in favor of bullmq Job class --- src/classes/compat.ts | 907 +---------- src/test/compat/test_events.ts | 202 --- src/test/compat/test_job.ts | 953 ----------- src/test/compat/test_queue.ts | 2713 -------------------------------- src/test/compat/utils.ts | 56 - 5 files changed, 52 insertions(+), 4779 deletions(-) delete mode 100644 src/test/compat/test_events.ts delete mode 100644 src/test/compat/test_job.ts delete mode 100644 src/test/compat/test_queue.ts delete mode 100644 src/test/compat/utils.ts diff --git a/src/classes/compat.ts b/src/classes/compat.ts index a8c47dd69d..84500e0421 100644 --- a/src/classes/compat.ts +++ b/src/classes/compat.ts @@ -36,7 +36,6 @@ import { WorkerOptions, Processor, } from '@src/interfaces'; -import { JobJson } from '@src/classes'; import _ from 'lodash'; import url from 'url'; @@ -418,16 +417,16 @@ export class Queue3 extends EventEmitter { * If the queue is empty the job will be executed directly, * otherwise it will be placed in the queue and executed as soon as possible. */ - add(data: T, opts?: JobOptions3): Promise>; + add(data: T, opts?: JobOptions3): Promise; /** * Creates a new named job and adds it to the queue. * If the queue is empty the job will be executed directly, * otherwise it will be placed in the queue and executed as soon as possible. */ - add(name: string, data: T, opts?: JobOptions3): Promise>; + add(name: string, data: T, opts?: JobOptions3): Promise; - async add(arg1: any, arg2?: any, arg3?: any): Promise> { + async add(arg1: any, arg2?: any, arg3?: any): Promise { let name: string = Queue3.DEFAULT_JOB_NAME; let data: any; let opts: JobOptions3 = {}; @@ -451,14 +450,14 @@ export class Queue3 extends EventEmitter { Utils.convertToJobsOpts(opts), true, ); - return Utils.convertToJob3(result, this); + return result; } else { const result = await this.getQueue().append( name, data, Utils.convertToJobsOpts(opts), ); - return Utils.convertToJob3(result, this); + return result; } } @@ -542,49 +541,43 @@ export class Queue3 extends EventEmitter { * Returns a promise that will return the job instance associated with the jobId parameter. * If the specified job cannot be located, the promise callback parameter will be set to null. */ - async getJob(jobId: JobId3): Promise | null> { - const job = await this.getQueue().getJob(Utils.convertToJobId(jobId)); - return Utils.convertToJob3(job, this); + getJob(jobId: string): Promise { + return this.getQueue().getJob(jobId); } /** * Returns a promise that will return an array with the waiting jobs between start and end. */ - async getWaiting(start = 0, end = -1): Promise>> { - const result: Job[] = await this.getQueue().getWaiting(start, end); - return result.map(job => Utils.convertToJob3(job, this)); + getWaiting(start = 0, end = -1): Promise> { + return this.getQueue().getWaiting(start, end); } /** * Returns a promise that will return an array with the active jobs between start and end. */ - async getActive(start = 0, end = -1): Promise>> { - const result: Job[] = await this.getQueue().getActive(start, end); - return result.map(job => Utils.convertToJob3(job, this)); + getActive(start = 0, end = -1): Promise> { + return this.getQueue().getActive(start, end); } /** * Returns a promise that will return an array with the delayed jobs between start and end. */ - async getDelayed(start = 0, end = -1): Promise>> { - const result: Job[] = await this.getQueue().getDelayed(start, end); - return result.map(job => Utils.convertToJob3(job, this)); + getDelayed(start = 0, end = -1): Promise> { + return this.getQueue().getDelayed(start, end); } /** * Returns a promise that will return an array with the completed jobs between start and end. */ - async getCompleted(start = 0, end = -1): Promise>> { - const result: Job[] = await this.getQueue().getCompleted(start, end); - return result.map(job => Utils.convertToJob3(job, this)); + getCompleted(start = 0, end = -1): Promise> { + return this.getQueue().getCompleted(start, end); } /** * Returns a promise that will return an array with the failed jobs between start and end. */ - async getFailed(start = 0, end = -1): Promise>> { - const result: Job[] = await this.getQueue().getFailed(start, end); - return result.map(job => Utils.convertToJob3(job, this)); + async getFailed(start = 0, end = -1): Promise> { + return this.getQueue().getFailed(start, end); } /** @@ -607,14 +600,13 @@ export class Queue3 extends EventEmitter { data: any, opts: JobOptions3, skipCheckExists?: boolean, - ): Promise> { - const result = await this.getQueue().repeat.addNextRepeatableJob( + ): Promise { + return this.getQueue().repeat.addNextRepeatableJob( name || Queue3.DEFAULT_JOB_NAME, data, Utils.convertToJobsOpts(opts), skipCheckExists, ); - return Utils.convertToJob3(result, this); } /** @@ -622,7 +614,7 @@ export class Queue3 extends EventEmitter { * used for the job when it was added. */ removeRepeatable( - repeat: (CronRepeatOptions3 | EveryRepeatOptions3) & { jobId?: JobId3 }, + repeat: (CronRepeatOptions3 | EveryRepeatOptions3) & { jobId?: string }, ): Promise; /** @@ -633,12 +625,12 @@ export class Queue3 extends EventEmitter { */ removeRepeatable( name: string, - repeat: (CronRepeatOptions3 | EveryRepeatOptions3) & { jobId?: JobId3 }, + repeat: (CronRepeatOptions3 | EveryRepeatOptions3) & { jobId?: string }, ): Promise; async removeRepeatable(arg1: any, arg2?: any): Promise { let name: string = Queue3.DEFAULT_JOB_NAME; - let repeat: (CronRepeatOptions3 | EveryRepeatOptions3) & { jobId?: JobId3 }; + let repeat: (CronRepeatOptions3 | EveryRepeatOptions3) & { jobId?: string }; if (typeof arg1 === 'string') { name = arg1; @@ -649,7 +641,7 @@ export class Queue3 extends EventEmitter { return this.getQueue().repeat.removeRepeatable( name, Utils.convertToRepeatOpts(repeat), - Utils.convertToJobId(repeat.jobId), + repeat.jobId, ); } @@ -684,22 +676,18 @@ export class Queue3 extends EventEmitter { * Returns a promise that will return an array of job instances of the given types. * Optional parameters for range and ordering are provided. */ - async getJobs( + getJobs( types: string[], start = 0, end = -1, asc = false, - ): Promise>> { - const result: Job[] = await this.getQueue().getJobs(types, start, end, asc); - return result.map(job => Utils.convertToJob3(job, this)); + ): Promise> { + return this.getQueue().getJobs(types, start, end, asc); } - async getNextJob() { + async getNextJob(): Promise { await this.getWorker().waitUntilReady(); - const result: Job = await this.worker.getNextJob(); - if (result) { - return Utils.convertToJob3(result, this); - } + return this.worker.getNextJob(); } /** @@ -707,21 +695,20 @@ export class Queue3 extends EventEmitter { * value is the total amount of logs, useful for implementing pagination. */ getJobLogs( - jobId: JobId3, + jobId: string, start = 0, end = -1, ): Promise<{ logs: string[]; count: number }> { - return this.getQueue().getJobLogs(Utils.convertToJobId(jobId), start, end); + return this.getQueue().getJobLogs(jobId, start, end); } /** * Returns a promise that resolves with the job counts for the given queue. */ - async getJobCounts(types?: string[] | string): Promise { - const result = await this.getQueue().getJobCounts( - ...Utils.parseTypeArg(types), - ); - return Utils.convertToJobCounts3(result); + getJobCounts( + types?: string[] | string, + ): Promise<{ [index: string]: number }> { + return this.getQueue().getJobCounts(...Utils.parseTypeArg(types)); } /** @@ -792,7 +779,7 @@ export class Queue3 extends EventEmitter { grace: number, status: JobStatusClean3 = 'completed', limit = -1, - ): Promise>> { + ): Promise> { return this.getQueue().clean(grace, status, limit); } @@ -992,7 +979,7 @@ export class Queue3 extends EventEmitter { return (this.getQueue() as any).parseClientList(list); } - retryJob(job: Job3): Promise { + retryJob(job: Job): Promise { return job.retry(); } @@ -1070,13 +1057,13 @@ export class Queue3 extends EventEmitter { if (once) { this.onWorkerInit(worker => { worker.once('active', (job, jobPromise, prev) => { - listener(job, Utils.getFakeJobPromise3(), prev); + listener(job, jobPromise, prev); }); }); } else { this.onWorkerInit(worker => { worker.on('active', (job, jobPromise, prev) => { - listener(job, Utils.getFakeJobPromise3(), prev); + listener(job, jobPromise, prev); }); }); } @@ -1316,12 +1303,10 @@ export class Queue3 extends EventEmitter { } resolve(res); }; - handler.apply(null, [Utils.convertToJob3(job, queue), done]); + handler.apply(null, [job, done]); } else { try { - return resolve( - handler.apply(null, [Utils.convertToJob3(job, queue)]), - ); + return resolve(handler.apply(null, [job])); } catch (err) { return reject(err); } @@ -1331,548 +1316,6 @@ export class Queue3 extends EventEmitter { } } -export class Job3 { - id: JobId3; - - /** - * The custom data passed when the job was created - */ - data: T; - - /** - * Options of the job - */ - opts: JobOptions3; - - /** - * How many attempts where made to run this job - */ - attemptsMade: number; - - /** - * When this job was started (unix milliseconds) - */ - processedOn?: number; - - /** - * When this job was completed (unix milliseconds) - */ - finishedOn?: number; - - /** - * Which queue this job was part of - */ - queue: Queue3; - - timestamp: number; - - /** - * The named processor name - */ - name: string; - - /** - * The stacktrace for any errors - */ - stacktrace: string[]; - - returnvalue: any; - - toKey: (type: string) => string; - - private _progress: any; - private delay: number; - private failedReason: string; - - private job: Job; - - constructor(queue: Queue3, data: any, opts?: JobOptions3); - constructor(queue: Queue3, name: string, data: any, opts?: JobOptions3); - - constructor( - queue: Queue3 | InternalJobAndQueueWrapper, - arg2: any, - arg3?: any, - arg4?: any, - ) { - Object.defineProperties(this, { - job: { - enumerable: false, - writable: true, - }, - id: { - get: () => { - return Utils.convertToJobId3(this.job.id); - }, - set: val => { - this.job.id = Utils.convertToJobId(val); - }, - }, - name: { - get: () => { - return this.job.name; - }, - set: val => { - this.job.name = val; - }, - }, - data: { - get: () => { - return this.job.data; - }, - set: val => { - this.job.data = val; - }, - }, - opts: { - get: () => { - return Utils.convertToJobOptions3(this.job.opts); - }, - set: val => { - this.job.opts = Utils.convertToJobsOpts(val); - }, - }, - _progress: { - get: () => { - return this.job.progress; - }, - set: val => { - this.job.progress = val; - }, - }, - delay: { - get: () => { - return this.job.opts && this.job.opts.delay; - }, - set: val => { - this.job.opts = { ...this.job.opts, delay: val }; - }, - }, - timestamp: { - get: () => { - return this.job.timestamp; - }, - set: val => { - this.job.timestamp = val; - }, - }, - finishedOn: { - get: () => { - return (this.job as any).finishedOn; - }, - set: val => { - (this.job as any).finishedOn = val; - }, - }, - processedOn: { - get: () => { - return (this.job as any).processedOn; - }, - set: val => { - (this.job as any).processedOn = val; - }, - }, - failedReason: { - get: () => { - return (this.job as any).failedReason; - }, - set: val => { - (this.job as any).failedReason = val; - }, - }, - attemptsMade: { - get: () => { - return (this.job as any).attemptsMade; - }, - set: val => { - (this.job as any).attemptsMade = val; - }, - }, - stacktrace: { - get: () => { - return this.job.stacktrace; - }, - set: val => { - this.job.stacktrace = val; - }, - }, - returnvalue: { - get: () => { - return this.job.returnvalue; - }, - set: val => { - this.job.returnvalue = val; - }, - }, - toKey: { - enumerable: false, - get: () => { - return (this.job as any).toKey; - }, - }, - }); - - let name: string = Queue3.DEFAULT_JOB_NAME; - let data: any; - let opts: JobOptions3; - - if (typeof arg2 !== 'string') { - // formally we cannot resolve args when data is string - data = arg2; - opts = arg3; - } else { - name = arg2; - data = arg3; - opts = arg4; - } - - const wrapper = queue as InternalJobAndQueueWrapper; - if (wrapper.job && wrapper.queue) { - // wrapper used - this.queue = wrapper.queue; - this.job = wrapper.job; - } else { - this.queue = queue as Queue3; - this.job = new Job( - (queue as any).getQueue(), - name, - data, - Utils.convertToJobsOpts(opts), - ); - } - - this.stacktrace = []; - } - - /** - * Report progress on a job - */ - progress(value?: any): Promise | any { - if (_.isUndefined(value)) { - return this._progress; - } - return this.job.updateProgress(value); - } - - /** - * Logs one row of log data. - * - * @param row String with log data to be logged. - */ - log(row: string): Promise { - return this.job.log(row); - } - - /** - * Returns a promise resolving to a boolean which, if true, current job's state is completed - */ - isCompleted(): Promise { - return this.job.isCompleted(); - } - - /** - * Returns a promise resolving to a boolean which, if true, current job's state is failed - */ - isFailed(): Promise { - return this.job.isFailed(); - } - - /** - * Returns a promise resolving to a boolean which, if true, current job's state is delayed - */ - isDelayed(): Promise { - return this.job.isDelayed(); - } - - /** - * Returns a promise resolving to a boolean which, if true, current job's state is active - */ - isActive(): Promise { - return this.job.isActive(); - } - - /** - * Returns a promise resolving to a boolean which, if true, current job's state is wait - */ - isWaiting(): Promise { - return this.job.isWaiting(); - } - - /** - * Returns a promise resolving to a boolean which, if true, current job's state is paused - */ - isPaused(): Promise { - throw new Error('Not supported'); - } - - /** - * Returns a promise resolving to a boolean which, if true, current job's state is stuck - */ - isStuck(): Promise { - throw new Error('Not supported'); - } - - /** - * Returns a promise resolving to the current job's status. - * Please take note that the implementation of this method is not very efficient, nor is - * it atomic. If your queue does have a very large quantity of jobs, you may want to - * avoid using this method. - */ - async getState(): Promise { - const result = await this.job.getState(); - switch (result) { - case 'completed': - return 'completed'; - case 'failed': - return 'failed'; - case 'delayed': - return 'delayed'; - case 'active': - return 'active'; - case 'waiting': - return 'waiting'; - } - } - - /** - * Update a specific job's data. Promise resolves when the job has been updated. - */ - update(data: any): Promise { - return this.job.update(data); - } - - /** - * Removes a job from the queue and from any lists it may be included in. - * The returned promise resolves when the job has been removed. - */ - remove(): Promise { - return this.job.remove(); - } - - /** - * Re-run a job that has failed. The returned promise resolves when the job - * has been scheduled for retry. - */ - retry(): Promise { - return this.job.retry(); - } - - /** - * Ensure this job is never ran again even if attemptsMade is less than job.attempts. - */ - discard(): Promise { - throw new Error('Not supported'); - } - - /** - * Returns a promise that resolves to the returned data when the job has been finished. - * TODO: Add a watchdog to check if the job has finished periodically. - * since pubsub does not give any guarantees. - */ - finished(watchdog = 5000, ttl?: number): Promise { - return this.job.waitUntilFinished( - (this.queue as any).getQueueEvents(), - watchdog, - ttl, - ); - } - - /** - * Moves a job to the `completed` queue. Pulls a job from 'waiting' to 'active' - * and returns a tuple containing the next jobs data and id. If no job is in the `waiting` queue, returns null. - */ - async moveToCompleted( - returnValue?: string, - ignoreLock?: boolean, - ): Promise<[SerializedJob3, JobId3] | null> { - if (ignoreLock) { - console.warn('ignoreLock is not supported'); - } - const result = await this.job.moveToCompleted(returnValue); - if (result) { - return [ - Utils.convertToSerializedJob3(result[0]), - Utils.convertToJobId3(result[1]), - ]; - } - } - - /** - * Moves a job to the `failed` queue. Pulls a job from 'waiting' to 'active' - * and returns a tuple containing the next jobs data and id. If no job is in the `waiting` queue, returns null. - */ - async moveToFailed( - errorInfo: any, - ignoreLock?: boolean, - ): Promise<[any, JobId3] | null> { - if (ignoreLock) { - console.warn('ignoreLock is not supported'); - } - await this.job.moveToFailed(errorInfo); - return null; - } - - moveToDelayed(timestamp?: number, ignoreLock = false): Promise { - if (ignoreLock) { - console.warn('ignoreLock is not supported'); - } - return this.job.moveToDelayed(timestamp); - } - - /** - * Promotes a job that is currently "delayed" to the "waiting" state and executed as soon as possible. - */ - promote(): Promise { - return this.job.promote(); - } - - /** - * The lock id of the job - */ - lockKey(): string { - throw new Error('Not supported'); - } - - /** - * Releases the lock on the job. Only locks owned by the queue instance can be released. - */ - releaseLock(): Promise { - throw new Error('Not supported'); - } - - /** - * Takes a lock for this job so that no other queue worker can process it at the same time. - */ - takeLock(): Promise { - throw new Error('Not supported'); - } - - /** - * Get job properties as Json Object - */ - toJSON(): JobJson3 { - const result = { - id: this.id, - name: this.name, - data: this.data, - opts: { ...this.opts }, - progress: this._progress, - delay: this.delay, // Move to opts - timestamp: this.timestamp, - attemptsMade: this.attemptsMade, - failedReason: this.failedReason, - stacktrace: this.stacktrace || null, - returnvalue: this.returnvalue || null, - finishedOn: this.finishedOn || null, - processedOn: this.processedOn || null, - }; - if (!result.data) { - (result as any).data = {}; - } - return result; - } - - private toData(): SerializedJob3 { - const target: SerializedJob3 = { - id: undefined, - name: undefined, - data: undefined, - opts: undefined, - progress: undefined, - delay: undefined, - timestamp: undefined, - attemptsMade: undefined, - failedReason: undefined, - stacktrace: undefined, - returnvalue: undefined, - finishedOn: undefined, - processedOn: undefined, - }; - const json = this.toJSON(); - target.id = undefined; - target.name = undefined; - target.data = JSON.stringify(json.data); - target.opts = JSON.stringify(json.opts); - target.progress = undefined; - target.delay = undefined; - target.timestamp = undefined; - target.attemptsMade = undefined; - target.failedReason = JSON.stringify(json.failedReason); - target.stacktrace = JSON.stringify(json.stacktrace); - target.returnvalue = JSON.stringify(json.returnvalue); - target.finishedOn = undefined; - target.processedOn = undefined; - return target; - } - - static async create(queue: Queue3, arg2?: any, arg3?: any, arg4?: any) { - await queue.isReady(); - return queue.add(arg2, arg3, arg4); - } - - static async fromId(queue: Queue3, jobId: JobId3): Promise { - // jobId can be undefined if moveJob returns undefined - if (!jobId) { - return Promise.resolve(undefined); - } - - const serializedJob: SerializedJob3 = await queue.client.hgetall( - queue.toKey('' + jobId), - ); - if (serializedJob && Object.keys(serializedJob).length > 0) { - return Job3.fromJSON(queue, serializedJob, jobId); - } - return null; - } - - private static fromJSON( - queue: Queue3, - json: SerializedJob3, - jobId?: JobId3, - ) { - const data = JSON.parse(json.data || '{}'); - const opts = JSON.parse(json.opts || '{}'); - - const job = new Job3( - queue, - json.name || Queue3.DEFAULT_JOB_NAME, - data, - opts, - ); - - job.id = json.id || jobId; - job._progress = JSON.parse(json.progress || '0'); - job.delay = parseInt(json.delay); - job.timestamp = parseInt(json.timestamp); - if (json.finishedOn) { - job.finishedOn = parseInt(json.finishedOn); - } - - if (json.processedOn) { - job.processedOn = parseInt(json.processedOn); - } - - job.failedReason = json.failedReason; - job.attemptsMade = parseInt(json.attemptsMade) || 0; - - job.stacktrace = []; - try { - const parsed = JSON.parse(json.stacktrace); - if (Array.isArray(parsed)) { - job.stacktrace = parsed; - } - } catch (e) {} - - if (typeof json.returnvalue === 'string') { - try { - job.returnvalue = JSON.parse(json.returnvalue); - } catch (e) {} - } - - return job; - } -} - export interface RateLimiter3 { /** Max numbers of jobs processed */ max: number; @@ -1956,13 +1399,11 @@ export interface AdvancedSettings3 { export type DoneCallback3 = (error?: Error | null, value?: any) => void; -export type JobId3 = number | string; - export type ProcessCallbackFunction3 = ( - job: Job3, + job: Job, done: DoneCallback3, ) => void; -export type ProcessPromiseFunction3 = (job: Job3) => Promise; +export type ProcessPromiseFunction3 = (job: Job) => Promise; export type JobStatus3 = | 'completed' @@ -1978,38 +1419,6 @@ export type JobStatusClean3 = | 'paused' | 'failed'; -export interface SerializedJob3 { - id: JobId3; - name: string; - data: string; - opts: string; - progress: any; - delay: string; - timestamp: string; - attemptsMade: string; - failedReason: any; - stacktrace: string; - returnvalue: any; - finishedOn: string; - processedOn: string; -} - -export interface JobJson3 { - id: JobId3; - name: string; - data: T; - opts: JobOptions3; - progress: any; - delay?: number; - timestamp: number; - attemptsMade: number; - failedReason: any; - stacktrace: string[] | null; - returnvalue: any; - finishedOn: number; - processedOn: number; -} - export interface BackoffOptions3 { /** * Backoff type, which can be either `fixed` or `exponential` @@ -2104,7 +1513,7 @@ export interface JobOptions3 { * jobId is unique. If you attempt to add a job with an id that * already exists, it will not be added. */ - jobId?: JobId3; + jobId?: string; /** * A boolean which, if true, removes the job when it successfully completes. @@ -2156,40 +1565,26 @@ export interface JobPromise3 { } export type ActiveEventCallback3 = ( - job: Job3, + job: Job, jobPromise?: JobPromise3, ) => void; -export type StalledEventCallback3 = (job: Job3) => void; +export type StalledEventCallback3 = (job: Job) => void; -export type ProgressEventCallback3 = ( - job: Job3, - progress: any, -) => void; +export type ProgressEventCallback3 = (job: Job, progress: any) => void; -export type CompletedEventCallback3 = ( - job: Job3, - result: any, -) => void; +export type CompletedEventCallback3 = (job: Job, result: any) => void; -export type FailedEventCallback3 = ( - job: Job3, - error: Error, -) => void; +export type FailedEventCallback3 = (job: Job, error: Error) => void; export type CleanedEventCallback3 = ( - jobs: Array>, + jobs: Array, status: JobStatusClean3, ) => void; -export type RemovedEventCallback3 = (job: Job3) => void; - -export type WaitingEventCallback3 = (jobId: JobId3) => void; +export type RemovedEventCallback3 = (job: Job) => void; -interface InternalJobAndQueueWrapper { - job: Job; - queue: Queue3; -} +export type WaitingEventCallback3 = (jobId: string) => void; class Utils { static redisOptsFromUrl(urlString: string) { @@ -2210,68 +1605,6 @@ class Utils { return redisOpts; } - static convertToJobId(id: JobId3): string { - if (id !== undefined) { - if (typeof id === 'string') { - return id; - } else { - return id.toString(); - } - } - } - - static convertToJobId3(id: string): JobId3 { - if (id !== undefined) { - return id; - } - } - - static convertToJob3(source: Job, queue: Queue3): Job3 { - if (source) { - const wrapper = { - queue, - job: source, - }; - - return new Job3( - wrapper as any, - source.name, - source.data, - Utils.convertToJobOptions3(source.opts), - ); - } - } - - static convertToJobOptions3(source: JobsOpts): JobOptions3 { - if (!source) { - return; - } - - const target: JobOptions3 = {}; - - (target as any).timestamp = source.timestamp; - target.priority = source.priority; - target.delay = source.delay; - target.attempts = source.attempts; - target.repeat = Utils.convertToRepeatOptions3(source.repeat); - - if (source.backoff !== undefined) { - if (typeof source.backoff === 'number') { - target.backoff = source.backoff; - } else { - target.backoff = Utils.convertToBackoffOptions3(source.backoff); - } - } - - target.lifo = source.lifo; - target.timeout = source.timeout; - target.jobId = source.jobId; - target.removeOnComplete = source.removeOnComplete; - target.removeOnFail = source.removeOnFail; - target.stackTraceLimit = source.stackTraceLimit; - return target; - } - static convertToQueueBaseOptions(source: QueueOptions3): QueueBaseOptions { if (!source) { return; @@ -2365,7 +1698,7 @@ class Utils { target.timeout = source.timeout; if (source.jobId !== undefined) { - target.jobId = Utils.convertToJobId(source.jobId); + target.jobId = source.jobId; } target.removeOnComplete = source.removeOnComplete; @@ -2395,31 +1728,6 @@ class Utils { return target; } - static convertToRepeatOptions3( - source: RepeatOpts, - ): CronRepeatOptions3 | EveryRepeatOptions3 { - if (!source) { - return; - } - - if (source.cron) { - const target: CronRepeatOptions3 = { cron: undefined }; - target.cron = (source as CronRepeatOptions3).cron; - target.tz = (source as CronRepeatOptions3).tz; - target.startDate = (source as CronRepeatOptions3).startDate; - target.endDate = (source as CronRepeatOptions3).endDate; - target.limit = (source as EveryRepeatOptions3).limit; - return target; - } else { - const target: EveryRepeatOptions3 = { every: undefined }; - target.tz = (source as CronRepeatOptions3).tz; - target.endDate = (source as CronRepeatOptions3).endDate; - target.limit = (source as EveryRepeatOptions3).limit; - target.every = (source as EveryRepeatOptions3).every; - return target; - } - } - static convertToBackoffOpts(source: BackoffOptions3): BackoffOpts { if (!source) { return; @@ -2433,19 +1741,6 @@ class Utils { return target; } - static convertToBackoffOptions3(source: BackoffOpts): BackoffOptions3 { - if (!source) { - return; - } - - const target: BackoffOptions3 = { type: undefined }; - - target.type = source.type; - target.delay = source.delay; - - return target; - } - static convertToWorkerOptions(source: QueueOptions3): WorkerOptions { if (!source) { return; @@ -2503,86 +1798,6 @@ class Utils { return target; } - static convertToJobCounts3(source: { [key: string]: number }): JobCounts3 { - if (source) { - const target: JobCounts3 = { - active: undefined, - completed: undefined, - failed: undefined, - delayed: undefined, - waiting: undefined, - }; - Object.keys(source).forEach(key => { - if (typeof source[key] === 'number') { - switch (key) { - case 'active': - target.active = source[key]; - break; - case 'completed': - target.completed = source[key]; - break; - case 'failed': - target.failed = source[key]; - break; - case 'delayed': - target.delayed = source[key]; - break; - case 'waiting': - target.waiting = source[key]; - break; - } - } - }); - return target; - } - } - - static convertToSerializedJob3(source: JobJson): SerializedJob3 { - if (source) { - const target: SerializedJob3 = { - id: undefined, - name: undefined, - data: undefined, - opts: undefined, - progress: undefined, - delay: undefined, - timestamp: undefined, - attemptsMade: undefined, - failedReason: undefined, - stacktrace: undefined, - returnvalue: undefined, - finishedOn: undefined, - processedOn: undefined, - }; - target.id = source.id; - target.name = source.name; - target.data = source.data; - target.opts = source.opts; - target.progress = source.progress; - if (source.opts) { - try { - target.delay = JSON.parse(source.opts).delay; - } catch (e) {} - } - if (source.timestamp !== undefined) { - target.timestamp = source.timestamp.toString(); - } - if (source.attemptsMade !== undefined) { - target.attemptsMade = source.attemptsMade.toString(); - } - target.failedReason = source.failedReason; - target.stacktrace = source.stacktrace; - target.returnvalue = source.returnvalue; - if (source.finishedOn !== undefined) { - target.finishedOn = source.finishedOn.toString(); - } - if (source.processedOn !== undefined) { - target.processedOn = source.processedOn.toString(); - } - return target; - } - } - static adaptToCreateClient( createClient: ( type: 'client' | 'subscriber' | 'bclient', @@ -2606,24 +1821,6 @@ class Utils { }; } - static getFakeJobPromise3() { - const msg = 'jobPromise is not supported'; - return { - then: () => { - console.warn(msg + ' (then() call is a no-op)'); - }, - catch: () => { - console.warn(msg + ' (catch() call is a no-op)'); - }, - finally: () => { - console.warn(msg + ' (finally() call is a no-op)'); - }, - cancel: () => { - console.warn(msg + ' (cancel() call is a no-op)'); - }, - }; - } - static parseTypeArg(args: string[] | string): string[] { const types = _.chain([]) .concat(args) diff --git a/src/test/compat/test_events.ts b/src/test/compat/test_events.ts deleted file mode 100644 index 1eb9508f4f..0000000000 --- a/src/test/compat/test_events.ts +++ /dev/null @@ -1,202 +0,0 @@ -import { - Queue3 as Queue, - Job3 as Job, - JobId3 as JobId, - JobStatus3 as JobStatus, - QueueOptions3 as QueueOptions, - JobOptions3 as JobOptions, -} from '@src/classes/compat'; -import _ from 'lodash'; -import IORedis from 'ioredis'; -import { expect } from 'chai'; -import sinon from 'sinon'; -import { v4 as uuid } from 'node-uuid'; -import * as utils from './utils'; -import { Scripts } from '@src/classes/scripts'; - -describe('events', () => { - let queue: Queue; - - beforeEach(() => { - const client = new IORedis(); - return client.flushdb().then(() => { - queue = utils.buildQueue('test events', { - settings: { - stalledInterval: 100, - lockDuration: 50, - }, - }); - return queue; - }); - }); - - afterEach(() => { - return queue.close(); - }); - - // TODO - it('should emit waiting when a job has been added', done => { - queue.on('waiting', () => { - done(); - }); - - queue.add({ foo: 'bar' }); - }); - - // TODO - // it('should emit global:waiting when a job has been added', done => { - // queue.on('global:waiting', () => { - // done(); - // }); - // - // queue.add({ foo: 'bar' }); - // }); - - // TODO waiting for moveStalledJobsToWait - // it('should emit stalled when a job has been stalled', done => { - // queue.on('completed', (/*job*/) => { - // done(new Error('should not have completed')); - // }); - // - // queue.process((/*job*/) => { - // return utils.sleep(250); - // }); - // - // queue.add({ foo: 'bar' }); - // - // const queue2 = utils.buildQueue('test events', { - // settings: { - // stalledInterval: 100 - // } - // }); - // - // queue2.on('stalled', (/*job*/) => { - // queue2.close().then(() => { done(); }); - // }); - // - // queue.on('active', () => { - // queue2.startMoveUnlockedJobsToWait(); - // queue.close(); - // }); - // }); - - // TODO waiting for moveStalledJobsToWait - // it('should emit global:stalled when a job has been stalled', done => { - // queue.on('completed', (/*job*/) => { - // done(new Error('should not have completed')); - // }); - // - // queue.process((/*job*/) => { - // return utils.sleep(250); - // }); - // - // queue.add({ foo: 'bar' }); - // - // const queue2 = utils.buildQueue('test events', { - // settings: { - // stalledInterval: 100 - // } - // }); - // - // queue2.on('global:stalled', (/*job*/) => { - // queue2.close().then(() => { done(); }); - // }); - // - // queue.on('active', () => { - // queue2.startMoveUnlockedJobsToWait(); - // queue.close(); - // }); - // }); - - // TODO - // it('emits waiting event when a job is added', done => { - // const queue = utils.buildQueue(); - // - // queue.once('waiting', jobId => { - // Job.fromId(queue, jobId).then(job => { - // expect(job.data.foo).to.be.equal('bar'); - // queue.close().then(() => { done(); }); - // }); - // }); - // queue.add({ foo: 'bar' }); - // }); - - // TODO - // it('emits drained and global:drained event when all jobs have been processed', done => { - // const queue = utils.buildQueue('event drained', { - // settings: { drainDelay: 1 } - // }); - // - // queue.process((job, done) => { - // done(); - // }); - // - // const drainedCallback = _.after(2, () => { - // queue.getJobCountByTypes('completed').then(completed => { - // expect(completed).to.be.equal(2); - // return queue.close().then(() => { done(); }); - // }); - // }); - // - // queue.once('drained', drainedCallback); - // queue.once('global:drained', drainedCallback); - // - // queue.add({ foo: 'bar' }); - // queue.add({ foo: 'baz' }); - // }); - - // TODO - // it('should emit an event when a new message is added to the queue', done => { - // const client = new IORedis(6379, '127.0.0.1', {}); - // client.select(0); - // const queue = new Queue('test pub sub'); - // queue.on('waiting', jobId => { - // expect(jobId).to.be.eql("1"); - // client.quit(); - // done(); - // }); - // queue.add({ test: 'stuff' }); - // }); - - it('should emit an event when a job becomes active', done => { - const queue = utils.buildQueue(); - queue.process((job, jobDone) => { - jobDone(); - }); - queue.add({}); - queue.once('active', () => { - queue.once('completed', () => { - queue.close().then(() => { - done(); - }); - }); - }); - }); - - // TODO - // it('should listen to global events', done => { - // const queue1 = utils.buildQueue(); - // const queue2 = utils.buildQueue(); - // queue1.process((job, jobDone) => { - // jobDone(); - // }); - // - // let state: JobStatus; - // queue2.on('global:waiting', () => { - // expect(state).to.be.undefined; - // state = 'waiting'; - // }); - // queue2.once('global:active', () => { - // expect(state).to.be.equal('waiting'); - // state = 'active'; - // }); - // queue2.once('global:completed', () => { - // expect(state).to.be.equal('active'); - // queue1.close().then(() => { - // queue2.close().then(() => { done(); }); - // }); - // }); - // - // queue1.add({}); - // }); -}); diff --git a/src/test/compat/test_job.ts b/src/test/compat/test_job.ts deleted file mode 100644 index f81aa86c16..0000000000 --- a/src/test/compat/test_job.ts +++ /dev/null @@ -1,953 +0,0 @@ -import { - Queue3 as Queue, - Job3 as Job, - JobId3 as JobId, - JobStatus3 as JobStatus, - QueueOptions3 as QueueOptions, - JobOptions3 as JobOptions, -} from '@src/classes/compat'; -import _ from 'lodash'; -import IORedis from 'ioredis'; -import { expect } from 'chai'; -import sinon from 'sinon'; -import { v4 as uuid } from 'node-uuid'; -import * as utils from './utils'; -import { Scripts } from '@src/classes/scripts'; - -describe('Job', () => { - let queue: Queue; - let client: IORedis.Redis; - - beforeEach(() => { - client = new IORedis(); - return client.flushdb(); - }); - - beforeEach(() => { - queue = new Queue('test-' + uuid(), { - redis: { port: 6379, host: '127.0.0.1' }, - }); - }); - - afterEach(function() { - this.timeout( - queue['settings'].stalledInterval * - (1 + queue['settings'].maxStalledCount), - ); - return queue.close().then(() => { - return client.quit(); - }); - }); - - describe('.create', () => { - let job: Job; - let data: any; - let opts: JobOptions; - - beforeEach(() => { - data = { foo: 'bar' }; - opts = { testOpt: 'enabled' } as any; - - return Job.create(queue, data, opts).then(createdJob => { - job = createdJob; - }); - }); - - it('returns a promise for the job', () => { - expect(job).to.have.property('id'); - expect(job).to.have.property('data'); - }); - - it('should not modify input options', () => { - expect(opts).not.to.have.property('jobId'); - }); - - // TODO arbitrary job opts not converted - // it('saves the job in redis', () => { - // return Job.fromId(queue, job.id).then(storedJob => { - // expect(storedJob).to.have.property('id'); - // expect(storedJob).to.have.property('data'); - // - // expect(storedJob.data.foo).to.be.equal('bar'); - // //expect(storedJob.opts).to.be.a(Object as any); - // expect((storedJob.opts as any).testOpt).to.be.equal('enabled'); - // }); - // }); - - // TODO - // it('should use the custom jobId if one is provided', () => { - // const customJobId = 'customjob'; - // return Job.create(queue, data, { jobId: customJobId }).then( - // createdJob => { - // expect(createdJob.id).to.be.equal(customJobId); - // } - // ); - // }); - - // it('should process jobs with custom jobIds', done => { - // const customJobId = 'customjob'; - // queue.process(() => { - // return Promise.resolve(); - // }); - // - // queue.add({ foo: 'bar' }, { jobId: customJobId }); - // - // queue.on('completed', job => { - // if (job.id == customJobId) { - // done(); - // } - // }); - // }); - }); - - describe('.add jobs on priority queues', () => { - it('add 4 jobs with different priorities', () => { - return queue - .add({ foo: 'bar' }, { jobId: '1', priority: 3 }) - .then(() => { - return queue.add({ foo: 'bar' }, { jobId: '2', priority: 3 }); - }) - .then(() => { - return queue.add({ foo: 'bar' }, { jobId: '3', priority: 2 }); - }) - .then(() => { - return queue.add({ foo: 'bar' }, { jobId: '4', priority: 1 }); - }) - .then(() => { - return queue - .getWaiting() - .then(result => { - const waitingIDs: JobId[] = []; - result.forEach(element => { - waitingIDs.push(element.id); - }); - return waitingIDs; - }) - .then(waitingIDs => { - expect(waitingIDs.length).to.be.equal(4); - expect(waitingIDs).to.be.eql(['4', '3', '1', '2']); - }); - }); - }); - }); - - describe('.update', () => { - it('should allow updating job data', () => { - return Job.create(queue, { foo: 'bar' }) - .then(job => { - return job.update({ baz: 'qux' }).then(() => { - return job; - }); - }) - .then(job => { - return Job.fromId(queue, job.id).then(job => { - expect(job.data).to.be.eql({ baz: 'qux' }); - }); - }); - }); - }); - - describe('.remove', () => { - it('removes the job from redis', () => { - return Job.create(queue, { foo: 'bar' }) - .then(job => { - return job.remove().then(() => { - return job; - }); - }) - .then(job => { - return Job.fromId(queue, job.id); - }) - .then(storedJob => { - expect(storedJob).to.be.equal(null); - }); - }); - - // TODO not supported - // it('fails to remove a locked job', () => { - // return Job.create(queue, 1, { foo: 'bar' }).then(job => { - // return job - // .takeLock() - // .then(lock => { - // expect(lock).to.be.ok; - // }) - // .then(() => { - // return Job.fromId(queue, job.id).then(job => { - // return job.remove(); - // }); - // }) - // .then(() => { - // throw new Error('Should not be able to remove a locked job'); - // }) - // .catch((/*err*/) => { - // // Good! - // }); - // }); - // }); - - // TODO not supported - // it('removes any job from active set', () => { - // return queue.add({ foo: 'bar' }).then(job => { - // // Simulate a job in active state but not locked - // return queue - // .getNextJob() - // .then(() => { - // return job - // .isActive() - // .then(isActive => { - // expect(isActive).to.be.equal(true); - // return job.releaseLock(); - // }) - // .then(() => { - // return job.remove(); - // }); - // }) - // .then(() => { - // return Job.fromId(queue, job.id); - // }) - // .then((stored: Job) => { - // expect(stored).to.be.equal(null); - // return job.getState(); - // }) - // .then((state: JobStatus) => { - // // This check is a bit of a hack. A job that is not found in any list will return the state - // // stuck. - // expect(state).to.be.equal('stuck'); - // }); - // }); - // }); - - // TODO not supported - // it('emits removed event', cb => { - // queue.once('removed', job => { - // expect(job.data.foo).to.be.equal('bar'); - // cb(); - // }); - // Job.create(queue, { foo: 'bar' }).then(job => { - // job.remove(); - // }); - // }); - - it('a succesful job should be removable', done => { - queue.process(() => { - return Promise.resolve(); - }); - - queue.add({ foo: 'bar' }); - - queue.on('completed', job => { - job - .remove() - .then(() => { - done(); - }) - .catch(() => { - done(); - }); - }); - }); - - it('a failed job should be removable', done => { - queue.process(() => { - throw new Error(); - }); - - queue.add({ foo: 'bar' }); - - queue.on('failed', job => { - job - .remove() - .then(() => { - done(); - }) - .catch(() => { - done(); - }); - }); - }); - }); - - describe('.remove on priority queues', () => { - it('remove a job with jobID 1 and priority 3 and check the new order in the queue', () => { - return queue - .add({ foo: 'bar' }, { jobId: '1', priority: 3 }) - .then(() => { - return queue.add({ foo: 'bar' }, { jobId: '2', priority: 3 }); - }) - .then(() => { - return queue.add({ foo: 'bar' }, { jobId: '3', priority: 2 }); - }) - .then(() => { - return queue.add({ foo: 'bar' }, { jobId: '4', priority: 1 }); - }) - .then(() => { - return queue.getJob('1').then(job => { - return job.remove().then(() => { - return queue - .getWaiting() - .then(result => { - const waitingIDs: JobId[] = []; - result.forEach(element => { - waitingIDs.push(element.id); - }); - return waitingIDs; - }) - .then(waitingIDs => { - expect(waitingIDs.length).to.be.equal(3); - expect(waitingIDs).to.be.eql(['4', '3', '2']); - }); - }); - }); - }); - }); - - it('add a new job with priority 10 and ID 5 and check the new order (along with the previous 4 jobs)', () => { - return queue - .add({ foo: 'bar' }, { jobId: '1', priority: 3 }) - .then(() => { - return queue.add({ foo: 'bar' }, { jobId: '2', priority: 3 }); - }) - .then(() => { - return queue.add({ foo: 'bar' }, { jobId: '3', priority: 2 }); - }) - .then(() => { - return queue.add({ foo: 'bar' }, { jobId: '4', priority: 1 }); - }) - .then(() => { - return queue.getJob('1').then(job => { - return job.remove().then(() => { - return queue - .getWaiting() - .then(result => { - const waitingIDs: JobId[] = []; - result.forEach(element => { - waitingIDs.push(element.id); - }); - return waitingIDs; - }) - .then(waitingIDs => { - expect(waitingIDs.length).to.be.equal(3); - expect(waitingIDs).to.be.eql(['4', '3', '2']); - return true; - }) - .then(() => { - return queue - .add({ foo: 'bar' }, { jobId: '5', priority: 10 }) - .then(() => { - return queue - .getWaiting() - .then(result => { - const waitingIDs: JobId[] = []; - result.forEach(element => { - waitingIDs.push(element.id); - }); - return waitingIDs; - }) - .then(waitingIDs => { - expect(waitingIDs.length).to.be.equal(4); - expect(waitingIDs).to.be.eql(['4', '3', '2', '5']); - }); - }); - }); - }); - }); - }); - }); - }); - - // TODO timeout - // describe('.retry', () => { - // it('emits waiting event', cb => { - // queue.add({ foo: 'bar' }); - // queue.process((job, done) => { - // done(new Error('the job failed')); - // }); - // - // queue.once('failed', job => { - // queue.once('global:waiting', jobId2 => { - // Job.fromId(queue, jobId2).then(job2 => { - // expect(job2.data.foo).to.be.equal('bar'); - // cb(); - // }); - // }); - // queue.once('registered:global:waiting', () => { - // job.retry(); - // }); - // }); - // }); - // }); - - describe('Locking', () => { - let job: Job; - - beforeEach(() => { - return Job.create(queue, { foo: 'bar' }).then(createdJob => { - job = createdJob; - }); - }); - - // TODO not supported - // it('can take a lock', () => { - // return job - // .takeLock() - // .then(lockTaken => { - // expect(lockTaken).to.be.ok; - // }) - // .then(() => { - // return job.releaseLock().then(lockReleased => { - // expect(lockReleased).to.not.exist; - // }); - // }); - // }); - - // TODO not supported - // it('take an already taken lock', () => { - // return job - // .takeLock() - // .then(lockTaken => { - // expect(lockTaken).to.be.ok; - // }) - // .then(() => { - // return job.takeLock().then(lockTaken => { - // expect(lockTaken).to.be.ok; - // }); - // }); - // }); - - // TODO not supported - // it('can release a lock', () => { - // return job - // .takeLock() - // .then(lockTaken => { - // expect(lockTaken).to.be.ok; - // }) - // .then(() => { - // return job.releaseLock().then(lockReleased => { - // expect(lockReleased).to.not.exist; - // }); - // }); - // }); - }); - - describe('.progress', () => { - it('can set and get progress as number', () => { - return Job.create(queue, { foo: 'bar' }).then(job => { - return job.progress(42).then(() => { - return Job.fromId(queue, job.id).then(storedJob => { - expect(storedJob.progress()).to.be.equal(42); - }); - }); - }); - }); - it('can set and get progress as object', () => { - return Job.create(queue, { foo: 'bar' }).then(job => { - return job.progress({ total: 120, completed: 40 }).then(() => { - return Job.fromId(queue, job.id).then(storedJob => { - expect(storedJob.progress()).to.be.eql({ - total: 120, - completed: 40, - }); - }); - }); - }); - }); - }); - - // TODO not supported - describe('.log', () => { - it('can log two rows with text', () => { - const firstLog = 'some log text 1'; - const secondLog = 'some log text 2'; - return Job.create(queue, { foo: 'bar' }).then(job => - job - .log(firstLog) - .then(() => job.log(secondLog)) - .then(() => queue.getJobLogs(job.id)) - .then(logs => - expect(logs).to.be.eql({ logs: [firstLog, secondLog], count: 2 }), - ) - .then(() => job.remove()) - .then(() => queue.getJobLogs(job.id)) - .then(logs => expect(logs).to.be.eql({ logs: [], count: 0 })), - ); - }); - }); - - describe('.moveToCompleted', () => { - it('marks the job as completed and returns new job', () => { - return Job.create(queue, { foo: 'bar' }).then(job1 => { - return Job.create(queue, { foo: 'bar' }).then(job2 => { - return job2 - .isCompleted() - .then(isCompleted => { - expect(isCompleted).to.be.equal(false); - }) - .then(() => { - return job2.moveToCompleted('succeeded', true); - }) - .then(job1Id => { - return job2.isCompleted().then(isCompleted => { - expect(isCompleted).to.be.equal(true); - expect(job2.returnvalue).to.be.equal('succeeded'); - expect(job1Id[1]).to.be.equal(job1.id); - }); - }); - }); - }); - }); - }); - - describe('.moveToFailed', () => { - it('marks the job as failed', () => { - return Job.create(queue, { foo: 'bar' }).then(job => { - return job - .isFailed() - .then(isFailed => { - expect(isFailed).to.be.equal(false); - }) - .then(() => { - return job.moveToFailed(new Error('test error'), true); - }) - .then(() => { - return job.isFailed().then(isFailed => { - expect(isFailed).to.be.equal(true); - expect(job.stacktrace).to.not.be.null; - expect(job.stacktrace.length).to.be.equal(1); - }); - }); - }); - }); - - it('moves the job to wait for retry if attempts are given', () => { - return Job.create(queue, { foo: 'bar' }, { attempts: 3 }).then(job => { - return job - .isFailed() - .then(isFailed => { - expect(isFailed).to.be.false; - }) - .then(() => { - return job.moveToFailed(new Error('test error'), true); - }) - .then(() => { - return job.isFailed().then(isFailed => { - expect(isFailed).to.be.false; - expect(job.stacktrace).not.be.null; - expect(job.stacktrace.length).to.be.equal(1); - return job.isWaiting().then(isWaiting => { - expect(isWaiting).to.be.equal(true); - }); - }); - }); - }); - }); - - it('marks the job as failed when attempts made equal to attempts given', () => { - return Job.create(queue, { foo: 'bar' }, { attempts: 1 }).then(job => { - return job - .isFailed() - .then(isFailed => { - expect(isFailed).to.be.equal(false); - }) - .then(() => { - return job.moveToFailed(new Error('test error'), true); - }) - .then(() => { - return job.isFailed().then(isFailed => { - expect(isFailed).to.be.equal(true); - expect(job.stacktrace).to.not.be.equal(null); - expect(job.stacktrace.length).to.be.equal(1); - }); - }); - }); - }); - - it('moves the job to delayed for retry if attempts are given and backoff is non zero', () => { - return Job.create( - queue, - { foo: 'bar' }, - { attempts: 3, backoff: 300 }, - ).then(job => { - return job - .isFailed() - .then(isFailed => { - expect(isFailed).to.be.equal(false); - }) - .then(() => { - return job.moveToFailed(new Error('test error'), true); - }) - .then(() => { - return job.isFailed().then(isFailed => { - expect(isFailed).to.be.equal(false); - expect(job.stacktrace).not.be.null; - expect(job.stacktrace.length).to.be.equal(1); - return job.isDelayed().then(isDelayed => { - expect(isDelayed).to.be.equal(true); - }); - }); - }); - }); - }); - - it('applies stacktrace limit on failure', () => { - const stackTraceLimit = 1; - return Job.create(queue, { foo: 'bar' }, { stackTraceLimit }).then( - job => { - return job - .isFailed() - .then(isFailed => { - expect(isFailed).to.be.equal(false); - }) - .then(() => { - return job.moveToFailed(new Error('test error'), true); - }) - .then(() => { - return job - .moveToFailed(new Error('test error'), true) - .then(() => { - return job.isFailed().then(isFailed => { - expect(isFailed).to.be.equal(true); - expect(job.stacktrace).not.be.null; - expect(job.stacktrace.length).to.be.equal(stackTraceLimit); - }); - }); - }); - }, - ); - }); - }); - - describe('.promote', () => { - // TODO not supported - // it('can promote a delayed job to be executed immediately', () => { - // return Job.create(queue, { foo: 'bar' }, { delay: 1500 }).then(job => { - // return job - // .isDelayed() - // .then(isDelayed => { - // expect(isDelayed).to.be.equal(true); - // }) - // .then(() => { - // return job.promote(); - // }) - // .then(() => { - // return job.isDelayed().then(isDelayed => { - // expect(isDelayed).to.be.equal(false); - // return job.isWaiting().then(isWaiting => { - // expect(isWaiting).to.be.equal(true); - // return; - // }); - // }); - // }); - // }); - // }); - - // TODO not supported - // it('should process a promoted job according to its priority', done => { - // queue.process(() => { - // return utils.sleep(100); - // }); - // - // const completed: JobId[] = []; - // - // queue.on('completed', job => { - // completed.push(job.id); - // if (completed.length > 3) { - // expect(completed).to.be.eql(['1', '2', '3', '4']); - // done(); - // } - // }); - // const processStarted = new Promise(resolve => - // queue.once('active', resolve) - // ); - // - // const add = (id: string, ms?: number) => - // queue.add({}, { jobId: id, delay: ms, priority: 1 }); - // - // add('1') - // .then(() => add('2', 1)) - // .then(() => processStarted) - // .then(() => add('3', 5000)) - // .then(job => { - // job.promote(); - // }) - // .then(() => add('4', 1)); - // }); - - // TODO not supported - it('should not promote a job that is not delayed', () => { - return Job.create(queue, { foo: 'bar' }).then(job => { - return job - .isDelayed() - .then(isDelayed => { - expect(isDelayed).to.be.equal(false); - }) - .then(() => { - return job.promote(); - }) - .then(() => { - throw new Error('Job should not be promoted!'); - }) - .catch(err => { - expect(err).to.be.ok; - }); - }); - }); - }); - - // TODO: - // Divide into several tests - // - - // TODO isStuck not supported - // it('get job status', function() { - // this.timeout(12000); - // - // const client = new IORedis(); - // return Job.create(queue, { foo: 'baz' }) - // .then(job => { - // return job - // .isStuck() - // .then(isStuck => { - // expect(isStuck).to.be.equal(false); - // return job.getState(); - // }) - // .then(state => { - // expect(state).to.be.equal('waiting'); - // return Scripts.moveToActive((queue as any).getWorker(), undefined).then(() => { - // return job.moveToCompleted(); - // }); - // }) - // .then(() => { - // return job.isCompleted(); - // }) - // .then(isCompleted => { - // expect(isCompleted).to.be.equal(true); - // return job.getState(); - // }) - // .then(state => { - // expect(state).to.be.equal('completed'); - // return client.zrem(queue.toKey('completed'), job.id); - // }) - // .then(() => { - // return job.moveToDelayed(Date.now() + 10000, true); - // }) - // .then(() => { - // return job.isDelayed(); - // }) - // .then(yes => { - // expect(yes).to.be.equal(true); - // return job.getState(); - // }) - // .then(state => { - // expect(state).to.be.equal('delayed'); - // return client.zrem(queue.toKey('delayed'), job.id); - // }) - // .then(() => { - // return job.moveToFailed(new Error('test'), true); - // }) - // .then(() => { - // return job.isFailed(); - // }) - // .then(isFailed => { - // expect(isFailed).to.be.equal(true); - // return job.getState(); - // }) - // .then(state => { - // expect(state).to.be.equal('failed'); - // return client.zrem(queue.toKey('failed'), job.id); - // }) - // .then(res => { - // expect(res).to.be.equal(1); - // return job.getState(); - // }) - // .then(state => { - // expect(state).to.be.equal('stuck'); - // return client.rpop(queue.toKey('wait')); - // }) - // .then(() => { - // return client.lpush(queue.toKey('paused'), job.id); - // }) - // .then(() => { - // return job.isPaused(); - // }) - // .then(isPaused => { - // expect(isPaused).to.be.equal(true); - // return job.getState(); - // }) - // .then(state => { - // expect(state).to.be.equal('paused'); - // return client.rpop(queue.toKey('paused')); - // }) - // .then(() => { - // return client.lpush(queue.toKey('wait'), job.id); - // }) - // .then(() => { - // return job.isWaiting(); - // }) - // .then(isWaiting => { - // expect(isWaiting).to.be.equal(true); - // return job.getState(); - // }) - // .then(state => { - // expect(state).to.be.equal('waiting'); - // }); - // }) - // .then(() => { - // return client.quit(); - // }); - // }); - - describe('.finished', () => { - // TODO timeout - // it('should resolve when the job has been completed', done => { - // queue.process(() => { - // return utils.sleep(500); - // }); - // queue - // .add({ foo: 'bar' }) - // .then(job => { - // return job.finished(); - // }) - // .then(done, done); - // }); - - // TODO - // it('should resolve when the job has been completed and return object', done => { - // queue.process((/*job*/) => { - // return utils.sleep(500).then(() => { - // return { resultFoo: 'bar' }; - // }); - // }); - // queue - // .add({ foo: 'bar' }) - // .then(job => { - // return job.finished(); - // }) - // .then(jobResult => { - // expect(jobResult).to.be.an('object'); - // expect(jobResult.resultFoo).equal('bar'); - // done(); - // }); - // }); - - // it('should resolve when the job has been delayed and completed and return object', done => { - // queue.process((/*job*/) => { - // return utils.sleep(300).then(() => { - // return { resultFoo: 'bar' }; - // }); - // }); - // queue - // .add({ foo: 'bar' }) - // .then(job => { - // return utils.sleep(600).then(() => { - // return job.finished(); - // }); - // }) - // .then(jobResult => { - // expect(jobResult).to.be.an('object'); - // expect(jobResult.resultFoo).equal('bar'); - // done(); - // }); - // }); - - it('should resolve when the job has been completed and return string', done => { - queue.process((/*job*/) => { - return utils.sleep(500).then(() => { - return 'a string'; - }); - }); - queue - .add({ foo: 'bar' }) - .then(job => { - return utils.sleep(600).then(() => { - return job.finished(); - }); - }) - .then(jobResult => { - expect(jobResult).to.be.an('string'); - expect(jobResult).equal('a string'); - done(); - }); - }); - - // TODO - // it('should resolve when the job has been delayed and completed and return string', done => { - // queue.process((/*job*/) => { - // return utils.sleep(300).then(() => { - // return 'a string'; - // }); - // }); - // queue - // .add({ foo: 'bar' }) - // .then(job => { - // return job.finished(); - // }) - // .then(jobResult => { - // expect(jobResult).to.be.an('string'); - // expect(jobResult).equal('a string'); - // done(); - // }); - // }); - - // TODO - // it('should reject when the job has been failed', done => { - // queue.process(() => { - // return utils.sleep(500).then(() => { - // return Promise.reject(new Error('test error')); - // }); - // }); - // - // queue - // .add({ foo: 'bar' }) - // .then(job => { - // return job.finished(); - // }) - // .then( - // () => { - // done(Error('should have been rejected')); - // }, - // err => { - // expect(err.message).equal('test error'); - // done(); - // } - // ); - // }); - - it('should resolve directly if already processed', done => { - queue.process(() => { - return Promise.resolve(); - }); - queue - .add({ foo: 'bar' }) - .then(job => { - return utils.sleep(500).then(() => { - return job.finished(); - }); - }) - .then(() => { - done(); - }, done); - }); - - it('should reject directly if already processed', done => { - queue.process(() => { - return Promise.reject(Error('test error')); - }); - queue - .add({ foo: 'bar' }) - .then(job => { - return utils.sleep(500).then(() => { - return job.finished(); - }); - }) - .then( - () => { - done(Error('should have been rejected')); - }, - err => { - expect(err.message).equal('test error'); - done(); - }, - ); - }); - }); -}); diff --git a/src/test/compat/test_queue.ts b/src/test/compat/test_queue.ts deleted file mode 100644 index 9b143a1ab9..0000000000 --- a/src/test/compat/test_queue.ts +++ /dev/null @@ -1,2713 +0,0 @@ -import { - Queue3 as Queue, - Job3 as Job, - QueueOptions3 as QueueOptions, - JobOptions3 as JobOptions, -} from '@src/classes/compat'; -import _ from 'lodash'; -import IORedis from 'ioredis'; -import { expect } from 'chai'; -import sinon from 'sinon'; -import { v4 as uuid } from 'node-uuid'; -import * as utils from './utils'; - -const LOCK_RENEW_TIME = 5000; - -describe('Queue', () => { - const sandbox = sinon.createSandbox(); - let client: IORedis.Redis; - - beforeEach(() => { - client = new IORedis(); - return client.flushdb(); - }); - - afterEach(() => { - sandbox.restore(); - return client.quit(); - }); - - describe('.close', () => { - let testQueue: Queue; - beforeEach(() => { - return utils.newQueue('test').then(queue => { - testQueue = queue; - }); - }); - - it('should call end on the client', done => { - testQueue.client.once('end', () => { - done(); - }); - testQueue.close(); - }); - - it('should call end on the event subscriber client', done => { - testQueue.eclient.once('end', () => { - done(); - }); - testQueue.close(); - }); - - it('should resolve the promise when each client has disconnected', () => { - function checkStatus(status: string) { - return ( - status === 'ready' || status === 'connecting' || status === 'connect' - ); - } - expect(testQueue.client.status).to.satisfy(checkStatus); - expect(testQueue.eclient.status).to.satisfy(checkStatus); - - return testQueue.close().then(() => { - expect(testQueue.client.status).to.be.equal('end'); - expect(testQueue.eclient.status).to.be.equal('end'); - }); - }); - - it('should return a promise', () => { - const closePromise = testQueue.close().then(() => { - expect(closePromise.then).to.be.a('function'); - }); - return closePromise; - }); - - it('should close if the job expires after the lockRenewTime', function(done) { - this.timeout(testQueue['settings'].stalledInterval * 2); - - testQueue.process(() => { - return utils.sleep(100); - }); - - testQueue.on('completed', () => { - testQueue.close().then(() => { - done(); - }); - }); - testQueue.add({ foo: 'bar' }); - }); - - describe('should be callable from within', () => { - it('a job handler that takes a callback', function(done) { - this.timeout(12000); // Close can be a slow operation - - testQueue.process((job, jobDone) => { - expect(job.data.foo).to.be.equal('bar'); - jobDone(); - testQueue.close().then(() => { - done(); - }); - }); - - testQueue.add({ foo: 'bar' }).then(job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - }); - }); - - it('a job handler that returns a promise', done => { - testQueue.process(job => { - expect(job.data.foo).to.be.equal('bar'); - return Promise.resolve(); - }); - - testQueue.on('completed', () => { - testQueue.close().then(() => { - done(); - }); - }); - - testQueue.add({ foo: 'bar' }).then(job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - }); - }); - }); - }); - - describe('instantiation', () => { - it('should create a queue with standard redis opts', done => { - const queue = new Queue('standard'); - - expect((queue.client as any).options.host).to.be.equal('127.0.0.1'); - expect((queue.eclient as any).options.host).to.be.equal('127.0.0.1'); - - expect((queue.client as any).options.port).to.be.equal(6379); - expect((queue.eclient as any).options.port).to.be.equal(6379); - - expect((queue.client as any).options.db).to.be.equal(0); - expect((queue.eclient as any).options.db).to.be.equal(0); - - queue.close().then( - () => { - done(); - }, - () => { - done(); - }, - ); - }); - - it('should create a queue with a redis connection string', () => { - const queue = new Queue('connstring', 'redis://123.4.5.67:1234/2'); - - expect((queue.client as any).options.host).to.be.equal('123.4.5.67'); - expect((queue.eclient as any).options.host).to.be.equal('123.4.5.67'); - - expect((queue.client as any).options.port).to.be.equal(1234); - expect((queue.eclient as any).options.port).to.be.equal(1234); - - expect((queue.client as any).options.db).to.be.equal(2); - expect((queue.eclient as any).options.db).to.be.equal(2); - - queue.close(); - }); - - it('should create a queue with only a hostname', () => { - const queue = new Queue('connstring', 'redis://127.2.3.4'); - - expect((queue.client as any).options.host).to.be.equal('127.2.3.4'); - expect((queue.eclient as any).options.host).to.be.equal('127.2.3.4'); - - expect((queue.client as any).options.port).to.be.equal(6379); - expect((queue.eclient as any).options.port).to.be.equal(6379); - - expect((queue.client as any).condition.select).to.be.equal(0); - expect((queue.eclient as any).condition.select).to.be.equal(0); - - queue.close().catch((/*err*/) => { - // Swallow error. - }); - }); - - it('should create a queue with connection string and password', () => { - const queue = new Queue('connstring', 'redis://:123@127.2.3.4:6379'); - - expect((queue.client as any).options.host).to.be.equal('127.2.3.4'); - expect((queue.eclient as any).options.host).to.be.equal('127.2.3.4'); - - expect((queue.client as any).options.port).to.be.equal(6379); - expect((queue.eclient as any).options.port).to.be.equal(6379); - - expect((queue.client as any).condition.select).to.be.equal(0); - expect((queue.eclient as any).condition.select).to.be.equal(0); - - expect((queue.client as any).options.password).to.be.equal('123'); - expect((queue.eclient as any).options.password).to.be.equal('123'); - - queue.close().catch((/*err*/) => { - // Swallow error. - }); - }); - - it('creates a queue using the supplied redis DB', done => { - const queue = new Queue('custom', { redis: { DB: 1 } } as any); - - expect((queue.client as any).options.host).to.be.equal('127.0.0.1'); - expect((queue.eclient as any).options.host).to.be.equal('127.0.0.1'); - - expect((queue.client as any).options.port).to.be.equal(6379); - expect((queue.eclient as any).options.port).to.be.equal(6379); - - expect((queue.client as any).options.db).to.be.equal(1); - expect((queue.eclient as any).options.db).to.be.equal(1); - - queue.close().then( - () => { - done(); - }, - () => { - done(); - }, - ); - }); - - it('creates a queue using the supplied redis host', done => { - const queue = new Queue('custom', { redis: { host: 'localhost' } }); - - expect((queue.client as any).options.host).to.be.equal('localhost'); - expect((queue.eclient as any).options.host).to.be.equal('localhost'); - - expect((queue.client as any).options.db).to.be.equal(0); - expect((queue.eclient as any).options.db).to.be.equal(0); - - queue.close().then( - () => { - done(); - }, - () => { - done(); - }, - ); - }); - - it('creates a queue with dots in its name', () => { - const queue = new Queue('using. dots. in.name.'); - - return queue - .add({ foo: 'bar' }) - .then(job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - }) - .then(() => { - queue.process((job, jobDone) => { - expect(job.data.foo).to.be.equal('bar'); - jobDone(); - }); - return null; - }) - .then(() => { - return queue.close(); - }); - }); - - it('creates a queue accepting port as a string', () => { - const queue = new Queue('foobar', '6379', 'localhost' as any); - - return queue - .add({ foo: 'bar' }) - .then(job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - }) - .then(() => { - queue.process((job, jobDone) => { - expect(job.data.foo).to.be.equal('bar'); - jobDone(); - }); - return null; - }) - .then(() => { - return queue.close(); - }); - }); - - it('should create a queue with a prefix option', () => { - const queue = new Queue('q', 'redis://127.0.0.1', { - keyPrefix: 'myQ', - } as any); - - return queue - .add({ foo: 'bar' }) - .then((job: Job) => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - const client = new IORedis(); - return client.hgetall('myQ:q:' + job.id).then(result => { - expect(result).to.not.be.null; - return client.quit(); - }); - }) - .then(() => { - return queue.close(); - }); - }); - - // TODO createClient() is not supported in BullMQ - // it('should allow reuse redis connections', done => { - // const client = new IORedis(); - // const subscriber = new IORedis(); - // - // const opts = { - // createClient(type: string, opts: IORedis.RedisOptions) { - // switch (type) { - // case 'client': - // return client; - // case 'subscriber': - // return subscriber; - // default: - // return new IORedis(opts); - // } - // } - // }; - // const queueFoo = new Queue('foobar', opts); - // const queueQux = new Queue('quxbaz', opts); - // - // expect(queueFoo.client).to.be.equal(client); - // expect(queueFoo.eclient).to.be.equal(subscriber); - // - // expect(queueQux.client).to.be.equal(client); - // expect(queueQux.eclient).to.be.equal(subscriber); - // - // queueFoo - // .add({ foo: 'bar' }) - // .then(job => { - // expect(job.id).to.be.ok; - // expect(job.data.foo).to.be.equal('bar'); - // }) - // .then(() => { - // return queueQux.add({ qux: 'baz' }).then(job => { - // expect(job.id).to.be.ok; - // expect(job.data.qux).to.be.equal('baz'); - // let completed = 0; - // - // queueFoo.process((job, jobDone) => { - // jobDone(); - // }); - // - // queueQux.process((job, jobDone) => { - // jobDone(); - // }); - // - // queueFoo.on('completed', () => { - // completed++; - // if (completed == 2) { - // done(); - // } - // }); - // - // queueQux.on('completed', () => { - // completed++; - // if (completed == 2) { - // done(); - // } - // }); - // }); - // }, done); - // }); - - it('creates a queue with default job options', done => { - const defaultJobOptions = { removeOnComplete: true }; - const queue = new Queue('custom', { - defaultJobOptions, - }); - - expect(queue['defaultJobOptions']).to.be.equal(defaultJobOptions); - - queue.close().then( - () => { - done(); - }, - () => { - done(); - }, - ); - }); - }); - - describe(' a worker', () => { - let queue: Queue; - - beforeEach(() => { - const client = new IORedis(); - return client - .flushdb() - .then(() => { - return utils.newQueue(); - }) - .then(_queue => { - queue = _queue; - }); - }); - - afterEach(function() { - this.timeout( - queue['settings'].stalledInterval * - (1 + queue['settings'].maxStalledCount), - ); - return utils.cleanupQueues(); - }); - - it('should process a job', done => { - queue - .process((job, jobDone) => { - expect(job.data.foo).to.be.equal('bar'); - jobDone(); - done(); - }) - .catch(() => { - done(); - }); - - queue.add({ foo: 'bar' }).then(job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - }, done); - }); - - describe('auto job removal', () => { - // TODO - // it('should remove job after completed if removeOnComplete', done => { - // queue - // .process((job, jobDone) => { - // expect(job.data.foo).to.be.equal('bar'); - // jobDone(); - // }) - // .catch(done); - // - // queue.add({ foo: 'bar' }, { removeOnComplete: true }).then(job => { - // expect(job.id).to.be.ok; - // expect(job.data.foo).to.be.equal('bar'); - // }, done); - // - // queue.on('completed', job => { - // queue - // .getJob(job.id) - // .then(job => { - // expect(job).to.be.equal(null); - // }) - // .then(() => { - // queue.getJobCounts().then(counts => { - // expect(counts.completed).to.be.equal(0); - // done(); - // }); - // }); - // }); - // }); - - it('should remove a job after completed if the default job options specify removeOnComplete', done => { - utils - .newQueue('test-' + uuid(), { - defaultJobOptions: { - removeOnComplete: true, - }, - }) - .then(myQueue => { - myQueue.process(job => { - expect(job.data.foo).to.be.equal('bar'); - }); - - myQueue - .add({ foo: 'bar' }) - .then( - job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - }, - () => { - done(); - }, - ) - .catch(() => { - done(); - }); - - myQueue.on('completed', job => { - myQueue - .getJob(job.id) - .then(job => { - expect(job).to.be.equal(null); - }) - .then(() => { - return myQueue.getJobCounts(); - }) - .then(counts => { - expect(counts.completed).to.be.equal(0); - - return utils.cleanupQueues(); - }) - .then(() => { - done(); - }) - .catch(() => { - done(); - }); - }); - return null; - }) - .catch(() => { - done(); - }); - }); - - // TODO - // it('should keep specified number of jobs after completed with removeOnComplete', async () => { - // const keepJobs = 3; - // queue.process(async job => { - // await job.log('test log'); - // }); - // - // const datas = [0, 1, 2, 3, 4, 5, 6, 7, 8]; - // - // const jobIds = await Promise.all( - // datas.map( - // async data => - // (await queue.add(data, { removeOnComplete: keepJobs })).id - // ) - // ); - // - // return new Promise(resolve => { - // queue.on('completed', async job => { - // if (job.data == 8) { - // const counts = await queue.getJobCounts(); - // expect(counts.completed).to.be.equal(keepJobs); - // - // await Promise.all( - // jobIds.map(async (jobId, index) => { - // const job = await queue.getJob(jobId); - // const logs = await queue.getJobLogs(jobId); - // if (index >= datas.length - keepJobs) { - // expect(job).to.not.be.equal(null); - // expect(logs.logs).to.not.be.empty; - // } else { - // expect(job).to.be.equal(null); - // expect(logs.logs).to.be.empty; - // } - // }) - // ); - // resolve(); - // } - // }); - // }); - // }); - - // TODO - // it('should keep specified number of jobs after completed with global removeOnComplete', async () => { - // const keepJobs = 3; - // - // const localQueue = await utils.newQueue('test-' + uuid(), { - // defaultJobOptions: { - // removeOnComplete: keepJobs - // } - // }); - // localQueue.process(() => {}); - // - // const datas = [0, 1, 2, 3, 4, 5, 6, 7, 8]; - // - // const jobIds = await Promise.all( - // datas.map(async data => (await localQueue.add(data)).id) - // ); - // - // return new Promise((resolve, reject) => { - // localQueue.on('completed', async job => { - // if (job.data === 8) { - // try { - // const counts = await localQueue.getJobCounts(); - // expect(counts.completed).to.be.equal(keepJobs); - // - // await Promise.all( - // jobIds.map(async (jobId, index) => { - // const job = await localQueue.getJob(jobId); - // if (index >= datas.length - keepJobs) { - // expect(job).to.not.be.equal(null); - // } else { - // expect(job).to.be.equal(null); - // } - // }) - // ); - // } catch (err) { - // reject(err); - // } - // - // resolve(); - // } - // }); - // }); - // }); - - // TODO - // it('should remove job after failed if removeOnFail', done => { - // queue.process(job => { - // expect(job.data.foo).to.be.equal('bar'); - // throw Error('error'); - // }); - // - // queue.add({ foo: 'bar' }, { removeOnFail: true }).then(job => { - // expect(job.id).to.be.ok; - // expect(job.data.foo).to.be.equal('bar'); - // }, done); - // - // queue.on('failed', (job) => { - // const jobId = job.id; - // queue - // .getJob(jobId) - // .then(job => { - // expect(job).to.be.equal(null); - // return null; - // }) - // .then(() => { - // return queue.getJobCounts().then(counts => { - // expect(counts.failed).to.be.equal(0); - // done(); - // }); - // }); - // }); - // }); - - it('should remove a job after fail if the default job options specify removeOnFail', done => { - utils - .newQueue('test-' + uuid(), { - defaultJobOptions: { - removeOnFail: true, - }, - }) - .then(myQueue => { - myQueue.process(job => { - expect(job.data.foo).to.be.equal('bar'); - throw Error('error'); - }); - - myQueue - .add({ foo: 'bar' }) - .then(job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - }, done) - .catch(() => { - done(); - }); - - myQueue.on('failed', job => { - const jobId = job.id; - myQueue - .getJob(jobId) - .then(job => { - expect(job).to.be.equal(null); - }) - .then(() => { - return myQueue.getJobCounts(); - }) - .then(counts => { - expect(counts.completed).to.be.equal(0); - - return utils.cleanupQueues(); - }) - .then(() => { - done(); - }) - .catch(() => { - done(); - }); - }); - return null; - }) - .catch(() => { - done(); - }); - }); - - // TODO - // it('should keep specified number of jobs after completed with removeOnFail', async () => { - // const keepJobs = 3; - // queue.process(() => { - // throw Error('error'); - // }); - // - // const datas = [0, 1, 2, 3, 4, 5, 6, 7, 8]; - // - // const jobIds = await Promise.all( - // datas.map( - // async data => (await queue.add(data, { removeOnFail: keepJobs })).id - // ) - // ); - // - // return new Promise(resolve => { - // queue.on('failed', async job => { - // if (job.data == 8) { - // const counts = await queue.getJobCounts(); - // expect(counts.failed).to.be.equal(keepJobs); - // - // await Promise.all( - // jobIds.map(async (jobId, index) => { - // const job = await queue.getJob(jobId); - // if (index >= datas.length - keepJobs) { - // expect(job).to.not.be.equal(null); - // } else { - // expect(job).to.be.equal(null); - // } - // }) - // ); - // - // resolve(); - // } - // }); - // }); - // }); - - // TODO - // it('should keep specified number of jobs after completed with global removeOnFail', async () => { - // const keepJobs = 3; - // - // const localQueue = await utils.newQueue('test-' + uuid(), { - // defaultJobOptions: { - // removeOnFail: keepJobs - // } - // }); - // localQueue.process(() => { - // throw Error('error'); - // }); - // - // const datas = [0, 1, 2, 3, 4, 5, 6, 7, 8]; - // - // const jobIds = await Promise.all( - // datas.map(async data => (await localQueue.add(data)).id) - // ); - // - // return new Promise((resolve, reject) => { - // localQueue.on('failed', async job => { - // if (job.data == 8) { - // try { - // const counts = await localQueue.getJobCounts(); - // expect(counts.failed).to.be.equal(keepJobs); - // - // await Promise.all( - // jobIds.map(async (jobId, index) => { - // const job = await localQueue.getJob(jobId); - // if (index >= datas.length - keepJobs) { - // expect(job).to.not.be.equal(null); - // } else { - // expect(job).to.be.equal(null); - // } - // }) - // ); - // } catch (err) { - // reject(err); - // } - // - // resolve(); - // } - // }); - // }); - // }); - }); - - it('process a lifo queue', function(done) { - this.timeout(3000); - let currentValue = 0, - first = true; - utils.newQueue('test lifo').then(queue2 => { - queue2.process((job, jobDone) => { - // Catching the job before the pause - expect(job.data.count).to.be.equal(currentValue--); - jobDone(); - if (first) { - first = false; - } else if (currentValue === 0) { - done(); - } - }); - - queue2.pause().then(() => { - // Add a series of jobs in a predictable order - const jobs = [ - { count: ++currentValue }, - { count: ++currentValue }, - { count: ++currentValue }, - { count: ++currentValue }, - ]; - return Promise.all( - jobs.map(jobData => { - return queue2.add(jobData, { lifo: true }); - }), - ).then(() => { - queue2.resume(); - }); - }); - }); - }); - - it('should processes jobs by priority', done => { - const normalPriority = []; - const mediumPriority = []; - const highPriority = []; - - // for the current strategy this number should not exceed 8 (2^2*2) - // this is done to maitain a deterministic output. - const numJobsPerPriority = 6; - - for (let i = 0; i < numJobsPerPriority; i++) { - normalPriority.push(queue.add({ p: 2 }, { priority: 2 })); - mediumPriority.push(queue.add({ p: 3 }, { priority: 3 })); - highPriority.push(queue.add({ p: 1 }, { priority: 1 })); - } - - // wait for all jobs to enter the queue and then start processing - Promise.all([].concat(normalPriority, mediumPriority, highPriority)).then( - () => { - let currentPriority = 1; - let counter = 0; - let total = 0; - - queue.process((job, jobDone) => { - expect(job.id).to.be.ok; - expect(job.data.p).to.be.equal(currentPriority); - jobDone(); - - total++; - if (++counter === numJobsPerPriority) { - currentPriority++; - counter = 0; - - if (currentPriority === 4 && total === numJobsPerPriority * 3) { - done(); - } - } - }); - }, - done, - ); - }); - - it('process several jobs serially', function(done) { - this.timeout(12000); - let counter = 1; - const maxJobs = 35; - - queue.process((job, jobDone) => { - expect(job.data.num).to.be.equal(counter); - expect(job.data.foo).to.be.equal('bar'); - jobDone(); - if (counter === maxJobs) { - done(); - } - counter++; - }); - - for (let i = 1; i <= maxJobs; i++) { - queue.add({ foo: 'bar', num: i }); - } - }); - - it('process a job that updates progress', done => { - queue.process((job, jobDone) => { - expect(job.data.foo).to.be.equal('bar'); - job.progress(42); - jobDone(); - }); - - queue - .add({ foo: 'bar' }) - .then(job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - }) - .catch(() => { - done(); - }); - - queue.on('progress', (job, progress) => { - expect(job).to.be.ok; - expect(progress).to.be.equal(42); - done(); - }); - }); - - it('process a job that returns data in the process handler', done => { - queue.process((job, jobDone) => { - expect(job.data.foo).to.be.equal('bar'); - jobDone(null, 37); - }); - - queue - .add({ foo: 'bar' }) - .then(job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - }) - .catch(() => { - done(); - }); - - queue.on('completed', (job, data) => { - expect(job).to.be.ok; - expect(data).to.be.equal(37); - queue.getJob(job.id).then(job => { - expect(job.returnvalue).to.be.equal(37); - done(); - }); - }); - }); - - it('process a job that returns a string in the process handler', done => { - const testString = 'a very dignified string'; - queue.on('completed', (job /*, data*/) => { - expect(job).to.be.ok; - expect(job.returnvalue).to.be.equal(testString); - setTimeout(() => { - queue - .getJob(job.id) - .then(job => { - expect(job).to.be.ok; - expect(job.returnvalue).to.be.equal(testString); - done(); - }) - .catch(() => { - done(); - }); - }, 100); - }); - - queue.process((/*job*/) => { - return Promise.resolve(testString); - }); - - queue.add({ testing: true }); - }); - - it('process a job that returns data and the returnvalue gets stored in the database', done => { - queue.process((job, jobDone) => { - expect(job.data.foo).to.be.equal('bar'); - jobDone(null, 37); - }); - - queue - .add({ foo: 'bar' }) - .then(job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - }) - .catch(() => { - done(); - }); - - queue.on('completed', (job, data) => { - expect(job).to.be.ok; - expect(data).to.be.equal(37); - queue.getJob(job.id).then(job => { - expect(job.returnvalue).to.be.equal(37); - queue.client - .hget(queue.toKey('' + job.id), 'returnvalue') - .then(retval => { - expect(JSON.parse(retval)).to.be.equal(37); - done(); - }); - }); - }); - }); - - it('process a job that returns a promise', done => { - queue.process(job => { - expect(job.data.foo).to.be.equal('bar'); - return utils.sleep(250).then(() => { - return 'my data'; - }); - }); - - queue - .add({ foo: 'bar' }) - .then(job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - }) - .catch(() => { - done(); - }); - - queue.on('completed', (job, data) => { - expect(job).to.be.ok; - expect(data).to.be.equal('my data'); - done(); - }); - }); - - it('process a job that returns data in a promise', done => { - queue.process(job => { - expect(job.data.foo).to.be.equal('bar'); - return utils.sleep(250, { value: 42 }); - }); - - queue - .add({ foo: 'bar' }) - .then(job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - }) - .catch(() => { - done(); - }); - - queue.on('completed', (job, data) => { - expect(job).to.be.ok; - expect(data).to.be.equal(42); - done(); - }); - }); - - it('process a synchronous job', done => { - queue.process(job => { - expect(job.data.foo).to.be.equal('bar'); - }); - - queue - .add({ foo: 'bar' }) - .then(job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - }) - .catch(() => { - done(); - }); - - queue.on('completed', job => { - expect(job).to.be.ok; - done(); - }); - }); - - // TODO too tricky to fix - // it('process stalled jobs when starting a queue', function(done) { - // this.timeout(6000); - // utils - // .newQueue('test queue stalled', { - // settings: { - // lockDuration: 15, - // lockRenewTime: 5, - // stalledInterval: 100 - // } - // }) - // .then((queueStalled: Queue) => { - // const jobs = [ - // queueStalled.add({ bar: 'baz' }), - // queueStalled.add({ bar1: 'baz1' }), - // queueStalled.add({ bar2: 'baz2' }), - // queueStalled.add({ bar3: 'baz3' }) - // ]; - // Promise.all(jobs).then(() => { - // const afterJobsRunning = function() { - // const stalledCallback = sandbox.spy(); - // return queueStalled - // .close() - // .then(() => { - // return new Promise((resolve, reject) => { - // utils - // .newQueue('test queue stalled', { - // settings: { - // stalledInterval: 100 - // } - // }) - // .then(queue2 => { - // const doneAfterFour = _.after(4, () => { - // try { - // expect(stalledCallback.calledOnce).to.be.equal(true); - // queue.close().then(resolve); - // } catch (e) { - // queue.close().then(() => { - // reject(e); - // }); - // } - // }); - // queue2.on('completed', doneAfterFour); - // queue2.on('stalled', stalledCallback); - // - // queue2.process((job, jobDone2) => { - // jobDone2(); - // }); - // }); - // }); - // }) - // .then(() => { done(); }, () => { done(); }); - // }; - // - // const onceRunning = _.once(afterJobsRunning); - // queueStalled.process(() => { - // onceRunning(); - // return utils.sleep(150); - // }); - // }); - // }); - // }); - - it('processes jobs that were added before the queue backend started', () => { - return utils - .newQueue('test queue added before', { - settings: { - lockRenewTime: 10, - }, - }) - .then(queueStalled => { - const jobs = [ - queueStalled.add({ bar: 'baz' }), - queueStalled.add({ bar1: 'baz1' }), - queueStalled.add({ bar2: 'baz2' }), - queueStalled.add({ bar3: 'baz3' }), - ]; - - return Promise.all(jobs) - .then(queueStalled.close.bind(queueStalled)) - .then(() => { - return utils.newQueue('test queue added before').then(queue2 => { - queue2.process((job, jobDone) => { - jobDone(); - }); - - return new Promise(resolve => { - const resolveAfterAllJobs = _.after(jobs.length, resolve); - queue2.on('completed', resolveAfterAllJobs); - }); - }); - }); - }); - }); - - it('process a named job that returns a promise', done => { - queue.process('myname', job => { - expect(job.data.foo).to.be.equal('bar'); - return utils.sleep(250).then(() => { - return 'my data'; - }); - }); - - queue - .add('myname', { foo: 'bar' }) - .then(job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - }) - .catch(() => { - done(); - }); - - queue.on('completed', (job, data) => { - expect(job).to.be.ok; - expect(data).to.be.equal('my data'); - done(); - }); - }); - - it('process a two named jobs that returns a promise', done => { - queue.process('myname', job => { - expect(job.data.foo).to.be.equal('bar'); - return utils.sleep(250).then(() => { - return 'my data'; - }); - }); - - queue.process('myname2', job => { - expect(job.data.baz).to.be.equal('qux'); - return utils.sleep(250).then(() => { - return 'my data 2'; - }); - }); - - queue - .add('myname', { foo: 'bar' }) - .then(job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - }) - .then(() => { - return queue.add('myname2', { baz: 'qux' }); - }) - .catch(() => { - done(); - }); - - let one: boolean, two: boolean; - queue.on('completed', (job, data) => { - expect(job).to.be.ok; - if (job.data.foo) { - one = true; - expect(data).to.be.equal('my data'); - } - if (job.data.baz) { - two = true; - expect(data).to.be.equal('my data 2'); - } - if (one && two) { - done(); - } - }); - }); - - it('process all named jobs from one process function', done => { - queue.process('*', job => { - expect(job.data).to.be.ok; - return utils.sleep(250).then(() => { - return 'my data'; - }); - }); - - queue.add('job_a', { foo: 'bar' }).then(job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - }); - - queue.add('job_b', { baz: 'qux' }).then(job => { - expect(job.id).to.be.ok; - expect(job.data.baz).to.be.equal('qux'); - }); - - let one: boolean, two: boolean; - queue.on('completed', (job, data) => { - expect(job).to.be.ok; - if (job.data.foo) { - one = true; - expect(data).to.be.equal('my data'); - } - if (job.data.baz) { - two = true; - expect(data).to.be.equal('my data'); - } - if (one && two) { - done(); - } - }); - }); - - it('fails job if missing named process', done => { - queue.process((/*job*/) => { - done(Error('should not process this job')); - }); - - queue.once('failed', (/*err*/) => { - done(); - }); - - queue.add('myname', { foo: 'bar' }).then(job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - }); - }); - - // TODO something is wrong with stalled jobs detection - // it('processes several stalled jobs when starting several queues', function(done) { - // this.timeout(50000); - // - // const NUM_QUEUES = 10; - // const NUM_JOBS_PER_QUEUE = 10; - // const stalledQueues: Queue[] = []; - // const jobs = []; - // const redisOpts = { port: 6379, host: '127.0.0.1' }; - // - // for (let i = 0; i < NUM_QUEUES; i++) { - // const queueStalled2 = new Queue('test queue stalled 2', { - // redis: redisOpts, - // settings: { - // lockDuration: 30, - // lockRenewTime: 10, - // stalledInterval: 100 - // } - // }); - // - // for (let j = 0; j < NUM_JOBS_PER_QUEUE; j++) { - // jobs.push(queueStalled2.add({ job: j })); - // } - // - // stalledQueues.push(queueStalled2); - // } - // - // const closeStalledQueues = function() { - // return Promise.all( - // stalledQueues.map(queue => { - // return queue.close(); - // }) - // ); - // }; - // - // Promise.all(jobs).then(() => { - // let processed = 0; - // const procFn = function(this: Queue) { - // // instead of completing we just close the queue to simulate a crash. - // utils.simulateDisconnect(this); - // processed++; - // if (processed === stalledQueues.length) { - // setTimeout(() => { - // const queue2 = new Queue('test queue stalled 2', { - // redis: redisOpts, - // settings: { stalledInterval: 100 } - // }); - // queue2.on('error', err => { - // done(err); - // }); - // queue2.process((job2, jobDone) => { - // jobDone(); - // }); - // - // let counter = 0; - // queue2.on('completed', () => { - // counter++; - // if (counter === NUM_QUEUES * NUM_JOBS_PER_QUEUE) { - // queue2.close().then(() => { done(); }); - // - // closeStalledQueues().then(() => { - // // This can take long time since queues are disconnected. - // }); - // } - // }); - // }, 100); - // } - // }; - // - // const processes: Promise[] = []; - // stalledQueues.forEach(queue => { - // queue.on('error', (/*err*/) => { - // // - // // Swallow errors produced by the disconnect - // // - // }); - // processes.push(queue.process(procFn)); - // }); - // return Promise.all(processes); - // }); - // }); - - // TODO - // it('does not process a job that is being processed when a new queue starts', function(done) { - // this.timeout(12000); - // let err: any = null; - // let anotherQueue: Queue; - // - // queue.on('completed', () => { - // utils.cleanupQueue(anotherQueue).then(done.bind(null, err)); - // }); - // - // queue.add({ foo: 'bar' }).then(addedJob => { - // queue - // .process((job, jobDone) => { - // expect(job.data.foo).to.be.equal('bar'); - // - // if (addedJob.id !== job.id) { - // err = new Error( - // 'Processed job id does not match that of added job' - // ); - // } - // setTimeout(jobDone, 500); - // }) - // .catch(done); - // - // utils.newQueue().then(_anotherQueue => { - // anotherQueue = _anotherQueue; - // setTimeout(() => { - // anotherQueue.process((job, jobDone) => { - // err = new Error( - // 'The second queue should not have received a job to process' - // ); - // jobDone(); - // }); - // }, 50); - // }); - // }); - // }); - - // TODO - // it('process stalled jobs without requiring a queue restart', function(done) { - // this.timeout(12000); - // - // const queue2 = utils.buildQueue('running-stalled-job-' + uuid(), { - // settings: { - // lockRenewTime: 5000, - // lockDuration: 500, - // stalledInterval: 1000 - // } - // }); - // - // const collect = _.after(2, () => { - // queue2.close().then(() => { done(); }); - // }); - // - // queue2.on('completed', () => { - // const client = new IORedis(); - // client - // .multi() - // .zrem(queue2.toKey('completed'), 1) - // .lpush(queue2.toKey('active'), 1) - // .exec(); - // client.quit(); - // collect(); - // }); - // - // queue2.process((job, jobDone) => { - // expect(job.data.foo).to.be.equal('bar'); - // jobDone(); - // }); - // - // queue2 - // .add({ foo: 'bar' }) - // .then(job => { - // expect(job.id).to.be.ok; - // expect(job.data.foo).to.be.equal('bar'); - // }) - // .catch(done); - // }); - - // TODO - // it('failed stalled jobs that stall more than allowable stalled limit', function(done) { - // const FAILED_MESSAGE = 'job stalled more than allowable limit'; - // this.timeout(10000); - // - // const queue2 = utils.buildQueue('running-stalled-job-' + uuid(), { - // settings: { - // lockRenewTime: 2500, - // lockDuration: 250, - // stalledInterval: 500, - // maxStalledCount: 1 - // } - // }); - // - // let processedCount = 0; - // queue2.process(job => { - // processedCount++; - // expect(job.data.foo).to.be.equal('bar'); - // return utils.sleep(1500); - // }); - // - // queue2.on('completed', () => { - // done(new Error('should not complete')); - // }); - // - // queue2.on('failed', (job, err) => { - // expect(processedCount).to.be.equal(2); - // expect(job['failedReason']).to.be.equal(FAILED_MESSAGE); - // expect(err.message).to.be.equal(FAILED_MESSAGE); - // done(); - // }); - // - // queue2 - // .add({ foo: 'bar' }) - // .then(job => { - // expect(job.id).to.be.ok; - // expect(job.data.foo).to.be.equal('bar'); - // }) - // .catch(done); - // }); - - it('process a job that fails', done => { - const jobError = new Error('Job Failed'); - - queue.process((job, jobDone) => { - expect(job.data.foo).to.be.equal('bar'); - jobDone(jobError); - }); - - queue.add({ foo: 'bar' }).then( - job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - }, - err => { - done(err); - }, - ); - - queue.once('failed', (job, err) => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - expect(err).to.be.equal(jobError); - done(); - }); - }); - - it('process a job that throws an exception', done => { - const jobError = new Error('Job Failed'); - - queue.process(job => { - expect(job.data.foo).to.be.equal('bar'); - throw jobError; - }); - - queue.add({ foo: 'bar' }).then( - job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - }, - err => { - done(err); - }, - ); - - queue.once('failed', (job, err) => { - expect(job).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - expect(err).to.be.equal(jobError); - done(); - }); - }); - - // TODO - // it('process a job that returns data with a circular dependency', done => { - // queue.on('failed', () => { - // done(); - // }); - // queue.on('completed', () => { - // done(Error('Should not complete')); - // }); - // queue.process(() => { - // const circular: any = {}; - // circular.x = circular; - // return Promise.resolve(circular); - // }); - // - // queue.add({ foo: 'bar' }); - // }); - - it('process a job that returns a rejected promise', done => { - const jobError = new Error('Job Failed'); - - queue.process(job => { - expect(job.data.foo).to.be.equal('bar'); - return Promise.reject(jobError); - }); - - queue.add({ foo: 'bar' }).then( - job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - }, - err => { - done(err); - }, - ); - - queue.once('failed', (job, err) => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - expect(err).to.be.equal(jobError); - done(); - }); - }); - - it('process a job that rejects with a nested error', done => { - const cause = new Error('cause'); - const jobError: any = new Error('wrapper'); - jobError.cause = function() { - return cause; - }; - - queue.process(job => { - expect(job.data.foo).to.be.equal('bar'); - return Promise.reject(jobError); - }); - - queue.add({ foo: 'bar' }).then( - job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - }, - err => { - done(err); - }, - ); - - queue.once('failed', (job, err) => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - expect(err).to.be.equal(jobError); - expect(err.cause()).to.be.equal(cause); - done(); - }); - }); - - // Skipped since the test is unstable and difficult to understand. - it.skip('does not renew a job lock after the lock has been released [#397]', function() { - this.timeout(LOCK_RENEW_TIME * 4); - - const processing = queue.process(job => { - expect(job.data.foo).to.be.equal('bar'); - return Promise.resolve(); - }); - - const emit = queue.emit.bind(queue); - queue.emit = function(event: string | symbol, ...args: any[]): boolean { - utils.sleep(LOCK_RENEW_TIME * 2).then(() => { - emit.apply(null, arguments as any); - }); - return true; - }; - - setTimeout(queue.close.bind(queue), LOCK_RENEW_TIME * 2.5); - - return queue - .add({ foo: 'bar' }) - .then(job => { - expect(job.id).to.be.ok; - expect(job.data.foo).to.be.equal('bar'); - }) - .then(() => { - return processing; - }); - }); - - // TODO - // it('retry a job that fails', done => { - // let called = 0; - // let failedOnce = false; - // const notEvenErr = new Error('Not even!'); - // - // const retryQueue = utils.buildQueue('retry-test-queue'); - // - // retryQueue.add({ foo: 'bar' }).then(job => { - // expect(job.id).to.be.ok; - // expect(job.data.foo).to.be.equal('bar'); - // }); - // - // retryQueue.process((job, jobDone) => { - // called++; - // if (called % 2 !== 0) { - // throw notEvenErr; - // } - // jobDone(); - // }); - // - // retryQueue.once('failed', (job, err) => { - // expect(job).to.be.ok; - // expect(job.data.foo).to.be.equal('bar'); - // expect(err).to.be.equal(notEvenErr); - // failedOnce = true; - // retryQueue.retryJob(job); - // }); - // - // retryQueue.once('completed', () => { - // expect(failedOnce).to.be.equal(true); - // retryQueue.close().then(() => { done(); }); - // }); - // }); - - // TODO - // it('retry a job that fails using job retry method', done => { - // let called = 0; - // let failedOnce = false; - // const notEvenErr = new Error('Not even!'); - // - // const retryQueue = utils.buildQueue('retry-test-queue'); - // - // retryQueue.add({ foo: 'bar' }).then(job => { - // expect(job.id).to.be.ok; - // expect(job.data.foo).to.be.equal('bar'); - // }); - // - // retryQueue.process((job, jobDone) => { - // called++; - // if (called % 2 !== 0) { - // throw notEvenErr; - // } - // jobDone(); - // }); - // - // retryQueue.once('failed', (job, err) => { - // expect(job).to.be.ok; - // expect(job.data.foo).to.be.equal('bar'); - // expect(err).to.be.equal(notEvenErr); - // failedOnce = true; - // job.retry().then(() => { - // expect(job.failedReason).to.be.null; - // expect(job.processedOn).to.be.null; - // expect(job.finishedOn).to.be.null; - // - // retryQueue.getJob(job.id).then(updatedJob => { - // expect(updatedJob['failedReason']).to.be.undefined; - // expect(updatedJob.processedOn).to.be.undefined; - // expect(updatedJob.finishedOn).to.be.undefined; - // }); - // }); - // }); - // - // retryQueue.once('completed', () => { - // expect(failedOnce).to.be.equal(true); - // retryQueue.close().then(() => { done(); }); - // }); - // }); - }); - - // TODO queue.empty() not supported - // it('count added, unprocessed jobs', () => { - // const maxJobs = 100; - // const added = []; - // - // const queue = utils.buildQueue(); - // - // for (let i = 1; i <= maxJobs; i++) { - // added.push(queue.add({ foo: 'bar', num: i })); - // } - // - // return Promise.all(added) - // .then(queue.count.bind(queue)) - // .then(count => { - // expect(count).to.be.equal(maxJobs); - // }) - // .then(queue.empty.bind(queue)) - // .then(queue.count.bind(queue)) - // .then(count => { - // expect(count).to.be.equal(0); - // return queue.close(); - // }); - // }); - - describe('Delayed jobs', () => { - const queue: Queue = null; - - beforeEach(() => { - const client = new IORedis(); - return client.flushdb(); - }); - - // TODO - // it('should process a delayed job only after delayed time', done => { - // const delay = 500; - // queue = new Queue('delayed queue simple'); - // const client = new IORedis(6379, '127.0.0.1', {}); - // const timestamp = Date.now(); - // let publishHappened = false; - // client.on('ready', () => { - // client.on('message', (channel, message) => { - // expect(parseInt(message, 10)).to.be.a('number'); - // publishHappened = true; - // }); - // client.subscribe(queue.toKey('delayed')); - // }); - // - // queue.process((job, jobDone) => { - // jobDone(); - // }); - // - // queue.on('completed', () => { - // expect(Date.now() > timestamp + delay); - // queue - // .getWaiting() - // .then(jobs => { - // expect(jobs.length).to.be.equal(0); - // }) - // .then(() => { - // return queue.getDelayed().then(jobs => { - // expect(jobs.length).to.be.equal(0); - // }); - // }) - // .then(() => { - // expect(publishHappened).to.be.equal(true); - // queue.close().then(() => { done(); }, () => { done(); }); - // }); - // }); - // - // queue.isReady().then(() => { - // queue.add({ delayed: 'foobar' }, { delay }).then(job => { - // expect(job.id).to.be.ok; - // expect(job.data.delayed).to.be.equal('foobar'); - // expect(job['delay']).to.be.equal(delay); - // }); - // }); - // }); - - // TODO - // it('should process delayed jobs in correct order', done => { - // let order = 0; - // queue = new Queue('delayed queue multiple'); - // - // queue.on('failed', (job, err) => { - // (err as any).job = job; - // done(err); - // }); - // - // queue.process((job, jobDone) => { - // order++; - // expect(order).to.be.equal(job.data.order); - // jobDone(); - // if (order === 10) { - // queue.close().then(() => { done(); }, () => { done(); }); - // } - // }); - // - // queue.add({ order: 1 }, { delay: 100 }); - // queue.add({ order: 6 }, { delay: 600 }); - // queue.add({ order: 10 }, { delay: 1000 }); - // queue.add({ order: 2 }, { delay: 200 }); - // queue.add({ order: 9 }, { delay: 900 }); - // queue.add({ order: 5 }, { delay: 500 }); - // queue.add({ order: 3 }, { delay: 300 }); - // queue.add({ order: 7 }, { delay: 700 }); - // queue.add({ order: 4 }, { delay: 400 }); - // queue.add({ order: 8 }, { delay: 800 }); - // }); - - // TODO - // it('should process delayed jobs in correct order even in case of restart', function(done) { - // this.timeout(15000); - // - // const QUEUE_NAME = 'delayed queue multiple' + uuid(); - // let order = 1; - // - // queue = new Queue(QUEUE_NAME); - // - // const fn = function(job: Job, jobDone: Function) { - // expect(order).to.be.equal(job.data.order); - // jobDone(); - // - // if (order === 4) { - // queue.close().then(() => { done(); }, () => { done(); }); - // } - // - // order++; - // }; - // - // Promise.all([ - // queue.add({ order: 2 }, { delay: 300 }), - // queue.add({ order: 4 }, { delay: 500 }), - // queue.add({ order: 1 }, { delay: 200 }), - // queue.add({ order: 3 }, { delay: 400 }) - // ]) - // .then(() => { - // // - // // Start processing so that jobs get into the delay set. - // // - // queue.process(fn); - // return utils.sleep(20); - // }) - // .then(() => { - // /* - // //We simulate a restart - // console.log('RESTART'); - // return queue.close().then(function () { - // console.log('CLOSED'); - // return utils.sleep(100).then(function () { - // queue = new Queue(QUEUE_NAME); - // queue.process(fn); - // }); - // }); - // */ - // }); - // }); - - // TODO - // it('should process delayed jobs with exact same timestamps in correct order (FIFO)', done => { - // const QUEUE_NAME = 'delayed queue multiple' + uuid(); - // queue = new Queue(QUEUE_NAME); - // let order = 1; - // - // const fn = function(job: Job, jobDone: Function) { - // expect(order).to.be.equal(job.data.order); - // jobDone(); - // - // if (order === 12) { - // queue.close().then(() => { done(); }, () => { done(); }); - // } - // - // order++; - // }; - // - // queue.isReady().then(() => { - // const now = Date.now(); - // const _promises = []; - // let _i = 1; - // for (_i; _i <= 12; _i++) { - // _promises.push( - // queue.add( - // { order: _i }, - // { - // delay: 1000, - // timestamp: now - // } as JobOptions - // ) - // ); - // } - // Promise.all(_promises).then(() => { - // queue.process(fn); - // }); - // }); - // }); - - // TODO - // it('an unlocked job should not be moved to delayed', done => { - // const queue = new Queue('delayed queue'); - // let job: Job; - // - // queue.process((_job, callback) => { - // // Release the lock to simulate the event loop stalling (so failure to renew the lock). - // job = _job; - // job.releaseLock().then(() => { - // // Once it's failed, it should NOT be moved to delayed since this worker lost the lock. - // callback(new Error('retry this job')); - // }); - // }); - // - // queue.on('error', (/*err*/) => { - // job.isDelayed().then((isDelayed: boolean) => { - // expect(isDelayed).to.be.equal(false); - // queue.close().then(() => { done(); }, () => { done(); }); - // }); - // }); - // - // queue.add({ foo: 'bar' }, { backoff: 1000, attempts: 2 }); - // }); - - // TODO - // it('an unlocked job should not be moved to waiting', done => { - // const queue = new Queue('delayed queue'); - // let job: Job; - // - // queue.process((_job, callback) => { - // job = _job; - // // Release the lock to simulate the event loop stalling (so failure to renew the lock). - // job.releaseLock().then(() => { - // // Once it's failed, it should NOT be moved to waiting since this worker lost the lock. - // callback(new Error('retry this job')); - // }); - // }); - // - // queue.on('error', (/*err*/) => { - // job.isWaiting().then((isWaiting: boolean) => { - // expect(isWaiting).to.be.equal(false); - // queue.close().then(() => { done(); }, () => { done(); }); - // }); - // }); - // - // // Note that backoff:0 should immediately retry the job upon failure (ie put it in 'waiting') - // queue.add({ foo: 'bar' }, { backoff: 0, attempts: 2 }); - // }); - }); - - describe('Concurrency process', () => { - let queue: Queue; - - beforeEach(() => { - const client = new IORedis(); - queue = utils.buildQueue(); - return client.flushdb(); - }); - - afterEach(function() { - this.timeout( - queue['settings'].stalledInterval * - (1 + queue['settings'].maxStalledCount), - ); - return queue.close(); - }); - - it('should run job in sequence if I specify a concurrency of 1', done => { - let processing = false; - - queue.process(1, (job, jobDone) => { - expect(processing).to.be.equal(false); - processing = true; - utils.sleep(50).then(() => { - processing = false; - jobDone(); - }); - }); - - queue.add({}); - queue.add({}); - - queue.on('completed', _.after(2, done.bind(null, null))); - }); - - // TODO - //This job use delay to check that at any time we have 4 process in parallel. - //Due to time to get new jobs and call process, false negative can appear. - // it('should process job respecting the concurrency set', done => { - // let nbProcessing = 0; - // let pendingMessageToProcess = 8; - // let wait = 100; - // - // queue - // .process(4, () => { - // nbProcessing++; - // expect(nbProcessing).to.be.lessThan(5); - // - // wait += 100; - // - // return utils.sleep(wait).then(() => { - // //We should not have 4 more in parallel. - // //At the end, due to empty list, no new job will process, so nbProcessing will decrease. - // expect(nbProcessing).to.be.equal( - // Math.min(pendingMessageToProcess, 4) - // ); - // pendingMessageToProcess--; - // nbProcessing--; - // }); - // }) - // .catch(done); - // - // queue.add(undefined); - // queue.add(undefined); - // queue.add(undefined); - // queue.add(undefined); - // queue.add(undefined); - // queue.add(undefined); - // queue.add(undefined); - // queue.add(undefined); - // - // queue.on('completed', _.after(8, done.bind(null, null))); - // queue.on('failed', done); - // }); - - // TODO - // it('should wait for all concurrent processing in case of pause', done => { - // let i = 0; - // let nbJobFinish = 0; - // - // queue - // .process(3, (job, jobDone) => { - // let error: any = null; - // - // if (++i === 4) { - // queue.pause().then(() => { - // utils.sleep(500).then(() => { - // // Wait for all the active jobs to finalize. - // expect(nbJobFinish).to.be.above(3); - // queue.resume(); - // }); - // }); - // } - // - // // We simulate an error of one processing job. - // // They had a bug in pause() with this special case. - // if (i % 3 === 0) { - // error = new Error(); - // } - // - // //100 - i*20 is to force to finish job n°4 before lower job that will wait longer - // utils.sleep(100 - i * 10).then(() => { - // nbJobFinish++; - // jobDone(error); - // }); - // }) - // .catch(done); - // - // queue.add({}); - // queue.add({}); - // queue.add({}); - // queue.add({}); - // queue.add({}); - // queue.add({}); - // queue.add({}); - // queue.add({}); - // - // const cb = _.after(8, done.bind(null, null)); - // queue.on('completed', cb); - // queue.on('failed', cb); - // queue.on('error', done); - // }); - }); - - describe('Retries and backoffs', () => { - let queue: Queue; - - afterEach(function() { - this.timeout( - queue['settings'].stalledInterval * - (1 + queue['settings'].maxStalledCount), - ); - return queue.close(); - }); - - it('should not retry a job if it has been marked as unrecoverable', done => { - let tries = 0; - queue = utils.buildQueue('test retries and backoffs'); - queue.isReady().then(() => { - queue.process((job, jobDone) => { - tries++; - expect(tries).to.equal(1); - job.discard(); - jobDone(new Error('unrecoverable error')); - }); - - queue.add( - { foo: 'bar' }, - { - attempts: 5, - }, - ); - }); - queue.on('failed', () => { - done(); - }); - }); - - it('should automatically retry a failed job if attempts is bigger than 1', done => { - queue = utils.buildQueue('test retries and backoffs'); - queue.isReady().then(() => { - let tries = 0; - queue.process((job, jobDone) => { - expect(job.attemptsMade).to.be.equal(tries); - tries++; - if (job.attemptsMade < 2) { - throw new Error('Not yet!'); - } - - jobDone(); - }); - - queue.add( - { foo: 'bar' }, - { - attempts: 3, - }, - ); - }); - queue.on('completed', () => { - done(); - }); - }); - - it('should not retry a failed job more than the number of given attempts times', done => { - queue = utils.buildQueue('test retries and backoffs'); - let tries = 0; - queue.isReady().then(() => { - queue.process((job, jobDone) => { - tries++; - if (job.attemptsMade < 3) { - throw new Error('Not yet!'); - } - expect(job.attemptsMade).to.be.equal(tries - 1); - jobDone(); - }); - - queue.add( - { foo: 'bar' }, - { - attempts: 3, - }, - ); - }); - queue.on('completed', () => { - done(new Error('Failed job was retried more than it should be!')); - }); - queue.on('failed', () => { - if (tries === 3) { - done(); - } - }); - }); - - // TODO - // it('should retry a job after a delay if a fixed backoff is given', function(done) { - // this.timeout(12000); - // queue = utils.buildQueue('test retries and backoffs'); - // let start: number; - // queue.isReady().then(() => { - // queue.process((job, jobDone) => { - // if (job.attemptsMade < 2) { - // throw new Error('Not yet!'); - // } - // jobDone(); - // }); - // - // start = Date.now(); - // queue.add( - // { foo: 'bar' }, - // { - // attempts: 3, - // backoff: 1000 - // } - // ); - // }); - // queue.on('completed', () => { - // const elapse = Date.now() - start; - // expect(elapse).to.be.greaterThan(2000); - // done(); - // }); - // }); - - // TODO - // it('should retry a job after a delay if an exponential backoff is given', function(done) { - // this.timeout(12000); - // queue = utils.buildQueue('test retries and backoffs'); - // let start: number; - // queue.isReady().then(() => { - // queue.process((job, jobDone) => { - // if (job.attemptsMade < 2) { - // throw new Error('Not yet!'); - // } - // jobDone(); - // }); - // - // start = Date.now(); - // queue.add( - // { foo: 'bar' }, - // { - // attempts: 3, - // backoff: { - // type: 'exponential', - // delay: 1000 - // } - // } - // ); - // }); - // queue.on('completed', () => { - // const elapse = Date.now() - start; - // const expected = 1000 * (Math.pow(2, 2) - 1); - // expect(elapse).to.be.greaterThan(expected); - // done(); - // }); - // }); - - // TODO - // it('should retry a job after a delay if a custom backoff is given', function(done) { - // this.timeout(12000); - // queue = utils.buildQueue('test retries and backoffs', { - // settings: { - // backoffStrategies: { - // custom(attemptsMade) { - // return attemptsMade * 1000; - // } - // } - // } - // }); - // let start: number; - // queue.isReady().then(() => { - // queue.process((job, jobDone) => { - // if (job.attemptsMade < 2) { - // throw new Error('Not yet!'); - // } - // jobDone(); - // }); - // - // start = Date.now(); - // queue.add( - // { foo: 'bar' }, - // { - // attempts: 3, - // backoff: { - // type: 'custom' - // } - // } - // ); - // }); - // queue.on('completed', () => { - // const elapse = Date.now() - start; - // expect(elapse).to.be.greaterThan(3000); - // done(); - // }); - // }); - - // it('should not retry a job if the custom backoff returns -1', done => { - // queue = utils.buildQueue('test retries and backoffs', { - // settings: { - // backoffStrategies: { - // custom() { - // return -1; - // } - // } - // } - // }); - // let tries = 0; - // queue.process((job, jobDone) => { - // tries++; - // if (job.attemptsMade < 3) { - // throw new Error('Not yet!'); - // } - // jobDone(); - // }); - // - // queue.add( - // { foo: 'bar' }, - // { - // attempts: 3, - // backoff: { - // type: 'custom' - // } - // } - // ); - // queue.on('completed', () => { - // done(new Error('Failed job was retried more than it should be!')); - // }); - // queue.on('failed', () => { - // if (tries === 1) { - // done(); - // } - // }); - // }); - - // TODO - // it('should retry a job after a delay if a custom backoff is given based on the error thrown', function(done) { - // function CustomError() {} - // - // this.timeout(12000); - // queue = utils.buildQueue('test retries and backoffs', { - // settings: { - // backoffStrategies: { - // custom(attemptsMade, err) { - // if (err instanceof CustomError) { - // return 1500; - // } - // return 500; - // } - // } - // } - // }); - // let start: number; - // queue.isReady().then(() => { - // queue.process((job, jobDone) => { - // if (job.attemptsMade < 2) { - // throw (new (CustomError as any)()); - // } - // jobDone(); - // }); - // - // start = Date.now(); - // queue.add( - // { foo: 'bar' }, - // { - // attempts: 3, - // backoff: { - // type: 'custom' - // } - // } - // ); - // }); - // queue.on('completed', () => { - // const elapse = Date.now() - start; - // expect(elapse).to.be.greaterThan(3000); - // done(); - // }); - // }); - - // it('should not retry a job that has been removed', done => { - // queue = utils.buildQueue('retry a removed job'); - // let attempts = 0; - // const failedError = new Error('failed'); - // queue.process((job, jobDone) => { - // if (attempts === 0) { - // attempts++; - // throw failedError; - // } else { - // jobDone(); - // } - // }); - // queue.add({ foo: 'bar' }); - // - // const failedHandler = _.once((job, err) => { - // expect(job.data.foo).to.equal('bar'); - // expect(err).to.equal(failedError); - // expect(job.failedReason).to.equal(failedError.message); - // - // try { - // job - // .retry() - // .then(() => { - // return utils.sleep(100).then(() => { - // return queue.getCompletedCount().then(count => { - // return expect(count).to.equal(1); - // }); - // }); - // }) - // .then(() => { - // return queue.clean(0).then(() => { - // return job.retry().catch(err => { - // expect(err.message).to.equal( - // Queue.ErrorMessages.RETRY_JOB_NOT_EXIST - // ); - // }); - // }); - // }) - // .then(() => { - // return Promise.all([ - // queue.getCompletedCount().then(count => { - // return expect(count).to.equal(0); - // }), - // queue.getFailedCount().then(count => { - // return expect(count).to.equal(0); - // }) - // ]); - // }) - // .then(() => { - // done(); - // }, done); - // } catch (err) { - // console.error(err); - // } - // }); - // - // queue.on('failed', failedHandler); - // }); - - // it('should not retry a job that has been retried already', done => { - // queue = utils.buildQueue('retry already retried job'); - // const failedError = new Error('failed'); - // queue.isReady().then(() => { - // let attempts = 0; - // queue.process((job, jobDone) => { - // if (attempts === 0) { - // attempts++; - // throw failedError; - // } else { - // jobDone(); - // } - // }); - // - // queue.add({ foo: 'bar' }); - // }); - // - // const failedHandler = _.once((job, err) => { - // expect(job.data.foo).to.equal('bar'); - // expect(err).to.equal(failedError); - // - // job - // .retry() - // .then(() => { - // return utils.sleep(100).then(() => { - // return queue.getCompletedCount().then(count => { - // return expect(count).to.equal(1); - // }); - // }); - // }) - // .then(() => { - // return job.retry().catch(err => { - // expect(err.message).to.equal( - // Queue.ErrorMessages.RETRY_JOB_NOT_FAILED - // ); - // }); - // }) - // .then(() => { - // return Promise.all([ - // queue.getCompletedCount().then(count => { - // return expect(count).to.equal(1); - // }), - // queue.getFailedCount().then(count => { - // return expect(count).to.equal(0); - // }) - // ]); - // }) - // .then(() => { - // done(); - // }, done); - // }); - // - // queue.on('failed', failedHandler); - // }); - - // it('should not retry a job that is locked', done => { - // queue = utils.buildQueue('retry a locked job'); - // const addedHandler = _.once(job => { - // expect(job.data.foo).to.equal('bar'); - // - // utils.sleep(100).then(() => { - // job - // .retry() - // .catch(err => { - // expect(err.message).to.equal( - // Queue.ErrorMessages.RETRY_JOB_IS_LOCKED - // ); - // return null; - // }) - // .then(done, done); - // }); - // }); - // - // queue.process((/*job*/) => { - // return utils.sleep(300); - // }); - // queue.add({ foo: 'bar' }).then(addedHandler); - // }); - - // TODO - // it('an unlocked job should not be moved to failed', done => { - // queue = utils.buildQueue('test unlocked failed'); - // - // queue.process((job, callback) => { - // // Release the lock to simulate the event loop stalling (so failure to renew the lock). - // job.releaseLock().then(() => { - // // Once it's failed, it should NOT be moved to failed since this worker lost the lock. - // callback(new Error('retry this job')); - // }); - // }); - // - // queue.on('failed', job => { - // job.isFailed().then(isFailed => { - // expect(isFailed).to.be.equal(false); - // }); - // }); - // - // queue.on('error', (/*err*/) => { - // queue.close().then(() => { done(); }, () => { done(); }); - // }); - // - // // Note that backoff:0 should immediately retry the job upon failure (ie put it in 'waiting') - // queue.add({ foo: 'bar' }, { backoff: 0, attempts: 2 }); - // }); - }); - - describe('Cleaner', () => { - let queue: Queue; - - beforeEach(() => { - queue = utils.buildQueue('cleaner' + uuid()); - }); - - afterEach(function() { - this.timeout( - queue['settings'].stalledInterval * - (1 + queue['settings'].maxStalledCount), - ); - return queue.close(); - }); - - // TODO clean not supported - it('should reject the cleaner with no grace', done => { - queue.clean(undefined).then( - () => { - done(new Error('Promise should not resolve')); - }, - err => { - expect(err).to.be.instanceof(Error); - done(); - }, - ); - }); - - // TODO clean not supported - it('should reject the cleaner an unknown type', done => { - queue.clean(0, 'bad' as any).then( - () => { - done(new Error('Promise should not resolve')); - }, - e => { - expect(e).to.be.instanceof(Error); - done(); - }, - ); - }); - - // TODO clean not supported - // it('should clean an empty queue', done => { - // const testQueue = utils.buildQueue('cleaner' + uuid()); - // testQueue.isReady().then(() => { - // return testQueue.clean(0); - // }); - // testQueue.on('error', err => { - // utils.cleanupQueue(testQueue); - // done(err); - // }); - // testQueue.on('cleaned', (jobs, type) => { - // expect(type).to.be.equal('completed'); - // expect(jobs.length).to.be.equal(0); - // utils.cleanupQueue(testQueue); - // done(); - // }); - // }); - - // TODO clean not supported - it('should clean two jobs from the queue', done => { - queue.add({ some: 'data' }); - queue.add({ some: 'data' }); - queue.process((job, jobDone) => { - jobDone(); - }); - - queue.on( - 'completed', - _.after(2, () => { - queue.clean(0).then(jobs => { - expect(jobs.length).to.be.equal(2); - done(); - }, done); - }), - ); - }); - - // TODO clean not supported - // it('should only remove a job outside of the grace period', done => { - // queue.process((job, jobDone) => { - // jobDone(); - // }); - // queue.add({ some: 'data' }); - // queue.add({ some: 'data' }); - // utils.sleep(200) - // .then(() => { - // queue.add({ some: 'data' }); - // queue.clean(100); - // return null; - // }) - // .then(() => { - // return utils.sleep(100); - // }) - // .then(() => { - // return queue.getCompleted(); - // }) - // .then(jobs => { - // expect(jobs.length).to.be.equal(1); - // return queue.empty(); - // }) - // .then(() => { - // done(); - // }); - // }); - - it('should clean all failed jobs', done => { - queue.add({ some: 'data' }); - queue.add({ some: 'data' }); - queue.process((job, jobDone) => { - jobDone(new Error('It failed')); - }); - utils - .sleep(100) - .then(() => { - return queue.clean(0, 'failed'); - }) - .then(jobs => { - expect(jobs.length).to.be.equal(2); - return queue.count(); - }) - .then(len => { - expect(len).to.be.equal(0); - done(); - }); - }); - - it('should clean all waiting jobs', done => { - queue.add({ some: 'data' }); - queue.add({ some: 'data' }); - utils - .sleep(100) - .then(() => { - return queue.clean(0, 'wait'); - }) - .then(jobs => { - expect(jobs.length).to.be.equal(2); - return queue.count(); - }) - .then(len => { - expect(len).to.be.equal(0); - done(); - }); - }); - - it('should clean all delayed jobs', done => { - queue.add({ some: 'data' }, { delay: 5000 }); - queue.add({ some: 'data' }, { delay: 5000 }); - utils - .sleep(100) - .then(() => { - return queue.clean(0, 'delayed'); - }) - .then(jobs => { - expect(jobs.length).to.be.equal(2); - return queue.count(); - }) - .then(len => { - expect(len).to.be.equal(0); - done(); - }); - }); - - it('should clean the number of jobs requested', done => { - queue.add({ some: 'data' }); - queue.add({ some: 'data' }); - queue.add({ some: 'data' }); - utils - .sleep(100) - .then(() => { - return queue.clean(0, 'wait', 1); - }) - .then(jobs => { - expect(jobs.length).to.be.equal(1); - return queue.count(); - }) - .then(len => { - expect(len).to.be.equal(2); - done(); - }); - }); - - it('should clean a job without a timestamp', done => { - const client = new IORedis(6379, '127.0.0.1', {}); - - queue.add({ some: 'data' }); - queue.add({ some: 'data' }); - queue.process((job, jobDone) => { - jobDone(new Error('It failed')); - }); - - utils - .sleep(100) - .then(() => { - return new Promise(resolve => { - client.hdel( - 'bull:' + queue.name + ':1', - 'timestamp', - resolve as any, - ); - }); - }) - .then(() => { - return queue.clean(0, 'failed'); - }) - .then(jobs => { - expect(jobs.length).to.be.equal(2); - return queue.getFailed(); - }) - .then(failed => { - expect(failed.length).to.be.equal(0); - done(); - }); - }); - }); -}); diff --git a/src/test/compat/utils.ts b/src/test/compat/utils.ts deleted file mode 100644 index 8082b39b63..0000000000 --- a/src/test/compat/utils.ts +++ /dev/null @@ -1,56 +0,0 @@ -import { - Queue3 as Queue, - QueueOptions3 as QueueOptions, -} from '@src/classes/compat'; -import _ from 'lodash'; - -const STD_QUEUE_NAME = 'test queue'; - -let queues: Queue[] = []; - -const originalSetTimeout = setTimeout; - -export function simulateDisconnect(queue: Queue) { - queue.client.disconnect(); - queue.eclient.disconnect(); -} - -export function buildQueue(name?: string, options?: QueueOptions) { - options = _.extend({ redis: { port: 6379, host: '127.0.0.1' } }, options); - const queue = new Queue(name || STD_QUEUE_NAME, options); - queues.push(queue); - return queue; -} - -export function newQueue(name?: string, opts?: QueueOptions) { - const queue = buildQueue(name, opts); - return queue.isReady(); -} - -export function cleanupQueue(queue: Queue) { - return queue.empty().then(queue.close.bind(queue)); -} - -export function cleanupQueues() { - return Promise.all( - queues.map(queue => { - const errHandler = function() {}; - queue.on('error', errHandler); - return queue.close().catch(errHandler); - }), - ).then(() => { - queues = []; - }); -} - -export function sleep(ms: number, retval?: any) { - return new Promise(resolve => { - originalSetTimeout(() => { - if (retval && retval.value) { - resolve(retval.value); - } else { - resolve(); - } - }, ms); - }); -}