Skip to content

Commit

Permalink
feat(compaction): limit memory usage for compaction read (#4590)
Browse files Browse the repository at this point in the history
* limit memory

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

* tmp commit

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

* separate compaction

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

* remove data cache

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

* fix format

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

* fix test

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

* fix license

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

* fix sstable

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

* do not compress

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

* fix memory

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

* fix test

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

* fix format

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

* fix conflict

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

* add debug log

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
Little-Wallace and mergify[bot] authored Aug 17, 2022
1 parent 0d920b4 commit 8883b35
Show file tree
Hide file tree
Showing 26 changed files with 981 additions and 527 deletions.
30 changes: 22 additions & 8 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ use risingwave_pb::task_service::task_service_server::TaskServiceServer;
use risingwave_rpc_client::{ExtraInfoSourceRef, MetaClient};
use risingwave_source::monitor::SourceMetrics;
use risingwave_source::MemSourceManager;
use risingwave_storage::hummock::compactor::{CompactionExecutor, Compactor, CompactorContext};
use risingwave_storage::hummock::compactor::{
CompactionExecutor, Compactor, CompactorContext, Context,
};
use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
use risingwave_storage::hummock::MemoryLimiter;
use risingwave_storage::hummock::{CompactorSstableStore, MemoryLimiter};
use risingwave_storage::monitor::{
monitor_cache, HummockMetrics, ObjectStoreMetrics, StateStoreMetrics,
};
Expand Down Expand Up @@ -142,33 +144,45 @@ pub async fn compute_node_serve(
let mut extra_info_sources: Vec<ExtraInfoSourceRef> = vec![];
if let StateStoreImpl::HummockStateStore(storage) = &state_store {
extra_info_sources.push(storage.sstable_id_manager());
let memory_limiter = Arc::new(MemoryLimiter::new(
storage_config.compactor_memory_limit_mb as u64 * 1024 * 1024,
));
// Note: we treat `hummock+memory-shared` as a shared storage, so we won't start the
// compactor along with compute node.
if opts.state_store == "hummock+memory"
|| opts.state_store.starts_with("hummock+disk")
|| storage_config.disable_remote_compactor
{
tracing::info!("start embedded compactor");
let read_memory_limiter = Arc::new(MemoryLimiter::new(
storage_config.compactor_memory_limit_mb as u64 * 1024 * 1024 / 2,
));
// todo: set shutdown_sender in HummockStorage.
let compactor_context = Arc::new(CompactorContext {
let write_memory_limit =
storage_config.compactor_memory_limit_mb as u64 * 1024 * 1024 / 2;
let context = Arc::new(Context {
options: storage_config,
hummock_meta_client: hummock_meta_client.clone(),
sstable_store: storage.sstable_store(),
stats: state_store_metrics.clone(),
is_share_buffer_compact: false,
compaction_executor: Arc::new(CompactionExecutor::new(Some(1))),
filter_key_extractor_manager: filter_key_extractor_manager.clone(),
memory_limiter: memory_limiter.clone(),
read_memory_limiter,
sstable_id_manager: storage.sstable_id_manager(),
});
// TODO: use normal sstable store for single-process mode.
let compact_sstable_store = CompactorSstableStore::new(
storage.sstable_store(),
Arc::new(MemoryLimiter::new(write_memory_limit)),
);
let compactor_context = Arc::new(CompactorContext {
context,
sstable_store: Arc::new(compact_sstable_store),
});

let (handle, shutdown_sender) =
Compactor::start_compactor(compactor_context, hummock_meta_client, 1);
sub_tasks.push((handle, shutdown_sender));
}
monitor_cache(storage.sstable_store(), memory_limiter, &registry).unwrap();
monitor_cache(storage.sstable_store(), &registry).unwrap();
}

sub_tasks.push(MetaClient::start_heartbeat_loop(
Expand Down
2 changes: 1 addition & 1 deletion src/config/risingwave.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ bloom_false_positive = 0.01
data_directory = "hummock_001"
block_cache_capacity_mb = 4096
meta_cache_capacity_mb = 1024
compactor_memory_limit_mb = 8096
compactor_memory_limit_mb = 5120

[storage.file_cache]
capacity = 1073741824 # 1 GiB
Expand Down
44 changes: 25 additions & 19 deletions src/storage/benches/bench_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,21 @@ use risingwave_hummock_sdk::key_range::KeyRange;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::{InMemObjectStore, ObjectStore, ObjectStoreImpl};
use risingwave_pb::hummock::SstableInfo;
use risingwave_storage::hummock::compactor::{Compactor, DummyCompactionFilter};
use risingwave_storage::hummock::compactor::{
Compactor, ConcatSstableIterator, DummyCompactionFilter,
};
use risingwave_storage::hummock::iterator::{
ConcatIterator, ConcatSstableIterator, Forward, HummockIterator, HummockIteratorUnion,
MultiSstIterator, UnorderedMergeIteratorInner,
ConcatIterator, Forward, HummockIterator, HummockIteratorUnion, MultiSstIterator,
UnorderedMergeIteratorInner,
};
use risingwave_storage::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use risingwave_storage::hummock::sstable::SstableIteratorReadOptions;
use risingwave_storage::hummock::sstable_store::SstableStoreRef;
use risingwave_storage::hummock::value::HummockValue;
use risingwave_storage::hummock::{
CachePolicy, CompressionAlgorithm, HummockResult, MemoryLimiter, SstableBuilder,
SstableBuilderOptions, SstableIterator, SstableMeta, SstableStore, TieredCache,
CachePolicy, CompactorSstableStore, CompressionAlgorithm, HummockResult, MemoryLimiter,
SstableBuilder, SstableBuilderOptions, SstableIterator, SstableMeta, SstableStore,
SstableStoreWrite, TieredCache,
};
use risingwave_storage::monitor::{StateStoreMetrics, StoreLocalStatistic};

Expand Down Expand Up @@ -154,7 +157,10 @@ impl TableBuilderFactory for LocalTableBuilderFactory {
}
}

async fn compact<I: HummockIterator<Direction = Forward>>(iter: I, sstable_store: SstableStoreRef) {
async fn compact<I: HummockIterator<Direction = Forward>>(
iter: I,
sstable_store: Arc<CompactorSstableStore>,
) {
let global_table_id = AtomicU64::new(32);
let mut builder = CapacitySplitTableBuilder::new(
LocalTableBuilderFactory {
Expand All @@ -175,7 +181,7 @@ async fn compact<I: HummockIterator<Direction = Forward>>(iter: I, sstable_store

Compactor::compact_and_build_sst(
&mut builder,
KeyRange::inf(),
&KeyRange::inf(),
iter,
false,
0,
Expand Down Expand Up @@ -254,26 +260,26 @@ fn bench_merge_iterator_compactor(c: &mut Criterion) {
)),
];
let iter = MultiSstIterator::new(sub_iters, stats.clone());
async move { compact(iter, sstable_store1).await }
let sstable_store = Arc::new(CompactorSstableStore::new(
sstable_store1,
MemoryLimiter::unlimit(),
));
async move { compact(iter, sstable_store).await }
});
});
let sstable_store = Arc::new(CompactorSstableStore::new(
sstable_store.clone(),
MemoryLimiter::unlimit(),
));
c.bench_function("bench_merge_iterator", |b| {
let stats = Arc::new(StateStoreMetrics::unused());
b.to_async(FuturesExecutor).iter(|| {
let sstable_store1 = sstable_store.clone();
let sub_iters = vec![
ConcatSstableIterator::new(
level1.clone(),
sstable_store.clone(),
read_options.clone(),
),
ConcatSstableIterator::new(
level2.clone(),
sstable_store.clone(),
read_options.clone(),
),
ConcatSstableIterator::new(level1.clone(), KeyRange::inf(), sstable_store.clone()),
ConcatSstableIterator::new(level2.clone(), KeyRange::inf(), sstable_store.clone()),
];
let iter = UnorderedMergeIteratorInner::new(sub_iters, stats.clone());
let sstable_store1 = sstable_store.clone();
async move { compact(iter, sstable_store1).await }
});
});
Expand Down
39 changes: 24 additions & 15 deletions src/storage/compactor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ use risingwave_object_store::object::parse_remote_object_store;
use risingwave_pb::common::WorkerType;
use risingwave_pb::hummock::compactor_service_server::CompactorServiceServer;
use risingwave_rpc_client::MetaClient;
use risingwave_storage::hummock::compactor::{CompactionExecutor, CompactorContext};
use risingwave_storage::hummock::compactor::{CompactionExecutor, CompactorContext, Context};
use risingwave_storage::hummock::hummock_meta_client::MonitoredHummockMetaClient;
use risingwave_storage::hummock::{MemoryLimiter, SstableIdManager, SstableStore};
use risingwave_storage::hummock::{
CompactorMemoryCollector, CompactorSstableStore, MemoryLimiter, SstableIdManager, SstableStore,
};
use risingwave_storage::monitor::{
monitor_cache, HummockMetrics, ObjectStoreMetrics, StateStoreMetrics,
};
Expand All @@ -45,7 +47,7 @@ pub async fn compactor_serve(
client_addr: HostAddr,
opts: CompactorOpts,
) -> (JoinHandle<()>, JoinHandle<()>, Sender<()>) {
let mut config = {
let config = {
if opts.config_path.is_empty() {
CompactorConfig::default()
} else {
Expand Down Expand Up @@ -80,9 +82,6 @@ pub async fn compactor_serve(

// Use half of the limit, because any memory that would be held in the meta-cache is allocated
// through the limiter first.
// TODO: replace meta-cache with memory limiter.
config.storage.meta_cache_capacity_mb = config.storage.compactor_memory_limit_mb / 2;

let storage_config = Arc::new(config.storage);
let state_store_stats = Arc::new(StateStoreMetrics::new(registry.clone()));
let object_store = Arc::new(
Expand All @@ -97,7 +96,7 @@ pub async fn compactor_serve(
let sstable_store = Arc::new(SstableStore::for_compactor(
object_store,
storage_config.data_directory.to_string(),
storage_config.block_cache_capacity_mb * (1 << 20),
1 << 20, // set 1MB memory to avoid panic.
storage_config.meta_cache_capacity_mb * (1 << 20),
));

Expand All @@ -113,29 +112,39 @@ pub async fn compactor_serve(
.await;

let observer_join_handle = observer_manager.start().await.unwrap();
let memory_limiter = Arc::new(MemoryLimiter::new(
(storage_config.compactor_memory_limit_mb as u64) << 20,
let output_limit_mb = storage_config.compactor_memory_limit_mb as u64 / 2;
let memory_limiter = Arc::new(MemoryLimiter::new(output_limit_mb << 20));
let input_limit_mb = storage_config.compactor_memory_limit_mb as u64 / 2;
let compact_sstable_store = Arc::new(CompactorSstableStore::new(
sstable_store.clone(),
Arc::new(MemoryLimiter::new(input_limit_mb << 20)),
));
monitor_cache(sstable_store.clone(), memory_limiter.clone(), &registry).unwrap();
let memory_collector = Arc::new(CompactorMemoryCollector::new(
memory_limiter.clone(),
compact_sstable_store.clone(),
));
monitor_cache(memory_collector, &registry).unwrap();
let sstable_id_manager = Arc::new(SstableIdManager::new(
hummock_meta_client.clone(),
storage_config.sstable_id_remote_fetch_number,
));

let compactor_context = Arc::new(CompactorContext {
let context = Arc::new(Context {
options: storage_config,
hummock_meta_client: hummock_meta_client.clone(),
sstable_store,
sstable_store: sstable_store.clone(),
stats: state_store_stats,
is_share_buffer_compact: false,
compaction_executor: Arc::new(CompactionExecutor::new(
opts.compaction_worker_threads_number,
)),
filter_key_extractor_manager: filter_key_extractor_manager.clone(),
memory_limiter,
read_memory_limiter: memory_limiter,
sstable_id_manager: sstable_id_manager.clone(),
});

let compactor_context = Arc::new(CompactorContext {
context,
sstable_store: compact_sstable_store,
});
let sub_tasks = vec![
MetaClient::start_heartbeat_loop(
meta_client.clone(),
Expand Down
19 changes: 15 additions & 4 deletions src/storage/hummock_test/src/compactor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,13 @@ mod tests {
use risingwave_pb::hummock::pin_version_response::Payload;
use risingwave_pb::hummock::{HummockVersion, TableOption};
use risingwave_rpc_client::HummockMetaClient;
use risingwave_storage::hummock::compactor::{CompactionExecutor, Compactor, CompactorContext};
use risingwave_storage::hummock::compactor::{
CompactionExecutor, Compactor, CompactorContext, Context,
};
use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store;
use risingwave_storage::hummock::{HummockStorage, MemoryLimiter, SstableIdManager};
use risingwave_storage::hummock::{
CompactorSstableStore, HummockStorage, MemoryLimiter, SstableIdManager,
};
use risingwave_storage::monitor::{StateStoreMetrics, StoreLocalStatistic};
use risingwave_storage::storage_value::StorageValue;
use risingwave_storage::store::{ReadOptions, WriteOptions};
Expand Down Expand Up @@ -144,19 +148,26 @@ mod tests {
hummock_meta_client: &Arc<dyn HummockMetaClient>,
filter_key_extractor_manager: FilterKeyExtractorManagerRef,
) -> CompactorContext {
CompactorContext {
let context = Arc::new(Context {
options: storage.options().clone(),
sstable_store: storage.sstable_store(),
hummock_meta_client: hummock_meta_client.clone(),
stats: Arc::new(StateStoreMetrics::unused()),
is_share_buffer_compact: false,
compaction_executor: Arc::new(CompactionExecutor::new(Some(1))),
read_memory_limiter: MemoryLimiter::unlimit(),
filter_key_extractor_manager,
memory_limiter: Arc::new(MemoryLimiter::new(1024 * 1024 * 128)),
sstable_id_manager: Arc::new(SstableIdManager::new(
hummock_meta_client.clone(),
storage.options().sstable_id_remote_fetch_number,
)),
});
CompactorContext {
sstable_store: Arc::new(CompactorSstableStore::new(
context.sstable_store.clone(),
context.read_memory_limiter.clone(),
)),
context,
}
}

Expand Down
Loading

0 comments on commit 8883b35

Please sign in to comment.