-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove split2, refactor Rows for better performance #108
Conversation
} | ||
expect(last).toBe('9999') | ||
}) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we don't need this anymore. It can be paused (or not consumed) just on the application level.
}) | ||
} else { | ||
expect(await selectPromise).toEqual(undefined) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Surprisingly enough, this is no longer the case for Node.js 18.x.
¯\_(ツ)_/¯
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You removed destroy()
, which caused the error. Maybe that's the reason?
} | ||
expect(last).toBe('9999') | ||
}) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably not required anymore with async iterators. It can be paused (not consumed) on the application level, as it is a lazy evaluation.
callback() | ||
}, | ||
objectMode: true, | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not the best implementation for this (haven't tested how it behaves with large files), but at least it works without split2
.
if (!line.length) { | ||
return | ||
} else { | ||
const json = JSON.parse(line) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should support already serialized data somehow. It looks strange to parse it here and then stringify it again in the library.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's insert data in CSV
but query in JSONCompactEachRow
? It doesn't change the logic much but simplifies the example
After some digging in Node.js issues I found this, and indeed, Using current release version, with the old stream implementation with import { createClient } from '@clickhouse/client';
(async () => {
const client = createClient()
const rows = await client.query({
query: 'SELECT number FROM system.numbers_mt LIMIT 50000000',
format: 'TabSeparated'
})
const start = +new Date()
const stream = rows.stream()
stream.on('data', (_) => {
//
})
await new Promise((resolve) => {
stream.on('end', () => {resolve()})
})
const end = +new Date()
console.info(`Execution time: ${end - start} ms`)
await client.close()
})() executes in 20 seconds, while this import { createClient } from '@clickhouse/client';
(async () => {
const client = createClient()
const rows = await client.query({
query: 'SELECT number FROM system.numbers_mt LIMIT 50000000',
format: 'TabSeparated'
})
const start = +new Date()
for await (const _ of rows.stream()) {
//
}
const end = +new Date()
console.info(`Execution time: ${end - start} ms`)
await client.close()
})() takes 30 seconds to finish. So it might be beneficial to return Stream.Readable instead of an AsyncGenerator. |
So, with just the allocations removed, keeping the Stream in place return Stream.pipeline(
this._stream,
split((row: string) => ({ // <- no `new Row` here
text: row, // <- this is not a function anymore
json<T>() {
return decode(row, 'JSON')
},
})),
function pipelineCb(err) {
if (err) {
console.error(err)
}
}
) using this code (50M numbers) import { createClient } from '../src'
void (async () => {
const client = createClient({
compression: {
request: false,
response: false,
},
})
const rows = await client.query({
query: 'SELECT number FROM system.numbers_mt LIMIT 50000000',
format: 'TabSeparated',
})
const start = +new Date()
const stream = rows.stream()
stream.on('data', (_) => {
//
})
await new Promise((resolve) => {
stream.on('end', () => {
resolve(0)
})
})
const end = +new Date()
console.info(`Execution time: ${end - start} ms`)
})() we can get as fast as ~3.5-4 seconds on my machine. On 500M records instead of 50M, that's ~37-38 seconds vs ~280-300 seconds. |
let decodedChunk = '' | ||
for await (const chunk of this._stream) { | ||
decodedChunk += textDecoder.decode(chunk, { stream: true }) | ||
let idx = 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit it's not changed. let's move lower:
const idx = decodedChunk.indexOf('\n')
}, | ||
} | ||
} else { | ||
break |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add a special case at the top to remove a nesting level.
if (idx === -1) break;
const line = decodedChunk.slice(0, idx);
...
yield { | ||
/** | ||
* Returns a string representation of a row. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These comments belong to Row
interface declared below. https://github.com/ClickHouse/clickhouse-js/pull/108/files#diff-b13826a37e4f93783b49eaca9c60dc1d124ee9d6b331be22244f41cc7bb09d39R94-R96
IDE doesn't hint at the method signature.
} | ||
} | ||
) | ||
} | ||
textDecoder.decode() // flush |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't it be consumed? The method typings shows it as
/**
* Returns the result of running encoding's decoder. The method can be invoked zero or more times with options's stream set to true, and then once without options's stream (or set to false), to process a fragmented input. If the invocation without options's stream (or set to false) has no input, it's clearest to omit both arguments.
*
* ```
* var string = "", decoder = new TextDecoder(encoding), buffer;
* while(buffer = next_chunk()) {
* string += decoder.decode(buffer, {stream:true});
* }
* string += decoder.decode(); // end-of-queue
* ```
*
* If the error mode is "fatal" and encoding's decoder returns error, throws a TypeError.
*/
decode(input?: BufferSource, options?: TextDecodeOptions): string;
if (!line.length) { | ||
return | ||
} else { | ||
const json = JSON.parse(line) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's insert data in CSV
but query in JSONCompactEachRow
? It doesn't change the logic much but simplifies the example
}) | ||
} else { | ||
expect(await selectPromise).toEqual(undefined) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You removed destroy()
, which caused the error. Maybe that's the reason?
/** | ||
* Returns a string representation of a row. | ||
*/ | ||
text(): string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure it's more lightweight than a Row
class instance?
The method emits an object literal with 2 methods vs. a class with 2 methods in a prototype
class Row {
text(){}
}
(new Row()).hasOwnProperty('text') // false
const objectLiteral = {
text(){}
}
objectLiteral.hasOwnProperty('text') // true
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
split((row: string) => new Row(row, 'JSON')),
ts-node --transpile-only --project tsconfig.dev.json examples/many_numbers.ts
Execution time: 1957 ms
split((row: string) => ({
text: row,
json<T>() {
return decode(row, 'JSON')
}
})),
ts-node --transpile-only --project tsconfig.dev.json examples/many_numbers.ts
Execution time: 394 ms
Will create a new one with an updated implementation. |
After checking out https://github.com/go-faster/ch-bench and writing a simple test for our client as such:
I ended up with an "amazing" performance of around ~280 seconds for executing this bit of code (Ryzen 9 5950X).
So, I did the following:
split2
with a simple self-written solutionRow
class created on every iteration with just a slim interfacestream()
method withAsyncGenerator<Row, void>
, and now we have a properrow
type here:Before: 285 seconds
After: 96 seconds
Still far from perfect (I will investigate more), but that's a start.