feat(parquet): read in parallel. #7903

Merged
8 commits merged on Sep 27, 2022
1 change: 1 addition & 0 deletions src/common/arrow/src/lib.rs
@@ -20,6 +20,7 @@ mod parquet_write;
pub use arrow;
pub use arrow_format;
pub use parquet2 as parquet;
pub use parquet_read::read_columns_async;
pub use parquet_read::read_columns_many_async;
pub use parquet_write::write_parquet_file;

2 changes: 1 addition & 1 deletion src/common/arrow/src/parquet_read.rs
@@ -47,7 +47,7 @@ where
Result::Ok(chunk)
}

async fn read_columns_async<'a, R: AsyncRead + AsyncSeek + Send + Unpin>(
pub async fn read_columns_async<'a, R: AsyncRead + AsyncSeek + Send + Unpin>(
reader: &mut R,
columns: &'a [ColumnChunkMetaData],
field_name: &str,
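With read_columns_async now public and re-exported from the crate root, callers can fetch the raw column chunks of a single field without deserializing a whole row group. A minimal usage sketch; the return and error types follow arrow2's conventions and are assumptions, since the diff only shows the signature's first lines:

use common_arrow::parquet::metadata::RowGroupMetaData;
use common_arrow::read_columns_async;
use futures::{AsyncRead, AsyncSeek};

// Fetch the raw (still compressed) column chunks of one field from a single
// row group; one byte buffer per column chunk of that field.
async fn fetch_field_chunks<R>(
    reader: &mut R,
    row_group: &RowGroupMetaData,
    field_name: &str,
) -> common_arrow::arrow::error::Result<Vec<Vec<u8>>>
where
    R: AsyncRead + AsyncSeek + Send + Unpin,
{
    let chunks = read_columns_async(reader, row_group.columns(), field_name).await?;
    Ok(chunks.into_iter().map(|(_meta, bytes)| bytes).collect())
}
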
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Formatter;
@@ -26,12 +27,14 @@ use common_arrow::arrow::chunk::Chunk;
use common_arrow::arrow::datatypes::Field;
use common_arrow::arrow::io::parquet::read;
use common_arrow::arrow::io::parquet::read::read_columns;
use common_arrow::arrow::io::parquet::read::read_metadata_async;
use common_arrow::arrow::io::parquet::read::to_deserializer;
use common_arrow::arrow::io::parquet::read::RowGroupDeserializer;
use common_arrow::parquet::metadata::ColumnChunkMetaData;
use common_arrow::parquet::metadata::FileMetaData;
use common_arrow::parquet::metadata::RowGroupMetaData;
use common_arrow::parquet::read::read_metadata;
use common_arrow::read_columns_async;
use common_datablocks::DataBlock;
use common_datavalues::remove_nullable;
use common_datavalues::DataField;
@@ -41,21 +44,34 @@ use common_exception::Result;
use common_io::prelude::FormatSettings;
use common_pipeline_core::Pipeline;
use common_settings::Settings;
use opendal::Object;
use futures::AsyncRead;
use futures::AsyncSeek;
use opendal::Operator;
use similar_asserts::traits::MakeDiff;

use crate::processors::sources::input_formats::delimiter::RecordDelimiter;
use crate::processors::sources::input_formats::input_context::CopyIntoPlan;
use crate::processors::sources::input_formats::input_context::InputContext;
use crate::processors::sources::input_formats::input_format::FileInfo;
use crate::processors::sources::input_formats::input_format::InputData;
use crate::processors::sources::input_formats::input_format::SplitInfo;
use crate::processors::sources::input_formats::input_pipeline::AligningStateTrait;
use crate::processors::sources::input_formats::input_pipeline::BlockBuilderTrait;
use crate::processors::sources::input_formats::input_pipeline::InputFormatPipe;
use crate::processors::sources::input_formats::input_split::DynData;
use crate::processors::sources::input_formats::input_split::FileInfo;
use crate::processors::sources::input_formats::input_split::SplitInfo;
use crate::processors::sources::input_formats::InputFormat;

pub struct InputFormatParquet;

impl DynData for FileMetaData {
fn as_any(&self) -> &dyn Any {
self
}
}
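The DynData impl lets the parquet FileMetaData travel through the format-agnostic split plumbing as a trait object and be recovered by downcasting. A small sketch of the consuming side (the helper function is hypothetical, not part of this PR):

fn as_parquet_meta(data: &dyn DynData) -> Option<&FileMetaData> {
    // DynData::as_any exposes &dyn Any; the concrete type comes back
    // through a checked downcast.
    data.as_any().downcast_ref::<FileMetaData>()
}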

/// Offset of a column chunk in the file, taken as the offset of its first data page.
fn col_offset(meta: &ColumnChunkMetaData) -> i64 {
meta.data_page_offset()
}

#[async_trait::async_trait]
impl InputFormat for InputFormatParquet {
fn get_format_settings(&self, _settings: &Arc<Settings>) -> Result<FormatSettings> {
@@ -71,33 +87,60 @@ impl InputFormat for InputFormatParquet {
b'_'
}

async fn read_file_meta(
async fn get_splits(
&self,
_obj: &Object,
_size: usize,
) -> Result<Option<Arc<dyn InputData>>> {
// todo(youngsofun): execute_copy_aligned
Ok(None)
}

async fn read_split_meta(
&self,
_obj: &Object,
_split_info: &SplitInfo,
) -> Result<Option<Box<dyn InputData>>> {
Ok(None)
}

fn split_files(&self, file_infos: Vec<FileInfo>, _split_size: usize) -> Vec<SplitInfo> {
file_infos
.into_iter()
.map(SplitInfo::from_file_info)
.collect()
plan: &CopyIntoPlan,
op: &Operator,
_settings: &Arc<Settings>,
schema: &DataSchemaRef,
) -> Result<Vec<Arc<SplitInfo>>> {
let mut infos = vec![];
for path in &plan.files {
let obj = op.object(path);
let size = obj.metadata().await?.content_length() as usize;
let mut reader = obj.seekable_reader(..(size as u64));
let mut file_meta = read_metadata_async(&mut reader)
.await
.map_err(|e| ErrorCode::ParquetError(e.to_string()))?;
let row_groups = mem::take(&mut file_meta.row_groups);
let fields = Arc::new(get_fields(&file_meta, schema)?);
let read_file_meta = Arc::new(FileMeta { fields });
let file_info = Arc::new(FileInfo {
path: path.clone(),
size,
num_splits: row_groups.len(),
compress_alg: None,
});

for (i, rg) in row_groups.into_iter().enumerate() {
if !rg.columns().is_empty() {
let offset = rg
.columns()
.iter()
.map(col_offset)
.min()
.expect("row group has at least one column") as usize;
let size = rg.total_byte_size();
let meta = Arc::new(SplitMeta {
file: read_file_meta.clone(),
meta: rg,
});
let info = Arc::new(SplitInfo {
file: file_info.clone(),
seq_in_file: i,
offset,
size,
format_info: Some(meta),
});
infos.push(info);
}
}
}
Ok(infos)
}
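Each non-empty row group becomes one split: the offset is the smallest first-data-page offset among the group's column chunks, and the size is the group's total byte size, so splits can later be scheduled without re-reading the footer. The same computation as a standalone sketch, assuming parquet2's RowGroupMetaData API and reusing the col_offset helper above:

fn row_group_byte_range(rg: &RowGroupMetaData) -> Option<(usize, usize)> {
    // Returns None for a row group without column chunks; get_splits
    // skips such groups entirely.
    let offset = rg.columns().iter().map(col_offset).min()? as usize;
    Some((offset, rg.total_byte_size()))
}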

fn exec_copy(&self, ctx: Arc<InputContext>, pipeline: &mut Pipeline) -> Result<()> {
// todo(youngsofun): execute_copy_aligned
ParquetFormatPipe::execute_copy_with_aligner(ctx, pipeline)
ParquetFormatPipe::execute_copy_aligned(ctx, pipeline)
}

fn exec_stream(&self, ctx: Arc<InputContext>, pipeline: &mut Pipeline) -> Result<()> {
@@ -109,10 +152,37 @@ pub struct ParquetFormatPipe;

#[async_trait::async_trait]
impl InputFormatPipe for ParquetFormatPipe {
type SplitMeta = SplitMeta;
type ReadBatch = ReadBatch;
type RowBatch = RowGroupInMemory;
type AligningState = AligningState;
type BlockBuilder = ParquetBlockBuilder;

async fn read_split(
ctx: Arc<InputContext>,
split_info: &Arc<SplitInfo>,
) -> Result<Self::RowBatch> {
let meta = Self::get_split_meta(split_info).expect("parquet split must carry a split meta");
let op = ctx.source.get_operator()?;
let obj = op.object(&split_info.file.path);
let mut reader = obj.seekable_reader(..(split_info.file.size as u64));
RowGroupInMemory::read_async(&mut reader, meta.meta.clone(), meta.file.fields.clone()).await
}
}
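read_split is where the parallelism in the PR title comes from: every split carries its own row-group metadata and opens an independent seekable reader, so separate row groups can be fetched concurrently. A hedged sketch of driving several splits at once (the helper and the join strategy are illustrative, not part of this PR):

use futures::future::try_join_all;

async fn read_splits_concurrently(
    ctx: &Arc<InputContext>,
    splits: &[Arc<SplitInfo>],
) -> Result<Vec<RowGroupInMemory>> {
    // Each read_split opens its own reader over the object, so the futures
    // below make progress concurrently instead of one after another.
    let futs = splits
        .iter()
        .map(|s| ParquetFormatPipe::read_split(ctx.clone(), s));
    try_join_all(futs).await
}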

pub struct FileMeta {
pub fields: Arc<Vec<Field>>,
}

pub struct SplitMeta {
pub file: Arc<FileMeta>,
pub meta: RowGroupMetaData,
}

impl DynData for SplitMeta {
fn as_any(&self) -> &dyn Any {
self
}
}

pub struct RowGroupInMemory {
@@ -144,6 +214,27 @@ impl RowGroupInMemory {
})
}

async fn read_async<R: AsyncRead + AsyncSeek + Send + Unpin>(
reader: &mut R,
meta: RowGroupMetaData,
fields: Arc<Vec<Field>>,
) -> Result<Self> {
let field_names = fields.iter().map(|x| x.name.as_str()).collect::<Vec<_>>();
let field_meta_indexes = split_column_metas_by_field(meta.columns(), &field_names);
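// Column chunks are fetched field by field within one split; the concurrency
// this PR targets comes from reading multiple splits at once, not from
// parallelizing fields inside a single split.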
let mut field_arrays = vec![];
for field_name in field_names {
let meta_data = read_columns_async(reader, meta.columns(), field_name).await?;
let data = meta_data.into_iter().map(|t| t.1).collect::<Vec<_>>();
field_arrays.push(data);
}
Ok(Self {
meta,
field_meta_indexes,
field_arrays,
fields,
})
}

fn get_arrow_chunk(&mut self) -> Result<Chunk<Box<dyn Array>>> {
let mut column_chunks = vec![];
let field_arrays = mem::take(&mut self.field_arrays);
@@ -212,14 +303,14 @@ impl BlockBuilderTrait for ParquetBlockBuilder {

pub struct AligningState {
ctx: Arc<InputContext>,
split_info: SplitInfo,
split_info: Arc<SplitInfo>,
buffers: Vec<Vec<u8>>,
}

impl AligningStateTrait for AligningState {
type Pipe = ParquetFormatPipe;

fn try_create(ctx: &Arc<InputContext>, split_info: &SplitInfo) -> Result<Self> {
fn try_create(ctx: &Arc<InputContext>, split_info: &Arc<SplitInfo>) -> Result<Self> {
Ok(AligningState {
ctx: ctx.clone(),
split_info: split_info.clone(),
@@ -238,7 +329,7 @@ impl AligningStateTrait for AligningState {
let size = file_in_memory.len();
tracing::debug!(
"aligning parquet file {} of {} bytes",
self.split_info.file_info.path,
self.split_info.file.path,
size,
);
let mut cursor = Cursor::new(file_in_memory);
@@ -256,7 +347,7 @@
}
tracing::info!(
"align parquet file {} of {} bytes to {} row groups",
self.split_info.file_info.path,
self.split_info.file.path,
size,
row_batches.len()
);
@@ -37,10 +37,9 @@ use crate::processors::sources::input_formats::impls::input_format_csv::InputFormatCSV;
use crate::processors::sources::input_formats::impls::input_format_ndjson::InputFormatNDJson;
use crate::processors::sources::input_formats::impls::input_format_parquet::InputFormatParquet;
use crate::processors::sources::input_formats::impls::input_format_tsv::InputFormatTSV;
use crate::processors::sources::input_formats::input_format::FileInfo;
use crate::processors::sources::input_formats::input_format::SplitInfo;
use crate::processors::sources::input_formats::input_format_text::InputFormatText;
use crate::processors::sources::input_formats::input_pipeline::StreamingReadBatch;
use crate::processors::sources::input_formats::input_split::SplitInfo;
use crate::processors::sources::input_formats::InputFormat;

const MIN_ROW_PER_BLOCK: usize = 800 * 1000;
@@ -108,7 +107,7 @@ pub struct InputContext {
pub schema: DataSchemaRef,
pub source: InputSource,
pub format: Arc<dyn InputFormat>,
pub splits: Vec<SplitInfo>,
pub splits: Vec<Arc<SplitInfo>>,

// row format only
pub rows_to_skip: usize,
@@ -170,11 +169,11 @@ impl InputContext {
}
let plan = Box::new(CopyIntoPlan { stage_info, files });
let read_batch_size = settings.get_input_read_buffer_size()? as usize;
let split_size = 128usize * 1024 * 1024;
let file_format_options = &plan.stage_info.file_format_options;
let format = Self::get_input_format(&file_format_options.format)?;
let file_infos = Self::get_file_infos(&format, &operator, &plan).await?;
let splits = format.split_files(file_infos, split_size);
let splits = format
.get_splits(&plan, &operator, &settings, &schema)
.await?;
let rows_per_block = MIN_ROW_PER_BLOCK;
let record_delimiter = {
if file_format_options.record_delimiter.is_empty() {
@@ -263,31 +262,6 @@ })
})
}

async fn get_file_infos(
format: &Arc<dyn InputFormat>,
op: &Operator,
plan: &CopyIntoPlan,
) -> Result<Vec<FileInfo>> {
let mut infos = vec![];
for p in &plan.files {
let obj = op.object(p);
let size = obj.metadata().await?.content_length() as usize;
let file_meta = format.read_file_meta(&obj, size).await?;
let compress_alg = InputContext::get_compression_alg_copy(
plan.stage_info.file_format_options.compression,
p,
)?;
let info = FileInfo {
path: p.clone(),
size,
compress_alg,
file_meta,
};
infos.push(info)
}
Ok(infos)
}

pub fn num_prefetch_splits(&self) -> Result<usize> {
Ok(self.settings.get_max_threads()? as usize)
}