From 3e8650ec3ffcb4fb31466841d18ea669f60bc8df Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Mon, 26 Sep 2022 16:56:14 +0800 Subject: [PATCH 1/7] refactor(format): move SplitInfo to separate file. --- .../impls/input_format_parquet.rs | 6 +- .../sources/input_formats/input_context.rs | 3 +- .../sources/input_formats/input_format.rs | 93 +------------------ .../input_formats/input_format_text.rs | 3 + .../sources/input_formats/input_pipeline.rs | 2 +- .../sources/input_formats/input_split.rs | 64 +++++++++++++ .../processors/sources/input_formats/mod.rs | 1 + 7 files changed, 74 insertions(+), 98 deletions(-) create mode 100644 src/query/pipeline/sources/src/processors/sources/input_formats/input_split.rs diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs index 471beec5942a..46030f2f683e 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs @@ -46,12 +46,12 @@ use similar_asserts::traits::MakeDiff; use crate::processors::sources::input_formats::delimiter::RecordDelimiter; use crate::processors::sources::input_formats::input_context::InputContext; -use crate::processors::sources::input_formats::input_format::FileInfo; -use crate::processors::sources::input_formats::input_format::InputData; -use crate::processors::sources::input_formats::input_format::SplitInfo; use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait; use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait; use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe; +use crate::processors::sources::input_formats::input_split::DynData; +use crate::processors::sources::input_formats::input_split::FileInfo; +use crate::processors::sources::input_formats::input_split::SplitInfo; use crate::processors::sources::input_formats::InputFormat; pub struct InputFormatParquet; diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs index 207464c5f089..969937984bb4 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs @@ -37,10 +37,9 @@ use crate::processors::sources::input_formats::impls::input_format_csv::InputFor use crate::processors::sources::input_formats::impls::input_format_ndjson::InputFormatNDJson; use crate::processors::sources::input_formats::impls::input_format_parquet::InputFormatParquet; use crate::processors::sources::input_formats::impls::input_format_tsv::InputFormatTSV; -use crate::processors::sources::input_formats::input_format::FileInfo; -use crate::processors::sources::input_formats::input_format::SplitInfo; use crate::processors::sources::input_formats::input_format_text::InputFormatText; use crate::processors::sources::input_formats::input_pipeline::StreamingReadBatch; +use crate::processors::sources::input_formats::input_split::SplitInfo; use crate::processors::sources::input_formats::InputFormat; const MIN_ROW_PER_BLOCK: usize = 800 * 1000; diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs index 3fe63cb38a77..bf27ee5a06c1 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs @@ -12,10 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::any::Any; -use std::cmp::min; -use std::fmt::Debug; -use std::fmt::Formatter; use std::sync::Arc; use common_exception::Result; @@ -27,10 +23,7 @@ use opendal::Object; use crate::processors::sources::input_formats::delimiter::RecordDelimiter; use crate::processors::sources::input_formats::input_context::InputContext; - -pub trait InputData: Send + Sync + 'static { - fn as_any(&self) -> &dyn Any; -} +use crate::processors::sources::input_formats::input_split::SplitInfo; pub trait InputState: Send { fn as_any(&mut self) -> &mut dyn Any; @@ -59,87 +52,3 @@ pub trait InputFormat: Send + Sync { fn exec_stream(&self, ctx: Arc, pipeline: &mut Pipeline) -> Result<()>; } - -#[derive(Clone)] -pub struct FileInfo { - pub path: String, - pub size: usize, - pub compress_alg: Option, - pub file_meta: Option>, -} - -impl FileInfo { - pub fn split_by_size(&self, split_size: usize) -> Vec { - let mut splits = vec![]; - let n = (self.size + split_size - 1) / split_size; - for i in 0..n - 1 { - splits.push(SplitInfo { - file_info: self.clone(), - seq_infile: i, - is_end: i == n - 1, - offset: i * split_size, - len: min((i + 1) * split_size, self.size), - split_meta: None, - }) - } - splits - } -} - -impl Debug for FileInfo { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("FileInfo") - .field("path", &self.path) - .field("size", &self.size) - .finish() - } -} - -#[derive(Clone)] -pub struct SplitInfo { - pub file_info: FileInfo, - pub seq_infile: usize, - pub is_end: bool, - pub offset: usize, - pub len: usize, - pub split_meta: Option>, -} - -impl SplitInfo { - pub fn from_file_info(file_info: FileInfo) -> Self { - let len = file_info.size; - Self { - file_info, - seq_infile: 0, - is_end: true, - offset: 0, - len, - split_meta: None, - } - } - - pub fn from_stream_split(path: String, compress_alg: Option) -> Self { - SplitInfo { - file_info: FileInfo { - path, - size: 0, - compress_alg, - file_meta: None, - }, - seq_infile: 0, - offset: 0, - len: 0, - is_end: false, - split_meta: None, - } - } -} - -impl Debug for SplitInfo { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("SplitInfo") - .field("file_info", &self.file_info) - .field("seq_infile", &self.seq_infile) - .finish() - } -} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs index c7a521683718..aed932443817 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs @@ -40,6 +40,9 @@ use crate::processors::sources::input_formats::input_format::SplitInfo; use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait; use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait; use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe; +use crate::processors::sources::input_formats::input_split::split_by_size; +use crate::processors::sources::input_formats::input_split::FileInfo; +use crate::processors::sources::input_formats::input_split::SplitInfo; pub trait InputFormatTextBase: Sized + Send + Sync + 'static { fn format_type() -> StageFileFormatType; diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs index 28742a5e2bee..352cdf21ee5d 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs @@ -34,7 +34,7 @@ use opendal::io_util::CompressAlgorithm; use crate::processors::sources::input_formats::input_context::InputContext; use crate::processors::sources::input_formats::input_context::InputPlan; use crate::processors::sources::input_formats::input_context::StreamPlan; -use crate::processors::sources::input_formats::input_format::SplitInfo; +use crate::processors::sources::input_formats::input_split::SplitInfo; use crate::processors::sources::input_formats::source_aligner::Aligner; use crate::processors::sources::input_formats::source_deserializer::DeserializeSource; use crate::processors::sources::input_formats::transform_deserializer::DeserializeTransformer; diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_split.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_split.rs new file mode 100644 index 000000000000..512839b09b5f --- /dev/null +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_split.rs @@ -0,0 +1,64 @@ +use std::any::Any; +use std::fmt::{Debug, Display}; +use std::fmt::Formatter; +use std::sync::Arc; + +use opendal::io_util::CompressAlgorithm; + +pub trait DynData: Send + Sync + 'static { + fn as_any(&self) -> &dyn Any; +} + +#[derive(Debug)] +pub struct FileInfo { + pub path: String, + pub size: usize, + pub num_splits: usize, + pub compress_alg: Option, +} + +pub struct SplitInfo { + pub file: Arc, + pub seq_in_file: usize, + pub offset: usize, + pub size: usize, + pub format_info: Option>, +} + +impl Debug for SplitInfo { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}[{}]", self.file.path, self.seq_in_file) + } +} + +impl Display for SplitInfo { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}[{}]", self.file.path, self.seq_in_file) + } +} + +pub fn split_by_size(size: usize, split_size: usize) -> Vec<(usize, usize)> { + let mut splits = vec![]; + let n = (size + split_size - 1) / split_size; + for i in 0..n - 1 { + splits.push((i * split_size, std::cmp::min((i + 1) * split_size, size))) + } + splits +} + +impl SplitInfo { + pub fn from_stream_split(path: String, compress_alg: Option) -> Self { + SplitInfo { + file: Arc::new(FileInfo { + path, + size: 0, + num_splits: 1, + compress_alg + }), + seq_in_file: 0, + offset: 0, + size: 0, + format_info: None + } + } +} diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/mod.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/mod.rs index e281ac5197a4..d96ebe091043 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/mod.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/mod.rs @@ -18,6 +18,7 @@ mod input_context; mod input_format; mod input_format_text; mod input_pipeline; +mod input_split; mod source_aligner; mod source_deserializer; mod transform_deserializer; From 012b370f56296af4dcedd174d480dc604e5029c7 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Mon, 26 Sep 2022 17:17:33 +0800 Subject: [PATCH 2/7] refactor(format): merge multi fn of InputFormat about get_splits to one. --- .../impls/input_format_parquet.rs | 108 +++++++++++++----- .../sources/input_formats/input_context.rs | 33 +----- .../sources/input_formats/input_format.rs | 24 ++-- .../input_formats/input_format_text.rs | 93 +++++++++------ .../sources/input_formats/input_pipeline.rs | 21 ++-- .../sources/input_formats/input_split.rs | 6 +- .../sources/input_formats/source_aligner.rs | 4 +- 7 files changed, 172 insertions(+), 117 deletions(-) diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs index 46030f2f683e..82f74cc7fc55 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::Any; use std::collections::HashMap; use std::fmt::Debug; use std::fmt::Formatter; @@ -26,6 +27,7 @@ use common_arrow::arrow::chunk::Chunk; use common_arrow::arrow::datatypes::Field; use common_arrow::arrow::io::parquet::read; use common_arrow::arrow::io::parquet::read::read_columns; +use common_arrow::arrow::io::parquet::read::read_metadata_async; use common_arrow::arrow::io::parquet::read::to_deserializer; use common_arrow::arrow::io::parquet::read::RowGroupDeserializer; use common_arrow::parquet::metadata::ColumnChunkMetaData; @@ -41,10 +43,11 @@ use common_exception::Result; use common_io::prelude::FormatSettings; use common_pipeline_core::Pipeline; use common_settings::Settings; -use opendal::Object; +use opendal::Operator; use similar_asserts::traits::MakeDiff; use crate::processors::sources::input_formats::delimiter::RecordDelimiter; +use crate::processors::sources::input_formats::input_context::CopyIntoPlan; use crate::processors::sources::input_formats::input_context::InputContext; use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait; use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait; @@ -56,6 +59,16 @@ use crate::processors::sources::input_formats::InputFormat; pub struct InputFormatParquet; +impl DynData for FileMetaData { + fn as_any(&self) -> &dyn Any { + self + } +} + +fn col_offset(meta: &ColumnChunkMetaData) -> i64 { + meta.data_page_offset() +} + #[async_trait::async_trait] impl InputFormat for InputFormatParquet { fn get_format_settings(&self, _settings: &Arc) -> Result { @@ -71,28 +84,56 @@ impl InputFormat for InputFormatParquet { b'_' } - async fn read_file_meta( - &self, - _obj: &Object, - _size: usize, - ) -> Result>> { - // todo(youngsofun): execute_copy_aligned - Ok(None) - } - - async fn read_split_meta( + async fn get_splits( &self, - _obj: &Object, - _split_info: &SplitInfo, - ) -> Result>> { - Ok(None) - } - - fn split_files(&self, file_infos: Vec, _split_size: usize) -> Vec { - file_infos - .into_iter() - .map(SplitInfo::from_file_info) - .collect() + plan: &CopyIntoPlan, + op: &Operator, + _settings: &Arc, + schema: &DataSchemaRef, + ) -> Result>> { + let mut infos = vec![]; + for path in &plan.files { + let obj = op.object(path); + let size = obj.metadata().await?.content_length() as usize; + let mut reader = obj.seekable_reader(..(size as u64)); + let mut file_meta = read_metadata_async(&mut reader) + .await + .map_err(|e| ErrorCode::ParquetError(e.to_string()))?; + let row_groups = mem::take(&mut file_meta.row_groups); + let fields = Arc::new(get_fields(&file_meta, schema)?); + let read_file_meta = Arc::new(FileMeta { fields }); + let file_info = Arc::new(FileInfo { + path: path.clone(), + size, + num_splits: row_groups.len(), + compress_alg: None, + }); + + for (i, rg) in row_groups.into_iter().enumerate() { + if !rg.columns().is_empty() { + let offset = rg + .columns() + .iter() + .map(col_offset) + .min() + .expect("must success") as usize; + let size = rg.total_byte_size(); + let meta = Arc::new(SplitMeta { + file: read_file_meta.clone(), + meta: rg, + }); + let info = Arc::new(SplitInfo { + file: file_info.clone(), + seq_in_file: i, + offset, + size, + format_info: Some(meta), + }); + infos.push(info); + } + } + } + Ok(infos) } fn exec_copy(&self, ctx: Arc, pipeline: &mut Pipeline) -> Result<()> { @@ -115,6 +156,21 @@ impl InputFormatPipe for ParquetFormatPipe { type BlockBuilder = ParquetBlockBuilder; } +struct FileMeta { + pub fields: Arc>, +} + +struct SplitMeta { + pub file: Arc, + pub meta: RowGroupMetaData, +} + +impl DynData for SplitMeta { + fn as_any(&self) -> &dyn Any { + self + } +} + pub struct RowGroupInMemory { pub meta: RowGroupMetaData, pub fields: Arc>, @@ -212,14 +268,14 @@ impl BlockBuilderTrait for ParquetBlockBuilder { pub struct AligningState { ctx: Arc, - split_info: SplitInfo, + split_info: Arc, buffers: Vec>, } impl AligningStateTrait for AligningState { type Pipe = ParquetFormatPipe; - fn try_create(ctx: &Arc, split_info: &SplitInfo) -> Result { + fn try_create(ctx: &Arc, split_info: &Arc) -> Result { Ok(AligningState { ctx: ctx.clone(), split_info: split_info.clone(), @@ -238,7 +294,7 @@ impl AligningStateTrait for AligningState { let size = file_in_memory.len(); tracing::debug!( "aligning parquet file {} of {} bytes", - self.split_info.file_info.path, + self.split_info.file.path, size, ); let mut cursor = Cursor::new(file_in_memory); @@ -256,7 +312,7 @@ impl AligningStateTrait for AligningState { } tracing::info!( "align parquet file {} of {} bytes to {} row groups", - self.split_info.file_info.path, + self.split_info.file.path, size, row_batches.len() ); diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs index 969937984bb4..6eaa07ade30c 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs @@ -107,7 +107,7 @@ pub struct InputContext { pub schema: DataSchemaRef, pub source: InputSource, pub format: Arc, - pub splits: Vec, + pub splits: Vec>, // row format only pub rows_to_skip: usize, @@ -169,11 +169,11 @@ impl InputContext { } let plan = Box::new(CopyIntoPlan { stage_info, files }); let read_batch_size = settings.get_input_read_buffer_size()? as usize; - let split_size = 128usize * 1024 * 1024; let file_format_options = &plan.stage_info.file_format_options; let format = Self::get_input_format(&file_format_options.format)?; - let file_infos = Self::get_file_infos(&format, &operator, &plan).await?; - let splits = format.split_files(file_infos, split_size); + let splits = format + .get_splits(&plan, &operator, &settings, &schema) + .await?; let rows_per_block = MIN_ROW_PER_BLOCK; let record_delimiter = { if file_format_options.record_delimiter.is_empty() { @@ -262,31 +262,6 @@ impl InputContext { }) } - async fn get_file_infos( - format: &Arc, - op: &Operator, - plan: &CopyIntoPlan, - ) -> Result> { - let mut infos = vec![]; - for p in &plan.files { - let obj = op.object(p); - let size = obj.metadata().await?.content_length() as usize; - let file_meta = format.read_file_meta(&obj, size).await?; - let compress_alg = InputContext::get_compression_alg_copy( - plan.stage_info.file_format_options.compression, - p, - )?; - let info = FileInfo { - path: p.clone(), - size, - compress_alg, - file_meta, - }; - infos.push(info) - } - Ok(infos) - } - pub fn num_prefetch_splits(&self) -> Result { Ok(self.settings.get_max_threads()? as usize) } diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs index bf27ee5a06c1..919c08e5b391 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs @@ -14,21 +14,18 @@ use std::sync::Arc; +use common_datavalues::DataSchemaRef; use common_exception::Result; use common_io::prelude::FormatSettings; use common_pipeline_core::Pipeline; use common_settings::Settings; -use opendal::io_util::CompressAlgorithm; -use opendal::Object; +use opendal::Operator; use crate::processors::sources::input_formats::delimiter::RecordDelimiter; +use crate::processors::sources::input_formats::input_context::CopyIntoPlan; use crate::processors::sources::input_formats::input_context::InputContext; use crate::processors::sources::input_formats::input_split::SplitInfo; -pub trait InputState: Send { - fn as_any(&mut self) -> &mut dyn Any; -} - #[async_trait::async_trait] pub trait InputFormat: Send + Sync { fn get_format_settings(&self, settings: &Arc) -> Result; @@ -37,16 +34,13 @@ pub trait InputFormat: Send + Sync { fn default_field_delimiter(&self) -> u8; - async fn read_file_meta(&self, obj: &Object, size: usize) - -> Result>>; - - async fn read_split_meta( + async fn get_splits( &self, - obj: &Object, - split_info: &SplitInfo, - ) -> Result>>; - - fn split_files(&self, file_infos: Vec, split_size: usize) -> Vec; + plan: &CopyIntoPlan, + op: &Operator, + settings: &Arc, + schema: &DataSchemaRef, + ) -> Result>>; fn exec_copy(&self, ctx: Arc, pipeline: &mut Pipeline) -> Result<()>; diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs index aed932443817..a2d5502c1cec 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use chrono_tz::Tz; use common_datablocks::DataBlock; +use common_datavalues::DataSchemaRef; use common_datavalues::TypeDeserializer; use common_datavalues::TypeDeserializerImpl; use common_exception::ErrorCode; @@ -28,15 +29,13 @@ use common_pipeline_core::Pipeline; use common_settings::Settings; use opendal::io_util::DecompressDecoder; use opendal::io_util::DecompressState; -use opendal::Object; +use opendal::Operator; use super::InputFormat; use crate::processors::sources::input_formats::delimiter::RecordDelimiter; use crate::processors::sources::input_formats::impls::input_format_csv::CsvReaderState; +use crate::processors::sources::input_formats::input_context::CopyIntoPlan; use crate::processors::sources::input_formats::input_context::InputContext; -use crate::processors::sources::input_formats::input_format::FileInfo; -use crate::processors::sources::input_formats::input_format::InputData; -use crate::processors::sources::input_formats::input_format::SplitInfo; use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait; use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait; use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe; @@ -102,34 +101,6 @@ impl InputFormat for InputFormatText { T::default_field_delimiter() } - async fn read_file_meta( - &self, - _obj: &Object, - _size: usize, - ) -> Result>> { - Ok(None) - } - - async fn read_split_meta( - &self, - _obj: &Object, - _split_info: &SplitInfo, - ) -> Result>> { - Ok(None) - } - - fn split_files(&self, file_infos: Vec, split_size: usize) -> Vec { - let mut splits = vec![]; - for f in file_infos { - if f.compress_alg.is_none() || !T::is_splittable() { - splits.push(SplitInfo::from_file_info(f)) - } else { - splits.append(&mut f.split_by_size(split_size)) - } - } - splits - } - fn exec_copy(&self, ctx: Arc, pipeline: &mut Pipeline) -> Result<()> { tracing::info!("exe text"); InputFormatTextPipe::::execute_copy_with_aligner(ctx, pipeline) @@ -138,6 +109,58 @@ impl InputFormat for InputFormatText { fn exec_stream(&self, ctx: Arc, pipeline: &mut Pipeline) -> Result<()> { InputFormatTextPipe::::execute_stream(ctx, pipeline) } + + async fn get_splits( + &self, + plan: &CopyIntoPlan, + op: &Operator, + _settings: &Arc, + _schema: &DataSchemaRef, + ) -> Result>> { + let mut infos = vec![]; + for path in &plan.files { + let obj = op.object(path); + let size = obj.metadata().await?.content_length() as usize; + let compress_alg = InputContext::get_compression_alg_copy( + plan.stage_info.file_format_options.compression, + path, + )?; + if compress_alg.is_none() || !T::is_splittable() { + let file = Arc::new(FileInfo { + path: path.clone(), + size, // dummy + num_splits: 1, + compress_alg, + }); + infos.push(Arc::new(SplitInfo { + file, + seq_in_file: 1, + offset: 0, + size, // dummy + format_info: None, + })); + } else { + let split_size = 128usize * 1024 * 1024; + let split_offsets = split_by_size(size, split_size); + let file = Arc::new(FileInfo { + path: path.clone(), + size, + num_splits: split_offsets.len(), + compress_alg, + }); + for (i, (offset, size)) in split_offsets.into_iter().enumerate() { + infos.push(Arc::new(SplitInfo { + file: file.clone(), + seq_in_file: i, + offset, + size, + format_info: None, + })); + } + } + } + Ok(infos) + } } #[derive(Default)] @@ -260,13 +283,13 @@ impl AligningState { impl AligningStateTrait for AligningState { type Pipe = InputFormatTextPipe; - fn try_create(ctx: &Arc, split_info: &SplitInfo) -> Result { - let rows_to_skip = if split_info.seq_infile == 0 { + fn try_create(ctx: &Arc, split_info: &Arc) -> Result { + let rows_to_skip = if split_info.seq_in_file == 0 { ctx.rows_to_skip } else { 0 }; - let path = split_info.file_info.path.clone(); + let path = split_info.file.path.clone(); let decoder = ctx.get_compression_alg(&path)?.map(DecompressDecoder::new); let csv_reader = if T::format_type() == StageFileFormatType::Csv { diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs index 352cdf21ee5d..841a10ad8ea0 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs @@ -40,7 +40,7 @@ use crate::processors::sources::input_formats::source_deserializer::DeserializeS use crate::processors::sources::input_formats::transform_deserializer::DeserializeTransformer; pub struct Split { - pub(crate) info: SplitInfo, + pub(crate) info: Arc, pub(crate) rx: Receiver>, } @@ -54,7 +54,7 @@ pub struct StreamingReadBatch { pub trait AligningStateTrait: Sized { type Pipe: InputFormatPipe; - fn try_create(ctx: &Arc, split_info: &SplitInfo) -> Result; + fn try_create(ctx: &Arc, split_info: &Arc) -> Result; fn align( &mut self, @@ -93,8 +93,10 @@ pub trait InputFormatPipe: Sized + Send + 'static { if batch.is_start { let (data_tx, data_rx) = tokio::sync::mpsc::channel(1); sender = Some(data_tx); - let split_info = - SplitInfo::from_stream_split(batch.path.clone(), batch.compression); + let split_info = Arc::new(SplitInfo::from_stream_split( + batch.path.clone(), + batch.compression, + )); split_tx .send(Ok(Split { info: split_info, @@ -168,7 +170,7 @@ pub trait InputFormatPipe: Sized + Send + 'static { tokio::spawn(async move { let mut futs = FuturesUnordered::new(); for s in &ctx_clone.splits { - let fut = Self::read_split(ctx_clone.clone(), s.clone()); + let fut = Self::read_split(ctx_clone.clone(), s); futs.push(fut); if futs.len() >= p { let row_batch = futs.next().await.unwrap().unwrap(); @@ -238,19 +240,22 @@ pub trait InputFormatPipe: Sized + Send + 'static { Ok(()) } - async fn read_split(_ctx: Arc, _split_info: SplitInfo) -> Result { + async fn read_split( + _ctx: Arc, + _split_info: &Arc, + ) -> Result { unimplemented!() } #[tracing::instrument(level = "debug", skip(ctx, batch_tx))] async fn copy_reader_with_aligner( ctx: Arc, - split_info: SplitInfo, + split_info: Arc, batch_tx: Sender>, ) -> Result<()> { tracing::debug!("start"); let operator = ctx.source.get_operator()?; - let object = operator.object(&split_info.file_info.path); + let object = operator.object(&split_info.file.path); let offset = split_info.offset as u64; let mut reader = object.range_reader(offset..).await?; loop { diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_split.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_split.rs index 512839b09b5f..98da642cc547 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_split.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_split.rs @@ -27,7 +27,11 @@ pub struct SplitInfo { impl Debug for SplitInfo { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}[{}]", self.file.path, self.seq_in_file) + f.debug_struct("FileInfo") + .field("seq_in_file", &self.seq_in_file) + .field("offset", &self.offset) + .field("size", &self.size) + .finish() } } diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/source_aligner.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/source_aligner.rs index 7ab0cbf8c00e..cad610de38c6 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/source_aligner.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/source_aligner.rs @@ -140,9 +140,7 @@ impl Processor for Aligner { self.batch_rx = Some(split.rx); self.received_end_batch_of_split = false; tracing::debug!( - "aligner recv new split {} {}", - &split.info.file_info.path, - split.info.seq_infile + "aligner recv new split {}", &split.info ); } Ok(Err(e)) => { From b25f3b9db08d4d1fcad1357ef3b345908ca1508b Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Mon, 26 Sep 2022 17:47:15 +0800 Subject: [PATCH 3/7] refactor(format): trait InputFormatPipe add fn get_split_meta. --- .../impls/input_format_parquet.rs | 5 +++-- .../input_formats/input_format_text.rs | 3 ++- .../sources/input_formats/input_pipeline.rs | 9 ++++++++ .../sources/input_formats/input_split.rs | 21 ++++++++++++++++--- 4 files changed, 32 insertions(+), 6 deletions(-) diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs index 82f74cc7fc55..30394d2418bd 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs @@ -150,17 +150,18 @@ pub struct ParquetFormatPipe; #[async_trait::async_trait] impl InputFormatPipe for ParquetFormatPipe { + type SplitMeta = SplitMeta; type ReadBatch = ReadBatch; type RowBatch = RowGroupInMemory; type AligningState = AligningState; type BlockBuilder = ParquetBlockBuilder; } -struct FileMeta { +pub struct FileMeta { pub fields: Arc>, } -struct SplitMeta { +pub struct SplitMeta { pub file: Arc, pub meta: RowGroupMetaData, } diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs index a2d5502c1cec..0d36fe236a04 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs @@ -81,6 +81,7 @@ pub struct InputFormatTextPipe { #[async_trait::async_trait] impl InputFormatPipe for InputFormatTextPipe { + type SplitMeta = (); type ReadBatch = Vec; type RowBatch = RowBatch; type AligningState = AligningState; @@ -134,7 +135,7 @@ impl InputFormat for InputFormatText { }); infos.push(Arc::new(SplitInfo { file, - seq_in_file: 1, + seq_in_file: 0, offset: 0, size, // dummy format_info: None, diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs index 841a10ad8ea0..de67afffa758 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs @@ -74,11 +74,20 @@ pub trait BlockBuilderTrait { #[async_trait::async_trait] pub trait InputFormatPipe: Sized + Send + 'static { + type SplitMeta; type ReadBatch: From> + Send + Debug; type RowBatch: Send; type AligningState: AligningStateTrait + Send; type BlockBuilder: BlockBuilderTrait + Send; + fn get_split_meta(split_info: &Arc) -> Option<&Self::SplitMeta> { + split_info + .format_info + .as_ref()? + .as_any() + .downcast_ref::() + } + fn execute_stream(ctx: Arc, pipeline: &mut Pipeline) -> Result<()> { let mut input = ctx.source.take_receiver()?; diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_split.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_split.rs index 98da642cc547..fcb032599c8b 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_split.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_split.rs @@ -1,5 +1,20 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use std::any::Any; -use std::fmt::{Debug, Display}; +use std::fmt::Debug; +use std::fmt::Display; use std::fmt::Formatter; use std::sync::Arc; @@ -57,12 +72,12 @@ impl SplitInfo { path, size: 0, num_splits: 1, - compress_alg + compress_alg, }), seq_in_file: 0, offset: 0, size: 0, - format_info: None + format_info: None, } } } From 73bda3bf0824c135d1ef7472d2ce6a1e87fc0206 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Mon, 26 Sep 2022 18:59:26 +0800 Subject: [PATCH 4/7] export parquet_read::read_columns_async --- src/common/arrow/src/lib.rs | 1 + src/common/arrow/src/parquet_read.rs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/common/arrow/src/lib.rs b/src/common/arrow/src/lib.rs index e8dd27efc2c9..834c3f2f4f41 100644 --- a/src/common/arrow/src/lib.rs +++ b/src/common/arrow/src/lib.rs @@ -20,6 +20,7 @@ mod parquet_write; pub use arrow; pub use arrow_format; pub use parquet2 as parquet; +pub use parquet_read::read_columns_async; pub use parquet_read::read_columns_many_async; pub use parquet_write::write_parquet_file; diff --git a/src/common/arrow/src/parquet_read.rs b/src/common/arrow/src/parquet_read.rs index 207415e01e8a..00b9437f0890 100644 --- a/src/common/arrow/src/parquet_read.rs +++ b/src/common/arrow/src/parquet_read.rs @@ -47,7 +47,7 @@ where Result::Ok(chunk) } -async fn read_columns_async<'a, R: AsyncRead + AsyncSeek + Send + Unpin>( +pub async fn read_columns_async<'a, R: AsyncRead + AsyncSeek + Send + Unpin>( reader: &mut R, columns: &'a [ColumnChunkMetaData], field_name: &str, From f2c649ee6eb61e1ad15fa4a9b3cdd7cffc2718bb Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Mon, 26 Sep 2022 19:02:10 +0800 Subject: [PATCH 5/7] chore: polish log. --- .../src/processors/sources/input_formats/source_aligner.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/source_aligner.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/source_aligner.rs index cad610de38c6..1ac650c16b57 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/source_aligner.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/source_aligner.rs @@ -139,9 +139,7 @@ impl Processor for Aligner { self.state = Some(I::AligningState::try_create(&self.ctx, &split.info)?); self.batch_rx = Some(split.rx); self.received_end_batch_of_split = false; - tracing::debug!( - "aligner recv new split {}", &split.info - ); + tracing::debug!("aligner recv new split {}", &split.info); } Ok(Err(e)) => { return Err(e); From 8bce85884bc9aa4b3d484d3fb8812853fab9004a Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Mon, 26 Sep 2022 19:03:44 +0800 Subject: [PATCH 6/7] feat(parquet): read in parallel. --- .../impls/input_format_parquet.rs | 38 ++++++++++++++++++- .../input_formats/source_deserializer.rs | 3 ++ 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs index 30394d2418bd..4f2562c608ac 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs @@ -34,6 +34,7 @@ use common_arrow::parquet::metadata::ColumnChunkMetaData; use common_arrow::parquet::metadata::FileMetaData; use common_arrow::parquet::metadata::RowGroupMetaData; use common_arrow::parquet::read::read_metadata; +use common_arrow::read_columns_async; use common_datablocks::DataBlock; use common_datavalues::remove_nullable; use common_datavalues::DataField; @@ -43,6 +44,8 @@ use common_exception::Result; use common_io::prelude::FormatSettings; use common_pipeline_core::Pipeline; use common_settings::Settings; +use futures::AsyncRead; +use futures::AsyncSeek; use opendal::Operator; use similar_asserts::traits::MakeDiff; @@ -137,8 +140,7 @@ impl InputFormat for InputFormatParquet { } fn exec_copy(&self, ctx: Arc, pipeline: &mut Pipeline) -> Result<()> { - // todo(youngsofun): execute_copy_aligned - ParquetFormatPipe::execute_copy_with_aligner(ctx, pipeline) + ParquetFormatPipe::execute_copy_aligned(ctx, pipeline) } fn exec_stream(&self, ctx: Arc, pipeline: &mut Pipeline) -> Result<()> { @@ -155,6 +157,17 @@ impl InputFormatPipe for ParquetFormatPipe { type RowBatch = RowGroupInMemory; type AligningState = AligningState; type BlockBuilder = ParquetBlockBuilder; + + async fn read_split( + ctx: Arc, + split_info: &Arc, + ) -> Result { + let meta = Self::get_split_meta(split_info).expect("must success"); + let op = ctx.source.get_operator()?; + let obj = op.object(&split_info.file.path); + let mut reader = obj.seekable_reader(..(split_info.file.size as u64)); + RowGroupInMemory::read_async(&mut reader, meta.meta.clone(), meta.file.fields.clone()).await + } } pub struct FileMeta { @@ -201,6 +214,27 @@ impl RowGroupInMemory { }) } + async fn read_async( + reader: &mut R, + meta: RowGroupMetaData, + fields: Arc>, + ) -> Result { + let field_names = fields.iter().map(|x| x.name.as_str()).collect::>(); + let field_meta_indexes = split_column_metas_by_field(meta.columns(), &field_names); + let mut filed_arrays = vec![]; + for field_name in field_names { + let meta_data = read_columns_async(reader, meta.columns(), field_name).await?; + let data = meta_data.into_iter().map(|t| t.1).collect::>(); + filed_arrays.push(data) + } + Ok(Self { + meta, + field_meta_indexes, + field_arrays: filed_arrays, + fields, + }) + } + fn get_arrow_chunk(&mut self) -> Result>> { let mut column_chunks = vec![]; let field_arrays = mem::take(&mut self.field_arrays); diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/source_deserializer.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/source_deserializer.rs index dadd99dd7cdb..5f1a6218d460 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/source_deserializer.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/source_deserializer.rs @@ -76,12 +76,15 @@ impl Processor for DeserializeSource { } else { match self.output_buffer.pop_front() { Some(data_block) => { + tracing::info!("DeserializeSource push rows {}", data_block.num_rows()); self.output.push_data(Ok(data_block)); Ok(Event::NeedConsume) } None => { if self.input_buffer.is_some() { Ok(Event::Sync) + } else if self.input_finished { + Ok(Event::Finished) } else { Ok(Event::Async) } From b4134b8092886ecee2c6c429dbe1e7755cf24af3 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Mon, 26 Sep 2022 22:21:20 +0800 Subject: [PATCH 7/7] fix(parquet): DeserializeSource did not finish output. --- .../src/processors/sources/input_formats/source_deserializer.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/source_deserializer.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/source_deserializer.rs index 5f1a6218d460..3af6dd0dbef2 100644 --- a/src/query/pipeline/sources/src/processors/sources/input_formats/source_deserializer.rs +++ b/src/query/pipeline/sources/src/processors/sources/input_formats/source_deserializer.rs @@ -84,6 +84,7 @@ impl Processor for DeserializeSource { if self.input_buffer.is_some() { Ok(Event::Sync) } else if self.input_finished { + self.output.finish(); Ok(Event::Finished) } else { Ok(Event::Async)