Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BREAKING(csv): rename CsvStream to CsvParseStream #3287

Merged
merged 3 commits into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 177 additions & 0 deletions csv/csv_parse_stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// This module is browser compatible.

import {
convertRowToObject,
defaultReadOptions,
type LineReader,
parseRecord,
type ParseResult,
type ReadOptions,
} from "../csv/_io.ts";
import { TextDelimiterStream } from "../streams/text_delimiter_stream.ts";

/** Options accepted by `CsvParseStream`, in addition to the shared `ReadOptions`. */
export interface CsvParseStreamOptions extends ReadOptions {
/**
* Whether the first parsed row is withheld from the output.
*
* If you provide `skipFirstRow: true` and `columns`, the first line will be
* skipped.
* If you provide `skipFirstRow: true` but not `columns`, the first line will
* be skipped and used as header definitions.
*
* When this option or `columns` is set, every emitted row is converted to an
* object keyed by header name instead of being emitted as a `string[]`.
*/
skipFirstRow?: boolean;
/**
* List of names used for header definition.
* When provided, these names are used as the headers even if `skipFirstRow`
* is also set.
*/
columns?: readonly string[];
}

/**
 * `LineReader` backed by a `ReadableStreamDefaultReader<string>`.
 * Each chunk obtained from the underlying reader is handed out as one line.
 */
class StreamLineReader implements LineReader {
  #reader: ReadableStreamDefaultReader<string>;
  /** Becomes `true` once the underlying reader has reported `done`. */
  #done = false;

  constructor(reader: ReadableStreamDefaultReader<string>) {
    this.#reader = reader;
  }

  /**
   * Reads the next chunk from the underlying reader.
   *
   * @returns The chunk with one trailing `\r` removed, or `null` once the
   * stream is exhausted.
   */
  async readLine(): Promise<string | null> {
    const chunk = await this.#reader.read();
    if (!chunk.done) {
      // NOTE: The trailing CR is dropped for compatibility with golang's
      // `encoding/csv`.
      return stripLastCR(chunk.value);
    }
    this.#done = true;
    return null;
  }

  /** Resolves to `true` after `readLine()` has observed the end of the stream. */
  isEOF(): Promise<boolean> {
    return Promise.resolve(this.#done);
  }

  /** Cancels the underlying reader; the returned promise is not awaited. */
  cancel() {
    this.#reader.cancel();
  }
}

/**
 * Removes a single trailing carriage return (`\r`) from `s`, if present.
 * Any other string, including the empty string, is returned unchanged.
 */
function stripLastCR(s: string): string {
  if (!s.endsWith("\r")) {
    return s;
  }
  return s.substring(0, s.length - 1);
}

/**
 * Element type produced by the readable side of `CsvParseStream`.
 * Resolves to `string[]` when the options type `T` is `undefined`; otherwise
 * it is the element type of `ParseResult<CsvParseStreamOptions, T>`.
 */
type RowType<T> = T extends undefined ? string[]
: ParseResult<CsvParseStreamOptions, T>[number];

/**
 * Transform stream that turns CSV text into parsed rows.
 * Rows are emitted as `string[]`, or as objects keyed by header name when
 * `skipFirstRow` or `columns` is set.
 *
 * A `CsvParseStream` expects input conforming to
 * [RFC 4180](https://rfc-editor.org/rfc/rfc4180.html).
 *
 * Empty lines and records without any field are skipped.
 *
 * @example
 * ```ts
 * import { CsvParseStream } from "https://deno.land/std@$STD_VERSION/csv/csv_parse_stream.ts";
 * const res = await fetch("https://example.com/data.csv");
 * const parts = res.body!
 *   .pipeThrough(new TextDecoderStream())
 *   .pipeThrough(new CsvParseStream());
 * ```
 */
export class CsvParseStream<
  const T extends CsvParseStreamOptions | undefined = undefined,
> implements TransformStream<string, RowType<T>> {
  readonly #readable: ReadableStream<
    string[] | Record<string, string | unknown>
  >;
  readonly #options: CsvParseStreamOptions;
  readonly #lineReader: StreamLineReader;
  readonly #lines: TextDelimiterStream;
  /** Line counter passed to `parseRecord` and `convertRowToObject`. */
  #lineIndex = 0;
  /** `true` until the first record has been parsed. */
  #isFirstRow = true;

  /** Header names used when rows are converted to objects. */
  #headers: readonly string[] = [];

  /**
   * @param options Parse options; they are merged over `defaultReadOptions`.
   */
  constructor(options: T = defaultReadOptions as T) {
    this.#options = { ...defaultReadOptions, ...options };

    // Written text is split on "\n"; the parser consumes the resulting lines.
    const lineSplitter = new TextDelimiterStream("\n");
    this.#lines = lineSplitter;
    this.#lineReader = new StreamLineReader(lineSplitter.readable.getReader());
    this.#readable = new ReadableStream({
      pull: (controller) => this.#pull(controller),
      cancel: () => this.#lineReader.cancel(),
    });
  }

  /**
   * Pull handler of the readable side: keeps reading until one row has been
   * enqueued or the input is exhausted (in which case the stream is closed).
   */
  async #pull(
    controller: ReadableStreamDefaultController<
      string[] | Record<string, string | unknown>
    >,
  ): Promise<void> {
    const { skipFirstRow, columns } = this.#options;

    while (true) {
      const line = await this.#lineReader.readLine();
      if (line === null) {
        // End of input reached
        controller.close();
        this.#lineReader.cancel();
        return;
      }
      if (line === "") {
        // Empty line: count it and move on to the next one
        this.#lineIndex++;
        continue;
      }

      const record = await parseRecord(
        line,
        this.#lineReader,
        this.#options,
        this.#lineIndex,
      );
      if (record === null) {
        controller.close();
        this.#lineReader.cancel();
        return;
      }

      if (this.#isFirstRow) {
        this.#isFirstRow = false;
        // Explicit `columns` win over the names found in the first row.
        if (columns) {
          this.#headers = columns;
        } else if (skipFirstRow) {
          this.#headers = record;
        }

        if (skipFirstRow) {
          // The first row is never emitted in this mode.
          continue;
        }
      }

      this.#lineIndex++;
      if (record.length === 0) {
        // Nothing to emit for a record without fields; try the next line.
        continue;
      }

      if (skipFirstRow || columns) {
        controller.enqueue(
          convertRowToObject(record, this.#headers, this.#lineIndex),
        );
      } else {
        controller.enqueue(record);
      }
      return;
    }
  }

  /** Readable side, yielding the parsed rows. */
  get readable() {
    return this.#readable as ReadableStream<RowType<T>>;
  }

  /** Writable side, accepting CSV text chunks. */
  get writable(): WritableStream<string> {
    return this.#lines.writable;
  }
}
55 changes: 28 additions & 27 deletions csv/stream_test.ts → csv/csv_parse_stream_test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
import { CsvStream } from "./stream.ts";
import type { CsvStreamOptions } from "./stream.ts";
import { CsvParseStream } from "./csv_parse_stream.ts";
import type { CsvParseStreamOptions } from "./csv_parse_stream.ts";
import { ERR_QUOTE, ParseError } from "./_io.ts";
import { readableStreamFromIterable } from "../streams/readable_stream_from_iterable.ts";
import { readableStreamFromReader } from "../streams/readable_stream_from_reader.ts";
Expand All @@ -18,15 +18,15 @@ const testdataDir = join(fromFileUrl(import.meta.url), "../testdata");
const encoder = new TextEncoder();

Deno.test({
name: "[csv/stream] CsvStream should work with Deno.File",
name: "[csv/csv_parse_stream] CsvParseStream should work with Deno.File",
permissions: {
read: [testdataDir],
},
fn: async () => {
const file = await Deno.open(join(testdataDir, "simple.csv"));
const readable = file.readable
.pipeThrough(new TextDecoderStream())
.pipeThrough(new CsvStream());
.pipeThrough(new CsvParseStream());
const records = [] as Array<Array<string>>;
for await (const record of readable) {
records.push(record);
Expand All @@ -40,15 +40,15 @@ Deno.test({
});

Deno.test({
name: "[csv/stream] CsvStream with invalid csv",
name: "[csv/csv_parse_stream] CsvParseStream with invalid csv",
fn: async () => {
const readable = readableStreamFromIterable([
encoder.encode("id,name\n"),
encoder.encode("\n"),
encoder.encode("1,foo\n"),
encoder.encode('2,"baz\n'),
]).pipeThrough(new TextDecoderStream()).pipeThrough(
new CsvStream(),
new CsvParseStream(),
);
const reader = readable.getReader();
assertEquals(await reader.read(), { done: false, value: ["id", "name"] });
Expand All @@ -63,7 +63,7 @@ Deno.test({
});

Deno.test({
name: "[csv/stream] CsvStream with various inputs",
name: "[csv/csv_parse_stream] CsvParseStream with various inputs",
permissions: "none",
fn: async (t) => {
// These test cases were originally ported from Go:
Expand Down Expand Up @@ -318,7 +318,7 @@ x,,,
];
for (const testCase of testCases) {
await t.step(testCase.name, async () => {
const options: CsvStreamOptions = {};
const options: CsvParseStreamOptions = {};
if (testCase.separator) {
options.separator = testCase.separator;
}
Expand All @@ -332,7 +332,7 @@ x,,,
options.columns = testCase.columns;
}
const readable = createReadableStreamFromString(testCase.input)
.pipeThrough(new CsvStream(options));
.pipeThrough(new CsvParseStream(options));

if (testCase.output) {
const actual = [];
Expand Down Expand Up @@ -371,40 +371,41 @@ export const MyTextDecoderStream = () => {
};

Deno.test({
name: "[csv/stream] cancel CsvStream during iteration does not leak file",
name:
"[csv/csv_parse_stream] cancel CsvParseStream during iteration does not leak file",
permissions: { read: [testdataDir] },
// TODO(kt3k): Enable this test on windows.
// See https://github.com/denoland/deno_std/issues/3160
ignore: Deno.build.os === "windows",
fn: async () => {
const file = await Deno.open(join(testdataDir, "large.csv"));
const readable = file.readable.pipeThrough(MyTextDecoderStream())
.pipeThrough(new CsvStream());
.pipeThrough(new CsvParseStream());
for await (const _record of readable) {
break;
}
},
});

Deno.test({
name: "[csv/stream] correct typing",
name: "[csv/csv_parse_stream] correct typing",
fn() {
// If no option is passed, defaults to ReadableStream<string[]>.
{
const { readable } = new CsvStream();
const { readable } = new CsvParseStream();
type _ = AssertTrue<IsExact<typeof readable, ReadableStream<string[]>>>;
}
{
const { readable } = new CsvStream(undefined);
const { readable } = new CsvParseStream(undefined);
type _ = AssertTrue<IsExact<typeof readable, ReadableStream<string[]>>>;
}
{
// `skipFirstRow` may be `true` or `false`.
// `columns` may be `undefined` or `string[]`.
// If you don't know exactly what the value of the option is,
// the return type is ReadableStream<string[] | Record<string, string | undefined>>
const options: CsvStreamOptions = {};
const { readable } = new CsvStream(options);
const options: CsvParseStreamOptions = {};
const { readable } = new CsvParseStream(options);
type _ = AssertTrue<
IsExact<
typeof readable,
Expand All @@ -413,21 +414,21 @@ Deno.test({
>;
}
{
const { readable } = new CsvStream({});
const { readable } = new CsvParseStream({});
type _ = AssertTrue<IsExact<typeof readable, ReadableStream<string[]>>>;
}

// skipFirstRow option
{
const { readable } = new CsvStream({ skipFirstRow: undefined });
const { readable } = new CsvParseStream({ skipFirstRow: undefined });
type _ = AssertTrue<IsExact<typeof readable, ReadableStream<string[]>>>;
}
{
const { readable } = new CsvStream({ skipFirstRow: false });
const { readable } = new CsvParseStream({ skipFirstRow: false });
type _ = AssertTrue<IsExact<typeof readable, ReadableStream<string[]>>>;
}
{
const { readable } = new CsvStream({ skipFirstRow: true });
const { readable } = new CsvParseStream({ skipFirstRow: true });
type _ = AssertTrue<
IsExact<
typeof readable,
Expand All @@ -438,17 +439,17 @@ Deno.test({

// columns option
{
const { readable } = new CsvStream({ columns: undefined });
const { readable } = new CsvParseStream({ columns: undefined });
type _ = AssertTrue<IsExact<typeof readable, ReadableStream<string[]>>>;
}
{
const { readable } = new CsvStream({ columns: ["aaa", "bbb"] });
const { readable } = new CsvParseStream({ columns: ["aaa", "bbb"] });
type _ = AssertTrue<
IsExact<typeof readable, ReadableStream<Record<"aaa" | "bbb", string>>>
>;
}
{
const { readable } = new CsvStream({ columns: ["aaa"] as string[] });
const { readable } = new CsvParseStream({ columns: ["aaa"] as string[] });
type _ = AssertTrue<
IsExact<
typeof readable,
Expand All @@ -459,14 +460,14 @@ Deno.test({

// skipFirstRow option + columns option
{
const { readable } = new CsvStream({
const { readable } = new CsvParseStream({
skipFirstRow: false,
columns: undefined,
});
type _ = AssertTrue<IsExact<typeof readable, ReadableStream<string[]>>>;
}
{
const { readable } = new CsvStream({
const { readable } = new CsvParseStream({
skipFirstRow: true,
columns: undefined,
});
Expand All @@ -478,7 +479,7 @@ Deno.test({
>;
}
{
const { readable } = new CsvStream({
const { readable } = new CsvParseStream({
skipFirstRow: false,
columns: ["aaa"],
});
Expand All @@ -487,7 +488,7 @@ Deno.test({
>;
}
{
const { readable } = new CsvStream({
const { readable } = new CsvParseStream({
skipFirstRow: true,
columns: ["aaa"],
});
Expand Down
1 change: 1 addition & 0 deletions csv/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,5 @@
export * from "./stringify.ts";
export * from "./parse.ts";
export * from "./stream.ts";
export * from "./csv_parse_stream.ts";
export * from "./csv_stringify_stream.ts";
Loading