diff --git a/src/common/arrow/src/lib.rs b/src/common/arrow/src/lib.rs
index e8dd27efc2c9f..834c3f2f4f41c 100644
--- a/src/common/arrow/src/lib.rs
+++ b/src/common/arrow/src/lib.rs
@@ -20,6 +20,7 @@ mod parquet_write;
 pub use arrow;
 pub use arrow_format;
 pub use parquet2 as parquet;
+pub use parquet_read::read_columns_async;
 pub use parquet_read::read_columns_many_async;
 pub use parquet_write::write_parquet_file;
diff --git a/src/common/arrow/src/parquet_read.rs b/src/common/arrow/src/parquet_read.rs
index 207415e01e8ae..00b9437f0890f 100644
--- a/src/common/arrow/src/parquet_read.rs
+++ b/src/common/arrow/src/parquet_read.rs
@@ -47,7 +47,7 @@ where
     Result::Ok(chunk)
 }
 
-async fn read_columns_async<'a, R: AsyncRead + AsyncSeek + Send + Unpin>(
+pub async fn read_columns_async<'a, R: AsyncRead + AsyncSeek + Send + Unpin>(
     reader: &mut R,
     columns: &'a [ColumnChunkMetaData],
     field_name: &str,
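Making `read_columns_async` public lets the parquet input format (below) pull a single field's column chunks out of a row group over an async, seekable reader. A minimal sketch of a caller, assuming only the signature visible in this hunk; `field_chunks` and its error mapping are illustrative, not part of the patch:

```rust
use common_arrow::parquet::metadata::RowGroupMetaData;
use common_arrow::read_columns_async;
use common_exception::ErrorCode;
use common_exception::Result;
use futures::AsyncRead;
use futures::AsyncSeek;

/// Collect the raw bytes of every column chunk belonging to `field_name`
/// within one row group. Each pair returned by `read_columns_async` is
/// (column chunk metadata, raw column bytes); only the bytes are kept here.
async fn field_chunks<R: AsyncRead + AsyncSeek + Send + Unpin>(
    reader: &mut R,
    row_group: &RowGroupMetaData,
    field_name: &str,
) -> Result<Vec<Vec<u8>>> {
    let pairs = read_columns_async(reader, row_group.columns(), field_name)
        .await
        .map_err(|e| ErrorCode::ParquetError(e.to_string()))?;
    Ok(pairs.into_iter().map(|(_meta, data)| data).collect())
}
```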
diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs
index 471beec5942ac..4f2562c608ac6 100644
--- a/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/impls/input_format_parquet.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::any::Any;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fmt::Formatter;
@@ -26,12 +27,14 @@ use common_arrow::arrow::chunk::Chunk;
 use common_arrow::arrow::datatypes::Field;
 use common_arrow::arrow::io::parquet::read;
 use common_arrow::arrow::io::parquet::read::read_columns;
+use common_arrow::arrow::io::parquet::read::read_metadata_async;
 use common_arrow::arrow::io::parquet::read::to_deserializer;
 use common_arrow::arrow::io::parquet::read::RowGroupDeserializer;
 use common_arrow::parquet::metadata::ColumnChunkMetaData;
 use common_arrow::parquet::metadata::FileMetaData;
 use common_arrow::parquet::metadata::RowGroupMetaData;
 use common_arrow::parquet::read::read_metadata;
+use common_arrow::read_columns_async;
 use common_datablocks::DataBlock;
 use common_datavalues::remove_nullable;
 use common_datavalues::DataField;
@@ -41,21 +44,34 @@ use common_exception::Result;
 use common_io::prelude::FormatSettings;
 use common_pipeline_core::Pipeline;
 use common_settings::Settings;
-use opendal::Object;
+use futures::AsyncRead;
+use futures::AsyncSeek;
+use opendal::Operator;
 use similar_asserts::traits::MakeDiff;
 
 use crate::processors::sources::input_formats::delimiter::RecordDelimiter;
+use crate::processors::sources::input_formats::input_context::CopyIntoPlan;
 use crate::processors::sources::input_formats::input_context::InputContext;
-use crate::processors::sources::input_formats::input_format::FileInfo;
-use crate::processors::sources::input_formats::input_format::InputData;
-use crate::processors::sources::input_formats::input_format::SplitInfo;
 use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait;
 use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait;
 use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe;
+use crate::processors::sources::input_formats::input_split::DynData;
+use crate::processors::sources::input_formats::input_split::FileInfo;
+use crate::processors::sources::input_formats::input_split::SplitInfo;
 use crate::processors::sources::input_formats::InputFormat;
 
 pub struct InputFormatParquet;
 
+impl DynData for FileMetaData {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
+fn col_offset(meta: &ColumnChunkMetaData) -> i64 {
+    meta.data_page_offset()
+}
+
 #[async_trait::async_trait]
 impl InputFormat for InputFormatParquet {
     fn get_format_settings(&self, _settings: &Arc<Settings>) -> Result<FormatSettings> {
@@ -71,33 +87,60 @@ impl InputFormat for InputFormatParquet {
         b'_'
     }
 
-    async fn read_file_meta(
+    async fn get_splits(
         &self,
-        _obj: &Object,
-        _size: usize,
-    ) -> Result<Option<Arc<dyn InputData>>> {
-        // todo(youngsofun): execute_copy_aligned
-        Ok(None)
-    }
-
-    async fn read_split_meta(
-        &self,
-        _obj: &Object,
-        _split_info: &SplitInfo,
-    ) -> Result<Option<Arc<dyn InputData>>> {
-        Ok(None)
-    }
-
-    fn split_files(&self, file_infos: Vec<FileInfo>, _split_size: usize) -> Vec<SplitInfo> {
-        file_infos
-            .into_iter()
-            .map(SplitInfo::from_file_info)
-            .collect()
+        plan: &CopyIntoPlan,
+        op: &Operator,
+        _settings: &Arc<Settings>,
+        schema: &DataSchemaRef,
+    ) -> Result<Vec<Arc<SplitInfo>>> {
+        let mut infos = vec![];
+        for path in &plan.files {
+            let obj = op.object(path);
+            let size = obj.metadata().await?.content_length() as usize;
+            let mut reader = obj.seekable_reader(..(size as u64));
+            let mut file_meta = read_metadata_async(&mut reader)
+                .await
+                .map_err(|e| ErrorCode::ParquetError(e.to_string()))?;
+            let row_groups = mem::take(&mut file_meta.row_groups);
+            let fields = Arc::new(get_fields(&file_meta, schema)?);
+            let read_file_meta = Arc::new(FileMeta { fields });
+            let file_info = Arc::new(FileInfo {
+                path: path.clone(),
+                size,
+                num_splits: row_groups.len(),
+                compress_alg: None,
+            });
+
+            for (i, rg) in row_groups.into_iter().enumerate() {
+                if !rg.columns().is_empty() {
+                    let offset = rg
+                        .columns()
+                        .iter()
+                        .map(col_offset)
+                        .min()
+                        .expect("must succeed") as usize;
+                    let size = rg.total_byte_size();
+                    let meta = Arc::new(SplitMeta {
+                        file: read_file_meta.clone(),
+                        meta: rg,
+                    });
+                    let info = Arc::new(SplitInfo {
+                        file: file_info.clone(),
+                        seq_in_file: i,
+                        offset,
+                        size,
+                        format_info: Some(meta),
+                    });
+                    infos.push(info);
+                }
+            }
+        }
+        Ok(infos)
     }
 
     fn exec_copy(&self, ctx: Arc<InputContext>, pipeline: &mut Pipeline) -> Result<()> {
-        // todo(youngsofun): execute_copy_aligned
-        ParquetFormatPipe::execute_copy_with_aligner(ctx, pipeline)
+        ParquetFormatPipe::execute_copy_aligned(ctx, pipeline)
     }
 
     fn exec_stream(&self, ctx: Arc<InputContext>, pipeline: &mut Pipeline) -> Result<()> {
@@ -109,10 +152,37 @@ pub struct ParquetFormatPipe;
 
 #[async_trait::async_trait]
 impl InputFormatPipe for ParquetFormatPipe {
+    type SplitMeta = SplitMeta;
     type ReadBatch = ReadBatch;
    type RowBatch = RowGroupInMemory;
     type AligningState = AligningState;
     type BlockBuilder = ParquetBlockBuilder;
+
+    async fn read_split(
+        ctx: Arc<InputContext>,
+        split_info: &Arc<SplitInfo>,
+    ) -> Result<Self::RowBatch> {
+        let meta = Self::get_split_meta(split_info).expect("must succeed");
+        let op = ctx.source.get_operator()?;
+        let obj = op.object(&split_info.file.path);
+        let mut reader = obj.seekable_reader(..(split_info.file.size as u64));
+        RowGroupInMemory::read_async(&mut reader, meta.meta.clone(), meta.file.fields.clone()).await
+    }
+}
+
+pub struct FileMeta {
+    pub fields: Arc<Vec<Field>>,
+}
+
+pub struct SplitMeta {
+    pub file: Arc<FileMeta>,
+    pub meta: RowGroupMetaData,
+}
+
+impl DynData for SplitMeta {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
 }
 
 pub struct RowGroupInMemory {
@@ -144,6 +214,27 @@ impl RowGroupInMemory {
         })
     }
 
+    async fn read_async<R: AsyncRead + AsyncSeek + Send + Unpin>(
+        reader: &mut R,
+        meta: RowGroupMetaData,
+        fields: Arc<Vec<Field>>,
+    ) -> Result<Self> {
+        let field_names = fields.iter().map(|x| x.name.as_str()).collect::<Vec<_>>();
+        let field_meta_indexes = split_column_metas_by_field(meta.columns(), &field_names);
+        let mut field_arrays = vec![];
+        for field_name in field_names {
+            let meta_data = read_columns_async(reader, meta.columns(), field_name).await?;
+            let data = meta_data.into_iter().map(|t| t.1).collect::<Vec<_>>();
+            field_arrays.push(data)
+        }
+        Ok(Self {
+            meta,
+            field_meta_indexes,
+            field_arrays,
+            fields,
+        })
+    }
+
     fn get_arrow_chunk(&mut self) -> Result<Chunk<Box<dyn Array>>> {
         let mut column_chunks = vec![];
         let field_arrays = mem::take(&mut self.field_arrays);
@@ -212,14 +303,14 @@ impl BlockBuilderTrait for ParquetBlockBuilder {
 
 pub struct AligningState {
     ctx: Arc<InputContext>,
-    split_info: SplitInfo,
+    split_info: Arc<SplitInfo>,
     buffers: Vec<Vec<u8>>,
 }
 
 impl AligningStateTrait for AligningState {
     type Pipe = ParquetFormatPipe;
 
-    fn try_create(ctx: &Arc<InputContext>, split_info: &SplitInfo) -> Result<Self> {
+    fn try_create(ctx: &Arc<InputContext>, split_info: &Arc<SplitInfo>) -> Result<Self> {
         Ok(AligningState {
             ctx: ctx.clone(),
             split_info: split_info.clone(),
@@ -238,7 +329,7 @@ impl AligningStateTrait for AligningState {
             let size = file_in_memory.len();
             tracing::debug!(
                 "aligning parquet file {} of {} bytes",
-                self.split_info.file_info.path,
+                self.split_info.file.path,
                 size,
             );
             let mut cursor = Cursor::new(file_in_memory);
@@ -256,7 +347,7 @@ impl AligningStateTrait for AligningState {
             }
             tracing::info!(
                 "align parquet file {} of {} bytes to {} row groups",
-                self.split_info.file_info.path,
+                self.split_info.file.path,
                 size,
                 row_batches.len()
            );
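`FileMetaData` and `SplitMeta` implement `DynData` so format-specific metadata can ride on a generic `SplitInfo` as `Arc<dyn DynData>` and be recovered by downcast (`InputFormatPipe::get_split_meta` below). The pattern in miniature, with a stand-in payload type:

```rust
use std::any::Any;
use std::sync::Arc;

trait DynData: Send + Sync + 'static {
    fn as_any(&self) -> &dyn Any;
}

// Stand-in for a format-specific split meta such as a parquet row group.
struct RowGroupSplitMeta {
    num_rows: usize,
}

impl DynData for RowGroupSplitMeta {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn main() {
    // The producer attaches its metadata type-erased...
    let format_info: Option<Arc<dyn DynData>> = Some(Arc::new(RowGroupSplitMeta { num_rows: 42 }));
    // ...and the consumer that knows the concrete type gets it back.
    let meta = format_info
        .as_ref()
        .and_then(|d| d.as_any().downcast_ref::<RowGroupSplitMeta>())
        .expect("split carries parquet metadata");
    assert_eq!(meta.num_rows, 42);
}
```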
diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs
index 207464c5f089d..6eaa07ade30c6 100644
--- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_context.rs
@@ -37,10 +37,9 @@ use crate::processors::sources::input_formats::impls::input_format_csv::InputFor
 use crate::processors::sources::input_formats::impls::input_format_ndjson::InputFormatNDJson;
 use crate::processors::sources::input_formats::impls::input_format_parquet::InputFormatParquet;
 use crate::processors::sources::input_formats::impls::input_format_tsv::InputFormatTSV;
-use crate::processors::sources::input_formats::input_format::FileInfo;
-use crate::processors::sources::input_formats::input_format::SplitInfo;
 use crate::processors::sources::input_formats::input_format_text::InputFormatText;
 use crate::processors::sources::input_formats::input_pipeline::StreamingReadBatch;
+use crate::processors::sources::input_formats::input_split::SplitInfo;
 use crate::processors::sources::input_formats::InputFormat;
 
 const MIN_ROW_PER_BLOCK: usize = 800 * 1000;
@@ -108,7 +107,7 @@ pub struct InputContext {
     pub schema: DataSchemaRef,
     pub source: InputSource,
     pub format: Arc<dyn InputFormat>,
-    pub splits: Vec<SplitInfo>,
+    pub splits: Vec<Arc<SplitInfo>>,
 
     // row format only
     pub rows_to_skip: usize,
@@ -170,11 +169,11 @@ impl InputContext {
         }
         let plan = Box::new(CopyIntoPlan { stage_info, files });
         let read_batch_size = settings.get_input_read_buffer_size()? as usize;
-        let split_size = 128usize * 1024 * 1024;
         let file_format_options = &plan.stage_info.file_format_options;
         let format = Self::get_input_format(&file_format_options.format)?;
-        let file_infos = Self::get_file_infos(&format, &operator, &plan).await?;
-        let splits = format.split_files(file_infos, split_size);
+        let splits = format
+            .get_splits(&plan, &operator, &settings, &schema)
+            .await?;
         let rows_per_block = MIN_ROW_PER_BLOCK;
         let record_delimiter = {
             if file_format_options.record_delimiter.is_empty() {
@@ -263,31 +262,6 @@ impl InputContext {
         })
     }
 
-    async fn get_file_infos(
-        format: &Arc<dyn InputFormat>,
-        op: &Operator,
-        plan: &CopyIntoPlan,
-    ) -> Result<Vec<FileInfo>> {
-        let mut infos = vec![];
-        for p in &plan.files {
-            let obj = op.object(p);
-            let size = obj.metadata().await?.content_length() as usize;
-            let file_meta = format.read_file_meta(&obj, size).await?;
-            let compress_alg = InputContext::get_compression_alg_copy(
-                plan.stage_info.file_format_options.compression,
-                p,
-            )?;
-            let info = FileInfo {
-                path: p.clone(),
-                size,
-                compress_alg,
-                file_meta,
-            };
-            infos.push(info)
-        }
-        Ok(infos)
-    }
-
     pub fn num_prefetch_splits(&self) -> Result<usize> {
         Ok(self.settings.get_max_threads()? as usize)
     }
diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs
index 3fe63cb38a773..919c08e5b3918 100644
--- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format.rs
@@ -12,29 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::any::Any;
-use std::cmp::min;
-use std::fmt::Debug;
-use std::fmt::Formatter;
 use std::sync::Arc;
 
+use common_datavalues::DataSchemaRef;
 use common_exception::Result;
 use common_io::prelude::FormatSettings;
 use common_pipeline_core::Pipeline;
 use common_settings::Settings;
-use opendal::io_util::CompressAlgorithm;
-use opendal::Object;
+use opendal::Operator;
 
 use crate::processors::sources::input_formats::delimiter::RecordDelimiter;
+use crate::processors::sources::input_formats::input_context::CopyIntoPlan;
 use crate::processors::sources::input_formats::input_context::InputContext;
-
-pub trait InputData: Send + Sync + 'static {
-    fn as_any(&self) -> &dyn Any;
-}
-
-pub trait InputState: Send {
-    fn as_any(&mut self) -> &mut dyn Any;
-}
+use crate::processors::sources::input_formats::input_split::SplitInfo;
 
 #[async_trait::async_trait]
 pub trait InputFormat: Send + Sync {
@@ -44,102 +34,15 @@ pub trait InputFormat: Send + Sync {
 
     fn default_field_delimiter(&self) -> u8;
 
-    async fn read_file_meta(&self, obj: &Object, size: usize)
-        -> Result<Option<Arc<dyn InputData>>>;
-
-    async fn read_split_meta(
+    async fn get_splits(
         &self,
-        obj: &Object,
-        split_info: &SplitInfo,
-    ) -> Result<Option<Arc<dyn InputData>>>;
-
-    fn split_files(&self, file_infos: Vec<FileInfo>, split_size: usize) -> Vec<SplitInfo>;
+        plan: &CopyIntoPlan,
+        op: &Operator,
+        settings: &Arc<Settings>,
+        schema: &DataSchemaRef,
+    ) -> Result<Vec<Arc<SplitInfo>>>;
 
     fn exec_copy(&self, ctx: Arc<InputContext>, pipeline: &mut Pipeline) -> Result<()>;
 
     fn exec_stream(&self, ctx: Arc<InputContext>, pipeline: &mut Pipeline) -> Result<()>;
 }
-
-#[derive(Clone)]
-pub struct FileInfo {
-    pub path: String,
-    pub size: usize,
-    pub compress_alg: Option<CompressAlgorithm>,
-    pub file_meta: Option<Arc<dyn InputData>>,
-}
-
-impl FileInfo {
-    pub fn split_by_size(&self, split_size: usize) -> Vec<SplitInfo> {
-        let mut splits = vec![];
-        let n = (self.size + split_size - 1) / split_size;
-        for i in 0..n - 1 {
-            splits.push(SplitInfo {
-                file_info: self.clone(),
-                seq_infile: i,
-                is_end: i == n - 1,
-                offset: i * split_size,
-                len: min((i + 1) * split_size, self.size),
-                split_meta: None,
-            })
-        }
-        splits
-    }
-}
-
-impl Debug for FileInfo {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("FileInfo")
-            .field("path", &self.path)
-            .field("size", &self.size)
-            .finish()
-    }
-}
-
-#[derive(Clone)]
-pub struct SplitInfo {
-    pub file_info: FileInfo,
-    pub seq_infile: usize,
-    pub is_end: bool,
-    pub offset: usize,
-    pub len: usize,
-    pub split_meta: Option<Arc<dyn InputData>>,
-}
-
-impl SplitInfo {
-    pub fn from_file_info(file_info: FileInfo) -> Self {
-        let len = file_info.size;
-        Self {
-            file_info,
-            seq_infile: 0,
-            is_end: true,
-            offset: 0,
-            len,
-            split_meta: None,
-        }
-    }
-
-    pub fn from_stream_split(path: String, compress_alg: Option<CompressAlgorithm>) -> Self {
-        SplitInfo {
-            file_info: FileInfo {
-                path,
-                size: 0,
-                compress_alg,
-                file_meta: None,
-            },
-            seq_infile: 0,
-            offset: 0,
-            len: 0,
-            is_end: false,
-            split_meta: None,
-        }
-    }
-}
-
-impl Debug for SplitInfo {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("SplitInfo")
-            .field("file_info", &self.file_info)
-            .field("seq_infile", &self.seq_infile)
-            .finish()
-    }
-}
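The old three-step protocol (`read_file_meta`, `read_split_meta`, `split_files`) collapses into one `get_splits` that returns shareable `Arc<SplitInfo>` values. A minimal sketch of the new contract for a hypothetical format that never splits files — the field names come from `input_split.rs` below, but the function itself is illustrative:

```rust
use std::sync::Arc;

use common_exception::Result;
use opendal::Operator;

use crate::processors::sources::input_formats::input_context::CopyIntoPlan;
use crate::processors::sources::input_formats::input_split::FileInfo;
use crate::processors::sources::input_formats::input_split::SplitInfo;

/// One split per file: stat each object, then describe its whole byte range.
async fn get_splits_whole_files(plan: &CopyIntoPlan, op: &Operator) -> Result<Vec<Arc<SplitInfo>>> {
    let mut infos = vec![];
    for path in &plan.files {
        let size = op.object(path).metadata().await?.content_length() as usize;
        let file = Arc::new(FileInfo {
            path: path.clone(),
            size,
            num_splits: 1,
            compress_alg: None,
        });
        infos.push(Arc::new(SplitInfo {
            file,
            seq_in_file: 0,
            offset: 0,
            size,
            format_info: None,
        }));
    }
    Ok(infos)
}
```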
diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs
index c7a521683718c..0d36fe236a047 100644
--- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_format_text.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;
 
 use chrono_tz::Tz;
 use common_datablocks::DataBlock;
+use common_datavalues::DataSchemaRef;
 use common_datavalues::TypeDeserializer;
 use common_datavalues::TypeDeserializerImpl;
 use common_exception::ErrorCode;
@@ -28,18 +29,19 @@ use common_pipeline_core::Pipeline;
 use common_settings::Settings;
 use opendal::io_util::DecompressDecoder;
 use opendal::io_util::DecompressState;
-use opendal::Object;
+use opendal::Operator;
 
 use super::InputFormat;
 use crate::processors::sources::input_formats::delimiter::RecordDelimiter;
 use crate::processors::sources::input_formats::impls::input_format_csv::CsvReaderState;
+use crate::processors::sources::input_formats::input_context::CopyIntoPlan;
 use crate::processors::sources::input_formats::input_context::InputContext;
-use crate::processors::sources::input_formats::input_format::FileInfo;
-use crate::processors::sources::input_formats::input_format::InputData;
-use crate::processors::sources::input_formats::input_format::SplitInfo;
 use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait;
 use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait;
 use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe;
+use crate::processors::sources::input_formats::input_split::split_by_size;
+use crate::processors::sources::input_formats::input_split::FileInfo;
+use crate::processors::sources::input_formats::input_split::SplitInfo;
 
 pub trait InputFormatTextBase: Sized + Send + Sync + 'static {
     fn format_type() -> StageFileFormatType;
@@ -79,6 +81,7 @@ pub struct InputFormatTextPipe<T> {
 
 #[async_trait::async_trait]
 impl<T: InputFormatTextBase> InputFormatPipe for InputFormatTextPipe<T> {
+    type SplitMeta = ();
     type ReadBatch = Vec<u8>;
     type RowBatch = RowBatch;
     type AligningState = AligningState<T>;
@@ -99,34 +102,6 @@ impl<T: InputFormatTextBase> InputFormat for InputFormatText<T> {
         T::default_field_delimiter()
     }
 
-    async fn read_file_meta(
-        &self,
-        _obj: &Object,
-        _size: usize,
-    ) -> Result<Option<Arc<dyn InputData>>> {
-        Ok(None)
-    }
-
-    async fn read_split_meta(
-        &self,
-        _obj: &Object,
-        _split_info: &SplitInfo,
-    ) -> Result<Option<Arc<dyn InputData>>> {
-        Ok(None)
-    }
-
-    fn split_files(&self, file_infos: Vec<FileInfo>, split_size: usize) -> Vec<SplitInfo> {
-        let mut splits = vec![];
-        for f in file_infos {
-            if f.compress_alg.is_none() || !T::is_splittable() {
-                splits.push(SplitInfo::from_file_info(f))
-            } else {
-                splits.append(&mut f.split_by_size(split_size))
-            }
-        }
-        splits
-    }
-
     fn exec_copy(&self, ctx: Arc<InputContext>, pipeline: &mut Pipeline) -> Result<()> {
         tracing::info!("exe text");
         InputFormatTextPipe::<T>::execute_copy_with_aligner(ctx, pipeline)
@@ -135,6 +110,58 @@ impl<T: InputFormatTextBase> InputFormat for InputFormatText<T> {
     fn exec_stream(&self, ctx: Arc<InputContext>, pipeline: &mut Pipeline) -> Result<()> {
         InputFormatTextPipe::<T>::execute_stream(ctx, pipeline)
     }
+
+    async fn get_splits(
+        &self,
+        plan: &CopyIntoPlan,
+        op: &Operator,
+        _settings: &Arc<Settings>,
+        _schema: &DataSchemaRef,
+    ) -> Result<Vec<Arc<SplitInfo>>> {
+        let mut infos = vec![];
+        for path in &plan.files {
+            let obj = op.object(path);
+            let size = obj.metadata().await?.content_length() as usize;
+            let compress_alg = InputContext::get_compression_alg_copy(
+                plan.stage_info.file_format_options.compression,
+                path,
+            )?;
+            // Only uncompressed files of a splittable format can be cut into
+            // byte-range splits; everything else is read as one whole split.
+            if compress_alg.is_some() || !T::is_splittable() {
+                let file = Arc::new(FileInfo {
+                    path: path.clone(),
+                    size, // dummy
+                    num_splits: 1,
+                    compress_alg,
+                });
+                infos.push(Arc::new(SplitInfo {
+                    file,
+                    seq_in_file: 0,
+                    offset: 0,
+                    size, // dummy
+                    format_info: None,
+                }));
+            } else {
+                let split_size = 128usize * 1024 * 1024;
+                let split_offsets = split_by_size(size, split_size);
+                let file = Arc::new(FileInfo {
+                    path: path.clone(),
+                    size,
+                    num_splits: split_offsets.len(),
+                    compress_alg,
+                });
+                for (i, (offset, size)) in split_offsets.into_iter().enumerate() {
+                    infos.push(Arc::new(SplitInfo {
+                        file: file.clone(),
+                        seq_in_file: i,
+                        offset,
+                        size,
+                        format_info: None,
+                    }));
+                }
+            }
+        }
+        Ok(infos)
+    }
 }
 
 #[derive(Default)]
@@ -257,13 +284,13 @@ impl<T: InputFormatTextBase> AligningState<T> {
 impl<T: InputFormatTextBase> AligningStateTrait for AligningState<T> {
     type Pipe = InputFormatTextPipe<T>;
 
-    fn try_create(ctx: &Arc<InputContext>, split_info: &SplitInfo) -> Result<Self> {
-        let rows_to_skip = if split_info.seq_infile == 0 {
+    fn try_create(ctx: &Arc<InputContext>, split_info: &Arc<SplitInfo>) -> Result<Self> {
+        let rows_to_skip = if split_info.seq_in_file == 0 {
             ctx.rows_to_skip
         } else {
             0
         };
-        let path = split_info.file_info.path.clone();
+        let path = split_info.file.path.clone();
         let decoder = ctx.get_compression_alg(&path)?.map(DecompressDecoder::new);
 
         let csv_reader = if T::format_type() == StageFileFormatType::Csv {
diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs
index 28742a5e2bee0..de67afffa7586 100644
--- a/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_pipeline.rs
@@ -34,13 +34,13 @@ use opendal::io_util::CompressAlgorithm;
 use crate::processors::sources::input_formats::input_context::InputContext;
 use crate::processors::sources::input_formats::input_context::InputPlan;
 use crate::processors::sources::input_formats::input_context::StreamPlan;
-use crate::processors::sources::input_formats::input_format::SplitInfo;
+use crate::processors::sources::input_formats::input_split::SplitInfo;
 use crate::processors::sources::input_formats::source_aligner::Aligner;
 use crate::processors::sources::input_formats::source_deserializer::DeserializeSource;
 use crate::processors::sources::input_formats::transform_deserializer::DeserializeTransformer;
 
 pub struct Split<I: InputFormatPipe> {
-    pub(crate) info: SplitInfo,
+    pub(crate) info: Arc<SplitInfo>,
     pub(crate) rx: Receiver<Result<I::ReadBatch>>,
 }
 
@@ -54,7 +54,7 @@ pub struct StreamingReadBatch {
 pub trait AligningStateTrait: Sized {
     type Pipe: InputFormatPipe;
 
-    fn try_create(ctx: &Arc<InputContext>, split_info: &SplitInfo) -> Result<Self>;
+    fn try_create(ctx: &Arc<InputContext>, split_info: &Arc<SplitInfo>) -> Result<Self>;
 
     fn align(
         &mut self,
@@ -74,11 +74,20 @@ pub trait BlockBuilderTrait {
 
 #[async_trait::async_trait]
 pub trait InputFormatPipe: Sized + Send + 'static {
+    type SplitMeta: Send + Sync + 'static;
     type ReadBatch: From<Vec<u8>> + Send + Debug;
     type RowBatch: Send;
     type AligningState: AligningStateTrait<Pipe = Self> + Send;
     type BlockBuilder: BlockBuilderTrait<Pipe = Self> + Send;
 
+    fn get_split_meta(split_info: &Arc<SplitInfo>) -> Option<&Self::SplitMeta> {
+        split_info
+            .format_info
+            .as_ref()?
+            .as_any()
+            .downcast_ref::<Self::SplitMeta>()
+    }
+
     fn execute_stream(ctx: Arc<InputContext>, pipeline: &mut Pipeline) -> Result<()> {
         let mut input = ctx.source.take_receiver()?;
@@ -93,8 +102,10 @@ pub trait InputFormatPipe: Sized + Send + 'static {
             if batch.is_start {
                 let (data_tx, data_rx) = tokio::sync::mpsc::channel(1);
                 sender = Some(data_tx);
-                let split_info =
-                    SplitInfo::from_stream_split(batch.path.clone(), batch.compression);
+                let split_info = Arc::new(SplitInfo::from_stream_split(
+                    batch.path.clone(),
+                    batch.compression,
+                ));
                 split_tx
                     .send(Ok(Split {
                         info: split_info,
@@ -168,7 +179,7 @@ pub trait InputFormatPipe: Sized + Send + 'static {
         tokio::spawn(async move {
             let mut futs = FuturesUnordered::new();
             for s in &ctx_clone.splits {
-                let fut = Self::read_split(ctx_clone.clone(), s.clone());
+                let fut = Self::read_split(ctx_clone.clone(), s);
                 futs.push(fut);
                 if futs.len() >= p {
                     let row_batch = futs.next().await.unwrap().unwrap();
@@ -238,19 +249,22 @@ pub trait InputFormatPipe: Sized + Send + 'static {
         Ok(())
     }
 
-    async fn read_split(_ctx: Arc<InputContext>, _split_info: SplitInfo) -> Result<Self::RowBatch> {
+    async fn read_split(
+        _ctx: Arc<InputContext>,
+        _split_info: &Arc<SplitInfo>,
+    ) -> Result<Self::RowBatch> {
         unimplemented!()
     }
 
     #[tracing::instrument(level = "debug", skip(ctx, batch_tx))]
     async fn copy_reader_with_aligner(
         ctx: Arc<InputContext>,
-        split_info: SplitInfo,
+        split_info: Arc<SplitInfo>,
         batch_tx: Sender<Result<Self::ReadBatch>>,
     ) -> Result<()> {
         tracing::debug!("start");
         let operator = ctx.source.get_operator()?;
-        let object = operator.object(&split_info.file_info.path);
+        let object = operator.object(&split_info.file.path);
         let offset = split_info.offset as u64;
         let mut reader = object.range_reader(offset..).await?;
         loop {
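The split read loop above keeps a bounded amount of read-ahead: new `read_split` futures are pushed into a `FuturesUnordered`, and once `num_prefetch_splits()` are pending, one completion is drained before the next read is admitted. The same scheduling pattern in isolation, detached from the pipeline types (names here are illustrative):

```rust
use futures::stream::FuturesUnordered;
use futures::StreamExt;

/// Run `tasks` with at most `limit` (>= 1) futures in flight, collecting
/// results in completion order.
async fn bounded_prefetch<F, T>(tasks: Vec<F>, limit: usize) -> Vec<T>
where
    F: std::future::Future<Output = T>,
{
    let mut in_flight = FuturesUnordered::new();
    let mut done = vec![];
    for task in tasks {
        in_flight.push(task);
        if in_flight.len() >= limit {
            // Wait for the earliest completion before issuing another read.
            done.push(in_flight.next().await.unwrap());
        }
    }
    // Drain whatever is still pending.
    while let Some(result) = in_flight.next().await {
        done.push(result);
    }
    done
}
```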
diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/input_split.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/input_split.rs
new file mode 100644
index 0000000000000..fcb032599c8bf
--- /dev/null
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/input_split.rs
@@ -0,0 +1,83 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
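A unit-test-style sketch (not part of the patch, assumed to live in the same module as `split_by_size`) pinning down the tiling behaviour: the returned `(offset, size)` pairs cover the file exactly, with the last split carrying the remainder.

```rust
#[test]
fn split_by_size_tiles_the_file() {
    const MIB: usize = 1024 * 1024;
    // 300 MiB at a 128 MiB split size: two full splits plus a 44 MiB tail.
    assert_eq!(
        split_by_size(300 * MIB, 128 * MIB),
        vec![(0, 128 * MIB), (128 * MIB, 128 * MIB), (256 * MIB, 44 * MIB)]
    );
}
```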
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+use std::fmt::Debug;
+use std::fmt::Display;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use opendal::io_util::CompressAlgorithm;
+
+pub trait DynData: Send + Sync + 'static {
+    fn as_any(&self) -> &dyn Any;
+}
+
+#[derive(Debug)]
+pub struct FileInfo {
+    pub path: String,
+    pub size: usize,
+    pub num_splits: usize,
+    pub compress_alg: Option<CompressAlgorithm>,
+}
+
+pub struct SplitInfo {
+    pub file: Arc<FileInfo>,
+    pub seq_in_file: usize,
+    pub offset: usize,
+    pub size: usize,
+    pub format_info: Option<Arc<dyn DynData>>,
+}
+
+impl Debug for SplitInfo {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("SplitInfo")
+            .field("seq_in_file", &self.seq_in_file)
+            .field("offset", &self.offset)
+            .field("size", &self.size)
+            .finish()
+    }
+}
+
+impl Display for SplitInfo {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}[{}]", self.file.path, self.seq_in_file)
+    }
+}
+
+pub fn split_by_size(size: usize, split_size: usize) -> Vec<(usize, usize)> {
+    let mut splits = vec![];
+    let n = (size + split_size - 1) / split_size;
+    for i in 0..n {
+        splits.push((i * split_size, std::cmp::min(split_size, size - i * split_size)))
+    }
+    splits
+}
+
+impl SplitInfo {
+    pub fn from_stream_split(path: String, compress_alg: Option<CompressAlgorithm>) -> Self {
+        SplitInfo {
+            file: Arc::new(FileInfo {
+                path,
+                size: 0,
+                num_splits: 1,
+                compress_alg,
+            }),
+            seq_in_file: 0,
+            offset: 0,
+            size: 0,
+            format_info: None,
+        }
+    }
+}
diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/mod.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/mod.rs
index e281ac5197a4d..d96ebe0910433 100644
--- a/src/query/pipeline/sources/src/processors/sources/input_formats/mod.rs
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/mod.rs
@@ -18,6 +18,7 @@ mod input_context;
 mod input_format;
 mod input_format_text;
 mod input_pipeline;
+mod input_split;
 mod source_aligner;
 mod source_deserializer;
 mod transform_deserializer;
diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/source_aligner.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/source_aligner.rs
index 7ab0cbf8c00e3..1ac650c16b57a 100644
--- a/src/query/pipeline/sources/src/processors/sources/input_formats/source_aligner.rs
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/source_aligner.rs
@@ -139,11 +139,7 @@ impl<I: InputFormatPipe> Processor for Aligner<I> {
                     self.state = Some(I::AligningState::try_create(&self.ctx, &split.info)?);
                     self.batch_rx = Some(split.rx);
                     self.received_end_batch_of_split = false;
-                    tracing::debug!(
-                        "aligner recv new split {} {}",
-                        &split.info.file_info.path,
-                        split.info.seq_infile
-                    );
+                    tracing::debug!("aligner recv new split {}", &split.info);
                 }
                 Ok(Err(e)) => {
                     return Err(e);
diff --git a/src/query/pipeline/sources/src/processors/sources/input_formats/source_deserializer.rs b/src/query/pipeline/sources/src/processors/sources/input_formats/source_deserializer.rs
index dadd99dd7cdb2..3af6dd0dbef22 100644
--- a/src/query/pipeline/sources/src/processors/sources/input_formats/source_deserializer.rs
+++ b/src/query/pipeline/sources/src/processors/sources/input_formats/source_deserializer.rs
@@ -76,12 +76,16 @@ impl<I: InputFormatPipe> Processor for DeserializeSource<I> {
         } else {
             match self.output_buffer.pop_front() {
                 Some(data_block) => {
+                    tracing::info!("DeserializeSource push rows {}", data_block.num_rows());
                     self.output.push_data(Ok(data_block));
                     Ok(Event::NeedConsume)
                 }
                 None => {
                     if self.input_buffer.is_some() {
                         Ok(Event::Sync)
+                    } else if self.input_finished {
+                        self.output.finish();
+                        Ok(Event::Finished)
                     } else {
                         Ok(Event::Async)
                     }