-
Notifications
You must be signed in to change notification settings - Fork 35
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix: cache integrity and size events so late listeners still get them (…
- Loading branch information
Showing
4 changed files
with
87 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
'use strict' | ||
|
||
const MinipassPipeline = require('minipass-pipeline') | ||
|
||
class CachingMinipassPipeline extends MinipassPipeline { | ||
#events = [] | ||
#data = new Map() | ||
|
||
constructor (opts, ...streams) { | ||
// CRITICAL: do NOT pass the streams to the call to super(), this will start | ||
// the flow of data and potentially cause the events we need to catch to emit | ||
// before we've finished our own setup. instead we call super() with no args, | ||
// finish our setup, and then push the streams into ourselves to start the | ||
// data flow | ||
super() | ||
this.#events = opts.events | ||
|
||
/* istanbul ignore next - coverage disabled because this is pointless to test here */ | ||
if (streams.length) { | ||
this.push(...streams) | ||
} | ||
} | ||
|
||
on (event, handler) { | ||
if (this.#events.includes(event) && this.#data.has(event)) { | ||
return handler(...this.#data.get(event)) | ||
} | ||
|
||
return super.on(event, handler) | ||
} | ||
|
||
emit (event, ...data) { | ||
if (this.#events.includes(event)) { | ||
this.#data.set(event, data) | ||
} | ||
|
||
return super.emit(event, ...data) | ||
} | ||
} | ||
|
||
module.exports = CachingMinipassPipeline |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
'use strict' | ||
|
||
const events = require('events') | ||
const ssri = require('ssri') | ||
const t = require('tap') | ||
|
||
const CachingMinipassPipeline = require('../lib/pipeline.js') | ||
|
||
t.test('caches events and emits them again for new listeners', async (t) => { | ||
const INTEGRITY = ssri.fromData('foobarbazbuzz') | ||
const integrityStream = ssri.integrityStream() | ||
const pipeline = new CachingMinipassPipeline({ events: ['integrity', 'size'] }, integrityStream) | ||
integrityStream.on('size', s => pipeline.emit('size', s)) | ||
integrityStream.on('integrity', i => pipeline.emit('integrity', i)) | ||
|
||
pipeline.write('foobarbazbuzz') | ||
pipeline.resume() | ||
// delay ending the stream so the early listeners will get the first events | ||
setImmediate(() => pipeline.end()) | ||
|
||
const [earlySize, earlyIntegrity] = await Promise.all([ | ||
events.once(pipeline, 'size').then(res => res[0]), | ||
events.once(pipeline, 'integrity').then(res => res[0]), | ||
]) | ||
|
||
// now wait for the stream itself to have ended | ||
await pipeline.promise() | ||
|
||
// and add new listeners | ||
const [lateSize, lateIntegrity] = await Promise.all([ | ||
events.once(pipeline, 'size').then(res => res[0]), | ||
events.once(pipeline, 'integrity').then(res => res[0]), | ||
]) | ||
|
||
// and make sure we got the same results | ||
t.equal(earlySize, 13, 'got the right size') | ||
t.same(earlyIntegrity, INTEGRITY, 'got the right integrity') | ||
t.same(earlySize, lateSize, 'got the same size early and late') | ||
t.same(earlyIntegrity, lateIntegrity, 'got the same integrity early and late') | ||
}) |