Skip to content

Commit

Permalink
feat: check if job state update was successful before running a job
Browse files Browse the repository at this point in the history
this stops processing jobs when they got removed in the meantime from the database
  • Loading branch information
simllll committed Mar 21, 2022
1 parent e6b038c commit 606e141
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 3 deletions.
10 changes: 8 additions & 2 deletions src/Job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,14 @@ export class Job<DATA = unknown | void> {
log('[%s:%s] has failed [%s]', this.attrs.name, this.attrs._id, error.message);
} finally {
this.attrs.lockedAt = undefined;
await this.agenda.db.saveJobState(this);
log('[%s:%s] was saved successfully to MongoDB', this.attrs.name, this.attrs._id);
try {
await this.agenda.db.saveJobState(this);
log('[%s:%s] was saved successfully to MongoDB', this.attrs.name, this.attrs._id);
} catch (err) {
// in case this fails, we ignore it
// this can e.g. happen if the job gets removed during the execution
log('[%s:%s] was not saved to MongoDB', this.attrs.name, this.attrs._id, err);
}

this.agenda.emit('complete', this);
this.agenda.emit(`complete:${this.attrs.name}`, this);
Expand Down
8 changes: 7 additions & 1 deletion src/JobDbRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,18 @@ export class JobDbRepository {

log('[job %s] save job state: \n%O', id, $set);

await this.collection.updateOne(
const result = await this.collection.updateOne(
{ _id: id, name: job.attrs.name },
{
$set
}
);

if (!result.acknowledged || result.matchedCount !== 1) {
throw new Error(
`job ${id} (name: ${job.attrs.name}) cannot be updated in the database, maybe it does not exist anymore?`
);
}
}

/**
Expand Down
40 changes: 40 additions & 0 deletions test/job.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1601,4 +1601,44 @@ describe('Job', () => {

expect(await job.isRunning()).to.be.equal(true);
});

it('should not run job if is has been removed', async () => {
let executed = false;
agenda.define('test', async () => {
executed = true;
});

const job = new Job(agenda, {
name: 'test',
type: 'normal'
});
job.schedule('in 1 second');
await job.save();

await agenda.start();

// wait till it's locked (Picked up by the event processor)
const jobStarted = await agenda.db.getJobs({ name: 'test' });
expect(jobStarted[0].lockedAt).to.not.equal(null);

await job.remove();

let error;
const completed = new Promise<void>(resolve => {
agenda.on('error', err => {
error = err;
resolve();
});
});

await Promise.race([
new Promise(resolve => {
setTimeout(resolve, 1000);
}),
completed
]);

expect(executed).to.be.equal(false);
expect(error?.message).to.includes('(name: test) cannot be updated in the database');
});
});

0 comments on commit 606e141

Please sign in to comment.