Skip to content

Commit

Permalink
feat(queue): emit internal duplicated event (#2754)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Jun 30, 2024
1 parent 4cea99e commit 021ab7f
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 3 deletions.
2 changes: 1 addition & 1 deletion lib/commands/addJob-6.lua
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ else
jobId = ARGV[2]
jobIdKey = ARGV[1] .. jobId
if rcall("EXISTS", jobIdKey) == 1 then
rcall("PUBLISH", ARGV[1] .. "duplicated", jobId)
rcall("PUBLISH", ARGV[1] .. "duplicated@" .. ARGV[11], jobId)
return jobId .. "" -- convert to string
end
end
Expand Down
5 changes: 4 additions & 1 deletion lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,9 @@ Queue.prototype._setupQueueEventListeners = function() {
utils.emitSafe(this, 'global:stalled', message);
break;
case duplicatedKey:
if (this.token === token) {
utils.emitSafe(this, 'duplicated', message);
}
utils.emitSafe(this, 'global:duplicated', message);
break;
}
Expand Down Expand Up @@ -510,7 +513,7 @@ Queue.prototype._setupQueueEventListeners = function() {
};

Queue.prototype._registerEvent = function(eventName) {
const internalEvents = ['waiting', 'delayed'];
const internalEvents = ['waiting', 'delayed', 'duplicated'];

if (
eventName.startsWith('global:') ||
Expand Down
21 changes: 20 additions & 1 deletion test/test_queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -1098,7 +1098,7 @@ describe('Queue', () => {
});

describe('when job has been added again', () => {
it('emits duplicated event', async () => {
it('emits global duplicated event', async () => {
queue.process(
async () => {
await delay(50);
Expand All @@ -1116,6 +1116,25 @@ describe('Queue', () => {
});
});
});

it('emits duplicated event', async () => {
queue.process(
async () => {
await delay(50);
await queue.add({ foo: 'bar' }, { jobId: 'a1' });
await delay(50);
}
);

await queue.add({ foo: 'bar' }, { jobId: 'a1' });

await new Promise(resolve => {
queue.once('duplicated', (jobId) => {
expect(jobId).to.be.equal('a1');
resolve();
});
});
});
});

it('process a job that updates progress', done => {
Expand Down

0 comments on commit 021ab7f

Please sign in to comment.