diff --git a/__tests__/integration/abort_request.test.ts b/__tests__/integration/abort_request.test.ts
index 91ac99d8..ec07504b 100644
--- a/__tests__/integration/abort_request.test.ts
+++ b/__tests__/integration/abort_request.test.ts
@@ -63,7 +63,7 @@ describe('abort request', () => {
         .then(async (rows) => {
           const stream = rows.stream()
           for await (const chunk of stream) {
-            const [[number]] = chunk.json()
+            const [[number]] = chunk.json<[[string]]>()
             // abort when reach number 3
             if (number === '3') {
               controller.abort()
@@ -86,21 +86,14 @@ describe('abort request', () => {
         .then(async function (rows) {
           const stream = rows.stream()
           for await (const chunk of stream) {
-            const [[number]] = chunk.json()
+            const [[number]] = chunk.json<[[string]]>()
             // abort when reach number 3
             if (number === '3') {
-              stream.destroy()
+              await stream.return()
             }
           }
         })
-      // There was a breaking change in Node.js 18.x behavior
-      if (process.version.startsWith('v18')) {
-        await expect(selectPromise).rejects.toMatchObject({
-          message: 'Premature close',
-        })
-      } else {
-        expect(await selectPromise).toEqual(undefined)
-      }
+      expect(await selectPromise).toEqual(undefined)
     })

     // FIXME: it does not work with ClickHouse Cloud.
diff --git a/__tests__/integration/select.test.ts b/__tests__/integration/select.test.ts
index cebfe6d4..696a4432 100644
--- a/__tests__/integration/select.test.ts
+++ b/__tests__/integration/select.test.ts
@@ -1,8 +1,9 @@
-import type Stream from 'stream'
 import { type ClickHouseClient, type ResponseJSON, type Row } from '../../src'
 import { createTestClient } from '../utils'

-async function rowsValues(stream: Stream.Readable): Promise<any[]> {
+async function rowsValues(
+  stream: AsyncGenerator<Row>
+): Promise<any[]> {
   const result: any[] = []
   for await (const chunk of stream) {
     result.push((chunk as Row).json())
@@ -10,7 +11,9 @@
   return result
 }

-async function rowsText(stream: Stream.Readable): Promise<string[]> {
+async function rowsText(
+  stream: AsyncGenerator<Row>
+): Promise<string[]> {
   const result: string[] = []
   for await (const chunk of stream) {
     result.push((chunk as Row).text())
@@ -47,20 +50,14 @@ describe('select', () => {
   })

   describe('consume the response only once', () => {
-    async function assertAlreadyConsumed$<T>(fn: () => Promise<T>) {
+    async function assertAlreadyConsumed<T>(fn: () => Promise<T>) {
       await expect(fn()).rejects.toMatchObject(
         expect.objectContaining({
           message: 'Stream has been already consumed',
         })
       )
     }
-    function assertAlreadyConsumed<T>(fn: () => T) {
-      expect(fn).toThrow(
-        expect.objectContaining({
-          message: 'Stream has been already consumed',
-        })
-      )
-    }
+
     it('should consume a JSON response only once', async () => {
       const rows = await client.query({
         query: 'SELECT * FROM system.numbers LIMIT 1',
@@ -68,9 +65,13 @@ describe('select', () => {
       })
       expect(await rows.json()).toEqual([{ number: '0' }])
       // wrap in a func to avoid changing inner "this"
-      await assertAlreadyConsumed$(() => rows.json())
-      await assertAlreadyConsumed$(() => rows.text())
-      await assertAlreadyConsumed(() => rows.stream())
+      await assertAlreadyConsumed(() => rows.json())
+      await assertAlreadyConsumed(() => rows.text())
+      await assertAlreadyConsumed(async () => {
+        for await (const r of rows.stream()) {
+          r.text()
+        }
+      })
     })

     it('should consume a text response only once', async () => {
@@ -80,9 +81,13 @@ describe('select', () => {
       })
       expect(await rows.text()).toEqual('0\n')
       // wrap in a func to avoid changing inner "this"
-      await assertAlreadyConsumed$(() => rows.json())
-      await assertAlreadyConsumed$(() => rows.text())
-      await assertAlreadyConsumed(() => rows.stream())
+      await assertAlreadyConsumed(() => rows.json())
+      await assertAlreadyConsumed(() => rows.text())
+      await assertAlreadyConsumed(async () => {
+        for await (const r of rows.stream()) {
+          r.text()
+        }
+      })
     })

     it('should consume a stream response only once', async () => {
@@ -96,9 +101,13 @@ describe('select', () => {
       }
       expect(result).toEqual('0')
       // wrap in a func to avoid changing inner "this"
-      await assertAlreadyConsumed$(() => rows.json())
-      await assertAlreadyConsumed$(() => rows.text())
-      await assertAlreadyConsumed(() => rows.stream())
+      await assertAlreadyConsumed(() => rows.json())
+      await assertAlreadyConsumed(() => rows.text())
+      await assertAlreadyConsumed(async () => {
+        for await (const r of rows.stream()) {
+          r.text()
+        }
+      })
     })
   })

@@ -328,7 +337,7 @@ describe('select', () => {
       format: 'JSON',
     })
     try {
-      expect(() => result.stream()).toThrowError(
+      await expect(async () => result.stream().next()).rejects.toThrowError(
         'JSON format is not streamable'
       )
     } finally {
@@ -336,27 +345,6 @@ describe('select', () => {
     }
   })

-  it('can pause response stream', async () => {
-    const result = await client.query({
-      query: 'SELECT number FROM system.numbers LIMIT 10000',
-      format: 'CSV',
-    })
-
-    const stream = result.stream()
-
-    let last = null
-    let i = 0
-    for await (const chunk of stream) {
-      last = chunk.text()
-      i++
-      if (i % 1000 === 0) {
-        stream.pause()
-        setTimeout(() => stream.resume(), 100)
-      }
-    }
-    expect(last).toBe('9999')
-  })
-
   describe('text()', () => {
     it('returns stream of rows in CSV format', async () => {
       const result = await client.query(
diff --git a/__tests__/integration/streaming_e2e.test.ts b/__tests__/integration/streaming_e2e.test.ts
index 31e674ed..d7f9d6f8 100644
--- a/__tests__/integration/streaming_e2e.test.ts
+++ b/__tests__/integration/streaming_e2e.test.ts
@@ -1,10 +1,9 @@
-import Fs from 'fs'
-import Path from 'path'
-import Stream from 'stream'
-import split from 'split2'
+import Stream, { Transform } from 'stream'
 import { type ClickHouseClient } from '../../src'
 import { createTestClient, guid } from '../utils'
 import { createSimpleTable } from './fixtures/simple_table'
+import * as fs from 'fs'
+import * as Path from 'path'

 const expected = [
   ['0', 'a', [1, 2]],
@@ -32,13 +31,27 @@ describe('streaming e2e', () => {
       __dirname,
       './fixtures/streaming_e2e_data.ndjson'
     )
-
+    const readStream = fs.createReadStream(filename)
+    const jsonTransform = new Transform({
+      transform(data: Buffer, encoding, callback) {
+        data
+          .toString(encoding)
+          .split('\n')
+          .forEach((line) => {
+            if (!line.length) {
+              return
+            } else {
+              const json = JSON.parse(line)
+              this.push(json)
+            }
+          })
+        callback()
+      },
+      objectMode: true,
+    })
     await client.insert({
       table: tableName,
-      values: Fs.createReadStream(filename).pipe(
-        // should be removed when "insert" accepts a stream of strings/bytes
-        split((row: string) => JSON.parse(row))
-      ),
+      values: readStream.pipe(jsonTransform),
       format: 'JSONCompactEachRow',
     })

diff --git a/__tests__/unit/rows.test.ts b/__tests__/unit/rows.test.ts
index 71766235..40454774 100644
--- a/__tests__/unit/rows.test.ts
+++ b/__tests__/unit/rows.test.ts
@@ -1,4 +1,6 @@
-import { Row, Rows } from '../../src'
+import type { Row } from '../../src'
+import { Rows } from '../../src'
+import * as Stream from 'stream'
 import { Readable } from 'stream'

 describe('rows', () => {
@@ -27,29 +29,38 @@ describe('rows', () => {
     const rows = makeRows()
     const stream = rows.stream()
-    expect(stream.readableEnded).toBeFalsy()
-
     const result = []
     for await (const row of stream) {
-      result.push((row as Row).json())
+      result.push(row.json())
     }
     expect(result).toEqual(expectedJson)
-    expect(stream.readableEnded).toBeTruthy()
+    expect((await stream.next()).done).toBeTruthy()

-    expect(() => rows.stream()).toThrowError(err)
+    await expect(async () => {
+      for await (const r of rows.stream()) {
+        r.text()
+      }
+    }).rejects.toThrowError(err)
     await expect(rows.json()).rejects.toThrowError(err)
     await expect(rows.text()).rejects.toThrowError(err)
   })

   it('should be able to call Row.text and Row.json multiple times', async () => {
-    const chunk = '{"foo":"bar"}'
-    const obj = { foo: 'bar' }
-    const row = new Row(chunk, 'JSON')
-    expect(row.text()).toEqual(chunk)
-    expect(row.text()).toEqual(chunk)
-    expect(row.json()).toEqual(obj)
-    expect(row.json()).toEqual(obj)
+    const rows = new Rows(
+      Stream.Readable.from([Buffer.from('{"foo":"bar"}\n')]),
+      'JSONEachRow'
+    )
+    const singleRows: Row[] = []
+    for await (const r of rows.stream()) {
+      singleRows.push(r)
+    }
+    expect(singleRows).toHaveLength(1)
+    const [row] = singleRows
+    expect(row.text()).toEqual('{"foo":"bar"}')
+    expect(row.text()).toEqual('{"foo":"bar"}')
+    expect(row.json()).toEqual({ foo: 'bar' })
+    expect(row.json()).toEqual({ foo: 'bar' })
   })

   function makeRows() {
diff --git a/__tests__/unit/schema_select_result.test.ts b/__tests__/unit/schema_select_result.test.ts
index 02b1093a..547f6f96 100644
--- a/__tests__/unit/schema_select_result.test.ts
+++ b/__tests__/unit/schema_select_result.test.ts
@@ -27,7 +27,10 @@ describe('schema select result', () => {
       .spyOn(client, 'query')
       .mockResolvedValueOnce(
         new Rows(
-          Readable.from(['{"valid":"json"}\n', 'invalid_json}\n']),
+          Readable.from([
+            Buffer.from('{"valid":"json"}\n'),
+            Buffer.from('invalid_json}\n'),
+          ]),
           'JSONEachRow'
         )
       )
diff --git a/examples/insert_file_stream_ndjson.ts b/examples/insert_file_stream_ndjson.ts
index 1823c1a8..2aa5f274 100644
--- a/examples/insert_file_stream_ndjson.ts
+++ b/examples/insert_file_stream_ndjson.ts
@@ -1,7 +1,7 @@
 import { createClient } from '@clickhouse/client'
 import Path from 'path'
-import Fs from 'fs'
-import split from 'split2'
+import fs from 'fs'
+import { Transform } from 'stream'

 void (async () => {
   const client = createClient()
@@ -23,12 +23,27 @@ void (async () => {
     process.cwd(),
     './examples/resources/data.ndjson'
   )
-
+  const readStream = fs.createReadStream(filename)
+  const jsonTransform = new Transform({
+    transform(data: Buffer, encoding, callback) {
+      data
+        .toString(encoding)
+        .split('\n')
+        .forEach((line) => {
+          if (!line.length) {
+            return
+          } else {
+            const json = JSON.parse(line)
+            this.push(json)
+          }
+        })
+      callback()
+    },
+    objectMode: true,
+  })
   await client.insert({
     table: tableName,
-    values: Fs.createReadStream(filename).pipe(
-      split((row: string) => JSON.parse(row))
-    ),
+    values: readStream.pipe(jsonTransform),
     format: 'JSONCompactEachRow',
   })

diff --git a/package.json b/package.json
index 285eddc9..ef43087e 100644
--- a/package.json
+++ b/package.json
@@ -36,13 +36,11 @@
     "dist"
   ],
   "dependencies": {
-    "node-abort-controller": "^3.0.1",
-    "split2": "^4.1.0"
+    "node-abort-controller": "^3.0.1"
   },
   "devDependencies": {
     "@types/jest": "^29.0.2",
     "@types/node": "^18.7.18",
-    "@types/split2": "^3.2.1",
     "@types/uuid": "^8.3.4",
     "@typescript-eslint/eslint-plugin": "^5.37.0",
     "@typescript-eslint/parser": "^5.37.0",
diff --git a/src/rows.ts b/src/rows.ts
index a75d3084..c3f79dbb 100644
--- a/src/rows.ts
+++ b/src/rows.ts
@@ -1,5 +1,4 @@
-import Stream from 'stream'
-import split from 'split2'
+import type Stream from 'stream'
 import { getAsText } from './utils'
 import { type DataFormat, decode, validateStreamFormat } from './data_formatter'

@@ -39,33 +38,52 @@ export class Rows {
   }

   /**
-   * Returns a readable stream of {@link Row}s for responses
+   * Returns an async iterator of {@link Row}s for responses
    * in {@link StreamableDataFormat} format.
    *
-   * The method will throw if called on a response in non-streamable format,
-   * and if the underlying stream was already consumed
-   * by calling the other methods
+   * If selected format is non-streamable,
+   * or the underlying stream was already consumed,
+   * it will throw when accessing the next element
    */
-  stream(): Stream.Readable {
-    // If the underlying stream has already ended by calling `text` or `json`,
-    // Stream.pipeline will create a new empty stream
-    // but without "readableEnded" flag set to true
+  async *stream(): AsyncGenerator<Row, void> {
     if (this._stream.readableEnded) {
       throw Error(streamAlreadyConsumedMessage)
     }
-
     validateStreamFormat(this.format)

+    const textDecoder = new TextDecoder()
+    let decodedChunk = ''
+    for await (const chunk of this._stream) {
+      decodedChunk += textDecoder.decode(chunk, { stream: true })
+      let idx = 0
+      while (true) {
+        idx = decodedChunk.indexOf('\n')
+        if (idx !== -1) {
+          const line = decodedChunk.slice(0, idx)
+          decodedChunk = decodedChunk.slice(idx + 1)
+          yield {
+            /**
+             * Returns a string representation of a row.
+             */
+            text(): string {
+              return line
+            },
-    return Stream.pipeline(
-      this._stream,
-      // only JSON-based format are supported at the moment
-      split((row: string) => new Row(row, 'JSON')),
-      function pipelineCb(err) {
-        if (err) {
-          console.error(err)
+            /**
+             * Returns a JSON representation of a row.
+             * The method will throw if called on a response in JSON incompatible format.
+             *
+             * It is safe to call this method multiple times.
+             */
+            json<T>(): T {
+              return decode(line, 'JSON')
+            },
+          }
+        } else {
+          break
         }
       }
-    )
+    }
+    textDecoder.decode() // flush
   }

   close() {
@@ -73,28 +91,9 @@ export class Rows {
     this._stream.destroy()
   }
 }

-export class Row {
-  constructor(
-    private readonly chunk: string,
-    private readonly format: DataFormat
-  ) {}
-
-  /**
-   * Returns a string representation of a row.
-   */
-  text(): string {
-    return this.chunk
-  }
-
-  /**
-   * Returns a JSON representation of a row.
-   * The method will throw if called on a response in JSON incompatible format.
-   *
-   * It is safe to call this method multiple times.
-   */
-  json<T>(): T {
-    return decode(this.text(), this.format)
-  }
+export interface Row {
+  text(): string
+  json<T>(): T
 }

 const streamAlreadyConsumedMessage = 'Stream has been already consumed'
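
Not part of the diff: a minimal sketch of how the reworked `Rows.stream()` async generator is consumed after this change. It assumes a reachable ClickHouse instance with default `createClient()` settings; the query and `format: 'CSV'` are illustrative stand-ins for any streamable format.

import { createClient } from '@clickhouse/client'

void (async () => {
  const client = createClient()
  const rows = await client.query({
    query: 'SELECT number FROM system.numbers LIMIT 5',
    format: 'CSV',
  })
  // stream() now returns an async generator of Row objects, so a plain
  // for-await loop replaces the previous Stream.Readable consumption;
  // leaving the loop early finalizes the generator instead of stream.destroy().
  for await (const row of rows.stream()) {
    console.log(row.text())
  }
  await client.close()
})()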