Merge pull request #3270 from dantengsky/fix-3269
ISSUE-3269: refactoring fuse engine sub modules
BohuTANG authored Dec 7, 2021
2 parents 8bd2588 + c0da61a commit 7096d74
Showing 31 changed files with 433 additions and 483 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 0 additions & 2 deletions common/dal/Cargo.toml
@@ -28,8 +28,6 @@ rusoto_core = "0.47.0"
rusoto_s3 = "0.47.0"
rusoto_sts = "0.47.0"
rusoto_credential = "0.47.0"
-serde = { version = "1.0.130", features = ["derive"] }
-serde_json = "1.0.72"

[dev-dependencies]
common-metrics= {path = "../metrics"}
8 changes: 0 additions & 8 deletions common/dal/src/data_accessor.rs
@@ -14,14 +14,12 @@

use std::io::Read;
use std::io::Seek;
-use std::sync::Arc;

use common_exception::Result;
use futures::stream::Stream;
use futures::AsyncRead;
use futures::AsyncReadExt;
use futures::AsyncSeek;
-use serde::de::DeserializeOwned;

pub type Bytes = Vec<u8>;

@@ -60,9 +58,3 @@ pub trait DataAccessor: Send + Sync {
        Ok(buffer)
    }
}

-pub async fn read_obj<T: DeserializeOwned>(da: Arc<dyn DataAccessor>, loc: String) -> Result<T> {
-    let bytes = da.read(&loc).await?;
-    let r = serde_json::from_slice::<T>(&bytes)?;
-    Ok(r)
-}
1 change: 0 additions & 1 deletion common/dal/src/lib.rs
@@ -26,7 +26,6 @@ pub use accessors::azure_blob::AzureBlobInputStream;
pub use accessors::local::Local;
pub use context::DalContext;
pub use context::DalMetrics;
-pub use data_accessor::read_obj;
pub use data_accessor::AsyncSeekableReader;
pub use data_accessor::Bytes;
pub use data_accessor::DataAccessor;
8 changes: 0 additions & 8 deletions common/dal/tests/it/accessors/local.rs
@@ -37,14 +37,6 @@ async fn local_read(loops: u32) -> common_exception::Result<()> {
    Ok(())
}

-// enable this if need to re-produce issue #2997
-#[tokio::test]
-#[ignore]
-async fn test_da_local_hangs() -> common_exception::Result<()> {
-    let read_fut = local_read(100);
-    futures::executor::block_on(read_fut)
-}

#[tokio::test]
async fn test_da_local_normal() -> common_exception::Result<()> {
    let read_fut = local_read(1000);
4 changes: 2 additions & 2 deletions query/src/storages/fuse/README.md
@@ -53,7 +53,7 @@ At this stage, we rely on background tasks to merge the data properly.

In case of conflicts, "Coordinator" need to re-try the transaction.(OCC, Table level, READ-COMMITTED)

-For this iteration, the "Coordinator" is the `Table` itself.
+For this iteration, the "Coordinator" is the interpreter which execute the statement.


**Scan Flow:**
@@ -65,5 +65,5 @@ At this stage, we rely on background tasks to merge the data properly.

- `Table::read`

-Prunes columns/roles by using the plan criteria, and statistics/index insides the parquet file.
+Prunes columns/rows by using the plan criteria, and statistics/index insides the parquet file.
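
As a minimal sketch of the statistics-based pruning mentioned above — the `BlockMeta` and `ColumnStats` types and the equality predicate are assumptions for illustration, not the fuse engine's actual metadata structures:

```rust
// Illustrative only: each block carries min/max statistics for one column, and a
// block is kept only if its [min, max] range can contain the predicate value.
struct ColumnStats {
    min: i64,
    max: i64,
}

struct BlockMeta {
    location: String,
    col_stats: ColumnStats,
}

fn prune_blocks(blocks: Vec<BlockMeta>, value: i64) -> Vec<BlockMeta> {
    blocks
        .into_iter()
        .filter(|b| b.col_stats.min <= value && value <= b.col_stats.max)
        .collect()
}

fn main() {
    let blocks = vec![
        BlockMeta {
            location: "seg1/block1".into(),
            col_stats: ColumnStats { min: 0, max: 9 },
        },
        BlockMeta {
            location: "seg1/block2".into(),
            col_stats: ColumnStats { min: 10, max: 19 },
        },
    ];
    // Only the second block can contain rows where the column equals 12.
    let survivors = prune_blocks(blocks, 12);
    assert_eq!(survivors.len(), 1);
    assert_eq!(survivors[0].location, "seg1/block2");
}
```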

180 changes: 0 additions & 180 deletions query/src/storages/fuse/io/block_appender.rs

This file was deleted.

117 changes: 117 additions & 0 deletions query/src/storages/fuse/io/block_stream_writer.rs
@@ -0,0 +1,117 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
use std::cmp::Reverse;
use std::sync::Arc;

use common_dal::DataAccessor;
use common_datablocks::DataBlock;
use common_datavalues::DataSchema;
use common_exception::Result;
use common_streams::SendableDataBlockStream;
use futures::stream::TryChunksError;
use futures::StreamExt;
use futures::TryStreamExt;

use super::block_writer;
use crate::storages::fuse::io::locations::gen_block_location;
use crate::storages::fuse::meta::SegmentInfo;
use crate::storages::fuse::meta::Statistics;
use crate::storages::fuse::statistics::StatisticsAccumulator;

pub struct BlockStreamWriter;

impl BlockStreamWriter {
    pub async fn write_block_stream(
        data_accessor: Arc<dyn DataAccessor>,
        stream: SendableDataBlockStream,
        data_schema: &DataSchema,
        chunk_block_num: usize,
        block_size_threshold: usize,
    ) -> Result<Vec<SegmentInfo>> {
        // filter out empty blocks
        let stream = stream.try_filter(|block| std::future::ready(block.num_rows() > 0));

        // chunks by chunk_block_num
        let mut stream = stream.try_chunks(chunk_block_num);

        let mut segments = vec![];
        // accumulate the stats and save the blocks
        while let Some(item) = stream.next().await {
            let blocks = item.map_err(|TryChunksError(_, e)| e)?;
            // re-shape the blocks
            let blocks = Self::reshape_blocks(blocks, block_size_threshold)?;
            let mut acc = StatisticsAccumulator::new();

            for block in blocks.into_iter() {
                let partial_acc = acc.begin(&block)?;
                let schema = block.schema().to_arrow();
                let location = gen_block_location();
                let file_size =
                    block_writer::write_block(&schema, block, &data_accessor, &location).await?;
                acc = partial_acc.end(file_size, location);
            }

            // summary and generate a segment
            let summary = acc.summary(data_schema)?;
            let seg = SegmentInfo {
                blocks: acc.blocks_metas,
                summary: Statistics {
                    row_count: acc.summary_row_count,
                    block_count: acc.summary_block_count,
                    uncompressed_byte_size: acc.in_memory_size,
                    compressed_byte_size: acc.file_size,
                    col_stats: summary,
                },
            };
            segments.push(seg)
        }
        Ok(segments)
    }

    // A simple strategy of merging small blocks into larger ones:
    // for each n successive data blocks in `blocks`, if the sum of their `memory_size` exceeds
    // `block_size_threshold`, they will be merged into one larger block.
    // NOTE:
    // - the max size of merge-block will be 2 * block_size_threshold
    // - for block that is larger than `block_size_threshold`, they will NOT be split
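    //
    // Worked example (illustrative numbers, not from the original source):
    // with block_size_threshold = 100 and block sizes [30, 120, 20, 80], the blocks are
    // first sorted descending to [120, 80, 30, 20]; 120 already meets the threshold and
    // is emitted on its own, 80 + 30 = 110 crosses the threshold so those two are
    // concatenated into one block, and the trailing 20 is flushed after the loop.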
    pub(crate) fn reshape_blocks(
        mut blocks: Vec<DataBlock>,
        block_size_threshold: usize,
    ) -> Result<Vec<DataBlock>> {
        // sort by memory_size DESC
        blocks.sort_unstable_by_key(|r| Reverse(r.memory_size()));

        let mut result = vec![];

        let mut block_size_acc = 0;
        let mut block_acc = vec![];

        for block in blocks {
            block_size_acc += block.memory_size();
            block_acc.push(block);
            if block_size_acc >= block_size_threshold {
                result.push(DataBlock::concat_blocks(&block_acc)?);
                block_acc.clear();
                block_size_acc = 0;
            }
        }

        if !block_acc.is_empty() {
            result.push(DataBlock::concat_blocks(&block_acc)?)
        }

        Ok(result)
    }
}
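
A rough usage sketch of the new writer; the surrounding function name, import paths, and the chunking/threshold numbers are assumptions for illustration, not part of this commit:

```rust
use std::sync::Arc;

use common_dal::DataAccessor;
use common_datavalues::DataSchema;
use common_exception::Result;
use common_streams::SendableDataBlockStream;

// Paths assumed for the example; the actual re-export location may differ.
use crate::storages::fuse::io::block_stream_writer::BlockStreamWriter;
use crate::storages::fuse::meta::SegmentInfo;

// Sketch only: the caller (e.g. an insert/append path) is assumed to already hold
// a DataAccessor, the table schema, and a stream of DataBlocks.
async fn append_blocks(
    dal: Arc<dyn DataAccessor>,
    schema: &DataSchema,
    input_stream: SendableDataBlockStream,
) -> Result<Vec<SegmentInfo>> {
    // Group at most 1000 blocks per segment and merge blocks below ~64 MiB;
    // both numbers are made up for the example.
    let segments = BlockStreamWriter::write_block_stream(
        dal,
        input_stream,
        schema,
        1000,
        64 * 1024 * 1024,
    )
    .await?;

    // A caller would then persist each SegmentInfo through the DataAccessor and
    // commit the segment locations into the table's snapshot.
    Ok(segments)
}
```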