Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor code into lower-level functions, use ? operator #25

Merged
merged 2 commits into from
Mar 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 33 additions & 88 deletions src/arrow1.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
use js_sys::Uint8Array;
use wasm_bindgen::prelude::*;
#[cfg(feature = "arrow1")]
use {
arrow::ipc::reader::StreamReader,
arrow::ipc::writer::StreamWriter,
parquet::arrow::arrow_writer::ArrowWriter,
parquet::arrow::{ArrowReader, ParquetFileArrowReader},
parquet::errors::ParquetError,
parquet::file::properties::WriterProperties,
parquet::file::reader::{FileReader, SerializedFileReader},
parquet::file::serialized_reader::SliceableCursor,
parquet::file::writer::InMemoryWriteableCursor,
std::io::Cursor,
std::sync::Arc,
};

// A macro to provide `println!(..)`-style syntax for `console.log` logging.
#[cfg(target_arch = "wasm32")]
Expand All @@ -17,22 +29,10 @@ macro_rules! log {
}

#[cfg(feature = "arrow1")]
#[wasm_bindgen(js_name = readParquet1)]
pub fn read_parquet(parquet_file: &[u8]) -> Result<Uint8Array, JsValue> {
use js_sys::Uint8Array;

use arrow::ipc::writer::StreamWriter;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::file::serialized_reader::SliceableCursor;
use std::sync::Arc;

pub fn read_parquet(parquet_file: &[u8]) -> Result<Vec<u8>, ParquetError> {
// Create Parquet reader
let sliceable_cursor = SliceableCursor::new(Arc::new(parquet_file.to_vec()));
let parquet_reader = match SerializedFileReader::new(sliceable_cursor) {
Ok(parquet_reader) => parquet_reader,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};
let parquet_reader = SerializedFileReader::new(sliceable_cursor)?;
let parquet_metadata = parquet_reader.metadata();
let parquet_file_metadata = parquet_metadata.file_metadata();
let row_count = parquet_file_metadata.num_rows() as usize;
Expand All @@ -41,98 +41,43 @@ pub fn read_parquet(parquet_file: &[u8]) -> Result<Uint8Array, JsValue> {
let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_reader));
// TODO: use Parquet column group row count for arrow record reader row count (i.e. don't read
// entire file into one IPC batch)
let record_batch_reader = match arrow_reader.get_record_reader(row_count) {
Ok(record_batch_reader) => record_batch_reader,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};
let arrow_schema = match arrow_reader.get_schema() {
Ok(arrow_schema) => arrow_schema,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};
let record_batch_reader = arrow_reader.get_record_reader(row_count)?;
let arrow_schema = arrow_reader.get_schema()?;

// Create IPC Writer
let mut output_file = Vec::new();
let mut writer = match StreamWriter::try_new(&mut output_file, &arrow_schema) {
Ok(writer) => writer,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};
let mut writer = StreamWriter::try_new(&mut output_file, &arrow_schema)?;

// Iterate over record batches, writing them to IPC stream
for maybe_record_batch in record_batch_reader {
let record_batch = match maybe_record_batch {
Ok(record_batch) => record_batch,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};
match writer.write(&record_batch) {
Ok(_) => {}
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};
let record_batch = maybe_record_batch?;
writer.write(&record_batch)?;
}
match writer.finish() {
Ok(_) => {}
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};
writer.finish()?;

let writer_buffer = match writer.into_inner() {
Ok(writer_buffer) => writer_buffer,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};
let return_len = match (writer_buffer.len() as usize).try_into() {
Ok(return_len) => return_len,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};
let return_vec = Uint8Array::new_with_length(return_len);
return_vec.copy_from(&writer_buffer);
return Ok(return_vec);
let writer_buffer = writer.into_inner()?;
return Ok(writer_buffer.to_vec());
}

#[cfg(feature = "arrow1")]
#[wasm_bindgen(js_name = writeParquet1)]
pub fn write_parquet(arrow_file: &[u8]) -> Result<Uint8Array, JsValue> {
use arrow::ipc::reader::StreamReader;
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::file::properties::WriterProperties;
use parquet::file::writer::InMemoryWriteableCursor;
use std::io::Cursor;

pub fn write_parquet(arrow_file: &[u8]) -> Result<Vec<u8>, ParquetError> {
// Create IPC reader
let input_file = Cursor::new(arrow_file);
let arrow_ipc_reader = match StreamReader::try_new(input_file) {
Ok(arrow_ipc_reader) => arrow_ipc_reader,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};
let arrow_ipc_reader = StreamReader::try_new(input_file)?;
let arrow_schema = arrow_ipc_reader.schema();

// Create Parquet writer
let cursor = InMemoryWriteableCursor::default();
let props = WriterProperties::builder().build();
let mut writer = match ArrowWriter::try_new(cursor.clone(), arrow_schema, Some(props)) {
Ok(writer) => writer,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};
let mut writer = ArrowWriter::try_new(cursor.clone(), arrow_schema, Some(props))?;

// Iterate over IPC chunks, writing each batch to Parquet
for maybe_record_batch in arrow_ipc_reader {
let record_batch = match maybe_record_batch {
Ok(record_batch) => record_batch,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};

match writer.write(&record_batch) {
Ok(_) => {}
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};
let record_batch = maybe_record_batch?;
writer.write(&record_batch)?;
}
match writer.close() {
Ok(_) => {}
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};

let return_buffer = cursor.data();
let return_len = match (return_buffer.len() as usize).try_into() {
Ok(return_len) => return_len,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};
let return_vec = Uint8Array::new_with_length(return_len);
return_vec.copy_from(&return_buffer);
return Ok(return_vec);
writer.close()?;

return Ok(cursor.data());
}
108 changes: 29 additions & 79 deletions src/arrow2.rs
Original file line number Diff line number Diff line change
@@ -1,75 +1,45 @@
use js_sys::Uint8Array;

use wasm_bindgen::prelude::*;

#[cfg(feature = "arrow2")]
#[wasm_bindgen(js_name = readParquet2)]
pub fn read_parquet(parquet_file: &[u8]) -> Result<Uint8Array, JsValue> {
use arrow2::io::ipc::write::{
StreamWriter as IPCStreamWriter, WriteOptions as IPCWriteOptions,
};
use arrow2::io::parquet::read::FileReader as ParquetFileReader;
use std::io::Cursor;
use {
arrow2::error::ArrowError,
arrow2::io::ipc::read::{read_file_metadata, FileReader as IPCFileReader},
arrow2::io::ipc::write::{StreamWriter as IPCStreamWriter, WriteOptions as IPCWriteOptions},
arrow2::io::parquet::read::FileReader as ParquetFileReader,
// NOTE: It's FileReader on latest main but RecordReader in 0.9.2
arrow2::io::parquet::write::{
Compression, Encoding, FileWriter as ParquetFileWriter, RowGroupIterator, Version,
WriteOptions as ParquetWriteOptions,
},
std::io::Cursor,
};

#[cfg(feature = "arrow2")]
pub fn read_parquet(parquet_file: &[u8]) -> Result<Vec<u8>, ArrowError> {
// Create Parquet reader
let input_file = Cursor::new(parquet_file);
let file_reader = match ParquetFileReader::try_new(input_file, None, None, None, None) {
Ok(file_reader) => file_reader,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};
let file_reader = ParquetFileReader::try_new(input_file, None, None, None, None)?;
let schema = file_reader.schema().clone();

// Create IPC writer
let mut output_file = Vec::new();
let options = IPCWriteOptions { compression: None };
let mut writer = IPCStreamWriter::new(&mut output_file, options);
match writer.start(&schema, None) {
Ok(_) => {}
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
}
writer.start(&schema, None)?;

// Iterate over reader chunks, writing each into the IPC writer
for maybe_chunk in file_reader {
let chunk = match maybe_chunk {
Ok(chunk) => chunk,
Err(error) => {
return Err(JsValue::from_str(format!("{}", error).as_str()));
}
};

match writer.write(&chunk, None) {
Ok(_) => {}
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};
let chunk = maybe_chunk?;
writer.write(&chunk, None)?;
}

match writer.finish() {
Ok(_) => {}
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};

Ok(unsafe { Uint8Array::view(&output_file) })
writer.finish()?;
return Ok(output_file);
}

#[cfg(feature = "arrow2")]
#[wasm_bindgen(js_name = writeParquet2)]
pub fn write_parquet(arrow_file: &[u8]) -> Result<Uint8Array, JsValue> {
use arrow2::io::ipc::read::{read_file_metadata, FileReader as IPCFileReader};
// NOTE: It's FileReader on latest main but RecordReader in 0.9.2
use arrow2::io::parquet::write::{
Compression, Encoding, FileWriter as ParquetFileWriter, RowGroupIterator, Version,
WriteOptions as ParquetWriteOptions,
};
use std::io::Cursor;

pub fn write_parquet(arrow_file: &[u8]) -> Result<Vec<u8>, ArrowError> {
// Create IPC reader
let mut input_file = Cursor::new(arrow_file);

let stream_metadata = match read_file_metadata(&mut input_file) {
Ok(stream_metadata) => stream_metadata,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};

let stream_metadata = read_file_metadata(&mut input_file)?;
let arrow_ipc_reader = IPCFileReader::new(input_file, stream_metadata.clone(), None);

// Create Parquet writer
Expand All @@ -81,23 +51,11 @@ pub fn write_parquet(arrow_file: &[u8]) -> Result<Uint8Array, JsValue> {
};

let schema = stream_metadata.schema.clone();
let mut parquet_writer = match ParquetFileWriter::try_new(&mut output_file, schema, options) {
Ok(parquet_writer) => parquet_writer,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};

match parquet_writer.start() {
Ok(_) => {}
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};
let mut parquet_writer = ParquetFileWriter::try_new(&mut output_file, schema, options)?;
parquet_writer.start()?;

for maybe_chunk in arrow_ipc_reader {
let chunk = match maybe_chunk {
Ok(chunk) => chunk,
Err(error) => {
return Err(JsValue::from_str(format!("{}", error).as_str()));
}
};
let chunk = maybe_chunk?;

let iter = vec![Ok(chunk)];

Expand All @@ -118,20 +76,12 @@ pub fn write_parquet(arrow_file: &[u8]) -> Result<Uint8Array, JsValue> {
// for loop over `row_groups`, which is a `Result`. This is more readably written as an `if let` statement
for group in row_groups {
for maybe_column in group {
let column = match maybe_column {
Ok(column) => column,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};

let column = maybe_column?;
let (group, len) = column;
match parquet_writer.write(group, len) {
Ok(_) => {}
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};
parquet_writer.write(group, len)?;
}
}
}
let _size = parquet_writer.end(None);

Ok(unsafe { Uint8Array::view(&output_file) })
let _size = parquet_writer.end(None)?;
return Ok(output_file);
}
73 changes: 73 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod arrow1;
mod arrow2;
mod utils;

use js_sys::Uint8Array;
use wasm_bindgen::prelude::*;

// When the `wee_alloc` feature is enabled, use `wee_alloc` as the global
Expand All @@ -12,6 +13,78 @@ use wasm_bindgen::prelude::*;
#[global_allocator]
static ALLOC: wee_alloc::WeeAlloc = wee_alloc::WeeAlloc::INIT;*/

#[cfg(feature = "arrow1")]
#[wasm_bindgen(js_name = readParquet1)]
pub fn read_parquet1(parquet_file: &[u8]) -> Result<Uint8Array, JsValue> {
let buffer = match crate::arrow1::read_parquet(parquet_file) {
// This function would return a rust vec that would be copied to a Uint8Array here
Ok(buffer) => buffer,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};

let return_len = match (buffer.len() as usize).try_into() {
Ok(return_len) => return_len,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};
let return_vec = Uint8Array::new_with_length(return_len);
return_vec.copy_from(&buffer);
return Ok(return_vec);
}

#[cfg(feature = "arrow1")]
#[wasm_bindgen(js_name = writeParquet1)]
pub fn write_parquet1(arrow_file: &[u8]) -> Result<Uint8Array, JsValue> {
let buffer = match crate::arrow1::write_parquet(arrow_file) {
// This function would return a rust vec that would be copied to a Uint8Array here
Ok(buffer) => buffer,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};

let return_len = match (buffer.len() as usize).try_into() {
Ok(return_len) => return_len,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};
let return_vec = Uint8Array::new_with_length(return_len);
return_vec.copy_from(&buffer);
return Ok(return_vec);
}

#[cfg(feature = "arrow2")]
#[wasm_bindgen(js_name = readParquet2)]
pub fn read_parquet2(parquet_file: &[u8]) -> Result<Uint8Array, JsValue> {
let buffer = match crate::arrow2::read_parquet(parquet_file) {
// This function would return a rust vec that would be copied to a Uint8Array here
Ok(buffer) => buffer,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};

let return_len = match (buffer.len() as usize).try_into() {
Ok(return_len) => return_len,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};
let return_vec = Uint8Array::new_with_length(return_len);
return_vec.copy_from(&buffer);
return Ok(return_vec);
}

#[cfg(feature = "arrow2")]
#[wasm_bindgen(js_name = writeParquet2)]
pub fn write_parquet2(arrow_file: &[u8]) -> Result<Uint8Array, JsValue> {
let buffer = match crate::arrow2::write_parquet(arrow_file) {
// This function would return a rust vec that would be copied to a Uint8Array here
Ok(buffer) => buffer,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};

let return_len = match (buffer.len() as usize).try_into() {
Ok(return_len) => return_len,
Err(error) => return Err(JsValue::from_str(format!("{}", error).as_str())),
};
let return_vec = Uint8Array::new_with_length(return_len);
return_vec.copy_from(&buffer);
return Ok(return_vec);
}

#[wasm_bindgen(js_name = setPanicHook)]
pub fn set_panic_hook() {
utils::set_panic_hook();
Expand Down