drafting spin lock
zuston committed Dec 29, 2023
1 parent 85ba845 commit a659133
Showing 4 changed files with 26 additions and 21 deletions.
12 changes: 11 additions & 1 deletion rust/experimental/server/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions rust/experimental/server/Cargo.toml
@@ -96,6 +96,7 @@ signal-hook = "0.3.17"
 clap = "3.0.14"
 socket2 = { version="0.4", features = ["all"]}
 cap = "0.1.2"
+spin = "0.9.8"

 [dependencies.hdrs]
 version = "0.3.0"
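For context on the new dependency: spin's `Mutex` is a busy-waiting spinlock with a fully synchronous API, which is why the `.await` calls on buffer locks disappear throughout this diff. A minimal sketch of the spin 0.9 API (illustrative only, not part of this commit):

use spin::mutex::Mutex;

fn main() {
    let counter = Mutex::new(0u64);
    {
        // lock() spins until the lock is free and returns the guard
        // directly; there is no future to await.
        let mut guard = counter.lock();
        *guard += 1;
    }
    // try_lock() returns Option<MutexGuard<_>> instead of tokio's Result.
    if let Some(guard) = counter.try_lock() {
        println!("count = {}", *guard);
    }
}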
9 changes: 5 additions & 4 deletions rust/experimental/server/src/store/hybrid.rs
@@ -49,9 +49,10 @@ use std::collections::VecDeque;
 use await_tree::InstrumentAwait;
 use std::str::FromStr;
 use std::sync::Arc;
+use spin::mutex::Mutex;

 use crate::runtime::manager::RuntimeManager;
-use tokio::sync::{Mutex, Semaphore};
+use tokio::sync::Semaphore;

 trait PersistentStore: Store + Persistent + Send + Sync {}
 impl PersistentStore for LocalFileStore {}
@@ -399,7 +400,7 @@ impl Store for HybridStore {
         let buffer = self
             .hot_store
             .get_or_create_underlying_staging_buffer(uid.clone());
-        let mut buffer_inner = buffer.lock().await;
+        let mut buffer_inner = buffer.lock();
         if size.as_bytes() < buffer_inner.get_staging_size()? as u64 {
             let (in_flight_uid, blocks) = buffer_inner.migrate_staging_to_in_flight()?;
             self.make_memory_buffer_flush(in_flight_uid, blocks, uid.clone())
@@ -410,7 +411,7 @@

         // if the used size exceed the ratio of high watermark,
         // then send watermark flush trigger
-        if let Ok(_lock) = self.memory_spill_lock.try_lock() {
+        if let Some(_lock) = self.memory_spill_lock.try_lock() {
             let used_ratio = self.hot_store.memory_used_ratio().await;
             if used_ratio > self.config.memory_spill_high_watermark {
                 if let Err(e) = self.memory_watermark_flush_trigger_sender.send(()).await {
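The `Ok(..)` to `Some(..)` change falls out of the differing `try_lock` signatures: `tokio::sync::Mutex::try_lock` returns `Result<MutexGuard, TryLockError>`, while `spin::Mutex::try_lock` returns `Option<MutexGuard>`. A side-by-side sketch (the `spill` parameter is a stand-in for `memory_spill_lock`):

// tokio: try_lock() -> Result<MutexGuard<'_, T>, TryLockError>
fn tokio_style(spill: &tokio::sync::Mutex<()>) {
    if let Ok(_lock) = spill.try_lock() {
        // only one caller at a time gets here
    }
}

// spin: try_lock() -> Option<MutexGuard<'_, T>>
fn spin_style(spill: &spin::mutex::Mutex<()>) {
    if let Some(_lock) = spill.try_lock() {
        // same gating pattern, different return type
    }
}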
@@ -503,7 +504,7 @@ pub async fn watermark_flush(store: Arc<HybridStore>) -> Result<()> {

     let mut flushed_size = 0u64;
     for (partition_id, buffer) in buffers {
-        let mut buffer_inner = buffer.lock().await;
+        let mut buffer_inner = buffer.lock();
         let (in_flight_uid, blocks) = buffer_inner.migrate_staging_to_in_flight()?;
         drop(buffer_inner);
         for block in &blocks {
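The pre-existing `drop(buffer_inner)` is worth noting: with a spinlock it becomes important not to hold the guard across the `.await` points later in this loop, because a task suspended while holding a spin `Mutex` leaves other threads burning CPU until it resumes. A minimal sketch of the take-then-release pattern (`flush_to_disk` and the `Vec<u8>` payload are stand-ins, not the project's real types):

async fn flush(buffer: std::sync::Arc<spin::mutex::Mutex<Vec<u8>>>) {
    let blocks = {
        let mut guard = buffer.lock();
        std::mem::take(&mut *guard) // move the data out while locked
    }; // guard dropped here, before any await point
    flush_to_disk(blocks).await; // no spinlock held across .await
}

async fn flush_to_disk(_blocks: Vec<u8>) { /* write elsewhere */ }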
25 changes: 9 additions & 16 deletions rust/experimental/server/src/store/memory.rs
@@ -40,11 +40,10 @@ use std::collections::{BTreeMap, HashMap};
 use std::str::FromStr;

 use crate::store::mem::ticket::TicketManager;
-use crate::store::mem::InstrumentAwait;
 use croaring::Treemap;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
-use tokio::sync::Mutex;
+use spin::mutex::Mutex;

 pub struct MemoryStore {
     // todo: change to RW lock
@@ -162,7 +161,7 @@

         let buffers = self.state.clone().into_read_only();
         for buffer in buffers.iter() {
-            let staging_size = buffer.1.lock().await.staging_size;
+            let staging_size = buffer.1.lock().staging_size;
             let valset = sorted_tree_map
                 .entry(staging_size)
                 .or_insert_with(|| vec![]);
@@ -193,7 +192,7 @@

     pub async fn get_partitioned_buffer_size(&self, uid: &PartitionedUId) -> Result<u64> {
         let buffer = self.get_underlying_partition_buffer(uid);
-        let buffer = buffer.lock().await;
+        let buffer = buffer.lock();
         Ok(buffer.total_size as u64)
     }

@@ -207,7 +206,7 @@
         in_flight_blocks_id: i64,
     ) -> Result<()> {
         let buffer = self.get_or_create_underlying_staging_buffer(uid);
-        let mut buffer_ref = buffer.lock().await;
+        let mut buffer_ref = buffer.lock();
         buffer_ref.flight_finished(&in_flight_blocks_id)?;
         Ok(())
     }
@@ -258,10 +257,7 @@
     async fn insert(&self, ctx: WritingViewContext) -> Result<(), WorkerError> {
         let uid = ctx.uid;
         let buffer = self.get_or_create_underlying_staging_buffer(uid.clone());
-        let mut buffer_guarded = buffer
-            .lock()
-            .instrument_await("trying buffer lock to insert")
-            .await;
+        let mut buffer_guarded = buffer.lock();

         let blocks = ctx.data_blocks;
         let inserted_size = buffer_guarded.add(blocks)?;
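The `instrument_await` spans are removed together with the futures they instrumented: await-tree can only trace `.await` points, and spin's `lock()` returns the guard directly rather than a future. If lock-wait visibility is still wanted, it would have to be rebuilt another way; one possibility (purely hypothetical, not in this commit) is to time the acquisition:

// Hypothetical stand-in for the removed await-tree span: log slow
// spinlock acquisitions instead of instrumenting a lock future.
fn timed_lock<'a, T>(
    m: &'a spin::mutex::Mutex<T>,
    label: &str,
) -> spin::mutex::MutexGuard<'a, T> {
    let start = std::time::Instant::now();
    let guard = m.lock();
    let waited = start.elapsed();
    if waited > std::time::Duration::from_millis(1) {
        eprintln!("slow lock acquisition ({label}): {waited:?}");
    }
    guard
}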
@@ -277,10 +273,7 @@
     async fn get(&self, ctx: ReadingViewContext) -> Result<ResponseData, WorkerError> {
         let uid = ctx.uid;
         let buffer = self.get_or_create_underlying_staging_buffer(uid);
-        let buffer = buffer
-            .lock()
-            .instrument_await("getting partitioned buffer lock")
-            .await;
+        let buffer = buffer.lock();

         let options = ctx.reading_options;
         let (fetched_blocks, length) = match options {
@@ -419,7 +412,7 @@
         let mut used = 0;
         for removed_pid in _removed_list {
             if let Some(entry) = self.state.remove(removed_pid) {
-                used += entry.1.lock().await.total_size;
+                used += entry.1.lock().total_size;
             }
         }

@@ -768,7 +761,7 @@ mod test {

         // case4: some data are in inflight blocks
         let buffer = store.get_or_create_underlying_staging_buffer(uid.clone());
-        let mut buffer = runtime.wait(buffer.lock());
+        let mut buffer = buffer.lock();
         let owned = buffer.staging.to_owned();
         buffer.staging.clear();
         let mut idx = 0;
@@ -806,7 +799,7 @@
         // case5: old data in in_flight and latest data in staging.
         // read it from the block id 9, and read size of 30
         let buffer = store.get_or_create_underlying_staging_buffer(uid.clone());
-        let mut buffer = runtime.wait(buffer.lock());
+        let mut buffer = buffer.lock();
         buffer.staging.push(PartitionedDataBlock {
             block_id: 20,
             length: 10,
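In the tests, `runtime.wait(buffer.lock())` existed only to drive tokio's lock future to completion from synchronous test code; a spin guard can be taken directly. A small sketch of the difference (the runtime setup is a generic stand-in for the project's `runtime` helper):

fn main() {
    // tokio: lock() returns a future, so sync code must block on a runtime.
    let rt = tokio::runtime::Runtime::new().unwrap();
    let async_mutex = tokio::sync::Mutex::new(0u32);
    let _g = rt.block_on(async_mutex.lock());

    // spin: lock() is synchronous; no runtime is involved.
    let spin_mutex = spin::mutex::Mutex::new(0u32);
    let _g2 = spin_mutex.lock();
}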
