diff --git a/parquet_derive/src/lib.rs b/parquet_derive/src/lib.rs index fc7af20ca3f1..6525513cbaa1 100644 --- a/parquet_derive/src/lib.rs +++ b/parquet_derive/src/lib.rs @@ -40,9 +40,8 @@ mod parquet_field; /// Example: /// /// ```ignore -/// use parquet; -/// use parquet::record::RecordWriter; -/// use parquet::schema::parser::parse_message_type; +/// use parquet::file::properties::WriterProperties; +/// use parquet::file::writer::SerializedFileWriter; /// /// use std::sync::Arc; // @@ -97,11 +96,13 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke field_infos.iter().map(|x| x.parquet_type()).collect(); (quote! { - impl #generics RecordWriter<#derived_for #generics> for &[#derived_for #generics] { - fn write_to_row_group( + impl #generics ::parquet::record::RecordWriter<#derived_for #generics> for &[#derived_for #generics] { + fn write_to_row_group( &self, - row_group_writer: &mut parquet::file::writer::SerializedRowGroupWriter<'_, W> - ) -> Result<(), parquet::errors::ParquetError> { + row_group_writer: &mut ::parquet::file::writer::SerializedRowGroupWriter<'_, W> + ) -> Result<(), ::parquet::errors::ParquetError> { + use ::parquet::column::writer::ColumnWriter; + let mut row_group_writer = row_group_writer; let records = &self; // Used by all the writer snippets to be more clear @@ -112,7 +113,7 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke #writer_snippets column_writer.close()?; } else { - return Err(parquet::errors::ParquetError::General("Failed to get next column".into())) + return Err(::parquet::errors::ParquetError::General("Failed to get next column".into())) } } );* @@ -120,17 +121,16 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke Ok(()) } - fn schema(&self) -> Result { - use parquet::schema::types::Type as ParquetType; - use parquet::schema::types::TypePtr; - use parquet::basic::LogicalType; - use parquet::basic::*; + fn schema(&self) -> Result<::parquet::schema::types::TypePtr, ::parquet::errors::ParquetError> { + use ::parquet::schema::types::Type as ParquetType; + use ::parquet::schema::types::TypePtr; + use ::parquet::basic::LogicalType; - let mut fields: Vec = Vec::new(); + let mut fields: ::std::vec::Vec = ::std::vec::Vec::new(); #( #field_types );*; - let group = parquet::schema::types::Type::group_type_builder("rust_schema") + let group = ParquetType::group_type_builder("rust_schema") .with_fields(&mut fields) .build()?; Ok(group.into()) diff --git a/parquet_derive/src/parquet_field.rs b/parquet_derive/src/parquet_field.rs index 835ac793e409..0642e23327f7 100644 --- a/parquet_derive/src/parquet_field.rs +++ b/parquet_derive/src/parquet_field.rs @@ -181,28 +181,28 @@ impl Field { let field_name = &self.ident.to_string(); let physical_type = match self.ty.physical_type() { parquet::basic::Type::BOOLEAN => quote! { - parquet::basic::Type::BOOLEAN + ::parquet::basic::Type::BOOLEAN }, parquet::basic::Type::INT32 => quote! { - parquet::basic::Type::INT32 + ::parquet::basic::Type::INT32 }, parquet::basic::Type::INT64 => quote! { - parquet::basic::Type::INT64 + ::parquet::basic::Type::INT64 }, parquet::basic::Type::INT96 => quote! { - parquet::basic::Type::INT96 + ::parquet::basic::Type::INT96 }, parquet::basic::Type::FLOAT => quote! { - parquet::basic::Type::FLOAT + ::parquet::basic::Type::FLOAT }, parquet::basic::Type::DOUBLE => quote! { - parquet::basic::Type::DOUBLE + ::parquet::basic::Type::DOUBLE }, parquet::basic::Type::BYTE_ARRAY => quote! { - parquet::basic::Type::BYTE_ARRAY + ::parquet::basic::Type::BYTE_ARRAY }, parquet::basic::Type::FIXED_LEN_BYTE_ARRAY => quote! { - parquet::basic::Type::FIXED_LEN_BYTE_ARRAY + ::parquet::basic::Type::FIXED_LEN_BYTE_ARRAY }, }; let logical_type = self.ty.logical_type(); @@ -250,7 +250,7 @@ impl Field { let some = if is_a_timestamp { quote! { Some(inner.timestamp_millis()) } } else if is_a_date { - quote! { Some(inner.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32) } + quote! { Some(inner.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32) } } else if is_a_uuid { quote! { Some((&inner.to_string()[..]).into()) } } else if is_a_byte_buf { @@ -286,7 +286,7 @@ impl Field { let access = if is_a_timestamp { quote! { rec.#field_name.timestamp_millis() } } else if is_a_date { - quote! { rec.#field_name.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32 } + quote! { rec.#field_name.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32 } } else if is_a_uuid { quote! { (&rec.#field_name.to_string()[..]).into() } } else if is_a_byte_buf { @@ -336,29 +336,19 @@ impl Type { match self.physical_type() { BasicType::BOOLEAN => { - syn::parse_quote!(parquet::column::writer::ColumnWriter::BoolColumnWriter) + syn::parse_quote!(ColumnWriter::BoolColumnWriter) + } + BasicType::INT32 => syn::parse_quote!(ColumnWriter::Int32ColumnWriter), + BasicType::INT64 => syn::parse_quote!(ColumnWriter::Int64ColumnWriter), + BasicType::INT96 => syn::parse_quote!(ColumnWriter::Int96ColumnWriter), + BasicType::FLOAT => syn::parse_quote!(ColumnWriter::FloatColumnWriter), + BasicType::DOUBLE => syn::parse_quote!(ColumnWriter::DoubleColumnWriter), + BasicType::BYTE_ARRAY => { + syn::parse_quote!(ColumnWriter::ByteArrayColumnWriter) + } + BasicType::FIXED_LEN_BYTE_ARRAY => { + syn::parse_quote!(ColumnWriter::FixedLenByteArrayColumnWriter) } - BasicType::INT32 => syn::parse_quote!( - parquet::column::writer::ColumnWriter::Int32ColumnWriter - ), - BasicType::INT64 => syn::parse_quote!( - parquet::column::writer::ColumnWriter::Int64ColumnWriter - ), - BasicType::INT96 => syn::parse_quote!( - parquet::column::writer::ColumnWriter::Int96ColumnWriter - ), - BasicType::FLOAT => syn::parse_quote!( - parquet::column::writer::ColumnWriter::FloatColumnWriter - ), - BasicType::DOUBLE => syn::parse_quote!( - parquet::column::writer::ColumnWriter::DoubleColumnWriter - ), - BasicType::BYTE_ARRAY => syn::parse_quote!( - parquet::column::writer::ColumnWriter::ByteArrayColumnWriter - ), - BasicType::FIXED_LEN_BYTE_ARRAY => syn::parse_quote!( - parquet::column::writer::ColumnWriter::FixedLenByteArrayColumnWriter - ), } } @@ -557,16 +547,18 @@ impl Type { let last_part = self.last_part(); match last_part.trim() { - "NaiveDateTime" => Some(quote! { ConvertedType::TIMESTAMP_MILLIS }), + "NaiveDateTime" => { + Some(quote! { ::parquet::basic::ConvertedType::TIMESTAMP_MILLIS }) + } _ => None, } } fn repetition(&self) -> proc_macro2::TokenStream { - match &self { - Type::Option(_) => quote! { Repetition::OPTIONAL }, + match self { + Type::Option(_) => quote! { ::parquet::basic::Repetition::OPTIONAL }, Type::Reference(_, ty) => ty.repetition(), - _ => quote! { Repetition::REQUIRED }, + _ => quote! { ::parquet::basic::Repetition::REQUIRED }, } } @@ -666,7 +658,7 @@ mod test { { let vals : Vec < _ > = records . iter ( ) . map ( | rec | rec . counter as i64 ) . collect ( ); - if let parquet::column::writer::ColumnWriter::Int64ColumnWriter ( ref mut typed ) = column_writer.untyped() { + if let ColumnWriter::Int64ColumnWriter ( ref mut typed ) = column_writer.untyped() { typed . write_batch ( & vals [ .. ] , None , None ) ?; } else { panic!("Schema and struct disagree on type for {}" , stringify!{ counter } ) @@ -703,7 +695,7 @@ mod test { } }).collect(); - if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer.untyped() { + if let ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer.untyped() { typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ; } else { panic!("Schema and struct disagree on type for {}" , stringify ! { optional_str } ) @@ -727,7 +719,7 @@ mod test { } }).collect(); - if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer.untyped() { + if let ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer.untyped() { typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ; } else { panic!("Schema and struct disagree on type for {}" , stringify ! { optional_string } ) @@ -750,7 +742,7 @@ mod test { } }).collect(); - if let parquet::column::writer::ColumnWriter::Int32ColumnWriter ( ref mut typed ) = column_writer.untyped() { + if let ColumnWriter::Int32ColumnWriter ( ref mut typed ) = column_writer.untyped() { typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ; } else { panic!("Schema and struct disagree on type for {}" , stringify ! { optional_dumb_int } ) @@ -779,12 +771,8 @@ mod test { assert_eq!( column_writers, vec![ - syn::parse_quote!( - parquet::column::writer::ColumnWriter::BoolColumnWriter - ), - syn::parse_quote!( - parquet::column::writer::ColumnWriter::ByteArrayColumnWriter - ) + syn::parse_quote!(ColumnWriter::BoolColumnWriter), + syn::parse_quote!(ColumnWriter::ByteArrayColumnWriter) ] ); } @@ -833,9 +821,9 @@ mod test { let snippet: proc_macro2::TokenStream = quote! { struct LotsOfInnerTypes { a_vec: Vec, - a_option: std::option::Option, - a_silly_string: std::string::String, - a_complicated_thing: std::option::Option>, + a_option: ::std::option::Option, + a_silly_string: ::std::string::String, + a_complicated_thing: ::std::option::Option<::std::result::Result<(),()>>, } }; @@ -855,8 +843,8 @@ mod test { vec![ "u8", "bool", - "std :: string :: String", - "std :: result :: Result < () , () >" + ":: std :: string :: String", + ":: std :: result :: Result < () , () >" ] ) } @@ -866,13 +854,13 @@ mod test { use parquet::basic::Type as BasicType; let snippet: proc_macro2::TokenStream = quote! { struct LotsOfInnerTypes { - a_buf: Vec, + a_buf: ::std::vec::Vec, a_number: i32, - a_verbose_option: std::option::Option, - a_silly_string: std::string::String, + a_verbose_option: ::std::option::Option, + a_silly_string: String, a_fix_byte_buf: [u8; 10], - a_complex_option: Option<&Vec>, - a_complex_vec: &Vec<&Option>, + a_complex_option: ::std::option::Option<&Vec>, + a_complex_vec: &::std::vec::Vec<&Option>, } }; @@ -901,10 +889,10 @@ mod test { fn test_convert_comprehensive_owned_struct() { let snippet: proc_macro2::TokenStream = quote! { struct VecHolder { - a_vec: Vec, - a_option: std::option::Option, - a_silly_string: std::string::String, - a_complicated_thing: std::option::Option>, + a_vec: ::std::vec::Vec, + a_option: ::std::option::Option, + a_silly_string: ::std::string::String, + a_complicated_thing: ::std::option::Option<::std::result::Result<(),()>>, } }; @@ -916,9 +904,9 @@ mod test { vec![ Type::Vec(Box::new(Type::TypePath(syn::parse_quote!(u8)))), Type::Option(Box::new(Type::TypePath(syn::parse_quote!(bool)))), - Type::TypePath(syn::parse_quote!(std::string::String)), + Type::TypePath(syn::parse_quote!(::std::string::String)), Type::Option(Box::new(Type::TypePath( - syn::parse_quote!(std::result::Result<(),()>) + syn::parse_quote!(::std::result::Result<(),()>) ))), ] ); @@ -975,7 +963,7 @@ mod test { assert_eq!(when.writer_snippet().to_string(),(quote!{ { let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.timestamp_millis() ).collect(); - if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer.untyped() { + if let ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer.untyped() { typed.write_batch(&vals[..], None, None) ?; } else { panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth }) @@ -995,7 +983,7 @@ mod test { } }).collect(); - if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer.untyped() { + if let ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer.untyped() { typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?; } else { panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened }) @@ -1017,8 +1005,8 @@ mod test { let when = Field::from(&fields[0]); assert_eq!(when.writer_snippet().to_string(),(quote!{ { - let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32).collect(); - if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer.untyped() { + let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32).collect(); + if let ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer.untyped() { typed.write_batch(&vals[..], None, None) ?; } else { panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth }) @@ -1032,13 +1020,13 @@ mod test { let definition_levels : Vec = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect(); let vals : Vec<_> = records.iter().filter_map(|rec| { if let Some(inner) = rec.maybe_happened { - Some(inner.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32) + Some(inner.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32) } else { None } }).collect(); - if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer.untyped() { + if let ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer.untyped() { typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?; } else { panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened }) @@ -1061,7 +1049,7 @@ mod test { assert_eq!(when.writer_snippet().to_string(),(quote!{ { let vals : Vec<_> = records.iter().map(|rec| (&rec.unique_id.to_string()[..]).into() ).collect(); - if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer.untyped() { + if let ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer.untyped() { typed.write_batch(&vals[..], None, None) ?; } else { panic!("Schema and struct disagree on type for {}" , stringify!{ unique_id }) @@ -1081,7 +1069,7 @@ mod test { } }).collect(); - if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer.untyped() { + if let ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer.untyped() { typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?; } else { panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_unique_id }) @@ -1105,7 +1093,7 @@ mod test { let converted_type = time.ty.converted_type(); assert_eq!( converted_type.unwrap().to_string(), - quote! { ConvertedType::TIMESTAMP_MILLIS }.to_string() + quote! { ::parquet::basic::ConvertedType::TIMESTAMP_MILLIS }.to_string() ); } } diff --git a/parquet_derive_test/src/lib.rs b/parquet_derive_test/src/lib.rs index 189802b9a527..746644793ff2 100644 --- a/parquet_derive_test/src/lib.rs +++ b/parquet_derive_test/src/lib.rs @@ -17,12 +17,7 @@ #![allow(clippy::approx_constant)] -extern crate parquet; - -#[macro_use] -extern crate parquet_derive; - -use parquet::record::RecordWriter; +use parquet_derive::ParquetRecordWriter; #[derive(ParquetRecordWriter)] struct ACompleteRecord<'a> { @@ -53,11 +48,13 @@ struct ACompleteRecord<'a> { mod tests { use super::*; + use std::{env, fs, io::Write, sync::Arc}; + use parquet::{ file::{properties::WriterProperties, writer::SerializedFileWriter}, + record::RecordWriter, schema::parser::parse_message_type, }; - use std::{env, fs, io::Write, sync::Arc}; #[test] fn test_parquet_derive_hello() {