Skip to content

Commit

Permalink
Formatting!
Browse files Browse the repository at this point in the history
  • Loading branch information
H-Plus-Time committed Aug 22, 2023
1 parent 0dcc0b2 commit 8051f22
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 13 deletions.
4 changes: 1 addition & 3 deletions src/arrow1/ffi.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@


use arrow::array::{Array, StructArray};
use arrow::datatypes::{Field, Schema};
use arrow::ffi::{self, from_ffi, to_ffi};
Expand Down Expand Up @@ -114,7 +112,7 @@ impl From<FFIArrowRecordBatch> for RecordBatch {
fn from(value: FFIArrowRecordBatch) -> Self {
let array_data = from_ffi(*value.array, &value.field).unwrap();
let intermediate = StructArray::from(array_data);

RecordBatch::from(intermediate)
}
}
Expand Down
1 change: 0 additions & 1 deletion src/arrow1/reader_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ pub async fn read_record_batch_stream(
Some(_content_length) => _content_length,
None => get_content_length(url.clone()).await?,
};
let content_length = usize::try_from(content_length).unwrap();
let reader = crate::common::fetch::create_reader(url, content_length, None);

let builder = ParquetRecordBatchStreamBuilder::new(reader.compat()).await?;
Expand Down
3 changes: 2 additions & 1 deletion src/arrow1/wasm.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::arrow1::error::WasmResult;
use crate::arrow1::ffi::{FFIArrowRecordBatch, FFIArrowTable};
use crate::arrow1::ffi::FFIArrowTable;
use crate::utils::assert_parquet_file_not_empty;
use wasm_bindgen::prelude::*;

Expand Down Expand Up @@ -81,6 +81,7 @@ pub async fn read_ffi_stream(
url: String,
content_length: Option<usize>,
) -> WasmResult<wasm_streams::readable::sys::ReadableStream> {
use crate::arrow1::ffi::FFIArrowRecordBatch;
use futures::StreamExt;
let parquet_stream =
crate::arrow1::reader_async::read_record_batch_stream(url, content_length).await?;
Expand Down
12 changes: 7 additions & 5 deletions src/arrow2/reader_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,24 +116,26 @@ pub async fn read_row_group(
Ok(output_file)
}

pub async fn read_record_batch_stream(url: String) -> Result<impl futures::Stream<Item = super::ffi::FFIArrowRecordBatch>> {
pub async fn read_record_batch_stream(
url: String,
) -> Result<impl futures::Stream<Item = super::ffi::FFIArrowRecordBatch>> {
use async_stream::stream;
let inner_stream = stream! {
let metadata = read_metadata_async(url.clone(), None).await.unwrap();
let compat_meta = crate::arrow2::metadata::FileMetaData::from(metadata.clone());

let arrow_schema = compat_meta.arrow_schema().unwrap_or_else(|_| {
let bar: Vec<arrow2::datatypes::Field> = vec![];
arrow2::datatypes::Schema::from(bar).into()
});
for row_group_meta in metadata.row_groups {
let foo = arrow_schema.clone().into();
let deserializer = _read_row_group(url.clone(), &row_group_meta, &foo).await.unwrap();
let schema = arrow_schema.clone().into();
let deserializer = _read_row_group(url.clone(), &row_group_meta, &schema).await.unwrap();
for maybe_chunk in deserializer {
let chunk = maybe_chunk.unwrap();
yield super::ffi::FFIArrowRecordBatch::from_chunk(chunk, arrow_schema.clone().into());
}
}
};
Ok(inner_stream)
}
}
7 changes: 4 additions & 3 deletions src/arrow2/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,8 @@ pub async fn read_ffi_stream(
url: String,
) -> WasmResult<wasm_streams::readable::sys::ReadableStream> {
use futures::StreamExt;
let stream = super::reader_async::read_record_batch_stream(url).await?.map(|batch| Ok(batch.into()));
let stream = super::reader_async::read_record_batch_stream(url)
.await?
.map(|batch| Ok(batch.into()));
Ok(wasm_streams::ReadableStream::from_stream(stream).into_raw())

}
}

0 comments on commit 8051f22

Please sign in to comment.