Skip to content

Commit

Permalink
Merge pull request #1078 from caolan/queue-724
Browse files Browse the repository at this point in the history
Fix "saturated" and "unsaturated" events in queues
  • Loading branch information
aearly committed Mar 23, 2016
2 parents 153e496 + 1dc3277 commit d77d332
Show file tree
Hide file tree
Showing 6 changed files with 1,002 additions and 1,020 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1250,8 +1250,8 @@ methods:
the `worker` has finished processing the task. Instead of a single task, a `tasks` array
can be submitted. The respective callback is used for every task in the list.
* `unshift(task, [callback])` - add a new task to the front of the `queue`.
* `saturated` - a callback that is called when the `queue` length hits the `concurrency` limit, and further tasks will be queued.
* `unsaturated` - a callback that is called when the `queue` length is less than the `concurrency` & `buffer` limits, and further tasks will not be queued.
* `saturated` - a callback that is called when the number of running workers hits the `concurrency` limit, and further tasks will be queued.
* `unsaturated` - a callback that is called when the number of running workers is less than the `concurrency` & `buffer` limits, and further tasks will not be queued.
* `buffer` A minimum threshold buffer in order to say that the `queue` is `unsaturated`.
* `empty` - a callback that is called when the last item from the `queue` is given to a `worker`.
* `drain` - a callback that is called when the last item from the `queue` has returned from the `worker`.
Expand Down
19 changes: 13 additions & 6 deletions lib/internal/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,14 @@ export default function queue(worker, concurrency, payload) {
q.tasks.push(item);
}

if (q.tasks.length === q.concurrency) {
q.saturated();
}
if (q.tasks.length <= (q.concurrency - q.buffer) ) {
q.unsaturated();
}
});
setImmediate(q.process);
}
function _next(q, tasks) {
return function(){
workers -= 1;


var removed = false;
var args = arguments;
arrayEach(tasks, function (task) {
Expand All @@ -67,6 +62,11 @@ export default function queue(worker, concurrency, payload) {

task.callback.apply(task, args);
});

if (workers <= (q.concurrency - q.buffer) ) {
q.unsaturated();
}

if (q.tasks.length + workers === 0) {
q.drain();
}
Expand Down Expand Up @@ -111,8 +111,15 @@ export default function queue(worker, concurrency, payload) {
}
workers += 1;
workersList.push(tasks[0]);

if (workers === q.concurrency) {
q.saturated();
}

var cb = onlyOnce(_next(q, tasks));
worker(data, cb);


}
},
length: function () {
Expand Down
264 changes: 264 additions & 0 deletions mocha_test/cargo.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
var async = require('../lib');
var expect = require('chai').expect;
var assert = require('assert');

describe('cargo', function () {

it('cargo', function (done) {
var call_order = [],
delays = [160, 160, 80];

// worker: --12--34--5-
// order of completion: 1,2,3,4,5

var c = async.cargo(function (tasks, callback) {
setTimeout(function () {
call_order.push('process ' + tasks.join(' '));
callback('error', 'arg');
}, delays.shift());
}, 2);

c.push(1, function (err, arg) {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(c.length()).to.equal(3);
call_order.push('callback ' + 1);
});
c.push(2, function (err, arg) {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(c.length()).to.equal(3);
call_order.push('callback ' + 2);
});

expect(c.length()).to.equal(2);

// async push
setTimeout(function () {
c.push(3, function (err, arg) {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(c.length()).to.equal(1);
call_order.push('callback ' + 3);
});
}, 60);
setTimeout(function () {
c.push(4, function (err, arg) {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(c.length()).to.equal(1);
call_order.push('callback ' + 4);
});
expect(c.length()).to.equal(2);
c.push(5, function (err, arg) {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(c.length()).to.equal(0);
call_order.push('callback ' + 5);
});
}, 120);


setTimeout(function () {
expect(call_order).to.eql([
'process 1 2', 'callback 1', 'callback 2',
'process 3 4', 'callback 3', 'callback 4',
'process 5' , 'callback 5'
]);
expect(c.length()).to.equal(0);
done();
}, 800);
});

it('without callback', function (done) {
var call_order = [],
delays = [160,80,240,80];

// worker: --1-2---34-5-
// order of completion: 1,2,3,4,5

var c = async.cargo(function (tasks, callback) {
setTimeout(function () {
call_order.push('process ' + tasks.join(' '));
callback('error', 'arg');
}, delays.shift());
}, 2);

c.push(1);

setTimeout(function () {
c.push(2);
}, 120);
setTimeout(function () {
c.push(3);
c.push(4);
c.push(5);
}, 180);

setTimeout(function () {
expect(call_order).to.eql([
'process 1',
'process 2',
'process 3 4',
'process 5'
]);
done();
}, 800);
});

it('bulk task', function (done) {
var call_order = [],
delays = [120,40];

// worker: -123-4-
// order of completion: 1,2,3,4

var c = async.cargo(function (tasks, callback) {
setTimeout(function () {
call_order.push('process ' + tasks.join(' '));
callback('error', tasks.join(' '));
}, delays.shift());
}, 3);

c.push( [1,2,3,4], function (err, arg) {
expect(err).to.equal('error');
call_order.push('callback ' + arg);
});

expect(c.length()).to.equal(4);

setTimeout(function () {
expect(call_order).to.eql([
'process 1 2 3', 'callback 1 2 3',
'callback 1 2 3', 'callback 1 2 3',
'process 4', 'callback 4',
]);
expect(c.length()).to.equal(0);
done();
}, 800);
});

it('drain once', function (done) {

var c = async.cargo(function (tasks, callback) {
callback();
}, 3);

var drainCounter = 0;
c.drain = function () {
drainCounter++;
};

for(var i = 0; i < 10; i++){
c.push(i);
}

setTimeout(function(){
expect(drainCounter).to.equal(1);
done();
}, 500);
});

it('drain twice', function (done) {

var c = async.cargo(function (tasks, callback) {
callback();
}, 3);

var loadCargo = function(){
for(var i = 0; i < 10; i++){
c.push(i);
}
};

var drainCounter = 0;
c.drain = function () {
drainCounter++;
};

loadCargo();
setTimeout(loadCargo, 500);

setTimeout(function(){
expect(drainCounter).to.equal(2);
done();
}, 1000);
});

it('events', function (done) {
var calls = [];
var q = async.cargo(function(task, cb) {
// nop
calls.push('process ' + task);
async.setImmediate(cb);
}, 1);
q.concurrency = 3;

q.saturated = function() {
assert(q.running() == 3, 'cargo should be saturated now');
calls.push('saturated');
};
q.empty = function() {
assert(q.length() === 0, 'cargo should be empty now');
calls.push('empty');
};
q.drain = function() {
assert(
q.length() === 0 && q.running() === 0,
'cargo should be empty now and no more workers should be running'
);
calls.push('drain');
expect(calls).to.eql([
'process foo',
'process bar',
'saturated',
'process zoo',
'foo cb',
'saturated',
'process poo',
'bar cb',
'empty',
'saturated',
'process moo',
'zoo cb',
'poo cb',
'moo cb',
'drain'
]);
done();
};
q.push('foo', function () {calls.push('foo cb');});
q.push('bar', function () {calls.push('bar cb');});
q.push('zoo', function () {calls.push('zoo cb');});
q.push('poo', function () {calls.push('poo cb');});
q.push('moo', function () {calls.push('moo cb');});
});

it('expose payload', function (done) {
var called_once = false;
var cargo= async.cargo(function(tasks, cb) {
if (!called_once) {
expect(cargo.payload).to.equal(1);
assert(tasks.length === 1, 'should start with payload = 1');
} else {
expect(cargo.payload).to.equal(2);
assert(tasks.length === 2, 'next call shold have payload = 2');
}
called_once = true;
setTimeout(cb, 25);
}, 1);

cargo.drain = function () {
done();
};

expect(cargo.payload).to.equal(1);

cargo.push([1, 2, 3]);

setTimeout(function () {
cargo.payload = 2;
}, 15);
});

});
Loading

0 comments on commit d77d332

Please sign in to comment.