diff --git a/parquet/benches/arrow_writer.rs b/parquet/benches/arrow_writer.rs index f1154eb9e394..25ff1ca90dc6 100644 --- a/parquet/benches/arrow_writer.rs +++ b/parquet/benches/arrow_writer.rs @@ -26,9 +26,7 @@ use std::sync::Arc; use arrow::datatypes::*; use arrow::{record_batch::RecordBatch, util::data_gen::*}; -use parquet::{ - arrow::ArrowWriter, errors::Result, file::writer::InMemoryWriteableCursor, -}; +use parquet::{arrow::ArrowWriter, errors::Result}; fn create_primitive_bench_batch( size: usize, @@ -278,8 +276,8 @@ fn _create_nested_bench_batch( #[inline] fn write_batch(batch: &RecordBatch) -> Result<()> { // Write batch to an in-memory writer - let cursor = InMemoryWriteableCursor::default(); - let mut writer = ArrowWriter::try_new(cursor, batch.schema(), None)?; + let buffer = vec![]; + let mut writer = ArrowWriter::try_new(buffer, batch.schema(), None)?; writer.write(batch)?; writer.close()?; diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index 2d199f69ea29..808f815e6b42 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -564,7 +564,7 @@ mod tests { .set_max_row_group_size(200) .build(); - let mut writer = ArrowWriter::try_new( + let writer = ArrowWriter::try_new( file.try_clone().unwrap(), Arc::new(arrow_schema), Some(props), diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index e3a1d1233487..34a14f3725f7 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -263,7 +263,6 @@ mod tests { use crate::arrow::schema::add_encoded_arrow_schema_to_metadata; use crate::arrow::{ArrowWriter, ProjectionMask}; use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType}; - use crate::column::writer::get_typed_column_writer_mut; use crate::data_type::{ BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType, Int32Type, Int64Type, @@ -271,7 +270,7 @@ mod tests { use crate::errors::Result; use crate::file::properties::{WriterProperties, WriterVersion}; use crate::file::reader::{FileReader, SerializedFileReader}; - use crate::file::writer::{FileWriter, SerializedFileWriter}; + use crate::file::writer::SerializedFileWriter; use crate::schema::parser::parse_message_type; use crate::schema::types::{Type, TypePtr}; use crate::util::cursor::SliceableCursor; @@ -936,21 +935,24 @@ mod tests { for (idx, v) in values.iter().enumerate() { let def_levels = def_levels.map(|d| d[idx].as_slice()); let mut row_group_writer = writer.next_row_group()?; - let mut column_writer = row_group_writer - .next_column()? - .expect("Column writer is none!"); + { + let mut column_writer = row_group_writer + .next_column()? + .expect("Column writer is none!"); - get_typed_column_writer_mut::(&mut column_writer) - .write_batch(v, def_levels, None)?; + column_writer + .typed::() + .write_batch(v, def_levels, None)?; - row_group_writer.close_column(column_writer)?; - writer.close_row_group(row_group_writer)? 
+ column_writer.close()?; + } + row_group_writer.close()?; } writer.close() } - fn get_test_reader(file_name: &str) -> Arc { + fn get_test_reader(file_name: &str) -> Arc> { let file = get_test_file(file_name); let reader = @@ -1094,15 +1096,18 @@ mod tests { ) .unwrap(); - let mut row_group_writer = writer.next_row_group().unwrap(); - let mut column_writer = row_group_writer.next_column().unwrap().unwrap(); + { + let mut row_group_writer = writer.next_row_group().unwrap(); + let mut column_writer = row_group_writer.next_column().unwrap().unwrap(); - get_typed_column_writer_mut::(&mut column_writer) - .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None) - .unwrap(); + column_writer + .typed::() + .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None) + .unwrap(); - row_group_writer.close_column(column_writer).unwrap(); - writer.close_row_group(row_group_writer).unwrap(); + column_writer.close().unwrap(); + row_group_writer.close().unwrap(); + } writer.close().unwrap(); } diff --git a/parquet/src/arrow/arrow_writer.rs b/parquet/src/arrow/arrow_writer.rs index 1918c967550a..e1fd93a9b6f3 100644 --- a/parquet/src/arrow/arrow_writer.rs +++ b/parquet/src/arrow/arrow_writer.rs @@ -18,6 +18,7 @@ //! Contains writer which writes arrow data into parquet data. use std::collections::VecDeque; +use std::io::Write; use std::sync::Arc; use arrow::array as arrow_array; @@ -35,10 +36,8 @@ use super::schema::{ use crate::column::writer::ColumnWriter; use crate::errors::{ParquetError, Result}; use crate::file::properties::WriterProperties; -use crate::{ - data_type::*, - file::writer::{FileWriter, ParquetWriter, RowGroupWriter, SerializedFileWriter}, -}; +use crate::file::writer::{SerializedColumnWriter, SerializedRowGroupWriter}; +use crate::{data_type::*, file::writer::SerializedFileWriter}; /// Arrow writer /// @@ -46,7 +45,7 @@ use crate::{ /// to produce row groups with `max_row_group_size` rows. Any remaining rows will be /// flushed on close, leading the final row group in the output file to potentially /// contain fewer than `max_row_group_size` rows -pub struct ArrowWriter { +pub struct ArrowWriter { /// Underlying Parquet writer writer: SerializedFileWriter, @@ -65,7 +64,7 @@ pub struct ArrowWriter { max_row_group_size: usize, } -impl ArrowWriter { +impl ArrowWriter { /// Try to create a new Arrow writer /// /// The writer will fail if: @@ -185,17 +184,17 @@ impl ArrowWriter { }) .collect(); - write_leaves(row_group_writer.as_mut(), &arrays, &mut levels)?; + write_leaves(&mut row_group_writer, &arrays, &mut levels)?; } - self.writer.close_row_group(row_group_writer)?; + row_group_writer.close()?; self.buffered_rows -= num_rows; Ok(()) } /// Close and finalize the underlying Parquet writer - pub fn close(&mut self) -> Result { + pub fn close(mut self) -> Result { self.flush()?; self.writer.close() } @@ -203,15 +202,17 @@ impl ArrowWriter { /// Convenience method to get the next ColumnWriter from the RowGroupWriter #[inline] -fn get_col_writer(row_group_writer: &mut dyn RowGroupWriter) -> Result { +fn get_col_writer<'a, W: Write>( + row_group_writer: &'a mut SerializedRowGroupWriter<'_, W>, +) -> Result> { let col_writer = row_group_writer .next_column()? 
.expect("Unable to get column writer"); Ok(col_writer) } -fn write_leaves( - row_group_writer: &mut dyn RowGroupWriter, +fn write_leaves( + row_group_writer: &mut SerializedRowGroupWriter<'_, W>, arrays: &[ArrayRef], levels: &mut [Vec], ) -> Result<()> { @@ -250,12 +251,12 @@ fn write_leaves( let mut col_writer = get_col_writer(row_group_writer)?; for (array, levels) in arrays.iter().zip(levels.iter_mut()) { write_leaf( - &mut col_writer, + col_writer.untyped(), array, levels.pop().expect("Levels exhausted"), )?; } - row_group_writer.close_column(col_writer)?; + col_writer.close()?; Ok(()) } ArrowDataType::List(_) | ArrowDataType::LargeList(_) => { @@ -313,12 +314,12 @@ fn write_leaves( // cast dictionary to a primitive let array = arrow::compute::cast(array, value_type)?; write_leaf( - &mut col_writer, + col_writer.untyped(), &array, levels.pop().expect("Levels exhausted"), )?; } - row_group_writer.close_column(col_writer)?; + col_writer.close()?; Ok(()) } ArrowDataType::Float16 => Err(ParquetError::ArrowError( @@ -336,8 +337,8 @@ fn write_leaves( } fn write_leaf( - writer: &mut ColumnWriter, - column: &arrow_array::ArrayRef, + writer: &mut ColumnWriter<'_>, + column: &ArrayRef, levels: LevelInfo, ) -> Result { let indices = levels.filter_array_indices(); @@ -705,7 +706,6 @@ mod tests { use crate::file::{ reader::{FileReader, SerializedFileReader}, statistics::Statistics, - writer::InMemoryWriteableCursor, }; #[test] @@ -744,16 +744,14 @@ mod tests { let expected_batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(a), Arc::new(b)]).unwrap(); - let cursor = InMemoryWriteableCursor::default(); + let mut buffer = vec![]; { - let mut writer = ArrowWriter::try_new(cursor.clone(), schema, None).unwrap(); + let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap(); writer.write(&expected_batch).unwrap(); writer.close().unwrap(); } - let buffer = cursor.into_inner().unwrap(); - let cursor = crate::file::serialized_reader::SliceableCursor::new(buffer); let reader = SerializedFileReader::new(cursor).unwrap(); let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(reader)); diff --git a/parquet/src/arrow/schema.rs b/parquet/src/arrow/schema.rs index 820aa7e7a817..5416e4078538 100644 --- a/parquet/src/arrow/schema.rs +++ b/parquet/src/arrow/schema.rs @@ -1591,7 +1591,7 @@ mod tests { // write to an empty parquet file so that schema is serialized let file = tempfile::tempfile().unwrap(); - let mut writer = ArrowWriter::try_new( + let writer = ArrowWriter::try_new( file.try_clone().unwrap(), Arc::new(schema.clone()), None, @@ -1660,7 +1660,7 @@ mod tests { // write to an empty parquet file so that schema is serialized let file = tempfile::tempfile().unwrap(); - let mut writer = ArrowWriter::try_new( + let writer = ArrowWriter::try_new( file.try_clone().unwrap(), Arc::new(schema.clone()), None, diff --git a/parquet/src/column/mod.rs b/parquet/src/column/mod.rs index 7ed7bfc256e6..93a4f00d2eef 100644 --- a/parquet/src/column/mod.rs +++ b/parquet/src/column/mod.rs @@ -40,10 +40,11 @@ //! //! use parquet::{ //! column::{reader::ColumnReader, writer::ColumnWriter}, +//! data_type::Int32Type, //! file::{ //! properties::WriterProperties, //! reader::{FileReader, SerializedFileReader}, -//! writer::{FileWriter, SerializedFileWriter}, +//! writer::SerializedFileWriter, //! }, //! schema::parser::parse_message_type, //! }; @@ -65,20 +66,17 @@ //! let props = Arc::new(WriterProperties::builder().build()); //! let file = fs::File::create(path).unwrap(); //! 
let mut writer = SerializedFileWriter::new(file, schema, props).unwrap(); +//! //! let mut row_group_writer = writer.next_row_group().unwrap(); //! while let Some(mut col_writer) = row_group_writer.next_column().unwrap() { -//! match col_writer { -//! // You can also use `get_typed_column_writer` method to extract typed writer. -//! ColumnWriter::Int32ColumnWriter(ref mut typed_writer) => { -//! typed_writer -//! .write_batch(&[1, 2, 3], Some(&[3, 3, 3, 2, 2]), Some(&[0, 1, 0, 1, 1])) -//! .unwrap(); -//! } -//! _ => {} -//! } -//! row_group_writer.close_column(col_writer).unwrap(); +//! col_writer +//! .typed::() +//! .write_batch(&[1, 2, 3], Some(&[3, 3, 3, 2, 2]), Some(&[0, 1, 0, 1, 1])) +//! .unwrap(); +//! col_writer.close().unwrap(); //! } -//! writer.close_row_group(row_group_writer).unwrap(); +//! row_group_writer.close().unwrap(); +//! //! writer.close().unwrap(); //! //! // Reading data using column reader API. diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs index a7d0ba8fc810..d80cafe0e0a8 100644 --- a/parquet/src/column/writer.rs +++ b/parquet/src/column/writer.rs @@ -39,15 +39,15 @@ use crate::util::bit_util::FromBytes; use crate::util::memory::ByteBufferPtr; /// Column writer for a Parquet type. -pub enum ColumnWriter { - BoolColumnWriter(ColumnWriterImpl), - Int32ColumnWriter(ColumnWriterImpl), - Int64ColumnWriter(ColumnWriterImpl), - Int96ColumnWriter(ColumnWriterImpl), - FloatColumnWriter(ColumnWriterImpl), - DoubleColumnWriter(ColumnWriterImpl), - ByteArrayColumnWriter(ColumnWriterImpl), - FixedLenByteArrayColumnWriter(ColumnWriterImpl), +pub enum ColumnWriter<'a> { + BoolColumnWriter(ColumnWriterImpl<'a, BoolType>), + Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>), + Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>), + Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>), + FloatColumnWriter(ColumnWriterImpl<'a, FloatType>), + DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>), + ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>), + FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>), } pub enum Level { @@ -76,11 +76,11 @@ macro_rules! gen_stats_section { } /// Gets a specific column writer corresponding to column descriptor `descr`. -pub fn get_column_writer( +pub fn get_column_writer<'a>( descr: ColumnDescPtr, props: WriterPropertiesPtr, - page_writer: Box, -) -> ColumnWriter { + page_writer: Box, +) -> ColumnWriter<'a> { match descr.physical_type() { Type::BOOLEAN => ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new( descr, @@ -139,9 +139,9 @@ pub fn get_typed_column_writer( } /// Similar to `get_typed_column_writer` but returns a reference. -pub fn get_typed_column_writer_ref( - col_writer: &ColumnWriter, -) -> &ColumnWriterImpl { +pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>( + col_writer: &'b ColumnWriter<'a>, +) -> &'b ColumnWriterImpl<'a, T> { T::get_column_writer_ref(col_writer).unwrap_or_else(|| { panic!( "Failed to convert column writer into a typed column writer for `{}` type", @@ -151,9 +151,9 @@ pub fn get_typed_column_writer_ref( } /// Similar to `get_typed_column_writer` but returns a reference. 
-pub fn get_typed_column_writer_mut( - col_writer: &mut ColumnWriter, -) -> &mut ColumnWriterImpl { +pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>( + col_writer: &'a mut ColumnWriter<'b>, +) -> &'a mut ColumnWriterImpl<'b, T> { T::get_column_writer_mut(col_writer).unwrap_or_else(|| { panic!( "Failed to convert column writer into a typed column writer for `{}` type", @@ -163,11 +163,11 @@ pub fn get_typed_column_writer_mut( } /// Typed column writer for a primitive column. -pub struct ColumnWriterImpl { +pub struct ColumnWriterImpl<'a, T: DataType> { // Column writer properties descr: ColumnDescPtr, props: WriterPropertiesPtr, - page_writer: Box, + page_writer: Box, has_dictionary: bool, dict_encoder: Option>, encoder: Box>, @@ -200,11 +200,11 @@ pub struct ColumnWriterImpl { _phantom: PhantomData, } -impl ColumnWriterImpl { +impl<'a, T: DataType> ColumnWriterImpl<'a, T> { pub fn new( descr: ColumnDescPtr, props: WriterPropertiesPtr, - page_writer: Box, + page_writer: Box, ) -> Self { let codec = props.compression(descr.path()); let compressor = create_codec(codec).unwrap(); @@ -1140,15 +1140,13 @@ mod tests { page::PageReader, reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl}, }; + use crate::file::writer::TrackedWrite; use crate::file::{ properties::WriterProperties, reader::SerializedPageReader, writer::SerializedPageWriter, }; use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as SchemaType}; - use crate::util::{ - io::{FileSink, FileSource}, - test_common::random_numbers_range, - }; + use crate::util::{io::FileSource, test_common::random_numbers_range}; use super::*; @@ -1825,9 +1823,9 @@ mod tests { fn test_column_writer_add_data_pages_with_dict() { // ARROW-5129: Test verifies that we add data page in case of dictionary encoding // and no fallback occurred so far. - let file = tempfile::tempfile().unwrap(); - let sink = FileSink::new(&file); - let page_writer = Box::new(SerializedPageWriter::new(sink)); + let mut file = tempfile::tempfile().unwrap(); + let mut writer = TrackedWrite::new(&mut file); + let page_writer = Box::new(SerializedPageWriter::new(&mut writer)); let props = Arc::new( WriterProperties::builder() .set_data_pagesize_limit(15) // actually each page will have size 15-18 bytes @@ -2120,9 +2118,9 @@ mod tests { def_levels: Option<&[i16]>, rep_levels: Option<&[i16]>, ) { - let file = tempfile::tempfile().unwrap(); - let sink = FileSink::new(&file); - let page_writer = Box::new(SerializedPageWriter::new(sink)); + let mut file = tempfile::tempfile().unwrap(); + let mut writer = TrackedWrite::new(&mut file); + let page_writer = Box::new(SerializedPageWriter::new(&mut writer)); let max_def_level = match def_levels { Some(buf) => *buf.iter().max().unwrap_or(&0i16), @@ -2257,12 +2255,12 @@ mod tests { } /// Returns column writer. 
- fn get_test_column_writer( - page_writer: Box, + fn get_test_column_writer<'a, T: DataType>( + page_writer: Box, max_def_level: i16, max_rep_level: i16, props: WriterPropertiesPtr, - ) -> ColumnWriterImpl { + ) -> ColumnWriterImpl<'a, T> { let descr = Arc::new(get_test_column_descr::(max_def_level, max_rep_level)); let column_writer = get_column_writer(descr, props, page_writer); get_typed_column_writer::(column_writer) @@ -2343,7 +2341,7 @@ mod tests { max_def_level: i16, max_rep_level: i16, props: WriterPropertiesPtr, - ) -> ColumnWriterImpl { + ) -> ColumnWriterImpl<'static, T> { let descr = Arc::new(get_test_decimals_column_descr::( max_def_level, max_rep_level, @@ -2386,12 +2384,12 @@ mod tests { } /// Returns column writer for UINT32 Column provided as ConvertedType only - fn get_test_unsigned_int_given_as_converted_column_writer( - page_writer: Box, + fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>( + page_writer: Box, max_def_level: i16, max_rep_level: i16, props: WriterPropertiesPtr, - ) -> ColumnWriterImpl { + ) -> ColumnWriterImpl<'a, T> { let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::( max_def_level, max_rep_level, diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs index 28645a262546..c01fb153089d 100644 --- a/parquet/src/data_type.rs +++ b/parquet/src/data_type.rs @@ -1037,19 +1037,21 @@ pub trait DataType: 'static + Send { where Self: Sized; - fn get_column_writer(column_writer: ColumnWriter) -> Option> + fn get_column_writer( + column_writer: ColumnWriter<'_>, + ) -> Option> where Self: Sized; - fn get_column_writer_ref( - column_writer: &ColumnWriter, - ) -> Option<&ColumnWriterImpl> + fn get_column_writer_ref<'a, 'b: 'a>( + column_writer: &'b ColumnWriter<'a>, + ) -> Option<&'b ColumnWriterImpl<'a, Self>> where Self: Sized; - fn get_column_writer_mut( - column_writer: &mut ColumnWriter, - ) -> Option<&mut ColumnWriterImpl> + fn get_column_writer_mut<'a, 'b: 'a>( + column_writer: &'a mut ColumnWriter<'b>, + ) -> Option<&'a mut ColumnWriterImpl<'b, Self>> where Self: Sized; } @@ -1094,26 +1096,26 @@ macro_rules! make_type { } fn get_column_writer( - column_writer: ColumnWriter, - ) -> Option> { + column_writer: ColumnWriter<'_>, + ) -> Option> { match column_writer { ColumnWriter::$writer_ident(w) => Some(w), _ => None, } } - fn get_column_writer_ref( - column_writer: &ColumnWriter, - ) -> Option<&ColumnWriterImpl> { + fn get_column_writer_ref<'a, 'b: 'a>( + column_writer: &'a ColumnWriter<'b>, + ) -> Option<&'a ColumnWriterImpl<'b, Self>> { match column_writer { ColumnWriter::$writer_ident(w) => Some(w), _ => None, } } - fn get_column_writer_mut( - column_writer: &mut ColumnWriter, - ) -> Option<&mut ColumnWriterImpl> { + fn get_column_writer_mut<'a, 'b: 'a>( + column_writer: &'a mut ColumnWriter<'b>, + ) -> Option<&'a mut ColumnWriterImpl<'b, Self>> { match column_writer { ColumnWriter::$writer_ident(w) => Some(w), _ => None, diff --git a/parquet/src/file/mod.rs b/parquet/src/file/mod.rs index 78fb7ef11fe3..d293dc7731ad 100644 --- a/parquet/src/file/mod.rs +++ b/parquet/src/file/mod.rs @@ -32,7 +32,7 @@ //! use parquet::{ //! file::{ //! properties::WriterProperties, -//! writer::{FileWriter, SerializedFileWriter}, +//! writer::SerializedFileWriter, //! }, //! schema::parser::parse_message_type, //! }; @@ -51,9 +51,9 @@ //! let mut row_group_writer = writer.next_row_group().unwrap(); //! while let Some(mut col_writer) = row_group_writer.next_column().unwrap() { //! // ... 
write values to a column writer -//! row_group_writer.close_column(col_writer).unwrap(); +//! col_writer.close().unwrap() //! } -//! writer.close_row_group(row_group_writer).unwrap(); +//! row_group_writer.close().unwrap(); //! writer.close().unwrap(); //! //! let bytes = fs::read(&path).unwrap(); diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index c6d0d1066ba9..646550dcb6be 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -18,176 +18,176 @@ //! Contains file writer API, and provides methods to write row groups and columns by //! using row group writers and column writers respectively. -use std::{ - io::{Seek, SeekFrom, Write}, - sync::Arc, -}; +use std::{io::Write, sync::Arc}; use byteorder::{ByteOrder, LittleEndian}; use parquet_format as parquet; use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol}; use crate::basic::PageType; +use crate::column::writer::{get_typed_column_writer_mut, ColumnWriterImpl}; use crate::column::{ page::{CompressedPage, Page, PageWriteSpec, PageWriter}, writer::{get_column_writer, ColumnWriter}, }; +use crate::data_type::DataType; use crate::errors::{ParquetError, Result}; use crate::file::{ metadata::*, properties::WriterPropertiesPtr, statistics::to_thrift as statistics_to_thrift, FOOTER_SIZE, PARQUET_MAGIC, }; use crate::schema::types::{self, SchemaDescPtr, SchemaDescriptor, TypePtr}; -use crate::util::io::{FileSink, Position}; - -// Exposed publically so client code can implement [`ParquetWriter`] -pub use crate::util::io::TryClone; +use crate::util::io::TryClone; -// Exposed publically for convenience of writing Parquet to a buffer of bytes -pub use crate::util::cursor::InMemoryWriteableCursor; +/// A wrapper around a [`Write`] that keeps track of the number +/// of bytes that have been written +pub struct TrackedWrite { + inner: W, + bytes_written: usize, +} -// ---------------------------------------------------------------------- -// APIs for file & row group writers +impl TrackedWrite { + /// Create a new [`TrackedWrite`] from a [`Write`] + pub fn new(inner: W) -> Self { + Self { + inner, + bytes_written: 0, + } + } -/// Parquet file writer API. -/// Provides methods to write row groups sequentially. -/// -/// The main workflow should be as following: -/// - Create file writer, this will open a new file and potentially write some metadata. -/// - Request a new row group writer by calling `next_row_group`. -/// - Once finished writing row group, close row group writer by passing it into -/// `close_row_group` method - this will finalise row group metadata and update metrics. -/// - Write subsequent row groups, if necessary. -/// - After all row groups have been written, close the file writer using `close` method. -pub trait FileWriter { - /// Creates new row group from this file writer. - /// In case of IO error or Thrift error, returns `Err`. - /// - /// There is no limit on a number of row groups in a file; however, row groups have - /// to be written sequentially. Every time the next row group is requested, the - /// previous row group must be finalised and closed using `close_row_group` method. - fn next_row_group(&mut self) -> Result>; + /// Returns the number of bytes written to this instance + pub fn bytes_written(&self) -> usize { + self.bytes_written + } +} - /// Finalises and closes row group that was created using `next_row_group` method. - /// After calling this method, the next row group is available for writes. 
- fn close_row_group( - &mut self, - row_group_writer: Box, - ) -> Result<()>; +impl Write for TrackedWrite { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let bytes = self.inner.write(buf)?; + self.bytes_written += bytes; + Ok(bytes) + } - /// Closes and finalises file writer, returning the file metadata. - /// - /// All row groups must be appended before this method is called. - /// No writes are allowed after this point. - /// - /// Can be called multiple times. It is up to implementation to either result in - /// no-op, or return an `Err` for subsequent calls. - fn close(&mut self) -> Result; + fn flush(&mut self) -> std::io::Result<()> { + self.inner.flush() + } } -/// Parquet row group writer API. -/// Provides methods to access column writers in an iterator-like fashion, order is -/// guaranteed to match the order of schema leaves (column descriptors). +/// Callback invoked on closing a column chunk, arguments are: /// -/// All columns should be written sequentially; the main workflow is: -/// - Request the next column using `next_column` method - this will return `None` if no -/// more columns are available to write. -/// - Once done writing a column, close column writer with `close_column` method - this -/// will finalise column chunk metadata and update row group metrics. -/// - Once all columns have been written, close row group writer with `close` method - -/// it will return row group metadata and is no-op on already closed row group. -pub trait RowGroupWriter { - /// Returns the next column writer, if available; otherwise returns `None`. - /// In case of any IO error or Thrift error, or if row group writer has already been - /// closed returns `Err`. - /// - /// To request the next column writer, the previous one must be finalised and closed - /// using `close_column`. - fn next_column(&mut self) -> Result>; +/// - the number of bytes written +/// - the number of rows written +/// - the column chunk metadata +/// +pub type OnCloseColumnChunk<'a> = + Box Result<()> + 'a>; - /// Closes column writer that was created using `next_column` method. - /// This should be called before requesting the next column writer. - fn close_column(&mut self, column_writer: ColumnWriter) -> Result<()>; +/// Callback invoked on closing a row group, arguments are: +/// +/// - the row group metadata +pub type OnCloseRowGroup<'a> = Box Result<()> + 'a>; - /// Closes this row group writer and returns row group metadata. - /// After calling this method row group writer must not be used. - /// - /// It is recommended to call this method before requesting another row group, but it - /// will be closed automatically before returning a new row group. - /// - /// Can be called multiple times. In subsequent calls will result in no-op and return - /// already created row group metadata. - fn close(&mut self) -> Result; -} +#[deprecated = "use std::io::Write"] +pub trait ParquetWriter: Write + std::io::Seek + TryClone {} +#[allow(deprecated)] +impl ParquetWriter for T {} // ---------------------------------------------------------------------- // Serialized impl for file & row group writers -pub trait ParquetWriter: Write + Seek + TryClone {} -impl ParquetWriter for T {} - -/// A serialized implementation for Parquet [`FileWriter`]. -/// See documentation on file writer for more information. -pub struct SerializedFileWriter { - buf: W, +/// Parquet file writer API. +/// Provides methods to write row groups sequentially. 
+/// +/// The main workflow should be as following: +/// - Create file writer, this will open a new file and potentially write some metadata. +/// - Request a new row group writer by calling `next_row_group`. +/// - Once finished writing row group, close row group writer by calling `close` +/// - Write subsequent row groups, if necessary. +/// - After all row groups have been written, close the file writer using `close` method. +pub struct SerializedFileWriter { + buf: TrackedWrite, schema: TypePtr, descr: SchemaDescPtr, props: WriterPropertiesPtr, - total_num_rows: i64, row_groups: Vec, - previous_writer_closed: bool, - is_closed: bool, + row_group_index: usize, } -impl SerializedFileWriter { +impl SerializedFileWriter { /// Creates new file writer. - pub fn new( - mut buf: W, - schema: TypePtr, - properties: WriterPropertiesPtr, - ) -> Result { + pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result { + let mut buf = TrackedWrite::new(buf); Self::start_file(&mut buf)?; Ok(Self { buf, schema: schema.clone(), descr: Arc::new(SchemaDescriptor::new(schema)), props: properties, - total_num_rows: 0, - row_groups: Vec::new(), - previous_writer_closed: true, - is_closed: false, + row_groups: vec![], + row_group_index: 0, }) } - /// Writes magic bytes at the beginning of the file. - fn start_file(buf: &mut W) -> Result<()> { - buf.write_all(&PARQUET_MAGIC)?; - Ok(()) + /// Creates new row group from this file writer. + /// In case of IO error or Thrift error, returns `Err`. + /// + /// There is no limit on a number of row groups in a file; however, row groups have + /// to be written sequentially. Every time the next row group is requested, the + /// previous row group must be finalised and closed using `RowGroupWriter::close` method. + pub fn next_row_group(&mut self) -> Result> { + self.assert_previous_writer_closed()?; + self.row_group_index += 1; + + let row_groups = &mut self.row_groups; + let on_close = |metadata| { + row_groups.push(metadata); + Ok(()) + }; + + let row_group_writer = SerializedRowGroupWriter::new( + self.descr.clone(), + self.props.clone(), + &mut self.buf, + Some(Box::new(on_close)), + ); + Ok(row_group_writer) + } + + /// Closes and finalises file writer, returning the file metadata. + /// + /// All row groups must be appended before this method is called. + /// No writes are allowed after this point. + /// + /// Can be called multiple times. It is up to implementation to either result in + /// no-op, or return an `Err` for subsequent calls. + pub fn close(mut self) -> Result { + self.assert_previous_writer_closed()?; + let metadata = self.write_metadata()?; + Ok(metadata) } - /// Finalises active row group writer, otherwise no-op. - fn finalise_row_group_writer( - &mut self, - mut row_group_writer: Box, - ) -> Result<()> { - let row_group_metadata = row_group_writer.close()?; - self.total_num_rows += row_group_metadata.num_rows(); - self.row_groups.push(row_group_metadata); + /// Writes magic bytes at the beginning of the file. + fn start_file(buf: &mut TrackedWrite) -> Result<()> { + buf.write_all(&PARQUET_MAGIC)?; Ok(()) } /// Assembles and writes metadata at the end of the file. 
fn write_metadata(&mut self) -> Result { + let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum(); + + let row_groups = self + .row_groups + .as_slice() + .iter() + .map(|v| v.to_thrift()) + .collect(); + let file_metadata = parquet::FileMetaData { + num_rows, + row_groups, version: self.props.writer_version().as_num(), schema: types::to_thrift(self.schema.as_ref())?, - num_rows: self.total_num_rows as i64, - row_groups: self - .row_groups - .as_slice() - .iter() - .map(|v| v.to_thrift()) - .collect(), key_value_metadata: self.props.key_value_metadata().cloned(), created_by: Some(self.props.created_by().to_owned()), column_orders: None, @@ -196,13 +196,13 @@ impl SerializedFileWriter { }; // Write file metadata - let start_pos = self.buf.seek(SeekFrom::Current(0))?; + let start_pos = self.buf.bytes_written(); { let mut protocol = TCompactOutputProtocol::new(&mut self.buf); file_metadata.write_to_out_protocol(&mut protocol)?; protocol.flush()?; } - let end_pos = self.buf.seek(SeekFrom::Current(0))?; + let end_pos = self.buf.bytes_written(); // Write footer let mut footer_buffer: [u8; FOOTER_SIZE] = [0; FOOTER_SIZE]; @@ -213,18 +213,9 @@ impl SerializedFileWriter { Ok(file_metadata) } - #[inline] - fn assert_closed(&self) -> Result<()> { - if self.is_closed { - Err(general_err!("File writer is closed")) - } else { - Ok(()) - } - } - #[inline] fn assert_previous_writer_closed(&self) -> Result<()> { - if !self.previous_writer_closed { + if self.row_group_index != self.row_groups.len() { Err(general_err!("Previous row group writer was not closed")) } else { Ok(()) @@ -232,157 +223,107 @@ impl SerializedFileWriter { } } -impl FileWriter for SerializedFileWriter { - #[inline] - fn next_row_group(&mut self) -> Result> { - self.assert_closed()?; - self.assert_previous_writer_closed()?; - let row_group_writer = SerializedRowGroupWriter::new( - self.descr.clone(), - self.props.clone(), - &self.buf, - ); - self.previous_writer_closed = false; - Ok(Box::new(row_group_writer)) - } - - #[inline] - fn close_row_group( - &mut self, - row_group_writer: Box, - ) -> Result<()> { - self.assert_closed()?; - let res = self.finalise_row_group_writer(row_group_writer); - self.previous_writer_closed = res.is_ok(); - res - } - - #[inline] - fn close(&mut self) -> Result { - self.assert_closed()?; - self.assert_previous_writer_closed()?; - let metadata = self.write_metadata()?; - self.is_closed = true; - Ok(metadata) - } -} - -/// A serialized implementation for Parquet [`RowGroupWriter`]. -/// Coordinates writing of a row group with column writers. -/// See documentation on row group writer for more information. -pub struct SerializedRowGroupWriter { +/// Parquet row group writer API. +/// Provides methods to access column writers in an iterator-like fashion, order is +/// guaranteed to match the order of schema leaves (column descriptors). +/// +/// All columns should be written sequentially; the main workflow is: +/// - Request the next column using `next_column` method - this will return `None` if no +/// more columns are available to write. +/// - Once done writing a column, close column writer with `close` +/// - Once all columns have been written, close row group writer with `close` method - +/// it will return row group metadata and is no-op on already closed row group. 
+pub struct SerializedRowGroupWriter<'a, W: Write> { descr: SchemaDescPtr, props: WriterPropertiesPtr, - buf: W, + buf: &'a mut TrackedWrite, total_rows_written: Option, total_bytes_written: u64, column_index: usize, - previous_writer_closed: bool, row_group_metadata: Option, column_chunks: Vec, + on_close: Option>, } -impl SerializedRowGroupWriter { +impl<'a, W: Write> SerializedRowGroupWriter<'a, W> { + /// Creates a new `SerializedRowGroupWriter` with: + /// + /// - `schema_descr` - the schema to write + /// - `properties` - writer properties + /// - `buf` - the buffer to write data to + /// - `on_close` - an optional callback that will invoked on [`Self::close`] pub fn new( schema_descr: SchemaDescPtr, properties: WriterPropertiesPtr, - buf: &W, + buf: &'a mut TrackedWrite, + on_close: Option>, ) -> Self { let num_columns = schema_descr.num_columns(); Self { + buf, + on_close, + total_rows_written: None, descr: schema_descr, props: properties, - buf: buf.try_clone().unwrap(), - total_rows_written: None, - total_bytes_written: 0, column_index: 0, - previous_writer_closed: true, row_group_metadata: None, column_chunks: Vec::with_capacity(num_columns), + total_bytes_written: 0, } } - /// Checks and finalises current column writer. - fn finalise_column_writer(&mut self, writer: ColumnWriter) -> Result<()> { - let (bytes_written, rows_written, metadata) = match writer { - ColumnWriter::BoolColumnWriter(typed) => typed.close()?, - ColumnWriter::Int32ColumnWriter(typed) => typed.close()?, - ColumnWriter::Int64ColumnWriter(typed) => typed.close()?, - ColumnWriter::Int96ColumnWriter(typed) => typed.close()?, - ColumnWriter::FloatColumnWriter(typed) => typed.close()?, - ColumnWriter::DoubleColumnWriter(typed) => typed.close()?, - ColumnWriter::ByteArrayColumnWriter(typed) => typed.close()?, - ColumnWriter::FixedLenByteArrayColumnWriter(typed) => typed.close()?, - }; - - // Update row group writer metrics - self.total_bytes_written += bytes_written; - self.column_chunks.push(metadata); - if let Some(rows) = self.total_rows_written { - if rows != rows_written { - return Err(general_err!( - "Incorrect number of rows, expected {} != {} rows", - rows, - rows_written - )); - } - } else { - self.total_rows_written = Some(rows_written); - } - - Ok(()) - } - - #[inline] - fn assert_closed(&self) -> Result<()> { - if self.row_group_metadata.is_some() { - Err(general_err!("Row group writer is closed")) - } else { - Ok(()) - } - } - - #[inline] - fn assert_previous_writer_closed(&self) -> Result<()> { - if !self.previous_writer_closed { - Err(general_err!("Previous column writer was not closed")) - } else { - Ok(()) - } - } -} - -impl RowGroupWriter for SerializedRowGroupWriter { - #[inline] - fn next_column(&mut self) -> Result> { - self.assert_closed()?; + /// Returns the next column writer, if available; otherwise returns `None`. + /// In case of any IO error or Thrift error, or if row group writer has already been + /// closed returns `Err`. 
+ pub fn next_column(&mut self) -> Result>> { self.assert_previous_writer_closed()?; if self.column_index >= self.descr.num_columns() { return Ok(None); } - let sink = FileSink::new(&self.buf); - let page_writer = Box::new(SerializedPageWriter::new(sink)); + let page_writer = Box::new(SerializedPageWriter::new(self.buf)); let column_writer = get_column_writer( self.descr.column(self.column_index), self.props.clone(), page_writer, ); self.column_index += 1; - self.previous_writer_closed = false; - Ok(Some(column_writer)) - } + let total_bytes_written = &mut self.total_bytes_written; + let total_rows_written = &mut self.total_rows_written; + let column_chunks = &mut self.column_chunks; + + let on_close = |bytes_written, rows_written, metadata| { + // Update row group writer metrics + *total_bytes_written += bytes_written; + column_chunks.push(metadata); + if let Some(rows) = *total_rows_written { + if rows != rows_written { + return Err(general_err!( + "Incorrect number of rows, expected {} != {} rows", + rows, + rows_written + )); + } + } else { + *total_rows_written = Some(rows_written); + } - #[inline] - fn close_column(&mut self, column_writer: ColumnWriter) -> Result<()> { - let res = self.finalise_column_writer(column_writer); - self.previous_writer_closed = res.is_ok(); - res + Ok(()) + }; + + Ok(Some(SerializedColumnWriter::new( + column_writer, + Some(Box::new(on_close)), + ))) } - #[inline] - fn close(&mut self) -> Result { + /// Closes this row group writer and returns row group metadata. + /// After calling this method row group writer must not be used. + /// + /// Can be called multiple times. In subsequent calls will result in no-op and return + /// already created row group metadata. + pub fn close(mut self) -> Result { if self.row_group_metadata.is_none() { self.assert_previous_writer_closed()?; @@ -393,25 +334,86 @@ impl RowGroupWriter for SerializedRowGroupWriter .set_num_rows(self.total_rows_written.unwrap_or(0) as i64) .build()?; - self.row_group_metadata = Some(Arc::new(row_group_metadata)); + let metadata = Arc::new(row_group_metadata); + self.row_group_metadata = Some(metadata.clone()); + + if let Some(on_close) = self.on_close.take() { + on_close(metadata)? 
+ } } let metadata = self.row_group_metadata.as_ref().unwrap().clone(); Ok(metadata) } + + #[inline] + fn assert_previous_writer_closed(&self) -> Result<()> { + if self.column_index != self.column_chunks.len() { + Err(general_err!("Previous column writer was not closed")) + } else { + Ok(()) + } + } +} + +/// A wrapper around a [`ColumnWriter`] that invokes a callback on [`Self::close`] +pub struct SerializedColumnWriter<'a> { + inner: ColumnWriter<'a>, + on_close: Option>, +} + +impl<'a> SerializedColumnWriter<'a> { + /// Create a new [`SerializedColumnWriter`] from a `[`ColumnWriter`] and an + /// optional callback to be invoked on [`Self::close`] + pub fn new( + inner: ColumnWriter<'a>, + on_close: Option>, + ) -> Self { + Self { inner, on_close } + } + + /// Returns a reference to an untyped [`ColumnWriter`] + pub fn untyped(&mut self) -> &mut ColumnWriter<'a> { + &mut self.inner + } + + /// Returns a reference to a typed [`ColumnWriterImpl`] + pub fn typed(&mut self) -> &mut ColumnWriterImpl<'a, T> { + get_typed_column_writer_mut(&mut self.inner) + } + + /// Close this [`SerializedColumnWriter] + pub fn close(mut self) -> Result<()> { + let (bytes_written, rows_written, metadata) = match self.inner { + ColumnWriter::BoolColumnWriter(typed) => typed.close()?, + ColumnWriter::Int32ColumnWriter(typed) => typed.close()?, + ColumnWriter::Int64ColumnWriter(typed) => typed.close()?, + ColumnWriter::Int96ColumnWriter(typed) => typed.close()?, + ColumnWriter::FloatColumnWriter(typed) => typed.close()?, + ColumnWriter::DoubleColumnWriter(typed) => typed.close()?, + ColumnWriter::ByteArrayColumnWriter(typed) => typed.close()?, + ColumnWriter::FixedLenByteArrayColumnWriter(typed) => typed.close()?, + }; + + if let Some(on_close) = self.on_close.take() { + on_close(bytes_written, rows_written, metadata)? + } + + Ok(()) + } } /// A serialized implementation for Parquet [`PageWriter`]. /// Writes and serializes pages and metadata into output stream. /// /// `SerializedPageWriter` should not be used after calling `close()`. -pub struct SerializedPageWriter { - sink: T, +pub struct SerializedPageWriter<'a, W> { + sink: &'a mut TrackedWrite, } -impl SerializedPageWriter { +impl<'a, W: Write> SerializedPageWriter<'a, W> { /// Creates new page writer. - pub fn new(sink: T) -> Self { + pub fn new(sink: &'a mut TrackedWrite) -> Self { Self { sink } } @@ -419,13 +421,13 @@ impl SerializedPageWriter { /// Returns number of bytes that have been written into the sink. #[inline] fn serialize_page_header(&mut self, header: parquet::PageHeader) -> Result { - let start_pos = self.sink.pos(); + let start_pos = self.sink.bytes_written(); { let mut protocol = TCompactOutputProtocol::new(&mut self.sink); header.write_to_out_protocol(&mut protocol)?; protocol.flush()?; } - Ok((self.sink.pos() - start_pos) as usize) + Ok(self.sink.bytes_written() - start_pos) } /// Serializes column chunk into Thrift. 
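The hunks above replace the `FileWriter`/`RowGroupWriter` traits with concrete writers whose metadata flows back through `on_close` callbacks instead of `close_row_group`/`close_column` calls on the parent. The following is a minimal end-to-end sketch (not part of the diff) of the low-level write path after this change, based only on the signatures introduced above; the schema string, field name, and function name are illustrative.

```rust
use std::sync::Arc;

use parquet::data_type::Int32Type;
use parquet::errors::Result;
use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::parser::parse_message_type;

// Hypothetical helper: write one row group with a single INT32 column into a Vec<u8>.
fn write_to_vec() -> Result<Vec<u8>> {
    let schema = Arc::new(parse_message_type("message schema { REQUIRED INT32 id; }")?);
    let props = Arc::new(WriterProperties::builder().build());

    // The writer now takes any `W: Write` by value; a mutable borrow of a Vec<u8>
    // is a valid sink, no seekable cursor type required.
    let mut buffer = Vec::new();
    let mut writer = SerializedFileWriter::new(&mut buffer, schema, props)?;

    let mut row_group = writer.next_row_group()?;
    while let Some(mut column) = row_group.next_column()? {
        // `typed::<T>()` replaces get_typed_column_writer_mut for the common case.
        column.typed::<Int32Type>().write_batch(&[1, 2, 3], None, None)?;
        // Closing the column runs the on_close callback that records its metadata.
        column.close()?;
    }
    // Closing the row group hands its metadata back to the file writer,
    // and closing the file writer (which consumes it) writes the footer.
    row_group.close()?;
    writer.close()?;

    Ok(buffer)
}
```

Because each `close` consumes its writer and reports metadata through the callback, the "previous writer was not closed" checks reduce to comparing `row_group_index`/`column_index` against the collected metadata, rather than tracking separate boolean flags.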
@@ -439,7 +441,7 @@ impl SerializedPageWriter { } } -impl PageWriter for SerializedPageWriter { +impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> { fn write_page(&mut self, page: CompressedPage) -> Result { let uncompressed_size = page.uncompressed_size(); let compressed_size = page.compressed_size(); @@ -506,7 +508,7 @@ impl PageWriter for SerializedPageWriter { } } - let start_pos = self.sink.pos(); + let start_pos = self.sink.bytes_written() as u64; let header_size = self.serialize_page_header(page_header)?; self.sink.write_all(page.data())?; @@ -516,7 +518,7 @@ impl PageWriter for SerializedPageWriter { spec.uncompressed_size = uncompressed_size + header_size; spec.compressed_size = compressed_size + header_size; spec.offset = start_pos; - spec.bytes_written = self.sink.pos() - start_pos; + spec.bytes_written = self.sink.bytes_written() as u64 - start_pos; // Number of values is incremented for data pages only if page_type == PageType::DATA_PAGE || page_type == PageType::DATA_PAGE_V2 { spec.num_values = num_values; @@ -544,6 +546,7 @@ mod tests { use crate::basic::{Compression, Encoding, LogicalType, Repetition, Type}; use crate::column::page::PageReader; use crate::compression::{create_codec, Codec}; + use crate::data_type::Int32Type; use crate::file::{ properties::{WriterProperties, WriterVersion}, reader::{FileReader, SerializedFileReader, SerializedPageReader}, @@ -552,48 +555,6 @@ mod tests { use crate::record::RowAccessor; use crate::util::memory::ByteBufferPtr; - #[test] - fn test_file_writer_error_after_close() { - let file = tempfile::tempfile().unwrap(); - let schema = Arc::new(types::Type::group_type_builder("schema").build().unwrap()); - let props = Arc::new(WriterProperties::builder().build()); - let mut writer = SerializedFileWriter::new(file, schema, props).unwrap(); - writer.close().unwrap(); - { - let res = writer.next_row_group(); - assert!(res.is_err()); - if let Err(err) = res { - assert_eq!(format!("{}", err), "Parquet error: File writer is closed"); - } - } - { - let res = writer.close(); - assert!(res.is_err()); - if let Err(err) = res { - assert_eq!(format!("{}", err), "Parquet error: File writer is closed"); - } - } - } - - #[test] - fn test_row_group_writer_error_after_close() { - let file = tempfile::tempfile().unwrap(); - let schema = Arc::new(types::Type::group_type_builder("schema").build().unwrap()); - let props = Arc::new(WriterProperties::builder().build()); - let mut writer = SerializedFileWriter::new(file, schema, props).unwrap(); - let mut row_group_writer = writer.next_row_group().unwrap(); - row_group_writer.close().unwrap(); - - let res = row_group_writer.next_column(); - assert!(res.is_err()); - if let Err(err) = res { - assert_eq!( - format!("{}", err), - "Parquet error: Row group writer is closed" - ); - } - } - #[test] fn test_row_group_writer_error_not_all_columns_written() { let file = tempfile::tempfile().unwrap(); @@ -609,7 +570,7 @@ mod tests { ); let props = Arc::new(WriterProperties::builder().build()); let mut writer = SerializedFileWriter::new(file, schema, props).unwrap(); - let mut row_group_writer = writer.next_row_group().unwrap(); + let row_group_writer = writer.next_row_group().unwrap(); let res = row_group_writer.close(); assert!(res.is_err()); if let Err(err) = res { @@ -647,24 +608,23 @@ mod tests { let mut row_group_writer = writer.next_row_group().unwrap(); let mut col_writer = row_group_writer.next_column().unwrap().unwrap(); - if let ColumnWriter::Int32ColumnWriter(ref mut typed) = col_writer { - 
typed.write_batch(&[1, 2, 3], None, None).unwrap(); - } - row_group_writer.close_column(col_writer).unwrap(); + col_writer + .typed::() + .write_batch(&[1, 2, 3], None, None) + .unwrap(); + col_writer.close().unwrap(); let mut col_writer = row_group_writer.next_column().unwrap().unwrap(); - if let ColumnWriter::Int32ColumnWriter(ref mut typed) = col_writer { - typed.write_batch(&[1, 2], None, None).unwrap(); - } + col_writer + .typed::() + .write_batch(&[1, 2], None, None) + .unwrap(); - let res = row_group_writer.close_column(col_writer); - assert!(res.is_err()); - if let Err(err) = res { - assert_eq!( - format!("{}", err), - "Parquet error: Incorrect number of rows, expected 3 != 2 rows" - ); - } + let err = col_writer.close().unwrap_err(); + assert_eq!( + err.to_string(), + "Parquet error: Incorrect number of rows, expected 3 != 2 rows" + ); } #[test] @@ -682,7 +642,7 @@ mod tests { .unwrap(), ); let props = Arc::new(WriterProperties::builder().build()); - let mut writer = + let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap(); writer.close().unwrap(); @@ -712,7 +672,7 @@ mod tests { )])) .build(), ); - let mut writer = + let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap(); writer.close().unwrap(); @@ -758,7 +718,7 @@ mod tests { .set_writer_version(WriterVersion::PARQUET_2_0) .build(), ); - let mut writer = + let writer = SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap(); writer.close().unwrap(); @@ -971,8 +931,8 @@ mod tests { let mut buffer: Vec = vec![]; let mut result_pages: Vec = vec![]; { - let cursor = Cursor::new(&mut buffer); - let mut page_writer = SerializedPageWriter::new(cursor); + let mut writer = TrackedWrite::new(&mut buffer); + let mut page_writer = SerializedPageWriter::new(&mut writer); for page in compressed_pages { page_writer.write_page(page).unwrap(); @@ -1041,22 +1001,15 @@ mod tests { for subset in &data { let mut row_group_writer = file_writer.next_row_group().unwrap(); - let col_writer = row_group_writer.next_column().unwrap(); - if let Some(mut writer) = col_writer { - match writer { - ColumnWriter::Int32ColumnWriter(ref mut typed) => { - rows += - typed.write_batch(&subset[..], None, None).unwrap() as i64; - } - _ => { - unimplemented!(); - } - } - row_group_writer.close_column(writer).unwrap(); + if let Some(mut writer) = row_group_writer.next_column().unwrap() { + rows += writer + .typed::() + .write_batch(&subset[..], None, None) + .unwrap() as i64; + writer.close().unwrap(); } - file_writer.close_row_group(row_group_writer).unwrap(); + row_group_writer.close().unwrap(); } - file_writer.close().unwrap(); let reader = assert_send(SerializedFileReader::new(file).unwrap()); @@ -1101,7 +1054,7 @@ mod tests { } fn test_bytes_roundtrip(data: Vec>) { - let cursor = InMemoryWriteableCursor::default(); + let mut cursor = Cursor::new(vec![]); let schema = Arc::new( types::Type::group_type_builder("schema") @@ -1119,30 +1072,24 @@ mod tests { { let props = Arc::new(WriterProperties::builder().build()); let mut writer = - SerializedFileWriter::new(cursor.clone(), schema, props).unwrap(); + SerializedFileWriter::new(&mut cursor, schema, props).unwrap(); for subset in &data { let mut row_group_writer = writer.next_row_group().unwrap(); - let col_writer = row_group_writer.next_column().unwrap(); - if let Some(mut writer) = col_writer { - match writer { - ColumnWriter::Int32ColumnWriter(ref mut typed) => { - rows += typed.write_batch(&subset[..], None, None).unwrap() - 
as i64; - } - _ => { - unimplemented!(); - } - } - row_group_writer.close_column(writer).unwrap(); + if let Some(mut writer) = row_group_writer.next_column().unwrap() { + rows += writer + .typed::() + .write_batch(&subset[..], None, None) + .unwrap() as i64; + + writer.close().unwrap(); } - writer.close_row_group(row_group_writer).unwrap(); + row_group_writer.close().unwrap(); } - writer.close().unwrap(); } - let buffer = cursor.into_inner().unwrap(); + let buffer = cursor.into_inner(); let reading_cursor = crate::file::serialized_reader::SliceableCursor::new(buffer); let reader = SerializedFileReader::new(reading_cursor).unwrap(); diff --git a/parquet/src/record/record_writer.rs b/parquet/src/record/record_writer.rs index 6668eec51494..fe803a7ff4ef 100644 --- a/parquet/src/record/record_writer.rs +++ b/parquet/src/record/record_writer.rs @@ -18,12 +18,12 @@ use crate::schema::types::TypePtr; use super::super::errors::ParquetError; -use super::super::file::writer::RowGroupWriter; +use super::super::file::writer::SerializedRowGroupWriter; pub trait RecordWriter { - fn write_to_row_group( + fn write_to_row_group( &self, - row_group_writer: &mut Box, + row_group_writer: &mut SerializedRowGroupWriter, ) -> Result<(), ParquetError>; /// Generated schema diff --git a/parquet/src/util/cursor.rs b/parquet/src/util/cursor.rs index c03fc66f27f7..ff7067fcbcad 100644 --- a/parquet/src/util/cursor.rs +++ b/parquet/src/util/cursor.rs @@ -15,12 +15,11 @@ // specific language governing permissions and limitations // under the License. +use crate::util::io::TryClone; use std::io::{self, Cursor, Error, ErrorKind, Read, Seek, SeekFrom, Write}; use std::sync::{Arc, Mutex}; use std::{cmp, fmt}; -use crate::file::writer::TryClone; - /// This is object to use if your file is already in memory. /// The sliceable cursor is similar to std::io::Cursor, except that it makes it easy to create "cursor slices". /// To achieve this, it uses Arc instead of shared references. Indeed reference fields are painful @@ -134,11 +133,13 @@ impl Seek for SliceableCursor { } /// Use this type to write Parquet to memory rather than a file. +#[deprecated = "use Vec instead"] #[derive(Debug, Default, Clone)] pub struct InMemoryWriteableCursor { buffer: Arc>>>, } +#[allow(deprecated)] impl InMemoryWriteableCursor { /// Consume this instance and return the underlying buffer as long as there are no other /// references to this instance. 
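With `InMemoryWriteableCursor` deprecated in favour of a plain `Vec<u8>`, an in-memory round trip follows the pattern the updated `arrow_writer` test uses earlier in this diff. The sketch below (not part of the diff; the function name and sample data are illustrative) writes a `RecordBatch` into a buffer and reads it back through `SliceableCursor`, assuming the reworked `ArrowWriter` signature above.

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::file::serialized_reader::SliceableCursor;

// Hypothetical example: round-trip a batch entirely in memory.
fn in_memory_roundtrip() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )
    .unwrap();

    // Any `W: Write` works as the sink, so a mutable borrow of a Vec<u8> suffices.
    let mut buffer = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buffer, schema, None).unwrap();
    writer.write(&batch).unwrap();
    // `close` now consumes the writer, releasing the borrow of `buffer`.
    writer.close().unwrap();

    // Read the serialized bytes back without touching the filesystem.
    let reader = SerializedFileReader::new(SliceableCursor::new(buffer)).unwrap();
    assert_eq!(reader.metadata().file_metadata().num_rows(), 3);
}
```

This is why the diff can delete the `cursor.into_inner()` plumbing: the buffer is owned by the caller the whole time, and only borrowed for the lifetime of the writer.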
@@ -168,6 +169,7 @@ impl InMemoryWriteableCursor { } } +#[allow(deprecated)] impl TryClone for InMemoryWriteableCursor { fn try_clone(&self) -> std::io::Result { Ok(Self { @@ -176,6 +178,7 @@ impl TryClone for InMemoryWriteableCursor { } } +#[allow(deprecated)] impl Write for InMemoryWriteableCursor { fn write(&mut self, buf: &[u8]) -> std::io::Result { let mut inner = self.buffer.lock().unwrap(); @@ -188,6 +191,7 @@ impl Write for InMemoryWriteableCursor { } } +#[allow(deprecated)] impl Seek for InMemoryWriteableCursor { fn seek(&mut self, pos: SeekFrom) -> std::io::Result { let mut inner = self.buffer.lock().unwrap(); diff --git a/parquet/src/util/io.rs b/parquet/src/util/io.rs index c10e6d6161b5..a7b5e73074c6 100644 --- a/parquet/src/util/io.rs +++ b/parquet/src/util/io.rs @@ -17,7 +17,9 @@ use std::{cell::RefCell, cmp, fmt, io::*}; -use crate::file::{reader::Length, writer::ParquetWriter}; +use crate::file::reader::Length; +#[allow(deprecated)] +use crate::file::writer::ParquetWriter; const DEFAULT_BUF_SIZE: usize = 8 * 1024; @@ -156,6 +158,8 @@ impl Length for FileSource { /// Struct that represents `File` output stream with position tracking. /// Used as a sink in file writer. +#[deprecated = "use TrackedWrite instead"] +#[allow(deprecated)] pub struct FileSink { buf: BufWriter, // This is not necessarily position in the underlying file, @@ -163,6 +167,7 @@ pub struct FileSink { pos: u64, } +#[allow(deprecated)] impl FileSink { /// Creates new file sink. /// Position is set to whatever position file has. @@ -176,6 +181,7 @@ impl FileSink { } } +#[allow(deprecated)] impl Write for FileSink { fn write(&mut self, buf: &[u8]) -> Result { let num_bytes = self.buf.write(buf)?; @@ -188,6 +194,7 @@ impl Write for FileSink { } } +#[allow(deprecated)] impl Position for FileSink { fn pos(&self) -> u64 { self.pos @@ -271,6 +278,7 @@ mod tests { } #[test] + #[allow(deprecated)] fn test_io_write_with_pos() { let mut file = tempfile::tempfile().unwrap(); file.write_all(&[b'a', b'b', b'c']).unwrap(); diff --git a/parquet/tests/boolean_writer.rs b/parquet/tests/boolean_writer.rs index b9d757e71a8e..dc2eccfbf3c3 100644 --- a/parquet/tests/boolean_writer.rs +++ b/parquet/tests/boolean_writer.rs @@ -15,11 +15,10 @@ // specific language governing permissions and limitations // under the License. 
-use parquet::column::writer::ColumnWriter; +use parquet::data_type::BoolType; use parquet::file::properties::WriterProperties; use parquet::file::reader::FileReader; use parquet::file::serialized_reader::SerializedFileReader; -use parquet::file::writer::FileWriter; use parquet::file::writer::SerializedFileWriter; use parquet::schema::parser::parse_message_type; use std::fs; @@ -53,25 +52,15 @@ fn it_writes_data_without_hanging() { while let Some(mut col_writer) = row_group_writer.next_column().expect("next column") { - match col_writer { - ColumnWriter::BoolColumnWriter(ref mut typed_writer) => { - typed_writer - .write_batch(&my_bool_values, None, None) - .expect("writing bool column"); - } - _ => { - panic!("only test boolean values"); - } - } - row_group_writer - .close_column(col_writer) - .expect("close column"); + col_writer + .typed::() + .write_batch(&my_bool_values, None, None) + .expect("writing bool column"); + + col_writer.close().expect("close column"); } let rg_md = row_group_writer.close().expect("close row group"); println!("total rows written: {}", rg_md.num_rows()); - writer - .close_row_group(row_group_writer) - .expect("close row groups"); } writer.close().expect("close writer"); diff --git a/parquet/tests/custom_writer.rs b/parquet/tests/custom_writer.rs deleted file mode 100644 index 0a57e79d9558..000000000000 --- a/parquet/tests/custom_writer.rs +++ /dev/null @@ -1,100 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -use std::fs::File; -use std::{ - fs, - io::{prelude::*, SeekFrom}, - sync::Arc, -}; - -use parquet::file::writer::TryClone; -use parquet::{ - basic::Repetition, basic::Type, file::properties::WriterProperties, - file::writer::SerializedFileWriter, schema::types, -}; -use std::env; - -// Test creating some sort of custom writer to ensure the -// appropriate traits are exposed -struct CustomWriter { - file: File, -} - -impl Write for CustomWriter { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.file.write(buf) - } - fn flush(&mut self) -> std::io::Result<()> { - self.file.flush() - } -} - -impl Seek for CustomWriter { - fn seek(&mut self, pos: SeekFrom) -> std::io::Result { - self.file.seek(pos) - } -} - -impl TryClone for CustomWriter { - fn try_clone(&self) -> std::io::Result { - use std::io::{Error, ErrorKind}; - Err(Error::new(ErrorKind::Other, "Clone not supported")) - } -} - -#[test] -fn test_custom_writer() { - let schema = Arc::new( - types::Type::group_type_builder("schema") - .with_fields(&mut vec![Arc::new( - types::Type::primitive_type_builder("col1", Type::INT32) - .with_repetition(Repetition::REQUIRED) - .build() - .unwrap(), - )]) - .build() - .unwrap(), - ); - let props = Arc::new(WriterProperties::builder().build()); - - let file = get_temp_file("test_custom_file_writer"); - let test_file = file.try_clone().unwrap(); - - let writer = CustomWriter { file }; - - // test is that this file can be created - let file_writer = SerializedFileWriter::new(writer, schema, props).unwrap(); - std::mem::drop(file_writer); - - // ensure the file now exists and has non zero size - let metadata = test_file.metadata().unwrap(); - assert!(metadata.len() > 0); -} - -/// Returns file handle for a temp file in 'target' directory with a provided content -fn get_temp_file(file_name: &str) -> fs::File { - // build tmp path to a file in "target/debug/testdata" - let mut path_buf = env::current_dir().unwrap(); - path_buf.push("target"); - path_buf.push("debug"); - path_buf.push("testdata"); - fs::create_dir_all(&path_buf).unwrap(); - path_buf.push(file_name); - - File::create(path_buf).unwrap() -} diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs index d5d1e6818d73..fc7af20ca3f1 100644 --- a/parquet_derive/src/lib.rs +++ b/parquet_derive/src/lib.rs @@ -98,9 +98,9 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke (quote! 
diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs
index d5d1e6818d73..fc7af20ca3f1 100644
--- a/parquet_derive/src/lib.rs
+++ b/parquet_derive/src/lib.rs
@@ -98,9 +98,9 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke
   (quote! {
     impl #generics RecordWriter<#derived_for #generics> for &[#derived_for #generics] {
-      fn write_to_row_group(
+      fn write_to_row_group<W: std::io::Write>(
         &self,
-        row_group_writer: &mut Box<dyn parquet::file::writer::RowGroupWriter>
+        row_group_writer: &mut parquet::file::writer::SerializedRowGroupWriter<'_, W>
       ) -> Result<(), parquet::errors::ParquetError> {
         let mut row_group_writer = row_group_writer;
         let records = &self; // Used by all the writer snippets to be more clear
@@ -110,7 +110,7 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke
           let mut some_column_writer = row_group_writer.next_column().unwrap();
           if let Some(mut column_writer) = some_column_writer {
             #writer_snippets
-            row_group_writer.close_column(column_writer)?;
+            column_writer.close()?;
           } else {
            return Err(parquet::errors::ParquetError::General("Failed to get next column".into()))
          }
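To make the new generated signature concrete, here is a hand-written stand-in for the body the macro now emits, assuming a single INT64 column and a plain `&[i64]` in place of the derived struct's field values; the function below is illustrative, not macro output.

use std::io::Write;

use parquet::errors::ParquetError;
use parquet::file::writer::SerializedRowGroupWriter;

fn write_to_row_group<W: Write>(
    records: &[i64],
    row_group_writer: &mut SerializedRowGroupWriter<'_, W>,
) -> Result<(), ParquetError> {
    // Mirrors the generated loop: fetch the next column, run the per-field
    // writer snippet, then close the column writer directly instead of
    // handing it back to the row group writer.
    if let Some(mut column_writer) = row_group_writer.next_column()? {
        // This is where `#writer_snippets` would run; the snippets still use
        // the untyped `ColumnWriter` enum, now reached via `untyped()`.
        if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) =
            column_writer.untyped()
        {
            typed.write_batch(records, None, None)?;
        } else {
            return Err(ParquetError::General("expected INT64 column".into()));
        }
        column_writer.close()?;
    } else {
        return Err(ParquetError::General("Failed to get next column".into()));
    }
    Ok(())
}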
diff --git a/parquet_derive/src/parquet_field.rs b/parquet_derive/src/parquet_field.rs
index be2e6efaa16d..835ac793e409 100644
--- a/parquet_derive/src/parquet_field.rs
+++ b/parquet_derive/src/parquet_field.rs
@@ -147,7 +147,7 @@ impl Field {
         // this expression just switches between non-nullable and nullable write statements
         let write_batch_expr = if definition_levels.is_some() {
             quote! {
-                if let #column_writer(ref mut typed) = column_writer {
+                if let #column_writer(ref mut typed) = column_writer.untyped() {
                     typed.write_batch(&vals[..], Some(&definition_levels[..]), None)?;
                 } else {
                     panic!("Schema and struct disagree on type for {}", stringify!{#ident})
@@ -155,7 +155,7 @@ impl Field {
             }
         } else {
             quote! {
-                if let #column_writer(ref mut typed) = column_writer {
+                if let #column_writer(ref mut typed) = column_writer.untyped() {
                     typed.write_batch(&vals[..], None, None)?;
                 } else {
                     panic!("Schema and struct disagree on type for {}", stringify!{#ident})
@@ -666,7 +666,7 @@ mod test {
            {
                 let vals : Vec < _ > = records . iter ( ) . map ( | rec | rec . counter as i64 ) . collect ( );
-                if let parquet::column::writer::ColumnWriter::Int64ColumnWriter ( ref mut typed ) = column_writer {
+                if let parquet::column::writer::ColumnWriter::Int64ColumnWriter ( ref mut typed ) = column_writer.untyped() {
                     typed . write_batch ( & vals [ .. ] , None , None ) ?;
                 } else {
                     panic!("Schema and struct disagree on type for {}" , stringify!{ counter } )
@@ -703,7 +703,7 @@ mod test {
                 }
             }).collect();
 
-            if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer {
+            if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer.untyped() {
                 typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
             } else {
                 panic!("Schema and struct disagree on type for {}" , stringify ! { optional_str } )
@@ -727,7 +727,7 @@ mod test {
                 }
             }).collect();
 
-            if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer {
+            if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer.untyped() {
                 typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
             } else {
                 panic!("Schema and struct disagree on type for {}" , stringify ! { optional_string } )
@@ -750,7 +750,7 @@ mod test {
                 }
             }).collect();
 
-            if let parquet::column::writer::ColumnWriter::Int32ColumnWriter ( ref mut typed ) = column_writer {
+            if let parquet::column::writer::ColumnWriter::Int32ColumnWriter ( ref mut typed ) = column_writer.untyped() {
                 typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
             } else {
                 panic!("Schema and struct disagree on type for {}" , stringify ! { optional_dumb_int } )
@@ -975,7 +975,7 @@ mod test {
         assert_eq!(when.writer_snippet().to_string(),(quote!{
             {
                 let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.timestamp_millis() ).collect();
-                if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer {
+                if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer.untyped() {
                     typed.write_batch(&vals[..], None, None) ?;
                 } else {
                     panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth })
@@ -995,7 +995,7 @@ mod test {
                 }
             }).collect();
 
-                if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer {
+                if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer.untyped() {
                     typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
                 } else {
                     panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened })
@@ -1018,7 +1018,7 @@ mod test {
         assert_eq!(when.writer_snippet().to_string(),(quote!{
             {
                 let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32).collect();
-                if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer {
+                if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer.untyped() {
                     typed.write_batch(&vals[..], None, None) ?;
                 } else {
                     panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth })
@@ -1038,7 +1038,7 @@ mod test {
                 }
             }).collect();
 
-                if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer {
+                if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer.untyped() {
                     typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
                 } else {
                     panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened })
@@ -1061,7 +1061,7 @@ mod test {
         assert_eq!(when.writer_snippet().to_string(),(quote!{
             {
                 let vals : Vec<_> = records.iter().map(|rec| (&rec.unique_id.to_string()[..]).into() ).collect();
-                if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer {
+                if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer.untyped() {
                     typed.write_batch(&vals[..], None, None) ?;
                 } else {
                     panic!("Schema and struct disagree on type for {}" , stringify!{ unique_id })
@@ -1081,7 +1081,7 @@ mod test {
                 }
             }).collect();
 
-                if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer {
+                if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer.untyped() {
                     typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
                 } else {
                     panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_unique_id })
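The nullable branches above all follow the same pattern: compact the present values and pass a definition-levels slice alongside them. A small sketch of that pattern for an assumed OPTIONAL INT64 column; the helper name and signature are invented for illustration.

use parquet::column::writer::ColumnWriter;
use parquet::errors::ParquetError;

fn write_optional_i64(
    column_writer: &mut parquet::file::writer::SerializedColumnWriter<'_>,
    records: &[Option<i64>],
) -> Result<(), ParquetError> {
    // Definition level 1 = value present, 0 = null; only present values are written.
    let definition_levels: Vec<i16> = records
        .iter()
        .map(|v| if v.is_some() { 1 } else { 0 })
        .collect();
    let vals: Vec<i64> = records.iter().filter_map(|v| *v).collect();

    if let ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer.untyped() {
        typed.write_batch(&vals[..], Some(&definition_levels[..]), None)?;
    } else {
        panic!("Schema and struct disagree on type");
    }
    Ok(())
}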
diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs
index 2b7c060bbd5f..189802b9a527 100644
--- a/parquet_derive_test/src/lib.rs
+++ b/parquet_derive_test/src/lib.rs
@@ -54,10 +54,7 @@ mod tests {
     use super::*;
 
     use parquet::{
-        file::{
-            properties::WriterProperties,
-            writer::{FileWriter, SerializedFileWriter},
-        },
+        file::{properties::WriterProperties, writer::SerializedFileWriter},
         schema::parser::parse_message_type,
     };
     use std::{env, fs, io::Write, sync::Arc};
@@ -133,7 +130,7 @@ mod tests {
         let mut row_group = writer.next_row_group().unwrap();
         drs.as_slice().write_to_row_group(&mut row_group).unwrap();
-        writer.close_row_group(row_group).unwrap();
+        row_group.close().unwrap();
 
         writer.close().unwrap();
     }
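End to end, driving a derived `RecordWriter` now looks roughly like the sketch below. The `Sample` struct, the one-column schema string, and the `parquet::record::RecordWriter` import path are assumptions made for illustration; the real test derives the writer for a much larger struct.

use std::sync::Arc;

use parquet::file::properties::WriterProperties;
use parquet::file::writer::SerializedFileWriter;
use parquet::record::RecordWriter; // assumed location of the trait
use parquet::schema::parser::parse_message_type;
use parquet_derive::ParquetRecordWriter;

#[derive(ParquetRecordWriter)]
struct Sample {
    counter: i64,
}

fn write_samples(samples: &[Sample]) -> parquet::errors::Result<()> {
    // Schema string is illustrative; it only needs to match the derived fields.
    let schema = Arc::new(parse_message_type(
        "message sample { REQUIRED INT64 counter; }",
    )?);
    let props = Arc::new(WriterProperties::builder().build());
    let mut writer = SerializedFileWriter::new(Vec::new(), schema, props)?;

    let mut row_group = writer.next_row_group()?;
    // The derive generates `write_to_row_group` against SerializedRowGroupWriter.
    samples.write_to_row_group(&mut row_group)?;
    // Row group and file are now closed on their own writers.
    row_group.close()?;
    writer.close()?;
    Ok(())
}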