Skip to content

Commit

Permalink
Fully qualifying parquet items (#2638)
Browse files Browse the repository at this point in the history
* fully qualifying parquet items

* rustfmt

* remove glob import

* remove unnecessary borrow

* import repeated items

* rustfmt
  • Loading branch information
dingxiangfei2009 authored Sep 6, 2022
1 parent 5976784 commit 31aaef2
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 93 deletions.
30 changes: 15 additions & 15 deletions parquet_derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ mod parquet_field;
/// Example:
///
/// ```ignore
/// use parquet;
/// use parquet::record::RecordWriter;
/// use parquet::schema::parser::parse_message_type;
/// use parquet::file::properties::WriterProperties;
/// use parquet::file::writer::SerializedFileWriter;
///
/// use std::sync::Arc;
///
Expand Down Expand Up @@ -97,11 +96,13 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke
field_infos.iter().map(|x| x.parquet_type()).collect();

(quote! {
impl #generics RecordWriter<#derived_for #generics> for &[#derived_for #generics] {
fn write_to_row_group<W: std::io::Write>(
impl #generics ::parquet::record::RecordWriter<#derived_for #generics> for &[#derived_for #generics] {
fn write_to_row_group<W: ::std::io::Write>(
&self,
row_group_writer: &mut parquet::file::writer::SerializedRowGroupWriter<'_, W>
) -> Result<(), parquet::errors::ParquetError> {
row_group_writer: &mut ::parquet::file::writer::SerializedRowGroupWriter<'_, W>
) -> Result<(), ::parquet::errors::ParquetError> {
use ::parquet::column::writer::ColumnWriter;

let mut row_group_writer = row_group_writer;
let records = &self; // Used by all the writer snippets to be more clear

Expand All @@ -112,25 +113,24 @@ pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::Toke
#writer_snippets
column_writer.close()?;
} else {
return Err(parquet::errors::ParquetError::General("Failed to get next column".into()))
return Err(::parquet::errors::ParquetError::General("Failed to get next column".into()))
}
}
);*

Ok(())
}

fn schema(&self) -> Result<parquet::schema::types::TypePtr, parquet::errors::ParquetError> {
use parquet::schema::types::Type as ParquetType;
use parquet::schema::types::TypePtr;
use parquet::basic::LogicalType;
use parquet::basic::*;
fn schema(&self) -> Result<::parquet::schema::types::TypePtr, ::parquet::errors::ParquetError> {
use ::parquet::schema::types::Type as ParquetType;
use ::parquet::schema::types::TypePtr;
use ::parquet::basic::LogicalType;

let mut fields: Vec<TypePtr> = Vec::new();
let mut fields: ::std::vec::Vec<TypePtr> = ::std::vec::Vec::new();
#(
#field_types
);*;
let group = parquet::schema::types::Type::group_type_builder("rust_schema")
let group = ParquetType::group_type_builder("rust_schema")
.with_fields(&mut fields)
.build()?;
Ok(group.into())
Expand Down
130 changes: 59 additions & 71 deletions parquet_derive/src/parquet_field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,28 +181,28 @@ impl Field {
let field_name = &self.ident.to_string();
let physical_type = match self.ty.physical_type() {
parquet::basic::Type::BOOLEAN => quote! {
parquet::basic::Type::BOOLEAN
::parquet::basic::Type::BOOLEAN
},
parquet::basic::Type::INT32 => quote! {
parquet::basic::Type::INT32
::parquet::basic::Type::INT32
},
parquet::basic::Type::INT64 => quote! {
parquet::basic::Type::INT64
::parquet::basic::Type::INT64
},
parquet::basic::Type::INT96 => quote! {
parquet::basic::Type::INT96
::parquet::basic::Type::INT96
},
parquet::basic::Type::FLOAT => quote! {
parquet::basic::Type::FLOAT
::parquet::basic::Type::FLOAT
},
parquet::basic::Type::DOUBLE => quote! {
parquet::basic::Type::DOUBLE
::parquet::basic::Type::DOUBLE
},
parquet::basic::Type::BYTE_ARRAY => quote! {
parquet::basic::Type::BYTE_ARRAY
::parquet::basic::Type::BYTE_ARRAY
},
parquet::basic::Type::FIXED_LEN_BYTE_ARRAY => quote! {
parquet::basic::Type::FIXED_LEN_BYTE_ARRAY
::parquet::basic::Type::FIXED_LEN_BYTE_ARRAY
},
};
let logical_type = self.ty.logical_type();
Expand Down Expand Up @@ -250,7 +250,7 @@ impl Field {
let some = if is_a_timestamp {
quote! { Some(inner.timestamp_millis()) }
} else if is_a_date {
quote! { Some(inner.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32) }
quote! { Some(inner.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32) }
} else if is_a_uuid {
quote! { Some((&inner.to_string()[..]).into()) }
} else if is_a_byte_buf {
Expand Down Expand Up @@ -286,7 +286,7 @@ impl Field {
let access = if is_a_timestamp {
quote! { rec.#field_name.timestamp_millis() }
} else if is_a_date {
quote! { rec.#field_name.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32 }
quote! { rec.#field_name.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32 }
} else if is_a_uuid {
quote! { (&rec.#field_name.to_string()[..]).into() }
} else if is_a_byte_buf {
Expand Down Expand Up @@ -336,29 +336,19 @@ impl Type {

match self.physical_type() {
BasicType::BOOLEAN => {
syn::parse_quote!(parquet::column::writer::ColumnWriter::BoolColumnWriter)
syn::parse_quote!(ColumnWriter::BoolColumnWriter)
}
BasicType::INT32 => syn::parse_quote!(ColumnWriter::Int32ColumnWriter),
BasicType::INT64 => syn::parse_quote!(ColumnWriter::Int64ColumnWriter),
BasicType::INT96 => syn::parse_quote!(ColumnWriter::Int96ColumnWriter),
BasicType::FLOAT => syn::parse_quote!(ColumnWriter::FloatColumnWriter),
BasicType::DOUBLE => syn::parse_quote!(ColumnWriter::DoubleColumnWriter),
BasicType::BYTE_ARRAY => {
syn::parse_quote!(ColumnWriter::ByteArrayColumnWriter)
}
BasicType::FIXED_LEN_BYTE_ARRAY => {
syn::parse_quote!(ColumnWriter::FixedLenByteArrayColumnWriter)
}
BasicType::INT32 => syn::parse_quote!(
parquet::column::writer::ColumnWriter::Int32ColumnWriter
),
BasicType::INT64 => syn::parse_quote!(
parquet::column::writer::ColumnWriter::Int64ColumnWriter
),
BasicType::INT96 => syn::parse_quote!(
parquet::column::writer::ColumnWriter::Int96ColumnWriter
),
BasicType::FLOAT => syn::parse_quote!(
parquet::column::writer::ColumnWriter::FloatColumnWriter
),
BasicType::DOUBLE => syn::parse_quote!(
parquet::column::writer::ColumnWriter::DoubleColumnWriter
),
BasicType::BYTE_ARRAY => syn::parse_quote!(
parquet::column::writer::ColumnWriter::ByteArrayColumnWriter
),
BasicType::FIXED_LEN_BYTE_ARRAY => syn::parse_quote!(
parquet::column::writer::ColumnWriter::FixedLenByteArrayColumnWriter
),
}
}

Expand Down Expand Up @@ -557,16 +547,18 @@ impl Type {
let last_part = self.last_part();

match last_part.trim() {
"NaiveDateTime" => Some(quote! { ConvertedType::TIMESTAMP_MILLIS }),
"NaiveDateTime" => {
Some(quote! { ::parquet::basic::ConvertedType::TIMESTAMP_MILLIS })
}
_ => None,
}
}

fn repetition(&self) -> proc_macro2::TokenStream {
match &self {
Type::Option(_) => quote! { Repetition::OPTIONAL },
match self {
Type::Option(_) => quote! { ::parquet::basic::Repetition::OPTIONAL },
Type::Reference(_, ty) => ty.repetition(),
_ => quote! { Repetition::REQUIRED },
_ => quote! { ::parquet::basic::Repetition::REQUIRED },
}
}

Expand Down Expand Up @@ -666,7 +658,7 @@ mod test {
{
let vals : Vec < _ > = records . iter ( ) . map ( | rec | rec . counter as i64 ) . collect ( );

if let parquet::column::writer::ColumnWriter::Int64ColumnWriter ( ref mut typed ) = column_writer.untyped() {
if let ColumnWriter::Int64ColumnWriter ( ref mut typed ) = column_writer.untyped() {
typed . write_batch ( & vals [ .. ] , None , None ) ?;
} else {
panic!("Schema and struct disagree on type for {}" , stringify!{ counter } )
Expand Down Expand Up @@ -703,7 +695,7 @@ mod test {
}
}).collect();

if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer.untyped() {
if let ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer.untyped() {
typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
} else {
panic!("Schema and struct disagree on type for {}" , stringify ! { optional_str } )
Expand All @@ -727,7 +719,7 @@ mod test {
}
}).collect();

if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer.untyped() {
if let ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer.untyped() {
typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
} else {
panic!("Schema and struct disagree on type for {}" , stringify ! { optional_string } )
Expand All @@ -750,7 +742,7 @@ mod test {
}
}).collect();

if let parquet::column::writer::ColumnWriter::Int32ColumnWriter ( ref mut typed ) = column_writer.untyped() {
if let ColumnWriter::Int32ColumnWriter ( ref mut typed ) = column_writer.untyped() {
typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
} else {
panic!("Schema and struct disagree on type for {}" , stringify ! { optional_dumb_int } )
Expand Down Expand Up @@ -779,12 +771,8 @@ mod test {
assert_eq!(
column_writers,
vec![
syn::parse_quote!(
parquet::column::writer::ColumnWriter::BoolColumnWriter
),
syn::parse_quote!(
parquet::column::writer::ColumnWriter::ByteArrayColumnWriter
)
syn::parse_quote!(ColumnWriter::BoolColumnWriter),
syn::parse_quote!(ColumnWriter::ByteArrayColumnWriter)
]
);
}
Expand Down Expand Up @@ -833,9 +821,9 @@ mod test {
let snippet: proc_macro2::TokenStream = quote! {
struct LotsOfInnerTypes {
a_vec: Vec<u8>,
a_option: std::option::Option<bool>,
a_silly_string: std::string::String,
a_complicated_thing: std::option::Option<std::result::Result<(),()>>,
a_option: ::std::option::Option<bool>,
a_silly_string: ::std::string::String,
a_complicated_thing: ::std::option::Option<::std::result::Result<(),()>>,
}
};

Expand All @@ -855,8 +843,8 @@ mod test {
vec![
"u8",
"bool",
"std :: string :: String",
"std :: result :: Result < () , () >"
":: std :: string :: String",
":: std :: result :: Result < () , () >"
]
)
}
Expand All @@ -866,13 +854,13 @@ mod test {
use parquet::basic::Type as BasicType;
let snippet: proc_macro2::TokenStream = quote! {
struct LotsOfInnerTypes {
a_buf: Vec<u8>,
a_buf: ::std::vec::Vec<u8>,
a_number: i32,
a_verbose_option: std::option::Option<bool>,
a_silly_string: std::string::String,
a_verbose_option: ::std::option::Option<bool>,
a_silly_string: String,
a_fix_byte_buf: [u8; 10],
a_complex_option: Option<&Vec<u8>>,
a_complex_vec: &Vec<&Option<u8>>,
a_complex_option: ::std::option::Option<&Vec<u8>>,
a_complex_vec: &::std::vec::Vec<&Option<u8>>,
}
};

Expand Down Expand Up @@ -901,10 +889,10 @@ mod test {
fn test_convert_comprehensive_owned_struct() {
let snippet: proc_macro2::TokenStream = quote! {
struct VecHolder {
a_vec: Vec<u8>,
a_option: std::option::Option<bool>,
a_silly_string: std::string::String,
a_complicated_thing: std::option::Option<std::result::Result<(),()>>,
a_vec: ::std::vec::Vec<u8>,
a_option: ::std::option::Option<bool>,
a_silly_string: ::std::string::String,
a_complicated_thing: ::std::option::Option<::std::result::Result<(),()>>,
}
};

Expand All @@ -916,9 +904,9 @@ mod test {
vec![
Type::Vec(Box::new(Type::TypePath(syn::parse_quote!(u8)))),
Type::Option(Box::new(Type::TypePath(syn::parse_quote!(bool)))),
Type::TypePath(syn::parse_quote!(std::string::String)),
Type::TypePath(syn::parse_quote!(::std::string::String)),
Type::Option(Box::new(Type::TypePath(
syn::parse_quote!(std::result::Result<(),()>)
syn::parse_quote!(::std::result::Result<(),()>)
))),
]
);
Expand Down Expand Up @@ -975,7 +963,7 @@ mod test {
assert_eq!(when.writer_snippet().to_string(),(quote!{
{
let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.timestamp_millis() ).collect();
if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer.untyped() {
if let ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer.untyped() {
typed.write_batch(&vals[..], None, None) ?;
} else {
panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth })
Expand All @@ -995,7 +983,7 @@ mod test {
}
}).collect();

if let parquet::column::writer::ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer.untyped() {
if let ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer.untyped() {
typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
} else {
panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened })
Expand All @@ -1017,8 +1005,8 @@ mod test {
let when = Field::from(&fields[0]);
assert_eq!(when.writer_snippet().to_string(),(quote!{
{
let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32).collect();
if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer.untyped() {
let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32).collect();
if let ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer.untyped() {
typed.write_batch(&vals[..], None, None) ?;
} else {
panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth })
Expand All @@ -1032,13 +1020,13 @@ mod test {
let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect();
let vals : Vec<_> = records.iter().filter_map(|rec| {
if let Some(inner) = rec.maybe_happened {
Some(inner.signed_duration_since(chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32)
Some(inner.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32)
} else {
None
}
}).collect();

if let parquet::column::writer::ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer.untyped() {
if let ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer.untyped() {
typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
} else {
panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened })
Expand All @@ -1061,7 +1049,7 @@ mod test {
assert_eq!(when.writer_snippet().to_string(),(quote!{
{
let vals : Vec<_> = records.iter().map(|rec| (&rec.unique_id.to_string()[..]).into() ).collect();
if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer.untyped() {
if let ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer.untyped() {
typed.write_batch(&vals[..], None, None) ?;
} else {
panic!("Schema and struct disagree on type for {}" , stringify!{ unique_id })
Expand All @@ -1081,7 +1069,7 @@ mod test {
}
}).collect();

if let parquet::column::writer::ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer.untyped() {
if let ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer.untyped() {
typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
} else {
panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_unique_id })
Expand All @@ -1105,7 +1093,7 @@ mod test {
let converted_type = time.ty.converted_type();
assert_eq!(
converted_type.unwrap().to_string(),
quote! { ConvertedType::TIMESTAMP_MILLIS }.to_string()
quote! { ::parquet::basic::ConvertedType::TIMESTAMP_MILLIS }.to_string()
);
}
}
Loading

0 comments on commit 31aaef2

Please sign in to comment.