diff --git a/src/JobDbRepository.ts b/src/JobDbRepository.ts index 90d9666..3aff5b7 100644 --- a/src/JobDbRepository.ts +++ b/src/JobDbRepository.ts @@ -61,11 +61,15 @@ export class JobDbRepository { return this.collection.countDocuments({ nextRunAt: { $lt: new Date() } }); } + async unlockJob(job) { + await this.collection.updateOne({ _id: job._id }, { $unset: { lockedAt: true } }); + } + /** * Internal method to unlock jobs so that they can be re-run */ async unlockJobs(jobIds: ObjectId[]) { - await this.collection.updateMany({ _id: { $in: jobIds } }, { $set: { lockedAt: null } }); + await this.collection.updateMany({ _id: { $in: jobIds } }, { $unset: { lockedAt: true } }); } async lockJob(job: Job): Promise { @@ -146,13 +150,15 @@ export class JobDbRepository { const collection = this.connectOptions.db?.collection || 'agendaJobs'; this.collection = db.collection(collection); - log( - `connected with collection: ${collection}, collection size: ${ - typeof this.collection.estimatedDocumentCount === 'function' - ? await this.collection.estimatedDocumentCount() - : '?' - }` - ); + if (log.enabled) { + log( + `connected with collection: ${collection}, collection size: ${ + typeof this.collection.estimatedDocumentCount === 'function' + ? await this.collection.estimatedDocumentCount() + : '?' + }` + ); + } if (this.connectOptions.ensureIndex) { log('attempting index creation'); diff --git a/src/JobProcessingQueue.ts b/src/JobProcessingQueue.ts index 710dca5..bfa0a6d 100644 --- a/src/JobProcessingQueue.ts +++ b/src/JobProcessingQueue.ts @@ -1,7 +1,7 @@ // eslint-disable-next-line prettier/prettier import type { Job } from './Job'; -import { IJobDefinition } from './types/JobDefinition'; import { IJobParameters } from './types/JobParameters'; +import type { Agenda } from './index'; /** * @class * @param {Object} args - Job Options @@ -11,11 +11,7 @@ import { IJobParameters } from './types/JobParameters'; export class JobProcessingQueue { private _queue: Job[]; - constructor( - private definitions: { - [name: string]: IJobDefinition; - } - ) { + constructor(private agenda: Agenda) { this._queue = []; } @@ -36,10 +32,25 @@ export class JobProcessingQueue { * @param {Job} job job to add to queue * @returns {undefined} */ - push(job) { + push(job: Job) { this._queue.push(job); } + remove(job: Job) { + let removeJobIndex = this._queue.indexOf(job); + if (removeJobIndex === -1) { + // lookup by id + removeJobIndex = this._queue.findIndex( + j => j.attrs._id?.toString() === job.attrs._id?.toString() + ); + } + if (removeJobIndex === -1) { + throw new Error(`cannot find job ${job.attrs._id} in processing queue?`); + } + + this._queue.splice(removeJobIndex, 1); + } + /** * Inserts job in queue where it will be order from left to right in decreasing * order of nextRunAt and priority (in case of same nextRunAt), if all values @@ -47,10 +58,11 @@ export class JobProcessingQueue { * @param {Job} job job to add to queue * @returns {undefined} */ - insert(job) { + insert(job: Job) { const matchIndex = this._queue.findIndex(element => { if ( element.attrs.nextRunAt && + job.attrs.nextRunAt && element.attrs.nextRunAt.getTime() <= job.attrs.nextRunAt.getTime() ) { if (element.attrs.nextRunAt.getTime() === job.attrs.nextRunAt.getTime()) { @@ -86,7 +98,7 @@ export class JobProcessingQueue { | undefined; }): (Job & { attrs: IJobParameters & { nextRunAt: Date } }) | undefined { const next = ((Object.keys(this._queue) as unknown) as number[]).reverse().find(i => { - const def = this.definitions[this._queue[i].attrs.name]; + const def = this.agenda.definitions[this._queue[i].attrs.name]; const status = jobStatus[this._queue[i].attrs.name]; // check if we have a definition diff --git a/src/JobProcessor.ts b/src/JobProcessor.ts index 248f78a..5c79567 100644 --- a/src/JobProcessor.ts +++ b/src/JobProcessor.ts @@ -8,6 +8,8 @@ const log = debug('agenda:jobProcessor'); /** * Process methods for jobs + * + * * @param {Job} extraJob job to run immediately * @returns {undefined} */ @@ -28,7 +30,7 @@ export class JobProcessor { return { version, queueName: this.agenda.name, - queueSize: await this.agenda.db.getQueueSize(), + totalQueueSizeDB: await this.agenda.db.getQueueSize(), config: { totalLockLimit: this.totalLockLimit, maxConcurrency: this.maxConcurrency, @@ -43,6 +45,7 @@ export class JobProcessor { } ]) ), + queuedJobs: this.jobQueue.length, runningJobs: !fullDetails ? this.runningJobs.length : this.runningJobs, lockedJobs: !fullDetails ? this.lockedJobs.length : this.lockedJobs, jobsToLock: !fullDetails ? this.jobsToLock.length : this.jobsToLock, @@ -52,7 +55,7 @@ export class JobProcessor { private nextScanAt = new Date(); - private jobQueue: JobProcessingQueue = new JobProcessingQueue(this.agenda.definitions); + private jobQueue: JobProcessingQueue = new JobProcessingQueue(this.agenda); private runningJobs: Job[] = []; @@ -77,7 +80,7 @@ export class JobProcessor { } stop(): Job[] { - log('stop job processor', this.isRunning); + log.extend('stop')('stop job processor', this.isRunning); this.isRunning = false; if (this.processInterval) { @@ -93,28 +96,29 @@ export class JobProcessor { // Make sure an interval has actually been set // Prevents race condition with 'Agenda.stop' and already scheduled run if (!this.isRunning) { - log('process: JobProcessor got stopped already, returning', this); + log.extend('process')('JobProcessor got stopped already, returning', this); return; } // Determine whether or not we have a direct process call! if (!extraJob) { - log('starting to process jobs'); + log.extend('process')('starting to process jobs'); // Go through each jobName set in 'Agenda.process' and fill the queue with the next jobs await Promise.all( Object.keys(this.agenda.definitions).map(async jobName => { - log('queuing up job to process: [%s]', jobName); + log.extend('process')('queuing up job to process: [%s]', jobName); await this.jobQueueFilling(jobName); }) ); + this.jobProcessing(); } else if ( this.agenda.definitions[extraJob.attrs.name] && // If the extraJob would have been processed in an older scan, process the job immediately extraJob.attrs.nextRunAt && extraJob.attrs.nextRunAt < this.nextScanAt ) { - log( + log.extend('process')( '[%s:%s] job would have ran by nextScanAt, processing the job immediately', extraJob.attrs.name ); @@ -145,7 +149,7 @@ export class JobProcessor { shouldLock = false; } - log( + log.extend('shouldLock')( 'job [%s] lock status: shouldLock = %s', name, shouldLock, @@ -176,13 +180,13 @@ export class JobProcessor { async lockOnTheFly() { // Already running this? Return if (this.isLockingOnTheFly) { - log('lockOnTheFly() already running, returning'); + log.extend('lockOnTheFly')('already running, returning'); return; } // Don't have any jobs to run? Return if (this.jobsToLock.length === 0) { - log('no jobs to current lock on the fly, returning'); + log.extend('lockOnTheFly')('no jobs to current lock on the fly, returning'); return; } @@ -198,7 +202,7 @@ export class JobProcessor { // Jobs that were waiting to be locked will be picked up during a // future locking interval. if (!this.shouldLock(job.attrs.name)) { - log('lock limit hit for: [%s]', job.attrs.name); + log.extend('lockOnTheFly')('lock limit hit for: [%s]', job.attrs.name); this.jobsToLock = []; return; } @@ -217,18 +221,20 @@ export class JobProcessor { // Before en-queing job make sure we haven't exceed our lock limits if (!this.shouldLock(jobToEnqueue.attrs.name)) { - log( + log.extend('lockOnTheFly')( 'lock limit reached while job was locked in database. Releasing lock on [%s]', jobToEnqueue.attrs.name ); - jobToEnqueue.attrs.lockedAt = undefined; - await jobToEnqueue.save(); + this.agenda.db.unlockJob(jobToEnqueue); this.jobsToLock = []; return; } - log('found job [%s] that can be locked on the fly', jobToEnqueue.attrs.name); + log.extend('lockOnTheFly')( + 'found job [%s] that can be locked on the fly', + jobToEnqueue.attrs.name + ); this.updateStatus(jobToEnqueue.attrs.name, 'locked', +1); this.lockedJobs.push(jobToEnqueue); this.enqueueJob(jobToEnqueue); @@ -249,13 +255,16 @@ export class JobProcessor { definition: IJobDefinition ): Promise { const lockDeadline = new Date(Date.now().valueOf() - definition.lockLifetime); - log('findAndLockNextJob(%s, [Function])', jobName); + log.extend('findAndLockNextJob')('findAndLockNextJob(%s, [Function])', jobName); // Find ONE and ONLY ONE job and set the 'lockedAt' time so that job begins to be processed const result = await this.agenda.db.getNextJobToRun(jobName, this.nextScanAt, lockDeadline); if (result) { - log('found a job available to lock, creating a new job on Agenda with id [%s]', result._id); + log.extend('findAndLockNextJob')( + 'found a job available to lock, creating a new job on Agenda with id [%s]', + result._id + ); return new Job(this.agenda, result); } @@ -267,10 +276,10 @@ export class JobProcessor { * @param {String} name fill a queue with specific job name * @returns {undefined} */ - async jobQueueFilling(name) { + private async jobQueueFilling(name) { // Don't lock because of a limit we have set (lockLimit, etc) if (!this.shouldLock(name)) { - log('lock limit reached in queue filling for [%s]', name); + log.extend('jobQueueFilling')('lock limit reached in queue filling for [%s]', name); return; } @@ -296,22 +305,28 @@ export class JobProcessor { // Before en-queing job make sure we haven't exceed our lock limits if (!this.shouldLock(name)) { - log('lock limit reached before job was returned. Releasing lock on [%s]', name); - job.attrs.lockedAt = undefined; - await job.save(); // this.saveJob(job); + log.extend('jobQueueFilling')( + 'lock limit reached before job was returned. Releasing lock on [%s]', + name + ); + this.agenda.db.unlockJob(job); return; } - log('[%s:%s] job locked while filling queue', name, job.attrs._id); + log.extend('jobQueueFilling')( + '[%s:%s] job locked while filling queue', + name, + job.attrs._id + ); this.updateStatus(name, 'locked', +1); this.lockedJobs.push(job); this.enqueueJob(job); await this.jobQueueFilling(name); - this.jobProcessing(); + // this.jobProcessing(); } } catch (error) { - log('[%s] job lock failed while filling queue', name, error); + log.extend('jobQueueFilling')('[%s] job lock failed while filling queue', name, error); } } @@ -331,12 +346,16 @@ export class JobProcessor { const job = this.jobQueue.returnNextConcurrencyFreeJob(this.jobStatus); if (job) { - log('[%s:%s] there is a job to process', job.attrs.name, job.attrs._id); + log.extend('jobProcessing')( + '[%s:%s] there is a job to process', + job.attrs.name, + job.attrs._id + ); // If the 'nextRunAt' time is older than the current time, run the job // Otherwise, setTimeout that gets called at the time of 'nextRunAt' if (job.attrs.nextRunAt <= now) { - log( + log.extend('jobProcessing')( '[%s:%s] nextRunAt is in the past, run the job immediately', job.attrs.name, job.attrs._id @@ -344,7 +363,7 @@ export class JobProcessor { this.runOrRetry(); } else { const runIn = job.attrs.nextRunAt.getTime() - now.getTime(); - log( + log.extend('jobProcessing')( '[%s:%s] nextRunAt is in the future, calling setTimeout(%d)', job.attrs.name, job.attrs._id, @@ -365,7 +384,10 @@ export class JobProcessor { if (!this.isRunning) { // const a = new Error(); // console.log('STACK', a.stack); - log('JobProcessor got stopped already while calling runOrRetry, returning!', this); + log.extend('runOrRetry')( + 'JobProcessor got stopped already while calling runOrRetry, returning!', + this + ); return; } @@ -389,7 +411,11 @@ export class JobProcessor { // Remove from local lock // NOTE: Shouldn't we update the 'lockedAt' value in MongoDB so it can be picked up on restart? if (job.attrs.lockedAt && job.attrs.lockedAt < lockDeadline) { - log('[%s:%s] job lock has expired, freeing it up', job.attrs.name, job.attrs._id); + log.extend('runOrRetry')( + '[%s:%s] job lock has expired, freeing it up', + job.attrs.name, + job.attrs._id + ); let lockedJobIndex = this.lockedJobs.indexOf(job); if (lockedJobIndex === -1) { // lookup by id @@ -412,13 +438,13 @@ export class JobProcessor { this.updateStatus(job.attrs.name, 'running', 1); try { - log('[%s:%s] processing job', job.attrs.name, job.attrs._id); + log.extend('runOrRetry')('[%s:%s] processing job', job.attrs.name, job.attrs._id); // CALL THE ACTUAL METHOD TO PROCESS THE JOB!!! await job.run(); // Job isn't in running jobs so throw an error if (!this.runningJobs.includes(job)) { - log( + log.extend('runOrRetry')( '[%s] callback was called, job must have been marked as complete already', job.attrs._id ); @@ -464,7 +490,7 @@ export class JobProcessor { setImmediate(() => this.jobProcessing()); } else { // Run the job immediately by putting it on the top of the queue - log( + log.extend('runOrRetry')( '[%s:%s] concurrency preventing immediate run, pushing job to top of queue', job.attrs.name, job.attrs._id diff --git a/src/index.ts b/src/index.ts index 62ec5f3..5e07d6d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -74,7 +74,7 @@ export class Agenda extends EventEmitter { config: { name?: string; defaultConcurrency?: number; - processEvery?: string; + processEvery?: string | number; maxConcurrency?: number; defaultLockLimit?: number; lockLimit?: number; @@ -393,7 +393,7 @@ export class Agenda extends EventEmitter { await this.jobProcessor.process(); - this.on('processJob', this.jobProcessor.process.bind(this.jobProcessor)); + this.on('processJob', job => this.jobProcessor?.process(job)); } async stop(): Promise {