feat: build sst in stream way #747

Merged (11 commits, Mar 22, 2023)
2 changes: 2 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -93,6 +93,7 @@ object_store = { path = "components/object_store" }
parquet_ext = { path = "components/parquet_ext" }
parquet = { version = "32.0.0" }
paste = "1.0"
pin-project-lite = "0.2.8"
profile = { path = "components/profile" }
prometheus = "0.12"
prometheus-static-metric = "0.5"
1 change: 1 addition & 0 deletions analytic_engine/Cargo.toml
@@ -54,5 +54,6 @@ xorfilter-rs = { workspace = true }
common_types = { workspace = true, features = ["test"] }
common_util = { workspace = true, features = ["test"] }
env_logger = { workspace = true }
pin-project-lite = { workspace = true }
rand = { workspace = true }
wal = { workspace = true, features = ["test"] }
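
pin-project-lite is added here and in the workspace manifest above, presumably to support pinned stream wrappers in the new streaming SST write path. A minimal, hypothetical sketch of the pattern the crate enables; the wrapper type and the use of futures::Stream are illustrative assumptions, not code from this PR:

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::Stream;

pin_project_lite::pin_project! {
    // Hypothetical wrapper around an inner record-batch stream.
    struct StreamWrapper<S> {
        #[pin]
        inner: S,
    }
}

impl<S: Stream> Stream for StreamWrapper<S> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // project() yields Pin<&mut S> for the inner stream without any unsafe code.
        self.project().inner.poll_next(cx)
    }
}
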
20 changes: 19 additions & 1 deletion analytic_engine/src/compaction/scheduler.rs
@@ -40,6 +40,7 @@ use crate::{
instance::{
flush_compaction::TableFlushOptions, write_worker::CompactionNotifier, Instance, SpaceStore,
},
sst::factory::SstWriteOptions,
table::data::TableDataRef,
TableOptions,
};
@@ -287,6 +288,7 @@ impl SchedulerImpl {
space_store: Arc<SpaceStore>,
runtime: Arc<Runtime>,
config: SchedulerConfig,
write_sst_max_buffer_size: usize,
) -> Self {
let (tx, rx) = mpsc::channel(config.schedule_channel_len);
let running = Arc::new(AtomicBool::new(true));
@@ -300,6 +302,7 @@ impl SchedulerImpl {
picker_manager: PickerManager::default(),
max_ongoing_tasks: config.max_ongoing_tasks,
max_unflushed_duration: config.max_unflushed_duration.0,
write_sst_max_buffer_size,
limit: Arc::new(OngoingTaskLimit {
ongoing_tasks: AtomicUsize::new(0),
request_buf: RwLock::new(RequestQueue::default()),
@@ -367,6 +370,7 @@ struct ScheduleWorker {
max_unflushed_duration: Duration,
picker_manager: PickerManager,
max_ongoing_tasks: usize,
write_sst_max_buffer_size: usize,
limit: Arc<OngoingTaskLimit>,
running: Arc<AtomicBool>,
memory_limit: MemoryLimit,
@@ -462,13 +466,27 @@ impl ScheduleWorker {

let sender = self.sender.clone();
let request_id = RequestId::next_id();
let storage_format_hint = table_data.table_options().storage_format_hint;
let sst_write_options = SstWriteOptions {
storage_format_hint,
num_rows_per_row_group: table_data.table_options().num_rows_per_row_group,
compression: table_data.table_options().compression,
max_buffer_size: self.write_sst_max_buffer_size,
};

// Do actual costly compact job in background.
self.runtime.spawn(async move {
// Release the token after compaction finished.
let _token = token;

let res = space_store
.compact_table(runtime, &table_data, request_id, &compaction_task)
.compact_table(
runtime,
&table_data,
request_id,
&compaction_task,
&sst_write_options,
)
.await;

if let Err(e) = &res {
15 changes: 7 additions & 8 deletions analytic_engine/src/instance/flush_compaction.rs
@@ -545,6 +545,7 @@ impl Instance {
storage_format_hint: table_data.table_options().storage_format_hint,
num_rows_per_row_group: table_data.table_options().num_rows_per_row_group,
compression: table_data.table_options().compression,
max_buffer_size: self.write_sst_max_buffer_size,
};

for time_range in &time_ranges {
@@ -674,6 +675,7 @@ impl Instance {
storage_format_hint,
num_rows_per_row_group: table_data.table_options().num_rows_per_row_group,
compression: table_data.table_options().compression,
max_buffer_size: self.write_sst_max_buffer_size,
};
let mut writer = self
.space_store
@@ -729,6 +731,7 @@ impl SpaceStore {
table_data: &TableData,
request_id: RequestId,
task: &CompactionTask,
sst_write_options: &SstWriteOptions,
) -> Result<()> {
debug!(
"Begin compact table, table_name:{}, id:{}, task:{:?}",
@@ -766,6 +769,7 @@ impl SpaceStore {
table_data,
request_id,
input,
sst_write_options,
&mut edit_meta,
)
.await?;
@@ -796,6 +800,7 @@ impl SpaceStore {
table_data: &TableData,
request_id: RequestId,
input: &CompactionInputFiles,
sst_write_options: &SstWriteOptions,
edit_meta: &mut VersionEditMeta,
) -> Result<()> {
debug!(
@@ -907,18 +912,12 @@ impl SpaceStore {
let file_id = table_data.alloc_file_id();
let sst_file_path = table_data.set_sst_file_path(file_id);

let storage_format_hint = table_data.table_options().storage_format_hint;
let sst_write_options = SstWriteOptions {
storage_format_hint,
num_rows_per_row_group: table_options.num_rows_per_row_group,
compression: table_options.compression,
};
let mut sst_writer = self
.sst_factory
.create_writer(&sst_write_options, &sst_file_path, self.store_picker())
.create_writer(sst_write_options, &sst_file_path, self.store_picker())
.await
.context(CreateSstWriter {
storage_format_hint,
storage_format_hint: sst_write_options.storage_format_hint,
})?;

let sst_info = sst_writer
2 changes: 2 additions & 0 deletions analytic_engine/src/instance/mod.rs
@@ -170,6 +170,8 @@ pub struct Instance {
pub(crate) space_write_buffer_size: usize,
/// Replay wal batch size
pub(crate) replay_batch_size: usize,
/// Write sst max buffer size
pub(crate) write_sst_max_buffer_size: usize,
/// Options for scanning sst
pub(crate) iter_options: IterOptions,
pub(crate) remote_engine: Option<RemoteEngineRef>,
2 changes: 2 additions & 0 deletions analytic_engine/src/instance/open.rs
@@ -69,6 +69,7 @@ impl Instance {
space_store.clone(),
bg_runtime.clone(),
scheduler_config,
ctx.config.write_sst_max_buffer_size.as_bytes() as usize,
));

let file_purger = FilePurger::start(&bg_runtime, store_picker.default_store().clone());
@@ -92,6 +93,7 @@
db_write_buffer_size: ctx.config.db_write_buffer_size,
space_write_buffer_size: ctx.config.space_write_buffer_size,
replay_batch_size: ctx.config.replay_batch_size,
write_sst_max_buffer_size: ctx.config.write_sst_max_buffer_size.as_bytes() as usize,
iter_options,
remote_engine: remote_engine_ref,
});
5 changes: 4 additions & 1 deletion analytic_engine/src/lib.rs
@@ -22,7 +22,7 @@ pub mod table_options;
#[cfg(any(test, feature = "test"))]
pub mod tests;

use common_util::config::ReadableDuration;
use common_util::config::{ReadableDuration, ReadableSize};
use manifest::details::Options as ManifestOptions;
use message_queue::kafka::config::Config as KafkaConfig;
use serde::{Deserialize, Serialize};
@@ -75,6 +75,8 @@ pub struct Config {
pub scan_batch_size: usize,
/// Sst background reading parallelism
pub sst_background_read_parallelism: usize,
/// Max buffer size for writing sst
pub write_sst_max_buffer_size: ReadableSize,

/// Wal storage config
///
@@ -108,6 +110,7 @@ impl Default for Config {
db_write_buffer_size: 0,
scan_batch_size: 500,
sst_background_read_parallelism: 8,
write_sst_max_buffer_size: ReadableSize::mb(10),
wal: WalStorageConfig::RocksDB(Box::default()),
remote_engine_client: remote_engine_client::config::Config::default(),
}
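
The new write_sst_max_buffer_size option defaults to 10 MB. A minimal sketch, not taken from this PR, of overriding it when constructing the engine Config in Rust; all other fields keep their defaults:

use analytic_engine::Config;
use common_util::config::ReadableSize;

// Override only the new option; everything else keeps the defaults shown above.
fn build_engine_config() -> Config {
    Config {
        write_sst_max_buffer_size: ReadableSize::mb(32),
        ..Default::default()
    }
}

In a TOML config file the same knob would likely be given as a human-readable size string, since the field is a ReadableSize.
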
1 change: 1 addition & 0 deletions analytic_engine/src/sst/factory.rs
@@ -116,6 +116,7 @@ pub struct SstWriteOptions {
pub storage_format_hint: StorageFormatHint,
pub num_rows_per_row_group: usize,
pub compression: Compression,
pub max_buffer_size: usize,
}

#[derive(Debug, Default)]
13 changes: 7 additions & 6 deletions analytic_engine/src/sst/meta_data/cache.rs
@@ -7,12 +7,10 @@ use std::{

use lru::LruCache;
use parquet::file::metadata::FileMetaData;
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{OptionExt, ResultExt};

use crate::sst::{
meta_data::{
DecodeCustomMetaData, EmptyCustomMetaData, KvMetaDataNotFound, ParquetMetaDataRef, Result,
},
meta_data::{DecodeCustomMetaData, KvMetaDataNotFound, ParquetMetaDataRef, Result},
parquet::encoding,
};

@@ -41,11 +39,14 @@ impl MetaData {
let kv_metas = file_meta_data
.key_value_metadata()
.context(KvMetaDataNotFound)?;
ensure!(!kv_metas.is_empty(), EmptyCustomMetaData);
let kv_meta = kv_metas
.iter()
.find(|kv| kv.key == encoding::META_KEY)
.context(KvMetaDataNotFound)?;

let custom = {
let mut sst_meta =
encoding::decode_sst_meta_data(&kv_metas[0]).context(DecodeCustomMetaData)?;
encoding::decode_sst_meta_data(kv_meta).context(DecodeCustomMetaData)?;
if ignore_sst_filter {
sst_meta.parquet_filter = None;
}
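
The change above stops assuming the custom metadata lives in the first key-value entry of the parquet footer: instead of requiring a non-empty list and decoding kv_metas[0], the code now searches for the entry whose key equals encoding::META_KEY and reports KvMetaDataNotFound when it is absent. A small, self-contained sketch of that lookup; KeyValue below is a stand-in for the parquet crate's key-value metadata type, not an actual import from this PR:

// Stand-in for the parquet crate's key-value metadata entry.
struct KeyValue {
    key: String,
    value: Option<String>,
}

// Locate the custom metadata entry by key rather than taking the first entry,
// so unrelated key-value pairs in the footer no longer break decoding.
fn find_custom_meta<'a>(entries: &'a [KeyValue], meta_key: &str) -> Option<&'a KeyValue> {
    entries.iter().find(|kv| kv.key == meta_key)
}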