diff --git a/src/classes/compat.ts b/src/classes/compat.ts index c83fa25857..d607899126 100644 --- a/src/classes/compat.ts +++ b/src/classes/compat.ts @@ -39,9 +39,12 @@ import { import _ from 'lodash'; import url from 'url'; -export class Queue3 extends EventEmitter { - static readonly DEFAULT_JOB_NAME = '__default__'; +type CommonOptions = QueueKeeperOptions & + QueueOptions & + WorkerOptions & + QueueEventsOptions; +export class Queue3 extends EventEmitter { /** * The name of the queue */ @@ -61,21 +64,16 @@ export class Queue3 extends EventEmitter { toKey: (type: string) => string; - private opts: QueueOptions3; - private settings: AdvancedSettings3; - private defaultJobOptions: JobOptions3; + private opts: CommonOptions; + private settings: AdvancedOpts; + private defaultJobOptions: JobsOpts; private keyPrefix: string; - private queue: Queue; - private queuePromise: Promise; - private queuePromiseResolve: (value: Queue) => void; - private queuePromiseReject: (reason: any) => void; - private queueScheduler: QueueScheduler; + private readonly queue: Queue; private queueEvents: QueueEvents; private worker: Worker; - private workerPromise: Promise; - private workerPromiseResolve: (value: Worker) => void; - private workerPromiseReject: (reason: any) => void; + private queueScheduler: QueueScheduler; + private readonly handlers: { [key: string]: Function } = {}; /** @@ -84,10 +82,7 @@ export class Queue3 extends EventEmitter { * Everytime the same queue is instantiated it tries to process all the old jobs * that may exist from a previous unfinished session. */ - constructor(queueName: string, opts?: QueueOptions3); - constructor(queueName: string, url: string, opts?: QueueOptions3); - - constructor(queueName: string, arg2?: any, arg3?: any) { + constructor(name: string, opts?: CommonOptions) { super(); Object.defineProperties(this, { @@ -121,7 +116,7 @@ export class Queue3 extends EventEmitter { }, client: { get: () => { - return (this.getQueue() as any).connection.client; + return (this.queue as any).connection.client; }, }, eclient: { @@ -131,7 +126,7 @@ export class Queue3 extends EventEmitter { }, clients: { get: () => { - const clients = [this.getQueue().client]; + const clients = [this.queue.client]; this.queueEvents && clients.push(this.queueEvents.client); this.worker && clients.push(this.worker.client); return clients; @@ -139,85 +134,15 @@ export class Queue3 extends EventEmitter { }, toKey: { get: () => { - return this.getQueue().toKey; - }, - }, - }); - - let opts: QueueOptions3; - - if (_.isString(arg2)) { - opts = _.extend( - {}, - { - redis: Utils.redisOptsFromUrl(arg2), + return this.queue.toKey; }, - arg3, - ); - } else { - opts = arg2 || {}; - } - - opts.settings = opts.settings || {}; - _.defaults(opts.settings, { - lockDuration: 30000, - stalledInterval: 30000, - maxStalledCount: 1, - guardInterval: 5000, - retryProcessDelay: 5000, - drainDelay: 5, - backoffStrategies: {}, - }); - opts.settings.lockRenewTime = - opts.settings.lockRenewTime || opts.settings.lockDuration / 2; - - opts.redis = opts.redis || {}; - _.defaults(opts.redis, { - port: 6379, - host: '127.0.0.1', - retryStrategy: (times: number) => { - return Math.min(Math.exp(times), 20000); }, }); - if (!opts.redis.db && opts.redis.db !== 0) { - opts.redis.db = 0; - if ((opts.redis as any).DB) { - opts.redis.db = (opts.redis as any).DB; - } - } - this.opts = opts; - this.settings = opts.settings; - - if (opts.defaultJobOptions) { - this.defaultJobOptions = opts.defaultJobOptions; - } - - this.name = queueName || ''; - - this.keyPrefix = - (this.opts.redis && this.opts.redis.keyPrefix) || - this.opts.prefix || - 'bull'; - - // - // We cannot use ioredis keyPrefix feature since we - // create keys dynamically in lua scripts. - // - if (this.opts.redis && this.opts.redis.keyPrefix) { - delete this.opts.redis.keyPrefix; - } + this.name = name; - this.queuePromise = new Promise((resolve, reject) => { - this.queuePromiseResolve = resolve; - this.queuePromiseReject = reject; - }).catch(e => this.queue); - - this.workerPromise = new Promise((resolve, reject) => { - this.workerPromiseResolve = resolve; - this.workerPromiseReject = reject; - }).catch(e => this.worker); + this.queue = new Queue(this.name, this.opts); } /** @@ -225,12 +150,10 @@ export class Queue3 extends EventEmitter { * This replaces the `ready` event emitted on Queue in previous verisons. */ async isReady(): Promise { - await this.getQueue().waitUntilReady(); + await this.queue.waitUntilReady(); return this; } - /* tslint:disable:unified-signatures */ - /** * Defines a processing function for the jobs placed into a given Queue. * @@ -249,215 +172,21 @@ export class Queue3 extends EventEmitter { * If the promise is rejected, the error will be passed as a second argument to the "failed" event. * If it is resolved, its value will be the "completed" event's second argument. */ - process(callback: ProcessCallbackFunction3): Promise; - process(callback: ProcessPromiseFunction3): Promise; - process(callback: string): Promise; - - /** - * Defines a processing function for the jobs placed into a given Queue. - * - * The callback is called everytime a job is placed in the queue. - * It is passed an instance of the job as first argument. - * - * If the callback signature contains the second optional done argument, - * the callback will be passed a done callback to be called after the job has been completed. - * The done callback can be called with an Error instance, to signal that the job did not complete successfully, - * or with a result as second argument (e.g.: done(null, result);) when the job is successful. - * Errors will be passed as a second argument to the "failed" event; results, - * as a second argument to the "completed" event. - * - * If, however, the callback signature does not contain the done argument, - * a promise must be returned to signal job completion. - * If the promise is rejected, the error will be passed as a second argument to the "failed" event. - * If it is resolved, its value will be the "completed" event's second argument. - * - * @param concurrency Bull will then call your handler in parallel respecting this maximum value. - */ - process( - concurrency: number, - callback: ProcessCallbackFunction3, - ): Promise; - process( - concurrency: number, - callback: ProcessPromiseFunction3, - ): Promise; - process(concurrency: number, callback: string): Promise; - - /** - * Defines a processing function for the jobs placed into a given Queue. - * - * The callback is called everytime a job is placed in the queue. - * It is passed an instance of the job as first argument. - * - * If the callback signature contains the second optional done argument, - * the callback will be passed a done callback to be called after the job has been completed. - * The done callback can be called with an Error instance, to signal that the job did not complete successfully, - * or with a result as second argument (e.g.: done(null, result);) when the job is successful. - * Errors will be passed as a second argument to the "failed" event; - * results, as a second argument to the "completed" event. - * - * If, however, the callback signature does not contain the done argument, - * a promise must be returned to signal job completion. - * If the promise is rejected, the error will be passed as a second argument to the "failed" event. - * If it is resolved, its value will be the "completed" event's second argument. - * - * @param name Bull will only call the handler if the job name matches - */ - process(name: string, callback: ProcessCallbackFunction3): Promise; - process(name: string, callback: ProcessPromiseFunction3): Promise; - process(name: string, callback: string): Promise; - - /** - * Defines a processing function for the jobs placed into a given Queue. - * - * The callback is called everytime a job is placed in the queue. - * It is passed an instance of the job as first argument. - * - * If the callback signature contains the second optional done argument, - * the callback will be passed a done callback to be called after the job has been completed. - * The done callback can be called with an Error instance, to signal that the job did not complete successfully, - * or with a result as second argument (e.g.: done(null, result);) when the job is successful. - * Errors will be passed as a second argument to the "failed" event; - * results, as a second argument to the "completed" event. - * - * If, however, the callback signature does not contain the done argument, - * a promise must be returned to signal job completion. - * If the promise is rejected, the error will be passed as a second argument to the "failed" event. - * If it is resolved, its value will be the "completed" event's second argument. - * - * @param name Bull will only call the handler if the job name matches - * @param concurrency Bull will then call your handler in parallel respecting this maximum value. - */ - process( - name: string, - concurrency: number, - callback: ProcessCallbackFunction3, - ): Promise; - process( - name: string, - concurrency: number, - callback: ProcessPromiseFunction3, - ): Promise; - process(name: string, concurrency: number, callback: string): Promise; - - process(arg1: any, arg2?: any, arg3?: any): Promise { - let name: string = Queue3.DEFAULT_JOB_NAME; - let concurrency = 1; - let handler: Function; - let handlerFile: string; - - if (arguments.length === 1) { - if (typeof arg1 === 'function') { - handler = arg1; - } else if (typeof arg1 === 'string') { - handlerFile = arg1; - } - } else if (arguments.length === 2) { - if (typeof arg1 === 'number') { - concurrency = arg1 > 0 ? arg1 : 1; - } else if (typeof arg1 === 'string') { - name = arg1; - } - if (typeof arg2 === 'function') { - handler = arg2; - } else if (typeof arg2 === 'string') { - handlerFile = arg2; - } - } else if (arguments.length === 3) { - if (typeof arg1 === 'string') { - name = arg1; - } - if (typeof arg2 === 'number') { - concurrency = arg2 > 0 ? arg2 : 1; - } - if (typeof arg3 === 'function') { - handler = arg3; - } else if (typeof arg3 === 'string') { - handlerFile = arg3; - } - } - - if (!handler && !handlerFile) { - throw new Error('Cannot set an undefined handler'); - } - if (this.handlers[name]) { - throw new Error('Cannot define the same handler twice ' + name); - } - - if (handlerFile && name !== Queue3.DEFAULT_JOB_NAME) { - throw new Error( - 'Named processors are not supported with sandboxed workers', - ); + process(processor: string | Processor): Promise { + if (this.worker) { + throw new Error('Queue3.process() cannot be called twice'); } - this.handlers[name] = handler; - - if (!this.worker) { - const workerOpts = Utils.convertToWorkerOptions(this.opts); - workerOpts.concurrency = concurrency; - if (handlerFile) { - this.worker = new Worker(this.name, handlerFile, workerOpts); - } else { - this.worker = new Worker( - this.name, - Queue3.createProcessor(this), - workerOpts, - ); - } - this.getQueueScheduler(); // create scheduler together with worker - this.workerPromiseResolve(this.worker); - } + this.worker = new Worker(this.name, processor, this.opts); + this.queueScheduler = new QueueScheduler(this.name, this.opts); return this.worker.waitUntilReady(); } - /* tslint:enable:unified-signatures */ - - /** - * Creates a new job and adds it to the queue. - * If the queue is empty the job will be executed directly, - * otherwise it will be placed in the queue and executed as soon as possible. - */ - add(data: T, opts?: JobOptions3): Promise; - - /** - * Creates a new named job and adds it to the queue. - * If the queue is empty the job will be executed directly, - * otherwise it will be placed in the queue and executed as soon as possible. - */ - add(name: string, data: T, opts?: JobOptions3): Promise; - - async add(arg1: any, arg2?: any, arg3?: any): Promise { - let name: string = Queue3.DEFAULT_JOB_NAME; - let data: any; - let opts: JobOptions3 = {}; - - if (typeof arg1 === 'string') { - name = arg1 || Queue3.DEFAULT_JOB_NAME; - data = arg2; - opts = arg3 || {}; - } else { - data = arg1; - opts = arg2 || {}; - } - - opts = _.cloneDeep(opts || {}); - _.defaults(opts, this.defaultJobOptions); - - if (opts.repeat) { - const result = await this.getQueue().repeat.addNextRepeatableJob( - name, - data, - Utils.convertToJobsOpts(opts), - true, - ); - return result; + append(jobName: string, data: any, opts?: JobsOpts): Promise { + if (opts && opts.repeat) { + return this.queue.repeat.addNextRepeatableJob(jobName, data, opts, false); } else { - const result = await this.getQueue().append( - name, - data, - Utils.convertToJobsOpts(opts), - ); - return result; + return this.queue.append(jobName, data, opts); } } @@ -471,12 +200,15 @@ export class Queue3 extends EventEmitter { * * Pausing a queue that is already paused does nothing. */ - async pause(isLocal?: boolean): Promise { - if (isLocal) { - return this.worker && this.worker.pause(true); - } else { - return this.queue && this.queue.pause(); + async pause(): Promise { + return this.queue.pause(); + } + + async pauseWorker(doNotWaitActive?: boolean): Promise { + if (!this.worker) { + throw new Error('Worker is not initialized, call process() first'); } + return this.worker.pause(doNotWaitActive); } /** @@ -489,15 +221,18 @@ export class Queue3 extends EventEmitter { * * Resuming a queue that is not paused does nothing. */ - async resume(isLocal?: boolean): Promise { - if (isLocal) { - return this.worker && this.worker.resume(); - } else { - return this.queue && this.queue.resume(); + async resume(): Promise { + return this.queue.resume(); + } + + async resumeWorker(): Promise { + if (!this.worker) { + throw new Error('Worker is not initialized, call process() first'); } + return this.worker.resume(); } - isPaused(): boolean { + isWorkerPaused(): boolean { return this.worker && this.worker.isPaused(); } @@ -507,14 +242,14 @@ export class Queue3 extends EventEmitter { * this value may be true only for a very small amount of time. */ count(): Promise { - return this.getQueue().count(); + return this.queue.count(); } /** * Empties a queue deleting all the input lists and associated jobs. */ empty(): Promise { - return this.getQueue().drain(true); + return this.queue.drain(); } /** @@ -546,42 +281,42 @@ export class Queue3 extends EventEmitter { * If the specified job cannot be located, the promise callback parameter will be set to null. */ getJob(jobId: string): Promise { - return this.getQueue().getJob(jobId); + return this.queue.getJob(jobId); } /** * Returns a promise that will return an array with the waiting jobs between start and end. */ getWaiting(start = 0, end = -1): Promise> { - return this.getQueue().getWaiting(start, end); + return this.queue.getWaiting(start, end); } /** * Returns a promise that will return an array with the active jobs between start and end. */ getActive(start = 0, end = -1): Promise> { - return this.getQueue().getActive(start, end); + return this.queue.getActive(start, end); } /** * Returns a promise that will return an array with the delayed jobs between start and end. */ getDelayed(start = 0, end = -1): Promise> { - return this.getQueue().getDelayed(start, end); + return this.queue.getDelayed(start, end); } /** * Returns a promise that will return an array with the completed jobs between start and end. */ getCompleted(start = 0, end = -1): Promise> { - return this.getQueue().getCompleted(start, end); + return this.queue.getCompleted(start, end); } /** * Returns a promise that will return an array with the failed jobs between start and end. */ async getFailed(start = 0, end = -1): Promise> { - return this.getQueue().getFailed(start, end); + return this.queue.getFailed(start, end); } /** @@ -593,7 +328,7 @@ export class Queue3 extends EventEmitter { end = -1, asc = false, ): Promise { - return this.getQueue().repeat.getRepeatableJobs(start, end, asc); + return this.queue.repeat.getRepeatableJobs(start, end, asc); } /** @@ -602,58 +337,32 @@ export class Queue3 extends EventEmitter { async nextRepeatableJob( name: string, data: any, - opts: JobOptions3, + opts?: JobsOpts, skipCheckExists?: boolean, ): Promise { - return this.getQueue().repeat.addNextRepeatableJob( - name || Queue3.DEFAULT_JOB_NAME, + return this.queue.repeat.addNextRepeatableJob( + name, data, - Utils.convertToJobsOpts(opts), + opts, skipCheckExists, ); } - /** - * Removes a given repeatable job. The RepeatOptions and JobId needs to be the same as the ones - * used for the job when it was added. - */ - removeRepeatable( - repeat: (CronRepeatOptions3 | EveryRepeatOptions3) & { jobId?: string }, - ): Promise; - /** * Removes a given repeatable job. The RepeatOptions and JobId needs to be the same as the ones * used for the job when it was added. * * name: The name of the to be removed job */ - removeRepeatable( - name: string, - repeat: (CronRepeatOptions3 | EveryRepeatOptions3) & { jobId?: string }, - ): Promise; - - async removeRepeatable(arg1: any, arg2?: any): Promise { - let name: string = Queue3.DEFAULT_JOB_NAME; - let repeat: (CronRepeatOptions3 | EveryRepeatOptions3) & { jobId?: string }; - - if (typeof arg1 === 'string') { - name = arg1; - repeat = arg2; - } else { - repeat = arg1; - } - return this.getQueue().repeat.removeRepeatable( - name, - Utils.convertToRepeatOpts(repeat), - repeat.jobId, - ); + async removeRepeatable(name: string, repeat: RepeatOpts): Promise { + return this.queue.repeat.removeRepeatable(name, repeat, repeat.jobId); } /** * Removes a given repeatable job by key. */ async removeRepeatableByKey(repeatJobKey: string): Promise { - const repeat = this.getQueue().repeat; + const repeat = this.queue.repeat; await repeat.waitUntilReady(); const tokens = repeatJobKey.split(':'); @@ -686,12 +395,11 @@ export class Queue3 extends EventEmitter { end = -1, asc = false, ): Promise> { - return this.getQueue().getJobs(types, start, end, asc); + return this.queue.getJobs(types, start, end, asc); } async getNextJob(): Promise { - await this.getWorker().waitUntilReady(); - return this.worker.getNextJob(); + throw new Error('Not supported'); } /** @@ -703,70 +411,70 @@ export class Queue3 extends EventEmitter { start = 0, end = -1, ): Promise<{ logs: string[]; count: number }> { - return this.getQueue().getJobLogs(jobId, start, end); + return this.queue.getJobLogs(jobId, start, end); } /** * Returns a promise that resolves with the job counts for the given queue. */ getJobCounts(...types: string[]): Promise<{ [index: string]: number }> { - return this.getQueue().getJobCounts(...Utils.parseTypeArg(types)); + return this.queue.getJobCounts(...types); } /** * Returns a promise that resolves with the job counts for the given queue of the given types. */ async getJobCountByTypes(...types: string[]): Promise { - return this.getQueue().getJobCountByTypes(...Utils.parseTypeArg(types)); + return this.queue.getJobCountByTypes(...types); } /** * Returns a promise that resolves with the quantity of completed jobs. */ getCompletedCount(): Promise { - return this.getQueue().getCompletedCount(); + return this.queue.getCompletedCount(); } /** * Returns a promise that resolves with the quantity of failed jobs. */ getFailedCount(): Promise { - return this.getQueue().getFailedCount(); + return this.queue.getFailedCount(); } /** * Returns a promise that resolves with the quantity of delayed jobs. */ getDelayedCount(): Promise { - return this.getQueue().getDelayedCount(); + return this.queue.getDelayedCount(); } /** * Returns a promise that resolves with the quantity of waiting jobs. */ getWaitingCount(): Promise { - return this.getQueue().getWaitingCount(); + return this.queue.getWaitingCount(); } /** * Returns a promise that resolves with the quantity of paused jobs. */ getPausedCount(): Promise { - return this.getQueue().getJobCountByTypes('paused'); + return this.queue.getJobCountByTypes('paused'); } /** * Returns a promise that resolves with the quantity of active jobs. */ getActiveCount(): Promise { - return this.getQueue().getActiveCount(); + return this.queue.getActiveCount(); } /** * Returns a promise that resolves to the quantity of repeatable jobs. */ getRepeatableCount(): Promise { - return this.getQueue().repeat.getRepeatableCount(); + return this.queue.repeat.getRepeatableCount(); } /** @@ -782,7 +490,7 @@ export class Queue3 extends EventEmitter { status: JobStatusClean3 = 'completed', limit = -1, ): Promise> { - return this.getQueue().clean(grace, status, limit); + return this.queue.clean(grace, status, limit); } /** @@ -856,92 +564,29 @@ export class Queue3 extends EventEmitter { on(event: 'drained', callback: EventCallback3): this; // tslint:disable-line unified-signatures on(event: string | symbol, listener: (...args: any[]) => void): this { - return this.registerEventHandler(false, event, listener); + return this.attachListener(false, event, listener); } once(event: string | symbol, listener: (...args: any[]) => void): this { - return this.registerEventHandler(true, event, listener); + return this.attachListener(true, event, listener); } off(event: string | symbol, listener: (...args: any[]) => void): this { - return this.removeListener(event, listener); + return this.detachListener(event, listener); } removeListener( event: string | symbol, listener: (...args: any[]) => void, ): this { - const global = this.queueEvents; - - switch (event) { - case 'active': - this.onWorkerInit(worker => { - worker.removeListener('active', listener); - }); - break; - case 'completed': - this.onWorkerInit(worker => { - worker.removeListener('completed', listener); - }); - break; - case 'drained': - this.onWorkerInit(worker => { - worker.removeListener('drained', listener); - }); - break; - case 'failed': - this.onWorkerInit(worker => { - worker.removeListener('failed', listener); - }); - break; - case 'paused': - this.onWorkerInit(worker => { - worker.removeListener('paused', listener); - }); - this.onQueueInit(queue => { - queue.removeListener('paused', listener); - }); - break; - case 'resumed': - this.onWorkerInit(worker => { - worker.removeListener('resumed', listener); - }); - this.onQueueInit(queue => { - queue.removeListener('resumed', listener); - }); - break; - case 'progress': - this.onWorkerInit(worker => { - worker.removeListener('progress', listener); - }); - this.onQueueInit(queue => { - queue.removeListener('progress', listener); - }); - break; - case 'global:active': - global && global.removeListener('active', listener); - break; - case 'global:completed': - global && global.removeListener('completed', listener); - break; - case 'global:drained': - global && global.removeListener('drained', listener); - break; - case 'global:failed': - global && global.removeListener('failed', listener); - break; - case 'global:paused': - global && global.removeListener('paused', listener); - break; - case 'global:resumed': - global && global.removeListener('resumed', listener); - break; - case 'global:waiting': - global && global.removeListener('waiting', listener); - break; + if (!listener) { + throw new Error('listener is required'); } + return this.detachListener(event, listener); + } - return this; + removeAllListeners(event: string | symbol): this { + return this.detachListener(event); } /** @@ -955,21 +600,21 @@ export class Queue3 extends EventEmitter { * Returns Redis clients array which belongs to current Queue */ getWorkers(): Promise<{ [key: string]: string }[]> { - return this.getQueue().getWorkers(); + return this.queue.getWorkers(); } /** * Returns Queue name in base64 encoded format */ base64Name(): string { - return (this.getQueue() as any).base64Name(); + return (this.queue as any).base64Name(); } /** * Returns Queue name with keyPrefix (default: 'bull') */ clientName(): string { - return (this.getQueue() as any).clientName(); + return (this.queue as any).clientName(); } /** @@ -978,239 +623,109 @@ export class Queue3 extends EventEmitter { * @param list String with all redis clients */ parseClientList(list: string): { [key: string]: string }[] { - return (this.getQueue() as any).parseClientList(list); + return (this.queue as any).parseClientList(list); } retryJob(job: Job): Promise { return job.retry(); } - private getQueueScheduler() { - if (!this.queueScheduler) { - this.queueScheduler = new QueueScheduler( - this.name, - Utils.convertToQueueKeeperOptions(this.opts), - ); - } - return this.queueScheduler; - } - - private getQueue() { - if (!this.queue) { - this.queue = new Queue(this.name, Utils.convertToQueueOptions(this.opts)); - this.queuePromiseResolve(this.queue); - } - return this.queue; - } - - private getWorker() { - if (!this.worker) { - this.worker = new Worker( - this.name, - Queue3.createProcessor(this), - Utils.convertToWorkerOptions(this.opts), - ); - this.workerPromiseResolve(this.worker); - this.getQueueScheduler(); // create scheduler together with worker - } - return this.worker; - } - private getQueueEvents() { if (!this.queueEvents) { - this.queueEvents = new QueueEvents( - this.name, - Utils.convertToQueueEventsOptions(this.opts), - ); + this.queueEvents = new QueueEvents(this.name, this.opts); this.queueEvents.init(); } return this.queueEvents; } - private onQueueInit(cb: (queue: Queue) => void) { - this.queuePromise = this.queuePromise - .then(_ => { - cb(this.queue); - return this.queue; - }) - .catch(_ => { - return this.queue; - }); - } - - private onWorkerInit(cb: (worker: Worker) => void) { - this.workerPromise = this.workerPromise - .then(_ => { - cb(this.worker); - return this.worker; - }) - .catch(_ => { - return this.worker; - }); + private ensureWorkerCreated() { + if (!this.worker) { + throw new Error( + 'You should create internal ' + + 'worker by calling progress() ' + + 'prior to attach listeners to worker events', + ); + } } - private registerEventHandler( + private attachListener( once: boolean, event: string | symbol, listener: (...args: any[]) => void, ): this { switch (event) { case 'active': - console.warn(`jobPromise won't be available on 'active' event handler`); + this.ensureWorkerCreated(); if (once) { - this.onWorkerInit(worker => { - worker.once('active', (job, jobPromise, prev) => { - listener(job, jobPromise, prev); - }); - }); + this.worker.once('active', listener); } else { - this.onWorkerInit(worker => { - worker.on('active', (job, jobPromise, prev) => { - listener(job, jobPromise, prev); - }); - }); + this.worker.on('active', listener); } break; - case 'cleaned': - console.warn(`listening on 'cleaned' event is not supported`); - break; case 'completed': + this.ensureWorkerCreated(); if (once) { - this.onWorkerInit(worker => { - worker.once('completed', (job, returnvalue, prev) => { - listener(job, returnvalue, prev); - }); - }); + this.worker.once('completed', listener); } else { - this.onWorkerInit(worker => { - worker.on('completed', (job, returnvalue, prev) => { - listener(job, returnvalue, prev); - }); - }); + this.worker.on('completed', listener); } break; case 'drained': + this.ensureWorkerCreated(); if (once) { - this.onWorkerInit(worker => { - worker.once('drained', listener); - }); + this.worker.once('drained', listener); } else { - this.onWorkerInit(worker => { - worker.on('drained', listener); - }); + this.worker.on('drained', listener); } break; - case 'error': - console.warn(`listening on 'error' event is not supported`); - break; case 'failed': + this.ensureWorkerCreated(); if (once) { - this.onWorkerInit(worker => { - worker.once('failed', (job, failedReason, prev) => { - listener(job, failedReason, prev); - }); - }); + this.worker.once('failed', listener); } else { - this.onWorkerInit(worker => { - worker.on('failed', (job, failedReason, prev) => { - listener(job, failedReason, prev); - }); - }); + this.worker.on('failed', listener); } break; case 'paused': if (once) { - this.onWorkerInit(worker => { - worker.once('paused', () => { - listener(); - }); - }); - this.getQueue().once('paused', () => { - listener(); - }); + this.queue.once('paused', listener); } else { - this.onWorkerInit(worker => { - worker.on('paused', () => { - listener(); - }); - }); - this.getQueue().on('paused', () => { - listener(); - }); + this.queue.on('paused', listener); } break; case 'resumed': if (once) { - this.onWorkerInit(worker => { - worker.once('resumed', () => { - listener(); - }); - }); - this.getQueue().once('resumed', () => { - listener(); - }); + this.queue.once('resumed', listener); } else { - this.onWorkerInit(worker => { - worker.on('resumed', () => { - listener(); - }); - }); - this.getQueue().on('resumed', () => { - listener(); - }); + this.queue.on('resumed', listener); } break; case 'progress': if (once) { - this.getQueue().once('progress', (job, progress) => { - listener(job, progress); - }); + this.queue.once('progress', listener); } else { - this.getQueue().on('progress', (job, progress) => { - listener(job, progress); - }); + this.queue.on('progress', listener); } break; - case 'stalled': - console.warn(`listening on 'stalled' event is not supported`); - break; case 'waiting': if (once) { - this.getQueue().once('waiting', job => { - listener(job.id, null); - }); + this.queue.once('waiting', listener); } else { - this.getQueue().on('waiting', job => { - listener(job.id, null); - }); + this.queue.on('waiting', listener); } break; case 'global:active': if (once) { - this.getQueueEvents().once('active', ({ jobId, prev }) => { - listener(jobId, prev); - }); + this.getQueueEvents().once('active', listener); } else { - this.getQueueEvents().on('active', ({ jobId, prev }) => { - listener(jobId, prev); - }); + this.getQueueEvents().on('active', listener); } break; case 'global:completed': if (once) { - this.getQueueEvents().once( - 'completed', - ({ jobId, returnvalue, prev }) => { - listener(jobId, returnvalue, prev || 'active'); - }, - ); + this.getQueueEvents().once('completed', listener); } else { - this.getQueueEvents().on( - 'completed', - ({ jobId, returnvalue, prev }) => { - listener(jobId, returnvalue, prev || 'active'); - }, - ); + this.getQueueEvents().on('completed', listener); } break; case 'global:drained': @@ -1222,190 +737,179 @@ export class Queue3 extends EventEmitter { break; case 'global:failed': if (once) { - this.getQueueEvents().once( - 'failed', - ({ jobId, failedReason, prev }) => { - listener(jobId, failedReason, prev || 'active'); - }, - ); + this.getQueueEvents().once('failed', listener); } else { - this.getQueueEvents().on( - 'failed', - ({ jobId, failedReason, prev }) => { - listener(jobId, failedReason, prev || 'active'); - }, - ); + this.getQueueEvents().on('failed', listener); } break; case 'global:paused': if (once) { - this.getQueueEvents().once('paused', () => { - listener(); - }); + this.getQueueEvents().once('paused', listener); } else { - this.getQueueEvents().on('paused', () => { - listener(); - }); + this.getQueueEvents().on('paused', listener); } break; - case 'global:progress': - console.warn(`listening on 'global:progress' event is not supported`); - break; case 'global:resumed': if (once) { - this.getQueueEvents().once('resumed', () => { - listener(); - }); + this.getQueueEvents().once('resumed', listener); } else { - this.getQueueEvents().on('resumed', () => { - listener(); - }); + this.getQueueEvents().on('resumed', listener); } break; - case 'global:stalled': - console.warn(`listening on 'global:stalled' event is not supported`); - break; case 'global:waiting': if (once) { - this.getQueueEvents().once('waiting', ({ jobId }) => { - listener(jobId, null); - }); + this.getQueueEvents().once('waiting', listener); } else { - this.getQueueEvents().on('waiting', ({ jobId }) => { - listener(jobId, null); - }); + this.getQueueEvents().on('waiting', listener); } break; default: - console.warn(`Listening on '${String(event)}' event is not supported`); + throw new Error( + `Listening on '${String(event)}' event is not supported`, + ); } return this; } - private static createProcessor(queue: Queue3): Processor { - return (job: Job): Promise => { - const name = job.name || Queue3.DEFAULT_JOB_NAME; - const handler = queue.handlers[name] || queue.handlers['*']; - if (!handler) { - throw new Error('Missing process handler for job type ' + name); - } - - return new Promise((resolve, reject) => { - if (handler.length > 1) { - const done = (err: any, res: any) => { - if (err) { - reject(err); - } - resolve(res); - }; - handler.apply(null, [job, done]); + detachListener( + event: string | symbol, + listener?: (...args: any[]) => void, + ): this { + switch (event) { + case 'active': + if (this.worker) { + if (listener) { + this.worker.removeListener('active', listener); + } else { + this.worker.removeAllListeners('active'); + } + } + break; + case 'completed': + if (this.worker) { + if (listener) { + this.worker.removeListener('completed', listener); + } else { + this.worker.removeAllListeners('completed'); + } + } + break; + case 'drained': + if (this.worker) { + if (listener) { + this.worker.removeListener('drained', listener); + } else { + this.worker.removeAllListeners('drained'); + } + } + break; + case 'failed': + if (this.worker) { + if (listener) { + this.worker.removeListener('failed', listener); + } else { + this.worker.removeAllListeners('failed'); + } + } + break; + case 'paused': + if (listener) { + this.queue.removeListener('paused', listener); + } else { + this.queue.removeAllListeners('paused'); + } + break; + case 'resumed': + if (listener) { + this.queue.removeListener('resumed', listener); } else { - try { - return resolve(handler.apply(null, [job])); - } catch (err) { - return reject(err); + this.queue.removeAllListeners('resumed'); + } + break; + case 'progress': + if (listener) { + this.queue.removeListener('progress', listener); + } else { + this.queue.removeAllListeners('progress'); + } + break; + case 'waiting': + if (listener) { + this.queue.removeListener('waiting', listener); + } else { + this.queue.removeAllListeners('waiting'); + } + break; + case 'global:active': + if (this.queueEvents) { + if (listener) { + this.queueEvents.removeListener('active', listener); + } else { + this.queueEvents.removeAllListeners('active'); } } - }); - }; + break; + case 'global:completed': + if (this.queueEvents) { + if (listener) { + this.queueEvents.removeListener('completed', listener); + } else { + this.queueEvents.removeAllListeners('completed'); + } + } + break; + case 'global:drained': + if (this.queueEvents) { + if (listener) { + this.queueEvents.removeListener('drained', listener); + } else { + this.queueEvents.removeAllListeners('drained'); + } + } + break; + case 'global:failed': + if (this.queueEvents) { + if (listener) { + this.queueEvents.removeListener('failed', listener); + } else { + this.queueEvents.removeAllListeners('failed'); + } + } + break; + case 'global:paused': + if (this.queueEvents) { + if (listener) { + this.queueEvents.removeListener('paused', listener); + } else { + this.queueEvents.removeAllListeners('paused'); + } + } + break; + case 'global:resumed': + if (this.queueEvents) { + if (listener) { + this.queueEvents.removeListener('resumed', listener); + } else { + this.queueEvents.removeAllListeners('resumed'); + } + } + break; + case 'global:waiting': + if (this.queueEvents) { + if (listener) { + this.queueEvents.removeListener('waiting', listener); + } else { + this.queueEvents.removeAllListeners('waiting'); + } + } + break; + default: + break; + } + return this; } } -export interface RateLimiter3 { - /** Max numbers of jobs processed */ - max: number; - /** Per duration in milliseconds */ - duration: number; - /** When jobs get rate limited, they stay in the waiting queue and are not moved to the delayed queue */ - bounceBack?: boolean; -} - -export interface QueueOptions3 { - /** - * Options passed directly to the `ioredis` constructor - */ - redis?: IORedis.RedisOptions; - - /** - * When specified, the `Queue` will use this function to create new `ioredis` client connections. - * This is useful if you want to re-use connections or connect to a Redis cluster. - */ - createClient?( - type: 'client' | 'subscriber' | 'bclient', - redisOpts?: IORedis.RedisOptions, - ): IORedis.Redis | IORedis.Cluster; - - /** - * Prefix to use for all redis keys - */ - prefix?: string; - - settings?: AdvancedSettings3; - - limiter?: RateLimiter3; - - defaultJobOptions?: JobOptions3; -} - -export interface AdvancedSettings3 { - /** - * Key expiration time for job locks - */ - lockDuration?: number; - - /** - * Interval in milliseconds on which to acquire the job lock. - */ - lockRenewTime?: number; - - /** - * How often check for stalled jobs (use 0 for never checking) - */ - stalledInterval?: number; - - /** - * Max amount of times a stalled job will be re-processed - */ - maxStalledCount?: number; - - /** - * Poll interval for delayed jobs and added jobs - */ - guardInterval?: number; - - /** - * Delay before processing next job in case of internal error - */ - retryProcessDelay?: number; - - /** - * Define a custom backoff strategy - */ - backoffStrategies?: { - [key: string]: (attemptsMade: number, err: Error) => number; - }; - - /** - * A timeout for when the queue is in `drained` state (empty waiting for jobs). - * It is used when calling `queue.getNextJob()`, which will pass it to `.brpoplpush` on the Redis client. - */ - drainDelay?: number; -} - -export type DoneCallback3 = (error?: Error | null, value?: any) => void; - -export type ProcessCallbackFunction3 = ( - job: Job, - done: DoneCallback3, -) => void; -export type ProcessPromiseFunction3 = (job: Job) => Promise; - -export type JobStatus3 = - | 'completed' - | 'waiting' - | 'active' - | 'delayed' - | 'failed'; export type JobStatusClean3 = | 'completed' | 'wait' @@ -1414,130 +918,6 @@ export type JobStatusClean3 = | 'paused' | 'failed'; -export interface BackoffOptions3 { - /** - * Backoff type, which can be either `fixed` or `exponential` - */ - type: string; - - /** - * Backoff delay, in milliseconds - */ - delay?: number; -} - -export interface RepeatOptions3 { - /** - * Timezone - */ - tz?: string; - - /** - * End date when the repeat job should stop repeating - */ - endDate?: Date | string | number; - - /** - * Number of times the job should repeat at max. - */ - limit?: number; -} - -export interface CronRepeatOptions3 extends RepeatOptions3 { - /** - * Cron pattern specifying when the job should execute - */ - cron: string; - - /** - * Start date when the repeat job should start repeating (only with cron). - */ - startDate?: Date | string | number; -} - -export interface EveryRepeatOptions3 extends RepeatOptions3 { - /** - * Repeat every millis (cron setting cannot be used together with this setting.) - */ - every: number; -} - -export interface JobOptions3 { - /** - * Optional priority value. ranges from 1 (highest priority) to MAX_INT (lowest priority). - * Note that using priorities has a slight impact on performance, so do not use it if not required - */ - priority?: number; - - /** - * An amount of miliseconds to wait until this job can be processed. - * Note that for accurate delays, both server and clients should have their clocks synchronized. [optional] - */ - delay?: number; - - /** - * The total number of attempts to try the job until it completes - */ - attempts?: number; - - /** - * Repeat job according to a cron specification - */ - repeat?: CronRepeatOptions3 | EveryRepeatOptions3; - - /** - * Backoff setting for automatic retries if the job fails - */ - backoff?: number | BackoffOptions3; - - /** - * A boolean which, if true, adds the job to the right - * of the queue instead of the left (default false) - */ - lifo?: boolean; - - /** - * The number of milliseconds after which the job should be fail with a timeout error - */ - timeout?: number; - - /** - * Override the job ID - by default, the job ID is a unique - * integer, but you can use this setting to override it. - * If you use this option, it is up to you to ensure the - * jobId is unique. If you attempt to add a job with an id that - * already exists, it will not be added. - */ - jobId?: string; - - /** - * A boolean which, if true, removes the job when it successfully completes. - * When a number, it specifies the amount of jobs to keep. - * Default behavior is to keep the job in the failed set. - */ - removeOnComplete?: boolean | number; - - /** - * A boolean which, if true, removes the job when it fails after all attempts. - * When a number, it specifies the amount of jobs to keep. - * Default behavior is to keep the job in the completed set. - */ - removeOnFail?: boolean | number; - - /** - * Limits the amount of stack trace lines that will be recorded in the stacktrace. - */ - stackTraceLimit?: number; -} - -export interface JobCounts3 { - active: number; - completed: number; - failed: number; - delayed: number; - waiting: number; -} - export interface JobInformation3 { key: string; name: string; @@ -1580,321 +960,3 @@ export type CleanedEventCallback3 = ( export type RemovedEventCallback3 = (job: Job) => void; export type WaitingEventCallback3 = (jobId: string) => void; - -class Utils { - static redisOptsFromUrl(urlString: string) { - const redisOpts: IORedis.RedisOptions = {}; - try { - const redisUrl = url.parse(urlString); - redisOpts.port = parseInt(redisUrl.port) || 6379; - redisOpts.host = redisUrl.hostname; - redisOpts.db = redisUrl.pathname - ? parseInt(redisUrl.pathname.split('/')[1]) - : 0; - if (redisUrl.auth) { - redisOpts.password = redisUrl.auth.split(':')[1]; - } - } catch (e) { - throw new Error(e.message); - } - return redisOpts; - } - - static convertToQueueBaseOptions(source: QueueOptions3): QueueBaseOptions { - if (!source) { - return; - } - - const target: QueueBaseOptions = {}; - - if (source.redis !== undefined) { - const client = new IORedis(source.redis); - target.connection = client; - target.client = client; - } - - if (source.prefix !== undefined) { - target.prefix = source.prefix; - } - - return target; - } - - static convertToQueueOptions(source: QueueOptions3): QueueOptions { - if (!source) { - return; - } - - const target: QueueOptions = Utils.convertToQueueBaseOptions(source); - - if (source.defaultJobOptions) { - target.defaultJobOptions = Utils.convertToJobsOpts( - source.defaultJobOptions, - ); - } - - if (source.createClient) { - target.createClient = Utils.adaptToCreateClient( - source.createClient, - source.redis, - ); - } - - return target; - } - - static convertToQueueEventsOptions( - source: QueueOptions3, - ): QueueEventsOptions { - if (!source) { - return; - } - - const target: QueueEventsOptions = Utils.convertToQueueBaseOptions(source); - - // target.lastEventId = undefined; - // target.blockingTimeout = undefined; - - return target; - } - - static convertToQueueKeeperOptions( - source: QueueOptions3, - ): QueueKeeperOptions { - if (!source) { - return; - } - - const target: QueueKeeperOptions = Utils.convertToQueueBaseOptions(source); - - if (source.settings) { - if (source.settings.maxStalledCount !== undefined) { - target.maxStalledCount = source.settings.maxStalledCount; - } - - if (source.settings.stalledInterval !== undefined) { - target.stalledInterval = source.settings.stalledInterval; - } - } - - return target; - } - - static convertToJobsOpts(source: JobOptions3): JobsOpts { - if (!source) { - return; - } - - const target: JobsOpts = {}; - if ((source as any).timestamp !== undefined) { - target.timestamp = (source as any).timestamp; - } - if (source.priority !== undefined) { - target.priority = source.priority; - } - if (source.delay !== undefined) { - target.delay = source.delay; - } - if (source.attempts !== undefined) { - target.attempts = source.attempts; - } - if (source.repeat !== undefined) { - target.repeat = Utils.convertToRepeatOpts(source.repeat); - } - - if (source.backoff !== undefined) { - if (typeof source.backoff === 'number') { - target.backoff = source.backoff; - } else { - target.backoff = Utils.convertToBackoffOpts(source.backoff); - } - } - - if (source.lifo !== undefined) { - target.lifo = source.lifo; - } - if (source.timeout !== undefined) { - target.timeout = source.timeout; - } - if (source.jobId !== undefined) { - target.jobId = source.jobId; - } - - if (source.removeOnComplete !== undefined) { - target.removeOnComplete = source.removeOnComplete; - } - if (source.removeOnFail !== undefined) { - target.removeOnFail = source.removeOnFail; - } - if (source.stackTraceLimit !== undefined) { - target.stackTraceLimit = source.stackTraceLimit; - } - - return target; - } - - static convertToRepeatOpts( - source: CronRepeatOptions3 | EveryRepeatOptions3, - ): RepeatOpts { - if (!source) { - return; - } - - const target: RepeatOpts = {}; - - if ((source as CronRepeatOptions3).cron !== undefined) { - target.cron = (source as CronRepeatOptions3).cron; - } - if ((source as CronRepeatOptions3).tz !== undefined) { - target.tz = (source as CronRepeatOptions3).tz; - } - if ((source as CronRepeatOptions3).startDate !== undefined) { - target.startDate = (source as CronRepeatOptions3).startDate; - } - if ((source as CronRepeatOptions3).endDate !== undefined) { - target.endDate = (source as CronRepeatOptions3).endDate; - } - if ((source as EveryRepeatOptions3).limit !== undefined) { - target.limit = (source as EveryRepeatOptions3).limit; - } - if ((source as EveryRepeatOptions3).every !== undefined) { - target.every = (source as EveryRepeatOptions3).every; - } - // target.count = undefined; - // target.prevMillis = undefined; - - return target; - } - - static convertToBackoffOpts(source: BackoffOptions3): BackoffOpts { - if (!source) { - return; - } - - const target: BackoffOpts = { type: undefined, delay: undefined }; - - if (source.type !== undefined) { - target.type = source.type; - } - if (source.delay !== undefined) { - target.delay = source.delay; - } - - return target; - } - - static convertToWorkerOptions(source: QueueOptions3): WorkerOptions { - if (!source) { - return; - } - const target: WorkerOptions = Utils.convertToQueueBaseOptions(source); - - // target.concurrency = undefined; - if (source.limiter !== undefined) { - target.limiter = Utils.convertToRateLimiterOpts(source.limiter); - } - // target.skipDelayCheck = undefined; - if (source.settings && source.settings.drainDelay !== undefined) { - target.drainDelay = source.settings.drainDelay; - } - - // target.visibilityWindow = undefined; - if (source.settings) { - target.settings = Utils.convertToAdvancedOpts(source.settings); - } - - return target; - } - - static convertToRateLimiterOpts(source: RateLimiter3): RateLimiterOpts { - if (!source) { - return; - } - - const target: RateLimiterOpts = { max: undefined, duration: undefined }; - - if (source.max !== undefined) { - target.max = source.max; - } - if (source.duration !== undefined) { - target.duration = source.duration; - } - - if (source.bounceBack !== undefined) { - console.warn('bounceBack option is not supported'); - } - - return target; - } - - static convertToAdvancedOpts(source: AdvancedSettings3): AdvancedOpts { - if (!source) { - return; - } - - const target: AdvancedOpts = {}; - - if (source.lockDuration !== undefined) { - target.lockDuration = source.lockDuration; - } - if (source.stalledInterval !== undefined) { - target.stalledInterval = source.stalledInterval; - } - if (source.maxStalledCount !== undefined) { - target.maxStalledCount = source.maxStalledCount; - } - if (source.guardInterval !== undefined) { - target.guardInterval = source.guardInterval; - } - if (source.retryProcessDelay !== undefined) { - target.retryProcessDelay = source.retryProcessDelay; - } - if (source.backoffStrategies !== undefined) { - target.backoffStrategies = source.backoffStrategies; - } - if (source.drainDelay !== undefined) { - target.drainDelay = source.drainDelay; - } - if (source.lockRenewTime !== undefined) { - console.warn('lockRenewTime option is not supported'); - } - - return target; - } - - static adaptToCreateClient( - createClient: ( - type: 'client' | 'subscriber' | 'bclient', - redisOpts?: IORedis.RedisOptions, - ) => IORedis.Redis | IORedis.Cluster, - redis: IORedis.RedisOptions, - ): (type: ClientType) => IORedis.Redis { - if (!createClient) { - return; - } - - return type => { - switch (type) { - case ClientType.blocking: - return createClient('bclient', redis) as IORedis.Redis; - case ClientType.normal: - return createClient('client', redis) as IORedis.Redis; - default: - return undefined; - } - }; - } - - static parseTypeArg(args: string[] | string): string[] { - const types = _.chain([]) - .concat(args) - .join(',') - .split(/\s*,\s*/g) - .compact() - .value(); - - return types.length - ? types - : ['waiting', 'active', 'completed', 'failed', 'delayed', 'paused']; - } -} diff --git a/src/test/test_compat.ts b/src/test/test_compat.ts index 0f2249fe9b..ce3e59b926 100644 --- a/src/test/test_compat.ts +++ b/src/test/test_compat.ts @@ -25,7 +25,7 @@ describe('Compat', function() { beforeEach(async function() { queueName = 'test-' + v4(); - queue = new Queue3(queueName, 'redis://127.0.0.1:6379/0'); + queue = new Queue3(queueName); }); afterEach(async function() { @@ -34,8 +34,8 @@ describe('Compat', function() { }); it('should get waiting jobs', async function() { - await queue.add('test', { foo: 'bar' }); - await queue.add('test', { baz: 'qux' }); + await queue.append('test', { foo: 'bar' }); + await queue.append('test', { baz: 'qux' }); const jobs = await queue.getWaiting(); expect(jobs).to.be.a('array'); @@ -47,8 +47,8 @@ describe('Compat', function() { it('should get paused jobs', async function() { await queue.pause(); await Promise.all([ - queue.add('test', { foo: 'bar' }), - queue.add('test', { baz: 'qux' }), + queue.append('test', { foo: 'bar' }), + queue.append('test', { baz: 'qux' }), ]); const jobs = await queue.getWaiting(); expect(jobs).to.be.a('array'); @@ -69,13 +69,13 @@ describe('Compat', function() { }; }); - await queue.add('test', { foo: 'bar' }); - await queue.process('test', processor); + await queue.append('test', { foo: 'bar' }); + await queue.process(processor); await processing; }); it('should get completed jobs', function(done) { - queue.process('test', async job => {}); + queue.process(async job => {}); let counter = 2; @@ -89,12 +89,12 @@ describe('Compat', function() { } }); - queue.add('test', { foo: 'bar' }); - queue.add('test', { baz: 'qux' }); + queue.append('test', { foo: 'bar' }); + queue.append('test', { baz: 'qux' }); }); it('should get failed jobs', function(done) { - queue.process('test', async job => { + queue.process(async job => { throw new Error('Forced error'); }); @@ -110,12 +110,12 @@ describe('Compat', function() { } }); - queue.add('test', { foo: 'bar' }); - queue.add('test', { baz: 'qux' }); + queue.append('test', { foo: 'bar' }); + queue.append('test', { baz: 'qux' }); }); it('should return all completed jobs when not setting start/end', function(done) { - queue.process('test', async job => {}); + queue.process(async job => {}); queue.on( 'completed', @@ -139,13 +139,13 @@ describe('Compat', function() { }), ); - queue.add('test', { foo: 1 }); - queue.add('test', { foo: 2 }); - queue.add('test', { foo: 3 }); + queue.append('test', { foo: 1 }); + queue.append('test', { foo: 2 }); + queue.append('test', { foo: 3 }); }); it('should return all failed jobs when not setting start/end', function(done) { - queue.process('test', async job => { + queue.process(async job => { throw new Error('error'); }); @@ -172,13 +172,13 @@ describe('Compat', function() { }), ); - queue.add('test', { foo: 1 }); - queue.add('test', { foo: 2 }); - queue.add('test', { foo: 3 }); + queue.append('test', { foo: 1 }); + queue.append('test', { foo: 2 }); + queue.append('test', { foo: 3 }); }); it('should return subset of jobs when setting positive range', function(done) { - queue.process('test', async job => {}); + queue.process(async job => {}); queue.on( 'completed', @@ -201,13 +201,13 @@ describe('Compat', function() { }), ); - queue.add('test', { foo: 1 }); - queue.add('test', { foo: 2 }); - queue.add('test', { foo: 3 }); + queue.append('test', { foo: 1 }); + queue.append('test', { foo: 2 }); + queue.append('test', { foo: 3 }); }); it('should return subset of jobs when setting a negative range', function(done) { - queue.process('test', async job => {}); + queue.process(async job => {}); queue.on( 'completed', @@ -227,13 +227,13 @@ describe('Compat', function() { }), ); - queue.add('test', { foo: 1 }); - queue.add('test', { foo: 2 }); - queue.add('test', { foo: 3 }); + queue.append('test', { foo: 1 }); + queue.append('test', { foo: 2 }); + queue.append('test', { foo: 3 }); }); it('should return subset of jobs when range overflows', function(done) { - queue.process('test', async job => {}); + queue.process(async job => {}); queue.on( 'completed', @@ -253,18 +253,18 @@ describe('Compat', function() { }), ); - queue.add('test', { foo: 1 }); - queue.add('test', { foo: 2 }); - queue.add('test', { foo: 3 }); + queue.append('test', { foo: 1 }); + queue.append('test', { foo: 2 }); + queue.append('test', { foo: 3 }); }); it('should return jobs for multiple types', function(done) { let counter = 0; - queue.process('test', async job => { + queue.process(async job => { counter++; if (counter == 2) { - await queue.add('test', { foo: 3 }); + await queue.append('test', { foo: 3 }); return queue.pause(); } }); @@ -283,8 +283,8 @@ describe('Compat', function() { }), ); - queue.add('test', { foo: 1 }); - queue.add('test', { foo: 2 }); + queue.append('test', { foo: 1 }); + queue.append('test', { foo: 2 }); }); }); @@ -314,7 +314,7 @@ describe('Compat', function() { done(); }); - queue.add('test', { foo: 'bar' }); + queue.append('test', { foo: 'bar' }); }); it('should emit global waiting event when a job has been added', function(done) { @@ -322,11 +322,11 @@ describe('Compat', function() { done(); }); - queue.add('test', { foo: 'bar' }); + queue.append('test', { foo: 'bar' }); }); it('emits drained and global:drained event when all jobs have been processed', function(done) { - queue.process('test', async job => {}); + queue.process(async job => {}); const drainedCallback = after(2, async function() { const jobs = await queue.getJobCountByTypes('completed'); @@ -337,20 +337,20 @@ describe('Compat', function() { queue.once('drained', drainedCallback); queue.once('global:drained', drainedCallback); - queue.add('test', { foo: 'bar' }); - queue.add('test', { foo: 'baz' }); + queue.append('test', { foo: 'bar' }); + queue.append('test', { foo: 'baz' }); }); it('should emit an event when a job becomes active', function(done) { - queue.add('test', {}); + queue.append('test', {}); + + queue.process(async () => {}); queue.once('active', function() { queue.once('completed', async function() { done(); }); }); - - queue.process('test', async () => {}); }); it('should listen to global events', function(done) { @@ -368,8 +368,8 @@ describe('Compat', function() { done(); }); - queue.add('test', {}); - queue.process('test', async () => {}); + queue.append('test', {}); + queue.process(async () => {}); }); }); @@ -408,12 +408,12 @@ describe('Compat', function() { // }; // }); // - // await queue.process('test', process); + // await queue.process(process); // // await queue.pause(); // isPaused = true; - // await queue.add('test', { foo: 'paused' }); - // await queue.add('test', { foo: 'paused' }); + // await queue.append('test', { foo: 'paused' }); + // await queue.append('test', { foo: 'paused' }); // isPaused = false; // await queue.resume(); // @@ -448,10 +448,10 @@ describe('Compat', function() { }; }); - await queue.process('test', process); + await queue.process(process); - queue.add('test', { foo: 'paused' }); - queue.add('test', { foo: 'paused' }); + queue.append('test', { foo: 'paused' }); + queue.append('test', { foo: 'paused' }); queue.on('global:paused', async () => { isPaused = false; @@ -470,7 +470,7 @@ describe('Compat', function() { let process; const processPromise = new Promise(resolve => { process = async (job: Job) => { - expect(queue.isPaused()).to.be.eql(false); + expect(queue.isWorkerPaused()).to.be.eql(false); counter--; if (counter === 0) { await queue.close(); @@ -479,20 +479,20 @@ describe('Compat', function() { }; }); - await queue.process('test', process); - await queue.pause(true); + await queue.process(process); + await queue.pauseWorker(); // Add the worker after the queue is in paused mode since the normal behavior is to pause // it after the current lock expires. This way, we can ensure there isn't a lock already // to test that pausing behavior works. - await queue.add('test', { foo: 'paused' }); - await queue.add('test', { foo: 'paused' }); + await queue.append('test', { foo: 'paused' }); + await queue.append('test', { foo: 'paused' }); expect(counter).to.be.eql(2); - expect(queue.isPaused()).to.be.eql(true); + expect(queue.isWorkerPaused()).to.be.eql(true); - await queue.resume(true); + await queue.resumeWorker(); return processPromise; }); @@ -506,11 +506,11 @@ describe('Compat', function() { // }; // }); // - // await queue.process('test', process); + // await queue.process(process); // // const jobs = []; // for (let i = 0; i < 10; i++) { - // jobs.push(queue.add('test', i)); + // jobs.push(queue.append('test', i)); // } // // // @@ -529,7 +529,7 @@ describe('Compat', function() { // expect(paused).to.be.eql(9); // await Promise.all([active, paused]); // - // await queue.add('test', {}); + // await queue.append('test', {}); // // active = await queue.getJobCountByTypes('active'); // expect(active).to.be.eql(0); @@ -563,10 +563,10 @@ describe('Compat', function() { const worker2 = new Worker(queueName, process2); await worker2.waitUntilReady(); - queue.add('test', 1); - queue.add('test', 2); - queue.add('test', 3); - queue.add('test', 4); + queue.append('test', 1); + queue.append('test', 2); + queue.append('test', 3); + queue.append('test', 4); await Promise.all([startProcessing1, startProcessing2]); await Promise.all([worker1.pause(), worker2.pause()]); @@ -589,12 +589,12 @@ describe('Compat', function() { // }; // }); // - // await queue.process('test', process); + // await queue.process(process); // - // await queue.add('test', 1); + // await queue.append('test', 1); // await startProcessing; // await queue.pause(true); - // await queue.add('test', 2); + // await queue.append('test', 2); // // const count = await queue.getJobCounts('active', 'waiting', 'completed'); // expect(count.active).to.be.eql(0); @@ -603,9 +603,9 @@ describe('Compat', function() { // }); it('pauses fast when queue is drained', async function() { - await queue.process('test', async () => {}); + await queue.process(async () => {}); - await queue.add('test', {}); + await queue.append('test', {}); return new Promise((resolve, reject) => { queue.on('global:drained', async () => {