From 1213de8f032432ac09dd34861446b91ae85220ef Mon Sep 17 00:00:00 2001 From: "S.A.N" Date: Wed, 20 Nov 2024 10:46:37 +0100 Subject: [PATCH] feat(csv-parse): implement TransformStream (#445) * feat(csv-parse): implement TransformStream * feat(csv-parse): add test api.web_stream and fix controller.terminate * feat(csv-parse): added errors handle to TransformStream * test(csv-parse): error thrown by webstream * build(csv-parse): exclude web_stream ts test from ci in node 14 * fix(csv-parse): satisfy test and re-habilitate node stream import --------- Co-authored-by: David Worms --- packages/csv-parse/dist/esm/stream.d.ts | 11 +++++ packages/csv-parse/lib/stream.d.ts | 11 +++++ packages/csv-parse/lib/stream.js | 58 +++++++++++++---------- packages/csv-parse/package.json | 12 ++++- packages/csv-parse/test/api.web_stream.ts | 45 ++++++++++++++++++ 5 files changed, 110 insertions(+), 27 deletions(-) create mode 100644 packages/csv-parse/dist/esm/stream.d.ts create mode 100644 packages/csv-parse/lib/stream.d.ts create mode 100644 packages/csv-parse/test/api.web_stream.ts diff --git a/packages/csv-parse/dist/esm/stream.d.ts b/packages/csv-parse/dist/esm/stream.d.ts new file mode 100644 index 00000000..f3b4068f --- /dev/null +++ b/packages/csv-parse/dist/esm/stream.d.ts @@ -0,0 +1,11 @@ + +import { Options } from './index.js'; + +declare function parse(options?: Options): TransformStream; +// export default parse; +export { parse }; + +export { + CastingContext, CastingFunction, CastingDateFunction, + ColumnOption, Options, Info, CsvErrorCode, CsvError +} from './index.js'; diff --git a/packages/csv-parse/lib/stream.d.ts b/packages/csv-parse/lib/stream.d.ts new file mode 100644 index 00000000..f3b4068f --- /dev/null +++ b/packages/csv-parse/lib/stream.d.ts @@ -0,0 +1,11 @@ + +import { Options } from './index.js'; + +declare function parse(options?: Options): TransformStream; +// export default parse; +export { parse }; + +export { + CastingContext, CastingFunction, CastingDateFunction, + ColumnOption, Options, Info, CsvErrorCode, CsvError +} from './index.js'; diff --git a/packages/csv-parse/lib/stream.js b/packages/csv-parse/lib/stream.js index 8594af1b..d50d3e10 100644 --- a/packages/csv-parse/lib/stream.js +++ b/packages/csv-parse/lib/stream.js @@ -1,34 +1,40 @@ -import { TransformStream } from "node:stream/web"; +import { TransformStream, CountQueuingStrategy } from "node:stream/web"; import { transform } from "./api/index.js"; const parse = (opts) => { const api = transform(opts); - return new TransformStream({ - async transform(chunk, controller) { - api.parse( - chunk, - false, - (record) => { - controller.enqueue(record); - }, - () => { - controller.close(); - }, - ); - }, - async flush(controller) { - api.parse( - undefined, - true, - (record) => { - controller.enqueue(record); - }, - () => { - controller.close(); - }, - ); + let controller; + const enqueue = (record) => { + controller.enqueue(record); + }; + const terminate = () => { + controller.terminate(); + }; + return new TransformStream( + { + start(ctr) { + controller = ctr; + }, + transform(chunk) { + const error = api.parse(chunk, false, enqueue, terminate); + + if (error) { + controller.error(error); + throw error; + } + }, + flush() { + const error = api.parse(undefined, true, enqueue, terminate); + + if (error) { + controller.error(error); + throw error; + } + }, }, - }); + new CountQueuingStrategy({ highWaterMark: 1024 }), + new CountQueuingStrategy({ highWaterMark: 1024 }), + ); }; export { parse }; diff --git a/packages/csv-parse/package.json b/packages/csv-parse/package.json index 68b83988..5e6fd678 100644 --- a/packages/csv-parse/package.json +++ b/packages/csv-parse/package.json @@ -52,6 +52,16 @@ "default": "./dist/cjs/sync.cjs" } }, + "./stream": { + "import": { + "types": "./lib/stream.d.ts", + "default": "./lib/stream.js" + }, + "require": { + "types": "./dist/cjs/stream.d.cts", + "default": "./dist/cjs/stream.cjs" + } + }, "./browser/esm": { "types": "./dist/esm/index.d.ts", "default": "./dist/esm/index.js" @@ -123,7 +133,7 @@ "preversion": "npm run build && git add dist", "pretest": "npm run build", "test": "mocha 'test/**/*.{coffee,ts}'", - "test:legacy": "mocha --ignore test/api.web_stream.coffee --ignore test/api.stream.finished.coffee --ignore test/api.stream.iterator.coffee --loader=./test/loaders/legacy/all.js 'test/**/*.{coffee,ts}'" + "test:legacy": "mocha --ignore test/api.web_stream.coffee --ignore test/api.web_stream.ts --ignore test/api.stream.finished.coffee --ignore test/api.stream.iterator.coffee --loader=./test/loaders/legacy/all.js 'test/**/*.{coffee,ts}'" }, "type": "module", "types": "dist/esm/index.d.ts", diff --git a/packages/csv-parse/test/api.web_stream.ts b/packages/csv-parse/test/api.web_stream.ts new file mode 100644 index 00000000..2337f481 --- /dev/null +++ b/packages/csv-parse/test/api.web_stream.ts @@ -0,0 +1,45 @@ +import 'should' +import {parse as parseStream} from '../lib/stream.js' +import { CsvError } from '../lib/index.js' + +describe('API Web Stream', () => { + + describe('stream/web/TransformStream', () => { + + it('simple parse', async () => { + const stream = parseStream(); + const writer = stream.writable.getWriter(); + const reader = stream.readable.getReader(); + await writer.write(Buffer.from("A,B,C\nD,E,F")); + await writer.close(); + await reader.read().should.finally.eql({ + done: false, + value: ['A', 'B', 'C'], + }); + await reader.read().should.finally.eql({ + done: false, + value: ['D', 'E', 'F'], + }); + await reader.read().should.finally.eql({ + done: true, + value: undefined, + }); + }) + + it("cat error parse", async function () { + const stream = parseStream(); + const writer = stream.writable.getWriter(); + try { + await writer.write(Buffer.from("A,B,C\nD,E")); + await writer.close(); + throw Error("Shall not be called"); + } catch (err) { + if (!(err instanceof CsvError)) { + throw Error("Invalid error type"); + } + err.code.should.eql("CSV_RECORD_INCONSISTENT_FIELDS_LENGTH"); + } + }); + + }) +})