diff --git a/lib/commands/addJob-6.lua b/lib/commands/addJob-6.lua index dda504810..dbb28111e 100644 --- a/lib/commands/addJob-6.lua +++ b/lib/commands/addJob-6.lua @@ -51,6 +51,7 @@ else jobId = ARGV[2] jobIdKey = ARGV[1] .. jobId if rcall("EXISTS", jobIdKey) == 1 then + rcall("PUBLISH", ARGV[1] .. "duplicated", jobId) return jobId .. "" -- convert to string end end diff --git a/lib/queue.js b/lib/queue.js index 67f94e895..216918e88 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -275,6 +275,7 @@ const Queue = function Queue(name, url, opts) { 'repeat', 'limiter', 'drained', + 'duplicated', 'progress' ], key => { @@ -416,6 +417,7 @@ Queue.prototype._setupQueueEventListeners = function() { const completedKey = this.keys.completed; const failedKey = this.keys.failed; const drainedKey = this.keys.drained; + const duplicatedKey = this.keys.duplicated; const pmessageHandler = (pattern, channel, message) => { const keyAndToken = channel.split('@'); @@ -437,6 +439,9 @@ Queue.prototype._setupQueueEventListeners = function() { } utils.emitSafe(this, 'global:stalled', message); break; + case duplicatedKey: + utils.emitSafe(this, 'global:duplicated', message); + break; } }; @@ -523,7 +528,7 @@ Queue.prototype._registerEvent = function(eventName) { .isRedisReady(this.eclient) .then(() => { const channel = this.toKey(_eventName); - if (['active', 'waiting', 'stalled'].indexOf(_eventName) !== -1) { + if (['active', 'waiting', 'stalled', 'duplicated'].indexOf(_eventName) !== -1) { return (this.registeredEvents[_eventName] = this.eclient.psubscribe( channel + '*' )); diff --git a/test/test_queue.js b/test/test_queue.js index 7b8c663a8..647a8797f 100644 --- a/test/test_queue.js +++ b/test/test_queue.js @@ -1097,6 +1097,27 @@ describe('Queue', () => { } }); + describe('when job has been added again', () => { + it('emits duplicated event', async () => { + queue.process( + async () => { + await delay(50); + await queue.add({ foo: 'bar' }, { jobId: 'a1' }); + await delay(50); + } + ); + + await queue.add({ foo: 'bar' }, { jobId: 'a1' }); + + await new Promise(resolve => { + queue.once('global:duplicated', (jobId) => { + expect(jobId).to.be.equal('a1'); + resolve(); + }); + }); + }); + }); + it('process a job that updates progress', done => { queue.process((job, jobDone) => { expect(job.data.foo).to.be.equal('bar');