Skip to content

Commit

Permalink
[apache#1407] fix(rust): drop events and release memory when errors h…
Browse files Browse the repository at this point in the history
…appened
  • Loading branch information
zuston committed Feb 6, 2024
1 parent 4dacb1e commit e7ccae3
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 15 deletions.
3 changes: 3 additions & 0 deletions rust/experimental/server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ pub enum WorkerError {

#[error("Data should be read from hdfs in client side instead of from server side")]
NOT_READ_HDFS_DATA_FROM_SERVER,

#[error("Spill event has been retried exceed the max limit for app: {0}")]
SPILL_EVENT_EXCEED_RETRY_MAX_LIMIT(String),
}

impl From<AcquireError> for WorkerError {
Expand Down
12 changes: 12 additions & 0 deletions rust/experimental/server/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,19 @@ pub static GAUGE_IN_SPILL_DATA_SIZE: Lazy<IntGauge> =
pub static GAUGE_GRPC_REQUEST_QUEUE_SIZE: Lazy<IntGauge> =
Lazy::new(|| IntGauge::new("grpc_request_queue_size", "grpc request queue size").unwrap());

pub static TOTAL_SPILL_EVENTS_DROPPED: Lazy<IntCounter> = Lazy::new(|| {
IntCounter::new(
"total_spill_events_dropped",
"total spill events dropped number",
)
.expect("")
});

fn register_custom_metrics() {
REGISTRY
.register(Box::new(TOTAL_SPILL_EVENTS_DROPPED.clone()))
.expect("");

REGISTRY
.register(Box::new(GAUGE_TOPN_APP_RESIDENT_DATA_SIZE.clone()))
.expect("");
Expand Down
48 changes: 33 additions & 15 deletions rust/experimental/server/src/store/hybrid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::metric::{
GAUGE_IN_SPILL_DATA_SIZE, GAUGE_MEMORY_SPILL_OPERATION, GAUGE_MEMORY_SPILL_TO_HDFS,
GAUGE_MEMORY_SPILL_TO_LOCALFILE, TOTAL_MEMORY_SPILL_OPERATION,
TOTAL_MEMORY_SPILL_OPERATION_FAILED, TOTAL_MEMORY_SPILL_TO_HDFS,
TOTAL_MEMORY_SPILL_TO_LOCALFILE,
TOTAL_MEMORY_SPILL_TO_LOCALFILE, TOTAL_SPILL_EVENTS_DROPPED,
};
use crate::readable_size::ReadableSize;
#[cfg(feature = "hdfs")]
Expand All @@ -41,7 +41,7 @@ use crate::store::{
use anyhow::{anyhow, Result};

use async_trait::async_trait;
use log::{debug, error, info};
use log::{debug, error, info, warn};
use prometheus::core::{Atomic, AtomicU64};
use std::any::Any;

Expand Down Expand Up @@ -177,10 +177,13 @@ impl HybridStore {
spill_message: SpillMessage,
) -> Result<String, WorkerError> {
let mut ctx: WritingViewContext = spill_message.ctx;
let in_flight_blocks_id: i64 = spill_message.id;
let retry_cnt = spill_message.retry_cnt;

let uid = ctx.uid.clone();
if retry_cnt > 3 {
let app_id = ctx.uid.app_id;
return Err(WorkerError::SPILL_EVENT_EXCEED_RETRY_MAX_LIMIT(app_id));
}

let blocks = &ctx.data_blocks;
let mut spill_size = 0i64;
for block in blocks {
Expand Down Expand Up @@ -254,11 +257,6 @@ impl HybridStore {
}
}

self.hot_store
.release_in_flight_blocks_in_underlying_staging_buffer(uid, in_flight_blocks_id)
.await?;
self.hot_store.free_used(spill_size).await?;

match candidate_store.name().await {
StorageType::LOCALFILE => {
GAUGE_MEMORY_SPILL_TO_LOCALFILE.dec();
Expand Down Expand Up @@ -317,6 +315,17 @@ impl HybridStore {

Ok(())
}

async fn release_data_in_memory(&self, data_size: u64, message: &SpillMessage) -> Result<()> {
let uid = &message.ctx.uid;
let in_flight_id = message.id;
self.hot_store
.release_in_flight_blocks_in_underlying_staging_buffer(uid.clone(), in_flight_id)
.await?;
self.hot_store.free_used(data_size as i64).await?;
self.hot_store.desc_to_in_flight_buffer_size(data_size);
Ok(())
}
}

#[async_trait]
Expand Down Expand Up @@ -360,7 +369,7 @@ impl Store for HybridStore {

TOTAL_MEMORY_SPILL_OPERATION.inc();
GAUGE_MEMORY_SPILL_OPERATION.inc();
let store_cloned = store.clone();
let store_ref = store.clone();
store
.runtime_manager
.write_runtime
Expand All @@ -371,14 +380,23 @@ impl Store for HybridStore {
}

GAUGE_IN_SPILL_DATA_SIZE.add(size as i64);
match store_cloned
match store_ref
.memory_spill_to_persistent_store(message.clone())
.instrument_await("memory_spill_to_persistent_store.")
.await
{
Ok(msg) => {
store_cloned.hot_store.desc_to_in_flight_buffer_size(size);
debug!("{}", msg)
debug!("{}", msg);
if let Err(err) = store_ref.release_data_in_memory(size, &message).await {
error!("Errors on releasing memory data, that should not happen. err: {:#?}", err);
}
}
Err(WorkerError::SPILL_EVENT_EXCEED_RETRY_MAX_LIMIT(_)) | Err(WorkerError::PARTIAL_DATA_LOST(_)) => {
warn!("Dropping the spill event for app: {:?}. Attention: this will make data lost!", message.ctx.uid.app_id);
if let Err(err) = store_ref.release_data_in_memory(size, &message).await {
error!("Errors on releasing memory data when dropping the spill event, that should not happen. err: {:#?}", err);
}
TOTAL_SPILL_EVENTS_DROPPED.inc();
}
Err(error) => {
TOTAL_MEMORY_SPILL_OPERATION_FAILED.inc();
Expand All @@ -389,10 +407,10 @@ impl Store for HybridStore {

message.retry_cnt = message.retry_cnt + 1;
// re-push to the queue to execute
let _ = store_cloned.memory_spill_send.send(message).await;
let _ = store_ref.memory_spill_send.send(message).await;
}
}
store_cloned.memory_spill_event_num.dec_by(1);
store_ref.memory_spill_event_num.dec_by(1);
GAUGE_IN_SPILL_DATA_SIZE.sub(size as i64);
GAUGE_MEMORY_SPILL_OPERATION.dec();
drop(concurrency_guarder);
Expand Down

0 comments on commit e7ccae3

Please sign in to comment.