Skip to content

Commit

Permalink
Merge branch 'development-4.0' into feat-bull3-compat-api-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
stansv committed Aug 21, 2019
2 parents 7791cda + 9b78d4b commit 70e00ec
Show file tree
Hide file tree
Showing 7 changed files with 257 additions and 271 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,6 @@
"get-port": "^5.0.0",
"ioredis": "^4.3.0",
"node-uuid": "^1.4.8",
"semver": "^5.6.0"
"semver": "^6.3.0"
}
}
3 changes: 1 addition & 2 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,10 @@ export class Job {
name: string,
data: any,
opts?: JobsOpts,
jobId?: string,
) {
await queue.waitUntilReady();

const job = new Job(queue, name, data, opts, jobId);
const job = new Job(queue, name, data, opts, opts && opts.jobId);

job.id = await job.addJob(queue.client);

Expand Down
35 changes: 13 additions & 22 deletions src/classes/repeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ export class Repeat extends QueueBase {
opts: JobsOpts,
skipCheckExists?: boolean,
) {
let jobId;

await this.waitUntilReady();

const repeatOpts = { ...opts.repeat };
Expand All @@ -36,9 +34,7 @@ export class Repeat extends QueueBase {
const nextMillis = getNextMillis(now, repeatOpts);

if (nextMillis) {
jobId = opts.jobId ? opts.jobId + ':' : ':';

const repeatJobKey = getRepeatKey(name, repeatOpts, jobId);
const repeatJobKey = getRepeatKey(name, repeatOpts);

let repeatableExists = true;

Expand All @@ -57,7 +53,6 @@ export class Repeat extends QueueBase {
name,
nextMillis,
repeatJobKey,
jobId,
{ ...opts, repeat: repeatOpts },
data,
currentCount,
Expand All @@ -70,20 +65,20 @@ export class Repeat extends QueueBase {
name: string,
nextMillis: number,
repeatJobKey: string,
jobId: string,
opts: JobsOpts,
data: any,
currentCount: number,
) {
//
// Generate unique job id for this iteration.
//
const customId = getRepeatJobId(name, jobId, nextMillis, md5(repeatJobKey));
const jobId = getRepeatJobId(name, nextMillis, md5(repeatJobKey));
const now = Date.now();
const delay = nextMillis - now;

const mergedOpts = {
...opts,
jobId,
delay: delay < 0 ? 0 : delay,
timestamp: now,
prevMillis: nextMillis,
Expand All @@ -97,15 +92,14 @@ export class Repeat extends QueueBase {
repeatJobKey,
);

return Job.create(this, name, data, mergedOpts, customId);
return Job.create(this, name, data, mergedOpts);
}

async removeRepeatable(name: string, repeat: RepeatOpts, jobId?: string) {
await this.waitUntilReady();

jobId = jobId ? jobId + ':' : ':';
const repeatJobKey = getRepeatKey(name, repeat, jobId);
const repeatJobId = getRepeatJobId(name, jobId, '', md5(repeatJobKey));
const repeatJobKey = getRepeatKey(name, repeat);
const repeatJobId = getRepeatJobId(name, '', md5(repeatJobKey));
const queueKey = this.keys[''];

return (<any>this.client).removeRepeatable(
Expand Down Expand Up @@ -136,7 +130,7 @@ export class Repeat extends QueueBase {
const data = key.split(':');

return {
key: key,
key,
name: data[0],
id: data[1] || null,
endDate: parseInt(data[2]) || null,
Expand Down Expand Up @@ -177,21 +171,18 @@ export class Repeat extends QueueBase {

function getRepeatJobId(
name: string,
jobId: string,
nextMillis: number | string,
namespace: string,
) {
return 'repeat:' + md5(name + jobId + namespace) + ':' + nextMillis;
return `repeat:${name}:${namespace}:${nextMillis}`;
}

function getRepeatKey(name: string, repeat: RepeatOpts, jobId: string) {
const endDate = repeat.endDate
? new Date(repeat.endDate).getTime() + ':'
: ':';
const tz = repeat.tz ? repeat.tz + ':' : ':';
const suffix = repeat.cron ? tz + repeat.cron : String(repeat.every);
function getRepeatKey(name: string, repeat: RepeatOpts) {
const endDate = repeat.endDate ? new Date(repeat.endDate).getTime() : '';
const tz = repeat.tz || '';
const suffix = (repeat.cron ? repeat.cron : String(repeat.every)) || '';

return name + ':' + jobId + endDate + suffix;
return `${name}::${endDate}:${tz}:${suffix}`;
}

function getNextMillis(millis: number, opts: RepeatOpts) {
Expand Down
2 changes: 2 additions & 0 deletions src/interfaces/repeat-opts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ export interface RepeatOpts {
limit?: number;
// Repeat every millis (cron setting cannot be used together with this setting.)
every?: number;

// The start value for the repeat iteration count.
count?: number;
prevMillis?: number;
jobId?: string;
}
13 changes: 7 additions & 6 deletions src/test/test_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { v4 } from 'node-uuid';
import { JobsOpts } from '@src/interfaces';
import { QueueEvents } from '@src/classes/queue-events';
import { Worker } from '@src/classes/worker';
import { after } from 'lodash';

import { delay } from 'bluebird';

Expand All @@ -29,10 +30,9 @@ describe('Job', function() {
});
});

afterEach(function() {
return queue.close().then(function() {
return client.quit();
});
afterEach(async () => {
await queue.close();
return client.quit();
});

describe('.create', function() {
Expand Down Expand Up @@ -271,7 +271,7 @@ describe('Job', function() {
});

const processStarted = new Promise(resolve =>
worker.once('active', resolve),
worker.on('active', after(2, resolve)),
);

const add = (jobId: string, ms = 0) =>
Expand All @@ -280,7 +280,8 @@ describe('Job', function() {
await add('1');
await add('2', 1);
await processStarted;
const job = await add('3', 5000);
const job = await add('3', 2000);

await job.promote();
await add('4', 1);

Expand Down
55 changes: 28 additions & 27 deletions src/test/test_repeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,10 @@ describe('repeat', function() {

it('should create multiple jobs if they have the same cron pattern', async function() {
const cron = '*/10 * * * * *';
const customJobIds = ['customjobone', 'customjobtwo'];

await Promise.all([
queue.append(
'test',
{},
{ jobId: customJobIds[0], repeat: { cron: cron } },
),
queue.append(
'test',
{},
{ jobId: customJobIds[1], repeat: { cron: cron } },
),
queue.append('test1', {}, { repeat: { cron: cron } }),
queue.append('test2', {}, { repeat: { cron: cron } }),
]);

const count = await queue.count();
Expand Down Expand Up @@ -99,17 +90,22 @@ describe('repeat', function() {
{},
{ repeat: { cron: crons[3], tz: 'Africa/Accra' } },
),
queue.append(
'fifth',
{},
{ repeat: { every: 5000, tz: 'Europa/Copenhaguen' } },
),
]);
const count = await repeat.getRepeatableCount();
expect(count).to.be.eql(4);
expect(count).to.be.eql(5);

let jobs = await repeat.getRepeatableJobs(0, -1, true);
jobs = await jobs.sort(function(a, b) {
return crons.indexOf(a.cron) - crons.indexOf(b.cron);
});
expect(jobs)
.to.be.and.an('array')
.and.have.length(4)
.and.have.length(5)
.and.to.deep.include({
key: 'first::12345::10 * * * * *',
name: 'first',
Expand Down Expand Up @@ -161,7 +157,7 @@ describe('repeat', function() {
const nextTick = 2 * ONE_SECOND + 500;

await queue.append(
'repeat',
'test',
{ foo: 'bar' },
{ repeat: { cron: '*/2 * * * * *' } },
);
Expand Down Expand Up @@ -201,7 +197,7 @@ describe('repeat', function() {
const worker = new Worker(queueName, async job => {});

await queue.append(
'repeat',
'test',
{ foo: 'bar' },
{
repeat: {
Expand Down Expand Up @@ -390,6 +386,7 @@ describe('repeat', function() {
const queueScheduler = new QueueScheduler(queueName);
await queueScheduler.init();

const numJobs = 3;
const date = new Date('2017-02-07 9:24:00');
let prev: Job;
let counter = 0;
Expand All @@ -403,13 +400,13 @@ describe('repeat', function() {
const processing = new Promise((resolve, reject) => {
processor = async (job: Job) => {
counter++;
if (counter == 7) {
if (counter == numJobs) {
await queue.removeRepeatable('remove', repeat);
this.clock.tick(nextTick);
const delayed = await queue.getDelayed();
expect(delayed).to.be.empty;
resolve();
} else if (counter > 7) {
} else if (counter > numJobs) {
reject(Error('should not repeat more than 7 times'));
}
};
Expand Down Expand Up @@ -498,7 +495,7 @@ describe('repeat', function() {
await queueScheduler.close();
});

it.skip('should not re-add a repeatable job after it has been removed', async function() {
it('should not re-add a repeatable job after it has been removed', async function() {
const queueScheduler = new QueueScheduler(queueName);
await queueScheduler.init();

Expand Down Expand Up @@ -735,24 +732,28 @@ describe('repeat', function() {

const date = new Date('2017-02-07 9:24:00');
this.clock.tick(date.getTime());
const nextTick = 2 * ONE_SECOND + 500;
const nextTick = 1 * ONE_SECOND + 100;

const worker = new Worker(queueName, async job => {});

const waiting = new Promise(resolve => {
const waiting = new Promise((resolve, reject) => {
queueEvents.on('waiting', function({ event, jobId, prev }) {
expect(jobId).to.be.equal(
'repeat:93168b0ea97b55fb5a8325e8c66e4300:' +
(date.getTime() + 2 * ONE_SECOND),
);
resolve();
try {
expect(jobId).to.be.equal(
'repeat:test:16db7a9b166154f5c636abf3c8fe3364:' +
(date.getTime() + 1 * ONE_SECOND),
);
resolve();
} catch (err) {
reject(err);
}
});
});

await queue.append(
'repeat',
'test',
{ foo: 'bar' },
{ repeat: { cron: '*/2 * * * * *' } },
{ repeat: { cron: '*/1 * * * * *' } },
);
this.clock.tick(nextTick);

Expand Down
Loading

0 comments on commit 70e00ec

Please sign in to comment.