Skip to content

Commit

Permalink
Formatting!
Browse files Browse the repository at this point in the history
  • Loading branch information
H-Plus-Time committed Aug 22, 2023
1 parent 0dcc0b2 commit 7995d88
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 10 deletions.
4 changes: 1 addition & 3 deletions src/arrow1/ffi.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@


use arrow::array::{Array, StructArray};
use arrow::datatypes::{Field, Schema};
use arrow::ffi::{self, from_ffi, to_ffi};
Expand Down Expand Up @@ -114,7 +112,7 @@ impl From<FFIArrowRecordBatch> for RecordBatch {
fn from(value: FFIArrowRecordBatch) -> Self {
let array_data = from_ffi(*value.array, &value.field).unwrap();
let intermediate = StructArray::from(array_data);

RecordBatch::from(intermediate)
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/arrow1/wasm.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::arrow1::error::WasmResult;
use crate::arrow1::ffi::{FFIArrowRecordBatch, FFIArrowTable};
use crate::arrow1::ffi::FFIArrowTable;
use crate::utils::assert_parquet_file_not_empty;
use wasm_bindgen::prelude::*;

Expand Down Expand Up @@ -81,6 +81,7 @@ pub async fn read_ffi_stream(
url: String,
content_length: Option<usize>,
) -> WasmResult<wasm_streams::readable::sys::ReadableStream> {
use crate::arrow1::ffi::FFIArrowRecordBatch;
use futures::StreamExt;
let parquet_stream =
crate::arrow1::reader_async::read_record_batch_stream(url, content_length).await?;
Expand Down
8 changes: 5 additions & 3 deletions src/arrow2/reader_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,14 @@ pub async fn read_row_group(
Ok(output_file)
}

pub async fn read_record_batch_stream(url: String) -> Result<impl futures::Stream<Item = super::ffi::FFIArrowRecordBatch>> {
pub async fn read_record_batch_stream(
url: String,
) -> Result<impl futures::Stream<Item = super::ffi::FFIArrowRecordBatch>> {
use async_stream::stream;
let inner_stream = stream! {
let metadata = read_metadata_async(url.clone(), None).await.unwrap();
let compat_meta = crate::arrow2::metadata::FileMetaData::from(metadata.clone());

let arrow_schema = compat_meta.arrow_schema().unwrap_or_else(|_| {
let bar: Vec<arrow2::datatypes::Field> = vec![];
arrow2::datatypes::Schema::from(bar).into()
Expand All @@ -136,4 +138,4 @@ pub async fn read_record_batch_stream(url: String) -> Result<impl futures::Strea
}
};
Ok(inner_stream)
}
}
7 changes: 4 additions & 3 deletions src/arrow2/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,8 @@ pub async fn read_ffi_stream(
url: String,
) -> WasmResult<wasm_streams::readable::sys::ReadableStream> {
use futures::StreamExt;
let stream = super::reader_async::read_record_batch_stream(url).await?.map(|batch| Ok(batch.into()));
let stream = super::reader_async::read_record_batch_stream(url)
.await?
.map(|batch| Ok(batch.into()));
Ok(wasm_streams::ReadableStream::from_stream(stream).into_raw())

}
}

0 comments on commit 7995d88

Please sign in to comment.