Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streams implementation #296

Merged
merged 13 commits into from
Aug 24, 2023
Merged

Conversation

H-Plus-Time
Copy link
Contributor

@H-Plus-Time H-Plus-Time commented Aug 22, 2023

Considerably cleaned up, the api for this is wholly functional thanks to wasm-streams (and the stream! macro from async-stream):

const stream = readFFIStream(targetUrl);
const batches = []
for await (const ffiTable of stream) {
    const batch = parseRecordBatch(memory.buffer, ffiTable.arrayAddr(), ffiTable.schemaAddr(), true);
    batches.push(batch);
}

parseRecordBatch now works for both arrow1 and arrow2, as far as I can tell given the ffi tests, it should be fully compatible.

Two things came up wrt the typescript interface:

  1. The bindings to ReadableStream are opaque, so their TS equivalents are just ReadableStream<R = any>. There's a combination of wasm_bindgen arguments that override a function's return type by fiat.
  2. The much bigger problem - apparently all versions of Typescript (yes, even nightlies) still interpret WHATWG ReadableStreams as both a. lacking a Symbol.asyncIterator and b. being non-iterable. Even once I forced the types to solve for 1, I ended up having to do as unknown as FFIArrowRecordBatch[].

NB: I've opted to leave the write half of this for a future PR (post-integration with the arrow-wasm repo), and of course IPC stream outputs aren't particularly relevant anymore.

@kylebarron
Copy link
Owner

Awesome, thanks! I'll do a final review and merge tonight or tomorrow!

@kylebarron kylebarron self-requested a review August 23, 2023 21:58
@@ -66,7 +66,7 @@ jobs:

- uses: actions/setup-node@v2
with:
node-version: "16"
node-version: "18"
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this require a Node 18+ for users as well? Or is this just for our tests? Is there a shim/polyfill that users can use for pre-18 versions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, turns out the ReadableStream constructor only showed up in Node 18. There is indeed a poly/ponyfill, written by a familar face: MattiasBuelens/web-streams-polyfill.

That does bring up a good point - overall README level documentation, and doc-strings for the new exports. On that note, the codeblocks embedded in rust doc comments, do those show up with syntax highlighting for embedded code fragments (a bit like jsdocs) in editors for you?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah they show up in editors and on the docs site https://kylebarron.dev/parquet-wasm/modules/bundler_arrow1.html#readParquet

$FLAGS
--features async \
$FLAGS &
[ -n "$CI" ] && wait;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is awesome; it parallelizes each build? Should we do this for arrow2 as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, turning the parallel behaviour off in CI took a bit of bash-fu (conditionally backgrounding a command isn't possible, but nobody said anything about conditionally running wait 😉 ).

Not much point for arrow2 (the wasm-opt step for arrow2 builds is quite short, while the compile step is highly parallelized, so trying to run that in parallel ended up slowing things down).

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why wasm-opt would be so much slower for arrow1. I don't think this used to be the case; I only recently noticed it taking longer

Comment on lines +20 to +28
#[wasm_bindgen]
pub fn free(self) {
drop(self.0)
}

#[wasm_bindgen]
pub fn drop(self) {
drop(self.0)
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General note: I need to check that the memory pointed to by this actually gets freed by wasm-bindgen. I only need one method but I'm not sure if I can/need to add a manual drop call to release the array

Comment on lines +66 to +79
pub async fn read_record_batch_stream(
url: String,
content_length: Option<usize>,
) -> Result<ParquetRecordBatchStream<Compat<RangedAsyncReader>>> {
let content_length = match content_length {
Some(_content_length) => _content_length,
None => get_content_length(url.clone()).await?,
};
let reader = crate::common::fetch::create_reader(url, content_length, None);

let builder = ParquetRecordBatchStreamBuilder::new(reader.compat()).await?;
let parquet_reader = builder.build()?;
Ok(parquet_reader)
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is so clean!

url: String,
) -> Result<impl futures::Stream<Item = super::ffi::FFIArrowRecordBatch>> {
use async_stream::stream;
let inner_stream = stream! {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

woah! That's cool! I didn't even know yield was a keyword

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cake (yield) is a lie with this one - that stream! macro slightly extends the grammar (apparently it's on it's way to the language - generator syntax seems to be one of the harder ones to add to a language).

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see it's a proc macro so it can make up new keywords!

@kylebarron kylebarron merged commit 0c0fc2a into kylebarron:main Aug 24, 2023
5 checks passed
@kylebarron
Copy link
Owner

Thanks again! As a follow up I'll try to update some docs and integrate with the new arrow-wasm Table object, but it would be great to release this sooner than later

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants