Skip to content

Commit

Permalink
fix: isRunning for non job processor calls
Browse files Browse the repository at this point in the history
  • Loading branch information
simllll committed Oct 25, 2020
1 parent 413f797 commit a5bb965
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 9 deletions.
17 changes: 10 additions & 7 deletions src/Job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,26 @@ export class Job<DATA = unknown | void> {
args: Partial<IJobParameters<void>> & {
name: string;
type: 'normal' | 'single';
}
},
byJobProcessor?
);
constructor(
agenda: Agenda,
args: Partial<IJobParameters<DATA>> & {
name: string;
type: 'normal' | 'single';
data: DATA;
}
},
byJobProcessor?
);
constructor(
readonly agenda: Agenda,
args: Partial<IJobParameters<DATA>> & {
name: string;
type: 'normal' | 'single';
data: DATA;
}
},
private readonly byJobProcessor = false
) {
// Set attrs to args
this.attrs = {
Expand Down Expand Up @@ -161,8 +164,7 @@ export class Job<DATA = unknown | void> {
}

async isRunning(): Promise<boolean> {
const definition = this.agenda.definitions[this.attrs.name];
if (!definition || !this.agenda.isActiveJobProcessor()) {
if (!this.byJobProcessor) {
// we have no job definition, therfore we are not the job processor, but a client call
// so we get the real state from database
await this.fetchStatus();
Expand Down Expand Up @@ -197,13 +199,14 @@ export class Job<DATA = unknown | void> {
}

async isDead(): Promise<boolean> {
const definition = this.agenda.definitions[this.attrs.name];
if (!definition || !this.agenda.isActiveJobProcessor()) {
if (!this.byJobProcessor) {
// we have no job definition, therfore we are not the job processor, but a client call
// so we get the real state from database
await this.fetchStatus();
}

const definition = this.agenda.definitions[this.attrs.name];

const lockDeadline = new Date(Date.now() - definition.lockLifetime);

// This means a job has "expired", as in it has not been "touched" within the lockoutTime
Expand Down
4 changes: 2 additions & 2 deletions src/JobProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ export class JobProcessor {
);
}

const jobToEnqueue = new Job(this.agenda, resp) as JobWithId;
const jobToEnqueue = new Job(this.agenda, resp, true) as JobWithId;

// Before en-queing job make sure we haven't exceed our lock limits
if (!this.shouldLock(jobToEnqueue.attrs.name)) {
Expand Down Expand Up @@ -291,7 +291,7 @@ export class JobProcessor {
'found a job available to lock, creating a new job on Agenda with id [%s]',
result._id
);
return new Job(this.agenda, result) as JobWithId;
return new Job(this.agenda, result, true) as JobWithId;
}

return undefined;
Expand Down
13 changes: 13 additions & 0 deletions test/job.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1479,4 +1479,17 @@ describe('Job', () => {
});
});
});

it('checks database for running job on "client"', async () => {
agenda.define('test', async () => {
await new Promise(resolve => setTimeout(resolve, 30000));
});

const job = await agenda.now('test');
await agenda.start();

await new Promise(resolve => agenda.on('start:test', resolve));

expect(await job.isRunning()).to.be.equal(true);
});
});

0 comments on commit a5bb965

Please sign in to comment.