From 68a281254a1d5ab03a7a37095656ac0aba8e99b5 Mon Sep 17 00:00:00 2001 From: Simon Lin Date: Thu, 5 Sep 2024 19:11:54 +1000 Subject: [PATCH] c --- Cargo.lock | 1 + crates/polars-arrow/Cargo.toml | 3 +- crates/polars-arrow/src/datatypes/field.rs | 6 ++ crates/polars-arrow/src/datatypes/schema.rs | 64 +-------------- .../src/io/avro/read/deserialize.rs | 6 +- crates/polars-arrow/src/io/avro/read/mod.rs | 8 +- .../polars-arrow/src/io/avro/read/schema.rs | 11 +-- .../polars-arrow/src/io/avro/write/schema.rs | 3 +- crates/polars-arrow/src/io/flight/mod.rs | 10 +-- crates/polars-arrow/src/io/ipc/read/common.rs | 38 ++++++--- crates/polars-arrow/src/io/ipc/read/file.rs | 4 +- .../src/io/ipc/read/file_async.rs | 26 +++--- crates/polars-arrow/src/io/ipc/read/reader.rs | 3 +- crates/polars-arrow/src/io/ipc/read/schema.rs | 15 ++-- crates/polars-arrow/src/io/ipc/read/stream.rs | 7 +- .../src/io/ipc/read/stream_async.rs | 11 +-- .../src/io/ipc/write/file_async.rs | 2 +- crates/polars-arrow/src/io/ipc/write/mod.rs | 3 +- .../polars-arrow/src/io/ipc/write/schema.rs | 3 +- .../polars-arrow/src/io/ipc/write/stream.rs | 2 +- .../src/io/ipc/write/stream_async.rs | 2 +- .../polars-arrow/src/io/ipc/write/writer.rs | 2 +- crates/polars-arrow/src/mmap/mod.rs | 14 ++-- crates/polars-core/src/frame/chunks.rs | 6 +- crates/polars-core/src/frame/mod.rs | 3 +- crates/polars-core/src/frame/row/dataframe.rs | 6 +- crates/polars-core/src/frame/row/mod.rs | 2 +- crates/polars-core/src/schema.rs | 29 +++---- crates/polars-io/src/avro/read.rs | 5 +- .../polars-io/src/parquet/read/async_impl.rs | 2 +- .../polars-io/src/parquet/read/predicates.rs | 3 +- .../polars-io/src/parquet/read/read_impl.rs | 23 +++--- crates/polars-io/src/parquet/read/reader.rs | 16 ++-- crates/polars-io/src/parquet/write/writer.rs | 3 +- crates/polars-io/src/shared.rs | 29 ++++--- crates/polars-io/src/utils/other.rs | 13 ++- crates/polars-json/src/json/write/mod.rs | 2 +- .../physical_plan/streaming/convert_alp.rs | 2 +- crates/polars-mem-engine/src/planner/lp.rs | 2 +- .../src/arrow/read/schema/convert.rs | 20 ++++- .../src/arrow/read/schema/metadata.rs | 2 +- .../src/arrow/read/schema/mod.rs | 3 +- crates/polars-parquet/src/arrow/write/mod.rs | 3 +- .../src/arrow/write/row_group.rs | 2 +- .../polars-parquet/src/arrow/write/schema.rs | 15 ++-- crates/polars-parquet/src/arrow/write/sink.rs | 4 +- .../sinks/group_by/generic/hash_table.rs | 2 +- .../executors/sinks/group_by/generic/sink.rs | 2 +- .../src/plans/conversion/convert_utils.rs | 2 +- .../src/dataframe/construction.rs | 6 +- crates/polars-python/src/functions/io.rs | 11 +-- .../polars-python/src/interop/arrow/to_py.rs | 2 +- crates/polars-python/src/lazyframe/general.rs | 2 +- crates/polars-schema/src/lib.rs | 1 + crates/polars-schema/src/schema.rs | 80 +++++++++++++++---- .../polars-stream/src/nodes/parquet_source.rs | 13 +-- .../src/physical_plan/lower_expr.rs | 2 +- crates/polars/tests/it/arrow/io/ipc/mod.rs | 9 ++- crates/polars/tests/it/io/avro/read.rs | 26 +++--- crates/polars/tests/it/io/avro/write.rs | 8 +- .../polars/tests/it/io/parquet/arrow/mod.rs | 16 ++-- .../polars/tests/it/io/parquet/arrow/read.rs | 12 ++- .../polars/tests/it/io/parquet/arrow/write.rs | 2 +- .../polars/tests/it/io/parquet/read/file.rs | 4 +- .../tests/it/io/parquet/read/row_group.rs | 7 +- .../polars/tests/it/io/parquet/roundtrip.rs | 2 +- 66 files changed, 331 insertions(+), 317 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dff964bd90f7..0a88b89e2edb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3018,6 +3018,7 @@ dependencies = [ "parking_lot", "polars-arrow-format", "polars-error", + "polars-schema", "polars-utils", "proptest", "rand", diff --git a/crates/polars-arrow/Cargo.toml b/crates/polars-arrow/Cargo.toml index d88d966cf278..4339a8ec48c0 100644 --- a/crates/polars-arrow/Cargo.toml +++ b/crates/polars-arrow/Cargo.toml @@ -24,6 +24,7 @@ hashbrown = { workspace = true } num-traits = { workspace = true } parking_lot = { workspace = true } polars-error = { workspace = true } +polars-schema = { workspace = true } polars-utils = { workspace = true } serde = { workspace = true, optional = true } simdutf8 = { workspace = true } @@ -153,7 +154,7 @@ compute = [ "compute_take", "compute_temporal", ] -serde = ["dep:serde"] +serde = ["dep:serde", "polars-schema/serde"] simd = [] # polars-arrow diff --git a/crates/polars-arrow/src/datatypes/field.rs b/crates/polars-arrow/src/datatypes/field.rs index f0548ae2b3ce..a06c6819ee40 100644 --- a/crates/polars-arrow/src/datatypes/field.rs +++ b/crates/polars-arrow/src/datatypes/field.rs @@ -25,6 +25,12 @@ pub struct Field { pub metadata: Metadata, } +impl From for (PlSmallStr, Field) { + fn from(value: Field) -> Self { + (value.name.clone(), value) + } +} + impl Field { /// Creates a new [`Field`]. pub fn new(name: PlSmallStr, data_type: ArrowDataType, is_nullable: bool) -> Self { diff --git a/crates/polars-arrow/src/datatypes/schema.rs b/crates/polars-arrow/src/datatypes/schema.rs index d5a273274029..02920204b4dc 100644 --- a/crates/polars-arrow/src/datatypes/schema.rs +++ b/crates/polars-arrow/src/datatypes/schema.rs @@ -1,9 +1,5 @@ use std::sync::Arc; -use polars_error::{polars_bail, PolarsResult}; -#[cfg(feature = "serde")] -use serde::{Deserialize, Serialize}; - use super::Field; /// An ordered sequence of [`Field`]s @@ -11,63 +7,5 @@ use super::Field; /// [`ArrowSchema`] is an abstraction used to read from, and write to, Arrow IPC format, /// Apache Parquet, and Apache Avro. All these formats have a concept of a schema /// with fields and metadata. -#[derive(Debug, Clone, PartialEq, Eq, Default)] -#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] -pub struct ArrowSchema { - /// The fields composing this schema. - pub fields: Vec, -} - +pub type ArrowSchema = polars_schema::Schema; pub type ArrowSchemaRef = Arc; - -impl ArrowSchema { - #[inline] - pub fn len(&self) -> usize { - self.fields.len() - } - - #[inline] - pub fn is_empty(&self) -> bool { - self.fields.is_empty() - } - - /// Returns a new [`ArrowSchema`] with a subset of all fields whose `predicate` - /// evaluates to true. - pub fn filter bool>(self, predicate: F) -> Self { - let fields = self - .fields - .into_iter() - .enumerate() - .filter_map(|(index, f)| { - if (predicate)(index, &f) { - Some(f) - } else { - None - } - }) - .collect(); - - ArrowSchema { fields } - } - - pub fn try_project(&self, indices: &[usize]) -> PolarsResult { - let fields = indices.iter().map(|&i| { - let Some(out) = self.fields.get(i) else { - polars_bail!( - SchemaFieldNotFound: "projection index {} is out of bounds for schema of length {}", - i, self.fields.len() - ); - }; - - Ok(out.clone()) - }).collect::>>()?; - - Ok(ArrowSchema { fields }) - } -} - -impl From> for ArrowSchema { - fn from(fields: Vec) -> Self { - Self { fields } - } -} diff --git a/crates/polars-arrow/src/io/avro/read/deserialize.rs b/crates/polars-arrow/src/io/avro/read/deserialize.rs index 36297a37621d..c5d4b55b3f26 100644 --- a/crates/polars-arrow/src/io/avro/read/deserialize.rs +++ b/crates/polars-arrow/src/io/avro/read/deserialize.rs @@ -467,7 +467,7 @@ fn skip_item<'a>( /// `fields`, `avro_fields` and `projection` must have the same length. pub fn deserialize( block: &Block, - fields: &[Field], + fields: &ArrowSchema, avro_fields: &[AvroField], projection: &[bool], ) -> PolarsResult>> { @@ -479,7 +479,7 @@ pub fn deserialize( // create mutables, one per field let mut arrays: Vec> = fields - .iter() + .iter_values() .zip(avro_fields.iter()) .zip(projection.iter()) .map(|((field, avro_field), projection)| { @@ -496,7 +496,7 @@ pub fn deserialize( for _ in 0..rows { let iter = arrays .iter_mut() - .zip(fields.iter()) + .zip(fields.iter_values()) .zip(avro_fields.iter()) .zip(projection.iter()); diff --git a/crates/polars-arrow/src/io/avro/read/mod.rs b/crates/polars-arrow/src/io/avro/read/mod.rs index 701bf8a39579..fb321d5c8e6e 100644 --- a/crates/polars-arrow/src/io/avro/read/mod.rs +++ b/crates/polars-arrow/src/io/avro/read/mod.rs @@ -17,14 +17,14 @@ mod util; pub use schema::infer_schema; use crate::array::Array; -use crate::datatypes::Field; +use crate::datatypes::ArrowSchema; use crate::record_batch::RecordBatchT; /// Single threaded, blocking reader of Avro; [`Iterator`] of [`RecordBatchT`]. pub struct Reader { iter: BlockStreamingIterator, avro_fields: Vec, - fields: Vec, + fields: ArrowSchema, projection: Vec, } @@ -33,7 +33,7 @@ impl Reader { pub fn new( reader: R, metadata: FileMetadata, - fields: Vec, + fields: ArrowSchema, projection: Option>, ) -> Self { let projection = projection.unwrap_or_else(|| fields.iter().map(|_| true).collect()); @@ -56,7 +56,7 @@ impl Iterator for Reader { type Item = PolarsResult>>; fn next(&mut self) -> Option { - let fields = &self.fields[..]; + let fields = &self.fields; let avro_fields = &self.avro_fields; let projection = &self.projection; diff --git a/crates/polars-arrow/src/io/avro/read/schema.rs b/crates/polars-arrow/src/io/avro/read/schema.rs index 1538abbeddab..854362aa0c2f 100644 --- a/crates/polars-arrow/src/io/avro/read/schema.rs +++ b/crates/polars-arrow/src/io/avro/read/schema.rs @@ -26,18 +26,19 @@ fn external_props(schema: &AvroSchema) -> Metadata { /// Infers an [`ArrowSchema`] from the root [`Record`]. /// This pub fn infer_schema(record: &Record) -> PolarsResult { - Ok(record + record .fields .iter() .map(|field| { - schema_to_field( + let field = schema_to_field( &field.schema, Some(&field.name), external_props(&field.schema), - ) + )?; + + Ok((field.name.clone(), field)) }) - .collect::>>()? - .into()) + .collect::>() } fn schema_to_field( diff --git a/crates/polars-arrow/src/io/avro/write/schema.rs b/crates/polars-arrow/src/io/avro/write/schema.rs index 03e28c6d2acc..f7845d624881 100644 --- a/crates/polars-arrow/src/io/avro/write/schema.rs +++ b/crates/polars-arrow/src/io/avro/write/schema.rs @@ -10,8 +10,7 @@ use crate::datatypes::*; pub fn to_record(schema: &ArrowSchema, name: String) -> PolarsResult { let mut name_counter: i32 = 0; let fields = schema - .fields - .iter() + .iter_values() .map(|f| field_to_field(f, &mut name_counter)) .collect::>()?; Ok(Record { diff --git a/crates/polars-arrow/src/io/flight/mod.rs b/crates/polars-arrow/src/io/flight/mod.rs index 85a2be24ea13..c02a4889f7bb 100644 --- a/crates/polars-arrow/src/io/flight/mod.rs +++ b/crates/polars-arrow/src/io/flight/mod.rs @@ -79,7 +79,7 @@ pub fn serialize_schema_to_info( let encoded_data = if let Some(ipc_fields) = ipc_fields { schema_as_encoded_data(schema, ipc_fields) } else { - let ipc_fields = default_ipc_fields(&schema.fields); + let ipc_fields = default_ipc_fields(schema.iter_values()); schema_as_encoded_data(schema, &ipc_fields) }; @@ -92,7 +92,7 @@ fn _serialize_schema(schema: &ArrowSchema, ipc_fields: Option<&[IpcField]>) -> V if let Some(ipc_fields) = ipc_fields { write::schema_to_bytes(schema, ipc_fields) } else { - let ipc_fields = default_ipc_fields(&schema.fields); + let ipc_fields = default_ipc_fields(schema.iter_values()); write::schema_to_bytes(schema, &ipc_fields) } } @@ -113,7 +113,7 @@ pub fn deserialize_schemas(bytes: &[u8]) -> PolarsResult<(ArrowSchema, IpcSchema /// Deserializes [`FlightData`] representing a record batch message to [`RecordBatchT`]. pub fn deserialize_batch( data: &FlightData, - fields: &[Field], + fields: &ArrowSchema, ipc_schema: &IpcSchema, dictionaries: &read::Dictionaries, ) -> PolarsResult>> { @@ -147,7 +147,7 @@ pub fn deserialize_batch( /// Deserializes [`FlightData`], assuming it to be a dictionary message, into `dictionaries`. pub fn deserialize_dictionary( data: &FlightData, - fields: &[Field], + fields: &ArrowSchema, ipc_schema: &IpcSchema, dictionaries: &mut read::Dictionaries, ) -> PolarsResult<()> { @@ -182,7 +182,7 @@ pub fn deserialize_dictionary( /// or by upserting into `dictionaries` (when the message is a dictionary) pub fn deserialize_message( data: &FlightData, - fields: &[Field], + fields: &ArrowSchema, ipc_schema: &IpcSchema, dictionaries: &mut Dictionaries, ) -> PolarsResult>>> { diff --git a/crates/polars-arrow/src/io/ipc/read/common.rs b/crates/polars-arrow/src/io/ipc/read/common.rs index ce88785bb6cd..ba47192e83a7 100644 --- a/crates/polars-arrow/src/io/ipc/read/common.rs +++ b/crates/polars-arrow/src/io/ipc/read/common.rs @@ -8,7 +8,7 @@ use polars_utils::pl_str::PlSmallStr; use super::deserialize::{read, skip}; use super::Dictionaries; use crate::array::*; -use crate::datatypes::{ArrowDataType, Field}; +use crate::datatypes::{ArrowDataType, ArrowSchema, Field}; use crate::io::ipc::read::OutOfSpecKind; use crate::io::ipc::{IpcField, IpcSchema}; use crate::record_batch::RecordBatchT; @@ -77,7 +77,7 @@ impl<'a, A, I: Iterator> Iterator for ProjectionIter<'a, A, I> { #[allow(clippy::too_many_arguments)] pub fn read_record_batch( batch: arrow_format::ipc::RecordBatchRef, - fields: &[Field], + fields: &ArrowSchema, ipc_schema: &IpcSchema, projection: Option<&[usize]>, limit: Option, @@ -127,8 +127,10 @@ pub fn read_record_batch( let mut field_nodes = field_nodes.iter().collect::>(); let columns = if let Some(projection) = projection { - let projection = - ProjectionIter::new(projection, fields.iter().zip(ipc_schema.fields.iter())); + let projection = ProjectionIter::new( + projection, + fields.iter_values().zip(ipc_schema.fields.iter()), + ); projection .map(|maybe_field| match maybe_field { @@ -163,7 +165,7 @@ pub fn read_record_batch( .collect::>>()? } else { fields - .iter() + .iter_values() .zip(ipc_schema.fields.iter()) .map(|(field, ipc_field)| { read( @@ -227,11 +229,11 @@ fn find_first_dict_field<'a>( pub(crate) fn first_dict_field<'a>( id: i64, - fields: &'a [Field], + fields: &'a ArrowSchema, ipc_fields: &'a [IpcField], ) -> PolarsResult<(&'a Field, &'a IpcField)> { assert_eq!(fields.len(), ipc_fields.len()); - for (field, ipc_field) in fields.iter().zip(ipc_fields.iter()) { + for (field, ipc_field) in fields.iter_values().zip(ipc_fields.iter()) { if let Some(field) = find_first_dict_field(id, field, ipc_field) { return Ok(field); } @@ -246,7 +248,7 @@ pub(crate) fn first_dict_field<'a>( #[allow(clippy::too_many_arguments)] pub fn read_dictionary( batch: arrow_format::ipc::DictionaryBatchRef, - fields: &[Field], + fields: &ArrowSchema, ipc_schema: &IpcSchema, dictionaries: &mut Dictionaries, reader: &mut R, @@ -280,7 +282,11 @@ pub fn read_dictionary( }; // Make a fake schema for the dictionary batch. - let fields = vec![Field::new(PlSmallStr::EMPTY, value_type.clone(), false)]; + let fields = std::iter::once(( + PlSmallStr::EMPTY, + Field::new(PlSmallStr::EMPTY, value_type.clone(), false), + )) + .collect(); let ipc_schema = IpcSchema { fields: vec![first_ipc_field.clone()], is_little_endian: ipc_schema.is_little_endian, @@ -305,10 +311,16 @@ pub fn read_dictionary( } pub fn prepare_projection( - fields: &[Field], + schema: &ArrowSchema, mut projection: Vec, -) -> (Vec, PlHashMap, Vec) { - let fields = projection.iter().map(|x| fields[*x].clone()).collect(); +) -> (Vec, PlHashMap, ArrowSchema) { + let schema = projection + .iter() + .map(|x| { + let (k, v) = schema.get_at_index(*x).unwrap(); + (k.clone(), v.clone()) + }) + .collect(); // todo: find way to do this more efficiently let mut indices = (0..projection.len()).collect::>(); @@ -335,7 +347,7 @@ pub fn prepare_projection( } } - (projection, map, fields) + (projection, map, schema) } pub fn apply_projection( diff --git a/crates/polars-arrow/src/io/ipc/read/file.rs b/crates/polars-arrow/src/io/ipc/read/file.rs index c873060969d1..6c831064d5a1 100644 --- a/crates/polars-arrow/src/io/ipc/read/file.rs +++ b/crates/polars-arrow/src/io/ipc/read/file.rs @@ -89,7 +89,7 @@ fn read_dictionary_block( read_dictionary( batch, - &metadata.schema.fields, + &metadata.schema, &metadata.ipc_schema, dictionaries, reader, @@ -317,7 +317,7 @@ pub fn read_batch( read_record_batch( batch, - &metadata.schema.fields, + &metadata.schema, &metadata.ipc_schema, projection, limit, diff --git a/crates/polars-arrow/src/io/ipc/read/file_async.rs b/crates/polars-arrow/src/io/ipc/read/file_async.rs index effd2151fc67..567a58c1a1fb 100644 --- a/crates/polars-arrow/src/io/ipc/read/file_async.rs +++ b/crates/polars-arrow/src/io/ipc/read/file_async.rs @@ -12,7 +12,7 @@ use super::common::{apply_projection, prepare_projection, read_dictionary, read_ use super::file::{deserialize_footer, get_record_batch}; use super::{Dictionaries, FileMetadata, OutOfSpecKind}; use crate::array::*; -use crate::datatypes::{ArrowSchema, Field}; +use crate::datatypes::ArrowSchema; use crate::io::ipc::{IpcSchema, ARROW_MAGIC_V2, CONTINUATION_MARKER}; use crate::record_batch::RecordBatchT; @@ -38,8 +38,7 @@ impl<'a> FileStream<'a> { R: AsyncRead + AsyncSeek + Unpin + Send + 'a, { let (projection, schema) = if let Some(projection) = projection { - let (p, h, fields) = prepare_projection(&metadata.schema.fields, projection); - let schema = ArrowSchema { fields }; + let (p, h, schema) = prepare_projection(&metadata.schema, projection); (Some((p, h)), Some(schema)) } else { (None, None) @@ -218,7 +217,7 @@ where read_record_batch( batch, - &metadata.schema.fields, + &metadata.schema, &metadata.ipc_schema, projection, limit, @@ -235,7 +234,7 @@ where async fn read_dictionaries( mut reader: R, - fields: &[Field], + fields: &ArrowSchema, ipc_schema: &IpcSchema, blocks: &[Block], scratch: &mut Vec, @@ -331,14 +330,15 @@ async fn cached_read_dictionaries( ) -> PolarsResult<()> { match (&dictionaries, metadata.dictionaries.as_deref()) { (None, Some(blocks)) => { - let new_dictionaries = read_dictionaries( - reader, - &metadata.schema.fields, - &metadata.ipc_schema, - blocks, - &mut Default::default(), - ) - .await?; + let new_dictionaries: hashbrown::HashMap, ahash::RandomState> = + read_dictionaries( + reader, + &metadata.schema, + &metadata.ipc_schema, + blocks, + &mut Default::default(), + ) + .await?; *dictionaries = Some(new_dictionaries); }, (None, None) => { diff --git a/crates/polars-arrow/src/io/ipc/read/reader.rs b/crates/polars-arrow/src/io/ipc/read/reader.rs index 6328223d427c..8369d2960233 100644 --- a/crates/polars-arrow/src/io/ipc/read/reader.rs +++ b/crates/polars-arrow/src/io/ipc/read/reader.rs @@ -33,8 +33,7 @@ impl FileReader { limit: Option, ) -> Self { let projection = projection.map(|projection| { - let (p, h, fields) = prepare_projection(&metadata.schema.fields, projection); - let schema = ArrowSchema { fields }; + let (p, h, schema) = prepare_projection(&metadata.schema, projection); (p, h, schema) }); Self { diff --git a/crates/polars-arrow/src/io/ipc/read/schema.rs b/crates/polars-arrow/src/io/ipc/read/schema.rs index 46925ca7599e..091c21f14c1c 100644 --- a/crates/polars-arrow/src/io/ipc/read/schema.rs +++ b/crates/polars-arrow/src/io/ipc/read/schema.rs @@ -381,10 +381,15 @@ pub(super) fn fb_to_schema( let fields = schema .fields()? .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingFields))?; - let (fields, ipc_fields) = try_unzip_vec(fields.iter().map(|field| { - let (field, fields) = deserialize_field(field?)?; - Ok((field, fields)) - }))?; + + let mut arrow_schema = ArrowSchema::with_capacity(fields.len()); + let mut ipc_fields = Vec::with_capacity(fields.len()); + + for field in fields { + let (field, ipc_field) = deserialize_field(field?)?; + arrow_schema.insert(field.name.clone(), field); + ipc_fields.push(ipc_field); + } let is_little_endian = match schema.endianness()? { arrow_format::ipc::Endianness::Little => true, @@ -392,7 +397,7 @@ pub(super) fn fb_to_schema( }; Ok(( - ArrowSchema { fields }, + arrow_schema, IpcSchema { fields: ipc_fields, is_little_endian, diff --git a/crates/polars-arrow/src/io/ipc/read/stream.rs b/crates/polars-arrow/src/io/ipc/read/stream.rs index de679de9899d..87241596cdbe 100644 --- a/crates/polars-arrow/src/io/ipc/read/stream.rs +++ b/crates/polars-arrow/src/io/ipc/read/stream.rs @@ -167,7 +167,7 @@ fn read_next( let chunk = read_record_batch( batch, - &metadata.schema.fields, + &metadata.schema, &metadata.ipc_schema, projection.as_ref().map(|x| x.0.as_ref()), None, @@ -201,7 +201,7 @@ fn read_next( read_dictionary( batch, - &metadata.schema.fields, + &metadata.schema, &metadata.ipc_schema, dictionaries, &mut dict_reader, @@ -250,8 +250,7 @@ impl StreamReader { /// To check if the reader is done, use `is_finished(self)` pub fn new(reader: R, metadata: StreamMetadata, projection: Option>) -> Self { let projection = projection.map(|projection| { - let (p, h, fields) = prepare_projection(&metadata.schema.fields, projection); - let schema = ArrowSchema { fields }; + let (p, h, schema) = prepare_projection(&metadata.schema, projection); (p, h, schema) }); diff --git a/crates/polars-arrow/src/io/ipc/read/stream_async.rs b/crates/polars-arrow/src/io/ipc/read/stream_async.rs index 8d66f81793ed..ab29550d8a14 100644 --- a/crates/polars-arrow/src/io/ipc/read/stream_async.rs +++ b/crates/polars-arrow/src/io/ipc/read/stream_async.rs @@ -132,9 +132,9 @@ async fn maybe_next( .read_to_end(&mut state.data_buffer) .await?; - read_record_batch( + let chunk = read_record_batch( batch, - &state.metadata.schema.fields, + &state.metadata.schema, &state.metadata.ipc_schema, None, None, @@ -144,8 +144,9 @@ async fn maybe_next( 0, state.data_buffer.len() as u64, &mut scratch, - ) - .map(|chunk| Some(StreamState::Some((state, chunk)))) + )?; + + Ok(Some(StreamState::Some((state, chunk)))) }, arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => { state.data_buffer.clear(); @@ -161,7 +162,7 @@ async fn maybe_next( read_dictionary( batch, - &state.metadata.schema.fields, + &state.metadata.schema, &state.metadata.ipc_schema, &mut state.dictionaries, &mut dict_reader, diff --git a/crates/polars-arrow/src/io/ipc/write/file_async.rs b/crates/polars-arrow/src/io/ipc/write/file_async.rs index 142e18b71cbb..aaae101785bc 100644 --- a/crates/polars-arrow/src/io/ipc/write/file_async.rs +++ b/crates/polars-arrow/src/io/ipc/write/file_async.rs @@ -44,7 +44,7 @@ where ipc_fields: Option>, options: WriteOptions, ) -> Self { - let fields = ipc_fields.unwrap_or_else(|| default_ipc_fields(&schema.fields)); + let fields = ipc_fields.unwrap_or_else(|| default_ipc_fields(schema.iter_values())); let encoded = EncodedData { ipc_message: schema_to_bytes(&schema, &fields), arrow_data: vec![], diff --git a/crates/polars-arrow/src/io/ipc/write/mod.rs b/crates/polars-arrow/src/io/ipc/write/mod.rs index e272f6e1c2c5..7dd2cf2a583d 100644 --- a/crates/polars-arrow/src/io/ipc/write/mod.rs +++ b/crates/polars-arrow/src/io/ipc/write/mod.rs @@ -61,10 +61,9 @@ fn default_ipc_field(data_type: &ArrowDataType, current_id: &mut i64) -> IpcFiel } /// Assigns every dictionary field a unique ID -pub fn default_ipc_fields(fields: &[Field]) -> Vec { +pub fn default_ipc_fields<'a>(fields: impl ExactSizeIterator) -> Vec { let mut dictionary_id = 0i64; fields - .iter() .map(|field| default_ipc_field(field.data_type().to_logical_type(), &mut dictionary_id)) .collect() } diff --git a/crates/polars-arrow/src/io/ipc/write/schema.rs b/crates/polars-arrow/src/io/ipc/write/schema.rs index 594e9275c6a3..d9ae41652c7d 100644 --- a/crates/polars-arrow/src/io/ipc/write/schema.rs +++ b/crates/polars-arrow/src/io/ipc/write/schema.rs @@ -32,8 +32,7 @@ pub fn serialize_schema( }; let fields = schema - .fields - .iter() + .iter_values() .zip(ipc_fields.iter()) .map(|(field, ipc_field)| serialize_field(field, ipc_field)) .collect::>(); diff --git a/crates/polars-arrow/src/io/ipc/write/stream.rs b/crates/polars-arrow/src/io/ipc/write/stream.rs index 5122fd848c7a..330b35d4ca4b 100644 --- a/crates/polars-arrow/src/io/ipc/write/stream.rs +++ b/crates/polars-arrow/src/io/ipc/write/stream.rs @@ -59,7 +59,7 @@ impl StreamWriter { self.ipc_fields = Some(if let Some(ipc_fields) = ipc_fields { ipc_fields } else { - default_ipc_fields(&schema.fields) + default_ipc_fields(schema.iter_values()) }); let encoded_message = EncodedData { diff --git a/crates/polars-arrow/src/io/ipc/write/stream_async.rs b/crates/polars-arrow/src/io/ipc/write/stream_async.rs index 9858739134c2..3718d6f82b29 100644 --- a/crates/polars-arrow/src/io/ipc/write/stream_async.rs +++ b/crates/polars-arrow/src/io/ipc/write/stream_async.rs @@ -36,7 +36,7 @@ where ipc_fields: Option>, write_options: WriteOptions, ) -> Self { - let fields = ipc_fields.unwrap_or_else(|| default_ipc_fields(&schema.fields)); + let fields = ipc_fields.unwrap_or_else(|| default_ipc_fields(schema.iter_values())); let task = Some(Self::start(writer, schema, &fields[..])); Self { writer: None, diff --git a/crates/polars-arrow/src/io/ipc/write/writer.rs b/crates/polars-arrow/src/io/ipc/write/writer.rs index 361a40bf5e06..ec010d4d0180 100644 --- a/crates/polars-arrow/src/io/ipc/write/writer.rs +++ b/crates/polars-arrow/src/io/ipc/write/writer.rs @@ -66,7 +66,7 @@ impl FileWriter { let ipc_fields = if let Some(ipc_fields) = ipc_fields { ipc_fields } else { - default_ipc_fields(&schema.fields) + default_ipc_fields(schema.iter_values()) }; Self { diff --git a/crates/polars-arrow/src/mmap/mod.rs b/crates/polars-arrow/src/mmap/mod.rs index ce8b27a4d4d9..8d1ed5f89155 100644 --- a/crates/polars-arrow/src/mmap/mod.rs +++ b/crates/polars-arrow/src/mmap/mod.rs @@ -10,7 +10,7 @@ use polars_error::{polars_bail, polars_err, to_compute_err, PolarsResult}; use polars_utils::pl_str::PlSmallStr; use crate::array::Array; -use crate::datatypes::{ArrowDataType, Field}; +use crate::datatypes::{ArrowDataType, ArrowSchema, Field}; use crate::io::ipc::read::file::{get_dictionary_batch, get_record_batch}; use crate::io::ipc::read::{ first_dict_field, Dictionaries, FileMetadata, IpcBuffer, Node, OutOfSpecKind, @@ -72,7 +72,7 @@ fn get_buffers_nodes(batch: RecordBatchRef) -> PolarsResult<(VecDeque } unsafe fn _mmap_record>( - fields: &[Field], + fields: &ArrowSchema, ipc_fields: &[IpcField], data: Arc, batch: RecordBatchRef, @@ -87,7 +87,7 @@ unsafe fn _mmap_record>( .unwrap_or_else(VecDeque::new); fields - .iter() + .iter_values() .map(|f| &f.data_type) .cloned() .zip(ipc_fields) @@ -108,7 +108,7 @@ unsafe fn _mmap_record>( } unsafe fn _mmap_unchecked>( - fields: &[Field], + fields: &ArrowSchema, ipc_fields: &[IpcField], data: Arc, block: Block, @@ -148,7 +148,7 @@ pub unsafe fn mmap_unchecked>( let (message, offset) = read_message(data.as_ref().as_ref(), block)?; let batch = get_record_batch(message)?; _mmap_record( - &metadata.schema.fields, + &metadata.schema, &metadata.ipc_schema.fields, data.clone(), batch, @@ -170,7 +170,7 @@ unsafe fn mmap_dictionary>( .id() .map_err(|err| polars_err!(ComputeError: "out-of-spec {:?}", OutOfSpecKind::InvalidFlatbufferId(err)))?; let (first_field, first_ipc_field) = - first_dict_field(id, &metadata.schema.fields, &metadata.ipc_schema.fields)?; + first_dict_field(id, &metadata.schema, &metadata.ipc_schema.fields)?; let batch = batch .data() @@ -189,7 +189,7 @@ unsafe fn mmap_dictionary>( let field = Field::new(PlSmallStr::EMPTY, value_type.clone(), false); let chunk = _mmap_record( - &[field], + &std::iter::once((field.name.clone(), field)).collect(), &[first_ipc_field.clone()], data.clone(), batch, diff --git a/crates/polars-core/src/frame/chunks.rs b/crates/polars-core/src/frame/chunks.rs index 2a371e85ab31..349a77c56d75 100644 --- a/crates/polars-core/src/frame/chunks.rs +++ b/crates/polars-core/src/frame/chunks.rs @@ -5,15 +5,15 @@ use crate::prelude::*; use crate::utils::_split_offsets; use crate::POOL; -impl TryFrom<(RecordBatch, &[ArrowField])> for DataFrame { +impl TryFrom<(RecordBatch, &ArrowSchema)> for DataFrame { type Error = PolarsError; - fn try_from(arg: (RecordBatch, &[ArrowField])) -> PolarsResult { + fn try_from(arg: (RecordBatch, &ArrowSchema)) -> PolarsResult { let columns: PolarsResult> = arg .0 .columns() .iter() - .zip(arg.1) + .zip(arg.1.iter_values()) .map(|(arr, field)| Series::try_from((field, arr.clone()))) .collect(); diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs index 2e7b3adfd2f6..b2139bdd770d 100644 --- a/crates/polars-core/src/frame/mod.rs +++ b/crates/polars-core/src/frame/mod.rs @@ -335,8 +335,7 @@ impl DataFrame { /// Create an empty `DataFrame` with empty columns as per the `schema`. pub fn empty_with_arrow_schema(schema: &ArrowSchema) -> Self { let cols = schema - .fields - .iter() + .iter_values() .map(|fld| Series::new_empty(fld.name.clone(), &(fld.data_type().into()))) .collect(); unsafe { DataFrame::new_no_checks(cols) } diff --git a/crates/polars-core/src/frame/row/dataframe.rs b/crates/polars-core/src/frame/row/dataframe.rs index a2fe1041abb5..4a40a9ed6d6f 100644 --- a/crates/polars-core/src/frame/row/dataframe.rs +++ b/crates/polars-core/src/frame/row/dataframe.rs @@ -56,7 +56,7 @@ impl DataFrame { let capacity = rows.size_hint().0; let mut buffers: Vec<_> = schema - .iter_dtypes() + .iter_values() .map(|dtype| { let buf: AnyValueBuffer = (dtype, capacity).into(); buf @@ -98,7 +98,7 @@ impl DataFrame { let capacity = rows.size_hint().0; let mut buffers: Vec<_> = schema - .iter_dtypes() + .iter_values() .map(|dtype| { let buf: AnyValueBuffer = (dtype, capacity).into(); buf @@ -136,7 +136,7 @@ impl DataFrame { pub fn from_rows(rows: &[Row]) -> PolarsResult { let schema = rows_to_schema_first_non_null(rows, Some(50))?; let has_nulls = schema - .iter_dtypes() + .iter_values() .any(|dtype| matches!(dtype, DataType::Null)); polars_ensure!( !has_nulls, ComputeError: "unable to infer row types because of null values" diff --git a/crates/polars-core/src/frame/row/mod.rs b/crates/polars-core/src/frame/row/mod.rs index b627eaddef05..090f22c78dfe 100644 --- a/crates/polars-core/src/frame/row/mod.rs +++ b/crates/polars-core/src/frame/row/mod.rs @@ -207,7 +207,7 @@ pub fn rows_to_schema_first_non_null( for row in rows.iter().take(max_infer).skip(1) { // for i in 1..max_infer { let nulls: Vec<_> = schema - .iter_dtypes() + .iter_values() .enumerate() .filter_map(|(i, dtype)| { // double check struct and list types types diff --git a/crates/polars-core/src/schema.rs b/crates/polars-core/src/schema.rs index 0bf6f1bb2045..6485c513e6ec 100644 --- a/crates/polars-core/src/schema.rs +++ b/crates/polars-core/src/schema.rs @@ -6,7 +6,7 @@ use crate::prelude::*; use crate::utils::try_get_supertype; pub type SchemaRef = Arc; -pub type Schema = polars_schema::schema::Schema; +pub type Schema = polars_schema::Schema; pub trait SchemaExt { fn from_arrow_schema(value: &ArrowSchema) -> Self; @@ -25,8 +25,7 @@ pub trait SchemaExt { impl SchemaExt for Schema { fn from_arrow_schema(value: &ArrowSchema) -> Self { value - .fields - .iter() + .iter_values() .map(|x| (x.name.clone(), DataType::from_arrow(&x.data_type, true))) .collect() } @@ -56,11 +55,14 @@ impl SchemaExt for Schema { /// Convert self to `ArrowSchema` by cloning the fields. fn to_arrow(&self, compat_level: CompatLevel) -> ArrowSchema { - let fields: Vec<_> = self - .iter() - .map(|(name, dtype)| dtype.to_arrow_field(name.clone(), compat_level)) - .collect(); - ArrowSchema::from(fields) + self.iter() + .map(|(name, dtype)| { + ( + name.clone(), + dtype.to_arrow_field(name.clone(), compat_level), + ) + }) + .collect() } /// Iterates the [`Field`]s in this schema, constructing them anew by cloning each `(&name, &dtype)` pair. @@ -130,19 +132,19 @@ impl IndexOfSchema for Schema { impl IndexOfSchema for ArrowSchema { fn index_of(&self, name: &str) -> Option { - self.fields.iter().position(|f| f.name.as_str() == name) + self.iter_values().position(|f| f.name.as_str() == name) } fn get_names(&self) -> Vec<&PlSmallStr> { - self.fields.iter().map(|f| &f.name).collect() + self.iter_values().map(|f| &f.name).collect() } fn get_names_owned(&self) -> Vec { - self.fields.iter().map(|f| f.name.clone()).collect() + self.iter_values().map(|f| f.name.clone()).collect() } fn get_names_str(&self) -> Vec<&str> { - self.fields.iter().map(|f| f.name.as_str()).collect() + self.iter_values().map(|f| f.name.as_str()).collect() } } @@ -170,8 +172,7 @@ impl SchemaNamesAndDtypes for ArrowSchema { type DataType = ArrowDataType; fn get_names_and_dtypes(&'_ self) -> Vec<(&'_ str, Self::DataType)> { - self.fields - .iter() + self.iter_values() .map(|x| (x.name.as_str(), x.data_type.clone())) .collect() } diff --git a/crates/polars-io/src/avro/read.rs b/crates/polars-io/src/avro/read.rs index 3c386ef9d77a..e0823e6dd916 100644 --- a/crates/polars-io/src/avro/read.rs +++ b/crates/polars-io/src/avro/read.rs @@ -109,7 +109,7 @@ where } let (projection, projected_schema) = if let Some(projection) = self.projection { - let mut prj = vec![false; schema.fields.len()]; + let mut prj = vec![false; schema.len()]; for &index in projection.iter() { prj[index] = true; } @@ -118,8 +118,7 @@ where (None, schema.clone()) }; - let avro_reader = - avro::read::Reader::new(&mut self.reader, metadata, schema.fields, projection); + let avro_reader = avro::read::Reader::new(&mut self.reader, metadata, schema, projection); finish_reader( avro_reader, diff --git a/crates/polars-io/src/parquet/read/async_impl.rs b/crates/polars-io/src/parquet/read/async_impl.rs index a06b3f88a0dd..466b223982b4 100644 --- a/crates/polars-io/src/parquet/read/async_impl.rs +++ b/crates/polars-io/src/parquet/read/async_impl.rs @@ -268,7 +268,7 @@ impl FetchRowGroupsFromObjectStore { let projected_fields: Option> = projection.map(|projection| { projection .iter() - .map(|i| (schema.fields[*i].name.clone())) + .map(|i| (schema.get_at_index(*i).as_ref().unwrap().0.clone())) .collect() }); diff --git a/crates/polars-io/src/parquet/read/predicates.rs b/crates/polars-io/src/parquet/read/predicates.rs index b5003f1a1feb..b08c5b6125ae 100644 --- a/crates/polars-io/src/parquet/read/predicates.rs +++ b/crates/polars-io/src/parquet/read/predicates.rs @@ -22,8 +22,7 @@ pub(crate) fn collect_statistics( ) -> PolarsResult> { // TODO! fix this performance. This is a full sequential scan. let stats = schema - .fields - .iter() + .iter_values() .map(|field| match part_md.get_partitions(&field.name) { Some(md) => { let st = deserialize(field, &md)?; diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index 096e7d8170c8..2ac4ef7c019f 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -69,7 +69,7 @@ fn column_idx_to_series( file_schema: &ArrowSchema, store: &mmap::ColumnStore, ) -> PolarsResult { - let field = &file_schema.fields[column_i]; + let field = file_schema.get_at_index(column_i).unwrap().1; #[cfg(debug_assertions)] { @@ -234,7 +234,7 @@ fn projected_columns_set<'a>( Some( projection .iter() - .map(|i| schema.fields[*i].name.as_str()) + .map(|i| schema.get_at_index(*i).unwrap().0.as_str()) .collect::>(), ) } @@ -303,7 +303,7 @@ fn rg_to_dfs_prefiltered( // column indexes of the schema. let mut live_idx_to_col_idx = Vec::with_capacity(num_live_columns); let mut dead_idx_to_col_idx = Vec::with_capacity(num_dead_columns); - for (i, field) in schema.fields.iter().enumerate() { + for (i, field) in schema.iter_values().enumerate() { if live_variables.contains(&field.name[..]) { live_idx_to_col_idx.push(i); } else { @@ -351,7 +351,7 @@ fn rg_to_dfs_prefiltered( .map(|i| { let col_idx = live_idx_to_col_idx[i]; - let name = &schema.fields[col_idx].name; + let name = schema.get_at_index(col_idx).unwrap().0; let field_md = part_mds[rg_idx].get_partitions(name).unwrap(); column_idx_to_series(col_idx, field_md.as_slice(), None, schema, store) @@ -431,7 +431,7 @@ fn rg_to_dfs_prefiltered( .into_par_iter() .map(|i| { let col_idx = dead_idx_to_col_idx[i]; - let name = &schema.fields[col_idx].name; + let name = schema.get_at_index(col_idx).unwrap().0; #[cfg(debug_assertions)] { @@ -473,7 +473,12 @@ fn rg_to_dfs_prefiltered( MaskSetting::Auto => { // Prefiltering is more expensive for nested types so we make the cut-off // higher. - let is_nested = schema.fields[col_idx].data_type.is_nested(); + let is_nested = schema + .get_at_index(col_idx) + .unwrap() + .1 + .data_type + .is_nested(); // We empirically selected these numbers. let do_prefilter = (is_nested && prefilter_cost <= 0.01) @@ -572,7 +577,7 @@ fn rg_to_dfs_optionally_par_over_columns( projection .par_iter() .map(|column_i| { - let name = &schema.fields[*column_i].name; + let name = schema.get_at_index(*column_i).unwrap().0; let part = part_md.get_partitions(name).unwrap(); column_idx_to_series( @@ -589,7 +594,7 @@ fn rg_to_dfs_optionally_par_over_columns( projection .iter() .map(|column_i| { - let name = &schema.fields[*column_i].name; + let name = schema.get_at_index(*column_i).unwrap().0; let part = part_md.get_partitions(name).unwrap(); column_idx_to_series( @@ -703,7 +708,7 @@ fn rg_to_dfs_par_over_rg( let columns = projection .iter() .map(|column_i| { - let name = &schema.fields[*column_i].name; + let name = schema.get_at_index(*column_i).unwrap().0; let field_md = part_md.get_partitions(name).unwrap(); column_idx_to_series( diff --git a/crates/polars-io/src/parquet/read/reader.rs b/crates/polars-io/src/parquet/read/reader.rs index f5b52437dd82..25e8852a92ce 100644 --- a/crates/polars-io/src/parquet/read/reader.rs +++ b/crates/polars-io/src/parquet/read/reader.rs @@ -87,12 +87,10 @@ impl ParquetReader { let self_schema = self.schema()?; let self_schema = self_schema.as_ref(); - if let Some(ref projection) = self.projection { - let projection = projection.as_slice(); - + if let Some(projection) = self.projection.as_deref() { ensure_matching_schema( - &schema.try_project(projection)?, - &self_schema.try_project(projection)?, + &schema.try_project_indices(projection)?, + &self_schema.try_project_indices(projection)?, )?; } else { ensure_matching_schema(schema, self_schema)?; @@ -290,12 +288,10 @@ impl ParquetAsyncReader { let self_schema = self.schema().await?; let self_schema = self_schema.as_ref(); - if let Some(ref projection) = self.projection { - let projection = projection.as_slice(); - + if let Some(projection) = self.projection.as_deref() { ensure_matching_schema( - &schema.try_project(projection)?, - &self_schema.try_project(projection)?, + &schema.try_project_indices(projection)?, + &self_schema.try_project_indices(projection)?, )?; } else { ensure_matching_schema(schema, self_schema)?; diff --git a/crates/polars-io/src/parquet/write/writer.rs b/crates/polars-io/src/parquet/write/writer.rs index 3129421e21d7..7ca2229ae62f 100644 --- a/crates/polars-io/src/parquet/write/writer.rs +++ b/crates/polars-io/src/parquet/write/writer.rs @@ -132,8 +132,7 @@ where fn get_encodings(schema: &ArrowSchema) -> Vec> { schema - .fields - .iter() + .iter_values() .map(|f| transverse(&f.data_type, encoding_map)) .collect() } diff --git a/crates/polars-io/src/shared.rs b/crates/polars-io/src/shared.rs index 1e13f9ed68c1..06d135e4ca5f 100644 --- a/crates/polars-io/src/shared.rs +++ b/crates/polars-io/src/shared.rs @@ -65,7 +65,7 @@ pub(crate) fn finish_reader( while let Some(batch) = reader.next_record_batch()? { let current_num_rows = num_rows as IdxSize; num_rows += batch.len(); - let mut df = DataFrame::try_from((batch, arrow_schema.fields.as_slice()))?; + let mut df = DataFrame::try_from((batch, arrow_schema))?; if let Some(rc) = &row_index { df.with_row_index_mut(rc.name.clone(), Some(current_num_rows + rc.offset)); @@ -97,8 +97,7 @@ pub(crate) fn finish_reader( if parsed_dfs.is_empty() { // Create an empty dataframe with the correct data types let empty_cols = arrow_schema - .fields - .iter() + .iter_values() .map(|fld| { Series::try_from((fld.name.clone(), new_empty_array(fld.data_type.clone()))) }) @@ -121,10 +120,22 @@ pub(crate) fn schema_to_arrow_checked( compat_level: CompatLevel, _file_name: &str, ) -> PolarsResult { - let fields = schema.iter_fields().map(|field| { - #[cfg(feature = "object")] - polars_ensure!(!matches!(field.data_type(), DataType::Object(_, _)), ComputeError: "cannot write 'Object' datatype to {}", _file_name); - Ok(field.data_type().to_arrow_field(field.name().clone(), compat_level)) - }).collect::>>()?; - Ok(ArrowSchema::from(fields)) + schema + .iter_fields() + .map(|field| { + #[cfg(feature = "object")] + { + polars_ensure!( + !matches!(field.data_type(), DataType::Object(_, _)), + ComputeError: "cannot write 'Object' datatype to {}", + _file_name + ); + } + + let field = field + .data_type() + .to_arrow_field(field.name().clone(), compat_level); + Ok((field.name.clone(), field)) + }) + .collect::>() } diff --git a/crates/polars-io/src/utils/other.rs b/crates/polars-io/src/utils/other.rs index 0294b123687d..3c1ab1e248d8 100644 --- a/crates/polars-io/src/utils/other.rs +++ b/crates/polars-io/src/utils/other.rs @@ -89,12 +89,11 @@ pub fn maybe_decompress_bytes<'a>(bytes: &'a [u8], out: &'a mut Vec) -> Pola feature = "avro" ))] pub(crate) fn apply_projection(schema: &ArrowSchema, projection: &[usize]) -> ArrowSchema { - let fields = &schema.fields; - let fields = projection + projection .iter() - .map(|idx| fields[*idx].clone()) - .collect::>(); - ArrowSchema::from(fields) + .map(|idx| schema.get_at_index(*idx).unwrap()) + .map(|(k, v)| (k.clone(), v.clone())) + .collect() } #[cfg(any( @@ -109,8 +108,8 @@ pub(crate) fn columns_to_projection( ) -> PolarsResult> { let mut prj = Vec::with_capacity(columns.len()); if columns.len() > 100 { - let mut column_names = PlHashMap::with_capacity(schema.fields.len()); - schema.fields.iter().enumerate().for_each(|(i, c)| { + let mut column_names = PlHashMap::with_capacity(schema.len()); + schema.iter_values().enumerate().for_each(|(i, c)| { column_names.insert(c.name.as_str(), i); }); diff --git a/crates/polars-json/src/json/write/mod.rs b/crates/polars-json/src/json/write/mod.rs index 3a9bac40a7fe..a23b245b68b2 100644 --- a/crates/polars-json/src/json/write/mod.rs +++ b/crates/polars-json/src/json/write/mod.rs @@ -114,7 +114,7 @@ impl<'a> FallibleStreamingIterator for RecordSerializer<'a> { let mut is_first_row = true; write!(&mut self.buffer, "{{")?; - for (f, ref mut it) in self.schema.fields.iter().zip(self.iterators.iter_mut()) { + for (f, ref mut it) in self.schema.iter_values().zip(self.iterators.iter_mut()) { if !is_first_row { write!(&mut self.buffer, ",")?; } diff --git a/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs b/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs index 056893e9c0ba..d651f019afcf 100644 --- a/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs +++ b/crates/polars-lazy/src/physical_plan/streaming/convert_alp.rs @@ -396,7 +396,7 @@ pub(crate) fn insert_streaming_nodes( let valid_types = || { output_schema - .iter_dtypes() + .iter_values() .all(|dt| allowed_dtype(dt, string_cache)) }; diff --git a/crates/polars-mem-engine/src/planner/lp.rs b/crates/polars-mem-engine/src/planner/lp.rs index fc7a29435c77..523cd1e5c588 100644 --- a/crates/polars-mem-engine/src/planner/lp.rs +++ b/crates/polars-mem-engine/src/planner/lp.rs @@ -244,7 +244,7 @@ fn create_physical_plan_impl( if streamable { // This can cause problems with string caches streamable = !input_schema - .iter_dtypes() + .iter_values() .any(|dt| dt.contains_categoricals()) || { #[cfg(feature = "dtype-categorical")] diff --git a/crates/polars-parquet/src/arrow/read/schema/convert.rs b/crates/polars-parquet/src/arrow/read/schema/convert.rs index e55e4a15c88c..aae71e871b00 100644 --- a/crates/polars-parquet/src/arrow/read/schema/convert.rs +++ b/crates/polars-parquet/src/arrow/read/schema/convert.rs @@ -1,5 +1,5 @@ //! This module has entry points, [`parquet_to_arrow_schema`] and the more configurable [`parquet_to_arrow_schema_with_options`]. -use arrow::datatypes::{ArrowDataType, Field, IntervalUnit, TimeUnit}; +use arrow::datatypes::{ArrowDataType, ArrowSchema, Field, IntervalUnit, TimeUnit}; use polars_utils::pl_str::PlSmallStr; use crate::arrow::read::schema::SchemaInferenceOptions; @@ -11,7 +11,7 @@ use crate::parquet::schema::Repetition; /// Converts [`ParquetType`]s to a [`Field`], ignoring parquet fields that do not contain /// any physical column. -pub fn parquet_to_arrow_schema(fields: &[ParquetType]) -> Vec { +pub fn parquet_to_arrow_schema(fields: &[ParquetType]) -> ArrowSchema { parquet_to_arrow_schema_with_options(fields, &None) } @@ -19,11 +19,12 @@ pub fn parquet_to_arrow_schema(fields: &[ParquetType]) -> Vec { pub fn parquet_to_arrow_schema_with_options( fields: &[ParquetType], options: &Option, -) -> Vec { +) -> ArrowSchema { fields .iter() .filter_map(|f| to_field(f, options.as_ref().unwrap_or(&Default::default()))) - .collect::>() + .map(|x| (x.name.clone(), x)) + .collect() } fn from_int32( @@ -450,6 +451,7 @@ mod tests { let parquet_schema = SchemaDescriptor::try_from_message(message)?; let fields = parquet_to_arrow_schema(parquet_schema.fields()); + let fields = fields.iter_values().cloned().collect::>(); assert_eq!(fields, expected); Ok(()) @@ -474,6 +476,7 @@ mod tests { let parquet_schema = SchemaDescriptor::try_from_message(message)?; let fields = parquet_to_arrow_schema(parquet_schema.fields()); + let fields = fields.iter_values().cloned().collect::>(); assert_eq!(fields, expected); Ok(()) @@ -494,6 +497,7 @@ mod tests { let parquet_schema = SchemaDescriptor::try_from_message(message)?; let fields = parquet_to_arrow_schema(parquet_schema.fields()); + let fields = fields.iter_values().cloned().collect::>(); assert_eq!(fields, expected); Ok(()) @@ -731,6 +735,7 @@ mod tests { let parquet_schema = SchemaDescriptor::try_from_message(message_type)?; let fields = parquet_to_arrow_schema(parquet_schema.fields()); + let fields = fields.iter_values().cloned().collect::>(); assert_eq!(arrow_fields, fields); Ok(()) @@ -773,6 +778,7 @@ mod tests { let parquet_schema = SchemaDescriptor::try_from_message(message_type)?; let fields = parquet_to_arrow_schema(parquet_schema.fields()); + let fields = fields.iter_values().cloned().collect::>(); assert_eq!(arrow_fields, fields); Ok(()) @@ -858,6 +864,7 @@ mod tests { let parquet_schema = SchemaDescriptor::try_from_message(message_type)?; let fields = parquet_to_arrow_schema(parquet_schema.fields()); + let fields = fields.iter_values().cloned().collect::>(); assert_eq!(arrow_fields, fields); Ok(()) @@ -891,6 +898,7 @@ mod tests { let parquet_schema = SchemaDescriptor::try_from_message(message_type)?; let fields = parquet_to_arrow_schema(parquet_schema.fields()); + let fields = fields.iter_values().cloned().collect::>(); assert_eq!(arrow_fields, fields); Ok(()) @@ -946,6 +954,7 @@ mod tests { let parquet_schema = SchemaDescriptor::try_from_message(message_type)?; let fields = parquet_to_arrow_schema(parquet_schema.fields()); + let fields = fields.iter_values().cloned().collect::>(); assert_eq!(arrow_fields, fields); Ok(()) @@ -1031,6 +1040,7 @@ mod tests { let parquet_schema = SchemaDescriptor::try_from_message(message_type)?; let fields = parquet_to_arrow_schema(parquet_schema.fields()); + let fields = fields.iter_values().cloned().collect::>(); assert_eq!(arrow_fields, fields); Ok(()) @@ -1146,6 +1156,7 @@ mod tests { let parquet_schema = SchemaDescriptor::try_from_message(message_type)?; let fields = parquet_to_arrow_schema(parquet_schema.fields()); + let fields = fields.iter_values().cloned().collect::>(); assert_eq!(arrow_fields, fields); Ok(()) @@ -1202,6 +1213,7 @@ mod tests { int96_coerce_to_timeunit: tu, }), ); + let fields = fields.iter_values().cloned().collect::>(); assert_eq!(arrow_fields, fields); } Ok(()) diff --git a/crates/polars-parquet/src/arrow/read/schema/metadata.rs b/crates/polars-parquet/src/arrow/read/schema/metadata.rs index d90d270b9666..0339032f5d84 100644 --- a/crates/polars-parquet/src/arrow/read/schema/metadata.rs +++ b/crates/polars-parquet/src/arrow/read/schema/metadata.rs @@ -66,7 +66,7 @@ fn get_arrow_schema_from_metadata(encoded_meta: &str) -> PolarsResult usize { /// Creates a parquet [`SchemaDescriptor`] from a [`ArrowSchema`]. pub fn to_parquet_schema(schema: &ArrowSchema) -> PolarsResult { let parquet_types = schema - .fields - .iter() + .iter_values() .map(to_parquet_type) .collect::>>()?; Ok(SchemaDescriptor::new( diff --git a/crates/polars-parquet/src/arrow/write/row_group.rs b/crates/polars-parquet/src/arrow/write/row_group.rs index 28928a2dab08..397b79ed46ee 100644 --- a/crates/polars-parquet/src/arrow/write/row_group.rs +++ b/crates/polars-parquet/src/arrow/write/row_group.rs @@ -82,7 +82,7 @@ impl + 'static, I: Iterator>, ) -> PolarsResult { - if encodings.len() != schema.fields.len() { + if encodings.len() != schema.len() { polars_bail!(InvalidOperation: "The number of encodings must equal the number of fields".to_string(), ) diff --git a/crates/polars-parquet/src/arrow/write/schema.rs b/crates/polars-parquet/src/arrow/write/schema.rs index 171b79cf3d24..40c9fbdedccf 100644 --- a/crates/polars-parquet/src/arrow/write/schema.rs +++ b/crates/polars-parquet/src/arrow/write/schema.rs @@ -49,16 +49,15 @@ fn convert_data_type(data_type: ArrowDataType) -> ArrowDataType { pub fn schema_to_metadata_key(schema: &ArrowSchema) -> KeyValue { // Convert schema until more arrow readers are aware of binview - let serialized_schema = if schema.fields.iter().any(|field| field.data_type.is_view()) { - let fields = schema - .fields - .iter() + let serialized_schema = if schema.iter_values().any(|field| field.data_type.is_view()) { + let schema = schema + .iter_values() .map(|field| convert_field(field.clone())) - .collect::>(); - let schema = ArrowSchema::from(fields); - schema_to_bytes(&schema, &default_ipc_fields(&schema.fields)) + .map(|x| (x.name.clone(), x)) + .collect(); + schema_to_bytes(&schema, &default_ipc_fields(schema.iter_values())) } else { - schema_to_bytes(schema, &default_ipc_fields(&schema.fields)) + schema_to_bytes(schema, &default_ipc_fields(schema.iter_values())) }; // manually prepending the length to the schema as arrow uses the legacy IPC format diff --git a/crates/polars-parquet/src/arrow/write/sink.rs b/crates/polars-parquet/src/arrow/write/sink.rs index 99a61f28fee2..3c60ff9e9f70 100644 --- a/crates/polars-parquet/src/arrow/write/sink.rs +++ b/crates/polars-parquet/src/arrow/write/sink.rs @@ -45,7 +45,7 @@ where encodings: Vec>, options: WriteOptions, ) -> PolarsResult { - if encodings.len() != schema.fields.len() { + if encodings.len() != schema.len() { polars_bail!(InvalidOperation: "The number of encodings must equal the number of fields".to_string(), ) @@ -120,7 +120,7 @@ where self: Pin<&mut Self>, item: RecordBatchT>, ) -> Result<(), Self::Error> { - if self.schema.fields.len() != item.arrays().len() { + if self.schema.len() != item.arrays().len() { polars_bail!(InvalidOperation: "The number of arrays in the chunk must equal the number of fields in the schema" ) diff --git a/crates/polars-pipe/src/executors/sinks/group_by/generic/hash_table.rs b/crates/polars-pipe/src/executors/sinks/group_by/generic/hash_table.rs index 920a865e3956..3e57db331b3e 100644 --- a/crates/polars-pipe/src/executors/sinks/group_by/generic/hash_table.rs +++ b/crates/polars-pipe/src/executors/sinks/group_by/generic/hash_table.rs @@ -259,7 +259,7 @@ impl AggHashTable { let key_dtypes = self .output_schema - .iter_dtypes() + .iter_values() .take(self.num_keys) .map(|dtype| dtype.to_physical().to_arrow(CompatLevel::newest())) .collect::>(); diff --git a/crates/polars-pipe/src/executors/sinks/group_by/generic/sink.rs b/crates/polars-pipe/src/executors/sinks/group_by/generic/sink.rs index dd3231d5af7d..50a68cd34d27 100644 --- a/crates/polars-pipe/src/executors/sinks/group_by/generic/sink.rs +++ b/crates/polars-pipe/src/executors/sinks/group_by/generic/sink.rs @@ -28,7 +28,7 @@ impl GenericGroupby2 { ) -> Self { let key_dtypes: Arc<[DataType]> = Arc::from( output_schema - .iter_dtypes() + .iter_values() .take(key_columns.len()) .cloned() .collect::>(), diff --git a/crates/polars-plan/src/plans/conversion/convert_utils.rs b/crates/polars-plan/src/plans/conversion/convert_utils.rs index 373f28a405bb..51e6940483ba 100644 --- a/crates/polars-plan/src/plans/conversion/convert_utils.rs +++ b/crates/polars-plan/src/plans/conversion/convert_utils.rs @@ -18,7 +18,7 @@ pub(super) fn convert_st_union( let mut exprs = vec![]; let input_schema = lp_arena.get(*input).schema(lp_arena); - let to_cast = input_schema.iter().zip(schema.iter_dtypes()).flat_map( + let to_cast = input_schema.iter().zip(schema.iter_values()).flat_map( |((left_name, left_type), st)| { if left_type != st { Some(col(left_name.clone()).cast(st.clone())) diff --git a/crates/polars-python/src/dataframe/construction.rs b/crates/polars-python/src/dataframe/construction.rs index 2fcdea55ab37..229e1d85b2bb 100644 --- a/crates/polars-python/src/dataframe/construction.rs +++ b/crates/polars-python/src/dataframe/construction.rs @@ -85,7 +85,7 @@ fn update_schema_from_rows( rows: &[Row], infer_schema_length: Option, ) -> PyResult<()> { - let schema_is_complete = schema.iter_dtypes().all(|dtype| dtype.is_known()); + let schema_is_complete = schema.iter_values().all(|dtype| dtype.is_known()); if schema_is_complete { return Ok(()); } @@ -95,7 +95,7 @@ fn update_schema_from_rows( rows_to_supertypes(rows, infer_schema_length).map_err(PyPolarsErr::from)?; let inferred_dtypes_slice = inferred_dtypes.as_slice(); - for (i, dtype) in schema.iter_dtypes_mut().enumerate() { + for (i, dtype) in schema.iter_values_mut().enumerate() { if !dtype.is_known() { *dtype = inferred_dtypes_slice.get(i).ok_or_else(|| { polars_err!(SchemaMismatch: "the number of columns in the schema does not match the data") @@ -120,7 +120,7 @@ fn resolve_schema_overrides(schema: &mut Schema, schema_overrides: Option PyResult { }; let dict = PyDict::new_bound(py); - fields_to_pydict(&metadata.schema.fields, &dict, py)?; + fields_to_pydict(&metadata.schema, &dict, py)?; Ok(dict.to_object(py)) } @@ -42,13 +43,13 @@ pub fn read_parquet_schema(py: Python, py_f: PyObject) -> PyResult { let arrow_schema = infer_schema(&metadata).map_err(PyPolarsErr::from)?; let dict = PyDict::new_bound(py); - fields_to_pydict(&arrow_schema.fields, &dict, py)?; + fields_to_pydict(&arrow_schema, &dict, py)?; Ok(dict.to_object(py)) } #[cfg(any(feature = "ipc", feature = "parquet"))] -fn fields_to_pydict(fields: &Vec, dict: &Bound<'_, PyDict>, py: Python) -> PyResult<()> { - for field in fields { +fn fields_to_pydict(schema: &ArrowSchema, dict: &Bound<'_, PyDict>, py: Python) -> PyResult<()> { + for field in schema.iter_values() { let dt = if field.metadata.get(DTYPE_ENUM_KEY) == Some(&DTYPE_ENUM_VALUE.into()) { Wrap(create_enum_data_type(Utf8ViewArray::new_empty( ArrowDataType::LargeUtf8, diff --git a/crates/polars-python/src/interop/arrow/to_py.rs b/crates/polars-python/src/interop/arrow/to_py.rs index c8ac9aee7093..da01554660b8 100644 --- a/crates/polars-python/src/interop/arrow/to_py.rs +++ b/crates/polars-python/src/interop/arrow/to_py.rs @@ -92,7 +92,7 @@ pub struct DataFrameStreamIterator { impl DataFrameStreamIterator { fn new(df: &DataFrame) -> Self { let schema = df.schema().to_arrow(CompatLevel::newest()); - let data_type = ArrowDataType::Struct(schema.fields); + let data_type = ArrowDataType::Struct(schema.into_iter_values().collect()); Self { columns: df.get_columns().to_vec(), diff --git a/crates/polars-python/src/lazyframe/general.rs b/crates/polars-python/src/lazyframe/general.rs index dff78ddca623..b26e8afc6ef5 100644 --- a/crates/polars-python/src/lazyframe/general.rs +++ b/crates/polars-python/src/lazyframe/general.rs @@ -236,7 +236,7 @@ impl PyLazyFrame { ShapeMismatch: "The length of the new names list should be equal to or less than the original column length", ); Ok(schema - .iter_dtypes() + .iter_values() .zip(new_names) .map(|(dtype, name)| Field::new(name.into(), dtype.clone())) .collect()) diff --git a/crates/polars-schema/src/lib.rs b/crates/polars-schema/src/lib.rs index 1ce7e1766648..902d6a57d6f9 100644 --- a/crates/polars-schema/src/lib.rs +++ b/crates/polars-schema/src/lib.rs @@ -1 +1,2 @@ pub mod schema; +pub use schema::Schema; diff --git a/crates/polars-schema/src/schema.rs b/crates/polars-schema/src/schema.rs index 04be14455bea..b927a88f3b63 100644 --- a/crates/polars-schema/src/schema.rs +++ b/crates/polars-schema/src/schema.rs @@ -2,7 +2,7 @@ use core::fmt::{Debug, Formatter}; use core::hash::{Hash, Hasher}; use indexmap::map::MutableKeys; -use polars_error::{polars_ensure, polars_err, PolarsResult}; +use polars_error::{polars_bail, polars_ensure, polars_err, PolarsResult}; use polars_utils::aliases::{InitHashMaps, PlIndexMap}; use polars_utils::pl_str::PlSmallStr; @@ -55,6 +55,10 @@ where Some(old_name) } + pub fn insert(&mut self, key: PlSmallStr, value: D) -> Option { + self.fields.insert(key, value) + } + /// Create a new schema from this one, inserting a field with `name` and `dtype` at the given `index`. /// /// If a field named `name` already exists, it is updated with the new dtype. Regardless, the field named `name` is @@ -286,14 +290,15 @@ where ) } - /// Iterates over references to the dtypes in this schema. - pub fn iter_dtypes(&self) -> impl '_ + ExactSizeIterator { - self.fields.iter().map(|(_name, dtype)| dtype) + /// Iterates over the `(&name, &dtype)` pairs in this schema. + /// + /// For an owned version, use [`iter_fields`][Self::iter_fields], which clones the data to iterate owned `Field`s + pub fn iter(&self) -> impl ExactSizeIterator + '_ { + self.fields.iter() } - /// Iterates over mut references to the dtypes in this schema. - pub fn iter_dtypes_mut(&mut self) -> impl '_ + ExactSizeIterator { - self.fields.iter_mut().map(|(_name, dtype)| dtype) + pub fn iter_mut(&mut self) -> impl ExactSizeIterator + '_ { + self.fields.iter_mut() } /// Iterates over references to the names in this schema. @@ -301,19 +306,26 @@ where self.fields.iter().map(|(name, _dtype)| name) } - /// Iterates over the `(&name, &dtype)` pairs in this schema. - /// - /// For an owned version, use [`iter_fields`][Self::iter_fields], which clones the data to iterate owned `Field`s - pub fn iter(&self) -> impl ExactSizeIterator + '_ { - self.fields.iter() + /// Iterates over references to the dtypes in this schema. + pub fn iter_values(&self) -> impl '_ + ExactSizeIterator { + self.fields.iter().map(|(_name, dtype)| dtype) } - pub fn iter_mut(&mut self) -> impl ExactSizeIterator + '_ { - self.fields.iter_mut() + pub fn into_iter_values(self) -> impl ExactSizeIterator { + self.fields.into_values() + } + + /// Iterates over mut references to the dtypes in this schema. + pub fn iter_values_mut(&mut self) -> impl '_ + ExactSizeIterator { + self.fields.iter_mut().map(|(_name, dtype)| dtype) + } + + pub fn index_of(&self, name: &str) -> Option { + self.fields.get_index_of(name) } /// Generates another schema with just the specified columns selected from this one. - pub fn select(&self, columns: I) -> PolarsResult + pub fn try_project(&self, columns: I) -> PolarsResult where I: IntoIterator, I::Item: AsRef, @@ -332,8 +344,42 @@ where Ok(Self::from(schema)) } - pub fn index_of(&self, name: &str) -> Option { - self.fields.get_index_of(name) + pub fn try_project_indices(&self, indices: &[usize]) -> PolarsResult { + let fields = indices + .iter() + .map(|&i| { + let Some((k, v)) = self.fields.get_index(i) else { + polars_bail!( + SchemaFieldNotFound: + "projection index {} is out of bounds for schema of length {}", + i, self.fields.len() + ); + }; + + Ok((k.clone(), v.clone())) + }) + .collect::>>()?; + + Ok(Self { fields }) + } + + /// Returns a new [`Schema`] with a subset of all fields whose `predicate` + /// evaluates to true. + pub fn filter bool>(self, predicate: F) -> Self { + let fields = self + .fields + .into_iter() + .enumerate() + .filter_map(|(index, (name, d))| { + if (predicate)(index, &d) { + Some((name, d)) + } else { + None + } + }) + .collect(); + + Self { fields } } } diff --git a/crates/polars-stream/src/nodes/parquet_source.rs b/crates/polars-stream/src/nodes/parquet_source.rs index a55a569c7697..7332b59e6fef 100644 --- a/crates/polars-stream/src/nodes/parquet_source.rs +++ b/crates/polars-stream/src/nodes/parquet_source.rs @@ -973,16 +973,10 @@ impl ParquetSourceNode { if let Some(columns) = self.file_options.with_columns.as_deref() { columns .iter() - .map(|x| { - // `index_of` on ArrowSchema is slow, so we use the polars native Schema, - // but we need to remember to subtact the row index. - let pos = self.file_info.schema.index_of(x.as_str()).unwrap() - - (self.file_options.row_index.is_some() as usize); - reader_schema.fields[pos].clone() - }) + .map(|x| reader_schema.get(x).unwrap().clone()) .collect() } else { - Arc::from(reader_schema.fields.as_slice()) + reader_schema.iter_values().cloned().collect() }; if self.verbose { @@ -1803,8 +1797,7 @@ fn ensure_metadata_has_projected_fields( // Note: We convert to Polars-native dtypes for timezone normalization. let mut schema = schema - .fields - .into_iter() + .into_iter_values() .map(|x| { let dtype = DataType::from_arrow(&x.data_type, true); (x.name, dtype) diff --git a/crates/polars-stream/src/physical_plan/lower_expr.rs b/crates/polars-stream/src/physical_plan/lower_expr.rs index 3057583b0992..eb42fc1076ec 100644 --- a/crates/polars-stream/src/physical_plan/lower_expr.rs +++ b/crates/polars-stream/src/physical_plan/lower_expr.rs @@ -701,7 +701,7 @@ fn build_select_node_with_ctx( return Ok(input); } - let output_schema = Arc::new(input_schema.select(&columns)?); + let output_schema = Arc::new(input_schema.try_project(&columns)?); let node_kind = PhysNodeKind::SimpleProjection { input, columns }; return Ok(ctx.phys_sm.insert(PhysNode::new(output_schema, node_kind))); } diff --git a/crates/polars/tests/it/arrow/io/ipc/mod.rs b/crates/polars/tests/it/arrow/io/ipc/mod.rs index 3575b2c642a6..452f28bc0a23 100644 --- a/crates/polars/tests/it/arrow/io/ipc/mod.rs +++ b/crates/polars/tests/it/arrow/io/ipc/mod.rs @@ -7,6 +7,7 @@ use arrow::io::ipc::read::{read_file_metadata, FileReader}; use arrow::io::ipc::write::*; use arrow::io::ipc::IpcField; use arrow::record_batch::RecordBatchT; +use polars::prelude::PlSmallStr; use polars_error::*; pub(crate) fn write( @@ -49,8 +50,12 @@ fn round_trip( } fn prep_schema(array: &dyn Array) -> ArrowSchemaRef { - let fields = vec![Field::new("a".into(), array.data_type().clone(), true)]; - Arc::new(ArrowSchema::from(fields)) + let name = PlSmallStr::from_static("a"); + Arc::new(ArrowSchema::from_iter([Field::new( + name, + array.data_type().clone(), + true, + )])) } #[test] diff --git a/crates/polars/tests/it/io/avro/read.rs b/crates/polars/tests/it/io/avro/read.rs index e3f799f02674..dac9adbfc9d0 100644 --- a/crates/polars/tests/it/io/avro/read.rs +++ b/crates/polars/tests/it/io/avro/read.rs @@ -54,7 +54,7 @@ pub(super) fn schema() -> (AvroSchema, ArrowSchema) { } "#; - let schema = ArrowSchema::from(vec![ + let schema = ArrowSchema::from_iter([ Field::new("a".into(), ArrowDataType::Int64, false), Field::new("b".into(), ArrowDataType::Utf8, false), Field::new("c".into(), ArrowDataType::Int32, false), @@ -203,16 +203,14 @@ pub(super) fn read_avro( let metadata = read_metadata(file)?; let schema = read::infer_schema(&metadata.record)?; - let mut reader = read::Reader::new(file, metadata, schema.fields.clone(), projection.clone()); + let mut reader = read::Reader::new(file, metadata, schema.clone(), projection.clone()); let schema = if let Some(projection) = projection { - let fields = schema - .fields - .into_iter() + schema + .into_iter_values() .zip(projection.iter()) .filter_map(|x| if *x.1 { Some(x.0) } else { None }) - .collect::>(); - ArrowSchema::from(fields) + .collect() } else { schema }; @@ -254,8 +252,8 @@ fn test_projected() -> PolarsResult<()> { let avro = write_avro(Codec::Null).unwrap(); - for i in 0..expected_schema.fields.len() { - let mut projection = vec![false; expected_schema.fields.len()]; + for i in 0..expected_schema.len() { + let mut projection = vec![false; expected_schema.len()]; projection[i] = true; let expected = expected @@ -267,14 +265,12 @@ fn test_projected() -> PolarsResult<()> { .collect(); let expected = RecordBatchT::new(expected); - let expected_fields = expected_schema + let expected_schema = expected_schema .clone() - .fields - .into_iter() + .into_iter_values() .zip(projection.iter()) .filter_map(|x| if *x.1 { Some(x.0) } else { None }) - .collect::>(); - let expected_schema = ArrowSchema::from(expected_fields); + .collect(); let (result, schema) = read_avro(&avro, Some(projection))?; @@ -301,7 +297,7 @@ fn schema_list() -> (AvroSchema, ArrowSchema) { } "#; - let schema = ArrowSchema::from(vec![Field::new( + let schema = ArrowSchema::from_iter([Field::new( "h".into(), ArrowDataType::List(Box::new(Field::new( "item".into(), diff --git a/crates/polars/tests/it/io/avro/write.rs b/crates/polars/tests/it/io/avro/write.rs index 061eb52f88e9..43011eb7a2bf 100644 --- a/crates/polars/tests/it/io/avro/write.rs +++ b/crates/polars/tests/it/io/avro/write.rs @@ -15,7 +15,7 @@ use polars_error::PolarsResult; use super::read::read_avro; pub(super) fn schema() -> ArrowSchema { - ArrowSchema::from(vec![ + ArrowSchema::from_iter([ Field::new("int64".into(), ArrowDataType::Int64, false), Field::new("int64 nullable".into(), ArrowDataType::Int64, true), Field::new("utf8".into(), ArrowDataType::Utf8, false), @@ -178,7 +178,7 @@ fn deflate() -> PolarsResult<()> { } fn large_format_schema() -> ArrowSchema { - ArrowSchema::from(vec![ + ArrowSchema::from_iter([ Field::new("large_utf8".into(), ArrowDataType::LargeUtf8, false), Field::new("large_utf8_nullable".into(), ArrowDataType::LargeUtf8, true), Field::new("large_binary".into(), ArrowDataType::LargeBinary, false), @@ -201,7 +201,7 @@ fn large_format_data() -> RecordBatchT> { } fn large_format_expected_schema() -> ArrowSchema { - ArrowSchema::from(vec![ + ArrowSchema::from_iter([ Field::new("large_utf8".into(), ArrowDataType::Utf8, false), Field::new("large_utf8_nullable".into(), ArrowDataType::Utf8, true), Field::new("large_binary".into(), ArrowDataType::Binary, false), @@ -239,7 +239,7 @@ fn check_large_format() -> PolarsResult<()> { } fn struct_schema() -> ArrowSchema { - ArrowSchema::from(vec![ + ArrowSchema::from_iter([ Field::new( "struct".into(), ArrowDataType::Struct(vec![ diff --git a/crates/polars/tests/it/io/parquet/arrow/mod.rs b/crates/polars/tests/it/io/parquet/arrow/mod.rs index 7d90679f8804..b82a3200b992 100644 --- a/crates/polars/tests/it/io/parquet/arrow/mod.rs +++ b/crates/polars/tests/it/io/parquet/arrow/mod.rs @@ -1259,8 +1259,7 @@ fn integration_write( }; let encodings = schema - .fields - .iter() + .iter_values() .map(|f| { transverse(&f.data_type, |x| { if let ArrowDataType::Dictionary(..) = x { @@ -1346,7 +1345,7 @@ fn generic_data() -> PolarsResult<(ArrowSchema, RecordBatchT>)> { let array13 = PrimitiveArray::::from_slice([1, 2, 3]) .to(ArrowDataType::Interval(IntervalUnit::YearMonth)); - let schema = ArrowSchema::from(vec![ + let schema = ArrowSchema::from_iter([ Field::new("a1".into(), array1.data_type().clone(), true), Field::new("a2".into(), array2.data_type().clone(), true), Field::new("a3".into(), array3.data_type().clone(), true), @@ -1452,7 +1451,7 @@ fn assert_array_roundtrip( array: Box, limit: Option, ) -> PolarsResult<()> { - let schema = ArrowSchema::from(vec![Field::new( + let schema = ArrowSchema::from_iter([Field::new( "a1".into(), array.data_type().clone(), is_nullable, @@ -1585,11 +1584,8 @@ fn nested_dict_data( Some([true, false, true, true].into()), )?; - let schema = ArrowSchema::from(vec![Field::new( - "c1".into(), - values.data_type().clone(), - true, - )]); + let schema = + ArrowSchema::from_iter([Field::new("c1".into(), values.data_type().clone(), true)]); let chunk = RecordBatchT::try_new(vec![values.boxed()])?; Ok((schema, chunk)) @@ -1620,7 +1616,7 @@ fn nested_dict_limit() -> PolarsResult<()> { fn filter_chunk() -> PolarsResult<()> { let chunk1 = RecordBatchT::new(vec![PrimitiveArray::from_slice([1i16, 3]).boxed()]); let chunk2 = RecordBatchT::new(vec![PrimitiveArray::from_slice([2i16, 4]).boxed()]); - let schema = ArrowSchema::from(vec![Field::new("c1".into(), ArrowDataType::Int16, true)]); + let schema = ArrowSchema::from_iter([Field::new("c1".into(), ArrowDataType::Int16, true)]); let r = integration_write(&schema, &[chunk1.clone(), chunk2.clone()])?; diff --git a/crates/polars/tests/it/io/parquet/arrow/read.rs b/crates/polars/tests/it/io/parquet/arrow/read.rs index 8178097b5c8c..ac03b7fb1e10 100644 --- a/crates/polars/tests/it/io/parquet/arrow/read.rs +++ b/crates/polars/tests/it/io/parquet/arrow/read.rs @@ -123,13 +123,11 @@ fn read_int96_timestamps() -> PolarsResult<()> { let parse = |time_unit: TimeUnit| { let mut reader = Cursor::new(timestamp_data); let metadata = read_metadata(&mut reader)?; - let schema = arrow::datatypes::ArrowSchema { - fields: vec![arrow::datatypes::Field::new( - "timestamps".into(), - arrow::datatypes::ArrowDataType::Timestamp(time_unit, None), - false, - )], - }; + let schema = arrow::datatypes::ArrowSchema::from_iter([arrow::datatypes::Field::new( + "timestamps".into(), + arrow::datatypes::ArrowDataType::Timestamp(time_unit, None), + false, + )]); let reader = FileReader::new(reader, metadata.row_groups, schema, None); reader.collect::>>() }; diff --git a/crates/polars/tests/it/io/parquet/arrow/write.rs b/crates/polars/tests/it/io/parquet/arrow/write.rs index 50433be02b68..9f3782160c3c 100644 --- a/crates/polars/tests/it/io/parquet/arrow/write.rs +++ b/crates/polars/tests/it/io/parquet/arrow/write.rs @@ -41,7 +41,7 @@ fn round_trip_opt_stats( }; let field = Field::new("a1".into(), array.data_type().clone(), true); - let schema = ArrowSchema::from(vec![field]); + let schema = ArrowSchema::from_iter([field]); let options = WriteOptions { statistics: StatisticsOptions::full(), diff --git a/crates/polars/tests/it/io/parquet/read/file.rs b/crates/polars/tests/it/io/parquet/read/file.rs index d2ff7ee65630..5007dcdf0755 100644 --- a/crates/polars/tests/it/io/parquet/read/file.rs +++ b/crates/polars/tests/it/io/parquet/read/file.rs @@ -126,7 +126,7 @@ impl RowGroupReader { #[inline] fn _next(&mut self) -> PolarsResult> { - if self.schema.fields.is_empty() { + if self.schema.is_empty() { return Ok(None); } if self.remaining_rows == 0 { @@ -145,7 +145,7 @@ impl RowGroupReader { let column_chunks = read_columns_many( &mut self.reader, &row_group, - self.schema.fields.clone(), + &self.schema, Some(Filter::new_limited(self.remaining_rows)), )?; diff --git a/crates/polars/tests/it/io/parquet/read/row_group.rs b/crates/polars/tests/it/io/parquet/read/row_group.rs index 44e6622589c5..54c8d17c0076 100644 --- a/crates/polars/tests/it/io/parquet/read/row_group.rs +++ b/crates/polars/tests/it/io/parquet/read/row_group.rs @@ -3,6 +3,7 @@ use std::io::{Read, Seek}; use arrow::array::Array; use arrow::datatypes::Field; use arrow::record_batch::RecordBatchT; +use polars::prelude::ArrowSchema; use polars_error::PolarsResult; use polars_parquet::arrow::read::{column_iter_to_arrays, Filter}; use polars_parquet::parquet::metadata::ColumnChunkMetaData; @@ -133,19 +134,19 @@ pub fn to_deserializer( pub fn read_columns_many( reader: &mut R, row_group: &RowGroupMetaData, - fields: Vec, + fields: &ArrowSchema, filter: Option, ) -> PolarsResult>> { // reads all the necessary columns for all fields from the row group // This operation is IO-bounded `O(C)` where C is the number of columns in the row group let field_columns = fields - .iter() + .iter_values() .map(|field| read_columns(reader, row_group.columns(), &field.name)) .collect::>>()?; field_columns .into_iter() - .zip(fields.clone()) + .zip(fields.iter_values().cloned()) .map(|(columns, field)| to_deserializer(columns.clone(), field, filter.clone())) .collect() } diff --git a/crates/polars/tests/it/io/parquet/roundtrip.rs b/crates/polars/tests/it/io/parquet/roundtrip.rs index 31c305c4db40..aeb2c6b83e63 100644 --- a/crates/polars/tests/it/io/parquet/roundtrip.rs +++ b/crates/polars/tests/it/io/parquet/roundtrip.rs @@ -19,7 +19,7 @@ fn round_trip( encodings: Vec, ) -> PolarsResult<()> { let field = Field::new("a1".into(), array.data_type().clone(), true); - let schema = ArrowSchema::from(vec![field]); + let schema = ArrowSchema::from_iter([field]); let options = WriteOptions { statistics: StatisticsOptions::full(),