diff --git a/index.js b/index.js index cbf3970..a0e87b2 100644 --- a/index.js +++ b/index.js @@ -36,26 +36,35 @@ module.exports = function (mapper, opts) { function queueData (data, number) { var nextToWrite = lastWritten + 1 - if (number === nextToWrite) { - // If it's next, and its not undefined write it - if (data !== undefined) { - stream.emit.apply(stream, ['data', data]) - } - lastWritten ++ - nextToWrite ++ + if(number === nextToWrite) { + do { + var dataToWrite; + + // write data contents only for the first iteration + if(number === nextToWrite) { + dataToWrite = data + } else { + dataToWrite = writeQueue[nextToWrite] + delete writeQueue[nextToWrite] + } + + // If it's next, and its not undefined write it + if(dataToWrite !== undefined) { + stream.emit.apply(stream, ['data', dataToWrite]) + } + + lastWritten ++ + nextToWrite ++ + outputs ++ + + // If the next value is in the queue, keeping writing from the queue + } while (writeQueue.hasOwnProperty(nextToWrite)) + } else { // Otherwise queue it for later. writeQueue[number] = data } - // If the next value is in the queue, write it - if (writeQueue.hasOwnProperty(nextToWrite)) { - var dataToWrite = writeQueue[nextToWrite] - delete writeQueue[nextToWrite] - return queueData(dataToWrite, nextToWrite) - } - - outputs ++ if(inputs === outputs) { if(paused) paused = false, stream.emit('drain') //written all the incoming events if(ended) end() @@ -138,7 +147,3 @@ module.exports = function (mapper, opts) { return stream } - - - -