Skip to content

Commit

Permalink
refactor: Use finished to check streams
Browse files Browse the repository at this point in the history
  • Loading branch information
fb55 committed Mar 16, 2022
1 parent e41d774 commit 3c01e78
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 115 deletions.
20 changes: 9 additions & 11 deletions bench/memory/sax-parser.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { readFile } from 'node:fs/promises';
import format from 'human-format';
import promisifyEvent from 'promisify-event';
import memwatch from '@airbnb/node-memwatch';
import { SAXParser } from '../../packages/parse5-sax-parser/dist/index.js';
import { finished } from 'node:stream/promises';

main();

Expand All @@ -18,18 +18,16 @@ async function main() {
maxMemUsage = Math.max(maxMemUsage, stats.used_heap_size);
});

const statsPromise = new Promise((resolve) => memwatch.once('stats', resolve));

startDate = new Date();

const parserPromise = parse().then((dataSize) => {
parsedDataSize = dataSize;
endDate = new Date();
heapDiff = heapDiffMeasurement.end();
});
parsedDataSize = await parse();
endDate = new Date();
heapDiff = heapDiffMeasurement.end();

await Promise.all([
parserPromise,
promisifyEvent(memwatch, 'stats'), // NOTE: we need at least one `stats` result
]);
// NOTE: we need at least one `stats` result to get maxMemUsage
await statsPromise;

printResults(parsedDataSize, startDate, endDate, heapDiff, maxMemUsage);
}
Expand All @@ -46,7 +44,7 @@ async function parse() {

stream.end();

await promisifyEvent(stream, 'finish');
await finished(stream);

return parsedDataSize;
}
Expand Down
42 changes: 18 additions & 24 deletions bench/perf/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { treeAdapters, WritableStreamStub } from 'parse5-test-utils/dist/common.
import * as parse5 from '../../packages/parse5/dist/index.js';
import { ParserStream as parse5Stream } from '../../packages/parse5-parser-stream/dist/index.js';
import * as parse5Upstream from 'parse5';
import { finished } from 'node:stream/promises';

const hugePagePath = new URL('../../test/data/huge-page/huge-page.html', import.meta.url);
const treeConstructionPath = new URL('../../test/data/html5lib-tests/tree-construction', import.meta.url);
Expand All @@ -21,7 +22,7 @@ global.upstreamParser = parse5Upstream;
global.hugePage = readFileSync(hugePagePath).toString();

// Micro data
global.microTests = loadTreeConstructionTestData([treeConstructionPath], treeAdapters.default)
global.microTests = loadTreeConstructionTestData(treeConstructionPath, treeAdapters.default)
.filter(
(test) =>
//NOTE: this test caused a stack overflow in parse5 v1.x
Expand Down Expand Up @@ -107,35 +108,28 @@ runBench({
name: 'parse5 regression benchmark - STREAM',
defer: true,
workingCopyFn: async (deferred) => {
const parsePromises = files.map(
(fileName) =>
new Promise((resolve) => {
const stream = createReadStream(fileName, 'utf8');
const parserStream = new WorkingCopyParserStream();
const parsePromises = files.map((fileName) => {
const stream = createReadStream(fileName, 'utf8');
const parserStream = new WorkingCopyParserStream();

stream.pipe(parserStream);
parserStream.on('finish', resolve);
})
);
stream.pipe(parserStream);
return finished(parserStream);
});

await Promise.all(parsePromises);
deferred.resolve();
},
upstreamFn: async (deferred) => {
const parsePromises = files.map(
(fileName) =>
new Promise((resolve) => {
const stream = createReadStream(fileName, 'utf8');
const writable = new WritableStreamStub();

writable.on('finish', () => {
upstreamParser.parse(writable.writtenData);
resolve();
});

stream.pipe(writable);
})
);
const parsePromises = files.map(async (fileName) => {
const stream = createReadStream(fileName, 'utf8');
const writable = new WritableStreamStub();

stream.pipe(writable);

await finished(writable);

upstreamParser.parse(writable.writtenData);
});

await Promise.all(parsePromises);
deferred.resolve();
Expand Down
28 changes: 11 additions & 17 deletions packages/parse5-html-rewriting-stream/test/rewriting-stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { outdent } from 'outdent';
import { RewritingStream } from '../lib/index.js';
import { loadSAXParserTestData } from 'parse5-test-utils/utils/load-sax-parser-test-data.js';
import { getStringDiffMsg, writeChunkedToStream, WritableStreamStub } from 'parse5-test-utils/utils/common.js';
import { finished } from 'node:stream';
import { finished } from 'node:stream/promises';

const srcHtml = outdent`
<!DOCTYPE html "">
Expand Down Expand Up @@ -32,23 +32,18 @@ function createRewriterTest({
expected: string;
assignTokenHandlers?: (rewriter: RewritingStream) => void;
}) {
return (done: (err?: unknown) => void): void => {
return async (): Promise<void> => {
const rewriter = new RewritingStream();
const writable = new WritableStreamStub();

finished(writable, () => {
try {
assert.ok(writable.writtenData === expected, getStringDiffMsg(writable.writtenData, expected));
done();
} catch (error) {
done(error);
}
});

rewriter.pipe(writable);

assignTokenHandlers(rewriter);
writeChunkedToStream(src, rewriter);

await finished(writable);

assert.ok(writable.writtenData === expected, getStringDiffMsg(writable.writtenData, expected));
};
}

Expand Down Expand Up @@ -289,7 +284,7 @@ describe('RewritingStream', () => {
})
);

it('Last text chunk must be flushed (GH-271)', (done) => {
it('Last text chunk must be flushed (GH-271)', async () => {
const parser = new RewritingStream();
let foundText = false;

Expand All @@ -298,13 +293,12 @@ describe('RewritingStream', () => {
assert.strictEqual(text, 'text');
});

parser.once('finish', () => {
assert.ok(foundText);
done();
});

parser.write('text');
parser.end();

await finished(parser);

assert.ok(foundText);
});

it('Should not accept binary input (GH-269)', () => {
Expand Down
15 changes: 7 additions & 8 deletions packages/parse5-parser-stream/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ import type { DefaultTreeAdapterMap } from 'parse5/dist/tree-adapters/default.js
* ```js
* const ParserStream = require('parse5-parser-stream');
* const http = require('http');
* const { finished } = require('node:stream');
*
* // Fetch the page content and obtain it's <head> node
* http.get('http://inikulin.github.io/parse5/', res => {
* const parser = new ParserStream();
*
* parser.once('finish', () => {
* finished(parser, () => {
* console.log(parser.document.childNodes[1].childNodes[0].tagName); //> 'head'
* });
*
Expand All @@ -27,7 +28,6 @@ import type { DefaultTreeAdapterMap } from 'parse5/dist/tree-adapters/default.js
*
*/
export class ParserStream<T extends TreeAdapterTypeMap = DefaultTreeAdapterMap> extends Writable {
private lastChunkWritten = false;
private writeCallback: null | (() => void) = null;
private pausedByScript = false;

Expand All @@ -53,16 +53,15 @@ export class ParserStream<T extends TreeAdapterTypeMap = DefaultTreeAdapterMap>
}

this.writeCallback = callback;
this.parser.tokenizer.write(chunk, this.lastChunkWritten);
this.parser.tokenizer.write(chunk, false);
this._runParsingLoop();
}

// TODO [engine:node@>=16]: Due to issues with Node < 16, we are overriding `end` instead of `_final`.
override _final(callback: (error?: Error | null) => void): void {
this.writeCallback = callback;

// eslint-disable-next-line @typescript-eslint/no-explicit-any
override end(chunk?: any, encoding?: any, callback?: any): any {
this.lastChunkWritten = true;
super.end(chunk || '', encoding, callback);
this.parser.tokenizer.write('', true);
this._runParsingLoop();
}

//Scriptable parser implementation
Expand Down
12 changes: 7 additions & 5 deletions packages/parse5-parser-stream/test/parser-stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@ import * as assert from 'node:assert';
import { ParserStream } from '../lib/index.js';
import { generateParsingTests } from 'parse5-test-utils/utils/generate-parsing-tests.js';
import { parseChunked } from './utils/parse-chunked.js';
import { finished } from 'node:stream/promises';

generateParsingTests('ParserStream', 'ParserStream', { skipFragments: true }, (test, opts) =>
parseChunked(test.input, opts)
);

describe('ParserStream', () => {
it('Fix empty stream parsing with ParserStream (GH-196)', (done) => {
const parser = new ParserStream().once('finish', () => {
assert.ok(parser.document.childNodes.length > 0);
done();
});
it('Fix empty stream parsing with ParserStream (GH-196)', async () => {
const parser = new ParserStream();

parser.end();

await finished(parser);

assert.ok(parser.document.childNodes.length > 0);
});

it('Should not accept binary input (GH-269)', () => {
Expand Down
41 changes: 19 additions & 22 deletions packages/parse5-parser-stream/test/scripting.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { ParserStream } from '../lib/index.js';
import { generateParsingTests } from 'parse5-test-utils/utils/generate-parsing-tests.js';
import { makeChunks, generateTestsForEachTreeAdapter } from 'parse5-test-utils/utils/common.js';
import { runInNewContext } from 'node:vm';
import { finished } from 'node:stream/promises';

function pause(): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, 5));
Expand All @@ -22,23 +23,19 @@ generateParsingTests(
const parser = new ParserStream(opts);
const { document } = parser;

const completionPromise = new Promise<{ node: typeof document }>((resolve, reject) => {
parser.once('finish', () => resolve({ node: document }));
parser.on('script', async (scriptElement, documentWrite, resume) => {
const scriptTextNode = opts.treeAdapter.getChildNodes(scriptElement)[0];
const script = scriptTextNode ? opts.treeAdapter.getTextNodeContent(scriptTextNode) : '';

parser.on('script', async (scriptElement, documentWrite, resume) => {
const scriptTextNode = opts.treeAdapter.getChildNodes(scriptElement)[0];
const script = scriptTextNode ? opts.treeAdapter.getTextNodeContent(scriptTextNode) : '';

//NOTE: emulate postponed script execution
await pause();
//NOTE: emulate postponed script execution
await pause();

try {
runInNewContext(script, { document: { write: documentWrite } });
resume();
} catch (error) {
reject(error);
}
});
try {
runInNewContext(script, { document: { write: documentWrite } });
resume();
} catch (error) {
parser.emit('error', error);
}
});

//NOTE: emulate async input stream behavior
Expand All @@ -49,7 +46,9 @@ generateParsingTests(

parser.end();

return completionPromise;
await finished(parser);

return { node: document };
}
);

Expand All @@ -64,16 +63,14 @@ generateTestsForEachTreeAdapter('ParserStream', (treeAdapter) => {
process.nextTick(done);
});

test('Regression - Parsing loop lock causes accidental hang ups (GH-101)', (done) => {
test('Regression - Parsing loop lock causes accidental hang ups (GH-101)', () => {
const parser = new ParserStream({ treeAdapter });

parser.once('finish', () => done());

parser.on('script', (_scriptElement, _documentWrite, resume) => {
process.nextTick(() => resume());
});
parser.on('script', (_scriptElement, _documentWrite, resume) => process.nextTick(resume));

parser.write('<script>yo</script>');
parser.end('dawg');

return finished(parser);
});
});
3 changes: 2 additions & 1 deletion packages/parse5-plain-text-conversion-stream/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import type { TreeAdapterTypeMap } from 'parse5/dist/tree-adapters/interface.js'
* ```js
* const PlainTextConversionStream = require('parse5-plain-text-conversion-stream');
* const fs = require('fs');
* const { finished } = require('node:stream');
*
* const file = fs.createReadStream('war_and_peace.txt');
* const converter = new PlainTextConversionStream();
*
* converter.once('finish', () => {
* finished(converter, () => {
* console.log(converter.document.childNodes[1].childNodes[0].tagName); //> 'head'
* });
*
Expand Down
Loading

0 comments on commit 3c01e78

Please sign in to comment.