Skip to content

Commit

Permalink
feat: pass table options when opening/creating regions
Browse files Browse the repository at this point in the history
  • Loading branch information
v0y4g3r committed Feb 20, 2023
1 parent d6f4a43 commit 4650d5a
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 8 deletions.
18 changes: 15 additions & 3 deletions src/mito/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,11 @@ impl<S: StorageEngine> MitoEngineInner<S> {
})?;
let opts = CreateOptions {
parent_dir: table_dir.clone(),
write_buffer_size: request
.table_options
.write_buffer_size
.map(|size| size.0 as usize),
ttl: request.table_options.ttl,
};

let region = self
Expand Down Expand Up @@ -449,14 +454,21 @@ impl<S: StorageEngine> MitoEngineInner<S> {
let table_id = request.table_id;
let engine_ctx = StorageEngineContext::default();
let table_dir = table_dir(catalog_name, schema_name, table_id);
let opts = OpenOptions {
parent_dir: table_dir.to_string(),
};

let Some((manifest, table_info)) = self
.recover_table_manifest_and_info(table_name, &table_dir)
.await? else { return Ok(None) };

let opts = OpenOptions {
parent_dir: table_dir.to_string(),
write_buffer_size: table_info
.meta
.options
.write_buffer_size
.map(|s| s.0 as usize),
ttl: table_info.meta.options.ttl,
};

debug!(
"Opening table {}, table info recovered: {:?}",
table_id, table_info
Expand Down
18 changes: 14 additions & 4 deletions src/storage/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ impl<S: LogStore> EngineInner<S> {

let mut guard = SlotGuard::new(name, &self.regions);

let store_config = self.region_store_config(&opts.parent_dir, name);
let store_config = self.region_store_config(&opts.parent_dir, opts.write_buffer_size, name);

let region = match RegionImpl::open(name.to_string(), store_config, opts).await? {
None => return Ok(None),
Expand Down Expand Up @@ -319,7 +319,8 @@ impl<S: LogStore> EngineInner<S> {
.context(error::InvalidRegionDescSnafu {
region: &region_name,
})?;
let store_config = self.region_store_config(&opts.parent_dir, &region_name);
let store_config =
self.region_store_config(&opts.parent_dir, opts.write_buffer_size, &region_name);

let region = RegionImpl::create(metadata, store_config).await?;

Expand All @@ -335,21 +336,30 @@ impl<S: LogStore> EngineInner<S> {
slot.get_ready_region()
}

fn region_store_config(&self, parent_dir: &str, region_name: &str) -> StoreConfig<S> {
fn region_store_config(
&self,
parent_dir: &str,
write_buffer_size: Option<usize>,
region_name: &str,
) -> StoreConfig<S> {
let parent_dir = util::normalize_dir(parent_dir);

let sst_dir = &region_sst_dir(&parent_dir, region_name);
let sst_layer = Arc::new(FsAccessLayer::new(sst_dir, self.object_store.clone()));
let manifest_dir = region_manifest_dir(&parent_dir, region_name);
let manifest = RegionManifest::new(&manifest_dir, self.object_store.clone());

let flush_strategy = write_buffer_size
.map(|size| Arc::new(SizeBasedStrategy::new(size)) as Arc<_>)
.unwrap_or_else(|| self.flush_strategy.clone());

StoreConfig {
log_store: self.log_store.clone(),
sst_layer,
manifest,
memtable_builder: self.memtable_builder.clone(),
flush_scheduler: self.flush_scheduler.clone(),
flush_strategy: self.flush_strategy.clone(),
flush_strategy,
compaction_scheduler: self.compaction_scheduler.clone(),
engine_config: self.config.clone(),
file_purger: self.file_purger.clone(),
Expand Down
9 changes: 9 additions & 0 deletions src/storage/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ pub struct SizeBasedStrategy {
mutable_limitation: usize,
}

impl SizeBasedStrategy {
pub fn new(max_write_buffer_size: usize) -> Self {
Self {
max_write_buffer_size,
mutable_limitation: get_mutable_limitation(max_write_buffer_size),
}
}
}

#[inline]
fn get_mutable_limitation(max_write_buffer_size: usize) -> usize {
// Inspired by RocksDB
Expand Down
10 changes: 10 additions & 0 deletions src/store-api/src/storage/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
//! a [`StorageEngine`] instance manages a bunch of storage unit called [`Region`], which holds
//! chunks of rows, support operations like PUT/DELETE/SCAN.
use std::time::Duration;

use async_trait::async_trait;
use common_error::ext::ErrorExt;

Expand Down Expand Up @@ -82,11 +84,19 @@ pub struct EngineContext {}
pub struct CreateOptions {
/// Region parent directory
pub parent_dir: String,
/// Region memtable max size in bytes
pub write_buffer_size: Option<usize>,
/// Region SST files TTL
pub ttl: Option<Duration>,
}

/// Options to open a region.
#[derive(Debug, Clone, Default)]
pub struct OpenOptions {
/// Region parent directory
pub parent_dir: String,
/// Region memtable max size in bytes
pub write_buffer_size: Option<usize>,
/// Region SST files TTL
pub ttl: Option<Duration>,
}
2 changes: 1 addition & 1 deletion src/table/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,4 @@ tokio.workspace = true
parquet = { workspace = true, features = ["async"] }
tempdir = "0.3"
tokio-util = { version = "0.7", features = ["compat"] }
serde_json.workspace = true
serde_json.workspace = true

0 comments on commit 4650d5a

Please sign in to comment.