Skip to content

Commit

Permalink
fix: simplify unlocking and improve logging
Browse files Browse the repository at this point in the history
  • Loading branch information
simllll committed Oct 14, 2020
1 parent 1aa2d4a commit a70f500
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 52 deletions.
22 changes: 14 additions & 8 deletions src/JobDbRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,15 @@ export class JobDbRepository {
return this.collection.countDocuments({ nextRunAt: { $lt: new Date() } });
}

async unlockJob(job) {
await this.collection.updateOne({ _id: job._id }, { $unset: { lockedAt: true } });
}

/**
* Internal method to unlock jobs so that they can be re-run
*/
async unlockJobs(jobIds: ObjectId[]) {
await this.collection.updateMany({ _id: { $in: jobIds } }, { $set: { lockedAt: null } });
await this.collection.updateMany({ _id: { $in: jobIds } }, { $unset: { lockedAt: true } });
}

async lockJob(job: Job): Promise<IJobParameters | undefined> {
Expand Down Expand Up @@ -146,13 +150,15 @@ export class JobDbRepository {
const collection = this.connectOptions.db?.collection || 'agendaJobs';

this.collection = db.collection(collection);
log(
`connected with collection: ${collection}, collection size: ${
typeof this.collection.estimatedDocumentCount === 'function'
? await this.collection.estimatedDocumentCount()
: '?'
}`
);
if (log.enabled) {
log(
`connected with collection: ${collection}, collection size: ${
typeof this.collection.estimatedDocumentCount === 'function'
? await this.collection.estimatedDocumentCount()
: '?'
}`
);
}

if (this.connectOptions.ensureIndex) {
log('attempting index creation');
Expand Down
30 changes: 21 additions & 9 deletions src/JobProcessingQueue.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// eslint-disable-next-line prettier/prettier
import type { Job } from './Job';
import { IJobDefinition } from './types/JobDefinition';
import { IJobParameters } from './types/JobParameters';
import type { Agenda } from './index';
/**
* @class
* @param {Object} args - Job Options
Expand All @@ -11,11 +11,7 @@ import { IJobParameters } from './types/JobParameters';
export class JobProcessingQueue {
private _queue: Job[];

constructor(
private definitions: {
[name: string]: IJobDefinition;
}
) {
constructor(private agenda: Agenda) {
this._queue = [];
}

Expand All @@ -36,21 +32,37 @@ export class JobProcessingQueue {
* @param {Job} job job to add to queue
* @returns {undefined}
*/
push(job) {
push(job: Job) {
this._queue.push(job);
}

remove(job: Job) {
let removeJobIndex = this._queue.indexOf(job);
if (removeJobIndex === -1) {
// lookup by id
removeJobIndex = this._queue.findIndex(
j => j.attrs._id?.toString() === job.attrs._id?.toString()
);
}
if (removeJobIndex === -1) {
throw new Error(`cannot find job ${job.attrs._id} in processing queue?`);
}

this._queue.splice(removeJobIndex, 1);
}

/**
* Inserts job in queue where it will be order from left to right in decreasing
* order of nextRunAt and priority (in case of same nextRunAt), if all values
* are even the first jobs to be introduced will have priority
* @param {Job} job job to add to queue
* @returns {undefined}
*/
insert(job) {
insert(job: Job) {
const matchIndex = this._queue.findIndex(element => {
if (
element.attrs.nextRunAt &&
job.attrs.nextRunAt &&
element.attrs.nextRunAt.getTime() <= job.attrs.nextRunAt.getTime()
) {
if (element.attrs.nextRunAt.getTime() === job.attrs.nextRunAt.getTime()) {
Expand Down Expand Up @@ -86,7 +98,7 @@ export class JobProcessingQueue {
| undefined;
}): (Job & { attrs: IJobParameters & { nextRunAt: Date } }) | undefined {
const next = ((Object.keys(this._queue) as unknown) as number[]).reverse().find(i => {
const def = this.definitions[this._queue[i].attrs.name];
const def = this.agenda.definitions[this._queue[i].attrs.name];
const status = jobStatus[this._queue[i].attrs.name];

// check if we have a definition
Expand Down
92 changes: 59 additions & 33 deletions src/JobProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ const log = debug('agenda:jobProcessor');

/**
* Process methods for jobs
*
*
* @param {Job} extraJob job to run immediately
* @returns {undefined}
*/
Expand All @@ -28,7 +30,7 @@ export class JobProcessor {
return {
version,
queueName: this.agenda.name,
queueSize: await this.agenda.db.getQueueSize(),
totalQueueSizeDB: await this.agenda.db.getQueueSize(),
config: {
totalLockLimit: this.totalLockLimit,
maxConcurrency: this.maxConcurrency,
Expand All @@ -43,6 +45,7 @@ export class JobProcessor {
}
])
),
queuedJobs: this.jobQueue.length,
runningJobs: !fullDetails ? this.runningJobs.length : this.runningJobs,
lockedJobs: !fullDetails ? this.lockedJobs.length : this.lockedJobs,
jobsToLock: !fullDetails ? this.jobsToLock.length : this.jobsToLock,
Expand All @@ -52,7 +55,7 @@ export class JobProcessor {

private nextScanAt = new Date();

private jobQueue: JobProcessingQueue = new JobProcessingQueue(this.agenda.definitions);
private jobQueue: JobProcessingQueue = new JobProcessingQueue(this.agenda);

private runningJobs: Job[] = [];

Expand All @@ -77,7 +80,7 @@ export class JobProcessor {
}

stop(): Job[] {
log('stop job processor', this.isRunning);
log.extend('stop')('stop job processor', this.isRunning);
this.isRunning = false;

if (this.processInterval) {
Expand All @@ -93,28 +96,29 @@ export class JobProcessor {
// Make sure an interval has actually been set
// Prevents race condition with 'Agenda.stop' and already scheduled run
if (!this.isRunning) {
log('process: JobProcessor got stopped already, returning', this);
log.extend('process')('JobProcessor got stopped already, returning', this);
return;
}

// Determine whether or not we have a direct process call!
if (!extraJob) {
log('starting to process jobs');
log.extend('process')('starting to process jobs');

// Go through each jobName set in 'Agenda.process' and fill the queue with the next jobs
await Promise.all(
Object.keys(this.agenda.definitions).map(async jobName => {
log('queuing up job to process: [%s]', jobName);
log.extend('process')('queuing up job to process: [%s]', jobName);
await this.jobQueueFilling(jobName);
})
);
this.jobProcessing();
} else if (
this.agenda.definitions[extraJob.attrs.name] &&
// If the extraJob would have been processed in an older scan, process the job immediately
extraJob.attrs.nextRunAt &&
extraJob.attrs.nextRunAt < this.nextScanAt
) {
log(
log.extend('process')(
'[%s:%s] job would have ran by nextScanAt, processing the job immediately',
extraJob.attrs.name
);
Expand Down Expand Up @@ -145,7 +149,7 @@ export class JobProcessor {
shouldLock = false;
}

log(
log.extend('shouldLock')(
'job [%s] lock status: shouldLock = %s',
name,
shouldLock,
Expand Down Expand Up @@ -176,13 +180,13 @@ export class JobProcessor {
async lockOnTheFly() {
// Already running this? Return
if (this.isLockingOnTheFly) {
log('lockOnTheFly() already running, returning');
log.extend('lockOnTheFly')('already running, returning');
return;
}

// Don't have any jobs to run? Return
if (this.jobsToLock.length === 0) {
log('no jobs to current lock on the fly, returning');
log.extend('lockOnTheFly')('no jobs to current lock on the fly, returning');
return;
}

Expand All @@ -198,7 +202,7 @@ export class JobProcessor {
// Jobs that were waiting to be locked will be picked up during a
// future locking interval.
if (!this.shouldLock(job.attrs.name)) {
log('lock limit hit for: [%s]', job.attrs.name);
log.extend('lockOnTheFly')('lock limit hit for: [%s]', job.attrs.name);
this.jobsToLock = [];
return;
}
Expand All @@ -217,18 +221,20 @@ export class JobProcessor {

// Before en-queing job make sure we haven't exceed our lock limits
if (!this.shouldLock(jobToEnqueue.attrs.name)) {
log(
log.extend('lockOnTheFly')(
'lock limit reached while job was locked in database. Releasing lock on [%s]',
jobToEnqueue.attrs.name
);
jobToEnqueue.attrs.lockedAt = undefined;
await jobToEnqueue.save();
this.agenda.db.unlockJob(jobToEnqueue);

this.jobsToLock = [];
return;
}

log('found job [%s] that can be locked on the fly', jobToEnqueue.attrs.name);
log.extend('lockOnTheFly')(
'found job [%s] that can be locked on the fly',
jobToEnqueue.attrs.name
);
this.updateStatus(jobToEnqueue.attrs.name, 'locked', +1);
this.lockedJobs.push(jobToEnqueue);
this.enqueueJob(jobToEnqueue);
Expand All @@ -249,13 +255,16 @@ export class JobProcessor {
definition: IJobDefinition
): Promise<Job | undefined> {
const lockDeadline = new Date(Date.now().valueOf() - definition.lockLifetime);
log('findAndLockNextJob(%s, [Function])', jobName);
log.extend('findAndLockNextJob')('findAndLockNextJob(%s, [Function])', jobName);

// Find ONE and ONLY ONE job and set the 'lockedAt' time so that job begins to be processed
const result = await this.agenda.db.getNextJobToRun(jobName, this.nextScanAt, lockDeadline);

if (result) {
log('found a job available to lock, creating a new job on Agenda with id [%s]', result._id);
log.extend('findAndLockNextJob')(
'found a job available to lock, creating a new job on Agenda with id [%s]',
result._id
);
return new Job(this.agenda, result);
}

Expand All @@ -267,10 +276,10 @@ export class JobProcessor {
* @param {String} name fill a queue with specific job name
* @returns {undefined}
*/
async jobQueueFilling(name) {
private async jobQueueFilling(name) {
// Don't lock because of a limit we have set (lockLimit, etc)
if (!this.shouldLock(name)) {
log('lock limit reached in queue filling for [%s]', name);
log.extend('jobQueueFilling')('lock limit reached in queue filling for [%s]', name);
return;
}

Expand All @@ -296,22 +305,28 @@ export class JobProcessor {

// Before en-queing job make sure we haven't exceed our lock limits
if (!this.shouldLock(name)) {
log('lock limit reached before job was returned. Releasing lock on [%s]', name);
job.attrs.lockedAt = undefined;
await job.save(); // this.saveJob(job);
log.extend('jobQueueFilling')(
'lock limit reached before job was returned. Releasing lock on [%s]',
name
);
this.agenda.db.unlockJob(job);
return;
}

log('[%s:%s] job locked while filling queue', name, job.attrs._id);
log.extend('jobQueueFilling')(
'[%s:%s] job locked while filling queue',
name,
job.attrs._id
);
this.updateStatus(name, 'locked', +1);
this.lockedJobs.push(job);

this.enqueueJob(job);
await this.jobQueueFilling(name);
this.jobProcessing();
// this.jobProcessing();
}
} catch (error) {
log('[%s] job lock failed while filling queue', name, error);
log.extend('jobQueueFilling')('[%s] job lock failed while filling queue', name, error);
}
}

Expand All @@ -331,20 +346,24 @@ export class JobProcessor {
const job = this.jobQueue.returnNextConcurrencyFreeJob(this.jobStatus);

if (job) {
log('[%s:%s] there is a job to process', job.attrs.name, job.attrs._id);
log.extend('jobProcessing')(
'[%s:%s] there is a job to process',
job.attrs.name,
job.attrs._id
);

// If the 'nextRunAt' time is older than the current time, run the job
// Otherwise, setTimeout that gets called at the time of 'nextRunAt'
if (job.attrs.nextRunAt <= now) {
log(
log.extend('jobProcessing')(
'[%s:%s] nextRunAt is in the past, run the job immediately',
job.attrs.name,
job.attrs._id
);
this.runOrRetry();
} else {
const runIn = job.attrs.nextRunAt.getTime() - now.getTime();
log(
log.extend('jobProcessing')(
'[%s:%s] nextRunAt is in the future, calling setTimeout(%d)',
job.attrs.name,
job.attrs._id,
Expand All @@ -365,7 +384,10 @@ export class JobProcessor {
if (!this.isRunning) {
// const a = new Error();
// console.log('STACK', a.stack);
log('JobProcessor got stopped already while calling runOrRetry, returning!', this);
log.extend('runOrRetry')(
'JobProcessor got stopped already while calling runOrRetry, returning!',
this
);
return;
}

Expand All @@ -389,7 +411,11 @@ export class JobProcessor {
// Remove from local lock
// NOTE: Shouldn't we update the 'lockedAt' value in MongoDB so it can be picked up on restart?
if (job.attrs.lockedAt && job.attrs.lockedAt < lockDeadline) {
log('[%s:%s] job lock has expired, freeing it up', job.attrs.name, job.attrs._id);
log.extend('runOrRetry')(
'[%s:%s] job lock has expired, freeing it up',
job.attrs.name,
job.attrs._id
);
let lockedJobIndex = this.lockedJobs.indexOf(job);
if (lockedJobIndex === -1) {
// lookup by id
Expand All @@ -412,13 +438,13 @@ export class JobProcessor {
this.updateStatus(job.attrs.name, 'running', 1);

try {
log('[%s:%s] processing job', job.attrs.name, job.attrs._id);
log.extend('runOrRetry')('[%s:%s] processing job', job.attrs.name, job.attrs._id);
// CALL THE ACTUAL METHOD TO PROCESS THE JOB!!!
await job.run();

// Job isn't in running jobs so throw an error
if (!this.runningJobs.includes(job)) {
log(
log.extend('runOrRetry')(
'[%s] callback was called, job must have been marked as complete already',
job.attrs._id
);
Expand Down Expand Up @@ -464,7 +490,7 @@ export class JobProcessor {
setImmediate(() => this.jobProcessing());
} else {
// Run the job immediately by putting it on the top of the queue
log(
log.extend('runOrRetry')(
'[%s:%s] concurrency preventing immediate run, pushing job to top of queue',
job.attrs.name,
job.attrs._id
Expand Down
Loading

0 comments on commit a70f500

Please sign in to comment.