Remove split2, refactor Rows for better performance #108

Closed
wants to merge 5 commits
Changes from 3 commits
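
For context on the approach: per the diffs below, Rows.stream() now returns an AsyncGenerator<Row> instead of a Node.js Readable piped through split2. A minimal sketch of how newline-delimited rows can be split out of a byte stream with a plain async generator (illustrative only, hypothetical names; not the PR's actual implementation):

import Stream from 'stream'

// Sketch: split incoming Buffer chunks into newline-delimited rows
// without the split2 dependency. Assumes chunk boundaries never fall
// inside a multi-byte UTF-8 character (a real implementation would
// buffer bytes rather than decoded strings).
async function* splitRows(source: Stream.Readable): AsyncGenerator<string> {
  let incomplete = ''
  for await (const chunk of source) {
    const lines = (incomplete + chunk.toString()).split('\n')
    // The last element is a partial row; carry it over to the next chunk.
    incomplete = lines.pop() ?? ''
    for (const line of lines) {
      yield line
    }
  }
  if (incomplete.length > 0) {
    yield incomplete
  }
}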
6 changes: 3 additions & 3 deletions __tests__/integration/abort_request.test.ts
@@ -63,7 +63,7 @@ describe('abort request', () => {
.then(async (rows) => {
const stream = rows.stream()
for await (const chunk of stream) {
-          const [[number]] = chunk.json()
+          const [[number]] = chunk.json<[[string]]>()
// abort when reach number 3
if (number === '3') {
controller.abort()
@@ -86,10 +86,10 @@ describe('abort request', () => {
.then(async function (rows) {
const stream = rows.stream()
for await (const chunk of stream) {
-          const [[number]] = chunk.json()
+          const [[number]] = chunk.json<[[string]]>()
// abort when reach number 3
if (number === '3') {
-            stream.destroy()
+            await stream.return()
}
}
})
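A note on the stream.destroy() → await stream.return() change above: stream() now hands back an async generator rather than a Readable, and a generator is stopped by calling its return() method, which also runs any try/finally cleanup inside the generator body. A standalone sketch of the mechanism (not the client's actual internals):

async function* numbers(): AsyncGenerator<number> {
  try {
    for (let i = 0; ; i++) {
      yield i
    }
  } finally {
    // Runs when the consumer calls return(), e.g. to release a socket.
    console.log('cleaned up')
  }
}

void (async () => {
  const stream = numbers()
  for await (const n of stream) {
    if (n === 3) {
      // Ends the iteration; the finally block above runs before this resolves.
      await stream.return(undefined)
    }
  }
})()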
72 changes: 30 additions & 42 deletions __tests__/integration/select.test.ts
@@ -1,16 +1,19 @@
- import type Stream from 'stream'
import { type ClickHouseClient, type ResponseJSON, type Row } from '../../src'
import { createTestClient } from '../utils'

- async function rowsValues(stream: Stream.Readable): Promise<any[]> {
+ async function rowsValues(
+   stream: AsyncGenerator<Row, unknown>
+ ): Promise<any[]> {
const result: any[] = []
for await (const chunk of stream) {
result.push((chunk as Row).json())
}
return result
}

- async function rowsText(stream: Stream.Readable): Promise<string[]> {
+ async function rowsText(
+   stream: AsyncGenerator<Row, unknown>
+ ): Promise<string[]> {
const result: string[] = []
for await (const chunk of stream) {
result.push((chunk as Row).text())
@@ -47,30 +50,28 @@ describe('select', () => {
})

describe('consume the response only once', () => {
-     async function assertAlreadyConsumed$<T>(fn: () => Promise<T>) {
+     async function assertAlreadyConsumed<T>(fn: () => Promise<T>) {
await expect(fn()).rejects.toMatchObject(
expect.objectContaining({
message: 'Stream has been already consumed',
})
)
}
-     function assertAlreadyConsumed<T>(fn: () => T) {
-       expect(fn).toThrow(
-         expect.objectContaining({
-           message: 'Stream has been already consumed',
-         })
-       )
-     }

it('should consume a JSON response only once', async () => {
const rows = await client.query({
query: 'SELECT * FROM system.numbers LIMIT 1',
format: 'JSONEachRow',
})
expect(await rows.json()).toEqual([{ number: '0' }])
// wrap in a func to avoid changing inner "this"
-       await assertAlreadyConsumed$(() => rows.json())
-       await assertAlreadyConsumed$(() => rows.text())
+       await assertAlreadyConsumed(() => rows.stream())
+       await assertAlreadyConsumed(() => rows.json())
+       await assertAlreadyConsumed(() => rows.text())
+       await assertAlreadyConsumed(async () => {
+         for await (const r of rows.stream()) {
+           r.text()
+         }
+       })
})

it('should consume a text response only once', async () => {
@@ -80,9 +81,13 @@ describe('select', () => {
})
expect(await rows.text()).toEqual('0\n')
// wrap in a func to avoid changing inner "this"
-       await assertAlreadyConsumed$(() => rows.json())
-       await assertAlreadyConsumed$(() => rows.text())
+       await assertAlreadyConsumed(() => rows.stream())
+       await assertAlreadyConsumed(() => rows.json())
+       await assertAlreadyConsumed(() => rows.text())
+       await assertAlreadyConsumed(async () => {
+         for await (const r of rows.stream()) {
+           r.text()
+         }
+       })
})

it('should consume a stream response only once', async () => {
@@ -96,9 +101,13 @@ describe('select', () => {
}
expect(result).toEqual('0')
// wrap in a func to avoid changing inner "this"
-       await assertAlreadyConsumed$(() => rows.json())
-       await assertAlreadyConsumed$(() => rows.text())
+       await assertAlreadyConsumed(() => rows.stream())
+       await assertAlreadyConsumed(() => rows.json())
+       await assertAlreadyConsumed(() => rows.text())
+       await assertAlreadyConsumed(async () => {
+         for await (const r of rows.stream()) {
+           r.text()
+         }
+       })
})
})

@@ -328,35 +337,14 @@ describe('select', () => {
format: 'JSON',
})
try {
-       expect(() => result.stream()).toThrowError(
+       await expect(async () => result.stream().next()).rejects.toThrowError(
'JSON format is not streamable'
)
} finally {
result.close()
}
})

-   it('can pause response stream', async () => {
-     const result = await client.query({
-       query: 'SELECT number FROM system.numbers LIMIT 10000',
-       format: 'CSV',
-     })
-
-     const stream = result.stream()
-
-     let last = null
-     let i = 0
-     for await (const chunk of stream) {
-       last = chunk.text()
-       i++
-       if (i % 1000 === 0) {
-         stream.pause()
-         setTimeout(() => stream.resume(), 100)
-       }
-     }
-     expect(last).toBe('9999')
-   })

Contributor Author: Looks like we don't need this anymore. It can be paused (or not consumed) just on the application level.

Contributor Author: Probably not required anymore with async iterators. It can be paused (not consumed) on the application level, as it is a lazy evaluation.
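
A sketch of what "pausing on the application level" looks like in practice: the generator only pulls the next chunk when next() is awaited, so awaiting anything between iterations backpressures the response without pause()/resume() (assumes a client configured as in these tests):

import { createClient } from '@clickhouse/client'

void (async () => {
  const client = createClient()
  const result = await client.query({
    query: 'SELECT number FROM system.numbers LIMIT 10000',
    format: 'CSV',
  })

  let i = 0
  for await (const row of result.stream()) {
    console.log(row.text())
    if (++i % 1000 === 0) {
      // No new chunks are pulled while we wait here: consumption is lazy,
      // which replaces the old stream.pause() / stream.resume() dance.
      await new Promise((resolve) => setTimeout(resolve, 100))
    }
  }
  await client.close()
})()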

describe('text()', () => {
it('returns stream of rows in CSV format', async () => {
const result = await client.query({
56 changes: 26 additions & 30 deletions __tests__/integration/streaming_e2e.test.ts
@@ -1,7 +1,4 @@
- import Fs from 'fs'
- import Path from 'path'
import Stream from 'stream'
- import split from 'split2'
import { type ClickHouseClient } from '../../src'
import { createTestClient, guid } from '../utils'
import { createSimpleTable } from './fixtures/simple_table'
@@ -26,33 +23,32 @@ describe('streaming e2e', () => {
await client.close()
})

-   it('should stream a file', async () => {
-     // contains id as numbers in JSONCompactEachRow format ["0"]\n["1"]\n...
-     const filename = Path.resolve(
-       __dirname,
-       './fixtures/streaming_e2e_data.ndjson'
-     )
-
-     await client.insert({
-       table: tableName,
-       values: Fs.createReadStream(filename).pipe(
-         // should be removed when "insert" accepts a stream of strings/bytes
-         split((row: string) => JSON.parse(row))
-       ),
-       format: 'JSONCompactEachRow',
-     })
-
-     const response = await client.query({
-       query: `SELECT * from ${tableName}`,
-       format: 'JSONCompactEachRow',
-     })
-
-     const actual: string[] = []
-     for await (const row of response.stream()) {
-       actual.push(row.json())
-     }
-     expect(actual).toEqual(expected)
-   })
+   // it('should stream a file', async () => {
+   //   // contains id as numbers in JSONCompactEachRow format ["0"]\n["1"]\n...
+   //   const filename = Path.resolve(
+   //     __dirname,
+   //     './fixtures/streaming_e2e_data.ndjson'
+   //   )
+   //   await client.insert({
+   //     table: tableName,
+   //     values: Fs.createReadStream(filename).pipe(
+   //       // should be removed when "insert" accepts a stream of strings/bytes
+   //       split((row: string) => JSON.parse(row))
+   //     ),
+   //     format: 'JSONCompactEachRow',
+   //   })
+   //
+   //   const response = await client.query({
+   //     query: `SELECT * from ${tableName}`,
+   //     format: 'JSONCompactEachRow',
+   //   })
+   //
+   //   const actual: string[] = []
+   //   for await (const row of response.stream()) {
+   //     actual.push(row.json())
+   //   }
+   //   expect(actual).toEqual(expected)
+   // })

it('should stream a stream created in-place', async () => {
await client.insert({
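The test above is disabled because of the note inside it: with split2 gone, insert does not yet accept a stream of raw strings/bytes. Purely as a hypothetical sketch of the end state that comment points to (not a working API on this branch; table name is a placeholder):

import Fs from 'fs'
import Path from 'path'
import { createClient } from '@clickhouse/client'

void (async () => {
  const client = createClient()
  // Hypothetical: pipe the file's raw bytes straight through once
  // insert() accepts a byte stream for row-based formats.
  await client.insert({
    table: 'streaming_e2e_test', // placeholder table name
    values: Fs.createReadStream(
      Path.resolve(__dirname, './fixtures/streaming_e2e_data.ndjson')
    ),
    format: 'JSONCompactEachRow',
  })
  await client.close()
})()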
37 changes: 24 additions & 13 deletions __tests__/unit/rows.test.ts
@@ -1,4 +1,6 @@
- import { Row, Rows } from '../../src'
+ import type { Row } from '../../src'
+ import { Rows } from '../../src'
+ import * as Stream from 'stream'
import { Readable } from 'stream'

describe('rows', () => {
@@ -27,29 +29,38 @@ describe('rows', () => {
const rows = makeRows()
const stream = rows.stream()

-     expect(stream.readableEnded).toBeFalsy()

const result = []
for await (const row of stream) {
result.push((row as Row).json())
result.push(row.json())
}

expect(result).toEqual(expectedJson)
-     expect(stream.readableEnded).toBeTruthy()
+     expect((await stream.next()).done).toBeTruthy()

-     expect(() => rows.stream()).toThrowError(err)
+     await expect(async () => {
+       for await (const r of rows.stream()) {
+         r.text()
+       }
+     }).rejects.toThrowError(err)
await expect(rows.json()).rejects.toThrowError(err)
await expect(rows.text()).rejects.toThrowError(err)
})

it('should be able to call Row.text and Row.json multiple times', async () => {
-     const chunk = '{"foo":"bar"}'
-     const obj = { foo: 'bar' }
-     const row = new Row(chunk, 'JSON')
-     expect(row.text()).toEqual(chunk)
-     expect(row.text()).toEqual(chunk)
-     expect(row.json()).toEqual(obj)
-     expect(row.json()).toEqual(obj)
+     const rows = new Rows(
+       Stream.Readable.from([Buffer.from('{"foo":"bar"}\n')]),
+       'JSONEachRow'
+     )
+     const singleRows: Row[] = []
+     for await (const r of rows.stream()) {
+       singleRows.push(r)
+     }
+     expect(singleRows).toHaveLength(1)
+     const [row] = singleRows
+     expect(row.text()).toEqual('{"foo":"bar"}')
+     expect(row.text()).toEqual('{"foo":"bar"}')
+     expect(row.json()).toEqual({ foo: 'bar' })
+     expect(row.json()).toEqual({ foo: 'bar' })
})

function makeRows() {
5 changes: 4 additions & 1 deletion __tests__/unit/schema_select_result.test.ts
@@ -27,7 +27,10 @@ describe('schema select result', () => {
.spyOn(client, 'query')
.mockResolvedValueOnce(
new Rows(
-           Readable.from(['{"valid":"json"}\n', 'invalid_json}\n']),
+           Readable.from([
+             Buffer.from('{"valid":"json"}\n'),
+             Buffer.from('invalid_json}\n'),
+           ]),
'JSONEachRow'
)
)
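Note the string → Buffer.from(...) change here and in rows.test.ts: the tests now feed Rows byte chunks rather than strings, which suggests the new row splitting operates on raw bytes. A quick standalone illustration of the difference in what Readable.from emits:

import { Readable } from 'stream'

void (async () => {
  for await (const chunk of Readable.from(['{"valid":"json"}\n'])) {
    console.log(Buffer.isBuffer(chunk)) // false: the string is emitted as-is
  }
  for await (const chunk of Readable.from([Buffer.from('{"valid":"json"}\n')])) {
    console.log(Buffer.isBuffer(chunk)) // true: byte chunks, as Rows now expects
  }
})()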
94 changes: 47 additions & 47 deletions examples/insert_file_stream_ndjson.ts
@@ -1,47 +1,47 @@
- import { createClient } from '@clickhouse/client'
- import Path from 'path'
- import Fs from 'fs'
- import split from 'split2'
-
- void (async () => {
-   const client = createClient()
-   const tableName = 'insert_file_stream_ndjson'
-   await client.exec({
-     query: `DROP TABLE IF EXISTS ${tableName}`,
-   })
-   await client.exec({
-     query: `
-       CREATE TABLE ${tableName} (id UInt64)
-       ENGINE MergeTree()
-       ORDER BY (id)
-     `,
-   })
-
-   // contains id as numbers in JSONCompactEachRow format ["0"]\n["0"]\n...
-   // see also: NDJSON format
-   const filename = Path.resolve(
-     process.cwd(),
-     './examples/resources/data.ndjson'
-   )
-
-   await client.insert({
-     table: tableName,
-     values: Fs.createReadStream(filename).pipe(
-       split((row: string) => JSON.parse(row))
-     ),
-     format: 'JSONCompactEachRow',
-   })
-
-   const rows = await client.query({
-     query: `SELECT * from ${tableName}`,
-     format: 'JSONEachRow',
-   })
-
-   // or just `rows.text()` / `rows.json()`
-   // to consume the entire response at once
-   for await (const row of rows.stream()) {
-     console.log(row.json())
-   }
-
-   await client.close()
- })()
+ // import { createClient } from '@clickhouse/client'
+ // import Path from 'path'
+ // import Fs from 'fs'
+ // import split from 'split2'
+ //
+ // void (async () => {
+ //   const client = createClient()
+ //   const tableName = 'insert_file_stream_ndjson'
+ //   await client.exec({
+ //     query: `DROP TABLE IF EXISTS ${tableName}`,
+ //   })
+ //   await client.exec({
+ //     query: `
+ //       CREATE TABLE ${tableName} (id UInt64)
+ //       ENGINE MergeTree()
+ //       ORDER BY (id)
+ //     `,
+ //   })
+ //
+ //   // contains id as numbers in JSONCompactEachRow format ["0"]\n["0"]\n...
+ //   // see also: NDJSON format
+ //   const filename = Path.resolve(
+ //     process.cwd(),
+ //     './examples/resources/data.ndjson'
+ //   )
+ //
+ //   await client.insert({
+ //     table: tableName,
+ //     values: Fs.createReadStream(filename).pipe(
+ //       split((row: string) => JSON.parse(row))
+ //     ),
+ //     format: 'JSONCompactEachRow',
+ //   })
+ //
+ //   const rows = await client.query({
+ //     query: `SELECT * from ${tableName}`,
+ //     format: 'JSONEachRow',
+ //   })
+ //
+ //   // or just `rows.text()` / `rows.json()`
+ //   // to consume the entire response at once
+ //   for await (const row of rows.stream()) {
+ //     console.log(row.json())
+ //   }
+ //
+ //   await client.close()
+ // })()