From 8051f226ff94ef129cbca3492b173f2fc8364456 Mon Sep 17 00:00:00 2001 From: Nicholas Roberts Date: Wed, 23 Aug 2023 03:34:39 +1000 Subject: [PATCH] Formatting! --- src/arrow1/ffi.rs | 4 +--- src/arrow1/reader_async.rs | 1 - src/arrow1/wasm.rs | 3 ++- src/arrow2/reader_async.rs | 12 +++++++----- src/arrow2/wasm.rs | 7 ++++--- 5 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/arrow1/ffi.rs b/src/arrow1/ffi.rs index 2295de4f..815f1052 100644 --- a/src/arrow1/ffi.rs +++ b/src/arrow1/ffi.rs @@ -1,5 +1,3 @@ - - use arrow::array::{Array, StructArray}; use arrow::datatypes::{Field, Schema}; use arrow::ffi::{self, from_ffi, to_ffi}; @@ -114,7 +112,7 @@ impl From for RecordBatch { fn from(value: FFIArrowRecordBatch) -> Self { let array_data = from_ffi(*value.array, &value.field).unwrap(); let intermediate = StructArray::from(array_data); - + RecordBatch::from(intermediate) } } diff --git a/src/arrow1/reader_async.rs b/src/arrow1/reader_async.rs index 17ac1c00..bef91365 100644 --- a/src/arrow1/reader_async.rs +++ b/src/arrow1/reader_async.rs @@ -71,7 +71,6 @@ pub async fn read_record_batch_stream( Some(_content_length) => _content_length, None => get_content_length(url.clone()).await?, }; - let content_length = usize::try_from(content_length).unwrap(); let reader = crate::common::fetch::create_reader(url, content_length, None); let builder = ParquetRecordBatchStreamBuilder::new(reader.compat()).await?; diff --git a/src/arrow1/wasm.rs b/src/arrow1/wasm.rs index b0ebe619..740cf269 100644 --- a/src/arrow1/wasm.rs +++ b/src/arrow1/wasm.rs @@ -1,5 +1,5 @@ use crate::arrow1::error::WasmResult; -use crate::arrow1::ffi::{FFIArrowRecordBatch, FFIArrowTable}; +use crate::arrow1::ffi::FFIArrowTable; use crate::utils::assert_parquet_file_not_empty; use wasm_bindgen::prelude::*; @@ -81,6 +81,7 @@ pub async fn read_ffi_stream( url: String, content_length: Option, ) -> WasmResult { + use crate::arrow1::ffi::FFIArrowRecordBatch; use futures::StreamExt; let parquet_stream = crate::arrow1::reader_async::read_record_batch_stream(url, content_length).await?; diff --git a/src/arrow2/reader_async.rs b/src/arrow2/reader_async.rs index 30358929..1d0670e6 100644 --- a/src/arrow2/reader_async.rs +++ b/src/arrow2/reader_async.rs @@ -116,19 +116,21 @@ pub async fn read_row_group( Ok(output_file) } -pub async fn read_record_batch_stream(url: String) -> Result> { +pub async fn read_record_batch_stream( + url: String, +) -> Result> { use async_stream::stream; let inner_stream = stream! { let metadata = read_metadata_async(url.clone(), None).await.unwrap(); let compat_meta = crate::arrow2::metadata::FileMetaData::from(metadata.clone()); - + let arrow_schema = compat_meta.arrow_schema().unwrap_or_else(|_| { let bar: Vec = vec![]; arrow2::datatypes::Schema::from(bar).into() }); for row_group_meta in metadata.row_groups { - let foo = arrow_schema.clone().into(); - let deserializer = _read_row_group(url.clone(), &row_group_meta, &foo).await.unwrap(); + let schema = arrow_schema.clone().into(); + let deserializer = _read_row_group(url.clone(), &row_group_meta, &schema).await.unwrap(); for maybe_chunk in deserializer { let chunk = maybe_chunk.unwrap(); yield super::ffi::FFIArrowRecordBatch::from_chunk(chunk, arrow_schema.clone().into()); @@ -136,4 +138,4 @@ pub async fn read_record_batch_stream(url: String) -> Result WasmResult { use futures::StreamExt; - let stream = super::reader_async::read_record_batch_stream(url).await?.map(|batch| Ok(batch.into())); + let stream = super::reader_async::read_record_batch_stream(url) + .await? + .map(|batch| Ok(batch.into())); Ok(wasm_streams::ReadableStream::from_stream(stream).into_raw()) - -} \ No newline at end of file +}