From 2d152771381653ffb7454ad30c7a733ce8fe5307 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sun, 10 Oct 2021 04:33:21 +0000 Subject: [PATCH 1/3] Migrated to arrow_format crate. --- Cargo.toml | 5 +- src/io/ipc/convert.rs | 33 +- src/io/ipc/gen/File.rs | 471 -- src/io/ipc/gen/Message.rs | 1344 ------ src/io/ipc/gen/Schema.rs | 4663 -------------------- src/io/ipc/gen/SparseTensor.rs | 1961 -------- src/io/ipc/gen/Tensor.rs | 977 ---- src/io/ipc/gen/mod.rs | 31 - src/io/ipc/mod.rs | 17 +- src/io/ipc/read/array/binary.rs | 10 +- src/io/ipc/read/array/boolean.rs | 7 +- src/io/ipc/read/array/dictionary.rs | 7 +- src/io/ipc/read/array/fixed_size_binary.rs | 10 +- src/io/ipc/read/array/fixed_size_list.rs | 12 +- src/io/ipc/read/array/list.rs | 12 +- src/io/ipc/read/array/map.rs | 12 +- src/io/ipc/read/array/primitive.rs | 10 +- src/io/ipc/read/array/struct_.rs | 12 +- src/io/ipc/read/array/union.rs | 14 +- src/io/ipc/read/array/utf8.rs | 10 +- src/io/ipc/read/common.rs | 10 +- src/io/ipc/read/deserialize.rs | 11 +- src/io/ipc/read/read_basic.rs | 15 +- src/io/ipc/read/reader.rs | 26 +- src/io/ipc/read/stream.rs | 19 +- src/io/ipc/write/common.rs | 73 +- src/io/ipc/write/mod.rs | 2 +- src/io/ipc/write/schema.rs | 9 +- src/io/ipc/write/serialize.rs | 9 + src/io/ipc/write/writer.rs | 16 +- 30 files changed, 182 insertions(+), 9626 deletions(-) delete mode 100644 src/io/ipc/gen/File.rs delete mode 100644 src/io/ipc/gen/Message.rs delete mode 100644 src/io/ipc/gen/Schema.rs delete mode 100644 src/io/ipc/gen/SparseTensor.rs delete mode 100644 src/io/ipc/gen/Tensor.rs delete mode 100644 src/io/ipc/gen/mod.rs diff --git a/Cargo.toml b/Cargo.toml index d5982d72553..e6ba606731a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,8 @@ indexmap = { version = "^1.6", optional = true } # used to print columns in a nice columnar format comfy-table = { version = "4.0", optional = true, default-features = false } -flatbuffers = { version = "=2.0.0", optional = true } +arrow-format = { version = "0.1.3", optional = true, features = ["ipc"] } + hex = { version = "^0.4", optional = true } # for IPC compression @@ -107,7 +108,7 @@ io_csv = ["io_csv_read", "io_csv_write"] io_csv_read = ["csv", "lexical-core"] io_csv_write = ["csv", "streaming-iterator", "lexical-core"] io_json = ["serde", "serde_json", "indexmap"] -io_ipc = ["flatbuffers"] +io_ipc = ["arrow-format"] io_ipc_compression = ["lz4", "zstd"] io_parquet_compression = [ "parquet2/zstd", diff --git a/src/io/ipc/convert.rs b/src/io/ipc/convert.rs index d6f3edfc1b9..b69cc83bad6 100644 --- a/src/io/ipc/convert.rs +++ b/src/io/ipc/convert.rs @@ -17,22 +17,20 @@ //! Utilities for converting between IPC types and native Arrow types -use crate::datatypes::{ - get_extension, DataType, Extension, Field, IntervalUnit, Metadata, Schema, TimeUnit, +use arrow_format::ipc::flatbuffers::{ + FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset, }; -use crate::io::ipc::convert::ipc::UnionMode; -use crate::io::ipc::endianess::is_native_little_endian; - +use std::collections::{BTreeMap, HashMap}; mod ipc { - pub use super::super::gen::File::*; - pub use super::super::gen::Message::*; - pub use super::super::gen::Schema::*; + pub use arrow_format::ipc::File::*; + pub use arrow_format::ipc::Message::*; + pub use arrow_format::ipc::Schema::*; } -use flatbuffers::{FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset}; -use std::collections::{BTreeMap, HashMap}; - -use DataType::*; +use crate::datatypes::{ + get_extension, DataType, Extension, Field, IntervalUnit, Metadata, Schema, TimeUnit, +}; +use crate::io::ipc::endianess::is_native_little_endian; pub fn schema_to_fb_offset<'a>( fbb: &mut FlatBufferBuilder<'a>, @@ -294,7 +292,7 @@ fn get_data_type(field: ipc::Field, extension: Extension, may_be_dictionary: boo ipc::Type::Union => { let type_ = field.type_as_union().unwrap(); - let is_sparse = type_.mode() == UnionMode::Sparse; + let is_sparse = type_.mode() == ipc::UnionMode::Sparse; let ids = type_.typeIds().map(|x| x.iter().collect()); @@ -378,7 +376,7 @@ pub(crate) fn build_field<'a>( let fb_field_name = fbb.create_string(field.name().as_str()); let field_type = get_fb_field_type(field.data_type(), field.is_nullable(), fbb); - let fb_dictionary = if let Dictionary(index_type, inner) = field.data_type() { + let fb_dictionary = if let DataType::Dictionary(index_type, inner) = field.data_type() { if let DataType::Extension(name, _, metadata) = inner.as_ref() { write_extension(fbb, name, metadata, &mut kv_vec); } @@ -428,6 +426,7 @@ pub(crate) fn build_field<'a>( } fn type_to_field_type(data_type: &DataType) -> ipc::Type { + use DataType::*; match data_type { Null => ipc::Type::Null, Boolean => ipc::Type::Bool, @@ -461,6 +460,7 @@ pub(crate) fn get_fb_field_type<'a>( is_nullable: bool, fbb: &mut FlatBufferBuilder<'a>, ) -> FbFieldType<'a> { + use DataType::*; let type_type = type_to_field_type(data_type); // some IPC implementations expect an empty list for child data, instead of a null value. @@ -711,9 +711,9 @@ pub(crate) fn get_fb_field_type<'a>( let mut builder = ipc::UnionBuilder::new(fbb); builder.add_mode(if *is_sparse { - UnionMode::Sparse + ipc::UnionMode::Sparse } else { - UnionMode::Dense + ipc::UnionMode::Dense }); if let Some(ids) = ids { @@ -745,6 +745,7 @@ pub(crate) fn get_fb_dictionary<'a>( dict_is_ordered: bool, fbb: &mut FlatBufferBuilder<'a>, ) -> WIPOffset> { + use DataType::*; // We assume that the dictionary index type (as an integer) has already been // validated elsewhere, and can safely assume we are dealing with integers let mut index_builder = ipc::IntBuilder::new(fbb); diff --git a/src/io/ipc/gen/File.rs b/src/io/ipc/gen/File.rs deleted file mode 100644 index a5a1512ce95..00000000000 --- a/src/io/ipc/gen/File.rs +++ /dev/null @@ -1,471 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#![allow(dead_code)] -#![allow(unused_imports)] - -use super::Schema::*; -use flatbuffers::EndianScalar; -use std::{cmp::Ordering, mem}; -// automatically generated by the FlatBuffers compiler, do not modify - -// struct Block, aligned to 8 -#[repr(transparent)] -#[derive(Clone, Copy, PartialEq)] -pub struct Block(pub [u8; 24]); -impl std::fmt::Debug for Block { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - f.debug_struct("Block") - .field("offset", &self.offset()) - .field("metaDataLength", &self.metaDataLength()) - .field("bodyLength", &self.bodyLength()) - .finish() - } -} - -impl flatbuffers::SimpleToVerifyInSlice for Block {} -impl flatbuffers::SafeSliceAccess for Block {} -impl<'a> flatbuffers::Follow<'a> for Block { - type Inner = &'a Block; - #[inline] - fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { - <&'a Block>::follow(buf, loc) - } -} -impl<'a> flatbuffers::Follow<'a> for &'a Block { - type Inner = &'a Block; - #[inline] - fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { - flatbuffers::follow_cast_ref::(buf, loc) - } -} -impl<'b> flatbuffers::Push for Block { - type Output = Block; - #[inline] - fn push(&self, dst: &mut [u8], _rest: &[u8]) { - let src = unsafe { - ::std::slice::from_raw_parts(self as *const Block as *const u8, Self::size()) - }; - dst.copy_from_slice(src); - } -} -impl<'b> flatbuffers::Push for &'b Block { - type Output = Block; - - #[inline] - fn push(&self, dst: &mut [u8], _rest: &[u8]) { - let src = unsafe { - ::std::slice::from_raw_parts(*self as *const Block as *const u8, Self::size()) - }; - dst.copy_from_slice(src); - } -} - -impl<'a> flatbuffers::Verifiable for Block { - #[inline] - fn run_verifier( - v: &mut flatbuffers::Verifier, - pos: usize, - ) -> Result<(), flatbuffers::InvalidFlatbuffer> { - use flatbuffers::Verifiable; - v.in_buffer::(pos) - } -} -impl Block { - #[allow(clippy::too_many_arguments)] - pub fn new(offset: i64, metaDataLength: i32, bodyLength: i64) -> Self { - let mut s = Self([0; 24]); - s.set_offset(offset); - s.set_metaDataLength(metaDataLength); - s.set_bodyLength(bodyLength); - s - } - - /// Index to the start of the RecordBlock (note this is past the Message header) - pub fn offset(&self) -> i64 { - let mut mem = core::mem::MaybeUninit::::uninit(); - unsafe { - core::ptr::copy_nonoverlapping( - self.0[0..].as_ptr(), - mem.as_mut_ptr() as *mut u8, - core::mem::size_of::(), - ); - mem.assume_init() - } - .from_little_endian() - } - - pub fn set_offset(&mut self, x: i64) { - let x_le = x.to_little_endian(); - unsafe { - core::ptr::copy_nonoverlapping( - &x_le as *const i64 as *const u8, - self.0[0..].as_mut_ptr(), - core::mem::size_of::(), - ); - } - } - - /// Length of the metadata - pub fn metaDataLength(&self) -> i32 { - let mut mem = core::mem::MaybeUninit::::uninit(); - unsafe { - core::ptr::copy_nonoverlapping( - self.0[8..].as_ptr(), - mem.as_mut_ptr() as *mut u8, - core::mem::size_of::(), - ); - mem.assume_init() - } - .from_little_endian() - } - - pub fn set_metaDataLength(&mut self, x: i32) { - let x_le = x.to_little_endian(); - unsafe { - core::ptr::copy_nonoverlapping( - &x_le as *const i32 as *const u8, - self.0[8..].as_mut_ptr(), - core::mem::size_of::(), - ); - } - } - - /// Length of the data (this is aligned so there can be a gap between this and - /// the metadata). - pub fn bodyLength(&self) -> i64 { - let mut mem = core::mem::MaybeUninit::::uninit(); - unsafe { - core::ptr::copy_nonoverlapping( - self.0[16..].as_ptr(), - mem.as_mut_ptr() as *mut u8, - core::mem::size_of::(), - ); - mem.assume_init() - } - .from_little_endian() - } - - pub fn set_bodyLength(&mut self, x: i64) { - let x_le = x.to_little_endian(); - unsafe { - core::ptr::copy_nonoverlapping( - &x_le as *const i64 as *const u8, - self.0[16..].as_mut_ptr(), - core::mem::size_of::(), - ); - } - } -} - -pub enum FooterOffset {} -#[derive(Copy, Clone, PartialEq)] - -/// ---------------------------------------------------------------------- -/// Arrow File metadata -/// -pub struct Footer<'a> { - pub _tab: flatbuffers::Table<'a>, -} - -impl<'a> flatbuffers::Follow<'a> for Footer<'a> { - type Inner = Footer<'a>; - #[inline] - fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { - Self { - _tab: flatbuffers::Table { buf, loc }, - } - } -} - -impl<'a> Footer<'a> { - #[inline] - pub fn init_from_table(table: flatbuffers::Table<'a>) -> Self { - Footer { _tab: table } - } - #[allow(unused_mut)] - pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( - _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, - args: &'args FooterArgs<'args>, - ) -> flatbuffers::WIPOffset> { - let mut builder = FooterBuilder::new(_fbb); - if let Some(x) = args.custom_metadata { - builder.add_custom_metadata(x); - } - if let Some(x) = args.recordBatches { - builder.add_recordBatches(x); - } - if let Some(x) = args.dictionaries { - builder.add_dictionaries(x); - } - if let Some(x) = args.schema { - builder.add_schema(x); - } - builder.add_version(args.version); - builder.finish() - } - - pub const VT_VERSION: flatbuffers::VOffsetT = 4; - pub const VT_SCHEMA: flatbuffers::VOffsetT = 6; - pub const VT_DICTIONARIES: flatbuffers::VOffsetT = 8; - pub const VT_RECORDBATCHES: flatbuffers::VOffsetT = 10; - pub const VT_CUSTOM_METADATA: flatbuffers::VOffsetT = 12; - - #[inline] - pub fn version(&self) -> MetadataVersion { - self._tab - .get::(Footer::VT_VERSION, Some(MetadataVersion::V1)) - .unwrap() - } - #[inline] - pub fn schema(&self) -> Option> { - self._tab - .get::>(Footer::VT_SCHEMA, None) - } - #[inline] - pub fn dictionaries(&self) -> Option<&'a [Block]> { - self._tab - .get::>>( - Footer::VT_DICTIONARIES, - None, - ) - .map(|v| v.safe_slice()) - } - #[inline] - pub fn recordBatches(&self) -> Option<&'a [Block]> { - self._tab - .get::>>( - Footer::VT_RECORDBATCHES, - None, - ) - .map(|v| v.safe_slice()) - } - /// User-defined metadata - #[inline] - pub fn custom_metadata( - &self, - ) -> Option>>> { - self._tab.get::>, - >>(Footer::VT_CUSTOM_METADATA, None) - } -} - -impl flatbuffers::Verifiable for Footer<'_> { - #[inline] - fn run_verifier( - v: &mut flatbuffers::Verifier, - pos: usize, - ) -> Result<(), flatbuffers::InvalidFlatbuffer> { - use flatbuffers::Verifiable; - v.visit_table(pos)? - .visit_field::(&"version", Self::VT_VERSION, false)? - .visit_field::>(&"schema", Self::VT_SCHEMA, false)? - .visit_field::>>( - &"dictionaries", - Self::VT_DICTIONARIES, - false, - )? - .visit_field::>>( - &"recordBatches", - Self::VT_RECORDBATCHES, - false, - )? - .visit_field::>, - >>(&"custom_metadata", Self::VT_CUSTOM_METADATA, false)? - .finish(); - Ok(()) - } -} -pub struct FooterArgs<'a> { - pub version: MetadataVersion, - pub schema: Option>>, - pub dictionaries: Option>>, - pub recordBatches: Option>>, - pub custom_metadata: Option< - flatbuffers::WIPOffset>>>, - >, -} -impl<'a> Default for FooterArgs<'a> { - #[inline] - fn default() -> Self { - FooterArgs { - version: MetadataVersion::V1, - schema: None, - dictionaries: None, - recordBatches: None, - custom_metadata: None, - } - } -} -pub struct FooterBuilder<'a: 'b, 'b> { - fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, - start_: flatbuffers::WIPOffset, -} -impl<'a: 'b, 'b> FooterBuilder<'a, 'b> { - #[inline] - pub fn add_version(&mut self, version: MetadataVersion) { - self.fbb_ - .push_slot::(Footer::VT_VERSION, version, MetadataVersion::V1); - } - #[inline] - pub fn add_schema(&mut self, schema: flatbuffers::WIPOffset>) { - self.fbb_ - .push_slot_always::>(Footer::VT_SCHEMA, schema); - } - #[inline] - pub fn add_dictionaries( - &mut self, - dictionaries: flatbuffers::WIPOffset>, - ) { - self.fbb_ - .push_slot_always::>(Footer::VT_DICTIONARIES, dictionaries); - } - #[inline] - pub fn add_recordBatches( - &mut self, - recordBatches: flatbuffers::WIPOffset>, - ) { - self.fbb_ - .push_slot_always::>(Footer::VT_RECORDBATCHES, recordBatches); - } - #[inline] - pub fn add_custom_metadata( - &mut self, - custom_metadata: flatbuffers::WIPOffset< - flatbuffers::Vector<'b, flatbuffers::ForwardsUOffset>>, - >, - ) { - self.fbb_.push_slot_always::>( - Footer::VT_CUSTOM_METADATA, - custom_metadata, - ); - } - #[inline] - pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> FooterBuilder<'a, 'b> { - let start = _fbb.start_table(); - FooterBuilder { - fbb_: _fbb, - start_: start, - } - } - #[inline] - pub fn finish(self) -> flatbuffers::WIPOffset> { - let o = self.fbb_.end_table(self.start_); - flatbuffers::WIPOffset::new(o.value()) - } -} - -impl std::fmt::Debug for Footer<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut ds = f.debug_struct("Footer"); - ds.field("version", &self.version()); - ds.field("schema", &self.schema()); - ds.field("dictionaries", &self.dictionaries()); - ds.field("recordBatches", &self.recordBatches()); - ds.field("custom_metadata", &self.custom_metadata()); - ds.finish() - } -} -#[inline] -#[deprecated(since = "2.0.0", note = "Deprecated in favor of `root_as...` methods.")] -pub fn get_root_as_footer<'a>(buf: &'a [u8]) -> Footer<'a> { - unsafe { flatbuffers::root_unchecked::>(buf) } -} - -#[inline] -#[deprecated(since = "2.0.0", note = "Deprecated in favor of `root_as...` methods.")] -pub fn get_size_prefixed_root_as_footer<'a>(buf: &'a [u8]) -> Footer<'a> { - unsafe { flatbuffers::size_prefixed_root_unchecked::>(buf) } -} - -#[inline] -/// Verifies that a buffer of bytes contains a `Footer` -/// and returns it. -/// Note that verification is still experimental and may not -/// catch every error, or be maximally performant. For the -/// previous, unchecked, behavior use -/// `root_as_footer_unchecked`. -pub fn root_as_footer(buf: &[u8]) -> Result { - flatbuffers::root::