Skip to content

Commit

Permalink
refactor: unify path of write segment (#16517)
Browse files (browse the repository at this point in the history)
* refactor: unify path of write segment

* fix missing header
  • Loading branch information
SkyFan2002 authored Sep 25, 2024
1 parent af0e9ba commit a402d6c
Show file tree
Hide file tree
Showing 14 changed files with 55 additions and 144 deletions.
4 changes: 2 additions & 2 deletions src/query/ee/src/storages/fuse/operations/vacuum_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use databend_common_storages_fuse::io::SnapshotsIO;
use databend_common_storages_fuse::io::TableMetaLocationGenerator;
use databend_common_storages_fuse::FuseTable;
use databend_storages_common_cache::LoadParams;
use databend_storages_common_table_meta::meta::CompactSegmentInfo;
use databend_storages_common_table_meta::meta::SegmentInfo;

use crate::storages::fuse::get_snapshot_referenced_segments;

Expand Down Expand Up @@ -200,7 +200,7 @@ pub async fn do_gc_orphan_files(
// 2.2 Delete all the orphan segment files to be purged
let purged_file_num = segment_locations_to_be_purged.len();
fuse_table
.try_purge_location_files_and_cache::<CompactSegmentInfo>(
.try_purge_location_files_and_cache::<SegmentInfo, _>(
ctx.clone(),
HashSet::from_iter(segment_locations_to_be_purged.into_iter()),
)
Expand Down
9 changes: 5 additions & 4 deletions src/query/service/src/test_kits/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use databend_common_expression::SendableDataBlockStream;
use databend_common_sql::optimizer::SExpr;
use databend_common_storages_factory::Table;
use databend_common_storages_fuse::io::MetaWriter;
use databend_common_storages_fuse::io::SegmentWriter;
use databend_common_storages_fuse::statistics::gen_columns_statistics;
use databend_common_storages_fuse::statistics::merge_statistics;
use databend_common_storages_fuse::statistics::reducers::reduce_block_metas;
Expand Down Expand Up @@ -132,9 +131,11 @@ pub async fn generate_segments(
let block_metas = generate_blocks(fuse_table, blocks_per_segment).await?;
let summary = reduce_block_metas(&block_metas, BlockThresholds::default(), None);
let segment_info = SegmentInfo::new(block_metas, summary);
let segment_writer = SegmentWriter::new(dal, fuse_table.meta_location_generator());
let segment_location = segment_writer.write_segment_no_cache(&segment_info).await?;
segs.push((segment_location, segment_info))
let location = fuse_table
.meta_location_generator()
.gen_segment_info_location();
segment_info.write_meta(dal, &location).await?;
segs.push(((location, SegmentInfo::VERSION), segment_info))
}
Ok(segs)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use databend_common_expression::DataBlock;
use databend_common_expression::Scalar;
use databend_common_expression::TableSchema;
use databend_common_expression::TableSchemaRef;
use databend_common_storages_fuse::io::SegmentWriter;
use databend_common_storages_fuse::io::MetaWriter;
use databend_common_storages_fuse::io::TableMetaLocationGenerator;
use databend_common_storages_fuse::operations::ReclusterMode;
use databend_common_storages_fuse::operations::ReclusterMutator;
Expand Down Expand Up @@ -60,7 +60,6 @@ async fn test_recluster_mutator_block_select() -> Result<()> {
let location_generator = TableMetaLocationGenerator::with_prefix("_prefix".to_owned());

let data_accessor = ctx.get_application_level_data_operator()?.operator();
let seg_writer = SegmentWriter::new(&data_accessor, &location_generator);

let cluster_key_id = 0;
let gen_test_seg = |cluster_stats: Option<ClusterStatistics>| async {
Expand Down Expand Up @@ -88,7 +87,11 @@ async fn test_recluster_mutator_block_select() -> Result<()> {
);

let segment = SegmentInfo::new(vec![test_block_meta], statistics);
Ok::<_, ErrorCode>((seg_writer.write_segment(segment).await?, location))
let segment_location = location_generator.gen_segment_info_location();
segment
.write_meta(&data_accessor, &segment_location)
.await?;
Ok::<_, ErrorCode>(((segment_location, SegmentInfo::VERSION), location))
};

let mut test_segment_locations = vec![];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use databend_common_storages_fuse::io::serialize_block;
use databend_common_storages_fuse::io::CompactSegmentInfoReader;
use databend_common_storages_fuse::io::MetaReaders;
use databend_common_storages_fuse::io::MetaWriter;
use databend_common_storages_fuse::io::SegmentWriter;
use databend_common_storages_fuse::io::SegmentsIO;
use databend_common_storages_fuse::io::TableMetaLocationGenerator;
use databend_common_storages_fuse::io::WriteSettings;
Expand Down Expand Up @@ -664,13 +663,13 @@ impl CompactSegmentTestFixture {
let fuse_segment_io = SegmentsIO::create(self.ctx.clone(), data_accessor.clone(), schema);
let max_theads = self.ctx.get_settings().get_max_threads()? as usize;

let segment_writer = SegmentWriter::new(data_accessor, location_gen);
let seg_acc = SegmentCompactor::new(
block_per_seg,
cluster_key_id,
max_theads,
&fuse_segment_io,
segment_writer.clone(),
data_accessor,
location_gen,
);

let rows_per_block = vec![1; num_block_of_segments.len()];
Expand Down Expand Up @@ -974,7 +973,6 @@ async fn test_compact_segment_with_cluster() -> Result<()> {
settings.set_max_threads(2)?;
settings.set_max_storage_io_requests(4)?;

let segment_writer = SegmentWriter::new(&data_accessor, &location_gen);
let compact_segment_reader =
MetaReaders::segment_info_reader(data_accessor.clone(), schema.clone());
let fuse_segment_io = SegmentsIO::create(ctx.clone(), data_accessor.clone(), schema);
Expand Down Expand Up @@ -1027,7 +1025,8 @@ async fn test_compact_segment_with_cluster() -> Result<()> {
Some(cluster_key_id),
chunk_size,
&fuse_segment_io,
segment_writer.clone(),
&data_accessor,
&location_gen,
);
let state = seg_acc
.compact(locations, limit, |status| {
Expand Down
7 changes: 0 additions & 7 deletions src/query/storages/common/cache/src/caches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,6 @@ pub trait CachedObject<T> {
fn cache() -> Option<Self::Cache>;
}

impl CachedObject<CompactSegmentInfo> for CompactSegmentInfo {
type Cache = CompactSegmentInfoCache;
fn cache() -> Option<Self::Cache> {
CacheManager::instance().get_table_segment_cache()
}
}

impl CachedObject<CompactSegmentInfo> for SegmentInfo {
type Cache = CompactSegmentInfoCache;
fn cache() -> Option<Self::Cache> {
Expand Down
4 changes: 0 additions & 4 deletions src/query/storages/common/table_meta/src/meta/v4/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,6 @@ pub struct CompactSegmentInfo {
}

impl CompactSegmentInfo {
pub fn from_slice(bytes: &[u8]) -> Result<Self> {
Self::from_reader(Cursor::new(bytes))
}

pub fn from_reader(mut r: impl Read) -> Result<Self> {
let SegmentHeader {
version,
Expand Down
1 change: 0 additions & 1 deletion src/query/storages/fuse/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,4 @@ pub use write::CachedMetaWriter;
pub use write::InvertedIndexBuilder;
pub use write::InvertedIndexWriter;
pub use write::MetaWriter;
pub use write::SegmentWriter;
pub use write::WriteSettings;
18 changes: 0 additions & 18 deletions src/query/storages/fuse/src/io/segments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@ use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::TableSchemaRef;
use databend_storages_common_cache::CacheAccessor;
use databend_storages_common_cache::CacheManager;
use databend_storages_common_cache::LoadParams;
use databend_storages_common_table_meta::meta::CompactSegmentInfo;
use databend_storages_common_table_meta::meta::Location;
use databend_storages_common_table_meta::meta::SegmentInfo;
use databend_storages_common_table_meta::meta::Versioned;
use fastrace::func_path;
use fastrace::prelude::*;
use opendal::Operator;
Expand Down Expand Up @@ -116,19 +113,4 @@ impl SegmentsIO {
)
.await
}

#[async_backtrace::framed]
pub async fn write_segment(dal: Operator, serialized_segment: SerializedSegment) -> Result<()> {
assert_eq!(
serialized_segment.segment.format_version,
SegmentInfo::VERSION
);
let raw_bytes = serialized_segment.segment.to_bytes()?;
let compact_segment_info = CompactSegmentInfo::from_slice(&raw_bytes)?;
dal.write(&serialized_segment.path, raw_bytes).await?;
if let Some(segment_cache) = CacheManager::instance().get_table_segment_cache() {
segment_cache.insert(serialized_segment.path, compact_segment_info);
}
Ok(())
}
}
18 changes: 8 additions & 10 deletions src/query/storages/fuse/src/io/write/meta_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use databend_common_exception::Result;
use databend_storages_common_cache::CacheAccessor;
use databend_storages_common_cache::CachedObject;
use databend_storages_common_table_meta::meta::CompactSegmentInfo;
use databend_storages_common_table_meta::meta::SegmentInfo;
use databend_storages_common_table_meta::meta::TableSnapshot;
use databend_storages_common_table_meta::meta::TableSnapshotStatistics;
Expand All @@ -24,8 +23,6 @@ use opendal::Operator;

#[async_trait::async_trait]
pub trait MetaWriter<T> {
/// If meta has a `to_bytes` function, such as `SegmentInfo` and `TableSnapshot`
/// We should not use `write_meta`. Instead, use `write_meta_data`
async fn write_meta(&self, data_accessor: &Operator, location: &str) -> Result<()>;
}

Expand All @@ -42,24 +39,25 @@ where T: Marshal + Sync + Send

#[async_trait::async_trait]
pub trait CachedMetaWriter<T> {
/// If meta has a `to_bytes` function, such as `SegmentInfo` and `TableSnapshot`
/// We should not use `write_meta_through_cache`. Instead, use `write_meta_data_through_cache`
async fn write_meta_through_cache(self, data_accessor: &Operator, location: &str)
-> Result<()>;
async fn write_meta_through_cache(
&self,
data_accessor: &Operator,
location: &str,
) -> Result<()>;
}

#[async_trait::async_trait]
impl CachedMetaWriter<SegmentInfo> for SegmentInfo {
#[async_backtrace::framed]
async fn write_meta_through_cache(
self,
&self,
data_accessor: &Operator,
location: &str,
) -> Result<()> {
let bytes = self.marshal()?;
data_accessor.write(location, bytes.clone()).await?;
if let Some(cache) = CompactSegmentInfo::cache() {
cache.insert(location.to_owned(), CompactSegmentInfo::try_from(&self)?);
if let Some(cache) = SegmentInfo::cache() {
cache.insert(location.to_owned(), self.try_into()?);
}
Ok(())
}
Expand Down
2 changes: 0 additions & 2 deletions src/query/storages/fuse/src/io/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
mod block_writer;
mod inverted_index_writer;
mod meta_writer;
mod segment_writer;
mod write_settings;

pub(crate) use block_writer::create_inverted_index_builders;
Expand All @@ -33,5 +32,4 @@ pub(crate) use inverted_index_writer::create_tokenizer_manager;
pub use inverted_index_writer::InvertedIndexWriter;
pub use meta_writer::CachedMetaWriter;
pub use meta_writer::MetaWriter;
pub use segment_writer::SegmentWriter;
pub use write_settings::WriteSettings;
64 changes: 0 additions & 64 deletions src/query/storages/fuse/src/io/write/segment_writer.rs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ use log::info;
use log::warn;
use opendal::Operator;

use crate::io::CachedMetaWriter;
use crate::io::SegmentsIO;
use crate::io::SerializedSegment;
use crate::io::TableMetaLocationGenerator;
use crate::operations::common::CommitMeta;
use crate::operations::common::ConflictResolveContext;
Expand Down Expand Up @@ -557,12 +557,8 @@ async fn write_segment(
}
// create new segment info
let new_segment = SegmentInfo::new(blocks, new_summary.clone());

// write the segment info.
let serialized_segment = SerializedSegment {
path: location.clone(),
segment: Arc::new(new_segment),
};
SegmentsIO::write_segment(dal, serialized_segment).await?;
new_segment
.write_meta_through_cache(&dal, &location)
.await?;
Ok((location, new_summary))
}
Loading

0 comments on commit a402d6c

Please sign in to comment.