From 0e4d0672bd55d98e11adb93b49a7275510463d47 Mon Sep 17 00:00:00 2001 From: Alexander Early Date: Tue, 22 Mar 2016 15:02:52 -0700 Subject: [PATCH 1/6] convert queue tests to mocha --- mocha_test/queue.js | 598 ++++++++++++++++++++++++++++++++++++++++++ test/test-async.js | 617 -------------------------------------------- 2 files changed, 598 insertions(+), 617 deletions(-) diff --git a/mocha_test/queue.js b/mocha_test/queue.js index f64eef5f0..c511e096f 100644 --- a/mocha_test/queue.js +++ b/mocha_test/queue.js @@ -1,8 +1,606 @@ var async = require('../lib'); var expect = require('chai').expect; +var assert = require('assert'); describe('queue', function(){ + + it('basics', function(done) { + + var call_order = []; + var delays = [160,80,240,80]; + + + // worker1: --1-4 + // worker2: -2---3 + // order of completion: 2,1,4,3 + + var q = async.queue(function (task, callback) { + setTimeout(function () { + call_order.push('process ' + task); + callback('error', 'arg'); + }, delays.splice(0,1)[0]); + }, 2); + + q.push(1, function (err, arg) { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + expect(q.length()).to.equal(1); + call_order.push('callback ' + 1); + }); + q.push(2, function (err, arg) { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + expect(q.length()).to.equal(2); + call_order.push('callback ' + 2); + }); + q.push(3, function (err, arg) { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + expect(q.length()).to.equal(0); + call_order.push('callback ' + 3); + }); + q.push(4, function (err, arg) { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + expect(q.length()).to.equal(0); + call_order.push('callback ' + 4); + }); + expect(q.length()).to.equal(4); + expect(q.concurrency).to.equal(2); + + q.drain = function () { + expect(call_order).to.eql([ + 'process 2', 'callback 2', + 'process 1', 'callback 1', + 'process 4', 'callback 4', + 'process 3', 'callback 3' + ]); + expect(q.concurrency).to.equal(2); + expect(q.length()).to.equal(0); + done(); + }; + }); + + it('default concurrency', function(done) { + var call_order = [], + delays = [160,80,240,80]; + + // order of completion: 1,2,3,4 + + var q = async.queue(function (task, callback) { + setTimeout(function () { + call_order.push('process ' + task); + callback('error', 'arg'); + }, delays.splice(0,1)[0]); + }); + + q.push(1, function (err, arg) { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + expect(q.length()).to.equal(3); + call_order.push('callback ' + 1); + }); + q.push(2, function (err, arg) { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + expect(q.length()).to.equal(2); + call_order.push('callback ' + 2); + }); + q.push(3, function (err, arg) { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + expect(q.length()).to.equal(1); + call_order.push('callback ' + 3); + }); + q.push(4, function (err, arg) { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + expect(q.length()).to.equal(0); + call_order.push('callback ' + 4); + }); + expect(q.length()).to.equal(4); + expect(q.concurrency).to.equal(1); + + q.drain = function () { + expect(call_order).to.eql([ + 'process 1', 'callback 1', + 'process 2', 'callback 2', + 'process 3', 'callback 3', + 'process 4', 'callback 4' + ]); + expect(q.concurrency).to.equal(1); + expect(q.length()).to.equal(0); + done(); + }; + }); + + it('zero concurrency', function(done){ + expect(function () { + async.queue(function (task, callback) { + callback(null, task); + }, 0); + }).to.throw(); + done(); + }); + + it('error propagation', function(done){ + var results = []; + + var q = async.queue(function (task, callback) { + callback(task.name === 'foo' ? new Error('fooError') : null); + }, 2); + + q.drain = function() { + expect(results).to.eql(['bar', 'fooError']); + done(); + }; + + q.push({name: 'bar'}, function (err) { + if(err) { + results.push('barError'); + return; + } + + results.push('bar'); + }); + + q.push({name: 'foo'}, function (err) { + if(err) { + results.push('fooError'); + return; + } + + results.push('foo'); + }); + }); + + // The original queue implementation allowed the concurrency to be changed only + // on the same event loop during which a task was added to the queue. This + // test attempts to be a more robust test. + // Start with a concurrency of 1. Wait until a leter event loop and change + // the concurrency to 2. Wait again for a later loop then verify the concurrency + // Repeat that one more time by chaning the concurrency to 5. + it('changing concurrency', function(done) { + + var q = async.queue(function(task, callback){ + setTimeout(function(){ + callback(); + }, 100); + }, 1); + + for(var i = 0; i < 50; i++){ + q.push(''); + } + + q.drain = function(){ + done(); + }; + + setTimeout(function(){ + expect(q.concurrency).to.equal(1); + q.concurrency = 2; + setTimeout(function(){ + expect(q.running()).to.equal(2); + q.concurrency = 5; + setTimeout(function(){ + expect(q.running()).to.equal(5); + }, 500); + }, 500); + }, 500); + }); + + it('push without callback', function(done) { + var call_order = [], + delays = [160,80,240,80]; + + // worker1: --1-4 + // worker2: -2---3 + // order of completion: 2,1,4,3 + + var q = async.queue(function (task, callback) { + setTimeout(function () { + call_order.push('process ' + task); + callback('error', 'arg'); + }, delays.splice(0,1)[0]); + }, 2); + + q.push(1); + q.push(2); + q.push(3); + q.push(4); + + setTimeout(function () { + expect(call_order).to.eql([ + 'process 2', + 'process 1', + 'process 4', + 'process 3' + ]); + done(); + }, 800); + }); + + it('push with non-function', function(done) { + var q = async.queue(function () {}, 1); + expect(function () { + q.push({}, 1); + }).to.throw(); + done(); + }); + + it('unshift', function(done) { + var queue_order = []; + + var q = async.queue(function (task, callback) { + queue_order.push(task); + callback(); + }, 1); + + q.unshift(4); + q.unshift(3); + q.unshift(2); + q.unshift(1); + + setTimeout(function () { + expect(queue_order).to.eql([ 1, 2, 3, 4 ]); + done(); + }, 100); + }); + + it('too many callbacks', function(done) { + var q = async.queue(function (task, callback) { + callback(); + expect(function() { + callback(); + }).to.throw(); + done(); + }, 2); + + q.push(1); + }); + + it('bulk task', function(done) { + var call_order = [], + delays = [160,80,240,80]; + + // worker1: --1-4 + // worker2: -2---3 + // order of completion: 2,1,4,3 + + var q = async.queue(function (task, callback) { + setTimeout(function () { + call_order.push('process ' + task); + callback('error', task); + }, delays.splice(0,1)[0]); + }, 2); + + q.push( [1,2,3,4], function (err, arg) { + expect(err).to.equal('error'); + call_order.push('callback ' + arg); + }); + + expect(q.length()).to.equal(4); + expect(q.concurrency).to.equal(2); + + setTimeout(function () { + expect(call_order).to.eql([ + 'process 2', 'callback 2', + 'process 1', 'callback 1', + 'process 4', 'callback 4', + 'process 3', 'callback 3' + ]); + expect(q.concurrency).to.equal(2); + expect(q.length()).to.equal(0); + done(); + }, 800); + }); + + it('idle', function(done) { + var q = async.queue(function (task, callback) { + // Queue is busy when workers are running + expect(q.idle()).to.equal(false); + callback(); + }, 1); + + // Queue is idle before anything added + expect(q.idle()).to.equal(true); + + q.unshift(4); + q.unshift(3); + q.unshift(2); + q.unshift(1); + + // Queue is busy when tasks added + expect(q.idle()).to.equal(false); + + q.drain = function() { + // Queue is idle after drain + expect(q.idle()).to.equal(true); + done(); + }; + }); + + it('pause', function(done) { + var call_order = [], + task_timeout = 100, + pause_timeout = 300, + resume_timeout = 500, + tasks = [ 1, 2, 3, 4, 5, 6 ], + + elapsed = (function () { + var start = (new Date()).valueOf(); + return function () { + return Math.round(((new Date()).valueOf() - start) / 100) * 100; + }; + })(); + + var q = async.queue(function (task, callback) { + call_order.push('process ' + task); + call_order.push('timeout ' + elapsed()); + callback(); + }); + + function pushTask () { + var task = tasks.shift(); + if (!task) { return; } + setTimeout(function () { + q.push(task); + pushTask(); + }, task_timeout); + } + pushTask(); + + setTimeout(function () { + q.pause(); + expect(q.paused).to.equal(true); + }, pause_timeout); + + setTimeout(function () { + q.resume(); + expect(q.paused).to.equal(false); + }, resume_timeout); + + setTimeout(function () { + expect(call_order).to.eql([ + 'process 1', 'timeout 100', + 'process 2', 'timeout 200', + 'process 3', 'timeout 500', + 'process 4', 'timeout 500', + 'process 5', 'timeout 500', + 'process 6', 'timeout 600' + ]); + done(); + }, 800); + }); + + it('pause in worker with concurrency', function(done) { + var call_order = []; + var q = async.queue(function (task, callback) { + if (task.isLongRunning) { + q.pause(); + setTimeout(function () { + call_order.push(task.id); + q.resume(); + callback(); + }, 500); + } + else { + call_order.push(task.id); + callback(); + } + }, 10); + + q.push({ id: 1, isLongRunning: true}); + q.push({ id: 2 }); + q.push({ id: 3 }); + q.push({ id: 4 }); + q.push({ id: 5 }); + + setTimeout(function () { + expect(call_order).to.eql([1, 2, 3, 4, 5]); + done(); + }, 1000); + }); + + it('pause with concurrency', function(done) { + var call_order = [], + task_timeout = 100, + pause_timeout = 50, + resume_timeout = 300, + tasks = [ 1, 2, 3, 4, 5, 6 ], + + elapsed = (function () { + var start = (new Date()).valueOf(); + return function () { + return Math.round(((new Date()).valueOf() - start) / 100) * 100; + }; + })(); + + var q = async.queue(function (task, callback) { + setTimeout(function () { + call_order.push('process ' + task); + call_order.push('timeout ' + elapsed()); + callback(); + }, task_timeout); + }, 2); + + q.push(tasks); + + setTimeout(function () { + q.pause(); + expect(q.paused).to.equal(true); + }, pause_timeout); + + setTimeout(function () { + q.resume(); + expect(q.paused).to.equal(false); + }, resume_timeout); + + setTimeout(function () { + expect(q.running()).to.equal(2); + }, resume_timeout + 10); + + setTimeout(function () { + expect(call_order).to.eql([ + 'process 1', 'timeout 100', + 'process 2', 'timeout 100', + 'process 3', 'timeout 400', + 'process 4', 'timeout 400', + 'process 5', 'timeout 500', + 'process 6', 'timeout 500' + ]); + done(); + }, 800); + }); + + it('start paused', function(done) { + var q = async.queue(function (task, callback) { + setTimeout(function () { + callback(); + }, 40); + }, 2); + q.pause(); + + q.push([1, 2, 3]); + + setTimeout(function () { + q.resume(); + }, 5); + + setTimeout(function () { + expect(q.tasks.length).to.equal(1); + expect(q.running()).to.equal(2); + q.resume(); + }, 15); + + q.drain = function () { + done(); + }; + }); + + it('kill', function(done) { + var q = async.queue(function (task, callback) { + setTimeout(function () { + throw new Error("Function should never be called"); + }, 300); + }, 1); + q.drain = function() { + throw new Error("Function should never be called"); + }; + + q.push(0); + + q.kill(); + + setTimeout(function() { + expect(q.length()).to.equal(0); + done(); + }, 600); + }); + + it('events', function(done) { + var calls = []; + var q = async.queue(function(task, cb) { + // nop + calls.push('process ' + task); + async.setImmediate(cb); + }, 10); + q.concurrency = 3; + + q.saturated = function() { + assert(q.length() == 3, 'queue should be saturated now'); + calls.push('saturated'); + }; + q.empty = function() { + assert(q.length() === 0, 'queue should be empty now'); + calls.push('empty'); + }; + q.drain = function() { + assert( + q.length() === 0 && q.running() === 0, + 'queue should be empty now and no more workers should be running' + ); + calls.push('drain'); + expect(calls).to.eql([ + 'saturated', + 'process foo', + 'process bar', + 'process zoo', + 'foo cb', + 'process poo', + 'bar cb', + 'empty', + 'process moo', + 'zoo cb', + 'poo cb', + 'moo cb', + 'drain' + ]); + done(); + }; + q.push('foo', function () {calls.push('foo cb');}); + q.push('bar', function () {calls.push('bar cb');}); + q.push('zoo', function () {calls.push('zoo cb');}); + q.push('poo', function () {calls.push('poo cb');}); + q.push('moo', function () {calls.push('moo cb');}); + }); + + it('empty', function(done) { + var calls = []; + var q = async.queue(function(task, cb) { + // nop + calls.push('process ' + task); + async.setImmediate(cb); + }, 3); + + q.drain = function() { + assert( + q.length() === 0 && q.running() === 0, + 'queue should be empty now and no more workers should be running' + ); + calls.push('drain'); + expect(calls).to.eql([ + 'drain' + ]); + done(); + }; + q.push([]); + }); + + it('saturated', function(done) { + var saturatedCalled = false; + var q = async.queue(function(task, cb) { + async.setImmediate(cb); + }, 2); + + q.saturated = function () { + saturatedCalled = true; + }; + q.drain = function () { + assert(saturatedCalled, "saturated not called"); + done(); + }; + + setTimeout(function () { + q.push(['foo', 'bar', 'baz', 'moo']); + }, 10); + }); + + it('started', function(done) { + + var q = async.queue(function(task, cb) { + cb(null, task); + }); + + expect(q.started).to.equal(false); + q.push([]); + expect(q.started).to.equal(true); + done(); + }); + + + context('q.unsaturated(): ',function() { it('should have a default buffer property that equals 25% of the concurrenct rate', function(done){ var calls = []; diff --git a/test/test-async.js b/test/test-async.js index f12d7c170..d4ea377f1 100755 --- a/test/test-async.js +++ b/test/test-async.js @@ -2147,623 +2147,6 @@ exports['whilst optional callback'] = function (test) { test.done(); }; -exports['queue'] = { - - 'queue': function (test) { - test.expect(17); - - var call_order = [], - delays = [160,80,240,80]; - - // worker1: --1-4 - // worker2: -2---3 - // order of completion: 2,1,4,3 - - var q = async.queue(function (task, callback) { - setTimeout(function () { - call_order.push('process ' + task); - callback('error', 'arg'); - }, delays.splice(0,1)[0]); - }, 2); - - q.push(1, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 1); - call_order.push('callback ' + 1); - }); - q.push(2, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 2); - call_order.push('callback ' + 2); - }); - q.push(3, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 0); - call_order.push('callback ' + 3); - }); - q.push(4, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 0); - call_order.push('callback ' + 4); - }); - test.equal(q.length(), 4); - test.equal(q.concurrency, 2); - - q.drain = function () { - test.same(call_order, [ - 'process 2', 'callback 2', - 'process 1', 'callback 1', - 'process 4', 'callback 4', - 'process 3', 'callback 3' - ]); - test.equal(q.concurrency, 2); - test.equal(q.length(), 0); - test.done(); - }; -}, - - 'default concurrency': function (test) { - test.expect(17); - var call_order = [], - delays = [160,80,240,80]; - - // order of completion: 1,2,3,4 - - var q = async.queue(function (task, callback) { - setTimeout(function () { - call_order.push('process ' + task); - callback('error', 'arg'); - }, delays.splice(0,1)[0]); - }); - - q.push(1, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 3); - call_order.push('callback ' + 1); - }); - q.push(2, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 2); - call_order.push('callback ' + 2); - }); - q.push(3, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 1); - call_order.push('callback ' + 3); - }); - q.push(4, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 0); - call_order.push('callback ' + 4); - }); - test.equal(q.length(), 4); - test.equal(q.concurrency, 1); - - q.drain = function () { - test.same(call_order, [ - 'process 1', 'callback 1', - 'process 2', 'callback 2', - 'process 3', 'callback 3', - 'process 4', 'callback 4' - ]); - test.equal(q.concurrency, 1); - test.equal(q.length(), 0); - test.done(); - }; -}, - - 'zero concurrency': function(test){ - test.expect(1); - test.throws(function () { - async.queue(function (task, callback) { - callback(null, task); - }, 0); - }); - test.done(); -}, - - 'error propagation': function(test){ - test.expect(1); - var results = []; - - var q = async.queue(function (task, callback) { - callback(task.name === 'foo' ? new Error('fooError') : null); - }, 2); - - q.drain = function() { - test.deepEqual(results, ['bar', 'fooError']); - test.done(); - }; - - q.push({name: 'bar'}, function (err) { - if(err) { - results.push('barError'); - return; - } - - results.push('bar'); - }); - - q.push({name: 'foo'}, function (err) { - if(err) { - results.push('fooError'); - return; - } - - results.push('foo'); - }); -}, - - // The original queue implementation allowed the concurrency to be changed only - // on the same event loop during which a task was added to the queue. This - // test attempts to be a more robust test. - // Start with a concurrency of 1. Wait until a leter event loop and change - // the concurrency to 2. Wait again for a later loop then verify the concurrency. - // Repeat that one more time by chaning the concurrency to 5. - 'changing concurrency': function (test) { - test.expect(3); - - var q = async.queue(function(task, callback){ - setTimeout(function(){ - callback(); - }, 100); - }, 1); - - for(var i = 0; i < 50; i++){ - q.push(''); - } - - q.drain = function(){ - test.done(); - }; - - setTimeout(function(){ - test.equal(q.concurrency, 1); - q.concurrency = 2; - setTimeout(function(){ - test.equal(q.running(), 2); - q.concurrency = 5; - setTimeout(function(){ - test.equal(q.running(), 5); - }, 500); - }, 500); - }, 500); -}, - - 'push without callback': function (test) { - test.expect(1); - var call_order = [], - delays = [160,80,240,80]; - - // worker1: --1-4 - // worker2: -2---3 - // order of completion: 2,1,4,3 - - var q = async.queue(function (task, callback) { - setTimeout(function () { - call_order.push('process ' + task); - callback('error', 'arg'); - }, delays.splice(0,1)[0]); - }, 2); - - q.push(1); - q.push(2); - q.push(3); - q.push(4); - - setTimeout(function () { - test.same(call_order, [ - 'process 2', - 'process 1', - 'process 4', - 'process 3' - ]); - test.done(); - }, 800); -}, - - 'push with non-function': function (test) { - test.expect(1); - var q = async.queue(function () {}, 1); - test.throws(function () { - q.push({}, 1); - }); - test.done(); -}, - - 'unshift': function (test) { - test.expect(1); - var queue_order = []; - - var q = async.queue(function (task, callback) { - queue_order.push(task); - callback(); - }, 1); - - q.unshift(4); - q.unshift(3); - q.unshift(2); - q.unshift(1); - - setTimeout(function () { - test.same(queue_order, [ 1, 2, 3, 4 ]); - test.done(); - }, 100); -}, - - 'too many callbacks': function (test) { - test.expect(1); - var q = async.queue(function (task, callback) { - callback(); - test.throws(function() { - callback(); - }); - test.done(); - }, 2); - - q.push(1); -}, - - 'bulk task': function (test) { - test.expect(9); - var call_order = [], - delays = [160,80,240,80]; - - // worker1: --1-4 - // worker2: -2---3 - // order of completion: 2,1,4,3 - - var q = async.queue(function (task, callback) { - setTimeout(function () { - call_order.push('process ' + task); - callback('error', task); - }, delays.splice(0,1)[0]); - }, 2); - - q.push( [1,2,3,4], function (err, arg) { - test.equal(err, 'error'); - call_order.push('callback ' + arg); - }); - - test.equal(q.length(), 4); - test.equal(q.concurrency, 2); - - setTimeout(function () { - test.same(call_order, [ - 'process 2', 'callback 2', - 'process 1', 'callback 1', - 'process 4', 'callback 4', - 'process 3', 'callback 3' - ]); - test.equal(q.concurrency, 2); - test.equal(q.length(), 0); - test.done(); - }, 800); -}, - - 'idle': function(test) { - test.expect(7); - var q = async.queue(function (task, callback) { - // Queue is busy when workers are running - test.equal(q.idle(), false); - callback(); - }, 1); - - // Queue is idle before anything added - test.equal(q.idle(), true); - - q.unshift(4); - q.unshift(3); - q.unshift(2); - q.unshift(1); - - // Queue is busy when tasks added - test.equal(q.idle(), false); - - q.drain = function() { - // Queue is idle after drain - test.equal(q.idle(), true); - test.done(); - }; -}, - - 'pause': function(test) { - test.expect(3); - var call_order = [], - task_timeout = 100, - pause_timeout = 300, - resume_timeout = 500, - tasks = [ 1, 2, 3, 4, 5, 6 ], - - elapsed = (function () { - var start = (new Date()).valueOf(); - return function () { - return Math.round(((new Date()).valueOf() - start) / 100) * 100; - }; - })(); - - var q = async.queue(function (task, callback) { - call_order.push('process ' + task); - call_order.push('timeout ' + elapsed()); - callback(); - }); - - function pushTask () { - var task = tasks.shift(); - if (!task) { return; } - setTimeout(function () { - q.push(task); - pushTask(); - }, task_timeout); - } - pushTask(); - - setTimeout(function () { - q.pause(); - test.equal(q.paused, true); - }, pause_timeout); - - setTimeout(function () { - q.resume(); - test.equal(q.paused, false); - }, resume_timeout); - - setTimeout(function () { - test.same(call_order, [ - 'process 1', 'timeout 100', - 'process 2', 'timeout 200', - 'process 3', 'timeout 500', - 'process 4', 'timeout 500', - 'process 5', 'timeout 500', - 'process 6', 'timeout 600' - ]); - test.done(); - }, 800); -}, - - 'pause in worker with concurrency': function(test) { - test.expect(1); - var call_order = []; - var q = async.queue(function (task, callback) { - if (task.isLongRunning) { - q.pause(); - setTimeout(function () { - call_order.push(task.id); - q.resume(); - callback(); - }, 500); - } - else { - call_order.push(task.id); - callback(); - } - }, 10); - - q.push({ id: 1, isLongRunning: true}); - q.push({ id: 2 }); - q.push({ id: 3 }); - q.push({ id: 4 }); - q.push({ id: 5 }); - - setTimeout(function () { - test.same(call_order, [1, 2, 3, 4, 5]); - test.done(); - }, 1000); - }, - - 'pause with concurrency': function(test) { - test.expect(4); - var call_order = [], - task_timeout = 100, - pause_timeout = 50, - resume_timeout = 300, - tasks = [ 1, 2, 3, 4, 5, 6 ], - - elapsed = (function () { - var start = (new Date()).valueOf(); - return function () { - return Math.round(((new Date()).valueOf() - start) / 100) * 100; - }; - })(); - - var q = async.queue(function (task, callback) { - setTimeout(function () { - call_order.push('process ' + task); - call_order.push('timeout ' + elapsed()); - callback(); - }, task_timeout); - }, 2); - - q.push(tasks); - - setTimeout(function () { - q.pause(); - test.equal(q.paused, true); - }, pause_timeout); - - setTimeout(function () { - q.resume(); - test.equal(q.paused, false); - }, resume_timeout); - - setTimeout(function () { - test.equal(q.running(), 2); - }, resume_timeout + 10); - - setTimeout(function () { - test.same(call_order, [ - 'process 1', 'timeout 100', - 'process 2', 'timeout 100', - 'process 3', 'timeout 400', - 'process 4', 'timeout 400', - 'process 5', 'timeout 500', - 'process 6', 'timeout 500' - ]); - test.done(); - }, 800); -}, - - 'start paused': function (test) { - test.expect(2); - var q = async.queue(function (task, callback) { - setTimeout(function () { - callback(); - }, 40); - }, 2); - q.pause(); - - q.push([1, 2, 3]); - - setTimeout(function () { - q.resume(); - }, 5); - - setTimeout(function () { - test.equal(q.tasks.length, 1); - test.equal(q.running(), 2); - q.resume(); - }, 15); - - q.drain = function () { - test.done(); - }; -}, - - 'kill': function (test) { - test.expect(1); - var q = async.queue(function (task, callback) { - setTimeout(function () { - test.ok(false, "Function should never be called"); - callback(); - }, 300); - }, 1); - q.drain = function() { - test.ok(false, "Function should never be called"); - }; - - q.push(0); - - q.kill(); - - setTimeout(function() { - test.equal(q.length(), 0); - test.done(); - }, 600); -}, - - 'events': function(test) { - test.expect(4); - var calls = []; - var q = async.queue(function(task, cb) { - // nop - calls.push('process ' + task); - async.setImmediate(cb); - }, 10); - q.concurrency = 3; - - q.saturated = function() { - test.ok(q.length() == 3, 'queue should be saturated now'); - calls.push('saturated'); - }; - q.empty = function() { - test.ok(q.length() === 0, 'queue should be empty now'); - calls.push('empty'); - }; - q.drain = function() { - test.ok( - q.length() === 0 && q.running() === 0, - 'queue should be empty now and no more workers should be running' - ); - calls.push('drain'); - test.same(calls, [ - 'saturated', - 'process foo', - 'process bar', - 'process zoo', - 'foo cb', - 'process poo', - 'bar cb', - 'empty', - 'process moo', - 'zoo cb', - 'poo cb', - 'moo cb', - 'drain' - ]); - test.done(); - }; - q.push('foo', function () {calls.push('foo cb');}); - q.push('bar', function () {calls.push('bar cb');}); - q.push('zoo', function () {calls.push('zoo cb');}); - q.push('poo', function () {calls.push('poo cb');}); - q.push('moo', function () {calls.push('moo cb');}); -}, - - 'empty': function(test) { - test.expect(2); - var calls = []; - var q = async.queue(function(task, cb) { - // nop - calls.push('process ' + task); - async.setImmediate(cb); - }, 3); - - q.drain = function() { - test.ok( - q.length() === 0 && q.running() === 0, - 'queue should be empty now and no more workers should be running' - ); - calls.push('drain'); - test.same(calls, [ - 'drain' - ]); - test.done(); - }; - q.push([]); -}, - - 'saturated': function (test) { - test.expect(1); - var saturatedCalled = false; - var q = async.queue(function(task, cb) { - async.setImmediate(cb); - }, 2); - - q.saturated = function () { - saturatedCalled = true; - }; - q.drain = function () { - test.ok(saturatedCalled, "saturated not called"); - test.done(); - }; - - setTimeout(function () { - q.push(['foo', 'bar', 'baz', 'moo']); - }, 10); -}, - - 'started': function(test) { - test.expect(2); - - var q = async.queue(function(task, cb) { - cb(null, task); - }); - - test.equal(q.started, false); - q.push([]); - test.equal(q.started, true); - test.done(); -} - -}; exports['priorityQueue'] = { From 6dd08eed9f15ed6b986a83c0f849394118f98e91 Mon Sep 17 00:00:00 2001 From: Alexander Early Date: Tue, 22 Mar 2016 15:26:46 -0700 Subject: [PATCH 2/6] convert priorityQueue tests to mocha --- lib/internal/queue.js | 15 +++-- mocha_test/priorityQueue.js | 108 ++++++++++++++++++++++++++++++++++ mocha_test/queue.js | 6 +- test/test-async.js | 112 ------------------------------------ 4 files changed, 119 insertions(+), 122 deletions(-) diff --git a/lib/internal/queue.js b/lib/internal/queue.js index 3e7eb1e68..95210914e 100644 --- a/lib/internal/queue.js +++ b/lib/internal/queue.js @@ -42,12 +42,6 @@ export default function queue(worker, concurrency, payload) { q.tasks.push(item); } - if (q.tasks.length === q.concurrency) { - q.saturated(); - } - if (q.tasks.length <= (q.concurrency - q.buffer) ) { - q.unsaturated(); - } }); setImmediate(q.process); } @@ -111,8 +105,17 @@ export default function queue(worker, concurrency, payload) { } workers += 1; workersList.push(tasks[0]); + + if (workers === q.concurrency) { + q.saturated(); + } + if (workers <= (q.concurrency - q.buffer) ) { + q.unsaturated(); + } + var cb = onlyOnce(_next(q, tasks)); worker(data, cb); + } }, length: function () { diff --git a/mocha_test/priorityQueue.js b/mocha_test/priorityQueue.js index 9832b4308..9ed74c695 100644 --- a/mocha_test/priorityQueue.js +++ b/mocha_test/priorityQueue.js @@ -2,6 +2,114 @@ var async = require('../lib'); var expect = require('chai').expect; describe('priorityQueue', function() { + + it('priorityQueue', function (done) { + var call_order = []; + + // order of completion: 2,1,4,3 + + var q = async.priorityQueue(function (task, callback) { + call_order.push('process ' + task); + callback('error', 'arg'); + }, 1); + + q.push(1, 1.4, function (err, arg) { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + expect(q.length()).to.equal(2); + call_order.push('callback ' + 1); + }); + q.push(2, 0.2, function (err, arg) { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + expect(q.length()).to.equal(3); + call_order.push('callback ' + 2); + }); + q.push(3, 3.8, function (err, arg) { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + expect(q.length()).to.equal(0); + call_order.push('callback ' + 3); + }); + q.push(4, 2.9, function (err, arg) { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + expect(q.length()).to.equal(1); + call_order.push('callback ' + 4); + }); + expect(q.length()).to.equal(4); + expect(q.concurrency).to.equal(1); + + q.drain = function () { + expect(call_order).to.eql([ + 'process 2', 'callback 2', + 'process 1', 'callback 1', + 'process 4', 'callback 4', + 'process 3', 'callback 3' + ]); + expect(q.concurrency).to.equal(1); + expect(q.length()).to.equal(0); + done(); + }; + }); + + it('concurrency', function (done) { + var call_order = [], + delays = [160,80,240,80]; + + // worker1: --2-3 + // worker2: -1---4 + // order of completion: 1,2,3,4 + + var q = async.priorityQueue(function (task, callback) { + setTimeout(function () { + call_order.push('process ' + task); + callback('error', 'arg'); + }, delays.splice(0,1)[0]); + }, 2); + + q.push(1, 1.4, function (err, arg) { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + expect(q.length()).to.equal(2); + call_order.push('callback ' + 1); + }); + q.push(2, 0.2, function (err, arg) { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + expect(q.length()).to.equal(1); + call_order.push('callback ' + 2); + }); + q.push(3, 3.8, function (err, arg) { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + expect(q.length()).to.equal(0); + call_order.push('callback ' + 3); + }); + q.push(4, 2.9, function (err, arg) { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + expect(q.length()).to.equal(0); + call_order.push('callback ' + 4); + }); + expect(q.length()).to.equal(4); + expect(q.concurrency).to.equal(2); + + q.drain = function () { + expect(call_order).to.eql([ + 'process 1', 'callback 1', + 'process 2', 'callback 2', + 'process 3', 'callback 3', + 'process 4', 'callback 4' + ]); + expect(q.concurrency).to.equal(2); + expect(q.length()).to.equal(0); + done(); + }; + }); + + + context('q.unsaturated(): ',function() { it('should have a default buffer property that equals 25% of the concurrenct rate', function(done) { var calls = []; diff --git a/mocha_test/queue.js b/mocha_test/queue.js index c511e096f..654c2c024 100644 --- a/mocha_test/queue.js +++ b/mocha_test/queue.js @@ -504,7 +504,7 @@ describe('queue', function(){ var q = async.queue(function(task, cb) { // nop calls.push('process ' + task); - async.setImmediate(cb); + setTimeout(cb, 10); }, 10); q.concurrency = 3; @@ -582,9 +582,7 @@ describe('queue', function(){ done(); }; - setTimeout(function () { - q.push(['foo', 'bar', 'baz', 'moo']); - }, 10); + q.push(['foo', 'bar', 'baz', 'moo']); }); it('started', function(done) { diff --git a/test/test-async.js b/test/test-async.js index d4ea377f1..c87a0aa68 100755 --- a/test/test-async.js +++ b/test/test-async.js @@ -2149,118 +2149,6 @@ exports['whilst optional callback'] = function (test) { -exports['priorityQueue'] = { - - 'priorityQueue': function (test) { - test.expect(17); - var call_order = []; - - // order of completion: 2,1,4,3 - - var q = async.priorityQueue(function (task, callback) { - call_order.push('process ' + task); - callback('error', 'arg'); - }, 1); - - q.push(1, 1.4, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 2); - call_order.push('callback ' + 1); - }); - q.push(2, 0.2, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 3); - call_order.push('callback ' + 2); - }); - q.push(3, 3.8, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 0); - call_order.push('callback ' + 3); - }); - q.push(4, 2.9, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 1); - call_order.push('callback ' + 4); - }); - test.equal(q.length(), 4); - test.equal(q.concurrency, 1); - - q.drain = function () { - test.same(call_order, [ - 'process 2', 'callback 2', - 'process 1', 'callback 1', - 'process 4', 'callback 4', - 'process 3', 'callback 3' - ]); - test.equal(q.concurrency, 1); - test.equal(q.length(), 0); - test.done(); - }; -}, - - 'concurrency': function (test) { - test.expect(17); - var call_order = [], - delays = [160,80,240,80]; - - // worker1: --2-3 - // worker2: -1---4 - // order of completion: 1,2,3,4 - - var q = async.priorityQueue(function (task, callback) { - setTimeout(function () { - call_order.push('process ' + task); - callback('error', 'arg'); - }, delays.splice(0,1)[0]); - }, 2); - - q.push(1, 1.4, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 2); - call_order.push('callback ' + 1); - }); - q.push(2, 0.2, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 1); - call_order.push('callback ' + 2); - }); - q.push(3, 3.8, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 0); - call_order.push('callback ' + 3); - }); - q.push(4, 2.9, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 0); - call_order.push('callback ' + 4); - }); - test.equal(q.length(), 4); - test.equal(q.concurrency, 2); - - q.drain = function () { - test.same(call_order, [ - 'process 1', 'callback 1', - 'process 2', 'callback 2', - 'process 3', 'callback 3', - 'process 4', 'callback 4' - ]); - test.equal(q.concurrency, 2); - test.equal(q.length(), 0); - test.done(); - }; -} - -}; - - exports['cargo'] = { 'cargo': function (test) { From 1dec998392e58a74c452d826d83af48db30f3eb0 Mon Sep 17 00:00:00 2001 From: Alexander Early Date: Tue, 22 Mar 2016 15:47:38 -0700 Subject: [PATCH 3/6] changed saturated and unsaturated to better reflect reality --- lib/internal/queue.js | 10 +++++++--- mocha_test/priorityQueue.js | 15 +++++++++------ mocha_test/queue.js | 24 +++++++++++++----------- 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/lib/internal/queue.js b/lib/internal/queue.js index 95210914e..9a1f77f91 100644 --- a/lib/internal/queue.js +++ b/lib/internal/queue.js @@ -49,6 +49,7 @@ export default function queue(worker, concurrency, payload) { return function(){ workers -= 1; + var removed = false; var args = arguments; arrayEach(tasks, function (task) { @@ -61,6 +62,11 @@ export default function queue(worker, concurrency, payload) { task.callback.apply(task, args); }); + + if (workers <= (q.concurrency - q.buffer) ) { + q.unsaturated(); + } + if (q.tasks.length + workers === 0) { q.drain(); } @@ -109,13 +115,11 @@ export default function queue(worker, concurrency, payload) { if (workers === q.concurrency) { q.saturated(); } - if (workers <= (q.concurrency - q.buffer) ) { - q.unsaturated(); - } var cb = onlyOnce(_next(q, tasks)); worker(data, cb); + } }, length: function () { diff --git a/mocha_test/priorityQueue.js b/mocha_test/priorityQueue.js index 9ed74c695..fa5cfcedf 100644 --- a/mocha_test/priorityQueue.js +++ b/mocha_test/priorityQueue.js @@ -139,8 +139,8 @@ describe('priorityQueue', function() { var calls = []; var q = async.priorityQueue(function(task, cb) { calls.push('process ' + task); - async.setImmediate(cb); - }, 10); + setTimeout(cb, 10); + }, 4); q.unsaturated = function() { calls.push('unsaturated'); }; @@ -148,8 +148,6 @@ describe('priorityQueue', function() { expect(calls.indexOf('unsaturated')).to.be.above(-1); setTimeout(function() { expect(calls).eql([ - 'unsaturated', - 'unsaturated', 'unsaturated', 'unsaturated', 'unsaturated', @@ -157,12 +155,17 @@ describe('priorityQueue', function() { 'process foo3', 'process foo2', 'process foo1', - 'process foo0', 'foo4 cb', + 'unsaturated', + 'process foo0', 'foo3 cb', + 'unsaturated', 'foo2 cb', + 'unsaturated', 'foo1 cb', - 'foo0 cb' + 'unsaturated', + 'foo0 cb', + 'unsaturated' ]); done(); }, 50); diff --git a/mocha_test/queue.js b/mocha_test/queue.js index 654c2c024..7170f44a7 100644 --- a/mocha_test/queue.js +++ b/mocha_test/queue.js @@ -505,11 +505,11 @@ describe('queue', function(){ // nop calls.push('process ' + task); setTimeout(cb, 10); - }, 10); + }, 3); q.concurrency = 3; q.saturated = function() { - assert(q.length() == 3, 'queue should be saturated now'); + assert(q.running() == 3, 'queue should be saturated now'); calls.push('saturated'); }; q.empty = function() { @@ -523,14 +523,16 @@ describe('queue', function(){ ); calls.push('drain'); expect(calls).to.eql([ - 'saturated', 'process foo', 'process bar', + 'saturated', 'process zoo', 'foo cb', + 'saturated', 'process poo', 'bar cb', 'empty', + 'saturated', 'process moo', 'zoo cb', 'poo cb', @@ -627,7 +629,7 @@ describe('queue', function(){ var q = async.queue(function(task, cb) { calls.push('process ' + task); async.setImmediate(cb); - }, 10); + }, 4); q.unsaturated = function() { calls.push('unsaturated'); }; @@ -635,21 +637,21 @@ describe('queue', function(){ expect(calls.indexOf('unsaturated')).to.be.above(-1); setTimeout(function() { expect(calls).eql([ - 'unsaturated', - 'unsaturated', - 'unsaturated', - 'unsaturated', - 'unsaturated', 'process foo0', 'process foo1', 'process foo2', 'process foo3', - 'process foo4', 'foo0 cb', + 'unsaturated', + 'process foo4', 'foo1 cb', + 'unsaturated', 'foo2 cb', + 'unsaturated', 'foo3 cb', - 'foo4 cb' + 'unsaturated', + 'foo4 cb', + 'unsaturated' ]); done(); }, 50); From 2f5ef064901b620acf6c8633fe2afeacbad87e2b Mon Sep 17 00:00:00 2001 From: Alexander Early Date: Tue, 22 Mar 2016 15:52:34 -0700 Subject: [PATCH 4/6] clarify saturated/unsaturated --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 2b5b5937e..a106612dd 100644 --- a/README.md +++ b/README.md @@ -1250,8 +1250,8 @@ methods: the `worker` has finished processing the task. Instead of a single task, a `tasks` array can be submitted. The respective callback is used for every task in the list. * `unshift(task, [callback])` - add a new task to the front of the `queue`. -* `saturated` - a callback that is called when the `queue` length hits the `concurrency` limit, and further tasks will be queued. -* `unsaturated` - a callback that is called when the `queue` length is less than the `concurrency` & `buffer` limits, and further tasks will not be queued. +* `saturated` - a callback that is called when the number of running workers hits the `concurrency` limit, and further tasks will be queued. +* `unsaturated` - a callback that is called when the number of running workers is less than the `concurrency` & `buffer` limits, and further tasks will not be queued. * `buffer` A minimum threshold buffer in order to say that the `queue` is `unsaturated`. * `empty` - a callback that is called when the last item from the `queue` is given to a `worker`. * `drain` - a callback that is called when the last item from the `queue` has returned from the `worker`. From fafb4ef331d10ac20d62ff49b8841754438c3c78 Mon Sep 17 00:00:00 2001 From: Alexander Early Date: Tue, 22 Mar 2016 16:01:29 -0700 Subject: [PATCH 5/6] convert cargo tests to mocha --- mocha_test/cargo.js | 262 ++++++++++++++++++++++++++++++++++++++++++ test/test-async.js | 269 -------------------------------------------- 2 files changed, 262 insertions(+), 269 deletions(-) create mode 100644 mocha_test/cargo.js diff --git a/mocha_test/cargo.js b/mocha_test/cargo.js new file mode 100644 index 000000000..ece80b6ae --- /dev/null +++ b/mocha_test/cargo.js @@ -0,0 +1,262 @@ +var async = require('../lib'); +var expect = require('chai').expect; +var assert = require('assert'); + +describe('cargo', function () { + + it('cargo', function (done) { + var call_order = [], + delays = [160, 160, 80]; + + // worker: --12--34--5- + // order of completion: 1,2,3,4,5 + + var c = async.cargo(function (tasks, callback) { + setTimeout(function () { + call_order.push('process ' + tasks.join(' ')); + callback('error', 'arg'); + }, delays.shift()); + }, 2); + + c.push(1, function (err, arg) { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + expect(c.length()).to.equal(3); + call_order.push('callback ' + 1); + }); + c.push(2, function (err, arg) { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + expect(c.length()).to.equal(3); + call_order.push('callback ' + 2); + }); + + expect(c.length()).to.equal(2); + + // async push + setTimeout(function () { + c.push(3, function (err, arg) { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + expect(c.length()).to.equal(1); + call_order.push('callback ' + 3); + }); + }, 60); + setTimeout(function () { + c.push(4, function (err, arg) { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + expect(c.length()).to.equal(1); + call_order.push('callback ' + 4); + }); + expect(c.length()).to.equal(2); + c.push(5, function (err, arg) { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + expect(c.length()).to.equal(0); + call_order.push('callback ' + 5); + }); + }, 120); + + + setTimeout(function () { + expect(call_order).to.eql([ + 'process 1 2', 'callback 1', 'callback 2', + 'process 3 4', 'callback 3', 'callback 4', + 'process 5' , 'callback 5' + ]); + expect(c.length()).to.equal(0); + done(); + }, 800); + }); + + it('without callback', function (done) { + var call_order = [], + delays = [160,80,240,80]; + + // worker: --1-2---34-5- + // order of completion: 1,2,3,4,5 + + var c = async.cargo(function (tasks, callback) { + setTimeout(function () { + call_order.push('process ' + tasks.join(' ')); + callback('error', 'arg'); + }, delays.shift()); + }, 2); + + c.push(1); + + setTimeout(function () { + c.push(2); + }, 120); + setTimeout(function () { + c.push(3); + c.push(4); + c.push(5); + }, 180); + + setTimeout(function () { + expect(call_order).to.eql([ + 'process 1', + 'process 2', + 'process 3 4', + 'process 5' + ]); + done(); + }, 800); + }); + + it('bulk task', function (done) { + var call_order = [], + delays = [120,40]; + + // worker: -123-4- + // order of completion: 1,2,3,4 + + var c = async.cargo(function (tasks, callback) { + setTimeout(function () { + call_order.push('process ' + tasks.join(' ')); + callback('error', tasks.join(' ')); + }, delays.shift()); + }, 3); + + c.push( [1,2,3,4], function (err, arg) { + expect(err).to.equal('error'); + call_order.push('callback ' + arg); + }); + + expect(c.length()).to.equal(4); + + setTimeout(function () { + expect(call_order).to.eql([ + 'process 1 2 3', 'callback 1 2 3', + 'callback 1 2 3', 'callback 1 2 3', + 'process 4', 'callback 4', + ]); + expect(c.length()).to.equal(0); + done(); + }, 800); + }); + + it('drain once', function (done) { + + var c = async.cargo(function (tasks, callback) { + callback(); + }, 3); + + var drainCounter = 0; + c.drain = function () { + drainCounter++; + }; + + for(var i = 0; i < 10; i++){ + c.push(i); + } + + setTimeout(function(){ + expect(drainCounter).to.equal(1); + done(); + }, 500); + }); + + it('drain twice', function (done) { + + var c = async.cargo(function (tasks, callback) { + callback(); + }, 3); + + var loadCargo = function(){ + for(var i = 0; i < 10; i++){ + c.push(i); + } + }; + + var drainCounter = 0; + c.drain = function () { + drainCounter++; + }; + + loadCargo(); + setTimeout(loadCargo, 500); + + setTimeout(function(){ + expect(drainCounter).to.equal(2); + done(); + }, 1000); + }); + + it('events', function (done) { + var calls = []; + var q = async.cargo(function(task, cb) { + // nop + calls.push('process ' + task); + async.setImmediate(cb); + }, 1); + q.concurrency = 3; + + q.saturated = function() { + assert(q.length() == 3, 'cargo should be saturated now'); + calls.push('saturated'); + }; + q.empty = function() { + assert(q.length() === 0, 'cargo should be empty now'); + calls.push('empty'); + }; + q.drain = function() { + assert( + q.length() === 0 && q.running() === 0, + 'cargo should be empty now and no more workers should be running' + ); + calls.push('drain'); + expect(calls).to.eql([ + 'saturated', + 'process foo', + 'process bar', + 'process zoo', + 'foo cb', + 'process poo', + 'bar cb', + 'empty', + 'process moo', + 'zoo cb', + 'poo cb', + 'moo cb', + 'drain' + ]); + done(); + }; + q.push('foo', function () {calls.push('foo cb');}); + q.push('bar', function () {calls.push('bar cb');}); + q.push('zoo', function () {calls.push('zoo cb');}); + q.push('poo', function () {calls.push('poo cb');}); + q.push('moo', function () {calls.push('moo cb');}); + }); + + it('expose payload', function (done) { + var called_once = false; + var cargo= async.cargo(function(tasks, cb) { + if (!called_once) { + expect(cargo.payload).to.equal(1); + assert(tasks.length === 1, 'should start with payload = 1'); + } else { + expect(cargo.payload).to.equal(2); + assert(tasks.length === 2, 'next call shold have payload = 2'); + } + called_once = true; + setTimeout(cb, 25); + }, 1); + + cargo.drain = function () { + done(); + }; + + expect(cargo.payload).to.equal(1); + + cargo.push([1, 2, 3]); + + setTimeout(function () { + cargo.payload = 2; + }, 15); + }); + +}); diff --git a/test/test-async.js b/test/test-async.js index c87a0aa68..3845bacb8 100755 --- a/test/test-async.js +++ b/test/test-async.js @@ -2148,275 +2148,6 @@ exports['whilst optional callback'] = function (test) { }; - -exports['cargo'] = { - - 'cargo': function (test) { - test.expect(19); - var call_order = [], - delays = [160, 160, 80]; - - // worker: --12--34--5- - // order of completion: 1,2,3,4,5 - - var c = async.cargo(function (tasks, callback) { - setTimeout(function () { - call_order.push('process ' + tasks.join(' ')); - callback('error', 'arg'); - }, delays.shift()); - }, 2); - - c.push(1, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(c.length(), 3); - call_order.push('callback ' + 1); - }); - c.push(2, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(c.length(), 3); - call_order.push('callback ' + 2); - }); - - test.equal(c.length(), 2); - - // async push - setTimeout(function () { - c.push(3, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(c.length(), 1); - call_order.push('callback ' + 3); - }); - }, 60); - setTimeout(function () { - c.push(4, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(c.length(), 1); - call_order.push('callback ' + 4); - }); - test.equal(c.length(), 2); - c.push(5, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(c.length(), 0); - call_order.push('callback ' + 5); - }); - }, 120); - - - setTimeout(function () { - test.same(call_order, [ - 'process 1 2', 'callback 1', 'callback 2', - 'process 3 4', 'callback 3', 'callback 4', - 'process 5' , 'callback 5' - ]); - test.equal(c.length(), 0); - test.done(); - }, 800); -}, - - 'without callback': function (test) { - test.expect(1); - var call_order = [], - delays = [160,80,240,80]; - - // worker: --1-2---34-5- - // order of completion: 1,2,3,4,5 - - var c = async.cargo(function (tasks, callback) { - setTimeout(function () { - call_order.push('process ' + tasks.join(' ')); - callback('error', 'arg'); - }, delays.shift()); - }, 2); - - c.push(1); - - setTimeout(function () { - c.push(2); - }, 120); - setTimeout(function () { - c.push(3); - c.push(4); - c.push(5); - }, 180); - - setTimeout(function () { - test.same(call_order, [ - 'process 1', - 'process 2', - 'process 3 4', - 'process 5' - ]); - test.done(); - }, 800); -}, - - 'bulk task': function (test) { - test.expect(7); - var call_order = [], - delays = [120,40]; - - // worker: -123-4- - // order of completion: 1,2,3,4 - - var c = async.cargo(function (tasks, callback) { - setTimeout(function () { - call_order.push('process ' + tasks.join(' ')); - callback('error', tasks.join(' ')); - }, delays.shift()); - }, 3); - - c.push( [1,2,3,4], function (err, arg) { - test.equal(err, 'error'); - call_order.push('callback ' + arg); - }); - - test.equal(c.length(), 4); - - setTimeout(function () { - test.same(call_order, [ - 'process 1 2 3', 'callback 1 2 3', - 'callback 1 2 3', 'callback 1 2 3', - 'process 4', 'callback 4', - ]); - test.equal(c.length(), 0); - test.done(); - }, 800); -}, - - 'drain once': function (test) { - test.expect(1); - - var c = async.cargo(function (tasks, callback) { - callback(); - }, 3); - - var drainCounter = 0; - c.drain = function () { - drainCounter++; - }; - - for(var i = 0; i < 10; i++){ - c.push(i); - } - - setTimeout(function(){ - test.equal(drainCounter, 1); - test.done(); - }, 500); -}, - - 'drain twice': function (test) { - test.expect(1); - - var c = async.cargo(function (tasks, callback) { - callback(); - }, 3); - - var loadCargo = function(){ - for(var i = 0; i < 10; i++){ - c.push(i); - } - }; - - var drainCounter = 0; - c.drain = function () { - drainCounter++; - }; - - loadCargo(); - setTimeout(loadCargo, 500); - - setTimeout(function(){ - test.equal(drainCounter, 2); - test.done(); - }, 1000); -}, - - 'events': function(test) { - test.expect(4); - var calls = []; - var q = async.cargo(function(task, cb) { - // nop - calls.push('process ' + task); - async.setImmediate(cb); - }, 1); - q.concurrency = 3; - - q.saturated = function() { - test.ok(q.length() == 3, 'cargo should be saturated now'); - calls.push('saturated'); - }; - q.empty = function() { - test.ok(q.length() === 0, 'cargo should be empty now'); - calls.push('empty'); - }; - q.drain = function() { - test.ok( - q.length() === 0 && q.running() === 0, - 'cargo should be empty now and no more workers should be running' - ); - calls.push('drain'); - test.same(calls, [ - 'saturated', - 'process foo', - 'process bar', - 'process zoo', - 'foo cb', - 'process poo', - 'bar cb', - 'empty', - 'process moo', - 'zoo cb', - 'poo cb', - 'moo cb', - 'drain' - ]); - test.done(); - }; - q.push('foo', function () {calls.push('foo cb');}); - q.push('bar', function () {calls.push('bar cb');}); - q.push('zoo', function () {calls.push('zoo cb');}); - q.push('poo', function () {calls.push('poo cb');}); - q.push('moo', function () {calls.push('moo cb');}); -}, - - 'expose payload': function (test) { - test.expect(5); - var called_once = false; - var cargo= async.cargo(function(tasks, cb) { - if (!called_once) { - test.equal(cargo.payload, 1); - test.ok(tasks.length === 1, 'should start with payload = 1'); - } else { - test.equal(cargo.payload, 2); - test.ok(tasks.length === 2, 'next call shold have payload = 2'); - } - called_once = true; - setTimeout(cb, 25); - }, 1); - - cargo.drain = function () { - test.done(); - }; - - test.equals(cargo.payload, 1); - - cargo.push([1, 2, 3]); - - setTimeout(function () { - cargo.payload = 2; - }, 15); -} - -}; - - - exports['memoize'] = { 'memoize': function (test) { From 1dc32770eb92c23347a9603196aa9fad6921635f Mon Sep 17 00:00:00 2001 From: Alexander Early Date: Tue, 22 Mar 2016 16:02:31 -0700 Subject: [PATCH 6/6] fix cargo events test --- mocha_test/cargo.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/mocha_test/cargo.js b/mocha_test/cargo.js index ece80b6ae..b2980a2ee 100644 --- a/mocha_test/cargo.js +++ b/mocha_test/cargo.js @@ -195,7 +195,7 @@ describe('cargo', function () { q.concurrency = 3; q.saturated = function() { - assert(q.length() == 3, 'cargo should be saturated now'); + assert(q.running() == 3, 'cargo should be saturated now'); calls.push('saturated'); }; q.empty = function() { @@ -209,14 +209,16 @@ describe('cargo', function () { ); calls.push('drain'); expect(calls).to.eql([ - 'saturated', 'process foo', 'process bar', + 'saturated', 'process zoo', 'foo cb', + 'saturated', 'process poo', 'bar cb', 'empty', + 'saturated', 'process moo', 'zoo cb', 'poo cb',