refactor: unify path of write segment #16517

Merged · 2 commits · Sep 25, 2024
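This PR deletes the standalone `SegmentWriter` and routes every segment write through the `MetaWriter` / `CachedMetaWriter` traits: callers generate a path with `TableMetaLocationGenerator::gen_segment_info_location` and call one of the two trait methods on the `SegmentInfo` itself. A minimal sketch of the unified call pattern after this change (`op` and `location_gen` are illustrative names, not from the diff):

    // Sketch only: assumes `op: opendal::Operator` and
    // `location_gen: TableMetaLocationGenerator` are in scope.
    let segment = SegmentInfo::new(block_metas, summary);
    let path = location_gen.gen_segment_info_location();

    // Plain write, no cache interaction (used by the test kits in this diff):
    segment.write_meta(&op, &path).await?;

    // Write that also populates the segment cache (used by the commit path):
    segment.write_meta_through_cache(&op, &path).await?;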
4 changes: 2 additions & 2 deletions src/query/ee/src/storages/fuse/operations/vacuum_table.rs
@@ -29,7 +29,7 @@ use databend_common_storages_fuse::io::SnapshotsIO;
 use databend_common_storages_fuse::io::TableMetaLocationGenerator;
 use databend_common_storages_fuse::FuseTable;
 use databend_storages_common_cache::LoadParams;
-use databend_storages_common_table_meta::meta::CompactSegmentInfo;
+use databend_storages_common_table_meta::meta::SegmentInfo;

 use crate::storages::fuse::get_snapshot_referenced_segments;

@@ -200,7 +200,7 @@ pub async fn do_gc_orphan_files(
         // 2.2 Delete all the orphan segment files to be purged
         let purged_file_num = segment_locations_to_be_purged.len();
         fuse_table
-            .try_purge_location_files_and_cache::<CompactSegmentInfo>(
+            .try_purge_location_files_and_cache::<SegmentInfo, _>(
                 ctx.clone(),
                 HashSet::from_iter(segment_locations_to_be_purged.into_iter()),
             )
9 changes: 5 additions & 4 deletions src/query/service/src/test_kits/fuse.rs
@@ -29,7 +29,6 @@ use databend_common_expression::SendableDataBlockStream;
 use databend_common_sql::optimizer::SExpr;
 use databend_common_storages_factory::Table;
 use databend_common_storages_fuse::io::MetaWriter;
-use databend_common_storages_fuse::io::SegmentWriter;
 use databend_common_storages_fuse::statistics::gen_columns_statistics;
 use databend_common_storages_fuse::statistics::merge_statistics;
 use databend_common_storages_fuse::statistics::reducers::reduce_block_metas;
@@ -132,9 +131,11 @@ pub async fn generate_segments(
         let block_metas = generate_blocks(fuse_table, blocks_per_segment).await?;
         let summary = reduce_block_metas(&block_metas, BlockThresholds::default(), None);
         let segment_info = SegmentInfo::new(block_metas, summary);
-        let segment_writer = SegmentWriter::new(dal, fuse_table.meta_location_generator());
-        let segment_location = segment_writer.write_segment_no_cache(&segment_info).await?;
-        segs.push((segment_location, segment_info))
+        let location = fuse_table
+            .meta_location_generator()
+            .gen_segment_info_location();
+        segment_info.write_meta(dal, &location).await?;
+        segs.push(((location, SegmentInfo::VERSION), segment_info))
     }
     Ok(segs)
 }
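Note that the test kit keeps its no-cache behavior: the old call was `write_segment_no_cache`, and the plain `MetaWriter::write_meta` it now uses also skips the segment cache (only `write_meta_through_cache` populates it). The pushed tuple pairs the path with `SegmentInfo::VERSION` because a segment `Location` in this codebase is a `(path, format_version)` pair.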
@@ -28,7 +28,7 @@ use databend_common_expression::DataBlock;
 use databend_common_expression::Scalar;
 use databend_common_expression::TableSchema;
 use databend_common_expression::TableSchemaRef;
-use databend_common_storages_fuse::io::SegmentWriter;
+use databend_common_storages_fuse::io::MetaWriter;
 use databend_common_storages_fuse::io::TableMetaLocationGenerator;
 use databend_common_storages_fuse::operations::ReclusterMode;
 use databend_common_storages_fuse::operations::ReclusterMutator;
@@ -60,7 +60,6 @@ async fn test_recluster_mutator_block_select() -> Result<()> {
     let location_generator = TableMetaLocationGenerator::with_prefix("_prefix".to_owned());

     let data_accessor = ctx.get_application_level_data_operator()?.operator();
-    let seg_writer = SegmentWriter::new(&data_accessor, &location_generator);

     let cluster_key_id = 0;
     let gen_test_seg = |cluster_stats: Option<ClusterStatistics>| async {
@@ -88,7 +87,11 @@ async fn test_recluster_mutator_block_select() -> Result<()> {
         );

         let segment = SegmentInfo::new(vec![test_block_meta], statistics);
-        Ok::<_, ErrorCode>((seg_writer.write_segment(segment).await?, location))
+        let segment_location = location_generator.gen_segment_info_location();
+        segment
+            .write_meta(&data_accessor, &segment_location)
+            .await?;
+        Ok::<_, ErrorCode>(((segment_location, SegmentInfo::VERSION), location))
     };

     let mut test_segment_locations = vec![];
@@ -36,7 +36,6 @@ use databend_common_storages_fuse::io::serialize_block;
 use databend_common_storages_fuse::io::CompactSegmentInfoReader;
 use databend_common_storages_fuse::io::MetaReaders;
 use databend_common_storages_fuse::io::MetaWriter;
-use databend_common_storages_fuse::io::SegmentWriter;
 use databend_common_storages_fuse::io::SegmentsIO;
 use databend_common_storages_fuse::io::TableMetaLocationGenerator;
 use databend_common_storages_fuse::io::WriteSettings;
@@ -664,13 +663,13 @@ impl CompactSegmentTestFixture {
         let fuse_segment_io = SegmentsIO::create(self.ctx.clone(), data_accessor.clone(), schema);
         let max_theads = self.ctx.get_settings().get_max_threads()? as usize;

-        let segment_writer = SegmentWriter::new(data_accessor, location_gen);
         let seg_acc = SegmentCompactor::new(
             block_per_seg,
             cluster_key_id,
             max_theads,
             &fuse_segment_io,
-            segment_writer.clone(),
+            data_accessor,
+            location_gen,
         );

         let rows_per_block = vec![1; num_block_of_segments.len()];
@@ -974,7 +973,6 @@ async fn test_compact_segment_with_cluster() -> Result<()> {
     settings.set_max_threads(2)?;
     settings.set_max_storage_io_requests(4)?;

-    let segment_writer = SegmentWriter::new(&data_accessor, &location_gen);
    let compact_segment_reader =
         MetaReaders::segment_info_reader(data_accessor.clone(), schema.clone());
     let fuse_segment_io = SegmentsIO::create(ctx.clone(), data_accessor.clone(), schema);
@@ -1027,7 +1025,8 @@
         Some(cluster_key_id),
         chunk_size,
         &fuse_segment_io,
-        segment_writer.clone(),
+        &data_accessor,
+        &location_gen,
     );
     let state = seg_acc
         .compact(locations, limit, |status| {
7 changes: 0 additions & 7 deletions src/query/storages/common/cache/src/caches.rs
@@ -73,13 +73,6 @@ pub trait CachedObject<T> {
     fn cache() -> Option<Self::Cache>;
 }

-impl CachedObject<CompactSegmentInfo> for CompactSegmentInfo {
-    type Cache = CompactSegmentInfoCache;
-    fn cache() -> Option<Self::Cache> {
-        CacheManager::instance().get_table_segment_cache()
-    }
-}
-
 impl CachedObject<CompactSegmentInfo> for SegmentInfo {
     type Cache = CompactSegmentInfoCache;
     fn cache() -> Option<Self::Cache> {
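Only the impl that hangs the cache off `SegmentInfo` survives; it points at the same `CompactSegmentInfoCache`, so the twin impl for `CompactSegmentInfo` became redundant once nothing writes through `CompactSegmentInfo` directly. The surviving impl, completed here for reference under the assumption that its truncated body matches the deleted twin:

    impl CachedObject<CompactSegmentInfo> for SegmentInfo {
        type Cache = CompactSegmentInfoCache;
        fn cache() -> Option<Self::Cache> {
            CacheManager::instance().get_table_segment_cache()
        }
    }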
4 changes: 0 additions & 4 deletions src/query/storages/common/table_meta/src/meta/v4/segment.rs
@@ -216,10 +216,6 @@ pub struct CompactSegmentInfo {
 }

 impl CompactSegmentInfo {
-    pub fn from_slice(bytes: &[u8]) -> Result<Self> {
-        Self::from_reader(Cursor::new(bytes))
-    }
-
     pub fn from_reader(mut r: impl Read) -> Result<Self> {
         let SegmentHeader {
             version,
1 change: 0 additions & 1 deletion src/query/storages/fuse/src/io/mod.rs
@@ -52,5 +52,4 @@ pub use write::CachedMetaWriter;
 pub use write::InvertedIndexBuilder;
 pub use write::InvertedIndexWriter;
 pub use write::MetaWriter;
-pub use write::SegmentWriter;
 pub use write::WriteSettings;
18 changes: 0 additions & 18 deletions src/query/storages/fuse/src/io/segments.rs
@@ -19,13 +19,10 @@ use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::TableSchemaRef;
-use databend_storages_common_cache::CacheAccessor;
-use databend_storages_common_cache::CacheManager;
 use databend_storages_common_cache::LoadParams;
-use databend_storages_common_table_meta::meta::CompactSegmentInfo;
 use databend_storages_common_table_meta::meta::Location;
 use databend_storages_common_table_meta::meta::SegmentInfo;
 use databend_storages_common_table_meta::meta::Versioned;
 use fastrace::func_path;
 use fastrace::prelude::*;
 use opendal::Operator;
@@ -116,19 +113,4 @@ impl SegmentsIO {
         )
         .await
     }
-
-    #[async_backtrace::framed]
-    pub async fn write_segment(dal: Operator, serialized_segment: SerializedSegment) -> Result<()> {
-        assert_eq!(
-            serialized_segment.segment.format_version,
-            SegmentInfo::VERSION
-        );
-        let raw_bytes = serialized_segment.segment.to_bytes()?;
-        let compact_segment_info = CompactSegmentInfo::from_slice(&raw_bytes)?;
-        dal.write(&serialized_segment.path, raw_bytes).await?;
-        if let Some(segment_cache) = CacheManager::instance().get_table_segment_cache() {
-            segment_cache.insert(serialized_segment.path, compact_segment_info);
-        }
-        Ok(())
-    }
 }
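The deleted `SegmentsIO::write_segment` serialized the segment, rebuilt a `CompactSegmentInfo` from the raw bytes, wrote the file, and inserted into the cache by hand. Its replacement is the trait method in `meta_writer.rs` below; at the call site (the last file in this diff) the change is:

    // Before (deleted helper):
    SegmentsIO::write_segment(dal, serialized_segment).await?;

    // After (cache-aware trait method on SegmentInfo):
    new_segment
        .write_meta_through_cache(&dal, &location)
        .await?;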
18 changes: 8 additions & 10 deletions src/query/storages/fuse/src/io/write/meta_writer.rs
@@ -15,7 +15,6 @@
 use databend_common_exception::Result;
 use databend_storages_common_cache::CacheAccessor;
 use databend_storages_common_cache::CachedObject;
-use databend_storages_common_table_meta::meta::CompactSegmentInfo;
 use databend_storages_common_table_meta::meta::SegmentInfo;
 use databend_storages_common_table_meta::meta::TableSnapshot;
 use databend_storages_common_table_meta::meta::TableSnapshotStatistics;
@@ -24,8 +23,6 @@ use opendal::Operator;

 #[async_trait::async_trait]
 pub trait MetaWriter<T> {
-    /// If meta has a `to_bytes` function, such as `SegmentInfo` and `TableSnapshot`
-    /// We should not use `write_meta`. Instead, use `write_meta_data`
     async fn write_meta(&self, data_accessor: &Operator, location: &str) -> Result<()>;
 }

@@ -42,24 +39,25 @@ where T: Marshal + Sync + Send

 #[async_trait::async_trait]
 pub trait CachedMetaWriter<T> {
-    /// If meta has a `to_bytes` function, such as `SegmentInfo` and `TableSnapshot`
-    /// We should not use `write_meta_through_cache`. Instead, use `write_meta_data_through_cache`
-    async fn write_meta_through_cache(self, data_accessor: &Operator, location: &str)
-        -> Result<()>;
+    async fn write_meta_through_cache(
+        &self,
+        data_accessor: &Operator,
+        location: &str,
+    ) -> Result<()>;
 }

 #[async_trait::async_trait]
 impl CachedMetaWriter<SegmentInfo> for SegmentInfo {
     #[async_backtrace::framed]
     async fn write_meta_through_cache(
-        self,
+        &self,
         data_accessor: &Operator,
         location: &str,
     ) -> Result<()> {
         let bytes = self.marshal()?;
         data_accessor.write(location, bytes.clone()).await?;
-        if let Some(cache) = CompactSegmentInfo::cache() {
-            cache.insert(location.to_owned(), CompactSegmentInfo::try_from(&self)?);
+        if let Some(cache) = SegmentInfo::cache() {
+            cache.insert(location.to_owned(), self.try_into()?);
         }
         Ok(())
     }
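Two details of the new impl: the receiver is now `&self`, so the caller keeps ownership of the `SegmentInfo` after writing, and `self.try_into()?` resolves to the same `TryFrom<&SegmentInfo> for CompactSegmentInfo` conversion the old code spelled out as `CompactSegmentInfo::try_from(&self)?`, with the cache handle now obtained via `SegmentInfo::cache()` (see the `caches.rs` change above). A short usage sketch; `op`, `location_gen`, and `segs` are illustrative names:

    let segment = SegmentInfo::new(block_metas, summary);
    let path = location_gen.gen_segment_info_location();
    segment.write_meta_through_cache(&op, &path).await?;
    // The segment was not consumed by the write; it can still be used.
    segs.push(((path, SegmentInfo::VERSION), segment));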
2 changes: 0 additions & 2 deletions src/query/storages/fuse/src/io/write/mod.rs
@@ -15,7 +15,6 @@
 mod block_writer;
 mod inverted_index_writer;
 mod meta_writer;
-mod segment_writer;
 mod write_settings;

 pub(crate) use block_writer::create_inverted_index_builders;
@@ -33,5 +32,4 @@ pub(crate) use inverted_index_writer::create_tokenizer_manager;
 pub use inverted_index_writer::InvertedIndexWriter;
 pub use meta_writer::CachedMetaWriter;
 pub use meta_writer::MetaWriter;
-pub use segment_writer::SegmentWriter;
 pub use write_settings::WriteSettings;
64 changes: 0 additions & 64 deletions src/query/storages/fuse/src/io/write/segment_writer.rs

This file was deleted.

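The rendered view elides the deleted file's contents. From the call sites removed elsewhere in this diff, its API was roughly as follows; this is a reconstruction from usage, not the original source:

    // Reconstructed signatures (hypothetical; the PR view hides the bodies).
    #[derive(Clone)]
    pub struct SegmentWriter<'a> {
        data_accessor: &'a Operator,
        location_generator: &'a TableMetaLocationGenerator,
    }

    impl<'a> SegmentWriter<'a> {
        pub fn new(
            data_accessor: &'a Operator,
            location_generator: &'a TableMetaLocationGenerator,
        ) -> Self {
            Self { data_accessor, location_generator }
        }

        // Wrote the segment and populated the segment cache.
        pub async fn write_segment(&self, segment: SegmentInfo) -> Result<Location> {
            todo!()
        }

        // Wrote the segment without touching the cache.
        pub async fn write_segment_no_cache(&self, segment: &SegmentInfo) -> Result<Location> {
            todo!()
        }
    }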
@@ -40,8 +40,8 @@ use log::info;
 use log::warn;
 use opendal::Operator;

+use crate::io::CachedMetaWriter;
 use crate::io::SegmentsIO;
-use crate::io::SerializedSegment;
 use crate::io::TableMetaLocationGenerator;
 use crate::operations::common::CommitMeta;
 use crate::operations::common::ConflictResolveContext;
@@ -557,12 +557,8 @@ async fn write_segment(
     }
     // create new segment info
     let new_segment = SegmentInfo::new(blocks, new_summary.clone());
-
-    // write the segment info.
-    let serialized_segment = SerializedSegment {
-        path: location.clone(),
-        segment: Arc::new(new_segment),
-    };
-    SegmentsIO::write_segment(dal, serialized_segment).await?;
+    new_segment
+        .write_meta_through_cache(&dal, &location)
+        .await?;
     Ok((location, new_summary))